Whamcloud - gitweb
LU-56 lnet: Partitioned LNet resources (ME/MD/EQ)
[fs/lustre-release.git] / lnet / lnet / api-ni.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_LNET
38 #include <lnet/lib-lnet.h>
39
40 #ifdef __KERNEL__
41 #define D_LNI D_CONSOLE
42 #else
43 #define D_LNI D_CONFIG
44 #endif
45
46 lnet_t      the_lnet;                           /* THE state of the network */
47
48 #ifdef __KERNEL__
49
50 static char *ip2nets = "";
51 CFS_MODULE_PARM(ip2nets, "s", charp, 0444,
52                 "LNET network <- IP table");
53
54 static char *networks = "";
55 CFS_MODULE_PARM(networks, "s", charp, 0444,
56                 "local networks");
57
58 static char *routes = "";
59 CFS_MODULE_PARM(routes, "s", charp, 0444,
60                 "routes to non-local networks");
61
62 char *
63 lnet_get_routes(void)
64 {
65         return routes;
66 }
67
68 char *
69 lnet_get_networks(void)
70 {
71         char   *nets;
72         int     rc;
73
74         if (*networks != 0 && *ip2nets != 0) {
75                 LCONSOLE_ERROR_MSG(0x101, "Please specify EITHER 'networks' or "
76                                    "'ip2nets' but not both at once\n");
77                 return NULL;
78         }
79
80         if (*ip2nets != 0) {
81                 rc = lnet_parse_ip2nets(&nets, ip2nets);
82                 return (rc == 0) ? nets : NULL;
83         }
84
85         if (*networks != 0)
86                 return networks;
87
88         return "tcp";
89 }
90
91 void
92 lnet_init_locks(void)
93 {
94         cfs_spin_lock_init(&the_lnet.ln_lock);
95         cfs_spin_lock_init(&the_lnet.ln_eq_wait_lock);
96         cfs_waitq_init(&the_lnet.ln_eq_waitq);
97         cfs_mutex_init(&the_lnet.ln_lnd_mutex);
98         cfs_mutex_init(&the_lnet.ln_api_mutex);
99 }
100
101 void
102 lnet_fini_locks(void)
103 {
104 }
105
106 #else
107
108 char *
109 lnet_get_routes(void)
110 {
111         char *str = getenv("LNET_ROUTES");
112
113         return (str == NULL) ? "" : str;
114 }
115
116 char *
117 lnet_get_networks (void)
118 {
119         static char       default_networks[256];
120         char             *networks = getenv ("LNET_NETWORKS");
121         char             *ip2nets  = getenv ("LNET_IP2NETS");
122         char             *str;
123         char             *sep;
124         int               len;
125         int               nob;
126         int               rc;
127         cfs_list_t       *tmp;
128
129 #ifdef NOT_YET
130         if (networks != NULL && ip2nets != NULL) {
131                 LCONSOLE_ERROR_MSG(0x103, "Please set EITHER 'LNET_NETWORKS' or"
132                                    " 'LNET_IP2NETS' but not both at once\n");
133                 return NULL;
134         }
135
136         if (ip2nets != NULL) {
137                 rc = lnet_parse_ip2nets(&networks, ip2nets);
138                 return (rc == 0) ? networks : NULL;
139         }
140 #else
141         SET_BUT_UNUSED(ip2nets);
142         SET_BUT_UNUSED(rc);
143 #endif
144         if (networks != NULL)
145                 return networks;
146
147         /* In userland, the default 'networks=' is the list of known net types */
148
149         len = sizeof(default_networks);
150         str = default_networks;
151         *str = 0;
152         sep = "";
153
154         cfs_list_for_each (tmp, &the_lnet.ln_lnds) {
155                 lnd_t *lnd = cfs_list_entry(tmp, lnd_t, lnd_list);
156
157                 nob = snprintf(str, len, "%s%s", sep,
158                                libcfs_lnd2str(lnd->lnd_type));
159                 len -= nob;
160                 if (len < 0) {
161                         /* overflowed the string; leave it where it was */
162                         *str = 0;
163                         break;
164                 }
165
166                 str += nob;
167                 sep = ",";
168         }
169
170         return default_networks;
171 }
172
173 # ifndef HAVE_LIBPTHREAD
174
175 void lnet_init_locks(void)
176 {
177         the_lnet.ln_lock = 0;
178         the_lnet.ln_eq_wait_lock = 0;
179         the_lnet.ln_lnd_mutex = 0;
180         the_lnet.ln_api_mutex = 0;
181 }
182
183 void lnet_fini_locks(void)
184 {
185         LASSERT(the_lnet.ln_api_mutex == 0);
186         LASSERT(the_lnet.ln_lnd_mutex == 0);
187         LASSERT(the_lnet.ln_lock == 0);
188         LASSERT(the_lnet.ln_eq_wait_lock == 0);
189 }
190
191 # else
192
193 void lnet_init_locks(void)
194 {
195         pthread_cond_init(&the_lnet.ln_eq_cond, NULL);
196         pthread_mutex_init(&the_lnet.ln_lock, NULL);
197         pthread_mutex_init(&the_lnet.ln_eq_wait_lock, NULL);
198         pthread_mutex_init(&the_lnet.ln_lnd_mutex, NULL);
199         pthread_mutex_init(&the_lnet.ln_api_mutex, NULL);
200 }
201
202 void lnet_fini_locks(void)
203 {
204         pthread_mutex_destroy(&the_lnet.ln_api_mutex);
205         pthread_mutex_destroy(&the_lnet.ln_lnd_mutex);
206         pthread_mutex_destroy(&the_lnet.ln_lock);
207         pthread_mutex_destroy(&the_lnet.ln_eq_wait_lock);
208         pthread_cond_destroy(&the_lnet.ln_eq_cond);
209 }
210
211 # endif
212 #endif
213
214 static int
215 lnet_create_locks(void)
216 {
217         lnet_init_locks();
218
219         the_lnet.ln_res_lock = cfs_percpt_lock_alloc(lnet_cpt_table());
220         if (the_lnet.ln_res_lock != NULL)
221                 return 0;
222
223         lnet_fini_locks();
224         return -ENOMEM;
225 }
226
227 static void
228 lnet_destroy_locks(void)
229 {
230         if (the_lnet.ln_res_lock != NULL) {
231                 cfs_percpt_lock_free(the_lnet.ln_res_lock);
232                 the_lnet.ln_res_lock = NULL;
233         }
234
235         lnet_fini_locks();
236 }
237
238 void lnet_assert_wire_constants (void)
239 {
240         /* Wire protocol assertions generated by 'wirecheck'
241          * running on Linux robert.bartonsoftware.com 2.6.8-1.521
242          * #1 Mon Aug 16 09:01:18 EDT 2004 i686 athlon i386 GNU/Linux
243          * with gcc version 3.3.3 20040412 (Red Hat Linux 3.3.3-7) */
244
245         /* Constants... */
246         CLASSERT (LNET_PROTO_TCP_MAGIC == 0xeebc0ded);
247         CLASSERT (LNET_PROTO_TCP_VERSION_MAJOR == 1);
248         CLASSERT (LNET_PROTO_TCP_VERSION_MINOR == 0);
249         CLASSERT (LNET_MSG_ACK == 0);
250         CLASSERT (LNET_MSG_PUT == 1);
251         CLASSERT (LNET_MSG_GET == 2);
252         CLASSERT (LNET_MSG_REPLY == 3);
253         CLASSERT (LNET_MSG_HELLO == 4);
254
255         /* Checks for struct ptl_handle_wire_t */
256         CLASSERT ((int)sizeof(lnet_handle_wire_t) == 16);
257         CLASSERT ((int)offsetof(lnet_handle_wire_t, wh_interface_cookie) == 0);
258         CLASSERT ((int)sizeof(((lnet_handle_wire_t *)0)->wh_interface_cookie) == 8);
259         CLASSERT ((int)offsetof(lnet_handle_wire_t, wh_object_cookie) == 8);
260         CLASSERT ((int)sizeof(((lnet_handle_wire_t *)0)->wh_object_cookie) == 8);
261
262         /* Checks for struct lnet_magicversion_t */
263         CLASSERT ((int)sizeof(lnet_magicversion_t) == 8);
264         CLASSERT ((int)offsetof(lnet_magicversion_t, magic) == 0);
265         CLASSERT ((int)sizeof(((lnet_magicversion_t *)0)->magic) == 4);
266         CLASSERT ((int)offsetof(lnet_magicversion_t, version_major) == 4);
267         CLASSERT ((int)sizeof(((lnet_magicversion_t *)0)->version_major) == 2);
268         CLASSERT ((int)offsetof(lnet_magicversion_t, version_minor) == 6);
269         CLASSERT ((int)sizeof(((lnet_magicversion_t *)0)->version_minor) == 2);
270
271         /* Checks for struct lnet_hdr_t */
272         CLASSERT ((int)sizeof(lnet_hdr_t) == 72);
273         CLASSERT ((int)offsetof(lnet_hdr_t, dest_nid) == 0);
274         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->dest_nid) == 8);
275         CLASSERT ((int)offsetof(lnet_hdr_t, src_nid) == 8);
276         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->src_nid) == 8);
277         CLASSERT ((int)offsetof(lnet_hdr_t, dest_pid) == 16);
278         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->dest_pid) == 4);
279         CLASSERT ((int)offsetof(lnet_hdr_t, src_pid) == 20);
280         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->src_pid) == 4);
281         CLASSERT ((int)offsetof(lnet_hdr_t, type) == 24);
282         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->type) == 4);
283         CLASSERT ((int)offsetof(lnet_hdr_t, payload_length) == 28);
284         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->payload_length) == 4);
285         CLASSERT ((int)offsetof(lnet_hdr_t, msg) == 32);
286         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg) == 40);
287
288         /* Ack */
289         CLASSERT ((int)offsetof(lnet_hdr_t, msg.ack.dst_wmd) == 32);
290         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.ack.dst_wmd) == 16);
291         CLASSERT ((int)offsetof(lnet_hdr_t, msg.ack.match_bits) == 48);
292         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.ack.match_bits) == 8);
293         CLASSERT ((int)offsetof(lnet_hdr_t, msg.ack.mlength) == 56);
294         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.ack.mlength) == 4);
295
296         /* Put */
297         CLASSERT ((int)offsetof(lnet_hdr_t, msg.put.ack_wmd) == 32);
298         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.put.ack_wmd) == 16);
299         CLASSERT ((int)offsetof(lnet_hdr_t, msg.put.match_bits) == 48);
300         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.put.match_bits) == 8);
301         CLASSERT ((int)offsetof(lnet_hdr_t, msg.put.hdr_data) == 56);
302         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.put.hdr_data) == 8);
303         CLASSERT ((int)offsetof(lnet_hdr_t, msg.put.ptl_index) == 64);
304         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.put.ptl_index) == 4);
305         CLASSERT ((int)offsetof(lnet_hdr_t, msg.put.offset) == 68);
306         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.put.offset) == 4);
307
308         /* Get */
309         CLASSERT ((int)offsetof(lnet_hdr_t, msg.get.return_wmd) == 32);
310         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.get.return_wmd) == 16);
311         CLASSERT ((int)offsetof(lnet_hdr_t, msg.get.match_bits) == 48);
312         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.get.match_bits) == 8);
313         CLASSERT ((int)offsetof(lnet_hdr_t, msg.get.ptl_index) == 56);
314         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.get.ptl_index) == 4);
315         CLASSERT ((int)offsetof(lnet_hdr_t, msg.get.src_offset) == 60);
316         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.get.src_offset) == 4);
317         CLASSERT ((int)offsetof(lnet_hdr_t, msg.get.sink_length) == 64);
318         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.get.sink_length) == 4);
319
320         /* Reply */
321         CLASSERT ((int)offsetof(lnet_hdr_t, msg.reply.dst_wmd) == 32);
322         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.reply.dst_wmd) == 16);
323
324         /* Hello */
325         CLASSERT ((int)offsetof(lnet_hdr_t, msg.hello.incarnation) == 32);
326         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.hello.incarnation) == 8);
327         CLASSERT ((int)offsetof(lnet_hdr_t, msg.hello.type) == 40);
328         CLASSERT ((int)sizeof(((lnet_hdr_t *)0)->msg.hello.type) == 4);
329 }
330
331 lnd_t *
332 lnet_find_lnd_by_type (int type)
333 {
334         lnd_t              *lnd;
335         cfs_list_t         *tmp;
336
337         /* holding lnd mutex */
338         cfs_list_for_each (tmp, &the_lnet.ln_lnds) {
339                 lnd = cfs_list_entry(tmp, lnd_t, lnd_list);
340
341                 if ((int)lnd->lnd_type == type)
342                         return lnd;
343         }
344
345         return NULL;
346 }
347
348 void
349 lnet_register_lnd (lnd_t *lnd)
350 {
351         LNET_MUTEX_LOCK(&the_lnet.ln_lnd_mutex);
352
353         LASSERT (the_lnet.ln_init);
354         LASSERT (libcfs_isknown_lnd(lnd->lnd_type));
355         LASSERT (lnet_find_lnd_by_type(lnd->lnd_type) == NULL);
356
357         cfs_list_add_tail (&lnd->lnd_list, &the_lnet.ln_lnds);
358         lnd->lnd_refcount = 0;
359
360         CDEBUG(D_NET, "%s LND registered\n", libcfs_lnd2str(lnd->lnd_type));
361
362         LNET_MUTEX_UNLOCK(&the_lnet.ln_lnd_mutex);
363 }
364
365 void
366 lnet_unregister_lnd (lnd_t *lnd)
367 {
368         LNET_MUTEX_LOCK(&the_lnet.ln_lnd_mutex);
369
370         LASSERT (the_lnet.ln_init);
371         LASSERT (lnet_find_lnd_by_type(lnd->lnd_type) == lnd);
372         LASSERT (lnd->lnd_refcount == 0);
373
374         cfs_list_del (&lnd->lnd_list);
375         CDEBUG(D_NET, "%s LND unregistered\n", libcfs_lnd2str(lnd->lnd_type));
376
377         LNET_MUTEX_UNLOCK(&the_lnet.ln_lnd_mutex);
378 }
379
380 #ifdef LNET_USE_LIB_FREELIST
381
382 int
383 lnet_freelist_init (lnet_freelist_t *fl, int n, int size)
384 {
385         char *space;
386
387         LASSERT (n > 0);
388
389         size += offsetof (lnet_freeobj_t, fo_contents);
390
391         LIBCFS_ALLOC(space, n * size);
392         if (space == NULL)
393                 return (-ENOMEM);
394
395         CFS_INIT_LIST_HEAD (&fl->fl_list);
396         fl->fl_objs = space;
397         fl->fl_nobjs = n;
398         fl->fl_objsize = size;
399
400         do
401         {
402                 memset (space, 0, size);
403                 cfs_list_add ((cfs_list_t *)space, &fl->fl_list);
404                 space += size;
405         } while (--n != 0);
406
407         return (0);
408 }
409
410 void
411 lnet_freelist_fini (lnet_freelist_t *fl)
412 {
413         cfs_list_t       *el;
414         int               count;
415
416         if (fl->fl_nobjs == 0)
417                 return;
418
419         count = 0;
420         for (el = fl->fl_list.next; el != &fl->fl_list; el = el->next)
421                 count++;
422
423         LASSERT (count == fl->fl_nobjs);
424
425         LIBCFS_FREE(fl->fl_objs, fl->fl_nobjs * fl->fl_objsize);
426         memset (fl, 0, sizeof (*fl));
427 }
428
429 #endif /* LNET_USE_LIB_FREELIST */
430
431 __u64
432 lnet_create_interface_cookie (void)
433 {
434         /* NB the interface cookie in wire handles guards against delayed
435          * replies and ACKs appearing valid after reboot. Initialisation time,
436          * even if it's only implemented to millisecond resolution is probably
437          * easily good enough. */
438         struct timeval tv;
439         __u64          cookie;
440 #ifndef __KERNEL__
441         int            rc = gettimeofday (&tv, NULL);
442         LASSERT (rc == 0);
443 #else
444         cfs_gettimeofday(&tv);
445 #endif
446         cookie = tv.tv_sec;
447         cookie *= 1000000;
448         cookie += tv.tv_usec;
449         return cookie;
450 }
451
452 static char *
453 lnet_res_type2str(int type)
454 {
455         switch (type) {
456         default:
457                 LBUG();
458         case LNET_COOKIE_TYPE_MD:
459                 return "MD";
460         case LNET_COOKIE_TYPE_ME:
461                 return "ME";
462         case LNET_COOKIE_TYPE_EQ:
463                 return "EQ";
464         }
465 }
466
467 void
468 lnet_res_container_cleanup(struct lnet_res_container *rec)
469 {
470         int     count = 0;
471
472         if (rec->rec_type == 0) /* not set yet, it's a uninitialized */
473                 return;
474
475         while (!cfs_list_empty(&rec->rec_active)) {
476                 cfs_list_t *e = rec->rec_active.next;
477
478                 cfs_list_del_init(e);
479                 if (rec->rec_type == LNET_COOKIE_TYPE_EQ) {
480                         lnet_eq_free(cfs_list_entry(e, lnet_eq_t, eq_list));
481
482                 } else if (rec->rec_type == LNET_COOKIE_TYPE_MD) {
483                         lnet_md_free(cfs_list_entry(e, lnet_libmd_t, md_list));
484
485                 } else { /* NB: Active MEs should be attached on portals */
486                         LBUG();
487                 }
488                 count++;
489         }
490
491         if (count > 0) {
492                 /* Found alive MD/ME/EQ, user really should unlink/free
493                  * all of them before finalize LNet, but if someone didn't,
494                  * we have to recycle garbage for him */
495                 CERROR("%d active elements on exit of %s container\n",
496                        count, lnet_res_type2str(rec->rec_type));
497         }
498
499 #ifdef LNET_USE_LIB_FREELIST
500         lnet_freelist_fini(&rec->rec_freelist);
501 #endif
502         if (rec->rec_lh_hash != NULL) {
503                 LIBCFS_FREE(rec->rec_lh_hash,
504                             LNET_LH_HASH_SIZE * sizeof(rec->rec_lh_hash[0]));
505                 rec->rec_lh_hash = NULL;
506         }
507
508         rec->rec_type = 0; /* mark it as finalized */
509 }
510
511 int
512 lnet_res_container_setup(struct lnet_res_container *rec,
513                          int cpt, int type, int objnum, int objsz)
514 {
515         int     rc = 0;
516         int     i;
517
518         LASSERT(rec->rec_type == 0);
519
520         rec->rec_type = type;
521         CFS_INIT_LIST_HEAD(&rec->rec_active);
522
523 #ifdef LNET_USE_LIB_FREELIST
524         memset(&rec->rec_freelist, 0, sizeof(rec->rec_freelist));
525         rc = lnet_freelist_init(&rec->rec_freelist, objnum, objsz);
526         if (rc != 0)
527                 goto out;
528 #endif
529         rec->rec_lh_cookie = (cpt << LNET_COOKIE_TYPE_BITS) | type;
530
531         /* Arbitrary choice of hash table size */
532         LIBCFS_CPT_ALLOC(rec->rec_lh_hash, lnet_cpt_table(), cpt,
533                          LNET_LH_HASH_SIZE * sizeof(rec->rec_lh_hash[0]));
534         if (rec->rec_lh_hash == NULL) {
535                 rc = -ENOMEM;
536                 goto out;
537         }
538
539         for (i = 0; i < LNET_LH_HASH_SIZE; i++)
540                 CFS_INIT_LIST_HEAD(&rec->rec_lh_hash[i]);
541
542         return 0;
543
544 out:
545         CERROR("Failed to setup %s resource container\n",
546                lnet_res_type2str(type));
547         lnet_res_container_cleanup(rec);
548         return rc;
549 }
550
551 static void
552 lnet_res_containers_destroy(struct lnet_res_container **recs)
553 {
554         struct lnet_res_container       *rec;
555         int                             i;
556
557         cfs_percpt_for_each(rec, i, recs)
558                 lnet_res_container_cleanup(rec);
559
560         cfs_percpt_free(recs);
561 }
562
563 static struct lnet_res_container **
564 lnet_res_containers_create(int type, int objnum, int objsz)
565 {
566         struct lnet_res_container       **recs;
567         struct lnet_res_container       *rec;
568         int                             rc;
569         int                             i;
570
571         recs = cfs_percpt_alloc(lnet_cpt_table(), sizeof(*rec));
572         if (recs == NULL) {
573                 CERROR("Failed to allocate %s resource containers\n",
574                        lnet_res_type2str(type));
575                 return NULL;
576         }
577
578         cfs_percpt_for_each(rec, i, recs) {
579                 rc = lnet_res_container_setup(rec, i, type, objnum, objsz);
580                 if (rc != 0) {
581                         lnet_res_containers_destroy(recs);
582                         return NULL;
583                 }
584         }
585
586         return recs;
587 }
588
589 lnet_libhandle_t *
590 lnet_res_lh_lookup(struct lnet_res_container *rec, __u64 cookie)
591 {
592         /* ALWAYS called with lnet_res_lock held */
593         cfs_list_t              *head;
594         lnet_libhandle_t        *lh;
595         unsigned int            hash;
596
597         if ((cookie & (LNET_COOKIE_TYPES - 1)) != rec->rec_type)
598                 return NULL;
599
600         hash = cookie >> (LNET_COOKIE_TYPE_BITS + LNET_CPT_BITS);
601         head = &rec->rec_lh_hash[hash & LNET_LH_HASH_MASK];
602
603         cfs_list_for_each_entry(lh, head, lh_hash_chain) {
604                 if (lh->lh_cookie == cookie)
605                         return lh;
606         }
607
608         return NULL;
609 }
610
611 void
612 lnet_res_lh_initialize(struct lnet_res_container *rec, lnet_libhandle_t *lh)
613 {
614         /* ALWAYS called with lnet_res_lock held */
615         unsigned int    ibits = LNET_COOKIE_TYPE_BITS + LNET_CPT_BITS;
616         unsigned int    hash;
617
618         lh->lh_cookie = rec->rec_lh_cookie;
619         rec->rec_lh_cookie += 1 << ibits;
620
621         hash = (lh->lh_cookie >> ibits) & LNET_LH_HASH_MASK;
622
623         cfs_list_add(&lh->lh_hash_chain, &rec->rec_lh_hash[hash]);
624 }
625
626 #ifndef __KERNEL__
627 /**
628  * Reserved API - do not use.
629  * Temporary workaround to allow uOSS and test programs force server
630  * mode in userspace. See comments near ln_server_mode_flag in
631  * lnet/lib-types.h */
632
633 void
634 lnet_server_mode() {
635         the_lnet.ln_server_mode_flag = 1;
636 }
637 #endif
638
639 int
640 lnet_prepare(lnet_pid_t requested_pid)
641 {
642         /* Prepare to bring up the network */
643         struct lnet_res_container **recs;
644         int                       rc = 0;
645
646         LASSERT (the_lnet.ln_refcount == 0);
647
648         the_lnet.ln_routing = 0;
649
650 #ifdef __KERNEL__
651         LASSERT ((requested_pid & LNET_PID_USERFLAG) == 0);
652         the_lnet.ln_pid = requested_pid;
653 #else
654         if (the_lnet.ln_server_mode_flag) {/* server case (uOSS) */
655                 LASSERT ((requested_pid & LNET_PID_USERFLAG) == 0);
656
657                 if (cfs_curproc_uid())/* Only root can run user-space server */
658                         return -EPERM;
659                 the_lnet.ln_pid = requested_pid;
660
661         } else {/* client case (liblustre) */
662
663                 /* My PID must be unique on this node and flag I'm userspace */
664                 the_lnet.ln_pid = getpid() | LNET_PID_USERFLAG;
665         }
666 #endif
667
668         memset(&the_lnet.ln_counters, 0,
669                sizeof(the_lnet.ln_counters));
670
671         CFS_INIT_LIST_HEAD (&the_lnet.ln_test_peers);
672         CFS_INIT_LIST_HEAD (&the_lnet.ln_nis);
673         CFS_INIT_LIST_HEAD (&the_lnet.ln_zombie_nis);
674         CFS_INIT_LIST_HEAD (&the_lnet.ln_remote_nets);
675         CFS_INIT_LIST_HEAD (&the_lnet.ln_routers);
676
677         the_lnet.ln_interface_cookie = lnet_create_interface_cookie();
678
679         lnet_init_rtrpools();
680
681         rc = lnet_peer_table_create();
682         if (rc != 0)
683                 goto failed0;
684
685         /* NB: we will have instance of message container per CPT soon */
686         rc = lnet_msg_container_setup(&the_lnet.ln_msg_container);
687         if (rc != 0)
688                 goto failed1;
689
690         rc = lnet_res_container_setup(&the_lnet.ln_eq_container, 0,
691                                       LNET_COOKIE_TYPE_EQ, LNET_FL_MAX_EQS,
692                                       sizeof(lnet_eq_t));
693         if (rc != 0)
694                 goto failed2;
695
696         recs = lnet_res_containers_create(LNET_COOKIE_TYPE_ME, LNET_FL_MAX_MES,
697                                           sizeof(lnet_me_t));
698         if (recs == NULL)
699                 goto failed3;
700
701         the_lnet.ln_me_containers = recs;
702
703         /* NB: we will have instance of MD container per CPT soon */
704         recs = lnet_res_containers_create(LNET_COOKIE_TYPE_MD, LNET_FL_MAX_MDS,
705                                           sizeof(lnet_libmd_t));
706         if (recs == NULL)
707                 goto failed3;
708
709         the_lnet.ln_md_containers = recs;
710
711         rc = lnet_portals_create();
712         if (rc != 0) {
713                 CERROR("Failed to create portals for LNet: %d\n", rc);
714                 goto failed3;
715         }
716
717         return 0;
718
719  failed3:
720         /* NB: lnet_res_container_cleanup is safe to call for
721          * uninitialized container */
722         if (the_lnet.ln_md_containers != NULL) {
723                 lnet_res_containers_destroy(the_lnet.ln_md_containers);
724                 the_lnet.ln_md_containers = NULL;
725         }
726         if (the_lnet.ln_me_containers != NULL) {
727                 lnet_res_containers_destroy(the_lnet.ln_me_containers);
728                 the_lnet.ln_me_containers = NULL;
729         }
730         lnet_res_container_cleanup(&the_lnet.ln_eq_container);
731  failed2:
732         lnet_msg_container_cleanup(&the_lnet.ln_msg_container);
733  failed1:
734         lnet_peer_table_destroy();
735  failed0:
736         return rc;
737 }
738
739 int
740 lnet_unprepare (void)
741 {
742         /* NB no LNET_LOCK since this is the last reference.  All LND instances
743          * have shut down already, so it is safe to unlink and free all
744          * descriptors, even those that appear committed to a network op (eg MD
745          * with non-zero pending count) */
746
747         lnet_fail_nid(LNET_NID_ANY, 0);
748
749         LASSERT (cfs_list_empty(&the_lnet.ln_test_peers));
750         LASSERT (the_lnet.ln_refcount == 0);
751         LASSERT (cfs_list_empty(&the_lnet.ln_nis));
752         LASSERT (cfs_list_empty(&the_lnet.ln_zombie_nis));
753         LASSERT (the_lnet.ln_nzombie_nis == 0);
754
755         lnet_portals_destroy();
756
757         if (the_lnet.ln_md_containers != NULL) {
758                 lnet_res_containers_destroy(the_lnet.ln_md_containers);
759                 the_lnet.ln_md_containers = NULL;
760         }
761
762         if (the_lnet.ln_me_containers != NULL) {
763                 lnet_res_containers_destroy(the_lnet.ln_me_containers);
764                 the_lnet.ln_me_containers = NULL;
765         }
766
767         lnet_res_container_cleanup(&the_lnet.ln_eq_container);
768
769         lnet_free_rtrpools();
770         lnet_msg_container_cleanup(&the_lnet.ln_msg_container);
771         lnet_peer_table_destroy();
772
773         return 0;
774 }
775
776 lnet_ni_t  *
777 lnet_net2ni_locked (__u32 net)
778 {
779         cfs_list_t       *tmp;
780         lnet_ni_t        *ni;
781
782         cfs_list_for_each (tmp, &the_lnet.ln_nis) {
783                 ni = cfs_list_entry(tmp, lnet_ni_t, ni_list);
784
785                 if (LNET_NIDNET(ni->ni_nid) == net) {
786                         lnet_ni_addref_locked(ni);
787                         return ni;
788                 }
789         }
790
791         return NULL;
792 }
793
794 unsigned int
795 lnet_nid_cpt_hash(lnet_nid_t nid)
796 {
797         __u64           key = nid;
798         unsigned int    val;
799
800         val = cfs_hash_long(key, LNET_CPT_BITS);
801         /* NB: LNET_CP_NUMBER doesn't have to be PO2 */
802         if (val < LNET_CPT_NUMBER)
803                 return val;
804
805         return (unsigned int)((key + val + (val >> 1)) % LNET_CPT_NUMBER);
806 }
807
808 int
809 lnet_cpt_of_nid(lnet_nid_t nid)
810 {
811         if (LNET_CPT_NUMBER == 1)
812                 return 0; /* the only one */
813
814         return lnet_nid_cpt_hash(nid);
815 }
816 EXPORT_SYMBOL(lnet_cpt_of_nid);
817
818 int
819 lnet_islocalnet (__u32 net)
820 {
821         lnet_ni_t        *ni;
822
823         LNET_LOCK();
824         ni = lnet_net2ni_locked(net);
825         if (ni != NULL)
826                 lnet_ni_decref_locked(ni);
827         LNET_UNLOCK();
828
829         return ni != NULL;
830 }
831
832 lnet_ni_t  *
833 lnet_nid2ni_locked (lnet_nid_t nid)
834 {
835         cfs_list_t       *tmp;
836         lnet_ni_t        *ni;
837
838         cfs_list_for_each (tmp, &the_lnet.ln_nis) {
839                 ni = cfs_list_entry(tmp, lnet_ni_t, ni_list);
840
841                 if (ni->ni_nid == nid) {
842                         lnet_ni_addref_locked(ni);
843                         return ni;
844                 }
845         }
846
847         return NULL;
848 }
849
850 int
851 lnet_islocalnid (lnet_nid_t nid)
852 {
853         lnet_ni_t     *ni;
854
855         LNET_LOCK();
856         ni = lnet_nid2ni_locked(nid);
857         if (ni != NULL)
858                 lnet_ni_decref_locked(ni);
859         LNET_UNLOCK();
860
861         return ni != NULL;
862 }
863
864 int
865 lnet_count_acceptor_nis (void)
866 {
867         /* Return the # of NIs that need the acceptor. */
868         int            count = 0;
869 #if defined(__KERNEL__) || defined(HAVE_LIBPTHREAD)
870         cfs_list_t    *tmp;
871         lnet_ni_t     *ni;
872
873         LNET_LOCK();
874         cfs_list_for_each (tmp, &the_lnet.ln_nis) {
875                 ni = cfs_list_entry(tmp, lnet_ni_t, ni_list);
876
877                 if (ni->ni_lnd->lnd_accept != NULL)
878                         count++;
879         }
880
881         LNET_UNLOCK();
882
883 #endif /* defined(__KERNEL__) || defined(HAVE_LIBPTHREAD) */
884         return count;
885 }
886
887 void
888 lnet_shutdown_lndnis (void)
889 {
890         int                i;
891         int                islo;
892         lnet_ni_t         *ni;
893
894         /* NB called holding the global mutex */
895
896         /* All quiet on the API front */
897         LASSERT (!the_lnet.ln_shutdown);
898         LASSERT (the_lnet.ln_refcount == 0);
899         LASSERT (cfs_list_empty(&the_lnet.ln_zombie_nis));
900         LASSERT (the_lnet.ln_nzombie_nis == 0);
901         LASSERT (cfs_list_empty(&the_lnet.ln_remote_nets));
902
903         LNET_LOCK();
904         the_lnet.ln_shutdown = 1;               /* flag shutdown */
905
906         /* Unlink NIs from the global table */
907         while (!cfs_list_empty(&the_lnet.ln_nis)) {
908                 ni = cfs_list_entry(the_lnet.ln_nis.next,
909                                     lnet_ni_t, ni_list);
910                 cfs_list_del (&ni->ni_list);
911
912                 the_lnet.ln_nzombie_nis++;
913                 lnet_ni_decref_locked(ni); /* drop ln_nis' ref */
914         }
915
916         /* Drop the cached eqwait NI. */
917         if (the_lnet.ln_eq_waitni != NULL) {
918                 lnet_ni_decref_locked(the_lnet.ln_eq_waitni);
919                 the_lnet.ln_eq_waitni = NULL;
920         }
921
922         /* Drop the cached loopback NI. */
923         if (the_lnet.ln_loni != NULL) {
924                 lnet_ni_decref_locked(the_lnet.ln_loni);
925                 the_lnet.ln_loni = NULL;
926         }
927
928         LNET_UNLOCK();
929
930         /* Clear lazy portals and drop delayed messages which hold refs
931          * on their lnet_msg_t::msg_rxpeer */
932         for (i = 0; i < the_lnet.ln_nportals; i++)
933                 LNetClearLazyPortal(i);
934
935         /* Clear the peer table and wait for all peers to go (they hold refs on
936          * their NIs) */
937         lnet_peer_table_cleanup();
938
939         LNET_LOCK();
940         /* Now wait for the NI's I just nuked to show up on ln_zombie_nis
941          * and shut them down in guaranteed thread context */
942         i = 2;
943         while (the_lnet.ln_nzombie_nis != 0) {
944
945                 while (cfs_list_empty(&the_lnet.ln_zombie_nis)) {
946                         LNET_UNLOCK();
947                         ++i;
948                         if ((i & (-i)) == i)
949                                 CDEBUG(D_WARNING,"Waiting for %d zombie NIs\n",
950                                        the_lnet.ln_nzombie_nis);
951                         cfs_pause(cfs_time_seconds(1));
952                         LNET_LOCK();
953                 }
954
955                 ni = cfs_list_entry(the_lnet.ln_zombie_nis.next,
956                                     lnet_ni_t, ni_list);
957                 cfs_list_del(&ni->ni_list);
958                 ni->ni_lnd->lnd_refcount--;
959
960                 LNET_UNLOCK();
961
962                 islo = ni->ni_lnd->lnd_type == LOLND;
963
964                 LASSERT (!cfs_in_interrupt ());
965                 (ni->ni_lnd->lnd_shutdown)(ni);
966
967                 /* can't deref lnd anymore now; it might have unregistered
968                  * itself...  */
969
970                 if (!islo)
971                         CDEBUG(D_LNI, "Removed LNI %s\n",
972                                libcfs_nid2str(ni->ni_nid));
973
974                 LIBCFS_FREE(ni, sizeof(*ni));
975
976                 LNET_LOCK();
977                 the_lnet.ln_nzombie_nis--;
978         }
979
980         the_lnet.ln_shutdown = 0;
981         LNET_UNLOCK();
982
983         if (the_lnet.ln_network_tokens != NULL) {
984                 LIBCFS_FREE(the_lnet.ln_network_tokens,
985                             the_lnet.ln_network_tokens_nob);
986                 the_lnet.ln_network_tokens = NULL;
987         }
988 }
989
990 int
991 lnet_startup_lndnis (void)
992 {
993         lnd_t             *lnd;
994         lnet_ni_t         *ni;
995         cfs_list_t         nilist;
996         int                rc = 0;
997         int                lnd_type;
998         int                nicount = 0;
999         char              *nets = lnet_get_networks();
1000
1001         CFS_INIT_LIST_HEAD(&nilist);
1002
1003         if (nets == NULL)
1004                 goto failed;
1005
1006         rc = lnet_parse_networks(&nilist, nets);
1007         if (rc != 0)
1008                 goto failed;
1009
1010         while (!cfs_list_empty(&nilist)) {
1011                 ni = cfs_list_entry(nilist.next, lnet_ni_t, ni_list);
1012                 lnd_type = LNET_NETTYP(LNET_NIDNET(ni->ni_nid));
1013
1014                 LASSERT (libcfs_isknown_lnd(lnd_type));
1015
1016                 if (lnd_type == CIBLND    ||
1017                     lnd_type == OPENIBLND ||
1018                     lnd_type == IIBLND    ||
1019                     lnd_type == VIBLND) {
1020                         CERROR("LND %s obsoleted\n",
1021                                libcfs_lnd2str(lnd_type));
1022                         goto failed;
1023                 }
1024
1025                 LNET_MUTEX_LOCK(&the_lnet.ln_lnd_mutex);
1026                 lnd = lnet_find_lnd_by_type(lnd_type);
1027
1028 #ifdef __KERNEL__
1029                 if (lnd == NULL) {
1030                         LNET_MUTEX_UNLOCK(&the_lnet.ln_lnd_mutex);
1031                         rc = cfs_request_module("%s",
1032                                                 libcfs_lnd2modname(lnd_type));
1033                         LNET_MUTEX_LOCK(&the_lnet.ln_lnd_mutex);
1034
1035                         lnd = lnet_find_lnd_by_type(lnd_type);
1036                         if (lnd == NULL) {
1037                                 LNET_MUTEX_UNLOCK(&the_lnet.ln_lnd_mutex);
1038                                 CERROR("Can't load LND %s, module %s, rc=%d\n",
1039                                        libcfs_lnd2str(lnd_type),
1040                                        libcfs_lnd2modname(lnd_type), rc);
1041 #ifndef HAVE_MODULE_LOADING_SUPPORT
1042                                 LCONSOLE_ERROR_MSG(0x104, "Your kernel must be "
1043                                          "compiled with kernel module "
1044                                          "loading support.");
1045 #endif
1046                                 goto failed;
1047                         }
1048                 }
1049 #else
1050                 if (lnd == NULL) {
1051                         LNET_MUTEX_UNLOCK(&the_lnet.ln_lnd_mutex);
1052                         CERROR("LND %s not supported\n",
1053                                libcfs_lnd2str(lnd_type));
1054                         goto failed;
1055                 }
1056 #endif
1057
1058                 ni->ni_refcount = 1;
1059
1060                 LNET_LOCK();
1061                 lnd->lnd_refcount++;
1062                 LNET_UNLOCK();
1063
1064                 ni->ni_lnd = lnd;
1065
1066                 rc = (lnd->lnd_startup)(ni);
1067
1068                 LNET_MUTEX_UNLOCK(&the_lnet.ln_lnd_mutex);
1069
1070                 if (rc != 0) {
1071                         LCONSOLE_ERROR_MSG(0x105, "Error %d starting up LNI %s"
1072                                            "\n",
1073                                            rc, libcfs_lnd2str(lnd->lnd_type));
1074                         LNET_LOCK();
1075                         lnd->lnd_refcount--;
1076                         LNET_UNLOCK();
1077                         goto failed;
1078                 }
1079
1080                 LASSERT (ni->ni_peertimeout <= 0 || lnd->lnd_query != NULL);
1081
1082                 cfs_list_del(&ni->ni_list);
1083
1084                 LNET_LOCK();
1085                 cfs_list_add_tail(&ni->ni_list, &the_lnet.ln_nis);
1086                 LNET_UNLOCK();
1087
1088                 if (lnd->lnd_type == LOLND) {
1089                         lnet_ni_addref(ni);
1090                         LASSERT (the_lnet.ln_loni == NULL);
1091                         the_lnet.ln_loni = ni;
1092                         continue;
1093                 }
1094
1095 #ifndef __KERNEL__
1096                 if (lnd->lnd_wait != NULL) {
1097                         if (the_lnet.ln_eq_waitni == NULL) {
1098                                 lnet_ni_addref(ni);
1099                                 the_lnet.ln_eq_waitni = ni;
1100                         }
1101                 } else {
1102 # ifndef HAVE_LIBPTHREAD
1103                         LCONSOLE_ERROR_MSG(0x106, "LND %s not supported in a "
1104                                            "single-threaded runtime\n",
1105                                            libcfs_lnd2str(lnd_type));
1106                         goto failed;
1107 # endif
1108                 }
1109 #endif
1110                 if (ni->ni_peertxcredits == 0 ||
1111                     ni->ni_maxtxcredits == 0) {
1112                         LCONSOLE_ERROR_MSG(0x107, "LNI %s has no %scredits\n",
1113                                            libcfs_lnd2str(lnd->lnd_type),
1114                                            ni->ni_peertxcredits == 0 ?
1115                                            "" : "per-peer ");
1116                         goto failed;
1117                 }
1118
1119                 ni->ni_txcredits = ni->ni_mintxcredits = ni->ni_maxtxcredits;
1120
1121                 CDEBUG(D_LNI, "Added LNI %s [%d/%d/%d/%d]\n",
1122                        libcfs_nid2str(ni->ni_nid),
1123                        ni->ni_peertxcredits, ni->ni_txcredits,
1124                        ni->ni_peerrtrcredits, ni->ni_peertimeout);
1125
1126                 nicount++;
1127         }
1128
1129         if (the_lnet.ln_eq_waitni != NULL && nicount > 1) {
1130                 lnd_type = the_lnet.ln_eq_waitni->ni_lnd->lnd_type;
1131                 LCONSOLE_ERROR_MSG(0x109, "LND %s can only run single-network"
1132                                    "\n",
1133                                    libcfs_lnd2str(lnd_type));
1134                 goto failed;
1135         }
1136
1137         return 0;
1138
1139  failed:
1140         lnet_shutdown_lndnis();
1141
1142         while (!cfs_list_empty(&nilist)) {
1143                 ni = cfs_list_entry(nilist.next, lnet_ni_t, ni_list);
1144                 cfs_list_del(&ni->ni_list);
1145                 LIBCFS_FREE(ni, sizeof(*ni));
1146         }
1147
1148         return -ENETDOWN;
1149 }
1150
1151 /**
1152  * Initialize LNet library.
1153  *
1154  * Only userspace program needs to call this function - it's automatically
1155  * called in the kernel at module loading time. Caller has to call LNetFini()
1156  * after a call to LNetInit(), if and only if the latter returned 0. It must
1157  * be called exactly once.
1158  *
1159  * \return 0 on success, and -ve on failures.
1160  */
1161 int
1162 LNetInit(void)
1163 {
1164         int     rc;
1165
1166         lnet_assert_wire_constants ();
1167         LASSERT (!the_lnet.ln_init);
1168
1169         memset(&the_lnet, 0, sizeof(the_lnet));
1170
1171         /* refer to global cfs_cpt_table for now */
1172         the_lnet.ln_cpt_table   = cfs_cpt_table;
1173         the_lnet.ln_cpt_number  = cfs_cpt_number(cfs_cpt_table);
1174
1175         LASSERT(the_lnet.ln_cpt_number > 0);
1176         if (the_lnet.ln_cpt_number > LNET_CPT_MAX) {
1177                 /* we are under risk of consuming all lh_cookie */
1178                 CERROR("Can't have %d CPTs for LNet (max allowed is %d), "
1179                        "please change setting of CPT-table and retry\n",
1180                        the_lnet.ln_cpt_number, LNET_CPT_MAX);
1181                 return -1;
1182         }
1183
1184         while ((1 << the_lnet.ln_cpt_bits) < the_lnet.ln_cpt_number)
1185                 the_lnet.ln_cpt_bits++;
1186
1187         rc = lnet_create_locks();
1188         if (rc != 0) {
1189                 CERROR("Can't create LNet global locks: %d\n", rc);
1190                 return -1;
1191         }
1192
1193         the_lnet.ln_refcount = 0;
1194         the_lnet.ln_init = 1;
1195         LNetInvalidateHandle(&the_lnet.ln_rc_eqh);
1196         CFS_INIT_LIST_HEAD(&the_lnet.ln_lnds);
1197         CFS_INIT_LIST_HEAD(&the_lnet.ln_rcd_zombie);
1198         CFS_INIT_LIST_HEAD(&the_lnet.ln_rcd_deathrow);
1199
1200 #ifdef __KERNEL__
1201         /* All LNDs apart from the LOLND are in separate modules.  They
1202          * register themselves when their module loads, and unregister
1203          * themselves when their module is unloaded. */
1204 #else
1205         /* Register LNDs
1206          * NB the order here determines default 'networks=' order */
1207 # ifdef CRAY_XT3
1208         LNET_REGISTER_ULND(the_ptllnd);
1209 # endif
1210 # ifdef HAVE_LIBPTHREAD
1211         LNET_REGISTER_ULND(the_tcplnd);
1212 # endif
1213 #endif
1214         lnet_register_lnd(&the_lolnd);
1215         return 0;
1216 }
1217
1218 /**
1219  * Finalize LNet library.
1220  *
1221  * Only userspace program needs to call this function. It can be called
1222  * at most once.
1223  *
1224  * \pre LNetInit() called with success.
1225  * \pre All LNet users called LNetNIFini() for matching LNetNIInit() calls.
1226  */
1227 void
1228 LNetFini(void)
1229 {
1230         LASSERT(the_lnet.ln_init);
1231         LASSERT(the_lnet.ln_refcount == 0);
1232
1233         while (!cfs_list_empty(&the_lnet.ln_lnds))
1234                 lnet_unregister_lnd(cfs_list_entry(the_lnet.ln_lnds.next,
1235                                                    lnd_t, lnd_list));
1236         lnet_destroy_locks();
1237
1238         the_lnet.ln_init = 0;
1239 }
1240
1241 /**
1242  * Set LNet PID and start LNet interfaces, routing, and forwarding.
1243  *
1244  * Userspace program should call this after a successful call to LNetInit().
1245  * Users must call this function at least once before any other functions.
1246  * For each successful call there must be a corresponding call to
1247  * LNetNIFini(). For subsequent calls to LNetNIInit(), \a requested_pid is
1248  * ignored.
1249  *
1250  * The PID used by LNet may be different from the one requested.
1251  * See LNetGetId().
1252  *
1253  * \param requested_pid PID requested by the caller.
1254  *
1255  * \return >= 0 on success, and < 0 error code on failures.
1256  */
1257 int
1258 LNetNIInit(lnet_pid_t requested_pid)
1259 {
1260         int         im_a_router = 0;
1261         int         rc;
1262
1263         LNET_MUTEX_LOCK(&the_lnet.ln_api_mutex);
1264
1265         LASSERT (the_lnet.ln_init);
1266         CDEBUG(D_OTHER, "refs %d\n", the_lnet.ln_refcount);
1267
1268         if (the_lnet.ln_refcount > 0) {
1269                 rc = the_lnet.ln_refcount++;
1270                 goto out;
1271         }
1272
1273         lnet_get_tunables();
1274
1275         if (requested_pid == LNET_PID_ANY) {
1276                 /* Don't instantiate LNET just for me */
1277                 rc = -ENETDOWN;
1278                 goto failed0;
1279         }
1280
1281         rc = lnet_prepare(requested_pid);
1282         if (rc != 0)
1283                 goto failed0;
1284
1285         rc = lnet_startup_lndnis();
1286         if (rc != 0)
1287                 goto failed1;
1288
1289         rc = lnet_parse_routes(lnet_get_routes(), &im_a_router);
1290         if (rc != 0)
1291                 goto failed2;
1292
1293         rc = lnet_check_routes();
1294         if (rc != 0)
1295                 goto failed2;
1296
1297         rc = lnet_alloc_rtrpools(im_a_router);
1298         if (rc != 0)
1299                 goto failed2;
1300
1301         rc = lnet_acceptor_start();
1302         if (rc != 0)
1303                 goto failed2;
1304
1305         the_lnet.ln_refcount = 1;
1306         /* Now I may use my own API functions... */
1307
1308         /* NB router checker needs the_lnet.ln_ping_info in
1309          * lnet_router_checker -> lnet_update_ni_status_locked */
1310         rc = lnet_ping_target_init();
1311         if (rc != 0)
1312                 goto failed3;
1313
1314         rc = lnet_router_checker_start();
1315         if (rc != 0)
1316                 goto failed4;
1317
1318         lnet_proc_init();
1319         goto out;
1320
1321  failed4:
1322         lnet_ping_target_fini();
1323  failed3:
1324         the_lnet.ln_refcount = 0;
1325         lnet_acceptor_stop();
1326  failed2:
1327         lnet_destroy_routes();
1328         lnet_shutdown_lndnis();
1329  failed1:
1330         lnet_unprepare();
1331  failed0:
1332         LASSERT (rc < 0);
1333  out:
1334         LNET_MUTEX_UNLOCK(&the_lnet.ln_api_mutex);
1335         return rc;
1336 }
1337
1338 /**
1339  * Stop LNet interfaces, routing, and forwarding.
1340  *
1341  * Users must call this function once for each successful call to LNetNIInit().
1342  * Once the LNetNIFini() operation has been started, the results of pending
1343  * API operations are undefined.
1344  *
1345  * \return always 0 for current implementation.
1346  */
1347 int
1348 LNetNIFini()
1349 {
1350         LNET_MUTEX_LOCK(&the_lnet.ln_api_mutex);
1351
1352         LASSERT (the_lnet.ln_init);
1353         LASSERT (the_lnet.ln_refcount > 0);
1354
1355         if (the_lnet.ln_refcount != 1) {
1356                 the_lnet.ln_refcount--;
1357         } else {
1358                 LASSERT (!the_lnet.ln_niinit_self);
1359
1360                 lnet_proc_fini();
1361                 lnet_router_checker_stop();
1362                 lnet_ping_target_fini();
1363
1364                 /* Teardown fns that use my own API functions BEFORE here */
1365                 the_lnet.ln_refcount = 0;
1366
1367                 lnet_acceptor_stop();
1368                 lnet_destroy_routes();
1369                 lnet_shutdown_lndnis();
1370                 lnet_unprepare();
1371         }
1372
1373         LNET_MUTEX_UNLOCK(&the_lnet.ln_api_mutex);
1374         return 0;
1375 }
1376
1377 /**
1378  * This is an ugly hack to export IOC_LIBCFS_DEBUG_PEER and
1379  * IOC_LIBCFS_PORTALS_COMPATIBILITY commands to users, by tweaking the LNet
1380  * internal ioctl handler.
1381  *
1382  * IOC_LIBCFS_PORTALS_COMPATIBILITY is now deprecated, don't use it.
1383  *
1384  * \param cmd IOC_LIBCFS_DEBUG_PEER to print debugging data about a peer.
1385  * The data will be printed to system console. Don't use it excessively.
1386  * \param arg A pointer to lnet_process_id_t, process ID of the peer.
1387  *
1388  * \return Always return 0 when called by users directly (i.e., not via ioctl).
1389  */
1390 int
1391 LNetCtl(unsigned int cmd, void *arg)
1392 {
1393         struct libcfs_ioctl_data *data = arg;
1394         lnet_process_id_t         id = {0};
1395         lnet_ni_t                *ni;
1396         int                       rc;
1397
1398         LASSERT (the_lnet.ln_init);
1399         LASSERT (the_lnet.ln_refcount > 0);
1400
1401         switch (cmd) {
1402         case IOC_LIBCFS_GET_NI:
1403                 rc = LNetGetId(data->ioc_count, &id);
1404                 data->ioc_nid = id.nid;
1405                 return rc;
1406
1407         case IOC_LIBCFS_FAIL_NID:
1408                 return lnet_fail_nid(data->ioc_nid, data->ioc_count);
1409
1410         case IOC_LIBCFS_ADD_ROUTE:
1411                 rc = lnet_add_route(data->ioc_net, data->ioc_count,
1412                                     data->ioc_nid);
1413                 return (rc != 0) ? rc : lnet_check_routes();
1414
1415         case IOC_LIBCFS_DEL_ROUTE:
1416                 return lnet_del_route(data->ioc_net, data->ioc_nid);
1417
1418         case IOC_LIBCFS_GET_ROUTE:
1419                 return lnet_get_route(data->ioc_count,
1420                                       &data->ioc_net, &data->ioc_count,
1421                                       &data->ioc_nid, &data->ioc_flags);
1422         case IOC_LIBCFS_NOTIFY_ROUTER:
1423                 return lnet_notify(NULL, data->ioc_nid, data->ioc_flags,
1424                                    cfs_time_current() -
1425                                    cfs_time_seconds(cfs_time_current_sec() -
1426                                                     (time_t)data->ioc_u64[0]));
1427
1428         case IOC_LIBCFS_PORTALS_COMPATIBILITY:
1429                 /* This can be removed once lustre stops calling it */
1430                 return 0;
1431
1432         case IOC_LIBCFS_LNET_DIST:
1433                 rc = LNetDist(data->ioc_nid, &data->ioc_nid, &data->ioc_u32[1]);
1434                 if (rc < 0 && rc != -EHOSTUNREACH)
1435                         return rc;
1436
1437                 data->ioc_u32[0] = rc;
1438                 return 0;
1439
1440         case IOC_LIBCFS_TESTPROTOCOMPAT:
1441                 LNET_LOCK();
1442                 the_lnet.ln_testprotocompat = data->ioc_flags;
1443                 LNET_UNLOCK();
1444                 return 0;
1445
1446         case IOC_LIBCFS_PING:
1447                 id.nid = data->ioc_nid;
1448                 id.pid = data->ioc_u32[0];
1449                 rc = lnet_ping(id, data->ioc_u32[1], /* timeout */
1450                                (lnet_process_id_t *)data->ioc_pbuf1,
1451                                data->ioc_plen1/sizeof(lnet_process_id_t));
1452                 if (rc < 0)
1453                         return rc;
1454                 data->ioc_count = rc;
1455                 return 0;
1456
1457         case IOC_LIBCFS_DEBUG_PEER: {
1458                 /* CAVEAT EMPTOR: this one designed for calling directly; not
1459                  * via an ioctl */
1460                 id = *((lnet_process_id_t *) arg);
1461
1462                 lnet_debug_peer(id.nid);
1463
1464                 ni = lnet_net2ni(LNET_NIDNET(id.nid));
1465                 if (ni == NULL) {
1466                         CDEBUG(D_WARNING, "No NI for %s\n", libcfs_id2str(id));
1467                 } else {
1468                         if (ni->ni_lnd->lnd_ctl == NULL) {
1469                                 CDEBUG(D_WARNING, "No ctl for %s\n",
1470                                        libcfs_id2str(id));
1471                         } else {
1472                                 (void)ni->ni_lnd->lnd_ctl(ni, cmd, arg);
1473                         }
1474
1475                         lnet_ni_decref(ni);
1476                 }
1477                 return 0;
1478         }
1479
1480         default:
1481                 ni = lnet_net2ni(data->ioc_net);
1482                 if (ni == NULL)
1483                         return -EINVAL;
1484
1485                 if (ni->ni_lnd->lnd_ctl == NULL)
1486                         rc = -EINVAL;
1487                 else
1488                         rc = ni->ni_lnd->lnd_ctl(ni, cmd, arg);
1489
1490                 lnet_ni_decref(ni);
1491                 return rc;
1492         }
1493         /* not reached */
1494 }
1495
1496 /**
1497  * Retrieve the lnet_process_id_t ID of LNet interface at \a index. Note that
1498  * all interfaces share a same PID, as requested by LNetNIInit().
1499  *
1500  * \param index Index of the interface to look up.
1501  * \param id On successful return, this location will hold the
1502  * lnet_process_id_t ID of the interface.
1503  *
1504  * \retval 0 If an interface exists at \a index.
1505  * \retval -ENOENT If no interface has been found.
1506  */
1507 int
1508 LNetGetId(unsigned int index, lnet_process_id_t *id)
1509 {
1510         lnet_ni_t        *ni;
1511         cfs_list_t       *tmp;
1512         int               rc = -ENOENT;
1513
1514         LASSERT (the_lnet.ln_init);
1515         LASSERT (the_lnet.ln_refcount > 0);
1516
1517         LNET_LOCK();
1518
1519         cfs_list_for_each(tmp, &the_lnet.ln_nis) {
1520                 if (index-- != 0)
1521                         continue;
1522
1523                 ni = cfs_list_entry(tmp, lnet_ni_t, ni_list);
1524
1525                 id->nid = ni->ni_nid;
1526                 id->pid = the_lnet.ln_pid;
1527                 rc = 0;
1528                 break;
1529         }
1530
1531         LNET_UNLOCK();
1532
1533         return rc;
1534 }
1535
1536 /**
1537  * Print a string representation of handle \a h into buffer \a str of
1538  * \a len bytes.
1539  */
1540 void
1541 LNetSnprintHandle(char *str, int len, lnet_handle_any_t h)
1542 {
1543         snprintf(str, len, LPX64, h.cookie);
1544 }
1545
1546 static int
1547 lnet_create_ping_info(void)
1548 {
1549         int               i;
1550         int               n;
1551         int               rc;
1552         unsigned int      infosz;
1553         lnet_ni_t        *ni;
1554         lnet_process_id_t id;
1555         lnet_ping_info_t *pinfo;
1556
1557         for (n = 0; ; n++) {
1558                 rc = LNetGetId(n, &id);
1559                 if (rc == -ENOENT)
1560                         break;
1561
1562                 LASSERT (rc == 0);
1563         }
1564
1565         infosz = offsetof(lnet_ping_info_t, pi_ni[n]);
1566         LIBCFS_ALLOC(pinfo, infosz);
1567         if (pinfo == NULL) {
1568                 CERROR("Can't allocate ping info[%d]\n", n);
1569                 return -ENOMEM;
1570         }
1571
1572         pinfo->pi_nnis    = n;
1573         pinfo->pi_pid     = the_lnet.ln_pid;
1574         pinfo->pi_magic   = LNET_PROTO_PING_MAGIC;
1575         pinfo->pi_version = LNET_PROTO_PING_VERSION;
1576
1577         for (i = 0; i < n; i++) {
1578                 lnet_ni_status_t *ns = &pinfo->pi_ni[i];
1579
1580                 rc = LNetGetId(i, &id);
1581                 LASSERT (rc == 0);
1582
1583                 ns->ns_nid    = id.nid;
1584                 ns->ns_status = LNET_NI_STATUS_UP;
1585
1586                 LNET_LOCK();
1587
1588                 ni = lnet_nid2ni_locked(id.nid);
1589                 LASSERT (ni != NULL);
1590                 LASSERT (ni->ni_status == NULL);
1591                 ni->ni_status = ns;
1592                 lnet_ni_decref_locked(ni);
1593
1594                 LNET_UNLOCK();
1595         }
1596
1597         the_lnet.ln_ping_info = pinfo;
1598         return 0;
1599 }
1600
1601 static void
1602 lnet_destroy_ping_info(void)
1603 {
1604         lnet_ni_t *ni;
1605
1606         LNET_LOCK();
1607
1608         cfs_list_for_each_entry (ni, &the_lnet.ln_nis, ni_list) {
1609                 ni->ni_status = NULL;
1610         }
1611
1612         LNET_UNLOCK();
1613
1614         LIBCFS_FREE(the_lnet.ln_ping_info,
1615                     offsetof(lnet_ping_info_t,
1616                              pi_ni[the_lnet.ln_ping_info->pi_nnis]));
1617         the_lnet.ln_ping_info = NULL;
1618         return;
1619 }
1620
1621 int
1622 lnet_ping_target_init(void)
1623 {
1624         lnet_md_t         md = {0};
1625         lnet_handle_me_t  meh;
1626         lnet_process_id_t id;
1627         int               rc;
1628         int               rc2;
1629         int               infosz;
1630
1631         rc = lnet_create_ping_info();
1632         if (rc != 0)
1633                 return rc;
1634
1635         /* We can have a tiny EQ since we only need to see the unlink event on
1636          * teardown, which by definition is the last one! */
1637         rc = LNetEQAlloc(2, LNET_EQ_HANDLER_NONE, &the_lnet.ln_ping_target_eq);
1638         if (rc != 0) {
1639                 CERROR("Can't allocate ping EQ: %d\n", rc);
1640                 goto failed_0;
1641         }
1642
1643         memset(&id, 0, sizeof(lnet_process_id_t));
1644         id.nid = LNET_NID_ANY;
1645         id.pid = LNET_PID_ANY;
1646
1647         rc = LNetMEAttach(LNET_RESERVED_PORTAL, id,
1648                           LNET_PROTO_PING_MATCHBITS, 0,
1649                           LNET_UNLINK, LNET_INS_AFTER,
1650                           &meh);
1651         if (rc != 0) {
1652                 CERROR("Can't create ping ME: %d\n", rc);
1653                 goto failed_1;
1654         }
1655
1656         /* initialize md content */
1657         infosz = offsetof(lnet_ping_info_t,
1658                           pi_ni[the_lnet.ln_ping_info->pi_nnis]);
1659         md.start     = the_lnet.ln_ping_info;
1660         md.length    = infosz;
1661         md.threshold = LNET_MD_THRESH_INF;
1662         md.max_size  = 0;
1663         md.options   = LNET_MD_OP_GET | LNET_MD_TRUNCATE |
1664                        LNET_MD_MANAGE_REMOTE;
1665         md.user_ptr  = NULL;
1666         md.eq_handle = the_lnet.ln_ping_target_eq;
1667
1668         rc = LNetMDAttach(meh, md,
1669                           LNET_RETAIN,
1670                           &the_lnet.ln_ping_target_md);
1671         if (rc != 0) {
1672                 CERROR("Can't attach ping MD: %d\n", rc);
1673                 goto failed_2;
1674         }
1675
1676         return 0;
1677
1678  failed_2:
1679         rc2 = LNetMEUnlink(meh);
1680         LASSERT (rc2 == 0);
1681  failed_1:
1682         rc2 = LNetEQFree(the_lnet.ln_ping_target_eq);
1683         LASSERT (rc2 == 0);
1684  failed_0:
1685         lnet_destroy_ping_info();
1686         return rc;
1687 }
1688
1689 void
1690 lnet_ping_target_fini(void)
1691 {
1692         lnet_event_t    event;
1693         int             rc;
1694         int             which;
1695         int             timeout_ms = 1000;
1696         cfs_sigset_t    blocked = cfs_block_allsigs();
1697
1698         LNetMDUnlink(the_lnet.ln_ping_target_md);
1699         /* NB md could be busy; this just starts the unlink */
1700
1701         for (;;) {
1702                 rc = LNetEQPoll(&the_lnet.ln_ping_target_eq, 1,
1703                                 timeout_ms, &event, &which);
1704
1705                 /* I expect overflow... */
1706                 LASSERT (rc >= 0 || rc == -EOVERFLOW);
1707
1708                 if (rc == 0) {
1709                         /* timed out: provide a diagnostic */
1710                         CWARN("Still waiting for ping MD to unlink\n");
1711                         timeout_ms *= 2;
1712                         continue;
1713                 }
1714
1715                 /* Got a valid event */
1716                 if (event.unlinked)
1717                         break;
1718         }
1719
1720         rc = LNetEQFree(the_lnet.ln_ping_target_eq);
1721         LASSERT (rc == 0);
1722         lnet_destroy_ping_info();
1723         cfs_restore_sigs(blocked);
1724 }
1725
1726 int
1727 lnet_ping (lnet_process_id_t id, int timeout_ms, lnet_process_id_t *ids, int n_ids)
1728 {
1729         lnet_handle_eq_t     eqh;
1730         lnet_handle_md_t     mdh;
1731         lnet_event_t         event;
1732         lnet_md_t            md = {0};
1733         int                  which;
1734         int                  unlinked = 0;
1735         int                  replied = 0;
1736         const int            a_long_time = 60000; /* mS */
1737         int                  infosz = offsetof(lnet_ping_info_t, pi_ni[n_ids]);
1738         lnet_ping_info_t    *info;
1739         lnet_process_id_t    tmpid;
1740         int                  i;
1741         int                  nob;
1742         int                  rc;
1743         int                  rc2;
1744         cfs_sigset_t         blocked;
1745
1746         if (n_ids <= 0 ||
1747             id.nid == LNET_NID_ANY ||
1748             timeout_ms > 500000 ||              /* arbitrary limit! */
1749             n_ids > 20)                         /* arbitrary limit! */
1750                 return -EINVAL;
1751
1752         if (id.pid == LNET_PID_ANY)
1753                 id.pid = LUSTRE_SRV_LNET_PID;
1754
1755         LIBCFS_ALLOC(info, infosz);
1756         if (info == NULL)
1757                 return -ENOMEM;
1758
1759         /* NB 2 events max (including any unlink event) */
1760         rc = LNetEQAlloc(2, LNET_EQ_HANDLER_NONE, &eqh);
1761         if (rc != 0) {
1762                 CERROR("Can't allocate EQ: %d\n", rc);
1763                 goto out_0;
1764         }
1765
1766         /* initialize md content */
1767         md.start     = info;
1768         md.length    = infosz;
1769         md.threshold = 2; /*GET/REPLY*/
1770         md.max_size  = 0;
1771         md.options   = LNET_MD_TRUNCATE;
1772         md.user_ptr  = NULL;
1773         md.eq_handle = eqh;
1774
1775         rc = LNetMDBind(md, LNET_UNLINK, &mdh);
1776         if (rc != 0) {
1777                 CERROR("Can't bind MD: %d\n", rc);
1778                 goto out_1;
1779         }
1780
1781         rc = LNetGet(LNET_NID_ANY, mdh, id,
1782                      LNET_RESERVED_PORTAL,
1783                      LNET_PROTO_PING_MATCHBITS, 0);
1784
1785         if (rc != 0) {
1786                 /* Don't CERROR; this could be deliberate! */
1787
1788                 rc2 = LNetMDUnlink(mdh);
1789                 LASSERT (rc2 == 0);
1790
1791                 /* NB must wait for the UNLINK event below... */
1792                 unlinked = 1;
1793                 timeout_ms = a_long_time;
1794         }
1795
1796         do {
1797                 /* MUST block for unlink to complete */
1798                 if (unlinked)
1799                         blocked = cfs_block_allsigs();
1800
1801                 rc2 = LNetEQPoll(&eqh, 1, timeout_ms, &event, &which);
1802
1803                 if (unlinked)
1804                         cfs_restore_sigs(blocked);
1805
1806                 CDEBUG(D_NET, "poll %d(%d %d)%s\n", rc2,
1807                        (rc2 <= 0) ? -1 : event.type,
1808                        (rc2 <= 0) ? -1 : event.status,
1809                        (rc2 > 0 && event.unlinked) ? " unlinked" : "");
1810
1811                 LASSERT (rc2 != -EOVERFLOW);     /* can't miss anything */
1812
1813                 if (rc2 <= 0 || event.status != 0) {
1814                         /* timeout or error */
1815                         if (!replied && rc == 0)
1816                                 rc = (rc2 < 0) ? rc2 :
1817                                      (rc2 == 0) ? -ETIMEDOUT :
1818                                      event.status;
1819
1820                         if (!unlinked) {
1821                                 /* Ensure completion in finite time... */
1822                                 LNetMDUnlink(mdh);
1823                                 /* No assertion (racing with network) */
1824                                 unlinked = 1;
1825                                 timeout_ms = a_long_time;
1826                         } else if (rc2 == 0) {
1827                                 /* timed out waiting for unlink */
1828                                 CWARN("ping %s: late network completion\n",
1829                                       libcfs_id2str(id));
1830                         }
1831                 } else if (event.type == LNET_EVENT_REPLY) {
1832                         replied = 1;
1833                         rc = event.mlength;
1834                 }
1835
1836         } while (rc2 <= 0 || !event.unlinked);
1837
1838         if (!replied) {
1839                 if (rc >= 0)
1840                         CWARN("%s: Unexpected rc >= 0 but no reply!\n",
1841                               libcfs_id2str(id));
1842                 rc = -EIO;
1843                 goto out_1;
1844         }
1845
1846         nob = rc;
1847         LASSERT (nob >= 0 && nob <= infosz);
1848
1849         rc = -EPROTO;                           /* if I can't parse... */
1850
1851         if (nob < 8) {
1852                 /* can't check magic/version */
1853                 CERROR("%s: ping info too short %d\n",
1854                        libcfs_id2str(id), nob);
1855                 goto out_1;
1856         }
1857
1858         if (info->pi_magic == __swab32(LNET_PROTO_PING_MAGIC)) {
1859                 lnet_swap_pinginfo(info);
1860         } else if (info->pi_magic != LNET_PROTO_PING_MAGIC) {
1861                 CERROR("%s: Unexpected magic %08x\n", 
1862                        libcfs_id2str(id), info->pi_magic);
1863                 goto out_1;
1864         }
1865
1866         if (info->pi_version != LNET_PROTO_PING_VERSION) {
1867                 CERROR("%s: Unexpected version 0x%x\n",
1868                        libcfs_id2str(id), info->pi_version);
1869                 goto out_1;
1870         }
1871
1872         if (nob < offsetof(lnet_ping_info_t, pi_ni[0])) {
1873                 CERROR("%s: Short reply %d(%d min)\n", libcfs_id2str(id),
1874                        nob, (int)offsetof(lnet_ping_info_t, pi_ni[0]));
1875                 goto out_1;
1876         }
1877
1878         if (info->pi_nnis < n_ids)
1879                 n_ids = info->pi_nnis;
1880
1881         if (nob < offsetof(lnet_ping_info_t, pi_ni[n_ids])) {
1882                 CERROR("%s: Short reply %d(%d expected)\n", libcfs_id2str(id),
1883                        nob, (int)offsetof(lnet_ping_info_t, pi_ni[n_ids]));
1884                 goto out_1;
1885         }
1886
1887         rc = -EFAULT;                           /* If I SEGV... */
1888
1889         for (i = 0; i < n_ids; i++) {
1890                 tmpid.pid = info->pi_pid;
1891                 tmpid.nid = info->pi_ni[i].ns_nid;
1892 #ifdef __KERNEL__
1893                 if (cfs_copy_to_user(&ids[i], &tmpid, sizeof(tmpid)))
1894                         goto out_1;
1895 #else
1896                 ids[i] = tmpid;
1897 #endif
1898         }
1899         rc = info->pi_nnis;
1900
1901  out_1:
1902         rc2 = LNetEQFree(eqh);
1903         if (rc2 != 0)
1904                 CERROR("rc2 %d\n", rc2);
1905         LASSERT (rc2 == 0);
1906
1907  out_0:
1908         LIBCFS_FREE(info, infosz);
1909         return rc;
1910 }