Whamcloud - gitweb
LU-10391 socklnd: support IPv6 in ksocknal_ip2index()
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lnet/klnds/socklnd/socklnd.c
32  *
33  * Author: Zach Brown <zab@zabbo.net>
34  * Author: Peter J. Braam <braam@clusterfs.com>
35  * Author: Phil Schwan <phil@clusterfs.com>
36  * Author: Eric Barton <eric@bartonsoftware.com>
37  */
38
39 #include <linux/ethtool.h>
40 #include <linux/inetdevice.h>
41 #include "socklnd.h"
42 #include <linux/sunrpc/addr.h>
43 #include <net/addrconf.h>
44
45 static const struct lnet_lnd the_ksocklnd;
46 struct ksock_nal_data ksocknal_data;
47
48 static struct ksock_interface *
49 ksocknal_ip2iface(struct lnet_ni *ni, struct sockaddr *addr)
50 {
51         struct ksock_net *net = ni->ni_data;
52         struct ksock_interface *iface;
53
54         iface = &net->ksnn_interface;
55
56         if (rpc_cmp_addr((struct sockaddr *)&iface->ksni_addr, addr))
57                 return iface;
58
59         return NULL;
60 }
61
62 static struct ksock_interface *
63 ksocknal_index2iface(struct lnet_ni *ni, int index)
64 {
65         struct ksock_net *net = ni->ni_data;
66         struct ksock_interface *iface;
67
68         iface = &net->ksnn_interface;
69
70         if (iface->ksni_index == index)
71                 return iface;
72
73         return NULL;
74 }
75
76 static int ksocknal_ip2index(struct sockaddr *addr, struct lnet_ni *ni)
77 {
78         struct net_device *dev;
79         int ret = -1;
80         DECLARE_CONST_IN_IFADDR(ifa);
81
82         if (addr->sa_family != AF_INET && addr->sa_family != AF_INET6)
83                 return ret;
84
85         rcu_read_lock();
86         for_each_netdev(ni->ni_net_ns, dev) {
87                 int flags = dev_get_flags(dev);
88                 struct in_device *in_dev;
89
90                 if (flags & IFF_LOOPBACK) /* skip the loopback IF */
91                         continue;
92
93                 if (!(flags & IFF_UP))
94                         continue;
95
96                 switch (addr->sa_family) {
97                 case AF_INET:
98                         in_dev = __in_dev_get_rcu(dev);
99                         if (!in_dev)
100                                 continue;
101
102                         in_dev_for_each_ifa_rcu(ifa, in_dev) {
103                                 if (ifa->ifa_local ==
104                                     ((struct sockaddr_in *)addr)->sin_addr.s_addr)
105                                         ret = dev->ifindex;
106                         }
107                         endfor_ifa(in_dev);
108                         break;
109 #if IS_ENABLED(CONFIG_IPV6)
110                 case AF_INET6: {
111                         struct inet6_dev *in6_dev;
112                         const struct inet6_ifaddr *ifa6;
113                         struct sockaddr_in6 *addr6 = (struct sockaddr_in6*)addr;
114
115                         in6_dev = __in6_dev_get(dev);
116                         if (!in6_dev)
117                                 continue;
118
119                         list_for_each_entry_rcu(ifa6, &in6_dev->addr_list, if_list) {
120                                 if (ipv6_addr_cmp(&ifa6->addr,
121                                                  &addr6->sin6_addr) == 0)
122                                         ret = dev->ifindex;
123                         }
124                         break;
125                         }
126 #endif /* IS_ENABLED(CONFIG_IPV6) */
127                 }
128                 if (ret >= 0)
129                         break;
130         }
131         rcu_read_unlock();
132
133         return ret;
134 }
135
136 static struct ksock_conn_cb *
137 ksocknal_create_conn_cb(struct sockaddr *addr)
138 {
139         struct ksock_conn_cb *conn_cb;
140
141         LIBCFS_ALLOC(conn_cb, sizeof(*conn_cb));
142         if (!conn_cb)
143                 return NULL;
144
145         refcount_set(&conn_cb->ksnr_refcount, 1);
146         conn_cb->ksnr_peer = NULL;
147         conn_cb->ksnr_retry_interval = 0;         /* OK to connect at any time */
148         rpc_copy_addr((struct sockaddr *)&conn_cb->ksnr_addr, addr);
149         rpc_set_port((struct sockaddr *)&conn_cb->ksnr_addr,
150                      rpc_get_port(addr));
151         conn_cb->ksnr_myiface = -1;
152         conn_cb->ksnr_scheduled = 0;
153         conn_cb->ksnr_connecting = 0;
154         conn_cb->ksnr_connected = 0;
155         conn_cb->ksnr_deleted = 0;
156         conn_cb->ksnr_conn_count = 0;
157         conn_cb->ksnr_ctrl_conn_count = 0;
158         conn_cb->ksnr_blki_conn_count = 0;
159         conn_cb->ksnr_blko_conn_count = 0;
160         conn_cb->ksnr_max_conns = 0;
161
162         return conn_cb;
163 }
164
165 void
166 ksocknal_destroy_conn_cb(struct ksock_conn_cb *conn_cb)
167 {
168         LASSERT(refcount_read(&conn_cb->ksnr_refcount) == 0);
169
170         if (conn_cb->ksnr_peer)
171                 ksocknal_peer_decref(conn_cb->ksnr_peer);
172
173         LIBCFS_FREE(conn_cb, sizeof(*conn_cb));
174 }
175
176 static struct ksock_peer_ni *
177 ksocknal_create_peer(struct lnet_ni *ni, struct lnet_processid *id)
178 {
179         int cpt = lnet_nid2cpt(&id->nid, ni);
180         struct ksock_net *net = ni->ni_data;
181         struct ksock_peer_ni *peer_ni;
182
183         LASSERT(!LNET_NID_IS_ANY(&id->nid));
184         LASSERT(id->pid != LNET_PID_ANY);
185         LASSERT(!in_interrupt());
186
187         if (!atomic_inc_unless_negative(&net->ksnn_npeers)) {
188                 CERROR("Can't create peer_ni: network shutdown\n");
189                 return ERR_PTR(-ESHUTDOWN);
190         }
191
192         LIBCFS_CPT_ALLOC(peer_ni, lnet_cpt_table(), cpt, sizeof(*peer_ni));
193         if (!peer_ni) {
194                 atomic_dec(&net->ksnn_npeers);
195                 return ERR_PTR(-ENOMEM);
196         }
197
198         peer_ni->ksnp_ni = ni;
199         peer_ni->ksnp_id = *id;
200         refcount_set(&peer_ni->ksnp_refcount, 1); /* 1 ref for caller */
201         peer_ni->ksnp_closing = 0;
202         peer_ni->ksnp_accepting = 0;
203         peer_ni->ksnp_proto = NULL;
204         peer_ni->ksnp_last_alive = 0;
205         peer_ni->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;
206         peer_ni->ksnp_conn_cb = NULL;
207
208         INIT_LIST_HEAD(&peer_ni->ksnp_conns);
209         INIT_LIST_HEAD(&peer_ni->ksnp_tx_queue);
210         INIT_LIST_HEAD(&peer_ni->ksnp_zc_req_list);
211         spin_lock_init(&peer_ni->ksnp_lock);
212
213         return peer_ni;
214 }
215
216 void
217 ksocknal_destroy_peer(struct ksock_peer_ni *peer_ni)
218 {
219         struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
220
221         CDEBUG (D_NET, "peer_ni %s %p deleted\n",
222                 libcfs_idstr(&peer_ni->ksnp_id), peer_ni);
223
224         LASSERT(refcount_read(&peer_ni->ksnp_refcount) == 0);
225         LASSERT(peer_ni->ksnp_accepting == 0);
226         LASSERT(list_empty(&peer_ni->ksnp_conns));
227         LASSERT(peer_ni->ksnp_conn_cb == NULL);
228         LASSERT(list_empty(&peer_ni->ksnp_tx_queue));
229         LASSERT(list_empty(&peer_ni->ksnp_zc_req_list));
230
231         LIBCFS_FREE(peer_ni, sizeof(*peer_ni));
232
233         /* NB a peer_ni's connections and conn_cb keep a reference on their
234          * peer_ni until they are destroyed, so we can be assured that _all_
235          * state to do with this peer_ni has been cleaned up when its refcount
236          * drops to zero.
237          */
238         if (atomic_dec_and_test(&net->ksnn_npeers))
239                 wake_up_var(&net->ksnn_npeers);
240 }
241
242 struct ksock_peer_ni *
243 ksocknal_find_peer_locked(struct lnet_ni *ni, struct lnet_processid *id)
244 {
245         struct ksock_peer_ni *peer_ni;
246         unsigned long hash = nidhash(&id->nid);
247
248         hash_for_each_possible(ksocknal_data.ksnd_peers, peer_ni,
249                                ksnp_list, hash) {
250                 LASSERT(!peer_ni->ksnp_closing);
251
252                 if (peer_ni->ksnp_ni != ni)
253                         continue;
254
255                 if (!nid_same(&peer_ni->ksnp_id.nid, &id->nid) ||
256                     peer_ni->ksnp_id.pid != id->pid)
257                         continue;
258
259                 CDEBUG(D_NET, "got peer_ni [%p] -> %s (%d)\n",
260                        peer_ni, libcfs_idstr(id),
261                        refcount_read(&peer_ni->ksnp_refcount));
262                 return peer_ni;
263         }
264         return NULL;
265 }
266
267 struct ksock_peer_ni *
268 ksocknal_find_peer(struct lnet_ni *ni, struct lnet_processid *id)
269 {
270         struct ksock_peer_ni *peer_ni;
271
272         read_lock(&ksocknal_data.ksnd_global_lock);
273         peer_ni = ksocknal_find_peer_locked(ni, id);
274         if (peer_ni != NULL)                    /* +1 ref for caller? */
275                 ksocknal_peer_addref(peer_ni);
276         read_unlock(&ksocknal_data.ksnd_global_lock);
277
278         return peer_ni;
279 }
280
281 static void
282 ksocknal_unlink_peer_locked(struct ksock_peer_ni *peer_ni)
283 {
284         int i;
285         struct ksock_interface *iface;
286
287         for (i = 0; i < peer_ni->ksnp_n_passive_ips; i++) {
288                 struct sockaddr_in sa = { .sin_family = AF_INET };
289                 LASSERT(i < LNET_INTERFACES_NUM);
290                 sa.sin_addr.s_addr = htonl(peer_ni->ksnp_passive_ips[i]);
291
292                 iface = ksocknal_ip2iface(peer_ni->ksnp_ni,
293                                           (struct sockaddr *)&sa);
294                 /*
295                  * All IPs in peer_ni->ksnp_passive_ips[] come from the
296                  * interface list, therefore the call must succeed.
297                  */
298                 LASSERT(iface != NULL);
299
300                 CDEBUG(D_NET, "peer_ni=%p iface=%p ksni_nroutes=%d\n",
301                        peer_ni, iface, iface->ksni_nroutes);
302                 iface->ksni_npeers--;
303         }
304
305         LASSERT(list_empty(&peer_ni->ksnp_conns));
306         LASSERT(peer_ni->ksnp_conn_cb == NULL);
307         LASSERT(!peer_ni->ksnp_closing);
308         peer_ni->ksnp_closing = 1;
309         hlist_del(&peer_ni->ksnp_list);
310         /* lose peerlist's ref */
311         ksocknal_peer_decref(peer_ni);
312 }
313
314 static int
315 ksocknal_get_peer_info(struct lnet_ni *ni, int index,
316                        struct lnet_processid *id, __u32 *myip, __u32 *peer_ip,
317                        int *port, int *conn_count, int *share_count)
318 {
319         struct ksock_peer_ni *peer_ni;
320         struct ksock_conn_cb *conn_cb;
321         int i;
322         int j;
323         int rc = -ENOENT;
324
325         read_lock(&ksocknal_data.ksnd_global_lock);
326
327         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
328
329                 if (peer_ni->ksnp_ni != ni)
330                         continue;
331
332                 if (peer_ni->ksnp_n_passive_ips == 0 &&
333                     peer_ni->ksnp_conn_cb == NULL) {
334                         if (index-- > 0)
335                                 continue;
336
337                         *id = peer_ni->ksnp_id;
338                         *myip = 0;
339                         *peer_ip = 0;
340                         *port = 0;
341                         *conn_count = 0;
342                         *share_count = 0;
343                         rc = 0;
344                         goto out;
345                 }
346
347                 for (j = 0; j < peer_ni->ksnp_n_passive_ips; j++) {
348                         if (index-- > 0)
349                                 continue;
350
351                         *id = peer_ni->ksnp_id;
352                         *myip = peer_ni->ksnp_passive_ips[j];
353                         *peer_ip = 0;
354                         *port = 0;
355                         *conn_count = 0;
356                         *share_count = 0;
357                         rc = 0;
358                         goto out;
359                 }
360
361                 if (peer_ni->ksnp_conn_cb) {
362                         if (index-- > 0)
363                                 continue;
364
365                         conn_cb = peer_ni->ksnp_conn_cb;
366
367                         *id = peer_ni->ksnp_id;
368                         if (conn_cb->ksnr_addr.ss_family == AF_INET) {
369                                 struct sockaddr_in *sa =
370                                         (void *)&conn_cb->ksnr_addr;
371
372                                 rc = choose_ipv4_src(myip,
373                                                      conn_cb->ksnr_myiface,
374                                                      ntohl(sa->sin_addr.s_addr),
375                                                      ni->ni_net_ns);
376                                 *peer_ip = ntohl(sa->sin_addr.s_addr);
377                                 *port = ntohs(sa->sin_port);
378                         } else {
379                                 *myip = 0xFFFFFFFF;
380                                 *peer_ip = 0xFFFFFFFF;
381                                 *port = 0;
382                                 rc = -ENOTSUPP;
383                         }
384                         *conn_count = conn_cb->ksnr_conn_count;
385                         *share_count = 1;
386                         goto out;
387                 }
388         }
389 out:
390         read_unlock(&ksocknal_data.ksnd_global_lock);
391         return rc;
392 }
393
394 static unsigned int
395 ksocknal_get_conn_count_by_type(struct ksock_conn_cb *conn_cb,
396                                 int type)
397 {
398         unsigned int count = 0;
399
400         switch (type) {
401         case SOCKLND_CONN_CONTROL:
402                 count = conn_cb->ksnr_ctrl_conn_count;
403                 break;
404         case SOCKLND_CONN_BULK_IN:
405                 count = conn_cb->ksnr_blki_conn_count;
406                 break;
407         case SOCKLND_CONN_BULK_OUT:
408                 count = conn_cb->ksnr_blko_conn_count;
409                 break;
410         case SOCKLND_CONN_ANY:
411                 count = conn_cb->ksnr_conn_count;
412                 break;
413         default:
414                 LBUG();
415                 break;
416         }
417
418         return count;
419 }
420
421 static unsigned int
422 ksocknal_get_conns_per_peer(struct ksock_peer_ni *peer_ni)
423 {
424         struct lnet_ni *ni = peer_ni->ksnp_ni;
425         struct lnet_ioctl_config_socklnd_tunables *tunables;
426
427         LASSERT(ni);
428
429         tunables = &ni->ni_lnd_tunables.lnd_tun_u.lnd_sock;
430
431         return tunables->lnd_conns_per_peer;
432 }
433
434 static void
435 ksocknal_incr_conn_count(struct ksock_conn_cb *conn_cb,
436                          int type)
437 {
438         conn_cb->ksnr_conn_count++;
439
440         /* check if all connections of the given type got created */
441         switch (type) {
442         case SOCKLND_CONN_CONTROL:
443                 conn_cb->ksnr_ctrl_conn_count++;
444                 /* there's a single control connection per peer,
445                  * two in case of loopback
446                  */
447                 conn_cb->ksnr_connected |= BIT(type);
448                 break;
449         case SOCKLND_CONN_BULK_IN:
450                 conn_cb->ksnr_blki_conn_count++;
451                 if (conn_cb->ksnr_blki_conn_count >= conn_cb->ksnr_max_conns)
452                         conn_cb->ksnr_connected |= BIT(type);
453                 break;
454         case SOCKLND_CONN_BULK_OUT:
455                 conn_cb->ksnr_blko_conn_count++;
456                 if (conn_cb->ksnr_blko_conn_count >= conn_cb->ksnr_max_conns)
457                         conn_cb->ksnr_connected |= BIT(type);
458                 break;
459         case SOCKLND_CONN_ANY:
460                 if (conn_cb->ksnr_conn_count >= conn_cb->ksnr_max_conns)
461                         conn_cb->ksnr_connected |= BIT(type);
462                 break;
463         default:
464                 LBUG();
465                 break;
466         }
467
468         CDEBUG(D_NET, "Add conn type %d, ksnr_connected %x ksnr_max_conns %d\n",
469                type, conn_cb->ksnr_connected, conn_cb->ksnr_max_conns);
470 }
471
472
473 static void
474 ksocknal_decr_conn_count(struct ksock_conn_cb *conn_cb,
475                          int type)
476 {
477         conn_cb->ksnr_conn_count--;
478
479         /* check if all connections of the given type got created */
480         switch (type) {
481         case SOCKLND_CONN_CONTROL:
482                 conn_cb->ksnr_ctrl_conn_count--;
483                 /* there's a single control connection per peer,
484                  * two in case of loopback
485                  */
486                 if (conn_cb->ksnr_ctrl_conn_count == 0)
487                         conn_cb->ksnr_connected &= ~BIT(type);
488                 break;
489         case SOCKLND_CONN_BULK_IN:
490                 conn_cb->ksnr_blki_conn_count--;
491                 if (conn_cb->ksnr_blki_conn_count < conn_cb->ksnr_max_conns)
492                         conn_cb->ksnr_connected &= ~BIT(type);
493                 break;
494         case SOCKLND_CONN_BULK_OUT:
495                 conn_cb->ksnr_blko_conn_count--;
496                 if (conn_cb->ksnr_blko_conn_count < conn_cb->ksnr_max_conns)
497                         conn_cb->ksnr_connected &= ~BIT(type);
498                 break;
499         case SOCKLND_CONN_ANY:
500                 if (conn_cb->ksnr_conn_count < conn_cb->ksnr_max_conns)
501                         conn_cb->ksnr_connected &= ~BIT(type);
502                 break;
503         default:
504                 LBUG();
505                 break;
506         }
507
508         CDEBUG(D_NET, "Del conn type %d, ksnr_connected %x ksnr_max_conns %d\n",
509                type, conn_cb->ksnr_connected, conn_cb->ksnr_max_conns);
510 }
511
512 static void
513 ksocknal_associate_cb_conn_locked(struct ksock_conn_cb *conn_cb,
514                                   struct ksock_conn *conn)
515 {
516         struct ksock_peer_ni *peer_ni = conn_cb->ksnr_peer;
517         int type = conn->ksnc_type;
518         struct ksock_interface *iface;
519         int conn_iface;
520
521         conn_iface = ksocknal_ip2index((struct sockaddr *)&conn->ksnc_myaddr,
522                                        peer_ni->ksnp_ni);
523         conn->ksnc_conn_cb = conn_cb;
524         ksocknal_conn_cb_addref(conn_cb);
525
526         if (conn_cb->ksnr_myiface != conn_iface) {
527                 if (conn_cb->ksnr_myiface < 0) {
528                         /* route wasn't bound locally yet (the initial route) */
529                         CDEBUG(D_NET, "Binding %s %pIS to interface %d\n",
530                                libcfs_idstr(&peer_ni->ksnp_id),
531                                &conn_cb->ksnr_addr,
532                                conn_iface);
533                 } else {
534                         CDEBUG(D_NET,
535                                "Rebinding %s %pIS from interface %d to %d\n",
536                                libcfs_idstr(&peer_ni->ksnp_id),
537                                &conn_cb->ksnr_addr,
538                                conn_cb->ksnr_myiface,
539                                conn_iface);
540
541                         iface = ksocknal_index2iface(peer_ni->ksnp_ni,
542                                                      conn_cb->ksnr_myiface);
543                         if (iface)
544                                 iface->ksni_nroutes--;
545                 }
546                 conn_cb->ksnr_myiface = conn_iface;
547                 iface = ksocknal_index2iface(peer_ni->ksnp_ni,
548                                              conn_cb->ksnr_myiface);
549                 if (iface)
550                         iface->ksni_nroutes++;
551         }
552
553         ksocknal_incr_conn_count(conn_cb, type);
554
555         /* Successful connection => further attempts can
556          * proceed immediately
557          */
558         conn_cb->ksnr_retry_interval = 0;
559 }
560
561 static void
562 ksocknal_add_conn_cb_locked(struct ksock_peer_ni *peer_ni,
563                             struct ksock_conn_cb *conn_cb)
564 {
565         struct ksock_conn *conn;
566         struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
567
568         LASSERT(!peer_ni->ksnp_closing);
569         LASSERT(!conn_cb->ksnr_peer);
570         LASSERT(!conn_cb->ksnr_scheduled);
571         LASSERT(!conn_cb->ksnr_connecting);
572         LASSERT(conn_cb->ksnr_connected == 0);
573
574         conn_cb->ksnr_peer = peer_ni;
575         ksocknal_peer_addref(peer_ni);
576
577         /* set the conn_cb's interface to the current net's interface */
578         conn_cb->ksnr_myiface = net->ksnn_interface.ksni_index;
579         net->ksnn_interface.ksni_nroutes++;
580
581         /* peer_ni's route list takes over my ref on 'route' */
582         peer_ni->ksnp_conn_cb = conn_cb;
583
584         list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
585                 if (!rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
586                                   (struct sockaddr *)&conn_cb->ksnr_addr))
587                         continue;
588
589                 ksocknal_associate_cb_conn_locked(conn_cb, conn);
590                 /* keep going (typed conns) */
591         }
592 }
593
594 static void
595 ksocknal_del_conn_cb_locked(struct ksock_conn_cb *conn_cb)
596 {
597         struct ksock_peer_ni *peer_ni = conn_cb->ksnr_peer;
598         struct ksock_interface *iface;
599         struct ksock_conn *conn;
600         struct ksock_conn *cnxt;
601
602         LASSERT(!conn_cb->ksnr_deleted);
603
604         /* Close associated conns */
605         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns, ksnc_list) {
606                 if (conn->ksnc_conn_cb != conn_cb)
607                         continue;
608
609                 ksocknal_close_conn_locked(conn, 0);
610         }
611
612         if (conn_cb->ksnr_myiface >= 0) {
613                 iface = ksocknal_index2iface(peer_ni->ksnp_ni,
614                                              conn_cb->ksnr_myiface);
615                 if (iface)
616                         iface->ksni_nroutes--;
617         }
618
619         conn_cb->ksnr_deleted = 1;
620         ksocknal_conn_cb_decref(conn_cb);               /* drop peer_ni's ref */
621         peer_ni->ksnp_conn_cb = NULL;
622
623         if (list_empty(&peer_ni->ksnp_conns)) {
624                 /* I've just removed the last route to a peer_ni with no active
625                  * connections
626                  */
627                 ksocknal_unlink_peer_locked(peer_ni);
628         }
629 }
630
631 int
632 ksocknal_add_peer(struct lnet_ni *ni, struct lnet_processid *id,
633                   struct sockaddr *addr)
634 {
635         struct ksock_peer_ni *peer_ni;
636         struct ksock_peer_ni *peer2;
637         struct ksock_conn_cb *conn_cb;
638
639         if (LNET_NID_IS_ANY(&id->nid) ||
640             id->pid == LNET_PID_ANY)
641                 return (-EINVAL);
642
643         /* Have a brand new peer_ni ready... */
644         peer_ni = ksocknal_create_peer(ni, id);
645         if (IS_ERR(peer_ni))
646                 return PTR_ERR(peer_ni);
647
648         conn_cb = ksocknal_create_conn_cb(addr);
649         if (!conn_cb) {
650                 ksocknal_peer_decref(peer_ni);
651                 return -ENOMEM;
652         }
653
654         write_lock_bh(&ksocknal_data.ksnd_global_lock);
655
656         /* always called with a ref on ni, so shutdown can't have started */
657         LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers)
658                 >= 0);
659
660         peer2 = ksocknal_find_peer_locked(ni, id);
661         if (peer2 != NULL) {
662                 ksocknal_peer_decref(peer_ni);
663                 peer_ni = peer2;
664         } else {
665                 /* peer_ni table takes my ref on peer_ni */
666                 hash_add(ksocknal_data.ksnd_peers, &peer_ni->ksnp_list,
667                          nidhash(&id->nid));
668         }
669
670         if (peer_ni->ksnp_conn_cb) {
671                 ksocknal_conn_cb_decref(conn_cb);
672         } else {
673                 ksocknal_add_conn_cb_locked(peer_ni, conn_cb);
674                 /* Remember conns_per_peer setting at the time
675                  * of connection initiation. It will define the
676                  * max number of conns per type for this conn_cb
677                  * while it's in use.
678                  */
679                 conn_cb->ksnr_max_conns = ksocknal_get_conns_per_peer(peer_ni);
680         }
681
682         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
683
684         return 0;
685 }
686
687 static void
688 ksocknal_del_peer_locked(struct ksock_peer_ni *peer_ni)
689 {
690         struct ksock_conn *conn;
691         struct ksock_conn *cnxt;
692         struct ksock_conn_cb *conn_cb;
693
694         LASSERT(!peer_ni->ksnp_closing);
695
696         /* Extra ref prevents peer_ni disappearing until I'm done with it */
697         ksocknal_peer_addref(peer_ni);
698         conn_cb = peer_ni->ksnp_conn_cb;
699         if (conn_cb)
700                 ksocknal_del_conn_cb_locked(conn_cb);
701
702         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns,
703                                  ksnc_list)
704                 ksocknal_close_conn_locked(conn, 0);
705
706         ksocknal_peer_decref(peer_ni);
707         /* NB peer_ni unlinks itself when last conn/conn_cb is removed */
708 }
709
710 static int
711 ksocknal_del_peer(struct lnet_ni *ni, struct lnet_processid *id)
712 {
713         LIST_HEAD(zombies);
714         struct hlist_node *pnxt;
715         struct ksock_peer_ni *peer_ni;
716         int lo;
717         int hi;
718         int i;
719         int rc = -ENOENT;
720
721         write_lock_bh(&ksocknal_data.ksnd_global_lock);
722
723         if (id && !LNET_NID_IS_ANY(&id->nid)) {
724                 lo = hash_min(nidhash(&id->nid),
725                               HASH_BITS(ksocknal_data.ksnd_peers));
726                 hi = lo;
727         } else {
728                 lo = 0;
729                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
730         }
731
732         for (i = lo; i <= hi; i++) {
733                 hlist_for_each_entry_safe(peer_ni, pnxt,
734                                           &ksocknal_data.ksnd_peers[i],
735                                           ksnp_list) {
736                         if (peer_ni->ksnp_ni != ni)
737                                 continue;
738
739                         if (!((!id || LNET_NID_IS_ANY(&id->nid) ||
740                                nid_same(&peer_ni->ksnp_id.nid, &id->nid)) &&
741                               (!id || id->pid == LNET_PID_ANY ||
742                                peer_ni->ksnp_id.pid == id->pid)))
743                                 continue;
744
745                         ksocknal_peer_addref(peer_ni);  /* a ref for me... */
746
747                         ksocknal_del_peer_locked(peer_ni);
748
749                         if (peer_ni->ksnp_closing &&
750                             !list_empty(&peer_ni->ksnp_tx_queue)) {
751                                 LASSERT(list_empty(&peer_ni->ksnp_conns));
752                                 LASSERT(peer_ni->ksnp_conn_cb == NULL);
753
754                                 list_splice_init(&peer_ni->ksnp_tx_queue,
755                                                  &zombies);
756                         }
757
758                         ksocknal_peer_decref(peer_ni);  /* ...till here */
759
760                         rc = 0;                         /* matched! */
761                 }
762         }
763
764         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
765
766         ksocknal_txlist_done(ni, &zombies, -ENETDOWN);
767
768         return rc;
769 }
770
771 static struct ksock_conn *
772 ksocknal_get_conn_by_idx(struct lnet_ni *ni, int index)
773 {
774         struct ksock_peer_ni *peer_ni;
775         struct ksock_conn *conn;
776         int i;
777
778         read_lock(&ksocknal_data.ksnd_global_lock);
779
780         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
781                 LASSERT(!peer_ni->ksnp_closing);
782
783                 if (peer_ni->ksnp_ni != ni)
784                         continue;
785
786                 list_for_each_entry(conn, &peer_ni->ksnp_conns,
787                                     ksnc_list) {
788                         if (index-- > 0)
789                                 continue;
790
791                         ksocknal_conn_addref(conn);
792                         read_unlock(&ksocknal_data.ksnd_global_lock);
793                         return conn;
794                 }
795         }
796
797         read_unlock(&ksocknal_data.ksnd_global_lock);
798         return NULL;
799 }
800
801 static struct ksock_sched *
802 ksocknal_choose_scheduler_locked(unsigned int cpt)
803 {
804         struct ksock_sched *sched = ksocknal_data.ksnd_schedulers[cpt];
805         int i;
806
807         if (sched->kss_nthreads == 0) {
808                 cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
809                         if (sched->kss_nthreads > 0) {
810                                 CDEBUG(D_NET, "scheduler[%d] has no threads. selected scheduler[%d]\n",
811                                        cpt, sched->kss_cpt);
812                                 return sched;
813                         }
814                 }
815                 return NULL;
816         }
817
818         return sched;
819 }
820
821 int
822 ksocknal_accept(struct lnet_ni *ni, struct socket *sock)
823 {
824         struct ksock_connreq *cr;
825         int rc;
826         struct sockaddr_storage peer;
827
828         rc = lnet_sock_getaddr(sock, true, &peer);
829         if (rc != 0) {
830                 CERROR("Can't determine new connection's address\n");
831                 return rc;
832         }
833
834         LIBCFS_ALLOC(cr, sizeof(*cr));
835         if (cr == NULL) {
836                 LCONSOLE_ERROR_MSG(0x12f,
837                                    "Dropping connection request from %pIS: memory exhausted\n",
838                                    &peer);
839                 return -ENOMEM;
840         }
841
842         lnet_ni_addref(ni);
843         cr->ksncr_ni   = ni;
844         cr->ksncr_sock = sock;
845
846         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
847
848         list_add_tail(&cr->ksncr_list, &ksocknal_data.ksnd_connd_connreqs);
849         wake_up(&ksocknal_data.ksnd_connd_waitq);
850
851         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
852         return 0;
853 }
854
855 static int
856 ksocknal_connecting(struct ksock_conn_cb *conn_cb, struct sockaddr *sa)
857 {
858         if (conn_cb &&
859             rpc_cmp_addr((struct sockaddr *)&conn_cb->ksnr_addr, sa))
860                 return conn_cb->ksnr_connecting;
861         return 0;
862 }
863
864 int
865 ksocknal_create_conn(struct lnet_ni *ni, struct ksock_conn_cb *conn_cb,
866                      struct socket *sock, int type)
867 {
868         rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
869         LIST_HEAD(zombies);
870         struct lnet_processid peerid;
871         u64 incarnation;
872         struct ksock_conn *conn;
873         struct ksock_conn *conn2;
874         struct ksock_peer_ni *peer_ni = NULL;
875         struct ksock_peer_ni *peer2;
876         struct ksock_sched *sched;
877         struct ksock_hello_msg *hello;
878         int cpt;
879         struct ksock_tx *tx;
880         struct ksock_tx *txtmp;
881         int rc;
882         int rc2;
883         int active;
884         int num_dup = 0;
885         char *warn = NULL;
886
887         active = (conn_cb != NULL);
888
889         LASSERT(active == (type != SOCKLND_CONN_NONE));
890
891         LIBCFS_ALLOC(conn, sizeof(*conn));
892         if (conn == NULL) {
893                 rc = -ENOMEM;
894                 goto failed_0;
895         }
896
897         conn->ksnc_peer = NULL;
898         conn->ksnc_conn_cb = NULL;
899         conn->ksnc_sock = sock;
900         /* 2 ref, 1 for conn, another extra ref prevents socket
901          * being closed before establishment of connection */
902         refcount_set(&conn->ksnc_sock_refcount, 2);
903         conn->ksnc_type = type;
904         ksocknal_lib_save_callback(sock, conn);
905         refcount_set(&conn->ksnc_conn_refcount, 1); /* 1 ref for me */
906
907         conn->ksnc_rx_ready = 0;
908         conn->ksnc_rx_scheduled = 0;
909
910         INIT_LIST_HEAD(&conn->ksnc_tx_queue);
911         conn->ksnc_tx_ready = 0;
912         conn->ksnc_tx_scheduled = 0;
913         conn->ksnc_tx_carrier = NULL;
914         atomic_set (&conn->ksnc_tx_nob, 0);
915
916         LIBCFS_ALLOC(hello, offsetof(struct ksock_hello_msg,
917                                      kshm_ips[LNET_INTERFACES_NUM]));
918         if (hello == NULL) {
919                 rc = -ENOMEM;
920                 goto failed_1;
921         }
922
923         /* stash conn's local and remote addrs */
924         rc = ksocknal_lib_get_conn_addrs(conn);
925         if (rc != 0)
926                 goto failed_1;
927
928         /* Find out/confirm peer_ni's NID and connection type and get the
929          * vector of interfaces she's willing to let me connect to.
930          * Passive connections use the listener timeout since the peer_ni sends
931          * eagerly
932          */
933
934         if (active) {
935                 peer_ni = conn_cb->ksnr_peer;
936                 LASSERT(ni == peer_ni->ksnp_ni);
937
938                 /* Active connection sends HELLO eagerly */
939                 hello->kshm_nips = 0;
940                 peerid = peer_ni->ksnp_id;
941
942                 write_lock_bh(global_lock);
943                 conn->ksnc_proto = peer_ni->ksnp_proto;
944                 write_unlock_bh(global_lock);
945
946                 if (conn->ksnc_proto == NULL) {
947                         conn->ksnc_proto = &ksocknal_protocol_v3x;
948 #if SOCKNAL_VERSION_DEBUG
949                         if (*ksocknal_tunables.ksnd_protocol == 2)
950                                 conn->ksnc_proto = &ksocknal_protocol_v2x;
951                         else if (*ksocknal_tunables.ksnd_protocol == 1)
952                                 conn->ksnc_proto = &ksocknal_protocol_v1x;
953 #endif
954                 }
955
956                 rc = ksocknal_send_hello(ni, conn, &peerid.nid, hello);
957                 if (rc != 0)
958                         goto failed_1;
959         } else {
960                 peerid.nid = LNET_ANY_NID;
961                 peerid.pid = LNET_PID_ANY;
962
963                 /* Passive, get protocol from peer_ni */
964                 conn->ksnc_proto = NULL;
965         }
966
967         rc = ksocknal_recv_hello(ni, conn, hello, &peerid, &incarnation);
968         if (rc < 0)
969                 goto failed_1;
970
971         LASSERT(rc == 0 || active);
972         LASSERT(conn->ksnc_proto != NULL);
973         LASSERT(!LNET_NID_IS_ANY(&peerid.nid));
974
975         cpt = lnet_nid2cpt(&peerid.nid, ni);
976
977         if (active) {
978                 ksocknal_peer_addref(peer_ni);
979                 write_lock_bh(global_lock);
980         } else {
981                 peer_ni = ksocknal_create_peer(ni, &peerid);
982                 if (IS_ERR(peer_ni)) {
983                         rc = PTR_ERR(peer_ni);
984                         goto failed_1;
985                 }
986
987                 write_lock_bh(global_lock);
988
989                 /* called with a ref on ni, so shutdown can't have started */
990                 LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers) >= 0);
991
992                 peer2 = ksocknal_find_peer_locked(ni, &peerid);
993                 if (peer2 == NULL) {
994                         /* NB this puts an "empty" peer_ni in the peer_ni
995                          * table (which takes my ref) */
996                         hash_add(ksocknal_data.ksnd_peers,
997                                  &peer_ni->ksnp_list, nidhash(&peerid.nid));
998                 } else {
999                         ksocknal_peer_decref(peer_ni);
1000                         peer_ni = peer2;
1001                 }
1002
1003                 /* +1 ref for me */
1004                 ksocknal_peer_addref(peer_ni);
1005                 peer_ni->ksnp_accepting++;
1006
1007                 /* Am I already connecting to this guy?  Resolve in
1008                  * favour of higher NID...
1009                  */
1010                 if (memcmp(&peerid.nid, &ni->ni_nid, sizeof(peerid.nid)) < 0 &&
1011                     ksocknal_connecting(peer_ni->ksnp_conn_cb,
1012                                         ((struct sockaddr *) &conn->ksnc_peeraddr))) {
1013                         rc = EALREADY;
1014                         warn = "connection race resolution";
1015                         goto failed_2;
1016                 }
1017         }
1018
1019         if (peer_ni->ksnp_closing ||
1020             (active && conn_cb->ksnr_deleted)) {
1021                 /* peer_ni/conn_cb got closed under me */
1022                 rc = -ESTALE;
1023                 warn = "peer_ni/conn_cb removed";
1024                 goto failed_2;
1025         }
1026
1027         if (peer_ni->ksnp_proto == NULL) {
1028                 /* Never connected before.
1029                  * NB recv_hello may have returned EPROTO to signal my peer_ni
1030                  * wants a different protocol than the one I asked for.
1031                  */
1032                 LASSERT(list_empty(&peer_ni->ksnp_conns));
1033
1034                 peer_ni->ksnp_proto = conn->ksnc_proto;
1035                 peer_ni->ksnp_incarnation = incarnation;
1036         }
1037
1038         if (peer_ni->ksnp_proto != conn->ksnc_proto ||
1039             peer_ni->ksnp_incarnation != incarnation) {
1040                 /* peer_ni rebooted or I've got the wrong protocol version */
1041                 ksocknal_close_peer_conns_locked(peer_ni, NULL, 0);
1042
1043                 peer_ni->ksnp_proto = NULL;
1044                 rc = ESTALE;
1045                 warn = peer_ni->ksnp_incarnation != incarnation ?
1046                         "peer_ni rebooted" :
1047                         "wrong proto version";
1048                 goto failed_2;
1049         }
1050
1051         switch (rc) {
1052         default:
1053                 LBUG();
1054         case 0:
1055                 break;
1056         case EALREADY:
1057                 warn = "lost conn race";
1058                 goto failed_2;
1059         case EPROTO:
1060                 warn = "retry with different protocol version";
1061                 goto failed_2;
1062         }
1063
1064         /* Refuse to duplicate an existing connection, unless this is a
1065          * loopback connection */
1066         if (!rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
1067                           (struct sockaddr *)&conn->ksnc_myaddr)) {
1068                 list_for_each_entry(conn2, &peer_ni->ksnp_conns, ksnc_list) {
1069                         if (!rpc_cmp_addr(
1070                                     (struct sockaddr *)&conn2->ksnc_peeraddr,
1071                                     (struct sockaddr *)&conn->ksnc_peeraddr) ||
1072                             !rpc_cmp_addr(
1073                                     (struct sockaddr *)&conn2->ksnc_myaddr,
1074                                     (struct sockaddr *)&conn->ksnc_myaddr) ||
1075                             conn2->ksnc_type != conn->ksnc_type)
1076                                 continue;
1077
1078                         num_dup++;
1079                         /* If max conns per type is not registered in conn_cb
1080                          * as ksnr_max_conns, use ni's conns_per_peer
1081                          */
1082                         if ((peer_ni->ksnp_conn_cb &&
1083                             num_dup < peer_ni->ksnp_conn_cb->ksnr_max_conns) ||
1084                             (!peer_ni->ksnp_conn_cb &&
1085                             num_dup < ksocknal_get_conns_per_peer(peer_ni)))
1086                                 continue;
1087
1088                         /* Reply on a passive connection attempt so the peer_ni
1089                          * realises we're connected.
1090                          */
1091                         LASSERT(rc == 0);
1092                         if (!active)
1093                                 rc = EALREADY;
1094
1095                         warn = "duplicate";
1096                         goto failed_2;
1097                 }
1098         }
1099         /* If the connection created by this route didn't bind to the IP
1100          * address the route connected to, the connection/route matching
1101          * code below probably isn't going to work.
1102          */
1103         if (active &&
1104             !rpc_cmp_addr((struct sockaddr *)&conn_cb->ksnr_addr,
1105                           (struct sockaddr *)&conn->ksnc_peeraddr)) {
1106                 CERROR("Route %s %pIS connected to %pIS\n",
1107                        libcfs_idstr(&peer_ni->ksnp_id),
1108                        &conn_cb->ksnr_addr,
1109                        &conn->ksnc_peeraddr);
1110         }
1111
1112         /* Search for a conn_cb corresponding to the new connection and
1113          * create an association.  This allows incoming connections created
1114          * by conn_cbs in my peer_ni to match my own conn_cb entries so I don't
1115          * continually create duplicate conn_cbs.
1116          */
1117         conn_cb = peer_ni->ksnp_conn_cb;
1118
1119         if (conn_cb && rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
1120                                     (struct sockaddr *)&conn_cb->ksnr_addr))
1121                 ksocknal_associate_cb_conn_locked(conn_cb, conn);
1122
1123         conn->ksnc_peer = peer_ni;                 /* conn takes my ref on peer_ni */
1124         peer_ni->ksnp_last_alive = ktime_get_seconds();
1125         peer_ni->ksnp_send_keepalive = 0;
1126         peer_ni->ksnp_error = 0;
1127
1128         sched = ksocknal_choose_scheduler_locked(cpt);
1129         if (!sched) {
1130                 CERROR("no schedulers available. node is unhealthy\n");
1131                 goto failed_2;
1132         }
1133         /*
1134          * The cpt might have changed if we ended up selecting a non cpt
1135          * native scheduler. So use the scheduler's cpt instead.
1136          */
1137         cpt = sched->kss_cpt;
1138         sched->kss_nconns++;
1139         conn->ksnc_scheduler = sched;
1140
1141         conn->ksnc_tx_last_post = ktime_get_seconds();
1142         /* Set the deadline for the outgoing HELLO to drain */
1143         conn->ksnc_tx_bufnob = sock->sk->sk_wmem_queued;
1144         conn->ksnc_tx_deadline = ktime_get_seconds() +
1145                                  ksocknal_timeout();
1146         smp_mb();   /* order with adding to peer_ni's conn list */
1147
1148         list_add(&conn->ksnc_list, &peer_ni->ksnp_conns);
1149         ksocknal_conn_addref(conn);
1150
1151         ksocknal_new_packet(conn, 0);
1152
1153         conn->ksnc_zc_capable = ksocknal_lib_zc_capable(conn);
1154
1155         /* Take packets blocking for this connection. */
1156         list_for_each_entry_safe(tx, txtmp, &peer_ni->ksnp_tx_queue, tx_list) {
1157                 if (conn->ksnc_proto->pro_match_tx(conn, tx, tx->tx_nonblk) ==
1158                     SOCKNAL_MATCH_NO)
1159                         continue;
1160
1161                 list_del(&tx->tx_list);
1162                 ksocknal_queue_tx_locked(tx, conn);
1163         }
1164
1165         write_unlock_bh(global_lock);
1166         /* We've now got a new connection.  Any errors from here on are just
1167          * like "normal" comms errors and we close the connection normally.
1168          * NB (a) we still have to send the reply HELLO for passive
1169          *        connections,
1170          *    (b) normal I/O on the conn is blocked until I setup and call the
1171          *        socket callbacks.
1172          */
1173
1174         CDEBUG(D_NET, "New conn %s p %d.x %pIS -> %pISp"
1175                " incarnation:%lld sched[%d]\n",
1176                libcfs_idstr(&peerid), conn->ksnc_proto->pro_version,
1177                &conn->ksnc_myaddr, &conn->ksnc_peeraddr,
1178                incarnation, cpt);
1179
1180         if (!active) {
1181                 hello->kshm_nips = 0;
1182                 rc = ksocknal_send_hello(ni, conn, &peerid.nid, hello);
1183         }
1184
1185         LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
1186                                     kshm_ips[LNET_INTERFACES_NUM]));
1187
1188         /* setup the socket AFTER I've received hello (it disables
1189          * SO_LINGER).  I might call back to the acceptor who may want
1190          * to send a protocol version response and then close the
1191          * socket; this ensures the socket only tears down after the
1192          * response has been sent.
1193          */
1194         if (rc == 0)
1195                 rc = ksocknal_lib_setup_sock(sock);
1196
1197         write_lock_bh(global_lock);
1198
1199         /* NB my callbacks block while I hold ksnd_global_lock */
1200         ksocknal_lib_set_callback(sock, conn);
1201
1202         if (!active)
1203                 peer_ni->ksnp_accepting--;
1204
1205         write_unlock_bh(global_lock);
1206
1207         if (rc != 0) {
1208                 write_lock_bh(global_lock);
1209                 if (!conn->ksnc_closing) {
1210                         /* could be closed by another thread */
1211                         ksocknal_close_conn_locked(conn, rc);
1212                 }
1213                 write_unlock_bh(global_lock);
1214         } else if (ksocknal_connsock_addref(conn) == 0) {
1215                 /* Allow I/O to proceed. */
1216                 ksocknal_read_callback(conn);
1217                 ksocknal_write_callback(conn);
1218                 ksocknal_connsock_decref(conn);
1219         }
1220
1221         ksocknal_connsock_decref(conn);
1222         ksocknal_conn_decref(conn);
1223         return rc;
1224
1225 failed_2:
1226
1227         if (!peer_ni->ksnp_closing &&
1228             list_empty(&peer_ni->ksnp_conns) &&
1229             peer_ni->ksnp_conn_cb == NULL) {
1230                 list_splice_init(&peer_ni->ksnp_tx_queue, &zombies);
1231                 ksocknal_unlink_peer_locked(peer_ni);
1232         }
1233
1234         write_unlock_bh(global_lock);
1235
1236         if (warn != NULL) {
1237                 if (rc < 0)
1238                         CERROR("Not creating conn %s type %d: %s\n",
1239                                libcfs_idstr(&peerid), conn->ksnc_type, warn);
1240                 else
1241                         CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",
1242                                libcfs_idstr(&peerid), conn->ksnc_type, warn);
1243         }
1244
1245         if (!active) {
1246                 if (rc > 0) {
1247                         /* Request retry by replying with CONN_NONE
1248                          * ksnc_proto has been set already
1249                          */
1250                         conn->ksnc_type = SOCKLND_CONN_NONE;
1251                         hello->kshm_nips = 0;
1252                         ksocknal_send_hello(ni, conn, &peerid.nid, hello);
1253                 }
1254
1255                 write_lock_bh(global_lock);
1256                 peer_ni->ksnp_accepting--;
1257                 write_unlock_bh(global_lock);
1258         }
1259
1260         /*
1261          * If we get here without an error code, just use -EALREADY.
1262          * Depending on how we got here, the error may be positive
1263          * or negative. Normalize the value for ksocknal_txlist_done().
1264          */
1265         rc2 = (rc == 0 ? -EALREADY : (rc > 0 ? -rc : rc));
1266         ksocknal_txlist_done(ni, &zombies, rc2);
1267         ksocknal_peer_decref(peer_ni);
1268
1269 failed_1:
1270         if (hello != NULL)
1271                 LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
1272                                             kshm_ips[LNET_INTERFACES_NUM]));
1273
1274         LIBCFS_FREE(conn, sizeof(*conn));
1275
1276 failed_0:
1277         sock_release(sock);
1278
1279         return rc;
1280 }
1281
1282 void
1283 ksocknal_close_conn_locked(struct ksock_conn *conn, int error)
1284 {
1285         /* This just does the immmediate housekeeping, and queues the
1286          * connection for the reaper to terminate.
1287          * Caller holds ksnd_global_lock exclusively in irq context */
1288         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1289         struct ksock_conn_cb *conn_cb;
1290         struct ksock_conn *conn2;
1291         int conn_count;
1292         int duplicate_count = 0;
1293
1294         LASSERT(peer_ni->ksnp_error == 0);
1295         LASSERT(!conn->ksnc_closing);
1296         conn->ksnc_closing = 1;
1297
1298         /* ksnd_deathrow_conns takes over peer_ni's ref */
1299         list_del(&conn->ksnc_list);
1300
1301         conn_cb = conn->ksnc_conn_cb;
1302         if (conn_cb != NULL) {
1303                 /* dissociate conn from cb... */
1304                 LASSERT(!conn_cb->ksnr_deleted);
1305
1306                 conn_count = ksocknal_get_conn_count_by_type(conn_cb,
1307                                                              conn->ksnc_type);
1308                 /* connected bit is set only if all connections
1309                  * of the given type got created
1310                  */
1311                 if (conn_count == conn_cb->ksnr_max_conns)
1312                         LASSERT((conn_cb->ksnr_connected &
1313                                 BIT(conn->ksnc_type)) != 0);
1314
1315                 if (conn_count == 1) {
1316                         list_for_each_entry(conn2, &peer_ni->ksnp_conns,
1317                                             ksnc_list) {
1318                                 if (conn2->ksnc_conn_cb == conn_cb &&
1319                                     conn2->ksnc_type == conn->ksnc_type)
1320                                         duplicate_count += 1;
1321                         }
1322                         if (duplicate_count > 0)
1323                                 CERROR("Found %d duplicate conns type %d\n",
1324                                        duplicate_count,
1325                                        conn->ksnc_type);
1326                 }
1327                 ksocknal_decr_conn_count(conn_cb, conn->ksnc_type);
1328
1329                 conn->ksnc_conn_cb = NULL;
1330
1331                 /* drop conn's ref on conn_cb */
1332                 ksocknal_conn_cb_decref(conn_cb);
1333         }
1334
1335         if (list_empty(&peer_ni->ksnp_conns)) {
1336                 /* No more connections to this peer_ni */
1337
1338                 if (!list_empty(&peer_ni->ksnp_tx_queue)) {
1339                         struct ksock_tx *tx;
1340
1341                         LASSERT(conn->ksnc_proto == &ksocknal_protocol_v3x);
1342
1343                         /* throw them to the last connection...,
1344                          * these TXs will be send to /dev/null by scheduler */
1345                         list_for_each_entry(tx, &peer_ni->ksnp_tx_queue,
1346                                             tx_list)
1347                                 ksocknal_tx_prep(conn, tx);
1348
1349                         spin_lock_bh(&conn->ksnc_scheduler->kss_lock);
1350                         list_splice_init(&peer_ni->ksnp_tx_queue,
1351                                          &conn->ksnc_tx_queue);
1352                         spin_unlock_bh(&conn->ksnc_scheduler->kss_lock);
1353                 }
1354
1355                 /* renegotiate protocol version */
1356                 peer_ni->ksnp_proto = NULL;
1357                 /* stash last conn close reason */
1358                 peer_ni->ksnp_error = error;
1359
1360                 if (peer_ni->ksnp_conn_cb == NULL) {
1361                         /* I've just closed last conn belonging to a
1362                          * peer_ni with no connections to it
1363                          */
1364                         ksocknal_unlink_peer_locked(peer_ni);
1365                 }
1366         }
1367
1368         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1369
1370         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
1371         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1372
1373         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1374 }
1375
1376 void
1377 ksocknal_peer_failed(struct ksock_peer_ni *peer_ni)
1378 {
1379         bool notify = false;
1380         time64_t last_alive = 0;
1381
1382         /* There has been a connection failure or comms error; but I'll only
1383          * tell LNET I think the peer_ni is dead if it's to another kernel and
1384          * there are no connections or connection attempts in existence. */
1385
1386         read_lock(&ksocknal_data.ksnd_global_lock);
1387
1388         if ((peer_ni->ksnp_id.pid & LNET_PID_USERFLAG) == 0 &&
1389              list_empty(&peer_ni->ksnp_conns) &&
1390              peer_ni->ksnp_accepting == 0 &&
1391              !ksocknal_find_connecting_conn_cb_locked(peer_ni)) {
1392                 notify = true;
1393                 last_alive = peer_ni->ksnp_last_alive;
1394         }
1395
1396         read_unlock(&ksocknal_data.ksnd_global_lock);
1397
1398         if (notify)
1399                 lnet_notify(peer_ni->ksnp_ni,
1400                             lnet_nid_to_nid4(&peer_ni->ksnp_id.nid),
1401                             false, false, last_alive);
1402 }
1403
1404 void
1405 ksocknal_finalize_zcreq(struct ksock_conn *conn)
1406 {
1407         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1408         struct ksock_tx *tx;
1409         struct ksock_tx *tmp;
1410         LIST_HEAD(zlist);
1411
1412         /* NB safe to finalize TXs because closing of socket will
1413          * abort all buffered data */
1414         LASSERT(conn->ksnc_sock == NULL);
1415
1416         spin_lock(&peer_ni->ksnp_lock);
1417
1418         list_for_each_entry_safe(tx, tmp, &peer_ni->ksnp_zc_req_list,
1419                                  tx_zc_list) {
1420                 if (tx->tx_conn != conn)
1421                         continue;
1422
1423                 LASSERT(tx->tx_msg.ksm_zc_cookies[0] != 0);
1424
1425                 tx->tx_msg.ksm_zc_cookies[0] = 0;
1426                 tx->tx_zc_aborted = 1;  /* mark it as not-acked */
1427                 list_move(&tx->tx_zc_list, &zlist);
1428         }
1429
1430         spin_unlock(&peer_ni->ksnp_lock);
1431
1432         while ((tx = list_first_entry_or_null(&zlist, struct ksock_tx,
1433                                               tx_zc_list)) != NULL) {
1434                 list_del(&tx->tx_zc_list);
1435                 ksocknal_tx_decref(tx);
1436         }
1437 }
1438
1439 void
1440 ksocknal_terminate_conn(struct ksock_conn *conn)
1441 {
1442         /* This gets called by the reaper (guaranteed thread context) to
1443          * disengage the socket from its callbacks and close it.
1444          * ksnc_refcount will eventually hit zero, and then the reaper will
1445          * destroy it.
1446          */
1447         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1448         struct ksock_sched *sched = conn->ksnc_scheduler;
1449         bool failed = false;
1450
1451         LASSERT(conn->ksnc_closing);
1452
1453         /* wake up the scheduler to "send" all remaining packets to /dev/null */
1454         spin_lock_bh(&sched->kss_lock);
1455
1456         /* a closing conn is always ready to tx */
1457         conn->ksnc_tx_ready = 1;
1458
1459         if (!conn->ksnc_tx_scheduled &&
1460             !list_empty(&conn->ksnc_tx_queue)) {
1461                 list_add_tail(&conn->ksnc_tx_list,
1462                               &sched->kss_tx_conns);
1463                 conn->ksnc_tx_scheduled = 1;
1464                 /* extra ref for scheduler */
1465                 ksocknal_conn_addref(conn);
1466
1467                 wake_up(&sched->kss_waitq);
1468         }
1469
1470         spin_unlock_bh(&sched->kss_lock);
1471
1472         /* serialise with callbacks */
1473         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1474
1475         ksocknal_lib_reset_callback(conn->ksnc_sock, conn);
1476
1477         /* OK, so this conn may not be completely disengaged from its
1478          * scheduler yet, but it _has_ committed to terminate...
1479          */
1480         conn->ksnc_scheduler->kss_nconns--;
1481
1482         if (peer_ni->ksnp_error != 0) {
1483                 /* peer_ni's last conn closed in error */
1484                 LASSERT(list_empty(&peer_ni->ksnp_conns));
1485                 failed = true;
1486                 peer_ni->ksnp_error = 0;     /* avoid multiple notifications */
1487         }
1488
1489         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1490
1491         if (failed)
1492                 ksocknal_peer_failed(peer_ni);
1493
1494         /* The socket is closed on the final put; either here, or in
1495          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1496          * when the connection was established, this will close the socket
1497          * immediately, aborting anything buffered in it. Any hung
1498          * zero-copy transmits will therefore complete in finite time.
1499          */
1500         ksocknal_connsock_decref(conn);
1501 }
1502
1503 void
1504 ksocknal_queue_zombie_conn(struct ksock_conn *conn)
1505 {
1506         /* Queue the conn for the reaper to destroy */
1507         LASSERT(refcount_read(&conn->ksnc_conn_refcount) == 0);
1508         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1509
1510         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1511         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1512
1513         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1514 }
1515
1516 void
1517 ksocknal_destroy_conn(struct ksock_conn *conn)
1518 {
1519         time64_t last_rcv;
1520
1521         /* Final coup-de-grace of the reaper */
1522         CDEBUG(D_NET, "connection %p\n", conn);
1523
1524         LASSERT(refcount_read(&conn->ksnc_conn_refcount) == 0);
1525         LASSERT(refcount_read(&conn->ksnc_sock_refcount) == 0);
1526         LASSERT(conn->ksnc_sock == NULL);
1527         LASSERT(conn->ksnc_conn_cb == NULL);
1528         LASSERT(!conn->ksnc_tx_scheduled);
1529         LASSERT(!conn->ksnc_rx_scheduled);
1530         LASSERT(list_empty(&conn->ksnc_tx_queue));
1531
1532         /* complete current receive if any */
1533         switch (conn->ksnc_rx_state) {
1534         case SOCKNAL_RX_LNET_PAYLOAD:
1535                 last_rcv = conn->ksnc_rx_deadline -
1536                            ksocknal_timeout();
1537                 CERROR("Completing partial receive from %s[%d], ip %pISp, with error, wanted: %d, left: %d, last alive is %lld secs ago\n",
1538                        libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1539                        conn->ksnc_type,
1540                        &conn->ksnc_peeraddr,
1541                        conn->ksnc_rx_nob_wanted, conn->ksnc_rx_nob_left,
1542                        ktime_get_seconds() - last_rcv);
1543                 if (conn->ksnc_lnet_msg)
1544                         conn->ksnc_lnet_msg->msg_health_status =
1545                                 LNET_MSG_STATUS_REMOTE_ERROR;
1546                 lnet_finalize(conn->ksnc_lnet_msg, -EIO);
1547                 break;
1548         case SOCKNAL_RX_LNET_HEADER:
1549                 if (conn->ksnc_rx_started)
1550                         CERROR("Incomplete receive of lnet header from %s, ip %pISp, with error, protocol: %d.x.\n",
1551                                libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1552                                &conn->ksnc_peeraddr,
1553                                conn->ksnc_proto->pro_version);
1554                 break;
1555         case SOCKNAL_RX_KSM_HEADER:
1556                 if (conn->ksnc_rx_started)
1557                         CERROR("Incomplete receive of ksock message from %s, ip %pISp, with error, protocol: %d.x.\n",
1558                                libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1559                                &conn->ksnc_peeraddr,
1560                                conn->ksnc_proto->pro_version);
1561                 break;
1562         case SOCKNAL_RX_SLOP:
1563                 if (conn->ksnc_rx_started)
1564                         CERROR("Incomplete receive of slops from %s, ip %pISp, with error\n",
1565                                libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1566                                &conn->ksnc_peeraddr);
1567                 break;
1568         default:
1569                 LBUG();
1570                 break;
1571         }
1572
1573         ksocknal_peer_decref(conn->ksnc_peer);
1574
1575         LIBCFS_FREE(conn, sizeof(*conn));
1576 }
1577
1578 int
1579 ksocknal_close_peer_conns_locked(struct ksock_peer_ni *peer_ni,
1580                                  struct sockaddr *addr, int why)
1581 {
1582         struct ksock_conn *conn;
1583         struct ksock_conn *cnxt;
1584         int count = 0;
1585
1586         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns, ksnc_list) {
1587                 if (!addr ||
1588                     rpc_cmp_addr(addr,
1589                                  (struct sockaddr *)&conn->ksnc_peeraddr)) {
1590                         count++;
1591                         ksocknal_close_conn_locked(conn, why);
1592                 }
1593         }
1594
1595         return count;
1596 }
1597
1598 int
1599 ksocknal_close_conn_and_siblings(struct ksock_conn *conn, int why)
1600 {
1601         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1602         int count;
1603
1604         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1605
1606         count = ksocknal_close_peer_conns_locked(
1607                 peer_ni, (struct sockaddr *)&conn->ksnc_peeraddr, why);
1608
1609         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1610
1611         return count;
1612 }
1613
1614 int
1615 ksocknal_close_matching_conns(struct lnet_processid *id, __u32 ipaddr)
1616 {
1617         struct ksock_peer_ni *peer_ni;
1618         struct hlist_node *pnxt;
1619         int lo;
1620         int hi;
1621         int i;
1622         int count = 0;
1623         struct sockaddr_in sa = {.sin_family = AF_INET};
1624
1625         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1626
1627         if (!LNET_NID_IS_ANY(&id->nid)) {
1628                 lo = hash_min(nidhash(&id->nid),
1629                               HASH_BITS(ksocknal_data.ksnd_peers));
1630                 hi = lo;
1631         } else {
1632                 lo = 0;
1633                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
1634         }
1635
1636         sa.sin_addr.s_addr = htonl(ipaddr);
1637         for (i = lo; i <= hi; i++) {
1638                 hlist_for_each_entry_safe(peer_ni, pnxt,
1639                                           &ksocknal_data.ksnd_peers[i],
1640                                           ksnp_list) {
1641
1642                         if (!((LNET_NID_IS_ANY(&id->nid) ||
1643                                nid_same(&id->nid, &peer_ni->ksnp_id.nid)) &&
1644                               (id->pid == LNET_PID_ANY ||
1645                                id->pid == peer_ni->ksnp_id.pid)))
1646                                 continue;
1647
1648                         count += ksocknal_close_peer_conns_locked(
1649                                 peer_ni,
1650                                 ipaddr ? (struct sockaddr *)&sa : NULL, 0);
1651                 }
1652         }
1653
1654         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1655
1656         /* wildcards always succeed */
1657         if (LNET_NID_IS_ANY(&id->nid) || id->pid == LNET_PID_ANY ||
1658             ipaddr == 0)
1659                 return 0;
1660
1661         return (count == 0 ? -ENOENT : 0);
1662 }
1663
1664 void
1665 ksocknal_notify_gw_down(struct lnet_nid *gw_nid)
1666 {
1667         /* The router is telling me she's been notified of a change in
1668          * gateway state....
1669          */
1670         struct lnet_processid id = {
1671                 .pid    = LNET_PID_ANY,
1672                 .nid    = *gw_nid,
1673         };
1674
1675         CDEBUG(D_NET, "gw %s down\n", libcfs_nidstr(gw_nid));
1676
1677         /* If the gateway crashed, close all open connections... */
1678         ksocknal_close_matching_conns(&id, 0);
1679         return;
1680
1681         /* We can only establish new connections
1682          * if we have autroutes, and these connect on demand.
1683          */
1684 }
1685
1686 static void
1687 ksocknal_push_peer(struct ksock_peer_ni *peer_ni)
1688 {
1689         int index;
1690         int i;
1691         struct ksock_conn *conn;
1692
1693         for (index = 0; ; index++) {
1694                 read_lock(&ksocknal_data.ksnd_global_lock);
1695
1696                 i = 0;
1697                 conn = NULL;
1698
1699                 list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
1700                         if (i++ == index) {
1701                                 ksocknal_conn_addref(conn);
1702                                 break;
1703                         }
1704                 }
1705
1706                 read_unlock(&ksocknal_data.ksnd_global_lock);
1707
1708                 if (i <= index)
1709                         break;
1710
1711                 ksocknal_lib_push_conn (conn);
1712                 ksocknal_conn_decref(conn);
1713         }
1714 }
1715
1716 static int
1717 ksocknal_push(struct lnet_ni *ni, struct lnet_processid *id)
1718 {
1719         int lo;
1720         int hi;
1721         int bkt;
1722         int rc = -ENOENT;
1723
1724         if (!LNET_NID_IS_ANY(&id->nid)) {
1725                 lo = hash_min(nidhash(&id->nid),
1726                               HASH_BITS(ksocknal_data.ksnd_peers));
1727                 hi = lo;
1728         } else {
1729                 lo = 0;
1730                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
1731         }
1732
1733         for (bkt = lo; bkt <= hi; bkt++) {
1734                 int peer_off; /* searching offset in peer_ni hash table */
1735
1736                 for (peer_off = 0; ; peer_off++) {
1737                         struct ksock_peer_ni *peer_ni;
1738                         int           i = 0;
1739
1740                         read_lock(&ksocknal_data.ksnd_global_lock);
1741                         hlist_for_each_entry(peer_ni,
1742                                              &ksocknal_data.ksnd_peers[bkt],
1743                                              ksnp_list) {
1744                                 if (!((LNET_NID_IS_ANY(&id->nid) ||
1745                                        nid_same(&id->nid,
1746                                                  &peer_ni->ksnp_id.nid)) &&
1747                                       (id->pid == LNET_PID_ANY ||
1748                                        id->pid == peer_ni->ksnp_id.pid)))
1749                                         continue;
1750
1751                                 if (i++ == peer_off) {
1752                                         ksocknal_peer_addref(peer_ni);
1753                                         break;
1754                                 }
1755                         }
1756                         read_unlock(&ksocknal_data.ksnd_global_lock);
1757
1758                         if (i <= peer_off) /* no match */
1759                                 break;
1760
1761                         rc = 0;
1762                         ksocknal_push_peer(peer_ni);
1763                         ksocknal_peer_decref(peer_ni);
1764                 }
1765         }
1766         return rc;
1767 }
1768
1769 int
1770 ksocknal_ctl(struct lnet_ni *ni, unsigned int cmd, void *arg)
1771 {
1772         struct lnet_processid id = {};
1773         struct libcfs_ioctl_data *data = arg;
1774         int rc;
1775
1776         switch(cmd) {
1777         case IOC_LIBCFS_GET_INTERFACE: {
1778                 struct ksock_net *net = ni->ni_data;
1779                 struct ksock_interface *iface;
1780                 struct sockaddr_in *sa;
1781
1782                 read_lock(&ksocknal_data.ksnd_global_lock);
1783
1784                 if (data->ioc_count >= 1) {
1785                         rc = -ENOENT;
1786                 } else {
1787                         rc = 0;
1788                         iface = &net->ksnn_interface;
1789
1790                         sa = (void *)&iface->ksni_addr;
1791                         if (sa->sin_family == AF_INET)
1792                                 data->ioc_u32[0] = ntohl(sa->sin_addr.s_addr);
1793                         else
1794                                 data->ioc_u32[0] = 0xFFFFFFFF;
1795                         data->ioc_u32[1] = iface->ksni_netmask;
1796                         data->ioc_u32[2] = iface->ksni_npeers;
1797                         data->ioc_u32[3] = iface->ksni_nroutes;
1798                 }
1799
1800                 read_unlock(&ksocknal_data.ksnd_global_lock);
1801                 return rc;
1802         }
1803
1804         case IOC_LIBCFS_GET_PEER: {
1805                 __u32 myip = 0;
1806                 __u32 ip = 0;
1807                 int port = 0;
1808                 int conn_count = 0;
1809                 int share_count = 0;
1810
1811                 rc = ksocknal_get_peer_info(ni, data->ioc_count,
1812                                             &id, &myip, &ip, &port,
1813                                             &conn_count,  &share_count);
1814                 if (rc != 0)
1815                         return rc;
1816                 if (!nid_is_nid4(&id.nid))
1817                         return -EINVAL;
1818                 data->ioc_nid    = lnet_nid_to_nid4(&id.nid);
1819                 data->ioc_count  = share_count;
1820                 data->ioc_u32[0] = ip;
1821                 data->ioc_u32[1] = port;
1822                 data->ioc_u32[2] = myip;
1823                 data->ioc_u32[3] = conn_count;
1824                 data->ioc_u32[4] = id.pid;
1825                 return 0;
1826         }
1827
1828         case IOC_LIBCFS_ADD_PEER: {
1829                 struct sockaddr_in sa = {.sin_family = AF_INET};
1830
1831                 id.pid = LNET_PID_LUSTRE;
1832                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1833                 sa.sin_addr.s_addr = htonl(data->ioc_u32[0]);
1834                 sa.sin_port = htons(data->ioc_u32[1]);
1835                 return ksocknal_add_peer(ni, &id, (struct sockaddr *)&sa);
1836         }
1837         case IOC_LIBCFS_DEL_PEER:
1838                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1839                 id.pid = LNET_PID_ANY;
1840                 return ksocknal_del_peer(ni, &id);
1841
1842         case IOC_LIBCFS_GET_CONN: {
1843                 int           txmem;
1844                 int           rxmem;
1845                 int           nagle;
1846                 struct ksock_conn *conn = ksocknal_get_conn_by_idx(ni, data->ioc_count);
1847                 struct sockaddr_in *psa = (void *)&conn->ksnc_peeraddr;
1848                 struct sockaddr_in *mysa = (void *)&conn->ksnc_myaddr;
1849
1850                 if (conn == NULL)
1851                         return -ENOENT;
1852
1853                 ksocknal_lib_get_conn_tunables(conn, &txmem, &rxmem, &nagle);
1854
1855                 data->ioc_count = txmem;
1856                 data->ioc_nid = lnet_nid_to_nid4(&conn->ksnc_peer->ksnp_id.nid);
1857                 data->ioc_flags = nagle;
1858                 if (psa->sin_family == AF_INET)
1859                         data->ioc_u32[0] = ntohl(psa->sin_addr.s_addr);
1860                 else
1861                         data->ioc_u32[0] = 0xFFFFFFFF;
1862                 data->ioc_u32[1] = rpc_get_port((struct sockaddr *)
1863                                                 &conn->ksnc_peeraddr);
1864                 if (mysa->sin_family == AF_INET)
1865                         data->ioc_u32[2] = ntohl(mysa->sin_addr.s_addr);
1866                 else
1867                         data->ioc_u32[2] = 0xFFFFFFFF;
1868                 data->ioc_u32[3] = conn->ksnc_type;
1869                 data->ioc_u32[4] = conn->ksnc_scheduler->kss_cpt;
1870                 data->ioc_u32[5] = rxmem;
1871                 data->ioc_u32[6] = conn->ksnc_peer->ksnp_id.pid;
1872                 ksocknal_conn_decref(conn);
1873                 return 0;
1874         }
1875
1876         case IOC_LIBCFS_CLOSE_CONNECTION:
1877                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1878                 id.pid = LNET_PID_ANY;
1879                 return ksocknal_close_matching_conns(&id,
1880                                                      data->ioc_u32[0]);
1881
1882         case IOC_LIBCFS_REGISTER_MYNID:
1883                 /* Ignore if this is a noop */
1884                 if (nid_is_nid4(&ni->ni_nid) &&
1885                     data->ioc_nid == lnet_nid_to_nid4(&ni->ni_nid))
1886                         return 0;
1887
1888                 CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1889                        libcfs_nid2str(data->ioc_nid),
1890                        libcfs_nidstr(&ni->ni_nid));
1891                 return -EINVAL;
1892
1893         case IOC_LIBCFS_PUSH_CONNECTION:
1894                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1895                 id.pid = LNET_PID_ANY;
1896                 return ksocknal_push(ni, &id);
1897
1898         default:
1899                 return -EINVAL;
1900         }
1901         /* not reached */
1902 }
1903
1904 static void
1905 ksocknal_free_buffers (void)
1906 {
1907         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_txs) == 0);
1908
1909         if (ksocknal_data.ksnd_schedulers != NULL)
1910                 cfs_percpt_free(ksocknal_data.ksnd_schedulers);
1911
1912         spin_lock(&ksocknal_data.ksnd_tx_lock);
1913
1914         if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
1915                 LIST_HEAD(zlist);
1916                 struct ksock_tx *tx;
1917
1918                 list_splice_init(&ksocknal_data.ksnd_idle_noop_txs, &zlist);
1919                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
1920
1921                 while ((tx = list_first_entry_or_null(&zlist, struct ksock_tx,
1922                                                       tx_list)) != NULL) {
1923                         list_del(&tx->tx_list);
1924                         LIBCFS_FREE(tx, tx->tx_desc_size);
1925                 }
1926         } else {
1927                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
1928         }
1929 }
1930
1931 static int ksocknal_get_link_status(struct net_device *dev)
1932 {
1933         int ret = -1;
1934
1935         LASSERT(dev);
1936
1937         if (!netif_running(dev)) {
1938                 ret = 0;
1939                 CDEBUG(D_NET, "device not running\n");
1940         }
1941         /* Some devices may not be providing link settings */
1942         else if (dev->ethtool_ops->get_link) {
1943                 ret = dev->ethtool_ops->get_link(dev);
1944                 CDEBUG(D_NET, "get_link returns %u\n", ret);
1945         }
1946
1947         return ret;
1948 }
1949
1950 static int
1951 ksocknal_handle_link_state_change(struct net_device *dev,
1952                                   unsigned char operstate)
1953 {
1954         struct lnet_ni *ni = NULL;
1955         struct ksock_net *net;
1956         struct ksock_net *cnxt;
1957         int ifindex;
1958         unsigned char link_down = !(operstate == IF_OPER_UP);
1959         struct in_device *in_dev;
1960         bool found_ip = false;
1961         struct ksock_interface *ksi = NULL;
1962         struct sockaddr_in *sa;
1963         DECLARE_CONST_IN_IFADDR(ifa);
1964
1965         ifindex = dev->ifindex;
1966
1967         if (!ksocknal_data.ksnd_nnets)
1968                 goto out;
1969
1970         list_for_each_entry_safe(net, cnxt, &ksocknal_data.ksnd_nets,
1971                                  ksnn_list) {
1972
1973                 ksi = &net->ksnn_interface;
1974                 sa = (void *)&ksi->ksni_addr;
1975                 found_ip = false;
1976
1977                 if (ksi->ksni_index != ifindex ||
1978                     strcmp(ksi->ksni_name, dev->name))
1979                         continue;
1980
1981                 ni = net->ksnn_ni;
1982
1983                 in_dev = __in_dev_get_rtnl(dev);
1984                 if (!in_dev) {
1985                         CDEBUG(D_NET, "Interface %s has no IPv4 status.\n",
1986                                dev->name);
1987                         CDEBUG(D_NET, "set link fatal state to 1\n");
1988                         atomic_set(&ni->ni_fatal_error_on, 1);
1989                         continue;
1990                 }
1991                 in_dev_for_each_ifa_rtnl(ifa, in_dev) {
1992                         if (sa->sin_addr.s_addr == ifa->ifa_local)
1993                                 found_ip = true;
1994                 }
1995                 endfor_ifa(in_dev);
1996
1997                 if (!found_ip) {
1998                         CDEBUG(D_NET, "Interface %s has no matching ip\n",
1999                                dev->name);
2000                         CDEBUG(D_NET, "set link fatal state to 1\n");
2001                         atomic_set(&ni->ni_fatal_error_on, 1);
2002                         continue;
2003                 }
2004
2005                 if (link_down) {
2006                         CDEBUG(D_NET, "set link fatal state to 1\n");
2007                         atomic_set(&ni->ni_fatal_error_on, link_down);
2008                 } else {
2009                         CDEBUG(D_NET, "set link fatal state to %u\n",
2010                                (ksocknal_get_link_status(dev) == 0));
2011                         atomic_set(&ni->ni_fatal_error_on,
2012                                    (ksocknal_get_link_status(dev) == 0));
2013                 }
2014         }
2015 out:
2016         return 0;
2017 }
2018
2019
2020 static int
2021 ksocknal_handle_inetaddr_change(struct in_ifaddr *ifa, unsigned long event)
2022 {
2023         struct lnet_ni *ni;
2024         struct ksock_net *net;
2025         struct ksock_net *cnxt;
2026         struct net_device *event_netdev = ifa->ifa_dev->dev;
2027         int ifindex;
2028         struct ksock_interface *ksi = NULL;
2029         struct sockaddr_in *sa;
2030
2031         if (!ksocknal_data.ksnd_nnets)
2032                 goto out;
2033
2034         ifindex = event_netdev->ifindex;
2035
2036         list_for_each_entry_safe(net, cnxt, &ksocknal_data.ksnd_nets,
2037                                  ksnn_list) {
2038
2039                 ksi = &net->ksnn_interface;
2040                 sa = (void *)&ksi->ksni_addr;
2041
2042                 if (ksi->ksni_index != ifindex ||
2043                     strcmp(ksi->ksni_name, event_netdev->name))
2044                         continue;
2045
2046                 if (sa->sin_addr.s_addr == ifa->ifa_local) {
2047                         CDEBUG(D_NET, "set link fatal state to %u\n",
2048                                (event == NETDEV_DOWN));
2049                         ni = net->ksnn_ni;
2050                         atomic_set(&ni->ni_fatal_error_on,
2051                                    (event == NETDEV_DOWN));
2052                 }
2053         }
2054 out:
2055         return 0;
2056 }
2057
2058 /************************************
2059  * Net device notifier event handler
2060  ************************************/
2061 static int ksocknal_device_event(struct notifier_block *unused,
2062                                  unsigned long event, void *ptr)
2063 {
2064         struct net_device *dev = netdev_notifier_info_to_dev(ptr);
2065         unsigned char operstate;
2066
2067         operstate = dev->operstate;
2068
2069         CDEBUG(D_NET, "devevent: status=%ld, iface=%s ifindex %d state %u\n",
2070                event, dev->name, dev->ifindex, operstate);
2071
2072         switch (event) {
2073         case NETDEV_UP:
2074         case NETDEV_DOWN:
2075         case NETDEV_CHANGE:
2076                 ksocknal_handle_link_state_change(dev, operstate);
2077                 break;
2078         }
2079
2080         return NOTIFY_OK;
2081 }
2082
2083 /************************************
2084  * Inetaddr notifier event handler
2085  ************************************/
2086 static int ksocknal_inetaddr_event(struct notifier_block *unused,
2087                                    unsigned long event, void *ptr)
2088 {
2089         struct in_ifaddr *ifa = ptr;
2090
2091         CDEBUG(D_NET, "addrevent: status %ld ip addr %pI4, netmask %pI4.\n",
2092                event, &ifa->ifa_address, &ifa->ifa_mask);
2093
2094         switch (event) {
2095         case NETDEV_UP:
2096         case NETDEV_DOWN:
2097         case NETDEV_CHANGE:
2098                 ksocknal_handle_inetaddr_change(ifa, event);
2099                 break;
2100
2101         }
2102         return NOTIFY_OK;
2103 }
2104
2105 static struct notifier_block ksocknal_dev_notifier_block = {
2106         .notifier_call = ksocknal_device_event,
2107 };
2108
2109 static struct notifier_block ksocknal_inetaddr_notifier_block = {
2110         .notifier_call = ksocknal_inetaddr_event,
2111 };
2112
2113 static void
2114 ksocknal_base_shutdown(void)
2115 {
2116         struct ksock_sched *sched;
2117         struct ksock_peer_ni *peer_ni;
2118         int i;
2119
2120         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %lld\n",
2121                libcfs_kmem_read());
2122         LASSERT (ksocknal_data.ksnd_nnets == 0);
2123
2124         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL) {
2125                 unregister_netdevice_notifier(&ksocknal_dev_notifier_block);
2126                 unregister_inetaddr_notifier(&ksocknal_inetaddr_notifier_block);
2127         }
2128
2129         switch (ksocknal_data.ksnd_init) {
2130         default:
2131                 LASSERT(0);
2132                 fallthrough;
2133
2134         case SOCKNAL_INIT_ALL:
2135         case SOCKNAL_INIT_DATA:
2136                 hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list)
2137                         LASSERT(0);
2138
2139                 LASSERT(list_empty(&ksocknal_data.ksnd_nets));
2140                 LASSERT(list_empty(&ksocknal_data.ksnd_enomem_conns));
2141                 LASSERT(list_empty(&ksocknal_data.ksnd_zombie_conns));
2142                 LASSERT(list_empty(&ksocknal_data.ksnd_connd_connreqs));
2143                 LASSERT(list_empty(&ksocknal_data.ksnd_connd_routes));
2144
2145                 if (ksocknal_data.ksnd_schedulers != NULL) {
2146                         cfs_percpt_for_each(sched, i,
2147                                             ksocknal_data.ksnd_schedulers) {
2148
2149                                 LASSERT(list_empty(&sched->kss_tx_conns));
2150                                 LASSERT(list_empty(&sched->kss_rx_conns));
2151                                 LASSERT(list_empty(&sched->kss_zombie_noop_txs));
2152                                 LASSERT(sched->kss_nconns == 0);
2153                         }
2154                 }
2155
2156                 /* flag threads to terminate; wake and wait for them to die */
2157                 ksocknal_data.ksnd_shuttingdown = 1;
2158                 wake_up_all(&ksocknal_data.ksnd_connd_waitq);
2159                 wake_up(&ksocknal_data.ksnd_reaper_waitq);
2160
2161                 if (ksocknal_data.ksnd_schedulers != NULL) {
2162                         cfs_percpt_for_each(sched, i,
2163                                             ksocknal_data.ksnd_schedulers)
2164                                         wake_up_all(&sched->kss_waitq);
2165                 }
2166
2167                 wait_var_event_warning(&ksocknal_data.ksnd_nthreads,
2168                                        atomic_read(&ksocknal_data.ksnd_nthreads) == 0,
2169                                        "waiting for %d threads to terminate\n",
2170                                        atomic_read(&ksocknal_data.ksnd_nthreads));
2171
2172                 ksocknal_free_buffers();
2173
2174                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
2175                 break;
2176         }
2177
2178         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %lld\n",
2179                libcfs_kmem_read());
2180
2181         module_put(THIS_MODULE);
2182 }
2183
2184 static int
2185 ksocknal_base_startup(void)
2186 {
2187         struct ksock_sched *sched;
2188         int rc;
2189         int i;
2190
2191         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
2192         LASSERT(ksocknal_data.ksnd_nnets == 0);
2193
2194         memset(&ksocknal_data, 0, sizeof(ksocknal_data)); /* zero pointers */
2195
2196         hash_init(ksocknal_data.ksnd_peers);
2197
2198         rwlock_init(&ksocknal_data.ksnd_global_lock);
2199         INIT_LIST_HEAD(&ksocknal_data.ksnd_nets);
2200
2201         spin_lock_init(&ksocknal_data.ksnd_reaper_lock);
2202         INIT_LIST_HEAD(&ksocknal_data.ksnd_enomem_conns);
2203         INIT_LIST_HEAD(&ksocknal_data.ksnd_zombie_conns);
2204         INIT_LIST_HEAD(&ksocknal_data.ksnd_deathrow_conns);
2205         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
2206
2207         spin_lock_init(&ksocknal_data.ksnd_connd_lock);
2208         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_connreqs);
2209         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_routes);
2210         init_waitqueue_head(&ksocknal_data.ksnd_connd_waitq);
2211
2212         spin_lock_init(&ksocknal_data.ksnd_tx_lock);
2213         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_noop_txs);
2214
2215         /* NB memset above zeros whole of ksocknal_data */
2216
2217         /* flag lists/ptrs/locks initialised */
2218         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2219         if (!try_module_get(THIS_MODULE))
2220                 goto failed;
2221
2222         /* Create a scheduler block per available CPT */
2223         ksocknal_data.ksnd_schedulers = cfs_percpt_alloc(lnet_cpt_table(),
2224                                                          sizeof(*sched));
2225         if (ksocknal_data.ksnd_schedulers == NULL)
2226                 goto failed;
2227
2228         cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
2229                 int nthrs;
2230
2231                 /*
2232                  * make sure not to allocate more threads than there are
2233                  * cores/CPUs in teh CPT
2234                  */
2235                 nthrs = cfs_cpt_weight(lnet_cpt_table(), i);
2236                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2237                         nthrs = min(nthrs, *ksocknal_tunables.ksnd_nscheds);
2238                 } else {
2239                         /*
2240                          * max to half of CPUs, assume another half should be
2241                          * reserved for upper layer modules
2242                          */
2243                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2244                 }
2245
2246                 sched->kss_nthreads_max = nthrs;
2247                 sched->kss_cpt = i;
2248
2249                 spin_lock_init(&sched->kss_lock);
2250                 INIT_LIST_HEAD(&sched->kss_rx_conns);
2251                 INIT_LIST_HEAD(&sched->kss_tx_conns);
2252                 INIT_LIST_HEAD(&sched->kss_zombie_noop_txs);
2253                 init_waitqueue_head(&sched->kss_waitq);
2254         }
2255
2256         ksocknal_data.ksnd_connd_starting         = 0;
2257         ksocknal_data.ksnd_connd_failed_stamp     = 0;
2258         ksocknal_data.ksnd_connd_starting_stamp   = ktime_get_real_seconds();
2259         /* must have at least 2 connds to remain responsive to accepts while
2260          * connecting */
2261         if (*ksocknal_tunables.ksnd_nconnds < SOCKNAL_CONND_RESV + 1)
2262                 *ksocknal_tunables.ksnd_nconnds = SOCKNAL_CONND_RESV + 1;
2263
2264         if (*ksocknal_tunables.ksnd_nconnds_max <
2265             *ksocknal_tunables.ksnd_nconnds) {
2266                 ksocknal_tunables.ksnd_nconnds_max =
2267                         ksocknal_tunables.ksnd_nconnds;
2268         }
2269
2270         for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
2271                 spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2272                 ksocknal_data.ksnd_connd_starting++;
2273                 spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2274
2275                 rc = ksocknal_thread_start(ksocknal_connd,
2276                                            (void *)((uintptr_t)i),
2277                                            "socknal_cd%02d", i);
2278                 if (rc != 0) {
2279                         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2280                         ksocknal_data.ksnd_connd_starting--;
2281                         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2282                         CERROR("Can't spawn socknal connd: %d\n", rc);
2283                         goto failed;
2284                 }
2285         }
2286
2287         rc = ksocknal_thread_start(ksocknal_reaper, NULL, "socknal_reaper");
2288         if (rc != 0) {
2289                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
2290                 goto failed;
2291         }
2292
2293         register_netdevice_notifier(&ksocknal_dev_notifier_block);
2294         register_inetaddr_notifier(&ksocknal_inetaddr_notifier_block);
2295
2296         /* flag everything initialised */
2297         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
2298
2299         return 0;
2300
2301  failed:
2302         ksocknal_base_shutdown();
2303         return -ENETDOWN;
2304 }
2305
2306 static int
2307 ksocknal_debug_peerhash(struct lnet_ni *ni)
2308 {
2309         struct ksock_peer_ni *peer_ni;
2310         int i;
2311
2312         read_lock(&ksocknal_data.ksnd_global_lock);
2313
2314         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
2315                 struct ksock_conn_cb *conn_cb;
2316                 struct ksock_conn *conn;
2317
2318                 if (peer_ni->ksnp_ni != ni)
2319                         continue;
2320
2321                 CWARN("Active peer_ni on shutdown: %s, ref %d, closing %d, accepting %d, err %d, zcookie %llu, txq %d, zc_req %d\n",
2322                       libcfs_idstr(&peer_ni->ksnp_id),
2323                       refcount_read(&peer_ni->ksnp_refcount),
2324                       peer_ni->ksnp_closing,
2325                       peer_ni->ksnp_accepting, peer_ni->ksnp_error,
2326                       peer_ni->ksnp_zc_next_cookie,
2327                       !list_empty(&peer_ni->ksnp_tx_queue),
2328                       !list_empty(&peer_ni->ksnp_zc_req_list));
2329
2330                 conn_cb = peer_ni->ksnp_conn_cb;
2331                 if (conn_cb) {
2332                         CWARN("ConnCB: ref %d, schd %d, conn %d, cnted %d, del %d\n",
2333                               refcount_read(&conn_cb->ksnr_refcount),
2334                               conn_cb->ksnr_scheduled, conn_cb->ksnr_connecting,
2335                               conn_cb->ksnr_connected, conn_cb->ksnr_deleted);
2336                 }
2337
2338                 list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
2339                         CWARN("Conn: ref %d, sref %d, t %d, c %d\n",
2340                               refcount_read(&conn->ksnc_conn_refcount),
2341                               refcount_read(&conn->ksnc_sock_refcount),
2342                               conn->ksnc_type, conn->ksnc_closing);
2343                 }
2344                 break;
2345         }
2346
2347         read_unlock(&ksocknal_data.ksnd_global_lock);
2348         return 0;
2349 }
2350
2351 void
2352 ksocknal_shutdown(struct lnet_ni *ni)
2353 {
2354         struct ksock_net *net = ni->ni_data;
2355
2356         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL);
2357         LASSERT(ksocknal_data.ksnd_nnets > 0);
2358
2359         /* prevent new peers */
2360         atomic_add(SOCKNAL_SHUTDOWN_BIAS, &net->ksnn_npeers);
2361
2362         /* Delete all peers */
2363         ksocknal_del_peer(ni, NULL);
2364
2365         /* Wait for all peer_ni state to clean up */
2366         wait_var_event_warning(&net->ksnn_npeers,
2367                                atomic_read(&net->ksnn_npeers) ==
2368                                SOCKNAL_SHUTDOWN_BIAS,
2369                                "waiting for %d peers to disconnect\n",
2370                                ksocknal_debug_peerhash(ni) +
2371                                atomic_read(&net->ksnn_npeers) -
2372                                SOCKNAL_SHUTDOWN_BIAS);
2373
2374         LASSERT(net->ksnn_interface.ksni_npeers == 0);
2375         LASSERT(net->ksnn_interface.ksni_nroutes == 0);
2376
2377         list_del(&net->ksnn_list);
2378         LIBCFS_FREE(net, sizeof(*net));
2379
2380         ksocknal_data.ksnd_nnets--;
2381         if (ksocknal_data.ksnd_nnets == 0)
2382                 ksocknal_base_shutdown();
2383 }
2384
2385 static int
2386 ksocknal_search_new_ipif(struct ksock_net *net)
2387 {
2388         int new_ipif = 0;
2389         char *ifnam = &net->ksnn_interface.ksni_name[0];
2390         char *colon = strchr(ifnam, ':');
2391         bool found = false;
2392         struct ksock_net *tmp;
2393
2394         if (colon != NULL)
2395                 *colon = 0;
2396
2397         list_for_each_entry(tmp, &ksocknal_data.ksnd_nets, ksnn_list) {
2398                 char *ifnam2 = &tmp->ksnn_interface.ksni_name[0];
2399                 char *colon2 = strchr(ifnam2, ':');
2400
2401                 if (colon2 != NULL)
2402                         *colon2 = 0;
2403
2404                 found = strcmp(ifnam, ifnam2) == 0;
2405                 if (colon2 != NULL)
2406                         *colon2 = ':';
2407         }
2408
2409         new_ipif += !found;
2410         if (colon != NULL)
2411                 *colon = ':';
2412
2413         return new_ipif;
2414 }
2415
2416 static int
2417 ksocknal_start_schedulers(struct ksock_sched *sched)
2418 {
2419         int     nthrs;
2420         int     rc = 0;
2421         int     i;
2422
2423         if (sched->kss_nthreads == 0) {
2424                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2425                         nthrs = sched->kss_nthreads_max;
2426                 } else {
2427                         nthrs = cfs_cpt_weight(lnet_cpt_table(),
2428                                                sched->kss_cpt);
2429                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2430                         nthrs = min(SOCKNAL_NSCHEDS_HIGH, nthrs);
2431                 }
2432                 nthrs = min(nthrs, sched->kss_nthreads_max);
2433         } else {
2434                 LASSERT(sched->kss_nthreads <= sched->kss_nthreads_max);
2435                 /* increase two threads if there is new interface */
2436                 nthrs = min(2, sched->kss_nthreads_max - sched->kss_nthreads);
2437         }
2438
2439         for (i = 0; i < nthrs; i++) {
2440                 long id;
2441
2442                 id = KSOCK_THREAD_ID(sched->kss_cpt, sched->kss_nthreads + i);
2443                 rc = ksocknal_thread_start(ksocknal_scheduler, (void *)id,
2444                                            "socknal_sd%02d_%02d",
2445                                            sched->kss_cpt,
2446                                            (int)KSOCK_THREAD_SID(id));
2447                 if (rc == 0)
2448                         continue;
2449
2450                 CERROR("Can't spawn thread %d for scheduler[%d]: %d\n",
2451                        sched->kss_cpt, (int) KSOCK_THREAD_SID(id), rc);
2452                 break;
2453         }
2454
2455         sched->kss_nthreads += i;
2456         return rc;
2457 }
2458
2459 static int
2460 ksocknal_net_start_threads(struct ksock_net *net, __u32 *cpts, int ncpts)
2461 {
2462         int newif = ksocknal_search_new_ipif(net);
2463         int rc;
2464         int i;
2465
2466         if (ncpts > 0 && ncpts > cfs_cpt_number(lnet_cpt_table()))
2467                 return -EINVAL;
2468
2469         for (i = 0; i < ncpts; i++) {
2470                 struct ksock_sched *sched;
2471                 int cpt = (cpts == NULL) ? i : cpts[i];
2472
2473                 LASSERT(cpt < cfs_cpt_number(lnet_cpt_table()));
2474                 sched = ksocknal_data.ksnd_schedulers[cpt];
2475
2476                 if (!newif && sched->kss_nthreads > 0)
2477                         continue;
2478
2479                 rc = ksocknal_start_schedulers(sched);
2480                 if (rc != 0)
2481                         return rc;
2482         }
2483         return 0;
2484 }
2485
2486 int
2487 ksocknal_startup(struct lnet_ni *ni)
2488 {
2489         struct ksock_net *net;
2490         struct ksock_interface *ksi = NULL;
2491         struct lnet_inetdev *ifaces = NULL;
2492         struct sockaddr_in *sa;
2493         int i = 0;
2494         int rc;
2495
2496         LASSERT (ni->ni_net->net_lnd == &the_ksocklnd);
2497         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING) {
2498                 rc = ksocknal_base_startup();
2499                 if (rc != 0)
2500                         return rc;
2501         }
2502         LIBCFS_ALLOC(net, sizeof(*net));
2503         if (net == NULL)
2504                 goto fail_0;
2505         net->ksnn_incarnation = ktime_get_real_ns();
2506         ni->ni_data = net;
2507
2508         ksocknal_tunables_setup(ni);
2509
2510         rc = lnet_inet_enumerate(&ifaces, ni->ni_net_ns);
2511         if (rc < 0)
2512                 goto fail_1;
2513
2514         ksi = &net->ksnn_interface;
2515
2516         /* Use the first discovered interface or look in the list */
2517         if (ni->ni_interface) {
2518                 for (i = 0; i < rc; i++)
2519                         if (strcmp(ifaces[i].li_name, ni->ni_interface) == 0)
2520                                 break;
2521
2522                 /* ni_interfaces doesn't contain the interface we want */
2523                 if (i == rc) {
2524                         CERROR("ksocklnd: failed to find interface %s\n",
2525                                ni->ni_interface);
2526                         goto fail_1;
2527                 }
2528         }
2529
2530         ni->ni_dev_cpt = ifaces[i].li_cpt;
2531         ksi->ksni_index = ifaces[i].li_index;
2532         sa = (void *)&ksi->ksni_addr;
2533         memset(sa, 0, sizeof(*sa));
2534         sa->sin_family = AF_INET;
2535         sa->sin_addr.s_addr = htonl(ifaces[i].li_ipaddr);
2536         ksi->ksni_netmask = ifaces[i].li_netmask;
2537         strlcpy(ksi->ksni_name, ifaces[i].li_name, sizeof(ksi->ksni_name));
2538
2539         /* call it before add it to ksocknal_data.ksnd_nets */
2540         rc = ksocknal_net_start_threads(net, ni->ni_cpts, ni->ni_ncpts);
2541         if (rc != 0)
2542                 goto fail_1;
2543
2544         LASSERT(ksi);
2545         LASSERT(ksi->ksni_addr.ss_family == AF_INET);
2546         ni->ni_nid.nid_addr[0] =
2547                 ((struct sockaddr_in *)&ksi->ksni_addr)->sin_addr.s_addr;
2548         list_add(&net->ksnn_list, &ksocknal_data.ksnd_nets);
2549         net->ksnn_ni = ni;
2550         ksocknal_data.ksnd_nnets++;
2551
2552         return 0;
2553
2554 fail_1:
2555         LIBCFS_FREE(net, sizeof(*net));
2556 fail_0:
2557         if (ksocknal_data.ksnd_nnets == 0)
2558                 ksocknal_base_shutdown();
2559
2560         return -ENETDOWN;
2561 }
2562
2563 static void __exit ksocklnd_exit(void)
2564 {
2565         lnet_unregister_lnd(&the_ksocklnd);
2566 }
2567
2568 static const struct lnet_lnd the_ksocklnd = {
2569         .lnd_type               = SOCKLND,
2570         .lnd_startup            = ksocknal_startup,
2571         .lnd_shutdown           = ksocknal_shutdown,
2572         .lnd_ctl                = ksocknal_ctl,
2573         .lnd_send               = ksocknal_send,
2574         .lnd_recv               = ksocknal_recv,
2575         .lnd_notify_peer_down   = ksocknal_notify_gw_down,
2576         .lnd_accept             = ksocknal_accept,
2577 };
2578
2579 static int __init ksocklnd_init(void)
2580 {
2581         int rc;
2582
2583         /* check ksnr_connected/connecting field large enough */
2584         BUILD_BUG_ON(SOCKLND_CONN_NTYPES > 4);
2585         BUILD_BUG_ON(SOCKLND_CONN_ACK != SOCKLND_CONN_BULK_IN);
2586
2587         rc = ksocknal_tunables_init();
2588         if (rc != 0)
2589                 return rc;
2590
2591         lnet_register_lnd(&the_ksocklnd);
2592
2593         return 0;
2594 }
2595
2596 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2597 MODULE_DESCRIPTION("TCP Socket LNet Network Driver");
2598 MODULE_VERSION("2.8.0");
2599 MODULE_LICENSE("GPL");
2600
2601 module_init(ksocklnd_init);
2602 module_exit(ksocklnd_exit);