Whamcloud - gitweb
LU-15117 ofd: no lock for dt_bufs_get() in read path
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lnet/klnds/socklnd/socklnd.c
32  *
33  * Author: Zach Brown <zab@zabbo.net>
34  * Author: Peter J. Braam <braam@clusterfs.com>
35  * Author: Phil Schwan <phil@clusterfs.com>
36  * Author: Eric Barton <eric@bartonsoftware.com>
37  */
38
39 #include <linux/ethtool.h>
40 #include <linux/inetdevice.h>
41 #include "socklnd.h"
42 #include <linux/sunrpc/addr.h>
43 #include <net/addrconf.h>
44
45 static const struct lnet_lnd the_ksocklnd;
46 struct ksock_nal_data ksocknal_data;
47
48 static struct ksock_interface *
49 ksocknal_index2iface(struct lnet_ni *ni, int index)
50 {
51         struct ksock_net *net = ni->ni_data;
52         struct ksock_interface *iface;
53
54         iface = &net->ksnn_interface;
55
56         if (iface->ksni_index == index)
57                 return iface;
58
59         return NULL;
60 }
61
62 static int ksocknal_ip2index(struct sockaddr *addr, struct lnet_ni *ni)
63 {
64         struct net_device *dev;
65         int ret = -1;
66         DECLARE_CONST_IN_IFADDR(ifa);
67
68         if (addr->sa_family != AF_INET && addr->sa_family != AF_INET6)
69                 return ret;
70
71         rcu_read_lock();
72         for_each_netdev(ni->ni_net_ns, dev) {
73                 int flags = dev_get_flags(dev);
74                 struct in_device *in_dev;
75
76                 if (flags & IFF_LOOPBACK) /* skip the loopback IF */
77                         continue;
78
79                 if (!(flags & IFF_UP))
80                         continue;
81
82                 switch (addr->sa_family) {
83                 case AF_INET:
84                         in_dev = __in_dev_get_rcu(dev);
85                         if (!in_dev)
86                                 continue;
87
88                         in_dev_for_each_ifa_rcu(ifa, in_dev) {
89                                 if (ifa->ifa_local ==
90                                     ((struct sockaddr_in *)addr)->sin_addr.s_addr)
91                                         ret = dev->ifindex;
92                         }
93                         endfor_ifa(in_dev);
94                         break;
95 #if IS_ENABLED(CONFIG_IPV6)
96                 case AF_INET6: {
97                         struct inet6_dev *in6_dev;
98                         const struct inet6_ifaddr *ifa6;
99                         struct sockaddr_in6 *addr6 = (struct sockaddr_in6*)addr;
100
101                         in6_dev = __in6_dev_get(dev);
102                         if (!in6_dev)
103                                 continue;
104
105                         list_for_each_entry_rcu(ifa6, &in6_dev->addr_list, if_list) {
106                                 if (ipv6_addr_cmp(&ifa6->addr,
107                                                  &addr6->sin6_addr) == 0)
108                                         ret = dev->ifindex;
109                         }
110                         break;
111                         }
112 #endif /* IS_ENABLED(CONFIG_IPV6) */
113                 }
114                 if (ret >= 0)
115                         break;
116         }
117         rcu_read_unlock();
118
119         return ret;
120 }
121
122 static struct ksock_conn_cb *
123 ksocknal_create_conn_cb(struct sockaddr *addr)
124 {
125         struct ksock_conn_cb *conn_cb;
126
127         LIBCFS_ALLOC(conn_cb, sizeof(*conn_cb));
128         if (!conn_cb)
129                 return NULL;
130
131         refcount_set(&conn_cb->ksnr_refcount, 1);
132         conn_cb->ksnr_peer = NULL;
133         conn_cb->ksnr_retry_interval = 0;         /* OK to connect at any time */
134         rpc_copy_addr((struct sockaddr *)&conn_cb->ksnr_addr, addr);
135         rpc_set_port((struct sockaddr *)&conn_cb->ksnr_addr,
136                      rpc_get_port(addr));
137         conn_cb->ksnr_myiface = -1;
138         conn_cb->ksnr_scheduled = 0;
139         conn_cb->ksnr_connecting = 0;
140         conn_cb->ksnr_connected = 0;
141         conn_cb->ksnr_deleted = 0;
142         conn_cb->ksnr_conn_count = 0;
143         conn_cb->ksnr_ctrl_conn_count = 0;
144         conn_cb->ksnr_blki_conn_count = 0;
145         conn_cb->ksnr_blko_conn_count = 0;
146         conn_cb->ksnr_max_conns = 0;
147         conn_cb->ksnr_busy_retry_count = 0;
148
149         return conn_cb;
150 }
151
152 void
153 ksocknal_destroy_conn_cb(struct ksock_conn_cb *conn_cb)
154 {
155         LASSERT(refcount_read(&conn_cb->ksnr_refcount) == 0);
156
157         if (conn_cb->ksnr_peer)
158                 ksocknal_peer_decref(conn_cb->ksnr_peer);
159
160         LIBCFS_FREE(conn_cb, sizeof(*conn_cb));
161 }
162
163 static struct ksock_peer_ni *
164 ksocknal_create_peer(struct lnet_ni *ni, struct lnet_processid *id)
165 {
166         int cpt = lnet_nid2cpt(&id->nid, ni);
167         struct ksock_net *net = ni->ni_data;
168         struct ksock_peer_ni *peer_ni;
169
170         LASSERT(!LNET_NID_IS_ANY(&id->nid));
171         LASSERT(id->pid != LNET_PID_ANY);
172         LASSERT(!in_interrupt());
173
174         if (!atomic_inc_unless_negative(&net->ksnn_npeers)) {
175                 CERROR("Can't create peer_ni: network shutdown\n");
176                 return ERR_PTR(-ESHUTDOWN);
177         }
178
179         LIBCFS_CPT_ALLOC(peer_ni, lnet_cpt_table(), cpt, sizeof(*peer_ni));
180         if (!peer_ni) {
181                 atomic_dec(&net->ksnn_npeers);
182                 return ERR_PTR(-ENOMEM);
183         }
184
185         peer_ni->ksnp_ni = ni;
186         peer_ni->ksnp_id = *id;
187         refcount_set(&peer_ni->ksnp_refcount, 1); /* 1 ref for caller */
188         peer_ni->ksnp_closing = 0;
189         peer_ni->ksnp_accepting = 0;
190         peer_ni->ksnp_proto = NULL;
191         peer_ni->ksnp_last_alive = 0;
192         peer_ni->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;
193         peer_ni->ksnp_conn_cb = NULL;
194
195         INIT_LIST_HEAD(&peer_ni->ksnp_conns);
196         INIT_LIST_HEAD(&peer_ni->ksnp_tx_queue);
197         INIT_LIST_HEAD(&peer_ni->ksnp_zc_req_list);
198         spin_lock_init(&peer_ni->ksnp_lock);
199
200         return peer_ni;
201 }
202
203 void
204 ksocknal_destroy_peer(struct ksock_peer_ni *peer_ni)
205 {
206         struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
207
208         CDEBUG (D_NET, "peer_ni %s %p deleted\n",
209                 libcfs_idstr(&peer_ni->ksnp_id), peer_ni);
210
211         LASSERT(refcount_read(&peer_ni->ksnp_refcount) == 0);
212         LASSERT(peer_ni->ksnp_accepting == 0);
213         LASSERT(list_empty(&peer_ni->ksnp_conns));
214         LASSERT(peer_ni->ksnp_conn_cb == NULL);
215         LASSERT(list_empty(&peer_ni->ksnp_tx_queue));
216         LASSERT(list_empty(&peer_ni->ksnp_zc_req_list));
217
218         LIBCFS_FREE(peer_ni, sizeof(*peer_ni));
219
220         /* NB a peer_ni's connections and conn_cb keep a reference on their
221          * peer_ni until they are destroyed, so we can be assured that _all_
222          * state to do with this peer_ni has been cleaned up when its refcount
223          * drops to zero.
224          */
225         if (atomic_dec_and_test(&net->ksnn_npeers))
226                 wake_up_var(&net->ksnn_npeers);
227 }
228
229 struct ksock_peer_ni *
230 ksocknal_find_peer_locked(struct lnet_ni *ni, struct lnet_processid *id)
231 {
232         struct ksock_peer_ni *peer_ni;
233         unsigned long hash = nidhash(&id->nid);
234
235         hash_for_each_possible(ksocknal_data.ksnd_peers, peer_ni,
236                                ksnp_list, hash) {
237                 LASSERT(!peer_ni->ksnp_closing);
238
239                 if (peer_ni->ksnp_ni != ni)
240                         continue;
241
242                 if (!nid_same(&peer_ni->ksnp_id.nid, &id->nid) ||
243                     peer_ni->ksnp_id.pid != id->pid)
244                         continue;
245
246                 CDEBUG(D_NET, "got peer_ni [%p] -> %s (%d)\n",
247                        peer_ni, libcfs_idstr(id),
248                        refcount_read(&peer_ni->ksnp_refcount));
249                 return peer_ni;
250         }
251         return NULL;
252 }
253
254 struct ksock_peer_ni *
255 ksocknal_find_peer(struct lnet_ni *ni, struct lnet_processid *id)
256 {
257         struct ksock_peer_ni *peer_ni;
258
259         read_lock(&ksocknal_data.ksnd_global_lock);
260         peer_ni = ksocknal_find_peer_locked(ni, id);
261         if (peer_ni != NULL)                    /* +1 ref for caller? */
262                 ksocknal_peer_addref(peer_ni);
263         read_unlock(&ksocknal_data.ksnd_global_lock);
264
265         return peer_ni;
266 }
267
268 static void
269 ksocknal_unlink_peer_locked(struct ksock_peer_ni *peer_ni)
270 {
271         LASSERT(list_empty(&peer_ni->ksnp_conns));
272         LASSERT(peer_ni->ksnp_conn_cb == NULL);
273         LASSERT(!peer_ni->ksnp_closing);
274         peer_ni->ksnp_closing = 1;
275         hlist_del(&peer_ni->ksnp_list);
276         /* lose peerlist's ref */
277         ksocknal_peer_decref(peer_ni);
278 }
279
280
281 static void
282 ksocknal_dump_peer_debug_info(struct ksock_peer_ni *peer_ni)
283 {
284         struct ksock_conn *conn;
285         struct list_head *ctmp;
286         struct list_head *txtmp;
287         int ccount = 0;
288         int txcount = 0;
289
290         list_for_each(ctmp, &peer_ni->ksnp_conns) {
291                 conn = list_entry(ctmp, struct ksock_conn, ksnc_list);
292
293                 if (!list_empty(&conn->ksnc_tx_queue))
294                         list_for_each(txtmp, &conn->ksnc_tx_queue) txcount++;
295
296                 CDEBUG(D_CONSOLE, "Conn %d [type, closing, crefcnt, srefcnt]: %d, %d, %d, %d\n",
297                        ccount,
298                        conn->ksnc_type,
299                        conn->ksnc_closing,
300                        refcount_read(&conn->ksnc_conn_refcount),
301                        refcount_read(&conn->ksnc_sock_refcount));
302                 CDEBUG(D_CONSOLE, "Conn %d rx [scheduled, ready, state]: %d, %d, %d\n",
303                        ccount,
304                        conn->ksnc_rx_scheduled,
305                        conn->ksnc_rx_ready,
306                        conn->ksnc_rx_state);
307                 CDEBUG(D_CONSOLE, "Conn %d tx [txqcnt, scheduled, last_post, ready, deadline]: %d, %d, %lld, %d, %lld\n",
308                        ccount,
309                        txcount,
310                        conn->ksnc_tx_scheduled,
311                        conn->ksnc_tx_last_post,
312                        conn->ksnc_rx_ready,
313                        conn->ksnc_rx_deadline);
314
315                 if (conn->ksnc_scheduler)
316                         CDEBUG(D_CONSOLE, "Conn %d sched [nconns, cpt]: %d, %d\n",
317                                ccount,
318                                conn->ksnc_scheduler->kss_nconns,
319                                conn->ksnc_scheduler->kss_cpt);
320
321                 txcount = 0;
322                 ccount++;
323         }
324 }
325
326 static int
327 ksocknal_get_peer_info(struct lnet_ni *ni, int index,
328                        struct lnet_processid *id, __u32 *myip, __u32 *peer_ip,
329                        int *port, int *conn_count, int *share_count)
330 {
331         struct ksock_peer_ni *peer_ni;
332         struct ksock_conn_cb *conn_cb;
333         int i;
334         int rc = -ENOENT;
335
336         read_lock(&ksocknal_data.ksnd_global_lock);
337
338         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
339
340                 if (peer_ni->ksnp_ni != ni)
341                         continue;
342                 if (index-- > 0)
343                         continue;
344
345                 *id = peer_ni->ksnp_id;
346                 conn_cb = peer_ni->ksnp_conn_cb;
347                 if (conn_cb == NULL) {
348                         *myip = 0;
349                         *peer_ip = 0;
350                         *port = 0;
351                         *conn_count = 0;
352                         *share_count = 0;
353                         rc = 0;
354                 } else {
355                         ksocknal_dump_peer_debug_info(peer_ni);
356
357                         if (conn_cb->ksnr_addr.ss_family == AF_INET) {
358                                 struct sockaddr_in *sa =
359                                         (void *)&conn_cb->ksnr_addr;
360
361                                 rc = choose_ipv4_src(myip,
362                                                      conn_cb->ksnr_myiface,
363                                                      ntohl(sa->sin_addr.s_addr),
364                                                      ni->ni_net_ns);
365                                 *peer_ip = ntohl(sa->sin_addr.s_addr);
366                                 *port = ntohs(sa->sin_port);
367
368                         } else {
369                                 *myip = 0xFFFFFFFF;
370                                 *peer_ip = 0xFFFFFFFF;
371                                 *port = 0;
372                                 rc = -ENOTSUPP;
373                         }
374                         *conn_count = conn_cb->ksnr_conn_count;
375                         *share_count = 1;
376                 }
377                 break;
378         }
379         read_unlock(&ksocknal_data.ksnd_global_lock);
380         return rc;
381 }
382
383 static unsigned int
384 ksocknal_get_conn_count_by_type(struct ksock_conn_cb *conn_cb,
385                                 int type)
386 {
387         unsigned int count = 0;
388
389         switch (type) {
390         case SOCKLND_CONN_CONTROL:
391                 count = conn_cb->ksnr_ctrl_conn_count;
392                 break;
393         case SOCKLND_CONN_BULK_IN:
394                 count = conn_cb->ksnr_blki_conn_count;
395                 break;
396         case SOCKLND_CONN_BULK_OUT:
397                 count = conn_cb->ksnr_blko_conn_count;
398                 break;
399         case SOCKLND_CONN_ANY:
400                 count = conn_cb->ksnr_conn_count;
401                 break;
402         default:
403                 LBUG();
404                 break;
405         }
406
407         return count;
408 }
409
410 static unsigned int
411 ksocknal_get_conns_per_peer(struct ksock_peer_ni *peer_ni)
412 {
413         struct lnet_ni *ni = peer_ni->ksnp_ni;
414         struct lnet_ioctl_config_socklnd_tunables *tunables;
415
416         LASSERT(ni);
417
418         tunables = &ni->ni_lnd_tunables.lnd_tun_u.lnd_sock;
419
420         return tunables->lnd_conns_per_peer;
421 }
422
423 static void
424 ksocknal_incr_conn_count(struct ksock_conn_cb *conn_cb,
425                          int type)
426 {
427         conn_cb->ksnr_conn_count++;
428
429         /* check if all connections of the given type got created */
430         switch (type) {
431         case SOCKLND_CONN_CONTROL:
432                 conn_cb->ksnr_ctrl_conn_count++;
433                 /* there's a single control connection per peer,
434                  * two in case of loopback
435                  */
436                 conn_cb->ksnr_connected |= BIT(type);
437                 break;
438         case SOCKLND_CONN_BULK_IN:
439                 conn_cb->ksnr_blki_conn_count++;
440                 if (conn_cb->ksnr_blki_conn_count >= conn_cb->ksnr_max_conns)
441                         conn_cb->ksnr_connected |= BIT(type);
442                 break;
443         case SOCKLND_CONN_BULK_OUT:
444                 conn_cb->ksnr_blko_conn_count++;
445                 if (conn_cb->ksnr_blko_conn_count >= conn_cb->ksnr_max_conns)
446                         conn_cb->ksnr_connected |= BIT(type);
447                 break;
448         case SOCKLND_CONN_ANY:
449                 if (conn_cb->ksnr_conn_count >= conn_cb->ksnr_max_conns)
450                         conn_cb->ksnr_connected |= BIT(type);
451                 break;
452         default:
453                 LBUG();
454                 break;
455         }
456
457         CDEBUG(D_NET, "Add conn type %d, ksnr_connected %x ksnr_max_conns %d\n",
458                type, conn_cb->ksnr_connected, conn_cb->ksnr_max_conns);
459 }
460
461
462 static void
463 ksocknal_decr_conn_count(struct ksock_conn_cb *conn_cb,
464                          int type)
465 {
466         conn_cb->ksnr_conn_count--;
467
468         /* check if all connections of the given type got created */
469         switch (type) {
470         case SOCKLND_CONN_CONTROL:
471                 conn_cb->ksnr_ctrl_conn_count--;
472                 /* there's a single control connection per peer,
473                  * two in case of loopback
474                  */
475                 if (conn_cb->ksnr_ctrl_conn_count == 0)
476                         conn_cb->ksnr_connected &= ~BIT(type);
477                 break;
478         case SOCKLND_CONN_BULK_IN:
479                 conn_cb->ksnr_blki_conn_count--;
480                 if (conn_cb->ksnr_blki_conn_count < conn_cb->ksnr_max_conns)
481                         conn_cb->ksnr_connected &= ~BIT(type);
482                 break;
483         case SOCKLND_CONN_BULK_OUT:
484                 conn_cb->ksnr_blko_conn_count--;
485                 if (conn_cb->ksnr_blko_conn_count < conn_cb->ksnr_max_conns)
486                         conn_cb->ksnr_connected &= ~BIT(type);
487                 break;
488         case SOCKLND_CONN_ANY:
489                 if (conn_cb->ksnr_conn_count < conn_cb->ksnr_max_conns)
490                         conn_cb->ksnr_connected &= ~BIT(type);
491                 break;
492         default:
493                 LBUG();
494                 break;
495         }
496
497         CDEBUG(D_NET, "Del conn type %d, ksnr_connected %x ksnr_max_conns %d\n",
498                type, conn_cb->ksnr_connected, conn_cb->ksnr_max_conns);
499 }
500
501 static void
502 ksocknal_associate_cb_conn_locked(struct ksock_conn_cb *conn_cb,
503                                   struct ksock_conn *conn)
504 {
505         struct ksock_peer_ni *peer_ni = conn_cb->ksnr_peer;
506         int type = conn->ksnc_type;
507         struct ksock_interface *iface;
508         int conn_iface;
509
510         conn_iface = ksocknal_ip2index((struct sockaddr *)&conn->ksnc_myaddr,
511                                        peer_ni->ksnp_ni);
512         conn->ksnc_conn_cb = conn_cb;
513         ksocknal_conn_cb_addref(conn_cb);
514
515         if (conn_cb->ksnr_myiface != conn_iface) {
516                 if (conn_cb->ksnr_myiface < 0) {
517                         /* route wasn't bound locally yet (the initial route) */
518                         CDEBUG(D_NET, "Binding %s %pISc to interface %d\n",
519                                libcfs_idstr(&peer_ni->ksnp_id),
520                                &conn_cb->ksnr_addr,
521                                conn_iface);
522                 } else {
523                         CDEBUG(D_NET,
524                                "Rebinding %s %pISc from interface %d to %d\n",
525                                libcfs_idstr(&peer_ni->ksnp_id),
526                                &conn_cb->ksnr_addr,
527                                conn_cb->ksnr_myiface,
528                                conn_iface);
529
530                         iface = ksocknal_index2iface(peer_ni->ksnp_ni,
531                                                      conn_cb->ksnr_myiface);
532                         if (iface)
533                                 iface->ksni_nroutes--;
534                 }
535                 conn_cb->ksnr_myiface = conn_iface;
536                 iface = ksocknal_index2iface(peer_ni->ksnp_ni,
537                                              conn_cb->ksnr_myiface);
538                 if (iface)
539                         iface->ksni_nroutes++;
540         }
541
542         ksocknal_incr_conn_count(conn_cb, type);
543
544         /* Successful connection => further attempts can
545          * proceed immediately
546          */
547         conn_cb->ksnr_retry_interval = 0;
548 }
549
550 static void
551 ksocknal_add_conn_cb_locked(struct ksock_peer_ni *peer_ni,
552                             struct ksock_conn_cb *conn_cb)
553 {
554         struct ksock_conn *conn;
555         struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
556
557         LASSERT(!peer_ni->ksnp_closing);
558         LASSERT(!conn_cb->ksnr_peer);
559         LASSERT(!conn_cb->ksnr_scheduled);
560         LASSERT(!conn_cb->ksnr_connecting);
561         LASSERT(conn_cb->ksnr_connected == 0);
562
563         conn_cb->ksnr_peer = peer_ni;
564         ksocknal_peer_addref(peer_ni);
565
566         /* set the conn_cb's interface to the current net's interface */
567         conn_cb->ksnr_myiface = net->ksnn_interface.ksni_index;
568         net->ksnn_interface.ksni_nroutes++;
569
570         /* peer_ni's route list takes over my ref on 'route' */
571         peer_ni->ksnp_conn_cb = conn_cb;
572
573         list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
574                 if (!rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
575                                   (struct sockaddr *)&conn_cb->ksnr_addr))
576                         continue;
577
578                 ksocknal_associate_cb_conn_locked(conn_cb, conn);
579                 /* keep going (typed conns) */
580         }
581 }
582
583 static void
584 ksocknal_del_conn_cb_locked(struct ksock_conn_cb *conn_cb)
585 {
586         struct ksock_peer_ni *peer_ni = conn_cb->ksnr_peer;
587         struct ksock_interface *iface;
588         struct ksock_conn *conn;
589         struct ksock_conn *cnxt;
590
591         LASSERT(!conn_cb->ksnr_deleted);
592
593         /* Close associated conns */
594         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns, ksnc_list) {
595                 if (conn->ksnc_conn_cb != conn_cb)
596                         continue;
597
598                 ksocknal_close_conn_locked(conn, 0);
599         }
600
601         if (conn_cb->ksnr_myiface >= 0) {
602                 iface = ksocknal_index2iface(peer_ni->ksnp_ni,
603                                              conn_cb->ksnr_myiface);
604                 if (iface)
605                         iface->ksni_nroutes--;
606         }
607
608         conn_cb->ksnr_deleted = 1;
609         ksocknal_conn_cb_decref(conn_cb);               /* drop peer_ni's ref */
610         peer_ni->ksnp_conn_cb = NULL;
611
612         if (list_empty(&peer_ni->ksnp_conns)) {
613                 /* I've just removed the last route to a peer_ni with no active
614                  * connections
615                  */
616                 ksocknal_unlink_peer_locked(peer_ni);
617         }
618 }
619
620 int
621 ksocknal_add_peer(struct lnet_ni *ni, struct lnet_processid *id,
622                   struct sockaddr *addr)
623 {
624         struct ksock_peer_ni *peer_ni;
625         struct ksock_peer_ni *peer2;
626         struct ksock_conn_cb *conn_cb;
627
628         if (LNET_NID_IS_ANY(&id->nid) ||
629             id->pid == LNET_PID_ANY)
630                 return (-EINVAL);
631
632         /* Have a brand new peer_ni ready... */
633         peer_ni = ksocknal_create_peer(ni, id);
634         if (IS_ERR(peer_ni))
635                 return PTR_ERR(peer_ni);
636
637         conn_cb = ksocknal_create_conn_cb(addr);
638         if (!conn_cb) {
639                 ksocknal_peer_decref(peer_ni);
640                 return -ENOMEM;
641         }
642
643         write_lock_bh(&ksocknal_data.ksnd_global_lock);
644
645         /* always called with a ref on ni, so shutdown can't have started */
646         LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers)
647                 >= 0);
648
649         peer2 = ksocknal_find_peer_locked(ni, id);
650         if (peer2 != NULL) {
651                 ksocknal_peer_decref(peer_ni);
652                 peer_ni = peer2;
653         } else {
654                 /* peer_ni table takes my ref on peer_ni */
655                 hash_add(ksocknal_data.ksnd_peers, &peer_ni->ksnp_list,
656                          nidhash(&id->nid));
657         }
658
659         if (peer_ni->ksnp_conn_cb) {
660                 ksocknal_conn_cb_decref(conn_cb);
661         } else {
662                 ksocknal_add_conn_cb_locked(peer_ni, conn_cb);
663                 /* Remember conns_per_peer setting at the time
664                  * of connection initiation. It will define the
665                  * max number of conns per type for this conn_cb
666                  * while it's in use.
667                  */
668                 conn_cb->ksnr_max_conns = ksocknal_get_conns_per_peer(peer_ni);
669         }
670
671         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
672
673         return 0;
674 }
675
676 static void
677 ksocknal_del_peer_locked(struct ksock_peer_ni *peer_ni)
678 {
679         struct ksock_conn *conn;
680         struct ksock_conn *cnxt;
681         struct ksock_conn_cb *conn_cb;
682
683         LASSERT(!peer_ni->ksnp_closing);
684
685         /* Extra ref prevents peer_ni disappearing until I'm done with it */
686         ksocknal_peer_addref(peer_ni);
687         conn_cb = peer_ni->ksnp_conn_cb;
688         if (conn_cb)
689                 ksocknal_del_conn_cb_locked(conn_cb);
690
691         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns,
692                                  ksnc_list)
693                 ksocknal_close_conn_locked(conn, 0);
694
695         ksocknal_peer_decref(peer_ni);
696         /* NB peer_ni unlinks itself when last conn/conn_cb is removed */
697 }
698
699 static int
700 ksocknal_del_peer(struct lnet_ni *ni, struct lnet_processid *id)
701 {
702         LIST_HEAD(zombies);
703         struct hlist_node *pnxt;
704         struct ksock_peer_ni *peer_ni;
705         int lo;
706         int hi;
707         int i;
708         int rc = -ENOENT;
709
710         write_lock_bh(&ksocknal_data.ksnd_global_lock);
711
712         if (id && !LNET_NID_IS_ANY(&id->nid)) {
713                 lo = hash_min(nidhash(&id->nid),
714                               HASH_BITS(ksocknal_data.ksnd_peers));
715                 hi = lo;
716         } else {
717                 lo = 0;
718                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
719         }
720
721         for (i = lo; i <= hi; i++) {
722                 hlist_for_each_entry_safe(peer_ni, pnxt,
723                                           &ksocknal_data.ksnd_peers[i],
724                                           ksnp_list) {
725                         if (peer_ni->ksnp_ni != ni)
726                                 continue;
727
728                         if (!((!id || LNET_NID_IS_ANY(&id->nid) ||
729                                nid_same(&peer_ni->ksnp_id.nid, &id->nid)) &&
730                               (!id || id->pid == LNET_PID_ANY ||
731                                peer_ni->ksnp_id.pid == id->pid)))
732                                 continue;
733
734                         ksocknal_peer_addref(peer_ni);  /* a ref for me... */
735
736                         ksocknal_del_peer_locked(peer_ni);
737
738                         if (peer_ni->ksnp_closing &&
739                             !list_empty(&peer_ni->ksnp_tx_queue)) {
740                                 LASSERT(list_empty(&peer_ni->ksnp_conns));
741                                 LASSERT(peer_ni->ksnp_conn_cb == NULL);
742
743                                 list_splice_init(&peer_ni->ksnp_tx_queue,
744                                                  &zombies);
745                         }
746
747                         ksocknal_peer_decref(peer_ni);  /* ...till here */
748
749                         rc = 0;                         /* matched! */
750                 }
751         }
752
753         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
754
755         ksocknal_txlist_done(ni, &zombies, -ENETDOWN);
756
757         return rc;
758 }
759
760 static struct ksock_conn *
761 ksocknal_get_conn_by_idx(struct lnet_ni *ni, int index)
762 {
763         struct ksock_peer_ni *peer_ni;
764         struct ksock_conn *conn;
765         int i;
766
767         read_lock(&ksocknal_data.ksnd_global_lock);
768
769         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
770                 LASSERT(!peer_ni->ksnp_closing);
771
772                 if (peer_ni->ksnp_ni != ni)
773                         continue;
774
775                 list_for_each_entry(conn, &peer_ni->ksnp_conns,
776                                     ksnc_list) {
777                         if (index-- > 0)
778                                 continue;
779
780                         ksocknal_conn_addref(conn);
781                         read_unlock(&ksocknal_data.ksnd_global_lock);
782                         return conn;
783                 }
784         }
785
786         read_unlock(&ksocknal_data.ksnd_global_lock);
787         return NULL;
788 }
789
790 static struct ksock_sched *
791 ksocknal_choose_scheduler_locked(unsigned int cpt)
792 {
793         struct ksock_sched *sched = ksocknal_data.ksnd_schedulers[cpt];
794         int i;
795
796         if (sched->kss_nthreads == 0) {
797                 cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
798                         if (sched->kss_nthreads > 0) {
799                                 CDEBUG(D_NET, "scheduler[%d] has no threads. selected scheduler[%d]\n",
800                                        cpt, sched->kss_cpt);
801                                 return sched;
802                         }
803                 }
804                 return NULL;
805         }
806
807         return sched;
808 }
809
810 int
811 ksocknal_accept(struct lnet_ni *ni, struct socket *sock)
812 {
813         struct ksock_connreq *cr;
814         int rc;
815         struct sockaddr_storage peer;
816
817         rc = lnet_sock_getaddr(sock, true, &peer);
818         if (rc != 0) {
819                 CERROR("Can't determine new connection's address\n");
820                 return rc;
821         }
822
823         LIBCFS_ALLOC(cr, sizeof(*cr));
824         if (cr == NULL) {
825                 LCONSOLE_ERROR_MSG(0x12f,
826                                    "Dropping connection request from %pISc: memory exhausted\n",
827                                    &peer);
828                 return -ENOMEM;
829         }
830
831         lnet_ni_addref(ni);
832         cr->ksncr_ni   = ni;
833         cr->ksncr_sock = sock;
834
835         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
836
837         list_add_tail(&cr->ksncr_list, &ksocknal_data.ksnd_connd_connreqs);
838         wake_up(&ksocknal_data.ksnd_connd_waitq);
839
840         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
841         return 0;
842 }
843
844 static int
845 ksocknal_connecting(struct ksock_conn_cb *conn_cb, struct sockaddr *sa)
846 {
847         if (conn_cb &&
848             rpc_cmp_addr((struct sockaddr *)&conn_cb->ksnr_addr, sa))
849                 return conn_cb->ksnr_connecting;
850         return 0;
851 }
852
853 int
854 ksocknal_create_conn(struct lnet_ni *ni, struct ksock_conn_cb *conn_cb,
855                      struct socket *sock, int type)
856 {
857         rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
858         LIST_HEAD(zombies);
859         struct lnet_processid peerid;
860         u64 incarnation;
861         struct ksock_conn *conn;
862         struct ksock_conn *conn2;
863         struct ksock_peer_ni *peer_ni = NULL;
864         struct ksock_peer_ni *peer2;
865         struct ksock_sched *sched;
866         struct ksock_hello_msg *hello;
867         int cpt;
868         struct ksock_tx *tx;
869         struct ksock_tx *txtmp;
870         int rc;
871         int rc2;
872         int active;
873         int num_dup = 0;
874         char *warn = NULL;
875
876         active = (conn_cb != NULL);
877
878         LASSERT(active == (type != SOCKLND_CONN_NONE));
879
880         LIBCFS_ALLOC(conn, sizeof(*conn));
881         if (conn == NULL) {
882                 rc = -ENOMEM;
883                 goto failed_0;
884         }
885
886         conn->ksnc_peer = NULL;
887         conn->ksnc_conn_cb = NULL;
888         conn->ksnc_sock = sock;
889         /* 2 ref, 1 for conn, another extra ref prevents socket
890          * being closed before establishment of connection */
891         refcount_set(&conn->ksnc_sock_refcount, 2);
892         conn->ksnc_type = type;
893         ksocknal_lib_save_callback(sock, conn);
894         refcount_set(&conn->ksnc_conn_refcount, 1); /* 1 ref for me */
895
896         conn->ksnc_rx_ready = 0;
897         conn->ksnc_rx_scheduled = 0;
898
899         INIT_LIST_HEAD(&conn->ksnc_tx_queue);
900         conn->ksnc_tx_ready = 0;
901         conn->ksnc_tx_scheduled = 0;
902         conn->ksnc_tx_carrier = NULL;
903         atomic_set (&conn->ksnc_tx_nob, 0);
904
905         LIBCFS_ALLOC(hello, offsetof(struct ksock_hello_msg,
906                                      kshm_ips[LNET_INTERFACES_NUM]));
907         if (hello == NULL) {
908                 rc = -ENOMEM;
909                 goto failed_1;
910         }
911
912         /* stash conn's local and remote addrs */
913         rc = ksocknal_lib_get_conn_addrs(conn);
914         if (rc != 0)
915                 goto failed_1;
916
917         /* Find out/confirm peer_ni's NID and connection type and get the
918          * vector of interfaces she's willing to let me connect to.
919          * Passive connections use the listener timeout since the peer_ni sends
920          * eagerly
921          */
922
923         if (active) {
924                 peer_ni = conn_cb->ksnr_peer;
925                 LASSERT(ni == peer_ni->ksnp_ni);
926
927                 /* Active connection sends HELLO eagerly */
928                 hello->kshm_nips = 0;
929                 peerid = peer_ni->ksnp_id;
930
931                 write_lock_bh(global_lock);
932                 conn->ksnc_proto = peer_ni->ksnp_proto;
933                 write_unlock_bh(global_lock);
934
935                 if (conn->ksnc_proto == NULL) {
936                         conn->ksnc_proto = &ksocknal_protocol_v3x;
937 #if SOCKNAL_VERSION_DEBUG
938                         if (*ksocknal_tunables.ksnd_protocol == 2)
939                                 conn->ksnc_proto = &ksocknal_protocol_v2x;
940                         else if (*ksocknal_tunables.ksnd_protocol == 1)
941                                 conn->ksnc_proto = &ksocknal_protocol_v1x;
942 #endif
943                 }
944
945                 rc = ksocknal_send_hello(ni, conn, &peerid.nid, hello);
946                 if (rc != 0)
947                         goto failed_1;
948         } else {
949                 peerid.nid = LNET_ANY_NID;
950                 peerid.pid = LNET_PID_ANY;
951
952                 /* Passive, get protocol from peer_ni */
953                 conn->ksnc_proto = NULL;
954         }
955
956         rc = ksocknal_recv_hello(ni, conn, hello, &peerid, &incarnation);
957         if (rc < 0)
958                 goto failed_1;
959
960         LASSERT(rc == 0 || active);
961         LASSERT(conn->ksnc_proto != NULL);
962         LASSERT(!LNET_NID_IS_ANY(&peerid.nid));
963
964         cpt = lnet_nid2cpt(&peerid.nid, ni);
965
966         if (active) {
967                 ksocknal_peer_addref(peer_ni);
968                 write_lock_bh(global_lock);
969         } else {
970                 peer_ni = ksocknal_create_peer(ni, &peerid);
971                 if (IS_ERR(peer_ni)) {
972                         rc = PTR_ERR(peer_ni);
973                         goto failed_1;
974                 }
975
976                 write_lock_bh(global_lock);
977
978                 /* called with a ref on ni, so shutdown can't have started */
979                 LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers) >= 0);
980
981                 peer2 = ksocknal_find_peer_locked(ni, &peerid);
982                 if (peer2 == NULL) {
983                         /* NB this puts an "empty" peer_ni in the peer_ni
984                          * table (which takes my ref) */
985                         hash_add(ksocknal_data.ksnd_peers,
986                                  &peer_ni->ksnp_list, nidhash(&peerid.nid));
987                 } else {
988                         ksocknal_peer_decref(peer_ni);
989                         peer_ni = peer2;
990                 }
991
992                 /* +1 ref for me */
993                 ksocknal_peer_addref(peer_ni);
994                 peer_ni->ksnp_accepting++;
995
996                 /* Am I already connecting to this guy?  Resolve in
997                  * favour of higher NID...
998                  */
999                 if (memcmp(&peerid.nid, &ni->ni_nid, sizeof(peerid.nid)) < 0 &&
1000                     ksocknal_connecting(peer_ni->ksnp_conn_cb,
1001                                         ((struct sockaddr *) &conn->ksnc_peeraddr))) {
1002                         rc = EALREADY;
1003                         warn = "connection race resolution";
1004                         goto failed_2;
1005                 }
1006         }
1007
1008         if (peer_ni->ksnp_closing ||
1009             (active && conn_cb->ksnr_deleted)) {
1010                 /* peer_ni/conn_cb got closed under me */
1011                 rc = -ESTALE;
1012                 warn = "peer_ni/conn_cb removed";
1013                 goto failed_2;
1014         }
1015
1016         if (peer_ni->ksnp_proto == NULL) {
1017                 /* Never connected before.
1018                  * NB recv_hello may have returned EPROTO to signal my peer_ni
1019                  * wants a different protocol than the one I asked for.
1020                  */
1021                 LASSERT(list_empty(&peer_ni->ksnp_conns));
1022
1023                 peer_ni->ksnp_proto = conn->ksnc_proto;
1024                 peer_ni->ksnp_incarnation = incarnation;
1025         }
1026
1027         if (peer_ni->ksnp_proto != conn->ksnc_proto ||
1028             peer_ni->ksnp_incarnation != incarnation) {
1029                 /* peer_ni rebooted or I've got the wrong protocol version */
1030                 ksocknal_close_peer_conns_locked(peer_ni, NULL, 0);
1031
1032                 peer_ni->ksnp_proto = NULL;
1033                 rc = ESTALE;
1034                 warn = peer_ni->ksnp_incarnation != incarnation ?
1035                         "peer_ni rebooted" :
1036                         "wrong proto version";
1037                 goto failed_2;
1038         }
1039
1040         switch (rc) {
1041         default:
1042                 LBUG();
1043         case 0:
1044                 break;
1045         case EALREADY:
1046                 warn = "lost conn race";
1047                 goto failed_2;
1048         case EPROTO:
1049                 warn = "retry with different protocol version";
1050                 goto failed_2;
1051         }
1052
1053         /* Refuse to duplicate an existing connection, unless this is a
1054          * loopback connection */
1055         if (!rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
1056                           (struct sockaddr *)&conn->ksnc_myaddr)) {
1057                 list_for_each_entry(conn2, &peer_ni->ksnp_conns, ksnc_list) {
1058                         if (!rpc_cmp_addr(
1059                                     (struct sockaddr *)&conn2->ksnc_peeraddr,
1060                                     (struct sockaddr *)&conn->ksnc_peeraddr) ||
1061                             !rpc_cmp_addr(
1062                                     (struct sockaddr *)&conn2->ksnc_myaddr,
1063                                     (struct sockaddr *)&conn->ksnc_myaddr) ||
1064                             conn2->ksnc_type != conn->ksnc_type)
1065                                 continue;
1066
1067                         num_dup++;
1068                         /* If max conns per type is not registered in conn_cb
1069                          * as ksnr_max_conns, use ni's conns_per_peer
1070                          */
1071                         if ((peer_ni->ksnp_conn_cb &&
1072                             num_dup < peer_ni->ksnp_conn_cb->ksnr_max_conns) ||
1073                             (!peer_ni->ksnp_conn_cb &&
1074                             num_dup < ksocknal_get_conns_per_peer(peer_ni)))
1075                                 continue;
1076
1077                         /* Reply on a passive connection attempt so the peer_ni
1078                          * realises we're connected.
1079                          */
1080                         LASSERT(rc == 0);
1081                         if (!active)
1082                                 rc = EALREADY;
1083
1084                         warn = "duplicate";
1085                         goto failed_2;
1086                 }
1087         }
1088         /* If the connection created by this route didn't bind to the IP
1089          * address the route connected to, the connection/route matching
1090          * code below probably isn't going to work.
1091          */
1092         if (active &&
1093             !rpc_cmp_addr((struct sockaddr *)&conn_cb->ksnr_addr,
1094                           (struct sockaddr *)&conn->ksnc_peeraddr)) {
1095                 CERROR("Route %s %pISc connected to %pISc\n",
1096                        libcfs_idstr(&peer_ni->ksnp_id),
1097                        &conn_cb->ksnr_addr,
1098                        &conn->ksnc_peeraddr);
1099         }
1100
1101         /* Search for a conn_cb corresponding to the new connection and
1102          * create an association.  This allows incoming connections created
1103          * by conn_cbs in my peer_ni to match my own conn_cb entries so I don't
1104          * continually create duplicate conn_cbs.
1105          */
1106         conn_cb = peer_ni->ksnp_conn_cb;
1107
1108         if (conn_cb && rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
1109                                     (struct sockaddr *)&conn_cb->ksnr_addr))
1110                 ksocknal_associate_cb_conn_locked(conn_cb, conn);
1111
1112         conn->ksnc_peer = peer_ni;                 /* conn takes my ref on peer_ni */
1113         peer_ni->ksnp_last_alive = ktime_get_seconds();
1114         peer_ni->ksnp_send_keepalive = 0;
1115         peer_ni->ksnp_error = 0;
1116
1117         sched = ksocknal_choose_scheduler_locked(cpt);
1118         if (!sched) {
1119                 CERROR("no schedulers available. node is unhealthy\n");
1120                 goto failed_2;
1121         }
1122         /*
1123          * The cpt might have changed if we ended up selecting a non cpt
1124          * native scheduler. So use the scheduler's cpt instead.
1125          */
1126         cpt = sched->kss_cpt;
1127         sched->kss_nconns++;
1128         conn->ksnc_scheduler = sched;
1129
1130         conn->ksnc_tx_last_post = ktime_get_seconds();
1131         /* Set the deadline for the outgoing HELLO to drain */
1132         conn->ksnc_tx_bufnob = sock->sk->sk_wmem_queued;
1133         conn->ksnc_tx_deadline = ktime_get_seconds() +
1134                                  ksocknal_timeout();
1135         smp_mb();   /* order with adding to peer_ni's conn list */
1136
1137         list_add(&conn->ksnc_list, &peer_ni->ksnp_conns);
1138         ksocknal_conn_addref(conn);
1139
1140         ksocknal_new_packet(conn, 0);
1141
1142         conn->ksnc_zc_capable = ksocknal_lib_zc_capable(conn);
1143
1144         /* Take packets blocking for this connection. */
1145         list_for_each_entry_safe(tx, txtmp, &peer_ni->ksnp_tx_queue, tx_list) {
1146                 if (conn->ksnc_proto->pro_match_tx(conn, tx, tx->tx_nonblk) ==
1147                     SOCKNAL_MATCH_NO)
1148                         continue;
1149
1150                 list_del(&tx->tx_list);
1151                 ksocknal_queue_tx_locked(tx, conn);
1152         }
1153
1154         write_unlock_bh(global_lock);
1155         /* We've now got a new connection.  Any errors from here on are just
1156          * like "normal" comms errors and we close the connection normally.
1157          * NB (a) we still have to send the reply HELLO for passive
1158          *        connections,
1159          *    (b) normal I/O on the conn is blocked until I setup and call the
1160          *        socket callbacks.
1161          */
1162
1163         CDEBUG(D_NET, "New conn %s p %d.x %pISc -> %pIScp"
1164                " incarnation:%lld sched[%d]\n",
1165                libcfs_idstr(&peerid), conn->ksnc_proto->pro_version,
1166                &conn->ksnc_myaddr, &conn->ksnc_peeraddr,
1167                incarnation, cpt);
1168
1169         if (!active) {
1170                 hello->kshm_nips = 0;
1171                 rc = ksocknal_send_hello(ni, conn, &peerid.nid, hello);
1172         }
1173
1174         LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
1175                                     kshm_ips[LNET_INTERFACES_NUM]));
1176
1177         /* setup the socket AFTER I've received hello (it disables
1178          * SO_LINGER).  I might call back to the acceptor who may want
1179          * to send a protocol version response and then close the
1180          * socket; this ensures the socket only tears down after the
1181          * response has been sent.
1182          */
1183         if (rc == 0)
1184                 rc = ksocknal_lib_setup_sock(sock);
1185
1186         write_lock_bh(global_lock);
1187
1188         /* NB my callbacks block while I hold ksnd_global_lock */
1189         ksocknal_lib_set_callback(sock, conn);
1190
1191         if (!active)
1192                 peer_ni->ksnp_accepting--;
1193
1194         write_unlock_bh(global_lock);
1195
1196         if (rc != 0) {
1197                 write_lock_bh(global_lock);
1198                 if (!conn->ksnc_closing) {
1199                         /* could be closed by another thread */
1200                         ksocknal_close_conn_locked(conn, rc);
1201                 }
1202                 write_unlock_bh(global_lock);
1203         } else if (ksocknal_connsock_addref(conn) == 0) {
1204                 /* Allow I/O to proceed. */
1205                 ksocknal_read_callback(conn);
1206                 ksocknal_write_callback(conn);
1207                 ksocknal_connsock_decref(conn);
1208         }
1209
1210         ksocknal_connsock_decref(conn);
1211         ksocknal_conn_decref(conn);
1212         return rc;
1213
1214 failed_2:
1215
1216         if (!peer_ni->ksnp_closing &&
1217             list_empty(&peer_ni->ksnp_conns) &&
1218             peer_ni->ksnp_conn_cb == NULL) {
1219                 list_splice_init(&peer_ni->ksnp_tx_queue, &zombies);
1220                 ksocknal_unlink_peer_locked(peer_ni);
1221         }
1222
1223         write_unlock_bh(global_lock);
1224
1225         if (warn != NULL) {
1226                 if (rc < 0)
1227                         CERROR("Not creating conn %s type %d: %s\n",
1228                                libcfs_idstr(&peerid), conn->ksnc_type, warn);
1229                 else
1230                         CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",
1231                                libcfs_idstr(&peerid), conn->ksnc_type, warn);
1232         }
1233
1234         if (!active) {
1235                 if (rc > 0) {
1236                         /* Request retry by replying with CONN_NONE
1237                          * ksnc_proto has been set already
1238                          */
1239                         conn->ksnc_type = SOCKLND_CONN_NONE;
1240                         hello->kshm_nips = 0;
1241                         ksocknal_send_hello(ni, conn, &peerid.nid, hello);
1242                 }
1243
1244                 write_lock_bh(global_lock);
1245                 peer_ni->ksnp_accepting--;
1246                 write_unlock_bh(global_lock);
1247         }
1248
1249         /*
1250          * If we get here without an error code, just use -EALREADY.
1251          * Depending on how we got here, the error may be positive
1252          * or negative. Normalize the value for ksocknal_txlist_done().
1253          */
1254         rc2 = (rc == 0 ? -EALREADY : (rc > 0 ? -rc : rc));
1255         ksocknal_txlist_done(ni, &zombies, rc2);
1256         ksocknal_peer_decref(peer_ni);
1257
1258 failed_1:
1259         if (hello != NULL)
1260                 LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
1261                                             kshm_ips[LNET_INTERFACES_NUM]));
1262
1263         LIBCFS_FREE(conn, sizeof(*conn));
1264
1265 failed_0:
1266         sock_release(sock);
1267
1268         return rc;
1269 }
1270
1271 void
1272 ksocknal_close_conn_locked(struct ksock_conn *conn, int error)
1273 {
1274         /* This just does the immmediate housekeeping, and queues the
1275          * connection for the reaper to terminate.
1276          * Caller holds ksnd_global_lock exclusively in irq context */
1277         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1278         struct ksock_conn_cb *conn_cb;
1279         struct ksock_conn *conn2;
1280         int conn_count;
1281         int duplicate_count = 0;
1282
1283         LASSERT(peer_ni->ksnp_error == 0);
1284         LASSERT(!conn->ksnc_closing);
1285         conn->ksnc_closing = 1;
1286
1287         /* ksnd_deathrow_conns takes over peer_ni's ref */
1288         list_del(&conn->ksnc_list);
1289
1290         conn_cb = conn->ksnc_conn_cb;
1291         if (conn_cb != NULL) {
1292                 /* dissociate conn from cb... */
1293                 LASSERT(!conn_cb->ksnr_deleted);
1294
1295                 conn_count = ksocknal_get_conn_count_by_type(conn_cb,
1296                                                              conn->ksnc_type);
1297                 /* connected bit is set only if all connections
1298                  * of the given type got created
1299                  */
1300                 if (conn_count == conn_cb->ksnr_max_conns)
1301                         LASSERT((conn_cb->ksnr_connected &
1302                                 BIT(conn->ksnc_type)) != 0);
1303
1304                 if (conn_count == 1) {
1305                         list_for_each_entry(conn2, &peer_ni->ksnp_conns,
1306                                             ksnc_list) {
1307                                 if (conn2->ksnc_conn_cb == conn_cb &&
1308                                     conn2->ksnc_type == conn->ksnc_type)
1309                                         duplicate_count += 1;
1310                         }
1311                         if (duplicate_count > 0)
1312                                 CERROR("Found %d duplicate conns type %d\n",
1313                                        duplicate_count,
1314                                        conn->ksnc_type);
1315                 }
1316                 ksocknal_decr_conn_count(conn_cb, conn->ksnc_type);
1317
1318                 conn->ksnc_conn_cb = NULL;
1319
1320                 /* drop conn's ref on conn_cb */
1321                 ksocknal_conn_cb_decref(conn_cb);
1322         }
1323
1324         if (list_empty(&peer_ni->ksnp_conns)) {
1325                 /* No more connections to this peer_ni */
1326
1327                 if (!list_empty(&peer_ni->ksnp_tx_queue)) {
1328                         struct ksock_tx *tx;
1329
1330                         LASSERT(conn->ksnc_proto == &ksocknal_protocol_v3x);
1331
1332                         /* throw them to the last connection...,
1333                          * these TXs will be send to /dev/null by scheduler */
1334                         list_for_each_entry(tx, &peer_ni->ksnp_tx_queue,
1335                                             tx_list)
1336                                 ksocknal_tx_prep(conn, tx);
1337
1338                         spin_lock_bh(&conn->ksnc_scheduler->kss_lock);
1339                         list_splice_init(&peer_ni->ksnp_tx_queue,
1340                                          &conn->ksnc_tx_queue);
1341                         spin_unlock_bh(&conn->ksnc_scheduler->kss_lock);
1342                 }
1343
1344                 /* renegotiate protocol version */
1345                 peer_ni->ksnp_proto = NULL;
1346                 /* stash last conn close reason */
1347                 peer_ni->ksnp_error = error;
1348
1349                 if (peer_ni->ksnp_conn_cb == NULL) {
1350                         /* I've just closed last conn belonging to a
1351                          * peer_ni with no connections to it
1352                          */
1353                         ksocknal_unlink_peer_locked(peer_ni);
1354                 }
1355         }
1356
1357         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1358
1359         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
1360         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1361
1362         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1363 }
1364
1365 void
1366 ksocknal_peer_failed(struct ksock_peer_ni *peer_ni)
1367 {
1368         bool notify = false;
1369         time64_t last_alive = 0;
1370
1371         /* There has been a connection failure or comms error; but I'll only
1372          * tell LNET I think the peer_ni is dead if it's to another kernel and
1373          * there are no connections or connection attempts in existence. */
1374
1375         read_lock(&ksocknal_data.ksnd_global_lock);
1376
1377         if ((peer_ni->ksnp_id.pid & LNET_PID_USERFLAG) == 0 &&
1378              list_empty(&peer_ni->ksnp_conns) &&
1379              peer_ni->ksnp_accepting == 0 &&
1380              !ksocknal_find_connecting_conn_cb_locked(peer_ni)) {
1381                 notify = true;
1382                 last_alive = peer_ni->ksnp_last_alive;
1383         }
1384
1385         read_unlock(&ksocknal_data.ksnd_global_lock);
1386
1387         if (notify)
1388                 lnet_notify(peer_ni->ksnp_ni,
1389                             lnet_nid_to_nid4(&peer_ni->ksnp_id.nid),
1390                             false, false, last_alive);
1391 }
1392
1393 void
1394 ksocknal_finalize_zcreq(struct ksock_conn *conn)
1395 {
1396         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1397         struct ksock_tx *tx;
1398         struct ksock_tx *tmp;
1399         LIST_HEAD(zlist);
1400
1401         /* NB safe to finalize TXs because closing of socket will
1402          * abort all buffered data */
1403         LASSERT(conn->ksnc_sock == NULL);
1404
1405         spin_lock(&peer_ni->ksnp_lock);
1406
1407         list_for_each_entry_safe(tx, tmp, &peer_ni->ksnp_zc_req_list,
1408                                  tx_zc_list) {
1409                 if (tx->tx_conn != conn)
1410                         continue;
1411
1412                 LASSERT(tx->tx_msg.ksm_zc_cookies[0] != 0);
1413
1414                 tx->tx_msg.ksm_zc_cookies[0] = 0;
1415                 tx->tx_zc_aborted = 1;  /* mark it as not-acked */
1416                 list_move(&tx->tx_zc_list, &zlist);
1417         }
1418
1419         spin_unlock(&peer_ni->ksnp_lock);
1420
1421         while ((tx = list_first_entry_or_null(&zlist, struct ksock_tx,
1422                                               tx_zc_list)) != NULL) {
1423                 list_del(&tx->tx_zc_list);
1424                 ksocknal_tx_decref(tx);
1425         }
1426 }
1427
1428 void
1429 ksocknal_terminate_conn(struct ksock_conn *conn)
1430 {
1431         /* This gets called by the reaper (guaranteed thread context) to
1432          * disengage the socket from its callbacks and close it.
1433          * ksnc_refcount will eventually hit zero, and then the reaper will
1434          * destroy it.
1435          */
1436         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1437         struct ksock_sched *sched = conn->ksnc_scheduler;
1438         bool failed = false;
1439
1440         LASSERT(conn->ksnc_closing);
1441
1442         /* wake up the scheduler to "send" all remaining packets to /dev/null */
1443         spin_lock_bh(&sched->kss_lock);
1444
1445         /* a closing conn is always ready to tx */
1446         conn->ksnc_tx_ready = 1;
1447
1448         if (!conn->ksnc_tx_scheduled &&
1449             !list_empty(&conn->ksnc_tx_queue)) {
1450                 list_add_tail(&conn->ksnc_tx_list,
1451                               &sched->kss_tx_conns);
1452                 conn->ksnc_tx_scheduled = 1;
1453                 /* extra ref for scheduler */
1454                 ksocknal_conn_addref(conn);
1455
1456                 wake_up(&sched->kss_waitq);
1457         }
1458
1459         spin_unlock_bh(&sched->kss_lock);
1460
1461         /* serialise with callbacks */
1462         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1463
1464         ksocknal_lib_reset_callback(conn->ksnc_sock, conn);
1465
1466         /* OK, so this conn may not be completely disengaged from its
1467          * scheduler yet, but it _has_ committed to terminate...
1468          */
1469         conn->ksnc_scheduler->kss_nconns--;
1470
1471         if (peer_ni->ksnp_error != 0) {
1472                 /* peer_ni's last conn closed in error */
1473                 LASSERT(list_empty(&peer_ni->ksnp_conns));
1474                 failed = true;
1475                 peer_ni->ksnp_error = 0;     /* avoid multiple notifications */
1476         }
1477
1478         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1479
1480         if (failed)
1481                 ksocknal_peer_failed(peer_ni);
1482
1483         /* The socket is closed on the final put; either here, or in
1484          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1485          * when the connection was established, this will close the socket
1486          * immediately, aborting anything buffered in it. Any hung
1487          * zero-copy transmits will therefore complete in finite time.
1488          */
1489         ksocknal_connsock_decref(conn);
1490 }
1491
1492 void
1493 ksocknal_queue_zombie_conn(struct ksock_conn *conn)
1494 {
1495         /* Queue the conn for the reaper to destroy */
1496         LASSERT(refcount_read(&conn->ksnc_conn_refcount) == 0);
1497         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1498
1499         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1500         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1501
1502         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1503 }
1504
1505 void
1506 ksocknal_destroy_conn(struct ksock_conn *conn)
1507 {
1508         time64_t last_rcv;
1509
1510         /* Final coup-de-grace of the reaper */
1511         CDEBUG(D_NET, "connection %p\n", conn);
1512
1513         LASSERT(refcount_read(&conn->ksnc_conn_refcount) == 0);
1514         LASSERT(refcount_read(&conn->ksnc_sock_refcount) == 0);
1515         LASSERT(conn->ksnc_sock == NULL);
1516         LASSERT(conn->ksnc_conn_cb == NULL);
1517         LASSERT(!conn->ksnc_tx_scheduled);
1518         LASSERT(!conn->ksnc_rx_scheduled);
1519         LASSERT(list_empty(&conn->ksnc_tx_queue));
1520
1521         /* complete current receive if any */
1522         switch (conn->ksnc_rx_state) {
1523         case SOCKNAL_RX_LNET_PAYLOAD:
1524                 last_rcv = conn->ksnc_rx_deadline -
1525                            ksocknal_timeout();
1526                 CERROR("Completing partial receive from %s[%d], ip %pIScp, with error, wanted: %d, left: %d, last alive is %lld secs ago\n",
1527                        libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1528                        conn->ksnc_type,
1529                        &conn->ksnc_peeraddr,
1530                        conn->ksnc_rx_nob_wanted, conn->ksnc_rx_nob_left,
1531                        ktime_get_seconds() - last_rcv);
1532                 if (conn->ksnc_lnet_msg)
1533                         conn->ksnc_lnet_msg->msg_health_status =
1534                                 LNET_MSG_STATUS_REMOTE_ERROR;
1535                 lnet_finalize(conn->ksnc_lnet_msg, -EIO);
1536                 break;
1537         case SOCKNAL_RX_LNET_HEADER:
1538                 if (conn->ksnc_rx_started)
1539                         CERROR("Incomplete receive of lnet header from %s, ip %pIScp, with error, protocol: %d.x.\n",
1540                                libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1541                                &conn->ksnc_peeraddr,
1542                                conn->ksnc_proto->pro_version);
1543                 break;
1544         case SOCKNAL_RX_KSM_HEADER:
1545                 if (conn->ksnc_rx_started)
1546                         CERROR("Incomplete receive of ksock message from %s, ip %pIScp, with error, protocol: %d.x.\n",
1547                                libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1548                                &conn->ksnc_peeraddr,
1549                                conn->ksnc_proto->pro_version);
1550                 break;
1551         case SOCKNAL_RX_SLOP:
1552                 if (conn->ksnc_rx_started)
1553                         CERROR("Incomplete receive of slops from %s, ip %pIScp, with error\n",
1554                                libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1555                                &conn->ksnc_peeraddr);
1556                 break;
1557         default:
1558                 LBUG();
1559                 break;
1560         }
1561
1562         ksocknal_peer_decref(conn->ksnc_peer);
1563
1564         LIBCFS_FREE(conn, sizeof(*conn));
1565 }
1566
1567 int
1568 ksocknal_close_peer_conns_locked(struct ksock_peer_ni *peer_ni,
1569                                  struct sockaddr *addr, int why)
1570 {
1571         struct ksock_conn *conn;
1572         struct ksock_conn *cnxt;
1573         int count = 0;
1574
1575         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns, ksnc_list) {
1576                 if (!addr ||
1577                     rpc_cmp_addr(addr,
1578                                  (struct sockaddr *)&conn->ksnc_peeraddr)) {
1579                         count++;
1580                         ksocknal_close_conn_locked(conn, why);
1581                 }
1582         }
1583
1584         return count;
1585 }
1586
1587 int
1588 ksocknal_close_conn_and_siblings(struct ksock_conn *conn, int why)
1589 {
1590         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1591         int count;
1592
1593         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1594
1595         count = ksocknal_close_peer_conns_locked(
1596                 peer_ni, (struct sockaddr *)&conn->ksnc_peeraddr, why);
1597
1598         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1599
1600         return count;
1601 }
1602
1603 int
1604 ksocknal_close_matching_conns(struct lnet_processid *id, __u32 ipaddr)
1605 {
1606         struct ksock_peer_ni *peer_ni;
1607         struct hlist_node *pnxt;
1608         int lo;
1609         int hi;
1610         int i;
1611         int count = 0;
1612         struct sockaddr_in sa = {.sin_family = AF_INET};
1613
1614         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1615
1616         if (!LNET_NID_IS_ANY(&id->nid)) {
1617                 lo = hash_min(nidhash(&id->nid),
1618                               HASH_BITS(ksocknal_data.ksnd_peers));
1619                 hi = lo;
1620         } else {
1621                 lo = 0;
1622                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
1623         }
1624
1625         sa.sin_addr.s_addr = htonl(ipaddr);
1626         for (i = lo; i <= hi; i++) {
1627                 hlist_for_each_entry_safe(peer_ni, pnxt,
1628                                           &ksocknal_data.ksnd_peers[i],
1629                                           ksnp_list) {
1630
1631                         if (!((LNET_NID_IS_ANY(&id->nid) ||
1632                                nid_same(&id->nid, &peer_ni->ksnp_id.nid)) &&
1633                               (id->pid == LNET_PID_ANY ||
1634                                id->pid == peer_ni->ksnp_id.pid)))
1635                                 continue;
1636
1637                         count += ksocknal_close_peer_conns_locked(
1638                                 peer_ni,
1639                                 ipaddr ? (struct sockaddr *)&sa : NULL, 0);
1640                 }
1641         }
1642
1643         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1644
1645         /* wildcards always succeed */
1646         if (LNET_NID_IS_ANY(&id->nid) || id->pid == LNET_PID_ANY ||
1647             ipaddr == 0)
1648                 return 0;
1649
1650         return (count == 0 ? -ENOENT : 0);
1651 }
1652
1653 void
1654 ksocknal_notify_gw_down(struct lnet_nid *gw_nid)
1655 {
1656         /* The router is telling me she's been notified of a change in
1657          * gateway state....
1658          */
1659         struct lnet_processid id = {
1660                 .pid    = LNET_PID_ANY,
1661                 .nid    = *gw_nid,
1662         };
1663
1664         CDEBUG(D_NET, "gw %s down\n", libcfs_nidstr(gw_nid));
1665
1666         /* If the gateway crashed, close all open connections... */
1667         ksocknal_close_matching_conns(&id, 0);
1668         return;
1669
1670         /* We can only establish new connections
1671          * if we have autroutes, and these connect on demand.
1672          */
1673 }
1674
1675 static void
1676 ksocknal_push_peer(struct ksock_peer_ni *peer_ni)
1677 {
1678         int index;
1679         int i;
1680         struct ksock_conn *conn;
1681
1682         for (index = 0; ; index++) {
1683                 read_lock(&ksocknal_data.ksnd_global_lock);
1684
1685                 i = 0;
1686                 conn = NULL;
1687
1688                 list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
1689                         if (i++ == index) {
1690                                 ksocknal_conn_addref(conn);
1691                                 break;
1692                         }
1693                 }
1694
1695                 read_unlock(&ksocknal_data.ksnd_global_lock);
1696
1697                 if (i <= index)
1698                         break;
1699
1700                 ksocknal_lib_push_conn (conn);
1701                 ksocknal_conn_decref(conn);
1702         }
1703 }
1704
1705 static int
1706 ksocknal_push(struct lnet_ni *ni, struct lnet_processid *id)
1707 {
1708         int lo;
1709         int hi;
1710         int bkt;
1711         int rc = -ENOENT;
1712
1713         if (!LNET_NID_IS_ANY(&id->nid)) {
1714                 lo = hash_min(nidhash(&id->nid),
1715                               HASH_BITS(ksocknal_data.ksnd_peers));
1716                 hi = lo;
1717         } else {
1718                 lo = 0;
1719                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
1720         }
1721
1722         for (bkt = lo; bkt <= hi; bkt++) {
1723                 int peer_off; /* searching offset in peer_ni hash table */
1724
1725                 for (peer_off = 0; ; peer_off++) {
1726                         struct ksock_peer_ni *peer_ni;
1727                         int           i = 0;
1728
1729                         read_lock(&ksocknal_data.ksnd_global_lock);
1730                         hlist_for_each_entry(peer_ni,
1731                                              &ksocknal_data.ksnd_peers[bkt],
1732                                              ksnp_list) {
1733                                 if (!((LNET_NID_IS_ANY(&id->nid) ||
1734                                        nid_same(&id->nid,
1735                                                  &peer_ni->ksnp_id.nid)) &&
1736                                       (id->pid == LNET_PID_ANY ||
1737                                        id->pid == peer_ni->ksnp_id.pid)))
1738                                         continue;
1739
1740                                 if (i++ == peer_off) {
1741                                         ksocknal_peer_addref(peer_ni);
1742                                         break;
1743                                 }
1744                         }
1745                         read_unlock(&ksocknal_data.ksnd_global_lock);
1746
1747                         if (i <= peer_off) /* no match */
1748                                 break;
1749
1750                         rc = 0;
1751                         ksocknal_push_peer(peer_ni);
1752                         ksocknal_peer_decref(peer_ni);
1753                 }
1754         }
1755         return rc;
1756 }
1757
1758 int
1759 ksocknal_ctl(struct lnet_ni *ni, unsigned int cmd, void *arg)
1760 {
1761         struct lnet_processid id = {};
1762         struct libcfs_ioctl_data *data = arg;
1763         int rc;
1764
1765         switch(cmd) {
1766         case IOC_LIBCFS_GET_INTERFACE: {
1767                 struct ksock_net *net = ni->ni_data;
1768                 struct ksock_interface *iface;
1769                 struct sockaddr_in *sa;
1770
1771                 read_lock(&ksocknal_data.ksnd_global_lock);
1772
1773                 if (data->ioc_count >= 1) {
1774                         rc = -ENOENT;
1775                 } else {
1776                         rc = 0;
1777                         iface = &net->ksnn_interface;
1778
1779                         sa = (void *)&iface->ksni_addr;
1780                         if (sa->sin_family == AF_INET) {
1781                                 data->ioc_u32[0] = ntohl(sa->sin_addr.s_addr);
1782                                 data->ioc_u32[1] = iface->ksni_netmask;
1783                         } else {
1784                                 data->ioc_u32[0] = 0xFFFFFFFF;
1785                                 data->ioc_u32[1] = 0;
1786                         }
1787                         data->ioc_u32[2] = iface->ksni_npeers;
1788                         data->ioc_u32[3] = iface->ksni_nroutes;
1789                 }
1790
1791                 read_unlock(&ksocknal_data.ksnd_global_lock);
1792                 return rc;
1793         }
1794
1795         case IOC_LIBCFS_GET_PEER: {
1796                 __u32            myip = 0;
1797                 __u32            ip = 0;
1798                 int              port = 0;
1799                 int              conn_count = 0;
1800                 int              share_count = 0;
1801
1802                 rc = ksocknal_get_peer_info(ni, data->ioc_count,
1803                                             &id, &myip, &ip, &port,
1804                                             &conn_count,  &share_count);
1805                 if (rc != 0)
1806                         return rc;
1807
1808                 if (!nid_is_nid4(&id.nid))
1809                         return -EINVAL;
1810                 data->ioc_nid    = lnet_nid_to_nid4(&id.nid);
1811                 data->ioc_count  = share_count;
1812                 data->ioc_u32[0] = ip;
1813                 data->ioc_u32[1] = port;
1814                 data->ioc_u32[2] = myip;
1815                 data->ioc_u32[3] = conn_count;
1816                 data->ioc_u32[4] = id.pid;
1817                 return 0;
1818         }
1819
1820         case IOC_LIBCFS_ADD_PEER: {
1821                 struct sockaddr_in sa = {.sin_family = AF_INET};
1822
1823                 id.pid = LNET_PID_LUSTRE;
1824                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1825                 sa.sin_addr.s_addr = htonl(data->ioc_u32[0]);
1826                 sa.sin_port = htons(data->ioc_u32[1]);
1827                 return ksocknal_add_peer(ni, &id, (struct sockaddr *)&sa);
1828         }
1829         case IOC_LIBCFS_DEL_PEER:
1830                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1831                 id.pid = LNET_PID_ANY;
1832                 return ksocknal_del_peer(ni, &id);
1833
1834         case IOC_LIBCFS_GET_CONN: {
1835                 int           txmem;
1836                 int           rxmem;
1837                 int           nagle;
1838                 struct ksock_conn *conn = ksocknal_get_conn_by_idx(ni, data->ioc_count);
1839                 struct sockaddr_in *psa = (void *)&conn->ksnc_peeraddr;
1840                 struct sockaddr_in *mysa = (void *)&conn->ksnc_myaddr;
1841
1842                 if (conn == NULL)
1843                         return -ENOENT;
1844
1845                 ksocknal_lib_get_conn_tunables(conn, &txmem, &rxmem, &nagle);
1846
1847                 data->ioc_count = txmem;
1848                 data->ioc_nid = lnet_nid_to_nid4(&conn->ksnc_peer->ksnp_id.nid);
1849                 data->ioc_flags = nagle;
1850                 if (psa->sin_family == AF_INET)
1851                         data->ioc_u32[0] = ntohl(psa->sin_addr.s_addr);
1852                 else
1853                         data->ioc_u32[0] = 0xFFFFFFFF;
1854                 data->ioc_u32[1] = rpc_get_port((struct sockaddr *)
1855                                                 &conn->ksnc_peeraddr);
1856                 if (mysa->sin_family == AF_INET)
1857                         data->ioc_u32[2] = ntohl(mysa->sin_addr.s_addr);
1858                 else
1859                         data->ioc_u32[2] = 0xFFFFFFFF;
1860                 data->ioc_u32[3] = conn->ksnc_type;
1861                 data->ioc_u32[4] = conn->ksnc_scheduler->kss_cpt;
1862                 data->ioc_u32[5] = rxmem;
1863                 data->ioc_u32[6] = conn->ksnc_peer->ksnp_id.pid;
1864                 ksocknal_conn_decref(conn);
1865                 return 0;
1866         }
1867
1868         case IOC_LIBCFS_CLOSE_CONNECTION:
1869                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1870                 id.pid = LNET_PID_ANY;
1871                 return ksocknal_close_matching_conns(&id,
1872                                                      data->ioc_u32[0]);
1873
1874         case IOC_LIBCFS_REGISTER_MYNID:
1875                 /* Ignore if this is a noop */
1876                 if (nid_is_nid4(&ni->ni_nid) &&
1877                     data->ioc_nid == lnet_nid_to_nid4(&ni->ni_nid))
1878                         return 0;
1879
1880                 CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1881                        libcfs_nid2str(data->ioc_nid),
1882                        libcfs_nidstr(&ni->ni_nid));
1883                 return -EINVAL;
1884
1885         case IOC_LIBCFS_PUSH_CONNECTION:
1886                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1887                 id.pid = LNET_PID_ANY;
1888                 return ksocknal_push(ni, &id);
1889
1890         default:
1891                 return -EINVAL;
1892         }
1893         /* not reached */
1894 }
1895
1896 static void
1897 ksocknal_free_buffers (void)
1898 {
1899         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_txs) == 0);
1900
1901         if (ksocknal_data.ksnd_schedulers != NULL)
1902                 cfs_percpt_free(ksocknal_data.ksnd_schedulers);
1903
1904         spin_lock(&ksocknal_data.ksnd_tx_lock);
1905
1906         if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
1907                 LIST_HEAD(zlist);
1908                 struct ksock_tx *tx;
1909
1910                 list_splice_init(&ksocknal_data.ksnd_idle_noop_txs, &zlist);
1911                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
1912
1913                 while ((tx = list_first_entry_or_null(&zlist, struct ksock_tx,
1914                                                       tx_list)) != NULL) {
1915                         list_del(&tx->tx_list);
1916                         LIBCFS_FREE(tx, tx->tx_desc_size);
1917                 }
1918         } else {
1919                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
1920         }
1921 }
1922
1923 static int ksocknal_get_link_status(struct net_device *dev)
1924 {
1925         int ret = -1;
1926
1927         LASSERT(dev);
1928
1929         if (!netif_running(dev)) {
1930                 ret = 0;
1931                 CDEBUG(D_NET, "device not running\n");
1932         }
1933         /* Some devices may not be providing link settings */
1934         else if (dev->ethtool_ops->get_link) {
1935                 ret = dev->ethtool_ops->get_link(dev);
1936                 CDEBUG(D_NET, "get_link returns %u\n", ret);
1937         }
1938
1939         return ret;
1940 }
1941
1942 static int
1943 ksocknal_handle_link_state_change(struct net_device *dev,
1944                                   unsigned char operstate)
1945 {
1946         struct lnet_ni *ni = NULL;
1947         struct ksock_net *net;
1948         struct ksock_net *cnxt;
1949         int ifindex;
1950         unsigned char link_down = !(operstate == IF_OPER_UP);
1951         struct in_device *in_dev;
1952         bool found_ip = false;
1953         struct ksock_interface *ksi = NULL;
1954         struct sockaddr_in *sa;
1955         DECLARE_CONST_IN_IFADDR(ifa);
1956
1957         ifindex = dev->ifindex;
1958
1959         if (!ksocknal_data.ksnd_nnets)
1960                 goto out;
1961
1962         list_for_each_entry_safe(net, cnxt, &ksocknal_data.ksnd_nets,
1963                                  ksnn_list) {
1964
1965                 ksi = &net->ksnn_interface;
1966                 sa = (void *)&ksi->ksni_addr;
1967                 found_ip = false;
1968
1969                 if (ksi->ksni_index != ifindex ||
1970                     strcmp(ksi->ksni_name, dev->name))
1971                         continue;
1972
1973                 ni = net->ksnn_ni;
1974
1975                 in_dev = __in_dev_get_rtnl(dev);
1976                 if (!in_dev) {
1977                         CDEBUG(D_NET, "Interface %s has no IPv4 status.\n",
1978                                dev->name);
1979                         CDEBUG(D_NET, "set link fatal state to 1\n");
1980                         atomic_set(&ni->ni_fatal_error_on, 1);
1981                         continue;
1982                 }
1983                 in_dev_for_each_ifa_rtnl(ifa, in_dev) {
1984                         if (sa->sin_addr.s_addr == ifa->ifa_local)
1985                                 found_ip = true;
1986                 }
1987                 endfor_ifa(in_dev);
1988
1989                 if (!found_ip) {
1990                         CDEBUG(D_NET, "Interface %s has no matching ip\n",
1991                                dev->name);
1992                         CDEBUG(D_NET, "set link fatal state to 1\n");
1993                         atomic_set(&ni->ni_fatal_error_on, 1);
1994                         continue;
1995                 }
1996
1997                 if (link_down) {
1998                         CDEBUG(D_NET, "set link fatal state to 1\n");
1999                         atomic_set(&ni->ni_fatal_error_on, link_down);
2000                 } else {
2001                         CDEBUG(D_NET, "set link fatal state to %u\n",
2002                                (ksocknal_get_link_status(dev) == 0));
2003                         atomic_set(&ni->ni_fatal_error_on,
2004                                    (ksocknal_get_link_status(dev) == 0));
2005                 }
2006         }
2007 out:
2008         return 0;
2009 }
2010
2011
2012 static int
2013 ksocknal_handle_inetaddr_change(struct in_ifaddr *ifa, unsigned long event)
2014 {
2015         struct lnet_ni *ni;
2016         struct ksock_net *net;
2017         struct ksock_net *cnxt;
2018         struct net_device *event_netdev = ifa->ifa_dev->dev;
2019         int ifindex;
2020         struct ksock_interface *ksi = NULL;
2021         struct sockaddr_in *sa;
2022
2023         if (!ksocknal_data.ksnd_nnets)
2024                 goto out;
2025
2026         ifindex = event_netdev->ifindex;
2027
2028         list_for_each_entry_safe(net, cnxt, &ksocknal_data.ksnd_nets,
2029                                  ksnn_list) {
2030
2031                 ksi = &net->ksnn_interface;
2032                 sa = (void *)&ksi->ksni_addr;
2033
2034                 if (ksi->ksni_index != ifindex ||
2035                     strcmp(ksi->ksni_name, event_netdev->name))
2036                         continue;
2037
2038                 if (sa->sin_addr.s_addr == ifa->ifa_local) {
2039                         CDEBUG(D_NET, "set link fatal state to %u\n",
2040                                (event == NETDEV_DOWN));
2041                         ni = net->ksnn_ni;
2042                         atomic_set(&ni->ni_fatal_error_on,
2043                                    (event == NETDEV_DOWN));
2044                 }
2045         }
2046 out:
2047         return 0;
2048 }
2049
2050 /************************************
2051  * Net device notifier event handler
2052  ************************************/
2053 static int ksocknal_device_event(struct notifier_block *unused,
2054                                  unsigned long event, void *ptr)
2055 {
2056         struct net_device *dev = netdev_notifier_info_to_dev(ptr);
2057         unsigned char operstate;
2058
2059         operstate = dev->operstate;
2060
2061         CDEBUG(D_NET, "devevent: status=%ld, iface=%s ifindex %d state %u\n",
2062                event, dev->name, dev->ifindex, operstate);
2063
2064         switch (event) {
2065         case NETDEV_UP:
2066         case NETDEV_DOWN:
2067         case NETDEV_CHANGE:
2068                 ksocknal_handle_link_state_change(dev, operstate);
2069                 break;
2070         }
2071
2072         return NOTIFY_OK;
2073 }
2074
2075 /************************************
2076  * Inetaddr notifier event handler
2077  ************************************/
2078 static int ksocknal_inetaddr_event(struct notifier_block *unused,
2079                                    unsigned long event, void *ptr)
2080 {
2081         struct in_ifaddr *ifa = ptr;
2082
2083         CDEBUG(D_NET, "addrevent: status %ld ip addr %pI4, netmask %pI4.\n",
2084                event, &ifa->ifa_address, &ifa->ifa_mask);
2085
2086         switch (event) {
2087         case NETDEV_UP:
2088         case NETDEV_DOWN:
2089         case NETDEV_CHANGE:
2090                 ksocknal_handle_inetaddr_change(ifa, event);
2091                 break;
2092
2093         }
2094         return NOTIFY_OK;
2095 }
2096
2097 static struct notifier_block ksocknal_dev_notifier_block = {
2098         .notifier_call = ksocknal_device_event,
2099 };
2100
2101 static struct notifier_block ksocknal_inetaddr_notifier_block = {
2102         .notifier_call = ksocknal_inetaddr_event,
2103 };
2104
2105 static void
2106 ksocknal_base_shutdown(void)
2107 {
2108         struct ksock_sched *sched;
2109         struct ksock_peer_ni *peer_ni;
2110         int i;
2111
2112         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %lld\n",
2113                libcfs_kmem_read());
2114         LASSERT (ksocknal_data.ksnd_nnets == 0);
2115
2116         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL) {
2117                 unregister_netdevice_notifier(&ksocknal_dev_notifier_block);
2118                 unregister_inetaddr_notifier(&ksocknal_inetaddr_notifier_block);
2119         }
2120
2121         switch (ksocknal_data.ksnd_init) {
2122         default:
2123                 LASSERT(0);
2124                 fallthrough;
2125
2126         case SOCKNAL_INIT_ALL:
2127         case SOCKNAL_INIT_DATA:
2128                 hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list)
2129                         LASSERT(0);
2130
2131                 LASSERT(list_empty(&ksocknal_data.ksnd_nets));
2132                 LASSERT(list_empty(&ksocknal_data.ksnd_enomem_conns));
2133                 LASSERT(list_empty(&ksocknal_data.ksnd_zombie_conns));
2134                 LASSERT(list_empty(&ksocknal_data.ksnd_connd_connreqs));
2135                 LASSERT(list_empty(&ksocknal_data.ksnd_connd_routes));
2136
2137                 if (ksocknal_data.ksnd_schedulers != NULL) {
2138                         cfs_percpt_for_each(sched, i,
2139                                             ksocknal_data.ksnd_schedulers) {
2140
2141                                 LASSERT(list_empty(&sched->kss_tx_conns));
2142                                 LASSERT(list_empty(&sched->kss_rx_conns));
2143                                 LASSERT(list_empty(&sched->kss_zombie_noop_txs));
2144                                 LASSERT(sched->kss_nconns == 0);
2145                         }
2146                 }
2147
2148                 /* flag threads to terminate; wake and wait for them to die */
2149                 ksocknal_data.ksnd_shuttingdown = 1;
2150                 wake_up_all(&ksocknal_data.ksnd_connd_waitq);
2151                 wake_up(&ksocknal_data.ksnd_reaper_waitq);
2152
2153                 if (ksocknal_data.ksnd_schedulers != NULL) {
2154                         cfs_percpt_for_each(sched, i,
2155                                             ksocknal_data.ksnd_schedulers)
2156                                         wake_up_all(&sched->kss_waitq);
2157                 }
2158
2159                 wait_var_event_warning(&ksocknal_data.ksnd_nthreads,
2160                                        atomic_read(&ksocknal_data.ksnd_nthreads) == 0,
2161                                        "waiting for %d threads to terminate\n",
2162                                        atomic_read(&ksocknal_data.ksnd_nthreads));
2163
2164                 ksocknal_free_buffers();
2165
2166                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
2167                 break;
2168         }
2169
2170         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %lld\n",
2171                libcfs_kmem_read());
2172
2173         module_put(THIS_MODULE);
2174 }
2175
2176 static int
2177 ksocknal_base_startup(void)
2178 {
2179         struct ksock_sched *sched;
2180         int rc;
2181         int i;
2182
2183         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
2184         LASSERT(ksocknal_data.ksnd_nnets == 0);
2185
2186         memset(&ksocknal_data, 0, sizeof(ksocknal_data)); /* zero pointers */
2187
2188         hash_init(ksocknal_data.ksnd_peers);
2189
2190         rwlock_init(&ksocknal_data.ksnd_global_lock);
2191         INIT_LIST_HEAD(&ksocknal_data.ksnd_nets);
2192
2193         spin_lock_init(&ksocknal_data.ksnd_reaper_lock);
2194         INIT_LIST_HEAD(&ksocknal_data.ksnd_enomem_conns);
2195         INIT_LIST_HEAD(&ksocknal_data.ksnd_zombie_conns);
2196         INIT_LIST_HEAD(&ksocknal_data.ksnd_deathrow_conns);
2197         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
2198
2199         spin_lock_init(&ksocknal_data.ksnd_connd_lock);
2200         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_connreqs);
2201         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_routes);
2202         init_waitqueue_head(&ksocknal_data.ksnd_connd_waitq);
2203
2204         spin_lock_init(&ksocknal_data.ksnd_tx_lock);
2205         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_noop_txs);
2206
2207         /* NB memset above zeros whole of ksocknal_data */
2208
2209         /* flag lists/ptrs/locks initialised */
2210         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2211         if (!try_module_get(THIS_MODULE))
2212                 goto failed;
2213
2214         /* Create a scheduler block per available CPT */
2215         ksocknal_data.ksnd_schedulers = cfs_percpt_alloc(lnet_cpt_table(),
2216                                                          sizeof(*sched));
2217         if (ksocknal_data.ksnd_schedulers == NULL)
2218                 goto failed;
2219
2220         cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
2221                 int nthrs;
2222
2223                 /*
2224                  * make sure not to allocate more threads than there are
2225                  * cores/CPUs in teh CPT
2226                  */
2227                 nthrs = cfs_cpt_weight(lnet_cpt_table(), i);
2228                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2229                         nthrs = min(nthrs, *ksocknal_tunables.ksnd_nscheds);
2230                 } else {
2231                         /*
2232                          * max to half of CPUs, assume another half should be
2233                          * reserved for upper layer modules
2234                          */
2235                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2236                 }
2237
2238                 sched->kss_nthreads_max = nthrs;
2239                 sched->kss_cpt = i;
2240
2241                 spin_lock_init(&sched->kss_lock);
2242                 INIT_LIST_HEAD(&sched->kss_rx_conns);
2243                 INIT_LIST_HEAD(&sched->kss_tx_conns);
2244                 INIT_LIST_HEAD(&sched->kss_zombie_noop_txs);
2245                 init_waitqueue_head(&sched->kss_waitq);
2246         }
2247
2248         ksocknal_data.ksnd_connd_starting         = 0;
2249         ksocknal_data.ksnd_connd_failed_stamp     = 0;
2250         ksocknal_data.ksnd_connd_starting_stamp   = ktime_get_real_seconds();
2251         /* must have at least 2 connds to remain responsive to accepts while
2252          * connecting */
2253         if (*ksocknal_tunables.ksnd_nconnds < SOCKNAL_CONND_RESV + 1)
2254                 *ksocknal_tunables.ksnd_nconnds = SOCKNAL_CONND_RESV + 1;
2255
2256         if (*ksocknal_tunables.ksnd_nconnds_max <
2257             *ksocknal_tunables.ksnd_nconnds) {
2258                 ksocknal_tunables.ksnd_nconnds_max =
2259                         ksocknal_tunables.ksnd_nconnds;
2260         }
2261
2262         for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
2263                 spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2264                 ksocknal_data.ksnd_connd_starting++;
2265                 spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2266
2267                 rc = ksocknal_thread_start(ksocknal_connd,
2268                                            (void *)((uintptr_t)i),
2269                                            "socknal_cd%02d", i);
2270                 if (rc != 0) {
2271                         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2272                         ksocknal_data.ksnd_connd_starting--;
2273                         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2274                         CERROR("Can't spawn socknal connd: %d\n", rc);
2275                         goto failed;
2276                 }
2277         }
2278
2279         rc = ksocknal_thread_start(ksocknal_reaper, NULL, "socknal_reaper");
2280         if (rc != 0) {
2281                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
2282                 goto failed;
2283         }
2284
2285         register_netdevice_notifier(&ksocknal_dev_notifier_block);
2286         register_inetaddr_notifier(&ksocknal_inetaddr_notifier_block);
2287
2288         /* flag everything initialised */
2289         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
2290
2291         return 0;
2292
2293  failed:
2294         ksocknal_base_shutdown();
2295         return -ENETDOWN;
2296 }
2297
2298 static int
2299 ksocknal_debug_peerhash(struct lnet_ni *ni)
2300 {
2301         struct ksock_peer_ni *peer_ni;
2302         int i;
2303
2304         read_lock(&ksocknal_data.ksnd_global_lock);
2305
2306         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
2307                 struct ksock_conn_cb *conn_cb;
2308                 struct ksock_conn *conn;
2309
2310                 if (peer_ni->ksnp_ni != ni)
2311                         continue;
2312
2313                 CWARN("Active peer_ni on shutdown: %s, ref %d, closing %d, accepting %d, err %d, zcookie %llu, txq %d, zc_req %d\n",
2314                       libcfs_idstr(&peer_ni->ksnp_id),
2315                       refcount_read(&peer_ni->ksnp_refcount),
2316                       peer_ni->ksnp_closing,
2317                       peer_ni->ksnp_accepting, peer_ni->ksnp_error,
2318                       peer_ni->ksnp_zc_next_cookie,
2319                       !list_empty(&peer_ni->ksnp_tx_queue),
2320                       !list_empty(&peer_ni->ksnp_zc_req_list));
2321
2322                 conn_cb = peer_ni->ksnp_conn_cb;
2323                 if (conn_cb) {
2324                         CWARN("ConnCB: ref %d, schd %d, conn %d, cnted %d, del %d\n",
2325                               refcount_read(&conn_cb->ksnr_refcount),
2326                               conn_cb->ksnr_scheduled, conn_cb->ksnr_connecting,
2327                               conn_cb->ksnr_connected, conn_cb->ksnr_deleted);
2328                 }
2329
2330                 list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
2331                         CWARN("Conn: ref %d, sref %d, t %d, c %d\n",
2332                               refcount_read(&conn->ksnc_conn_refcount),
2333                               refcount_read(&conn->ksnc_sock_refcount),
2334                               conn->ksnc_type, conn->ksnc_closing);
2335                 }
2336                 break;
2337         }
2338
2339         read_unlock(&ksocknal_data.ksnd_global_lock);
2340         return 0;
2341 }
2342
2343 void
2344 ksocknal_shutdown(struct lnet_ni *ni)
2345 {
2346         struct ksock_net *net = ni->ni_data;
2347
2348         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL);
2349         LASSERT(ksocknal_data.ksnd_nnets > 0);
2350
2351         /* prevent new peers */
2352         atomic_add(SOCKNAL_SHUTDOWN_BIAS, &net->ksnn_npeers);
2353
2354         /* Delete all peers */
2355         ksocknal_del_peer(ni, NULL);
2356
2357         /* Wait for all peer_ni state to clean up */
2358         wait_var_event_warning(&net->ksnn_npeers,
2359                                atomic_read(&net->ksnn_npeers) ==
2360                                SOCKNAL_SHUTDOWN_BIAS,
2361                                "waiting for %d peers to disconnect\n",
2362                                ksocknal_debug_peerhash(ni) +
2363                                atomic_read(&net->ksnn_npeers) -
2364                                SOCKNAL_SHUTDOWN_BIAS);
2365
2366         LASSERT(net->ksnn_interface.ksni_npeers == 0);
2367         LASSERT(net->ksnn_interface.ksni_nroutes == 0);
2368
2369         list_del(&net->ksnn_list);
2370         LIBCFS_FREE(net, sizeof(*net));
2371
2372         ksocknal_data.ksnd_nnets--;
2373         if (ksocknal_data.ksnd_nnets == 0)
2374                 ksocknal_base_shutdown();
2375 }
2376
2377 static int
2378 ksocknal_search_new_ipif(struct ksock_net *net)
2379 {
2380         int new_ipif = 0;
2381         char *ifnam = &net->ksnn_interface.ksni_name[0];
2382         char *colon = strchr(ifnam, ':');
2383         bool found = false;
2384         struct ksock_net *tmp;
2385
2386         if (colon != NULL)
2387                 *colon = 0;
2388
2389         list_for_each_entry(tmp, &ksocknal_data.ksnd_nets, ksnn_list) {
2390                 char *ifnam2 = &tmp->ksnn_interface.ksni_name[0];
2391                 char *colon2 = strchr(ifnam2, ':');
2392
2393                 if (colon2 != NULL)
2394                         *colon2 = 0;
2395
2396                 found = strcmp(ifnam, ifnam2) == 0;
2397                 if (colon2 != NULL)
2398                         *colon2 = ':';
2399         }
2400
2401         new_ipif += !found;
2402         if (colon != NULL)
2403                 *colon = ':';
2404
2405         return new_ipif;
2406 }
2407
2408 static int
2409 ksocknal_start_schedulers(struct ksock_sched *sched)
2410 {
2411         int     nthrs;
2412         int     rc = 0;
2413         int     i;
2414
2415         if (sched->kss_nthreads == 0) {
2416                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2417                         nthrs = sched->kss_nthreads_max;
2418                 } else {
2419                         nthrs = cfs_cpt_weight(lnet_cpt_table(),
2420                                                sched->kss_cpt);
2421                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2422                         nthrs = min(SOCKNAL_NSCHEDS_HIGH, nthrs);
2423                 }
2424                 nthrs = min(nthrs, sched->kss_nthreads_max);
2425         } else {
2426                 LASSERT(sched->kss_nthreads <= sched->kss_nthreads_max);
2427                 /* increase two threads if there is new interface */
2428                 nthrs = min(2, sched->kss_nthreads_max - sched->kss_nthreads);
2429         }
2430
2431         for (i = 0; i < nthrs; i++) {
2432                 long id;
2433
2434                 id = KSOCK_THREAD_ID(sched->kss_cpt, sched->kss_nthreads + i);
2435                 rc = ksocknal_thread_start(ksocknal_scheduler, (void *)id,
2436                                            "socknal_sd%02d_%02d",
2437                                            sched->kss_cpt,
2438                                            (int)KSOCK_THREAD_SID(id));
2439                 if (rc == 0)
2440                         continue;
2441
2442                 CERROR("Can't spawn thread %d for scheduler[%d]: %d\n",
2443                        sched->kss_cpt, (int) KSOCK_THREAD_SID(id), rc);
2444                 break;
2445         }
2446
2447         sched->kss_nthreads += i;
2448         return rc;
2449 }
2450
2451 static int
2452 ksocknal_net_start_threads(struct ksock_net *net, __u32 *cpts, int ncpts)
2453 {
2454         int newif = ksocknal_search_new_ipif(net);
2455         int rc;
2456         int i;
2457
2458         if (ncpts > 0 && ncpts > cfs_cpt_number(lnet_cpt_table()))
2459                 return -EINVAL;
2460
2461         for (i = 0; i < ncpts; i++) {
2462                 struct ksock_sched *sched;
2463                 int cpt = (cpts == NULL) ? i : cpts[i];
2464
2465                 LASSERT(cpt < cfs_cpt_number(lnet_cpt_table()));
2466                 sched = ksocknal_data.ksnd_schedulers[cpt];
2467
2468                 if (!newif && sched->kss_nthreads > 0)
2469                         continue;
2470
2471                 rc = ksocknal_start_schedulers(sched);
2472                 if (rc != 0)
2473                         return rc;
2474         }
2475         return 0;
2476 }
2477
2478 int
2479 ksocknal_startup(struct lnet_ni *ni)
2480 {
2481         struct ksock_net *net;
2482         struct ksock_interface *ksi = NULL;
2483         struct lnet_inetdev *ifaces = NULL;
2484         int i = 0;
2485         int rc;
2486
2487         LASSERT (ni->ni_net->net_lnd == &the_ksocklnd);
2488         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING) {
2489                 rc = ksocknal_base_startup();
2490                 if (rc != 0)
2491                         return rc;
2492         }
2493         LIBCFS_ALLOC(net, sizeof(*net));
2494         if (net == NULL)
2495                 goto fail_0;
2496         net->ksnn_incarnation = ktime_get_real_ns();
2497         ni->ni_data = net;
2498
2499         ksocknal_tunables_setup(ni);
2500
2501         rc = lnet_inet_enumerate(&ifaces, ni->ni_net_ns, true);
2502         if (rc < 0)
2503                 goto fail_1;
2504
2505         ksi = &net->ksnn_interface;
2506
2507         /* Use the first discovered interface or look in the list */
2508         if (ni->ni_interface) {
2509                 for (i = 0; i < rc; i++)
2510                         if (strcmp(ifaces[i].li_name, ni->ni_interface) == 0)
2511                                 break;
2512
2513                 /* ni_interfaces doesn't contain the interface we want */
2514                 if (i == rc) {
2515                         CERROR("ksocklnd: failed to find interface %s\n",
2516                                ni->ni_interface);
2517                         goto fail_1;
2518                 }
2519         }
2520
2521         ni->ni_dev_cpt = ifaces[i].li_cpt;
2522         ksi->ksni_index = ifaces[i].li_index;
2523         if (ifaces[i].li_ipv6) {
2524                 struct sockaddr_in6 *sa;
2525                 sa = (void *)&ksi->ksni_addr;
2526                 memset(sa, 0, sizeof(*sa));
2527                 sa->sin6_family = AF_INET6;
2528                 memcpy(&sa->sin6_addr, ifaces[i].li_ipv6addr,
2529                        sizeof(struct in6_addr));
2530                 ni->ni_nid.nid_size = sizeof(struct in6_addr) - 4;
2531                 memcpy(&ni->ni_nid.nid_addr, ifaces[i].li_ipv6addr,
2532                        sizeof(struct in6_addr));
2533         } else {
2534                 struct sockaddr_in *sa;
2535                 sa = (void *)&ksi->ksni_addr;
2536                 memset(sa, 0, sizeof(*sa));
2537                 sa->sin_family = AF_INET;
2538                 sa->sin_addr.s_addr = htonl(ifaces[i].li_ipaddr);
2539                 ksi->ksni_netmask = ifaces[i].li_netmask;
2540                 ni->ni_nid.nid_size = 4 - 4;
2541                 ni->ni_nid.nid_addr[0] = sa->sin_addr.s_addr;
2542         }
2543         strlcpy(ksi->ksni_name, ifaces[i].li_name, sizeof(ksi->ksni_name));
2544
2545         /* call it before add it to ksocknal_data.ksnd_nets */
2546         rc = ksocknal_net_start_threads(net, ni->ni_cpts, ni->ni_ncpts);
2547         if (rc != 0)
2548                 goto fail_1;
2549
2550         list_add(&net->ksnn_list, &ksocknal_data.ksnd_nets);
2551         net->ksnn_ni = ni;
2552         ksocknal_data.ksnd_nnets++;
2553
2554         return 0;
2555
2556 fail_1:
2557         LIBCFS_FREE(net, sizeof(*net));
2558 fail_0:
2559         if (ksocknal_data.ksnd_nnets == 0)
2560                 ksocknal_base_shutdown();
2561
2562         return -ENETDOWN;
2563 }
2564
2565 static void __exit ksocklnd_exit(void)
2566 {
2567         lnet_unregister_lnd(&the_ksocklnd);
2568 }
2569
2570 static const struct lnet_lnd the_ksocklnd = {
2571         .lnd_type               = SOCKLND,
2572         .lnd_startup            = ksocknal_startup,
2573         .lnd_shutdown           = ksocknal_shutdown,
2574         .lnd_ctl                = ksocknal_ctl,
2575         .lnd_send               = ksocknal_send,
2576         .lnd_recv               = ksocknal_recv,
2577         .lnd_notify_peer_down   = ksocknal_notify_gw_down,
2578         .lnd_accept             = ksocknal_accept,
2579 };
2580
2581 static int __init ksocklnd_init(void)
2582 {
2583         int rc;
2584
2585         /* check ksnr_connected/connecting field large enough */
2586         BUILD_BUG_ON(SOCKLND_CONN_NTYPES > 4);
2587         BUILD_BUG_ON(SOCKLND_CONN_ACK != SOCKLND_CONN_BULK_IN);
2588
2589         rc = ksocknal_tunables_init();
2590         if (rc != 0)
2591                 return rc;
2592
2593         lnet_register_lnd(&the_ksocklnd);
2594
2595         return 0;
2596 }
2597
2598 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2599 MODULE_DESCRIPTION("TCP Socket LNet Network Driver");
2600 MODULE_VERSION("2.8.0");
2601 MODULE_LICENSE("GPL");
2602
2603 module_init(ksocklnd_init);
2604 module_exit(ksocklnd_exit);