/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/socklnd/socklnd.c
 *
 * Author: Zach Brown <zab@zabbo.net>
 * Author: Peter J. Braam <braam@clusterfs.com>
 * Author: Phil Schwan <phil@clusterfs.com>
 * Author: Eric Barton <eric@bartonsoftware.com>
 */

#include <linux/inetdevice.h>
#include "socklnd.h"
#include <linux/sunrpc/addr.h>

static const struct lnet_lnd the_ksocklnd;
struct ksock_nal_data ksocknal_data;

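/* Return the interface of @ni whose address matches @addr, or NULL if
 * no configured interface has that address.
 */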
static struct ksock_interface *
ksocknal_ip2iface(struct lnet_ni *ni, struct sockaddr *addr)
{
        struct ksock_net *net = ni->ni_data;
        int i;
        struct ksock_interface *iface;

        for (i = 0; i < net->ksnn_ninterfaces; i++) {
                LASSERT(i < LNET_INTERFACES_NUM);
                iface = &net->ksnn_interfaces[i];

                if (rpc_cmp_addr((struct sockaddr *)&iface->ksni_addr, addr))
                        return iface;
        }

        return NULL;
}

static struct ksock_interface *
ksocknal_index2iface(struct lnet_ni *ni, int index)
{
        struct ksock_net *net = ni->ni_data;
        int i;
        struct ksock_interface *iface;

        for (i = 0; i < net->ksnn_ninterfaces; i++) {
                LASSERT(i < LNET_INTERFACES_NUM);
                iface = &net->ksnn_interfaces[i];

                if (iface->ksni_index == index)
                        return iface;
        }

        return NULL;
}

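/* Map an IPv4 address to the ifindex of the UP, non-loopback net device
 * in @ni's namespace that owns it; returns -1 if no device matches
 * (IPv6 is not supported yet).
 */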
static int ksocknal_ip2index(struct sockaddr *addr, struct lnet_ni *ni)
{
        struct net_device *dev;
        int ret = -1;
        DECLARE_CONST_IN_IFADDR(ifa);

        if (addr->sa_family != AF_INET)
                /* No IPv6 support yet */
                return ret;

        rcu_read_lock();
        for_each_netdev(ni->ni_net_ns, dev) {
                int flags = dev_get_flags(dev);
                struct in_device *in_dev;

                if (flags & IFF_LOOPBACK) /* skip the loopback IF */
                        continue;

                if (!(flags & IFF_UP))
                        continue;

                in_dev = __in_dev_get_rcu(dev);
                if (!in_dev)
                        continue;

                in_dev_for_each_ifa_rcu(ifa, in_dev) {
                        if (ifa->ifa_local ==
                            ((struct sockaddr_in *)addr)->sin_addr.s_addr)
                                ret = dev->ifindex;
                }
                endfor_ifa(in_dev);
                if (ret >= 0)
                        break;
        }
        rcu_read_unlock();

        return ret;
}

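/* Allocate and initialise a route to @addr; the caller owns the initial
 * reference and the route is not yet bound to a peer_ni or interface.
 */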
static struct ksock_route *
ksocknal_create_route(struct sockaddr *addr)
{
        struct ksock_route *route;

        LIBCFS_ALLOC (route, sizeof (*route));
        if (route == NULL)
                return (NULL);

        refcount_set(&route->ksnr_refcount, 1);
        route->ksnr_peer = NULL;
        route->ksnr_retry_interval = 0;         /* OK to connect at any time */
        rpc_copy_addr((struct sockaddr *)&route->ksnr_addr, addr);
        rpc_set_port((struct sockaddr *)&route->ksnr_addr, rpc_get_port(addr));
        route->ksnr_myiface = -1;
        route->ksnr_scheduled = 0;
        route->ksnr_connecting = 0;
        route->ksnr_connected = 0;
        route->ksnr_deleted = 0;
        route->ksnr_conn_count = 0;
        route->ksnr_share_count = 0;

        return route;
}

void
ksocknal_destroy_route(struct ksock_route *route)
{
        LASSERT(refcount_read(&route->ksnr_refcount) == 0);

        if (route->ksnr_peer != NULL)
                ksocknal_peer_decref(route->ksnr_peer);

        LIBCFS_FREE (route, sizeof (*route));
}

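/* Allocate a peer_ni for @id on the CPT derived from its NID.  Fails
 * with -ESHUTDOWN if the net is shutting down (ksnn_npeers has gone
 * negative) and -ENOMEM on allocation failure; the caller holds the
 * first reference.
 */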
static struct ksock_peer_ni *
ksocknal_create_peer(struct lnet_ni *ni, struct lnet_process_id id)
{
        int cpt = lnet_cpt_of_nid(id.nid, ni);
        struct ksock_net *net = ni->ni_data;
        struct ksock_peer_ni *peer_ni;

        LASSERT(id.nid != LNET_NID_ANY);
        LASSERT(id.pid != LNET_PID_ANY);
        LASSERT(!in_interrupt());

        if (!atomic_inc_unless_negative(&net->ksnn_npeers)) {
                CERROR("Can't create peer_ni: network shutdown\n");
                return ERR_PTR(-ESHUTDOWN);
        }

        LIBCFS_CPT_ALLOC(peer_ni, lnet_cpt_table(), cpt, sizeof(*peer_ni));
        if (!peer_ni) {
                atomic_dec(&net->ksnn_npeers);
                return ERR_PTR(-ENOMEM);
        }

        peer_ni->ksnp_ni = ni;
        peer_ni->ksnp_id = id;
        refcount_set(&peer_ni->ksnp_refcount, 1); /* 1 ref for caller */
        peer_ni->ksnp_closing = 0;
        peer_ni->ksnp_accepting = 0;
        peer_ni->ksnp_proto = NULL;
        peer_ni->ksnp_last_alive = 0;
        peer_ni->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;

        INIT_LIST_HEAD(&peer_ni->ksnp_conns);
        INIT_LIST_HEAD(&peer_ni->ksnp_routes);
        INIT_LIST_HEAD(&peer_ni->ksnp_tx_queue);
        INIT_LIST_HEAD(&peer_ni->ksnp_zc_req_list);
        spin_lock_init(&peer_ni->ksnp_lock);

        return peer_ni;
}

void
ksocknal_destroy_peer(struct ksock_peer_ni *peer_ni)
{
        struct ksock_net *net = peer_ni->ksnp_ni->ni_data;

        CDEBUG (D_NET, "peer_ni %s %p deleted\n",
                libcfs_id2str(peer_ni->ksnp_id), peer_ni);

        LASSERT(refcount_read(&peer_ni->ksnp_refcount) == 0);
        LASSERT(peer_ni->ksnp_accepting == 0);
        LASSERT(list_empty(&peer_ni->ksnp_conns));
        LASSERT(list_empty(&peer_ni->ksnp_routes));
        LASSERT(list_empty(&peer_ni->ksnp_tx_queue));
        LASSERT(list_empty(&peer_ni->ksnp_zc_req_list));

        LIBCFS_FREE(peer_ni, sizeof(*peer_ni));

        /* NB a peer_ni's connections and routes keep a reference on their
         * peer_ni until they are destroyed, so we can be assured that _all_
         * state to do with this peer_ni has been cleaned up when its refcount
         * drops to zero.
         */
        if (atomic_dec_and_test(&net->ksnn_npeers))
                wake_up_var(&net->ksnn_npeers);
}

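/* Look up the peer_ni for (@ni, @id) in the global peer hash.  Must be
 * called with ksnd_global_lock held; no reference is taken.
 */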
struct ksock_peer_ni *
ksocknal_find_peer_locked(struct lnet_ni *ni, struct lnet_process_id id)
{
        struct ksock_peer_ni *peer_ni;

        hash_for_each_possible(ksocknal_data.ksnd_peers, peer_ni,
                               ksnp_list, id.nid) {
                LASSERT(!peer_ni->ksnp_closing);

                if (peer_ni->ksnp_ni != ni)
                        continue;

                if (peer_ni->ksnp_id.nid != id.nid ||
                    peer_ni->ksnp_id.pid != id.pid)
                        continue;

                CDEBUG(D_NET, "got peer_ni [%p] -> %s (%d)\n",
                       peer_ni, libcfs_id2str(id),
                       refcount_read(&peer_ni->ksnp_refcount));
                return peer_ni;
        }
        return NULL;
}

struct ksock_peer_ni *
ksocknal_find_peer(struct lnet_ni *ni, struct lnet_process_id id)
{
        struct ksock_peer_ni *peer_ni;

        read_lock(&ksocknal_data.ksnd_global_lock);
        peer_ni = ksocknal_find_peer_locked(ni, id);
        if (peer_ni != NULL)                    /* +1 ref for caller? */
                ksocknal_peer_addref(peer_ni);
        read_unlock(&ksocknal_data.ksnd_global_lock);

        return (peer_ni);
}

static void
ksocknal_unlink_peer_locked(struct ksock_peer_ni *peer_ni)
{
        int i;
        struct ksock_interface *iface;

        for (i = 0; i < peer_ni->ksnp_n_passive_ips; i++) {
                struct sockaddr_in sa = { .sin_family = AF_INET };
                LASSERT(i < LNET_INTERFACES_NUM);
                sa.sin_addr.s_addr = htonl(peer_ni->ksnp_passive_ips[i]);

                iface = ksocknal_ip2iface(peer_ni->ksnp_ni,
                                          (struct sockaddr *)&sa);
                /*
                 * All IPs in peer_ni->ksnp_passive_ips[] come from the
                 * interface list, therefore the call must succeed.
                 */
                LASSERT(iface != NULL);

                CDEBUG(D_NET, "peer_ni=%p iface=%p ksni_nroutes=%d\n",
                       peer_ni, iface, iface->ksni_nroutes);
                iface->ksni_npeers--;
        }

        LASSERT(list_empty(&peer_ni->ksnp_conns));
        LASSERT(list_empty(&peer_ni->ksnp_routes));
        LASSERT(!peer_ni->ksnp_closing);
        peer_ni->ksnp_closing = 1;
        hlist_del(&peer_ni->ksnp_list);
        /* lose peerlist's ref */
        ksocknal_peer_decref(peer_ni);
}

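/* Walk the peer table and return the @index'th record (bare peer,
 * passive IP or route) belonging to @ni, filling in the ioctl output
 * fields; returns -ENOENT when @index is past the end.
 */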
static int
ksocknal_get_peer_info(struct lnet_ni *ni, int index,
                       struct lnet_process_id *id, __u32 *myip, __u32 *peer_ip,
                       int *port, int *conn_count, int *share_count)
{
        struct ksock_peer_ni *peer_ni;
        struct ksock_route *route;
        struct list_head *rtmp;
        int i;
        int j;
        int rc = -ENOENT;

        read_lock(&ksocknal_data.ksnd_global_lock);

        hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {

                if (peer_ni->ksnp_ni != ni)
                        continue;

                if (peer_ni->ksnp_n_passive_ips == 0 &&
                    list_empty(&peer_ni->ksnp_routes)) {
                        if (index-- > 0)
                                continue;

                        *id = peer_ni->ksnp_id;
                        *myip = 0;
                        *peer_ip = 0;
                        *port = 0;
                        *conn_count = 0;
                        *share_count = 0;
                        rc = 0;
                        goto out;
                }

                for (j = 0; j < peer_ni->ksnp_n_passive_ips; j++) {
                        if (index-- > 0)
                                continue;

                        *id = peer_ni->ksnp_id;
                        *myip = peer_ni->ksnp_passive_ips[j];
                        *peer_ip = 0;
                        *port = 0;
                        *conn_count = 0;
                        *share_count = 0;
                        rc = 0;
                        goto out;
                }

                list_for_each(rtmp, &peer_ni->ksnp_routes) {
                        if (index-- > 0)
                                continue;

                        route = list_entry(rtmp, struct ksock_route,
                                           ksnr_list);

                        *id = peer_ni->ksnp_id;
                        if (route->ksnr_addr.ss_family == AF_INET) {
                                struct sockaddr_in *sa =
                                        (void *)&route->ksnr_addr;
                                rc = choose_ipv4_src(
                                        myip,
                                        route->ksnr_myiface,
                                        ntohl(sa->sin_addr.s_addr),
                                        ni->ni_net_ns);
                                *peer_ip = ntohl(sa->sin_addr.s_addr);
                                *port = ntohs(sa->sin_port);
                        } else {
                                *myip = 0xFFFFFFFF;
                                *peer_ip = 0xFFFFFFFF;
                                *port = 0;
                                rc = -ENOTSUPP;
                        }
                        *conn_count = route->ksnr_conn_count;
                        *share_count = route->ksnr_share_count;
                        goto out;
                }
        }
out:
        read_unlock(&ksocknal_data.ksnd_global_lock);
        return rc;
}

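/* Bind @conn to @route: take a route ref, (re)bind the route to the
 * interface the connection actually used, record the connection type
 * and reset the reconnection backoff.
 */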
static void
ksocknal_associate_route_conn_locked(struct ksock_route *route,
                                     struct ksock_conn *conn)
{
        struct ksock_peer_ni *peer_ni = route->ksnr_peer;
        int type = conn->ksnc_type;
        struct ksock_interface *iface;
        int conn_iface =
                ksocknal_ip2index((struct sockaddr *)&conn->ksnc_myaddr,
                                  route->ksnr_peer->ksnp_ni);

        conn->ksnc_route = route;
        ksocknal_route_addref(route);

        if (route->ksnr_myiface != conn_iface) {
                if (route->ksnr_myiface < 0) {
                        /* route wasn't bound locally yet (the initial route) */
                        CDEBUG(D_NET, "Binding %s %pIS to interface %d\n",
                               libcfs_id2str(peer_ni->ksnp_id),
                               &route->ksnr_addr,
                               conn_iface);
                } else {
                        CDEBUG(D_NET,
                               "Rebinding %s %pIS from interface %d to %d\n",
                               libcfs_id2str(peer_ni->ksnp_id),
                               &route->ksnr_addr,
                               route->ksnr_myiface,
                               conn_iface);

                        iface = ksocknal_index2iface(route->ksnr_peer->ksnp_ni,
                                                     route->ksnr_myiface);
                        if (iface)
                                iface->ksni_nroutes--;
                }
                route->ksnr_myiface = conn_iface;
                iface = ksocknal_index2iface(route->ksnr_peer->ksnp_ni,
                                             route->ksnr_myiface);
                if (iface)
                        iface->ksni_nroutes++;
        }

        route->ksnr_connected |= (1<<type);
        route->ksnr_conn_count++;

        /* Successful connection => further attempts can
         * proceed immediately
         */
        route->ksnr_retry_interval = 0;
}

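/* Attach a freshly created @route to @peer_ni (a duplicate address is a
 * bug), bind it to the net's first interface, and associate it with any
 * existing connections to the same address.
 */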
static void
ksocknal_add_route_locked(struct ksock_peer_ni *peer_ni, struct ksock_route *route)
{
        struct list_head *tmp;
        struct ksock_conn *conn;
        struct ksock_route *route2;
        struct ksock_net *net = peer_ni->ksnp_ni->ni_data;

        LASSERT(!peer_ni->ksnp_closing);
        LASSERT(route->ksnr_peer == NULL);
        LASSERT(!route->ksnr_scheduled);
        LASSERT(!route->ksnr_connecting);
        LASSERT(route->ksnr_connected == 0);
        LASSERT(net->ksnn_ninterfaces > 0);

        /* LASSERT(unique) */
        list_for_each(tmp, &peer_ni->ksnp_routes) {
                route2 = list_entry(tmp, struct ksock_route, ksnr_list);

                if (rpc_cmp_addr((struct sockaddr *)&route2->ksnr_addr,
                                 (struct sockaddr *)&route->ksnr_addr)) {
                        CERROR("Duplicate route %s %pI4h\n",
                               libcfs_id2str(peer_ni->ksnp_id),
                               &route->ksnr_addr);
                        LBUG();
                }
        }

        route->ksnr_peer = peer_ni;
        ksocknal_peer_addref(peer_ni);

        /* set the route's interface to the current net's interface */
        route->ksnr_myiface = net->ksnn_interfaces[0].ksni_index;
        net->ksnn_interfaces[0].ksni_nroutes++;

        /* peer_ni's routelist takes over my ref on 'route' */
        list_add_tail(&route->ksnr_list, &peer_ni->ksnp_routes);

        list_for_each(tmp, &peer_ni->ksnp_conns) {
                conn = list_entry(tmp, struct ksock_conn, ksnc_list);

                if (!rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
                                  (struct sockaddr *)&route->ksnr_addr))
                        continue;

                ksocknal_associate_route_conn_locked(route, conn);
                /* keep going (typed routes) */
        }
}

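/* Remove @route from its peer_ni: close the connections using it, drop
 * the interface route count, and unlink the peer_ni if this leaves it
 * with no routes and no connections.
 */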
static void
ksocknal_del_route_locked(struct ksock_route *route)
{
        struct ksock_peer_ni *peer_ni = route->ksnr_peer;
        struct ksock_interface *iface;
        struct ksock_conn *conn;
        struct ksock_conn *cnxt;

        LASSERT(!route->ksnr_deleted);

        /* Close associated conns */
        list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns, ksnc_list) {
                if (conn->ksnc_route != route)
                        continue;

                ksocknal_close_conn_locked(conn, 0);
        }

        if (route->ksnr_myiface >= 0) {
                iface = ksocknal_index2iface(route->ksnr_peer->ksnp_ni,
                                             route->ksnr_myiface);
                if (iface)
                        iface->ksni_nroutes--;
        }

        route->ksnr_deleted = 1;
        list_del(&route->ksnr_list);
        ksocknal_route_decref(route);           /* drop peer_ni's ref */

        if (list_empty(&peer_ni->ksnp_routes) &&
            list_empty(&peer_ni->ksnp_conns)) {
                /* I've just removed the last route to a peer_ni with no active
                 * connections */
                ksocknal_unlink_peer_locked(peer_ni);
        }
}

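/* ioctl handler: add (or share) a route to @id at @ipaddr:@port,
 * creating the peer_ni if it does not already exist.
 */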
int
ksocknal_add_peer(struct lnet_ni *ni, struct lnet_process_id id, __u32 ipaddr,
                  int port)
{
        struct list_head *tmp;
        struct ksock_peer_ni *peer_ni;
        struct ksock_peer_ni *peer2;
        struct ksock_route *route;
        struct ksock_route *route2;
        struct sockaddr_in sa = {.sin_family = AF_INET};

        if (id.nid == LNET_NID_ANY ||
            id.pid == LNET_PID_ANY)
                return (-EINVAL);

        /* Have a brand new peer_ni ready... */
        peer_ni = ksocknal_create_peer(ni, id);
        if (IS_ERR(peer_ni))
                return PTR_ERR(peer_ni);

        sa.sin_addr.s_addr = htonl(ipaddr);
        sa.sin_port = htons(port);
        route = ksocknal_create_route((struct sockaddr *)&sa);
        if (route == NULL) {
                ksocknal_peer_decref(peer_ni);
                return (-ENOMEM);
        }

        write_lock_bh(&ksocknal_data.ksnd_global_lock);

        /* always called with a ref on ni, so shutdown can't have started */
        LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers)
                >= 0);

        peer2 = ksocknal_find_peer_locked(ni, id);
        if (peer2 != NULL) {
                ksocknal_peer_decref(peer_ni);
                peer_ni = peer2;
        } else {
                /* peer_ni table takes my ref on peer_ni */
                hash_add(ksocknal_data.ksnd_peers, &peer_ni->ksnp_list, id.nid);
        }

        route2 = NULL;
        list_for_each(tmp, &peer_ni->ksnp_routes) {
                route2 = list_entry(tmp, struct ksock_route, ksnr_list);

                if (route2->ksnr_addr.ss_family == AF_INET &&
                    ((struct sockaddr_in *)&route2->ksnr_addr)->sin_addr.s_addr
                    == htonl(ipaddr))
                        break;

                route2 = NULL;
        }
        if (route2 == NULL) {
                ksocknal_add_route_locked(peer_ni, route);
                route->ksnr_share_count++;
        } else {
                ksocknal_route_decref(route);
                route2->ksnr_share_count++;
        }

        write_unlock_bh(&ksocknal_data.ksnd_global_lock);

        return 0;
}

static void
ksocknal_del_peer_locked(struct ksock_peer_ni *peer_ni, __u32 ip)
{
        struct ksock_conn *conn;
        struct ksock_conn *cnxt;
        struct ksock_route *route;
        struct ksock_route *rnxt;
        int nshared;

        LASSERT(!peer_ni->ksnp_closing);

        /* Extra ref prevents peer_ni disappearing until I'm done with it */
        ksocknal_peer_addref(peer_ni);

        list_for_each_entry_safe(route, rnxt, &peer_ni->ksnp_routes,
                                 ksnr_list) {
                /* no match */
                if (ip) {
                        if (route->ksnr_addr.ss_family != AF_INET)
                                continue;
                        if (((struct sockaddr_in *)&route->ksnr_addr)
                                        ->sin_addr.s_addr != htonl(ip))
                                continue;
                }

                route->ksnr_share_count = 0;
                /* This deletes associated conns too */
                ksocknal_del_route_locked(route);
        }

        nshared = 0;
        list_for_each_entry(route, &peer_ni->ksnp_routes, ksnr_list)
                nshared += route->ksnr_share_count;

        if (nshared == 0) {
                /* remove everything else if there are no explicit entries
                 * left
                 */
                list_for_each_entry_safe(route, rnxt, &peer_ni->ksnp_routes,
                                         ksnr_list) {
                        /* we should only be removing auto-entries */
                        LASSERT(route->ksnr_share_count == 0);
                        ksocknal_del_route_locked(route);
                }

                list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns,
                                         ksnc_list)
                        ksocknal_close_conn_locked(conn, 0);
        }

        ksocknal_peer_decref(peer_ni);
        /* NB peer_ni unlinks itself when last conn/route is removed */
}

static int
ksocknal_del_peer(struct lnet_ni *ni, struct lnet_process_id id, __u32 ip)
{
        LIST_HEAD(zombies);
        struct hlist_node *pnxt;
        struct ksock_peer_ni *peer_ni;
        int lo;
        int hi;
        int i;
        int rc = -ENOENT;

        write_lock_bh(&ksocknal_data.ksnd_global_lock);

        if (id.nid != LNET_NID_ANY) {
                lo = hash_min(id.nid, HASH_BITS(ksocknal_data.ksnd_peers));
                hi = lo;
        } else {
                lo = 0;
                hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
        }

        for (i = lo; i <= hi; i++) {
                hlist_for_each_entry_safe(peer_ni, pnxt,
                                          &ksocknal_data.ksnd_peers[i],
                                          ksnp_list) {
                        if (peer_ni->ksnp_ni != ni)
                                continue;

                        if (!((id.nid == LNET_NID_ANY ||
                               peer_ni->ksnp_id.nid == id.nid) &&
                              (id.pid == LNET_PID_ANY ||
                               peer_ni->ksnp_id.pid == id.pid)))
                                continue;

                        ksocknal_peer_addref(peer_ni);  /* a ref for me... */

                        ksocknal_del_peer_locked(peer_ni, ip);

                        if (peer_ni->ksnp_closing &&
                            !list_empty(&peer_ni->ksnp_tx_queue)) {
                                LASSERT(list_empty(&peer_ni->ksnp_conns));
                                LASSERT(list_empty(&peer_ni->ksnp_routes));

                                list_splice_init(&peer_ni->ksnp_tx_queue,
                                                 &zombies);
                        }

                        ksocknal_peer_decref(peer_ni);  /* ...till here */

                        rc = 0;                         /* matched! */
                }
        }

        write_unlock_bh(&ksocknal_data.ksnd_global_lock);

        ksocknal_txlist_done(ni, &zombies, -ENETDOWN);

        return rc;
}

static struct ksock_conn *
ksocknal_get_conn_by_idx(struct lnet_ni *ni, int index)
{
        struct ksock_peer_ni *peer_ni;
        struct ksock_conn *conn;
        struct list_head *ctmp;
        int i;

        read_lock(&ksocknal_data.ksnd_global_lock);

        hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
                LASSERT(!peer_ni->ksnp_closing);

                if (peer_ni->ksnp_ni != ni)
                        continue;

                list_for_each(ctmp, &peer_ni->ksnp_conns) {
                        if (index-- > 0)
                                continue;

                        conn = list_entry(ctmp, struct ksock_conn,
                                          ksnc_list);
                        ksocknal_conn_addref(conn);
                        read_unlock(&ksocknal_data.ksnd_global_lock);
                        return conn;
                }
        }

        read_unlock(&ksocknal_data.ksnd_global_lock);
        return NULL;
}

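/* Prefer the scheduler of @cpt; if it has no threads, fall back to any
 * scheduler that does, or return NULL if none is available.
 */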
static struct ksock_sched *
ksocknal_choose_scheduler_locked(unsigned int cpt)
{
        struct ksock_sched *sched = ksocknal_data.ksnd_schedulers[cpt];
        int i;

        if (sched->kss_nthreads == 0) {
                cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
                        if (sched->kss_nthreads > 0) {
                                CDEBUG(D_NET, "scheduler[%d] has no threads. selected scheduler[%d]\n",
                                       cpt, sched->kss_cpt);
                                return sched;
                        }
                }
                return NULL;
        }

        return sched;
}

static int
ksocknal_local_ipvec(struct lnet_ni *ni, __u32 *ipaddrs)
{
        struct ksock_net *net = ni->ni_data;
        int i, j;
        int nip;

        read_lock(&ksocknal_data.ksnd_global_lock);

        nip = net->ksnn_ninterfaces;
        LASSERT(nip <= LNET_INTERFACES_NUM);

        for (i = 0, j = 0; i < nip; i++)
                if (net->ksnn_interfaces[i].ksni_addr.ss_family == AF_INET) {
                        struct sockaddr_in *sa =
                                (void *)&net->ksnn_interfaces[i].ksni_addr;

                        ipaddrs[j] = ntohl(sa->sin_addr.s_addr);
                        LASSERT(ipaddrs[j] != 0);
                        j += 1;
                }
        nip = j;

        read_unlock(&ksocknal_data.ksnd_global_lock);
        /*
         * Only offer interfaces for additional connections if I have
         * more than one.
         */
        return nip < 2 ? 0 : nip;
}

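/* Pick the peer IP (ignoring zeroed entries) that best matches @iface:
 * a netmask match wins, ties are broken by the smallest XOR distance.
 */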
static int
ksocknal_match_peerip(struct ksock_interface *iface, __u32 *ips, int nips)
{
        int best_netmatch = 0;
        int best_xor = 0;
        int best = -1;
        int this_xor;
        int this_netmatch;
        int i;
        struct sockaddr_in *sa;
        __u32 ip;

        sa = (struct sockaddr_in *)&iface->ksni_addr;
        LASSERT(sa->sin_family == AF_INET);
        ip = ntohl(sa->sin_addr.s_addr);

        for (i = 0; i < nips; i++) {
                if (ips[i] == 0)
                        continue;

                this_xor = ips[i] ^ ip;
                this_netmatch = ((this_xor & iface->ksni_netmask) == 0) ? 1 : 0;

                if (!(best < 0 ||
                      best_netmatch < this_netmatch ||
                      (best_netmatch == this_netmatch &&
                       best_xor > this_xor)))
                        continue;

                best = i;
                best_netmatch = this_netmatch;
                best_xor = this_xor;
        }

        LASSERT(best >= 0);
        return best;
}

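/* Choose up to n_peerips local IPv4 addresses to advertise to a passive
 * peer_ni, preferring interfaces on the peer's subnets and with fewer
 * peers; the selection overwrites @peerips and is remembered in
 * ksnp_passive_ips.
 */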
static int
ksocknal_select_ips(struct ksock_peer_ni *peer_ni, __u32 *peerips, int n_peerips)
{
        rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
        struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
        struct ksock_interface *iface;
        struct ksock_interface *best_iface;
        int n_ips;
        int i;
        int j;
        int k;
        u32 ip;
        u32 xor;
        int this_netmatch;
        int best_netmatch;
        int best_npeers;

        /* CAVEAT EMPTOR: We do all our interface matching with an
         * exclusive hold of global lock at IRQ priority.  We're only
         * expecting to be dealing with small numbers of interfaces, so the
         * O(n**3)-ness shouldn't matter */

        /* Also note that I'm not going to return more than n_peerips
         * interfaces, even if I have more myself */

        write_lock_bh(global_lock);

        LASSERT(n_peerips <= LNET_INTERFACES_NUM);
        LASSERT(net->ksnn_ninterfaces <= LNET_INTERFACES_NUM);

        /* Only match interfaces for additional connections
         * if I have > 1 interface
         */
        n_ips = (net->ksnn_ninterfaces < 2) ? 0 :
                min(n_peerips, net->ksnn_ninterfaces);

        for (i = 0; peer_ni->ksnp_n_passive_ips < n_ips; i++) {
                /*              ^ yes really... */

                /* If we have any new interfaces, first tick off all the
                 * peer_ni IPs that match old interfaces, then choose new
                 * interfaces to match the remaining peer_ni IPS.
                 * We don't forget interfaces we've stopped using; we might
                 * start using them again... */

                if (i < peer_ni->ksnp_n_passive_ips) {
                        /* Old interface. */
                        struct sockaddr_in sa = { .sin_family = AF_INET};

                        sa.sin_addr.s_addr =
                                htonl(peer_ni->ksnp_passive_ips[i]);
                        best_iface = ksocknal_ip2iface(peer_ni->ksnp_ni,
                                                       (struct sockaddr *)&sa);

                        /* peer_ni passive ips are kept up to date */
                        LASSERT(best_iface != NULL);
                } else {
                        /* choose a new interface */
                        struct sockaddr_in *sa;

                        LASSERT (i == peer_ni->ksnp_n_passive_ips);

                        best_iface = NULL;
                        best_netmatch = 0;
                        best_npeers = 0;

                        for (j = 0; j < net->ksnn_ninterfaces; j++) {
                                iface = &net->ksnn_interfaces[j];
                                sa = (void *)&iface->ksni_addr;
                                if (sa->sin_family != AF_INET)
                                        continue;
                                ip = ntohl(sa->sin_addr.s_addr);

                                for (k = 0; k < peer_ni->ksnp_n_passive_ips; k++)
                                        if (peer_ni->ksnp_passive_ips[k] == ip)
                                                break;

                                if (k < peer_ni->ksnp_n_passive_ips) /* using it already */
                                        continue;

                                k = ksocknal_match_peerip(iface, peerips, n_peerips);
                                xor = (ip ^ peerips[k]);
                                this_netmatch = ((xor & iface->ksni_netmask) == 0) ? 1 : 0;

                                if (!(best_iface == NULL ||
                                      best_netmatch < this_netmatch ||
                                      (best_netmatch == this_netmatch &&
                                       best_npeers > iface->ksni_npeers)))
                                        continue;

                                best_iface = iface;
                                best_netmatch = this_netmatch;
                                best_npeers = iface->ksni_npeers;
                        }

                        LASSERT(best_iface != NULL);

                        best_iface->ksni_npeers++;
                        sa = (void *)&best_iface->ksni_addr;
                        ip = ntohl(sa->sin_addr.s_addr);
                        peer_ni->ksnp_passive_ips[i] = ip;
                        peer_ni->ksnp_n_passive_ips = i+1;
                }

                /* mark the best matching peer_ni IP used */
                j = ksocknal_match_peerip(best_iface, peerips, n_peerips);
                peerips[j] = 0;
        }

        /* Overwrite input peer_ni IP addresses */
        memcpy(peerips, peer_ni->ksnp_passive_ips, n_ips * sizeof(*peerips));

        write_unlock_bh(global_lock);

        return (n_ips);
}

static void
ksocknal_create_routes(struct ksock_peer_ni *peer_ni, int port,
                       __u32 *peer_ipaddrs, int npeer_ipaddrs)
{
        struct ksock_route              *newroute = NULL;
        rwlock_t                *global_lock = &ksocknal_data.ksnd_global_lock;
        struct lnet_ni *ni = peer_ni->ksnp_ni;
        struct ksock_net                *net = ni->ni_data;
        struct list_head        *rtmp;
        struct ksock_route              *route;
        struct ksock_interface  *iface;
        struct ksock_interface  *best_iface;
        int                     best_netmatch;
        int                     this_netmatch;
        int                     best_nroutes;
        int                     i;
        int                     j;

        /* CAVEAT EMPTOR: We do all our interface matching with an
         * exclusive hold of global lock at IRQ priority.  We're only
         * expecting to be dealing with small numbers of interfaces, so the
         * O(n**3)-ness here shouldn't matter */

        write_lock_bh(global_lock);

        if (net->ksnn_ninterfaces < 2) {
                /* Only create additional connections
                 * if I have > 1 interface */
                write_unlock_bh(global_lock);
                return;
        }

        LASSERT(npeer_ipaddrs <= LNET_INTERFACES_NUM);

        for (i = 0; i < npeer_ipaddrs; i++) {
                if (newroute) {
                        struct sockaddr_in *sa = (void *)&newroute->ksnr_addr;

                        memset(sa, 0, sizeof(*sa));
                        sa->sin_family = AF_INET;
                        sa->sin_addr.s_addr = htonl(peer_ipaddrs[i]);
                } else {
                        struct sockaddr_in sa = {.sin_family = AF_INET};

                        write_unlock_bh(global_lock);

                        sa.sin_addr.s_addr = htonl(peer_ipaddrs[i]);
                        sa.sin_port = htons(port);
                        newroute =
                                ksocknal_create_route((struct sockaddr *)&sa);
                        if (!newroute)
                                return;

                        write_lock_bh(global_lock);
                }

                if (peer_ni->ksnp_closing) {
                        /* peer_ni got closed under me */
                        break;
                }

                /* Already got a route? */
                route = NULL;
                list_for_each(rtmp, &peer_ni->ksnp_routes) {
                        route = list_entry(rtmp, struct ksock_route, ksnr_list);

                        if (rpc_cmp_addr(
                                    (struct sockaddr *)&route->ksnr_addr,
                                    (struct sockaddr *)&newroute->ksnr_addr))
                                break;

                        route = NULL;
                }
                if (route != NULL)
                        continue;

                best_iface = NULL;
                best_nroutes = 0;
                best_netmatch = 0;

                LASSERT(net->ksnn_ninterfaces <= LNET_INTERFACES_NUM);

                /* Select interface to connect from */
                for (j = 0; j < net->ksnn_ninterfaces; j++) {
                        __u32 iface_ip, route_ip;

                        iface = &net->ksnn_interfaces[j];

                        /* Using this interface already? */
                        list_for_each(rtmp, &peer_ni->ksnp_routes) {
                                route = list_entry(rtmp, struct ksock_route,
                                                   ksnr_list);

                                if (route->ksnr_myiface == iface->ksni_index)
                                        break;

                                route = NULL;
                        }
                        if (route != NULL)
                                continue;
                        if (iface->ksni_addr.ss_family != AF_INET)
                                continue;
                        if (newroute->ksnr_addr.ss_family != AF_INET)
                                continue;

                        iface_ip =
                                ntohl(((struct sockaddr_in *)
                                       &iface->ksni_addr)->sin_addr.s_addr);
                        route_ip =
                                ntohl(((struct sockaddr_in *)
                                       &newroute->ksnr_addr)->sin_addr.s_addr);

                        this_netmatch = (((iface_ip ^ route_ip) &
                                          iface->ksni_netmask) == 0) ? 1 : 0;

                        if (!(best_iface == NULL ||
                              best_netmatch < this_netmatch ||
                              (best_netmatch == this_netmatch &&
                               best_nroutes > iface->ksni_nroutes)))
                                continue;

                        best_iface = iface;
                        best_netmatch = this_netmatch;
                        best_nroutes = iface->ksni_nroutes;
                }

                if (best_iface == NULL)
                        continue;

                newroute->ksnr_myiface = best_iface->ksni_index;
                best_iface->ksni_nroutes++;

                ksocknal_add_route_locked(peer_ni, newroute);
                newroute = NULL;
        }

        write_unlock_bh(global_lock);
        if (newroute != NULL)
                ksocknal_route_decref(newroute);
}

int
ksocknal_accept(struct lnet_ni *ni, struct socket *sock)
{
        struct ksock_connreq *cr;
        int rc;
        struct sockaddr_storage peer;

        rc = lnet_sock_getaddr(sock, true, &peer);
        LASSERT(rc == 0);               /* we succeeded before */

        LIBCFS_ALLOC(cr, sizeof(*cr));
        if (cr == NULL) {
                LCONSOLE_ERROR_MSG(0x12f,
                                   "Dropping connection request from %pIS: memory exhausted\n",
                                   &peer);
                return -ENOMEM;
        }

        lnet_ni_addref(ni);
        cr->ksncr_ni   = ni;
        cr->ksncr_sock = sock;

        spin_lock_bh(&ksocknal_data.ksnd_connd_lock);

        list_add_tail(&cr->ksncr_list, &ksocknal_data.ksnd_connd_connreqs);
        wake_up(&ksocknal_data.ksnd_connd_waitq);

        spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
        return 0;
}

static int
ksocknal_connecting(struct ksock_peer_ni *peer_ni, struct sockaddr *sa)
{
        struct ksock_route *route;

        list_for_each_entry(route, &peer_ni->ksnp_routes, ksnr_list) {
                if (rpc_cmp_addr((struct sockaddr *)&route->ksnr_addr, sa))
                        return route->ksnr_connecting;
        }
        return 0;
}

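/* Establish a new connection on @sock.  @route is non-NULL for active
 * (outgoing) connections and NULL for passive ones; the HELLO exchange
 * determines the peer, protocol and interface list before the conn is
 * inserted and handed to a scheduler.
 */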
1117 int
1118 ksocknal_create_conn(struct lnet_ni *ni, struct ksock_route *route,
1119                      struct socket *sock, int type)
1120 {
1121         rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
1122         LIST_HEAD(zombies);
1123         struct lnet_process_id peerid;
1124         struct list_head *tmp;
1125         u64 incarnation;
1126         struct ksock_conn *conn;
1127         struct ksock_conn *conn2;
1128         struct ksock_peer_ni *peer_ni = NULL;
1129         struct ksock_peer_ni *peer2;
1130         struct ksock_sched *sched;
1131         struct ksock_hello_msg *hello;
1132         int cpt;
1133         struct ksock_tx *tx;
1134         struct ksock_tx *txtmp;
1135         int rc;
1136         int rc2;
1137         int active;
1138         char *warn = NULL;
1139
1140         active = (route != NULL);
1141
1142         LASSERT (active == (type != SOCKLND_CONN_NONE));
1143
1144         LIBCFS_ALLOC(conn, sizeof(*conn));
1145         if (conn == NULL) {
1146                 rc = -ENOMEM;
1147                 goto failed_0;
1148         }
1149
1150         conn->ksnc_peer = NULL;
1151         conn->ksnc_route = NULL;
1152         conn->ksnc_sock = sock;
1153         /* 2 ref, 1 for conn, another extra ref prevents socket
1154          * being closed before establishment of connection */
1155         refcount_set(&conn->ksnc_sock_refcount, 2);
1156         conn->ksnc_type = type;
1157         ksocknal_lib_save_callback(sock, conn);
1158         refcount_set(&conn->ksnc_conn_refcount, 1); /* 1 ref for me */
1159
1160         conn->ksnc_rx_ready = 0;
1161         conn->ksnc_rx_scheduled = 0;
1162
1163         INIT_LIST_HEAD(&conn->ksnc_tx_queue);
1164         conn->ksnc_tx_ready = 0;
1165         conn->ksnc_tx_scheduled = 0;
1166         conn->ksnc_tx_carrier = NULL;
1167         atomic_set (&conn->ksnc_tx_nob, 0);
1168
1169         LIBCFS_ALLOC(hello, offsetof(struct ksock_hello_msg,
1170                                      kshm_ips[LNET_INTERFACES_NUM]));
1171         if (hello == NULL) {
1172                 rc = -ENOMEM;
1173                 goto failed_1;
1174         }
1175
1176         /* stash conn's local and remote addrs */
1177         rc = ksocknal_lib_get_conn_addrs (conn);
1178         if (rc != 0)
1179                 goto failed_1;
1180
1181         /* Find out/confirm peer_ni's NID and connection type and get the
1182          * vector of interfaces she's willing to let me connect to.
1183          * Passive connections use the listener timeout since the peer_ni sends
1184          * eagerly */
1185
1186         if (active) {
1187                 peer_ni = route->ksnr_peer;
1188                 LASSERT(ni == peer_ni->ksnp_ni);
1189
1190                 /* Active connection sends HELLO eagerly */
1191                 hello->kshm_nips = ksocknal_local_ipvec(ni, hello->kshm_ips);
1192                 peerid = peer_ni->ksnp_id;
1193
1194                 write_lock_bh(global_lock);
1195                 conn->ksnc_proto = peer_ni->ksnp_proto;
1196                 write_unlock_bh(global_lock);
1197
1198                 if (conn->ksnc_proto == NULL) {
1199                          conn->ksnc_proto = &ksocknal_protocol_v3x;
1200 #if SOCKNAL_VERSION_DEBUG
1201                          if (*ksocknal_tunables.ksnd_protocol == 2)
1202                                  conn->ksnc_proto = &ksocknal_protocol_v2x;
1203                          else if (*ksocknal_tunables.ksnd_protocol == 1)
1204                                  conn->ksnc_proto = &ksocknal_protocol_v1x;
1205 #endif
1206                 }
1207
1208                 rc = ksocknal_send_hello (ni, conn, peerid.nid, hello);
1209                 if (rc != 0)
1210                         goto failed_1;
1211         } else {
1212                 peerid.nid = LNET_NID_ANY;
1213                 peerid.pid = LNET_PID_ANY;
1214
1215                 /* Passive, get protocol from peer_ni */
1216                 conn->ksnc_proto = NULL;
1217         }
1218
1219         rc = ksocknal_recv_hello (ni, conn, hello, &peerid, &incarnation);
1220         if (rc < 0)
1221                 goto failed_1;
1222
1223         LASSERT (rc == 0 || active);
1224         LASSERT (conn->ksnc_proto != NULL);
1225         LASSERT (peerid.nid != LNET_NID_ANY);
1226
1227         cpt = lnet_cpt_of_nid(peerid.nid, ni);
1228
1229         if (active) {
1230                 ksocknal_peer_addref(peer_ni);
1231                 write_lock_bh(global_lock);
1232         } else {
1233                 peer_ni = ksocknal_create_peer(ni, peerid);
1234                 if (IS_ERR(peer_ni)) {
1235                         rc = PTR_ERR(peer_ni);
1236                         goto failed_1;
1237                 }
1238
1239                 write_lock_bh(global_lock);
1240
1241                 /* called with a ref on ni, so shutdown can't have started */
1242                 LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers) >= 0);
1243
1244                 peer2 = ksocknal_find_peer_locked(ni, peerid);
1245                 if (peer2 == NULL) {
1246                         /* NB this puts an "empty" peer_ni in the peer_ni
1247                          * table (which takes my ref) */
1248                         hash_add(ksocknal_data.ksnd_peers,
1249                                  &peer_ni->ksnp_list, peerid.nid);
1250                 } else {
1251                         ksocknal_peer_decref(peer_ni);
1252                         peer_ni = peer2;
1253                 }
1254
1255                 /* +1 ref for me */
1256                 ksocknal_peer_addref(peer_ni);
1257                 peer_ni->ksnp_accepting++;
1258
1259                 /* Am I already connecting to this guy?  Resolve in
1260                  * favour of higher NID...
1261                  */
1262                 if (peerid.nid < ni->ni_nid &&
1263                     ksocknal_connecting(peer_ni, ((struct sockaddr *)
1264                                                   &conn->ksnc_peeraddr))) {
1265                         rc = EALREADY;
1266                         warn = "connection race resolution";
1267                         goto failed_2;
1268                 }
1269         }
1270
1271         if (peer_ni->ksnp_closing ||
1272             (active && route->ksnr_deleted)) {
1273                 /* peer_ni/route got closed under me */
1274                 rc = -ESTALE;
1275                 warn = "peer_ni/route removed";
1276                 goto failed_2;
1277         }
1278
1279         if (peer_ni->ksnp_proto == NULL) {
1280                 /* Never connected before.
1281                  * NB recv_hello may have returned EPROTO to signal my peer_ni
1282                  * wants a different protocol than the one I asked for.
1283                  */
1284                 LASSERT(list_empty(&peer_ni->ksnp_conns));
1285
1286                 peer_ni->ksnp_proto = conn->ksnc_proto;
1287                 peer_ni->ksnp_incarnation = incarnation;
1288         }
1289
1290         if (peer_ni->ksnp_proto != conn->ksnc_proto ||
1291             peer_ni->ksnp_incarnation != incarnation) {
1292                 /* peer_ni rebooted or I've got the wrong protocol version */
1293                 ksocknal_close_peer_conns_locked(peer_ni, NULL, 0);
1294
1295                 peer_ni->ksnp_proto = NULL;
1296                 rc = ESTALE;
1297                 warn = peer_ni->ksnp_incarnation != incarnation ?
1298                         "peer_ni rebooted" :
1299                         "wrong proto version";
1300                 goto failed_2;
1301         }
1302
1303         switch (rc) {
1304         default:
1305                 LBUG();
1306         case 0:
1307                 break;
1308         case EALREADY:
1309                 warn = "lost conn race";
1310                 goto failed_2;
1311         case EPROTO:
1312                 warn = "retry with different protocol version";
1313                 goto failed_2;
1314         }
1315
1316         /* Refuse to duplicate an existing connection, unless this is a
1317          * loopback connection */
1318         if (!rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
1319                           (struct sockaddr *)&conn->ksnc_myaddr)) {
1320                 list_for_each(tmp, &peer_ni->ksnp_conns) {
1321                         conn2 = list_entry(tmp, struct ksock_conn, ksnc_list);
1322
1323                         if (!rpc_cmp_addr(
1324                                     (struct sockaddr *)&conn2->ksnc_peeraddr,
1325                                     (struct sockaddr *)&conn->ksnc_peeraddr) ||
1326                             !rpc_cmp_addr(
1327                                     (struct sockaddr *)&conn2->ksnc_myaddr,
1328                                     (struct sockaddr *)&conn->ksnc_myaddr) ||
1329                             conn2->ksnc_type != conn->ksnc_type)
1330                                 continue;
1331
1332                         /* Reply on a passive connection attempt so the peer_ni
1333                          * realises we're connected. */
1334                         LASSERT (rc == 0);
1335                         if (!active)
1336                                 rc = EALREADY;
1337
1338                         warn = "duplicate";
1339                         goto failed_2;
1340                 }
1341         }
1342
1343         /* If the connection created by this route didn't bind to the IP
1344          * address the route connected to, the connection/route matching
1345          * code below probably isn't going to work. */
1346         if (active &&
1347             !rpc_cmp_addr((struct sockaddr *)&route->ksnr_addr,
1348                           (struct sockaddr *)&conn->ksnc_peeraddr)) {
1349                 CERROR("Route %s %pIS connected to %pIS\n",
1350                        libcfs_id2str(peer_ni->ksnp_id),
1351                        &route->ksnr_addr,
1352                        &conn->ksnc_peeraddr);
1353         }
1354
1355         /* Search for a route corresponding to the new connection and
1356          * create an association.  This allows incoming connections created
1357          * by routes in my peer_ni to match my own route entries so I don't
1358          * continually create duplicate routes. */
1359         list_for_each(tmp, &peer_ni->ksnp_routes) {
1360                 route = list_entry(tmp, struct ksock_route, ksnr_list);
1361
1362                 if (!rpc_cmp_addr((struct sockaddr *)&route->ksnr_addr,
1363                                   (struct sockaddr *)&conn->ksnc_peeraddr))
1364                         continue;
1365
1366                 ksocknal_associate_route_conn_locked(route, conn);
1367                 break;
1368         }
1369
1370         conn->ksnc_peer = peer_ni;                 /* conn takes my ref on peer_ni */
1371         peer_ni->ksnp_last_alive = ktime_get_seconds();
1372         peer_ni->ksnp_send_keepalive = 0;
1373         peer_ni->ksnp_error = 0;
1374
1375         sched = ksocknal_choose_scheduler_locked(cpt);
1376         if (!sched) {
1377                 CERROR("no schedulers available. node is unhealthy\n");
1378                 goto failed_2;
1379         }
1380         /*
1381          * The cpt might have changed if we ended up selecting a non cpt
1382          * native scheduler. So use the scheduler's cpt instead.
1383          */
1384         cpt = sched->kss_cpt;
1385         sched->kss_nconns++;
1386         conn->ksnc_scheduler = sched;
1387
1388         conn->ksnc_tx_last_post = ktime_get_seconds();
1389         /* Set the deadline for the outgoing HELLO to drain */
1390         conn->ksnc_tx_bufnob = sock->sk->sk_wmem_queued;
1391         conn->ksnc_tx_deadline = ktime_get_seconds() +
1392                                  ksocknal_timeout();
1393         smp_mb();   /* order with adding to peer_ni's conn list */
1394
1395         list_add(&conn->ksnc_list, &peer_ni->ksnp_conns);
1396         ksocknal_conn_addref(conn);
1397
1398         ksocknal_new_packet(conn, 0);
1399
1400         conn->ksnc_zc_capable = ksocknal_lib_zc_capable(conn);
1401
1402         /* Take packets blocking for this connection. */
1403         list_for_each_entry_safe(tx, txtmp, &peer_ni->ksnp_tx_queue, tx_list) {
1404                 if (conn->ksnc_proto->pro_match_tx(conn, tx, tx->tx_nonblk) ==
1405                     SOCKNAL_MATCH_NO)
1406                         continue;
1407
1408                 list_del(&tx->tx_list);
1409                 ksocknal_queue_tx_locked(tx, conn);
1410         }
1411
1412         write_unlock_bh(global_lock);
1413
1414         /* We've now got a new connection.  Any errors from here on are just
1415          * like "normal" comms errors and we close the connection normally.
1416          * NB (a) we still have to send the reply HELLO for passive
1417          *        connections,
1418          *    (b) normal I/O on the conn is blocked until I setup and call the
1419          *        socket callbacks.
1420          */
1421
1422         CDEBUG(D_NET, "New conn %s p %d.x %pIS -> %pISp"
1423                " incarnation:%lld sched[%d]\n",
1424                libcfs_id2str(peerid), conn->ksnc_proto->pro_version,
1425                &conn->ksnc_myaddr, &conn->ksnc_peeraddr,
1426                incarnation, cpt);
1427
1428         if (active) {
1429                 /* additional routes after interface exchange? */
1430                 ksocknal_create_routes(
1431                         peer_ni,
1432                         rpc_get_port((struct sockaddr *)&conn->ksnc_peeraddr),
1433                         hello->kshm_ips, hello->kshm_nips);
1434         } else {
1435                 hello->kshm_nips = ksocknal_select_ips(peer_ni, hello->kshm_ips,
1436                                                        hello->kshm_nips);
1437                 rc = ksocknal_send_hello(ni, conn, peerid.nid, hello);
1438         }
1439
1440         LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
1441                                     kshm_ips[LNET_INTERFACES_NUM]));
1442
1443         /* set up the socket AFTER I've received hello (it disables
1444          * SO_LINGER).  I might call back to the acceptor who may want
1445          * to send a protocol version response and then close the
1446          * socket; this ensures the socket only tears down after the
1447          * response has been sent. */
1448         if (rc == 0)
1449                 rc = ksocknal_lib_setup_sock(sock);
1450
1451         write_lock_bh(global_lock);
1452
1453         /* NB my callbacks block while I hold ksnd_global_lock */
1454         ksocknal_lib_set_callback(sock, conn);
1455
1456         if (!active)
1457                 peer_ni->ksnp_accepting--;
1458
1459         write_unlock_bh(global_lock);
1460
1461         if (rc != 0) {
1462                 write_lock_bh(global_lock);
1463                 if (!conn->ksnc_closing) {
1464                         /* could be closed by another thread */
1465                         ksocknal_close_conn_locked(conn, rc);
1466                 }
1467                 write_unlock_bh(global_lock);
1468         } else if (ksocknal_connsock_addref(conn) == 0) {
1469                 /* Allow I/O to proceed. */
1470                 ksocknal_read_callback(conn);
1471                 ksocknal_write_callback(conn);
1472                 ksocknal_connsock_decref(conn);
1473         }
1474
1475         ksocknal_connsock_decref(conn);
1476         ksocknal_conn_decref(conn);
1477         return rc;
1478
1479 failed_2:
1480         if (!peer_ni->ksnp_closing &&
1481             list_empty(&peer_ni->ksnp_conns) &&
1482             list_empty(&peer_ni->ksnp_routes)) {
1483                 list_splice_init(&peer_ni->ksnp_tx_queue, &zombies);
1484                 ksocknal_unlink_peer_locked(peer_ni);
1485         }
1486
1487         write_unlock_bh(global_lock);
1488
1489         if (warn != NULL) {
1490                 if (rc < 0)
1491                         CERROR("Not creating conn %s type %d: %s\n",
1492                                libcfs_id2str(peerid), conn->ksnc_type, warn);
1493                 else
1494                         CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",
1495                               libcfs_id2str(peerid), conn->ksnc_type, warn);
1496         }
1497
1498         if (!active) {
1499                 if (rc > 0) {
1500                         /* Request retry by replying with CONN_NONE;
1501                          * ksnc_proto has been set already */
1502                         conn->ksnc_type = SOCKLND_CONN_NONE;
1503                         hello->kshm_nips = 0;
1504                         ksocknal_send_hello(ni, conn, peerid.nid, hello);
1505                 }
1506
1507                 write_lock_bh(global_lock);
1508                 peer_ni->ksnp_accepting--;
1509                 write_unlock_bh(global_lock);
1510         }
1511
1512         /*
1513          * If we get here without an error code, just use -EALREADY.
1514          * Depending on how we got here, the error may be positive
1515          * or negative. Normalize the value for ksocknal_txlist_done().
1516          */
1517         rc2 = (rc == 0 ? -EALREADY : (rc > 0 ? -rc : rc));
1518         ksocknal_txlist_done(ni, &zombies, rc2);
1519         ksocknal_peer_decref(peer_ni);
1520
1521 failed_1:
1522         if (hello != NULL)
1523                 LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
1524                                             kshm_ips[LNET_INTERFACES_NUM]));
1525
1526         LIBCFS_FREE(conn, sizeof(*conn));
1527
1528 failed_0:
1529         sock_release(sock);
1530         return rc;
1531 }
1532
1533 void
1534 ksocknal_close_conn_locked(struct ksock_conn *conn, int error)
1535 {
1536         /* This just does the immediate housekeeping, and queues the
1537          * connection for the reaper to terminate.
1538          * Caller holds ksnd_global_lock exclusively in irq context */
1539         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1540         struct ksock_route *route;
1541         struct ksock_conn *conn2;
1542         struct list_head *tmp;
1543
1544         LASSERT(peer_ni->ksnp_error == 0);
1545         LASSERT(!conn->ksnc_closing);
1546         conn->ksnc_closing = 1;
1547
1548         /* ksnd_deathrow_conns takes over peer_ni's ref */
1549         list_del(&conn->ksnc_list);
1550
1551         route = conn->ksnc_route;
1552         if (route != NULL) {
1553                 /* dissociate conn from route... */
1554                 LASSERT(!route->ksnr_deleted);
1555                 LASSERT((route->ksnr_connected & BIT(conn->ksnc_type)) != 0);
1556
1557                 conn2 = NULL;
1558                 list_for_each(tmp, &peer_ni->ksnp_conns) {
1559                         conn2 = list_entry(tmp, struct ksock_conn, ksnc_list);
1560
1561                         if (conn2->ksnc_route == route &&
1562                             conn2->ksnc_type == conn->ksnc_type)
1563                                 break;
1564
1565                         conn2 = NULL;
1566                 }
1567                 if (conn2 == NULL)
1568                         route->ksnr_connected &= ~BIT(conn->ksnc_type);
1569
1570                 conn->ksnc_route = NULL;
1571
1572                 ksocknal_route_decref(route);   /* drop conn's ref on route */
1573         }
1574
1575         if (list_empty(&peer_ni->ksnp_conns)) {
1576                 /* No more connections to this peer_ni */
1577
1578                 if (!list_empty(&peer_ni->ksnp_tx_queue)) {
1579                         struct ksock_tx *tx;
1580
1581                         LASSERT(conn->ksnc_proto == &ksocknal_protocol_v3x);
1582
1583                         /* throw them to the last connection...,
1584                          * these TXs will be sent to /dev/null by the scheduler */
1585                         list_for_each_entry(tx, &peer_ni->ksnp_tx_queue,
1586                                             tx_list)
1587                                 ksocknal_tx_prep(conn, tx);
1588
1589                         spin_lock_bh(&conn->ksnc_scheduler->kss_lock);
1590                         list_splice_init(&peer_ni->ksnp_tx_queue,
1591                                          &conn->ksnc_tx_queue);
1592                         spin_unlock_bh(&conn->ksnc_scheduler->kss_lock);
1593                 }
1594
1595                 /* renegotiate protocol version */
1596                 peer_ni->ksnp_proto = NULL;
1597                 /* stash last conn close reason */
1598                 peer_ni->ksnp_error = error;
1599
1600                 if (list_empty(&peer_ni->ksnp_routes)) {
1601                         /* I've just closed the last conn belonging to a
1602                          * peer_ni with no routes to it */
1603                         ksocknal_unlink_peer_locked(peer_ni);
1604                 }
1605         }
1606
1607         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1608
1609         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
1610         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1611
1612         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1613 }
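
/*
 * Usage sketch (illustrative, not part of the original code): callers
 * must hold ksnd_global_lock for write in BH context, as the helpers
 * further down in this file do.  The error value is only an example.
 *
 *      write_lock_bh(&ksocknal_data.ksnd_global_lock);
 *      if (!conn->ksnc_closing)
 *              ksocknal_close_conn_locked(conn, -EIO);
 *      write_unlock_bh(&ksocknal_data.ksnd_global_lock);
 */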
1614
1615 void
1616 ksocknal_peer_failed(struct ksock_peer_ni *peer_ni)
1617 {
1618         bool notify = false;
1619         time64_t last_alive = 0;
1620
1621         /* There has been a connection failure or comms error, but I'll only
1622          * tell LNET I think the peer_ni is dead if it's to another kernel and
1623          * there are no connections or connection attempts in existence. */
1624
1625         read_lock(&ksocknal_data.ksnd_global_lock);
1626
1627         if ((peer_ni->ksnp_id.pid & LNET_PID_USERFLAG) == 0 &&
1628              list_empty(&peer_ni->ksnp_conns) &&
1629              peer_ni->ksnp_accepting == 0 &&
1630              ksocknal_find_connecting_route_locked(peer_ni) == NULL) {
1631                 notify = true;
1632                 last_alive = peer_ni->ksnp_last_alive;
1633         }
1634
1635         read_unlock(&ksocknal_data.ksnd_global_lock);
1636
1637         if (notify)
1638                 lnet_notify(peer_ni->ksnp_ni, peer_ni->ksnp_id.nid,
1639                             false, false, last_alive);
1640 }
1641
1642 void
1643 ksocknal_finalize_zcreq(struct ksock_conn *conn)
1644 {
1645         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1646         struct ksock_tx *tx;
1647         struct ksock_tx *tmp;
1648         LIST_HEAD(zlist);
1649
1650         /* NB safe to finalize TXs because closing the socket will
1651          * abort all buffered data */
1652         LASSERT(conn->ksnc_sock == NULL);
1653
1654         spin_lock(&peer_ni->ksnp_lock);
1655
1656         list_for_each_entry_safe(tx, tmp, &peer_ni->ksnp_zc_req_list, tx_zc_list) {
1657                 if (tx->tx_conn != conn)
1658                         continue;
1659
1660                 LASSERT(tx->tx_msg.ksm_zc_cookies[0] != 0);
1661
1662                 tx->tx_msg.ksm_zc_cookies[0] = 0;
1663                 tx->tx_zc_aborted = 1;  /* mark it as not-acked */
1664                 list_move(&tx->tx_zc_list, &zlist);
1665         }
1666
1667         spin_unlock(&peer_ni->ksnp_lock);
1668
1669         while (!list_empty(&zlist)) {
1670                 tx = list_entry(zlist.next, struct ksock_tx, tx_zc_list);
1671
1672                 list_del(&tx->tx_zc_list);
1673                 ksocknal_tx_decref(tx);
1674         }
1675 }
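
/*
 * Note (added commentary): the loop above moves matching ZC requests to
 * a private list under ksnp_lock and only drops their refs after the
 * lock is released, so the final ksocknal_tx_decref(), which may
 * finalize the LNet message, never runs with the peer_ni spinlock held.
 */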
1676
1677 void
1678 ksocknal_terminate_conn(struct ksock_conn *conn)
1679 {
1680         /* This gets called by the reaper (guaranteed thread context) to
1681          * disengage the socket from its callbacks and close it.
1682          * ksnc_refcount will eventually hit zero, and then the reaper will
1683          * destroy it.
1684          */
1685         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1686         struct ksock_sched *sched = conn->ksnc_scheduler;
1687         bool failed = false;
1688
1689         LASSERT(conn->ksnc_closing);
1690
1691         /* wake up the scheduler to "send" all remaining packets to /dev/null */
1692         spin_lock_bh(&sched->kss_lock);
1693
1694         /* a closing conn is always ready to tx */
1695         conn->ksnc_tx_ready = 1;
1696
1697         if (!conn->ksnc_tx_scheduled &&
1698             !list_empty(&conn->ksnc_tx_queue)) {
1699                 list_add_tail(&conn->ksnc_tx_list,
1700                               &sched->kss_tx_conns);
1701                 conn->ksnc_tx_scheduled = 1;
1702                 /* extra ref for scheduler */
1703                 ksocknal_conn_addref(conn);
1704
1705                 wake_up(&sched->kss_waitq);
1706         }
1707
1708         spin_unlock_bh(&sched->kss_lock);
1709
1710         /* serialise with callbacks */
1711         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1712
1713         ksocknal_lib_reset_callback(conn->ksnc_sock, conn);
1714
1715         /* OK, so this conn may not be completely disengaged from its
1716          * scheduler yet, but it _has_ committed to terminate...
1717          */
1718         conn->ksnc_scheduler->kss_nconns--;
1719
1720         if (peer_ni->ksnp_error != 0) {
1721                 /* peer_ni's last conn closed in error */
1722                 LASSERT(list_empty(&peer_ni->ksnp_conns));
1723                 failed = true;
1724                 peer_ni->ksnp_error = 0;     /* avoid multiple notifications */
1725         }
1726
1727         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1728
1729         if (failed)
1730                 ksocknal_peer_failed(peer_ni);
1731
1732         /* The socket is closed on the final put; either here, or in
1733          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1734          * when the connection was established, this will close the socket
1735          * immediately, aborting anything buffered in it. Any hung
1736          * zero-copy transmits will therefore complete in finite time.
1737          */
1738         ksocknal_connsock_decref(conn);
1739 }
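
/*
 * Teardown overview (added commentary): ksocknal_close_conn_locked()
 * puts a conn on ksnd_deathrow_conns; the reaper then calls
 * ksocknal_terminate_conn() above, the final ksocknal_conn_decref()
 * queues it on ksnd_zombie_conns via ksocknal_queue_zombie_conn(), and
 * the reaper frees it in ksocknal_destroy_conn().
 */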
1740
1741 void
1742 ksocknal_queue_zombie_conn(struct ksock_conn *conn)
1743 {
1744         /* Queue the conn for the reaper to destroy */
1745         LASSERT(refcount_read(&conn->ksnc_conn_refcount) == 0);
1746         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1747
1748         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1749         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1750
1751         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1752 }
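
/*
 * Sketch (assumed, based on the refcounting seen in this file; the real
 * helper lives in socklnd.h): the final ref drop is roughly
 *
 *      if (refcount_dec_and_test(&conn->ksnc_conn_refcount))
 *              ksocknal_queue_zombie_conn(conn);
 *
 * after which the reaper destroys the conn below.
 */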
1753
1754 void
1755 ksocknal_destroy_conn(struct ksock_conn *conn)
1756 {
1757         time64_t last_rcv;
1758
1759         /* Final coup-de-grace of the reaper */
1760         CDEBUG(D_NET, "connection %p\n", conn);
1761
1762         LASSERT(refcount_read(&conn->ksnc_conn_refcount) == 0);
1763         LASSERT(refcount_read(&conn->ksnc_sock_refcount) == 0);
1764         LASSERT(conn->ksnc_sock == NULL);
1765         LASSERT(conn->ksnc_route == NULL);
1766         LASSERT(!conn->ksnc_tx_scheduled);
1767         LASSERT(!conn->ksnc_rx_scheduled);
1768         LASSERT(list_empty(&conn->ksnc_tx_queue));
1769
1770         /* complete current receive if any */
1771         switch (conn->ksnc_rx_state) {
1772         case SOCKNAL_RX_LNET_PAYLOAD:
1773                 last_rcv = conn->ksnc_rx_deadline -
1774                            ksocknal_timeout();
1775                 CERROR("Completing partial receive from %s[%d], ip %pISp, with error, wanted: %d, left: %d, last alive is %lld secs ago\n",
1776                        libcfs_id2str(conn->ksnc_peer->ksnp_id), conn->ksnc_type,
1777                        &conn->ksnc_peeraddr,
1778                        conn->ksnc_rx_nob_wanted, conn->ksnc_rx_nob_left,
1779                        ktime_get_seconds() - last_rcv);
1780                 if (conn->ksnc_lnet_msg)
1781                         conn->ksnc_lnet_msg->msg_health_status =
1782                                 LNET_MSG_STATUS_REMOTE_ERROR;
1783                 lnet_finalize(conn->ksnc_lnet_msg, -EIO);
1784                 break;
1785         case SOCKNAL_RX_LNET_HEADER:
1786                 if (conn->ksnc_rx_started)
1787                         CERROR("Incomplete receive of lnet header from %s, ip %pISp, with error, protocol: %d.x.\n",
1788                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1789                                &conn->ksnc_peeraddr,
1790                                conn->ksnc_proto->pro_version);
1791                 break;
1792         case SOCKNAL_RX_KSM_HEADER:
1793                 if (conn->ksnc_rx_started)
1794                         CERROR("Incomplete receive of ksock message from %s, ip %pISp, with error, protocol: %d.x.\n",
1795                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1796                                &conn->ksnc_peeraddr,
1797                                conn->ksnc_proto->pro_version);
1798                 break;
1799         case SOCKNAL_RX_SLOP:
1800                 if (conn->ksnc_rx_started)
1801                         CERROR("Incomplete receive of slops from %s, ip %pISp, with error\n",
1802                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1803                                &conn->ksnc_peeraddr);
1804                 break;
1805         default:
1806                 LBUG();
1807                 break;
1808         }
1809
1810         ksocknal_peer_decref(conn->ksnc_peer);
1811
1812         LIBCFS_FREE(conn, sizeof(*conn));
1813 }
1814
1815 int
1816 ksocknal_close_peer_conns_locked(struct ksock_peer_ni *peer_ni,
1817                                  struct sockaddr *addr, int why)
1818 {
1819         struct ksock_conn *conn;
1820         struct ksock_conn *cnxt;
1821         int count = 0;
1822
1823         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns, ksnc_list) {
1824                 if (!addr ||
1825                     rpc_cmp_addr(addr,
1826                                  (struct sockaddr *)&conn->ksnc_peeraddr)) {
1827                         count++;
1828                         ksocknal_close_conn_locked(conn, why);
1829                 }
1830         }
1831
1832         return count;
1833 }
1834
1835 int
1836 ksocknal_close_conn_and_siblings(struct ksock_conn *conn, int why)
1837 {
1838         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1839         int count;
1840
1841         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1842
1843         count = ksocknal_close_peer_conns_locked(
1844                 peer_ni, (struct sockaddr *)&conn->ksnc_peeraddr, why);
1845
1846         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1847
1848         return count;
1849 }
1850
1851 int
1852 ksocknal_close_matching_conns(struct lnet_process_id id, __u32 ipaddr)
1853 {
1854         struct ksock_peer_ni *peer_ni;
1855         struct hlist_node *pnxt;
1856         int lo;
1857         int hi;
1858         int i;
1859         int count = 0;
1860         struct sockaddr_in sa = {.sin_family = AF_INET};
1861
1862         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1863
1864         if (id.nid != LNET_NID_ANY) {
1865                 lo = hash_min(id.nid, HASH_BITS(ksocknal_data.ksnd_peers));
1866                 hi = lo;
1867         } else {
1868                 lo = 0;
1869                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
1870         }
1871
1872         sa.sin_addr.s_addr = htonl(ipaddr);
1873         for (i = lo; i <= hi; i++) {
1874                 hlist_for_each_entry_safe(peer_ni, pnxt,
1875                                           &ksocknal_data.ksnd_peers[i],
1876                                           ksnp_list) {
1877
1878                         if (!((id.nid == LNET_NID_ANY ||
1879                                id.nid == peer_ni->ksnp_id.nid) &&
1880                               (id.pid == LNET_PID_ANY ||
1881                                id.pid == peer_ni->ksnp_id.pid)))
1882                                 continue;
1883
1884                         count += ksocknal_close_peer_conns_locked(
1885                                 peer_ni,
1886                                 ipaddr ? (struct sockaddr *)&sa : NULL, 0);
1887                 }
1888         }
1889
1890         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1891
1892         /* wildcards always succeed */
1893         if (id.nid == LNET_NID_ANY || id.pid == LNET_PID_ANY || ipaddr == 0)
1894                 return 0;
1895
1896         return (count == 0 ? -ENOENT : 0);
1897 }
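
/*
 * Example (illustrative): a fully specified call,
 *
 *      rc = ksocknal_close_matching_conns(id, ipaddr);
 *
 * with concrete nid, pid and ipaddr returns -ENOENT when nothing
 * matched, while any wildcard (LNET_NID_ANY, LNET_PID_ANY or ipaddr 0)
 * always returns 0, as in ksocknal_notify_gw_down() below which uses
 * the ipaddr == 0 wildcard.
 */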
1898
1899 void
1900 ksocknal_notify_gw_down(lnet_nid_t gw_nid)
1901 {
1902         /* The router is telling me she's been notified of a change in
1903          * gateway state....
1904          */
1905         struct lnet_process_id id = {
1906                 .nid    = gw_nid,
1907                 .pid    = LNET_PID_ANY,
1908         };
1909
1910         CDEBUG(D_NET, "gw %s down\n", libcfs_nid2str(gw_nid));
1911
1912         /* If the gateway crashed, close all open connections... */
1913         ksocknal_close_matching_conns(id, 0);
1914         return;
1915
1916         /* We can only establish new connections
1917          * if we have autoroutes, and these connect on demand. */
1918 }
1919
1920 static void
1921 ksocknal_push_peer(struct ksock_peer_ni *peer_ni)
1922 {
1923         int index;
1924         int i;
1925         struct list_head *tmp;
1926         struct ksock_conn *conn;
1927
1928         for (index = 0; ; index++) {
1929                 read_lock(&ksocknal_data.ksnd_global_lock);
1930
1931                 i = 0;
1932                 conn = NULL;
1933
1934                 list_for_each(tmp, &peer_ni->ksnp_conns) {
1935                         if (i++ == index) {
1936                                 conn = list_entry(tmp, struct ksock_conn,
1937                                                   ksnc_list);
1938                                 ksocknal_conn_addref(conn);
1939                                 break;
1940                         }
1941                 }
1942
1943                 read_unlock(&ksocknal_data.ksnd_global_lock);
1944
1945                 if (conn == NULL)
1946                         break;
1947
1948                 ksocknal_lib_push_conn(conn);
1949                 ksocknal_conn_decref(conn);
1950         }
1951 }
1952
1953 static int
1954 ksocknal_push(struct lnet_ni *ni, struct lnet_process_id id)
1955 {
1956         int lo;
1957         int hi;
1958         int bkt;
1959         int rc = -ENOENT;
1960
1961         if (id.nid != LNET_NID_ANY) {
1962                 lo = hash_min(id.nid, HASH_BITS(ksocknal_data.ksnd_peers));
1963                 hi = lo;
1964         } else {
1965                 lo = 0;
1966                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
1967         }
1968
1969         for (bkt = lo; bkt <= hi; bkt++) {
1970                 int peer_off; /* searching offset in peer_ni hash table */
1971
1972                 for (peer_off = 0; ; peer_off++) {
1973                         struct ksock_peer_ni *peer_ni;
1974                         int           i = 0;
1975
1976                         read_lock(&ksocknal_data.ksnd_global_lock);
1977                         hlist_for_each_entry(peer_ni,
1978                                              &ksocknal_data.ksnd_peers[bkt],
1979                                              ksnp_list) {
1980                                 if (!((id.nid == LNET_NID_ANY ||
1981                                        id.nid == peer_ni->ksnp_id.nid) &&
1982                                       (id.pid == LNET_PID_ANY ||
1983                                        id.pid == peer_ni->ksnp_id.pid)))
1984                                         continue;
1985
1986                                 if (i++ == peer_off) {
1987                                         ksocknal_peer_addref(peer_ni);
1988                                         break;
1989                                 }
1990                         }
1991                         read_unlock(&ksocknal_data.ksnd_global_lock);
1992
1993                         if (i <= peer_off) /* no match */
1994                                 break;
1995
1996                         rc = 0;
1997                         ksocknal_push_peer(peer_ni);
1998                         ksocknal_peer_decref(peer_ni);
1999                 }
2000         }
2001         return rc;
2002 }
2003
2004 static int
2005 ksocknal_add_interface(struct lnet_ni *ni, __u32 ipaddress, __u32 netmask)
2006 {
2007         struct ksock_net *net = ni->ni_data;
2008         struct ksock_interface *iface;
2009         struct sockaddr_in sa = { .sin_family = AF_INET };
2010         int rc;
2011         int i;
2012         int j;
2013         struct ksock_peer_ni *peer_ni;
2014         struct list_head *rtmp;
2015         struct ksock_route *route;
2016
2017         if (ipaddress == 0 ||
2018             netmask == 0)
2019                 return -EINVAL;
2020
2021         write_lock_bh(&ksocknal_data.ksnd_global_lock);
2022
2023         sa.sin_addr.s_addr = htonl(ipaddress);
2024         iface = ksocknal_ip2iface(ni, (struct sockaddr *)&sa);
2025         if (iface != NULL) {
2026                 /* silently ignore dups */
2027                 rc = 0;
2028         } else if (net->ksnn_ninterfaces == LNET_INTERFACES_NUM) {
2029                 rc = -ENOSPC;
2030         } else {
2031                 iface = &net->ksnn_interfaces[net->ksnn_ninterfaces++];
2032
2033                 iface->ksni_index = ksocknal_ip2index((struct sockaddr *)&sa,
2034                                                       ni);
2035                 rpc_copy_addr((struct sockaddr *)&iface->ksni_addr,
2036                               (struct sockaddr *)&sa);
2037                 iface->ksni_netmask = netmask;
2038                 iface->ksni_nroutes = 0;
2039                 iface->ksni_npeers = 0;
2040
2041                 hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
2042                         for (j = 0; j < peer_ni->ksnp_n_passive_ips; j++)
2043                                 if (peer_ni->ksnp_passive_ips[j] == ipaddress)
2044                                         iface->ksni_npeers++;
2045
2046                         list_for_each(rtmp, &peer_ni->ksnp_routes) {
2047                                 route = list_entry(rtmp,
2048                                                    struct ksock_route,
2049                                                    ksnr_list);
2050
2051                                 if (route->ksnr_myiface ==
2052                                             iface->ksni_index)
2053                                         iface->ksni_nroutes++;
2054                         }
2055                 }
2056
2057                 rc = 0;
2058                 /* NB only new connections will pay attention to the new
2059                  * interface!
2060                  */
2061         }
2062
2063         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
2064
2065         return rc;
2066 }
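
/*
 * Illustrative sketch: the IOC_LIBCFS_ADD_INTERFACE case in
 * ksocknal_ctl() below reaches this as, roughly,
 *
 *      ksocknal_add_interface(ni, data->ioc_u32[0], data->ioc_u32[1]);
 *
 * with the IP address and netmask supplied as host-order __u32 values.
 */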
2067
2068 static void
2069 ksocknal_peer_del_interface_locked(struct ksock_peer_ni *peer_ni,
2070                                    __u32 ipaddr, int index)
2071 {
2072         struct ksock_route *route;
2073         struct ksock_route *rnxt;
2074         struct ksock_conn *conn;
2075         struct ksock_conn *cnxt;
2076         int i;
2077         int j;
2078
2079         for (i = 0; i < peer_ni->ksnp_n_passive_ips; i++)
2080                 if (peer_ni->ksnp_passive_ips[i] == ipaddr) {
2081                         for (j = i+1; j < peer_ni->ksnp_n_passive_ips; j++)
2082                                 peer_ni->ksnp_passive_ips[j-1] =
2083                                         peer_ni->ksnp_passive_ips[j];
2084                         peer_ni->ksnp_n_passive_ips--;
2085                         break;
2086                 }
2087
2088         list_for_each_entry_safe(route, rnxt, &peer_ni->ksnp_routes,
2089                                  ksnr_list) {
2090                 if (route->ksnr_myiface != index)
2091                         continue;
2092
2093                 if (route->ksnr_share_count != 0) {
2094                         /* Manually created; keep, but unbind */
2095                         route->ksnr_myiface = -1;
2096                 } else {
2097                         ksocknal_del_route_locked(route);
2098                 }
2099         }
2100
2101         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns, ksnc_list)
2102                 if (conn->ksnc_route->ksnr_myiface == index)
2103                         ksocknal_close_conn_locked(conn, 0);
2104 }
2105
2106 static int
2107 ksocknal_del_interface(struct lnet_ni *ni, __u32 ipaddress)
2108 {
2109         struct ksock_net *net = ni->ni_data;
2110         int rc = -ENOENT;
2111         struct hlist_node *nxt;
2112         struct ksock_peer_ni *peer_ni;
2113         u32 this_ip;
2114         struct sockaddr_in sa = {.sin_family = AF_INET };
2115         int index;
2116         int i;
2117         int j;
2118
2119         sa.sin_addr.s_addr = htonl(ipaddress);
2120         index = ksocknal_ip2index((struct sockaddr *)&sa, ni);
2121
2122         write_lock_bh(&ksocknal_data.ksnd_global_lock);
2123
2124         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2125                 struct sockaddr_in *sa =
2126                         (void *)&net->ksnn_interfaces[i].ksni_addr;
2127
2128                 if (sa->sin_family != AF_INET)
2129                         continue;
2130                 this_ip = ntohl(sa->sin_addr.s_addr);
2131
2132                 if (!(ipaddress == 0 ||
2133                       ipaddress == this_ip))
2134                         continue;
2135
2136                 rc = 0;
2137
2138                 for (j = i+1; j < net->ksnn_ninterfaces; j++)
2139                         net->ksnn_interfaces[j-1] =
2140                                 net->ksnn_interfaces[j];
2141
2142                 net->ksnn_ninterfaces--;
2143
2144                 hash_for_each_safe(ksocknal_data.ksnd_peers, j,
2145                                    nxt, peer_ni, ksnp_list) {
2146                         if (peer_ni->ksnp_ni != ni)
2147                                 continue;
2148
2149                         ksocknal_peer_del_interface_locked(peer_ni,
2150                                                            this_ip, index);
2151                 }
2152         }
2153
2154         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
2155
2156         return rc;
2157 }
2158
2159 int
2160 ksocknal_ctl(struct lnet_ni *ni, unsigned int cmd, void *arg)
2161 {
2162         struct lnet_process_id id = {0};
2163         struct libcfs_ioctl_data *data = arg;
2164         int rc;
2165
2166         switch (cmd) {
2167         case IOC_LIBCFS_GET_INTERFACE: {
2168                 struct ksock_net *net = ni->ni_data;
2169                 struct ksock_interface *iface;
2170                 struct sockaddr_in *sa;
2171
2172                 read_lock(&ksocknal_data.ksnd_global_lock);
2173
2174                 if (data->ioc_count >= (__u32)net->ksnn_ninterfaces) {
2175                         rc = -ENOENT;
2176                 } else {
2177                         rc = 0;
2178                         iface = &net->ksnn_interfaces[data->ioc_count];
2179
2180                         sa = (void *)&iface->ksni_addr;
2181                         if (sa->sin_family == AF_INET)
2182                                 data->ioc_u32[0] = ntohl(sa->sin_addr.s_addr);
2183                         else
2184                                 data->ioc_u32[0] = 0xFFFFFFFF;
2185                         data->ioc_u32[1] = iface->ksni_netmask;
2186                         data->ioc_u32[2] = iface->ksni_npeers;
2187                         data->ioc_u32[3] = iface->ksni_nroutes;
2188                 }
2189
2190                 read_unlock(&ksocknal_data.ksnd_global_lock);
2191                 return rc;
2192         }
2193
2194         case IOC_LIBCFS_ADD_INTERFACE:
2195                 return ksocknal_add_interface(ni,
2196                                               data->ioc_u32[0], /* IP address */
2197                                               data->ioc_u32[1]); /* net mask */
2198
2199         case IOC_LIBCFS_DEL_INTERFACE:
2200                 return ksocknal_del_interface(ni,
2201                                               data->ioc_u32[0]); /* IP address */
2202
2203         case IOC_LIBCFS_GET_PEER: {
2204                 __u32            myip = 0;
2205                 __u32            ip = 0;
2206                 int              port = 0;
2207                 int              conn_count = 0;
2208                 int              share_count = 0;
2209
2210                 rc = ksocknal_get_peer_info(ni, data->ioc_count,
2211                                             &id, &myip, &ip, &port,
2212                                             &conn_count,  &share_count);
2213                 if (rc != 0)
2214                         return rc;
2215
2216                 data->ioc_nid    = id.nid;
2217                 data->ioc_count  = share_count;
2218                 data->ioc_u32[0] = ip;
2219                 data->ioc_u32[1] = port;
2220                 data->ioc_u32[2] = myip;
2221                 data->ioc_u32[3] = conn_count;
2222                 data->ioc_u32[4] = id.pid;
2223                 return 0;
2224         }
2225
2226         case IOC_LIBCFS_ADD_PEER:
2227                 id.nid = data->ioc_nid;
2228                 id.pid = LNET_PID_LUSTRE;
2229                 return ksocknal_add_peer (ni, id,
2230                                           data->ioc_u32[0], /* IP */
2231                                           data->ioc_u32[1]); /* port */
2232
2233         case IOC_LIBCFS_DEL_PEER:
2234                 id.nid = data->ioc_nid;
2235                 id.pid = LNET_PID_ANY;
2236                 return ksocknal_del_peer (ni, id,
2237                                           data->ioc_u32[0]); /* IP */
2238
2239         case IOC_LIBCFS_GET_CONN: {
2240                 int           txmem;
2241                 int           rxmem;
2242                 int           nagle;
2243                 struct ksock_conn *conn = ksocknal_get_conn_by_idx(ni, data->ioc_count);
2244                 struct sockaddr_in *psa, *mysa;
2245
2246                 if (conn == NULL)
2247                         return -ENOENT;
2248
2249                 psa = (void *)&conn->ksnc_peeraddr;
2250                 mysa = (void *)&conn->ksnc_myaddr;
2251                 ksocknal_lib_get_conn_tunables(conn, &txmem, &rxmem, &nagle);
2252                 data->ioc_count  = txmem;
2253                 data->ioc_nid    = conn->ksnc_peer->ksnp_id.nid;
2254                 data->ioc_flags  = nagle;
2255                 if (psa->sin_family == AF_INET)
2256                         data->ioc_u32[0] = ntohl(psa->sin_addr.s_addr);
2257                 else
2258                         data->ioc_u32[0] = 0xFFFFFFFF;
2259                 data->ioc_u32[1] = rpc_get_port((struct sockaddr *)
2260                                                 &conn->ksnc_peeraddr);
2261                 if (mysa->sin_family == AF_INET)
2262                         data->ioc_u32[2] = ntohl(mysa->sin_addr.s_addr);
2263                 else
2264                         data->ioc_u32[2] = 0xFFFFFFFF;
2265                 data->ioc_u32[3] = conn->ksnc_type;
2266                 data->ioc_u32[4] = conn->ksnc_scheduler->kss_cpt;
2267                 data->ioc_u32[5] = rxmem;
2268                 data->ioc_u32[6] = conn->ksnc_peer->ksnp_id.pid;
2269                 ksocknal_conn_decref(conn);
2270                 return 0;
2271         }
2272
2273         case IOC_LIBCFS_CLOSE_CONNECTION:
2274                 id.nid = data->ioc_nid;
2275                 id.pid = LNET_PID_ANY;
2276                 return ksocknal_close_matching_conns (id,
2277                                                       data->ioc_u32[0]);
2278
2279         case IOC_LIBCFS_REGISTER_MYNID:
2280                 /* Ignore if this is a noop */
2281                 if (data->ioc_nid == ni->ni_nid)
2282                         return 0;
2283
2284                 CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
2285                        libcfs_nid2str(data->ioc_nid),
2286                        libcfs_nid2str(ni->ni_nid));
2287                 return -EINVAL;
2288
2289         case IOC_LIBCFS_PUSH_CONNECTION:
2290                 id.nid = data->ioc_nid;
2291                 id.pid = LNET_PID_ANY;
2292                 return ksocknal_push(ni, id);
2293
2294         default:
2295                 return -EINVAL;
2296         }
2297         /* not reached */
2298 }
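
/*
 * Example (illustrative, hypothetical values): closing every connection
 * to one peer NID through the ioctl interface above amounts to
 *
 *      struct libcfs_ioctl_data data = { 0 };
 *
 *      data.ioc_nid    = target_nid;
 *      data.ioc_u32[0] = 0;
 *      rc = ksocknal_ctl(ni, IOC_LIBCFS_CLOSE_CONNECTION, &data);
 *
 * where "target_nid" is a hypothetical peer NID and ioc_u32[0] == 0
 * means any peer IP address.
 */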
2299
2300 static void
2301 ksocknal_free_buffers(void)
2302 {
2303         LASSERT(atomic_read(&ksocknal_data.ksnd_nactive_txs) == 0);
2304
2305         if (ksocknal_data.ksnd_schedulers != NULL)
2306                 cfs_percpt_free(ksocknal_data.ksnd_schedulers);
2307
2308         spin_lock(&ksocknal_data.ksnd_tx_lock);
2309
2310         if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
2311                 LIST_HEAD(zlist);
2312                 struct ksock_tx *tx;
2313
2314                 list_splice_init(&ksocknal_data.ksnd_idle_noop_txs, &zlist);
2315                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
2316
2317                 while (!list_empty(&zlist)) {
2318                         tx = list_entry(zlist.next, struct ksock_tx, tx_list);
2319                         list_del(&tx->tx_list);
2320                         LIBCFS_FREE(tx, tx->tx_desc_size);
2321                 }
2322         } else {
2323                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
2324         }
2325 }
2326
2327 static void
2328 ksocknal_base_shutdown(void)
2329 {
2330         struct ksock_sched *sched;
2331         struct ksock_peer_ni *peer_ni;
2332         int i;
2333
2334         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %lld\n",
2335                libcfs_kmem_read());
2336         LASSERT(ksocknal_data.ksnd_nnets == 0);
2337
2338         switch (ksocknal_data.ksnd_init) {
2339         default:
2340                 LASSERT(0);
2341                 /* fallthrough */
2342
2343         case SOCKNAL_INIT_ALL:
2344         case SOCKNAL_INIT_DATA:
2345                 hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list)
2346                         LASSERT(0);
2347
2348                 LASSERT(list_empty(&ksocknal_data.ksnd_nets));
2349                 LASSERT(list_empty(&ksocknal_data.ksnd_enomem_conns));
2350                 LASSERT(list_empty(&ksocknal_data.ksnd_zombie_conns));
2351                 LASSERT(list_empty(&ksocknal_data.ksnd_connd_connreqs));
2352                 LASSERT(list_empty(&ksocknal_data.ksnd_connd_routes));
2353
2354                 if (ksocknal_data.ksnd_schedulers != NULL) {
2355                         cfs_percpt_for_each(sched, i,
2356                                             ksocknal_data.ksnd_schedulers) {
2357
2358                                 LASSERT(list_empty(&sched->kss_tx_conns));
2359                                 LASSERT(list_empty(&sched->kss_rx_conns));
2360                                 LASSERT(list_empty(&sched->kss_zombie_noop_txs));
2361                                 LASSERT(sched->kss_nconns == 0);
2362                         }
2363                 }
2364
2365                 /* flag threads to terminate; wake and wait for them to die */
2366                 ksocknal_data.ksnd_shuttingdown = 1;
2367                 wake_up_all(&ksocknal_data.ksnd_connd_waitq);
2368                 wake_up_all(&ksocknal_data.ksnd_reaper_waitq);
2369
2370                 if (ksocknal_data.ksnd_schedulers != NULL) {
2371                         cfs_percpt_for_each(sched, i,
2372                                             ksocknal_data.ksnd_schedulers)
2373                                         wake_up_all(&sched->kss_waitq);
2374                 }
2375
2376                 wait_var_event_warning(&ksocknal_data.ksnd_nthreads,
2377                                        atomic_read(&ksocknal_data.ksnd_nthreads) == 0,
2378                                        "waiting for %d threads to terminate\n",
2379                                        atomic_read(&ksocknal_data.ksnd_nthreads));
2380
2381                 ksocknal_free_buffers();
2382
2383                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
2384                 break;
2385         }
2386
2387         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %lld\n",
2388                libcfs_kmem_read());
2389
2390         module_put(THIS_MODULE);
2391 }
2392
2393 static int
2394 ksocknal_base_startup(void)
2395 {
2396         struct ksock_sched *sched;
2397         int rc;
2398         int i;
2399
2400         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
2401         LASSERT(ksocknal_data.ksnd_nnets == 0);
2402
2403         memset(&ksocknal_data, 0, sizeof(ksocknal_data)); /* zero pointers */
2404
2405         hash_init(ksocknal_data.ksnd_peers);
2406
2407         rwlock_init(&ksocknal_data.ksnd_global_lock);
2408         INIT_LIST_HEAD(&ksocknal_data.ksnd_nets);
2409
2410         spin_lock_init(&ksocknal_data.ksnd_reaper_lock);
2411         INIT_LIST_HEAD(&ksocknal_data.ksnd_enomem_conns);
2412         INIT_LIST_HEAD(&ksocknal_data.ksnd_zombie_conns);
2413         INIT_LIST_HEAD(&ksocknal_data.ksnd_deathrow_conns);
2414         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
2415
2416         spin_lock_init(&ksocknal_data.ksnd_connd_lock);
2417         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_connreqs);
2418         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_routes);
2419         init_waitqueue_head(&ksocknal_data.ksnd_connd_waitq);
2420
2421         spin_lock_init(&ksocknal_data.ksnd_tx_lock);
2422         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_noop_txs);
2423
2424         /* NB the memset above zeros the whole of ksocknal_data */
2425
2426         /* flag lists/ptrs/locks initialised */
2427         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2428         if (!try_module_get(THIS_MODULE))
2429                 goto failed;
2430
2431         /* Create a scheduler block per available CPT */
2432         ksocknal_data.ksnd_schedulers = cfs_percpt_alloc(lnet_cpt_table(),
2433                                                          sizeof(*sched));
2434         if (ksocknal_data.ksnd_schedulers == NULL)
2435                 goto failed;
2436
2437         cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
2438                 int nthrs;
2439
2440                 /*
2441                  * make sure not to allocate more threads than there are
2442                  * cores/CPUs in the CPT
2443                  */
2444                 nthrs = cfs_cpt_weight(lnet_cpt_table(), i);
2445                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2446                         nthrs = min(nthrs, *ksocknal_tunables.ksnd_nscheds);
2447                 } else {
2448                         /*
2449                          * cap at half of the CPUs; assume the other half
2450                          * should be reserved for upper-layer modules
2451                          */
2452                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2453                 }
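                /*
                 * Worked example (illustrative): with ksnd_nscheds == 0
                 * on a CPT that has 16 cores, nthrs becomes
                 * min(max(SOCKNAL_NSCHEDS, 16 >> 1), 16), i.e. 8
                 * scheduler threads (assuming SOCKNAL_NSCHEDS <= 8).
                 */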
2454
2455                 sched->kss_nthreads_max = nthrs;
2456                 sched->kss_cpt = i;
2457
2458                 spin_lock_init(&sched->kss_lock);
2459                 INIT_LIST_HEAD(&sched->kss_rx_conns);
2460                 INIT_LIST_HEAD(&sched->kss_tx_conns);
2461                 INIT_LIST_HEAD(&sched->kss_zombie_noop_txs);
2462                 init_waitqueue_head(&sched->kss_waitq);
2463         }
2464
2465         ksocknal_data.ksnd_connd_starting         = 0;
2466         ksocknal_data.ksnd_connd_failed_stamp     = 0;
2467         ksocknal_data.ksnd_connd_starting_stamp   = ktime_get_real_seconds();
2468         /* must have at least 2 connds to remain responsive to accepts while
2469          * connecting */
2470         if (*ksocknal_tunables.ksnd_nconnds < SOCKNAL_CONND_RESV + 1)
2471                 *ksocknal_tunables.ksnd_nconnds = SOCKNAL_CONND_RESV + 1;
2472
2473         if (*ksocknal_tunables.ksnd_nconnds_max <
2474             *ksocknal_tunables.ksnd_nconnds) {
2475                 ksocknal_tunables.ksnd_nconnds_max =
2476                         ksocknal_tunables.ksnd_nconnds;
2477         }
2478
2479         for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
2480                 char name[16];
2481
2482                 spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2483                 ksocknal_data.ksnd_connd_starting++;
2484                 spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2485
2486                 snprintf(name, sizeof(name), "socknal_cd%02d", i);
2487                 rc = ksocknal_thread_start(ksocknal_connd,
2488                                            (void *)((uintptr_t)i), name);
2489                 if (rc != 0) {
2490                         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2491                         ksocknal_data.ksnd_connd_starting--;
2492                         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2493                         CERROR("Can't spawn socknal connd: %d\n", rc);
2494                         goto failed;
2495                 }
2496         }
2497
2498         rc = ksocknal_thread_start(ksocknal_reaper, NULL, "socknal_reaper");
2499         if (rc != 0) {
2500                 CERROR("Can't spawn socknal reaper: %d\n", rc);
2501                 goto failed;
2502         }
2503
2504         /* flag everything initialised */
2505         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
2506
2507         return 0;
2508
2509  failed:
2510         ksocknal_base_shutdown();
2511         return -ENETDOWN;
2512 }
2513
2514 static int
2515 ksocknal_debug_peerhash(struct lnet_ni *ni)
2516 {
2517         struct ksock_peer_ni *peer_ni;
2518         int i;
2519
2520         read_lock(&ksocknal_data.ksnd_global_lock);
2521
2522         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
2523                 struct ksock_route *route;
2524                 struct ksock_conn *conn;
2525
2526                 if (peer_ni->ksnp_ni != ni)
2527                         continue;
2528
2529                 CWARN("Active peer_ni on shutdown: %s, ref %d, "
2530                       "closing %d, accepting %d, err %d, zcookie %llu, "
2531                       "txq %d, zc_req %d\n", libcfs_id2str(peer_ni->ksnp_id),
2532                       refcount_read(&peer_ni->ksnp_refcount),
2533                       peer_ni->ksnp_closing,
2534                       peer_ni->ksnp_accepting, peer_ni->ksnp_error,
2535                       peer_ni->ksnp_zc_next_cookie,
2536                       !list_empty(&peer_ni->ksnp_tx_queue),
2537                       !list_empty(&peer_ni->ksnp_zc_req_list));
2538
2539                 list_for_each_entry(route, &peer_ni->ksnp_routes, ksnr_list) {
2540                         CWARN("Route: ref %d, schd %d, conn %d, cnted %d, del %d\n",
2541                               refcount_read(&route->ksnr_refcount),
2542                               route->ksnr_scheduled, route->ksnr_connecting,
2543                               route->ksnr_connected, route->ksnr_deleted);
2544                 }
2545
2546                 list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
2547                         CWARN("Conn: ref %d, sref %d, t %d, c %d\n",
2548                               refcount_read(&conn->ksnc_conn_refcount),
2549                               refcount_read(&conn->ksnc_sock_refcount),
2550                               conn->ksnc_type, conn->ksnc_closing);
2551                 }
2552                 break;
2553         }
2554
2555         read_unlock(&ksocknal_data.ksnd_global_lock);
2556         return 0;
2557 }
2558
2559 void
2560 ksocknal_shutdown(struct lnet_ni *ni)
2561 {
2562         struct ksock_net *net = ni->ni_data;
2563         struct lnet_process_id anyid = {
2564                 .nid = LNET_NID_ANY,
2565                 .pid = LNET_PID_ANY,
2566         };
2567         int i;
2568
2569         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL);
2570         LASSERT(ksocknal_data.ksnd_nnets > 0);
2571
2572         /* prevent new peers */
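        /*
         * Added note: biasing ksnn_npeers lets the peer-creation path
         * see that this net is shutting down and refuse new peers; the
         * wait below completes once the count falls to exactly
         * SOCKNAL_SHUTDOWN_BIAS, i.e. when no real peers remain.
         */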
2573         atomic_add(SOCKNAL_SHUTDOWN_BIAS, &net->ksnn_npeers);
2574
2575         /* Delete all peers */
2576         ksocknal_del_peer(ni, anyid, 0);
2577
2578         /* Wait for all peer_ni state to clean up */
2579         wait_var_event_warning(&net->ksnn_npeers,
2580                                atomic_read(&net->ksnn_npeers) ==
2581                                SOCKNAL_SHUTDOWN_BIAS,
2582                                "waiting for %d peers to disconnect\n",
2583                                ksocknal_debug_peerhash(ni) +
2584                                atomic_read(&net->ksnn_npeers) -
2585                                SOCKNAL_SHUTDOWN_BIAS);
2586
2587         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2588                 LASSERT(net->ksnn_interfaces[i].ksni_npeers == 0);
2589                 LASSERT(net->ksnn_interfaces[i].ksni_nroutes == 0);
2590         }
2591
2592         list_del(&net->ksnn_list);
2593         LIBCFS_FREE(net, sizeof(*net));
2594
2595         ksocknal_data.ksnd_nnets--;
2596         if (ksocknal_data.ksnd_nnets == 0)
2597                 ksocknal_base_shutdown();
2598 }
2599
2600 static int
2601 ksocknal_search_new_ipif(struct ksock_net *net)
2602 {
2603         int new_ipif = 0;
2604         int i;
2605
2606         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2607                 char *ifnam = &net->ksnn_interfaces[i].ksni_name[0];
2608                 char *colon = strchr(ifnam, ':');
2609                 bool found  = false;
2610                 struct ksock_net *tmp;
2611                 int j;
2612
2613                 if (colon != NULL) /* ignore alias device */
2614                         *colon = 0;
2615
2616                 list_for_each_entry(tmp, &ksocknal_data.ksnd_nets,
2617                                     ksnn_list) {
2618                         for (j = 0; !found && j < tmp->ksnn_ninterfaces; j++) {
2619                                 char *ifnam2 =
2620                                         &tmp->ksnn_interfaces[j].ksni_name[0];
2621                                 char *colon2 = strchr(ifnam2, ':');
2622
2623                                 if (colon2 != NULL)
2624                                         *colon2 = 0;
2625
2626                                 found = strcmp(ifnam, ifnam2) == 0;
2627                                 if (colon2 != NULL)
2628                                         *colon2 = ':';
2629                         }
2630                         if (found)
2631                                 break;
2632                 }
2633
2634                 new_ipif += !found;
2635                 if (colon != NULL)
2636                         *colon = ':';
2637         }
2638
2639         return new_ipif;
2640 }
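
/*
 * Example (illustrative): interface aliases compare by their base name,
 * so "eth0:1" is treated as "eth0" here; a net configured on "eth0:1"
 * does not count as a new interface if another net already uses "eth0".
 */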
2641
2642 static int
2643 ksocknal_start_schedulers(struct ksock_sched *sched)
2644 {
2645         int     nthrs;
2646         int     rc = 0;
2647         int     i;
2648
2649         if (sched->kss_nthreads == 0) {
2650                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2651                         nthrs = sched->kss_nthreads_max;
2652                 } else {
2653                         nthrs = cfs_cpt_weight(lnet_cpt_table(),
2654                                                sched->kss_cpt);
2655                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2656                         nthrs = min(SOCKNAL_NSCHEDS_HIGH, nthrs);
2657                 }
2658                 nthrs = min(nthrs, sched->kss_nthreads_max);
2659         } else {
2660                 LASSERT(sched->kss_nthreads <= sched->kss_nthreads_max);
2661                 /* add two more threads if there is a new interface */
2662                 nthrs = min(2, sched->kss_nthreads_max - sched->kss_nthreads);
2663         }
2664
2665         for (i = 0; i < nthrs; i++) {
2666                 long id;
2667                 char name[20];
2668
2669                 id = KSOCK_THREAD_ID(sched->kss_cpt, sched->kss_nthreads + i);
2670                 snprintf(name, sizeof(name), "socknal_sd%02d_%02d",
2671                          sched->kss_cpt, (int)KSOCK_THREAD_SID(id));
2672
2673                 rc = ksocknal_thread_start(ksocknal_scheduler,
2674                                            (void *)id, name);
2675                 if (rc == 0)
2676                         continue;
2677
2678                 CERROR("Can't spawn thread %d for scheduler[%d]: %d\n",
2679                        sched->kss_cpt, (int) KSOCK_THREAD_SID(id), rc);
2680                 break;
2681         }
2682
2683         sched->kss_nthreads += i;
2684         return rc;
2685 }
2686
2687 static int
2688 ksocknal_net_start_threads(struct ksock_net *net, __u32 *cpts, int ncpts)
2689 {
2690         int newif = ksocknal_search_new_ipif(net);
2691         int rc;
2692         int i;
2693
2694         if (ncpts > 0 && ncpts > cfs_cpt_number(lnet_cpt_table()))
2695                 return -EINVAL;
2696
2697         for (i = 0; i < ncpts; i++) {
2698                 struct ksock_sched *sched;
2699                 int cpt = (cpts == NULL) ? i : cpts[i];
2700
2701                 LASSERT(cpt < cfs_cpt_number(lnet_cpt_table()));
2702                 sched = ksocknal_data.ksnd_schedulers[cpt];
2703
2704                 if (!newif && sched->kss_nthreads > 0)
2705                         continue;
2706
2707                 rc = ksocknal_start_schedulers(sched);
2708                 if (rc != 0)
2709                         return rc;
2710         }
2711         return 0;
2712 }
2713
2714 int
2715 ksocknal_startup(struct lnet_ni *ni)
2716 {
2717         struct ksock_net *net;
2718         struct lnet_ioctl_config_lnd_cmn_tunables *net_tunables;
2719         struct ksock_interface *ksi = NULL;
2720         struct lnet_inetdev *ifaces = NULL;
2721         int i = 0;
2722         int rc;
2723
2724         LASSERT(ni->ni_net->net_lnd == &the_ksocklnd);
2725
2726         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING) {
2727                 rc = ksocknal_base_startup();
2728                 if (rc != 0)
2729                         return rc;
2730         }
2731
2732         LIBCFS_ALLOC(net, sizeof(*net));
2733         if (net == NULL)
2734                 goto fail_0;
2735
2736         net->ksnn_incarnation = ktime_get_real_ns();
2737         ni->ni_data = net;
2738         net_tunables = &ni->ni_net->net_tunables;
2739
2740         if (net_tunables->lct_peer_timeout == -1)
2741                 net_tunables->lct_peer_timeout =
2742                         *ksocknal_tunables.ksnd_peertimeout;
2743
2744         if (net_tunables->lct_max_tx_credits == -1)
2745                 net_tunables->lct_max_tx_credits =
2746                         *ksocknal_tunables.ksnd_credits;
2747
2748         if (net_tunables->lct_peer_tx_credits == -1)
2749                 net_tunables->lct_peer_tx_credits =
2750                         *ksocknal_tunables.ksnd_peertxcredits;
2751
2752         if (net_tunables->lct_peer_tx_credits >
2753             net_tunables->lct_max_tx_credits)
2754                 net_tunables->lct_peer_tx_credits =
2755                         net_tunables->lct_max_tx_credits;
2756
2757         if (net_tunables->lct_peer_rtr_credits == -1)
2758                 net_tunables->lct_peer_rtr_credits =
2759                         *ksocknal_tunables.ksnd_peerrtrcredits;
2760
2761         rc = lnet_inet_enumerate(&ifaces, ni->ni_net_ns);
2762         if (rc < 0)
2763                 goto fail_1;
2764
2765         if (!ni->ni_interfaces[0]) {
2766                 struct sockaddr_in *sa;
2767
2768                 ksi = &net->ksnn_interfaces[0];
2769                 sa = (void *)&ksi->ksni_addr;
2770
2771                 /* Use the first discovered interface */
2772                 net->ksnn_ninterfaces = 1;
2773                 ni->ni_dev_cpt = ifaces[0].li_cpt;
2774                 memset(sa, 0, sizeof(*sa));
2775                 sa->sin_family = AF_INET;
2776                 sa->sin_addr.s_addr = htonl(ifaces[0].li_ipaddr);
2777                 ksi->ksni_index = ksocknal_ip2index((struct sockaddr *)sa, ni);
2778                 ksi->ksni_netmask = ifaces[0].li_netmask;
2779                 strlcpy(ksi->ksni_name, ifaces[0].li_name,
2780                         sizeof(ksi->ksni_name));
2781         } else {
2782                 /* Before Multi-Rail ksocklnd would manage
2783                  * multiple interfaces with its own tcp bonding.
2784                  * If we encounter an old configuration using
2785                  * this tcp bonding approach then we need to
2786                  * handle more than one entry in ni_interfaces.
2787                  *
2788                  * In a Multi-Rail configuration only ONE ni_interface
2789                  * should exist. Each IP alias should be mapped to
2790                  * its own 'struct lnet_ni'.
2791                  */
2792                 for (i = 0; i < LNET_INTERFACES_NUM; i++) {
2793                         int j;
2794
2795                         if (!ni->ni_interfaces[i])
2796                                 break;
2797
2798                         for (j = 0; j < LNET_INTERFACES_NUM;  j++) {
2799                                 if (i != j && ni->ni_interfaces[j] &&
2800                                     strcmp(ni->ni_interfaces[i],
2801                                            ni->ni_interfaces[j]) == 0) {
2802                                         rc = -EEXIST;
2803                                         CERROR("ksocklnd: found duplicate %s at %d and %d, rc = %d\n",
2804                                                ni->ni_interfaces[i], i, j, rc);
2805                                         goto fail_1;
2806                                 }
2807                         }
2808
2809                         for (j = 0; j < rc; j++) {
2810                                 struct sockaddr_in *sa;
2811
2812                                 if (strcmp(ifaces[j].li_name,
2813                                            ni->ni_interfaces[i]) != 0)
2814                                         continue;
2815
2816                                 ksi =
2817                                   &net->ksnn_interfaces[net->ksnn_ninterfaces];
2818                                 sa = (void *)&ksi->ksni_addr;
2819                                 ni->ni_dev_cpt = ifaces[j].li_cpt;
2820                                 memset(sa, 0, sizeof(*sa));
2821                                 sa->sin_family = AF_INET;
2822                                 sa->sin_addr.s_addr =
2823                                         htonl(ifaces[j].li_ipaddr);
2824                                 ksi->ksni_index = ksocknal_ip2index(
2825                                         (struct sockaddr *)sa, ni);
2826                                 ksi->ksni_netmask = ifaces[j].li_netmask;
2827                                 strlcpy(ksi->ksni_name, ifaces[j].li_name,
2828                                         sizeof(ksi->ksni_name));
2829                                 net->ksnn_ninterfaces++;
2830                                 break;
2831                         }
2832                 }
2833                 /* not every requested ni_interface matched a discovered interface */
2834                 if (!ksi || net->ksnn_ninterfaces != i) {
2835                         CERROR("ksocklnd: requested %d but only %d interfaces found\n",
2836                                i, net->ksnn_ninterfaces);
2837                         goto fail_1;
2838                 }
2839         }
2840
2841         /* start the threads before adding the net to ksocknal_data.ksnd_nets */
2842         rc = ksocknal_net_start_threads(net, ni->ni_cpts, ni->ni_ncpts);
2843         if (rc != 0)
2844                 goto fail_1;
2845
2846         LASSERT(ksi);
2847         LASSERT(ksi->ksni_addr.ss_family == AF_INET);
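             /* Build this NI's NID from the primary interface's IPv4
              * address (converted to host byte order), keeping the
              * network number already stored in ni_nid.
              */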
2848         ni->ni_nid = LNET_MKNID(
2849                 LNET_NIDNET(ni->ni_nid),
2850                 ntohl(((struct sockaddr_in *)
2851                        &ksi->ksni_addr)->sin_addr.s_addr));
2852         list_add(&net->ksnn_list, &ksocknal_data.ksnd_nets);
2853
2854         ksocknal_data.ksnd_nnets++;
2855
2856         return 0;
2857
2858 fail_1:
2859         LIBCFS_FREE(net, sizeof(*net));
2860 fail_0:
2861         if (ksocknal_data.ksnd_nnets == 0)
2862                 ksocknal_base_shutdown();
2863
2864         return -ENETDOWN;
2865 }
2866
2868 static void __exit ksocklnd_exit(void)
2869 {
2870         lnet_unregister_lnd(&the_ksocklnd);
2871 }
2872
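     /* LND operations table registered with the LNet core below via
      * lnet_register_lnd(); LNet calls back through it to start up,
      * shut down, and move traffic over "tcp" (SOCKLND) networks.
      */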
2873 static const struct lnet_lnd the_ksocklnd = {
2874         .lnd_type               = SOCKLND,
2875         .lnd_startup            = ksocknal_startup,
2876         .lnd_shutdown           = ksocknal_shutdown,
2877         .lnd_ctl                = ksocknal_ctl,
2878         .lnd_send               = ksocknal_send,
2879         .lnd_recv               = ksocknal_recv,
2880         .lnd_notify_peer_down   = ksocknal_notify_gw_down,
2881         .lnd_accept             = ksocknal_accept,
2882 };
2883
2884 static int __init ksocklnd_init(void)
2885 {
2886         int rc;
2887
2888         /* check that the ksnr_connected/connecting fields are large enough */
2889         BUILD_BUG_ON(SOCKLND_CONN_NTYPES > 4);
2890         BUILD_BUG_ON(SOCKLND_CONN_ACK != SOCKLND_CONN_BULK_IN);
2891
2892         rc = ksocknal_tunables_init();
2893         if (rc != 0)
2894                 return rc;
2895
2896         lnet_register_lnd(&the_ksocklnd);
2897
2898         return 0;
2899 }
2900
2901 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2902 MODULE_DESCRIPTION("TCP Socket LNet Network Driver");
2903 MODULE_VERSION("2.8.0");
2904 MODULE_LICENSE("GPL");
2905
2906 module_init(ksocklnd_init);
2907 module_exit(ksocklnd_exit);
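     /* Usage note (a sketch, not part of this file): ksocklnd is normally
      * loaded on demand when an LNet "tcp" network is configured, but it
      * can also be loaded by hand, e.g. "modprobe ksocklnd".
      */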