Whamcloud - gitweb
d187969b6c2be430f2cc1f5aeda0e5b99db4de22
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lnet/klnds/socklnd/socklnd.c
32  *
33  * Author: Zach Brown <zab@zabbo.net>
34  * Author: Peter J. Braam <braam@clusterfs.com>
35  * Author: Phil Schwan <phil@clusterfs.com>
36  * Author: Eric Barton <eric@bartonsoftware.com>
37  */
38
39 #include <linux/ethtool.h>
40 #include <linux/inetdevice.h>
41 #include <linux/kernel.h>
42 #include <linux/sunrpc/addr.h>
43 #include <net/addrconf.h>
44 #include "socklnd.h"
45
46 static const struct lnet_lnd the_ksocklnd;
47 struct ksock_nal_data ksocknal_data;
48
49 static int ksocknal_ip2index(struct sockaddr *addr, struct lnet_ni *ni,
50                              int *dev_status)
51 {
52         struct net_device *dev;
53         int ret = -1;
54         DECLARE_CONST_IN_IFADDR(ifa);
55
56         *dev_status = -1;
57
58         if (addr->sa_family != AF_INET && addr->sa_family != AF_INET6)
59                 return ret;
60
61         rcu_read_lock();
62         for_each_netdev(ni->ni_net_ns, dev) {
63                 int flags = dev_get_flags(dev);
64                 struct in_device *in_dev;
65
66                 if (flags & IFF_LOOPBACK) /* skip the loopback IF */
67                         continue;
68
69                 if (!(flags & IFF_UP))
70                         continue;
71
72                 switch (addr->sa_family) {
73                 case AF_INET:
74                         in_dev = __in_dev_get_rcu(dev);
75                         if (!in_dev)
76                                 continue;
77
78                         in_dev_for_each_ifa_rcu(ifa, in_dev) {
79                                 if (ifa->ifa_local ==
80                                     ((struct sockaddr_in *)addr)->sin_addr.s_addr)
81                                         ret = dev->ifindex;
82                         }
83                         endfor_ifa(in_dev);
84                         break;
85 #if IS_ENABLED(CONFIG_IPV6)
86                 case AF_INET6: {
87                         struct inet6_dev *in6_dev;
88                         const struct inet6_ifaddr *ifa6;
89                         struct sockaddr_in6 *addr6 = (struct sockaddr_in6*)addr;
90
91                         in6_dev = __in6_dev_get(dev);
92                         if (!in6_dev)
93                                 continue;
94
95                         list_for_each_entry_rcu(ifa6, &in6_dev->addr_list, if_list) {
96                                 if (ipv6_addr_cmp(&ifa6->addr,
97                                                  &addr6->sin6_addr) == 0)
98                                         ret = dev->ifindex;
99                         }
100                         break;
101                         }
102 #endif /* IS_ENABLED(CONFIG_IPV6) */
103                 }
104                 if (ret >= 0)
105                         break;
106         }
107
108         rcu_read_unlock();
109         if (ret >= 0)
110                 *dev_status = 1;
111
112         if ((ret == -1) ||
113             ((dev->reg_state == NETREG_UNREGISTERING) ||
114              ((dev->operstate != IF_OPER_UP) &&
115               (dev->operstate != IF_OPER_UNKNOWN))) ||
116             (lnet_get_link_status(dev) == 0))
117                 *dev_status = 0;
118
119         return ret;
120 }
121
122 static struct ksock_conn_cb *
123 ksocknal_create_conn_cb(struct sockaddr *addr)
124 {
125         struct ksock_conn_cb *conn_cb;
126
127         LIBCFS_ALLOC(conn_cb, sizeof(*conn_cb));
128         if (!conn_cb)
129                 return NULL;
130
131         refcount_set(&conn_cb->ksnr_refcount, 1);
132         conn_cb->ksnr_peer = NULL;
133         conn_cb->ksnr_retry_interval = 0;         /* OK to connect at any time */
134         rpc_copy_addr((struct sockaddr *)&conn_cb->ksnr_addr, addr);
135         rpc_set_port((struct sockaddr *)&conn_cb->ksnr_addr,
136                      rpc_get_port(addr));
137         conn_cb->ksnr_scheduled = 0;
138         conn_cb->ksnr_connecting = 0;
139         conn_cb->ksnr_connected = 0;
140         conn_cb->ksnr_deleted = 0;
141         conn_cb->ksnr_conn_count = 0;
142         conn_cb->ksnr_ctrl_conn_count = 0;
143         conn_cb->ksnr_blki_conn_count = 0;
144         conn_cb->ksnr_blko_conn_count = 0;
145         conn_cb->ksnr_max_conns = 0;
146         conn_cb->ksnr_busy_retry_count = 0;
147
148         return conn_cb;
149 }
150
151 void
152 ksocknal_destroy_conn_cb(struct ksock_conn_cb *conn_cb)
153 {
154         LASSERT(refcount_read(&conn_cb->ksnr_refcount) == 0);
155
156         if (conn_cb->ksnr_peer)
157                 ksocknal_peer_decref(conn_cb->ksnr_peer);
158
159         LIBCFS_FREE(conn_cb, sizeof(*conn_cb));
160 }
161
162 static struct ksock_peer_ni *
163 ksocknal_create_peer(struct lnet_ni *ni, struct lnet_processid *id)
164 {
165         int cpt = lnet_nid2cpt(&id->nid, ni);
166         struct ksock_net *net = ni->ni_data;
167         struct ksock_peer_ni *peer_ni;
168
169         LASSERT(!LNET_NID_IS_ANY(&id->nid));
170         LASSERT(id->pid != LNET_PID_ANY);
171         LASSERT(!in_interrupt());
172
173         if (!atomic_inc_unless_negative(&net->ksnn_npeers)) {
174                 CERROR("Can't create peer_ni: network shutdown\n");
175                 return ERR_PTR(-ESHUTDOWN);
176         }
177
178         LIBCFS_CPT_ALLOC(peer_ni, lnet_cpt_table(), cpt, sizeof(*peer_ni));
179         if (!peer_ni) {
180                 atomic_dec(&net->ksnn_npeers);
181                 return ERR_PTR(-ENOMEM);
182         }
183
184         peer_ni->ksnp_ni = ni;
185         peer_ni->ksnp_id = *id;
186         refcount_set(&peer_ni->ksnp_refcount, 1); /* 1 ref for caller */
187         peer_ni->ksnp_closing = 0;
188         peer_ni->ksnp_accepting = 0;
189         peer_ni->ksnp_proto = NULL;
190         peer_ni->ksnp_last_alive = 0;
191         peer_ni->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;
192         peer_ni->ksnp_conn_cb = NULL;
193
194         INIT_LIST_HEAD(&peer_ni->ksnp_conns);
195         INIT_LIST_HEAD(&peer_ni->ksnp_tx_queue);
196         INIT_LIST_HEAD(&peer_ni->ksnp_zc_req_list);
197         spin_lock_init(&peer_ni->ksnp_lock);
198
199         return peer_ni;
200 }
201
202 void
203 ksocknal_destroy_peer(struct ksock_peer_ni *peer_ni)
204 {
205         struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
206
207         CDEBUG (D_NET, "peer_ni %s %p deleted\n",
208                 libcfs_idstr(&peer_ni->ksnp_id), peer_ni);
209
210         LASSERT(refcount_read(&peer_ni->ksnp_refcount) == 0);
211         LASSERT(peer_ni->ksnp_accepting == 0);
212         LASSERT(list_empty(&peer_ni->ksnp_conns));
213         LASSERT(peer_ni->ksnp_conn_cb == NULL);
214         LASSERT(list_empty(&peer_ni->ksnp_tx_queue));
215         LASSERT(list_empty(&peer_ni->ksnp_zc_req_list));
216
217         LIBCFS_FREE(peer_ni, sizeof(*peer_ni));
218
219         /* NB a peer_ni's connections and conn_cb keep a reference on their
220          * peer_ni until they are destroyed, so we can be assured that _all_
221          * state to do with this peer_ni has been cleaned up when its refcount
222          * drops to zero.
223          */
224         if (atomic_dec_and_test(&net->ksnn_npeers))
225                 wake_up_var(&net->ksnn_npeers);
226 }
227
228 struct ksock_peer_ni *
229 ksocknal_find_peer_locked(struct lnet_ni *ni, struct lnet_processid *id)
230 {
231         struct ksock_peer_ni *peer_ni;
232         unsigned long hash = nidhash(&id->nid);
233
234         hash_for_each_possible(ksocknal_data.ksnd_peers, peer_ni,
235                                ksnp_list, hash) {
236                 LASSERT(!peer_ni->ksnp_closing);
237
238                 if (peer_ni->ksnp_ni != ni)
239                         continue;
240
241                 if (!nid_same(&peer_ni->ksnp_id.nid, &id->nid) ||
242                     peer_ni->ksnp_id.pid != id->pid)
243                         continue;
244
245                 CDEBUG(D_NET, "got peer_ni [%p] -> %s (%d)\n",
246                        peer_ni, libcfs_idstr(id),
247                        refcount_read(&peer_ni->ksnp_refcount));
248                 return peer_ni;
249         }
250         return NULL;
251 }
252
253 struct ksock_peer_ni *
254 ksocknal_find_peer(struct lnet_ni *ni, struct lnet_processid *id)
255 {
256         struct ksock_peer_ni *peer_ni;
257
258         read_lock(&ksocknal_data.ksnd_global_lock);
259         peer_ni = ksocknal_find_peer_locked(ni, id);
260         if (peer_ni != NULL)                    /* +1 ref for caller? */
261                 ksocknal_peer_addref(peer_ni);
262         read_unlock(&ksocknal_data.ksnd_global_lock);
263
264         return peer_ni;
265 }
266
267 static void
268 ksocknal_unlink_peer_locked(struct ksock_peer_ni *peer_ni)
269 {
270         LASSERT(list_empty(&peer_ni->ksnp_conns));
271         LASSERT(peer_ni->ksnp_conn_cb == NULL);
272         LASSERT(!peer_ni->ksnp_closing);
273         peer_ni->ksnp_closing = 1;
274         hlist_del(&peer_ni->ksnp_list);
275         /* lose peerlist's ref */
276         ksocknal_peer_decref(peer_ni);
277 }
278
279
280 static void
281 ksocknal_dump_peer_debug_info(struct ksock_peer_ni *peer_ni)
282 {
283         struct ksock_conn *conn;
284         struct list_head *ctmp;
285         struct list_head *txtmp;
286         int ccount = 0;
287         int txcount = 0;
288
289         list_for_each(ctmp, &peer_ni->ksnp_conns) {
290                 conn = list_entry(ctmp, struct ksock_conn, ksnc_list);
291
292                 if (!list_empty(&conn->ksnc_tx_queue))
293                         list_for_each(txtmp, &conn->ksnc_tx_queue) txcount++;
294
295                 CDEBUG(D_CONSOLE, "Conn %d [type, closing, crefcnt, srefcnt]: %d, %d, %d, %d\n",
296                        ccount,
297                        conn->ksnc_type,
298                        conn->ksnc_closing,
299                        refcount_read(&conn->ksnc_conn_refcount),
300                        refcount_read(&conn->ksnc_sock_refcount));
301                 CDEBUG(D_CONSOLE, "Conn %d rx [scheduled, ready, state]: %d, %d, %d\n",
302                        ccount,
303                        conn->ksnc_rx_scheduled,
304                        conn->ksnc_rx_ready,
305                        conn->ksnc_rx_state);
306                 CDEBUG(D_CONSOLE, "Conn %d tx [txqcnt, scheduled, last_post, ready, deadline]: %d, %d, %lld, %d, %lld\n",
307                        ccount,
308                        txcount,
309                        conn->ksnc_tx_scheduled,
310                        conn->ksnc_tx_last_post,
311                        conn->ksnc_rx_ready,
312                        conn->ksnc_rx_deadline);
313
314                 if (conn->ksnc_scheduler)
315                         CDEBUG(D_CONSOLE, "Conn %d sched [nconns, cpt]: %d, %d\n",
316                                ccount,
317                                conn->ksnc_scheduler->kss_nconns,
318                                conn->ksnc_scheduler->kss_cpt);
319
320                 txcount = 0;
321                 ccount++;
322         }
323 }
324
325 static int
326 ksocknal_get_peer_info(struct lnet_ni *ni, int index,
327                        struct lnet_processid *id, __u32 *myip, __u32 *peer_ip,
328                        int *port, int *conn_count, int *share_count)
329 {
330         struct ksock_peer_ni *peer_ni;
331         struct ksock_conn_cb *conn_cb;
332         int i;
333         int rc = -ENOENT;
334         struct ksock_net *net;
335
336         read_lock(&ksocknal_data.ksnd_global_lock);
337
338         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
339
340                 if (peer_ni->ksnp_ni != ni)
341                         continue;
342                 if (index-- > 0)
343                         continue;
344
345                 *id = peer_ni->ksnp_id;
346                 conn_cb = peer_ni->ksnp_conn_cb;
347                 if (conn_cb == NULL) {
348                         *myip = 0;
349                         *peer_ip = 0;
350                         *port = 0;
351                         *conn_count = 0;
352                         *share_count = 0;
353                         rc = 0;
354                 } else {
355                         ksocknal_dump_peer_debug_info(peer_ni);
356
357                         if (conn_cb->ksnr_addr.ss_family == AF_INET) {
358                                 struct sockaddr_in *sa =
359                                         (void *)&conn_cb->ksnr_addr;
360                                 net = ni->ni_data;
361                                 rc = choose_ipv4_src(myip,
362                                                      net->ksnn_interface.ksni_index,
363                                                      ntohl(sa->sin_addr.s_addr),
364                                                      ni->ni_net_ns);
365                                 *peer_ip = ntohl(sa->sin_addr.s_addr);
366                                 *port = ntohs(sa->sin_port);
367
368                         } else {
369                                 *myip = 0xFFFFFFFF;
370                                 *peer_ip = 0xFFFFFFFF;
371                                 *port = 0;
372                                 rc = -ENOTSUPP;
373                         }
374                         *conn_count = conn_cb->ksnr_conn_count;
375                         *share_count = 1;
376                 }
377                 break;
378         }
379         read_unlock(&ksocknal_data.ksnd_global_lock);
380         return rc;
381 }
382
383 static unsigned int
384 ksocknal_get_conns_per_peer(struct ksock_peer_ni *peer_ni)
385 {
386         struct lnet_ni *ni = peer_ni->ksnp_ni;
387         struct lnet_ioctl_config_socklnd_tunables *tunables;
388
389         LASSERT(ni);
390
391         tunables = &ni->ni_lnd_tunables.lnd_tun_u.lnd_sock;
392
393         return tunables->lnd_conns_per_peer;
394 }
395
396 static void
397 ksocknal_incr_conn_count(struct ksock_conn_cb *conn_cb,
398                          int type)
399 {
400         conn_cb->ksnr_conn_count++;
401
402         /* check if all connections of the given type got created */
403         switch (type) {
404         case SOCKLND_CONN_CONTROL:
405                 conn_cb->ksnr_ctrl_conn_count++;
406                 /* there's a single control connection per peer,
407                  * two in case of loopback
408                  */
409                 conn_cb->ksnr_connected |= BIT(type);
410                 break;
411         case SOCKLND_CONN_BULK_IN:
412                 conn_cb->ksnr_blki_conn_count++;
413                 if (conn_cb->ksnr_blki_conn_count >= conn_cb->ksnr_max_conns)
414                         conn_cb->ksnr_connected |= BIT(type);
415                 break;
416         case SOCKLND_CONN_BULK_OUT:
417                 conn_cb->ksnr_blko_conn_count++;
418                 if (conn_cb->ksnr_blko_conn_count >= conn_cb->ksnr_max_conns)
419                         conn_cb->ksnr_connected |= BIT(type);
420                 break;
421         case SOCKLND_CONN_ANY:
422                 if (conn_cb->ksnr_conn_count >= conn_cb->ksnr_max_conns)
423                         conn_cb->ksnr_connected |= BIT(type);
424                 break;
425         default:
426                 LBUG();
427                 break;
428         }
429
430         CDEBUG(D_NET, "Add conn type %d, ksnr_connected %x ksnr_max_conns %d\n",
431                type, conn_cb->ksnr_connected, conn_cb->ksnr_max_conns);
432 }
433
434
435 static void
436 ksocknal_decr_conn_count(struct ksock_conn_cb *conn_cb,
437                          int type)
438 {
439         conn_cb->ksnr_conn_count--;
440
441         /* check if all connections of the given type got created */
442         switch (type) {
443         case SOCKLND_CONN_CONTROL:
444                 conn_cb->ksnr_ctrl_conn_count--;
445                 /* there's a single control connection per peer,
446                  * two in case of loopback
447                  */
448                 if (conn_cb->ksnr_ctrl_conn_count == 0)
449                         conn_cb->ksnr_connected &= ~BIT(type);
450                 break;
451         case SOCKLND_CONN_BULK_IN:
452                 conn_cb->ksnr_blki_conn_count--;
453                 if (conn_cb->ksnr_blki_conn_count == 0)
454                         conn_cb->ksnr_connected &= ~BIT(type);
455                 break;
456         case SOCKLND_CONN_BULK_OUT:
457                 conn_cb->ksnr_blko_conn_count--;
458                 if (conn_cb->ksnr_blko_conn_count == 0)
459                         conn_cb->ksnr_connected &= ~BIT(type);
460                 break;
461         case SOCKLND_CONN_ANY:
462                 if (conn_cb->ksnr_conn_count == 0)
463                         conn_cb->ksnr_connected &= ~BIT(type);
464                 break;
465         default:
466                 LBUG();
467                 break;
468         }
469
470         CDEBUG(D_NET, "Del conn type %d, ksnr_connected %x ksnr_max_conns %d\n",
471                type, conn_cb->ksnr_connected, conn_cb->ksnr_max_conns);
472 }
473
474 static void
475 ksocknal_associate_cb_conn_locked(struct ksock_conn_cb *conn_cb,
476                                   struct ksock_conn *conn)
477 {
478         int type = conn->ksnc_type;
479
480         conn->ksnc_conn_cb = conn_cb;
481         ksocknal_conn_cb_addref(conn_cb);
482         ksocknal_incr_conn_count(conn_cb, type);
483
484         /* Successful connection => further attempts can
485          * proceed immediately
486          */
487         conn_cb->ksnr_retry_interval = 0;
488 }
489
490 static void
491 ksocknal_add_conn_cb_locked(struct ksock_peer_ni *peer_ni,
492                             struct ksock_conn_cb *conn_cb)
493 {
494         struct ksock_conn *conn;
495         struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
496
497         LASSERT(!peer_ni->ksnp_closing);
498         LASSERT(!conn_cb->ksnr_peer);
499         LASSERT(!conn_cb->ksnr_scheduled);
500         LASSERT(!conn_cb->ksnr_connecting);
501         LASSERT(conn_cb->ksnr_connected == 0);
502
503         conn_cb->ksnr_peer = peer_ni;
504         ksocknal_peer_addref(peer_ni);
505
506         /* peer_ni's route list takes over my ref on 'route' */
507         peer_ni->ksnp_conn_cb = conn_cb;
508         net->ksnn_interface.ksni_nroutes++;
509
510         list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
511                 if (!rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
512                                   (struct sockaddr *)&conn_cb->ksnr_addr))
513                         continue;
514                 CDEBUG(D_NET, "call ksocknal_associate_cb_conn_locked\n");
515                 ksocknal_associate_cb_conn_locked(conn_cb, conn);
516                 /* keep going (typed conns) */
517         }
518 }
519
520 static void
521 ksocknal_del_conn_cb_locked(struct ksock_conn_cb *conn_cb)
522 {
523         struct ksock_peer_ni *peer_ni = conn_cb->ksnr_peer;
524         struct ksock_conn *conn;
525         struct ksock_conn *cnxt;
526         struct ksock_net *net;
527
528         LASSERT(!conn_cb->ksnr_deleted);
529
530         /* Close associated conns */
531         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns, ksnc_list) {
532                 if (conn->ksnc_conn_cb != conn_cb)
533                         continue;
534
535                 ksocknal_close_conn_locked(conn, 0);
536         }
537
538         net = (struct ksock_net *)(peer_ni->ksnp_ni->ni_data);
539         net->ksnn_interface.ksni_nroutes--;
540         LASSERT(net->ksnn_interface.ksni_nroutes >= 0);
541
542         conn_cb->ksnr_deleted = 1;
543         ksocknal_conn_cb_decref(conn_cb);               /* drop peer_ni's ref */
544         peer_ni->ksnp_conn_cb = NULL;
545
546         if (list_empty(&peer_ni->ksnp_conns)) {
547                 /* I've just removed the last route to a peer_ni with no active
548                  * connections
549                  */
550                 ksocknal_unlink_peer_locked(peer_ni);
551         }
552 }
553
554 unsigned int
555 ksocknal_get_conn_count_by_type(struct ksock_conn_cb *conn_cb,
556                                 int type)
557 {
558         unsigned int count = 0;
559
560         switch (type) {
561         case SOCKLND_CONN_CONTROL:
562                 count = conn_cb->ksnr_ctrl_conn_count;
563                 break;
564         case SOCKLND_CONN_BULK_IN:
565                 count = conn_cb->ksnr_blki_conn_count;
566                 break;
567         case SOCKLND_CONN_BULK_OUT:
568                 count = conn_cb->ksnr_blko_conn_count;
569                 break;
570         case SOCKLND_CONN_ANY:
571                 count = conn_cb->ksnr_conn_count;
572                 break;
573         default:
574                 LBUG();
575                 break;
576         }
577
578         return count;
579 }
580
581 int
582 ksocknal_add_peer(struct lnet_ni *ni, struct lnet_processid *id,
583                   struct sockaddr *addr)
584 {
585         struct ksock_peer_ni *peer_ni;
586         struct ksock_peer_ni *peer2;
587         struct ksock_conn_cb *conn_cb;
588
589         if (LNET_NID_IS_ANY(&id->nid) ||
590             id->pid == LNET_PID_ANY)
591                 return (-EINVAL);
592
593         /* Have a brand new peer_ni ready... */
594         peer_ni = ksocknal_create_peer(ni, id);
595         if (IS_ERR(peer_ni))
596                 return PTR_ERR(peer_ni);
597
598         conn_cb = ksocknal_create_conn_cb(addr);
599         if (!conn_cb) {
600                 ksocknal_peer_decref(peer_ni);
601                 return -ENOMEM;
602         }
603
604         write_lock_bh(&ksocknal_data.ksnd_global_lock);
605
606         /* always called with a ref on ni, so shutdown can't have started */
607         LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers)
608                 >= 0);
609
610         peer2 = ksocknal_find_peer_locked(ni, id);
611         if (peer2 != NULL) {
612                 ksocknal_peer_decref(peer_ni);
613                 peer_ni = peer2;
614         } else {
615                 /* peer_ni table takes my ref on peer_ni */
616                 hash_add(ksocknal_data.ksnd_peers, &peer_ni->ksnp_list,
617                          nidhash(&id->nid));
618         }
619
620         if (peer_ni->ksnp_conn_cb) {
621                 ksocknal_conn_cb_decref(conn_cb);
622         } else {
623                 /* Remember conns_per_peer setting at the time
624                  * of connection initiation. It will define the
625                  * max number of conns per type for this conn_cb
626                  * while it's in use.
627                  */
628                 conn_cb->ksnr_max_conns = ksocknal_get_conns_per_peer(peer_ni);
629                 ksocknal_add_conn_cb_locked(peer_ni, conn_cb);
630         }
631
632         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
633
634         return 0;
635 }
636
637 static void
638 ksocknal_del_peer_locked(struct ksock_peer_ni *peer_ni)
639 {
640         struct ksock_conn *conn;
641         struct ksock_conn *cnxt;
642         struct ksock_conn_cb *conn_cb;
643
644         LASSERT(!peer_ni->ksnp_closing);
645
646         /* Extra ref prevents peer_ni disappearing until I'm done with it */
647         ksocknal_peer_addref(peer_ni);
648         conn_cb = peer_ni->ksnp_conn_cb;
649         if (conn_cb)
650                 ksocknal_del_conn_cb_locked(conn_cb);
651
652         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns,
653                                  ksnc_list)
654                 ksocknal_close_conn_locked(conn, 0);
655
656         ksocknal_peer_decref(peer_ni);
657         /* NB peer_ni unlinks itself when last conn/conn_cb is removed */
658 }
659
660 static int
661 ksocknal_del_peer(struct lnet_ni *ni, struct lnet_processid *id)
662 {
663         LIST_HEAD(zombies);
664         struct hlist_node *pnxt;
665         struct ksock_peer_ni *peer_ni;
666         int lo;
667         int hi;
668         int i;
669         int rc = -ENOENT;
670
671         write_lock_bh(&ksocknal_data.ksnd_global_lock);
672
673         if (id && !LNET_NID_IS_ANY(&id->nid)) {
674                 lo = hash_min(nidhash(&id->nid),
675                               HASH_BITS(ksocknal_data.ksnd_peers));
676                 hi = lo;
677         } else {
678                 lo = 0;
679                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
680         }
681
682         for (i = lo; i <= hi; i++) {
683                 hlist_for_each_entry_safe(peer_ni, pnxt,
684                                           &ksocknal_data.ksnd_peers[i],
685                                           ksnp_list) {
686                         if (peer_ni->ksnp_ni != ni)
687                                 continue;
688
689                         if (!((!id || LNET_NID_IS_ANY(&id->nid) ||
690                                nid_same(&peer_ni->ksnp_id.nid, &id->nid)) &&
691                               (!id || id->pid == LNET_PID_ANY ||
692                                peer_ni->ksnp_id.pid == id->pid)))
693                                 continue;
694
695                         ksocknal_peer_addref(peer_ni);  /* a ref for me... */
696
697                         ksocknal_del_peer_locked(peer_ni);
698
699                         if (peer_ni->ksnp_closing &&
700                             !list_empty(&peer_ni->ksnp_tx_queue)) {
701                                 LASSERT(list_empty(&peer_ni->ksnp_conns));
702                                 LASSERT(peer_ni->ksnp_conn_cb == NULL);
703
704                                 list_splice_init(&peer_ni->ksnp_tx_queue,
705                                                  &zombies);
706                         }
707
708                         ksocknal_peer_decref(peer_ni);  /* ...till here */
709
710                         rc = 0;                         /* matched! */
711                 }
712         }
713
714         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
715
716         ksocknal_txlist_done(ni, &zombies, -ENETDOWN);
717
718         return rc;
719 }
720
721 static struct ksock_conn *
722 ksocknal_get_conn_by_idx(struct lnet_ni *ni, int index)
723 {
724         struct ksock_peer_ni *peer_ni;
725         struct ksock_conn *conn;
726         int i;
727
728         read_lock(&ksocknal_data.ksnd_global_lock);
729
730         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
731                 LASSERT(!peer_ni->ksnp_closing);
732
733                 if (peer_ni->ksnp_ni != ni)
734                         continue;
735
736                 list_for_each_entry(conn, &peer_ni->ksnp_conns,
737                                     ksnc_list) {
738                         if (index-- > 0)
739                                 continue;
740
741                         ksocknal_conn_addref(conn);
742                         read_unlock(&ksocknal_data.ksnd_global_lock);
743                         return conn;
744                 }
745         }
746
747         read_unlock(&ksocknal_data.ksnd_global_lock);
748         return NULL;
749 }
750
751 static struct ksock_sched *
752 ksocknal_choose_scheduler_locked(unsigned int cpt)
753 {
754         struct ksock_sched *sched = ksocknal_data.ksnd_schedulers[cpt];
755         int i;
756
757         if (sched->kss_nthreads == 0) {
758                 cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
759                         if (sched->kss_nthreads > 0) {
760                                 CDEBUG(D_NET, "scheduler[%d] has no threads. selected scheduler[%d]\n",
761                                        cpt, sched->kss_cpt);
762                                 return sched;
763                         }
764                 }
765                 return NULL;
766         }
767
768         return sched;
769 }
770
771 int
772 ksocknal_accept(struct lnet_ni *ni, struct socket *sock)
773 {
774         struct ksock_connreq *cr;
775         int rc;
776         struct sockaddr_storage peer;
777
778         rc = lnet_sock_getaddr(sock, true, &peer);
779         if (rc != 0) {
780                 CERROR("Can't determine new connection's address\n");
781                 return rc;
782         }
783
784         LIBCFS_ALLOC(cr, sizeof(*cr));
785         if (cr == NULL) {
786                 LCONSOLE_ERROR_MSG(0x12f,
787                                    "Dropping connection request from %pISc: memory exhausted\n",
788                                    &peer);
789                 return -ENOMEM;
790         }
791
792         lnet_ni_addref(ni);
793         cr->ksncr_ni   = ni;
794         cr->ksncr_sock = sock;
795
796         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
797
798         list_add_tail(&cr->ksncr_list, &ksocknal_data.ksnd_connd_connreqs);
799         wake_up(&ksocknal_data.ksnd_connd_waitq);
800
801         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
802         return 0;
803 }
804
805 static const struct ln_key_list ksocknal_tunables_keys = {
806         .lkl_maxattr                    = LNET_NET_SOCKLND_TUNABLES_ATTR_MAX,
807         .lkl_list                       = {
808                 [LNET_NET_SOCKLND_TUNABLES_ATTR_CONNS_PER_PEER]  = {
809                         .lkp_value      = "conns_per_peer",
810                         .lkp_data_type  = NLA_U16
811                 },
812                 [LNET_NET_SOCKLND_TUNABLES_ATTR_LND_TIMEOUT]    = {
813                         .lkp_value      = "timeout",
814                         .lkp_data_type  = NLA_U32
815                 },
816                 [LNET_NET_SOCKLND_TUNABLES_ATTR_LND_TOS] = {
817                         .lkp_value      = "tos",
818                         .lkp_data_type  = NLA_S16,
819                 },
820         },
821 };
822
823 static int
824 ksocknal_nl_get(int cmd, struct sk_buff *msg, int type, void *data)
825 {
826         struct lnet_lnd_tunables *tun;
827         struct lnet_ni *ni = data;
828
829         if (!ni || !msg)
830                 return -EINVAL;
831
832          if (cmd != LNET_CMD_NETS || type != LNET_NET_LOCAL_NI_ATTR_LND_TUNABLES)
833                 return -EOPNOTSUPP;
834
835         tun = &ni->ni_lnd_tunables;
836         nla_put_u16(msg, LNET_NET_SOCKLND_TUNABLES_ATTR_CONNS_PER_PEER,
837                     tun->lnd_tun_u.lnd_sock.lnd_conns_per_peer);
838         nla_put_u32(msg, LNET_NET_SOCKLND_TUNABLES_ATTR_LND_TIMEOUT,
839                     ksocknal_timeout());
840         nla_put_s16(msg, LNET_NET_SOCKLND_TUNABLES_ATTR_LND_TOS,
841                     tun->lnd_tun_u.lnd_sock.lnd_tos);
842
843         return 0;
844 }
845
846 static inline void
847 ksocknal_nl_set_default(int cmd, int type, void *data)
848 {
849         struct lnet_lnd_tunables *tunables = data;
850         struct lnet_ioctl_config_socklnd_tunables *lt;
851         struct lnet_ioctl_config_socklnd_tunables *df;
852
853         lt = &tunables->lnd_tun_u.lnd_sock;
854         df = &ksock_default_tunables;
855         switch (type) {
856         case LNET_NET_SOCKLND_TUNABLES_ATTR_CONNS_PER_PEER:
857                 lt->lnd_conns_per_peer = df->lnd_conns_per_peer;
858                 break;
859         case LNET_NET_SOCKLND_TUNABLES_ATTR_LND_TIMEOUT:
860                 lt->lnd_timeout = df->lnd_timeout;
861                 fallthrough;
862         default:
863                 break;
864         }
865 }
866
867 static int
868 ksocknal_nl_set(int cmd, struct nlattr *attr, int type, void *data)
869 {
870         struct lnet_lnd_tunables *tunables = data;
871         int rc = 0;
872         s64 num;
873
874         if (cmd != LNET_CMD_NETS)
875                 return -EOPNOTSUPP;
876
877         if (!attr) {
878                 ksocknal_nl_set_default(cmd, type, data);
879                 return 0;
880         }
881
882         if (nla_type(attr) != LN_SCALAR_ATTR_INT_VALUE)
883                 return -EINVAL;
884
885         switch (type) {
886         case LNET_NET_SOCKLND_TUNABLES_ATTR_CONNS_PER_PEER:
887                 /* value values are 1 to 127. Zero mean calculate the value */
888                 num = nla_get_s64(attr);
889                 if (num > -1 && num < 128)
890                         tunables->lnd_tun_u.lnd_sock.lnd_conns_per_peer = num;
891                 else
892                         rc = -ERANGE;
893                 break;
894         case LNET_NET_SOCKLND_TUNABLES_ATTR_LND_TIMEOUT:
895                 num = nla_get_s64(attr);
896                 tunables->lnd_tun_u.lnd_sock.lnd_timeout = num;
897                 break;
898         case LNET_NET_SOCKLND_TUNABLES_ATTR_LND_TOS:
899                 num = nla_get_s64(attr);
900                 clamp_t(s64, num, -1, 0xff);
901                 tunables->lnd_tun_u.lnd_sock.lnd_tos = num;
902                 fallthrough;
903         default:
904                 break;
905         }
906
907         return rc;
908 }
909
910 static int
911 ksocknal_connecting(struct ksock_conn_cb *conn_cb, struct sockaddr *sa)
912 {
913         if (conn_cb &&
914             rpc_cmp_addr((struct sockaddr *)&conn_cb->ksnr_addr, sa))
915                 return conn_cb->ksnr_connecting;
916         return 0;
917 }
918
919 int
920 ksocknal_create_conn(struct lnet_ni *ni, struct ksock_conn_cb *conn_cb,
921                      struct socket *sock, int type)
922 {
923         rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
924         LIST_HEAD(zombies);
925         struct lnet_processid peerid;
926         u64 incarnation;
927         struct ksock_conn *conn;
928         struct ksock_conn *conn2;
929         struct ksock_peer_ni *peer_ni = NULL;
930         struct ksock_peer_ni *peer2;
931         struct ksock_sched *sched;
932         struct ksock_hello_msg *hello;
933         int cpt;
934         struct ksock_tx *tx;
935         struct ksock_tx *txtmp;
936         int rc;
937         int rc2;
938         int active;
939         int num_dup = 0;
940         char *warn = NULL;
941
942         active = (conn_cb != NULL);
943
944         LASSERT(active == (type != SOCKLND_CONN_NONE));
945
946         LIBCFS_ALLOC(conn, sizeof(*conn));
947         if (conn == NULL) {
948                 rc = -ENOMEM;
949                 goto failed_0;
950         }
951
952         conn->ksnc_peer = NULL;
953         conn->ksnc_conn_cb = NULL;
954         conn->ksnc_sock = sock;
955         /* 2 ref, 1 for conn, another extra ref prevents socket
956          * being closed before establishment of connection */
957         refcount_set(&conn->ksnc_sock_refcount, 2);
958         conn->ksnc_type = type;
959         ksocknal_lib_save_callback(sock, conn);
960         refcount_set(&conn->ksnc_conn_refcount, 1); /* 1 ref for me */
961
962         conn->ksnc_rx_ready = 0;
963         conn->ksnc_rx_scheduled = 0;
964
965         INIT_LIST_HEAD(&conn->ksnc_tx_queue);
966         conn->ksnc_tx_ready = 0;
967         conn->ksnc_tx_scheduled = 0;
968         conn->ksnc_tx_carrier = NULL;
969         atomic_set (&conn->ksnc_tx_nob, 0);
970
971         LIBCFS_ALLOC(hello, offsetof(struct ksock_hello_msg,
972                                      kshm_ips[LNET_INTERFACES_NUM]));
973         if (hello == NULL) {
974                 rc = -ENOMEM;
975                 goto failed_1;
976         }
977
978         /* stash conn's local and remote addrs */
979         rc = ksocknal_lib_get_conn_addrs(conn);
980         if (rc != 0)
981                 goto failed_1;
982
983         /* Find out/confirm peer_ni's NID and connection type and get the
984          * vector of interfaces she's willing to let me connect to.
985          * Passive connections use the listener timeout since the peer_ni sends
986          * eagerly
987          */
988         if (active) {
989                 struct sockaddr_in *psa = (void *)&conn->ksnc_peeraddr;
990
991                 peer_ni = conn_cb->ksnr_peer;
992                 LASSERT(ni == peer_ni->ksnp_ni);
993
994                 /* Active connection sends HELLO eagerly */
995                 hello->kshm_nips = 0;
996                 peerid = peer_ni->ksnp_id;
997
998                 write_lock_bh(global_lock);
999                 conn->ksnc_proto = peer_ni->ksnp_proto;
1000                 write_unlock_bh(global_lock);
1001
1002                 if (conn->ksnc_proto == NULL) {
1003                         if (psa->sin_family == AF_INET6)
1004                                 conn->ksnc_proto = &ksocknal_protocol_v4x;
1005                         else if (psa->sin_family == AF_INET)
1006                                 conn->ksnc_proto = &ksocknal_protocol_v3x;
1007 #if SOCKNAL_VERSION_DEBUG
1008                         if (*ksocknal_tunables.ksnd_protocol == 2)
1009                                 conn->ksnc_proto = &ksocknal_protocol_v2x;
1010                         else if (*ksocknal_tunables.ksnd_protocol == 1)
1011                                 conn->ksnc_proto = &ksocknal_protocol_v1x;
1012 #endif
1013                 }
1014                 if (!conn->ksnc_proto) {
1015                         rc = -EPROTO;
1016                         goto failed_1;
1017                 }
1018
1019                 rc = ksocknal_send_hello(ni, conn, &peerid.nid, hello);
1020                 if (rc != 0)
1021                         goto failed_1;
1022         } else {
1023                 peerid.nid = LNET_ANY_NID;
1024                 peerid.pid = LNET_PID_ANY;
1025
1026                 /* Passive, get protocol from peer_ni */
1027                 conn->ksnc_proto = NULL;
1028         }
1029
1030         rc = ksocknal_recv_hello(ni, conn, hello, &peerid, &incarnation);
1031         if (rc < 0)
1032                 goto failed_1;
1033
1034         LASSERT(rc == 0 || active);
1035         LASSERT(conn->ksnc_proto != NULL);
1036         LASSERT(!LNET_NID_IS_ANY(&peerid.nid));
1037
1038         cpt = lnet_nid2cpt(&peerid.nid, ni);
1039
1040         if (active) {
1041                 ksocknal_peer_addref(peer_ni);
1042                 write_lock_bh(global_lock);
1043         } else {
1044                 peer_ni = ksocknal_create_peer(ni, &peerid);
1045                 if (IS_ERR(peer_ni)) {
1046                         rc = PTR_ERR(peer_ni);
1047                         goto failed_1;
1048                 }
1049
1050                 write_lock_bh(global_lock);
1051
1052                 /* called with a ref on ni, so shutdown can't have started */
1053                 LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers) >= 0);
1054
1055                 peer2 = ksocknal_find_peer_locked(ni, &peerid);
1056                 if (peer2 == NULL) {
1057                         /* NB this puts an "empty" peer_ni in the peer_ni
1058                          * table (which takes my ref) */
1059                         hash_add(ksocknal_data.ksnd_peers,
1060                                  &peer_ni->ksnp_list, nidhash(&peerid.nid));
1061                 } else {
1062                         ksocknal_peer_decref(peer_ni);
1063                         peer_ni = peer2;
1064                 }
1065
1066                 /* +1 ref for me */
1067                 ksocknal_peer_addref(peer_ni);
1068                 peer_ni->ksnp_accepting++;
1069
1070                 /* Am I already connecting to this guy?  Resolve in
1071                  * favour of higher NID...
1072                  */
1073                 if (memcmp(&peerid.nid, &ni->ni_nid, sizeof(peerid.nid)) < 0 &&
1074                     ksocknal_connecting(peer_ni->ksnp_conn_cb,
1075                                         ((struct sockaddr *) &conn->ksnc_peeraddr))) {
1076                         rc = EALREADY;
1077                         warn = "connection race resolution";
1078                         goto failed_2;
1079                 }
1080         }
1081
1082         if (peer_ni->ksnp_closing ||
1083             (active && conn_cb->ksnr_deleted)) {
1084                 /* peer_ni/conn_cb got closed under me */
1085                 rc = -ESTALE;
1086                 warn = "peer_ni/conn_cb removed";
1087                 goto failed_2;
1088         }
1089
1090         if (peer_ni->ksnp_proto == NULL) {
1091                 /* Never connected before.
1092                  * NB recv_hello may have returned EPROTO to signal my peer_ni
1093                  * wants a different protocol than the one I asked for.
1094                  */
1095                 LASSERT(list_empty(&peer_ni->ksnp_conns));
1096
1097                 peer_ni->ksnp_proto = conn->ksnc_proto;
1098                 peer_ni->ksnp_incarnation = incarnation;
1099         }
1100
1101         if (peer_ni->ksnp_proto != conn->ksnc_proto ||
1102             peer_ni->ksnp_incarnation != incarnation) {
1103                 /* peer_ni rebooted or I've got the wrong protocol version */
1104                 ksocknal_close_peer_conns_locked(peer_ni, NULL, 0);
1105
1106                 peer_ni->ksnp_proto = NULL;
1107                 rc = ESTALE;
1108                 warn = peer_ni->ksnp_incarnation != incarnation ?
1109                         "peer_ni rebooted" :
1110                         "wrong proto version";
1111                 goto failed_2;
1112         }
1113
1114         switch (rc) {
1115         default:
1116                 LBUG();
1117         case 0:
1118                 break;
1119         case EALREADY:
1120                 warn = "lost conn race";
1121                 goto failed_2;
1122         case EPROTO:
1123                 warn = "retry with different protocol version";
1124                 goto failed_2;
1125         }
1126
1127         /* Refuse to duplicate an existing connection, unless this is a
1128          * loopback connection */
1129         if (!rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
1130                           (struct sockaddr *)&conn->ksnc_myaddr)) {
1131                 list_for_each_entry(conn2, &peer_ni->ksnp_conns, ksnc_list) {
1132                         if (!rpc_cmp_addr(
1133                                     (struct sockaddr *)&conn2->ksnc_peeraddr,
1134                                     (struct sockaddr *)&conn->ksnc_peeraddr) ||
1135                             !rpc_cmp_addr(
1136                                     (struct sockaddr *)&conn2->ksnc_myaddr,
1137                                     (struct sockaddr *)&conn->ksnc_myaddr) ||
1138                             conn2->ksnc_type != conn->ksnc_type)
1139                                 continue;
1140
1141                         num_dup++;
1142                         /* If max conns per type is not registered in conn_cb
1143                          * as ksnr_max_conns, use ni's conns_per_peer
1144                          */
1145                         if ((peer_ni->ksnp_conn_cb &&
1146                             num_dup < peer_ni->ksnp_conn_cb->ksnr_max_conns) ||
1147                             (!peer_ni->ksnp_conn_cb &&
1148                             num_dup < ksocknal_get_conns_per_peer(peer_ni)))
1149                                 continue;
1150
1151                         /* Reply on a passive connection attempt so the peer_ni
1152                          * realises we're connected.
1153                          */
1154                         LASSERT(rc == 0);
1155                         if (!active)
1156                                 rc = EALREADY;
1157
1158                         warn = "duplicate";
1159                         goto failed_2;
1160                 }
1161         }
1162         /* If the connection created by this route didn't bind to the IP
1163          * address the route connected to, the connection/route matching
1164          * code below probably isn't going to work.
1165          */
1166         if (active &&
1167             !rpc_cmp_addr((struct sockaddr *)&conn_cb->ksnr_addr,
1168                           (struct sockaddr *)&conn->ksnc_peeraddr)) {
1169                 CERROR("Route %s %pISc connected to %pISc\n",
1170                        libcfs_idstr(&peer_ni->ksnp_id),
1171                        &conn_cb->ksnr_addr,
1172                        &conn->ksnc_peeraddr);
1173         }
1174
1175         /* Search for a conn_cb corresponding to the new connection and
1176          * create an association.  This allows incoming connections created
1177          * by conn_cbs in my peer_ni to match my own conn_cb entries so I don't
1178          * continually create duplicate conn_cbs.
1179          */
1180         conn_cb = peer_ni->ksnp_conn_cb;
1181
1182         if (conn_cb && rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
1183                                     (struct sockaddr *)&conn_cb->ksnr_addr))
1184                 ksocknal_associate_cb_conn_locked(conn_cb, conn);
1185
1186         conn->ksnc_peer = peer_ni;                 /* conn takes my ref on peer_ni */
1187         peer_ni->ksnp_last_alive = ktime_get_seconds();
1188         peer_ni->ksnp_send_keepalive = 0;
1189         peer_ni->ksnp_error = 0;
1190
1191         sched = ksocknal_choose_scheduler_locked(cpt);
1192         if (!sched) {
1193                 CERROR("no schedulers available. node is unhealthy\n");
1194                 goto failed_2;
1195         }
1196         /*
1197          * The cpt might have changed if we ended up selecting a non cpt
1198          * native scheduler. So use the scheduler's cpt instead.
1199          */
1200         cpt = sched->kss_cpt;
1201         sched->kss_nconns++;
1202         conn->ksnc_scheduler = sched;
1203
1204         conn->ksnc_tx_last_post = ktime_get_seconds();
1205         /* Set the deadline for the outgoing HELLO to drain */
1206         conn->ksnc_tx_bufnob = sock->sk->sk_wmem_queued;
1207         conn->ksnc_tx_deadline = ktime_get_seconds() +
1208                                  ksocknal_timeout();
1209         smp_mb();   /* order with adding to peer_ni's conn list */
1210
1211         list_add(&conn->ksnc_list, &peer_ni->ksnp_conns);
1212         ksocknal_conn_addref(conn);
1213
1214         ksocknal_new_packet(conn, 0);
1215
1216         conn->ksnc_zc_capable = ksocknal_lib_zc_capable(conn);
1217
1218         /* Take packets blocking for this connection. */
1219         list_for_each_entry_safe(tx, txtmp, &peer_ni->ksnp_tx_queue, tx_list) {
1220                 if (conn->ksnc_proto->pro_match_tx(conn, tx, tx->tx_nonblk) ==
1221                     SOCKNAL_MATCH_NO)
1222                         continue;
1223
1224                 list_del(&tx->tx_list);
1225                 ksocknal_queue_tx_locked(tx, conn);
1226         }
1227
1228         write_unlock_bh(global_lock);
1229         /* We've now got a new connection.  Any errors from here on are just
1230          * like "normal" comms errors and we close the connection normally.
1231          * NB (a) we still have to send the reply HELLO for passive
1232          *        connections,
1233          *    (b) normal I/O on the conn is blocked until I setup and call the
1234          *        socket callbacks.
1235          */
1236
1237         CDEBUG(D_NET, "New conn %s p %d.x %pISc -> %pIScp"
1238                " incarnation:%lld sched[%d]\n",
1239                libcfs_idstr(&peerid), conn->ksnc_proto->pro_version,
1240                &conn->ksnc_myaddr, &conn->ksnc_peeraddr,
1241                incarnation, cpt);
1242
1243         if (!active) {
1244                 hello->kshm_nips = 0;
1245                 rc = ksocknal_send_hello(ni, conn, &peerid.nid, hello);
1246         }
1247
1248         LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
1249                                     kshm_ips[LNET_INTERFACES_NUM]));
1250
1251         /* setup the socket AFTER I've received hello (it disables
1252          * SO_LINGER).  I might call back to the acceptor who may want
1253          * to send a protocol version response and then close the
1254          * socket; this ensures the socket only tears down after the
1255          * response has been sent.
1256          */
1257         if (rc == 0)
1258                 rc = ksocknal_lib_setup_sock(sock, ni);
1259
1260         write_lock_bh(global_lock);
1261
1262         /* NB my callbacks block while I hold ksnd_global_lock */
1263         ksocknal_lib_set_callback(sock, conn);
1264
1265         if (!active)
1266                 peer_ni->ksnp_accepting--;
1267
1268         write_unlock_bh(global_lock);
1269
1270         if (rc != 0) {
1271                 write_lock_bh(global_lock);
1272                 if (!conn->ksnc_closing) {
1273                         /* could be closed by another thread */
1274                         ksocknal_close_conn_locked(conn, rc);
1275                 }
1276                 write_unlock_bh(global_lock);
1277         } else if (ksocknal_connsock_addref(conn) == 0) {
1278                 /* Allow I/O to proceed. */
1279                 ksocknal_read_callback(conn);
1280                 ksocknal_write_callback(conn);
1281                 ksocknal_connsock_decref(conn);
1282         }
1283
1284         ksocknal_connsock_decref(conn);
1285         ksocknal_conn_decref(conn);
1286         return rc;
1287
1288 failed_2:
1289
1290         if (!peer_ni->ksnp_closing &&
1291             list_empty(&peer_ni->ksnp_conns) &&
1292             peer_ni->ksnp_conn_cb == NULL) {
1293                 list_splice_init(&peer_ni->ksnp_tx_queue, &zombies);
1294                 ksocknal_unlink_peer_locked(peer_ni);
1295         }
1296
1297         write_unlock_bh(global_lock);
1298
1299         if (warn != NULL) {
1300                 if (rc < 0)
1301                         CERROR("Not creating conn %s type %d: %s\n",
1302                                libcfs_idstr(&peerid), conn->ksnc_type, warn);
1303                 else
1304                         CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",
1305                                libcfs_idstr(&peerid), conn->ksnc_type, warn);
1306         }
1307
1308         if (!active) {
1309                 if (rc > 0) {
1310                         /* Request retry by replying with CONN_NONE
1311                          * ksnc_proto has been set already
1312                          */
1313                         conn->ksnc_type = SOCKLND_CONN_NONE;
1314                         hello->kshm_nips = 0;
1315                         ksocknal_send_hello(ni, conn, &peerid.nid, hello);
1316                 }
1317
1318                 write_lock_bh(global_lock);
1319                 peer_ni->ksnp_accepting--;
1320                 write_unlock_bh(global_lock);
1321         }
1322
1323         /*
1324          * If we get here without an error code, just use -EALREADY.
1325          * Depending on how we got here, the error may be positive
1326          * or negative. Normalize the value for ksocknal_txlist_done().
1327          */
1328         rc2 = (rc == 0 ? -EALREADY : (rc > 0 ? -rc : rc));
1329         ksocknal_txlist_done(ni, &zombies, rc2);
1330         ksocknal_peer_decref(peer_ni);
1331
1332 failed_1:
1333         if (hello != NULL)
1334                 LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
1335                                             kshm_ips[LNET_INTERFACES_NUM]));
1336
1337         LIBCFS_FREE(conn, sizeof(*conn));
1338
1339 failed_0:
1340         sock_release(sock);
1341
1342         return rc;
1343 }
1344
1345 void
1346 ksocknal_close_conn_locked(struct ksock_conn *conn, int error)
1347 {
1348         /* This just does the immmediate housekeeping, and queues the
1349          * connection for the reaper to terminate.
1350          * Caller holds ksnd_global_lock exclusively in irq context */
1351         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1352         struct ksock_conn_cb *conn_cb;
1353         struct ksock_conn *conn2;
1354         int conn_count;
1355         int duplicate_count = 0;
1356
1357         LASSERT(peer_ni->ksnp_error == 0);
1358         LASSERT(!conn->ksnc_closing);
1359         conn->ksnc_closing = 1;
1360
1361         /* ksnd_deathrow_conns takes over peer_ni's ref */
1362         list_del(&conn->ksnc_list);
1363
1364         conn_cb = conn->ksnc_conn_cb;
1365         if (conn_cb != NULL) {
1366                 /* dissociate conn from cb... */
1367                 LASSERT(!conn_cb->ksnr_deleted);
1368
1369                 conn_count = ksocknal_get_conn_count_by_type(conn_cb,
1370                                                              conn->ksnc_type);
1371                 /* connected bit is set only if all connections
1372                  * of the given type got created
1373                  */
1374                 if (conn_count == conn_cb->ksnr_max_conns)
1375                         LASSERT((conn_cb->ksnr_connected &
1376                                 BIT(conn->ksnc_type)) != 0);
1377
1378                 if (conn_count == 1) {
1379                         list_for_each_entry(conn2, &peer_ni->ksnp_conns,
1380                                             ksnc_list) {
1381                                 if (conn2->ksnc_conn_cb == conn_cb &&
1382                                     conn2->ksnc_type == conn->ksnc_type)
1383                                         duplicate_count += 1;
1384                         }
1385                         if (duplicate_count > 0)
1386                                 CERROR("Found %d duplicate conns type %d\n",
1387                                        duplicate_count,
1388                                        conn->ksnc_type);
1389                 }
1390                 ksocknal_decr_conn_count(conn_cb, conn->ksnc_type);
1391
1392                 conn->ksnc_conn_cb = NULL;
1393
1394                 /* drop conn's ref on conn_cb */
1395                 ksocknal_conn_cb_decref(conn_cb);
1396         }
1397
1398         if (list_empty(&peer_ni->ksnp_conns)) {
1399                 /* No more connections to this peer_ni */
1400
1401                 if (!list_empty(&peer_ni->ksnp_tx_queue)) {
1402                         struct ksock_tx *tx;
1403
1404                         LASSERT(conn->ksnc_proto == &ksocknal_protocol_v3x);
1405
1406                         /* throw them to the last connection...,
1407                          * these TXs will be send to /dev/null by scheduler */
1408                         list_for_each_entry(tx, &peer_ni->ksnp_tx_queue,
1409                                             tx_list)
1410                                 ksocknal_tx_prep(conn, tx);
1411
1412                         spin_lock_bh(&conn->ksnc_scheduler->kss_lock);
1413                         list_splice_init(&peer_ni->ksnp_tx_queue,
1414                                          &conn->ksnc_tx_queue);
1415                         spin_unlock_bh(&conn->ksnc_scheduler->kss_lock);
1416                 }
1417
1418                 /* renegotiate protocol version */
1419                 peer_ni->ksnp_proto = NULL;
1420                 /* stash last conn close reason */
1421                 peer_ni->ksnp_error = error;
1422
1423                 if (peer_ni->ksnp_conn_cb == NULL) {
1424                         /* I've just closed last conn belonging to a
1425                          * peer_ni with no connections to it
1426                          */
1427                         ksocknal_unlink_peer_locked(peer_ni);
1428                 }
1429         }
1430
1431         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1432
1433         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
1434         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1435
1436         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1437 }
1438
1439 void
1440 ksocknal_peer_failed(struct ksock_peer_ni *peer_ni)
1441 {
1442         bool notify = false;
1443         time64_t last_alive = 0;
1444
1445         /* There has been a connection failure or comms error; but I'll only
1446          * tell LNET I think the peer_ni is dead if it's to another kernel and
1447          * there are no connections or connection attempts in existence. */
1448
1449         read_lock(&ksocknal_data.ksnd_global_lock);
1450
1451         if ((peer_ni->ksnp_id.pid & LNET_PID_USERFLAG) == 0 &&
1452              list_empty(&peer_ni->ksnp_conns) &&
1453              peer_ni->ksnp_accepting == 0 &&
1454              !ksocknal_find_connecting_conn_cb_locked(peer_ni)) {
1455                 notify = true;
1456                 last_alive = peer_ni->ksnp_last_alive;
1457         }
1458
1459         read_unlock(&ksocknal_data.ksnd_global_lock);
1460
1461         if (notify)
1462                 lnet_notify(peer_ni->ksnp_ni,
1463                             &peer_ni->ksnp_id.nid,
1464                             false, false, last_alive);
1465 }
1466
1467 void
1468 ksocknal_finalize_zcreq(struct ksock_conn *conn)
1469 {
1470         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1471         struct ksock_tx *tx;
1472         struct ksock_tx *tmp;
1473         LIST_HEAD(zlist);
1474
1475         /* NB safe to finalize TXs because closing of socket will
1476          * abort all buffered data */
1477         LASSERT(conn->ksnc_sock == NULL);
1478
1479         spin_lock(&peer_ni->ksnp_lock);
1480
1481         list_for_each_entry_safe(tx, tmp, &peer_ni->ksnp_zc_req_list,
1482                                  tx_zc_list) {
1483                 if (tx->tx_conn != conn)
1484                         continue;
1485
1486                 LASSERT(tx->tx_msg.ksm_zc_cookies[0] != 0);
1487
1488                 tx->tx_msg.ksm_zc_cookies[0] = 0;
1489                 tx->tx_zc_aborted = 1;  /* mark it as not-acked */
1490                 list_move(&tx->tx_zc_list, &zlist);
1491         }
1492
1493         spin_unlock(&peer_ni->ksnp_lock);
1494
1495         while ((tx = list_first_entry_or_null(&zlist, struct ksock_tx,
1496                                               tx_zc_list)) != NULL) {
1497                 list_del(&tx->tx_zc_list);
1498                 ksocknal_tx_decref(tx);
1499         }
1500 }
1501
1502 void
1503 ksocknal_terminate_conn(struct ksock_conn *conn)
1504 {
1505         /* This gets called by the reaper (guaranteed thread context) to
1506          * disengage the socket from its callbacks and close it.
1507          * ksnc_refcount will eventually hit zero, and then the reaper will
1508          * destroy it.
1509          */
1510         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1511         struct ksock_sched *sched = conn->ksnc_scheduler;
1512         bool failed = false;
1513
1514         LASSERT(conn->ksnc_closing);
1515
1516         /* wake up the scheduler to "send" all remaining packets to /dev/null */
1517         spin_lock_bh(&sched->kss_lock);
1518
1519         /* a closing conn is always ready to tx */
1520         conn->ksnc_tx_ready = 1;
1521
1522         if (!conn->ksnc_tx_scheduled &&
1523             !list_empty(&conn->ksnc_tx_queue)) {
1524                 list_add_tail(&conn->ksnc_tx_list,
1525                               &sched->kss_tx_conns);
1526                 conn->ksnc_tx_scheduled = 1;
1527                 /* extra ref for scheduler */
1528                 ksocknal_conn_addref(conn);
1529
1530                 wake_up(&sched->kss_waitq);
1531         }
1532
1533         spin_unlock_bh(&sched->kss_lock);
1534
1535         /* serialise with callbacks */
1536         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1537
1538         ksocknal_lib_reset_callback(conn->ksnc_sock, conn);
1539
1540         /* OK, so this conn may not be completely disengaged from its
1541          * scheduler yet, but it _has_ committed to terminate...
1542          */
1543         conn->ksnc_scheduler->kss_nconns--;
1544
1545         if (peer_ni->ksnp_error != 0) {
1546                 /* peer_ni's last conn closed in error */
1547                 LASSERT(list_empty(&peer_ni->ksnp_conns));
1548                 failed = true;
1549                 peer_ni->ksnp_error = 0;     /* avoid multiple notifications */
1550         }
1551
1552         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1553
1554         if (failed)
1555                 ksocknal_peer_failed(peer_ni);
1556
1557         /* The socket is closed on the final put; either here, or in
1558          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1559          * when the connection was established, this will close the socket
1560          * immediately, aborting anything buffered in it. Any hung
1561          * zero-copy transmits will therefore complete in finite time.
1562          */
1563         ksocknal_connsock_decref(conn);
1564 }
1565
1566 void
1567 ksocknal_queue_zombie_conn(struct ksock_conn *conn)
1568 {
1569         /* Queue the conn for the reaper to destroy */
1570         LASSERT(refcount_read(&conn->ksnc_conn_refcount) == 0);
1571         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1572
1573         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1574         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1575
1576         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1577 }
1578
1579 void
1580 ksocknal_destroy_conn(struct ksock_conn *conn)
1581 {
1582         time64_t last_rcv;
1583
1584         /* Final coup-de-grace of the reaper */
1585         CDEBUG(D_NET, "connection %p\n", conn);
1586
1587         LASSERT(refcount_read(&conn->ksnc_conn_refcount) == 0);
1588         LASSERT(refcount_read(&conn->ksnc_sock_refcount) == 0);
1589         LASSERT(conn->ksnc_sock == NULL);
1590         LASSERT(conn->ksnc_conn_cb == NULL);
1591         LASSERT(!conn->ksnc_tx_scheduled);
1592         LASSERT(!conn->ksnc_rx_scheduled);
1593         LASSERT(list_empty(&conn->ksnc_tx_queue));
1594
1595         /* complete current receive if any */
1596         switch (conn->ksnc_rx_state) {
1597         case SOCKNAL_RX_LNET_PAYLOAD:
1598                 last_rcv = conn->ksnc_rx_deadline -
1599                            ksocknal_timeout();
1600                 CERROR("Completing partial receive from %s[%d], ip %pIScp, with error, wanted: %d, left: %d, last alive is %lld secs ago\n",
1601                        libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1602                        conn->ksnc_type,
1603                        &conn->ksnc_peeraddr,
1604                        conn->ksnc_rx_nob_wanted, conn->ksnc_rx_nob_left,
1605                        ktime_get_seconds() - last_rcv);
1606                 if (conn->ksnc_lnet_msg)
1607                         conn->ksnc_lnet_msg->msg_health_status =
1608                                 LNET_MSG_STATUS_REMOTE_ERROR;
1609                 lnet_finalize(conn->ksnc_lnet_msg, -EIO);
1610                 break;
1611         case SOCKNAL_RX_LNET_HEADER:
1612                 if (conn->ksnc_rx_started)
1613                         CERROR("Incomplete receive of lnet header from %s, ip %pIScp, with error, protocol: %d.x.\n",
1614                                libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1615                                &conn->ksnc_peeraddr,
1616                                conn->ksnc_proto->pro_version);
1617                 break;
1618         case SOCKNAL_RX_KSM_HEADER:
1619                 if (conn->ksnc_rx_started)
1620                         CERROR("Incomplete receive of ksock message from %s, ip %pIScp, with error, protocol: %d.x.\n",
1621                                libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1622                                &conn->ksnc_peeraddr,
1623                                conn->ksnc_proto->pro_version);
1624                 break;
1625         case SOCKNAL_RX_SLOP:
1626                 if (conn->ksnc_rx_started)
1627                         CERROR("Incomplete receive of slops from %s, ip %pIScp, with error\n",
1628                                libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1629                                &conn->ksnc_peeraddr);
1630                 break;
1631         default:
1632                 LBUG();
1633                 break;
1634         }
1635
1636         ksocknal_peer_decref(conn->ksnc_peer);
1637
1638         LIBCFS_FREE(conn, sizeof(*conn));
1639 }
1640
1641 int
1642 ksocknal_close_peer_conns_locked(struct ksock_peer_ni *peer_ni,
1643                                  struct sockaddr *addr, int why)
1644 {
1645         struct ksock_conn *conn;
1646         struct ksock_conn *cnxt;
1647         int count = 0;
1648
1649         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns, ksnc_list) {
1650                 if (!addr ||
1651                     rpc_cmp_addr(addr,
1652                                  (struct sockaddr *)&conn->ksnc_peeraddr)) {
1653                         count++;
1654                         ksocknal_close_conn_locked(conn, why);
1655                 }
1656         }
1657
1658         return count;
1659 }
1660
1661 int
1662 ksocknal_close_conn_and_siblings(struct ksock_conn *conn, int why)
1663 {
1664         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1665         int count;
1666
1667         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1668
1669         count = ksocknal_close_peer_conns_locked(
1670                 peer_ni, (struct sockaddr *)&conn->ksnc_peeraddr, why);
1671
1672         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1673
1674         return count;
1675 }
1676
1677 int
1678 ksocknal_close_matching_conns(struct lnet_processid *id, __u32 ipaddr)
1679 {
1680         struct ksock_peer_ni *peer_ni;
1681         struct hlist_node *pnxt;
1682         int lo;
1683         int hi;
1684         int i;
1685         int count = 0;
1686         struct sockaddr_in sa = {.sin_family = AF_INET};
1687
1688         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1689
1690         if (!LNET_NID_IS_ANY(&id->nid)) {
1691                 lo = hash_min(nidhash(&id->nid),
1692                               HASH_BITS(ksocknal_data.ksnd_peers));
1693                 hi = lo;
1694         } else {
1695                 lo = 0;
1696                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
1697         }
1698
1699         sa.sin_addr.s_addr = htonl(ipaddr);
1700         for (i = lo; i <= hi; i++) {
1701                 hlist_for_each_entry_safe(peer_ni, pnxt,
1702                                           &ksocknal_data.ksnd_peers[i],
1703                                           ksnp_list) {
1704
1705                         if (!((LNET_NID_IS_ANY(&id->nid) ||
1706                                nid_same(&id->nid, &peer_ni->ksnp_id.nid)) &&
1707                               (id->pid == LNET_PID_ANY ||
1708                                id->pid == peer_ni->ksnp_id.pid)))
1709                                 continue;
1710
1711                         count += ksocknal_close_peer_conns_locked(
1712                                 peer_ni,
1713                                 ipaddr ? (struct sockaddr *)&sa : NULL, 0);
1714                 }
1715         }
1716
1717         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1718
1719         /* wildcards always succeed */
1720         if (LNET_NID_IS_ANY(&id->nid) || id->pid == LNET_PID_ANY ||
1721             ipaddr == 0)
1722                 return 0;
1723
1724         return (count == 0 ? -ENOENT : 0);
1725 }
1726
1727 static void
1728 ksocknal_notify_gw_down(struct lnet_nid *gw_nid)
1729 {
1730         /* The router is telling me she's been notified of a change in
1731          * gateway state....
1732          */
1733         struct lnet_processid id = {
1734                 .pid    = LNET_PID_ANY,
1735                 .nid    = *gw_nid,
1736         };
1737
1738         CDEBUG(D_NET, "gw %s down\n", libcfs_nidstr(gw_nid));
1739
1740         /* If the gateway crashed, close all open connections... */
1741         ksocknal_close_matching_conns(&id, 0);
1742         return;
1743
1744         /* We can only establish new connections
1745          * if we have autroutes, and these connect on demand.
1746          */
1747 }
1748
1749 static void
1750 ksocknal_push_peer(struct ksock_peer_ni *peer_ni)
1751 {
1752         int index;
1753         int i;
1754         struct ksock_conn *conn;
1755
1756         for (index = 0; ; index++) {
1757                 read_lock(&ksocknal_data.ksnd_global_lock);
1758
1759                 i = 0;
1760                 conn = NULL;
1761
1762                 list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
1763                         if (i++ == index) {
1764                                 ksocknal_conn_addref(conn);
1765                                 break;
1766                         }
1767                 }
1768
1769                 read_unlock(&ksocknal_data.ksnd_global_lock);
1770
1771                 if (i <= index)
1772                         break;
1773
1774                 ksocknal_lib_push_conn (conn);
1775                 ksocknal_conn_decref(conn);
1776         }
1777 }
1778
1779 static int
1780 ksocknal_push(struct lnet_ni *ni, struct lnet_processid *id)
1781 {
1782         int lo;
1783         int hi;
1784         int bkt;
1785         int rc = -ENOENT;
1786
1787         if (!LNET_NID_IS_ANY(&id->nid)) {
1788                 lo = hash_min(nidhash(&id->nid),
1789                               HASH_BITS(ksocknal_data.ksnd_peers));
1790                 hi = lo;
1791         } else {
1792                 lo = 0;
1793                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
1794         }
1795
1796         for (bkt = lo; bkt <= hi; bkt++) {
1797                 int peer_off; /* searching offset in peer_ni hash table */
1798
1799                 for (peer_off = 0; ; peer_off++) {
1800                         struct ksock_peer_ni *peer_ni;
1801                         int           i = 0;
1802
1803                         read_lock(&ksocknal_data.ksnd_global_lock);
1804                         hlist_for_each_entry(peer_ni,
1805                                              &ksocknal_data.ksnd_peers[bkt],
1806                                              ksnp_list) {
1807                                 if (!((LNET_NID_IS_ANY(&id->nid) ||
1808                                        nid_same(&id->nid,
1809                                                  &peer_ni->ksnp_id.nid)) &&
1810                                       (id->pid == LNET_PID_ANY ||
1811                                        id->pid == peer_ni->ksnp_id.pid)))
1812                                         continue;
1813
1814                                 if (i++ == peer_off) {
1815                                         ksocknal_peer_addref(peer_ni);
1816                                         break;
1817                                 }
1818                         }
1819                         read_unlock(&ksocknal_data.ksnd_global_lock);
1820
1821                         if (i <= peer_off) /* no match */
1822                                 break;
1823
1824                         rc = 0;
1825                         ksocknal_push_peer(peer_ni);
1826                         ksocknal_peer_decref(peer_ni);
1827                 }
1828         }
1829         return rc;
1830 }
1831
1832 int
1833 ksocknal_ctl(struct lnet_ni *ni, unsigned int cmd, void *arg)
1834 {
1835         struct lnet_processid id = {};
1836         struct libcfs_ioctl_data *data = arg;
1837         int rc;
1838
1839         switch(cmd) {
1840         case IOC_LIBCFS_GET_INTERFACE: {
1841                 struct ksock_net *net = ni->ni_data;
1842                 struct ksock_interface *iface;
1843                 struct sockaddr_in *sa;
1844
1845                 read_lock(&ksocknal_data.ksnd_global_lock);
1846
1847                 if (data->ioc_count >= 1) {
1848                         rc = -ENOENT;
1849                 } else {
1850                         rc = 0;
1851                         iface = &net->ksnn_interface;
1852
1853                         sa = (void *)&iface->ksni_addr;
1854                         if (sa->sin_family == AF_INET) {
1855                                 data->ioc_u32[0] = ntohl(sa->sin_addr.s_addr);
1856                                 data->ioc_u32[1] = iface->ksni_netmask;
1857                         } else {
1858                                 data->ioc_u32[0] = 0xFFFFFFFF;
1859                                 data->ioc_u32[1] = 0;
1860                         }
1861                         data->ioc_u32[2] = iface->ksni_npeers;
1862                         data->ioc_u32[3] = iface->ksni_nroutes;
1863                 }
1864
1865                 read_unlock(&ksocknal_data.ksnd_global_lock);
1866                 return rc;
1867         }
1868
1869         case IOC_LIBCFS_GET_PEER: {
1870                 __u32            myip = 0;
1871                 __u32            ip = 0;
1872                 int              port = 0;
1873                 int              conn_count = 0;
1874                 int              share_count = 0;
1875
1876                 rc = ksocknal_get_peer_info(ni, data->ioc_count,
1877                                             &id, &myip, &ip, &port,
1878                                             &conn_count,  &share_count);
1879                 if (rc != 0)
1880                         return rc;
1881
1882                 if (!nid_is_nid4(&id.nid))
1883                         return -EINVAL;
1884                 data->ioc_nid    = lnet_nid_to_nid4(&id.nid);
1885                 data->ioc_count  = share_count;
1886                 data->ioc_u32[0] = ip;
1887                 data->ioc_u32[1] = port;
1888                 data->ioc_u32[2] = myip;
1889                 data->ioc_u32[3] = conn_count;
1890                 data->ioc_u32[4] = id.pid;
1891                 return 0;
1892         }
1893
1894         case IOC_LIBCFS_ADD_PEER: {
1895                 struct sockaddr_in sa = {.sin_family = AF_INET};
1896
1897                 id.pid = LNET_PID_LUSTRE;
1898                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1899                 sa.sin_addr.s_addr = htonl(data->ioc_u32[0]);
1900                 sa.sin_port = htons(data->ioc_u32[1]);
1901                 return ksocknal_add_peer(ni, &id, (struct sockaddr *)&sa);
1902         }
1903         case IOC_LIBCFS_DEL_PEER:
1904                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1905                 id.pid = LNET_PID_ANY;
1906                 return ksocknal_del_peer(ni, &id);
1907
1908         case IOC_LIBCFS_GET_CONN: {
1909                 int           txmem;
1910                 int           rxmem;
1911                 int           nagle;
1912                 struct ksock_conn *conn = ksocknal_get_conn_by_idx(ni, data->ioc_count);
1913                 struct sockaddr_in *psa = (void *)&conn->ksnc_peeraddr;
1914                 struct sockaddr_in *mysa = (void *)&conn->ksnc_myaddr;
1915
1916                 if (conn == NULL)
1917                         return -ENOENT;
1918
1919                 ksocknal_lib_get_conn_tunables(conn, &txmem, &rxmem, &nagle);
1920
1921                 data->ioc_count = txmem;
1922                 data->ioc_nid = lnet_nid_to_nid4(&conn->ksnc_peer->ksnp_id.nid);
1923                 data->ioc_flags = nagle;
1924                 if (psa->sin_family == AF_INET)
1925                         data->ioc_u32[0] = ntohl(psa->sin_addr.s_addr);
1926                 else
1927                         data->ioc_u32[0] = 0xFFFFFFFF;
1928                 data->ioc_u32[1] = rpc_get_port((struct sockaddr *)
1929                                                 &conn->ksnc_peeraddr);
1930                 if (mysa->sin_family == AF_INET)
1931                         data->ioc_u32[2] = ntohl(mysa->sin_addr.s_addr);
1932                 else
1933                         data->ioc_u32[2] = 0xFFFFFFFF;
1934                 data->ioc_u32[3] = conn->ksnc_type;
1935                 data->ioc_u32[4] = conn->ksnc_scheduler->kss_cpt;
1936                 data->ioc_u32[5] = rxmem;
1937                 data->ioc_u32[6] = conn->ksnc_peer->ksnp_id.pid;
1938                 ksocknal_conn_decref(conn);
1939                 return 0;
1940         }
1941
1942         case IOC_LIBCFS_CLOSE_CONNECTION:
1943                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1944                 id.pid = LNET_PID_ANY;
1945                 return ksocknal_close_matching_conns(&id,
1946                                                      data->ioc_u32[0]);
1947
1948         case IOC_LIBCFS_REGISTER_MYNID:
1949                 /* Ignore if this is a noop */
1950                 if (nid_is_nid4(&ni->ni_nid) &&
1951                     data->ioc_nid == lnet_nid_to_nid4(&ni->ni_nid))
1952                         return 0;
1953
1954                 CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1955                        libcfs_nid2str(data->ioc_nid),
1956                        libcfs_nidstr(&ni->ni_nid));
1957                 return -EINVAL;
1958
1959         case IOC_LIBCFS_PUSH_CONNECTION:
1960                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1961                 id.pid = LNET_PID_ANY;
1962                 return ksocknal_push(ni, &id);
1963
1964         default:
1965                 return -EINVAL;
1966         }
1967         /* not reached */
1968 }
1969
1970 static void
1971 ksocknal_free_buffers (void)
1972 {
1973         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_txs) == 0);
1974
1975         if (ksocknal_data.ksnd_schedulers != NULL)
1976                 cfs_percpt_free(ksocknal_data.ksnd_schedulers);
1977
1978         spin_lock(&ksocknal_data.ksnd_tx_lock);
1979
1980         if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
1981                 LIST_HEAD(zlist);
1982                 struct ksock_tx *tx;
1983
1984                 list_splice_init(&ksocknal_data.ksnd_idle_noop_txs, &zlist);
1985                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
1986
1987                 while ((tx = list_first_entry_or_null(&zlist, struct ksock_tx,
1988                                                       tx_list)) != NULL) {
1989                         list_del(&tx->tx_list);
1990                         LIBCFS_FREE(tx, tx->tx_desc_size);
1991                 }
1992         } else {
1993                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
1994         }
1995 }
1996
1997 static int
1998 ksocknal_handle_link_state_change(struct net_device *dev,
1999                                   unsigned char operstate)
2000 {
2001         struct lnet_ni *ni = NULL;
2002         struct ksock_net *net;
2003         struct ksock_net *cnxt;
2004         int ifindex;
2005         unsigned char link_down;
2006         struct in_device *in_dev;
2007         bool found_ip = false;
2008         struct ksock_interface *ksi = NULL;
2009         struct sockaddr_in *sa;
2010         __u32 ni_state_before;
2011         bool update_ping_buf = false;
2012         int state;
2013         DECLARE_CONST_IN_IFADDR(ifa);
2014
2015         link_down = !((operstate == IF_OPER_UP) || (operstate == IF_OPER_UNKNOWN));
2016         ifindex = dev->ifindex;
2017
2018         if (!ksocknal_data.ksnd_nnets)
2019                 goto out;
2020
2021         list_for_each_entry_safe(net, cnxt, &ksocknal_data.ksnd_nets,
2022                                  ksnn_list) {
2023
2024                 ksi = &net->ksnn_interface;
2025                 sa = (void *)&ksi->ksni_addr;
2026                 found_ip = false;
2027
2028                 if (strcmp(ksi->ksni_name, dev->name))
2029                         continue;
2030
2031                 if (ksi->ksni_index == -1) {
2032                         if (dev->reg_state != NETREG_REGISTERED)
2033                                 continue;
2034                         /* A registration just happened: save the new index for
2035                          * the device */
2036                         ksi->ksni_index = ifindex;
2037                         goto out;
2038                 }
2039
2040                 if (ksi->ksni_index != ifindex)
2041                         continue;
2042
2043                 if (dev->reg_state == NETREG_UNREGISTERING) {
2044                         /* Device is being unregistered, we need to clear the
2045                          * index, it can change when device will be back */
2046                         ksi->ksni_index = -1;
2047                         goto out;
2048                 }
2049
2050                 ni = net->ksnn_ni;
2051
2052                 in_dev = __in_dev_get_rtnl(dev);
2053                 if (!in_dev) {
2054                         CDEBUG(D_NET, "Interface %s has no IPv4 status.\n",
2055                                dev->name);
2056                         ni_state_before = lnet_set_link_fatal_state(ni, 1);
2057                         goto ni_done;
2058                 }
2059                 in_dev_for_each_ifa_rtnl(ifa, in_dev) {
2060                         if (sa->sin_addr.s_addr == ifa->ifa_local)
2061                                 found_ip = true;
2062                 }
2063                 endfor_ifa(in_dev);
2064
2065                 if (!found_ip) {
2066                         CDEBUG(D_NET, "Interface %s has no matching ip\n",
2067                                dev->name);
2068                         ni_state_before = lnet_set_link_fatal_state(ni, 1);
2069                         goto ni_done;
2070                 }
2071
2072                 if (link_down) {
2073                         ni_state_before = lnet_set_link_fatal_state(ni, 1);
2074                 } else {
2075                         state = (lnet_get_link_status(dev) == 0);
2076                         ni_state_before = lnet_set_link_fatal_state(ni,
2077                                                                     state);
2078                 }
2079 ni_done:
2080                 if (!update_ping_buf &&
2081                     (ni->ni_state == LNET_NI_STATE_ACTIVE) &&
2082                     (atomic_read(&ni->ni_fatal_error_on) != ni_state_before))
2083                         update_ping_buf = true;
2084         }
2085
2086         if (update_ping_buf)
2087                 lnet_mark_ping_buffer_for_update();
2088 out:
2089         return 0;
2090 }
2091
2092
2093 static int
2094 ksocknal_handle_inetaddr_change(struct in_ifaddr *ifa, unsigned long event)
2095 {
2096         struct lnet_ni *ni = NULL;
2097         struct ksock_net *net;
2098         struct ksock_net *cnxt;
2099         struct net_device *event_netdev = ifa->ifa_dev->dev;
2100         int ifindex;
2101         struct ksock_interface *ksi = NULL;
2102         struct sockaddr_in *sa;
2103         __u32 ni_state_before;
2104         bool update_ping_buf = false;
2105         bool link_down;
2106
2107         if (!ksocknal_data.ksnd_nnets)
2108                 goto out;
2109
2110         ifindex = event_netdev->ifindex;
2111
2112         list_for_each_entry_safe(net, cnxt, &ksocknal_data.ksnd_nets,
2113                                  ksnn_list) {
2114
2115                 ksi = &net->ksnn_interface;
2116                 sa = (void *)&ksi->ksni_addr;
2117
2118                 if (ksi->ksni_index != ifindex ||
2119                     strcmp(ksi->ksni_name, event_netdev->name))
2120                         continue;
2121
2122                 if (sa->sin_addr.s_addr == ifa->ifa_local) {
2123                         ni = net->ksnn_ni;
2124                         link_down = (event == NETDEV_DOWN);
2125                         ni_state_before = lnet_set_link_fatal_state(ni,
2126                                                                     link_down);
2127
2128                         if (!update_ping_buf &&
2129                             (ni->ni_state == LNET_NI_STATE_ACTIVE) &&
2130                             ((event == NETDEV_DOWN) != ni_state_before))
2131                                 update_ping_buf = true;
2132                 }
2133         }
2134
2135         if (update_ping_buf)
2136                 lnet_mark_ping_buffer_for_update();
2137 out:
2138         return 0;
2139 }
2140
2141 /************************************
2142  * Net device notifier event handler
2143  ************************************/
2144 static int ksocknal_device_event(struct notifier_block *unused,
2145                                  unsigned long event, void *ptr)
2146 {
2147         struct net_device *dev = netdev_notifier_info_to_dev(ptr);
2148         unsigned char operstate;
2149
2150         operstate = dev->operstate;
2151
2152         CDEBUG(D_NET, "devevent: status=%ld, iface=%s ifindex %d state %u\n",
2153                event, dev->name, dev->ifindex, operstate);
2154
2155         switch (event) {
2156         case NETDEV_UP:
2157         case NETDEV_DOWN:
2158         case NETDEV_CHANGE:
2159         case NETDEV_REGISTER:
2160         case NETDEV_UNREGISTER:
2161                 ksocknal_handle_link_state_change(dev, operstate);
2162                 break;
2163         }
2164
2165         return NOTIFY_OK;
2166 }
2167
2168 /************************************
2169  * Inetaddr notifier event handler
2170  ************************************/
2171 static int ksocknal_inetaddr_event(struct notifier_block *unused,
2172                                    unsigned long event, void *ptr)
2173 {
2174         struct in_ifaddr *ifa = ptr;
2175
2176         CDEBUG(D_NET, "addrevent: status %ld ip addr %pI4, netmask %pI4.\n",
2177                event, &ifa->ifa_address, &ifa->ifa_mask);
2178
2179         switch (event) {
2180         case NETDEV_UP:
2181         case NETDEV_DOWN:
2182         case NETDEV_CHANGE:
2183                 ksocknal_handle_inetaddr_change(ifa, event);
2184                 break;
2185
2186         }
2187         return NOTIFY_OK;
2188 }
2189
2190 static struct notifier_block ksocknal_dev_notifier_block = {
2191         .notifier_call = ksocknal_device_event,
2192 };
2193
2194 static struct notifier_block ksocknal_inetaddr_notifier_block = {
2195         .notifier_call = ksocknal_inetaddr_event,
2196 };
2197
2198 static void
2199 ksocknal_base_shutdown(void)
2200 {
2201         struct ksock_sched *sched;
2202         struct ksock_peer_ni *peer_ni;
2203         int i;
2204
2205         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %lld\n",
2206                libcfs_kmem_read());
2207         LASSERT (ksocknal_data.ksnd_nnets == 0);
2208
2209         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL) {
2210                 unregister_netdevice_notifier(&ksocknal_dev_notifier_block);
2211                 unregister_inetaddr_notifier(&ksocknal_inetaddr_notifier_block);
2212         }
2213
2214         switch (ksocknal_data.ksnd_init) {
2215         default:
2216                 LASSERT(0);
2217                 fallthrough;
2218
2219         case SOCKNAL_INIT_ALL:
2220         case SOCKNAL_INIT_DATA:
2221                 hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list)
2222                         LASSERT(0);
2223
2224                 LASSERT(list_empty(&ksocknal_data.ksnd_nets));
2225                 LASSERT(list_empty(&ksocknal_data.ksnd_enomem_conns));
2226                 LASSERT(list_empty(&ksocknal_data.ksnd_zombie_conns));
2227                 LASSERT(list_empty(&ksocknal_data.ksnd_connd_connreqs));
2228                 LASSERT(list_empty(&ksocknal_data.ksnd_connd_routes));
2229
2230                 if (ksocknal_data.ksnd_schedulers != NULL) {
2231                         cfs_percpt_for_each(sched, i,
2232                                             ksocknal_data.ksnd_schedulers) {
2233
2234                                 LASSERT(list_empty(&sched->kss_tx_conns));
2235                                 LASSERT(list_empty(&sched->kss_rx_conns));
2236                                 LASSERT(list_empty(&sched->kss_zombie_noop_txs));
2237                                 LASSERT(sched->kss_nconns == 0);
2238                         }
2239                 }
2240
2241                 /* flag threads to terminate; wake and wait for them to die */
2242                 ksocknal_data.ksnd_shuttingdown = 1;
2243                 wake_up_all(&ksocknal_data.ksnd_connd_waitq);
2244                 wake_up(&ksocknal_data.ksnd_reaper_waitq);
2245
2246                 if (ksocknal_data.ksnd_schedulers != NULL) {
2247                         cfs_percpt_for_each(sched, i,
2248                                             ksocknal_data.ksnd_schedulers)
2249                                         wake_up_all(&sched->kss_waitq);
2250                 }
2251
2252                 wait_var_event_warning(&ksocknal_data.ksnd_nthreads,
2253                                        atomic_read(&ksocknal_data.ksnd_nthreads) == 0,
2254                                        "waiting for %d threads to terminate\n",
2255                                        atomic_read(&ksocknal_data.ksnd_nthreads));
2256
2257                 ksocknal_free_buffers();
2258
2259                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
2260                 break;
2261         }
2262
2263         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %lld\n",
2264                libcfs_kmem_read());
2265
2266         module_put(THIS_MODULE);
2267 }
2268
2269 static int
2270 ksocknal_base_startup(void)
2271 {
2272         struct ksock_sched *sched;
2273         int rc;
2274         int i;
2275
2276         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
2277         LASSERT(ksocknal_data.ksnd_nnets == 0);
2278
2279         memset(&ksocknal_data, 0, sizeof(ksocknal_data)); /* zero pointers */
2280
2281         hash_init(ksocknal_data.ksnd_peers);
2282
2283         rwlock_init(&ksocknal_data.ksnd_global_lock);
2284         INIT_LIST_HEAD(&ksocknal_data.ksnd_nets);
2285
2286         spin_lock_init(&ksocknal_data.ksnd_reaper_lock);
2287         INIT_LIST_HEAD(&ksocknal_data.ksnd_enomem_conns);
2288         INIT_LIST_HEAD(&ksocknal_data.ksnd_zombie_conns);
2289         INIT_LIST_HEAD(&ksocknal_data.ksnd_deathrow_conns);
2290         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
2291
2292         spin_lock_init(&ksocknal_data.ksnd_connd_lock);
2293         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_connreqs);
2294         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_routes);
2295         init_waitqueue_head(&ksocknal_data.ksnd_connd_waitq);
2296
2297         spin_lock_init(&ksocknal_data.ksnd_tx_lock);
2298         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_noop_txs);
2299
2300         /* NB memset above zeros whole of ksocknal_data */
2301
2302         /* flag lists/ptrs/locks initialised */
2303         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2304         if (!try_module_get(THIS_MODULE))
2305                 goto failed;
2306
2307         /* Create a scheduler block per available CPT */
2308         ksocknal_data.ksnd_schedulers = cfs_percpt_alloc(lnet_cpt_table(),
2309                                                          sizeof(*sched));
2310         if (ksocknal_data.ksnd_schedulers == NULL)
2311                 goto failed;
2312
2313         cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
2314                 int nthrs;
2315
2316                 /*
2317                  * make sure not to allocate more threads than there are
2318                  * cores/CPUs in teh CPT
2319                  */
2320                 nthrs = cfs_cpt_weight(lnet_cpt_table(), i);
2321                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2322                         nthrs = min(nthrs, *ksocknal_tunables.ksnd_nscheds);
2323                 } else {
2324                         /*
2325                          * max to half of CPUs, assume another half should be
2326                          * reserved for upper layer modules
2327                          */
2328                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2329                 }
2330
2331                 sched->kss_nthreads_max = nthrs;
2332                 sched->kss_cpt = i;
2333
2334                 spin_lock_init(&sched->kss_lock);
2335                 INIT_LIST_HEAD(&sched->kss_rx_conns);
2336                 INIT_LIST_HEAD(&sched->kss_tx_conns);
2337                 INIT_LIST_HEAD(&sched->kss_zombie_noop_txs);
2338                 init_waitqueue_head(&sched->kss_waitq);
2339         }
2340
2341         ksocknal_data.ksnd_connd_starting         = 0;
2342         ksocknal_data.ksnd_connd_failed_stamp     = 0;
2343         ksocknal_data.ksnd_connd_starting_stamp   = ktime_get_real_seconds();
2344         /* must have at least 2 connds to remain responsive to accepts while
2345          * connecting */
2346         if (*ksocknal_tunables.ksnd_nconnds < SOCKNAL_CONND_RESV + 1)
2347                 *ksocknal_tunables.ksnd_nconnds = SOCKNAL_CONND_RESV + 1;
2348
2349         if (*ksocknal_tunables.ksnd_nconnds_max <
2350             *ksocknal_tunables.ksnd_nconnds) {
2351                 ksocknal_tunables.ksnd_nconnds_max =
2352                         ksocknal_tunables.ksnd_nconnds;
2353         }
2354
2355         for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
2356                 spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2357                 ksocknal_data.ksnd_connd_starting++;
2358                 spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2359
2360                 rc = ksocknal_thread_start(ksocknal_connd,
2361                                            (void *)((uintptr_t)i),
2362                                            "socknal_cd%02d", i);
2363                 if (rc != 0) {
2364                         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2365                         ksocknal_data.ksnd_connd_starting--;
2366                         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2367                         CERROR("Can't spawn socknal connd: %d\n", rc);
2368                         goto failed;
2369                 }
2370         }
2371
2372         rc = ksocknal_thread_start(ksocknal_reaper, NULL, "socknal_reaper");
2373         if (rc != 0) {
2374                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
2375                 goto failed;
2376         }
2377
2378         register_netdevice_notifier(&ksocknal_dev_notifier_block);
2379         register_inetaddr_notifier(&ksocknal_inetaddr_notifier_block);
2380
2381         /* flag everything initialised */
2382         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
2383
2384         return 0;
2385
2386  failed:
2387         ksocknal_base_shutdown();
2388         return -ENETDOWN;
2389 }
2390
2391 static int
2392 ksocknal_debug_peerhash(struct lnet_ni *ni)
2393 {
2394         struct ksock_peer_ni *peer_ni;
2395         int i;
2396
2397         read_lock(&ksocknal_data.ksnd_global_lock);
2398
2399         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
2400                 struct ksock_conn_cb *conn_cb;
2401                 struct ksock_conn *conn;
2402
2403                 if (peer_ni->ksnp_ni != ni)
2404                         continue;
2405
2406                 CWARN("Active peer_ni on shutdown: %s, ref %d, closing %d, accepting %d, err %d, zcookie %llu, txq %d, zc_req %d\n",
2407                       libcfs_idstr(&peer_ni->ksnp_id),
2408                       refcount_read(&peer_ni->ksnp_refcount),
2409                       peer_ni->ksnp_closing,
2410                       peer_ni->ksnp_accepting, peer_ni->ksnp_error,
2411                       peer_ni->ksnp_zc_next_cookie,
2412                       !list_empty(&peer_ni->ksnp_tx_queue),
2413                       !list_empty(&peer_ni->ksnp_zc_req_list));
2414
2415                 conn_cb = peer_ni->ksnp_conn_cb;
2416                 if (conn_cb) {
2417                         CWARN("ConnCB: ref %d, schd %d, conn %d, cnted %d, del %d\n",
2418                               refcount_read(&conn_cb->ksnr_refcount),
2419                               conn_cb->ksnr_scheduled, conn_cb->ksnr_connecting,
2420                               conn_cb->ksnr_connected, conn_cb->ksnr_deleted);
2421                 }
2422
2423                 list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
2424                         CWARN("Conn: ref %d, sref %d, t %d, c %d\n",
2425                               refcount_read(&conn->ksnc_conn_refcount),
2426                               refcount_read(&conn->ksnc_sock_refcount),
2427                               conn->ksnc_type, conn->ksnc_closing);
2428                 }
2429                 break;
2430         }
2431
2432         read_unlock(&ksocknal_data.ksnd_global_lock);
2433         return 0;
2434 }
2435
2436 void
2437 ksocknal_shutdown(struct lnet_ni *ni)
2438 {
2439         struct ksock_net *net = ni->ni_data;
2440
2441         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL);
2442         LASSERT(ksocknal_data.ksnd_nnets > 0);
2443
2444         /* prevent new peers */
2445         atomic_add(SOCKNAL_SHUTDOWN_BIAS, &net->ksnn_npeers);
2446
2447         /* Delete all peers */
2448         ksocknal_del_peer(ni, NULL);
2449
2450         /* Wait for all peer_ni state to clean up */
2451         wait_var_event_warning(&net->ksnn_npeers,
2452                                atomic_read(&net->ksnn_npeers) ==
2453                                SOCKNAL_SHUTDOWN_BIAS,
2454                                "waiting for %d peers to disconnect\n",
2455                                ksocknal_debug_peerhash(ni) +
2456                                atomic_read(&net->ksnn_npeers) -
2457                                SOCKNAL_SHUTDOWN_BIAS);
2458
2459         LASSERT(net->ksnn_interface.ksni_npeers == 0);
2460         LASSERT(net->ksnn_interface.ksni_nroutes == 0);
2461
2462         list_del(&net->ksnn_list);
2463         LIBCFS_FREE(net, sizeof(*net));
2464
2465         ksocknal_data.ksnd_nnets--;
2466         if (ksocknal_data.ksnd_nnets == 0)
2467                 ksocknal_base_shutdown();
2468 }
2469
2470 static int
2471 ksocknal_search_new_ipif(struct ksock_net *net)
2472 {
2473         int new_ipif = 0;
2474         char *ifnam = &net->ksnn_interface.ksni_name[0];
2475         char *colon = strchr(ifnam, ':');
2476         bool found = false;
2477         struct ksock_net *tmp;
2478
2479         if (colon != NULL)
2480                 *colon = 0;
2481
2482         list_for_each_entry(tmp, &ksocknal_data.ksnd_nets, ksnn_list) {
2483                 char *ifnam2 = &tmp->ksnn_interface.ksni_name[0];
2484                 char *colon2 = strchr(ifnam2, ':');
2485
2486                 if (colon2 != NULL)
2487                         *colon2 = 0;
2488
2489                 found = strcmp(ifnam, ifnam2) == 0;
2490                 if (colon2 != NULL)
2491                         *colon2 = ':';
2492         }
2493
2494         new_ipif += !found;
2495         if (colon != NULL)
2496                 *colon = ':';
2497
2498         return new_ipif;
2499 }
2500
2501 static int
2502 ksocknal_start_schedulers(struct ksock_sched *sched)
2503 {
2504         int     nthrs;
2505         int     rc = 0;
2506         int     i;
2507
2508         if (sched->kss_nthreads == 0) {
2509                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2510                         nthrs = sched->kss_nthreads_max;
2511                 } else {
2512                         nthrs = cfs_cpt_weight(lnet_cpt_table(),
2513                                                sched->kss_cpt);
2514                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2515                         nthrs = min(SOCKNAL_NSCHEDS_HIGH, nthrs);
2516                 }
2517                 nthrs = min(nthrs, sched->kss_nthreads_max);
2518         } else {
2519                 LASSERT(sched->kss_nthreads <= sched->kss_nthreads_max);
2520                 /* increase two threads if there is new interface */
2521                 nthrs = min(2, sched->kss_nthreads_max - sched->kss_nthreads);
2522         }
2523
2524         for (i = 0; i < nthrs; i++) {
2525                 long id;
2526
2527                 id = KSOCK_THREAD_ID(sched->kss_cpt, sched->kss_nthreads + i);
2528                 rc = ksocknal_thread_start(ksocknal_scheduler, (void *)id,
2529                                            "socknal_sd%02d_%02d",
2530                                            sched->kss_cpt,
2531                                            (int)KSOCK_THREAD_SID(id));
2532                 if (rc == 0)
2533                         continue;
2534
2535                 CERROR("Can't spawn thread %d for scheduler[%d]: %d\n",
2536                        sched->kss_cpt, (int) KSOCK_THREAD_SID(id), rc);
2537                 break;
2538         }
2539
2540         sched->kss_nthreads += i;
2541         return rc;
2542 }
2543
2544 static int
2545 ksocknal_net_start_threads(struct ksock_net *net, __u32 *cpts, int ncpts)
2546 {
2547         int newif = ksocknal_search_new_ipif(net);
2548         int rc;
2549         int i;
2550
2551         if (ncpts > 0 && ncpts > cfs_cpt_number(lnet_cpt_table()))
2552                 return -EINVAL;
2553
2554         for (i = 0; i < ncpts; i++) {
2555                 struct ksock_sched *sched;
2556                 int cpt = (cpts == NULL) ? i : cpts[i];
2557
2558                 LASSERT(cpt < cfs_cpt_number(lnet_cpt_table()));
2559                 sched = ksocknal_data.ksnd_schedulers[cpt];
2560
2561                 if (!newif && sched->kss_nthreads > 0)
2562                         continue;
2563
2564                 rc = ksocknal_start_schedulers(sched);
2565                 if (rc != 0)
2566                         return rc;
2567         }
2568         return 0;
2569 }
2570
2571 int
2572 ksocknal_startup(struct lnet_ni *ni)
2573 {
2574         struct ksock_net *net;
2575         struct ksock_interface *ksi = NULL;
2576         struct lnet_inetdev *ifaces = NULL;
2577         int rc, if_idx;
2578         int dev_status;
2579
2580         LASSERT (ni->ni_net->net_lnd == &the_ksocklnd);
2581         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING) {
2582                 rc = ksocknal_base_startup();
2583                 if (rc != 0)
2584                         return rc;
2585         }
2586         LIBCFS_ALLOC(net, sizeof(*net));
2587         if (net == NULL)
2588                 goto out_base;
2589
2590         net->ksnn_incarnation = ktime_get_real_ns();
2591         ni->ni_data = net;
2592
2593         ksocknal_tunables_setup(ni);
2594
2595         rc = lnet_inet_enumerate(&ifaces, ni->ni_net_ns, true);
2596         if (rc < 0)
2597                 goto out_net;
2598
2599         ksi = &net->ksnn_interface;
2600
2601         /* Interface and/or IP address is specified otherwise default to
2602          * the first Interface
2603          */
2604         if_idx = lnet_inet_select(ni, ifaces, rc);
2605         if (if_idx < 0)
2606                 goto out_net;
2607
2608         if (!ni->ni_interface) {
2609                 rc = lnet_ni_add_interface(ni, ifaces[if_idx].li_name);
2610                 if (rc < 0)
2611                         CWARN("ksocklnd failed to allocate ni_interface\n");
2612         }
2613
2614         ni->ni_dev_cpt = ifaces[if_idx].li_cpt;
2615         ksi->ksni_index = ifaces[if_idx].li_index;
2616         if (ifaces[if_idx].li_size == sizeof(struct in6_addr)) {
2617                 struct sockaddr_in6 *sa;
2618                 sa = (void *)&ksi->ksni_addr;
2619                 memset(sa, 0, sizeof(*sa));
2620                 sa->sin6_family = AF_INET6;
2621                 memcpy(&sa->sin6_addr, ifaces[if_idx].li_ipv6addr,
2622                        sizeof(struct in6_addr));
2623                 ni->ni_nid.nid_size = sizeof(struct in6_addr) - 4;
2624                 memcpy(&ni->ni_nid.nid_addr, ifaces[if_idx].li_ipv6addr,
2625                        sizeof(struct in6_addr));
2626         } else {
2627                 struct sockaddr_in *sa;
2628                 sa = (void *)&ksi->ksni_addr;
2629                 memset(sa, 0, sizeof(*sa));
2630                 sa->sin_family = AF_INET;
2631                 sa->sin_addr.s_addr = ifaces[if_idx].li_ipaddr;
2632                 ksi->ksni_netmask = ifaces[if_idx].li_netmask;
2633                 ni->ni_nid.nid_size = 0;
2634                 ni->ni_nid.nid_addr[0] = sa->sin_addr.s_addr;
2635         }
2636         strscpy(ksi->ksni_name, ifaces[if_idx].li_name, sizeof(ksi->ksni_name));
2637
2638         /* call it before add it to ksocknal_data.ksnd_nets */
2639         rc = ksocknal_net_start_threads(net, ni->ni_cpts, ni->ni_ncpts);
2640         if (rc != 0)
2641                 goto out_net;
2642
2643         if ((ksocknal_ip2index((struct sockaddr *)&ksi->ksni_addr,
2644                                 ni,
2645                                 &dev_status) < 0) ||
2646              (dev_status <= 0))
2647                 lnet_set_link_fatal_state(ni, 1);
2648
2649         list_add(&net->ksnn_list, &ksocknal_data.ksnd_nets);
2650         net->ksnn_ni = ni;
2651         ksocknal_data.ksnd_nnets++;
2652         kfree(ifaces);
2653
2654         return 0;
2655
2656 out_net:
2657         LIBCFS_FREE(net, sizeof(*net));
2658 out_base:
2659         if (ksocknal_data.ksnd_nnets == 0)
2660                 ksocknal_base_shutdown();
2661         kfree(ifaces);
2662
2663         return -ENETDOWN;
2664 }
2665
2666 static void __exit ksocklnd_exit(void)
2667 {
2668         lnet_unregister_lnd(&the_ksocklnd);
2669 }
2670
2671 static const struct lnet_lnd the_ksocklnd = {
2672         .lnd_type               = SOCKLND,
2673         .lnd_startup            = ksocknal_startup,
2674         .lnd_shutdown           = ksocknal_shutdown,
2675         .lnd_ctl                = ksocknal_ctl,
2676         .lnd_send               = ksocknal_send,
2677         .lnd_recv               = ksocknal_recv,
2678         .lnd_notify_peer_down   = ksocknal_notify_gw_down,
2679         .lnd_accept             = ksocknal_accept,
2680         .lnd_nl_get             = ksocknal_nl_get,
2681         .lnd_nl_set             = ksocknal_nl_set,
2682         .lnd_keys               = &ksocknal_tunables_keys,
2683 };
2684
2685 static int __init ksocklnd_init(void)
2686 {
2687         int rc;
2688
2689         /* check ksnr_connected/connecting field large enough */
2690         BUILD_BUG_ON(SOCKLND_CONN_NTYPES > 4);
2691         BUILD_BUG_ON(SOCKLND_CONN_ACK != SOCKLND_CONN_BULK_IN);
2692
2693         rc = ksocknal_tunables_init();
2694         if (rc != 0)
2695                 return rc;
2696
2697         rc = libcfs_setup();
2698         if (rc)
2699                 return rc;
2700
2701         lnet_register_lnd(&the_ksocklnd);
2702
2703         return 0;
2704 }
2705
2706 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2707 MODULE_DESCRIPTION("TCP Socket LNet Network Driver");
2708 MODULE_VERSION("2.8.0");
2709 MODULE_LICENSE("GPL");
2710
2711 module_init(ksocklnd_init);
2712 module_exit(ksocklnd_exit);