Whamcloud - gitweb
LU-9120 lnet: handle local ni failure
[fs/lustre-release.git] / lnet / lnet / api-ni.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  */
32
33 #define DEBUG_SUBSYSTEM S_LNET
34
35 #include <linux/ctype.h>
36 #include <linux/log2.h>
37 #include <linux/ktime.h>
38 #include <linux/moduleparam.h>
39 #include <linux/uaccess.h>
40
41 #include <lnet/lib-lnet.h>
42
43 #define D_LNI D_CONSOLE
44
/*
 * initialize ln_api_mutex statically, since it needs to be used in
 * discovery_set callback. That module parameter callback can be called
 * before module init completes. The mutex needs to be ready for use then.
 */
struct lnet the_lnet = {
	.ln_api_mutex = __MUTEX_INITIALIZER(the_lnet.ln_api_mutex),
};		/* THE state of the network */
EXPORT_SYMBOL(the_lnet);
54
/* 'ip2nets' maps local IP addresses to LNet networks; mutually exclusive
 * with 'networks' (enforced in lnet_get_networks()). */
static char *ip2nets = "";
module_param(ip2nets, charp, 0444);
MODULE_PARM_DESC(ip2nets, "LNET network <- IP table");

static char *networks = "";
module_param(networks, charp, 0444);
MODULE_PARM_DESC(networks, "local networks");

static char *routes = "";
module_param(routes, charp, 0444);
MODULE_PARM_DESC(routes, "routes to non-local networks");

static int rnet_htable_size = LNET_REMOTE_NETS_HASH_DEFAULT;
module_param(rnet_htable_size, int, 0444);
MODULE_PARM_DESC(rnet_htable_size, "size of remote network hash table");

static int use_tcp_bonding = false;
module_param(use_tcp_bonding, int, 0444);
MODULE_PARM_DESC(use_tcp_bonding,
		 "Set to 1 to use socklnd bonding. 0 to use Multi-Rail");

unsigned int lnet_numa_range = 0;
module_param(lnet_numa_range, uint, 0444);
MODULE_PARM_DESC(lnet_numa_range,
		"NUMA range to consider during Multi-Rail selection");

/*
 * lnet_health_sensitivity determines by how much we decrement the health
 * value on sending error. The value defaults to 0, which means health
 * checking is turned off by default.
 */
unsigned int lnet_health_sensitivity = 0;
static int sensitivity_set(const char *val, cfs_kernel_param_arg_t *kp);
/* custom setter so the value is validated and stored under ln_api_mutex */
static struct kernel_param_ops param_ops_health_sensitivity = {
	.set = sensitivity_set,
	.get = param_get_int,
};
#define param_check_health_sensitivity(name, p) \
		__param_check(name, p, int)
#ifdef HAVE_KERNEL_PARAM_OPS
module_param(lnet_health_sensitivity, health_sensitivity, S_IRUGO|S_IWUSR);
#else
module_param_call(lnet_health_sensitivity, sensitivity_set, param_get_int,
		  &lnet_health_sensitivity, S_IRUGO|S_IWUSR);
#endif
MODULE_PARM_DESC(lnet_health_sensitivity,
		"Value to decrement the health value by on error");

static int lnet_interfaces_max = LNET_INTERFACES_MAX_DEFAULT;
static int intf_max_set(const char *val, cfs_kernel_param_arg_t *kp);

static struct kernel_param_ops param_ops_interfaces_max = {
	.set = intf_max_set,
	.get = param_get_int,
};

#define param_check_interfaces_max(name, p) \
		__param_check(name, p, int)

#ifdef HAVE_KERNEL_PARAM_OPS
module_param(lnet_interfaces_max, interfaces_max, 0644);
#else
/* NOTE(review): this passes &param_ops_interfaces_max as the arg while
 * the sensitivity param above passes &lnet_health_sensitivity (the
 * variable).  param_get_int would read the ops struct here — verify
 * whether the non-HAVE_KERNEL_PARAM_OPS path is intentional. */
module_param_call(lnet_interfaces_max, intf_max_set, param_get_int,
		  &param_ops_interfaces_max, 0644);
#endif
MODULE_PARM_DESC(lnet_interfaces_max,
		"Maximum number of interfaces in a node.");

unsigned lnet_peer_discovery_disabled = 0;
static int discovery_set(const char *val, cfs_kernel_param_arg_t *kp);

static struct kernel_param_ops param_ops_discovery_disabled = {
	.set = discovery_set,
	.get = param_get_int,
};

#define param_check_discovery_disabled(name, p) \
		__param_check(name, p, int)
#ifdef HAVE_KERNEL_PARAM_OPS
module_param(lnet_peer_discovery_disabled, discovery_disabled, 0644);
#else
/* NOTE(review): same &param_ops_* oddity as lnet_interfaces_max above. */
module_param_call(lnet_peer_discovery_disabled, discovery_set, param_get_int,
		  &param_ops_discovery_disabled, 0644);
#endif
MODULE_PARM_DESC(lnet_peer_discovery_disabled,
		"Set to 1 to disable peer discovery on this node.");

unsigned lnet_transaction_timeout = 5;
module_param(lnet_transaction_timeout, uint, 0444);
MODULE_PARM_DESC(lnet_transaction_timeout,
		"Time in seconds to wait for a REPLY or an ACK");

/*
 * This sequence number keeps track of how many times DLC was used to
 * update the local NIs. It is incremented when a NI is added or
 * removed and checked when sending a message to determine if there is
 * a need to re-run the selection algorithm. See lnet_select_pathway()
 * for more details on its usage.
 */
static atomic_t lnet_dlc_seq_no = ATOMIC_INIT(0);

/* forward declarations; definitions live later in this file */
static int lnet_ping(struct lnet_process_id id, signed long timeout,
		     struct lnet_process_id __user *ids, int n_ids);

static int lnet_discover(struct lnet_process_id id, __u32 force,
			 struct lnet_process_id __user *ids, int n_ids);
161
162 static int
163 sensitivity_set(const char *val, cfs_kernel_param_arg_t *kp)
164 {
165         int rc;
166         unsigned *sensitivity = (unsigned *)kp->arg;
167         unsigned long value;
168
169         rc = kstrtoul(val, 0, &value);
170         if (rc) {
171                 CERROR("Invalid module parameter value for 'lnet_health_sensitivity'\n");
172                 return rc;
173         }
174
175         /*
176          * The purpose of locking the api_mutex here is to ensure that
177          * the correct value ends up stored properly.
178          */
179         mutex_lock(&the_lnet.ln_api_mutex);
180
181         if (the_lnet.ln_state != LNET_STATE_RUNNING) {
182                 mutex_unlock(&the_lnet.ln_api_mutex);
183                 return 0;
184         }
185
186         if (value == *sensitivity) {
187                 mutex_unlock(&the_lnet.ln_api_mutex);
188                 return 0;
189         }
190
191         *sensitivity = value;
192
193         mutex_unlock(&the_lnet.ln_api_mutex);
194
195         return 0;
196 }
197
/*
 * Module-parameter setter for lnet_peer_discovery_disabled.
 *
 * Stores the normalized (0/1) value under ln_api_mutex and, when LNet
 * is running, flips the DISCOVERY feature bit in the ping target and
 * pushes the update to peers.
 */
static int
discovery_set(const char *val, cfs_kernel_param_arg_t *kp)
{
	int rc;
	unsigned *discovery = (unsigned *)kp->arg;
	unsigned long value;
	struct lnet_ping_buffer *pbuf;

	rc = kstrtoul(val, 0, &value);
	if (rc) {
		CERROR("Invalid module parameter value for 'lnet_peer_discovery_disabled'\n");
		return rc;
	}

	/* normalize any non-zero input to 1 */
	value = (value) ? 1 : 0;

	/*
	 * The purpose of locking the api_mutex here is to ensure that
	 * the correct value ends up stored properly.
	 */
	mutex_lock(&the_lnet.ln_api_mutex);

	if (value == *discovery) {
		mutex_unlock(&the_lnet.ln_api_mutex);
		return 0;
	}

	*discovery = value;

	/* Value is stored even when LNet is not running, so it takes
	 * effect at startup; with no peers there is nothing to notify. */
	if (the_lnet.ln_state != LNET_STATE_RUNNING) {
		mutex_unlock(&the_lnet.ln_api_mutex);
		return 0;
	}

	/* tell peers that discovery setting has changed */
	lnet_net_lock(LNET_LOCK_EX);
	pbuf = the_lnet.ln_ping_target;
	if (value)
		pbuf->pb_info.pi_features &= ~LNET_PING_FEAT_DISCOVERY;
	else
		pbuf->pb_info.pi_features |= LNET_PING_FEAT_DISCOVERY;
	lnet_net_unlock(LNET_LOCK_EX);

	lnet_push_update_to_peers(1);

	mutex_unlock(&the_lnet.ln_api_mutex);

	return 0;
}
247
248 static int
249 intf_max_set(const char *val, cfs_kernel_param_arg_t *kp)
250 {
251         int value, rc;
252
253         rc = kstrtoint(val, 0, &value);
254         if (rc) {
255                 CERROR("Invalid module parameter value for 'lnet_interfaces_max'\n");
256                 return rc;
257         }
258
259         if (value < LNET_INTERFACES_MIN) {
260                 CWARN("max interfaces provided are too small, setting to %d\n",
261                       LNET_INTERFACES_MAX_DEFAULT);
262                 value = LNET_INTERFACES_MAX_DEFAULT;
263         }
264
265         *(int *)kp->arg = value;
266
267         return 0;
268 }
269
/* Return the 'routes' module parameter string (never NULL; "" if unset). */
static char *
lnet_get_routes(void)
{
	return routes;
}
275
276 static char *
277 lnet_get_networks(void)
278 {
279         char   *nets;
280         int     rc;
281
282         if (*networks != 0 && *ip2nets != 0) {
283                 LCONSOLE_ERROR_MSG(0x101, "Please specify EITHER 'networks' or "
284                                    "'ip2nets' but not both at once\n");
285                 return NULL;
286         }
287
288         if (*ip2nets != 0) {
289                 rc = lnet_parse_ip2nets(&nets, ip2nets);
290                 return (rc == 0) ? nets : NULL;
291         }
292
293         if (*networks != 0)
294                 return networks;
295
296         return "tcp";
297 }
298
/* Initialize the statically-embedded spinlocks, wait queues and the LND
 * mutex in the_lnet.  The per-CPT locks are allocated separately by
 * lnet_create_locks(). */
static void
lnet_init_locks(void)
{
	spin_lock_init(&the_lnet.ln_eq_wait_lock);
	spin_lock_init(&the_lnet.ln_msg_resend_lock);
	init_waitqueue_head(&the_lnet.ln_eq_waitq);
	init_waitqueue_head(&the_lnet.ln_mt_waitq);
	mutex_init(&the_lnet.ln_lnd_mutex);
}
308
/* Counterpart of lnet_init_locks(); intentionally empty since the
 * statically-embedded locks need no teardown. */
static void
lnet_fini_locks(void)
{
}
313
/* slab caches created by lnet_descriptor_setup() */
struct kmem_cache *lnet_mes_cachep;	   /* MEs kmem_cache */
struct kmem_cache *lnet_small_mds_cachep;  /* <= LNET_SMALL_MD_SIZE bytes
					    *  MDs kmem_cache */
317
318 static int
319 lnet_descriptor_setup(void)
320 {
321         /* create specific kmem_cache for MEs and small MDs (i.e., originally
322          * allocated in <size-xxx> kmem_cache).
323          */
324         lnet_mes_cachep = kmem_cache_create("lnet_MEs", sizeof(struct lnet_me),
325                                             0, 0, NULL);
326         if (!lnet_mes_cachep)
327                 return -ENOMEM;
328
329         lnet_small_mds_cachep = kmem_cache_create("lnet_small_MDs",
330                                                   LNET_SMALL_MD_SIZE, 0, 0,
331                                                   NULL);
332         if (!lnet_small_mds_cachep)
333                 return -ENOMEM;
334
335         return 0;
336 }
337
338 static void
339 lnet_descriptor_cleanup(void)
340 {
341
342         if (lnet_small_mds_cachep) {
343                 kmem_cache_destroy(lnet_small_mds_cachep);
344                 lnet_small_mds_cachep = NULL;
345         }
346
347         if (lnet_mes_cachep) {
348                 kmem_cache_destroy(lnet_mes_cachep);
349                 lnet_mes_cachep = NULL;
350         }
351 }
352
353 static int
354 lnet_create_remote_nets_table(void)
355 {
356         int               i;
357         struct list_head *hash;
358
359         LASSERT(the_lnet.ln_remote_nets_hash == NULL);
360         LASSERT(the_lnet.ln_remote_nets_hbits > 0);
361         LIBCFS_ALLOC(hash, LNET_REMOTE_NETS_HASH_SIZE * sizeof(*hash));
362         if (hash == NULL) {
363                 CERROR("Failed to create remote nets hash table\n");
364                 return -ENOMEM;
365         }
366
367         for (i = 0; i < LNET_REMOTE_NETS_HASH_SIZE; i++)
368                 INIT_LIST_HEAD(&hash[i]);
369         the_lnet.ln_remote_nets_hash = hash;
370         return 0;
371 }
372
373 static void
374 lnet_destroy_remote_nets_table(void)
375 {
376         int i;
377
378         if (the_lnet.ln_remote_nets_hash == NULL)
379                 return;
380
381         for (i = 0; i < LNET_REMOTE_NETS_HASH_SIZE; i++)
382                 LASSERT(list_empty(&the_lnet.ln_remote_nets_hash[i]));
383
384         LIBCFS_FREE(the_lnet.ln_remote_nets_hash,
385                     LNET_REMOTE_NETS_HASH_SIZE *
386                     sizeof(the_lnet.ln_remote_nets_hash[0]));
387         the_lnet.ln_remote_nets_hash = NULL;
388 }
389
390 static void
391 lnet_destroy_locks(void)
392 {
393         if (the_lnet.ln_res_lock != NULL) {
394                 cfs_percpt_lock_free(the_lnet.ln_res_lock);
395                 the_lnet.ln_res_lock = NULL;
396         }
397
398         if (the_lnet.ln_net_lock != NULL) {
399                 cfs_percpt_lock_free(the_lnet.ln_net_lock);
400                 the_lnet.ln_net_lock = NULL;
401         }
402
403         lnet_fini_locks();
404 }
405
406 static int
407 lnet_create_locks(void)
408 {
409         lnet_init_locks();
410
411         the_lnet.ln_res_lock = cfs_percpt_lock_alloc(lnet_cpt_table());
412         if (the_lnet.ln_res_lock == NULL)
413                 goto failed;
414
415         the_lnet.ln_net_lock = cfs_percpt_lock_alloc(lnet_cpt_table());
416         if (the_lnet.ln_net_lock == NULL)
417                 goto failed;
418
419         return 0;
420
421  failed:
422         lnet_destroy_locks();
423         return -ENOMEM;
424 }
425
static void lnet_assert_wire_constants(void)
{
	/* Wire protocol assertions generated by 'wirecheck'
	 * running on Linux robert.bartonsoftware.com 2.6.8-1.521
	 * #1 Mon Aug 16 09:01:18 EDT 2004 i686 athlon i386 GNU/Linux
	 * with gcc version 3.3.3 20040412 (Red Hat Linux 3.3.3-7) */

	/* NB: every CLASSERT() below is checked at compile time, so this
	 * function emits no runtime code; it exists only to anchor the
	 * assertions.  Since the block is tool-generated, regenerate it
	 * with 'wirecheck' rather than editing by hand. */

	/* Constants... */
	CLASSERT(LNET_PROTO_TCP_MAGIC == 0xeebc0ded);
	CLASSERT(LNET_PROTO_TCP_VERSION_MAJOR == 1);
	CLASSERT(LNET_PROTO_TCP_VERSION_MINOR == 0);
	CLASSERT(LNET_MSG_ACK == 0);
	CLASSERT(LNET_MSG_PUT == 1);
	CLASSERT(LNET_MSG_GET == 2);
	CLASSERT(LNET_MSG_REPLY == 3);
	CLASSERT(LNET_MSG_HELLO == 4);

	/* Checks for struct lnet_handle_wire */
	CLASSERT((int)sizeof(struct lnet_handle_wire) == 16);
	CLASSERT((int)offsetof(struct lnet_handle_wire, wh_interface_cookie) == 0);
	CLASSERT((int)sizeof(((struct lnet_handle_wire *)0)->wh_interface_cookie) == 8);
	CLASSERT((int)offsetof(struct lnet_handle_wire, wh_object_cookie) == 8);
	CLASSERT((int)sizeof(((struct lnet_handle_wire *)0)->wh_object_cookie) == 8);

	/* Checks for struct struct lnet_magicversion */
	CLASSERT((int)sizeof(struct lnet_magicversion) == 8);
	CLASSERT((int)offsetof(struct lnet_magicversion, magic) == 0);
	CLASSERT((int)sizeof(((struct lnet_magicversion *)0)->magic) == 4);
	CLASSERT((int)offsetof(struct lnet_magicversion, version_major) == 4);
	CLASSERT((int)sizeof(((struct lnet_magicversion *)0)->version_major) == 2);
	CLASSERT((int)offsetof(struct lnet_magicversion, version_minor) == 6);
	CLASSERT((int)sizeof(((struct lnet_magicversion *)0)->version_minor) == 2);

	/* Checks for struct struct lnet_hdr */
	CLASSERT((int)sizeof(struct lnet_hdr) == 72);
	CLASSERT((int)offsetof(struct lnet_hdr, dest_nid) == 0);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->dest_nid) == 8);
	CLASSERT((int)offsetof(struct lnet_hdr, src_nid) == 8);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->src_nid) == 8);
	CLASSERT((int)offsetof(struct lnet_hdr, dest_pid) == 16);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->dest_pid) == 4);
	CLASSERT((int)offsetof(struct lnet_hdr, src_pid) == 20);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->src_pid) == 4);
	CLASSERT((int)offsetof(struct lnet_hdr, type) == 24);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->type) == 4);
	CLASSERT((int)offsetof(struct lnet_hdr, payload_length) == 28);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->payload_length) == 4);
	CLASSERT((int)offsetof(struct lnet_hdr, msg) == 32);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->msg) == 40);

	/* Ack */
	CLASSERT((int)offsetof(struct lnet_hdr, msg.ack.dst_wmd) == 32);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->msg.ack.dst_wmd) == 16);
	CLASSERT((int)offsetof(struct lnet_hdr, msg.ack.match_bits) == 48);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->msg.ack.match_bits) == 8);
	CLASSERT((int)offsetof(struct lnet_hdr, msg.ack.mlength) == 56);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->msg.ack.mlength) == 4);

	/* Put */
	CLASSERT((int)offsetof(struct lnet_hdr, msg.put.ack_wmd) == 32);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->msg.put.ack_wmd) == 16);
	CLASSERT((int)offsetof(struct lnet_hdr, msg.put.match_bits) == 48);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->msg.put.match_bits) == 8);
	CLASSERT((int)offsetof(struct lnet_hdr, msg.put.hdr_data) == 56);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->msg.put.hdr_data) == 8);
	CLASSERT((int)offsetof(struct lnet_hdr, msg.put.ptl_index) == 64);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->msg.put.ptl_index) == 4);
	CLASSERT((int)offsetof(struct lnet_hdr, msg.put.offset) == 68);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->msg.put.offset) == 4);

	/* Get */
	CLASSERT((int)offsetof(struct lnet_hdr, msg.get.return_wmd) == 32);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->msg.get.return_wmd) == 16);
	CLASSERT((int)offsetof(struct lnet_hdr, msg.get.match_bits) == 48);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->msg.get.match_bits) == 8);
	CLASSERT((int)offsetof(struct lnet_hdr, msg.get.ptl_index) == 56);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->msg.get.ptl_index) == 4);
	CLASSERT((int)offsetof(struct lnet_hdr, msg.get.src_offset) == 60);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->msg.get.src_offset) == 4);
	CLASSERT((int)offsetof(struct lnet_hdr, msg.get.sink_length) == 64);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->msg.get.sink_length) == 4);

	/* Reply */
	CLASSERT((int)offsetof(struct lnet_hdr, msg.reply.dst_wmd) == 32);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->msg.reply.dst_wmd) == 16);

	/* Hello */
	CLASSERT((int)offsetof(struct lnet_hdr, msg.hello.incarnation) == 32);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->msg.hello.incarnation) == 8);
	CLASSERT((int)offsetof(struct lnet_hdr, msg.hello.type) == 40);
	CLASSERT((int)sizeof(((struct lnet_hdr *)0)->msg.hello.type) == 4);

	/* Checks for struct lnet_ni_status and related constants */
	CLASSERT(LNET_NI_STATUS_INVALID == 0x00000000);
	CLASSERT(LNET_NI_STATUS_UP == 0x15aac0de);
	CLASSERT(LNET_NI_STATUS_DOWN == 0xdeadface);

	/* Checks for struct lnet_ni_status */
	CLASSERT((int)sizeof(struct lnet_ni_status) == 16);
	CLASSERT((int)offsetof(struct lnet_ni_status, ns_nid) == 0);
	CLASSERT((int)sizeof(((struct lnet_ni_status *)0)->ns_nid) == 8);
	CLASSERT((int)offsetof(struct lnet_ni_status, ns_status) == 8);
	CLASSERT((int)sizeof(((struct lnet_ni_status *)0)->ns_status) == 4);
	CLASSERT((int)offsetof(struct lnet_ni_status, ns_unused) == 12);
	CLASSERT((int)sizeof(((struct lnet_ni_status *)0)->ns_unused) == 4);

	/* Checks for struct lnet_ping_info and related constants */
	CLASSERT(LNET_PROTO_PING_MAGIC == 0x70696E67);
	CLASSERT(LNET_PING_FEAT_INVAL == 0);
	CLASSERT(LNET_PING_FEAT_BASE == 1);
	CLASSERT(LNET_PING_FEAT_NI_STATUS == 2);
	CLASSERT(LNET_PING_FEAT_RTE_DISABLED == 4);
	CLASSERT(LNET_PING_FEAT_MULTI_RAIL == 8);
	CLASSERT(LNET_PING_FEAT_DISCOVERY == 16);
	CLASSERT(LNET_PING_FEAT_BITS == 31);

	/* Checks for struct lnet_ping_info */
	CLASSERT((int)sizeof(struct lnet_ping_info) == 16);
	CLASSERT((int)offsetof(struct lnet_ping_info, pi_magic) == 0);
	CLASSERT((int)sizeof(((struct lnet_ping_info *)0)->pi_magic) == 4);
	CLASSERT((int)offsetof(struct lnet_ping_info, pi_features) == 4);
	CLASSERT((int)sizeof(((struct lnet_ping_info *)0)->pi_features) == 4);
	CLASSERT((int)offsetof(struct lnet_ping_info, pi_pid) == 8);
	CLASSERT((int)sizeof(((struct lnet_ping_info *)0)->pi_pid) == 4);
	CLASSERT((int)offsetof(struct lnet_ping_info, pi_nnis) == 12);
	CLASSERT((int)sizeof(((struct lnet_ping_info *)0)->pi_nnis) == 4);
	CLASSERT((int)offsetof(struct lnet_ping_info, pi_ni) == 16);
	CLASSERT((int)sizeof(((struct lnet_ping_info *)0)->pi_ni) == 0);
}
555
556 static struct lnet_lnd *lnet_find_lnd_by_type(__u32 type)
557 {
558         struct lnet_lnd *lnd;
559         struct list_head *tmp;
560
561         /* holding lnd mutex */
562         list_for_each(tmp, &the_lnet.ln_lnds) {
563                 lnd = list_entry(tmp, struct lnet_lnd, lnd_list);
564
565                 if (lnd->lnd_type == type)
566                         return lnd;
567         }
568         return NULL;
569 }
570
/*
 * Register an LND with LNet so networks of its type can be started.
 * The type must be known and not already registered (both asserted).
 */
void
lnet_register_lnd(struct lnet_lnd *lnd)
{
	mutex_lock(&the_lnet.ln_lnd_mutex);

	LASSERT(libcfs_isknown_lnd(lnd->lnd_type));
	LASSERT(lnet_find_lnd_by_type(lnd->lnd_type) == NULL);

	list_add_tail(&lnd->lnd_list, &the_lnet.ln_lnds);
	lnd->lnd_refcount = 0;	/* refcount starts at zero */

	CDEBUG(D_NET, "%s LND registered\n", libcfs_lnd2str(lnd->lnd_type));

	mutex_unlock(&the_lnet.ln_lnd_mutex);
}
EXPORT_SYMBOL(lnet_register_lnd);
587
/*
 * Remove a previously-registered LND.  The LND must be registered and
 * have no remaining users (lnd_refcount == 0, both asserted).
 */
void
lnet_unregister_lnd(struct lnet_lnd *lnd)
{
	mutex_lock(&the_lnet.ln_lnd_mutex);

	LASSERT(lnet_find_lnd_by_type(lnd->lnd_type) == lnd);
	LASSERT(lnd->lnd_refcount == 0);

	list_del(&lnd->lnd_list);
	CDEBUG(D_NET, "%s LND unregistered\n", libcfs_lnd2str(lnd->lnd_type));

	mutex_unlock(&the_lnet.ln_lnd_mutex);
}
EXPORT_SYMBOL(lnet_unregister_lnd);
602
603 void
604 lnet_counters_get(struct lnet_counters *counters)
605 {
606         struct lnet_counters *ctr;
607         int             i;
608
609         memset(counters, 0, sizeof(*counters));
610
611         lnet_net_lock(LNET_LOCK_EX);
612
613         cfs_percpt_for_each(ctr, i, the_lnet.ln_counters) {
614                 counters->msgs_max     += ctr->msgs_max;
615                 counters->msgs_alloc   += ctr->msgs_alloc;
616                 counters->errors       += ctr->errors;
617                 counters->send_count   += ctr->send_count;
618                 counters->recv_count   += ctr->recv_count;
619                 counters->route_count  += ctr->route_count;
620                 counters->drop_count   += ctr->drop_count;
621                 counters->send_length  += ctr->send_length;
622                 counters->recv_length  += ctr->recv_length;
623                 counters->route_length += ctr->route_length;
624                 counters->drop_length  += ctr->drop_length;
625
626         }
627         lnet_net_unlock(LNET_LOCK_EX);
628 }
629 EXPORT_SYMBOL(lnet_counters_get);
630
631 void
632 lnet_counters_reset(void)
633 {
634         struct lnet_counters *counters;
635         int             i;
636
637         lnet_net_lock(LNET_LOCK_EX);
638
639         cfs_percpt_for_each(counters, i, the_lnet.ln_counters)
640                 memset(counters, 0, sizeof(struct lnet_counters));
641
642         lnet_net_unlock(LNET_LOCK_EX);
643 }
644
/* Human-readable name for a resource-container cookie type (log use). */
static char *
lnet_res_type2str(int type)
{
	switch (type) {
	default:
		/* unknown type is a fatal logic error; the default case
		 * sits first so control falls through to "MD" purely to
		 * satisfy the compiler — LBUG() is not expected to return */
		LBUG();
	case LNET_COOKIE_TYPE_MD:
		return "MD";
	case LNET_COOKIE_TYPE_ME:
		return "ME";
	case LNET_COOKIE_TYPE_EQ:
		return "EQ";
	}
}
659
/*
 * Tear down a resource container: recycle any still-active EQ/MD
 * entries (complaining if there were any), free the handle hash and
 * mark the container uninitialized so it may be set up again.
 */
static void
lnet_res_container_cleanup(struct lnet_res_container *rec)
{
	int	count = 0;

	if (rec->rec_type == 0) /* not set yet, it's uninitialized */
		return;

	while (!list_empty(&rec->rec_active)) {
		struct list_head *e = rec->rec_active.next;

		list_del_init(e);
		if (rec->rec_type == LNET_COOKIE_TYPE_EQ) {
			lnet_eq_free(list_entry(e, struct lnet_eq, eq_list));

		} else if (rec->rec_type == LNET_COOKIE_TYPE_MD) {
			lnet_md_free(list_entry(e, struct lnet_libmd, md_list));

		} else { /* NB: Active MEs should be attached on portals */
			LBUG();
		}
		count++;
	}

	if (count > 0) {
		/* Found alive MD/ME/EQ, user really should unlink/free
		 * all of them before finalize LNet, but if someone didn't,
		 * we have to recycle garbage for him */
		CERROR("%d active elements on exit of %s container\n",
		       count, lnet_res_type2str(rec->rec_type));
	}

	if (rec->rec_lh_hash != NULL) {
		LIBCFS_FREE(rec->rec_lh_hash,
			    LNET_LH_HASH_SIZE * sizeof(rec->rec_lh_hash[0]));
		rec->rec_lh_hash = NULL;
	}

	rec->rec_type = 0; /* mark it as finalized */
}
700
701 static int
702 lnet_res_container_setup(struct lnet_res_container *rec, int cpt, int type)
703 {
704         int     rc = 0;
705         int     i;
706
707         LASSERT(rec->rec_type == 0);
708
709         rec->rec_type = type;
710         INIT_LIST_HEAD(&rec->rec_active);
711
712         rec->rec_lh_cookie = (cpt << LNET_COOKIE_TYPE_BITS) | type;
713
714         /* Arbitrary choice of hash table size */
715         LIBCFS_CPT_ALLOC(rec->rec_lh_hash, lnet_cpt_table(), cpt,
716                          LNET_LH_HASH_SIZE * sizeof(rec->rec_lh_hash[0]));
717         if (rec->rec_lh_hash == NULL) {
718                 rc = -ENOMEM;
719                 goto out;
720         }
721
722         for (i = 0; i < LNET_LH_HASH_SIZE; i++)
723                 INIT_LIST_HEAD(&rec->rec_lh_hash[i]);
724
725         return 0;
726
727 out:
728         CERROR("Failed to setup %s resource container\n",
729                lnet_res_type2str(type));
730         lnet_res_container_cleanup(rec);
731         return rc;
732 }
733
734 static void
735 lnet_res_containers_destroy(struct lnet_res_container **recs)
736 {
737         struct lnet_res_container       *rec;
738         int                             i;
739
740         cfs_percpt_for_each(rec, i, recs)
741                 lnet_res_container_cleanup(rec);
742
743         cfs_percpt_free(recs);
744 }
745
746 static struct lnet_res_container **
747 lnet_res_containers_create(int type)
748 {
749         struct lnet_res_container       **recs;
750         struct lnet_res_container       *rec;
751         int                             rc;
752         int                             i;
753
754         recs = cfs_percpt_alloc(lnet_cpt_table(), sizeof(*rec));
755         if (recs == NULL) {
756                 CERROR("Failed to allocate %s resource containers\n",
757                        lnet_res_type2str(type));
758                 return NULL;
759         }
760
761         cfs_percpt_for_each(rec, i, recs) {
762                 rc = lnet_res_container_setup(rec, i, type);
763                 if (rc != 0) {
764                         lnet_res_containers_destroy(recs);
765                         return NULL;
766                 }
767         }
768
769         return recs;
770 }
771
772 struct lnet_libhandle *
773 lnet_res_lh_lookup(struct lnet_res_container *rec, __u64 cookie)
774 {
775         /* ALWAYS called with lnet_res_lock held */
776         struct list_head        *head;
777         struct lnet_libhandle   *lh;
778         unsigned int            hash;
779
780         if ((cookie & LNET_COOKIE_MASK) != rec->rec_type)
781                 return NULL;
782
783         hash = cookie >> (LNET_COOKIE_TYPE_BITS + LNET_CPT_BITS);
784         head = &rec->rec_lh_hash[hash & LNET_LH_HASH_MASK];
785
786         list_for_each_entry(lh, head, lh_hash_chain) {
787                 if (lh->lh_cookie == cookie)
788                         return lh;
789         }
790
791         return NULL;
792 }
793
794 void
795 lnet_res_lh_initialize(struct lnet_res_container *rec,
796                        struct lnet_libhandle *lh)
797 {
798         /* ALWAYS called with lnet_res_lock held */
799         unsigned int    ibits = LNET_COOKIE_TYPE_BITS + LNET_CPT_BITS;
800         unsigned int    hash;
801
802         lh->lh_cookie = rec->rec_lh_cookie;
803         rec->rec_lh_cookie += 1 << ibits;
804
805         hash = (lh->lh_cookie >> ibits) & LNET_LH_HASH_MASK;
806
807         list_add(&lh->lh_hash_chain, &rec->rec_lh_hash[hash]);
808 }
809
810 static int lnet_unprepare(void);
811
/*
 * Bring up all global LNet state prior to starting networks: list
 * heads, slab caches, the remote-nets table, per-CPT counters, peer
 * and message containers, EQ/ME/MD resource containers and portals.
 * On any failure, lnet_unprepare() tears down whatever succeeded.
 */
static int
lnet_prepare(lnet_pid_t requested_pid)
{
	/* Prepare to bring up the network */
	struct lnet_res_container **recs;
	int			  rc = 0;

	if (requested_pid == LNET_PID_ANY) {
		/* Don't instantiate LNET just for me */
		return -ENETDOWN;
	}

	LASSERT(the_lnet.ln_refcount == 0);

	the_lnet.ln_routing = 0;

	LASSERT((requested_pid & LNET_PID_USERFLAG) == 0);
	the_lnet.ln_pid = requested_pid;

	/* global list heads: peers, nets, routers, fault-injection
	 * rules, discovery queues and the NI recovery queue */
	INIT_LIST_HEAD(&the_lnet.ln_test_peers);
	INIT_LIST_HEAD(&the_lnet.ln_remote_peer_ni_list);
	INIT_LIST_HEAD(&the_lnet.ln_nets);
	INIT_LIST_HEAD(&the_lnet.ln_routers);
	INIT_LIST_HEAD(&the_lnet.ln_drop_rules);
	INIT_LIST_HEAD(&the_lnet.ln_delay_rules);
	INIT_LIST_HEAD(&the_lnet.ln_dc_request);
	INIT_LIST_HEAD(&the_lnet.ln_dc_working);
	INIT_LIST_HEAD(&the_lnet.ln_dc_expired);
	INIT_LIST_HEAD(&the_lnet.ln_mt_localNIRecovq);
	init_waitqueue_head(&the_lnet.ln_dc_waitq);

	rc = lnet_descriptor_setup();
	if (rc != 0)
		goto failed;

	rc = lnet_create_remote_nets_table();
	if (rc != 0)
		goto failed;

	/*
	 * NB the interface cookie in wire handles guards against delayed
	 * replies and ACKs appearing valid after reboot.
	 */
	the_lnet.ln_interface_cookie = ktime_get_real_ns();

	the_lnet.ln_counters = cfs_percpt_alloc(lnet_cpt_table(),
						sizeof(struct lnet_counters));
	if (the_lnet.ln_counters == NULL) {
		CERROR("Failed to allocate counters for LNet\n");
		rc = -ENOMEM;
		goto failed;
	}

	rc = lnet_peer_tables_create();
	if (rc != 0)
		goto failed;

	rc = lnet_msg_containers_create();
	if (rc != 0)
		goto failed;

	/* the EQ container is global (CPT 0), ME/MD containers per-CPT */
	rc = lnet_res_container_setup(&the_lnet.ln_eq_container, 0,
				      LNET_COOKIE_TYPE_EQ);
	if (rc != 0)
		goto failed;

	recs = lnet_res_containers_create(LNET_COOKIE_TYPE_ME);
	if (recs == NULL) {
		rc = -ENOMEM;
		goto failed;
	}

	the_lnet.ln_me_containers = recs;

	recs = lnet_res_containers_create(LNET_COOKIE_TYPE_MD);
	if (recs == NULL) {
		rc = -ENOMEM;
		goto failed;
	}

	the_lnet.ln_md_containers = recs;

	rc = lnet_portals_create();
	if (rc != 0) {
		CERROR("Failed to create portals for LNet: %d\n", rc);
		goto failed;
	}

	return 0;

 failed:
	lnet_unprepare();
	return rc;
}
906
/*
 * Undo lnet_prepare(): destroy every global container/table, roughly in
 * reverse order of creation.  Also invoked on lnet_prepare()'s failure
 * path, so each teardown helper must cope with members that were never
 * initialized.  Always returns 0.
 */
static int
lnet_unprepare (void)
{
	/* NB no LNET_LOCK since this is the last reference.  All LND instances
	 * have shut down already, so it is safe to unlink and free all
	 * descriptors, even those that appear committed to a network op (eg MD
	 * with non-zero pending count) */

	/* clear any lingering NID-based fault-injection rules */
	lnet_fail_nid(LNET_NID_ANY, 0);

	LASSERT(the_lnet.ln_refcount == 0);
	LASSERT(list_empty(&the_lnet.ln_test_peers));
	LASSERT(list_empty(&the_lnet.ln_nets));

	lnet_portals_destroy();

	if (the_lnet.ln_md_containers != NULL) {
		lnet_res_containers_destroy(the_lnet.ln_md_containers);
		the_lnet.ln_md_containers = NULL;
	}

	if (the_lnet.ln_me_containers != NULL) {
		lnet_res_containers_destroy(the_lnet.ln_me_containers);
		the_lnet.ln_me_containers = NULL;
	}

	lnet_res_container_cleanup(&the_lnet.ln_eq_container);

	lnet_msg_containers_destroy();
	lnet_peer_uninit();
	lnet_rtrpools_free(0);

	if (the_lnet.ln_counters != NULL) {
		cfs_percpt_free(the_lnet.ln_counters);
		the_lnet.ln_counters = NULL;
	}
	lnet_destroy_remote_nets_table();
	lnet_descriptor_cleanup();

	return 0;
}
948
949 struct lnet_ni  *
950 lnet_net2ni_locked(__u32 net_id, int cpt)
951 {
952         struct lnet_ni   *ni;
953         struct lnet_net  *net;
954
955         LASSERT(cpt != LNET_LOCK_EX);
956
957         list_for_each_entry(net, &the_lnet.ln_nets, net_list) {
958                 if (net->net_id == net_id) {
959                         ni = list_entry(net->net_ni_list.next, struct lnet_ni,
960                                         ni_netlist);
961                         return ni;
962                 }
963         }
964
965         return NULL;
966 }
967
968 struct lnet_ni *
969 lnet_net2ni_addref(__u32 net)
970 {
971         struct lnet_ni *ni;
972
973         lnet_net_lock(0);
974         ni = lnet_net2ni_locked(net, 0);
975         if (ni)
976                 lnet_ni_addref_locked(ni, 0);
977         lnet_net_unlock(0);
978
979         return ni;
980 }
981 EXPORT_SYMBOL(lnet_net2ni_addref);
982
983 struct lnet_net *
984 lnet_get_net_locked(__u32 net_id)
985 {
986         struct lnet_net  *net;
987
988         list_for_each_entry(net, &the_lnet.ln_nets, net_list) {
989                 if (net->net_id == net_id)
990                         return net;
991         }
992
993         return NULL;
994 }
995
996 unsigned int
997 lnet_nid_cpt_hash(lnet_nid_t nid, unsigned int number)
998 {
999         __u64           key = nid;
1000         unsigned int    val;
1001
1002         LASSERT(number >= 1 && number <= LNET_CPT_NUMBER);
1003
1004         if (number == 1)
1005                 return 0;
1006
1007         val = hash_long(key, LNET_CPT_BITS);
1008         /* NB: LNET_CP_NUMBER doesn't have to be PO2 */
1009         if (val < number)
1010                 return val;
1011
1012         return (unsigned int)(key + val + (val >> 1)) % number;
1013 }
1014
1015 int
1016 lnet_cpt_of_nid_locked(lnet_nid_t nid, struct lnet_ni *ni)
1017 {
1018         struct lnet_net *net;
1019
1020         /* must called with hold of lnet_net_lock */
1021         if (LNET_CPT_NUMBER == 1)
1022                 return 0; /* the only one */
1023
1024         /*
1025          * If NI is provided then use the CPT identified in the NI cpt
1026          * list if one exists. If one doesn't exist, then that NI is
1027          * associated with all CPTs and it follows that the net it belongs
1028          * to is implicitly associated with all CPTs, so just hash the nid
1029          * and return that.
1030          */
1031         if (ni != NULL) {
1032                 if (ni->ni_cpts != NULL)
1033                         return ni->ni_cpts[lnet_nid_cpt_hash(nid,
1034                                                              ni->ni_ncpts)];
1035                 else
1036                         return lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
1037         }
1038
1039         /* no NI provided so look at the net */
1040         net = lnet_get_net_locked(LNET_NIDNET(nid));
1041
1042         if (net != NULL && net->net_cpts != NULL) {
1043                 return net->net_cpts[lnet_nid_cpt_hash(nid, net->net_ncpts)];
1044         }
1045
1046         return lnet_nid_cpt_hash(nid, LNET_CPT_NUMBER);
1047 }
1048
1049 int
1050 lnet_cpt_of_nid(lnet_nid_t nid, struct lnet_ni *ni)
1051 {
1052         int     cpt;
1053         int     cpt2;
1054
1055         if (LNET_CPT_NUMBER == 1)
1056                 return 0; /* the only one */
1057
1058         cpt = lnet_net_lock_current();
1059
1060         cpt2 = lnet_cpt_of_nid_locked(nid, ni);
1061
1062         lnet_net_unlock(cpt);
1063
1064         return cpt2;
1065 }
1066 EXPORT_SYMBOL(lnet_cpt_of_nid);
1067
1068 int
1069 lnet_islocalnet(__u32 net_id)
1070 {
1071         struct lnet_net *net;
1072         int             cpt;
1073         bool            local;
1074
1075         cpt = lnet_net_lock_current();
1076
1077         net = lnet_get_net_locked(net_id);
1078
1079         local = net != NULL;
1080
1081         lnet_net_unlock(cpt);
1082
1083         return local;
1084 }
1085
1086 bool
1087 lnet_is_ni_healthy_locked(struct lnet_ni *ni)
1088 {
1089         if (ni->ni_state & LNET_NI_STATE_ACTIVE)
1090                 return true;
1091
1092         return false;
1093 }
1094
1095 struct lnet_ni  *
1096 lnet_nid2ni_locked(lnet_nid_t nid, int cpt)
1097 {
1098         struct lnet_net  *net;
1099         struct lnet_ni   *ni;
1100
1101         LASSERT(cpt != LNET_LOCK_EX);
1102
1103         list_for_each_entry(net, &the_lnet.ln_nets, net_list) {
1104                 list_for_each_entry(ni, &net->net_ni_list, ni_netlist) {
1105                         if (ni->ni_nid == nid)
1106                                 return ni;
1107                 }
1108         }
1109
1110         return NULL;
1111 }
1112
1113 struct lnet_ni *
1114 lnet_nid2ni_addref(lnet_nid_t nid)
1115 {
1116         struct lnet_ni *ni;
1117
1118         lnet_net_lock(0);
1119         ni = lnet_nid2ni_locked(nid, 0);
1120         if (ni)
1121                 lnet_ni_addref_locked(ni, 0);
1122         lnet_net_unlock(0);
1123
1124         return ni;
1125 }
1126 EXPORT_SYMBOL(lnet_nid2ni_addref);
1127
1128 int
1129 lnet_islocalnid(lnet_nid_t nid)
1130 {
1131         struct lnet_ni  *ni;
1132         int             cpt;
1133
1134         cpt = lnet_net_lock_current();
1135         ni = lnet_nid2ni_locked(nid, cpt);
1136         lnet_net_unlock(cpt);
1137
1138         return ni != NULL;
1139 }
1140
1141 int
1142 lnet_count_acceptor_nets(void)
1143 {
1144         /* Return the # of NIs that need the acceptor. */
1145         int              count = 0;
1146         struct lnet_net  *net;
1147         int              cpt;
1148
1149         cpt = lnet_net_lock_current();
1150         list_for_each_entry(net, &the_lnet.ln_nets, net_list) {
1151                 /* all socklnd type networks should have the acceptor
1152                  * thread started */
1153                 if (net->net_lnd->lnd_accept != NULL)
1154                         count++;
1155         }
1156
1157         lnet_net_unlock(cpt);
1158
1159         return count;
1160 }
1161
1162 struct lnet_ping_buffer *
1163 lnet_ping_buffer_alloc(int nnis, gfp_t gfp)
1164 {
1165         struct lnet_ping_buffer *pbuf;
1166
1167         LIBCFS_ALLOC_GFP(pbuf, LNET_PING_BUFFER_SIZE(nnis), gfp);
1168         if (pbuf) {
1169                 pbuf->pb_nnis = nnis;
1170                 atomic_set(&pbuf->pb_refcnt, 1);
1171         }
1172
1173         return pbuf;
1174 }
1175
/* Release a ping buffer's memory; only legal once every reference
 * (including the allocator's initial one) has been dropped. */
void
lnet_ping_buffer_free(struct lnet_ping_buffer *pbuf)
{
	LASSERT(lnet_ping_buffer_numref(pbuf) == 0);
	LIBCFS_FREE(pbuf, LNET_PING_BUFFER_SIZE(pbuf->pb_nnis));
}
1182
1183 static struct lnet_ping_buffer *
1184 lnet_ping_target_create(int nnis)
1185 {
1186         struct lnet_ping_buffer *pbuf;
1187
1188         pbuf = lnet_ping_buffer_alloc(nnis, GFP_NOFS);
1189         if (pbuf == NULL) {
1190                 CERROR("Can't allocate ping source [%d]\n", nnis);
1191                 return NULL;
1192         }
1193
1194         pbuf->pb_info.pi_nnis = nnis;
1195         pbuf->pb_info.pi_pid = the_lnet.ln_pid;
1196         pbuf->pb_info.pi_magic = LNET_PROTO_PING_MAGIC;
1197         pbuf->pb_info.pi_features =
1198                 LNET_PING_FEAT_NI_STATUS | LNET_PING_FEAT_MULTI_RAIL;
1199
1200         return pbuf;
1201 }
1202
1203 static inline int
1204 lnet_get_net_ni_count_locked(struct lnet_net *net)
1205 {
1206         struct lnet_ni  *ni;
1207         int             count = 0;
1208
1209         list_for_each_entry(ni, &net->net_ni_list, ni_netlist)
1210                 count++;
1211
1212         return count;
1213 }
1214
1215 static inline int
1216 lnet_get_net_ni_count_pre(struct lnet_net *net)
1217 {
1218         struct lnet_ni  *ni;
1219         int             count = 0;
1220
1221         list_for_each_entry(ni, &net->net_ni_added, ni_netlist)
1222                 count++;
1223
1224         return count;
1225 }
1226
1227 static inline int
1228 lnet_get_ni_count(void)
1229 {
1230         struct lnet_ni  *ni;
1231         struct lnet_net *net;
1232         int             count = 0;
1233
1234         lnet_net_lock(0);
1235
1236         list_for_each_entry(net, &the_lnet.ln_nets, net_list) {
1237                 list_for_each_entry(ni, &net->net_ni_list, ni_netlist)
1238                         count++;
1239         }
1240
1241         lnet_net_unlock(0);
1242
1243         return count;
1244 }
1245
1246 int
1247 lnet_ping_info_validate(struct lnet_ping_info *pinfo)
1248 {
1249         if (!pinfo)
1250                 return -EINVAL;
1251         if (pinfo->pi_magic != LNET_PROTO_PING_MAGIC)
1252                 return -EPROTO;
1253         if (!(pinfo->pi_features & LNET_PING_FEAT_NI_STATUS))
1254                 return -EPROTO;
1255         /* Loopback is guaranteed to be present */
1256         if (pinfo->pi_nnis < 1 || pinfo->pi_nnis > lnet_interfaces_max)
1257                 return -ERANGE;
1258         if (LNET_NETTYP(LNET_NIDNET(LNET_PING_INFO_LONI(pinfo))) != LOLND)
1259                 return -EPROTO;
1260         return 0;
1261 }
1262
/*
 * Drop the current ping target buffer.  Each NI's ni_status pointer
 * aims into that buffer, so clear them all (under the per-NI lock)
 * before releasing the buffer's reference.
 */
static void
lnet_ping_target_destroy(void)
{
	struct lnet_net *net;
	struct lnet_ni	*ni;

	lnet_net_lock(LNET_LOCK_EX);

	list_for_each_entry(net, &the_lnet.ln_nets, net_list) {
		list_for_each_entry(ni, &net->net_ni_list, ni_netlist) {
			lnet_ni_lock(ni);
			ni->ni_status = NULL;
			lnet_ni_unlock(ni);
		}
	}

	lnet_ping_buffer_decref(the_lnet.ln_ping_target);
	the_lnet.ln_ping_target = NULL;

	lnet_net_unlock(LNET_LOCK_EX);
}
1284
1285 static void
1286 lnet_ping_target_event_handler(struct lnet_event *event)
1287 {
1288         struct lnet_ping_buffer *pbuf = event->md.user_ptr;
1289
1290         if (event->unlinked)
1291                 lnet_ping_buffer_decref(pbuf);
1292 }
1293
/*
 * Build a new ping target: optionally allocate the shared EQ, create
 * the buffer for @ni_count NIs, and attach the ME/MD pair on the
 * reserved portal so peers can GET our ping info.  On success *ppbuf
 * holds two references (caller's + the MD's, dropped by the event
 * handler on unlink).  Unwinds via the goto chain on failure.
 */
static int
lnet_ping_target_setup(struct lnet_ping_buffer **ppbuf,
		       struct lnet_handle_md *ping_mdh,
		       int ni_count, bool set_eq)
{
	/* match GETs from any peer */
	struct lnet_process_id id = {
		.nid = LNET_NID_ANY,
		.pid = LNET_PID_ANY
	};
	struct lnet_handle_me me_handle;
	struct lnet_md md = { NULL };
	int rc, rc2;

	if (set_eq) {
		rc = LNetEQAlloc(0, lnet_ping_target_event_handler,
				 &the_lnet.ln_ping_target_eq);
		if (rc != 0) {
			CERROR("Can't allocate ping buffer EQ: %d\n", rc);
			return rc;
		}
	}

	*ppbuf = lnet_ping_target_create(ni_count);
	if (*ppbuf == NULL) {
		rc = -ENOMEM;
		goto fail_free_eq;
	}

	/* Ping target ME/MD */
	rc = LNetMEAttach(LNET_RESERVED_PORTAL, id,
			  LNET_PROTO_PING_MATCHBITS, 0,
			  LNET_UNLINK, LNET_INS_AFTER,
			  &me_handle);
	if (rc != 0) {
		CERROR("Can't create ping target ME: %d\n", rc);
		goto fail_decref_ping_buffer;
	}

	/* initialize md content */
	md.start     = &(*ppbuf)->pb_info;
	md.length    = LNET_PING_INFO_SIZE((*ppbuf)->pb_nnis);
	md.threshold = LNET_MD_THRESH_INF;
	md.max_size  = 0;
	md.options   = LNET_MD_OP_GET | LNET_MD_TRUNCATE |
		       LNET_MD_MANAGE_REMOTE;
	md.eq_handle = the_lnet.ln_ping_target_eq;
	md.user_ptr  = *ppbuf;

	/* LNET_RETAIN: the MD persists across matches until we unlink it */
	rc = LNetMDAttach(me_handle, md, LNET_RETAIN, ping_mdh);
	if (rc != 0) {
		CERROR("Can't attach ping target MD: %d\n", rc);
		goto fail_unlink_ping_me;
	}
	/* the MD's reference, released by the event handler on unlink */
	lnet_ping_buffer_addref(*ppbuf);

	return 0;

fail_unlink_ping_me:
	rc2 = LNetMEUnlink(me_handle);
	LASSERT(rc2 == 0);
fail_decref_ping_buffer:
	LASSERT(lnet_ping_buffer_numref(*ppbuf) == 1);
	lnet_ping_buffer_decref(*ppbuf);
	*ppbuf = NULL;
fail_free_eq:
	if (set_eq) {
		rc2 = LNetEQFree(the_lnet.ln_ping_target_eq);
		LASSERT(rc2 == 0);
	}
	return rc;
}
1365
/*
 * Kick off unlinking @pbuf's MD and block (all signals masked) until
 * the event handler has dropped the MD's reference, i.e. until only
 * the caller's reference on @pbuf remains.  *ping_mdh is invalidated.
 */
static void
lnet_ping_md_unlink(struct lnet_ping_buffer *pbuf,
		    struct lnet_handle_md *ping_mdh)
{
	sigset_t	blocked = cfs_block_allsigs();

	LNetMDUnlink(*ping_mdh);
	LNetInvalidateMDHandle(ping_mdh);

	/* NB the MD could be busy; this just starts the unlink */
	while (lnet_ping_buffer_numref(pbuf) > 1) {
		CDEBUG(D_NET, "Still waiting for ping data MD to unlink\n");
		set_current_state(TASK_UNINTERRUPTIBLE);
		schedule_timeout(cfs_time_seconds(1));
	}

	cfs_restore_sigs(blocked);
}
1384
/*
 * Populate @pbuf with one lnet_ni_status entry per local NI and repoint
 * each NI's ni_status into the new buffer, carrying the current status
 * over from the old buffer.  Called with lnet_net_lock(LNET_LOCK_EX)
 * held; an invalid resulting buffer is a fatal bug (LBUG).
 */
static void
lnet_ping_target_install_locked(struct lnet_ping_buffer *pbuf)
{
	struct lnet_ni		*ni;
	struct lnet_net		*net;
	struct lnet_ni_status *ns;
	int			i;
	int			rc;

	i = 0;
	list_for_each_entry(net, &the_lnet.ln_nets, net_list) {
		list_for_each_entry(ni, &net->net_ni_list, ni_netlist) {
			/* the buffer was sized for the current NI count */
			LASSERT(i < pbuf->pb_nnis);

			ns = &pbuf->pb_info.pi_ni[i];

			ns->ns_nid = ni->ni_nid;

			lnet_ni_lock(ni);
			/* preserve the previously-reported status; a new
			 * NI with no old slot starts out UP */
			ns->ns_status = (ni->ni_status != NULL) ?
					 ni->ni_status->ns_status :
						LNET_NI_STATUS_UP;
			ni->ni_status = ns;
			lnet_ni_unlock(ni);

			i++;
		}
	}
	/*
	 * We (ab)use the ns_status of the loopback interface to
	 * transmit the sequence number. The first interface listed
	 * must be the loopback interface.
	 */
	rc = lnet_ping_info_validate(&pbuf->pb_info);
	if (rc) {
		LCONSOLE_EMERG("Invalid ping target: %d\n", rc);
		LBUG();
	}
	LNET_PING_BUFFER_SEQNO(pbuf) =
		atomic_inc_return(&the_lnet.ln_ping_target_seqno);
}
1426
/*
 * Swap in a freshly-built ping target under LNET_LOCK_EX: set the
 * feature bits, install per-NI status slots, then publish the buffer.
 * The old buffer's MD is unlinked and its reference dropped outside
 * the lock (the unlink can block), and peers are notified via push.
 */
static void
lnet_ping_target_update(struct lnet_ping_buffer *pbuf,
			struct lnet_handle_md ping_mdh)
{
	struct lnet_ping_buffer *old_pbuf = NULL;
	struct lnet_handle_md old_ping_md;

	/* switch the NIs to point to the new ping info created */
	lnet_net_lock(LNET_LOCK_EX);

	if (!the_lnet.ln_routing)
		pbuf->pb_info.pi_features |= LNET_PING_FEAT_RTE_DISABLED;
	if (!lnet_peer_discovery_disabled)
		pbuf->pb_info.pi_features |= LNET_PING_FEAT_DISCOVERY;

	/* Ensure only known feature bits have been set. */
	LASSERT(pbuf->pb_info.pi_features & LNET_PING_FEAT_BITS);
	LASSERT(!(pbuf->pb_info.pi_features & ~LNET_PING_FEAT_BITS));

	lnet_ping_target_install_locked(pbuf);

	/* remember the outgoing buffer/MD so they can be torn down below */
	if (the_lnet.ln_ping_target) {
		old_pbuf = the_lnet.ln_ping_target;
		old_ping_md = the_lnet.ln_ping_target_md;
	}
	the_lnet.ln_ping_target_md = ping_mdh;
	the_lnet.ln_ping_target = pbuf;

	lnet_net_unlock(LNET_LOCK_EX);

	if (old_pbuf) {
		/* unlink and free the old ping info */
		lnet_ping_md_unlink(old_pbuf, &old_ping_md);
		lnet_ping_buffer_decref(old_pbuf);
	}

	lnet_push_update_to_peers(0);
}
1465
/*
 * Final teardown of the ping target: unlink its MD, free the shared EQ
 * and release the buffer.  Reverse of lnet_ping_target_setup().
 */
static void
lnet_ping_target_fini(void)
{
	int		rc;

	lnet_ping_md_unlink(the_lnet.ln_ping_target,
			    &the_lnet.ln_ping_target_md);

	rc = LNetEQFree(the_lnet.ln_ping_target_eq);
	LASSERT(rc == 0);

	lnet_ping_target_destroy();
}
1479
/*
 * Resize the push target buffer to hold ln_push_target_nnis entries.
 * Allocates a new buffer and ME/MD, atomically swaps them in under
 * LNET_LOCK_EX, then unlinks/releases the old pair.  If another thread
 * grew ln_push_target_nnis while we were working, loop and try again
 * with the larger size.  Returns 0 on success, negative errno on error.
 */
int lnet_push_target_resize(void)
{
	/* accept PUTs from any peer */
	struct lnet_process_id id = { LNET_NID_ANY, LNET_PID_ANY };
	struct lnet_md md = { NULL };
	struct lnet_handle_me meh;
	struct lnet_handle_md mdh;
	struct lnet_handle_md old_mdh;
	struct lnet_ping_buffer *pbuf;
	struct lnet_ping_buffer *old_pbuf;
	int nnis = the_lnet.ln_push_target_nnis;
	int rc;

	if (nnis <= 0) {
		rc = -EINVAL;
		goto fail_return;
	}
again:
	pbuf = lnet_ping_buffer_alloc(nnis, GFP_NOFS);
	if (!pbuf) {
		rc = -ENOMEM;
		goto fail_return;
	}

	rc = LNetMEAttach(LNET_RESERVED_PORTAL, id,
			  LNET_PROTO_PING_MATCHBITS, 0,
			  LNET_UNLINK, LNET_INS_AFTER,
			  &meh);
	if (rc) {
		CERROR("Can't create push target ME: %d\n", rc);
		goto fail_decref_pbuf;
	}

	/* initialize md content */
	md.start     = &pbuf->pb_info;
	md.length    = LNET_PING_INFO_SIZE(nnis);
	md.threshold = LNET_MD_THRESH_INF;
	md.max_size  = 0;
	md.options   = LNET_MD_OP_PUT | LNET_MD_TRUNCATE |
		       LNET_MD_MANAGE_REMOTE;
	md.user_ptr  = pbuf;
	md.eq_handle = the_lnet.ln_push_target_eq;

	rc = LNetMDAttach(meh, md, LNET_RETAIN, &mdh);
	if (rc) {
		CERROR("Can't attach push MD: %d\n", rc);
		goto fail_unlink_meh;
	}
	/* the MD's reference; dropped by the event handler on unlink */
	lnet_ping_buffer_addref(pbuf);

	/* publish the new buffer/MD and capture the outgoing pair */
	lnet_net_lock(LNET_LOCK_EX);
	old_pbuf = the_lnet.ln_push_target;
	old_mdh = the_lnet.ln_push_target_md;
	the_lnet.ln_push_target = pbuf;
	the_lnet.ln_push_target_md = mdh;
	lnet_net_unlock(LNET_LOCK_EX);

	if (old_pbuf) {
		LNetMDUnlink(old_mdh);
		lnet_ping_buffer_decref(old_pbuf);
	}

	/* the required size may have grown while we worked; redo */
	if (nnis < the_lnet.ln_push_target_nnis)
		goto again;

	CDEBUG(D_NET, "nnis %d success\n", nnis);

	return 0;

fail_unlink_meh:
	LNetMEUnlink(meh);
fail_decref_pbuf:
	lnet_ping_buffer_decref(pbuf);
fail_return:
	CDEBUG(D_NET, "nnis %d error %d\n", nnis, rc);
	return rc;
}
1557
1558 static void lnet_push_target_event_handler(struct lnet_event *ev)
1559 {
1560         struct lnet_ping_buffer *pbuf = ev->md.user_ptr;
1561
1562         if (pbuf->pb_info.pi_magic == __swab32(LNET_PROTO_PING_MAGIC))
1563                 lnet_swap_pinginfo(pbuf);
1564
1565         lnet_peer_push_event(ev);
1566         if (ev->unlinked)
1567                 lnet_ping_buffer_decref(pbuf);
1568 }
1569
1570 /* Initialize the push target. */
1571 static int lnet_push_target_init(void)
1572 {
1573         int rc;
1574
1575         if (the_lnet.ln_push_target)
1576                 return -EALREADY;
1577
1578         rc = LNetEQAlloc(0, lnet_push_target_event_handler,
1579                          &the_lnet.ln_push_target_eq);
1580         if (rc) {
1581                 CERROR("Can't allocated push target EQ: %d\n", rc);
1582                 return rc;
1583         }
1584
1585         /* Start at the required minimum, we'll enlarge if required. */
1586         the_lnet.ln_push_target_nnis = LNET_INTERFACES_MIN;
1587
1588         rc = lnet_push_target_resize();
1589
1590         if (rc) {
1591                 LNetEQFree(the_lnet.ln_push_target_eq);
1592                 LNetInvalidateEQHandle(&the_lnet.ln_push_target_eq);
1593         }
1594
1595         return rc;
1596 }
1597
1598 /* Clean up the push target. */
1599 static void lnet_push_target_fini(void)
1600 {
1601         if (!the_lnet.ln_push_target)
1602                 return;
1603
1604         /* Unlink and invalidate to prevent new references. */
1605         LNetMDUnlink(the_lnet.ln_push_target_md);
1606         LNetInvalidateMDHandle(&the_lnet.ln_push_target_md);
1607
1608         /* Wait for the unlink to complete. */
1609         while (lnet_ping_buffer_numref(the_lnet.ln_push_target) > 1) {
1610                 CDEBUG(D_NET, "Still waiting for ping data MD to unlink\n");
1611                 set_current_state(TASK_UNINTERRUPTIBLE);
1612                 schedule_timeout(cfs_time_seconds(1));
1613         }
1614
1615         lnet_ping_buffer_decref(the_lnet.ln_push_target);
1616         the_lnet.ln_push_target = NULL;
1617         the_lnet.ln_push_target_nnis = 0;
1618
1619         LNetEQFree(the_lnet.ln_push_target_eq);
1620         LNetInvalidateEQHandle(&the_lnet.ln_push_target_eq);
1621 }
1622
1623 static int
1624 lnet_ni_tq_credits(struct lnet_ni *ni)
1625 {
1626         int     credits;
1627
1628         LASSERT(ni->ni_ncpts >= 1);
1629
1630         if (ni->ni_ncpts == 1)
1631                 return ni->ni_net->net_tunables.lct_max_tx_credits;
1632
1633         credits = ni->ni_net->net_tunables.lct_max_tx_credits / ni->ni_ncpts;
1634         credits = max(credits, 8 * ni->ni_net->net_tunables.lct_peer_tx_credits);
1635         credits = min(credits, ni->ni_net->net_tunables.lct_max_tx_credits);
1636
1637         return credits;
1638 }
1639
/*
 * Detach @ni from the lists that make it findable, dropping the
 * reference each list held.  Called with lnet_net_lock(LNET_LOCK_EX).
 */
static void
lnet_ni_unlink_locked(struct lnet_ni *ni)
{
	/* drop the CPT-list membership and its reference, if any */
	if (!list_empty(&ni->ni_cptlist)) {
		list_del_init(&ni->ni_cptlist);
		lnet_ni_decref_locked(ni, 0);
	}

	/* move it to zombie list and nobody can find it anymore */
	LASSERT(!list_empty(&ni->ni_netlist));
	list_move(&ni->ni_netlist, &ni->ni_net->net_ni_zombie);
	lnet_ni_decref_locked(ni, 0);		/* drop ln_nets' ref */
}
1653
/*
 * Reap @net's zombie NIs: wait for each NI's per-CPT refcounts to drain,
 * then call the LND's shutdown and free the NI.  Called (and returns)
 * with lnet_net_lock(LNET_LOCK_EX) held; the lock is dropped both while
 * sleeping on busy NIs and around lnd_shutdown().
 */
static void
lnet_clear_zombies_nis_locked(struct lnet_net *net)
{
	int		i;
	int		islo;
	struct lnet_ni	*ni;
	struct list_head *zombie_list = &net->net_ni_zombie;

	/*
	 * Now wait for the NIs I just nuked to show up on the zombie
	 * list and shut them down in guaranteed thread context
	 */
	i = 2;
	while (!list_empty(zombie_list)) {
		int	*ref;
		int	j;

		ni = list_entry(zombie_list->next,
				struct lnet_ni, ni_netlist);
		list_del_init(&ni->ni_netlist);
		/* the ni should be in deleting state. If it's not it's
		 * a bug */
		LASSERT(ni->ni_state & LNET_NI_STATE_DELETING);
		/* any live per-CPT reference means the NI is still busy */
		cfs_percpt_for_each(ref, j, ni->ni_refs) {
			if (*ref == 0)
				continue;
			/* still busy, add it back to zombie list */
			list_add(&ni->ni_netlist, zombie_list);
			break;
		}

		if (!list_empty(&ni->ni_netlist)) {
			/* busy: sleep a second and retry; warn with
			 * power-of-two backoff ((i & -i) == i) */
			lnet_net_unlock(LNET_LOCK_EX);
			++i;
			if ((i & (-i)) == i) {
				CDEBUG(D_WARNING,
				       "Waiting for zombie LNI %s\n",
				       libcfs_nid2str(ni->ni_nid));
			}
			set_current_state(TASK_UNINTERRUPTIBLE);
			schedule_timeout(cfs_time_seconds(1));
			lnet_net_lock(LNET_LOCK_EX);
			continue;
		}

		lnet_net_unlock(LNET_LOCK_EX);

		islo = ni->ni_net->net_lnd->lnd_type == LOLND;

		/* lnd_shutdown may sleep; must not run in interrupt context */
		LASSERT(!in_interrupt());
		(net->net_lnd->lnd_shutdown)(ni);

		if (!islo)
			CDEBUG(D_LNI, "Removed LNI %s\n",
			      libcfs_nid2str(ni->ni_nid));

		lnet_ni_free(ni);
		i = 2;
		lnet_net_lock(LNET_LOCK_EX);
	}
}
1715
/* Shut down one NI and release its list references: mark it deleting,
 * unlink it from the visible lists, purge its lazy-portal messages,
 * then reap it from the net's zombie list. */
static void
lnet_shutdown_lndni(struct lnet_ni *ni)
{
	int i;
	struct lnet_net *net = ni->ni_net;

	lnet_net_lock(LNET_LOCK_EX);
	lnet_ni_lock(ni);
	/* flip ACTIVE -> DELETING so no new work targets this NI */
	ni->ni_state |= LNET_NI_STATE_DELETING;
	ni->ni_state &= ~LNET_NI_STATE_ACTIVE;
	lnet_ni_unlock(ni);
	lnet_ni_unlink_locked(ni);
	/* bump the DLC sequence so userspace sees the config change */
	lnet_incr_dlc_seq();
	lnet_net_unlock(LNET_LOCK_EX);

	/* clear messages for this NI on the lazy portal */
	for (i = 0; i < the_lnet.ln_nportals; i++)
		lnet_clear_lazy_portal(ni, i, "Shutting down NI");

	lnet_net_lock(LNET_LOCK_EX);
	lnet_clear_zombies_nis_locked(net);
	lnet_net_unlock(LNET_LOCK_EX);
}
1740
/*
 * Tear down an entire net: shut down each of its NIs, clean up its
 * peer-table entries, drop the LND module reference and free the net.
 * The net must no longer be reachable from ln_nets.
 */
static void
lnet_shutdown_lndnet(struct lnet_net *net)
{
	struct lnet_ni *ni;

	lnet_net_lock(LNET_LOCK_EX);

	net->net_state = LNET_NET_STATE_DELETING;

	list_del_init(&net->net_list);

	/* shut NIs down one at a time; lnet_shutdown_lndni() sleeps, so
	 * drop the lock around each call */
	while (!list_empty(&net->net_ni_list)) {
		ni = list_entry(net->net_ni_list.next,
				struct lnet_ni, ni_netlist);
		lnet_net_unlock(LNET_LOCK_EX);
		lnet_shutdown_lndni(ni);
		lnet_net_lock(LNET_LOCK_EX);
	}

	lnet_net_unlock(LNET_LOCK_EX);

	/* Do peer table cleanup for this net */
	lnet_peer_tables_cleanup(net);

	lnet_net_lock(LNET_LOCK_EX);
	/*
	 * decrement ref count on lnd only when the entire network goes
	 * away
	 */
	net->net_lnd->lnd_refcount--;

	lnet_net_unlock(LNET_LOCK_EX);

	lnet_net_free(net);
}
1776
/*
 * Shut down every configured net.  Transitions ln_state RUNNING ->
 * STOPPING -> SHUTDOWN, moves all nets to the zombie list so no new
 * work picks them up, tears each down, and cancels any messages still
 * queued for resend.
 */
static void
lnet_shutdown_lndnets(void)
{
	struct lnet_net *net;
	struct list_head resend;
	struct lnet_msg *msg, *tmp;

	INIT_LIST_HEAD(&resend);

	/* NB called holding the global mutex */

	/* All quiet on the API front */
	LASSERT(the_lnet.ln_state == LNET_STATE_RUNNING);
	LASSERT(the_lnet.ln_refcount == 0);

	lnet_net_lock(LNET_LOCK_EX);
	the_lnet.ln_state = LNET_STATE_STOPPING;

	while (!list_empty(&the_lnet.ln_nets)) {
		/*
		 * move the nets to the zombie list to avoid them being
		 * picked up for new work. LONET is also included in the
		 * Nets that will be moved to the zombie list
		 */
		net = list_entry(the_lnet.ln_nets.next,
				 struct lnet_net, net_list);
		list_move(&net->net_list, &the_lnet.ln_net_zombie);
	}

	/* Drop the cached loopback Net. */
	if (the_lnet.ln_loni != NULL) {
		lnet_ni_decref_locked(the_lnet.ln_loni, 0);
		the_lnet.ln_loni = NULL;
	}
	lnet_net_unlock(LNET_LOCK_EX);

	/* iterate through the net zombie list and delete each net */
	while (!list_empty(&the_lnet.ln_net_zombie)) {
		net = list_entry(the_lnet.ln_net_zombie.next,
				 struct lnet_net, net_list);
		lnet_shutdown_lndnet(net);
	}

	/* steal the resend queue, then fail each pending message */
	spin_lock(&the_lnet.ln_msg_resend_lock);
	list_splice(&the_lnet.ln_msg_resend, &resend);
	spin_unlock(&the_lnet.ln_msg_resend_lock);

	list_for_each_entry_safe(msg, tmp, &resend, msg_list) {
		list_del_init(&msg->msg_list);
		msg->msg_no_resend = true;
		lnet_finalize(msg, -ECANCELED);
	}

	lnet_net_lock(LNET_LOCK_EX);
	the_lnet.ln_state = LNET_STATE_SHUTDOWN;
	lnet_net_unlock(LNET_LOCK_EX);
}
1834
/*
 * Start up a single NI: copy in the LND tunables (if given), invoke
 * the LND's startup hook, and initialize the NI's credits and health.
 *
 * \param ni	the NI to start; its ni_net and net_lnd must be set and
 *		the caller must already hold a reference on the LND
 * \param tun	optional per-NI LND tunables to install before startup
 *
 * \retval 0 on success
 * \retval -ve on failure; NB on LND startup failure the NI is freed
 *	   and the caller's LND reference is dropped here
 */
static int
lnet_startup_lndni(struct lnet_ni *ni, struct lnet_lnd_tunables *tun)
{
	int                     rc = -EINVAL;
	struct lnet_tx_queue    *tq;
	int                     i;
	struct lnet_net         *net = ni->ni_net;

	mutex_lock(&the_lnet.ln_lnd_mutex);

	/* install per-NI tunables before the LND sees the NI */
	if (tun) {
		memcpy(&ni->ni_lnd_tunables, tun, sizeof(*tun));
		ni->ni_lnd_tunables_set = true;
	}

	rc = (net->net_lnd->lnd_startup)(ni);

	mutex_unlock(&the_lnet.ln_lnd_mutex);

	if (rc != 0) {
		LCONSOLE_ERROR_MSG(0x105, "Error %d starting up LNI %s\n",
				   rc, libcfs_lnd2str(net->net_lnd->lnd_type));
		/* drop the LND reference taken by our caller */
		lnet_net_lock(LNET_LOCK_EX);
		net->net_lnd->lnd_refcount--;
		lnet_net_unlock(LNET_LOCK_EX);
		goto failed0;
	}

	lnet_ni_lock(ni);
	ni->ni_state |= LNET_NI_STATE_ACTIVE;
	ni->ni_state &= ~LNET_NI_STATE_INIT;
	lnet_ni_unlock(ni);

	/* We keep a reference on the loopback net through the loopback NI */
	if (net->net_lnd->lnd_type == LOLND) {
		lnet_ni_addref(ni);
		LASSERT(the_lnet.ln_loni == NULL);
		the_lnet.ln_loni = ni;
		/* loopback uses no credits and never times out */
		ni->ni_net->net_tunables.lct_peer_tx_credits = 0;
		ni->ni_net->net_tunables.lct_peer_rtr_credits = 0;
		ni->ni_net->net_tunables.lct_max_tx_credits = 0;
		ni->ni_net->net_tunables.lct_peer_timeout = 0;
		return 0;
	}

	if (ni->ni_net->net_tunables.lct_peer_tx_credits == 0 ||
	    ni->ni_net->net_tunables.lct_max_tx_credits == 0) {
		LCONSOLE_ERROR_MSG(0x107, "LNI %s has no %scredits\n",
				   libcfs_lnd2str(net->net_lnd->lnd_type),
				   ni->ni_net->net_tunables.lct_peer_tx_credits == 0 ?
					"" : "per-peer ");
		/* shutdown the NI since if we get here then it must've already
		 * been started
		 */
		lnet_shutdown_lndni(ni);
		return -EINVAL;
	}

	/* seed every per-CPT tx queue with the NI's credit quota */
	cfs_percpt_for_each(tq, i, ni->ni_tx_queues) {
		tq->tq_credits_min =
		tq->tq_credits_max =
		tq->tq_credits = lnet_ni_tq_credits(ni);
	}

	atomic_set(&ni->ni_tx_credits,
		   lnet_ni_tq_credits(ni) * ni->ni_ncpts);
	/* a freshly started NI is fully healthy */
	atomic_set(&ni->ni_healthv, LNET_MAX_HEALTH_VALUE);

	CDEBUG(D_LNI, "Added LNI %s [%d/%d/%d/%d]\n",
		libcfs_nid2str(ni->ni_nid),
		ni->ni_net->net_tunables.lct_peer_tx_credits,
		lnet_ni_tq_credits(ni) * LNET_CPT_NUMBER,
		ni->ni_net->net_tunables.lct_peer_rtr_credits,
		ni->ni_net->net_tunables.lct_peer_timeout);

	return 0;
failed0:
	lnet_ni_free(ni);
	return rc;
}
1915
1916 static int
1917 lnet_startup_lndnet(struct lnet_net *net, struct lnet_lnd_tunables *tun)
1918 {
1919         struct lnet_ni *ni;
1920         struct lnet_net *net_l = NULL;
1921         struct list_head        local_ni_list;
1922         int                     rc;
1923         int                     ni_count = 0;
1924         __u32                   lnd_type;
1925         struct lnet_lnd *lnd;
1926         int                     peer_timeout =
1927                 net->net_tunables.lct_peer_timeout;
1928         int                     maxtxcredits =
1929                 net->net_tunables.lct_max_tx_credits;
1930         int                     peerrtrcredits =
1931                 net->net_tunables.lct_peer_rtr_credits;
1932
1933         INIT_LIST_HEAD(&local_ni_list);
1934
1935         /*
1936          * make sure that this net is unique. If it isn't then
1937          * we are adding interfaces to an already existing network, and
1938          * 'net' is just a convenient way to pass in the list.
1939          * if it is unique we need to find the LND and load it if
1940          * necessary.
1941          */
1942         if (lnet_net_unique(net->net_id, &the_lnet.ln_nets, &net_l)) {
1943                 lnd_type = LNET_NETTYP(net->net_id);
1944
1945                 mutex_lock(&the_lnet.ln_lnd_mutex);
1946                 lnd = lnet_find_lnd_by_type(lnd_type);
1947
1948                 if (lnd == NULL) {
1949                         mutex_unlock(&the_lnet.ln_lnd_mutex);
1950                         rc = request_module("%s", libcfs_lnd2modname(lnd_type));
1951                         mutex_lock(&the_lnet.ln_lnd_mutex);
1952
1953                         lnd = lnet_find_lnd_by_type(lnd_type);
1954                         if (lnd == NULL) {
1955                                 mutex_unlock(&the_lnet.ln_lnd_mutex);
1956                                 CERROR("Can't load LND %s, module %s, rc=%d\n",
1957                                 libcfs_lnd2str(lnd_type),
1958                                 libcfs_lnd2modname(lnd_type), rc);
1959 #ifndef HAVE_MODULE_LOADING_SUPPORT
1960                                 LCONSOLE_ERROR_MSG(0x104, "Your kernel must be "
1961                                                 "compiled with kernel module "
1962                                                 "loading support.");
1963 #endif
1964                                 rc = -EINVAL;
1965                                 goto failed0;
1966                         }
1967                 }
1968
1969                 lnet_net_lock(LNET_LOCK_EX);
1970                 lnd->lnd_refcount++;
1971                 lnet_net_unlock(LNET_LOCK_EX);
1972
1973                 net->net_lnd = lnd;
1974
1975                 mutex_unlock(&the_lnet.ln_lnd_mutex);
1976
1977                 net_l = net;
1978         }
1979
1980         /*
1981          * net_l: if the network being added is unique then net_l
1982          *        will point to that network
1983          *        if the network being added is not unique then
1984          *        net_l points to the existing network.
1985          *
1986          * When we enter the loop below, we'll pick NIs off he
1987          * network beign added and start them up, then add them to
1988          * a local ni list. Once we've successfully started all
1989          * the NIs then we join the local NI list (of started up
1990          * networks) with the net_l->net_ni_list, which should
1991          * point to the correct network to add the new ni list to
1992          *
1993          * If any of the new NIs fail to start up, then we want to
1994          * iterate through the local ni list, which should include
1995          * any NIs which were successfully started up, and shut
1996          * them down.
1997          *
1998          * After than we want to delete the network being added,
1999          * to avoid a memory leak.
2000          */
2001
2002         /*
2003          * When a network uses TCP bonding then all its interfaces
2004          * must be specified when the network is first defined: the
2005          * TCP bonding code doesn't allow for interfaces to be added
2006          * or removed.
2007          */
2008         if (net_l != net && net_l != NULL && use_tcp_bonding &&
2009             LNET_NETTYP(net_l->net_id) == SOCKLND) {
2010                 rc = -EINVAL;
2011                 goto failed0;
2012         }
2013
2014         while (!list_empty(&net->net_ni_added)) {
2015                 ni = list_entry(net->net_ni_added.next, struct lnet_ni,
2016                                 ni_netlist);
2017                 list_del_init(&ni->ni_netlist);
2018
2019                 /* make sure that the the NI we're about to start
2020                  * up is actually unique. if it's not fail. */
2021                 if (!lnet_ni_unique_net(&net_l->net_ni_list,
2022                                         ni->ni_interfaces[0])) {
2023                         rc = -EINVAL;
2024                         goto failed1;
2025                 }
2026
2027                 /* adjust the pointer the parent network, just in case it
2028                  * the net is a duplicate */
2029                 ni->ni_net = net_l;
2030
2031                 rc = lnet_startup_lndni(ni, tun);
2032
2033                 LASSERT(ni->ni_net->net_tunables.lct_peer_timeout <= 0 ||
2034                         ni->ni_net->net_lnd->lnd_query != NULL);
2035
2036                 if (rc < 0)
2037                         goto failed1;
2038
2039                 lnet_ni_addref(ni);
2040                 list_add_tail(&ni->ni_netlist, &local_ni_list);
2041
2042                 ni_count++;
2043         }
2044
2045         lnet_net_lock(LNET_LOCK_EX);
2046         list_splice_tail(&local_ni_list, &net_l->net_ni_list);
2047         lnet_incr_dlc_seq();
2048         lnet_net_unlock(LNET_LOCK_EX);
2049
2050         /* if the network is not unique then we don't want to keep
2051          * it around after we're done. Free it. Otherwise add that
2052          * net to the global the_lnet.ln_nets */
2053         if (net_l != net && net_l != NULL) {
2054                 /*
2055                  * TODO - note. currently the tunables can not be updated
2056                  * once added
2057                  */
2058                 lnet_net_free(net);
2059         } else {
2060                 net->net_state = LNET_NET_STATE_ACTIVE;
2061                 /*
2062                  * restore tunables after it has been overwitten by the
2063                  * lnd
2064                  */
2065                 if (peer_timeout != -1)
2066                         net->net_tunables.lct_peer_timeout = peer_timeout;
2067                 if (maxtxcredits != -1)
2068                         net->net_tunables.lct_max_tx_credits = maxtxcredits;
2069                 if (peerrtrcredits != -1)
2070                         net->net_tunables.lct_peer_rtr_credits = peerrtrcredits;
2071
2072                 lnet_net_lock(LNET_LOCK_EX);
2073                 list_add_tail(&net->net_list, &the_lnet.ln_nets);
2074                 lnet_net_unlock(LNET_LOCK_EX);
2075         }
2076
2077         return ni_count;
2078
2079 failed1:
2080         /*
2081          * shutdown the new NIs that are being started up
2082          * free the NET being started
2083          */
2084         while (!list_empty(&local_ni_list)) {
2085                 ni = list_entry(local_ni_list.next, struct lnet_ni,
2086                                 ni_netlist);
2087
2088                 lnet_shutdown_lndni(ni);
2089         }
2090
2091 failed0:
2092         lnet_net_free(net);
2093
2094         return rc;
2095 }
2096
2097 static int
2098 lnet_startup_lndnets(struct list_head *netlist)
2099 {
2100         struct lnet_net         *net;
2101         int                     rc;
2102         int                     ni_count = 0;
2103
2104         /*
2105          * Change to running state before bringing up the LNDs. This
2106          * allows lnet_shutdown_lndnets() to assert that we've passed
2107          * through here.
2108          */
2109         lnet_net_lock(LNET_LOCK_EX);
2110         the_lnet.ln_state = LNET_STATE_RUNNING;
2111         lnet_net_unlock(LNET_LOCK_EX);
2112
2113         while (!list_empty(netlist)) {
2114                 net = list_entry(netlist->next, struct lnet_net, net_list);
2115                 list_del_init(&net->net_list);
2116
2117                 rc = lnet_startup_lndnet(net, NULL);
2118
2119                 if (rc < 0)
2120                         goto failed;
2121
2122                 ni_count += rc;
2123         }
2124
2125         return ni_count;
2126 failed:
2127         lnet_shutdown_lndnets();
2128
2129         return rc;
2130 }
2131
/**
 * Initialize LNet library.
 *
 * Automatically called at module loading time. Caller has to call
 * lnet_lib_exit() after a call to lnet_lib_init(), if and only if the
 * latter returned 0. It must be called exactly once.
 *
 * \retval 0 on success
 * \retval -ve on failures.
 */
int lnet_lib_init(void)
{
	int rc;

	lnet_assert_wire_constants();

	/* refer to global cfs_cpt_table for now */
	the_lnet.ln_cpt_table	= cfs_cpt_table;
	the_lnet.ln_cpt_number	= cfs_cpt_number(cfs_cpt_table);

	LASSERT(the_lnet.ln_cpt_number > 0);
	if (the_lnet.ln_cpt_number > LNET_CPT_MAX) {
		/* we are under risk of consuming all lh_cookie */
		CERROR("Can't have %d CPTs for LNet (max allowed is %d), "
		       "please change setting of CPT-table and retry\n",
		       the_lnet.ln_cpt_number, LNET_CPT_MAX);
		return -E2BIG;
	}

	/* ln_cpt_bits = ceil(log2(ln_cpt_number)) */
	while ((1 << the_lnet.ln_cpt_bits) < the_lnet.ln_cpt_number)
		the_lnet.ln_cpt_bits++;

	rc = lnet_create_locks();
	if (rc != 0) {
		CERROR("Can't create LNet global locks: %d\n", rc);
		return rc;
	}

	the_lnet.ln_refcount = 0;
	LNetInvalidateEQHandle(&the_lnet.ln_rc_eqh);
	INIT_LIST_HEAD(&the_lnet.ln_lnds);
	INIT_LIST_HEAD(&the_lnet.ln_net_zombie);
	INIT_LIST_HEAD(&the_lnet.ln_rcd_zombie);
	INIT_LIST_HEAD(&the_lnet.ln_msg_resend);
	INIT_LIST_HEAD(&the_lnet.ln_rcd_deathrow);

	/* The hash table size is the number of bits it takes to express the set
	 * ln_num_routes, minus 1 (better to under estimate than over so we
	 * don't waste memory). */
	if (rnet_htable_size <= 0)
		rnet_htable_size = LNET_REMOTE_NETS_HASH_DEFAULT;
	else if (rnet_htable_size > LNET_REMOTE_NETS_HASH_MAX)
		rnet_htable_size = LNET_REMOTE_NETS_HASH_MAX;
	the_lnet.ln_remote_nets_hbits = max_t(int, 1,
					   order_base_2(rnet_htable_size) - 1);

	/* All LNDs apart from the LOLND are in separate modules.  They
	 * register themselves when their module loads, and unregister
	 * themselves when their module is unloaded. */
	lnet_register_lnd(&the_lolnd);
	return 0;
}
2194
2195 /**
2196  * Finalize LNet library.
2197  *
2198  * \pre lnet_lib_init() called with success.
2199  * \pre All LNet users called LNetNIFini() for matching LNetNIInit() calls.
2200  */
2201 void lnet_lib_exit(void)
2202 {
2203         LASSERT(the_lnet.ln_refcount == 0);
2204
2205         while (!list_empty(&the_lnet.ln_lnds))
2206                 lnet_unregister_lnd(list_entry(the_lnet.ln_lnds.next,
2207                                                struct lnet_lnd, lnd_list));
2208         lnet_destroy_locks();
2209 }
2210
/**
 * Set LNet PID and start LNet interfaces, routing, and forwarding.
 *
 * Users must call this function at least once before any other functions.
 * For each successful call there must be a corresponding call to
 * LNetNIFini(). For subsequent calls to LNetNIInit(), \a requested_pid is
 * ignored.
 *
 * The PID used by LNet may be different from the one requested.
 * See LNetGetId().
 *
 * \param requested_pid PID requested by the caller.
 *
 * \return >= 0 on success, and < 0 error code on failures.
 */
int
LNetNIInit(lnet_pid_t requested_pid)
{
	int                     im_a_router = 0;
	int                     rc;
	int                     ni_count;
	struct lnet_ping_buffer *pbuf;
	struct lnet_handle_md   ping_mdh;
	struct list_head        net_head;
	struct lnet_net         *net;

	INIT_LIST_HEAD(&net_head);

	mutex_lock(&the_lnet.ln_api_mutex);

	CDEBUG(D_OTHER, "refs %d\n", the_lnet.ln_refcount);

	/* already initialized: just take another reference */
	if (the_lnet.ln_refcount > 0) {
		rc = the_lnet.ln_refcount++;
		mutex_unlock(&the_lnet.ln_api_mutex);
		return rc;
	}

	rc = lnet_prepare(requested_pid);
	if (rc != 0) {
		mutex_unlock(&the_lnet.ln_api_mutex);
		return rc;
	}

	/* create a network for Loopback network */
	net = lnet_net_alloc(LNET_MKNET(LOLND, 0), &net_head);
	if (net == NULL) {
		rc = -ENOMEM;
		goto err_empty_list;
	}

	/* Add in the loopback NI */
	if (lnet_ni_alloc(net, NULL, NULL) == NULL) {
		rc = -ENOMEM;
		goto err_empty_list;
	}

	/* If LNet is being initialized via DLC it is possible
	 * that the user requests not to load module parameters (ones which
	 * are supported by DLC) on initialization.  Therefore, make sure not
	 * to load networks, routes and forwarding from module parameters
	 * in this case.  On cleanup in case of failure only clean up
	 * routes if it has been loaded */
	if (!the_lnet.ln_nis_from_mod_params) {
		rc = lnet_parse_networks(&net_head, lnet_get_networks(),
					 use_tcp_bonding);
		if (rc < 0)
			goto err_empty_list;
	}

	/* on success this consumes net_head; anything left on it after a
	 * failure is freed at the bottom of this function */
	ni_count = lnet_startup_lndnets(&net_head);
	if (ni_count < 0) {
		rc = ni_count;
		goto err_empty_list;
	}

	if (!the_lnet.ln_nis_from_mod_params) {
		rc = lnet_parse_routes(lnet_get_routes(), &im_a_router);
		if (rc != 0)
			goto err_shutdown_lndnis;

		rc = lnet_check_routes();
		if (rc != 0)
			goto err_destroy_routes;

		rc = lnet_rtrpools_alloc(im_a_router);
		if (rc != 0)
			goto err_destroy_routes;
	}

	rc = lnet_acceptor_start();
	if (rc != 0)
		goto err_destroy_routes;

	the_lnet.ln_refcount = 1;
	/* Now I may use my own API functions... */

	rc = lnet_ping_target_setup(&pbuf, &ping_mdh, ni_count, true);
	if (rc != 0)
		goto err_acceptor_stop;

	lnet_ping_target_update(pbuf, ping_mdh);

	rc = lnet_monitor_thr_start();
	if (rc != 0)
		goto err_stop_ping;

	rc = lnet_push_target_init();
	if (rc != 0)
		goto err_stop_monitor_thr;

	rc = lnet_peer_discovery_start();
	if (rc != 0)
		goto err_destroy_push_target;

	lnet_fault_init();
	lnet_router_debugfs_init();

	mutex_unlock(&the_lnet.ln_api_mutex);

	return 0;

	/* error unwind: labels tear down in strict reverse order of setup */
err_destroy_push_target:
	lnet_push_target_fini();
err_stop_monitor_thr:
	lnet_monitor_thr_stop();
err_stop_ping:
	lnet_ping_target_fini();
err_acceptor_stop:
	/* drop the reference taken above before stopping API users */
	the_lnet.ln_refcount = 0;
	lnet_acceptor_stop();
err_destroy_routes:
	if (!the_lnet.ln_nis_from_mod_params)
		lnet_destroy_routes();
err_shutdown_lndnis:
	lnet_shutdown_lndnets();
err_empty_list:
	lnet_unprepare();
	LASSERT(rc < 0);
	mutex_unlock(&the_lnet.ln_api_mutex);
	/* free any nets that were never started (shadows the outer 'net') */
	while (!list_empty(&net_head)) {
		struct lnet_net *net;

		net = list_entry(net_head.next, struct lnet_net, net_list);
		list_del_init(&net->net_list);
		lnet_net_free(net);
	}
	return rc;
}
EXPORT_SYMBOL(LNetNIInit);
2361
2362 /**
2363  * Stop LNet interfaces, routing, and forwarding.
2364  *
2365  * Users must call this function once for each successful call to LNetNIInit().
2366  * Once the LNetNIFini() operation has been started, the results of pending
2367  * API operations are undefined.
2368  *
2369  * \return always 0 for current implementation.
2370  */
2371 int
2372 LNetNIFini()
2373 {
2374         mutex_lock(&the_lnet.ln_api_mutex);
2375
2376         LASSERT(the_lnet.ln_refcount > 0);
2377
2378         if (the_lnet.ln_refcount != 1) {
2379                 the_lnet.ln_refcount--;
2380         } else {
2381                 LASSERT(!the_lnet.ln_niinit_self);
2382
2383                 lnet_fault_fini();
2384
2385                 lnet_router_debugfs_init();
2386                 lnet_peer_discovery_stop();
2387                 lnet_push_target_fini();
2388                 lnet_monitor_thr_stop();
2389                 lnet_ping_target_fini();
2390
2391                 /* Teardown fns that use my own API functions BEFORE here */
2392                 the_lnet.ln_refcount = 0;
2393
2394                 lnet_acceptor_stop();
2395                 lnet_destroy_routes();
2396                 lnet_shutdown_lndnets();
2397                 lnet_unprepare();
2398         }
2399
2400         mutex_unlock(&the_lnet.ln_api_mutex);
2401         return 0;
2402 }
2403 EXPORT_SYMBOL(LNetNIFini);
2404
2405 /**
2406  * Grabs the ni data from the ni structure and fills the out
2407  * parameters
2408  *
2409  * \param[in] ni network        interface structure
2410  * \param[out] cfg_ni           NI config information
2411  * \param[out] tun              network and LND tunables
2412  */
2413 static void
2414 lnet_fill_ni_info(struct lnet_ni *ni, struct lnet_ioctl_config_ni *cfg_ni,
2415                    struct lnet_ioctl_config_lnd_tunables *tun,
2416                    struct lnet_ioctl_element_stats *stats,
2417                    __u32 tun_size)
2418 {
2419         size_t min_size = 0;
2420         int i;
2421
2422         if (!ni || !cfg_ni || !tun)
2423                 return;
2424
2425         if (ni->ni_interfaces[0] != NULL) {
2426                 for (i = 0; i < ARRAY_SIZE(ni->ni_interfaces); i++) {
2427                         if (ni->ni_interfaces[i] != NULL) {
2428                                 strncpy(cfg_ni->lic_ni_intf[i],
2429                                         ni->ni_interfaces[i],
2430                                         sizeof(cfg_ni->lic_ni_intf[i]));
2431                         }
2432                 }
2433         }
2434
2435         cfg_ni->lic_nid = ni->ni_nid;
2436         if (LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND)
2437                 cfg_ni->lic_status = LNET_NI_STATUS_UP;
2438         else
2439                 cfg_ni->lic_status = ni->ni_status->ns_status;
2440         cfg_ni->lic_tcp_bonding = use_tcp_bonding;
2441         cfg_ni->lic_dev_cpt = ni->ni_dev_cpt;
2442
2443         memcpy(&tun->lt_cmn, &ni->ni_net->net_tunables, sizeof(tun->lt_cmn));
2444
2445         if (stats) {
2446                 stats->iel_send_count = lnet_sum_stats(&ni->ni_stats,
2447                                                        LNET_STATS_TYPE_SEND);
2448                 stats->iel_recv_count = lnet_sum_stats(&ni->ni_stats,
2449                                                        LNET_STATS_TYPE_RECV);
2450                 stats->iel_drop_count = lnet_sum_stats(&ni->ni_stats,
2451                                                        LNET_STATS_TYPE_DROP);
2452         }
2453
2454         /*
2455          * tun->lt_tun will always be present, but in order to be
2456          * backwards compatible, we need to deal with the cases when
2457          * tun->lt_tun is smaller than what the kernel has, because it
2458          * comes from an older version of a userspace program, then we'll
2459          * need to copy as much information as we have available space.
2460          */
2461         min_size = tun_size - sizeof(tun->lt_cmn);
2462         memcpy(&tun->lt_tun, &ni->ni_lnd_tunables, min_size);
2463
2464         /* copy over the cpts */
2465         if (ni->ni_ncpts == LNET_CPT_NUMBER &&
2466             ni->ni_cpts == NULL)  {
2467                 for (i = 0; i < ni->ni_ncpts; i++)
2468                         cfg_ni->lic_cpts[i] = i;
2469         } else {
2470                 for (i = 0;
2471                      ni->ni_cpts != NULL && i < ni->ni_ncpts &&
2472                      i < LNET_MAX_SHOW_NUM_CPT;
2473                      i++)
2474                         cfg_ni->lic_cpts[i] = ni->ni_cpts[i];
2475         }
2476         cfg_ni->lic_ncpts = ni->ni_ncpts;
2477 }
2478
2479 /**
2480  * NOTE: This is a legacy function left in the code to be backwards
2481  * compatible with older userspace programs. It should eventually be
2482  * removed.
2483  *
2484  * Grabs the ni data from the ni structure and fills the out
2485  * parameters
2486  *
2487  * \param[in] ni network        interface structure
2488  * \param[out] config           config information
2489  */
2490 static void
2491 lnet_fill_ni_info_legacy(struct lnet_ni *ni,
2492                          struct lnet_ioctl_config_data *config)
2493 {
2494         struct lnet_ioctl_net_config *net_config;
2495         struct lnet_ioctl_config_lnd_tunables *lnd_cfg = NULL;
2496         size_t min_size, tunable_size = 0;
2497         int i;
2498
2499         if (!ni || !config)
2500                 return;
2501
2502         net_config = (struct lnet_ioctl_net_config *) config->cfg_bulk;
2503         if (!net_config)
2504                 return;
2505
2506         BUILD_BUG_ON(ARRAY_SIZE(ni->ni_interfaces) !=
2507                      ARRAY_SIZE(net_config->ni_interfaces));
2508
2509         for (i = 0; i < ARRAY_SIZE(ni->ni_interfaces); i++) {
2510                 if (!ni->ni_interfaces[i])
2511                         break;
2512
2513                 strncpy(net_config->ni_interfaces[i],
2514                         ni->ni_interfaces[i],
2515                         sizeof(net_config->ni_interfaces[i]));
2516         }
2517
2518         config->cfg_nid = ni->ni_nid;
2519         config->cfg_config_u.cfg_net.net_peer_timeout =
2520                 ni->ni_net->net_tunables.lct_peer_timeout;
2521         config->cfg_config_u.cfg_net.net_max_tx_credits =
2522                 ni->ni_net->net_tunables.lct_max_tx_credits;
2523         config->cfg_config_u.cfg_net.net_peer_tx_credits =
2524                 ni->ni_net->net_tunables.lct_peer_tx_credits;
2525         config->cfg_config_u.cfg_net.net_peer_rtr_credits =
2526                 ni->ni_net->net_tunables.lct_peer_rtr_credits;
2527
2528         if (LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND)
2529                 net_config->ni_status = LNET_NI_STATUS_UP;
2530         else
2531                 net_config->ni_status = ni->ni_status->ns_status;
2532
2533         if (ni->ni_cpts) {
2534                 int num_cpts = min(ni->ni_ncpts, LNET_MAX_SHOW_NUM_CPT);
2535
2536                 for (i = 0; i < num_cpts; i++)
2537                         net_config->ni_cpts[i] = ni->ni_cpts[i];
2538
2539                 config->cfg_ncpts = num_cpts;
2540         }
2541
2542         /*
2543          * See if user land tools sent in a newer and larger version
2544          * of struct lnet_tunables than what the kernel uses.
2545          */
2546         min_size = sizeof(*config) + sizeof(*net_config);
2547
2548         if (config->cfg_hdr.ioc_len > min_size)
2549                 tunable_size = config->cfg_hdr.ioc_len - min_size;
2550
2551         /* Don't copy too much data to user space */
2552         min_size = min(tunable_size, sizeof(ni->ni_lnd_tunables));
2553         lnd_cfg = (struct lnet_ioctl_config_lnd_tunables *)net_config->cfg_bulk;
2554
2555         if (lnd_cfg && min_size) {
2556                 memcpy(&lnd_cfg->lt_tun, &ni->ni_lnd_tunables, min_size);
2557                 config->cfg_config_u.cfg_net.net_interface_count = 1;
2558
2559                 /* Tell user land that kernel side has less data */
2560                 if (tunable_size > sizeof(ni->ni_lnd_tunables)) {
2561                         min_size = tunable_size - sizeof(ni->ni_lnd_tunables);
2562                         config->cfg_hdr.ioc_len -= min_size;
2563                 }
2564         }
2565 }
2566
2567 struct lnet_ni *
2568 lnet_get_ni_idx_locked(int idx)
2569 {
2570         struct lnet_ni          *ni;
2571         struct lnet_net         *net;
2572
2573         list_for_each_entry(net, &the_lnet.ln_nets, net_list) {
2574                 list_for_each_entry(ni, &net->net_ni_list, ni_netlist) {
2575                         if (idx-- == 0)
2576                                 return ni;
2577                 }
2578         }
2579
2580         return NULL;
2581 }
2582
/*
 * Iterate over NIs: return the NI after \a prev, restricted to
 * \a mynet when mynet != NULL, or spanning all nets when mynet is
 * NULL.  A NULL \a prev starts the iteration; NULL is returned when
 * it is exhausted.  Caller must hold the net lock.
 */
struct lnet_ni *
lnet_get_next_ni_locked(struct lnet_net *mynet, struct lnet_ni *prev)
{
	struct lnet_ni		*ni;
	struct lnet_net		*net = mynet;

	/*
	 * It is possible that the net has been cleaned out while there is
	 * a message being sent. This function accessed the net without
	 * checking if the list is empty
	 */
	if (prev == NULL) {
		/* no net specified: start from the first net */
		if (net == NULL)
			net = list_entry(the_lnet.ln_nets.next, struct lnet_net,
					net_list);
		if (list_empty(&net->net_ni_list))
			return NULL;
		ni = list_entry(net->net_ni_list.next, struct lnet_ni,
				ni_netlist);

		return ni;
	}

	if (prev->ni_netlist.next == &prev->ni_net->net_ni_list) {
		/* if you reached the end of the ni list and the net is
		 * specified, then there are no more nis in that net */
		if (net != NULL)
			return NULL;

		/* we reached the end of this net ni list. move to the
		 * next net */
		if (prev->ni_net->net_list.next == &the_lnet.ln_nets)
			/* no more nets and no more NIs. */
			return NULL;

		/* get the next net */
		net = list_entry(prev->ni_net->net_list.next, struct lnet_net,
				 net_list);
		if (list_empty(&net->net_ni_list))
			return NULL;
		/* get the ni on it */
		ni = list_entry(net->net_ni_list.next, struct lnet_ni,
				ni_netlist);

		return ni;
	}

	/* prev was unlinked from its list: nothing to continue from */
	if (list_empty(&prev->ni_netlist))
		return NULL;

	/* there are more nis left */
	ni = list_entry(prev->ni_netlist.next, struct lnet_ni, ni_netlist);

	return ni;
}
2638
2639 int
2640 lnet_get_net_config(struct lnet_ioctl_config_data *config)
2641 {
2642         struct lnet_ni *ni;
2643         int cpt;
2644         int rc = -ENOENT;
2645         int idx = config->cfg_count;
2646
2647         cpt = lnet_net_lock_current();
2648
2649         ni = lnet_get_ni_idx_locked(idx);
2650
2651         if (ni != NULL) {
2652                 rc = 0;
2653                 lnet_ni_lock(ni);
2654                 lnet_fill_ni_info_legacy(ni, config);
2655                 lnet_ni_unlock(ni);
2656         }
2657
2658         lnet_net_unlock(cpt);
2659         return rc;
2660 }
2661
2662 int
2663 lnet_get_ni_config(struct lnet_ioctl_config_ni *cfg_ni,
2664                    struct lnet_ioctl_config_lnd_tunables *tun,
2665                    struct lnet_ioctl_element_stats *stats,
2666                    __u32 tun_size)
2667 {
2668         struct lnet_ni          *ni;
2669         int                     cpt;
2670         int                     rc = -ENOENT;
2671
2672         if (!cfg_ni || !tun || !stats)
2673                 return -EINVAL;
2674
2675         cpt = lnet_net_lock_current();
2676
2677         ni = lnet_get_ni_idx_locked(cfg_ni->lic_idx);
2678
2679         if (ni) {
2680                 rc = 0;
2681                 lnet_ni_lock(ni);
2682                 lnet_fill_ni_info(ni, cfg_ni, tun, stats, tun_size);
2683                 lnet_ni_unlock(ni);
2684         }
2685
2686         lnet_net_unlock(cpt);
2687         return rc;
2688 }
2689
2690 int lnet_get_ni_stats(struct lnet_ioctl_element_msg_stats *msg_stats)
2691 {
2692         struct lnet_ni *ni;
2693         int cpt;
2694         int rc = -ENOENT;
2695
2696         if (!msg_stats)
2697                 return -EINVAL;
2698
2699         cpt = lnet_net_lock_current();
2700
2701         ni = lnet_get_ni_idx_locked(msg_stats->im_idx);
2702
2703         if (ni) {
2704                 lnet_usr_translate_stats(msg_stats, &ni->ni_stats);
2705                 rc = 0;
2706         }
2707
2708         lnet_net_unlock(cpt);
2709
2710         return rc;
2711 }
2712
/**
 * lnet_add_net_common - bring up a newly parsed network.
 * @net: the net (with its pending NIs) to start.  Ownership is taken:
 *	 @net is freed here on early failure and is otherwise handed off
 *	 to lnet_startup_lndnet().
 * @tun: optional LND tunables; when NULL every byte of the common
 *	 tunables is set to -1, i.e. "unset, use defaults".
 *
 * Rejects nets already known as remote (routed-to) networks, grows the
 * ping target to cover the NIs being added, starts the net, and starts
 * the acceptor thread if this is the first net that needs it.
 *
 * Returns 0 on success, negative errno on failure.
 */
static int lnet_add_net_common(struct lnet_net *net,
			       struct lnet_ioctl_config_lnd_tunables *tun)
{
	__u32			net_id;
	struct lnet_ping_buffer *pbuf;
	struct lnet_handle_md	ping_mdh;
	int			rc;
	struct lnet_remotenet *rnet;
	int			net_ni_count;
	int			num_acceptor_nets;

	lnet_net_lock(LNET_LOCK_EX);
	rnet = lnet_find_rnet_locked(net->net_id);
	lnet_net_unlock(LNET_LOCK_EX);
	/*
	 * make sure that the net added doesn't invalidate the current
	 * configuration LNet is keeping
	 */
	if (rnet) {
		CERROR("Adding net %s will invalidate routing configuration\n",
		       libcfs_net2str(net->net_id));
		lnet_net_free(net);
		return -EUSERS;
	}

	/*
	 * make sure you calculate the correct number of slots in the ping
	 * buffer. Since the ping info is a flattened list of all the NIs,
	 * we should allocate enough slots to accommodate the number of NIs
	 * which will be added.
	 *
	 * since ni hasn't been configured yet, use
	 * lnet_get_net_ni_count_pre() which checks the net_ni_added list
	 */
	net_ni_count = lnet_get_net_ni_count_pre(net);

	rc = lnet_ping_target_setup(&pbuf, &ping_mdh,
				    net_ni_count + lnet_get_ni_count(),
				    false);
	if (rc < 0) {
		lnet_net_free(net);
		return rc;
	}

	if (tun)
		memcpy(&net->net_tunables,
		       &tun->lt_cmn, sizeof(net->net_tunables));
	else
		memset(&net->net_tunables, -1, sizeof(net->net_tunables));

	/*
	 * before starting this network get a count of the current TCP
	 * networks which require the acceptor thread running. If that
	 * count is == 0 before we start up this network, then we'd want to
	 * start up the acceptor thread after starting up this network
	 */
	num_acceptor_nets = lnet_count_acceptor_nets();

	/* cache the net id: @net is consumed by lnet_startup_lndnet() and
	 * must not be dereferenced after the call */
	net_id = net->net_id;

	rc = lnet_startup_lndnet(net,
				 (tun) ? &tun->lt_tun : NULL);
	if (rc < 0)
		goto failed;

	/* re-lookup the registered net by id; it is the authoritative
	 * instance from here on */
	lnet_net_lock(LNET_LOCK_EX);
	net = lnet_get_net_locked(net_id);
	lnet_net_unlock(LNET_LOCK_EX);

	LASSERT(net);

	/*
	 * Start the acceptor thread if this is the first network
	 * being added that requires the thread.
	 */
	if (net->net_lnd->lnd_accept && num_acceptor_nets == 0) {
		rc = lnet_acceptor_start();
		if (rc < 0) {
			/* shutdown the net that we just started */
			CERROR("Failed to start up acceptor thread\n");
			lnet_shutdown_lndnet(net);
			goto failed;
		}
	}

	/* let the peer module react to the new local net */
	lnet_net_lock(LNET_LOCK_EX);
	lnet_peer_net_added(net);
	lnet_net_unlock(LNET_LOCK_EX);

	/* publish the enlarged ping target */
	lnet_ping_target_update(pbuf, ping_mdh);

	return 0;

failed:
	/* tear down the ping buffer prepared above */
	lnet_ping_md_unlink(pbuf, &ping_mdh);
	lnet_ping_buffer_decref(pbuf);
	return rc;
}
2811
2812 static int lnet_handle_legacy_ip2nets(char *ip2nets,
2813                                       struct lnet_ioctl_config_lnd_tunables *tun)
2814 {
2815         struct lnet_net *net;
2816         char *nets;
2817         int rc;
2818         struct list_head net_head;
2819
2820         INIT_LIST_HEAD(&net_head);
2821
2822         rc = lnet_parse_ip2nets(&nets, ip2nets);
2823         if (rc < 0)
2824                 return rc;
2825
2826         rc = lnet_parse_networks(&net_head, nets, use_tcp_bonding);
2827         if (rc < 0)
2828                 return rc;
2829
2830         mutex_lock(&the_lnet.ln_api_mutex);
2831         while (!list_empty(&net_head)) {
2832                 net = list_entry(net_head.next, struct lnet_net, net_list);
2833                 list_del_init(&net->net_list);
2834                 rc = lnet_add_net_common(net, tun);
2835                 if (rc < 0)
2836                         goto out;
2837         }
2838
2839 out:
2840         mutex_unlock(&the_lnet.ln_api_mutex);
2841
2842         while (!list_empty(&net_head)) {
2843                 net = list_entry(net_head.next, struct lnet_net, net_list);
2844                 list_del_init(&net->net_list);
2845                 lnet_net_free(net);
2846         }
2847         return rc;
2848 }
2849
2850 int lnet_dyn_add_ni(struct lnet_ioctl_config_ni *conf)
2851 {
2852         struct lnet_net *net;
2853         struct lnet_ni *ni;
2854         struct lnet_ioctl_config_lnd_tunables *tun = NULL;
2855         int rc, i;
2856         __u32 net_id, lnd_type;
2857
2858         /* get the tunables if they are available */
2859         if (conf->lic_cfg_hdr.ioc_len >=
2860             sizeof(*conf) + sizeof(*tun))
2861                 tun = (struct lnet_ioctl_config_lnd_tunables *)
2862                         conf->lic_bulk;
2863
2864         /* handle legacy ip2nets from DLC */
2865         if (conf->lic_legacy_ip2nets[0] != '\0')
2866                 return lnet_handle_legacy_ip2nets(conf->lic_legacy_ip2nets,
2867                                                   tun);
2868
2869         net_id = LNET_NIDNET(conf->lic_nid);
2870         lnd_type = LNET_NETTYP(net_id);
2871
2872         if (!libcfs_isknown_lnd(lnd_type)) {
2873                 CERROR("No valid net and lnd information provided\n");
2874                 return -EINVAL;
2875         }
2876
2877         net = lnet_net_alloc(net_id, NULL);
2878         if (!net)
2879                 return -ENOMEM;
2880
2881         for (i = 0; i < conf->lic_ncpts; i++) {
2882                 if (conf->lic_cpts[i] >= LNET_CPT_NUMBER)
2883                         return -EINVAL;
2884         }
2885
2886         ni = lnet_ni_alloc_w_cpt_array(net, conf->lic_cpts, conf->lic_ncpts,
2887                                        conf->lic_ni_intf[0]);
2888         if (!ni)
2889                 return -ENOMEM;
2890
2891         mutex_lock(&the_lnet.ln_api_mutex);
2892
2893         rc = lnet_add_net_common(net, tun);
2894
2895         mutex_unlock(&the_lnet.ln_api_mutex);
2896
2897         return rc;
2898 }
2899
/**
 * lnet_dyn_del_ni - dynamically remove a local NI, or a whole net.
 * @conf: lic_nid selects the NI to delete; a NID whose address part is
 *	  zero means "delete the entire net the NID belongs to".
 *
 * A shrunken ping target is prepared before anything is shut down, the
 * acceptor thread is stopped once no remaining net needs it, and the net
 * itself is shut down when its last NI is removed.
 *
 * Returns 0 on success, negative errno on failure.
 */
int lnet_dyn_del_ni(struct lnet_ioctl_config_ni *conf)
{
	struct lnet_net  *net;
	struct lnet_ni *ni;
	__u32 net_id = LNET_NIDNET(conf->lic_nid);
	struct lnet_ping_buffer *pbuf;
	struct lnet_handle_md  ping_mdh;
	int               rc;
	int               net_count;
	__u32             addr;

	/* don't allow userspace to shutdown the LOLND */
	if (LNET_NETTYP(net_id) == LOLND)
		return -EINVAL;

	mutex_lock(&the_lnet.ln_api_mutex);

	lnet_net_lock(0);

	net = lnet_get_net_locked(net_id);
	if (!net) {
		CERROR("net %s not found\n",
		       libcfs_net2str(net_id));
		rc = -ENOENT;
		goto unlock_net;
	}

	addr = LNET_NIDADDR(conf->lic_nid);
	if (addr == 0) {
		/* remove the entire net */
		net_count = lnet_get_net_ni_count_locked(net);

		lnet_net_unlock(0);

		/* create and link a new ping info, before removing the old one */
		rc = lnet_ping_target_setup(&pbuf, &ping_mdh,
					lnet_get_ni_count() - net_count,
					false);
		if (rc != 0)
			goto unlock_api_mutex;

		lnet_shutdown_lndnet(net);

		/* stop the acceptor when no remaining net needs it */
		if (lnet_count_acceptor_nets() == 0)
			lnet_acceptor_stop();

		lnet_ping_target_update(pbuf, ping_mdh);

		goto unlock_api_mutex;
	}

	ni = lnet_nid2ni_locked(conf->lic_nid, 0);
	if (!ni) {
		CERROR("nid %s not found\n",
		       libcfs_nid2str(conf->lic_nid));
		rc = -ENOENT;
		goto unlock_net;
	}

	/* count the NIs on the net before this one goes away, so we can
	 * tell below whether it was the last one */
	net_count = lnet_get_net_ni_count_locked(net);

	lnet_net_unlock(0);

	/* create and link a new ping info, before removing the old one */
	rc = lnet_ping_target_setup(&pbuf, &ping_mdh,
				  lnet_get_ni_count() - 1, false);
	if (rc != 0)
		goto unlock_api_mutex;

	lnet_shutdown_lndni(ni);

	if (lnet_count_acceptor_nets() == 0)
		lnet_acceptor_stop();

	lnet_ping_target_update(pbuf, ping_mdh);

	/* check if the net is empty and remove it if it is */
	if (net_count == 1)
		lnet_shutdown_lndnet(net);

	goto unlock_api_mutex;

unlock_net:
	lnet_net_unlock(0);
unlock_api_mutex:
	mutex_unlock(&the_lnet.ln_api_mutex);

	return rc;
}
2989
2990 /*
2991  * lnet_dyn_add_net and lnet_dyn_del_net are now deprecated.
2992  * They are only expected to be called for unique networks.
2993  * That can be as a result of older DLC library
2994  * calls. Multi-Rail DLC and beyond no longer uses these APIs.
2995  */
/**
 * lnet_dyn_add_net - legacy DLC entry point: add a single network.
 * @conf: carries the network string (cfg_net.net_intf) plus the common
 *	  tunables for it.
 *
 * Parses the string into exactly one net, copies the tunables out of
 * @conf and hands the net to lnet_add_net_common(), which consumes it.
 *
 * Returns 0 on success, negative errno on failure (-EINVAL when the
 * string parses to zero or more than one network).
 */
int
lnet_dyn_add_net(struct lnet_ioctl_config_data *conf)
{
	struct lnet_net		*net;
	struct list_head	net_head;
	int			rc;
	struct lnet_ioctl_config_lnd_tunables tun;
	char *nets = conf->cfg_config_u.cfg_net.net_intf;

	INIT_LIST_HEAD(&net_head);

	/* Create a net/ni structures for the network string */
	rc = lnet_parse_networks(&net_head, nets, use_tcp_bonding);
	if (rc <= 0)
		return rc == 0 ? -EINVAL : rc;

	mutex_lock(&the_lnet.ln_api_mutex);

	/* rc is the number of networks parsed */
	if (rc > 1) {
		rc = -EINVAL; /* only add one network per call */
		goto out_unlock_clean;
	}

	net = list_entry(net_head.next, struct lnet_net, net_list);
	list_del_init(&net->net_list);

	LASSERT(lnet_net_unique(net->net_id, &the_lnet.ln_nets, NULL));

	/* only the common tunables carried by this legacy ioctl are
	 * populated; everything else stays zero */
	memset(&tun, 0, sizeof(tun));

	tun.lt_cmn.lct_peer_timeout =
	  conf->cfg_config_u.cfg_net.net_peer_timeout;
	tun.lt_cmn.lct_peer_tx_credits =
	  conf->cfg_config_u.cfg_net.net_peer_tx_credits;
	tun.lt_cmn.lct_peer_rtr_credits =
	  conf->cfg_config_u.cfg_net.net_peer_rtr_credits;
	tun.lt_cmn.lct_max_tx_credits =
	  conf->cfg_config_u.cfg_net.net_max_tx_credits;

	rc = lnet_add_net_common(net, &tun);

out_unlock_clean:
	mutex_unlock(&the_lnet.ln_api_mutex);
	while (!list_empty(&net_head)) {
		/* net_head list is empty in success case */
		net = list_entry(net_head.next, struct lnet_net, net_list);
		list_del_init(&net->net_list);
		lnet_net_free(net);
	}
	return rc;
}
3047
/**
 * lnet_dyn_del_net - legacy DLC entry point: remove an entire network.
 * @net_id: the net to shut down; LOLND cannot be removed.
 *
 * Prepares a shrunken ping target before shutting the net down, and
 * stops the acceptor thread once no remaining net needs it.
 *
 * Returns 0 on success, negative errno on failure.
 */
int
lnet_dyn_del_net(__u32 net_id)
{
	struct lnet_net  *net;
	struct lnet_ping_buffer *pbuf;
	struct lnet_handle_md ping_mdh;
	int               rc;
	int               net_ni_count;

	/* don't allow userspace to shutdown the LOLND */
	if (LNET_NETTYP(net_id) == LOLND)
		return -EINVAL;

	mutex_lock(&the_lnet.ln_api_mutex);

	lnet_net_lock(0);

	net = lnet_get_net_locked(net_id);
	if (net == NULL) {
		lnet_net_unlock(0);
		rc = -EINVAL;
		goto out;
	}

	/* count the NIs that will disappear with the net */
	net_ni_count = lnet_get_net_ni_count_locked(net);

	lnet_net_unlock(0);

	/* create and link a new ping info, before removing the old one */
	rc = lnet_ping_target_setup(&pbuf, &ping_mdh,
				    lnet_get_ni_count() - net_ni_count, false);
	if (rc != 0)
		goto out;

	lnet_shutdown_lndnet(net);

	/* stop the acceptor when no remaining net needs it */
	if (lnet_count_acceptor_nets() == 0)
		lnet_acceptor_stop();

	lnet_ping_target_update(pbuf, ping_mdh);

out:
	mutex_unlock(&the_lnet.ln_api_mutex);

	return rc;
}
3094
/* Bump the dynamic-configuration (DLC) sequence counter. */
void lnet_incr_dlc_seq(void)
{
	atomic_inc(&lnet_dlc_seq_no);
}
3099
/* Read the current dynamic-configuration (DLC) sequence counter. */
__u32 lnet_get_dlc_seq_locked(void)
{
	return atomic_read(&lnet_dlc_seq_no);
}
3104
3105 /**
3106  * LNet ioctl handler.
3107  *
3108  */
3109 int
3110 LNetCtl(unsigned int cmd, void *arg)
3111 {
3112         struct libcfs_ioctl_data *data = arg;
3113         struct lnet_ioctl_config_data *config;
3114         struct lnet_process_id    id = {0};
3115         struct lnet_ni           *ni;
3116         int                       rc;
3117
3118         BUILD_BUG_ON(sizeof(struct lnet_ioctl_net_config) +
3119                      sizeof(struct lnet_ioctl_config_data) > LIBCFS_IOC_DATA_MAX);
3120
3121         switch (cmd) {
3122         case IOC_LIBCFS_GET_NI:
3123                 rc = LNetGetId(data->ioc_count, &id);
3124                 data->ioc_nid = id.nid;
3125                 return rc;
3126
3127         case IOC_LIBCFS_FAIL_NID:
3128                 return lnet_fail_nid(data->ioc_nid, data->ioc_count);
3129
3130         case IOC_LIBCFS_ADD_ROUTE:
3131                 config = arg;
3132
3133                 if (config->cfg_hdr.ioc_len < sizeof(*config))
3134                         return -EINVAL;
3135
3136                 mutex_lock(&the_lnet.ln_api_mutex);
3137                 rc = lnet_add_route(config->cfg_net,
3138                                     config->cfg_config_u.cfg_route.rtr_hop,
3139                                     config->cfg_nid,
3140                                     config->cfg_config_u.cfg_route.
3141                                         rtr_priority);
3142                 if (rc == 0) {
3143                         rc = lnet_check_routes();
3144                         if (rc != 0)
3145                                 lnet_del_route(config->cfg_net,
3146                                                config->cfg_nid);
3147                 }
3148                 mutex_unlock(&the_lnet.ln_api_mutex);
3149                 return rc;
3150
3151         case IOC_LIBCFS_DEL_ROUTE:
3152                 config = arg;
3153
3154                 if (config->cfg_hdr.ioc_len < sizeof(*config))
3155                         return -EINVAL;
3156
3157                 mutex_lock(&the_lnet.ln_api_mutex);
3158                 rc = lnet_del_route(config->cfg_net, config->cfg_nid);
3159                 mutex_unlock(&the_lnet.ln_api_mutex);
3160                 return rc;
3161
3162         case IOC_LIBCFS_GET_ROUTE:
3163                 config = arg;
3164
3165                 if (config->cfg_hdr.ioc_len < sizeof(*config))
3166                         return -EINVAL;
3167
3168                 mutex_lock(&the_lnet.ln_api_mutex);
3169                 rc = lnet_get_route(config->cfg_count,
3170                                     &config->cfg_net,
3171                                     &config->cfg_config_u.cfg_route.rtr_hop,
3172                                     &config->cfg_nid,
3173                                     &config->cfg_config_u.cfg_route.rtr_flags,
3174                                     &config->cfg_config_u.cfg_route.
3175                                         rtr_priority);
3176                 mutex_unlock(&the_lnet.ln_api_mutex);
3177                 return rc;
3178
3179         case IOC_LIBCFS_GET_LOCAL_NI: {
3180                 struct lnet_ioctl_config_ni *cfg_ni;
3181                 struct lnet_ioctl_config_lnd_tunables *tun = NULL;
3182                 struct lnet_ioctl_element_stats *stats;
3183                 __u32 tun_size;
3184
3185                 cfg_ni = arg;
3186
3187                 /* get the tunables if they are available */
3188                 if (cfg_ni->lic_cfg_hdr.ioc_len <
3189                     sizeof(*cfg_ni) + sizeof(*stats) + sizeof(*tun))
3190                         return -EINVAL;
3191
3192                 stats = (struct lnet_ioctl_element_stats *)
3193                         cfg_ni->lic_bulk;
3194                 tun = (struct lnet_ioctl_config_lnd_tunables *)
3195                                 (cfg_ni->lic_bulk + sizeof(*stats));
3196
3197                 tun_size = cfg_ni->lic_cfg_hdr.ioc_len - sizeof(*cfg_ni) -
3198                         sizeof(*stats);
3199
3200                 mutex_lock(&the_lnet.ln_api_mutex);
3201                 rc = lnet_get_ni_config(cfg_ni, tun, stats, tun_size);
3202                 mutex_unlock(&the_lnet.ln_api_mutex);
3203                 return rc;
3204         }
3205
3206         case IOC_LIBCFS_GET_LOCAL_NI_MSG_STATS: {
3207                 struct lnet_ioctl_element_msg_stats *msg_stats = arg;
3208
3209                 if (msg_stats->im_hdr.ioc_len != sizeof(*msg_stats))
3210                         return -EINVAL;
3211
3212                 mutex_lock(&the_lnet.ln_api_mutex);
3213                 rc = lnet_get_ni_stats(msg_stats);
3214                 mutex_unlock(&the_lnet.ln_api_mutex);
3215
3216                 return rc;
3217         }
3218
3219         case IOC_LIBCFS_GET_NET: {
3220                 size_t total = sizeof(*config) +
3221                                sizeof(struct lnet_ioctl_net_config);
3222                 config = arg;
3223
3224                 if (config->cfg_hdr.ioc_len < total)
3225                         return -EINVAL;
3226
3227                 mutex_lock(&the_lnet.ln_api_mutex);
3228                 rc = lnet_get_net_config(config);
3229                 mutex_unlock(&the_lnet.ln_api_mutex);
3230                 return rc;
3231         }
3232
3233         case IOC_LIBCFS_GET_LNET_STATS:
3234         {
3235                 struct lnet_ioctl_lnet_stats *lnet_stats = arg;
3236
3237                 if (lnet_stats->st_hdr.ioc_len < sizeof(*lnet_stats))
3238                         return -EINVAL;
3239
3240                 mutex_lock(&the_lnet.ln_api_mutex);
3241                 lnet_counters_get(&lnet_stats->st_cntrs);
3242                 mutex_unlock(&the_lnet.ln_api_mutex);
3243                 return 0;
3244         }
3245
3246         case IOC_LIBCFS_CONFIG_RTR:
3247                 config = arg;
3248
3249                 if (config->cfg_hdr.ioc_len < sizeof(*config))
3250                         return -EINVAL;
3251
3252                 mutex_lock(&the_lnet.ln_api_mutex);
3253                 if (config->cfg_config_u.cfg_buffers.buf_enable) {
3254                         rc = lnet_rtrpools_enable();
3255                         mutex_unlock(&the_lnet.ln_api_mutex);
3256                         return rc;
3257                 }
3258                 lnet_rtrpools_disable();
3259                 mutex_unlock(&the_lnet.ln_api_mutex);
3260                 return 0;
3261
3262         case IOC_LIBCFS_ADD_BUF:
3263                 config = arg;
3264
3265                 if (config->cfg_hdr.ioc_len < sizeof(*config))
3266                         return -EINVAL;
3267
3268                 mutex_lock(&the_lnet.ln_api_mutex);
3269                 rc = lnet_rtrpools_adjust(config->cfg_config_u.cfg_buffers.
3270                                                 buf_tiny,
3271                                           config->cfg_config_u.cfg_buffers.
3272                                                 buf_small,
3273                                           config->cfg_config_u.cfg_buffers.
3274                                                 buf_large);
3275                 mutex_unlock(&the_lnet.ln_api_mutex);
3276                 return rc;
3277
3278         case IOC_LIBCFS_SET_NUMA_RANGE: {
3279                 struct lnet_ioctl_set_value *numa;
3280                 numa = arg;
3281                 if (numa->sv_hdr.ioc_len != sizeof(*numa))
3282                         return -EINVAL;
3283                 lnet_net_lock(LNET_LOCK_EX);
3284                 lnet_numa_range = numa->sv_value;
3285                 lnet_net_unlock(LNET_LOCK_EX);
3286                 return 0;
3287         }
3288
3289         case IOC_LIBCFS_GET_NUMA_RANGE: {
3290                 struct lnet_ioctl_set_value *numa;
3291                 numa = arg;
3292                 if (numa->sv_hdr.ioc_len != sizeof(*numa))
3293                         return -EINVAL;
3294                 numa->sv_value = lnet_numa_range;
3295                 return 0;
3296         }
3297
3298         case IOC_LIBCFS_GET_BUF: {
3299                 struct lnet_ioctl_pool_cfg *pool_cfg;
3300                 size_t total = sizeof(*config) + sizeof(*pool_cfg);
3301
3302                 config = arg;
3303
3304                 if (config->cfg_hdr.ioc_len < total)
3305                         return -EINVAL;
3306
3307                 pool_cfg = (struct lnet_ioctl_pool_cfg *)config->cfg_bulk;
3308
3309                 mutex_lock(&the_lnet.ln_api_mutex);
3310                 rc = lnet_get_rtr_pool_cfg(config->cfg_count, pool_cfg);
3311                 mutex_unlock(&the_lnet.ln_api_mutex);
3312                 return rc;
3313         }
3314
3315         case IOC_LIBCFS_ADD_PEER_NI: {
3316                 struct lnet_ioctl_peer_cfg *cfg = arg;
3317
3318                 if (cfg->prcfg_hdr.ioc_len < sizeof(*cfg))
3319                         return -EINVAL;
3320
3321                 mutex_lock(&the_lnet.ln_api_mutex);
3322                 rc = lnet_add_peer_ni(cfg->prcfg_prim_nid,
3323                                       cfg->prcfg_cfg_nid,
3324                                       cfg->prcfg_mr);
3325                 mutex_unlock(&the_lnet.ln_api_mutex);
3326                 return rc;
3327         }
3328
3329         case IOC_LIBCFS_DEL_PEER_NI: {
3330                 struct lnet_ioctl_peer_cfg *cfg = arg;
3331
3332                 if (cfg->prcfg_hdr.ioc_len < sizeof(*cfg))
3333                         return -EINVAL;
3334
3335                 mutex_lock(&the_lnet.ln_api_mutex);
3336                 rc = lnet_del_peer_ni(cfg->prcfg_prim_nid,
3337                                       cfg->prcfg_cfg_nid);
3338                 mutex_unlock(&the_lnet.ln_api_mutex);
3339                 return rc;
3340         }
3341
3342         case IOC_LIBCFS_GET_PEER_INFO: {
3343                 struct lnet_ioctl_peer *peer_info = arg;
3344
3345                 if (peer_info->pr_hdr.ioc_len < sizeof(*peer_info))
3346                         return -EINVAL;
3347
3348                 mutex_lock(&the_lnet.ln_api_mutex);
3349                 rc = lnet_get_peer_ni_info(
3350                    peer_info->pr_count,
3351                    &peer_info->pr_nid,
3352                    peer_info->pr_lnd_u.pr_peer_credits.cr_aliveness,
3353                    &peer_info->pr_lnd_u.pr_peer_credits.cr_ncpt,
3354                    &peer_info->pr_lnd_u.pr_peer_credits.cr_refcount,
3355                    &peer_info->pr_lnd_u.pr_peer_credits.cr_ni_peer_tx_credits,
3356                    &peer_info->pr_lnd_u.pr_peer_credits.cr_peer_tx_credits,
3357                    &peer_info->pr_lnd_u.pr_peer_credits.cr_peer_rtr_credits,
3358                    &peer_info->pr_lnd_u.pr_peer_credits.cr_peer_min_tx_credits,
3359                    &peer_info->pr_lnd_u.pr_peer_credits.cr_peer_tx_qnob);
3360                 mutex_unlock(&the_lnet.ln_api_mutex);
3361                 return rc;
3362         }
3363
3364         case IOC_LIBCFS_GET_PEER_NI: {
3365                 struct lnet_ioctl_peer_cfg *cfg = arg;
3366
3367                 if (cfg->prcfg_hdr.ioc_len < sizeof(*cfg))
3368                         return -EINVAL;
3369
3370                 mutex_lock(&the_lnet.ln_api_mutex);
3371                 rc = lnet_get_peer_info(cfg,
3372                                         (void __user *)cfg->prcfg_bulk);
3373                 mutex_unlock(&the_lnet.ln_api_mutex);
3374                 return rc;
3375         }
3376
3377         case IOC_LIBCFS_GET_PEER_LIST: {
3378                 struct lnet_ioctl_peer_cfg *cfg = arg;
3379
3380                 if (cfg->prcfg_hdr.ioc_len < sizeof(*cfg))
3381                         return -EINVAL;
3382
3383                 mutex_lock(&the_lnet.ln_api_mutex);
3384                 rc = lnet_get_peer_list(&cfg->prcfg_count, &cfg->prcfg_size,
3385                                 (struct lnet_process_id __user *)cfg->prcfg_bulk);
3386                 mutex_unlock(&the_lnet.ln_api_mutex);
3387                 return rc;
3388         }
3389
3390         case IOC_LIBCFS_NOTIFY_ROUTER: {
3391                 time64_t deadline = ktime_get_real_seconds() - data->ioc_u64[0];
3392
3393                 /* The deadline passed in by the user should be some time in
3394                  * seconds in the future since the UNIX epoch. We have to map
3395                  * that deadline to the wall clock.
3396                  */
3397                 deadline += ktime_get_seconds();
3398                 return lnet_notify(NULL, data->ioc_nid, data->ioc_flags,
3399                                    deadline);
3400         }
3401
3402         case IOC_LIBCFS_LNET_DIST:
3403                 rc = LNetDist(data->ioc_nid, &data->ioc_nid, &data->ioc_u32[1]);
3404                 if (rc < 0 && rc != -EHOSTUNREACH)
3405                         return rc;
3406
3407                 data->ioc_u32[0] = rc;
3408                 return 0;
3409
3410         case IOC_LIBCFS_TESTPROTOCOMPAT:
3411                 lnet_net_lock(LNET_LOCK_EX);
3412                 the_lnet.ln_testprotocompat = data->ioc_flags;
3413                 lnet_net_unlock(LNET_LOCK_EX);
3414                 return 0;
3415
3416         case IOC_LIBCFS_LNET_FAULT:
3417                 return lnet_fault_ctl(data->ioc_flags, data);
3418
3419         case IOC_LIBCFS_PING: {
3420                 signed long timeout;
3421
3422                 id.nid = data->ioc_nid;
3423                 id.pid = data->ioc_u32[0];
3424
3425                 /* If timeout is negative then set default of 3 minutes */
3426                 if (((s32)data->ioc_u32[1] <= 0) ||
3427                     data->ioc_u32[1] > (DEFAULT_PEER_TIMEOUT * MSEC_PER_SEC))
3428                         timeout = msecs_to_jiffies(DEFAULT_PEER_TIMEOUT * MSEC_PER_SEC);
3429                 else
3430                         timeout = msecs_to_jiffies(data->ioc_u32[1]);
3431
3432                 rc = lnet_ping(id, timeout, data->ioc_pbuf1,
3433                                data->ioc_plen1 / sizeof(struct lnet_process_id));
3434
3435                 if (rc < 0)
3436                         return rc;
3437
3438                 data->ioc_count = rc;
3439                 return 0;
3440         }
3441
3442         case IOC_LIBCFS_PING_PEER: {
3443                 struct lnet_ioctl_ping_data *ping = arg;
3444                 struct lnet_peer *lp;
3445                 signed long timeout;
3446
3447                 /* If timeout is negative then set default of 3 minutes */
3448                 if (((s32)ping->op_param) <= 0 ||
3449                     ping->op_param > (DEFAULT_PEER_TIMEOUT * MSEC_PER_SEC))
3450                         timeout = msecs_to_jiffies(DEFAULT_PEER_TIMEOUT * MSEC_PER_SEC);
3451                 else
3452                         timeout = msecs_to_jiffies(ping->op_param);
3453
3454                 rc = lnet_ping(ping->ping_id, timeout,
3455                                ping->ping_buf,
3456                                ping->ping_count);
3457                 if (rc < 0)
3458                         return rc;
3459
3460                 mutex_lock(&the_lnet.ln_api_mutex);
3461                 lp = lnet_find_peer(ping->ping_id.nid);
3462                 if (lp) {
3463                         ping->ping_id.nid = lp->lp_primary_nid;
3464                         ping->mr_info = lnet_peer_is_multi_rail(lp);
3465                         lnet_peer_decref_locked(lp);
3466                 }
3467                 mutex_unlock(&the_lnet.ln_api_mutex);
3468
3469                 ping->ping_count = rc;
3470                 return 0;
3471         }
3472
3473         case IOC_LIBCFS_DISCOVER: {
3474                 struct lnet_ioctl_ping_data *discover = arg;
3475                 struct lnet_peer *lp;
3476
3477                 rc = lnet_discover(discover->ping_id, discover->op_param,
3478                                    discover->ping_buf,
3479                                    discover->ping_count);
3480                 if (rc < 0)
3481                         return rc;
3482
3483                 mutex_lock(&the_lnet.ln_api_mutex);
3484                 lp = lnet_find_peer(discover->ping_id.nid);
3485                 if (lp) {
3486                         discover->ping_id.nid = lp->lp_primary_nid;
3487                         discover->mr_info = lnet_peer_is_multi_rail(lp);
3488                         lnet_peer_decref_locked(lp);
3489                 }
3490                 mutex_unlock(&the_lnet.ln_api_mutex);
3491
3492                 discover->ping_count = rc;
3493                 return 0;
3494         }
3495
3496         default:
3497                 ni = lnet_net2ni_addref(data->ioc_net);
3498                 if (ni == NULL)
3499                         return -EINVAL;
3500
3501                 if (ni->ni_net->net_lnd->lnd_ctl == NULL)
3502                         rc = -EINVAL;
3503                 else
3504                         rc = ni->ni_net->net_lnd->lnd_ctl(ni, cmd, arg);
3505
3506                 lnet_ni_decref(ni);
3507                 return rc;
3508         }
3509         /* not reached */
3510 }
3511 EXPORT_SYMBOL(LNetCtl);
3512
/*
 * Dump debugging state for the peer identified by \a id.
 *
 * Only the NID portion of \a id is used; the PID is ignored.  The
 * actual reporting is done by lnet_debug_peer() (defined elsewhere) —
 * presumably it logs the peer's current state; confirm there.
 */
void LNetDebugPeer(struct lnet_process_id id)
{
	lnet_debug_peer(id.nid);
}
EXPORT_SYMBOL(LNetDebugPeer);
3518
3519 /**
3520  * Determine if the specified peer \a nid is on the local node.
3521  *
3522  * \param nid   peer nid to check
3523  *
3524  * \retval true         If peer NID is on the local node.
3525  * \retval false        If peer NID is not on the local node.
3526  */
3527 bool LNetIsPeerLocal(lnet_nid_t nid)
3528 {
3529         struct lnet_net *net;
3530         struct lnet_ni *ni;
3531         int cpt;
3532
3533         cpt = lnet_net_lock_current();
3534         list_for_each_entry(net, &the_lnet.ln_nets, net_list) {
3535                 list_for_each_entry(ni, &net->net_ni_list, ni_netlist) {
3536                         if (ni->ni_nid == nid) {
3537                                 lnet_net_unlock(cpt);
3538                                 return true;
3539                         }
3540                 }
3541         }
3542         lnet_net_unlock(cpt);
3543
3544         return false;
3545 }
3546 EXPORT_SYMBOL(LNetIsPeerLocal);
3547
3548 /**
3549  * Retrieve the struct lnet_process_id ID of LNet interface at \a index.
3550  * Note that all interfaces share a same PID, as requested by LNetNIInit().
3551  *
3552  * \param index Index of the interface to look up.
3553  * \param id On successful return, this location will hold the
3554  * struct lnet_process_id ID of the interface.
3555  *
3556  * \retval 0 If an interface exists at \a index.
3557  * \retval -ENOENT If no interface has been found.
3558  */
3559 int
3560 LNetGetId(unsigned int index, struct lnet_process_id *id)
3561 {
3562         struct lnet_ni   *ni;
3563         struct lnet_net  *net;
3564         int               cpt;
3565         int               rc = -ENOENT;
3566
3567         LASSERT(the_lnet.ln_refcount > 0);
3568
3569         cpt = lnet_net_lock_current();
3570
3571         list_for_each_entry(net, &the_lnet.ln_nets, net_list) {
3572                 list_for_each_entry(ni, &net->net_ni_list, ni_netlist) {
3573                         if (index-- != 0)
3574                                 continue;
3575
3576                         id->nid = ni->ni_nid;
3577                         id->pid = the_lnet.ln_pid;
3578                         rc = 0;
3579                         break;
3580                 }
3581         }
3582
3583         lnet_net_unlock(cpt);
3584         return rc;
3585 }
3586 EXPORT_SYMBOL(LNetGetId);
3587
/*
 * Ping the peer \a id: issue an LNetGet against the reserved ping
 * portal and copy up to \a n_ids lnet_process_id entries from the
 * REPLY into the user-space array \a ids.
 *
 * \param id      peer to ping; LNET_PID_ANY is replaced with
 *                LNET_PID_LUSTRE.
 * \param timeout jiffies to wait for the REPLY event.
 * \param ids     user-space buffer receiving the peer's interface IDs.
 * \param n_ids   capacity of \a ids (capped at lnet_interfaces_max).
 *
 * \return the number of NIs the peer reported (pi_nnis) on success,
 *         or a negative errno on failure.
 */
static int lnet_ping(struct lnet_process_id id, signed long timeout,
		     struct lnet_process_id __user *ids, int n_ids)
{
	struct lnet_handle_eq eqh;
	struct lnet_handle_md mdh;
	struct lnet_event event;
	struct lnet_md md = { NULL };
	int which;
	int unlinked = 0;	/* MD unlink has been requested */
	int replied = 0;	/* a REPLY event was seen */
	const signed long a_long_time = msecs_to_jiffies(60 * MSEC_PER_SEC);
	struct lnet_ping_buffer *pbuf;
	struct lnet_process_id tmpid;
	int i;
	int nob;		/* bytes actually received in the REPLY */
	int rc;
	int rc2;
	sigset_t blocked;

	/* n_ids limit is arbitrary */
	if (n_ids <= 0 || id.nid == LNET_NID_ANY)
		return -EINVAL;

	/*
	 * if the user buffer has more space than the lnet_interfaces_max
	 * then only fill it up to lnet_interfaces_max
	 */
	if (n_ids > lnet_interfaces_max)
		n_ids = lnet_interfaces_max;

	if (id.pid == LNET_PID_ANY)
		id.pid = LNET_PID_LUSTRE;

	pbuf = lnet_ping_buffer_alloc(n_ids, GFP_NOFS);
	if (!pbuf)
		return -ENOMEM;

	/* NB 2 events max (including any unlink event) */
	rc = LNetEQAlloc(2, LNET_EQ_HANDLER_NONE, &eqh);
	if (rc != 0) {
		CERROR("Can't allocate EQ: %d\n", rc);
		goto fail_ping_buffer_decref;
	}

	/* initialize md content */
	md.start     = &pbuf->pb_info;
	md.length    = LNET_PING_INFO_SIZE(n_ids);
	md.threshold = 2; /* GET/REPLY */
	md.max_size  = 0;
	md.options   = LNET_MD_TRUNCATE; /* accept (and clip) larger replies */
	md.user_ptr  = NULL;
	md.eq_handle = eqh;

	rc = LNetMDBind(md, LNET_UNLINK, &mdh);
	if (rc != 0) {
		CERROR("Can't bind MD: %d\n", rc);
		goto fail_free_eq;
	}

	rc = LNetGet(LNET_NID_ANY, mdh, id,
		     LNET_RESERVED_PORTAL,
		     LNET_PROTO_PING_MATCHBITS, 0, false);

	if (rc != 0) {
		/* Don't CERROR; this could be deliberate! */
		rc2 = LNetMDUnlink(mdh);
		LASSERT(rc2 == 0);

		/* NB must wait for the UNLINK event below... */
		unlinked = 1;
		timeout = a_long_time;
	}

	/*
	 * Poll until the MD has been unlinked.  On error or timeout the
	 * loop first requests the unlink and then keeps polling (with a
	 * generous timeout) until the unlink event arrives, so the MD is
	 * never left referencing pbuf after we return.
	 */
	do {
		/* MUST block for unlink to complete */
		if (unlinked)
			blocked = cfs_block_allsigs();

		rc2 = LNetEQPoll(&eqh, 1, timeout, &event, &which);

		if (unlinked)
			cfs_restore_sigs(blocked);

		CDEBUG(D_NET, "poll %d(%d %d)%s\n", rc2,
		       (rc2 <= 0) ? -1 : event.type,
		       (rc2 <= 0) ? -1 : event.status,
		       (rc2 > 0 && event.unlinked) ? " unlinked" : "");

		LASSERT(rc2 != -EOVERFLOW);	/* can't miss anything */

		if (rc2 <= 0 || event.status != 0) {
			/* timeout or error */
			if (!replied && rc == 0)
				rc = (rc2 < 0) ? rc2 :
				     (rc2 == 0) ? -ETIMEDOUT :
				     event.status;

			if (!unlinked) {
				/* Ensure completion in finite time... */
				LNetMDUnlink(mdh);
				/* No assertion (racing with network) */
				unlinked = 1;
				timeout = a_long_time;
			} else if (rc2 == 0) {
				/* timed out waiting for unlink */
				CWARN("ping %s: late network completion\n",
				      libcfs_id2str(id));
			}
		} else if (event.type == LNET_EVENT_REPLY) {
			replied = 1;
			/* received bytes; validated below */
			rc = event.mlength;
		}
	} while (rc2 <= 0 || !event.unlinked);

	if (!replied) {
		if (rc >= 0)
			CWARN("%s: Unexpected rc >= 0 but no reply!\n",
			      libcfs_id2str(id));
		/* NOTE(review): this overwrites any more specific errno
		 * (e.g. -ETIMEDOUT) collected above — confirm intended. */
		rc = -EIO;
		goto fail_free_eq;
	}

	nob = rc;
	LASSERT(nob >= 0 && nob <= LNET_PING_INFO_SIZE(n_ids));

	rc = -EPROTO;		/* if I can't parse... */

	/* need at least pi_magic + pi_features to go any further */
	if (nob < 8) {
		CERROR("%s: ping info too short %d\n",
		       libcfs_id2str(id), nob);
		goto fail_free_eq;
	}

	/* byte-swapped magic means the peer has opposite endianness */
	if (pbuf->pb_info.pi_magic == __swab32(LNET_PROTO_PING_MAGIC)) {
		lnet_swap_pinginfo(pbuf);
	} else if (pbuf->pb_info.pi_magic != LNET_PROTO_PING_MAGIC) {
		CERROR("%s: Unexpected magic %08x\n",
		       libcfs_id2str(id), pbuf->pb_info.pi_magic);
		goto fail_free_eq;
	}

	if ((pbuf->pb_info.pi_features & LNET_PING_FEAT_NI_STATUS) == 0) {
		CERROR("%s: ping w/o NI status: 0x%x\n",
		       libcfs_id2str(id), pbuf->pb_info.pi_features);
		goto fail_free_eq;
	}

	if (nob < LNET_PING_INFO_SIZE(0)) {
		CERROR("%s: Short reply %d(%d min)\n",
		       libcfs_id2str(id),
		       nob, (int)LNET_PING_INFO_SIZE(0));
		goto fail_free_eq;
	}

	/* only copy as many entries as the peer actually reported */
	if (pbuf->pb_info.pi_nnis < n_ids)
		n_ids = pbuf->pb_info.pi_nnis;

	if (nob < LNET_PING_INFO_SIZE(n_ids)) {
		CERROR("%s: Short reply %d(%d expected)\n",
		       libcfs_id2str(id),
		       nob, (int)LNET_PING_INFO_SIZE(n_ids));
		goto fail_free_eq;
	}

	rc = -EFAULT;		/* if I segv in copy_to_user()... */

	memset(&tmpid, 0, sizeof(tmpid));
	for (i = 0; i < n_ids; i++) {
		tmpid.pid = pbuf->pb_info.pi_pid;
		tmpid.nid = pbuf->pb_info.pi_ni[i].ns_nid;
		if (copy_to_user(&ids[i], &tmpid, sizeof(tmpid)))
			goto fail_free_eq;
	}
	/* success: report the peer's full NI count, which may exceed
	 * the number of entries copied to user space */
	rc = pbuf->pb_info.pi_nnis;

 fail_free_eq:
	rc2 = LNetEQFree(eqh);
	if (rc2 != 0)
		CERROR("rc2 %d\n", rc2);
	LASSERT(rc2 == 0);

 fail_ping_buffer_decref:
	lnet_ping_buffer_decref(pbuf);
	return rc;
}
3773
/*
 * Trigger (re)discovery of the peer \a id and copy the discovered
 * interface IDs to the user-space array \a ids.
 *
 * \param id    peer to discover; LNET_PID_ANY is replaced with
 *              LNET_PID_LUSTRE.
 * \param force when non-zero, force a fresh PING and PUSH even if the
 *              cached peer data looks current.
 * \param ids   user-space buffer receiving the peer's interface IDs.
 * \param n_ids capacity of \a ids (capped at lnet_interfaces_max).
 *
 * \return number of entries copied to \a ids on success, negative
 *         errno on failure.
 */
static int
lnet_discover(struct lnet_process_id id, __u32 force,
	      struct lnet_process_id __user *ids, int n_ids)
{
	struct lnet_peer_ni *lpni;
	struct lnet_peer_ni *p;
	struct lnet_peer *lp;
	struct lnet_process_id *buf;	/* kernel staging buffer */
	int cpt;
	int i;
	int rc;
	int max_intf = lnet_interfaces_max;
	size_t buf_size;

	if (n_ids <= 0 ||
	    id.nid == LNET_NID_ANY)
		return -EINVAL;

	if (id.pid == LNET_PID_ANY)
		id.pid = LNET_PID_LUSTRE;

	/*
	 * if the user buffer has more space than the max_intf
	 * then only fill it up to max_intf
	 */
	if (n_ids > max_intf)
		n_ids = max_intf;

	buf_size = n_ids * sizeof(*buf);

	LIBCFS_ALLOC(buf, buf_size);
	if (!buf)
		return -ENOMEM;

	/* look up (or create) the peer NI under the net lock */
	cpt = lnet_net_lock_current();
	lpni = lnet_nid2peerni_locked(id.nid, LNET_NID_ANY, cpt);
	if (IS_ERR(lpni)) {
		rc = PTR_ERR(lpni);
		goto out;
	}

	/*
	 * Clearing the NIDS_UPTODATE flag ensures the peer will
	 * be discovered, provided discovery has not been disabled.
	 */
	lp = lpni->lpni_peer_net->lpn_peer;
	spin_lock(&lp->lp_lock);
	lp->lp_state &= ~LNET_PEER_NIDS_UPTODATE;
	/* If the force flag is set, force a PING and PUSH as well. */
	if (force)
		lp->lp_state |= LNET_PEER_FORCE_PING | LNET_PEER_FORCE_PUSH;
	spin_unlock(&lp->lp_lock);
	rc = lnet_discover_peer_locked(lpni, cpt, true);
	if (rc)
		goto out_decref;

	/* Peer may have changed. */
	lp = lpni->lpni_peer_net->lpn_peer;
	if (lp->lp_nnis < n_ids)
		n_ids = lp->lp_nnis;

	/* collect up to n_ids NIDs into the staging buffer */
	i = 0;
	p = NULL;
	while ((p = lnet_get_next_peer_ni_locked(lp, NULL, p)) != NULL) {
		buf[i].pid = id.pid;
		buf[i].nid = p->lpni_nid;
		if (++i >= n_ids)
			break;
	}

	/* drop the net lock: copy_to_user() may sleep on a fault */
	lnet_net_unlock(cpt);

	rc = -EFAULT;
	if (copy_to_user(ids, buf, n_ids * sizeof(*buf)))
		goto out_relock;
	rc = n_ids;
out_relock:
	/* the peer-NI decref below requires the net lock again */
	lnet_net_lock(cpt);
out_decref:
	lnet_peer_ni_decref_locked(lpni);
out:
	lnet_net_unlock(cpt);

	LIBCFS_FREE(buf, buf_size);

	return rc;
}