Whamcloud - gitweb
LU-9859 libcfs: refactor libcfs initialization.
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lnet/klnds/socklnd/socklnd.c
32  *
33  * Author: Zach Brown <zab@zabbo.net>
34  * Author: Peter J. Braam <braam@clusterfs.com>
35  * Author: Phil Schwan <phil@clusterfs.com>
36  * Author: Eric Barton <eric@bartonsoftware.com>
37  */
38
39 #include <linux/ethtool.h>
40 #include <linux/inetdevice.h>
41 #include <linux/kernel.h>
42 #include <linux/sunrpc/addr.h>
43 #include <net/addrconf.h>
44 #include "socklnd.h"
45
46 static const struct lnet_lnd the_ksocklnd;
47 struct ksock_nal_data ksocknal_data;
48
49 static int ksocknal_ip2index(struct sockaddr *addr, struct lnet_ni *ni,
50                              int *dev_status)
51 {
52         struct net_device *dev;
53         int ret = -1;
54         DECLARE_CONST_IN_IFADDR(ifa);
55
56         *dev_status = -1;
57
58         if (addr->sa_family != AF_INET && addr->sa_family != AF_INET6)
59                 return ret;
60
61         rcu_read_lock();
62         for_each_netdev(ni->ni_net_ns, dev) {
63                 int flags = dev_get_flags(dev);
64                 struct in_device *in_dev;
65
66                 if (flags & IFF_LOOPBACK) /* skip the loopback IF */
67                         continue;
68
69                 if (!(flags & IFF_UP))
70                         continue;
71
72                 switch (addr->sa_family) {
73                 case AF_INET:
74                         in_dev = __in_dev_get_rcu(dev);
75                         if (!in_dev)
76                                 continue;
77
78                         in_dev_for_each_ifa_rcu(ifa, in_dev) {
79                                 if (ifa->ifa_local ==
80                                     ((struct sockaddr_in *)addr)->sin_addr.s_addr)
81                                         ret = dev->ifindex;
82                         }
83                         endfor_ifa(in_dev);
84                         break;
85 #if IS_ENABLED(CONFIG_IPV6)
86                 case AF_INET6: {
87                         struct inet6_dev *in6_dev;
88                         const struct inet6_ifaddr *ifa6;
89                         struct sockaddr_in6 *addr6 = (struct sockaddr_in6*)addr;
90
91                         in6_dev = __in6_dev_get(dev);
92                         if (!in6_dev)
93                                 continue;
94
95                         list_for_each_entry_rcu(ifa6, &in6_dev->addr_list, if_list) {
96                                 if (ipv6_addr_cmp(&ifa6->addr,
97                                                  &addr6->sin6_addr) == 0)
98                                         ret = dev->ifindex;
99                         }
100                         break;
101                         }
102 #endif /* IS_ENABLED(CONFIG_IPV6) */
103                 }
104                 if (ret >= 0)
105                         break;
106         }
107
108         rcu_read_unlock();
109         if (ret >= 0)
110                 *dev_status = 1;
111
112         if ((ret == -1) ||
113             ((dev->reg_state == NETREG_UNREGISTERING) ||
114              (dev->operstate != IF_OPER_UP)) ||
115             (lnet_get_link_status(dev) == 0))
116                 *dev_status = 0;
117
118         return ret;
119 }
120
121 static struct ksock_conn_cb *
122 ksocknal_create_conn_cb(struct sockaddr *addr)
123 {
124         struct ksock_conn_cb *conn_cb;
125
126         LIBCFS_ALLOC(conn_cb, sizeof(*conn_cb));
127         if (!conn_cb)
128                 return NULL;
129
130         refcount_set(&conn_cb->ksnr_refcount, 1);
131         conn_cb->ksnr_peer = NULL;
132         conn_cb->ksnr_retry_interval = 0;         /* OK to connect at any time */
133         rpc_copy_addr((struct sockaddr *)&conn_cb->ksnr_addr, addr);
134         rpc_set_port((struct sockaddr *)&conn_cb->ksnr_addr,
135                      rpc_get_port(addr));
136         conn_cb->ksnr_scheduled = 0;
137         conn_cb->ksnr_connecting = 0;
138         conn_cb->ksnr_connected = 0;
139         conn_cb->ksnr_deleted = 0;
140         conn_cb->ksnr_conn_count = 0;
141         conn_cb->ksnr_ctrl_conn_count = 0;
142         conn_cb->ksnr_blki_conn_count = 0;
143         conn_cb->ksnr_blko_conn_count = 0;
144         conn_cb->ksnr_max_conns = 0;
145         conn_cb->ksnr_busy_retry_count = 0;
146
147         return conn_cb;
148 }
149
150 void
151 ksocknal_destroy_conn_cb(struct ksock_conn_cb *conn_cb)
152 {
153         LASSERT(refcount_read(&conn_cb->ksnr_refcount) == 0);
154
155         if (conn_cb->ksnr_peer)
156                 ksocknal_peer_decref(conn_cb->ksnr_peer);
157
158         LIBCFS_FREE(conn_cb, sizeof(*conn_cb));
159 }
160
161 static struct ksock_peer_ni *
162 ksocknal_create_peer(struct lnet_ni *ni, struct lnet_processid *id)
163 {
164         int cpt = lnet_nid2cpt(&id->nid, ni);
165         struct ksock_net *net = ni->ni_data;
166         struct ksock_peer_ni *peer_ni;
167
168         LASSERT(!LNET_NID_IS_ANY(&id->nid));
169         LASSERT(id->pid != LNET_PID_ANY);
170         LASSERT(!in_interrupt());
171
172         if (!atomic_inc_unless_negative(&net->ksnn_npeers)) {
173                 CERROR("Can't create peer_ni: network shutdown\n");
174                 return ERR_PTR(-ESHUTDOWN);
175         }
176
177         LIBCFS_CPT_ALLOC(peer_ni, lnet_cpt_table(), cpt, sizeof(*peer_ni));
178         if (!peer_ni) {
179                 atomic_dec(&net->ksnn_npeers);
180                 return ERR_PTR(-ENOMEM);
181         }
182
183         peer_ni->ksnp_ni = ni;
184         peer_ni->ksnp_id = *id;
185         refcount_set(&peer_ni->ksnp_refcount, 1); /* 1 ref for caller */
186         peer_ni->ksnp_closing = 0;
187         peer_ni->ksnp_accepting = 0;
188         peer_ni->ksnp_proto = NULL;
189         peer_ni->ksnp_last_alive = 0;
190         peer_ni->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;
191         peer_ni->ksnp_conn_cb = NULL;
192
193         INIT_LIST_HEAD(&peer_ni->ksnp_conns);
194         INIT_LIST_HEAD(&peer_ni->ksnp_tx_queue);
195         INIT_LIST_HEAD(&peer_ni->ksnp_zc_req_list);
196         spin_lock_init(&peer_ni->ksnp_lock);
197
198         return peer_ni;
199 }
200
201 void
202 ksocknal_destroy_peer(struct ksock_peer_ni *peer_ni)
203 {
204         struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
205
206         CDEBUG (D_NET, "peer_ni %s %p deleted\n",
207                 libcfs_idstr(&peer_ni->ksnp_id), peer_ni);
208
209         LASSERT(refcount_read(&peer_ni->ksnp_refcount) == 0);
210         LASSERT(peer_ni->ksnp_accepting == 0);
211         LASSERT(list_empty(&peer_ni->ksnp_conns));
212         LASSERT(peer_ni->ksnp_conn_cb == NULL);
213         LASSERT(list_empty(&peer_ni->ksnp_tx_queue));
214         LASSERT(list_empty(&peer_ni->ksnp_zc_req_list));
215
216         LIBCFS_FREE(peer_ni, sizeof(*peer_ni));
217
218         /* NB a peer_ni's connections and conn_cb keep a reference on their
219          * peer_ni until they are destroyed, so we can be assured that _all_
220          * state to do with this peer_ni has been cleaned up when its refcount
221          * drops to zero.
222          */
223         if (atomic_dec_and_test(&net->ksnn_npeers))
224                 wake_up_var(&net->ksnn_npeers);
225 }
226
227 struct ksock_peer_ni *
228 ksocknal_find_peer_locked(struct lnet_ni *ni, struct lnet_processid *id)
229 {
230         struct ksock_peer_ni *peer_ni;
231         unsigned long hash = nidhash(&id->nid);
232
233         hash_for_each_possible(ksocknal_data.ksnd_peers, peer_ni,
234                                ksnp_list, hash) {
235                 LASSERT(!peer_ni->ksnp_closing);
236
237                 if (peer_ni->ksnp_ni != ni)
238                         continue;
239
240                 if (!nid_same(&peer_ni->ksnp_id.nid, &id->nid) ||
241                     peer_ni->ksnp_id.pid != id->pid)
242                         continue;
243
244                 CDEBUG(D_NET, "got peer_ni [%p] -> %s (%d)\n",
245                        peer_ni, libcfs_idstr(id),
246                        refcount_read(&peer_ni->ksnp_refcount));
247                 return peer_ni;
248         }
249         return NULL;
250 }
251
252 struct ksock_peer_ni *
253 ksocknal_find_peer(struct lnet_ni *ni, struct lnet_processid *id)
254 {
255         struct ksock_peer_ni *peer_ni;
256
257         read_lock(&ksocknal_data.ksnd_global_lock);
258         peer_ni = ksocknal_find_peer_locked(ni, id);
259         if (peer_ni != NULL)                    /* +1 ref for caller? */
260                 ksocknal_peer_addref(peer_ni);
261         read_unlock(&ksocknal_data.ksnd_global_lock);
262
263         return peer_ni;
264 }
265
266 static void
267 ksocknal_unlink_peer_locked(struct ksock_peer_ni *peer_ni)
268 {
269         LASSERT(list_empty(&peer_ni->ksnp_conns));
270         LASSERT(peer_ni->ksnp_conn_cb == NULL);
271         LASSERT(!peer_ni->ksnp_closing);
272         peer_ni->ksnp_closing = 1;
273         hlist_del(&peer_ni->ksnp_list);
274         /* lose peerlist's ref */
275         ksocknal_peer_decref(peer_ni);
276 }
277
278
279 static void
280 ksocknal_dump_peer_debug_info(struct ksock_peer_ni *peer_ni)
281 {
282         struct ksock_conn *conn;
283         struct list_head *ctmp;
284         struct list_head *txtmp;
285         int ccount = 0;
286         int txcount = 0;
287
288         list_for_each(ctmp, &peer_ni->ksnp_conns) {
289                 conn = list_entry(ctmp, struct ksock_conn, ksnc_list);
290
291                 if (!list_empty(&conn->ksnc_tx_queue))
292                         list_for_each(txtmp, &conn->ksnc_tx_queue) txcount++;
293
294                 CDEBUG(D_CONSOLE, "Conn %d [type, closing, crefcnt, srefcnt]: %d, %d, %d, %d\n",
295                        ccount,
296                        conn->ksnc_type,
297                        conn->ksnc_closing,
298                        refcount_read(&conn->ksnc_conn_refcount),
299                        refcount_read(&conn->ksnc_sock_refcount));
300                 CDEBUG(D_CONSOLE, "Conn %d rx [scheduled, ready, state]: %d, %d, %d\n",
301                        ccount,
302                        conn->ksnc_rx_scheduled,
303                        conn->ksnc_rx_ready,
304                        conn->ksnc_rx_state);
305                 CDEBUG(D_CONSOLE, "Conn %d tx [txqcnt, scheduled, last_post, ready, deadline]: %d, %d, %lld, %d, %lld\n",
306                        ccount,
307                        txcount,
308                        conn->ksnc_tx_scheduled,
309                        conn->ksnc_tx_last_post,
310                        conn->ksnc_rx_ready,
311                        conn->ksnc_rx_deadline);
312
313                 if (conn->ksnc_scheduler)
314                         CDEBUG(D_CONSOLE, "Conn %d sched [nconns, cpt]: %d, %d\n",
315                                ccount,
316                                conn->ksnc_scheduler->kss_nconns,
317                                conn->ksnc_scheduler->kss_cpt);
318
319                 txcount = 0;
320                 ccount++;
321         }
322 }
323
324 static int
325 ksocknal_get_peer_info(struct lnet_ni *ni, int index,
326                        struct lnet_processid *id, __u32 *myip, __u32 *peer_ip,
327                        int *port, int *conn_count, int *share_count)
328 {
329         struct ksock_peer_ni *peer_ni;
330         struct ksock_conn_cb *conn_cb;
331         int i;
332         int rc = -ENOENT;
333         struct ksock_net *net;
334
335         read_lock(&ksocknal_data.ksnd_global_lock);
336
337         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
338
339                 if (peer_ni->ksnp_ni != ni)
340                         continue;
341                 if (index-- > 0)
342                         continue;
343
344                 *id = peer_ni->ksnp_id;
345                 conn_cb = peer_ni->ksnp_conn_cb;
346                 if (conn_cb == NULL) {
347                         *myip = 0;
348                         *peer_ip = 0;
349                         *port = 0;
350                         *conn_count = 0;
351                         *share_count = 0;
352                         rc = 0;
353                 } else {
354                         ksocknal_dump_peer_debug_info(peer_ni);
355
356                         if (conn_cb->ksnr_addr.ss_family == AF_INET) {
357                                 struct sockaddr_in *sa =
358                                         (void *)&conn_cb->ksnr_addr;
359                                 net = ni->ni_data;
360                                 rc = choose_ipv4_src(myip,
361                                                      net->ksnn_interface.ksni_index,
362                                                      ntohl(sa->sin_addr.s_addr),
363                                                      ni->ni_net_ns);
364                                 *peer_ip = ntohl(sa->sin_addr.s_addr);
365                                 *port = ntohs(sa->sin_port);
366
367                         } else {
368                                 *myip = 0xFFFFFFFF;
369                                 *peer_ip = 0xFFFFFFFF;
370                                 *port = 0;
371                                 rc = -ENOTSUPP;
372                         }
373                         *conn_count = conn_cb->ksnr_conn_count;
374                         *share_count = 1;
375                 }
376                 break;
377         }
378         read_unlock(&ksocknal_data.ksnd_global_lock);
379         return rc;
380 }
381
382 static unsigned int
383 ksocknal_get_conns_per_peer(struct ksock_peer_ni *peer_ni)
384 {
385         struct lnet_ni *ni = peer_ni->ksnp_ni;
386         struct lnet_ioctl_config_socklnd_tunables *tunables;
387
388         LASSERT(ni);
389
390         tunables = &ni->ni_lnd_tunables.lnd_tun_u.lnd_sock;
391
392         return tunables->lnd_conns_per_peer;
393 }
394
395 static void
396 ksocknal_incr_conn_count(struct ksock_conn_cb *conn_cb,
397                          int type)
398 {
399         conn_cb->ksnr_conn_count++;
400
401         /* check if all connections of the given type got created */
402         switch (type) {
403         case SOCKLND_CONN_CONTROL:
404                 conn_cb->ksnr_ctrl_conn_count++;
405                 /* there's a single control connection per peer,
406                  * two in case of loopback
407                  */
408                 conn_cb->ksnr_connected |= BIT(type);
409                 break;
410         case SOCKLND_CONN_BULK_IN:
411                 conn_cb->ksnr_blki_conn_count++;
412                 if (conn_cb->ksnr_blki_conn_count >= conn_cb->ksnr_max_conns)
413                         conn_cb->ksnr_connected |= BIT(type);
414                 break;
415         case SOCKLND_CONN_BULK_OUT:
416                 conn_cb->ksnr_blko_conn_count++;
417                 if (conn_cb->ksnr_blko_conn_count >= conn_cb->ksnr_max_conns)
418                         conn_cb->ksnr_connected |= BIT(type);
419                 break;
420         case SOCKLND_CONN_ANY:
421                 if (conn_cb->ksnr_conn_count >= conn_cb->ksnr_max_conns)
422                         conn_cb->ksnr_connected |= BIT(type);
423                 break;
424         default:
425                 LBUG();
426                 break;
427         }
428
429         CDEBUG(D_NET, "Add conn type %d, ksnr_connected %x ksnr_max_conns %d\n",
430                type, conn_cb->ksnr_connected, conn_cb->ksnr_max_conns);
431 }
432
433
434 static void
435 ksocknal_decr_conn_count(struct ksock_conn_cb *conn_cb,
436                          int type)
437 {
438         conn_cb->ksnr_conn_count--;
439
440         /* check if all connections of the given type got created */
441         switch (type) {
442         case SOCKLND_CONN_CONTROL:
443                 conn_cb->ksnr_ctrl_conn_count--;
444                 /* there's a single control connection per peer,
445                  * two in case of loopback
446                  */
447                 if (conn_cb->ksnr_ctrl_conn_count == 0)
448                         conn_cb->ksnr_connected &= ~BIT(type);
449                 break;
450         case SOCKLND_CONN_BULK_IN:
451                 conn_cb->ksnr_blki_conn_count--;
452                 if (conn_cb->ksnr_blki_conn_count < conn_cb->ksnr_max_conns)
453                         conn_cb->ksnr_connected &= ~BIT(type);
454                 break;
455         case SOCKLND_CONN_BULK_OUT:
456                 conn_cb->ksnr_blko_conn_count--;
457                 if (conn_cb->ksnr_blko_conn_count < conn_cb->ksnr_max_conns)
458                         conn_cb->ksnr_connected &= ~BIT(type);
459                 break;
460         case SOCKLND_CONN_ANY:
461                 if (conn_cb->ksnr_conn_count < conn_cb->ksnr_max_conns)
462                         conn_cb->ksnr_connected &= ~BIT(type);
463                 break;
464         default:
465                 LBUG();
466                 break;
467         }
468
469         CDEBUG(D_NET, "Del conn type %d, ksnr_connected %x ksnr_max_conns %d\n",
470                type, conn_cb->ksnr_connected, conn_cb->ksnr_max_conns);
471 }
472
473 static void
474 ksocknal_associate_cb_conn_locked(struct ksock_conn_cb *conn_cb,
475                                   struct ksock_conn *conn)
476 {
477         int type = conn->ksnc_type;
478
479         conn->ksnc_conn_cb = conn_cb;
480         ksocknal_conn_cb_addref(conn_cb);
481         ksocknal_incr_conn_count(conn_cb, type);
482
483         /* Successful connection => further attempts can
484          * proceed immediately
485          */
486         conn_cb->ksnr_retry_interval = 0;
487 }
488
489 static void
490 ksocknal_add_conn_cb_locked(struct ksock_peer_ni *peer_ni,
491                             struct ksock_conn_cb *conn_cb)
492 {
493         struct ksock_conn *conn;
494         struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
495
496         LASSERT(!peer_ni->ksnp_closing);
497         LASSERT(!conn_cb->ksnr_peer);
498         LASSERT(!conn_cb->ksnr_scheduled);
499         LASSERT(!conn_cb->ksnr_connecting);
500         LASSERT(conn_cb->ksnr_connected == 0);
501
502         conn_cb->ksnr_peer = peer_ni;
503         ksocknal_peer_addref(peer_ni);
504
505         /* peer_ni's route list takes over my ref on 'route' */
506         peer_ni->ksnp_conn_cb = conn_cb;
507         net->ksnn_interface.ksni_nroutes++;
508
509         list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
510                 if (!rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
511                                   (struct sockaddr *)&conn_cb->ksnr_addr))
512                         continue;
513                 CDEBUG(D_NET, "call ksocknal_associate_cb_conn_locked\n");
514                 ksocknal_associate_cb_conn_locked(conn_cb, conn);
515                 /* keep going (typed conns) */
516         }
517 }
518
519 static void
520 ksocknal_del_conn_cb_locked(struct ksock_conn_cb *conn_cb)
521 {
522         struct ksock_peer_ni *peer_ni = conn_cb->ksnr_peer;
523         struct ksock_conn *conn;
524         struct ksock_conn *cnxt;
525         struct ksock_net *net;
526
527         LASSERT(!conn_cb->ksnr_deleted);
528
529         /* Close associated conns */
530         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns, ksnc_list) {
531                 if (conn->ksnc_conn_cb != conn_cb)
532                         continue;
533
534                 ksocknal_close_conn_locked(conn, 0);
535         }
536
537         net = (struct ksock_net *)(peer_ni->ksnp_ni->ni_data);
538         net->ksnn_interface.ksni_nroutes--;
539         LASSERT(net->ksnn_interface.ksni_nroutes >= 0);
540
541         conn_cb->ksnr_deleted = 1;
542         ksocknal_conn_cb_decref(conn_cb);               /* drop peer_ni's ref */
543         peer_ni->ksnp_conn_cb = NULL;
544
545         if (list_empty(&peer_ni->ksnp_conns)) {
546                 /* I've just removed the last route to a peer_ni with no active
547                  * connections
548                  */
549                 ksocknal_unlink_peer_locked(peer_ni);
550         }
551 }
552
553 unsigned int
554 ksocknal_get_conn_count_by_type(struct ksock_conn_cb *conn_cb,
555                                 int type)
556 {
557         unsigned int count = 0;
558
559         switch (type) {
560         case SOCKLND_CONN_CONTROL:
561                 count = conn_cb->ksnr_ctrl_conn_count;
562                 break;
563         case SOCKLND_CONN_BULK_IN:
564                 count = conn_cb->ksnr_blki_conn_count;
565                 break;
566         case SOCKLND_CONN_BULK_OUT:
567                 count = conn_cb->ksnr_blko_conn_count;
568                 break;
569         case SOCKLND_CONN_ANY:
570                 count = conn_cb->ksnr_conn_count;
571                 break;
572         default:
573                 LBUG();
574                 break;
575         }
576
577         return count;
578 }
579
580 int
581 ksocknal_add_peer(struct lnet_ni *ni, struct lnet_processid *id,
582                   struct sockaddr *addr)
583 {
584         struct ksock_peer_ni *peer_ni;
585         struct ksock_peer_ni *peer2;
586         struct ksock_conn_cb *conn_cb;
587
588         if (LNET_NID_IS_ANY(&id->nid) ||
589             id->pid == LNET_PID_ANY)
590                 return (-EINVAL);
591
592         /* Have a brand new peer_ni ready... */
593         peer_ni = ksocknal_create_peer(ni, id);
594         if (IS_ERR(peer_ni))
595                 return PTR_ERR(peer_ni);
596
597         conn_cb = ksocknal_create_conn_cb(addr);
598         if (!conn_cb) {
599                 ksocknal_peer_decref(peer_ni);
600                 return -ENOMEM;
601         }
602
603         write_lock_bh(&ksocknal_data.ksnd_global_lock);
604
605         /* always called with a ref on ni, so shutdown can't have started */
606         LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers)
607                 >= 0);
608
609         peer2 = ksocknal_find_peer_locked(ni, id);
610         if (peer2 != NULL) {
611                 ksocknal_peer_decref(peer_ni);
612                 peer_ni = peer2;
613         } else {
614                 /* peer_ni table takes my ref on peer_ni */
615                 hash_add(ksocknal_data.ksnd_peers, &peer_ni->ksnp_list,
616                          nidhash(&id->nid));
617         }
618
619         if (peer_ni->ksnp_conn_cb) {
620                 ksocknal_conn_cb_decref(conn_cb);
621         } else {
622                 /* Remember conns_per_peer setting at the time
623                  * of connection initiation. It will define the
624                  * max number of conns per type for this conn_cb
625                  * while it's in use.
626                  */
627                 conn_cb->ksnr_max_conns = ksocknal_get_conns_per_peer(peer_ni);
628                 ksocknal_add_conn_cb_locked(peer_ni, conn_cb);
629         }
630
631         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
632
633         return 0;
634 }
635
636 static void
637 ksocknal_del_peer_locked(struct ksock_peer_ni *peer_ni)
638 {
639         struct ksock_conn *conn;
640         struct ksock_conn *cnxt;
641         struct ksock_conn_cb *conn_cb;
642
643         LASSERT(!peer_ni->ksnp_closing);
644
645         /* Extra ref prevents peer_ni disappearing until I'm done with it */
646         ksocknal_peer_addref(peer_ni);
647         conn_cb = peer_ni->ksnp_conn_cb;
648         if (conn_cb)
649                 ksocknal_del_conn_cb_locked(conn_cb);
650
651         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns,
652                                  ksnc_list)
653                 ksocknal_close_conn_locked(conn, 0);
654
655         ksocknal_peer_decref(peer_ni);
656         /* NB peer_ni unlinks itself when last conn/conn_cb is removed */
657 }
658
659 static int
660 ksocknal_del_peer(struct lnet_ni *ni, struct lnet_processid *id)
661 {
662         LIST_HEAD(zombies);
663         struct hlist_node *pnxt;
664         struct ksock_peer_ni *peer_ni;
665         int lo;
666         int hi;
667         int i;
668         int rc = -ENOENT;
669
670         write_lock_bh(&ksocknal_data.ksnd_global_lock);
671
672         if (id && !LNET_NID_IS_ANY(&id->nid)) {
673                 lo = hash_min(nidhash(&id->nid),
674                               HASH_BITS(ksocknal_data.ksnd_peers));
675                 hi = lo;
676         } else {
677                 lo = 0;
678                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
679         }
680
681         for (i = lo; i <= hi; i++) {
682                 hlist_for_each_entry_safe(peer_ni, pnxt,
683                                           &ksocknal_data.ksnd_peers[i],
684                                           ksnp_list) {
685                         if (peer_ni->ksnp_ni != ni)
686                                 continue;
687
688                         if (!((!id || LNET_NID_IS_ANY(&id->nid) ||
689                                nid_same(&peer_ni->ksnp_id.nid, &id->nid)) &&
690                               (!id || id->pid == LNET_PID_ANY ||
691                                peer_ni->ksnp_id.pid == id->pid)))
692                                 continue;
693
694                         ksocknal_peer_addref(peer_ni);  /* a ref for me... */
695
696                         ksocknal_del_peer_locked(peer_ni);
697
698                         if (peer_ni->ksnp_closing &&
699                             !list_empty(&peer_ni->ksnp_tx_queue)) {
700                                 LASSERT(list_empty(&peer_ni->ksnp_conns));
701                                 LASSERT(peer_ni->ksnp_conn_cb == NULL);
702
703                                 list_splice_init(&peer_ni->ksnp_tx_queue,
704                                                  &zombies);
705                         }
706
707                         ksocknal_peer_decref(peer_ni);  /* ...till here */
708
709                         rc = 0;                         /* matched! */
710                 }
711         }
712
713         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
714
715         ksocknal_txlist_done(ni, &zombies, -ENETDOWN);
716
717         return rc;
718 }
719
720 static struct ksock_conn *
721 ksocknal_get_conn_by_idx(struct lnet_ni *ni, int index)
722 {
723         struct ksock_peer_ni *peer_ni;
724         struct ksock_conn *conn;
725         int i;
726
727         read_lock(&ksocknal_data.ksnd_global_lock);
728
729         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
730                 LASSERT(!peer_ni->ksnp_closing);
731
732                 if (peer_ni->ksnp_ni != ni)
733                         continue;
734
735                 list_for_each_entry(conn, &peer_ni->ksnp_conns,
736                                     ksnc_list) {
737                         if (index-- > 0)
738                                 continue;
739
740                         ksocknal_conn_addref(conn);
741                         read_unlock(&ksocknal_data.ksnd_global_lock);
742                         return conn;
743                 }
744         }
745
746         read_unlock(&ksocknal_data.ksnd_global_lock);
747         return NULL;
748 }
749
750 static struct ksock_sched *
751 ksocknal_choose_scheduler_locked(unsigned int cpt)
752 {
753         struct ksock_sched *sched = ksocknal_data.ksnd_schedulers[cpt];
754         int i;
755
756         if (sched->kss_nthreads == 0) {
757                 cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
758                         if (sched->kss_nthreads > 0) {
759                                 CDEBUG(D_NET, "scheduler[%d] has no threads. selected scheduler[%d]\n",
760                                        cpt, sched->kss_cpt);
761                                 return sched;
762                         }
763                 }
764                 return NULL;
765         }
766
767         return sched;
768 }
769
770 int
771 ksocknal_accept(struct lnet_ni *ni, struct socket *sock)
772 {
773         struct ksock_connreq *cr;
774         int rc;
775         struct sockaddr_storage peer;
776
777         rc = lnet_sock_getaddr(sock, true, &peer);
778         if (rc != 0) {
779                 CERROR("Can't determine new connection's address\n");
780                 return rc;
781         }
782
783         LIBCFS_ALLOC(cr, sizeof(*cr));
784         if (cr == NULL) {
785                 LCONSOLE_ERROR_MSG(0x12f,
786                                    "Dropping connection request from %pISc: memory exhausted\n",
787                                    &peer);
788                 return -ENOMEM;
789         }
790
791         lnet_ni_addref(ni);
792         cr->ksncr_ni   = ni;
793         cr->ksncr_sock = sock;
794
795         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
796
797         list_add_tail(&cr->ksncr_list, &ksocknal_data.ksnd_connd_connreqs);
798         wake_up(&ksocknal_data.ksnd_connd_waitq);
799
800         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
801         return 0;
802 }
803
804 static const struct ln_key_list ksocknal_tunables_keys = {
805         .lkl_maxattr                    = LNET_NET_SOCKLND_TUNABLES_ATTR_MAX,
806         .lkl_list                       = {
807                 [LNET_NET_SOCKLND_TUNABLES_ATTR_CONNS_PER_PEER]  = {
808                         .lkp_value      = "conns_per_peer",
809                         .lkp_data_type  = NLA_U16
810                 },
811                 [LNET_NET_SOCKLND_TUNABLES_ATTR_LND_TIMEOUT]    = {
812                         .lkp_value      = "timeout",
813                         .lkp_data_type  = NLA_U32
814                 },
815         },
816 };
817
818 static int
819 ksocknal_nl_get(int cmd, struct sk_buff *msg, int type, void *data)
820 {
821         struct lnet_lnd_tunables *tun;
822         struct lnet_ni *ni = data;
823
824         if (!ni || !msg)
825                 return -EINVAL;
826
827          if (cmd != LNET_CMD_NETS || type != LNET_NET_LOCAL_NI_ATTR_LND_TUNABLES)
828                 return -EOPNOTSUPP;
829
830         tun = &ni->ni_lnd_tunables;
831         nla_put_u16(msg, LNET_NET_SOCKLND_TUNABLES_ATTR_CONNS_PER_PEER,
832                     tun->lnd_tun_u.lnd_sock.lnd_conns_per_peer);
833         nla_put_u32(msg, LNET_NET_SOCKLND_TUNABLES_ATTR_LND_TIMEOUT,
834                     ksocknal_timeout());
835
836         return 0;
837 }
838
839 static int
840 ksocknal_nl_set(int cmd, struct nlattr *attr, int type, void *data)
841 {
842         struct lnet_lnd_tunables *tunables = data;
843         s64 num;
844
845         if (cmd != LNET_CMD_NETS)
846                 return -EOPNOTSUPP;
847
848         if (nla_type(attr) != LN_SCALAR_ATTR_INT_VALUE)
849                 return -EINVAL;
850
851         switch (type) {
852         case LNET_NET_SOCKLND_TUNABLES_ATTR_CONNS_PER_PEER:
853                 /* value values are 1 to 127. Zero mean calculate the value */
854                 num = nla_get_s64(attr);
855                 clamp_t(s64, num, 0, 127);
856                 tunables->lnd_tun_u.lnd_sock.lnd_conns_per_peer = num;
857                 break;
858         case LNET_NET_SOCKLND_TUNABLES_ATTR_LND_TIMEOUT:
859                 num = nla_get_s64(attr);
860                 tunables->lnd_tun_u.lnd_sock.lnd_timeout = num;
861                 fallthrough;
862         default:
863                 break;
864         }
865
866         return 0;
867 }
868
869 static int
870 ksocknal_connecting(struct ksock_conn_cb *conn_cb, struct sockaddr *sa)
871 {
872         if (conn_cb &&
873             rpc_cmp_addr((struct sockaddr *)&conn_cb->ksnr_addr, sa))
874                 return conn_cb->ksnr_connecting;
875         return 0;
876 }
877
878 int
879 ksocknal_create_conn(struct lnet_ni *ni, struct ksock_conn_cb *conn_cb,
880                      struct socket *sock, int type)
881 {
882         rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
883         LIST_HEAD(zombies);
884         struct lnet_processid peerid;
885         u64 incarnation;
886         struct ksock_conn *conn;
887         struct ksock_conn *conn2;
888         struct ksock_peer_ni *peer_ni = NULL;
889         struct ksock_peer_ni *peer2;
890         struct ksock_sched *sched;
891         struct ksock_hello_msg *hello;
892         int cpt;
893         struct ksock_tx *tx;
894         struct ksock_tx *txtmp;
895         int rc;
896         int rc2;
897         int active;
898         int num_dup = 0;
899         char *warn = NULL;
900
901         active = (conn_cb != NULL);
902
903         LASSERT(active == (type != SOCKLND_CONN_NONE));
904
905         LIBCFS_ALLOC(conn, sizeof(*conn));
906         if (conn == NULL) {
907                 rc = -ENOMEM;
908                 goto failed_0;
909         }
910
911         conn->ksnc_peer = NULL;
912         conn->ksnc_conn_cb = NULL;
913         conn->ksnc_sock = sock;
914         /* 2 ref, 1 for conn, another extra ref prevents socket
915          * being closed before establishment of connection */
916         refcount_set(&conn->ksnc_sock_refcount, 2);
917         conn->ksnc_type = type;
918         ksocknal_lib_save_callback(sock, conn);
919         refcount_set(&conn->ksnc_conn_refcount, 1); /* 1 ref for me */
920
921         conn->ksnc_rx_ready = 0;
922         conn->ksnc_rx_scheduled = 0;
923
924         INIT_LIST_HEAD(&conn->ksnc_tx_queue);
925         conn->ksnc_tx_ready = 0;
926         conn->ksnc_tx_scheduled = 0;
927         conn->ksnc_tx_carrier = NULL;
928         atomic_set (&conn->ksnc_tx_nob, 0);
929
930         LIBCFS_ALLOC(hello, offsetof(struct ksock_hello_msg,
931                                      kshm_ips[LNET_INTERFACES_NUM]));
932         if (hello == NULL) {
933                 rc = -ENOMEM;
934                 goto failed_1;
935         }
936
937         /* stash conn's local and remote addrs */
938         rc = ksocknal_lib_get_conn_addrs(conn);
939         if (rc != 0)
940                 goto failed_1;
941
942         /* Find out/confirm peer_ni's NID and connection type and get the
943          * vector of interfaces she's willing to let me connect to.
944          * Passive connections use the listener timeout since the peer_ni sends
945          * eagerly
946          */
947         if (active) {
948                 struct sockaddr_in *psa = (void *)&conn->ksnc_peeraddr;
949
950                 peer_ni = conn_cb->ksnr_peer;
951                 LASSERT(ni == peer_ni->ksnp_ni);
952
953                 /* Active connection sends HELLO eagerly */
954                 hello->kshm_nips = 0;
955                 peerid = peer_ni->ksnp_id;
956
957                 write_lock_bh(global_lock);
958                 conn->ksnc_proto = peer_ni->ksnp_proto;
959                 write_unlock_bh(global_lock);
960
961                 if (conn->ksnc_proto == NULL) {
962                         if (psa->sin_family == AF_INET6)
963                                 conn->ksnc_proto = &ksocknal_protocol_v4x;
964                         else if (psa->sin_family == AF_INET)
965                                 conn->ksnc_proto = &ksocknal_protocol_v3x;
966 #if SOCKNAL_VERSION_DEBUG
967                         if (*ksocknal_tunables.ksnd_protocol == 2)
968                                 conn->ksnc_proto = &ksocknal_protocol_v2x;
969                         else if (*ksocknal_tunables.ksnd_protocol == 1)
970                                 conn->ksnc_proto = &ksocknal_protocol_v1x;
971 #endif
972                 }
973                 if (!conn->ksnc_proto) {
974                         rc = -EPROTO;
975                         goto failed_1;
976                 }
977
978                 rc = ksocknal_send_hello(ni, conn, &peerid.nid, hello);
979                 if (rc != 0)
980                         goto failed_1;
981         } else {
982                 peerid.nid = LNET_ANY_NID;
983                 peerid.pid = LNET_PID_ANY;
984
985                 /* Passive, get protocol from peer_ni */
986                 conn->ksnc_proto = NULL;
987         }
988
989         rc = ksocknal_recv_hello(ni, conn, hello, &peerid, &incarnation);
990         if (rc < 0)
991                 goto failed_1;
992
993         LASSERT(rc == 0 || active);
994         LASSERT(conn->ksnc_proto != NULL);
995         LASSERT(!LNET_NID_IS_ANY(&peerid.nid));
996
997         cpt = lnet_nid2cpt(&peerid.nid, ni);
998
999         if (active) {
1000                 ksocknal_peer_addref(peer_ni);
1001                 write_lock_bh(global_lock);
1002         } else {
1003                 peer_ni = ksocknal_create_peer(ni, &peerid);
1004                 if (IS_ERR(peer_ni)) {
1005                         rc = PTR_ERR(peer_ni);
1006                         goto failed_1;
1007                 }
1008
1009                 write_lock_bh(global_lock);
1010
1011                 /* called with a ref on ni, so shutdown can't have started */
1012                 LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers) >= 0);
1013
1014                 peer2 = ksocknal_find_peer_locked(ni, &peerid);
1015                 if (peer2 == NULL) {
1016                         /* NB this puts an "empty" peer_ni in the peer_ni
1017                          * table (which takes my ref) */
1018                         hash_add(ksocknal_data.ksnd_peers,
1019                                  &peer_ni->ksnp_list, nidhash(&peerid.nid));
1020                 } else {
1021                         ksocknal_peer_decref(peer_ni);
1022                         peer_ni = peer2;
1023                 }
1024
1025                 /* +1 ref for me */
1026                 ksocknal_peer_addref(peer_ni);
1027                 peer_ni->ksnp_accepting++;
1028
1029                 /* Am I already connecting to this guy?  Resolve in
1030                  * favour of higher NID...
1031                  */
1032                 if (memcmp(&peerid.nid, &ni->ni_nid, sizeof(peerid.nid)) < 0 &&
1033                     ksocknal_connecting(peer_ni->ksnp_conn_cb,
1034                                         ((struct sockaddr *) &conn->ksnc_peeraddr))) {
1035                         rc = EALREADY;
1036                         warn = "connection race resolution";
1037                         goto failed_2;
1038                 }
1039         }
1040
1041         if (peer_ni->ksnp_closing ||
1042             (active && conn_cb->ksnr_deleted)) {
1043                 /* peer_ni/conn_cb got closed under me */
1044                 rc = -ESTALE;
1045                 warn = "peer_ni/conn_cb removed";
1046                 goto failed_2;
1047         }
1048
1049         if (peer_ni->ksnp_proto == NULL) {
1050                 /* Never connected before.
1051                  * NB recv_hello may have returned EPROTO to signal my peer_ni
1052                  * wants a different protocol than the one I asked for.
1053                  */
1054                 LASSERT(list_empty(&peer_ni->ksnp_conns));
1055
1056                 peer_ni->ksnp_proto = conn->ksnc_proto;
1057                 peer_ni->ksnp_incarnation = incarnation;
1058         }
1059
1060         if (peer_ni->ksnp_proto != conn->ksnc_proto ||
1061             peer_ni->ksnp_incarnation != incarnation) {
1062                 /* peer_ni rebooted or I've got the wrong protocol version */
1063                 ksocknal_close_peer_conns_locked(peer_ni, NULL, 0);
1064
1065                 peer_ni->ksnp_proto = NULL;
1066                 rc = ESTALE;
1067                 warn = peer_ni->ksnp_incarnation != incarnation ?
1068                         "peer_ni rebooted" :
1069                         "wrong proto version";
1070                 goto failed_2;
1071         }
1072
1073         switch (rc) {
1074         default:
1075                 LBUG();
1076         case 0:
1077                 break;
1078         case EALREADY:
1079                 warn = "lost conn race";
1080                 goto failed_2;
1081         case EPROTO:
1082                 warn = "retry with different protocol version";
1083                 goto failed_2;
1084         }
1085
1086         /* Refuse to duplicate an existing connection, unless this is a
1087          * loopback connection */
1088         if (!rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
1089                           (struct sockaddr *)&conn->ksnc_myaddr)) {
1090                 list_for_each_entry(conn2, &peer_ni->ksnp_conns, ksnc_list) {
1091                         if (!rpc_cmp_addr(
1092                                     (struct sockaddr *)&conn2->ksnc_peeraddr,
1093                                     (struct sockaddr *)&conn->ksnc_peeraddr) ||
1094                             !rpc_cmp_addr(
1095                                     (struct sockaddr *)&conn2->ksnc_myaddr,
1096                                     (struct sockaddr *)&conn->ksnc_myaddr) ||
1097                             conn2->ksnc_type != conn->ksnc_type)
1098                                 continue;
1099
1100                         num_dup++;
1101                         /* If max conns per type is not registered in conn_cb
1102                          * as ksnr_max_conns, use ni's conns_per_peer
1103                          */
1104                         if ((peer_ni->ksnp_conn_cb &&
1105                             num_dup < peer_ni->ksnp_conn_cb->ksnr_max_conns) ||
1106                             (!peer_ni->ksnp_conn_cb &&
1107                             num_dup < ksocknal_get_conns_per_peer(peer_ni)))
1108                                 continue;
1109
1110                         /* Reply on a passive connection attempt so the peer_ni
1111                          * realises we're connected.
1112                          */
1113                         LASSERT(rc == 0);
1114                         if (!active)
1115                                 rc = EALREADY;
1116
1117                         warn = "duplicate";
1118                         goto failed_2;
1119                 }
1120         }
1121         /* If the connection created by this route didn't bind to the IP
1122          * address the route connected to, the connection/route matching
1123          * code below probably isn't going to work.
1124          */
1125         if (active &&
1126             !rpc_cmp_addr((struct sockaddr *)&conn_cb->ksnr_addr,
1127                           (struct sockaddr *)&conn->ksnc_peeraddr)) {
1128                 CERROR("Route %s %pISc connected to %pISc\n",
1129                        libcfs_idstr(&peer_ni->ksnp_id),
1130                        &conn_cb->ksnr_addr,
1131                        &conn->ksnc_peeraddr);
1132         }
1133
1134         /* Search for a conn_cb corresponding to the new connection and
1135          * create an association.  This allows incoming connections created
1136          * by conn_cbs in my peer_ni to match my own conn_cb entries so I don't
1137          * continually create duplicate conn_cbs.
1138          */
1139         conn_cb = peer_ni->ksnp_conn_cb;
1140
1141         if (conn_cb && rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
1142                                     (struct sockaddr *)&conn_cb->ksnr_addr))
1143                 ksocknal_associate_cb_conn_locked(conn_cb, conn);
1144
1145         conn->ksnc_peer = peer_ni;                 /* conn takes my ref on peer_ni */
1146         peer_ni->ksnp_last_alive = ktime_get_seconds();
1147         peer_ni->ksnp_send_keepalive = 0;
1148         peer_ni->ksnp_error = 0;
1149
1150         sched = ksocknal_choose_scheduler_locked(cpt);
1151         if (!sched) {
1152                 CERROR("no schedulers available. node is unhealthy\n");
1153                 goto failed_2;
1154         }
1155         /*
1156          * The cpt might have changed if we ended up selecting a non cpt
1157          * native scheduler. So use the scheduler's cpt instead.
1158          */
1159         cpt = sched->kss_cpt;
1160         sched->kss_nconns++;
1161         conn->ksnc_scheduler = sched;
1162
1163         conn->ksnc_tx_last_post = ktime_get_seconds();
1164         /* Set the deadline for the outgoing HELLO to drain */
1165         conn->ksnc_tx_bufnob = sock->sk->sk_wmem_queued;
1166         conn->ksnc_tx_deadline = ktime_get_seconds() +
1167                                  ksocknal_timeout();
1168         smp_mb();   /* order with adding to peer_ni's conn list */
1169
1170         list_add(&conn->ksnc_list, &peer_ni->ksnp_conns);
1171         ksocknal_conn_addref(conn);
1172
1173         ksocknal_new_packet(conn, 0);
1174
1175         conn->ksnc_zc_capable = ksocknal_lib_zc_capable(conn);
1176
1177         /* Take packets blocking for this connection. */
1178         list_for_each_entry_safe(tx, txtmp, &peer_ni->ksnp_tx_queue, tx_list) {
1179                 if (conn->ksnc_proto->pro_match_tx(conn, tx, tx->tx_nonblk) ==
1180                     SOCKNAL_MATCH_NO)
1181                         continue;
1182
1183                 list_del(&tx->tx_list);
1184                 ksocknal_queue_tx_locked(tx, conn);
1185         }
1186
1187         write_unlock_bh(global_lock);
1188         /* We've now got a new connection.  Any errors from here on are just
1189          * like "normal" comms errors and we close the connection normally.
1190          * NB (a) we still have to send the reply HELLO for passive
1191          *        connections,
1192          *    (b) normal I/O on the conn is blocked until I setup and call the
1193          *        socket callbacks.
1194          */
1195
1196         CDEBUG(D_NET, "New conn %s p %d.x %pISc -> %pIScp"
1197                " incarnation:%lld sched[%d]\n",
1198                libcfs_idstr(&peerid), conn->ksnc_proto->pro_version,
1199                &conn->ksnc_myaddr, &conn->ksnc_peeraddr,
1200                incarnation, cpt);
1201
1202         if (!active) {
1203                 hello->kshm_nips = 0;
1204                 rc = ksocknal_send_hello(ni, conn, &peerid.nid, hello);
1205         }
1206
1207         LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
1208                                     kshm_ips[LNET_INTERFACES_NUM]));
1209
1210         /* setup the socket AFTER I've received hello (it disables
1211          * SO_LINGER).  I might call back to the acceptor who may want
1212          * to send a protocol version response and then close the
1213          * socket; this ensures the socket only tears down after the
1214          * response has been sent.
1215          */
1216         if (rc == 0)
1217                 rc = ksocknal_lib_setup_sock(sock);
1218
1219         write_lock_bh(global_lock);
1220
1221         /* NB my callbacks block while I hold ksnd_global_lock */
1222         ksocknal_lib_set_callback(sock, conn);
1223
1224         if (!active)
1225                 peer_ni->ksnp_accepting--;
1226
1227         write_unlock_bh(global_lock);
1228
1229         if (rc != 0) {
1230                 write_lock_bh(global_lock);
1231                 if (!conn->ksnc_closing) {
1232                         /* could be closed by another thread */
1233                         ksocknal_close_conn_locked(conn, rc);
1234                 }
1235                 write_unlock_bh(global_lock);
1236         } else if (ksocknal_connsock_addref(conn) == 0) {
1237                 /* Allow I/O to proceed. */
1238                 ksocknal_read_callback(conn);
1239                 ksocknal_write_callback(conn);
1240                 ksocknal_connsock_decref(conn);
1241         }
1242
1243         ksocknal_connsock_decref(conn);
1244         ksocknal_conn_decref(conn);
1245         return rc;
1246
1247 failed_2:
1248
1249         if (!peer_ni->ksnp_closing &&
1250             list_empty(&peer_ni->ksnp_conns) &&
1251             peer_ni->ksnp_conn_cb == NULL) {
1252                 list_splice_init(&peer_ni->ksnp_tx_queue, &zombies);
1253                 ksocknal_unlink_peer_locked(peer_ni);
1254         }
1255
1256         write_unlock_bh(global_lock);
1257
1258         if (warn != NULL) {
1259                 if (rc < 0)
1260                         CERROR("Not creating conn %s type %d: %s\n",
1261                                libcfs_idstr(&peerid), conn->ksnc_type, warn);
1262                 else
1263                         CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",
1264                                libcfs_idstr(&peerid), conn->ksnc_type, warn);
1265         }
1266
1267         if (!active) {
1268                 if (rc > 0) {
1269                         /* Request retry by replying with CONN_NONE
1270                          * ksnc_proto has been set already
1271                          */
1272                         conn->ksnc_type = SOCKLND_CONN_NONE;
1273                         hello->kshm_nips = 0;
1274                         ksocknal_send_hello(ni, conn, &peerid.nid, hello);
1275                 }
1276
1277                 write_lock_bh(global_lock);
1278                 peer_ni->ksnp_accepting--;
1279                 write_unlock_bh(global_lock);
1280         }
1281
1282         /*
1283          * If we get here without an error code, just use -EALREADY.
1284          * Depending on how we got here, the error may be positive
1285          * or negative. Normalize the value for ksocknal_txlist_done().
1286          */
1287         rc2 = (rc == 0 ? -EALREADY : (rc > 0 ? -rc : rc));
1288         ksocknal_txlist_done(ni, &zombies, rc2);
1289         ksocknal_peer_decref(peer_ni);
1290
1291 failed_1:
1292         if (hello != NULL)
1293                 LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
1294                                             kshm_ips[LNET_INTERFACES_NUM]));
1295
1296         LIBCFS_FREE(conn, sizeof(*conn));
1297
1298 failed_0:
1299         sock_release(sock);
1300
1301         return rc;
1302 }
1303
1304 void
1305 ksocknal_close_conn_locked(struct ksock_conn *conn, int error)
1306 {
1307         /* This just does the immmediate housekeeping, and queues the
1308          * connection for the reaper to terminate.
1309          * Caller holds ksnd_global_lock exclusively in irq context */
1310         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1311         struct ksock_conn_cb *conn_cb;
1312         struct ksock_conn *conn2;
1313         int conn_count;
1314         int duplicate_count = 0;
1315
1316         LASSERT(peer_ni->ksnp_error == 0);
1317         LASSERT(!conn->ksnc_closing);
1318         conn->ksnc_closing = 1;
1319
1320         /* ksnd_deathrow_conns takes over peer_ni's ref */
1321         list_del(&conn->ksnc_list);
1322
1323         conn_cb = conn->ksnc_conn_cb;
1324         if (conn_cb != NULL) {
1325                 /* dissociate conn from cb... */
1326                 LASSERT(!conn_cb->ksnr_deleted);
1327
1328                 conn_count = ksocknal_get_conn_count_by_type(conn_cb,
1329                                                              conn->ksnc_type);
1330                 /* connected bit is set only if all connections
1331                  * of the given type got created
1332                  */
1333                 if (conn_count == conn_cb->ksnr_max_conns)
1334                         LASSERT((conn_cb->ksnr_connected &
1335                                 BIT(conn->ksnc_type)) != 0);
1336
1337                 if (conn_count == 1) {
1338                         list_for_each_entry(conn2, &peer_ni->ksnp_conns,
1339                                             ksnc_list) {
1340                                 if (conn2->ksnc_conn_cb == conn_cb &&
1341                                     conn2->ksnc_type == conn->ksnc_type)
1342                                         duplicate_count += 1;
1343                         }
1344                         if (duplicate_count > 0)
1345                                 CERROR("Found %d duplicate conns type %d\n",
1346                                        duplicate_count,
1347                                        conn->ksnc_type);
1348                 }
1349                 ksocknal_decr_conn_count(conn_cb, conn->ksnc_type);
1350
1351                 conn->ksnc_conn_cb = NULL;
1352
1353                 /* drop conn's ref on conn_cb */
1354                 ksocknal_conn_cb_decref(conn_cb);
1355         }
1356
1357         if (list_empty(&peer_ni->ksnp_conns)) {
1358                 /* No more connections to this peer_ni */
1359
1360                 if (!list_empty(&peer_ni->ksnp_tx_queue)) {
1361                         struct ksock_tx *tx;
1362
1363                         LASSERT(conn->ksnc_proto == &ksocknal_protocol_v3x);
1364
1365                         /* throw them to the last connection...,
1366                          * these TXs will be send to /dev/null by scheduler */
1367                         list_for_each_entry(tx, &peer_ni->ksnp_tx_queue,
1368                                             tx_list)
1369                                 ksocknal_tx_prep(conn, tx);
1370
1371                         spin_lock_bh(&conn->ksnc_scheduler->kss_lock);
1372                         list_splice_init(&peer_ni->ksnp_tx_queue,
1373                                          &conn->ksnc_tx_queue);
1374                         spin_unlock_bh(&conn->ksnc_scheduler->kss_lock);
1375                 }
1376
1377                 /* renegotiate protocol version */
1378                 peer_ni->ksnp_proto = NULL;
1379                 /* stash last conn close reason */
1380                 peer_ni->ksnp_error = error;
1381
1382                 if (peer_ni->ksnp_conn_cb == NULL) {
1383                         /* I've just closed last conn belonging to a
1384                          * peer_ni with no connections to it
1385                          */
1386                         ksocknal_unlink_peer_locked(peer_ni);
1387                 }
1388         }
1389
1390         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1391
1392         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
1393         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1394
1395         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1396 }
1397
1398 void
1399 ksocknal_peer_failed(struct ksock_peer_ni *peer_ni)
1400 {
1401         bool notify = false;
1402         time64_t last_alive = 0;
1403
1404         /* There has been a connection failure or comms error; but I'll only
1405          * tell LNET I think the peer_ni is dead if it's to another kernel and
1406          * there are no connections or connection attempts in existence. */
1407
1408         read_lock(&ksocknal_data.ksnd_global_lock);
1409
1410         if ((peer_ni->ksnp_id.pid & LNET_PID_USERFLAG) == 0 &&
1411              list_empty(&peer_ni->ksnp_conns) &&
1412              peer_ni->ksnp_accepting == 0 &&
1413              !ksocknal_find_connecting_conn_cb_locked(peer_ni)) {
1414                 notify = true;
1415                 last_alive = peer_ni->ksnp_last_alive;
1416         }
1417
1418         read_unlock(&ksocknal_data.ksnd_global_lock);
1419
1420         if (notify)
1421                 lnet_notify(peer_ni->ksnp_ni,
1422                             &peer_ni->ksnp_id.nid,
1423                             false, false, last_alive);
1424 }
1425
1426 void
1427 ksocknal_finalize_zcreq(struct ksock_conn *conn)
1428 {
1429         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1430         struct ksock_tx *tx;
1431         struct ksock_tx *tmp;
1432         LIST_HEAD(zlist);
1433
1434         /* NB safe to finalize TXs because closing of socket will
1435          * abort all buffered data */
1436         LASSERT(conn->ksnc_sock == NULL);
1437
1438         spin_lock(&peer_ni->ksnp_lock);
1439
1440         list_for_each_entry_safe(tx, tmp, &peer_ni->ksnp_zc_req_list,
1441                                  tx_zc_list) {
1442                 if (tx->tx_conn != conn)
1443                         continue;
1444
1445                 LASSERT(tx->tx_msg.ksm_zc_cookies[0] != 0);
1446
1447                 tx->tx_msg.ksm_zc_cookies[0] = 0;
1448                 tx->tx_zc_aborted = 1;  /* mark it as not-acked */
1449                 list_move(&tx->tx_zc_list, &zlist);
1450         }
1451
1452         spin_unlock(&peer_ni->ksnp_lock);
1453
1454         while ((tx = list_first_entry_or_null(&zlist, struct ksock_tx,
1455                                               tx_zc_list)) != NULL) {
1456                 list_del(&tx->tx_zc_list);
1457                 ksocknal_tx_decref(tx);
1458         }
1459 }
1460
1461 void
1462 ksocknal_terminate_conn(struct ksock_conn *conn)
1463 {
1464         /* This gets called by the reaper (guaranteed thread context) to
1465          * disengage the socket from its callbacks and close it.
1466          * ksnc_refcount will eventually hit zero, and then the reaper will
1467          * destroy it.
1468          */
1469         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1470         struct ksock_sched *sched = conn->ksnc_scheduler;
1471         bool failed = false;
1472
1473         LASSERT(conn->ksnc_closing);
1474
1475         /* wake up the scheduler to "send" all remaining packets to /dev/null */
1476         spin_lock_bh(&sched->kss_lock);
1477
1478         /* a closing conn is always ready to tx */
1479         conn->ksnc_tx_ready = 1;
1480
1481         if (!conn->ksnc_tx_scheduled &&
1482             !list_empty(&conn->ksnc_tx_queue)) {
1483                 list_add_tail(&conn->ksnc_tx_list,
1484                               &sched->kss_tx_conns);
1485                 conn->ksnc_tx_scheduled = 1;
1486                 /* extra ref for scheduler */
1487                 ksocknal_conn_addref(conn);
1488
1489                 wake_up(&sched->kss_waitq);
1490         }
1491
1492         spin_unlock_bh(&sched->kss_lock);
1493
1494         /* serialise with callbacks */
1495         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1496
1497         ksocknal_lib_reset_callback(conn->ksnc_sock, conn);
1498
1499         /* OK, so this conn may not be completely disengaged from its
1500          * scheduler yet, but it _has_ committed to terminate...
1501          */
1502         conn->ksnc_scheduler->kss_nconns--;
1503
1504         if (peer_ni->ksnp_error != 0) {
1505                 /* peer_ni's last conn closed in error */
1506                 LASSERT(list_empty(&peer_ni->ksnp_conns));
1507                 failed = true;
1508                 peer_ni->ksnp_error = 0;     /* avoid multiple notifications */
1509         }
1510
1511         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1512
1513         if (failed)
1514                 ksocknal_peer_failed(peer_ni);
1515
1516         /* The socket is closed on the final put; either here, or in
1517          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1518          * when the connection was established, this will close the socket
1519          * immediately, aborting anything buffered in it. Any hung
1520          * zero-copy transmits will therefore complete in finite time.
1521          */
1522         ksocknal_connsock_decref(conn);
1523 }
1524
1525 void
1526 ksocknal_queue_zombie_conn(struct ksock_conn *conn)
1527 {
1528         /* Queue the conn for the reaper to destroy */
1529         LASSERT(refcount_read(&conn->ksnc_conn_refcount) == 0);
1530         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1531
1532         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1533         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1534
1535         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1536 }
1537
1538 void
1539 ksocknal_destroy_conn(struct ksock_conn *conn)
1540 {
1541         time64_t last_rcv;
1542
1543         /* Final coup-de-grace of the reaper */
1544         CDEBUG(D_NET, "connection %p\n", conn);
1545
1546         LASSERT(refcount_read(&conn->ksnc_conn_refcount) == 0);
1547         LASSERT(refcount_read(&conn->ksnc_sock_refcount) == 0);
1548         LASSERT(conn->ksnc_sock == NULL);
1549         LASSERT(conn->ksnc_conn_cb == NULL);
1550         LASSERT(!conn->ksnc_tx_scheduled);
1551         LASSERT(!conn->ksnc_rx_scheduled);
1552         LASSERT(list_empty(&conn->ksnc_tx_queue));
1553
1554         /* complete current receive if any */
1555         switch (conn->ksnc_rx_state) {
1556         case SOCKNAL_RX_LNET_PAYLOAD:
1557                 last_rcv = conn->ksnc_rx_deadline -
1558                            ksocknal_timeout();
1559                 CERROR("Completing partial receive from %s[%d], ip %pIScp, with error, wanted: %d, left: %d, last alive is %lld secs ago\n",
1560                        libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1561                        conn->ksnc_type,
1562                        &conn->ksnc_peeraddr,
1563                        conn->ksnc_rx_nob_wanted, conn->ksnc_rx_nob_left,
1564                        ktime_get_seconds() - last_rcv);
1565                 if (conn->ksnc_lnet_msg)
1566                         conn->ksnc_lnet_msg->msg_health_status =
1567                                 LNET_MSG_STATUS_REMOTE_ERROR;
1568                 lnet_finalize(conn->ksnc_lnet_msg, -EIO);
1569                 break;
1570         case SOCKNAL_RX_LNET_HEADER:
1571                 if (conn->ksnc_rx_started)
1572                         CERROR("Incomplete receive of lnet header from %s, ip %pIScp, with error, protocol: %d.x.\n",
1573                                libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1574                                &conn->ksnc_peeraddr,
1575                                conn->ksnc_proto->pro_version);
1576                 break;
1577         case SOCKNAL_RX_KSM_HEADER:
1578                 if (conn->ksnc_rx_started)
1579                         CERROR("Incomplete receive of ksock message from %s, ip %pIScp, with error, protocol: %d.x.\n",
1580                                libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1581                                &conn->ksnc_peeraddr,
1582                                conn->ksnc_proto->pro_version);
1583                 break;
1584         case SOCKNAL_RX_SLOP:
1585                 if (conn->ksnc_rx_started)
1586                         CERROR("Incomplete receive of slops from %s, ip %pIScp, with error\n",
1587                                libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1588                                &conn->ksnc_peeraddr);
1589                 break;
1590         default:
1591                 LBUG();
1592                 break;
1593         }
1594
1595         ksocknal_peer_decref(conn->ksnc_peer);
1596
1597         LIBCFS_FREE(conn, sizeof(*conn));
1598 }
1599
1600 int
1601 ksocknal_close_peer_conns_locked(struct ksock_peer_ni *peer_ni,
1602                                  struct sockaddr *addr, int why)
1603 {
1604         struct ksock_conn *conn;
1605         struct ksock_conn *cnxt;
1606         int count = 0;
1607
1608         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns, ksnc_list) {
1609                 if (!addr ||
1610                     rpc_cmp_addr(addr,
1611                                  (struct sockaddr *)&conn->ksnc_peeraddr)) {
1612                         count++;
1613                         ksocknal_close_conn_locked(conn, why);
1614                 }
1615         }
1616
1617         return count;
1618 }
1619
1620 int
1621 ksocknal_close_conn_and_siblings(struct ksock_conn *conn, int why)
1622 {
1623         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1624         int count;
1625
1626         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1627
1628         count = ksocknal_close_peer_conns_locked(
1629                 peer_ni, (struct sockaddr *)&conn->ksnc_peeraddr, why);
1630
1631         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1632
1633         return count;
1634 }
1635
1636 int
1637 ksocknal_close_matching_conns(struct lnet_processid *id, __u32 ipaddr)
1638 {
1639         struct ksock_peer_ni *peer_ni;
1640         struct hlist_node *pnxt;
1641         int lo;
1642         int hi;
1643         int i;
1644         int count = 0;
1645         struct sockaddr_in sa = {.sin_family = AF_INET};
1646
1647         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1648
1649         if (!LNET_NID_IS_ANY(&id->nid)) {
1650                 lo = hash_min(nidhash(&id->nid),
1651                               HASH_BITS(ksocknal_data.ksnd_peers));
1652                 hi = lo;
1653         } else {
1654                 lo = 0;
1655                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
1656         }
1657
1658         sa.sin_addr.s_addr = htonl(ipaddr);
1659         for (i = lo; i <= hi; i++) {
1660                 hlist_for_each_entry_safe(peer_ni, pnxt,
1661                                           &ksocknal_data.ksnd_peers[i],
1662                                           ksnp_list) {
1663
1664                         if (!((LNET_NID_IS_ANY(&id->nid) ||
1665                                nid_same(&id->nid, &peer_ni->ksnp_id.nid)) &&
1666                               (id->pid == LNET_PID_ANY ||
1667                                id->pid == peer_ni->ksnp_id.pid)))
1668                                 continue;
1669
1670                         count += ksocknal_close_peer_conns_locked(
1671                                 peer_ni,
1672                                 ipaddr ? (struct sockaddr *)&sa : NULL, 0);
1673                 }
1674         }
1675
1676         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1677
1678         /* wildcards always succeed */
1679         if (LNET_NID_IS_ANY(&id->nid) || id->pid == LNET_PID_ANY ||
1680             ipaddr == 0)
1681                 return 0;
1682
1683         return (count == 0 ? -ENOENT : 0);
1684 }
1685
1686 static void
1687 ksocknal_notify_gw_down(struct lnet_nid *gw_nid)
1688 {
1689         /* The router is telling me she's been notified of a change in
1690          * gateway state....
1691          */
1692         struct lnet_processid id = {
1693                 .pid    = LNET_PID_ANY,
1694                 .nid    = *gw_nid,
1695         };
1696
1697         CDEBUG(D_NET, "gw %s down\n", libcfs_nidstr(gw_nid));
1698
1699         /* If the gateway crashed, close all open connections... */
1700         ksocknal_close_matching_conns(&id, 0);
1701         return;
1702
1703         /* We can only establish new connections
1704          * if we have autroutes, and these connect on demand.
1705          */
1706 }
1707
1708 static void
1709 ksocknal_push_peer(struct ksock_peer_ni *peer_ni)
1710 {
1711         int index;
1712         int i;
1713         struct ksock_conn *conn;
1714
1715         for (index = 0; ; index++) {
1716                 read_lock(&ksocknal_data.ksnd_global_lock);
1717
1718                 i = 0;
1719                 conn = NULL;
1720
1721                 list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
1722                         if (i++ == index) {
1723                                 ksocknal_conn_addref(conn);
1724                                 break;
1725                         }
1726                 }
1727
1728                 read_unlock(&ksocknal_data.ksnd_global_lock);
1729
1730                 if (i <= index)
1731                         break;
1732
1733                 ksocknal_lib_push_conn (conn);
1734                 ksocknal_conn_decref(conn);
1735         }
1736 }
1737
1738 static int
1739 ksocknal_push(struct lnet_ni *ni, struct lnet_processid *id)
1740 {
1741         int lo;
1742         int hi;
1743         int bkt;
1744         int rc = -ENOENT;
1745
1746         if (!LNET_NID_IS_ANY(&id->nid)) {
1747                 lo = hash_min(nidhash(&id->nid),
1748                               HASH_BITS(ksocknal_data.ksnd_peers));
1749                 hi = lo;
1750         } else {
1751                 lo = 0;
1752                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
1753         }
1754
1755         for (bkt = lo; bkt <= hi; bkt++) {
1756                 int peer_off; /* searching offset in peer_ni hash table */
1757
1758                 for (peer_off = 0; ; peer_off++) {
1759                         struct ksock_peer_ni *peer_ni;
1760                         int           i = 0;
1761
1762                         read_lock(&ksocknal_data.ksnd_global_lock);
1763                         hlist_for_each_entry(peer_ni,
1764                                              &ksocknal_data.ksnd_peers[bkt],
1765                                              ksnp_list) {
1766                                 if (!((LNET_NID_IS_ANY(&id->nid) ||
1767                                        nid_same(&id->nid,
1768                                                  &peer_ni->ksnp_id.nid)) &&
1769                                       (id->pid == LNET_PID_ANY ||
1770                                        id->pid == peer_ni->ksnp_id.pid)))
1771                                         continue;
1772
1773                                 if (i++ == peer_off) {
1774                                         ksocknal_peer_addref(peer_ni);
1775                                         break;
1776                                 }
1777                         }
1778                         read_unlock(&ksocknal_data.ksnd_global_lock);
1779
1780                         if (i <= peer_off) /* no match */
1781                                 break;
1782
1783                         rc = 0;
1784                         ksocknal_push_peer(peer_ni);
1785                         ksocknal_peer_decref(peer_ni);
1786                 }
1787         }
1788         return rc;
1789 }
1790
1791 int
1792 ksocknal_ctl(struct lnet_ni *ni, unsigned int cmd, void *arg)
1793 {
1794         struct lnet_processid id = {};
1795         struct libcfs_ioctl_data *data = arg;
1796         int rc;
1797
1798         switch(cmd) {
1799         case IOC_LIBCFS_GET_INTERFACE: {
1800                 struct ksock_net *net = ni->ni_data;
1801                 struct ksock_interface *iface;
1802                 struct sockaddr_in *sa;
1803
1804                 read_lock(&ksocknal_data.ksnd_global_lock);
1805
1806                 if (data->ioc_count >= 1) {
1807                         rc = -ENOENT;
1808                 } else {
1809                         rc = 0;
1810                         iface = &net->ksnn_interface;
1811
1812                         sa = (void *)&iface->ksni_addr;
1813                         if (sa->sin_family == AF_INET) {
1814                                 data->ioc_u32[0] = ntohl(sa->sin_addr.s_addr);
1815                                 data->ioc_u32[1] = iface->ksni_netmask;
1816                         } else {
1817                                 data->ioc_u32[0] = 0xFFFFFFFF;
1818                                 data->ioc_u32[1] = 0;
1819                         }
1820                         data->ioc_u32[2] = iface->ksni_npeers;
1821                         data->ioc_u32[3] = iface->ksni_nroutes;
1822                 }
1823
1824                 read_unlock(&ksocknal_data.ksnd_global_lock);
1825                 return rc;
1826         }
1827
1828         case IOC_LIBCFS_GET_PEER: {
1829                 __u32            myip = 0;
1830                 __u32            ip = 0;
1831                 int              port = 0;
1832                 int              conn_count = 0;
1833                 int              share_count = 0;
1834
1835                 rc = ksocknal_get_peer_info(ni, data->ioc_count,
1836                                             &id, &myip, &ip, &port,
1837                                             &conn_count,  &share_count);
1838                 if (rc != 0)
1839                         return rc;
1840
1841                 if (!nid_is_nid4(&id.nid))
1842                         return -EINVAL;
1843                 data->ioc_nid    = lnet_nid_to_nid4(&id.nid);
1844                 data->ioc_count  = share_count;
1845                 data->ioc_u32[0] = ip;
1846                 data->ioc_u32[1] = port;
1847                 data->ioc_u32[2] = myip;
1848                 data->ioc_u32[3] = conn_count;
1849                 data->ioc_u32[4] = id.pid;
1850                 return 0;
1851         }
1852
1853         case IOC_LIBCFS_ADD_PEER: {
1854                 struct sockaddr_in sa = {.sin_family = AF_INET};
1855
1856                 id.pid = LNET_PID_LUSTRE;
1857                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1858                 sa.sin_addr.s_addr = htonl(data->ioc_u32[0]);
1859                 sa.sin_port = htons(data->ioc_u32[1]);
1860                 return ksocknal_add_peer(ni, &id, (struct sockaddr *)&sa);
1861         }
1862         case IOC_LIBCFS_DEL_PEER:
1863                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1864                 id.pid = LNET_PID_ANY;
1865                 return ksocknal_del_peer(ni, &id);
1866
1867         case IOC_LIBCFS_GET_CONN: {
1868                 int           txmem;
1869                 int           rxmem;
1870                 int           nagle;
1871                 struct ksock_conn *conn = ksocknal_get_conn_by_idx(ni, data->ioc_count);
1872                 struct sockaddr_in *psa = (void *)&conn->ksnc_peeraddr;
1873                 struct sockaddr_in *mysa = (void *)&conn->ksnc_myaddr;
1874
1875                 if (conn == NULL)
1876                         return -ENOENT;
1877
1878                 ksocknal_lib_get_conn_tunables(conn, &txmem, &rxmem, &nagle);
1879
1880                 data->ioc_count = txmem;
1881                 data->ioc_nid = lnet_nid_to_nid4(&conn->ksnc_peer->ksnp_id.nid);
1882                 data->ioc_flags = nagle;
1883                 if (psa->sin_family == AF_INET)
1884                         data->ioc_u32[0] = ntohl(psa->sin_addr.s_addr);
1885                 else
1886                         data->ioc_u32[0] = 0xFFFFFFFF;
1887                 data->ioc_u32[1] = rpc_get_port((struct sockaddr *)
1888                                                 &conn->ksnc_peeraddr);
1889                 if (mysa->sin_family == AF_INET)
1890                         data->ioc_u32[2] = ntohl(mysa->sin_addr.s_addr);
1891                 else
1892                         data->ioc_u32[2] = 0xFFFFFFFF;
1893                 data->ioc_u32[3] = conn->ksnc_type;
1894                 data->ioc_u32[4] = conn->ksnc_scheduler->kss_cpt;
1895                 data->ioc_u32[5] = rxmem;
1896                 data->ioc_u32[6] = conn->ksnc_peer->ksnp_id.pid;
1897                 ksocknal_conn_decref(conn);
1898                 return 0;
1899         }
1900
1901         case IOC_LIBCFS_CLOSE_CONNECTION:
1902                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1903                 id.pid = LNET_PID_ANY;
1904                 return ksocknal_close_matching_conns(&id,
1905                                                      data->ioc_u32[0]);
1906
1907         case IOC_LIBCFS_REGISTER_MYNID:
1908                 /* Ignore if this is a noop */
1909                 if (nid_is_nid4(&ni->ni_nid) &&
1910                     data->ioc_nid == lnet_nid_to_nid4(&ni->ni_nid))
1911                         return 0;
1912
1913                 CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1914                        libcfs_nid2str(data->ioc_nid),
1915                        libcfs_nidstr(&ni->ni_nid));
1916                 return -EINVAL;
1917
1918         case IOC_LIBCFS_PUSH_CONNECTION:
1919                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1920                 id.pid = LNET_PID_ANY;
1921                 return ksocknal_push(ni, &id);
1922
1923         default:
1924                 return -EINVAL;
1925         }
1926         /* not reached */
1927 }
1928
1929 static void
1930 ksocknal_free_buffers (void)
1931 {
1932         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_txs) == 0);
1933
1934         if (ksocknal_data.ksnd_schedulers != NULL)
1935                 cfs_percpt_free(ksocknal_data.ksnd_schedulers);
1936
1937         spin_lock(&ksocknal_data.ksnd_tx_lock);
1938
1939         if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
1940                 LIST_HEAD(zlist);
1941                 struct ksock_tx *tx;
1942
1943                 list_splice_init(&ksocknal_data.ksnd_idle_noop_txs, &zlist);
1944                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
1945
1946                 while ((tx = list_first_entry_or_null(&zlist, struct ksock_tx,
1947                                                       tx_list)) != NULL) {
1948                         list_del(&tx->tx_list);
1949                         LIBCFS_FREE(tx, tx->tx_desc_size);
1950                 }
1951         } else {
1952                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
1953         }
1954 }
1955
1956 static int
1957 ksocknal_handle_link_state_change(struct net_device *dev,
1958                                   unsigned char operstate)
1959 {
1960         struct lnet_ni *ni = NULL;
1961         struct ksock_net *net;
1962         struct ksock_net *cnxt;
1963         int ifindex;
1964         unsigned char link_down = !(operstate == IF_OPER_UP);
1965         struct in_device *in_dev;
1966         bool found_ip = false;
1967         struct ksock_interface *ksi = NULL;
1968         struct sockaddr_in *sa;
1969         __u32 ni_state_before;
1970         bool update_ping_buf = false;
1971         int state;
1972         DECLARE_CONST_IN_IFADDR(ifa);
1973
1974         ifindex = dev->ifindex;
1975
1976         if (!ksocknal_data.ksnd_nnets)
1977                 goto out;
1978
1979         list_for_each_entry_safe(net, cnxt, &ksocknal_data.ksnd_nets,
1980                                  ksnn_list) {
1981
1982                 ksi = &net->ksnn_interface;
1983                 sa = (void *)&ksi->ksni_addr;
1984                 found_ip = false;
1985
1986                 if (strcmp(ksi->ksni_name, dev->name))
1987                         continue;
1988
1989                 if (ksi->ksni_index == -1) {
1990                         if (dev->reg_state != NETREG_REGISTERED)
1991                                 continue;
1992                         /* A registration just happened: save the new index for
1993                          * the device */
1994                         ksi->ksni_index = ifindex;
1995                         goto out;
1996                 }
1997
1998                 if (ksi->ksni_index != ifindex)
1999                         continue;
2000
2001                 if (dev->reg_state == NETREG_UNREGISTERING) {
2002                         /* Device is being unregistered, we need to clear the
2003                          * index, it can change when device will be back */
2004                         ksi->ksni_index = -1;
2005                         goto out;
2006                 }
2007
2008                 ni = net->ksnn_ni;
2009
2010                 in_dev = __in_dev_get_rtnl(dev);
2011                 if (!in_dev) {
2012                         CDEBUG(D_NET, "Interface %s has no IPv4 status.\n",
2013                                dev->name);
2014                         ni_state_before = lnet_set_link_fatal_state(ni, 1);
2015                         goto ni_done;
2016                 }
2017                 in_dev_for_each_ifa_rtnl(ifa, in_dev) {
2018                         if (sa->sin_addr.s_addr == ifa->ifa_local)
2019                                 found_ip = true;
2020                 }
2021                 endfor_ifa(in_dev);
2022
2023                 if (!found_ip) {
2024                         CDEBUG(D_NET, "Interface %s has no matching ip\n",
2025                                dev->name);
2026                         ni_state_before = lnet_set_link_fatal_state(ni, 1);
2027                         goto ni_done;
2028                 }
2029
2030                 if (link_down) {
2031                         ni_state_before = lnet_set_link_fatal_state(ni, 1);
2032                 } else {
2033                         state = (lnet_get_link_status(dev) == 0);
2034                         ni_state_before = lnet_set_link_fatal_state(ni,
2035                                                                     state);
2036                 }
2037 ni_done:
2038                 if (!update_ping_buf &&
2039                     (ni->ni_state == LNET_NI_STATE_ACTIVE) &&
2040                     (atomic_read(&ni->ni_fatal_error_on) != ni_state_before))
2041                         update_ping_buf = true;
2042         }
2043
2044         if (update_ping_buf)
2045                 lnet_mark_ping_buffer_for_update();
2046 out:
2047         return 0;
2048 }
2049
2050
2051 static int
2052 ksocknal_handle_inetaddr_change(struct in_ifaddr *ifa, unsigned long event)
2053 {
2054         struct lnet_ni *ni = NULL;
2055         struct ksock_net *net;
2056         struct ksock_net *cnxt;
2057         struct net_device *event_netdev = ifa->ifa_dev->dev;
2058         int ifindex;
2059         struct ksock_interface *ksi = NULL;
2060         struct sockaddr_in *sa;
2061         __u32 ni_state_before;
2062         bool update_ping_buf = false;
2063         bool link_down;
2064
2065         if (!ksocknal_data.ksnd_nnets)
2066                 goto out;
2067
2068         ifindex = event_netdev->ifindex;
2069
2070         list_for_each_entry_safe(net, cnxt, &ksocknal_data.ksnd_nets,
2071                                  ksnn_list) {
2072
2073                 ksi = &net->ksnn_interface;
2074                 sa = (void *)&ksi->ksni_addr;
2075
2076                 if (ksi->ksni_index != ifindex ||
2077                     strcmp(ksi->ksni_name, event_netdev->name))
2078                         continue;
2079
2080                 if (sa->sin_addr.s_addr == ifa->ifa_local) {
2081                         ni = net->ksnn_ni;
2082                         link_down = (event == NETDEV_DOWN);
2083                         ni_state_before = lnet_set_link_fatal_state(ni,
2084                                                                     link_down);
2085
2086                         if (!update_ping_buf &&
2087                             (ni->ni_state == LNET_NI_STATE_ACTIVE) &&
2088                             ((event == NETDEV_DOWN) != ni_state_before))
2089                                 update_ping_buf = true;
2090                 }
2091         }
2092
2093         if (update_ping_buf)
2094                 lnet_mark_ping_buffer_for_update();
2095 out:
2096         return 0;
2097 }
2098
2099 /************************************
2100  * Net device notifier event handler
2101  ************************************/
2102 static int ksocknal_device_event(struct notifier_block *unused,
2103                                  unsigned long event, void *ptr)
2104 {
2105         struct net_device *dev = netdev_notifier_info_to_dev(ptr);
2106         unsigned char operstate;
2107
2108         operstate = dev->operstate;
2109
2110         CDEBUG(D_NET, "devevent: status=%ld, iface=%s ifindex %d state %u\n",
2111                event, dev->name, dev->ifindex, operstate);
2112
2113         switch (event) {
2114         case NETDEV_UP:
2115         case NETDEV_DOWN:
2116         case NETDEV_CHANGE:
2117         case NETDEV_REGISTER:
2118         case NETDEV_UNREGISTER:
2119                 ksocknal_handle_link_state_change(dev, operstate);
2120                 break;
2121         }
2122
2123         return NOTIFY_OK;
2124 }
2125
2126 /************************************
2127  * Inetaddr notifier event handler
2128  ************************************/
2129 static int ksocknal_inetaddr_event(struct notifier_block *unused,
2130                                    unsigned long event, void *ptr)
2131 {
2132         struct in_ifaddr *ifa = ptr;
2133
2134         CDEBUG(D_NET, "addrevent: status %ld ip addr %pI4, netmask %pI4.\n",
2135                event, &ifa->ifa_address, &ifa->ifa_mask);
2136
2137         switch (event) {
2138         case NETDEV_UP:
2139         case NETDEV_DOWN:
2140         case NETDEV_CHANGE:
2141                 ksocknal_handle_inetaddr_change(ifa, event);
2142                 break;
2143
2144         }
2145         return NOTIFY_OK;
2146 }
2147
2148 static struct notifier_block ksocknal_dev_notifier_block = {
2149         .notifier_call = ksocknal_device_event,
2150 };
2151
2152 static struct notifier_block ksocknal_inetaddr_notifier_block = {
2153         .notifier_call = ksocknal_inetaddr_event,
2154 };
2155
2156 static void
2157 ksocknal_base_shutdown(void)
2158 {
2159         struct ksock_sched *sched;
2160         struct ksock_peer_ni *peer_ni;
2161         int i;
2162
2163         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %lld\n",
2164                libcfs_kmem_read());
2165         LASSERT (ksocknal_data.ksnd_nnets == 0);
2166
2167         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL) {
2168                 unregister_netdevice_notifier(&ksocknal_dev_notifier_block);
2169                 unregister_inetaddr_notifier(&ksocknal_inetaddr_notifier_block);
2170         }
2171
2172         switch (ksocknal_data.ksnd_init) {
2173         default:
2174                 LASSERT(0);
2175                 fallthrough;
2176
2177         case SOCKNAL_INIT_ALL:
2178         case SOCKNAL_INIT_DATA:
2179                 hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list)
2180                         LASSERT(0);
2181
2182                 LASSERT(list_empty(&ksocknal_data.ksnd_nets));
2183                 LASSERT(list_empty(&ksocknal_data.ksnd_enomem_conns));
2184                 LASSERT(list_empty(&ksocknal_data.ksnd_zombie_conns));
2185                 LASSERT(list_empty(&ksocknal_data.ksnd_connd_connreqs));
2186                 LASSERT(list_empty(&ksocknal_data.ksnd_connd_routes));
2187
2188                 if (ksocknal_data.ksnd_schedulers != NULL) {
2189                         cfs_percpt_for_each(sched, i,
2190                                             ksocknal_data.ksnd_schedulers) {
2191
2192                                 LASSERT(list_empty(&sched->kss_tx_conns));
2193                                 LASSERT(list_empty(&sched->kss_rx_conns));
2194                                 LASSERT(list_empty(&sched->kss_zombie_noop_txs));
2195                                 LASSERT(sched->kss_nconns == 0);
2196                         }
2197                 }
2198
2199                 /* flag threads to terminate; wake and wait for them to die */
2200                 ksocknal_data.ksnd_shuttingdown = 1;
2201                 wake_up_all(&ksocknal_data.ksnd_connd_waitq);
2202                 wake_up(&ksocknal_data.ksnd_reaper_waitq);
2203
2204                 if (ksocknal_data.ksnd_schedulers != NULL) {
2205                         cfs_percpt_for_each(sched, i,
2206                                             ksocknal_data.ksnd_schedulers)
2207                                         wake_up_all(&sched->kss_waitq);
2208                 }
2209
2210                 wait_var_event_warning(&ksocknal_data.ksnd_nthreads,
2211                                        atomic_read(&ksocknal_data.ksnd_nthreads) == 0,
2212                                        "waiting for %d threads to terminate\n",
2213                                        atomic_read(&ksocknal_data.ksnd_nthreads));
2214
2215                 ksocknal_free_buffers();
2216
2217                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
2218                 break;
2219         }
2220
2221         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %lld\n",
2222                libcfs_kmem_read());
2223
2224         module_put(THIS_MODULE);
2225 }
2226
2227 static int
2228 ksocknal_base_startup(void)
2229 {
2230         struct ksock_sched *sched;
2231         int rc;
2232         int i;
2233
2234         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
2235         LASSERT(ksocknal_data.ksnd_nnets == 0);
2236
2237         memset(&ksocknal_data, 0, sizeof(ksocknal_data)); /* zero pointers */
2238
2239         hash_init(ksocknal_data.ksnd_peers);
2240
2241         rwlock_init(&ksocknal_data.ksnd_global_lock);
2242         INIT_LIST_HEAD(&ksocknal_data.ksnd_nets);
2243
2244         spin_lock_init(&ksocknal_data.ksnd_reaper_lock);
2245         INIT_LIST_HEAD(&ksocknal_data.ksnd_enomem_conns);
2246         INIT_LIST_HEAD(&ksocknal_data.ksnd_zombie_conns);
2247         INIT_LIST_HEAD(&ksocknal_data.ksnd_deathrow_conns);
2248         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
2249
2250         spin_lock_init(&ksocknal_data.ksnd_connd_lock);
2251         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_connreqs);
2252         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_routes);
2253         init_waitqueue_head(&ksocknal_data.ksnd_connd_waitq);
2254
2255         spin_lock_init(&ksocknal_data.ksnd_tx_lock);
2256         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_noop_txs);
2257
2258         /* NB memset above zeros whole of ksocknal_data */
2259
2260         /* flag lists/ptrs/locks initialised */
2261         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2262         if (!try_module_get(THIS_MODULE))
2263                 goto failed;
2264
2265         /* Create a scheduler block per available CPT */
2266         ksocknal_data.ksnd_schedulers = cfs_percpt_alloc(lnet_cpt_table(),
2267                                                          sizeof(*sched));
2268         if (ksocknal_data.ksnd_schedulers == NULL)
2269                 goto failed;
2270
2271         cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
2272                 int nthrs;
2273
2274                 /*
2275                  * make sure not to allocate more threads than there are
2276                  * cores/CPUs in teh CPT
2277                  */
2278                 nthrs = cfs_cpt_weight(lnet_cpt_table(), i);
2279                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2280                         nthrs = min(nthrs, *ksocknal_tunables.ksnd_nscheds);
2281                 } else {
2282                         /*
2283                          * max to half of CPUs, assume another half should be
2284                          * reserved for upper layer modules
2285                          */
2286                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2287                 }
2288
2289                 sched->kss_nthreads_max = nthrs;
2290                 sched->kss_cpt = i;
2291
2292                 spin_lock_init(&sched->kss_lock);
2293                 INIT_LIST_HEAD(&sched->kss_rx_conns);
2294                 INIT_LIST_HEAD(&sched->kss_tx_conns);
2295                 INIT_LIST_HEAD(&sched->kss_zombie_noop_txs);
2296                 init_waitqueue_head(&sched->kss_waitq);
2297         }
2298
2299         ksocknal_data.ksnd_connd_starting         = 0;
2300         ksocknal_data.ksnd_connd_failed_stamp     = 0;
2301         ksocknal_data.ksnd_connd_starting_stamp   = ktime_get_real_seconds();
2302         /* must have at least 2 connds to remain responsive to accepts while
2303          * connecting */
2304         if (*ksocknal_tunables.ksnd_nconnds < SOCKNAL_CONND_RESV + 1)
2305                 *ksocknal_tunables.ksnd_nconnds = SOCKNAL_CONND_RESV + 1;
2306
2307         if (*ksocknal_tunables.ksnd_nconnds_max <
2308             *ksocknal_tunables.ksnd_nconnds) {
2309                 ksocknal_tunables.ksnd_nconnds_max =
2310                         ksocknal_tunables.ksnd_nconnds;
2311         }
2312
2313         for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
2314                 spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2315                 ksocknal_data.ksnd_connd_starting++;
2316                 spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2317
2318                 rc = ksocknal_thread_start(ksocknal_connd,
2319                                            (void *)((uintptr_t)i),
2320                                            "socknal_cd%02d", i);
2321                 if (rc != 0) {
2322                         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2323                         ksocknal_data.ksnd_connd_starting--;
2324                         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2325                         CERROR("Can't spawn socknal connd: %d\n", rc);
2326                         goto failed;
2327                 }
2328         }
2329
2330         rc = ksocknal_thread_start(ksocknal_reaper, NULL, "socknal_reaper");
2331         if (rc != 0) {
2332                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
2333                 goto failed;
2334         }
2335
2336         register_netdevice_notifier(&ksocknal_dev_notifier_block);
2337         register_inetaddr_notifier(&ksocknal_inetaddr_notifier_block);
2338
2339         /* flag everything initialised */
2340         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
2341
2342         return 0;
2343
2344  failed:
2345         ksocknal_base_shutdown();
2346         return -ENETDOWN;
2347 }
2348
2349 static int
2350 ksocknal_debug_peerhash(struct lnet_ni *ni)
2351 {
2352         struct ksock_peer_ni *peer_ni;
2353         int i;
2354
2355         read_lock(&ksocknal_data.ksnd_global_lock);
2356
2357         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
2358                 struct ksock_conn_cb *conn_cb;
2359                 struct ksock_conn *conn;
2360
2361                 if (peer_ni->ksnp_ni != ni)
2362                         continue;
2363
2364                 CWARN("Active peer_ni on shutdown: %s, ref %d, closing %d, accepting %d, err %d, zcookie %llu, txq %d, zc_req %d\n",
2365                       libcfs_idstr(&peer_ni->ksnp_id),
2366                       refcount_read(&peer_ni->ksnp_refcount),
2367                       peer_ni->ksnp_closing,
2368                       peer_ni->ksnp_accepting, peer_ni->ksnp_error,
2369                       peer_ni->ksnp_zc_next_cookie,
2370                       !list_empty(&peer_ni->ksnp_tx_queue),
2371                       !list_empty(&peer_ni->ksnp_zc_req_list));
2372
2373                 conn_cb = peer_ni->ksnp_conn_cb;
2374                 if (conn_cb) {
2375                         CWARN("ConnCB: ref %d, schd %d, conn %d, cnted %d, del %d\n",
2376                               refcount_read(&conn_cb->ksnr_refcount),
2377                               conn_cb->ksnr_scheduled, conn_cb->ksnr_connecting,
2378                               conn_cb->ksnr_connected, conn_cb->ksnr_deleted);
2379                 }
2380
2381                 list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
2382                         CWARN("Conn: ref %d, sref %d, t %d, c %d\n",
2383                               refcount_read(&conn->ksnc_conn_refcount),
2384                               refcount_read(&conn->ksnc_sock_refcount),
2385                               conn->ksnc_type, conn->ksnc_closing);
2386                 }
2387                 break;
2388         }
2389
2390         read_unlock(&ksocknal_data.ksnd_global_lock);
2391         return 0;
2392 }
2393
2394 void
2395 ksocknal_shutdown(struct lnet_ni *ni)
2396 {
2397         struct ksock_net *net = ni->ni_data;
2398
2399         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL);
2400         LASSERT(ksocknal_data.ksnd_nnets > 0);
2401
2402         /* prevent new peers */
2403         atomic_add(SOCKNAL_SHUTDOWN_BIAS, &net->ksnn_npeers);
2404
2405         /* Delete all peers */
2406         ksocknal_del_peer(ni, NULL);
2407
2408         /* Wait for all peer_ni state to clean up */
2409         wait_var_event_warning(&net->ksnn_npeers,
2410                                atomic_read(&net->ksnn_npeers) ==
2411                                SOCKNAL_SHUTDOWN_BIAS,
2412                                "waiting for %d peers to disconnect\n",
2413                                ksocknal_debug_peerhash(ni) +
2414                                atomic_read(&net->ksnn_npeers) -
2415                                SOCKNAL_SHUTDOWN_BIAS);
2416
2417         LASSERT(net->ksnn_interface.ksni_npeers == 0);
2418         LASSERT(net->ksnn_interface.ksni_nroutes == 0);
2419
2420         list_del(&net->ksnn_list);
2421         LIBCFS_FREE(net, sizeof(*net));
2422
2423         ksocknal_data.ksnd_nnets--;
2424         if (ksocknal_data.ksnd_nnets == 0)
2425                 ksocknal_base_shutdown();
2426 }
2427
2428 static int
2429 ksocknal_search_new_ipif(struct ksock_net *net)
2430 {
2431         int new_ipif = 0;
2432         char *ifnam = &net->ksnn_interface.ksni_name[0];
2433         char *colon = strchr(ifnam, ':');
2434         bool found = false;
2435         struct ksock_net *tmp;
2436
2437         if (colon != NULL)
2438                 *colon = 0;
2439
2440         list_for_each_entry(tmp, &ksocknal_data.ksnd_nets, ksnn_list) {
2441                 char *ifnam2 = &tmp->ksnn_interface.ksni_name[0];
2442                 char *colon2 = strchr(ifnam2, ':');
2443
2444                 if (colon2 != NULL)
2445                         *colon2 = 0;
2446
2447                 found = strcmp(ifnam, ifnam2) == 0;
2448                 if (colon2 != NULL)
2449                         *colon2 = ':';
2450         }
2451
2452         new_ipif += !found;
2453         if (colon != NULL)
2454                 *colon = ':';
2455
2456         return new_ipif;
2457 }
2458
2459 static int
2460 ksocknal_start_schedulers(struct ksock_sched *sched)
2461 {
2462         int     nthrs;
2463         int     rc = 0;
2464         int     i;
2465
2466         if (sched->kss_nthreads == 0) {
2467                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2468                         nthrs = sched->kss_nthreads_max;
2469                 } else {
2470                         nthrs = cfs_cpt_weight(lnet_cpt_table(),
2471                                                sched->kss_cpt);
2472                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2473                         nthrs = min(SOCKNAL_NSCHEDS_HIGH, nthrs);
2474                 }
2475                 nthrs = min(nthrs, sched->kss_nthreads_max);
2476         } else {
2477                 LASSERT(sched->kss_nthreads <= sched->kss_nthreads_max);
2478                 /* increase two threads if there is new interface */
2479                 nthrs = min(2, sched->kss_nthreads_max - sched->kss_nthreads);
2480         }
2481
2482         for (i = 0; i < nthrs; i++) {
2483                 long id;
2484
2485                 id = KSOCK_THREAD_ID(sched->kss_cpt, sched->kss_nthreads + i);
2486                 rc = ksocknal_thread_start(ksocknal_scheduler, (void *)id,
2487                                            "socknal_sd%02d_%02d",
2488                                            sched->kss_cpt,
2489                                            (int)KSOCK_THREAD_SID(id));
2490                 if (rc == 0)
2491                         continue;
2492
2493                 CERROR("Can't spawn thread %d for scheduler[%d]: %d\n",
2494                        sched->kss_cpt, (int) KSOCK_THREAD_SID(id), rc);
2495                 break;
2496         }
2497
2498         sched->kss_nthreads += i;
2499         return rc;
2500 }
2501
2502 static int
2503 ksocknal_net_start_threads(struct ksock_net *net, __u32 *cpts, int ncpts)
2504 {
2505         int newif = ksocknal_search_new_ipif(net);
2506         int rc;
2507         int i;
2508
2509         if (ncpts > 0 && ncpts > cfs_cpt_number(lnet_cpt_table()))
2510                 return -EINVAL;
2511
2512         for (i = 0; i < ncpts; i++) {
2513                 struct ksock_sched *sched;
2514                 int cpt = (cpts == NULL) ? i : cpts[i];
2515
2516                 LASSERT(cpt < cfs_cpt_number(lnet_cpt_table()));
2517                 sched = ksocknal_data.ksnd_schedulers[cpt];
2518
2519                 if (!newif && sched->kss_nthreads > 0)
2520                         continue;
2521
2522                 rc = ksocknal_start_schedulers(sched);
2523                 if (rc != 0)
2524                         return rc;
2525         }
2526         return 0;
2527 }
2528
2529 int
2530 ksocknal_startup(struct lnet_ni *ni)
2531 {
2532         struct ksock_net *net;
2533         struct ksock_interface *ksi = NULL;
2534         struct lnet_inetdev *ifaces = NULL;
2535         int rc, if_idx;
2536         int dev_status;
2537
2538         LASSERT (ni->ni_net->net_lnd == &the_ksocklnd);
2539         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING) {
2540                 rc = ksocknal_base_startup();
2541                 if (rc != 0)
2542                         return rc;
2543         }
2544         LIBCFS_ALLOC(net, sizeof(*net));
2545         if (net == NULL)
2546                 goto out_base;
2547
2548         net->ksnn_incarnation = ktime_get_real_ns();
2549         ni->ni_data = net;
2550
2551         ksocknal_tunables_setup(ni);
2552
2553         rc = lnet_inet_enumerate(&ifaces, ni->ni_net_ns, true);
2554         if (rc < 0)
2555                 goto out_net;
2556
2557         ksi = &net->ksnn_interface;
2558
2559         /* Interface and/or IP address is specified otherwise default to
2560          * the first Interface
2561          */
2562         if_idx = lnet_inet_select(ni, ifaces, rc);
2563         if (if_idx < 0)
2564                 goto out_net;
2565
2566         if (!ni->ni_interface) {
2567                 rc = lnet_ni_add_interface(ni, ifaces[if_idx].li_name);
2568                 if (rc < 0)
2569                         CWARN("ksocklnd failed to allocate ni_interface\n");
2570         }
2571
2572         ni->ni_dev_cpt = ifaces[if_idx].li_cpt;
2573         ksi->ksni_index = ifaces[if_idx].li_index;
2574         if (ifaces[if_idx].li_size == sizeof(struct in6_addr)) {
2575                 struct sockaddr_in6 *sa;
2576                 sa = (void *)&ksi->ksni_addr;
2577                 memset(sa, 0, sizeof(*sa));
2578                 sa->sin6_family = AF_INET6;
2579                 memcpy(&sa->sin6_addr, ifaces[if_idx].li_ipv6addr,
2580                        sizeof(struct in6_addr));
2581                 ni->ni_nid.nid_size = sizeof(struct in6_addr) - 4;
2582                 memcpy(&ni->ni_nid.nid_addr, ifaces[if_idx].li_ipv6addr,
2583                        sizeof(struct in6_addr));
2584         } else {
2585                 struct sockaddr_in *sa;
2586                 sa = (void *)&ksi->ksni_addr;
2587                 memset(sa, 0, sizeof(*sa));
2588                 sa->sin_family = AF_INET;
2589                 sa->sin_addr.s_addr = ifaces[if_idx].li_ipaddr;
2590                 ksi->ksni_netmask = ifaces[if_idx].li_netmask;
2591                 ni->ni_nid.nid_size = 0;
2592                 ni->ni_nid.nid_addr[0] = sa->sin_addr.s_addr;
2593         }
2594         strlcpy(ksi->ksni_name, ifaces[if_idx].li_name, sizeof(ksi->ksni_name));
2595
2596         /* call it before add it to ksocknal_data.ksnd_nets */
2597         rc = ksocknal_net_start_threads(net, ni->ni_cpts, ni->ni_ncpts);
2598         if (rc != 0)
2599                 goto out_net;
2600
2601         if ((ksocknal_ip2index((struct sockaddr *)&ksi->ksni_addr,
2602                                 ni,
2603                                 &dev_status) < 0) ||
2604              (dev_status <= 0))
2605                 lnet_set_link_fatal_state(ni, 1);
2606
2607         list_add(&net->ksnn_list, &ksocknal_data.ksnd_nets);
2608         net->ksnn_ni = ni;
2609         ksocknal_data.ksnd_nnets++;
2610
2611         return 0;
2612
2613 out_net:
2614         LIBCFS_FREE(net, sizeof(*net));
2615 out_base:
2616         if (ksocknal_data.ksnd_nnets == 0)
2617                 ksocknal_base_shutdown();
2618
2619         return -ENETDOWN;
2620 }
2621
2622 static void __exit ksocklnd_exit(void)
2623 {
2624         lnet_unregister_lnd(&the_ksocklnd);
2625 }
2626
2627 static const struct lnet_lnd the_ksocklnd = {
2628         .lnd_type               = SOCKLND,
2629         .lnd_startup            = ksocknal_startup,
2630         .lnd_shutdown           = ksocknal_shutdown,
2631         .lnd_ctl                = ksocknal_ctl,
2632         .lnd_send               = ksocknal_send,
2633         .lnd_recv               = ksocknal_recv,
2634         .lnd_notify_peer_down   = ksocknal_notify_gw_down,
2635         .lnd_accept             = ksocknal_accept,
2636         .lnd_nl_get             = ksocknal_nl_get,
2637         .lnd_nl_set             = ksocknal_nl_set,
2638         .lnd_keys               = &ksocknal_tunables_keys,
2639 };
2640
2641 static int __init ksocklnd_init(void)
2642 {
2643         int rc;
2644
2645         /* check ksnr_connected/connecting field large enough */
2646         BUILD_BUG_ON(SOCKLND_CONN_NTYPES > 4);
2647         BUILD_BUG_ON(SOCKLND_CONN_ACK != SOCKLND_CONN_BULK_IN);
2648
2649         rc = ksocknal_tunables_init();
2650         if (rc != 0)
2651                 return rc;
2652
2653         rc = libcfs_setup();
2654         if (rc)
2655                 return rc;
2656
2657         lnet_register_lnd(&the_ksocklnd);
2658
2659         return 0;
2660 }
2661
2662 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2663 MODULE_DESCRIPTION("TCP Socket LNet Network Driver");
2664 MODULE_VERSION("2.8.0");
2665 MODULE_LICENSE("GPL");
2666
2667 module_init(ksocklnd_init);
2668 module_exit(ksocklnd_exit);