/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/lnet/lib-move.c
 *
 * Data movement routines
 */

#define DEBUG_SUBSYSTEM S_LNET

#include <linux/pagemap.h>

#include <lnet/lib-lnet.h>
#include <linux/nsproxy.h>
#include <net/net_namespace.h>

extern unsigned int lnet_current_net_count;

static int local_nid_dist_zero = 1;
module_param(local_nid_dist_zero, int, 0444);
MODULE_PARM_DESC(local_nid_dist_zero, "Reserved");

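/*
 * Scratch state carried through send-path selection: the best local NI
 * and peer NI found so far, the destination and gateway peers, and the
 * CPT bookkeeping used while choosing a pathway for sd_msg.
 */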
struct lnet_send_data {
        struct lnet_ni *sd_best_ni;
        struct lnet_peer_ni *sd_best_lpni;
        struct lnet_peer_ni *sd_final_dst_lpni;
        struct lnet_peer *sd_peer;
        struct lnet_peer *sd_gw_peer;
        struct lnet_peer_ni *sd_gw_lpni;
        struct lnet_peer_net *sd_peer_net;
        struct lnet_msg *sd_msg;
        lnet_nid_t sd_dst_nid;
        lnet_nid_t sd_src_nid;
        lnet_nid_t sd_rtr_nid;
        int sd_cpt;
        int sd_md_cpt;
        __u32 sd_send_case;
};

static inline struct lnet_comm_count *
get_stats_counts(struct lnet_element_stats *stats,
                 enum lnet_stats_type stats_type)
{
        switch (stats_type) {
        case LNET_STATS_TYPE_SEND:
                return &stats->el_send_stats;
        case LNET_STATS_TYPE_RECV:
                return &stats->el_recv_stats;
        case LNET_STATS_TYPE_DROP:
                return &stats->el_drop_stats;
        default:
                CERROR("Unknown stats type\n");
        }

        return NULL;
}

void lnet_incr_stats(struct lnet_element_stats *stats,
                     enum lnet_msg_type msg_type,
                     enum lnet_stats_type stats_type)
{
        struct lnet_comm_count *counts = get_stats_counts(stats, stats_type);
        if (!counts)
                return;

        switch (msg_type) {
        case LNET_MSG_ACK:
                atomic_inc(&counts->co_ack_count);
                break;
        case LNET_MSG_PUT:
                atomic_inc(&counts->co_put_count);
                break;
        case LNET_MSG_GET:
                atomic_inc(&counts->co_get_count);
                break;
        case LNET_MSG_REPLY:
                atomic_inc(&counts->co_reply_count);
                break;
        case LNET_MSG_HELLO:
                atomic_inc(&counts->co_hello_count);
                break;
        default:
                CERROR("Unknown message type\n");
                break;
        }
}

__u32 lnet_sum_stats(struct lnet_element_stats *stats,
                     enum lnet_stats_type stats_type)
{
        struct lnet_comm_count *counts = get_stats_counts(stats, stats_type);
        if (!counts)
                return 0;

        return (atomic_read(&counts->co_ack_count) +
                atomic_read(&counts->co_put_count) +
                atomic_read(&counts->co_get_count) +
                atomic_read(&counts->co_reply_count) +
                atomic_read(&counts->co_hello_count));
}

static inline void assign_stats(struct lnet_ioctl_comm_count *msg_stats,
                                struct lnet_comm_count *counts)
{
        msg_stats->ico_get_count = atomic_read(&counts->co_get_count);
        msg_stats->ico_put_count = atomic_read(&counts->co_put_count);
        msg_stats->ico_reply_count = atomic_read(&counts->co_reply_count);
        msg_stats->ico_ack_count = atomic_read(&counts->co_ack_count);
        msg_stats->ico_hello_count = atomic_read(&counts->co_hello_count);
}

void lnet_usr_translate_stats(struct lnet_ioctl_element_msg_stats *msg_stats,
                              struct lnet_element_stats *stats)
{
        struct lnet_comm_count *counts;

        LASSERT(msg_stats);
        LASSERT(stats);

        counts = get_stats_counts(stats, LNET_STATS_TYPE_SEND);
        if (!counts)
                return;
        assign_stats(&msg_stats->im_send_stats, counts);

        counts = get_stats_counts(stats, LNET_STATS_TYPE_RECV);
        if (!counts)
                return;
        assign_stats(&msg_stats->im_recv_stats, counts);

        counts = get_stats_counts(stats, LNET_STATS_TYPE_DROP);
        if (!counts)
                return;
        assign_stats(&msg_stats->im_drop_stats, counts);
}

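/*
 * Simulate message failures for 'nid'.  A non-zero 'threshold' adds a
 * test-peer entry that fails the next 'threshold' matching messages
 * (LNET_MD_THRESH_INF fails them indefinitely); a zero 'threshold'
 * removes matching entries, or all entries for LNET_NID_ANY.
 */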
int
lnet_fail_nid(lnet_nid_t nid, unsigned int threshold)
{
        struct lnet_test_peer *tp;
        struct list_head *el;
        struct list_head *next;
        struct list_head  cull;

        /* NB: use lnet_net_lock(0) to serialize operations on test peers */
        if (threshold != 0) {
                /* Adding a new entry */
                LIBCFS_ALLOC(tp, sizeof(*tp));
                if (tp == NULL)
                        return -ENOMEM;

                tp->tp_nid = nid;
                tp->tp_threshold = threshold;

                lnet_net_lock(0);
                list_add_tail(&tp->tp_list, &the_lnet.ln_test_peers);
                lnet_net_unlock(0);
                return 0;
        }

        /* removing entries */
        INIT_LIST_HEAD(&cull);

        lnet_net_lock(0);

        list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
                tp = list_entry(el, struct lnet_test_peer, tp_list);

                if (tp->tp_threshold == 0 ||    /* needs culling anyway */
                    nid == LNET_NID_ANY ||      /* removing all entries */
                    tp->tp_nid == nid) {        /* matched this one */
                        list_del(&tp->tp_list);
                        list_add(&tp->tp_list, &cull);
                }
        }

        lnet_net_unlock(0);

        while (!list_empty(&cull)) {
                tp = list_entry(cull.next, struct lnet_test_peer, tp_list);

                list_del(&tp->tp_list);
                LIBCFS_FREE(tp, sizeof(*tp));
        }
        return 0;
}

static int
fail_peer(lnet_nid_t nid, int outgoing)
{
        struct lnet_test_peer *tp;
        struct list_head *el;
        struct list_head *next;
        struct list_head  cull;
        int               fail = 0;

        INIT_LIST_HEAD(&cull);

        /* NB: use lnet_net_lock(0) to serialize operations on test peers */
        lnet_net_lock(0);

        list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
                tp = list_entry(el, struct lnet_test_peer, tp_list);

                if (tp->tp_threshold == 0) {
                        /* zombie entry */
                        if (outgoing) {
                                /* only cull zombies on outgoing tests,
                                 * since we may be at interrupt priority on
                                 * incoming messages. */
                                list_del(&tp->tp_list);
                                list_add(&tp->tp_list, &cull);
                        }
                        continue;
                }

                if (tp->tp_nid == LNET_NID_ANY ||       /* fail every peer */
                    nid == tp->tp_nid) {                /* fail this peer */
                        fail = 1;

                        if (tp->tp_threshold != LNET_MD_THRESH_INF) {
                                tp->tp_threshold--;
                                if (outgoing &&
                                    tp->tp_threshold == 0) {
                                        /* see above */
                                        list_del(&tp->tp_list);
                                        list_add(&tp->tp_list, &cull);
                                }
                        }
                        break;
                }
        }

        lnet_net_unlock(0);

        while (!list_empty(&cull)) {
                tp = list_entry(cull.next, struct lnet_test_peer, tp_list);
                list_del(&tp->tp_list);

                LIBCFS_FREE(tp, sizeof(*tp));
        }

        return fail;
}

unsigned int
lnet_iov_nob(unsigned int niov, struct kvec *iov)
{
        unsigned int nob = 0;

        LASSERT(niov == 0 || iov != NULL);
        while (niov-- > 0)
                nob += (iov++)->iov_len;

        return (nob);
}
EXPORT_SYMBOL(lnet_iov_nob);

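/*
 * Copy 'nob' bytes from 'siov' starting at 'soffset' into 'diov'
 * starting at 'doffset'.  Both vectors must cover the requested
 * range; this is asserted while the fragments are walked.
 */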
void
lnet_copy_iov2iov(unsigned int ndiov, struct kvec *diov, unsigned int doffset,
                  unsigned int nsiov, struct kvec *siov, unsigned int soffset,
                  unsigned int nob)
{
        /* NB diov, siov are READ-ONLY */
        unsigned int  this_nob;

        if (nob == 0)
                return;

        /* skip complete frags before 'doffset' */
        LASSERT(ndiov > 0);
        while (doffset >= diov->iov_len) {
                doffset -= diov->iov_len;
                diov++;
                ndiov--;
                LASSERT(ndiov > 0);
        }

        /* skip complete frags before 'soffset' */
        LASSERT(nsiov > 0);
        while (soffset >= siov->iov_len) {
                soffset -= siov->iov_len;
                siov++;
                nsiov--;
                LASSERT(nsiov > 0);
        }

        do {
                LASSERT(ndiov > 0);
                LASSERT(nsiov > 0);
                this_nob = MIN(diov->iov_len - doffset,
                               siov->iov_len - soffset);
                this_nob = MIN(this_nob, nob);

                memcpy((char *)diov->iov_base + doffset,
                       (char *)siov->iov_base + soffset, this_nob);
                nob -= this_nob;

                if (diov->iov_len > doffset + this_nob) {
                        doffset += this_nob;
                } else {
                        diov++;
                        ndiov--;
                        doffset = 0;
                }

                if (siov->iov_len > soffset + this_nob) {
                        soffset += this_nob;
                } else {
                        siov++;
                        nsiov--;
                        soffset = 0;
                }
        } while (nob > 0);
}
EXPORT_SYMBOL(lnet_copy_iov2iov);

int
lnet_extract_iov(int dst_niov, struct kvec *dst,
                 int src_niov, struct kvec *src,
                 unsigned int offset, unsigned int len)
{
        /* Initialise 'dst' to the subset of 'src' starting at 'offset',
         * for exactly 'len' bytes, and return the number of entries.
         * NB not destructive to 'src' */
        unsigned int    frag_len;
        unsigned int    niov;

        if (len == 0)                           /* no data => */
                return (0);                     /* no frags */

        LASSERT(src_niov > 0);
        while (offset >= src->iov_len) {      /* skip initial frags */
                offset -= src->iov_len;
                src_niov--;
                src++;
                LASSERT(src_niov > 0);
        }

        niov = 1;
        for (;;) {
                LASSERT(src_niov > 0);
                LASSERT((int)niov <= dst_niov);

                frag_len = src->iov_len - offset;
                dst->iov_base = ((char *)src->iov_base) + offset;

                if (len <= frag_len) {
                        dst->iov_len = len;
                        return (niov);
                }

                dst->iov_len = frag_len;

                len -= frag_len;
                dst++;
                src++;
                niov++;
                src_niov--;
                offset = 0;
        }
}
EXPORT_SYMBOL(lnet_extract_iov);

unsigned int
lnet_kiov_nob(unsigned int niov, lnet_kiov_t *kiov)
{
        unsigned int  nob = 0;

        LASSERT(niov == 0 || kiov != NULL);
        while (niov-- > 0)
                nob += (kiov++)->kiov_len;

        return (nob);
}
EXPORT_SYMBOL(lnet_kiov_nob);

void
lnet_copy_kiov2kiov(unsigned int ndiov, lnet_kiov_t *diov, unsigned int doffset,
                    unsigned int nsiov, lnet_kiov_t *siov, unsigned int soffset,
                    unsigned int nob)
{
        /* NB diov, siov are READ-ONLY */
        unsigned int    this_nob;
        char           *daddr = NULL;
        char           *saddr = NULL;

        if (nob == 0)
                return;

        LASSERT (!in_interrupt ());

        LASSERT (ndiov > 0);
        while (doffset >= diov->kiov_len) {
                doffset -= diov->kiov_len;
                diov++;
                ndiov--;
                LASSERT(ndiov > 0);
        }

        LASSERT(nsiov > 0);
        while (soffset >= siov->kiov_len) {
                soffset -= siov->kiov_len;
                siov++;
                nsiov--;
                LASSERT(nsiov > 0);
        }

        do {
                LASSERT(ndiov > 0);
                LASSERT(nsiov > 0);
                this_nob = MIN(diov->kiov_len - doffset,
                               siov->kiov_len - soffset);
                this_nob = MIN(this_nob, nob);

                if (daddr == NULL)
                        daddr = ((char *)kmap(diov->kiov_page)) +
                                diov->kiov_offset + doffset;
                if (saddr == NULL)
                        saddr = ((char *)kmap(siov->kiov_page)) +
                                siov->kiov_offset + soffset;

                /* Vanishing risk of kmap deadlock when mapping 2 pages.
                 * However in practice at least one of the kiovs will be mapped
                 * kernel pages and the map/unmap will be NOOPs */

                memcpy (daddr, saddr, this_nob);
                nob -= this_nob;

                if (diov->kiov_len > doffset + this_nob) {
                        daddr += this_nob;
                        doffset += this_nob;
                } else {
                        kunmap(diov->kiov_page);
                        daddr = NULL;
                        diov++;
                        ndiov--;
                        doffset = 0;
                }

                if (siov->kiov_len > soffset + this_nob) {
                        saddr += this_nob;
                        soffset += this_nob;
                } else {
                        kunmap(siov->kiov_page);
                        saddr = NULL;
                        siov++;
                        nsiov--;
                        soffset = 0;
                }
        } while (nob > 0);

        if (daddr != NULL)
                kunmap(diov->kiov_page);
        if (saddr != NULL)
                kunmap(siov->kiov_page);
}
EXPORT_SYMBOL(lnet_copy_kiov2kiov);

void
lnet_copy_kiov2iov (unsigned int niov, struct kvec *iov, unsigned int iovoffset,
                    unsigned int nkiov, lnet_kiov_t *kiov, unsigned int kiovoffset,
                    unsigned int nob)
{
        /* NB iov, kiov are READ-ONLY */
        unsigned int    this_nob;
        char           *addr = NULL;

        if (nob == 0)
                return;

        LASSERT (!in_interrupt ());

        LASSERT (niov > 0);
        while (iovoffset >= iov->iov_len) {
                iovoffset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT(niov > 0);
        }

        LASSERT(nkiov > 0);
        while (kiovoffset >= kiov->kiov_len) {
                kiovoffset -= kiov->kiov_len;
                kiov++;
                nkiov--;
                LASSERT(nkiov > 0);
        }

        do {
                LASSERT(niov > 0);
                LASSERT(nkiov > 0);
                this_nob = MIN(iov->iov_len - iovoffset,
                               kiov->kiov_len - kiovoffset);
                this_nob = MIN(this_nob, nob);

                if (addr == NULL)
                        addr = ((char *)kmap(kiov->kiov_page)) +
                                kiov->kiov_offset + kiovoffset;

                memcpy((char *)iov->iov_base + iovoffset, addr, this_nob);
                nob -= this_nob;

                if (iov->iov_len > iovoffset + this_nob) {
                        iovoffset += this_nob;
                } else {
                        iov++;
                        niov--;
                        iovoffset = 0;
                }

                if (kiov->kiov_len > kiovoffset + this_nob) {
                        addr += this_nob;
                        kiovoffset += this_nob;
                } else {
                        kunmap(kiov->kiov_page);
                        addr = NULL;
                        kiov++;
                        nkiov--;
                        kiovoffset = 0;
                }

        } while (nob > 0);

        if (addr != NULL)
                kunmap(kiov->kiov_page);
}
EXPORT_SYMBOL(lnet_copy_kiov2iov);

void
lnet_copy_iov2kiov(unsigned int nkiov, lnet_kiov_t *kiov, unsigned int kiovoffset,
                   unsigned int niov, struct kvec *iov, unsigned int iovoffset,
                   unsigned int nob)
{
        /* NB kiov, iov are READ-ONLY */
        unsigned int    this_nob;
        char           *addr = NULL;

        if (nob == 0)
                return;

        LASSERT (!in_interrupt ());

        LASSERT (nkiov > 0);
        while (kiovoffset >= kiov->kiov_len) {
                kiovoffset -= kiov->kiov_len;
                kiov++;
                nkiov--;
                LASSERT(nkiov > 0);
        }

        LASSERT(niov > 0);
        while (iovoffset >= iov->iov_len) {
                iovoffset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT(niov > 0);
        }

        do {
                LASSERT(nkiov > 0);
                LASSERT(niov > 0);
                this_nob = MIN(kiov->kiov_len - kiovoffset,
                               iov->iov_len - iovoffset);
                this_nob = MIN(this_nob, nob);

                if (addr == NULL)
                        addr = ((char *)kmap(kiov->kiov_page)) +
                                kiov->kiov_offset + kiovoffset;

                memcpy (addr, (char *)iov->iov_base + iovoffset, this_nob);
                nob -= this_nob;

                if (kiov->kiov_len > kiovoffset + this_nob) {
                        addr += this_nob;
                        kiovoffset += this_nob;
                } else {
                        kunmap(kiov->kiov_page);
                        addr = NULL;
                        kiov++;
                        nkiov--;
                        kiovoffset = 0;
                }

                if (iov->iov_len > iovoffset + this_nob) {
                        iovoffset += this_nob;
                } else {
                        iov++;
                        niov--;
                        iovoffset = 0;
                }
        } while (nob > 0);

        if (addr != NULL)
                kunmap(kiov->kiov_page);
}
EXPORT_SYMBOL(lnet_copy_iov2kiov);

int
lnet_extract_kiov(int dst_niov, lnet_kiov_t *dst,
                  int src_niov, lnet_kiov_t *src,
                  unsigned int offset, unsigned int len)
{
        /* Initialise 'dst' to the subset of 'src' starting at 'offset',
         * for exactly 'len' bytes, and return the number of entries.
         * NB not destructive to 'src' */
        unsigned int    frag_len;
        unsigned int    niov;

        if (len == 0)                           /* no data => */
                return (0);                     /* no frags */

        LASSERT(src_niov > 0);
        while (offset >= src->kiov_len) {      /* skip initial frags */
                offset -= src->kiov_len;
                src_niov--;
                src++;
                LASSERT(src_niov > 0);
        }

        niov = 1;
        for (;;) {
                LASSERT(src_niov > 0);
                LASSERT((int)niov <= dst_niov);

                frag_len = src->kiov_len - offset;
                dst->kiov_page = src->kiov_page;
                dst->kiov_offset = src->kiov_offset + offset;

                if (len <= frag_len) {
                        dst->kiov_len = len;
                        LASSERT(dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
                        return niov;
                }

                dst->kiov_len = frag_len;
                LASSERT(dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);

                len -= frag_len;
                dst++;
                src++;
                niov++;
                src_niov--;
                offset = 0;
        }
}
EXPORT_SYMBOL(lnet_extract_kiov);

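/*
 * Hand a message to the NI's LND for actual receipt: 'mlen' of the
 * 'rlen' wire bytes are wanted.  The message is finalized here only
 * if the LND rejects it outright.
 */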
void
lnet_ni_recv(struct lnet_ni *ni, void *private, struct lnet_msg *msg,
             int delayed, unsigned int offset, unsigned int mlen,
             unsigned int rlen)
{
        unsigned int  niov = 0;
        struct kvec *iov = NULL;
        lnet_kiov_t  *kiov = NULL;
        int           rc;

        LASSERT (!in_interrupt ());
        LASSERT (mlen == 0 || msg != NULL);

        if (msg != NULL) {
                LASSERT(msg->msg_receiving);
                LASSERT(!msg->msg_sending);
                LASSERT(rlen == msg->msg_len);
                LASSERT(mlen <= msg->msg_len);
                LASSERT(msg->msg_offset == offset);
                LASSERT(msg->msg_wanted == mlen);

                msg->msg_receiving = 0;

                if (mlen != 0) {
                        niov = msg->msg_niov;
                        iov  = msg->msg_iov;
                        kiov = msg->msg_kiov;

                        LASSERT (niov > 0);
                        LASSERT ((iov == NULL) != (kiov == NULL));
                }
        }

        rc = (ni->ni_net->net_lnd->lnd_recv)(ni, private, msg, delayed,
                                             niov, iov, kiov, offset, mlen,
                                             rlen);
        if (rc < 0)
                lnet_finalize(msg, rc);
}

static void
lnet_setpayloadbuffer(struct lnet_msg *msg)
{
        struct lnet_libmd *md = msg->msg_md;

        LASSERT(msg->msg_len > 0);
        LASSERT(!msg->msg_routing);
        LASSERT(md != NULL);
        LASSERT(msg->msg_niov == 0);
        LASSERT(msg->msg_iov == NULL);
        LASSERT(msg->msg_kiov == NULL);

        msg->msg_niov = md->md_niov;
        if ((md->md_options & LNET_MD_KIOV) != 0)
                msg->msg_kiov = md->md_iov.kiov;
        else
                msg->msg_iov = md->md_iov.iov;
}

void
lnet_prep_send(struct lnet_msg *msg, int type, struct lnet_process_id target,
               unsigned int offset, unsigned int len)
{
        msg->msg_type = type;
        msg->msg_target = target;
        msg->msg_len = len;
        msg->msg_offset = offset;

        if (len != 0)
                lnet_setpayloadbuffer(msg);

        memset (&msg->msg_hdr, 0, sizeof (msg->msg_hdr));
        msg->msg_hdr.type           = cpu_to_le32(type);
        /* dest_nid will be overwritten by lnet_select_pathway() */
        msg->msg_hdr.dest_nid       = cpu_to_le64(target.nid);
        msg->msg_hdr.dest_pid       = cpu_to_le32(target.pid);
        /* src_nid will be set later */
        msg->msg_hdr.src_pid        = cpu_to_le32(the_lnet.ln_pid);
        msg->msg_hdr.payload_length = cpu_to_le32(len);
}
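
/* NB the wire header prepared above is little-endian; dest_nid may still
 * be overwritten and src_nid filled in once the pathway is selected. */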

static void
lnet_ni_send(struct lnet_ni *ni, struct lnet_msg *msg)
{
        void   *priv = msg->msg_private;
        int rc;

        LASSERT (!in_interrupt ());
        LASSERT (LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND ||
                 (msg->msg_txcredit && msg->msg_peertxcredit));

        rc = (ni->ni_net->net_lnd->lnd_send)(ni, priv, msg);
        if (rc < 0) {
                msg->msg_no_resend = true;
                lnet_finalize(msg, rc);
        }
}

static int
lnet_ni_eager_recv(struct lnet_ni *ni, struct lnet_msg *msg)
{
        int     rc;

        LASSERT(!msg->msg_sending);
        LASSERT(msg->msg_receiving);
        LASSERT(!msg->msg_rx_ready_delay);
        LASSERT(ni->ni_net->net_lnd->lnd_eager_recv != NULL);

        msg->msg_rx_ready_delay = 1;
        rc = (ni->ni_net->net_lnd->lnd_eager_recv)(ni, msg->msg_private, msg,
                                                  &msg->msg_private);
        if (rc != 0) {
                CERROR("recv from %s / send to %s aborted: "
                       "eager_recv failed %d\n",
                       libcfs_nid2str(msg->msg_rxpeer->lpni_nid),
                       libcfs_id2str(msg->msg_target), rc);
                LASSERT(rc < 0); /* required by my callers */
        }

        return rc;
}

/* NB: returns 1 when alive, 0 when dead, negative when error;
 *     may drop the lnet_net_lock */
static int
lnet_peer_alive_locked(struct lnet_ni *ni, struct lnet_peer_ni *lpni,
                       struct lnet_msg *msg)
{
        if (!lnet_peer_aliveness_enabled(lpni))
                return -ENODEV;

        /*
         * If we're resending a message, let's attempt to send it even if
         * the peer is down to fulfill our resend quota on the message
         */
        if (msg->msg_retry_count > 0)
                return 1;

        /* try and send recovery messages regardless */
        if (msg->msg_recovery)
                return 1;

        /* always send any responses */
        if (msg->msg_type == LNET_MSG_ACK ||
            msg->msg_type == LNET_MSG_REPLY)
                return 1;

        return lnet_is_peer_ni_alive(lpni);
}

/**
 * \param msg The message to be sent.
 * \param do_send True if lnet_ni_send() should be called in this function.
 *        lnet_send() is going to lnet_net_unlock immediately after this, so
 *        it sets do_send FALSE and I don't do the unlock/send/lock bit.
 *
 * \retval LNET_CREDIT_OK If \a msg sent or OK to send.
 * \retval LNET_CREDIT_WAIT If \a msg blocked for credit.
 * \retval -EHOSTUNREACH If the next hop of the message appears dead.
 * \retval -ECANCELED If the MD of the message has been unlinked.
 */
static int
lnet_post_send_locked(struct lnet_msg *msg, int do_send)
{
        struct lnet_peer_ni     *lp = msg->msg_txpeer;
        struct lnet_ni          *ni = msg->msg_txni;
        int                     cpt = msg->msg_tx_cpt;
        struct lnet_tx_queue    *tq = ni->ni_tx_queues[cpt];

        /* non-lnet_send() callers have checked before */
        LASSERT(!do_send || msg->msg_tx_delayed);
        LASSERT(!msg->msg_receiving);
        LASSERT(msg->msg_tx_committed);
        /* can't get here if we're sending to the loopback interface */
        LASSERT(lp->lpni_nid != the_lnet.ln_loni->ni_nid);

        /* NB 'lp' is always the next hop */
        if ((msg->msg_target.pid & LNET_PID_USERFLAG) == 0 &&
            lnet_peer_alive_locked(ni, lp, msg) == 0) {
                the_lnet.ln_counters[cpt]->lct_common.lcc_drop_count++;
                the_lnet.ln_counters[cpt]->lct_common.lcc_drop_length +=
                        msg->msg_len;
                lnet_net_unlock(cpt);
                if (msg->msg_txpeer)
                        lnet_incr_stats(&msg->msg_txpeer->lpni_stats,
                                        msg->msg_type,
                                        LNET_STATS_TYPE_DROP);
                if (msg->msg_txni)
                        lnet_incr_stats(&msg->msg_txni->ni_stats,
                                        msg->msg_type,
                                        LNET_STATS_TYPE_DROP);

                CNETERR("Dropping message for %s: peer not alive\n",
                        libcfs_id2str(msg->msg_target));
                msg->msg_health_status = LNET_MSG_STATUS_REMOTE_DROPPED;
                if (do_send)
                        lnet_finalize(msg, -EHOSTUNREACH);

                lnet_net_lock(cpt);
                return -EHOSTUNREACH;
        }

        if (msg->msg_md != NULL &&
            (msg->msg_md->md_flags & LNET_MD_FLAG_ABORTED) != 0) {
                lnet_net_unlock(cpt);

                CNETERR("Aborting message for %s: LNetM[DE]Unlink() already "
                        "called on the MD/ME.\n",
                        libcfs_id2str(msg->msg_target));
                if (do_send) {
                        msg->msg_no_resend = true;
                        CDEBUG(D_NET, "msg %p to %s canceled and will not be resent\n",
                               msg, libcfs_id2str(msg->msg_target));
                        lnet_finalize(msg, -ECANCELED);
                }

                lnet_net_lock(cpt);
                return -ECANCELED;
        }

        if (!msg->msg_peertxcredit) {
                spin_lock(&lp->lpni_lock);
                LASSERT((lp->lpni_txcredits < 0) ==
                        !list_empty(&lp->lpni_txq));

                msg->msg_peertxcredit = 1;
                lp->lpni_txqnob += msg->msg_len + sizeof(struct lnet_hdr);
                lp->lpni_txcredits--;

                if (lp->lpni_txcredits < lp->lpni_mintxcredits)
                        lp->lpni_mintxcredits = lp->lpni_txcredits;

                if (lp->lpni_txcredits < 0) {
                        msg->msg_tx_delayed = 1;
                        list_add_tail(&msg->msg_list, &lp->lpni_txq);
                        spin_unlock(&lp->lpni_lock);
                        return LNET_CREDIT_WAIT;
                }
                spin_unlock(&lp->lpni_lock);
        }

        if (!msg->msg_txcredit) {
                LASSERT((tq->tq_credits < 0) ==
                        !list_empty(&tq->tq_delayed));

                msg->msg_txcredit = 1;
                tq->tq_credits--;
                atomic_dec(&ni->ni_tx_credits);

                if (tq->tq_credits < tq->tq_credits_min)
                        tq->tq_credits_min = tq->tq_credits;

                if (tq->tq_credits < 0) {
                        msg->msg_tx_delayed = 1;
                        list_add_tail(&msg->msg_list, &tq->tq_delayed);
                        return LNET_CREDIT_WAIT;
                }
        }

        /* unset the tx_delay flag as we're going to send it now */
        msg->msg_tx_delayed = 0;

        if (do_send) {
                lnet_net_unlock(cpt);
                lnet_ni_send(ni, msg);
                lnet_net_lock(cpt);
        }
        return LNET_CREDIT_OK;
}
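
/*
 * NB two levels of credit gate a send: the peer NI's transmit credits
 * (lpni_txcredits) and the local NI's per-CPT queue credits (tq_credits).
 * A message that exhausts either is parked on lpni_txq or tq_delayed
 * respectively and re-posted when credits are returned.
 */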
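
/* Pick the router buffer pool with the smallest buffers that can still
 * hold 'msg'; the pools in ln_rtrpools are ordered by rbp_npages. */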
static struct lnet_rtrbufpool *
lnet_msg2bufpool(struct lnet_msg *msg)
{
        struct lnet_rtrbufpool  *rbp;
        int                     cpt;

        LASSERT(msg->msg_rx_committed);

        cpt = msg->msg_rx_cpt;
        rbp = &the_lnet.ln_rtrpools[cpt][0];

        LASSERT(msg->msg_len <= LNET_MTU);
        while (msg->msg_len > (unsigned int)rbp->rbp_npages * PAGE_SIZE) {
                rbp++;
                LASSERT(rbp < &the_lnet.ln_rtrpools[cpt][LNET_NRBPOOLS]);
        }

        return rbp;
}

static int
lnet_post_routed_recv_locked(struct lnet_msg *msg, int do_recv)
{
        /* lnet_parse is going to lnet_net_unlock immediately after this, so it
         * sets do_recv FALSE and I don't do the unlock/send/lock bit.
         * I return LNET_CREDIT_WAIT if msg blocked and LNET_CREDIT_OK if
         * received or OK to receive */
        struct lnet_peer_ni *lpni = msg->msg_rxpeer;
        struct lnet_peer *lp;
        struct lnet_rtrbufpool *rbp;
        struct lnet_rtrbuf *rb;

        LASSERT(msg->msg_iov == NULL);
        LASSERT(msg->msg_kiov == NULL);
        LASSERT(msg->msg_niov == 0);
        LASSERT(msg->msg_routing);
        LASSERT(msg->msg_receiving);
        LASSERT(!msg->msg_sending);
        LASSERT(lpni->lpni_peer_net);
        LASSERT(lpni->lpni_peer_net->lpn_peer);

        lp = lpni->lpni_peer_net->lpn_peer;

        /* non-lnet_parse callers only receive delayed messages */
        LASSERT(!do_recv || msg->msg_rx_delayed);

        if (!msg->msg_peerrtrcredit) {
                /* lpni_lock protects the credit manipulation */
                spin_lock(&lpni->lpni_lock);
                /* lp_lock protects the lp_rtrq */
                spin_lock(&lp->lp_lock);

                msg->msg_peerrtrcredit = 1;
                lpni->lpni_rtrcredits--;
                if (lpni->lpni_rtrcredits < lpni->lpni_minrtrcredits)
                        lpni->lpni_minrtrcredits = lpni->lpni_rtrcredits;

                if (lpni->lpni_rtrcredits < 0) {
                        /* must have checked eager_recv before here */
                        LASSERT(msg->msg_rx_ready_delay);
                        msg->msg_rx_delayed = 1;
                        list_add_tail(&msg->msg_list, &lp->lp_rtrq);
                        spin_unlock(&lp->lp_lock);
                        spin_unlock(&lpni->lpni_lock);
                        return LNET_CREDIT_WAIT;
                }
                spin_unlock(&lp->lp_lock);
                spin_unlock(&lpni->lpni_lock);
        }

        rbp = lnet_msg2bufpool(msg);

        if (!msg->msg_rtrcredit) {
                msg->msg_rtrcredit = 1;
                rbp->rbp_credits--;
                if (rbp->rbp_credits < rbp->rbp_mincredits)
                        rbp->rbp_mincredits = rbp->rbp_credits;

                if (rbp->rbp_credits < 0) {
                        /* must have checked eager_recv before here */
                        LASSERT(msg->msg_rx_ready_delay);
                        msg->msg_rx_delayed = 1;
                        list_add_tail(&msg->msg_list, &rbp->rbp_msgs);
                        return LNET_CREDIT_WAIT;
                }
        }

        LASSERT(!list_empty(&rbp->rbp_bufs));
        rb = list_entry(rbp->rbp_bufs.next, struct lnet_rtrbuf, rb_list);
        list_del(&rb->rb_list);

        msg->msg_niov = rbp->rbp_npages;
        msg->msg_kiov = &rb->rb_kiov[0];

        /* unset the msg_rx_delayed flag since we're receiving the message */
        msg->msg_rx_delayed = 0;

        if (do_recv) {
                int cpt = msg->msg_rx_cpt;

                lnet_net_unlock(cpt);
                lnet_ni_recv(msg->msg_rxni, msg->msg_private, msg, 1,
                             0, msg->msg_len, msg->msg_len);
                lnet_net_lock(cpt);
        }
        return LNET_CREDIT_OK;
}

void
lnet_return_tx_credits_locked(struct lnet_msg *msg)
{
        struct lnet_peer_ni     *txpeer = msg->msg_txpeer;
        struct lnet_ni          *txni = msg->msg_txni;
        struct lnet_msg         *msg2;

        if (msg->msg_txcredit) {
                struct lnet_ni       *ni = msg->msg_txni;
                struct lnet_tx_queue *tq = ni->ni_tx_queues[msg->msg_tx_cpt];

                /* give back NI txcredits */
                msg->msg_txcredit = 0;

                LASSERT((tq->tq_credits < 0) ==
                        !list_empty(&tq->tq_delayed));

                tq->tq_credits++;
                atomic_inc(&ni->ni_tx_credits);
                if (tq->tq_credits <= 0) {
                        msg2 = list_entry(tq->tq_delayed.next,
                                          struct lnet_msg, msg_list);
                        list_del(&msg2->msg_list);

                        LASSERT(msg2->msg_txni == ni);
                        LASSERT(msg2->msg_tx_delayed);
                        LASSERT(msg2->msg_tx_cpt == msg->msg_tx_cpt);

                        (void) lnet_post_send_locked(msg2, 1);
                }
        }

        if (msg->msg_peertxcredit) {
                /* give back peer txcredits */
                msg->msg_peertxcredit = 0;

                spin_lock(&txpeer->lpni_lock);
                LASSERT((txpeer->lpni_txcredits < 0) ==
                        !list_empty(&txpeer->lpni_txq));

                txpeer->lpni_txqnob -= msg->msg_len + sizeof(struct lnet_hdr);
                LASSERT(txpeer->lpni_txqnob >= 0);

                txpeer->lpni_txcredits++;
                if (txpeer->lpni_txcredits <= 0) {
                        int msg2_cpt;

                        msg2 = list_entry(txpeer->lpni_txq.next,
                                          struct lnet_msg, msg_list);
                        list_del(&msg2->msg_list);
                        spin_unlock(&txpeer->lpni_lock);

                        LASSERT(msg2->msg_txpeer == txpeer);
                        LASSERT(msg2->msg_tx_delayed);

                        msg2_cpt = msg2->msg_tx_cpt;

                        /*
                         * The msg cpt can be different from the msg2 cpt
                         * so we need to make sure we lock the correct cpt
                         * for msg2.
                         * Once we call lnet_post_send_locked() it is no
                         * longer safe to access msg2, since it could've
                         * been freed by lnet_finalize(), but we still
                         * need to relock the correct cpt, so we cache the
                         * msg2_cpt for the purpose of the check that
                         * follows the call to lnet_post_send_locked().
                         */
                        if (msg2_cpt != msg->msg_tx_cpt) {
                                lnet_net_unlock(msg->msg_tx_cpt);
                                lnet_net_lock(msg2_cpt);
                        }
                        (void) lnet_post_send_locked(msg2, 1);
                        if (msg2_cpt != msg->msg_tx_cpt) {
                                lnet_net_unlock(msg2_cpt);
                                lnet_net_lock(msg->msg_tx_cpt);
                        }
                } else {
                        spin_unlock(&txpeer->lpni_lock);
                }
        }

        if (txni != NULL) {
                msg->msg_txni = NULL;
                lnet_ni_decref_locked(txni, msg->msg_tx_cpt);
        }

        if (txpeer != NULL) {
                msg->msg_txpeer = NULL;
                lnet_peer_ni_decref_locked(txpeer);
        }
}

void
lnet_schedule_blocked_locked(struct lnet_rtrbufpool *rbp)
{
        struct lnet_msg *msg;

        if (list_empty(&rbp->rbp_msgs))
                return;
        msg = list_entry(rbp->rbp_msgs.next,
                         struct lnet_msg, msg_list);
        list_del(&msg->msg_list);

        (void)lnet_post_routed_recv_locked(msg, 1);
}

void
lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
{
        struct lnet_msg *msg;
        struct lnet_msg *tmp;

        lnet_net_unlock(cpt);

        list_for_each_entry_safe(msg, tmp, list, msg_list) {
                lnet_ni_recv(msg->msg_rxni, msg->msg_private, NULL,
                             0, 0, 0, msg->msg_hdr.payload_length);
                list_del_init(&msg->msg_list);
                msg->msg_no_resend = true;
                msg->msg_health_status = LNET_MSG_STATUS_REMOTE_ERROR;
                lnet_finalize(msg, -ECANCELED);
        }

        lnet_net_lock(cpt);
}

void
lnet_return_rx_credits_locked(struct lnet_msg *msg)
{
        struct lnet_peer_ni *rxpeerni = msg->msg_rxpeer;
        struct lnet_peer *lp;
        struct lnet_ni *rxni = msg->msg_rxni;
        struct lnet_msg *msg2;

        if (msg->msg_rtrcredit) {
                /* give back global router credits */
                struct lnet_rtrbuf *rb;
                struct lnet_rtrbufpool *rbp;

                /* NB If a msg ever blocks for a buffer in rbp_msgs, it stays
                 * there until it gets one allocated, or aborts the wait
                 * itself */
                LASSERT(msg->msg_kiov != NULL);

                rb = list_entry(msg->msg_kiov, struct lnet_rtrbuf, rb_kiov[0]);
                rbp = rb->rb_pool;

                msg->msg_kiov = NULL;
                msg->msg_rtrcredit = 0;

                LASSERT(rbp == lnet_msg2bufpool(msg));

                LASSERT((rbp->rbp_credits > 0) ==
                        !list_empty(&rbp->rbp_bufs));

                /* If routing is now turned off, we just drop this buffer and
                 * don't bother trying to return credits.  */
                if (!the_lnet.ln_routing) {
                        lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
                        goto routing_off;
                }

                /* It is possible that a user has lowered the desired number of
                 * buffers in this pool.  Make sure we never put back
                 * more buffers than the stated number. */
                if (unlikely(rbp->rbp_credits >= rbp->rbp_req_nbuffers)) {
                        /* Discard this buffer so we don't have too
                         * many. */
                        lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
                        rbp->rbp_nbuffers--;
                } else {
                        list_add(&rb->rb_list, &rbp->rbp_bufs);
                        rbp->rbp_credits++;
                        if (rbp->rbp_credits <= 0)
                                lnet_schedule_blocked_locked(rbp);
                }
        }

routing_off:
        if (msg->msg_peerrtrcredit) {
                LASSERT(rxpeerni);
                LASSERT(rxpeerni->lpni_peer_net);
                LASSERT(rxpeerni->lpni_peer_net->lpn_peer);

                lp = rxpeerni->lpni_peer_net->lpn_peer;

                /* give back peer router credits */
                msg->msg_peerrtrcredit = 0;

                spin_lock(&rxpeerni->lpni_lock);
                spin_lock(&lp->lp_lock);

                rxpeerni->lpni_rtrcredits++;

                /* drop all messages which are queued to be routed on that
                 * peer. */
                if (!the_lnet.ln_routing) {
                        struct list_head drop;
                        INIT_LIST_HEAD(&drop);
                        list_splice_init(&lp->lp_rtrq, &drop);
                        spin_unlock(&lp->lp_lock);
                        spin_unlock(&rxpeerni->lpni_lock);
                        lnet_drop_routed_msgs_locked(&drop, msg->msg_rx_cpt);
                } else if (!list_empty(&lp->lp_rtrq)) {
                        int msg2_cpt;

                        msg2 = list_entry(lp->lp_rtrq.next,
                                          struct lnet_msg, msg_list);
                        list_del(&msg2->msg_list);
                        msg2_cpt = msg2->msg_rx_cpt;
                        spin_unlock(&lp->lp_lock);
                        spin_unlock(&rxpeerni->lpni_lock);
                        /*
                         * messages on the lp_rtrq can be from any NID in
                         * the peer, which means they might have different
                         * cpts. We need to make sure we lock the right
                         * one.
                         */
                        if (msg2_cpt != msg->msg_rx_cpt) {
                                lnet_net_unlock(msg->msg_rx_cpt);
                                lnet_net_lock(msg2_cpt);
                        }
                        (void) lnet_post_routed_recv_locked(msg2, 1);
                        if (msg2_cpt != msg->msg_rx_cpt) {
                                lnet_net_unlock(msg2_cpt);
                                lnet_net_lock(msg->msg_rx_cpt);
                        }
                } else {
                        spin_unlock(&lp->lp_lock);
                        spin_unlock(&rxpeerni->lpni_lock);
                }
        }
        if (rxni != NULL) {
                msg->msg_rxni = NULL;
                lnet_ni_decref_locked(rxni, msg->msg_rx_cpt);
        }
        if (rxpeerni != NULL) {
                msg->msg_rxpeer = NULL;
                lnet_peer_ni_decref_locked(rxpeerni);
        }
}

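/*
 * Compare two peer NIs for transmit preference: fewer queued bytes
 * wins, then more available transmit credits.  Returns 1 if p1 is
 * preferable, -1 if p2 is, and 0 on a tie.
 */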
static int
lnet_compare_peers(struct lnet_peer_ni *p1, struct lnet_peer_ni *p2)
{
        if (p1->lpni_txqnob < p2->lpni_txqnob)
                return 1;

        if (p1->lpni_txqnob > p2->lpni_txqnob)
                return -1;

        if (p1->lpni_txcredits > p2->lpni_txcredits)
                return 1;

        if (p1->lpni_txcredits < p2->lpni_txcredits)
                return -1;

        return 0;
}

static struct lnet_peer_ni *
lnet_select_peer_ni(struct lnet_ni *best_ni, lnet_nid_t dst_nid,
                    struct lnet_peer *peer,
                    struct lnet_peer_net *peer_net)
{
        /*
         * Look at the peer NIs for the destination peer that connect
         * to the chosen net. If a peer_ni is preferred when using the
         * best_ni to communicate, we use that one. If there is no
         * preferred peer_ni, or there are multiple preferred peer_ni,
         * the available transmit credits are used. If the transmit
         * credits are equal, we round-robin over the peer_ni.
         */
        struct lnet_peer_ni *lpni = NULL;
        struct lnet_peer_ni *best_lpni = NULL;
        int best_lpni_credits = INT_MIN;
        bool preferred = false;
        bool ni_is_pref;
        int best_lpni_healthv = 0;
        int lpni_healthv;

        while ((lpni = lnet_get_next_peer_ni_locked(peer, peer_net, lpni))) {
                /*
                 * if the best_ni we've chosen already has this lpni
                 * preferred, then let's use it
                 */
                if (best_ni) {
                        ni_is_pref = lnet_peer_is_pref_nid_locked(lpni,
                                                                best_ni->ni_nid);
                        CDEBUG(D_NET, "%s ni_is_pref = %d\n",
                               libcfs_nid2str(best_ni->ni_nid), ni_is_pref);
                } else {
                        ni_is_pref = false;
                }

                lpni_healthv = atomic_read(&lpni->lpni_healthv);

                if (best_lpni)
                        CDEBUG(D_NET, "%s c:[%d, %d], s:[%d, %d]\n",
                                libcfs_nid2str(lpni->lpni_nid),
                                lpni->lpni_txcredits, best_lpni_credits,
                                lpni->lpni_seq, best_lpni->lpni_seq);

                /* pick the healthiest peer ni */
                if (lpni_healthv < best_lpni_healthv) {
                        continue;
                } else if (lpni_healthv > best_lpni_healthv) {
                        best_lpni_healthv = lpni_healthv;
                /* if this is a preferred peer use it */
                } else if (!preferred && ni_is_pref) {
                        preferred = true;
                } else if (preferred && !ni_is_pref) {
                        /*
                         * this is not the preferred peer so let's ignore
                         * it.
                         */
                        continue;
                } else if (lpni->lpni_txcredits < best_lpni_credits) {
                        /*
                         * We already have a peer that has more credits
                         * available than this one. No need to consider
                         * this peer further.
                         */
                        continue;
                } else if (lpni->lpni_txcredits == best_lpni_credits) {
                        /*
                         * The best peer found so far and the current peer
                         * have the same number of available credits, so
                         * select between them using round robin.
                         */
                        if (best_lpni) {
                                if (best_lpni->lpni_seq <= lpni->lpni_seq)
                                        continue;
                        }
                }

                best_lpni = lpni;
                best_lpni_credits = lpni->lpni_txcredits;
        }

        /* if we still can't find a peer ni then we can't reach it */
        if (!best_lpni) {
                __u32 net_id = (peer_net) ? peer_net->lpn_net_id :
                        LNET_NIDNET(dst_nid);
                CDEBUG(D_NET, "no peer_ni found on peer net %s\n",
                                libcfs_net2str(net_id));
                return NULL;
        }

        CDEBUG(D_NET, "sd_best_lpni = %s\n",
               libcfs_nid2str(best_lpni->lpni_nid));

        return best_lpni;
}

/*
 * Prerequisite: the best_ni should already be set in the sd
 */
static inline struct lnet_peer_ni *
lnet_find_best_lpni_on_net(struct lnet_send_data *sd, struct lnet_peer *peer,
                           __u32 net_id)
{
        struct lnet_peer_net *peer_net;

        /*
         * The gateway is Multi-Rail capable so now we must select the
         * proper peer_ni
         */
        peer_net = lnet_peer_get_net_locked(peer, net_id);

        if (!peer_net) {
                CERROR("gateway peer %s has no NI on net %s\n",
                       libcfs_nid2str(peer->lp_primary_nid),
                       libcfs_net2str(net_id));
                return NULL;
        }

        return lnet_select_peer_ni(sd->sd_best_ni, sd->sd_dst_nid,
                                   peer, peer_net);
}

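/*
 * Compare two routes to the same remote net.  Preference order: lower
 * priority value, then fewer hops, then the better gateway peer NI
 * (per lnet_compare_peers()), with lr_seq as the round-robin tie
 * breaker.  Returns 1 if r1 wins, -1 if r2 wins, and sets *best_lpni
 * to the chosen gateway's peer NI.
 */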
1439 static int
1440 lnet_compare_routes(struct lnet_route *r1, struct lnet_route *r2,
1441                     struct lnet_peer_ni **best_lpni)
1442 {
1443         int r1_hops = (r1->lr_hops == LNET_UNDEFINED_HOPS) ? 1 : r1->lr_hops;
1444         int r2_hops = (r2->lr_hops == LNET_UNDEFINED_HOPS) ? 1 : r2->lr_hops;
1445         struct lnet_peer *lp1 = r1->lr_gateway;
1446         struct lnet_peer *lp2 = r2->lr_gateway;
1447         struct lnet_peer_ni *lpni1;
1448         struct lnet_peer_ni *lpni2;
1449         struct lnet_send_data sd;
1450         int rc;
1451
1452         sd.sd_best_ni = NULL;
1453         sd.sd_dst_nid = LNET_NID_ANY;
1454         lpni1 = lnet_find_best_lpni_on_net(&sd, lp1, r1->lr_lnet);
1455         lpni2 = lnet_find_best_lpni_on_net(&sd, lp2, r2->lr_lnet);
1456         LASSERT(lpni1 && lpni2);
1457
1458         if (r1->lr_priority < r2->lr_priority) {
1459                 *best_lpni = lpni1;
1460                 return 1;
1461         }
1462
1463         if (r1->lr_priority > r2->lr_priority) {
1464                 *best_lpni = lpni2;
1465                 return -1;
1466         }
1467
1468         if (r1_hops < r2_hops) {
1469                 *best_lpni = lpni1;
1470                 return 1;
1471         }
1472
1473         if (r1_hops > r2_hops) {
1474                 *best_lpni = lpni2;
1475                 return -1;
1476         }
1477
1478         rc = lnet_compare_peers(lpni1, lpni2);
1479         if (rc == 1) {
1480                 *best_lpni = lpni1;
1481                 return rc;
1482         } else if (rc == -1) {
1483                 *best_lpni = lpni2;
1484                 return rc;
1485         }
1486
1487         if (r1->lr_seq - r2->lr_seq <= 0) {
1488                 *best_lpni = lpni1;
1489                 return 1;
1490         }
1491
1492         *best_lpni = lpni2;
1493         return -1;
1494 }
1495
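/*
 * Find the best live route to @remote_net. On success *gwni is set to
 * the best gateway peer_ni as chosen by lnet_compare_routes(), and
 * *prev_route points at the route with the highest sequence number,
 * which the caller uses to bump the selected route's sequence for
 * round-robin.
 */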
1496 static struct lnet_route *
1497 lnet_find_route_locked(struct lnet_net *net, __u32 remote_net,
1498                        lnet_nid_t rtr_nid, struct lnet_route **prev_route,
1499                        struct lnet_peer_ni **gwni)
1500 {
1501         struct lnet_peer_ni *best_gw_ni = NULL;
1502         struct lnet_route *best_route;
1503         struct lnet_route *last_route;
1504         struct lnet_remotenet *rnet;
1505         struct lnet_peer *lp_best;
1506         struct lnet_route *route;
1507         struct lnet_peer *lp;
1508         int rc;
1509
1510         /* Find the best gateway we can use to reach @remote_net.
1511          * @rtr_nid is currently unused by this function. */
1512
1513         rnet = lnet_find_rnet_locked(remote_net);
1514         if (rnet == NULL)
1515                 return NULL;
1516
1517         lp_best = NULL;
1518         best_route = last_route = NULL;
1519         list_for_each_entry(route, &rnet->lrn_routes, lr_list) {
1520                 lp = route->lr_gateway;
1521
1522                 if (!lnet_is_route_alive(route))
1523                         continue;
1524
1525                 if (lp_best == NULL) {
1526                         best_route = last_route = route;
1527                         lp_best = lp;
1528                 }
1529
1530                 /* no lock protection on the fields below, but it's harmless */
1531                 if (last_route->lr_seq - route->lr_seq < 0)
1532                         last_route = route;
1533
1534                 rc = lnet_compare_routes(route, best_route, &best_gw_ni);
1535                 if (rc < 0)
1536                         continue;
1537
1538                 best_route = route;
1539                 lp_best = lp;
1540         }
1541
1542         *prev_route = last_route;
1543         *gwni = best_gw_ni;
1544
1545         return best_route;
1546 }
1547
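/*
 * Select the best NI on @local_net to send from, seeded with the
 * current @best_ni (which may be NULL). NIs are compared on health
 * first, then on CPT distance from @md_cpt (all distances within
 * lnet_numa_range are treated as equal), then on available tx
 * credits, and finally round-robin on the NI sequence number.
 */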
1548 static struct lnet_ni *
1549 lnet_get_best_ni(struct lnet_net *local_net, struct lnet_ni *best_ni,
1550                  struct lnet_peer *peer, struct lnet_peer_net *peer_net,
1551                  int md_cpt)
1552 {
1553         struct lnet_ni *ni = NULL;
1554         unsigned int shortest_distance;
1555         int best_credits;
1556         int best_healthv;
1557
1558         /*
1559          * If there is no peer_ni that we can send to on this network,
1560          * then there is no point in looking for a new best_ni here.
1561          */
1562         if (!lnet_get_next_peer_ni_locked(peer, peer_net, NULL))
1563                 return best_ni;
1564
1565         if (best_ni == NULL) {
1566                 shortest_distance = UINT_MAX;
1567                 best_credits = INT_MIN;
1568                 best_healthv = 0;
1569         } else {
1570                 shortest_distance = cfs_cpt_distance(lnet_cpt_table(), md_cpt,
1571                                                      best_ni->ni_dev_cpt);
1572                 best_credits = atomic_read(&best_ni->ni_tx_credits);
1573                 best_healthv = atomic_read(&best_ni->ni_healthv);
1574         }
1575
1576         while ((ni = lnet_get_next_ni_locked(local_net, ni))) {
1577                 unsigned int distance;
1578                 int ni_credits;
1579                 int ni_healthv;
1580                 int ni_fatal;
1581
1582                 ni_credits = atomic_read(&ni->ni_tx_credits);
1583                 ni_healthv = atomic_read(&ni->ni_healthv);
1584                 ni_fatal = atomic_read(&ni->ni_fatal_error_on);
1585
1586                 /*
1587                  * calculate the distance from the CPT on which
1588                  * the message memory is allocated to the CPT of
1589                  * the NI's physical device
1590                  */
1591                 distance = cfs_cpt_distance(lnet_cpt_table(),
1592                                             md_cpt,
1593                                             ni->ni_dev_cpt);
1594
1595                 CDEBUG(D_NET, "compare ni %s [c:%d, d:%d, s:%d] with best_ni %s [c:%d, d:%d, s:%d]\n",
1596                        libcfs_nid2str(ni->ni_nid), ni_credits, distance,
1597                        ni->ni_seq, (best_ni) ? libcfs_nid2str(best_ni->ni_nid)
1598                         : "not seleced", best_credits, shortest_distance,
1599                         (best_ni) ? best_ni->ni_seq : 0);
1600
1601                 /*
1602                  * All distances smaller than the NUMA range
1603                  * are treated equally.
1604                  */
1605                 if (distance < lnet_numa_range)
1606                         distance = lnet_numa_range;
1607
1608                 /*
1609                  * Select on health, shorter distance, available
1610                  * credits, then round-robin.
1611                  */
1612                 if (ni_fatal) {
1613                         continue;
1614                 } else if (ni_healthv < best_healthv) {
1615                         continue;
1616                 } else if (ni_healthv > best_healthv) {
1617                         best_healthv = ni_healthv;
1618                         /*
1619                          * If we're going to prefer this ni because it's
1620                          * the healthiest, then we should set the
1621                          * shortest_distance in the algorithm in case
1622                          * there are multiple NIs with the same health but
1623                          * different distances.
1624                          */
1625                         if (distance < shortest_distance)
1626                                 shortest_distance = distance;
1627                 } else if (distance > shortest_distance) {
1628                         continue;
1629                 } else if (distance < shortest_distance) {
1630                         shortest_distance = distance;
1631                 } else if (ni_credits < best_credits) {
1632                         continue;
1633                 } else if (ni_credits == best_credits) {
1634                         if (best_ni && best_ni->ni_seq <= ni->ni_seq)
1635                                 continue;
1636                 }
1637                 best_ni = ni;
1638                 best_credits = ni_credits;
1639         }
1640
1641         CDEBUG(D_NET, "selected best_ni %s\n",
1642                (best_ni) ? libcfs_nid2str(best_ni->ni_nid) : "no selection");
1643
1644         return best_ni;
1645 }
1646
1647 /*
1648  * Traffic to the LNET_RESERVED_PORTAL may not trigger peer discovery,
1649  * because such traffic is required to perform discovery. We therefore
1650  * exclude all GET and PUT on that portal. We also exclude all ACK and
1651  * REPLY traffic, but that is because the portal is not tracked in the
1652  * message structure for these message types. We could restrict this
1653  * further by also checking for LNET_PROTO_PING_MATCHBITS.
1654  */
1655 static bool
1656 lnet_msg_discovery(struct lnet_msg *msg)
1657 {
1658         if (msg->msg_type == LNET_MSG_PUT) {
1659                 if (msg->msg_hdr.msg.put.ptl_index != LNET_RESERVED_PORTAL)
1660                         return true;
1661         } else if (msg->msg_type == LNET_MSG_GET) {
1662                 if (msg->msg_hdr.msg.get.ptl_index != LNET_RESERVED_PORTAL)
1663                         return true;
1664         }
1665         return false;
1666 }
1667
1668 #define SRC_SPEC        0x0001
1669 #define SRC_ANY         0x0002
1670 #define LOCAL_DST       0x0004
1671 #define REMOTE_DST      0x0008
1672 #define MR_DST          0x0010
1673 #define NMR_DST         0x0020
1674 #define SND_RESP        0x0040
1675
1676 /* The following two defines are used for return codes */
1677 #define REPEAT_SEND     0x1000
1678 #define PASS_THROUGH    0x2000
1679
1680 /* The different cases lnet_select pathway needs to handle */
1681 #define SRC_SPEC_LOCAL_MR_DST   (SRC_SPEC | LOCAL_DST | MR_DST)
1682 #define SRC_SPEC_ROUTER_MR_DST  (SRC_SPEC | REMOTE_DST | MR_DST)
1683 #define SRC_SPEC_LOCAL_NMR_DST  (SRC_SPEC | LOCAL_DST | NMR_DST)
1684 #define SRC_SPEC_ROUTER_NMR_DST (SRC_SPEC | REMOTE_DST | NMR_DST)
1685 #define SRC_ANY_LOCAL_MR_DST    (SRC_ANY | LOCAL_DST | MR_DST)
1686 #define SRC_ANY_ROUTER_MR_DST   (SRC_ANY | REMOTE_DST | MR_DST)
1687 #define SRC_ANY_LOCAL_NMR_DST   (SRC_ANY | LOCAL_DST | NMR_DST)
1688 #define SRC_ANY_ROUTER_NMR_DST  (SRC_ANY | REMOTE_DST | NMR_DST)
1689
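/*
 * Commit and send the message over the loopback NI. The LOLND does
 * not use send credits, so no credit accounting is needed here.
 */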
1690 static int
1691 lnet_handle_lo_send(struct lnet_send_data *sd)
1692 {
1693         struct lnet_msg *msg = sd->sd_msg;
1694         int cpt = sd->sd_cpt;
1695
1696         /* No send credit hassles with LOLND */
1697         lnet_ni_addref_locked(the_lnet.ln_loni, cpt);
1698         msg->msg_hdr.dest_nid = cpu_to_le64(the_lnet.ln_loni->ni_nid);
1699         if (!msg->msg_routing)
1700                 msg->msg_hdr.src_nid =
1701                         cpu_to_le64(the_lnet.ln_loni->ni_nid);
1702         msg->msg_target.nid = the_lnet.ln_loni->ni_nid;
1703         lnet_msg_commit(msg, cpt);
1704         msg->msg_txni = the_lnet.ln_loni;
1705
1706         return LNET_CREDIT_OK;
1707 }
1708
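/*
 * Commit the message to the already selected sd_best_ni/sd_best_lpni
 * pair and post it. If committing requires switching to a different
 * CPT and the configuration changed while the lock was dropped,
 * REPEAT_SEND is returned so the caller can restart the selection.
 */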
1709 static int
1710 lnet_handle_send(struct lnet_send_data *sd)
1711 {
1712         struct lnet_ni *best_ni = sd->sd_best_ni;
1713         struct lnet_peer_ni *best_lpni = sd->sd_best_lpni;
1714         struct lnet_peer_ni *final_dst_lpni = sd->sd_final_dst_lpni;
1715         struct lnet_msg *msg = sd->sd_msg;
1716         int cpt2;
1717         __u32 send_case = sd->sd_send_case;
1718         int rc;
1719         __u32 routing = send_case & REMOTE_DST;
1720         struct lnet_rsp_tracker *rspt;
1721
1722         /*
1723          * Increment sequence number of the selected peer so that we
1724          * pick the next one in Round Robin.
1725          */
1726         best_lpni->lpni_seq++;
1727
1728         /*
1729          * grab a reference on the peer_ni so it sticks around even if
1730          * we need to drop and relock the lnet_net_lock below.
1731          */
1732         lnet_peer_ni_addref_locked(best_lpni);
1733
1734         /*
1735          * Use lnet_cpt_of_nid() to determine the CPT used to commit the
1736          * message. This ensures that we get a CPT that is correct for
1737          * the NI when the NI has been restricted to a subset of all CPTs.
1738          * If the selected CPT differs from the one currently locked, we
1739          * must unlock and relock the lnet_net_lock(), and then check whether
1740          * the configuration has changed. We don't have a hold on the best_ni
1741          * yet, and it may have vanished.
1742          */
1743         cpt2 = lnet_cpt_of_nid_locked(best_lpni->lpni_nid, best_ni);
1744         if (sd->sd_cpt != cpt2) {
1745                 __u32 seq = lnet_get_dlc_seq_locked();
1746                 lnet_net_unlock(sd->sd_cpt);
1747                 sd->sd_cpt = cpt2;
1748                 lnet_net_lock(sd->sd_cpt);
1749                 if (seq != lnet_get_dlc_seq_locked()) {
1750                         lnet_peer_ni_decref_locked(best_lpni);
1751                         return REPEAT_SEND;
1752                 }
1753         }
1754
1755         /*
1756          * store the best_lpni in the message right away to avoid having
1757          * to do the same operation under different conditions
1758          */
1759         msg->msg_txpeer = best_lpni;
1760         msg->msg_txni = best_ni;
1761
1762         /*
1763          * grab a reference for the best_ni since now it's in use in this
1764          * send. The reference will be dropped in lnet_finalize()
1765          */
1766         lnet_ni_addref_locked(msg->msg_txni, sd->sd_cpt);
1767
1768         /*
1769          * Always set the target.nid to the best peer picked. Either the
1770          * NID will be one of the peer NIDs selected, or the same NID as
1771          * what was originally set in the target or it will be the NID of
1772          * a router if this message should be routed
1773          */
1774         msg->msg_target.nid = msg->msg_txpeer->lpni_nid;
1775
1776         /*
1777          * lnet_msg_commit assigns the correct cpt to the message, which
1778          * is used to decrement the correct refcount on the ni when it's
1779          * time to return the credits
1780          */
1781         lnet_msg_commit(msg, sd->sd_cpt);
1782
1783         /*
1784          * If we are routing the message then we keep the src_nid that was
1785          * set by the originator. If we are not routing then we are the
1786          * originator and set it here.
1787          */
1788         if (!msg->msg_routing)
1789                 msg->msg_hdr.src_nid = cpu_to_le64(msg->msg_txni->ni_nid);
1790
1791         if (routing) {
1792                 msg->msg_target_is_router = 1;
1793                 msg->msg_target.pid = LNET_PID_LUSTRE;
1794                 /*
1795                  * since we're routing we want to ensure that the
1796                  * msg_hdr.dest_nid is set to the final destination. When
1797                  * the router receives this message it knows how to route
1798                  * it.
1799                  *
1800                  * final_dst_lpni is set at the beginning of the
1801                  * lnet_select_pathway() function and is never changed.
1802                  * It's safe to use it here.
1803                  */
1804                 msg->msg_hdr.dest_nid = cpu_to_le64(final_dst_lpni->lpni_nid);
1805         } else {
1806                 /*
1807                  * if we're not routing set the dest_nid to the best peer
1808                  * ni NID that we picked earlier in the algorithm.
1809                  */
1810                 msg->msg_hdr.dest_nid = cpu_to_le64(msg->msg_txpeer->lpni_nid);
1811         }
1812
1813         /*
1814          * if we have response tracker block update it with the next hop
1815          * nid
1816          */
1817         if (msg->msg_md) {
1818                 rspt = msg->msg_md->md_rspt_ptr;
1819                 if (rspt) {
1820                         rspt->rspt_next_hop_nid = msg->msg_txpeer->lpni_nid;
1821                         CDEBUG(D_NET, "rspt_next_hop_nid = %s\n",
1822                                libcfs_nid2str(rspt->rspt_next_hop_nid));
1823                 }
1824         }
1825
1826         rc = lnet_post_send_locked(msg, 0);
1827
1828         if (!rc)
1829                 CDEBUG(D_NET, "TRACE: %s(%s:%s) -> %s(%s:%s) : %s try# %d\n",
1830                        libcfs_nid2str(msg->msg_hdr.src_nid),
1831                        libcfs_nid2str(msg->msg_txni->ni_nid),
1832                        libcfs_nid2str(sd->sd_src_nid),
1833                        libcfs_nid2str(msg->msg_hdr.dest_nid),
1834                        libcfs_nid2str(sd->sd_dst_nid),
1835                        libcfs_nid2str(msg->msg_txpeer->lpni_nid),
1836                        lnet_msgtyp2str(msg->msg_type), msg->msg_retry_count);
1837
1838         return rc;
1839 }
1840
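/*
 * If the destination is a non-MR peer with no preferred NID yet,
 * record the selected local NI as its preferred source NID so later
 * sends keep using a consistent source address. Responses (REPLY and
 * ACK) never set the preference.
 */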
1841 static inline void
1842 lnet_set_non_mr_pref_nid(struct lnet_send_data *sd)
1843 {
1844         if (sd->sd_send_case & NMR_DST &&
1845             sd->sd_msg->msg_type != LNET_MSG_REPLY &&
1846             sd->sd_msg->msg_type != LNET_MSG_ACK &&
1847             sd->sd_best_lpni->lpni_pref_nnids == 0) {
1848                 CDEBUG(D_NET, "Setting preferred local NID %s on NMR peer %s\n",
1849                        libcfs_nid2str(sd->sd_best_ni->ni_nid),
1850                        libcfs_nid2str(sd->sd_best_lpni->lpni_nid));
1851                 lnet_peer_ni_set_non_mr_pref_nid(sd->sd_best_lpni,
1852                                                  sd->sd_best_ni->ni_nid);
1853         }
1854 }
1855
1856 /*
1857  * Source Specified
1858  * Local Destination
1859  * non-mr peer
1860  *
1861  * use the source and destination NIDs as the pathway
1862  */
1863 static int
1864 lnet_handle_spec_local_nmr_dst(struct lnet_send_data *sd)
1865 {
1866         /* the destination lpni is set before we get here. */
1867
1868         /* find local NI */
1869         sd->sd_best_ni = lnet_nid2ni_locked(sd->sd_src_nid, sd->sd_cpt);
1870         if (!sd->sd_best_ni) {
1871                 CERROR("Can't send to %s: src %s is not a "
1872                        "local nid\n", libcfs_nid2str(sd->sd_dst_nid),
1873                                 libcfs_nid2str(sd->sd_src_nid));
1874                 return -EINVAL;
1875         }
1876
1877         /*
1878          * the preferred NID will only be set for NMR peers
1879          */
1880         lnet_set_non_mr_pref_nid(sd);
1881
1882         return lnet_handle_send(sd);
1883 }
1884
1885 /*
1886  * Source Specified
1887  * Local Destination
1888  * MR Peer
1889  *
1890  * Run the selection algorithm on the peer NIs unless we're sending
1891  * a response, in this case just send to the destination
1892  */
1893 static int
1894 lnet_handle_spec_local_mr_dst(struct lnet_send_data *sd)
1895 {
1896         sd->sd_best_ni = lnet_nid2ni_locked(sd->sd_src_nid, sd->sd_cpt);
1897         if (!sd->sd_best_ni) {
1898                 CERROR("Can't send to %s: src %s is not a "
1899                        "local nid\n", libcfs_nid2str(sd->sd_dst_nid),
1900                                 libcfs_nid2str(sd->sd_src_nid));
1901                 return -EINVAL;
1902         }
1903
1904         /*
1905          * only run the selection algorithm to pick the peer_ni if we're
1906          * sending a GET or a PUT. Responses are sent to the same
1907          * destination NID provided.
1908          */
1909         if (!(sd->sd_send_case & SND_RESP)) {
1910                 sd->sd_best_lpni =
1911                   lnet_find_best_lpni_on_net(sd, sd->sd_peer,
1912                                              sd->sd_best_ni->ni_net->net_id);
1913         }
1914
1915         if (sd->sd_best_lpni &&
1916             sd->sd_best_lpni->lpni_nid == the_lnet.ln_loni->ni_nid)
1917                 return lnet_handle_lo_send(sd);
1918         else if (sd->sd_best_lpni)
1919                 return lnet_handle_send(sd);
1920
1921         CERROR("can't send to %s. no NI on %s\n",
1922                libcfs_nid2str(sd->sd_dst_nid),
1923                libcfs_net2str(sd->sd_best_ni->ni_net->net_id));
1924
1925         return -EHOSTUNREACH;
1926 }
1927
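/*
 * Find the best local NI on the network identified by @peer_net to
 * reach @peer on. If @incr_seq is set, the selected NI's sequence
 * number is bumped so that round-robin selection advances.
 */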
1928 struct lnet_ni *
1929 lnet_find_best_ni_on_spec_net(struct lnet_ni *cur_best_ni,
1930                               struct lnet_peer *peer,
1931                               struct lnet_peer_net *peer_net,
1932                               int cpt,
1933                               bool incr_seq)
1934 {
1935         struct lnet_net *local_net;
1936         struct lnet_ni *best_ni;
1937
1938         local_net = lnet_get_net_locked(peer_net->lpn_net_id);
1939         if (!local_net)
1940                 return NULL;
1941
1942         /*
1943          * Iterate through the NIs in this local Net and select
1944          * the NI to send from. The selection is determined by
1945          * these 3 criteria in the following priority:
1946          *      1. NUMA
1947          *      2. NI available credits
1948          *      3. Round Robin
1949          */
1950         best_ni = lnet_get_best_ni(local_net, cur_best_ni,
1951                                    peer, peer_net, cpt);
1952
1953         if (incr_seq && best_ni)
1954                 best_ni->ni_seq++;
1955
1956         return best_ni;
1957 }
1958
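/*
 * Trigger discovery on @lpni's peer if the message is allowed to do
 * so and the peer is not already up to date. Returns 0 if no
 * discovery is needed, LNET_DC_WAIT if the message was queued on the
 * peer's discovery pending queue, or a negative errno on failure.
 */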
1959 static int
1960 lnet_initiate_peer_discovery(struct lnet_peer_ni *lpni,
1961                              struct lnet_msg *msg, lnet_nid_t rtr_nid,
1962                              int cpt)
1963 {
1964         struct lnet_peer *peer;
1965         lnet_nid_t primary_nid;
1966         int rc;
1967
1968         lnet_peer_ni_addref_locked(lpni);
1969
1970         peer = lpni->lpni_peer_net->lpn_peer;
1971
1972         if (lnet_peer_gw_discovery(peer)) {
1973                 lnet_peer_ni_decref_locked(lpni);
1974                 return 0;
1975         }
1976
1977         if (!lnet_msg_discovery(msg) || lnet_peer_is_uptodate(peer)) {
1978                 lnet_peer_ni_decref_locked(lpni);
1979                 return 0;
1980         }
1981
1982         rc = lnet_discover_peer_locked(lpni, cpt, false);
1983         if (rc) {
1984                 lnet_peer_ni_decref_locked(lpni);
1985                 return rc;
1986         }
1987         /* The peer may have changed. */
1988         peer = lpni->lpni_peer_net->lpn_peer;
1989         /* queue message and return */
1990         msg->msg_rtr_nid_param = rtr_nid;
1991         msg->msg_sending = 0;
1992         msg->msg_txpeer = NULL;
1993         spin_lock(&peer->lp_lock);
1994         list_add_tail(&msg->msg_list, &peer->lp_dc_pendq);
1995         spin_unlock(&peer->lp_lock);
1996         lnet_peer_ni_decref_locked(lpni);
1997         primary_nid = peer->lp_primary_nid;
1998
1999         CDEBUG(D_NET, "msg %p delayed. %s pending discovery\n",
2000                 msg, libcfs_nid2str(primary_nid));
2001
2002         return LNET_DC_WAIT;
2003 }
2004
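/*
 * Find the route and gateway to use for @dst_nid. On success
 * *gw_lpni and *gw_peer identify the gateway, sd->sd_best_ni is set
 * if it wasn't already, and the chosen route's sequence number is
 * bumped for round-robin. Can return LNET_DC_WAIT if the gateway
 * must be discovered first.
 */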
2005 static int
2006 lnet_handle_find_routed_path(struct lnet_send_data *sd,
2007                              lnet_nid_t dst_nid,
2008                              struct lnet_peer_ni **gw_lpni,
2009                              struct lnet_peer **gw_peer)
2010 {
2011         int rc;
2012         struct lnet_peer *gw;
2013         struct lnet_route *best_route;
2014         struct lnet_route *last_route;
2015         struct lnet_peer_ni *lpni = NULL;
2016         lnet_nid_t src_nid = sd->sd_src_nid;
2017
2018         best_route = lnet_find_route_locked(NULL, LNET_NIDNET(dst_nid),
2019                                             sd->sd_rtr_nid, &last_route,
2020                                             &lpni);
2021         if (!best_route) {
2022                 CERROR("no route to %s from %s\n",
2023                        libcfs_nid2str(dst_nid), libcfs_nid2str(src_nid));
2024                 return -EHOSTUNREACH;
2025         }
2026
2027         if (!lpni) {
2028                 CERROR("Internal Error. Route expected to %s from %s\n",
2029                         libcfs_nid2str(dst_nid),
2030                         libcfs_nid2str(src_nid));
2031                 return -EFAULT;
2032         }
2033
2034         gw = best_route->lr_gateway;
2035         LASSERT(gw == lpni->lpni_peer_net->lpn_peer);
2036
2037         /*
2038          * Discover this gateway if it hasn't already been discovered.
2039          * This means we might delay the message until discovery has
2040          * completed
2041          */
2042         sd->sd_msg->msg_src_nid_param = sd->sd_src_nid;
2043         rc = lnet_initiate_peer_discovery(lpni, sd->sd_msg, sd->sd_rtr_nid,
2044                                           sd->sd_cpt);
2045         if (rc)
2046                 return rc;
2047
2048         if (!sd->sd_best_ni)
2049                 sd->sd_best_ni = lnet_find_best_ni_on_spec_net(NULL, gw,
2050                                         lnet_peer_get_net_locked(gw,
2051                                                 best_route->lr_lnet),
2052                                         sd->sd_md_cpt,
2053                                         true);
2054
2055         if (!sd->sd_best_ni) {
2056                 CERROR("Internal Error. Expected local ni on %s "
2057                        "but non found :%s\n",
2058                        libcfs_net2str(best_route->lr_lnet),
2059                        libcfs_nid2str(sd->sd_src_nid));
2060                 return -EFAULT;
2061         }
2062
2063         *gw_lpni = lpni;
2064         *gw_peer = gw;
2065
2066         /*
2067          * increment the route sequence number since now we're sure we're
2068          * going to use it
2069          */
2070         LASSERT(best_route && last_route);
2071         best_route->lr_seq = last_route->lr_seq + 1;
2072
2073         return 0;
2074 }
2075
2076 /*
2077  * Handle two cases:
2078  *
2079  * Case 1:
2080  *  Source specified
2081  *  Remote destination
2082  *  Non-MR destination
2083  *
2084  * Case 2:
2085  *  Source specified
2086  *  Remote destination
2087  *  MR destination
2088  *
2089  * The handling of these two cases is similar. Even though the destination
2090  * can be MR or non-MR, we'll deal directly with the router.
2091  */
2092 static int
2093 lnet_handle_spec_router_dst(struct lnet_send_data *sd)
2094 {
2095         int rc;
2096         struct lnet_peer_ni *gw_lpni = NULL;
2097         struct lnet_peer *gw_peer = NULL;
2098
2099         /* find local NI */
2100         sd->sd_best_ni = lnet_nid2ni_locked(sd->sd_src_nid, sd->sd_cpt);
2101         if (!sd->sd_best_ni) {
2102                 CERROR("Can't send to %s: src %s is not a "
2103                        "local nid\n", libcfs_nid2str(sd->sd_dst_nid),
2104                                 libcfs_nid2str(sd->sd_src_nid));
2105                 return -EINVAL;
2106         }
2107
2108         rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni,
2109                                      &gw_peer);
2110         if (rc)
2111                 return rc;
2112
2113         if (sd->sd_send_case & NMR_DST)
2114                 /*
2115                  * since the final destination is non-MR let's set its preferred
2116                  * NID before we send
2117                  */
2118                 lnet_set_non_mr_pref_nid(sd);
2119
2120         /*
2121          * We're going to send to the gw found so let's set its
2122          * info
2123          */
2124         sd->sd_peer = gw_peer;
2125         sd->sd_best_lpni = gw_lpni;
2126
2127         return lnet_handle_send(sd);
2128 }
2129
2130 struct lnet_ni *
2131 lnet_find_best_ni_on_local_net(struct lnet_peer *peer, int md_cpt,
2132                                bool discovery)
2133 {
2134         struct lnet_peer_net *peer_net = NULL;
2135         struct lnet_ni *best_ni = NULL;
2136
2137         /*
2138          * The peer can have multiple interfaces, some of them can be on
2139          * the local network and others on a routed network. We should
2140          * prefer the local network. However if the local network is not
2141          * available then we need to try the routed network
2142          */
2143
2144         /* go through all the peer nets and find the best_ni */
2145         list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
2146                 /*
2147                  * The peer's list of nets can contain non-local nets. We
2148                  * want to only examine the local ones.
2149                  */
2150                 if (!lnet_get_net_locked(peer_net->lpn_net_id))
2151                         continue;
2152                 best_ni = lnet_find_best_ni_on_spec_net(best_ni, peer,
2153                                                    peer_net, md_cpt, false);
2154
2155                 /*
2156                  * if this is a discovery message and lp_disc_net_id is
2157                  * specified then use that net to send the discovery on.
2158                  */
2159                 if (peer->lp_disc_net_id == peer_net->lpn_net_id &&
2160                     discovery)
2161                         break;
2162         }
2163
2164         if (best_ni)
2165                 /* increment sequence number so we can round robin */
2166                 best_ni->ni_seq++;
2167
2168         return best_ni;
2169 }
2170
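/*
 * Return the local NI this non-MR peer has already been assigned as
 * its preferred source on the target network, or NULL if no
 * preference has been recorded yet.
 */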
2171 static struct lnet_ni *
2172 lnet_find_existing_preferred_best_ni(struct lnet_send_data *sd)
2173 {
2174         struct lnet_ni *best_ni = NULL;
2175         struct lnet_peer_net *peer_net;
2176         struct lnet_peer *peer = sd->sd_peer;
2177         struct lnet_peer_ni *best_lpni = sd->sd_best_lpni;
2178         struct lnet_peer_ni *lpni;
2179         int cpt = sd->sd_cpt;
2180
2181         /*
2182          * We must use a consistent source address when sending to a
2183          * non-MR peer. However, a non-MR peer can have multiple NIDs
2184          * on multiple networks, and we may even need to talk to this
2185          * peer on multiple networks -- certain types of
2186          * load-balancing configuration do this.
2187          *
2188          * So we need to pick the NI the peer prefers for this
2189          * particular network.
2190          */
2191
2192         /* Get the target peer_ni */
2193         peer_net = lnet_peer_get_net_locked(peer,
2194                         LNET_NIDNET(best_lpni->lpni_nid));
2195         LASSERT(peer_net != NULL);
2196         list_for_each_entry(lpni, &peer_net->lpn_peer_nis,
2197                                 lpni_peer_nis) {
2198                 if (lpni->lpni_pref_nnids == 0)
2199                         continue;
2200                 LASSERT(lpni->lpni_pref_nnids == 1);
2201                 best_ni = lnet_nid2ni_locked(
2202                                 lpni->lpni_pref.nid, cpt);
2203                 break;
2204         }
2205
2206         return best_ni;
2207 }
2208
2209 /* Prerequisite: sd->sd_peer and sd->sd_best_lpni should be set */
2210 static int
2211 lnet_select_preferred_best_ni(struct lnet_send_data *sd)
2212 {
2213         struct lnet_ni *best_ni = NULL;
2214         struct lnet_peer_ni *best_lpni = sd->sd_best_lpni;
2215
2216         /*
2217          * We must use a consistent source address when sending to a
2218          * non-MR peer. However, a non-MR peer can have multiple NIDs
2219          * on multiple networks, and we may even need to talk to this
2220          * peer on multiple networks -- certain types of
2221          * load-balancing configuration do this.
2222          *
2223          * So we need to pick the NI the peer prefers for this
2224          * particular network.
2225          */
2226
2227         best_ni = lnet_find_existing_preferred_best_ni(sd);
2228
2229         /* if best_ni is still not set just pick one */
2230         if (!best_ni) {
2231                 best_ni =
2232                   lnet_find_best_ni_on_spec_net(NULL, sd->sd_peer,
2233                                                 sd->sd_best_lpni->lpni_peer_net,
2234                                                 sd->sd_md_cpt, true);
2235                 /* If there is no best_ni we don't have a route */
2236                 if (!best_ni) {
2237                         CERROR("no path to %s from net %s\n",
2238                                 libcfs_nid2str(best_lpni->lpni_nid),
2239                                 libcfs_net2str(best_lpni->lpni_net->net_id));
2240                         return -EHOSTUNREACH;
2241                 }
2242         }
2243
2244         sd->sd_best_ni = best_ni;
2245
2246         /* Set preferred NI if necessary. */
2247         lnet_set_non_mr_pref_nid(sd);
2248
2249         return 0;
2250 }
2251
2253 /*
2254  * Source not specified
2255  * Local destination
2256  * Non-MR Peer
2257  *
2258  * always use the same source NID for NMR peers
2259  * If we've talked to that peer before then we already have a preferred
2260  * source NI associated with it. Otherwise, we select a preferred local NI
2261  * and store it in the peer
2262  */
2263 static int
2264 lnet_handle_any_local_nmr_dst(struct lnet_send_data *sd)
2265 {
2266         int rc;
2267
2268         /* sd->sd_best_lpni is already set to the final destination */
2269
2270         /*
2271          * At this point we should've created the peer ni and peer. If we
2272          * can't find it, then something went wrong. Instead of assert
2273          * output a relevant message and fail the send
2274          */
2275         if (!sd->sd_best_lpni) {
2276                 CERROR("Internal fault. Unable to send msg %s to %s. "
2277                        "NID not known\n",
2278                        lnet_msgtyp2str(sd->sd_msg->msg_type),
2279                        libcfs_nid2str(sd->sd_dst_nid));
2280                 return -EFAULT;
2281         }
2282
2283         rc = lnet_select_preferred_best_ni(sd);
2284         if (!rc)
2285                 rc = lnet_handle_send(sd);
2286
2287         return rc;
2288 }
2289
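/*
 * Handle the local-network portion of the SRC_ANY/MR case. Returns
 * PASS_THROUGH if the peer has no NI on any local network, in which
 * case the caller falls back to finding a routed path.
 */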
2290 static int
2291 lnet_handle_any_mr_dsta(struct lnet_send_data *sd)
2292 {
2293         /*
2294          * NOTE we've already handled the remote peer case. So we only
2295          * need to worry about the local case here.
2296          *
2297          * if we're sending a response, ACK or reply, we need to send it
2298          * to the destination NID given to us. At this point we already
2299                          * have the peer_ni we're supposed to send to, so just find the
2300          * best_ni on the peer net and use that. Since we're sending to an
2301          * MR peer then we can just run the selection algorithm on our
2302          * local NIs and pick the best one.
2303          */
2304         if (sd->sd_send_case & SND_RESP) {
2305                 sd->sd_best_ni =
2306                   lnet_find_best_ni_on_spec_net(NULL, sd->sd_peer,
2307                                                 sd->sd_best_lpni->lpni_peer_net,
2308                                                 sd->sd_md_cpt, true);
2309
2310                 if (!sd->sd_best_ni) {
2311                         /*
2312                          * We're not going to deal with not able to send
2313                          * a response to the provided final destination
2314                          */
2315                         CERROR("Can't send response to %s. "
2316                                "No local NI available\n",
2317                                 libcfs_nid2str(sd->sd_dst_nid));
2318                         return -EHOSTUNREACH;
2319                 }
2320
2321                 return lnet_handle_send(sd);
2322         }
2323
2324         /*
2325          * If we get here that means we're sending a fresh request, PUT or
2326          * GET, so we need to run our standard selection algorithm.
2327          * First find the best local interface that's on any of the peer's
2328          * networks.
2329          */
2330         sd->sd_best_ni = lnet_find_best_ni_on_local_net(sd->sd_peer,
2331                                         sd->sd_md_cpt,
2332                                         lnet_msg_discovery(sd->sd_msg));
2333         if (sd->sd_best_ni) {
2334                 sd->sd_best_lpni =
2335                   lnet_find_best_lpni_on_net(sd, sd->sd_peer,
2336                                              sd->sd_best_ni->ni_net->net_id);
2337
2338                 /*
2339                  * if we're successful in selecting a peer_ni on the local
2340                  * network, then send to it. Otherwise fall through and
2341                  * try and see if we can reach it over another routed
2342                  * network
2343                  */
2344                 if (sd->sd_best_lpni &&
2345                     sd->sd_best_lpni->lpni_nid == the_lnet.ln_loni->ni_nid) {
2346                         /*
2347                          * in case we initially started with a routed
2348                          * destination, let's reset to local
2349                          */
2350                         sd->sd_send_case &= ~REMOTE_DST;
2351                         sd->sd_send_case |= LOCAL_DST;
2352                         return lnet_handle_lo_send(sd);
2353                 } else if (sd->sd_best_lpni) {
2354                         /*
2355                          * in case we initially started with a routed
2356                          * destination, let's reset to local
2357                          */
2358                         sd->sd_send_case &= ~REMOTE_DST;
2359                         sd->sd_send_case |= LOCAL_DST;
2360                         return lnet_handle_send(sd);
2361                 }
2362
2363                 CERROR("Internal Error. Expected to have a best_lpni: "
2364                        "%s -> %s\n",
2365                        libcfs_nid2str(sd->sd_src_nid),
2366                        libcfs_nid2str(sd->sd_dst_nid));
2367
2368                 return -EFAULT;
2369         }
2370
2371         /*
2372          * Peer doesn't have a local network. Let's see if there is
2373          * a remote network we can reach it on.
2374          */
2375         return PASS_THROUGH;
2376 }
2377
2378 /*
2379  * Case 1:
2380  *      Source NID not specified
2381  *      Local destination
2382  *      MR peer
2383  *
2384  * Case 2:
2385  *      Source NID not specified
2386  *      Remote destination
2387  *      MR peer
2388  *
2389  * In both of these cases if we're sending a response, ACK or REPLY, then
2390  * we need to send to the destination NID provided.
2391  *
2392  * In the remote case let's deal with MR routers.
2393  *
2394  */
2395
2396 static int
2397 lnet_handle_any_mr_dst(struct lnet_send_data *sd)
2398 {
2399         int rc = 0;
2400         struct lnet_peer *gw_peer = NULL;
2401         struct lnet_peer_ni *gw_lpni = NULL;
2402
2403         /*
2404          * handle sending a response to a remote peer here so we don't
2405          * have to worry about it if we hit lnet_handle_any_mr_dsta()
2406          */
2407         if (sd->sd_send_case & REMOTE_DST &&
2408             sd->sd_send_case & SND_RESP) {
2409                 struct lnet_peer_ni *gw;
2410                 struct lnet_peer *gw_peer;
2411
2412                 rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw,
2413                                                   &gw_peer);
2414                 if (rc < 0) {
2415                         CERROR("Can't send response to %s. "
2416                                "No route available\n",
2417                                 libcfs_nid2str(sd->sd_dst_nid));
2418                         return -EHOSTUNREACH;
2419                 } else if (rc > 0) {
2420                         return rc;
2421                 }
2422
2423                 sd->sd_best_lpni = gw;
2424                 sd->sd_peer = gw_peer;
2425
2426                 return lnet_handle_send(sd);
2427         }
2428
2429         /*
2430          * Even though the NID for the peer might not be on a local network,
2431          * since the peer is MR there could be other interfaces on the
2432          * local network. In that case we'd still like to prefer the local
2433          * network over the routed network. If we're unable to do that
2434          * then we select the best router among the different routed networks,
2435          * and if the router is MR then we can deal with it as such.
2436          */
2437         rc = lnet_handle_any_mr_dsta(sd);
2438         if (rc != PASS_THROUGH)
2439                 return rc;
2440
2441         /*
2442          * TODO: One possible enhancement is to run the selection
2443          * algorithm on the peer. However for remote peers the credits are
2444          * not decremented, so we'll be basically going over the peer NIs
2445          * in round robin. An MR router will run the selection algorithm
2446          * on the next-hop interfaces.
2447          */
2448         rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni,
2449                                           &gw_peer);
2450         if (rc)
2451                 return rc;
2452
2453         sd->sd_send_case &= ~LOCAL_DST;
2454         sd->sd_send_case |= REMOTE_DST;
2455
2456         sd->sd_peer = gw_peer;
2457         sd->sd_best_lpni = gw_lpni;
2458
2459         return lnet_handle_send(sd);
2460 }
2461
2462 /*
2463  * Source not specified
2464  * Remote destination
2465  * Non-MR peer
2466  *
2467  * Must send to the specified peer NID using the same source NID that
2468  * we've used before. If it's the first time to talk to that peer then
2469  * find the source NI and assign it as preferred to that peer
2470  */
2471 static int
2472 lnet_handle_any_router_nmr_dst(struct lnet_send_data *sd)
2473 {
2474         int rc;
2475         struct lnet_peer_ni *gw_lpni = NULL;
2476         struct lnet_peer *gw_peer = NULL;
2477
2478         /*
2479          * Let's set if we have a preferred NI to talk to this NMR peer
2480          */
2481         sd->sd_best_ni = lnet_find_existing_preferred_best_ni(sd);
2482
2483         /*
2484          * find the router and that'll find the best NI if we didn't find
2485          * it already.
2486          */
2487         rc = lnet_handle_find_routed_path(sd, sd->sd_dst_nid, &gw_lpni,
2488                                           &gw_peer);
2489         if (rc)
2490                 return rc;
2491
2492         /*
2493          * set the best_ni we've chosen as the preferred one for
2494          * this peer
2495          */
2496         lnet_set_non_mr_pref_nid(sd);
2497
2498         /* we'll be sending to the gw */
2499         sd->sd_best_lpni = gw_lpni;
2500         sd->sd_peer = gw_peer;
2501
2502         return lnet_handle_send(sd);
2503 }
2504
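/*
 * Dispatch on the send case computed in lnet_select_pathway(). Each
 * case is an OR of the bits defined above; for example, a PUT from
 * an unspecified source to a local MR peer is handled as
 * SRC_ANY_LOCAL_MR_DST == (SRC_ANY | LOCAL_DST | MR_DST).
 */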
2505 static int
2506 lnet_handle_send_case_locked(struct lnet_send_data *sd)
2507 {
2508         /*
2509          * turn off the SND_RESP bit.
2510          * It will be checked in the case handling
2511          */
2512         __u32 send_case = sd->sd_send_case &= ~SND_RESP;
2513
2514         CDEBUG(D_NET, "Source %s%s to %s %s %s destination\n",
2515                 (send_case & SRC_SPEC) ? "Specified: " : "ANY",
2516                 (send_case & SRC_SPEC) ? libcfs_nid2str(sd->sd_src_nid) : "",
2517                 (send_case & MR_DST) ? "MR: " : "NMR: ",
2518                 libcfs_nid2str(sd->sd_dst_nid),
2519                 (send_case & LOCAL_DST) ? "local" : "routed");
2520
2521         switch (send_case) {
2522         /*
2523          * For all cases where the source is specified, we should always
2524          * use the destination NID, whether it's an MR destination or not,
2525          * since we're continuing a series of related messages for the
2526          * same RPC
2527          */
2528         case SRC_SPEC_LOCAL_NMR_DST:
2529                 return lnet_handle_spec_local_nmr_dst(sd);
2530         case SRC_SPEC_LOCAL_MR_DST:
2531                 return lnet_handle_spec_local_mr_dst(sd);
2532         case SRC_SPEC_ROUTER_NMR_DST:
2533         case SRC_SPEC_ROUTER_MR_DST:
2534                 return lnet_handle_spec_router_dst(sd);
2535         case SRC_ANY_LOCAL_NMR_DST:
2536                 return lnet_handle_any_local_nmr_dst(sd);
2537         case SRC_ANY_LOCAL_MR_DST:
2538         case SRC_ANY_ROUTER_MR_DST:
2539                 return lnet_handle_any_mr_dst(sd);
2540         case SRC_ANY_ROUTER_NMR_DST:
2541                 return lnet_handle_any_router_nmr_dst(sd);
2542         default:
2543                 CERROR("Unknown send case\n");
2544                 return -1;
2545         }
2546 }
2547
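/*
 * Select the pathway for the message: look up (or create) the peer,
 * optionally trigger discovery, classify the send case and dispatch
 * to lnet_handle_send_case_locked(). Restarts from "again" when
 * REPEAT_SEND indicates the configuration changed while the net lock
 * was dropped.
 */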
2548 static int
2549 lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid,
2550                     struct lnet_msg *msg, lnet_nid_t rtr_nid)
2551 {
2552         struct lnet_peer_ni     *lpni;
2553         struct lnet_peer        *peer;
2554         struct lnet_send_data   send_data;
2555         int                     cpt, rc;
2556         int                     md_cpt;
2557         __u32                   send_case = 0;
2558
2559         memset(&send_data, 0, sizeof(send_data));
2560
2561         /*
2562          * get an initial CPT to use for locking. The idea here is not to
2563          * serialize the calls to select_pathway, so that as many
2564          * operations can run concurrently as possible. To do that we use
2565          * the CPT where this call is being executed. Later on when we
2566          * determine the CPT to use in lnet_message_commit, we switch the
2567          * lock and check if there was any configuration change.  If none,
2568          * then we proceed, if there is, then we restart the operation.
2569          */
2570         cpt = lnet_net_lock_current();
2571
2572         md_cpt = lnet_cpt_of_md(msg->msg_md, msg->msg_offset);
2573         if (md_cpt == CFS_CPT_ANY)
2574                 md_cpt = cpt;
2575
2576 again:
2577
2578         /*
2579          * If we're being asked to send to the loopback interface, there
2580          * is no need to go through any selection. We can just shortcut
2581          * the entire process and send over lolnd
2582          */
2583         send_data.sd_msg = msg;
2584         send_data.sd_cpt = cpt;
2585         if (LNET_NETTYP(LNET_NIDNET(dst_nid)) == LOLND) {
2586                 rc = lnet_handle_lo_send(&send_data);
2587                 lnet_net_unlock(cpt);
2588                 return rc;
2589         }
2590
2591         /*
2592          * find an existing peer_ni, or create one and mark it as having been
2593          * created due to network traffic. This call will create the
2594          * peer->peer_net->peer_ni tree.
2595          */
2596         lpni = lnet_nid2peerni_locked(dst_nid, LNET_NID_ANY, cpt);
2597         if (IS_ERR(lpni)) {
2598                 lnet_net_unlock(cpt);
2599                 return PTR_ERR(lpni);
2600         }
2601
2602         /*
2603          * Cache the original src_nid. If we need to resend the message
2604          * then we'll need to know whether the src_nid was originally
2605          * specified for this message. If it was originally specified,
2606          * then we need to keep using the same src_nid since it's
2607          * continuing the same sequence of messages.
2608          */
2609         msg->msg_src_nid_param = src_nid;
2610
2611         /*
2612          * Now that we have a peer_ni, check if we want to discover
2613          * the peer. Traffic to the LNET_RESERVED_PORTAL should not
2614          * trigger discovery.
2615          */
2616         peer = lpni->lpni_peer_net->lpn_peer;
2617         rc = lnet_initiate_peer_discovery(lpni, msg, rtr_nid, cpt);
2618         if (rc) {
2619                 lnet_peer_ni_decref_locked(lpni);
2620                 lnet_net_unlock(cpt);
2621                 return rc;
2622         }
2623         lnet_peer_ni_decref_locked(lpni);
2624
2625         /*
2626          * Identify the different send cases
2627          */
2628         if (src_nid == LNET_NID_ANY)
2629                 send_case |= SRC_ANY;
2630         else
2631                 send_case |= SRC_SPEC;
2632
2633         if (lnet_get_net_locked(LNET_NIDNET(dst_nid)))
2634                 send_case |= LOCAL_DST;
2635         else
2636                 send_case |= REMOTE_DST;
2637
2638         /*
2639          * if this is a non-MR peer or if we're recovering a peer ni then
2640          * let's consider this an NMR case so we can hit the destination
2641          * NID.
2642          */
2643         if (!lnet_peer_is_multi_rail(peer) || msg->msg_recovery)
2644                 send_case |= NMR_DST;
2645         else
2646                 send_case |= MR_DST;
2647
2648         if (msg->msg_type == LNET_MSG_REPLY ||
2649             msg->msg_type == LNET_MSG_ACK)
2650                 send_case |= SND_RESP;
2651
2652         /* assign parameters to the send_data */
2653         send_data.sd_rtr_nid = rtr_nid;
2654         send_data.sd_src_nid = src_nid;
2655         send_data.sd_dst_nid = dst_nid;
2656         send_data.sd_best_lpni = lpni;
2657         /*
2658          * keep a pointer to the final destination in case we're going to
2659          * route, so we'll need to access it later
2660          */
2661         send_data.sd_final_dst_lpni = lpni;
2662         send_data.sd_peer = peer;
2663         send_data.sd_md_cpt = md_cpt;
2664         send_data.sd_send_case = send_case;
2665
2666         rc = lnet_handle_send_case_locked(&send_data);
2667
2668         /*
2669          * Update the local cpt since send_data.sd_cpt might've been
2670          * updated as a result of calling lnet_handle_send_case_locked().
2671          */
2672         cpt = send_data.sd_cpt;
2673
2674         if (rc == REPEAT_SEND)
2675                 goto again;
2676
2677         lnet_net_unlock(cpt);
2678
2679         return rc;
2680 }
2681
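/*
 * Entry point for sending @msg. Selects the pathway and, when send
 * credits are immediately available, hands the message to the LND.
 * Returns 0 on success, including when the message was queued to
 * wait for credits or discovery, or a negative errno on failure.
 */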
2682 int
2683 lnet_send(lnet_nid_t src_nid, struct lnet_msg *msg, lnet_nid_t rtr_nid)
2684 {
2685         lnet_nid_t              dst_nid = msg->msg_target.nid;
2686         int                     rc;
2687
2688         /*
2689          * NB: rtr_nid is set to LNET_NID_ANY for all current use-cases,
2690          * but we might want to use pre-determined router for ACK/REPLY
2691          * in the future
2692          */
2693         /* NB: ni != NULL == interface pre-determined (ACK/REPLY) */
2694         LASSERT(msg->msg_txpeer == NULL);
2695         LASSERT(msg->msg_txni == NULL);
2696         LASSERT(!msg->msg_sending);
2697         LASSERT(!msg->msg_target_is_router);
2698         LASSERT(!msg->msg_receiving);
2699
2700         msg->msg_sending = 1;
2701
2702         LASSERT(!msg->msg_tx_committed);
2703
2704         rc = lnet_select_pathway(src_nid, dst_nid, msg, rtr_nid);
2705         if (rc < 0)
2706                 return rc;
2707
2708         if (rc == LNET_CREDIT_OK)
2709                 lnet_ni_send(msg->msg_txni, msg);
2710
2711         /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT or LNET_DC_WAIT */
2712         return 0;
2713 }
2714
2715 enum lnet_mt_event_type {
2716         MT_TYPE_LOCAL_NI = 0,
2717         MT_TYPE_PEER_NI
2718 };
2719
2720 struct lnet_mt_event_info {
2721         enum lnet_mt_event_type mt_type;
2722         lnet_nid_t mt_nid;
2723 };
2724
2725 /* called with res_lock held */
2726 void
2727 lnet_detach_rsp_tracker(struct lnet_libmd *md, int cpt)
2728 {
2729         struct lnet_rsp_tracker *rspt;
2730
2731         /*
2732          * msg has a refcount on the MD so the MD is not going away.
2733          * The rspt queue for the cpt is protected by
2734          * the lnet_net_lock(cpt). cpt is the cpt of the MD cookie.
2735          */
2736         if (!md->md_rspt_ptr)
2737                 return;
2738
2739         rspt = md->md_rspt_ptr;
2740         md->md_rspt_ptr = NULL;
2741
2742         /* debug code */
2743         LASSERT(rspt->rspt_cpt == cpt);
2744
2745         /*
2746          * invalidate the handle to indicate that a response has been
2747          * received, which will then lead the monitor thread to clean up
2748          * the rspt block.
2749          */
2750         LNetInvalidateMDHandle(&rspt->rspt_mdh);
2751 }
2752
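/*
 * Walk the per-CPT response tracker queues and free every rspt block
 * whose MD handle has been invalidated or whose deadline has passed
 * (or all of them if @force is set). A timed-out response also
 * counts against the health of the next-hop peer NI.
 */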
2753 static void
2754 lnet_finalize_expired_responses(bool force)
2755 {
2756         struct lnet_libmd *md;
2757         struct list_head local_queue;
2758         struct lnet_rsp_tracker *rspt, *tmp;
2759         int i;
2760
2761         if (the_lnet.ln_mt_rstq == NULL)
2762                 return;
2763
2764         cfs_cpt_for_each(i, lnet_cpt_table()) {
2765                 INIT_LIST_HEAD(&local_queue);
2766
2767                 lnet_net_lock(i);
2768                 if (!the_lnet.ln_mt_rstq[i]) {
2769                         lnet_net_unlock(i);
2770                         continue;
2771                 }
2772                 list_splice_init(the_lnet.ln_mt_rstq[i], &local_queue);
2773                 lnet_net_unlock(i);
2774
2775                 list_for_each_entry_safe(rspt, tmp, &local_queue, rspt_on_list) {
2776                         /*
2777                          * The rspt mdh will be invalidated when a response
2778                          * is received or whenever we want to discard the
2779                          * block. The monitor thread will walk the queue
2780                          * and clean up any rspts with an invalid mdh.
2781                          * The monitor thread will walk the queue until
2782                          * the first unexpired rspt block. This means that
2783                          * some rspt blocks which received their
2784                          * corresponding responses will linger in the
2785                          * queue until they are cleaned up eventually.
2786                          */
2787                         lnet_res_lock(i);
2788                         if (LNetMDHandleIsInvalid(rspt->rspt_mdh)) {
2789                                 lnet_res_unlock(i);
2790                                 list_del_init(&rspt->rspt_on_list);
2791                                 lnet_rspt_free(rspt, i);
2792                                 continue;
2793                         }
2794
2795                         if (ktime_compare(ktime_get(), rspt->rspt_deadline) >= 0 ||
2796                             force) {
2797                                 struct lnet_peer_ni *lpni;
2798                                 lnet_nid_t nid;
2799
2800                                 md = lnet_handle2md(&rspt->rspt_mdh);
2801                                 if (!md) {
2802                                         LNetInvalidateMDHandle(&rspt->rspt_mdh);
2803                                         lnet_res_unlock(i);
2804                                         list_del_init(&rspt->rspt_on_list);
2805                                         lnet_rspt_free(rspt, i);
2806                                         continue;
2807                                 }
2808                                 LASSERT(md->md_rspt_ptr == rspt);
2809                                 md->md_rspt_ptr = NULL;
2810                                 lnet_res_unlock(i);
2811
2812                                 lnet_net_lock(i);
2813                                 the_lnet.ln_counters[i]->lct_health.lch_response_timeout_count++;
2814                                 lnet_net_unlock(i);
2815
2816                                 list_del_init(&rspt->rspt_on_list);
2817
2818                                 nid = rspt->rspt_next_hop_nid;
2819
2820                                 CNETERR("Response timed out: md = %p: nid = %s\n",
2821                                         md, libcfs_nid2str(nid));
2822                                 LNetMDUnlink(rspt->rspt_mdh);
2823                                 lnet_rspt_free(rspt, i);
2824
2825                                 /*
2826                                  * If there is a timeout on the response
2827                                  * from the next hop decrement its health
2828                                  * value so that we don't use it
2829                                  */
2830                                 lnet_net_lock(0);
2831                                 lpni = lnet_find_peer_ni_locked(nid);
2832                                 if (lpni) {
2833                                         lnet_handle_remote_failure_locked(lpni);
2834                                         lnet_peer_ni_decref_locked(lpni);
2835                                 }
2836                                 lnet_net_unlock(0);
2837                         } else {
2838                                 lnet_res_unlock(i);
2839                                 break;
2840                         }
2841                 }
2842
2843                 lnet_net_lock(i);
2844                 if (!list_empty(&local_queue))
2845                         list_splice(&local_queue, the_lnet.ln_mt_rstq[i]);
2846                 lnet_net_unlock(i);
2847         }
2848 }
2849
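/*
 * Drain @resendq, resending each message via lnet_send(). A
 * non-routed message to a non-MR peer must reuse the src_nid of the
 * original send, and a message whose source NID was explicitly
 * specified reuses that NID as well.
 */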
2850 static void
2851 lnet_resend_pending_msgs_locked(struct list_head *resendq, int cpt)
2852 {
2853         struct lnet_msg *msg;
2854
2855         while (!list_empty(resendq)) {
2856                 struct lnet_peer_ni *lpni;
2857
2858                 msg = list_entry(resendq->next, struct lnet_msg,
2859                                  msg_list);
2860
2861                 list_del_init(&msg->msg_list);
2862
2863                 lpni = lnet_find_peer_ni_locked(msg->msg_hdr.dest_nid);
2864                 if (!lpni) {
2865                         lnet_net_unlock(cpt);
2866                         CERROR("Expected that a peer is already created for %s\n",
2867                                libcfs_nid2str(msg->msg_hdr.dest_nid));
2868                         msg->msg_no_resend = true;
2869                         lnet_finalize(msg, -EFAULT);
2870                         lnet_net_lock(cpt);
2871                 } else {
2872                         struct lnet_peer *peer;
2873                         int rc;
2874                         lnet_nid_t src_nid = LNET_NID_ANY;
2875
2876                         /*
2877                          * if this message is not being routed and the
2878                          * peer is non-MR then we must use the same
2879                          * src_nid that was used in the original send.
2880                          * Otherwise if we're routing the message (i.e.
2881                          * we're a router) then we can use any of our
2882                          * local interfaces. It doesn't matter to the
2883                          * final destination.
2884                          */
2885                         peer = lpni->lpni_peer_net->lpn_peer;
2886                         if (!msg->msg_routing &&
2887                             !lnet_peer_is_multi_rail(peer))
2888                                 src_nid = le64_to_cpu(msg->msg_hdr.src_nid);
2889
2890                         /*
2891                          * If we originally specified a src NID, then we
2892                          * must attempt to reuse it in the resend as well.
2893                          */
2894                         if (msg->msg_src_nid_param != LNET_NID_ANY)
2895                                 src_nid = msg->msg_src_nid_param;
2896                         lnet_peer_ni_decref_locked(lpni);
2897
2898                         lnet_net_unlock(cpt);
2899                         CDEBUG(D_NET, "resending %s->%s: %s recovery %d try# %d\n",
2900                                libcfs_nid2str(src_nid),
2901                                libcfs_id2str(msg->msg_target),
2902                                lnet_msgtyp2str(msg->msg_type),
2903                                msg->msg_recovery,
2904                                msg->msg_retry_count);
2905                         rc = lnet_send(src_nid, msg, LNET_NID_ANY);
2906                         if (rc) {
2907                                 CERROR("Error sending %s to %s: %d\n",
2908                                        lnet_msgtyp2str(msg->msg_type),
2909                                        libcfs_id2str(msg->msg_target), rc);
2910                                 msg->msg_no_resend = true;
2911                                 lnet_finalize(msg, rc);
2912                         }
2913                         lnet_net_lock(cpt);
2914                         if (!rc)
2915                                 the_lnet.ln_counters[cpt]->lct_health.lch_resend_count++;
2916                 }
2917         }
2918 }
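
/*
 * The src_nid selection rule above, distilled into a standalone sketch
 * (a hypothetical helper, not part of LNet; shown only to make the rule
 * explicit):
 *
 *	static lnet_nid_t
 *	resend_src_nid(struct lnet_msg *msg, struct lnet_peer *peer)
 *	{
 *		lnet_nid_t src_nid = LNET_NID_ANY;
 *
 *		// non-routed message to a non-MR peer: reuse the source
 *		// NID of the original send
 *		if (!msg->msg_routing && !lnet_peer_is_multi_rail(peer))
 *			src_nid = le64_to_cpu(msg->msg_hdr.src_nid);
 *
 *		// an explicitly requested source NID always wins
 *		if (msg->msg_src_nid_param != LNET_NID_ANY)
 *			src_nid = msg->msg_src_nid_param;
 *
 *		return src_nid;
 *	}
 */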
2919
2920 static void
2921 lnet_resend_pending_msgs(void)
2922 {
2923         int i;
2924
2925         cfs_cpt_for_each(i, lnet_cpt_table()) {
2926                 lnet_net_lock(i);
2927                 lnet_resend_pending_msgs_locked(the_lnet.ln_mt_resendqs[i], i);
2928                 lnet_net_unlock(i);
2929         }
2930 }
2931
2932 /* called with lnet_net_lock(cpt) and lnet_ni_lock(ni) held */
2933 static void
2934 lnet_unlink_ni_recovery_mdh_locked(struct lnet_ni *ni, int cpt, bool force)
2935 {
2936         struct lnet_handle_md recovery_mdh;
2937
2938         LNetInvalidateMDHandle(&recovery_mdh);
2939
2940         if (ni->ni_recovery_state & LNET_NI_RECOVERY_PENDING ||
2941             force) {
2942                 recovery_mdh = ni->ni_ping_mdh;
2943                 LNetInvalidateMDHandle(&ni->ni_ping_mdh);
2944         }
2945         lnet_ni_unlock(ni);
2946         lnet_net_unlock(cpt);
2947         if (!LNetMDHandleIsInvalid(recovery_mdh))
2948                 LNetMDUnlink(recovery_mdh);
2949         lnet_net_lock(cpt);
2950         lnet_ni_lock(ni);
2951 }
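
/*
 * Note the unlock-unlink-relock pattern above: LNetMDUnlink() can fire
 * the MD's event callback and takes its own locks, so (presumably to
 * avoid lock inversion) it must not be called while holding the net
 * lock or the NI lock. Both locks are retaken before returning so that
 * the caller's locking state is unchanged.
 */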
2952
2953 static void
2954 lnet_recover_local_nis(void)
2955 {
2956         struct lnet_mt_event_info *ev_info;
2957         struct list_head processed_list;
2958         struct list_head local_queue;
2959         struct lnet_handle_md mdh;
2960         struct lnet_ni *tmp;
2961         struct lnet_ni *ni;
2962         lnet_nid_t nid;
2963         int healthv;
2964         int rc;
2965
2966         INIT_LIST_HEAD(&local_queue);
2967         INIT_LIST_HEAD(&processed_list);
2968
2969         /*
2970          * splice the recovery queue on a local queue. We will iterate
2971          * through the local queue and update it as needed. Once we're
2972          * done with the traversal, we'll splice the local queue back on
2973          * the head of the ln_mt_localNIRecovq. Any newly added local NIs
2974          * will be traversed in the next iteration.
2975          */
2976         lnet_net_lock(0);
2977         list_splice_init(&the_lnet.ln_mt_localNIRecovq,
2978                          &local_queue);
2979         lnet_net_unlock(0);
2980
2981         list_for_each_entry_safe(ni, tmp, &local_queue, ni_recovery) {
2982                 /*
2983                  * if an NI is being deleted or it is now healthy, there
2984                  * is no need to keep it around in the recovery queue.
2985                  * The monitor thread is the only thread responsible for
2986                  * removing the NI from the recovery queue.
2987                  * Multiple threads can be adding NIs to the recovery
2988                  * queue.
2989                  */
2990                 healthv = atomic_read(&ni->ni_healthv);
2991
2992                 lnet_net_lock(0);
2993                 lnet_ni_lock(ni);
2994                 if (ni->ni_state != LNET_NI_STATE_ACTIVE ||
2995                     healthv == LNET_MAX_HEALTH_VALUE) {
2996                         list_del_init(&ni->ni_recovery);
2997                         lnet_unlink_ni_recovery_mdh_locked(ni, 0, false);
2998                         lnet_ni_unlock(ni);
2999                         lnet_ni_decref_locked(ni, 0);
3000                         lnet_net_unlock(0);
3001                         continue;
3002                 }
3003
3004                 /*
3005                  * if the local NI failed recovery we must unlink the md.
3006                  * But we want to keep the local_ni on the recovery queue
3007                  * so we can continue the attempts to recover it.
3008                  */
3009                 if (ni->ni_recovery_state & LNET_NI_RECOVERY_FAILED) {
3010                         lnet_unlink_ni_recovery_mdh_locked(ni, 0, true);
3011                         ni->ni_recovery_state &= ~LNET_NI_RECOVERY_FAILED;
3012                 }
3013
3014                 lnet_ni_unlock(ni);
3015                 lnet_net_unlock(0);
3016
3017
3018                 CDEBUG(D_NET, "attempting to recover local ni: %s\n",
3019                        libcfs_nid2str(ni->ni_nid));
3020
3021                 lnet_ni_lock(ni);
3022                 if (!(ni->ni_recovery_state & LNET_NI_RECOVERY_PENDING)) {
3023                         ni->ni_recovery_state |= LNET_NI_RECOVERY_PENDING;
3024                         lnet_ni_unlock(ni);
3025
3026                         LIBCFS_ALLOC(ev_info, sizeof(*ev_info));
3027                         if (!ev_info) {
3028                                 CERROR("out of memory. Can't recover %s\n",
3029                                        libcfs_nid2str(ni->ni_nid));
3030                                 lnet_ni_lock(ni);
3031                                 ni->ni_recovery_state &=
3032                                   ~LNET_NI_RECOVERY_PENDING;
3033                                 lnet_ni_unlock(ni);
3034                                 continue;
3035                         }
3036
3037                         mdh = ni->ni_ping_mdh;
3038                         /*
3039                          * Invalidate the NI's mdh in case the NI gets
3040                          * deleted; in that case we unlink the mdh below.
3041                          */
3042                         LNetInvalidateMDHandle(&ni->ni_ping_mdh);
3043                         nid = ni->ni_nid;
3044
3045                         /*
3046                          * Remove the NI from the local queue and drop
3047                          * our reference to it while we're recovering
3048                          * it. The reason is that the NI could be
3049                          * deleted concurrently, and if we don't drop
3050                          * our reference, the deletion code will spin
3051                          * waiting for the reference count to reach
3052                          * zero while holding the ln_mutex_lock(). When
3053                          * we look up the peer to send to in
3054                          * lnet_select_pathway() we would try to take
3055                          * the ln_mutex_lock() as well, leading to a
3056                          * deadlock. By dropping the refcount and
3057                          * removing the NI from the list, we allow it
3058                          * to be removed; we then use the cached NID to
3059                          * look it up again. If it's gone, we just
3060                          * continue examining the rest of the queue.
3061                          */
3062                         lnet_net_lock(0);
3063                         list_del_init(&ni->ni_recovery);
3064                         lnet_ni_decref_locked(ni, 0);
3065                         lnet_net_unlock(0);
3066
3067                         ev_info->mt_type = MT_TYPE_LOCAL_NI;
3068                         ev_info->mt_nid = nid;
3069                         rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
3070                                             ev_info, the_lnet.ln_mt_eqh, true);
3071                         /* lookup the nid again */
3072                         lnet_net_lock(0);
3073                         ni = lnet_nid2ni_locked(nid, 0);
3074                         if (!ni) {
3075                                 /*
3076                                  * the NI has been deleted when we dropped
3077                                  * the ref count
3078                                  */
3079                                 lnet_net_unlock(0);
3080                                 LNetMDUnlink(mdh);
3081                                 continue;
3082                         }
3083                         /*
3084                          * Same note as in lnet_recover_peer_nis(). When
3085                          * we're sending the ping, the NI is free to be
3086                          * deleted or manipulated. By this point it
3087                          * could've been added back on the recovery queue,
3088                          * and a refcount taken on it.
3089                          * So we can't just add it blindly again or we'll
3090                          * corrupt the queue. We must check under lock if
3091                          * it's not on any list and if not then add it
3092                          * to the processed list, which will eventually be
3093                          * spliced back on to the recovery queue.
3094                          */
3095                         ni->ni_ping_mdh = mdh;
3096                         if (list_empty(&ni->ni_recovery)) {
3097                                 list_add_tail(&ni->ni_recovery, &processed_list);
3098                                 lnet_ni_addref_locked(ni, 0);
3099                         }
3100                         lnet_net_unlock(0);
3101
3102                         lnet_ni_lock(ni);
3103                         if (rc)
3104                                 ni->ni_recovery_state &= ~LNET_NI_RECOVERY_PENDING;
3105                 }
3106                 lnet_ni_unlock(ni);
3107         }
3108
3109         /*
3110          * put back the remaining NIs on the ln_mt_localNIRecovq to be
3111          * reexamined in the next iteration.
3112          */
3113         list_splice_init(&processed_list, &local_queue);
3114         lnet_net_lock(0);
3115         list_splice(&local_queue, &the_lnet.ln_mt_localNIRecovq);
3116         lnet_net_unlock(0);
3117 }
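
/*
 * The deadlock-avoidance dance in lnet_recover_local_nis(), distilled
 * (a sketch only; every name below comes from the function above):
 *
 *	nid = ni->ni_nid;			// cache the lookup key
 *	lnet_net_lock(0);
 *	list_del_init(&ni->ni_recovery);	// off the queue...
 *	lnet_ni_decref_locked(ni, 0);		// ...and now deletable
 *	lnet_net_unlock(0);
 *
 *	rc = lnet_send_ping(nid, ...);		// the NI may vanish here
 *
 *	lnet_net_lock(0);
 *	ni = lnet_nid2ni_locked(nid, 0);	// re-lookup by cached NID
 *	if (!ni)
 *		// the NI was deleted: unlink the MD and move on
 */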
3118
3119 static struct list_head **
3120 lnet_create_array_of_queues(void)
3121 {
3122         struct list_head **qs;
3123         struct list_head *q;
3124         int i;
3125
3126         qs = cfs_percpt_alloc(lnet_cpt_table(),
3127                               sizeof(struct list_head));
3128         if (!qs) {
3129                 CERROR("Failed to allocate queues\n");
3130                 return NULL;
3131         }
3132
3133         cfs_percpt_for_each(q, i, qs)
3134                 INIT_LIST_HEAD(q);
3135
3136         return qs;
3137 }
3138
3139 static int
3140 lnet_resendqs_create(void)
3141 {
3142         struct list_head **resendqs;
3143
3144         resendqs = lnet_create_array_of_queues();
3145         if (!resendqs)
3146                 return -ENOMEM;
3147
3148         lnet_net_lock(LNET_LOCK_EX);
3149         the_lnet.ln_mt_resendqs = resendqs;
3150         lnet_net_unlock(LNET_LOCK_EX);
3151
3152         return 0;
3153 }
3154
3155 static void
3156 lnet_clean_local_ni_recoveryq(void)
3157 {
3158         struct lnet_ni *ni;
3159
3160         /* This is only called when the monitor thread has stopped */
3161         lnet_net_lock(0);
3162
3163         while (!list_empty(&the_lnet.ln_mt_localNIRecovq)) {
3164                 ni = list_entry(the_lnet.ln_mt_localNIRecovq.next,
3165                                 struct lnet_ni, ni_recovery);
3166                 list_del_init(&ni->ni_recovery);
3167                 lnet_ni_lock(ni);
3168                 lnet_unlink_ni_recovery_mdh_locked(ni, 0, true);
3169                 lnet_ni_unlock(ni);
3170                 lnet_ni_decref_locked(ni, 0);
3171         }
3172
3173         lnet_net_unlock(0);
3174 }
3175
3176 static void
3177 lnet_unlink_lpni_recovery_mdh_locked(struct lnet_peer_ni *lpni, int cpt,
3178                                      bool force)
3179 {
3180         struct lnet_handle_md recovery_mdh;
3181
3182         LNetInvalidateMDHandle(&recovery_mdh);
3183
3184         if (lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING || force) {
3185                 recovery_mdh = lpni->lpni_recovery_ping_mdh;
3186                 LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
3187         }
3188         spin_unlock(&lpni->lpni_lock);
3189         lnet_net_unlock(cpt);
3190         if (!LNetMDHandleIsInvalid(recovery_mdh))
3191                 LNetMDUnlink(recovery_mdh);
3192         lnet_net_lock(cpt);
3193         spin_lock(&lpni->lpni_lock);
3194 }
3195
3196 static void
3197 lnet_clean_peer_ni_recoveryq(void)
3198 {
3199         struct lnet_peer_ni *lpni, *tmp;
3200
3201         lnet_net_lock(LNET_LOCK_EX);
3202
3203         list_for_each_entry_safe(lpni, tmp, &the_lnet.ln_mt_peerNIRecovq,
3204                                  lpni_recovery) {
3205                 list_del_init(&lpni->lpni_recovery);
3206                 spin_lock(&lpni->lpni_lock);
3207                 lnet_unlink_lpni_recovery_mdh_locked(lpni, LNET_LOCK_EX, true);
3208                 spin_unlock(&lpni->lpni_lock);
3209                 lnet_peer_ni_decref_locked(lpni);
3210         }
3211
3212         lnet_net_unlock(LNET_LOCK_EX);
3213 }
3214
3215 static void
3216 lnet_clean_resendqs(void)
3217 {
3218         struct lnet_msg *msg, *tmp;
3219         struct list_head msgs;
3220         int i;
3221
3222         INIT_LIST_HEAD(&msgs);
3223
3224         cfs_cpt_for_each(i, lnet_cpt_table()) {
3225                 lnet_net_lock(i);
3226                 list_splice_init(the_lnet.ln_mt_resendqs[i], &msgs);
3227                 lnet_net_unlock(i);
3228                 list_for_each_entry_safe(msg, tmp, &msgs, msg_list) {
3229                         list_del_init(&msg->msg_list);
3230                         msg->msg_no_resend = true;
3231                         lnet_finalize(msg, -ESHUTDOWN);
3232                 }
3233         }
3234
3235         cfs_percpt_free(the_lnet.ln_mt_resendqs);
3236 }
3237
3238 static void
3239 lnet_recover_peer_nis(void)
3240 {
3241         struct lnet_mt_event_info *ev_info;
3242         struct list_head processed_list;
3243         struct list_head local_queue;
3244         struct lnet_handle_md mdh;
3245         struct lnet_peer_ni *lpni;
3246         struct lnet_peer_ni *tmp;
3247         lnet_nid_t nid;
3248         int healthv;
3249         int rc;
3250
3251         INIT_LIST_HEAD(&local_queue);
3252         INIT_LIST_HEAD(&processed_list);
3253
3254         /*
3255          * Always use cpt 0 for locking across all interactions with
3256          * ln_mt_peerNIRecovq
3257          */
3258         lnet_net_lock(0);
3259         list_splice_init(&the_lnet.ln_mt_peerNIRecovq,
3260                          &local_queue);
3261         lnet_net_unlock(0);
3262
3263         list_for_each_entry_safe(lpni, tmp, &local_queue,
3264                                  lpni_recovery) {
3265                 /*
3266                  * The same protection strategy is used here as is in the
3267                  * local recovery case.
3268                  */
3269                 lnet_net_lock(0);
3270                 healthv = atomic_read(&lpni->lpni_healthv);
3271                 spin_lock(&lpni->lpni_lock);
3272                 if (lpni->lpni_state & LNET_PEER_NI_DELETING ||
3273                     healthv == LNET_MAX_HEALTH_VALUE) {
3274                         list_del_init(&lpni->lpni_recovery);
3275                         lnet_unlink_lpni_recovery_mdh_locked(lpni, 0, false);
3276                         spin_unlock(&lpni->lpni_lock);
3277                         lnet_peer_ni_decref_locked(lpni);
3278                         lnet_net_unlock(0);
3279                         continue;
3280                 }
3281
3282                 /*
3283                  * If the peer NI has failed recovery we must unlink the
3284                  * md. But we want to keep the peer ni on the recovery
3285                  * queue so we can try to continue recovering it
3286                  */
3287                 if (lpni->lpni_state & LNET_PEER_NI_RECOVERY_FAILED) {
3288                         lnet_unlink_lpni_recovery_mdh_locked(lpni, 0, true);
3289                         lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_FAILED;
3290                 }
3291
3292                 spin_unlock(&lpni->lpni_lock);
3293                 lnet_net_unlock(0);
3294
3295                 /*
3296                  * NOTE: we're racing with peer deletion from user space.
3297                  * It's possible that a peer is deleted after we check its
3298                  * state; in that case recovery can create a new peer.
3299                  */
3300                 spin_lock(&lpni->lpni_lock);
3301                 if (!(lpni->lpni_state & LNET_PEER_NI_RECOVERY_PENDING) &&
3302                     !(lpni->lpni_state & LNET_PEER_NI_DELETING)) {
3303                         lpni->lpni_state |= LNET_PEER_NI_RECOVERY_PENDING;
3304                         spin_unlock(&lpni->lpni_lock);
3305
3306                         LIBCFS_ALLOC(ev_info, sizeof(*ev_info));
3307                         if (!ev_info) {
3308                                 CERROR("out of memory. Can't recover %s\n",
3309                                        libcfs_nid2str(lpni->lpni_nid));
3310                                 spin_lock(&lpni->lpni_lock);
3311                                 lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
3312                                 spin_unlock(&lpni->lpni_lock);
3313                                 continue;
3314                         }
3315
3316                         /* look at the comments in lnet_recover_local_nis() */
3317                         mdh = lpni->lpni_recovery_ping_mdh;
3318                         LNetInvalidateMDHandle(&lpni->lpni_recovery_ping_mdh);
3319                         nid = lpni->lpni_nid;
3320                         lnet_net_lock(0);
3321                         list_del_init(&lpni->lpni_recovery);
3322                         lnet_peer_ni_decref_locked(lpni);
3323                         lnet_net_unlock(0);
3324
3325                         ev_info->mt_type = MT_TYPE_PEER_NI;
3326                         ev_info->mt_nid = nid;
3327                         rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
3328                                             ev_info, the_lnet.ln_mt_eqh, true);
3329                         lnet_net_lock(0);
3330                         /*
3331                          * lnet_find_peer_ni_locked() grabs a refcount for
3332                          * us. No need to take it explicitly.
3333                          */
3334                         lpni = lnet_find_peer_ni_locked(nid);
3335                         if (!lpni) {
3336                                 lnet_net_unlock(0);
3337                                 LNetMDUnlink(mdh);
3338                                 continue;
3339                         }
3340
3341                         lpni->lpni_recovery_ping_mdh = mdh;
3342                         /*
3343                          * While we were unlocked, the lpni could have been
3344                          * re-added to the recovery queue. In this case we
3345                          * don't need to add it to the local queue, since
3346                          * it's already on there and the thread that added
3347                          * it would've incremented the refcount on the
3348                          * peer, which means we need to decref the refcount
3349                          * that was implicitly grabbed by find_peer_ni_locked.
3350                          * Otherwise, if the lpni is still not on
3351                          * the recovery queue, then we'll add it to the
3352                          * processed list.
3353                          */
3354                         if (list_empty(&lpni->lpni_recovery))
3355                                 list_add_tail(&lpni->lpni_recovery, &processed_list);
3356                         else
3357                                 lnet_peer_ni_decref_locked(lpni);
3358                         lnet_net_unlock(0);
3359
3360                         spin_lock(&lpni->lpni_lock);
3361                         if (rc)
3362                                 lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
3363                 }
3364                 spin_unlock(&lpni->lpni_lock);
3365         }
3366
3367         list_splice_init(&processed_list, &local_queue);
3368         lnet_net_lock(0);
3369         list_splice(&local_queue, &the_lnet.ln_mt_peerNIRecovq);
3370         lnet_net_unlock(0);
3371 }
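
/*
 * Refcount ownership in the re-add decision above: if the lpni was not
 * put back on a recovery queue while we were unlocked, the reference
 * implicitly taken by lnet_find_peer_ni_locked() is handed over to the
 * processed list; otherwise whoever re-queued the lpni already holds a
 * reference, so we drop the one we just acquired.
 */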
3372
3373 static int
3374 lnet_monitor_thread(void *arg)
3375 {
3376         time64_t recovery_timeout = 0;
3377         time64_t rsp_timeout = 0;
3378         int interval;
3379         time64_t now;
3380
3381         /*
3382          * The monitor thread takes care of the following:
3383          *  1. Checks the aliveness of routers.
3384          *  2. Resends any messages found on the resend
3385          *     queue.
3386          *  3. Pings any NIs found on the local NI recovery
3387          *     queue.
3388          *  4. Pings any peer NIs found on the peer NI
3389          *     recovery queue.
3390          */
3391         cfs_block_allsigs();
3392
3393         while (the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING) {
3394                 now = ktime_get_real_seconds();
3395
3396                 if (lnet_router_checker_active())
3397                         lnet_check_routers();
3398
3399                 lnet_resend_pending_msgs();
3400
3401                 if (now >= rsp_timeout) {
3402                         lnet_finalize_expired_responses(false);
3403                         rsp_timeout = now + (lnet_transaction_timeout / 2);
3404                 }
3405
3406                 if (now >= recovery_timeout) {
3407                         lnet_recover_local_nis();
3408                         lnet_recover_peer_nis();
3409                         recovery_timeout = now + lnet_recovery_interval;
3410                 }
3411
3412                 /*
3413                  * TODO: do we need to check whether we should sleep
3414                  * without a timeout? Technically, an active system
3415                  * will always have messages in flight, so this check
3416                  * would always evaluate to false. And on an idle
3417                  * system, do we care if we wake up every second? That
3418                  * said, we have seen complaints that an idle thread
3419                  * wakes up unnecessarily.
3420                  *
3421                  * Take the current net count into account when waking
3422                  * up for alive router checking, since we may need to
3423                  * check as many networks as we have configured.
3424                  */
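                /*
                 * A worked example of the computation below, assuming
                 * the (module-parameter dependent) values
                 * lnet_recovery_interval = 1,
                 * alive_router_check_interval = 60, two configured
                 * nets, and lnet_transaction_timeout = 50:
                 *
                 *	interval = min(1, min(60 / 2, 50 / 2))
                 *		 = min(1, min(30, 25)) = 1 second
                 */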
3425                 interval = min(lnet_recovery_interval,
3426                                min((unsigned int) alive_router_check_interval /
3427                                         lnet_current_net_count,
3428                                    lnet_transaction_timeout / 2));
3429                 wait_event_interruptible_timeout(the_lnet.ln_mt_waitq,
3430                                                 false,
3431                                                 cfs_time_seconds(interval));
3432         }
3433
3434         /* Shutting down */
3435         lnet_net_lock(LNET_LOCK_EX);
3436         the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
3437         lnet_net_unlock(LNET_LOCK_EX);
3438
3439         /* signal that the monitor thread is exiting */
3440         up(&the_lnet.ln_mt_signal);
3441
3442         return 0;
3443 }
3444
3445 /*
3446  * lnet_send_ping
3447  * Sends a ping.
3448  * Returns 0 on success.
3449  * Returns > 0 (a positive errno) if LNetMDBind or an earlier step fails.
3450  * Returns < 0 if LNetGet fails.
3451  */
3452 int
3453 lnet_send_ping(lnet_nid_t dest_nid,
3454                struct lnet_handle_md *mdh, int nnis,
3455                void *user_data, struct lnet_handle_eq eqh, bool recovery)
3456 {
3457         struct lnet_md md = { NULL };
3458         struct lnet_process_id id;
3459         struct lnet_ping_buffer *pbuf;
3460         int rc;
3461
3462         if (dest_nid == LNET_NID_ANY) {
3463                 rc = -EHOSTUNREACH;
3464                 goto fail_error;
3465         }
3466
3467         pbuf = lnet_ping_buffer_alloc(nnis, GFP_NOFS);
3468         if (!pbuf) {
3469                 rc = ENOMEM; /* positive: failed before LNetGet() */
3470                 goto fail_error;
3471         }
3472
3473         /* initialize md content */
3474         md.start     = &pbuf->pb_info;
3475         md.length    = LNET_PING_INFO_SIZE(nnis);
3476         md.threshold = 2; /* GET/REPLY */
3477         md.max_size  = 0;
3478         md.options   = LNET_MD_TRUNCATE;
3479         md.user_ptr  = user_data;
3480         md.eq_handle = eqh;
3481
3482         rc = LNetMDBind(md, LNET_UNLINK, mdh);
3483         if (rc) {
3484                 lnet_ping_buffer_decref(pbuf);
3485                 CERROR("Can't bind MD: %d\n", rc);
3486                 rc = -rc; /* change the rc to positive */
3487                 goto fail_error;
3488         }
3489         id.pid = LNET_PID_LUSTRE;
3490         id.nid = dest_nid;
3491
3492         rc = LNetGet(LNET_NID_ANY, *mdh, id,
3493                      LNET_RESERVED_PORTAL,
3494                      LNET_PROTO_PING_MATCHBITS, 0, recovery);
3495
3496         if (rc)
3497                 goto fail_unlink_md;
3498
3499         return 0;
3500
3501 fail_unlink_md:
3502         LNetMDUnlink(*mdh);
3503         LNetInvalidateMDHandle(mdh);
3504 fail_error:
3505         return rc;
3506 }
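
/*
 * A minimal caller sketch for lnet_send_ping() (see
 * lnet_recover_local_nis() and lnet_recover_peer_nis() above for the
 * real callers; the error handling shown is illustrative only):
 *
 *	struct lnet_handle_md mdh;
 *	int rc;
 *
 *	LNetInvalidateMDHandle(&mdh);
 *	rc = lnet_send_ping(nid, &mdh, LNET_INTERFACES_MIN,
 *			    ev_info, the_lnet.ln_mt_eqh, false);
 *	if (rc) {
 *		// rc > 0: failed before LNetGet(); rc < 0: LNetGet()
 *		// failed. Either way no MD is left bound, so there is
 *		// nothing for the caller to unlink.
 *		return;
 *	}
 */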
3507
3508 static void
3509 lnet_handle_recovery_reply(struct lnet_mt_event_info *ev_info,
3510                            int status, bool unlink_event)
3511 {
3512         lnet_nid_t nid = ev_info->mt_nid;
3513
3514         if (ev_info->mt_type == MT_TYPE_LOCAL_NI) {
3515                 struct lnet_ni *ni;
3516
3517                 lnet_net_lock(0);
3518                 ni = lnet_nid2ni_locked(nid, 0);
3519                 if (!ni) {
3520                         lnet_net_unlock(0);
3521                         return;
3522                 }
3523                 lnet_ni_lock(ni);
3524                 ni->ni_recovery_state &= ~LNET_NI_RECOVERY_PENDING;
3525                 if (status)
3526                         ni->ni_recovery_state |= LNET_NI_RECOVERY_FAILED;
3527                 lnet_ni_unlock(ni);
3528                 lnet_net_unlock(0);
3529
3530                 if (status != 0) {
3531                         CERROR("local NI (%s) recovery failed with %d\n",
3532                                libcfs_nid2str(nid), status);
3533                         return;
3534                 }
3535                 /*
3536                  * need to increment healthv for the ni here, because in
3537                  * the lnet_finalize() path we don't have access to this
3538                  * NI. And in order to get access to it, we'll need to
3539                  * carry forward too much information.
3540                  * In the peer case, it'll naturally be incremented
3541                  */
3542                 if (!unlink_event)
3543                         lnet_inc_healthv(&ni->ni_healthv);
3544         } else {
3545                 struct lnet_peer_ni *lpni;
3546                 int cpt;
3547
3548                 cpt = lnet_net_lock_current();
3549                 lpni = lnet_find_peer_ni_locked(nid);
3550                 if (!lpni) {
3551                         lnet_net_unlock(cpt);
3552                         return;
3553                 }
3554                 spin_lock(&lpni->lpni_lock);
3555                 lpni->lpni_state &= ~LNET_PEER_NI_RECOVERY_PENDING;
3556                 if (status)
3557                         lpni->lpni_state |= LNET_PEER_NI_RECOVERY_FAILED;
3558                 spin_unlock(&lpni->lpni_lock);
3559                 lnet_peer_ni_decref_locked(lpni);
3560                 lnet_net_unlock(cpt);
3561
3562                 if (status != 0)
3563                         CERROR("peer NI (%s) recovery failed with %d\n",
3564                                libcfs_nid2str(nid), status);
3565         }
3566 }
3567
3568 void
3569 lnet_mt_event_handler(struct lnet_event *event)
3570 {
3571         struct lnet_mt_event_info *ev_info = event->md.user_ptr;
3572         struct lnet_ping_buffer *pbuf;
3573
3574         /* TODO: remove assert */
3575         LASSERT(event->type == LNET_EVENT_REPLY ||
3576                 event->type == LNET_EVENT_SEND ||
3577                 event->type == LNET_EVENT_UNLINK);
3578
3579         CDEBUG(D_NET, "Received event: %d status: %d\n", event->type,
3580                event->status);
3581
3582         switch (event->type) {
3583         case LNET_EVENT_UNLINK:
3584                 CDEBUG(D_NET, "%s recovery ping unlinked\n",
3585                        libcfs_nid2str(ev_info->mt_nid));
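                /* fall through */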
3586         case LNET_EVENT_REPLY:
3587                 lnet_handle_recovery_reply(ev_info, event->status,
3588                                            event->type == LNET_EVENT_UNLINK);
3589                 break;
3590         case LNET_EVENT_SEND:
3591                 CDEBUG(D_NET, "%s recovery message sent %s:%d\n",
3592                                libcfs_nid2str(ev_info->mt_nid),
3593                                (event->status) ? "unsuccessfully" :
3594                                "successfully", event->status);
3595                 break;
3596         default:
3597                 CERROR("Unexpected event: %d\n", event->type);
3598                 break;
3599         }
3600         if (event->unlinked) {
3601                 LIBCFS_FREE(ev_info, sizeof(*ev_info));
3602                 pbuf = LNET_PING_INFO_TO_BUFFER(event->md.start);
3603                 lnet_ping_buffer_decref(pbuf);
3604         }
3605 }
3606
3607 static int
3608 lnet_rsp_tracker_create(void)
3609 {
3610         struct list_head **rstqs;
3611
3612         rstqs = lnet_create_array_of_queues();
3613         if (!rstqs)
3614                 return -ENOMEM;
3615
3616         the_lnet.ln_mt_rstq = rstqs;
3617
3618         return 0;
3619 }
3620
3621 static void
3622 lnet_rsp_tracker_clean(void)
3623 {
3624         lnet_finalize_expired_responses(true);
3625
3626         cfs_percpt_free(the_lnet.ln_mt_rstq);
3627         the_lnet.ln_mt_rstq = NULL;
3628 }
3629
3630 int lnet_monitor_thr_start(void)
3631 {
3632         int rc = 0;
3633         struct task_struct *task;
3634
3635         if (the_lnet.ln_mt_state != LNET_MT_STATE_SHUTDOWN)
3636                 return -EALREADY;
3637
3638         rc = lnet_resendqs_create();
3639         if (rc)
3640                 return rc;
3641
3642         rc = lnet_rsp_tracker_create();
3643         if (rc)
3644                 goto clean_queues;
3645
3646         sema_init(&the_lnet.ln_mt_signal, 0);
3647
3648         lnet_net_lock(LNET_LOCK_EX);
3649         the_lnet.ln_mt_state = LNET_MT_STATE_RUNNING;
3650         lnet_net_unlock(LNET_LOCK_EX);
3651         task = kthread_run(lnet_monitor_thread, NULL, "monitor_thread");
3652         if (IS_ERR(task)) {
3653                 rc = PTR_ERR(task);
3654                 CERROR("Can't start monitor thread: %d\n", rc);
3655                 goto clean_thread;
3656         }
3657
3658         return 0;
3659
3660 clean_thread:
3661         lnet_net_lock(LNET_LOCK_EX);
3662         the_lnet.ln_mt_state = LNET_MT_STATE_STOPPING;
3663         lnet_net_unlock(LNET_LOCK_EX);
3664         /* block until event callback signals exit */
3665         down(&the_lnet.ln_mt_signal);
3666         /* clean up */
3667         lnet_net_lock(LNET_LOCK_EX);
3668         the_lnet.ln_mt_state = LNET_MT_STATE_SHUTDOWN;
3669         lnet_net_unlock(LNET_LOCK_EX);
3670         lnet_rsp_tracker_clean();
3671         lnet_clean_local_ni_recoveryq();
3672         lnet_clean_peer_ni_recoveryq();
3673         lnet_clean_resendqs();
3674         LNetInvalidateEQHandle(&the_lnet.ln_mt_eqh);
3675         return rc;
3676 clean_queues:
3677         lnet_rsp_tracker_clean();
3678         lnet_clean_local_ni_recoveryq();
3679         lnet_clean_peer_ni_recoveryq();
3680         lnet_clean_resendqs();
3681         return rc;
3682 }
3683
3684 void lnet_monitor_thr_stop(void)
3685 {
3686         if (the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN)
3687                 return;
3688
3689         LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_RUNNING);
3690         lnet_net_lock(LNET_LOCK_EX);
3691         the_lnet.ln_mt_state = LNET_MT_STATE_STOPPING;
3692         lnet_net_unlock(LNET_LOCK_EX);
3693
3694         /* tell the monitor thread that we're shutting down */
3695         wake_up(&the_lnet.ln_mt_waitq);
3696
3697         /* block until monitor thread signals that it's done */
3698         down(&the_lnet.ln_mt_signal);
3699         LASSERT(the_lnet.ln_mt_state == LNET_MT_STATE_SHUTDOWN);
3700
3701         /* perform cleanup tasks */
3702         lnet_rsp_tracker_clean();
3703         lnet_clean_local_ni_recoveryq();
3704         lnet_clean_peer_ni_recoveryq();
3705         lnet_clean_resendqs();
3706
3707         return;
3708 }
3709
3710 void
3711 lnet_drop_message(struct lnet_ni *ni, int cpt, void *private, unsigned int nob,
3712                   __u32 msg_type)
3713 {
3714         lnet_net_lock(cpt);
3715         lnet_incr_stats(&ni->ni_stats, msg_type, LNET_STATS_TYPE_DROP);
3716         the_lnet.ln_counters[cpt]->lct_common.lcc_drop_count++;
3717         the_lnet.ln_counters[cpt]->lct_common.lcc_drop_length += nob;
3718         lnet_net_unlock(cpt);
3719
3720         lnet_ni_recv(ni, private, NULL, 0, 0, 0, nob);
3721 }
3722
3723 static void
3724 lnet_recv_put(struct lnet_ni *ni, struct lnet_msg *msg)
3725 {
3726         struct lnet_hdr *hdr = &msg->msg_hdr;
3727
3728         if (msg->msg_wanted != 0)
3729                 lnet_setpayloadbuffer(msg);
3730
3731         lnet_build_msg_event(msg, LNET_EVENT_PUT);
3732
3733         /* Must I ACK?  If so I'll grab the ack_wmd out of the header and put
3734          * it back into the ACK during lnet_finalize() */
3735         msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
3736                         (msg->msg_md->md_options & LNET_MD_ACK_DISABLE) == 0);
3737
3738         lnet_ni_recv(ni, msg->msg_private, msg, msg->msg_rx_delayed,
3739                      msg->msg_offset, msg->msg_wanted, hdr->payload_length);
3740 }
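
/*
 * The ACK rule above, spelled out: an ACK is generated during
 * lnet_finalize() only when the sender supplied an ack_wmd in the PUT
 * header and the matched MD was not created with LNET_MD_ACK_DISABLE.
 */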
3741
3742 static int
3743 lnet_parse_put(struct lnet_ni *ni, struct lnet_msg *msg)
3744 {
3745         struct lnet_hdr         *hdr = &msg->msg_hdr;
3746         struct lnet_match_info  info;
3747         int                     rc;
3748         bool                    ready_delay;
3749
3750         /* Convert put fields to host byte order */
3751         hdr->msg.put.match_bits = le64_to_cpu(hdr->msg.put.match_bits);
3752         hdr->msg.put.ptl_index  = le32_to_cpu(hdr->msg.put.ptl_index);
3753         hdr->msg.put.offset     = le32_to_cpu(hdr->msg.put.offset);
3754
3755         /* Primary peer NID. */
3756         info.mi_id.nid  = msg->msg_initiator;
3757         info.mi_id.pid  = hdr->src_pid;
3758         info.mi_opc     = LNET_MD_OP_PUT;
3759         info.mi_portal  = hdr->msg.put.ptl_index;
3760         info.mi_rlength = hdr->payload_length;
3761         info.mi_roffset = hdr->msg.put.offset;
3762         info.mi_mbits   = hdr->msg.put.match_bits;
3763         info.mi_cpt     = lnet_cpt_of_nid(msg->msg_initiator, ni);
3764
3765         msg->msg_rx_ready_delay = ni->ni_net->net_lnd->lnd_eager_recv == NULL;
3766         ready_delay = msg->msg_rx_ready_delay;
3767
3768  again:
3769         rc = lnet_ptl_match_md(&info, msg);
3770         switch (rc) {
3771         default:
3772                 LBUG();
3773
3774         case LNET_MATCHMD_OK:
3775                 lnet_recv_put(ni, msg);
3776                 return 0;
3777
3778         case LNET_MATCHMD_NONE:
3779                 if (ready_delay)
3780                         /* no eager_recv or has already called it, should
3781                          * have been attached on delayed list */
3782                         return 0;
3783
3784                 rc = lnet_ni_eager_recv(ni, msg);
3785                 if (rc == 0) {
3786                         ready_delay = true;
3787                         goto again;
3788                 }
3789                 /* fall through */
3790
3791         case LNET_MATCHMD_DROP:
3792                 CNETERR("Dropping PUT from %s portal %d match %llu"
3793                         " offset %d length %d: %d\n",
3794                         libcfs_id2str(info.mi_id), info.mi_portal,
3795                         info.mi_mbits, info.mi_roffset, info.mi_rlength, rc);
3796
3797                 return -ENOENT; /* -ve: OK but no match */
3798         }
3799 }
3800
3801 static int
3802 lnet_parse_get(struct lnet_ni *ni, struct lnet_msg *msg, int rdma_get)
3803 {
3804         struct lnet_match_info info;
3805         struct lnet_hdr *hdr = &msg->msg_hdr;
3806         struct lnet_process_id source_id;
3807         struct lnet_handle_wire reply_wmd;
3808         int rc;
3809
3810         /* Convert get fields to host byte order */
3811         hdr->msg.get.match_bits   = le64_to_cpu(hdr->msg.get.match_bits);
3812         hdr->msg.get.ptl_index    = le32_to_cpu(hdr->msg.get.ptl_index);
3813         hdr->msg.get.sink_length  = le32_to_cpu(hdr->msg.get.sink_length);
3814         hdr->msg.get.src_offset   = le32_to_cpu(hdr->msg.get.src_offset);
3815
3816         source_id.nid = hdr->src_nid;
3817         source_id.pid = hdr->src_pid;
3818         /* Primary peer NID */
3819         info.mi_id.nid  = msg->msg_initiator;
3820         info.mi_id.pid  = hdr->src_pid;
3821         info.mi_opc     = LNET_MD_OP_GET;
3822         info.mi_portal  = hdr->msg.get.ptl_index;
3823         info.mi_rlength = hdr->msg.get.sink_length;
3824         info.mi_roffset = hdr->msg.get.src_offset;
3825         info.mi_mbits   = hdr->msg.get.match_bits;
3826         info.mi_cpt     = lnet_cpt_of_nid(msg->msg_initiator, ni);
3827
3828         rc = lnet_ptl_match_md(&info, msg);
3829         if (rc == LNET_MATCHMD_DROP) {
3830                 CNETERR("Dropping GET from %s portal %d match %llu"
3831                         " offset %d length %d\n",
3832                         libcfs_id2str(info.mi_id), info.mi_portal,
3833                         info.mi_mbits, info.mi_roffset, info.mi_rlength);
3834                 return -ENOENT; /* -ve: OK but no match */
3835         }
3836
3837         LASSERT(rc == LNET_MATCHMD_OK);
3838
3839         lnet_build_msg_event(msg, LNET_EVENT_GET);
3840
3841         reply_wmd = hdr->msg.get.return_wmd;
3842
3843         lnet_prep_send(msg, LNET_MSG_REPLY, source_id,
3844                        msg->msg_offset, msg->msg_wanted);
3845
3846         msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;
3847
3848         if (rdma_get) {
3849                 /* The LND completes the REPLY from its recv procedure */
3850                 lnet_ni_recv(ni, msg->msg_private, msg, 0,
3851                              msg->msg_offset, msg->msg_len, msg->msg_len);
3852                 return 0;
3853         }
3854
3855         lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
3856         msg->msg_receiving = 0;
3857
3858         rc = lnet_send(ni->ni_nid, msg, msg->msg_from);
3859         if (rc < 0) {
3860                 /* didn't get as far as lnet_ni_send() */
3861                 CERROR("%s: Unable to send REPLY for GET from %s: %d\n",
3862                        libcfs_nid2str(ni->ni_nid),
3863                        libcfs_id2str(info.mi_id), rc);
3864
3865                 lnet_finalize(msg, rc);
3866         }
3867
3868         return 0;
3869 }
3870
3871 static int
3872 lnet_parse_reply(struct lnet_ni *ni, struct lnet_msg *msg)
3873 {
3874         void *private = msg->msg_private;
3875         struct lnet_hdr *hdr = &msg->msg_hdr;
3876         struct lnet_process_id src = {0};
3877         struct lnet_libmd *md;
3878         int rlength;
3879         int mlength;
3880         int cpt;
3881
3882         cpt = lnet_cpt_of_cookie(hdr->msg.reply.dst_wmd.wh_object_cookie);
3883         lnet_res_lock(cpt);
3884
3885         src.nid = hdr->src_nid;
3886         src.pid = hdr->src_pid;
3887
3888         /* NB handles only looked up by creator (no flips) */
3889         md = lnet_wire_handle2md(&hdr->msg.reply.dst_wmd);
3890         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
3891                 CNETERR("%s: Dropping REPLY from %s for %s "
3892                         "MD %#llx.%#llx\n",
3893                         libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
3894                         (md == NULL) ? "invalid" : "inactive",
3895                         hdr->msg.reply.dst_wmd.wh_interface_cookie,
3896                         hdr->msg.reply.dst_wmd.wh_object_cookie);
3897                 if (md != NULL && md->md_me != NULL)
3898                         CERROR("REPLY MD also attached to portal %d\n",
3899                                md->md_me->me_portal);
3900
3901                 lnet_res_unlock(cpt);
3902                 return -ENOENT; /* -ve: OK but no match */
3903         }
3904
3905         LASSERT(md->md_offset == 0);
3906
3907         rlength = hdr->payload_length;
3908         mlength = MIN(rlength, (int)md->md_length);
3909
3910         if (mlength < rlength &&
3911             (md->md_options & LNET_MD_TRUNCATE) == 0) {
3912                 CNETERR("%s: Dropping REPLY from %s length %d "
3913                         "for MD %#llx would overflow (%d)\n",
3914                         libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
3915                         rlength, hdr->msg.reply.dst_wmd.wh_object_cookie,
3916                         mlength);
3917                 lnet_res_unlock(cpt);
3918                 return -ENOENT; /* -ve: OK but no match */
3919         }
3920
3921         CDEBUG(D_NET, "%s: Reply from %s of length %d/%d into md %#llx\n",
3922                libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
3923                mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);
3924
3925         lnet_msg_attach_md(msg, md, 0, mlength);
3926
3927         if (mlength != 0)
3928                 lnet_setpayloadbuffer(msg);
3929
3930         lnet_res_unlock(cpt);
3931
3932         lnet_build_msg_event(msg, LNET_EVENT_REPLY);
3933
3934         lnet_ni_recv(ni, private, msg, 0, 0, mlength, rlength);
3935         return 0;
3936 }
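
/*
 * A truncation example for the REPLY path above: a 4096-byte REPLY
 * arriving for a 1024-byte MD yields mlength = 1024. If the MD was
 * created with LNET_MD_TRUNCATE, the first 1024 bytes are delivered and
 * the remainder is discarded; otherwise the REPLY is dropped and
 * lnet_parse_reply() returns -ENOENT.
 */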
3937
3938 static int
3939 lnet_parse_ack(struct lnet_ni *ni, struct lnet_msg *msg)
3940 {
3941         struct lnet_hdr *hdr = &msg->msg_hdr;
3942         struct lnet_process_id src = {0};
3943         struct lnet_libmd *md;
3944         int cpt;
3945
3946         src.nid = hdr->src_nid;
3947         src.pid = hdr->src_pid;
3948
3949         /* Convert ack fields to host byte order */
3950         hdr->msg.ack.match_bits = le64_to_cpu(hdr->msg.ack.match_bits);
3951         hdr->msg.ack.mlength = le32_to_cpu(hdr->msg.ack.mlength);
3952
3953         cpt = lnet_cpt_of_cookie(hdr->msg.ack.dst_wmd.wh_object_cookie);
3954         lnet_res_lock(cpt);
3955
3956         /* NB handles only looked up by creator (no flips) */
3957         md = lnet_wire_handle2md(&hdr->msg.ack.dst_wmd);
3958         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
3959                 /* Don't moan; this is expected */
3960                 CDEBUG(D_NET,
3961                        "%s: Dropping ACK from %s to %s MD %#llx.%#llx\n",
3962                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
3963                        (md == NULL) ? "invalid" : "inactive",
3964                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
3965                        hdr->msg.ack.dst_wmd.wh_object_cookie);
3966                 if (md != NULL && md->md_me != NULL)
3967                         CERROR("Source MD also attached to portal %d\n",
3968                                md->md_me->me_portal);
3969
3970                 lnet_res_unlock(cpt);
3971                 return -ENOENT;                  /* -ve! */
3972         }
3973
3974         CDEBUG(D_NET, "%s: ACK from %s into md %#llx\n",
3975                libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
3976                hdr->msg.ack.dst_wmd.wh_object_cookie);
3977
3978         lnet_msg_attach_md(msg, md, 0, 0);
3979
3980         lnet_res_unlock(cpt);
3981
3982         lnet_build_msg_event(msg, LNET_EVENT_ACK);
3983
3984         lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
3985         return 0;
3986 }
3987
3988 /**
3989  * \retval LNET_CREDIT_OK       If \a msg is forwarded
3990  * \retval LNET_CREDIT_WAIT     If \a msg is blocked because no router buffer is available
3991  * \retval -ve                  error code
3992  */
3993 int
3994 lnet_parse_forward_locked(struct lnet_ni *ni, struct lnet_msg *msg)
3995 {
3996         int     rc = 0;
3997
3998         if (!the_lnet.ln_routing)
3999                 return -ECANCELED;
4000
4001         if (msg->msg_rxpeer->lpni_rtrcredits <= 0 ||
4002             lnet_msg2bufpool(msg)->rbp_credits <= 0) {
4003                 if (ni->ni_net->net_lnd->lnd_eager_recv == NULL) {
4004                         msg->msg_rx_ready_delay = 1;
4005                 } else {
4006                         lnet_net_unlock(msg->msg_rx_cpt);
4007                         rc = lnet_ni_eager_recv(ni, msg);
4008                         lnet_net_lock(msg->msg_rx_cpt);
4009                 }
4010         }
4011
4012         if (rc == 0)
4013                 rc = lnet_post_routed_recv_locked(msg, 0);
4014         return rc;
4015 }
4016
4017 int
4018 lnet_parse_local(struct lnet_ni *ni, struct lnet_msg *msg)
4019 {
4020         int     rc;
4021
4022         switch (msg->msg_type) {
4023         case LNET_MSG_ACK:
4024                 rc = lnet_parse_ack(ni, msg);
4025                 break;
4026         case LNET_MSG_PUT:
4027                 rc = lnet_parse_put(ni, msg);
4028                 break;
4029         case LNET_MSG_GET:
4030                 rc = lnet_parse_get(ni, msg, msg->msg_rdma_get);
4031                 break;
4032         case LNET_MSG_REPLY:
4033                 rc = lnet_parse_reply(ni, msg);
4034                 break;
4035         default: /* prevent an unused label if !kernel */
4036                 LASSERT(0);
4037                 return -EPROTO;
4038         }
4039
4040         LASSERT(rc == 0 || rc == -ENOENT);
4041         return rc;
4042 }
4043
4044 char *
4045 lnet_msgtyp2str (int type)
4046 {
4047         switch (type) {
4048         case LNET_MSG_ACK:
4049                 return ("ACK");
4050         case LNET_MSG_PUT:
4051                 return ("PUT");
4052         case LNET_MSG_GET:
4053                 return ("GET");
4054         case LNET_MSG_REPLY:
4055                 return ("REPLY");
4056         case LNET_MSG_HELLO:
4057                 return ("HELLO");
4058         default:
4059                 return ("<UNKNOWN>");
4060         }
4061 }
4062
4063 void
4064 lnet_print_hdr(struct lnet_hdr *hdr)
4065 {
4066         struct lnet_process_id src = {
4067                 .nid = hdr->src_nid,
4068                 .pid = hdr->src_pid,
4069         };
4070         struct lnet_process_id dst = {
4071                 .nid = hdr->dest_nid,
4072                 .pid = hdr->dest_pid,
4073         };
4074         char *type_str = lnet_msgtyp2str(hdr->type);
4075
4076         CWARN("P3 Header at %p of type %s\n", hdr, type_str);
4077         CWARN("    From %s\n", libcfs_id2str(src));
4078         CWARN("    To   %s\n", libcfs_id2str(dst));
4079
4080         switch (hdr->type) {
4081         default:
4082                 break;
4083
4084         case LNET_MSG_PUT:
4085                 CWARN("    Ptl index %d, ack md %#llx.%#llx, "
4086                       "match bits %llu\n",
4087                       hdr->msg.put.ptl_index,
4088                       hdr->msg.put.ack_wmd.wh_interface_cookie,
4089                       hdr->msg.put.ack_wmd.wh_object_cookie,
4090                       hdr->msg.put.match_bits);
4091                 CWARN("    Length %d, offset %d, hdr data %#llx\n",
4092                       hdr->payload_length, hdr->msg.put.offset,
4093                       hdr->msg.put.hdr_data);
4094                 break;
4095
4096         case LNET_MSG_GET:
4097                 CWARN("    Ptl index %d, return md %#llx.%#llx, "
4098                       "match bits %llu\n", hdr->msg.get.ptl_index,
4099                       hdr->msg.get.return_wmd.wh_interface_cookie,
4100                       hdr->msg.get.return_wmd.wh_object_cookie,
4101                       hdr->msg.get.match_bits);
4102                 CWARN("    Length %d, src offset %d\n",
4103                       hdr->msg.get.sink_length,
4104                       hdr->msg.get.src_offset);
4105                 break;
4106
4107         case LNET_MSG_ACK:
4108                 CWARN("    dst md %#llx.%#llx, "
4109                       "manipulated length %d\n",
4110                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
4111                       hdr->msg.ack.dst_wmd.wh_object_cookie,
4112                       hdr->msg.ack.mlength);
4113                 break;
4114
4115         case LNET_MSG_REPLY:
4116                 CWARN("    dst md %#llx.%#llx, "
4117                       "length %d\n",
4118                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
4119                       hdr->msg.reply.dst_wmd.wh_object_cookie,
4120                       hdr->payload_length);
4121         }
4122
4123 }
4124
4125 int
4126 lnet_parse(struct lnet_ni *ni, struct lnet_hdr *hdr, lnet_nid_t from_nid,
4127            void *private, int rdma_req)
4128 {
4129         struct lnet_peer_ni *lpni;
4130         struct lnet_msg *msg;
4131         __u32 payload_length;
4132         lnet_pid_t dest_pid;
4133         lnet_nid_t dest_nid;
4134         lnet_nid_t src_nid;
4135         bool push = false;
4136         int for_me;
4137         __u32 type;
4138         int rc = 0;
4139         int cpt;
4140
4141         LASSERT (!in_interrupt ());
4142
4143         type = le32_to_cpu(hdr->type);
4144         src_nid = le64_to_cpu(hdr->src_nid);
4145         dest_nid = le64_to_cpu(hdr->dest_nid);
4146         dest_pid = le32_to_cpu(hdr->dest_pid);
4147         payload_length = le32_to_cpu(hdr->payload_length);
4148
4149         for_me = (ni->ni_nid == dest_nid);
4150         cpt = lnet_cpt_of_nid(from_nid, ni);
4151
4152         CDEBUG(D_NET, "TRACE: %s(%s) <- %s : %s - %s\n",
4153                 libcfs_nid2str(dest_nid),
4154                 libcfs_nid2str(ni->ni_nid),
4155                 libcfs_nid2str(src_nid),
4156                 lnet_msgtyp2str(type),
4157                 (for_me) ? "for me" : "routed");
4158
4159         switch (type) {
4160         case LNET_MSG_ACK:
4161         case LNET_MSG_GET:
4162                 if (payload_length > 0) {
4163                         CERROR("%s, src %s: bad %s payload %d (0 expected)\n",
4164                                libcfs_nid2str(from_nid),
4165                                libcfs_nid2str(src_nid),
4166                                lnet_msgtyp2str(type), payload_length);
4167                         return -EPROTO;
4168                 }
4169                 break;
4170
4171         case LNET_MSG_PUT:
4172         case LNET_MSG_REPLY:
4173                 if (payload_length >
4174                     (__u32)(for_me ? LNET_MAX_PAYLOAD : LNET_MTU)) {
4175                         CERROR("%s, src %s: bad %s payload %d "
4176                                "(%d max expected)\n",
4177                                libcfs_nid2str(from_nid),
4178                                libcfs_nid2str(src_nid),
4179                                lnet_msgtyp2str(type),
4180                                payload_length,
4181                                for_me ? LNET_MAX_PAYLOAD : LNET_MTU);
4182                         return -EPROTO;
4183                 }
4184                 break;
4185
4186         default:
4187                 CERROR("%s, src %s: Bad message type 0x%x\n",
4188                        libcfs_nid2str(from_nid),
4189                        libcfs_nid2str(src_nid), type);
4190                 return -EPROTO;
4191         }
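
        /*
         * To make the limits above concrete: a PUT or REPLY addressed to
         * this node (for_me) may carry up to LNET_MAX_PAYLOAD bytes of
         * payload, while one we are merely routing is capped at LNET_MTU,
         * since router buffer pools are sized no larger than LNET_MTU.
         */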
4192
4193         if (the_lnet.ln_routing &&
4194             ni->ni_net->net_last_alive != ktime_get_real_seconds()) {
4195                 lnet_ni_lock(ni);
4196                 spin_lock(&ni->ni_net->net_lock);
4197                 ni->ni_net->net_last_alive = ktime_get_real_seconds();
4198                 spin_unlock(&ni->ni_net->net_lock);
4199                 if (ni->ni_status != NULL &&
4200                     ni->ni_status->ns_status == LNET_NI_STATUS_DOWN) {
4201                         ni->ni_status->ns_status = LNET_NI_STATUS_UP;
4202                         push = true;
4203                 }
4204                 lnet_ni_unlock(ni);
4205         }
4206
4207         if (push)
4208                 lnet_push_update_to_peers(1);
4209
4210         /* Regard a bad destination NID as a protocol error.  Senders should
4211          * know what they're doing; if they don't they're misconfigured, buggy
4212          * or malicious so we chop them off at the knees :) */
4213
4214         if (!for_me) {
4215                 if (LNET_NIDNET(dest_nid) == LNET_NIDNET(ni->ni_nid)) {
4216                         /* should have gone direct */
4217                         CERROR("%s, src %s: Bad dest nid %s "
4218                                "(should have been sent direct)\n",
4219                                 libcfs_nid2str(from_nid),
4220                                 libcfs_nid2str(src_nid),
4221                                 libcfs_nid2str(dest_nid));
4222                         return -EPROTO;
4223                 }
4224
4225                 if (lnet_islocalnid(dest_nid)) {
4226                         /* dest is another local NI; sender should have used
4227                          * this node's NID on its own network */
4228                         CERROR("%s, src %s: Bad dest nid %s "
4229                                "(it's my nid but on a different network)\n",
4230                                 libcfs_nid2str(from_nid),
4231                                 libcfs_nid2str(src_nid),
4232                                 libcfs_nid2str(dest_nid));
4233                         return -EPROTO;
4234                 }
4235
4236                 if (rdma_req && type == LNET_MSG_GET) {
4237                         CERROR("%s, src %s: Bad optimized GET for %s "
4238                                "(final destination must be me)\n",
4239                                 libcfs_nid2str(from_nid),
4240                                 libcfs_nid2str(src_nid),
4241                                 libcfs_nid2str(dest_nid));
4242                         return -EPROTO;
4243                 }
4244
4245                 if (!the_lnet.ln_routing) {
4246                         CERROR("%s, src %s: Dropping message for %s "
4247                                "(routing not enabled)\n",
4248                                 libcfs_nid2str(from_nid),
4249                                 libcfs_nid2str(src_nid),
4250                                 libcfs_nid2str(dest_nid));
4251                         goto drop;
4252                 }
4253         }
4254
4255         /* Message looks OK; we're not going to return an error, so we MUST
4256          * call back lnd_recv() come what may... */
4257
4258         if (!list_empty(&the_lnet.ln_test_peers) &&     /* normally we don't */
4259             fail_peer(src_nid, 0)) {                    /* shall we now? */
4260                 CERROR("%s, src %s: Dropping %s to simulate failure\n",
4261                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
4262                        lnet_msgtyp2str(type));
4263                 goto drop;
4264         }
4265
        if (!list_empty(&the_lnet.ln_drop_rules) &&
            lnet_drop_rule_match(hdr, ni->ni_nid, NULL)) {
                CDEBUG(D_NET, "%s, src %s, dst %s: Dropping %s to simulate "
                              "silent message loss\n",
                       libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
                       libcfs_nid2str(dest_nid), lnet_msgtyp2str(type));
                goto drop;
        }
4274
4275         if (lnet_drop_asym_route && for_me &&
4276             LNET_NIDNET(src_nid) != LNET_NIDNET(from_nid)) {
4277                 struct lnet_net *net;
4278                 struct lnet_remotenet *rnet;
4279                 bool found = true;
4280
4281                 /* we are dealing with a routed message,
4282                  * so see if route to reach src_nid goes through from_nid
4283                  */
4284                 lnet_net_lock(cpt);
4285                 net = lnet_get_net_locked(LNET_NIDNET(ni->ni_nid));
4286                 if (!net) {
4287                         lnet_net_unlock(cpt);
4288                         CERROR("net %s not found\n",
4289                                libcfs_net2str(LNET_NIDNET(ni->ni_nid)));
4290                         return -EPROTO;
4291                 }
4292
                rnet = lnet_find_rnet_locked(LNET_NIDNET(src_nid));
                if (rnet) {
                        struct lnet_peer *gw = NULL;
                        struct lnet_peer_ni *lpni = NULL;
                        struct lnet_route *route;

                        found = false;
                        list_for_each_entry(route, &rnet->lrn_routes, lr_list) {
                                if (route->lr_lnet != net->net_id)
                                        continue;
                                gw = route->lr_gateway;
                                /*
                                 * if from_nid is one of the gateway's NIDs
                                 * then this is a valid gateway
                                 */
                                lpni = NULL;
                                while ((lpni = lnet_get_next_peer_ni_locked(gw,
                                                NULL, lpni)) != NULL) {
                                        if (lpni->lpni_nid == from_nid) {
                                                found = true;
                                                break;
                                        }
                                }
                                if (found)
                                        break;
                        }
                }
4317                 lnet_net_unlock(cpt);
4318                 if (!found) {
4319                         /* we would not use from_nid to route a message to
4320                          * src_nid
4321                          * => asymmetric routing detected but forbidden
4322                          */
4323                         CERROR("%s, src %s: Dropping asymmetrical route %s\n",
4324                                libcfs_nid2str(from_nid),
4325                                libcfs_nid2str(src_nid), lnet_msgtyp2str(type));
4326                         goto drop;
4327                 }
4328         }
4329
4330         msg = lnet_msg_alloc();
4331         if (msg == NULL) {
4332                 CERROR("%s, src %s: Dropping %s (out of memory)\n",
4333                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
4334                        lnet_msgtyp2str(type));
4335                 goto drop;
4336         }
4337
4338         /* msg zeroed in lnet_msg_alloc; i.e. flags all clear,
4339          * pointers NULL etc */
4340
4341         msg->msg_type = type;
4342         msg->msg_private = private;
4343         msg->msg_receiving = 1;
4344         msg->msg_rdma_get = rdma_req;
4345         msg->msg_len = msg->msg_wanted = payload_length;
4346         msg->msg_offset = 0;
4347         msg->msg_hdr = *hdr;
4348         /* for building message event */
4349         msg->msg_from = from_nid;
4350         if (!for_me) {
4351                 msg->msg_target.pid     = dest_pid;
4352                 msg->msg_target.nid     = dest_nid;
4353                 msg->msg_routing        = 1;
4354
4355         } else {
4356                 /* convert common msg->hdr fields to host byteorder */
4357                 msg->msg_hdr.type       = type;
4358                 msg->msg_hdr.src_nid    = src_nid;
4359                 msg->msg_hdr.src_pid    = le32_to_cpu(msg->msg_hdr.src_pid);
4360                 msg->msg_hdr.dest_nid   = dest_nid;
4361                 msg->msg_hdr.dest_pid   = dest_pid;
4362                 msg->msg_hdr.payload_length = payload_length;
4363         }
4364
4365         lnet_net_lock(cpt);
4366         lpni = lnet_nid2peerni_locked(from_nid, ni->ni_nid, cpt);
4367         if (IS_ERR(lpni)) {
4368                 lnet_net_unlock(cpt);
4369                 CERROR("%s, src %s: Dropping %s "
4370                        "(error %ld looking up sender)\n",
4371                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
4372                        lnet_msgtyp2str(type), PTR_ERR(lpni));
4373                 lnet_msg_free(msg);
                if (PTR_ERR(lpni) == -ESHUTDOWN)
4375                         /* We are shutting down.  Don't do anything more */
4376                         return 0;
4377                 goto drop;
4378         }
4379         msg->msg_rxpeer = lpni;
4380         msg->msg_rxni = ni;
4381         lnet_ni_addref_locked(ni, cpt);
4382         /* Multi-Rail: Primary NID of source. */
4383         msg->msg_initiator = lnet_peer_primary_nid_locked(src_nid);
4384
        /*
         * Mark the status of this lpni as UP since we received a message
         * from it. The ping response reports back the ns_status, which the
         * remote marks as up or down, and we cache that value here.
         */
4390         msg->msg_rxpeer->lpni_ns_status = LNET_NI_STATUS_UP;
4391
4392         lnet_msg_commit(msg, cpt);
4393
4394         /* message delay simulation */
4395         if (unlikely(!list_empty(&the_lnet.ln_delay_rules) &&
4396                      lnet_delay_rule_match_locked(hdr, msg))) {
4397                 lnet_net_unlock(cpt);
4398                 return 0;
4399         }
4400
4401         if (!for_me) {
4402                 rc = lnet_parse_forward_locked(ni, msg);
4403                 lnet_net_unlock(cpt);
4404
4405                 if (rc < 0)
4406                         goto free_drop;
4407
4408                 if (rc == LNET_CREDIT_OK) {
4409                         lnet_ni_recv(ni, msg->msg_private, msg, 0,
4410                                      0, payload_length, payload_length);
4411                 }
4412                 return 0;
4413         }
4414
4415         lnet_net_unlock(cpt);
4416
4417         rc = lnet_parse_local(ni, msg);
4418         if (rc != 0)
4419                 goto free_drop;
4420         return 0;
4421
4422  free_drop:
4423         LASSERT(msg->msg_md == NULL);
4424         lnet_finalize(msg, rc);
4425
4426  drop:
4427         lnet_drop_message(ni, cpt, private, payload_length, type);
4428         return 0;
4429 }
4430 EXPORT_SYMBOL(lnet_parse);
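
/*
 * Usage sketch (illustrative; the LND below is hypothetical and error
 * handling is elided): an LND calls lnet_parse() from its receive path once
 * it has a complete header.  On success LNet guarantees a matching
 * lnd_recv() callback, which is where the payload is consumed; a negative
 * return means a protocol error and no lnd_recv() will follow.
 *
 *      static void
 *      my_lnd_handle_rx(struct lnet_ni *ni, struct my_rx *rx)
 *      {
 *              int rc;
 *
 *              rc = lnet_parse(ni, &rx->rx_hdr, rx->rx_from_nid,
 *                              rx, rx->rx_is_rdma);
 *              if (rc < 0)
 *                      my_lnd_free_rx(rx);
 *      }
 */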
4431
4432 void
4433 lnet_drop_delayed_msg_list(struct list_head *head, char *reason)
4434 {
4435         while (!list_empty(head)) {
4436                 struct lnet_process_id id = {0};
4437                 struct lnet_msg *msg;
4438
4439                 msg = list_entry(head->next, struct lnet_msg, msg_list);
4440                 list_del(&msg->msg_list);
4441
4442                 id.nid = msg->msg_hdr.src_nid;
4443                 id.pid = msg->msg_hdr.src_pid;
4444
4445                 LASSERT(msg->msg_md == NULL);
4446                 LASSERT(msg->msg_rx_delayed);
4447                 LASSERT(msg->msg_rxpeer != NULL);
4448                 LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
4449
4450                 CWARN("Dropping delayed PUT from %s portal %d match %llu"
4451                       " offset %d length %d: %s\n",
4452                       libcfs_id2str(id),
4453                       msg->msg_hdr.msg.put.ptl_index,
4454                       msg->msg_hdr.msg.put.match_bits,
4455                       msg->msg_hdr.msg.put.offset,
4456                       msg->msg_hdr.payload_length, reason);
4457
4458                 /* NB I can't drop msg's ref on msg_rxpeer until after I've
4459                  * called lnet_drop_message(), so I just hang onto msg as well
4460                  * until that's done */
4461
4462                 lnet_drop_message(msg->msg_rxni, msg->msg_rx_cpt,
4463                                   msg->msg_private, msg->msg_len,
4464                                   msg->msg_type);
4465
4466                 msg->msg_no_resend = true;
                /*
                 * NB: the message will not generate an event because it has
                 * no attached MD, but we should still give an error code so
                 * lnet_msg_decommit() can skip counter operations and other
                 * checks.
                 */
4472                 lnet_finalize(msg, -ENOENT);
4473         }
4474 }
4475
4476 void
4477 lnet_recv_delayed_msg_list(struct list_head *head)
4478 {
4479         while (!list_empty(head)) {
4480                 struct lnet_msg *msg;
4481                 struct lnet_process_id id;
4482
4483                 msg = list_entry(head->next, struct lnet_msg, msg_list);
4484                 list_del(&msg->msg_list);
4485
4486                 /* md won't disappear under me, since each msg
4487                  * holds a ref on it */
4488
4489                 id.nid = msg->msg_hdr.src_nid;
4490                 id.pid = msg->msg_hdr.src_pid;
4491
4492                 LASSERT(msg->msg_rx_delayed);
4493                 LASSERT(msg->msg_md != NULL);
4494                 LASSERT(msg->msg_rxpeer != NULL);
4495                 LASSERT(msg->msg_rxni != NULL);
4496                 LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
4497
4498                 CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
4499                        "match %llu offset %d length %d.\n",
4500                         libcfs_id2str(id), msg->msg_hdr.msg.put.ptl_index,
4501                         msg->msg_hdr.msg.put.match_bits,
4502                         msg->msg_hdr.msg.put.offset,
4503                         msg->msg_hdr.payload_length);
4504
4505                 lnet_recv_put(msg->msg_rxni, msg);
4506         }
4507 }
4508
4509 static void
4510 lnet_attach_rsp_tracker(struct lnet_rsp_tracker *rspt, int cpt,
4511                         struct lnet_libmd *md, struct lnet_handle_md mdh)
4512 {
4513         s64 timeout_ns;
4514         bool new_entry = true;
4515         struct lnet_rsp_tracker *local_rspt;
4516
        /*
         * The MD has a refcount taken by the message, so it's not going
         * away; it can still be looked up, however, so access to
         * md_rspt_ptr must be secured by taking the res_lock.
         * The rspt itself can be accessed without protection until it is
         * added to the list.
         */
4524
4525         lnet_res_lock(cpt);
4526         local_rspt = md->md_rspt_ptr;
4527         timeout_ns = lnet_transaction_timeout * NSEC_PER_SEC;
4528         if (local_rspt != NULL) {
4529                 /*
4530                  * we already have an rspt attached to the md, so we'll
4531                  * update the deadline on that one.
4532                  */
4533                 LIBCFS_FREE(rspt, sizeof(*rspt));
4534                 new_entry = false;
4535         } else {
4536                 /* new md */
4537                 rspt->rspt_mdh = mdh;
4538                 rspt->rspt_cpt = cpt;
4539                 /* store the rspt so we can access it when we get the REPLY */
4540                 md->md_rspt_ptr = rspt;
4541                 local_rspt = rspt;
4542         }
4543         local_rspt->rspt_deadline = ktime_add_ns(ktime_get(), timeout_ns);
4544
        /*
         * Add to the list of tracked responses. New entries go at the tail
         * of the list so that older entries expire first.
         */
4549         lnet_net_lock(cpt);
4550         if (!new_entry && !list_empty(&local_rspt->rspt_on_list))
4551                 list_del_init(&local_rspt->rspt_on_list);
4552         list_add_tail(&local_rspt->rspt_on_list, the_lnet.ln_mt_rstq[cpt]);
4553         lnet_net_unlock(cpt);
4554         lnet_res_unlock(cpt);
4555 }
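
/*
 * Caller pattern (sketch, mirroring LNetPut() and LNetGet() below): the
 * tracker is allocated up front so that an allocation failure can abort the
 * operation before anything is committed, and it is only attached once the
 * message is about to be sent.
 *
 *      rspt = lnet_rspt_alloc(cpt);
 *      if (!rspt)
 *              return -ENOMEM;
 *      INIT_LIST_HEAD(&rspt->rspt_on_list);
 *      ...build the message and its wire handles...
 *      lnet_attach_rsp_tracker(rspt, cpt, md, mdh);
 *      rc = lnet_send(self, msg, LNET_NID_ANY);
 */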
4556
4557 /**
4558  * Initiate an asynchronous PUT operation.
4559  *
4560  * There are several events associated with a PUT: completion of the send on
4561  * the initiator node (LNET_EVENT_SEND), and when the send completes
4562  * successfully, the receipt of an acknowledgment (LNET_EVENT_ACK) indicating
4563  * that the operation was accepted by the target. The event LNET_EVENT_PUT is
4564  * used at the target node to indicate the completion of incoming data
4565  * delivery.
4566  *
4567  * The local events will be logged in the EQ associated with the MD pointed to
 * by \a mdh. Using an MD without an associated EQ results in these
4569  * events being discarded. In this case, the caller must have another
4570  * mechanism (e.g., a higher level protocol) for determining when it is safe
4571  * to modify the memory region associated with the MD.
4572  *
4573  * Note that LNet does not guarantee the order of LNET_EVENT_SEND and
4574  * LNET_EVENT_ACK, though intuitively ACK should happen after SEND.
4575  *
4576  * \param self Indicates the NID of a local interface through which to send
4577  * the PUT request. Use LNET_NID_ANY to let LNet choose one by itself.
4578  * \param mdh A handle for the MD that describes the memory to be sent. The MD
4579  * must be "free floating" (See LNetMDBind()).
4580  * \param ack Controls whether an acknowledgment is requested.
4581  * Acknowledgments are only sent when they are requested by the initiating
4582  * process and the target MD enables them.
4583  * \param target A process identifier for the target process.
4584  * \param portal The index in the \a target's portal table.
4585  * \param match_bits The match bits to use for MD selection at the target
4586  * process.
4587  * \param offset The offset into the target MD (only used when the target
4588  * MD has the LNET_MD_MANAGE_REMOTE option set).
4589  * \param hdr_data 64 bits of user data that can be included in the message
4590  * header. This data is written to an event queue entry at the target if an
4591  * EQ is present on the matching MD.
4592  *
 * \retval  0      Success; only in this case will events be generated
 * and logged to the EQ (if one exists).
4595  * \retval -EIO    Simulated failure.
4596  * \retval -ENOMEM Memory allocation failure.
4597  * \retval -ENOENT Invalid MD object.
4598  *
4599  * \see struct lnet_event::hdr_data and lnet_event_kind_t.
4600  */
4601 int
4602 LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack,
4603         struct lnet_process_id target, unsigned int portal,
4604         __u64 match_bits, unsigned int offset,
4605         __u64 hdr_data)
4606 {
4607         struct lnet_msg *msg;
4608         struct lnet_libmd *md;
4609         int cpt;
4610         int rc;
4611         struct lnet_rsp_tracker *rspt = NULL;
4612
4613         LASSERT(the_lnet.ln_refcount > 0);
4614
4615         if (!list_empty(&the_lnet.ln_test_peers) &&     /* normally we don't */
4616             fail_peer(target.nid, 1)) {                 /* shall we now? */
4617                 CERROR("Dropping PUT to %s: simulated failure\n",
4618                        libcfs_id2str(target));
4619                 return -EIO;
4620         }
4621
4622         msg = lnet_msg_alloc();
4623         if (msg == NULL) {
4624                 CERROR("Dropping PUT to %s: ENOMEM on struct lnet_msg\n",
4625                        libcfs_id2str(target));
4626                 return -ENOMEM;
4627         }
4628         msg->msg_vmflush = !!memory_pressure_get();
4629
4630         cpt = lnet_cpt_of_cookie(mdh.cookie);
4631
4632         if (ack == LNET_ACK_REQ) {
                rspt = lnet_rspt_alloc(cpt);
                if (!rspt) {
                        CERROR("Dropping PUT to %s: ENOMEM on response tracker\n",
                               libcfs_id2str(target));
                        lnet_msg_free(msg);
                        return -ENOMEM;
                }
4639                 INIT_LIST_HEAD(&rspt->rspt_on_list);
4640         }
4641
4642         lnet_res_lock(cpt);
4643
4644         md = lnet_handle2md(&mdh);
4645         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
4646                 CERROR("Dropping PUT (%llu:%d:%s): MD (%d) invalid\n",
4647                        match_bits, portal, libcfs_id2str(target),
4648                        md == NULL ? -1 : md->md_threshold);
4649                 if (md != NULL && md->md_me != NULL)
4650                         CERROR("Source MD also attached to portal %d\n",
4651                                md->md_me->me_portal);
4652                 lnet_res_unlock(cpt);
4653
                if (rspt)
                        LIBCFS_FREE(rspt, sizeof(*rspt));
4655                 lnet_msg_free(msg);
4656                 return -ENOENT;
4657         }
4658
4659         CDEBUG(D_NET, "LNetPut -> %s\n", libcfs_id2str(target));
4660
4661         lnet_msg_attach_md(msg, md, 0, 0);
4662
4663         lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length);
4664
4665         msg->msg_hdr.msg.put.match_bits = cpu_to_le64(match_bits);
4666         msg->msg_hdr.msg.put.ptl_index = cpu_to_le32(portal);
4667         msg->msg_hdr.msg.put.offset = cpu_to_le32(offset);
4668         msg->msg_hdr.msg.put.hdr_data = hdr_data;
4669
4670         /* NB handles only looked up by creator (no flips) */
4671         if (ack == LNET_ACK_REQ) {
4672                 msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
4673                         the_lnet.ln_interface_cookie;
4674                 msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
4675                         md->md_lh.lh_cookie;
4676         } else {
4677                 msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
4678                         LNET_WIRE_HANDLE_COOKIE_NONE;
4679                 msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
4680                         LNET_WIRE_HANDLE_COOKIE_NONE;
4681         }
4682
4683         lnet_res_unlock(cpt);
4684
4685         lnet_build_msg_event(msg, LNET_EVENT_SEND);
4686
4687         if (ack == LNET_ACK_REQ)
4688                 lnet_attach_rsp_tracker(rspt, cpt, md, mdh);
4689
4690         if (CFS_FAIL_CHECK_ORSET(CFS_FAIL_PTLRPC_OST_BULK_CB2,
4691                                  CFS_FAIL_ONCE))
4692                 rc = -EIO;
4693         else
4694                 rc = lnet_send(self, msg, LNET_NID_ANY);
4695
4696         if (rc != 0) {
4697                 CNETERR("Error sending PUT to %s: %d\n",
4698                         libcfs_id2str(target), rc);
4699                 msg->msg_no_resend = true;
4700                 lnet_finalize(msg, rc);
4701         }
4702
4703         /* completion will be signalled by an event */
4704         return 0;
4705 }
4706 EXPORT_SYMBOL(LNetPut);
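
/*
 * Usage sketch (hypothetical caller; assumes 'eqh' was obtained from
 * LNetEQAlloc() and that 'target', 'portal' and 'match_bits' are agreed
 * with the peer).  The threshold of 2 allows one SEND and one ACK event
 * before the MD becomes inactive:
 *
 *      struct lnet_md umd = {
 *              .start     = buf,
 *              .length    = buf_len,
 *              .threshold = 2,
 *              .options   = 0,
 *              .user_ptr  = NULL,
 *              .eq_handle = eqh,
 *      };
 *      struct lnet_handle_md mdh;
 *      int rc;
 *
 *      rc = LNetMDBind(umd, LNET_UNLINK, &mdh);
 *      if (rc == 0)
 *              rc = LNetPut(LNET_NID_ANY, mdh, LNET_ACK_REQ, target,
 *                           portal, match_bits, 0, 0);
 */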
4707
4708 /*
4709  * The LND can DMA direct to the GET md (i.e. no REPLY msg).  This
4710  * returns a msg for the LND to pass to lnet_finalize() when the sink
4711  * data has been received.
4712  *
4713  * CAVEAT EMPTOR: 'getmsg' is the original GET, which is freed when
4714  * lnet_finalize() is called on it, so the LND must call this first
4715  */
4716 struct lnet_msg *
4717 lnet_create_reply_msg(struct lnet_ni *ni, struct lnet_msg *getmsg)
4718 {
4719         struct lnet_msg *msg = lnet_msg_alloc();
4720         struct lnet_libmd *getmd = getmsg->msg_md;
4721         struct lnet_process_id peer_id = getmsg->msg_target;
4722         int cpt;
4723
4724         LASSERT(!getmsg->msg_target_is_router);
4725         LASSERT(!getmsg->msg_routing);
4726
4727         if (msg == NULL) {
4728                 CERROR("%s: Dropping REPLY from %s: can't allocate msg\n",
4729                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id));
4730                 goto drop;
4731         }
4732
4733         cpt = lnet_cpt_of_cookie(getmd->md_lh.lh_cookie);
4734         lnet_res_lock(cpt);
4735
4736         LASSERT(getmd->md_refcount > 0);
4737
4738         if (getmd->md_threshold == 0) {
4739                 CERROR("%s: Dropping REPLY from %s for inactive MD %p\n",
4740                         libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id),
4741                         getmd);
4742                 lnet_res_unlock(cpt);
4743                 goto drop;
4744         }
4745
4746         LASSERT(getmd->md_offset == 0);
4747
4748         CDEBUG(D_NET, "%s: Reply from %s md %p\n",
4749                libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
4750
4751         /* setup information for lnet_build_msg_event */
4752         msg->msg_initiator = getmsg->msg_txpeer->lpni_peer_net->lpn_peer->lp_primary_nid;
4753         msg->msg_from = peer_id.nid;
4754         msg->msg_type = LNET_MSG_GET; /* flag this msg as an "optimized" GET */
4755         msg->msg_hdr.src_nid = peer_id.nid;
4756         msg->msg_hdr.payload_length = getmd->md_length;
4757         msg->msg_receiving = 1; /* required by lnet_msg_attach_md */
4758
4759         lnet_msg_attach_md(msg, getmd, getmd->md_offset, getmd->md_length);
4760         lnet_res_unlock(cpt);
4761
4762         cpt = lnet_cpt_of_nid(peer_id.nid, ni);
4763
4764         lnet_net_lock(cpt);
4765         lnet_msg_commit(msg, cpt);
4766         lnet_net_unlock(cpt);
4767
4768         lnet_build_msg_event(msg, LNET_EVENT_REPLY);
4769
4770         return msg;
4771
4772  drop:
4773         cpt = lnet_cpt_of_nid(peer_id.nid, ni);
4774
4775         lnet_net_lock(cpt);
4776         lnet_incr_stats(&ni->ni_stats, LNET_MSG_GET, LNET_STATS_TYPE_DROP);
4777         the_lnet.ln_counters[cpt]->lct_common.lcc_drop_count++;
4778         the_lnet.ln_counters[cpt]->lct_common.lcc_drop_length +=
4779                 getmd->md_length;
4780         lnet_net_unlock(cpt);
4781
4782         if (msg != NULL)
4783                 lnet_msg_free(msg);
4784
4785         return NULL;
4786 }
4787 EXPORT_SYMBOL(lnet_create_reply_msg);
4788
4789 void
4790 lnet_set_reply_msg_len(struct lnet_ni *ni, struct lnet_msg *reply,
4791                        unsigned int len)
4792 {
4793         /* Set the REPLY length, now the RDMA that elides the REPLY message has
4794          * completed and I know it. */
4795         LASSERT(reply != NULL);
4796         LASSERT(reply->msg_type == LNET_MSG_GET);
4797         LASSERT(reply->msg_ev.type == LNET_EVENT_REPLY);
4798
4799         /* NB I trusted my peer to RDMA.  If she tells me she's written beyond
4800          * the end of my buffer, I might as well be dead. */
4801         LASSERT(len <= reply->msg_ev.mlength);
4802
4803         reply->msg_ev.mlength = len;
4804 }
4805 EXPORT_SYMBOL(lnet_set_reply_msg_len);
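
/*
 * LND-side sketch for the optimized GET path (illustrative; the status and
 * length variables are hypothetical): the REPLY msg must be created from
 * the original GET before lnet_finalize() frees it, and is itself finalized
 * once the sink data has arrived:
 *
 *      reply = lnet_create_reply_msg(ni, getmsg);
 *      lnet_finalize(getmsg, reply == NULL ? -ENOMEM : 0);
 *
 *      ...later, when the RDMA transfer completes...
 *
 *      if (reply != NULL) {
 *              lnet_set_reply_msg_len(ni, reply, nob_received);
 *              lnet_finalize(reply, status);
 *      }
 */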
4806
4807 /**
4808  * Initiate an asynchronous GET operation.
4809  *
4810  * On the initiator node, an LNET_EVENT_SEND is logged when the GET request
4811  * is sent, and an LNET_EVENT_REPLY is logged when the data returned from
 * the target node in the REPLY has been written to the local MD.
4813  *
4814  * On the target node, an LNET_EVENT_GET is logged when the GET request
 * arrives and is accepted into an MD.
4816  *
4817  * \param self,target,portal,match_bits,offset See the discussion in LNetPut().
 * \param mdh A handle for the MD that describes the memory into which the
 * requested data will be received. The MD must be "free floating"
 * (see LNetMDBind()).
 * \param recovery True if this GET is a recovery probe sent by LNet health;
 * ordinary callers pass false.
4820  *
 * \retval  0      Success; only in this case will events be generated
 * and logged to the MD's EQ (if one exists).
4823  * \retval -EIO    Simulated failure.
4824  * \retval -ENOMEM Memory allocation failure.
4825  * \retval -ENOENT Invalid MD object.
4826  */
4827 int
4828 LNetGet(lnet_nid_t self, struct lnet_handle_md mdh,
4829         struct lnet_process_id target, unsigned int portal,
4830         __u64 match_bits, unsigned int offset, bool recovery)
4831 {
4832         struct lnet_msg *msg;
4833         struct lnet_libmd *md;
4834         struct lnet_rsp_tracker *rspt;
4835         int cpt;
4836         int rc;
4837
4838         LASSERT(the_lnet.ln_refcount > 0);
4839
        if (!list_empty(&the_lnet.ln_test_peers) &&     /* normally we don't */
            fail_peer(target.nid, 1)) {                 /* shall we now? */
4843                 CERROR("Dropping GET to %s: simulated failure\n",
4844                        libcfs_id2str(target));
4845                 return -EIO;
4846         }
4847
4848         msg = lnet_msg_alloc();
4849         if (!msg) {
4850                 CERROR("Dropping GET to %s: ENOMEM on struct lnet_msg\n",
4851                        libcfs_id2str(target));
4852                 return -ENOMEM;
4853         }
4854
4855         cpt = lnet_cpt_of_cookie(mdh.cookie);
4856
        rspt = lnet_rspt_alloc(cpt);
        if (!rspt) {
                CERROR("Dropping GET to %s: ENOMEM on response tracker\n",
                       libcfs_id2str(target));
                lnet_msg_free(msg);
                return -ENOMEM;
        }
4863         INIT_LIST_HEAD(&rspt->rspt_on_list);
4864
4865         msg->msg_recovery = recovery;
4866
4867         lnet_res_lock(cpt);
4868
4869         md = lnet_handle2md(&mdh);
4870         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
4871                 CERROR("Dropping GET (%llu:%d:%s): MD (%d) invalid\n",
4872                        match_bits, portal, libcfs_id2str(target),
4873                        md == NULL ? -1 : md->md_threshold);
4874                 if (md != NULL && md->md_me != NULL)
4875                         CERROR("REPLY MD also attached to portal %d\n",
4876                                md->md_me->me_portal);
4877
4878                 lnet_res_unlock(cpt);
4879
4880                 lnet_msg_free(msg);
4881                 LIBCFS_FREE(rspt, sizeof(*rspt));
4882                 return -ENOENT;
4883         }
4884
4885         CDEBUG(D_NET, "LNetGet -> %s\n", libcfs_id2str(target));
4886
4887         lnet_msg_attach_md(msg, md, 0, 0);
4888
4889         lnet_prep_send(msg, LNET_MSG_GET, target, 0, 0);
4890
4891         msg->msg_hdr.msg.get.match_bits = cpu_to_le64(match_bits);
4892         msg->msg_hdr.msg.get.ptl_index = cpu_to_le32(portal);
4893         msg->msg_hdr.msg.get.src_offset = cpu_to_le32(offset);
4894         msg->msg_hdr.msg.get.sink_length = cpu_to_le32(md->md_length);
4895
4896         /* NB handles only looked up by creator (no flips) */
4897         msg->msg_hdr.msg.get.return_wmd.wh_interface_cookie =
4898                 the_lnet.ln_interface_cookie;
4899         msg->msg_hdr.msg.get.return_wmd.wh_object_cookie =
4900                 md->md_lh.lh_cookie;
4901
4902         lnet_res_unlock(cpt);
4903
4904         lnet_build_msg_event(msg, LNET_EVENT_SEND);
4905
4906         lnet_attach_rsp_tracker(rspt, cpt, md, mdh);
4907
4908         rc = lnet_send(self, msg, LNET_NID_ANY);
4909         if (rc < 0) {
4910                 CNETERR("Error sending GET to %s: %d\n",
4911                         libcfs_id2str(target), rc);
4912                 msg->msg_no_resend = true;
4913                 lnet_finalize(msg, rc);
4914         }
4915
4916         /* completion will be signalled by an event */
4917         return 0;
4918 }
4919 EXPORT_SYMBOL(LNetGet);
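
/*
 * Usage sketch (hypothetical caller, analogous to the LNetPut() example
 * above).  Here the threshold of 2 covers one SEND and one REPLY event;
 * 'recovery' is false for ordinary users:
 *
 *      struct lnet_md umd = {
 *              .start     = sink_buf,
 *              .length    = sink_len,
 *              .threshold = 2,
 *              .options   = 0,
 *              .user_ptr  = NULL,
 *              .eq_handle = eqh,
 *      };
 *      struct lnet_handle_md mdh;
 *      int rc;
 *
 *      rc = LNetMDBind(umd, LNET_UNLINK, &mdh);
 *      if (rc == 0)
 *              rc = LNetGet(LNET_NID_ANY, mdh, target, portal,
 *                           match_bits, 0, false);
 */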
4920
4921 /**
4922  * Calculate distance to node at \a dstnid.
4923  *
4924  * \param dstnid Target NID.
4925  * \param srcnidp If not NULL, NID of the local interface to reach \a dstnid
4926  * is saved here.
4927  * \param orderp If not NULL, order of the route to reach \a dstnid is saved
4928  * here.
4929  *
 * \retval 0 If \a dstnid belongs to a local interface and the reserved
 * option local_nid_dist_zero is set (the default).
 * \retval positive Distance to the target NID, i.e. the number of hops
 * plus one.
4933  * \retval -EHOSTUNREACH If \a dstnid is not reachable.
4934  */
4935 int
4936 LNetDist(lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
4937 {
4938         struct list_head        *e;
4939         struct lnet_ni *ni = NULL;
4940         struct lnet_remotenet *rnet;
4941         __u32                   dstnet = LNET_NIDNET(dstnid);
4942         int                     hops;
4943         int                     cpt;
4944         __u32                   order = 2;
4945         struct list_head        *rn_list;
4946
4947         /* if !local_nid_dist_zero, I don't return a distance of 0 ever
4948          * (when lustre sees a distance of 0, it substitutes 0@lo), so I
4949          * keep order 0 free for 0@lo and order 1 free for a local NID
4950          * match */
4951
4952         LASSERT(the_lnet.ln_refcount > 0);
4953
4954         cpt = lnet_net_lock_current();
4955
4956         while ((ni = lnet_get_next_ni_locked(NULL, ni))) {
4957                 if (ni->ni_nid == dstnid) {
4958                         if (srcnidp != NULL)
4959                                 *srcnidp = dstnid;
4960                         if (orderp != NULL) {
4961                                 if (LNET_NETTYP(LNET_NIDNET(dstnid)) == LOLND)
4962                                         *orderp = 0;
4963                                 else
4964                                         *orderp = 1;
4965                         }
4966                         lnet_net_unlock(cpt);
4967
4968                         return local_nid_dist_zero ? 0 : 1;
4969                 }
4970
4971                 if (LNET_NIDNET(ni->ni_nid) == dstnet) {
                        /* Check if ni was originally created in the current
                         * net namespace. If not, bump its order by
                         * 0xffff0000 so this ni is not preferred. */
4976                         if (!net_eq(ni->ni_net_ns, current->nsproxy->net_ns))
4977                                 order += 0xffff0000;
4978
4979                         if (srcnidp != NULL)
4980                                 *srcnidp = ni->ni_nid;
4981                         if (orderp != NULL)
4982                                 *orderp = order;
4983                         lnet_net_unlock(cpt);
4984                         return 1;
4985                 }
4986
4987                 order++;
4988         }
4989
4990         rn_list = lnet_net2rnethash(dstnet);
4991         list_for_each(e, rn_list) {
4992                 rnet = list_entry(e, struct lnet_remotenet, lrn_list);
4993
4994                 if (rnet->lrn_net == dstnet) {
4995                         struct lnet_route *route;
4996                         struct lnet_route *shortest = NULL;
4997                         __u32 shortest_hops = LNET_UNDEFINED_HOPS;
4998                         __u32 route_hops;
4999
5000                         LASSERT(!list_empty(&rnet->lrn_routes));
5001
5002                         list_for_each_entry(route, &rnet->lrn_routes,
5003                                             lr_list) {
5004                                 route_hops = route->lr_hops;
5005                                 if (route_hops == LNET_UNDEFINED_HOPS)
5006                                         route_hops = 1;
5007                                 if (shortest == NULL ||
5008                                     route_hops < shortest_hops) {
5009                                         shortest = route;
5010                                         shortest_hops = route_hops;
5011                                 }
5012                         }
5013
5014                         LASSERT(shortest != NULL);
5015                         hops = shortest_hops;
5016                         if (srcnidp != NULL) {
5017                                 struct lnet_net *net;
5018                                 net = lnet_get_net_locked(shortest->lr_lnet);
5019                                 LASSERT(net);
5020                                 ni = lnet_get_next_ni_locked(net, NULL);
5021                                 *srcnidp = ni->ni_nid;
5022                         }
5023                         if (orderp != NULL)
5024                                 *orderp = order;
5025                         lnet_net_unlock(cpt);
5026                         return hops + 1;
5027                 }
5028                 order++;
5029         }
5030
5031         lnet_net_unlock(cpt);
5032         return -EHOSTUNREACH;
5033 }
5034 EXPORT_SYMBOL(LNetDist);
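
/*
 * Usage sketch: classify a peer by distance.
 *
 *      lnet_nid_t src_nid;
 *      __u32 order;
 *      int dist = LNetDist(dstnid, &src_nid, &order);
 *
 *      if (dist < 0)
 *              ...unreachable (-EHOSTUNREACH)...
 *      else if (dist == 0)
 *              ...local NID (only when local_nid_dist_zero is set)...
 *      else
 *              ...dist == hops + 1; src_nid is the interface to use...
 */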