/* Source: Whamcloud gitweb, fs/lustre-release.git
 * Commit: LU-12678 lnet: discard ksnn_lock
 * Path:   lnet/klnds/socklnd/socklnd.c
 */
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/klnds/socklnd/socklnd.c
33  *
34  * Author: Zach Brown <zab@zabbo.net>
35  * Author: Peter J. Braam <braam@clusterfs.com>
36  * Author: Phil Schwan <phil@clusterfs.com>
37  * Author: Eric Barton <eric@bartonsoftware.com>
38  */
39
40 #include "socklnd.h"
41 #include <linux/inetdevice.h>
42
static struct lnet_lnd the_ksocklnd;    /* this LND's descriptor; presumably registered with LNet elsewhere in this file — confirm */
struct ksock_nal_data ksocknal_data;    /* global socklnd state (peer hash, schedulers, global lock) */
45
46 static struct ksock_interface *
47 ksocknal_ip2iface(struct lnet_ni *ni, __u32 ip)
48 {
49         struct ksock_net *net = ni->ni_data;
50         int i;
51         struct ksock_interface *iface;
52
53         for (i = 0; i < net->ksnn_ninterfaces; i++) {
54                 LASSERT(i < LNET_INTERFACES_NUM);
55                 iface = &net->ksnn_interfaces[i];
56
57                 if (iface->ksni_ipaddr == ip)
58                         return iface;
59         }
60
61         return NULL;
62 }
63
64 static struct ksock_route *
65 ksocknal_create_route(__u32 ipaddr, int port)
66 {
67         struct ksock_route *route;
68
69         LIBCFS_ALLOC (route, sizeof (*route));
70         if (route == NULL)
71                 return (NULL);
72
73         atomic_set (&route->ksnr_refcount, 1);
74         route->ksnr_peer = NULL;
75         route->ksnr_retry_interval = 0;         /* OK to connect at any time */
76         route->ksnr_ipaddr = ipaddr;
77         route->ksnr_port = port;
78         route->ksnr_scheduled = 0;
79         route->ksnr_connecting = 0;
80         route->ksnr_connected = 0;
81         route->ksnr_deleted = 0;
82         route->ksnr_conn_count = 0;
83         route->ksnr_share_count = 0;
84
85         return (route);
86 }
87
88 void
89 ksocknal_destroy_route(struct ksock_route *route)
90 {
91         LASSERT (atomic_read(&route->ksnr_refcount) == 0);
92
93         if (route->ksnr_peer != NULL)
94                 ksocknal_peer_decref(route->ksnr_peer);
95
96         LIBCFS_FREE (route, sizeof (*route));
97 }
98
/* Allocate and initialise a peer_ni for @id on network interface @ni.
 * Takes a "peer count" on the net (ksnn_npeers), dropped again in
 * ksocknal_destroy_peer().  Returns the new peer_ni holding one
 * reference for the caller, ERR_PTR(-ESHUTDOWN) if the net is shutting
 * down, or ERR_PTR(-ENOMEM) on allocation failure.
 */
static struct ksock_peer_ni *
ksocknal_create_peer(struct lnet_ni *ni, struct lnet_process_id id)
{
        int cpt = lnet_cpt_of_nid(id.nid, ni);
        struct ksock_net *net = ni->ni_data;
        struct ksock_peer_ni *peer_ni;

        LASSERT(id.nid != LNET_NID_ANY);
        LASSERT(id.pid != LNET_PID_ANY);
        LASSERT(!in_interrupt());

        /* NOTE(review): ksnn_npeers is presumably driven negative at
         * shutdown so this refuses to create new peers then — confirm
         * against ksocknal_shutdown(). */
        if (!atomic_inc_unless_negative(&net->ksnn_npeers)) {
                CERROR("Can't create peer_ni: network shutdown\n");
                return ERR_PTR(-ESHUTDOWN);
        }

        /* allocate on the CPT that serves this NID */
        LIBCFS_CPT_ALLOC(peer_ni, lnet_cpt_table(), cpt, sizeof(*peer_ni));
        if (!peer_ni) {
                atomic_dec(&net->ksnn_npeers);
                return ERR_PTR(-ENOMEM);
        }

        peer_ni->ksnp_ni = ni;
        peer_ni->ksnp_id = id;
        atomic_set(&peer_ni->ksnp_refcount, 1); /* 1 ref for caller */
        peer_ni->ksnp_closing = 0;
        peer_ni->ksnp_accepting = 0;
        peer_ni->ksnp_proto = NULL;
        peer_ni->ksnp_last_alive = 0;
        /* zero-copy request cookies start above the keepalive-ping cookie */
        peer_ni->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;

        INIT_LIST_HEAD(&peer_ni->ksnp_conns);
        INIT_LIST_HEAD(&peer_ni->ksnp_routes);
        INIT_LIST_HEAD(&peer_ni->ksnp_tx_queue);
        INIT_LIST_HEAD(&peer_ni->ksnp_zc_req_list);
        spin_lock_init(&peer_ni->ksnp_lock);

        return peer_ni;
}
138
/* Final destruction of a peer_ni whose refcount has reached zero.
 * Frees the structure and then drops the "peer count" taken on the net
 * in ksocknal_create_peer().
 */
void
ksocknal_destroy_peer(struct ksock_peer_ni *peer_ni)
{
        struct ksock_net *net = peer_ni->ksnp_ni->ni_data;

        CDEBUG (D_NET, "peer_ni %s %p deleted\n",
                libcfs_id2str(peer_ni->ksnp_id), peer_ni);

        /* conns/routes each held a peer ref, so with refcount 0 every
         * list must already be empty */
        LASSERT(atomic_read(&peer_ni->ksnp_refcount) == 0);
        LASSERT(peer_ni->ksnp_accepting == 0);
        LASSERT(list_empty(&peer_ni->ksnp_conns));
        LASSERT(list_empty(&peer_ni->ksnp_routes));
        LASSERT(list_empty(&peer_ni->ksnp_tx_queue));
        LASSERT(list_empty(&peer_ni->ksnp_zc_req_list));

        LIBCFS_FREE(peer_ni, sizeof(*peer_ni));

        /* NB a peer_ni's connections and routes keep a reference on their
         * peer_ni until they are destroyed, so we can be assured that _all_
         * state to do with this peer_ni has been cleaned up when its refcount
         * drops to zero.
         */
        atomic_dec(&net->ksnn_npeers);
}
163
164 struct ksock_peer_ni *
165 ksocknal_find_peer_locked(struct lnet_ni *ni, struct lnet_process_id id)
166 {
167         struct list_head *peer_list = ksocknal_nid2peerlist(id.nid);
168         struct list_head *tmp;
169         struct ksock_peer_ni *peer_ni;
170
171         list_for_each(tmp, peer_list) {
172                 peer_ni = list_entry(tmp, struct ksock_peer_ni, ksnp_list);
173
174                 LASSERT(!peer_ni->ksnp_closing);
175
176                 if (peer_ni->ksnp_ni != ni)
177                         continue;
178
179                 if (peer_ni->ksnp_id.nid != id.nid ||
180                     peer_ni->ksnp_id.pid != id.pid)
181                         continue;
182
183                 CDEBUG(D_NET, "got peer_ni [%p] -> %s (%d)\n",
184                        peer_ni, libcfs_id2str(id),
185                        atomic_read(&peer_ni->ksnp_refcount));
186                 return peer_ni;
187         }
188         return NULL;
189 }
190
191 struct ksock_peer_ni *
192 ksocknal_find_peer(struct lnet_ni *ni, struct lnet_process_id id)
193 {
194         struct ksock_peer_ni *peer_ni;
195
196         read_lock(&ksocknal_data.ksnd_global_lock);
197         peer_ni = ksocknal_find_peer_locked(ni, id);
198         if (peer_ni != NULL)                    /* +1 ref for caller? */
199                 ksocknal_peer_addref(peer_ni);
200         read_unlock(&ksocknal_data.ksnd_global_lock);
201
202         return (peer_ni);
203 }
204
/* Remove @peer_ni from the peer hash table and drop the table's ref.
 * Also releases the peer's holds on each passive-IP interface.  Caller
 * must hold ksnd_global_lock for writing; the peer must have no conns
 * or routes left.
 */
static void
ksocknal_unlink_peer_locked(struct ksock_peer_ni *peer_ni)
{
        int i;
        __u32 ip;
        struct ksock_interface *iface;

        /* undo the ksni_npeers count taken when each passive IP was chosen */
        for (i = 0; i < peer_ni->ksnp_n_passive_ips; i++) {
                LASSERT(i < LNET_INTERFACES_NUM);
                ip = peer_ni->ksnp_passive_ips[i];

                iface = ksocknal_ip2iface(peer_ni->ksnp_ni, ip);
                /*
                 * All IPs in peer_ni->ksnp_passive_ips[] come from the
                 * interface list, therefore the call must succeed.
                 */
                LASSERT(iface != NULL);

                CDEBUG(D_NET, "peer_ni=%p iface=%p ksni_nroutes=%d\n",
                       peer_ni, iface, iface->ksni_nroutes);
                iface->ksni_npeers--;
        }

        LASSERT(list_empty(&peer_ni->ksnp_conns));
        LASSERT(list_empty(&peer_ni->ksnp_routes));
        LASSERT(!peer_ni->ksnp_closing);
        peer_ni->ksnp_closing = 1;      /* mark dying: no further lookups */
        list_del(&peer_ni->ksnp_list);
        /* lose peerlist's ref */
        ksocknal_peer_decref(peer_ni);
}
236
/* Enumerate peer state for @ni, one record per call: @index selects the
 * record (peers with no IPs/routes count as one record; otherwise one
 * per passive IP and one per route).  Fills the out-parameters and
 * returns 0, or -ENOENT when @index is past the last record.
 * NOTE(review): presumably backs a userspace "get peer" ioctl — confirm
 * with the caller.
 */
static int
ksocknal_get_peer_info(struct lnet_ni *ni, int index,
                       struct lnet_process_id *id, __u32 *myip, __u32 *peer_ip,
                       int *port, int *conn_count, int *share_count)
{
        struct ksock_peer_ni *peer_ni;
        struct list_head *ptmp;
        struct ksock_route *route;
        struct list_head *rtmp;
        int i;
        int j;
        int rc = -ENOENT;

        read_lock(&ksocknal_data.ksnd_global_lock);

        for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
                list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
                        peer_ni = list_entry(ptmp, struct ksock_peer_ni, ksnp_list);

                        if (peer_ni->ksnp_ni != ni)
                                continue;

                        /* peer with neither passive IPs nor routes still
                         * occupies one enumeration slot */
                        if (peer_ni->ksnp_n_passive_ips == 0 &&
                            list_empty(&peer_ni->ksnp_routes)) {
                                if (index-- > 0)
                                        continue;

                                *id = peer_ni->ksnp_id;
                                *myip = 0;
                                *peer_ip = 0;
                                *port = 0;
                                *conn_count = 0;
                                *share_count = 0;
                                rc = 0;
                                goto out;
                        }

                        /* one slot per passive IP */
                        for (j = 0; j < peer_ni->ksnp_n_passive_ips; j++) {
                                if (index-- > 0)
                                        continue;

                                *id = peer_ni->ksnp_id;
                                *myip = peer_ni->ksnp_passive_ips[j];
                                *peer_ip = 0;
                                *port = 0;
                                *conn_count = 0;
                                *share_count = 0;
                                rc = 0;
                                goto out;
                        }

                        /* one slot per route */
                        list_for_each(rtmp, &peer_ni->ksnp_routes) {
                                if (index-- > 0)
                                        continue;

                                route = list_entry(rtmp, struct ksock_route,
                                                   ksnr_list);

                                *id = peer_ni->ksnp_id;
                                *myip = route->ksnr_myipaddr;
                                *peer_ip = route->ksnr_ipaddr;
                                *port = route->ksnr_port;
                                *conn_count = route->ksnr_conn_count;
                                *share_count = route->ksnr_share_count;
                                rc = 0;
                                goto out;
                        }
                }
        }
out:
        read_unlock(&ksocknal_data.ksnd_global_lock);
        return rc;
}
310
/* Attach @conn to @route after a successful connection.
 * Records the route on the conn (with a route ref), updates the
 * route's bound local address and the per-interface route counts if
 * the conn used a different local IP, and marks the connection type as
 * established.  Caller must hold ksnd_global_lock for writing.
 */
static void
ksocknal_associate_route_conn_locked(struct ksock_route *route, struct ksock_conn *conn)
{
        struct ksock_peer_ni *peer_ni = route->ksnr_peer;
        int type = conn->ksnc_type;
        struct ksock_interface *iface;

        conn->ksnc_route = route;
        ksocknal_route_addref(route);

        if (route->ksnr_myipaddr != conn->ksnc_myipaddr) {
                if (route->ksnr_myipaddr == 0) {
                        /* route wasn't bound locally yet (the initial route) */
                        CDEBUG(D_NET, "Binding %s %pI4h to %pI4h\n",
                               libcfs_id2str(peer_ni->ksnp_id),
                               &route->ksnr_ipaddr,
                               &conn->ksnc_myipaddr);
                } else {
                        CDEBUG(D_NET, "Rebinding %s %pI4h from %pI4h "
                               "to %pI4h\n", libcfs_id2str(peer_ni->ksnp_id),
                               &route->ksnr_ipaddr,
                               &route->ksnr_myipaddr,
                               &conn->ksnc_myipaddr);

                        /* drop the route count on the old interface */
                        iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
                                                  route->ksnr_myipaddr);
                        if (iface != NULL)
                                iface->ksni_nroutes--;
                }
                route->ksnr_myipaddr = conn->ksnc_myipaddr;
                /* count the route against the interface actually used */
                iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
                                          route->ksnr_myipaddr);
                if (iface != NULL)
                        iface->ksni_nroutes++;
        }

        route->ksnr_connected |= (1<<type);     /* bit per conn type */
        route->ksnr_conn_count++;

        /* Successful connection => further attempts can
         * proceed immediately */
        route->ksnr_retry_interval = 0;
}
354
/* Attach a freshly-created @route to @peer_ni.
 * The peer's route list takes over the caller's reference on @route,
 * and the route takes a reference on the peer.  Any existing conns to
 * the same address are associated with the new route.  Caller must
 * hold ksnd_global_lock for writing; duplicate addresses are a bug.
 */
static void
ksocknal_add_route_locked(struct ksock_peer_ni *peer_ni, struct ksock_route *route)
{
        struct list_head *tmp;
        struct ksock_conn *conn;
        struct ksock_route *route2;

        LASSERT(!peer_ni->ksnp_closing);
        LASSERT(route->ksnr_peer == NULL);
        LASSERT(!route->ksnr_scheduled);
        LASSERT(!route->ksnr_connecting);
        LASSERT(route->ksnr_connected == 0);

        /* LASSERT(unique) */
        list_for_each(tmp, &peer_ni->ksnp_routes) {
                route2 = list_entry(tmp, struct ksock_route, ksnr_list);

                if (route2->ksnr_ipaddr == route->ksnr_ipaddr) {
                        CERROR("Duplicate route %s %pI4h\n",
                               libcfs_id2str(peer_ni->ksnp_id),
                               &route->ksnr_ipaddr);
                        LBUG();
                }
        }

        route->ksnr_peer = peer_ni;
        ksocknal_peer_addref(peer_ni);
        /* peer_ni's routelist takes over my ref on 'route' */
        list_add_tail(&route->ksnr_list, &peer_ni->ksnp_routes);

        /* adopt any conns already established to this address */
        list_for_each(tmp, &peer_ni->ksnp_conns) {
                conn = list_entry(tmp, struct ksock_conn, ksnc_list);

                if (conn->ksnc_ipaddr != route->ksnr_ipaddr)
                        continue;

                ksocknal_associate_route_conn_locked(route, conn);
                /* keep going (typed routes) */
        }
}
395
/* Delete @route from its peer: close its conns, release its interface
 * route count, drop the peer's ref on it, and unlink the peer itself
 * if this was its last route and it has no conns.  Caller must hold
 * ksnd_global_lock for writing.
 */
static void
ksocknal_del_route_locked(struct ksock_route *route)
{
        struct ksock_peer_ni *peer_ni = route->ksnr_peer;
        struct ksock_interface *iface;
        struct ksock_conn *conn;
        struct list_head *ctmp;
        struct list_head *cnxt;

        LASSERT(!route->ksnr_deleted);

        /* Close associated conns */
        list_for_each_safe(ctmp, cnxt, &peer_ni->ksnp_conns) {
                conn = list_entry(ctmp, struct ksock_conn, ksnc_list);

                if (conn->ksnc_route != route)
                        continue;

                ksocknal_close_conn_locked(conn, 0);
        }

        /* release the route count taken on the bound local interface */
        if (route->ksnr_myipaddr != 0) {
                iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
                                          route->ksnr_myipaddr);
                if (iface != NULL)
                        iface->ksni_nroutes--;
        }

        route->ksnr_deleted = 1;
        list_del(&route->ksnr_list);
        ksocknal_route_decref(route);           /* drop peer_ni's ref */

        if (list_empty(&peer_ni->ksnp_routes) &&
            list_empty(&peer_ni->ksnp_conns)) {
                /* I've just removed the last route to a peer_ni with no active
                 * connections */
                ksocknal_unlink_peer_locked(peer_ni);
        }
}
435
/* Add (or find) the peer @id on @ni and ensure it has a route to
 * @ipaddr:@port, bumping the route's share count.  Speculatively
 * allocates a peer and route before taking the write lock, discarding
 * whichever already exists.  Returns 0, -EINVAL for wildcard ids, or a
 * negative errno from peer/route creation.
 */
int
ksocknal_add_peer(struct lnet_ni *ni, struct lnet_process_id id, __u32 ipaddr,
                  int port)
{
        struct list_head *tmp;
        struct ksock_peer_ni *peer_ni;
        struct ksock_peer_ni *peer2;
        struct ksock_route *route;
        struct ksock_route *route2;

        if (id.nid == LNET_NID_ANY ||
            id.pid == LNET_PID_ANY)
                return (-EINVAL);

        /* Have a brand new peer_ni ready... */
        peer_ni = ksocknal_create_peer(ni, id);
        if (IS_ERR(peer_ni))
                return PTR_ERR(peer_ni);

        route = ksocknal_create_route (ipaddr, port);
        if (route == NULL) {
                ksocknal_peer_decref(peer_ni);
                return (-ENOMEM);
        }

        write_lock_bh(&ksocknal_data.ksnd_global_lock);

        /* always called with a ref on ni, so shutdown can't have started */
        LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers)
                >= 0);

        peer2 = ksocknal_find_peer_locked(ni, id);
        if (peer2 != NULL) {
                /* lost the race: use the existing peer, discard ours */
                ksocknal_peer_decref(peer_ni);
                peer_ni = peer2;
        } else {
                /* peer_ni table takes my ref on peer_ni */
                list_add_tail(&peer_ni->ksnp_list,
                              ksocknal_nid2peerlist(id.nid));
        }

        /* look for an existing route to the same address */
        route2 = NULL;
        list_for_each(tmp, &peer_ni->ksnp_routes) {
                route2 = list_entry(tmp, struct ksock_route, ksnr_list);

                if (route2->ksnr_ipaddr == ipaddr)
                        break;

                route2 = NULL;
        }
        if (route2 == NULL) {
                ksocknal_add_route_locked(peer_ni, route);
                route->ksnr_share_count++;
        } else {
                /* duplicate: drop ours, share the existing route */
                ksocknal_route_decref(route);
                route2->ksnr_share_count++;
        }

        write_unlock_bh(&ksocknal_data.ksnd_global_lock);

        return 0;
}
498
/* Delete @peer_ni's route(s) to @ip (all routes when @ip == 0).  If no
 * explicitly-shared routes remain afterwards, every auto-created route
 * and every conn is torn down too.  The peer unlinks itself once its
 * last conn/route goes.  Caller must hold ksnd_global_lock for
 * writing.
 */
static void
ksocknal_del_peer_locked(struct ksock_peer_ni *peer_ni, __u32 ip)
{
        struct ksock_conn *conn;
        struct ksock_route *route;
        struct list_head *tmp;
        struct list_head *nxt;
        int nshared;

        LASSERT(!peer_ni->ksnp_closing);

        /* Extra ref prevents peer_ni disappearing until I'm done with it */
        ksocknal_peer_addref(peer_ni);

        list_for_each_safe(tmp, nxt, &peer_ni->ksnp_routes) {
                route = list_entry(tmp, struct ksock_route, ksnr_list);

                /* no match */
                if (!(ip == 0 || route->ksnr_ipaddr == ip))
                        continue;

                route->ksnr_share_count = 0;
                /* This deletes associated conns too */
                ksocknal_del_route_locked(route);
        }

        /* count the shares still held on the surviving routes */
        nshared = 0;
        list_for_each_safe(tmp, nxt, &peer_ni->ksnp_routes) {
                route = list_entry(tmp, struct ksock_route, ksnr_list);
                nshared += route->ksnr_share_count;
        }

        if (nshared == 0) {
                /* remove everything else if there are no explicit entries
                 * left */

                list_for_each_safe(tmp, nxt, &peer_ni->ksnp_routes) {
                        route = list_entry(tmp, struct ksock_route, ksnr_list);

                        /* we should only be removing auto-entries */
                        LASSERT(route->ksnr_share_count == 0);
                        ksocknal_del_route_locked(route);
                }

                list_for_each_safe(tmp, nxt, &peer_ni->ksnp_conns) {
                        conn = list_entry(tmp, struct ksock_conn, ksnc_list);

                        ksocknal_close_conn_locked(conn, 0);
                }
        }

        ksocknal_peer_decref(peer_ni);
        /* NB peer_ni unlinks itself when last conn/route is removed */
}
553
/* Delete peers on @ni matching @id (LNET_NID_ANY / LNET_PID_ANY act as
 * wildcards), removing only routes to @ip when @ip != 0.  Queued tx
 * messages of fully-deleted peers are collected and completed with
 * -ENETDOWN outside the lock.  Returns 0 if anything matched, else
 * -ENOENT.
 */
static int
ksocknal_del_peer(struct lnet_ni *ni, struct lnet_process_id id, __u32 ip)
{
        LIST_HEAD(zombies);
        struct list_head *ptmp;
        struct list_head *pnxt;
        struct ksock_peer_ni *peer_ni;
        int lo;
        int hi;
        int i;
        int rc = -ENOENT;

        write_lock_bh(&ksocknal_data.ksnd_global_lock);

        /* a specific nid hashes to a single chain; a wildcard scans all */
        if (id.nid != LNET_NID_ANY) {
                hi = (int)(ksocknal_nid2peerlist(id.nid) -
                           ksocknal_data.ksnd_peers);
                lo = hi;
        } else {
                lo = 0;
                hi = ksocknal_data.ksnd_peer_hash_size - 1;
        }

        for (i = lo; i <= hi; i++) {
                list_for_each_safe(ptmp, pnxt,
                                   &ksocknal_data.ksnd_peers[i]) {
                        peer_ni = list_entry(ptmp, struct ksock_peer_ni, ksnp_list);

                        if (peer_ni->ksnp_ni != ni)
                                continue;

                        if (!((id.nid == LNET_NID_ANY ||
                               peer_ni->ksnp_id.nid == id.nid) &&
                              (id.pid == LNET_PID_ANY ||
                               peer_ni->ksnp_id.pid == id.pid)))
                                continue;

                        ksocknal_peer_addref(peer_ni);  /* a ref for me... */

                        ksocknal_del_peer_locked(peer_ni, ip);

                        /* the peer died with tx still queued: reap those
                         * messages so they can be failed below */
                        if (peer_ni->ksnp_closing &&
                            !list_empty(&peer_ni->ksnp_tx_queue)) {
                                LASSERT(list_empty(&peer_ni->ksnp_conns));
                                LASSERT(list_empty(&peer_ni->ksnp_routes));

                                list_splice_init(&peer_ni->ksnp_tx_queue,
                                                 &zombies);
                        }

                        ksocknal_peer_decref(peer_ni);  /* ...till here */

                        rc = 0;                         /* matched! */
                }
        }

        write_unlock_bh(&ksocknal_data.ksnd_global_lock);

        /* complete the orphaned tx outside the lock */
        ksocknal_txlist_done(ni, &zombies, -ENETDOWN);

        return rc;
}
616
617 static struct ksock_conn *
618 ksocknal_get_conn_by_idx(struct lnet_ni *ni, int index)
619 {
620         struct ksock_peer_ni *peer_ni;
621         struct list_head *ptmp;
622         struct ksock_conn *conn;
623         struct list_head *ctmp;
624         int i;
625
626         read_lock(&ksocknal_data.ksnd_global_lock);
627
628         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
629                 list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
630                         peer_ni = list_entry(ptmp, struct ksock_peer_ni, ksnp_list);
631
632                         LASSERT(!peer_ni->ksnp_closing);
633
634                         if (peer_ni->ksnp_ni != ni)
635                                 continue;
636
637                         list_for_each(ctmp, &peer_ni->ksnp_conns) {
638                                 if (index-- > 0)
639                                         continue;
640
641                                 conn = list_entry(ctmp, struct ksock_conn,
642                                                   ksnc_list);
643                                 ksocknal_conn_addref(conn);
644                                 read_unlock(&ksocknal_data. \
645                                             ksnd_global_lock);
646                                 return conn;
647                         }
648                 }
649         }
650
651         read_unlock(&ksocknal_data.ksnd_global_lock);
652         return NULL;
653 }
654
655 static struct ksock_sched *
656 ksocknal_choose_scheduler_locked(unsigned int cpt)
657 {
658         struct ksock_sched *sched = ksocknal_data.ksnd_schedulers[cpt];
659         int i;
660
661         if (sched->kss_nthreads == 0) {
662                 cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
663                         if (sched->kss_nthreads > 0) {
664                                 CDEBUG(D_NET, "scheduler[%d] has no threads. selected scheduler[%d]\n",
665                                        cpt, sched->kss_cpt);
666                                 return sched;
667                         }
668                 }
669                 return NULL;
670         }
671
672         return sched;
673 }
674
675 static int
676 ksocknal_local_ipvec(struct lnet_ni *ni, __u32 *ipaddrs)
677 {
678         struct ksock_net *net = ni->ni_data;
679         int i;
680         int nip;
681
682         read_lock(&ksocknal_data.ksnd_global_lock);
683
684         nip = net->ksnn_ninterfaces;
685         LASSERT(nip <= LNET_INTERFACES_NUM);
686
687         /*
688          * Only offer interfaces for additional connections if I have
689          * more than one.
690          */
691         if (nip < 2) {
692                 read_unlock(&ksocknal_data.ksnd_global_lock);
693                 return 0;
694         }
695
696         for (i = 0; i < nip; i++) {
697                 ipaddrs[i] = net->ksnn_interfaces[i].ksni_ipaddr;
698                 LASSERT(ipaddrs[i] != 0);
699         }
700
701         read_unlock(&ksocknal_data.ksnd_global_lock);
702         return nip;
703 }
704
705 static int
706 ksocknal_match_peerip(struct ksock_interface *iface, __u32 *ips, int nips)
707 {
708         int best_netmatch = 0;
709         int best_xor = 0;
710         int best = -1;
711         int this_xor;
712         int this_netmatch;
713         int i;
714
715         for (i = 0; i < nips; i++) {
716                 if (ips[i] == 0)
717                         continue;
718
719                 this_xor = (ips[i] ^ iface->ksni_ipaddr);
720                 this_netmatch = ((this_xor & iface->ksni_netmask) == 0) ? 1 : 0;
721
722                 if (!(best < 0 ||
723                       best_netmatch < this_netmatch ||
724                       (best_netmatch == this_netmatch &&
725                        best_xor > this_xor)))
726                         continue;
727
728                 best = i;
729                 best_netmatch = this_netmatch;
730                 best_xor = this_xor;
731         }
732
733         LASSERT (best >= 0);
734         return (best);
735 }
736
/* Select up to @n_peerips local interfaces to pair with the peer's
 * addresses in @peerips, extending peer_ni->ksnp_passive_ips and
 * bumping ksni_npeers on each newly-chosen interface.  @peerips is
 * overwritten with the chosen local addresses; returns how many were
 * selected (0 when this net has fewer than two interfaces).
 */
static int
ksocknal_select_ips(struct ksock_peer_ni *peer_ni, __u32 *peerips, int n_peerips)
{
        rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
        struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
        struct ksock_interface *iface;
        struct ksock_interface *best_iface;
        int n_ips;
        int i;
        int j;
        int k;
        u32 ip;
        u32 xor;
        int this_netmatch;
        int best_netmatch;
        int best_npeers;

        /* CAVEAT EMPTOR: We do all our interface matching with an
         * exclusive hold of global lock at IRQ priority.  We're only
         * expecting to be dealing with small numbers of interfaces, so the
         * O(n**3)-ness shouldn't matter */

        /* Also note that I'm not going to return more than n_peerips
         * interfaces, even if I have more myself */

        write_lock_bh(global_lock);

        LASSERT(n_peerips <= LNET_INTERFACES_NUM);
        LASSERT(net->ksnn_ninterfaces <= LNET_INTERFACES_NUM);

        /* Only match interfaces for additional connections
         * if I have > 1 interface */
        n_ips = (net->ksnn_ninterfaces < 2) ? 0 :
                MIN(n_peerips, net->ksnn_ninterfaces);

        for (i = 0; peer_ni->ksnp_n_passive_ips < n_ips; i++) {
                /*              ^ yes really... */

                /* If we have any new interfaces, first tick off all the
                 * peer_ni IPs that match old interfaces, then choose new
                 * interfaces to match the remaining peer_ni IPS.
                 * We don't forget interfaces we've stopped using; we might
                 * start using them again... */

                if (i < peer_ni->ksnp_n_passive_ips) {
                        /* Old interface. */
                        ip = peer_ni->ksnp_passive_ips[i];
                        best_iface = ksocknal_ip2iface(peer_ni->ksnp_ni, ip);

                        /* peer_ni passive ips are kept up to date */
                        LASSERT(best_iface != NULL);
                } else {
                        /* choose a new interface */
                        LASSERT (i == peer_ni->ksnp_n_passive_ips);

                        best_iface = NULL;
                        best_netmatch = 0;
                        best_npeers = 0;

                        /* pick the least-loaded interface, preferring one
                         * on the same subnet as its best peer IP */
                        for (j = 0; j < net->ksnn_ninterfaces; j++) {
                                iface = &net->ksnn_interfaces[j];
                                ip = iface->ksni_ipaddr;

                                for (k = 0; k < peer_ni->ksnp_n_passive_ips; k++)
                                        if (peer_ni->ksnp_passive_ips[k] == ip)
                                                break;

                                if (k < peer_ni->ksnp_n_passive_ips) /* using it already */
                                        continue;

                                k = ksocknal_match_peerip(iface, peerips, n_peerips);
                                xor = (ip ^ peerips[k]);
                                this_netmatch = ((xor & iface->ksni_netmask) == 0) ? 1 : 0;

                                if (!(best_iface == NULL ||
                                      best_netmatch < this_netmatch ||
                                      (best_netmatch == this_netmatch &&
                                       best_npeers > iface->ksni_npeers)))
                                        continue;

                                best_iface = iface;
                                best_netmatch = this_netmatch;
                                best_npeers = iface->ksni_npeers;
                        }

                        LASSERT(best_iface != NULL);

                        best_iface->ksni_npeers++;
                        ip = best_iface->ksni_ipaddr;
                        peer_ni->ksnp_passive_ips[i] = ip;
                        peer_ni->ksnp_n_passive_ips = i+1;
                }

                /* mark the best matching peer_ni IP used */
                j = ksocknal_match_peerip(best_iface, peerips, n_peerips);
                peerips[j] = 0;
        }

        /* Overwrite input peer_ni IP addresses */
        memcpy(peerips, peer_ni->ksnp_passive_ips, n_ips * sizeof(*peerips));

        write_unlock_bh(global_lock);

        return (n_ips);
}
842
/* Create one route per peer_ni IP address that we don't already have a
 * route to, choosing for each the best local interface to connect from
 * (preferring same-subnet matches, then least-loaded interfaces).
 * Only called for active (outgoing) connections after the interface
 * exchange in the HELLO handshake; a no-op with fewer than 2 local
 * interfaces since a single interface needs no extra routes. */
static void
ksocknal_create_routes(struct ksock_peer_ni *peer_ni, int port,
                       __u32 *peer_ipaddrs, int npeer_ipaddrs)
{
        struct ksock_route              *newroute = NULL;
        rwlock_t                *global_lock = &ksocknal_data.ksnd_global_lock;
        struct lnet_ni *ni = peer_ni->ksnp_ni;
        struct ksock_net                *net = ni->ni_data;
        struct list_head        *rtmp;
        struct ksock_route              *route;
        struct ksock_interface  *iface;
        struct ksock_interface  *best_iface;
        int                     best_netmatch;
        int                     this_netmatch;
        int                     best_nroutes;
        int                     i;
        int                     j;

        /* CAVEAT EMPTOR: We do all our interface matching with an
         * exclusive hold of global lock at IRQ priority.  We're only
         * expecting to be dealing with small numbers of interfaces, so the
         * O(n**3)-ness here shouldn't matter */

        write_lock_bh(global_lock);

        if (net->ksnn_ninterfaces < 2) {
                /* Only create additional connections
                 * if I have > 1 interface */
                write_unlock_bh(global_lock);
                return;
        }

        LASSERT(npeer_ipaddrs <= LNET_INTERFACES_NUM);

        for (i = 0; i < npeer_ipaddrs; i++) {
                if (newroute != NULL) {
                        /* reuse the route left over from the previous
                         * iteration (it matched an existing route) */
                        newroute->ksnr_ipaddr = peer_ipaddrs[i];
                } else {
                        /* must drop the lock to allocate; peer_ni state is
                         * re-validated below after it is retaken */
                        write_unlock_bh(global_lock);

                        newroute = ksocknal_create_route(peer_ipaddrs[i], port);
                        if (newroute == NULL)
                                return;

                        write_lock_bh(global_lock);
                }

                if (peer_ni->ksnp_closing) {
                        /* peer_ni got closed under me */
                        break;
                }

                /* Already got a route? */
                route = NULL;
                list_for_each(rtmp, &peer_ni->ksnp_routes) {
                        route = list_entry(rtmp, struct ksock_route, ksnr_list);

                        if (route->ksnr_ipaddr == newroute->ksnr_ipaddr)
                                break;

                        route = NULL;
                }
                if (route != NULL)
                        continue;

                best_iface = NULL;
                best_nroutes = 0;
                best_netmatch = 0;

                LASSERT(net->ksnn_ninterfaces <= LNET_INTERFACES_NUM);

                /* Select interface to connect from */
                for (j = 0; j < net->ksnn_ninterfaces; j++) {
                        iface = &net->ksnn_interfaces[j];

                        /* Using this interface already? */
                        list_for_each(rtmp, &peer_ni->ksnp_routes) {
                                route = list_entry(rtmp, struct ksock_route,
                                                   ksnr_list);

                                if (route->ksnr_myipaddr == iface->ksni_ipaddr)
                                        break;

                                route = NULL;
                        }
                        if (route != NULL)
                                continue;

                        /* 1 if iface is on the same subnet as the peer IP */
                        this_netmatch = (((iface->ksni_ipaddr ^
                                           newroute->ksnr_ipaddr) &
                                           iface->ksni_netmask) == 0) ? 1 : 0;

                        /* prefer subnet matches, then fewest routes */
                        if (!(best_iface == NULL ||
                              best_netmatch < this_netmatch ||
                              (best_netmatch == this_netmatch &&
                               best_nroutes > iface->ksni_nroutes)))
                                continue;

                        best_iface = iface;
                        best_netmatch = this_netmatch;
                        best_nroutes = iface->ksni_nroutes;
                }

                if (best_iface == NULL)
                        continue;

                newroute->ksnr_myipaddr = best_iface->ksni_ipaddr;
                best_iface->ksni_nroutes++;

                /* route list takes over the ref held on newroute */
                ksocknal_add_route_locked(peer_ni, newroute);
                newroute = NULL;
        }

        write_unlock_bh(global_lock);
        if (newroute != NULL)
                ksocknal_route_decref(newroute);
}
960
961 int
962 ksocknal_accept(struct lnet_ni *ni, struct socket *sock)
963 {
964         struct ksock_connreq *cr;
965         int rc;
966         u32 peer_ip;
967         int peer_port;
968
969         rc = lnet_sock_getaddr(sock, true, &peer_ip, &peer_port);
970         LASSERT(rc == 0);               /* we succeeded before */
971
972         LIBCFS_ALLOC(cr, sizeof(*cr));
973         if (cr == NULL) {
974                 LCONSOLE_ERROR_MSG(0x12f, "Dropping connection request from "
975                                    "%pI4h: memory exhausted\n", &peer_ip);
976                 return -ENOMEM;
977         }
978
979         lnet_ni_addref(ni);
980         cr->ksncr_ni   = ni;
981         cr->ksncr_sock = sock;
982
983         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
984
985         list_add_tail(&cr->ksncr_list, &ksocknal_data.ksnd_connd_connreqs);
986         wake_up(&ksocknal_data.ksnd_connd_waitq);
987
988         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
989         return 0;
990 }
991
992 static int
993 ksocknal_connecting(struct ksock_peer_ni *peer_ni, __u32 ipaddr)
994 {
995         struct ksock_route *route;
996
997         list_for_each_entry(route, &peer_ni->ksnp_routes, ksnr_list) {
998                 if (route->ksnr_ipaddr == ipaddr)
999                         return route->ksnr_connecting;
1000         }
1001         return 0;
1002 }
1003
1004 int
1005 ksocknal_create_conn(struct lnet_ni *ni, struct ksock_route *route,
1006                      struct socket *sock, int type)
1007 {
1008         rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
1009         LIST_HEAD(zombies);
1010         struct lnet_process_id peerid;
1011         struct list_head *tmp;
1012         u64 incarnation;
1013         struct ksock_conn *conn;
1014         struct ksock_conn *conn2;
1015         struct ksock_peer_ni *peer_ni = NULL;
1016         struct ksock_peer_ni *peer2;
1017         struct ksock_sched *sched;
1018         struct ksock_hello_msg *hello;
1019         int cpt;
1020         struct ksock_tx *tx;
1021         struct ksock_tx *txtmp;
1022         int rc;
1023         int rc2;
1024         int active;
1025         char *warn = NULL;
1026
1027         active = (route != NULL);
1028
1029         LASSERT (active == (type != SOCKLND_CONN_NONE));
1030
1031         LIBCFS_ALLOC(conn, sizeof(*conn));
1032         if (conn == NULL) {
1033                 rc = -ENOMEM;
1034                 goto failed_0;
1035         }
1036
1037         conn->ksnc_peer = NULL;
1038         conn->ksnc_route = NULL;
1039         conn->ksnc_sock = sock;
1040         /* 2 ref, 1 for conn, another extra ref prevents socket
1041          * being closed before establishment of connection */
1042         atomic_set (&conn->ksnc_sock_refcount, 2);
1043         conn->ksnc_type = type;
1044         ksocknal_lib_save_callback(sock, conn);
1045         atomic_set (&conn->ksnc_conn_refcount, 1); /* 1 ref for me */
1046
1047         conn->ksnc_rx_ready = 0;
1048         conn->ksnc_rx_scheduled = 0;
1049
1050         INIT_LIST_HEAD(&conn->ksnc_tx_queue);
1051         conn->ksnc_tx_ready = 0;
1052         conn->ksnc_tx_scheduled = 0;
1053         conn->ksnc_tx_carrier = NULL;
1054         atomic_set (&conn->ksnc_tx_nob, 0);
1055
1056         LIBCFS_ALLOC(hello, offsetof(struct ksock_hello_msg,
1057                                      kshm_ips[LNET_INTERFACES_NUM]));
1058         if (hello == NULL) {
1059                 rc = -ENOMEM;
1060                 goto failed_1;
1061         }
1062
1063         /* stash conn's local and remote addrs */
1064         rc = ksocknal_lib_get_conn_addrs (conn);
1065         if (rc != 0)
1066                 goto failed_1;
1067
1068         /* Find out/confirm peer_ni's NID and connection type and get the
1069          * vector of interfaces she's willing to let me connect to.
1070          * Passive connections use the listener timeout since the peer_ni sends
1071          * eagerly */
1072
1073         if (active) {
1074                 peer_ni = route->ksnr_peer;
1075                 LASSERT(ni == peer_ni->ksnp_ni);
1076
1077                 /* Active connection sends HELLO eagerly */
1078                 hello->kshm_nips = ksocknal_local_ipvec(ni, hello->kshm_ips);
1079                 peerid = peer_ni->ksnp_id;
1080
1081                 write_lock_bh(global_lock);
1082                 conn->ksnc_proto = peer_ni->ksnp_proto;
1083                 write_unlock_bh(global_lock);
1084
1085                 if (conn->ksnc_proto == NULL) {
1086                          conn->ksnc_proto = &ksocknal_protocol_v3x;
1087 #if SOCKNAL_VERSION_DEBUG
1088                          if (*ksocknal_tunables.ksnd_protocol == 2)
1089                                  conn->ksnc_proto = &ksocknal_protocol_v2x;
1090                          else if (*ksocknal_tunables.ksnd_protocol == 1)
1091                                  conn->ksnc_proto = &ksocknal_protocol_v1x;
1092 #endif
1093                 }
1094
1095                 rc = ksocknal_send_hello (ni, conn, peerid.nid, hello);
1096                 if (rc != 0)
1097                         goto failed_1;
1098         } else {
1099                 peerid.nid = LNET_NID_ANY;
1100                 peerid.pid = LNET_PID_ANY;
1101
1102                 /* Passive, get protocol from peer_ni */
1103                 conn->ksnc_proto = NULL;
1104         }
1105
1106         rc = ksocknal_recv_hello (ni, conn, hello, &peerid, &incarnation);
1107         if (rc < 0)
1108                 goto failed_1;
1109
1110         LASSERT (rc == 0 || active);
1111         LASSERT (conn->ksnc_proto != NULL);
1112         LASSERT (peerid.nid != LNET_NID_ANY);
1113
1114         cpt = lnet_cpt_of_nid(peerid.nid, ni);
1115
1116         if (active) {
1117                 ksocknal_peer_addref(peer_ni);
1118                 write_lock_bh(global_lock);
1119         } else {
1120                 peer_ni = ksocknal_create_peer(ni, peerid);
1121                 if (IS_ERR(peer_ni)) {
1122                         rc = PTR_ERR(peer_ni);
1123                         goto failed_1;
1124                 }
1125
1126                 write_lock_bh(global_lock);
1127
1128                 /* called with a ref on ni, so shutdown can't have started */
1129                 LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers) >= 0);
1130
1131                 peer2 = ksocknal_find_peer_locked(ni, peerid);
1132                 if (peer2 == NULL) {
1133                         /* NB this puts an "empty" peer_ni in the peer_ni
1134                          * table (which takes my ref) */
1135                         list_add_tail(&peer_ni->ksnp_list,
1136                                       ksocknal_nid2peerlist(peerid.nid));
1137                 } else {
1138                         ksocknal_peer_decref(peer_ni);
1139                         peer_ni = peer2;
1140                 }
1141
1142                 /* +1 ref for me */
1143                 ksocknal_peer_addref(peer_ni);
1144                 peer_ni->ksnp_accepting++;
1145
1146                 /* Am I already connecting to this guy?  Resolve in
1147                  * favour of higher NID... */
1148                 if (peerid.nid < ni->ni_nid &&
1149                     ksocknal_connecting(peer_ni, conn->ksnc_ipaddr)) {
1150                         rc = EALREADY;
1151                         warn = "connection race resolution";
1152                         goto failed_2;
1153                 }
1154         }
1155
1156         if (peer_ni->ksnp_closing ||
1157             (active && route->ksnr_deleted)) {
1158                 /* peer_ni/route got closed under me */
1159                 rc = -ESTALE;
1160                 warn = "peer_ni/route removed";
1161                 goto failed_2;
1162         }
1163
1164         if (peer_ni->ksnp_proto == NULL) {
1165                 /* Never connected before.
1166                  * NB recv_hello may have returned EPROTO to signal my peer_ni
1167                  * wants a different protocol than the one I asked for.
1168                  */
1169                 LASSERT(list_empty(&peer_ni->ksnp_conns));
1170
1171                 peer_ni->ksnp_proto = conn->ksnc_proto;
1172                 peer_ni->ksnp_incarnation = incarnation;
1173         }
1174
1175         if (peer_ni->ksnp_proto != conn->ksnc_proto ||
1176             peer_ni->ksnp_incarnation != incarnation) {
1177                 /* peer_ni rebooted or I've got the wrong protocol version */
1178                 ksocknal_close_peer_conns_locked(peer_ni, 0, 0);
1179
1180                 peer_ni->ksnp_proto = NULL;
1181                 rc = ESTALE;
1182                 warn = peer_ni->ksnp_incarnation != incarnation ?
1183                        "peer_ni rebooted" :
1184                        "wrong proto version";
1185                 goto failed_2;
1186         }
1187
1188         switch (rc) {
1189         default:
1190                 LBUG();
1191         case 0:
1192                 break;
1193         case EALREADY:
1194                 warn = "lost conn race";
1195                 goto failed_2;
1196         case EPROTO:
1197                 warn = "retry with different protocol version";
1198                 goto failed_2;
1199         }
1200
1201         /* Refuse to duplicate an existing connection, unless this is a
1202          * loopback connection */
1203         if (conn->ksnc_ipaddr != conn->ksnc_myipaddr) {
1204                 list_for_each(tmp, &peer_ni->ksnp_conns) {
1205                         conn2 = list_entry(tmp, struct ksock_conn, ksnc_list);
1206
1207                         if (conn2->ksnc_ipaddr != conn->ksnc_ipaddr ||
1208                             conn2->ksnc_myipaddr != conn->ksnc_myipaddr ||
1209                             conn2->ksnc_type != conn->ksnc_type)
1210                                 continue;
1211
1212                         /* Reply on a passive connection attempt so the peer_ni
1213                          * realises we're connected. */
1214                         LASSERT (rc == 0);
1215                         if (!active)
1216                                 rc = EALREADY;
1217
1218                         warn = "duplicate";
1219                         goto failed_2;
1220                 }
1221         }
1222
1223         /* If the connection created by this route didn't bind to the IP
1224          * address the route connected to, the connection/route matching
1225          * code below probably isn't going to work. */
1226         if (active &&
1227             route->ksnr_ipaddr != conn->ksnc_ipaddr) {
1228                 CERROR("Route %s %pI4h connected to %pI4h\n",
1229                        libcfs_id2str(peer_ni->ksnp_id),
1230                        &route->ksnr_ipaddr,
1231                        &conn->ksnc_ipaddr);
1232         }
1233
1234         /* Search for a route corresponding to the new connection and
1235          * create an association.  This allows incoming connections created
1236          * by routes in my peer_ni to match my own route entries so I don't
1237          * continually create duplicate routes. */
1238         list_for_each(tmp, &peer_ni->ksnp_routes) {
1239                 route = list_entry(tmp, struct ksock_route, ksnr_list);
1240
1241                 if (route->ksnr_ipaddr != conn->ksnc_ipaddr)
1242                         continue;
1243
1244                 ksocknal_associate_route_conn_locked(route, conn);
1245                 break;
1246         }
1247
1248         conn->ksnc_peer = peer_ni;                 /* conn takes my ref on peer_ni */
1249         peer_ni->ksnp_last_alive = ktime_get_seconds();
1250         peer_ni->ksnp_send_keepalive = 0;
1251         peer_ni->ksnp_error = 0;
1252
1253         sched = ksocknal_choose_scheduler_locked(cpt);
1254         if (!sched) {
1255                 CERROR("no schedulers available. node is unhealthy\n");
1256                 goto failed_2;
1257         }
1258         /*
1259          * The cpt might have changed if we ended up selecting a non cpt
1260          * native scheduler. So use the scheduler's cpt instead.
1261          */
1262         cpt = sched->kss_cpt;
1263         sched->kss_nconns++;
1264         conn->ksnc_scheduler = sched;
1265
1266         conn->ksnc_tx_last_post = ktime_get_seconds();
1267         /* Set the deadline for the outgoing HELLO to drain */
1268         conn->ksnc_tx_bufnob = sock->sk->sk_wmem_queued;
1269         conn->ksnc_tx_deadline = ktime_get_seconds() +
1270                                  lnet_get_lnd_timeout();
1271         smp_mb();   /* order with adding to peer_ni's conn list */
1272
1273         list_add(&conn->ksnc_list, &peer_ni->ksnp_conns);
1274         ksocknal_conn_addref(conn);
1275
1276         ksocknal_new_packet(conn, 0);
1277
1278         conn->ksnc_zc_capable = ksocknal_lib_zc_capable(conn);
1279
1280         /* Take packets blocking for this connection. */
1281         list_for_each_entry_safe(tx, txtmp, &peer_ni->ksnp_tx_queue, tx_list) {
1282                 if (conn->ksnc_proto->pro_match_tx(conn, tx, tx->tx_nonblk) ==
1283                     SOCKNAL_MATCH_NO)
1284                         continue;
1285
1286                 list_del(&tx->tx_list);
1287                 ksocknal_queue_tx_locked(tx, conn);
1288         }
1289
1290         write_unlock_bh(global_lock);
1291
1292         /* We've now got a new connection.  Any errors from here on are just
1293          * like "normal" comms errors and we close the connection normally.
1294          * NB (a) we still have to send the reply HELLO for passive
1295          *        connections,
1296          *    (b) normal I/O on the conn is blocked until I setup and call the
1297          *        socket callbacks.
1298          */
1299
1300         CDEBUG(D_NET, "New conn %s p %d.x %pI4h -> %pI4h/%d"
1301                " incarnation:%lld sched[%d]\n",
1302                libcfs_id2str(peerid), conn->ksnc_proto->pro_version,
1303                &conn->ksnc_myipaddr, &conn->ksnc_ipaddr,
1304                conn->ksnc_port, incarnation, cpt);
1305
1306         if (active) {
1307                 /* additional routes after interface exchange? */
1308                 ksocknal_create_routes(peer_ni, conn->ksnc_port,
1309                                        hello->kshm_ips, hello->kshm_nips);
1310         } else {
1311                 hello->kshm_nips = ksocknal_select_ips(peer_ni, hello->kshm_ips,
1312                                                        hello->kshm_nips);
1313                 rc = ksocknal_send_hello(ni, conn, peerid.nid, hello);
1314         }
1315
1316         LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
1317                                     kshm_ips[LNET_INTERFACES_NUM]));
1318
1319         /* setup the socket AFTER I've received hello (it disables
1320          * SO_LINGER).  I might call back to the acceptor who may want
1321          * to send a protocol version response and then close the
1322          * socket; this ensures the socket only tears down after the
1323          * response has been sent. */
1324         if (rc == 0)
1325                 rc = ksocknal_lib_setup_sock(sock);
1326
1327         write_lock_bh(global_lock);
1328
1329         /* NB my callbacks block while I hold ksnd_global_lock */
1330         ksocknal_lib_set_callback(sock, conn);
1331
1332         if (!active)
1333                 peer_ni->ksnp_accepting--;
1334
1335         write_unlock_bh(global_lock);
1336
1337         if (rc != 0) {
1338                 write_lock_bh(global_lock);
1339                 if (!conn->ksnc_closing) {
1340                         /* could be closed by another thread */
1341                         ksocknal_close_conn_locked(conn, rc);
1342                 }
1343                 write_unlock_bh(global_lock);
1344         } else if (ksocknal_connsock_addref(conn) == 0) {
1345                 /* Allow I/O to proceed. */
1346                 ksocknal_read_callback(conn);
1347                 ksocknal_write_callback(conn);
1348                 ksocknal_connsock_decref(conn);
1349         }
1350
1351         ksocknal_connsock_decref(conn);
1352         ksocknal_conn_decref(conn);
1353         return rc;
1354
1355 failed_2:
1356         if (!peer_ni->ksnp_closing &&
1357             list_empty(&peer_ni->ksnp_conns) &&
1358             list_empty(&peer_ni->ksnp_routes)) {
1359                 list_add(&zombies, &peer_ni->ksnp_tx_queue);
1360                 list_del_init(&peer_ni->ksnp_tx_queue);
1361                 ksocknal_unlink_peer_locked(peer_ni);
1362         }
1363
1364         write_unlock_bh(global_lock);
1365
1366         if (warn != NULL) {
1367                 if (rc < 0)
1368                         CERROR("Not creating conn %s type %d: %s\n",
1369                                libcfs_id2str(peerid), conn->ksnc_type, warn);
1370                 else
1371                         CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",
1372                               libcfs_id2str(peerid), conn->ksnc_type, warn);
1373         }
1374
1375         if (!active) {
1376                 if (rc > 0) {
1377                         /* Request retry by replying with CONN_NONE
1378                          * ksnc_proto has been set already */
1379                         conn->ksnc_type = SOCKLND_CONN_NONE;
1380                         hello->kshm_nips = 0;
1381                         ksocknal_send_hello(ni, conn, peerid.nid, hello);
1382                 }
1383
1384                 write_lock_bh(global_lock);
1385                 peer_ni->ksnp_accepting--;
1386                 write_unlock_bh(global_lock);
1387         }
1388
1389         /*
1390          * If we get here without an error code, just use -EALREADY.
1391          * Depending on how we got here, the error may be positive
1392          * or negative. Normalize the value for ksocknal_txlist_done().
1393          */
1394         rc2 = (rc == 0 ? -EALREADY : (rc > 0 ? -rc : rc));
1395         ksocknal_txlist_done(ni, &zombies, rc2);
1396         ksocknal_peer_decref(peer_ni);
1397
1398 failed_1:
1399         if (hello != NULL)
1400                 LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
1401                                             kshm_ips[LNET_INTERFACES_NUM]));
1402
1403         LIBCFS_FREE(conn, sizeof(*conn));
1404
1405 failed_0:
1406         sock_release(sock);
1407         return rc;
1408 }
1409
void
ksocknal_close_conn_locked(struct ksock_conn *conn, int error)
{
        /* This just does the immediate housekeeping, and queues the
         * connection for the reaper to terminate.
         * Caller holds ksnd_global_lock exclusively in irq context */
        struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
        struct ksock_route *route;
        struct ksock_conn *conn2;
        struct list_head *tmp;

        LASSERT(peer_ni->ksnp_error == 0);
        LASSERT(!conn->ksnc_closing);
        conn->ksnc_closing = 1;

        /* ksnd_deathrow_conns takes over peer_ni's ref */
        list_del(&conn->ksnc_list);

        route = conn->ksnc_route;
        if (route != NULL) {
                /* dissociate conn from route... */
                LASSERT(!route->ksnr_deleted);
                LASSERT((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);

                /* only clear the route's "connected" bit for this type if
                 * no other conn of the same type still uses the route */
                conn2 = NULL;
                list_for_each(tmp, &peer_ni->ksnp_conns) {
                        conn2 = list_entry(tmp, struct ksock_conn, ksnc_list);

                        if (conn2->ksnc_route == route &&
                            conn2->ksnc_type == conn->ksnc_type)
                                break;

                        conn2 = NULL;
                }
                if (conn2 == NULL)
                        route->ksnr_connected &= ~(1 << conn->ksnc_type);

                conn->ksnc_route = NULL;

                ksocknal_route_decref(route);   /* drop conn's ref on route */
        }

        if (list_empty(&peer_ni->ksnp_conns)) {
                /* No more connections to this peer_ni */

                if (!list_empty(&peer_ni->ksnp_tx_queue)) {
                        struct ksock_tx *tx;

                        LASSERT(conn->ksnc_proto == &ksocknal_protocol_v3x);

                        /* throw them to the last connection...,
                         * these TXs will be send to /dev/null by scheduler */
                        list_for_each_entry(tx, &peer_ni->ksnp_tx_queue,
                                            tx_list)
                                ksocknal_tx_prep(conn, tx);

                        /* scheduler lock protects ksnc_tx_queue */
                        spin_lock_bh(&conn->ksnc_scheduler->kss_lock);
                        list_splice_init(&peer_ni->ksnp_tx_queue,
                                         &conn->ksnc_tx_queue);
                        spin_unlock_bh(&conn->ksnc_scheduler->kss_lock);
                }

                /* renegotiate protocol version */
                peer_ni->ksnp_proto = NULL;
                /* stash last conn close reason */
                peer_ni->ksnp_error = error;

                if (list_empty(&peer_ni->ksnp_routes)) {
                        /* I've just closed last conn belonging to a
                         * peer_ni with no routes to it */
                        ksocknal_unlink_peer_locked(peer_ni);
                }
        }

        /* hand the conn to the reaper for actual termination */
        spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);

        list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
        wake_up(&ksocknal_data.ksnd_reaper_waitq);

        spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
}
1491
1492 void
1493 ksocknal_peer_failed(struct ksock_peer_ni *peer_ni)
1494 {
1495         int notify = 0;
1496         time64_t last_alive = 0;
1497
1498         /* There has been a connection failure or comms error; but I'll only
1499          * tell LNET I think the peer_ni is dead if it's to another kernel and
1500          * there are no connections or connection attempts in existence. */
1501
1502         read_lock(&ksocknal_data.ksnd_global_lock);
1503
1504         if ((peer_ni->ksnp_id.pid & LNET_PID_USERFLAG) == 0 &&
1505              list_empty(&peer_ni->ksnp_conns) &&
1506              peer_ni->ksnp_accepting == 0 &&
1507              ksocknal_find_connecting_route_locked(peer_ni) == NULL) {
1508                 notify = 1;
1509                 last_alive = peer_ni->ksnp_last_alive;
1510         }
1511
1512         read_unlock(&ksocknal_data.ksnd_global_lock);
1513
1514         if (notify)
1515                 lnet_notify(peer_ni->ksnp_ni, peer_ni->ksnp_id.nid,
1516                             false, false, last_alive);
1517 }
1518
1519 void
1520 ksocknal_finalize_zcreq(struct ksock_conn *conn)
1521 {
1522         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1523         struct ksock_tx *tx;
1524         struct ksock_tx *tmp;
1525         LIST_HEAD(zlist);
1526
1527         /* NB safe to finalize TXs because closing of socket will
1528          * abort all buffered data */
1529         LASSERT(conn->ksnc_sock == NULL);
1530
1531         spin_lock(&peer_ni->ksnp_lock);
1532
1533         list_for_each_entry_safe(tx, tmp, &peer_ni->ksnp_zc_req_list, tx_zc_list) {
1534                 if (tx->tx_conn != conn)
1535                         continue;
1536
1537                 LASSERT(tx->tx_msg.ksm_zc_cookies[0] != 0);
1538
1539                 tx->tx_msg.ksm_zc_cookies[0] = 0;
1540                 tx->tx_zc_aborted = 1;  /* mark it as not-acked */
1541                 list_del(&tx->tx_zc_list);
1542                 list_add(&tx->tx_zc_list, &zlist);
1543         }
1544
1545         spin_unlock(&peer_ni->ksnp_lock);
1546
1547         while (!list_empty(&zlist)) {
1548                 tx = list_entry(zlist.next, struct ksock_tx, tx_zc_list);
1549
1550                 list_del(&tx->tx_zc_list);
1551                 ksocknal_tx_decref(tx);
1552         }
1553 }
1554
void
ksocknal_terminate_conn(struct ksock_conn *conn)
{
        /* This gets called by the reaper (guaranteed thread context) to
         * disengage the socket from its callbacks and close it.
         * ksnc_refcount will eventually hit zero, and then the reaper will
         * destroy it. */
        struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
        struct ksock_sched *sched = conn->ksnc_scheduler;
        int failed = 0;

        LASSERT(conn->ksnc_closing);

        /* wake up the scheduler to "send" all remaining packets to /dev/null */
        spin_lock_bh(&sched->kss_lock);

        /* a closing conn is always ready to tx */
        conn->ksnc_tx_ready = 1;

        if (!conn->ksnc_tx_scheduled &&
            !list_empty(&conn->ksnc_tx_queue)) {
                list_add_tail(&conn->ksnc_tx_list,
                               &sched->kss_tx_conns);
                conn->ksnc_tx_scheduled = 1;
                /* extra ref for scheduler */
                ksocknal_conn_addref(conn);

                wake_up (&sched->kss_waitq);
        }

        spin_unlock_bh(&sched->kss_lock);

        /* serialise with callbacks */
        write_lock_bh(&ksocknal_data.ksnd_global_lock);

        ksocknal_lib_reset_callback(conn->ksnc_sock, conn);

        /* OK, so this conn may not be completely disengaged from its
         * scheduler yet, but it _has_ committed to terminate... */
        conn->ksnc_scheduler->kss_nconns--;

        if (peer_ni->ksnp_error != 0) {
                /* peer_ni's last conn closed in error */
                LASSERT(list_empty(&peer_ni->ksnp_conns));
                failed = 1;
                peer_ni->ksnp_error = 0;     /* avoid multiple notifications */
        }

        write_unlock_bh(&ksocknal_data.ksnd_global_lock);

        /* notify LNet outside the global lock */
        if (failed)
                ksocknal_peer_failed(peer_ni);

        /* The socket is closed on the final put; either here, or in
         * ksocknal_{send,recv}msg().  Since we set up the linger2 option
         * when the connection was established, this will close the socket
         * immediately, aborting anything buffered in it. Any hung
         * zero-copy transmits will therefore complete in finite time. */
        ksocknal_connsock_decref(conn);
}
1615
/* Hand a fully-released conn to the reaper thread for final destruction.
 * Must only be called once the last conn reference has been dropped. */
void
ksocknal_queue_zombie_conn(struct ksock_conn *conn)
{
        /* Queue the conn for the reaper to destroy */
        LASSERT(atomic_read(&conn->ksnc_conn_refcount) == 0);
        spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);

        list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
        wake_up(&ksocknal_data.ksnd_reaper_waitq);

        spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
}
1628
/* Final destruction of a connection, run by the reaper once both the conn
 * and socket refcounts have reached zero and the conn is fully disengaged
 * (no socket, no route, not scheduled, tx queue empty — see the LASSERTs).
 * If a receive was cut short, report it and fail the in-flight LNet
 * message with -EIO so upper layers can retry/health-track it. */
void
ksocknal_destroy_conn(struct ksock_conn *conn)
{
        time64_t last_rcv;

        /* Final coup-de-grace of the reaper */
        CDEBUG (D_NET, "connection %p\n", conn);

        LASSERT (atomic_read (&conn->ksnc_conn_refcount) == 0);
        LASSERT (atomic_read (&conn->ksnc_sock_refcount) == 0);
        LASSERT (conn->ksnc_sock == NULL);
        LASSERT (conn->ksnc_route == NULL);
        LASSERT (!conn->ksnc_tx_scheduled);
        LASSERT (!conn->ksnc_rx_scheduled);
        LASSERT(list_empty(&conn->ksnc_tx_queue));

        /* complete current receive if any */
        switch (conn->ksnc_rx_state) {
        case SOCKNAL_RX_LNET_PAYLOAD:
                /* estimate when the peer was last heard from by backing
                 * the LND timeout off the current rx deadline */
                last_rcv = conn->ksnc_rx_deadline -
                           lnet_get_lnd_timeout();
                CERROR("Completing partial receive from %s[%d], "
                       "ip %pI4h:%d, with error, wanted: %d, left: %d, "
                       "last alive is %lld secs ago\n",
                       libcfs_id2str(conn->ksnc_peer->ksnp_id), conn->ksnc_type,
                       &conn->ksnc_ipaddr, conn->ksnc_port,
                       conn->ksnc_rx_nob_wanted, conn->ksnc_rx_nob_left,
                       ktime_get_seconds() - last_rcv);
                if (conn->ksnc_lnet_msg)
                        conn->ksnc_lnet_msg->msg_health_status =
                                LNET_MSG_STATUS_REMOTE_ERROR;
                /* fail the half-received message back to LNet */
                lnet_finalize(conn->ksnc_lnet_msg, -EIO);
                break;
        case SOCKNAL_RX_LNET_HEADER:
                if (conn->ksnc_rx_started)
                        CERROR("Incomplete receive of lnet header from %s, "
                               "ip %pI4h:%d, with error, protocol: %d.x.\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               &conn->ksnc_ipaddr, conn->ksnc_port,
                               conn->ksnc_proto->pro_version);
                break;
        case SOCKNAL_RX_KSM_HEADER:
                if (conn->ksnc_rx_started)
                        CERROR("Incomplete receive of ksock message from %s, "
                               "ip %pI4h:%d, with error, protocol: %d.x.\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               &conn->ksnc_ipaddr, conn->ksnc_port,
                               conn->ksnc_proto->pro_version);
                break;
        case SOCKNAL_RX_SLOP:
                if (conn->ksnc_rx_started)
                        CERROR("Incomplete receive of slops from %s, "
                               "ip %pI4h:%d, with error\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               &conn->ksnc_ipaddr, conn->ksnc_port);
               break;
        default:
                /* rx state machine must be in one of the states above */
                LBUG ();
                break;
        }

        /* drop the peer ref the conn held */
        ksocknal_peer_decref(conn->ksnc_peer);

        LIBCFS_FREE (conn, sizeof (*conn));
}
1694
1695 int
1696 ksocknal_close_peer_conns_locked(struct ksock_peer_ni *peer_ni, __u32 ipaddr, int why)
1697 {
1698         struct ksock_conn *conn;
1699         struct list_head *ctmp;
1700         struct list_head *cnxt;
1701         int count = 0;
1702
1703         list_for_each_safe(ctmp, cnxt, &peer_ni->ksnp_conns) {
1704                 conn = list_entry(ctmp, struct ksock_conn, ksnc_list);
1705
1706                 if (ipaddr == 0 ||
1707                     conn->ksnc_ipaddr == ipaddr) {
1708                         count++;
1709                         ksocknal_close_conn_locked (conn, why);
1710                 }
1711         }
1712
1713         return (count);
1714 }
1715
1716 int
1717 ksocknal_close_conn_and_siblings(struct ksock_conn *conn, int why)
1718 {
1719         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1720         u32 ipaddr = conn->ksnc_ipaddr;
1721         int count;
1722
1723         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1724
1725         count = ksocknal_close_peer_conns_locked (peer_ni, ipaddr, why);
1726
1727         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1728
1729         return (count);
1730 }
1731
1732 int
1733 ksocknal_close_matching_conns(struct lnet_process_id id, __u32 ipaddr)
1734 {
1735         struct ksock_peer_ni *peer_ni;
1736         struct list_head *ptmp;
1737         struct list_head *pnxt;
1738         int lo;
1739         int hi;
1740         int i;
1741         int count = 0;
1742
1743         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1744
1745         if (id.nid != LNET_NID_ANY)
1746                 lo = hi = (int)(ksocknal_nid2peerlist(id.nid) - ksocknal_data.ksnd_peers);
1747         else {
1748                 lo = 0;
1749                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1750         }
1751
1752         for (i = lo; i <= hi; i++) {
1753                 list_for_each_safe(ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1754
1755                         peer_ni = list_entry(ptmp, struct ksock_peer_ni, ksnp_list);
1756
1757                         if (!((id.nid == LNET_NID_ANY || id.nid == peer_ni->ksnp_id.nid) &&
1758                               (id.pid == LNET_PID_ANY || id.pid == peer_ni->ksnp_id.pid)))
1759                                 continue;
1760
1761                         count += ksocknal_close_peer_conns_locked (peer_ni, ipaddr, 0);
1762                 }
1763         }
1764
1765         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1766
1767         /* wildcards always succeed */
1768         if (id.nid == LNET_NID_ANY || id.pid == LNET_PID_ANY || ipaddr == 0)
1769                 return (0);
1770
1771         return (count == 0 ? -ENOENT : 0);
1772 }
1773
1774 void
1775 ksocknal_notify_gw_down(lnet_nid_t gw_nid)
1776 {
1777         /* The router is telling me she's been notified of a change in
1778          * gateway state....
1779          */
1780         struct lnet_process_id id = {
1781                 .nid    = gw_nid,
1782                 .pid    = LNET_PID_ANY,
1783         };
1784
1785         CDEBUG(D_NET, "gw %s down\n", libcfs_nid2str(gw_nid));
1786
1787         /* If the gateway crashed, close all open connections... */
1788         ksocknal_close_matching_conns(id, 0);
1789         return;
1790
1791         /* We can only establish new connections
1792          * if we have autroutes, and these connect on demand. */
1793 }
1794
1795 void
1796 ksocknal_query(struct lnet_ni *ni, lnet_nid_t nid, time64_t *when)
1797 {
1798         int connect = 1;
1799         time64_t last_alive = 0;
1800         time64_t now = ktime_get_seconds();
1801         struct ksock_peer_ni *peer_ni = NULL;
1802         rwlock_t *glock = &ksocknal_data.ksnd_global_lock;
1803         struct lnet_process_id id = {
1804                 .nid = nid,
1805                 .pid = LNET_PID_LUSTRE,
1806         };
1807
1808         read_lock(glock);
1809
1810         peer_ni = ksocknal_find_peer_locked(ni, id);
1811         if (peer_ni != NULL) {
1812                 struct list_head *tmp;
1813                 struct ksock_conn *conn;
1814                 int bufnob;
1815
1816                 list_for_each(tmp, &peer_ni->ksnp_conns) {
1817                         conn = list_entry(tmp, struct ksock_conn, ksnc_list);
1818                         bufnob = conn->ksnc_sock->sk->sk_wmem_queued;
1819
1820                         if (bufnob < conn->ksnc_tx_bufnob) {
1821                                 /* something got ACKed */
1822                                 conn->ksnc_tx_deadline = ktime_get_seconds() +
1823                                                          lnet_get_lnd_timeout();
1824                                 peer_ni->ksnp_last_alive = now;
1825                                 conn->ksnc_tx_bufnob = bufnob;
1826                         }
1827                 }
1828
1829                 last_alive = peer_ni->ksnp_last_alive;
1830                 if (ksocknal_find_connectable_route_locked(peer_ni) == NULL)
1831                         connect = 0;
1832         }
1833
1834         read_unlock(glock);
1835
1836         if (last_alive != 0)
1837                 *when = last_alive;
1838
1839         CDEBUG(D_NET, "peer_ni %s %p, alive %lld secs ago, connect %d\n",
1840                libcfs_nid2str(nid), peer_ni,
1841                last_alive ? now - last_alive : -1,
1842                connect);
1843
1844         if (!connect)
1845                 return;
1846
1847         ksocknal_add_peer(ni, id, LNET_NIDADDR(nid), lnet_acceptor_port());
1848
1849         write_lock_bh(glock);
1850
1851         peer_ni = ksocknal_find_peer_locked(ni, id);
1852         if (peer_ni != NULL)
1853                 ksocknal_launch_all_connections_locked(peer_ni);
1854
1855         write_unlock_bh(glock);
1856 }
1857
1858 static void
1859 ksocknal_push_peer(struct ksock_peer_ni *peer_ni)
1860 {
1861         int index;
1862         int i;
1863         struct list_head *tmp;
1864         struct ksock_conn *conn;
1865
1866         for (index = 0; ; index++) {
1867                 read_lock(&ksocknal_data.ksnd_global_lock);
1868
1869                 i = 0;
1870                 conn = NULL;
1871
1872                 list_for_each(tmp, &peer_ni->ksnp_conns) {
1873                         if (i++ == index) {
1874                                 conn = list_entry(tmp, struct ksock_conn,
1875                                                   ksnc_list);
1876                                 ksocknal_conn_addref(conn);
1877                                 break;
1878                         }
1879                 }
1880
1881                 read_unlock(&ksocknal_data.ksnd_global_lock);
1882
1883                 if (conn == NULL)
1884                         break;
1885
1886                 ksocknal_lib_push_conn (conn);
1887                 ksocknal_conn_decref(conn);
1888         }
1889 }
1890
/* Push every connection of every peer_ni matching @id (LNET_NID_ANY /
 * LNET_PID_ANY act as wildcards).  Peers are located by re-scanning the
 * hash bucket on each pass so a reference can be taken under the lock
 * before pushing outside it.  Returns 0 if anything matched, -ENOENT
 * otherwise. */
static int
ksocknal_push(struct lnet_ni *ni, struct lnet_process_id id)
{
        struct list_head *start;
        struct list_head *end;
        struct list_head *tmp;
        int               rc = -ENOENT;
        unsigned int      hsize = ksocknal_data.ksnd_peer_hash_size;

        /* a specific NID hashes to one bucket; wildcard scans them all */
        if (id.nid == LNET_NID_ANY) {
                start = &ksocknal_data.ksnd_peers[0];
                end = &ksocknal_data.ksnd_peers[hsize - 1];
        } else {
                start = end = ksocknal_nid2peerlist(id.nid);
        }

        for (tmp = start; tmp <= end; tmp++) {
                int     peer_off; /* searching offset in peer_ni hash table */

                for (peer_off = 0; ; peer_off++) {
                        struct ksock_peer_ni *peer_ni;
                        int           i = 0;

                        read_lock(&ksocknal_data.ksnd_global_lock);
                        list_for_each_entry(peer_ni, tmp, ksnp_list) {
                                if (!((id.nid == LNET_NID_ANY ||
                                       id.nid == peer_ni->ksnp_id.nid) &&
                                      (id.pid == LNET_PID_ANY ||
                                       id.pid == peer_ni->ksnp_id.pid)))
                                        continue;

                                if (i++ == peer_off) {
                                        /* ref pins peer_ni across unlock */
                                        ksocknal_peer_addref(peer_ni);
                                        break;
                                }
                        }
                        read_unlock(&ksocknal_data.ksnd_global_lock);

                        /* NB if the list walk completed without breaking,
                         * peer_ni is not a valid entry; i <= peer_off is
                         * what guards against using it in that case */
                        if (i <= peer_off) /* no match */
                                break;

                        rc = 0;
                        ksocknal_push_peer(peer_ni);
                        ksocknal_peer_decref(peer_ni);
                }
        }
        return rc;
}
1939
1940 static int
1941 ksocknal_add_interface(struct lnet_ni *ni, __u32 ipaddress, __u32 netmask)
1942 {
1943         struct ksock_net *net = ni->ni_data;
1944         struct ksock_interface *iface;
1945         int rc;
1946         int i;
1947         int j;
1948         struct list_head *ptmp;
1949         struct ksock_peer_ni *peer_ni;
1950         struct list_head *rtmp;
1951         struct ksock_route *route;
1952
1953         if (ipaddress == 0 ||
1954             netmask == 0)
1955                 return -EINVAL;
1956
1957         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1958
1959         iface = ksocknal_ip2iface(ni, ipaddress);
1960         if (iface != NULL) {
1961                 /* silently ignore dups */
1962                 rc = 0;
1963         } else if (net->ksnn_ninterfaces == LNET_INTERFACES_NUM) {
1964                 rc = -ENOSPC;
1965         } else {
1966                 iface = &net->ksnn_interfaces[net->ksnn_ninterfaces++];
1967
1968                 iface->ksni_ipaddr = ipaddress;
1969                 iface->ksni_netmask = netmask;
1970                 iface->ksni_nroutes = 0;
1971                 iface->ksni_npeers = 0;
1972
1973                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1974                         list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
1975                                 peer_ni = list_entry(ptmp, struct ksock_peer_ni,
1976                                                      ksnp_list);
1977
1978                                 for (j = 0; j < peer_ni->ksnp_n_passive_ips; j++)
1979                                         if (peer_ni->ksnp_passive_ips[j] == ipaddress)
1980                                                 iface->ksni_npeers++;
1981
1982                                 list_for_each(rtmp, &peer_ni->ksnp_routes) {
1983                                         route = list_entry(rtmp,
1984                                                            struct ksock_route,
1985                                                            ksnr_list);
1986
1987                                         if (route->ksnr_myipaddr == ipaddress)
1988                                                 iface->ksni_nroutes++;
1989                                 }
1990                         }
1991                 }
1992
1993                 rc = 0;
1994                 /* NB only new connections will pay attention to the new interface! */
1995         }
1996
1997         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1998
1999         return rc;
2000 }
2001
2002 static void
2003 ksocknal_peer_del_interface_locked(struct ksock_peer_ni *peer_ni, __u32 ipaddr)
2004 {
2005         struct list_head *tmp;
2006         struct list_head *nxt;
2007         struct ksock_route *route;
2008         struct ksock_conn *conn;
2009         int i;
2010         int j;
2011
2012         for (i = 0; i < peer_ni->ksnp_n_passive_ips; i++)
2013                 if (peer_ni->ksnp_passive_ips[i] == ipaddr) {
2014                         for (j = i+1; j < peer_ni->ksnp_n_passive_ips; j++)
2015                                 peer_ni->ksnp_passive_ips[j-1] =
2016                                         peer_ni->ksnp_passive_ips[j];
2017                         peer_ni->ksnp_n_passive_ips--;
2018                         break;
2019                 }
2020
2021         list_for_each_safe(tmp, nxt, &peer_ni->ksnp_routes) {
2022                 route = list_entry(tmp, struct ksock_route, ksnr_list);
2023
2024                 if (route->ksnr_myipaddr != ipaddr)
2025                         continue;
2026
2027                 if (route->ksnr_share_count != 0) {
2028                         /* Manually created; keep, but unbind */
2029                         route->ksnr_myipaddr = 0;
2030                 } else {
2031                         ksocknal_del_route_locked(route);
2032                 }
2033         }
2034
2035         list_for_each_safe(tmp, nxt, &peer_ni->ksnp_conns) {
2036                 conn = list_entry(tmp, struct ksock_conn, ksnc_list);
2037
2038                 if (conn->ksnc_myipaddr == ipaddr)
2039                         ksocknal_close_conn_locked (conn, 0);
2040         }
2041 }
2042
2043 static int
2044 ksocknal_del_interface(struct lnet_ni *ni, __u32 ipaddress)
2045 {
2046         struct ksock_net *net = ni->ni_data;
2047         int rc = -ENOENT;
2048         struct list_head *tmp;
2049         struct list_head *nxt;
2050         struct ksock_peer_ni *peer_ni;
2051         u32 this_ip;
2052         int i;
2053         int j;
2054
2055         write_lock_bh(&ksocknal_data.ksnd_global_lock);
2056
2057         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2058                 this_ip = net->ksnn_interfaces[i].ksni_ipaddr;
2059
2060                 if (!(ipaddress == 0 ||
2061                       ipaddress == this_ip))
2062                         continue;
2063
2064                 rc = 0;
2065
2066                 for (j = i+1; j < net->ksnn_ninterfaces; j++)
2067                         net->ksnn_interfaces[j-1] =
2068                                 net->ksnn_interfaces[j];
2069
2070                 net->ksnn_ninterfaces--;
2071
2072                 for (j = 0; j < ksocknal_data.ksnd_peer_hash_size; j++) {
2073                         list_for_each_safe(tmp, nxt,
2074                                            &ksocknal_data.ksnd_peers[j]) {
2075                                 peer_ni = list_entry(tmp, struct ksock_peer_ni,
2076                                                      ksnp_list);
2077
2078                                 if (peer_ni->ksnp_ni != ni)
2079                                         continue;
2080
2081                                 ksocknal_peer_del_interface_locked(peer_ni, this_ip);
2082                         }
2083                 }
2084         }
2085
2086         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
2087
2088         return (rc);
2089 }
2090
/* ioctl entry point for the socklnd: dispatch libcfs ioctls to the
 * interface/peer/connection management routines above, marshaling
 * arguments to and from the libcfs_ioctl_data scratch fields.
 * Returns 0 on success or a negative errno. */
int
ksocknal_ctl(struct lnet_ni *ni, unsigned int cmd, void *arg)
{
        struct lnet_process_id id = {0};
        struct libcfs_ioctl_data *data = arg;
        int rc;

        switch(cmd) {
        case IOC_LIBCFS_GET_INTERFACE: {
                /* return the ioc_count'th interface of this NI */
                struct ksock_net *net = ni->ni_data;
                struct ksock_interface *iface;

                read_lock(&ksocknal_data.ksnd_global_lock);

                if (data->ioc_count >= (__u32)net->ksnn_ninterfaces) {
                        rc = -ENOENT;
                } else {
                        rc = 0;
                        iface = &net->ksnn_interfaces[data->ioc_count];

                        data->ioc_u32[0] = iface->ksni_ipaddr;
                        data->ioc_u32[1] = iface->ksni_netmask;
                        data->ioc_u32[2] = iface->ksni_npeers;
                        data->ioc_u32[3] = iface->ksni_nroutes;
                }

                read_unlock(&ksocknal_data.ksnd_global_lock);
                return rc;
        }

        case IOC_LIBCFS_ADD_INTERFACE:
                return ksocknal_add_interface(ni,
                                              data->ioc_u32[0], /* IP address */
                                              data->ioc_u32[1]); /* net mask */

        case IOC_LIBCFS_DEL_INTERFACE:
                return ksocknal_del_interface(ni,
                                              data->ioc_u32[0]); /* IP address */

        case IOC_LIBCFS_GET_PEER: {
                /* return info about the ioc_count'th known peer */
                __u32            myip = 0;
                __u32            ip = 0;
                int              port = 0;
                int              conn_count = 0;
                int              share_count = 0;

                rc = ksocknal_get_peer_info(ni, data->ioc_count,
                                            &id, &myip, &ip, &port,
                                            &conn_count,  &share_count);
                if (rc != 0)
                        return rc;

                data->ioc_nid    = id.nid;
                data->ioc_count  = share_count;
                data->ioc_u32[0] = ip;
                data->ioc_u32[1] = port;
                data->ioc_u32[2] = myip;
                data->ioc_u32[3] = conn_count;
                data->ioc_u32[4] = id.pid;
                return 0;
        }

        case IOC_LIBCFS_ADD_PEER:
                id.nid = data->ioc_nid;
                id.pid = LNET_PID_LUSTRE;
                return ksocknal_add_peer (ni, id,
                                          data->ioc_u32[0], /* IP */
                                          data->ioc_u32[1]); /* port */

        case IOC_LIBCFS_DEL_PEER:
                id.nid = data->ioc_nid;
                id.pid = LNET_PID_ANY;
                return ksocknal_del_peer (ni, id,
                                          data->ioc_u32[0]); /* IP */

        case IOC_LIBCFS_GET_CONN: {
                /* return tunables and addressing of the ioc_count'th conn;
                 * ksocknal_get_conn_by_idx() returns it with a ref held */
                int           txmem;
                int           rxmem;
                int           nagle;
                struct ksock_conn *conn = ksocknal_get_conn_by_idx(ni, data->ioc_count);

                if (conn == NULL)
                        return -ENOENT;

                ksocknal_lib_get_conn_tunables(conn, &txmem, &rxmem, &nagle);

                data->ioc_count  = txmem;
                data->ioc_nid    = conn->ksnc_peer->ksnp_id.nid;
                data->ioc_flags  = nagle;
                data->ioc_u32[0] = conn->ksnc_ipaddr;
                data->ioc_u32[1] = conn->ksnc_port;
                data->ioc_u32[2] = conn->ksnc_myipaddr;
                data->ioc_u32[3] = conn->ksnc_type;
                data->ioc_u32[4] = conn->ksnc_scheduler->kss_cpt;
                data->ioc_u32[5] = rxmem;
                data->ioc_u32[6] = conn->ksnc_peer->ksnp_id.pid;
                ksocknal_conn_decref(conn);
                return 0;
        }

        case IOC_LIBCFS_CLOSE_CONNECTION:
                id.nid = data->ioc_nid;
                id.pid = LNET_PID_ANY;
                return ksocknal_close_matching_conns (id,
                                                      data->ioc_u32[0]);

        case IOC_LIBCFS_REGISTER_MYNID:
                /* Ignore if this is a noop */
                if (data->ioc_nid == ni->ni_nid)
                        return 0;

                CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
                       libcfs_nid2str(data->ioc_nid),
                       libcfs_nid2str(ni->ni_nid));
                return -EINVAL;

        case IOC_LIBCFS_PUSH_CONNECTION:
                id.nid = data->ioc_nid;
                id.pid = LNET_PID_ANY;
                return ksocknal_push(ni, id);

        default:
                return -EINVAL;
        }
        /* not reached */
}
2217
2218 static void
2219 ksocknal_free_buffers (void)
2220 {
2221         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_txs) == 0);
2222
2223         if (ksocknal_data.ksnd_schedulers != NULL)
2224                 cfs_percpt_free(ksocknal_data.ksnd_schedulers);
2225
2226         LIBCFS_FREE (ksocknal_data.ksnd_peers,
2227                      sizeof(struct list_head) *
2228                      ksocknal_data.ksnd_peer_hash_size);
2229
2230         spin_lock(&ksocknal_data.ksnd_tx_lock);
2231
2232         if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
2233                 struct list_head zlist;
2234                 struct ksock_tx *tx;
2235
2236                 list_add(&zlist, &ksocknal_data.ksnd_idle_noop_txs);
2237                 list_del_init(&ksocknal_data.ksnd_idle_noop_txs);
2238                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
2239
2240                 while (!list_empty(&zlist)) {
2241                         tx = list_entry(zlist.next, struct ksock_tx, tx_list);
2242                         list_del(&tx->tx_list);
2243                         LIBCFS_FREE(tx, tx->tx_desc_size);
2244                 }
2245         } else {
2246                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
2247         }
2248 }
2249
/* Tear down module-global state (reverse of ksocknal_base_startup()).
 * All networks must already be gone.  Asserts every list is empty, flags
 * the worker threads to terminate, waits (polling) for them to exit,
 * then frees the global buffers and drops the module reference. */
static void
ksocknal_base_shutdown(void)
{
        struct ksock_sched *sched;
        int i;

        CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
               atomic_read (&libcfs_kmemory));
        LASSERT (ksocknal_data.ksnd_nnets == 0);

        switch (ksocknal_data.ksnd_init) {
        default:
                LASSERT(0);
                /* fallthrough */

        case SOCKNAL_INIT_ALL:
        case SOCKNAL_INIT_DATA:
                /* nothing may still be queued anywhere */
                LASSERT(ksocknal_data.ksnd_peers != NULL);
                for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
                        LASSERT(list_empty(&ksocknal_data.ksnd_peers[i]));

                LASSERT(list_empty(&ksocknal_data.ksnd_nets));
                LASSERT(list_empty(&ksocknal_data.ksnd_enomem_conns));
                LASSERT(list_empty(&ksocknal_data.ksnd_zombie_conns));
                LASSERT(list_empty(&ksocknal_data.ksnd_connd_connreqs));
                LASSERT(list_empty(&ksocknal_data.ksnd_connd_routes));

                if (ksocknal_data.ksnd_schedulers != NULL) {
                        cfs_percpt_for_each(sched, i,
                                            ksocknal_data.ksnd_schedulers) {

                                LASSERT(list_empty(&sched->kss_tx_conns));
                                LASSERT(list_empty(&sched->kss_rx_conns));
                                LASSERT(list_empty(&sched->kss_zombie_noop_txs));
                                LASSERT(sched->kss_nconns == 0);
                        }
                }

                /* flag threads to terminate; wake and wait for them to die */
                ksocknal_data.ksnd_shuttingdown = 1;
                wake_up_all(&ksocknal_data.ksnd_connd_waitq);
                wake_up_all(&ksocknal_data.ksnd_reaper_waitq);

                if (ksocknal_data.ksnd_schedulers != NULL) {
                        cfs_percpt_for_each(sched, i,
                                            ksocknal_data.ksnd_schedulers)
                                        wake_up_all(&sched->kss_waitq);
                }

                /* poll ksnd_nthreads once per second until all threads
                 * have exited; log at increasing (power-of-2) intervals */
                i = 4;
                read_lock(&ksocknal_data.ksnd_global_lock);
                while (ksocknal_data.ksnd_nthreads != 0) {
                        i++;
                        /* power of 2? */
                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
                                "waiting for %d threads to terminate\n",
                                ksocknal_data.ksnd_nthreads);
                        read_unlock(&ksocknal_data.ksnd_global_lock);
                        set_current_state(TASK_UNINTERRUPTIBLE);
                        schedule_timeout(cfs_time_seconds(1));
                        read_lock(&ksocknal_data.ksnd_global_lock);
                }
                read_unlock(&ksocknal_data.ksnd_global_lock);

                ksocknal_free_buffers();

                ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
                break;
        }

        CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
               atomic_read (&libcfs_kmemory));

        module_put(THIS_MODULE);
}
2325
2326 static int
2327 ksocknal_base_startup(void)
2328 {
2329         struct ksock_sched *sched;
2330         int rc;
2331         int i;
2332
2333         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
2334         LASSERT (ksocknal_data.ksnd_nnets == 0);
2335
2336         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
2337
2338         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
2339         LIBCFS_ALLOC(ksocknal_data.ksnd_peers,
2340                      sizeof(struct list_head) *
2341                      ksocknal_data.ksnd_peer_hash_size);
2342         if (ksocknal_data.ksnd_peers == NULL)
2343                 return -ENOMEM;
2344
2345         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
2346                 INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
2347
2348         rwlock_init(&ksocknal_data.ksnd_global_lock);
2349         INIT_LIST_HEAD(&ksocknal_data.ksnd_nets);
2350
2351         spin_lock_init(&ksocknal_data.ksnd_reaper_lock);
2352         INIT_LIST_HEAD(&ksocknal_data.ksnd_enomem_conns);
2353         INIT_LIST_HEAD(&ksocknal_data.ksnd_zombie_conns);
2354         INIT_LIST_HEAD(&ksocknal_data.ksnd_deathrow_conns);
2355         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
2356
2357         spin_lock_init(&ksocknal_data.ksnd_connd_lock);
2358         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_connreqs);
2359         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_routes);
2360         init_waitqueue_head(&ksocknal_data.ksnd_connd_waitq);
2361
2362         spin_lock_init(&ksocknal_data.ksnd_tx_lock);
2363         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_noop_txs);
2364
2365         /* NB memset above zeros whole of ksocknal_data */
2366
2367         /* flag lists/ptrs/locks initialised */
2368         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2369         try_module_get(THIS_MODULE);
2370
2371         /* Create a scheduler block per available CPT */
2372         ksocknal_data.ksnd_schedulers = cfs_percpt_alloc(lnet_cpt_table(),
2373                                                          sizeof(*sched));
2374         if (ksocknal_data.ksnd_schedulers == NULL)
2375                 goto failed;
2376
2377         cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
2378                 int nthrs;
2379
2380                 /*
2381                  * make sure not to allocate more threads than there are
2382                  * cores/CPUs in teh CPT
2383                  */
2384                 nthrs = cfs_cpt_weight(lnet_cpt_table(), i);
2385                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2386                         nthrs = min(nthrs, *ksocknal_tunables.ksnd_nscheds);
2387                 } else {
2388                         /*
2389                          * max to half of CPUs, assume another half should be
2390                          * reserved for upper layer modules
2391                          */
2392                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2393                 }
2394
2395                 sched->kss_nthreads_max = nthrs;
2396                 sched->kss_cpt = i;
2397
2398                 spin_lock_init(&sched->kss_lock);
2399                 INIT_LIST_HEAD(&sched->kss_rx_conns);
2400                 INIT_LIST_HEAD(&sched->kss_tx_conns);
2401                 INIT_LIST_HEAD(&sched->kss_zombie_noop_txs);
2402                 init_waitqueue_head(&sched->kss_waitq);
2403         }
2404
2405         ksocknal_data.ksnd_connd_starting         = 0;
2406         ksocknal_data.ksnd_connd_failed_stamp     = 0;
2407         ksocknal_data.ksnd_connd_starting_stamp   = ktime_get_real_seconds();
2408         /* must have at least 2 connds to remain responsive to accepts while
2409          * connecting */
2410         if (*ksocknal_tunables.ksnd_nconnds < SOCKNAL_CONND_RESV + 1)
2411                 *ksocknal_tunables.ksnd_nconnds = SOCKNAL_CONND_RESV + 1;
2412
2413         if (*ksocknal_tunables.ksnd_nconnds_max <
2414             *ksocknal_tunables.ksnd_nconnds) {
2415                 ksocknal_tunables.ksnd_nconnds_max =
2416                         ksocknal_tunables.ksnd_nconnds;
2417         }
2418
2419         for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
2420                 char name[16];
2421                 spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2422                 ksocknal_data.ksnd_connd_starting++;
2423                 spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2424
2425
2426                 snprintf(name, sizeof(name), "socknal_cd%02d", i);
2427                 rc = ksocknal_thread_start(ksocknal_connd,
2428                                            (void *)((uintptr_t)i), name);
2429                 if (rc != 0) {
2430                         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2431                         ksocknal_data.ksnd_connd_starting--;
2432                         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2433                         CERROR("Can't spawn socknal connd: %d\n", rc);
2434                         goto failed;
2435                 }
2436         }
2437
2438         rc = ksocknal_thread_start(ksocknal_reaper, NULL, "socknal_reaper");
2439         if (rc != 0) {
2440                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
2441                 goto failed;
2442         }
2443
2444         /* flag everything initialised */
2445         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
2446
2447         return 0;
2448
2449  failed:
2450         ksocknal_base_shutdown();
2451         return -ENETDOWN;
2452 }
2453
2454 static void
2455 ksocknal_debug_peerhash(struct lnet_ni *ni)
2456 {
2457         struct ksock_peer_ni *peer_ni = NULL;
2458         struct list_head *tmp;
2459         int i;
2460
2461         read_lock(&ksocknal_data.ksnd_global_lock);
2462
2463         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
2464                 list_for_each(tmp, &ksocknal_data.ksnd_peers[i]) {
2465                         peer_ni = list_entry(tmp, struct ksock_peer_ni, ksnp_list);
2466
2467                         if (peer_ni->ksnp_ni == ni)
2468                                 break;
2469
2470                         peer_ni = NULL;
2471                 }
2472         }
2473
2474         if (peer_ni != NULL) {
2475                 struct ksock_route *route;
2476                 struct ksock_conn  *conn;
2477
2478                 CWARN("Active peer_ni on shutdown: %s, ref %d, "
2479                       "closing %d, accepting %d, err %d, zcookie %llu, "
2480                       "txq %d, zc_req %d\n", libcfs_id2str(peer_ni->ksnp_id),
2481                       atomic_read(&peer_ni->ksnp_refcount),
2482                       peer_ni->ksnp_closing,
2483                       peer_ni->ksnp_accepting, peer_ni->ksnp_error,
2484                       peer_ni->ksnp_zc_next_cookie,
2485                       !list_empty(&peer_ni->ksnp_tx_queue),
2486                       !list_empty(&peer_ni->ksnp_zc_req_list));
2487
2488                 list_for_each(tmp, &peer_ni->ksnp_routes) {
2489                         route = list_entry(tmp, struct ksock_route, ksnr_list);
2490                         CWARN("Route: ref %d, schd %d, conn %d, cnted %d, "
2491                               "del %d\n", atomic_read(&route->ksnr_refcount),
2492                               route->ksnr_scheduled, route->ksnr_connecting,
2493                               route->ksnr_connected, route->ksnr_deleted);
2494                 }
2495
2496                 list_for_each(tmp, &peer_ni->ksnp_conns) {
2497                         conn = list_entry(tmp, struct ksock_conn, ksnc_list);
2498                         CWARN("Conn: ref %d, sref %d, t %d, c %d\n",
2499                               atomic_read(&conn->ksnc_conn_refcount),
2500                               atomic_read(&conn->ksnc_sock_refcount),
2501                               conn->ksnc_type, conn->ksnc_closing);
2502                 }
2503         }
2504
2505         read_unlock(&ksocknal_data.ksnd_global_lock);
2506 }
2507
/* LND shutdown hook for one socklnd NI.
 *
 * Tears down all peers on @ni, waits (uninterruptibly, polling once a
 * second) until every peer_ni has released its reference on the net,
 * then frees the per-net state.  The last net to go also shuts down the
 * module-global state via ksocknal_base_shutdown().
 */
void
ksocknal_shutdown(struct lnet_ni *ni)
{
	struct ksock_net *net = ni->ni_data;
	struct lnet_process_id anyid = {
		.nid = LNET_NID_ANY,
		.pid = LNET_PID_ANY,
	};
	int i;

	LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL);
	LASSERT(ksocknal_data.ksnd_nnets > 0);

	/* prevent new peers: bias ksnn_npeers so concurrent peer creation
	 * can detect shutdown is in progress */
	atomic_add(SOCKNAL_SHUTDOWN_BIAS, &net->ksnn_npeers);

	/* Delete all peers (LNET_NID_ANY/LNET_PID_ANY matches every peer) */
	ksocknal_del_peer(ni, anyid, 0);

	/* Wait for all peer_ni state to clean up; npeers drops back to the
	 * bare bias once the last peer_ni is gone */
	i = 2;
	while (atomic_read(&net->ksnn_npeers) > SOCKNAL_SHUTDOWN_BIAS) {
		i++;
		/* warn only on power-of-2 iterations to avoid log spam */
		CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
		       "waiting for %d peers to disconnect\n",
		       atomic_read(&net->ksnn_npeers) - SOCKNAL_SHUTDOWN_BIAS);
		set_current_state(TASK_UNINTERRUPTIBLE);
		schedule_timeout(cfs_time_seconds(1));

		/* dump the surviving peer so the hang is diagnosable */
		ksocknal_debug_peerhash(ni);
	}

	/* by now no peer/route may still account against any interface */
	for (i = 0; i < net->ksnn_ninterfaces; i++) {
		LASSERT(net->ksnn_interfaces[i].ksni_npeers == 0);
		LASSERT(net->ksnn_interfaces[i].ksni_nroutes == 0);
	}

	list_del(&net->ksnn_list);
	LIBCFS_FREE(net, sizeof(*net));

	ksocknal_data.ksnd_nnets--;
	if (ksocknal_data.ksnd_nnets == 0)
		ksocknal_base_shutdown();
}
2552
2553 static int
2554 ksocknal_search_new_ipif(struct ksock_net *net)
2555 {
2556         int new_ipif = 0;
2557         int i;
2558
2559         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2560                 char *ifnam = &net->ksnn_interfaces[i].ksni_name[0];
2561                 char *colon = strchr(ifnam, ':');
2562                 int found  = 0;
2563                 struct ksock_net *tmp;
2564                 int j;
2565
2566                 if (colon != NULL) /* ignore alias device */
2567                         *colon = 0;
2568
2569                 list_for_each_entry(tmp, &ksocknal_data.ksnd_nets,
2570                                         ksnn_list) {
2571                         for (j = 0; !found && j < tmp->ksnn_ninterfaces; j++) {
2572                                 char *ifnam2 = &tmp->ksnn_interfaces[j].\
2573                                              ksni_name[0];
2574                                 char *colon2 = strchr(ifnam2, ':');
2575
2576                                 if (colon2 != NULL)
2577                                         *colon2 = 0;
2578
2579                                 found = strcmp(ifnam, ifnam2) == 0;
2580                                 if (colon2 != NULL)
2581                                         *colon2 = ':';
2582                         }
2583                         if (found)
2584                                 break;
2585                 }
2586
2587                 new_ipif += !found;
2588                 if (colon != NULL)
2589                         *colon = ':';
2590         }
2591
2592         return new_ipif;
2593 }
2594
2595 static int
2596 ksocknal_start_schedulers(struct ksock_sched *sched)
2597 {
2598         int     nthrs;
2599         int     rc = 0;
2600         int     i;
2601
2602         if (sched->kss_nthreads == 0) {
2603                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2604                         nthrs = sched->kss_nthreads_max;
2605                 } else {
2606                         nthrs = cfs_cpt_weight(lnet_cpt_table(),
2607                                                sched->kss_cpt);
2608                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2609                         nthrs = min(SOCKNAL_NSCHEDS_HIGH, nthrs);
2610                 }
2611                 nthrs = min(nthrs, sched->kss_nthreads_max);
2612         } else {
2613                 LASSERT(sched->kss_nthreads <= sched->kss_nthreads_max);
2614                 /* increase two threads if there is new interface */
2615                 nthrs = min(2, sched->kss_nthreads_max - sched->kss_nthreads);
2616         }
2617
2618         for (i = 0; i < nthrs; i++) {
2619                 long id;
2620                 char name[20];
2621
2622                 id = KSOCK_THREAD_ID(sched->kss_cpt, sched->kss_nthreads + i);
2623                 snprintf(name, sizeof(name), "socknal_sd%02d_%02d",
2624                          sched->kss_cpt, (int)KSOCK_THREAD_SID(id));
2625
2626                 rc = ksocknal_thread_start(ksocknal_scheduler,
2627                                            (void *)id, name);
2628                 if (rc == 0)
2629                         continue;
2630
2631                 CERROR("Can't spawn thread %d for scheduler[%d]: %d\n",
2632                        sched->kss_cpt, (int) KSOCK_THREAD_SID(id), rc);
2633                 break;
2634         }
2635
2636         sched->kss_nthreads += i;
2637         return rc;
2638 }
2639
2640 static int
2641 ksocknal_net_start_threads(struct ksock_net *net, __u32 *cpts, int ncpts)
2642 {
2643         int newif = ksocknal_search_new_ipif(net);
2644         int rc;
2645         int i;
2646
2647         if (ncpts > 0 && ncpts > cfs_cpt_number(lnet_cpt_table()))
2648                 return -EINVAL;
2649
2650         for (i = 0; i < ncpts; i++) {
2651                 struct ksock_sched *sched;
2652                 int cpt = (cpts == NULL) ? i : cpts[i];
2653
2654                 LASSERT(cpt < cfs_cpt_number(lnet_cpt_table()));
2655                 sched = ksocknal_data.ksnd_schedulers[cpt];
2656
2657                 if (!newif && sched->kss_nthreads > 0)
2658                         continue;
2659
2660                 rc = ksocknal_start_schedulers(sched);
2661                 if (rc != 0)
2662                         return rc;
2663         }
2664         return 0;
2665 }
2666
/* LND startup hook: bring up one socklnd NI.
 *
 * Performs global base startup on first use, allocates the per-net
 * state, fills in unset common tunables from module defaults, binds the
 * NI to one or more local IP interfaces (either the first discovered
 * one, or those named in ni_interfaces), starts scheduler threads and
 * finally derives the NID from the primary interface address.
 * Returns 0 on success or -ENETDOWN on any failure.
 */
int
ksocknal_startup(struct lnet_ni *ni)
{
	struct ksock_net *net;
	struct lnet_ioctl_config_lnd_cmn_tunables *net_tunables;
	struct ksock_interface *ksi = NULL;
	struct lnet_inetdev *ifaces = NULL;
	int i = 0;
	int rc;

	LASSERT (ni->ni_net->net_lnd == &the_ksocklnd);

	/* first NI: initialise module-global state */
	if (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING) {
		rc = ksocknal_base_startup();
		if (rc != 0)
			return rc;
	}

	LIBCFS_ALLOC(net, sizeof(*net));
	if (net == NULL)
		goto fail_0;

	net->ksnn_incarnation = ktime_get_real_ns();
	ni->ni_data = net;
	net_tunables = &ni->ni_net->net_tunables;

	/* -1 means "unset": fall back to the socklnd module parameters */
	if (net_tunables->lct_peer_timeout == -1)
		net_tunables->lct_peer_timeout =
			*ksocknal_tunables.ksnd_peertimeout;

	if (net_tunables->lct_max_tx_credits == -1)
		net_tunables->lct_max_tx_credits =
			*ksocknal_tunables.ksnd_credits;

	if (net_tunables->lct_peer_tx_credits == -1)
		net_tunables->lct_peer_tx_credits =
			*ksocknal_tunables.ksnd_peertxcredits;

	/* per-peer credits can never exceed the global credit pool */
	if (net_tunables->lct_peer_tx_credits >
	    net_tunables->lct_max_tx_credits)
		net_tunables->lct_peer_tx_credits =
			net_tunables->lct_max_tx_credits;

	if (net_tunables->lct_peer_rtr_credits == -1)
		net_tunables->lct_peer_rtr_credits =
			*ksocknal_tunables.ksnd_peerrtrcredits;

	/* rc holds the number of discovered interfaces from here on.
	 * NOTE(review): ifaces does not appear to be freed on any path
	 * (success or failure) — verify whether lnet_inet_enumerate()
	 * allocates it and whether a free is required.  Also verify it
	 * cannot return 0 with a NULL array, since ifaces[0] is read
	 * unconditionally below. */
	rc = lnet_inet_enumerate(&ifaces, ni->ni_net_ns);
	if (rc < 0)
		goto fail_1;

	if (!ni->ni_interfaces[0]) {
		ksi = &net->ksnn_interfaces[0];

		/* Use the first discovered interface */
		net->ksnn_ninterfaces = 1;
		ni->ni_dev_cpt = ifaces[0].li_cpt;
		ksi->ksni_ipaddr = ifaces[0].li_ipaddr;
		ksi->ksni_netmask = ifaces[0].li_netmask;
		strlcpy(ksi->ksni_name, ifaces[0].li_name,
			sizeof(ksi->ksni_name));
	} else {
		/* Before Multi-Rail ksocklnd would manage
		 * multiple interfaces with its own tcp bonding.
		 * If we encounter an old configuration using
		 * this tcp bonding approach then we need to
		 * handle more than one ni_interfaces.
		 *
		 * In Multi-Rail configuration only ONE ni_interface
		 * should exist. Each IP alias should be mapped to
		 * each 'struct net_ni'.
		 */
		for (i = 0; i < LNET_INTERFACES_NUM; i++) {
			int j;

			if (!ni->ni_interfaces[i])
				break;

			/* reject the same name configured twice */
			for (j = 0; j < LNET_INTERFACES_NUM;  j++) {
				if (i != j && ni->ni_interfaces[j] &&
				    strcmp(ni->ni_interfaces[i],
					   ni->ni_interfaces[j]) == 0) {
					rc = -EEXIST;
					CERROR("ksocklnd: found duplicate %s at %d and %d, rc = %d\n",
					       ni->ni_interfaces[i], i, j, rc);
					goto fail_1;
				}
			}

			/* match the configured name against the discovered
			 * interfaces; note the slot index is j (discovery
			 * order), not the running ksnn_ninterfaces count */
			for (j = 0; j < rc; j++) {
				if (strcmp(ifaces[j].li_name,
					   ni->ni_interfaces[i]) != 0)
					continue;

				ksi = &net->ksnn_interfaces[j];
				ni->ni_dev_cpt = ifaces[j].li_cpt;
				ksi->ksni_ipaddr = ifaces[j].li_ipaddr;
				ksi->ksni_netmask = ifaces[j].li_netmask;
				strlcpy(ksi->ksni_name, ifaces[j].li_name,
					sizeof(ksi->ksni_name));
				net->ksnn_ninterfaces++;
				break;
			}
		}
		/* ni_interfaces don't map to all network interfaces */
		if (!ksi || net->ksnn_ninterfaces != i) {
			CERROR("ksocklnd: requested %d but only %d interfaces found\n",
			       i, net->ksnn_ninterfaces);
			goto fail_1;
		}
	}

	/* call it before add it to ksocknal_data.ksnd_nets */
	rc = ksocknal_net_start_threads(net, ni->ni_cpts, ni->ni_ncpts);
	if (rc != 0)
		goto fail_1;

	LASSERT(ksi);
	/* NID = network number + primary interface's IP address */
	ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ksi->ksni_ipaddr);
	list_add(&net->ksnn_list, &ksocknal_data.ksnd_nets);

	ksocknal_data.ksnd_nnets++;

	return 0;

 fail_1:
	LIBCFS_FREE(net, sizeof(*net));
 fail_0:
	/* if this was the first NI, roll back the base startup too */
	if (ksocknal_data.ksnd_nnets == 0)
		ksocknal_base_shutdown();

	return -ENETDOWN;
}
2800
2801
/* Module unload hook: deregister the socklnd driver from LNet. */
static void __exit ksocklnd_exit(void)
{
	lnet_unregister_lnd(&the_ksocklnd);
}
2806
2807 static int __init ksocklnd_init(void)
2808 {
2809         int rc;
2810
2811         /* check ksnr_connected/connecting field large enough */
2812         BUILD_BUG_ON(SOCKLND_CONN_NTYPES > 4);
2813         BUILD_BUG_ON(SOCKLND_CONN_ACK != SOCKLND_CONN_BULK_IN);
2814
2815         /* initialize the_ksocklnd */
2816         the_ksocklnd.lnd_type     = SOCKLND;
2817         the_ksocklnd.lnd_startup  = ksocknal_startup;
2818         the_ksocklnd.lnd_shutdown = ksocknal_shutdown;
2819         the_ksocklnd.lnd_ctl      = ksocknal_ctl;
2820         the_ksocklnd.lnd_send     = ksocknal_send;
2821         the_ksocklnd.lnd_recv     = ksocknal_recv;
2822         the_ksocklnd.lnd_notify_peer_down   = ksocknal_notify_gw_down;
2823         the_ksocklnd.lnd_query    = ksocknal_query;
2824         the_ksocklnd.lnd_accept   = ksocknal_accept;
2825
2826         rc = ksocknal_tunables_init();
2827         if (rc != 0)
2828                 return rc;
2829
2830         lnet_register_lnd(&the_ksocklnd);
2831
2832         return 0;
2833 }
2834
/* Kernel module metadata and init/exit entry points */
MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("TCP Socket LNet Network Driver");
MODULE_VERSION("2.8.0");
MODULE_LICENSE("GPL");

module_init(ksocklnd_init);
module_exit(ksocklnd_exit);