/* Whamcloud gitweb capture:
 * LU-10391 socklnd: use interface index to track local addr
 * [fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
 */
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/klnds/socklnd/socklnd.c
33  *
34  * Author: Zach Brown <zab@zabbo.net>
35  * Author: Peter J. Braam <braam@clusterfs.com>
36  * Author: Phil Schwan <phil@clusterfs.com>
37  * Author: Eric Barton <eric@bartonsoftware.com>
38  */
39
40 #include "socklnd.h"
41 #include <linux/inetdevice.h>
42
43 static const struct lnet_lnd the_ksocklnd;
44 struct ksock_nal_data ksocknal_data;
45
46 static struct ksock_interface *
47 ksocknal_ip2iface(struct lnet_ni *ni, __u32 ip)
48 {
49         struct ksock_net *net = ni->ni_data;
50         int i;
51         struct ksock_interface *iface;
52
53         for (i = 0; i < net->ksnn_ninterfaces; i++) {
54                 LASSERT(i < LNET_INTERFACES_NUM);
55                 iface = &net->ksnn_interfaces[i];
56
57                 if (iface->ksni_ipaddr == ip)
58                         return iface;
59         }
60
61         return NULL;
62 }
63
64 static struct ksock_interface *
65 ksocknal_index2iface(struct lnet_ni *ni, int index)
66 {
67         struct ksock_net *net = ni->ni_data;
68         int i;
69         struct ksock_interface *iface;
70
71         for (i = 0; i < net->ksnn_ninterfaces; i++) {
72                 LASSERT(i < LNET_INTERFACES_NUM);
73                 iface = &net->ksnn_interfaces[i];
74
75                 if (iface->ksni_index == index)
76                         return iface;
77         }
78
79         return NULL;
80 }
81
/**
 * Map an IPv4 address to the ifindex of the net_device owning it in
 * @ni's network namespace.
 *
 * Loopback and administratively-down devices are skipped.  The whole
 * walk runs under rcu_read_lock(), so no device reference is held on
 * return.
 *
 * \param ipaddress	IPv4 address to match (host byte order — the
 *			comparison uses ntohl() on ifa_local)
 * \param ni		LNet NI whose namespace is searched
 *
 * \retval	ifindex of the matching device
 * \retval	-1 if no up, non-loopback device carries the address
 */
static int ksocknal_ip2index(__u32 ipaddress, struct lnet_ni *ni)
{
	struct net_device *dev;
	int ret = -1;
	DECLARE_CONST_IN_IFADDR(ifa);

	rcu_read_lock();
	for_each_netdev(ni->ni_net_ns, dev) {
		int flags = dev_get_flags(dev);
		struct in_device *in_dev;

		if (flags & IFF_LOOPBACK) /* skip the loopback IF */
			continue;

		if (!(flags & IFF_UP))
			continue;

		in_dev = __in_dev_get_rcu(dev);
		if (!in_dev)
			continue;

		/* scan all addresses on this device; last match wins,
		 * but all matches map to the same dev->ifindex anyway */
		in_dev_for_each_ifa_rcu(ifa, in_dev) {
			if (ntohl(ifa->ifa_local) == ipaddress)
				ret = dev->ifindex;
		}
		endfor_ifa(in_dev);
		if (ret >= 0)
			break;
	}
	rcu_read_unlock();

	return ret;
}
115
116 static struct ksock_route *
117 ksocknal_create_route(__u32 ipaddr, int port)
118 {
119         struct ksock_route *route;
120
121         LIBCFS_ALLOC (route, sizeof (*route));
122         if (route == NULL)
123                 return (NULL);
124
125         atomic_set (&route->ksnr_refcount, 1);
126         route->ksnr_peer = NULL;
127         route->ksnr_retry_interval = 0;         /* OK to connect at any time */
128         route->ksnr_ipaddr = ipaddr;
129         route->ksnr_myiface = -1;
130         route->ksnr_port = port;
131         route->ksnr_scheduled = 0;
132         route->ksnr_connecting = 0;
133         route->ksnr_connected = 0;
134         route->ksnr_deleted = 0;
135         route->ksnr_conn_count = 0;
136         route->ksnr_share_count = 0;
137
138         return route;
139 }
140
141 void
142 ksocknal_destroy_route(struct ksock_route *route)
143 {
144         LASSERT (atomic_read(&route->ksnr_refcount) == 0);
145
146         if (route->ksnr_peer != NULL)
147                 ksocknal_peer_decref(route->ksnr_peer);
148
149         LIBCFS_FREE (route, sizeof (*route));
150 }
151
152 static struct ksock_peer_ni *
153 ksocknal_create_peer(struct lnet_ni *ni, struct lnet_process_id id)
154 {
155         int cpt = lnet_cpt_of_nid(id.nid, ni);
156         struct ksock_net *net = ni->ni_data;
157         struct ksock_peer_ni *peer_ni;
158
159         LASSERT(id.nid != LNET_NID_ANY);
160         LASSERT(id.pid != LNET_PID_ANY);
161         LASSERT(!in_interrupt());
162
163         if (!atomic_inc_unless_negative(&net->ksnn_npeers)) {
164                 CERROR("Can't create peer_ni: network shutdown\n");
165                 return ERR_PTR(-ESHUTDOWN);
166         }
167
168         LIBCFS_CPT_ALLOC(peer_ni, lnet_cpt_table(), cpt, sizeof(*peer_ni));
169         if (!peer_ni) {
170                 atomic_dec(&net->ksnn_npeers);
171                 return ERR_PTR(-ENOMEM);
172         }
173
174         peer_ni->ksnp_ni = ni;
175         peer_ni->ksnp_id = id;
176         atomic_set(&peer_ni->ksnp_refcount, 1); /* 1 ref for caller */
177         peer_ni->ksnp_closing = 0;
178         peer_ni->ksnp_accepting = 0;
179         peer_ni->ksnp_proto = NULL;
180         peer_ni->ksnp_last_alive = 0;
181         peer_ni->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;
182
183         INIT_LIST_HEAD(&peer_ni->ksnp_conns);
184         INIT_LIST_HEAD(&peer_ni->ksnp_routes);
185         INIT_LIST_HEAD(&peer_ni->ksnp_tx_queue);
186         INIT_LIST_HEAD(&peer_ni->ksnp_zc_req_list);
187         spin_lock_init(&peer_ni->ksnp_lock);
188
189         return peer_ni;
190 }
191
/**
 * Final destruction of a peer_ni once its refcount has reached zero.
 *
 * Frees the structure and then drops its count on net->ksnn_npeers;
 * if that reaches zero this wakes anyone waiting (in shutdown) for the
 * peer count to drain.
 */
void
ksocknal_destroy_peer(struct ksock_peer_ni *peer_ni)
{
	struct ksock_net *net = peer_ni->ksnp_ni->ni_data;

	CDEBUG (D_NET, "peer_ni %s %p deleted\n",
		libcfs_id2str(peer_ni->ksnp_id), peer_ni);

	/* must be completely idle: no refs, conns, routes or queued txs */
	LASSERT(atomic_read(&peer_ni->ksnp_refcount) == 0);
	LASSERT(peer_ni->ksnp_accepting == 0);
	LASSERT(list_empty(&peer_ni->ksnp_conns));
	LASSERT(list_empty(&peer_ni->ksnp_routes));
	LASSERT(list_empty(&peer_ni->ksnp_tx_queue));
	LASSERT(list_empty(&peer_ni->ksnp_zc_req_list));

	LIBCFS_FREE(peer_ni, sizeof(*peer_ni));

	/* NB a peer_ni's connections and routes keep a reference on their
	 * peer_ni until they are destroyed, so we can be assured that _all_
	 * state to do with this peer_ni has been cleaned up when its refcount
	 * drops to zero.
	 */
	if (atomic_dec_and_test(&net->ksnn_npeers))
		wake_up_var(&net->ksnn_npeers);
}
217
218 struct ksock_peer_ni *
219 ksocknal_find_peer_locked(struct lnet_ni *ni, struct lnet_process_id id)
220 {
221         struct ksock_peer_ni *peer_ni;
222
223         hash_for_each_possible(ksocknal_data.ksnd_peers, peer_ni,
224                                ksnp_list, id.nid) {
225                 LASSERT(!peer_ni->ksnp_closing);
226
227                 if (peer_ni->ksnp_ni != ni)
228                         continue;
229
230                 if (peer_ni->ksnp_id.nid != id.nid ||
231                     peer_ni->ksnp_id.pid != id.pid)
232                         continue;
233
234                 CDEBUG(D_NET, "got peer_ni [%p] -> %s (%d)\n",
235                        peer_ni, libcfs_id2str(id),
236                        atomic_read(&peer_ni->ksnp_refcount));
237                 return peer_ni;
238         }
239         return NULL;
240 }
241
242 struct ksock_peer_ni *
243 ksocknal_find_peer(struct lnet_ni *ni, struct lnet_process_id id)
244 {
245         struct ksock_peer_ni *peer_ni;
246
247         read_lock(&ksocknal_data.ksnd_global_lock);
248         peer_ni = ksocknal_find_peer_locked(ni, id);
249         if (peer_ni != NULL)                    /* +1 ref for caller? */
250                 ksocknal_peer_addref(peer_ni);
251         read_unlock(&ksocknal_data.ksnd_global_lock);
252
253         return (peer_ni);
254 }
255
256 static void
257 ksocknal_unlink_peer_locked(struct ksock_peer_ni *peer_ni)
258 {
259         int i;
260         __u32 ip;
261         struct ksock_interface *iface;
262
263         for (i = 0; i < peer_ni->ksnp_n_passive_ips; i++) {
264                 LASSERT(i < LNET_INTERFACES_NUM);
265                 ip = peer_ni->ksnp_passive_ips[i];
266
267                 iface = ksocknal_ip2iface(peer_ni->ksnp_ni, ip);
268                 /*
269                  * All IPs in peer_ni->ksnp_passive_ips[] come from the
270                  * interface list, therefore the call must succeed.
271                  */
272                 LASSERT(iface != NULL);
273
274                 CDEBUG(D_NET, "peer_ni=%p iface=%p ksni_nroutes=%d\n",
275                        peer_ni, iface, iface->ksni_nroutes);
276                 iface->ksni_npeers--;
277         }
278
279         LASSERT(list_empty(&peer_ni->ksnp_conns));
280         LASSERT(list_empty(&peer_ni->ksnp_routes));
281         LASSERT(!peer_ni->ksnp_closing);
282         peer_ni->ksnp_closing = 1;
283         hlist_del(&peer_ni->ksnp_list);
284         /* lose peerlist's ref */
285         ksocknal_peer_decref(peer_ni);
286 }
287
/**
 * Enumerate peer state for userland (ioctl-style iteration).
 *
 * @index is a flat cursor over every reportable item of every peer_ni
 * on @ni: a bare peer_ni with no passive IPs and no routes counts as
 * one item, otherwise each passive IP and each route is an item.  The
 * item selected by @index is copied into the output parameters.
 *
 * \retval 0		item found and outputs filled in (for a route
 *			item, the choose_ipv4_src() result is returned)
 * \retval -ENOENT	@index is beyond the last item
 */
static int
ksocknal_get_peer_info(struct lnet_ni *ni, int index,
		       struct lnet_process_id *id, __u32 *myip, __u32 *peer_ip,
		       int *port, int *conn_count, int *share_count)
{
	struct ksock_peer_ni *peer_ni;
	struct ksock_route *route;
	struct list_head *rtmp;
	int i;
	int j;
	int rc = -ENOENT;

	read_lock(&ksocknal_data.ksnd_global_lock);

	hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {

		if (peer_ni->ksnp_ni != ni)
			continue;

		/* peer_ni with neither passive IPs nor routes: report a
		 * single all-zero entry so it is still visible */
		if (peer_ni->ksnp_n_passive_ips == 0 &&
		    list_empty(&peer_ni->ksnp_routes)) {
			if (index-- > 0)
				continue;

			*id = peer_ni->ksnp_id;
			*myip = 0;
			*peer_ip = 0;
			*port = 0;
			*conn_count = 0;
			*share_count = 0;
			rc = 0;
			goto out;
		}

		/* one entry per passive IP */
		for (j = 0; j < peer_ni->ksnp_n_passive_ips; j++) {
			if (index-- > 0)
				continue;

			*id = peer_ni->ksnp_id;
			*myip = peer_ni->ksnp_passive_ips[j];
			*peer_ip = 0;
			*port = 0;
			*conn_count = 0;
			*share_count = 0;
			rc = 0;
			goto out;
		}

		/* one entry per route; local IP is derived from the
		 * route's bound interface index */
		list_for_each(rtmp, &peer_ni->ksnp_routes) {
			if (index-- > 0)
				continue;

			route = list_entry(rtmp, struct ksock_route,
					   ksnr_list);

			*id = peer_ni->ksnp_id;
			rc = choose_ipv4_src(myip, route->ksnr_myiface,
					     route->ksnr_ipaddr,
					     ni->ni_net_ns);
			*peer_ip = route->ksnr_ipaddr;
			*port = route->ksnr_port;
			*conn_count = route->ksnr_conn_count;
			*share_count = route->ksnr_share_count;
			goto out;
		}
	}
out:
	read_unlock(&ksocknal_data.ksnd_global_lock);
	return rc;
}
358
/**
 * Associate @conn with @route, (re)binding the route to the local
 * interface the connection actually uses.
 *
 * The conn takes a reference on the route.  If the route was bound to
 * a different interface (or none), the per-interface route counts are
 * moved from the old interface to the new one; the interface index is
 * resolved from the conn's local IP via ksocknal_ip2index().
 *
 * NOTE(review): "_locked" suffix — presumably the caller holds
 * ksnd_global_lock in write mode; confirm at call sites.
 */
static void
ksocknal_associate_route_conn_locked(struct ksock_route *route,
				     struct ksock_conn *conn)
{
	struct ksock_peer_ni *peer_ni = route->ksnr_peer;
	int type = conn->ksnc_type;
	struct ksock_interface *iface;
	int conn_iface = ksocknal_ip2index(conn->ksnc_myipaddr,
					   route->ksnr_peer->ksnp_ni);

	conn->ksnc_route = route;
	ksocknal_route_addref(route);

	if (route->ksnr_myiface != conn_iface) {
		if (route->ksnr_myiface < 0) {
			/* route wasn't bound locally yet (the initial route) */
			CDEBUG(D_NET, "Binding %s %pI4h to interface %d\n",
			       libcfs_id2str(peer_ni->ksnp_id),
			       &route->ksnr_ipaddr,
			       conn_iface);
		} else {
			CDEBUG(D_NET,
			       "Rebinding %s %pI4h from interface %d to %d\n",
			       libcfs_id2str(peer_ni->ksnp_id),
			       &route->ksnr_ipaddr,
			       route->ksnr_myiface,
			       conn_iface);

			/* drop the route count on the old interface */
			iface = ksocknal_index2iface(route->ksnr_peer->ksnp_ni,
						     route->ksnr_myiface);
			if (iface)
				iface->ksni_nroutes--;
		}
		route->ksnr_myiface = conn_iface;
		/* take a route count on the new interface (lookup may
		 * fail if conn_iface is -1 or not a configured iface) */
		iface = ksocknal_index2iface(route->ksnr_peer->ksnp_ni,
					     route->ksnr_myiface);
		if (iface)
			iface->ksni_nroutes++;
	}

	/* record one more established conn of this type */
	route->ksnr_connected |= (1<<type);
	route->ksnr_conn_count++;

	/* Successful connection => further attempts can
	 * proceed immediately
	 */
	route->ksnr_retry_interval = 0;
}
407
408 static void
409 ksocknal_add_route_locked(struct ksock_peer_ni *peer_ni, struct ksock_route *route)
410 {
411         struct list_head *tmp;
412         struct ksock_conn *conn;
413         struct ksock_route *route2;
414
415         LASSERT(!peer_ni->ksnp_closing);
416         LASSERT(route->ksnr_peer == NULL);
417         LASSERT(!route->ksnr_scheduled);
418         LASSERT(!route->ksnr_connecting);
419         LASSERT(route->ksnr_connected == 0);
420
421         /* LASSERT(unique) */
422         list_for_each(tmp, &peer_ni->ksnp_routes) {
423                 route2 = list_entry(tmp, struct ksock_route, ksnr_list);
424
425                 if (route2->ksnr_ipaddr == route->ksnr_ipaddr) {
426                         CERROR("Duplicate route %s %pI4h\n",
427                                libcfs_id2str(peer_ni->ksnp_id),
428                                &route->ksnr_ipaddr);
429                         LBUG();
430                 }
431         }
432
433         route->ksnr_peer = peer_ni;
434         ksocknal_peer_addref(peer_ni);
435         /* peer_ni's routelist takes over my ref on 'route' */
436         list_add_tail(&route->ksnr_list, &peer_ni->ksnp_routes);
437
438         list_for_each(tmp, &peer_ni->ksnp_conns) {
439                 conn = list_entry(tmp, struct ksock_conn, ksnc_list);
440
441                 if (conn->ksnc_ipaddr != route->ksnr_ipaddr)
442                         continue;
443
444                 ksocknal_associate_route_conn_locked(route, conn);
445                 /* keep going (typed routes) */
446         }
447 }
448
/**
 * Delete @route from its peer_ni, closing every conn bound to it and
 * releasing the route's count on its bound local interface.
 *
 * Drops the peer_ni route-list reference on the route; if this leaves
 * the peer_ni with no routes and no conns, the peer_ni is unlinked
 * from the global table as well.
 */
static void
ksocknal_del_route_locked(struct ksock_route *route)
{
	struct ksock_peer_ni *peer_ni = route->ksnr_peer;
	struct ksock_interface *iface;
	struct ksock_conn *conn;
	struct list_head *ctmp;
	struct list_head *cnxt;

	LASSERT(!route->ksnr_deleted);

	/* Close associated conns */
	list_for_each_safe(ctmp, cnxt, &peer_ni->ksnp_conns) {
		conn = list_entry(ctmp, struct ksock_conn, ksnc_list);

		if (conn->ksnc_route != route)
			continue;

		ksocknal_close_conn_locked(conn, 0);
	}

	/* release the route count on the interface this route was bound
	 * to (myiface < 0 means it was never bound) */
	if (route->ksnr_myiface >= 0) {
		iface = ksocknal_index2iface(route->ksnr_peer->ksnp_ni,
					     route->ksnr_myiface);
		if (iface)
			iface->ksni_nroutes--;
	}

	route->ksnr_deleted = 1;
	list_del(&route->ksnr_list);
	ksocknal_route_decref(route);		/* drop peer_ni's ref */

	if (list_empty(&peer_ni->ksnp_routes) &&
	    list_empty(&peer_ni->ksnp_conns)) {
		/* I've just removed the last route to a peer_ni with no active
		 * connections */
		ksocknal_unlink_peer_locked(peer_ni);
	}
}
488
489 int
490 ksocknal_add_peer(struct lnet_ni *ni, struct lnet_process_id id, __u32 ipaddr,
491                   int port)
492 {
493         struct list_head *tmp;
494         struct ksock_peer_ni *peer_ni;
495         struct ksock_peer_ni *peer2;
496         struct ksock_route *route;
497         struct ksock_route *route2;
498
499         if (id.nid == LNET_NID_ANY ||
500             id.pid == LNET_PID_ANY)
501                 return (-EINVAL);
502
503         /* Have a brand new peer_ni ready... */
504         peer_ni = ksocknal_create_peer(ni, id);
505         if (IS_ERR(peer_ni))
506                 return PTR_ERR(peer_ni);
507
508         route = ksocknal_create_route (ipaddr, port);
509         if (route == NULL) {
510                 ksocknal_peer_decref(peer_ni);
511                 return (-ENOMEM);
512         }
513
514         write_lock_bh(&ksocknal_data.ksnd_global_lock);
515
516         /* always called with a ref on ni, so shutdown can't have started */
517         LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers)
518                 >= 0);
519
520         peer2 = ksocknal_find_peer_locked(ni, id);
521         if (peer2 != NULL) {
522                 ksocknal_peer_decref(peer_ni);
523                 peer_ni = peer2;
524         } else {
525                 /* peer_ni table takes my ref on peer_ni */
526                 hash_add(ksocknal_data.ksnd_peers, &peer_ni->ksnp_list, id.nid);
527         }
528
529         route2 = NULL;
530         list_for_each(tmp, &peer_ni->ksnp_routes) {
531                 route2 = list_entry(tmp, struct ksock_route, ksnr_list);
532
533                 if (route2->ksnr_ipaddr == ipaddr)
534                         break;
535
536                 route2 = NULL;
537         }
538         if (route2 == NULL) {
539                 ksocknal_add_route_locked(peer_ni, route);
540                 route->ksnr_share_count++;
541         } else {
542                 ksocknal_route_decref(route);
543                 route2->ksnr_share_count++;
544         }
545
546         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
547
548         return 0;
549 }
550
/**
 * Delete routes (and their conns) to @peer_ni.
 *
 * @ip selects a single route to delete; @ip == 0 deletes them all.
 * If no explicitly shared routes remain afterwards, every leftover
 * auto-created route and every remaining conn is torn down too.
 * The peer_ni unlinks itself when its last conn/route goes.
 */
static void
ksocknal_del_peer_locked(struct ksock_peer_ni *peer_ni, __u32 ip)
{
	struct ksock_conn *conn;
	struct ksock_route *route;
	struct list_head *tmp;
	struct list_head *nxt;
	int nshared;

	LASSERT(!peer_ni->ksnp_closing);

	/* Extra ref prevents peer_ni disappearing until I'm done with it */
	ksocknal_peer_addref(peer_ni);

	/* pass 1: delete the selected route(s) */
	list_for_each_safe(tmp, nxt, &peer_ni->ksnp_routes) {
		route = list_entry(tmp, struct ksock_route, ksnr_list);

		/* no match */
		if (!(ip == 0 || route->ksnr_ipaddr == ip))
			continue;

		route->ksnr_share_count = 0;
		/* This deletes associated conns too */
		ksocknal_del_route_locked(route);
	}

	/* pass 2: count shares still held on the surviving routes */
	nshared = 0;
	list_for_each_safe(tmp, nxt, &peer_ni->ksnp_routes) {
		route = list_entry(tmp, struct ksock_route, ksnr_list);
		nshared += route->ksnr_share_count;
	}

	if (nshared == 0) {
		/* remove everything else if there are no explicit entries
		 * left */

		list_for_each_safe(tmp, nxt, &peer_ni->ksnp_routes) {
			route = list_entry(tmp, struct ksock_route, ksnr_list);

			/* we should only be removing auto-entries */
			LASSERT(route->ksnr_share_count == 0);
			ksocknal_del_route_locked(route);
		}

		list_for_each_safe(tmp, nxt, &peer_ni->ksnp_conns) {
			conn = list_entry(tmp, struct ksock_conn, ksnc_list);

			ksocknal_close_conn_locked(conn, 0);
		}
	}

	ksocknal_peer_decref(peer_ni);
	/* NB peer_ni unlinks itself when last conn/route is removed */
}
605
/**
 * Delete peers matching @id (either field may be the ANY wildcard) on
 * @ni, restricted to routes to @ip (0 == all IPs).
 *
 * A wildcard NID scans the whole peer hash; a specific NID scans only
 * its bucket.  Any txs still queued on a peer_ni that got closed are
 * spliced onto a local zombie list and completed with -ENETDOWN after
 * the global lock is dropped.
 *
 * \retval 0		at least one peer matched
 * \retval -ENOENT	nothing matched
 */
static int
ksocknal_del_peer(struct lnet_ni *ni, struct lnet_process_id id, __u32 ip)
{
	LIST_HEAD(zombies);
	struct hlist_node *pnxt;
	struct ksock_peer_ni *peer_ni;
	int lo;
	int hi;
	int i;
	int rc = -ENOENT;

	write_lock_bh(&ksocknal_data.ksnd_global_lock);

	/* restrict the scan to one hash bucket for a specific NID */
	if (id.nid != LNET_NID_ANY) {
		lo = hash_min(id.nid, HASH_BITS(ksocknal_data.ksnd_peers));
		hi = lo;
	} else {
		lo = 0;
		hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
	}

	for (i = lo; i <= hi; i++) {
		hlist_for_each_entry_safe(peer_ni, pnxt,
					  &ksocknal_data.ksnd_peers[i],
					  ksnp_list) {
			if (peer_ni->ksnp_ni != ni)
				continue;

			if (!((id.nid == LNET_NID_ANY ||
			       peer_ni->ksnp_id.nid == id.nid) &&
			      (id.pid == LNET_PID_ANY ||
			       peer_ni->ksnp_id.pid == id.pid)))
				continue;

			ksocknal_peer_addref(peer_ni);	/* a ref for me... */

			ksocknal_del_peer_locked(peer_ni, ip);

			/* collect txs stranded on a now-closing peer_ni */
			if (peer_ni->ksnp_closing &&
			    !list_empty(&peer_ni->ksnp_tx_queue)) {
				LASSERT(list_empty(&peer_ni->ksnp_conns));
				LASSERT(list_empty(&peer_ni->ksnp_routes));

				list_splice_init(&peer_ni->ksnp_tx_queue,
						 &zombies);
			}

			ksocknal_peer_decref(peer_ni);	/* ...till here */

			rc = 0;				/* matched! */
		}
	}

	write_unlock_bh(&ksocknal_data.ksnd_global_lock);

	/* complete stranded txs outside the lock */
	ksocknal_txlist_done(ni, &zombies, -ENETDOWN);

	return rc;
}
665
666 static struct ksock_conn *
667 ksocknal_get_conn_by_idx(struct lnet_ni *ni, int index)
668 {
669         struct ksock_peer_ni *peer_ni;
670         struct ksock_conn *conn;
671         struct list_head *ctmp;
672         int i;
673
674         read_lock(&ksocknal_data.ksnd_global_lock);
675
676         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
677                 LASSERT(!peer_ni->ksnp_closing);
678
679                 if (peer_ni->ksnp_ni != ni)
680                         continue;
681
682                 list_for_each(ctmp, &peer_ni->ksnp_conns) {
683                         if (index-- > 0)
684                                 continue;
685
686                         conn = list_entry(ctmp, struct ksock_conn,
687                                           ksnc_list);
688                         ksocknal_conn_addref(conn);
689                         read_unlock(&ksocknal_data.ksnd_global_lock);
690                         return conn;
691                 }
692         }
693
694         read_unlock(&ksocknal_data.ksnd_global_lock);
695         return NULL;
696 }
697
698 static struct ksock_sched *
699 ksocknal_choose_scheduler_locked(unsigned int cpt)
700 {
701         struct ksock_sched *sched = ksocknal_data.ksnd_schedulers[cpt];
702         int i;
703
704         if (sched->kss_nthreads == 0) {
705                 cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
706                         if (sched->kss_nthreads > 0) {
707                                 CDEBUG(D_NET, "scheduler[%d] has no threads. selected scheduler[%d]\n",
708                                        cpt, sched->kss_cpt);
709                                 return sched;
710                         }
711                 }
712                 return NULL;
713         }
714
715         return sched;
716 }
717
718 static int
719 ksocknal_local_ipvec(struct lnet_ni *ni, __u32 *ipaddrs)
720 {
721         struct ksock_net *net = ni->ni_data;
722         int i;
723         int nip;
724
725         read_lock(&ksocknal_data.ksnd_global_lock);
726
727         nip = net->ksnn_ninterfaces;
728         LASSERT(nip <= LNET_INTERFACES_NUM);
729
730         /*
731          * Only offer interfaces for additional connections if I have
732          * more than one.
733          */
734         if (nip < 2) {
735                 read_unlock(&ksocknal_data.ksnd_global_lock);
736                 return 0;
737         }
738
739         for (i = 0; i < nip; i++) {
740                 ipaddrs[i] = net->ksnn_interfaces[i].ksni_ipaddr;
741                 LASSERT(ipaddrs[i] != 0);
742         }
743
744         read_unlock(&ksocknal_data.ksnd_global_lock);
745         return nip;
746 }
747
748 static int
749 ksocknal_match_peerip(struct ksock_interface *iface, __u32 *ips, int nips)
750 {
751         int best_netmatch = 0;
752         int best_xor = 0;
753         int best = -1;
754         int this_xor;
755         int this_netmatch;
756         int i;
757
758         for (i = 0; i < nips; i++) {
759                 if (ips[i] == 0)
760                         continue;
761
762                 this_xor = (ips[i] ^ iface->ksni_ipaddr);
763                 this_netmatch = ((this_xor & iface->ksni_netmask) == 0) ? 1 : 0;
764
765                 if (!(best < 0 ||
766                       best_netmatch < this_netmatch ||
767                       (best_netmatch == this_netmatch &&
768                        best_xor > this_xor)))
769                         continue;
770
771                 best = i;
772                 best_netmatch = this_netmatch;
773                 best_xor = this_xor;
774         }
775
776         LASSERT (best >= 0);
777         return (best);
778 }
779
/**
 * Select which of this node's interface IPs to pair with the peer's
 * advertised IPs, recording them in peer_ni->ksnp_passive_ips.
 *
 * Previously-selected ("old") interfaces are kept and re-matched
 * first; new interfaces are then chosen to cover remaining peer IPs,
 * preferring a subnet match and then the least-loaded interface
 * (fewest peers).  On return @peerips is overwritten with the local
 * selections.
 *
 * \retval number of local IPs selected (0 when this node has fewer
 *	   than two interfaces — no additional connections offered)
 */
static int
ksocknal_select_ips(struct ksock_peer_ni *peer_ni, __u32 *peerips, int n_peerips)
{
	rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
	struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
	struct ksock_interface *iface;
	struct ksock_interface *best_iface;
	int n_ips;
	int i;
	int j;
	int k;
	u32 ip;
	u32 xor;
	int this_netmatch;
	int best_netmatch;
	int best_npeers;

	/* CAVEAT EMPTOR: We do all our interface matching with an
	 * exclusive hold of global lock at IRQ priority.  We're only
	 * expecting to be dealing with small numbers of interfaces, so the
	 * O(n**3)-ness shouldn't matter */

	/* Also note that I'm not going to return more than n_peerips
	 * interfaces, even if I have more myself */

	write_lock_bh(global_lock);

	LASSERT(n_peerips <= LNET_INTERFACES_NUM);
	LASSERT(net->ksnn_ninterfaces <= LNET_INTERFACES_NUM);

	/* Only match interfaces for additional connections
	 * if I have > 1 interface
	 */
	n_ips = (net->ksnn_ninterfaces < 2) ? 0 :
		min(n_peerips, net->ksnn_ninterfaces);

	/* loop until enough passive IPs are selected; i both indexes old
	 * selections and counts new ones */
	for (i = 0; peer_ni->ksnp_n_passive_ips < n_ips; i++) {
		/*              ^ yes really... */

		/* If we have any new interfaces, first tick off all the
		 * peer_ni IPs that match old interfaces, then choose new
		 * interfaces to match the remaining peer_ni IPS.
		 * We don't forget interfaces we've stopped using; we might
		 * start using them again... */

		if (i < peer_ni->ksnp_n_passive_ips) {
			/* Old interface. */
			ip = peer_ni->ksnp_passive_ips[i];
			best_iface = ksocknal_ip2iface(peer_ni->ksnp_ni, ip);

			/* peer_ni passive ips are kept up to date */
			LASSERT(best_iface != NULL);
		} else {
			/* choose a new interface */
			LASSERT (i == peer_ni->ksnp_n_passive_ips);

			best_iface = NULL;
			best_netmatch = 0;
			best_npeers = 0;

			for (j = 0; j < net->ksnn_ninterfaces; j++) {
				iface = &net->ksnn_interfaces[j];
				ip = iface->ksni_ipaddr;

				/* skip interfaces already selected */
				for (k = 0; k < peer_ni->ksnp_n_passive_ips; k++)
					if (peer_ni->ksnp_passive_ips[k] == ip)
						break;

				if (k < peer_ni->ksnp_n_passive_ips) /* using it already */
					continue;

				/* score this iface by its best peer-IP match */
				k = ksocknal_match_peerip(iface, peerips, n_peerips);
				xor = (ip ^ peerips[k]);
				this_netmatch = ((xor & iface->ksni_netmask) == 0) ? 1 : 0;

				if (!(best_iface == NULL ||
				      best_netmatch < this_netmatch ||
				      (best_netmatch == this_netmatch &&
				       best_npeers > iface->ksni_npeers)))
					continue;

				best_iface = iface;
				best_netmatch = this_netmatch;
				best_npeers = iface->ksni_npeers;
			}

			LASSERT(best_iface != NULL);

			best_iface->ksni_npeers++;
			ip = best_iface->ksni_ipaddr;
			peer_ni->ksnp_passive_ips[i] = ip;
			peer_ni->ksnp_n_passive_ips = i+1;
		}

		/* mark the best matching peer_ni IP used */
		j = ksocknal_match_peerip(best_iface, peerips, n_peerips);
		peerips[j] = 0;
	}

	/* Overwrite input peer_ni IP addresses */
	memcpy(peerips, peer_ni->ksnp_passive_ips, n_ips * sizeof(*peerips));

	write_unlock_bh(global_lock);

	return (n_ips);
}
886
/* Create additional routes to @peer_ni, one per peer_ni IP address in
 * @peer_ipaddrs, choosing for each the best local interface to connect
 * from (preferring a subnet match, then the least-loaded interface).
 * Only does anything when this net has more than one interface.
 * Called after an interface exchange on an active connection. */
static void
ksocknal_create_routes(struct ksock_peer_ni *peer_ni, int port,
                       __u32 *peer_ipaddrs, int npeer_ipaddrs)
{
        struct ksock_route              *newroute = NULL;
        rwlock_t                *global_lock = &ksocknal_data.ksnd_global_lock;
        struct lnet_ni *ni = peer_ni->ksnp_ni;
        struct ksock_net                *net = ni->ni_data;
        struct list_head        *rtmp;
        struct ksock_route              *route;
        struct ksock_interface  *iface;
        struct ksock_interface  *best_iface;
        int                     best_netmatch;
        int                     this_netmatch;
        int                     best_nroutes;
        int                     i;
        int                     j;

        /* CAVEAT EMPTOR: We do all our interface matching with an
         * exclusive hold of global lock at IRQ priority.  We're only
         * expecting to be dealing with small numbers of interfaces, so the
         * O(n**3)-ness here shouldn't matter */

        write_lock_bh(global_lock);

        if (net->ksnn_ninterfaces < 2) {
                /* Only create additional connections
                 * if I have > 1 interface */
                write_unlock_bh(global_lock);
                return;
        }

        LASSERT(npeer_ipaddrs <= LNET_INTERFACES_NUM);

        for (i = 0; i < npeer_ipaddrs; i++) {
                if (newroute != NULL) {
                        /* reuse the route left over from the previous
                         * iteration (it wasn't added to the peer_ni) */
                        newroute->ksnr_ipaddr = peer_ipaddrs[i];
                } else {
                        /* drop the lock to allocate; ksnp_closing is
                         * rechecked below in case the peer_ni went away */
                        write_unlock_bh(global_lock);

                        newroute = ksocknal_create_route(peer_ipaddrs[i], port);
                        if (newroute == NULL)
                                return;

                        write_lock_bh(global_lock);
                }

                if (peer_ni->ksnp_closing) {
                        /* peer_ni got closed under me */
                        break;
                }

                /* Already got a route? */
                route = NULL;
                list_for_each(rtmp, &peer_ni->ksnp_routes) {
                        route = list_entry(rtmp, struct ksock_route, ksnr_list);

                        if (route->ksnr_ipaddr == newroute->ksnr_ipaddr)
                                break;

                        route = NULL;
                }
                if (route != NULL)
                        continue;

                best_iface = NULL;
                best_nroutes = 0;
                best_netmatch = 0;

                LASSERT(net->ksnn_ninterfaces <= LNET_INTERFACES_NUM);

                /* Select interface to connect from */
                for (j = 0; j < net->ksnn_ninterfaces; j++) {
                        iface = &net->ksnn_interfaces[j];

                        /* Using this interface already? */
                        list_for_each(rtmp, &peer_ni->ksnp_routes) {
                                route = list_entry(rtmp, struct ksock_route,
                                                   ksnr_list);

                                if (route->ksnr_myiface == iface->ksni_index)
                                        break;

                                route = NULL;
                        }
                        if (route != NULL)
                                continue;

                        /* 1 if this interface is on the same subnet as the
                         * peer_ni address of the new route, else 0 */
                        this_netmatch = (((iface->ksni_ipaddr ^
                                           newroute->ksnr_ipaddr) &
                                          iface->ksni_netmask) == 0) ? 1 : 0;

                        /* prefer a subnet match; break ties by fewest
                         * existing routes on the interface */
                        if (!(best_iface == NULL ||
                              best_netmatch < this_netmatch ||
                              (best_netmatch == this_netmatch &&
                               best_nroutes > iface->ksni_nroutes)))
                                continue;

                        best_iface = iface;
                        best_netmatch = this_netmatch;
                        best_nroutes = iface->ksni_nroutes;
                }

                if (best_iface == NULL)
                        continue;

                newroute->ksnr_myiface = best_iface->ksni_index;
                best_iface->ksni_nroutes++;

                ksocknal_add_route_locked(peer_ni, newroute);
                newroute = NULL;        /* ownership passed to peer_ni */
        }

        write_unlock_bh(global_lock);
        if (newroute != NULL)
                ksocknal_route_decref(newroute);
}
1004
1005 int
1006 ksocknal_accept(struct lnet_ni *ni, struct socket *sock)
1007 {
1008         struct ksock_connreq *cr;
1009         int rc;
1010         u32 peer_ip;
1011         int peer_port;
1012
1013         rc = lnet_sock_getaddr(sock, true, &peer_ip, &peer_port);
1014         LASSERT(rc == 0);               /* we succeeded before */
1015
1016         LIBCFS_ALLOC(cr, sizeof(*cr));
1017         if (cr == NULL) {
1018                 LCONSOLE_ERROR_MSG(0x12f, "Dropping connection request from "
1019                                    "%pI4h: memory exhausted\n", &peer_ip);
1020                 return -ENOMEM;
1021         }
1022
1023         lnet_ni_addref(ni);
1024         cr->ksncr_ni   = ni;
1025         cr->ksncr_sock = sock;
1026
1027         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
1028
1029         list_add_tail(&cr->ksncr_list, &ksocknal_data.ksnd_connd_connreqs);
1030         wake_up(&ksocknal_data.ksnd_connd_waitq);
1031
1032         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
1033         return 0;
1034 }
1035
1036 static int
1037 ksocknal_connecting(struct ksock_peer_ni *peer_ni, __u32 ipaddr)
1038 {
1039         struct ksock_route *route;
1040
1041         list_for_each_entry(route, &peer_ni->ksnp_routes, ksnr_list) {
1042                 if (route->ksnr_ipaddr == ipaddr)
1043                         return route->ksnr_connecting;
1044         }
1045         return 0;
1046 }
1047
/* Establish a new connection with a peer_ni over @sock and hand it to a
 * scheduler.  @route is non-NULL for an active (outgoing) connect and
 * NULL for a passive accept; @type is the SOCKLND_CONN_* type for active
 * connects and must be SOCKLND_CONN_NONE for passive ones.
 *
 * Returns 0 on success, a negative errno on fatal failure, or a positive
 * value (e.g. EALREADY/EPROTO/ESTALE) for non-fatal conditions such as a
 * lost connection race, where a passive peer_ni is asked to retry instead.
 * Consumes @sock (released on the failure paths, or when conn refs drop). */
int
ksocknal_create_conn(struct lnet_ni *ni, struct ksock_route *route,
                     struct socket *sock, int type)
{
        rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
        LIST_HEAD(zombies);
        struct lnet_process_id peerid;
        struct list_head *tmp;
        u64 incarnation;
        struct ksock_conn *conn;
        struct ksock_conn *conn2;
        struct ksock_peer_ni *peer_ni = NULL;
        struct ksock_peer_ni *peer2;
        struct ksock_sched *sched;
        struct ksock_hello_msg *hello;
        int cpt;
        struct ksock_tx *tx;
        struct ksock_tx *txtmp;
        int rc;
        int rc2;
        int active;
        char *warn = NULL;

        active = (route != NULL);

        LASSERT (active == (type != SOCKLND_CONN_NONE));

        LIBCFS_ALLOC(conn, sizeof(*conn));
        if (conn == NULL) {
                rc = -ENOMEM;
                goto failed_0;
        }

        conn->ksnc_peer = NULL;
        conn->ksnc_route = NULL;
        conn->ksnc_sock = sock;
        /* 2 ref, 1 for conn, another extra ref prevents socket
         * being closed before establishment of connection */
        atomic_set (&conn->ksnc_sock_refcount, 2);
        conn->ksnc_type = type;
        ksocknal_lib_save_callback(sock, conn);
        atomic_set (&conn->ksnc_conn_refcount, 1); /* 1 ref for me */

        conn->ksnc_rx_ready = 0;
        conn->ksnc_rx_scheduled = 0;

        INIT_LIST_HEAD(&conn->ksnc_tx_queue);
        conn->ksnc_tx_ready = 0;
        conn->ksnc_tx_scheduled = 0;
        conn->ksnc_tx_carrier = NULL;
        atomic_set (&conn->ksnc_tx_nob, 0);

        /* scratch buffer for the HELLO handshake, sized for the maximum
         * number of interface addresses that can be exchanged */
        LIBCFS_ALLOC(hello, offsetof(struct ksock_hello_msg,
                                     kshm_ips[LNET_INTERFACES_NUM]));
        if (hello == NULL) {
                rc = -ENOMEM;
                goto failed_1;
        }

        /* stash conn's local and remote addrs */
        rc = ksocknal_lib_get_conn_addrs (conn);
        if (rc != 0)
                goto failed_1;

        /* Find out/confirm peer_ni's NID and connection type and get the
         * vector of interfaces she's willing to let me connect to.
         * Passive connections use the listener timeout since the peer_ni sends
         * eagerly */

        if (active) {
                peer_ni = route->ksnr_peer;
                LASSERT(ni == peer_ni->ksnp_ni);

                /* Active connection sends HELLO eagerly */
                hello->kshm_nips = ksocknal_local_ipvec(ni, hello->kshm_ips);
                peerid = peer_ni->ksnp_id;

                write_lock_bh(global_lock);
                conn->ksnc_proto = peer_ni->ksnp_proto;
                write_unlock_bh(global_lock);

                if (conn->ksnc_proto == NULL) {
                        /* default to v3; SOCKNAL_VERSION_DEBUG allows the
                         * tunable to force an older protocol for testing */
                         conn->ksnc_proto = &ksocknal_protocol_v3x;
#if SOCKNAL_VERSION_DEBUG
                         if (*ksocknal_tunables.ksnd_protocol == 2)
                                 conn->ksnc_proto = &ksocknal_protocol_v2x;
                         else if (*ksocknal_tunables.ksnd_protocol == 1)
                                 conn->ksnc_proto = &ksocknal_protocol_v1x;
#endif
                }

                rc = ksocknal_send_hello (ni, conn, peerid.nid, hello);
                if (rc != 0)
                        goto failed_1;
        } else {
                peerid.nid = LNET_NID_ANY;
                peerid.pid = LNET_PID_ANY;

                /* Passive, get protocol from peer_ni */
                conn->ksnc_proto = NULL;
        }

        /* NB recv_hello may return a positive rc (e.g. EALREADY/EPROTO)
         * on a passive connection; handled by the switch below */
        rc = ksocknal_recv_hello (ni, conn, hello, &peerid, &incarnation);
        if (rc < 0)
                goto failed_1;

        LASSERT (rc == 0 || active);
        LASSERT (conn->ksnc_proto != NULL);
        LASSERT (peerid.nid != LNET_NID_ANY);

        cpt = lnet_cpt_of_nid(peerid.nid, ni);

        if (active) {
                ksocknal_peer_addref(peer_ni);
                write_lock_bh(global_lock);
        } else {
                /* passive: find or create the peer_ni for the NID that
                 * the HELLO identified */
                peer_ni = ksocknal_create_peer(ni, peerid);
                if (IS_ERR(peer_ni)) {
                        rc = PTR_ERR(peer_ni);
                        goto failed_1;
                }

                write_lock_bh(global_lock);

                /* called with a ref on ni, so shutdown can't have started */
                LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers) >= 0);

                peer2 = ksocknal_find_peer_locked(ni, peerid);
                if (peer2 == NULL) {
                        /* NB this puts an "empty" peer_ni in the peer_ni
                         * table (which takes my ref) */
                        hash_add(ksocknal_data.ksnd_peers,
                                 &peer_ni->ksnp_list, peerid.nid);
                } else {
                        /* lost the race; use the existing peer_ni */
                        ksocknal_peer_decref(peer_ni);
                        peer_ni = peer2;
                }

                /* +1 ref for me */
                ksocknal_peer_addref(peer_ni);
                peer_ni->ksnp_accepting++;

                /* Am I already connecting to this guy?  Resolve in
                 * favour of higher NID... */
                if (peerid.nid < ni->ni_nid &&
                    ksocknal_connecting(peer_ni, conn->ksnc_ipaddr)) {
                        /* NB positive rc: non-fatal, peer_ni will retry */
                        rc = EALREADY;
                        warn = "connection race resolution";
                        goto failed_2;
                }
        }

        if (peer_ni->ksnp_closing ||
            (active && route->ksnr_deleted)) {
                /* peer_ni/route got closed under me */
                rc = -ESTALE;
                warn = "peer_ni/route removed";
                goto failed_2;
        }

        if (peer_ni->ksnp_proto == NULL) {
                /* Never connected before.
                 * NB recv_hello may have returned EPROTO to signal my peer_ni
                 * wants a different protocol than the one I asked for.
                 */
                LASSERT(list_empty(&peer_ni->ksnp_conns));

                peer_ni->ksnp_proto = conn->ksnc_proto;
                peer_ni->ksnp_incarnation = incarnation;
        }

        if (peer_ni->ksnp_proto != conn->ksnc_proto ||
            peer_ni->ksnp_incarnation != incarnation) {
                /* peer_ni rebooted or I've got the wrong protocol version */
                ksocknal_close_peer_conns_locked(peer_ni, 0, 0);

                peer_ni->ksnp_proto = NULL;
                rc = ESTALE;
                warn = peer_ni->ksnp_incarnation != incarnation ?
                       "peer_ni rebooted" :
                       "wrong proto version";
                goto failed_2;
        }

        /* dispatch on recv_hello's result: only 0/EALREADY/EPROTO are
         * possible here (negative rc bailed out above) */
        switch (rc) {
        default:
                LBUG();
        case 0:
                break;
        case EALREADY:
                warn = "lost conn race";
                goto failed_2;
        case EPROTO:
                warn = "retry with different protocol version";
                goto failed_2;
        }

        /* Refuse to duplicate an existing connection, unless this is a
         * loopback connection */
        if (conn->ksnc_ipaddr != conn->ksnc_myipaddr) {
                list_for_each(tmp, &peer_ni->ksnp_conns) {
                        conn2 = list_entry(tmp, struct ksock_conn, ksnc_list);

                        if (conn2->ksnc_ipaddr != conn->ksnc_ipaddr ||
                            conn2->ksnc_myipaddr != conn->ksnc_myipaddr ||
                            conn2->ksnc_type != conn->ksnc_type)
                                continue;

                        /* Reply on a passive connection attempt so the peer_ni
                         * realises we're connected. */
                        LASSERT (rc == 0);
                        if (!active)
                                rc = EALREADY;

                        warn = "duplicate";
                        goto failed_2;
                }
        }

        /* If the connection created by this route didn't bind to the IP
         * address the route connected to, the connection/route matching
         * code below probably isn't going to work. */
        if (active &&
            route->ksnr_ipaddr != conn->ksnc_ipaddr) {
                CERROR("Route %s %pI4h connected to %pI4h\n",
                       libcfs_id2str(peer_ni->ksnp_id),
                       &route->ksnr_ipaddr,
                       &conn->ksnc_ipaddr);
        }

        /* Search for a route corresponding to the new connection and
         * create an association.  This allows incoming connections created
         * by routes in my peer_ni to match my own route entries so I don't
         * continually create duplicate routes. */
        list_for_each(tmp, &peer_ni->ksnp_routes) {
                route = list_entry(tmp, struct ksock_route, ksnr_list);

                if (route->ksnr_ipaddr != conn->ksnc_ipaddr)
                        continue;

                ksocknal_associate_route_conn_locked(route, conn);
                break;
        }

        conn->ksnc_peer = peer_ni;                 /* conn takes my ref on peer_ni */
        peer_ni->ksnp_last_alive = ktime_get_seconds();
        peer_ni->ksnp_send_keepalive = 0;
        peer_ni->ksnp_error = 0;

        sched = ksocknal_choose_scheduler_locked(cpt);
        if (!sched) {
                CERROR("no schedulers available. node is unhealthy\n");
                goto failed_2;
        }
        /*
         * The cpt might have changed if we ended up selecting a non cpt
         * native scheduler. So use the scheduler's cpt instead.
         */
        cpt = sched->kss_cpt;
        sched->kss_nconns++;
        conn->ksnc_scheduler = sched;

        conn->ksnc_tx_last_post = ktime_get_seconds();
        /* Set the deadline for the outgoing HELLO to drain */
        conn->ksnc_tx_bufnob = sock->sk->sk_wmem_queued;
        conn->ksnc_tx_deadline = ktime_get_seconds() +
                                 lnet_get_lnd_timeout();
        smp_mb();   /* order with adding to peer_ni's conn list */

        list_add(&conn->ksnc_list, &peer_ni->ksnp_conns);
        ksocknal_conn_addref(conn);

        ksocknal_new_packet(conn, 0);

        conn->ksnc_zc_capable = ksocknal_lib_zc_capable(conn);

        /* Take packets blocking for this connection. */
        list_for_each_entry_safe(tx, txtmp, &peer_ni->ksnp_tx_queue, tx_list) {
                if (conn->ksnc_proto->pro_match_tx(conn, tx, tx->tx_nonblk) ==
                    SOCKNAL_MATCH_NO)
                        continue;

                list_del(&tx->tx_list);
                ksocknal_queue_tx_locked(tx, conn);
        }

        write_unlock_bh(global_lock);

        /* We've now got a new connection.  Any errors from here on are just
         * like "normal" comms errors and we close the connection normally.
         * NB (a) we still have to send the reply HELLO for passive
         *        connections,
         *    (b) normal I/O on the conn is blocked until I setup and call the
         *        socket callbacks.
         */

        CDEBUG(D_NET, "New conn %s p %d.x %pI4h -> %pI4h/%d"
               " incarnation:%lld sched[%d]\n",
               libcfs_id2str(peerid), conn->ksnc_proto->pro_version,
               &conn->ksnc_myipaddr, &conn->ksnc_ipaddr,
               conn->ksnc_port, incarnation, cpt);

        if (active) {
                /* additional routes after interface exchange? */
                ksocknal_create_routes(peer_ni, conn->ksnc_port,
                                       hello->kshm_ips, hello->kshm_nips);
        } else {
                /* passive: send the reply HELLO with my interface vector */
                hello->kshm_nips = ksocknal_select_ips(peer_ni, hello->kshm_ips,
                                                       hello->kshm_nips);
                rc = ksocknal_send_hello(ni, conn, peerid.nid, hello);
        }

        LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
                                    kshm_ips[LNET_INTERFACES_NUM]));

        /* setup the socket AFTER I've received hello (it disables
         * SO_LINGER).  I might call back to the acceptor who may want
         * to send a protocol version response and then close the
         * socket; this ensures the socket only tears down after the
         * response has been sent. */
        if (rc == 0)
                rc = ksocknal_lib_setup_sock(sock);

        write_lock_bh(global_lock);

        /* NB my callbacks block while I hold ksnd_global_lock */
        ksocknal_lib_set_callback(sock, conn);

        if (!active)
                peer_ni->ksnp_accepting--;

        write_unlock_bh(global_lock);

        if (rc != 0) {
                write_lock_bh(global_lock);
                if (!conn->ksnc_closing) {
                        /* could be closed by another thread */
                        ksocknal_close_conn_locked(conn, rc);
                }
                write_unlock_bh(global_lock);
        } else if (ksocknal_connsock_addref(conn) == 0) {
                /* Allow I/O to proceed. */
                ksocknal_read_callback(conn);
                ksocknal_write_callback(conn);
                ksocknal_connsock_decref(conn);
        }

        /* drop the extra sock ref taken at creation, and my conn ref */
        ksocknal_connsock_decref(conn);
        ksocknal_conn_decref(conn);
        return rc;

failed_2:
        /* conn never made it onto the peer_ni's list; undo peer_ni state
         * (still holding global_lock from above) */
        if (!peer_ni->ksnp_closing &&
            list_empty(&peer_ni->ksnp_conns) &&
            list_empty(&peer_ni->ksnp_routes)) {
                list_splice_init(&peer_ni->ksnp_tx_queue, &zombies);
                ksocknal_unlink_peer_locked(peer_ni);
        }

        write_unlock_bh(global_lock);

        if (warn != NULL) {
                /* negative rc is a real error; positive is an expected
                 * race/retry condition, so log at debug level only */
                if (rc < 0)
                        CERROR("Not creating conn %s type %d: %s\n",
                               libcfs_id2str(peerid), conn->ksnc_type, warn);
                else
                        CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",
                              libcfs_id2str(peerid), conn->ksnc_type, warn);
        }

        if (!active) {
                if (rc > 0) {
                        /* Request retry by replying with CONN_NONE
                         * ksnc_proto has been set already */
                        conn->ksnc_type = SOCKLND_CONN_NONE;
                        hello->kshm_nips = 0;
                        ksocknal_send_hello(ni, conn, peerid.nid, hello);
                }

                write_lock_bh(global_lock);
                peer_ni->ksnp_accepting--;
                write_unlock_bh(global_lock);
        }

        /*
         * If we get here without an error code, just use -EALREADY.
         * Depending on how we got here, the error may be positive
         * or negative. Normalize the value for ksocknal_txlist_done().
         */
        rc2 = (rc == 0 ? -EALREADY : (rc > 0 ? -rc : rc));
        ksocknal_txlist_done(ni, &zombies, rc2);
        ksocknal_peer_decref(peer_ni);

failed_1:
        if (hello != NULL)
                LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
                                            kshm_ips[LNET_INTERFACES_NUM]));

        LIBCFS_FREE(conn, sizeof(*conn));

failed_0:
        sock_release(sock);
        return rc;
}
1452
void
ksocknal_close_conn_locked(struct ksock_conn *conn, int error)
{
        /* This just does the immediate housekeeping, and queues the
         * connection for the reaper to terminate.
         * Caller holds ksnd_global_lock exclusively in irq context */
        struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
        struct ksock_route *route;
        struct ksock_conn *conn2;
        struct list_head *tmp;

        LASSERT(peer_ni->ksnp_error == 0);
        LASSERT(!conn->ksnc_closing);
        conn->ksnc_closing = 1;

        /* ksnd_deathrow_conns takes over peer_ni's ref */
        list_del(&conn->ksnc_list);

        route = conn->ksnc_route;
        if (route != NULL) {
                /* dissociate conn from route... */
                LASSERT(!route->ksnr_deleted);
                LASSERT((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);

                /* only clear the route's "connected" bit for this type if
                 * no other conn of the same type still uses the route */
                conn2 = NULL;
                list_for_each(tmp, &peer_ni->ksnp_conns) {
                        conn2 = list_entry(tmp, struct ksock_conn, ksnc_list);

                        if (conn2->ksnc_route == route &&
                            conn2->ksnc_type == conn->ksnc_type)
                                break;

                        conn2 = NULL;
                }
                if (conn2 == NULL)
                        route->ksnr_connected &= ~(1 << conn->ksnc_type);

                conn->ksnc_route = NULL;

                ksocknal_route_decref(route);   /* drop conn's ref on route */
        }

        if (list_empty(&peer_ni->ksnp_conns)) {
                /* No more connections to this peer_ni */

                if (!list_empty(&peer_ni->ksnp_tx_queue)) {
                        struct ksock_tx *tx;

                        LASSERT(conn->ksnc_proto == &ksocknal_protocol_v3x);

                        /* throw them to the last connection...,
                         * these TXs will be send to /dev/null by scheduler */
                        list_for_each_entry(tx, &peer_ni->ksnp_tx_queue,
                                            tx_list)
                                ksocknal_tx_prep(conn, tx);

                        spin_lock_bh(&conn->ksnc_scheduler->kss_lock);
                        list_splice_init(&peer_ni->ksnp_tx_queue,
                                         &conn->ksnc_tx_queue);
                        spin_unlock_bh(&conn->ksnc_scheduler->kss_lock);
                }

                /* renegotiate protocol version */
                peer_ni->ksnp_proto = NULL;
                /* stash last conn close reason */
                peer_ni->ksnp_error = error;

                if (list_empty(&peer_ni->ksnp_routes)) {
                        /* I've just closed last conn belonging to a
                         * peer_ni with no routes to it */
                        ksocknal_unlink_peer_locked(peer_ni);
                }
        }

        /* hand the conn to the reaper for actual termination */
        spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);

        list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
        wake_up(&ksocknal_data.ksnd_reaper_waitq);

        spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
}
1534
1535 void
1536 ksocknal_peer_failed(struct ksock_peer_ni *peer_ni)
1537 {
1538         int notify = 0;
1539         time64_t last_alive = 0;
1540
1541         /* There has been a connection failure or comms error; but I'll only
1542          * tell LNET I think the peer_ni is dead if it's to another kernel and
1543          * there are no connections or connection attempts in existence. */
1544
1545         read_lock(&ksocknal_data.ksnd_global_lock);
1546
1547         if ((peer_ni->ksnp_id.pid & LNET_PID_USERFLAG) == 0 &&
1548              list_empty(&peer_ni->ksnp_conns) &&
1549              peer_ni->ksnp_accepting == 0 &&
1550              ksocknal_find_connecting_route_locked(peer_ni) == NULL) {
1551                 notify = 1;
1552                 last_alive = peer_ni->ksnp_last_alive;
1553         }
1554
1555         read_unlock(&ksocknal_data.ksnd_global_lock);
1556
1557         if (notify)
1558                 lnet_notify(peer_ni->ksnp_ni, peer_ni->ksnp_id.nid,
1559                             false, false, last_alive);
1560 }
1561
1562 void
1563 ksocknal_finalize_zcreq(struct ksock_conn *conn)
1564 {
1565         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1566         struct ksock_tx *tx;
1567         struct ksock_tx *tmp;
1568         LIST_HEAD(zlist);
1569
1570         /* NB safe to finalize TXs because closing of socket will
1571          * abort all buffered data */
1572         LASSERT(conn->ksnc_sock == NULL);
1573
1574         spin_lock(&peer_ni->ksnp_lock);
1575
1576         list_for_each_entry_safe(tx, tmp, &peer_ni->ksnp_zc_req_list, tx_zc_list) {
1577                 if (tx->tx_conn != conn)
1578                         continue;
1579
1580                 LASSERT(tx->tx_msg.ksm_zc_cookies[0] != 0);
1581
1582                 tx->tx_msg.ksm_zc_cookies[0] = 0;
1583                 tx->tx_zc_aborted = 1;  /* mark it as not-acked */
1584                 list_move(&tx->tx_zc_list, &zlist);
1585         }
1586
1587         spin_unlock(&peer_ni->ksnp_lock);
1588
1589         while (!list_empty(&zlist)) {
1590                 tx = list_entry(zlist.next, struct ksock_tx, tx_zc_list);
1591
1592                 list_del(&tx->tx_zc_list);
1593                 ksocknal_tx_decref(tx);
1594         }
1595 }
1596
void
ksocknal_terminate_conn(struct ksock_conn *conn)
{
        /* This gets called by the reaper (guaranteed thread context) to
         * disengage the socket from its callbacks and close it.
         * ksnc_refcount will eventually hit zero, and then the reaper will
         * destroy it. */
        struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
        struct ksock_sched *sched = conn->ksnc_scheduler;
        int failed = 0;

        LASSERT(conn->ksnc_closing);

        /* wake up the scheduler to "send" all remaining packets to /dev/null */
        spin_lock_bh(&sched->kss_lock);

        /* a closing conn is always ready to tx */
        conn->ksnc_tx_ready = 1;

        if (!conn->ksnc_tx_scheduled &&
            !list_empty(&conn->ksnc_tx_queue)) {
                list_add_tail(&conn->ksnc_tx_list,
                               &sched->kss_tx_conns);
                conn->ksnc_tx_scheduled = 1;
                /* extra ref for scheduler */
                ksocknal_conn_addref(conn);

                wake_up (&sched->kss_waitq);
        }

        spin_unlock_bh(&sched->kss_lock);

        /* serialise with callbacks */
        write_lock_bh(&ksocknal_data.ksnd_global_lock);

        ksocknal_lib_reset_callback(conn->ksnc_sock, conn);

        /* OK, so this conn may not be completely disengaged from its
         * scheduler yet, but it _has_ committed to terminate... */
        conn->ksnc_scheduler->kss_nconns--;

        if (peer_ni->ksnp_error != 0) {
                /* peer_ni's last conn closed in error */
                LASSERT(list_empty(&peer_ni->ksnp_conns));
                failed = 1;
                peer_ni->ksnp_error = 0;     /* avoid multiple notifications */
        }

        write_unlock_bh(&ksocknal_data.ksnd_global_lock);

        /* notify LNet outside the global lock */
        if (failed)
                ksocknal_peer_failed(peer_ni);

        /* The socket is closed on the final put; either here, or in
         * ksocknal_{send,recv}msg().  Since we set up the linger2 option
         * when the connection was established, this will close the socket
         * immediately, aborting anything buffered in it. Any hung
         * zero-copy transmits will therefore complete in finite time. */
        ksocknal_connsock_decref(conn);
}
1657
1658 void
1659 ksocknal_queue_zombie_conn(struct ksock_conn *conn)
1660 {
1661         /* Queue the conn for the reaper to destroy */
1662         LASSERT(atomic_read(&conn->ksnc_conn_refcount) == 0);
1663         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1664
1665         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1666         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1667
1668         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1669 }
1670
1671 void
1672 ksocknal_destroy_conn(struct ksock_conn *conn)
1673 {
1674         time64_t last_rcv;
1675
1676         /* Final coup-de-grace of the reaper */
1677         CDEBUG (D_NET, "connection %p\n", conn);
1678
1679         LASSERT (atomic_read (&conn->ksnc_conn_refcount) == 0);
1680         LASSERT (atomic_read (&conn->ksnc_sock_refcount) == 0);
1681         LASSERT (conn->ksnc_sock == NULL);
1682         LASSERT (conn->ksnc_route == NULL);
1683         LASSERT (!conn->ksnc_tx_scheduled);
1684         LASSERT (!conn->ksnc_rx_scheduled);
1685         LASSERT(list_empty(&conn->ksnc_tx_queue));
1686
1687         /* complete current receive if any */
1688         switch (conn->ksnc_rx_state) {
1689         case SOCKNAL_RX_LNET_PAYLOAD:
1690                 last_rcv = conn->ksnc_rx_deadline -
1691                            lnet_get_lnd_timeout();
1692                 CERROR("Completing partial receive from %s[%d], "
1693                        "ip %pI4h:%d, with error, wanted: %d, left: %d, "
1694                        "last alive is %lld secs ago\n",
1695                        libcfs_id2str(conn->ksnc_peer->ksnp_id), conn->ksnc_type,
1696                        &conn->ksnc_ipaddr, conn->ksnc_port,
1697                        conn->ksnc_rx_nob_wanted, conn->ksnc_rx_nob_left,
1698                        ktime_get_seconds() - last_rcv);
1699                 if (conn->ksnc_lnet_msg)
1700                         conn->ksnc_lnet_msg->msg_health_status =
1701                                 LNET_MSG_STATUS_REMOTE_ERROR;
1702                 lnet_finalize(conn->ksnc_lnet_msg, -EIO);
1703                 break;
1704         case SOCKNAL_RX_LNET_HEADER:
1705                 if (conn->ksnc_rx_started)
1706                         CERROR("Incomplete receive of lnet header from %s, "
1707                                "ip %pI4h:%d, with error, protocol: %d.x.\n",
1708                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1709                                &conn->ksnc_ipaddr, conn->ksnc_port,
1710                                conn->ksnc_proto->pro_version);
1711                 break;
1712         case SOCKNAL_RX_KSM_HEADER:
1713                 if (conn->ksnc_rx_started)
1714                         CERROR("Incomplete receive of ksock message from %s, "
1715                                "ip %pI4h:%d, with error, protocol: %d.x.\n",
1716                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1717                                &conn->ksnc_ipaddr, conn->ksnc_port,
1718                                conn->ksnc_proto->pro_version);
1719                 break;
1720         case SOCKNAL_RX_SLOP:
1721                 if (conn->ksnc_rx_started)
1722                         CERROR("Incomplete receive of slops from %s, "
1723                                "ip %pI4h:%d, with error\n",
1724                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1725                                &conn->ksnc_ipaddr, conn->ksnc_port);
1726                break;
1727         default:
1728                 LBUG ();
1729                 break;
1730         }
1731
1732         ksocknal_peer_decref(conn->ksnc_peer);
1733
1734         LIBCFS_FREE (conn, sizeof (*conn));
1735 }
1736
1737 int
1738 ksocknal_close_peer_conns_locked(struct ksock_peer_ni *peer_ni, __u32 ipaddr, int why)
1739 {
1740         struct ksock_conn *conn;
1741         struct list_head *ctmp;
1742         struct list_head *cnxt;
1743         int count = 0;
1744
1745         list_for_each_safe(ctmp, cnxt, &peer_ni->ksnp_conns) {
1746                 conn = list_entry(ctmp, struct ksock_conn, ksnc_list);
1747
1748                 if (ipaddr == 0 ||
1749                     conn->ksnc_ipaddr == ipaddr) {
1750                         count++;
1751                         ksocknal_close_conn_locked (conn, why);
1752                 }
1753         }
1754
1755         return (count);
1756 }
1757
1758 int
1759 ksocknal_close_conn_and_siblings(struct ksock_conn *conn, int why)
1760 {
1761         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1762         u32 ipaddr = conn->ksnc_ipaddr;
1763         int count;
1764
1765         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1766
1767         count = ksocknal_close_peer_conns_locked (peer_ni, ipaddr, why);
1768
1769         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1770
1771         return (count);
1772 }
1773
1774 int
1775 ksocknal_close_matching_conns(struct lnet_process_id id, __u32 ipaddr)
1776 {
1777         struct ksock_peer_ni *peer_ni;
1778         struct hlist_node *pnxt;
1779         int lo;
1780         int hi;
1781         int i;
1782         int count = 0;
1783
1784         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1785
1786         if (id.nid != LNET_NID_ANY) {
1787                 lo = hash_min(id.nid, HASH_BITS(ksocknal_data.ksnd_peers));
1788                 hi = lo;
1789         } else {
1790                 lo = 0;
1791                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
1792         }
1793
1794         for (i = lo; i <= hi; i++) {
1795                 hlist_for_each_entry_safe(peer_ni, pnxt,
1796                                           &ksocknal_data.ksnd_peers[i],
1797                                           ksnp_list) {
1798
1799                         if (!((id.nid == LNET_NID_ANY ||
1800                                id.nid == peer_ni->ksnp_id.nid) &&
1801                               (id.pid == LNET_PID_ANY ||
1802                                id.pid == peer_ni->ksnp_id.pid)))
1803                                 continue;
1804
1805                         count += ksocknal_close_peer_conns_locked(peer_ni,
1806                                                                   ipaddr, 0);
1807                 }
1808         }
1809
1810         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1811
1812         /* wildcards always succeed */
1813         if (id.nid == LNET_NID_ANY || id.pid == LNET_PID_ANY || ipaddr == 0)
1814                 return 0;
1815
1816         return (count == 0 ? -ENOENT : 0);
1817 }
1818
1819 void
1820 ksocknal_notify_gw_down(lnet_nid_t gw_nid)
1821 {
1822         /* The router is telling me she's been notified of a change in
1823          * gateway state....
1824          */
1825         struct lnet_process_id id = {
1826                 .nid    = gw_nid,
1827                 .pid    = LNET_PID_ANY,
1828         };
1829
1830         CDEBUG(D_NET, "gw %s down\n", libcfs_nid2str(gw_nid));
1831
1832         /* If the gateway crashed, close all open connections... */
1833         ksocknal_close_matching_conns(id, 0);
1834         return;
1835
1836         /* We can only establish new connections
1837          * if we have autroutes, and these connect on demand. */
1838 }
1839
1840 static void
1841 ksocknal_push_peer(struct ksock_peer_ni *peer_ni)
1842 {
1843         int index;
1844         int i;
1845         struct list_head *tmp;
1846         struct ksock_conn *conn;
1847
1848         for (index = 0; ; index++) {
1849                 read_lock(&ksocknal_data.ksnd_global_lock);
1850
1851                 i = 0;
1852                 conn = NULL;
1853
1854                 list_for_each(tmp, &peer_ni->ksnp_conns) {
1855                         if (i++ == index) {
1856                                 conn = list_entry(tmp, struct ksock_conn,
1857                                                   ksnc_list);
1858                                 ksocknal_conn_addref(conn);
1859                                 break;
1860                         }
1861                 }
1862
1863                 read_unlock(&ksocknal_data.ksnd_global_lock);
1864
1865                 if (conn == NULL)
1866                         break;
1867
1868                 ksocknal_lib_push_conn (conn);
1869                 ksocknal_conn_decref(conn);
1870         }
1871 }
1872
static int
ksocknal_push(struct lnet_ni *ni, struct lnet_process_id id)
{
        int lo;
        int hi;
        int bkt;
        int rc = -ENOENT;

        /* Push (flush) the conns of every peer_ni matching @id.  Returns 0
         * if at least one peer_ni matched, -ENOENT otherwise.  A specific
         * NID hashes to a single bucket; LNET_NID_ANY scans all buckets. */
        if (id.nid != LNET_NID_ANY) {
                lo = hash_min(id.nid, HASH_BITS(ksocknal_data.ksnd_peers));
                hi = lo;
        } else {
                lo = 0;
                hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
        }

        for (bkt = lo; bkt <= hi; bkt++) {
                int peer_off; /* searching offset in peer_ni hash table */

                for (peer_off = 0; ; peer_off++) {
                        struct ksock_peer_ni *peer_ni;
                        int           i = 0;

                        /* Re-walk the bucket each pass to find the
                         * peer_off'th matching peer_ni, pinning it with a
                         * ref under the lock so it stays valid while we
                         * push it unlocked below. */
                        read_lock(&ksocknal_data.ksnd_global_lock);
                        hlist_for_each_entry(peer_ni,
                                             &ksocknal_data.ksnd_peers[bkt],
                                             ksnp_list) {
                                if (!((id.nid == LNET_NID_ANY ||
                                       id.nid == peer_ni->ksnp_id.nid) &&
                                      (id.pid == LNET_PID_ANY ||
                                       id.pid == peer_ni->ksnp_id.pid)))
                                        continue;

                                if (i++ == peer_off) {
                                        ksocknal_peer_addref(peer_ni);
                                        break;
                                }
                        }
                        read_unlock(&ksocknal_data.ksnd_global_lock);

                        /* i > peer_off only when the loop broke out on a
                         * match; otherwise the bucket is exhausted */
                        if (i <= peer_off) /* no match */
                                break;

                        rc = 0;
                        ksocknal_push_peer(peer_ni);
                        ksocknal_peer_decref(peer_ni);
                }
        }
        return rc;
}
1923
1924 static int
1925 ksocknal_add_interface(struct lnet_ni *ni, __u32 ipaddress, __u32 netmask)
1926 {
1927         struct ksock_net *net = ni->ni_data;
1928         struct ksock_interface *iface;
1929         int rc;
1930         int i;
1931         int j;
1932         struct ksock_peer_ni *peer_ni;
1933         struct list_head *rtmp;
1934         struct ksock_route *route;
1935
1936         if (ipaddress == 0 ||
1937             netmask == 0)
1938                 return -EINVAL;
1939
1940         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1941
1942         iface = ksocknal_ip2iface(ni, ipaddress);
1943         if (iface != NULL) {
1944                 /* silently ignore dups */
1945                 rc = 0;
1946         } else if (net->ksnn_ninterfaces == LNET_INTERFACES_NUM) {
1947                 rc = -ENOSPC;
1948         } else {
1949                 iface = &net->ksnn_interfaces[net->ksnn_ninterfaces++];
1950
1951                 iface->ksni_index = ksocknal_ip2index(ipaddress, ni);
1952                 iface->ksni_ipaddr = ipaddress;
1953                 iface->ksni_netmask = netmask;
1954                 iface->ksni_nroutes = 0;
1955                 iface->ksni_npeers = 0;
1956
1957                 hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
1958                         for (j = 0; j < peer_ni->ksnp_n_passive_ips; j++)
1959                                 if (peer_ni->ksnp_passive_ips[j] == ipaddress)
1960                                         iface->ksni_npeers++;
1961
1962                         list_for_each(rtmp, &peer_ni->ksnp_routes) {
1963                                 route = list_entry(rtmp,
1964                                                    struct ksock_route,
1965                                                    ksnr_list);
1966
1967                                 if (route->ksnr_myiface ==
1968                                             iface->ksni_index)
1969                                         iface->ksni_nroutes++;
1970                         }
1971                 }
1972
1973                 rc = 0;
1974                 /* NB only new connections will pay attention to the new
1975                  * interface!
1976                  */
1977         }
1978
1979         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1980
1981         return rc;
1982 }
1983
1984 static void
1985 ksocknal_peer_del_interface_locked(struct ksock_peer_ni *peer_ni,
1986                                    __u32 ipaddr, int index)
1987 {
1988         struct list_head *tmp;
1989         struct list_head *nxt;
1990         struct ksock_route *route;
1991         struct ksock_conn *conn;
1992         int i;
1993         int j;
1994
1995         for (i = 0; i < peer_ni->ksnp_n_passive_ips; i++)
1996                 if (peer_ni->ksnp_passive_ips[i] == ipaddr) {
1997                         for (j = i+1; j < peer_ni->ksnp_n_passive_ips; j++)
1998                                 peer_ni->ksnp_passive_ips[j-1] =
1999                                         peer_ni->ksnp_passive_ips[j];
2000                         peer_ni->ksnp_n_passive_ips--;
2001                         break;
2002                 }
2003
2004         list_for_each_safe(tmp, nxt, &peer_ni->ksnp_routes) {
2005                 route = list_entry(tmp, struct ksock_route, ksnr_list);
2006
2007                 if (route->ksnr_myiface != index)
2008                         continue;
2009
2010                 if (route->ksnr_share_count != 0) {
2011                         /* Manually created; keep, but unbind */
2012                         route->ksnr_myiface = -1;
2013                 } else {
2014                         ksocknal_del_route_locked(route);
2015                 }
2016         }
2017
2018         list_for_each_safe(tmp, nxt, &peer_ni->ksnp_conns) {
2019                 conn = list_entry(tmp, struct ksock_conn, ksnc_list);
2020
2021                 if (conn->ksnc_myipaddr == ipaddr)
2022                         ksocknal_close_conn_locked (conn, 0);
2023         }
2024 }
2025
2026 static int
2027 ksocknal_del_interface(struct lnet_ni *ni, __u32 ipaddress)
2028 {
2029         struct ksock_net *net = ni->ni_data;
2030         int rc = -ENOENT;
2031         struct hlist_node *nxt;
2032         struct ksock_peer_ni *peer_ni;
2033         u32 this_ip;
2034         int index;
2035         int i;
2036         int j;
2037
2038         index = ksocknal_ip2index(ipaddress, ni);
2039
2040         write_lock_bh(&ksocknal_data.ksnd_global_lock);
2041
2042         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2043                 this_ip = net->ksnn_interfaces[i].ksni_ipaddr;
2044
2045                 if (!(ipaddress == 0 ||
2046                       ipaddress == this_ip))
2047                         continue;
2048
2049                 rc = 0;
2050
2051                 for (j = i+1; j < net->ksnn_ninterfaces; j++)
2052                         net->ksnn_interfaces[j-1] =
2053                                 net->ksnn_interfaces[j];
2054
2055                 net->ksnn_ninterfaces--;
2056
2057                 hash_for_each_safe(ksocknal_data.ksnd_peers, j,
2058                                    nxt, peer_ni, ksnp_list) {
2059                         if (peer_ni->ksnp_ni != ni)
2060                                 continue;
2061
2062                         ksocknal_peer_del_interface_locked(peer_ni,
2063                                                            this_ip, index);
2064                 }
2065         }
2066
2067         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
2068
2069         return rc;
2070 }
2071
int
ksocknal_ctl(struct lnet_ni *ni, unsigned int cmd, void *arg)
{
        struct lnet_process_id id = {0};
        struct libcfs_ioctl_data *data = arg;
        int rc;

        /* ioctl entry point for socklnd-specific commands; @data carries
         * the libcfs ioctl payload.  Returns 0 or a negative errno. */
        switch(cmd) {
        case IOC_LIBCFS_GET_INTERFACE: {
                struct ksock_net *net = ni->ni_data;
                struct ksock_interface *iface;

                read_lock(&ksocknal_data.ksnd_global_lock);

                /* ioc_count indexes the interface table on input */
                if (data->ioc_count >= (__u32)net->ksnn_ninterfaces) {
                        rc = -ENOENT;
                } else {
                        rc = 0;
                        iface = &net->ksnn_interfaces[data->ioc_count];

                        data->ioc_u32[0] = iface->ksni_ipaddr;
                        data->ioc_u32[1] = iface->ksni_netmask;
                        data->ioc_u32[2] = iface->ksni_npeers;
                        data->ioc_u32[3] = iface->ksni_nroutes;
                }

                read_unlock(&ksocknal_data.ksnd_global_lock);
                return rc;
        }

        case IOC_LIBCFS_ADD_INTERFACE:
                return ksocknal_add_interface(ni,
                                              data->ioc_u32[0], /* IP address */
                                              data->ioc_u32[1]); /* net mask */

        case IOC_LIBCFS_DEL_INTERFACE:
                return ksocknal_del_interface(ni,
                                              data->ioc_u32[0]); /* IP address */

        case IOC_LIBCFS_GET_PEER: {
                __u32            myip = 0;
                __u32            ip = 0;
                int              port = 0;
                int              conn_count = 0;
                int              share_count = 0;

                /* ioc_count indexes the peer table on input and returns
                 * the share count on output */
                rc = ksocknal_get_peer_info(ni, data->ioc_count,
                                            &id, &myip, &ip, &port,
                                            &conn_count,  &share_count);
                if (rc != 0)
                        return rc;

                data->ioc_nid    = id.nid;
                data->ioc_count  = share_count;
                data->ioc_u32[0] = ip;
                data->ioc_u32[1] = port;
                data->ioc_u32[2] = myip;
                data->ioc_u32[3] = conn_count;
                data->ioc_u32[4] = id.pid;
                return 0;
        }

        case IOC_LIBCFS_ADD_PEER:
                id.nid = data->ioc_nid;
                id.pid = LNET_PID_LUSTRE;
                return ksocknal_add_peer (ni, id,
                                          data->ioc_u32[0], /* IP */
                                          data->ioc_u32[1]); /* port */

        case IOC_LIBCFS_DEL_PEER:
                id.nid = data->ioc_nid;
                id.pid = LNET_PID_ANY;
                return ksocknal_del_peer (ni, id,
                                          data->ioc_u32[0]); /* IP */

        case IOC_LIBCFS_GET_CONN: {
                int           txmem;
                int           rxmem;
                int           nagle;
                struct ksock_conn *conn = ksocknal_get_conn_by_idx(ni, data->ioc_count);

                if (conn == NULL)
                        return -ENOENT;

                ksocknal_lib_get_conn_tunables(conn, &txmem, &rxmem, &nagle);

                data->ioc_count  = txmem;
                data->ioc_nid    = conn->ksnc_peer->ksnp_id.nid;
                data->ioc_flags  = nagle;
                data->ioc_u32[0] = conn->ksnc_ipaddr;
                data->ioc_u32[1] = conn->ksnc_port;
                data->ioc_u32[2] = conn->ksnc_myipaddr;
                data->ioc_u32[3] = conn->ksnc_type;
                data->ioc_u32[4] = conn->ksnc_scheduler->kss_cpt;
                data->ioc_u32[5] = rxmem;
                data->ioc_u32[6] = conn->ksnc_peer->ksnp_id.pid;
                /* drop the ref taken by ksocknal_get_conn_by_idx() */
                ksocknal_conn_decref(conn);
                return 0;
        }

        case IOC_LIBCFS_CLOSE_CONNECTION:
                id.nid = data->ioc_nid;
                id.pid = LNET_PID_ANY;
                return ksocknal_close_matching_conns (id,
                                                      data->ioc_u32[0]);

        case IOC_LIBCFS_REGISTER_MYNID:
                /* Ignore if this is a noop */
                if (data->ioc_nid == ni->ni_nid)
                        return 0;

                CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
                       libcfs_nid2str(data->ioc_nid),
                       libcfs_nid2str(ni->ni_nid));
                return -EINVAL;

        case IOC_LIBCFS_PUSH_CONNECTION:
                id.nid = data->ioc_nid;
                id.pid = LNET_PID_ANY;
                return ksocknal_push(ni, id);

        default:
                return -EINVAL;
        }
        /* not reached */
}
2198
2199 static void
2200 ksocknal_free_buffers (void)
2201 {
2202         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_txs) == 0);
2203
2204         if (ksocknal_data.ksnd_schedulers != NULL)
2205                 cfs_percpt_free(ksocknal_data.ksnd_schedulers);
2206
2207         spin_lock(&ksocknal_data.ksnd_tx_lock);
2208
2209         if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
2210                 LIST_HEAD(zlist);
2211                 struct ksock_tx *tx;
2212
2213                 list_splice_init(&ksocknal_data.ksnd_idle_noop_txs, &zlist);
2214                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
2215
2216                 while (!list_empty(&zlist)) {
2217                         tx = list_entry(zlist.next, struct ksock_tx, tx_list);
2218                         list_del(&tx->tx_list);
2219                         LIBCFS_FREE(tx, tx->tx_desc_size);
2220                 }
2221         } else {
2222                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
2223         }
2224 }
2225
static void
ksocknal_base_shutdown(void)
{
        struct ksock_sched *sched;
        struct ksock_peer_ni *peer_ni;
        int i;

        /* Tear down the module-global state set up by
         * ksocknal_base_startup(): verify everything has drained, stop all
         * threads, then free the buffers. */
        CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
               atomic_read (&libcfs_kmemory));
        LASSERT (ksocknal_data.ksnd_nnets == 0);

        switch (ksocknal_data.ksnd_init) {
        default:
                LASSERT(0);
                /* fallthrough */

        case SOCKNAL_INIT_ALL:
        case SOCKNAL_INIT_DATA:
                /* the peer table must already be empty */
                hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list)
                        LASSERT(0);

                LASSERT(list_empty(&ksocknal_data.ksnd_nets));
                LASSERT(list_empty(&ksocknal_data.ksnd_enomem_conns));
                LASSERT(list_empty(&ksocknal_data.ksnd_zombie_conns));
                LASSERT(list_empty(&ksocknal_data.ksnd_connd_connreqs));
                LASSERT(list_empty(&ksocknal_data.ksnd_connd_routes));

                if (ksocknal_data.ksnd_schedulers != NULL) {
                        cfs_percpt_for_each(sched, i,
                                            ksocknal_data.ksnd_schedulers) {

                                LASSERT(list_empty(&sched->kss_tx_conns));
                                LASSERT(list_empty(&sched->kss_rx_conns));
                                LASSERT(list_empty(&sched->kss_zombie_noop_txs));
                                LASSERT(sched->kss_nconns == 0);
                        }
                }

                /* flag threads to terminate; wake and wait for them to die */
                ksocknal_data.ksnd_shuttingdown = 1;
                wake_up_all(&ksocknal_data.ksnd_connd_waitq);
                wake_up_all(&ksocknal_data.ksnd_reaper_waitq);

                if (ksocknal_data.ksnd_schedulers != NULL) {
                        cfs_percpt_for_each(sched, i,
                                            ksocknal_data.ksnd_schedulers)
                                        wake_up_all(&sched->kss_waitq);
                }

                /* ksnd_nthreads is decremented by each dying thread */
                wait_var_event_warning(&ksocknal_data.ksnd_nthreads,
                                       ksocknal_data.ksnd_nthreads == 0,
                                       "waiting for %d threads to terminate\n",
                                       ksocknal_data.ksnd_nthreads);

                ksocknal_free_buffers();

                ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
                break;
        }

        CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
               atomic_read (&libcfs_kmemory));

        /* release the ref taken in ksocknal_base_startup() */
        module_put(THIS_MODULE);
}
2291
static int
ksocknal_base_startup(void)
{
        struct ksock_sched *sched;
        int rc;
        int i;

        /* One-time module-global initialisation: data structures,
         * per-CPT schedulers, connd threads and the reaper thread.
         * Returns 0 on success or -ENETDOWN after a full rollback via
         * ksocknal_base_shutdown(). */
        LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
        LASSERT(ksocknal_data.ksnd_nnets == 0);

        memset(&ksocknal_data, 0, sizeof(ksocknal_data)); /* zero pointers */

        hash_init(ksocknal_data.ksnd_peers);

        rwlock_init(&ksocknal_data.ksnd_global_lock);
        INIT_LIST_HEAD(&ksocknal_data.ksnd_nets);

        spin_lock_init(&ksocknal_data.ksnd_reaper_lock);
        INIT_LIST_HEAD(&ksocknal_data.ksnd_enomem_conns);
        INIT_LIST_HEAD(&ksocknal_data.ksnd_zombie_conns);
        INIT_LIST_HEAD(&ksocknal_data.ksnd_deathrow_conns);
        init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);

        spin_lock_init(&ksocknal_data.ksnd_connd_lock);
        INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_connreqs);
        INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_routes);
        init_waitqueue_head(&ksocknal_data.ksnd_connd_waitq);

        spin_lock_init(&ksocknal_data.ksnd_tx_lock);
        INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_noop_txs);

        /* NB memset above zeros whole of ksocknal_data */

        /* flag lists/ptrs/locks initialised */
        ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
        if (!try_module_get(THIS_MODULE))
                goto failed;

        /* Create a scheduler block per available CPT */
        ksocknal_data.ksnd_schedulers = cfs_percpt_alloc(lnet_cpt_table(),
                                                         sizeof(*sched));
        if (ksocknal_data.ksnd_schedulers == NULL)
                goto failed;

        cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
                int nthrs;

                /*
                 * make sure not to allocate more threads than there are
                 * cores/CPUs in the CPT
                 */
                nthrs = cfs_cpt_weight(lnet_cpt_table(), i);
                if (*ksocknal_tunables.ksnd_nscheds > 0) {
                        nthrs = min(nthrs, *ksocknal_tunables.ksnd_nscheds);
                } else {
                        /*
                         * max to half of CPUs, assume another half should be
                         * reserved for upper layer modules
                         */
                        nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
                }

                sched->kss_nthreads_max = nthrs;
                sched->kss_cpt = i;

                spin_lock_init(&sched->kss_lock);
                INIT_LIST_HEAD(&sched->kss_rx_conns);
                INIT_LIST_HEAD(&sched->kss_tx_conns);
                INIT_LIST_HEAD(&sched->kss_zombie_noop_txs);
                init_waitqueue_head(&sched->kss_waitq);
        }

        ksocknal_data.ksnd_connd_starting         = 0;
        ksocknal_data.ksnd_connd_failed_stamp     = 0;
        ksocknal_data.ksnd_connd_starting_stamp   = ktime_get_real_seconds();
        /* must have at least 2 connds to remain responsive to accepts while
         * connecting */
        if (*ksocknal_tunables.ksnd_nconnds < SOCKNAL_CONND_RESV + 1)
                *ksocknal_tunables.ksnd_nconnds = SOCKNAL_CONND_RESV + 1;

        if (*ksocknal_tunables.ksnd_nconnds_max <
            *ksocknal_tunables.ksnd_nconnds) {
                /* NOTE(review): this assigns the _pointer_, making
                 * nconnds_max alias (track) nconnds rather than copying
                 * its current value — looks intentional, but confirm */
                ksocknal_tunables.ksnd_nconnds_max =
                        ksocknal_tunables.ksnd_nconnds;
        }

        for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
                char name[16];
                /* count starters so a failure can be rolled back exactly */
                spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
                ksocknal_data.ksnd_connd_starting++;
                spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);


                snprintf(name, sizeof(name), "socknal_cd%02d", i);
                rc = ksocknal_thread_start(ksocknal_connd,
                                           (void *)((uintptr_t)i), name);
                if (rc != 0) {
                        spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
                        ksocknal_data.ksnd_connd_starting--;
                        spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
                        CERROR("Can't spawn socknal connd: %d\n", rc);
                        goto failed;
                }
        }

        rc = ksocknal_thread_start(ksocknal_reaper, NULL, "socknal_reaper");
        if (rc != 0) {
                CERROR ("Can't spawn socknal reaper: %d\n", rc);
                goto failed;
        }

        /* flag everything initialised */
        ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;

        return 0;

 failed:
        ksocknal_base_shutdown();
        return -ENETDOWN;
}
2412
2413 static int
2414 ksocknal_debug_peerhash(struct lnet_ni *ni)
2415 {
2416         struct ksock_peer_ni *peer_ni;
2417         int i;
2418
2419         read_lock(&ksocknal_data.ksnd_global_lock);
2420
2421         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
2422                 struct ksock_route *route;
2423                 struct ksock_conn *conn;
2424
2425                 if (peer_ni->ksnp_ni != ni)
2426                         continue;
2427
2428                 CWARN("Active peer_ni on shutdown: %s, ref %d, "
2429                       "closing %d, accepting %d, err %d, zcookie %llu, "
2430                       "txq %d, zc_req %d\n", libcfs_id2str(peer_ni->ksnp_id),
2431                       atomic_read(&peer_ni->ksnp_refcount),
2432                       peer_ni->ksnp_closing,
2433                       peer_ni->ksnp_accepting, peer_ni->ksnp_error,
2434                       peer_ni->ksnp_zc_next_cookie,
2435                       !list_empty(&peer_ni->ksnp_tx_queue),
2436                       !list_empty(&peer_ni->ksnp_zc_req_list));
2437
2438                 list_for_each_entry(route, &peer_ni->ksnp_routes, ksnr_list) {
2439                         CWARN("Route: ref %d, schd %d, conn %d, cnted %d, "
2440                               "del %d\n", atomic_read(&route->ksnr_refcount),
2441                               route->ksnr_scheduled, route->ksnr_connecting,
2442                               route->ksnr_connected, route->ksnr_deleted);
2443                 }
2444
2445                 list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
2446                         CWARN("Conn: ref %d, sref %d, t %d, c %d\n",
2447                               atomic_read(&conn->ksnc_conn_refcount),
2448                               atomic_read(&conn->ksnc_sock_refcount),
2449                               conn->ksnc_type, conn->ksnc_closing);
2450                 }
2451                 break;
2452         }
2453
2454         read_unlock(&ksocknal_data.ksnd_global_lock);
2455         return 0;
2456 }
2457
/*
 * LND shutdown hook: tear down the socklnd state for one NI.
 * Deletes all peers on the NI, waits until their reference-counted
 * state has drained, then frees the per-NI net and, if this was the
 * last net, the global socklnd state.
 */
void
ksocknal_shutdown(struct lnet_ni *ni)
{
	struct ksock_net *net = ni->ni_data;
	/* wildcard id so ksocknal_del_peer() matches every peer on @ni */
	struct lnet_process_id anyid = {
		.nid = LNET_NID_ANY,
		.pid = LNET_PID_ANY,
	};
	int i;

	LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL);
	LASSERT(ksocknal_data.ksnd_nnets > 0);

	/* prevent new peers: the huge bias on the peer count is
	 * presumably rejected by the peer-creation path - confirm in
	 * ksocknal_create_peer() */
	atomic_add(SOCKNAL_SHUTDOWN_BIAS, &net->ksnn_npeers);

	/* Delete all peers */
	ksocknal_del_peer(ni, anyid, 0);

	/* Wait for all peer_ni state to clean up: ksnn_npeers drops back
	 * to exactly SOCKNAL_SHUTDOWN_BIAS once the last peer is gone.
	 * ksocknal_debug_peerhash() always returns 0; it is summed into
	 * the warning argument only so that each warning interval also
	 * dumps the state of a lingering peer. */
	wait_var_event_warning(&net->ksnn_npeers,
			       atomic_read(&net->ksnn_npeers) ==
			       SOCKNAL_SHUTDOWN_BIAS,
			       "waiting for %d peers to disconnect\n",
			       ksocknal_debug_peerhash(ni) +
			       atomic_read(&net->ksnn_npeers) -
			       SOCKNAL_SHUTDOWN_BIAS);

	/* with no peers left, no interface may still have users */
	for (i = 0; i < net->ksnn_ninterfaces; i++) {
		LASSERT(net->ksnn_interfaces[i].ksni_npeers == 0);
		LASSERT(net->ksnn_interfaces[i].ksni_nroutes == 0);
	}

	list_del(&net->ksnn_list);
	LIBCFS_FREE(net, sizeof(*net));

	/* last net out tears down the global state */
	ksocknal_data.ksnd_nnets--;
	if (ksocknal_data.ksnd_nnets == 0)
		ksocknal_base_shutdown();
}
2498
2499 static int
2500 ksocknal_search_new_ipif(struct ksock_net *net)
2501 {
2502         int new_ipif = 0;
2503         int i;
2504
2505         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2506                 char *ifnam = &net->ksnn_interfaces[i].ksni_name[0];
2507                 char *colon = strchr(ifnam, ':');
2508                 int found  = 0;
2509                 struct ksock_net *tmp;
2510                 int j;
2511
2512                 if (colon != NULL) /* ignore alias device */
2513                         *colon = 0;
2514
2515                 list_for_each_entry(tmp, &ksocknal_data.ksnd_nets,
2516                                         ksnn_list) {
2517                         for (j = 0; !found && j < tmp->ksnn_ninterfaces; j++) {
2518                                 char *ifnam2 = &tmp->ksnn_interfaces[j].\
2519                                              ksni_name[0];
2520                                 char *colon2 = strchr(ifnam2, ':');
2521
2522                                 if (colon2 != NULL)
2523                                         *colon2 = 0;
2524
2525                                 found = strcmp(ifnam, ifnam2) == 0;
2526                                 if (colon2 != NULL)
2527                                         *colon2 = ':';
2528                         }
2529                         if (found)
2530                                 break;
2531                 }
2532
2533                 new_ipif += !found;
2534                 if (colon != NULL)
2535                         *colon = ':';
2536         }
2537
2538         return new_ipif;
2539 }
2540
2541 static int
2542 ksocknal_start_schedulers(struct ksock_sched *sched)
2543 {
2544         int     nthrs;
2545         int     rc = 0;
2546         int     i;
2547
2548         if (sched->kss_nthreads == 0) {
2549                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2550                         nthrs = sched->kss_nthreads_max;
2551                 } else {
2552                         nthrs = cfs_cpt_weight(lnet_cpt_table(),
2553                                                sched->kss_cpt);
2554                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2555                         nthrs = min(SOCKNAL_NSCHEDS_HIGH, nthrs);
2556                 }
2557                 nthrs = min(nthrs, sched->kss_nthreads_max);
2558         } else {
2559                 LASSERT(sched->kss_nthreads <= sched->kss_nthreads_max);
2560                 /* increase two threads if there is new interface */
2561                 nthrs = min(2, sched->kss_nthreads_max - sched->kss_nthreads);
2562         }
2563
2564         for (i = 0; i < nthrs; i++) {
2565                 long id;
2566                 char name[20];
2567
2568                 id = KSOCK_THREAD_ID(sched->kss_cpt, sched->kss_nthreads + i);
2569                 snprintf(name, sizeof(name), "socknal_sd%02d_%02d",
2570                          sched->kss_cpt, (int)KSOCK_THREAD_SID(id));
2571
2572                 rc = ksocknal_thread_start(ksocknal_scheduler,
2573                                            (void *)id, name);
2574                 if (rc == 0)
2575                         continue;
2576
2577                 CERROR("Can't spawn thread %d for scheduler[%d]: %d\n",
2578                        sched->kss_cpt, (int) KSOCK_THREAD_SID(id), rc);
2579                 break;
2580         }
2581
2582         sched->kss_nthreads += i;
2583         return rc;
2584 }
2585
2586 static int
2587 ksocknal_net_start_threads(struct ksock_net *net, __u32 *cpts, int ncpts)
2588 {
2589         int newif = ksocknal_search_new_ipif(net);
2590         int rc;
2591         int i;
2592
2593         if (ncpts > 0 && ncpts > cfs_cpt_number(lnet_cpt_table()))
2594                 return -EINVAL;
2595
2596         for (i = 0; i < ncpts; i++) {
2597                 struct ksock_sched *sched;
2598                 int cpt = (cpts == NULL) ? i : cpts[i];
2599
2600                 LASSERT(cpt < cfs_cpt_number(lnet_cpt_table()));
2601                 sched = ksocknal_data.ksnd_schedulers[cpt];
2602
2603                 if (!newif && sched->kss_nthreads > 0)
2604                         continue;
2605
2606                 rc = ksocknal_start_schedulers(sched);
2607                 if (rc != 0)
2608                         return rc;
2609         }
2610         return 0;
2611 }
2612
2613 int
2614 ksocknal_startup(struct lnet_ni *ni)
2615 {
2616         struct ksock_net *net;
2617         struct lnet_ioctl_config_lnd_cmn_tunables *net_tunables;
2618         struct ksock_interface *ksi = NULL;
2619         struct lnet_inetdev *ifaces = NULL;
2620         int i = 0;
2621         int rc;
2622
2623         LASSERT (ni->ni_net->net_lnd == &the_ksocklnd);
2624
2625         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING) {
2626                 rc = ksocknal_base_startup();
2627                 if (rc != 0)
2628                         return rc;
2629         }
2630
2631         LIBCFS_ALLOC(net, sizeof(*net));
2632         if (net == NULL)
2633                 goto fail_0;
2634
2635         net->ksnn_incarnation = ktime_get_real_ns();
2636         ni->ni_data = net;
2637         net_tunables = &ni->ni_net->net_tunables;
2638
2639         if (net_tunables->lct_peer_timeout == -1)
2640                 net_tunables->lct_peer_timeout =
2641                         *ksocknal_tunables.ksnd_peertimeout;
2642
2643         if (net_tunables->lct_max_tx_credits == -1)
2644                 net_tunables->lct_max_tx_credits =
2645                         *ksocknal_tunables.ksnd_credits;
2646
2647         if (net_tunables->lct_peer_tx_credits == -1)
2648                 net_tunables->lct_peer_tx_credits =
2649                         *ksocknal_tunables.ksnd_peertxcredits;
2650
2651         if (net_tunables->lct_peer_tx_credits >
2652             net_tunables->lct_max_tx_credits)
2653                 net_tunables->lct_peer_tx_credits =
2654                         net_tunables->lct_max_tx_credits;
2655
2656         if (net_tunables->lct_peer_rtr_credits == -1)
2657                 net_tunables->lct_peer_rtr_credits =
2658                         *ksocknal_tunables.ksnd_peerrtrcredits;
2659
2660         rc = lnet_inet_enumerate(&ifaces, ni->ni_net_ns);
2661         if (rc < 0)
2662                 goto fail_1;
2663
2664         if (!ni->ni_interfaces[0]) {
2665                 ksi = &net->ksnn_interfaces[0];
2666
2667                 /* Use the first discovered interface */
2668                 net->ksnn_ninterfaces = 1;
2669                 ni->ni_dev_cpt = ifaces[0].li_cpt;
2670                 ksi->ksni_ipaddr = ifaces[0].li_ipaddr;
2671                 ksi->ksni_netmask = ifaces[0].li_netmask;
2672                 strlcpy(ksi->ksni_name, ifaces[0].li_name,
2673                         sizeof(ksi->ksni_name));
2674         } else {
2675                 /* Before Multi-Rail ksocklnd would manage
2676                  * multiple interfaces with its own tcp bonding.
2677                  * If we encounter an old configuration using
2678                  * this tcp bonding approach then we need to
2679                  * handle more than one ni_interfaces.
2680                  *
2681                  * In Multi-Rail configuration only ONE ni_interface
2682                  * should exist. Each IP alias should be mapped to
2683                  * each 'struct net_ni'.
2684                  */
2685                 for (i = 0; i < LNET_INTERFACES_NUM; i++) {
2686                         int j;
2687
2688                         if (!ni->ni_interfaces[i])
2689                                 break;
2690
2691                         for (j = 0; j < LNET_INTERFACES_NUM;  j++) {
2692                                 if (i != j && ni->ni_interfaces[j] &&
2693                                     strcmp(ni->ni_interfaces[i],
2694                                            ni->ni_interfaces[j]) == 0) {
2695                                         rc = -EEXIST;
2696                                         CERROR("ksocklnd: found duplicate %s at %d and %d, rc = %d\n",
2697                                                ni->ni_interfaces[i], i, j, rc);
2698                                         goto fail_1;
2699                                 }
2700                         }
2701
2702                         for (j = 0; j < rc; j++) {
2703                                 if (strcmp(ifaces[j].li_name,
2704                                            ni->ni_interfaces[i]) != 0)
2705                                         continue;
2706
2707                                 ksi = &net->ksnn_interfaces[j];
2708                                 ni->ni_dev_cpt = ifaces[j].li_cpt;
2709                                 ksi->ksni_ipaddr = ifaces[j].li_ipaddr;
2710                                 ksi->ksni_netmask = ifaces[j].li_netmask;
2711                                 strlcpy(ksi->ksni_name, ifaces[j].li_name,
2712                                         sizeof(ksi->ksni_name));
2713                                 net->ksnn_ninterfaces++;
2714                                 break;
2715                         }
2716                 }
2717                 /* ni_interfaces don't map to all network interfaces */
2718                 if (!ksi || net->ksnn_ninterfaces != i) {
2719                         CERROR("ksocklnd: requested %d but only %d interfaces found\n",
2720                                i, net->ksnn_ninterfaces);
2721                         goto fail_1;
2722                 }
2723         }
2724
2725         /* call it before add it to ksocknal_data.ksnd_nets */
2726         rc = ksocknal_net_start_threads(net, ni->ni_cpts, ni->ni_ncpts);
2727         if (rc != 0)
2728                 goto fail_1;
2729
2730         LASSERT(ksi);
2731         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ksi->ksni_ipaddr);
2732         list_add(&net->ksnn_list, &ksocknal_data.ksnd_nets);
2733
2734         ksocknal_data.ksnd_nnets++;
2735
2736         return 0;
2737
2738  fail_1:
2739         LIBCFS_FREE(net, sizeof(*net));
2740  fail_0:
2741         if (ksocknal_data.ksnd_nnets == 0)
2742                 ksocknal_base_shutdown();
2743
2744         return -ENETDOWN;
2745 }
2746
2747
/* Module unload hook: unregister this LND from the LNet core. */
static void __exit ksocklnd_exit(void)
{
	lnet_unregister_lnd(&the_ksocklnd);
}
2752
/* socklnd's LND operations table, registered with the LNet core by
 * ksocklnd_init() and referenced by ksocknal_startup()'s sanity check */
static const struct lnet_lnd the_ksocklnd = {
	.lnd_type		= SOCKLND,
	.lnd_startup		= ksocknal_startup,
	.lnd_shutdown		= ksocknal_shutdown,
	.lnd_ctl		= ksocknal_ctl,
	.lnd_send		= ksocknal_send,
	.lnd_recv		= ksocknal_recv,
	.lnd_notify_peer_down	= ksocknal_notify_gw_down,
	.lnd_accept		= ksocknal_accept,
};
2763
2764 static int __init ksocklnd_init(void)
2765 {
2766         int rc;
2767
2768         /* check ksnr_connected/connecting field large enough */
2769         BUILD_BUG_ON(SOCKLND_CONN_NTYPES > 4);
2770         BUILD_BUG_ON(SOCKLND_CONN_ACK != SOCKLND_CONN_BULK_IN);
2771
2772         rc = ksocknal_tunables_init();
2773         if (rc != 0)
2774                 return rc;
2775
2776         lnet_register_lnd(&the_ksocklnd);
2777
2778         return 0;
2779 }
2780
/* standard kernel module metadata and entry/exit wiring */
MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("TCP Socket LNet Network Driver");
MODULE_VERSION("2.8.0");
MODULE_LICENSE("GPL");

module_init(ksocklnd_init);
module_exit(ksocklnd_exit);