Whamcloud - gitweb
LU-1600 lnet: another race in lnet_nid2peer_locked
[fs/lustre-release.git] / lnet / lnet / api-ni.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LNET
38 #include <lnet/lib-lnet.h>
39
/* D_LNI expands to D_CONSOLE when __KERNEL__ is defined and to D_CONFIG
 * otherwise. */
#ifdef __KERNEL__
#define D_LNI D_CONSOLE
#else
#define D_LNI D_CONFIG
#endif
45
46 lnet_t      the_lnet;                           /* THE state of the network */
47 EXPORT_SYMBOL(the_lnet);
48
49 #ifdef __KERNEL__
50
/* Module parameters, all registered with permissions 0444 and all
 * defaulting to the empty string.  lnet_get_networks() treats an empty
 * 'networks' or 'ip2nets' as "not specified"; lnet_get_routes() hands
 * back 'routes' unchanged. */
static char *ip2nets = "";
CFS_MODULE_PARM(ip2nets, "s", charp, 0444,
                "LNET network <- IP table");

static char *networks = "";
CFS_MODULE_PARM(networks, "s", charp, 0444,
                "local networks");

static char *routes = "";
CFS_MODULE_PARM(routes, "s", charp, 0444,
                "routes to non-local networks");
62
63 char *
64 lnet_get_routes(void)
65 {
66         return routes;
67 }
68
69 char *
70 lnet_get_networks(void)
71 {
72         char   *nets;
73         int     rc;
74
75         if (*networks != 0 && *ip2nets != 0) {
76                 LCONSOLE_ERROR_MSG(0x101, "Please specify EITHER 'networks' or "
77                                    "'ip2nets' but not both at once\n");
78                 return NULL;
79         }
80
81         if (*ip2nets != 0) {
82                 rc = lnet_parse_ip2nets(&nets, ip2nets);
83                 return (rc == 0) ? nets : NULL;
84         }
85
86         if (*networks != 0)
87                 return networks;
88
89         return "tcp";
90 }
91
92 void
93 lnet_init_locks(void)
94 {
95         cfs_spin_lock_init(&the_lnet.ln_eq_wait_lock);
96         cfs_waitq_init(&the_lnet.ln_eq_waitq);
97         cfs_mutex_init(&the_lnet.ln_lnd_mutex);
98         cfs_mutex_init(&the_lnet.ln_api_mutex);
99 }
100
/**
 * Counterpart of lnet_init_locks() for this build variant; it has no
 * work to do.
 */
void
lnet_fini_locks(void)
{
        return;
}
105
106 #else
107
/**
 * Userland accessor for the routes configuration, taken from the
 * LNET_ROUTES environment variable.
 *
 * \retval value of LNET_ROUTES, or the empty string if it is not set
 */
char *
lnet_get_routes(void)
{
        char *env = getenv("LNET_ROUTES");

        if (env != NULL)
                return env;

        return "";
}
115
116 char *
117 lnet_get_networks (void)
118 {
119         static char       default_networks[256];
120         char             *networks = getenv ("LNET_NETWORKS");
121         char             *ip2nets  = getenv ("LNET_IP2NETS");
122         char             *str;
123         char             *sep;
124         int               len;
125         int               nob;
126         int               rc;
127         cfs_list_t       *tmp;
128
129 #ifdef NOT_YET
130         if (networks != NULL && ip2nets != NULL) {
131                 LCONSOLE_ERROR_MSG(0x103, "Please set EITHER 'LNET_NETWORKS' or"
132                                    " 'LNET_IP2NETS' but not both at once\n");
133                 return NULL;
134         }
135
136         if (ip2nets != NULL) {
137                 rc = lnet_parse_ip2nets(&networks, ip2nets);
138                 return (rc == 0) ? networks : NULL;
139         }
140 #else
141         SET_BUT_UNUSED(ip2nets);
142         SET_BUT_UNUSED(rc);
143 #endif
144         if (networks != NULL)
145                 return networks;
146
147         /* In userland, the default 'networks=' is the list of known net types */
148
149         len = sizeof(default_networks);
150         str = default_networks;
151         *str = 0;
152         sep = "";
153
154         cfs_list_for_each (tmp, &the_lnet.ln_lnds) {
155                 lnd_t *lnd = cfs_list_entry(tmp, lnd_t, lnd_list);
156
157                 nob = snprintf(str, len, "%s%s", sep,
158                                libcfs_lnd2str(lnd->lnd_type));
159                 len -= nob;
160                 if (len < 0) {
161                         /* overflowed the string; leave it where it was */
162                         *str = 0;
163                         break;
164                 }
165
166                 str += nob;
167                 sep = ",";
168         }
169
170         return default_networks;
171 }
172
173 # ifndef HAVE_LIBPTHREAD
174
175 void lnet_init_locks(void)
176 {
177         the_lnet.ln_eq_wait_lock = 0;
178         the_lnet.ln_lnd_mutex = 0;
179         the_lnet.ln_api_mutex = 0;
180 }
181
182 void lnet_fini_locks(void)
183 {
184         LASSERT(the_lnet.ln_api_mutex == 0);
185         LASSERT(the_lnet.ln_lnd_mutex == 0);
186         LASSERT(the_lnet.ln_eq_wait_lock == 0);
187 }
188
189 # else
190
191 void lnet_init_locks(void)
192 {
193         pthread_cond_init(&the_lnet.ln_eq_cond, NULL);
194         pthread_mutex_init(&the_lnet.ln_eq_wait_lock, NULL);
195         pthread_mutex_init(&the_lnet.ln_lnd_mutex, NULL);
196         pthread_mutex_init(&the_lnet.ln_api_mutex, NULL);
197 }
198
199 void lnet_fini_locks(void)
200 {
201         pthread_mutex_destroy(&the_lnet.ln_api_mutex);
202         pthread_mutex_destroy(&the_lnet.ln_lnd_mutex);
203         pthread_mutex_destroy(&the_lnet.ln_eq_wait_lock);
204         pthread_cond_destroy(&the_lnet.ln_eq_cond);
205 }
206
207 # endif
208 #endif
209
210 static int
211 lnet_create_locks(void)
212 {
213         lnet_init_locks();
214
215         the_lnet.ln_res_lock = cfs_percpt_lock_alloc(lnet_cpt_table());
216         if (the_lnet.ln_res_lock == NULL)
217                 goto failed;
218
219         the_lnet.ln_net_lock = cfs_percpt_lock_alloc(lnet_cpt_table());
220         if (the_lnet.ln_net_lock == NULL)
221                 goto failed;
222
223         return 0;
224
225  failed:
226         lnet_fini_locks();
227         return -ENOMEM;
228 }
229
230 static void
231 lnet_destroy_locks(void)
232 {
233         if (the_lnet.ln_res_lock != NULL) {
234                 cfs_percpt_lock_free(the_lnet.ln_res_lock);
235                 the_lnet.ln_res_lock = NULL;
236         }
237
238         if (the_lnet.ln_net_lock != NULL) {
239                 cfs_percpt_lock_free(the_lnet.ln_net_lock);
240                 the_lnet.ln_net_lock = NULL;
241         }
242
243         lnet_fini_locks();
244 }
245
/**
 * Pin down the LNet wire protocol: magic/version constants, message
 * type numbers, and the size and field offsets of the structures that
 * go on the wire (lnet_handle_wire_t, lnet_magicversion_t, lnet_hdr_t
 * and each member of its 'msg' union).
 *
 * The body consists solely of CLASSERT() checks (presumably evaluated at
 * compile time -- confirm against the CLASSERT definition), so a change
 * to any of these layouts is caught at build time rather than showing up
 * as a silent incompatibility between nodes.
 */
void lnet_assert_wire_constants (void)
{
        /* Wire protocol assertions generated by 'wirecheck'
         * running on Linux robert.bartonsoftware.com 2.6.8-1.521
         * #1 Mon Aug 16 09:01:18 EDT 2004 i686 athlon i386 GNU/Linux
         * with gcc version 3.3.3 20040412 (Red Hat Linux 3.3.3-7) */

        /* Constants... */
        CLASSERT (LNET_PROTO_TCP_MAGIC == 0xeebc0ded);
        CLASSERT (LNET_PROTO_TCP_VERSION_MAJOR == 1);
        CLASSERT (LNET_PROTO_TCP_VERSION_MINOR == 0);
        CLASSERT (LNET_MSG_ACK == 0);
        CLASSERT (LNET_MSG_PUT == 1);
        CLASSERT (LNET_MSG_GET == 2);
        CLASSERT (LNET_MSG_REPLY == 3);
        CLASSERT (LNET_MSG_HELLO == 4);

        /* Checks for struct ptl_handle_wire_t */
        CLASSERT ((int)sizeof(lnet_handle_wire_t) == 16);
        CLASSERT ((int)offsetof(lnet_handle_wire_t, wh_interface_cookie) == 0);
        CLASSERT ((int)sizeof(((lnet_handle_wire_t *)0)->wh_interface_cookie) == 8);
        CLASSERT ((int)offsetof(lnet_handle_wire_t, wh_object_cookie) == 8);
        CLASSERT ((int)sizeof(((lnet_handle_wire_t *)0)->wh_object_cookie) == 8);

        /* Checks for struct lnet_magicversion_t */
        CLASSERT ((int)sizeof(lnet_magicversion_t) == 8);
        CLASSERT ((int)offsetof(lnet_magicversion_t, magic) == 0);
        CLASSERT ((int)sizeof(((lnet_magicversion_t *)0)->magic) == 4);
        CLASSERT ((int)offsetof(lnet_magicversion_t, version_major) == 4);
        CLASSERT ((int)sizeof(((lnet_magicversion_t *)0)->version_major) == 2);
        CLASSERT ((int)offsetof(lnet_magicversion_t, version_minor) == 6);
        CLASSERT ((int)sizeof(((lnet_magicversion_t *)0)->version_minor) == 2);

        /* Checks for struct lnet_hdr_t */
        CLASSERT ((int)sizeof(lnet_hdr_t) == 72);
        CLASSERT ((int)offsetof(lnet_hdr_t, dest_nid) == 0);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->dest_nid) == 8);
        CLASSERT ((int)offsetof(lnet_hdr_t, src_nid) == 8);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->src_nid) == 8);
        CLASSERT ((int)offsetof(lnet_hdr_t, dest_pid) == 16);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->dest_pid) == 4);
        CLASSERT ((int)offsetof(lnet_hdr_t, src_pid) == 20);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->src_pid) == 4);
        CLASSERT ((int)offsetof(lnet_hdr_t, type) == 24);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->type) == 4);
        CLASSERT ((int)offsetof(lnet_hdr_t, payload_length) == 28);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->payload_length) == 4);
        CLASSERT ((int)offsetof(lnet_hdr_t, msg) == 32);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg) == 40);

        /* Ack */
        CLASSERT ((int)offsetof(lnet_hdr_t, msg.ack.dst_wmd) == 32);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.ack.dst_wmd) == 16);
        CLASSERT ((int)offsetof(lnet_hdr_t, msg.ack.match_bits) == 48);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.ack.match_bits) == 8);
        CLASSERT ((int)offsetof(lnet_hdr_t, msg.ack.mlength) == 56);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.ack.mlength) == 4);

        /* Put */
        CLASSERT ((int)offsetof(lnet_hdr_t, msg.put.ack_wmd) == 32);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.put.ack_wmd) == 16);
        CLASSERT ((int)offsetof(lnet_hdr_t, msg.put.match_bits) == 48);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.put.match_bits) == 8);
        CLASSERT ((int)offsetof(lnet_hdr_t, msg.put.hdr_data) == 56);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.put.hdr_data) == 8);
        CLASSERT ((int)offsetof(lnet_hdr_t, msg.put.ptl_index) == 64);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.put.ptl_index) == 4);
        CLASSERT ((int)offsetof(lnet_hdr_t, msg.put.offset) == 68);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.put.offset) == 4);

        /* Get */
        CLASSERT ((int)offsetof(lnet_hdr_t, msg.get.return_wmd) == 32);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.get.return_wmd) == 16);
        CLASSERT ((int)offsetof(lnet_hdr_t, msg.get.match_bits) == 48);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.get.match_bits) == 8);
        CLASSERT ((int)offsetof(lnet_hdr_t, msg.get.ptl_index) == 56);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.get.ptl_index) == 4);
        CLASSERT ((int)offsetof(lnet_hdr_t, msg.get.src_offset) == 60);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.get.src_offset) == 4);
        CLASSERT ((int)offsetof(lnet_hdr_t, msg.get.sink_length) == 64);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.get.sink_length) == 4);

        /* Reply */
        CLASSERT ((int)offsetof(lnet_hdr_t, msg.reply.dst_wmd) == 32);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.reply.dst_wmd) == 16);

        /* Hello */
        CLASSERT ((int)offsetof(lnet_hdr_t, msg.hello.incarnation) == 32);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.hello.incarnation) == 8);
        CLASSERT ((int)offsetof(lnet_hdr_t, msg.hello.type) == 40);
        CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.hello.type) == 4);
}
338
339 lnd_t *
340 lnet_find_lnd_by_type (int type)
341 {
342         lnd_t              *lnd;
343         cfs_list_t         *tmp;
344
345         /* holding lnd mutex */
346         cfs_list_for_each (tmp, &the_lnet.ln_lnds) {
347                 lnd = cfs_list_entry(tmp, lnd_t, lnd_list);
348
349                 if ((int)lnd->lnd_type == type)
350                         return lnd;
351         }
352
353         return NULL;
354 }
355
356 void
357 lnet_register_lnd (lnd_t *lnd)
358 {
359         LNET_MUTEX_LOCK(&the_lnet.ln_lnd_mutex);
360
361         LASSERT (the_lnet.ln_init);
362         LASSERT (libcfs_isknown_lnd(lnd->lnd_type));
363         LASSERT (lnet_find_lnd_by_type(lnd->lnd_type) == NULL);
364
365         cfs_list_add_tail (&lnd->lnd_list, &the_lnet.ln_lnds);
366         lnd->lnd_refcount = 0;
367
368         CDEBUG(D_NET, "%s LND registered\n", libcfs_lnd2str(lnd->lnd_type));
369
370         LNET_MUTEX_UNLOCK(&the_lnet.ln_lnd_mutex);
371 }
372 EXPORT_SYMBOL(lnet_register_lnd);
373
374 void
375 lnet_unregister_lnd (lnd_t *lnd)
376 {
377         LNET_MUTEX_LOCK(&the_lnet.ln_lnd_mutex);
378
379         LASSERT (the_lnet.ln_init);
380         LASSERT (lnet_find_lnd_by_type(lnd->lnd_type) == lnd);
381         LASSERT (lnd->lnd_refcount == 0);
382
383         cfs_list_del (&lnd->lnd_list);
384         CDEBUG(D_NET, "%s LND unregistered\n", libcfs_lnd2str(lnd->lnd_type));
385
386         LNET_MUTEX_UNLOCK(&the_lnet.ln_lnd_mutex);
387 }
388 EXPORT_SYMBOL(lnet_unregister_lnd);
389
390 void
391 lnet_counters_get(lnet_counters_t *counters)
392 {
393         lnet_counters_t *ctr;
394         int             i;
395
396         memset(counters, 0, sizeof(*counters));
397
398         lnet_net_lock(LNET_LOCK_EX);
399
400         cfs_percpt_for_each(ctr, i, the_lnet.ln_counters) {
401                 counters->msgs_max     += ctr->msgs_max;
402                 counters->msgs_alloc   += ctr->msgs_alloc;
403                 counters->errors       += ctr->errors;
404                 counters->send_count   += ctr->send_count;
405                 counters->recv_count   += ctr->recv_count;
406                 counters->route_count  += ctr->route_count;
407                 counters->drop_length  += ctr->drop_length;
408                 counters->send_length  += ctr->send_length;
409                 counters->recv_length  += ctr->recv_length;
410                 counters->route_length += ctr->route_length;
411                 counters->drop_length  += ctr->drop_length;
412
413         }
414         lnet_net_unlock(LNET_LOCK_EX);
415 }
416 EXPORT_SYMBOL(lnet_counters_get);
417
418 void
419 lnet_counters_reset(void)
420 {
421         lnet_counters_t *counters;
422         int             i;
423
424         lnet_net_lock(LNET_LOCK_EX);
425
426         cfs_percpt_for_each(counters, i, the_lnet.ln_counters)
427                 memset(counters, 0, sizeof(lnet_counters_t));
428
429         lnet_net_unlock(LNET_LOCK_EX);
430 }
431 EXPORT_SYMBOL(lnet_counters_reset);
432
433 #ifdef LNET_USE_LIB_FREELIST
434
435 int
436 lnet_freelist_init (lnet_freelist_t *fl, int n, int size)
437 {
438         char *space;
439
440         LASSERT (n > 0);
441
442         size += offsetof (lnet_freeobj_t, fo_contents);
443
444         LIBCFS_ALLOC(space, n * size);
445         if (space == NULL)
446                 return (-ENOMEM);
447
448         CFS_INIT_LIST_HEAD (&fl->fl_list);
449         fl->fl_objs = space;
450         fl->fl_nobjs = n;
451         fl->fl_objsize = size;
452
453         do
454         {
455                 memset (space, 0, size);
456                 cfs_list_add ((cfs_list_t *)space, &fl->fl_list);
457                 space += size;
458         } while (--n != 0);
459
460         return (0);
461 }
462
463 void
464 lnet_freelist_fini (lnet_freelist_t *fl)
465 {
466         cfs_list_t       *el;
467         int               count;
468
469         if (fl->fl_nobjs == 0)
470                 return;
471
472         count = 0;
473         for (el = fl->fl_list.next; el != &fl->fl_list; el = el->next)
474                 count++;
475
476         LASSERT (count == fl->fl_nobjs);
477
478         LIBCFS_FREE(fl->fl_objs, fl->fl_nobjs * fl->fl_objsize);
479         memset (fl, 0, sizeof (*fl));
480 }
481
482 #endif /* LNET_USE_LIB_FREELIST */
483
484 __u64
485 lnet_create_interface_cookie (void)
486 {
487         /* NB the interface cookie in wire handles guards against delayed
488          * replies and ACKs appearing valid after reboot. Initialisation time,
489          * even if it's only implemented to millisecond resolution is probably
490          * easily good enough. */
491         struct timeval tv;
492         __u64          cookie;
493 #ifndef __KERNEL__
494         int            rc = gettimeofday (&tv, NULL);
495         LASSERT (rc == 0);
496 #else
497         cfs_gettimeofday(&tv);
498 #endif
499         cookie = tv.tv_sec;
500         cookie *= 1000000;
501         cookie += tv.tv_usec;
502         return cookie;
503 }
504
505 static char *
506 lnet_res_type2str(int type)
507 {
508         switch (type) {
509         default:
510                 LBUG();
511         case LNET_COOKIE_TYPE_MD:
512                 return "MD";
513         case LNET_COOKIE_TYPE_ME:
514                 return "ME";
515         case LNET_COOKIE_TYPE_EQ:
516                 return "EQ";
517         }
518 }
519
520 void
521 lnet_res_container_cleanup(struct lnet_res_container *rec)
522 {
523         int     count = 0;
524
525         if (rec->rec_type == 0) /* not set yet, it's uninitialized */
526                 return;
527
528         while (!cfs_list_empty(&rec->rec_active)) {
529                 cfs_list_t *e = rec->rec_active.next;
530
531                 cfs_list_del_init(e);
532                 if (rec->rec_type == LNET_COOKIE_TYPE_EQ) {
533                         lnet_eq_free(cfs_list_entry(e, lnet_eq_t, eq_list));
534
535                 } else if (rec->rec_type == LNET_COOKIE_TYPE_MD) {
536                         lnet_md_free(cfs_list_entry(e, lnet_libmd_t, md_list));
537
538                 } else { /* NB: Active MEs should be attached on portals */
539                         LBUG();
540                 }
541                 count++;
542         }
543
544         if (count > 0) {
545                 /* Found alive MD/ME/EQ, user really should unlink/free
546                  * all of them before finalize LNet, but if someone didn't,
547                  * we have to recycle garbage for him */
548                 CERROR("%d active elements on exit of %s container\n",
549                        count, lnet_res_type2str(rec->rec_type));
550         }
551
552 #ifdef LNET_USE_LIB_FREELIST
553         lnet_freelist_fini(&rec->rec_freelist);
554 #endif
555         if (rec->rec_lh_hash != NULL) {
556                 LIBCFS_FREE(rec->rec_lh_hash,
557                             LNET_LH_HASH_SIZE * sizeof(rec->rec_lh_hash[0]));
558                 rec->rec_lh_hash = NULL;
559         }
560
561         rec->rec_type = 0; /* mark it as finalized */
562 }
563
564 int
565 lnet_res_container_setup(struct lnet_res_container *rec,
566                          int cpt, int type, int objnum, int objsz)
567 {
568         int     rc = 0;
569         int     i;
570
571         LASSERT(rec->rec_type == 0);
572
573         rec->rec_type = type;
574         CFS_INIT_LIST_HEAD(&rec->rec_active);
575
576 #ifdef LNET_USE_LIB_FREELIST
577         memset(&rec->rec_freelist, 0, sizeof(rec->rec_freelist));
578         rc = lnet_freelist_init(&rec->rec_freelist, objnum, objsz);
579         if (rc != 0)
580                 goto out;
581 #endif
582         rec->rec_lh_cookie = (cpt << LNET_COOKIE_TYPE_BITS) | type;
583
584         /* Arbitrary choice of hash table size */
585         LIBCFS_CPT_ALLOC(rec->rec_lh_hash, lnet_cpt_table(), cpt,
586                          LNET_LH_HASH_SIZE * sizeof(rec->rec_lh_hash[0]));
587         if (rec->rec_lh_hash == NULL) {
588                 rc = -ENOMEM;
589                 goto out;
590         }
591
592         for (i = 0; i < LNET_LH_HASH_SIZE; i++)
593                 CFS_INIT_LIST_HEAD(&rec->rec_lh_hash[i]);
594
595         return 0;
596
597 out:
598         CERROR("Failed to setup %s resource container\n",
599                lnet_res_type2str(type));
600         lnet_res_container_cleanup(rec);
601         return rc;
602 }
603
604 static void
605 lnet_res_containers_destroy(struct lnet_res_container **recs)
606 {
607         struct lnet_res_container       *rec;
608         int                             i;
609
610         cfs_percpt_for_each(rec, i, recs)
611                 lnet_res_container_cleanup(rec);
612
613         cfs_percpt_free(recs);
614 }
615
616 static struct lnet_res_container **
617 lnet_res_containers_create(int type, int objnum, int objsz)
618 {
619         struct lnet_res_container       **recs;
620         struct lnet_res_container       *rec;
621         int                             rc;
622         int                             i;
623
624         recs = cfs_percpt_alloc(lnet_cpt_table(), sizeof(*rec));
625         if (recs == NULL) {
626                 CERROR("Failed to allocate %s resource containers\n",
627                        lnet_res_type2str(type));
628                 return NULL;
629         }
630
631         cfs_percpt_for_each(rec, i, recs) {
632                 rc = lnet_res_container_setup(rec, i, type, objnum, objsz);
633                 if (rc != 0) {
634                         lnet_res_containers_destroy(recs);
635                         return NULL;
636                 }
637         }
638
639         return recs;
640 }
641
642 lnet_libhandle_t *
643 lnet_res_lh_lookup(struct lnet_res_container *rec, __u64 cookie)
644 {
645         /* ALWAYS called with lnet_res_lock held */
646         cfs_list_t              *head;
647         lnet_libhandle_t        *lh;
648         unsigned int            hash;
649
650         if ((cookie & LNET_COOKIE_MASK) != rec->rec_type)
651                 return NULL;
652
653         hash = cookie >> (LNET_COOKIE_TYPE_BITS + LNET_CPT_BITS);
654         head = &rec->rec_lh_hash[hash & LNET_LH_HASH_MASK];
655
656         cfs_list_for_each_entry(lh, head, lh_hash_chain) {
657                 if (lh->lh_cookie == cookie)
658                         return lh;
659         }
660
661         return NULL;
662 }
663
664 void
665 lnet_res_lh_initialize(struct lnet_res_container *rec, lnet_libhandle_t *lh)
666 {
667         /* ALWAYS called with lnet_res_lock held */
668         unsigned int    ibits = LNET_COOKIE_TYPE_BITS + LNET_CPT_BITS;
669         unsigned int    hash;
670
671         lh->lh_cookie = rec->rec_lh_cookie;
672         rec->rec_lh_cookie += 1 << ibits;
673
674         hash = (lh->lh_cookie >> ibits) & LNET_LH_HASH_MASK;
675
676         cfs_list_add(&lh->lh_hash_chain, &rec->rec_lh_hash[hash]);
677 }
678
679 #ifndef __KERNEL__
680 /**
681  * Reserved API - do not use.
682  * Temporary workaround to allow uOSS and test programs force server
683  * mode in userspace. See comments near ln_server_mode_flag in
684  * lnet/lib-types.h */
685
686 void
687 lnet_server_mode() {
688         the_lnet.ln_server_mode_flag = 1;
689 }
690 #endif
691
692 int lnet_unprepare(void);
693
694 int
695 lnet_prepare(lnet_pid_t requested_pid)
696 {
697         /* Prepare to bring up the network */
698         struct lnet_res_container **recs;
699         int                       rc = 0;
700
701         LASSERT (the_lnet.ln_refcount == 0);
702
703         the_lnet.ln_routing = 0;
704
705 #ifdef __KERNEL__
706         LASSERT ((requested_pid & LNET_PID_USERFLAG) == 0);
707         the_lnet.ln_pid = requested_pid;
708 #else
709         if (the_lnet.ln_server_mode_flag) {/* server case (uOSS) */
710                 LASSERT ((requested_pid & LNET_PID_USERFLAG) == 0);
711
712                 if (cfs_curproc_uid())/* Only root can run user-space server */
713                         return -EPERM;
714                 the_lnet.ln_pid = requested_pid;
715
716         } else {/* client case (liblustre) */
717
718                 /* My PID must be unique on this node and flag I'm userspace */
719                 the_lnet.ln_pid = getpid() | LNET_PID_USERFLAG;
720         }
721 #endif
722
723         CFS_INIT_LIST_HEAD(&the_lnet.ln_test_peers);
724         CFS_INIT_LIST_HEAD(&the_lnet.ln_nis);
725         CFS_INIT_LIST_HEAD(&the_lnet.ln_nis_cpt);
726         CFS_INIT_LIST_HEAD(&the_lnet.ln_nis_zombie);
727         CFS_INIT_LIST_HEAD(&the_lnet.ln_remote_nets);
728         CFS_INIT_LIST_HEAD(&the_lnet.ln_routers);
729
730         the_lnet.ln_interface_cookie = lnet_create_interface_cookie();
731
732         the_lnet.ln_counters = cfs_percpt_alloc(lnet_cpt_table(),
733                                                 sizeof(lnet_counters_t));
734         if (the_lnet.ln_counters == NULL) {
735                 CERROR("Failed to allocate counters for LNet\n");
736                 rc = -ENOMEM;
737                 goto failed;
738         }
739
740         rc = lnet_peer_tables_create();
741         if (rc != 0)
742                 goto failed;
743
744         rc = lnet_msg_containers_create();
745         if (rc != 0)
746                 goto failed;
747
748         rc = lnet_res_container_setup(&the_lnet.ln_eq_container, 0,
749                                       LNET_COOKIE_TYPE_EQ, LNET_FL_MAX_EQS,
750                                       sizeof(lnet_eq_t));
751         if (rc != 0)
752                 goto failed;
753
754         recs = lnet_res_containers_create(LNET_COOKIE_TYPE_ME, LNET_FL_MAX_MES,
755                                           sizeof(lnet_me_t));
756         if (recs == NULL)
757                 goto failed;
758
759         the_lnet.ln_me_containers = recs;
760
761         recs = lnet_res_containers_create(LNET_COOKIE_TYPE_MD, LNET_FL_MAX_MDS,
762                                           sizeof(lnet_libmd_t));
763         if (recs == NULL)
764                 goto failed;
765
766         the_lnet.ln_md_containers = recs;
767
768         rc = lnet_portals_create();
769         if (rc != 0) {
770                 CERROR("Failed to create portals for LNet: %d\n", rc);
771                 goto failed;
772         }
773
774         return 0;
775
776  failed:
777         lnet_unprepare();
778         return rc;
779 }
780
/**
 * Undo lnet_prepare(): release the global LNet resources.
 *
 * Also used as the error path of lnet_prepare(), so every step copes
 * with a resource that was never created (the ME/MD container arrays and
 * the counters are checked for NULL here).
 *
 * Asserts that no references are held and that the test-peer and NI
 * lists are empty.
 *
 * \retval 0 always
 */
int
lnet_unprepare (void)
{
        /* NB no LNET_LOCK since this is the last reference.  All LND instances
         * have shut down already, so it is safe to unlink and free all
         * descriptors, even those that appear committed to a network op (eg MD
         * with non-zero pending count) */

        lnet_fail_nid(LNET_NID_ANY, 0);

        LASSERT(the_lnet.ln_refcount == 0);
        LASSERT(cfs_list_empty(&the_lnet.ln_test_peers));
        LASSERT(cfs_list_empty(&the_lnet.ln_nis));
        LASSERT(cfs_list_empty(&the_lnet.ln_nis_cpt));
        LASSERT(cfs_list_empty(&the_lnet.ln_nis_zombie));

        /* from here on, resources go away in the reverse of the order in
         * which lnet_prepare() creates them */
        lnet_portals_destroy();

        if (the_lnet.ln_md_containers != NULL) {
                lnet_res_containers_destroy(the_lnet.ln_md_containers);
                the_lnet.ln_md_containers = NULL;
        }

        if (the_lnet.ln_me_containers != NULL) {
                lnet_res_containers_destroy(the_lnet.ln_me_containers);
                the_lnet.ln_me_containers = NULL;
        }

        lnet_res_container_cleanup(&the_lnet.ln_eq_container);

        lnet_msg_containers_destroy();
        lnet_peer_tables_destroy();
        /* NOTE(review): the router pools are not created by lnet_prepare();
         * presumably they are allocated elsewhere -- confirm */
        lnet_rtrpools_free();

        if (the_lnet.ln_counters != NULL) {
                cfs_percpt_free(the_lnet.ln_counters);
                the_lnet.ln_counters = NULL;
        }

        return 0;
}
822
823 lnet_ni_t  *
824 lnet_net2ni_locked(__u32 net, int cpt)
825 {
826         cfs_list_t      *tmp;
827         lnet_ni_t       *ni;
828
829         LASSERT(cpt != LNET_LOCK_EX);
830
831         cfs_list_for_each(tmp, &the_lnet.ln_nis) {
832                 ni = cfs_list_entry(tmp, lnet_ni_t, ni_list);
833
834                 if (LNET_NIDNET(ni->ni_nid) == net) {
835                         lnet_ni_addref_locked(ni, cpt);
836                         return ni;
837                 }
838         }
839
840         return NULL;
841 }
842
843 lnet_ni_t *
844 lnet_net2ni(__u32 net)
845 {
846         lnet_ni_t *ni;
847
848         lnet_net_lock(0);
849         ni = lnet_net2ni_locked(net, 0);
850         lnet_net_unlock(0);
851
852         return ni;
853 }
854 EXPORT_SYMBOL(lnet_net2ni);
855
856 static unsigned int
857 lnet_nid_cpt_hash(lnet_nid_t nid, unsigned int number)
858 {
859         __u64           key = nid;
860         unsigned int    val;
861
862         LASSERT(number >= 1 && number <= LNET_CPT_NUMBER);
863
864         if (number == 1)
865                 return 0;
866
867         val = cfs_hash_long(key, LNET_CPT_BITS);
868         /* NB: LNET_CP_NUMBER doesn't have to be PO2 */
869         if (val < number)
870                 return val;
871
872         return (unsigned int)(key + val + (val >> 1)) % number;
873 }
874
875 int
876 lnet_cpt_of_nid_locked(lnet_nid_t nid)
877 {
878         struct lnet_ni *ni;
879
880         /* must called with hold of lnet_net_lock */
881         if (LNET_CPT_NUMBER == 1)
882                 return 0; /* the only one */
883
884         /* take lnet_net_lock(any) would be OK */
885         if (!cfs_list_empty(&the_lnet.ln_nis_cpt)) {
886                 cfs_list_for_each_entry(ni, &the_lnet.ln_nis_cpt, ni_cptlist) {
887                         if (LNET_NIDNET(ni->ni_nid) != LNET_NIDNET(nid))
888                                 continue;
889
890                         LASSERT(ni->ni_cpts != NULL);
891                         return ni->ni_cpts[lnet_nid_cpt_hash
892                                            (nid, ni->ni_ncpts)];
893                 }
894         }
895
896         return lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
897 }
898
899 int
900 lnet_cpt_of_nid(lnet_nid_t nid)
901 {
902         int     cpt;
903         int     cpt2;
904
905         if (LNET_CPT_NUMBER == 1)
906                 return 0; /* the only one */
907
908         if (cfs_list_empty(&the_lnet.ln_nis_cpt))
909                 return lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
910
911         cpt = lnet_net_lock_current();
912         cpt2 = lnet_cpt_of_nid_locked(nid);
913         lnet_net_unlock(cpt);
914
915         return cpt2;
916 }
917 EXPORT_SYMBOL(lnet_cpt_of_nid);
918
919 int
920 lnet_islocalnet(__u32 net)
921 {
922         struct lnet_ni  *ni;
923         int             cpt;
924
925         cpt = lnet_net_lock_current();
926
927         ni = lnet_net2ni_locked(net, cpt);
928         if (ni != NULL)
929                 lnet_ni_decref_locked(ni, cpt);
930
931         lnet_net_unlock(cpt);
932
933         return ni != NULL;
934 }
935
936 lnet_ni_t  *
937 lnet_nid2ni_locked(lnet_nid_t nid, int cpt)
938 {
939         struct lnet_ni  *ni;
940         cfs_list_t      *tmp;
941
942         LASSERT(cpt != LNET_LOCK_EX);
943
944         cfs_list_for_each(tmp, &the_lnet.ln_nis) {
945                 ni = cfs_list_entry(tmp, lnet_ni_t, ni_list);
946
947                 if (ni->ni_nid == nid) {
948                         lnet_ni_addref_locked(ni, cpt);
949                         return ni;
950                 }
951         }
952
953         return NULL;
954 }
955
956 int
957 lnet_islocalnid(lnet_nid_t nid)
958 {
959         struct lnet_ni  *ni;
960         int             cpt;
961
962         cpt = lnet_net_lock_current();
963         ni = lnet_nid2ni_locked(nid, cpt);
964         if (ni != NULL)
965                 lnet_ni_decref_locked(ni, cpt);
966         lnet_net_unlock(cpt);
967
968         return ni != NULL;
969 }
970
/**
 * Count the NIs on the_lnet.ln_nis whose LND provides an lnd_accept
 * callback, i.e. the NIs that need the acceptor.
 *
 * The list is only walked when built for the kernel or with pthread
 * support; otherwise the result is always 0.
 *
 * \return number of NIs that need the acceptor.
 */
int
lnet_count_acceptor_nis (void)
{
        int             nacceptors = 0;
#if defined(__KERNEL__) || defined(HAVE_LIBPTHREAD)
        struct lnet_ni  *ni;
        int             cpt;

        cpt = lnet_net_lock_current();

        cfs_list_for_each_entry(ni, &the_lnet.ln_nis, ni_list) {
                if (ni->ni_lnd->lnd_accept != NULL)
                        nacceptors++;
        }

        lnet_net_unlock(cpt);

#endif /* defined(__KERNEL__) || defined(HAVE_LIBPTHREAD) */
        return nacceptors;
}
994
995 static int
996 lnet_ni_tq_credits(lnet_ni_t *ni)
997 {
998         int     credits;
999
1000         LASSERT(ni->ni_ncpts >= 1);
1001
1002         if (ni->ni_ncpts == 1)
1003                 return ni->ni_maxtxcredits;
1004
1005         credits = ni->ni_maxtxcredits / ni->ni_ncpts;
1006         credits = max(credits, 8 * ni->ni_peertxcredits);
1007         credits = min(credits, ni->ni_maxtxcredits);
1008
1009         return credits;
1010 }
1011
1012 void
1013 lnet_shutdown_lndnis (void)
1014 {
1015         int                i;
1016         int                islo;
1017         lnet_ni_t         *ni;
1018
1019         /* NB called holding the global mutex */
1020
1021         /* All quiet on the API front */
1022         LASSERT(!the_lnet.ln_shutdown);
1023         LASSERT(the_lnet.ln_refcount == 0);
1024         LASSERT(cfs_list_empty(&the_lnet.ln_nis_zombie));
1025         LASSERT(cfs_list_empty(&the_lnet.ln_remote_nets));
1026
1027         lnet_net_lock(LNET_LOCK_EX);
1028         the_lnet.ln_shutdown = 1;       /* flag shutdown */
1029
1030         /* Unlink NIs from the global table */
1031         while (!cfs_list_empty(&the_lnet.ln_nis)) {
1032                 ni = cfs_list_entry(the_lnet.ln_nis.next,
1033                                     lnet_ni_t, ni_list);
1034                 /* move it to zombie list and nobody can find it anymore */
1035                 cfs_list_move(&ni->ni_list, &the_lnet.ln_nis_zombie);
1036                 lnet_ni_decref_locked(ni, 0);   /* drop ln_nis' ref */
1037
1038                 if (!cfs_list_empty(&ni->ni_cptlist)) {
1039                         cfs_list_del_init(&ni->ni_cptlist);
1040                         lnet_ni_decref_locked(ni, 0);
1041                 }
1042         }
1043
1044         /* Drop the cached eqwait NI. */
1045         if (the_lnet.ln_eq_waitni != NULL) {
1046                 lnet_ni_decref_locked(the_lnet.ln_eq_waitni, 0);
1047                 the_lnet.ln_eq_waitni = NULL;
1048         }
1049
1050         /* Drop the cached loopback NI. */
1051         if (the_lnet.ln_loni != NULL) {
1052                 lnet_ni_decref_locked(the_lnet.ln_loni, 0);
1053                 the_lnet.ln_loni = NULL;
1054         }
1055
1056         lnet_net_unlock(LNET_LOCK_EX);
1057
1058         /* Clear lazy portals and drop delayed messages which hold refs
1059          * on their lnet_msg_t::msg_rxpeer */
1060         for (i = 0; i < the_lnet.ln_nportals; i++)
1061                 LNetClearLazyPortal(i);
1062
1063         /* Clear the peer table and wait for all peers to go (they hold refs on
1064          * their NIs) */
1065         lnet_peer_tables_cleanup();
1066
1067         lnet_net_lock(LNET_LOCK_EX);
1068         /* Now wait for the NI's I just nuked to show up on ln_zombie_nis
1069          * and shut them down in guaranteed thread context */
1070         i = 2;
1071         while (!cfs_list_empty(&the_lnet.ln_nis_zombie)) {
1072                 int     *ref;
1073                 int     j;
1074
1075                 ni = cfs_list_entry(the_lnet.ln_nis_zombie.next,
1076                                     lnet_ni_t, ni_list);
1077                 cfs_list_del_init(&ni->ni_list);
1078                 cfs_percpt_for_each(ref, j, ni->ni_refs) {
1079                         if (*ref == 0)
1080                                 continue;
1081                         /* still busy, add it back to zombie list */
1082                         cfs_list_add(&ni->ni_list, &the_lnet.ln_nis_zombie);
1083                         break;
1084                 }
1085
1086                 while (!cfs_list_empty(&ni->ni_list)) {
1087                         lnet_net_unlock(LNET_LOCK_EX);
1088                         ++i;
1089                         if ((i & (-i)) == i) {
1090                                 CDEBUG(D_WARNING,
1091                                        "Waiting for zombie LNI %s\n",
1092                                        libcfs_nid2str(ni->ni_nid));
1093                         }
1094                         cfs_pause(cfs_time_seconds(1));
1095                         lnet_net_lock(LNET_LOCK_EX);
1096                         continue;
1097                 }
1098
1099                 ni->ni_lnd->lnd_refcount--;
1100                 lnet_net_unlock(LNET_LOCK_EX);
1101
1102                 islo = ni->ni_lnd->lnd_type == LOLND;
1103
1104                 LASSERT (!cfs_in_interrupt ());
1105                 (ni->ni_lnd->lnd_shutdown)(ni);
1106
1107                 /* can't deref lnd anymore now; it might have unregistered
1108                  * itself...  */
1109
1110                 if (!islo)
1111                         CDEBUG(D_LNI, "Removed LNI %s\n",
1112                                libcfs_nid2str(ni->ni_nid));
1113
1114                 lnet_ni_free(ni);
1115                 lnet_net_lock(LNET_LOCK_EX);
1116         }
1117
1118         the_lnet.ln_shutdown = 0;
1119         lnet_net_unlock(LNET_LOCK_EX);
1120
1121         if (the_lnet.ln_network_tokens != NULL) {
1122                 LIBCFS_FREE(the_lnet.ln_network_tokens,
1123                             the_lnet.ln_network_tokens_nob);
1124                 the_lnet.ln_network_tokens = NULL;
1125         }
1126 }
1127
1128 int
1129 lnet_startup_lndnis (void)
1130 {
1131         lnd_t                   *lnd;
1132         struct lnet_ni          *ni;
1133         struct lnet_tx_queue    *tq;
1134         cfs_list_t              nilist;
1135         int                     i;
1136         int                rc = 0;
1137         int                lnd_type;
1138         int                nicount = 0;
1139         char              *nets = lnet_get_networks();
1140
1141         CFS_INIT_LIST_HEAD(&nilist);
1142
1143         if (nets == NULL)
1144                 goto failed;
1145
1146         rc = lnet_parse_networks(&nilist, nets);
1147         if (rc != 0)
1148                 goto failed;
1149
1150         while (!cfs_list_empty(&nilist)) {
1151                 ni = cfs_list_entry(nilist.next, lnet_ni_t, ni_list);
1152                 lnd_type = LNET_NETTYP(LNET_NIDNET(ni->ni_nid));
1153
1154                 LASSERT (libcfs_isknown_lnd(lnd_type));
1155
1156                 if (lnd_type == CIBLND    ||
1157                     lnd_type == OPENIBLND ||
1158                     lnd_type == IIBLND    ||
1159                     lnd_type == VIBLND) {
1160                         CERROR("LND %s obsoleted\n",
1161                                libcfs_lnd2str(lnd_type));
1162                         goto failed;
1163                 }
1164
1165                 LNET_MUTEX_LOCK(&the_lnet.ln_lnd_mutex);
1166                 lnd = lnet_find_lnd_by_type(lnd_type);
1167
1168 #ifdef __KERNEL__
1169                 if (lnd == NULL) {
1170                         LNET_MUTEX_UNLOCK(&the_lnet.ln_lnd_mutex);
1171                         rc = cfs_request_module("%s",
1172                                                 libcfs_lnd2modname(lnd_type));
1173                         LNET_MUTEX_LOCK(&the_lnet.ln_lnd_mutex);
1174
1175                         lnd = lnet_find_lnd_by_type(lnd_type);
1176                         if (lnd == NULL) {
1177                                 LNET_MUTEX_UNLOCK(&the_lnet.ln_lnd_mutex);
1178                                 CERROR("Can't load LND %s, module %s, rc=%d\n",
1179                                        libcfs_lnd2str(lnd_type),
1180                                        libcfs_lnd2modname(lnd_type), rc);
1181 #ifndef HAVE_MODULE_LOADING_SUPPORT
1182                                 LCONSOLE_ERROR_MSG(0x104, "Your kernel must be "
1183                                          "compiled with kernel module "
1184                                          "loading support.");
1185 #endif
1186                                 goto failed;
1187                         }
1188                 }
1189 #else
1190                 if (lnd == NULL) {
1191                         LNET_MUTEX_UNLOCK(&the_lnet.ln_lnd_mutex);
1192                         CERROR("LND %s not supported\n",
1193                                libcfs_lnd2str(lnd_type));
1194                         goto failed;
1195                 }
1196 #endif
1197
1198                 lnet_net_lock(LNET_LOCK_EX);
1199                 lnd->lnd_refcount++;
1200                 lnet_net_unlock(LNET_LOCK_EX);
1201
1202                 ni->ni_lnd = lnd;
1203
1204                 rc = (lnd->lnd_startup)(ni);
1205
1206                 LNET_MUTEX_UNLOCK(&the_lnet.ln_lnd_mutex);
1207
1208                 if (rc != 0) {
1209                         LCONSOLE_ERROR_MSG(0x105, "Error %d starting up LNI %s"
1210                                            "\n",
1211                                            rc, libcfs_lnd2str(lnd->lnd_type));
1212                         lnet_net_lock(LNET_LOCK_EX);
1213                         lnd->lnd_refcount--;
1214                         lnet_net_unlock(LNET_LOCK_EX);
1215                         goto failed;
1216                 }
1217
1218                 LASSERT (ni->ni_peertimeout <= 0 || lnd->lnd_query != NULL);
1219
1220                 cfs_list_del(&ni->ni_list);
1221
1222                 lnet_net_lock(LNET_LOCK_EX);
1223                 /* refcount for ln_nis */
1224                 lnet_ni_addref_locked(ni, 0);
1225                 cfs_list_add_tail(&ni->ni_list, &the_lnet.ln_nis);
1226                 if (ni->ni_cpts != NULL) {
1227                         cfs_list_add_tail(&ni->ni_cptlist,
1228                                           &the_lnet.ln_nis_cpt);
1229                         lnet_ni_addref_locked(ni, 0);
1230                 }
1231
1232                 lnet_net_unlock(LNET_LOCK_EX);
1233
1234                 if (lnd->lnd_type == LOLND) {
1235                         lnet_ni_addref(ni);
1236                         LASSERT (the_lnet.ln_loni == NULL);
1237                         the_lnet.ln_loni = ni;
1238                         continue;
1239                 }
1240
1241 #ifndef __KERNEL__
1242                 if (lnd->lnd_wait != NULL) {
1243                         if (the_lnet.ln_eq_waitni == NULL) {
1244                                 lnet_ni_addref(ni);
1245                                 the_lnet.ln_eq_waitni = ni;
1246                         }
1247                 } else {
1248 # ifndef HAVE_LIBPTHREAD
1249                         LCONSOLE_ERROR_MSG(0x106, "LND %s not supported in a "
1250                                            "single-threaded runtime\n",
1251                                            libcfs_lnd2str(lnd_type));
1252                         goto failed;
1253 # endif
1254                 }
1255 #endif
1256                 if (ni->ni_peertxcredits == 0 ||
1257                     ni->ni_maxtxcredits == 0) {
1258                         LCONSOLE_ERROR_MSG(0x107, "LNI %s has no %scredits\n",
1259                                            libcfs_lnd2str(lnd->lnd_type),
1260                                            ni->ni_peertxcredits == 0 ?
1261                                            "" : "per-peer ");
1262                         goto failed;
1263                 }
1264
1265                 cfs_percpt_for_each(tq, i, ni->ni_tx_queues) {
1266                         tq->tq_credits_min =
1267                         tq->tq_credits_max =
1268                         tq->tq_credits = lnet_ni_tq_credits(ni);
1269                 }
1270
1271                 CDEBUG(D_LNI, "Added LNI %s [%d/%d/%d/%d]\n",
1272                        libcfs_nid2str(ni->ni_nid), ni->ni_peertxcredits,
1273                        lnet_ni_tq_credits(ni) * LNET_CPT_NUMBER,
1274                        ni->ni_peerrtrcredits, ni->ni_peertimeout);
1275
1276                 nicount++;
1277         }
1278
1279         if (the_lnet.ln_eq_waitni != NULL && nicount > 1) {
1280                 lnd_type = the_lnet.ln_eq_waitni->ni_lnd->lnd_type;
1281                 LCONSOLE_ERROR_MSG(0x109, "LND %s can only run single-network"
1282                                    "\n",
1283                                    libcfs_lnd2str(lnd_type));
1284                 goto failed;
1285         }
1286
1287         return 0;
1288
1289  failed:
1290         lnet_shutdown_lndnis();
1291
1292         while (!cfs_list_empty(&nilist)) {
1293                 ni = cfs_list_entry(nilist.next, lnet_ni_t, ni_list);
1294                 cfs_list_del(&ni->ni_list);
1295                 lnet_ni_free(ni);
1296         }
1297
1298         return -ENETDOWN;
1299 }
1300
1301 /**
1302  * Initialize LNet library.
1303  *
1304  * Only userspace program needs to call this function - it's automatically
1305  * called in the kernel at module loading time. Caller has to call LNetFini()
1306  * after a call to LNetInit(), if and only if the latter returned 0. It must
1307  * be called exactly once.
1308  *
1309  * \return 0 on success, and -ve on failures.
1310  */
1311 int
1312 LNetInit(void)
1313 {
1314         int     rc;
1315
1316         lnet_assert_wire_constants();
1317         LASSERT(!the_lnet.ln_init);
1318
1319         memset(&the_lnet, 0, sizeof(the_lnet));
1320
1321         /* refer to global cfs_cpt_table for now */
1322         the_lnet.ln_cpt_table   = cfs_cpt_table;
1323         the_lnet.ln_cpt_number  = cfs_cpt_number(cfs_cpt_table);
1324
1325         LASSERT(the_lnet.ln_cpt_number > 0);
1326         if (the_lnet.ln_cpt_number > LNET_CPT_MAX) {
1327                 /* we are under risk of consuming all lh_cookie */
1328                 CERROR("Can't have %d CPTs for LNet (max allowed is %d), "
1329                        "please change setting of CPT-table and retry\n",
1330                        the_lnet.ln_cpt_number, LNET_CPT_MAX);
1331                 return -1;
1332         }
1333
1334         while ((1 << the_lnet.ln_cpt_bits) < the_lnet.ln_cpt_number)
1335                 the_lnet.ln_cpt_bits++;
1336
1337         rc = lnet_create_locks();
1338         if (rc != 0) {
1339                 CERROR("Can't create LNet global locks: %d\n", rc);
1340                 return -1;
1341         }
1342
1343         the_lnet.ln_refcount = 0;
1344         the_lnet.ln_init = 1;
1345         LNetInvalidateHandle(&the_lnet.ln_rc_eqh);
1346         CFS_INIT_LIST_HEAD(&the_lnet.ln_lnds);
1347         CFS_INIT_LIST_HEAD(&the_lnet.ln_rcd_zombie);
1348         CFS_INIT_LIST_HEAD(&the_lnet.ln_rcd_deathrow);
1349
1350 #ifdef __KERNEL__
1351         /* All LNDs apart from the LOLND are in separate modules.  They
1352          * register themselves when their module loads, and unregister
1353          * themselves when their module is unloaded. */
1354 #else
1355         /* Register LNDs
1356          * NB the order here determines default 'networks=' order */
1357 # ifdef HAVE_LIBPTHREAD
1358         LNET_REGISTER_ULND(the_tcplnd);
1359 # endif
1360 #endif
1361         lnet_register_lnd(&the_lolnd);
1362         return 0;
1363 }
1364 EXPORT_SYMBOL(LNetInit);
1365
1366 /**
1367  * Finalize LNet library.
1368  *
1369  * Only userspace program needs to call this function. It can be called
1370  * at most once.
1371  *
1372  * \pre LNetInit() called with success.
1373  * \pre All LNet users called LNetNIFini() for matching LNetNIInit() calls.
1374  */
1375 void
1376 LNetFini(void)
1377 {
1378         LASSERT(the_lnet.ln_init);
1379         LASSERT(the_lnet.ln_refcount == 0);
1380
1381         while (!cfs_list_empty(&the_lnet.ln_lnds))
1382                 lnet_unregister_lnd(cfs_list_entry(the_lnet.ln_lnds.next,
1383                                                    lnd_t, lnd_list));
1384         lnet_destroy_locks();
1385
1386         the_lnet.ln_init = 0;
1387 }
1388 EXPORT_SYMBOL(LNetFini);
1389
1390 /**
1391  * Set LNet PID and start LNet interfaces, routing, and forwarding.
1392  *
1393  * Userspace program should call this after a successful call to LNetInit().
1394  * Users must call this function at least once before any other functions.
1395  * For each successful call there must be a corresponding call to
1396  * LNetNIFini(). For subsequent calls to LNetNIInit(), \a requested_pid is
1397  * ignored.
1398  *
1399  * The PID used by LNet may be different from the one requested.
1400  * See LNetGetId().
1401  *
1402  * \param requested_pid PID requested by the caller.
1403  *
1404  * \return >= 0 on success, and < 0 error code on failures.
1405  */
1406 int
1407 LNetNIInit(lnet_pid_t requested_pid)
1408 {
1409         int         im_a_router = 0;
1410         int         rc;
1411
1412         LNET_MUTEX_LOCK(&the_lnet.ln_api_mutex);
1413
1414         LASSERT (the_lnet.ln_init);
1415         CDEBUG(D_OTHER, "refs %d\n", the_lnet.ln_refcount);
1416
1417         if (the_lnet.ln_refcount > 0) {
1418                 rc = the_lnet.ln_refcount++;
1419                 goto out;
1420         }
1421
1422         lnet_get_tunables();
1423
1424         if (requested_pid == LNET_PID_ANY) {
1425                 /* Don't instantiate LNET just for me */
1426                 rc = -ENETDOWN;
1427                 goto failed0;
1428         }
1429
1430         rc = lnet_prepare(requested_pid);
1431         if (rc != 0)
1432                 goto failed0;
1433
1434         rc = lnet_startup_lndnis();
1435         if (rc != 0)
1436                 goto failed1;
1437
1438         rc = lnet_parse_routes(lnet_get_routes(), &im_a_router);
1439         if (rc != 0)
1440                 goto failed2;
1441
1442         rc = lnet_check_routes();
1443         if (rc != 0)
1444                 goto failed2;
1445
1446         rc = lnet_rtrpools_alloc(im_a_router);
1447         if (rc != 0)
1448                 goto failed2;
1449
1450         rc = lnet_acceptor_start();
1451         if (rc != 0)
1452                 goto failed2;
1453
1454         the_lnet.ln_refcount = 1;
1455         /* Now I may use my own API functions... */
1456
1457         /* NB router checker needs the_lnet.ln_ping_info in
1458          * lnet_router_checker -> lnet_update_ni_status_locked */
1459         rc = lnet_ping_target_init();
1460         if (rc != 0)
1461                 goto failed3;
1462
1463         rc = lnet_router_checker_start();
1464         if (rc != 0)
1465                 goto failed4;
1466
1467         lnet_proc_init();
1468         goto out;
1469
1470  failed4:
1471         lnet_ping_target_fini();
1472  failed3:
1473         the_lnet.ln_refcount = 0;
1474         lnet_acceptor_stop();
1475  failed2:
1476         lnet_destroy_routes();
1477         lnet_shutdown_lndnis();
1478  failed1:
1479         lnet_unprepare();
1480  failed0:
1481         LASSERT (rc < 0);
1482  out:
1483         LNET_MUTEX_UNLOCK(&the_lnet.ln_api_mutex);
1484         return rc;
1485 }
1486 EXPORT_SYMBOL(LNetNIInit);
1487
1488 /**
1489  * Stop LNet interfaces, routing, and forwarding.
1490  *
1491  * Users must call this function once for each successful call to LNetNIInit().
1492  * Once the LNetNIFini() operation has been started, the results of pending
1493  * API operations are undefined.
1494  *
1495  * \return always 0 for current implementation.
1496  */
1497 int
1498 LNetNIFini()
1499 {
1500         LNET_MUTEX_LOCK(&the_lnet.ln_api_mutex);
1501
1502         LASSERT (the_lnet.ln_init);
1503         LASSERT (the_lnet.ln_refcount > 0);
1504
1505         if (the_lnet.ln_refcount != 1) {
1506                 the_lnet.ln_refcount--;
1507         } else {
1508                 LASSERT (!the_lnet.ln_niinit_self);
1509
1510                 lnet_proc_fini();
1511                 lnet_router_checker_stop();
1512                 lnet_ping_target_fini();
1513
1514                 /* Teardown fns that use my own API functions BEFORE here */
1515                 the_lnet.ln_refcount = 0;
1516
1517                 lnet_acceptor_stop();
1518                 lnet_destroy_routes();
1519                 lnet_shutdown_lndnis();
1520                 lnet_unprepare();
1521         }
1522
1523         LNET_MUTEX_UNLOCK(&the_lnet.ln_api_mutex);
1524         return 0;
1525 }
1526 EXPORT_SYMBOL(LNetNIFini);
1527
1528 /**
1529  * This is an ugly hack to export IOC_LIBCFS_DEBUG_PEER and
1530  * IOC_LIBCFS_PORTALS_COMPATIBILITY commands to users, by tweaking the LNet
1531  * internal ioctl handler.
1532  *
1533  * IOC_LIBCFS_PORTALS_COMPATIBILITY is now deprecated, don't use it.
1534  *
1535  * \param cmd IOC_LIBCFS_DEBUG_PEER to print debugging data about a peer.
1536  * The data will be printed to system console. Don't use it excessively.
1537  * \param arg A pointer to lnet_process_id_t, process ID of the peer.
1538  *
1539  * \return Always return 0 when called by users directly (i.e., not via ioctl).
1540  */
1541 int
1542 LNetCtl(unsigned int cmd, void *arg)
1543 {
1544         struct libcfs_ioctl_data *data = arg;
1545         lnet_process_id_t         id = {0};
1546         lnet_ni_t                *ni;
1547         int                       rc;
1548
1549         LASSERT (the_lnet.ln_init);
1550         LASSERT (the_lnet.ln_refcount > 0);
1551
1552         switch (cmd) {
1553         case IOC_LIBCFS_GET_NI:
1554                 rc = LNetGetId(data->ioc_count, &id);
1555                 data->ioc_nid = id.nid;
1556                 return rc;
1557
1558         case IOC_LIBCFS_FAIL_NID:
1559                 return lnet_fail_nid(data->ioc_nid, data->ioc_count);
1560
1561         case IOC_LIBCFS_ADD_ROUTE:
1562                 rc = lnet_add_route(data->ioc_net, data->ioc_count,
1563                                     data->ioc_nid);
1564                 return (rc != 0) ? rc : lnet_check_routes();
1565
1566         case IOC_LIBCFS_DEL_ROUTE:
1567                 return lnet_del_route(data->ioc_net, data->ioc_nid);
1568
1569         case IOC_LIBCFS_GET_ROUTE:
1570                 return lnet_get_route(data->ioc_count,
1571                                       &data->ioc_net, &data->ioc_count,
1572                                       &data->ioc_nid, &data->ioc_flags);
1573         case IOC_LIBCFS_NOTIFY_ROUTER:
1574                 return lnet_notify(NULL, data->ioc_nid, data->ioc_flags,
1575                                    cfs_time_current() -
1576                                    cfs_time_seconds(cfs_time_current_sec() -
1577                                                     (time_t)data->ioc_u64[0]));
1578
1579         case IOC_LIBCFS_PORTALS_COMPATIBILITY:
1580                 /* This can be removed once lustre stops calling it */
1581                 return 0;
1582
1583         case IOC_LIBCFS_LNET_DIST:
1584                 rc = LNetDist(data->ioc_nid, &data->ioc_nid, &data->ioc_u32[1]);
1585                 if (rc < 0 && rc != -EHOSTUNREACH)
1586                         return rc;
1587
1588                 data->ioc_u32[0] = rc;
1589                 return 0;
1590
1591         case IOC_LIBCFS_TESTPROTOCOMPAT:
1592                 lnet_net_lock(LNET_LOCK_EX);
1593                 the_lnet.ln_testprotocompat = data->ioc_flags;
1594                 lnet_net_unlock(LNET_LOCK_EX);
1595                 return 0;
1596
1597         case IOC_LIBCFS_PING:
1598                 id.nid = data->ioc_nid;
1599                 id.pid = data->ioc_u32[0];
1600                 rc = lnet_ping(id, data->ioc_u32[1], /* timeout */
1601                                (lnet_process_id_t *)data->ioc_pbuf1,
1602                                data->ioc_plen1/sizeof(lnet_process_id_t));
1603                 if (rc < 0)
1604                         return rc;
1605                 data->ioc_count = rc;
1606                 return 0;
1607
1608         case IOC_LIBCFS_DEBUG_PEER: {
1609                 /* CAVEAT EMPTOR: this one designed for calling directly; not
1610                  * via an ioctl */
1611                 id = *((lnet_process_id_t *) arg);
1612
1613                 lnet_debug_peer(id.nid);
1614
1615                 ni = lnet_net2ni(LNET_NIDNET(id.nid));
1616                 if (ni == NULL) {
1617                         CDEBUG(D_WARNING, "No NI for %s\n", libcfs_id2str(id));
1618                 } else {
1619                         if (ni->ni_lnd->lnd_ctl == NULL) {
1620                                 CDEBUG(D_WARNING, "No ctl for %s\n",
1621                                        libcfs_id2str(id));
1622                         } else {
1623                                 (void)ni->ni_lnd->lnd_ctl(ni, cmd, arg);
1624                         }
1625
1626                         lnet_ni_decref(ni);
1627                 }
1628                 return 0;
1629         }
1630
1631         default:
1632                 ni = lnet_net2ni(data->ioc_net);
1633                 if (ni == NULL)
1634                         return -EINVAL;
1635
1636                 if (ni->ni_lnd->lnd_ctl == NULL)
1637                         rc = -EINVAL;
1638                 else
1639                         rc = ni->ni_lnd->lnd_ctl(ni, cmd, arg);
1640
1641                 lnet_ni_decref(ni);
1642                 return rc;
1643         }
1644         /* not reached */
1645 }
1646 EXPORT_SYMBOL(LNetCtl);
1647
1648 /**
1649  * Retrieve the lnet_process_id_t ID of LNet interface at \a index. Note that
1650  * all interfaces share a same PID, as requested by LNetNIInit().
1651  *
1652  * \param index Index of the interface to look up.
1653  * \param id On successful return, this location will hold the
1654  * lnet_process_id_t ID of the interface.
1655  *
1656  * \retval 0 If an interface exists at \a index.
1657  * \retval -ENOENT If no interface has been found.
1658  */
1659 int
1660 LNetGetId(unsigned int index, lnet_process_id_t *id)
1661 {
1662         struct lnet_ni  *ni;
1663         cfs_list_t      *tmp;
1664         int             cpt;
1665         int             rc = -ENOENT;
1666
1667         LASSERT(the_lnet.ln_init);
1668         LASSERT(the_lnet.ln_refcount > 0);
1669
1670         cpt = lnet_net_lock_current();
1671
1672         cfs_list_for_each(tmp, &the_lnet.ln_nis) {
1673                 if (index-- != 0)
1674                         continue;
1675
1676                 ni = cfs_list_entry(tmp, lnet_ni_t, ni_list);
1677
1678                 id->nid = ni->ni_nid;
1679                 id->pid = the_lnet.ln_pid;
1680                 rc = 0;
1681                 break;
1682         }
1683
1684         lnet_net_unlock(cpt);
1685         return rc;
1686 }
1687 EXPORT_SYMBOL(LNetGetId);
1688
1689 /**
1690  * Print a string representation of handle \a h into buffer \a str of
1691  * \a len bytes.
1692  */
1693 void
1694 LNetSnprintHandle(char *str, int len, lnet_handle_any_t h)
1695 {
1696         snprintf(str, len, LPX64, h.cookie);
1697 }
1698 EXPORT_SYMBOL(LNetSnprintHandle);
1699
1700 static int
1701 lnet_create_ping_info(void)
1702 {
1703         int               i;
1704         int               n;
1705         int               rc;
1706         unsigned int      infosz;
1707         lnet_ni_t        *ni;
1708         lnet_process_id_t id;
1709         lnet_ping_info_t *pinfo;
1710
1711         for (n = 0; ; n++) {
1712                 rc = LNetGetId(n, &id);
1713                 if (rc == -ENOENT)
1714                         break;
1715
1716                 LASSERT (rc == 0);
1717         }
1718
1719         infosz = offsetof(lnet_ping_info_t, pi_ni[n]);
1720         LIBCFS_ALLOC(pinfo, infosz);
1721         if (pinfo == NULL) {
1722                 CERROR("Can't allocate ping info[%d]\n", n);
1723                 return -ENOMEM;
1724         }
1725
1726         pinfo->pi_nnis    = n;
1727         pinfo->pi_pid     = the_lnet.ln_pid;
1728         pinfo->pi_magic   = LNET_PROTO_PING_MAGIC;
1729         pinfo->pi_features = LNET_PING_FEAT_NI_STATUS;
1730
1731         for (i = 0; i < n; i++) {
1732                 lnet_ni_status_t *ns = &pinfo->pi_ni[i];
1733
1734                 rc = LNetGetId(i, &id);
1735                 LASSERT (rc == 0);
1736
1737                 ns->ns_nid    = id.nid;
1738                 ns->ns_status = LNET_NI_STATUS_UP;
1739
1740                 lnet_net_lock(0);
1741
1742                 ni = lnet_nid2ni_locked(id.nid, 0);
1743                 LASSERT(ni != NULL);
1744
1745                 lnet_ni_lock(ni);
1746                 LASSERT(ni->ni_status == NULL);
1747                 ni->ni_status = ns;
1748                 lnet_ni_unlock(ni);
1749
1750                 lnet_ni_decref_locked(ni, 0);
1751                 lnet_net_unlock(0);
1752         }
1753
1754         the_lnet.ln_ping_info = pinfo;
1755         return 0;
1756 }
1757
1758 static void
1759 lnet_destroy_ping_info(void)
1760 {
1761         struct lnet_ni  *ni;
1762
1763         lnet_net_lock(0);
1764
1765         cfs_list_for_each_entry(ni, &the_lnet.ln_nis, ni_list) {
1766                 lnet_ni_lock(ni);
1767                 ni->ni_status = NULL;
1768                 lnet_ni_unlock(ni);
1769         }
1770
1771         lnet_net_unlock(0);
1772
1773         LIBCFS_FREE(the_lnet.ln_ping_info,
1774                     offsetof(lnet_ping_info_t,
1775                              pi_ni[the_lnet.ln_ping_info->pi_nnis]));
1776         the_lnet.ln_ping_info = NULL;
1777         return;
1778 }
1779
1780 int
1781 lnet_ping_target_init(void)
1782 {
1783         lnet_md_t         md = {0};
1784         lnet_handle_me_t  meh;
1785         lnet_process_id_t id;
1786         int               rc;
1787         int               rc2;
1788         int               infosz;
1789
1790         rc = lnet_create_ping_info();
1791         if (rc != 0)
1792                 return rc;
1793
1794         /* We can have a tiny EQ since we only need to see the unlink event on
1795          * teardown, which by definition is the last one! */
1796         rc = LNetEQAlloc(2, LNET_EQ_HANDLER_NONE, &the_lnet.ln_ping_target_eq);
1797         if (rc != 0) {
1798                 CERROR("Can't allocate ping EQ: %d\n", rc);
1799                 goto failed_0;
1800         }
1801
1802         memset(&id, 0, sizeof(lnet_process_id_t));
1803         id.nid = LNET_NID_ANY;
1804         id.pid = LNET_PID_ANY;
1805
1806         rc = LNetMEAttach(LNET_RESERVED_PORTAL, id,
1807                           LNET_PROTO_PING_MATCHBITS, 0,
1808                           LNET_UNLINK, LNET_INS_AFTER,
1809                           &meh);
1810         if (rc != 0) {
1811                 CERROR("Can't create ping ME: %d\n", rc);
1812                 goto failed_1;
1813         }
1814
1815         /* initialize md content */
1816         infosz = offsetof(lnet_ping_info_t,
1817                           pi_ni[the_lnet.ln_ping_info->pi_nnis]);
1818         md.start     = the_lnet.ln_ping_info;
1819         md.length    = infosz;
1820         md.threshold = LNET_MD_THRESH_INF;
1821         md.max_size  = 0;
1822         md.options   = LNET_MD_OP_GET | LNET_MD_TRUNCATE |
1823                        LNET_MD_MANAGE_REMOTE;
1824         md.user_ptr  = NULL;
1825         md.eq_handle = the_lnet.ln_ping_target_eq;
1826
1827         rc = LNetMDAttach(meh, md,
1828                           LNET_RETAIN,
1829                           &the_lnet.ln_ping_target_md);
1830         if (rc != 0) {
1831                 CERROR("Can't attach ping MD: %d\n", rc);
1832                 goto failed_2;
1833         }
1834
1835         return 0;
1836
1837  failed_2:
1838         rc2 = LNetMEUnlink(meh);
1839         LASSERT (rc2 == 0);
1840  failed_1:
1841         rc2 = LNetEQFree(the_lnet.ln_ping_target_eq);
1842         LASSERT (rc2 == 0);
1843  failed_0:
1844         lnet_destroy_ping_info();
1845         return rc;
1846 }
1847
1848 void
1849 lnet_ping_target_fini(void)
1850 {
1851         lnet_event_t    event;
1852         int             rc;
1853         int             which;
1854         int             timeout_ms = 1000;
1855         cfs_sigset_t    blocked = cfs_block_allsigs();
1856
1857         LNetMDUnlink(the_lnet.ln_ping_target_md);
1858         /* NB md could be busy; this just starts the unlink */
1859
1860         for (;;) {
1861                 rc = LNetEQPoll(&the_lnet.ln_ping_target_eq, 1,
1862                                 timeout_ms, &event, &which);
1863
1864                 /* I expect overflow... */
1865                 LASSERT (rc >= 0 || rc == -EOVERFLOW);
1866
1867                 if (rc == 0) {
1868                         /* timed out: provide a diagnostic */
1869                         CWARN("Still waiting for ping MD to unlink\n");
1870                         timeout_ms *= 2;
1871                         continue;
1872                 }
1873
1874                 /* Got a valid event */
1875                 if (event.unlinked)
1876                         break;
1877         }
1878
1879         rc = LNetEQFree(the_lnet.ln_ping_target_eq);
1880         LASSERT (rc == 0);
1881         lnet_destroy_ping_info();
1882         cfs_restore_sigs(blocked);
1883 }
1884
1885 int
1886 lnet_ping (lnet_process_id_t id, int timeout_ms, lnet_process_id_t *ids, int n_ids)
1887 {
1888         lnet_handle_eq_t     eqh;
1889         lnet_handle_md_t     mdh;
1890         lnet_event_t         event;
1891         lnet_md_t            md = {0};
1892         int                  which;
1893         int                  unlinked = 0;
1894         int                  replied = 0;
1895         const int            a_long_time = 60000; /* mS */
1896         int                  infosz = offsetof(lnet_ping_info_t, pi_ni[n_ids]);
1897         lnet_ping_info_t    *info;
1898         lnet_process_id_t    tmpid;
1899         int                  i;
1900         int                  nob;
1901         int                  rc;
1902         int                  rc2;
1903         cfs_sigset_t         blocked;
1904
1905         if (n_ids <= 0 ||
1906             id.nid == LNET_NID_ANY ||
1907             timeout_ms > 500000 ||              /* arbitrary limit! */
1908             n_ids > 20)                         /* arbitrary limit! */
1909                 return -EINVAL;
1910
1911         if (id.pid == LNET_PID_ANY)
1912                 id.pid = LUSTRE_SRV_LNET_PID;
1913
1914         LIBCFS_ALLOC(info, infosz);
1915         if (info == NULL)
1916                 return -ENOMEM;
1917
1918         /* NB 2 events max (including any unlink event) */
1919         rc = LNetEQAlloc(2, LNET_EQ_HANDLER_NONE, &eqh);
1920         if (rc != 0) {
1921                 CERROR("Can't allocate EQ: %d\n", rc);
1922                 goto out_0;
1923         }
1924
1925         /* initialize md content */
1926         md.start     = info;
1927         md.length    = infosz;
1928         md.threshold = 2; /*GET/REPLY*/
1929         md.max_size  = 0;
1930         md.options   = LNET_MD_TRUNCATE;
1931         md.user_ptr  = NULL;
1932         md.eq_handle = eqh;
1933
1934         rc = LNetMDBind(md, LNET_UNLINK, &mdh);
1935         if (rc != 0) {
1936                 CERROR("Can't bind MD: %d\n", rc);
1937                 goto out_1;
1938         }
1939
1940         rc = LNetGet(LNET_NID_ANY, mdh, id,
1941                      LNET_RESERVED_PORTAL,
1942                      LNET_PROTO_PING_MATCHBITS, 0);
1943
1944         if (rc != 0) {
1945                 /* Don't CERROR; this could be deliberate! */
1946
1947                 rc2 = LNetMDUnlink(mdh);
1948                 LASSERT (rc2 == 0);
1949
1950                 /* NB must wait for the UNLINK event below... */
1951                 unlinked = 1;
1952                 timeout_ms = a_long_time;
1953         }
1954
1955         do {
1956                 /* MUST block for unlink to complete */
1957                 if (unlinked)
1958                         blocked = cfs_block_allsigs();
1959
1960                 rc2 = LNetEQPoll(&eqh, 1, timeout_ms, &event, &which);
1961
1962                 if (unlinked)
1963                         cfs_restore_sigs(blocked);
1964
1965                 CDEBUG(D_NET, "poll %d(%d %d)%s\n", rc2,
1966                        (rc2 <= 0) ? -1 : event.type,
1967                        (rc2 <= 0) ? -1 : event.status,
1968                        (rc2 > 0 && event.unlinked) ? " unlinked" : "");
1969
1970                 LASSERT (rc2 != -EOVERFLOW);     /* can't miss anything */
1971
1972                 if (rc2 <= 0 || event.status != 0) {
1973                         /* timeout or error */
1974                         if (!replied && rc == 0)
1975                                 rc = (rc2 < 0) ? rc2 :
1976                                      (rc2 == 0) ? -ETIMEDOUT :
1977                                      event.status;
1978
1979                         if (!unlinked) {
1980                                 /* Ensure completion in finite time... */
1981                                 LNetMDUnlink(mdh);
1982                                 /* No assertion (racing with network) */
1983                                 unlinked = 1;
1984                                 timeout_ms = a_long_time;
1985                         } else if (rc2 == 0) {
1986                                 /* timed out waiting for unlink */
1987                                 CWARN("ping %s: late network completion\n",
1988                                       libcfs_id2str(id));
1989                         }
1990                 } else if (event.type == LNET_EVENT_REPLY) {
1991                         replied = 1;
1992                         rc = event.mlength;
1993                 }
1994
1995         } while (rc2 <= 0 || !event.unlinked);
1996
1997         if (!replied) {
1998                 if (rc >= 0)
1999                         CWARN("%s: Unexpected rc >= 0 but no reply!\n",
2000                               libcfs_id2str(id));
2001                 rc = -EIO;
2002                 goto out_1;
2003         }
2004
2005         nob = rc;
2006         LASSERT (nob >= 0 && nob <= infosz);
2007
2008         rc = -EPROTO;                           /* if I can't parse... */
2009
2010         if (nob < 8) {
2011                 /* can't check magic/version */
2012                 CERROR("%s: ping info too short %d\n",
2013                        libcfs_id2str(id), nob);
2014                 goto out_1;
2015         }
2016
2017         if (info->pi_magic == __swab32(LNET_PROTO_PING_MAGIC)) {
2018                 lnet_swap_pinginfo(info);
2019         } else if (info->pi_magic != LNET_PROTO_PING_MAGIC) {
2020                 CERROR("%s: Unexpected magic %08x\n", 
2021                        libcfs_id2str(id), info->pi_magic);
2022                 goto out_1;
2023         }
2024
2025         if ((info->pi_features & LNET_PING_FEAT_NI_STATUS) == 0) {
2026                 CERROR("%s: ping w/o NI status: 0x%x\n",
2027                        libcfs_id2str(id), info->pi_features);
2028                 goto out_1;
2029         }
2030
2031         if (nob < offsetof(lnet_ping_info_t, pi_ni[0])) {
2032                 CERROR("%s: Short reply %d(%d min)\n", libcfs_id2str(id),
2033                        nob, (int)offsetof(lnet_ping_info_t, pi_ni[0]));
2034                 goto out_1;
2035         }
2036
2037         if (info->pi_nnis < n_ids)
2038                 n_ids = info->pi_nnis;
2039
2040         if (nob < offsetof(lnet_ping_info_t, pi_ni[n_ids])) {
2041                 CERROR("%s: Short reply %d(%d expected)\n", libcfs_id2str(id),
2042                        nob, (int)offsetof(lnet_ping_info_t, pi_ni[n_ids]));
2043                 goto out_1;
2044         }
2045
2046         rc = -EFAULT;                           /* If I SEGV... */
2047
2048         for (i = 0; i < n_ids; i++) {
2049                 tmpid.pid = info->pi_pid;
2050                 tmpid.nid = info->pi_ni[i].ns_nid;
2051 #ifdef __KERNEL__
2052                 if (cfs_copy_to_user(&ids[i], &tmpid, sizeof(tmpid)))
2053                         goto out_1;
2054 #else
2055                 ids[i] = tmpid;
2056 #endif
2057         }
2058         rc = info->pi_nnis;
2059
2060  out_1:
2061         rc2 = LNetEQFree(eqh);
2062         if (rc2 != 0)
2063                 CERROR("rc2 %d\n", rc2);
2064         LASSERT (rc2 == 0);
2065
2066  out_0:
2067         LIBCFS_FREE(info, infosz);
2068         return rc;
2069 }