Whamcloud - gitweb
LU-15860 socklnd: Duplicate ksock_conn_cb
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lnet/klnds/socklnd/socklnd.c
32  *
33  * Author: Zach Brown <zab@zabbo.net>
34  * Author: Peter J. Braam <braam@clusterfs.com>
35  * Author: Phil Schwan <phil@clusterfs.com>
36  * Author: Eric Barton <eric@bartonsoftware.com>
37  */
38
39 #include <linux/ethtool.h>
40 #include <linux/inetdevice.h>
41 #include "socklnd.h"
42 #include <linux/sunrpc/addr.h>
43
44 static const struct lnet_lnd the_ksocklnd;
45 struct ksock_nal_data ksocknal_data;
46
47 static struct ksock_interface *
48 ksocknal_ip2iface(struct lnet_ni *ni, struct sockaddr *addr)
49 {
50         struct ksock_net *net = ni->ni_data;
51         struct ksock_interface *iface;
52
53         iface = &net->ksnn_interface;
54
55         if (rpc_cmp_addr((struct sockaddr *)&iface->ksni_addr, addr))
56                 return iface;
57
58         return NULL;
59 }
60
61 static struct ksock_interface *
62 ksocknal_index2iface(struct lnet_ni *ni, int index)
63 {
64         struct ksock_net *net = ni->ni_data;
65         struct ksock_interface *iface;
66
67         iface = &net->ksnn_interface;
68
69         if (iface->ksni_index == index)
70                 return iface;
71
72         return NULL;
73 }
74
75 static int ksocknal_ip2index(struct sockaddr *addr, struct lnet_ni *ni)
76 {
77         struct net_device *dev;
78         int ret = -1;
79         DECLARE_CONST_IN_IFADDR(ifa);
80
81         if (addr->sa_family != AF_INET)
82                 /* No IPv6 support yet */
83                 return ret;
84
85         rcu_read_lock();
86         for_each_netdev(ni->ni_net_ns, dev) {
87                 int flags = dev_get_flags(dev);
88                 struct in_device *in_dev;
89
90                 if (flags & IFF_LOOPBACK) /* skip the loopback IF */
91                         continue;
92
93                 if (!(flags & IFF_UP))
94                         continue;
95
96                 in_dev = __in_dev_get_rcu(dev);
97                 if (!in_dev)
98                         continue;
99
100                 in_dev_for_each_ifa_rcu(ifa, in_dev) {
101                         if (ifa->ifa_local ==
102                             ((struct sockaddr_in *)addr)->sin_addr.s_addr)
103                                 ret = dev->ifindex;
104                 }
105                 endfor_ifa(in_dev);
106                 if (ret >= 0)
107                         break;
108         }
109         rcu_read_unlock();
110
111         return ret;
112 }
113
114 static struct ksock_conn_cb *
115 ksocknal_create_conn_cb(struct sockaddr *addr)
116 {
117         struct ksock_conn_cb *conn_cb;
118
119         LIBCFS_ALLOC(conn_cb, sizeof(*conn_cb));
120         if (!conn_cb)
121                 return NULL;
122
123         refcount_set(&conn_cb->ksnr_refcount, 1);
124         conn_cb->ksnr_peer = NULL;
125         conn_cb->ksnr_retry_interval = 0;         /* OK to connect at any time */
126         rpc_copy_addr((struct sockaddr *)&conn_cb->ksnr_addr, addr);
127         rpc_set_port((struct sockaddr *)&conn_cb->ksnr_addr,
128                      rpc_get_port(addr));
129         conn_cb->ksnr_myiface = -1;
130         conn_cb->ksnr_scheduled = 0;
131         conn_cb->ksnr_connecting = 0;
132         conn_cb->ksnr_connected = 0;
133         conn_cb->ksnr_deleted = 0;
134         conn_cb->ksnr_conn_count = 0;
135         conn_cb->ksnr_ctrl_conn_count = 0;
136         conn_cb->ksnr_blki_conn_count = 0;
137         conn_cb->ksnr_blko_conn_count = 0;
138         conn_cb->ksnr_max_conns = 0;
139
140         return conn_cb;
141 }
142
143 void
144 ksocknal_destroy_conn_cb(struct ksock_conn_cb *conn_cb)
145 {
146         LASSERT(refcount_read(&conn_cb->ksnr_refcount) == 0);
147
148         if (conn_cb->ksnr_peer)
149                 ksocknal_peer_decref(conn_cb->ksnr_peer);
150
151         LIBCFS_FREE(conn_cb, sizeof(*conn_cb));
152 }
153
154 static struct ksock_peer_ni *
155 ksocknal_create_peer(struct lnet_ni *ni, struct lnet_processid *id)
156 {
157         int cpt = lnet_nid2cpt(&id->nid, ni);
158         struct ksock_net *net = ni->ni_data;
159         struct ksock_peer_ni *peer_ni;
160
161         LASSERT(!LNET_NID_IS_ANY(&id->nid));
162         LASSERT(id->pid != LNET_PID_ANY);
163         LASSERT(!in_interrupt());
164
165         if (!atomic_inc_unless_negative(&net->ksnn_npeers)) {
166                 CERROR("Can't create peer_ni: network shutdown\n");
167                 return ERR_PTR(-ESHUTDOWN);
168         }
169
170         LIBCFS_CPT_ALLOC(peer_ni, lnet_cpt_table(), cpt, sizeof(*peer_ni));
171         if (!peer_ni) {
172                 atomic_dec(&net->ksnn_npeers);
173                 return ERR_PTR(-ENOMEM);
174         }
175
176         peer_ni->ksnp_ni = ni;
177         peer_ni->ksnp_id = *id;
178         refcount_set(&peer_ni->ksnp_refcount, 1); /* 1 ref for caller */
179         peer_ni->ksnp_closing = 0;
180         peer_ni->ksnp_accepting = 0;
181         peer_ni->ksnp_proto = NULL;
182         peer_ni->ksnp_last_alive = 0;
183         peer_ni->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;
184         peer_ni->ksnp_conn_cb = NULL;
185
186         INIT_LIST_HEAD(&peer_ni->ksnp_conns);
187         INIT_LIST_HEAD(&peer_ni->ksnp_tx_queue);
188         INIT_LIST_HEAD(&peer_ni->ksnp_zc_req_list);
189         spin_lock_init(&peer_ni->ksnp_lock);
190
191         return peer_ni;
192 }
193
194 void
195 ksocknal_destroy_peer(struct ksock_peer_ni *peer_ni)
196 {
197         struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
198
199         CDEBUG (D_NET, "peer_ni %s %p deleted\n",
200                 libcfs_idstr(&peer_ni->ksnp_id), peer_ni);
201
202         LASSERT(refcount_read(&peer_ni->ksnp_refcount) == 0);
203         LASSERT(peer_ni->ksnp_accepting == 0);
204         LASSERT(list_empty(&peer_ni->ksnp_conns));
205         LASSERT(peer_ni->ksnp_conn_cb == NULL);
206         LASSERT(list_empty(&peer_ni->ksnp_tx_queue));
207         LASSERT(list_empty(&peer_ni->ksnp_zc_req_list));
208
209         LIBCFS_FREE(peer_ni, sizeof(*peer_ni));
210
211         /* NB a peer_ni's connections and conn_cb keep a reference on their
212          * peer_ni until they are destroyed, so we can be assured that _all_
213          * state to do with this peer_ni has been cleaned up when its refcount
214          * drops to zero.
215          */
216         if (atomic_dec_and_test(&net->ksnn_npeers))
217                 wake_up_var(&net->ksnn_npeers);
218 }
219
220 struct ksock_peer_ni *
221 ksocknal_find_peer_locked(struct lnet_ni *ni, struct lnet_processid *id)
222 {
223         struct ksock_peer_ni *peer_ni;
224         unsigned long hash = nidhash(&id->nid);
225
226         hash_for_each_possible(ksocknal_data.ksnd_peers, peer_ni,
227                                ksnp_list, hash) {
228                 LASSERT(!peer_ni->ksnp_closing);
229
230                 if (peer_ni->ksnp_ni != ni)
231                         continue;
232
233                 if (!nid_same(&peer_ni->ksnp_id.nid, &id->nid) ||
234                     peer_ni->ksnp_id.pid != id->pid)
235                         continue;
236
237                 CDEBUG(D_NET, "got peer_ni [%p] -> %s (%d)\n",
238                        peer_ni, libcfs_idstr(id),
239                        refcount_read(&peer_ni->ksnp_refcount));
240                 return peer_ni;
241         }
242         return NULL;
243 }
244
245 struct ksock_peer_ni *
246 ksocknal_find_peer(struct lnet_ni *ni, struct lnet_processid *id)
247 {
248         struct ksock_peer_ni *peer_ni;
249
250         read_lock(&ksocknal_data.ksnd_global_lock);
251         peer_ni = ksocknal_find_peer_locked(ni, id);
252         if (peer_ni != NULL)                    /* +1 ref for caller? */
253                 ksocknal_peer_addref(peer_ni);
254         read_unlock(&ksocknal_data.ksnd_global_lock);
255
256         return peer_ni;
257 }
258
259 static void
260 ksocknal_unlink_peer_locked(struct ksock_peer_ni *peer_ni)
261 {
262         int i;
263         struct ksock_interface *iface;
264
265         for (i = 0; i < peer_ni->ksnp_n_passive_ips; i++) {
266                 struct sockaddr_in sa = { .sin_family = AF_INET };
267                 LASSERT(i < LNET_INTERFACES_NUM);
268                 sa.sin_addr.s_addr = htonl(peer_ni->ksnp_passive_ips[i]);
269
270                 iface = ksocknal_ip2iface(peer_ni->ksnp_ni,
271                                           (struct sockaddr *)&sa);
272                 /*
273                  * All IPs in peer_ni->ksnp_passive_ips[] come from the
274                  * interface list, therefore the call must succeed.
275                  */
276                 LASSERT(iface != NULL);
277
278                 CDEBUG(D_NET, "peer_ni=%p iface=%p ksni_nroutes=%d\n",
279                        peer_ni, iface, iface->ksni_nroutes);
280                 iface->ksni_npeers--;
281         }
282
283         LASSERT(list_empty(&peer_ni->ksnp_conns));
284         LASSERT(peer_ni->ksnp_conn_cb == NULL);
285         LASSERT(!peer_ni->ksnp_closing);
286         peer_ni->ksnp_closing = 1;
287         hlist_del(&peer_ni->ksnp_list);
288         /* lose peerlist's ref */
289         ksocknal_peer_decref(peer_ni);
290 }
291
292 static int
293 ksocknal_get_peer_info(struct lnet_ni *ni, int index,
294                        struct lnet_processid *id, __u32 *myip, __u32 *peer_ip,
295                        int *port, int *conn_count, int *share_count)
296 {
297         struct ksock_peer_ni *peer_ni;
298         struct ksock_conn_cb *conn_cb;
299         int i;
300         int j;
301         int rc = -ENOENT;
302
303         read_lock(&ksocknal_data.ksnd_global_lock);
304
305         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
306
307                 if (peer_ni->ksnp_ni != ni)
308                         continue;
309
310                 if (peer_ni->ksnp_n_passive_ips == 0 &&
311                     peer_ni->ksnp_conn_cb == NULL) {
312                         if (index-- > 0)
313                                 continue;
314
315                         *id = peer_ni->ksnp_id;
316                         *myip = 0;
317                         *peer_ip = 0;
318                         *port = 0;
319                         *conn_count = 0;
320                         *share_count = 0;
321                         rc = 0;
322                         goto out;
323                 }
324
325                 for (j = 0; j < peer_ni->ksnp_n_passive_ips; j++) {
326                         if (index-- > 0)
327                                 continue;
328
329                         *id = peer_ni->ksnp_id;
330                         *myip = peer_ni->ksnp_passive_ips[j];
331                         *peer_ip = 0;
332                         *port = 0;
333                         *conn_count = 0;
334                         *share_count = 0;
335                         rc = 0;
336                         goto out;
337                 }
338
339                 if (peer_ni->ksnp_conn_cb) {
340                         if (index-- > 0)
341                                 continue;
342
343                         conn_cb = peer_ni->ksnp_conn_cb;
344
345                         *id = peer_ni->ksnp_id;
346                         if (conn_cb->ksnr_addr.ss_family == AF_INET) {
347                                 struct sockaddr_in *sa =
348                                         (void *)&conn_cb->ksnr_addr;
349
350                                 rc = choose_ipv4_src(myip,
351                                                      conn_cb->ksnr_myiface,
352                                                      ntohl(sa->sin_addr.s_addr),
353                                                      ni->ni_net_ns);
354                                 *peer_ip = ntohl(sa->sin_addr.s_addr);
355                                 *port = ntohs(sa->sin_port);
356                         } else {
357                                 *myip = 0xFFFFFFFF;
358                                 *peer_ip = 0xFFFFFFFF;
359                                 *port = 0;
360                                 rc = -ENOTSUPP;
361                         }
362                         *conn_count = conn_cb->ksnr_conn_count;
363                         *share_count = 1;
364                         goto out;
365                 }
366         }
367 out:
368         read_unlock(&ksocknal_data.ksnd_global_lock);
369         return rc;
370 }
371
372 static unsigned int
373 ksocknal_get_conn_count_by_type(struct ksock_conn_cb *conn_cb,
374                                 int type)
375 {
376         unsigned int count = 0;
377
378         switch (type) {
379         case SOCKLND_CONN_CONTROL:
380                 count = conn_cb->ksnr_ctrl_conn_count;
381                 break;
382         case SOCKLND_CONN_BULK_IN:
383                 count = conn_cb->ksnr_blki_conn_count;
384                 break;
385         case SOCKLND_CONN_BULK_OUT:
386                 count = conn_cb->ksnr_blko_conn_count;
387                 break;
388         case SOCKLND_CONN_ANY:
389                 count = conn_cb->ksnr_conn_count;
390                 break;
391         default:
392                 LBUG();
393                 break;
394         }
395
396         return count;
397 }
398
399 static unsigned int
400 ksocknal_get_conns_per_peer(struct ksock_peer_ni *peer_ni)
401 {
402         struct lnet_ni *ni = peer_ni->ksnp_ni;
403         struct lnet_ioctl_config_socklnd_tunables *tunables;
404
405         LASSERT(ni);
406
407         tunables = &ni->ni_lnd_tunables.lnd_tun_u.lnd_sock;
408
409         return tunables->lnd_conns_per_peer;
410 }
411
412 static void
413 ksocknal_incr_conn_count(struct ksock_conn_cb *conn_cb,
414                          int type)
415 {
416         conn_cb->ksnr_conn_count++;
417
418         /* check if all connections of the given type got created */
419         switch (type) {
420         case SOCKLND_CONN_CONTROL:
421                 conn_cb->ksnr_ctrl_conn_count++;
422                 /* there's a single control connection per peer,
423                  * two in case of loopback
424                  */
425                 conn_cb->ksnr_connected |= BIT(type);
426                 break;
427         case SOCKLND_CONN_BULK_IN:
428                 conn_cb->ksnr_blki_conn_count++;
429                 if (conn_cb->ksnr_blki_conn_count >= conn_cb->ksnr_max_conns)
430                         conn_cb->ksnr_connected |= BIT(type);
431                 break;
432         case SOCKLND_CONN_BULK_OUT:
433                 conn_cb->ksnr_blko_conn_count++;
434                 if (conn_cb->ksnr_blko_conn_count >= conn_cb->ksnr_max_conns)
435                         conn_cb->ksnr_connected |= BIT(type);
436                 break;
437         case SOCKLND_CONN_ANY:
438                 if (conn_cb->ksnr_conn_count >= conn_cb->ksnr_max_conns)
439                         conn_cb->ksnr_connected |= BIT(type);
440                 break;
441         default:
442                 LBUG();
443                 break;
444         }
445
446         CDEBUG(D_NET, "Add conn type %d, ksnr_connected %x ksnr_max_conns %d\n",
447                type, conn_cb->ksnr_connected, conn_cb->ksnr_max_conns);
448 }
449
450
451 static void
452 ksocknal_decr_conn_count(struct ksock_conn_cb *conn_cb,
453                          int type)
454 {
455         conn_cb->ksnr_conn_count--;
456
457         /* check if all connections of the given type got created */
458         switch (type) {
459         case SOCKLND_CONN_CONTROL:
460                 conn_cb->ksnr_ctrl_conn_count--;
461                 /* there's a single control connection per peer,
462                  * two in case of loopback
463                  */
464                 if (conn_cb->ksnr_ctrl_conn_count == 0)
465                         conn_cb->ksnr_connected &= ~BIT(type);
466                 break;
467         case SOCKLND_CONN_BULK_IN:
468                 conn_cb->ksnr_blki_conn_count--;
469                 if (conn_cb->ksnr_blki_conn_count < conn_cb->ksnr_max_conns)
470                         conn_cb->ksnr_connected &= ~BIT(type);
471                 break;
472         case SOCKLND_CONN_BULK_OUT:
473                 conn_cb->ksnr_blko_conn_count--;
474                 if (conn_cb->ksnr_blko_conn_count < conn_cb->ksnr_max_conns)
475                         conn_cb->ksnr_connected &= ~BIT(type);
476                 break;
477         case SOCKLND_CONN_ANY:
478                 if (conn_cb->ksnr_conn_count < conn_cb->ksnr_max_conns)
479                         conn_cb->ksnr_connected &= ~BIT(type);
480                 break;
481         default:
482                 LBUG();
483                 break;
484         }
485
486         CDEBUG(D_NET, "Del conn type %d, ksnr_connected %x ksnr_max_conns %d\n",
487                type, conn_cb->ksnr_connected, conn_cb->ksnr_max_conns);
488 }
489
490 static void
491 ksocknal_associate_cb_conn_locked(struct ksock_conn_cb *conn_cb,
492                                   struct ksock_conn *conn)
493 {
494         struct ksock_peer_ni *peer_ni = conn_cb->ksnr_peer;
495         int type = conn->ksnc_type;
496         struct ksock_interface *iface;
497         int conn_iface;
498
499         conn_iface = ksocknal_ip2index((struct sockaddr *)&conn->ksnc_myaddr,
500                                        peer_ni->ksnp_ni);
501         conn->ksnc_conn_cb = conn_cb;
502         ksocknal_conn_cb_addref(conn_cb);
503
504         if (conn_cb->ksnr_myiface != conn_iface) {
505                 if (conn_cb->ksnr_myiface < 0) {
506                         /* route wasn't bound locally yet (the initial route) */
507                         CDEBUG(D_NET, "Binding %s %pIS to interface %d\n",
508                                libcfs_idstr(&peer_ni->ksnp_id),
509                                &conn_cb->ksnr_addr,
510                                conn_iface);
511                 } else {
512                         CDEBUG(D_NET,
513                                "Rebinding %s %pIS from interface %d to %d\n",
514                                libcfs_idstr(&peer_ni->ksnp_id),
515                                &conn_cb->ksnr_addr,
516                                conn_cb->ksnr_myiface,
517                                conn_iface);
518
519                         iface = ksocknal_index2iface(peer_ni->ksnp_ni,
520                                                      conn_cb->ksnr_myiface);
521                         if (iface)
522                                 iface->ksni_nroutes--;
523                 }
524                 conn_cb->ksnr_myiface = conn_iface;
525                 iface = ksocknal_index2iface(peer_ni->ksnp_ni,
526                                              conn_cb->ksnr_myiface);
527                 if (iface)
528                         iface->ksni_nroutes++;
529         }
530
531         ksocknal_incr_conn_count(conn_cb, type);
532
533         /* Successful connection => further attempts can
534          * proceed immediately
535          */
536         conn_cb->ksnr_retry_interval = 0;
537 }
538
539 static void
540 ksocknal_add_conn_cb_locked(struct ksock_peer_ni *peer_ni,
541                             struct ksock_conn_cb *conn_cb)
542 {
543         struct ksock_conn *conn;
544         struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
545
546         LASSERT(!peer_ni->ksnp_closing);
547         LASSERT(!conn_cb->ksnr_peer);
548         LASSERT(!conn_cb->ksnr_scheduled);
549         LASSERT(!conn_cb->ksnr_connecting);
550         LASSERT(conn_cb->ksnr_connected == 0);
551
552         conn_cb->ksnr_peer = peer_ni;
553         ksocknal_peer_addref(peer_ni);
554
555         /* set the conn_cb's interface to the current net's interface */
556         conn_cb->ksnr_myiface = net->ksnn_interface.ksni_index;
557         net->ksnn_interface.ksni_nroutes++;
558
559         /* peer_ni's route list takes over my ref on 'route' */
560         peer_ni->ksnp_conn_cb = conn_cb;
561
562         list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
563                 if (!rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
564                                   (struct sockaddr *)&conn_cb->ksnr_addr))
565                         continue;
566
567                 ksocknal_associate_cb_conn_locked(conn_cb, conn);
568                 /* keep going (typed conns) */
569         }
570 }
571
572 static void
573 ksocknal_del_conn_cb_locked(struct ksock_conn_cb *conn_cb)
574 {
575         struct ksock_peer_ni *peer_ni = conn_cb->ksnr_peer;
576         struct ksock_interface *iface;
577         struct ksock_conn *conn;
578         struct ksock_conn *cnxt;
579
580         LASSERT(!conn_cb->ksnr_deleted);
581
582         /* Close associated conns */
583         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns, ksnc_list) {
584                 if (conn->ksnc_conn_cb != conn_cb)
585                         continue;
586
587                 ksocknal_close_conn_locked(conn, 0);
588         }
589
590         if (conn_cb->ksnr_myiface >= 0) {
591                 iface = ksocknal_index2iface(peer_ni->ksnp_ni,
592                                              conn_cb->ksnr_myiface);
593                 if (iface)
594                         iface->ksni_nroutes--;
595         }
596
597         conn_cb->ksnr_deleted = 1;
598         ksocknal_conn_cb_decref(conn_cb);               /* drop peer_ni's ref */
599         peer_ni->ksnp_conn_cb = NULL;
600
601         if (list_empty(&peer_ni->ksnp_conns)) {
602                 /* I've just removed the last route to a peer_ni with no active
603                  * connections
604                  */
605                 ksocknal_unlink_peer_locked(peer_ni);
606         }
607 }
608
609 int
610 ksocknal_add_peer(struct lnet_ni *ni, struct lnet_processid *id,
611                   struct sockaddr *addr)
612 {
613         struct ksock_peer_ni *peer_ni;
614         struct ksock_peer_ni *peer2;
615         struct ksock_conn_cb *conn_cb;
616
617         if (LNET_NID_IS_ANY(&id->nid) ||
618             id->pid == LNET_PID_ANY)
619                 return (-EINVAL);
620
621         /* Have a brand new peer_ni ready... */
622         peer_ni = ksocknal_create_peer(ni, id);
623         if (IS_ERR(peer_ni))
624                 return PTR_ERR(peer_ni);
625
626         conn_cb = ksocknal_create_conn_cb(addr);
627         if (!conn_cb) {
628                 ksocknal_peer_decref(peer_ni);
629                 return -ENOMEM;
630         }
631
632         write_lock_bh(&ksocknal_data.ksnd_global_lock);
633
634         /* always called with a ref on ni, so shutdown can't have started */
635         LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers)
636                 >= 0);
637
638         peer2 = ksocknal_find_peer_locked(ni, id);
639         if (peer2 != NULL) {
640                 ksocknal_peer_decref(peer_ni);
641                 peer_ni = peer2;
642         } else {
643                 /* peer_ni table takes my ref on peer_ni */
644                 hash_add(ksocknal_data.ksnd_peers, &peer_ni->ksnp_list,
645                          nidhash(&id->nid));
646         }
647
648         if (peer_ni->ksnp_conn_cb) {
649                 ksocknal_conn_cb_decref(conn_cb);
650         } else {
651                 ksocknal_add_conn_cb_locked(peer_ni, conn_cb);
652                 /* Remember conns_per_peer setting at the time
653                  * of connection initiation. It will define the
654                  * max number of conns per type for this conn_cb
655                  * while it's in use.
656                  */
657                 conn_cb->ksnr_max_conns = ksocknal_get_conns_per_peer(peer_ni);
658         }
659
660         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
661
662         return 0;
663 }
664
665 static void
666 ksocknal_del_peer_locked(struct ksock_peer_ni *peer_ni)
667 {
668         struct ksock_conn *conn;
669         struct ksock_conn *cnxt;
670         struct ksock_conn_cb *conn_cb;
671
672         LASSERT(!peer_ni->ksnp_closing);
673
674         /* Extra ref prevents peer_ni disappearing until I'm done with it */
675         ksocknal_peer_addref(peer_ni);
676         conn_cb = peer_ni->ksnp_conn_cb;
677         if (conn_cb)
678                 ksocknal_del_conn_cb_locked(conn_cb);
679
680         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns,
681                                  ksnc_list)
682                 ksocknal_close_conn_locked(conn, 0);
683
684         ksocknal_peer_decref(peer_ni);
685         /* NB peer_ni unlinks itself when last conn/conn_cb is removed */
686 }
687
688 static int
689 ksocknal_del_peer(struct lnet_ni *ni, struct lnet_processid *id)
690 {
691         LIST_HEAD(zombies);
692         struct hlist_node *pnxt;
693         struct ksock_peer_ni *peer_ni;
694         int lo;
695         int hi;
696         int i;
697         int rc = -ENOENT;
698
699         write_lock_bh(&ksocknal_data.ksnd_global_lock);
700
701         if (id && !LNET_NID_IS_ANY(&id->nid)) {
702                 lo = hash_min(nidhash(&id->nid),
703                               HASH_BITS(ksocknal_data.ksnd_peers));
704                 hi = lo;
705         } else {
706                 lo = 0;
707                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
708         }
709
710         for (i = lo; i <= hi; i++) {
711                 hlist_for_each_entry_safe(peer_ni, pnxt,
712                                           &ksocknal_data.ksnd_peers[i],
713                                           ksnp_list) {
714                         if (peer_ni->ksnp_ni != ni)
715                                 continue;
716
717                         if (!((!id || LNET_NID_IS_ANY(&id->nid) ||
718                                nid_same(&peer_ni->ksnp_id.nid, &id->nid)) &&
719                               (!id || id->pid == LNET_PID_ANY ||
720                                peer_ni->ksnp_id.pid == id->pid)))
721                                 continue;
722
723                         ksocknal_peer_addref(peer_ni);  /* a ref for me... */
724
725                         ksocknal_del_peer_locked(peer_ni);
726
727                         if (peer_ni->ksnp_closing &&
728                             !list_empty(&peer_ni->ksnp_tx_queue)) {
729                                 LASSERT(list_empty(&peer_ni->ksnp_conns));
730                                 LASSERT(peer_ni->ksnp_conn_cb == NULL);
731
732                                 list_splice_init(&peer_ni->ksnp_tx_queue,
733                                                  &zombies);
734                         }
735
736                         ksocknal_peer_decref(peer_ni);  /* ...till here */
737
738                         rc = 0;                         /* matched! */
739                 }
740         }
741
742         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
743
744         ksocknal_txlist_done(ni, &zombies, -ENETDOWN);
745
746         return rc;
747 }
748
749 static struct ksock_conn *
750 ksocknal_get_conn_by_idx(struct lnet_ni *ni, int index)
751 {
752         struct ksock_peer_ni *peer_ni;
753         struct ksock_conn *conn;
754         int i;
755
756         read_lock(&ksocknal_data.ksnd_global_lock);
757
758         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
759                 LASSERT(!peer_ni->ksnp_closing);
760
761                 if (peer_ni->ksnp_ni != ni)
762                         continue;
763
764                 list_for_each_entry(conn, &peer_ni->ksnp_conns,
765                                     ksnc_list) {
766                         if (index-- > 0)
767                                 continue;
768
769                         ksocknal_conn_addref(conn);
770                         read_unlock(&ksocknal_data.ksnd_global_lock);
771                         return conn;
772                 }
773         }
774
775         read_unlock(&ksocknal_data.ksnd_global_lock);
776         return NULL;
777 }
778
779 static struct ksock_sched *
780 ksocknal_choose_scheduler_locked(unsigned int cpt)
781 {
782         struct ksock_sched *sched = ksocknal_data.ksnd_schedulers[cpt];
783         int i;
784
785         if (sched->kss_nthreads == 0) {
786                 cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
787                         if (sched->kss_nthreads > 0) {
788                                 CDEBUG(D_NET, "scheduler[%d] has no threads. selected scheduler[%d]\n",
789                                        cpt, sched->kss_cpt);
790                                 return sched;
791                         }
792                 }
793                 return NULL;
794         }
795
796         return sched;
797 }
798
799 int
800 ksocknal_accept(struct lnet_ni *ni, struct socket *sock)
801 {
802         struct ksock_connreq *cr;
803         int rc;
804         struct sockaddr_storage peer;
805
806         rc = lnet_sock_getaddr(sock, true, &peer);
807         if (rc != 0) {
808                 CERROR("Can't determine new connection's address\n");
809                 return rc;
810         }
811
812         LIBCFS_ALLOC(cr, sizeof(*cr));
813         if (cr == NULL) {
814                 LCONSOLE_ERROR_MSG(0x12f,
815                                    "Dropping connection request from %pIS: memory exhausted\n",
816                                    &peer);
817                 return -ENOMEM;
818         }
819
820         lnet_ni_addref(ni);
821         cr->ksncr_ni   = ni;
822         cr->ksncr_sock = sock;
823
824         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
825
826         list_add_tail(&cr->ksncr_list, &ksocknal_data.ksnd_connd_connreqs);
827         wake_up(&ksocknal_data.ksnd_connd_waitq);
828
829         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
830         return 0;
831 }
832
833 static int
834 ksocknal_connecting(struct ksock_conn_cb *conn_cb, struct sockaddr *sa)
835 {
836         if (conn_cb &&
837             rpc_cmp_addr((struct sockaddr *)&conn_cb->ksnr_addr, sa))
838                 return conn_cb->ksnr_connecting;
839         return 0;
840 }
841
842 int
843 ksocknal_create_conn(struct lnet_ni *ni, struct ksock_conn_cb *conn_cb,
844                      struct socket *sock, int type)
845 {
846         rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
847         LIST_HEAD(zombies);
848         struct lnet_processid peerid;
849         u64 incarnation;
850         struct ksock_conn *conn;
851         struct ksock_conn *conn2;
852         struct ksock_peer_ni *peer_ni = NULL;
853         struct ksock_peer_ni *peer2;
854         struct ksock_sched *sched;
855         struct ksock_hello_msg *hello;
856         int cpt;
857         struct ksock_tx *tx;
858         struct ksock_tx *txtmp;
859         int rc;
860         int rc2;
861         int active;
862         int num_dup = 0;
863         char *warn = NULL;
864
865         active = (conn_cb != NULL);
866
867         LASSERT(active == (type != SOCKLND_CONN_NONE));
868
869         LIBCFS_ALLOC(conn, sizeof(*conn));
870         if (conn == NULL) {
871                 rc = -ENOMEM;
872                 goto failed_0;
873         }
874
875         conn->ksnc_peer = NULL;
876         conn->ksnc_conn_cb = NULL;
877         conn->ksnc_sock = sock;
878         /* 2 ref, 1 for conn, another extra ref prevents socket
879          * being closed before establishment of connection */
880         refcount_set(&conn->ksnc_sock_refcount, 2);
881         conn->ksnc_type = type;
882         ksocknal_lib_save_callback(sock, conn);
883         refcount_set(&conn->ksnc_conn_refcount, 1); /* 1 ref for me */
884
885         conn->ksnc_rx_ready = 0;
886         conn->ksnc_rx_scheduled = 0;
887
888         INIT_LIST_HEAD(&conn->ksnc_tx_queue);
889         conn->ksnc_tx_ready = 0;
890         conn->ksnc_tx_scheduled = 0;
891         conn->ksnc_tx_carrier = NULL;
892         atomic_set (&conn->ksnc_tx_nob, 0);
893
894         LIBCFS_ALLOC(hello, offsetof(struct ksock_hello_msg,
895                                      kshm_ips[LNET_INTERFACES_NUM]));
896         if (hello == NULL) {
897                 rc = -ENOMEM;
898                 goto failed_1;
899         }
900
901         /* stash conn's local and remote addrs */
902         rc = ksocknal_lib_get_conn_addrs(conn);
903         if (rc != 0)
904                 goto failed_1;
905
906         /* Find out/confirm peer_ni's NID and connection type and get the
907          * vector of interfaces she's willing to let me connect to.
908          * Passive connections use the listener timeout since the peer_ni sends
909          * eagerly
910          */
911
912         if (active) {
913                 peer_ni = conn_cb->ksnr_peer;
914                 LASSERT(ni == peer_ni->ksnp_ni);
915
916                 /* Active connection sends HELLO eagerly */
917                 hello->kshm_nips = 0;
918                 peerid = peer_ni->ksnp_id;
919
920                 write_lock_bh(global_lock);
921                 conn->ksnc_proto = peer_ni->ksnp_proto;
922                 write_unlock_bh(global_lock);
923
924                 if (conn->ksnc_proto == NULL) {
925                         conn->ksnc_proto = &ksocknal_protocol_v3x;
926 #if SOCKNAL_VERSION_DEBUG
927                         if (*ksocknal_tunables.ksnd_protocol == 2)
928                                 conn->ksnc_proto = &ksocknal_protocol_v2x;
929                         else if (*ksocknal_tunables.ksnd_protocol == 1)
930                                 conn->ksnc_proto = &ksocknal_protocol_v1x;
931 #endif
932                 }
933
934                 rc = ksocknal_send_hello(ni, conn, &peerid.nid, hello);
935                 if (rc != 0)
936                         goto failed_1;
937         } else {
938                 peerid.nid = LNET_ANY_NID;
939                 peerid.pid = LNET_PID_ANY;
940
941                 /* Passive, get protocol from peer_ni */
942                 conn->ksnc_proto = NULL;
943         }
944
945         rc = ksocknal_recv_hello(ni, conn, hello, &peerid, &incarnation);
946         if (rc < 0)
947                 goto failed_1;
948
949         LASSERT(rc == 0 || active);
950         LASSERT(conn->ksnc_proto != NULL);
951         LASSERT(!LNET_NID_IS_ANY(&peerid.nid));
952
953         cpt = lnet_nid2cpt(&peerid.nid, ni);
954
955         if (active) {
956                 ksocknal_peer_addref(peer_ni);
957                 write_lock_bh(global_lock);
958         } else {
959                 peer_ni = ksocknal_create_peer(ni, &peerid);
960                 if (IS_ERR(peer_ni)) {
961                         rc = PTR_ERR(peer_ni);
962                         goto failed_1;
963                 }
964
965                 write_lock_bh(global_lock);
966
967                 /* called with a ref on ni, so shutdown can't have started */
968                 LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers) >= 0);
969
970                 peer2 = ksocknal_find_peer_locked(ni, &peerid);
971                 if (peer2 == NULL) {
972                         /* NB this puts an "empty" peer_ni in the peer_ni
973                          * table (which takes my ref) */
974                         hash_add(ksocknal_data.ksnd_peers,
975                                  &peer_ni->ksnp_list, nidhash(&peerid.nid));
976                 } else {
977                         ksocknal_peer_decref(peer_ni);
978                         peer_ni = peer2;
979                 }
980
981                 /* +1 ref for me */
982                 ksocknal_peer_addref(peer_ni);
983                 peer_ni->ksnp_accepting++;
984
985                 /* Am I already connecting to this guy?  Resolve in
986                  * favour of higher NID...
987                  */
988                 if (memcmp(&peerid.nid, &ni->ni_nid, sizeof(peerid.nid)) < 0 &&
989                     ksocknal_connecting(peer_ni->ksnp_conn_cb,
990                                         ((struct sockaddr *) &conn->ksnc_peeraddr))) {
991                         rc = EALREADY;
992                         warn = "connection race resolution";
993                         goto failed_2;
994                 }
995         }
996
997         if (peer_ni->ksnp_closing ||
998             (active && conn_cb->ksnr_deleted)) {
999                 /* peer_ni/conn_cb got closed under me */
1000                 rc = -ESTALE;
1001                 warn = "peer_ni/conn_cb removed";
1002                 goto failed_2;
1003         }
1004
1005         if (peer_ni->ksnp_proto == NULL) {
1006                 /* Never connected before.
1007                  * NB recv_hello may have returned EPROTO to signal my peer_ni
1008                  * wants a different protocol than the one I asked for.
1009                  */
1010                 LASSERT(list_empty(&peer_ni->ksnp_conns));
1011
1012                 peer_ni->ksnp_proto = conn->ksnc_proto;
1013                 peer_ni->ksnp_incarnation = incarnation;
1014         }
1015
1016         if (peer_ni->ksnp_proto != conn->ksnc_proto ||
1017             peer_ni->ksnp_incarnation != incarnation) {
1018                 /* peer_ni rebooted or I've got the wrong protocol version */
1019                 ksocknal_close_peer_conns_locked(peer_ni, NULL, 0);
1020
1021                 peer_ni->ksnp_proto = NULL;
1022                 rc = ESTALE;
1023                 warn = peer_ni->ksnp_incarnation != incarnation ?
1024                         "peer_ni rebooted" :
1025                         "wrong proto version";
1026                 goto failed_2;
1027         }
1028
1029         switch (rc) {
1030         default:
1031                 LBUG();
1032         case 0:
1033                 break;
1034         case EALREADY:
1035                 warn = "lost conn race";
1036                 goto failed_2;
1037         case EPROTO:
1038                 warn = "retry with different protocol version";
1039                 goto failed_2;
1040         }
1041
1042         /* Refuse to duplicate an existing connection, unless this is a
1043          * loopback connection */
1044         if (!rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
1045                           (struct sockaddr *)&conn->ksnc_myaddr)) {
1046                 list_for_each_entry(conn2, &peer_ni->ksnp_conns, ksnc_list) {
1047                         if (!rpc_cmp_addr(
1048                                     (struct sockaddr *)&conn2->ksnc_peeraddr,
1049                                     (struct sockaddr *)&conn->ksnc_peeraddr) ||
1050                             !rpc_cmp_addr(
1051                                     (struct sockaddr *)&conn2->ksnc_myaddr,
1052                                     (struct sockaddr *)&conn->ksnc_myaddr) ||
1053                             conn2->ksnc_type != conn->ksnc_type)
1054                                 continue;
1055
1056                         num_dup++;
1057                         /* If max conns per type is not registered in conn_cb
1058                          * as ksnr_max_conns, use ni's conns_per_peer
1059                          */
1060                         if ((peer_ni->ksnp_conn_cb &&
1061                             num_dup < peer_ni->ksnp_conn_cb->ksnr_max_conns) ||
1062                             (!peer_ni->ksnp_conn_cb &&
1063                             num_dup < ksocknal_get_conns_per_peer(peer_ni)))
1064                                 continue;
1065
1066                         /* Reply on a passive connection attempt so the peer_ni
1067                          * realises we're connected.
1068                          */
1069                         LASSERT(rc == 0);
1070                         if (!active)
1071                                 rc = EALREADY;
1072
1073                         warn = "duplicate";
1074                         goto failed_2;
1075                 }
1076         }
1077         /* If the connection created by this route didn't bind to the IP
1078          * address the route connected to, the connection/route matching
1079          * code below probably isn't going to work.
1080          */
1081         if (active &&
1082             !rpc_cmp_addr((struct sockaddr *)&conn_cb->ksnr_addr,
1083                           (struct sockaddr *)&conn->ksnc_peeraddr)) {
1084                 CERROR("Route %s %pIS connected to %pIS\n",
1085                        libcfs_idstr(&peer_ni->ksnp_id),
1086                        &conn_cb->ksnr_addr,
1087                        &conn->ksnc_peeraddr);
1088         }
1089
1090         /* Search for a conn_cb corresponding to the new connection and
1091          * create an association.  This allows incoming connections created
1092          * by conn_cbs in my peer_ni to match my own conn_cb entries so I don't
1093          * continually create duplicate conn_cbs.
1094          */
1095         conn_cb = peer_ni->ksnp_conn_cb;
1096
1097         if (conn_cb && rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
1098                                     (struct sockaddr *)&conn_cb->ksnr_addr))
1099                 ksocknal_associate_cb_conn_locked(conn_cb, conn);
1100
1101         conn->ksnc_peer = peer_ni;                 /* conn takes my ref on peer_ni */
1102         peer_ni->ksnp_last_alive = ktime_get_seconds();
1103         peer_ni->ksnp_send_keepalive = 0;
1104         peer_ni->ksnp_error = 0;
1105
1106         sched = ksocknal_choose_scheduler_locked(cpt);
1107         if (!sched) {
1108                 CERROR("no schedulers available. node is unhealthy\n");
1109                 goto failed_2;
1110         }
1111         /*
1112          * The cpt might have changed if we ended up selecting a non cpt
1113          * native scheduler. So use the scheduler's cpt instead.
1114          */
1115         cpt = sched->kss_cpt;
1116         sched->kss_nconns++;
1117         conn->ksnc_scheduler = sched;
1118
1119         conn->ksnc_tx_last_post = ktime_get_seconds();
1120         /* Set the deadline for the outgoing HELLO to drain */
1121         conn->ksnc_tx_bufnob = sock->sk->sk_wmem_queued;
1122         conn->ksnc_tx_deadline = ktime_get_seconds() +
1123                                  ksocknal_timeout();
1124         smp_mb();   /* order with adding to peer_ni's conn list */
1125
1126         list_add(&conn->ksnc_list, &peer_ni->ksnp_conns);
1127         ksocknal_conn_addref(conn);
1128
1129         ksocknal_new_packet(conn, 0);
1130
1131         conn->ksnc_zc_capable = ksocknal_lib_zc_capable(conn);
1132
1133         /* Take packets blocking for this connection. */
1134         list_for_each_entry_safe(tx, txtmp, &peer_ni->ksnp_tx_queue, tx_list) {
1135                 if (conn->ksnc_proto->pro_match_tx(conn, tx, tx->tx_nonblk) ==
1136                     SOCKNAL_MATCH_NO)
1137                         continue;
1138
1139                 list_del(&tx->tx_list);
1140                 ksocknal_queue_tx_locked(tx, conn);
1141         }
1142
1143         write_unlock_bh(global_lock);
1144         /* We've now got a new connection.  Any errors from here on are just
1145          * like "normal" comms errors and we close the connection normally.
1146          * NB (a) we still have to send the reply HELLO for passive
1147          *        connections,
1148          *    (b) normal I/O on the conn is blocked until I setup and call the
1149          *        socket callbacks.
1150          */
1151
1152         CDEBUG(D_NET, "New conn %s p %d.x %pIS -> %pISp"
1153                " incarnation:%lld sched[%d]\n",
1154                libcfs_idstr(&peerid), conn->ksnc_proto->pro_version,
1155                &conn->ksnc_myaddr, &conn->ksnc_peeraddr,
1156                incarnation, cpt);
1157
1158         if (!active) {
1159                 hello->kshm_nips = 0;
1160                 rc = ksocknal_send_hello(ni, conn, &peerid.nid, hello);
1161         }
1162
1163         LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
1164                                     kshm_ips[LNET_INTERFACES_NUM]));
1165
1166         /* setup the socket AFTER I've received hello (it disables
1167          * SO_LINGER).  I might call back to the acceptor who may want
1168          * to send a protocol version response and then close the
1169          * socket; this ensures the socket only tears down after the
1170          * response has been sent.
1171          */
1172         if (rc == 0)
1173                 rc = ksocknal_lib_setup_sock(sock);
1174
1175         write_lock_bh(global_lock);
1176
1177         /* NB my callbacks block while I hold ksnd_global_lock */
1178         ksocknal_lib_set_callback(sock, conn);
1179
1180         if (!active)
1181                 peer_ni->ksnp_accepting--;
1182
1183         write_unlock_bh(global_lock);
1184
1185         if (rc != 0) {
1186                 write_lock_bh(global_lock);
1187                 if (!conn->ksnc_closing) {
1188                         /* could be closed by another thread */
1189                         ksocknal_close_conn_locked(conn, rc);
1190                 }
1191                 write_unlock_bh(global_lock);
1192         } else if (ksocknal_connsock_addref(conn) == 0) {
1193                 /* Allow I/O to proceed. */
1194                 ksocknal_read_callback(conn);
1195                 ksocknal_write_callback(conn);
1196                 ksocknal_connsock_decref(conn);
1197         }
1198
1199         ksocknal_connsock_decref(conn);
1200         ksocknal_conn_decref(conn);
1201         return rc;
1202
1203 failed_2:
1204
1205         if (!peer_ni->ksnp_closing &&
1206             list_empty(&peer_ni->ksnp_conns) &&
1207             peer_ni->ksnp_conn_cb == NULL) {
1208                 list_splice_init(&peer_ni->ksnp_tx_queue, &zombies);
1209                 ksocknal_unlink_peer_locked(peer_ni);
1210         }
1211
1212         write_unlock_bh(global_lock);
1213
1214         if (warn != NULL) {
1215                 if (rc < 0)
1216                         CERROR("Not creating conn %s type %d: %s\n",
1217                                libcfs_idstr(&peerid), conn->ksnc_type, warn);
1218                 else
1219                         CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",
1220                                libcfs_idstr(&peerid), conn->ksnc_type, warn);
1221         }
1222
1223         if (!active) {
1224                 if (rc > 0) {
1225                         /* Request retry by replying with CONN_NONE
1226                          * ksnc_proto has been set already
1227                          */
1228                         conn->ksnc_type = SOCKLND_CONN_NONE;
1229                         hello->kshm_nips = 0;
1230                         ksocknal_send_hello(ni, conn, &peerid.nid, hello);
1231                 }
1232
1233                 write_lock_bh(global_lock);
1234                 peer_ni->ksnp_accepting--;
1235                 write_unlock_bh(global_lock);
1236         }
1237
1238         /*
1239          * If we get here without an error code, just use -EALREADY.
1240          * Depending on how we got here, the error may be positive
1241          * or negative. Normalize the value for ksocknal_txlist_done().
1242          */
1243         rc2 = (rc == 0 ? -EALREADY : (rc > 0 ? -rc : rc));
1244         ksocknal_txlist_done(ni, &zombies, rc2);
1245         ksocknal_peer_decref(peer_ni);
1246
1247 failed_1:
1248         if (hello != NULL)
1249                 LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
1250                                             kshm_ips[LNET_INTERFACES_NUM]));
1251
1252         LIBCFS_FREE(conn, sizeof(*conn));
1253
1254 failed_0:
1255         sock_release(sock);
1256
1257         return rc;
1258 }
1259
1260 void
1261 ksocknal_close_conn_locked(struct ksock_conn *conn, int error)
1262 {
1263         /* This just does the immmediate housekeeping, and queues the
1264          * connection for the reaper to terminate.
1265          * Caller holds ksnd_global_lock exclusively in irq context */
1266         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1267         struct ksock_conn_cb *conn_cb;
1268         struct ksock_conn *conn2;
1269         int conn_count;
1270         int duplicate_count = 0;
1271
1272         LASSERT(peer_ni->ksnp_error == 0);
1273         LASSERT(!conn->ksnc_closing);
1274         conn->ksnc_closing = 1;
1275
1276         /* ksnd_deathrow_conns takes over peer_ni's ref */
1277         list_del(&conn->ksnc_list);
1278
1279         conn_cb = conn->ksnc_conn_cb;
1280         if (conn_cb != NULL) {
1281                 /* dissociate conn from cb... */
1282                 LASSERT(!conn_cb->ksnr_deleted);
1283
1284                 conn_count = ksocknal_get_conn_count_by_type(conn_cb,
1285                                                              conn->ksnc_type);
1286                 /* connected bit is set only if all connections
1287                  * of the given type got created
1288                  */
1289                 if (conn_count == conn_cb->ksnr_max_conns)
1290                         LASSERT((conn_cb->ksnr_connected &
1291                                 BIT(conn->ksnc_type)) != 0);
1292
1293                 if (conn_count == 1) {
1294                         list_for_each_entry(conn2, &peer_ni->ksnp_conns,
1295                                             ksnc_list) {
1296                                 if (conn2->ksnc_conn_cb == conn_cb &&
1297                                     conn2->ksnc_type == conn->ksnc_type)
1298                                         duplicate_count += 1;
1299                         }
1300                         if (duplicate_count > 0)
1301                                 CERROR("Found %d duplicate conns type %d\n",
1302                                        duplicate_count,
1303                                        conn->ksnc_type);
1304                 }
1305                 ksocknal_decr_conn_count(conn_cb, conn->ksnc_type);
1306
1307                 conn->ksnc_conn_cb = NULL;
1308
1309                 /* drop conn's ref on conn_cb */
1310                 ksocknal_conn_cb_decref(conn_cb);
1311         }
1312
1313         if (list_empty(&peer_ni->ksnp_conns)) {
1314                 /* No more connections to this peer_ni */
1315
1316                 if (!list_empty(&peer_ni->ksnp_tx_queue)) {
1317                         struct ksock_tx *tx;
1318
1319                         LASSERT(conn->ksnc_proto == &ksocknal_protocol_v3x);
1320
1321                         /* throw them to the last connection...,
1322                          * these TXs will be send to /dev/null by scheduler */
1323                         list_for_each_entry(tx, &peer_ni->ksnp_tx_queue,
1324                                             tx_list)
1325                                 ksocknal_tx_prep(conn, tx);
1326
1327                         spin_lock_bh(&conn->ksnc_scheduler->kss_lock);
1328                         list_splice_init(&peer_ni->ksnp_tx_queue,
1329                                          &conn->ksnc_tx_queue);
1330                         spin_unlock_bh(&conn->ksnc_scheduler->kss_lock);
1331                 }
1332
1333                 /* renegotiate protocol version */
1334                 peer_ni->ksnp_proto = NULL;
1335                 /* stash last conn close reason */
1336                 peer_ni->ksnp_error = error;
1337
1338                 if (peer_ni->ksnp_conn_cb == NULL) {
1339                         /* I've just closed last conn belonging to a
1340                          * peer_ni with no connections to it
1341                          */
1342                         ksocknal_unlink_peer_locked(peer_ni);
1343                 }
1344         }
1345
1346         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1347
1348         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
1349         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1350
1351         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1352 }
1353
1354 void
1355 ksocknal_peer_failed(struct ksock_peer_ni *peer_ni)
1356 {
1357         bool notify = false;
1358         time64_t last_alive = 0;
1359
1360         /* There has been a connection failure or comms error; but I'll only
1361          * tell LNET I think the peer_ni is dead if it's to another kernel and
1362          * there are no connections or connection attempts in existence. */
1363
1364         read_lock(&ksocknal_data.ksnd_global_lock);
1365
1366         if ((peer_ni->ksnp_id.pid & LNET_PID_USERFLAG) == 0 &&
1367              list_empty(&peer_ni->ksnp_conns) &&
1368              peer_ni->ksnp_accepting == 0 &&
1369              !ksocknal_find_connecting_conn_cb_locked(peer_ni)) {
1370                 notify = true;
1371                 last_alive = peer_ni->ksnp_last_alive;
1372         }
1373
1374         read_unlock(&ksocknal_data.ksnd_global_lock);
1375
1376         if (notify)
1377                 lnet_notify(peer_ni->ksnp_ni,
1378                             lnet_nid_to_nid4(&peer_ni->ksnp_id.nid),
1379                             false, false, last_alive);
1380 }
1381
1382 void
1383 ksocknal_finalize_zcreq(struct ksock_conn *conn)
1384 {
1385         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1386         struct ksock_tx *tx;
1387         struct ksock_tx *tmp;
1388         LIST_HEAD(zlist);
1389
1390         /* NB safe to finalize TXs because closing of socket will
1391          * abort all buffered data */
1392         LASSERT(conn->ksnc_sock == NULL);
1393
1394         spin_lock(&peer_ni->ksnp_lock);
1395
1396         list_for_each_entry_safe(tx, tmp, &peer_ni->ksnp_zc_req_list,
1397                                  tx_zc_list) {
1398                 if (tx->tx_conn != conn)
1399                         continue;
1400
1401                 LASSERT(tx->tx_msg.ksm_zc_cookies[0] != 0);
1402
1403                 tx->tx_msg.ksm_zc_cookies[0] = 0;
1404                 tx->tx_zc_aborted = 1;  /* mark it as not-acked */
1405                 list_move(&tx->tx_zc_list, &zlist);
1406         }
1407
1408         spin_unlock(&peer_ni->ksnp_lock);
1409
1410         while ((tx = list_first_entry_or_null(&zlist, struct ksock_tx,
1411                                               tx_zc_list)) != NULL) {
1412                 list_del(&tx->tx_zc_list);
1413                 ksocknal_tx_decref(tx);
1414         }
1415 }
1416
1417 void
1418 ksocknal_terminate_conn(struct ksock_conn *conn)
1419 {
1420         /* This gets called by the reaper (guaranteed thread context) to
1421          * disengage the socket from its callbacks and close it.
1422          * ksnc_refcount will eventually hit zero, and then the reaper will
1423          * destroy it.
1424          */
1425         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1426         struct ksock_sched *sched = conn->ksnc_scheduler;
1427         bool failed = false;
1428
1429         LASSERT(conn->ksnc_closing);
1430
1431         /* wake up the scheduler to "send" all remaining packets to /dev/null */
1432         spin_lock_bh(&sched->kss_lock);
1433
1434         /* a closing conn is always ready to tx */
1435         conn->ksnc_tx_ready = 1;
1436
1437         if (!conn->ksnc_tx_scheduled &&
1438             !list_empty(&conn->ksnc_tx_queue)) {
1439                 list_add_tail(&conn->ksnc_tx_list,
1440                               &sched->kss_tx_conns);
1441                 conn->ksnc_tx_scheduled = 1;
1442                 /* extra ref for scheduler */
1443                 ksocknal_conn_addref(conn);
1444
1445                 wake_up(&sched->kss_waitq);
1446         }
1447
1448         spin_unlock_bh(&sched->kss_lock);
1449
1450         /* serialise with callbacks */
1451         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1452
1453         ksocknal_lib_reset_callback(conn->ksnc_sock, conn);
1454
1455         /* OK, so this conn may not be completely disengaged from its
1456          * scheduler yet, but it _has_ committed to terminate...
1457          */
1458         conn->ksnc_scheduler->kss_nconns--;
1459
1460         if (peer_ni->ksnp_error != 0) {
1461                 /* peer_ni's last conn closed in error */
1462                 LASSERT(list_empty(&peer_ni->ksnp_conns));
1463                 failed = true;
1464                 peer_ni->ksnp_error = 0;     /* avoid multiple notifications */
1465         }
1466
1467         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1468
1469         if (failed)
1470                 ksocknal_peer_failed(peer_ni);
1471
1472         /* The socket is closed on the final put; either here, or in
1473          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1474          * when the connection was established, this will close the socket
1475          * immediately, aborting anything buffered in it. Any hung
1476          * zero-copy transmits will therefore complete in finite time.
1477          */
1478         ksocknal_connsock_decref(conn);
1479 }
1480
1481 void
1482 ksocknal_queue_zombie_conn(struct ksock_conn *conn)
1483 {
1484         /* Queue the conn for the reaper to destroy */
1485         LASSERT(refcount_read(&conn->ksnc_conn_refcount) == 0);
1486         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1487
1488         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1489         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1490
1491         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1492 }
1493
1494 void
1495 ksocknal_destroy_conn(struct ksock_conn *conn)
1496 {
1497         time64_t last_rcv;
1498
1499         /* Final coup-de-grace of the reaper */
1500         CDEBUG(D_NET, "connection %p\n", conn);
1501
1502         LASSERT(refcount_read(&conn->ksnc_conn_refcount) == 0);
1503         LASSERT(refcount_read(&conn->ksnc_sock_refcount) == 0);
1504         LASSERT(conn->ksnc_sock == NULL);
1505         LASSERT(conn->ksnc_conn_cb == NULL);
1506         LASSERT(!conn->ksnc_tx_scheduled);
1507         LASSERT(!conn->ksnc_rx_scheduled);
1508         LASSERT(list_empty(&conn->ksnc_tx_queue));
1509
1510         /* complete current receive if any */
1511         switch (conn->ksnc_rx_state) {
1512         case SOCKNAL_RX_LNET_PAYLOAD:
1513                 last_rcv = conn->ksnc_rx_deadline -
1514                            ksocknal_timeout();
1515                 CERROR("Completing partial receive from %s[%d], ip %pISp, with error, wanted: %d, left: %d, last alive is %lld secs ago\n",
1516                        libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1517                        conn->ksnc_type,
1518                        &conn->ksnc_peeraddr,
1519                        conn->ksnc_rx_nob_wanted, conn->ksnc_rx_nob_left,
1520                        ktime_get_seconds() - last_rcv);
1521                 if (conn->ksnc_lnet_msg)
1522                         conn->ksnc_lnet_msg->msg_health_status =
1523                                 LNET_MSG_STATUS_REMOTE_ERROR;
1524                 lnet_finalize(conn->ksnc_lnet_msg, -EIO);
1525                 break;
1526         case SOCKNAL_RX_LNET_HEADER:
1527                 if (conn->ksnc_rx_started)
1528                         CERROR("Incomplete receive of lnet header from %s, ip %pISp, with error, protocol: %d.x.\n",
1529                                libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1530                                &conn->ksnc_peeraddr,
1531                                conn->ksnc_proto->pro_version);
1532                 break;
1533         case SOCKNAL_RX_KSM_HEADER:
1534                 if (conn->ksnc_rx_started)
1535                         CERROR("Incomplete receive of ksock message from %s, ip %pISp, with error, protocol: %d.x.\n",
1536                                libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1537                                &conn->ksnc_peeraddr,
1538                                conn->ksnc_proto->pro_version);
1539                 break;
1540         case SOCKNAL_RX_SLOP:
1541                 if (conn->ksnc_rx_started)
1542                         CERROR("Incomplete receive of slops from %s, ip %pISp, with error\n",
1543                                libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1544                                &conn->ksnc_peeraddr);
1545                 break;
1546         default:
1547                 LBUG();
1548                 break;
1549         }
1550
1551         ksocknal_peer_decref(conn->ksnc_peer);
1552
1553         LIBCFS_FREE(conn, sizeof(*conn));
1554 }
1555
1556 int
1557 ksocknal_close_peer_conns_locked(struct ksock_peer_ni *peer_ni,
1558                                  struct sockaddr *addr, int why)
1559 {
1560         struct ksock_conn *conn;
1561         struct ksock_conn *cnxt;
1562         int count = 0;
1563
1564         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns, ksnc_list) {
1565                 if (!addr ||
1566                     rpc_cmp_addr(addr,
1567                                  (struct sockaddr *)&conn->ksnc_peeraddr)) {
1568                         count++;
1569                         ksocknal_close_conn_locked(conn, why);
1570                 }
1571         }
1572
1573         return count;
1574 }
1575
1576 int
1577 ksocknal_close_conn_and_siblings(struct ksock_conn *conn, int why)
1578 {
1579         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1580         int count;
1581
1582         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1583
1584         count = ksocknal_close_peer_conns_locked(
1585                 peer_ni, (struct sockaddr *)&conn->ksnc_peeraddr, why);
1586
1587         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1588
1589         return count;
1590 }
1591
1592 int
1593 ksocknal_close_matching_conns(struct lnet_processid *id, __u32 ipaddr)
1594 {
1595         struct ksock_peer_ni *peer_ni;
1596         struct hlist_node *pnxt;
1597         int lo;
1598         int hi;
1599         int i;
1600         int count = 0;
1601         struct sockaddr_in sa = {.sin_family = AF_INET};
1602
1603         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1604
1605         if (!LNET_NID_IS_ANY(&id->nid)) {
1606                 lo = hash_min(nidhash(&id->nid),
1607                               HASH_BITS(ksocknal_data.ksnd_peers));
1608                 hi = lo;
1609         } else {
1610                 lo = 0;
1611                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
1612         }
1613
1614         sa.sin_addr.s_addr = htonl(ipaddr);
1615         for (i = lo; i <= hi; i++) {
1616                 hlist_for_each_entry_safe(peer_ni, pnxt,
1617                                           &ksocknal_data.ksnd_peers[i],
1618                                           ksnp_list) {
1619
1620                         if (!((LNET_NID_IS_ANY(&id->nid) ||
1621                                nid_same(&id->nid, &peer_ni->ksnp_id.nid)) &&
1622                               (id->pid == LNET_PID_ANY ||
1623                                id->pid == peer_ni->ksnp_id.pid)))
1624                                 continue;
1625
1626                         count += ksocknal_close_peer_conns_locked(
1627                                 peer_ni,
1628                                 ipaddr ? (struct sockaddr *)&sa : NULL, 0);
1629                 }
1630         }
1631
1632         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1633
1634         /* wildcards always succeed */
1635         if (LNET_NID_IS_ANY(&id->nid) || id->pid == LNET_PID_ANY ||
1636             ipaddr == 0)
1637                 return 0;
1638
1639         return (count == 0 ? -ENOENT : 0);
1640 }
1641
1642 void
1643 ksocknal_notify_gw_down(struct lnet_nid *gw_nid)
1644 {
1645         /* The router is telling me she's been notified of a change in
1646          * gateway state....
1647          */
1648         struct lnet_processid id = {
1649                 .pid    = LNET_PID_ANY,
1650                 .nid    = *gw_nid,
1651         };
1652
1653         CDEBUG(D_NET, "gw %s down\n", libcfs_nidstr(gw_nid));
1654
1655         /* If the gateway crashed, close all open connections... */
1656         ksocknal_close_matching_conns(&id, 0);
1657         return;
1658
1659         /* We can only establish new connections
1660          * if we have autroutes, and these connect on demand.
1661          */
1662 }
1663
1664 static void
1665 ksocknal_push_peer(struct ksock_peer_ni *peer_ni)
1666 {
1667         int index;
1668         int i;
1669         struct ksock_conn *conn;
1670
1671         for (index = 0; ; index++) {
1672                 read_lock(&ksocknal_data.ksnd_global_lock);
1673
1674                 i = 0;
1675                 conn = NULL;
1676
1677                 list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
1678                         if (i++ == index) {
1679                                 ksocknal_conn_addref(conn);
1680                                 break;
1681                         }
1682                 }
1683
1684                 read_unlock(&ksocknal_data.ksnd_global_lock);
1685
1686                 if (i <= index)
1687                         break;
1688
1689                 ksocknal_lib_push_conn (conn);
1690                 ksocknal_conn_decref(conn);
1691         }
1692 }
1693
1694 static int
1695 ksocknal_push(struct lnet_ni *ni, struct lnet_processid *id)
1696 {
1697         int lo;
1698         int hi;
1699         int bkt;
1700         int rc = -ENOENT;
1701
1702         if (!LNET_NID_IS_ANY(&id->nid)) {
1703                 lo = hash_min(nidhash(&id->nid),
1704                               HASH_BITS(ksocknal_data.ksnd_peers));
1705                 hi = lo;
1706         } else {
1707                 lo = 0;
1708                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
1709         }
1710
1711         for (bkt = lo; bkt <= hi; bkt++) {
1712                 int peer_off; /* searching offset in peer_ni hash table */
1713
1714                 for (peer_off = 0; ; peer_off++) {
1715                         struct ksock_peer_ni *peer_ni;
1716                         int           i = 0;
1717
1718                         read_lock(&ksocknal_data.ksnd_global_lock);
1719                         hlist_for_each_entry(peer_ni,
1720                                              &ksocknal_data.ksnd_peers[bkt],
1721                                              ksnp_list) {
1722                                 if (!((LNET_NID_IS_ANY(&id->nid) ||
1723                                        nid_same(&id->nid,
1724                                                  &peer_ni->ksnp_id.nid)) &&
1725                                       (id->pid == LNET_PID_ANY ||
1726                                        id->pid == peer_ni->ksnp_id.pid)))
1727                                         continue;
1728
1729                                 if (i++ == peer_off) {
1730                                         ksocknal_peer_addref(peer_ni);
1731                                         break;
1732                                 }
1733                         }
1734                         read_unlock(&ksocknal_data.ksnd_global_lock);
1735
1736                         if (i <= peer_off) /* no match */
1737                                 break;
1738
1739                         rc = 0;
1740                         ksocknal_push_peer(peer_ni);
1741                         ksocknal_peer_decref(peer_ni);
1742                 }
1743         }
1744         return rc;
1745 }
1746
1747 int
1748 ksocknal_ctl(struct lnet_ni *ni, unsigned int cmd, void *arg)
1749 {
1750         struct lnet_processid id = {};
1751         struct libcfs_ioctl_data *data = arg;
1752         int rc;
1753
1754         switch(cmd) {
1755         case IOC_LIBCFS_GET_INTERFACE: {
1756                 struct ksock_net *net = ni->ni_data;
1757                 struct ksock_interface *iface;
1758                 struct sockaddr_in *sa;
1759
1760                 read_lock(&ksocknal_data.ksnd_global_lock);
1761
1762                 if (data->ioc_count >= 1) {
1763                         rc = -ENOENT;
1764                 } else {
1765                         rc = 0;
1766                         iface = &net->ksnn_interface;
1767
1768                         sa = (void *)&iface->ksni_addr;
1769                         if (sa->sin_family == AF_INET)
1770                                 data->ioc_u32[0] = ntohl(sa->sin_addr.s_addr);
1771                         else
1772                                 data->ioc_u32[0] = 0xFFFFFFFF;
1773                         data->ioc_u32[1] = iface->ksni_netmask;
1774                         data->ioc_u32[2] = iface->ksni_npeers;
1775                         data->ioc_u32[3] = iface->ksni_nroutes;
1776                 }
1777
1778                 read_unlock(&ksocknal_data.ksnd_global_lock);
1779                 return rc;
1780         }
1781
1782         case IOC_LIBCFS_GET_PEER: {
1783                 __u32 myip = 0;
1784                 __u32 ip = 0;
1785                 int port = 0;
1786                 int conn_count = 0;
1787                 int share_count = 0;
1788
1789                 rc = ksocknal_get_peer_info(ni, data->ioc_count,
1790                                             &id, &myip, &ip, &port,
1791                                             &conn_count,  &share_count);
1792                 if (rc != 0)
1793                         return rc;
1794                 if (!nid_is_nid4(&id.nid))
1795                         return -EINVAL;
1796                 data->ioc_nid    = lnet_nid_to_nid4(&id.nid);
1797                 data->ioc_count  = share_count;
1798                 data->ioc_u32[0] = ip;
1799                 data->ioc_u32[1] = port;
1800                 data->ioc_u32[2] = myip;
1801                 data->ioc_u32[3] = conn_count;
1802                 data->ioc_u32[4] = id.pid;
1803                 return 0;
1804         }
1805
1806         case IOC_LIBCFS_ADD_PEER: {
1807                 struct sockaddr_in sa = {.sin_family = AF_INET};
1808
1809                 id.pid = LNET_PID_LUSTRE;
1810                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1811                 sa.sin_addr.s_addr = htonl(data->ioc_u32[0]);
1812                 sa.sin_port = htons(data->ioc_u32[1]);
1813                 return ksocknal_add_peer(ni, &id, (struct sockaddr *)&sa);
1814         }
1815         case IOC_LIBCFS_DEL_PEER:
1816                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1817                 id.pid = LNET_PID_ANY;
1818                 return ksocknal_del_peer(ni, &id);
1819
1820         case IOC_LIBCFS_GET_CONN: {
1821                 int           txmem;
1822                 int           rxmem;
1823                 int           nagle;
1824                 struct ksock_conn *conn = ksocknal_get_conn_by_idx(ni, data->ioc_count);
1825                 struct sockaddr_in *psa = (void *)&conn->ksnc_peeraddr;
1826                 struct sockaddr_in *mysa = (void *)&conn->ksnc_myaddr;
1827
1828                 if (conn == NULL)
1829                         return -ENOENT;
1830
1831                 ksocknal_lib_get_conn_tunables(conn, &txmem, &rxmem, &nagle);
1832
1833                 data->ioc_count = txmem;
1834                 data->ioc_nid = lnet_nid_to_nid4(&conn->ksnc_peer->ksnp_id.nid);
1835                 data->ioc_flags = nagle;
1836                 if (psa->sin_family == AF_INET)
1837                         data->ioc_u32[0] = ntohl(psa->sin_addr.s_addr);
1838                 else
1839                         data->ioc_u32[0] = 0xFFFFFFFF;
1840                 data->ioc_u32[1] = rpc_get_port((struct sockaddr *)
1841                                                 &conn->ksnc_peeraddr);
1842                 if (mysa->sin_family == AF_INET)
1843                         data->ioc_u32[2] = ntohl(mysa->sin_addr.s_addr);
1844                 else
1845                         data->ioc_u32[2] = 0xFFFFFFFF;
1846                 data->ioc_u32[3] = conn->ksnc_type;
1847                 data->ioc_u32[4] = conn->ksnc_scheduler->kss_cpt;
1848                 data->ioc_u32[5] = rxmem;
1849                 data->ioc_u32[6] = conn->ksnc_peer->ksnp_id.pid;
1850                 ksocknal_conn_decref(conn);
1851                 return 0;
1852         }
1853
1854         case IOC_LIBCFS_CLOSE_CONNECTION:
1855                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1856                 id.pid = LNET_PID_ANY;
1857                 return ksocknal_close_matching_conns(&id,
1858                                                      data->ioc_u32[0]);
1859
1860         case IOC_LIBCFS_REGISTER_MYNID:
1861                 /* Ignore if this is a noop */
1862                 if (nid_is_nid4(&ni->ni_nid) &&
1863                     data->ioc_nid == lnet_nid_to_nid4(&ni->ni_nid))
1864                         return 0;
1865
1866                 CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1867                        libcfs_nid2str(data->ioc_nid),
1868                        libcfs_nidstr(&ni->ni_nid));
1869                 return -EINVAL;
1870
1871         case IOC_LIBCFS_PUSH_CONNECTION:
1872                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1873                 id.pid = LNET_PID_ANY;
1874                 return ksocknal_push(ni, &id);
1875
1876         default:
1877                 return -EINVAL;
1878         }
1879         /* not reached */
1880 }
1881
1882 static void
1883 ksocknal_free_buffers (void)
1884 {
1885         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_txs) == 0);
1886
1887         if (ksocknal_data.ksnd_schedulers != NULL)
1888                 cfs_percpt_free(ksocknal_data.ksnd_schedulers);
1889
1890         spin_lock(&ksocknal_data.ksnd_tx_lock);
1891
1892         if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
1893                 LIST_HEAD(zlist);
1894                 struct ksock_tx *tx;
1895
1896                 list_splice_init(&ksocknal_data.ksnd_idle_noop_txs, &zlist);
1897                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
1898
1899                 while ((tx = list_first_entry_or_null(&zlist, struct ksock_tx,
1900                                                       tx_list)) != NULL) {
1901                         list_del(&tx->tx_list);
1902                         LIBCFS_FREE(tx, tx->tx_desc_size);
1903                 }
1904         } else {
1905                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
1906         }
1907 }
1908
1909 static int ksocknal_get_link_status(struct net_device *dev)
1910 {
1911         int ret = -1;
1912
1913         LASSERT(dev);
1914
1915         if (!netif_running(dev)) {
1916                 ret = 0;
1917                 CDEBUG(D_NET, "device not running\n");
1918         }
1919         /* Some devices may not be providing link settings */
1920         else if (dev->ethtool_ops->get_link) {
1921                 ret = dev->ethtool_ops->get_link(dev);
1922                 CDEBUG(D_NET, "get_link returns %u\n", ret);
1923         }
1924
1925         return ret;
1926 }
1927
1928 static int
1929 ksocknal_handle_link_state_change(struct net_device *dev,
1930                                   unsigned char operstate)
1931 {
1932         struct lnet_ni *ni = NULL;
1933         struct ksock_net *net;
1934         struct ksock_net *cnxt;
1935         int ifindex;
1936         unsigned char link_down = !(operstate == IF_OPER_UP);
1937         struct in_device *in_dev;
1938         bool found_ip = false;
1939         struct ksock_interface *ksi = NULL;
1940         struct sockaddr_in *sa;
1941         DECLARE_CONST_IN_IFADDR(ifa);
1942
1943         ifindex = dev->ifindex;
1944
1945         if (!ksocknal_data.ksnd_nnets)
1946                 goto out;
1947
1948         list_for_each_entry_safe(net, cnxt, &ksocknal_data.ksnd_nets,
1949                                  ksnn_list) {
1950
1951                 ksi = &net->ksnn_interface;
1952                 sa = (void *)&ksi->ksni_addr;
1953                 found_ip = false;
1954
1955                 if (ksi->ksni_index != ifindex ||
1956                     strcmp(ksi->ksni_name, dev->name))
1957                         continue;
1958
1959                 ni = net->ksnn_ni;
1960
1961                 in_dev = __in_dev_get_rtnl(dev);
1962                 if (!in_dev) {
1963                         CDEBUG(D_NET, "Interface %s has no IPv4 status.\n",
1964                                dev->name);
1965                         CDEBUG(D_NET, "set link fatal state to 1\n");
1966                         atomic_set(&ni->ni_fatal_error_on, 1);
1967                         continue;
1968                 }
1969                 in_dev_for_each_ifa_rtnl(ifa, in_dev) {
1970                         if (sa->sin_addr.s_addr == ifa->ifa_local)
1971                                 found_ip = true;
1972                 }
1973                 endfor_ifa(in_dev);
1974
1975                 if (!found_ip) {
1976                         CDEBUG(D_NET, "Interface %s has no matching ip\n",
1977                                dev->name);
1978                         CDEBUG(D_NET, "set link fatal state to 1\n");
1979                         atomic_set(&ni->ni_fatal_error_on, 1);
1980                         continue;
1981                 }
1982
1983                 if (link_down) {
1984                         CDEBUG(D_NET, "set link fatal state to 1\n");
1985                         atomic_set(&ni->ni_fatal_error_on, link_down);
1986                 } else {
1987                         CDEBUG(D_NET, "set link fatal state to %u\n",
1988                                (ksocknal_get_link_status(dev) == 0));
1989                         atomic_set(&ni->ni_fatal_error_on,
1990                                    (ksocknal_get_link_status(dev) == 0));
1991                 }
1992         }
1993 out:
1994         return 0;
1995 }
1996
1997
1998 static int
1999 ksocknal_handle_inetaddr_change(struct in_ifaddr *ifa, unsigned long event)
2000 {
2001         struct lnet_ni *ni;
2002         struct ksock_net *net;
2003         struct ksock_net *cnxt;
2004         struct net_device *event_netdev = ifa->ifa_dev->dev;
2005         int ifindex;
2006         struct ksock_interface *ksi = NULL;
2007         struct sockaddr_in *sa;
2008
2009         if (!ksocknal_data.ksnd_nnets)
2010                 goto out;
2011
2012         ifindex = event_netdev->ifindex;
2013
2014         list_for_each_entry_safe(net, cnxt, &ksocknal_data.ksnd_nets,
2015                                  ksnn_list) {
2016
2017                 ksi = &net->ksnn_interface;
2018                 sa = (void *)&ksi->ksni_addr;
2019
2020                 if (ksi->ksni_index != ifindex ||
2021                     strcmp(ksi->ksni_name, event_netdev->name))
2022                         continue;
2023
2024                 if (sa->sin_addr.s_addr == ifa->ifa_local) {
2025                         CDEBUG(D_NET, "set link fatal state to %u\n",
2026                                (event == NETDEV_DOWN));
2027                         ni = net->ksnn_ni;
2028                         atomic_set(&ni->ni_fatal_error_on,
2029                                    (event == NETDEV_DOWN));
2030                 }
2031         }
2032 out:
2033         return 0;
2034 }
2035
2036 /************************************
2037  * Net device notifier event handler
2038  ************************************/
2039 static int ksocknal_device_event(struct notifier_block *unused,
2040                                  unsigned long event, void *ptr)
2041 {
2042         struct net_device *dev = netdev_notifier_info_to_dev(ptr);
2043         unsigned char operstate;
2044
2045         operstate = dev->operstate;
2046
2047         CDEBUG(D_NET, "devevent: status=%ld, iface=%s ifindex %d state %u\n",
2048                event, dev->name, dev->ifindex, operstate);
2049
2050         switch (event) {
2051         case NETDEV_UP:
2052         case NETDEV_DOWN:
2053         case NETDEV_CHANGE:
2054                 ksocknal_handle_link_state_change(dev, operstate);
2055                 break;
2056         }
2057
2058         return NOTIFY_OK;
2059 }
2060
2061 /************************************
2062  * Inetaddr notifier event handler
2063  ************************************/
2064 static int ksocknal_inetaddr_event(struct notifier_block *unused,
2065                                    unsigned long event, void *ptr)
2066 {
2067         struct in_ifaddr *ifa = ptr;
2068
2069         CDEBUG(D_NET, "addrevent: status %ld ip addr %pI4, netmask %pI4.\n",
2070                event, &ifa->ifa_address, &ifa->ifa_mask);
2071
2072         switch (event) {
2073         case NETDEV_UP:
2074         case NETDEV_DOWN:
2075         case NETDEV_CHANGE:
2076                 ksocknal_handle_inetaddr_change(ifa, event);
2077                 break;
2078
2079         }
2080         return NOTIFY_OK;
2081 }
2082
2083 static struct notifier_block ksocknal_dev_notifier_block = {
2084         .notifier_call = ksocknal_device_event,
2085 };
2086
2087 static struct notifier_block ksocknal_inetaddr_notifier_block = {
2088         .notifier_call = ksocknal_inetaddr_event,
2089 };
2090
2091 static void
2092 ksocknal_base_shutdown(void)
2093 {
2094         struct ksock_sched *sched;
2095         struct ksock_peer_ni *peer_ni;
2096         int i;
2097
2098         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %lld\n",
2099                libcfs_kmem_read());
2100         LASSERT (ksocknal_data.ksnd_nnets == 0);
2101
2102         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL) {
2103                 unregister_netdevice_notifier(&ksocknal_dev_notifier_block);
2104                 unregister_inetaddr_notifier(&ksocknal_inetaddr_notifier_block);
2105         }
2106
2107         switch (ksocknal_data.ksnd_init) {
2108         default:
2109                 LASSERT(0);
2110                 fallthrough;
2111
2112         case SOCKNAL_INIT_ALL:
2113         case SOCKNAL_INIT_DATA:
2114                 hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list)
2115                         LASSERT(0);
2116
2117                 LASSERT(list_empty(&ksocknal_data.ksnd_nets));
2118                 LASSERT(list_empty(&ksocknal_data.ksnd_enomem_conns));
2119                 LASSERT(list_empty(&ksocknal_data.ksnd_zombie_conns));
2120                 LASSERT(list_empty(&ksocknal_data.ksnd_connd_connreqs));
2121                 LASSERT(list_empty(&ksocknal_data.ksnd_connd_routes));
2122
2123                 if (ksocknal_data.ksnd_schedulers != NULL) {
2124                         cfs_percpt_for_each(sched, i,
2125                                             ksocknal_data.ksnd_schedulers) {
2126
2127                                 LASSERT(list_empty(&sched->kss_tx_conns));
2128                                 LASSERT(list_empty(&sched->kss_rx_conns));
2129                                 LASSERT(list_empty(&sched->kss_zombie_noop_txs));
2130                                 LASSERT(sched->kss_nconns == 0);
2131                         }
2132                 }
2133
2134                 /* flag threads to terminate; wake and wait for them to die */
2135                 ksocknal_data.ksnd_shuttingdown = 1;
2136                 wake_up_all(&ksocknal_data.ksnd_connd_waitq);
2137                 wake_up(&ksocknal_data.ksnd_reaper_waitq);
2138
2139                 if (ksocknal_data.ksnd_schedulers != NULL) {
2140                         cfs_percpt_for_each(sched, i,
2141                                             ksocknal_data.ksnd_schedulers)
2142                                         wake_up_all(&sched->kss_waitq);
2143                 }
2144
2145                 wait_var_event_warning(&ksocknal_data.ksnd_nthreads,
2146                                        atomic_read(&ksocknal_data.ksnd_nthreads) == 0,
2147                                        "waiting for %d threads to terminate\n",
2148                                        atomic_read(&ksocknal_data.ksnd_nthreads));
2149
2150                 ksocknal_free_buffers();
2151
2152                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
2153                 break;
2154         }
2155
2156         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %lld\n",
2157                libcfs_kmem_read());
2158
2159         module_put(THIS_MODULE);
2160 }
2161
2162 static int
2163 ksocknal_base_startup(void)
2164 {
2165         struct ksock_sched *sched;
2166         int rc;
2167         int i;
2168
2169         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
2170         LASSERT(ksocknal_data.ksnd_nnets == 0);
2171
2172         memset(&ksocknal_data, 0, sizeof(ksocknal_data)); /* zero pointers */
2173
2174         hash_init(ksocknal_data.ksnd_peers);
2175
2176         rwlock_init(&ksocknal_data.ksnd_global_lock);
2177         INIT_LIST_HEAD(&ksocknal_data.ksnd_nets);
2178
2179         spin_lock_init(&ksocknal_data.ksnd_reaper_lock);
2180         INIT_LIST_HEAD(&ksocknal_data.ksnd_enomem_conns);
2181         INIT_LIST_HEAD(&ksocknal_data.ksnd_zombie_conns);
2182         INIT_LIST_HEAD(&ksocknal_data.ksnd_deathrow_conns);
2183         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
2184
2185         spin_lock_init(&ksocknal_data.ksnd_connd_lock);
2186         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_connreqs);
2187         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_routes);
2188         init_waitqueue_head(&ksocknal_data.ksnd_connd_waitq);
2189
2190         spin_lock_init(&ksocknal_data.ksnd_tx_lock);
2191         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_noop_txs);
2192
2193         /* NB memset above zeros whole of ksocknal_data */
2194
2195         /* flag lists/ptrs/locks initialised */
2196         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2197         if (!try_module_get(THIS_MODULE))
2198                 goto failed;
2199
2200         /* Create a scheduler block per available CPT */
2201         ksocknal_data.ksnd_schedulers = cfs_percpt_alloc(lnet_cpt_table(),
2202                                                          sizeof(*sched));
2203         if (ksocknal_data.ksnd_schedulers == NULL)
2204                 goto failed;
2205
2206         cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
2207                 int nthrs;
2208
2209                 /*
2210                  * make sure not to allocate more threads than there are
2211                  * cores/CPUs in teh CPT
2212                  */
2213                 nthrs = cfs_cpt_weight(lnet_cpt_table(), i);
2214                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2215                         nthrs = min(nthrs, *ksocknal_tunables.ksnd_nscheds);
2216                 } else {
2217                         /*
2218                          * max to half of CPUs, assume another half should be
2219                          * reserved for upper layer modules
2220                          */
2221                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2222                 }
2223
2224                 sched->kss_nthreads_max = nthrs;
2225                 sched->kss_cpt = i;
2226
2227                 spin_lock_init(&sched->kss_lock);
2228                 INIT_LIST_HEAD(&sched->kss_rx_conns);
2229                 INIT_LIST_HEAD(&sched->kss_tx_conns);
2230                 INIT_LIST_HEAD(&sched->kss_zombie_noop_txs);
2231                 init_waitqueue_head(&sched->kss_waitq);
2232         }
2233
2234         ksocknal_data.ksnd_connd_starting         = 0;
2235         ksocknal_data.ksnd_connd_failed_stamp     = 0;
2236         ksocknal_data.ksnd_connd_starting_stamp   = ktime_get_real_seconds();
2237         /* must have at least 2 connds to remain responsive to accepts while
2238          * connecting */
2239         if (*ksocknal_tunables.ksnd_nconnds < SOCKNAL_CONND_RESV + 1)
2240                 *ksocknal_tunables.ksnd_nconnds = SOCKNAL_CONND_RESV + 1;
2241
2242         if (*ksocknal_tunables.ksnd_nconnds_max <
2243             *ksocknal_tunables.ksnd_nconnds) {
2244                 ksocknal_tunables.ksnd_nconnds_max =
2245                         ksocknal_tunables.ksnd_nconnds;
2246         }
2247
2248         for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
2249                 spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2250                 ksocknal_data.ksnd_connd_starting++;
2251                 spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2252
2253                 rc = ksocknal_thread_start(ksocknal_connd,
2254                                            (void *)((uintptr_t)i),
2255                                            "socknal_cd%02d", i);
2256                 if (rc != 0) {
2257                         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2258                         ksocknal_data.ksnd_connd_starting--;
2259                         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2260                         CERROR("Can't spawn socknal connd: %d\n", rc);
2261                         goto failed;
2262                 }
2263         }
2264
2265         rc = ksocknal_thread_start(ksocknal_reaper, NULL, "socknal_reaper");
2266         if (rc != 0) {
2267                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
2268                 goto failed;
2269         }
2270
2271         register_netdevice_notifier(&ksocknal_dev_notifier_block);
2272         register_inetaddr_notifier(&ksocknal_inetaddr_notifier_block);
2273
2274         /* flag everything initialised */
2275         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
2276
2277         return 0;
2278
2279  failed:
2280         ksocknal_base_shutdown();
2281         return -ENETDOWN;
2282 }
2283
2284 static int
2285 ksocknal_debug_peerhash(struct lnet_ni *ni)
2286 {
2287         struct ksock_peer_ni *peer_ni;
2288         int i;
2289
2290         read_lock(&ksocknal_data.ksnd_global_lock);
2291
2292         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
2293                 struct ksock_conn_cb *conn_cb;
2294                 struct ksock_conn *conn;
2295
2296                 if (peer_ni->ksnp_ni != ni)
2297                         continue;
2298
2299                 CWARN("Active peer_ni on shutdown: %s, ref %d, closing %d, accepting %d, err %d, zcookie %llu, txq %d, zc_req %d\n",
2300                       libcfs_idstr(&peer_ni->ksnp_id),
2301                       refcount_read(&peer_ni->ksnp_refcount),
2302                       peer_ni->ksnp_closing,
2303                       peer_ni->ksnp_accepting, peer_ni->ksnp_error,
2304                       peer_ni->ksnp_zc_next_cookie,
2305                       !list_empty(&peer_ni->ksnp_tx_queue),
2306                       !list_empty(&peer_ni->ksnp_zc_req_list));
2307
2308                 conn_cb = peer_ni->ksnp_conn_cb;
2309                 if (conn_cb) {
2310                         CWARN("ConnCB: ref %d, schd %d, conn %d, cnted %d, del %d\n",
2311                               refcount_read(&conn_cb->ksnr_refcount),
2312                               conn_cb->ksnr_scheduled, conn_cb->ksnr_connecting,
2313                               conn_cb->ksnr_connected, conn_cb->ksnr_deleted);
2314                 }
2315
2316                 list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
2317                         CWARN("Conn: ref %d, sref %d, t %d, c %d\n",
2318                               refcount_read(&conn->ksnc_conn_refcount),
2319                               refcount_read(&conn->ksnc_sock_refcount),
2320                               conn->ksnc_type, conn->ksnc_closing);
2321                 }
2322                 break;
2323         }
2324
2325         read_unlock(&ksocknal_data.ksnd_global_lock);
2326         return 0;
2327 }
2328
2329 void
2330 ksocknal_shutdown(struct lnet_ni *ni)
2331 {
2332         struct ksock_net *net = ni->ni_data;
2333
2334         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL);
2335         LASSERT(ksocknal_data.ksnd_nnets > 0);
2336
2337         /* prevent new peers */
2338         atomic_add(SOCKNAL_SHUTDOWN_BIAS, &net->ksnn_npeers);
2339
2340         /* Delete all peers */
2341         ksocknal_del_peer(ni, NULL);
2342
2343         /* Wait for all peer_ni state to clean up */
2344         wait_var_event_warning(&net->ksnn_npeers,
2345                                atomic_read(&net->ksnn_npeers) ==
2346                                SOCKNAL_SHUTDOWN_BIAS,
2347                                "waiting for %d peers to disconnect\n",
2348                                ksocknal_debug_peerhash(ni) +
2349                                atomic_read(&net->ksnn_npeers) -
2350                                SOCKNAL_SHUTDOWN_BIAS);
2351
2352         LASSERT(net->ksnn_interface.ksni_npeers == 0);
2353         LASSERT(net->ksnn_interface.ksni_nroutes == 0);
2354
2355         list_del(&net->ksnn_list);
2356         LIBCFS_FREE(net, sizeof(*net));
2357
2358         ksocknal_data.ksnd_nnets--;
2359         if (ksocknal_data.ksnd_nnets == 0)
2360                 ksocknal_base_shutdown();
2361 }
2362
2363 static int
2364 ksocknal_search_new_ipif(struct ksock_net *net)
2365 {
2366         int new_ipif = 0;
2367         char *ifnam = &net->ksnn_interface.ksni_name[0];
2368         char *colon = strchr(ifnam, ':');
2369         bool found = false;
2370         struct ksock_net *tmp;
2371
2372         if (colon != NULL)
2373                 *colon = 0;
2374
2375         list_for_each_entry(tmp, &ksocknal_data.ksnd_nets, ksnn_list) {
2376                 char *ifnam2 = &tmp->ksnn_interface.ksni_name[0];
2377                 char *colon2 = strchr(ifnam2, ':');
2378
2379                 if (colon2 != NULL)
2380                         *colon2 = 0;
2381
2382                 found = strcmp(ifnam, ifnam2) == 0;
2383                 if (colon2 != NULL)
2384                         *colon2 = ':';
2385         }
2386
2387         new_ipif += !found;
2388         if (colon != NULL)
2389                 *colon = ':';
2390
2391         return new_ipif;
2392 }
2393
2394 static int
2395 ksocknal_start_schedulers(struct ksock_sched *sched)
2396 {
2397         int     nthrs;
2398         int     rc = 0;
2399         int     i;
2400
2401         if (sched->kss_nthreads == 0) {
2402                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2403                         nthrs = sched->kss_nthreads_max;
2404                 } else {
2405                         nthrs = cfs_cpt_weight(lnet_cpt_table(),
2406                                                sched->kss_cpt);
2407                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2408                         nthrs = min(SOCKNAL_NSCHEDS_HIGH, nthrs);
2409                 }
2410                 nthrs = min(nthrs, sched->kss_nthreads_max);
2411         } else {
2412                 LASSERT(sched->kss_nthreads <= sched->kss_nthreads_max);
2413                 /* increase two threads if there is new interface */
2414                 nthrs = min(2, sched->kss_nthreads_max - sched->kss_nthreads);
2415         }
2416
2417         for (i = 0; i < nthrs; i++) {
2418                 long id;
2419
2420                 id = KSOCK_THREAD_ID(sched->kss_cpt, sched->kss_nthreads + i);
2421                 rc = ksocknal_thread_start(ksocknal_scheduler, (void *)id,
2422                                            "socknal_sd%02d_%02d",
2423                                            sched->kss_cpt,
2424                                            (int)KSOCK_THREAD_SID(id));
2425                 if (rc == 0)
2426                         continue;
2427
2428                 CERROR("Can't spawn thread %d for scheduler[%d]: %d\n",
2429                        sched->kss_cpt, (int) KSOCK_THREAD_SID(id), rc);
2430                 break;
2431         }
2432
2433         sched->kss_nthreads += i;
2434         return rc;
2435 }
2436
2437 static int
2438 ksocknal_net_start_threads(struct ksock_net *net, __u32 *cpts, int ncpts)
2439 {
2440         int newif = ksocknal_search_new_ipif(net);
2441         int rc;
2442         int i;
2443
2444         if (ncpts > 0 && ncpts > cfs_cpt_number(lnet_cpt_table()))
2445                 return -EINVAL;
2446
2447         for (i = 0; i < ncpts; i++) {
2448                 struct ksock_sched *sched;
2449                 int cpt = (cpts == NULL) ? i : cpts[i];
2450
2451                 LASSERT(cpt < cfs_cpt_number(lnet_cpt_table()));
2452                 sched = ksocknal_data.ksnd_schedulers[cpt];
2453
2454                 if (!newif && sched->kss_nthreads > 0)
2455                         continue;
2456
2457                 rc = ksocknal_start_schedulers(sched);
2458                 if (rc != 0)
2459                         return rc;
2460         }
2461         return 0;
2462 }
2463
2464 int
2465 ksocknal_startup(struct lnet_ni *ni)
2466 {
2467         struct ksock_net *net;
2468         struct ksock_interface *ksi = NULL;
2469         struct lnet_inetdev *ifaces = NULL;
2470         struct sockaddr_in *sa;
2471         int i = 0;
2472         int rc;
2473
2474         LASSERT (ni->ni_net->net_lnd == &the_ksocklnd);
2475         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING) {
2476                 rc = ksocknal_base_startup();
2477                 if (rc != 0)
2478                         return rc;
2479         }
2480         LIBCFS_ALLOC(net, sizeof(*net));
2481         if (net == NULL)
2482                 goto fail_0;
2483         net->ksnn_incarnation = ktime_get_real_ns();
2484         ni->ni_data = net;
2485
2486         ksocknal_tunables_setup(ni);
2487
2488         rc = lnet_inet_enumerate(&ifaces, ni->ni_net_ns);
2489         if (rc < 0)
2490                 goto fail_1;
2491
2492         ksi = &net->ksnn_interface;
2493
2494         /* Use the first discovered interface or look in the list */
2495         if (ni->ni_interface) {
2496                 for (i = 0; i < rc; i++)
2497                         if (strcmp(ifaces[i].li_name, ni->ni_interface) == 0)
2498                                 break;
2499
2500                 /* ni_interfaces doesn't contain the interface we want */
2501                 if (i == rc) {
2502                         CERROR("ksocklnd: failed to find interface %s\n",
2503                                ni->ni_interface);
2504                         goto fail_1;
2505                 }
2506         }
2507
2508         ni->ni_dev_cpt = ifaces[i].li_cpt;
2509         sa = (void *)&ksi->ksni_addr;
2510         memset(sa, 0, sizeof(*sa));
2511         sa->sin_family = AF_INET;
2512         sa->sin_addr.s_addr = htonl(ifaces[i].li_ipaddr);
2513         ksi->ksni_index = ksocknal_ip2index((struct sockaddr *)sa, ni);
2514         ksi->ksni_netmask = ifaces[i].li_netmask;
2515         strlcpy(ksi->ksni_name, ifaces[i].li_name, sizeof(ksi->ksni_name));
2516
2517         /* call it before add it to ksocknal_data.ksnd_nets */
2518         rc = ksocknal_net_start_threads(net, ni->ni_cpts, ni->ni_ncpts);
2519         if (rc != 0)
2520                 goto fail_1;
2521
2522         LASSERT(ksi);
2523         LASSERT(ksi->ksni_addr.ss_family == AF_INET);
2524         ni->ni_nid.nid_addr[0] =
2525                 ((struct sockaddr_in *)&ksi->ksni_addr)->sin_addr.s_addr;
2526         list_add(&net->ksnn_list, &ksocknal_data.ksnd_nets);
2527         net->ksnn_ni = ni;
2528         ksocknal_data.ksnd_nnets++;
2529
2530         return 0;
2531
2532 fail_1:
2533         LIBCFS_FREE(net, sizeof(*net));
2534 fail_0:
2535         if (ksocknal_data.ksnd_nnets == 0)
2536                 ksocknal_base_shutdown();
2537
2538         return -ENETDOWN;
2539 }
2540
2541 static void __exit ksocklnd_exit(void)
2542 {
2543         lnet_unregister_lnd(&the_ksocklnd);
2544 }
2545
2546 static const struct lnet_lnd the_ksocklnd = {
2547         .lnd_type               = SOCKLND,
2548         .lnd_startup            = ksocknal_startup,
2549         .lnd_shutdown           = ksocknal_shutdown,
2550         .lnd_ctl                = ksocknal_ctl,
2551         .lnd_send               = ksocknal_send,
2552         .lnd_recv               = ksocknal_recv,
2553         .lnd_notify_peer_down   = ksocknal_notify_gw_down,
2554         .lnd_accept             = ksocknal_accept,
2555 };
2556
2557 static int __init ksocklnd_init(void)
2558 {
2559         int rc;
2560
2561         /* check ksnr_connected/connecting field large enough */
2562         BUILD_BUG_ON(SOCKLND_CONN_NTYPES > 4);
2563         BUILD_BUG_ON(SOCKLND_CONN_ACK != SOCKLND_CONN_BULK_IN);
2564
2565         rc = ksocknal_tunables_init();
2566         if (rc != 0)
2567                 return rc;
2568
2569         lnet_register_lnd(&the_ksocklnd);
2570
2571         return 0;
2572 }
2573
2574 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2575 MODULE_DESCRIPTION("TCP Socket LNet Network Driver");
2576 MODULE_VERSION("2.8.0");
2577 MODULE_LICENSE("GPL");
2578
2579 module_init(ksocklnd_init);
2580 module_exit(ksocklnd_exit);