Whamcloud - gitweb
b31ab864fb533b9b72011ea0f90ffae50d79fced
[fs/lustre-release.git] / lnet / lnet / lib-move.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/lnet/lib-move.c
33  *
34  * Data movement routines
35  */
36
37 #define DEBUG_SUBSYSTEM S_LNET
38
39 #include <lnet/lib-lnet.h>
40 #include <linux/nsproxy.h>
41 #include <net/net_namespace.h>
42
43 static int local_nid_dist_zero = 1;
44 module_param(local_nid_dist_zero, int, 0444);
45 MODULE_PARM_DESC(local_nid_dist_zero, "Reserved");
46
47 static inline struct lnet_comm_count *
48 get_stats_counts(struct lnet_element_stats *stats,
49                  enum lnet_stats_type stats_type)
50 {
51         switch (stats_type) {
52         case LNET_STATS_TYPE_SEND:
53                 return &stats->el_send_stats;
54         case LNET_STATS_TYPE_RECV:
55                 return &stats->el_recv_stats;
56         case LNET_STATS_TYPE_DROP:
57                 return &stats->el_drop_stats;
58         default:
59                 CERROR("Unknown stats type\n");
60         }
61
62         return NULL;
63 }
64
65 void lnet_incr_stats(struct lnet_element_stats *stats,
66                      enum lnet_msg_type msg_type,
67                      enum lnet_stats_type stats_type)
68 {
69         struct lnet_comm_count *counts = get_stats_counts(stats, stats_type);
70         if (!counts)
71                 return;
72
73         switch (msg_type) {
74         case LNET_MSG_ACK:
75                 atomic_inc(&counts->co_ack_count);
76                 break;
77         case LNET_MSG_PUT:
78                 atomic_inc(&counts->co_put_count);
79                 break;
80         case LNET_MSG_GET:
81                 atomic_inc(&counts->co_get_count);
82                 break;
83         case LNET_MSG_REPLY:
84                 atomic_inc(&counts->co_reply_count);
85                 break;
86         case LNET_MSG_HELLO:
87                 atomic_inc(&counts->co_hello_count);
88                 break;
89         default:
90                 CERROR("There is a BUG in the code. Unknown message type\n");
91                 break;
92         }
93 }
94
95 __u32 lnet_sum_stats(struct lnet_element_stats *stats,
96                      enum lnet_stats_type stats_type)
97 {
98         struct lnet_comm_count *counts = get_stats_counts(stats, stats_type);
99         if (!counts)
100                 return 0;
101
102         return (atomic_read(&counts->co_ack_count) +
103                 atomic_read(&counts->co_put_count) +
104                 atomic_read(&counts->co_get_count) +
105                 atomic_read(&counts->co_reply_count) +
106                 atomic_read(&counts->co_hello_count));
107 }
108
109 static inline void assign_stats(struct lnet_ioctl_comm_count *msg_stats,
110                                 struct lnet_comm_count *counts)
111 {
112         msg_stats->ico_get_count = atomic_read(&counts->co_get_count);
113         msg_stats->ico_put_count = atomic_read(&counts->co_put_count);
114         msg_stats->ico_reply_count = atomic_read(&counts->co_reply_count);
115         msg_stats->ico_ack_count = atomic_read(&counts->co_ack_count);
116         msg_stats->ico_hello_count = atomic_read(&counts->co_hello_count);
117 }
118
119 void lnet_usr_translate_stats(struct lnet_ioctl_element_msg_stats *msg_stats,
120                               struct lnet_element_stats *stats)
121 {
122         struct lnet_comm_count *counts;
123
124         LASSERT(msg_stats);
125         LASSERT(stats);
126
127         counts = get_stats_counts(stats, LNET_STATS_TYPE_SEND);
128         if (!counts)
129                 return;
130         assign_stats(&msg_stats->im_send_stats, counts);
131
132         counts = get_stats_counts(stats, LNET_STATS_TYPE_RECV);
133         if (!counts)
134                 return;
135         assign_stats(&msg_stats->im_recv_stats, counts);
136
137         counts = get_stats_counts(stats, LNET_STATS_TYPE_DROP);
138         if (!counts)
139                 return;
140         assign_stats(&msg_stats->im_drop_stats, counts);
141 }
142
143 int
144 lnet_fail_nid(lnet_nid_t nid, unsigned int threshold)
145 {
146         struct lnet_test_peer *tp;
147         struct list_head *el;
148         struct list_head *next;
149         struct list_head  cull;
150
151         /* NB: use lnet_net_lock(0) to serialize operations on test peers */
152         if (threshold != 0) {
153                 /* Adding a new entry */
154                 LIBCFS_ALLOC(tp, sizeof(*tp));
155                 if (tp == NULL)
156                         return -ENOMEM;
157
158                 tp->tp_nid = nid;
159                 tp->tp_threshold = threshold;
160
161                 lnet_net_lock(0);
162                 list_add_tail(&tp->tp_list, &the_lnet.ln_test_peers);
163                 lnet_net_unlock(0);
164                 return 0;
165         }
166
167         /* removing entries */
168         INIT_LIST_HEAD(&cull);
169
170         lnet_net_lock(0);
171
172         list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
173                 tp = list_entry(el, struct lnet_test_peer, tp_list);
174
175                 if (tp->tp_threshold == 0 ||    /* needs culling anyway */
176                     nid == LNET_NID_ANY ||      /* removing all entries */
177                     tp->tp_nid == nid) {        /* matched this one */
178                         list_del(&tp->tp_list);
179                         list_add(&tp->tp_list, &cull);
180                 }
181         }
182
183         lnet_net_unlock(0);
184
185         while (!list_empty(&cull)) {
186                 tp = list_entry(cull.next, struct lnet_test_peer, tp_list);
187
188                 list_del(&tp->tp_list);
189                 LIBCFS_FREE(tp, sizeof(*tp));
190         }
191         return 0;
192 }
193
194 static int
195 fail_peer (lnet_nid_t nid, int outgoing)
196 {
197         struct lnet_test_peer *tp;
198         struct list_head *el;
199         struct list_head *next;
200         struct list_head  cull;
201         int               fail = 0;
202
203         INIT_LIST_HEAD(&cull);
204
205         /* NB: use lnet_net_lock(0) to serialize operations on test peers */
206         lnet_net_lock(0);
207
208         list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
209                 tp = list_entry(el, struct lnet_test_peer, tp_list);
210
211                 if (tp->tp_threshold == 0) {
212                         /* zombie entry */
213                         if (outgoing) {
214                                 /* only cull zombies on outgoing tests,
215                                  * since we may be at interrupt priority on
216                                  * incoming messages. */
217                                 list_del(&tp->tp_list);
218                                 list_add(&tp->tp_list, &cull);
219                         }
220                         continue;
221                 }
222
223                 if (tp->tp_nid == LNET_NID_ANY ||       /* fail every peer */
224                     nid == tp->tp_nid) {                /* fail this peer */
225                         fail = 1;
226
227                         if (tp->tp_threshold != LNET_MD_THRESH_INF) {
228                                 tp->tp_threshold--;
229                                 if (outgoing &&
230                                     tp->tp_threshold == 0) {
231                                         /* see above */
232                                         list_del(&tp->tp_list);
233                                         list_add(&tp->tp_list, &cull);
234                                 }
235                         }
236                         break;
237                 }
238         }
239
240         lnet_net_unlock(0);
241
242         while (!list_empty(&cull)) {
243                 tp = list_entry(cull.next, struct lnet_test_peer, tp_list);
244                 list_del(&tp->tp_list);
245
246                 LIBCFS_FREE(tp, sizeof(*tp));
247         }
248
249         return fail;
250 }
251
252 unsigned int
253 lnet_iov_nob(unsigned int niov, struct kvec *iov)
254 {
255         unsigned int nob = 0;
256
257         LASSERT(niov == 0 || iov != NULL);
258         while (niov-- > 0)
259                 nob += (iov++)->iov_len;
260
261         return (nob);
262 }
263 EXPORT_SYMBOL(lnet_iov_nob);
264
265 void
266 lnet_copy_iov2iov(unsigned int ndiov, struct kvec *diov, unsigned int doffset,
267                   unsigned int nsiov, struct kvec *siov, unsigned int soffset,
268                   unsigned int nob)
269 {
270         /* NB diov, siov are READ-ONLY */
271         unsigned int  this_nob;
272
273         if (nob == 0)
274                 return;
275
276         /* skip complete frags before 'doffset' */
277         LASSERT(ndiov > 0);
278         while (doffset >= diov->iov_len) {
279                 doffset -= diov->iov_len;
280                 diov++;
281                 ndiov--;
282                 LASSERT(ndiov > 0);
283         }
284
285         /* skip complete frags before 'soffset' */
286         LASSERT(nsiov > 0);
287         while (soffset >= siov->iov_len) {
288                 soffset -= siov->iov_len;
289                 siov++;
290                 nsiov--;
291                 LASSERT(nsiov > 0);
292         }
293
294         do {
295                 LASSERT(ndiov > 0);
296                 LASSERT(nsiov > 0);
297                 this_nob = MIN(diov->iov_len - doffset,
298                                siov->iov_len - soffset);
299                 this_nob = MIN(this_nob, nob);
300
301                 memcpy((char *)diov->iov_base + doffset,
302                        (char *)siov->iov_base + soffset, this_nob);
303                 nob -= this_nob;
304
305                 if (diov->iov_len > doffset + this_nob) {
306                         doffset += this_nob;
307                 } else {
308                         diov++;
309                         ndiov--;
310                         doffset = 0;
311                 }
312
313                 if (siov->iov_len > soffset + this_nob) {
314                         soffset += this_nob;
315                 } else {
316                         siov++;
317                         nsiov--;
318                         soffset = 0;
319                 }
320         } while (nob > 0);
321 }
322 EXPORT_SYMBOL(lnet_copy_iov2iov);
323
324 int
325 lnet_extract_iov(int dst_niov, struct kvec *dst,
326                  int src_niov, struct kvec *src,
327                  unsigned int offset, unsigned int len)
328 {
329         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
330          * for exactly 'len' bytes, and return the number of entries.
331          * NB not destructive to 'src' */
332         unsigned int    frag_len;
333         unsigned int    niov;
334
335         if (len == 0)                           /* no data => */
336                 return (0);                     /* no frags */
337
338         LASSERT(src_niov > 0);
339         while (offset >= src->iov_len) {      /* skip initial frags */
340                 offset -= src->iov_len;
341                 src_niov--;
342                 src++;
343                 LASSERT(src_niov > 0);
344         }
345
346         niov = 1;
347         for (;;) {
348                 LASSERT(src_niov > 0);
349                 LASSERT((int)niov <= dst_niov);
350
351                 frag_len = src->iov_len - offset;
352                 dst->iov_base = ((char *)src->iov_base) + offset;
353
354                 if (len <= frag_len) {
355                         dst->iov_len = len;
356                         return (niov);
357                 }
358
359                 dst->iov_len = frag_len;
360
361                 len -= frag_len;
362                 dst++;
363                 src++;
364                 niov++;
365                 src_niov--;
366                 offset = 0;
367         }
368 }
369 EXPORT_SYMBOL(lnet_extract_iov);
370
371
372 unsigned int
373 lnet_kiov_nob(unsigned int niov, lnet_kiov_t *kiov)
374 {
375         unsigned int  nob = 0;
376
377         LASSERT(niov == 0 || kiov != NULL);
378         while (niov-- > 0)
379                 nob += (kiov++)->kiov_len;
380
381         return (nob);
382 }
383 EXPORT_SYMBOL(lnet_kiov_nob);
384
385 void
386 lnet_copy_kiov2kiov(unsigned int ndiov, lnet_kiov_t *diov, unsigned int doffset,
387                     unsigned int nsiov, lnet_kiov_t *siov, unsigned int soffset,
388                     unsigned int nob)
389 {
390         /* NB diov, siov are READ-ONLY */
391         unsigned int    this_nob;
392         char           *daddr = NULL;
393         char           *saddr = NULL;
394
395         if (nob == 0)
396                 return;
397
398         LASSERT (!in_interrupt ());
399
400         LASSERT (ndiov > 0);
401         while (doffset >= diov->kiov_len) {
402                 doffset -= diov->kiov_len;
403                 diov++;
404                 ndiov--;
405                 LASSERT(ndiov > 0);
406         }
407
408         LASSERT(nsiov > 0);
409         while (soffset >= siov->kiov_len) {
410                 soffset -= siov->kiov_len;
411                 siov++;
412                 nsiov--;
413                 LASSERT(nsiov > 0);
414         }
415
416         do {
417                 LASSERT(ndiov > 0);
418                 LASSERT(nsiov > 0);
419                 this_nob = MIN(diov->kiov_len - doffset,
420                                siov->kiov_len - soffset);
421                 this_nob = MIN(this_nob, nob);
422
423                 if (daddr == NULL)
424                         daddr = ((char *)kmap(diov->kiov_page)) +
425                                 diov->kiov_offset + doffset;
426                 if (saddr == NULL)
427                         saddr = ((char *)kmap(siov->kiov_page)) +
428                                 siov->kiov_offset + soffset;
429
430                 /* Vanishing risk of kmap deadlock when mapping 2 pages.
431                  * However in practice at least one of the kiovs will be mapped
432                  * kernel pages and the map/unmap will be NOOPs */
433
434                 memcpy (daddr, saddr, this_nob);
435                 nob -= this_nob;
436
437                 if (diov->kiov_len > doffset + this_nob) {
438                         daddr += this_nob;
439                         doffset += this_nob;
440                 } else {
441                         kunmap(diov->kiov_page);
442                         daddr = NULL;
443                         diov++;
444                         ndiov--;
445                         doffset = 0;
446                 }
447
448                 if (siov->kiov_len > soffset + this_nob) {
449                         saddr += this_nob;
450                         soffset += this_nob;
451                 } else {
452                         kunmap(siov->kiov_page);
453                         saddr = NULL;
454                         siov++;
455                         nsiov--;
456                         soffset = 0;
457                 }
458         } while (nob > 0);
459
460         if (daddr != NULL)
461                 kunmap(diov->kiov_page);
462         if (saddr != NULL)
463                 kunmap(siov->kiov_page);
464 }
465 EXPORT_SYMBOL(lnet_copy_kiov2kiov);
466
467 void
468 lnet_copy_kiov2iov (unsigned int niov, struct kvec *iov, unsigned int iovoffset,
469                     unsigned int nkiov, lnet_kiov_t *kiov, unsigned int kiovoffset,
470                     unsigned int nob)
471 {
472         /* NB iov, kiov are READ-ONLY */
473         unsigned int    this_nob;
474         char           *addr = NULL;
475
476         if (nob == 0)
477                 return;
478
479         LASSERT (!in_interrupt ());
480
481         LASSERT (niov > 0);
482         while (iovoffset >= iov->iov_len) {
483                 iovoffset -= iov->iov_len;
484                 iov++;
485                 niov--;
486                 LASSERT(niov > 0);
487         }
488
489         LASSERT(nkiov > 0);
490         while (kiovoffset >= kiov->kiov_len) {
491                 kiovoffset -= kiov->kiov_len;
492                 kiov++;
493                 nkiov--;
494                 LASSERT(nkiov > 0);
495         }
496
497         do {
498                 LASSERT(niov > 0);
499                 LASSERT(nkiov > 0);
500                 this_nob = MIN(iov->iov_len - iovoffset,
501                                kiov->kiov_len - kiovoffset);
502                 this_nob = MIN(this_nob, nob);
503
504                 if (addr == NULL)
505                         addr = ((char *)kmap(kiov->kiov_page)) +
506                                 kiov->kiov_offset + kiovoffset;
507
508                 memcpy((char *)iov->iov_base + iovoffset, addr, this_nob);
509                 nob -= this_nob;
510
511                 if (iov->iov_len > iovoffset + this_nob) {
512                         iovoffset += this_nob;
513                 } else {
514                         iov++;
515                         niov--;
516                         iovoffset = 0;
517                 }
518
519                 if (kiov->kiov_len > kiovoffset + this_nob) {
520                         addr += this_nob;
521                         kiovoffset += this_nob;
522                 } else {
523                         kunmap(kiov->kiov_page);
524                         addr = NULL;
525                         kiov++;
526                         nkiov--;
527                         kiovoffset = 0;
528                 }
529
530         } while (nob > 0);
531
532         if (addr != NULL)
533                 kunmap(kiov->kiov_page);
534 }
535 EXPORT_SYMBOL(lnet_copy_kiov2iov);
536
537 void
538 lnet_copy_iov2kiov(unsigned int nkiov, lnet_kiov_t *kiov, unsigned int kiovoffset,
539                    unsigned int niov, struct kvec *iov, unsigned int iovoffset,
540                    unsigned int nob)
541 {
542         /* NB kiov, iov are READ-ONLY */
543         unsigned int    this_nob;
544         char           *addr = NULL;
545
546         if (nob == 0)
547                 return;
548
549         LASSERT (!in_interrupt ());
550
551         LASSERT (nkiov > 0);
552         while (kiovoffset >= kiov->kiov_len) {
553                 kiovoffset -= kiov->kiov_len;
554                 kiov++;
555                 nkiov--;
556                 LASSERT(nkiov > 0);
557         }
558
559         LASSERT(niov > 0);
560         while (iovoffset >= iov->iov_len) {
561                 iovoffset -= iov->iov_len;
562                 iov++;
563                 niov--;
564                 LASSERT(niov > 0);
565         }
566
567         do {
568                 LASSERT(nkiov > 0);
569                 LASSERT(niov > 0);
570                 this_nob = MIN(kiov->kiov_len - kiovoffset,
571                                iov->iov_len - iovoffset);
572                 this_nob = MIN(this_nob, nob);
573
574                 if (addr == NULL)
575                         addr = ((char *)kmap(kiov->kiov_page)) +
576                                 kiov->kiov_offset + kiovoffset;
577
578                 memcpy (addr, (char *)iov->iov_base + iovoffset, this_nob);
579                 nob -= this_nob;
580
581                 if (kiov->kiov_len > kiovoffset + this_nob) {
582                         addr += this_nob;
583                         kiovoffset += this_nob;
584                 } else {
585                         kunmap(kiov->kiov_page);
586                         addr = NULL;
587                         kiov++;
588                         nkiov--;
589                         kiovoffset = 0;
590                 }
591
592                 if (iov->iov_len > iovoffset + this_nob) {
593                         iovoffset += this_nob;
594                 } else {
595                         iov++;
596                         niov--;
597                         iovoffset = 0;
598                 }
599         } while (nob > 0);
600
601         if (addr != NULL)
602                 kunmap(kiov->kiov_page);
603 }
604 EXPORT_SYMBOL(lnet_copy_iov2kiov);
605
606 int
607 lnet_extract_kiov(int dst_niov, lnet_kiov_t *dst,
608                   int src_niov, lnet_kiov_t *src,
609                   unsigned int offset, unsigned int len)
610 {
611         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
612          * for exactly 'len' bytes, and return the number of entries.
613          * NB not destructive to 'src' */
614         unsigned int    frag_len;
615         unsigned int    niov;
616
617         if (len == 0)                           /* no data => */
618                 return (0);                     /* no frags */
619
620         LASSERT(src_niov > 0);
621         while (offset >= src->kiov_len) {      /* skip initial frags */
622                 offset -= src->kiov_len;
623                 src_niov--;
624                 src++;
625                 LASSERT(src_niov > 0);
626         }
627
628         niov = 1;
629         for (;;) {
630                 LASSERT(src_niov > 0);
631                 LASSERT((int)niov <= dst_niov);
632
633                 frag_len = src->kiov_len - offset;
634                 dst->kiov_page = src->kiov_page;
635                 dst->kiov_offset = src->kiov_offset + offset;
636
637                 if (len <= frag_len) {
638                         dst->kiov_len = len;
639                         LASSERT(dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
640                         return niov;
641                 }
642
643                 dst->kiov_len = frag_len;
644                 LASSERT(dst->kiov_offset + dst->kiov_len <= PAGE_SIZE);
645
646                 len -= frag_len;
647                 dst++;
648                 src++;
649                 niov++;
650                 src_niov--;
651                 offset = 0;
652         }
653 }
654 EXPORT_SYMBOL(lnet_extract_kiov);
655
656 void
657 lnet_ni_recv(struct lnet_ni *ni, void *private, struct lnet_msg *msg,
658              int delayed, unsigned int offset, unsigned int mlen,
659              unsigned int rlen)
660 {
661         unsigned int  niov = 0;
662         struct kvec *iov = NULL;
663         lnet_kiov_t  *kiov = NULL;
664         int           rc;
665
666         LASSERT (!in_interrupt ());
667         LASSERT (mlen == 0 || msg != NULL);
668
669         if (msg != NULL) {
670                 LASSERT(msg->msg_receiving);
671                 LASSERT(!msg->msg_sending);
672                 LASSERT(rlen == msg->msg_len);
673                 LASSERT(mlen <= msg->msg_len);
674                 LASSERT(msg->msg_offset == offset);
675                 LASSERT(msg->msg_wanted == mlen);
676
677                 msg->msg_receiving = 0;
678
679                 if (mlen != 0) {
680                         niov = msg->msg_niov;
681                         iov  = msg->msg_iov;
682                         kiov = msg->msg_kiov;
683
684                         LASSERT (niov > 0);
685                         LASSERT ((iov == NULL) != (kiov == NULL));
686                 }
687         }
688
689         rc = (ni->ni_net->net_lnd->lnd_recv)(ni, private, msg, delayed,
690                                              niov, iov, kiov, offset, mlen,
691                                              rlen);
692         if (rc < 0)
693                 lnet_finalize(msg, rc);
694 }
695
696 static void
697 lnet_setpayloadbuffer(struct lnet_msg *msg)
698 {
699         struct lnet_libmd *md = msg->msg_md;
700
701         LASSERT(msg->msg_len > 0);
702         LASSERT(!msg->msg_routing);
703         LASSERT(md != NULL);
704         LASSERT(msg->msg_niov == 0);
705         LASSERT(msg->msg_iov == NULL);
706         LASSERT(msg->msg_kiov == NULL);
707
708         msg->msg_niov = md->md_niov;
709         if ((md->md_options & LNET_MD_KIOV) != 0)
710                 msg->msg_kiov = md->md_iov.kiov;
711         else
712                 msg->msg_iov = md->md_iov.iov;
713 }
714
715 void
716 lnet_prep_send(struct lnet_msg *msg, int type, struct lnet_process_id target,
717                unsigned int offset, unsigned int len)
718 {
719         msg->msg_type = type;
720         msg->msg_target = target;
721         msg->msg_len = len;
722         msg->msg_offset = offset;
723
724         if (len != 0)
725                 lnet_setpayloadbuffer(msg);
726
727         memset (&msg->msg_hdr, 0, sizeof (msg->msg_hdr));
728         msg->msg_hdr.type           = cpu_to_le32(type);
729         /* dest_nid will be overwritten by lnet_select_pathway() */
730         msg->msg_hdr.dest_nid       = cpu_to_le64(target.nid);
731         msg->msg_hdr.dest_pid       = cpu_to_le32(target.pid);
732         /* src_nid will be set later */
733         msg->msg_hdr.src_pid        = cpu_to_le32(the_lnet.ln_pid);
734         msg->msg_hdr.payload_length = cpu_to_le32(len);
735 }
736
737 static void
738 lnet_ni_send(struct lnet_ni *ni, struct lnet_msg *msg)
739 {
740         void   *priv = msg->msg_private;
741         int     rc;
742
743         LASSERT (!in_interrupt ());
744         LASSERT (LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND ||
745                  (msg->msg_txcredit && msg->msg_peertxcredit));
746
747         rc = (ni->ni_net->net_lnd->lnd_send)(ni, priv, msg);
748         if (rc < 0)
749                 lnet_finalize(msg, rc);
750 }
751
752 static int
753 lnet_ni_eager_recv(struct lnet_ni *ni, struct lnet_msg *msg)
754 {
755         int     rc;
756
757         LASSERT(!msg->msg_sending);
758         LASSERT(msg->msg_receiving);
759         LASSERT(!msg->msg_rx_ready_delay);
760         LASSERT(ni->ni_net->net_lnd->lnd_eager_recv != NULL);
761
762         msg->msg_rx_ready_delay = 1;
763         rc = (ni->ni_net->net_lnd->lnd_eager_recv)(ni, msg->msg_private, msg,
764                                                   &msg->msg_private);
765         if (rc != 0) {
766                 CERROR("recv from %s / send to %s aborted: "
767                        "eager_recv failed %d\n",
768                        libcfs_nid2str(msg->msg_rxpeer->lpni_nid),
769                        libcfs_id2str(msg->msg_target), rc);
770                 LASSERT(rc < 0); /* required by my callers */
771         }
772
773         return rc;
774 }
775
776 /*
777  * This function can be called from two paths:
778  *      1. when sending a message
779  *      2. when decommiting a message (lnet_msg_decommit_tx())
780  * In both these cases the peer_ni should have it's reference count
781  * acquired by the caller and therefore it is safe to drop the spin
782  * lock before calling lnd_query()
783  */
784 static void
785 lnet_ni_query_locked(struct lnet_ni *ni, struct lnet_peer_ni *lp)
786 {
787         cfs_time_t last_alive = 0;
788         int cpt = lnet_cpt_of_nid_locked(lp->lpni_nid, ni);
789
790         LASSERT(lnet_peer_aliveness_enabled(lp));
791         LASSERT(ni->ni_net->net_lnd->lnd_query != NULL);
792
793         lnet_net_unlock(cpt);
794         (ni->ni_net->net_lnd->lnd_query)(ni, lp->lpni_nid, &last_alive);
795         lnet_net_lock(cpt);
796
797         lp->lpni_last_query = cfs_time_current();
798
799         if (last_alive != 0) /* NI has updated timestamp */
800                 lp->lpni_last_alive = last_alive;
801 }
802
803 /* NB: always called with lnet_net_lock held */
804 static inline int
805 lnet_peer_is_alive (struct lnet_peer_ni *lp, cfs_time_t now)
806 {
807         int        alive;
808         cfs_time_t deadline;
809
810         LASSERT (lnet_peer_aliveness_enabled(lp));
811
812         /*
813          * Trust lnet_notify() if it has more recent aliveness news, but
814          * ignore the initial assumed death (see lnet_peers_start_down()).
815          */
816         spin_lock(&lp->lpni_lock);
817         if (!lp->lpni_alive && lp->lpni_alive_count > 0 &&
818             cfs_time_aftereq(lp->lpni_timestamp, lp->lpni_last_alive)) {
819                 spin_unlock(&lp->lpni_lock);
820                 return 0;
821         }
822
823         deadline =
824           cfs_time_add(lp->lpni_last_alive,
825                        cfs_time_seconds(lp->lpni_net->net_tunables.
826                                         lct_peer_timeout));
827         alive = cfs_time_after(deadline, now);
828
829         /*
830          * Update obsolete lp_alive except for routers assumed to be dead
831          * initially, because router checker would update aliveness in this
832          * case, and moreover lpni_last_alive at peer creation is assumed.
833          */
834         if (alive && !lp->lpni_alive &&
835             !(lnet_isrouter(lp) && lp->lpni_alive_count == 0)) {
836                 spin_unlock(&lp->lpni_lock);
837                 lnet_notify_locked(lp, 0, 1, lp->lpni_last_alive);
838         } else {
839                 spin_unlock(&lp->lpni_lock);
840         }
841
842         return alive;
843 }
844
845
846 /* NB: returns 1 when alive, 0 when dead, negative when error;
847  *     may drop the lnet_net_lock */
848 static int
849 lnet_peer_alive_locked (struct lnet_ni *ni, struct lnet_peer_ni *lp)
850 {
851         cfs_time_t now = cfs_time_current();
852
853         if (!lnet_peer_aliveness_enabled(lp))
854                 return -ENODEV;
855
856         if (lnet_peer_is_alive(lp, now))
857                 return 1;
858
859         /*
860          * Peer appears dead, but we should avoid frequent NI queries (at
861          * most once per lnet_queryinterval seconds).
862          */
863         if (lp->lpni_last_query != 0) {
864                 static const int lnet_queryinterval = 1;
865
866                 cfs_time_t next_query =
867                            cfs_time_add(lp->lpni_last_query,
868                                         cfs_time_seconds(lnet_queryinterval));
869
870                 if (cfs_time_before(now, next_query)) {
871                         if (lp->lpni_alive)
872                                 CWARN("Unexpected aliveness of peer %s: "
873                                       "%d < %d (%d/%d)\n",
874                                       libcfs_nid2str(lp->lpni_nid),
875                                       (int)now, (int)next_query,
876                                       lnet_queryinterval,
877                                       lp->lpni_net->net_tunables.lct_peer_timeout);
878                         return 0;
879                 }
880         }
881
882         /* query NI for latest aliveness news */
883         lnet_ni_query_locked(ni, lp);
884
885         if (lnet_peer_is_alive(lp, now))
886                 return 1;
887
888         lnet_notify_locked(lp, 0, 0, lp->lpni_last_alive);
889         return 0;
890 }
891
892 /**
893  * \param msg The message to be sent.
894  * \param do_send True if lnet_ni_send() should be called in this function.
895  *        lnet_send() is going to lnet_net_unlock immediately after this, so
896  *        it sets do_send FALSE and I don't do the unlock/send/lock bit.
897  *
898  * \retval LNET_CREDIT_OK If \a msg sent or OK to send.
899  * \retval LNET_CREDIT_WAIT If \a msg blocked for credit.
900  * \retval -EHOSTUNREACH If the next hop of the message appears dead.
901  * \retval -ECANCELED If the MD of the message has been unlinked.
902  */
903 static int
904 lnet_post_send_locked(struct lnet_msg *msg, int do_send)
905 {
906         struct lnet_peer_ni     *lp = msg->msg_txpeer;
907         struct lnet_ni          *ni = msg->msg_txni;
908         int                     cpt = msg->msg_tx_cpt;
909         struct lnet_tx_queue    *tq = ni->ni_tx_queues[cpt];
910
911         /* non-lnet_send() callers have checked before */
912         LASSERT(!do_send || msg->msg_tx_delayed);
913         LASSERT(!msg->msg_receiving);
914         LASSERT(msg->msg_tx_committed);
915
916         /* NB 'lp' is always the next hop */
917         if ((msg->msg_target.pid & LNET_PID_USERFLAG) == 0 &&
918             lnet_peer_alive_locked(ni, lp) == 0) {
919                 the_lnet.ln_counters[cpt]->drop_count++;
920                 the_lnet.ln_counters[cpt]->drop_length += msg->msg_len;
921                 lnet_net_unlock(cpt);
922                 if (msg->msg_txpeer)
923                         lnet_incr_stats(&msg->msg_txpeer->lpni_stats,
924                                         msg->msg_type,
925                                         LNET_STATS_TYPE_DROP);
926                 if (msg->msg_txni)
927                         lnet_incr_stats(&msg->msg_txni->ni_stats,
928                                         msg->msg_type,
929                                         LNET_STATS_TYPE_DROP);
930
931                 CNETERR("Dropping message for %s: peer not alive\n",
932                         libcfs_id2str(msg->msg_target));
933                 if (do_send)
934                         lnet_finalize(msg, -EHOSTUNREACH);
935
936                 lnet_net_lock(cpt);
937                 return -EHOSTUNREACH;
938         }
939
940         if (msg->msg_md != NULL &&
941             (msg->msg_md->md_flags & LNET_MD_FLAG_ABORTED) != 0) {
942                 lnet_net_unlock(cpt);
943
944                 CNETERR("Aborting message for %s: LNetM[DE]Unlink() already "
945                         "called on the MD/ME.\n",
946                         libcfs_id2str(msg->msg_target));
947                 if (do_send)
948                         lnet_finalize(msg, -ECANCELED);
949
950                 lnet_net_lock(cpt);
951                 return -ECANCELED;
952         }
953
954         if (!msg->msg_peertxcredit) {
955                 spin_lock(&lp->lpni_lock);
956                 LASSERT((lp->lpni_txcredits < 0) ==
957                         !list_empty(&lp->lpni_txq));
958
959                 msg->msg_peertxcredit = 1;
960                 lp->lpni_txqnob += msg->msg_len + sizeof(struct lnet_hdr);
961                 lp->lpni_txcredits--;
962
963                 if (lp->lpni_txcredits < lp->lpni_mintxcredits)
964                         lp->lpni_mintxcredits = lp->lpni_txcredits;
965
966                 if (lp->lpni_txcredits < 0) {
967                         msg->msg_tx_delayed = 1;
968                         list_add_tail(&msg->msg_list, &lp->lpni_txq);
969                         spin_unlock(&lp->lpni_lock);
970                         return LNET_CREDIT_WAIT;
971                 }
972                 spin_unlock(&lp->lpni_lock);
973         }
974
975         if (!msg->msg_txcredit) {
976                 LASSERT((tq->tq_credits < 0) ==
977                         !list_empty(&tq->tq_delayed));
978
979                 msg->msg_txcredit = 1;
980                 tq->tq_credits--;
981                 atomic_dec(&ni->ni_tx_credits);
982
983                 if (tq->tq_credits < tq->tq_credits_min)
984                         tq->tq_credits_min = tq->tq_credits;
985
986                 if (tq->tq_credits < 0) {
987                         msg->msg_tx_delayed = 1;
988                         list_add_tail(&msg->msg_list, &tq->tq_delayed);
989                         return LNET_CREDIT_WAIT;
990                 }
991         }
992
993         if (do_send) {
994                 lnet_net_unlock(cpt);
995                 lnet_ni_send(ni, msg);
996                 lnet_net_lock(cpt);
997         }
998         return LNET_CREDIT_OK;
999 }
1000
1001
1002 static struct lnet_rtrbufpool *
1003 lnet_msg2bufpool(struct lnet_msg *msg)
1004 {
1005         struct lnet_rtrbufpool  *rbp;
1006         int                     cpt;
1007
1008         LASSERT(msg->msg_rx_committed);
1009
1010         cpt = msg->msg_rx_cpt;
1011         rbp = &the_lnet.ln_rtrpools[cpt][0];
1012
1013         LASSERT(msg->msg_len <= LNET_MTU);
1014         while (msg->msg_len > (unsigned int)rbp->rbp_npages * PAGE_SIZE) {
1015                 rbp++;
1016                 LASSERT(rbp < &the_lnet.ln_rtrpools[cpt][LNET_NRBPOOLS]);
1017         }
1018
1019         return rbp;
1020 }
1021
1022 static int
1023 lnet_post_routed_recv_locked(struct lnet_msg *msg, int do_recv)
1024 {
1025         /* lnet_parse is going to lnet_net_unlock immediately after this, so it
1026          * sets do_recv FALSE and I don't do the unlock/send/lock bit.
1027          * I return LNET_CREDIT_WAIT if msg blocked and LNET_CREDIT_OK if
1028          * received or OK to receive */
1029         struct lnet_peer_ni *lp = msg->msg_rxpeer;
1030         struct lnet_rtrbufpool *rbp;
1031         struct lnet_rtrbuf *rb;
1032
1033         LASSERT (msg->msg_iov == NULL);
1034         LASSERT (msg->msg_kiov == NULL);
1035         LASSERT (msg->msg_niov == 0);
1036         LASSERT (msg->msg_routing);
1037         LASSERT (msg->msg_receiving);
1038         LASSERT (!msg->msg_sending);
1039
1040         /* non-lnet_parse callers only receive delayed messages */
1041         LASSERT(!do_recv || msg->msg_rx_delayed);
1042
1043         if (!msg->msg_peerrtrcredit) {
1044                 spin_lock(&lp->lpni_lock);
1045                 LASSERT((lp->lpni_rtrcredits < 0) ==
1046                         !list_empty(&lp->lpni_rtrq));
1047
1048                 msg->msg_peerrtrcredit = 1;
1049                 lp->lpni_rtrcredits--;
1050                 if (lp->lpni_rtrcredits < lp->lpni_minrtrcredits)
1051                         lp->lpni_minrtrcredits = lp->lpni_rtrcredits;
1052
1053                 if (lp->lpni_rtrcredits < 0) {
1054                         /* must have checked eager_recv before here */
1055                         LASSERT(msg->msg_rx_ready_delay);
1056                         msg->msg_rx_delayed = 1;
1057                         list_add_tail(&msg->msg_list, &lp->lpni_rtrq);
1058                         spin_unlock(&lp->lpni_lock);
1059                         return LNET_CREDIT_WAIT;
1060                 }
1061                 spin_unlock(&lp->lpni_lock);
1062         }
1063
1064         rbp = lnet_msg2bufpool(msg);
1065
1066         if (!msg->msg_rtrcredit) {
1067                 msg->msg_rtrcredit = 1;
1068                 rbp->rbp_credits--;
1069                 if (rbp->rbp_credits < rbp->rbp_mincredits)
1070                         rbp->rbp_mincredits = rbp->rbp_credits;
1071
1072                 if (rbp->rbp_credits < 0) {
1073                         /* must have checked eager_recv before here */
1074                         LASSERT(msg->msg_rx_ready_delay);
1075                         msg->msg_rx_delayed = 1;
1076                         list_add_tail(&msg->msg_list, &rbp->rbp_msgs);
1077                         return LNET_CREDIT_WAIT;
1078                 }
1079         }
1080
1081         LASSERT(!list_empty(&rbp->rbp_bufs));
1082         rb = list_entry(rbp->rbp_bufs.next, struct lnet_rtrbuf, rb_list);
1083         list_del(&rb->rb_list);
1084
1085         msg->msg_niov = rbp->rbp_npages;
1086         msg->msg_kiov = &rb->rb_kiov[0];
1087
1088         if (do_recv) {
1089                 int cpt = msg->msg_rx_cpt;
1090
1091                 lnet_net_unlock(cpt);
1092                 lnet_ni_recv(msg->msg_rxni, msg->msg_private, msg, 1,
1093                              0, msg->msg_len, msg->msg_len);
1094                 lnet_net_lock(cpt);
1095         }
1096         return LNET_CREDIT_OK;
1097 }
1098
1099 void
1100 lnet_return_tx_credits_locked(struct lnet_msg *msg)
1101 {
1102         struct lnet_peer_ni     *txpeer = msg->msg_txpeer;
1103         struct lnet_ni          *txni = msg->msg_txni;
1104         struct lnet_msg         *msg2;
1105
1106         if (msg->msg_txcredit) {
1107                 struct lnet_ni       *ni = msg->msg_txni;
1108                 struct lnet_tx_queue *tq = ni->ni_tx_queues[msg->msg_tx_cpt];
1109
1110                 /* give back NI txcredits */
1111                 msg->msg_txcredit = 0;
1112
1113                 LASSERT((tq->tq_credits < 0) ==
1114                         !list_empty(&tq->tq_delayed));
1115
1116                 tq->tq_credits++;
1117                 atomic_inc(&ni->ni_tx_credits);
1118                 if (tq->tq_credits <= 0) {
1119                         msg2 = list_entry(tq->tq_delayed.next,
1120                                           struct lnet_msg, msg_list);
1121                         list_del(&msg2->msg_list);
1122
1123                         LASSERT(msg2->msg_txni == ni);
1124                         LASSERT(msg2->msg_tx_delayed);
1125                         LASSERT(msg2->msg_tx_cpt == msg->msg_tx_cpt);
1126
1127                         (void) lnet_post_send_locked(msg2, 1);
1128                 }
1129         }
1130
1131         if (msg->msg_peertxcredit) {
1132                 /* give back peer txcredits */
1133                 msg->msg_peertxcredit = 0;
1134
1135                 spin_lock(&txpeer->lpni_lock);
1136                 LASSERT((txpeer->lpni_txcredits < 0) ==
1137                         !list_empty(&txpeer->lpni_txq));
1138
1139                 txpeer->lpni_txqnob -= msg->msg_len + sizeof(struct lnet_hdr);
1140                 LASSERT(txpeer->lpni_txqnob >= 0);
1141
1142                 txpeer->lpni_txcredits++;
1143                 if (txpeer->lpni_txcredits <= 0) {
1144                         int msg2_cpt;
1145
1146                         msg2 = list_entry(txpeer->lpni_txq.next,
1147                                               struct lnet_msg, msg_list);
1148                         list_del(&msg2->msg_list);
1149                         spin_unlock(&txpeer->lpni_lock);
1150
1151                         LASSERT(msg2->msg_txpeer == txpeer);
1152                         LASSERT(msg2->msg_tx_delayed);
1153
1154                         msg2_cpt = msg2->msg_tx_cpt;
1155
1156                         /*
1157                          * The msg_cpt can be different from the msg2_cpt
1158                          * so we need to make sure we lock the correct cpt
1159                          * for msg2.
1160                          * Once we call lnet_post_send_locked() it is no
1161                          * longer safe to access msg2, since it could've
1162                          * been freed by lnet_finalize(), but we still
1163                          * need to relock the correct cpt, so we cache the
1164                          * msg2_cpt for the purpose of the check that
1165                          * follows the call to lnet_pose_send_locked().
1166                          */
1167                         if (msg2_cpt != msg->msg_tx_cpt) {
1168                                 lnet_net_unlock(msg->msg_tx_cpt);
1169                                 lnet_net_lock(msg2_cpt);
1170                         }
1171                         (void) lnet_post_send_locked(msg2, 1);
1172                         if (msg2_cpt != msg->msg_tx_cpt) {
1173                                 lnet_net_unlock(msg2_cpt);
1174                                 lnet_net_lock(msg->msg_tx_cpt);
1175                         }
1176                 } else {
1177                         spin_unlock(&txpeer->lpni_lock);
1178                 }
1179         }
1180
1181         if (txni != NULL) {
1182                 msg->msg_txni = NULL;
1183                 lnet_ni_decref_locked(txni, msg->msg_tx_cpt);
1184         }
1185
1186         if (txpeer != NULL) {
1187                 /*
1188                  * TODO:
1189                  * Once the patch for the health comes in we need to set
1190                  * the health of the peer ni to bad when we fail to send
1191                  * a message.
1192                  * int status = msg->msg_ev.status;
1193                  * if (status != 0)
1194                  *      lnet_set_peer_ni_health_locked(txpeer, false)
1195                  */
1196                 msg->msg_txpeer = NULL;
1197                 lnet_peer_ni_decref_locked(txpeer);
1198         }
1199 }
1200
1201 void
1202 lnet_schedule_blocked_locked(struct lnet_rtrbufpool *rbp)
1203 {
1204         struct lnet_msg *msg;
1205
1206         if (list_empty(&rbp->rbp_msgs))
1207                 return;
1208         msg = list_entry(rbp->rbp_msgs.next,
1209                          struct lnet_msg, msg_list);
1210         list_del(&msg->msg_list);
1211
1212         (void)lnet_post_routed_recv_locked(msg, 1);
1213 }
1214
1215 void
1216 lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
1217 {
1218         struct lnet_msg *msg;
1219         struct lnet_msg *tmp;
1220
1221         lnet_net_unlock(cpt);
1222
1223         list_for_each_entry_safe(msg, tmp, list, msg_list) {
1224                 lnet_ni_recv(msg->msg_rxni, msg->msg_private, NULL,
1225                              0, 0, 0, msg->msg_hdr.payload_length);
1226                 list_del_init(&msg->msg_list);
1227                 lnet_finalize(msg, -ECANCELED);
1228         }
1229
1230         lnet_net_lock(cpt);
1231 }
1232
1233 void
1234 lnet_return_rx_credits_locked(struct lnet_msg *msg)
1235 {
1236         struct lnet_peer_ni *rxpeer = msg->msg_rxpeer;
1237         struct lnet_ni *rxni = msg->msg_rxni;
1238         struct lnet_msg *msg2;
1239
1240         if (msg->msg_rtrcredit) {
1241                 /* give back global router credits */
1242                 struct lnet_rtrbuf *rb;
1243                 struct lnet_rtrbufpool *rbp;
1244
1245                 /* NB If a msg ever blocks for a buffer in rbp_msgs, it stays
1246                  * there until it gets one allocated, or aborts the wait
1247                  * itself */
1248                 LASSERT(msg->msg_kiov != NULL);
1249
1250                 rb = list_entry(msg->msg_kiov, struct lnet_rtrbuf, rb_kiov[0]);
1251                 rbp = rb->rb_pool;
1252
1253                 msg->msg_kiov = NULL;
1254                 msg->msg_rtrcredit = 0;
1255
1256                 LASSERT(rbp == lnet_msg2bufpool(msg));
1257
1258                 LASSERT((rbp->rbp_credits > 0) ==
1259                         !list_empty(&rbp->rbp_bufs));
1260
1261                 /* If routing is now turned off, we just drop this buffer and
1262                  * don't bother trying to return credits.  */
1263                 if (!the_lnet.ln_routing) {
1264                         lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
1265                         goto routing_off;
1266                 }
1267
1268                 /* It is possible that a user has lowered the desired number of
1269                  * buffers in this pool.  Make sure we never put back
1270                  * more buffers than the stated number. */
1271                 if (unlikely(rbp->rbp_credits >= rbp->rbp_req_nbuffers)) {
1272                         /* Discard this buffer so we don't have too
1273                          * many. */
1274                         lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
1275                         rbp->rbp_nbuffers--;
1276                 } else {
1277                         list_add(&rb->rb_list, &rbp->rbp_bufs);
1278                         rbp->rbp_credits++;
1279                         if (rbp->rbp_credits <= 0)
1280                                 lnet_schedule_blocked_locked(rbp);
1281                 }
1282         }
1283
1284 routing_off:
1285         if (msg->msg_peerrtrcredit) {
1286                 /* give back peer router credits */
1287                 msg->msg_peerrtrcredit = 0;
1288
1289                 spin_lock(&rxpeer->lpni_lock);
1290                 LASSERT((rxpeer->lpni_rtrcredits < 0) ==
1291                         !list_empty(&rxpeer->lpni_rtrq));
1292
1293                 rxpeer->lpni_rtrcredits++;
1294
1295                 /* drop all messages which are queued to be routed on that
1296                  * peer. */
1297                 if (!the_lnet.ln_routing) {
1298                         struct list_head drop;
1299                         INIT_LIST_HEAD(&drop);
1300                         list_splice_init(&rxpeer->lpni_rtrq, &drop);
1301                         spin_unlock(&rxpeer->lpni_lock);
1302                         lnet_drop_routed_msgs_locked(&drop, msg->msg_rx_cpt);
1303                 } else if (rxpeer->lpni_rtrcredits <= 0) {
1304                         msg2 = list_entry(rxpeer->lpni_rtrq.next,
1305                                           struct lnet_msg, msg_list);
1306                         list_del(&msg2->msg_list);
1307                         spin_unlock(&rxpeer->lpni_lock);
1308                         (void) lnet_post_routed_recv_locked(msg2, 1);
1309                 } else {
1310                         spin_unlock(&rxpeer->lpni_lock);
1311                 }
1312         }
1313         if (rxni != NULL) {
1314                 msg->msg_rxni = NULL;
1315                 lnet_ni_decref_locked(rxni, msg->msg_rx_cpt);
1316         }
1317         if (rxpeer != NULL) {
1318                 msg->msg_rxpeer = NULL;
1319                 lnet_peer_ni_decref_locked(rxpeer);
1320         }
1321 }
1322
1323 static int
1324 lnet_compare_peers(struct lnet_peer_ni *p1, struct lnet_peer_ni *p2)
1325 {
1326         if (p1->lpni_txqnob < p2->lpni_txqnob)
1327                 return 1;
1328
1329         if (p1->lpni_txqnob > p2->lpni_txqnob)
1330                 return -1;
1331
1332         if (p1->lpni_txcredits > p2->lpni_txcredits)
1333                 return 1;
1334
1335         if (p1->lpni_txcredits < p2->lpni_txcredits)
1336                 return -1;
1337
1338         return 0;
1339 }
1340
1341 static int
1342 lnet_compare_routes(struct lnet_route *r1, struct lnet_route *r2)
1343 {
1344         struct lnet_peer_ni *p1 = r1->lr_gateway;
1345         struct lnet_peer_ni *p2 = r2->lr_gateway;
1346         int r1_hops = (r1->lr_hops == LNET_UNDEFINED_HOPS) ? 1 : r1->lr_hops;
1347         int r2_hops = (r2->lr_hops == LNET_UNDEFINED_HOPS) ? 1 : r2->lr_hops;
1348         int rc;
1349
1350         if (r1->lr_priority < r2->lr_priority)
1351                 return 1;
1352
1353         if (r1->lr_priority > r2->lr_priority)
1354                 return -1;
1355
1356         if (r1_hops < r2_hops)
1357                 return 1;
1358
1359         if (r1_hops > r2_hops)
1360                 return -1;
1361
1362         rc = lnet_compare_peers(p1, p2);
1363         if (rc)
1364                 return rc;
1365
1366         if (r1->lr_seq - r2->lr_seq <= 0)
1367                 return 1;
1368
1369         return -1;
1370 }
1371
1372 static struct lnet_peer_ni *
1373 lnet_find_route_locked(struct lnet_net *net, lnet_nid_t target,
1374                        lnet_nid_t rtr_nid)
1375 {
1376         struct lnet_remotenet   *rnet;
1377         struct lnet_route               *route;
1378         struct lnet_route               *best_route;
1379         struct lnet_route               *last_route;
1380         struct lnet_peer_ni     *lpni_best;
1381         struct lnet_peer_ni     *lp;
1382         int                     rc;
1383
1384         /* If @rtr_nid is not LNET_NID_ANY, return the gateway with
1385          * rtr_nid nid, otherwise find the best gateway I can use */
1386
1387         rnet = lnet_find_rnet_locked(LNET_NIDNET(target));
1388         if (rnet == NULL)
1389                 return NULL;
1390
1391         lpni_best = NULL;
1392         best_route = last_route = NULL;
1393         list_for_each_entry(route, &rnet->lrn_routes, lr_list) {
1394                 lp = route->lr_gateway;
1395
1396                 if (!lnet_is_route_alive(route))
1397                         continue;
1398
1399                 if (net != NULL && lp->lpni_net != net)
1400                         continue;
1401
1402                 if (lp->lpni_nid == rtr_nid) /* it's pre-determined router */
1403                         return lp;
1404
1405                 if (lpni_best == NULL) {
1406                         best_route = last_route = route;
1407                         lpni_best = lp;
1408                         continue;
1409                 }
1410
1411                 /* no protection on below fields, but it's harmless */
1412                 if (last_route->lr_seq - route->lr_seq < 0)
1413                         last_route = route;
1414
1415                 rc = lnet_compare_routes(route, best_route);
1416                 if (rc < 0)
1417                         continue;
1418
1419                 best_route = route;
1420                 lpni_best = lp;
1421         }
1422
1423         /* set sequence number on the best router to the latest sequence + 1
1424          * so we can round-robin all routers, it's race and inaccurate but
1425          * harmless and functional  */
1426         if (best_route != NULL)
1427                 best_route->lr_seq = last_route->lr_seq + 1;
1428         return lpni_best;
1429 }
1430
1431 static struct lnet_ni *
1432 lnet_get_best_ni(struct lnet_net *local_net, struct lnet_ni *cur_ni,
1433                  int md_cpt)
1434 {
1435         struct lnet_ni *ni = NULL, *best_ni = cur_ni;
1436         unsigned int shortest_distance;
1437         int best_credits;
1438
1439         if (best_ni == NULL) {
1440                 shortest_distance = UINT_MAX;
1441                 best_credits = INT_MIN;
1442         } else {
1443                 shortest_distance = cfs_cpt_distance(lnet_cpt_table(), md_cpt,
1444                                                      best_ni->ni_dev_cpt);
1445                 best_credits = atomic_read(&best_ni->ni_tx_credits);
1446         }
1447
1448         while ((ni = lnet_get_next_ni_locked(local_net, ni))) {
1449                 unsigned int distance;
1450                 int ni_credits;
1451
1452                 if (!lnet_is_ni_healthy_locked(ni))
1453                         continue;
1454
1455                 ni_credits = atomic_read(&ni->ni_tx_credits);
1456
1457                 /*
1458                  * calculate the distance from the CPT on which
1459                  * the message memory is allocated to the CPT of
1460                  * the NI's physical device
1461                  */
1462                 distance = cfs_cpt_distance(lnet_cpt_table(),
1463                                             md_cpt,
1464                                             ni->ni_dev_cpt);
1465
1466                 /*
1467                  * All distances smaller than the NUMA range
1468                  * are treated equally.
1469                  */
1470                 if (distance < lnet_numa_range)
1471                         distance = lnet_numa_range;
1472
1473                 /*
1474                  * Select on shorter distance, then available
1475                  * credits, then round-robin.
1476                  */
1477                 if (distance > shortest_distance) {
1478                         continue;
1479                 } else if (distance < shortest_distance) {
1480                         shortest_distance = distance;
1481                 } else if (ni_credits < best_credits) {
1482                         continue;
1483                 } else if (ni_credits == best_credits) {
1484                         if (best_ni && (best_ni)->ni_seq <= ni->ni_seq)
1485                                 continue;
1486                 }
1487                 best_ni = ni;
1488                 best_credits = ni_credits;
1489         }
1490
1491         return best_ni;
1492 }
1493
1494 /*
1495  * Traffic to the LNET_RESERVED_PORTAL may not trigger peer discovery,
1496  * because such traffic is required to perform discovery. We therefore
1497  * exclude all GET and PUT on that portal. We also exclude all ACK and
1498  * REPLY traffic, but that is because the portal is not tracked in the
1499  * message structure for these message types. We could restrict this
1500  * further by also checking for LNET_PROTO_PING_MATCHBITS.
1501  */
1502 static bool
1503 lnet_msg_discovery(struct lnet_msg *msg)
1504 {
1505         if (msg->msg_type == LNET_MSG_PUT) {
1506                 if (msg->msg_hdr.msg.put.ptl_index != LNET_RESERVED_PORTAL)
1507                         return true;
1508         } else if (msg->msg_type == LNET_MSG_GET) {
1509                 if (msg->msg_hdr.msg.get.ptl_index != LNET_RESERVED_PORTAL)
1510                         return true;
1511         }
1512         return false;
1513 }
1514
1515 static int
1516 lnet_select_pathway(lnet_nid_t src_nid, lnet_nid_t dst_nid,
1517                     struct lnet_msg *msg, lnet_nid_t rtr_nid)
1518 {
1519         struct lnet_ni          *best_ni;
1520         struct lnet_peer_ni     *best_lpni;
1521         struct lnet_peer_ni     *best_gw;
1522         struct lnet_peer_ni     *lpni;
1523         struct lnet_peer_ni     *final_dst;
1524         struct lnet_peer        *peer;
1525         struct lnet_peer_net    *peer_net;
1526         struct lnet_net         *local_net;
1527         int                     cpt, cpt2, rc;
1528         bool                    routing;
1529         bool                    routing2;
1530         bool                    ni_is_pref;
1531         bool                    preferred;
1532         bool                    local_found;
1533         int                     best_lpni_credits;
1534         int                     md_cpt;
1535
1536         /*
1537          * get an initial CPT to use for locking. The idea here is not to
1538          * serialize the calls to select_pathway, so that as many
1539          * operations can run concurrently as possible. To do that we use
1540          * the CPT where this call is being executed. Later on when we
1541          * determine the CPT to use in lnet_message_commit, we switch the
1542          * lock and check if there was any configuration change.  If none,
1543          * then we proceed, if there is, then we restart the operation.
1544          */
1545         cpt = lnet_net_lock_current();
1546
1547         md_cpt = lnet_cpt_of_md(msg->msg_md, msg->msg_offset);
1548         if (md_cpt == CFS_CPT_ANY)
1549                 md_cpt = cpt;
1550
1551 again:
1552         best_ni = NULL;
1553         best_lpni = NULL;
1554         best_gw = NULL;
1555         final_dst = NULL;
1556         local_net = NULL;
1557         routing = false;
1558         routing2 = false;
1559         local_found = false;
1560
1561         /*
1562          * lnet_nid2peerni_locked() is the path that will find an
1563          * existing peer_ni, or create one and mark it as having been
1564          * created due to network traffic.
1565          */
1566         lpni = lnet_nid2peerni_locked(dst_nid, LNET_NID_ANY, cpt);
1567         if (IS_ERR(lpni)) {
1568                 lnet_net_unlock(cpt);
1569                 return PTR_ERR(lpni);
1570         }
1571
1572         /*
1573          * If we're being asked to send to the loopback interface, there
1574          * is no need to go through any selection. We can just shortcut
1575          * the entire process and send over lolnd
1576          */
1577         if (LNET_NETTYP(LNET_NIDNET(dst_nid)) == LOLND) {
1578                 lnet_peer_ni_decref_locked(lpni);
1579                 best_ni = the_lnet.ln_loni;
1580                 goto send;
1581         }
1582
1583         /*
1584          * Now that we have a peer_ni, check if we want to discover
1585          * the peer. Traffic to the LNET_RESERVED_PORTAL should not
1586          * trigger discovery.
1587          */
1588         peer = lpni->lpni_peer_net->lpn_peer;
1589         if (lnet_msg_discovery(msg) && !lnet_peer_is_uptodate(peer)) {
1590                 rc = lnet_discover_peer_locked(lpni, cpt, false);
1591                 if (rc) {
1592                         lnet_peer_ni_decref_locked(lpni);
1593                         lnet_net_unlock(cpt);
1594                         return rc;
1595                 }
1596                 /* The peer may have changed. */
1597                 peer = lpni->lpni_peer_net->lpn_peer;
1598                 /* queue message and return */
1599                 msg->msg_src_nid_param = src_nid;
1600                 msg->msg_rtr_nid_param = rtr_nid;
1601                 msg->msg_sending = 0;
1602                 list_add_tail(&msg->msg_list, &peer->lp_dc_pendq);
1603                 lnet_peer_ni_decref_locked(lpni);
1604                 lnet_net_unlock(cpt);
1605
1606                 CDEBUG(D_NET, "%s pending discovery\n",
1607                        libcfs_nid2str(peer->lp_primary_nid));
1608
1609                 return LNET_DC_WAIT;
1610         }
1611         lnet_peer_ni_decref_locked(lpni);
1612
1613         /* If peer is not healthy then can not send anything to it */
1614         if (!lnet_is_peer_healthy_locked(peer)) {
1615                 lnet_net_unlock(cpt);
1616                 return -EHOSTUNREACH;
1617         }
1618
1619         /*
1620          * STEP 1: first jab at determining best_ni
1621          * if src_nid is explicitly specified, then best_ni is already
1622          * pre-determiend for us. Otherwise we need to select the best
1623          * one to use later on
1624          */
1625         if (src_nid != LNET_NID_ANY) {
1626                 best_ni = lnet_nid2ni_locked(src_nid, cpt);
1627                 if (!best_ni) {
1628                         lnet_net_unlock(cpt);
1629                         LCONSOLE_WARN("Can't send to %s: src %s is not a "
1630                                       "local nid\n", libcfs_nid2str(dst_nid),
1631                                       libcfs_nid2str(src_nid));
1632                         return -EINVAL;
1633                 }
1634         }
1635
1636         if (msg->msg_type == LNET_MSG_REPLY ||
1637             msg->msg_type == LNET_MSG_ACK ||
1638             !lnet_peer_is_multi_rail(peer) ||
1639             best_ni) {
1640                 /*
1641                  * for replies we want to respond on the same peer_ni we
1642                  * received the message on if possible. If not, then pick
1643                  * a peer_ni to send to
1644                  *
1645                  * if the peer is non-multi-rail then you want to send to
1646                  * the dst_nid provided as well.
1647                  *
1648                  * If the best_ni has already been determined, IE the
1649                  * src_nid has been specified, then use the
1650                  * destination_nid provided as well, since we're
1651                  * continuing a series of related messages for the same
1652                  * RPC.
1653                  *
1654                  * It is expected to find the lpni using dst_nid, since we
1655                  * created it earlier.
1656                  */
1657                 best_lpni = lnet_find_peer_ni_locked(dst_nid);
1658                 if (best_lpni)
1659                         lnet_peer_ni_decref_locked(best_lpni);
1660
1661                 if (best_lpni && !lnet_get_net_locked(LNET_NIDNET(dst_nid))) {
1662                         /*
1663                          * this lpni is not on a local network so we need
1664                          * to route this reply.
1665                          */
1666                         best_gw = lnet_find_route_locked(NULL,
1667                                                          best_lpni->lpni_nid,
1668                                                          rtr_nid);
1669                         if (best_gw) {
1670                                 /*
1671                                 * RULE: Each node considers only the next-hop
1672                                 *
1673                                 * We're going to route the message, so change the peer to
1674                                 * the router.
1675                                 */
1676                                 LASSERT(best_gw->lpni_peer_net);
1677                                 LASSERT(best_gw->lpni_peer_net->lpn_peer);
1678                                 peer = best_gw->lpni_peer_net->lpn_peer;
1679
1680                                 /*
1681                                 * if the router is not multi-rail then use the best_gw
1682                                 * found to send the message to
1683                                 */
1684                                 if (!lnet_peer_is_multi_rail(peer))
1685                                         best_lpni = best_gw;
1686                                 else
1687                                         best_lpni = NULL;
1688
1689                                 routing = true;
1690                         } else {
1691                                 best_lpni = NULL;
1692                         }
1693                 } else if (!best_lpni) {
1694                         lnet_net_unlock(cpt);
1695                         CERROR("unable to send msg_type %d to "
1696                               "originating %s. Destination NID not in DB\n",
1697                               msg->msg_type, libcfs_nid2str(dst_nid));
1698                         return -EINVAL;
1699                 }
1700         }
1701
1702         /*
1703          * We must use a consistent source address when sending to a
1704          * non-MR peer. However, a non-MR peer can have multiple NIDs
1705          * on multiple networks, and we may even need to talk to this
1706          * peer on multiple networks -- certain types of
1707          * load-balancing configuration do this.
1708          *
1709          * So we need to pick the NI the peer prefers for this
1710          * particular network.
1711          */
1712         if (!lnet_peer_is_multi_rail(peer)) {
1713                 if (!best_lpni) {
1714                         lnet_net_unlock(cpt);
1715                         CERROR("no route to %s\n",
1716                                libcfs_nid2str(dst_nid));
1717                         return -EHOSTUNREACH;
1718                 }
1719
1720                 /* best ni is already set if src_nid was provided */
1721                 if (!best_ni) {
1722                         /* Get the target peer_ni */
1723                         peer_net = lnet_peer_get_net_locked(peer,
1724                                         LNET_NIDNET(best_lpni->lpni_nid));
1725                         LASSERT(peer_net != NULL);
1726                         list_for_each_entry(lpni, &peer_net->lpn_peer_nis,
1727                                             lpni_peer_nis) {
1728                                 if (lpni->lpni_pref_nnids == 0)
1729                                         continue;
1730                                 LASSERT(lpni->lpni_pref_nnids == 1);
1731                                 best_ni = lnet_nid2ni_locked(
1732                                                 lpni->lpni_pref.nid, cpt);
1733                                 break;
1734                         }
1735                 }
1736                 /* if best_ni is still not set just pick one */
1737                 if (!best_ni) {
1738                         best_ni = lnet_net2ni_locked(
1739                                 best_lpni->lpni_net->net_id, cpt);
1740                         /* If there is no best_ni we don't have a route */
1741                         if (!best_ni) {
1742                                 lnet_net_unlock(cpt);
1743                                 CERROR("no path to %s from net %s\n",
1744                                         libcfs_nid2str(best_lpni->lpni_nid),
1745                                         libcfs_net2str(best_lpni->lpni_net->net_id));
1746                                 return -EHOSTUNREACH;
1747                         }
1748                         lpni = list_entry(peer_net->lpn_peer_nis.next,
1749                                         struct lnet_peer_ni,
1750                                         lpni_peer_nis);
1751                 }
1752                 /* Set preferred NI if necessary. */
1753                 if (lpni->lpni_pref_nnids == 0)
1754                         lnet_peer_ni_set_non_mr_pref_nid(lpni, best_ni->ni_nid);
1755         }
1756
1757         /*
1758          * if we already found a best_ni because src_nid is specified and
1759          * best_lpni because we are replying to a message then just send
1760          * the message
1761          */
1762         if (best_ni && best_lpni)
1763                 goto send;
1764
1765         /*
1766          * If we already found a best_ni because src_nid is specified then
1767          * pick the peer then send the message
1768          */
1769         if (best_ni)
1770                 goto pick_peer;
1771
1772         /*
1773          * pick the best_ni by going through all the possible networks of
1774          * that peer and see which local NI is best suited to talk to that
1775          * peer.
1776          *
1777          * Locally connected networks will always be preferred over
1778          * a routed network. If there are only routed paths to the peer,
1779          * then the best route is chosen. If all routes are equal then
1780          * they are used in round robin.
1781          */
1782         list_for_each_entry(peer_net, &peer->lp_peer_nets, lpn_peer_nets) {
1783                 if (!lnet_is_peer_net_healthy_locked(peer_net))
1784                         continue;
1785
1786                 local_net = lnet_get_net_locked(peer_net->lpn_net_id);
1787                 if (!local_net && !routing && !local_found) {
1788                         struct lnet_peer_ni *net_gw;
1789
1790                         lpni = list_entry(peer_net->lpn_peer_nis.next,
1791                                           struct lnet_peer_ni,
1792                                           lpni_peer_nis);
1793
1794                         net_gw = lnet_find_route_locked(NULL,
1795                                                         lpni->lpni_nid,
1796                                                         rtr_nid);
1797                         if (!net_gw)
1798                                 continue;
1799
1800                         if (best_gw) {
1801                                 /*
1802                                  * lnet_find_route_locked() call
1803                                  * will return the best_Gw on the
1804                                  * lpni->lpni_nid network.
1805                                  * However, best_gw and net_gw can
1806                                  * be on different networks.
1807                                  * Therefore need to compare them
1808                                  * to pick the better of either.
1809                                  */
1810                                 if (lnet_compare_peers(best_gw, net_gw) > 0)
1811                                         continue;
1812                                 if (best_gw->lpni_gw_seq <= net_gw->lpni_gw_seq)
1813                                         continue;
1814                         }
1815                         best_gw = net_gw;
1816                         final_dst = lpni;
1817
1818                         routing2 = true;
1819                 } else {
1820                         best_gw = NULL;
1821                         final_dst = NULL;
1822                         routing2 = false;
1823                         local_found = true;
1824                 }
1825
1826                 /*
1827                  * a gw on this network is found, but there could be
1828                  * other better gateways on other networks. So don't pick
1829                  * the best_ni until we determine the best_gw.
1830                  */
1831                 if (best_gw)
1832                         continue;
1833
1834                 /* if no local_net found continue */
1835                 if (!local_net)
1836                         continue;
1837
1838                 /*
1839                  * Iterate through the NIs in this local Net and select
1840                  * the NI to send from. The selection is determined by
1841                  * these 3 criterion in the following priority:
1842                  *      1. NUMA
1843                  *      2. NI available credits
1844                  *      3. Round Robin
1845                  */
1846                 best_ni = lnet_get_best_ni(local_net, best_ni, md_cpt);
1847         }
1848
1849         if (!best_ni && !best_gw) {
1850                 lnet_net_unlock(cpt);
1851                 LCONSOLE_WARN("No local ni found to send from to %s\n",
1852                         libcfs_nid2str(dst_nid));
1853                 return -EINVAL;
1854         }
1855
1856         if (!best_ni) {
1857                 best_ni = lnet_get_best_ni(best_gw->lpni_net, best_ni, md_cpt);
1858                 LASSERT(best_gw && best_ni);
1859
1860                 /*
1861                  * We're going to route the message, so change the peer to
1862                  * the router.
1863                  */
1864                 LASSERT(best_gw->lpni_peer_net);
1865                 LASSERT(best_gw->lpni_peer_net->lpn_peer);
1866                 best_gw->lpni_gw_seq++;
1867                 peer = best_gw->lpni_peer_net->lpn_peer;
1868         }
1869
1870         /*
1871          * Now that we selected the NI to use increment its sequence
1872          * number so the Round Robin algorithm will detect that it has
1873          * been used and pick the next NI.
1874          */
1875         best_ni->ni_seq++;
1876
1877 pick_peer:
1878         /*
1879          * At this point the best_ni is on a local network on which
1880          * the peer has a peer_ni as well
1881          */
1882         peer_net = lnet_peer_get_net_locked(peer,
1883                                             best_ni->ni_net->net_id);
1884         /*
1885          * peer_net is not available or the src_nid is explicitly defined
1886          * and the peer_net for that src_nid is unhealthy. find a route to
1887          * the destination nid.
1888          */
1889         if (!peer_net ||
1890             (src_nid != LNET_NID_ANY &&
1891              !lnet_is_peer_net_healthy_locked(peer_net))) {
1892                 best_gw = lnet_find_route_locked(best_ni->ni_net,
1893                                                  dst_nid,
1894                                                  rtr_nid);
1895                 /*
1896                  * if no route is found for that network then
1897                  * move onto the next peer_ni in the peer
1898                  */
1899                 if (!best_gw) {
1900                         lnet_net_unlock(cpt);
1901                         LCONSOLE_WARN("No route to peer from %s\n",
1902                                 libcfs_nid2str(best_ni->ni_nid));
1903                         return -EHOSTUNREACH;
1904                 }
1905
1906                 CDEBUG(D_NET, "Best route to %s via %s for %s %d\n",
1907                         libcfs_nid2str(dst_nid),
1908                         libcfs_nid2str(best_gw->lpni_nid),
1909                         lnet_msgtyp2str(msg->msg_type), msg->msg_len);
1910
1911                 routing2 = true;
1912                 /*
1913                  * RULE: Each node considers only the next-hop
1914                  *
1915                  * We're going to route the message, so change the peer to
1916                  * the router.
1917                  */
1918                 LASSERT(best_gw->lpni_peer_net);
1919                 LASSERT(best_gw->lpni_peer_net->lpn_peer);
1920                 peer = best_gw->lpni_peer_net->lpn_peer;
1921         } else if (!lnet_is_peer_net_healthy_locked(peer_net)) {
1922                 /*
1923                  * this peer_net is unhealthy but we still have an opportunity
1924                  * to find another peer_net that we can use
1925                  */
1926                 __u32 net_id = peer_net->lpn_net_id;
1927                 LCONSOLE_WARN("peer net %s unhealthy\n",
1928                               libcfs_net2str(net_id));
1929                 goto again;
1930         }
1931
1932         /*
1933          * Look at the peer NIs for the destination peer that connect
1934          * to the chosen net. If a peer_ni is preferred when using the
1935          * best_ni to communicate, we use that one. If there is no
1936          * preferred peer_ni, or there are multiple preferred peer_ni,
1937          * the available transmit credits are used. If the transmit
1938          * credits are equal, we round-robin over the peer_ni.
1939          */
1940         lpni = NULL;
1941         best_lpni_credits = INT_MIN;
1942         preferred = false;
1943         best_lpni = NULL;
1944         while ((lpni = lnet_get_next_peer_ni_locked(peer, peer_net, lpni))) {
1945                 /*
1946                  * if this peer ni is not healthy just skip it, no point in
1947                  * examining it further
1948                  */
1949                 if (!lnet_is_peer_ni_healthy_locked(lpni))
1950                         continue;
1951                 ni_is_pref = lnet_peer_is_pref_nid_locked(lpni,
1952                                                           best_ni->ni_nid);
1953
1954                 /* if this is a preferred peer use it */
1955                 if (!preferred && ni_is_pref) {
1956                         preferred = true;
1957                 } else if (preferred && !ni_is_pref) {
1958                         /*
1959                          * this is not the preferred peer so let's ignore
1960                          * it.
1961                          */
1962                         continue;
1963                 } else if (lpni->lpni_txcredits < best_lpni_credits) {
1964                         /*
1965                          * We already have a peer that has more credits
1966                          * available than this one. No need to consider
1967                          * this peer further.
1968                          */
1969                         continue;
1970                 } else if (lpni->lpni_txcredits == best_lpni_credits) {
1971                         /*
1972                          * The best peer found so far and the current peer
1973                          * have the same number of available credits let's
1974                          * make sure to select between them using Round
1975                          * Robin
1976                          */
1977                         if (best_lpni) {
1978                                 if (best_lpni->lpni_seq <= lpni->lpni_seq)
1979                                         continue;
1980                         }
1981                 }
1982
1983                 best_lpni = lpni;
1984                 best_lpni_credits = lpni->lpni_txcredits;
1985         }
1986
1987         /* if we still can't find a peer ni then we can't reach it */
1988         if (!best_lpni) {
1989                 __u32 net_id = (peer_net) ? peer_net->lpn_net_id :
1990                         LNET_NIDNET(dst_nid);
1991                 lnet_net_unlock(cpt);
1992                 LCONSOLE_WARN("no peer_ni found on peer net %s\n",
1993                                 libcfs_net2str(net_id));
1994                 return -EHOSTUNREACH;
1995         }
1996
1997
1998 send:
1999         /* Shortcut for loopback. */
2000         if (best_ni == the_lnet.ln_loni) {
2001                 /* No send credit hassles with LOLND */
2002                 lnet_ni_addref_locked(best_ni, cpt);
2003                 msg->msg_hdr.dest_nid = cpu_to_le64(best_ni->ni_nid);
2004                 if (!msg->msg_routing)
2005                         msg->msg_hdr.src_nid = cpu_to_le64(best_ni->ni_nid);
2006                 msg->msg_target.nid = best_ni->ni_nid;
2007                 lnet_msg_commit(msg, cpt);
2008                 msg->msg_txni = best_ni;
2009                 lnet_net_unlock(cpt);
2010
2011                 return LNET_CREDIT_OK;
2012         }
2013
2014         routing = routing || routing2;
2015
2016         /*
2017          * Increment sequence number of the peer selected so that we
2018          * pick the next one in Round Robin.
2019          */
2020         best_lpni->lpni_seq++;
2021
2022         /*
2023          * grab a reference on the peer_ni so it sticks around even if
2024          * we need to drop and relock the lnet_net_lock below.
2025          */
2026         lnet_peer_ni_addref_locked(best_lpni);
2027
2028         /*
2029          * Use lnet_cpt_of_nid() to determine the CPT used to commit the
2030          * message. This ensures that we get a CPT that is correct for
2031          * the NI when the NI has been restricted to a subset of all CPTs.
2032          * If the selected CPT differs from the one currently locked, we
2033          * must unlock and relock the lnet_net_lock(), and then check whether
2034          * the configuration has changed. We don't have a hold on the best_ni
2035          * yet, and it may have vanished.
2036          */
2037         cpt2 = lnet_cpt_of_nid_locked(best_lpni->lpni_nid, best_ni);
2038         if (cpt != cpt2) {
2039                 __u32 seq = lnet_get_dlc_seq_locked();
2040                 lnet_net_unlock(cpt);
2041                 cpt = cpt2;
2042                 lnet_net_lock(cpt);
2043                 if (seq != lnet_get_dlc_seq_locked()) {
2044                         lnet_peer_ni_decref_locked(best_lpni);
2045                         goto again;
2046                 }
2047         }
2048
2049         /*
2050          * store the best_lpni in the message right away to avoid having
2051          * to do the same operation under different conditions
2052          */
2053         msg->msg_txpeer = best_lpni;
2054         msg->msg_txni = best_ni;
2055
2056         /*
2057          * grab a reference for the best_ni since now it's in use in this
2058          * send. the reference will need to be dropped when the message is
2059          * finished in lnet_finalize()
2060          */
2061         lnet_ni_addref_locked(msg->msg_txni, cpt);
2062
2063         /*
2064          * Always set the target.nid to the best peer picked. Either the
2065          * nid will be one of the preconfigured NIDs, or the same NID as
2066          * what was originally set in the target or it will be the NID of
2067          * a router if this message should be routed
2068          */
2069         msg->msg_target.nid = msg->msg_txpeer->lpni_nid;
2070
2071         /*
2072          * lnet_msg_commit assigns the correct cpt to the message, which
2073          * is used to decrement the correct refcount on the ni when it's
2074          * time to return the credits
2075          */
2076         lnet_msg_commit(msg, cpt);
2077
2078         /*
2079          * If we are routing the message then we don't need to overwrite
2080          * the src_nid since it would've been set at the origin. Otherwise
2081          * we are the originator so we need to set it.
2082          */
2083         if (!msg->msg_routing)
2084                 msg->msg_hdr.src_nid = cpu_to_le64(msg->msg_txni->ni_nid);
2085
2086         if (routing) {
2087                 msg->msg_target_is_router = 1;
2088                 msg->msg_target.pid = LNET_PID_LUSTRE;
2089                 /*
2090                  * since we're routing we want to ensure that the
2091                  * msg_hdr.dest_nid is set to the final destination. When
2092                  * the router receives this message it knows how to route
2093                  * it.
2094                  */
2095                 msg->msg_hdr.dest_nid =
2096                         cpu_to_le64(final_dst ? final_dst->lpni_nid : dst_nid);
2097         } else {
2098                 /*
2099                  * if we're not routing set the dest_nid to the best peer
2100                  * ni that we picked earlier in the algorithm.
2101                  */
2102                 msg->msg_hdr.dest_nid = cpu_to_le64(msg->msg_txpeer->lpni_nid);
2103         }
2104
2105         rc = lnet_post_send_locked(msg, 0);
2106
2107         if (!rc)
2108                 CDEBUG(D_NET, "TRACE: %s(%s:%s) -> %s(%s:%s) : %s\n",
2109                        libcfs_nid2str(msg->msg_hdr.src_nid),
2110                        libcfs_nid2str(msg->msg_txni->ni_nid),
2111                        libcfs_nid2str(src_nid),
2112                        libcfs_nid2str(msg->msg_hdr.dest_nid),
2113                        libcfs_nid2str(dst_nid),
2114                        libcfs_nid2str(msg->msg_txpeer->lpni_nid),
2115                        lnet_msgtyp2str(msg->msg_type));
2116
2117         lnet_net_unlock(cpt);
2118
2119         return rc;
2120 }
2121
2122 int
2123 lnet_send(lnet_nid_t src_nid, struct lnet_msg *msg, lnet_nid_t rtr_nid)
2124 {
2125         lnet_nid_t              dst_nid = msg->msg_target.nid;
2126         int                     rc;
2127
2128         /*
2129          * NB: rtr_nid is set to LNET_NID_ANY for all current use-cases,
2130          * but we might want to use pre-determined router for ACK/REPLY
2131          * in the future
2132          */
2133         /* NB: ni != NULL == interface pre-determined (ACK/REPLY) */
2134         LASSERT (msg->msg_txpeer == NULL);
2135         LASSERT (!msg->msg_sending);
2136         LASSERT (!msg->msg_target_is_router);
2137         LASSERT (!msg->msg_receiving);
2138
2139         msg->msg_sending = 1;
2140
2141         LASSERT(!msg->msg_tx_committed);
2142
2143         rc = lnet_select_pathway(src_nid, dst_nid, msg, rtr_nid);
2144         if (rc < 0)
2145                 return rc;
2146
2147         if (rc == LNET_CREDIT_OK)
2148                 lnet_ni_send(msg->msg_txni, msg);
2149
2150         /* rc == LNET_CREDIT_OK or LNET_CREDIT_WAIT or LNET_DC_WAIT */
2151         return 0;
2152 }
2153
2154 void
2155 lnet_drop_message(struct lnet_ni *ni, int cpt, void *private, unsigned int nob,
2156                   __u32 msg_type)
2157 {
2158         lnet_net_lock(cpt);
2159         lnet_incr_stats(&ni->ni_stats, msg_type, LNET_STATS_TYPE_DROP);
2160         the_lnet.ln_counters[cpt]->drop_count++;
2161         the_lnet.ln_counters[cpt]->drop_length += nob;
2162         lnet_net_unlock(cpt);
2163
2164         lnet_ni_recv(ni, private, NULL, 0, 0, 0, nob);
2165 }
2166
2167 static void
2168 lnet_recv_put(struct lnet_ni *ni, struct lnet_msg *msg)
2169 {
2170         struct lnet_hdr *hdr = &msg->msg_hdr;
2171
2172         if (msg->msg_wanted != 0)
2173                 lnet_setpayloadbuffer(msg);
2174
2175         lnet_build_msg_event(msg, LNET_EVENT_PUT);
2176
2177         /* Must I ACK?  If so I'll grab the ack_wmd out of the header and put
2178          * it back into the ACK during lnet_finalize() */
2179         msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
2180                         (msg->msg_md->md_options & LNET_MD_ACK_DISABLE) == 0);
2181
2182         lnet_ni_recv(ni, msg->msg_private, msg, msg->msg_rx_delayed,
2183                      msg->msg_offset, msg->msg_wanted, hdr->payload_length);
2184 }
2185
2186 static int
2187 lnet_parse_put(struct lnet_ni *ni, struct lnet_msg *msg)
2188 {
2189         struct lnet_hdr         *hdr = &msg->msg_hdr;
2190         struct lnet_match_info  info;
2191         int                     rc;
2192         bool                    ready_delay;
2193
2194         /* Convert put fields to host byte order */
2195         hdr->msg.put.match_bits = le64_to_cpu(hdr->msg.put.match_bits);
2196         hdr->msg.put.ptl_index  = le32_to_cpu(hdr->msg.put.ptl_index);
2197         hdr->msg.put.offset     = le32_to_cpu(hdr->msg.put.offset);
2198
2199         /* Primary peer NID. */
2200         info.mi_id.nid  = msg->msg_initiator;
2201         info.mi_id.pid  = hdr->src_pid;
2202         info.mi_opc     = LNET_MD_OP_PUT;
2203         info.mi_portal  = hdr->msg.put.ptl_index;
2204         info.mi_rlength = hdr->payload_length;
2205         info.mi_roffset = hdr->msg.put.offset;
2206         info.mi_mbits   = hdr->msg.put.match_bits;
2207         info.mi_cpt     = lnet_cpt_of_nid(msg->msg_rxpeer->lpni_nid, ni);
2208
2209         msg->msg_rx_ready_delay = ni->ni_net->net_lnd->lnd_eager_recv == NULL;
2210         ready_delay = msg->msg_rx_ready_delay;
2211
2212  again:
2213         rc = lnet_ptl_match_md(&info, msg);
2214         switch (rc) {
2215         default:
2216                 LBUG();
2217
2218         case LNET_MATCHMD_OK:
2219                 lnet_recv_put(ni, msg);
2220                 return 0;
2221
2222         case LNET_MATCHMD_NONE:
2223                 if (ready_delay)
2224                         /* no eager_recv or has already called it, should
2225                          * have been attached on delayed list */
2226                         return 0;
2227
2228                 rc = lnet_ni_eager_recv(ni, msg);
2229                 if (rc == 0) {
2230                         ready_delay = true;
2231                         goto again;
2232                 }
2233                 /* fall through */
2234
2235         case LNET_MATCHMD_DROP:
2236                 CNETERR("Dropping PUT from %s portal %d match %llu"
2237                         " offset %d length %d: %d\n",
2238                         libcfs_id2str(info.mi_id), info.mi_portal,
2239                         info.mi_mbits, info.mi_roffset, info.mi_rlength, rc);
2240
2241                 return -ENOENT; /* -ve: OK but no match */
2242         }
2243 }
2244
2245 static int
2246 lnet_parse_get(struct lnet_ni *ni, struct lnet_msg *msg, int rdma_get)
2247 {
2248         struct lnet_match_info info;
2249         struct lnet_hdr *hdr = &msg->msg_hdr;
2250         struct lnet_process_id source_id;
2251         struct lnet_handle_wire reply_wmd;
2252         int rc;
2253
2254         /* Convert get fields to host byte order */
2255         hdr->msg.get.match_bits   = le64_to_cpu(hdr->msg.get.match_bits);
2256         hdr->msg.get.ptl_index    = le32_to_cpu(hdr->msg.get.ptl_index);
2257         hdr->msg.get.sink_length  = le32_to_cpu(hdr->msg.get.sink_length);
2258         hdr->msg.get.src_offset   = le32_to_cpu(hdr->msg.get.src_offset);
2259
2260         source_id.nid = hdr->src_nid;
2261         source_id.pid = hdr->src_pid;
2262         /* Primary peer NID */
2263         info.mi_id.nid  = msg->msg_initiator;
2264         info.mi_id.pid  = hdr->src_pid;
2265         info.mi_opc     = LNET_MD_OP_GET;
2266         info.mi_portal  = hdr->msg.get.ptl_index;
2267         info.mi_rlength = hdr->msg.get.sink_length;
2268         info.mi_roffset = hdr->msg.get.src_offset;
2269         info.mi_mbits   = hdr->msg.get.match_bits;
2270         info.mi_cpt     = lnet_cpt_of_nid(msg->msg_rxpeer->lpni_nid, ni);
2271
2272         rc = lnet_ptl_match_md(&info, msg);
2273         if (rc == LNET_MATCHMD_DROP) {
2274                 CNETERR("Dropping GET from %s portal %d match %llu"
2275                         " offset %d length %d\n",
2276                         libcfs_id2str(info.mi_id), info.mi_portal,
2277                         info.mi_mbits, info.mi_roffset, info.mi_rlength);
2278                 return -ENOENT; /* -ve: OK but no match */
2279         }
2280
2281         LASSERT(rc == LNET_MATCHMD_OK);
2282
2283         lnet_build_msg_event(msg, LNET_EVENT_GET);
2284
2285         reply_wmd = hdr->msg.get.return_wmd;
2286
2287         lnet_prep_send(msg, LNET_MSG_REPLY, source_id,
2288                        msg->msg_offset, msg->msg_wanted);
2289
2290         msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;
2291
2292         if (rdma_get) {
2293                 /* The LND completes the REPLY from her recv procedure */
2294                 lnet_ni_recv(ni, msg->msg_private, msg, 0,
2295                              msg->msg_offset, msg->msg_len, msg->msg_len);
2296                 return 0;
2297         }
2298
2299         lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
2300         msg->msg_receiving = 0;
2301
2302         rc = lnet_send(ni->ni_nid, msg, LNET_NID_ANY);
2303         if (rc < 0) {
2304                 /* didn't get as far as lnet_ni_send() */
2305                 CERROR("%s: Unable to send REPLY for GET from %s: %d\n",
2306                        libcfs_nid2str(ni->ni_nid),
2307                        libcfs_id2str(info.mi_id), rc);
2308
2309                 lnet_finalize(msg, rc);
2310         }
2311
2312         return 0;
2313 }
2314
2315 static int
2316 lnet_parse_reply(struct lnet_ni *ni, struct lnet_msg *msg)
2317 {
2318         void             *private = msg->msg_private;
2319         struct lnet_hdr  *hdr = &msg->msg_hdr;
2320         struct lnet_process_id src = {0};
2321         struct lnet_libmd        *md;
2322         int               rlength;
2323         int               mlength;
2324         int                     cpt;
2325
2326         cpt = lnet_cpt_of_cookie(hdr->msg.reply.dst_wmd.wh_object_cookie);
2327         lnet_res_lock(cpt);
2328
2329         src.nid = hdr->src_nid;
2330         src.pid = hdr->src_pid;
2331
2332         /* NB handles only looked up by creator (no flips) */
2333         md = lnet_wire_handle2md(&hdr->msg.reply.dst_wmd);
2334         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
2335                 CNETERR("%s: Dropping REPLY from %s for %s "
2336                         "MD %#llx.%#llx\n",
2337                         libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
2338                         (md == NULL) ? "invalid" : "inactive",
2339                         hdr->msg.reply.dst_wmd.wh_interface_cookie,
2340                         hdr->msg.reply.dst_wmd.wh_object_cookie);
2341                 if (md != NULL && md->md_me != NULL)
2342                         CERROR("REPLY MD also attached to portal %d\n",
2343                                md->md_me->me_portal);
2344
2345                 lnet_res_unlock(cpt);
2346                 return -ENOENT; /* -ve: OK but no match */
2347         }
2348
2349         LASSERT(md->md_offset == 0);
2350
2351         rlength = hdr->payload_length;
2352         mlength = MIN(rlength, (int)md->md_length);
2353
2354         if (mlength < rlength &&
2355             (md->md_options & LNET_MD_TRUNCATE) == 0) {
2356                 CNETERR("%s: Dropping REPLY from %s length %d "
2357                         "for MD %#llx would overflow (%d)\n",
2358                         libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
2359                         rlength, hdr->msg.reply.dst_wmd.wh_object_cookie,
2360                         mlength);
2361                 lnet_res_unlock(cpt);
2362                 return -ENOENT; /* -ve: OK but no match */
2363         }
2364
2365         CDEBUG(D_NET, "%s: Reply from %s of length %d/%d into md %#llx\n",
2366                libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
2367                mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);
2368
2369         lnet_msg_attach_md(msg, md, 0, mlength);
2370
2371         if (mlength != 0)
2372                 lnet_setpayloadbuffer(msg);
2373
2374         lnet_res_unlock(cpt);
2375
2376         lnet_build_msg_event(msg, LNET_EVENT_REPLY);
2377
2378         lnet_ni_recv(ni, private, msg, 0, 0, mlength, rlength);
2379         return 0;
2380 }
2381
2382 static int
2383 lnet_parse_ack(struct lnet_ni *ni, struct lnet_msg *msg)
2384 {
2385         struct lnet_hdr  *hdr = &msg->msg_hdr;
2386         struct lnet_process_id src = {0};
2387         struct lnet_libmd        *md;
2388         int                     cpt;
2389
2390         src.nid = hdr->src_nid;
2391         src.pid = hdr->src_pid;
2392
2393         /* Convert ack fields to host byte order */
2394         hdr->msg.ack.match_bits = le64_to_cpu(hdr->msg.ack.match_bits);
2395         hdr->msg.ack.mlength = le32_to_cpu(hdr->msg.ack.mlength);
2396
2397         cpt = lnet_cpt_of_cookie(hdr->msg.ack.dst_wmd.wh_object_cookie);
2398         lnet_res_lock(cpt);
2399
2400         /* NB handles only looked up by creator (no flips) */
2401         md = lnet_wire_handle2md(&hdr->msg.ack.dst_wmd);
2402         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
2403                 /* Don't moan; this is expected */
2404                 CDEBUG(D_NET,
2405                        "%s: Dropping ACK from %s to %s MD %#llx.%#llx\n",
2406                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
2407                        (md == NULL) ? "invalid" : "inactive",
2408                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
2409                        hdr->msg.ack.dst_wmd.wh_object_cookie);
2410                 if (md != NULL && md->md_me != NULL)
2411                         CERROR("Source MD also attached to portal %d\n",
2412                                md->md_me->me_portal);
2413
2414                 lnet_res_unlock(cpt);
2415                 return -ENOENT;                  /* -ve! */
2416         }
2417
2418         CDEBUG(D_NET, "%s: ACK from %s into md %#llx\n",
2419                libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
2420                hdr->msg.ack.dst_wmd.wh_object_cookie);
2421
2422         lnet_msg_attach_md(msg, md, 0, 0);
2423
2424         lnet_res_unlock(cpt);
2425
2426         lnet_build_msg_event(msg, LNET_EVENT_ACK);
2427
2428         lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
2429         return 0;
2430 }
2431
2432 /**
2433  * \retval LNET_CREDIT_OK       If \a msg is forwarded
2434  * \retval LNET_CREDIT_WAIT     If \a msg is blocked because w/o buffer
2435  * \retval -ve                  error code
2436  */
2437 int
2438 lnet_parse_forward_locked(struct lnet_ni *ni, struct lnet_msg *msg)
2439 {
2440         int     rc = 0;
2441
2442         if (!the_lnet.ln_routing)
2443                 return -ECANCELED;
2444
2445         if (msg->msg_rxpeer->lpni_rtrcredits <= 0 ||
2446             lnet_msg2bufpool(msg)->rbp_credits <= 0) {
2447                 if (ni->ni_net->net_lnd->lnd_eager_recv == NULL) {
2448                         msg->msg_rx_ready_delay = 1;
2449                 } else {
2450                         lnet_net_unlock(msg->msg_rx_cpt);
2451                         rc = lnet_ni_eager_recv(ni, msg);
2452                         lnet_net_lock(msg->msg_rx_cpt);
2453                 }
2454         }
2455
2456         if (rc == 0)
2457                 rc = lnet_post_routed_recv_locked(msg, 0);
2458         return rc;
2459 }
2460
2461 int
2462 lnet_parse_local(struct lnet_ni *ni, struct lnet_msg *msg)
2463 {
2464         int     rc;
2465
2466         switch (msg->msg_type) {
2467         case LNET_MSG_ACK:
2468                 rc = lnet_parse_ack(ni, msg);
2469                 break;
2470         case LNET_MSG_PUT:
2471                 rc = lnet_parse_put(ni, msg);
2472                 break;
2473         case LNET_MSG_GET:
2474                 rc = lnet_parse_get(ni, msg, msg->msg_rdma_get);
2475                 break;
2476         case LNET_MSG_REPLY:
2477                 rc = lnet_parse_reply(ni, msg);
2478                 break;
2479         default: /* prevent an unused label if !kernel */
2480                 LASSERT(0);
2481                 return -EPROTO;
2482         }
2483
2484         LASSERT(rc == 0 || rc == -ENOENT);
2485         return rc;
2486 }
2487
2488 char *
2489 lnet_msgtyp2str (int type)
2490 {
2491         switch (type) {
2492         case LNET_MSG_ACK:
2493                 return ("ACK");
2494         case LNET_MSG_PUT:
2495                 return ("PUT");
2496         case LNET_MSG_GET:
2497                 return ("GET");
2498         case LNET_MSG_REPLY:
2499                 return ("REPLY");
2500         case LNET_MSG_HELLO:
2501                 return ("HELLO");
2502         default:
2503                 return ("<UNKNOWN>");
2504         }
2505 }
2506
2507 void
2508 lnet_print_hdr(struct lnet_hdr *hdr)
2509 {
2510         struct lnet_process_id src = {
2511                 .nid = hdr->src_nid,
2512                 .pid = hdr->src_pid,
2513         };
2514         struct lnet_process_id dst = {
2515                 .nid = hdr->dest_nid,
2516                 .pid = hdr->dest_pid,
2517         };
2518         char *type_str = lnet_msgtyp2str(hdr->type);
2519
2520         CWARN("P3 Header at %p of type %s\n", hdr, type_str);
2521         CWARN("    From %s\n", libcfs_id2str(src));
2522         CWARN("    To   %s\n", libcfs_id2str(dst));
2523
2524         switch (hdr->type) {
2525         default:
2526                 break;
2527
2528         case LNET_MSG_PUT:
2529                 CWARN("    Ptl index %d, ack md %#llx.%#llx, "
2530                       "match bits %llu\n",
2531                       hdr->msg.put.ptl_index,
2532                       hdr->msg.put.ack_wmd.wh_interface_cookie,
2533                       hdr->msg.put.ack_wmd.wh_object_cookie,
2534                       hdr->msg.put.match_bits);
2535                 CWARN("    Length %d, offset %d, hdr data %#llx\n",
2536                       hdr->payload_length, hdr->msg.put.offset,
2537                       hdr->msg.put.hdr_data);
2538                 break;
2539
2540         case LNET_MSG_GET:
2541                 CWARN("    Ptl index %d, return md %#llx.%#llx, "
2542                       "match bits %llu\n", hdr->msg.get.ptl_index,
2543                       hdr->msg.get.return_wmd.wh_interface_cookie,
2544                       hdr->msg.get.return_wmd.wh_object_cookie,
2545                       hdr->msg.get.match_bits);
2546                 CWARN("    Length %d, src offset %d\n",
2547                       hdr->msg.get.sink_length,
2548                       hdr->msg.get.src_offset);
2549                 break;
2550
2551         case LNET_MSG_ACK:
2552                 CWARN("    dst md %#llx.%#llx, "
2553                       "manipulated length %d\n",
2554                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
2555                       hdr->msg.ack.dst_wmd.wh_object_cookie,
2556                       hdr->msg.ack.mlength);
2557                 break;
2558
2559         case LNET_MSG_REPLY:
2560                 CWARN("    dst md %#llx.%#llx, "
2561                       "length %d\n",
2562                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
2563                       hdr->msg.reply.dst_wmd.wh_object_cookie,
2564                       hdr->payload_length);
2565         }
2566
2567 }
2568
2569 int
2570 lnet_parse(struct lnet_ni *ni, struct lnet_hdr *hdr, lnet_nid_t from_nid,
2571            void *private, int rdma_req)
2572 {
2573         int             rc = 0;
2574         int             cpt;
2575         int             for_me;
2576         struct lnet_msg *msg;
2577         lnet_pid_t     dest_pid;
2578         lnet_nid_t     dest_nid;
2579         lnet_nid_t     src_nid;
2580         struct lnet_peer_ni *lpni;
2581         __u32          payload_length;
2582         __u32          type;
2583
2584         LASSERT (!in_interrupt ());
2585
2586         type = le32_to_cpu(hdr->type);
2587         src_nid = le64_to_cpu(hdr->src_nid);
2588         dest_nid = le64_to_cpu(hdr->dest_nid);
2589         dest_pid = le32_to_cpu(hdr->dest_pid);
2590         payload_length = le32_to_cpu(hdr->payload_length);
2591
2592         for_me = (ni->ni_nid == dest_nid);
2593         cpt = lnet_cpt_of_nid(from_nid, ni);
2594
2595         CDEBUG(D_NET, "TRACE: %s(%s) <- %s : %s - %s\n",
2596                 libcfs_nid2str(dest_nid),
2597                 libcfs_nid2str(ni->ni_nid),
2598                 libcfs_nid2str(src_nid),
2599                 lnet_msgtyp2str(type),
2600                 (for_me) ? "for me" : "routed");
2601
2602         switch (type) {
2603         case LNET_MSG_ACK:
2604         case LNET_MSG_GET:
2605                 if (payload_length > 0) {
2606                         CERROR("%s, src %s: bad %s payload %d (0 expected)\n",
2607                                libcfs_nid2str(from_nid),
2608                                libcfs_nid2str(src_nid),
2609                                lnet_msgtyp2str(type), payload_length);
2610                         return -EPROTO;
2611                 }
2612                 break;
2613
2614         case LNET_MSG_PUT:
2615         case LNET_MSG_REPLY:
2616                 if (payload_length >
2617                     (__u32)(for_me ? LNET_MAX_PAYLOAD : LNET_MTU)) {
2618                         CERROR("%s, src %s: bad %s payload %d "
2619                                "(%d max expected)\n",
2620                                libcfs_nid2str(from_nid),
2621                                libcfs_nid2str(src_nid),
2622                                lnet_msgtyp2str(type),
2623                                payload_length,
2624                                for_me ? LNET_MAX_PAYLOAD : LNET_MTU);
2625                         return -EPROTO;
2626                 }
2627                 break;
2628
2629         default:
2630                 CERROR("%s, src %s: Bad message type 0x%x\n",
2631                        libcfs_nid2str(from_nid),
2632                        libcfs_nid2str(src_nid), type);
2633                 return -EPROTO;
2634         }
2635
2636         if (the_lnet.ln_routing &&
2637             ni->ni_last_alive != ktime_get_real_seconds()) {
2638                 /* NB: so far here is the only place to set NI status to "up */
2639                 lnet_ni_lock(ni);
2640                 ni->ni_last_alive = ktime_get_real_seconds();
2641                 if (ni->ni_status != NULL &&
2642                     ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
2643                         ni->ni_status->ns_status = LNET_NI_STATUS_UP;
2644                 lnet_ni_unlock(ni);
2645         }
2646
2647         /* Regard a bad destination NID as a protocol error.  Senders should
2648          * know what they're doing; if they don't they're misconfigured, buggy
2649          * or malicious so we chop them off at the knees :) */
2650
2651         if (!for_me) {
2652                 if (LNET_NIDNET(dest_nid) == LNET_NIDNET(ni->ni_nid)) {
2653                         /* should have gone direct */
2654                         CERROR("%s, src %s: Bad dest nid %s "
2655                                "(should have been sent direct)\n",
2656                                 libcfs_nid2str(from_nid),
2657                                 libcfs_nid2str(src_nid),
2658                                 libcfs_nid2str(dest_nid));
2659                         return -EPROTO;
2660                 }
2661
2662                 if (lnet_islocalnid(dest_nid)) {
2663                         /* dest is another local NI; sender should have used
2664                          * this node's NID on its own network */
2665                         CERROR("%s, src %s: Bad dest nid %s "
2666                                "(it's my nid but on a different network)\n",
2667                                 libcfs_nid2str(from_nid),
2668                                 libcfs_nid2str(src_nid),
2669                                 libcfs_nid2str(dest_nid));
2670                         return -EPROTO;
2671                 }
2672
2673                 if (rdma_req && type == LNET_MSG_GET) {
2674                         CERROR("%s, src %s: Bad optimized GET for %s "
2675                                "(final destination must be me)\n",
2676                                 libcfs_nid2str(from_nid),
2677                                 libcfs_nid2str(src_nid),
2678                                 libcfs_nid2str(dest_nid));
2679                         return -EPROTO;
2680                 }
2681
2682                 if (!the_lnet.ln_routing) {
2683                         CERROR("%s, src %s: Dropping message for %s "
2684                                "(routing not enabled)\n",
2685                                 libcfs_nid2str(from_nid),
2686                                 libcfs_nid2str(src_nid),
2687                                 libcfs_nid2str(dest_nid));
2688                         goto drop;
2689                 }
2690         }
2691
2692         /* Message looks OK; we're not going to return an error, so we MUST
2693          * call back lnd_recv() come what may... */
2694
2695         if (!list_empty(&the_lnet.ln_test_peers) &&     /* normally we don't */
2696             fail_peer(src_nid, 0)) {                    /* shall we now? */
2697                 CERROR("%s, src %s: Dropping %s to simulate failure\n",
2698                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
2699                        lnet_msgtyp2str(type));
2700                 goto drop;
2701         }
2702
2703         if (!list_empty(&the_lnet.ln_drop_rules) &&
2704             lnet_drop_rule_match(hdr)) {
2705                 CDEBUG(D_NET, "%s, src %s, dst %s: Dropping %s to simulate"
2706                               "silent message loss\n",
2707                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
2708                        libcfs_nid2str(dest_nid), lnet_msgtyp2str(type));
2709                 goto drop;
2710         }
2711
2712
2713         msg = lnet_msg_alloc();
2714         if (msg == NULL) {
2715                 CERROR("%s, src %s: Dropping %s (out of memory)\n",
2716                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
2717                        lnet_msgtyp2str(type));
2718                 goto drop;
2719         }
2720
2721         /* msg zeroed in lnet_msg_alloc; i.e. flags all clear,
2722          * pointers NULL etc */
2723
2724         msg->msg_type = type;
2725         msg->msg_private = private;
2726         msg->msg_receiving = 1;
2727         msg->msg_rdma_get = rdma_req;
2728         msg->msg_len = msg->msg_wanted = payload_length;
2729         msg->msg_offset = 0;
2730         msg->msg_hdr = *hdr;
2731         /* for building message event */
2732         msg->msg_from = from_nid;
2733         if (!for_me) {
2734                 msg->msg_target.pid     = dest_pid;
2735                 msg->msg_target.nid     = dest_nid;
2736                 msg->msg_routing        = 1;
2737
2738         } else {
2739                 /* convert common msg->hdr fields to host byteorder */
2740                 msg->msg_hdr.type       = type;
2741                 msg->msg_hdr.src_nid    = src_nid;
2742                 msg->msg_hdr.src_pid    = le32_to_cpu(msg->msg_hdr.src_pid);
2743                 msg->msg_hdr.dest_nid   = dest_nid;
2744                 msg->msg_hdr.dest_pid   = dest_pid;
2745                 msg->msg_hdr.payload_length = payload_length;
2746         }
2747
2748         lnet_net_lock(cpt);
2749         lpni = lnet_nid2peerni_locked(from_nid, ni->ni_nid, cpt);
2750         if (IS_ERR(lpni)) {
2751                 lnet_net_unlock(cpt);
2752                 CERROR("%s, src %s: Dropping %s "
2753                        "(error %ld looking up sender)\n",
2754                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
2755                        lnet_msgtyp2str(type), PTR_ERR(lpni));
2756                 lnet_msg_free(msg);
2757                 if (rc == -ESHUTDOWN)
2758                         /* We are shutting down.  Don't do anything more */
2759                         return 0;
2760                 goto drop;
2761         }
2762         msg->msg_rxpeer = lpni;
2763         msg->msg_rxni = ni;
2764         lnet_ni_addref_locked(ni, cpt);
2765         /* Multi-Rail: Primary NID of source. */
2766         msg->msg_initiator = lnet_peer_primary_nid_locked(src_nid);
2767
2768         if (lnet_isrouter(msg->msg_rxpeer)) {
2769                 lnet_peer_set_alive(msg->msg_rxpeer);
2770                 if (avoid_asym_router_failure &&
2771                     LNET_NIDNET(src_nid) != LNET_NIDNET(from_nid)) {
2772                         /* received a remote message from router, update
2773                          * remote NI status on this router.
2774                          * NB: multi-hop routed message will be ignored.
2775                          */
2776                         lnet_router_ni_update_locked(msg->msg_rxpeer,
2777                                                      LNET_NIDNET(src_nid));
2778                 }
2779         }
2780
2781         lnet_msg_commit(msg, cpt);
2782
2783         /* message delay simulation */
2784         if (unlikely(!list_empty(&the_lnet.ln_delay_rules) &&
2785                      lnet_delay_rule_match_locked(hdr, msg))) {
2786                 lnet_net_unlock(cpt);
2787                 return 0;
2788         }
2789
2790         if (!for_me) {
2791                 rc = lnet_parse_forward_locked(ni, msg);
2792                 lnet_net_unlock(cpt);
2793
2794                 if (rc < 0)
2795                         goto free_drop;
2796
2797                 if (rc == LNET_CREDIT_OK) {
2798                         lnet_ni_recv(ni, msg->msg_private, msg, 0,
2799                                      0, payload_length, payload_length);
2800                 }
2801                 return 0;
2802         }
2803
2804         lnet_net_unlock(cpt);
2805
2806         rc = lnet_parse_local(ni, msg);
2807         if (rc != 0)
2808                 goto free_drop;
2809         return 0;
2810
2811  free_drop:
2812         LASSERT(msg->msg_md == NULL);
2813         lnet_finalize(msg, rc);
2814
2815  drop:
2816         lnet_drop_message(ni, cpt, private, payload_length, type);
2817         return 0;
2818 }
2819 EXPORT_SYMBOL(lnet_parse);
2820
2821 void
2822 lnet_drop_delayed_msg_list(struct list_head *head, char *reason)
2823 {
2824         while (!list_empty(head)) {
2825                 struct lnet_process_id id = {0};
2826                 struct lnet_msg *msg;
2827
2828                 msg = list_entry(head->next, struct lnet_msg, msg_list);
2829                 list_del(&msg->msg_list);
2830
2831                 id.nid = msg->msg_hdr.src_nid;
2832                 id.pid = msg->msg_hdr.src_pid;
2833
2834                 LASSERT(msg->msg_md == NULL);
2835                 LASSERT(msg->msg_rx_delayed);
2836                 LASSERT(msg->msg_rxpeer != NULL);
2837                 LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
2838
2839                 CWARN("Dropping delayed PUT from %s portal %d match %llu"
2840                       " offset %d length %d: %s\n",
2841                       libcfs_id2str(id),
2842                       msg->msg_hdr.msg.put.ptl_index,
2843                       msg->msg_hdr.msg.put.match_bits,
2844                       msg->msg_hdr.msg.put.offset,
2845                       msg->msg_hdr.payload_length, reason);
2846
2847                 /* NB I can't drop msg's ref on msg_rxpeer until after I've
2848                  * called lnet_drop_message(), so I just hang onto msg as well
2849                  * until that's done */
2850
2851                 lnet_drop_message(msg->msg_rxni, msg->msg_rx_cpt,
2852                                   msg->msg_private, msg->msg_len,
2853                                   msg->msg_type);
2854                 /*
2855                  * NB: message will not generate event because w/o attached MD,
2856                  * but we still should give error code so lnet_msg_decommit()
2857                  * can skip counters operations and other checks.
2858                  */
2859                 lnet_finalize(msg, -ENOENT);
2860         }
2861 }
2862
2863 void
2864 lnet_recv_delayed_msg_list(struct list_head *head)
2865 {
2866         while (!list_empty(head)) {
2867                 struct lnet_msg *msg;
2868                 struct lnet_process_id id;
2869
2870                 msg = list_entry(head->next, struct lnet_msg, msg_list);
2871                 list_del(&msg->msg_list);
2872
2873                 /* md won't disappear under me, since each msg
2874                  * holds a ref on it */
2875
2876                 id.nid = msg->msg_hdr.src_nid;
2877                 id.pid = msg->msg_hdr.src_pid;
2878
2879                 LASSERT(msg->msg_rx_delayed);
2880                 LASSERT(msg->msg_md != NULL);
2881                 LASSERT(msg->msg_rxpeer != NULL);
2882                 LASSERT(msg->msg_rxni != NULL);
2883                 LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
2884
2885                 CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
2886                        "match %llu offset %d length %d.\n",
2887                         libcfs_id2str(id), msg->msg_hdr.msg.put.ptl_index,
2888                         msg->msg_hdr.msg.put.match_bits,
2889                         msg->msg_hdr.msg.put.offset,
2890                         msg->msg_hdr.payload_length);
2891
2892                 lnet_recv_put(msg->msg_rxni, msg);
2893         }
2894 }
2895
2896 /**
2897  * Initiate an asynchronous PUT operation.
2898  *
2899  * There are several events associated with a PUT: completion of the send on
2900  * the initiator node (LNET_EVENT_SEND), and when the send completes
2901  * successfully, the receipt of an acknowledgment (LNET_EVENT_ACK) indicating
2902  * that the operation was accepted by the target. The event LNET_EVENT_PUT is
2903  * used at the target node to indicate the completion of incoming data
2904  * delivery.
2905  *
2906  * The local events will be logged in the EQ associated with the MD pointed to
2907  * by \a mdh handle. Using a MD without an associated EQ results in these
2908  * events being discarded. In this case, the caller must have another
2909  * mechanism (e.g., a higher level protocol) for determining when it is safe
2910  * to modify the memory region associated with the MD.
2911  *
2912  * Note that LNet does not guarantee the order of LNET_EVENT_SEND and
2913  * LNET_EVENT_ACK, though intuitively ACK should happen after SEND.
2914  *
2915  * \param self Indicates the NID of a local interface through which to send
2916  * the PUT request. Use LNET_NID_ANY to let LNet choose one by itself.
2917  * \param mdh A handle for the MD that describes the memory to be sent. The MD
2918  * must be "free floating" (See LNetMDBind()).
2919  * \param ack Controls whether an acknowledgment is requested.
2920  * Acknowledgments are only sent when they are requested by the initiating
2921  * process and the target MD enables them.
2922  * \param target A process identifier for the target process.
2923  * \param portal The index in the \a target's portal table.
2924  * \param match_bits The match bits to use for MD selection at the target
2925  * process.
2926  * \param offset The offset into the target MD (only used when the target
2927  * MD has the LNET_MD_MANAGE_REMOTE option set).
2928  * \param hdr_data 64 bits of user data that can be included in the message
2929  * header. This data is written to an event queue entry at the target if an
2930  * EQ is present on the matching MD.
2931  *
2932  * \retval  0      Success, and only in this case events will be generated
2933  * and logged to EQ (if it exists).
2934  * \retval -EIO    Simulated failure.
2935  * \retval -ENOMEM Memory allocation failure.
2936  * \retval -ENOENT Invalid MD object.
2937  *
2938  * \see struct lnet_event::hdr_data and lnet_event_kind_t.
2939  */
2940 int
2941 LNetPut(lnet_nid_t self, struct lnet_handle_md mdh, enum lnet_ack_req ack,
2942         struct lnet_process_id target, unsigned int portal,
2943         __u64 match_bits, unsigned int offset,
2944         __u64 hdr_data)
2945 {
2946         struct lnet_msg         *msg;
2947         struct lnet_libmd       *md;
2948         int                     cpt;
2949         int                     rc;
2950
2951         LASSERT(the_lnet.ln_refcount > 0);
2952
2953         if (!list_empty(&the_lnet.ln_test_peers) &&     /* normally we don't */
2954             fail_peer(target.nid, 1)) {                 /* shall we now? */
2955                 CERROR("Dropping PUT to %s: simulated failure\n",
2956                        libcfs_id2str(target));
2957                 return -EIO;
2958         }
2959
2960         msg = lnet_msg_alloc();
2961         if (msg == NULL) {
2962                 CERROR("Dropping PUT to %s: ENOMEM on struct lnet_msg\n",
2963                        libcfs_id2str(target));
2964                 return -ENOMEM;
2965         }
2966         msg->msg_vmflush = !!memory_pressure_get();
2967
2968         cpt = lnet_cpt_of_cookie(mdh.cookie);
2969         lnet_res_lock(cpt);
2970
2971         md = lnet_handle2md(&mdh);
2972         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
2973                 CERROR("Dropping PUT (%llu:%d:%s): MD (%d) invalid\n",
2974                        match_bits, portal, libcfs_id2str(target),
2975                        md == NULL ? -1 : md->md_threshold);
2976                 if (md != NULL && md->md_me != NULL)
2977                         CERROR("Source MD also attached to portal %d\n",
2978                                md->md_me->me_portal);
2979                 lnet_res_unlock(cpt);
2980
2981                 lnet_msg_free(msg);
2982                 return -ENOENT;
2983         }
2984
2985         CDEBUG(D_NET, "LNetPut -> %s\n", libcfs_id2str(target));
2986
2987         lnet_msg_attach_md(msg, md, 0, 0);
2988
2989         lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length);
2990
2991         msg->msg_hdr.msg.put.match_bits = cpu_to_le64(match_bits);
2992         msg->msg_hdr.msg.put.ptl_index = cpu_to_le32(portal);
2993         msg->msg_hdr.msg.put.offset = cpu_to_le32(offset);
2994         msg->msg_hdr.msg.put.hdr_data = hdr_data;
2995
2996         /* NB handles only looked up by creator (no flips) */
2997         if (ack == LNET_ACK_REQ) {
2998                 msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
2999                         the_lnet.ln_interface_cookie;
3000                 msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
3001                         md->md_lh.lh_cookie;
3002         } else {
3003                 msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
3004                         LNET_WIRE_HANDLE_COOKIE_NONE;
3005                 msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
3006                         LNET_WIRE_HANDLE_COOKIE_NONE;
3007         }
3008
3009         lnet_res_unlock(cpt);
3010
3011         lnet_build_msg_event(msg, LNET_EVENT_SEND);
3012
3013         rc = lnet_send(self, msg, LNET_NID_ANY);
3014         if (rc != 0) {
3015                 CNETERR("Error sending PUT to %s: %d\n",
3016                         libcfs_id2str(target), rc);
3017                 lnet_finalize(msg, rc);
3018         }
3019
3020         /* completion will be signalled by an event */
3021         return 0;
3022 }
3023 EXPORT_SYMBOL(LNetPut);
3024
3025 /*
3026  * The LND can DMA direct to the GET md (i.e. no REPLY msg).  This
3027  * returns a msg for the LND to pass to lnet_finalize() when the sink
3028  * data has been received.
3029  *
3030  * CAVEAT EMPTOR: 'getmsg' is the original GET, which is freed when
3031  * lnet_finalize() is called on it, so the LND must call this first
3032  */
3033 struct lnet_msg *
3034 lnet_create_reply_msg(struct lnet_ni *ni, struct lnet_msg *getmsg)
3035 {
3036         struct lnet_msg *msg = lnet_msg_alloc();
3037         struct lnet_libmd *getmd = getmsg->msg_md;
3038         struct lnet_process_id peer_id = getmsg->msg_target;
3039         int cpt;
3040
3041         LASSERT(!getmsg->msg_target_is_router);
3042         LASSERT(!getmsg->msg_routing);
3043
3044         if (msg == NULL) {
3045                 CERROR("%s: Dropping REPLY from %s: can't allocate msg\n",
3046                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id));
3047                 goto drop;
3048         }
3049
3050         cpt = lnet_cpt_of_cookie(getmd->md_lh.lh_cookie);
3051         lnet_res_lock(cpt);
3052
3053         LASSERT(getmd->md_refcount > 0);
3054
3055         if (getmd->md_threshold == 0) {
3056                 CERROR("%s: Dropping REPLY from %s for inactive MD %p\n",
3057                         libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id),
3058                         getmd);
3059                 lnet_res_unlock(cpt);
3060                 goto drop;
3061         }
3062
3063         LASSERT(getmd->md_offset == 0);
3064
3065         CDEBUG(D_NET, "%s: Reply from %s md %p\n",
3066                libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
3067
3068         /* setup information for lnet_build_msg_event */
3069         msg->msg_initiator = getmsg->msg_txpeer->lpni_peer_net->lpn_peer->lp_primary_nid;
3070         msg->msg_from = peer_id.nid;
3071         msg->msg_type = LNET_MSG_GET; /* flag this msg as an "optimized" GET */
3072         msg->msg_hdr.src_nid = peer_id.nid;
3073         msg->msg_hdr.payload_length = getmd->md_length;
3074         msg->msg_receiving = 1; /* required by lnet_msg_attach_md */
3075
3076         lnet_msg_attach_md(msg, getmd, getmd->md_offset, getmd->md_length);
3077         lnet_res_unlock(cpt);
3078
3079         cpt = lnet_cpt_of_nid(peer_id.nid, ni);
3080
3081         lnet_net_lock(cpt);
3082         lnet_msg_commit(msg, cpt);
3083         lnet_net_unlock(cpt);
3084
3085         lnet_build_msg_event(msg, LNET_EVENT_REPLY);
3086
3087         return msg;
3088
3089  drop:
3090         cpt = lnet_cpt_of_nid(peer_id.nid, ni);
3091
3092         lnet_net_lock(cpt);
3093         lnet_incr_stats(&ni->ni_stats, LNET_MSG_GET, LNET_STATS_TYPE_DROP);
3094         the_lnet.ln_counters[cpt]->drop_count++;
3095         the_lnet.ln_counters[cpt]->drop_length += getmd->md_length;
3096         lnet_net_unlock(cpt);
3097
3098         if (msg != NULL)
3099                 lnet_msg_free(msg);
3100
3101         return NULL;
3102 }
3103 EXPORT_SYMBOL(lnet_create_reply_msg);
3104
3105 void
3106 lnet_set_reply_msg_len(struct lnet_ni *ni, struct lnet_msg *reply,
3107                        unsigned int len)
3108 {
3109         /* Set the REPLY length, now the RDMA that elides the REPLY message has
3110          * completed and I know it. */
3111         LASSERT(reply != NULL);
3112         LASSERT(reply->msg_type == LNET_MSG_GET);
3113         LASSERT(reply->msg_ev.type == LNET_EVENT_REPLY);
3114
3115         /* NB I trusted my peer to RDMA.  If she tells me she's written beyond
3116          * the end of my buffer, I might as well be dead. */
3117         LASSERT(len <= reply->msg_ev.mlength);
3118
3119         reply->msg_ev.mlength = len;
3120 }
3121 EXPORT_SYMBOL(lnet_set_reply_msg_len);
3122
3123 /**
3124  * Initiate an asynchronous GET operation.
3125  *
3126  * On the initiator node, an LNET_EVENT_SEND is logged when the GET request
3127  * is sent, and an LNET_EVENT_REPLY is logged when the data returned from
3128  * the target node in the REPLY has been written to local MD.
3129  *
3130  * On the target node, an LNET_EVENT_GET is logged when the GET request
3131  * arrives and is accepted into a MD.
3132  *
3133  * \param self,target,portal,match_bits,offset See the discussion in LNetPut().
3134  * \param mdh A handle for the MD that describes the memory into which the
3135  * requested data will be received. The MD must be "free floating" (See LNetMDBind()).
3136  *
3137  * \retval  0      Success, and only in this case events will be generated
3138  * and logged to EQ (if it exists) of the MD.
3139  * \retval -EIO    Simulated failure.
3140  * \retval -ENOMEM Memory allocation failure.
3141  * \retval -ENOENT Invalid MD object.
3142  */
3143 int
3144 LNetGet(lnet_nid_t self, struct lnet_handle_md mdh,
3145         struct lnet_process_id target, unsigned int portal,
3146         __u64 match_bits, unsigned int offset)
3147 {
3148         struct lnet_msg         *msg;
3149         struct lnet_libmd       *md;
3150         int                     cpt;
3151         int                     rc;
3152
3153         LASSERT(the_lnet.ln_refcount > 0);
3154
3155         if (!list_empty(&the_lnet.ln_test_peers) &&     /* normally we don't */
3156             fail_peer(target.nid, 1))                   /* shall we now? */
3157         {
3158                 CERROR("Dropping GET to %s: simulated failure\n",
3159                        libcfs_id2str(target));
3160                 return -EIO;
3161         }
3162
3163         msg = lnet_msg_alloc();
3164         if (msg == NULL) {
3165                 CERROR("Dropping GET to %s: ENOMEM on struct lnet_msg\n",
3166                        libcfs_id2str(target));
3167                 return -ENOMEM;
3168         }
3169
3170         cpt = lnet_cpt_of_cookie(mdh.cookie);
3171         lnet_res_lock(cpt);
3172
3173         md = lnet_handle2md(&mdh);
3174         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
3175                 CERROR("Dropping GET (%llu:%d:%s): MD (%d) invalid\n",
3176                        match_bits, portal, libcfs_id2str(target),
3177                        md == NULL ? -1 : md->md_threshold);
3178                 if (md != NULL && md->md_me != NULL)
3179                         CERROR("REPLY MD also attached to portal %d\n",
3180                                md->md_me->me_portal);
3181
3182                 lnet_res_unlock(cpt);
3183
3184                 lnet_msg_free(msg);
3185                 return -ENOENT;
3186         }
3187
3188         CDEBUG(D_NET, "LNetGet -> %s\n", libcfs_id2str(target));
3189
3190         lnet_msg_attach_md(msg, md, 0, 0);
3191
3192         lnet_prep_send(msg, LNET_MSG_GET, target, 0, 0);
3193
3194         msg->msg_hdr.msg.get.match_bits = cpu_to_le64(match_bits);
3195         msg->msg_hdr.msg.get.ptl_index = cpu_to_le32(portal);
3196         msg->msg_hdr.msg.get.src_offset = cpu_to_le32(offset);
3197         msg->msg_hdr.msg.get.sink_length = cpu_to_le32(md->md_length);
3198
3199         /* NB handles only looked up by creator (no flips) */
3200         msg->msg_hdr.msg.get.return_wmd.wh_interface_cookie =
3201                 the_lnet.ln_interface_cookie;
3202         msg->msg_hdr.msg.get.return_wmd.wh_object_cookie =
3203                 md->md_lh.lh_cookie;
3204
3205         lnet_res_unlock(cpt);
3206
3207         lnet_build_msg_event(msg, LNET_EVENT_SEND);
3208
3209         rc = lnet_send(self, msg, LNET_NID_ANY);
3210         if (rc < 0) {
3211                 CNETERR("Error sending GET to %s: %d\n",
3212                         libcfs_id2str(target), rc);
3213                 lnet_finalize(msg, rc);
3214         }
3215
3216         /* completion will be signalled by an event */
3217         return 0;
3218 }
3219 EXPORT_SYMBOL(LNetGet);
3220
3221 /**
3222  * Calculate distance to node at \a dstnid.
3223  *
3224  * \param dstnid Target NID.
3225  * \param srcnidp If not NULL, NID of the local interface to reach \a dstnid
3226  * is saved here.
3227  * \param orderp If not NULL, order of the route to reach \a dstnid is saved
3228  * here.
3229  *
3230  * \retval 0 If \a dstnid belongs to a local interface, and reserved option
3231  * local_nid_dist_zero is set, which is the default.
3232  * \retval positives Distance to target NID, i.e. number of hops plus one.
3233  * \retval -EHOSTUNREACH If \a dstnid is not reachable.
3234  */
3235 int
3236 LNetDist(lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
3237 {
3238         struct list_head        *e;
3239         struct lnet_ni *ni = NULL;
3240         struct lnet_remotenet *rnet;
3241         __u32                   dstnet = LNET_NIDNET(dstnid);
3242         int                     hops;
3243         int                     cpt;
3244         __u32                   order = 2;
3245         struct list_head        *rn_list;
3246
3247         /* if !local_nid_dist_zero, I don't return a distance of 0 ever
3248          * (when lustre sees a distance of 0, it substitutes 0@lo), so I
3249          * keep order 0 free for 0@lo and order 1 free for a local NID
3250          * match */
3251
3252         LASSERT(the_lnet.ln_refcount > 0);
3253
3254         cpt = lnet_net_lock_current();
3255
3256         while ((ni = lnet_get_next_ni_locked(NULL, ni))) {
3257                 if (ni->ni_nid == dstnid) {
3258                         if (srcnidp != NULL)
3259                                 *srcnidp = dstnid;
3260                         if (orderp != NULL) {
3261                                 if (LNET_NETTYP(LNET_NIDNET(dstnid)) == LOLND)
3262                                         *orderp = 0;
3263                                 else
3264                                         *orderp = 1;
3265                         }
3266                         lnet_net_unlock(cpt);
3267
3268                         return local_nid_dist_zero ? 0 : 1;
3269                 }
3270
3271                 if (LNET_NIDNET(ni->ni_nid) == dstnet) {
3272                         /* Check if ni was originally created in
3273                          * current net namespace.
3274                          * If not, assign order above 0xffff0000,
3275                          * to make this ni not a priority. */
3276                         if (!net_eq(ni->ni_net_ns, current->nsproxy->net_ns))
3277                                 order += 0xffff0000;
3278
3279                         if (srcnidp != NULL)
3280                                 *srcnidp = ni->ni_nid;
3281                         if (orderp != NULL)
3282                                 *orderp = order;
3283                         lnet_net_unlock(cpt);
3284                         return 1;
3285                 }
3286
3287                 order++;
3288         }
3289
3290         rn_list = lnet_net2rnethash(dstnet);
3291         list_for_each(e, rn_list) {
3292                 rnet = list_entry(e, struct lnet_remotenet, lrn_list);
3293
3294                 if (rnet->lrn_net == dstnet) {
3295                         struct lnet_route *route;
3296                         struct lnet_route *shortest = NULL;
3297                         __u32 shortest_hops = LNET_UNDEFINED_HOPS;
3298                         __u32 route_hops;
3299
3300                         LASSERT(!list_empty(&rnet->lrn_routes));
3301
3302                         list_for_each_entry(route, &rnet->lrn_routes,
3303                                             lr_list) {
3304                                 route_hops = route->lr_hops;
3305                                 if (route_hops == LNET_UNDEFINED_HOPS)
3306                                         route_hops = 1;
3307                                 if (shortest == NULL ||
3308                                     route_hops < shortest_hops) {
3309                                         shortest = route;
3310                                         shortest_hops = route_hops;
3311                                 }
3312                         }
3313
3314                         LASSERT(shortest != NULL);
3315                         hops = shortest_hops;
3316                         if (srcnidp != NULL) {
3317                                 ni = lnet_get_next_ni_locked(
3318                                         shortest->lr_gateway->lpni_net,
3319                                         NULL);
3320                                 *srcnidp = ni->ni_nid;
3321                         }
3322                         if (orderp != NULL)
3323                                 *orderp = order;
3324                         lnet_net_unlock(cpt);
3325                         return hops + 1;
3326                 }
3327                 order++;
3328         }
3329
3330         lnet_net_unlock(cpt);
3331         return -EHOSTUNREACH;
3332 }
3333 EXPORT_SYMBOL(LNetDist);