Whamcloud - gitweb
LU-12678 lnet: use list_move where appropriate.
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lnet/klnds/socklnd/socklnd.c
33  *
34  * Author: Zach Brown <zab@zabbo.net>
35  * Author: Peter J. Braam <braam@clusterfs.com>
36  * Author: Phil Schwan <phil@clusterfs.com>
37  * Author: Eric Barton <eric@bartonsoftware.com>
38  */
39
40 #include "socklnd.h"
41 #include <linux/inetdevice.h>
42
43 static struct lnet_lnd the_ksocklnd;
44 struct ksock_nal_data ksocknal_data;
45
46 static struct ksock_interface *
47 ksocknal_ip2iface(struct lnet_ni *ni, __u32 ip)
48 {
49         struct ksock_net *net = ni->ni_data;
50         int i;
51         struct ksock_interface *iface;
52
53         for (i = 0; i < net->ksnn_ninterfaces; i++) {
54                 LASSERT(i < LNET_INTERFACES_NUM);
55                 iface = &net->ksnn_interfaces[i];
56
57                 if (iface->ksni_ipaddr == ip)
58                         return iface;
59         }
60
61         return NULL;
62 }
63
64 static struct ksock_route *
65 ksocknal_create_route(__u32 ipaddr, int port)
66 {
67         struct ksock_route *route;
68
69         LIBCFS_ALLOC (route, sizeof (*route));
70         if (route == NULL)
71                 return (NULL);
72
73         atomic_set (&route->ksnr_refcount, 1);
74         route->ksnr_peer = NULL;
75         route->ksnr_retry_interval = 0;         /* OK to connect at any time */
76         route->ksnr_ipaddr = ipaddr;
77         route->ksnr_port = port;
78         route->ksnr_scheduled = 0;
79         route->ksnr_connecting = 0;
80         route->ksnr_connected = 0;
81         route->ksnr_deleted = 0;
82         route->ksnr_conn_count = 0;
83         route->ksnr_share_count = 0;
84
85         return (route);
86 }
87
88 void
89 ksocknal_destroy_route(struct ksock_route *route)
90 {
91         LASSERT (atomic_read(&route->ksnr_refcount) == 0);
92
93         if (route->ksnr_peer != NULL)
94                 ksocknal_peer_decref(route->ksnr_peer);
95
96         LIBCFS_FREE (route, sizeof (*route));
97 }
98
99 static struct ksock_peer_ni *
100 ksocknal_create_peer(struct lnet_ni *ni, struct lnet_process_id id)
101 {
102         int cpt = lnet_cpt_of_nid(id.nid, ni);
103         struct ksock_net *net = ni->ni_data;
104         struct ksock_peer_ni *peer_ni;
105
106         LASSERT(id.nid != LNET_NID_ANY);
107         LASSERT(id.pid != LNET_PID_ANY);
108         LASSERT(!in_interrupt());
109
110         if (!atomic_inc_unless_negative(&net->ksnn_npeers)) {
111                 CERROR("Can't create peer_ni: network shutdown\n");
112                 return ERR_PTR(-ESHUTDOWN);
113         }
114
115         LIBCFS_CPT_ALLOC(peer_ni, lnet_cpt_table(), cpt, sizeof(*peer_ni));
116         if (!peer_ni) {
117                 atomic_dec(&net->ksnn_npeers);
118                 return ERR_PTR(-ENOMEM);
119         }
120
121         peer_ni->ksnp_ni = ni;
122         peer_ni->ksnp_id = id;
123         atomic_set(&peer_ni->ksnp_refcount, 1); /* 1 ref for caller */
124         peer_ni->ksnp_closing = 0;
125         peer_ni->ksnp_accepting = 0;
126         peer_ni->ksnp_proto = NULL;
127         peer_ni->ksnp_last_alive = 0;
128         peer_ni->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;
129
130         INIT_LIST_HEAD(&peer_ni->ksnp_conns);
131         INIT_LIST_HEAD(&peer_ni->ksnp_routes);
132         INIT_LIST_HEAD(&peer_ni->ksnp_tx_queue);
133         INIT_LIST_HEAD(&peer_ni->ksnp_zc_req_list);
134         spin_lock_init(&peer_ni->ksnp_lock);
135
136         return peer_ni;
137 }
138
139 void
140 ksocknal_destroy_peer(struct ksock_peer_ni *peer_ni)
141 {
142         struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
143
144         CDEBUG (D_NET, "peer_ni %s %p deleted\n",
145                 libcfs_id2str(peer_ni->ksnp_id), peer_ni);
146
147         LASSERT(atomic_read(&peer_ni->ksnp_refcount) == 0);
148         LASSERT(peer_ni->ksnp_accepting == 0);
149         LASSERT(list_empty(&peer_ni->ksnp_conns));
150         LASSERT(list_empty(&peer_ni->ksnp_routes));
151         LASSERT(list_empty(&peer_ni->ksnp_tx_queue));
152         LASSERT(list_empty(&peer_ni->ksnp_zc_req_list));
153
154         LIBCFS_FREE(peer_ni, sizeof(*peer_ni));
155
156         /* NB a peer_ni's connections and routes keep a reference on their
157          * peer_ni until they are destroyed, so we can be assured that _all_
158          * state to do with this peer_ni has been cleaned up when its refcount
159          * drops to zero.
160          */
161         atomic_dec(&net->ksnn_npeers);
162 }
163
164 struct ksock_peer_ni *
165 ksocknal_find_peer_locked(struct lnet_ni *ni, struct lnet_process_id id)
166 {
167         struct list_head *peer_list = ksocknal_nid2peerlist(id.nid);
168         struct list_head *tmp;
169         struct ksock_peer_ni *peer_ni;
170
171         list_for_each(tmp, peer_list) {
172                 peer_ni = list_entry(tmp, struct ksock_peer_ni, ksnp_list);
173
174                 LASSERT(!peer_ni->ksnp_closing);
175
176                 if (peer_ni->ksnp_ni != ni)
177                         continue;
178
179                 if (peer_ni->ksnp_id.nid != id.nid ||
180                     peer_ni->ksnp_id.pid != id.pid)
181                         continue;
182
183                 CDEBUG(D_NET, "got peer_ni [%p] -> %s (%d)\n",
184                        peer_ni, libcfs_id2str(id),
185                        atomic_read(&peer_ni->ksnp_refcount));
186                 return peer_ni;
187         }
188         return NULL;
189 }
190
191 struct ksock_peer_ni *
192 ksocknal_find_peer(struct lnet_ni *ni, struct lnet_process_id id)
193 {
194         struct ksock_peer_ni *peer_ni;
195
196         read_lock(&ksocknal_data.ksnd_global_lock);
197         peer_ni = ksocknal_find_peer_locked(ni, id);
198         if (peer_ni != NULL)                    /* +1 ref for caller? */
199                 ksocknal_peer_addref(peer_ni);
200         read_unlock(&ksocknal_data.ksnd_global_lock);
201
202         return (peer_ni);
203 }
204
/* Remove @peer_ni from the global peer hash table and drop the table's
 * reference.  Also releases the peer_ni's claim on every local interface
 * it was using for passive connections.  Caller presumably holds
 * ksnd_global_lock for write (per the _locked naming convention). */
static void
ksocknal_unlink_peer_locked(struct ksock_peer_ni *peer_ni)
{
        int i;
        __u32 ip;
        struct ksock_interface *iface;

        /* give back each passive-IP claim on the matching interface */
        for (i = 0; i < peer_ni->ksnp_n_passive_ips; i++) {
                LASSERT(i < LNET_INTERFACES_NUM);
                ip = peer_ni->ksnp_passive_ips[i];

                iface = ksocknal_ip2iface(peer_ni->ksnp_ni, ip);
                /*
                 * All IPs in peer_ni->ksnp_passive_ips[] come from the
                 * interface list, therefore the call must succeed.
                 */
                LASSERT(iface != NULL);

                CDEBUG(D_NET, "peer_ni=%p iface=%p ksni_nroutes=%d\n",
                       peer_ni, iface, iface->ksni_nroutes);
                iface->ksni_npeers--;
        }

        /* unlinking is only legal once all conns and routes are gone,
         * and may happen at most once */
        LASSERT(list_empty(&peer_ni->ksnp_conns));
        LASSERT(list_empty(&peer_ni->ksnp_routes));
        LASSERT(!peer_ni->ksnp_closing);
        peer_ni->ksnp_closing = 1;
        list_del(&peer_ni->ksnp_list);
        /* lose peerlist's ref */
        ksocknal_peer_decref(peer_ni);
}
236
/* Report the index'th "peer record" of @ni through the out parameters.
 * Records are enumerated in order: first a peer_ni with neither passive
 * IPs nor routes (one record), then one record per passive IP, then one
 * record per route.  Fields that do not apply to a record are zeroed.
 *
 * Returns 0 on success, -ENOENT when @index is past the last record. */
static int
ksocknal_get_peer_info(struct lnet_ni *ni, int index,
                       struct lnet_process_id *id, __u32 *myip, __u32 *peer_ip,
                       int *port, int *conn_count, int *share_count)
{
        struct ksock_peer_ni *peer_ni;
        struct list_head *ptmp;
        struct ksock_route *route;
        struct list_head *rtmp;
        int i;
        int j;
        int rc = -ENOENT;

        read_lock(&ksocknal_data.ksnd_global_lock);

        /* walk every hash chain; decrement index past records that
         * precede the one requested */
        for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
                list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
                        peer_ni = list_entry(ptmp, struct ksock_peer_ni, ksnp_list);

                        if (peer_ni->ksnp_ni != ni)
                                continue;

                        /* case 1: bare peer_ni (no passive IPs, no routes) */
                        if (peer_ni->ksnp_n_passive_ips == 0 &&
                            list_empty(&peer_ni->ksnp_routes)) {
                                if (index-- > 0)
                                        continue;

                                *id = peer_ni->ksnp_id;
                                *myip = 0;
                                *peer_ip = 0;
                                *port = 0;
                                *conn_count = 0;
                                *share_count = 0;
                                rc = 0;
                                goto out;
                        }

                        /* case 2: one record per passive IP */
                        for (j = 0; j < peer_ni->ksnp_n_passive_ips; j++) {
                                if (index-- > 0)
                                        continue;

                                *id = peer_ni->ksnp_id;
                                *myip = peer_ni->ksnp_passive_ips[j];
                                *peer_ip = 0;
                                *port = 0;
                                *conn_count = 0;
                                *share_count = 0;
                                rc = 0;
                                goto out;
                        }

                        /* case 3: one record per route */
                        list_for_each(rtmp, &peer_ni->ksnp_routes) {
                                if (index-- > 0)
                                        continue;

                                route = list_entry(rtmp, struct ksock_route,
                                                   ksnr_list);

                                *id = peer_ni->ksnp_id;
                                *myip = route->ksnr_myipaddr;
                                *peer_ip = route->ksnr_ipaddr;
                                *port = route->ksnr_port;
                                *conn_count = route->ksnr_conn_count;
                                *share_count = route->ksnr_share_count;
                                rc = 0;
                                goto out;
                        }
                }
        }
out:
        read_unlock(&ksocknal_data.ksnd_global_lock);
        return rc;
}
310
/* Bind @conn to @route: record the local IP the connection actually
 * used, keep per-interface route counts in sync when that binding
 * changes, and mark the route connected for this connection type.
 * Clears the retry interval so further connect attempts may proceed
 * immediately. */
static void
ksocknal_associate_route_conn_locked(struct ksock_route *route, struct ksock_conn *conn)
{
        struct ksock_peer_ni *peer_ni = route->ksnr_peer;
        int type = conn->ksnc_type;
        struct ksock_interface *iface;

        /* conn takes its own ref on the route */
        conn->ksnc_route = route;
        ksocknal_route_addref(route);

        if (route->ksnr_myipaddr != conn->ksnc_myipaddr) {
                if (route->ksnr_myipaddr == 0) {
                        /* route wasn't bound locally yet (the initial route) */
                        CDEBUG(D_NET, "Binding %s %pI4h to %pI4h\n",
                               libcfs_id2str(peer_ni->ksnp_id),
                               &route->ksnr_ipaddr,
                               &conn->ksnc_myipaddr);
                } else {
                        /* rebinding: drop the old interface's route count */
                        CDEBUG(D_NET, "Rebinding %s %pI4h from %pI4h "
                               "to %pI4h\n", libcfs_id2str(peer_ni->ksnp_id),
                               &route->ksnr_ipaddr,
                               &route->ksnr_myipaddr,
                               &conn->ksnc_myipaddr);

                        iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
                                                  route->ksnr_myipaddr);
                        if (iface != NULL)
                                iface->ksni_nroutes--;
                }
                /* adopt the conn's local address and credit its interface */
                route->ksnr_myipaddr = conn->ksnc_myipaddr;
                iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
                                          route->ksnr_myipaddr);
                if (iface != NULL)
                        iface->ksni_nroutes++;
        }

        /* one bit per connection type */
        route->ksnr_connected |= (1<<type);
        route->ksnr_conn_count++;

        /* Successful connection => further attempts can
         * proceed immediately */
        route->ksnr_retry_interval = 0;
}
354
355 static void
356 ksocknal_add_route_locked(struct ksock_peer_ni *peer_ni, struct ksock_route *route)
357 {
358         struct list_head *tmp;
359         struct ksock_conn *conn;
360         struct ksock_route *route2;
361
362         LASSERT(!peer_ni->ksnp_closing);
363         LASSERT(route->ksnr_peer == NULL);
364         LASSERT(!route->ksnr_scheduled);
365         LASSERT(!route->ksnr_connecting);
366         LASSERT(route->ksnr_connected == 0);
367
368         /* LASSERT(unique) */
369         list_for_each(tmp, &peer_ni->ksnp_routes) {
370                 route2 = list_entry(tmp, struct ksock_route, ksnr_list);
371
372                 if (route2->ksnr_ipaddr == route->ksnr_ipaddr) {
373                         CERROR("Duplicate route %s %pI4h\n",
374                                libcfs_id2str(peer_ni->ksnp_id),
375                                &route->ksnr_ipaddr);
376                         LBUG();
377                 }
378         }
379
380         route->ksnr_peer = peer_ni;
381         ksocknal_peer_addref(peer_ni);
382         /* peer_ni's routelist takes over my ref on 'route' */
383         list_add_tail(&route->ksnr_list, &peer_ni->ksnp_routes);
384
385         list_for_each(tmp, &peer_ni->ksnp_conns) {
386                 conn = list_entry(tmp, struct ksock_conn, ksnc_list);
387
388                 if (conn->ksnc_ipaddr != route->ksnr_ipaddr)
389                         continue;
390
391                 ksocknal_associate_route_conn_locked(route, conn);
392                 /* keep going (typed routes) */
393         }
394 }
395
/* Remove @route from its peer_ni: close any conns bound to it, release
 * its claim on the local interface it was bound to, and drop the
 * peer_ni's reference on it.  If this leaves the peer_ni with neither
 * routes nor conns, the peer_ni is unlinked from the peer table. */
static void
ksocknal_del_route_locked(struct ksock_route *route)
{
        struct ksock_peer_ni *peer_ni = route->ksnr_peer;
        struct ksock_interface *iface;
        struct ksock_conn *conn;
        struct list_head *ctmp;
        struct list_head *cnxt;

        /* a route may only be deleted once */
        LASSERT(!route->ksnr_deleted);

        /* Close associated conns */
        list_for_each_safe(ctmp, cnxt, &peer_ni->ksnp_conns) {
                conn = list_entry(ctmp, struct ksock_conn, ksnc_list);

                if (conn->ksnc_route != route)
                        continue;

                ksocknal_close_conn_locked(conn, 0);
        }

        /* if the route was bound to a local interface, release its
         * route count */
        if (route->ksnr_myipaddr != 0) {
                iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
                                          route->ksnr_myipaddr);
                if (iface != NULL)
                        iface->ksni_nroutes--;
        }

        route->ksnr_deleted = 1;
        list_del(&route->ksnr_list);
        ksocknal_route_decref(route);           /* drop peer_ni's ref */

        if (list_empty(&peer_ni->ksnp_routes) &&
            list_empty(&peer_ni->ksnp_conns)) {
                /* I've just removed the last route to a peer_ni with no active
                 * connections */
                ksocknal_unlink_peer_locked(peer_ni);
        }
}
435
/* Add a route to @ipaddr:@port for peer @id on network @ni, creating
 * the peer_ni if it does not already exist.  If a route to that address
 * already exists, only its share count is bumped.
 *
 * Returns 0 on success, -EINVAL for wildcard nid/pid, -ENOMEM or
 * -ESHUTDOWN from peer/route creation. */
int
ksocknal_add_peer(struct lnet_ni *ni, struct lnet_process_id id, __u32 ipaddr,
                  int port)
{
        struct list_head *tmp;
        struct ksock_peer_ni *peer_ni;
        struct ksock_peer_ni *peer2;
        struct ksock_route *route;
        struct ksock_route *route2;

        if (id.nid == LNET_NID_ANY ||
            id.pid == LNET_PID_ANY)
                return (-EINVAL);

        /* Have a brand new peer_ni ready...
         * (allocated before taking the lock, since allocation may sleep) */
        peer_ni = ksocknal_create_peer(ni, id);
        if (IS_ERR(peer_ni))
                return PTR_ERR(peer_ni);

        route = ksocknal_create_route (ipaddr, port);
        if (route == NULL) {
                ksocknal_peer_decref(peer_ni);
                return (-ENOMEM);
        }

        write_lock_bh(&ksocknal_data.ksnd_global_lock);

        /* always called with a ref on ni, so shutdown can't have started */
        LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers)
                >= 0);

        peer2 = ksocknal_find_peer_locked(ni, id);
        if (peer2 != NULL) {
                /* lost the race: use the existing peer_ni and discard ours */
                ksocknal_peer_decref(peer_ni);
                peer_ni = peer2;
        } else {
                /* peer_ni table takes my ref on peer_ni */
                list_add_tail(&peer_ni->ksnp_list,
                              ksocknal_nid2peerlist(id.nid));
        }

        /* look for an existing route to the same address */
        route2 = NULL;
        list_for_each(tmp, &peer_ni->ksnp_routes) {
                route2 = list_entry(tmp, struct ksock_route, ksnr_list);

                if (route2->ksnr_ipaddr == ipaddr)
                        break;

                route2 = NULL;
        }
        if (route2 == NULL) {
                ksocknal_add_route_locked(peer_ni, route);
                route->ksnr_share_count++;
        } else {
                /* duplicate: discard the new route, share the old one */
                ksocknal_route_decref(route);
                route2->ksnr_share_count++;
        }

        write_unlock_bh(&ksocknal_data.ksnd_global_lock);

        return 0;
}
498
/* Delete routes to @ip (or all routes if @ip == 0) from @peer_ni.
 * If no explicitly shared routes remain afterwards, all remaining
 * (auto-created) routes and all conns are torn down too.  The peer_ni
 * unlinks itself when its last conn/route goes away. */
static void
ksocknal_del_peer_locked(struct ksock_peer_ni *peer_ni, __u32 ip)
{
        struct ksock_conn *conn;
        struct ksock_route *route;
        struct list_head *tmp;
        struct list_head *nxt;
        int nshared;

        LASSERT(!peer_ni->ksnp_closing);

        /* Extra ref prevents peer_ni disappearing until I'm done with it */
        ksocknal_peer_addref(peer_ni);

        /* pass 1: delete the matching routes (and their conns) */
        list_for_each_safe(tmp, nxt, &peer_ni->ksnp_routes) {
                route = list_entry(tmp, struct ksock_route, ksnr_list);

                /* no match */
                if (!(ip == 0 || route->ksnr_ipaddr == ip))
                        continue;

                route->ksnr_share_count = 0;
                /* This deletes associated conns too */
                ksocknal_del_route_locked(route);
        }

        /* pass 2: count the share references left on surviving routes */
        nshared = 0;
        list_for_each_safe(tmp, nxt, &peer_ni->ksnp_routes) {
                route = list_entry(tmp, struct ksock_route, ksnr_list);
                nshared += route->ksnr_share_count;
        }

        if (nshared == 0) {
                /* remove everything else if there are no explicit entries
                 * left */

                list_for_each_safe(tmp, nxt, &peer_ni->ksnp_routes) {
                        route = list_entry(tmp, struct ksock_route, ksnr_list);

                        /* we should only be removing auto-entries */
                        LASSERT(route->ksnr_share_count == 0);
                        ksocknal_del_route_locked(route);
                }

                list_for_each_safe(tmp, nxt, &peer_ni->ksnp_conns) {
                        conn = list_entry(tmp, struct ksock_conn, ksnc_list);

                        ksocknal_close_conn_locked(conn, 0);
                }
        }

        ksocknal_peer_decref(peer_ni);
        /* NB peer_ni unlinks itself when last conn/route is removed */
}
553
/* Delete peers matching @id (either field may be LNET_*_ANY wildcards)
 * on network @ni, restricted to routes to @ip when @ip != 0.  Any txs
 * queued on a peer_ni that ends up fully closed are completed with
 * -ENETDOWN after the lock is dropped.
 *
 * Returns 0 if at least one peer matched, -ENOENT otherwise. */
static int
ksocknal_del_peer(struct lnet_ni *ni, struct lnet_process_id id, __u32 ip)
{
        LIST_HEAD(zombies);
        struct list_head *ptmp;
        struct list_head *pnxt;
        struct ksock_peer_ni *peer_ni;
        int lo;
        int hi;
        int i;
        int rc = -ENOENT;

        write_lock_bh(&ksocknal_data.ksnd_global_lock);

        /* restrict the scan to a single hash chain when the nid is
         * known; otherwise walk the whole table */
        if (id.nid != LNET_NID_ANY) {
                hi = (int)(ksocknal_nid2peerlist(id.nid) -
                           ksocknal_data.ksnd_peers);
                lo = hi;
        } else {
                lo = 0;
                hi = ksocknal_data.ksnd_peer_hash_size - 1;
        }

        for (i = lo; i <= hi; i++) {
                list_for_each_safe(ptmp, pnxt,
                                   &ksocknal_data.ksnd_peers[i]) {
                        peer_ni = list_entry(ptmp, struct ksock_peer_ni, ksnp_list);

                        if (peer_ni->ksnp_ni != ni)
                                continue;

                        if (!((id.nid == LNET_NID_ANY ||
                               peer_ni->ksnp_id.nid == id.nid) &&
                              (id.pid == LNET_PID_ANY ||
                               peer_ni->ksnp_id.pid == id.pid)))
                                continue;

                        ksocknal_peer_addref(peer_ni);  /* a ref for me... */

                        ksocknal_del_peer_locked(peer_ni, ip);

                        /* collect txs stranded on a now-closed peer_ni so
                         * they can be failed outside the lock */
                        if (peer_ni->ksnp_closing &&
                            !list_empty(&peer_ni->ksnp_tx_queue)) {
                                LASSERT(list_empty(&peer_ni->ksnp_conns));
                                LASSERT(list_empty(&peer_ni->ksnp_routes));

                                list_splice_init(&peer_ni->ksnp_tx_queue,
                                                 &zombies);
                        }

                        ksocknal_peer_decref(peer_ni);  /* ...till here */

                        rc = 0;                         /* matched! */
                }
        }

        write_unlock_bh(&ksocknal_data.ksnd_global_lock);

        ksocknal_txlist_done(ni, &zombies, -ENETDOWN);

        return rc;
}
616
617 static struct ksock_conn *
618 ksocknal_get_conn_by_idx(struct lnet_ni *ni, int index)
619 {
620         struct ksock_peer_ni *peer_ni;
621         struct list_head *ptmp;
622         struct ksock_conn *conn;
623         struct list_head *ctmp;
624         int i;
625
626         read_lock(&ksocknal_data.ksnd_global_lock);
627
628         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
629                 list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
630                         peer_ni = list_entry(ptmp, struct ksock_peer_ni, ksnp_list);
631
632                         LASSERT(!peer_ni->ksnp_closing);
633
634                         if (peer_ni->ksnp_ni != ni)
635                                 continue;
636
637                         list_for_each(ctmp, &peer_ni->ksnp_conns) {
638                                 if (index-- > 0)
639                                         continue;
640
641                                 conn = list_entry(ctmp, struct ksock_conn,
642                                                   ksnc_list);
643                                 ksocknal_conn_addref(conn);
644                                 read_unlock(&ksocknal_data. \
645                                             ksnd_global_lock);
646                                 return conn;
647                         }
648                 }
649         }
650
651         read_unlock(&ksocknal_data.ksnd_global_lock);
652         return NULL;
653 }
654
655 static struct ksock_sched *
656 ksocknal_choose_scheduler_locked(unsigned int cpt)
657 {
658         struct ksock_sched *sched = ksocknal_data.ksnd_schedulers[cpt];
659         int i;
660
661         if (sched->kss_nthreads == 0) {
662                 cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
663                         if (sched->kss_nthreads > 0) {
664                                 CDEBUG(D_NET, "scheduler[%d] has no threads. selected scheduler[%d]\n",
665                                        cpt, sched->kss_cpt);
666                                 return sched;
667                         }
668                 }
669                 return NULL;
670         }
671
672         return sched;
673 }
674
675 static int
676 ksocknal_local_ipvec(struct lnet_ni *ni, __u32 *ipaddrs)
677 {
678         struct ksock_net *net = ni->ni_data;
679         int i;
680         int nip;
681
682         read_lock(&ksocknal_data.ksnd_global_lock);
683
684         nip = net->ksnn_ninterfaces;
685         LASSERT(nip <= LNET_INTERFACES_NUM);
686
687         /*
688          * Only offer interfaces for additional connections if I have
689          * more than one.
690          */
691         if (nip < 2) {
692                 read_unlock(&ksocknal_data.ksnd_global_lock);
693                 return 0;
694         }
695
696         for (i = 0; i < nip; i++) {
697                 ipaddrs[i] = net->ksnn_interfaces[i].ksni_ipaddr;
698                 LASSERT(ipaddrs[i] != 0);
699         }
700
701         read_unlock(&ksocknal_data.ksnd_global_lock);
702         return nip;
703 }
704
705 static int
706 ksocknal_match_peerip(struct ksock_interface *iface, __u32 *ips, int nips)
707 {
708         int best_netmatch = 0;
709         int best_xor = 0;
710         int best = -1;
711         int this_xor;
712         int this_netmatch;
713         int i;
714
715         for (i = 0; i < nips; i++) {
716                 if (ips[i] == 0)
717                         continue;
718
719                 this_xor = (ips[i] ^ iface->ksni_ipaddr);
720                 this_netmatch = ((this_xor & iface->ksni_netmask) == 0) ? 1 : 0;
721
722                 if (!(best < 0 ||
723                       best_netmatch < this_netmatch ||
724                       (best_netmatch == this_netmatch &&
725                        best_xor > this_xor)))
726                         continue;
727
728                 best = i;
729                 best_netmatch = this_netmatch;
730                 best_xor = this_xor;
731         }
732
733         LASSERT (best >= 0);
734         return (best);
735 }
736
/* Choose which local interfaces to use for connections to @peer_ni,
 * pairing them against the peer's advertised IPs in @peerips.  Keeps
 * previously selected passive IPs, then picks new interfaces (best
 * subnet match, then least loaded) until min(n_peerips, ninterfaces)
 * are selected.  On return @peerips is overwritten with the selected
 * local addresses.
 *
 * Returns the number of addresses written (0 unless >1 local
 * interface is configured). */
static int
ksocknal_select_ips(struct ksock_peer_ni *peer_ni, __u32 *peerips, int n_peerips)
{
        rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
        struct ksock_net *net = peer_ni->ksnp_ni->ni_data;
        struct ksock_interface *iface;
        struct ksock_interface *best_iface;
        int n_ips;
        int i;
        int j;
        int k;
        u32 ip;
        u32 xor;
        int this_netmatch;
        int best_netmatch;
        int best_npeers;

        /* CAVEAT EMPTOR: We do all our interface matching with an
         * exclusive hold of global lock at IRQ priority.  We're only
         * expecting to be dealing with small numbers of interfaces, so the
         * O(n**3)-ness shouldn't matter */

        /* Also note that I'm not going to return more than n_peerips
         * interfaces, even if I have more myself */

        write_lock_bh(global_lock);

        LASSERT(n_peerips <= LNET_INTERFACES_NUM);
        LASSERT(net->ksnn_ninterfaces <= LNET_INTERFACES_NUM);

        /* Only match interfaces for additional connections
         * if I have > 1 interface */
        n_ips = (net->ksnn_ninterfaces < 2) ? 0 :
                MIN(n_peerips, net->ksnn_ninterfaces);

        /* loop until enough passive IPs are selected; i tracks the slot
         * being filled, so the condition re-reads ksnp_n_passive_ips */
        for (i = 0; peer_ni->ksnp_n_passive_ips < n_ips; i++) {
                /*              ^ yes really... */

                /* If we have any new interfaces, first tick off all the
                 * peer_ni IPs that match old interfaces, then choose new
                 * interfaces to match the remaining peer_ni IPS.
                 * We don't forget interfaces we've stopped using; we might
                 * start using them again... */

                if (i < peer_ni->ksnp_n_passive_ips) {
                        /* Old interface. */
                        ip = peer_ni->ksnp_passive_ips[i];
                        best_iface = ksocknal_ip2iface(peer_ni->ksnp_ni, ip);

                        /* peer_ni passive ips are kept up to date */
                        LASSERT(best_iface != NULL);
                } else {
                        /* choose a new interface */
                        LASSERT (i == peer_ni->ksnp_n_passive_ips);

                        best_iface = NULL;
                        best_netmatch = 0;
                        best_npeers = 0;

                        for (j = 0; j < net->ksnn_ninterfaces; j++) {
                                iface = &net->ksnn_interfaces[j];
                                ip = iface->ksni_ipaddr;

                                /* skip interfaces already selected */
                                for (k = 0; k < peer_ni->ksnp_n_passive_ips; k++)
                                        if (peer_ni->ksnp_passive_ips[k] == ip)
                                                break;

                                if (k < peer_ni->ksnp_n_passive_ips) /* using it already */
                                        continue;

                                /* rate this interface against the best
                                 * peer IP it could reach */
                                k = ksocknal_match_peerip(iface, peerips, n_peerips);
                                xor = (ip ^ peerips[k]);
                                this_netmatch = ((xor & iface->ksni_netmask) == 0) ? 1 : 0;

                                /* prefer better subnet match, then fewer
                                 * peers already on the interface */
                                if (!(best_iface == NULL ||
                                      best_netmatch < this_netmatch ||
                                      (best_netmatch == this_netmatch &&
                                       best_npeers > iface->ksni_npeers)))
                                        continue;

                                best_iface = iface;
                                best_netmatch = this_netmatch;
                                best_npeers = iface->ksni_npeers;
                        }

                        LASSERT(best_iface != NULL);

                        best_iface->ksni_npeers++;
                        ip = best_iface->ksni_ipaddr;
                        peer_ni->ksnp_passive_ips[i] = ip;
                        peer_ni->ksnp_n_passive_ips = i+1;
                }

                /* mark the best matching peer_ni IP used */
                j = ksocknal_match_peerip(best_iface, peerips, n_peerips);
                peerips[j] = 0;
        }

        /* Overwrite input peer_ni IP addresses */
        memcpy(peerips, peer_ni->ksnp_passive_ips, n_ips * sizeof(*peerips));

        write_unlock_bh(global_lock);

        return (n_ips);
}
842
/*
 * Create additional routes to @peer_ni, one per advertised peer IP address
 * (@peer_ipaddrs/@npeer_ipaddrs, port @port), pairing each with the local
 * interface that best matches it.  No-op unless this net has > 1 interface.
 *
 * NOTE(review): the global lock is dropped and retaken around route
 * allocation, so peer_ni state is revalidated (ksnp_closing) each pass.
 */
static void
ksocknal_create_routes(struct ksock_peer_ni *peer_ni, int port,
                       __u32 *peer_ipaddrs, int npeer_ipaddrs)
{
        struct ksock_route              *newroute = NULL;
        rwlock_t                *global_lock = &ksocknal_data.ksnd_global_lock;
        struct lnet_ni *ni = peer_ni->ksnp_ni;
        struct ksock_net                *net = ni->ni_data;
        struct list_head        *rtmp;
        struct ksock_route              *route;
        struct ksock_interface  *iface;
        struct ksock_interface  *best_iface;
        int                     best_netmatch;
        int                     this_netmatch;
        int                     best_nroutes;
        int                     i;
        int                     j;

        /* CAVEAT EMPTOR: We do all our interface matching with an
         * exclusive hold of global lock at IRQ priority.  We're only
         * expecting to be dealing with small numbers of interfaces, so the
         * O(n**3)-ness here shouldn't matter */

        write_lock_bh(global_lock);

        if (net->ksnn_ninterfaces < 2) {
                /* Only create additional connections
                 * if I have > 1 interface */
                write_unlock_bh(global_lock);
                return;
        }

        LASSERT(npeer_ipaddrs <= LNET_INTERFACES_NUM);

        for (i = 0; i < npeer_ipaddrs; i++) {
                if (newroute != NULL) {
                        /* reuse the route left over from the previous pass */
                        newroute->ksnr_ipaddr = peer_ipaddrs[i];
                } else {
                        /* allocation may block: drop the lock around it */
                        write_unlock_bh(global_lock);

                        newroute = ksocknal_create_route(peer_ipaddrs[i], port);
                        if (newroute == NULL)
                                return;

                        write_lock_bh(global_lock);
                }

                if (peer_ni->ksnp_closing) {
                        /* peer_ni got closed under me */
                        break;
                }

                /* Already got a route? */
                route = NULL;
                list_for_each(rtmp, &peer_ni->ksnp_routes) {
                        route = list_entry(rtmp, struct ksock_route, ksnr_list);

                        if (route->ksnr_ipaddr == newroute->ksnr_ipaddr)
                                break;

                        route = NULL;
                }
                if (route != NULL)
                        continue;

                best_iface = NULL;
                best_nroutes = 0;
                best_netmatch = 0;

                LASSERT(net->ksnn_ninterfaces <= LNET_INTERFACES_NUM);

                /* Select interface to connect from */
                for (j = 0; j < net->ksnn_ninterfaces; j++) {
                        iface = &net->ksnn_interfaces[j];

                        /* Using this interface already? */
                        list_for_each(rtmp, &peer_ni->ksnp_routes) {
                                route = list_entry(rtmp, struct ksock_route,
                                                   ksnr_list);

                                if (route->ksnr_myipaddr == iface->ksni_ipaddr)
                                        break;

                                route = NULL;
                        }
                        if (route != NULL)
                                continue;

                        /* 1 if this interface is on the same subnet as the
                         * peer IP, else 0 */
                        this_netmatch = (((iface->ksni_ipaddr ^
                                           newroute->ksnr_ipaddr) &
                                           iface->ksni_netmask) == 0) ? 1 : 0;

                        /* prefer subnet matches; break ties by fewest
                         * existing routes on the interface */
                        if (!(best_iface == NULL ||
                              best_netmatch < this_netmatch ||
                              (best_netmatch == this_netmatch &&
                               best_nroutes > iface->ksni_nroutes)))
                                continue;

                        best_iface = iface;
                        best_netmatch = this_netmatch;
                        best_nroutes = iface->ksni_nroutes;
                }

                if (best_iface == NULL)
                        continue;

                newroute->ksnr_myipaddr = best_iface->ksni_ipaddr;
                best_iface->ksni_nroutes++;

                ksocknal_add_route_locked(peer_ni, newroute);
                newroute = NULL;        /* ownership passed to the peer_ni */
        }

        write_unlock_bh(global_lock);
        if (newroute != NULL)
                ksocknal_route_decref(newroute);
}
960
961 int
962 ksocknal_accept(struct lnet_ni *ni, struct socket *sock)
963 {
964         struct ksock_connreq *cr;
965         int rc;
966         u32 peer_ip;
967         int peer_port;
968
969         rc = lnet_sock_getaddr(sock, true, &peer_ip, &peer_port);
970         LASSERT(rc == 0);               /* we succeeded before */
971
972         LIBCFS_ALLOC(cr, sizeof(*cr));
973         if (cr == NULL) {
974                 LCONSOLE_ERROR_MSG(0x12f, "Dropping connection request from "
975                                    "%pI4h: memory exhausted\n", &peer_ip);
976                 return -ENOMEM;
977         }
978
979         lnet_ni_addref(ni);
980         cr->ksncr_ni   = ni;
981         cr->ksncr_sock = sock;
982
983         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
984
985         list_add_tail(&cr->ksncr_list, &ksocknal_data.ksnd_connd_connreqs);
986         wake_up(&ksocknal_data.ksnd_connd_waitq);
987
988         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
989         return 0;
990 }
991
992 static int
993 ksocknal_connecting(struct ksock_peer_ni *peer_ni, __u32 ipaddr)
994 {
995         struct ksock_route *route;
996
997         list_for_each_entry(route, &peer_ni->ksnp_routes, ksnr_list) {
998                 if (route->ksnr_ipaddr == ipaddr)
999                         return route->ksnr_connecting;
1000         }
1001         return 0;
1002 }
1003
/*
 * Establish a new connection over @sock.  When @route is non-NULL this is
 * an active (outgoing) connection created for that route; otherwise it is
 * a passive (incoming) connection and the peer is identified from the
 * HELLO exchange.  Returns 0 on success or a negative errno; @sock is
 * always consumed (released on the failed_0 path).
 *
 * NOTE(review): rc deliberately takes POSITIVE values (EALREADY, EPROTO,
 * ESTALE) on the failed_2 path to mark "soft" failures; they are
 * normalized to negative before ksocknal_txlist_done().
 */
int
ksocknal_create_conn(struct lnet_ni *ni, struct ksock_route *route,
                     struct socket *sock, int type)
{
        rwlock_t *global_lock = &ksocknal_data.ksnd_global_lock;
        LIST_HEAD(zombies);
        struct lnet_process_id peerid;
        struct list_head *tmp;
        u64 incarnation;
        struct ksock_conn *conn;
        struct ksock_conn *conn2;
        struct ksock_peer_ni *peer_ni = NULL;
        struct ksock_peer_ni *peer2;
        struct ksock_sched *sched;
        struct ksock_hello_msg *hello;
        int cpt;
        struct ksock_tx *tx;
        struct ksock_tx *txtmp;
        int rc;
        int rc2;
        int active;
        char *warn = NULL;

        active = (route != NULL);

        LASSERT (active == (type != SOCKLND_CONN_NONE));

        LIBCFS_ALLOC(conn, sizeof(*conn));
        if (conn == NULL) {
                rc = -ENOMEM;
                goto failed_0;
        }

        conn->ksnc_peer = NULL;
        conn->ksnc_route = NULL;
        conn->ksnc_sock = sock;
        /* 2 ref, 1 for conn, another extra ref prevents socket
         * being closed before establishment of connection */
        atomic_set (&conn->ksnc_sock_refcount, 2);
        conn->ksnc_type = type;
        ksocknal_lib_save_callback(sock, conn);
        atomic_set (&conn->ksnc_conn_refcount, 1); /* 1 ref for me */

        conn->ksnc_rx_ready = 0;
        conn->ksnc_rx_scheduled = 0;

        INIT_LIST_HEAD(&conn->ksnc_tx_queue);
        conn->ksnc_tx_ready = 0;
        conn->ksnc_tx_scheduled = 0;
        conn->ksnc_tx_carrier = NULL;
        atomic_set (&conn->ksnc_tx_nob, 0);

        LIBCFS_ALLOC(hello, offsetof(struct ksock_hello_msg,
                                     kshm_ips[LNET_INTERFACES_NUM]));
        if (hello == NULL) {
                rc = -ENOMEM;
                goto failed_1;
        }

        /* stash conn's local and remote addrs */
        rc = ksocknal_lib_get_conn_addrs (conn);
        if (rc != 0)
                goto failed_1;

        /* Find out/confirm peer_ni's NID and connection type and get the
         * vector of interfaces she's willing to let me connect to.
         * Passive connections use the listener timeout since the peer_ni sends
         * eagerly */

        if (active) {
                peer_ni = route->ksnr_peer;
                LASSERT(ni == peer_ni->ksnp_ni);

                /* Active connection sends HELLO eagerly */
                hello->kshm_nips = ksocknal_local_ipvec(ni, hello->kshm_ips);
                peerid = peer_ni->ksnp_id;

                write_lock_bh(global_lock);
                conn->ksnc_proto = peer_ni->ksnp_proto;
                write_unlock_bh(global_lock);

                if (conn->ksnc_proto == NULL) {
                        /* no protocol negotiated yet: start from the newest */
                         conn->ksnc_proto = &ksocknal_protocol_v3x;
#if SOCKNAL_VERSION_DEBUG
                         if (*ksocknal_tunables.ksnd_protocol == 2)
                                 conn->ksnc_proto = &ksocknal_protocol_v2x;
                         else if (*ksocknal_tunables.ksnd_protocol == 1)
                                 conn->ksnc_proto = &ksocknal_protocol_v1x;
#endif
                }

                rc = ksocknal_send_hello (ni, conn, peerid.nid, hello);
                if (rc != 0)
                        goto failed_1;
        } else {
                peerid.nid = LNET_NID_ANY;
                peerid.pid = LNET_PID_ANY;

                /* Passive, get protocol from peer_ni */
                conn->ksnc_proto = NULL;
        }

        /* may return a positive EPROTO/EALREADY-style value (see LASSERT
         * below) when the peer asks for a different protocol */
        rc = ksocknal_recv_hello (ni, conn, hello, &peerid, &incarnation);
        if (rc < 0)
                goto failed_1;

        LASSERT (rc == 0 || active);
        LASSERT (conn->ksnc_proto != NULL);
        LASSERT (peerid.nid != LNET_NID_ANY);

        cpt = lnet_cpt_of_nid(peerid.nid, ni);

        if (active) {
                ksocknal_peer_addref(peer_ni);
                write_lock_bh(global_lock);
        } else {
                /* passive: create (or find) the peer_ni for this NID */
                peer_ni = ksocknal_create_peer(ni, peerid);
                if (IS_ERR(peer_ni)) {
                        rc = PTR_ERR(peer_ni);
                        goto failed_1;
                }

                write_lock_bh(global_lock);

                /* called with a ref on ni, so shutdown can't have started */
                LASSERT(atomic_read(&((struct ksock_net *)ni->ni_data)->ksnn_npeers) >= 0);

                peer2 = ksocknal_find_peer_locked(ni, peerid);
                if (peer2 == NULL) {
                        /* NB this puts an "empty" peer_ni in the peer_ni
                         * table (which takes my ref) */
                        list_add_tail(&peer_ni->ksnp_list,
                                      ksocknal_nid2peerlist(peerid.nid));
                } else {
                        ksocknal_peer_decref(peer_ni);
                        peer_ni = peer2;
                }

                /* +1 ref for me */
                ksocknal_peer_addref(peer_ni);
                peer_ni->ksnp_accepting++;

                /* Am I already connecting to this guy?  Resolve in
                 * favour of higher NID... */
                if (peerid.nid < ni->ni_nid &&
                    ksocknal_connecting(peer_ni, conn->ksnc_ipaddr)) {
                        rc = EALREADY;
                        warn = "connection race resolution";
                        goto failed_2;
                }
        }

        if (peer_ni->ksnp_closing ||
            (active && route->ksnr_deleted)) {
                /* peer_ni/route got closed under me */
                rc = -ESTALE;
                warn = "peer_ni/route removed";
                goto failed_2;
        }

        if (peer_ni->ksnp_proto == NULL) {
                /* Never connected before.
                 * NB recv_hello may have returned EPROTO to signal my peer_ni
                 * wants a different protocol than the one I asked for.
                 */
                LASSERT(list_empty(&peer_ni->ksnp_conns));

                peer_ni->ksnp_proto = conn->ksnc_proto;
                peer_ni->ksnp_incarnation = incarnation;
        }

        if (peer_ni->ksnp_proto != conn->ksnc_proto ||
            peer_ni->ksnp_incarnation != incarnation) {
                /* peer_ni rebooted or I've got the wrong protocol version */
                ksocknal_close_peer_conns_locked(peer_ni, 0, 0);

                peer_ni->ksnp_proto = NULL;
                rc = ESTALE;
                warn = peer_ni->ksnp_incarnation != incarnation ?
                       "peer_ni rebooted" :
                       "wrong proto version";
                goto failed_2;
        }

        switch (rc) {
        default:
                LBUG();
        case 0:
                break;
        case EALREADY:
                warn = "lost conn race";
                goto failed_2;
        case EPROTO:
                warn = "retry with different protocol version";
                goto failed_2;
        }

        /* Refuse to duplicate an existing connection, unless this is a
         * loopback connection */
        if (conn->ksnc_ipaddr != conn->ksnc_myipaddr) {
                list_for_each(tmp, &peer_ni->ksnp_conns) {
                        conn2 = list_entry(tmp, struct ksock_conn, ksnc_list);

                        if (conn2->ksnc_ipaddr != conn->ksnc_ipaddr ||
                            conn2->ksnc_myipaddr != conn->ksnc_myipaddr ||
                            conn2->ksnc_type != conn->ksnc_type)
                                continue;

                        /* Reply on a passive connection attempt so the peer_ni
                         * realises we're connected. */
                        LASSERT (rc == 0);
                        if (!active)
                                rc = EALREADY;

                        warn = "duplicate";
                        goto failed_2;
                }
        }

        /* If the connection created by this route didn't bind to the IP
         * address the route connected to, the connection/route matching
         * code below probably isn't going to work. */
        if (active &&
            route->ksnr_ipaddr != conn->ksnc_ipaddr) {
                CERROR("Route %s %pI4h connected to %pI4h\n",
                       libcfs_id2str(peer_ni->ksnp_id),
                       &route->ksnr_ipaddr,
                       &conn->ksnc_ipaddr);
        }

        /* Search for a route corresponding to the new connection and
         * create an association.  This allows incoming connections created
         * by routes in my peer_ni to match my own route entries so I don't
         * continually create duplicate routes. */
        list_for_each(tmp, &peer_ni->ksnp_routes) {
                route = list_entry(tmp, struct ksock_route, ksnr_list);

                if (route->ksnr_ipaddr != conn->ksnc_ipaddr)
                        continue;

                ksocknal_associate_route_conn_locked(route, conn);
                break;
        }

        conn->ksnc_peer = peer_ni;                 /* conn takes my ref on peer_ni */
        peer_ni->ksnp_last_alive = ktime_get_seconds();
        peer_ni->ksnp_send_keepalive = 0;
        peer_ni->ksnp_error = 0;

        sched = ksocknal_choose_scheduler_locked(cpt);
        if (!sched) {
                CERROR("no schedulers available. node is unhealthy\n");
                goto failed_2;
        }
        /*
         * The cpt might have changed if we ended up selecting a non cpt
         * native scheduler. So use the scheduler's cpt instead.
         */
        cpt = sched->kss_cpt;
        sched->kss_nconns++;
        conn->ksnc_scheduler = sched;

        conn->ksnc_tx_last_post = ktime_get_seconds();
        /* Set the deadline for the outgoing HELLO to drain */
        conn->ksnc_tx_bufnob = sock->sk->sk_wmem_queued;
        conn->ksnc_tx_deadline = ktime_get_seconds() +
                                 lnet_get_lnd_timeout();
        smp_mb();   /* order with adding to peer_ni's conn list */

        list_add(&conn->ksnc_list, &peer_ni->ksnp_conns);
        ksocknal_conn_addref(conn);

        ksocknal_new_packet(conn, 0);

        conn->ksnc_zc_capable = ksocknal_lib_zc_capable(conn);

        /* Take packets blocking for this connection. */
        list_for_each_entry_safe(tx, txtmp, &peer_ni->ksnp_tx_queue, tx_list) {
                if (conn->ksnc_proto->pro_match_tx(conn, tx, tx->tx_nonblk) ==
                    SOCKNAL_MATCH_NO)
                        continue;

                list_del(&tx->tx_list);
                ksocknal_queue_tx_locked(tx, conn);
        }

        write_unlock_bh(global_lock);

        /* We've now got a new connection.  Any errors from here on are just
         * like "normal" comms errors and we close the connection normally.
         * NB (a) we still have to send the reply HELLO for passive
         *        connections,
         *    (b) normal I/O on the conn is blocked until I setup and call the
         *        socket callbacks.
         */

        CDEBUG(D_NET, "New conn %s p %d.x %pI4h -> %pI4h/%d"
               " incarnation:%lld sched[%d]\n",
               libcfs_id2str(peerid), conn->ksnc_proto->pro_version,
               &conn->ksnc_myipaddr, &conn->ksnc_ipaddr,
               conn->ksnc_port, incarnation, cpt);

        if (active) {
                /* additional routes after interface exchange? */
                ksocknal_create_routes(peer_ni, conn->ksnc_port,
                                       hello->kshm_ips, hello->kshm_nips);
        } else {
                hello->kshm_nips = ksocknal_select_ips(peer_ni, hello->kshm_ips,
                                                       hello->kshm_nips);
                rc = ksocknal_send_hello(ni, conn, peerid.nid, hello);
        }

        LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
                                    kshm_ips[LNET_INTERFACES_NUM]));

        /* setup the socket AFTER I've received hello (it disables
         * SO_LINGER).  I might call back to the acceptor who may want
         * to send a protocol version response and then close the
         * socket; this ensures the socket only tears down after the
         * response has been sent. */
        if (rc == 0)
                rc = ksocknal_lib_setup_sock(sock);

        write_lock_bh(global_lock);

        /* NB my callbacks block while I hold ksnd_global_lock */
        ksocknal_lib_set_callback(sock, conn);

        if (!active)
                peer_ni->ksnp_accepting--;

        write_unlock_bh(global_lock);

        if (rc != 0) {
                write_lock_bh(global_lock);
                if (!conn->ksnc_closing) {
                        /* could be closed by another thread */
                        ksocknal_close_conn_locked(conn, rc);
                }
                write_unlock_bh(global_lock);
        } else if (ksocknal_connsock_addref(conn) == 0) {
                /* Allow I/O to proceed. */
                ksocknal_read_callback(conn);
                ksocknal_write_callback(conn);
                ksocknal_connsock_decref(conn);
        }

        /* drop the extra sock ref taken at allocation time */
        ksocknal_connsock_decref(conn);
        ksocknal_conn_decref(conn);
        return rc;

failed_2:
        /* drops global_lock; may unlink a now-empty peer_ni and reap its
         * blocked TXs into @zombies */
        if (!peer_ni->ksnp_closing &&
            list_empty(&peer_ni->ksnp_conns) &&
            list_empty(&peer_ni->ksnp_routes)) {
                list_add(&zombies, &peer_ni->ksnp_tx_queue);
                list_del_init(&peer_ni->ksnp_tx_queue);
                ksocknal_unlink_peer_locked(peer_ni);
        }

        write_unlock_bh(global_lock);

        if (warn != NULL) {
                if (rc < 0)
                        CERROR("Not creating conn %s type %d: %s\n",
                               libcfs_id2str(peerid), conn->ksnc_type, warn);
                else
                        CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",
                              libcfs_id2str(peerid), conn->ksnc_type, warn);
        }

        if (!active) {
                if (rc > 0) {
                        /* Request retry by replying with CONN_NONE
                         * ksnc_proto has been set already */
                        conn->ksnc_type = SOCKLND_CONN_NONE;
                        hello->kshm_nips = 0;
                        ksocknal_send_hello(ni, conn, peerid.nid, hello);
                }

                write_lock_bh(global_lock);
                peer_ni->ksnp_accepting--;
                write_unlock_bh(global_lock);
        }

        /*
         * If we get here without an error code, just use -EALREADY.
         * Depending on how we got here, the error may be positive
         * or negative. Normalize the value for ksocknal_txlist_done().
         */
        rc2 = (rc == 0 ? -EALREADY : (rc > 0 ? -rc : rc));
        ksocknal_txlist_done(ni, &zombies, rc2);
        ksocknal_peer_decref(peer_ni);

failed_1:
        if (hello != NULL)
                LIBCFS_FREE(hello, offsetof(struct ksock_hello_msg,
                                            kshm_ips[LNET_INTERFACES_NUM]));

        LIBCFS_FREE(conn, sizeof(*conn));

failed_0:
        sock_release(sock);
        return rc;
}
1409
/*
 * Start closing @conn with close reason @error: detach it from its route,
 * deal with the peer_ni's blocked TX queue if this was its last
 * connection, then hand the conn to the reaper.
 */
void
ksocknal_close_conn_locked(struct ksock_conn *conn, int error)
{
        /* This just does the immmediate housekeeping, and queues the
         * connection for the reaper to terminate.
         * Caller holds ksnd_global_lock exclusively in irq context */
        struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
        struct ksock_route *route;
        struct ksock_conn *conn2;
        struct list_head *tmp;

        LASSERT(peer_ni->ksnp_error == 0);
        LASSERT(!conn->ksnc_closing);
        conn->ksnc_closing = 1;

        /* ksnd_deathrow_conns takes over peer_ni's ref */
        list_del(&conn->ksnc_list);

        route = conn->ksnc_route;
        if (route != NULL) {
                /* dissociate conn from route... */
                LASSERT(!route->ksnr_deleted);
                LASSERT((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);

                /* only clear the route's connected bit if no other conn of
                 * the same type still uses this route */
                conn2 = NULL;
                list_for_each(tmp, &peer_ni->ksnp_conns) {
                        conn2 = list_entry(tmp, struct ksock_conn, ksnc_list);

                        if (conn2->ksnc_route == route &&
                            conn2->ksnc_type == conn->ksnc_type)
                                break;

                        conn2 = NULL;
                }
                if (conn2 == NULL)
                        route->ksnr_connected &= ~(1 << conn->ksnc_type);

                conn->ksnc_route = NULL;

                ksocknal_route_decref(route);   /* drop conn's ref on route */
        }

        if (list_empty(&peer_ni->ksnp_conns)) {
                /* No more connections to this peer_ni */

                if (!list_empty(&peer_ni->ksnp_tx_queue)) {
                        struct ksock_tx *tx;

                        LASSERT(conn->ksnc_proto == &ksocknal_protocol_v3x);

                        /* throw them to the last connection...,
                         * these TXs will be send to /dev/null by scheduler */
                        list_for_each_entry(tx, &peer_ni->ksnp_tx_queue,
                                            tx_list)
                                ksocknal_tx_prep(conn, tx);

                        spin_lock_bh(&conn->ksnc_scheduler->kss_lock);
                        list_splice_init(&peer_ni->ksnp_tx_queue,
                                         &conn->ksnc_tx_queue);
                        spin_unlock_bh(&conn->ksnc_scheduler->kss_lock);
                }

                /* renegotiate protocol version */
                peer_ni->ksnp_proto = NULL;
                /* stash last conn close reason */
                peer_ni->ksnp_error = error;

                if (list_empty(&peer_ni->ksnp_routes)) {
                        /* I've just closed last conn belonging to a
                         * peer_ni with no routes to it */
                        ksocknal_unlink_peer_locked(peer_ni);
                }
        }

        spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);

        /* let the reaper thread terminate and destroy the conn */
        list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
        wake_up(&ksocknal_data.ksnd_reaper_waitq);

        spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
}
1491
1492 void
1493 ksocknal_peer_failed(struct ksock_peer_ni *peer_ni)
1494 {
1495         int notify = 0;
1496         time64_t last_alive = 0;
1497
1498         /* There has been a connection failure or comms error; but I'll only
1499          * tell LNET I think the peer_ni is dead if it's to another kernel and
1500          * there are no connections or connection attempts in existence. */
1501
1502         read_lock(&ksocknal_data.ksnd_global_lock);
1503
1504         if ((peer_ni->ksnp_id.pid & LNET_PID_USERFLAG) == 0 &&
1505              list_empty(&peer_ni->ksnp_conns) &&
1506              peer_ni->ksnp_accepting == 0 &&
1507              ksocknal_find_connecting_route_locked(peer_ni) == NULL) {
1508                 notify = 1;
1509                 last_alive = peer_ni->ksnp_last_alive;
1510         }
1511
1512         read_unlock(&ksocknal_data.ksnd_global_lock);
1513
1514         if (notify)
1515                 lnet_notify(peer_ni->ksnp_ni, peer_ni->ksnp_id.nid,
1516                             false, false, last_alive);
1517 }
1518
1519 void
1520 ksocknal_finalize_zcreq(struct ksock_conn *conn)
1521 {
1522         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1523         struct ksock_tx *tx;
1524         struct ksock_tx *tmp;
1525         LIST_HEAD(zlist);
1526
1527         /* NB safe to finalize TXs because closing of socket will
1528          * abort all buffered data */
1529         LASSERT(conn->ksnc_sock == NULL);
1530
1531         spin_lock(&peer_ni->ksnp_lock);
1532
1533         list_for_each_entry_safe(tx, tmp, &peer_ni->ksnp_zc_req_list, tx_zc_list) {
1534                 if (tx->tx_conn != conn)
1535                         continue;
1536
1537                 LASSERT(tx->tx_msg.ksm_zc_cookies[0] != 0);
1538
1539                 tx->tx_msg.ksm_zc_cookies[0] = 0;
1540                 tx->tx_zc_aborted = 1;  /* mark it as not-acked */
1541                 list_move(&tx->tx_zc_list, &zlist);
1542         }
1543
1544         spin_unlock(&peer_ni->ksnp_lock);
1545
1546         while (!list_empty(&zlist)) {
1547                 tx = list_entry(zlist.next, struct ksock_tx, tx_zc_list);
1548
1549                 list_del(&tx->tx_zc_list);
1550                 ksocknal_tx_decref(tx);
1551         }
1552 }
1553
/*
 * Disengage @conn from its socket callbacks and scheduler and drop the
 * final socket reference so the socket closes.  Called by the reaper in
 * thread context for a conn already marked closing.
 */
void
ksocknal_terminate_conn(struct ksock_conn *conn)
{
        /* This gets called by the reaper (guaranteed thread context) to
         * disengage the socket from its callbacks and close it.
         * ksnc_refcount will eventually hit zero, and then the reaper will
         * destroy it. */
        struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
        struct ksock_sched *sched = conn->ksnc_scheduler;
        int failed = 0;

        LASSERT(conn->ksnc_closing);

        /* wake up the scheduler to "send" all remaining packets to /dev/null */
        spin_lock_bh(&sched->kss_lock);

        /* a closing conn is always ready to tx */
        conn->ksnc_tx_ready = 1;

        if (!conn->ksnc_tx_scheduled &&
            !list_empty(&conn->ksnc_tx_queue)) {
                list_add_tail(&conn->ksnc_tx_list,
                               &sched->kss_tx_conns);
                conn->ksnc_tx_scheduled = 1;
                /* extra ref for scheduler */
                ksocknal_conn_addref(conn);

                wake_up (&sched->kss_waitq);
        }

        spin_unlock_bh(&sched->kss_lock);

        /* serialise with callbacks */
        write_lock_bh(&ksocknal_data.ksnd_global_lock);

        ksocknal_lib_reset_callback(conn->ksnc_sock, conn);

        /* OK, so this conn may not be completely disengaged from its
         * scheduler yet, but it _has_ committed to terminate... */
        conn->ksnc_scheduler->kss_nconns--;

        if (peer_ni->ksnp_error != 0) {
                /* peer_ni's last conn closed in error */
                LASSERT(list_empty(&peer_ni->ksnp_conns));
                failed = 1;
                peer_ni->ksnp_error = 0;     /* avoid multiple notifications */
        }

        write_unlock_bh(&ksocknal_data.ksnd_global_lock);

        /* notify LNet outside the global lock */
        if (failed)
                ksocknal_peer_failed(peer_ni);

        /* The socket is closed on the final put; either here, or in
         * ksocknal_{send,recv}msg().  Since we set up the linger2 option
         * when the connection was established, this will close the socket
         * immediately, aborting anything buffered in it. Any hung
         * zero-copy transmits will therefore complete in finite time. */
        ksocknal_connsock_decref(conn);
}
1614
/*
 * Queue a fully dereferenced conn on ksnd_zombie_conns and wake the
 * reaper, which performs the final destruction.
 */
void
ksocknal_queue_zombie_conn(struct ksock_conn *conn)
{
        /* Queue the conn for the reaper to destroy */
        LASSERT(atomic_read(&conn->ksnc_conn_refcount) == 0);
        spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);

        /* ksnc_list is free for reuse: the conn is off every other list
         * by the time its refcount hits zero */
        list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
        wake_up(&ksocknal_data.ksnd_reaper_waitq);

        spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
}
1627
/* Final teardown of a connection, run by the reaper after the conn has
 * been fully disengaged (no refs, no socket, no scheduling, empty tx
 * queue).  Completes any receive that was in flight with an error, drops
 * the peer_ni ref the conn held, and frees the conn itself. */
void
ksocknal_destroy_conn(struct ksock_conn *conn)
{
        time64_t last_rcv;

        /* Final coup-de-grace of the reaper */
        CDEBUG (D_NET, "connection %p\n", conn);

        LASSERT (atomic_read (&conn->ksnc_conn_refcount) == 0);
        LASSERT (atomic_read (&conn->ksnc_sock_refcount) == 0);
        LASSERT (conn->ksnc_sock == NULL);
        LASSERT (conn->ksnc_route == NULL);
        LASSERT (!conn->ksnc_tx_scheduled);
        LASSERT (!conn->ksnc_rx_scheduled);
        LASSERT(list_empty(&conn->ksnc_tx_queue));

        /* complete current receive if any */
        switch (conn->ksnc_rx_state) {
        case SOCKNAL_RX_LNET_PAYLOAD:
                /* rx deadline was set one LND timeout into the future when
                 * data last arrived, so deadline - timeout = last rx time */
                last_rcv = conn->ksnc_rx_deadline -
                           lnet_get_lnd_timeout();
                CERROR("Completing partial receive from %s[%d], "
                       "ip %pI4h:%d, with error, wanted: %d, left: %d, "
                       "last alive is %lld secs ago\n",
                       libcfs_id2str(conn->ksnc_peer->ksnp_id), conn->ksnc_type,
                       &conn->ksnc_ipaddr, conn->ksnc_port,
                       conn->ksnc_rx_nob_wanted, conn->ksnc_rx_nob_left,
                       ktime_get_seconds() - last_rcv);
                if (conn->ksnc_lnet_msg)
                        conn->ksnc_lnet_msg->msg_health_status =
                                LNET_MSG_STATUS_REMOTE_ERROR;
                /* NOTE(review): lnet_finalize() is called even when
                 * ksnc_lnet_msg is NULL (the guard above only covers the
                 * health-status store) — assumes lnet_finalize() tolerates
                 * a NULL msg; confirm against its implementation. */
                lnet_finalize(conn->ksnc_lnet_msg, -EIO);
                break;
        case SOCKNAL_RX_LNET_HEADER:
                /* only complain if some bytes of the header had arrived */
                if (conn->ksnc_rx_started)
                        CERROR("Incomplete receive of lnet header from %s, "
                               "ip %pI4h:%d, with error, protocol: %d.x.\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               &conn->ksnc_ipaddr, conn->ksnc_port,
                               conn->ksnc_proto->pro_version);
                break;
        case SOCKNAL_RX_KSM_HEADER:
                if (conn->ksnc_rx_started)
                        CERROR("Incomplete receive of ksock message from %s, "
                               "ip %pI4h:%d, with error, protocol: %d.x.\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               &conn->ksnc_ipaddr, conn->ksnc_port,
                               conn->ksnc_proto->pro_version);
                break;
        case SOCKNAL_RX_SLOP:
                /* discarding bytes that follow a message being skipped */
                if (conn->ksnc_rx_started)
                        CERROR("Incomplete receive of slops from %s, "
                               "ip %pI4h:%d, with error\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               &conn->ksnc_ipaddr, conn->ksnc_port);
               break;
        default:
                /* any other rx state here indicates driver corruption */
                LBUG ();
                break;
        }

        ksocknal_peer_decref(conn->ksnc_peer);

        LIBCFS_FREE (conn, sizeof (*conn));
}
1693
1694 int
1695 ksocknal_close_peer_conns_locked(struct ksock_peer_ni *peer_ni, __u32 ipaddr, int why)
1696 {
1697         struct ksock_conn *conn;
1698         struct list_head *ctmp;
1699         struct list_head *cnxt;
1700         int count = 0;
1701
1702         list_for_each_safe(ctmp, cnxt, &peer_ni->ksnp_conns) {
1703                 conn = list_entry(ctmp, struct ksock_conn, ksnc_list);
1704
1705                 if (ipaddr == 0 ||
1706                     conn->ksnc_ipaddr == ipaddr) {
1707                         count++;
1708                         ksocknal_close_conn_locked (conn, why);
1709                 }
1710         }
1711
1712         return (count);
1713 }
1714
1715 int
1716 ksocknal_close_conn_and_siblings(struct ksock_conn *conn, int why)
1717 {
1718         struct ksock_peer_ni *peer_ni = conn->ksnc_peer;
1719         u32 ipaddr = conn->ksnc_ipaddr;
1720         int count;
1721
1722         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1723
1724         count = ksocknal_close_peer_conns_locked (peer_ni, ipaddr, why);
1725
1726         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1727
1728         return (count);
1729 }
1730
1731 int
1732 ksocknal_close_matching_conns(struct lnet_process_id id, __u32 ipaddr)
1733 {
1734         struct ksock_peer_ni *peer_ni;
1735         struct list_head *ptmp;
1736         struct list_head *pnxt;
1737         int lo;
1738         int hi;
1739         int i;
1740         int count = 0;
1741
1742         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1743
1744         if (id.nid != LNET_NID_ANY)
1745                 lo = hi = (int)(ksocknal_nid2peerlist(id.nid) - ksocknal_data.ksnd_peers);
1746         else {
1747                 lo = 0;
1748                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1749         }
1750
1751         for (i = lo; i <= hi; i++) {
1752                 list_for_each_safe(ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1753
1754                         peer_ni = list_entry(ptmp, struct ksock_peer_ni, ksnp_list);
1755
1756                         if (!((id.nid == LNET_NID_ANY || id.nid == peer_ni->ksnp_id.nid) &&
1757                               (id.pid == LNET_PID_ANY || id.pid == peer_ni->ksnp_id.pid)))
1758                                 continue;
1759
1760                         count += ksocknal_close_peer_conns_locked (peer_ni, ipaddr, 0);
1761                 }
1762         }
1763
1764         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1765
1766         /* wildcards always succeed */
1767         if (id.nid == LNET_NID_ANY || id.pid == LNET_PID_ANY || ipaddr == 0)
1768                 return (0);
1769
1770         return (count == 0 ? -ENOENT : 0);
1771 }
1772
1773 void
1774 ksocknal_notify_gw_down(lnet_nid_t gw_nid)
1775 {
1776         /* The router is telling me she's been notified of a change in
1777          * gateway state....
1778          */
1779         struct lnet_process_id id = {
1780                 .nid    = gw_nid,
1781                 .pid    = LNET_PID_ANY,
1782         };
1783
1784         CDEBUG(D_NET, "gw %s down\n", libcfs_nid2str(gw_nid));
1785
1786         /* If the gateway crashed, close all open connections... */
1787         ksocknal_close_matching_conns(id, 0);
1788         return;
1789
1790         /* We can only establish new connections
1791          * if we have autroutes, and these connect on demand. */
1792 }
1793
/* Report via *when the time this peer_ni was last known alive, and kick
 * off connection establishment if a connectable route exists.
 * Presumably the LND "query" hook invoked by LNet's peer aliveness
 * machinery — confirm against the lnd_query registration. */
void
ksocknal_query(struct lnet_ni *ni, lnet_nid_t nid, time64_t *when)
{
        int connect = 1;
        time64_t last_alive = 0;
        time64_t now = ktime_get_seconds();
        struct ksock_peer_ni *peer_ni = NULL;
        rwlock_t *glock = &ksocknal_data.ksnd_global_lock;
        struct lnet_process_id id = {
                .nid = nid,
                .pid = LNET_PID_LUSTRE,
        };

        read_lock(glock);

        peer_ni = ksocknal_find_peer_locked(ni, id);
        if (peer_ni != NULL) {
                struct list_head *tmp;
                struct ksock_conn *conn;
                int bufnob;

                /* a drop in the socket's queued-write byte count since the
                 * last look means the peer has ACKed data, i.e. it is alive
                 * now.  NOTE(review): ksnc_tx_deadline/ksnc_tx_bufnob are
                 * updated here under only the read lock — confirm that is
                 * safe against concurrent writers of those fields. */
                list_for_each(tmp, &peer_ni->ksnp_conns) {
                        conn = list_entry(tmp, struct ksock_conn, ksnc_list);
                        bufnob = conn->ksnc_sock->sk->sk_wmem_queued;

                        if (bufnob < conn->ksnc_tx_bufnob) {
                                /* something got ACKed */
                                conn->ksnc_tx_deadline = ktime_get_seconds() +
                                                         lnet_get_lnd_timeout();
                                peer_ni->ksnp_last_alive = now;
                                conn->ksnc_tx_bufnob = bufnob;
                        }
                }

                last_alive = peer_ni->ksnp_last_alive;
                if (ksocknal_find_connectable_route_locked(peer_ni) == NULL)
                        connect = 0;
        }

        read_unlock(glock);

        /* only overwrite *when if we actually learned something */
        if (last_alive != 0)
                *when = last_alive;

        CDEBUG(D_NET, "peer_ni %s %p, alive %lld secs ago, connect %d\n",
               libcfs_nid2str(nid), peer_ni,
               last_alive ? now - last_alive : -1,
               connect);

        if (!connect)
                return;

        /* NOTE(review): ksocknal_add_peer()'s return code is ignored here;
         * on failure the relaunch below is simply a no-op.  Confirm this
         * best-effort behaviour is intended. */
        ksocknal_add_peer(ni, id, LNET_NIDADDR(nid), lnet_acceptor_port());

        write_lock_bh(glock);

        /* re-find under the write lock: the peer_ni may have appeared,
         * changed or gone since the read-locked section above */
        peer_ni = ksocknal_find_peer_locked(ni, id);
        if (peer_ni != NULL)
                ksocknal_launch_all_connections_locked(peer_ni);

        write_unlock_bh(glock);
}
1856
1857 static void
1858 ksocknal_push_peer(struct ksock_peer_ni *peer_ni)
1859 {
1860         int index;
1861         int i;
1862         struct list_head *tmp;
1863         struct ksock_conn *conn;
1864
1865         for (index = 0; ; index++) {
1866                 read_lock(&ksocknal_data.ksnd_global_lock);
1867
1868                 i = 0;
1869                 conn = NULL;
1870
1871                 list_for_each(tmp, &peer_ni->ksnp_conns) {
1872                         if (i++ == index) {
1873                                 conn = list_entry(tmp, struct ksock_conn,
1874                                                   ksnc_list);
1875                                 ksocknal_conn_addref(conn);
1876                                 break;
1877                         }
1878                 }
1879
1880                 read_unlock(&ksocknal_data.ksnd_global_lock);
1881
1882                 if (conn == NULL)
1883                         break;
1884
1885                 ksocknal_lib_push_conn (conn);
1886                 ksocknal_conn_decref(conn);
1887         }
1888 }
1889
/* Push every peer_ni matching @id (LNET_NID_ANY / LNET_PID_ANY act as
 * wildcards).  Returns 0 if at least one peer_ni matched, -ENOENT
 * otherwise.  Because pushing drops the global lock, each matching
 * peer_ni is re-located by its ordinal offset within its hash chain on
 * every pass. */
static int
ksocknal_push(struct lnet_ni *ni, struct lnet_process_id id)
{
        struct list_head *start;
        struct list_head *end;
        struct list_head *tmp;
        int               rc = -ENOENT;
        unsigned int      hsize = ksocknal_data.ksnd_peer_hash_size;

        /* a specific NID limits the scan to one hash chain */
        if (id.nid == LNET_NID_ANY) {
                start = &ksocknal_data.ksnd_peers[0];
                end = &ksocknal_data.ksnd_peers[hsize - 1];
        } else {
                start = end = ksocknal_nid2peerlist(id.nid);
        }

        for (tmp = start; tmp <= end; tmp++) {
                int     peer_off; /* searching offset in peer_ni hash table */

                for (peer_off = 0; ; peer_off++) {
                        struct ksock_peer_ni *peer_ni;
                        int           i = 0;

                        read_lock(&ksocknal_data.ksnd_global_lock);
                        list_for_each_entry(peer_ni, tmp, ksnp_list) {
                                if (!((id.nid == LNET_NID_ANY ||
                                       id.nid == peer_ni->ksnp_id.nid) &&
                                      (id.pid == LNET_PID_ANY ||
                                       id.pid == peer_ni->ksnp_id.pid)))
                                        continue;

                                /* pin the peer_off'th match before the
                                 * lock is dropped */
                                if (i++ == peer_off) {
                                        ksocknal_peer_addref(peer_ni);
                                        break;
                                }
                        }
                        read_unlock(&ksocknal_data.ksnd_global_lock);

                        /* the iterator ran to completion without breaking,
                         * so peer_ni does not point at a pinned entry and
                         * this chain holds no further matches */
                        if (i <= peer_off) /* no match */
                                break;

                        rc = 0;
                        ksocknal_push_peer(peer_ni);
                        ksocknal_peer_decref(peer_ni);
                }
        }
        return rc;
}
1938
1939 static int
1940 ksocknal_add_interface(struct lnet_ni *ni, __u32 ipaddress, __u32 netmask)
1941 {
1942         struct ksock_net *net = ni->ni_data;
1943         struct ksock_interface *iface;
1944         int rc;
1945         int i;
1946         int j;
1947         struct list_head *ptmp;
1948         struct ksock_peer_ni *peer_ni;
1949         struct list_head *rtmp;
1950         struct ksock_route *route;
1951
1952         if (ipaddress == 0 ||
1953             netmask == 0)
1954                 return -EINVAL;
1955
1956         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1957
1958         iface = ksocknal_ip2iface(ni, ipaddress);
1959         if (iface != NULL) {
1960                 /* silently ignore dups */
1961                 rc = 0;
1962         } else if (net->ksnn_ninterfaces == LNET_INTERFACES_NUM) {
1963                 rc = -ENOSPC;
1964         } else {
1965                 iface = &net->ksnn_interfaces[net->ksnn_ninterfaces++];
1966
1967                 iface->ksni_ipaddr = ipaddress;
1968                 iface->ksni_netmask = netmask;
1969                 iface->ksni_nroutes = 0;
1970                 iface->ksni_npeers = 0;
1971
1972                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1973                         list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
1974                                 peer_ni = list_entry(ptmp, struct ksock_peer_ni,
1975                                                      ksnp_list);
1976
1977                                 for (j = 0; j < peer_ni->ksnp_n_passive_ips; j++)
1978                                         if (peer_ni->ksnp_passive_ips[j] == ipaddress)
1979                                                 iface->ksni_npeers++;
1980
1981                                 list_for_each(rtmp, &peer_ni->ksnp_routes) {
1982                                         route = list_entry(rtmp,
1983                                                            struct ksock_route,
1984                                                            ksnr_list);
1985
1986                                         if (route->ksnr_myipaddr == ipaddress)
1987                                                 iface->ksni_nroutes++;
1988                                 }
1989                         }
1990                 }
1991
1992                 rc = 0;
1993                 /* NB only new connections will pay attention to the new interface! */
1994         }
1995
1996         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1997
1998         return rc;
1999 }
2000
2001 static void
2002 ksocknal_peer_del_interface_locked(struct ksock_peer_ni *peer_ni, __u32 ipaddr)
2003 {
2004         struct list_head *tmp;
2005         struct list_head *nxt;
2006         struct ksock_route *route;
2007         struct ksock_conn *conn;
2008         int i;
2009         int j;
2010
2011         for (i = 0; i < peer_ni->ksnp_n_passive_ips; i++)
2012                 if (peer_ni->ksnp_passive_ips[i] == ipaddr) {
2013                         for (j = i+1; j < peer_ni->ksnp_n_passive_ips; j++)
2014                                 peer_ni->ksnp_passive_ips[j-1] =
2015                                         peer_ni->ksnp_passive_ips[j];
2016                         peer_ni->ksnp_n_passive_ips--;
2017                         break;
2018                 }
2019
2020         list_for_each_safe(tmp, nxt, &peer_ni->ksnp_routes) {
2021                 route = list_entry(tmp, struct ksock_route, ksnr_list);
2022
2023                 if (route->ksnr_myipaddr != ipaddr)
2024                         continue;
2025
2026                 if (route->ksnr_share_count != 0) {
2027                         /* Manually created; keep, but unbind */
2028                         route->ksnr_myipaddr = 0;
2029                 } else {
2030                         ksocknal_del_route_locked(route);
2031                 }
2032         }
2033
2034         list_for_each_safe(tmp, nxt, &peer_ni->ksnp_conns) {
2035                 conn = list_entry(tmp, struct ksock_conn, ksnc_list);
2036
2037                 if (conn->ksnc_myipaddr == ipaddr)
2038                         ksocknal_close_conn_locked (conn, 0);
2039         }
2040 }
2041
2042 static int
2043 ksocknal_del_interface(struct lnet_ni *ni, __u32 ipaddress)
2044 {
2045         struct ksock_net *net = ni->ni_data;
2046         int rc = -ENOENT;
2047         struct list_head *tmp;
2048         struct list_head *nxt;
2049         struct ksock_peer_ni *peer_ni;
2050         u32 this_ip;
2051         int i;
2052         int j;
2053
2054         write_lock_bh(&ksocknal_data.ksnd_global_lock);
2055
2056         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2057                 this_ip = net->ksnn_interfaces[i].ksni_ipaddr;
2058
2059                 if (!(ipaddress == 0 ||
2060                       ipaddress == this_ip))
2061                         continue;
2062
2063                 rc = 0;
2064
2065                 for (j = i+1; j < net->ksnn_ninterfaces; j++)
2066                         net->ksnn_interfaces[j-1] =
2067                                 net->ksnn_interfaces[j];
2068
2069                 net->ksnn_ninterfaces--;
2070
2071                 for (j = 0; j < ksocknal_data.ksnd_peer_hash_size; j++) {
2072                         list_for_each_safe(tmp, nxt,
2073                                            &ksocknal_data.ksnd_peers[j]) {
2074                                 peer_ni = list_entry(tmp, struct ksock_peer_ni,
2075                                                      ksnp_list);
2076
2077                                 if (peer_ni->ksnp_ni != ni)
2078                                         continue;
2079
2080                                 ksocknal_peer_del_interface_locked(peer_ni, this_ip);
2081                         }
2082                 }
2083         }
2084
2085         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
2086
2087         return (rc);
2088 }
2089
/* socklnd ioctl dispatcher: handle the IOC_LIBCFS_* commands LNet routes
 * to this LND.  @arg is a struct libcfs_ioctl_data whose ioc_* fields are
 * interpreted per command as annotated below.  Returns 0 or a negative
 * errno. */
int
ksocknal_ctl(struct lnet_ni *ni, unsigned int cmd, void *arg)
{
        struct lnet_process_id id = {0};
        struct libcfs_ioctl_data *data = arg;
        int rc;

        switch(cmd) {
        case IOC_LIBCFS_GET_INTERFACE: {
                /* return the ioc_count'th local interface of this NI */
                struct ksock_net *net = ni->ni_data;
                struct ksock_interface *iface;

                read_lock(&ksocknal_data.ksnd_global_lock);

                if (data->ioc_count >= (__u32)net->ksnn_ninterfaces) {
                        rc = -ENOENT;
                } else {
                        rc = 0;
                        iface = &net->ksnn_interfaces[data->ioc_count];

                        data->ioc_u32[0] = iface->ksni_ipaddr;
                        data->ioc_u32[1] = iface->ksni_netmask;
                        data->ioc_u32[2] = iface->ksni_npeers;
                        data->ioc_u32[3] = iface->ksni_nroutes;
                }

                read_unlock(&ksocknal_data.ksnd_global_lock);
                return rc;
        }

        case IOC_LIBCFS_ADD_INTERFACE:
                return ksocknal_add_interface(ni,
                                              data->ioc_u32[0], /* IP address */
                                              data->ioc_u32[1]); /* net mask */

        case IOC_LIBCFS_DEL_INTERFACE:
                return ksocknal_del_interface(ni,
                                              data->ioc_u32[0]); /* IP address */

        case IOC_LIBCFS_GET_PEER: {
                /* return info about the ioc_count'th peer */
                __u32            myip = 0;
                __u32            ip = 0;
                int              port = 0;
                int              conn_count = 0;
                int              share_count = 0;

                rc = ksocknal_get_peer_info(ni, data->ioc_count,
                                            &id, &myip, &ip, &port,
                                            &conn_count,  &share_count);
                if (rc != 0)
                        return rc;

                data->ioc_nid    = id.nid;
                data->ioc_count  = share_count;
                data->ioc_u32[0] = ip;
                data->ioc_u32[1] = port;
                data->ioc_u32[2] = myip;
                data->ioc_u32[3] = conn_count;
                data->ioc_u32[4] = id.pid;
                return 0;
        }

        case IOC_LIBCFS_ADD_PEER:
                id.nid = data->ioc_nid;
                id.pid = LNET_PID_LUSTRE;
                return ksocknal_add_peer (ni, id,
                                          data->ioc_u32[0], /* IP */
                                          data->ioc_u32[1]); /* port */

        case IOC_LIBCFS_DEL_PEER:
                id.nid = data->ioc_nid;
                id.pid = LNET_PID_ANY;
                return ksocknal_del_peer (ni, id,
                                          data->ioc_u32[0]); /* IP */

        case IOC_LIBCFS_GET_CONN: {
                /* return info/tunables of the ioc_count'th conn; the conn
                 * lookup takes a ref that is dropped before returning */
                int           txmem;
                int           rxmem;
                int           nagle;
                struct ksock_conn *conn = ksocknal_get_conn_by_idx(ni, data->ioc_count);

                if (conn == NULL)
                        return -ENOENT;

                ksocknal_lib_get_conn_tunables(conn, &txmem, &rxmem, &nagle);

                data->ioc_count  = txmem;
                data->ioc_nid    = conn->ksnc_peer->ksnp_id.nid;
                data->ioc_flags  = nagle;
                data->ioc_u32[0] = conn->ksnc_ipaddr;
                data->ioc_u32[1] = conn->ksnc_port;
                data->ioc_u32[2] = conn->ksnc_myipaddr;
                data->ioc_u32[3] = conn->ksnc_type;
                data->ioc_u32[4] = conn->ksnc_scheduler->kss_cpt;
                data->ioc_u32[5] = rxmem;
                data->ioc_u32[6] = conn->ksnc_peer->ksnp_id.pid;
                ksocknal_conn_decref(conn);
                return 0;
        }

        case IOC_LIBCFS_CLOSE_CONNECTION:
                id.nid = data->ioc_nid;
                id.pid = LNET_PID_ANY;
                return ksocknal_close_matching_conns (id,
                                                      data->ioc_u32[0]);

        case IOC_LIBCFS_REGISTER_MYNID:
                /* Ignore if this is a noop */
                if (data->ioc_nid == ni->ni_nid)
                        return 0;

                CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
                       libcfs_nid2str(data->ioc_nid),
                       libcfs_nid2str(ni->ni_nid));
                return -EINVAL;

        case IOC_LIBCFS_PUSH_CONNECTION:
                id.nid = data->ioc_nid;
                id.pid = LNET_PID_ANY;
                return ksocknal_push(ni, id);

        default:
                return -EINVAL;
        }
        /* not reached */
}
2216
2217 static void
2218 ksocknal_free_buffers (void)
2219 {
2220         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_txs) == 0);
2221
2222         if (ksocknal_data.ksnd_schedulers != NULL)
2223                 cfs_percpt_free(ksocknal_data.ksnd_schedulers);
2224
2225         LIBCFS_FREE (ksocknal_data.ksnd_peers,
2226                      sizeof(struct list_head) *
2227                      ksocknal_data.ksnd_peer_hash_size);
2228
2229         spin_lock(&ksocknal_data.ksnd_tx_lock);
2230
2231         if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
2232                 struct list_head zlist;
2233                 struct ksock_tx *tx;
2234
2235                 list_add(&zlist, &ksocknal_data.ksnd_idle_noop_txs);
2236                 list_del_init(&ksocknal_data.ksnd_idle_noop_txs);
2237                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
2238
2239                 while (!list_empty(&zlist)) {
2240                         tx = list_entry(zlist.next, struct ksock_tx, tx_list);
2241                         list_del(&tx->tx_list);
2242                         LIBCFS_FREE(tx, tx->tx_desc_size);
2243                 }
2244         } else {
2245                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
2246         }
2247 }
2248
/* Tear down module-global socklnd state: verify everything is idle, flag
 * all threads to terminate, wait for them to exit, then free the shared
 * buffers and drop the module ref taken at startup.  Only legal when no
 * nets remain (asserted below). */
static void
ksocknal_base_shutdown(void)
{
        struct ksock_sched *sched;
        int i;

        CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
               atomic_read (&libcfs_kmemory));
        LASSERT (ksocknal_data.ksnd_nnets == 0);

        switch (ksocknal_data.ksnd_init) {
        default:
                LASSERT(0);
                /* fallthrough */

        case SOCKNAL_INIT_ALL:
        case SOCKNAL_INIT_DATA:
                /* all peers, nets, conns and queued work must be gone
                 * before global state may be dismantled */
                LASSERT(ksocknal_data.ksnd_peers != NULL);
                for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
                        LASSERT(list_empty(&ksocknal_data.ksnd_peers[i]));

                LASSERT(list_empty(&ksocknal_data.ksnd_nets));
                LASSERT(list_empty(&ksocknal_data.ksnd_enomem_conns));
                LASSERT(list_empty(&ksocknal_data.ksnd_zombie_conns));
                LASSERT(list_empty(&ksocknal_data.ksnd_connd_connreqs));
                LASSERT(list_empty(&ksocknal_data.ksnd_connd_routes));

                if (ksocknal_data.ksnd_schedulers != NULL) {
                        cfs_percpt_for_each(sched, i,
                                            ksocknal_data.ksnd_schedulers) {

                                LASSERT(list_empty(&sched->kss_tx_conns));
                                LASSERT(list_empty(&sched->kss_rx_conns));
                                LASSERT(list_empty(&sched->kss_zombie_noop_txs));
                                LASSERT(sched->kss_nconns == 0);
                        }
                }

                /* flag threads to terminate; wake and wait for them to die */
                ksocknal_data.ksnd_shuttingdown = 1;
                wake_up_all(&ksocknal_data.ksnd_connd_waitq);
                wake_up_all(&ksocknal_data.ksnd_reaper_waitq);

                if (ksocknal_data.ksnd_schedulers != NULL) {
                        cfs_percpt_for_each(sched, i,
                                            ksocknal_data.ksnd_schedulers)
                                        wake_up_all(&sched->kss_waitq);
                }

                /* poll ksnd_nthreads once a second until all threads have
                 * exited; start the counter at 4 so the first warning
                 * lands on a power-of-2 iteration */
                i = 4;
                read_lock(&ksocknal_data.ksnd_global_lock);
                while (ksocknal_data.ksnd_nthreads != 0) {
                        i++;
                        /* power of 2? */
                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,
                                "waiting for %d threads to terminate\n",
                                ksocknal_data.ksnd_nthreads);
                        read_unlock(&ksocknal_data.ksnd_global_lock);
                        set_current_state(TASK_UNINTERRUPTIBLE);
                        schedule_timeout(cfs_time_seconds(1));
                        read_lock(&ksocknal_data.ksnd_global_lock);
                }
                read_unlock(&ksocknal_data.ksnd_global_lock);

                ksocknal_free_buffers();

                ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
                break;
        }

        CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
               atomic_read (&libcfs_kmemory));

        /* release the ref taken by try_module_get() in base startup */
        module_put(THIS_MODULE);
}
2324
2325 static int
2326 ksocknal_base_startup(void)
2327 {
2328         struct ksock_sched *sched;
2329         int rc;
2330         int i;
2331
2332         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
2333         LASSERT (ksocknal_data.ksnd_nnets == 0);
2334
2335         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
2336
2337         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
2338         LIBCFS_ALLOC(ksocknal_data.ksnd_peers,
2339                      sizeof(struct list_head) *
2340                      ksocknal_data.ksnd_peer_hash_size);
2341         if (ksocknal_data.ksnd_peers == NULL)
2342                 return -ENOMEM;
2343
2344         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
2345                 INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
2346
2347         rwlock_init(&ksocknal_data.ksnd_global_lock);
2348         INIT_LIST_HEAD(&ksocknal_data.ksnd_nets);
2349
2350         spin_lock_init(&ksocknal_data.ksnd_reaper_lock);
2351         INIT_LIST_HEAD(&ksocknal_data.ksnd_enomem_conns);
2352         INIT_LIST_HEAD(&ksocknal_data.ksnd_zombie_conns);
2353         INIT_LIST_HEAD(&ksocknal_data.ksnd_deathrow_conns);
2354         init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
2355
2356         spin_lock_init(&ksocknal_data.ksnd_connd_lock);
2357         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_connreqs);
2358         INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_routes);
2359         init_waitqueue_head(&ksocknal_data.ksnd_connd_waitq);
2360
2361         spin_lock_init(&ksocknal_data.ksnd_tx_lock);
2362         INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_noop_txs);
2363
2364         /* NB memset above zeros whole of ksocknal_data */
2365
2366         /* flag lists/ptrs/locks initialised */
2367         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2368         try_module_get(THIS_MODULE);
2369
2370         /* Create a scheduler block per available CPT */
2371         ksocknal_data.ksnd_schedulers = cfs_percpt_alloc(lnet_cpt_table(),
2372                                                          sizeof(*sched));
2373         if (ksocknal_data.ksnd_schedulers == NULL)
2374                 goto failed;
2375
2376         cfs_percpt_for_each(sched, i, ksocknal_data.ksnd_schedulers) {
2377                 int nthrs;
2378
2379                 /*
2380                  * make sure not to allocate more threads than there are
2381                  * cores/CPUs in teh CPT
2382                  */
2383                 nthrs = cfs_cpt_weight(lnet_cpt_table(), i);
2384                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2385                         nthrs = min(nthrs, *ksocknal_tunables.ksnd_nscheds);
2386                 } else {
2387                         /*
2388                          * max to half of CPUs, assume another half should be
2389                          * reserved for upper layer modules
2390                          */
2391                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2392                 }
2393
2394                 sched->kss_nthreads_max = nthrs;
2395                 sched->kss_cpt = i;
2396
2397                 spin_lock_init(&sched->kss_lock);
2398                 INIT_LIST_HEAD(&sched->kss_rx_conns);
2399                 INIT_LIST_HEAD(&sched->kss_tx_conns);
2400                 INIT_LIST_HEAD(&sched->kss_zombie_noop_txs);
2401                 init_waitqueue_head(&sched->kss_waitq);
2402         }
2403
2404         ksocknal_data.ksnd_connd_starting         = 0;
2405         ksocknal_data.ksnd_connd_failed_stamp     = 0;
2406         ksocknal_data.ksnd_connd_starting_stamp   = ktime_get_real_seconds();
2407         /* must have at least 2 connds to remain responsive to accepts while
2408          * connecting */
2409         if (*ksocknal_tunables.ksnd_nconnds < SOCKNAL_CONND_RESV + 1)
2410                 *ksocknal_tunables.ksnd_nconnds = SOCKNAL_CONND_RESV + 1;
2411
2412         if (*ksocknal_tunables.ksnd_nconnds_max <
2413             *ksocknal_tunables.ksnd_nconnds) {
2414                 ksocknal_tunables.ksnd_nconnds_max =
2415                         ksocknal_tunables.ksnd_nconnds;
2416         }
2417
2418         for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
2419                 char name[16];
2420                 spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2421                 ksocknal_data.ksnd_connd_starting++;
2422                 spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2423
2424
2425                 snprintf(name, sizeof(name), "socknal_cd%02d", i);
2426                 rc = ksocknal_thread_start(ksocknal_connd,
2427                                            (void *)((uintptr_t)i), name);
2428                 if (rc != 0) {
2429                         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2430                         ksocknal_data.ksnd_connd_starting--;
2431                         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2432                         CERROR("Can't spawn socknal connd: %d\n", rc);
2433                         goto failed;
2434                 }
2435         }
2436
2437         rc = ksocknal_thread_start(ksocknal_reaper, NULL, "socknal_reaper");
2438         if (rc != 0) {
2439                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
2440                 goto failed;
2441         }
2442
2443         /* flag everything initialised */
2444         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
2445
2446         return 0;
2447
2448  failed:
2449         ksocknal_base_shutdown();
2450         return -ENETDOWN;
2451 }
2452
2453 static void
2454 ksocknal_debug_peerhash(struct lnet_ni *ni)
2455 {
2456         struct ksock_peer_ni *peer_ni = NULL;
2457         struct list_head *tmp;
2458         int i;
2459
2460         read_lock(&ksocknal_data.ksnd_global_lock);
2461
2462         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
2463                 list_for_each(tmp, &ksocknal_data.ksnd_peers[i]) {
2464                         peer_ni = list_entry(tmp, struct ksock_peer_ni, ksnp_list);
2465
2466                         if (peer_ni->ksnp_ni == ni)
2467                                 break;
2468
2469                         peer_ni = NULL;
2470                 }
2471         }
2472
2473         if (peer_ni != NULL) {
2474                 struct ksock_route *route;
2475                 struct ksock_conn  *conn;
2476
2477                 CWARN("Active peer_ni on shutdown: %s, ref %d, "
2478                       "closing %d, accepting %d, err %d, zcookie %llu, "
2479                       "txq %d, zc_req %d\n", libcfs_id2str(peer_ni->ksnp_id),
2480                       atomic_read(&peer_ni->ksnp_refcount),
2481                       peer_ni->ksnp_closing,
2482                       peer_ni->ksnp_accepting, peer_ni->ksnp_error,
2483                       peer_ni->ksnp_zc_next_cookie,
2484                       !list_empty(&peer_ni->ksnp_tx_queue),
2485                       !list_empty(&peer_ni->ksnp_zc_req_list));
2486
2487                 list_for_each(tmp, &peer_ni->ksnp_routes) {
2488                         route = list_entry(tmp, struct ksock_route, ksnr_list);
2489                         CWARN("Route: ref %d, schd %d, conn %d, cnted %d, "
2490                               "del %d\n", atomic_read(&route->ksnr_refcount),
2491                               route->ksnr_scheduled, route->ksnr_connecting,
2492                               route->ksnr_connected, route->ksnr_deleted);
2493                 }
2494
2495                 list_for_each(tmp, &peer_ni->ksnp_conns) {
2496                         conn = list_entry(tmp, struct ksock_conn, ksnc_list);
2497                         CWARN("Conn: ref %d, sref %d, t %d, c %d\n",
2498                               atomic_read(&conn->ksnc_conn_refcount),
2499                               atomic_read(&conn->ksnc_sock_refcount),
2500                               conn->ksnc_type, conn->ksnc_closing);
2501                 }
2502         }
2503
2504         read_unlock(&ksocknal_data.ksnd_global_lock);
2505 }
2506
/*
 * Shut down one socklnd network interface.
 *
 * Deletes every peer on @ni, waits for their state to drain, then frees
 * the per-net structure.  When the last net is gone the global socklnd
 * state is torn down too.
 *
 * \param ni  the lnet_ni being removed; ni->ni_data is our ksock_net.
 */
void
ksocknal_shutdown(struct lnet_ni *ni)
{
	struct ksock_net *net = ni->ni_data;
	struct lnet_process_id anyid = {
		.nid = LNET_NID_ANY,
		.pid = LNET_PID_ANY,
	};
	int i;

	LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL);
	LASSERT(ksocknal_data.ksnd_nnets > 0);

	/* prevent new peers: bias the peer count so creation paths that
	 * check ksnn_npeers see the net as shutting down */
	atomic_add(SOCKNAL_SHUTDOWN_BIAS, &net->ksnn_npeers);

	/* Delete all peers (LNET_NID_ANY/LNET_PID_ANY matches everything) */
	ksocknal_del_peer(ni, anyid, 0);

	/* Wait for all peer_ni state to clean up; count is back to the
	 * bare bias once every peer is gone */
	i = 2;
	while (atomic_read(&net->ksnn_npeers) > SOCKNAL_SHUTDOWN_BIAS) {
		i++;
		/* warn only at power-of-2 iterations to avoid log floods */
		CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
		       "waiting for %d peers to disconnect\n",
		       atomic_read(&net->ksnn_npeers) - SOCKNAL_SHUTDOWN_BIAS);
		set_current_state(TASK_UNINTERRUPTIBLE);
		schedule_timeout(cfs_time_seconds(1));

		/* dump what is still holding the peer refs */
		ksocknal_debug_peerhash(ni);
	}

	/* no peers left: every local interface must be fully idle */
	for (i = 0; i < net->ksnn_ninterfaces; i++) {
		LASSERT(net->ksnn_interfaces[i].ksni_npeers == 0);
		LASSERT(net->ksnn_interfaces[i].ksni_nroutes == 0);
	}

	list_del(&net->ksnn_list);
	LIBCFS_FREE(net, sizeof(*net));

	/* last net down takes the whole module-global state with it */
	ksocknal_data.ksnd_nnets--;
	if (ksocknal_data.ksnd_nnets == 0)
		ksocknal_base_shutdown();
}
2551
2552 static int
2553 ksocknal_search_new_ipif(struct ksock_net *net)
2554 {
2555         int new_ipif = 0;
2556         int i;
2557
2558         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2559                 char *ifnam = &net->ksnn_interfaces[i].ksni_name[0];
2560                 char *colon = strchr(ifnam, ':');
2561                 int found  = 0;
2562                 struct ksock_net *tmp;
2563                 int j;
2564
2565                 if (colon != NULL) /* ignore alias device */
2566                         *colon = 0;
2567
2568                 list_for_each_entry(tmp, &ksocknal_data.ksnd_nets,
2569                                         ksnn_list) {
2570                         for (j = 0; !found && j < tmp->ksnn_ninterfaces; j++) {
2571                                 char *ifnam2 = &tmp->ksnn_interfaces[j].\
2572                                              ksni_name[0];
2573                                 char *colon2 = strchr(ifnam2, ':');
2574
2575                                 if (colon2 != NULL)
2576                                         *colon2 = 0;
2577
2578                                 found = strcmp(ifnam, ifnam2) == 0;
2579                                 if (colon2 != NULL)
2580                                         *colon2 = ':';
2581                         }
2582                         if (found)
2583                                 break;
2584                 }
2585
2586                 new_ipif += !found;
2587                 if (colon != NULL)
2588                         *colon = ':';
2589         }
2590
2591         return new_ipif;
2592 }
2593
2594 static int
2595 ksocknal_start_schedulers(struct ksock_sched *sched)
2596 {
2597         int     nthrs;
2598         int     rc = 0;
2599         int     i;
2600
2601         if (sched->kss_nthreads == 0) {
2602                 if (*ksocknal_tunables.ksnd_nscheds > 0) {
2603                         nthrs = sched->kss_nthreads_max;
2604                 } else {
2605                         nthrs = cfs_cpt_weight(lnet_cpt_table(),
2606                                                sched->kss_cpt);
2607                         nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
2608                         nthrs = min(SOCKNAL_NSCHEDS_HIGH, nthrs);
2609                 }
2610                 nthrs = min(nthrs, sched->kss_nthreads_max);
2611         } else {
2612                 LASSERT(sched->kss_nthreads <= sched->kss_nthreads_max);
2613                 /* increase two threads if there is new interface */
2614                 nthrs = min(2, sched->kss_nthreads_max - sched->kss_nthreads);
2615         }
2616
2617         for (i = 0; i < nthrs; i++) {
2618                 long id;
2619                 char name[20];
2620
2621                 id = KSOCK_THREAD_ID(sched->kss_cpt, sched->kss_nthreads + i);
2622                 snprintf(name, sizeof(name), "socknal_sd%02d_%02d",
2623                          sched->kss_cpt, (int)KSOCK_THREAD_SID(id));
2624
2625                 rc = ksocknal_thread_start(ksocknal_scheduler,
2626                                            (void *)id, name);
2627                 if (rc == 0)
2628                         continue;
2629
2630                 CERROR("Can't spawn thread %d for scheduler[%d]: %d\n",
2631                        sched->kss_cpt, (int) KSOCK_THREAD_SID(id), rc);
2632                 break;
2633         }
2634
2635         sched->kss_nthreads += i;
2636         return rc;
2637 }
2638
2639 static int
2640 ksocknal_net_start_threads(struct ksock_net *net, __u32 *cpts, int ncpts)
2641 {
2642         int newif = ksocknal_search_new_ipif(net);
2643         int rc;
2644         int i;
2645
2646         if (ncpts > 0 && ncpts > cfs_cpt_number(lnet_cpt_table()))
2647                 return -EINVAL;
2648
2649         for (i = 0; i < ncpts; i++) {
2650                 struct ksock_sched *sched;
2651                 int cpt = (cpts == NULL) ? i : cpts[i];
2652
2653                 LASSERT(cpt < cfs_cpt_number(lnet_cpt_table()));
2654                 sched = ksocknal_data.ksnd_schedulers[cpt];
2655
2656                 if (!newif && sched->kss_nthreads > 0)
2657                         continue;
2658
2659                 rc = ksocknal_start_schedulers(sched);
2660                 if (rc != 0)
2661                         return rc;
2662         }
2663         return 0;
2664 }
2665
/*
 * LND startup entry point: bring up one socklnd net for @ni.
 *
 * Starts the module-global state on first use, allocates the per-net
 * structure, applies tunable defaults, maps the configured (or first
 * discovered) IP interfaces onto the net, starts scheduler threads and
 * finally derives the NID from the chosen interface address.
 *
 * Returns 0 on success, or a negative errno (all local failures are
 * reported as -ENETDOWN; only ksocknal_base_startup() errors pass
 * through unchanged).
 */
int
ksocknal_startup(struct lnet_ni *ni)
{
	struct ksock_net *net;
	struct lnet_ioctl_config_lnd_cmn_tunables *net_tunables;
	struct ksock_interface *ksi = NULL;
	struct lnet_inetdev *ifaces = NULL;
	int i = 0;
	int rc;

	LASSERT (ni->ni_net->net_lnd == &the_ksocklnd);

	/* the first net to come up initializes the module-global state */
	if (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING) {
		rc = ksocknal_base_startup();
		if (rc != 0)
			return rc;
	}

	LIBCFS_ALLOC(net, sizeof(*net));
	if (net == NULL)
		goto fail_0;

	net->ksnn_incarnation = ktime_get_real_ns();
	ni->ni_data = net;
	net_tunables = &ni->ni_net->net_tunables;

	/* fill in tunables the user left unset (-1) from module params */
	if (net_tunables->lct_peer_timeout == -1)
		net_tunables->lct_peer_timeout =
			*ksocknal_tunables.ksnd_peertimeout;

	if (net_tunables->lct_max_tx_credits == -1)
		net_tunables->lct_max_tx_credits =
			*ksocknal_tunables.ksnd_credits;

	if (net_tunables->lct_peer_tx_credits == -1)
		net_tunables->lct_peer_tx_credits =
			*ksocknal_tunables.ksnd_peertxcredits;

	/* per-peer tx credits may not exceed the global limit */
	if (net_tunables->lct_peer_tx_credits >
	    net_tunables->lct_max_tx_credits)
		net_tunables->lct_peer_tx_credits =
			net_tunables->lct_max_tx_credits;

	if (net_tunables->lct_peer_rtr_credits == -1)
		net_tunables->lct_peer_rtr_credits =
			*ksocknal_tunables.ksnd_peerrtrcredits;

	/* NB on success 'rc' holds the number of interfaces enumerated
	 * into 'ifaces'; the matching loop below relies on that.
	 * NOTE(review): 'ifaces' does not appear to be freed on any path
	 * in this function — confirm who owns the allocation made by
	 * lnet_inet_enumerate(). */
	rc = lnet_inet_enumerate(&ifaces, ni->ni_net_ns);
	if (rc < 0)
		goto fail_1;

	if (!ni->ni_interfaces[0]) {
		ksi = &net->ksnn_interfaces[0];

		/* Use the first discovered interface */
		net->ksnn_ninterfaces = 1;
		ni->ni_dev_cpt = ifaces[0].li_cpt;
		ksi->ksni_ipaddr = ifaces[0].li_ipaddr;
		ksi->ksni_netmask = ifaces[0].li_netmask;
		strlcpy(ksi->ksni_name, ifaces[0].li_name,
			sizeof(ksi->ksni_name));
	} else {
		/* Before Multi-Rail ksocklnd would manage
		 * multiple interfaces with its own tcp bonding.
		 * If we encounter an old configuration using
		 * this tcp bonding approach then we need to
		 * handle more than one ni_interfaces.
		 *
		 * In Multi-Rail configuration only ONE ni_interface
		 * should exist. Each IP alias should be mapped to
		 * each 'struct net_ni'.
		 */
		for (i = 0; i < LNET_INTERFACES_NUM; i++) {
			int j;

			if (!ni->ni_interfaces[i])
				break;

			/* reject the same name configured twice */
			for (j = 0; j < LNET_INTERFACES_NUM;  j++) {
				if (i != j && ni->ni_interfaces[j] &&
				    strcmp(ni->ni_interfaces[i],
					   ni->ni_interfaces[j]) == 0) {
					rc = -EEXIST;
					CERROR("ksocklnd: found duplicate %s at %d and %d, rc = %d\n",
					       ni->ni_interfaces[i], i, j, rc);
					goto fail_1;
				}
			}

			/* match the configured name against the enumerated
			 * system interfaces ('rc' = enumeration count) */
			for (j = 0; j < rc; j++) {
				if (strcmp(ifaces[j].li_name,
					   ni->ni_interfaces[i]) != 0)
					continue;

				ksi = &net->ksnn_interfaces[j];
				ni->ni_dev_cpt = ifaces[j].li_cpt;
				ksi->ksni_ipaddr = ifaces[j].li_ipaddr;
				ksi->ksni_netmask = ifaces[j].li_netmask;
				strlcpy(ksi->ksni_name, ifaces[j].li_name,
					sizeof(ksi->ksni_name));
				net->ksnn_ninterfaces++;
				break;
			}
		}
		/* ni_interfaces don't map to all network interfaces */
		if (!ksi || net->ksnn_ninterfaces != i) {
			CERROR("ksocklnd: requested %d but only %d interfaces found\n",
			       i, net->ksnn_ninterfaces);
			goto fail_1;
		}
	}

	/* call it before add it to ksocknal_data.ksnd_nets */
	rc = ksocknal_net_start_threads(net, ni->ni_cpts, ni->ni_ncpts);
	if (rc != 0)
		goto fail_1;

	/* the NID address part is the IP of the selected interface */
	LASSERT(ksi);
	ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ksi->ksni_ipaddr);
	list_add(&net->ksnn_list, &ksocknal_data.ksnd_nets);

	ksocknal_data.ksnd_nnets++;

	return 0;

 fail_1:
	LIBCFS_FREE(net, sizeof(*net));
 fail_0:
	if (ksocknal_data.ksnd_nnets == 0)
		ksocknal_base_shutdown();

	/* NB every failure path reports -ENETDOWN, whatever 'rc' says */
	return -ENETDOWN;
}
2799
2800
/* Module unload: deregister this LND from LNet. */
static void __exit ksocklnd_exit(void)
{
	lnet_unregister_lnd(&the_ksocklnd);
}
2805
2806 static int __init ksocklnd_init(void)
2807 {
2808         int rc;
2809
2810         /* check ksnr_connected/connecting field large enough */
2811         BUILD_BUG_ON(SOCKLND_CONN_NTYPES > 4);
2812         BUILD_BUG_ON(SOCKLND_CONN_ACK != SOCKLND_CONN_BULK_IN);
2813
2814         /* initialize the_ksocklnd */
2815         the_ksocklnd.lnd_type     = SOCKLND;
2816         the_ksocklnd.lnd_startup  = ksocknal_startup;
2817         the_ksocklnd.lnd_shutdown = ksocknal_shutdown;
2818         the_ksocklnd.lnd_ctl      = ksocknal_ctl;
2819         the_ksocklnd.lnd_send     = ksocknal_send;
2820         the_ksocklnd.lnd_recv     = ksocknal_recv;
2821         the_ksocklnd.lnd_notify_peer_down   = ksocknal_notify_gw_down;
2822         the_ksocklnd.lnd_query    = ksocknal_query;
2823         the_ksocklnd.lnd_accept   = ksocknal_accept;
2824
2825         rc = ksocknal_tunables_init();
2826         if (rc != 0)
2827                 return rc;
2828
2829         lnet_register_lnd(&the_ksocklnd);
2830
2831         return 0;
2832 }
2833
/* Module metadata and entry points */
MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("TCP Socket LNet Network Driver");
MODULE_VERSION("2.8.0");
MODULE_LICENSE("GPL");

module_init(ksocklnd_init);
module_exit(ksocklnd_exit);