/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/lnet/lib-move.c
 *
 * Data movement routines
 */

#define DEBUG_SUBSYSTEM S_LNET

#include <linux/pagemap.h>

#include <lnet/lib-lnet.h>
#include <linux/nsproxy.h>
#include <net/net_namespace.h>

extern unsigned int lnet_current_net_count;

static int local_nid_dist_zero = 1;
module_param(local_nid_dist_zero, int, 0444);
MODULE_PARM_DESC(local_nid_dist_zero, "Reserved");

struct lnet_send_data {
        struct lnet_ni *sd_best_ni;
        struct lnet_peer_ni *sd_best_lpni;
        struct lnet_peer_ni *sd_final_dst_lpni;
        struct lnet_peer *sd_peer;
        struct lnet_peer *sd_gw_peer;
        struct lnet_peer_ni *sd_gw_lpni;
        struct lnet_peer_net *sd_peer_net;
        struct lnet_msg *sd_msg;
        lnet_nid_t sd_dst_nid;
        lnet_nid_t sd_src_nid;
        lnet_nid_t sd_rtr_nid;
        int sd_cpt;
        int sd_md_cpt;
        __u32 sd_send_case;
};

static inline struct lnet_comm_count *
get_stats_counts(struct lnet_element_stats *stats,
                 enum lnet_stats_type stats_type)
{
        switch (stats_type) {
        case LNET_STATS_TYPE_SEND:
                return &stats->el_send_stats;
        case LNET_STATS_TYPE_RECV:
                return &stats->el_recv_stats;
        case LNET_STATS_TYPE_DROP:
                return &stats->el_drop_stats;
        default:
                CERROR("Unknown stats type\n");
        }

        return NULL;
}

void lnet_incr_stats(struct lnet_element_stats *stats,
                     enum lnet_msg_type msg_type,
                     enum lnet_stats_type stats_type)
{
        struct lnet_comm_count *counts = get_stats_counts(stats, stats_type);
        if (!counts)
                return;

        switch (msg_type) {
        case LNET_MSG_ACK:
                atomic_inc(&counts->co_ack_count);
                break;
        case LNET_MSG_PUT:
                atomic_inc(&counts->co_put_count);
                break;
        case LNET_MSG_GET:
                atomic_inc(&counts->co_get_count);
                break;
        case LNET_MSG_REPLY:
                atomic_inc(&counts->co_reply_count);
                break;
        case LNET_MSG_HELLO:
                atomic_inc(&counts->co_hello_count);
                break;
        default:
                CERROR("There is a BUG in the code. Unknown message type\n");
                break;
        }
}

__u32 lnet_sum_stats(struct lnet_element_stats *stats,
                     enum lnet_stats_type stats_type)
{
        struct lnet_comm_count *counts = get_stats_counts(stats, stats_type);
        if (!counts)
                return 0;

        return (atomic_read(&counts->co_ack_count) +
                atomic_read(&counts->co_put_count) +
                atomic_read(&counts->co_get_count) +
                atomic_read(&counts->co_reply_count) +
                atomic_read(&counts->co_hello_count));
}

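/*
 * Usage sketch (illustrative only; the caller and the 'total_sends'
 * variable are hypothetical): account a sent PUT on an element's stats
 * and read back the total number of sends:
 *
 *      lnet_incr_stats(&stats, LNET_MSG_PUT, LNET_STATS_TYPE_SEND);
 *      total_sends = lnet_sum_stats(&stats, LNET_STATS_TYPE_SEND);
 *
 * The per-type counters are atomics, so neither call needs a lock.
 */
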
static inline void assign_stats(struct lnet_ioctl_comm_count *msg_stats,
                                struct lnet_comm_count *counts)
{
        msg_stats->ico_get_count = atomic_read(&counts->co_get_count);
        msg_stats->ico_put_count = atomic_read(&counts->co_put_count);
        msg_stats->ico_reply_count = atomic_read(&counts->co_reply_count);
        msg_stats->ico_ack_count = atomic_read(&counts->co_ack_count);
        msg_stats->ico_hello_count = atomic_read(&counts->co_hello_count);
}

void lnet_usr_translate_stats(struct lnet_ioctl_element_msg_stats *msg_stats,
                              struct lnet_element_stats *stats)
{
        struct lnet_comm_count *counts;

        LASSERT(msg_stats);
        LASSERT(stats);

        counts = get_stats_counts(stats, LNET_STATS_TYPE_SEND);
        if (!counts)
                return;
        assign_stats(&msg_stats->im_send_stats, counts);

        counts = get_stats_counts(stats, LNET_STATS_TYPE_RECV);
        if (!counts)
                return;
        assign_stats(&msg_stats->im_recv_stats, counts);

        counts = get_stats_counts(stats, LNET_STATS_TYPE_DROP);
        if (!counts)
                return;
        assign_stats(&msg_stats->im_drop_stats, counts);
}

int
lnet_fail_nid(lnet_nid_t nid, unsigned int threshold)
{
        struct lnet_test_peer *tp;
        struct list_head *el;
        struct list_head *next;
        struct list_head  cull;

        /* NB: use lnet_net_lock(0) to serialize operations on test peers */
        if (threshold != 0) {
                /* Adding a new entry */
                LIBCFS_ALLOC(tp, sizeof(*tp));
                if (tp == NULL)
                        return -ENOMEM;

                tp->tp_nid = nid;
                tp->tp_threshold = threshold;

                lnet_net_lock(0);
                list_add_tail(&tp->tp_list, &the_lnet.ln_test_peers);
                lnet_net_unlock(0);
                return 0;
        }

        /* removing entries */
        INIT_LIST_HEAD(&cull);

        lnet_net_lock(0);

        list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
                tp = list_entry(el, struct lnet_test_peer, tp_list);

                if (tp->tp_threshold == 0 ||    /* needs culling anyway */
                    nid == LNET_NID_ANY ||      /* removing all entries */
                    tp->tp_nid == nid) {        /* matched this one */
                        list_del(&tp->tp_list);
                        list_add(&tp->tp_list, &cull);
                }
        }

        lnet_net_unlock(0);

        while (!list_empty(&cull)) {
                tp = list_entry(cull.next, struct lnet_test_peer, tp_list);

                list_del(&tp->tp_list);
                LIBCFS_FREE(tp, sizeof(*tp));
        }
        return 0;
}

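/*
 * Hedged usage sketch (hypothetical test-harness calls): a nonzero
 * threshold arms failure injection against 'nid', LNET_MD_THRESH_INF
 * keeps failing until explicitly removed, and a threshold of 0 removes
 * matching entries:
 *
 *      lnet_fail_nid(nid, 3);                  fail the next 3 matches
 *      lnet_fail_nid(nid, 0);                  disarm this nid
 *      lnet_fail_nid(LNET_NID_ANY, 0);         disarm everything
 */
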
static int
fail_peer (lnet_nid_t nid, int outgoing)
{
        struct lnet_test_peer *tp;
        struct list_head *el;
        struct list_head *next;
        struct list_head  cull;
        int               fail = 0;

        INIT_LIST_HEAD(&cull);

        /* NB: use lnet_net_lock(0) to serialize operations on test peers */
        lnet_net_lock(0);

        list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
                tp = list_entry(el, struct lnet_test_peer, tp_list);

                if (tp->tp_threshold == 0) {
                        /* zombie entry */
                        if (outgoing) {
                                /* only cull zombies on outgoing tests,
                                 * since we may be at interrupt priority on
                                 * incoming messages. */
                                list_del(&tp->tp_list);
                                list_add(&tp->tp_list, &cull);
                        }
                        continue;
                }

                if (tp->tp_nid == LNET_NID_ANY ||       /* fail every peer */
                    nid == tp->tp_nid) {                /* fail this peer */
                        fail = 1;

                        if (tp->tp_threshold != LNET_MD_THRESH_INF) {
                                tp->tp_threshold--;
                                if (outgoing &&
                                    tp->tp_threshold == 0) {
                                        /* see above */
                                        list_del(&tp->tp_list);
                                        list_add(&tp->tp_list, &cull);
                                }
                        }
                        break;
                }
        }

        lnet_net_unlock(0);

        while (!list_empty(&cull)) {
                tp = list_entry(cull.next, struct lnet_test_peer, tp_list);
                list_del(&tp->tp_list);

                LIBCFS_FREE(tp, sizeof(*tp));
        }

        return fail;
}

unsigned int
lnet_iov_nob(unsigned int niov, struct kvec *iov)
{
        unsigned int nob = 0;

        LASSERT(niov == 0 || iov != NULL);
        while (niov-- > 0)
                nob += (iov++)->iov_len;

        return (nob);
}
EXPORT_SYMBOL(lnet_iov_nob);

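/*
 * For example (hypothetical fragments): a vector of two kvecs holding
 * 100 and 50 bytes yields lnet_iov_nob(2, iov) == 150.
 */
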
void
lnet_copy_iov2iov(unsigned int ndiov, struct kvec *diov, unsigned int doffset,
                  unsigned int nsiov, struct kvec *siov, unsigned int soffset,
                  unsigned int nob)
{
        /* NB diov, siov are READ-ONLY */
        unsigned int  this_nob;

        if (nob == 0)
                return;

        /* skip complete frags before 'doffset' */
        LASSERT(ndiov > 0);
        while (doffset >= diov->iov_len) {
                doffset -= diov->iov_len;
                diov++;
                ndiov--;
                LASSERT(ndiov > 0);
        }

        /* skip complete frags before 'soffset' */
        LASSERT(nsiov > 0);
        while (soffset >= siov->iov_len) {
                soffset -= siov->iov_len;
                siov++;
                nsiov--;
                LASSERT(nsiov > 0);
        }

        do {
                LASSERT(ndiov > 0);
                LASSERT(nsiov > 0);
                this_nob = MIN(diov->iov_len - doffset,
                               siov->iov_len - soffset);
                this_nob = MIN(this_nob, nob);

                memcpy((char *)diov->iov_base + doffset,
                       (char *)siov->iov_base + soffset, this_nob);
                nob -= this_nob;

                if (diov->iov_len > doffset + this_nob) {
                        doffset += this_nob;
                } else {
                        diov++;
                        ndiov--;
                        doffset = 0;
                }

                if (siov->iov_len > soffset + this_nob) {
                        soffset += this_nob;
                } else {
                        siov++;
                        nsiov--;
                        soffset = 0;
                }
        } while (nob > 0);
}
EXPORT_SYMBOL(lnet_copy_iov2iov);

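/*
 * Sketch of a call (buffers hypothetical): copy 4096 bytes starting at
 * byte 128 of the source vector to the start of the destination vector,
 * regardless of how differently the two sides are fragmented:
 *
 *      lnet_copy_iov2iov(dniov, diov, 0, sniov, siov, 128, 4096);
 *
 * The caller must ensure both vectors really cover the requested
 * ranges; the LASSERTs above fire otherwise.
 */
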
int
lnet_extract_iov(int dst_niov, struct kvec *dst,
                 int src_niov, struct kvec *src,
                 unsigned int offset, unsigned int len)
{
        /* Initialise 'dst' to the subset of 'src' starting at 'offset',
         * for exactly 'len' bytes, and return the number of entries.
         * NB not destructive to 'src' */
        unsigned int    frag_len;
        unsigned int    niov;

        if (len == 0)                           /* no data => */
                return (0);                     /* no frags */

        LASSERT(src_niov > 0);
        while (offset >= src->iov_len) {      /* skip initial frags */
                offset -= src->iov_len;
                src_niov--;
                src++;
                LASSERT(src_niov > 0);
        }

        niov = 1;
        for (;;) {
                LASSERT(src_niov > 0);
                LASSERT((int)niov <= dst_niov);

                frag_len = src->iov_len - offset;
                dst->iov_base = ((char *)src->iov_base) + offset;

                if (len <= frag_len) {
                        dst->iov_len = len;
                        return (niov);
                }

                dst->iov_len = frag_len;

                len -= frag_len;
                dst++;
                src++;
                niov++;
                src_niov--;
                offset = 0;
        }
}
EXPORT_SYMBOL(lnet_extract_iov);

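/*
 * Worked example (hypothetical sizes): if every src fragment is 1024
 * bytes, lnet_extract_iov(dst_niov, dst, 4, src, 1536, 2048) fills dst
 * with three entries (the last 512 bytes of src[1], all of src[2], and
 * the first 512 bytes of src[3]) and returns 3; src is not modified.
 */
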

unsigned int
lnet_kiov_nob(unsigned int niov, lnet_kiov_t *kiov)
{
        unsigned int  nob = 0;

        LASSERT(niov == 0 || kiov != NULL);
        while (niov-- > 0)
                nob += (kiov++)->kiov_len;

        return (nob);
}
EXPORT_SYMBOL(lnet_kiov_nob);

void
lnet_copy_kiov2kiov(unsigned int ndiov, lnet_kiov_t *diov, unsigned int doffset,
                    unsigned int nsiov, lnet_kiov_t *siov, unsigned int soffset,
                    unsigned int nob)
{
        /* NB diov, siov are READ-ONLY */
        unsigned int    this_nob;
        char           *daddr = NULL;
        char           *saddr = NULL;

        if (nob == 0)
                return;

        LASSERT (!in_interrupt ());

        LASSERT (ndiov > 0);
        while (doffset >= diov->kiov_len) {
                doffset -= diov->kiov_len;
                diov++;
                ndiov--;
                LASSERT(ndiov > 0);
        }

        LASSERT(nsiov > 0);
        while (soffset >= siov->kiov_len) {
                soffset -= siov->kiov_len;
                siov++;
                nsiov--;
                LASSERT(nsiov > 0);
        }

        do {
                LASSERT(ndiov > 0);
                LASSERT(nsiov > 0);
                this_nob = MIN(diov->kiov_len - doffset,
                               siov->kiov_len - soffset);
                this_nob = MIN(this_nob, nob);

                if (daddr == NULL)
                        daddr = ((char *)kmap(diov->kiov_page)) +
                                diov->kiov_offset + doffset;
                if (saddr == NULL)
                        saddr = ((char *)kmap(siov->kiov_page)) +
                                siov->kiov_offset + soffset;

                /* Vanishing risk of kmap deadlock when mapping 2 pages.
                 * However in practice at least one of the kiovs will be mapped
                 * kernel pages and the map/unmap will be NOOPs */

                memcpy (daddr, saddr, this_nob);
                nob -= this_nob;

                if (diov->kiov_len > doffset + this_nob) {
                        daddr += this_nob;
                        doffset += this_nob;
                } else {
                        kunmap(diov->kiov_page);
                        daddr = NULL;
                        diov++;
                        ndiov--;
                        doffset = 0;
                }

                if (siov->kiov_len > soffset + this_nob) {
                        saddr += this_nob;
                        soffset += this_nob;
                } else {
                        kunmap(siov->kiov_page);
                        saddr = NULL;
                        siov++;
                        nsiov--;
                        soffset = 0;
                }
        } while (nob > 0);

        if (daddr != NULL)
                kunmap(diov->kiov_page);
        if (saddr != NULL)
                kunmap(siov->kiov_page);
}
EXPORT_SYMBOL(lnet_copy_kiov2kiov);

void
lnet_copy_kiov2iov (unsigned int niov, struct kvec *iov, unsigned int iovoffset,
                    unsigned int nkiov, lnet_kiov_t *kiov, unsigned int kiovoffset,
                    unsigned int nob)
{
        /* NB iov, kiov are READ-ONLY */
        unsigned int    this_nob;
        char           *addr = NULL;

        if (nob == 0)
                return;

        LASSERT (!in_interrupt ());

        LASSERT (niov > 0);
        while (iovoffset >= iov->iov_len) {
                iovoffset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT(niov > 0);
        }

        LASSERT(nkiov > 0);
        while (kiovoffset >= kiov->kiov_len) {
                kiovoffset -= kiov->kiov_len;
                kiov++;
                nkiov--;
                LASSERT(nkiov > 0);
        }

        do {
                LASSERT(niov > 0);
                LASSERT(nkiov > 0);
                this_nob = MIN(iov->iov_len - iovoffset,
                               kiov->kiov_len - kiovoffset);
                this_nob = MIN(this_nob, nob);

                if (addr == NULL)
                        addr = ((char *)kmap(kiov->kiov_page)) +
                                kiov->kiov_offset + kiovoffset;

                memcpy((char *)iov->iov_base + iovoffset, addr, this_nob);
                nob -= this_nob;

                if (iov->iov_len > iovoffset + this_nob) {
                        iovoffset += this_nob;
                } else {
                        iov++;
                        niov--;
                        iovoffset = 0;
                }

                if (kiov->kiov_len > kiovoffset + this_nob) {
                        addr += this_nob;
                        kiovoffset += this_nob;
                } else {
                        kunmap(kiov->kiov_page);
                        addr = NULL;
                        kiov++;
                        nkiov--;
                        kiovoffset = 0;
                }

        } while (nob > 0);

        if (addr != NULL)
                kunmap(kiov->kiov_page);
}
EXPORT_SYMBOL(lnet_copy_kiov2iov);

void
lnet_copy_iov2kiov(unsigned int nkiov, lnet_kiov_t *kiov, unsigned int kiovoffset,
                   unsigned int niov, struct kvec *iov, unsigned int iovoffset,
                   unsigned int nob)
{
        /* NB kiov, iov are READ-ONLY */
        unsigned int    this_nob;
        char           *addr = NULL;

        if (nob == 0)
                return;

        LASSERT (!in_interrupt ());

        LASSERT (nkiov > 0);
        while (kiovoffset >= kiov->kiov_len) {
                kiovoffset -= kiov->kiov_len;
                kiov++;
                nkiov--;
                LASSERT(nkiov > 0);
        }

        LASSERT(niov > 0);
        while (iovoffset >= iov->iov_len) {
                iovoffset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT(niov > 0);
        }

        do {
                LASSERT(nkiov > 0);
                LASSERT(niov > 0);
                this_nob = MIN(kiov->kiov_len - kiovoffset,
                               iov->iov_len - iovoffset);
                this_nob = MIN(this_nob, nob);

                if (addr == NULL)
                        addr = ((char *)kmap(kiov->kiov_page)) +
                                kiov->kiov_offset + kiovoffset;

                memcpy (addr, (char *)iov->iov_base + iovoffset, this_nob);
                nob -= this_nob;

                if (kiov->kiov_len > kiovoffset + this_nob) {
                        addr += this_nob;
                        kiovoffset += this_nob;
                } else {
                        kunmap(kiov->kiov_page);
                        addr = NULL;
                        kiov++;
                        nkiov--;
                        kiovoffset = 0;
                }

                if (iov->iov_len > iovoffset + this_nob) {
                        iovoffset += this_nob;
                } else {
                        iov++;
                        niov--;
                        iovoffset = 0;
                }
        } while (nob > 0);

        if (addr != NULL)
                kunmap(kiov->kiov_page);
}
EXPORT_SYMBOL(lnet_copy_iov2kiov);

int
lnet_extract_kiov(int dst_niov, lnet_kiov_t *dst,
                  int src_niov, lnet_kiov_t *src,
                  unsigned int offset, unsigned int len)
{
        /* Initialise 'dst' to the subset of 'src' starting at 'offset',
         * for exactly 'len' bytes, and return the number of entries.
         * NB not destructive to 'src' */
        unsigned int    frag_len;
        unsigned int    niov;

        if (len == 0)                           /* no data => */
                return (0);                     /* no frags */

        LASSERT(src_niov > 0);
        while (offset >= src->kiov_len) {      /* skip initial frags */
                offset -= src->kiov_len;
                src_niov--;
                src++;
                LASSERT(src_niov > 0);
        }

        niov = 1;
        for (;;) {
                LASSERT(src_niov > 0);
                LASSERT((int)niov <= dst_niov);

                frag_len = src->kiov_len - offset;
                dst->kiov_page = src->kiov_page;
                dst->kiov_offset = src->kiov_offset + offset;

                if (len <= frag_len) {
                        dst->kiov_len = len;
                        LASSERT(dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
                        return niov;
                }

                dst->kiov_len = frag_len;
                LASSERT(dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);

                len -= frag_len;
                dst++;
                src++;
                niov++;
                src_niov--;
                offset = 0;
        }
}
EXPORT_SYMBOL(lnet_extract_kiov);

void
lnet_ni_recv(struct lnet_ni *ni, void *private, struct lnet_msg *msg,
             int delayed, unsigned int offset, unsigned int mlen,
             unsigned int rlen)
{
        unsigned int  niov = 0;
        struct kvec *iov = NULL;
        lnet_kiov_t  *kiov = NULL;
        int           rc;

        LASSERT (!in_interrupt ());
        LASSERT (mlen == 0 || msg != NULL);

        if (msg != NULL) {
                LASSERT(msg->msg_receiving);
                LASSERT(!msg->msg_sending);
                LASSERT(rlen == msg->msg_len);
                LASSERT(mlen <= msg->msg_len);
                LASSERT(msg->msg_offset == offset);
                LASSERT(msg->msg_wanted == mlen);

                msg->msg_receiving = 0;

                if (mlen != 0) {
                        niov = msg->msg_niov;
                        iov  = msg->msg_iov;
                        kiov = msg->msg_kiov;

                        LASSERT (niov > 0);
                        LASSERT ((iov == NULL) != (kiov == NULL));
                }
        }

        rc = (ni->ni_net->net_lnd->lnd_recv)(ni, private, msg, delayed,
                                             niov, iov, kiov, offset, mlen,
                                             rlen);
        if (rc < 0)
                lnet_finalize(msg, rc);
}

static void
lnet_setpayloadbuffer(struct lnet_msg *msg)
{
        struct lnet_libmd *md = msg->msg_md;

        LASSERT(msg->msg_len > 0);
        LASSERT(!msg->msg_routing);
        LASSERT(md != NULL);
        LASSERT(msg->msg_niov == 0);
        LASSERT(msg->msg_iov == NULL);
        LASSERT(msg->msg_kiov == NULL);

        msg->msg_niov = md->md_niov;
        if ((md->md_options & LNET_MD_KIOV) != 0)
                msg->msg_kiov = md->md_iov.kiov;
        else
                msg->msg_iov = md->md_iov.iov;
}

void
lnet_prep_send(struct lnet_msg *msg, int type, struct lnet_process_id target,
               unsigned int offset, unsigned int len)
{
        msg->msg_type = type;
        msg->msg_target = target;
        msg->msg_len = len;
        msg->msg_offset = offset;

        if (len != 0)
                lnet_setpayloadbuffer(msg);

        memset (&msg->msg_hdr, 0, sizeof (msg->msg_hdr));
        msg->msg_hdr.type           = cpu_to_le32(type);
        /* dest_nid will be overwritten by lnet_select_pathway() */
        msg->msg_hdr.dest_nid       = cpu_to_le64(target.nid);
        msg->msg_hdr.dest_pid       = cpu_to_le32(target.pid);
        /* src_nid will be set later */
        msg->msg_hdr.src_pid        = cpu_to_le32(the_lnet.ln_pid);
        msg->msg_hdr.payload_length = cpu_to_le32(len);
}

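/*
 * Minimal sketch of how a send is staged (msg, md and target are
 * hypothetical; in the full file the real callers are paths such as
 * LNetPut(), which attach the MD properly rather than via the bare
 * assignment shown here):
 *
 *      msg->msg_md = md;
 *      lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length);
 *
 * Note that the wire header is stored little-endian (cpu_to_le*()
 * above) and that dest_nid/src_nid are finalized later by the
 * selection code.
 */
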
static void
lnet_ni_send(struct lnet_ni *ni, struct lnet_msg *msg)
{
        void   *priv = msg->msg_private;
        int rc;

        LASSERT (!in_interrupt ());
        LASSERT (LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND ||
                 (msg->msg_txcredit && msg->msg_peertxcredit));

        rc = (ni->ni_net->net_lnd->lnd_send)(ni, priv, msg);
        if (rc < 0) {
                msg->msg_no_resend = true;
                lnet_finalize(msg, rc);
        }
}

static int
lnet_ni_eager_recv(struct lnet_ni *ni, struct lnet_msg *msg)
{
        int     rc;

        LASSERT(!msg->msg_sending);
        LASSERT(msg->msg_receiving);
        LASSERT(!msg->msg_rx_ready_delay);
        LASSERT(ni->ni_net->net_lnd->lnd_eager_recv != NULL);

        msg->msg_rx_ready_delay = 1;
        rc = (ni->ni_net->net_lnd->lnd_eager_recv)(ni, msg->msg_private, msg,
                                                  &msg->msg_private);
        if (rc != 0) {
                CERROR("recv from %s / send to %s aborted: "
                       "eager_recv failed %d\n",
                       libcfs_nid2str(msg->msg_rxpeer->lpni_nid),
                       libcfs_id2str(msg->msg_target), rc);
                LASSERT(rc < 0); /* required by my callers */
        }

        return rc;
}

/* NB: returns 1 when alive, 0 when dead, negative when error;
 *     may drop the lnet_net_lock */
static int
lnet_peer_alive_locked(struct lnet_ni *ni, struct lnet_peer_ni *lpni,
                       struct lnet_msg *msg)
{
        if (!lnet_peer_aliveness_enabled(lpni))
                return -ENODEV;

        /*
         * If we're resending a message, let's attempt to send it even if
         * the peer is down to fulfill our resend quota on the message
         */
        if (msg->msg_retry_count > 0)
                return 1;

        /* try to send recovery messages regardless */
        if (msg->msg_recovery)
                return 1;

        /* always send any responses */
        if (msg->msg_type == LNET_MSG_ACK ||
            msg->msg_type == LNET_MSG_REPLY)
                return 1;

        return lnet_is_peer_ni_alive(lpni);
}

/**
 * \param msg The message to be sent.
 * \param do_send True if lnet_ni_send() should be called in this function.
 *        lnet_send() is going to lnet_net_unlock immediately after this, so
 *        it sets do_send FALSE and I don't do the unlock/send/lock bit.
 *
 * \retval LNET_CREDIT_OK If \a msg sent or OK to send.
 * \retval LNET_CREDIT_WAIT If \a msg blocked for credit.
 * \retval -EHOSTUNREACH If the next hop of the message appears dead.
 * \retval -ECANCELED If the MD of the message has been unlinked.
 */
static int
lnet_post_send_locked(struct lnet_msg *msg, int do_send)
{
        struct lnet_peer_ni     *lp = msg->msg_txpeer;
        struct lnet_ni          *ni = msg->msg_txni;
        int                     cpt = msg->msg_tx_cpt;
        struct lnet_tx_queue    *tq = ni->ni_tx_queues[cpt];

        /* non-lnet_send() callers have checked before */
        LASSERT(!do_send || msg->msg_tx_delayed);
        LASSERT(!msg->msg_receiving);
        LASSERT(msg->msg_tx_committed);
        /* can't get here if we're sending to the loopback interface */
        LASSERT(lp->lpni_nid != the_lnet.ln_loni->ni_nid);

        /* NB 'lp' is always the next hop */
        if ((msg->msg_target.pid & LNET_PID_USERFLAG) == 0 &&
            lnet_peer_alive_locked(ni, lp, msg) == 0) {
                the_lnet.ln_counters[cpt]->lct_common.lcc_drop_count++;
                the_lnet.ln_counters[cpt]->lct_common.lcc_drop_length +=
                        msg->msg_len;
                lnet_net_unlock(cpt);
                if (msg->msg_txpeer)
                        lnet_incr_stats(&msg->msg_txpeer->lpni_stats,
                                        msg->msg_type,
                                        LNET_STATS_TYPE_DROP);
                if (msg->msg_txni)
                        lnet_incr_stats(&msg->msg_txni->ni_stats,
                                        msg->msg_type,
                                        LNET_STATS_TYPE_DROP);

                CNETERR("Dropping message for %s: peer not alive\n",
                        libcfs_id2str(msg->msg_target));
                msg->msg_health_status = LNET_MSG_STATUS_REMOTE_DROPPED;
                if (do_send)
                        lnet_finalize(msg, -EHOSTUNREACH);

                lnet_net_lock(cpt);
                return -EHOSTUNREACH;
        }

        if (msg->msg_md != NULL &&
            (msg->msg_md->md_flags & LNET_MD_FLAG_ABORTED) != 0) {
                lnet_net_unlock(cpt);

                CNETERR("Aborting message for %s: LNetM[DE]Unlink() already "
                        "called on the MD/ME.\n",
                        libcfs_id2str(msg->msg_target));
                if (do_send) {
                        msg->msg_no_resend = true;
                        CDEBUG(D_NET, "msg %p to %s canceled and will not be resent\n",
                               msg, libcfs_id2str(msg->msg_target));
                        lnet_finalize(msg, -ECANCELED);
                }

                lnet_net_lock(cpt);
                return -ECANCELED;
        }

        if (!msg->msg_peertxcredit) {
                spin_lock(&lp->lpni_lock);
                LASSERT((lp->lpni_txcredits < 0) ==
                        !list_empty(&lp->lpni_txq));

                msg->msg_peertxcredit = 1;
                lp->lpni_txqnob += msg->msg_len + sizeof(struct lnet_hdr);
                lp->lpni_txcredits--;

                if (lp->lpni_txcredits < lp->lpni_mintxcredits)
                        lp->lpni_mintxcredits = lp->lpni_txcredits;

                if (lp->lpni_txcredits < 0) {
                        msg->msg_tx_delayed = 1;
                        list_add_tail(&msg->msg_list, &lp->lpni_txq);
                        spin_unlock(&lp->lpni_lock);
                        return LNET_CREDIT_WAIT;
                }
                spin_unlock(&lp->lpni_lock);
        }

        if (!msg->msg_txcredit) {
                LASSERT((tq->tq_credits < 0) ==
                        !list_empty(&tq->tq_delayed));

                msg->msg_txcredit = 1;
                tq->tq_credits--;
                atomic_dec(&ni->ni_tx_credits);

                if (tq->tq_credits < tq->tq_credits_min)
                        tq->tq_credits_min = tq->tq_credits;

                if (tq->tq_credits < 0) {
                        msg->msg_tx_delayed = 1;
                        list_add_tail(&msg->msg_list, &tq->tq_delayed);
                        return LNET_CREDIT_WAIT;
                }
        }

        /* unset the msg_tx_delayed flag as we're going to send it now */
        msg->msg_tx_delayed = 0;

        if (do_send) {
                lnet_net_unlock(cpt);
                lnet_ni_send(ni, msg);
                lnet_net_lock(cpt);
        }
        return LNET_CREDIT_OK;
}

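/*
 * To summarize the accounting above: a message must hold one peer tx
 * credit (lpni_txcredits) and one NI tx credit (tq_credits) before it
 * can be handed to the LND. If either counter drops below zero the
 * message is queued on lpni_txq or tq_delayed and LNET_CREDIT_WAIT is
 * returned; lnet_return_tx_credits_locked() below gives the credits
 * back and kicks the next delayed message.
 */
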

static struct lnet_rtrbufpool *
lnet_msg2bufpool(struct lnet_msg *msg)
{
        struct lnet_rtrbufpool  *rbp;
        int                     cpt;

        LASSERT(msg->msg_rx_committed);

        cpt = msg->msg_rx_cpt;
        rbp = &the_lnet.ln_rtrpools[cpt][0];

        LASSERT(msg->msg_len <= LNET_MTU);
        while (msg->msg_len > (unsigned int)rbp->rbp_npages * PAGE_SIZE) {
                rbp++;
                LASSERT(rbp < &the_lnet.ln_rtrpools[cpt][LNET_NRBPOOLS]);
        }

        return rbp;
}

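/*
 * Example (assuming 4 KiB pages): a 6000-byte routed message skips any
 * pool whose buffers are a single page (6000 > 1 * PAGE_SIZE) and is
 * served from the first pool with at least two pages per buffer.
 */
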
static int
lnet_post_routed_recv_locked(struct lnet_msg *msg, int do_recv)
{
        /* lnet_parse is going to lnet_net_unlock immediately after this, so it
         * sets do_recv FALSE and I don't do the unlock/send/lock bit.
         * I return LNET_CREDIT_WAIT if msg blocked and LNET_CREDIT_OK if
         * received or OK to receive */
        struct lnet_peer_ni *lpni = msg->msg_rxpeer;
        struct lnet_peer *lp;
        struct lnet_rtrbufpool *rbp;
        struct lnet_rtrbuf *rb;

        LASSERT(msg->msg_iov == NULL);
        LASSERT(msg->msg_kiov == NULL);
        LASSERT(msg->msg_niov == 0);
        LASSERT(msg->msg_routing);
        LASSERT(msg->msg_receiving);
        LASSERT(!msg->msg_sending);
        LASSERT(lpni->lpni_peer_net);
        LASSERT(lpni->lpni_peer_net->lpn_peer);

        lp = lpni->lpni_peer_net->lpn_peer;

        /* non-lnet_parse callers only receive delayed messages */
        LASSERT(!do_recv || msg->msg_rx_delayed);

        if (!msg->msg_peerrtrcredit) {
                /* lpni_lock protects the credit manipulation */
                spin_lock(&lpni->lpni_lock);
                /* lp_lock protects the lp_rtrq */
                spin_lock(&lp->lp_lock);

                msg->msg_peerrtrcredit = 1;
                lpni->lpni_rtrcredits--;
                if (lpni->lpni_rtrcredits < lpni->lpni_minrtrcredits)
                        lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;

                if (lpni->lpni_rtrcredits < 0) {
                        /* must have checked eager_recv before here */
                        LASSERT(msg->msg_rx_ready_delay);
                        msg->msg_rx_delayed = 1;
                        list_add_tail(&msg->msg_list, &lp->lp_rtrq);
                        spin_unlock(&lp->lp_lock);
                        spin_unlock(&lpni->lpni_lock);
                        return LNET_CREDIT_WAIT;
                }
                spin_unlock(&lp->lp_lock);
                spin_unlock(&lpni->lpni_lock);
        }

        rbp = lnet_msg2bufpool(msg);

        if (!msg->msg_rtrcredit) {
                msg->msg_rtrcredit = 1;
                rbp->rbp_credits--;
                if (rbp->rbp_credits < rbp->rbp_mincredits)
                        rbp->rbp_mincredits = rbp->rbp_credits;

                if (rbp->rbp_credits < 0) {
                        /* must have checked eager_recv before here */
                        LASSERT(msg->msg_rx_ready_delay);
                        msg->msg_rx_delayed = 1;
                        list_add_tail(&msg->msg_list, &rbp->rbp_msgs);
                        return LNET_CREDIT_WAIT;
                }
        }

        LASSERT(!list_empty(&rbp->rbp_bufs));
        rb = list_entry(rbp->rbp_bufs.next, struct lnet_rtrbuf, rb_list);
        list_del(&rb->rb_list);

        msg->msg_niov = rbp->rbp_npages;
        msg->msg_kiov = &rb->rb_kiov[0];

        /* unset the msg_rx_delayed flag since we're receiving the message */
        msg->msg_rx_delayed = 0;

        if (do_recv) {
                int cpt = msg->msg_rx_cpt;

                lnet_net_unlock(cpt);
                lnet_ni_recv(msg->msg_rxni, msg->msg_private, msg, 1,
                             0, msg->msg_len, msg->msg_len);
                lnet_net_lock(cpt);
        }
        return LNET_CREDIT_OK;
}

void
lnet_return_tx_credits_locked(struct lnet_msg *msg)
{
        struct lnet_peer_ni     *txpeer = msg->msg_txpeer;
        struct lnet_ni          *txni = msg->msg_txni;
        struct lnet_msg         *msg2;

        if (msg->msg_txcredit) {
                struct lnet_ni       *ni = msg->msg_txni;
                struct lnet_tx_queue *tq = ni->ni_tx_queues[msg->msg_tx_cpt];

                /* give back NI txcredits */
                msg->msg_txcredit = 0;

                LASSERT((tq->tq_credits < 0) ==
                        !list_empty(&tq->tq_delayed));

                tq->tq_credits++;
                atomic_inc(&ni->ni_tx_credits);
                if (tq->tq_credits <= 0) {
                        msg2 = list_entry(tq->tq_delayed.next,
                                          struct lnet_msg, msg_list);
                        list_del(&msg2->msg_list);

                        LASSERT(msg2->msg_txni == ni);
                        LASSERT(msg2->msg_tx_delayed);
                        LASSERT(msg2->msg_tx_cpt == msg->msg_tx_cpt);

                        (void) lnet_post_send_locked(msg2, 1);
                }
        }

        if (msg->msg_peertxcredit) {
                /* give back peer txcredits */
                msg->msg_peertxcredit = 0;

                spin_lock(&txpeer->lpni_lock);
                LASSERT((txpeer->lpni_txcredits < 0) ==
                        !list_empty(&txpeer->lpni_txq));

                txpeer->lpni_txqnob -= msg->msg_len + sizeof(struct lnet_hdr);
                LASSERT(txpeer->lpni_txqnob >= 0);

                txpeer->lpni_txcredits++;
                if (txpeer->lpni_txcredits <= 0) {
                        int msg2_cpt;

                        msg2 = list_entry(txpeer->lpni_txq.next,
                                              struct lnet_msg, msg_list);
                        list_del(&msg2->msg_list);
                        spin_unlock(&txpeer->lpni_lock);

                        LASSERT(msg2->msg_txpeer == txpeer);
                        LASSERT(msg2->msg_tx_delayed);

                        msg2_cpt = msg2->msg_tx_cpt;

                        /*
                         * The msg_cpt can be different from the msg2_cpt
                         * so we need to make sure we lock the correct cpt
                         * for msg2.
                         * Once we call lnet_post_send_locked() it is no
                         * longer safe to access msg2, since it could've
                         * been freed by lnet_finalize(), but we still
                         * need to relock the correct cpt, so we cache the
                         * msg2_cpt for the purpose of the check that
                         * follows the call to lnet_post_send_locked().
                         */
                        if (msg2_cpt != msg->msg_tx_cpt) {
                                lnet_net_unlock(msg->msg_tx_cpt);
                                lnet_net_lock(msg2_cpt);
                        }
                        (void) lnet_post_send_locked(msg2, 1);
                        if (msg2_cpt != msg->msg_tx_cpt) {
                                lnet_net_unlock(msg2_cpt);
                                lnet_net_lock(msg->msg_tx_cpt);
                        }
                } else {
                        spin_unlock(&txpeer->lpni_lock);
                }
        }

        if (txni != NULL) {
                msg->msg_txni = NULL;
                lnet_ni_decref_locked(txni, msg->msg_tx_cpt);
        }

        if (txpeer != NULL) {
                msg->msg_txpeer = NULL;
                lnet_peer_ni_decref_locked(txpeer);
        }
}

void
lnet_schedule_blocked_locked(struct lnet_rtrbufpool *rbp)
{
        struct lnet_msg *msg;

        if (list_empty(&rbp->rbp_msgs))
                return;
        msg = list_entry(rbp->rbp_msgs.next,
                         struct lnet_msg, msg_list);
        list_del(&msg->msg_list);

        (void)lnet_post_routed_recv_locked(msg, 1);
}

void
lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
{
        struct lnet_msg *msg;
        struct lnet_msg *tmp;

        lnet_net_unlock(cpt);

        list_for_each_entry_safe(msg, tmp, list, msg_list) {
                lnet_ni_recv(msg->msg_rxni, msg->msg_private, NULL,
                             0, 0, 0, msg->msg_hdr.payload_length);
                list_del_init(&msg->msg_list);
                msg->msg_no_resend = true;
                msg->msg_health_status = LNET_MSG_STATUS_REMOTE_ERROR;
                lnet_finalize(msg, -ECANCELED);
        }

        lnet_net_lock(cpt);
}

void
lnet_return_rx_credits_locked(struct lnet_msg *msg)
{
        struct lnet_peer_ni *rxpeerni = msg->msg_rxpeer;
        struct lnet_peer *lp;
        struct lnet_ni *rxni = msg->msg_rxni;
        struct lnet_msg *msg2;

        if (msg->msg_rtrcredit) {
                /* give back global router credits */
                struct lnet_rtrbuf *rb;
                struct lnet_rtrbufpool *rbp;

                /* NB If a msg ever blocks for a buffer in rbp_msgs, it stays
                 * there until it gets one allocated, or aborts the wait
                 * itself */
                LASSERT(msg->msg_kiov != NULL);

                rb = list_entry(msg->msg_kiov, struct lnet_rtrbuf, rb_kiov[0]);
                rbp = rb->rb_pool;

                msg->msg_kiov = NULL;
                msg->msg_rtrcredit = 0;

                LASSERT(rbp == lnet_msg2bufpool(msg));

                LASSERT((rbp->rbp_credits > 0) ==
                        !list_empty(&rbp->rbp_bufs));

                /* If routing is now turned off, we just drop this buffer and
                 * don't bother trying to return credits.  */
                if (!the_lnet.ln_routing) {
                        lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
                        goto routing_off;
                }

                /* It is possible that a user has lowered the desired number of
                 * buffers in this pool.  Make sure we never put back
                 * more buffers than the stated number. */
                if (unlikely(rbp->rbp_credits >= rbp->rbp_req_nbuffers)) {
                        /* Discard this buffer so we don't have too
                         * many. */
                        lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
                        rbp->rbp_nbuffers--;
                } else {
                        list_add(&rb->rb_list, &rbp->rbp_bufs);
                        rbp->rbp_credits++;
                        if (rbp->rbp_credits <= 0)
                                lnet_schedule_blocked_locked(rbp);
                }
        }

routing_off:
        if (msg->msg_peerrtrcredit) {
                LASSERT(rxpeerni);
                LASSERT(rxpeerni->lpni_peer_net);
                LASSERT(rxpeerni->lpni_peer_net->lpn_peer);

                lp = rxpeerni->lpni_peer_net->lpn_peer;

                /* give back peer router credits */
                msg->msg_peerrtrcredit = 0;

                spin_lock(&rxpeerni->lpni_lock);
                spin_lock(&lp->lp_lock);

                rxpeerni->lpni_rtrcredits++;

                /* drop all messages which are queued to be routed on that
                 * peer. */
                if (!the_lnet.ln_routing) {
                        struct list_head drop;
                        INIT_LIST_HEAD(&drop);
                        list_splice_init(&lp->lp_rtrq, &drop);
                        spin_unlock(&lp->lp_lock);
                        spin_unlock(&rxpeerni->lpni_lock);
                        lnet_drop_routed_msgs_locked(&drop, msg->msg_rx_cpt);
                } else if (!list_empty(&lp->lp_rtrq)) {
                        int msg2_cpt;

                        msg2 = list_entry(lp->lp_rtrq.next,
                                          struct lnet_msg, msg_list);
                        list_del(&msg2->msg_list);
                        msg2_cpt = msg2->msg_rx_cpt;
                        spin_unlock(&lp->lp_lock);
                        spin_unlock(&rxpeerni->lpni_lock);
                        /*
                         * messages on the lp_rtrq can be from any NID in
                         * the peer, which means they might have different
                         * cpts. We need to make sure we lock the right
                         * one.
                         */
                        if (msg2_cpt != msg->msg_rx_cpt) {
                                lnet_net_unlock(msg->msg_rx_cpt);
                                lnet_net_lock(msg2_cpt);
                        }
                        (void) lnet_post_routed_recv_locked(msg2, 1);
                        if (msg2_cpt != msg->msg_rx_cpt) {
                                lnet_net_unlock(msg2_cpt);
                                lnet_net_lock(msg->msg_rx_cpt);
                        }
                } else {
                        spin_unlock(&lp->lp_lock);
                        spin_unlock(&rxpeerni->lpni_lock);
                }
        }
        if (rxni != NULL) {
                msg->msg_rxni = NULL;
                lnet_ni_decref_locked(rxni, msg->msg_rx_cpt);
        }
        if (rxpeerni != NULL) {
                msg->msg_rxpeer = NULL;
                lnet_peer_ni_decref_locked(rxpeerni);
        }
}

static int
lnet_compare_peers(struct lnet_peer_ni *p1, struct lnet_peer_ni *p2)
{
        if (p1->lpni_txqnob < p2->lpni_txqnob)
                return 1;

        if (p1->lpni_txqnob > p2->lpni_txqnob)
                return -1;

        if (p1->lpni_txcredits > p2->lpni_txcredits)
                return 1;

        if (p1->lpni_txcredits < p2->lpni_txcredits)
                return -1;

        return 0;
}

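/*
 * In other words: fewer queued bytes (lpni_txqnob) wins first, then
 * more available tx credits; the return value is 1 if p1 is the better
 * peer NI, -1 if p2 is, and 0 when both measures are equal.
 */
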
static struct lnet_peer_ni *
lnet_select_peer_ni(struct lnet_ni *best_ni, lnet_nid_t dst_nid,
                    struct lnet_peer *peer,
                    struct lnet_peer_net *peer_net)
{
        /*
         * Look at the peer NIs for the destination peer that connect
         * to the chosen net. If a peer_ni is preferred when using the
         * best_ni to communicate, we use that one. If there is no
         * preferred peer_ni, or there are multiple preferred peer_ni,
         * the available transmit credits are used. If the transmit
         * credits are equal, we round-robin over the peer_ni.
         */
        struct lnet_peer_ni *lpni = NULL;
        struct lnet_peer_ni *best_lpni = NULL;
        int best_lpni_credits = INT_MIN;
        bool preferred = false;
        bool ni_is_pref;
        int best_lpni_healthv = 0;
        int lpni_healthv;

        while ((lpni = lnet_get_next_peer_ni_locked(peer, peer_net, lpni))) {
                /*
                 * if the best_ni we've chosen already has this lpni
1341                  * preferred, then let's use it
1342                  */
1343                 if (best_ni) {
1344                         ni_is_pref = lnet_peer_is_pref_nid_locked(lpni,
1345                                                                 best_ni->ni_nid);
1346                         CDEBUG(D_NET, "%s ni_is_pref = %d\n",
1347                                libcfs_nid2str(best_ni->ni_nid), ni_is_pref);
1348                 } else {
1349                         ni_is_pref = false;
1350                 }
1351
1352                 lpni_healthv = atomic_read(&lpni->lpni_healthv);
1353
1354                 if (best_lpni)
1355                         CDEBUG(D_NET, "%s c:[%d, %d], s:[%d, %d]\n",
1356                                 libcfs_nid2str(lpni->lpni_nid),
1357                                 lpni->lpni_txcredits, best_lpni_credits,
1358                                 lpni->lpni_seq, best_lpni->lpni_seq);
1359
1360                 /* pick the healthiest peer ni */
1361                 if (lpni_healthv < best_lpni_healthv) {
1362                         continue;
1363                 } else if (lpni_healthv > best_lpni_healthv) {
1364                         best_lpni_healthv = lpni_healthv;
1365                 /* if this is a preferred peer use it */
1366                 } else if (!preferred && ni_is_pref) {
1367                         preferred = true;
1368                 } else if (preferred && !ni_is_pref) {
1369                         /*
1370                          * this is not the preferred peer so let's ignore
1371                          * it.
1372                          */
1373                         continue;
1374                 } else if (lpni->lpni_txcredits < best_lpni_credits) {
1375                         /*
1376                          * We already have a peer that has more credits
1377                          * available than this one. No need to consider
1378                          * this peer further.
1379                          */
1380                         continue;
1381                 } else if (lpni->lpni_txcredits == best_lpni_credits) {
1382                         /*
1383                          * The best peer found so far and the current peer
1384                          * have the same number of available credits let's
1385                          * make sure to select between them using Round
1386                          * Robin
1387                          */
1388                         if (best_lpni) {
1389                                 if (best_lpni->lpni_seq <= lpni->lpni_seq)
1390                                         continue;
1391                         }
1392                 }
1393
1394                 best_lpni = lpni;
1395                 best_lpni_credits = lpni->lpni_txcredits;
1396         }
1397
1398         /* if we still can't find a peer ni then we can't reach it */
1399         if (!best_lpni) {
1400                 __u32 net_id = (peer_net) ? peer_net->lpn_net_id :
1401                         LNET_NIDNET(dst_nid);
1402                 CDEBUG(D_NET, "no peer_ni found on peer net %s\n",
1403                                 libcfs_net2str(net_id));
1404                 return NULL;
1405         }
1406
1407         CDEBUG(D_NET, "sd_best_lpni = %s\n",
1408                libcfs_nid2str(best_lpni->lpni_nid));
1409
1410         return best_lpni;
1411 }
1412
1413 /*
1414  * Prerequisite: the best_ni should already be set in the sd
1415  */
1416 static inline struct lnet_peer_ni *
1417 lnet_find_best_lpni_on_net(struct lnet_send_data *sd, struct lnet_peer *peer,
1418                            __u32 net_id)
1419 {
1420         struct lnet_peer_net *peer_net;
1421
1422         /*
1423          * The gateway is Multi-Rail capable so now we must select the
1424          * proper peer_ni
1425          */
1426         peer_net = lnet_peer_get_net_locked(peer, net_id);
1427
1428         if (!peer_net) {
1429                 CERROR("gateway peer %s has no NI on net %s\n",
1430                        libcfs_nid2str(peer->lp_primary_nid),
1431                        libcfs_net2str(net_id));
1432                 return NULL;
1433         }
1434
1435         return lnet_select_peer_ni(sd->sd_best_ni, sd->sd_dst_nid,
1436                                    peer, peer_net);
1437 }
1438
1439 static int
1440 lnet_compare_routes(struct lnet_route *r1, struct lnet_route *r2,
1441                     struct lnet_peer_ni **best_lpni)
1442 {
1443         int r1_hops = (r1->lr_hops == LNET_UNDEFINED_HOPS) ? 1 : r1->lr_hops;
1444         int r2_hops = (r2->lr_hops == LNET_UNDEFINED_HOPS) ? 1 : r2->lr_hops;
1445         struct lnet_peer *lp1 = r1->lr_gateway;
1446         struct lnet_peer *lp2 = r2->lr_gateway;
1447         struct lnet_peer_ni *lpni1;
1448         struct lnet_peer_ni *lpni2;
1449         struct lnet_send_data sd;
1450         int rc;
1451
1452         sd.sd_best_ni = NULL;
1453         sd.sd_dst_nid = LNET_NID_ANY;
1454         lpni1 = lnet_find_best_lpni_on_net(&sd, lp1, r1->lr_lnet);
1455         lpni2 = lnet_find_best_lpni_on_net(&sd, lp2, r2->lr_lnet);
1456         LASSERT(lpni1 && lpni2);
1457
1458         if (r1->lr_priority < r2->lr_priority) {
1459                 *best_lpni = lpni1;
1460                 return 1;
1461         }
1462
1463         if (r1->lr_priority > r2->lr_priority) {
1464                 *best_lpni = lpni2;
1465                 return -1;
1466         }
1467
1468         if (r1_hops < r2_hops) {
1469                 *best_lpni = lpni1;
1470                 return 1;
1471         }
1472
1473         if (r1_hops > r2_hops) {
1474                 *best_lpni = lpni2;
1475                 return -1;
1476         }
1477
1478         rc = lnet_compare_peers(lpni1, lpni2);
1479         if (rc == 1) {
1480                 *best_lpni = lpni1;
1481                 return rc;
1482         } else if (rc == -1) {
1483                 *best_lpni = lpni2;
1484                 return rc;
1485         }
1486
1487         if (r1->lr_seq - r2->lr_seq <= 0) {
1488                 *best_lpni = lpni1;
1489                 return 1;
1490         }
1491
1492         *best_lpni = lpni2;
1493         return -1;
1494 }
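
/*
 * Return convention sketch (illustrative): 1 means prefer r1, -1 means
 * prefer r2, and *best_lpni is set to the winning gateway's peer_ni.
 * A caller scanning a route list therefore keeps its current best only
 * on a negative return, as lnet_find_route_locked() does below:
 *
 *      rc = lnet_compare_routes(route, best_route, &best_gw_ni);
 *      if (rc < 0)
 *              continue;
 *      best_route = route;
 */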
1495
1496 static struct lnet_route *
1497 lnet_find_route_locked(struct lnet_net *net, __u32 remote_net,
1498                        lnet_nid_t rtr_nid, struct lnet_route **prev_route,
1499                        struct lnet_peer_ni **gwni)
1500 {
1501         struct lnet_peer_ni *best_gw_ni = NULL;
1502         struct lnet_route *best_route;
1503         struct lnet_route *last_route;
1504         struct lnet_remotenet *rnet;
1505         struct lnet_peer *lp_best;
1506         struct lnet_route *route;
1507         struct lnet_peer *lp;
1508         int rc;
1509
1510         /* If @rtr_nid is not LNET_NID_ANY, return the gateway with
1511          * rtr_nid nid, otherwise find the best gateway I can use */
1512
1513         rnet = lnet_find_rnet_locked(remote_net);
1514         if (rnet == NULL)
1515                 return NULL;
1516
1517         lp_best = NULL;
1518         best_route = last_route = NULL;
1519         list_for_each_entry(route, &rnet->lrn_routes, lr_list) {
1520                 lp = route->lr_gateway;
1521
1522                 if (!lnet_is_route_alive(route))
1523                         continue;
1524
1525                 if (lp_best == NULL) {
1526                         best_route = last_route = route;
1527                         lp_best = lp;
1528                 }
1529
1530                 /* no protection on these fields, but it's harmless */
1531                 if (last_route->lr_seq - route->lr_seq < 0)
1532                         last_route = route;
1533
1534                 rc = lnet_compare_routes(route, best_route, &best_gw_ni);
1535                 if (rc < 0)
1536                         continue;
1537
1538                 best_route = route;
1539                 lp_best = lp;
1540         }
1541
1542         *prev_route = last_route;
1543         *gwni = best_gw_ni;
1544
1545         return best_route;
1546 }
1547
1548 static struct lnet_ni *
1549 lnet_get_best_ni(struct lnet_net *local_net, struct lnet_ni *best_ni,
1550                  struct lnet_peer *peer, struct lnet_peer_net *peer_net,
1551                  int md_cpt)
1552 {
1553         struct lnet_ni *ni = NULL;
1554         unsigned int shortest_distance;
1555         int best_credits;
1556         int best_healthv;
1557
1558         /*
1559          * If there is no peer_ni that we can send to on this network,
1560          * then there is no point in looking for a new best_ni here.
1561          */
1562         if (!lnet_get_next_peer_ni_locked(peer, peer_net, NULL))
1563                 return best_ni;
1564
1565         if (best_ni == NULL) {
1566                 shortest_distance = UINT_MAX;
1567                 best_credits = INT_MIN;
1568                 best_healthv = 0;
1569         } else {
1570                 shortest_distance = cfs_cpt_distance(lnet_cpt_table(), md_cpt,
1571                                                      best_ni->ni_dev_cpt);
1572                 best_credits = atomic_read(&best_ni->ni_tx_credits);
1573                 best_healthv = atomic_read(&best_ni->ni_healthv);
1574         }
1575
1576         while ((ni = lnet_get_next_ni_locked(local_net, ni))) {
1577                 unsigned int distance;
1578                 int ni_credits;
1579                 int ni_healthv;
1580                 int ni_fatal;
1581
1582                 ni_credits = atomic_read(&ni->ni_tx_credits);
1583                 ni_healthv = atomic_read(&ni->ni_healthv);
1584                 ni_fatal = atomic_read(&ni->ni_fatal_error_on);
1585
1586                 /*
1587                  * calculate the distance from the CPT on which
1588                  * the message memory is allocated to the CPT of
1589                  * the NI's physical device
1590                  */
1591                 distance = cfs_cpt_distance(lnet_cpt_table(),
1592                                             md_cpt,
1593                                             ni->ni_dev_cpt);
1594
1595                 CDEBUG(D_NET, "compare ni %s [c:%d, d:%d, s:%d] with best_ni %s [c:%d, d:%d, s:%d]\n",
1596                        libcfs_nid2str(ni->ni_nid), ni_credits, distance,
1597                        ni->ni_seq, (best_ni) ? libcfs_nid2str(best_ni->ni_nid)
1598                         : "not selected", best_credits, shortest_distance,
1599                         (best_ni) ? best_ni->ni_seq : 0);
1600
1601                 /*
1602                  * All distances smaller than the NUMA range
1603                  * are treated equally.
1604                  */
1605                 if (distance < lnet_numa_range)
1606                         distance = lnet_numa_range;
1607
1608                 /*
1609                  * Select on health, shorter distance, available
1610                  * credits, then round-robin.
1611                  */
1612                 if (ni_fatal) {
1613                         continue;
1614                 } else if (ni_healthv < best_healthv) {
1615                         continue;
1616                 } else if (ni_healthv > best_healthv) {
1617                         best_healthv = ni_healthv;
1618                         /*
1619                          * If we're going to prefer this ni because it's
1620                          * the healthiest, then we should set the
1621                          * shortest_distance in the algorithm in case
1622                          * there are multiple NIs with the same health but
1623                          * different distances.
1624                          */
1625                         if (distance < shortest_distance)
1626                                 shortest_distance = distance;
1627                 } else if (distance > shortest_distance) {
1628                         continue;
1629                 } else if (distance < shortest_distance) {
1630                         shortest_distance = distance;
1631                 } else if (ni_credits < best_credits) {
1632                         continue;
1633                 } else if (ni_credits == best_credits) {
1634                         if (best_ni && best_ni->ni_seq <= ni->ni_seq)
1635                                 continue;
1636                 }
1637                 best_ni = ni;
1638                 best_credits = ni_credits;
1639         }
1640
1641         CDEBUG(D_NET, "selected best_ni %s\n",
1642                (best_ni) ? libcfs_nid2str(best_ni->ni_nid) : "no selection");
1643
1644         return best_ni;
1645 }
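
/*
 * Worked example (hypothetical numbers): with lnet_numa_range == 0, an
 * NI at distance 2 with health 100 beats an NI at distance 1 with
 * health 90, because health is compared first. If both had health 100,
 * the distance-1 NI would win, and only on a full tie of health,
 * distance and credits does the lower ni_seq (round robin) decide.
 */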
1646
1647 /*
1648  * Traffic to the LNET_RESERVED_PORTAL may not trigger peer discovery,
1649  * because such traffic is required to perform discovery. We therefore
1650  * exclude all GET and PUT on that portal. We also exclude all ACK and
1651  * REPLY traffic, but that is because the portal is not tracked in the
1652  * message structure for these message types. We could restrict this
1653  * further by also checking for LNET_PROTO_PING_MATCHBITS.
1654  */
1655 static bool
1656 lnet_msg_discovery(struct lnet_msg *msg)
1657 {
1658         if (msg->msg_type == LNET_MSG_PUT) {
1659                 if (msg->msg_hdr.msg.put.ptl_index != LNET_RESERVED_PORTAL)
1660                         return true;
1661         } else if (msg->msg_type == LNET_MSG_GET) {
1662                 if (msg->msg_hdr.msg.get.ptl_index != LNET_RESERVED_PORTAL)
1663                         return true;
1664         }
1665         return false;
1666 }
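
/*
 * Usage sketch (illustrative): lnet_initiate_peer_discovery() below
 * uses this predicate to bail out early for traffic that must not
 * trigger discovery:
 *
 *      if (!lnet_msg_discovery(msg) || lnet_peer_is_uptodate(peer))
 *              return 0;
 */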
1667
1668 #define SRC_SPEC        0x0001
1669 #define SRC_ANY         0x0002
1670 #define LOCAL_DST       0x0004
1671 #define REMOTE_DST      0x0008
1672 #define MR_DST          0x0010
1673 #define NMR_DST         0x0020
1674 #define SND_RESP        0x0040
1675
1676 /* The following two defines are used for return codes */
1677 #define REPEAT_SEND     0x1000
1678 #define PASS_THROUGH    0x2000
1679
1680 /* The different cases lnet_select pathway needs to handle */
1681 #define SRC_SPEC_LOCAL_MR_DST   (SRC_SPEC | LOCAL_DST | MR_DST)
1682 #define SRC_SPEC_ROUTER_MR_DST  (SRC_SPEC | REMOTE_DST | MR_DST)
1683 #define SRC_SPEC_LOCAL_NMR_DST  (SRC_SPEC | LOCAL_DST | NMR_DST)
1684 #define SRC_SPEC_ROUTER_NMR_DST (SRC_SPEC | REMOTE_DST | NMR_DST)
1685 #define SRC_ANY_LOCAL_MR_DST    (SRC_ANY | LOCAL_DST | MR_DST)
1686 #define SRC_ANY_ROUTER_MR_DST   (SRC_ANY | REMOTE_DST | MR_DST)
1687 #define SRC_ANY_LOCAL_NMR_DST   (SRC_ANY | LOCAL_DST | NMR_DST)
1688 #define SRC_ANY_ROUTER_NMR_DST  (SRC_ANY | REMOTE_DST | NMR_DST)
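
/*
 * Classification example (illustrative): a fresh PUT sent with no
 * source NID to a multi-rail peer on a directly connected network is
 * classified as
 *
 *      send_case = SRC_ANY | LOCAL_DST | MR_DST;
 *
 * i.e. SRC_ANY_LOCAL_MR_DST, which lnet_handle_send_case_locked()
 * further below dispatches to lnet_handle_any_mr_dst().
 */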
1689
1690 static int
1691 lnet_handle_lo_send(struct lnet_send_data *sd)
1692 {
1693         struct lnet_msg *msg = sd->sd_msg;
1694         int cpt = sd->sd_cpt;
1695
1696         /* No send credit hassles with LOLND */
1697         lnet_ni_addref_locked(the_lnet.ln_loni, cpt);
1698         msg->msg_hdr.dest_nid = cpu_to_le64(the_lnet.ln_loni->ni_nid);
1699         if (!msg->msg_routing)
1700                 msg->msg_hdr.src_nid =
1701                         cpu_to_le64(the_lnet.ln_loni->ni_nid);
1702         msg->msg_target.nid = the_lnet.ln_loni->ni_nid;
1703         lnet_msg_commit(msg, cpt);
1704         msg->msg_txni = the_lnet.ln_loni;
1705
1706         return LNET_CREDIT_OK;
1707 }
1708
1709 static int
1710 lnet_handle_send(struct lnet_send_data *sd)
1711 {
1712         struct lnet_ni *best_ni = sd->sd_best_ni;
1713         struct lnet_peer_ni *best_lpni = sd->sd_best_lpni;
1714         struct lnet_peer_ni *final_dst_lpni = sd->sd_final_dst_lpni;
1715         struct lnet_msg *msg = sd->sd_msg;
1716         int cpt2;
1717         __u32 send_case = sd->sd_send_case;
1718         int rc;
1719         __u32 routing = send_case & REMOTE_DST;
1720         struct lnet_rsp_tracker *rspt;
1721
1722         /*
1723          * Increment sequence number of the selected peer so that we
1724          * pick the next one in Round Robin.
1725          */
1726         best_lpni->lpni_seq++;
1727
1728         /*
1729          * grab a reference on the peer_ni so it sticks around even if
1730          * we need to drop and relock the lnet_net_lock below.
1731          */
1732         lnet_peer_ni_addref_locked(best_lpni);
1733
1734         /*
1735          * Use lnet_cpt_of_nid() to determine the CPT used to commit the
1736          * message. This ensures that we get a CPT that is correct for
1737          * the NI when the NI has been restricted to a subset of all CPTs.
1738          * If the selected CPT differs from the one currently locked, we
1739          * must unlock and relock the lnet_net_lock(), and then check whether
1740          * the configuration has changed. We don't have a hold on the best_ni
1741          * yet, and it may have vanished.
1742          */
1743         cpt2 = lnet_cpt_of_nid_locked(best_lpni->lpni_nid, best_ni);
1744         if (sd->sd_cpt != cpt2) {
1745                 __u32 seq = lnet_get_dlc_seq_locked();
1746                 lnet_net_unlock(sd->sd_cpt);
1747                 sd->sd_cpt = cpt2;
1748                 lnet_net_lock(sd->sd_cpt);
1749                 if (seq != lnet_get_dlc_seq_locked()) {
1750                         lnet_peer_ni_decref_locked(best_lpni);
1751                         return REPEAT_SEND;
1752                 }
1753         }
1754
1755         /*
1756          * store the best_lpni in the message right away to avoid having
1757          * to do the same operation under different conditions
1758          */
1759         msg->msg_txpeer = best_lpni;
1760         msg->msg_txni = best_ni;
1761
1762         /*
1763          * grab a reference for the best_ni since now it's in use in this
1764          * send. The reference will be dropped in lnet_finalize()
1765          */
1766         lnet_ni_addref_locked(msg->msg_txni, sd->sd_cpt);
1767
1768         /*
1769          * Always set the target.nid to the best peer picked. Either the
1770          * NID will be one of the peer NIDs selected, or the same NID as
1771          * what was originally set in the target or it will be the NID of
1772          * a router if this message should be routed
1773          */
1774         msg->msg_target.nid = msg->msg_txpeer->lpni_nid;
1775
1776         /*
1777          * lnet_msg_commit assigns the correct cpt to the message, which
1778          * is used to decrement the correct refcount on the ni when it's
1779          * time to return the credits
1780          */
1781         lnet_msg_commit(msg, sd->sd_cpt);
1782
1783         /*
1784          * If we are routing the message then we keep the src_nid that was
1785          * set by the originator. If we are not routing then we are the
1786          * originator and set it here.
1787          */
1788         if (!msg->msg_routing)
1789                 msg->msg_hdr.src_nid = cpu_to_le64(msg->msg_txni->ni_nid);
1790
1791         if (routing) {
1792                 msg->msg_target_is_router = 1;
1793                 msg->msg_target.pid = LNET_PID_LUSTRE;
1794                 /*
1795                  * since we're routing we want to ensure that the
1796                  * msg_hdr.dest_nid is set to the final destination. When
1797                  * the router receives this message it knows how to route
1798                  * it.
1799                  *
1800                  * final_dst_lpni is set at the beginning of the
1801                  * lnet_select_pathway() function and is never changed.
1802                  * It's safe to use it here.
1803                  */
1804                 msg->msg_hdr.dest_nid = cpu_to_le64(final_dst_lpni->lpni_nid);
1805         } else {
1806                 /*
1807                  * if we're not routing set the dest_nid to the best peer
1808                  * ni NID that we picked earlier in the algorithm.
1809                  */
1810                 msg->msg_hdr.dest_nid = cpu_to_le64(msg->msg_txpeer->lpni_nid);
1811         }
1812
1813         /*
1814          * if we have a response tracker block, update it with the next
1815          * hop nid
1816          */
1817         if (msg->msg_md) {
1818                 rspt = msg->msg_md->md_rspt_ptr;
1819                 if (rspt) {
1820                         rspt->rspt_next_hop_nid = msg->msg_txpeer->lpni_nid;
1821                         CDEBUG(D_NET, "rspt_next_hop_nid = %s\n",
1822                                libcfs_nid2str(rspt->rspt_next_hop_nid));
1823                 }
1824         }
1825
1826         rc = lnet_post_send_locked(msg, 0);
1827
1828         if (!rc)
1829                 CDEBUG(D_NET, "TRACE: %s(%s:%s) -> %s(%s:%s) : %s try# %d\n",
1830                        libcfs_nid2str(msg->msg_hdr.src_nid),
1831                        libcfs_nid2str(msg->msg_txni->ni_nid),
1832                        libcfs_nid2str(sd->sd_src_nid),
1833                        libcfs_nid2str(msg->msg_hdr.dest_nid),
1834                        libcfs_nid2str(sd->sd_dst_nid),
1835                        libcfs_nid2str(msg->msg_txpeer->lpni_nid),
1836                        lnet_msgtyp2str(msg->msg_type), msg->msg_retry_count);
1837
1838         return rc;
1839 }
1840
1841 static inline void
1842 lnet_set_non_mr_pref_nid(struct lnet_send_data *sd)
1843 {
1844         if (sd->sd_send_case & NMR_DST &&
1845             sd->sd_msg->msg_type != LNET_MSG_REPLY &&
1846             sd->sd_msg->msg_type != LNET_MSG_ACK &&
1847             sd->sd_best_lpni->lpni_pref_nnids == 0) {
1848                 CDEBUG(D_NET, "Setting preferred local NID %s on NMR peer %s\n",
1849                        libcfs_nid2str(sd->sd_best_ni->ni_nid),
1850                        libcfs_nid2str(sd->sd_best_lpni->lpni_nid));
1851                 lnet_peer_ni_set_non_mr_pref_nid(sd->sd_best_lpni,
1852                                                  sd->sd_best_ni->ni_nid);
1853         }
1854 }
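
/*
 * Illustrative summary of the guards above: the call is a no-op unless
 * all of the following hold, so the case handlers below may invoke it
 * unconditionally once sd_best_ni and sd_best_lpni are set:
 *
 *      (sd->sd_send_case & NMR_DST) &&
 *      sd->sd_msg->msg_type != LNET_MSG_REPLY &&
 *      sd->sd_msg->msg_type != LNET_MSG_ACK &&
 *      sd->sd_best_lpni->lpni_pref_nnids == 0
 */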
1855
1856 /*
1857  * Source Specified
1858  * Local Destination
1859  * Non-MR Peer
1860  *
1861  * use the source and destination NIDs as the pathway
1862  */
1863 static int
1864 lnet_handle_spec_local_nmr_dst(struct lnet_send_data *sd)
1865 {
1866         /* the destination lpni is set before we get here. */
1867
1868         /* find local NI */
1869         sd->sd_best_ni = lnet_nid2ni_locked(sd->sd_src_nid, sd->sd_cpt);
1870         if (!sd->sd_best_ni) {
1871                 CERROR("Can't send to %s: src %s is not a "
1872                        "local nid\n", libcfs_nid2str(sd->sd_dst_nid),
1873                                 libcfs_nid2str(sd->sd_src_nid));
1874                 return -EINVAL;
1875         }
1876
1877         /*
1878          * the preferred NID will only be set for NMR peers
1879          */
1880         lnet_set_non_mr_pref_nid(sd);
1881
1882         return lnet_handle_send(sd);
1883 }
1884
1885 /*
1886  * Source Specified
1887  * Local Destination
1888  * MR Peer
1889  *
1890  * Run the selection algorithm on the peer NIs unless we're sending
1891  * a response, in this case just send to the destination
1892  */
1893 static int
1894 lnet_handle_spec_local_mr_dst(struct lnet_send_data *sd)
1895 {
1896         sd->sd_best_ni = lnet_nid2ni_locked(sd->sd_src_nid, sd->sd_cpt);
1897         if (!sd->sd_best_ni) {
1898                 CERROR("Can't send to %s: src %s is not a "
1899                        "local nid\n", libcfs_nid2str(sd->sd_dst_nid),
1900                                 libcfs_nid2str(sd->sd_src_nid));
1901                 return -EINVAL;
1902         }
1903
1904         /*
1905          * only run the selection algorithm to pick the peer_ni if we're
1906          * sending a GET or a PUT. Responses are sent to the same
1907          * destination NID provided.
1908          */
1909         if (!(sd->sd_send_case & SND_RESP)) {
1910                 sd->sd_best_lpni =
1911                   lnet_find_best_lpni_on_net(sd, sd->sd_peer,
1912                                              sd->sd_best_ni->ni_net->net_id);
1913         }
1914
1915         if (sd->sd_best_lpni &&
1916             sd->sd_best_lpni->lpni_nid == the_lnet.ln_loni->ni_nid)
1917                 return lnet_handle_lo_send(sd);
1918         else if (sd->sd_best_lpni)
1919                 return lnet_handle_send(sd);
1920
1921         CERROR("Can't send to %s. No NI on %s\n",
1922                libcfs_nid2str(sd->sd_dst_nid),
1923                libcfs_net2str(sd->sd_best_ni->ni_net->net_id));
1924
1925         return -EHOSTUNREACH;
1926 }
1927
1928 struct lnet_ni *
1929 lnet_find_best_ni_on_spec_net(struct lnet_ni *cur_best_ni,
1930                               struct lnet_peer *peer,
1931                               struct lnet_peer_net *peer_net,
1932                               int cpt,
1933                               bool incr_seq)
1934 {
1935         struct lnet_net *local_net;
1936         struct lnet_ni *best_ni;
1937
1938         local_net = lnet_get_net_locked(peer_net->lpn_net_id);
1939         if (!local_net)
1940                 return NULL;
1941
1942         /*
1943          * Iterate through the NIs in this local Net and select
1944          * the NI to send from. The selection is determined by
1945          * these 3 criteria in the following priority:
1946          *      1. NUMA
1947          *      2. NI available credits
1948          *      3. Round Robin
1949          */
1950         best_ni = lnet_get_best_ni(local_net, cur_best_ni,
1951                                    peer, peer_net, cpt);
1952
1953         if (incr_seq && best_ni)
1954                 best_ni->ni_seq++;
1955
1956         return best_ni;
1957 }
1958
1959 static int
1960 lnet_initiate_peer_discovery(struct lnet_peer_ni *lpni,
1961                              struct lnet_msg *msg, lnet_nid_t rtr_nid,
1962                              int cpt)
1963 {
1964         struct lnet_peer *peer;
1965         lnet_nid_t primary_nid;
1966         int rc;
1967
1968         lnet_peer_ni_addref_locked(lpni);
1969
1970         peer = lpni->lpni_peer_net->lpn_peer;
1971
1972         if (lnet_peer_gw_discovery(peer)) {
1973                 lnet_peer_ni_decref_locked(lpni);
1974                 return 0;
1975         }
1976
1977         if (!lnet_msg_discovery(msg) || lnet_peer_is_uptodate(peer)) {
1978                 lnet_peer_ni_decref_locked(lpni);
1979                 return 0;
1980         }
1981
1982         rc = lnet_discover_peer_locked(lpni, cpt, false);
1983         if (rc) {
1984                 lnet_peer_ni_decref_locked(lpni);
1985                 return rc;
1986         }
1987         /* The peer may have changed. */
1988         peer = lpni->lpni_peer_net->lpn_peer;
1989         /* queue message and return */
1990         msg->msg_rtr_nid_param = rtr_nid;
1991         msg->msg_sending = 0;
1992         msg->msg_txpeer = NULL;
1993         spin_lock(&peer->lp_lock);
1994         list_add_tail(&msg->msg_list, &peer->lp_dc_pendq);
1995         spin_unlock(&peer->lp_lock);
1996         lnet_peer_ni_decref_locked(lpni);
1997         primary_nid = peer->lp_primary_nid;
1998
1999         CDEBUG(D_NET, "msg %p delayed. %s pending discovery\n",
2000                 msg, libcfs_nid2str(primary_nid));
2001
2002         return LNET_DC_WAIT;
2003 }
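
/*
 * Caller contract sketch (illustrative): a zero return means the send
 * path may proceed; LNET_DC_WAIT means the message was queued on the
 * peer's lp_dc_pendq and will be sent once discovery completes, so
 * callers simply propagate any non-zero code:
 *
 *      rc = lnet_initiate_peer_discovery(gwni, sd->sd_msg,
 *                                        sd->sd_rtr_nid, sd->sd_cpt);
 *      if (rc)
 *              return rc;
 */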
2004
2005 static int
2006 lnet_handle_find_routed_path(struct lnet_send_data *sd,
2007                              lnet_nid_t dst_nid,
2008                              struct lnet_peer_ni **gw_lpni,
2009                              struct lnet_peer **gw_peer)
2010 {
2011         int rc;
2012         struct lnet_peer *gw;
2013         struct lnet_peer *lp;
2014         struct lnet_peer_net *lpn;
2015         struct lnet_peer_net *best_lpn = NULL;
2016         struct lnet_remotenet *rnet;
2017         struct lnet_route *best_route;
2018         struct lnet_route *last_route;
2019         struct lnet_peer_ni *lpni = NULL;
2020         struct lnet_peer_ni *gwni = NULL;
2021         lnet_nid_t src_nid = sd->sd_src_nid;
2022
2023         /* we've already looked up the initial lpni using dst_nid */
2024         lpni = sd->sd_best_lpni;
2025         /* the peer tree must be in existence */
2026         LASSERT(lpni && lpni->lpni_peer_net && lpni->lpni_peer_net->lpn_peer);
2027         lp = lpni->lpni_peer_net->lpn_peer;
2028
2029         list_for_each_entry(lpn, &lp->lp_peer_nets, lpn_peer_nets) {
2030                 /* is this remote network reachable?  */
2031                 rnet = lnet_find_rnet_locked(lpn->lpn_net_id);
2032                 if (!rnet)
2033                         continue;
2034
2035                 if (!best_lpn)
2036                         best_lpn = lpn;
2037
2038                 if (best_lpn->lpn_seq <= lpn->lpn_seq)
2039                         continue;
2040
2041                 best_lpn = lpn;
2042         }
2043
2044         if (!best_lpn) {
2045                 CERROR("peer %s has no available nets\n",
2046                        libcfs_nid2str(sd->sd_dst_nid));
2047                 return -EHOSTUNREACH;
2048         }
2049
2050         sd->sd_best_lpni = lnet_find_best_lpni_on_net(sd, lp, best_lpn->lpn_net_id);
2051         if (!sd->sd_best_lpni) {
2052                 CERROR("peer %s down\n", libcfs_nid2str(sd->sd_dst_nid));
2053                 return -EHOSTUNREACH;
2054         }
2055
2056         best_route = lnet_find_route_locked(NULL, best_lpn->lpn_net_id,
2057                                             sd->sd_rtr_nid, &last_route,
2058                                             &gwni);
2059         if (!best_route) {
2060                 CERROR("no route to %s from %s\n",
2061                        libcfs_nid2str(dst_nid), libcfs_nid2str(src_nid));
2062                 return -EHOSTUNREACH;
2063         }
2064
2065         if (!gwni) {
2066                 CERROR("Internal Error. Expected a route to %s from %s\n",
2067                         libcfs_nid2str(dst_nid),
2068                         libcfs_nid2str(src_nid));
2069                 return -EFAULT;
2070         }
2071
2072         gw = best_route->lr_gateway;
2073         LASSERT(gw == gwni->lpni_peer_net->lpn_peer);
2074
2075         /*
2076          * Discover this gateway if it hasn't already been discovered.
2077          * This means we might delay the message until discovery has
2078          * completed
2079          */
2080         sd->sd_msg->msg_src_nid_param = sd->sd_src_nid;
2081         rc = lnet_initiate_peer_discovery(gwni, sd->sd_msg, sd->sd_rtr_nid,
2082                                           sd->sd_cpt);
2083         if (rc)
2084                 return rc;
2085
2086         if (!sd->sd_best_ni)
2087                 sd->sd_best_ni = lnet_find_best_ni_on_spec_net(NULL, gw,
2088                                         lnet_peer_get_net_locked(gw,
2089                                                 best_route->lr_lnet),
2090                                         sd->sd_md_cpt,
2091                                         true);
2092
2093         if (!sd->sd_best_ni) {
2094                 CERROR("Internal Error. Expected local ni on %s "
2095                        "but none found: %s\n",
2096                        libcfs_net2str(best_route->lr_lnet),
2097                        libcfs_nid2str(sd->sd_src_nid));
2098                 return -EFAULT;
2099         }
2100
2101         *gw_lpni = gwni;
2102         *gw_peer = gw;
2103
2104         /*
2105          * increment the sequence numbers since now we're sure we're
2106          * going to use this path
2107          */
2108         LASSERT(best_route && last_route);
2109         best_route->lr_seq = last_route->lr_seq + 1;
2110         best_lpn->lpn_seq++;
2111
2112         return 0;
2113 }
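
/*
 * Typical caller pattern (illustrative, mirroring the handlers below):
 * on success the gateway becomes the immediate send target while
 * sd_final_dst_lpni still records the real destination:
 *
 *      rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni,
 *                                        &gw_peer);
 *      if (rc)
 *              return rc;
 *      sd->sd_peer = gw_peer;
 *      sd->sd_best_lpni = gw_lpni;
 *      return lnet_handle_send(sd);
 */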
2114
2115 /*
2116  * Handle two cases:
2117  *
2118  * Case 1:
2119  *  Source specified
2120  *  Remote destination
2121  *  Non-MR destination
2122  *
2123  * Case 2:
2124  *  Source specified
2125  *  Remote destination
2126  *  MR destination
2127  *
2128  * The handling of these two cases is similar. Even though the destination
2129  * can be MR or non-MR, we'll deal directly with the router.
2130  */
2131 static int
2132 lnet_handle_spec_router_dst(struct lnet_send_data *sd)
2133 {
2134         int rc;
2135         struct lnet_peer_ni *gw_lpni = NULL;
2136         struct lnet_peer *gw_peer = NULL;
2137
2138         /* find local NI */
2139         sd->sd_best_ni = lnet_nid2ni_locked(sd->sd_src_nid, sd->sd_cpt);
2140         if (!sd->sd_best_ni) {
2141                 CERROR("Can't send to %s: src %s is not a "
2142                        "local nid\n", libcfs_nid2str(sd->sd_dst_nid),
2143                                 libcfs_nid2str(sd->sd_src_nid));
2144                 return -EINVAL;
2145         }
2146
2147         rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni,
2148                                           &gw_peer);
2149         if (rc)
2150                 return rc;
2151
2152         if (sd->sd_send_case & NMR_DST)
2153                 /*
2154                  * since the final destination is non-MR let's set its preferred
2155                  * NID before we send
2156                  */
2157                 lnet_set_non_mr_pref_nid(sd);
2158
2159         /*
2160          * We're going to send to the gw found so let's set its
2161          * info
2162          */
2163         sd->sd_peer = gw_peer;
2164         sd->sd_best_lpni = gw_lpni;
2165
2166         return lnet_handle_send(sd);
2167 }
2168
2169 struct lnet_ni *
2170 lnet_find_best_ni_on_local_net(struct lnet_peer *peer, int md_cpt,
2171                                bool discovery)
2172 {
2173         struct lnet_peer_net *peer_net = NULL;
2174         struct lnet_ni *best_ni = NULL;
2175
2176         /*
2177          * The peer can have multiple interfaces, some of them can be on
2178          * the local network and others on a routed network. We should
2179          * prefer the local network. However if the local network is not
2180          * available then we need to try the routed network
2181          */
2182
2183         /* go through all the peer nets and find the best_ni */
2184         list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
2185                 /*
2186                  * The peer's list of nets can contain non-local nets. We
2187                  * want to only examine the local ones.
2188                  */
2189                 if (!lnet_get_net_locked(peer_net->lpn_net_id))
2190                         continue;
2191                 best_ni = lnet_find_best_ni_on_spec_net(best_ni, peer,
2192                                                    peer_net, md_cpt, false);
2193
2194                 /*
2195                  * if this is a discovery message and lp_disc_net_id is
2196                  * specified then use that net to send the discovery on.
2197                  */
2198                 if (peer->lp_disc_net_id == peer_net->lpn_net_id &&
2199                     discovery)
2200                         break;
2201         }
2202
2203         if (best_ni)
2204                 /* increment sequence number so we can round robin */
2205                 best_ni->ni_seq++;
2206
2207         return best_ni;
2208 }
2209
2210 static struct lnet_ni *
2211 lnet_find_existing_preferred_best_ni(struct lnet_send_data *sd)
2212 {
2213         struct lnet_ni *best_ni = NULL;
2214         struct lnet_peer_net *peer_net;
2215         struct lnet_peer *peer = sd->sd_peer;
2216         struct lnet_peer_ni *best_lpni = sd->sd_best_lpni;
2217         struct lnet_peer_ni *lpni;
2218         int cpt = sd->sd_cpt;
2219
2220         /*
2221          * We must use a consistent source address when sending to a
2222          * non-MR peer. However, a non-MR peer can have multiple NIDs
2223          * on multiple networks, and we may even need to talk to this
2224          * peer on multiple networks -- certain types of
2225          * load-balancing configuration do this.
2226          *
2227          * So we need to pick the NI the peer prefers for this
2228          * particular network.
2229          */
2230
2231         /* Get the target peer_ni */
2232         peer_net = lnet_peer_get_net_locked(peer,
2233                         LNET_NIDNET(best_lpni->lpni_nid));
2234         LASSERT(peer_net != NULL);
2235         list_for_each_entry(lpni, &peer_net->lpn_peer_nis,
2236                                 lpni_peer_nis) {
2237                 if (lpni->lpni_pref_nnids == 0)
2238                         continue;
2239                 LASSERT(lpni->lpni_pref_nnids == 1);
2240                 best_ni = lnet_nid2ni_locked(
2241                                 lpni->lpni_pref.nid, cpt);
2242                 break;
2243         }
2244
2245         return best_ni;
2246 }
2247
2248 /* Prerequisite: sd->sd_peer and sd->sd_best_lpni should be set */
2249 static int
2250 lnet_select_preferred_best_ni(struct lnet_send_data *sd)
2251 {
2252         struct lnet_ni *best_ni = NULL;
2253         struct lnet_peer_ni *best_lpni = sd->sd_best_lpni;
2254
2255         /*
2256          * We must use a consistent source address when sending to a
2257          * non-MR peer. However, a non-MR peer can have multiple NIDs
2258          * on multiple networks, and we may even need to talk to this
2259          * peer on multiple networks -- certain types of
2260          * load-balancing configuration do this.
2261          *
2262          * So we need to pick the NI the peer prefers for this
2263          * particular network.
2264          */
2265
2266         best_ni = lnet_find_existing_preferred_best_ni(sd);
2267
2268         /* if best_ni is still not set just pick one */
2269         if (!best_ni) {
2270                 best_ni =
2271                   lnet_find_best_ni_on_spec_net(NULL, sd->sd_peer,
2272                                                 sd->sd_best_lpni->lpni_peer_net,
2273                                                 sd->sd_md_cpt, true);
2274                 /* If there is no best_ni we don't have a route */
2275                 if (!best_ni) {
2276                         CERROR("no path to %s from net %s\n",
2277                                 libcfs_nid2str(best_lpni->lpni_nid),
2278                                 libcfs_net2str(best_lpni->lpni_net->net_id));
2279                         return -EHOSTUNREACH;
2280                 }
2281         }
2282
2283         sd->sd_best_ni = best_ni;
2284
2285         /* Set preferred NI if necessary. */
2286         lnet_set_non_mr_pref_nid(sd);
2287
2288         return 0;
2289 }
2290
2292 /*
2293  * Source not specified
2294  * Local destination
2295  * Non-MR Peer
2296  *
2297  * always use the same source NID for NMR peers
2298  * If we've talked to that peer before then we already have a preferred
2299  * source NI associated with it. Otherwise, we select a preferred local NI
2300  * and store it in the peer
2301  */
2302 static int
2303 lnet_handle_any_local_nmr_dst(struct lnet_send_data *sd)
2304 {
2305         int rc;
2306
2307         /* sd->sd_best_lpni is already set to the final destination */
2308
2309         /*
2310          * At this point we should've created the peer ni and peer. If we
2311          * can't find it, then something went wrong. Instead of assert
2312          * output a relevant message and fail the send
2313          */
2314         if (!sd->sd_best_lpni) {
2315                 CERROR("Internal fault. Unable to send msg %s to %s. "
2316                        "NID not known\n",
2317                        lnet_msgtyp2str(sd->sd_msg->msg_type),
2318                        libcfs_nid2str(sd->sd_dst_nid));
2319                 return -EFAULT;
2320         }
2321
2322         rc = lnet_select_preferred_best_ni(sd);
2323         if (!rc)
2324                 rc = lnet_handle_send(sd);
2325
2326         return rc;
2327 }
2328
2329 static int
2330 lnet_handle_any_mr_dsta(struct lnet_send_data *sd)
2331 {
2332         /*
2333          * NOTE we've already handled the remote peer case. So we only
2334          * need to worry about the local case here.
2335          *
2336          * if we're sending a response, ACK or reply, we need to send it
2337          * to the destination NID given to us. At this point we already
2338          * have the peer_ni we're supposed to send to, so just find the
2339          * best_ni on the peer net and use that. Since we're sending to an
2340          * MR peer then we can just run the selection algorithm on our
2341          * local NIs and pick the best one.
2342          */
2343         if (sd->sd_send_case & SND_RESP) {
2344                 sd->sd_best_ni =
2345                   lnet_find_best_ni_on_spec_net(NULL, sd->sd_peer,
2346                                                 sd->sd_best_lpni->lpni_peer_net,
2347                                                 sd->sd_md_cpt, true);
2348
2349                 if (!sd->sd_best_ni) {
2350                         /*
2351                          * We're not going to deal with not being able to
2352                          * send a response to the provided final destination
2353                          */
2354                         CERROR("Can't send response to %s. "
2355                                "No local NI available\n",
2356                                 libcfs_nid2str(sd->sd_dst_nid));
2357                         return -EHOSTUNREACH;
2358                 }
2359
2360                 return lnet_handle_send(sd);
2361         }
2362
2363         /*
2364          * If we get here that means we're sending a fresh request, PUT or
2365          * GET, so we need to run our standard selection algorithm.
2366          * First find the best local interface that's on any of the peer's
2367          * networks.
2368          */
2369         sd->sd_best_ni = lnet_find_best_ni_on_local_net(sd->sd_peer,
2370                                         sd->sd_md_cpt,
2371                                         lnet_msg_discovery(sd->sd_msg));
2372         if (sd->sd_best_ni) {
2373                 sd->sd_best_lpni =
2374                   lnet_find_best_lpni_on_net(sd, sd->sd_peer,
2375                                              sd->sd_best_ni->ni_net->net_id);
2376
2377                 /*
2378                  * if we're successful in selecting a peer_ni on the local
2379                  * network, then send to it. Otherwise fall through and
2380                  * try and see if we can reach it over another routed
2381                  * network
2382                  */
2383                 if (sd->sd_best_lpni &&
2384                     sd->sd_best_lpni->lpni_nid == the_lnet.ln_loni->ni_nid) {
2385                         /*
2386                          * in case we initially started with a routed
2387                          * destination, let's reset to local
2388                          */
2389                         sd->sd_send_case &= ~REMOTE_DST;
2390                         sd->sd_send_case |= LOCAL_DST;
2391                         return lnet_handle_lo_send(sd);
2392                 } else if (sd->sd_best_lpni) {
2393                         /*
2394                          * in case we initially started with a routed
2395                          * destination, let's reset to local
2396                          */
2397                         sd->sd_send_case &= ~REMOTE_DST;
2398                         sd->sd_send_case |= LOCAL_DST;
2399                         return lnet_handle_send(sd);
2400                 }
2401
2402                 CERROR("Internal Error. Expected to have a best_lpni: "
2403                        "%s -> %s\n",
2404                        libcfs_nid2str(sd->sd_src_nid),
2405                        libcfs_nid2str(sd->sd_dst_nid));
2406
2407                 return -EFAULT;
2408         }
2409
2410         /*
2411          * Peer doesn't have a local network. Let's see if there is
2412          * a remote network we can reach it on.
2413          */
2414         return PASS_THROUGH;
2415 }
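
/*
 * Note (illustrative): PASS_THROUGH is not an error code. The caller,
 * lnet_handle_any_mr_dst(), reads it as "no local path found, fall
 * back to routing":
 *
 *      rc = lnet_handle_any_mr_dsta(sd);
 *      if (rc != PASS_THROUGH)
 *              return rc;
 */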
2416
2417 /*
2418  * Case 1:
2419  *      Source NID not specified
2420  *      Local destination
2421  *      MR peer
2422  *
2423  * Case 2:
2424  *      Source NID not specified
2425  *      Remote destination
2426  *      MR peer
2427  *
2428  * In both of these cases if we're sending a response, ACK or REPLY, then
2429  * we need to send to the destination NID provided.
2430  *
2431  * In the remote case let's deal with MR routers.
2432  *
2433  */
2434
2435 static int
2436 lnet_handle_any_mr_dst(struct lnet_send_data *sd)
2437 {
2438         int rc = 0;
2439         struct lnet_peer *gw_peer = NULL;
2440         struct lnet_peer_ni *gw_lpni = NULL;
2441
2442         /*
2443          * handle sending a response to a remote peer here so we don't
2444          * have to worry about it if we hit lnet_handle_any_mr_dsta()
2445          */
2446         if (sd->sd_send_case & REMOTE_DST &&
2447             sd->sd_send_case & SND_RESP) {
2448                 struct lnet_peer_ni *gw;
2449                 struct lnet_peer *gw_peer;
2450
2451                 rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw,
2452                                                   &gw_peer);
2453                 if (rc < 0) {
2454                         CERROR("Can't send response to %s. "
2455                                "No route available\n",
2456                                 libcfs_nid2str(sd->sd_dst_nid));
2457                         return -EHOSTUNREACH;
2458                 } else if (rc > 0) {
2459                         return rc;
2460                 }
2461
2462                 sd->sd_best_lpni = gw;
2463                 sd->sd_peer = gw_peer;
2464
2465                 return lnet_handle_send(sd);
2466         }
2467
2468         /*
2469          * Even though the NID for the peer might not be on a local network,
2470          * since the peer is MR there could be other interfaces on the
2471          * local network. In that case we'd still like to prefer the local
2472          * network over the routed network. If we're unable to do that
2473          * then we select the best router among the different routed networks,
2474          * and if the router is MR then we can deal with it as such.
2475          */
2476         rc = lnet_handle_any_mr_dsta(sd);
2477         if (rc != PASS_THROUGH)
2478                 return rc;
2479
2480         /*
2481          * Now that we must route to the destination, we must consider the
2482          * MR case, where the destination has multiple interfaces, some of
2483          * which we can route to and others we cannot. For this reason we
2484          * need to select the destination which we can route to and if
2485          * there are multiple, we need to round robin.
2486          */
2487         rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni,
2488                                           &gw_peer);
2489         if (rc)
2490                 return rc;
2491
2492         sd->sd_send_case &= ~LOCAL_DST;
2493         sd->sd_send_case |= REMOTE_DST;
2494
2495         sd->sd_peer = gw_peer;
2496         sd->sd_best_lpni = gw_lpni;
2497
2498         return lnet_handle_send(sd);
2499 }
2500
2501 /*
2502  * Source not specified
2503  * Remote destination
2504  * Non-MR peer
2505  *
2506  * Must send to the specified peer NID using the same source NID that
2507  * we've used before. If it's the first time we talk to that peer then
2508  * find the source NI and assign it as preferred to that peer
2509  */
2510 static int
2511 lnet_handle_any_router_nmr_dst(struct lnet_send_data *sd)
2512 {
2513         int rc;
2514         struct lnet_peer_ni *gw_lpni = NULL;
2515         struct lnet_peer *gw_peer = NULL;
2516
2517         /*
2518          * Let's see if we have a preferred NI to talk to this NMR peer
2519          */
2520         sd->sd_best_ni = lnet_find_existing_preferred_best_ni(sd);
2521
2522         /*
2523          * find the router and that'll find the best NI if we didn't find
2524          * it already.
2525          */
2526         rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni,
2527                                           &gw_peer);
2528         if (rc)
2529                 return rc;
2530
2531         /*
2532          * set the best_ni we've chosen as the preferred one for
2533          * this peer
2534          */
2535         lnet_set_non_mr_pref_nid(sd);
2536
2537         /* we'll be sending to the gw */
2538         sd->sd_best_lpni = gw_lpni;
2539         sd->sd_peer = gw_peer;
2540
2541         return lnet_handle_send(sd);
2542 }
2543
2544 static int
2545 lnet_handle_send_case_locked(struct lnet_send_data *sd)
2546 {
2547         /*
2548          * turn off the SND_RESP bit.
2549          * It will be checked in the case handling
2550          */
2551         __u32 send_case = sd->sd_send_case &= ~SND_RESP;
2552
2553         CDEBUG(D_NET, "Source %s%s to %s %s %s destination\n",
2554                 (send_case & SRC_SPEC) ? "Specified: " : "ANY",
2555                 (send_case & SRC_SPEC) ? libcfs_nid2str(sd->sd_src_nid) : "",
2556                 (send_case & MR_DST) ? "MR: " : "NMR: ",
2557                 libcfs_nid2str(sd->sd_dst_nid),
2558                 (send_case & LOCAL_DST) ? "local" : "routed");
2559
2560         switch (send_case) {
2561         /*
2562          * For all cases where the source is specified, we should always
2563          * use the destination NID, whether it's an MR destination or not,
2564          * since we're continuing a series of related messages for the
2565          * same RPC
2566          */
2567         case SRC_SPEC_LOCAL_NMR_DST:
2568                 return lnet_handle_spec_local_nmr_dst(sd);
2569         case SRC_SPEC_LOCAL_MR_DST:
2570                 return lnet_handle_spec_local_mr_dst(sd);
2571         case SRC_SPEC_ROUTER_NMR_DST:
2572         case SRC_SPEC_ROUTER_MR_DST:
2573                 return lnet_handle_spec_router_dst(sd);
2574         case SRC_ANY_LOCAL_NMR_DST:
2575                 return lnet_handle_any_local_nmr_dst(sd);
2576         case SRC_ANY_LOCAL_MR_DST:
2577         case SRC_ANY_ROUTER_MR_DST:
2578                 return lnet_handle_any_mr_dst(sd);
2579         case SRC_ANY_ROUTER_NMR_DST:
2580                 return lnet_handle_any_router_nmr_dst(sd);
2581         default:
2582                 CERROR("Unknown send case\n");
2583                 return -1;
2584         }
2585 }
2586
2587 static int
2588 lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid,
2589                     struct lnet_msg *msg, lnet_nid_t rtr_nid)
2590 {
2591         struct lnet_peer_ni     *lpni;
2592         struct lnet_peer        *peer;
2593         struct lnet_send_data   send_data;
2594         int                     cpt, rc;
2595         int                     md_cpt;
2596         __u32                   send_case = 0;
2597
2598         memset(&send_data, 0, sizeof(send_data));
2599
2600         /*
2601          * get an initial CPT to use for locking. The idea here is not to
2602          * serialize the calls to select_pathway, so that as many
2603          * operations can run concurrently as possible. To do that we use
2604          * the CPT where this call is being executed. Later on when we
2605          * determine the CPT to use in lnet_message_commit, we switch the
2606          * lock and check if there was any configuration change.  If none,
2607          * then we proceed, if there is, then we restart the operation.
2608          */
2609         cpt = lnet_net_lock_current();
2610
2611         md_cpt = lnet_cpt_of_md(msg->msg_md, msg->msg_offset);
2612         if (md_cpt == CFS_CPT_ANY)
2613                 md_cpt = cpt;
2614
2615 again:
2616
2617         /*
2618          * If we're being asked to send to the loopback interface, there
2619          * is no need to go through any selection. We can just shortcut
2620          * the entire process and send over lolnd
2621          */
2622         send_data.sd_msg = msg;
2623         send_data.sd_cpt = cpt;
2624         if (LNET_NETTYP(LNET_NIDNET(dst_nid)) == LOLND) {
2625                 rc = lnet_handle_lo_send(&send_data);
2626                 lnet_net_unlock(cpt);
2627                 return rc;
2628         }
2629
2630         /*
2631          * find an existing peer_ni, or create one and mark it as having been
2632          * created due to network traffic. This call will create the
2633          * peer->peer_net->peer_ni tree.
2634          */
2635         lpni = lnet_nid2peerni_locked(dst_nid, LNET_NID_ANY, cpt);
2636         if (IS_ERR(lpni)) {
2637                 lnet_net_unlock(cpt);
2638                 return PTR_ERR(lpni);
2639         }
2640
2641         /*
2642          * Cache the original src_nid. If we need to resend the message
2643          * then we'll need to know whether the src_nid was originally
2644          * specified for this message. If it was originally specified,
2645          * then we need to keep using the same src_nid since it's
2646          * continuing the same sequence of messages.
2647          */
2648         msg->msg_src_nid_param = src_nid;
2649
2650         /*
2651          * Now that we have a peer_ni, check if we want to discover
2652          * the peer. Traffic to the LNET_RESERVED_PORTAL should not
2653          * trigger discovery.
2654          */
2655         peer = lpni->lpni_peer_net->lpn_peer;
2656         rc = lnet_initiate_peer_discovery(lpni, msg, rtr_nid, cpt);
2657         if (rc) {
2658                 lnet_peer_ni_decref_locked(lpni);
2659                 lnet_net_unlock(cpt);
2660                 return rc;
2661         }
2662         lnet_peer_ni_decref_locked(lpni);
2663
2664         /*
2665          * Identify the different send cases
2666          */
2667         if (src_nid == LNET_NID_ANY)
2668                 send_case |= SRC_ANY;
2669         else
2670                 send_case |= SRC_SPEC;
2671
2672         if (lnet_get_net_locked(LNET_NIDNET(dst_nid)))
2673                 send_case |= LOCAL_DST;
2674         else
2675                 send_case |= REMOTE_DST;
2676
2677         /*
2678          * if this is a non-MR peer or if we're recovering a peer ni then
2679          * let's consider this an NMR case so we can hit the destination
2680          * NID.
2681          */
2682         if (!lnet_peer_is_multi_rail(peer) || msg->msg_recovery)
2683                 send_case |= NMR_DST;
2684         else
2685                 send_case |= MR_DST;
2686
2687         if (msg->msg_type == LNET_MSG_REPLY ||
2688             msg->msg_type == LNET_MSG_ACK)
2689                 send_case |= SND_RESP;
2690
2691         /* assign parameters to the send_data */
2692         send_data.sd_rtr_nid = rtr_nid;
2693         send_data.sd_src_nid = src_nid;
2694         send_data.sd_dst_nid = dst_nid;
2695         send_data.sd_best_lpni = lpni;
2696         /*
2697          * keep a pointer to the final destination in case we're going to
2698          * route, so we'll need to access it later
2699          */
2700         send_data.sd_final_dst_lpni = lpni;
2701         send_data.sd_peer = peer;
2702         send_data.sd_md_cpt = md_cpt;
2703         send_data.sd_send_case = send_case;
2704
2705         rc = lnet_handle_send_case_locked(&send_data);
2706
2707         /*
2708          * Update the local cpt since send_data.sd_cpt might've been
2709          * updated as a result of calling lnet_handle_send_case_locked().
2710          */
2711         cpt = send_data.sd_cpt;
2712
2713         if (rc == REPEAT_SEND)
2714                 goto again;
2715
2716         lnet_net_unlock(cpt);
2717
2718         return rc;
2719 }
2720
2721 int
2722 lnet_send(lnet_nid_t src_nid, struct lnet_msg *msg, lnet_nid_t rtr_nid)
2723 {
2724         lnet_nid_t              dst_nid = msg->msg_target.nid;
2725         int                     rc;
2726
2727         /*
2728          * NB: rtr_nid is set to LNET_NID_ANY for all current use-cases,
2729          * but we might want to use pre-determined router for ACK/REPLY
2730          * in the future
2731          */
2732         /* NB: ni != NULL == interface pre-determined (ACK/REPLY) */
2733         LASSERT(msg->msg_txpeer == NULL);
2734         LASSERT(msg->msg_txni == NULL);
2735         LASSERT(!msg->msg_sending);
2736         LASSERT(!msg->msg_target_is_router);
2737         LASSERT(!msg->msg_receiving);
2738
2739         msg->msg_sending = 1;
2740
2741         LASSERT(!msg->msg_tx_committed);
2742
2743         rc = lnet_select_pathway(src_nid, dst_nid, msg, rtr_nid);
2744         if (rc < 0) {
2745                 if (rc == -EHOSTUNREACH)
2746                         msg->msg_health_status = LNET_MSG_STATUS_REMOTE_ERROR;
2747                 else
2748                         msg->msg_health_status = LNET_MSG_STATUS_LOCAL_ERROR;
2749                 return rc;
2750         }
2751
2752         if (rc == LNET_CREDIT_OK)
2753                 lnet_ni_send(msg->msg_txni, msg);
2754
2755         /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT or LNET_DC_WAIT */
2756         return 0;
2757 }
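
/*
 * Caller sketch (illustrative, mirroring the resend path below): a
 * negative return means the message was never committed and
 * msg_health_status has been set; 0 covers LNET_CREDIT_OK (sent),
 * LNET_CREDIT_WAIT (queued for credits) and LNET_DC_WAIT (queued
 * pending discovery):
 *
 *      rc = lnet_send(src_nid, msg, LNET_NID_ANY);
 *      if (rc) {
 *              msg->msg_no_resend = true;
 *              lnet_finalize(msg, rc);
 *      }
 */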
2758
2759 enum lnet_mt_event_type {
2760         MT_TYPE_LOCAL_NI = 0,
2761         MT_TYPE_PEER_NI
2762 };
2763
2764 struct lnet_mt_event_info {
2765         enum lnet_mt_event_type mt_type;
2766         lnet_nid_t mt_nid;
2767 };
2768
2769 /* called with res_lock held */
2770 void
2771 lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt)
2772 {
2773         struct lnet_rsp_tracker *rspt;
2774
2775         /*
2776          * msg has a refcount on the MD so the MD is not going away.
2777          * The rspt queue for the cpt is protected by
2778          * the lnet_net_lock(cpt). cpt is the cpt of the MD cookie.
2779          */
2780         if (!md->md_rspt_ptr)
2781                 return;
2782
2783         rspt = md->md_rspt_ptr;
2784         md->md_rspt_ptr = NULL;
2785
2786         /* debug code */
2787         LASSERT(rspt->rspt_cpt == cpt);
2788
2789         /*
2790          * invalidate the handle to indicate that a response has been
2791          * received, which will then lead the monitor thread to clean up
2792          * the rspt block.
2793          */
2794         LNetInvalidateMDHandle(&rspt->rspt_mdh);
2795 }
2796
2797 static void
2798 lnet_finalize_expired_responses(bool force)
2799 {
2800         struct lnet_libmd *md;
2801         struct list_head local_queue;
2802         struct lnet_rsp_tracker *rspt, *tmp;
2803         int i;
2804
2805         if (the_lnet.ln_mt_rstq == NULL)
2806                 return;
2807
2808         cfs_cpt_for_each(i, lnet_cpt_table()) {
2809                 INIT_LIST_HEAD(&local_queue);
2810
2811                 lnet_net_lock(i);
2812                 if (!the_lnet.ln_mt_rstq[i]) {
2813                         lnet_net_unlock(i);
2814                         continue;
2815                 }
2816                 list_splice_init(the_lnet.ln_mt_rstq[i], &local_queue);
2817                 lnet_net_unlock(i);
2818
2819                 list_for_each_entry_safe(rspt, tmp, &local_queue, rspt_on_list) {
2820                         /*
2821                          * The rspt mdh will be invalidated when a response
2822                          * block. The monitor thread will walk the queue
2823                          * and clean up any rspts with an invalid mdh.
2824                          * and clean up any rsts with an invalid mdh.
2825                          * The monitor thread will walk the queue until
2826                          * the first unexpired rspt block. This means that
2827                          * some rspt blocks which received their
2828                          * corresponding responses will linger in the
2829                          * queue until they are cleaned up eventually.
2830                          */
2831                         lnet_res_lock(i);
2832                         if (LNetMDHandleIsInvalid(rspt->rspt_mdh)) {
2833                                 lnet_res_unlock(i);
2834                                 list_del_init(&rspt->rspt_on_list);
2835                                 lnet_rspt_free(rspt, i);
2836                                 continue;
2837                         }
2838
2839                         if (ktime_compare(ktime_get(), rspt->rspt_deadline) >= 0 ||
2840                             force) {
2841                                 struct lnet_peer_ni *lpni;
2842                                 lnet_nid_t nid;
2843
2844                                 md = lnet_handle2md(&rspt->rspt_mdh);
2845                                 if (!md) {
2846                                         LNetInvalidateMDHandle(&rspt->rspt_mdh);
2847                                         lnet_res_unlock(i);
2848                                         list_del_init(&rspt->rspt_on_list);
2849                                         lnet_rspt_free(rspt, i);
2850                                         continue;
2851                                 }
2852                                 LASSERT(md->md_rspt_ptr == rspt);
2853                                 md->md_rspt_ptr = NULL;
2854                                 lnet_res_unlock(i);
2855
2856                                 lnet_net_lock(i);
2857                                 the_lnet.ln_counters[i]->lct_health.lch_response_timeout_count++;
2858                                 lnet_net_unlock(i);
2859
2860                                 list_del_init(&rspt->rspt_on_list);
2861
2862                                 nid = rspt->rspt_next_hop_nid;
2863
2864                                 CNETERR("Response timed out: md = %p: nid = %s\n",
2865                                         md, libcfs_nid2str(nid));
2866                                 LNetMDUnlink(rspt->rspt_mdh);
2867                                 lnet_rspt_free(rspt, i);
2868
2869                                 /*
2870                                  * If there is a timeout on the response
2871                                  * from the next hop decrement its health
2872                                  * value so that we don't use it
2873                                  */
2874                                 lnet_net_lock(0);
2875                                 lpni = lnet_find_peer_ni_locked(nid);
2876                                 if (lpni) {
2877                                         lnet_handle_remote_failure_locked(lpni);
2878                                         lnet_peer_ni_decref_locked(lpni);
2879                                 }
2880                                 lnet_net_unlock(0);
2881                         } else {
2882                                 lnet_res_unlock(i);
2883                                 break;
2884                         }
2885                 }
2886
2887                 lnet_net_lock(i);
2888                 if (!list_empty(&local_queue))
2889                         list_splice(&local_queue, the_lnet.ln_mt_rstq[i]);
2890                 lnet_net_unlock(i);
2891         }
2892 }
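
/*
 * The early break in the walk above is safe only because response
 * trackers are appended in send order with a uniform timeout, so
 * rspt_deadline is non-decreasing along each per-CPT queue.  A minimal
 * sketch of the same pattern (struct rspt_ex, rx_list, rx_deadline and
 * expire_one() are placeholders for illustration, not LNet APIs):
 */
#if 0	/* illustrative sketch, not built */
struct rspt_ex {
	struct list_head rx_list;
	ktime_t rx_deadline;
};

static void expire_ordered_queue(struct list_head *q)
{
	struct rspt_ex *r, *tmp;
	ktime_t now = ktime_get();

	list_for_each_entry_safe(r, tmp, q, rx_list) {
		/* entries after r can only expire later, so stop here */
		if (ktime_compare(now, r->rx_deadline) < 0)
			break;
		list_del_init(&r->rx_list);
		expire_one(r);
	}
}
#endif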
2893
2894 static void
2895 lnet_resend_pending_msgs_locked(struct list_head *resendq, int cpt)
2896 {
2897         struct lnet_msg *msg;
2898
2899         while (!list_empty(resendq)) {
2900                 struct lnet_peer_ni *lpni;
2901
2902                 msg = list_entry(resendq->next, struct lnet_msg,
2903                                  msg_list);
2904
2905                 list_del_init(&msg->msg_list);
2906
2907                 lpni = lnet_find_peer_ni_locked(msg->msg_hdr.dest_nid);
2908                 if (!lpni) {
2909                         lnet_net_unlock(cpt);
2910                         CERROR("Expected a peer to already exist for %s\n",
2911                                libcfs_nid2str(msg->msg_hdr.dest_nid));
2912                         msg->msg_no_resend = true;
2913                         lnet_finalize(msg, -EFAULT);
2914                         lnet_net_lock(cpt);
2915                 } else {
2916                         struct lnet_peer *peer;
2917                         int rc;
2918                         lnet_nid_t src_nid = LNET_NID_ANY;
2919
2920                         /*
2921                          * If this message is not being routed and the
2922                          * peer is non-MR then we must use the same
2923                          * src_nid that was used in the original send.
2924                          * Otherwise, if we're routing the message (i.e.
2925                          * we're a router) then we can use any of our
2926                          * local interfaces; it doesn't matter to the
2927                          * final destination.
2928                          */
2929                         peer = lpni->lpni_peer_net->lpn_peer;
2930                         if (!msg->msg_routing &&
2931                             !lnet_peer_is_multi_rail(peer))
2932                                 src_nid = le64_to_cpu(msg->msg_hdr.src_nid);
2933
2934                         /*
2935                          * If we originally specified a src NID, then we
2936                          * must attempt to reuse it in the resend as well.
2937                          */
2938                         if (msg->msg_src_nid_param != LNET_NID_ANY)
2939                                 src_nid = msg->msg_src_nid_param;
2940                         lnet_peer_ni_decref_locked(lpni);
2941
2942                         lnet_net_unlock(cpt);
2943                         CDEBUG(D_NET, "resending %s->%s: %s recovery %d try# %d\n",
2944                                libcfs_nid2str(src_nid),
2945                                libcfs_id2str(msg->msg_target),
2946                                lnet_msgtyp2str(msg->msg_type),
2947                                msg->msg_recovery,
2948                                msg->msg_retry_count);
2949                         rc = lnet_send(src_nid, msg, LNET_NID_ANY);
2950                         if (rc) {
2951                                 CERROR("Error sending %s to %s: %d\n",
2952                                        lnet_msgtyp2str(msg->msg_type),
2953                                        libcfs_id2str(msg->msg_target), rc);
2954                                 msg->msg_no_resend = true;
2955                                 lnet_finalize(msg, rc);
2956                         }
2957                         lnet_net_lock(cpt);
2958                         if (!rc)
2959                                 the_lnet.ln_counters[cpt]->lct_health.lch_resend_count++;
2960                 }
2961         }
2962 }
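
/*
 * The source-NID selection above reduces to three cases, in priority
 * order.  A condensed sketch (resend_src_nid() is illustrative, not an
 * LNet API; the helpers it calls are the ones used above):
 */
#if 0	/* illustrative sketch, not built */
static lnet_nid_t resend_src_nid(struct lnet_msg *msg, struct lnet_peer *peer)
{
	/* 1. the caller pinned a source NID: reuse it */
	if (msg->msg_src_nid_param != LNET_NID_ANY)
		return msg->msg_src_nid_param;
	/* 2. not routed and the peer is non-MR: reuse the original source */
	if (!msg->msg_routing && !lnet_peer_is_multi_rail(peer))
		return le64_to_cpu(msg->msg_hdr.src_nid);
	/* 3. otherwise let lnet_send() pick any local interface */
	return LNET_NID_ANY;
}
#endif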
2963
2964 static void
2965 lnet_resend_pending_msgs(void)
2966 {
2967         int i;
2968
2969         cfs_cpt_for_each(i, lnet_cpt_table()) {
2970                 lnet_net_lock(i);
2971                 lnet_resend_pending_msgs_locked(the_lnet.ln_mt_resendqs[i], i);
2972                 lnet_net_unlock(i);
2973         }
2974 }
2975
2976 /* called with cpt and ni_lock held */
2977 static void
2978 lnet_unlink_ni_recovery_mdh_locked(struct lnet_ni *ni, int cpt, bool force)
2979 {
2980         struct lnet_handle_md recovery_mdh;
2981
2982         LNetInvalidateMDHandle(&recovery_mdh);
2983
2984         if (ni->ni_recovery_state & LNET_NI_RECOVERY_PENDING ||
2985             force) {
2986                 recovery_mdh = ni->ni_ping_mdh;
2987                 LNetInvalidateMDHandle(&ni->ni_ping_mdh);
2988         }
2989         lnet_ni_unlock(ni);
2990         lnet_net_unlock(cpt);
2991         if (!LNetMDHandleIsInvalid(recovery_mdh))
2992                 LNetMDUnlink(recovery_mdh);
2993         lnet_net_lock(cpt);
2994         lnet_ni_lock(ni);
2995 }
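
/*
 * The unlink above is done with the net and NI locks dropped,
 * presumably because LNetMDUnlink() takes its own locks and may deliver
 * an unlink event.  The shape of the pattern, shared with
 * lnet_unlink_lpni_recovery_mdh_locked() below (illustrative only):
 */
#if 0	/* illustrative sketch, not built */
static void unlink_mdh_pattern(struct lnet_handle_md *cached, int cpt)
{
	struct lnet_handle_md mdh = *cached;	/* snapshot under lock */

	LNetInvalidateMDHandle(cached);		/* nobody reuses it */
	lnet_net_unlock(cpt);
	if (!LNetMDHandleIsInvalid(mdh))
		LNetMDUnlink(mdh);		/* no LNet locks held */
	lnet_net_lock(cpt);			/* restore caller's state */
}
#endif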
2996
2997 static void
2998 lnet_recover_local_nis(void)
2999 {
3000         struct lnet_mt_event_info *ev_info;
3001         struct list_head processed_list;
3002         struct list_head local_queue;
3003         struct lnet_handle_md mdh;
3004         struct lnet_ni *tmp;
3005         struct lnet_ni *ni;
3006         lnet_nid_t nid;
3007         int healthv;
3008         int rc;
3009
3010         INIT_LIST_HEAD(&local_queue);
3011         INIT_LIST_HEAD(&processed_list);
3012
3013         /*
3014          * splice the recovery queue onto a local queue. We will iterate
3015          * through the local queue and update it as needed. Once we're
3016          * done with the traversal, we'll splice the local queue back onto
3017          * the head of the ln_mt_localNIRecovq. Any newly added local NIs
3018          * will be traversed in the next iteration.
3019          */
3020         lnet_net_lock(0);
3021         list_splice_init(&the_lnet.ln_mt_localNIRecovq,
3022                          &local_queue);
3023         lnet_net_unlock(0);
3024
3025         list_for_each_entry_safe(ni, tmp, &local_queue, ni_recovery) {
3026                 /*
3027                  * if an NI is being deleted or it is now healthy, there
3028                  * is no need to keep it around in the recovery queue.
3029                  * The monitor thread is the only thread responsible for
3030                  * removing the NI from the recovery queue.
3031                  * Multiple threads can be adding NIs to the recovery
3032                  * queue.
3033                  */
3034                 healthv = atomic_read(&ni->ni_healthv);
3035
3036                 lnet_net_lock(0);
3037                 lnet_ni_lock(ni);
3038                 if (ni->ni_state != LNET_NI_STATE_ACTIVE ||
3039                     healthv == LNET_MAX_HEALTH_VALUE) {
3040                         list_del_init(&ni->ni_recovery);
3041                         lnet_unlink_ni_recovery_mdh_locked(ni, 0, false);
3042                         lnet_ni_unlock(ni);
3043                         lnet_ni_decref_locked(ni, 0);
3044                         lnet_net_unlock(0);
3045                         continue;
3046                 }
3047
3048                 /*
3049                  * if the local NI failed recovery we must unlink the md.
3050                  * But we want to keep the local_ni on the recovery queue
3051                  * so we can continue the attempts to recover it.
3052                  */
3053                 if (ni->ni_recovery_state & LNET_NI_RECOVERY_FAILED) {
3054                         lnet_unlink_ni_recovery_mdh_locked(ni, 0, true);
3055                         ni->ni_recovery_state &= ~LNET_NI_RECOVERY_FAILED;
3056                 }
3057
3058                 lnet_ni_unlock(ni);
3059                 lnet_net_unlock(0);
3060
3061
3062                 CDEBUG(D_NET, "attempting to recover local ni: %s\n",
3063                        libcfs_nid2str(ni->ni_nid));
3064
3065                 lnet_ni_lock(ni);
3066                 if (!(ni->ni_recovery_state & LNET_NI_RECOVERY_PENDING)) {
3067                         ni->ni_recovery_state |= LNET_NI_RECOVERY_PENDING;
3068                         lnet_ni_unlock(ni);
3069
3070                         LIBCFS_ALLOC(ev_info, sizeof(*ev_info));
3071                         if (!ev_info) {
3072                                 CERROR("out of memory. Can't recover %s\n",
3073                                        libcfs_nid2str(ni->ni_nid));
3074                                 lnet_ni_lock(ni);
3075                                 ni->ni_recovery_state &=
3076                                   ~LNET_NI_RECOVERY_PENDING;
3077                                 lnet_ni_unlock(ni);
3078                                 continue;
3079                         }
3080
3081                         mdh = ni->ni_ping_mdh;
3082                         /*
3083                          * Invalidate the NI's mdh in case the NI gets
3084                          * deleted; in that case we unlink the mdh below.
3085                          */
3086                         LNetInvalidateMDHandle(&ni->ni_ping_mdh);
3087                         nid = ni->ni_nid;
3088
3089                         /*
3090                          * Remove the NI from the local queue and drop
3091                          * our reference to it while we're recovering
3092                          * it. The reason is that the NI could be
3093                          * deleted, and if we keep holding a reference
3094                          * the deletion code will enter a loop waiting
3095                          * for the reference count to drop while holding
3096                          * the ln_mutex_lock(). When we look up the peer
3097                          * to send to in lnet_select_pathway() we will
3098                          * try to take the ln_mutex_lock() as well,
3099                          * leading to a deadlock. By dropping the
3100                          * refcount and removing the NI from the list,
3101                          * we allow it to be deleted; we then use the
3102                          * cached NID to look it up again. If it's gone,
3103                          * we just continue examining the rest of the
3104                          * queue.
3105                          */
3106                         lnet_net_lock(0);
3107                         list_del_init(&ni->ni_recovery);
3108                         lnet_ni_decref_locked(ni, 0);
3109                         lnet_net_unlock(0);
3110
3111                         ev_info->mt_type = MT_TYPE_LOCAL_NI;
3112                         ev_info->mt_nid = nid;
3113                         rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
3114                                             ev_info, the_lnet.ln_mt_eqh, true);
3115                         /* lookup the nid again */
3116                         lnet_net_lock(0);
3117                         ni = lnet_nid2ni_locked(nid, 0);
3118                         if (!ni) {
3119                                 /*
3120                                  * the NI has been deleted when we dropped
3121                                  * the ref count
3122                                  */
3123                                 lnet_net_unlock(0);
3124                                 LNetMDUnlink(mdh);
3125                                 continue;
3126                         }
3127                         /*
3128                          * Same note as in lnet_recover_peer_nis(). When
3129                          * we're sending the ping, the NI is free to be
3130                          * deleted or manipulated. By this point it
3131                          * could've been added back on the recovery queue,
3132                          * and a refcount taken on it.
3133                          * So we can't just add it blindly again or we'll
3134                          * corrupt the queue. We must check under lock if
3135                          * it's not on any list and if not then add it
3136                          * to the processed list, which will eventually be
3137                          * spliced back on to the recovery queue.
3138                          */
3139                         ni->ni_ping_mdh = mdh;
3140                         if (list_empty(&ni->ni_recovery)) {
3141                                 list_add_tail(&ni->ni_recovery, &processed_list);
3142                                 lnet_ni_addref_locked(ni, 0);
3143                         }
3144                         lnet_net_unlock(0);
3145
3146                         lnet_ni_lock(ni);
3147                         if (rc)
3148                                 ni->ni_recovery_state &= ~LNET_NI_RECOVERY_PENDING;
3149                 }
3150                 lnet_ni_unlock(ni);
3151         }
3152
3153         /*
3154          * put back the remaining NIs on the ln_mt_localNIRecovq to be
3155          * reexamined in the next iteration.
3156          */
3157         list_splice_init(&processed_list, &local_queue);
3158         lnet_net_lock(0);
3159         list_splice(&local_queue, &the_lnet.ln_mt_localNIRecovq);
3160         lnet_net_unlock(0);
3161 }
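
/*
 * Both recovery scans (this one and lnet_recover_peer_nis() below) use
 * the same splice-out/process/splice-back shape so that the cpt 0 lock
 * is never held across a ping.  Condensed (scan_recovery_queue() is
 * illustrative, not an LNet API):
 */
#if 0	/* illustrative sketch, not built */
static void scan_recovery_queue(struct list_head *recovq)
{
	struct list_head local;

	INIT_LIST_HEAD(&local);

	lnet_net_lock(0);
	list_splice_init(recovq, &local);	/* steal the current entries */
	lnet_net_unlock(0);

	/* process &local unlocked; entries queued on recovq meanwhile
	 * are simply picked up on the next pass */

	lnet_net_lock(0);
	list_splice(&local, recovq);		/* put survivors back */
	lnet_net_unlock(0);
}
#endif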
3162
3163 static struct list_head **
3164 lnet_create_array_of_queues(void)
3165 {
3166         struct list_head **qs;
3167         struct list_head *q;
3168         int i;
3169
3170         qs = cfs_percpt_alloc(lnet_cpt_table(),
3171                               sizeof(struct list_head));
3172         if (!qs) {
3173                 CERROR("Failed to allocate queues\n");
3174                 return NULL;
3175         }
3176
3177         cfs_percpt_for_each(q, i, qs)
3178                 INIT_LIST_HEAD(q);
3179
3180         return qs;
3181 }
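
/*
 * The per-CPT queue arrays built here back both the resend queues and
 * the response trackers.  The usage pattern at the call sites in this
 * file is: allocate once, touch qs[i] only under the matching CPT lock,
 * and free with cfs_percpt_free().  Condensed (illustrative only):
 */
#if 0	/* illustrative sketch, not built */
static void percpt_queues_example(void)
{
	struct list_head **qs = lnet_create_array_of_queues();
	int i;

	if (!qs)
		return;

	cfs_cpt_for_each(i, lnet_cpt_table()) {
		lnet_net_lock(i);
		/* ... add to / drain *qs[i] under the CPT lock ... */
		lnet_net_unlock(i);
	}

	cfs_percpt_free(qs);
}
#endif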
3182
3183 static int
3184 lnet_resendqs_create(void)
3185 {
3186         struct list_head **resendqs;
3187
3188         resendqs = lnet_create_array_of_queues();
3189         if (!resendqs)
3190                 return -ENOMEM;
3191
3192         lnet_net_lock(LNET_LOCK_EX);
3193         the_lnet.ln_mt_resendqs = resendqs;
3194         lnet_net_unlock(LNET_LOCK_EX);
3195
3196         return 0;
3197 }
3198
3199 static void
3200 lnet_clean_local_ni_recoveryq(void)
3201 {
3202         struct lnet_ni *ni;
3203
3204         /* This is only called when the monitor thread has stopped */
3205         lnet_net_lock(0);
3206
3207         while (!list_empty(&the_lnet.ln_mt_localNIRecovq)) {
3208                 ni = list_entry(the_lnet.ln_mt_localNIRecovq.next,
3209                                 struct lnet_ni, ni_recovery);
3210                 list_del_init(&ni->ni_recovery);
3211                 lnet_ni_lock(ni);
3212                 lnet_unlink_ni_recovery_mdh_locked(ni, 0, true);
3213                 lnet_ni_unlock(ni);
3214                 lnet_ni_decref_locked(ni, 0);
3215         }
3216
3217         lnet_net_unlock(0);
3218 }
3219
3220 static void
3221 lnet_unlink_lpni_recovery_mdh_locked(struct lnet_peer_ni *lpni, int cpt,
3222                                      bool force)
3223 {
3224         struct lnet_handle_md recovery_mdh;
3225
3226         LNetInvalidateMDHandle(&recovery_mdh);
3227
3228         if (lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING || force) {
3229                 recovery_mdh = lpni->lpni_recovery_ping_mdh;
3230                 LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
3231         }
3232         spin_unlock(&lpni->lpni_lock);
3233         lnet_net_unlock(cpt);
3234         if (!LNetMDHandleIsInvalid(recovery_mdh))
3235                 LNetMDUnlink(recovery_mdh);
3236         lnet_net_lock(cpt);
3237         spin_lock(&lpni->lpni_lock);
3238 }
3239
3240 static void
3241 lnet_clean_peer_ni_recoveryq(void)
3242 {
3243         struct lnet_peer_ni *lpni, *tmp;
3244
3245         lnet_net_lock(LNET_LOCK_EX);
3246
3247         list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_mt_peerNIRecovq,
3248                                  lpni_recovery) {
3249                 list_del_init(&lpni->lpni_recovery);
3250                 spin_lock(&lpni->lpni_lock);
3251                 lnet_unlink_lpni_recovery_mdh_locked(lpni, LNET_LOCK_EX, true);
3252                 spin_unlock(&lpni->lpni_lock);
3253                 lnet_peer_ni_decref_locked(lpni);
3254         }
3255
3256         lnet_net_unlock(LNET_LOCK_EX);
3257 }
3258
3259 static void
3260 lnet_clean_resendqs(void)
3261 {
3262         struct lnet_msg *msg, *tmp;
3263         struct list_head msgs;
3264         int i;
3265
3266         INIT_LIST_HEAD(&msgs);
3267
3268         cfs_cpt_for_each(i, lnet_cpt_table()) {
3269                 lnet_net_lock(i);
3270                 list_splice_init(the_lnet.ln_mt_resendqs[i], &msgs);
3271                 lnet_net_unlock(i);
3272                 list_for_each_entry_safe(msg, tmp, &msgs, msg_list) {
3273                         list_del_init(&msg->msg_list);
3274                         msg->msg_no_resend = true;
3275                         lnet_finalize(msg, -ESHUTDOWN);
3276                 }
3277         }
3278
3279         cfs_percpt_free(the_lnet.ln_mt_resendqs);
3280 }
3281
3282 static void
3283 lnet_recover_peer_nis(void)
3284 {
3285         struct lnet_mt_event_info *ev_info;
3286         struct list_head processed_list;
3287         struct list_head local_queue;
3288         struct lnet_handle_md mdh;
3289         struct lnet_peer_ni *lpni;
3290         struct lnet_peer_ni *tmp;
3291         lnet_nid_t nid;
3292         int healthv;
3293         int rc;
3294
3295         INIT_LIST_HEAD(&local_queue);
3296         INIT_LIST_HEAD(&processed_list);
3297
3298         /*
3299          * Always use cpt 0 for locking across all interactions with
3300          * ln_mt_peerNIRecovq
3301          */
3302         lnet_net_lock(0);
3303         list_splice_init(&the_lnet.ln_mt_peerNIRecovq,
3304                          &local_queue);
3305         lnet_net_unlock(0);
3306
3307         list_for_each_entry_safe(lpni, tmp, &local_queue,
3308                                  lpni_recovery) {
3309                 /*
3310                  * The same protection strategy is used here as in the
3311                  * local recovery case.
3312                  */
3313                 lnet_net_lock(0);
3314                 healthv = atomic_read(&lpni->lpni_healthv);
3315                 spin_lock(&lpni->lpni_lock);
3316                 if (lpni->lpni_state & LNET_PEER_NI_DELETING ||
3317                     healthv == LNET_MAX_HEALTH_VALUE) {
3318                         list_del_init(&lpni->lpni_recovery);
3319                         lnet_unlink_lpni_recovery_mdh_locked(lpni, 0, false);
3320                         spin_unlock(&lpni->lpni_lock);
3321                         lnet_peer_ni_decref_locked(lpni);
3322                         lnet_net_unlock(0);
3323                         continue;
3324                 }
3325
3326                 /*
3327                  * If the peer NI has failed recovery we must unlink the
3328                  * md. But we want to keep the peer ni on the recovery
3329                  * queue so we can try to continue recovering it.
3330                  */
3331                 if (lpni->lpni_state & LNET_PEER_NI_RECOVERY_FAILED) {
3332                         lnet_unlink_lpni_recovery_mdh_locked(lpni, 0, true);
3333                         lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_FAILED;
3334                 }
3335
3336                 spin_unlock(&lpni->lpni_lock);
3337                 lnet_net_unlock(0);
3338
3339                 /*
3340                  * NOTE: we're racing with peer deletion from user space.
3341                  * It's possible that a peer is deleted after we check its
3342                  * state; in that case the recovery ping can create a new peer.
3343                  */
3344                 spin_lock(&lpni->lpni_lock);
3345                 if (!(lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING) &&
3346                     !(lpni->lpni_state & LNET_PEER_NI_DELETING)) {
3347                         lpni->lpni_state |= LNET_PEER_NI_RECOVERY_PENDING;
3348                         spin_unlock(&lpni->lpni_lock);
3349
3350                         LIBCFS_ALLOC(ev_info, sizeof(*ev_info));
3351                         if (!ev_info) {
3352                                 CERROR("out of memory. Can't recover %s\n",
3353                                        libcfs_nid2str(lpni->lpni_nid));
3354                                 spin_lock(&lpni->lpni_lock);
3355                                 lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
3356                                 spin_unlock(&lpni->lpni_lock);
3357                                 continue;
3358                         }
3359
3360                         /* look at the comments in lnet_recover_local_nis() */
3361                         mdh = lpni->lpni_recovery_ping_mdh;
3362                         LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
3363                         nid = lpni->lpni_nid;
3364                         lnet_net_lock(0);
3365                         list_del_init(&lpni->lpni_recovery);
3366                         lnet_peer_ni_decref_locked(lpni);
3367                         lnet_net_unlock(0);
3368
3369                         ev_info->mt_type = MT_TYPE_PEER_NI;
3370                         ev_info->mt_nid = nid;
3371                         rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
3372                                             ev_info, the_lnet.ln_mt_eqh, true);
3373                         lnet_net_lock(0);
3374                         /*
3375                          * lnet_find_peer_ni_locked() grabs a refcount for
3376                          * us. No need to take it explicitly.
3377                          */
3378                         lpni = lnet_find_peer_ni_locked(nid);
3379                         if (!lpni) {
3380                                 lnet_net_unlock(0);
3381                                 LNetMDUnlink(mdh);
3382                                 continue;
3383                         }
3384
3385                         lpni->lpni_recovery_ping_mdh = mdh;
3386                         /*
3387                          * While we were unlocked the lpni could have been
3388                          * re-added to the recovery queue. In that case we
3389                          * don't need to add it to the local queue, since
3390                          * it's already on there, and the thread that added
3391                          * it will have taken a refcount on the peer; that
3392                          * means we must drop the reference implicitly
3393                          * grabbed by lnet_find_peer_ni_locked() above.
3394                          * Otherwise, if the lpni is still not on
3395                          * the recovery queue, we add it to the
3396                          * processed list.
3397                          */
3398                         if (list_empty(&lpni->lpni_recovery))
3399                                 list_add_tail(&lpni->lpni_recovery, &processed_list);
3400                         else
3401                                 lnet_peer_ni_decref_locked(lpni);
3402                         lnet_net_unlock(0);
3403
3404                         spin_lock(&lpni->lpni_lock);
3405                         if (rc)
3406                                 lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
3407                 }
3408                 spin_unlock(&lpni->lpni_lock);
3409         }
3410
3411         list_splice_init(&processed_list, &local_queue);
3412         lnet_net_lock(0);
3413         list_splice(&local_queue, &the_lnet.ln_mt_peerNIRecovq);
3414         lnet_net_unlock(0);
3415 }
3416
3417 static int
3418 lnet_monitor_thread(void *arg)
3419 {
3420         time64_t recovery_timeout = 0;
3421         time64_t rsp_timeout = 0;
3422         int interval;
3423         time64_t now;
3424
3425         /*
3426          * The monitor thread takes care of the following:
3427          *  1. Checking the aliveness of routers.
3428          *  2. Resending any messages queued on the resend
3429          *     queues.
3430          *  3. Pinging any NIs queued on the local NI recovery
3431          *     queue.
3432          *  4. Pinging any peer NIs queued on the remote peer NI
3433          *     recovery queue.
3434          */
3435         cfs_block_allsigs();
3436
3437         while (the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING) {
3438                 now = ktime_get_real_seconds();
3439
3440                 if (lnet_router_checker_active())
3441                         lnet_check_routers();
3442
3443                 lnet_resend_pending_msgs();
3444
3445                 if (now >= rsp_timeout) {
3446                         lnet_finalize_expired_responses(false);
3447                         rsp_timeout = now + (lnet_transaction_timeout / 2);
3448                 }
3449
3450                 if (now >= recovery_timeout) {
3451                         lnet_recover_local_nis();
3452                         lnet_recover_peer_nis();
3453                         recovery_timeout = now + lnet_recovery_interval;
3454                 }
3455
3456                 /*
3457                  * TODO: do we need to check whether we should sleep
3458                  * without a timeout?  Technically, an active system will
3459                  * always have messages in flight, so this check would
3460                  * always evaluate to false. And on an idle system, does
3461                  * it matter if we wake up every second? That said, there
3462                  * have been complaints about an idle thread waking up
3463                  * unnecessarily.
3464                  *
3465                  * The wakeup interval accounts for the current net count
3466                  * because the alive-router check may need to cover as
3467                  * many networks as are configured.
3468                  */
3469                 interval = min(lnet_recovery_interval,
3470                                min((unsigned int) alive_router_check_interval /
3471                                         lnet_current_net_count,
3472                                    lnet_transaction_timeout / 2));
3473                 wait_event_interruptible_timeout(the_lnet.ln_mt_waitq,
3474                                                 false,
3475                                                 cfs_time_seconds(interval));
3476         }
3477
3478         /* Shutting down */
3479         lnet_net_lock(LNET_LOCK_EX);
3480         the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
3481         lnet_net_unlock(LNET_LOCK_EX);
3482
3483         /* signal that the monitor thread is exiting */
3484         up(&the_lnet.ln_mt_signal);
3485
3486         return 0;
3487 }
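
/*
 * Worked example of the wakeup interval above, assuming the stock
 * module-parameter defaults of lnet_recovery_interval = 1,
 * alive_router_check_interval = 60 and lnet_transaction_timeout = 50,
 * on a node with 4 configured nets (values illustrative):
 *
 *   interval = min(1, min(60 / 4, 50 / 2)) = min(1, min(15, 25)) = 1
 *
 * i.e. the recovery interval is normally the limiting term and the
 * thread wakes roughly once a second.
 */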
3488
3489 /*
3490  * lnet_send_ping
3491  * Sends a ping to the specified NID.
3492  * Returns 0 on success.
3493  * Returns > 0 if the ping buffer allocation or LNetMDBind fails.
3494  * Returns < 0 if the destination NID is invalid or LNetGet fails.
3495  */
3496 int
3497 lnet_send_ping(lnet_nid_t dest_nid,
3498                struct lnet_handle_md *mdh, int nnis,
3499                void *user_data, struct lnet_handle_eq eqh, bool recovery)
3500 {
3501         struct lnet_md md = { NULL };
3502         struct lnet_process_id id;
3503         struct lnet_ping_buffer *pbuf;
3504         int rc;
3505
3506         if (dest_nid == LNET_NID_ANY) {
3507                 rc = -EHOSTUNREACH;
3508                 goto fail_error;
3509         }
3510
3511         pbuf = lnet_ping_buffer_alloc(nnis, GFP_NOFS);
3512         if (!pbuf) {
3513                 rc = ENOMEM; /* positive by design, see convention above */
3514                 goto fail_error;
3515         }
3516
3517         /* initialize md content */
3518         md.start     = &pbuf->pb_info;
3519         md.length    = LNET_PING_INFO_SIZE(nnis);
3520         md.threshold = 2; /* GET/REPLY */
3521         md.max_size  = 0;
3522         md.options   = LNET_MD_TRUNCATE;
3523         md.user_ptr  = user_data;
3524         md.eq_handle = eqh;
3525
3526         rc = LNetMDBind(md, LNET_UNLINK, mdh);
3527         if (rc) {
3528                 lnet_ping_buffer_decref(pbuf);
3529                 CERROR("Can't bind MD: %d\n", rc);
3530                 rc = -rc; /* change the rc to positive */
3531                 goto fail_error;
3532         }
3533         id.pid = LNET_PID_LUSTRE;
3534         id.nid = dest_nid;
3535
3536         rc = LNetGet(LNET_NID_ANY, *mdh, id,
3537                      LNET_RESERVED_PORTAL,
3538                      LNET_PROTO_PING_MATCHBITS, 0, recovery);
3539
3540         if (rc)
3541                 goto fail_unlink_md;
3542
3543         return 0;
3544
3545 fail_unlink_md:
3546         LNetMDUnlink(*mdh);
3547         LNetInvalidateMDHandle(mdh);
3548 fail_error:
3549         return rc;
3550 }
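
/*
 * Typical call-site shape for lnet_send_ping(), condensed from
 * lnet_recover_local_nis() above (ping_nid_once() is illustrative, not
 * an LNet API):
 */
#if 0	/* illustrative sketch, not built */
static int ping_nid_once(lnet_nid_t nid, void *user_data)
{
	struct lnet_handle_md mdh;
	int rc;

	LNetInvalidateMDHandle(&mdh);
	rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN, user_data,
			    the_lnet.ln_mt_eqh, true);
	/* rc > 0: the MD was never bound, nothing to unlink;
	 * rc < 0: lnet_send_ping() already unlinked or never bound mdh;
	 * rc == 0: completion arrives via the EQ handler */
	return rc;
}
#endif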
3551
3552 static void
3553 lnet_handle_recovery_reply(struct lnet_mt_event_info *ev_info,
3554                            int status, bool unlink_event)
3555 {
3556         lnet_nid_t nid = ev_info->mt_nid;
3557
3558         if (ev_info->mt_type == MT_TYPE_LOCAL_NI) {
3559                 struct lnet_ni *ni;
3560
3561                 lnet_net_lock(0);
3562                 ni = lnet_nid2ni_locked(nid, 0);
3563                 if (!ni) {
3564                         lnet_net_unlock(0);
3565                         return;
3566                 }
3567                 lnet_ni_lock(ni);
3568                 ni->ni_recovery_state &= ~LNET_NI_RECOVERY_PENDING;
3569                 if (status)
3570                         ni->ni_recovery_state |= LNET_NI_RECOVERY_FAILED;
3571                 lnet_ni_unlock(ni);
3572                 lnet_net_unlock(0);
3573
3574                 if (status != 0) {
3575                         CERROR("local NI (%s) recovery failed with %d\n",
3576                                libcfs_nid2str(nid), status);
3577                         return;
3578                 }
3579                 /*
3580                  * We need to increment healthv for the ni here, because
3581                  * in the lnet_finalize() path we don't have access to
3582                  * this NI; to get access to it we'd need to carry
3583                  * forward too much information.
3584                  * In the peer case it will naturally be incremented.
3585                  */
3586                 if (!unlink_event)
3587                         lnet_inc_healthv(&ni->ni_healthv);
3588         } else {
3589                 struct lnet_peer_ni *lpni;
3590                 int cpt;
3591
3592                 cpt = lnet_net_lock_current();
3593                 lpni = lnet_find_peer_ni_locked(nid);
3594                 if (!lpni) {
3595                         lnet_net_unlock(cpt);
3596                         return;
3597                 }
3598                 spin_lock(&lpni->lpni_lock);
3599                 lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
3600                 if (status)
3601                         lpni->lpni_state |= LNET_PEER_NI_RECOVERY_FAILED;
3602                 spin_unlock(&lpni->lpni_lock);
3603                 lnet_peer_ni_decref_locked(lpni);
3604                 lnet_net_unlock(cpt);
3605
3606                 if (status != 0)
3607                         CERROR("peer NI (%s) recovery failed with %d\n",
3608                                libcfs_nid2str(nid), status);
3609         }
3610 }
3611
3612 void
3613 lnet_mt_event_handler(struct lnet_event *event)
3614 {
3615         struct lnet_mt_event_info *ev_info = event->md.user_ptr;
3616         struct lnet_ping_buffer *pbuf;
3617
3618         /* TODO: remove assert */
3619         LASSERT(event->type == LNET_EVENT_REPLY ||
3620                 event->type == LNET_EVENT_SEND ||
3621                 event->type == LNET_EVENT_UNLINK);
3622
3623         CDEBUG(D_NET, "Received event: %d status: %d\n", event->type,
3624                event->status);
3625
3626         switch (event->type) {
3627         case LNET_EVENT_UNLINK:
3628                 CDEBUG(D_NET, "%s recovery ping unlinked\n",
3629                        libcfs_nid2str(ev_info->mt_nid)); /* fall through */
3630         case LNET_EVENT_REPLY:
3631                 lnet_handle_recovery_reply(ev_info, event->status,
3632                                            event->type == LNET_EVENT_UNLINK);
3633                 break;
3634         case LNET_EVENT_SEND:
3635                 CDEBUG(D_NET, "%s recovery message sent %s:%d\n",
3636                                libcfs_nid2str(ev_info->mt_nid),
3637                                (event->status) ? "unsuccessfully" :
3638                                "successfully", event->status);
3639                 break;
3640         default:
3641                 CERROR("Unexpected event: %d\n", event->type);
3642                 break;
3643         }
3644         if (event->unlinked) {
3645                 LIBCFS_FREE(ev_info, sizeof(*ev_info));
3646                 pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start);
3647                 lnet_ping_buffer_decref(pbuf);
3648         }
3649 }
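
/*
 * This handler services the monitor thread's EQ (the_lnet.ln_mt_eqh).
 * A sketch of the hookup, assuming the usual callback-EQ idiom of
 * LNetEQAlloc() (illustrative only; the EQ is actually allocated during
 * LNet startup):
 */
#if 0	/* illustrative sketch, not built */
static int mt_eq_setup(void)
{
	/* count 0: callback-only EQ, events are dispatched to the
	 * handler rather than queued */
	return LNetEQAlloc(0, lnet_mt_event_handler, &the_lnet.ln_mt_eqh);
}
#endif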
3650
3651 static int
3652 lnet_rsp_tracker_create(void)
3653 {
3654         struct list_head **rstqs;
3655
3656         rstqs = lnet_create_array_of_queues();
3657         if (!rstqs)
3658                 return -ENOMEM;
3659
3660         the_lnet.ln_mt_rstq = rstqs;
3661
3662         return 0;
3663 }
3664
3665 static void
3666 lnet_rsp_tracker_clean(void)
3667 {
3668         lnet_finalize_expired_responses(true);
3669
3670         cfs_percpt_free(the_lnet.ln_mt_rstq);
3671         the_lnet.ln_mt_rstq = NULL;
3672 }
3673
3674 int lnet_monitor_thr_start(void)
3675 {
3676         int rc = 0;
3677         struct task_struct *task;
3678
3679         if (the_lnet.ln_mt_state != LNET_MT_STATE_SHUTDOWN)
3680                 return -EALREADY;
3681
3682         rc = lnet_resendqs_create();
3683         if (rc)
3684                 return rc;
3685
3686         rc = lnet_rsp_tracker_create();
3687         if (rc)
3688                 goto clean_queues;
3689
3690         sema_init(&the_lnet.ln_mt_signal, 0);
3691
3692         lnet_net_lock(LNET_LOCK_EX);
3693         the_lnet.ln_mt_state = LNET_MT_STATE_RUNNING;
3694         lnet_net_unlock(LNET_LOCK_EX);
3695         task = kthread_run(lnet_monitor_thread, NULL, "monitor_thread");
3696         if (IS_ERR(task)) {
3697                 rc = PTR_ERR(task);
3698                 CERROR("Can't start monitor thread: %d\n", rc);
3699                 goto clean_thread;
3700         }
3701
3702         return 0;
3703
3704 clean_thread:
3705         lnet_net_lock(LNET_LOCK_EX);
3706         the_lnet.ln_mt_state = LNET_MT_STATE_STOPPING;
3707         lnet_net_unlock(LNET_LOCK_EX);
3708         /* block until event callback signals exit */
3709         down(&the_lnet.ln_mt_signal);
3710         /* clean up */
3711         lnet_net_lock(LNET_LOCK_EX);
3712         the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
3713         lnet_net_unlock(LNET_LOCK_EX);
3714         lnet_rsp_tracker_clean();
3715         lnet_clean_local_ni_recoveryq();
3716         lnet_clean_peer_ni_recoveryq();
3717         lnet_clean_resendqs();
3718         LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
3719         return rc;
3720 clean_queues:
3721         lnet_rsp_tracker_clean();
3722         lnet_clean_local_ni_recoveryq();
3723         lnet_clean_peer_ni_recoveryq();
3724         lnet_clean_resendqs();
3725         return rc;
3726 }
3727
3728 void lnet_monitor_thr_stop(void)
3729 {
3730         if (the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN)
3731                 return;
3732
3733         LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING);
3734         lnet_net_lock(LNET_LOCK_EX);
3735         the_lnet.ln_mt_state = LNET_MT_STATE_STOPPING;
3736         lnet_net_unlock(LNET_LOCK_EX);
3737
3738         /* tell the monitor thread that we're shutting down */
3739         wake_up(&the_lnet.ln_mt_waitq);
3740
3741         /* block until monitor thread signals that it's done */
3742         down(&the_lnet.ln_mt_signal);
3743         LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN);
3744
3745         /* perform cleanup tasks */
3746         lnet_rsp_tracker_clean();
3747         lnet_clean_local_ni_recoveryq();
3748         lnet_clean_peer_ni_recoveryq();
3749         lnet_clean_resendqs();
3750
3751         return;
3752 }
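
/*
 * The start/stop pair above is symmetric: _start creates the resend and
 * response-tracker queues before spawning the thread, and _stop (like
 * the _start failure path) tears them down only after the thread has
 * signalled ln_mt_signal.  Usage shape (illustrative only):
 */
#if 0	/* illustrative sketch, not built */
static int mt_lifecycle_example(void)
{
	int rc = lnet_monitor_thr_start();

	if (rc)
		return rc;	/* queues already cleaned up internally */

	/* ... LNet runs: resends, recovery pings, router checks ... */

	lnet_monitor_thr_stop();	/* blocks until the thread exits */
	return 0;
}
#endif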
3753
3754 void
3755 lnet_drop_message(struct lnet_ni *ni, int cpt, void *private, unsigned int nob,
3756                   __u32 msg_type)
3757 {
3758         lnet_net_lock(cpt);
3759         lnet_incr_stats(&ni->ni_stats, msg_type, LNET_STATS_TYPE_DROP);
3760         the_lnet.ln_counters[cpt]->lct_common.lcc_drop_count++;
3761         the_lnet.ln_counters[cpt]->lct_common.lcc_drop_length += nob;
3762         lnet_net_unlock(cpt);
3763
3764         lnet_ni_recv(ni, private, NULL, 0, 0, 0, nob);
3765 }
3766
3767 static void
3768 lnet_recv_put(struct lnet_ni *ni, struct lnet_msg *msg)
3769 {
3770         struct lnet_hdr *hdr = &msg->msg_hdr;
3771
3772         if (msg->msg_wanted != 0)
3773                 lnet_setpayloadbuffer(msg);
3774
3775         lnet_build_msg_event(msg, LNET_EVENT_PUT);
3776
3777         /* Must I ACK?  If so I'll grab the ack_wmd out of the header and put
3778          * it back into the ACK during lnet_finalize() */
3779         msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
3780                         (msg->msg_md->md_options & LNET_MD_ACK_DISABLE) == 0);
3781
3782         lnet_ni_recv(ni, msg->msg_private, msg, msg->msg_rx_delayed,
3783                      msg->msg_offset, msg->msg_wanted, hdr->payload_length);
3784 }
3785
3786 static int
3787 lnet_parse_put(struct lnet_ni *ni, struct lnet_msg *msg)
3788 {
3789         struct lnet_hdr         *hdr = &msg->msg_hdr;
3790         struct lnet_match_info  info;
3791         int                     rc;
3792         bool                    ready_delay;
3793
3794         /* Convert put fields to host byte order */
3795         hdr->msg.put.match_bits = le64_to_cpu(hdr->msg.put.match_bits);
3796         hdr->msg.put.ptl_index  = le32_to_cpu(hdr->msg.put.ptl_index);
3797         hdr->msg.put.offset     = le32_to_cpu(hdr->msg.put.offset);
3798
3799         /* Primary peer NID. */
3800         info.mi_id.nid  = msg->msg_initiator;
3801         info.mi_id.pid  = hdr->src_pid;
3802         info.mi_opc     = LNET_MD_OP_PUT;
3803         info.mi_portal  = hdr->msg.put.ptl_index;
3804         info.mi_rlength = hdr->payload_length;
3805         info.mi_roffset = hdr->msg.put.offset;
3806         info.mi_mbits   = hdr->msg.put.match_bits;
3807         info.mi_cpt     = lnet_cpt_of_nid(msg->msg_initiator, ni);
3808
3809         msg->msg_rx_ready_delay = ni->ni_net->net_lnd->lnd_eager_recv == NULL;
3810         ready_delay = msg->msg_rx_ready_delay;
3811
3812  again:
3813         rc = lnet_ptl_match_md(&info, msg);
3814         switch (rc) {
3815         default:
3816                 LBUG();
3817
3818         case LNET_MATCHMD_OK:
3819                 lnet_recv_put(ni, msg);
3820                 return 0;
3821
3822         case LNET_MATCHMD_NONE:
3823                 if (ready_delay)
3824                         /* no eager_recv or has already called it, should
3825                          * have been attached on delayed list */
3826                         return 0;
3827
3828                 rc = lnet_ni_eager_recv(ni, msg);
3829                 if (rc == 0) {
3830                         ready_delay = true;
3831                         goto again;
3832                 }
3833                 /* fall through */
3834
3835         case LNET_MATCHMD_DROP:
3836                 CNETERR("Dropping PUT from %s portal %d match %llu"
3837                         " offset %d length %d: %d\n",
3838                         libcfs_id2str(info.mi_id), info.mi_portal,
3839                         info.mi_mbits, info.mi_roffset, info.mi_rlength, rc);
3840
3841                 return -ENOENT; /* -ve: OK but no match */
3842         }
3843 }
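
/*
 * The PUT match above is a small retry loop.  On LNET_MATCHMD_NONE the
 * message is handed to the LND's eager_recv once (so the payload can be
 * buffered off the wire), then matched again; a NONE result after that
 * means the message was parked on the portal's delayed list.  Flow
 * summary (illustrative):
 *
 *   match -> OK   : receive the payload into the matched MD
 *         -> NONE : eager_recv once and rematch; thereafter NONE == parked
 *         -> DROP : return -ENOENT and let the caller drop the message
 */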
3844
3845 static int
3846 lnet_parse_get(struct lnet_ni *ni, struct lnet_msg *msg, int rdma_get)
3847 {
3848         struct lnet_match_info info;
3849         struct lnet_hdr *hdr = &msg->msg_hdr;
3850         struct lnet_process_id source_id;
3851         struct lnet_handle_wire reply_wmd;
3852         int rc;
3853
3854         /* Convert get fields to host byte order */
3855         hdr->msg.get.match_bits   = le64_to_cpu(hdr->msg.get.match_bits);
3856         hdr->msg.get.ptl_index    = le32_to_cpu(hdr->msg.get.ptl_index);
3857         hdr->msg.get.sink_length  = le32_to_cpu(hdr->msg.get.sink_length);
3858         hdr->msg.get.src_offset   = le32_to_cpu(hdr->msg.get.src_offset);
3859
3860         source_id.nid = hdr->src_nid;
3861         source_id.pid = hdr->src_pid;
3862         /* Primary peer NID */
3863         info.mi_id.nid  = msg->msg_initiator;
3864         info.mi_id.pid  = hdr->src_pid;
3865         info.mi_opc     = LNET_MD_OP_GET;
3866         info.mi_portal  = hdr->msg.get.ptl_index;
3867         info.mi_rlength = hdr->msg.get.sink_length;
3868         info.mi_roffset = hdr->msg.get.src_offset;
3869         info.mi_mbits   = hdr->msg.get.match_bits;
3870         info.mi_cpt     = lnet_cpt_of_nid(msg->msg_initiator, ni);
3871
3872         rc = lnet_ptl_match_md(&info, msg);
3873         if (rc == LNET_MATCHMD_DROP) {
3874                 CNETERR("Dropping GET from %s portal %d match %llu"
3875                         " offset %d length %d\n",
3876                         libcfs_id2str(info.mi_id), info.mi_portal,
3877                         info.mi_mbits, info.mi_roffset, info.mi_rlength);
3878                 return -ENOENT; /* -ve: OK but no match */
3879         }
3880
3881         LASSERT(rc == LNET_MATCHMD_OK);
3882
3883         lnet_build_msg_event(msg, LNET_EVENT_GET);
3884
3885         reply_wmd = hdr->msg.get.return_wmd;
3886
3887         lnet_prep_send(msg, LNET_MSG_REPLY, source_id,
3888                        msg->msg_offset, msg->msg_wanted);
3889
3890         msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;
3891
3892         if (rdma_get) {
3893                 /* The LND completes the REPLY from its recv procedure */
3894                 lnet_ni_recv(ni, msg->msg_private, msg, 0,
3895                              msg->msg_offset, msg->msg_len, msg->msg_len);
3896                 return 0;
3897         }
3898
3899         lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
3900         msg->msg_receiving = 0;
3901
3902         rc = lnet_send(ni->ni_nid, msg, msg->msg_from);
3903         if (rc < 0) {
3904                 /* didn't get as far as lnet_ni_send() */
3905                 CERROR("%s: Unable to send REPLY for GET from %s: %d\n",
3906                        libcfs_nid2str(ni->ni_nid),
3907                        libcfs_id2str(info.mi_id), rc);
3908
3909                 lnet_finalize(msg, rc);
3910         }
3911
3912         return 0;
3913 }
3914
3915 static int
3916 lnet_parse_reply(struct lnet_ni *ni, struct lnet_msg *msg)
3917 {
3918         void *private = msg->msg_private;
3919         struct lnet_hdr *hdr = &msg->msg_hdr;
3920         struct lnet_process_id src = {0};
3921         struct lnet_libmd *md;
3922         int rlength;
3923         int mlength;
3924         int cpt;
3925
3926         cpt = lnet_cpt_of_cookie(hdr->msg.reply.dst_wmd.wh_object_cookie);
3927         lnet_res_lock(cpt);
3928
3929         src.nid = hdr->src_nid;
3930         src.pid = hdr->src_pid;
3931
3932         /* NB handles only looked up by creator (no flips) */
3933         md = lnet_wire_handle2md(&hdr->msg.reply.dst_wmd);
3934         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
3935                 CNETERR("%s: Dropping REPLY from %s for %s "
3936                         "MD %#llx.%#llx\n",
3937                         libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
3938                         (md == NULL) ? "invalid" : "inactive",
3939                         hdr->msg.reply.dst_wmd.wh_interface_cookie,
3940                         hdr->msg.reply.dst_wmd.wh_object_cookie);
3941                 if (md != NULL && md->md_me != NULL)
3942                         CERROR("REPLY MD also attached to portal %d\n",
3943                                md->md_me->me_portal);
3944
3945                 lnet_res_unlock(cpt);
3946                 return -ENOENT; /* -ve: OK but no match */
3947         }
3948
3949         LASSERT(md->md_offset == 0);
3950
3951         rlength = hdr->payload_length;
3952         mlength = MIN(rlength, (int)md->md_length);
3953
3954         if (mlength < rlength &&
3955             (md->md_options & LNET_MD_TRUNCATE) == 0) {
3956                 CNETERR("%s: Dropping REPLY from %s length %d "
3957                         "for MD %#llx would overflow (%d)\n",
3958                         libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
3959                         rlength, hdr->msg.reply.dst_wmd.wh_object_cookie,
3960                         mlength);
3961                 lnet_res_unlock(cpt);
3962                 return -ENOENT; /* -ve: OK but no match */
3963         }
3964
3965         CDEBUG(D_NET, "%s: Reply from %s of length %d/%d into md %#llx\n",
3966                libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
3967                mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);
3968
3969         lnet_msg_attach_md(msg, md, 0, mlength);
3970
3971         if (mlength != 0)
3972                 lnet_setpayloadbuffer(msg);
3973
3974         lnet_res_unlock(cpt);
3975
3976         lnet_build_msg_event(msg, LNET_EVENT_REPLY);
3977
3978         lnet_ni_recv(ni, private, msg, 0, 0, mlength, rlength);
3979         return 0;
3980 }
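
/*
 * Worked example of the REPLY length check above: with a wire length
 * rlength = 1024 and a posted md->md_length = 512, mlength becomes 512;
 * the REPLY is then accepted only if the MD was posted with
 * LNET_MD_TRUNCATE, otherwise it is dropped as an overflow (values
 * illustrative).
 */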
3981
3982 static int
3983 lnet_parse_ack(struct lnet_ni *ni, struct lnet_msg *msg)
3984 {
3985         struct lnet_hdr *hdr = &msg->msg_hdr;
3986         struct lnet_process_id src = {0};
3987         struct lnet_libmd *md;
3988         int cpt;
3989
3990         src.nid = hdr->src_nid;
3991         src.pid = hdr->src_pid;
3992
3993         /* Convert ack fields to host byte order */
3994         hdr->msg.ack.match_bits = le64_to_cpu(hdr->msg.ack.match_bits);
3995         hdr->msg.ack.mlength = le32_to_cpu(hdr->msg.ack.mlength);
3996
3997         cpt = lnet_cpt_of_cookie(hdr->msg.ack.dst_wmd.wh_object_cookie);
3998         lnet_res_lock(cpt);
3999
4000         /* NB handles only looked up by creator (no flips) */
4001         md = lnet_wire_handle2md(&hdr->msg.ack.dst_wmd);
4002         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
4003                 /* Don't moan; this is expected */
4004                 CDEBUG(D_NET,
4005                        "%s: Dropping ACK from %s to %s MD %#llx.%#llx\n",
4006                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
4007                        (md == NULL) ? "invalid" : "inactive",
4008                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
4009                        hdr->msg.ack.dst_wmd.wh_object_cookie);
4010                 if (md != NULL && md->md_me != NULL)
4011                         CERROR("Source MD also attached to portal %d\n",
4012                                md->md_me->me_portal);
4013
4014                 lnet_res_unlock(cpt);
4015                 return -ENOENT;                  /* -ve! */
4016         }
4017
4018         CDEBUG(D_NET, "%s: ACK from %s into md %#llx\n",
4019                libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
4020                hdr->msg.ack.dst_wmd.wh_object_cookie);
4021
4022         lnet_msg_attach_md(msg, md, 0, 0);
4023
4024         lnet_res_unlock(cpt);
4025
4026         lnet_build_msg_event(msg, LNET_EVENT_ACK);
4027
4028         lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
4029         return 0;
4030 }
4031
4032 /**
4033  * \retval LNET_CREDIT_OK       If \a msg is forwarded
4034  * \retval LNET_CREDIT_WAIT     If \a msg is blocked waiting for a buffer or credit
4035  * \retval -ve                  error code
4036  */
4037 int
4038 lnet_parse_forward_locked(struct lnet_ni *ni, struct lnet_msg *msg)
4039 {
4040         int     rc = 0;
4041
4042         if (!the_lnet.ln_routing)
4043                 return -ECANCELED;
4044
4045         if (msg->msg_rxpeer->lpni_rtrcredits <= 0 ||
4046             lnet_msg2bufpool(msg)->rbp_credits <= 0) {
4047                 if (ni->ni_net->net_lnd->lnd_eager_recv == NULL) {
4048                         msg->msg_rx_ready_delay = 1;
4049                 } else {
4050                         lnet_net_unlock(msg->msg_rx_cpt);
4051                         rc = lnet_ni_eager_recv(ni, msg);
4052                         lnet_net_lock(msg->msg_rx_cpt);
4053                 }
4054         }
4055
4056         if (rc == 0)
4057                 rc = lnet_post_routed_recv_locked(msg, 0);
4058         return rc;
4059 }
4060
4061 int
4062 lnet_parse_local(struct lnet_ni *ni, struct lnet_msg *msg)
4063 {
4064         int     rc;
4065
4066         switch (msg->msg_type) {
4067         case LNET_MSG_ACK:
4068                 rc = lnet_parse_ack(ni, msg);
4069                 break;
4070         case LNET_MSG_PUT:
4071                 rc = lnet_parse_put(ni, msg);
4072                 break;
4073         case LNET_MSG_GET:
4074                 rc = lnet_parse_get(ni, msg, msg->msg_rdma_get);
4075                 break;
4076         case LNET_MSG_REPLY:
4077                 rc = lnet_parse_reply(ni, msg);
4078                 break;
4079         default: /* prevent an unused label if !kernel */
4080                 LASSERT(0);
4081                 return -EPROTO;
4082         }
4083
4084         LASSERT(rc == 0 || rc == -ENOENT);
4085         return rc;
4086 }
4087
4088 char *
4089 lnet_msgtyp2str (int type)
4090 {
4091         switch (type) {
4092         case LNET_MSG_ACK:
4093                 return ("ACK");
4094         case LNET_MSG_PUT:
4095                 return ("PUT");
4096         case LNET_MSG_GET:
4097                 return ("GET");
4098         case LNET_MSG_REPLY:
4099                 return ("REPLY");
4100         case LNET_MSG_HELLO:
4101                 return ("HELLO");
4102         default:
4103                 return ("<UNKNOWN>");
4104         }
4105 }
4106
4107 void
4108 lnet_print_hdr(struct lnet_hdr *hdr)
4109 {
4110         struct lnet_process_id src = {
4111                 .nid = hdr->src_nid,
4112                 .pid = hdr->src_pid,
4113         };
4114         struct lnet_process_id dst = {
4115                 .nid = hdr->dest_nid,
4116                 .pid = hdr->dest_pid,
4117         };
4118         char *type_str = lnet_msgtyp2str(hdr->type);
4119
4120         CWARN("P3 Header at %p of type %s\n", hdr, type_str);
4121         CWARN("    From %s\n", libcfs_id2str(src));
4122         CWARN("    To   %s\n", libcfs_id2str(dst));
4123
4124         switch (hdr->type) {
4125         default:
4126                 break;
4127
4128         case LNET_MSG_PUT:
4129                 CWARN("    Ptl index %d, ack md %#llx.%#llx, "
4130                       "match bits %llu\n",
4131                       hdr->msg.put.ptl_index,
4132                       hdr->msg.put.ack_wmd.wh_interface_cookie,
4133                       hdr->msg.put.ack_wmd.wh_object_cookie,
4134                       hdr->msg.put.match_bits);
4135                 CWARN("    Length %d, offset %d, hdr data %#llx\n",
4136                       hdr->payload_length, hdr->msg.put.offset,
4137                       hdr->msg.put.hdr_data);
4138                 break;
4139
4140         case LNET_MSG_GET:
4141                 CWARN("    Ptl index %d, return md %#llx.%#llx, "
4142                       "match bits %llu\n", hdr->msg.get.ptl_index,
4143                       hdr->msg.get.return_wmd.wh_interface_cookie,
4144                       hdr->msg.get.return_wmd.wh_object_cookie,
4145                       hdr->msg.get.match_bits);
4146                 CWARN("    Length %d, src offset %d\n",
4147                       hdr->msg.get.sink_length,
4148                       hdr->msg.get.src_offset);
4149                 break;
4150
4151         case LNET_MSG_ACK:
4152                 CWARN("    dst md %#llx.%#llx, "
4153                       "manipulated length %d\n",
4154                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
4155                       hdr->msg.ack.dst_wmd.wh_object_cookie,
4156                       hdr->msg.ack.mlength);
4157                 break;
4158
4159         case LNET_MSG_REPLY:
4160                 CWARN("    dst md %#llx.%#llx, "
4161                       "length %d\n",
4162                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
4163                       hdr->msg.reply.dst_wmd.wh_object_cookie,
4164                       hdr->payload_length);
4165         }
4166
4167 }
4168
4169 int
4170 lnet_parse(struct lnet_ni *ni, struct lnet_hdr *hdr, lnet_nid_t from_nid,
4171            void *private, int rdma_req)
4172 {
4173         struct lnet_peer_ni *lpni;
4174         struct lnet_msg *msg;
4175         __u32 payload_length;
4176         lnet_pid_t dest_pid;
4177         lnet_nid_t dest_nid;
4178         lnet_nid_t src_nid;
4179         bool push = false;
4180         int for_me;
4181         __u32 type;
4182         int rc = 0;
4183         int cpt;
4184
4185         LASSERT(!in_interrupt());
4186
4187         type = le32_to_cpu(hdr->type);
4188         src_nid = le64_to_cpu(hdr->src_nid);
4189         dest_nid = le64_to_cpu(hdr->dest_nid);
4190         dest_pid = le32_to_cpu(hdr->dest_pid);
4191         payload_length = le32_to_cpu(hdr->payload_length);
4192
4193         for_me = (ni->ni_nid == dest_nid);
4194         cpt = lnet_cpt_of_nid(from_nid, ni);
4195
4196         CDEBUG(D_NET, "TRACE: %s(%s) <- %s : %s - %s\n",
4197                 libcfs_nid2str(dest_nid),
4198                 libcfs_nid2str(ni->ni_nid),
4199                 libcfs_nid2str(src_nid),
4200                 lnet_msgtyp2str(type),
4201                 (for_me) ? "for me" : "routed");
4202
4203         switch (type) {
4204         case LNET_MSG_ACK:
4205         case LNET_MSG_GET:
4206                 if (payload_length > 0) {
4207                         CERROR("%s, src %s: bad %s payload %d (0 expected)\n",
4208                                libcfs_nid2str(from_nid),
4209                                libcfs_nid2str(src_nid),
4210                                lnet_msgtyp2str(type), payload_length);
4211                         return -EPROTO;
4212                 }
4213                 break;
4214
4215         case LNET_MSG_PUT:
4216         case LNET_MSG_REPLY:
4217                 if (payload_length >
4218                     (__u32)(for_me ? LNET_MAX_PAYLOAD : LNET_MTU)) {
4219                         CERROR("%s, src %s: bad %s payload %d "
4220                                "(%d max expected)\n",
4221                                libcfs_nid2str(from_nid),
4222                                libcfs_nid2str(src_nid),
4223                                lnet_msgtyp2str(type),
4224                                payload_length,
4225                                for_me ? LNET_MAX_PAYLOAD : LNET_MTU);
4226                         return -EPROTO;
4227                 }
4228                 break;
4229
4230         default:
4231                 CERROR("%s, src %s: Bad message type 0x%x\n",
4232                        libcfs_nid2str(from_nid),
4233                        libcfs_nid2str(src_nid), type);
4234                 return -EPROTO;
4235         }
4236
4237         if (the_lnet.ln_routing &&
4238             ni->ni_net->net_last_alive != ktime_get_real_seconds()) {
4239                 lnet_ni_lock(ni);
4240                 spin_lock(&ni->ni_net->net_lock);
4241                 ni->ni_net->net_last_alive = ktime_get_real_seconds();
4242                 spin_unlock(&ni->ni_net->net_lock);
4243                 if (ni->ni_status != NULL &&
4244                     ni->ni_status->ns_status == LNET_NI_STATUS_DOWN) {
4245                         ni->ni_status->ns_status = LNET_NI_STATUS_UP;
4246                         push = true;
4247                 }
4248                 lnet_ni_unlock(ni);
4249         }
4250
4251         if (push)
4252                 lnet_push_update_to_peers(1);
4253
4254         /* Regard a bad destination NID as a protocol error.  Senders should
4255          * know what they're doing; if they don't they're misconfigured, buggy
4256          * or malicious so we chop them off at the knees :) */
4257
4258         if (!for_me) {
4259                 if (LNET_NIDNET(dest_nid) == LNET_NIDNET(ni->ni_nid)) {
4260                         /* should have gone direct */
4261                         CERROR("%s, src %s: Bad dest nid %s "
4262                                "(should have been sent direct)\n",
4263                                 libcfs_nid2str(from_nid),
4264                                 libcfs_nid2str(src_nid),
4265                                 libcfs_nid2str(dest_nid));
4266                         return -EPROTO;
4267                 }
4268
4269                 if (lnet_islocalnid(dest_nid)) {
4270                         /* dest is another local NI; sender should have used
4271                          * this node's NID on its own network */
4272                         CERROR("%s, src %s: Bad dest nid %s "
4273                                "(it's my nid but on a different network)\n",
4274                                 libcfs_nid2str(from_nid),
4275                                 libcfs_nid2str(src_nid),
4276                                 libcfs_nid2str(dest_nid));
4277                         return -EPROTO;
4278                 }
4279
4280                 if (rdma_req && type == LNET_MSG_GET) {
4281                         CERROR("%s, src %s: Bad optimized GET for %s "
4282                                "(final destination must be me)\n",
4283                                 libcfs_nid2str(from_nid),
4284                                 libcfs_nid2str(src_nid),
4285                                 libcfs_nid2str(dest_nid));
4286                         return -EPROTO;
4287                 }
4288
4289                 if (!the_lnet.ln_routing) {
4290                         CERROR("%s, src %s: Dropping message for %s "
4291                                "(routing not enabled)\n",
4292                                 libcfs_nid2str(from_nid),
4293                                 libcfs_nid2str(src_nid),
4294                                 libcfs_nid2str(dest_nid));
4295                         goto drop;
4296                 }
4297         }
4298
4299         /* Message looks OK; we're not going to return an error, so we MUST
4300          * call back lnd_recv() come what may... */
4301
4302         if (!list_empty(&the_lnet.ln_test_peers) &&     /* normally we don't */
4303             fail_peer(src_nid, 0)) {                    /* shall we now? */
4304                 CERROR("%s, src %s: Dropping %s to simulate failure\n",
4305                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
4306                        lnet_msgtyp2str(type));
4307                 goto drop;
4308         }
4309
4310         if (!list_empty(&the_lnet.ln_drop_rules) &&
4311             lnet_drop_rule_match(hdr, ni->ni_nid, NULL)) {
4312                 CDEBUG(D_NET, "%s, src %s, dst %s: Dropping %s to simulate"
4313                               " silent message loss\n",
4314                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
4315                        libcfs_nid2str(dest_nid), lnet_msgtyp2str(type));
4316                 goto drop;
4317         }
4318
4319         if (lnet_drop_asym_route && for_me &&
4320             LNET_NIDNET(src_nid) != LNET_NIDNET(from_nid)) {
4321                 struct lnet_net *net;
4322                 struct lnet_remotenet *rnet;
4323                 bool found = true;
4324
4325                 /* we are dealing with a routed message,
4326                  * so see if route to reach src_nid goes through from_nid
4327                  */
4328                 lnet_net_lock(cpt);
4329                 net = lnet_get_net_locked(LNET_NIDNET(ni->ni_nid));
4330                 if (!net) {
4331                         lnet_net_unlock(cpt);
4332                         CERROR("net %s not found\n",
4333                                libcfs_net2str(LNET_NIDNET(ni->ni_nid)));
4334                         return -EPROTO;
4335                 }
4336
4337                 rnet = lnet_find_rnet_locked(LNET_NIDNET(src_nid));
4338                 if (rnet) {
4339                         struct lnet_peer *gw = NULL;
4340                         struct lnet_peer_ni *lpni = NULL;
4341                         struct lnet_route *route;
4342
4343                         found = false;
4344                         list_for_each_entry(route, &rnet->lrn_routes, lr_list) {
4345                                 if (route->lr_lnet != net->net_id)
4346                                         continue;
4347                                 gw = route->lr_gateway;
4348                                 /* if the nid is one of the gateway's NIDs
4349                                  * then this is a valid gateway */
4350                                 while ((lpni = lnet_get_next_peer_ni_locked(gw,
4351                                                 NULL, lpni)) != NULL) {
4352                                         if (lpni->lpni_nid == from_nid) {
4353                                                 found = true;
4354                                                 break;
4355                                         }
4356                                 }
4357                                 if (found)
4358                                         break;
4359                         }
4360                 }
4361                 lnet_net_unlock(cpt);
4362                 if (!found) {
4363                         /* we would not use from_nid to route a message to
4364                          * src_nid
4365                          * => asymmetric routing detected but forbidden
4366                          */
4367                         CERROR("%s, src %s: Dropping asymmetrical route %s\n",
4368                                libcfs_nid2str(from_nid),
4369                                libcfs_nid2str(src_nid), lnet_msgtyp2str(type));
4370                         goto drop;
4371                 }
4372         }
4373
4374         msg = lnet_msg_alloc();
4375         if (msg == NULL) {
4376                 CERROR("%s, src %s: Dropping %s (out of memory)\n",
4377                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
4378                        lnet_msgtyp2str(type));
4379                 goto drop;
4380         }
4381
4382         /* msg zeroed in lnet_msg_alloc; i.e. flags all clear,
4383          * pointers NULL etc */
4384
4385         msg->msg_type = type;
4386         msg->msg_private = private;
4387         msg->msg_receiving = 1;
4388         msg->msg_rdma_get = rdma_req;
4389         msg->msg_len = msg->msg_wanted = payload_length;
4390         msg->msg_offset = 0;
4391         msg->msg_hdr = *hdr;
4392         /* for building message event */
4393         msg->msg_from = from_nid;
4394         if (!for_me) {
4395                 msg->msg_target.pid     = dest_pid;
4396                 msg->msg_target.nid     = dest_nid;
4397                 msg->msg_routing        = 1;
4398
4399         } else {
4400                 /* convert common msg->hdr fields to host byteorder */
4401                 msg->msg_hdr.type       = type;
4402                 msg->msg_hdr.src_nid    = src_nid;
4403                 msg->msg_hdr.src_pid    = le32_to_cpu(msg->msg_hdr.src_pid);
4404                 msg->msg_hdr.dest_nid   = dest_nid;
4405                 msg->msg_hdr.dest_pid   = dest_pid;
4406                 msg->msg_hdr.payload_length = payload_length;
4407         }
4408
4409         lnet_net_lock(cpt);
4410         lpni = lnet_nid2peerni_locked(from_nid, ni->ni_nid, cpt);
4411         if (IS_ERR(lpni)) {
4412                 lnet_net_unlock(cpt);
4413                 CERROR("%s, src %s: Dropping %s "
4414                        "(error %ld looking up sender)\n",
4415                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
4416                        lnet_msgtyp2str(type), PTR_ERR(lpni));
4417                 lnet_msg_free(msg);
4418                 if (PTR_ERR(lpni) == -ESHUTDOWN)
4419                         /* We are shutting down.  Don't do anything more */
4420                         return 0;
4421                 goto drop;
4422         }
4423         msg->msg_rxpeer = lpni;
4424         msg->msg_rxni = ni;
4425         lnet_ni_addref_locked(ni, cpt);
4426         /* Multi-Rail: Primary NID of source. */
4427         msg->msg_initiator = lnet_peer_primary_nid_locked(src_nid);
4428
4429         /*
4430          * Mark the status of this lpni as UP since we have just received a
4431          * message from it. Normally the ping response reports back the
4432          * ns_status the remote marks as up or down; we cache it here.
4433          */
4434         msg->msg_rxpeer->lpni_ns_status = LNET_NI_STATUS_UP;
4435
4436         lnet_msg_commit(msg, cpt);
4437
4438         /* message delay simulation */
4439         if (unlikely(!list_empty(&the_lnet.ln_delay_rules) &&
4440                      lnet_delay_rule_match_locked(hdr, msg))) {
4441                 lnet_net_unlock(cpt);
4442                 return 0;
4443         }
4444
4445         if (!for_me) {
4446                 rc = lnet_parse_forward_locked(ni, msg);
4447                 lnet_net_unlock(cpt);
4448
4449                 if (rc < 0)
4450                         goto free_drop;
4451
4452                 if (rc == LNET_CREDIT_OK) {
4453                         lnet_ni_recv(ni, msg->msg_private, msg, 0,
4454                                      0, payload_length, payload_length);
4455                 }
4456                 return 0;
4457         }
4458
4459         lnet_net_unlock(cpt);
4460
4461         rc = lnet_parse_local(ni, msg);
4462         if (rc != 0)
4463                 goto free_drop;
4464         return 0;
4465
4466  free_drop:
4467         LASSERT(msg->msg_md == NULL);
4468         lnet_finalize(msg, rc);
4469
4470  drop:
4471         lnet_drop_message(ni, cpt, private, payload_length, type);
4472         return 0;
4473 }
4474 EXPORT_SYMBOL(lnet_parse);
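
/*
 * Illustrative sketch (example only, not part of the upstream file): how
 * an LND receive path typically hands a wire header to lnet_parse().
 * The mylnd_* name is hypothetical; real LNDs such as socklnd follow the
 * same pattern.  On success lnet_parse() owns the message and will call
 * back lnd_recv() (possibly later, if the message is delayed), so the
 * LND must keep 'private' valid until then.
 */
#if 0	/* example only */
static int
mylnd_handle_rx(struct lnet_ni *ni, struct lnet_hdr *hdr,
		lnet_nid_t from_nid, void *private)
{
	int rc;

	/* last arg 0: this is not an optimized (RDMA) GET */
	rc = lnet_parse(ni, hdr, from_nid, private, 0);
	if (rc < 0)
		return rc;	/* protocol error: drop the connection */

	return 0;		/* LNet consumed the message */
}
#endif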
4475
4476 void
4477 lnet_drop_delayed_msg_list(struct list_head *head, char *reason)
4478 {
4479         while (!list_empty(head)) {
4480                 struct lnet_process_id id = {0};
4481                 struct lnet_msg *msg;
4482
4483                 msg = list_entry(head->next, struct lnet_msg, msg_list);
4484                 list_del(&msg->msg_list);
4485
4486                 id.nid = msg->msg_hdr.src_nid;
4487                 id.pid = msg->msg_hdr.src_pid;
4488
4489                 LASSERT(msg->msg_md == NULL);
4490                 LASSERT(msg->msg_rx_delayed);
4491                 LASSERT(msg->msg_rxpeer != NULL);
4492                 LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
4493
4494                 CWARN("Dropping delayed PUT from %s portal %d match %llu"
4495                       " offset %d length %d: %s\n",
4496                       libcfs_id2str(id),
4497                       msg->msg_hdr.msg.put.ptl_index,
4498                       msg->msg_hdr.msg.put.match_bits,
4499                       msg->msg_hdr.msg.put.offset,
4500                       msg->msg_hdr.payload_length, reason);
4501
4502                 /* NB I can't drop msg's ref on msg_rxpeer until after I've
4503                  * called lnet_drop_message(), so I just hang onto msg as well
4504                  * until that's done */
4505
4506                 lnet_drop_message(msg->msg_rxni, msg->msg_rx_cpt,
4507                                   msg->msg_private, msg->msg_len,
4508                                   msg->msg_type);
4509
4510                 msg->msg_no_resend = true;
4511                 /*
4512                  * NB: the message will not generate an event because it has no
4513                  * attached MD, but we still pass an error code so that
4514                  * lnet_msg_decommit() can skip counter operations and other checks.
4515                  */
4516                 lnet_finalize(msg, -ENOENT);
4517         }
4518 }
4519
4520 void
4521 lnet_recv_delayed_msg_list(struct list_head *head)
4522 {
4523         while (!list_empty(head)) {
4524                 struct lnet_msg *msg;
4525                 struct lnet_process_id id;
4526
4527                 msg = list_entry(head->next, struct lnet_msg, msg_list);
4528                 list_del(&msg->msg_list);
4529
4530                 /* md won't disappear under me, since each msg
4531                  * holds a ref on it */
4532
4533                 id.nid = msg->msg_hdr.src_nid;
4534                 id.pid = msg->msg_hdr.src_pid;
4535
4536                 LASSERT(msg->msg_rx_delayed);
4537                 LASSERT(msg->msg_md != NULL);
4538                 LASSERT(msg->msg_rxpeer != NULL);
4539                 LASSERT(msg->msg_rxni != NULL);
4540                 LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
4541
4542                 CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
4543                        "match %llu offset %d length %d.\n",
4544                         libcfs_id2str(id), msg->msg_hdr.msg.put.ptl_index,
4545                         msg->msg_hdr.msg.put.match_bits,
4546                         msg->msg_hdr.msg.put.offset,
4547                         msg->msg_hdr.payload_length);
4548
4549                 lnet_recv_put(msg->msg_rxni, msg);
4550         }
4551 }
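
/*
 * Context sketch (example only): delayed PUTs originate from lazy
 * portals.  A server typically marks its request portal lazy so that
 * PUTs arriving before a matching MD is posted are queued rather than
 * dropped.  The helper name below is hypothetical.
 */
#if 0	/* example only */
static int
example_server_setup(int portal)
{
	int rc;

	/* queue early PUTs on this portal instead of dropping them */
	rc = LNetSetLazyPortal(portal);
	if (rc != 0)
		return rc;

	/* ... post buffers with LNetMEAttach()/LNetMDAttach(); each
	 * attach resumes matching queued PUTs via
	 * lnet_recv_delayed_msg_list(), while LNetClearLazyPortal()
	 * drops whatever is still queued via
	 * lnet_drop_delayed_msg_list() above ... */
	return 0;
}
#endif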
4552
4553 static void
4554 lnet_attach_rsp_tracker(struct lnet_rsp_tracker *rspt, int cpt,
4555                         struct lnet_libmd *md, struct lnet_handle_md mdh)
4556 {
4557         s64 timeout_ns;
4558         bool new_entry = true;
4559         struct lnet_rsp_tracker *local_rspt;
4560
4561         /*
4562          * The MD has a refcount taken by the message, so it's not going
4563          * away. The MD however can still be looked up, so we must secure
4564          * access to md_rspt_ptr by taking the res_lock.
4565          * The rspt can be accessed without protection until it is added
4566          * to the list.
4567          */
4568
4569         lnet_res_lock(cpt);
4570         local_rspt = md->md_rspt_ptr;
4571         timeout_ns = lnet_transaction_timeout * NSEC_PER_SEC;
4572         if (local_rspt != NULL) {
4573                 /*
4574                  * we already have an rspt attached to the md, so we'll
4575                  * update the deadline on that one.
4576                  */
4577                 LIBCFS_FREE(rspt, sizeof(*rspt));
4578                 new_entry = false;
4579         } else {
4580                 /* new md */
4581                 rspt->rspt_mdh = mdh;
4582                 rspt->rspt_cpt = cpt;
4583                 /* store the rspt so we can access it when we get the REPLY */
4584                 md->md_rspt_ptr = rspt;
4585                 local_rspt = rspt;
4586         }
4587         local_rspt->rspt_deadline = ktime_add_ns(ktime_get(), timeout_ns);
4588
4589         /*
4590          * Add to the list of tracked responses. Entries are added to the
4591          * tail of the list so that the oldest entries expire first.
4592          */
4593         lnet_net_lock(cpt);
4594         if (!new_entry && !list_empty(&local_rspt->rspt_on_list))
4595                 list_del_init(&local_rspt->rspt_on_list);
4596         list_add_tail(&local_rspt->rspt_on_list, the_lnet.ln_mt_rstq[cpt]);
4597         lnet_net_unlock(cpt);
4598         lnet_res_unlock(cpt);
4599 }
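
/*
 * Simplified sketch (example only): the tail-ordering above lets a scan
 * stop at the first entry whose deadline has not yet passed.  The real
 * expiry logic lives in the monitor thread's response handling; this
 * stripped-down version only shows the deadline comparison that the
 * ordering enables.
 */
#if 0	/* example only */
static void
example_expire_rspts(int cpt)
{
	struct lnet_rsp_tracker *rspt;
	struct lnet_rsp_tracker *tmp;
	ktime_t now = ktime_get();

	lnet_net_lock(cpt);
	list_for_each_entry_safe(rspt, tmp, the_lnet.ln_mt_rstq[cpt],
				 rspt_on_list) {
		/* entries after this one are strictly newer */
		if (ktime_compare(now, rspt->rspt_deadline) < 0)
			break;
		list_del_init(&rspt->rspt_on_list);
		/* ... look up the MD from rspt_mdh, detach the rspt and
		 * time out the outstanding REPLY/ACK ... */
	}
	lnet_net_unlock(cpt);
}
#endif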
4600
4601 /**
4602  * Initiate an asynchronous PUT operation.
4603  *
4604  * There are several events associated with a PUT: completion of the send on
4605  * the initiator node (LNET_EVENT_SEND), and when the send completes
4606  * successfully, the receipt of an acknowledgment (LNET_EVENT_ACK) indicating
4607  * that the operation was accepted by the target. The event LNET_EVENT_PUT is
4608  * used at the target node to indicate the completion of incoming data
4609  * delivery.
4610  *
4611  * The local events will be logged in the EQ associated with the MD pointed to
4612  * by the \a mdh handle. Using an MD without an associated EQ results in these
4613  * events being discarded. In this case, the caller must have another
4614  * mechanism (e.g., a higher level protocol) for determining when it is safe
4615  * to modify the memory region associated with the MD.
4616  *
4617  * Note that LNet does not guarantee the order of LNET_EVENT_SEND and
4618  * LNET_EVENT_ACK, though intuitively ACK should happen after SEND.
4619  *
4620  * \param self Indicates the NID of a local interface through which to send
4621  * the PUT request. Use LNET_NID_ANY to let LNet choose one by itself.
4622  * \param mdh A handle for the MD that describes the memory to be sent. The MD
4623  * must be "free floating" (See LNetMDBind()).
4624  * \param ack Controls whether an acknowledgment is requested.
4625  * Acknowledgments are only sent when they are requested by the initiating
4626  * process and the target MD enables them.
4627  * \param target A process identifier for the target process.
4628  * \param portal The index in the \a target's portal table.
4629  * \param match_bits The match bits to use for MD selection at the target
4630  * process.
4631  * \param offset The offset into the target MD (only used when the target
4632  * MD has the LNET_MD_MANAGE_REMOTE option set).
4633  * \param hdr_data 64 bits of user data that can be included in the message
4634  * header. This data is written to an event queue entry at the target if an
4635  * EQ is present on the matching MD.
4636  *
4637  * \retval  0      Success, and only in this case events will be generated
4638  * and logged to the EQ (if it exists).
4639  * \retval -EIO    Simulated failure.
4640  * \retval -ENOMEM Memory allocation failure.
4641  * \retval -ENOENT Invalid MD object.
4642  *
4643  * \see struct lnet_event::hdr_data and lnet_event_kind_t.
4644  */
4645 int
4646 LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack,
4647         struct lnet_process_id target, unsigned int portal,
4648         __u64 match_bits, unsigned int offset,
4649         __u64 hdr_data)
4650 {
4651         struct lnet_msg *msg;
4652         struct lnet_libmd *md;
4653         int cpt;
4654         int rc;
4655         struct lnet_rsp_tracker *rspt = NULL;
4656
4657         LASSERT(the_lnet.ln_refcount > 0);
4658
4659         if (!list_empty(&the_lnet.ln_test_peers) &&     /* normally we don't */
4660             fail_peer(target.nid, 1)) {                 /* shall we now? */
4661                 CERROR("Dropping PUT to %s: simulated failure\n",
4662                        libcfs_id2str(target));
4663                 return -EIO;
4664         }
4665
4666         msg = lnet_msg_alloc();
4667         if (msg == NULL) {
4668                 CERROR("Dropping PUT to %s: ENOMEM on struct lnet_msg\n",
4669                        libcfs_id2str(target));
4670                 return -ENOMEM;
4671         }
4672         msg->msg_vmflush = !!memory_pressure_get();
4673
4674         cpt = lnet_cpt_of_cookie(mdh.cookie);
4675
4676         if (ack == LNET_ACK_REQ) {
4677                 rspt = lnet_rspt_alloc(cpt);
4678                 if (!rspt) {
4679                         CERROR("Dropping PUT to %s: ENOMEM on response tracker\n", libcfs_id2str(target));
4680                         lnet_msg_free(msg);
4681                         return -ENOMEM;
4682                 }
4683                 INIT_LIST_HEAD(&rspt->rspt_on_list);
4684         }
4685
4686         lnet_res_lock(cpt);
4687
4688         md = lnet_handle2md(&mdh);
4689         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
4690                 CERROR("Dropping PUT (%llu:%d:%s): MD (%d) invalid\n",
4691                        match_bits, portal, libcfs_id2str(target),
4692                        md == NULL ? -1 : md->md_threshold);
4693                 if (md != NULL && md->md_me != NULL)
4694                         CERROR("Source MD also attached to portal %d\n",
4695                                md->md_me->me_portal);
4696                 lnet_res_unlock(cpt);
4697                 if (rspt)
4698                         LIBCFS_FREE(rspt, sizeof(*rspt));
4699                 lnet_msg_free(msg);
4700                 return -ENOENT;
4701         }
4702
4703         CDEBUG(D_NET, "LNetPut -> %s\n", libcfs_id2str(target));
4704
4705         lnet_msg_attach_md(msg, md, 0, 0);
4706
4707         lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length);
4708
4709         msg->msg_hdr.msg.put.match_bits = cpu_to_le64(match_bits);
4710         msg->msg_hdr.msg.put.ptl_index = cpu_to_le32(portal);
4711         msg->msg_hdr.msg.put.offset = cpu_to_le32(offset);
4712         msg->msg_hdr.msg.put.hdr_data = hdr_data;
4713
4714         /* NB handles only looked up by creator (no flips) */
4715         if (ack == LNET_ACK_REQ) {
4716                 msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
4717                         the_lnet.ln_interface_cookie;
4718                 msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
4719                         md->md_lh.lh_cookie;
4720         } else {
4721                 msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
4722                         LNET_WIRE_HANDLE_COOKIE_NONE;
4723                 msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
4724                         LNET_WIRE_HANDLE_COOKIE_NONE;
4725         }
4726
4727         lnet_res_unlock(cpt);
4728
4729         lnet_build_msg_event(msg, LNET_EVENT_SEND);
4730
4731         if (ack == LNET_ACK_REQ)
4732                 lnet_attach_rsp_tracker(rspt, cpt, md, mdh);
4733
4734         if (CFS_FAIL_CHECK_ORSET(CFS_FAIL_PTLRPC_OST_BULK_CB2,
4735                                  CFS_FAIL_ONCE))
4736                 rc = -EIO;
4737         else
4738                 rc = lnet_send(self, msg, LNET_NID_ANY);
4739
4740         if (rc != 0) {
4741                 CNETERR("Error sending PUT to %s: %d\n",
4742                         libcfs_id2str(target), rc);
4743                 msg->msg_no_resend = true;
4744                 lnet_finalize(msg, rc);
4745         }
4746
4747         /* completion will be signalled by an event */
4748         return 0;
4749 }
4750 EXPORT_SYMBOL(LNetPut);
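
/*
 * Usage sketch (example only, not upstream code): a minimal PUT with an
 * ACK requested.  Assumes the caller has already created an event queue
 * with LNetEQAlloc() and owns 'buf'; struct lnet_md fields follow this
 * tree (later trees replace eq_handle with an event handler).
 */
#if 0	/* example only */
static int
example_put(struct lnet_handle_eq eqh, void *buf, unsigned int len,
	    struct lnet_process_id target)
{
	struct lnet_md md = {
		.start		= buf,
		.length		= len,
		.threshold	= 2,	/* one SEND + one ACK event */
		.options	= 0,
		.user_ptr	= NULL,
		.eq_handle	= eqh,
	};
	struct lnet_handle_md mdh;
	int rc;

	rc = LNetMDBind(md, LNET_UNLINK, &mdh);	/* "free floating" MD */
	if (rc != 0)
		return rc;

	/* portal 0, match bits 0, offset 0, no user header data */
	return LNetPut(LNET_NID_ANY, mdh, LNET_ACK_REQ, target, 0, 0, 0, 0);
}
#endif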
4751
4752 /*
4753  * The LND can DMA direct to the GET md (i.e. no REPLY msg).  This
4754  * returns a msg for the LND to pass to lnet_finalize() when the sink
4755  * data has been received.
4756  *
4757  * CAVEAT EMPTOR: 'getmsg' is the original GET, which is freed when
4758  * lnet_finalize() is called on it, so the LND must call this first
4759  */
4760 struct lnet_msg *
4761 lnet_create_reply_msg(struct lnet_ni *ni, struct lnet_msg *getmsg)
4762 {
4763         struct lnet_msg *msg = lnet_msg_alloc();
4764         struct lnet_libmd *getmd = getmsg->msg_md;
4765         struct lnet_process_id peer_id = getmsg->msg_target;
4766         int cpt;
4767
4768         LASSERT(!getmsg->msg_target_is_router);
4769         LASSERT(!getmsg->msg_routing);
4770
4771         if (msg == NULL) {
4772                 CERROR("%s: Dropping REPLY from %s: can't allocate msg\n",
4773                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id));
4774                 goto drop;
4775         }
4776
4777         cpt = lnet_cpt_of_cookie(getmd->md_lh.lh_cookie);
4778         lnet_res_lock(cpt);
4779
4780         LASSERT(getmd->md_refcount > 0);
4781
4782         if (getmd->md_threshold == 0) {
4783                 CERROR("%s: Dropping REPLY from %s for inactive MD %p\n",
4784                         libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id),
4785                         getmd);
4786                 lnet_res_unlock(cpt);
4787                 goto drop;
4788         }
4789
4790         LASSERT(getmd->md_offset == 0);
4791
4792         CDEBUG(D_NET, "%s: Reply from %s md %p\n",
4793                libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
4794
4795         /* setup information for lnet_build_msg_event */
4796         msg->msg_initiator = getmsg->msg_txpeer->lpni_peer_net->lpn_peer->lp_primary_nid;
4797         msg->msg_from = peer_id.nid;
4798         msg->msg_type = LNET_MSG_GET; /* flag this msg as an "optimized" GET */
4799         msg->msg_hdr.src_nid = peer_id.nid;
4800         msg->msg_hdr.payload_length = getmd->md_length;
4801         msg->msg_receiving = 1; /* required by lnet_msg_attach_md */
4802
4803         lnet_msg_attach_md(msg, getmd, getmd->md_offset, getmd->md_length);
4804         lnet_res_unlock(cpt);
4805
4806         cpt = lnet_cpt_of_nid(peer_id.nid, ni);
4807
4808         lnet_net_lock(cpt);
4809         lnet_msg_commit(msg, cpt);
4810         lnet_net_unlock(cpt);
4811
4812         lnet_build_msg_event(msg, LNET_EVENT_REPLY);
4813
4814         return msg;
4815
4816  drop:
4817         cpt = lnet_cpt_of_nid(peer_id.nid, ni);
4818
4819         lnet_net_lock(cpt);
4820         lnet_incr_stats(&ni->ni_stats, LNET_MSG_GET, LNET_STATS_TYPE_DROP);
4821         the_lnet.ln_counters[cpt]->lct_common.lcc_drop_count++;
4822         the_lnet.ln_counters[cpt]->lct_common.lcc_drop_length +=
4823                 getmd->md_length;
4824         lnet_net_unlock(cpt);
4825
4826         if (msg != NULL)
4827                 lnet_msg_free(msg);
4828
4829         return NULL;
4830 }
4831 EXPORT_SYMBOL(lnet_create_reply_msg);
4832
4833 void
4834 lnet_set_reply_msg_len(struct lnet_ni *ni, struct lnet_msg *reply,
4835                        unsigned int len)
4836 {
4837         /* Set the REPLY length now that the RDMA that elides the REPLY
4838          * message has completed and the length is known. */
4839         LASSERT(reply != NULL);
4840         LASSERT(reply->msg_type == LNET_MSG_GET);
4841         LASSERT(reply->msg_ev.type == LNET_EVENT_REPLY);
4842
4843         /* NB I trusted my peer to RDMA.  If she tells me she's written beyond
4844          * the end of my buffer, I might as well be dead. */
4845         LASSERT(len <= reply->msg_ev.mlength);
4846
4847         reply->msg_ev.mlength = len;
4848 }
4849 EXPORT_SYMBOL(lnet_set_reply_msg_len);
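
/*
 * Flow sketch (example only): how an LND that DMAs GET data straight
 * into the sink buffer might use the two helpers above.  The mylnd_*
 * name is hypothetical and send/RDMA completion handling is compressed
 * into one function for brevity; per the caveat above,
 * lnet_create_reply_msg() must be called before 'getmsg' is finalized.
 */
#if 0	/* example only */
static void
mylnd_complete_optimized_get(struct lnet_ni *ni, struct lnet_msg *getmsg,
			     unsigned int nob)
{
	struct lnet_msg *replymsg;

	/* must be done first: finalizing 'getmsg' frees it */
	replymsg = lnet_create_reply_msg(ni, getmsg);

	lnet_finalize(getmsg, 0);		/* GET send completed */

	if (replymsg != NULL) {
		/* 'nob' is what the RDMA actually transferred */
		lnet_set_reply_msg_len(ni, replymsg, nob);
		lnet_finalize(replymsg, 0);	/* as if a REPLY arrived */
	}
}
#endif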
4850
4851 /**
4852  * Initiate an asynchronous GET operation.
4853  *
4854  * On the initiator node, an LNET_EVENT_SEND is logged when the GET request
4855  * is sent, and an LNET_EVENT_REPLY is logged when the data returned from
4856  * the target node in the REPLY has been written to the local MD.
4857  *
4858  * On the target node, an LNET_EVENT_GET is logged when the GET request
4859  * arrives and is accepted into an MD.
4860  *
4861  * \param self,target,portal,match_bits,offset See the discussion in LNetPut().
4862  * \param mdh A handle for the MD that describes the memory into which the
4863  * requested data will be received. The MD must be "free floating" (See LNetMDBind()).
4864  *
4865  * \retval  0      Success, and only in this case events will be generated
4866  * and logged to the MD's EQ (if it exists).
4867  * \retval -EIO    Simulated failure.
4868  * \retval -ENOMEM Memory allocation failure.
4869  * \retval -ENOENT Invalid MD object.
4870  */
4871 int
4872 LNetGet(lnet_nid_t self, struct lnet_handle_md mdh,
4873         struct lnet_process_id target, unsigned int portal,
4874         __u64 match_bits, unsigned int offset, bool recovery)
4875 {
4876         struct lnet_msg *msg;
4877         struct lnet_libmd *md;
4878         struct lnet_rsp_tracker *rspt;
4879         int cpt;
4880         int rc;
4881
4882         LASSERT(the_lnet.ln_refcount > 0);
4883
4884         if (!list_empty(&the_lnet.ln_test_peers) &&     /* normally we don't */
4885             fail_peer(target.nid, 1)) {                 /* shall we now? */
4886                 CERROR("Dropping GET to %s: simulated failure\n",
4887                        libcfs_id2str(target));
4888                 return -EIO;
4889         }
4891
4892         msg = lnet_msg_alloc();
4893         if (!msg) {
4894                 CERROR("Dropping GET to %s: ENOMEM on struct lnet_msg\n",
4895                        libcfs_id2str(target));
4896                 return -ENOMEM;
4897         }
4898
4899         cpt = lnet_cpt_of_cookie(mdh.cookie);
4900
4901         rspt = lnet_rspt_alloc(cpt);
4902         if (!rspt) {
4903                 CERROR("Dropping GET to %s: ENOMEM on response tracker\n", libcfs_id2str(target));
4904                 lnet_msg_free(msg);
4905                 return -ENOMEM;
4906         }
4907         INIT_LIST_HEAD(&rspt->rspt_on_list);
4908
4909         msg->msg_recovery = recovery;
4910
4911         lnet_res_lock(cpt);
4912
4913         md = lnet_handle2md(&mdh);
4914         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
4915                 CERROR("Dropping GET (%llu:%d:%s): MD (%d) invalid\n",
4916                        match_bits, portal, libcfs_id2str(target),
4917                        md == NULL ? -1 : md->md_threshold);
4918                 if (md != NULL && md->md_me != NULL)
4919                         CERROR("REPLY MD also attached to portal %d\n",
4920                                md->md_me->me_portal);
4921
4922                 lnet_res_unlock(cpt);
4923
4924                 lnet_msg_free(msg);
4925                 LIBCFS_FREE(rspt, sizeof(*rspt));
4926                 return -ENOENT;
4927         }
4928
4929         CDEBUG(D_NET, "LNetGet -> %s\n", libcfs_id2str(target));
4930
4931         lnet_msg_attach_md(msg, md, 0, 0);
4932
4933         lnet_prep_send(msg, LNET_MSG_GET, target, 0, 0);
4934
4935         msg->msg_hdr.msg.get.match_bits = cpu_to_le64(match_bits);
4936         msg->msg_hdr.msg.get.ptl_index = cpu_to_le32(portal);
4937         msg->msg_hdr.msg.get.src_offset = cpu_to_le32(offset);
4938         msg->msg_hdr.msg.get.sink_length = cpu_to_le32(md->md_length);
4939
4940         /* NB handles only looked up by creator (no flips) */
4941         msg->msg_hdr.msg.get.return_wmd.wh_interface_cookie =
4942                 the_lnet.ln_interface_cookie;
4943         msg->msg_hdr.msg.get.return_wmd.wh_object_cookie =
4944                 md->md_lh.lh_cookie;
4945
4946         lnet_res_unlock(cpt);
4947
4948         lnet_build_msg_event(msg, LNET_EVENT_SEND);
4949
4950         lnet_attach_rsp_tracker(rspt, cpt, md, mdh);
4951
4952         rc = lnet_send(self, msg, LNET_NID_ANY);
4953         if (rc < 0) {
4954                 CNETERR("Error sending GET to %s: %d\n",
4955                         libcfs_id2str(target), rc);
4956                 msg->msg_no_resend = true;
4957                 lnet_finalize(msg, rc);
4958         }
4959
4960         /* completion will be signalled by an event */
4961         return 0;
4962 }
4963 EXPORT_SYMBOL(LNetGet);
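
/*
 * Usage sketch (example only): a GET into a locally bound MD, mirroring
 * the LNetPut() example above.  EQ allocation and event handling are
 * assumed to exist elsewhere; signatures follow this tree.
 */
#if 0	/* example only */
static int
example_get(struct lnet_handle_eq eqh, void *sink, unsigned int len,
	    struct lnet_process_id target)
{
	struct lnet_md md = {
		.start		= sink,
		.length		= len,
		.threshold	= 2,	/* one SEND + one REPLY event */
		.options	= 0,
		.user_ptr	= NULL,
		.eq_handle	= eqh,
	};
	struct lnet_handle_md mdh;
	int rc;

	rc = LNetMDBind(md, LNET_UNLINK, &mdh);
	if (rc != 0)
		return rc;

	/* portal 0, match bits 0, offset 0, not a recovery ping */
	return LNetGet(LNET_NID_ANY, mdh, target, 0, 0, 0, false);
}
#endif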
4964
4965 /**
4966  * Calculate distance to node at \a dstnid.
4967  *
4968  * \param dstnid Target NID.
4969  * \param srcnidp If not NULL, NID of the local interface to reach \a dstnid
4970  * is saved here.
4971  * \param orderp If not NULL, order of the route to reach \a dstnid is saved
4972  * here.
4973  *
4974  * \retval 0 If \a dstnid belongs to a local interface and the reserved
4975  * module option local_nid_dist_zero is set (the default).
4976  * \retval positive Distance to the target NID, i.e. the number of hops plus one.
4977  * \retval -EHOSTUNREACH If \a dstnid is not reachable.
4978  */
4979 int
4980 LNetDist(lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
4981 {
4982         struct list_head        *e;
4983         struct lnet_ni *ni = NULL;
4984         struct lnet_remotenet *rnet;
4985         __u32                   dstnet = LNET_NIDNET(dstnid);
4986         int                     hops;
4987         int                     cpt;
4988         __u32                   order = 2;
4989         struct list_head        *rn_list;
4990
4991         /* if !local_nid_dist_zero, I don't return a distance of 0 ever
4992          * (when lustre sees a distance of 0, it substitutes 0@lo), so I
4993          * keep order 0 free for 0@lo and order 1 free for a local NID
4994          * match */
4995
4996         LASSERT(the_lnet.ln_refcount > 0);
4997
4998         cpt = lnet_net_lock_current();
4999
5000         while ((ni = lnet_get_next_ni_locked(NULL, ni))) {
5001                 if (ni->ni_nid == dstnid) {
5002                         if (srcnidp != NULL)
5003                                 *srcnidp = dstnid;
5004                         if (orderp != NULL) {
5005                                 if (LNET_NETTYP(LNET_NIDNET(dstnid)) == LOLND)
5006                                         *orderp = 0;
5007                                 else
5008                                         *orderp = 1;
5009                         }
5010                         lnet_net_unlock(cpt);
5011
5012                         return local_nid_dist_zero ? 0 : 1;
5013                 }
5014
5015                 if (LNET_NIDNET(ni->ni_nid) == dstnet) {
5016                         /* Check if ni was originally created in
5017                          * the current net namespace.
5018                          * If not, raise its order above 0xffff0000
5019                          * so that this NI is not preferred. */
5020                         if (!net_eq(ni->ni_net_ns, current->nsproxy->net_ns))
5021                                 order += 0xffff0000;
5022
5023                         if (srcnidp != NULL)
5024                                 *srcnidp = ni->ni_nid;
5025                         if (orderp != NULL)
5026                                 *orderp = order;
5027                         lnet_net_unlock(cpt);
5028                         return 1;
5029                 }
5030
5031                 order++;
5032         }
5033
5034         rn_list = lnet_net2rnethash(dstnet);
5035         list_for_each(e, rn_list) {
5036                 rnet = list_entry(e, struct lnet_remotenet, lrn_list);
5037
5038                 if (rnet->lrn_net == dstnet) {
5039                         struct lnet_route *route;
5040                         struct lnet_route *shortest = NULL;
5041                         __u32 shortest_hops = LNET_UNDEFINED_HOPS;
5042                         __u32 route_hops;
5043
5044                         LASSERT(!list_empty(&rnet->lrn_routes));
5045
5046                         list_for_each_entry(route, &rnet->lrn_routes,
5047                                             lr_list) {
5048                                 route_hops = route->lr_hops;
5049                                 if (route_hops == LNET_UNDEFINED_HOPS)
5050                                         route_hops = 1;
5051                                 if (shortest == NULL ||
5052                                     route_hops < shortest_hops) {
5053                                         shortest = route;
5054                                         shortest_hops = route_hops;
5055                                 }
5056                         }
5057
5058                         LASSERT(shortest != NULL);
5059                         hops = shortest_hops;
5060                         if (srcnidp != NULL) {
5061                                 struct lnet_net *net;
5062                                 net = lnet_get_net_locked(shortest->lr_lnet);
5063                                 LASSERT(net);
5064                                 ni = lnet_get_next_ni_locked(net, NULL);
5065                                 *srcnidp = ni->ni_nid;
5066                         }
5067                         if (orderp != NULL)
5068                                 *orderp = order;
5069                         lnet_net_unlock(cpt);
5070                         return hops + 1;
5071                 }
5072                 order++;
5073         }
5074
5075         lnet_net_unlock(cpt);
5076         return -EHOSTUNREACH;
5077 }
5078 EXPORT_SYMBOL(LNetDist);
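
/*
 * Usage sketch (example only): using LNetDist() to test reachability
 * and detect local peers, in the spirit of how Lustre consumes the
 * distance and order results.
 */
#if 0	/* example only */
static bool
example_peer_is_local(lnet_nid_t nid)
{
	lnet_nid_t src_nid;
	__u32 order;
	int dist;

	dist = LNetDist(nid, &src_nid, &order);
	if (dist < 0)
		return false;		/* unreachable */

	/* 0 => one of this node's own NIDs (with local_nid_dist_zero),
	 * 1 => on a directly connected network, > 1 => routed */
	return dist <= 1;
}
#endif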