Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lnet / lnet / lib-move.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/lnet/lib-move.c
37  *
38  * Data movement routines
39  */
40
41 #define DEBUG_SUBSYSTEM S_LNET
42
43 #include <lnet/lib-lnet.h>
44
45 static int local_nid_dist_zero = 1;
46 CFS_MODULE_PARM(local_nid_dist_zero, "i", int, 0444,
47                 "Reserved");
48
49 /* forward ref */
50 static void lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg);
51
52 #define LNET_MATCHMD_NONE     0   /* Didn't match */
53 #define LNET_MATCHMD_OK       1   /* Matched OK */
54 #define LNET_MATCHMD_DROP     2   /* Must be discarded */
55
56 static int
57 lnet_try_match_md (int index, int op_mask, lnet_process_id_t src,
58                    unsigned int rlength, unsigned int roffset,
59                    __u64 match_bits, lnet_libmd_t *md, lnet_msg_t *msg,
60                    unsigned int *mlength_out, unsigned int *offset_out)
61 {
62         /* ALWAYS called holding the LNET_LOCK, and can't LNET_UNLOCK;
63          * lnet_match_blocked_msg() relies on this to avoid races */
64         unsigned int  offset;
65         unsigned int  mlength;
66         lnet_me_t    *me = md->md_me;
67
68         /* mismatched MD op */
69         if ((md->md_options & op_mask) == 0)
70                 return LNET_MATCHMD_NONE;
71
72         /* MD exhausted */
73         if (lnet_md_exhausted(md))
74                 return LNET_MATCHMD_NONE;
75
76         /* mismatched ME nid/pid? */
77         if (me->me_match_id.nid != LNET_NID_ANY &&
78             me->me_match_id.nid != src.nid)
79                 return LNET_MATCHMD_NONE;
80
81         if (me->me_match_id.pid != LNET_PID_ANY &&
82             me->me_match_id.pid != src.pid)
83                 return LNET_MATCHMD_NONE;
84
85         /* mismatched ME matchbits? */
86         if (((me->me_match_bits ^ match_bits) & ~me->me_ignore_bits) != 0)
87                 return LNET_MATCHMD_NONE;
88
89         /* Hurrah! This _is_ a match; check it out... */
90
91         if ((md->md_options & LNET_MD_MANAGE_REMOTE) == 0)
92                 offset = md->md_offset;
93         else
94                 offset = roffset;
95
96         if ((md->md_options & LNET_MD_MAX_SIZE) != 0) {
97                 mlength = md->md_max_size;
98                 LASSERT (md->md_offset + mlength <= md->md_length);
99         } else {
100                 mlength = md->md_length - offset;
101         }
102
103         if (rlength <= mlength) {        /* fits in allowed space */
104                 mlength = rlength;
105         } else if ((md->md_options & LNET_MD_TRUNCATE) == 0) {
106                 /* this packet _really_ is too big */
107                 CERROR("Matching packet from %s, match "LPU64
108                        " length %d too big: %d left, %d allowed\n",
109                        libcfs_id2str(src), match_bits, rlength,
110                        md->md_length - offset, mlength);
111
112                 return LNET_MATCHMD_DROP;
113         }
114
115         /* Commit to this ME/MD */
116         CDEBUG(D_NET, "Incoming %s index %x from %s of "
117                "length %d/%d into md "LPX64" [%d] + %d\n",
118                (op_mask == LNET_MD_OP_PUT) ? "put" : "get",
119                index, libcfs_id2str(src), mlength, rlength,
120                md->md_lh.lh_cookie, md->md_niov, offset);
121
122         lnet_commit_md(md, msg);
123         md->md_offset = offset + mlength;
124
125         /* NB Caller will set ev.type and ev.hdr_data */
126         msg->msg_ev.initiator = src;
127         msg->msg_ev.pt_index = index;
128         msg->msg_ev.match_bits = match_bits;
129         msg->msg_ev.rlength = rlength;
130         msg->msg_ev.mlength = mlength;
131         msg->msg_ev.offset = offset;
132
133         lnet_md_deconstruct(md, &msg->msg_ev.md);
134         lnet_md2handle(&msg->msg_ev.md_handle, md);
135
136         *offset_out = offset;
137         *mlength_out = mlength;
138
139         /* Auto-unlink NOW, so the ME gets unlinked if required.
140          * We bumped md->md_refcount above so the MD just gets flagged
141          * for unlink when it is finalized. */
142         if ((md->md_flags & LNET_MD_FLAG_AUTO_UNLINK) != 0 &&
143             lnet_md_exhausted(md)) {
144                 lnet_md_unlink(md);
145         }
146
147         return LNET_MATCHMD_OK;
148 }
149
150 static int
151 lnet_match_md(int index, int op_mask, lnet_process_id_t src,
152               unsigned int rlength, unsigned int roffset,
153               __u64 match_bits, lnet_msg_t *msg,
154               unsigned int *mlength_out, unsigned int *offset_out,
155               lnet_libmd_t **md_out)
156 {
157         lnet_portal_t    *ptl = &the_lnet.ln_portals[index];
158         lnet_me_t        *me;
159         lnet_me_t        *tmp;
160         lnet_libmd_t     *md;
161         int               rc;
162
163         CDEBUG (D_NET, "Request from %s of length %d into portal %d "
164                 "MB="LPX64"\n", libcfs_id2str(src), rlength, index, match_bits);
165
166         if (index < 0 || index >= the_lnet.ln_nportals) {
167                 CERROR("Invalid portal %d not in [0-%d]\n",
168                        index, the_lnet.ln_nportals);
169                 return LNET_MATCHMD_DROP;
170         }
171
172         list_for_each_entry_safe (me, tmp, &ptl->ptl_ml, me_list) {
173                 md = me->me_md;
174
175                 /* ME attached but MD not attached yet */
176                 if (md == NULL)
177                         continue;
178
179                 LASSERT (me == md->md_me);
180
181                 rc = lnet_try_match_md(index, op_mask, src, rlength,
182                                        roffset, match_bits, md, msg,
183                                        mlength_out, offset_out);
184                 switch (rc) {
185                 default:
186                         LBUG();
187
188                 case LNET_MATCHMD_NONE:
189                         continue;
190
191                 case LNET_MATCHMD_OK:
192                         *md_out = md;
193                         return LNET_MATCHMD_OK;
194
195                 case LNET_MATCHMD_DROP:
196                         return LNET_MATCHMD_DROP;
197                 }
198                 /* not reached */
199         }
200
201         if (op_mask == LNET_MD_OP_GET ||
202             (ptl->ptl_options & LNET_PTL_LAZY) == 0)
203                 return LNET_MATCHMD_DROP;
204
205         return LNET_MATCHMD_NONE;
206 }
207
208 int
209 lnet_fail_nid (lnet_nid_t nid, unsigned int threshold)
210 {
211         lnet_test_peer_t   *tp;
212         struct list_head  *el;
213         struct list_head  *next;
214         struct list_head   cull;
215
216         LASSERT (the_lnet.ln_init);
217
218         if (threshold != 0) {
219                 /* Adding a new entry */
220                 LIBCFS_ALLOC(tp, sizeof(*tp));
221                 if (tp == NULL)
222                         return -ENOMEM;
223
224                 tp->tp_nid = nid;
225                 tp->tp_threshold = threshold;
226
227                 LNET_LOCK();
228                 list_add_tail (&tp->tp_list, &the_lnet.ln_test_peers);
229                 LNET_UNLOCK();
230                 return 0;
231         }
232
233         /* removing entries */
234         CFS_INIT_LIST_HEAD (&cull);
235
236         LNET_LOCK();
237
238         list_for_each_safe (el, next, &the_lnet.ln_test_peers) {
239                 tp = list_entry (el, lnet_test_peer_t, tp_list);
240
241                 if (tp->tp_threshold == 0 ||    /* needs culling anyway */
242                     nid == LNET_NID_ANY ||       /* removing all entries */
243                     tp->tp_nid == nid)          /* matched this one */
244                 {
245                         list_del (&tp->tp_list);
246                         list_add (&tp->tp_list, &cull);
247                 }
248         }
249
250         LNET_UNLOCK();
251
252         while (!list_empty (&cull)) {
253                 tp = list_entry (cull.next, lnet_test_peer_t, tp_list);
254
255                 list_del (&tp->tp_list);
256                 LIBCFS_FREE(tp, sizeof (*tp));
257         }
258         return 0;
259 }
260
261 static int
262 fail_peer (lnet_nid_t nid, int outgoing)
263 {
264         lnet_test_peer_t  *tp;
265         struct list_head *el;
266         struct list_head *next;
267         struct list_head  cull;
268         int               fail = 0;
269
270         CFS_INIT_LIST_HEAD (&cull);
271
272         LNET_LOCK();
273
274         list_for_each_safe (el, next, &the_lnet.ln_test_peers) {
275                 tp = list_entry (el, lnet_test_peer_t, tp_list);
276
277                 if (tp->tp_threshold == 0) {
278                         /* zombie entry */
279                         if (outgoing) {
280                                 /* only cull zombies on outgoing tests,
281                                  * since we may be at interrupt priority on
282                                  * incoming messages. */
283                                 list_del (&tp->tp_list);
284                                 list_add (&tp->tp_list, &cull);
285                         }
286                         continue;
287                 }
288
289                 if (tp->tp_nid == LNET_NID_ANY || /* fail every peer */
290                     nid == tp->tp_nid) {        /* fail this peer */
291                         fail = 1;
292
293                         if (tp->tp_threshold != LNET_MD_THRESH_INF) {
294                                 tp->tp_threshold--;
295                                 if (outgoing &&
296                                     tp->tp_threshold == 0) {
297                                         /* see above */
298                                         list_del (&tp->tp_list);
299                                         list_add (&tp->tp_list, &cull);
300                                 }
301                         }
302                         break;
303                 }
304         }
305
306         LNET_UNLOCK ();
307
308         while (!list_empty (&cull)) {
309                 tp = list_entry (cull.next, lnet_test_peer_t, tp_list);
310                 list_del (&tp->tp_list);
311
312                 LIBCFS_FREE(tp, sizeof (*tp));
313         }
314
315         return (fail);
316 }
317
318 unsigned int
319 lnet_iov_nob (unsigned int niov, struct iovec *iov)
320 {
321         unsigned int nob = 0;
322
323         while (niov-- > 0)
324                 nob += (iov++)->iov_len;
325
326         return (nob);
327 }
328
329 void
330 lnet_copy_iov2iov (unsigned int ndiov, struct iovec *diov, unsigned int doffset,
331                    unsigned int nsiov, struct iovec *siov, unsigned int soffset,
332                    unsigned int nob)
333 {
334         /* NB diov, siov are READ-ONLY */
335         unsigned int  this_nob;
336
337         if (nob == 0)
338                 return;
339
340         /* skip complete frags before 'doffset' */
341         LASSERT (ndiov > 0);
342         while (doffset >= diov->iov_len) {
343                 doffset -= diov->iov_len;
344                 diov++;
345                 ndiov--;
346                 LASSERT (ndiov > 0);
347         }
348
349         /* skip complete frags before 'soffset' */
350         LASSERT (nsiov > 0);
351         while (soffset >= siov->iov_len) {
352                 soffset -= siov->iov_len;
353                 siov++;
354                 nsiov--;
355                 LASSERT (nsiov > 0);
356         }
357
358         do {
359                 LASSERT (ndiov > 0);
360                 LASSERT (nsiov > 0);
361                 this_nob = MIN(diov->iov_len - doffset,
362                                siov->iov_len - soffset);
363                 this_nob = MIN(this_nob, nob);
364
365                 memcpy ((char *)diov->iov_base + doffset,
366                         (char *)siov->iov_base + soffset, this_nob);
367                 nob -= this_nob;
368
369                 if (diov->iov_len > doffset + this_nob) {
370                         doffset += this_nob;
371                 } else {
372                         diov++;
373                         ndiov--;
374                         doffset = 0;
375                 }
376
377                 if (siov->iov_len > soffset + this_nob) {
378                         soffset += this_nob;
379                 } else {
380                         siov++;
381                         nsiov--;
382                         soffset = 0;
383                 }
384         } while (nob > 0);
385 }
386
387 int
388 lnet_extract_iov (int dst_niov, struct iovec *dst,
389                   int src_niov, struct iovec *src,
390                   unsigned int offset, unsigned int len)
391 {
392         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
393          * for exactly 'len' bytes, and return the number of entries.
394          * NB not destructive to 'src' */
395         unsigned int    frag_len;
396         unsigned int    niov;
397
398         if (len == 0)                           /* no data => */
399                 return (0);                     /* no frags */
400
401         LASSERT (src_niov > 0);
402         while (offset >= src->iov_len) {      /* skip initial frags */
403                 offset -= src->iov_len;
404                 src_niov--;
405                 src++;
406                 LASSERT (src_niov > 0);
407         }
408
409         niov = 1;
410         for (;;) {
411                 LASSERT (src_niov > 0);
412                 LASSERT (niov <= dst_niov);
413
414                 frag_len = src->iov_len - offset;
415                 dst->iov_base = ((char *)src->iov_base) + offset;
416
417                 if (len <= frag_len) {
418                         dst->iov_len = len;
419                         return (niov);
420                 }
421
422                 dst->iov_len = frag_len;
423
424                 len -= frag_len;
425                 dst++;
426                 src++;
427                 niov++;
428                 src_niov--;
429                 offset = 0;
430         }
431 }
432
433 #ifndef __KERNEL__
434 unsigned int
435 lnet_kiov_nob (unsigned int niov, lnet_kiov_t *kiov)
436 {
437         LASSERT (0);
438         return (0);
439 }
440
441 void
442 lnet_copy_kiov2kiov (unsigned int ndkiov, lnet_kiov_t *dkiov, unsigned int doffset,
443                      unsigned int nskiov, lnet_kiov_t *skiov, unsigned int soffset,
444                      unsigned int nob)
445 {
446         LASSERT (0);
447 }
448
449 void
450 lnet_copy_kiov2iov (unsigned int niov, struct iovec *iov, unsigned int iovoffset,
451                     unsigned int nkiov, lnet_kiov_t *kiov, unsigned int kiovoffset,
452                     unsigned int nob)
453 {
454         LASSERT (0);
455 }
456
457 void
458 lnet_copy_iov2kiov (unsigned int nkiov, lnet_kiov_t *kiov, unsigned int kiovoffset,
459                     unsigned int niov, struct iovec *iov, unsigned int iovoffset,
460                     unsigned int nob)
461 {
462         LASSERT (0);
463 }
464
465 int
466 lnet_extract_kiov (int dst_niov, lnet_kiov_t *dst,
467                    int src_niov, lnet_kiov_t *src,
468                    unsigned int offset, unsigned int len)
469 {
470         LASSERT (0);
471 }
472
473 #else /* __KERNEL__ */
474
475 unsigned int
476 lnet_kiov_nob (unsigned int niov, lnet_kiov_t *kiov)
477 {
478         unsigned int  nob = 0;
479
480         while (niov-- > 0)
481                 nob += (kiov++)->kiov_len;
482
483         return (nob);
484 }
485
486 void
487 lnet_copy_kiov2kiov (unsigned int ndiov, lnet_kiov_t *diov, unsigned int doffset,
488                      unsigned int nsiov, lnet_kiov_t *siov, unsigned int soffset,
489                      unsigned int nob)
490 {
491         /* NB diov, siov are READ-ONLY */
492         unsigned int    this_nob;
493         char           *daddr = NULL;
494         char           *saddr = NULL;
495
496         if (nob == 0)
497                 return;
498
499         LASSERT (!in_interrupt ());
500
501         LASSERT (ndiov > 0);
502         while (doffset >= diov->kiov_len) {
503                 doffset -= diov->kiov_len;
504                 diov++;
505                 ndiov--;
506                 LASSERT (ndiov > 0);
507         }
508
509         LASSERT (nsiov > 0);
510         while (soffset >= siov->kiov_len) {
511                 soffset -= siov->kiov_len;
512                 siov++;
513                 nsiov--;
514                 LASSERT (nsiov > 0);
515         }
516
517         do {
518                 LASSERT (ndiov > 0);
519                 LASSERT (nsiov > 0);
520                 this_nob = MIN(diov->kiov_len - doffset,
521                                siov->kiov_len - soffset);
522                 this_nob = MIN(this_nob, nob);
523
524                 if (daddr == NULL)
525                         daddr = ((char *)cfs_kmap(diov->kiov_page)) + 
526                                 diov->kiov_offset + doffset;
527                 if (saddr == NULL)
528                         saddr = ((char *)cfs_kmap(siov->kiov_page)) + 
529                                 siov->kiov_offset + soffset;
530
531                 /* Vanishing risk of kmap deadlock when mapping 2 pages.
532                  * However in practice at least one of the kiovs will be mapped
533                  * kernel pages and the map/unmap will be NOOPs */
534
535                 memcpy (daddr, saddr, this_nob);
536                 nob -= this_nob;
537
538                 if (diov->kiov_len > doffset + this_nob) {
539                         daddr += this_nob;
540                         doffset += this_nob;
541                 } else {
542                         cfs_kunmap(diov->kiov_page);
543                         daddr = NULL;
544                         diov++;
545                         ndiov--;
546                         doffset = 0;
547                 }
548
549                 if (siov->kiov_len > soffset + this_nob) {
550                         saddr += this_nob;
551                         soffset += this_nob;
552                 } else {
553                         cfs_kunmap(siov->kiov_page);
554                         saddr = NULL;
555                         siov++;
556                         nsiov--;
557                         soffset = 0;
558                 }
559         } while (nob > 0);
560
561         if (daddr != NULL)
562                 cfs_kunmap(diov->kiov_page);
563         if (saddr != NULL)
564                 cfs_kunmap(siov->kiov_page);
565 }
566
567 void
568 lnet_copy_kiov2iov (unsigned int niov, struct iovec *iov, unsigned int iovoffset,
569                     unsigned int nkiov, lnet_kiov_t *kiov, unsigned int kiovoffset,
570                     unsigned int nob)
571 {
572         /* NB iov, kiov are READ-ONLY */
573         unsigned int    this_nob;
574         char           *addr = NULL;
575
576         if (nob == 0)
577                 return;
578
579         LASSERT (!in_interrupt ());
580
581         LASSERT (niov > 0);
582         while (iovoffset >= iov->iov_len) {
583                 iovoffset -= iov->iov_len;
584                 iov++;
585                 niov--;
586                 LASSERT (niov > 0);
587         }
588
589         LASSERT (nkiov > 0);
590         while (kiovoffset >= kiov->kiov_len) {
591                 kiovoffset -= kiov->kiov_len;
592                 kiov++;
593                 nkiov--;
594                 LASSERT (nkiov > 0);
595         }
596
597         do {
598                 LASSERT (niov > 0);
599                 LASSERT (nkiov > 0);
600                 this_nob = MIN(iov->iov_len - iovoffset,
601                                kiov->kiov_len - kiovoffset);
602                 this_nob = MIN(this_nob, nob);
603
604                 if (addr == NULL)
605                         addr = ((char *)cfs_kmap(kiov->kiov_page)) + 
606                                 kiov->kiov_offset + kiovoffset;
607
608                 memcpy ((char *)iov->iov_base + iovoffset, addr, this_nob);
609                 nob -= this_nob;
610
611                 if (iov->iov_len > iovoffset + this_nob) {
612                         iovoffset += this_nob;
613                 } else {
614                         iov++;
615                         niov--;
616                         iovoffset = 0;
617                 }
618
619                 if (kiov->kiov_len > kiovoffset + this_nob) {
620                         addr += this_nob;
621                         kiovoffset += this_nob;
622                 } else {
623                         cfs_kunmap(kiov->kiov_page);
624                         addr = NULL;
625                         kiov++;
626                         nkiov--;
627                         kiovoffset = 0;
628                 }
629
630         } while (nob > 0);
631
632         if (addr != NULL)
633                 cfs_kunmap(kiov->kiov_page);
634 }
635
636 void
637 lnet_copy_iov2kiov (unsigned int nkiov, lnet_kiov_t *kiov, unsigned int kiovoffset,
638                     unsigned int niov, struct iovec *iov, unsigned int iovoffset,
639                     unsigned int nob)
640 {
641         /* NB kiov, iov are READ-ONLY */
642         unsigned int    this_nob;
643         char           *addr = NULL;
644
645         if (nob == 0)
646                 return;
647
648         LASSERT (!in_interrupt ());
649
650         LASSERT (nkiov > 0);
651         while (kiovoffset >= kiov->kiov_len) {
652                 kiovoffset -= kiov->kiov_len;
653                 kiov++;
654                 nkiov--;
655                 LASSERT (nkiov > 0);
656         }
657
658         LASSERT (niov > 0);
659         while (iovoffset >= iov->iov_len) {
660                 iovoffset -= iov->iov_len;
661                 iov++;
662                 niov--;
663                 LASSERT (niov > 0);
664         }
665
666         do {
667                 LASSERT (nkiov > 0);
668                 LASSERT (niov > 0);
669                 this_nob = MIN(kiov->kiov_len - kiovoffset,
670                                iov->iov_len - iovoffset);
671                 this_nob = MIN(this_nob, nob);
672
673                 if (addr == NULL)
674                         addr = ((char *)cfs_kmap(kiov->kiov_page)) + 
675                                 kiov->kiov_offset + kiovoffset;
676
677                 memcpy (addr, (char *)iov->iov_base + iovoffset, this_nob);
678                 nob -= this_nob;
679
680                 if (kiov->kiov_len > kiovoffset + this_nob) {
681                         addr += this_nob;
682                         kiovoffset += this_nob;
683                 } else {
684                         cfs_kunmap(kiov->kiov_page);
685                         addr = NULL;
686                         kiov++;
687                         nkiov--;
688                         kiovoffset = 0;
689                 }
690
691                 if (iov->iov_len > iovoffset + this_nob) {
692                         iovoffset += this_nob;
693                 } else {
694                         iov++;
695                         niov--;
696                         iovoffset = 0;
697                 }
698         } while (nob > 0);
699
700         if (addr != NULL)
701                 cfs_kunmap(kiov->kiov_page);
702 }
703
704 int
705 lnet_extract_kiov (int dst_niov, lnet_kiov_t *dst,
706                    int src_niov, lnet_kiov_t *src,
707                    unsigned int offset, unsigned int len)
708 {
709         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
710          * for exactly 'len' bytes, and return the number of entries.
711          * NB not destructive to 'src' */
712         unsigned int    frag_len;
713         unsigned int    niov;
714
715         if (len == 0)                           /* no data => */
716                 return (0);                     /* no frags */
717
718         LASSERT (src_niov > 0);
719         while (offset >= src->kiov_len) {      /* skip initial frags */
720                 offset -= src->kiov_len;
721                 src_niov--;
722                 src++;
723                 LASSERT (src_niov > 0);
724         }
725
726         niov = 1;
727         for (;;) {
728                 LASSERT (src_niov > 0);
729                 LASSERT (niov <= dst_niov);
730
731                 frag_len = src->kiov_len - offset;
732                 dst->kiov_page = src->kiov_page;
733                 dst->kiov_offset = src->kiov_offset + offset;
734
735                 if (len <= frag_len) {
736                         dst->kiov_len = len;
737                         LASSERT (dst->kiov_offset + dst->kiov_len <= CFS_PAGE_SIZE);
738                         return (niov);
739                 }
740
741                 dst->kiov_len = frag_len;
742                 LASSERT (dst->kiov_offset + dst->kiov_len <= CFS_PAGE_SIZE);
743
744                 len -= frag_len;
745                 dst++;
746                 src++;
747                 niov++;
748                 src_niov--;
749                 offset = 0;
750         }
751 }
752 #endif
753
754 void
755 lnet_ni_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
756              unsigned int offset, unsigned int mlen, unsigned int rlen)
757 {
758         unsigned int  niov = 0;
759         struct iovec *iov = NULL;
760         lnet_kiov_t  *kiov = NULL;
761         int           rc;
762
763         LASSERT (!in_interrupt ());
764         LASSERT (mlen == 0 || msg != NULL);
765
766         if (msg != NULL) {
767                 LASSERT(msg->msg_receiving);
768                 LASSERT(!msg->msg_sending);
769                 LASSERT(rlen == msg->msg_len);
770                 LASSERT(mlen <= msg->msg_len);
771
772                 msg->msg_wanted = mlen;
773                 msg->msg_offset = offset;
774                 msg->msg_receiving = 0;
775
776                 if (mlen != 0) {
777                         niov = msg->msg_niov;
778                         iov  = msg->msg_iov;
779                         kiov = msg->msg_kiov;
780
781                         LASSERT (niov > 0);
782                         LASSERT ((iov == NULL) != (kiov == NULL));
783                 }
784         }
785
786         rc = (ni->ni_lnd->lnd_recv)(ni, private, msg, delayed,
787                                     niov, iov, kiov, offset, mlen, rlen);
788         if (rc < 0)
789                 lnet_finalize(ni, msg, rc);
790 }
791
792 int
793 lnet_compare_routers(lnet_peer_t *p1, lnet_peer_t *p2)
794 {
795         if (p1->lp_txqnob < p2->lp_txqnob)
796                 return 1;
797
798         if (p1->lp_txqnob > p2->lp_txqnob)
799                 return -1;
800
801         if (p1->lp_txcredits > p2->lp_txcredits)
802                 return 1;
803
804         if (p1->lp_txcredits < p2->lp_txcredits)
805                 return -1;
806
807         return 0;
808 }
809
810
811 void
812 lnet_setpayloadbuffer(lnet_msg_t *msg)
813 {
814         lnet_libmd_t *md = msg->msg_md;
815
816         LASSERT (msg->msg_len > 0);
817         LASSERT (!msg->msg_routing);
818         LASSERT (md != NULL);
819         LASSERT (msg->msg_niov == 0);
820         LASSERT (msg->msg_iov == NULL);
821         LASSERT (msg->msg_kiov == NULL);
822
823         msg->msg_niov = md->md_niov;
824         if ((md->md_options & LNET_MD_KIOV) != 0)
825                 msg->msg_kiov = md->md_iov.kiov;
826         else
827                 msg->msg_iov = md->md_iov.iov;
828 }
829
830 void
831 lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
832                unsigned int offset, unsigned int len) 
833 {
834         msg->msg_type = type;
835         msg->msg_target = target;
836         msg->msg_len = len;
837         msg->msg_offset = offset;
838
839         if (len != 0)
840                 lnet_setpayloadbuffer(msg);
841
842         memset (&msg->msg_hdr, 0, sizeof (msg->msg_hdr));
843         msg->msg_hdr.type           = cpu_to_le32(type);
844         msg->msg_hdr.dest_nid       = cpu_to_le64(target.nid);
845         msg->msg_hdr.dest_pid       = cpu_to_le32(target.pid);
846         /* src_nid will be set later */
847         msg->msg_hdr.src_pid        = cpu_to_le32(the_lnet.ln_pid);
848         msg->msg_hdr.payload_length = cpu_to_le32(len);
849 }
850
851 void
852 lnet_ni_send(lnet_ni_t *ni, lnet_msg_t *msg)
853 {
854         void   *priv = msg->msg_private;
855         int     rc;
856
857         LASSERT (!in_interrupt ());
858         LASSERT (LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND ||
859                  (msg->msg_txcredit && msg->msg_peertxcredit));
860
861         rc = (ni->ni_lnd->lnd_send)(ni, priv, msg);
862         if (rc < 0)
863                 lnet_finalize(ni, msg, rc);
864 }
865
866 int
867 lnet_eager_recv_locked(lnet_msg_t *msg)
868 {
869         lnet_peer_t *peer;
870         lnet_ni_t   *ni;
871         int          rc = 0;
872
873         LASSERT (!msg->msg_delayed);
874         msg->msg_delayed = 1;
875
876         LASSERT (msg->msg_receiving);
877         LASSERT (!msg->msg_sending);
878
879         peer = msg->msg_rxpeer;
880         ni   = peer->lp_ni;
881
882         if (ni->ni_lnd->lnd_eager_recv != NULL) {
883                 LNET_UNLOCK();
884
885                 rc = (ni->ni_lnd->lnd_eager_recv)(ni, msg->msg_private, msg,
886                                                   &msg->msg_private);
887                 if (rc != 0) {
888                         CERROR("recv from %s / send to %s aborted: "
889                                "eager_recv failed %d\n",
890                                libcfs_nid2str(peer->lp_nid),
891                                libcfs_id2str(msg->msg_target), rc);
892                         LASSERT (rc < 0); /* required by my callers */
893                 }
894
895                 LNET_LOCK();
896         }
897
898         return rc;
899 }
900
901 int
902 lnet_post_send_locked (lnet_msg_t *msg, int do_send)
903 {
904         /* lnet_send is going to LNET_UNLOCK immediately after this, so it sets
905          * do_send FALSE and I don't do the unlock/send/lock bit.  I return
906          * EAGAIN if msg blocked and 0 if sent or OK to send */
907         lnet_peer_t *lp = msg->msg_txpeer;
908         lnet_ni_t   *ni = lp->lp_ni;
909
910         /* non-lnet_send() callers have checked before */
911         LASSERT (!do_send || msg->msg_delayed);
912         LASSERT (!msg->msg_receiving);
913
914         if (!msg->msg_peertxcredit) {
915                 LASSERT ((lp->lp_txcredits < 0) == !list_empty(&lp->lp_txq));
916
917                 msg->msg_peertxcredit = 1;
918                 lp->lp_txqnob += msg->msg_len + sizeof(lnet_hdr_t);
919                 lp->lp_txcredits--;
920
921                 if (lp->lp_txcredits < lp->lp_mintxcredits)
922                         lp->lp_mintxcredits = lp->lp_txcredits;
923
924                 if (lp->lp_txcredits < 0) {
925                         msg->msg_delayed = 1;
926                         list_add_tail (&msg->msg_list, &lp->lp_txq);
927                         return EAGAIN;
928                 }
929         }
930
931         if (!msg->msg_txcredit) {
932                 LASSERT ((ni->ni_txcredits < 0) == !list_empty(&ni->ni_txq));
933
934                 msg->msg_txcredit = 1;
935                 ni->ni_txcredits--;
936
937                 if (ni->ni_txcredits < ni->ni_mintxcredits)
938                         ni->ni_mintxcredits = ni->ni_txcredits;
939
940                 if (ni->ni_txcredits < 0) {
941                         msg->msg_delayed = 1;
942                         list_add_tail (&msg->msg_list, &ni->ni_txq);
943                         return EAGAIN;
944                 }
945         }
946
947         if (do_send) {
948                 LNET_UNLOCK();
949                 lnet_ni_send(ni, msg);
950                 LNET_LOCK();
951         }
952         return 0;
953 }
954
955 #ifdef __KERNEL__
956 static void
957 lnet_commit_routedmsg (lnet_msg_t *msg)
958 {
959         /* ALWAYS called holding the LNET_LOCK */
960         LASSERT (msg->msg_routing);
961
962         the_lnet.ln_counters.msgs_alloc++;
963         if (the_lnet.ln_counters.msgs_alloc >
964             the_lnet.ln_counters.msgs_max)
965                 the_lnet.ln_counters.msgs_max =
966                         the_lnet.ln_counters.msgs_alloc;
967
968         the_lnet.ln_counters.route_count++;
969         the_lnet.ln_counters.route_length += msg->msg_len;
970
971         LASSERT (!msg->msg_onactivelist);
972         msg->msg_onactivelist = 1;
973         list_add (&msg->msg_activelist, &the_lnet.ln_active_msgs);
974 }
975
976 lnet_rtrbufpool_t *
977 lnet_msg2bufpool(lnet_msg_t *msg)
978 {
979         lnet_rtrbufpool_t *rbp = &the_lnet.ln_rtrpools[0];
980
981         LASSERT (msg->msg_len <= LNET_MTU);
982         while (msg->msg_len > rbp->rbp_npages * CFS_PAGE_SIZE) {
983                 rbp++;
984                 LASSERT (rbp < &the_lnet.ln_rtrpools[LNET_NRBPOOLS]);
985         }
986
987         return rbp;
988 }
989
990 int
991 lnet_post_routed_recv_locked (lnet_msg_t *msg, int do_recv)
992 {
993         /* lnet_parse is going to LNET_UNLOCK immediately after this, so it
994          * sets do_recv FALSE and I don't do the unlock/send/lock bit.  I
995          * return EAGAIN if msg blocked and 0 if sent or OK to send */
996         lnet_peer_t         *lp = msg->msg_rxpeer;
997         lnet_rtrbufpool_t   *rbp;
998         lnet_rtrbuf_t       *rb;
999
1000         LASSERT (msg->msg_iov == NULL);
1001         LASSERT (msg->msg_kiov == NULL);
1002         LASSERT (msg->msg_niov == 0);
1003         LASSERT (msg->msg_routing);
1004         LASSERT (msg->msg_receiving);
1005         LASSERT (!msg->msg_sending);
1006
1007         /* non-lnet_parse callers only send delayed messages */
1008         LASSERT (!do_recv || msg->msg_delayed);
1009
1010         if (!msg->msg_peerrtrcredit) {
1011                 LASSERT ((lp->lp_rtrcredits < 0) == !list_empty(&lp->lp_rtrq));
1012
1013                 msg->msg_peerrtrcredit = 1;
1014                 lp->lp_rtrcredits--;
1015                 if (lp->lp_rtrcredits < lp->lp_minrtrcredits)
1016                         lp->lp_minrtrcredits = lp->lp_rtrcredits;
1017
1018                 if (lp->lp_rtrcredits < 0) {
1019                         /* must have checked eager_recv before here */
1020                         LASSERT (msg->msg_delayed);
1021                         list_add_tail(&msg->msg_list, &lp->lp_rtrq);
1022                         return EAGAIN;
1023                 }
1024         }
1025
1026         rbp = lnet_msg2bufpool(msg);
1027
1028         if (!msg->msg_rtrcredit) {
1029                 LASSERT ((rbp->rbp_credits < 0) == !list_empty(&rbp->rbp_msgs));
1030
1031                 msg->msg_rtrcredit = 1;
1032                 rbp->rbp_credits--;
1033                 if (rbp->rbp_credits < rbp->rbp_mincredits)
1034                         rbp->rbp_mincredits = rbp->rbp_credits;
1035
1036                 if (rbp->rbp_credits < 0) {
1037                         /* must have checked eager_recv before here */
1038                         LASSERT (msg->msg_delayed);
1039                         list_add_tail(&msg->msg_list, &rbp->rbp_msgs);
1040                         return EAGAIN;
1041                 }
1042         }
1043
1044         LASSERT (!list_empty(&rbp->rbp_bufs));
1045         rb = list_entry(rbp->rbp_bufs.next, lnet_rtrbuf_t, rb_list);
1046         list_del(&rb->rb_list);
1047
1048         msg->msg_niov = rbp->rbp_npages;
1049         msg->msg_kiov = &rb->rb_kiov[0];
1050
1051         if (do_recv) {
1052                 LNET_UNLOCK();
1053                 lnet_ni_recv(lp->lp_ni, msg->msg_private, msg, 1,
1054                              0, msg->msg_len, msg->msg_len);
1055                 LNET_LOCK();
1056         }
1057         return 0;
1058 }
1059 #endif
1060
1061 void
1062 lnet_return_credits_locked (lnet_msg_t *msg)
1063 {
1064         lnet_peer_t       *txpeer = msg->msg_txpeer;
1065         lnet_peer_t       *rxpeer = msg->msg_rxpeer;
1066         lnet_msg_t        *msg2;
1067         lnet_ni_t         *ni;
1068
1069         if (msg->msg_txcredit) {
1070                 /* give back NI txcredits */
1071                 msg->msg_txcredit = 0;
1072                 ni = txpeer->lp_ni;
1073
1074                 LASSERT((ni->ni_txcredits < 0) == !list_empty(&ni->ni_txq));
1075
1076                 ni->ni_txcredits++;
1077                 if (ni->ni_txcredits <= 0) {
1078                         msg2 = list_entry(ni->ni_txq.next, lnet_msg_t, msg_list);
1079                         list_del(&msg2->msg_list);
1080
1081                         LASSERT(msg2->msg_txpeer->lp_ni == ni);
1082                         LASSERT(msg2->msg_delayed);
1083
1084                         (void) lnet_post_send_locked(msg2, 1);
1085                 }
1086         }
1087
1088         if (msg->msg_peertxcredit) {
1089                 /* give back peer txcredits */
1090                 msg->msg_peertxcredit = 0;
1091
1092                 LASSERT((txpeer->lp_txcredits < 0) == !list_empty(&txpeer->lp_txq));
1093
1094                 txpeer->lp_txqnob -= msg->msg_len + sizeof(lnet_hdr_t);
1095                 LASSERT (txpeer->lp_txqnob >= 0);
1096
1097                 txpeer->lp_txcredits++;
1098                 if (txpeer->lp_txcredits <= 0) {
1099                         msg2 = list_entry(txpeer->lp_txq.next,
1100                                           lnet_msg_t, msg_list);
1101                         list_del(&msg2->msg_list);
1102
1103                         LASSERT (msg2->msg_txpeer == txpeer);
1104                         LASSERT (msg2->msg_delayed);
1105
1106                         (void) lnet_post_send_locked(msg2, 1);
1107                 }
1108         }
1109
1110         if (txpeer != NULL) {
1111                 msg->msg_txpeer = NULL;
1112                 lnet_peer_decref_locked(txpeer);
1113         }
1114
1115 #ifdef __KERNEL__
1116         if (msg->msg_rtrcredit) {
1117                 /* give back global router credits */
1118                 lnet_rtrbuf_t     *rb;
1119                 lnet_rtrbufpool_t *rbp;
1120
1121                 /* NB If a msg ever blocks for a buffer in rbp_msgs, it stays
1122                  * there until it gets one allocated, or aborts the wait
1123                  * itself */
1124                 LASSERT (msg->msg_kiov != NULL);
1125
1126                 rb = list_entry(msg->msg_kiov, lnet_rtrbuf_t, rb_kiov[0]);
1127                 rbp = rb->rb_pool;
1128                 LASSERT (rbp == lnet_msg2bufpool(msg));
1129
1130                 msg->msg_kiov = NULL;
1131                 msg->msg_rtrcredit = 0;
1132
1133                 LASSERT((rbp->rbp_credits < 0) == !list_empty(&rbp->rbp_msgs));
1134                 LASSERT((rbp->rbp_credits > 0) == !list_empty(&rbp->rbp_bufs));
1135
1136                 list_add(&rb->rb_list, &rbp->rbp_bufs);
1137                 rbp->rbp_credits++;
1138                 if (rbp->rbp_credits <= 0) {
1139                         msg2 = list_entry(rbp->rbp_msgs.next,
1140                                           lnet_msg_t, msg_list);
1141                         list_del(&msg2->msg_list);
1142
1143                         (void) lnet_post_routed_recv_locked(msg2, 1);
1144                 }
1145         }
1146
1147         if (msg->msg_peerrtrcredit) {
1148                 /* give back peer router credits */
1149                 msg->msg_peerrtrcredit = 0;
1150
1151                 LASSERT((rxpeer->lp_rtrcredits < 0) == !list_empty(&rxpeer->lp_rtrq));
1152
1153                 rxpeer->lp_rtrcredits++;
1154                 if (rxpeer->lp_rtrcredits <= 0) {
1155                         msg2 = list_entry(rxpeer->lp_rtrq.next,
1156                                           lnet_msg_t, msg_list);
1157                         list_del(&msg2->msg_list);
1158
1159                         (void) lnet_post_routed_recv_locked(msg2, 1);
1160                 }
1161         }
1162 #else
1163         LASSERT (!msg->msg_rtrcredit);
1164         LASSERT (!msg->msg_peerrtrcredit);
1165 #endif
1166         if (rxpeer != NULL) {
1167                 msg->msg_rxpeer = NULL;
1168                 lnet_peer_decref_locked(rxpeer);
1169         }
1170 }
1171
1172 int
1173 lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
1174 {
1175         lnet_nid_t        dst_nid = msg->msg_target.nid;
1176         lnet_ni_t        *src_ni;
1177         lnet_ni_t        *local_ni;
1178         lnet_remotenet_t *rnet;
1179         lnet_route_t     *route;
1180         lnet_route_t     *best_route;
1181         struct list_head *tmp;
1182         lnet_peer_t      *lp;
1183         lnet_peer_t      *lp2;
1184         int               rc;
1185
1186         LASSERT (msg->msg_txpeer == NULL);
1187         LASSERT (!msg->msg_sending);
1188         LASSERT (!msg->msg_target_is_router);
1189         LASSERT (!msg->msg_receiving);
1190
1191         msg->msg_sending = 1;
1192
1193         /* NB! ni != NULL == interface pre-determined (ACK/REPLY) */
1194
1195         LNET_LOCK();
1196
1197         if (the_lnet.ln_shutdown) {
1198                 LNET_UNLOCK();
1199                 return -ESHUTDOWN;
1200         }
1201
1202         if (src_nid == LNET_NID_ANY) {
1203                 src_ni = NULL;
1204         } else {
1205                 src_ni = lnet_nid2ni_locked(src_nid);
1206                 if (src_ni == NULL) {
1207                         LNET_UNLOCK();
1208                         CERROR("Can't send to %s: src %s is not a local nid\n",
1209                                libcfs_nid2str(dst_nid), libcfs_nid2str(src_nid));
1210                         return -EINVAL;
1211                 }
1212                 LASSERT (!msg->msg_routing);
1213         }
1214
1215         /* Is this for someone on a local network? */
1216         local_ni = lnet_net2ni_locked(LNET_NIDNET(dst_nid));
1217
1218         if (local_ni != NULL) {
1219                 if (src_ni == NULL) {
1220                         src_ni = local_ni;
1221                         src_nid = src_ni->ni_nid;
1222                 } else if (src_ni == local_ni) {
1223                         lnet_ni_decref_locked(local_ni);
1224                 } else {
1225                         lnet_ni_decref_locked(local_ni);
1226                         lnet_ni_decref_locked(src_ni);
1227                         LNET_UNLOCK();
1228                         CERROR("no route to %s via from %s\n",
1229                                libcfs_nid2str(dst_nid), libcfs_nid2str(src_nid));
1230                         return -EINVAL;
1231                 }
1232
1233                 LASSERT (src_nid != LNET_NID_ANY);
1234
1235                 if (!msg->msg_routing)
1236                         msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
1237
1238                 if (src_ni == the_lnet.ln_loni) {
1239                         /* No send credit hassles with LOLND */
1240                         LNET_UNLOCK();
1241                         lnet_ni_send(src_ni, msg);
1242                         lnet_ni_decref(src_ni);
1243                         return 0;
1244                 }
1245
1246                 rc = lnet_nid2peer_locked(&lp, dst_nid);
1247                 lnet_ni_decref_locked(src_ni);  /* lp has ref on src_ni; lose mine */
1248                 if (rc != 0) {
1249                         LNET_UNLOCK();
1250                         CERROR("Error %d finding peer %s\n", rc,
1251                                libcfs_nid2str(dst_nid));
1252                         /* ENOMEM or shutting down */
1253                         return rc;
1254                 }
1255                 LASSERT (lp->lp_ni == src_ni);
1256         } else {
1257                 /* sending to a remote network */
1258                 rnet = lnet_find_net_locked(LNET_NIDNET(dst_nid));
1259                 if (rnet == NULL) {
1260                         if (src_ni != NULL)
1261                                 lnet_ni_decref_locked(src_ni);
1262                         LNET_UNLOCK();
1263                         CERROR("No route to %s\n", libcfs_id2str(msg->msg_target));
1264                         return -EHOSTUNREACH;
1265                 }
1266
1267                 /* Find the best gateway I can use */
1268                 lp = NULL;
1269                 best_route = NULL;
1270                 list_for_each(tmp, &rnet->lrn_routes) {
1271                         route = list_entry(tmp, lnet_route_t, lr_list);
1272                         lp2 = route->lr_gateway;
1273
1274                         if (lp2->lp_alive &&
1275                             (src_ni == NULL || lp2->lp_ni == src_ni) &&
1276                             (lp == NULL || lnet_compare_routers(lp2, lp) > 0)) {
1277                                 best_route = route;
1278                                 lp = lp2;
1279                         }
1280                 }
1281
1282                 if (lp == NULL) {
1283                         if (src_ni != NULL)
1284                                 lnet_ni_decref_locked(src_ni);
1285                         LNET_UNLOCK();
1286                         CERROR("No route to %s (all routers down)\n",
1287                                libcfs_id2str(msg->msg_target));
1288                         return -EHOSTUNREACH;
1289                 }
1290
1291                 /* Place selected route at the end of the route list to ensure
1292                  * fairness; everything else being equal... */
1293                 list_del(&best_route->lr_list);
1294                 list_add_tail(&best_route->lr_list, &rnet->lrn_routes);
1295
1296                 if (src_ni == NULL) {
1297                         src_ni = lp->lp_ni;
1298                         src_nid = src_ni->ni_nid;
1299                 } else {
1300                         LASSERT (src_ni == lp->lp_ni);
1301                         lnet_ni_decref_locked(src_ni);
1302                 }
1303
1304                 lnet_peer_addref_locked(lp);
1305
1306                 LASSERT (src_nid != LNET_NID_ANY);
1307
1308                 if (!msg->msg_routing) {
1309                         /* I'm the source and now I know which NI to send on */
1310                         msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
1311                 }
1312
1313                 msg->msg_target_is_router = 1;
1314                 msg->msg_target.nid = lp->lp_nid;
1315                 msg->msg_target.pid = LUSTRE_SRV_LNET_PID;
1316         }
1317
1318         /* 'lp' is our best choice of peer */
1319
1320         LASSERT (!msg->msg_peertxcredit);
1321         LASSERT (!msg->msg_txcredit);
1322         LASSERT (msg->msg_txpeer == NULL);
1323
1324         msg->msg_txpeer = lp;                   /* msg takes my ref on lp */
1325
1326         rc = lnet_post_send_locked(msg, 0);
1327         LNET_UNLOCK();
1328
1329         if (rc == 0)
1330                 lnet_ni_send(src_ni, msg);
1331
1332         return 0;
1333 }
1334
1335 static void
1336 lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg)
1337 {
1338         /* ALWAYS called holding the LNET_LOCK */
1339         /* Here, we commit the MD to a network OP by marking it busy and
1340          * decrementing its threshold.  Come what may, the network "owns"
1341          * the MD until a call to lnet_finalize() signals completion. */
1342         LASSERT (!msg->msg_routing);
1343
1344         msg->msg_md = md;
1345
1346         md->md_refcount++;
1347         if (md->md_threshold != LNET_MD_THRESH_INF) {
1348                 LASSERT (md->md_threshold > 0);
1349                 md->md_threshold--;
1350         }
1351
1352         the_lnet.ln_counters.msgs_alloc++;
1353         if (the_lnet.ln_counters.msgs_alloc > 
1354             the_lnet.ln_counters.msgs_max)
1355                 the_lnet.ln_counters.msgs_max = 
1356                         the_lnet.ln_counters.msgs_alloc;
1357
1358         LASSERT (!msg->msg_onactivelist);
1359         msg->msg_onactivelist = 1;
1360         list_add (&msg->msg_activelist, &the_lnet.ln_active_msgs);
1361 }
1362
1363 static void
1364 lnet_drop_message (lnet_ni_t *ni, void *private, unsigned int nob)
1365 {
1366         LNET_LOCK();
1367         the_lnet.ln_counters.drop_count++;
1368         the_lnet.ln_counters.drop_length += nob;
1369         LNET_UNLOCK();
1370
1371         lnet_ni_recv(ni, private, NULL, 0, 0, 0, nob);
1372 }
1373
1374 static void
1375 lnet_drop_delayed_put(lnet_msg_t *msg, char *reason)
1376 {
1377         LASSERT (msg->msg_md == NULL);
1378         LASSERT (msg->msg_delayed);
1379         LASSERT (msg->msg_rxpeer != NULL);
1380         LASSERT (msg->msg_hdr.type == LNET_MSG_PUT);
1381
1382         CWARN("Dropping delayed PUT from %s portal %d match "LPU64
1383               " offset %d length %d: %s\n", 
1384               libcfs_id2str((lnet_process_id_t){
1385                       .nid = msg->msg_hdr.src_nid,
1386                       .pid = msg->msg_hdr.src_pid}),
1387               msg->msg_hdr.msg.put.ptl_index,
1388               msg->msg_hdr.msg.put.match_bits,
1389               msg->msg_hdr.msg.put.offset,
1390               msg->msg_hdr.payload_length,
1391               reason);
1392
1393         /* NB I can't drop msg's ref on msg_rxpeer until after I've
1394          * called lnet_drop_message(), so I just hang onto msg as well
1395          * until that's done */
1396
1397         lnet_drop_message(msg->msg_rxpeer->lp_ni,
1398                           msg->msg_private, msg->msg_len);
1399
1400         LNET_LOCK();
1401
1402         lnet_peer_decref_locked(msg->msg_rxpeer);
1403         msg->msg_rxpeer = NULL;
1404
1405         lnet_msg_free(msg);
1406
1407         LNET_UNLOCK();
1408 }
1409
1410 int
1411 LNetSetLazyPortal(int portal)
1412 {
1413         lnet_portal_t *ptl = &the_lnet.ln_portals[portal];
1414
1415         if (portal < 0 || portal >= the_lnet.ln_nportals)
1416                 return -EINVAL;
1417
1418         CDEBUG(D_NET, "Setting portal %d lazy\n", portal);
1419
1420         LNET_LOCK();
1421
1422         ptl->ptl_options |= LNET_PTL_LAZY;
1423
1424         LNET_UNLOCK();
1425
1426         return 0;
1427 }
1428
1429 int
1430 LNetClearLazyPortal(int portal)
1431 {
1432         struct list_head  zombies;
1433         lnet_portal_t    *ptl = &the_lnet.ln_portals[portal];
1434         lnet_msg_t       *msg;
1435
1436         if (portal < 0 || portal >= the_lnet.ln_nportals)
1437                 return -EINVAL;
1438
1439         LNET_LOCK();
1440
1441         if ((ptl->ptl_options & LNET_PTL_LAZY) == 0) {
1442                 LNET_UNLOCK();
1443                 return 0;
1444         }
1445
1446         if (the_lnet.ln_shutdown)
1447                 CWARN ("Active lazy portal %d on exit\n", portal);
1448         else
1449                 CDEBUG (D_NET, "clearing portal %d lazy\n", portal);
1450
1451         /* grab all the blocked messages atomically */
1452         list_add(&zombies, &ptl->ptl_msgq);
1453         list_del_init(&ptl->ptl_msgq);
1454
1455         ptl->ptl_msgq_version++;
1456         ptl->ptl_options &= ~LNET_PTL_LAZY;
1457
1458         LNET_UNLOCK();
1459
1460         while (!list_empty(&zombies)) {
1461                 msg = list_entry(zombies.next, lnet_msg_t, msg_list);
1462                 list_del(&msg->msg_list);
1463
1464                 lnet_drop_delayed_put(msg, "Clearing lazy portal attr");
1465         }
1466
1467         return 0;
1468 }
1469
1470 static void
1471 lnet_recv_put(lnet_libmd_t *md, lnet_msg_t *msg, int delayed,
1472               unsigned int offset, unsigned int mlength)
1473 {
1474         lnet_hdr_t       *hdr = &msg->msg_hdr;
1475
1476         LNET_LOCK();
1477
1478         the_lnet.ln_counters.recv_count++;
1479         the_lnet.ln_counters.recv_length += mlength;
1480
1481         LNET_UNLOCK();
1482
1483         if (mlength != 0)
1484                 lnet_setpayloadbuffer(msg);
1485
1486         msg->msg_ev.type       = LNET_EVENT_PUT;
1487         msg->msg_ev.target.pid = hdr->dest_pid;
1488         msg->msg_ev.target.nid = hdr->dest_nid;
1489         msg->msg_ev.hdr_data   = hdr->msg.put.hdr_data;
1490
1491         /* Must I ACK?  If so I'll grab the ack_wmd out of the header and put
1492          * it back into the ACK during lnet_finalize() */
1493         msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
1494                         (md->md_options & LNET_MD_ACK_DISABLE) == 0);
1495
1496         lnet_ni_recv(msg->msg_rxpeer->lp_ni,
1497                      msg->msg_private,
1498                      msg, delayed, offset, mlength,
1499                      hdr->payload_length);
1500 }
1501
1502 /* called with LNET_LOCK held */
1503 void
1504 lnet_match_blocked_msg(lnet_libmd_t *md)
1505 {
1506         CFS_LIST_HEAD    (drops);
1507         CFS_LIST_HEAD    (matches);
1508         struct list_head *tmp;
1509         struct list_head *entry;
1510         lnet_msg_t       *msg;
1511         lnet_me_t        *me  = md->md_me;
1512         lnet_portal_t    *ptl = &the_lnet.ln_portals[me->me_portal];
1513
1514         LASSERT (me->me_portal < the_lnet.ln_nportals);
1515
1516         if ((ptl->ptl_options & LNET_PTL_LAZY) == 0) {
1517                 LASSERT (list_empty(&ptl->ptl_msgq));
1518                 return;
1519         }
1520
1521         LASSERT (md->md_refcount == 0); /* a brand new MD */
1522
1523         list_for_each_safe (entry, tmp, &ptl->ptl_msgq) {
1524                 int               rc;
1525                 int               index;
1526                 unsigned int      mlength;
1527                 unsigned int      offset;
1528                 lnet_hdr_t       *hdr;
1529                 lnet_process_id_t src;
1530
1531                 msg = list_entry(entry, lnet_msg_t, msg_list);
1532
1533                 LASSERT (msg->msg_delayed);
1534
1535                 hdr   = &msg->msg_hdr;
1536                 index = hdr->msg.put.ptl_index;
1537
1538                 src.nid = hdr->src_nid;
1539                 src.pid = hdr->src_pid;
1540
1541                 rc = lnet_try_match_md(index, LNET_MD_OP_PUT, src,
1542                                        hdr->payload_length,
1543                                        hdr->msg.put.offset,
1544                                        hdr->msg.put.match_bits,
1545                                        md, msg, &mlength, &offset);
1546
1547                 if (rc == LNET_MATCHMD_NONE)
1548                         continue;
1549
1550                 /* Hurrah! This _is_ a match */
1551                 list_del(&msg->msg_list);
1552                 ptl->ptl_msgq_version++;
1553
1554                 if (rc == LNET_MATCHMD_OK) {
1555                         list_add_tail(&msg->msg_list, &matches);
1556
1557                         CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
1558                                "match "LPU64" offset %d length %d.\n",
1559                                libcfs_id2str(src),
1560                                hdr->msg.put.ptl_index,
1561                                hdr->msg.put.match_bits,
1562                                hdr->msg.put.offset,
1563                                hdr->payload_length);
1564                 } else {
1565                         LASSERT (rc == LNET_MATCHMD_DROP);
1566
1567                         list_add_tail(&msg->msg_list, &drops);
1568                 }
1569
1570                 if (lnet_md_exhausted(md))
1571                         break;
1572         }
1573
1574         LNET_UNLOCK();
1575
1576         list_for_each_safe (entry, tmp, &drops) {
1577                 msg = list_entry(entry, lnet_msg_t, msg_list);
1578
1579                 list_del(&msg->msg_list);
1580
1581                 lnet_drop_delayed_put(msg, "Bad match");
1582         }
1583
1584         list_for_each_safe (entry, tmp, &matches) {
1585                 msg = list_entry(entry, lnet_msg_t, msg_list);
1586
1587                 list_del(&msg->msg_list);
1588
1589                 /* md won't disappear under me, since each msg
1590                  * holds a ref on it */
1591                 lnet_recv_put(md, msg, 1,
1592                               msg->msg_ev.offset,
1593                               msg->msg_ev.mlength);
1594         }
1595
1596         LNET_LOCK();
1597 }
1598
1599 static int
1600 lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
1601 {
1602         int               rc;
1603         int               index;
1604         lnet_hdr_t       *hdr = &msg->msg_hdr;
1605         unsigned int      rlength = hdr->payload_length;
1606         unsigned int      mlength = 0;
1607         unsigned int      offset = 0;
1608         lnet_process_id_t src = {/* .nid = */ hdr->src_nid,
1609                                  /* .pid = */ hdr->src_pid};
1610         lnet_libmd_t     *md;
1611
1612         /* Convert put fields to host byte order */
1613         hdr->msg.put.match_bits = le64_to_cpu(hdr->msg.put.match_bits);
1614         hdr->msg.put.ptl_index = le32_to_cpu(hdr->msg.put.ptl_index);
1615         hdr->msg.put.offset = le32_to_cpu(hdr->msg.put.offset);
1616
1617         index = hdr->msg.put.ptl_index;
1618
1619         LNET_LOCK();
1620
1621         rc = lnet_match_md(index, LNET_MD_OP_PUT, src,
1622                            rlength, hdr->msg.put.offset,
1623                            hdr->msg.put.match_bits, msg,
1624                            &mlength, &offset, &md);
1625         switch (rc) {
1626         default:
1627                 LBUG();
1628
1629         case LNET_MATCHMD_OK:
1630                 LNET_UNLOCK();
1631                 lnet_recv_put(md, msg, 0, offset, mlength);
1632                 return 0;
1633
1634         case LNET_MATCHMD_NONE:
1635                 rc = lnet_eager_recv_locked(msg);
1636                 if (rc == 0 && !the_lnet.ln_shutdown) {
1637                         list_add_tail(&msg->msg_list,
1638                                       &the_lnet.ln_portals[index].ptl_msgq);
1639
1640                         the_lnet.ln_portals[index].ptl_msgq_version++;
1641
1642                         CDEBUG(D_NET, "Delaying PUT from %s portal %d match "
1643                                LPU64" offset %d length %d: no match \n",
1644                                libcfs_id2str(src), index,
1645                                hdr->msg.put.match_bits,
1646                                hdr->msg.put.offset, rlength);
1647
1648                         LNET_UNLOCK();
1649                         return 0;
1650                 }
1651                 /* fall through */
1652
1653         case LNET_MATCHMD_DROP:
1654                 CDEBUG(D_NETERROR,
1655                        "Dropping PUT from %s portal %d match "LPU64
1656                        " offset %d length %d: %d\n",
1657                        libcfs_id2str(src), index,
1658                        hdr->msg.put.match_bits,
1659                        hdr->msg.put.offset, rlength, rc);
1660                 LNET_UNLOCK();
1661
1662                 return ENOENT;          /* +ve: OK but no match */
1663         }
1664 }
1665
1666 static int
1667 lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
1668 {
1669         lnet_hdr_t        *hdr = &msg->msg_hdr;
1670         unsigned int       mlength = 0;
1671         unsigned int       offset = 0;
1672         lnet_process_id_t  src = {/* .nid = */ hdr->src_nid,
1673                                   /* .pid = */ hdr->src_pid};
1674         lnet_handle_wire_t reply_wmd;
1675         lnet_libmd_t      *md;
1676         int                rc;
1677
1678         /* Convert get fields to host byte order */
1679         hdr->msg.get.match_bits = le64_to_cpu(hdr->msg.get.match_bits);
1680         hdr->msg.get.ptl_index = le32_to_cpu(hdr->msg.get.ptl_index);
1681         hdr->msg.get.sink_length = le32_to_cpu(hdr->msg.get.sink_length);
1682         hdr->msg.get.src_offset = le32_to_cpu(hdr->msg.get.src_offset);
1683
1684         LNET_LOCK();
1685
1686         rc = lnet_match_md(hdr->msg.get.ptl_index, LNET_MD_OP_GET, src,
1687                            hdr->msg.get.sink_length, hdr->msg.get.src_offset,
1688                            hdr->msg.get.match_bits, msg,
1689                            &mlength, &offset, &md);
1690         if (rc == LNET_MATCHMD_DROP) {
1691                 CDEBUG(D_NETERROR,
1692                        "Dropping GET from %s portal %d match "LPU64
1693                        " offset %d length %d\n",
1694                        libcfs_id2str(src),
1695                        hdr->msg.get.ptl_index,
1696                        hdr->msg.get.match_bits,
1697                        hdr->msg.get.src_offset,
1698                        hdr->msg.get.sink_length);
1699                 LNET_UNLOCK();
1700                 return ENOENT;                  /* +ve: OK but no match */
1701         }
1702
1703         LASSERT (rc == LNET_MATCHMD_OK);
1704
1705         the_lnet.ln_counters.send_count++;
1706         the_lnet.ln_counters.send_length += mlength;
1707
1708         LNET_UNLOCK();
1709
1710         reply_wmd = hdr->msg.get.return_wmd;
1711
1712         lnet_prep_send(msg, LNET_MSG_REPLY, src, offset, mlength);
1713
1714         msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;
1715
1716         msg->msg_ev.type = LNET_EVENT_GET;
1717         msg->msg_ev.target.pid = hdr->dest_pid;
1718         msg->msg_ev.target.nid = hdr->dest_nid;
1719         msg->msg_ev.hdr_data = 0;
1720
1721         if (rdma_get) {
1722                 /* The LND completes the REPLY from her recv procedure */
1723                 lnet_ni_recv(ni, msg->msg_private, msg, 0,
1724                              msg->msg_offset, msg->msg_len, msg->msg_len);
1725                 return 0;
1726         }
1727
1728         lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
1729         msg->msg_receiving = 0;
1730
1731         rc = lnet_send(ni->ni_nid, msg);
1732         if (rc < 0) {
1733                 /* didn't get as far as lnet_ni_send() */
1734                 CERROR("%s: Unable to send REPLY for GET from %s: %d\n",
1735                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), rc);
1736
1737                 lnet_finalize(ni, msg, rc);
1738         }
1739
1740         return 0;
1741 }
1742
1743 static int
1744 lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
1745 {
1746         void             *private = msg->msg_private;
1747         lnet_hdr_t       *hdr = &msg->msg_hdr;
1748         lnet_process_id_t src = {/* .nid = */ hdr->src_nid,
1749                                  /* .pid = */ hdr->src_pid};
1750         lnet_libmd_t     *md;
1751         int               rlength;
1752         int               mlength;
1753
1754         LNET_LOCK();
1755
1756         /* NB handles only looked up by creator (no flips) */
1757         md = lnet_wire_handle2md(&hdr->msg.reply.dst_wmd);
1758         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
1759                 CDEBUG(D_NETERROR, "%s: Dropping REPLY from %s for %s "
1760                        "MD "LPX64"."LPX64"\n", 
1761                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1762                        (md == NULL) ? "invalid" : "inactive",
1763                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
1764                        hdr->msg.reply.dst_wmd.wh_object_cookie);
1765                 if (md != NULL && md->md_me != NULL)
1766                         CERROR("REPLY MD also attached to portal %d\n",
1767                                md->md_me->me_portal);
1768
1769                 LNET_UNLOCK();
1770                 return ENOENT;                  /* +ve: OK but no match */
1771         }
1772
1773         LASSERT (md->md_offset == 0);
1774
1775         rlength = hdr->payload_length;
1776         mlength = MIN(rlength, md->md_length);
1777
1778         if (mlength < rlength &&
1779             (md->md_options & LNET_MD_TRUNCATE) == 0) {
1780                 CDEBUG(D_NETERROR, "%s: Dropping REPLY from %s length %d "
1781                        "for MD "LPX64" would overflow (%d)\n",
1782                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1783                         rlength, hdr->msg.reply.dst_wmd.wh_object_cookie,
1784                         mlength);
1785                 LNET_UNLOCK();
1786                 return ENOENT;          /* +ve: OK but no match */
1787         }
1788
1789         CDEBUG(D_NET, "%s: Reply from %s of length %d/%d into md "LPX64"\n",
1790                libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), 
1791                mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);
1792
1793         lnet_commit_md(md, msg);
1794
1795         if (mlength != 0)
1796                 lnet_setpayloadbuffer(msg);
1797
1798         msg->msg_ev.type = LNET_EVENT_REPLY;
1799         msg->msg_ev.target.pid = hdr->dest_pid;
1800         msg->msg_ev.target.nid = hdr->dest_nid;
1801         msg->msg_ev.initiator = src;
1802         msg->msg_ev.rlength = rlength;
1803         msg->msg_ev.mlength = mlength;
1804         msg->msg_ev.offset = 0;
1805
1806         lnet_md_deconstruct(md, &msg->msg_ev.md);
1807         lnet_md2handle(&msg->msg_ev.md_handle, md);
1808
1809         the_lnet.ln_counters.recv_count++;
1810         the_lnet.ln_counters.recv_length += mlength;
1811
1812         LNET_UNLOCK();
1813
1814         lnet_ni_recv(ni, private, msg, 0, 0, mlength, rlength);
1815         return 0;
1816 }
1817
1818 static int
1819 lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
1820 {
1821         lnet_hdr_t       *hdr = &msg->msg_hdr;
1822         lnet_process_id_t src = {/* .nid = */ hdr->src_nid,
1823                                  /* .pid = */ hdr->src_pid};
1824         lnet_libmd_t    *md;
1825
1826         /* Convert ack fields to host byte order */
1827         hdr->msg.ack.match_bits = le64_to_cpu(hdr->msg.ack.match_bits);
1828         hdr->msg.ack.mlength = le32_to_cpu(hdr->msg.ack.mlength);
1829
1830         LNET_LOCK();
1831
1832         /* NB handles only looked up by creator (no flips) */
1833         md = lnet_wire_handle2md(&hdr->msg.ack.dst_wmd);
1834         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
1835                 /* Don't moan; this is expected */
1836                 CDEBUG(D_NET,
1837                        "%s: Dropping ACK from %s to %s MD "LPX64"."LPX64"\n",
1838                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1839                        (md == NULL) ? "invalid" : "inactive",
1840                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
1841                        hdr->msg.ack.dst_wmd.wh_object_cookie);
1842                 if (md != NULL && md->md_me != NULL)
1843                         CERROR("Source MD also attached to portal %d\n",
1844                                md->md_me->me_portal);
1845
1846                 LNET_UNLOCK();
1847                 return ENOENT;                  /* +ve! */
1848         }
1849
1850         CDEBUG(D_NET, "%s: ACK from %s into md "LPX64"\n",
1851                libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), 
1852                hdr->msg.ack.dst_wmd.wh_object_cookie);
1853
1854         lnet_commit_md(md, msg);
1855
1856         msg->msg_ev.type = LNET_EVENT_ACK;
1857         msg->msg_ev.target.pid = hdr->dest_pid;
1858         msg->msg_ev.target.nid = hdr->dest_nid;
1859         msg->msg_ev.initiator = src;
1860         msg->msg_ev.mlength = hdr->msg.ack.mlength;
1861         msg->msg_ev.match_bits = hdr->msg.ack.match_bits;
1862
1863         lnet_md_deconstruct(md, &msg->msg_ev.md);
1864         lnet_md2handle(&msg->msg_ev.md_handle, md);
1865
1866         the_lnet.ln_counters.recv_count++;
1867
1868         LNET_UNLOCK();
1869
1870         lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
1871         return 0;
1872 }
1873
1874 char *
1875 lnet_msgtyp2str (int type)
1876 {
1877         switch (type) {
1878         case LNET_MSG_ACK:
1879                 return ("ACK");
1880         case LNET_MSG_PUT:
1881                 return ("PUT");
1882         case LNET_MSG_GET:
1883                 return ("GET");
1884         case LNET_MSG_REPLY:
1885                 return ("REPLY");
1886         case LNET_MSG_HELLO:
1887                 return ("HELLO");
1888         default:
1889                 return ("<UNKNOWN>");
1890         }
1891 }
1892
1893 void
1894 lnet_print_hdr(lnet_hdr_t * hdr)
1895 {
1896         lnet_process_id_t src = {/* .nid = */ hdr->src_nid,
1897                                  /* .pid = */ hdr->src_pid};
1898         lnet_process_id_t dst = {/* .nid = */ hdr->dest_nid,
1899                                  /* .pid = */ hdr->dest_pid};
1900         char *type_str = lnet_msgtyp2str (hdr->type);
1901
1902         CWARN("P3 Header at %p of type %s\n", hdr, type_str);
1903         CWARN("    From %s\n", libcfs_id2str(src));
1904         CWARN("    To   %s\n", libcfs_id2str(dst));
1905
1906         switch (hdr->type) {
1907         default:
1908                 break;
1909
1910         case LNET_MSG_PUT:
1911                 CWARN("    Ptl index %d, ack md "LPX64"."LPX64", "
1912                       "match bits "LPU64"\n",
1913                       hdr->msg.put.ptl_index,
1914                       hdr->msg.put.ack_wmd.wh_interface_cookie,
1915                       hdr->msg.put.ack_wmd.wh_object_cookie,
1916                       hdr->msg.put.match_bits);
1917                 CWARN("    Length %d, offset %d, hdr data "LPX64"\n",
1918                       hdr->payload_length, hdr->msg.put.offset,
1919                       hdr->msg.put.hdr_data);
1920                 break;
1921
1922         case LNET_MSG_GET:
1923                 CWARN("    Ptl index %d, return md "LPX64"."LPX64", "
1924                       "match bits "LPU64"\n", hdr->msg.get.ptl_index,
1925                       hdr->msg.get.return_wmd.wh_interface_cookie,
1926                       hdr->msg.get.return_wmd.wh_object_cookie,
1927                       hdr->msg.get.match_bits);
1928                 CWARN("    Length %d, src offset %d\n",
1929                       hdr->msg.get.sink_length,
1930                       hdr->msg.get.src_offset);
1931                 break;
1932
1933         case LNET_MSG_ACK:
1934                 CWARN("    dst md "LPX64"."LPX64", "
1935                       "manipulated length %d\n",
1936                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
1937                       hdr->msg.ack.dst_wmd.wh_object_cookie,
1938                       hdr->msg.ack.mlength);
1939                 break;
1940
1941         case LNET_MSG_REPLY:
1942                 CWARN("    dst md "LPX64"."LPX64", "
1943                       "length %d\n",
1944                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
1945                       hdr->msg.reply.dst_wmd.wh_object_cookie,
1946                       hdr->payload_length);
1947         }
1948
1949 }
1950
1951
1952 int
1953 lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid, 
1954            void *private, int rdma_req)
1955 {
1956         int            rc = 0;
1957         int            for_me;
1958         lnet_msg_t    *msg;
1959         lnet_nid_t     dest_nid;
1960         lnet_nid_t     src_nid;
1961         __u32          payload_length;
1962         __u32          type;
1963
1964         LASSERT (!in_interrupt ());
1965
1966         type = le32_to_cpu(hdr->type);
1967         src_nid = le64_to_cpu(hdr->src_nid);
1968         dest_nid = le64_to_cpu(hdr->dest_nid);
1969         payload_length = le32_to_cpu(hdr->payload_length);
1970
1971         for_me = (ni->ni_nid == dest_nid);
1972
1973         switch (type) {
1974         case LNET_MSG_ACK:
1975         case LNET_MSG_GET:
1976                 if (payload_length > 0) {
1977                         CERROR("%s, src %s: bad %s payload %d (0 expected)\n",
1978                                libcfs_nid2str(from_nid),
1979                                libcfs_nid2str(src_nid),
1980                                lnet_msgtyp2str(type), payload_length);
1981                         return -EPROTO;
1982                 }
1983                 break;
1984
1985         case LNET_MSG_PUT:
1986         case LNET_MSG_REPLY:
1987                 if (payload_length > (for_me ? LNET_MAX_PAYLOAD : LNET_MTU)) {
1988                         CERROR("%s, src %s: bad %s payload %d "
1989                                "(%d max expected)\n",
1990                                libcfs_nid2str(from_nid),
1991                                libcfs_nid2str(src_nid),
1992                                lnet_msgtyp2str(type),
1993                                payload_length,
1994                                for_me ? LNET_MAX_PAYLOAD : LNET_MTU);
1995                         return -EPROTO;
1996                 }
1997                 break;
1998
1999         default:
2000                 CERROR("%s, src %s: Bad message type 0x%x\n",
2001                        libcfs_nid2str(from_nid),
2002                        libcfs_nid2str(src_nid), type);
2003                 return -EPROTO;
2004         }
2005
2006         /* Regard a bad destination NID as a protocol error.  Senders should
2007          * know what they're doing; if they don't they're misconfigured, buggy
2008          * or malicious so we chop them off at the knees :) */
2009
2010         if (!for_me) {
2011                 if (LNET_NIDNET(dest_nid) == LNET_NIDNET(ni->ni_nid)) {
2012                         /* should have gone direct */
2013                         CERROR ("%s, src %s: Bad dest nid %s "
2014                                 "(should have been sent direct)\n",
2015                                 libcfs_nid2str(from_nid),
2016                                 libcfs_nid2str(src_nid),
2017                                 libcfs_nid2str(dest_nid));
2018                         return -EPROTO;
2019                 }
2020
2021                 if (lnet_islocalnid(dest_nid)) {
2022                         /* dest is another local NI; sender should have used
2023                          * this node's NID on its own network */
2024                         CERROR ("%s, src %s: Bad dest nid %s "
2025                                 "(it's my nid but on a different network)\n",
2026                                 libcfs_nid2str(from_nid),
2027                                 libcfs_nid2str(src_nid),
2028                                 libcfs_nid2str(dest_nid));
2029                         return -EPROTO;
2030                 }
2031
2032                 if (rdma_req && type == LNET_MSG_GET) {
2033                         CERROR ("%s, src %s: Bad optimized GET for %s "
2034                                 "(final destination must be me)\n",
2035                                 libcfs_nid2str(from_nid),
2036                                 libcfs_nid2str(src_nid),
2037                                 libcfs_nid2str(dest_nid));
2038                         return -EPROTO;
2039                 }
2040
2041                 if (!the_lnet.ln_routing) {
2042                         CERROR ("%s, src %s: Dropping message for %s "
2043                                 "(routing not enabled)\n",
2044                                 libcfs_nid2str(from_nid),
2045                                 libcfs_nid2str(src_nid),
2046                                 libcfs_nid2str(dest_nid));
2047                         goto drop;
2048                 }
2049         }
2050
2051         /* Message looks OK; we're not going to return an error, so we MUST
2052          * call back lnd_recv() come what may... */
2053
2054         if (!list_empty (&the_lnet.ln_test_peers) && /* normally we don't */
2055             fail_peer (src_nid, 0))             /* shall we now? */
2056         {
2057                 CERROR("%s, src %s: Dropping %s to simulate failure\n",
2058                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
2059                        lnet_msgtyp2str(type));
2060                 goto drop;
2061         }
2062
2063         msg = lnet_msg_alloc();
2064         if (msg == NULL) {
2065                 CERROR("%s, src %s: Dropping %s (out of memory)\n",
2066                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid), 
2067                        lnet_msgtyp2str(type));
2068                 goto drop;
2069         }
2070
2071         /* msg zeroed in lnet_msg_alloc; i.e. flags all clear, pointers NULL etc */
2072
2073         msg->msg_type = type;
2074         msg->msg_private = private;
2075         msg->msg_receiving = 1;
2076         msg->msg_len = msg->msg_wanted = payload_length;
2077         msg->msg_offset = 0;
2078         msg->msg_hdr = *hdr;
2079
2080         LNET_LOCK();
2081         rc = lnet_nid2peer_locked(&msg->msg_rxpeer, from_nid);
2082         if (rc != 0) {
2083                 LNET_UNLOCK();
2084                 CERROR("%s, src %s: Dropping %s "
2085                        "(error %d looking up sender)\n",
2086                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
2087                        lnet_msgtyp2str(type), rc);
2088                 goto free_drop;
2089         }
2090         LNET_UNLOCK();
2091
2092 #ifndef __KERNEL__
2093         LASSERT (for_me);
2094 #else
2095         if (!for_me) {
2096                 msg->msg_target.pid = le32_to_cpu(hdr->dest_pid);
2097                 msg->msg_target.nid = dest_nid;
2098                 msg->msg_routing = 1;
2099                 msg->msg_offset = 0;
2100
2101                 LNET_LOCK();
2102                 if (msg->msg_rxpeer->lp_rtrcredits <= 0 ||
2103                     lnet_msg2bufpool(msg)->rbp_credits <= 0) {
2104                         rc = lnet_eager_recv_locked(msg);
2105                         if (rc != 0) {
2106                                 LNET_UNLOCK();
2107                                 goto free_drop;
2108                         }
2109                 }
2110
2111                 lnet_commit_routedmsg(msg);
2112                 rc = lnet_post_routed_recv_locked(msg, 0);
2113                 LNET_UNLOCK();
2114
2115                 if (rc == 0)
2116                         lnet_ni_recv(ni, msg->msg_private, msg, 0,
2117                                      0, payload_length, payload_length);
2118                 return 0;
2119         }
2120 #endif
2121         /* convert common msg->hdr fields to host byteorder */
2122         msg->msg_hdr.type = type;
2123         msg->msg_hdr.src_nid = src_nid;
2124         msg->msg_hdr.src_pid = le32_to_cpu(msg->msg_hdr.src_pid);
2125         msg->msg_hdr.dest_nid = dest_nid;
2126         msg->msg_hdr.dest_pid = le32_to_cpu(msg->msg_hdr.dest_pid);
2127         msg->msg_hdr.payload_length = payload_length;
2128
2129         msg->msg_ev.sender = from_nid;
2130
2131         switch (type) {
2132         case LNET_MSG_ACK:
2133                 rc = lnet_parse_ack(ni, msg);
2134                 break;
2135         case LNET_MSG_PUT:
2136                 rc = lnet_parse_put(ni, msg);
2137                 break;
2138         case LNET_MSG_GET:
2139                 rc = lnet_parse_get(ni, msg, rdma_req);
2140                 break;
2141         case LNET_MSG_REPLY:
2142                 rc = lnet_parse_reply(ni, msg);
2143                 break;
2144         default:
2145                 LASSERT(0);
2146                 goto free_drop;  /* prevent an unused label if !kernel */
2147         }
2148
2149         if (rc == 0)
2150                 return 0;
2151
2152         LASSERT (rc == ENOENT);
2153
2154  free_drop:
2155         LASSERT (msg->msg_md == NULL);
2156         LNET_LOCK();
2157         if (msg->msg_rxpeer != NULL) {
2158                 lnet_peer_decref_locked(msg->msg_rxpeer);
2159                 msg->msg_rxpeer = NULL;
2160         }
2161         lnet_msg_free(msg);                     /* expects LNET_LOCK held */
2162         LNET_UNLOCK();
2163
2164  drop:
2165         lnet_drop_message(ni, private, payload_length);
2166         return 0;
2167 }
2168
2169 int
2170 LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
2171         lnet_process_id_t target, unsigned int portal,
2172         __u64 match_bits, unsigned int offset,
2173         __u64 hdr_data)
2174 {
2175         lnet_msg_t       *msg;
2176         lnet_libmd_t     *md;
2177         int               rc;
2178
2179         LASSERT (the_lnet.ln_init);
2180         LASSERT (the_lnet.ln_refcount > 0);
2181
2182         if (!list_empty (&the_lnet.ln_test_peers) && /* normally we don't */
2183             fail_peer (target.nid, 1))          /* shall we now? */
2184         {
2185                 CERROR("Dropping PUT to %s: simulated failure\n",
2186                        libcfs_id2str(target));
2187                 return -EIO;
2188         }
2189
2190         msg = lnet_msg_alloc();
2191         if (msg == NULL) {
2192                 CERROR("Dropping PUT to %s: ENOMEM on lnet_msg_t\n",
2193                        libcfs_id2str(target));
2194                 return -ENOMEM;
2195         }
2196
2197         LNET_LOCK();
2198
2199         md = lnet_handle2md(&mdh);
2200         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
2201                 lnet_msg_free(msg);
2202
2203                 CERROR("Dropping PUT ("LPU64":%d:%s): MD (%d) invalid\n",
2204                        match_bits, portal, libcfs_id2str(target),
2205                        md == NULL ? -1 : md->md_threshold);
2206                 if (md != NULL && md->md_me != NULL)
2207                         CERROR("Source MD also attached to portal %d\n",
2208                                md->md_me->me_portal);
2209
2210                 LNET_UNLOCK();
2211                 return -ENOENT;
2212         }
2213
2214         CDEBUG(D_NET, "LNetPut -> %s\n", libcfs_id2str(target));
2215
2216         lnet_commit_md(md, msg);
2217
2218         lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length);
2219
2220         msg->msg_hdr.msg.put.match_bits = cpu_to_le64(match_bits);
2221         msg->msg_hdr.msg.put.ptl_index = cpu_to_le32(portal);
2222         msg->msg_hdr.msg.put.offset = cpu_to_le32(offset);
2223         msg->msg_hdr.msg.put.hdr_data = hdr_data;
2224
2225         /* NB handles only looked up by creator (no flips) */
2226         if (ack == LNET_ACK_REQ) {
2227                 msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie = 
2228                         the_lnet.ln_interface_cookie;
2229                 msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie = 
2230                         md->md_lh.lh_cookie;
2231         } else {
2232                 msg->msg_hdr.msg.put.ack_wmd = LNET_WIRE_HANDLE_NONE;
2233         }
2234
2235         msg->msg_ev.type = LNET_EVENT_SEND;
2236         msg->msg_ev.initiator.nid = LNET_NID_ANY;
2237         msg->msg_ev.initiator.pid = the_lnet.ln_pid;
2238         msg->msg_ev.target = target;
2239         msg->msg_ev.sender = LNET_NID_ANY;
2240         msg->msg_ev.pt_index = portal;
2241         msg->msg_ev.match_bits = match_bits;
2242         msg->msg_ev.rlength = md->md_length;
2243         msg->msg_ev.mlength = md->md_length;
2244         msg->msg_ev.offset = offset;
2245         msg->msg_ev.hdr_data = hdr_data;
2246
2247         lnet_md_deconstruct(md, &msg->msg_ev.md);
2248         lnet_md2handle(&msg->msg_ev.md_handle, md);
2249
2250         the_lnet.ln_counters.send_count++;
2251         the_lnet.ln_counters.send_length += md->md_length;
2252
2253         LNET_UNLOCK();
2254
2255         rc = lnet_send(self, msg);
2256         if (rc != 0) {
2257                 CERROR("Error sending PUT to %s: %d\n",
2258                        libcfs_id2str(target), rc);
2259                 lnet_finalize (NULL, msg, rc);
2260         }
2261
2262         /* completion will be signalled by an event */
2263         return 0;
2264 }
2265
2266 lnet_msg_t *
2267 lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *getmsg)
2268 {
2269         /* The LND can DMA direct to the GET md (i.e. no REPLY msg).  This
2270          * returns a msg for the LND to pass to lnet_finalize() when the sink
2271          * data has been received.
2272          *
2273          * CAVEAT EMPTOR: 'getmsg' is the original GET, which is freed when
2274          * lnet_finalize() is called on it, so the LND must call this first */
2275
2276         lnet_msg_t        *msg = lnet_msg_alloc();
2277         lnet_libmd_t      *getmd = getmsg->msg_md;
2278         lnet_process_id_t  peer_id = getmsg->msg_target;
2279
2280         LASSERT (!getmsg->msg_target_is_router);
2281         LASSERT (!getmsg->msg_routing);
2282
2283         LNET_LOCK();
2284
2285         LASSERT (getmd->md_refcount > 0);
2286
2287         if (msg == NULL) {
2288                 CERROR ("%s: Dropping REPLY from %s: can't allocate msg\n",
2289                         libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id));
2290                 goto drop;
2291         }
2292
2293         if (getmd->md_threshold == 0) {
2294                 CERROR ("%s: Dropping REPLY from %s for inactive MD %p\n",
2295                         libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), 
2296                         getmd);
2297                 goto drop_msg;
2298         }
2299
2300         LASSERT (getmd->md_offset == 0);
2301
2302         CDEBUG(D_NET, "%s: Reply from %s md %p\n", 
2303                libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
2304
2305         lnet_commit_md (getmd, msg);
2306
2307         msg->msg_type = LNET_MSG_GET; /* flag this msg as an "optimized" GET */
2308
2309         msg->msg_ev.type = LNET_EVENT_REPLY;
2310         msg->msg_ev.initiator = peer_id;
2311         msg->msg_ev.sender = peer_id.nid;  /* optimized GETs can't be routed */
2312         msg->msg_ev.rlength = msg->msg_ev.mlength = getmd->md_length;
2313         msg->msg_ev.offset = 0;
2314
2315         lnet_md_deconstruct(getmd, &msg->msg_ev.md);
2316         lnet_md2handle(&msg->msg_ev.md_handle, getmd);
2317
2318         the_lnet.ln_counters.recv_count++;
2319         the_lnet.ln_counters.recv_length += getmd->md_length;
2320
2321         LNET_UNLOCK();
2322
2323         return msg;
2324
2325  drop_msg:
2326         lnet_msg_free(msg);
2327  drop:
2328         the_lnet.ln_counters.drop_count++;
2329         the_lnet.ln_counters.drop_length += getmd->md_length;
2330
2331         LNET_UNLOCK ();
2332
2333         return NULL;
2334 }
2335
2336 void
2337 lnet_set_reply_msg_len(lnet_ni_t *ni, lnet_msg_t *reply, unsigned int len)
2338 {
2339         /* Set the REPLY length, now the RDMA that elides the REPLY message has
2340          * completed and I know it. */
2341         LASSERT (reply != NULL);
2342         LASSERT (reply->msg_type == LNET_MSG_GET);
2343         LASSERT (reply->msg_ev.type == LNET_EVENT_REPLY);
2344
2345         /* NB I trusted my peer to RDMA.  If she tells me she's written beyond
2346          * the end of my buffer, I might as well be dead. */
2347         LASSERT (len <= reply->msg_ev.mlength);
2348
2349         reply->msg_ev.mlength = len;
2350 }
2351
2352 int
2353 LNetGet(lnet_nid_t self, lnet_handle_md_t mdh, 
2354         lnet_process_id_t target, unsigned int portal, 
2355         __u64 match_bits, unsigned int offset)
2356 {
2357         lnet_msg_t       *msg;
2358         lnet_libmd_t     *md;
2359         int               rc;
2360
2361         LASSERT (the_lnet.ln_init);
2362         LASSERT (the_lnet.ln_refcount > 0);
2363
2364         if (!list_empty (&the_lnet.ln_test_peers) && /* normally we don't */
2365             fail_peer (target.nid, 1))          /* shall we now? */
2366         {
2367                 CERROR("Dropping GET to %s: simulated failure\n",
2368                        libcfs_id2str(target));
2369                 return -EIO;
2370         }
2371
2372         msg = lnet_msg_alloc();
2373         if (msg == NULL) {
2374                 CERROR("Dropping GET to %s: ENOMEM on lnet_msg_t\n",
2375                        libcfs_id2str(target));
2376                 return -ENOMEM;
2377         }
2378
2379         LNET_LOCK();
2380
2381         md = lnet_handle2md(&mdh);
2382         if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
2383                 lnet_msg_free(msg);
2384
2385                 CERROR("Dropping GET ("LPU64":%d:%s): MD (%d) invalid\n",
2386                        match_bits, portal, libcfs_id2str(target),
2387                        md == NULL ? -1 : md->md_threshold);
2388                 if (md != NULL && md->md_me != NULL)
2389                         CERROR("REPLY MD also attached to portal %d\n",
2390                                md->md_me->me_portal);
2391
2392                 LNET_UNLOCK();
2393                 return -ENOENT;
2394         }
2395
2396         CDEBUG(D_NET, "LNetGet -> %s\n", libcfs_id2str(target));
2397
2398         lnet_commit_md(md, msg);
2399
2400         lnet_prep_send(msg, LNET_MSG_GET, target, 0, 0);
2401
2402         msg->msg_hdr.msg.get.match_bits = cpu_to_le64(match_bits);
2403         msg->msg_hdr.msg.get.ptl_index = cpu_to_le32(portal);
2404         msg->msg_hdr.msg.get.src_offset = cpu_to_le32(offset);
2405         msg->msg_hdr.msg.get.sink_length = cpu_to_le32(md->md_length);
2406
2407         /* NB handles only looked up by creator (no flips) */
2408         msg->msg_hdr.msg.get.return_wmd.wh_interface_cookie = 
2409                 the_lnet.ln_interface_cookie;
2410         msg->msg_hdr.msg.get.return_wmd.wh_object_cookie = 
2411                 md->md_lh.lh_cookie;
2412
2413         msg->msg_ev.type = LNET_EVENT_SEND;
2414         msg->msg_ev.initiator.nid = LNET_NID_ANY;
2415         msg->msg_ev.initiator.pid = the_lnet.ln_pid;
2416         msg->msg_ev.target = target;
2417         msg->msg_ev.sender = LNET_NID_ANY;
2418         msg->msg_ev.pt_index = portal;
2419         msg->msg_ev.match_bits = match_bits;
2420         msg->msg_ev.rlength = md->md_length;
2421         msg->msg_ev.mlength = md->md_length;
2422         msg->msg_ev.offset = offset;
2423         msg->msg_ev.hdr_data = 0;
2424
2425         lnet_md_deconstruct(md, &msg->msg_ev.md);
2426         lnet_md2handle(&msg->msg_ev.md_handle, md);
2427
2428         the_lnet.ln_counters.send_count++;
2429
2430         LNET_UNLOCK();
2431
2432         rc = lnet_send(self, msg);
2433         if (rc < 0) {
2434                 CERROR("error sending GET to %s: %d\n",
2435                        libcfs_id2str(target), rc);
2436                 lnet_finalize (NULL, msg, rc);
2437         }
2438
2439         /* completion will be signalled by an event */
2440         return 0;
2441 }
2442
2443 int
2444 LNetDist (lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
2445 {
2446         struct list_head *e;
2447         lnet_ni_t        *ni;
2448         lnet_route_t     *route;
2449         lnet_remotenet_t *rnet;
2450         __u32             dstnet = LNET_NIDNET(dstnid);
2451         int               hops;
2452         __u32             order = 2;
2453
2454         /* if !local_nid_dist_zero, I don't return a distance of 0 ever
2455          * (when lustre sees a distance of 0, it substitutes 0@lo), so I
2456          * keep order 0 free for 0@lo and order 1 free for a local NID
2457          * match */
2458
2459         LASSERT (the_lnet.ln_init);
2460         LASSERT (the_lnet.ln_refcount > 0);
2461
2462         LNET_LOCK();
2463
2464         list_for_each (e, &the_lnet.ln_nis) {
2465                 ni = list_entry(e, lnet_ni_t, ni_list);
2466
2467                 if (ni->ni_nid == dstnid) {
2468                         if (srcnidp != NULL)
2469                                 *srcnidp = dstnid;
2470                         if (orderp != NULL) {
2471                                 if (LNET_NETTYP(LNET_NIDNET(dstnid)) == LOLND)
2472                                         *orderp = 0;
2473                                 else
2474                                         *orderp = 1;
2475                         }
2476                         LNET_UNLOCK();
2477
2478                         return local_nid_dist_zero ? 0 : 1;
2479                 }
2480
2481                 if (LNET_NIDNET(ni->ni_nid) == dstnet) {
2482                         if (srcnidp != NULL)
2483                                 *srcnidp = ni->ni_nid;
2484                         if (orderp != NULL)
2485                                 *orderp = order;
2486                         LNET_UNLOCK();
2487                         return 1;
2488                 }
2489
2490                 order++;
2491         }
2492
2493         list_for_each (e, &the_lnet.ln_remote_nets) {
2494                 rnet = list_entry(e, lnet_remotenet_t, lrn_list);
2495
2496                 if (rnet->lrn_net == dstnet) {
2497                         LASSERT (!list_empty(&rnet->lrn_routes));
2498                         route = list_entry(rnet->lrn_routes.next,
2499                                            lnet_route_t, lr_list);
2500                         hops = rnet->lrn_hops;
2501                         if (srcnidp != NULL)
2502                                 *srcnidp = route->lr_gateway->lp_ni->ni_nid;
2503                         if (orderp != NULL)
2504                                 *orderp = order;
2505                         LNET_UNLOCK();
2506                         return hops + 1;
2507                 }
2508                 order++;
2509         }
2510
2511         LNET_UNLOCK();
2512         return -EHOSTUNREACH;
2513 }
2514
2515 int
2516 LNetSetAsync(lnet_process_id_t id, int nasync)
2517 {
2518 #ifdef __KERNEL__
2519         return 0;
2520 #else
2521         lnet_ni_t        *ni;
2522         lnet_remotenet_t *rnet;
2523         struct list_head *tmp;
2524         lnet_route_t     *route;
2525         lnet_nid_t       *nids;
2526         int               nnids;
2527         int               maxnids = 256;
2528         int               rc = 0;
2529         int               rc2;
2530
2531         /* Target on a local network? */ 
2532
2533         ni = lnet_net2ni(LNET_NIDNET(id.nid));
2534         if (ni != NULL) {
2535                 if (ni->ni_lnd->lnd_setasync != NULL) 
2536                         rc = (ni->ni_lnd->lnd_setasync)(ni, id, nasync);
2537                 lnet_ni_decref(ni);
2538                 return rc;
2539         }
2540
2541         /* Target on a remote network: apply to routers */
2542  again:
2543         LIBCFS_ALLOC(nids, maxnids * sizeof(*nids));
2544         if (nids == NULL)
2545                 return -ENOMEM;
2546         nnids = 0;
2547
2548         /* Snapshot all the router NIDs */
2549         LNET_LOCK();
2550         rnet = lnet_find_net_locked(LNET_NIDNET(id.nid));
2551         if (rnet != NULL) {
2552                 list_for_each(tmp, &rnet->lrn_routes) {
2553                         if (nnids == maxnids) {
2554                                 LNET_UNLOCK();
2555                                 LIBCFS_FREE(nids, maxnids * sizeof(*nids));
2556                                 maxnids *= 2;
2557                                 goto again;
2558                         }
2559
2560                         route = list_entry(tmp, lnet_route_t, lr_list);
2561                         nids[nnids++] = route->lr_gateway->lp_nid;
2562                 }
2563         }
2564         LNET_UNLOCK();
2565
2566         /* set async on all the routers */
2567         while (nnids-- > 0) {
2568                 id.pid = LUSTRE_SRV_LNET_PID;
2569                 id.nid = nids[nnids];
2570
2571                 ni = lnet_net2ni(LNET_NIDNET(id.nid));
2572                 if (ni == NULL)
2573                         continue;
2574
2575                 if (ni->ni_lnd->lnd_setasync != NULL) {
2576                         rc2 = (ni->ni_lnd->lnd_setasync)(ni, id, nasync);
2577                         if (rc2 != 0)
2578                                 rc = rc2;
2579                 }
2580                 lnet_ni_decref(ni);
2581         }
2582
2583         LIBCFS_FREE(nids, maxnids * sizeof(*nids));
2584         return rc;
2585 #endif
2586 }