Whamcloud - gitweb
i=liang:
[fs/lustre-release.git] / lnet / lnet / lib-move.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * lib/lib-move.c
5  * Data movement routines
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *
9  *   This file is part of Lustre, http://www.lustre.org
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #define DEBUG_SUBSYSTEM S_LNET
26
27 #include <lnet/lib-lnet.h>
28
29 static int local_nid_dist_zero = 1;
30 CFS_MODULE_PARM(local_nid_dist_zero, "i", int, 0444,
31                 "Reserved");
32
33 /* forward ref */
34 static void lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg);
35 static void lnet_drop_delayed_put(lnet_msg_t *msg, char *reason);
36
37 #define LNET_MATCHMD_NONE     0   /* Didn't match */
38 #define LNET_MATCHMD_OK       1   /* Matched OK */
39 #define LNET_MATCHMD_DROP     2   /* Must be discarded */
40
41 static int
42 lnet_try_match_md (int index, int op_mask, lnet_process_id_t src,
43                    unsigned int rlength, unsigned int roffset, 
44                    __u64 match_bits, lnet_libmd_t *md, lnet_msg_t *msg,
45                    unsigned int *mlength_out, unsigned int *offset_out)
46 {
47         /* ALWAYS called holding the LNET_LOCK, and can't LNET_UNLOCK;
48          * lnet_match_blocked_msg() relies on this to avoid races */
49         unsigned int  offset;
50         unsigned int  mlength;
51         lnet_me_t    *me = md->md_me;
52
53         /* mismatched MD op */
54         if ((md->md_options & op_mask) == 0)
55                 return LNET_MATCHMD_NONE;
56
57         /* MD exhausted */
58         if (lnet_md_exhausted(md))
59                 return LNET_MATCHMD_NONE;
60
61         /* mismatched ME nid/pid? */
62         if (me->me_match_id.nid != LNET_NID_ANY &&
63             me->me_match_id.nid != src.nid)
64                 return LNET_MATCHMD_NONE;
65
66         if (me->me_match_id.pid != LNET_PID_ANY &&
67             me->me_match_id.pid != src.pid)
68                 return LNET_MATCHMD_NONE;
69
70         /* mismatched ME matchbits? */
71         if (((me->me_match_bits ^ match_bits) & ~me->me_ignore_bits) != 0)
72                 return LNET_MATCHMD_NONE;
73
74         /* Hurrah! This _is_ a match; check it out... */
75
76         if ((md->md_options & LNET_MD_MANAGE_REMOTE) == 0)
77                 offset = md->md_offset;
78         else
79                 offset = roffset;
80
81         if ((md->md_options & LNET_MD_MAX_SIZE) != 0) {
82                 mlength = md->md_max_size;
83                 LASSERT (md->md_offset + mlength <= md->md_length);
84         } else {
85                 mlength = md->md_length - offset;
86         }
87
88         if (rlength <= mlength) {        /* fits in allowed space */
89                 mlength = rlength;
90         } else if ((md->md_options & LNET_MD_TRUNCATE) == 0) {
91                 /* this packet _really_ is too big */
92                 CERROR("Matching packet from %s, match "LPU64
93                        " length %d too big: %d left, %d allowed\n", 
94                        libcfs_id2str(src), match_bits, rlength,
95                        md->md_length - offset, mlength);
96
97                 return LNET_MATCHMD_DROP;
98         }
99
100         /* Commit to this ME/MD */
101         CDEBUG(D_NET, "Incoming %s index %x from %s of "
102                "length %d/%d into md "LPX64" [%d] + %d\n",
103                (op_mask == LNET_MD_OP_PUT) ? "put" : "get",
104                index, libcfs_id2str(src), mlength, rlength,
105                md->md_lh.lh_cookie, md->md_niov, offset);
106
107         lnet_commit_md(md, msg);
108         md->md_offset = offset + mlength;
109
110         /* NB Caller will set ev.type and ev.hdr_data */
111         msg->msg_ev.initiator = src;
112         msg->msg_ev.pt_index = index;
113         msg->msg_ev.match_bits = match_bits;
114         msg->msg_ev.rlength = rlength;
115         msg->msg_ev.mlength = mlength;
116         msg->msg_ev.offset = offset;
117
118         lnet_md_deconstruct(md, &msg->msg_ev.md);
119         lnet_md2handle(&msg->msg_ev.md_handle, md);
120
121         *offset_out = offset;
122         *mlength_out = mlength;
123
124         /* Auto-unlink NOW, so the ME gets unlinked if required.
125          * We bumped md->md_refcount above so the MD just gets flagged
126          * for unlink when it is finalized. */
127         if ((md->md_flags & LNET_MD_FLAG_AUTO_UNLINK) != 0 &&
128             lnet_md_exhausted(md)) {
129                 lnet_md_unlink(md);
130         }
131
132         return LNET_MATCHMD_OK;
133 }
134
135 static int
136 lnet_match_md(int index, int op_mask, lnet_process_id_t src,
137               unsigned int rlength, unsigned int roffset,
138               __u64 match_bits, lnet_msg_t *msg,
139               unsigned int *mlength_out, unsigned int *offset_out,
140               lnet_libmd_t **md_out)
141 {
142         lnet_portal_t    *ptl = &the_lnet.ln_portals[index];
143         lnet_me_t        *me;
144         lnet_me_t        *tmp;
145         lnet_libmd_t     *md;
146         int               rc;
147
148         CDEBUG (D_NET, "Request from %s of length %d into portal %d "
149                 "MB="LPX64"\n", libcfs_id2str(src), rlength, index, match_bits);
150
151         if (index < 0 || index >= the_lnet.ln_nportals) {
152                 CERROR("Invalid portal %d not in [0-%d]\n",
153                        index, the_lnet.ln_nportals);
154                 return LNET_MATCHMD_DROP;
155         }
156
157         list_for_each_entry_safe (me, tmp, &ptl->ptl_ml, me_list) {
158                 md = me->me_md;
159
160                 /* ME attached but MD not attached yet */
161                 if (md == NULL)
162                         continue;
163
164                 LASSERT (me == md->md_me);
165
166                 rc = lnet_try_match_md(index, op_mask, src, rlength, 
167                                        roffset, match_bits, md, msg,
168                                        mlength_out, offset_out);
169                 switch (rc) {
170                 default:
171                         LBUG();
172                         
173                 case LNET_MATCHMD_NONE:
174                         continue;
175                         
176                 case LNET_MATCHMD_OK:
177                         *md_out = md;
178                         return LNET_MATCHMD_OK;
179                         
180                 case LNET_MATCHMD_DROP:
181                         return LNET_MATCHMD_DROP;
182                 }
183                 /* not reached */
184         }
185
186         if (op_mask == LNET_MD_OP_GET ||
187             (ptl->ptl_options & LNET_PTL_LAZY) == 0)
188                 return LNET_MATCHMD_DROP;
189         
190         return LNET_MATCHMD_NONE;
191 }
192
193 int
194 lnet_fail_nid (lnet_nid_t nid, unsigned int threshold)
195 {
196         lnet_test_peer_t   *tp;
197         struct list_head  *el;
198         struct list_head  *next;
199         struct list_head   cull;
200
201         LASSERT (the_lnet.ln_init);
202         
203         if (threshold != 0) {
204                 /* Adding a new entry */
205                 LIBCFS_ALLOC(tp, sizeof(*tp));
206                 if (tp == NULL)
207                         return -ENOMEM;
208
209                 tp->tp_nid = nid;
210                 tp->tp_threshold = threshold;
211
212                 LNET_LOCK();
213                 list_add_tail (&tp->tp_list, &the_lnet.ln_test_peers);
214                 LNET_UNLOCK();
215                 return 0;
216         }
217
218         /* removing entries */
219         CFS_INIT_LIST_HEAD (&cull);
220
221         LNET_LOCK();
222
223         list_for_each_safe (el, next, &the_lnet.ln_test_peers) {
224                 tp = list_entry (el, lnet_test_peer_t, tp_list);
225
226                 if (tp->tp_threshold == 0 ||    /* needs culling anyway */
227                     nid == LNET_NID_ANY ||       /* removing all entries */
228                     tp->tp_nid == nid)          /* matched this one */
229                 {
230                         list_del (&tp->tp_list);
231                         list_add (&tp->tp_list, &cull);
232                 }
233         }
234
235         LNET_UNLOCK();
236
237         while (!list_empty (&cull)) {
238                 tp = list_entry (cull.next, lnet_test_peer_t, tp_list);
239
240                 list_del (&tp->tp_list);
241                 LIBCFS_FREE(tp, sizeof (*tp));
242         }
243         return 0;
244 }
245
246 static int
247 fail_peer (lnet_nid_t nid, int outgoing)
248 {
249         lnet_test_peer_t  *tp;
250         struct list_head *el;
251         struct list_head *next;
252         struct list_head  cull;
253         int               fail = 0;
254
255         CFS_INIT_LIST_HEAD (&cull);
256
257         LNET_LOCK();
258
259         list_for_each_safe (el, next, &the_lnet.ln_test_peers) {
260                 tp = list_entry (el, lnet_test_peer_t, tp_list);
261
262                 if (tp->tp_threshold == 0) {
263                         /* zombie entry */
264                         if (outgoing) {
265                                 /* only cull zombies on outgoing tests,
266                                  * since we may be at interrupt priority on
267                                  * incoming messages. */
268                                 list_del (&tp->tp_list);
269                                 list_add (&tp->tp_list, &cull);
270                         }
271                         continue;
272                 }
273
274                 if (tp->tp_nid == LNET_NID_ANY || /* fail every peer */
275                     nid == tp->tp_nid) {        /* fail this peer */
276                         fail = 1;
277
278                         if (tp->tp_threshold != LNET_MD_THRESH_INF) {
279                                 tp->tp_threshold--;
280                                 if (outgoing &&
281                                     tp->tp_threshold == 0) {
282                                         /* see above */
283                                         list_del (&tp->tp_list);
284                                         list_add (&tp->tp_list, &cull);
285                                 }
286                         }
287                         break;
288                 }
289         }
290
291         LNET_UNLOCK ();
292
293         while (!list_empty (&cull)) {
294                 tp = list_entry (cull.next, lnet_test_peer_t, tp_list);
295                 list_del (&tp->tp_list);
296
297                 LIBCFS_FREE(tp, sizeof (*tp));
298         }
299
300         return (fail);
301 }
302
303 unsigned int
304 lnet_iov_nob (unsigned int niov, struct iovec *iov)
305 {
306         unsigned int nob = 0;
307
308         while (niov-- > 0)
309                 nob += (iov++)->iov_len;
310
311         return (nob);
312 }
313
314 void
315 lnet_copy_iov2iov (unsigned int ndiov, struct iovec *diov, unsigned int doffset,
316                    unsigned int nsiov, struct iovec *siov, unsigned int soffset,
317                    unsigned int nob)
318 {
319         /* NB diov, siov are READ-ONLY */
320         unsigned int  this_nob;
321
322         if (nob == 0)
323                 return;
324
325         /* skip complete frags before 'doffset' */
326         LASSERT (ndiov > 0);
327         while (doffset >= diov->iov_len) {
328                 doffset -= diov->iov_len;
329                 diov++;
330                 ndiov--;
331                 LASSERT (ndiov > 0);
332         }
333         
334         /* skip complete frags before 'soffset' */
335         LASSERT (nsiov > 0);
336         while (soffset >= siov->iov_len) {
337                 soffset -= siov->iov_len;
338                 siov++;
339                 nsiov--;
340                 LASSERT (nsiov > 0);
341         }
342
343         do {
344                 LASSERT (ndiov > 0);
345                 LASSERT (nsiov > 0);
346                 this_nob = MIN(diov->iov_len - doffset,
347                                siov->iov_len - soffset);
348                 this_nob = MIN(this_nob, nob);
349
350                 memcpy ((char *)diov->iov_base + doffset,
351                         (char *)siov->iov_base + soffset, this_nob);
352                 nob -= this_nob;
353
354                 if (diov->iov_len > doffset + this_nob) {
355                         doffset += this_nob;
356                 } else {
357                         diov++;
358                         ndiov--;
359                         doffset = 0;
360                 }
361                 
362                 if (siov->iov_len > soffset + this_nob) {
363                         soffset += this_nob;
364                 } else {
365                         siov++;
366                         nsiov--;
367                         soffset = 0;
368                 }
369         } while (nob > 0);
370 }
371
372 int
373 lnet_extract_iov (int dst_niov, struct iovec *dst,
374                   int src_niov, struct iovec *src,
375                   unsigned int offset, unsigned int len)
376 {
377         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
378          * for exactly 'len' bytes, and return the number of entries.
379          * NB not destructive to 'src' */
380         unsigned int    frag_len;
381         unsigned int    niov;
382
383         if (len == 0)                           /* no data => */
384                 return (0);                     /* no frags */
385
386         LASSERT (src_niov > 0);
387         while (offset >= src->iov_len) {      /* skip initial frags */
388                 offset -= src->iov_len;
389                 src_niov--;
390                 src++;
391                 LASSERT (src_niov > 0);
392         }
393
394         niov = 1;
395         for (;;) {
396                 LASSERT (src_niov > 0);
397                 LASSERT (niov <= dst_niov);
398
399                 frag_len = src->iov_len - offset;
400                 dst->iov_base = ((char *)src->iov_base) + offset;
401
402                 if (len <= frag_len) {
403                         dst->iov_len = len;
404                         return (niov);
405                 }
406
407                 dst->iov_len = frag_len;
408
409                 len -= frag_len;
410                 dst++;
411                 src++;
412                 niov++;
413                 src_niov--;
414                 offset = 0;
415         }
416 }
417
418 #ifndef __KERNEL__
419 unsigned int
420 lnet_kiov_nob (unsigned int niov, lnet_kiov_t *kiov)
421 {
422         LASSERT (0);
423         return (0);
424 }
425
426 void
427 lnet_copy_kiov2kiov (unsigned int ndkiov, lnet_kiov_t *dkiov, unsigned int doffset,
428                      unsigned int nskiov, lnet_kiov_t *skiov, unsigned int soffset,
429                      unsigned int nob)
430 {
431         LASSERT (0);
432 }
433
434 void
435 lnet_copy_kiov2iov (unsigned int niov, struct iovec *iov, unsigned int iovoffset,
436                     unsigned int nkiov, lnet_kiov_t *kiov, unsigned int kiovoffset,
437                     unsigned int nob)
438 {
439         LASSERT (0);
440 }
441
442 void
443 lnet_copy_iov2kiov (unsigned int nkiov, lnet_kiov_t *kiov, unsigned int kiovoffset,
444                     unsigned int niov, struct iovec *iov, unsigned int iovoffset,
445                     unsigned int nob)
446 {
447         LASSERT (0);
448 }
449
450 int
451 lnet_extract_kiov (int dst_niov, lnet_kiov_t *dst,
452                    int src_niov, lnet_kiov_t *src,
453                    unsigned int offset, unsigned int len)
454 {
455         LASSERT (0);
456 }
457
458 #else /* __KERNEL__ */
459
460 unsigned int
461 lnet_kiov_nob (unsigned int niov, lnet_kiov_t *kiov)
462 {
463         unsigned int  nob = 0;
464
465         while (niov-- > 0)
466                 nob += (kiov++)->kiov_len;
467
468         return (nob);
469 }
470
471 void
472 lnet_copy_kiov2kiov (unsigned int ndiov, lnet_kiov_t *diov, unsigned int doffset,
473                      unsigned int nsiov, lnet_kiov_t *siov, unsigned int soffset,
474                      unsigned int nob)
475 {
476         /* NB diov, siov are READ-ONLY */
477         unsigned int    this_nob;
478         char           *daddr = NULL;
479         char           *saddr = NULL;
480
481         if (nob == 0)
482                 return;
483
484         LASSERT (!in_interrupt ());
485
486         LASSERT (ndiov > 0);
487         while (doffset >= diov->kiov_len) {
488                 doffset -= diov->kiov_len;
489                 diov++;
490                 ndiov--;
491                 LASSERT (ndiov > 0);
492         }
493
494         LASSERT (nsiov > 0);
495         while (soffset >= siov->kiov_len) {
496                 soffset -= siov->kiov_len;
497                 siov++;
498                 nsiov--;
499                 LASSERT (nsiov > 0);
500         }
501
502         do {
503                 LASSERT (ndiov > 0);
504                 LASSERT (nsiov > 0);
505                 this_nob = MIN(diov->kiov_len - doffset,
506                                siov->kiov_len - soffset);
507                 this_nob = MIN(this_nob, nob);
508
509                 if (daddr == NULL)
510                         daddr = ((char *)cfs_kmap(diov->kiov_page)) + 
511                                 diov->kiov_offset + doffset;
512                 if (saddr == NULL)
513                         saddr = ((char *)cfs_kmap(siov->kiov_page)) + 
514                                 siov->kiov_offset + soffset;
515
516                 /* Vanishing risk of kmap deadlock when mapping 2 pages.
517                  * However in practice at least one of the kiovs will be mapped
518                  * kernel pages and the map/unmap will be NOOPs */
519
520                 memcpy (daddr, saddr, this_nob);
521                 nob -= this_nob;
522
523                 if (diov->kiov_len > doffset + this_nob) {
524                         daddr += this_nob;
525                         doffset += this_nob;
526                 } else {
527                         cfs_kunmap(diov->kiov_page);
528                         daddr = NULL;
529                         diov++;
530                         ndiov--;
531                         doffset = 0;
532                 }
533
534                 if (siov->kiov_len > soffset + this_nob) {
535                         saddr += this_nob;
536                         soffset += this_nob;
537                 } else {
538                         cfs_kunmap(siov->kiov_page);
539                         saddr = NULL;
540                         siov++;
541                         nsiov--;
542                         soffset = 0;
543                 }
544         } while (nob > 0);
545
546         if (daddr != NULL)
547                 cfs_kunmap(diov->kiov_page);
548         if (saddr != NULL)
549                 cfs_kunmap(siov->kiov_page);
550 }
551
552 void
553 lnet_copy_kiov2iov (unsigned int niov, struct iovec *iov, unsigned int iovoffset,
554                     unsigned int nkiov, lnet_kiov_t *kiov, unsigned int kiovoffset,
555                     unsigned int nob)
556 {
557         /* NB iov, kiov are READ-ONLY */
558         unsigned int    this_nob;
559         char           *addr = NULL;
560
561         if (nob == 0)
562                 return;
563
564         LASSERT (!in_interrupt ());
565
566         LASSERT (niov > 0);
567         while (iovoffset >= iov->iov_len) {
568                 iovoffset -= iov->iov_len;
569                 iov++;
570                 niov--;
571                 LASSERT (niov > 0);
572         }
573
574         LASSERT (nkiov > 0);
575         while (kiovoffset >= kiov->kiov_len) {
576                 kiovoffset -= kiov->kiov_len;
577                 kiov++;
578                 nkiov--;
579                 LASSERT (nkiov > 0);
580         }
581
582         do {
583                 LASSERT (niov > 0);
584                 LASSERT (nkiov > 0);
585                 this_nob = MIN(iov->iov_len - iovoffset,
586                                kiov->kiov_len - kiovoffset);
587                 this_nob = MIN(this_nob, nob);
588
589                 if (addr == NULL)
590                         addr = ((char *)cfs_kmap(kiov->kiov_page)) + 
591                                 kiov->kiov_offset + kiovoffset;
592
593                 memcpy ((char *)iov->iov_base + iovoffset, addr, this_nob);
594                 nob -= this_nob;
595
596                 if (iov->iov_len > iovoffset + this_nob) {
597                         iovoffset += this_nob;
598                 } else {
599                         iov++;
600                         niov--;
601                         iovoffset = 0;
602                 }
603
604                 if (kiov->kiov_len > kiovoffset + this_nob) {
605                         addr += this_nob;
606                         kiovoffset += this_nob;
607                 } else {
608                         cfs_kunmap(kiov->kiov_page);
609                         addr = NULL;
610                         kiov++;
611                         nkiov--;
612                         kiovoffset = 0;
613                 }
614
615         } while (nob > 0);
616
617         if (addr != NULL)
618                 cfs_kunmap(kiov->kiov_page);
619 }
620
621 void
622 lnet_copy_iov2kiov (unsigned int nkiov, lnet_kiov_t *kiov, unsigned int kiovoffset,
623                     unsigned int niov, struct iovec *iov, unsigned int iovoffset,
624                     unsigned int nob)
625 {
626         /* NB kiov, iov are READ-ONLY */
627         unsigned int    this_nob;
628         char           *addr = NULL;
629
630         if (nob == 0)
631                 return;
632
633         LASSERT (!in_interrupt ());
634
635         LASSERT (nkiov > 0);
636         while (kiovoffset >= kiov->kiov_len) {
637                 kiovoffset -= kiov->kiov_len;
638                 kiov++;
639                 nkiov--;
640                 LASSERT (nkiov > 0);
641         }
642
643         LASSERT (niov > 0);
644         while (iovoffset >= iov->iov_len) {
645                 iovoffset -= iov->iov_len;
646                 iov++;
647                 niov--;
648                 LASSERT (niov > 0);
649         }
650
651         do {
652                 LASSERT (nkiov > 0);
653                 LASSERT (niov > 0);
654                 this_nob = MIN(kiov->kiov_len - kiovoffset,
655                                iov->iov_len - iovoffset);
656                 this_nob = MIN(this_nob, nob);
657
658                 if (addr == NULL)
659                         addr = ((char *)cfs_kmap(kiov->kiov_page)) + 
660                                 kiov->kiov_offset + kiovoffset;
661
662                 memcpy (addr, (char *)iov->iov_base + iovoffset, this_nob);
663                 nob -= this_nob;
664
665                 if (kiov->kiov_len > kiovoffset + this_nob) {
666                         addr += this_nob;
667                         kiovoffset += this_nob;
668                 } else {
669                         cfs_kunmap(kiov->kiov_page);
670                         addr = NULL;
671                         kiov++;
672                         nkiov--;
673                         kiovoffset = 0;
674                 }
675
676                 if (iov->iov_len > iovoffset + this_nob) {
677                         iovoffset += this_nob;
678                 } else {
679                         iov++;
680                         niov--;
681                         iovoffset = 0;
682                 }
683         } while (nob > 0);
684
685         if (addr != NULL)
686                 cfs_kunmap(kiov->kiov_page);
687 }
688
689 int
690 lnet_extract_kiov (int dst_niov, lnet_kiov_t *dst,
691                    int src_niov, lnet_kiov_t *src,
692                    unsigned int offset, unsigned int len)
693 {
694         /* Initialise 'dst' to the subset of 'src' starting at 'offset',
695          * for exactly 'len' bytes, and return the number of entries.
696          * NB not destructive to 'src' */
697         unsigned int    frag_len;
698         unsigned int    niov;
699
700         if (len == 0)                           /* no data => */
701                 return (0);                     /* no frags */
702
703         LASSERT (src_niov > 0);
704         while (offset >= src->kiov_len) {      /* skip initial frags */
705                 offset -= src->kiov_len;
706                 src_niov--;
707                 src++;
708                 LASSERT (src_niov > 0);
709         }
710
711         niov = 1;
712         for (;;) {
713                 LASSERT (src_niov > 0);
714                 LASSERT (niov <= dst_niov);
715
716                 frag_len = src->kiov_len - offset;
717                 dst->kiov_page = src->kiov_page;
718                 dst->kiov_offset = src->kiov_offset + offset;
719
720                 if (len <= frag_len) {
721                         dst->kiov_len = len;
722                         LASSERT (dst->kiov_offset + dst->kiov_len <= CFS_PAGE_SIZE);
723                         return (niov);
724                 }
725
726                 dst->kiov_len = frag_len;
727                 LASSERT (dst->kiov_offset + dst->kiov_len <= CFS_PAGE_SIZE);
728
729                 len -= frag_len;
730                 dst++;
731                 src++;
732                 niov++;
733                 src_niov--;
734                 offset = 0;
735         }
736 }
737 #endif
738
739 void
740 lnet_ni_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
741              unsigned int offset, unsigned int mlen, unsigned int rlen)
742 {
743         unsigned int  niov = 0;
744         struct iovec *iov = NULL;
745         lnet_kiov_t  *kiov = NULL;
746         int           rc;
747
748         LASSERT (!in_interrupt ());
749         LASSERT (mlen == 0 || msg != NULL);
750         
751         if (msg != NULL) {
752                 LASSERT(msg->msg_receiving);
753                 LASSERT(!msg->msg_sending);
754                 LASSERT(rlen == msg->msg_len);
755                 LASSERT(mlen <= msg->msg_len);
756
757                 msg->msg_wanted = mlen;
758                 msg->msg_offset = offset;
759                 msg->msg_receiving = 0;
760
761                 if (mlen != 0) {
762                         niov = msg->msg_niov;
763                         iov  = msg->msg_iov;
764                         kiov = msg->msg_kiov;
765                 
766                         LASSERT (niov > 0);
767                         LASSERT ((iov == NULL) != (kiov == NULL));
768                 }
769         }
770         
771         rc = (ni->ni_lnd->lnd_recv)(ni, private, msg, delayed,
772                                     niov, iov, kiov, offset, mlen, rlen);
773         if (rc < 0)
774                 lnet_finalize(ni, msg, rc);
775 }
776
777 int
778 lnet_compare_routers(lnet_peer_t *p1, lnet_peer_t *p2)
779 {
780         if (p1->lp_txqnob < p2->lp_txqnob)
781                 return 1;
782         
783         if (p1->lp_txqnob > p2->lp_txqnob)
784                 return -1;
785         
786         if (p1->lp_txcredits > p2->lp_txcredits)
787                 return 1;
788         
789         if (p1->lp_txcredits < p2->lp_txcredits)
790                 return -1;
791         
792         return 0;
793 }
794
795
796 void
797 lnet_setpayloadbuffer(lnet_msg_t *msg)
798 {
799         lnet_libmd_t *md = msg->msg_md;
800
801         LASSERT (msg->msg_len > 0);
802         LASSERT (!msg->msg_routing);
803         LASSERT (md != NULL);
804         LASSERT (msg->msg_niov == 0);
805         LASSERT (msg->msg_iov == NULL);
806         LASSERT (msg->msg_kiov == NULL);
807
808         msg->msg_niov = md->md_niov;
809         if ((md->md_options & LNET_MD_KIOV) != 0)
810                 msg->msg_kiov = md->md_iov.kiov;
811         else
812                 msg->msg_iov = md->md_iov.iov;
813 }
814
815 void
816 lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
817                unsigned int offset, unsigned int len) 
818 {
819         msg->msg_type = type;
820         msg->msg_target = target;
821         msg->msg_len = len;
822         msg->msg_offset = offset;
823
824         if (len != 0)
825                 lnet_setpayloadbuffer(msg);
826
827         memset (&msg->msg_hdr, 0, sizeof (msg->msg_hdr));
828         msg->msg_hdr.type           = cpu_to_le32(type);
829         msg->msg_hdr.dest_nid       = cpu_to_le64(target.nid);
830         msg->msg_hdr.dest_pid       = cpu_to_le32(target.pid);
831         /* src_nid will be set later */
832         msg->msg_hdr.src_pid        = cpu_to_le32(the_lnet.ln_pid);
833         msg->msg_hdr.payload_length = cpu_to_le32(len);
834 }
835
836 void
837 lnet_ni_send(lnet_ni_t *ni, lnet_msg_t *msg) 
838 {
839         void   *priv = msg->msg_private;
840         int     rc;
841
842         LASSERT (!in_interrupt ());
843         LASSERT (LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND ||
844                  (msg->msg_txcredit && msg->msg_peertxcredit));
845
846         rc = (ni->ni_lnd->lnd_send)(ni, priv, msg);
847         if (rc < 0)
848                 lnet_finalize(ni, msg, rc);
849 }
850
851 int
852 lnet_eager_recv_locked(lnet_msg_t *msg)
853 {
854         lnet_peer_t *peer;
855         lnet_ni_t   *ni;
856         int          rc = 0;
857
858         LASSERT (!msg->msg_delayed);
859         msg->msg_delayed = 1;
860
861         LASSERT (msg->msg_receiving);
862         LASSERT (!msg->msg_sending);
863         
864         peer = msg->msg_rxpeer;
865         ni   = peer->lp_ni;
866
867         if (ni->ni_lnd->lnd_eager_recv != NULL) {
868                 LNET_UNLOCK();
869                         
870                 rc = (ni->ni_lnd->lnd_eager_recv)(ni, msg->msg_private, msg, 
871                                                   &msg->msg_private);
872                 if (rc != 0) {
873                         CERROR("recv from %s / send to %s aborted: "
874                                "eager_recv failed %d\n",
875                                libcfs_nid2str(peer->lp_nid),
876                                libcfs_id2str(msg->msg_target), rc);
877                         LASSERT (rc < 0); /* required by my callers */
878                 }
879
880                 LNET_LOCK();
881         }
882
883         return rc;
884 }
885
886 int
887 lnet_post_send_locked (lnet_msg_t *msg, int do_send)
888 {
889         /* lnet_send is going to LNET_UNLOCK immediately after this, so it sets
890          * do_send FALSE and I don't do the unlock/send/lock bit.  I return
891          * EAGAIN if msg blocked and 0 if sent or OK to send */
892         lnet_peer_t *lp = msg->msg_txpeer;
893         lnet_ni_t   *ni = lp->lp_ni;
894
895         /* non-lnet_send() callers have checked before */
896         LASSERT (!do_send || msg->msg_delayed);
897         LASSERT (!msg->msg_receiving);
898
899         if (!msg->msg_peertxcredit) {
900                 LASSERT ((lp->lp_txcredits < 0) == !list_empty(&lp->lp_txq));
901
902                 msg->msg_peertxcredit = 1;
903                 lp->lp_txqnob += msg->msg_len + sizeof(lnet_hdr_t);
904                 lp->lp_txcredits--;
905
906                 if (lp->lp_txcredits < lp->lp_mintxcredits)
907                         lp->lp_mintxcredits = lp->lp_txcredits;
908
909                 if (lp->lp_txcredits < 0) {
910                         msg->msg_delayed = 1;
911                         list_add_tail (&msg->msg_list, &lp->lp_txq);
912                         return EAGAIN;
913                 }
914         }
915         
916         if (!msg->msg_txcredit) {
917                 LASSERT ((ni->ni_txcredits < 0) == !list_empty(&ni->ni_txq));
918
919                 msg->msg_txcredit = 1;
920                 ni->ni_txcredits--;
921
922                 if (ni->ni_txcredits < ni->ni_mintxcredits)
923                         ni->ni_mintxcredits = ni->ni_txcredits;
924
925                 if (ni->ni_txcredits < 0) {
926                         msg->msg_delayed = 1;
927                         list_add_tail (&msg->msg_list, &ni->ni_txq);
928                         return EAGAIN;
929                 }
930         }
931
932         if (do_send) {
933                 LNET_UNLOCK();
934                 lnet_ni_send(ni, msg);
935                 LNET_LOCK();
936         }
937         return 0;
938 }
939
940 #ifdef __KERNEL__
941 static void
942 lnet_commit_routedmsg (lnet_msg_t *msg)
943 {
944         /* ALWAYS called holding the LNET_LOCK */
945         LASSERT (msg->msg_routing);
946         
947         the_lnet.ln_counters.msgs_alloc++;
948         if (the_lnet.ln_counters.msgs_alloc > 
949             the_lnet.ln_counters.msgs_max)
950                 the_lnet.ln_counters.msgs_max = 
951                         the_lnet.ln_counters.msgs_alloc;
952
953         the_lnet.ln_counters.route_count++;
954         the_lnet.ln_counters.route_length += msg->msg_len;
955         
956         LASSERT (!msg->msg_onactivelist);
957         msg->msg_onactivelist = 1;
958         list_add (&msg->msg_activelist, &the_lnet.ln_active_msgs);
959 }
960
961 lnet_rtrbufpool_t *
962 lnet_msg2bufpool(lnet_msg_t *msg) 
963 {
964         lnet_rtrbufpool_t *rbp = &the_lnet.ln_rtrpools[0];
965
966         LASSERT (msg->msg_len <= LNET_MTU);
967         while (msg->msg_len > rbp->rbp_npages * CFS_PAGE_SIZE) {
968                 rbp++;
969                 LASSERT (rbp < &the_lnet.ln_rtrpools[LNET_NRBPOOLS]);
970         }
971
972         return rbp;
973 }
974
975 int
976 lnet_post_routed_recv_locked (lnet_msg_t *msg, int do_recv)
977 {
978         /* lnet_parse is going to LNET_UNLOCK immediately after this, so it
979          * sets do_recv FALSE and I don't do the unlock/send/lock bit.  I
980          * return EAGAIN if msg blocked and 0 if sent or OK to send */
981         lnet_peer_t         *lp = msg->msg_rxpeer;
982         lnet_rtrbufpool_t   *rbp;
983         lnet_rtrbuf_t       *rb;
984
985         LASSERT (msg->msg_iov == NULL);
986         LASSERT (msg->msg_kiov == NULL);
987         LASSERT (msg->msg_niov == 0);
988         LASSERT (msg->msg_routing);
989         LASSERT (msg->msg_receiving);
990         LASSERT (!msg->msg_sending);
991
992         /* non-lnet_parse callers only send delayed messages */
993         LASSERT (!do_recv || msg->msg_delayed);
994
995         if (!msg->msg_peerrtrcredit) {
996                 LASSERT ((lp->lp_rtrcredits < 0) == !list_empty(&lp->lp_rtrq));
997                 
998                 msg->msg_peerrtrcredit = 1;
999                 lp->lp_rtrcredits--;
1000                 if (lp->lp_rtrcredits < lp->lp_minrtrcredits)
1001                         lp->lp_minrtrcredits = lp->lp_rtrcredits;
1002                         
1003                 if (lp->lp_rtrcredits < 0) {
1004                         /* must have checked eager_recv before here */
1005                         LASSERT (msg->msg_delayed);
1006                         list_add_tail(&msg->msg_list, &lp->lp_rtrq);
1007                         return EAGAIN;
1008                 }
1009         }
1010         
1011         rbp = lnet_msg2bufpool(msg);
1012
1013         if (!msg->msg_rtrcredit) {
1014                 LASSERT ((rbp->rbp_credits < 0) == !list_empty(&rbp->rbp_msgs));
1015
1016                 msg->msg_rtrcredit = 1;
1017                 rbp->rbp_credits--;
1018                 if (rbp->rbp_credits < rbp->rbp_mincredits)
1019                         rbp->rbp_mincredits = rbp->rbp_credits;
1020
1021                 if (rbp->rbp_credits < 0) {
1022                         /* must have checked eager_recv before here */
1023                         LASSERT (msg->msg_delayed);
1024                         list_add_tail(&msg->msg_list, &rbp->rbp_msgs);
1025                         return EAGAIN;
1026                 }
1027         }
1028         
1029         LASSERT (!list_empty(&rbp->rbp_bufs));
1030         rb = list_entry(rbp->rbp_bufs.next, lnet_rtrbuf_t, rb_list);
1031         list_del(&rb->rb_list);
1032         
1033         msg->msg_niov = rbp->rbp_npages;
1034         msg->msg_kiov = &rb->rb_kiov[0];
1035
1036         if (do_recv) {
1037                 LNET_UNLOCK();
1038                 lnet_ni_recv(lp->lp_ni, msg->msg_private, msg, 1,
1039                              0, msg->msg_len, msg->msg_len);
1040                 LNET_LOCK();
1041         }
1042         return 0;
1043 }
1044 #endif
1045
1046 void
1047 lnet_return_credits_locked (lnet_msg_t *msg)
1048 {
1049         lnet_peer_t       *txpeer = msg->msg_txpeer;
1050         lnet_peer_t       *rxpeer = msg->msg_rxpeer;
1051         lnet_msg_t        *msg2;
1052         lnet_ni_t         *ni;
1053
1054         if (msg->msg_txcredit) {
1055                 /* give back NI txcredits */
1056                 msg->msg_txcredit = 0;
1057                 ni = txpeer->lp_ni;
1058
1059                 LASSERT((ni->ni_txcredits < 0) == !list_empty(&ni->ni_txq));
1060
1061                 ni->ni_txcredits++;
1062                 if (ni->ni_txcredits <= 0) {
1063                         msg2 = list_entry(ni->ni_txq.next, lnet_msg_t, msg_list);
1064                         list_del(&msg2->msg_list);
1065
1066                         LASSERT(msg2->msg_txpeer->lp_ni == ni);
1067                         LASSERT(msg2->msg_delayed);
1068
1069                         (void) lnet_post_send_locked(msg2, 1);
1070                 }
1071         }
1072
1073         if (msg->msg_peertxcredit) {
1074                 /* give back peer txcredits */
1075                 msg->msg_peertxcredit = 0;
1076
1077                 LASSERT((txpeer->lp_txcredits < 0) == !list_empty(&txpeer->lp_txq));
1078
1079                 txpeer->lp_txqnob -= msg->msg_len + sizeof(lnet_hdr_t);
1080                 LASSERT (txpeer->lp_txqnob >= 0);
1081
1082                 txpeer->lp_txcredits++;
1083                 if (txpeer->lp_txcredits <= 0) {
1084                         msg2 = list_entry(txpeer->lp_txq.next, 
1085                                           lnet_msg_t, msg_list);
1086                         list_del(&msg2->msg_list);
1087
1088                         LASSERT (msg2->msg_txpeer == txpeer);
1089                         LASSERT (msg2->msg_delayed);
1090
1091                         (void) lnet_post_send_locked(msg2, 1);
1092                 }
1093         }
1094
1095         if (txpeer != NULL) {
1096                 msg->msg_txpeer = NULL;
1097                 lnet_peer_decref_locked(txpeer);
1098         }
1099
1100 #ifdef __KERNEL__        
1101         if (msg->msg_rtrcredit) {
1102                 /* give back global router credits */
1103                 lnet_rtrbuf_t     *rb;
1104                 lnet_rtrbufpool_t *rbp;
1105
1106                 /* NB If a msg ever blocks for a buffer in rbp_msgs, it stays
1107                  * there until it gets one allocated, or aborts the wait
1108                  * itself */
1109                 LASSERT (msg->msg_kiov != NULL);
1110                 
1111                 rb = list_entry(msg->msg_kiov, lnet_rtrbuf_t, rb_kiov[0]);
1112                 rbp = rb->rb_pool;
1113                 LASSERT (rbp == lnet_msg2bufpool(msg));
1114
1115                 msg->msg_kiov = NULL;
1116                 msg->msg_rtrcredit = 0;
1117                 
1118                 LASSERT((rbp->rbp_credits < 0) == !list_empty(&rbp->rbp_msgs));
1119                 LASSERT((rbp->rbp_credits > 0) == !list_empty(&rbp->rbp_bufs));
1120
1121                 list_add(&rb->rb_list, &rbp->rbp_bufs);
1122                 rbp->rbp_credits++;
1123                 if (rbp->rbp_credits <= 0) {
1124                         msg2 = list_entry(rbp->rbp_msgs.next, 
1125                                           lnet_msg_t, msg_list);
1126                         list_del(&msg2->msg_list);
1127                         
1128                         (void) lnet_post_routed_recv_locked(msg2, 1);
1129                 }
1130         }
1131         
1132         if (msg->msg_peerrtrcredit) {
1133                 /* give back peer router credits */
1134                 msg->msg_peerrtrcredit = 0;
1135                 
1136                 LASSERT((rxpeer->lp_rtrcredits < 0) == !list_empty(&rxpeer->lp_rtrq));
1137
1138                 rxpeer->lp_rtrcredits++;
1139                 if (rxpeer->lp_rtrcredits <= 0) {
1140                         msg2 = list_entry(rxpeer->lp_rtrq.next,
1141                                           lnet_msg_t, msg_list);
1142                         list_del(&msg2->msg_list);
1143                         
1144                         (void) lnet_post_routed_recv_locked(msg2, 1);
1145                 }
1146         }
1147 #else
1148         LASSERT (!msg->msg_rtrcredit);
1149         LASSERT (!msg->msg_peerrtrcredit);
1150 #endif
1151         if (rxpeer != NULL) {
1152                 msg->msg_rxpeer = NULL;
1153                 lnet_peer_decref_locked(rxpeer);
1154         }
1155 }
1156
1157 int
1158 lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
1159 {
1160         lnet_nid_t        dst_nid = msg->msg_target.nid;
1161         lnet_ni_t        *src_ni;
1162         lnet_ni_t        *local_ni;
1163         lnet_remotenet_t *rnet;
1164         lnet_route_t     *route;
1165         lnet_route_t     *best_route;
1166         struct list_head *tmp;
1167         lnet_peer_t      *lp;
1168         lnet_peer_t      *lp2;
1169         int               rc;
1170
1171         LASSERT (msg->msg_txpeer == NULL);
1172         LASSERT (!msg->msg_sending);
1173         LASSERT (!msg->msg_target_is_router);
1174         LASSERT (!msg->msg_receiving);
1175
1176         msg->msg_sending = 1;
1177
1178         /* NB! ni != NULL == interface pre-determined (ACK/REPLY) */
1179
1180         LNET_LOCK();
1181
1182         if (the_lnet.ln_shutdown) {
1183                 LNET_UNLOCK();
1184                 return -ESHUTDOWN;
1185         }
1186
1187         if (src_nid == LNET_NID_ANY) {
1188                 src_ni = NULL;
1189         } else {
1190                 src_ni = lnet_nid2ni_locked(src_nid);
1191                 if (src_ni == NULL) {
1192                         LNET_UNLOCK();
1193                         CERROR("Can't send to %s: src %s is not a local nid\n",
1194                                libcfs_nid2str(dst_nid), libcfs_nid2str(src_nid));
1195                         return -EINVAL;
1196                 }
1197                 LASSERT (!msg->msg_routing);
1198         }
1199
1200         /* Is this for someone on a local network? */ 
1201         local_ni = lnet_net2ni_locked(LNET_NIDNET(dst_nid));
1202
1203         if (local_ni != NULL) {
1204                 if (src_ni == NULL) {
1205                         src_ni = local_ni;
1206                         src_nid = src_ni->ni_nid;
1207                 } else if (src_ni == local_ni) {
1208                         lnet_ni_decref_locked(local_ni);
1209                 } else {
1210                         lnet_ni_decref_locked(local_ni);
1211                         lnet_ni_decref_locked(src_ni);
1212                         LNET_UNLOCK();
1213                         CERROR("no route to %s via from %s\n",
1214                                libcfs_nid2str(dst_nid), libcfs_nid2str(src_nid));
1215                         return -EINVAL;
1216                 }
1217
1218                 LASSERT (src_nid != LNET_NID_ANY);
1219
1220                 if (!msg->msg_routing) {
1221                         src_nid = lnet_ptlcompat_srcnid(src_nid, dst_nid);
1222                         msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
1223                 }
1224                 
1225                 if (src_ni == the_lnet.ln_loni) {
1226                         /* No send credit hassles with LOLND */
1227                         LNET_UNLOCK();
1228                         lnet_ni_send(src_ni, msg);
1229                         lnet_ni_decref(src_ni);
1230                         return 0;
1231                 }
1232                 
1233                 rc = lnet_nid2peer_locked(&lp, dst_nid);
1234                 lnet_ni_decref_locked(src_ni);  /* lp has ref on src_ni; lose mine */
1235                 if (rc != 0) {
1236                         LNET_UNLOCK();
1237                         CERROR("Error %d finding peer %s\n", rc,
1238                                libcfs_nid2str(dst_nid));
1239                         /* ENOMEM or shutting down */
1240                         return rc;
1241                 }
1242                 LASSERT (lp->lp_ni == src_ni);
1243         } else {
1244                 /* sending to a remote network */
1245                 rnet = lnet_find_net_locked(LNET_NIDNET(dst_nid));
1246                 if (rnet == NULL) {
1247                         if (src_ni != NULL)
1248                                 lnet_ni_decref_locked(src_ni);
1249                         LNET_UNLOCK();
1250                         CERROR("No route to %s\n", libcfs_id2str(msg->msg_target));
1251                         return -EHOSTUNREACH;
1252                 }
1253
1254                 /* Find the best gateway I can use */
1255                 lp = NULL;
1256                 best_route = NULL;
1257                 list_for_each(tmp, &rnet->lrn_routes) {
1258                         route = list_entry(tmp, lnet_route_t, lr_list);
1259                         lp2 = route->lr_gateway;
1260
1261                         if (lp2->lp_alive &&
1262                             (src_ni == NULL || lp2->lp_ni == src_ni) &&
1263                             (lp == NULL || lnet_compare_routers(lp2, lp) > 0)) {
1264                                 best_route = route;
1265                                 lp = lp2;
1266                         }
1267                 }
1268
1269                 if (lp == NULL) {
1270                         if (src_ni != NULL)
1271                                 lnet_ni_decref_locked(src_ni);
1272                         LNET_UNLOCK();
1273                         CERROR("No route to %s (all routers down)\n", 
1274                                libcfs_id2str(msg->msg_target));
1275                         return -EHOSTUNREACH;
1276                 }
1277
1278                 /* Place selected route at the end of the route list to ensure
1279                  * fairness; everything else being equal... */
1280                 list_del(&best_route->lr_list);
1281                 list_add_tail(&best_route->lr_list, &rnet->lrn_routes);
1282
1283                 if (src_ni == NULL) {
1284                         src_ni = lp->lp_ni;
1285                         src_nid = src_ni->ni_nid;
1286                 } else {
1287                         LASSERT (src_ni == lp->lp_ni);
1288                         lnet_ni_decref_locked(src_ni);
1289                 }
1290
1291                 lnet_peer_addref_locked(lp);
1292
1293                 LASSERT (src_nid != LNET_NID_ANY);
1294
1295                 if (!msg->msg_routing) {
1296                         /* I'm the source and now I know which NI to send on */
1297                         src_nid = lnet_ptlcompat_srcnid(src_nid, dst_nid);
1298                         msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
1299                 }
1300
1301                 msg->msg_target_is_router = 1;
1302                 msg->msg_target.nid = lp->lp_nid;
1303                 msg->msg_target.pid = LUSTRE_SRV_LNET_PID;
1304         }
1305
1306         /* 'lp' is our best choice of peer */
1307
1308         LASSERT (!msg->msg_peertxcredit);
1309         LASSERT (!msg->msg_txcredit);
1310         LASSERT (msg->msg_txpeer == NULL);
1311
1312         msg->msg_txpeer = lp;                   /* msg takes my ref on lp */
1313
1314         rc = lnet_post_send_locked(msg, 0);
1315         LNET_UNLOCK();
1316
1317         if (rc == 0)
1318                 lnet_ni_send(src_ni, msg);
1319
1320         return 0;
1321 }
1322
1323 static void
1324 lnet_commit_md (lnet_libmd_t *md, lnet_msg_t *msg)
1325 {
1326         /* ALWAYS called holding the LNET_LOCK */
1327         /* Here, we commit the MD to a network OP by marking it busy and
1328          * decrementing its threshold.  Come what may, the network "owns"
1329          * the MD until a call to lnet_finalize() signals completion. */
1330         LASSERT (!msg->msg_routing);
1331
1332         msg->msg_md = md;
1333
1334         md->md_refcount++;
1335         if (md->md_threshold != LNET_MD_THRESH_INF) {
1336                 LASSERT (md->md_threshold > 0);
1337                 md->md_threshold--;
1338         }
1339
1340         the_lnet.ln_counters.msgs_alloc++;
1341         if (the_lnet.ln_counters.msgs_alloc > 
1342             the_lnet.ln_counters.msgs_max)
1343                 the_lnet.ln_counters.msgs_max = 
1344                         the_lnet.ln_counters.msgs_alloc;
1345
1346         LASSERT (!msg->msg_onactivelist);
1347         msg->msg_onactivelist = 1;
1348         list_add (&msg->msg_activelist, &the_lnet.ln_active_msgs);
1349 }
1350
1351 static void
1352 lnet_drop_message (lnet_ni_t *ni, void *private, unsigned int nob)
1353 {
1354         LNET_LOCK();
1355         the_lnet.ln_counters.drop_count++;
1356         the_lnet.ln_counters.drop_length += nob;
1357         LNET_UNLOCK();
1358         
1359         lnet_ni_recv(ni, private, NULL, 0, 0, 0, nob);
1360 }
1361
1362 static void
1363 lnet_drop_delayed_put(lnet_msg_t *msg, char *reason) 
1364 {
1365         LASSERT (msg->msg_md == NULL);
1366         LASSERT (msg->msg_delayed);
1367         LASSERT (msg->msg_rxpeer != NULL);
1368         LASSERT (msg->msg_hdr.type == LNET_MSG_PUT);
1369
1370         CWARN("Dropping delayed PUT from %s portal %d match "LPU64
1371               " offset %d length %d: %s\n", 
1372               libcfs_id2str((lnet_process_id_t){        
1373                       .nid = msg->msg_hdr.src_nid,
1374                       .pid = msg->msg_hdr.src_pid}),
1375               msg->msg_hdr.msg.put.ptl_index, 
1376               msg->msg_hdr.msg.put.match_bits, 
1377               msg->msg_hdr.msg.put.offset,
1378               msg->msg_hdr.payload_length,
1379               reason);
1380
1381         /* NB I can't drop msg's ref on msg_rxpeer until after I've
1382          * called lnet_drop_message(), so I just hang onto msg as well
1383          * until that's done */
1384
1385         lnet_drop_message(msg->msg_rxpeer->lp_ni, 
1386                           msg->msg_private, msg->msg_len);
1387
1388         LNET_LOCK();
1389
1390         lnet_peer_decref_locked(msg->msg_rxpeer);
1391         msg->msg_rxpeer = NULL;
1392                 
1393         lnet_msg_free(msg);
1394                 
1395         LNET_UNLOCK();
1396 }
1397
1398 int
1399 LNetSetLazyPortal(int portal)
1400 {
1401         lnet_portal_t *ptl = &the_lnet.ln_portals[portal];
1402
1403         if (portal < 0 || portal >= the_lnet.ln_nportals)
1404                 return -EINVAL;
1405
1406         CDEBUG(D_NET, "Setting portal %d lazy\n", portal);
1407
1408         LNET_LOCK();
1409
1410         ptl->ptl_options |= LNET_PTL_LAZY;
1411
1412         LNET_UNLOCK();
1413
1414         return 0;
1415 }
1416
1417 int
1418 LNetClearLazyPortal(int portal)
1419 {
1420         struct list_head  zombies;
1421         lnet_portal_t    *ptl = &the_lnet.ln_portals[portal];
1422         lnet_msg_t       *msg;
1423
1424         if (portal < 0 || portal >= the_lnet.ln_nportals)
1425                 return -EINVAL;
1426
1427         LNET_LOCK();
1428
1429         if ((ptl->ptl_options & LNET_PTL_LAZY) == 0) {
1430                 LNET_UNLOCK();
1431                 return 0;
1432         }
1433
1434         if (the_lnet.ln_shutdown)
1435                 CWARN ("Active lazy portal %d on exit\n", portal);
1436         else
1437                 CDEBUG (D_NET, "clearing portal %d lazy\n", portal);
1438
1439         /* grab all the blocked messages atomically */
1440         list_add(&zombies, &ptl->ptl_msgq);
1441         list_del_init(&ptl->ptl_msgq);
1442
1443         ptl->ptl_msgq_version++;
1444         ptl->ptl_options &= ~LNET_PTL_LAZY;
1445
1446         LNET_UNLOCK();
1447         
1448         while (!list_empty(&zombies)) {
1449                 msg = list_entry(zombies.next, lnet_msg_t, msg_list);
1450                 list_del(&msg->msg_list);
1451
1452                 lnet_drop_delayed_put(msg, "Clearing lazy portal attr");
1453         }
1454
1455         return 0;
1456 }
1457
1458 static void
1459 lnet_recv_put(lnet_libmd_t *md, lnet_msg_t *msg, int delayed,
1460               unsigned int offset, unsigned int mlength)
1461 {
1462         lnet_hdr_t       *hdr = &msg->msg_hdr;
1463
1464         LNET_LOCK();
1465
1466         the_lnet.ln_counters.recv_count++;
1467         the_lnet.ln_counters.recv_length += mlength;
1468
1469         LNET_UNLOCK();
1470
1471         if (mlength != 0)
1472                 lnet_setpayloadbuffer(msg);
1473
1474         msg->msg_ev.type       = LNET_EVENT_PUT;
1475         msg->msg_ev.target.pid = hdr->dest_pid;
1476         msg->msg_ev.target.nid = hdr->dest_nid;
1477         msg->msg_ev.hdr_data   = hdr->msg.put.hdr_data;
1478
1479         /* Must I ACK?  If so I'll grab the ack_wmd out of the header and put
1480          * it back into the ACK during lnet_finalize() */
1481         msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
1482                         (md->md_options & LNET_MD_ACK_DISABLE) == 0);
1483         
1484         lnet_ni_recv(msg->msg_rxpeer->lp_ni, 
1485                      msg->msg_private, 
1486                      msg, delayed, offset, mlength, 
1487                      hdr->payload_length);
1488 }
1489
1490 /* called with LNET_LOCK held */
1491 void
1492 lnet_match_blocked_msg(lnet_libmd_t *md)
1493 {
1494         CFS_LIST_HEAD    (drops);
1495         CFS_LIST_HEAD    (matches);
1496         struct list_head *tmp;
1497         struct list_head *entry;
1498         lnet_msg_t       *msg;
1499         lnet_me_t        *me  = md->md_me;
1500         lnet_portal_t    *ptl = &the_lnet.ln_portals[me->me_portal];
1501
1502         LASSERT (me->me_portal < the_lnet.ln_nportals);
1503
1504         if ((ptl->ptl_options & LNET_PTL_LAZY) == 0) {
1505                 LASSERT (list_empty(&ptl->ptl_msgq));
1506                 return;
1507         }
1508
1509         LASSERT (md->md_refcount == 0); /* a brand new MD */
1510
1511         list_for_each_safe (entry, tmp, &ptl->ptl_msgq) {
1512                 int               rc;
1513                 int               index;
1514                 unsigned int      mlength;
1515                 unsigned int      offset;
1516                 lnet_hdr_t       *hdr;
1517                 lnet_process_id_t src;
1518
1519                 msg = list_entry(entry, lnet_msg_t, msg_list);
1520
1521                 LASSERT (msg->msg_delayed);
1522
1523                 hdr   = &msg->msg_hdr;
1524                 index = hdr->msg.put.ptl_index;
1525
1526                 src.nid = hdr->src_nid;
1527                 src.pid = hdr->src_pid;
1528
1529                 rc = lnet_try_match_md(index, LNET_MD_OP_PUT, src,
1530                                        hdr->payload_length, 
1531                                        hdr->msg.put.offset, 
1532                                        hdr->msg.put.match_bits, 
1533                                        md, msg, &mlength, &offset);
1534
1535                 if (rc == LNET_MATCHMD_NONE)
1536                         continue;
1537                 
1538                 /* Hurrah! This _is_ a match */
1539                 list_del(&msg->msg_list); 
1540                 ptl->ptl_msgq_version++;
1541
1542                 if (rc == LNET_MATCHMD_OK) {
1543                         list_add_tail(&msg->msg_list, &matches);
1544
1545                         CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
1546                                "match "LPU64" offset %d length %d.\n",
1547                                libcfs_id2str(src),
1548                                hdr->msg.put.ptl_index, 
1549                                hdr->msg.put.match_bits, 
1550                                hdr->msg.put.offset,
1551                                hdr->payload_length);
1552                 } else {
1553                         LASSERT (rc == LNET_MATCHMD_DROP);
1554
1555                         list_add_tail(&msg->msg_list, &drops);
1556                 }
1557
1558                 if (lnet_md_exhausted(md))
1559                         break;
1560         }
1561
1562         LNET_UNLOCK();
1563
1564         list_for_each_safe (entry, tmp, &drops) {
1565                 msg = list_entry(entry, lnet_msg_t, msg_list);
1566
1567                 list_del(&msg->msg_list); 
1568
1569                 lnet_drop_delayed_put(msg, "Bad match");
1570         }
1571
1572         list_for_each_safe (entry, tmp, &matches) {
1573                 msg = list_entry(entry, lnet_msg_t, msg_list);
1574
1575                 list_del(&msg->msg_list); 
1576
1577                 /* md won't disappear under me, since each msg
1578                  * holds a ref on it */
1579                 lnet_recv_put(md, msg, 1,
1580                               msg->msg_ev.offset,
1581                               msg->msg_ev.mlength);
1582         }
1583
1584         LNET_LOCK();
1585 }
1586
1587 static int
1588 lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
1589 {
1590         int               rc;
1591         int               index;
1592         lnet_hdr_t       *hdr = &msg->msg_hdr;
1593         unsigned int      rlength = hdr->payload_length;
1594         unsigned int      mlength = 0;
1595         unsigned int      offset = 0;
1596         lnet_process_id_t src = {/* .nid = */ hdr->src_nid,
1597                                  /* .pid = */ hdr->src_pid};
1598         lnet_libmd_t     *md;
1599
1600         /* Convert put fields to host byte order */
1601         hdr->msg.put.match_bits = le64_to_cpu(hdr->msg.put.match_bits);
1602         hdr->msg.put.ptl_index = le32_to_cpu(hdr->msg.put.ptl_index);
1603         hdr->msg.put.offset = le32_to_cpu(hdr->msg.put.offset);
1604
1605         index = hdr->msg.put.ptl_index;
1606
1607         LNET_LOCK();
1608
1609         rc = lnet_match_md(index, LNET_MD_OP_PUT, src,
1610                            rlength, hdr->msg.put.offset,
1611                            hdr->msg.put.match_bits, msg,
1612                            &mlength, &offset, &md);
1613         switch (rc) {
1614         default:
1615                 LBUG();
1616                 
1617         case LNET_MATCHMD_OK:
1618                 LNET_UNLOCK();
1619                 lnet_recv_put(md, msg, 0, offset, mlength);
1620                 return 0;
1621                 
1622         case LNET_MATCHMD_NONE:
1623                 rc = lnet_eager_recv_locked(msg);
1624                 if (rc == 0 && !the_lnet.ln_shutdown) {
1625                         list_add_tail(&msg->msg_list, 
1626                                       &the_lnet.ln_portals[index].ptl_msgq);
1627
1628                         the_lnet.ln_portals[index].ptl_msgq_version++;
1629
1630                         CDEBUG(D_NET, "Delaying PUT from %s portal %d match "
1631                                LPU64" offset %d length %d: no match \n",
1632                                libcfs_id2str(src), index, 
1633                                hdr->msg.put.match_bits, 
1634                                hdr->msg.put.offset, rlength);
1635                         
1636                         LNET_UNLOCK();
1637                         return 0;
1638                 }
1639                 /* fall through */
1640
1641         case LNET_MATCHMD_DROP:
1642                 CDEBUG(D_NETERROR,
1643                        "Dropping PUT from %s portal %d match "LPU64
1644                        " offset %d length %d: %d\n",
1645                        libcfs_id2str(src), index, 
1646                        hdr->msg.put.match_bits, 
1647                        hdr->msg.put.offset, rlength, rc);
1648                 LNET_UNLOCK();
1649
1650                 return ENOENT;          /* +ve: OK but no match */
1651         }
1652 }
1653
1654 static int
1655 lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
1656 {
1657         lnet_hdr_t        *hdr = &msg->msg_hdr;
1658         unsigned int       mlength = 0;
1659         unsigned int       offset = 0;
1660         lnet_process_id_t  src = {/* .nid = */ hdr->src_nid,
1661                                   /* .pid = */ hdr->src_pid};
1662         lnet_handle_wire_t reply_wmd;
1663         lnet_libmd_t      *md;
1664         int                rc;
1665
1666         /* Convert get fields to host byte order */
1667         hdr->msg.get.match_bits = le64_to_cpu(hdr->msg.get.match_bits);
1668         hdr->msg.get.ptl_index = le32_to_cpu(hdr->msg.get.ptl_index);
1669         hdr->msg.get.sink_length = le32_to_cpu(hdr->msg.get.sink_length);
1670         hdr->msg.get.src_offset = le32_to_cpu(hdr->msg.get.src_offset);
1671
1672         LNET_LOCK();
1673
1674         rc = lnet_match_md(hdr->msg.get.ptl_index, LNET_MD_OP_GET, src,
1675                            hdr->msg.get.sink_length, hdr->msg.get.src_offset,
1676                            hdr->msg.get.match_bits, msg,
1677                            &mlength, &offset, &md);
1678         if (rc == LNET_MATCHMD_DROP) {
1679                 CDEBUG(D_NETERROR,
1680                        "Dropping GET from %s portal %d match "LPU64
1681                        " offset %d length %d\n",
1682                        libcfs_id2str(src), 
1683                        hdr->msg.get.ptl_index, 
1684                        hdr->msg.get.match_bits, 
1685                        hdr->msg.get.src_offset,
1686                        hdr->msg.get.sink_length);
1687                 LNET_UNLOCK();
1688                 return ENOENT;                  /* +ve: OK but no match */
1689         }
1690
1691         LASSERT (rc == LNET_MATCHMD_OK);
1692                 
1693         the_lnet.ln_counters.send_count++;
1694         the_lnet.ln_counters.send_length += mlength;
1695
1696         LNET_UNLOCK();
1697
1698         reply_wmd = hdr->msg.get.return_wmd;
1699
1700         lnet_prep_send(msg, LNET_MSG_REPLY, src, offset, mlength);
1701
1702         msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;
1703
1704         msg->msg_ev.type = LNET_EVENT_GET;
1705         msg->msg_ev.target.pid = hdr->dest_pid;
1706         msg->msg_ev.target.nid = hdr->dest_nid;
1707         msg->msg_ev.hdr_data = 0;
1708
1709         if (rdma_get) {
1710                 /* The LND completes the REPLY from her recv procedure */
1711                 lnet_ni_recv(ni, msg->msg_private, msg, 0,
1712                              msg->msg_offset, msg->msg_len, msg->msg_len);
1713                 return 0;
1714         }
1715
1716         lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
1717         msg->msg_receiving = 0;
1718                              
1719         rc = lnet_send(ni->ni_nid, msg);
1720         if (rc < 0) {
1721                 /* didn't get as far as lnet_ni_send() */
1722                 CERROR("%s: Unable to send REPLY for GET from %s: %d\n",
1723                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), rc);
1724
1725                 lnet_finalize(ni, msg, rc);
1726         }
1727
1728         return 0;
1729 }
1730
1731 static int
1732 lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
1733 {
1734         void             *private = msg->msg_private;
1735         lnet_hdr_t       *hdr = &msg->msg_hdr;
1736         lnet_process_id_t src = {/* .nid = */ hdr->src_nid,
1737                                  /* .pid = */ hdr->src_pid};
1738         lnet_libmd_t     *md;
1739         int               rlength;
1740         int               mlength;
1741
1742         LNET_LOCK();
1743
1744         /* NB handles only looked up by creator (no flips) */
1745         md = lnet_wire_handle2md(&hdr->msg.reply.dst_wmd);
1746         if (md == NULL || md->md_threshold == 0) {
1747                 CDEBUG(D_NETERROR, "%s: Dropping REPLY from %s for %s "
1748                        "MD "LPX64"."LPX64"\n", 
1749                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1750                        (md == NULL) ? "invalid" : "inactive",
1751                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
1752                        hdr->msg.reply.dst_wmd.wh_object_cookie);
1753
1754                 LNET_UNLOCK();
1755                 return ENOENT;                  /* +ve: OK but no match */
1756         }
1757
1758         LASSERT (md->md_offset == 0);
1759
1760         rlength = hdr->payload_length;
1761         mlength = MIN(rlength, md->md_length);
1762
1763         if (mlength < rlength &&
1764             (md->md_options & LNET_MD_TRUNCATE) == 0) {
1765                 CDEBUG(D_NETERROR, "%s: Dropping REPLY from %s length %d "
1766                        "for MD "LPX64" would overflow (%d)\n",
1767                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1768                         rlength, hdr->msg.reply.dst_wmd.wh_object_cookie,
1769                         mlength);
1770                 LNET_UNLOCK();
1771                 return ENOENT;          /* +ve: OK but no match */
1772         }
1773
1774         CDEBUG(D_NET, "%s: Reply from %s of length %d/%d into md "LPX64"\n",
1775                libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), 
1776                mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);
1777
1778         lnet_commit_md(md, msg);
1779
1780         if (mlength != 0)
1781                 lnet_setpayloadbuffer(msg);
1782
1783         msg->msg_ev.type = LNET_EVENT_REPLY;
1784         msg->msg_ev.target.pid = hdr->dest_pid;
1785         msg->msg_ev.target.nid = hdr->dest_nid;
1786         msg->msg_ev.initiator = src;
1787         msg->msg_ev.rlength = rlength;
1788         msg->msg_ev.mlength = mlength;
1789         msg->msg_ev.offset = 0;
1790
1791         lnet_md_deconstruct(md, &msg->msg_ev.md);
1792         lnet_md2handle(&msg->msg_ev.md_handle, md);
1793
1794         the_lnet.ln_counters.recv_count++;
1795         the_lnet.ln_counters.recv_length += mlength;
1796
1797         LNET_UNLOCK();
1798
1799         lnet_ni_recv(ni, private, msg, 0, 0, mlength, rlength);
1800         return 0;
1801 }
1802
1803 static int
1804 lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
1805 {
1806         lnet_hdr_t       *hdr = &msg->msg_hdr;
1807         lnet_process_id_t src = {/* .nid = */ hdr->src_nid,
1808                                  /* .pid = */ hdr->src_pid};
1809         lnet_libmd_t    *md;
1810
1811         /* Convert ack fields to host byte order */
1812         hdr->msg.ack.match_bits = le64_to_cpu(hdr->msg.ack.match_bits);
1813         hdr->msg.ack.mlength = le32_to_cpu(hdr->msg.ack.mlength);
1814
1815         LNET_LOCK();
1816
1817         /* NB handles only looked up by creator (no flips) */
1818         md = lnet_wire_handle2md(&hdr->msg.ack.dst_wmd);
1819         if (md == NULL || md->md_threshold == 0) {
1820                 /* Don't moan; this is expected */
1821                 CDEBUG(D_NET,
1822                        "%s: Dropping ACK from %s to %s MD "LPX64"."LPX64"\n",
1823                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1824                        (md == NULL) ? "invalid" : "inactive",
1825                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
1826                        hdr->msg.ack.dst_wmd.wh_object_cookie);
1827                 LNET_UNLOCK();
1828                 return ENOENT;                  /* +ve! */
1829         }
1830
1831         CDEBUG(D_NET, "%s: ACK from %s into md "LPX64"\n",
1832                libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), 
1833                hdr->msg.ack.dst_wmd.wh_object_cookie);
1834
1835         lnet_commit_md(md, msg);
1836
1837         msg->msg_ev.type = LNET_EVENT_ACK;
1838         msg->msg_ev.target.pid = hdr->dest_pid;
1839         msg->msg_ev.target.nid = hdr->dest_nid;
1840         msg->msg_ev.initiator = src;
1841         msg->msg_ev.mlength = hdr->msg.ack.mlength;
1842         msg->msg_ev.match_bits = hdr->msg.ack.match_bits;
1843
1844         lnet_md_deconstruct(md, &msg->msg_ev.md);
1845         lnet_md2handle(&msg->msg_ev.md_handle, md);
1846
1847         the_lnet.ln_counters.recv_count++;
1848
1849         LNET_UNLOCK();
1850
1851         lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
1852         return 0;
1853 }
1854
1855 char *
1856 lnet_msgtyp2str (int type)
1857 {
1858         switch (type) {
1859         case LNET_MSG_ACK:
1860                 return ("ACK");
1861         case LNET_MSG_PUT:
1862                 return ("PUT");
1863         case LNET_MSG_GET:
1864                 return ("GET");
1865         case LNET_MSG_REPLY:
1866                 return ("REPLY");
1867         case LNET_MSG_HELLO:
1868                 return ("HELLO");
1869         default:
1870                 return ("<UNKNOWN>");
1871         }
1872 }
1873
1874 void
1875 lnet_print_hdr(lnet_hdr_t * hdr)
1876 {
1877         lnet_process_id_t src = {/* .nid = */ hdr->src_nid,
1878                                  /* .pid = */ hdr->src_pid};
1879         lnet_process_id_t dst = {/* .nid = */ hdr->dest_nid,
1880                                  /* .pid = */ hdr->dest_pid};
1881         char *type_str = lnet_msgtyp2str (hdr->type);
1882
1883         CWARN("P3 Header at %p of type %s\n", hdr, type_str);
1884         CWARN("    From %s\n", libcfs_id2str(src));
1885         CWARN("    To   %s\n", libcfs_id2str(dst));
1886
1887         switch (hdr->type) {
1888         default:
1889                 break;
1890
1891         case LNET_MSG_PUT:
1892                 CWARN("    Ptl index %d, ack md "LPX64"."LPX64", "
1893                       "match bits "LPU64"\n",
1894                       hdr->msg.put.ptl_index,
1895                       hdr->msg.put.ack_wmd.wh_interface_cookie,
1896                       hdr->msg.put.ack_wmd.wh_object_cookie,
1897                       hdr->msg.put.match_bits);
1898                 CWARN("    Length %d, offset %d, hdr data "LPX64"\n",
1899                       hdr->payload_length, hdr->msg.put.offset,
1900                       hdr->msg.put.hdr_data);
1901                 break;
1902
1903         case LNET_MSG_GET:
1904                 CWARN("    Ptl index %d, return md "LPX64"."LPX64", "
1905                       "match bits "LPU64"\n", hdr->msg.get.ptl_index,
1906                       hdr->msg.get.return_wmd.wh_interface_cookie,
1907                       hdr->msg.get.return_wmd.wh_object_cookie,
1908                       hdr->msg.get.match_bits);
1909                 CWARN("    Length %d, src offset %d\n",
1910                       hdr->msg.get.sink_length,
1911                       hdr->msg.get.src_offset);
1912                 break;
1913
1914         case LNET_MSG_ACK:
1915                 CWARN("    dst md "LPX64"."LPX64", "
1916                       "manipulated length %d\n",
1917                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
1918                       hdr->msg.ack.dst_wmd.wh_object_cookie,
1919                       hdr->msg.ack.mlength);
1920                 break;
1921
1922         case LNET_MSG_REPLY:
1923                 CWARN("    dst md "LPX64"."LPX64", "
1924                       "length %d\n",
1925                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
1926                       hdr->msg.reply.dst_wmd.wh_object_cookie,
1927                       hdr->payload_length);
1928         }
1929
1930 }
1931
1932
1933 int
1934 lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid, 
1935            void *private, int rdma_req)
1936 {
1937         int            rc = 0;
1938         int            for_me;
1939         lnet_msg_t    *msg;
1940         lnet_nid_t     dest_nid;
1941         lnet_nid_t     src_nid;
1942         __u32          payload_length;
1943         __u32          type;
1944
1945         LASSERT (!in_interrupt ());
1946
1947         type = le32_to_cpu(hdr->type);
1948         src_nid = le64_to_cpu(hdr->src_nid);
1949         dest_nid = le64_to_cpu(hdr->dest_nid);
1950         payload_length = le32_to_cpu(hdr->payload_length);
1951
1952         for_me = lnet_ptlcompat_matchnid(ni->ni_nid, dest_nid);
1953
1954         switch (type) {
1955         case LNET_MSG_ACK:
1956         case LNET_MSG_GET:
1957                 if (payload_length > 0) {
1958                         CERROR("%s, src %s: bad %s payload %d (0 expected)\n",
1959                                libcfs_nid2str(from_nid), 
1960                                libcfs_nid2str(src_nid), 
1961                                lnet_msgtyp2str(type), payload_length);
1962                         return -EPROTO;
1963                 }
1964                 break;
1965                                
1966         case LNET_MSG_PUT:
1967         case LNET_MSG_REPLY:
1968                 if (payload_length > (for_me ? LNET_MAX_PAYLOAD : LNET_MTU)) {
1969                         CERROR("%s, src %s: bad %s payload %d "
1970                                "(%d max expected)\n", 
1971                                libcfs_nid2str(from_nid),
1972                                libcfs_nid2str(src_nid), 
1973                                lnet_msgtyp2str(type),
1974                                payload_length,
1975                                for_me ? LNET_MAX_PAYLOAD : LNET_MTU);
1976                         return -EPROTO;
1977                 }
1978                 break;
1979
1980         default:
1981                 CERROR("%s, src %s: Bad message type 0x%x\n",
1982                        libcfs_nid2str(from_nid),
1983                        libcfs_nid2str(src_nid), type);
1984                 return -EPROTO;
1985         }
1986
1987         /* Regard a bad destination NID as a protocol error.  Senders should
1988          * know what they're doing; if they don't they're misconfigured, buggy
1989          * or malicious so we chop them off at the knees :) */
1990
1991         if (!for_me) {
1992                 if (the_lnet.ln_ptlcompat > 0) {
1993                         /* portals compatibility is single-network */
1994                         CERROR ("%s, src %s: Bad dest nid %s "
1995                                 "(routing not supported)\n",
1996                                 libcfs_nid2str(from_nid),
1997                                 libcfs_nid2str(src_nid),
1998                                 libcfs_nid2str(dest_nid));
1999                         return -EPROTO;
2000                 }
2001
2002                 if (the_lnet.ln_ptlcompat == 0 &&
2003                     LNET_NIDNET(dest_nid) == LNET_NIDNET(ni->ni_nid)) {
2004                         /* should have gone direct */
2005                         CERROR ("%s, src %s: Bad dest nid %s "
2006                                 "(should have been sent direct)\n",
2007                                 libcfs_nid2str(from_nid),
2008                                 libcfs_nid2str(src_nid),
2009                                 libcfs_nid2str(dest_nid));
2010                         return -EPROTO;
2011                 }
2012
2013                 if (the_lnet.ln_ptlcompat == 0 &&
2014                     lnet_islocalnid(dest_nid)) {
2015                         /* dest is another local NI; sender should have used
2016                          * this node's NID on its own network */
2017                         CERROR ("%s, src %s: Bad dest nid %s "
2018                                 "(it's my nid but on a different network)\n",
2019                                 libcfs_nid2str(from_nid),
2020                                 libcfs_nid2str(src_nid),
2021                                 libcfs_nid2str(dest_nid));
2022                         return -EPROTO;
2023                 }
2024
2025                 if (rdma_req && type == LNET_MSG_GET) {
2026                         CERROR ("%s, src %s: Bad optimized GET for %s "
2027                                 "(final destination must be me)\n",
2028                                 libcfs_nid2str(from_nid),
2029                                 libcfs_nid2str(src_nid),
2030                                 libcfs_nid2str(dest_nid));
2031                         return -EPROTO;
2032                 }
2033                 
2034                 if (!the_lnet.ln_routing) {
2035                         CERROR ("%s, src %s: Dropping message for %s "
2036                                 "(routing not enabled)\n",
2037                                 libcfs_nid2str(from_nid),
2038                                 libcfs_nid2str(src_nid),
2039                                 libcfs_nid2str(dest_nid));
2040                         goto drop;
2041                 }
2042         }
2043
2044         /* Message looks OK; we're not going to return an error, so we MUST
2045          * call back lnd_recv() come what may... */
2046
2047         if (!list_empty (&the_lnet.ln_test_peers) && /* normally we don't */
2048             fail_peer (src_nid, 0))             /* shall we now? */
2049         {
2050                 CERROR("%s, src %s: Dropping %s to simulate failure\n",
2051                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
2052                        lnet_msgtyp2str(type));
2053                 goto drop;
2054         }
2055
2056         msg = lnet_msg_alloc();
2057         if (msg == NULL) {
2058                 CERROR("%s, src %s: Dropping %s (out of memory)\n",
2059                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid), 
2060                        lnet_msgtyp2str(type));
2061                 goto drop;
2062         }
2063
2064         /* msg zeroed in lnet_msg_alloc; i.e. flags all clear, pointers NULL etc */
2065         
2066         msg->msg_type = type;
2067         msg->msg_private = private;
2068         msg->msg_receiving = 1;
2069         msg->msg_len = msg->msg_wanted = payload_length;
2070         msg->msg_offset = 0;
2071         msg->msg_hdr = *hdr;
2072
2073         LNET_LOCK();
2074         rc = lnet_nid2peer_locked(&msg->msg_rxpeer, from_nid);
2075         if (rc != 0) {
2076                 LNET_UNLOCK();
2077                 CERROR("%s, src %s: Dropping %s "
2078                        "(error %d looking up sender)\n",
2079                        libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
2080                        lnet_msgtyp2str(type), rc);
2081                 goto free_drop;
2082         }
2083         LNET_UNLOCK();
2084
2085 #ifndef __KERNEL__
2086         LASSERT (for_me);
2087 #else
2088         if (!for_me) {
2089                 msg->msg_target.pid = le32_to_cpu(hdr->dest_pid);
2090                 msg->msg_target.nid = dest_nid;
2091                 msg->msg_routing = 1;
2092                 msg->msg_offset = 0;
2093
2094                 LNET_LOCK();
2095                 if (msg->msg_rxpeer->lp_rtrcredits <= 0 ||
2096                     lnet_msg2bufpool(msg)->rbp_credits <= 0) {
2097                         rc = lnet_eager_recv_locked(msg);
2098                         if (rc != 0) {
2099                                 LNET_UNLOCK();
2100                                 goto free_drop;
2101                         }
2102                 }
2103                 
2104                 lnet_commit_routedmsg(msg);
2105                 rc = lnet_post_routed_recv_locked(msg, 0);
2106                 LNET_UNLOCK();
2107
2108                 if (rc == 0)
2109                         lnet_ni_recv(ni, msg->msg_private, msg, 0,
2110                                      0, payload_length, payload_length);
2111                 return 0;
2112         }
2113 #endif
2114         /* convert common msg->hdr fields to host byteorder */
2115         msg->msg_hdr.type = type;
2116         msg->msg_hdr.src_nid = src_nid;
2117         msg->msg_hdr.src_pid = le32_to_cpu(msg->msg_hdr.src_pid);
2118         msg->msg_hdr.dest_nid = dest_nid;
2119         msg->msg_hdr.dest_pid = le32_to_cpu(msg->msg_hdr.dest_pid);
2120         msg->msg_hdr.payload_length = payload_length;
2121
2122         msg->msg_ev.sender = from_nid;
2123         
2124         switch (type) {
2125         case LNET_MSG_ACK:
2126                 rc = lnet_parse_ack(ni, msg);
2127                 break;
2128         case LNET_MSG_PUT:
2129                 rc = lnet_parse_put(ni, msg);
2130                 break;
2131         case LNET_MSG_GET:
2132                 rc = lnet_parse_get(ni, msg, rdma_req);
2133                 break;
2134         case LNET_MSG_REPLY:
2135                 rc = lnet_parse_reply(ni, msg);
2136                 break;
2137         default:
2138                 LASSERT(0);
2139                 goto free_drop;  /* prevent an unused label if !kernel */
2140         }
2141
2142         if (rc == 0)
2143                 return 0;
2144         
2145         LASSERT (rc == ENOENT);
2146
2147  free_drop:
2148         LASSERT (msg->msg_md == NULL);
2149         LNET_LOCK();
2150         if (msg->msg_rxpeer != NULL) {
2151                 lnet_peer_decref_locked(msg->msg_rxpeer);
2152                 msg->msg_rxpeer = NULL;
2153         }
2154         lnet_msg_free(msg);                     /* expects LNET_LOCK held */
2155         LNET_UNLOCK();
2156
2157  drop:
2158         lnet_drop_message(ni, private, payload_length);
2159         return 0;
2160 }
2161
2162 int
2163 LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
2164         lnet_process_id_t target, unsigned int portal,
2165         __u64 match_bits, unsigned int offset, 
2166         __u64 hdr_data)
2167 {
2168         lnet_msg_t       *msg;
2169         lnet_libmd_t     *md;
2170         int               rc;
2171
2172         LASSERT (the_lnet.ln_init);
2173         LASSERT (the_lnet.ln_refcount > 0);
2174         
2175         if (!list_empty (&the_lnet.ln_test_peers) && /* normally we don't */
2176             fail_peer (target.nid, 1))          /* shall we now? */
2177         {
2178                 CERROR("Dropping PUT to %s: simulated failure\n",
2179                        libcfs_id2str(target));
2180                 return -EIO;
2181         }
2182
2183         msg = lnet_msg_alloc();
2184         if (msg == NULL) {
2185                 CERROR("Dropping PUT to %s: ENOMEM on lnet_msg_t\n",
2186                        libcfs_id2str(target));
2187                 return -ENOMEM;
2188         }
2189
2190         LNET_LOCK();
2191
2192         md = lnet_handle2md(&mdh);
2193         if (md == NULL || md->md_threshold == 0) {
2194                 lnet_msg_free(msg);
2195                 LNET_UNLOCK();
2196
2197                 CERROR("Dropping PUT to %s: MD invalid\n", 
2198                        libcfs_id2str(target));
2199                 return -ENOENT;
2200         }
2201
2202         CDEBUG(D_NET, "LNetPut -> %s\n", libcfs_id2str(target));
2203
2204         lnet_commit_md(md, msg);
2205
2206         lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length);
2207
2208         msg->msg_hdr.msg.put.match_bits = cpu_to_le64(match_bits);
2209         msg->msg_hdr.msg.put.ptl_index = cpu_to_le32(portal);
2210         msg->msg_hdr.msg.put.offset = cpu_to_le32(offset);
2211         msg->msg_hdr.msg.put.hdr_data = hdr_data;
2212
2213         /* NB handles only looked up by creator (no flips) */
2214         if (ack == LNET_ACK_REQ) {
2215                 msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie = 
2216                         the_lnet.ln_interface_cookie;
2217                 msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie = 
2218                         md->md_lh.lh_cookie;
2219         } else {
2220                 msg->msg_hdr.msg.put.ack_wmd = LNET_WIRE_HANDLE_NONE;
2221         }
2222
2223         msg->msg_ev.type = LNET_EVENT_SEND;
2224         msg->msg_ev.initiator.nid = LNET_NID_ANY;
2225         msg->msg_ev.initiator.pid = the_lnet.ln_pid;
2226         msg->msg_ev.target = target;
2227         msg->msg_ev.sender = LNET_NID_ANY;
2228         msg->msg_ev.pt_index = portal;
2229         msg->msg_ev.match_bits = match_bits;
2230         msg->msg_ev.rlength = md->md_length;
2231         msg->msg_ev.mlength = md->md_length;
2232         msg->msg_ev.offset = offset;
2233         msg->msg_ev.hdr_data = hdr_data;
2234
2235         lnet_md_deconstruct(md, &msg->msg_ev.md);
2236         lnet_md2handle(&msg->msg_ev.md_handle, md);
2237
2238         the_lnet.ln_counters.send_count++;
2239         the_lnet.ln_counters.send_length += md->md_length;
2240
2241         LNET_UNLOCK();
2242
2243         rc = lnet_send(self, msg);
2244         if (rc != 0) {
2245                 CERROR("Error sending PUT to %s: %d\n",
2246                        libcfs_id2str(target), rc);
2247                 lnet_finalize (NULL, msg, rc);
2248         }
2249
2250         /* completion will be signalled by an event */
2251         return 0;
2252 }
2253
2254 lnet_msg_t *
2255 lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *getmsg)
2256 {
2257         /* The LND can DMA direct to the GET md (i.e. no REPLY msg).  This
2258          * returns a msg for the LND to pass to lnet_finalize() when the sink
2259          * data has been received.
2260          *
2261          * CAVEAT EMPTOR: 'getmsg' is the original GET, which is freed when
2262          * lnet_finalize() is called on it, so the LND must call this first */
2263
2264         lnet_msg_t        *msg = lnet_msg_alloc();
2265         lnet_libmd_t      *getmd = getmsg->msg_md;
2266         lnet_process_id_t  peer_id = getmsg->msg_target;
2267
2268         LASSERT (!getmsg->msg_target_is_router);
2269         LASSERT (!getmsg->msg_routing);
2270
2271         LNET_LOCK();
2272
2273         LASSERT (getmd->md_refcount > 0);
2274
2275         if (msg == NULL) {
2276                 CERROR ("%s: Dropping REPLY from %s: can't allocate msg\n",
2277                         libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id));
2278                 goto drop;
2279         }
2280
2281         if (getmd->md_threshold == 0) {
2282                 CERROR ("%s: Dropping REPLY from %s for inactive MD %p\n",
2283                         libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), 
2284                         getmd);
2285                 goto drop_msg;
2286         }
2287
2288         LASSERT (getmd->md_offset == 0);
2289
2290         CDEBUG(D_NET, "%s: Reply from %s md %p\n", 
2291                libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
2292
2293         lnet_commit_md (getmd, msg);
2294
2295         msg->msg_type = LNET_MSG_GET; /* flag this msg as an "optimized" GET */
2296
2297         msg->msg_ev.type = LNET_EVENT_REPLY;
2298         msg->msg_ev.initiator = peer_id;
2299         msg->msg_ev.sender = peer_id.nid;  /* optimized GETs can't be routed */
2300         msg->msg_ev.rlength = msg->msg_ev.mlength = getmd->md_length;
2301         msg->msg_ev.offset = 0;
2302
2303         lnet_md_deconstruct(getmd, &msg->msg_ev.md);
2304         lnet_md2handle(&msg->msg_ev.md_handle, getmd);
2305
2306         the_lnet.ln_counters.recv_count++;
2307         the_lnet.ln_counters.recv_length += getmd->md_length;
2308
2309         LNET_UNLOCK();
2310
2311         return msg;
2312
2313  drop_msg:
2314         lnet_msg_free(msg);
2315  drop:
2316         the_lnet.ln_counters.drop_count++;
2317         the_lnet.ln_counters.drop_length += getmd->md_length;
2318
2319         LNET_UNLOCK ();
2320
2321         return NULL;
2322 }
2323
2324 void
2325 lnet_set_reply_msg_len(lnet_ni_t *ni, lnet_msg_t *reply, unsigned int len)
2326 {
2327         /* Set the REPLY length, now the RDMA that elides the REPLY message has
2328          * completed and I know it. */
2329         LASSERT (reply != NULL);
2330         LASSERT (reply->msg_type == LNET_MSG_GET);
2331         LASSERT (reply->msg_ev.type == LNET_EVENT_REPLY);
2332
2333         /* NB I trusted my peer to RDMA.  If she tells me she's written beyond
2334          * the end of my buffer, I might as well be dead. */
2335         LASSERT (len <= reply->msg_ev.mlength);
2336         
2337         reply->msg_ev.mlength = len;
2338 }
2339
2340 int
2341 LNetGet(lnet_nid_t self, lnet_handle_md_t mdh, 
2342         lnet_process_id_t target, unsigned int portal, 
2343         __u64 match_bits, unsigned int offset)
2344 {
2345         lnet_msg_t       *msg;
2346         lnet_libmd_t     *md;
2347         int               rc;
2348
2349         LASSERT (the_lnet.ln_init);
2350         LASSERT (the_lnet.ln_refcount > 0);
2351         
2352         if (!list_empty (&the_lnet.ln_test_peers) && /* normally we don't */
2353             fail_peer (target.nid, 1))          /* shall we now? */
2354         {
2355                 CERROR("Dropping GET to %s: simulated failure\n",
2356                        libcfs_id2str(target));
2357                 return -EIO;
2358         }
2359
2360         msg = lnet_msg_alloc();
2361         if (msg == NULL) {
2362                 CERROR("Dropping GET to %s: ENOMEM on lnet_msg_t\n",
2363                        libcfs_id2str(target));
2364                 return -ENOMEM;
2365         }
2366
2367         LNET_LOCK();
2368
2369         md = lnet_handle2md(&mdh);
2370         if (md == NULL || md->md_threshold == 0) {
2371                 lnet_msg_free(msg);
2372                 LNET_UNLOCK();
2373
2374                 CERROR("Dropping GET to %s: MD invalid\n",
2375                        libcfs_id2str(target));
2376                 return -ENOENT;
2377         }
2378
2379         CDEBUG(D_NET, "LNetGet -> %s\n", libcfs_id2str(target));
2380
2381         lnet_commit_md(md, msg);
2382
2383         lnet_prep_send(msg, LNET_MSG_GET, target, 0, 0);
2384
2385         msg->msg_hdr.msg.get.match_bits = cpu_to_le64(match_bits);
2386         msg->msg_hdr.msg.get.ptl_index = cpu_to_le32(portal);
2387         msg->msg_hdr.msg.get.src_offset = cpu_to_le32(offset);
2388         msg->msg_hdr.msg.get.sink_length = cpu_to_le32(md->md_length);
2389
2390         /* NB handles only looked up by creator (no flips) */
2391         msg->msg_hdr.msg.get.return_wmd.wh_interface_cookie = 
2392                 the_lnet.ln_interface_cookie;
2393         msg->msg_hdr.msg.get.return_wmd.wh_object_cookie = 
2394                 md->md_lh.lh_cookie;
2395
2396         msg->msg_ev.type = LNET_EVENT_SEND;
2397         msg->msg_ev.initiator.nid = LNET_NID_ANY;
2398         msg->msg_ev.initiator.pid = the_lnet.ln_pid;
2399         msg->msg_ev.target = target;
2400         msg->msg_ev.sender = LNET_NID_ANY;
2401         msg->msg_ev.pt_index = portal;
2402         msg->msg_ev.match_bits = match_bits;
2403         msg->msg_ev.rlength = md->md_length;
2404         msg->msg_ev.mlength = md->md_length;
2405         msg->msg_ev.offset = offset;
2406         msg->msg_ev.hdr_data = 0;
2407
2408         lnet_md_deconstruct(md, &msg->msg_ev.md);
2409         lnet_md2handle(&msg->msg_ev.md_handle, md);
2410
2411         the_lnet.ln_counters.send_count++;
2412
2413         LNET_UNLOCK();
2414
2415         rc = lnet_send(self, msg);
2416         if (rc < 0) {
2417                 CERROR("error sending GET to %s: %d\n",
2418                        libcfs_id2str(target), rc);
2419                 lnet_finalize (NULL, msg, rc);
2420         }
2421
2422         /* completion will be signalled by an event */
2423         return 0;
2424 }
2425
2426 int
2427 LNetDist (lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
2428 {
2429         struct list_head *e;
2430         lnet_ni_t        *ni;
2431         lnet_route_t     *route;
2432         lnet_remotenet_t *rnet;
2433         __u32             dstnet = LNET_NIDNET(dstnid);
2434         int               hops;
2435         __u32             order = 2;
2436
2437         /* if !local_nid_dist_zero, I don't return a distance of 0 ever
2438          * (when lustre sees a distance of 0, it substitutes 0@lo), so I
2439          * keep order 0 free for 0@lo and order 1 free for a local NID
2440          * match */
2441
2442         LASSERT (the_lnet.ln_init);
2443         LASSERT (the_lnet.ln_refcount > 0);
2444
2445         LNET_LOCK();
2446
2447         list_for_each (e, &the_lnet.ln_nis) {
2448                 ni = list_entry(e, lnet_ni_t, ni_list);
2449
2450                 if (ni->ni_nid == dstnid ||
2451                     (the_lnet.ln_ptlcompat > 0 &&
2452                      LNET_NIDNET(dstnid) == 0 &&
2453                      LNET_NIDADDR(dstnid) == LNET_NIDADDR(ni->ni_nid) &&
2454                      LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) != LOLND)) {
2455                         if (srcnidp != NULL)
2456                                 *srcnidp = dstnid;
2457                         if (orderp != NULL) {
2458                                 if (LNET_NETTYP(LNET_NIDNET(dstnid)) == LOLND)
2459                                         *orderp = 0;
2460                                 else
2461                                         *orderp = 1;
2462                         }
2463                         LNET_UNLOCK();
2464
2465                         return local_nid_dist_zero ? 0 : 1;
2466                 }
2467
2468                 if (LNET_NIDNET(ni->ni_nid) == dstnet ||
2469                     (the_lnet.ln_ptlcompat > 0 &&
2470                      dstnet == 0 &&
2471                      LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) != LOLND)) {
2472                         if (srcnidp != NULL)
2473                                 *srcnidp = ni->ni_nid;
2474                         if (orderp != NULL)
2475                                 *orderp = order;
2476                         LNET_UNLOCK();
2477                         return 1;
2478                 }
2479
2480                 order++;
2481         }
2482
2483         list_for_each (e, &the_lnet.ln_remote_nets) {
2484                 rnet = list_entry(e, lnet_remotenet_t, lrn_list);
2485
2486                 if (rnet->lrn_net == dstnet) {
2487                         LASSERT (!list_empty(&rnet->lrn_routes));
2488                         route = list_entry(rnet->lrn_routes.next,
2489                                            lnet_route_t, lr_list);
2490                         hops = rnet->lrn_hops;
2491                         if (srcnidp != NULL)
2492                                 *srcnidp = route->lr_gateway->lp_ni->ni_nid;
2493                         if (orderp != NULL)
2494                                 *orderp = order;
2495                         LNET_UNLOCK();
2496                         return hops + 1;
2497                 }
2498                 order++;
2499         }
2500
2501         LNET_UNLOCK();
2502         return -EHOSTUNREACH;
2503 }
2504
2505 int
2506 LNetSetAsync(lnet_process_id_t id, int nasync)
2507 {
2508 #ifdef __KERNEL__
2509         return 0;
2510 #else
2511         lnet_ni_t        *ni;
2512         lnet_remotenet_t *rnet;
2513         struct list_head *tmp;
2514         lnet_route_t     *route;
2515         lnet_nid_t       *nids;
2516         int               nnids;
2517         int               maxnids = 256;
2518         int               rc = 0;
2519         int               rc2;
2520         
2521         /* Target on a local network? */ 
2522         
2523         ni = lnet_net2ni(LNET_NIDNET(id.nid));
2524         if (ni != NULL) {
2525                 if (ni->ni_lnd->lnd_setasync != NULL) 
2526                         rc = (ni->ni_lnd->lnd_setasync)(ni, id, nasync);
2527                 lnet_ni_decref(ni);
2528                 return rc;
2529         }
2530
2531         /* Target on a remote network: apply to routers */
2532  again:
2533         LIBCFS_ALLOC(nids, maxnids * sizeof(*nids));
2534         if (nids == NULL)
2535                 return -ENOMEM;
2536         nnids = 0;
2537
2538         /* Snapshot all the router NIDs */
2539         LNET_LOCK();
2540         rnet = lnet_find_net_locked(LNET_NIDNET(id.nid));
2541         if (rnet != NULL) {
2542                 list_for_each(tmp, &rnet->lrn_routes) {
2543                         if (nnids == maxnids) {
2544                                 LNET_UNLOCK();
2545                                 LIBCFS_FREE(nids, maxnids * sizeof(*nids));
2546                                 maxnids *= 2;
2547                                 goto again;
2548                         }
2549                         
2550                         route = list_entry(tmp, lnet_route_t, lr_list);
2551                         nids[nnids++] = route->lr_gateway->lp_nid;
2552                 }
2553         }
2554         LNET_UNLOCK();
2555
2556         /* set async on all the routers */
2557         while (nnids-- > 0) {
2558                 id.pid = LUSTRE_SRV_LNET_PID;
2559                 id.nid = nids[nnids];
2560
2561                 ni = lnet_net2ni(LNET_NIDNET(id.nid));
2562                 if (ni == NULL)
2563                         continue;
2564                 
2565                 if (ni->ni_lnd->lnd_setasync != NULL) {
2566                         rc2 = (ni->ni_lnd->lnd_setasync)(ni, id, nasync);
2567                         if (rc2 != 0)
2568                                 rc = rc2;
2569                 }
2570                 lnet_ni_decref(ni);
2571         }
2572
2573         LIBCFS_FREE(nids, maxnids * sizeof(*nids));
2574         return rc;
2575 #endif
2576 }
2577