Whamcloud - gitweb
LU-13781 lnet: Local NI must be on same net as next-hop
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lnet/klnds/socklnd/socklnd.c
32  *
33  * Author: Zach Brown <zab@zabbo.net>
34  * Author: Peter J. Braam <braam@clusterfs.com>
35  * Author: Phil Schwan <phil@clusterfs.com>
36  * Author: Eric Barton <eric@bartonsoftware.com>
37  */
38
39 #include <linux/inetdevice.h>
40 #include "socklnd.h"
41 #include <linux/sunrpc/addr.h>
42
43 static const struct lnet_lnd the_ksocklnd;
44 struct ksock_nal_data ksocknal_data;
45
46 static struct ksock_interface *
47 ksocknal_ip2iface(struct lnet_ni *ni, struct sockaddr *addr)
48 {
49         struct ksock_net *net = ni->ni_data;
50         struct ksock_interface *iface;
51
52         iface = &net->ksnn_interface;
53
54         if (rpc_cmp_addr((struct sockaddr *)&iface->ksni_addr, addr))
55                 return iface;
56
57         return NULL;
58 }
59
60 static struct ksock_interface *
61 ksocknal_index2iface(struct lnet_ni *ni, int index)
62 {
63         struct ksock_net *net = ni->ni_data;
64         struct ksock_interface *iface;
65
66         iface = &net->ksnn_interface;
67
68         if (iface->ksni_index == index)
69                 return iface;
70
71         return NULL;
72 }
73
74 static int ksocknal_ip2index(struct sockaddr *addr, struct lnet_ni *ni)
75 {
76         struct net_device *dev;
77         int ret = -1;
78         DECLARE_CONST_IN_IFADDR(ifa);
79
80         if (addr->sa_family != AF_INET)
81                 /* No IPv6 support yet */
82                 return ret;
83
84         rcu_read_lock();
85         for_each_netdev(ni->ni_net_ns, dev) {
86                 int flags = dev_get_flags(dev);
87                 struct in_device *in_dev;
88
89                 if (flags & IFF_LOOPBACK) /* skip the loopback IF */
90                         continue;
91
92                 if (!(flags & IFF_UP))
93                         continue;
94
95                 in_dev = __in_dev_get_rcu(dev);
96                 if (!in_dev)
97                         continue;
98
99                 in_dev_for_each_ifa_rcu(ifa, in_dev) {
100                         if (ifa->ifa_local ==
101                             ((struct sockaddr_in *)addr)->sin_addr.s_addr)
102                                 ret = dev->ifindex;
103                 }
104                 endfor_ifa(in_dev);
105                 if (ret >= 0)
106                         break;
107         }
108         rcu_read_unlock();
109
110         return ret;
111 }
112
113 static struct ksock_conn_cb *
114 ksocknal_create_conn_cb(struct sockaddr *addr)
115 {
116         struct ksock_conn_cb *conn_cb;
117
118         LIBCFS_ALLOC(conn_cb, sizeof(*conn_cb));
119         if (!conn_cb)
120                 return NULL;
121
122         refcount_set(&conn_cb->ksnr_refcount, 1);
123         conn_cb->ksnr_peer = NULL;
124         conn_cb->ksnr_retry_interval = 0;         /* OK to connect at any time */
125         rpc_copy_addr((struct sockaddr *)&conn_cb->ksnr_addr, addr);
126         rpc_set_port((struct sockaddr *)&conn_cb->ksnr_addr,
127                      rpc_get_port(addr));
128         conn_cb->ksnr_myiface = -1;
129         conn_cb->ksnr_scheduled = 0;
130         conn_cb->ksnr_connecting = 0;
131         conn_cb->ksnr_connected = 0;
132         conn_cb->ksnr_deleted = 0;
133         conn_cb->ksnr_conn_count = 0;
134
135         return conn_cb;
136 }
137
138 void
139 ksocknal_destroy_conn_cb(struct ksock_conn_cb *conn_cb)
140 {
141         LASSERT(refcount_read(&conn_cb->ksnr_refcount) == 0);
142
143         if (conn_cb->ksnr_peer)
144                 ksocknal_peer_decref(conn_cb->ksnr_peer);
145
146         LIBCFS_FREE(conn_cb, sizeof(*conn_cb));
147 }
148
149 static struct ksock_peer_ni *
150 ksocknal_create_peer(struct lnet_ni *ni, struct lnet_process_id id)
151 {
152         int cpt = lnet_cpt_of_nid(id.nid, ni);
153         struct ksock_net *net = ni->ni_data;
154         struct ksock_peer_ni *peer_ni;
155
156         LASSERT(id.nid != LNET_NID_ANY);
157         LASSERT(id.pid != LNET_PID_ANY);
158         LASSERT(!in_interrupt());
159
160         if (!atomic_inc_unless_negative(&net->ksnn_npeers)) {
161                 CERROR("Can't create peer_ni: network shutdown\n");
162                 return ERR_PTR(-ESHUTDOWN);
163         }
164
165         LIBCFS_CPT_ALLOC(peer_ni, lnet_cpt_table(), cpt, sizeof(*peer_ni));
166         if (!peer_ni) {
167                 atomic_dec(&net->ksnn_npeers);
168                 return ERR_PTR(-ENOMEM);
169         }
170
171         peer_ni->ksnp_ni = ni;
172         peer_ni->ksnp_id = id;
173         refcount_set(&peer_ni->ksnp_refcount, 1); /* 1 ref for caller */
174         peer_ni->ksnp_closing = 0;
175         peer_ni->ksnp_accepting = 0;
176         peer_ni->ksnp_proto = NULL;
177         peer_ni->ksnp_last_alive = 0;
178         peer_ni->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;
179         peer_ni->ksnp_conn_cb = NULL;
180
181         INIT_LIST_HEAD(&peer_ni->ksnp_conns);
182         INIT_LIST_HEAD(&peer_ni->ksnp_tx_queue);
183         INIT_LIST_HEAD(&peer_ni->ksnp_zc_req_list);
184         spin_lock_init(&peer_ni->ksnp_lock);
185
186         return peer_ni;
187 }
188
189 void
190 ksocknal_destroy_peer(struct ksock_peer_ni *peer_ni)
191 {
192         struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
193
194         CDEBUG (D_NET, "peer_ni %s %p deleted\n",
195                 libcfs_id2str(peer_ni->ksnp_id), peer_ni);
196
197         LASSERT(refcount_read(&peer_ni->ksnp_refcount) == 0);
198         LASSERT(peer_ni->ksnp_accepting == 0);
199         LASSERT(list_empty(&peer_ni->ksnp_conns));
200         LASSERT(peer_ni->ksnp_conn_cb == NULL);
201         LASSERT(list_empty(&peer_ni->ksnp_tx_queue));
202         LASSERT(list_empty(&peer_ni->ksnp_zc_req_list));
203
204         LIBCFS_FREE(peer_ni, sizeof(*peer_ni));
205
206         /* NB a peer_ni's connections and conn_cb keep a reference on their
207          * peer_ni until they are destroyed, so we can be assured that _all_
208          * state to do with this peer_ni has been cleaned up when its refcount
209          * drops to zero.
210          */
211         if (atomic_dec_and_test(&net->ksnn_npeers))
212                 wake_up_var(&net->ksnn_npeers);
213 }
214
215 struct ksock_peer_ni *
216 ksocknal_find_peer_locked(struct lnet_ni *ni, struct lnet_process_id id)
217 {
218         struct ksock_peer_ni *peer_ni;
219
220         hash_for_each_possible(ksocknal_data.ksnd_peers, peer_ni,
221                                ksnp_list, id.nid) {
222                 LASSERT(!peer_ni->ksnp_closing);
223
224                 if (peer_ni->ksnp_ni != ni)
225                         continue;
226
227                 if (peer_ni->ksnp_id.nid != id.nid ||
228                     peer_ni->ksnp_id.pid != id.pid)
229                         continue;
230
231                 CDEBUG(D_NET, "got peer_ni [%p] -> %s (%d)\n",
232                        peer_ni, libcfs_id2str(id),
233                        refcount_read(&peer_ni->ksnp_refcount));
234                 return peer_ni;
235         }
236         return NULL;
237 }
238
239 struct ksock_peer_ni *
240 ksocknal_find_peer(struct lnet_ni *ni, struct lnet_process_id id)
241 {
242         struct ksock_peer_ni *peer_ni;
243
244         read_lock(&ksocknal_data.ksnd_global_lock);
245         peer_ni = ksocknal_find_peer_locked(ni, id);
246         if (peer_ni != NULL)                    /* +1 ref for caller? */
247                 ksocknal_peer_addref(peer_ni);
248         read_unlock(&ksocknal_data.ksnd_global_lock);
249
250         return (peer_ni);
251 }
252
253 static void
254 ksocknal_unlink_peer_locked(struct ksock_peer_ni *peer_ni)
255 {
256         int i;
257         struct ksock_interface *iface;
258
259         for (i = 0; i < peer_ni->ksnp_n_passive_ips; i++) {
260                 struct sockaddr_in sa = { .sin_family = AF_INET };
261                 LASSERT(i < LNET_INTERFACES_NUM);
262                 sa.sin_addr.s_addr = htonl(peer_ni->ksnp_passive_ips[i]);
263
264                 iface = ksocknal_ip2iface(peer_ni->ksnp_ni,
265                                           (struct sockaddr *)&sa);
266                 /*
267                  * All IPs in peer_ni->ksnp_passive_ips[] come from the
268                  * interface list, therefore the call must succeed.
269                  */
270                 LASSERT(iface != NULL);
271
272                 CDEBUG(D_NET, "peer_ni=%p iface=%p ksni_nroutes=%d\n",
273                        peer_ni, iface, iface->ksni_nroutes);
274                 iface->ksni_npeers--;
275         }
276
277         LASSERT(list_empty(&peer_ni->ksnp_conns));
278         LASSERT(peer_ni->ksnp_conn_cb == NULL);
279         LASSERT(!peer_ni->ksnp_closing);
280         peer_ni->ksnp_closing = 1;
281         hlist_del(&peer_ni->ksnp_list);
282         /* lose peerlist's ref */
283         ksocknal_peer_decref(peer_ni);
284 }
285
286 static int
287 ksocknal_get_peer_info(struct lnet_ni *ni, int index,
288                        struct lnet_process_id *id, __u32 *myip, __u32 *peer_ip,
289                        int *port, int *conn_count, int *share_count)
290 {
291         struct ksock_peer_ni *peer_ni;
292         struct ksock_conn_cb *conn_cb;
293         int i;
294         int j;
295         int rc = -ENOENT;
296
297         read_lock(&ksocknal_data.ksnd_global_lock);
298
299         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
300
301                 if (peer_ni->ksnp_ni != ni)
302                         continue;
303
304                 if (peer_ni->ksnp_n_passive_ips == 0 &&
305                     peer_ni->ksnp_conn_cb == NULL) {
306                         if (index-- > 0)
307                                 continue;
308
309                         *id = peer_ni->ksnp_id;
310                         *myip = 0;
311                         *peer_ip = 0;
312                         *port = 0;
313                         *conn_count = 0;
314                         *share_count = 0;
315                         rc = 0;
316                         goto out;
317                 }
318
319                 for (j = 0; j < peer_ni->ksnp_n_passive_ips; j++) {
320                         if (index-- > 0)
321                                 continue;
322
323                         *id = peer_ni->ksnp_id;
324                         *myip = peer_ni->ksnp_passive_ips[j];
325                         *peer_ip = 0;
326                         *port = 0;
327                         *conn_count = 0;
328                         *share_count = 0;
329                         rc = 0;
330                         goto out;
331                 }
332
333                 if (peer_ni->ksnp_conn_cb) {
334                         if (index-- > 0)
335                                 continue;
336
337                         conn_cb = peer_ni->ksnp_conn_cb;
338
339                         *id = peer_ni->ksnp_id;
340                         if (conn_cb->ksnr_addr.ss_family == AF_INET) {
341                                 struct sockaddr_in *sa =
342                                         (void *)&conn_cb->ksnr_addr;
343
344                                 rc = choose_ipv4_src(myip,
345                                                      conn_cb->ksnr_myiface,
346                                                      ntohl(sa->sin_addr.s_addr),
347                                                      ni->ni_net_ns);
348                                 *peer_ip = ntohl(sa->sin_addr.s_addr);
349                                 *port = ntohs(sa->sin_port);
350                         } else {
351                                 *myip = 0xFFFFFFFF;
352                                 *peer_ip = 0xFFFFFFFF;
353                                 *port = 0;
354                                 rc = -ENOTSUPP;
355                         }
356                         *conn_count = conn_cb->ksnr_conn_count;
357                         *share_count = 1;
358                         goto out;
359                 }
360         }
361 out:
362         read_unlock(&ksocknal_data.ksnd_global_lock);
363         return rc;
364 }
365
366 static void
367 ksocknal_associate_cb_conn_locked(struct ksock_conn_cb *conn_cb,
368                                   struct ksock_conn *conn)
369 {
370         struct ksock_peer_ni *peer_ni = conn_cb->ksnr_peer;
371         int type = conn->ksnc_type;
372         struct ksock_interface *iface;
373         int conn_iface;
374
375         conn_iface = ksocknal_ip2index((struct sockaddr *)&conn->ksnc_myaddr,
376                                        peer_ni->ksnp_ni);
377         conn->ksnc_conn_cb = conn_cb;
378         ksocknal_conn_cb_addref(conn_cb);
379
380         if (conn_cb->ksnr_myiface != conn_iface) {
381                 if (conn_cb->ksnr_myiface < 0) {
382                         /* route wasn't bound locally yet (the initial route) */
383                         CDEBUG(D_NET, "Binding %s %pIS to interface %d\n",
384                                libcfs_id2str(peer_ni->ksnp_id),
385                                &conn_cb->ksnr_addr,
386                                conn_iface);
387                 } else {
388                         CDEBUG(D_NET,
389                                "Rebinding %s %pIS from interface %d to %d\n",
390                                libcfs_id2str(peer_ni->ksnp_id),
391                                &conn_cb->ksnr_addr,
392                                conn_cb->ksnr_myiface,
393                                conn_iface);
394
395                         iface = ksocknal_index2iface(peer_ni->ksnp_ni,
396                                                      conn_cb->ksnr_myiface);
397                         if (iface)
398                                 iface->ksni_nroutes--;
399                 }
400                 conn_cb->ksnr_myiface = conn_iface;
401                 iface = ksocknal_index2iface(peer_ni->ksnp_ni,
402                                              conn_cb->ksnr_myiface);
403                 if (iface)
404                         iface->ksni_nroutes++;
405         }
406
407         conn_cb->ksnr_connected |= (1<<type);
408         conn_cb->ksnr_conn_count++;
409
410         /* Successful connection => further attempts can
411          * proceed immediately
412          */
413         conn_cb->ksnr_retry_interval = 0;
414 }
415
416 static void
417 ksocknal_add_conn_cb_locked(struct ksock_peer_ni *peer_ni,
418                             struct ksock_conn_cb *conn_cb)
419 {
420         struct list_head *tmp;
421         struct ksock_conn *conn;
422         struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
423
424         LASSERT(!peer_ni->ksnp_closing);
425         LASSERT(!conn_cb->ksnr_peer);
426         LASSERT(!conn_cb->ksnr_scheduled);
427         LASSERT(!conn_cb->ksnr_connecting);
428         LASSERT(conn_cb->ksnr_connected == 0);
429
430         conn_cb->ksnr_peer = peer_ni;
431         ksocknal_peer_addref(peer_ni);
432
433         /* set the conn_cb's interface to the current net's interface */
434         conn_cb->ksnr_myiface = net->ksnn_interface.ksni_index;
435         net->ksnn_interface.ksni_nroutes++;
436
437         /* peer_ni's route list takes over my ref on 'route' */
438         peer_ni->ksnp_conn_cb = conn_cb;
439
440         list_for_each(tmp, &peer_ni->ksnp_conns) {
441                 conn = list_entry(tmp, struct ksock_conn, ksnc_list);
442
443                 if (!rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
444                                   (struct sockaddr *)&conn_cb->ksnr_addr))
445                         continue;
446
447                 ksocknal_associate_cb_conn_locked(conn_cb, conn);
448                 /* keep going (typed conns) */
449         }
450 }
451
452 static void
453 ksocknal_del_conn_cb_locked(struct ksock_conn_cb *conn_cb)
454 {
455         struct ksock_peer_ni *peer_ni = conn_cb->ksnr_peer;
456         struct ksock_interface *iface;
457         struct ksock_conn *conn;
458         struct ksock_conn *cnxt;
459
460         LASSERT(!conn_cb->ksnr_deleted);
461
462         /* Close associated conns */
463         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns, ksnc_list) {
464                 if (conn->ksnc_conn_cb != conn_cb)
465                         continue;
466
467                 ksocknal_close_conn_locked(conn, 0);
468         }
469
470         if (conn_cb->ksnr_myiface >= 0) {
471                 iface = ksocknal_index2iface(peer_ni->ksnp_ni,
472                                              conn_cb->ksnr_myiface);
473                 if (iface)
474                         iface->ksni_nroutes--;
475         }
476
477         conn_cb->ksnr_deleted = 1;
478         ksocknal_conn_cb_decref(conn_cb);               /* drop peer_ni's ref */
479         peer_ni->ksnp_conn_cb = NULL;
480
481         if (list_empty(&peer_ni->ksnp_conns)) {
482                 /* I've just removed the last route to a peer_ni with no active
483                  * connections
484                  */
485                 ksocknal_unlink_peer_locked(peer_ni);
486         }
487 }
488
489 int
490 ksocknal_add_peer(struct lnet_ni *ni, struct lnet_process_id id,
491                   struct sockaddr *addr)
492 {
493         struct ksock_peer_ni *peer_ni;
494         struct ksock_peer_ni *peer2;
495         struct ksock_conn_cb *conn_cb;
496
497         if (id.nid == LNET_NID_ANY ||
498             id.pid == LNET_PID_ANY)
499                 return (-EINVAL);
500
501         /* Have a brand new peer_ni ready... */
502         peer_ni = ksocknal_create_peer(ni, id);
503         if (IS_ERR(peer_ni))
504                 return PTR_ERR(peer_ni);
505
506         conn_cb = ksocknal_create_conn_cb(addr);
507         if (!conn_cb) {
508                 ksocknal_peer_decref(peer_ni);
509                 return -ENOMEM;
510         }
511
512         write_lock_bh(&ksocknal_data.ksnd_global_lock);
513
514         /* always called with a ref on ni, so shutdown can't have started */
515         LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers)
516                 >= 0);
517
518         peer2 = ksocknal_find_peer_locked(ni, id);
519         if (peer2 != NULL) {
520                 ksocknal_peer_decref(peer_ni);
521                 peer_ni = peer2;
522         } else {
523                 /* peer_ni table takes my ref on peer_ni */
524                 hash_add(ksocknal_data.ksnd_peers, &peer_ni->ksnp_list, id.nid);
525         }
526
527         ksocknal_add_conn_cb_locked(peer_ni, conn_cb);
528
529         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
530
531         return 0;
532 }
533
534 static void
535 ksocknal_del_peer_locked(struct ksock_peer_ni *peer_ni, __u32 ip)
536 {
537         struct ksock_conn *conn;
538         struct ksock_conn *cnxt;
539         struct ksock_conn_cb *conn_cb;
540
541         LASSERT(!peer_ni->ksnp_closing);
542
543         /* Extra ref prevents peer_ni disappearing until I'm done with it */
544         ksocknal_peer_addref(peer_ni);
545         conn_cb = peer_ni->ksnp_conn_cb;
546         if (conn_cb)
547                 ksocknal_del_conn_cb_locked(conn_cb);
548
549         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns,
550                                  ksnc_list)
551                 ksocknal_close_conn_locked(conn, 0);
552
553         ksocknal_peer_decref(peer_ni);
554         /* NB peer_ni unlinks itself when last conn/conn_cb is removed */
555 }
556
557 static int
558 ksocknal_del_peer(struct lnet_ni *ni, struct lnet_process_id id, __u32 ip)
559 {
560         LIST_HEAD(zombies);
561         struct hlist_node *pnxt;
562         struct ksock_peer_ni *peer_ni;
563         int lo;
564         int hi;
565         int i;
566         int rc = -ENOENT;
567
568         write_lock_bh(&ksocknal_data.ksnd_global_lock);
569
570         if (id.nid != LNET_NID_ANY) {
571                 lo = hash_min(id.nid, HASH_BITS(ksocknal_data.ksnd_peers));
572                 hi = lo;
573         } else {
574                 lo = 0;
575                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
576         }
577
578         for (i = lo; i <= hi; i++) {
579                 hlist_for_each_entry_safe(peer_ni, pnxt,
580                                           &ksocknal_data.ksnd_peers[i],
581                                           ksnp_list) {
582                         if (peer_ni->ksnp_ni != ni)
583                                 continue;
584
585                         if (!((id.nid == LNET_NID_ANY ||
586                                peer_ni->ksnp_id.nid == id.nid) &&
587                               (id.pid == LNET_PID_ANY ||
588                                peer_ni->ksnp_id.pid == id.pid)))
589                                 continue;
590
591                         ksocknal_peer_addref(peer_ni);  /* a ref for me... */
592
593                         ksocknal_del_peer_locked(peer_ni, ip);
594
595                         if (peer_ni->ksnp_closing &&
596                             !list_empty(&peer_ni->ksnp_tx_queue)) {
597                                 LASSERT(list_empty(&peer_ni->ksnp_conns));
598                                 LASSERT(peer_ni->ksnp_conn_cb == NULL);
599
600                                 list_splice_init(&peer_ni->ksnp_tx_queue,
601                                                  &zombies);
602                         }
603
604                         ksocknal_peer_decref(peer_ni);  /* ...till here */
605
606                         rc = 0;                         /* matched! */
607                 }
608         }
609
610         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
611
612         ksocknal_txlist_done(ni, &zombies, -ENETDOWN);
613
614         return rc;
615 }
616
617 static struct ksock_conn *
618 ksocknal_get_conn_by_idx(struct lnet_ni *ni, int index)
619 {
620         struct ksock_peer_ni *peer_ni;
621         struct ksock_conn *conn;
622         struct list_head *ctmp;
623         int i;
624
625         read_lock(&ksocknal_data.ksnd_global_lock);
626
627         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
628                 LASSERT(!peer_ni->ksnp_closing);
629
630                 if (peer_ni->ksnp_ni != ni)
631                         continue;
632
633                 list_for_each(ctmp, &peer_ni->ksnp_conns) {
634                         if (index-- > 0)
635                                 continue;
636
637                         conn = list_entry(ctmp, struct ksock_conn,
638                                           ksnc_list);
639                         ksocknal_conn_addref(conn);
640                         read_unlock(&ksocknal_data.ksnd_global_lock);
641                         return conn;
642                 }
643         }
644
645         read_unlock(&ksocknal_data.ksnd_global_lock);
646         return NULL;
647 }
648
649 static struct ksock_sched *
650 ksocknal_choose_scheduler_locked(unsigned int cpt)
651 {
652         struct ksock_sched *sched = ksocknal_data.ksnd_schedulers[cpt];
653         int i;
654
655         if (sched->kss_nthreads == 0) {
656                 cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
657                         if (sched->kss_nthreads > 0) {
658                                 CDEBUG(D_NET, "scheduler[%d] has no threads. selected scheduler[%d]\n",
659                                        cpt, sched->kss_cpt);
660                                 return sched;
661                         }
662                 }
663                 return NULL;
664         }
665
666         return sched;
667 }
668
669 int
670 ksocknal_accept(struct lnet_ni *ni, struct socket *sock)
671 {
672         struct ksock_connreq *cr;
673         int rc;
674         struct sockaddr_storage peer;
675
676         rc = lnet_sock_getaddr(sock, true, &peer);
677         LASSERT(rc == 0);               /* we succeeded before */
678
679         LIBCFS_ALLOC(cr, sizeof(*cr));
680         if (cr == NULL) {
681                 LCONSOLE_ERROR_MSG(0x12f,
682                                    "Dropping connection request from %pIS: memory exhausted\n",
683                                    &peer);
684                 return -ENOMEM;
685         }
686
687         lnet_ni_addref(ni);
688         cr->ksncr_ni   = ni;
689         cr->ksncr_sock = sock;
690
691         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
692
693         list_add_tail(&cr->ksncr_list, &ksocknal_data.ksnd_connd_connreqs);
694         wake_up(&ksocknal_data.ksnd_connd_waitq);
695
696         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
697         return 0;
698 }
699
700 static int
701 ksocknal_connecting(struct ksock_conn_cb *conn_cb, struct sockaddr *sa)
702 {
703         if (conn_cb &&
704             rpc_cmp_addr((struct sockaddr *)&conn_cb->ksnr_addr, sa))
705                 return conn_cb->ksnr_connecting;
706         return 0;
707 }
708
709 int
710 ksocknal_create_conn(struct lnet_ni *ni, struct ksock_conn_cb *conn_cb,
711                      struct socket *sock, int type)
712 {
713         rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
714         LIST_HEAD(zombies);
715         struct lnet_process_id peerid;
716         struct list_head *tmp;
717         u64 incarnation;
718         struct ksock_conn *conn;
719         struct ksock_conn *conn2;
720         struct ksock_peer_ni *peer_ni = NULL;
721         struct ksock_peer_ni *peer2;
722         struct ksock_sched *sched;
723         struct ksock_hello_msg *hello;
724         int cpt;
725         struct ksock_tx *tx;
726         struct ksock_tx *txtmp;
727         int rc;
728         int rc2;
729         int active;
730         char *warn = NULL;
731
732         active = (conn_cb != NULL);
733
734         LASSERT(active == (type != SOCKLND_CONN_NONE));
735
736         LIBCFS_ALLOC(conn, sizeof(*conn));
737         if (conn == NULL) {
738                 rc = -ENOMEM;
739                 goto failed_0;
740         }
741
742         conn->ksnc_peer = NULL;
743         conn->ksnc_conn_cb = NULL;
744         conn->ksnc_sock = sock;
745         /* 2 ref, 1 for conn, another extra ref prevents socket
746          * being closed before establishment of connection */
747         refcount_set(&conn->ksnc_sock_refcount, 2);
748         conn->ksnc_type = type;
749         ksocknal_lib_save_callback(sock, conn);
750         refcount_set(&conn->ksnc_conn_refcount, 1); /* 1 ref for me */
751
752         conn->ksnc_rx_ready = 0;
753         conn->ksnc_rx_scheduled = 0;
754
755         INIT_LIST_HEAD(&conn->ksnc_tx_queue);
756         conn->ksnc_tx_ready = 0;
757         conn->ksnc_tx_scheduled = 0;
758         conn->ksnc_tx_carrier = NULL;
759         atomic_set (&conn->ksnc_tx_nob, 0);
760
761         LIBCFS_ALLOC(hello, offsetof(struct ksock_hello_msg,
762                                      kshm_ips[LNET_INTERFACES_NUM]));
763         if (hello == NULL) {
764                 rc = -ENOMEM;
765                 goto failed_1;
766         }
767
768         /* stash conn's local and remote addrs */
769         rc = ksocknal_lib_get_conn_addrs(conn);
770         if (rc != 0)
771                 goto failed_1;
772
773         /* Find out/confirm peer_ni's NID and connection type and get the
774          * vector of interfaces she's willing to let me connect to.
775          * Passive connections use the listener timeout since the peer_ni sends
776          * eagerly
777          */
778
779         if (active) {
780                 peer_ni = conn_cb->ksnr_peer;
781                 LASSERT(ni == peer_ni->ksnp_ni);
782
783                 /* Active connection sends HELLO eagerly */
784                 hello->kshm_nips = 0;
785                 peerid = peer_ni->ksnp_id;
786
787                 write_lock_bh(global_lock);
788                 conn->ksnc_proto = peer_ni->ksnp_proto;
789                 write_unlock_bh(global_lock);
790
791                 if (conn->ksnc_proto == NULL) {
792                         conn->ksnc_proto = &ksocknal_protocol_v3x;
793 #if SOCKNAL_VERSION_DEBUG
794                         if (*ksocknal_tunables.ksnd_protocol == 2)
795                                 conn->ksnc_proto = &ksocknal_protocol_v2x;
796                         else if (*ksocknal_tunables.ksnd_protocol == 1)
797                                 conn->ksnc_proto = &ksocknal_protocol_v1x;
798 #endif
799                 }
800
801                 rc = ksocknal_send_hello(ni, conn, peerid.nid, hello);
802                 if (rc != 0)
803                         goto failed_1;
804         } else {
805                 peerid.nid = LNET_NID_ANY;
806                 peerid.pid = LNET_PID_ANY;
807
808                 /* Passive, get protocol from peer_ni */
809                 conn->ksnc_proto = NULL;
810         }
811
812         rc = ksocknal_recv_hello(ni, conn, hello, &peerid, &incarnation);
813         if (rc < 0)
814                 goto failed_1;
815
816         LASSERT(rc == 0 || active);
817         LASSERT(conn->ksnc_proto != NULL);
818         LASSERT(peerid.nid != LNET_NID_ANY);
819
820         cpt = lnet_cpt_of_nid(peerid.nid, ni);
821
822         if (active) {
823                 ksocknal_peer_addref(peer_ni);
824                 write_lock_bh(global_lock);
825         } else {
826                 peer_ni = ksocknal_create_peer(ni, peerid);
827                 if (IS_ERR(peer_ni)) {
828                         rc = PTR_ERR(peer_ni);
829                         goto failed_1;
830                 }
831
832                 write_lock_bh(global_lock);
833
834                 /* called with a ref on ni, so shutdown can't have started */
835                 LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers) >= 0);
836
837                 peer2 = ksocknal_find_peer_locked(ni, peerid);
838                 if (peer2 == NULL) {
839                         /* NB this puts an "empty" peer_ni in the peer_ni
840                          * table (which takes my ref) */
841                         hash_add(ksocknal_data.ksnd_peers,
842                                  &peer_ni->ksnp_list, peerid.nid);
843                 } else {
844                         ksocknal_peer_decref(peer_ni);
845                         peer_ni = peer2;
846                 }
847
848                 /* +1 ref for me */
849                 ksocknal_peer_addref(peer_ni);
850                 peer_ni->ksnp_accepting++;
851
852                 /* Am I already connecting to this guy?  Resolve in
853                  * favour of higher NID...
854                  */
855                 if (peerid.nid < ni->ni_nid &&
856                     ksocknal_connecting(peer_ni->ksnp_conn_cb,
857                                         ((struct sockaddr *) &conn->ksnc_peeraddr))) {
858                         rc = EALREADY;
859                         warn = "connection race resolution";
860                         goto failed_2;
861                 }
862         }
863
864         if (peer_ni->ksnp_closing ||
865             (active && conn_cb->ksnr_deleted)) {
866                 /* peer_ni/conn_cb got closed under me */
867                 rc = -ESTALE;
868                 warn = "peer_ni/conn_cb removed";
869                 goto failed_2;
870         }
871
872         if (peer_ni->ksnp_proto == NULL) {
873                 /* Never connected before.
874                  * NB recv_hello may have returned EPROTO to signal my peer_ni
875                  * wants a different protocol than the one I asked for.
876                  */
877                 LASSERT(list_empty(&peer_ni->ksnp_conns));
878
879                 peer_ni->ksnp_proto = conn->ksnc_proto;
880                 peer_ni->ksnp_incarnation = incarnation;
881         }
882
883         if (peer_ni->ksnp_proto != conn->ksnc_proto ||
884             peer_ni->ksnp_incarnation != incarnation) {
885                 /* peer_ni rebooted or I've got the wrong protocol version */
886                 ksocknal_close_peer_conns_locked(peer_ni, NULL, 0);
887
888                 peer_ni->ksnp_proto = NULL;
889                 rc = ESTALE;
890                 warn = peer_ni->ksnp_incarnation != incarnation ?
891                         "peer_ni rebooted" :
892                         "wrong proto version";
893                 goto failed_2;
894         }
895
896         switch (rc) {
897         default:
898                 LBUG();
899         case 0:
900                 break;
901         case EALREADY:
902                 warn = "lost conn race";
903                 goto failed_2;
904         case EPROTO:
905                 warn = "retry with different protocol version";
906                 goto failed_2;
907         }
908
909         /* Refuse to duplicate an existing connection, unless this is a
910          * loopback connection */
911         if (!rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
912                           (struct sockaddr *)&conn->ksnc_myaddr)) {
913                 list_for_each(tmp, &peer_ni->ksnp_conns) {
914                         conn2 = list_entry(tmp, struct ksock_conn, ksnc_list);
915
916                         if (!rpc_cmp_addr(
917                                     (struct sockaddr *)&conn2->ksnc_peeraddr,
918                                     (struct sockaddr *)&conn->ksnc_peeraddr) ||
919                             !rpc_cmp_addr(
920                                     (struct sockaddr *)&conn2->ksnc_myaddr,
921                                     (struct sockaddr *)&conn->ksnc_myaddr) ||
922                             conn2->ksnc_type != conn->ksnc_type)
923                                 continue;
924
925                         /* Reply on a passive connection attempt so the peer_ni
926                          * realises we're connected. */
927                         LASSERT (rc == 0);
928                         if (!active)
929                                 rc = EALREADY;
930
931                         warn = "duplicate";
932                         goto failed_2;
933                 }
934         }
935
936         /* If the connection created by this route didn't bind to the IP
937          * address the route connected to, the connection/route matching
938          * code below probably isn't going to work.
939          */
940         if (active &&
941             !rpc_cmp_addr((struct sockaddr *)&conn_cb->ksnr_addr,
942                           (struct sockaddr *)&conn->ksnc_peeraddr)) {
943                 CERROR("Route %s %pIS connected to %pIS\n",
944                        libcfs_id2str(peer_ni->ksnp_id),
945                        &conn_cb->ksnr_addr,
946                        &conn->ksnc_peeraddr);
947         }
948
949         /* Search for a conn_cb corresponding to the new connection and
950          * create an association.  This allows incoming connections created
951          * by conn_cbs in my peer_ni to match my own conn_cb entries so I don't
952          * continually create duplicate conn_cbs.
953          */
954         conn_cb = peer_ni->ksnp_conn_cb;
955
956         if (conn_cb && rpc_cmp_addr((struct sockaddr *)&conn->ksnc_peeraddr,
957                                     (struct sockaddr *)&conn_cb->ksnr_addr))
958                 ksocknal_associate_cb_conn_locked(conn_cb, conn);
959
960         conn->ksnc_peer = peer_ni;                 /* conn takes my ref on peer_ni */
961         peer_ni->ksnp_last_alive = ktime_get_seconds();
962         peer_ni->ksnp_send_keepalive = 0;
963         peer_ni->ksnp_error = 0;
964
965         sched = ksocknal_choose_scheduler_locked(cpt);
966         if (!sched) {
967                 CERROR("no schedulers available. node is unhealthy\n");
968                 goto failed_2;
969         }
970         /*
971          * The cpt might have changed if we ended up selecting a non cpt
972          * native scheduler. So use the scheduler's cpt instead.
973          */
974         cpt = sched->kss_cpt;
975         sched->kss_nconns++;
976         conn->ksnc_scheduler = sched;
977
978         conn->ksnc_tx_last_post = ktime_get_seconds();
979         /* Set the deadline for the outgoing HELLO to drain */
980         conn->ksnc_tx_bufnob = sock->sk->sk_wmem_queued;
981         conn->ksnc_tx_deadline = ktime_get_seconds() +
982                                  ksocknal_timeout();
983         smp_mb();   /* order with adding to peer_ni's conn list */
984
985         list_add(&conn->ksnc_list, &peer_ni->ksnp_conns);
986         ksocknal_conn_addref(conn);
987
988         ksocknal_new_packet(conn, 0);
989
990         conn->ksnc_zc_capable = ksocknal_lib_zc_capable(conn);
991
992         /* Take packets blocking for this connection. */
993         list_for_each_entry_safe(tx, txtmp, &peer_ni->ksnp_tx_queue, tx_list) {
994                 if (conn->ksnc_proto->pro_match_tx(conn, tx, tx->tx_nonblk) ==
995                     SOCKNAL_MATCH_NO)
996                         continue;
997
998                 list_del(&tx->tx_list);
999                 ksocknal_queue_tx_locked(tx, conn);
1000         }
1001
1002         write_unlock_bh(global_lock);
1003
1004         /* We've now got a new connection.  Any errors from here on are just
1005          * like "normal" comms errors and we close the connection normally.
1006          * NB (a) we still have to send the reply HELLO for passive
1007          *        connections,
1008          *    (b) normal I/O on the conn is blocked until I setup and call the
1009          *        socket callbacks.
1010          */
1011
1012         CDEBUG(D_NET, "New conn %s p %d.x %pIS -> %pISp"
1013                " incarnation:%lld sched[%d]\n",
1014                libcfs_id2str(peerid), conn->ksnc_proto->pro_version,
1015                &conn->ksnc_myaddr, &conn->ksnc_peeraddr,
1016                incarnation, cpt);
1017
1018         if (!active) {
1019                 hello->kshm_nips = 0;
1020                 rc = ksocknal_send_hello(ni, conn, peerid.nid, hello);
1021         }
1022
1023         LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
1024                                     kshm_ips[LNET_INTERFACES_NUM]));
1025
1026         /* setup the socket AFTER I've received hello (it disables
1027          * SO_LINGER).  I might call back to the acceptor who may want
1028          * to send a protocol version response and then close the
1029          * socket; this ensures the socket only tears down after the
1030          * response has been sent.
1031          */
1032         if (rc == 0)
1033                 rc = ksocknal_lib_setup_sock(sock);
1034
1035         write_lock_bh(global_lock);
1036
1037         /* NB my callbacks block while I hold ksnd_global_lock */
1038         ksocknal_lib_set_callback(sock, conn);
1039
1040         if (!active)
1041                 peer_ni->ksnp_accepting--;
1042
1043         write_unlock_bh(global_lock);
1044
1045         if (rc != 0) {
1046                 write_lock_bh(global_lock);
1047                 if (!conn->ksnc_closing) {
1048                         /* could be closed by another thread */
1049                         ksocknal_close_conn_locked(conn, rc);
1050                 }
1051                 write_unlock_bh(global_lock);
1052         } else if (ksocknal_connsock_addref(conn) == 0) {
1053                 /* Allow I/O to proceed. */
1054                 ksocknal_read_callback(conn);
1055                 ksocknal_write_callback(conn);
1056                 ksocknal_connsock_decref(conn);
1057         }
1058
1059         ksocknal_connsock_decref(conn);
1060         ksocknal_conn_decref(conn);
1061         return rc;
1062
1063 failed_2:
1064
1065         if (!peer_ni->ksnp_closing &&
1066             list_empty(&peer_ni->ksnp_conns) &&
1067             peer_ni->ksnp_conn_cb == NULL) {
1068                 list_splice_init(&peer_ni->ksnp_tx_queue, &zombies);
1069                 ksocknal_unlink_peer_locked(peer_ni);
1070         }
1071
1072         write_unlock_bh(global_lock);
1073
1074         if (warn != NULL) {
1075                 if (rc < 0)
1076                         CERROR("Not creating conn %s type %d: %s\n",
1077                                libcfs_id2str(peerid), conn->ksnc_type, warn);
1078                 else
1079                         CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",
1080                                libcfs_id2str(peerid), conn->ksnc_type, warn);
1081         }
1082
1083         if (!active) {
1084                 if (rc > 0) {
1085                         /* Request retry by replying with CONN_NONE
1086                          * ksnc_proto has been set already
1087                          */
1088                         conn->ksnc_type = SOCKLND_CONN_NONE;
1089                         hello->kshm_nips = 0;
1090                         ksocknal_send_hello(ni, conn, peerid.nid, hello);
1091                 }
1092
1093                 write_lock_bh(global_lock);
1094                 peer_ni->ksnp_accepting--;
1095                 write_unlock_bh(global_lock);
1096         }
1097
1098         /*
1099          * If we get here without an error code, just use -EALREADY.
1100          * Depending on how we got here, the error may be positive
1101          * or negative. Normalize the value for ksocknal_txlist_done().
1102          */
1103         rc2 = (rc == 0 ? -EALREADY : (rc > 0 ? -rc : rc));
1104         ksocknal_txlist_done(ni, &zombies, rc2);
1105         ksocknal_peer_decref(peer_ni);
1106
1107 failed_1:
1108         if (hello != NULL)
1109                 LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
1110                                             kshm_ips[LNET_INTERFACES_NUM]));
1111
1112         LIBCFS_FREE(conn, sizeof(*conn));
1113
1114 failed_0:
1115         sock_release(sock);
1116
1117         return rc;
1118 }
1119
1120 void
1121 ksocknal_close_conn_locked(struct ksock_conn *conn, int error)
1122 {
1123         /* This just does the immmediate housekeeping, and queues the
1124          * connection for the reaper to terminate.
1125          * Caller holds ksnd_global_lock exclusively in irq context */
1126         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1127         struct ksock_conn_cb *conn_cb;
1128         struct ksock_conn *conn2;
1129         struct list_head *tmp;
1130
1131         LASSERT(peer_ni->ksnp_error == 0);
1132         LASSERT(!conn->ksnc_closing);
1133         conn->ksnc_closing = 1;
1134
1135         /* ksnd_deathrow_conns takes over peer_ni's ref */
1136         list_del(&conn->ksnc_list);
1137
1138         conn_cb = conn->ksnc_conn_cb;
1139         if (conn_cb != NULL) {
1140                 /* dissociate conn from cb... */
1141                 LASSERT(!conn_cb->ksnr_deleted);
1142                 LASSERT((conn_cb->ksnr_connected & BIT(conn->ksnc_type)) != 0);
1143
1144                 conn2 = NULL;
1145                 list_for_each(tmp, &peer_ni->ksnp_conns) {
1146                         conn2 = list_entry(tmp, struct ksock_conn, ksnc_list);
1147
1148                         if (conn2->ksnc_conn_cb == conn_cb &&
1149                             conn2->ksnc_type == conn->ksnc_type)
1150                                 break;
1151
1152                         conn2 = NULL;
1153                 }
1154                 if (conn2 == NULL)
1155                         conn_cb->ksnr_connected &= ~BIT(conn->ksnc_type);
1156
1157                 conn->ksnc_conn_cb = NULL;
1158
1159                 /* drop conn's ref on conn_cb */
1160                 ksocknal_conn_cb_decref(conn_cb);
1161         }
1162
1163         if (list_empty(&peer_ni->ksnp_conns)) {
1164                 /* No more connections to this peer_ni */
1165
1166                 if (!list_empty(&peer_ni->ksnp_tx_queue)) {
1167                         struct ksock_tx *tx;
1168
1169                         LASSERT(conn->ksnc_proto == &ksocknal_protocol_v3x);
1170
1171                         /* throw them to the last connection...,
1172                          * these TXs will be send to /dev/null by scheduler */
1173                         list_for_each_entry(tx, &peer_ni->ksnp_tx_queue,
1174                                             tx_list)
1175                                 ksocknal_tx_prep(conn, tx);
1176
1177                         spin_lock_bh(&conn->ksnc_scheduler->kss_lock);
1178                         list_splice_init(&peer_ni->ksnp_tx_queue,
1179                                          &conn->ksnc_tx_queue);
1180                         spin_unlock_bh(&conn->ksnc_scheduler->kss_lock);
1181                 }
1182
1183                 /* renegotiate protocol version */
1184                 peer_ni->ksnp_proto = NULL;
1185                 /* stash last conn close reason */
1186                 peer_ni->ksnp_error = error;
1187
1188                 if (peer_ni->ksnp_conn_cb == NULL) {
1189                         /* I've just closed last conn belonging to a
1190                          * peer_ni with no connections to it
1191                          */
1192                         ksocknal_unlink_peer_locked(peer_ni);
1193                 }
1194         }
1195
1196         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1197
1198         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
1199         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1200
1201         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1202 }
1203
1204 void
1205 ksocknal_peer_failed(struct ksock_peer_ni *peer_ni)
1206 {
1207         bool notify = false;
1208         time64_t last_alive = 0;
1209
1210         /* There has been a connection failure or comms error; but I'll only
1211          * tell LNET I think the peer_ni is dead if it's to another kernel and
1212          * there are no connections or connection attempts in existence. */
1213
1214         read_lock(&ksocknal_data.ksnd_global_lock);
1215
1216         if ((peer_ni->ksnp_id.pid & LNET_PID_USERFLAG) == 0 &&
1217              list_empty(&peer_ni->ksnp_conns) &&
1218              peer_ni->ksnp_accepting == 0 &&
1219              !ksocknal_find_connecting_conn_cb_locked(peer_ni)) {
1220                 notify = true;
1221                 last_alive = peer_ni->ksnp_last_alive;
1222         }
1223
1224         read_unlock(&ksocknal_data.ksnd_global_lock);
1225
1226         if (notify)
1227                 lnet_notify(peer_ni->ksnp_ni, peer_ni->ksnp_id.nid,
1228                             false, false, last_alive);
1229 }
1230
1231 void
1232 ksocknal_finalize_zcreq(struct ksock_conn *conn)
1233 {
1234         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1235         struct ksock_tx *tx;
1236         struct ksock_tx *tmp;
1237         LIST_HEAD(zlist);
1238
1239         /* NB safe to finalize TXs because closing of socket will
1240          * abort all buffered data */
1241         LASSERT(conn->ksnc_sock == NULL);
1242
1243         spin_lock(&peer_ni->ksnp_lock);
1244
1245         list_for_each_entry_safe(tx, tmp, &peer_ni->ksnp_zc_req_list, tx_zc_list) {
1246                 if (tx->tx_conn != conn)
1247                         continue;
1248
1249                 LASSERT(tx->tx_msg.ksm_zc_cookies[0] != 0);
1250
1251                 tx->tx_msg.ksm_zc_cookies[0] = 0;
1252                 tx->tx_zc_aborted = 1;  /* mark it as not-acked */
1253                 list_move(&tx->tx_zc_list, &zlist);
1254         }
1255
1256         spin_unlock(&peer_ni->ksnp_lock);
1257
1258         while (!list_empty(&zlist)) {
1259                 tx = list_entry(zlist.next, struct ksock_tx, tx_zc_list);
1260
1261                 list_del(&tx->tx_zc_list);
1262                 ksocknal_tx_decref(tx);
1263         }
1264 }
1265
1266 void
1267 ksocknal_terminate_conn(struct ksock_conn *conn)
1268 {
1269         /* This gets called by the reaper (guaranteed thread context) to
1270          * disengage the socket from its callbacks and close it.
1271          * ksnc_refcount will eventually hit zero, and then the reaper will
1272          * destroy it.
1273          */
1274         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1275         struct ksock_sched *sched = conn->ksnc_scheduler;
1276         bool failed = false;
1277
1278         LASSERT(conn->ksnc_closing);
1279
1280         /* wake up the scheduler to "send" all remaining packets to /dev/null */
1281         spin_lock_bh(&sched->kss_lock);
1282
1283         /* a closing conn is always ready to tx */
1284         conn->ksnc_tx_ready = 1;
1285
1286         if (!conn->ksnc_tx_scheduled &&
1287             !list_empty(&conn->ksnc_tx_queue)) {
1288                 list_add_tail(&conn->ksnc_tx_list,
1289                               &sched->kss_tx_conns);
1290                 conn->ksnc_tx_scheduled = 1;
1291                 /* extra ref for scheduler */
1292                 ksocknal_conn_addref(conn);
1293
1294                 wake_up(&sched->kss_waitq);
1295         }
1296
1297         spin_unlock_bh(&sched->kss_lock);
1298
1299         /* serialise with callbacks */
1300         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1301
1302         ksocknal_lib_reset_callback(conn->ksnc_sock, conn);
1303
1304         /* OK, so this conn may not be completely disengaged from its
1305          * scheduler yet, but it _has_ committed to terminate...
1306          */
1307         conn->ksnc_scheduler->kss_nconns--;
1308
1309         if (peer_ni->ksnp_error != 0) {
1310                 /* peer_ni's last conn closed in error */
1311                 LASSERT(list_empty(&peer_ni->ksnp_conns));
1312                 failed = true;
1313                 peer_ni->ksnp_error = 0;     /* avoid multiple notifications */
1314         }
1315
1316         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1317
1318         if (failed)
1319                 ksocknal_peer_failed(peer_ni);
1320
1321         /* The socket is closed on the final put; either here, or in
1322          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1323          * when the connection was established, this will close the socket
1324          * immediately, aborting anything buffered in it. Any hung
1325          * zero-copy transmits will therefore complete in finite time.
1326          */
1327         ksocknal_connsock_decref(conn);
1328 }
1329
1330 void
1331 ksocknal_queue_zombie_conn(struct ksock_conn *conn)
1332 {
1333         /* Queue the conn for the reaper to destroy */
1334         LASSERT(refcount_read(&conn->ksnc_conn_refcount) == 0);
1335         spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
1336
1337         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1338         wake_up(&ksocknal_data.ksnd_reaper_waitq);
1339
1340         spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
1341 }
1342
1343 void
1344 ksocknal_destroy_conn(struct ksock_conn *conn)
1345 {
1346         time64_t last_rcv;
1347
1348         /* Final coup-de-grace of the reaper */
1349         CDEBUG(D_NET, "connection %p\n", conn);
1350
1351         LASSERT(refcount_read(&conn->ksnc_conn_refcount) == 0);
1352         LASSERT(refcount_read(&conn->ksnc_sock_refcount) == 0);
1353         LASSERT(conn->ksnc_sock == NULL);
1354         LASSERT(conn->ksnc_conn_cb == NULL);
1355         LASSERT(!conn->ksnc_tx_scheduled);
1356         LASSERT(!conn->ksnc_rx_scheduled);
1357         LASSERT(list_empty(&conn->ksnc_tx_queue));
1358
1359         /* complete current receive if any */
1360         switch (conn->ksnc_rx_state) {
1361         case SOCKNAL_RX_LNET_PAYLOAD:
1362                 last_rcv = conn->ksnc_rx_deadline -
1363                            ksocknal_timeout();
1364                 CERROR("Completing partial receive from %s[%d], ip %pISp, with error, wanted: %d, left: %d, last alive is %lld secs ago\n",
1365                        libcfs_id2str(conn->ksnc_peer->ksnp_id), conn->ksnc_type,
1366                        &conn->ksnc_peeraddr,
1367                        conn->ksnc_rx_nob_wanted, conn->ksnc_rx_nob_left,
1368                        ktime_get_seconds() - last_rcv);
1369                 if (conn->ksnc_lnet_msg)
1370                         conn->ksnc_lnet_msg->msg_health_status =
1371                                 LNET_MSG_STATUS_REMOTE_ERROR;
1372                 lnet_finalize(conn->ksnc_lnet_msg, -EIO);
1373                 break;
1374         case SOCKNAL_RX_LNET_HEADER:
1375                 if (conn->ksnc_rx_started)
1376                         CERROR("Incomplete receive of lnet header from %s, ip %pISp, with error, protocol: %d.x.\n",
1377                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1378                                &conn->ksnc_peeraddr,
1379                                conn->ksnc_proto->pro_version);
1380                 break;
1381         case SOCKNAL_RX_KSM_HEADER:
1382                 if (conn->ksnc_rx_started)
1383                         CERROR("Incomplete receive of ksock message from %s, ip %pISp, with error, protocol: %d.x.\n",
1384                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1385                                &conn->ksnc_peeraddr,
1386                                conn->ksnc_proto->pro_version);
1387                 break;
1388         case SOCKNAL_RX_SLOP:
1389                 if (conn->ksnc_rx_started)
1390                         CERROR("Incomplete receive of slops from %s, ip %pISp, with error\n",
1391                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1392                                &conn->ksnc_peeraddr);
1393                break;
1394         default:
1395                 LBUG ();
1396                 break;
1397         }
1398
1399         ksocknal_peer_decref(conn->ksnc_peer);
1400
1401         LIBCFS_FREE (conn, sizeof (*conn));
1402 }
1403
1404 int
1405 ksocknal_close_peer_conns_locked(struct ksock_peer_ni *peer_ni,
1406                                  struct sockaddr *addr, int why)
1407 {
1408         struct ksock_conn *conn;
1409         struct ksock_conn *cnxt;
1410         int count = 0;
1411
1412         list_for_each_entry_safe(conn, cnxt, &peer_ni->ksnp_conns, ksnc_list) {
1413                 if (!addr ||
1414                     rpc_cmp_addr(addr,
1415                                  (struct sockaddr *)&conn->ksnc_peeraddr)) {
1416                         count++;
1417                         ksocknal_close_conn_locked(conn, why);
1418                 }
1419         }
1420
1421         return count;
1422 }
1423
1424 int
1425 ksocknal_close_conn_and_siblings(struct ksock_conn *conn, int why)
1426 {
1427         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1428         int count;
1429
1430         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1431
1432         count = ksocknal_close_peer_conns_locked(
1433                 peer_ni, (struct sockaddr *)&conn->ksnc_peeraddr, why);
1434
1435         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1436
1437         return count;
1438 }
1439
1440 int
1441 ksocknal_close_matching_conns(struct lnet_process_id id, __u32 ipaddr)
1442 {
1443         struct ksock_peer_ni *peer_ni;
1444         struct hlist_node *pnxt;
1445         int lo;
1446         int hi;
1447         int i;
1448         int count = 0;
1449         struct sockaddr_in sa = {.sin_family = AF_INET};
1450
1451         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1452
1453         if (id.nid != LNET_NID_ANY) {
1454                 lo = hash_min(id.nid, HASH_BITS(ksocknal_data.ksnd_peers));
1455                 hi = lo;
1456         } else {
1457                 lo = 0;
1458                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
1459         }
1460
1461         sa.sin_addr.s_addr = htonl(ipaddr);
1462         for (i = lo; i <= hi; i++) {
1463                 hlist_for_each_entry_safe(peer_ni, pnxt,
1464                                           &ksocknal_data.ksnd_peers[i],
1465                                           ksnp_list) {
1466
1467                         if (!((id.nid == LNET_NID_ANY ||
1468                                id.nid == peer_ni->ksnp_id.nid) &&
1469                               (id.pid == LNET_PID_ANY ||
1470                                id.pid == peer_ni->ksnp_id.pid)))
1471                                 continue;
1472
1473                         count += ksocknal_close_peer_conns_locked(
1474                                 peer_ni,
1475                                 ipaddr ? (struct sockaddr *)&sa : NULL, 0);
1476                 }
1477         }
1478
1479         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1480
1481         /* wildcards always succeed */
1482         if (id.nid == LNET_NID_ANY || id.pid == LNET_PID_ANY || ipaddr == 0)
1483                 return 0;
1484
1485         return (count == 0 ? -ENOENT : 0);
1486 }
1487
1488 void
1489 ksocknal_notify_gw_down(lnet_nid_t gw_nid)
1490 {
1491         /* The router is telling me she's been notified of a change in
1492          * gateway state....
1493          */
1494         struct lnet_process_id id = {
1495                 .nid    = gw_nid,
1496                 .pid    = LNET_PID_ANY,
1497         };
1498
1499         CDEBUG(D_NET, "gw %s down\n", libcfs_nid2str(gw_nid));
1500
1501         /* If the gateway crashed, close all open connections... */
1502         ksocknal_close_matching_conns(id, 0);
1503         return;
1504
1505         /* We can only establish new connections
1506          * if we have autroutes, and these connect on demand. */
1507 }
1508
1509 static void
1510 ksocknal_push_peer(struct ksock_peer_ni *peer_ni)
1511 {
1512         int index;
1513         int i;
1514         struct list_head *tmp;
1515         struct ksock_conn *conn;
1516
1517         for (index = 0; ; index++) {
1518                 read_lock(&ksocknal_data.ksnd_global_lock);
1519
1520                 i = 0;
1521                 conn = NULL;
1522
1523                 list_for_each(tmp, &peer_ni->ksnp_conns) {
1524                         if (i++ == index) {
1525                                 conn = list_entry(tmp, struct ksock_conn,
1526                                                   ksnc_list);
1527                                 ksocknal_conn_addref(conn);
1528                                 break;
1529                         }
1530                 }
1531
1532                 read_unlock(&ksocknal_data.ksnd_global_lock);
1533
1534                 if (conn == NULL)
1535                         break;
1536
1537                 ksocknal_lib_push_conn (conn);
1538                 ksocknal_conn_decref(conn);
1539         }
1540 }
1541
1542 static int
1543 ksocknal_push(struct lnet_ni *ni, struct lnet_process_id id)
1544 {
1545         int lo;
1546         int hi;
1547         int bkt;
1548         int rc = -ENOENT;
1549
1550         if (id.nid != LNET_NID_ANY) {
1551                 lo = hash_min(id.nid, HASH_BITS(ksocknal_data.ksnd_peers));
1552                 hi = lo;
1553         } else {
1554                 lo = 0;
1555                 hi = HASH_SIZE(ksocknal_data.ksnd_peers) - 1;
1556         }
1557
1558         for (bkt = lo; bkt <= hi; bkt++) {
1559                 int peer_off; /* searching offset in peer_ni hash table */
1560
1561                 for (peer_off = 0; ; peer_off++) {
1562                         struct ksock_peer_ni *peer_ni;
1563                         int           i = 0;
1564
1565                         read_lock(&ksocknal_data.ksnd_global_lock);
1566                         hlist_for_each_entry(peer_ni,
1567                                              &ksocknal_data.ksnd_peers[bkt],
1568                                              ksnp_list) {
1569                                 if (!((id.nid == LNET_NID_ANY ||
1570                                        id.nid == peer_ni->ksnp_id.nid) &&
1571                                       (id.pid == LNET_PID_ANY ||
1572                                        id.pid == peer_ni->ksnp_id.pid)))
1573                                         continue;
1574
1575                                 if (i++ == peer_off) {
1576                                         ksocknal_peer_addref(peer_ni);
1577                                         break;
1578                                 }
1579                         }
1580                         read_unlock(&ksocknal_data.ksnd_global_lock);
1581
1582                         if (i <= peer_off) /* no match */
1583                                 break;
1584
1585                         rc = 0;
1586                         ksocknal_push_peer(peer_ni);
1587                         ksocknal_peer_decref(peer_ni);
1588                 }
1589         }
1590         return rc;
1591 }
1592
1593 int
1594 ksocknal_ctl(struct lnet_ni *ni, unsigned int cmd, void *arg)
1595 {
1596         struct lnet_process_id id = {0};
1597         struct libcfs_ioctl_data *data = arg;
1598         int rc;
1599
1600         switch(cmd) {
1601         case IOC_LIBCFS_GET_INTERFACE: {
1602                 struct ksock_net *net = ni->ni_data;
1603                 struct ksock_interface *iface;
1604                 struct sockaddr_in *sa;
1605
1606                 read_lock(&ksocknal_data.ksnd_global_lock);
1607
1608                 if (data->ioc_count >= 1) {
1609                         rc = -ENOENT;
1610                 } else {
1611                         rc = 0;
1612                         iface = &net->ksnn_interface;
1613
1614                         sa = (void *)&iface->ksni_addr;
1615                         if (sa->sin_family == AF_INET)
1616                                 data->ioc_u32[0] = ntohl(sa->sin_addr.s_addr);
1617                         else
1618                                 data->ioc_u32[0] = 0xFFFFFFFF;
1619                         data->ioc_u32[1] = iface->ksni_netmask;
1620                         data->ioc_u32[2] = iface->ksni_npeers;
1621                         data->ioc_u32[3] = iface->ksni_nroutes;
1622                 }
1623
1624                 read_unlock(&ksocknal_data.ksnd_global_lock);
1625                 return rc;
1626         }
1627
1628         case IOC_LIBCFS_GET_PEER: {
1629                 __u32            myip = 0;
1630                 __u32            ip = 0;
1631                 int              port = 0;
1632                 int              conn_count = 0;
1633                 int              share_count = 0;
1634
1635                 rc = ksocknal_get_peer_info(ni, data->ioc_count,
1636                                             &id, &myip, &ip, &port,
1637                                             &conn_count,  &share_count);
1638                 if (rc != 0)
1639                         return rc;
1640
1641                 data->ioc_nid    = id.nid;
1642                 data->ioc_count  = share_count;
1643                 data->ioc_u32[0] = ip;
1644                 data->ioc_u32[1] = port;
1645                 data->ioc_u32[2] = myip;
1646                 data->ioc_u32[3] = conn_count;
1647                 data->ioc_u32[4] = id.pid;
1648                 return 0;
1649         }
1650
1651         case IOC_LIBCFS_ADD_PEER: {
1652                 struct sockaddr_in sa = {.sin_family = AF_INET};
1653
1654                 id.nid = data->ioc_nid;
1655                 id.pid = LNET_PID_LUSTRE;
1656                 sa.sin_addr.s_addr = htonl(data->ioc_u32[0]);
1657                 sa.sin_port = htons(data->ioc_u32[1]);
1658                 return ksocknal_add_peer(ni, id, (struct sockaddr *)&sa);
1659         }
1660         case IOC_LIBCFS_DEL_PEER:
1661                 id.nid = data->ioc_nid;
1662                 id.pid = LNET_PID_ANY;
1663                 return ksocknal_del_peer (ni, id,
1664                                           data->ioc_u32[0]); /* IP */
1665
1666         case IOC_LIBCFS_GET_CONN: {
1667                 int           txmem;
1668                 int           rxmem;
1669                 int           nagle;
1670                 struct ksock_conn *conn = ksocknal_get_conn_by_idx(ni, data->ioc_count);
1671                 struct sockaddr_in *psa = (void *)&conn->ksnc_peeraddr;
1672                 struct sockaddr_in *mysa = (void *)&conn->ksnc_myaddr;
1673
1674                 if (conn == NULL)
1675                         return -ENOENT;
1676
1677                 ksocknal_lib_get_conn_tunables(conn, &txmem, &rxmem, &nagle);
1678
1679                 data->ioc_count  = txmem;
1680                 data->ioc_nid    = conn->ksnc_peer->ksnp_id.nid;
1681                 data->ioc_flags  = nagle;
1682                 if (psa->sin_family == AF_INET)
1683                         data->ioc_u32[0] = ntohl(psa->sin_addr.s_addr);
1684                 else
1685                         data->ioc_u32[0] = 0xFFFFFFFF;
1686                 data->ioc_u32[1] = rpc_get_port((struct sockaddr *)
1687                                                 &conn->ksnc_peeraddr);
1688                 if (mysa->sin_family == AF_INET)
1689                         data->ioc_u32[2] = ntohl(mysa->sin_addr.s_addr);
1690                 else
1691                         data->ioc_u32[2] = 0xFFFFFFFF;
1692                 data->ioc_u32[3] = conn->ksnc_type;
1693                 data->ioc_u32[4] = conn->ksnc_scheduler->kss_cpt;
1694                 data->ioc_u32[5] = rxmem;
1695                 data->ioc_u32[6] = conn->ksnc_peer->ksnp_id.pid;
1696                 ksocknal_conn_decref(conn);
1697                 return 0;
1698         }
1699
1700         case IOC_LIBCFS_CLOSE_CONNECTION:
1701                 id.nid = data->ioc_nid;
1702                 id.pid = LNET_PID_ANY;
1703                 return ksocknal_close_matching_conns (id,
1704                                                       data->ioc_u32[0]);
1705
1706         case IOC_LIBCFS_REGISTER_MYNID:
1707                 /* Ignore if this is a noop */
1708                 if (data->ioc_nid == ni->ni_nid)
1709                         return 0;
1710
1711                 CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
1712                        libcfs_nid2str(data->ioc_nid),
1713                        libcfs_nid2str(ni->ni_nid));
1714                 return -EINVAL;
1715
1716         case IOC_LIBCFS_PUSH_CONNECTION:
1717                 id.nid = data->ioc_nid;
1718                 id.pid = LNET_PID_ANY;
1719                 return ksocknal_push(ni, id);
1720
1721         default:
1722                 return -EINVAL;
1723         }
1724         /* not reached */
1725 }
1726
1727 static void
1728 ksocknal_free_buffers (void)
1729 {
1730         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_txs) == 0);
1731
1732         if (ksocknal_data.ksnd_schedulers != NULL)
1733                 cfs_percpt_free(ksocknal_data.ksnd_schedulers);
1734
1735         spin_lock(&ksocknal_data.ksnd_tx_lock);
1736
1737         if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
1738                 LIST_HEAD(zlist);
1739                 struct ksock_tx *tx;
1740
1741                 list_splice_init(&ksocknal_data.ksnd_idle_noop_txs, &zlist);
1742                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
1743
1744                 while (!list_empty(&zlist)) {
1745                         tx = list_entry(zlist.next, struct ksock_tx, tx_list);
1746                         list_del(&tx->tx_list);
1747                         LIBCFS_FREE(tx, tx->tx_desc_size);
1748                 }
1749         } else {
1750                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
1751         }
1752 }
1753
1754 static void
1755 ksocknal_base_shutdown(void)
1756 {
1757         struct ksock_sched *sched;
1758         struct ksock_peer_ni *peer_ni;
1759         int i;
1760
1761         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %lld\n",
1762                libcfs_kmem_read());
1763         LASSERT (ksocknal_data.ksnd_nnets == 0);
1764
1765         switch (ksocknal_data.ksnd_init) {
1766         default:
1767                 LASSERT(0);
1768                 /* fallthrough */
1769
1770         case SOCKNAL_INIT_ALL:
1771         case SOCKNAL_INIT_DATA:
1772                 hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list)
1773                         LASSERT(0);
1774
1775                 LASSERT(list_empty(&ksocknal_data.ksnd_nets));
1776                 LASSERT(list_empty(&ksocknal_data.ksnd_enomem_conns));
1777                 LASSERT(list_empty(&ksocknal_data.ksnd_zombie_conns));
1778                 LASSERT(list_empty(&ksocknal_data.ksnd_connd_connreqs));
1779                 LASSERT(list_empty(&ksocknal_data.ksnd_connd_routes));
1780
1781                 if (ksocknal_data.ksnd_schedulers != NULL) {
1782                         cfs_percpt_for_each(sched, i,
1783                                             ksocknal_data.ksnd_schedulers) {
1784
1785                                 LASSERT(list_empty(&sched->kss_tx_conns));
1786                                 LASSERT(list_empty(&sched->kss_rx_conns));
1787                                 LASSERT(list_empty(&sched->kss_zombie_noop_txs));
1788                                 LASSERT(sched->kss_nconns == 0);
1789                         }
1790                 }
1791
1792                 /* flag threads to terminate; wake and wait for them to die */
1793                 ksocknal_data.ksnd_shuttingdown = 1;
1794                 wake_up_all(&ksocknal_data.ksnd_connd_waitq);
1795                 wake_up(&ksocknal_data.ksnd_reaper_waitq);
1796
1797                 if (ksocknal_data.ksnd_schedulers != NULL) {
1798                         cfs_percpt_for_each(sched, i,
1799                                             ksocknal_data.ksnd_schedulers)
1800                                         wake_up_all(&sched->kss_waitq);
1801                 }
1802
1803                 wait_var_event_warning(&ksocknal_data.ksnd_nthreads,
1804                                        atomic_read(&ksocknal_data.ksnd_nthreads) == 0,
1805                                        "waiting for %d threads to terminate\n",
1806                                        atomic_read(&ksocknal_data.ksnd_nthreads));
1807
1808                 ksocknal_free_buffers();
1809
1810                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
1811                 break;
1812         }
1813
1814         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %lld\n",
1815                libcfs_kmem_read());
1816
1817         module_put(THIS_MODULE);
1818 }
1819
1820 static int
1821 ksocknal_base_startup(void)
1822 {
1823         struct ksock_sched *sched;
1824         int rc;
1825         int i;
1826
1827         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
1828         LASSERT(ksocknal_data.ksnd_nnets == 0);
1829
1830         memset(&ksocknal_data, 0, sizeof(ksocknal_data)); /* zero pointers */
1831
1832         hash_init(ksocknal_data.ksnd_peers);
1833
1834         rwlock_init(&ksocknal_data.ksnd_global_lock);
1835         INIT_LIST_HEAD(&ksocknal_data.ksnd_nets);
1836
1837         spin_lock_init(&ksocknal_data.ksnd_reaper_lock);
1838         INIT_LIST_HEAD(&ksocknal_data.ksnd_enomem_conns);
1839         INIT_LIST_HEAD(&ksocknal_data.ksnd_zombie_conns);
1840         INIT_LIST_HEAD(&ksocknal_data.ksnd_deathrow_conns);
1841         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
1842
1843         spin_lock_init(&ksocknal_data.ksnd_connd_lock);
1844         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_connreqs);
1845         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_routes);
1846         init_waitqueue_head(&ksocknal_data.ksnd_connd_waitq);
1847
1848         spin_lock_init(&ksocknal_data.ksnd_tx_lock);
1849         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_noop_txs);
1850
1851         /* NB memset above zeros whole of ksocknal_data */
1852
1853         /* flag lists/ptrs/locks initialised */
1854         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
1855         if (!try_module_get(THIS_MODULE))
1856                 goto failed;
1857
1858         /* Create a scheduler block per available CPT */
1859         ksocknal_data.ksnd_schedulers = cfs_percpt_alloc(lnet_cpt_table(),
1860                                                          sizeof(*sched));
1861         if (ksocknal_data.ksnd_schedulers == NULL)
1862                 goto failed;
1863
1864         cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
1865                 int nthrs;
1866
1867                 /*
1868                  * make sure not to allocate more threads than there are
1869                  * cores/CPUs in teh CPT
1870                  */
1871                 nthrs = cfs_cpt_weight(lnet_cpt_table(), i);
1872                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
1873                         nthrs = min(nthrs, *ksocknal_tunables.ksnd_nscheds);
1874                 } else {
1875                         /*
1876                          * max to half of CPUs, assume another half should be
1877                          * reserved for upper layer modules
1878                          */
1879                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
1880                 }
1881
1882                 sched->kss_nthreads_max = nthrs;
1883                 sched->kss_cpt = i;
1884
1885                 spin_lock_init(&sched->kss_lock);
1886                 INIT_LIST_HEAD(&sched->kss_rx_conns);
1887                 INIT_LIST_HEAD(&sched->kss_tx_conns);
1888                 INIT_LIST_HEAD(&sched->kss_zombie_noop_txs);
1889                 init_waitqueue_head(&sched->kss_waitq);
1890         }
1891
1892         ksocknal_data.ksnd_connd_starting         = 0;
1893         ksocknal_data.ksnd_connd_failed_stamp     = 0;
1894         ksocknal_data.ksnd_connd_starting_stamp   = ktime_get_real_seconds();
1895         /* must have at least 2 connds to remain responsive to accepts while
1896          * connecting */
1897         if (*ksocknal_tunables.ksnd_nconnds < SOCKNAL_CONND_RESV + 1)
1898                 *ksocknal_tunables.ksnd_nconnds = SOCKNAL_CONND_RESV + 1;
1899
1900         if (*ksocknal_tunables.ksnd_nconnds_max <
1901             *ksocknal_tunables.ksnd_nconnds) {
1902                 ksocknal_tunables.ksnd_nconnds_max =
1903                         ksocknal_tunables.ksnd_nconnds;
1904         }
1905
1906         for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
1907                 char name[16];
1908                 spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
1909                 ksocknal_data.ksnd_connd_starting++;
1910                 spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
1911
1912
1913                 snprintf(name, sizeof(name), "socknal_cd%02d", i);
1914                 rc = ksocknal_thread_start(ksocknal_connd,
1915                                            (void *)((uintptr_t)i), name);
1916                 if (rc != 0) {
1917                         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
1918                         ksocknal_data.ksnd_connd_starting--;
1919                         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
1920                         CERROR("Can't spawn socknal connd: %d\n", rc);
1921                         goto failed;
1922                 }
1923         }
1924
1925         rc = ksocknal_thread_start(ksocknal_reaper, NULL, "socknal_reaper");
1926         if (rc != 0) {
1927                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
1928                 goto failed;
1929         }
1930
1931         /* flag everything initialised */
1932         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
1933
1934         return 0;
1935
1936  failed:
1937         ksocknal_base_shutdown();
1938         return -ENETDOWN;
1939 }
1940
1941 static int
1942 ksocknal_debug_peerhash(struct lnet_ni *ni)
1943 {
1944         struct ksock_peer_ni *peer_ni;
1945         int i;
1946
1947         read_lock(&ksocknal_data.ksnd_global_lock);
1948
1949         hash_for_each(ksocknal_data.ksnd_peers, i, peer_ni, ksnp_list) {
1950                 struct ksock_conn_cb *conn_cb;
1951                 struct ksock_conn *conn;
1952
1953                 if (peer_ni->ksnp_ni != ni)
1954                         continue;
1955
1956                 CWARN("Active peer_ni on shutdown: %s, ref %d, "
1957                       "closing %d, accepting %d, err %d, zcookie %llu, "
1958                       "txq %d, zc_req %d\n", libcfs_id2str(peer_ni->ksnp_id),
1959                       refcount_read(&peer_ni->ksnp_refcount),
1960                       peer_ni->ksnp_closing,
1961                       peer_ni->ksnp_accepting, peer_ni->ksnp_error,
1962                       peer_ni->ksnp_zc_next_cookie,
1963                       !list_empty(&peer_ni->ksnp_tx_queue),
1964                       !list_empty(&peer_ni->ksnp_zc_req_list));
1965
1966                 conn_cb = peer_ni->ksnp_conn_cb;
1967                 if (conn_cb) {
1968                         CWARN("ConnCB: ref %d, schd %d, conn %d, cnted %d, del %d\n",
1969                               refcount_read(&conn_cb->ksnr_refcount),
1970                               conn_cb->ksnr_scheduled, conn_cb->ksnr_connecting,
1971                               conn_cb->ksnr_connected, conn_cb->ksnr_deleted);
1972                 }
1973
1974                 list_for_each_entry(conn, &peer_ni->ksnp_conns, ksnc_list) {
1975                         CWARN("Conn: ref %d, sref %d, t %d, c %d\n",
1976                               refcount_read(&conn->ksnc_conn_refcount),
1977                               refcount_read(&conn->ksnc_sock_refcount),
1978                               conn->ksnc_type, conn->ksnc_closing);
1979                 }
1980                 break;
1981         }
1982
1983         read_unlock(&ksocknal_data.ksnd_global_lock);
1984         return 0;
1985 }
1986
1987 void
1988 ksocknal_shutdown(struct lnet_ni *ni)
1989 {
1990         struct ksock_net *net = ni->ni_data;
1991         struct lnet_process_id anyid = {
1992                 .nid = LNET_NID_ANY,
1993                 .pid = LNET_PID_ANY,
1994         };
1995
1996         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL);
1997         LASSERT(ksocknal_data.ksnd_nnets > 0);
1998
1999         /* prevent new peers */
2000         atomic_add(SOCKNAL_SHUTDOWN_BIAS, &net->ksnn_npeers);
2001
2002         /* Delete all peers */
2003         ksocknal_del_peer(ni, anyid, 0);
2004
2005         /* Wait for all peer_ni state to clean up */
2006         wait_var_event_warning(&net->ksnn_npeers,
2007                                atomic_read(&net->ksnn_npeers) ==
2008                                SOCKNAL_SHUTDOWN_BIAS,
2009                                "waiting for %d peers to disconnect\n",
2010                                ksocknal_debug_peerhash(ni) +
2011                                atomic_read(&net->ksnn_npeers) -
2012                                SOCKNAL_SHUTDOWN_BIAS);
2013
2014         LASSERT(net->ksnn_interface.ksni_npeers == 0);
2015         LASSERT(net->ksnn_interface.ksni_nroutes == 0);
2016
2017         list_del(&net->ksnn_list);
2018         LIBCFS_FREE(net, sizeof(*net));
2019
2020         ksocknal_data.ksnd_nnets--;
2021         if (ksocknal_data.ksnd_nnets == 0)
2022                 ksocknal_base_shutdown();
2023 }
2024
2025 static int
2026 ksocknal_search_new_ipif(struct ksock_net *net)
2027 {
2028         int new_ipif = 0;
2029         char *ifnam = &net->ksnn_interface.ksni_name[0];
2030         char *colon = strchr(ifnam, ':');
2031         bool found = false;
2032         struct ksock_net *tmp;
2033
2034         if (colon != NULL)
2035                 *colon = 0;
2036
2037         list_for_each_entry(tmp, &ksocknal_data.ksnd_nets, ksnn_list) {
2038                 char *ifnam2 = &tmp->ksnn_interface.ksni_name[0];
2039                 char *colon2 = strchr(ifnam2, ':');
2040
2041                 if (colon2 != NULL)
2042                         *colon2 = 0;
2043
2044                 found = strcmp(ifnam, ifnam2) == 0;
2045                 if (colon2 != NULL)
2046                         *colon2 = ':';
2047         }
2048
2049         new_ipif += !found;
2050         if (colon != NULL)
2051                 *colon = ':';
2052
2053         return new_ipif;
2054 }
2055
2056 static int
2057 ksocknal_start_schedulers(struct ksock_sched *sched)
2058 {
2059         int     nthrs;
2060         int     rc = 0;
2061         int     i;
2062
2063         if (sched->kss_nthreads == 0) {
2064                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2065                         nthrs = sched->kss_nthreads_max;
2066                 } else {
2067                         nthrs = cfs_cpt_weight(lnet_cpt_table(),
2068                                                sched->kss_cpt);
2069                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2070                         nthrs = min(SOCKNAL_NSCHEDS_HIGH, nthrs);
2071                 }
2072                 nthrs = min(nthrs, sched->kss_nthreads_max);
2073         } else {
2074                 LASSERT(sched->kss_nthreads <= sched->kss_nthreads_max);
2075                 /* increase two threads if there is new interface */
2076                 nthrs = min(2, sched->kss_nthreads_max - sched->kss_nthreads);
2077         }
2078
2079         for (i = 0; i < nthrs; i++) {
2080                 long id;
2081                 char name[20];
2082
2083                 id = KSOCK_THREAD_ID(sched->kss_cpt, sched->kss_nthreads + i);
2084                 snprintf(name, sizeof(name), "socknal_sd%02d_%02d",
2085                          sched->kss_cpt, (int)KSOCK_THREAD_SID(id));
2086
2087                 rc = ksocknal_thread_start(ksocknal_scheduler,
2088                                            (void *)id, name);
2089                 if (rc == 0)
2090                         continue;
2091
2092                 CERROR("Can't spawn thread %d for scheduler[%d]: %d\n",
2093                        sched->kss_cpt, (int) KSOCK_THREAD_SID(id), rc);
2094                 break;
2095         }
2096
2097         sched->kss_nthreads += i;
2098         return rc;
2099 }
2100
2101 static int
2102 ksocknal_net_start_threads(struct ksock_net *net, __u32 *cpts, int ncpts)
2103 {
2104         int newif = ksocknal_search_new_ipif(net);
2105         int rc;
2106         int i;
2107
2108         if (ncpts > 0 && ncpts > cfs_cpt_number(lnet_cpt_table()))
2109                 return -EINVAL;
2110
2111         for (i = 0; i < ncpts; i++) {
2112                 struct ksock_sched *sched;
2113                 int cpt = (cpts == NULL) ? i : cpts[i];
2114
2115                 LASSERT(cpt < cfs_cpt_number(lnet_cpt_table()));
2116                 sched = ksocknal_data.ksnd_schedulers[cpt];
2117
2118                 if (!newif && sched->kss_nthreads > 0)
2119                         continue;
2120
2121                 rc = ksocknal_start_schedulers(sched);
2122                 if (rc != 0)
2123                         return rc;
2124         }
2125         return 0;
2126 }
2127
2128 int
2129 ksocknal_startup(struct lnet_ni *ni)
2130 {
2131         struct ksock_net *net;
2132         struct lnet_ioctl_config_lnd_cmn_tunables *net_tunables;
2133         struct ksock_interface *ksi = NULL;
2134         struct lnet_inetdev *ifaces = NULL;
2135         struct sockaddr_in *sa;
2136         int i = 0;
2137         int rc;
2138
2139         LASSERT (ni->ni_net->net_lnd == &the_ksocklnd);
2140         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING) {
2141                 rc = ksocknal_base_startup();
2142                 if (rc != 0)
2143                         return rc;
2144         }
2145         LIBCFS_ALLOC(net, sizeof(*net));
2146         if (net == NULL)
2147                 goto fail_0;
2148         net->ksnn_incarnation = ktime_get_real_ns();
2149         ni->ni_data = net;
2150         net_tunables = &ni->ni_net->net_tunables;
2151         if (net_tunables->lct_peer_timeout == -1)
2152                 net_tunables->lct_peer_timeout =
2153                         *ksocknal_tunables.ksnd_peertimeout;
2154
2155         if (net_tunables->lct_max_tx_credits == -1)
2156                 net_tunables->lct_max_tx_credits =
2157                         *ksocknal_tunables.ksnd_credits;
2158
2159         if (net_tunables->lct_peer_tx_credits == -1)
2160                 net_tunables->lct_peer_tx_credits =
2161                         *ksocknal_tunables.ksnd_peertxcredits;
2162
2163         if (net_tunables->lct_peer_tx_credits >
2164             net_tunables->lct_max_tx_credits)
2165                 net_tunables->lct_peer_tx_credits =
2166                         net_tunables->lct_max_tx_credits;
2167
2168         if (net_tunables->lct_peer_rtr_credits == -1)
2169                 net_tunables->lct_peer_rtr_credits =
2170                         *ksocknal_tunables.ksnd_peerrtrcredits;
2171
2172         rc = lnet_inet_enumerate(&ifaces, ni->ni_net_ns);
2173         if (rc < 0)
2174                 goto fail_1;
2175
2176         ksi = &net->ksnn_interface;
2177
2178         /* Use the first discovered interface or look in the list */
2179         if (ni->ni_interface) {
2180                 for (i = 0; i < rc; i++)
2181                         if (strcmp(ifaces[i].li_name, ni->ni_interface) == 0)
2182                                 break;
2183
2184                 /* ni_interfaces doesn't contain the interface we want */
2185                 if (i == rc) {
2186                         CERROR("ksocklnd: failed to find interface %s\n",
2187                                ni->ni_interface);
2188                         goto fail_1;
2189                 }
2190         }
2191
2192         ni->ni_dev_cpt = ifaces[i].li_cpt;
2193         sa = (void *)&ksi->ksni_addr;
2194         memset(sa, 0, sizeof(*sa));
2195         sa->sin_family = AF_INET;
2196         sa->sin_addr.s_addr = htonl(ifaces[i].li_ipaddr);
2197         ksi->ksni_index = ksocknal_ip2index((struct sockaddr *)sa, ni);
2198         ksi->ksni_netmask = ifaces[i].li_netmask;
2199         strlcpy(ksi->ksni_name, ifaces[i].li_name, sizeof(ksi->ksni_name));
2200
2201         /* call it before add it to ksocknal_data.ksnd_nets */
2202         rc = ksocknal_net_start_threads(net, ni->ni_cpts, ni->ni_ncpts);
2203         if (rc != 0)
2204                 goto fail_1;
2205
2206         LASSERT(ksi);
2207         LASSERT(ksi->ksni_addr.ss_family == AF_INET);
2208         ni->ni_nid = LNET_MKNID(
2209                 LNET_NIDNET(ni->ni_nid),
2210                 ntohl(((struct sockaddr_in *)
2211                        &ksi->ksni_addr)->sin_addr.s_addr));
2212         list_add(&net->ksnn_list, &ksocknal_data.ksnd_nets);
2213         ksocknal_data.ksnd_nnets++;
2214
2215         return 0;
2216
2217 fail_1:
2218         LIBCFS_FREE(net, sizeof(*net));
2219 fail_0:
2220         if (ksocknal_data.ksnd_nnets == 0)
2221                 ksocknal_base_shutdown();
2222
2223         return -ENETDOWN;
2224 }
2225
2226
2227 static void __exit ksocklnd_exit(void)
2228 {
2229         lnet_unregister_lnd(&the_ksocklnd);
2230 }
2231
2232 static const struct lnet_lnd the_ksocklnd = {
2233         .lnd_type               = SOCKLND,
2234         .lnd_startup            = ksocknal_startup,
2235         .lnd_shutdown           = ksocknal_shutdown,
2236         .lnd_ctl                = ksocknal_ctl,
2237         .lnd_send               = ksocknal_send,
2238         .lnd_recv               = ksocknal_recv,
2239         .lnd_notify_peer_down   = ksocknal_notify_gw_down,
2240         .lnd_accept             = ksocknal_accept,
2241 };
2242
2243 static int __init ksocklnd_init(void)
2244 {
2245         int rc;
2246
2247         /* check ksnr_connected/connecting field large enough */
2248         BUILD_BUG_ON(SOCKLND_CONN_NTYPES > 4);
2249         BUILD_BUG_ON(SOCKLND_CONN_ACK != SOCKLND_CONN_BULK_IN);
2250
2251         rc = ksocknal_tunables_init();
2252         if (rc != 0)
2253                 return rc;
2254
2255         lnet_register_lnd(&the_ksocklnd);
2256
2257         return 0;
2258 }
2259
2260 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2261 MODULE_DESCRIPTION("TCP Socket LNet Network Driver");
2262 MODULE_VERSION("2.8.0");
2263 MODULE_LICENSE("GPL");
2264
2265 module_init(ksocklnd_init);
2266 module_exit(ksocklnd_exit);