Whamcloud - gitweb
2c2070c5339488f6e695e7936530726a6a30b11f
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lnet/klnds/socklnd/socklnd.c
32  *
33  * Author: Zach Brown <zab@zabbo.net>
34  * Author: Peter J. Braam <braam@clusterfs.com>
35  * Author: Phil Schwan <phil@clusterfs.com>
36  * Author: Eric Barton <eric@bartonsoftware.com>
37  */
38
39 #include <linux/ethtool.h>
40 #include <linux/inetdevice.h>
41 #include <linux/kernel.h>
42 #include <linux/sunrpc/addr.h>
43 #include <net/addrconf.h>
44 #include "socklnd.h"
45
46 static const struct lnet_lnd the_ksocklnd;
47 struct ksock_nal_data ksocknal_data;
48
49 static int ksocknal_ip2index(struct sockaddr *addr, struct lnet_ni *ni,
50                              int *dev_status)
51 {
52         struct net_device *dev;
53         int ret = -1;
54         DECLARE_CONST_IN_IFADDR(ifa);
55
56         *dev_status = -1;
57
58         if (addr->sa_family != AF_INET && addr->sa_family != AF_INET6)
59                 return ret;
60
61         rcu_read_lock();
62         for_each_netdev(ni->ni_net_ns, dev) {
63                 int flags = dev_get_flags(dev);
64                 struct in_device *in_dev;
65
66                 if (flags & IFF_LOOPBACK) /* skip the loopback IF */
67                         continue;
68
69                 if (!(flags & IFF_UP))
70                         continue;
71
72                 switch (addr->sa_family) {
73                 case AF_INET:
74                         in_dev = __in_dev_get_rcu(dev);
75                         if (!in_dev)
76                                 continue;
77
78                         in_dev_for_each_ifa_rcu(ifa, in_dev) {
79                                 if (ifa->ifa_local ==
80                                     ((struct sockaddr_in *)addr)->sin_addr.s_addr)
81                                         ret = dev->ifindex;
82                         }
83                         endfor_ifa(in_dev);
84                         break;
85 #if IS_ENABLED(CONFIG_IPV6)
86                 case AF_INET6: {
87                         struct inet6_dev *in6_dev;
88                         const struct inet6_ifaddr *ifa6;
89                         struct sockaddr_in6 *addr6 = (struct sockaddr_in6*)addr;
90
91                         in6_dev = __in6_dev_get(dev);
92                         if (!in6_dev)
93                                 continue;
94
95                         list_for_each_entry_rcu(ifa6, &in6_dev->addr_list, if_list) {
96                                 if (ipv6_addr_cmp(&ifa6->addr,
97                                                  &addr6->sin6_addr) == 0)
98                                         ret = dev->ifindex;
99                         }
100                         break;
101                         }
102 #endif /* IS_ENABLED(CONFIG_IPV6) */
103                 }
104                 if (ret >= 0)
105                         break;
106         }
107
108         rcu_read_unlock();
109         if (ret >= 0)
110                 *dev_status = 1;
111
112         if ((ret == -1) ||
113             ((dev->reg_state == NETREG_UNREGISTERING) ||
114              ((dev->operstate != IF_OPER_UP) &&
115               (dev->operstate != IF_OPER_UNKNOWN))) ||
116             (lnet_get_link_status(dev) == 0))
117                 *dev_status = 0;
118
119         return ret;
120 }
121
122 static struct ksock_conn_cb *
123 ksocknal_create_conn_cb(struct sockaddr *addr)
124 {
125         struct ksock_conn_cb *conn_cb;
126
127         LIBCFS_ALLOC(conn_cb, sizeof(*conn_cb));
128         if (!conn_cb)
129                 return NULL;
130
131         refcount_set(&conn_cb->ksnr_refcount, 1);
132         conn_cb->ksnr_peer = NULL;
133         conn_cb->ksnr_retry_interval = 0;         /* OK to connect at any time */
134         rpc_copy_addr((struct sockaddr *)&conn_cb->ksnr_addr, addr);
135         rpc_set_port((struct sockaddr *)&conn_cb->ksnr_addr,
136                      rpc_get_port(addr));
137         conn_cb->ksnr_scheduled = 0;
138         conn_cb->ksnr_connecting = 0;
139         conn_cb->ksnr_connected = 0;
140         conn_cb->ksnr_deleted = 0;
141         conn_cb->ksnr_conn_count = 0;
142         conn_cb->ksnr_ctrl_conn_count = 0;
143         conn_cb->ksnr_blki_conn_count = 0;
144         conn_cb->ksnr_blko_conn_count = 0;
145         conn_cb->ksnr_max_conns = 0;
146         conn_cb->ksnr_busy_retry_count = 0;
147
148         return conn_cb;
149 }
150
151 void
152 ksocknal_destroy_conn_cb(struct ksock_conn_cb *conn_cb)
153 {
154         LASSERT(refcount_read(&conn_cb->ksnr_refcount) == 0);
155
156         if (conn_cb->ksnr_peer)
157                 ksocknal_peer_decref(conn_cb->ksnr_peer);
158
159         LIBCFS_FREE(conn_cb, sizeof(*conn_cb));
160 }
161
162 static struct ksock_peer_ni *
163 ksocknal_create_peer(struct lnet_ni *ni, struct lnet_processid *id)
164 {
165         int cpt = lnet_nid2cpt(&id->nid, ni);
166         struct ksock_net *net = ni->ni_data;
167         struct ksock_peer_ni *peer_ni;
168
169         LASSERT(!LNET_NID_IS_ANY(&id->nid));
170         LASSERT(id->pid != LNET_PID_ANY);
171         LASSERT(!in_interrupt());
172
173         if (!atomic_inc_unless_negative(&net->ksnn_npeers)) {
174                 CERROR("Can't create peer_ni: network shutdown\n");
175                 return ERR_PTR(-ESHUTDOWN);
176         }
177
178         LIBCFS_CPT_ALLOC(peer_ni, lnet_cpt_table(), cpt, sizeof(*peer_ni));
179         if (!peer_ni) {
180                 atomic_dec(&net->ksnn_npeers);
181                 return ERR_PTR(-ENOMEM);
182         }
183
184         peer_ni->ksnp_ni = ni;
185         peer_ni->ksnp_id = *id;
186         refcount_set(&peer_ni->ksnp_refcount, 1); /* 1 ref for caller */
187         peer_ni->ksnp_closing = 0;
188         peer_ni->ksnp_accepting = 0;
189         peer_ni->ksnp_proto = NULL;
190         peer_ni->ksnp_last_alive = 0;
191         peer_ni->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;
192         peer_ni->ksnp_conn_cb = NULL;
193
194         INIT_LIST_HEAD(&peer_ni->ksnp_conns);
195         INIT_LIST_HEAD(&peer_ni->ksnp_tx_queue);
196         INIT_LIST_HEAD(&peer_ni->ksnp_zc_req_list);
197         spin_lock_init(&peer_ni->ksnp_lock);
198
199         return peer_ni;
200 }
201
202 void
203 ksocknal_destroy_peer(struct ksock_peer_ni *peer_ni)
204 {
205         struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
206
207         CDEBUG (D_NET, "peer_ni %s %p deleted\n",
208                 libcfs_idstr(&peer_ni->ksnp_id), peer_ni);
209
210         LASSERT(refcount_read(&peer_ni->ksnp_refcount) == 0);
211         LASSERT(peer_ni->ksnp_accepting == 0);
212         LASSERT(list_empty(&peer_ni->ksnp_conns));
213         LASSERT(peer_ni->ksnp_conn_cb == NULL);
214         LASSERT(list_empty(&peer_ni->ksnp_tx_queue));
215         LASSERT(list_empty(&peer_ni->ksnp_zc_req_list));
216
217         LIBCFS_FREE(peer_ni, sizeof(*peer_ni));
218
219         /* NB a peer_ni's connections and conn_cb keep a reference on their
220          * peer_ni until they are destroyed, so we can be assured that _all_
221          * state to do with this peer_ni has been cleaned up when its refcount
222          * drops to zero.
223          */
224         if (atomic_dec_and_test(&net->ksnn_npeers))
225                 wake_up_var(&net->ksnn_npeers);
226 }
227
228 struct ksock_peer_ni *
229 ksocknal_find_peer_locked(struct lnet_ni *ni, struct lnet_processid *id)
230 {
231         struct ksock_peer_ni *peer_ni;
232         unsigned long hash = nidhash(&id->nid);
233
234         hash_for_each_possible(ksocknal_data.ksnd_peers, peer_ni,
235                                ksnp_list, hash) {
236                 LASSERT(!peer_ni->ksnp_closing);
237
238                 if (peer_ni->ksnp_ni != ni)
239                         continue;
240
241                 if (!nid_same(&peer_ni->ksnp_id.nid, &id->nid) ||
242                     peer_ni->ksnp_id.pid != id->pid)
243                         continue;
244
245                 CDEBUG(D_NET, "got peer_ni [%p] -> %s (%d)\n",
246                        peer_ni, libcfs_idstr(id),
247                        refcount_read(&peer_ni->ksnp_refcount));
248                 return peer_ni;
249         }
250         return NULL;
251 }
252
253 struct ksock_peer_ni *
254 ksocknal_find_peer(struct lnet_ni *ni, struct lnet_processid *id)
255 {
256         struct ksock_peer_ni *peer_ni;
257
258         read_lock(&ksocknal_data.ksnd_global_lock);
259         peer_ni = ksocknal_find_peer_locked(ni, id);
260         if (peer_ni != NULL)                    /* +1 ref for caller? */
261                 ksocknal_peer_addref(peer_ni);
262         read_unlock(&ksocknal_data.ksnd_global_lock);
263
264         return peer_ni;
265 }
266
267 static void
268 ksocknal_unlink_peer_locked(struct ksock_peer_ni *peer_ni)
269 {
270         LASSERT(list_empty(&peer_ni->ksnp_conns));
271         LASSERT(peer_ni->ksnp_conn_cb == NULL);
272         LASSERT(!peer_ni->ksnp_closing);
273         peer_ni->ksnp_closing = 1;
274         hlist_del(&peer_ni->ksnp_list);
275         /* lose peerlist's ref */
276         ksocknal_peer_decref(peer_ni);
277 }
278
279
280 static void
281 ksocknal_dump_peer_debug_info(struct ksock_peer_ni *peer_ni)
282 {
283         struct ksock_conn *conn;
284         struct list_head *ctmp;
285         struct list_head *txtmp;
286         int ccount = 0;
287         int txcount = 0;
288
289         list_for_each(ctmp, &peer_ni->ksnp_conns) {
290                 conn = list_entry(ctmp, struct ksock_conn, ksnc_list);
291
292                 if (!list_empty(&conn->ksnc_tx_queue))
293                         list_for_each(txtmp, &conn->ksnc_tx_queue) txcount++;
294
295                 CDEBUG(D_CONSOLE, "Conn %d [type, closing, crefcnt, srefcnt]: %d, %d, %d, %d\n",
296                        ccount,
297                        conn->ksnc_type,
298                        conn->ksnc_closing,
299                        refcount_read(&conn->ksnc_conn_refcount),
300                        refcount_read(&conn->ksnc_sock_refcount));
301                 CDEBUG(D_CONSOLE, "Conn %d rx [scheduled, ready, state]: %d, %d, %d\n",
302                        ccount,
303                        conn->ksnc_rx_scheduled,
304                        conn->ksnc_rx_ready,
305                        conn->ksnc_rx_state);
306                 CDEBUG(D_CONSOLE, "Conn %d tx [txqcnt, scheduled, last_post, ready, deadline]: %d, %d, %lld, %d, %lld\n",
307                        ccount,
308                        txcount,
309                        conn->ksnc_tx_scheduled,
310                        conn->ksnc_tx_last_post,
311                        conn->ksnc_rx_ready,
312                        conn->ksnc_rx_deadline);
313
314                 if (conn->ksnc_scheduler)
315                         CDEBUG(D_CONSOLE, "Conn %d sched [nconns, cpt]: %d, %d\n",
316                                ccount,
317                                conn->ksnc_scheduler->kss_nconns,
318                                conn->ksnc_scheduler->kss_cpt);
319
320                 txcount = 0;
321                 ccount++;
322         }
323 }
324
325 static int
326 ksocknal_get_peer_info(struct lnet_ni *ni, int index,
327                        struct lnet_processid *id, __u32 *myip, __u32 *peer_ip,
328                        int *port, int *conn_count, int *share_count)
329 {
330         struct ksock_peer_ni *peer_ni;
331         struct ksock_conn_cb *conn_cb;
332         int i;
333         int rc = -ENOENT;
334         struct ksock_net *net;
335
336         read_lock(&ksocknal_data.ksnd_global_lock);
337
338         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
339
340                 if (peer_ni->ksnp_ni != ni)
341                         continue;
342                 if (index-- > 0)
343                         continue;
344
345                 *id = peer_ni->ksnp_id;
346                 conn_cb = peer_ni->ksnp_conn_cb;
347                 if (conn_cb == NULL) {
348                         *myip = 0;
349                         *peer_ip = 0;
350                         *port = 0;
351                         *conn_count = 0;
352                         *share_count = 0;
353                         rc = 0;
354                 } else {
355                         ksocknal_dump_peer_debug_info(peer_ni);
356
357                         if (conn_cb->ksnr_addr.ss_family == AF_INET) {
358                                 struct sockaddr_in *sa =
359                                         (void *)&conn_cb->ksnr_addr;
360                                 net = ni->ni_data;
361                                 rc = choose_ipv4_src(myip,
362                                                      net->ksnn_interface.ksni_index,
363                                                      ntohl(sa->sin_addr.s_addr),
364                                                      ni->ni_net_ns);
365                                 *peer_ip = ntohl(sa->sin_addr.s_addr);
366                                 *port = ntohs(sa->sin_port);
367
368                         } else {
369                                 *myip = 0xFFFFFFFF;
370                                 *peer_ip = 0xFFFFFFFF;
371                                 *port = 0;
372                                 rc = -ENOTSUPP;
373                         }
374                         *conn_count = conn_cb->ksnr_conn_count;
375                         *share_count = 1;
376                 }
377                 break;
378         }
379         read_unlock(&ksocknal_data.ksnd_global_lock);
380         return rc;
381 }
382
383 static unsigned int
384 ksocknal_get_conns_per_peer(struct ksock_peer_ni *peer_ni)
385 {
386         struct lnet_ni *ni = peer_ni->ksnp_ni;
387         struct lnet_ioctl_config_socklnd_tunables *tunables;
388
389         LASSERT(ni);
390
391         tunables = &ni->ni_lnd_tunables.lnd_tun_u.lnd_sock;
392
393         return tunables->lnd_conns_per_peer;
394 }
395
396 static void
397 ksocknal_incr_conn_count(struct ksock_conn_cb *conn_cb,
398                          int type)
399 {
400         conn_cb->ksnr_conn_count++;
401
402         /* check if all connections of the given type got created */
403         switch (type) {
404         case SOCKLND_CONN_CONTROL:
405                 conn_cb->ksnr_ctrl_conn_count++;
406                 /* there's a single control connection per peer,
407                  * two in case of loopback
408                  */
409                 conn_cb->ksnr_connected |= BIT(type);
410                 break;
411         case SOCKLND_CONN_BULK_IN:
412                 conn_cb->ksnr_blki_conn_count++;
413                 if (conn_cb->ksnr_blki_conn_count >= conn_cb->ksnr_max_conns)
414                         conn_cb->ksnr_connected |= BIT(type);
415                 break;
416         case SOCKLND_CONN_BULK_OUT:
417                 conn_cb->ksnr_blko_conn_count++;
418                 if (conn_cb->ksnr_blko_conn_count >= conn_cb->ksnr_max_conns)
419                         conn_cb->ksnr_connected |= BIT(type);
420                 break;
421         case SOCKLND_CONN_ANY:
422                 if (conn_cb->ksnr_conn_count >= conn_cb->ksnr_max_conns)
423                         conn_cb->ksnr_connected |= BIT(type);
424                 break;
425         default:
426                 LBUG();
427                 break;
428         }
429
430         CDEBUG(D_NET, "Add conn type %d, ksnr_connected %x ksnr_max_conns %d\n",
431                type, conn_cb->ksnr_connected, conn_cb->ksnr_max_conns);
432 }
433
434
435 static void
436 ksocknal_decr_conn_count(struct ksock_conn_cb *conn_cb,
437                          int type)
438 {
439         conn_cb->ksnr_conn_count--;
440
441         /* check if all connections of the given type got created */
442         switch (type) {
443         case SOCKLND_CONN_CONTROL:
444                 conn_cb->ksnr_ctrl_conn_count--;
445                 /* there's a single control connection per peer,
446                  * two in case of loopback
447                  */
448                 if (conn_cb->ksnr_ctrl_conn_count == 0)
449                         conn_cb->ksnr_connected &= ~BIT(type);
450                 break;
451         case SOCKLND_CONN_BULK_IN:
452                 conn_cb->ksnr_blki_conn_count--;
453                 if (conn_cb->ksnr_blki_conn_count == 0)
454                         conn_cb->ksnr_connected &= ~BIT(type);
455                 break;
456         case SOCKLND_CONN_BULK_OUT:
457                 conn_cb->ksnr_blko_conn_count--;
458                 if (conn_cb->ksnr_blko_conn_count == 0)
459                         conn_cb->ksnr_connected &= ~BIT(type);
460                 break;
461         case SOCKLND_CONN_ANY:
462                 if (conn_cb->ksnr_conn_count == 0)
463                         conn_cb->ksnr_connected &= ~BIT(type);
464                 break;
465         default:
466                 LBUG();
467                 break;
468         }
469
470         CDEBUG(D_NET, "Del conn type %d, ksnr_connected %x ksnr_max_conns %d\n",
471                type, conn_cb->ksnr_connected, conn_cb->ksnr_max_conns);
472 }
473
474 static void
475 ksocknal_associate_cb_conn_locked(struct ksock_conn_cb *conn_cb,
476                                   struct ksock_conn *conn)
477 {
478         int type = conn->ksnc_type;
479
480         conn->ksnc_conn_cb = conn_cb;
481         ksocknal_conn_cb_addref(conn_cb);
482         ksocknal_incr_conn_count(conn_cb, type);
483
484         /* Successful connection => further attempts can
485          * proceed immediately
486          */
487         conn_cb->ksnr_retry_interval = 0;
488 }
489
490 static void
491 ksocknal_add_conn_cb_locked(struct ksock_peer_ni *peer_ni,
492                             struct ksock_conn_cb *conn_cb)
493 {
494         struct ksock_conn *conn;
495         struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
496
497         LASSERT(!peer_ni->ksnp_closing);
498         LASSERT(!conn_cb->ksnr_peer);
499         LASSERT(!conn_cb->ksnr_scheduled);
500         LASSERT(!conn_cb->ksnr_connecting);
501         LASSERT(conn_cb->ksnr_connected == 0);
502
503         conn_cb->ksnr_peer = peer_ni;
504         ksocknal_peer_addref(peer_ni);
505
506         /* peer_ni's route list takes over my ref on 'route' */
507         peer_ni->ksnp_conn_cb = conn_cb;
508         net->ksnn_interface.ksni_nroutes++;
509
510         list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
511                 if (!rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
512                                   (struct sockaddr *)&conn_cb->ksnr_addr))
513                         continue;
514                 CDEBUG(D_NET, "call ksocknal_associate_cb_conn_locked\n");
515                 ksocknal_associate_cb_conn_locked(conn_cb, conn);
516                 /* keep going (typed conns) */
517         }
518 }
519
520 static void
521 ksocknal_del_conn_cb_locked(struct ksock_conn_cb *conn_cb)
522 {
523         struct ksock_peer_ni *peer_ni = conn_cb->ksnr_peer;
524         struct ksock_conn *conn;
525         struct ksock_conn *cnxt;
526         struct ksock_net *net;
527
528         LASSERT(!conn_cb->ksnr_deleted);
529
530         /* Close associated conns */
531         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns, ksnc_list) {
532                 if (conn->ksnc_conn_cb != conn_cb)
533                         continue;
534
535                 ksocknal_close_conn_locked(conn, 0);
536         }
537
538         net = (struct ksock_net *)(peer_ni->ksnp_ni->ni_data);
539         net->ksnn_interface.ksni_nroutes--;
540         LASSERT(net->ksnn_interface.ksni_nroutes >= 0);
541
542         conn_cb->ksnr_deleted = 1;
543         ksocknal_conn_cb_decref(conn_cb);               /* drop peer_ni's ref */
544         peer_ni->ksnp_conn_cb = NULL;
545
546         if (list_empty(&peer_ni->ksnp_conns)) {
547                 /* I've just removed the last route to a peer_ni with no active
548                  * connections
549                  */
550                 ksocknal_unlink_peer_locked(peer_ni);
551         }
552 }
553
554 unsigned int
555 ksocknal_get_conn_count_by_type(struct ksock_conn_cb *conn_cb,
556                                 int type)
557 {
558         unsigned int count = 0;
559
560         switch (type) {
561         case SOCKLND_CONN_CONTROL:
562                 count = conn_cb->ksnr_ctrl_conn_count;
563                 break;
564         case SOCKLND_CONN_BULK_IN:
565                 count = conn_cb->ksnr_blki_conn_count;
566                 break;
567         case SOCKLND_CONN_BULK_OUT:
568                 count = conn_cb->ksnr_blko_conn_count;
569                 break;
570         case SOCKLND_CONN_ANY:
571                 count = conn_cb->ksnr_conn_count;
572                 break;
573         default:
574                 LBUG();
575                 break;
576         }
577
578         return count;
579 }
580
581 int
582 ksocknal_add_peer(struct lnet_ni *ni, struct lnet_processid *id,
583                   struct sockaddr *addr)
584 {
585         struct ksock_peer_ni *peer_ni;
586         struct ksock_peer_ni *peer2;
587         struct ksock_conn_cb *conn_cb;
588
589         if (LNET_NID_IS_ANY(&id->nid) ||
590             id->pid == LNET_PID_ANY)
591                 return (-EINVAL);
592
593         /* Have a brand new peer_ni ready... */
594         peer_ni = ksocknal_create_peer(ni, id);
595         if (IS_ERR(peer_ni))
596                 return PTR_ERR(peer_ni);
597
598         conn_cb = ksocknal_create_conn_cb(addr);
599         if (!conn_cb) {
600                 ksocknal_peer_decref(peer_ni);
601                 return -ENOMEM;
602         }
603
604         write_lock_bh(&ksocknal_data.ksnd_global_lock);
605
606         /* always called with a ref on ni, so shutdown can't have started */
607         LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers)
608                 >= 0);
609
610         peer2 = ksocknal_find_peer_locked(ni, id);
611         if (peer2 != NULL) {
612                 ksocknal_peer_decref(peer_ni);
613                 peer_ni = peer2;
614         } else {
615                 /* peer_ni table takes my ref on peer_ni */
616                 hash_add(ksocknal_data.ksnd_peers, &peer_ni->ksnp_list,
617                          nidhash(&id->nid));
618         }
619
620         if (peer_ni->ksnp_conn_cb) {
621                 ksocknal_conn_cb_decref(conn_cb);
622         } else {
623                 /* Remember conns_per_peer setting at the time
624                  * of connection initiation. It will define the
625                  * max number of conns per type for this conn_cb
626                  * while it's in use.
627                  */
628                 conn_cb->ksnr_max_conns = ksocknal_get_conns_per_peer(peer_ni);
629                 ksocknal_add_conn_cb_locked(peer_ni, conn_cb);
630         }
631
632         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
633
634         return 0;
635 }
636
637 static void
638 ksocknal_del_peer_locked(struct ksock_peer_ni *peer_ni)
639 {
640         struct ksock_conn *conn;
641         struct ksock_conn *cnxt;
642         struct ksock_conn_cb *conn_cb;
643
644         LASSERT(!peer_ni->ksnp_closing);
645
646         /* Extra ref prevents peer_ni disappearing until I'm done with it */
647         ksocknal_peer_addref(peer_ni);
648         conn_cb = peer_ni->ksnp_conn_cb;
649         if (conn_cb)
650                 ksocknal_del_conn_cb_locked(conn_cb);
651
652         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns,
653                                  ksnc_list)
654                 ksocknal_close_conn_locked(conn, 0);
655
656         ksocknal_peer_decref(peer_ni);
657         /* NB peer_ni unlinks itself when last conn/conn_cb is removed */
658 }
659
660 static int
661 ksocknal_del_peer(struct lnet_ni *ni, struct lnet_processid *id)
662 {
663         LIST_HEAD(zombies);
664         struct hlist_node *pnxt;
665         struct ksock_peer_ni *peer_ni;
666         int lo;
667         int hi;
668         int i;
669         int rc = -ENOENT;
670
671         write_lock_bh(&ksocknal_data.ksnd_global_lock);
672
673         if (id && !LNET_NID_IS_ANY(&id->nid)) {
674                 lo = hash_min(nidhash(&id->nid),
675                               HASH_BITS(ksocknal_data.ksnd_peers));
676                 hi = lo;
677         } else {
678                 lo = 0;
679                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
680         }
681
682         for (i = lo; i <= hi; i++) {
683                 hlist_for_each_entry_safe(peer_ni, pnxt,
684                                           &ksocknal_data.ksnd_peers[i],
685                                           ksnp_list) {
686                         if (peer_ni->ksnp_ni != ni)
687                                 continue;
688
689                         if (!((!id || LNET_NID_IS_ANY(&id->nid) ||
690                                nid_same(&peer_ni->ksnp_id.nid, &id->nid)) &&
691                               (!id || id->pid == LNET_PID_ANY ||
692                                peer_ni->ksnp_id.pid == id->pid)))
693                                 continue;
694
695                         ksocknal_peer_addref(peer_ni);  /* a ref for me... */
696
697                         ksocknal_del_peer_locked(peer_ni);
698
699                         if (peer_ni->ksnp_closing &&
700                             !list_empty(&peer_ni->ksnp_tx_queue)) {
701                                 LASSERT(list_empty(&peer_ni->ksnp_conns));
702                                 LASSERT(peer_ni->ksnp_conn_cb == NULL);
703
704                                 list_splice_init(&peer_ni->ksnp_tx_queue,
705                                                  &zombies);
706                         }
707
708                         ksocknal_peer_decref(peer_ni);  /* ...till here */
709
710                         rc = 0;                         /* matched! */
711                 }
712         }
713
714         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
715
716         ksocknal_txlist_done(ni, &zombies, -ENETDOWN);
717
718         return rc;
719 }
720
721 static struct ksock_conn *
722 ksocknal_get_conn_by_idx(struct lnet_ni *ni, int index)
723 {
724         struct ksock_peer_ni *peer_ni;
725         struct ksock_conn *conn;
726         int i;
727
728         read_lock(&ksocknal_data.ksnd_global_lock);
729
730         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
731                 LASSERT(!peer_ni->ksnp_closing);
732
733                 if (peer_ni->ksnp_ni != ni)
734                         continue;
735
736                 list_for_each_entry(conn, &peer_ni->ksnp_conns,
737                                     ksnc_list) {
738                         if (index-- > 0)
739                                 continue;
740
741                         ksocknal_conn_addref(conn);
742                         read_unlock(&ksocknal_data.ksnd_global_lock);
743                         return conn;
744                 }
745         }
746
747         read_unlock(&ksocknal_data.ksnd_global_lock);
748         return NULL;
749 }
750
751 static struct ksock_sched *
752 ksocknal_choose_scheduler_locked(unsigned int cpt)
753 {
754         struct ksock_sched *sched = ksocknal_data.ksnd_schedulers[cpt];
755         int i;
756
757         if (sched->kss_nthreads == 0) {
758                 cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
759                         if (sched->kss_nthreads > 0) {
760                                 CDEBUG(D_NET, "scheduler[%d] has no threads. selected scheduler[%d]\n",
761                                        cpt, sched->kss_cpt);
762                                 return sched;
763                         }
764                 }
765                 return NULL;
766         }
767
768         return sched;
769 }
770
771 int
772 ksocknal_accept(struct lnet_ni *ni, struct socket *sock)
773 {
774         struct ksock_connreq *cr;
775         int rc;
776         struct sockaddr_storage peer;
777
778         rc = lnet_sock_getaddr(sock, true, &peer);
779         if (rc != 0) {
780                 CERROR("Can't determine new connection's address\n");
781                 return rc;
782         }
783
784         LIBCFS_ALLOC(cr, sizeof(*cr));
785         if (cr == NULL) {
786                 LCONSOLE_ERROR_MSG(0x12f,
787                                    "Dropping connection request from %pISc: memory exhausted\n",
788                                    &peer);
789                 return -ENOMEM;
790         }
791
792         lnet_ni_addref(ni);
793         cr->ksncr_ni   = ni;
794         cr->ksncr_sock = sock;
795
796         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
797
798         list_add_tail(&cr->ksncr_list, &ksocknal_data.ksnd_connd_connreqs);
799         wake_up(&ksocknal_data.ksnd_connd_waitq);
800
801         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
802         return 0;
803 }
804
805 static const struct ln_key_list ksocknal_tunables_keys = {
806         .lkl_maxattr                    = LNET_NET_SOCKLND_TUNABLES_ATTR_MAX,
807         .lkl_list                       = {
808                 [LNET_NET_SOCKLND_TUNABLES_ATTR_CONNS_PER_PEER]  = {
809                         .lkp_value      = "conns_per_peer",
810                         .lkp_data_type  = NLA_U16
811                 },
812                 [LNET_NET_SOCKLND_TUNABLES_ATTR_LND_TIMEOUT]    = {
813                         .lkp_value      = "timeout",
814                         .lkp_data_type  = NLA_U32
815                 },
816         },
817 };
818
819 static int
820 ksocknal_nl_get(int cmd, struct sk_buff *msg, int type, void *data)
821 {
822         struct lnet_lnd_tunables *tun;
823         struct lnet_ni *ni = data;
824
825         if (!ni || !msg)
826                 return -EINVAL;
827
828          if (cmd != LNET_CMD_NETS || type != LNET_NET_LOCAL_NI_ATTR_LND_TUNABLES)
829                 return -EOPNOTSUPP;
830
831         tun = &ni->ni_lnd_tunables;
832         nla_put_u16(msg, LNET_NET_SOCKLND_TUNABLES_ATTR_CONNS_PER_PEER,
833                     tun->lnd_tun_u.lnd_sock.lnd_conns_per_peer);
834         nla_put_u32(msg, LNET_NET_SOCKLND_TUNABLES_ATTR_LND_TIMEOUT,
835                     ksocknal_timeout());
836
837         return 0;
838 }
839
840 static inline void
841 ksocknal_nl_set_default(int cmd, int type, void *data)
842 {
843         struct lnet_lnd_tunables *tunables = data;
844         struct lnet_ioctl_config_socklnd_tunables *lt;
845         struct lnet_ioctl_config_socklnd_tunables *df;
846
847         lt = &tunables->lnd_tun_u.lnd_sock;
848         df = &ksock_default_tunables;
849         switch (type) {
850         case LNET_NET_SOCKLND_TUNABLES_ATTR_CONNS_PER_PEER:
851                 lt->lnd_conns_per_peer = df->lnd_conns_per_peer;
852                 break;
853         case LNET_NET_SOCKLND_TUNABLES_ATTR_LND_TIMEOUT:
854                 lt->lnd_timeout = df->lnd_timeout;
855                 fallthrough;
856         default:
857                 break;
858         }
859 }
860
861 static int
862 ksocknal_nl_set(int cmd, struct nlattr *attr, int type, void *data)
863 {
864         struct lnet_lnd_tunables *tunables = data;
865         int rc = 0;
866         s64 num;
867
868         if (cmd != LNET_CMD_NETS)
869                 return -EOPNOTSUPP;
870
871         if (!attr) {
872                 ksocknal_nl_set_default(cmd, type, data);
873                 return 0;
874         }
875
876         if (nla_type(attr) != LN_SCALAR_ATTR_INT_VALUE)
877                 return -EINVAL;
878
879         switch (type) {
880         case LNET_NET_SOCKLND_TUNABLES_ATTR_CONNS_PER_PEER:
881                 /* value values are 1 to 127. Zero mean calculate the value */
882                 num = nla_get_s64(attr);
883                 if (num > -1 && num < 128)
884                         tunables->lnd_tun_u.lnd_sock.lnd_conns_per_peer = num;
885                 else
886                         rc = -ERANGE;
887                 break;
888         case LNET_NET_SOCKLND_TUNABLES_ATTR_LND_TIMEOUT:
889                 num = nla_get_s64(attr);
890                 tunables->lnd_tun_u.lnd_sock.lnd_timeout = num;
891                 fallthrough;
892         default:
893                 break;
894         }
895
896         return rc;
897 }
898
899 static int
900 ksocknal_connecting(struct ksock_conn_cb *conn_cb, struct sockaddr *sa)
901 {
902         if (conn_cb &&
903             rpc_cmp_addr((struct sockaddr *)&conn_cb->ksnr_addr, sa))
904                 return conn_cb->ksnr_connecting;
905         return 0;
906 }
907
908 int
909 ksocknal_create_conn(struct lnet_ni *ni, struct ksock_conn_cb *conn_cb,
910                      struct socket *sock, int type)
911 {
912         rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
913         LIST_HEAD(zombies);
914         struct lnet_processid peerid;
915         u64 incarnation;
916         struct ksock_conn *conn;
917         struct ksock_conn *conn2;
918         struct ksock_peer_ni *peer_ni = NULL;
919         struct ksock_peer_ni *peer2;
920         struct ksock_sched *sched;
921         struct ksock_hello_msg *hello;
922         int cpt;
923         struct ksock_tx *tx;
924         struct ksock_tx *txtmp;
925         int rc;
926         int rc2;
927         int active;
928         int num_dup = 0;
929         char *warn = NULL;
930
931         active = (conn_cb != NULL);
932
933         LASSERT(active == (type != SOCKLND_CONN_NONE));
934
935         LIBCFS_ALLOC(conn, sizeof(*conn));
936         if (conn == NULL) {
937                 rc = -ENOMEM;
938                 goto failed_0;
939         }
940
941         conn->ksnc_peer = NULL;
942         conn->ksnc_conn_cb = NULL;
943         conn->ksnc_sock = sock;
944         /* 2 ref, 1 for conn, another extra ref prevents socket
945          * being closed before establishment of connection */
946         refcount_set(&conn->ksnc_sock_refcount, 2);
947         conn->ksnc_type = type;
948         ksocknal_lib_save_callback(sock, conn);
949         refcount_set(&conn->ksnc_conn_refcount, 1); /* 1 ref for me */
950
951         conn->ksnc_rx_ready = 0;
952         conn->ksnc_rx_scheduled = 0;
953
954         INIT_LIST_HEAD(&conn->ksnc_tx_queue);
955         conn->ksnc_tx_ready = 0;
956         conn->ksnc_tx_scheduled = 0;
957         conn->ksnc_tx_carrier = NULL;
958         atomic_set (&conn->ksnc_tx_nob, 0);
959
960         LIBCFS_ALLOC(hello, offsetof(struct ksock_hello_msg,
961                                      kshm_ips[LNET_INTERFACES_NUM]));
962         if (hello == NULL) {
963                 rc = -ENOMEM;
964                 goto failed_1;
965         }
966
967         /* stash conn's local and remote addrs */
968         rc = ksocknal_lib_get_conn_addrs(conn);
969         if (rc != 0)
970                 goto failed_1;
971
972         /* Find out/confirm peer_ni's NID and connection type and get the
973          * vector of interfaces she's willing to let me connect to.
974          * Passive connections use the listener timeout since the peer_ni sends
975          * eagerly
976          */
977         if (active) {
978                 struct sockaddr_in *psa = (void *)&conn->ksnc_peeraddr;
979
980                 peer_ni = conn_cb->ksnr_peer;
981                 LASSERT(ni == peer_ni->ksnp_ni);
982
983                 /* Active connection sends HELLO eagerly */
984                 hello->kshm_nips = 0;
985                 peerid = peer_ni->ksnp_id;
986
987                 write_lock_bh(global_lock);
988                 conn->ksnc_proto = peer_ni->ksnp_proto;
989                 write_unlock_bh(global_lock);
990
991                 if (conn->ksnc_proto == NULL) {
992                         if (psa->sin_family == AF_INET6)
993                                 conn->ksnc_proto = &ksocknal_protocol_v4x;
994                         else if (psa->sin_family == AF_INET)
995                                 conn->ksnc_proto = &ksocknal_protocol_v3x;
996 #if SOCKNAL_VERSION_DEBUG
997                         if (*ksocknal_tunables.ksnd_protocol == 2)
998                                 conn->ksnc_proto = &ksocknal_protocol_v2x;
999                         else if (*ksocknal_tunables.ksnd_protocol == 1)
1000                                 conn->ksnc_proto = &ksocknal_protocol_v1x;
1001 #endif
1002                 }
1003                 if (!conn->ksnc_proto) {
1004                         rc = -EPROTO;
1005                         goto failed_1;
1006                 }
1007
1008                 rc = ksocknal_send_hello(ni, conn, &peerid.nid, hello);
1009                 if (rc != 0)
1010                         goto failed_1;
1011         } else {
1012                 peerid.nid = LNET_ANY_NID;
1013                 peerid.pid = LNET_PID_ANY;
1014
1015                 /* Passive, get protocol from peer_ni */
1016                 conn->ksnc_proto = NULL;
1017         }
1018
1019         rc = ksocknal_recv_hello(ni, conn, hello, &peerid, &incarnation);
1020         if (rc < 0)
1021                 goto failed_1;
1022
1023         LASSERT(rc == 0 || active);
1024         LASSERT(conn->ksnc_proto != NULL);
1025         LASSERT(!LNET_NID_IS_ANY(&peerid.nid));
1026
1027         cpt = lnet_nid2cpt(&peerid.nid, ni);
1028
1029         if (active) {
1030                 ksocknal_peer_addref(peer_ni);
1031                 write_lock_bh(global_lock);
1032         } else {
1033                 peer_ni = ksocknal_create_peer(ni, &peerid);
1034                 if (IS_ERR(peer_ni)) {
1035                         rc = PTR_ERR(peer_ni);
1036                         goto failed_1;
1037                 }
1038
1039                 write_lock_bh(global_lock);
1040
1041                 /* called with a ref on ni, so shutdown can't have started */
1042                 LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers) >= 0);
1043
1044                 peer2 = ksocknal_find_peer_locked(ni, &peerid);
1045                 if (peer2 == NULL) {
1046                         /* NB this puts an "empty" peer_ni in the peer_ni
1047                          * table (which takes my ref) */
1048                         hash_add(ksocknal_data.ksnd_peers,
1049                                  &peer_ni->ksnp_list, nidhash(&peerid.nid));
1050                 } else {
1051                         ksocknal_peer_decref(peer_ni);
1052                         peer_ni = peer2;
1053                 }
1054
1055                 /* +1 ref for me */
1056                 ksocknal_peer_addref(peer_ni);
1057                 peer_ni->ksnp_accepting++;
1058
1059                 /* Am I already connecting to this guy?  Resolve in
1060                  * favour of higher NID...
1061                  */
1062                 if (memcmp(&peerid.nid, &ni->ni_nid, sizeof(peerid.nid)) < 0 &&
1063                     ksocknal_connecting(peer_ni->ksnp_conn_cb,
1064                                         ((struct sockaddr *) &conn->ksnc_peeraddr))) {
1065                         rc = EALREADY;
1066                         warn = "connection race resolution";
1067                         goto failed_2;
1068                 }
1069         }
1070
1071         if (peer_ni->ksnp_closing ||
1072             (active && conn_cb->ksnr_deleted)) {
1073                 /* peer_ni/conn_cb got closed under me */
1074                 rc = -ESTALE;
1075                 warn = "peer_ni/conn_cb removed";
1076                 goto failed_2;
1077         }
1078
1079         if (peer_ni->ksnp_proto == NULL) {
1080                 /* Never connected before.
1081                  * NB recv_hello may have returned EPROTO to signal my peer_ni
1082                  * wants a different protocol than the one I asked for.
1083                  */
1084                 LASSERT(list_empty(&peer_ni->ksnp_conns));
1085
1086                 peer_ni->ksnp_proto = conn->ksnc_proto;
1087                 peer_ni->ksnp_incarnation = incarnation;
1088         }
1089
1090         if (peer_ni->ksnp_proto != conn->ksnc_proto ||
1091             peer_ni->ksnp_incarnation != incarnation) {
1092                 /* peer_ni rebooted or I've got the wrong protocol version */
1093                 ksocknal_close_peer_conns_locked(peer_ni, NULL, 0);
1094
1095                 peer_ni->ksnp_proto = NULL;
1096                 rc = ESTALE;
1097                 warn = peer_ni->ksnp_incarnation != incarnation ?
1098                         "peer_ni rebooted" :
1099                         "wrong proto version";
1100                 goto failed_2;
1101         }
1102
1103         switch (rc) {
1104         default:
1105                 LBUG();
1106         case 0:
1107                 break;
1108         case EALREADY:
1109                 warn = "lost conn race";
1110                 goto failed_2;
1111         case EPROTO:
1112                 warn = "retry with different protocol version";
1113                 goto failed_2;
1114         }
1115
1116         /* Refuse to duplicate an existing connection, unless this is a
1117          * loopback connection */
1118         if (!rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
1119                           (struct sockaddr *)&conn->ksnc_myaddr)) {
1120                 list_for_each_entry(conn2, &peer_ni->ksnp_conns, ksnc_list) {
1121                         if (!rpc_cmp_addr(
1122                                     (struct sockaddr *)&conn2->ksnc_peeraddr,
1123                                     (struct sockaddr *)&conn->ksnc_peeraddr) ||
1124                             !rpc_cmp_addr(
1125                                     (struct sockaddr *)&conn2->ksnc_myaddr,
1126                                     (struct sockaddr *)&conn->ksnc_myaddr) ||
1127                             conn2->ksnc_type != conn->ksnc_type)
1128                                 continue;
1129
1130                         num_dup++;
1131                         /* If max conns per type is not registered in conn_cb
1132                          * as ksnr_max_conns, use ni's conns_per_peer
1133                          */
1134                         if ((peer_ni->ksnp_conn_cb &&
1135                             num_dup < peer_ni->ksnp_conn_cb->ksnr_max_conns) ||
1136                             (!peer_ni->ksnp_conn_cb &&
1137                             num_dup < ksocknal_get_conns_per_peer(peer_ni)))
1138                                 continue;
1139
1140                         /* Reply on a passive connection attempt so the peer_ni
1141                          * realises we're connected.
1142                          */
1143                         LASSERT(rc == 0);
1144                         if (!active)
1145                                 rc = EALREADY;
1146
1147                         warn = "duplicate";
1148                         goto failed_2;
1149                 }
1150         }
1151         /* If the connection created by this route didn't bind to the IP
1152          * address the route connected to, the connection/route matching
1153          * code below probably isn't going to work.
1154          */
1155         if (active &&
1156             !rpc_cmp_addr((struct sockaddr *)&conn_cb->ksnr_addr,
1157                           (struct sockaddr *)&conn->ksnc_peeraddr)) {
1158                 CERROR("Route %s %pISc connected to %pISc\n",
1159                        libcfs_idstr(&peer_ni->ksnp_id),
1160                        &conn_cb->ksnr_addr,
1161                        &conn->ksnc_peeraddr);
1162         }
1163
1164         /* Search for a conn_cb corresponding to the new connection and
1165          * create an association.  This allows incoming connections created
1166          * by conn_cbs in my peer_ni to match my own conn_cb entries so I don't
1167          * continually create duplicate conn_cbs.
1168          */
1169         conn_cb = peer_ni->ksnp_conn_cb;
1170
1171         if (conn_cb && rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
1172                                     (struct sockaddr *)&conn_cb->ksnr_addr))
1173                 ksocknal_associate_cb_conn_locked(conn_cb, conn);
1174
1175         conn->ksnc_peer = peer_ni;                 /* conn takes my ref on peer_ni */
1176         peer_ni->ksnp_last_alive = ktime_get_seconds();
1177         peer_ni->ksnp_send_keepalive = 0;
1178         peer_ni->ksnp_error = 0;
1179
1180         sched = ksocknal_choose_scheduler_locked(cpt);
1181         if (!sched) {
1182                 CERROR("no schedulers available. node is unhealthy\n");
1183                 goto failed_2;
1184         }
1185         /*
1186          * The cpt might have changed if we ended up selecting a non cpt
1187          * native scheduler. So use the scheduler's cpt instead.
1188          */
1189         cpt = sched->kss_cpt;
1190         sched->kss_nconns++;
1191         conn->ksnc_scheduler = sched;
1192
1193         conn->ksnc_tx_last_post = ktime_get_seconds();
1194         /* Set the deadline for the outgoing HELLO to drain */
1195         conn->ksnc_tx_bufnob = sock->sk->sk_wmem_queued;
1196         conn->ksnc_tx_deadline = ktime_get_seconds() +
1197                                  ksocknal_timeout();
1198         smp_mb();   /* order with adding to peer_ni's conn list */
1199
1200         list_add(&conn->ksnc_list, &peer_ni->ksnp_conns);
1201         ksocknal_conn_addref(conn);
1202
1203         ksocknal_new_packet(conn, 0);
1204
1205         conn->ksnc_zc_capable = ksocknal_lib_zc_capable(conn);
1206
1207         /* Take packets blocking for this connection. */
1208         list_for_each_entry_safe(tx, txtmp, &peer_ni->ksnp_tx_queue, tx_list) {
1209                 if (conn->ksnc_proto->pro_match_tx(conn, tx, tx->tx_nonblk) ==
1210                     SOCKNAL_MATCH_NO)
1211                         continue;
1212
1213                 list_del(&tx->tx_list);
1214                 ksocknal_queue_tx_locked(tx, conn);
1215         }
1216
1217         write_unlock_bh(global_lock);
1218         /* We've now got a new connection.  Any errors from here on are just
1219          * like "normal" comms errors and we close the connection normally.
1220          * NB (a) we still have to send the reply HELLO for passive
1221          *        connections,
1222          *    (b) normal I/O on the conn is blocked until I setup and call the
1223          *        socket callbacks.
1224          */
1225
1226         CDEBUG(D_NET, "New conn %s p %d.x %pISc -> %pIScp"
1227                " incarnation:%lld sched[%d]\n",
1228                libcfs_idstr(&peerid), conn->ksnc_proto->pro_version,
1229                &conn->ksnc_myaddr, &conn->ksnc_peeraddr,
1230                incarnation, cpt);
1231
1232         if (!active) {
1233                 hello->kshm_nips = 0;
1234                 rc = ksocknal_send_hello(ni, conn, &peerid.nid, hello);
1235         }
1236
1237         LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
1238                                     kshm_ips[LNET_INTERFACES_NUM]));
1239
1240         /* setup the socket AFTER I've received hello (it disables
1241          * SO_LINGER).  I might call back to the acceptor who may want
1242          * to send a protocol version response and then close the
1243          * socket; this ensures the socket only tears down after the
1244          * response has been sent.
1245          */
1246         if (rc == 0)
1247                 rc = ksocknal_lib_setup_sock(sock);
1248
1249         write_lock_bh(global_lock);
1250
1251         /* NB my callbacks block while I hold ksnd_global_lock */
1252         ksocknal_lib_set_callback(sock, conn);
1253
1254         if (!active)
1255                 peer_ni->ksnp_accepting--;
1256
1257         write_unlock_bh(global_lock);
1258
1259         if (rc != 0) {
1260                 write_lock_bh(global_lock);
1261                 if (!conn->ksnc_closing) {
1262                         /* could be closed by another thread */
1263                         ksocknal_close_conn_locked(conn, rc);
1264                 }
1265                 write_unlock_bh(global_lock);
1266         } else if (ksocknal_connsock_addref(conn) == 0) {
1267                 /* Allow I/O to proceed. */
1268                 ksocknal_read_callback(conn);
1269                 ksocknal_write_callback(conn);
1270                 ksocknal_connsock_decref(conn);
1271         }
1272
1273         ksocknal_connsock_decref(conn);
1274         ksocknal_conn_decref(conn);
1275         return rc;
1276
1277 failed_2:
1278
1279         if (!peer_ni->ksnp_closing &&
1280             list_empty(&peer_ni->ksnp_conns) &&
1281             peer_ni->ksnp_conn_cb == NULL) {
1282                 list_splice_init(&peer_ni->ksnp_tx_queue, &zombies);
1283                 ksocknal_unlink_peer_locked(peer_ni);
1284         }
1285
1286         write_unlock_bh(global_lock);
1287
1288         if (warn != NULL) {
1289                 if (rc < 0)
1290                         CERROR("Not creating conn %s type %d: %s\n",
1291                                libcfs_idstr(&peerid), conn->ksnc_type, warn);
1292                 else
1293                         CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",
1294                                libcfs_idstr(&peerid), conn->ksnc_type, warn);
1295         }
1296
1297         if (!active) {
1298                 if (rc > 0) {
1299                         /* Request retry by replying with CONN_NONE
1300                          * ksnc_proto has been set already
1301                          */
1302                         conn->ksnc_type = SOCKLND_CONN_NONE;
1303                         hello->kshm_nips = 0;
1304                         ksocknal_send_hello(ni, conn, &peerid.nid, hello);
1305                 }
1306
1307                 write_lock_bh(global_lock);
1308                 peer_ni->ksnp_accepting--;
1309                 write_unlock_bh(global_lock);
1310         }
1311
1312         /*
1313          * If we get here without an error code, just use -EALREADY.
1314          * Depending on how we got here, the error may be positive
1315          * or negative. Normalize the value for ksocknal_txlist_done().
1316          */
1317         rc2 = (rc == 0 ? -EALREADY : (rc > 0 ? -rc : rc));
1318         ksocknal_txlist_done(ni, &zombies, rc2);
1319         ksocknal_peer_decref(peer_ni);
1320
1321 failed_1:
1322         if (hello != NULL)
1323                 LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
1324                                             kshm_ips[LNET_INTERFACES_NUM]));
1325
1326         LIBCFS_FREE(conn, sizeof(*conn));
1327
1328 failed_0:
1329         sock_release(sock);
1330
1331         return rc;
1332 }
1333
1334 void
1335 ksocknal_close_conn_locked(struct ksock_conn *conn, int error)
1336 {
1337         /* This just does the immmediate housekeeping, and queues the
1338          * connection for the reaper to terminate.
1339          * Caller holds ksnd_global_lock exclusively in irq context */
1340         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1341         struct ksock_conn_cb *conn_cb;
1342         struct ksock_conn *conn2;
1343         int conn_count;
1344         int duplicate_count = 0;
1345
1346         LASSERT(peer_ni->ksnp_error == 0);
1347         LASSERT(!conn->ksnc_closing);
1348         conn->ksnc_closing = 1;
1349
1350         /* ksnd_deathrow_conns takes over peer_ni's ref */
1351         list_del(&conn->ksnc_list);
1352
1353         conn_cb = conn->ksnc_conn_cb;
1354         if (conn_cb != NULL) {
1355                 /* dissociate conn from cb... */
1356                 LASSERT(!conn_cb->ksnr_deleted);
1357
1358                 conn_count = ksocknal_get_conn_count_by_type(conn_cb,
1359                                                              conn->ksnc_type);
1360                 /* connected bit is set only if all connections
1361                  * of the given type got created
1362                  */
1363                 if (conn_count == conn_cb->ksnr_max_conns)
1364                         LASSERT((conn_cb->ksnr_connected &
1365                                 BIT(conn->ksnc_type)) != 0);
1366
1367                 if (conn_count == 1) {
1368                         list_for_each_entry(conn2, &peer_ni->ksnp_conns,
1369                                             ksnc_list) {
1370                                 if (conn2->ksnc_conn_cb == conn_cb &&
1371                                     conn2->ksnc_type == conn->ksnc_type)
1372                                         duplicate_count += 1;
1373                         }
1374                         if (duplicate_count > 0)
1375                                 CERROR("Found %d duplicate conns type %d\n",
1376                                        duplicate_count,
1377                                        conn->ksnc_type);
1378                 }
1379                 ksocknal_decr_conn_count(conn_cb, conn->ksnc_type);
1380
1381                 conn->ksnc_conn_cb = NULL;
1382
1383                 /* drop conn's ref on conn_cb */
1384                 ksocknal_conn_cb_decref(conn_cb);
1385         }
1386
1387         if (list_empty(&peer_ni->ksnp_conns)) {
1388                 /* No more connections to this peer_ni */
1389
1390                 if (!list_empty(&peer_ni->ksnp_tx_queue)) {
1391                         struct ksock_tx *tx;
1392
1393                         LASSERT(conn->ksnc_proto == &ksocknal_protocol_v3x);
1394
1395                         /* throw them to the last connection...,
1396                          * these TXs will be send to /dev/null by scheduler */
1397                         list_for_each_entry(tx, &peer_ni->ksnp_tx_queue,
1398                                             tx_list)
1399                                 ksocknal_tx_prep(conn, tx);
1400
1401                         spin_lock_bh(&conn->ksnc_scheduler->kss_lock);
1402                         list_splice_init(&peer_ni->ksnp_tx_queue,
1403                                          &conn->ksnc_tx_queue);
1404                         spin_unlock_bh(&conn->ksnc_scheduler->kss_lock);
1405                 }
1406
1407                 /* renegotiate protocol version */
1408                 peer_ni->ksnp_proto = NULL;
1409                 /* stash last conn close reason */
1410                 peer_ni->ksnp_error = error;
1411
1412                 if (peer_ni->ksnp_conn_cb == NULL) {
1413                         /* I've just closed last conn belonging to a
1414                          * peer_ni with no connections to it
1415                          */
1416                         ksocknal_unlink_peer_locked(peer_ni);
1417                 }
1418         }
1419
1420         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1421
1422         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
1423         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1424
1425         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1426 }
1427
1428 void
1429 ksocknal_peer_failed(struct ksock_peer_ni *peer_ni)
1430 {
1431         bool notify = false;
1432         time64_t last_alive = 0;
1433
1434         /* There has been a connection failure or comms error; but I'll only
1435          * tell LNET I think the peer_ni is dead if it's to another kernel and
1436          * there are no connections or connection attempts in existence. */
1437
1438         read_lock(&ksocknal_data.ksnd_global_lock);
1439
1440         if ((peer_ni->ksnp_id.pid & LNET_PID_USERFLAG) == 0 &&
1441              list_empty(&peer_ni->ksnp_conns) &&
1442              peer_ni->ksnp_accepting == 0 &&
1443              !ksocknal_find_connecting_conn_cb_locked(peer_ni)) {
1444                 notify = true;
1445                 last_alive = peer_ni->ksnp_last_alive;
1446         }
1447
1448         read_unlock(&ksocknal_data.ksnd_global_lock);
1449
1450         if (notify)
1451                 lnet_notify(peer_ni->ksnp_ni,
1452                             &peer_ni->ksnp_id.nid,
1453                             false, false, last_alive);
1454 }
1455
1456 void
1457 ksocknal_finalize_zcreq(struct ksock_conn *conn)
1458 {
1459         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1460         struct ksock_tx *tx;
1461         struct ksock_tx *tmp;
1462         LIST_HEAD(zlist);
1463
1464         /* NB safe to finalize TXs because closing of socket will
1465          * abort all buffered data */
1466         LASSERT(conn->ksnc_sock == NULL);
1467
1468         spin_lock(&peer_ni->ksnp_lock);
1469
1470         list_for_each_entry_safe(tx, tmp, &peer_ni->ksnp_zc_req_list,
1471                                  tx_zc_list) {
1472                 if (tx->tx_conn != conn)
1473                         continue;
1474
1475                 LASSERT(tx->tx_msg.ksm_zc_cookies[0] != 0);
1476
1477                 tx->tx_msg.ksm_zc_cookies[0] = 0;
1478                 tx->tx_zc_aborted = 1;  /* mark it as not-acked */
1479                 list_move(&tx->tx_zc_list, &zlist);
1480         }
1481
1482         spin_unlock(&peer_ni->ksnp_lock);
1483
1484         while ((tx = list_first_entry_or_null(&zlist, struct ksock_tx,
1485                                               tx_zc_list)) != NULL) {
1486                 list_del(&tx->tx_zc_list);
1487                 ksocknal_tx_decref(tx);
1488         }
1489 }
1490
1491 void
1492 ksocknal_terminate_conn(struct ksock_conn *conn)
1493 {
1494         /* This gets called by the reaper (guaranteed thread context) to
1495          * disengage the socket from its callbacks and close it.
1496          * ksnc_refcount will eventually hit zero, and then the reaper will
1497          * destroy it.
1498          */
1499         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1500         struct ksock_sched *sched = conn->ksnc_scheduler;
1501         bool failed = false;
1502
1503         LASSERT(conn->ksnc_closing);
1504
1505         /* wake up the scheduler to "send" all remaining packets to /dev/null */
1506         spin_lock_bh(&sched->kss_lock);
1507
1508         /* a closing conn is always ready to tx */
1509         conn->ksnc_tx_ready = 1;
1510
1511         if (!conn->ksnc_tx_scheduled &&
1512             !list_empty(&conn->ksnc_tx_queue)) {
1513                 list_add_tail(&conn->ksnc_tx_list,
1514                               &sched->kss_tx_conns);
1515                 conn->ksnc_tx_scheduled = 1;
1516                 /* extra ref for scheduler */
1517                 ksocknal_conn_addref(conn);
1518
1519                 wake_up(&sched->kss_waitq);
1520         }
1521
1522         spin_unlock_bh(&sched->kss_lock);
1523
1524         /* serialise with callbacks */
1525         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1526
1527         ksocknal_lib_reset_callback(conn->ksnc_sock, conn);
1528
1529         /* OK, so this conn may not be completely disengaged from its
1530          * scheduler yet, but it _has_ committed to terminate...
1531          */
1532         conn->ksnc_scheduler->kss_nconns--;
1533
1534         if (peer_ni->ksnp_error != 0) {
1535                 /* peer_ni's last conn closed in error */
1536                 LASSERT(list_empty(&peer_ni->ksnp_conns));
1537                 failed = true;
1538                 peer_ni->ksnp_error = 0;     /* avoid multiple notifications */
1539         }
1540
1541         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1542
1543         if (failed)
1544                 ksocknal_peer_failed(peer_ni);
1545
1546         /* The socket is closed on the final put; either here, or in
1547          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1548          * when the connection was established, this will close the socket
1549          * immediately, aborting anything buffered in it. Any hung
1550          * zero-copy transmits will therefore complete in finite time.
1551          */
1552         ksocknal_connsock_decref(conn);
1553 }
1554
1555 void
1556 ksocknal_queue_zombie_conn(struct ksock_conn *conn)
1557 {
1558         /* Queue the conn for the reaper to destroy */
1559         LASSERT(refcount_read(&conn->ksnc_conn_refcount) == 0);
1560         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1561
1562         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1563         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1564
1565         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1566 }
1567
1568 void
1569 ksocknal_destroy_conn(struct ksock_conn *conn)
1570 {
1571         time64_t last_rcv;
1572
1573         /* Final coup-de-grace of the reaper */
1574         CDEBUG(D_NET, "connection %p\n", conn);
1575
1576         LASSERT(refcount_read(&conn->ksnc_conn_refcount) == 0);
1577         LASSERT(refcount_read(&conn->ksnc_sock_refcount) == 0);
1578         LASSERT(conn->ksnc_sock == NULL);
1579         LASSERT(conn->ksnc_conn_cb == NULL);
1580         LASSERT(!conn->ksnc_tx_scheduled);
1581         LASSERT(!conn->ksnc_rx_scheduled);
1582         LASSERT(list_empty(&conn->ksnc_tx_queue));
1583
1584         /* complete current receive if any */
1585         switch (conn->ksnc_rx_state) {
1586         case SOCKNAL_RX_LNET_PAYLOAD:
1587                 last_rcv = conn->ksnc_rx_deadline -
1588                            ksocknal_timeout();
1589                 CERROR("Completing partial receive from %s[%d], ip %pIScp, with error, wanted: %d, left: %d, last alive is %lld secs ago\n",
1590                        libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1591                        conn->ksnc_type,
1592                        &conn->ksnc_peeraddr,
1593                        conn->ksnc_rx_nob_wanted, conn->ksnc_rx_nob_left,
1594                        ktime_get_seconds() - last_rcv);
1595                 if (conn->ksnc_lnet_msg)
1596                         conn->ksnc_lnet_msg->msg_health_status =
1597                                 LNET_MSG_STATUS_REMOTE_ERROR;
1598                 lnet_finalize(conn->ksnc_lnet_msg, -EIO);
1599                 break;
1600         case SOCKNAL_RX_LNET_HEADER:
1601                 if (conn->ksnc_rx_started)
1602                         CERROR("Incomplete receive of lnet header from %s, ip %pIScp, with error, protocol: %d.x.\n",
1603                                libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1604                                &conn->ksnc_peeraddr,
1605                                conn->ksnc_proto->pro_version);
1606                 break;
1607         case SOCKNAL_RX_KSM_HEADER:
1608                 if (conn->ksnc_rx_started)
1609                         CERROR("Incomplete receive of ksock message from %s, ip %pIScp, with error, protocol: %d.x.\n",
1610                                libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1611                                &conn->ksnc_peeraddr,
1612                                conn->ksnc_proto->pro_version);
1613                 break;
1614         case SOCKNAL_RX_SLOP:
1615                 if (conn->ksnc_rx_started)
1616                         CERROR("Incomplete receive of slops from %s, ip %pIScp, with error\n",
1617                                libcfs_idstr(&conn->ksnc_peer->ksnp_id),
1618                                &conn->ksnc_peeraddr);
1619                 break;
1620         default:
1621                 LBUG();
1622                 break;
1623         }
1624
1625         ksocknal_peer_decref(conn->ksnc_peer);
1626
1627         LIBCFS_FREE(conn, sizeof(*conn));
1628 }
1629
1630 int
1631 ksocknal_close_peer_conns_locked(struct ksock_peer_ni *peer_ni,
1632                                  struct sockaddr *addr, int why)
1633 {
1634         struct ksock_conn *conn;
1635         struct ksock_conn *cnxt;
1636         int count = 0;
1637
1638         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns, ksnc_list) {
1639                 if (!addr ||
1640                     rpc_cmp_addr(addr,
1641                                  (struct sockaddr *)&conn->ksnc_peeraddr)) {
1642                         count++;
1643                         ksocknal_close_conn_locked(conn, why);
1644                 }
1645         }
1646
1647         return count;
1648 }
1649
1650 int
1651 ksocknal_close_conn_and_siblings(struct ksock_conn *conn, int why)
1652 {
1653         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1654         int count;
1655
1656         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1657
1658         count = ksocknal_close_peer_conns_locked(
1659                 peer_ni, (struct sockaddr *)&conn->ksnc_peeraddr, why);
1660
1661         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1662
1663         return count;
1664 }
1665
1666 int
1667 ksocknal_close_matching_conns(struct lnet_processid *id, __u32 ipaddr)
1668 {
1669         struct ksock_peer_ni *peer_ni;
1670         struct hlist_node *pnxt;
1671         int lo;
1672         int hi;
1673         int i;
1674         int count = 0;
1675         struct sockaddr_in sa = {.sin_family = AF_INET};
1676
1677         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1678
1679         if (!LNET_NID_IS_ANY(&id->nid)) {
1680                 lo = hash_min(nidhash(&id->nid),
1681                               HASH_BITS(ksocknal_data.ksnd_peers));
1682                 hi = lo;
1683         } else {
1684                 lo = 0;
1685                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
1686         }
1687
1688         sa.sin_addr.s_addr = htonl(ipaddr);
1689         for (i = lo; i <= hi; i++) {
1690                 hlist_for_each_entry_safe(peer_ni, pnxt,
1691                                           &ksocknal_data.ksnd_peers[i],
1692                                           ksnp_list) {
1693
1694                         if (!((LNET_NID_IS_ANY(&id->nid) ||
1695                                nid_same(&id->nid, &peer_ni->ksnp_id.nid)) &&
1696                               (id->pid == LNET_PID_ANY ||
1697                                id->pid == peer_ni->ksnp_id.pid)))
1698                                 continue;
1699
1700                         count += ksocknal_close_peer_conns_locked(
1701                                 peer_ni,
1702                                 ipaddr ? (struct sockaddr *)&sa : NULL, 0);
1703                 }
1704         }
1705
1706         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1707
1708         /* wildcards always succeed */
1709         if (LNET_NID_IS_ANY(&id->nid) || id->pid == LNET_PID_ANY ||
1710             ipaddr == 0)
1711                 return 0;
1712
1713         return (count == 0 ? -ENOENT : 0);
1714 }
1715
1716 static void
1717 ksocknal_notify_gw_down(struct lnet_nid *gw_nid)
1718 {
1719         /* The router is telling me she's been notified of a change in
1720          * gateway state....
1721          */
1722         struct lnet_processid id = {
1723                 .pid    = LNET_PID_ANY,
1724                 .nid    = *gw_nid,
1725         };
1726
1727         CDEBUG(D_NET, "gw %s down\n", libcfs_nidstr(gw_nid));
1728
1729         /* If the gateway crashed, close all open connections... */
1730         ksocknal_close_matching_conns(&id, 0);
1731         return;
1732
1733         /* We can only establish new connections
1734          * if we have autroutes, and these connect on demand.
1735          */
1736 }
1737
1738 static void
1739 ksocknal_push_peer(struct ksock_peer_ni *peer_ni)
1740 {
1741         int index;
1742         int i;
1743         struct ksock_conn *conn;
1744
1745         for (index = 0; ; index++) {
1746                 read_lock(&ksocknal_data.ksnd_global_lock);
1747
1748                 i = 0;
1749                 conn = NULL;
1750
1751                 list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
1752                         if (i++ == index) {
1753                                 ksocknal_conn_addref(conn);
1754                                 break;
1755                         }
1756                 }
1757
1758                 read_unlock(&ksocknal_data.ksnd_global_lock);
1759
1760                 if (i <= index)
1761                         break;
1762
1763                 ksocknal_lib_push_conn (conn);
1764                 ksocknal_conn_decref(conn);
1765         }
1766 }
1767
1768 static int
1769 ksocknal_push(struct lnet_ni *ni, struct lnet_processid *id)
1770 {
1771         int lo;
1772         int hi;
1773         int bkt;
1774         int rc = -ENOENT;
1775
1776         if (!LNET_NID_IS_ANY(&id->nid)) {
1777                 lo = hash_min(nidhash(&id->nid),
1778                               HASH_BITS(ksocknal_data.ksnd_peers));
1779                 hi = lo;
1780         } else {
1781                 lo = 0;
1782                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
1783         }
1784
1785         for (bkt = lo; bkt <= hi; bkt++) {
1786                 int peer_off; /* searching offset in peer_ni hash table */
1787
1788                 for (peer_off = 0; ; peer_off++) {
1789                         struct ksock_peer_ni *peer_ni;
1790                         int           i = 0;
1791
1792                         read_lock(&ksocknal_data.ksnd_global_lock);
1793                         hlist_for_each_entry(peer_ni,
1794                                              &ksocknal_data.ksnd_peers[bkt],
1795                                              ksnp_list) {
1796                                 if (!((LNET_NID_IS_ANY(&id->nid) ||
1797                                        nid_same(&id->nid,
1798                                                  &peer_ni->ksnp_id.nid)) &&
1799                                       (id->pid == LNET_PID_ANY ||
1800                                        id->pid == peer_ni->ksnp_id.pid)))
1801                                         continue;
1802
1803                                 if (i++ == peer_off) {
1804                                         ksocknal_peer_addref(peer_ni);
1805                                         break;
1806                                 }
1807                         }
1808                         read_unlock(&ksocknal_data.ksnd_global_lock);
1809
1810                         if (i <= peer_off) /* no match */
1811                                 break;
1812
1813                         rc = 0;
1814                         ksocknal_push_peer(peer_ni);
1815                         ksocknal_peer_decref(peer_ni);
1816                 }
1817         }
1818         return rc;
1819 }
1820
1821 int
1822 ksocknal_ctl(struct lnet_ni *ni, unsigned int cmd, void *arg)
1823 {
1824         struct lnet_processid id = {};
1825         struct libcfs_ioctl_data *data = arg;
1826         int rc;
1827
1828         switch(cmd) {
1829         case IOC_LIBCFS_GET_INTERFACE: {
1830                 struct ksock_net *net = ni->ni_data;
1831                 struct ksock_interface *iface;
1832                 struct sockaddr_in *sa;
1833
1834                 read_lock(&ksocknal_data.ksnd_global_lock);
1835
1836                 if (data->ioc_count >= 1) {
1837                         rc = -ENOENT;
1838                 } else {
1839                         rc = 0;
1840                         iface = &net->ksnn_interface;
1841
1842                         sa = (void *)&iface->ksni_addr;
1843                         if (sa->sin_family == AF_INET) {
1844                                 data->ioc_u32[0] = ntohl(sa->sin_addr.s_addr);
1845                                 data->ioc_u32[1] = iface->ksni_netmask;
1846                         } else {
1847                                 data->ioc_u32[0] = 0xFFFFFFFF;
1848                                 data->ioc_u32[1] = 0;
1849                         }
1850                         data->ioc_u32[2] = iface->ksni_npeers;
1851                         data->ioc_u32[3] = iface->ksni_nroutes;
1852                 }
1853
1854                 read_unlock(&ksocknal_data.ksnd_global_lock);
1855                 return rc;
1856         }
1857
1858         case IOC_LIBCFS_GET_PEER: {
1859                 __u32            myip = 0;
1860                 __u32            ip = 0;
1861                 int              port = 0;
1862                 int              conn_count = 0;
1863                 int              share_count = 0;
1864
1865                 rc = ksocknal_get_peer_info(ni, data->ioc_count,
1866                                             &id, &myip, &ip, &port,
1867                                             &conn_count,  &share_count);
1868                 if (rc != 0)
1869                         return rc;
1870
1871                 if (!nid_is_nid4(&id.nid))
1872                         return -EINVAL;
1873                 data->ioc_nid    = lnet_nid_to_nid4(&id.nid);
1874                 data->ioc_count  = share_count;
1875                 data->ioc_u32[0] = ip;
1876                 data->ioc_u32[1] = port;
1877                 data->ioc_u32[2] = myip;
1878                 data->ioc_u32[3] = conn_count;
1879                 data->ioc_u32[4] = id.pid;
1880                 return 0;
1881         }
1882
1883         case IOC_LIBCFS_ADD_PEER: {
1884                 struct sockaddr_in sa = {.sin_family = AF_INET};
1885
1886                 id.pid = LNET_PID_LUSTRE;
1887                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1888                 sa.sin_addr.s_addr = htonl(data->ioc_u32[0]);
1889                 sa.sin_port = htons(data->ioc_u32[1]);
1890                 return ksocknal_add_peer(ni, &id, (struct sockaddr *)&sa);
1891         }
1892         case IOC_LIBCFS_DEL_PEER:
1893                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1894                 id.pid = LNET_PID_ANY;
1895                 return ksocknal_del_peer(ni, &id);
1896
1897         case IOC_LIBCFS_GET_CONN: {
1898                 int           txmem;
1899                 int           rxmem;
1900                 int           nagle;
1901                 struct ksock_conn *conn = ksocknal_get_conn_by_idx(ni, data->ioc_count);
1902                 struct sockaddr_in *psa = (void *)&conn->ksnc_peeraddr;
1903                 struct sockaddr_in *mysa = (void *)&conn->ksnc_myaddr;
1904
1905                 if (conn == NULL)
1906                         return -ENOENT;
1907
1908                 ksocknal_lib_get_conn_tunables(conn, &txmem, &rxmem, &nagle);
1909
1910                 data->ioc_count = txmem;
1911                 data->ioc_nid = lnet_nid_to_nid4(&conn->ksnc_peer->ksnp_id.nid);
1912                 data->ioc_flags = nagle;
1913                 if (psa->sin_family == AF_INET)
1914                         data->ioc_u32[0] = ntohl(psa->sin_addr.s_addr);
1915                 else
1916                         data->ioc_u32[0] = 0xFFFFFFFF;
1917                 data->ioc_u32[1] = rpc_get_port((struct sockaddr *)
1918                                                 &conn->ksnc_peeraddr);
1919                 if (mysa->sin_family == AF_INET)
1920                         data->ioc_u32[2] = ntohl(mysa->sin_addr.s_addr);
1921                 else
1922                         data->ioc_u32[2] = 0xFFFFFFFF;
1923                 data->ioc_u32[3] = conn->ksnc_type;
1924                 data->ioc_u32[4] = conn->ksnc_scheduler->kss_cpt;
1925                 data->ioc_u32[5] = rxmem;
1926                 data->ioc_u32[6] = conn->ksnc_peer->ksnp_id.pid;
1927                 ksocknal_conn_decref(conn);
1928                 return 0;
1929         }
1930
1931         case IOC_LIBCFS_CLOSE_CONNECTION:
1932                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1933                 id.pid = LNET_PID_ANY;
1934                 return ksocknal_close_matching_conns(&id,
1935                                                      data->ioc_u32[0]);
1936
1937         case IOC_LIBCFS_REGISTER_MYNID:
1938                 /* Ignore if this is a noop */
1939                 if (nid_is_nid4(&ni->ni_nid) &&
1940                     data->ioc_nid == lnet_nid_to_nid4(&ni->ni_nid))
1941                         return 0;
1942
1943                 CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1944                        libcfs_nid2str(data->ioc_nid),
1945                        libcfs_nidstr(&ni->ni_nid));
1946                 return -EINVAL;
1947
1948         case IOC_LIBCFS_PUSH_CONNECTION:
1949                 lnet_nid4_to_nid(data->ioc_nid, &id.nid);
1950                 id.pid = LNET_PID_ANY;
1951                 return ksocknal_push(ni, &id);
1952
1953         default:
1954                 return -EINVAL;
1955         }
1956         /* not reached */
1957 }
1958
1959 static void
1960 ksocknal_free_buffers (void)
1961 {
1962         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_txs) == 0);
1963
1964         if (ksocknal_data.ksnd_schedulers != NULL)
1965                 cfs_percpt_free(ksocknal_data.ksnd_schedulers);
1966
1967         spin_lock(&ksocknal_data.ksnd_tx_lock);
1968
1969         if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
1970                 LIST_HEAD(zlist);
1971                 struct ksock_tx *tx;
1972
1973                 list_splice_init(&ksocknal_data.ksnd_idle_noop_txs, &zlist);
1974                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
1975
1976                 while ((tx = list_first_entry_or_null(&zlist, struct ksock_tx,
1977                                                       tx_list)) != NULL) {
1978                         list_del(&tx->tx_list);
1979                         LIBCFS_FREE(tx, tx->tx_desc_size);
1980                 }
1981         } else {
1982                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
1983         }
1984 }
1985
1986 static int
1987 ksocknal_handle_link_state_change(struct net_device *dev,
1988                                   unsigned char operstate)
1989 {
1990         struct lnet_ni *ni = NULL;
1991         struct ksock_net *net;
1992         struct ksock_net *cnxt;
1993         int ifindex;
1994         unsigned char link_down;
1995         struct in_device *in_dev;
1996         bool found_ip = false;
1997         struct ksock_interface *ksi = NULL;
1998         struct sockaddr_in *sa;
1999         __u32 ni_state_before;
2000         bool update_ping_buf = false;
2001         int state;
2002         DECLARE_CONST_IN_IFADDR(ifa);
2003
2004         link_down = !((operstate == IF_OPER_UP) || (operstate == IF_OPER_UNKNOWN));
2005         ifindex = dev->ifindex;
2006
2007         if (!ksocknal_data.ksnd_nnets)
2008                 goto out;
2009
2010         list_for_each_entry_safe(net, cnxt, &ksocknal_data.ksnd_nets,
2011                                  ksnn_list) {
2012
2013                 ksi = &net->ksnn_interface;
2014                 sa = (void *)&ksi->ksni_addr;
2015                 found_ip = false;
2016
2017                 if (strcmp(ksi->ksni_name, dev->name))
2018                         continue;
2019
2020                 if (ksi->ksni_index == -1) {
2021                         if (dev->reg_state != NETREG_REGISTERED)
2022                                 continue;
2023                         /* A registration just happened: save the new index for
2024                          * the device */
2025                         ksi->ksni_index = ifindex;
2026                         goto out;
2027                 }
2028
2029                 if (ksi->ksni_index != ifindex)
2030                         continue;
2031
2032                 if (dev->reg_state == NETREG_UNREGISTERING) {
2033                         /* Device is being unregistered, we need to clear the
2034                          * index, it can change when device will be back */
2035                         ksi->ksni_index = -1;
2036                         goto out;
2037                 }
2038
2039                 ni = net->ksnn_ni;
2040
2041                 in_dev = __in_dev_get_rtnl(dev);
2042                 if (!in_dev) {
2043                         CDEBUG(D_NET, "Interface %s has no IPv4 status.\n",
2044                                dev->name);
2045                         ni_state_before = lnet_set_link_fatal_state(ni, 1);
2046                         goto ni_done;
2047                 }
2048                 in_dev_for_each_ifa_rtnl(ifa, in_dev) {
2049                         if (sa->sin_addr.s_addr == ifa->ifa_local)
2050                                 found_ip = true;
2051                 }
2052                 endfor_ifa(in_dev);
2053
2054                 if (!found_ip) {
2055                         CDEBUG(D_NET, "Interface %s has no matching ip\n",
2056                                dev->name);
2057                         ni_state_before = lnet_set_link_fatal_state(ni, 1);
2058                         goto ni_done;
2059                 }
2060
2061                 if (link_down) {
2062                         ni_state_before = lnet_set_link_fatal_state(ni, 1);
2063                 } else {
2064                         state = (lnet_get_link_status(dev) == 0);
2065                         ni_state_before = lnet_set_link_fatal_state(ni,
2066                                                                     state);
2067                 }
2068 ni_done:
2069                 if (!update_ping_buf &&
2070                     (ni->ni_state == LNET_NI_STATE_ACTIVE) &&
2071                     (atomic_read(&ni->ni_fatal_error_on) != ni_state_before))
2072                         update_ping_buf = true;
2073         }
2074
2075         if (update_ping_buf)
2076                 lnet_mark_ping_buffer_for_update();
2077 out:
2078         return 0;
2079 }
2080
2081
2082 static int
2083 ksocknal_handle_inetaddr_change(struct in_ifaddr *ifa, unsigned long event)
2084 {
2085         struct lnet_ni *ni = NULL;
2086         struct ksock_net *net;
2087         struct ksock_net *cnxt;
2088         struct net_device *event_netdev = ifa->ifa_dev->dev;
2089         int ifindex;
2090         struct ksock_interface *ksi = NULL;
2091         struct sockaddr_in *sa;
2092         __u32 ni_state_before;
2093         bool update_ping_buf = false;
2094         bool link_down;
2095
2096         if (!ksocknal_data.ksnd_nnets)
2097                 goto out;
2098
2099         ifindex = event_netdev->ifindex;
2100
2101         list_for_each_entry_safe(net, cnxt, &ksocknal_data.ksnd_nets,
2102                                  ksnn_list) {
2103
2104                 ksi = &net->ksnn_interface;
2105                 sa = (void *)&ksi->ksni_addr;
2106
2107                 if (ksi->ksni_index != ifindex ||
2108                     strcmp(ksi->ksni_name, event_netdev->name))
2109                         continue;
2110
2111                 if (sa->sin_addr.s_addr == ifa->ifa_local) {
2112                         ni = net->ksnn_ni;
2113                         link_down = (event == NETDEV_DOWN);
2114                         ni_state_before = lnet_set_link_fatal_state(ni,
2115                                                                     link_down);
2116
2117                         if (!update_ping_buf &&
2118                             (ni->ni_state == LNET_NI_STATE_ACTIVE) &&
2119                             ((event == NETDEV_DOWN) != ni_state_before))
2120                                 update_ping_buf = true;
2121                 }
2122         }
2123
2124         if (update_ping_buf)
2125                 lnet_mark_ping_buffer_for_update();
2126 out:
2127         return 0;
2128 }
2129
2130 /************************************
2131  * Net device notifier event handler
2132  ************************************/
2133 static int ksocknal_device_event(struct notifier_block *unused,
2134                                  unsigned long event, void *ptr)
2135 {
2136         struct net_device *dev = netdev_notifier_info_to_dev(ptr);
2137         unsigned char operstate;
2138
2139         operstate = dev->operstate;
2140
2141         CDEBUG(D_NET, "devevent: status=%ld, iface=%s ifindex %d state %u\n",
2142                event, dev->name, dev->ifindex, operstate);
2143
2144         switch (event) {
2145         case NETDEV_UP:
2146         case NETDEV_DOWN:
2147         case NETDEV_CHANGE:
2148         case NETDEV_REGISTER:
2149         case NETDEV_UNREGISTER:
2150                 ksocknal_handle_link_state_change(dev, operstate);
2151                 break;
2152         }
2153
2154         return NOTIFY_OK;
2155 }
2156
2157 /************************************
2158  * Inetaddr notifier event handler
2159  ************************************/
2160 static int ksocknal_inetaddr_event(struct notifier_block *unused,
2161                                    unsigned long event, void *ptr)
2162 {
2163         struct in_ifaddr *ifa = ptr;
2164
2165         CDEBUG(D_NET, "addrevent: status %ld ip addr %pI4, netmask %pI4.\n",
2166                event, &ifa->ifa_address, &ifa->ifa_mask);
2167
2168         switch (event) {
2169         case NETDEV_UP:
2170         case NETDEV_DOWN:
2171         case NETDEV_CHANGE:
2172                 ksocknal_handle_inetaddr_change(ifa, event);
2173                 break;
2174
2175         }
2176         return NOTIFY_OK;
2177 }
2178
2179 static struct notifier_block ksocknal_dev_notifier_block = {
2180         .notifier_call = ksocknal_device_event,
2181 };
2182
2183 static struct notifier_block ksocknal_inetaddr_notifier_block = {
2184         .notifier_call = ksocknal_inetaddr_event,
2185 };
2186
2187 static void
2188 ksocknal_base_shutdown(void)
2189 {
2190         struct ksock_sched *sched;
2191         struct ksock_peer_ni *peer_ni;
2192         int i;
2193
2194         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %lld\n",
2195                libcfs_kmem_read());
2196         LASSERT (ksocknal_data.ksnd_nnets == 0);
2197
2198         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL) {
2199                 unregister_netdevice_notifier(&ksocknal_dev_notifier_block);
2200                 unregister_inetaddr_notifier(&ksocknal_inetaddr_notifier_block);
2201         }
2202
2203         switch (ksocknal_data.ksnd_init) {
2204         default:
2205                 LASSERT(0);
2206                 fallthrough;
2207
2208         case SOCKNAL_INIT_ALL:
2209         case SOCKNAL_INIT_DATA:
2210                 hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list)
2211                         LASSERT(0);
2212
2213                 LASSERT(list_empty(&ksocknal_data.ksnd_nets));
2214                 LASSERT(list_empty(&ksocknal_data.ksnd_enomem_conns));
2215                 LASSERT(list_empty(&ksocknal_data.ksnd_zombie_conns));
2216                 LASSERT(list_empty(&ksocknal_data.ksnd_connd_connreqs));
2217                 LASSERT(list_empty(&ksocknal_data.ksnd_connd_routes));
2218
2219                 if (ksocknal_data.ksnd_schedulers != NULL) {
2220                         cfs_percpt_for_each(sched, i,
2221                                             ksocknal_data.ksnd_schedulers) {
2222
2223                                 LASSERT(list_empty(&sched->kss_tx_conns));
2224                                 LASSERT(list_empty(&sched->kss_rx_conns));
2225                                 LASSERT(list_empty(&sched->kss_zombie_noop_txs));
2226                                 LASSERT(sched->kss_nconns == 0);
2227                         }
2228                 }
2229
2230                 /* flag threads to terminate; wake and wait for them to die */
2231                 ksocknal_data.ksnd_shuttingdown = 1;
2232                 wake_up_all(&ksocknal_data.ksnd_connd_waitq);
2233                 wake_up(&ksocknal_data.ksnd_reaper_waitq);
2234
2235                 if (ksocknal_data.ksnd_schedulers != NULL) {
2236                         cfs_percpt_for_each(sched, i,
2237                                             ksocknal_data.ksnd_schedulers)
2238                                         wake_up_all(&sched->kss_waitq);
2239                 }
2240
2241                 wait_var_event_warning(&ksocknal_data.ksnd_nthreads,
2242                                        atomic_read(&ksocknal_data.ksnd_nthreads) == 0,
2243                                        "waiting for %d threads to terminate\n",
2244                                        atomic_read(&ksocknal_data.ksnd_nthreads));
2245
2246                 ksocknal_free_buffers();
2247
2248                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
2249                 break;
2250         }
2251
2252         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %lld\n",
2253                libcfs_kmem_read());
2254
2255         module_put(THIS_MODULE);
2256 }
2257
2258 static int
2259 ksocknal_base_startup(void)
2260 {
2261         struct ksock_sched *sched;
2262         int rc;
2263         int i;
2264
2265         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
2266         LASSERT(ksocknal_data.ksnd_nnets == 0);
2267
2268         memset(&ksocknal_data, 0, sizeof(ksocknal_data)); /* zero pointers */
2269
2270         hash_init(ksocknal_data.ksnd_peers);
2271
2272         rwlock_init(&ksocknal_data.ksnd_global_lock);
2273         INIT_LIST_HEAD(&ksocknal_data.ksnd_nets);
2274
2275         spin_lock_init(&ksocknal_data.ksnd_reaper_lock);
2276         INIT_LIST_HEAD(&ksocknal_data.ksnd_enomem_conns);
2277         INIT_LIST_HEAD(&ksocknal_data.ksnd_zombie_conns);
2278         INIT_LIST_HEAD(&ksocknal_data.ksnd_deathrow_conns);
2279         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
2280
2281         spin_lock_init(&ksocknal_data.ksnd_connd_lock);
2282         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_connreqs);
2283         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_routes);
2284         init_waitqueue_head(&ksocknal_data.ksnd_connd_waitq);
2285
2286         spin_lock_init(&ksocknal_data.ksnd_tx_lock);
2287         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_noop_txs);
2288
2289         /* NB memset above zeros whole of ksocknal_data */
2290
2291         /* flag lists/ptrs/locks initialised */
2292         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2293         if (!try_module_get(THIS_MODULE))
2294                 goto failed;
2295
2296         /* Create a scheduler block per available CPT */
2297         ksocknal_data.ksnd_schedulers = cfs_percpt_alloc(lnet_cpt_table(),
2298                                                          sizeof(*sched));
2299         if (ksocknal_data.ksnd_schedulers == NULL)
2300                 goto failed;
2301
2302         cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
2303                 int nthrs;
2304
2305                 /*
2306                  * make sure not to allocate more threads than there are
2307                  * cores/CPUs in teh CPT
2308                  */
2309                 nthrs = cfs_cpt_weight(lnet_cpt_table(), i);
2310                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2311                         nthrs = min(nthrs, *ksocknal_tunables.ksnd_nscheds);
2312                 } else {
2313                         /*
2314                          * max to half of CPUs, assume another half should be
2315                          * reserved for upper layer modules
2316                          */
2317                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2318                 }
2319
2320                 sched->kss_nthreads_max = nthrs;
2321                 sched->kss_cpt = i;
2322
2323                 spin_lock_init(&sched->kss_lock);
2324                 INIT_LIST_HEAD(&sched->kss_rx_conns);
2325                 INIT_LIST_HEAD(&sched->kss_tx_conns);
2326                 INIT_LIST_HEAD(&sched->kss_zombie_noop_txs);
2327                 init_waitqueue_head(&sched->kss_waitq);
2328         }
2329
2330         ksocknal_data.ksnd_connd_starting         = 0;
2331         ksocknal_data.ksnd_connd_failed_stamp     = 0;
2332         ksocknal_data.ksnd_connd_starting_stamp   = ktime_get_real_seconds();
2333         /* must have at least 2 connds to remain responsive to accepts while
2334          * connecting */
2335         if (*ksocknal_tunables.ksnd_nconnds < SOCKNAL_CONND_RESV + 1)
2336                 *ksocknal_tunables.ksnd_nconnds = SOCKNAL_CONND_RESV + 1;
2337
2338         if (*ksocknal_tunables.ksnd_nconnds_max <
2339             *ksocknal_tunables.ksnd_nconnds) {
2340                 ksocknal_tunables.ksnd_nconnds_max =
2341                         ksocknal_tunables.ksnd_nconnds;
2342         }
2343
2344         for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
2345                 spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2346                 ksocknal_data.ksnd_connd_starting++;
2347                 spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2348
2349                 rc = ksocknal_thread_start(ksocknal_connd,
2350                                            (void *)((uintptr_t)i),
2351                                            "socknal_cd%02d", i);
2352                 if (rc != 0) {
2353                         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2354                         ksocknal_data.ksnd_connd_starting--;
2355                         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2356                         CERROR("Can't spawn socknal connd: %d\n", rc);
2357                         goto failed;
2358                 }
2359         }
2360
2361         rc = ksocknal_thread_start(ksocknal_reaper, NULL, "socknal_reaper");
2362         if (rc != 0) {
2363                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
2364                 goto failed;
2365         }
2366
2367         register_netdevice_notifier(&ksocknal_dev_notifier_block);
2368         register_inetaddr_notifier(&ksocknal_inetaddr_notifier_block);
2369
2370         /* flag everything initialised */
2371         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
2372
2373         return 0;
2374
2375  failed:
2376         ksocknal_base_shutdown();
2377         return -ENETDOWN;
2378 }
2379
2380 static int
2381 ksocknal_debug_peerhash(struct lnet_ni *ni)
2382 {
2383         struct ksock_peer_ni *peer_ni;
2384         int i;
2385
2386         read_lock(&ksocknal_data.ksnd_global_lock);
2387
2388         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
2389                 struct ksock_conn_cb *conn_cb;
2390                 struct ksock_conn *conn;
2391
2392                 if (peer_ni->ksnp_ni != ni)
2393                         continue;
2394
2395                 CWARN("Active peer_ni on shutdown: %s, ref %d, closing %d, accepting %d, err %d, zcookie %llu, txq %d, zc_req %d\n",
2396                       libcfs_idstr(&peer_ni->ksnp_id),
2397                       refcount_read(&peer_ni->ksnp_refcount),
2398                       peer_ni->ksnp_closing,
2399                       peer_ni->ksnp_accepting, peer_ni->ksnp_error,
2400                       peer_ni->ksnp_zc_next_cookie,
2401                       !list_empty(&peer_ni->ksnp_tx_queue),
2402                       !list_empty(&peer_ni->ksnp_zc_req_list));
2403
2404                 conn_cb = peer_ni->ksnp_conn_cb;
2405                 if (conn_cb) {
2406                         CWARN("ConnCB: ref %d, schd %d, conn %d, cnted %d, del %d\n",
2407                               refcount_read(&conn_cb->ksnr_refcount),
2408                               conn_cb->ksnr_scheduled, conn_cb->ksnr_connecting,
2409                               conn_cb->ksnr_connected, conn_cb->ksnr_deleted);
2410                 }
2411
2412                 list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
2413                         CWARN("Conn: ref %d, sref %d, t %d, c %d\n",
2414                               refcount_read(&conn->ksnc_conn_refcount),
2415                               refcount_read(&conn->ksnc_sock_refcount),
2416                               conn->ksnc_type, conn->ksnc_closing);
2417                 }
2418                 break;
2419         }
2420
2421         read_unlock(&ksocknal_data.ksnd_global_lock);
2422         return 0;
2423 }
2424
2425 void
2426 ksocknal_shutdown(struct lnet_ni *ni)
2427 {
2428         struct ksock_net *net = ni->ni_data;
2429
2430         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL);
2431         LASSERT(ksocknal_data.ksnd_nnets > 0);
2432
2433         /* prevent new peers */
2434         atomic_add(SOCKNAL_SHUTDOWN_BIAS, &net->ksnn_npeers);
2435
2436         /* Delete all peers */
2437         ksocknal_del_peer(ni, NULL);
2438
2439         /* Wait for all peer_ni state to clean up */
2440         wait_var_event_warning(&net->ksnn_npeers,
2441                                atomic_read(&net->ksnn_npeers) ==
2442                                SOCKNAL_SHUTDOWN_BIAS,
2443                                "waiting for %d peers to disconnect\n",
2444                                ksocknal_debug_peerhash(ni) +
2445                                atomic_read(&net->ksnn_npeers) -
2446                                SOCKNAL_SHUTDOWN_BIAS);
2447
2448         LASSERT(net->ksnn_interface.ksni_npeers == 0);
2449         LASSERT(net->ksnn_interface.ksni_nroutes == 0);
2450
2451         list_del(&net->ksnn_list);
2452         LIBCFS_FREE(net, sizeof(*net));
2453
2454         ksocknal_data.ksnd_nnets--;
2455         if (ksocknal_data.ksnd_nnets == 0)
2456                 ksocknal_base_shutdown();
2457 }
2458
2459 static int
2460 ksocknal_search_new_ipif(struct ksock_net *net)
2461 {
2462         int new_ipif = 0;
2463         char *ifnam = &net->ksnn_interface.ksni_name[0];
2464         char *colon = strchr(ifnam, ':');
2465         bool found = false;
2466         struct ksock_net *tmp;
2467
2468         if (colon != NULL)
2469                 *colon = 0;
2470
2471         list_for_each_entry(tmp, &ksocknal_data.ksnd_nets, ksnn_list) {
2472                 char *ifnam2 = &tmp->ksnn_interface.ksni_name[0];
2473                 char *colon2 = strchr(ifnam2, ':');
2474
2475                 if (colon2 != NULL)
2476                         *colon2 = 0;
2477
2478                 found = strcmp(ifnam, ifnam2) == 0;
2479                 if (colon2 != NULL)
2480                         *colon2 = ':';
2481         }
2482
2483         new_ipif += !found;
2484         if (colon != NULL)
2485                 *colon = ':';
2486
2487         return new_ipif;
2488 }
2489
2490 static int
2491 ksocknal_start_schedulers(struct ksock_sched *sched)
2492 {
2493         int     nthrs;
2494         int     rc = 0;
2495         int     i;
2496
2497         if (sched->kss_nthreads == 0) {
2498                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2499                         nthrs = sched->kss_nthreads_max;
2500                 } else {
2501                         nthrs = cfs_cpt_weight(lnet_cpt_table(),
2502                                                sched->kss_cpt);
2503                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2504                         nthrs = min(SOCKNAL_NSCHEDS_HIGH, nthrs);
2505                 }
2506                 nthrs = min(nthrs, sched->kss_nthreads_max);
2507         } else {
2508                 LASSERT(sched->kss_nthreads <= sched->kss_nthreads_max);
2509                 /* increase two threads if there is new interface */
2510                 nthrs = min(2, sched->kss_nthreads_max - sched->kss_nthreads);
2511         }
2512
2513         for (i = 0; i < nthrs; i++) {
2514                 long id;
2515
2516                 id = KSOCK_THREAD_ID(sched->kss_cpt, sched->kss_nthreads + i);
2517                 rc = ksocknal_thread_start(ksocknal_scheduler, (void *)id,
2518                                            "socknal_sd%02d_%02d",
2519                                            sched->kss_cpt,
2520                                            (int)KSOCK_THREAD_SID(id));
2521                 if (rc == 0)
2522                         continue;
2523
2524                 CERROR("Can't spawn thread %d for scheduler[%d]: %d\n",
2525                        sched->kss_cpt, (int) KSOCK_THREAD_SID(id), rc);
2526                 break;
2527         }
2528
2529         sched->kss_nthreads += i;
2530         return rc;
2531 }
2532
2533 static int
2534 ksocknal_net_start_threads(struct ksock_net *net, __u32 *cpts, int ncpts)
2535 {
2536         int newif = ksocknal_search_new_ipif(net);
2537         int rc;
2538         int i;
2539
2540         if (ncpts > 0 && ncpts > cfs_cpt_number(lnet_cpt_table()))
2541                 return -EINVAL;
2542
2543         for (i = 0; i < ncpts; i++) {
2544                 struct ksock_sched *sched;
2545                 int cpt = (cpts == NULL) ? i : cpts[i];
2546
2547                 LASSERT(cpt < cfs_cpt_number(lnet_cpt_table()));
2548                 sched = ksocknal_data.ksnd_schedulers[cpt];
2549
2550                 if (!newif && sched->kss_nthreads > 0)
2551                         continue;
2552
2553                 rc = ksocknal_start_schedulers(sched);
2554                 if (rc != 0)
2555                         return rc;
2556         }
2557         return 0;
2558 }
2559
2560 int
2561 ksocknal_startup(struct lnet_ni *ni)
2562 {
2563         struct ksock_net *net;
2564         struct ksock_interface *ksi = NULL;
2565         struct lnet_inetdev *ifaces = NULL;
2566         int rc, if_idx;
2567         int dev_status;
2568
2569         LASSERT (ni->ni_net->net_lnd == &the_ksocklnd);
2570         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING) {
2571                 rc = ksocknal_base_startup();
2572                 if (rc != 0)
2573                         return rc;
2574         }
2575         LIBCFS_ALLOC(net, sizeof(*net));
2576         if (net == NULL)
2577                 goto out_base;
2578
2579         net->ksnn_incarnation = ktime_get_real_ns();
2580         ni->ni_data = net;
2581
2582         ksocknal_tunables_setup(ni);
2583
2584         rc = lnet_inet_enumerate(&ifaces, ni->ni_net_ns, true);
2585         if (rc < 0)
2586                 goto out_net;
2587
2588         ksi = &net->ksnn_interface;
2589
2590         /* Interface and/or IP address is specified otherwise default to
2591          * the first Interface
2592          */
2593         if_idx = lnet_inet_select(ni, ifaces, rc);
2594         if (if_idx < 0)
2595                 goto out_net;
2596
2597         if (!ni->ni_interface) {
2598                 rc = lnet_ni_add_interface(ni, ifaces[if_idx].li_name);
2599                 if (rc < 0)
2600                         CWARN("ksocklnd failed to allocate ni_interface\n");
2601         }
2602
2603         ni->ni_dev_cpt = ifaces[if_idx].li_cpt;
2604         ksi->ksni_index = ifaces[if_idx].li_index;
2605         if (ifaces[if_idx].li_size == sizeof(struct in6_addr)) {
2606                 struct sockaddr_in6 *sa;
2607                 sa = (void *)&ksi->ksni_addr;
2608                 memset(sa, 0, sizeof(*sa));
2609                 sa->sin6_family = AF_INET6;
2610                 memcpy(&sa->sin6_addr, ifaces[if_idx].li_ipv6addr,
2611                        sizeof(struct in6_addr));
2612                 ni->ni_nid.nid_size = sizeof(struct in6_addr) - 4;
2613                 memcpy(&ni->ni_nid.nid_addr, ifaces[if_idx].li_ipv6addr,
2614                        sizeof(struct in6_addr));
2615         } else {
2616                 struct sockaddr_in *sa;
2617                 sa = (void *)&ksi->ksni_addr;
2618                 memset(sa, 0, sizeof(*sa));
2619                 sa->sin_family = AF_INET;
2620                 sa->sin_addr.s_addr = ifaces[if_idx].li_ipaddr;
2621                 ksi->ksni_netmask = ifaces[if_idx].li_netmask;
2622                 ni->ni_nid.nid_size = 0;
2623                 ni->ni_nid.nid_addr[0] = sa->sin_addr.s_addr;
2624         }
2625         strlcpy(ksi->ksni_name, ifaces[if_idx].li_name, sizeof(ksi->ksni_name));
2626
2627         /* call it before add it to ksocknal_data.ksnd_nets */
2628         rc = ksocknal_net_start_threads(net, ni->ni_cpts, ni->ni_ncpts);
2629         if (rc != 0)
2630                 goto out_net;
2631
2632         if ((ksocknal_ip2index((struct sockaddr *)&ksi->ksni_addr,
2633                                 ni,
2634                                 &dev_status) < 0) ||
2635              (dev_status <= 0))
2636                 lnet_set_link_fatal_state(ni, 1);
2637
2638         list_add(&net->ksnn_list, &ksocknal_data.ksnd_nets);
2639         net->ksnn_ni = ni;
2640         ksocknal_data.ksnd_nnets++;
2641
2642         return 0;
2643
2644 out_net:
2645         LIBCFS_FREE(net, sizeof(*net));
2646 out_base:
2647         if (ksocknal_data.ksnd_nnets == 0)
2648                 ksocknal_base_shutdown();
2649
2650         return -ENETDOWN;
2651 }
2652
2653 static void __exit ksocklnd_exit(void)
2654 {
2655         lnet_unregister_lnd(&the_ksocklnd);
2656 }
2657
2658 static const struct lnet_lnd the_ksocklnd = {
2659         .lnd_type               = SOCKLND,
2660         .lnd_startup            = ksocknal_startup,
2661         .lnd_shutdown           = ksocknal_shutdown,
2662         .lnd_ctl                = ksocknal_ctl,
2663         .lnd_send               = ksocknal_send,
2664         .lnd_recv               = ksocknal_recv,
2665         .lnd_notify_peer_down   = ksocknal_notify_gw_down,
2666         .lnd_accept             = ksocknal_accept,
2667         .lnd_nl_get             = ksocknal_nl_get,
2668         .lnd_nl_set             = ksocknal_nl_set,
2669         .lnd_keys               = &ksocknal_tunables_keys,
2670 };
2671
2672 static int __init ksocklnd_init(void)
2673 {
2674         int rc;
2675
2676         /* check ksnr_connected/connecting field large enough */
2677         BUILD_BUG_ON(SOCKLND_CONN_NTYPES > 4);
2678         BUILD_BUG_ON(SOCKLND_CONN_ACK != SOCKLND_CONN_BULK_IN);
2679
2680         rc = ksocknal_tunables_init();
2681         if (rc != 0)
2682                 return rc;
2683
2684         rc = libcfs_setup();
2685         if (rc)
2686                 return rc;
2687
2688         lnet_register_lnd(&the_ksocklnd);
2689
2690         return 0;
2691 }
2692
2693 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2694 MODULE_DESCRIPTION("TCP Socket LNet Network Driver");
2695 MODULE_VERSION("2.8.0");
2696 MODULE_LICENSE("GPL");
2697
2698 module_init(ksocklnd_init);
2699 module_exit(ksocklnd_exit);