1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/klnds/socklnd/socklnd.c
33  *
34  * Author: Zach Brown <zab@zabbo.net>
35  * Author: Peter J. Braam <braam@clusterfs.com>
36  * Author: Phil Schwan <phil@clusterfs.com>
37  * Author: Eric Barton <eric@bartonsoftware.com>
38  */
39
40 #include <linux/inetdevice.h>
41 #include "socklnd.h"
42 #include <linux/sunrpc/addr.h>
43
44 static const struct lnet_lnd the_ksocklnd;
45 struct ksock_nal_data ksocknal_data;
46
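/* Return the ksock_interface configured on @ni whose address matches
 * @addr, or NULL if no configured interface matches. */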
47 static struct ksock_interface *
48 ksocknal_ip2iface(struct lnet_ni *ni, struct sockaddr *addr)
49 {
50         struct ksock_net *net = ni->ni_data;
51         int i;
52         struct ksock_interface *iface;
53
54         for (i = 0; i < net->ksnn_ninterfaces; i++) {
55                 LASSERT(i < LNET_INTERFACES_NUM);
56                 iface = &net->ksnn_interfaces[i];
57
58                 if (rpc_cmp_addr((struct sockaddr *)&iface->ksni_addr, addr))
59                         return iface;
60         }
61
62         return NULL;
63 }
64
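/* Return the ksock_interface configured on @ni whose network interface
 * index equals @index, or NULL if none is configured with that index. */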
65 static struct ksock_interface *
66 ksocknal_index2iface(struct lnet_ni *ni, int index)
67 {
68         struct ksock_net *net = ni->ni_data;
69         int i;
70         struct ksock_interface *iface;
71
72         for (i = 0; i < net->ksnn_ninterfaces; i++) {
73                 LASSERT(i < LNET_INTERFACES_NUM);
74                 iface = &net->ksnn_interfaces[i];
75
76                 if (iface->ksni_index == index)
77                         return iface;
78         }
79
80         return NULL;
81 }
82
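/* Map an IPv4 address to the ifindex of the up, non-loopback net_device
 * in @ni's network namespace that owns it; returns -1 if no such device
 * is found (IPv6 is not supported here yet). */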
83 static int ksocknal_ip2index(struct sockaddr *addr, struct lnet_ni *ni)
84 {
85         struct net_device *dev;
86         int ret = -1;
87         DECLARE_CONST_IN_IFADDR(ifa);
88
89         if (addr->sa_family != AF_INET)
90                 /* No IPv6 support yet */
91                 return ret;
92
93         rcu_read_lock();
94         for_each_netdev(ni->ni_net_ns, dev) {
95                 int flags = dev_get_flags(dev);
96                 struct in_device *in_dev;
97
98                 if (flags & IFF_LOOPBACK) /* skip the loopback IF */
99                         continue;
100
101                 if (!(flags & IFF_UP))
102                         continue;
103
104                 in_dev = __in_dev_get_rcu(dev);
105                 if (!in_dev)
106                         continue;
107
108                 in_dev_for_each_ifa_rcu(ifa, in_dev) {
109                         if (ifa->ifa_local ==
110                             ((struct sockaddr_in *)addr)->sin_addr.s_addr)
111                                 ret = dev->ifindex;
112                 }
113                 endfor_ifa(in_dev);
114                 if (ret >= 0)
115                         break;
116         }
117         rcu_read_unlock();
118
119         return ret;
120 }
121
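/* Allocate and initialise a route to @addr with one reference held by the
 * caller; the route is not yet bound to a peer_ni or a local interface. */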
122 static struct ksock_route *
123 ksocknal_create_route(struct sockaddr *addr)
124 {
125         struct ksock_route *route;
126
127         LIBCFS_ALLOC (route, sizeof (*route));
128         if (route == NULL)
129                 return (NULL);
130
131         refcount_set(&route->ksnr_refcount, 1);
132         route->ksnr_peer = NULL;
133         route->ksnr_retry_interval = 0;         /* OK to connect at any time */
134         rpc_copy_addr((struct sockaddr *)&route->ksnr_addr, addr);
135         rpc_set_port((struct sockaddr *)&route->ksnr_addr, rpc_get_port(addr));
136         route->ksnr_myiface = -1;
137         route->ksnr_scheduled = 0;
138         route->ksnr_connecting = 0;
139         route->ksnr_connected = 0;
140         route->ksnr_deleted = 0;
141         route->ksnr_conn_count = 0;
142         route->ksnr_share_count = 0;
143
144         return route;
145 }
146
147 void
148 ksocknal_destroy_route(struct ksock_route *route)
149 {
150         LASSERT(refcount_read(&route->ksnr_refcount) == 0);
151
152         if (route->ksnr_peer != NULL)
153                 ksocknal_peer_decref(route->ksnr_peer);
154
155         LIBCFS_FREE (route, sizeof (*route));
156 }
157
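/* Allocate a peer_ni for @id on @ni, counting it against the net's
 * ksnn_npeers; returns ERR_PTR(-ESHUTDOWN) if the net is shutting down or
 * ERR_PTR(-ENOMEM) if the allocation fails. */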
158 static struct ksock_peer_ni *
159 ksocknal_create_peer(struct lnet_ni *ni, struct lnet_process_id id)
160 {
161         int cpt = lnet_cpt_of_nid(id.nid, ni);
162         struct ksock_net *net = ni->ni_data;
163         struct ksock_peer_ni *peer_ni;
164
165         LASSERT(id.nid != LNET_NID_ANY);
166         LASSERT(id.pid != LNET_PID_ANY);
167         LASSERT(!in_interrupt());
168
169         if (!atomic_inc_unless_negative(&net->ksnn_npeers)) {
170                 CERROR("Can't create peer_ni: network shutdown\n");
171                 return ERR_PTR(-ESHUTDOWN);
172         }
173
174         LIBCFS_CPT_ALLOC(peer_ni, lnet_cpt_table(), cpt, sizeof(*peer_ni));
175         if (!peer_ni) {
176                 atomic_dec(&net->ksnn_npeers);
177                 return ERR_PTR(-ENOMEM);
178         }
179
180         peer_ni->ksnp_ni = ni;
181         peer_ni->ksnp_id = id;
182         refcount_set(&peer_ni->ksnp_refcount, 1); /* 1 ref for caller */
183         peer_ni->ksnp_closing = 0;
184         peer_ni->ksnp_accepting = 0;
185         peer_ni->ksnp_proto = NULL;
186         peer_ni->ksnp_last_alive = 0;
187         peer_ni->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;
188
189         INIT_LIST_HEAD(&peer_ni->ksnp_conns);
190         INIT_LIST_HEAD(&peer_ni->ksnp_routes);
191         INIT_LIST_HEAD(&peer_ni->ksnp_tx_queue);
192         INIT_LIST_HEAD(&peer_ni->ksnp_zc_req_list);
193         spin_lock_init(&peer_ni->ksnp_lock);
194
195         return peer_ni;
196 }
197
198 void
199 ksocknal_destroy_peer(struct ksock_peer_ni *peer_ni)
200 {
201         struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
202
203         CDEBUG (D_NET, "peer_ni %s %p deleted\n",
204                 libcfs_id2str(peer_ni->ksnp_id), peer_ni);
205
206         LASSERT(refcount_read(&peer_ni->ksnp_refcount) == 0);
207         LASSERT(peer_ni->ksnp_accepting == 0);
208         LASSERT(list_empty(&peer_ni->ksnp_conns));
209         LASSERT(list_empty(&peer_ni->ksnp_routes));
210         LASSERT(list_empty(&peer_ni->ksnp_tx_queue));
211         LASSERT(list_empty(&peer_ni->ksnp_zc_req_list));
212
213         LIBCFS_FREE(peer_ni, sizeof(*peer_ni));
214
215         /* NB a peer_ni's connections and routes keep a reference on their
216          * peer_ni until they are destroyed, so we can be assured that _all_
217          * state to do with this peer_ni has been cleaned up when its refcount
218          * drops to zero.
219          */
220         if (atomic_dec_and_test(&net->ksnn_npeers))
221                 wake_up_var(&net->ksnn_npeers);
222 }
223
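/* Look up the peer_ni matching @id on @ni in the global peer hash; the
 * caller must hold ksnd_global_lock and no reference is taken on the
 * result. */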
224 struct ksock_peer_ni *
225 ksocknal_find_peer_locked(struct lnet_ni *ni, struct lnet_process_id id)
226 {
227         struct ksock_peer_ni *peer_ni;
228
229         hash_for_each_possible(ksocknal_data.ksnd_peers, peer_ni,
230                                ksnp_list, id.nid) {
231                 LASSERT(!peer_ni->ksnp_closing);
232
233                 if (peer_ni->ksnp_ni != ni)
234                         continue;
235
236                 if (peer_ni->ksnp_id.nid != id.nid ||
237                     peer_ni->ksnp_id.pid != id.pid)
238                         continue;
239
240                 CDEBUG(D_NET, "got peer_ni [%p] -> %s (%d)\n",
241                        peer_ni, libcfs_id2str(id),
242                        refcount_read(&peer_ni->ksnp_refcount));
243                 return peer_ni;
244         }
245         return NULL;
246 }
247
248 struct ksock_peer_ni *
249 ksocknal_find_peer(struct lnet_ni *ni, struct lnet_process_id id)
250 {
251         struct ksock_peer_ni *peer_ni;
252
253         read_lock(&ksocknal_data.ksnd_global_lock);
254         peer_ni = ksocknal_find_peer_locked(ni, id);
255         if (peer_ni != NULL)                    /* +1 ref for caller? */
256                 ksocknal_peer_addref(peer_ni);
257         read_unlock(&ksocknal_data.ksnd_global_lock);
258
259         return (peer_ni);
260 }
261
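/* Unhash @peer_ni and drop the peer list's reference, releasing its
 * counts on the interfaces recorded in ksnp_passive_ips; called with
 * ksnd_global_lock held for writing. */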
262 static void
263 ksocknal_unlink_peer_locked(struct ksock_peer_ni *peer_ni)
264 {
265         int i;
266         struct ksock_interface *iface;
267
268         for (i = 0; i < peer_ni->ksnp_n_passive_ips; i++) {
269                 struct sockaddr_in sa = { .sin_family = AF_INET };
270                 LASSERT(i < LNET_INTERFACES_NUM);
271                 sa.sin_addr.s_addr = htonl(peer_ni->ksnp_passive_ips[i]);
272
273                 iface = ksocknal_ip2iface(peer_ni->ksnp_ni,
274                                           (struct sockaddr *)&sa);
275                 /*
276                  * All IPs in peer_ni->ksnp_passive_ips[] come from the
277                  * interface list, therefore the call must succeed.
278                  */
279                 LASSERT(iface != NULL);
280
281                 CDEBUG(D_NET, "peer_ni=%p iface=%p ksni_nroutes=%d\n",
282                        peer_ni, iface, iface->ksni_nroutes);
283                 iface->ksni_npeers--;
284         }
285
286         LASSERT(list_empty(&peer_ni->ksnp_conns));
287         LASSERT(list_empty(&peer_ni->ksnp_routes));
288         LASSERT(!peer_ni->ksnp_closing);
289         peer_ni->ksnp_closing = 1;
290         hlist_del(&peer_ni->ksnp_list);
291         /* lose peerlist's ref */
292         ksocknal_peer_decref(peer_ni);
293 }
294
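/* Report the index'th peer_ni/route record on @ni: process id, local and
 * peer IPs, port, connection count and share count.  Returns -ENOENT when
 * @index runs past the last entry. */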
295 static int
296 ksocknal_get_peer_info(struct lnet_ni *ni, int index,
297                        struct lnet_process_id *id, __u32 *myip, __u32 *peer_ip,
298                        int *port, int *conn_count, int *share_count)
299 {
300         struct ksock_peer_ni *peer_ni;
301         struct ksock_route *route;
302         struct list_head *rtmp;
303         int i;
304         int j;
305         int rc = -ENOENT;
306
307         read_lock(&ksocknal_data.ksnd_global_lock);
308
309         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
310
311                 if (peer_ni->ksnp_ni != ni)
312                         continue;
313
314                 if (peer_ni->ksnp_n_passive_ips == 0 &&
315                     list_empty(&peer_ni->ksnp_routes)) {
316                         if (index-- > 0)
317                                 continue;
318
319                         *id = peer_ni->ksnp_id;
320                         *myip = 0;
321                         *peer_ip = 0;
322                         *port = 0;
323                         *conn_count = 0;
324                         *share_count = 0;
325                         rc = 0;
326                         goto out;
327                 }
328
329                 for (j = 0; j < peer_ni->ksnp_n_passive_ips; j++) {
330                         if (index-- > 0)
331                                 continue;
332
333                         *id = peer_ni->ksnp_id;
334                         *myip = peer_ni->ksnp_passive_ips[j];
335                         *peer_ip = 0;
336                         *port = 0;
337                         *conn_count = 0;
338                         *share_count = 0;
339                         rc = 0;
340                         goto out;
341                 }
342
343                 list_for_each(rtmp, &peer_ni->ksnp_routes) {
344                         if (index-- > 0)
345                                 continue;
346
347                         route = list_entry(rtmp, struct ksock_route,
348                                            ksnr_list);
349
350                         *id = peer_ni->ksnp_id;
351                         if (route->ksnr_addr.ss_family == AF_INET) {
352                                 struct sockaddr_in *sa =
353                                         (void *)&route->ksnr_addr;
354                                 rc = choose_ipv4_src(
355                                         myip,
356                                         route->ksnr_myiface,
357                                         ntohl(sa->sin_addr.s_addr),
358                                         ni->ni_net_ns);
359                                 *peer_ip = ntohl(sa->sin_addr.s_addr);
360                                 *port = ntohs(sa->sin_port);
361                         } else {
362                                 *myip = 0xFFFFFFFF;
363                                 *peer_ip = 0xFFFFFFFF;
364                                 *port = 0;
365                                 rc = -ENOTSUPP;
366                         }
367                         *conn_count = route->ksnr_conn_count;
368                         *share_count = route->ksnr_share_count;
369                         goto out;
370                 }
371         }
372 out:
373         read_unlock(&ksocknal_data.ksnd_global_lock);
374         return rc;
375 }
376
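/* Bind @conn to @route: take a route reference, (re)bind the route to the
 * interface the connection actually used, record the connection type in
 * ksnr_connected and clear the retry interval. */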
377 static void
378 ksocknal_associate_route_conn_locked(struct ksock_route *route,
379                                      struct ksock_conn *conn)
380 {
381         struct ksock_peer_ni *peer_ni = route->ksnr_peer;
382         int type = conn->ksnc_type;
383         struct ksock_interface *iface;
384         int conn_iface =
385                 ksocknal_ip2index((struct sockaddr *)&conn->ksnc_myaddr,
386                                   route->ksnr_peer->ksnp_ni);
387
388         conn->ksnc_route = route;
389         ksocknal_route_addref(route);
390
391         if (route->ksnr_myiface != conn_iface) {
392                 if (route->ksnr_myiface < 0) {
393                         /* route wasn't bound locally yet (the initial route) */
394                         CDEBUG(D_NET, "Binding %s %pIS to interface %d\n",
395                                libcfs_id2str(peer_ni->ksnp_id),
396                                &route->ksnr_addr,
397                                conn_iface);
398                 } else {
399                         CDEBUG(D_NET,
400                                "Rebinding %s %pIS from interface %d to %d\n",
401                                libcfs_id2str(peer_ni->ksnp_id),
402                                &route->ksnr_addr,
403                                route->ksnr_myiface,
404                                conn_iface);
405
406                         iface = ksocknal_index2iface(route->ksnr_peer->ksnp_ni,
407                                                      route->ksnr_myiface);
408                         if (iface)
409                                 iface->ksni_nroutes--;
410                 }
411                 route->ksnr_myiface = conn_iface;
412                 iface = ksocknal_index2iface(route->ksnr_peer->ksnp_ni,
413                                              route->ksnr_myiface);
414                 if (iface)
415                         iface->ksni_nroutes++;
416         }
417
418         route->ksnr_connected |= (1<<type);
419         route->ksnr_conn_count++;
420
421         /* Successful connection => further attempts can
422          * proceed immediately
423          */
424         route->ksnr_retry_interval = 0;
425 }
426
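/* Attach a freshly created @route to @peer_ni, bind it to the net's first
 * interface and associate it with any existing connections to the same
 * address; the peer_ni's route list takes over the caller's reference. */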
427 static void
428 ksocknal_add_route_locked(struct ksock_peer_ni *peer_ni, struct ksock_route *route)
429 {
430         struct list_head *tmp;
431         struct ksock_conn *conn;
432         struct ksock_route *route2;
433         struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
434
435         LASSERT(!peer_ni->ksnp_closing);
436         LASSERT(route->ksnr_peer == NULL);
437         LASSERT(!route->ksnr_scheduled);
438         LASSERT(!route->ksnr_connecting);
439         LASSERT(route->ksnr_connected == 0);
440         LASSERT(net->ksnn_ninterfaces > 0);
441
442         /* LASSERT(unique) */
443         list_for_each(tmp, &peer_ni->ksnp_routes) {
444                 route2 = list_entry(tmp, struct ksock_route, ksnr_list);
445
446                 if (rpc_cmp_addr((struct sockaddr *)&route2->ksnr_addr,
447                                  (struct sockaddr *)&route->ksnr_addr)) {
448                         CERROR("Duplicate route %s %pI4h\n",
449                                libcfs_id2str(peer_ni->ksnp_id),
450                                &route->ksnr_addr);
451                         LBUG();
452                 }
453         }
454
455         route->ksnr_peer = peer_ni;
456         ksocknal_peer_addref(peer_ni);
457
458         /* set the route's interface to the current net's interface */
459         route->ksnr_myiface = net->ksnn_interfaces[0].ksni_index;
460         net->ksnn_interfaces[0].ksni_nroutes++;
461
462         /* peer_ni's routelist takes over my ref on 'route' */
463         list_add_tail(&route->ksnr_list, &peer_ni->ksnp_routes);
464
465         list_for_each(tmp, &peer_ni->ksnp_conns) {
466                 conn = list_entry(tmp, struct ksock_conn, ksnc_list);
467
468                 if (!rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
469                                   (struct sockaddr *)&route->ksnr_addr))
470                         continue;
471
472                 ksocknal_associate_route_conn_locked(route, conn);
473                 /* keep going (typed routes) */
474         }
475 }
476
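/* Delete @route: close connections using it, release its interface and
 * peer_ni references, and unlink the peer_ni itself if this was its last
 * route and it has no connections left. */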
477 static void
478 ksocknal_del_route_locked(struct ksock_route *route)
479 {
480         struct ksock_peer_ni *peer_ni = route->ksnr_peer;
481         struct ksock_interface *iface;
482         struct ksock_conn *conn;
483         struct ksock_conn *cnxt;
484
485         LASSERT(!route->ksnr_deleted);
486
487         /* Close associated conns */
488         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns, ksnc_list) {
489                 if (conn->ksnc_route != route)
490                         continue;
491
492                 ksocknal_close_conn_locked(conn, 0);
493         }
494
495         if (route->ksnr_myiface >= 0) {
496                 iface = ksocknal_index2iface(route->ksnr_peer->ksnp_ni,
497                                              route->ksnr_myiface);
498                 if (iface)
499                         iface->ksni_nroutes--;
500         }
501
502         route->ksnr_deleted = 1;
503         list_del(&route->ksnr_list);
504         ksocknal_route_decref(route);           /* drop peer_ni's ref */
505
506         if (list_empty(&peer_ni->ksnp_routes) &&
507             list_empty(&peer_ni->ksnp_conns)) {
508                 /* I've just removed the last route to a peer_ni with no active
509                  * connections */
510                 ksocknal_unlink_peer_locked(peer_ni);
511         }
512 }
513
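/* Add a peer_ni for @id reachable at @addr, creating the peer_ni and
 * route as needed; if an identical route already exists only its share
 * count is incremented. */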
514 int
515 ksocknal_add_peer(struct lnet_ni *ni, struct lnet_process_id id,
516                   struct sockaddr *addr)
517 {
518         struct list_head *tmp;
519         struct ksock_peer_ni *peer_ni;
520         struct ksock_peer_ni *peer2;
521         struct ksock_route *route;
522         struct ksock_route *route2;
523
524         if (id.nid == LNET_NID_ANY ||
525             id.pid == LNET_PID_ANY)
526                 return (-EINVAL);
527
528         /* Have a brand new peer_ni ready... */
529         peer_ni = ksocknal_create_peer(ni, id);
530         if (IS_ERR(peer_ni))
531                 return PTR_ERR(peer_ni);
532
533         route = ksocknal_create_route(addr);
534         if (route == NULL) {
535                 ksocknal_peer_decref(peer_ni);
536                 return (-ENOMEM);
537         }
538
539         write_lock_bh(&ksocknal_data.ksnd_global_lock);
540
541         /* always called with a ref on ni, so shutdown can't have started */
542         LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers)
543                 >= 0);
544
545         peer2 = ksocknal_find_peer_locked(ni, id);
546         if (peer2 != NULL) {
547                 ksocknal_peer_decref(peer_ni);
548                 peer_ni = peer2;
549         } else {
550                 /* peer_ni table takes my ref on peer_ni */
551                 hash_add(ksocknal_data.ksnd_peers, &peer_ni->ksnp_list, id.nid);
552         }
553
554         route2 = NULL;
555         list_for_each(tmp, &peer_ni->ksnp_routes) {
556                 route2 = list_entry(tmp, struct ksock_route, ksnr_list);
557
558                 if (rpc_cmp_addr(addr, (struct sockaddr *)&route2->ksnr_addr))
559                         break;
560
561                 route2 = NULL;
562         }
563         if (route2 == NULL) {
564                 ksocknal_add_route_locked(peer_ni, route);
565                 route->ksnr_share_count++;
566         } else {
567                 ksocknal_route_decref(route);
568                 route2->ksnr_share_count++;
569         }
570
571         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
572
573         return 0;
574 }
575
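/* Remove @peer_ni's routes matching @ip (all routes when @ip is zero) and
 * their connections; if no explicitly shared routes remain, remove the
 * automatic routes and close every remaining connection too. */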
576 static void
577 ksocknal_del_peer_locked(struct ksock_peer_ni *peer_ni, __u32 ip)
578 {
579         struct ksock_conn *conn;
580         struct ksock_conn *cnxt;
581         struct ksock_route *route;
582         struct ksock_route *rnxt;
583         int nshared;
584
585         LASSERT(!peer_ni->ksnp_closing);
586
587         /* Extra ref prevents peer_ni disappearing until I'm done with it */
588         ksocknal_peer_addref(peer_ni);
589
590         list_for_each_entry_safe(route, rnxt, &peer_ni->ksnp_routes,
591                                  ksnr_list) {
592                 /* no match */
593                 if (ip) {
594                         if (route->ksnr_addr.ss_family != AF_INET)
595                                 continue;
596                         if (((struct sockaddr_in *)&route->ksnr_addr)
597                                         ->sin_addr.s_addr != htonl(ip))
598                                 continue;
599                 }
600
601                 route->ksnr_share_count = 0;
602                 /* This deletes associated conns too */
603                 ksocknal_del_route_locked(route);
604         }
605
606         nshared = 0;
607         list_for_each_entry(route, &peer_ni->ksnp_routes, ksnr_list)
608                 nshared += route->ksnr_share_count;
609
610         if (nshared == 0) {
611                 /* remove everything else if there are no explicit entries
612                  * left
613                  */
614                 list_for_each_entry_safe(route, rnxt, &peer_ni->ksnp_routes,
615                                          ksnr_list) {
616                         /* we should only be removing auto-entries */
617                         LASSERT(route->ksnr_share_count == 0);
618                         ksocknal_del_route_locked(route);
619                 }
620
621                 list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns,
622                                          ksnc_list)
623                         ksocknal_close_conn_locked(conn, 0);
624         }
625
626         ksocknal_peer_decref(peer_ni);
627         /* NB peer_ni unlinks itself when last conn/route is removed */
628 }
629
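/* Delete peer_nis on @ni matching @id (LNET_NID_ANY and LNET_PID_ANY act
 * as wildcards), restricted to routes to @ip when it is non-zero; queued
 * transmits of unlinked peers are completed with -ENETDOWN.  Returns
 * -ENOENT if nothing matched. */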
630 static int
631 ksocknal_del_peer(struct lnet_ni *ni, struct lnet_process_id id, __u32 ip)
632 {
633         LIST_HEAD(zombies);
634         struct hlist_node *pnxt;
635         struct ksock_peer_ni *peer_ni;
636         int lo;
637         int hi;
638         int i;
639         int rc = -ENOENT;
640
641         write_lock_bh(&ksocknal_data.ksnd_global_lock);
642
643         if (id.nid != LNET_NID_ANY) {
644                 lo = hash_min(id.nid, HASH_BITS(ksocknal_data.ksnd_peers));
645                 hi = lo;
646         } else {
647                 lo = 0;
648                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
649         }
650
651         for (i = lo; i <= hi; i++) {
652                 hlist_for_each_entry_safe(peer_ni, pnxt,
653                                           &ksocknal_data.ksnd_peers[i],
654                                           ksnp_list) {
655                         if (peer_ni->ksnp_ni != ni)
656                                 continue;
657
658                         if (!((id.nid == LNET_NID_ANY ||
659                                peer_ni->ksnp_id.nid == id.nid) &&
660                               (id.pid == LNET_PID_ANY ||
661                                peer_ni->ksnp_id.pid == id.pid)))
662                                 continue;
663
664                         ksocknal_peer_addref(peer_ni);  /* a ref for me... */
665
666                         ksocknal_del_peer_locked(peer_ni, ip);
667
668                         if (peer_ni->ksnp_closing &&
669                             !list_empty(&peer_ni->ksnp_tx_queue)) {
670                                 LASSERT(list_empty(&peer_ni->ksnp_conns));
671                                 LASSERT(list_empty(&peer_ni->ksnp_routes));
672
673                                 list_splice_init(&peer_ni->ksnp_tx_queue,
674                                                  &zombies);
675                         }
676
677                         ksocknal_peer_decref(peer_ni);  /* ...till here */
678
679                         rc = 0;                         /* matched! */
680                 }
681         }
682
683         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
684
685         ksocknal_txlist_done(ni, &zombies, -ENETDOWN);
686
687         return rc;
688 }
689
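/* Return the index'th connection on @ni with a reference held for the
 * caller, or NULL if @index is beyond the last connection. */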
690 static struct ksock_conn *
691 ksocknal_get_conn_by_idx(struct lnet_ni *ni, int index)
692 {
693         struct ksock_peer_ni *peer_ni;
694         struct ksock_conn *conn;
695         struct list_head *ctmp;
696         int i;
697
698         read_lock(&ksocknal_data.ksnd_global_lock);
699
700         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
701                 LASSERT(!peer_ni->ksnp_closing);
702
703                 if (peer_ni->ksnp_ni != ni)
704                         continue;
705
706                 list_for_each(ctmp, &peer_ni->ksnp_conns) {
707                         if (index-- > 0)
708                                 continue;
709
710                         conn = list_entry(ctmp, struct ksock_conn,
711                                           ksnc_list);
712                         ksocknal_conn_addref(conn);
713                         read_unlock(&ksocknal_data.ksnd_global_lock);
714                         return conn;
715                 }
716         }
717
718         read_unlock(&ksocknal_data.ksnd_global_lock);
719         return NULL;
720 }
721
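/* Pick the scheduler for @cpt; if that CPT has no scheduler threads, fall
 * back to any scheduler that does, or return NULL if none exists. */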
722 static struct ksock_sched *
723 ksocknal_choose_scheduler_locked(unsigned int cpt)
724 {
725         struct ksock_sched *sched = ksocknal_data.ksnd_schedulers[cpt];
726         int i;
727
728         if (sched->kss_nthreads == 0) {
729                 cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
730                         if (sched->kss_nthreads > 0) {
731                                 CDEBUG(D_NET, "scheduler[%d] has no threads. selected scheduler[%d]\n",
732                                        cpt, sched->kss_cpt);
733                                 return sched;
734                         }
735                 }
736                 return NULL;
737         }
738
739         return sched;
740 }
741
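/* Copy this net's IPv4 interface addresses (host byte order) into
 * @ipaddrs and return how many were copied, or 0 when there are fewer
 * than two since extra connections are only offered with multiple
 * interfaces. */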
742 static int
743 ksocknal_local_ipvec(struct lnet_ni *ni, __u32 *ipaddrs)
744 {
745         struct ksock_net *net = ni->ni_data;
746         int i, j;
747         int nip;
748
749         read_lock(&ksocknal_data.ksnd_global_lock);
750
751         nip = net->ksnn_ninterfaces;
752         LASSERT(nip <= LNET_INTERFACES_NUM);
753
754         for (i = 0, j = 0; i < nip; i++)
755                 if (net->ksnn_interfaces[i].ksni_addr.ss_family == AF_INET) {
756                         struct sockaddr_in *sa =
757                                 (void *)&net->ksnn_interfaces[i].ksni_addr;
758
759                         ipaddrs[j] = ntohl(sa->sin_addr.s_addr);
760                         LASSERT(ipaddrs[j] != 0);
761                         j += 1;
762                 }
763         nip = j;
764
765         read_unlock(&ksocknal_data.ksnd_global_lock);
766         /*
767          * Only offer interfaces for additional connections if I have
768          * more than one.
769          */
770         return nip < 2 ? 0 : nip;
771 }
772
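/* Choose the index of the peer_ni IP in @ips that best matches @iface:
 * prefer an address on the same subnet, then the smallest address XOR;
 * zeroed (already used) entries are skipped. */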
773 static int
774 ksocknal_match_peerip(struct ksock_interface *iface, __u32 *ips, int nips)
775 {
776         int best_netmatch = 0;
777         int best_xor = 0;
778         int best = -1;
779         int this_xor;
780         int this_netmatch;
781         int i;
782         struct sockaddr_in *sa;
783         __u32 ip;
784
785         sa = (struct sockaddr_in *)&iface->ksni_addr;
786         LASSERT(sa->sin_family == AF_INET);
787         ip = ntohl(sa->sin_addr.s_addr);
788
789         for (i = 0; i < nips; i++) {
790                 if (ips[i] == 0)
791                         continue;
792
793                 this_xor = ips[i] ^ ip;
794                 this_netmatch = ((this_xor & iface->ksni_netmask) == 0) ? 1 : 0;
795
796                 if (!(best < 0 ||
797                       best_netmatch < this_netmatch ||
798                       (best_netmatch == this_netmatch &&
799                        best_xor > this_xor)))
800                         continue;
801
802                 best = i;
803                 best_netmatch = this_netmatch;
804                 best_xor = this_xor;
805         }
806
807         LASSERT(best >= 0);
808         return best;
809 }
810
811 static int
812 ksocknal_select_ips(struct ksock_peer_ni *peer_ni, __u32 *peerips, int n_peerips)
813 {
814         rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
815         struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
816         struct ksock_interface *iface;
817         struct ksock_interface *best_iface;
818         int n_ips;
819         int i;
820         int j;
821         int k;
822         u32 ip;
823         u32 xor;
824         int this_netmatch;
825         int best_netmatch;
826         int best_npeers;
827
828         /* CAVEAT EMPTOR: We do all our interface matching with an
829          * exclusive hold of global lock at IRQ priority.  We're only
830          * expecting to be dealing with small numbers of interfaces, so the
831          * O(n**3)-ness shouldn't matter */
832
833         /* Also note that I'm not going to return more than n_peerips
834          * interfaces, even if I have more myself */
835
836         write_lock_bh(global_lock);
837
838         LASSERT(n_peerips <= LNET_INTERFACES_NUM);
839         LASSERT(net->ksnn_ninterfaces <= LNET_INTERFACES_NUM);
840
841         /* Only match interfaces for additional connections
842          * if I have > 1 interface
843          */
844         n_ips = (net->ksnn_ninterfaces < 2) ? 0 :
845                 min(n_peerips, net->ksnn_ninterfaces);
846
847         for (i = 0; peer_ni->ksnp_n_passive_ips < n_ips; i++) {
848                 /*              ^ yes really... */
849
850                 /* If we have any new interfaces, first tick off all the
851                  * peer_ni IPs that match old interfaces, then choose new
852                  * interfaces to match the remaining peer_ni IPS.
853                  * We don't forget interfaces we've stopped using; we might
854                  * start using them again... */
855
856                 if (i < peer_ni->ksnp_n_passive_ips) {
857                         /* Old interface. */
858                         struct sockaddr_in sa = { .sin_family = AF_INET};
859
860                         sa.sin_addr.s_addr =
861                                 htonl(peer_ni->ksnp_passive_ips[i]);
862                         best_iface = ksocknal_ip2iface(peer_ni->ksnp_ni,
863                                                        (struct sockaddr *)&sa);
864
865                         /* peer_ni passive ips are kept up to date */
866                         LASSERT(best_iface != NULL);
867                 } else {
868                         /* choose a new interface */
869                         struct sockaddr_in *sa;
870
871                         LASSERT (i == peer_ni->ksnp_n_passive_ips);
872
873                         best_iface = NULL;
874                         best_netmatch = 0;
875                         best_npeers = 0;
876
877                         for (j = 0; j < net->ksnn_ninterfaces; j++) {
878                                 iface = &net->ksnn_interfaces[j];
879                                 sa = (void *)&iface->ksni_addr;
880                                 if (sa->sin_family != AF_INET)
881                                         continue;
882                                 ip = ntohl(sa->sin_addr.s_addr);
883
884                                 for (k = 0; k < peer_ni->ksnp_n_passive_ips; k++)
885                                         if (peer_ni->ksnp_passive_ips[k] == ip)
886                                                 break;
887
888                                 if (k < peer_ni->ksnp_n_passive_ips) /* using it already */
889                                         continue;
890
891                                 k = ksocknal_match_peerip(iface, peerips, n_peerips);
892                                 xor = (ip ^ peerips[k]);
893                                 this_netmatch = ((xor & iface->ksni_netmask) == 0) ? 1 : 0;
894
895                                 if (!(best_iface == NULL ||
896                                       best_netmatch < this_netmatch ||
897                                       (best_netmatch == this_netmatch &&
898                                        best_npeers > iface->ksni_npeers)))
899                                         continue;
900
901                                 best_iface = iface;
902                                 best_netmatch = this_netmatch;
903                                 best_npeers = iface->ksni_npeers;
904                         }
905
906                         LASSERT(best_iface != NULL);
907
908                         best_iface->ksni_npeers++;
909                         sa = (void *)&best_iface->ksni_addr;
910                         ip = ntohl(sa->sin_addr.s_addr);
911                         peer_ni->ksnp_passive_ips[i] = ip;
912                         peer_ni->ksnp_n_passive_ips = i+1;
913                 }
914
915                 /* mark the best matching peer_ni IP used */
916                 j = ksocknal_match_peerip(best_iface, peerips, n_peerips);
917                 peerips[j] = 0;
918         }
919
920         /* Overwrite input peer_ni IP addresses */
921         memcpy(peerips, peer_ni->ksnp_passive_ips, n_ips * sizeof(*peerips));
922
923         write_unlock_bh(global_lock);
924
925         return (n_ips);
926 }
927
928 static void
929 ksocknal_create_routes(struct ksock_peer_ni *peer_ni, int port,
930                        __u32 *peer_ipaddrs, int npeer_ipaddrs)
931 {
932         struct ksock_route      *newroute = NULL;
933         rwlock_t                *global_lock = &ksocknal_data.ksnd_global_lock;
934         struct lnet_ni *ni = peer_ni->ksnp_ni;
935         struct ksock_net                *net = ni->ni_data;
936         struct list_head        *rtmp;
937         struct ksock_route              *route;
938         struct ksock_interface  *iface;
939         struct ksock_interface  *best_iface;
940         int                     best_netmatch;
941         int                     this_netmatch;
942         int                     best_nroutes;
943         int                     i;
944         int                     j;
945
946         /* CAVEAT EMPTOR: We do all our interface matching with an
947          * exclusive hold of global lock at IRQ priority.  We're only
948          * expecting to be dealing with small numbers of interfaces, so the
949          * O(n**3)-ness here shouldn't matter */
950
951         write_lock_bh(global_lock);
952
953         if (net->ksnn_ninterfaces < 2) {
954                 /* Only create additional connections
955                  * if I have > 1 interface */
956                 write_unlock_bh(global_lock);
957                 return;
958         }
959
960         LASSERT(npeer_ipaddrs <= LNET_INTERFACES_NUM);
961
962         for (i = 0; i < npeer_ipaddrs; i++) {
963                 if (newroute) {
964                         struct sockaddr_in *sa = (void *)&newroute->ksnr_addr;
965
966                         memset(sa, 0, sizeof(*sa));
967                         sa->sin_family = AF_INET;
968                         sa->sin_addr.s_addr = htonl(peer_ipaddrs[i]);
969                 } else {
970                         struct sockaddr_in sa = {.sin_family = AF_INET};
971
972                         write_unlock_bh(global_lock);
973
974                         sa.sin_addr.s_addr = htonl(peer_ipaddrs[i]);
975                         sa.sin_port = htons(port);
976                         newroute =
977                                 ksocknal_create_route((struct sockaddr *)&sa);
978                         if (!newroute)
979                                 return;
980
981                         write_lock_bh(global_lock);
982                 }
983
984                 if (peer_ni->ksnp_closing) {
985                         /* peer_ni got closed under me */
986                         break;
987                 }
988
989                 /* Already got a route? */
990                 route = NULL;
991                 list_for_each(rtmp, &peer_ni->ksnp_routes) {
992                         route = list_entry(rtmp, struct ksock_route, ksnr_list);
993
994                         if (rpc_cmp_addr(
995                                     (struct sockaddr *)&route->ksnr_addr,
996                                     (struct sockaddr *)&newroute->ksnr_addr))
997                                 break;
998
999                         route = NULL;
1000                 }
1001                 if (route != NULL)
1002                         continue;
1003
1004                 best_iface = NULL;
1005                 best_nroutes = 0;
1006                 best_netmatch = 0;
1007
1008                 LASSERT(net->ksnn_ninterfaces <= LNET_INTERFACES_NUM);
1009
1010                 /* Select interface to connect from */
1011                 for (j = 0; j < net->ksnn_ninterfaces; j++) {
1012                         __u32 iface_ip, route_ip;
1013
1014                         iface = &net->ksnn_interfaces[j];
1015
1016                         /* Using this interface already? */
1017                         list_for_each(rtmp, &peer_ni->ksnp_routes) {
1018                                 route = list_entry(rtmp, struct ksock_route,
1019                                                    ksnr_list);
1020
1021                                 if (route->ksnr_myiface == iface->ksni_index)
1022                                         break;
1023
1024                                 route = NULL;
1025                         }
1026                         if (route != NULL)
1027                                 continue;
1028                         if (iface->ksni_addr.ss_family != AF_INET)
1029                                 continue;
1030                         if (newroute->ksnr_addr.ss_family != AF_INET)
1031                                 continue;
1032
1033                         iface_ip =
1034                                 ntohl(((struct sockaddr_in *)
1035                                        &iface->ksni_addr)->sin_addr.s_addr);
1036                         route_ip =
1037                                 ntohl(((struct sockaddr_in *)
1038                                        &newroute->ksnr_addr)->sin_addr.s_addr);
1039
1040                         this_netmatch = (((iface_ip ^ route_ip) &
1041                                           iface->ksni_netmask) == 0) ? 1 : 0;
1042
1043                         if (!(best_iface == NULL ||
1044                               best_netmatch < this_netmatch ||
1045                               (best_netmatch == this_netmatch &&
1046                                best_nroutes > iface->ksni_nroutes)))
1047                                 continue;
1048
1049                         best_iface = iface;
1050                         best_netmatch = this_netmatch;
1051                         best_nroutes = iface->ksni_nroutes;
1052                 }
1053
1054                 if (best_iface == NULL)
1055                         continue;
1056
1057                 newroute->ksnr_myiface = best_iface->ksni_index;
1058                 best_iface->ksni_nroutes++;
1059
1060                 ksocknal_add_route_locked(peer_ni, newroute);
1061                 newroute = NULL;
1062         }
1063
1064         write_unlock_bh(global_lock);
1065         if (newroute != NULL)
1066                 ksocknal_route_decref(newroute);
1067 }
1068
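/* Hand a freshly accepted @sock to the connection daemons: package it
 * with @ni as a connection request, queue it on ksnd_connd_connreqs and
 * wake a connd thread to complete the handshake. */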
1069 int
1070 ksocknal_accept(struct lnet_ni *ni, struct socket *sock)
1071 {
1072         struct ksock_connreq *cr;
1073         int rc;
1074         struct sockaddr_storage peer;
1075
1076         rc = lnet_sock_getaddr(sock, true, &peer);
1077         LASSERT(rc == 0);               /* we succeeded before */
1078
1079         LIBCFS_ALLOC(cr, sizeof(*cr));
1080         if (cr == NULL) {
1081                 LCONSOLE_ERROR_MSG(0x12f,
1082                                    "Dropping connection request from %pIS: memory exhausted\n",
1083                                    &peer);
1084                 return -ENOMEM;
1085         }
1086
1087         lnet_ni_addref(ni);
1088         cr->ksncr_ni   = ni;
1089         cr->ksncr_sock = sock;
1090
1091         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
1092
1093         list_add_tail(&cr->ksncr_list, &ksocknal_data.ksnd_connd_connreqs);
1094         wake_up(&ksocknal_data.ksnd_connd_waitq);
1095
1096         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
1097         return 0;
1098 }
1099
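/* Return the connecting flag of @peer_ni's route to @sa, or 0 if the
 * peer_ni has no route to that address. */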
1100 static int
1101 ksocknal_connecting(struct ksock_peer_ni *peer_ni, struct sockaddr *sa)
1102 {
1103         struct ksock_route *route;
1104
1105         list_for_each_entry(route, &peer_ni->ksnp_routes, ksnr_list) {
1106                 if (rpc_cmp_addr((struct sockaddr *)&route->ksnr_addr, sa))
1107                         return route->ksnr_connecting;
1108         }
1109         return 0;
1110 }
1111
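/* Establish a new connection on @sock.  @route is non-NULL (and @type is
 * a real connection type) for active connects; passive accepts discover
 * the peer_ni and protocol from the incoming HELLO.  Handles the HELLO
 * exchange, duplicate and race resolution, route association and
 * scheduler assignment before enabling the socket callbacks. */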
1112 int
1113 ksocknal_create_conn(struct lnet_ni *ni, struct ksock_route *route,
1114                      struct socket *sock, int type)
1115 {
1116         rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
1117         LIST_HEAD(zombies);
1118         struct lnet_process_id peerid;
1119         struct list_head *tmp;
1120         u64 incarnation;
1121         struct ksock_conn *conn;
1122         struct ksock_conn *conn2;
1123         struct ksock_peer_ni *peer_ni = NULL;
1124         struct ksock_peer_ni *peer2;
1125         struct ksock_sched *sched;
1126         struct ksock_hello_msg *hello;
1127         int cpt;
1128         struct ksock_tx *tx;
1129         struct ksock_tx *txtmp;
1130         int rc;
1131         int rc2;
1132         int active;
1133         char *warn = NULL;
1134
1135         active = (route != NULL);
1136
1137         LASSERT (active == (type != SOCKLND_CONN_NONE));
1138
1139         LIBCFS_ALLOC(conn, sizeof(*conn));
1140         if (conn == NULL) {
1141                 rc = -ENOMEM;
1142                 goto failed_0;
1143         }
1144
1145         conn->ksnc_peer = NULL;
1146         conn->ksnc_route = NULL;
1147         conn->ksnc_sock = sock;
1148         /* 2 ref, 1 for conn, another extra ref prevents socket
1149          * being closed before establishment of connection */
1150         refcount_set(&conn->ksnc_sock_refcount, 2);
1151         conn->ksnc_type = type;
1152         ksocknal_lib_save_callback(sock, conn);
1153         refcount_set(&conn->ksnc_conn_refcount, 1); /* 1 ref for me */
1154
1155         conn->ksnc_rx_ready = 0;
1156         conn->ksnc_rx_scheduled = 0;
1157
1158         INIT_LIST_HEAD(&conn->ksnc_tx_queue);
1159         conn->ksnc_tx_ready = 0;
1160         conn->ksnc_tx_scheduled = 0;
1161         conn->ksnc_tx_carrier = NULL;
1162         atomic_set (&conn->ksnc_tx_nob, 0);
1163
1164         LIBCFS_ALLOC(hello, offsetof(struct ksock_hello_msg,
1165                                      kshm_ips[LNET_INTERFACES_NUM]));
1166         if (hello == NULL) {
1167                 rc = -ENOMEM;
1168                 goto failed_1;
1169         }
1170
1171         /* stash conn's local and remote addrs */
1172         rc = ksocknal_lib_get_conn_addrs (conn);
1173         if (rc != 0)
1174                 goto failed_1;
1175
1176         /* Find out/confirm peer_ni's NID and connection type and get the
1177          * vector of interfaces she's willing to let me connect to.
1178          * Passive connections use the listener timeout since the peer_ni sends
1179          * eagerly */
1180
1181         if (active) {
1182                 peer_ni = route->ksnr_peer;
1183                 LASSERT(ni == peer_ni->ksnp_ni);
1184
1185                 /* Active connection sends HELLO eagerly */
1186                 hello->kshm_nips = ksocknal_local_ipvec(ni, hello->kshm_ips);
1187                 peerid = peer_ni->ksnp_id;
1188
1189                 write_lock_bh(global_lock);
1190                 conn->ksnc_proto = peer_ni->ksnp_proto;
1191                 write_unlock_bh(global_lock);
1192
1193                 if (conn->ksnc_proto == NULL) {
1194                          conn->ksnc_proto = &ksocknal_protocol_v3x;
1195 #if SOCKNAL_VERSION_DEBUG
1196                          if (*ksocknal_tunables.ksnd_protocol == 2)
1197                                  conn->ksnc_proto = &ksocknal_protocol_v2x;
1198                          else if (*ksocknal_tunables.ksnd_protocol == 1)
1199                                  conn->ksnc_proto = &ksocknal_protocol_v1x;
1200 #endif
1201                 }
1202
1203                 rc = ksocknal_send_hello (ni, conn, peerid.nid, hello);
1204                 if (rc != 0)
1205                         goto failed_1;
1206         } else {
1207                 peerid.nid = LNET_NID_ANY;
1208                 peerid.pid = LNET_PID_ANY;
1209
1210                 /* Passive, get protocol from peer_ni */
1211                 conn->ksnc_proto = NULL;
1212         }
1213
1214         rc = ksocknal_recv_hello (ni, conn, hello, &peerid, &incarnation);
1215         if (rc < 0)
1216                 goto failed_1;
1217
1218         LASSERT (rc == 0 || active);
1219         LASSERT (conn->ksnc_proto != NULL);
1220         LASSERT (peerid.nid != LNET_NID_ANY);
1221
1222         cpt = lnet_cpt_of_nid(peerid.nid, ni);
1223
1224         if (active) {
1225                 ksocknal_peer_addref(peer_ni);
1226                 write_lock_bh(global_lock);
1227         } else {
1228                 peer_ni = ksocknal_create_peer(ni, peerid);
1229                 if (IS_ERR(peer_ni)) {
1230                         rc = PTR_ERR(peer_ni);
1231                         goto failed_1;
1232                 }
1233
1234                 write_lock_bh(global_lock);
1235
1236                 /* called with a ref on ni, so shutdown can't have started */
1237                 LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers) >= 0);
1238
1239                 peer2 = ksocknal_find_peer_locked(ni, peerid);
1240                 if (peer2 == NULL) {
1241                         /* NB this puts an "empty" peer_ni in the peer_ni
1242                          * table (which takes my ref) */
1243                         hash_add(ksocknal_data.ksnd_peers,
1244                                  &peer_ni->ksnp_list, peerid.nid);
1245                 } else {
1246                         ksocknal_peer_decref(peer_ni);
1247                         peer_ni = peer2;
1248                 }
1249
1250                 /* +1 ref for me */
1251                 ksocknal_peer_addref(peer_ni);
1252                 peer_ni->ksnp_accepting++;
1253
1254                 /* Am I already connecting to this guy?  Resolve in
1255                  * favour of higher NID...
1256                  */
1257                 if (peerid.nid < ni->ni_nid &&
1258                     ksocknal_connecting(peer_ni, ((struct sockaddr *)
1259                                                   &conn->ksnc_peeraddr))) {
1260                         rc = EALREADY;
1261                         warn = "connection race resolution";
1262                         goto failed_2;
1263                 }
1264         }
1265
1266         if (peer_ni->ksnp_closing ||
1267             (active && route->ksnr_deleted)) {
1268                 /* peer_ni/route got closed under me */
1269                 rc = -ESTALE;
1270                 warn = "peer_ni/route removed";
1271                 goto failed_2;
1272         }
1273
1274         if (peer_ni->ksnp_proto == NULL) {
1275                 /* Never connected before.
1276                  * NB recv_hello may have returned EPROTO to signal my peer_ni
1277                  * wants a different protocol than the one I asked for.
1278                  */
1279                 LASSERT(list_empty(&peer_ni->ksnp_conns));
1280
1281                 peer_ni->ksnp_proto = conn->ksnc_proto;
1282                 peer_ni->ksnp_incarnation = incarnation;
1283         }
1284
1285         if (peer_ni->ksnp_proto != conn->ksnc_proto ||
1286             peer_ni->ksnp_incarnation != incarnation) {
1287                 /* peer_ni rebooted or I've got the wrong protocol version */
1288                 ksocknal_close_peer_conns_locked(peer_ni, NULL, 0);
1289
1290                 peer_ni->ksnp_proto = NULL;
1291                 rc = ESTALE;
1292                 warn = peer_ni->ksnp_incarnation != incarnation ?
1293                         "peer_ni rebooted" :
1294                         "wrong proto version";
1295                 goto failed_2;
1296         }
1297
1298         switch (rc) {
1299         default:
1300                 LBUG();
1301         case 0:
1302                 break;
1303         case EALREADY:
1304                 warn = "lost conn race";
1305                 goto failed_2;
1306         case EPROTO:
1307                 warn = "retry with different protocol version";
1308                 goto failed_2;
1309         }
1310
1311         /* Refuse to duplicate an existing connection, unless this is a
1312          * loopback connection */
1313         if (!rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
1314                           (struct sockaddr *)&conn->ksnc_myaddr)) {
1315                 list_for_each(tmp, &peer_ni->ksnp_conns) {
1316                         conn2 = list_entry(tmp, struct ksock_conn, ksnc_list);
1317
1318                         if (!rpc_cmp_addr(
1319                                     (struct sockaddr *)&conn2->ksnc_peeraddr,
1320                                     (struct sockaddr *)&conn->ksnc_peeraddr) ||
1321                             !rpc_cmp_addr(
1322                                     (struct sockaddr *)&conn2->ksnc_myaddr,
1323                                     (struct sockaddr *)&conn->ksnc_myaddr) ||
1324                             conn2->ksnc_type != conn->ksnc_type)
1325                                 continue;
1326
1327                         /* Reply on a passive connection attempt so the peer_ni
1328                          * realises we're connected. */
1329                         LASSERT (rc == 0);
1330                         if (!active)
1331                                 rc = EALREADY;
1332
1333                         warn = "duplicate";
1334                         goto failed_2;
1335                 }
1336         }
1337
1338         /* If the connection created by this route didn't bind to the IP
1339          * address the route connected to, the connection/route matching
1340          * code below probably isn't going to work. */
1341         if (active &&
1342             !rpc_cmp_addr((struct sockaddr *)&route->ksnr_addr,
1343                           (struct sockaddr *)&conn->ksnc_peeraddr)) {
1344                 CERROR("Route %s %pIS connected to %pIS\n",
1345                        libcfs_id2str(peer_ni->ksnp_id),
1346                        &route->ksnr_addr,
1347                        &conn->ksnc_peeraddr);
1348         }
1349
1350         /* Search for a route corresponding to the new connection and
1351          * create an association.  This allows incoming connections created
1352          * by routes in my peer_ni to match my own route entries so I don't
1353          * continually create duplicate routes. */
1354         list_for_each(tmp, &peer_ni->ksnp_routes) {
1355                 route = list_entry(tmp, struct ksock_route, ksnr_list);
1356
1357                 if (!rpc_cmp_addr((struct sockaddr *)&route->ksnr_addr,
1358                                   (struct sockaddr *)&conn->ksnc_peeraddr))
1359                         continue;
1360
1361                 ksocknal_associate_route_conn_locked(route, conn);
1362                 break;
1363         }
1364
1365         conn->ksnc_peer = peer_ni;                 /* conn takes my ref on peer_ni */
1366         peer_ni->ksnp_last_alive = ktime_get_seconds();
1367         peer_ni->ksnp_send_keepalive = 0;
1368         peer_ni->ksnp_error = 0;
1369
1370         sched = ksocknal_choose_scheduler_locked(cpt);
1371         if (!sched) {
1372                 CERROR("no schedulers available. node is unhealthy\n");
1373                 goto failed_2;
1374         }
1375         /*
1376          * The cpt might have changed if we ended up selecting a non cpt
1377          * native scheduler. So use the scheduler's cpt instead.
1378          */
1379         cpt = sched->kss_cpt;
1380         sched->kss_nconns++;
1381         conn->ksnc_scheduler = sched;
1382
1383         conn->ksnc_tx_last_post = ktime_get_seconds();
1384         /* Set the deadline for the outgoing HELLO to drain */
1385         conn->ksnc_tx_bufnob = sock->sk->sk_wmem_queued;
1386         conn->ksnc_tx_deadline = ktime_get_seconds() +
1387                                  ksocknal_timeout();
1388         smp_mb();   /* order with adding to peer_ni's conn list */
1389
1390         list_add(&conn->ksnc_list, &peer_ni->ksnp_conns);
1391         ksocknal_conn_addref(conn);
1392
1393         ksocknal_new_packet(conn, 0);
1394
1395         conn->ksnc_zc_capable = ksocknal_lib_zc_capable(conn);
1396
1397         /* Take packets blocking for this connection. */
1398         list_for_each_entry_safe(tx, txtmp, &peer_ni->ksnp_tx_queue, tx_list) {
1399                 if (conn->ksnc_proto->pro_match_tx(conn, tx, tx->tx_nonblk) ==
1400                     SOCKNAL_MATCH_NO)
1401                         continue;
1402
1403                 list_del(&tx->tx_list);
1404                 ksocknal_queue_tx_locked(tx, conn);
1405         }
1406
1407         write_unlock_bh(global_lock);
1408
1409         /* We've now got a new connection.  Any errors from here on are just
1410          * like "normal" comms errors and we close the connection normally.
1411          * NB (a) we still have to send the reply HELLO for passive
1412          *        connections,
1413          *    (b) normal I/O on the conn is blocked until I setup and call the
1414          *        socket callbacks.
1415          */
1416
1417         CDEBUG(D_NET, "New conn %s p %d.x %pIS -> %pISp"
1418                " incarnation:%lld sched[%d]\n",
1419                libcfs_id2str(peerid), conn->ksnc_proto->pro_version,
1420                &conn->ksnc_myaddr, &conn->ksnc_peeraddr,
1421                incarnation, cpt);
1422
1423         if (active) {
1424                 /* additional routes after interface exchange? */
1425                 ksocknal_create_routes(
1426                         peer_ni,
1427                         rpc_get_port((struct sockaddr *)&conn->ksnc_peeraddr),
1428                         hello->kshm_ips, hello->kshm_nips);
1429         } else {
1430                 hello->kshm_nips = ksocknal_select_ips(peer_ni, hello->kshm_ips,
1431                                                        hello->kshm_nips);
1432                 rc = ksocknal_send_hello(ni, conn, peerid.nid, hello);
1433         }
1434
1435         LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
1436                                     kshm_ips[LNET_INTERFACES_NUM]));
1437
1438         /* setup the socket AFTER I've received hello (it disables
1439          * SO_LINGER).  I might call back to the acceptor who may want
1440          * to send a protocol version response and then close the
1441          * socket; this ensures the socket only tears down after the
1442          * response has been sent. */
1443         if (rc == 0)
1444                 rc = ksocknal_lib_setup_sock(sock);
1445
1446         write_lock_bh(global_lock);
1447
1448         /* NB my callbacks block while I hold ksnd_global_lock */
1449         ksocknal_lib_set_callback(sock, conn);
1450
1451         if (!active)
1452                 peer_ni->ksnp_accepting--;
1453
1454         write_unlock_bh(global_lock);
1455
1456         if (rc != 0) {
1457                 write_lock_bh(global_lock);
1458                 if (!conn->ksnc_closing) {
1459                         /* could be closed by another thread */
1460                         ksocknal_close_conn_locked(conn, rc);
1461                 }
1462                 write_unlock_bh(global_lock);
1463         } else if (ksocknal_connsock_addref(conn) == 0) {
1464                 /* Allow I/O to proceed. */
1465                 ksocknal_read_callback(conn);
1466                 ksocknal_write_callback(conn);
1467                 ksocknal_connsock_decref(conn);
1468         }
1469
1470         ksocknal_connsock_decref(conn);
1471         ksocknal_conn_decref(conn);
1472         return rc;
1473
1474 failed_2:
1475         if (!peer_ni->ksnp_closing &&
1476             list_empty(&peer_ni->ksnp_conns) &&
1477             list_empty(&peer_ni->ksnp_routes)) {
1478                 list_splice_init(&peer_ni->ksnp_tx_queue, &zombies);
1479                 ksocknal_unlink_peer_locked(peer_ni);
1480         }
1481
1482         write_unlock_bh(global_lock);
1483
1484         if (warn != NULL) {
1485                 if (rc < 0)
1486                         CERROR("Not creating conn %s type %d: %s\n",
1487                                libcfs_id2str(peerid), conn->ksnc_type, warn);
1488                 else
1489                         CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",
1490                               libcfs_id2str(peerid), conn->ksnc_type, warn);
1491         }
1492
1493         if (!active) {
1494                 if (rc > 0) {
1495                         /* Request retry by replying with CONN_NONE;
1496                          * ksnc_proto has been set already */
1497                         conn->ksnc_type = SOCKLND_CONN_NONE;
1498                         hello->kshm_nips = 0;
1499                         ksocknal_send_hello(ni, conn, peerid.nid, hello);
1500                 }
1501
1502                 write_lock_bh(global_lock);
1503                 peer_ni->ksnp_accepting--;
1504                 write_unlock_bh(global_lock);
1505         }
1506
1507         /*
1508          * If we get here without an error code, just use -EALREADY.
1509          * Depending on how we got here, the error may be positive
1510          * or negative. Normalize the value for ksocknal_txlist_done().
1511          */
1512         rc2 = (rc == 0 ? -EALREADY : (rc > 0 ? -rc : rc));
1513         ksocknal_txlist_done(ni, &zombies, rc2);
1514         ksocknal_peer_decref(peer_ni);
1515
1516 failed_1:
1517         if (hello != NULL)
1518                 LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
1519                                             kshm_ips[LNET_INTERFACES_NUM]));
1520
1521         LIBCFS_FREE(conn, sizeof(*conn));
1522
1523 failed_0:
1524         sock_release(sock);
1525         return rc;
1526 }
1527
1528 void
1529 ksocknal_close_conn_locked(struct ksock_conn *conn, int error)
1530 {
1531         /* This just does the immediate housekeeping, and queues the
1532          * connection for the reaper to terminate.
1533          * Caller holds ksnd_global_lock exclusively in irq context */
1534         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1535         struct ksock_route *route;
1536         struct ksock_conn *conn2;
1537         struct list_head *tmp;
1538
1539         LASSERT(peer_ni->ksnp_error == 0);
1540         LASSERT(!conn->ksnc_closing);
1541         conn->ksnc_closing = 1;
1542
1543         /* ksnd_deathrow_conns takes over peer_ni's ref */
1544         list_del(&conn->ksnc_list);
1545
1546         route = conn->ksnc_route;
1547         if (route != NULL) {
1548                 /* dissociate conn from route... */
1549                 LASSERT(!route->ksnr_deleted);
1550                 LASSERT((route->ksnr_connected & BIT(conn->ksnc_type)) != 0);
1551
1552                 conn2 = NULL;
1553                 list_for_each(tmp, &peer_ni->ksnp_conns) {
1554                         conn2 = list_entry(tmp, struct ksock_conn, ksnc_list);
1555
1556                         if (conn2->ksnc_route == route &&
1557                             conn2->ksnc_type == conn->ksnc_type)
1558                                 break;
1559
1560                         conn2 = NULL;
1561                 }
1562                 if (conn2 == NULL)
1563                         route->ksnr_connected &= ~BIT(conn->ksnc_type);
1564
1565                 conn->ksnc_route = NULL;
1566
1567                 ksocknal_route_decref(route);   /* drop conn's ref on route */
1568         }
1569
1570         if (list_empty(&peer_ni->ksnp_conns)) {
1571                 /* No more connections to this peer_ni */
1572
1573                 if (!list_empty(&peer_ni->ksnp_tx_queue)) {
1574                         struct ksock_tx *tx;
1575
1576                         LASSERT(conn->ksnc_proto == &ksocknal_protocol_v3x);
1577
1578                         /* throw them to the last connection;
1579                          * these TXs will be sent to /dev/null by the scheduler */
1580                         list_for_each_entry(tx, &peer_ni->ksnp_tx_queue,
1581                                             tx_list)
1582                                 ksocknal_tx_prep(conn, tx);
1583
1584                         spin_lock_bh(&conn->ksnc_scheduler->kss_lock);
1585                         list_splice_init(&peer_ni->ksnp_tx_queue,
1586                                          &conn->ksnc_tx_queue);
1587                         spin_unlock_bh(&conn->ksnc_scheduler->kss_lock);
1588                 }
1589
1590                 /* renegotiate protocol version */
1591                 peer_ni->ksnp_proto = NULL;
1592                 /* stash last conn close reason */
1593                 peer_ni->ksnp_error = error;
1594
1595                 if (list_empty(&peer_ni->ksnp_routes)) {
1596                         /* I've just closed last conn belonging to a
1597                          * peer_ni with no routes to it */
1598                         ksocknal_unlink_peer_locked(peer_ni);
1599                 }
1600         }
1601
1602         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1603
1604         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
1605         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1606
1607         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1608 }
1609
1610 void
1611 ksocknal_peer_failed(struct ksock_peer_ni *peer_ni)
1612 {
1613         bool notify = false;
1614         time64_t last_alive = 0;
1615
1616         /* There has been a connection failure or comms error; but I'll only
1617          * tell LNET I think the peer_ni is dead if it's to another kernel and
1618          * there are no connections or connection attempts in existence. */
1619
1620         read_lock(&ksocknal_data.ksnd_global_lock);
1621
1622         if ((peer_ni->ksnp_id.pid & LNET_PID_USERFLAG) == 0 &&
1623              list_empty(&peer_ni->ksnp_conns) &&
1624              peer_ni->ksnp_accepting == 0 &&
1625              ksocknal_find_connecting_route_locked(peer_ni) == NULL) {
1626                 notify = true;
1627                 last_alive = peer_ni->ksnp_last_alive;
1628         }
1629
1630         read_unlock(&ksocknal_data.ksnd_global_lock);
1631
1632         if (notify)
1633                 lnet_notify(peer_ni->ksnp_ni, peer_ni->ksnp_id.nid,
1634                             false, false, last_alive);
1635 }
1636
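     /* Abort any zero-copy requests still outstanding on this conn:
      * clear their cookies, mark them as not-acked and drop their
      * reference so the TXs can be finalized. */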
1637 void
1638 ksocknal_finalize_zcreq(struct ksock_conn *conn)
1639 {
1640         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1641         struct ksock_tx *tx;
1642         struct ksock_tx *tmp;
1643         LIST_HEAD(zlist);
1644
1645         /* NB safe to finalize TXs because closing of socket will
1646          * abort all buffered data */
1647         LASSERT(conn->ksnc_sock == NULL);
1648
1649         spin_lock(&peer_ni->ksnp_lock);
1650
1651         list_for_each_entry_safe(tx, tmp, &peer_ni->ksnp_zc_req_list, tx_zc_list) {
1652                 if (tx->tx_conn != conn)
1653                         continue;
1654
1655                 LASSERT(tx->tx_msg.ksm_zc_cookies[0] != 0);
1656
1657                 tx->tx_msg.ksm_zc_cookies[0] = 0;
1658                 tx->tx_zc_aborted = 1;  /* mark it as not-acked */
1659                 list_move(&tx->tx_zc_list, &zlist);
1660         }
1661
1662         spin_unlock(&peer_ni->ksnp_lock);
1663
1664         while (!list_empty(&zlist)) {
1665                 tx = list_entry(zlist.next, struct ksock_tx, tx_zc_list);
1666
1667                 list_del(&tx->tx_zc_list);
1668                 ksocknal_tx_decref(tx);
1669         }
1670 }
1671
1672 void
1673 ksocknal_terminate_conn(struct ksock_conn *conn)
1674 {
1675         /* This gets called by the reaper (guaranteed thread context) to
1676          * disengage the socket from its callbacks and close it.
1677          * ksnc_refcount will eventually hit zero, and then the reaper will
1678          * destroy it.
1679          */
1680         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1681         struct ksock_sched *sched = conn->ksnc_scheduler;
1682         bool failed = false;
1683
1684         LASSERT(conn->ksnc_closing);
1685
1686         /* wake up the scheduler to "send" all remaining packets to /dev/null */
1687         spin_lock_bh(&sched->kss_lock);
1688
1689         /* a closing conn is always ready to tx */
1690         conn->ksnc_tx_ready = 1;
1691
1692         if (!conn->ksnc_tx_scheduled &&
1693             !list_empty(&conn->ksnc_tx_queue)) {
1694                 list_add_tail(&conn->ksnc_tx_list,
1695                               &sched->kss_tx_conns);
1696                 conn->ksnc_tx_scheduled = 1;
1697                 /* extra ref for scheduler */
1698                 ksocknal_conn_addref(conn);
1699
1700                 wake_up (&sched->kss_waitq);
1701         }
1702
1703         spin_unlock_bh(&sched->kss_lock);
1704
1705         /* serialise with callbacks */
1706         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1707
1708         ksocknal_lib_reset_callback(conn->ksnc_sock, conn);
1709
1710         /* OK, so this conn may not be completely disengaged from its
1711          * scheduler yet, but it _has_ committed to terminate...
1712          */
1713         conn->ksnc_scheduler->kss_nconns--;
1714
1715         if (peer_ni->ksnp_error != 0) {
1716                 /* peer_ni's last conn closed in error */
1717                 LASSERT(list_empty(&peer_ni->ksnp_conns));
1718                 failed = true;
1719                 peer_ni->ksnp_error = 0;     /* avoid multiple notifications */
1720         }
1721
1722         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1723
1724         if (failed)
1725                 ksocknal_peer_failed(peer_ni);
1726
1727         /* The socket is closed on the final put; either here, or in
1728          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1729          * when the connection was established, this will close the socket
1730          * immediately, aborting anything buffered in it. Any hung
1731          * zero-copy transmits will therefore complete in finite time.
1732          */
1733         ksocknal_connsock_decref(conn);
1734 }
1735
1736 void
1737 ksocknal_queue_zombie_conn(struct ksock_conn *conn)
1738 {
1739         /* Queue the conn for the reaper to destroy */
1740         LASSERT(refcount_read(&conn->ksnc_conn_refcount) == 0);
1741         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1742
1743         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1744         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1745
1746         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1747 }
1748
1749 void
1750 ksocknal_destroy_conn(struct ksock_conn *conn)
1751 {
1752         time64_t last_rcv;
1753
1754         /* Final coup-de-grace of the reaper */
1755         CDEBUG (D_NET, "connection %p\n", conn);
1756
1757         LASSERT(refcount_read(&conn->ksnc_conn_refcount) == 0);
1758         LASSERT(refcount_read(&conn->ksnc_sock_refcount) == 0);
1759         LASSERT (conn->ksnc_sock == NULL);
1760         LASSERT (conn->ksnc_route == NULL);
1761         LASSERT (!conn->ksnc_tx_scheduled);
1762         LASSERT (!conn->ksnc_rx_scheduled);
1763         LASSERT(list_empty(&conn->ksnc_tx_queue));
1764
1765         /* complete current receive if any */
1766         switch (conn->ksnc_rx_state) {
1767         case SOCKNAL_RX_LNET_PAYLOAD:
1768                 last_rcv = conn->ksnc_rx_deadline -
1769                            ksocknal_timeout();
1770                 CERROR("Completing partial receive from %s[%d], ip %pISp, with error, wanted: %d, left: %d, last alive is %lld secs ago\n",
1771                        libcfs_id2str(conn->ksnc_peer->ksnp_id), conn->ksnc_type,
1772                        &conn->ksnc_peeraddr,
1773                        conn->ksnc_rx_nob_wanted, conn->ksnc_rx_nob_left,
1774                        ktime_get_seconds() - last_rcv);
1775                 if (conn->ksnc_lnet_msg)
1776                         conn->ksnc_lnet_msg->msg_health_status =
1777                                 LNET_MSG_STATUS_REMOTE_ERROR;
1778                 lnet_finalize(conn->ksnc_lnet_msg, -EIO);
1779                 break;
1780         case SOCKNAL_RX_LNET_HEADER:
1781                 if (conn->ksnc_rx_started)
1782                         CERROR("Incomplete receive of lnet header from %s, ip %pISp, with error, protocol: %d.x.\n",
1783                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1784                                &conn->ksnc_peeraddr,
1785                                conn->ksnc_proto->pro_version);
1786                 break;
1787         case SOCKNAL_RX_KSM_HEADER:
1788                 if (conn->ksnc_rx_started)
1789                         CERROR("Incomplete receive of ksock message from %s, ip %pISp, with error, protocol: %d.x.\n",
1790                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1791                                &conn->ksnc_peeraddr,
1792                                conn->ksnc_proto->pro_version);
1793                 break;
1794         case SOCKNAL_RX_SLOP:
1795                 if (conn->ksnc_rx_started)
1796                         CERROR("Incomplete receive of slops from %s, ip %pISp, with error\n",
1797                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1798                                &conn->ksnc_peeraddr);
1799                 break;
1800         default:
1801                 LBUG ();
1802                 break;
1803         }
1804
1805         ksocknal_peer_decref(conn->ksnc_peer);
1806
1807         LIBCFS_FREE (conn, sizeof (*conn));
1808 }
1809
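     /* Close every connection to peer_ni whose peer address matches
      * 'addr' (all of them when addr is NULL).  Caller holds
      * ksnd_global_lock in write mode; returns the number closed. */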
1810 int
1811 ksocknal_close_peer_conns_locked(struct ksock_peer_ni *peer_ni,
1812                                  struct sockaddr *addr, int why)
1813 {
1814         struct ksock_conn *conn;
1815         struct ksock_conn *cnxt;
1816         int count = 0;
1817
1818         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns, ksnc_list) {
1819                 if (!addr ||
1820                     rpc_cmp_addr(addr,
1821                                  (struct sockaddr *)&conn->ksnc_peeraddr)) {
1822                         count++;
1823                         ksocknal_close_conn_locked(conn, why);
1824                 }
1825         }
1826
1827         return count;
1828 }
1829
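     /* Close 'conn' along with any other connections to the same peer
      * address; returns how many connections were closed. */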
1830 int
1831 ksocknal_close_conn_and_siblings(struct ksock_conn *conn, int why)
1832 {
1833         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1834         int count;
1835
1836         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1837
1838         count = ksocknal_close_peer_conns_locked(
1839                 peer_ni, (struct sockaddr *)&conn->ksnc_peeraddr, why);
1840
1841         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1842
1843         return count;
1844 }
1845
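     /* Close connections selected by process id and IP address.  A
      * specific NID limits the scan to a single peer hash bucket;
      * wildcard NID, PID or IP requests always return success. */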
1846 int
1847 ksocknal_close_matching_conns(struct lnet_process_id id, __u32 ipaddr)
1848 {
1849         struct ksock_peer_ni *peer_ni;
1850         struct hlist_node *pnxt;
1851         int lo;
1852         int hi;
1853         int i;
1854         int count = 0;
1855         struct sockaddr_in sa = {.sin_family = AF_INET};
1856
1857         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1858
1859         if (id.nid != LNET_NID_ANY) {
1860                 lo = hash_min(id.nid, HASH_BITS(ksocknal_data.ksnd_peers));
1861                 hi = lo;
1862         } else {
1863                 lo = 0;
1864                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
1865         }
1866
1867         sa.sin_addr.s_addr = htonl(ipaddr);
1868         for (i = lo; i <= hi; i++) {
1869                 hlist_for_each_entry_safe(peer_ni, pnxt,
1870                                           &ksocknal_data.ksnd_peers[i],
1871                                           ksnp_list) {
1872
1873                         if (!((id.nid == LNET_NID_ANY ||
1874                                id.nid == peer_ni->ksnp_id.nid) &&
1875                               (id.pid == LNET_PID_ANY ||
1876                                id.pid == peer_ni->ksnp_id.pid)))
1877                                 continue;
1878
1879                         count += ksocknal_close_peer_conns_locked(
1880                                 peer_ni,
1881                                 ipaddr ? (struct sockaddr *)&sa : NULL, 0);
1882                 }
1883         }
1884
1885         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1886
1887         /* wildcards always succeed */
1888         if (id.nid == LNET_NID_ANY || id.pid == LNET_PID_ANY || ipaddr == 0)
1889                 return 0;
1890
1891         return (count == 0 ? -ENOENT : 0);
1892 }
1893
1894 void
1895 ksocknal_notify_gw_down(lnet_nid_t gw_nid)
1896 {
1897         /* The router is telling me she's been notified of a change in
1898          * gateway state....
1899          */
1900         struct lnet_process_id id = {
1901                 .nid    = gw_nid,
1902                 .pid    = LNET_PID_ANY,
1903         };
1904
1905         CDEBUG(D_NET, "gw %s down\n", libcfs_nid2str(gw_nid));
1906
1907         /* If the gateway crashed, close all open connections... */
1908         ksocknal_close_matching_conns(id, 0);
1909         return;
1910
1911         /* We can only establish new connections
1912          * if we have autoroutes, and these connect on demand. */
1913 }
1914
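     /* Push each of peer_ni's connections in turn.  Every conn is
      * looked up by index under the global read lock with a ref taken,
      * so the lock is not held across ksocknal_lib_push_conn(). */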
1915 static void
1916 ksocknal_push_peer(struct ksock_peer_ni *peer_ni)
1917 {
1918         int index;
1919         int i;
1920         struct list_head *tmp;
1921         struct ksock_conn *conn;
1922
1923         for (index = 0; ; index++) {
1924                 read_lock(&ksocknal_data.ksnd_global_lock);
1925
1926                 i = 0;
1927                 conn = NULL;
1928
1929                 list_for_each(tmp, &peer_ni->ksnp_conns) {
1930                         if (i++ == index) {
1931                                 conn = list_entry(tmp, struct ksock_conn,
1932                                                   ksnc_list);
1933                                 ksocknal_conn_addref(conn);
1934                                 break;
1935                         }
1936                 }
1937
1938                 read_unlock(&ksocknal_data.ksnd_global_lock);
1939
1940                 if (conn == NULL)
1941                         break;
1942
1943                 ksocknal_lib_push_conn (conn);
1944                 ksocknal_conn_decref(conn);
1945         }
1946 }
1947
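     /* Handler for IOC_LIBCFS_PUSH_CONNECTION: push the connections of
      * every peer_ni matching 'id' (wildcards allowed).  Returns
      * -ENOENT if no peer_ni matched. */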
1948 static int
1949 ksocknal_push(struct lnet_ni *ni, struct lnet_process_id id)
1950 {
1951         int lo;
1952         int hi;
1953         int bkt;
1954         int rc = -ENOENT;
1955
1956         if (id.nid != LNET_NID_ANY) {
1957                 lo = hash_min(id.nid, HASH_BITS(ksocknal_data.ksnd_peers));
1958                 hi = lo;
1959         } else {
1960                 lo = 0;
1961                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
1962         }
1963
1964         for (bkt = lo; bkt <= hi; bkt++) {
1965                 int peer_off; /* searching offset in peer_ni hash table */
1966
1967                 for (peer_off = 0; ; peer_off++) {
1968                         struct ksock_peer_ni *peer_ni;
1969                         int           i = 0;
1970
1971                         read_lock(&ksocknal_data.ksnd_global_lock);
1972                         hlist_for_each_entry(peer_ni,
1973                                              &ksocknal_data.ksnd_peers[bkt],
1974                                              ksnp_list) {
1975                                 if (!((id.nid == LNET_NID_ANY ||
1976                                        id.nid == peer_ni->ksnp_id.nid) &&
1977                                       (id.pid == LNET_PID_ANY ||
1978                                        id.pid == peer_ni->ksnp_id.pid)))
1979                                         continue;
1980
1981                                 if (i++ == peer_off) {
1982                                         ksocknal_peer_addref(peer_ni);
1983                                         break;
1984                                 }
1985                         }
1986                         read_unlock(&ksocknal_data.ksnd_global_lock);
1987
1988                         if (i <= peer_off) /* no match */
1989                                 break;
1990
1991                         rc = 0;
1992                         ksocknal_push_peer(peer_ni);
1993                         ksocknal_peer_decref(peer_ni);
1994                 }
1995         }
1996         return rc;
1997 }
1998
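     /* Register an additional local interface (IP and netmask) on this
      * NI.  Duplicates are silently ignored and -ENOSPC is returned
      * once LNET_INTERFACES_NUM entries exist; peers and routes already
      * using the address are counted against the new entry. */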
1999 static int
2000 ksocknal_add_interface(struct lnet_ni *ni, __u32 ipaddress, __u32 netmask)
2001 {
2002         struct ksock_net *net = ni->ni_data;
2003         struct ksock_interface *iface;
2004         struct sockaddr_in sa = { .sin_family = AF_INET };
2005         int rc;
2006         int i;
2007         int j;
2008         struct ksock_peer_ni *peer_ni;
2009         struct list_head *rtmp;
2010         struct ksock_route *route;
2011
2012         if (ipaddress == 0 ||
2013             netmask == 0)
2014                 return -EINVAL;
2015
2016         write_lock_bh(&ksocknal_data.ksnd_global_lock);
2017
2018         sa.sin_addr.s_addr = htonl(ipaddress);
2019         iface = ksocknal_ip2iface(ni, (struct sockaddr *)&sa);
2020         if (iface != NULL) {
2021                 /* silently ignore dups */
2022                 rc = 0;
2023         } else if (net->ksnn_ninterfaces == LNET_INTERFACES_NUM) {
2024                 rc = -ENOSPC;
2025         } else {
2026                 iface = &net->ksnn_interfaces[net->ksnn_ninterfaces++];
2027
2028                 iface->ksni_index = ksocknal_ip2index((struct sockaddr *)&sa,
2029                                                       ni);
2030                 rpc_copy_addr((struct sockaddr *)&iface->ksni_addr,
2031                               (struct sockaddr *)&sa);
2032                 iface->ksni_netmask = netmask;
2033                 iface->ksni_nroutes = 0;
2034                 iface->ksni_npeers = 0;
2035
2036                 hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
2037                         for (j = 0; j < peer_ni->ksnp_n_passive_ips; j++)
2038                                 if (peer_ni->ksnp_passive_ips[j] == ipaddress)
2039                                         iface->ksni_npeers++;
2040
2041                         list_for_each(rtmp, &peer_ni->ksnp_routes) {
2042                                 route = list_entry(rtmp,
2043                                                    struct ksock_route,
2044                                                    ksnr_list);
2045
2046                                 if (route->ksnr_myiface ==
2047                                             iface->ksni_index)
2048                                         iface->ksni_nroutes++;
2049                         }
2050                 }
2051
2052                 rc = 0;
2053                 /* NB only new connections will pay attention to the new
2054                  * interface!
2055                  */
2056         }
2057
2058         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
2059
2060         return rc;
2061 }
2062
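     /* Strip one peer_ni of every reference to a local interface:
      * drop the passive IP, unbind manually created routes (or delete
      * automatic ones) and close connections bound to it.  Caller
      * holds ksnd_global_lock in write mode. */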
2063 static void
2064 ksocknal_peer_del_interface_locked(struct ksock_peer_ni *peer_ni,
2065                                    __u32 ipaddr, int index)
2066 {
2067         struct ksock_route *route;
2068         struct ksock_route *rnxt;
2069         struct ksock_conn *conn;
2070         struct ksock_conn *cnxt;
2071         int i;
2072         int j;
2073
2074         for (i = 0; i < peer_ni->ksnp_n_passive_ips; i++)
2075                 if (peer_ni->ksnp_passive_ips[i] == ipaddr) {
2076                         for (j = i+1; j < peer_ni->ksnp_n_passive_ips; j++)
2077                                 peer_ni->ksnp_passive_ips[j-1] =
2078                                         peer_ni->ksnp_passive_ips[j];
2079                         peer_ni->ksnp_n_passive_ips--;
2080                         break;
2081                 }
2082
2083         list_for_each_entry_safe(route, rnxt, &peer_ni->ksnp_routes,
2084                                  ksnr_list) {
2085                 if (route->ksnr_myiface != index)
2086                         continue;
2087
2088                 if (route->ksnr_share_count != 0) {
2089                         /* Manually created; keep, but unbind */
2090                         route->ksnr_myiface = -1;
2091                 } else {
2092                         ksocknal_del_route_locked(route);
2093                 }
2094         }
2095
2096         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns, ksnc_list)
2097                 if (conn->ksnc_route->ksnr_myiface == index)
2098                         ksocknal_close_conn_locked (conn, 0);
2099 }
2100
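     /* Remove a configured local interface (all of them when
      * ipaddress == 0) from this NI and strip every peer_ni of
      * references to it.  Returns -ENOENT if nothing matched. */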
2101 static int
2102 ksocknal_del_interface(struct lnet_ni *ni, __u32 ipaddress)
2103 {
2104         struct ksock_net *net = ni->ni_data;
2105         int rc = -ENOENT;
2106         struct hlist_node *nxt;
2107         struct ksock_peer_ni *peer_ni;
2108         u32 this_ip;
2109         struct sockaddr_in sa = {.sin_family = AF_INET };
2110         int index;
2111         int i;
2112         int j;
2113
2114         sa.sin_addr.s_addr = htonl(ipaddress);
2115         index = ksocknal_ip2index((struct sockaddr *)&sa, ni);
2116
2117         write_lock_bh(&ksocknal_data.ksnd_global_lock);
2118
2119         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2120                 struct sockaddr_in *sa =
2121                         (void *)&net->ksnn_interfaces[i].ksni_addr;
2122
2123                 if (sa->sin_family != AF_INET)
2124                         continue;
2125                 this_ip = ntohl(sa->sin_addr.s_addr);
2126
2127                 if (!(ipaddress == 0 ||
2128                       ipaddress == this_ip))
2129                         continue;
2130
2131                 rc = 0;
2132
2133                 for (j = i+1; j < net->ksnn_ninterfaces; j++)
2134                         net->ksnn_interfaces[j-1] =
2135                                 net->ksnn_interfaces[j];
2136
2137                 net->ksnn_ninterfaces--;
2138
2139                 hash_for_each_safe(ksocknal_data.ksnd_peers, j,
2140                                    nxt, peer_ni, ksnp_list) {
2141                         if (peer_ni->ksnp_ni != ni)
2142                                 continue;
2143
2144                         ksocknal_peer_del_interface_locked(peer_ni,
2145                                                            this_ip, index);
2146                 }
2147         }
2148
2149         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
2150
2151         return rc;
2152 }
2153
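     /* lnd_ctl handler: dispatches the libcfs ioctls (typically issued
      * through the lctl utility) that query and manage socklnd
      * interfaces, peers and connections. */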
2154 int
2155 ksocknal_ctl(struct lnet_ni *ni, unsigned int cmd, void *arg)
2156 {
2157         struct lnet_process_id id = {0};
2158         struct libcfs_ioctl_data *data = arg;
2159         int rc;
2160
2161         switch (cmd) {
2162         case IOC_LIBCFS_GET_INTERFACE: {
2163                 struct ksock_net *net = ni->ni_data;
2164                 struct ksock_interface *iface;
2165                 struct sockaddr_in *sa;
2166
2167                 read_lock(&ksocknal_data.ksnd_global_lock);
2168
2169                 if (data->ioc_count >= (__u32)net->ksnn_ninterfaces) {
2170                         rc = -ENOENT;
2171                 } else {
2172                         rc = 0;
2173                         iface = &net->ksnn_interfaces[data->ioc_count];
2174
2175                         sa = (void *)&iface->ksni_addr;
2176                         if (sa->sin_family == AF_INET)
2177                                 data->ioc_u32[0] = ntohl(sa->sin_addr.s_addr);
2178                         else
2179                                 data->ioc_u32[0] = 0xFFFFFFFF;
2180                         data->ioc_u32[1] = iface->ksni_netmask;
2181                         data->ioc_u32[2] = iface->ksni_npeers;
2182                         data->ioc_u32[3] = iface->ksni_nroutes;
2183                 }
2184
2185                 read_unlock(&ksocknal_data.ksnd_global_lock);
2186                 return rc;
2187         }
2188
2189         case IOC_LIBCFS_ADD_INTERFACE:
2190                 return ksocknal_add_interface(ni,
2191                                               data->ioc_u32[0], /* IP address */
2192                                               data->ioc_u32[1]); /* net mask */
2193
2194         case IOC_LIBCFS_DEL_INTERFACE:
2195                 return ksocknal_del_interface(ni,
2196                                               data->ioc_u32[0]); /* IP address */
2197
2198         case IOC_LIBCFS_GET_PEER: {
2199                 __u32            myip = 0;
2200                 __u32            ip = 0;
2201                 int              port = 0;
2202                 int              conn_count = 0;
2203                 int              share_count = 0;
2204
2205                 rc = ksocknal_get_peer_info(ni, data->ioc_count,
2206                                             &id, &myip, &ip, &port,
2207                                             &conn_count,  &share_count);
2208                 if (rc != 0)
2209                         return rc;
2210
2211                 data->ioc_nid    = id.nid;
2212                 data->ioc_count  = share_count;
2213                 data->ioc_u32[0] = ip;
2214                 data->ioc_u32[1] = port;
2215                 data->ioc_u32[2] = myip;
2216                 data->ioc_u32[3] = conn_count;
2217                 data->ioc_u32[4] = id.pid;
2218                 return 0;
2219         }
2220
2221         case IOC_LIBCFS_ADD_PEER: {
2222                 struct sockaddr_in sa = {.sin_family = AF_INET};
2223
2224                 id.nid = data->ioc_nid;
2225                 id.pid = LNET_PID_LUSTRE;
2226                 sa.sin_addr.s_addr = htonl(data->ioc_u32[0]);
2227                 sa.sin_port = htons(data->ioc_u32[1]);
2228                 return ksocknal_add_peer(ni, id, (struct sockaddr *)&sa);
2229         }
2230         case IOC_LIBCFS_DEL_PEER:
2231                 id.nid = data->ioc_nid;
2232                 id.pid = LNET_PID_ANY;
2233                 return ksocknal_del_peer (ni, id,
2234                                           data->ioc_u32[0]); /* IP */
2235
2236         case IOC_LIBCFS_GET_CONN: {
2237                 int           txmem;
2238                 int           rxmem;
2239                 int           nagle;
2240                 struct ksock_conn *conn = ksocknal_get_conn_by_idx(ni, data->ioc_count);
2241                 struct sockaddr_in *psa;
2242                 struct sockaddr_in *mysa;
2243
2244                 if (conn == NULL)
2245                         return -ENOENT;
2246                 psa = (void *)&conn->ksnc_peeraddr;
2247                 mysa = (void *)&conn->ksnc_myaddr;
2248                 ksocknal_lib_get_conn_tunables(conn, &txmem, &rxmem, &nagle);
2249                 data->ioc_count  = txmem;
2250                 data->ioc_nid    = conn->ksnc_peer->ksnp_id.nid;
2251                 data->ioc_flags  = nagle;
2252                 if (psa->sin_family == AF_INET)
2253                         data->ioc_u32[0] = ntohl(psa->sin_addr.s_addr);
2254                 else
2255                         data->ioc_u32[0] = 0xFFFFFFFF;
2256                 data->ioc_u32[1] = rpc_get_port((struct sockaddr *)
2257                                                 &conn->ksnc_peeraddr);
2258                 if (mysa->sin_family == AF_INET)
2259                         data->ioc_u32[2] = ntohl(mysa->sin_addr.s_addr);
2260                 else
2261                         data->ioc_u32[2] = 0xFFFFFFFF;
2262                 data->ioc_u32[3] = conn->ksnc_type;
2263                 data->ioc_u32[4] = conn->ksnc_scheduler->kss_cpt;
2264                 data->ioc_u32[5] = rxmem;
2265                 data->ioc_u32[6] = conn->ksnc_peer->ksnp_id.pid;
2266                 ksocknal_conn_decref(conn);
2267                 return 0;
2268         }
2269
2270         case IOC_LIBCFS_CLOSE_CONNECTION:
2271                 id.nid = data->ioc_nid;
2272                 id.pid = LNET_PID_ANY;
2273                 return ksocknal_close_matching_conns (id,
2274                                                       data->ioc_u32[0]);
2275
2276         case IOC_LIBCFS_REGISTER_MYNID:
2277                 /* Ignore if this is a noop */
2278                 if (data->ioc_nid == ni->ni_nid)
2279                         return 0;
2280
2281                 CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
2282                        libcfs_nid2str(data->ioc_nid),
2283                        libcfs_nid2str(ni->ni_nid));
2284                 return -EINVAL;
2285
2286         case IOC_LIBCFS_PUSH_CONNECTION:
2287                 id.nid = data->ioc_nid;
2288                 id.pid = LNET_PID_ANY;
2289                 return ksocknal_push(ni, id);
2290
2291         default:
2292                 return -EINVAL;
2293         }
2294         /* not reached */
2295 }
2296
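     /* Called from ksocknal_base_shutdown() once all threads have
      * exited: frees the per-CPT scheduler blocks and any cached
      * noop TXs left on the idle list. */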
2297 static void
2298 ksocknal_free_buffers (void)
2299 {
2300         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_txs) == 0);
2301
2302         if (ksocknal_data.ksnd_schedulers != NULL)
2303                 cfs_percpt_free(ksocknal_data.ksnd_schedulers);
2304
2305         spin_lock(&ksocknal_data.ksnd_tx_lock);
2306
2307         if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
2308                 LIST_HEAD(zlist);
2309                 struct ksock_tx *tx;
2310
2311                 list_splice_init(&ksocknal_data.ksnd_idle_noop_txs, &zlist);
2312                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
2313
2314                 while (!list_empty(&zlist)) {
2315                         tx = list_entry(zlist.next, struct ksock_tx, tx_list);
2316                         list_del(&tx->tx_list);
2317                         LIBCFS_FREE(tx, tx->tx_desc_size);
2318                 }
2319         } else {
2320                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
2321         }
2322 }
2323
2324 static void
2325 ksocknal_base_shutdown(void)
2326 {
2327         struct ksock_sched *sched;
2328         struct ksock_peer_ni *peer_ni;
2329         int i;
2330
2331         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %lld\n",
2332                libcfs_kmem_read());
2333         LASSERT (ksocknal_data.ksnd_nnets == 0);
2334
2335         switch (ksocknal_data.ksnd_init) {
2336         default:
2337                 LASSERT(0);
2338                 /* fallthrough */
2339
2340         case SOCKNAL_INIT_ALL:
2341         case SOCKNAL_INIT_DATA:
2342                 hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list)
2343                         LASSERT(0);
2344
2345                 LASSERT(list_empty(&ksocknal_data.ksnd_nets));
2346                 LASSERT(list_empty(&ksocknal_data.ksnd_enomem_conns));
2347                 LASSERT(list_empty(&ksocknal_data.ksnd_zombie_conns));
2348                 LASSERT(list_empty(&ksocknal_data.ksnd_connd_connreqs));
2349                 LASSERT(list_empty(&ksocknal_data.ksnd_connd_routes));
2350
2351                 if (ksocknal_data.ksnd_schedulers != NULL) {
2352                         cfs_percpt_for_each(sched, i,
2353                                             ksocknal_data.ksnd_schedulers) {
2354
2355                                 LASSERT(list_empty(&sched->kss_tx_conns));
2356                                 LASSERT(list_empty(&sched->kss_rx_conns));
2357                                 LASSERT(list_empty(&sched->kss_zombie_noop_txs));
2358                                 LASSERT(sched->kss_nconns == 0);
2359                         }
2360                 }
2361
2362                 /* flag threads to terminate; wake and wait for them to die */
2363                 ksocknal_data.ksnd_shuttingdown = 1;
2364                 wake_up_all(&ksocknal_data.ksnd_connd_waitq);
2365                 wake_up_all(&ksocknal_data.ksnd_reaper_waitq);
2366
2367                 if (ksocknal_data.ksnd_schedulers != NULL) {
2368                         cfs_percpt_for_each(sched, i,
2369                                             ksocknal_data.ksnd_schedulers)
2370                                         wake_up_all(&sched->kss_waitq);
2371                 }
2372
2373                 wait_var_event_warning(&ksocknal_data.ksnd_nthreads,
2374                                        atomic_read(&ksocknal_data.ksnd_nthreads) == 0,
2375                                        "waiting for %d threads to terminate\n",
2376                                        atomic_read(&ksocknal_data.ksnd_nthreads));
2377
2378                 ksocknal_free_buffers();
2379
2380                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
2381                 break;
2382         }
2383
2384         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %lld\n",
2385                libcfs_kmem_read());
2386
2387         module_put(THIS_MODULE);
2388 }
2389
2390 static int
2391 ksocknal_base_startup(void)
2392 {
2393         struct ksock_sched *sched;
2394         int rc;
2395         int i;
2396
2397         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
2398         LASSERT(ksocknal_data.ksnd_nnets == 0);
2399
2400         memset(&ksocknal_data, 0, sizeof(ksocknal_data)); /* zero pointers */
2401
2402         hash_init(ksocknal_data.ksnd_peers);
2403
2404         rwlock_init(&ksocknal_data.ksnd_global_lock);
2405         INIT_LIST_HEAD(&ksocknal_data.ksnd_nets);
2406
2407         spin_lock_init(&ksocknal_data.ksnd_reaper_lock);
2408         INIT_LIST_HEAD(&ksocknal_data.ksnd_enomem_conns);
2409         INIT_LIST_HEAD(&ksocknal_data.ksnd_zombie_conns);
2410         INIT_LIST_HEAD(&ksocknal_data.ksnd_deathrow_conns);
2411         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
2412
2413         spin_lock_init(&ksocknal_data.ksnd_connd_lock);
2414         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_connreqs);
2415         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_routes);
2416         init_waitqueue_head(&ksocknal_data.ksnd_connd_waitq);
2417
2418         spin_lock_init(&ksocknal_data.ksnd_tx_lock);
2419         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_noop_txs);
2420
2421         /* NB the memset above zeros the whole of ksocknal_data */
2422
2423         /* flag lists/ptrs/locks initialised */
2424         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2425         if (!try_module_get(THIS_MODULE))
2426                 goto failed;
2427
2428         /* Create a scheduler block per available CPT */
2429         ksocknal_data.ksnd_schedulers = cfs_percpt_alloc(lnet_cpt_table(),
2430                                                          sizeof(*sched));
2431         if (ksocknal_data.ksnd_schedulers == NULL)
2432                 goto failed;
2433
2434         cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
2435                 int nthrs;
2436
2437                 /*
2438                  * make sure not to allocate more threads than there are
2439                  * cores/CPUs in the CPT
2440                  */
2441                 nthrs = cfs_cpt_weight(lnet_cpt_table(), i);
2442                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2443                         nthrs = min(nthrs, *ksocknal_tunables.ksnd_nscheds);
2444                 } else {
2445                         /*
2446                          * max to half of CPUs, assume another half should be
2447                          * reserved for upper layer modules
2448                          */
2449                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2450                 }
2451
2452                 sched->kss_nthreads_max = nthrs;
2453                 sched->kss_cpt = i;
2454
2455                 spin_lock_init(&sched->kss_lock);
2456                 INIT_LIST_HEAD(&sched->kss_rx_conns);
2457                 INIT_LIST_HEAD(&sched->kss_tx_conns);
2458                 INIT_LIST_HEAD(&sched->kss_zombie_noop_txs);
2459                 init_waitqueue_head(&sched->kss_waitq);
2460         }
2461
2462         ksocknal_data.ksnd_connd_starting         = 0;
2463         ksocknal_data.ksnd_connd_failed_stamp     = 0;
2464         ksocknal_data.ksnd_connd_starting_stamp   = ktime_get_real_seconds();
2465         /* must have at least 2 connds to remain responsive to accepts while
2466          * connecting */
2467         if (*ksocknal_tunables.ksnd_nconnds < SOCKNAL_CONND_RESV + 1)
2468                 *ksocknal_tunables.ksnd_nconnds = SOCKNAL_CONND_RESV + 1;
2469
2470         if (*ksocknal_tunables.ksnd_nconnds_max <
2471             *ksocknal_tunables.ksnd_nconnds) {
2472                 ksocknal_tunables.ksnd_nconnds_max =
2473                         ksocknal_tunables.ksnd_nconnds;
2474         }
2475
2476         for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
2477                 char name[16];
2478
2479                 spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2480                 ksocknal_data.ksnd_connd_starting++;
2481                 spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2482
2483                 snprintf(name, sizeof(name), "socknal_cd%02d", i);
2484                 rc = ksocknal_thread_start(ksocknal_connd,
2485                                            (void *)((uintptr_t)i), name);
2486                 if (rc != 0) {
2487                         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2488                         ksocknal_data.ksnd_connd_starting--;
2489                         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2490                         CERROR("Can't spawn socknal connd: %d\n", rc);
2491                         goto failed;
2492                 }
2493         }
2494
2495         rc = ksocknal_thread_start(ksocknal_reaper, NULL, "socknal_reaper");
2496         if (rc != 0) {
2497                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
2498                 goto failed;
2499         }
2500
2501         /* flag everything initialised */
2502         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
2503
2504         return 0;
2505
2506  failed:
2507         ksocknal_base_shutdown();
2508         return -ENETDOWN;
2509 }
2510
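     /* Log any peer_ni still hashed for this NI at shutdown, together
      * with its routes and conns.  Always returns 0 so the call can be
      * folded into the wait condition in ksocknal_shutdown(). */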
2511 static int
2512 ksocknal_debug_peerhash(struct lnet_ni *ni)
2513 {
2514         struct ksock_peer_ni *peer_ni;
2515         int i;
2516
2517         read_lock(&ksocknal_data.ksnd_global_lock);
2518
2519         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
2520                 struct ksock_route *route;
2521                 struct ksock_conn *conn;
2522
2523                 if (peer_ni->ksnp_ni != ni)
2524                         continue;
2525
2526                 CWARN("Active peer_ni on shutdown: %s, ref %d, "
2527                       "closing %d, accepting %d, err %d, zcookie %llu, "
2528                       "txq %d, zc_req %d\n", libcfs_id2str(peer_ni->ksnp_id),
2529                       refcount_read(&peer_ni->ksnp_refcount),
2530                       peer_ni->ksnp_closing,
2531                       peer_ni->ksnp_accepting, peer_ni->ksnp_error,
2532                       peer_ni->ksnp_zc_next_cookie,
2533                       !list_empty(&peer_ni->ksnp_tx_queue),
2534                       !list_empty(&peer_ni->ksnp_zc_req_list));
2535
2536                 list_for_each_entry(route, &peer_ni->ksnp_routes, ksnr_list) {
2537                         CWARN("Route: ref %d, schd %d, conn %d, cnted %d, del %d\n",
2538                               refcount_read(&route->ksnr_refcount),
2539                               route->ksnr_scheduled, route->ksnr_connecting,
2540                               route->ksnr_connected, route->ksnr_deleted);
2541                 }
2542
2543                 list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
2544                         CWARN("Conn: ref %d, sref %d, t %d, c %d\n",
2545                               refcount_read(&conn->ksnc_conn_refcount),
2546                               refcount_read(&conn->ksnc_sock_refcount),
2547                               conn->ksnc_type, conn->ksnc_closing);
2548                 }
2549                 break;
2550         }
2551
2552         read_unlock(&ksocknal_data.ksnd_global_lock);
2553         return 0;
2554 }
2555
2556 void
2557 ksocknal_shutdown(struct lnet_ni *ni)
2558 {
2559         struct ksock_net *net = ni->ni_data;
2560         struct lnet_process_id anyid = {
2561                 .nid = LNET_NID_ANY,
2562                 .pid = LNET_PID_ANY,
2563         };
2564         int i;
2565
2566         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL);
2567         LASSERT(ksocknal_data.ksnd_nnets > 0);
2568
2569         /* prevent new peers */
2570         atomic_add(SOCKNAL_SHUTDOWN_BIAS, &net->ksnn_npeers);
2571
2572         /* Delete all peers */
2573         ksocknal_del_peer(ni, anyid, 0);
2574
2575         /* Wait for all peer_ni state to clean up */
2576         wait_var_event_warning(&net->ksnn_npeers,
2577                                atomic_read(&net->ksnn_npeers) ==
2578                                SOCKNAL_SHUTDOWN_BIAS,
2579                                "waiting for %d peers to disconnect\n",
2580                                ksocknal_debug_peerhash(ni) +
2581                                atomic_read(&net->ksnn_npeers) -
2582                                SOCKNAL_SHUTDOWN_BIAS);
2583
2584         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2585                 LASSERT(net->ksnn_interfaces[i].ksni_npeers == 0);
2586                 LASSERT(net->ksnn_interfaces[i].ksni_nroutes == 0);
2587         }
2588
2589         list_del(&net->ksnn_list);
2590         LIBCFS_FREE(net, sizeof(*net));
2591
2592         ksocknal_data.ksnd_nnets--;
2593         if (ksocknal_data.ksnd_nnets == 0)
2594                 ksocknal_base_shutdown();
2595 }
2596
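     /* Count the interfaces in 'net' whose base device name (with any
      * ':' alias suffix stripped) is not already used by a net on
      * ksnd_nets; used to decide whether more scheduler threads are
      * needed. */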
2597 static int
2598 ksocknal_search_new_ipif(struct ksock_net *net)
2599 {
2600         int new_ipif = 0;
2601         int i;
2602
2603         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2604                 char *ifnam = &net->ksnn_interfaces[i].ksni_name[0];
2605                 char *colon = strchr(ifnam, ':');
2606                 bool found  = false;
2607                 struct ksock_net *tmp;
2608                 int j;
2609
2610                 if (colon != NULL) /* ignore alias device */
2611                         *colon = 0;
2612
2613                 list_for_each_entry(tmp, &ksocknal_data.ksnd_nets,
2614                                     ksnn_list) {
2615                         for (j = 0; !found && j < tmp->ksnn_ninterfaces; j++) {
2616                                 char *ifnam2 =
2617                                         &tmp->ksnn_interfaces[j].ksni_name[0];
2618                                 char *colon2 = strchr(ifnam2, ':');
2619
2620                                 if (colon2 != NULL)
2621                                         *colon2 = 0;
2622
2623                                 found = strcmp(ifnam, ifnam2) == 0;
2624                                 if (colon2 != NULL)
2625                                         *colon2 = ':';
2626                         }
2627                         if (found)
2628                                 break;
2629                 }
2630
2631                 new_ipif += !found;
2632                 if (colon != NULL)
2633                         *colon = ':';
2634         }
2635
2636         return new_ipif;
2637 }
2638
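     /* Start scheduler threads for one per-CPT scheduler block: the
      * full complement on first use, or at most two extra threads when
      * a new interface appears later.  kss_nthreads is advanced by the
      * number of threads actually started. */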
2639 static int
2640 ksocknal_start_schedulers(struct ksock_sched *sched)
2641 {
2642         int     nthrs;
2643         int     rc = 0;
2644         int     i;
2645
2646         if (sched->kss_nthreads == 0) {
2647                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2648                         nthrs = sched->kss_nthreads_max;
2649                 } else {
2650                         nthrs = cfs_cpt_weight(lnet_cpt_table(),
2651                                                sched->kss_cpt);
2652                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2653                         nthrs = min(SOCKNAL_NSCHEDS_HIGH, nthrs);
2654                 }
2655                 nthrs = min(nthrs, sched->kss_nthreads_max);
2656         } else {
2657                 LASSERT(sched->kss_nthreads <= sched->kss_nthreads_max);
2658                 /* add up to two threads if there is a new interface */
2659                 nthrs = min(2, sched->kss_nthreads_max - sched->kss_nthreads);
2660         }
2661
2662         for (i = 0; i < nthrs; i++) {
2663                 long id;
2664                 char name[20];
2665
2666                 id = KSOCK_THREAD_ID(sched->kss_cpt, sched->kss_nthreads + i);
2667                 snprintf(name, sizeof(name), "socknal_sd%02d_%02d",
2668                          sched->kss_cpt, (int)KSOCK_THREAD_SID(id));
2669
2670                 rc = ksocknal_thread_start(ksocknal_scheduler,
2671                                            (void *)id, name);
2672                 if (rc == 0)
2673                         continue;
2674
2675                 CERROR("Can't spawn thread %d for scheduler[%d]: %d\n",
2676                        sched->kss_cpt, (int) KSOCK_THREAD_SID(id), rc);
2677                 break;
2678         }
2679
2680         sched->kss_nthreads += i;
2681         return rc;
2682 }
2683
2684 static int
2685 ksocknal_net_start_threads(struct ksock_net *net, __u32 *cpts, int ncpts)
2686 {
2687         int newif = ksocknal_search_new_ipif(net);
2688         int rc;
2689         int i;
2690
2691         if (ncpts > 0 && ncpts > cfs_cpt_number(lnet_cpt_table()))
2692                 return -EINVAL;
2693
2694         for (i = 0; i < ncpts; i++) {
2695                 struct ksock_sched *sched;
2696                 int cpt = (cpts == NULL) ? i : cpts[i];
2697
2698                 LASSERT(cpt < cfs_cpt_number(lnet_cpt_table()));
2699                 sched = ksocknal_data.ksnd_schedulers[cpt];
2700
2701                 if (!newif && sched->kss_nthreads > 0)
2702                         continue;
2703
2704                 rc = ksocknal_start_schedulers(sched);
2705                 if (rc != 0)
2706                         return rc;
2707         }
2708         return 0;
2709 }
2710
2711 int
2712 ksocknal_startup(struct lnet_ni *ni)
2713 {
2714         struct ksock_net *net;
2715         struct lnet_ioctl_config_lnd_cmn_tunables *net_tunables;
2716         struct ksock_interface *ksi = NULL;
2717         struct lnet_inetdev *ifaces = NULL;
2718         int i = 0;
2719         int rc;
2720
2721         LASSERT (ni->ni_net->net_lnd == &the_ksocklnd);
2722
2723         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING) {
2724                 rc = ksocknal_base_startup();
2725                 if (rc != 0)
2726                         return rc;
2727         }
2728
2729         LIBCFS_ALLOC(net, sizeof(*net));
2730         if (net == NULL)
2731                 goto fail_0;
2732
2733         net->ksnn_incarnation = ktime_get_real_ns();
2734         ni->ni_data = net;
2735         net_tunables = &ni->ni_net->net_tunables;
2736
2737         if (net_tunables->lct_peer_timeout == -1)
2738                 net_tunables->lct_peer_timeout =
2739                         *ksocknal_tunables.ksnd_peertimeout;
2740
2741         if (net_tunables->lct_max_tx_credits == -1)
2742                 net_tunables->lct_max_tx_credits =
2743                         *ksocknal_tunables.ksnd_credits;
2744
2745         if (net_tunables->lct_peer_tx_credits == -1)
2746                 net_tunables->lct_peer_tx_credits =
2747                         *ksocknal_tunables.ksnd_peertxcredits;
2748
2749         if (net_tunables->lct_peer_tx_credits >
2750             net_tunables->lct_max_tx_credits)
2751                 net_tunables->lct_peer_tx_credits =
2752                         net_tunables->lct_max_tx_credits;
2753
2754         if (net_tunables->lct_peer_rtr_credits == -1)
2755                 net_tunables->lct_peer_rtr_credits =
2756                         *ksocknal_tunables.ksnd_peerrtrcredits;
2757
2758         rc = lnet_inet_enumerate(&ifaces, ni->ni_net_ns);
2759         if (rc < 0)
2760                 goto fail_1;
2761
2762         if (!ni->ni_interfaces[0]) {
2763                 struct sockaddr_in *sa;
2764
2765                 ksi = &net->ksnn_interfaces[0];
2766                 sa = (void *)&ksi->ksni_addr;
2767
2768                 /* Use the first discovered interface */
2769                 net->ksnn_ninterfaces = 1;
2770                 ni->ni_dev_cpt = ifaces[0].li_cpt;
2771                 memset(sa, 0, sizeof(*sa));
2772                 sa->sin_family = AF_INET;
2773                 sa->sin_addr.s_addr = htonl(ifaces[0].li_ipaddr);
2774                 ksi->ksni_index = ksocknal_ip2index((struct sockaddr *)sa, ni);
2775                 ksi->ksni_netmask = ifaces[0].li_netmask;
2776                 strlcpy(ksi->ksni_name, ifaces[0].li_name,
2777                         sizeof(ksi->ksni_name));
2778         } else {
2779                 /* Before Multi-Rail, ksocklnd would manage
2780                  * multiple interfaces with its own TCP bonding.
2781                  * If we encounter an old configuration using
2782                  * this TCP bonding approach then we need to
2783                  * handle more than one entry in ni_interfaces.
2784                  *
2785                  * In a Multi-Rail configuration only ONE entry in
2786                  * ni_interfaces should exist; each IP alias should be
2787                  * mapped to its own 'struct lnet_ni'.
2788                  */
2789                 for (i = 0; i < LNET_INTERFACES_NUM; i++) {
2790                         int j;
2791
2792                         if (!ni->ni_interfaces[i])
2793                                 break;
2794
2795                         for (j = 0; j < LNET_INTERFACES_NUM;  j++) {
2796                                 if (i != j && ni->ni_interfaces[j] &&
2797                                     strcmp(ni->ni_interfaces[i],
2798                                            ni->ni_interfaces[j]) == 0) {
2799                                         rc = -EEXIST;
2800                                         CERROR("ksocklnd: found duplicate %s at %d and %d, rc = %d\n",
2801                                                ni->ni_interfaces[i], i, j, rc);
2802                                         goto fail_1;
2803                                 }
2804                         }
2805
                        for (j = 0; j < rc; j++) {
                                struct sockaddr_in *sa;

                                if (strcmp(ifaces[j].li_name,
                                           ni->ni_interfaces[i]) != 0)
                                        continue;

                                ksi =
                                  &net->ksnn_interfaces[net->ksnn_ninterfaces];
                                sa = (void *)&ksi->ksni_addr;
                                ni->ni_dev_cpt = ifaces[j].li_cpt;
                                memset(sa, 0, sizeof(*sa));
                                sa->sin_family = AF_INET;
                                sa->sin_addr.s_addr =
                                        htonl(ifaces[j].li_ipaddr);
                                ksi->ksni_index = ksocknal_ip2index(
                                        (struct sockaddr *)sa, ni);
                                ksi->ksni_netmask = ifaces[j].li_netmask;
                                strlcpy(ksi->ksni_name, ifaces[j].li_name,
                                        sizeof(ksi->ksni_name));
                                net->ksnn_ninterfaces++;
                                break;
                        }
                }
                /* not every requested ni_interface mapped to a discovered
                 * network interface
                 */
                if (!ksi || net->ksnn_ninterfaces != i) {
                        CERROR("ksocklnd: requested %d but only %d interfaces found\n",
                               i, net->ksnn_ninterfaces);
                        goto fail_1;
                }
        }

        /* start the threads before adding net to ksocknal_data.ksnd_nets */
        rc = ksocknal_net_start_threads(net, ni->ni_cpts, ni->ni_ncpts);
        if (rc != 0)
                goto fail_1;

        LASSERT(ksi);
        LASSERT(ksi->ksni_addr.ss_family == AF_INET);
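        /* The NI's NID keeps the network part it was configured with and
         * takes its address part from the host-order IPv4 address of the
         * first interface.
         */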
        ni->ni_nid = LNET_MKNID(
                LNET_NIDNET(ni->ni_nid),
                ntohl(((struct sockaddr_in *)
                       &ksi->ksni_addr)->sin_addr.s_addr));
        list_add(&net->ksnn_list, &ksocknal_data.ksnd_nets);

        ksocknal_data.ksnd_nnets++;

        return 0;

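/* Error cleanup: free the per-net state; if no other socklnd net is active,
 * tear the module-wide state back down as well.
 */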
fail_1:
        LIBCFS_FREE(net, sizeof(*net));
fail_0:
        if (ksocknal_data.ksnd_nnets == 0)
                ksocknal_base_shutdown();

        return -ENETDOWN;
}

static void __exit ksocklnd_exit(void)
{
        lnet_unregister_lnd(&the_ksocklnd);
}

static const struct lnet_lnd the_ksocklnd = {
        .lnd_type               = SOCKLND,
        .lnd_startup            = ksocknal_startup,
        .lnd_shutdown           = ksocknal_shutdown,
        .lnd_ctl                = ksocknal_ctl,
        .lnd_send               = ksocknal_send,
        .lnd_recv               = ksocknal_recv,
        .lnd_notify_peer_down   = ksocknal_notify_gw_down,
        .lnd_accept             = ksocknal_accept,
};

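/* Module entry points: initialise the tunables and register this LND with
 * LNet on load, unregister it on unload.  As an illustrative note, loading
 * the module (e.g. "modprobe ksocklnd", normally triggered when an LNet
 * "tcp" network is configured) runs ksocklnd_init(), and unloading it runs
 * ksocklnd_exit().
 */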
static int __init ksocklnd_init(void)
{
        int rc;

        /* check that the ksnr_connected/ksnr_connecting fields are wide
         * enough for all connection types, and that SOCKLND_CONN_ACK still
         * aliases SOCKLND_CONN_BULK_IN
         */
        BUILD_BUG_ON(SOCKLND_CONN_NTYPES > 4);
        BUILD_BUG_ON(SOCKLND_CONN_ACK != SOCKLND_CONN_BULK_IN);

        rc = ksocknal_tunables_init();
        if (rc != 0)
                return rc;

        lnet_register_lnd(&the_ksocklnd);

        return 0;
}

MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("TCP Socket LNet Network Driver");
MODULE_VERSION("2.8.0");
MODULE_LICENSE("GPL");

module_init(ksocklnd_init);
module_exit(ksocklnd_exit);