Whamcloud - gitweb
LU-6068 misc: update Intel copyright messages 2014
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/socklnd/socklnd.c
37  *
38  * Author: Zach Brown <zab@zabbo.net>
39  * Author: Peter J. Braam <braam@clusterfs.com>
40  * Author: Phil Schwan <phil@clusterfs.com>
41  * Author: Eric Barton <eric@bartonsoftware.com>
42  */
43
44 #include "socklnd.h"
45
lnd_t                   the_ksocklnd;   /* LND ops descriptor for the socket LND
                                         * (NOTE(review): presumably registered
                                         * with LNet at module init — confirm) */
ksock_nal_data_t        ksocknal_data;  /* global state shared by all socklnd code */
48
49 ksock_interface_t *
50 ksocknal_ip2iface(lnet_ni_t *ni, __u32 ip)
51 {
52         ksock_net_t       *net = ni->ni_data;
53         int                i;
54         ksock_interface_t *iface;
55
56         for (i = 0; i < net->ksnn_ninterfaces; i++) {
57                 LASSERT(i < LNET_MAX_INTERFACES);
58                 iface = &net->ksnn_interfaces[i];
59
60                 if (iface->ksni_ipaddr == ip)
61                         return (iface);
62         }
63
64         return (NULL);
65 }
66
67 ksock_route_t *
68 ksocknal_create_route (__u32 ipaddr, int port)
69 {
70         ksock_route_t *route;
71
72         LIBCFS_ALLOC (route, sizeof (*route));
73         if (route == NULL)
74                 return (NULL);
75
76         atomic_set (&route->ksnr_refcount, 1);
77         route->ksnr_peer = NULL;
78         route->ksnr_retry_interval = 0;         /* OK to connect at any time */
79         route->ksnr_ipaddr = ipaddr;
80         route->ksnr_port = port;
81         route->ksnr_scheduled = 0;
82         route->ksnr_connecting = 0;
83         route->ksnr_connected = 0;
84         route->ksnr_deleted = 0;
85         route->ksnr_conn_count = 0;
86         route->ksnr_share_count = 0;
87
88         return (route);
89 }
90
91 void
92 ksocknal_destroy_route (ksock_route_t *route)
93 {
94         LASSERT (atomic_read(&route->ksnr_refcount) == 0);
95
96         if (route->ksnr_peer != NULL)
97                 ksocknal_peer_decref(route->ksnr_peer);
98
99         LIBCFS_FREE (route, sizeof (*route));
100 }
101
102 int
103 ksocknal_create_peer (ksock_peer_t **peerp, lnet_ni_t *ni, lnet_process_id_t id)
104 {
105         ksock_net_t   *net = ni->ni_data;
106         ksock_peer_t  *peer;
107
108         LASSERT (id.nid != LNET_NID_ANY);
109         LASSERT (id.pid != LNET_PID_ANY);
110         LASSERT (!in_interrupt());
111
112         LIBCFS_ALLOC (peer, sizeof (*peer));
113         if (peer == NULL)
114                 return -ENOMEM;
115
116         memset (peer, 0, sizeof (*peer));       /* NULL pointers/clear flags etc */
117
118         peer->ksnp_ni = ni;
119         peer->ksnp_id = id;
120         atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
121         peer->ksnp_closing = 0;
122         peer->ksnp_accepting = 0;
123         peer->ksnp_proto = NULL;
124         peer->ksnp_last_alive = 0;
125         peer->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;
126
127         INIT_LIST_HEAD(&peer->ksnp_conns);
128         INIT_LIST_HEAD(&peer->ksnp_routes);
129         INIT_LIST_HEAD(&peer->ksnp_tx_queue);
130         INIT_LIST_HEAD(&peer->ksnp_zc_req_list);
131         spin_lock_init(&peer->ksnp_lock);
132
133         spin_lock_bh(&net->ksnn_lock);
134
135         if (net->ksnn_shutdown) {
136                 spin_unlock_bh(&net->ksnn_lock);
137
138                 LIBCFS_FREE(peer, sizeof(*peer));
139                 CERROR("Can't create peer: network shutdown\n");
140                 return -ESHUTDOWN;
141         }
142
143         net->ksnn_npeers++;
144
145         spin_unlock_bh(&net->ksnn_lock);
146
147         *peerp = peer;
148         return 0;
149 }
150
void
ksocknal_destroy_peer (ksock_peer_t *peer)
{
        ksock_net_t    *net = peer->ksnp_ni->ni_data;

        CDEBUG (D_NET, "peer %s %p deleted\n",
                libcfs_id2str(peer->ksnp_id), peer);

        /* Only called once the last reference is gone, at which point
         * everything hanging off the peer must already be torn down. */
        LASSERT(atomic_read(&peer->ksnp_refcount) == 0);
        LASSERT(peer->ksnp_accepting == 0);
        LASSERT(list_empty(&peer->ksnp_conns));
        LASSERT(list_empty(&peer->ksnp_routes));
        LASSERT(list_empty(&peer->ksnp_tx_queue));
        LASSERT(list_empty(&peer->ksnp_zc_req_list));

        LIBCFS_FREE(peer, sizeof(*peer));

        /* NB a peer's connections and routes keep a reference on their peer
         * until they are destroyed, so we can be assured that _all_ state to
         * do with this peer has been cleaned up when its refcount drops to
         * zero. */
        /* NOTE(review): ksnn_npeers is decremented only after the free —
         * presumably so shutdown can wait on the counter reaching zero to
         * know all peer memory is released; confirm against shutdown path. */
        spin_lock_bh(&net->ksnn_lock);
        net->ksnn_npeers--;
        spin_unlock_bh(&net->ksnn_lock);
}
176
177 ksock_peer_t *
178 ksocknal_find_peer_locked (lnet_ni_t *ni, lnet_process_id_t id)
179 {
180         struct list_head *peer_list = ksocknal_nid2peerlist(id.nid);
181         struct list_head *tmp;
182         ksock_peer_t     *peer;
183
184         list_for_each(tmp, peer_list) {
185
186                 peer = list_entry(tmp, ksock_peer_t, ksnp_list);
187
188                 LASSERT(!peer->ksnp_closing);
189
190                 if (peer->ksnp_ni != ni)
191                         continue;
192
193                 if (peer->ksnp_id.nid != id.nid ||
194                     peer->ksnp_id.pid != id.pid)
195                         continue;
196
197                 CDEBUG(D_NET, "got peer [%p] -> %s (%d)\n",
198                        peer, libcfs_id2str(id),
199                        atomic_read(&peer->ksnp_refcount));
200                 return peer;
201         }
202         return NULL;
203 }
204
205 ksock_peer_t *
206 ksocknal_find_peer (lnet_ni_t *ni, lnet_process_id_t id)
207 {
208         ksock_peer_t     *peer;
209
210         read_lock(&ksocknal_data.ksnd_global_lock);
211         peer = ksocknal_find_peer_locked(ni, id);
212         if (peer != NULL)                       /* +1 ref for caller? */
213                 ksocknal_peer_addref(peer);
214         read_unlock(&ksocknal_data.ksnd_global_lock);
215
216         return (peer);
217 }
218
219 void
220 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
221 {
222         int                i;
223         __u32              ip;
224         ksock_interface_t *iface;
225
226         for (i = 0; i < peer->ksnp_n_passive_ips; i++) {
227                 LASSERT (i < LNET_MAX_INTERFACES);
228                 ip = peer->ksnp_passive_ips[i];
229
230                 iface = ksocknal_ip2iface(peer->ksnp_ni, ip);
231                 /* All IPs in peer->ksnp_passive_ips[] come from the
232                  * interface list, therefore the call must succeed. */
233                 LASSERT (iface != NULL);
234
235                 CDEBUG(D_NET, "peer=%p iface=%p ksni_nroutes=%d\n",
236                        peer, iface, iface->ksni_nroutes);
237                 iface->ksni_npeers--;
238         }
239
240         LASSERT(list_empty(&peer->ksnp_conns));
241         LASSERT(list_empty(&peer->ksnp_routes));
242         LASSERT(!peer->ksnp_closing);
243         peer->ksnp_closing = 1;
244         list_del(&peer->ksnp_list);
245         /* lose peerlist's ref */
246         ksocknal_peer_decref(peer);
247 }
248
/* ioctl-style enumeration over all peers on 'ni'.  Each peer contributes
 * one "record" per passive IP and one per route (or a single empty record
 * if it has neither); the out-params are filled in for the index'th
 * record.  Returns 0 on a hit, -ENOENT when 'index' is past the end. */
int
ksocknal_get_peer_info (lnet_ni_t *ni, int index,
                        lnet_process_id_t *id, __u32 *myip, __u32 *peer_ip,
                        int *port, int *conn_count, int *share_count)
{
        ksock_peer_t      *peer;
        struct list_head  *ptmp;
        ksock_route_t     *route;
        struct list_head  *rtmp;
        int                i;
        int                j;
        int                rc = -ENOENT;

        read_lock(&ksocknal_data.ksnd_global_lock);

        for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
                list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
                        peer = list_entry(ptmp, ksock_peer_t, ksnp_list);

                        if (peer->ksnp_ni != ni)
                                continue;

                        /* Bare peer: no passive IPs and no routes still
                         * counts as one (empty) record. */
                        if (peer->ksnp_n_passive_ips == 0 &&
                            list_empty(&peer->ksnp_routes)) {
                                if (index-- > 0)
                                        continue;

                                *id = peer->ksnp_id;
                                *myip = 0;
                                *peer_ip = 0;
                                *port = 0;
                                *conn_count = 0;
                                *share_count = 0;
                                rc = 0;
                                goto out;
                        }

                        /* One record per passive (locally bound) IP. */
                        for (j = 0; j < peer->ksnp_n_passive_ips; j++) {
                                if (index-- > 0)
                                        continue;

                                *id = peer->ksnp_id;
                                *myip = peer->ksnp_passive_ips[j];
                                *peer_ip = 0;
                                *port = 0;
                                *conn_count = 0;
                                *share_count = 0;
                                rc = 0;
                                goto out;
                        }

                        /* One record per route, with full route detail. */
                        list_for_each(rtmp, &peer->ksnp_routes) {
                                if (index-- > 0)
                                        continue;

                                route = list_entry(rtmp, ksock_route_t,
                                                   ksnr_list);

                                *id = peer->ksnp_id;
                                *myip = route->ksnr_myipaddr;
                                *peer_ip = route->ksnr_ipaddr;
                                *port = route->ksnr_port;
                                *conn_count = route->ksnr_conn_count;
                                *share_count = route->ksnr_share_count;
                                rc = 0;
                                goto out;
                        }
                }
        }
out:
        read_unlock(&ksocknal_data.ksnd_global_lock);
        return rc;
}
322
/* Bind 'conn' to 'route': the conn takes a reference on the route, the
 * route records the connection type and, if the conn's local address
 * differs from the route's current binding, the route is (re)bound to the
 * conn's local IP with the interface route-counts adjusted accordingly.
 * Caller holds ksnd_global_lock exclusively. */
void
ksocknal_associate_route_conn_locked(ksock_route_t *route, ksock_conn_t *conn)
{
        ksock_peer_t      *peer = route->ksnr_peer;
        int                type = conn->ksnc_type;
        ksock_interface_t *iface;

        conn->ksnc_route = route;
        ksocknal_route_addref(route);

        if (route->ksnr_myipaddr != conn->ksnc_myipaddr) {
                if (route->ksnr_myipaddr == 0) {
                        /* route wasn't bound locally yet (the initial route) */
                        CDEBUG(D_NET, "Binding %s %u.%u.%u.%u to %u.%u.%u.%u\n",
                               libcfs_id2str(peer->ksnp_id),
                               HIPQUAD(route->ksnr_ipaddr),
                               HIPQUAD(conn->ksnc_myipaddr));
                } else {
                        CDEBUG(D_NET, "Rebinding %s %u.%u.%u.%u from "
                               "%u.%u.%u.%u to %u.%u.%u.%u\n",
                               libcfs_id2str(peer->ksnp_id),
                               HIPQUAD(route->ksnr_ipaddr),
                               HIPQUAD(route->ksnr_myipaddr),
                               HIPQUAD(conn->ksnc_myipaddr));

                        /* Undo the old interface's route count before
                         * rebinding. */
                        iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
                                                  route->ksnr_myipaddr);
                        if (iface != NULL)
                                iface->ksni_nroutes--;
                }
                route->ksnr_myipaddr = conn->ksnc_myipaddr;
                iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
                                          route->ksnr_myipaddr);
                if (iface != NULL)
                        iface->ksni_nroutes++;
        }

        /* ksnr_connected is a bitmask of connection types established. */
        route->ksnr_connected |= (1<<type);
        route->ksnr_conn_count++;

        /* Successful connection => further attempts can
         * proceed immediately */
        route->ksnr_retry_interval = 0;
}
367
368 void
369 ksocknal_add_route_locked (ksock_peer_t *peer, ksock_route_t *route)
370 {
371         struct list_head *tmp;
372         ksock_conn_t     *conn;
373         ksock_route_t    *route2;
374
375         LASSERT(!peer->ksnp_closing);
376         LASSERT(route->ksnr_peer == NULL);
377         LASSERT(!route->ksnr_scheduled);
378         LASSERT(!route->ksnr_connecting);
379         LASSERT(route->ksnr_connected == 0);
380
381         /* LASSERT(unique) */
382         list_for_each(tmp, &peer->ksnp_routes) {
383                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
384
385                 if (route2->ksnr_ipaddr == route->ksnr_ipaddr) {
386                         CERROR("Duplicate route %s %u.%u.%u.%u\n",
387                                libcfs_id2str(peer->ksnp_id),
388                                HIPQUAD(route->ksnr_ipaddr));
389                         LBUG();
390                 }
391         }
392
393         route->ksnr_peer = peer;
394         ksocknal_peer_addref(peer);
395         /* peer's routelist takes over my ref on 'route' */
396         list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
397
398         list_for_each(tmp, &peer->ksnp_conns) {
399                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
400
401                 if (conn->ksnc_ipaddr != route->ksnr_ipaddr)
402                         continue;
403
404                 ksocknal_associate_route_conn_locked(route, conn);
405                 /* keep going (typed routes) */
406         }
407 }
408
/* Remove 'route' from its peer: close every conn bound to it, release its
 * claim on the local interface it was bound to, and drop the peer route
 * list's reference.  If this was the peer's last route and it has no
 * remaining conns, the peer itself is unlinked.  Caller holds
 * ksnd_global_lock exclusively. */
void
ksocknal_del_route_locked (ksock_route_t *route)
{
        ksock_peer_t      *peer = route->ksnr_peer;
        ksock_interface_t *iface;
        ksock_conn_t      *conn;
        struct list_head  *ctmp;
        struct list_head  *cnxt;

        LASSERT(!route->ksnr_deleted);

        /* Close associated conns */
        list_for_each_safe(ctmp, cnxt, &peer->ksnp_conns) {
                conn = list_entry(ctmp, ksock_conn_t, ksnc_list);

                if (conn->ksnc_route != route)
                        continue;

                ksocknal_close_conn_locked(conn, 0);
        }

        /* Give back the interface's route count if we were bound. */
        if (route->ksnr_myipaddr != 0) {
                iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
                                          route->ksnr_myipaddr);
                if (iface != NULL)
                        iface->ksni_nroutes--;
        }

        route->ksnr_deleted = 1;
        list_del(&route->ksnr_list);
        ksocknal_route_decref(route);           /* drop peer's ref */

        if (list_empty(&peer->ksnp_routes) &&
            list_empty(&peer->ksnp_conns)) {
                /* I've just removed the last route to a peer with no active
                 * connections */
                ksocknal_unlink_peer_locked(peer);
        }
}
448
/* Add (or share) an explicitly configured route to peer 'id' at
 * ipaddr:port on network 'ni'.  Creates the peer if it doesn't exist yet;
 * if a route to the same address already exists, its share count is
 * bumped instead of adding a duplicate.  Returns 0 on success, -EINVAL
 * for wildcard ids, or an error from peer/route creation. */
int
ksocknal_add_peer(lnet_ni_t *ni, lnet_process_id_t id, __u32 ipaddr, int port)
{
        struct list_head *tmp;
        ksock_peer_t     *peer;
        ksock_peer_t     *peer2;
        ksock_route_t    *route;
        ksock_route_t    *route2;
        int               rc;

        if (id.nid == LNET_NID_ANY ||
            id.pid == LNET_PID_ANY)
                return (-EINVAL);

        /* Have a brand new peer ready... */
        rc = ksocknal_create_peer(&peer, ni, id);
        if (rc != 0)
                return rc;

        route = ksocknal_create_route (ipaddr, port);
        if (route == NULL) {
                ksocknal_peer_decref(peer);
                return (-ENOMEM);
        }

        write_lock_bh(&ksocknal_data.ksnd_global_lock);

        /* always called with a ref on ni, so shutdown can't have started */
        LASSERT (((ksock_net_t *) ni->ni_data)->ksnn_shutdown == 0);

        peer2 = ksocknal_find_peer_locked(ni, id);
        if (peer2 != NULL) {
                /* Peer already exists (raced or pre-existing): drop the
                 * one we made and use the existing one. */
                ksocknal_peer_decref(peer);
                peer = peer2;
        } else {
                /* peer table takes my ref on peer */
                list_add_tail(&peer->ksnp_list,
                              ksocknal_nid2peerlist(id.nid));
        }

        /* Look for an existing route to the same address. */
        route2 = NULL;
        list_for_each(tmp, &peer->ksnp_routes) {
                route2 = list_entry(tmp, ksock_route_t, ksnr_list);

                if (route2->ksnr_ipaddr == ipaddr)
                        break;

                route2 = NULL;
        }
        if (route2 == NULL) {
                ksocknal_add_route_locked(peer, route);
                route->ksnr_share_count++;
        } else {
                /* Duplicate address: discard the new route and count the
                 * share on the existing one. */
                ksocknal_route_decref(route);
                route2->ksnr_share_count++;
        }

        write_unlock_bh(&ksocknal_data.ksnd_global_lock);

        return 0;
}
510
/* Delete the peer's explicit routes matching 'ip' (0 matches all) along
 * with their conns; if no explicitly shared routes remain afterwards,
 * also remove any auto-created routes and close all remaining conns.
 * The peer unlinks itself when its last conn/route goes.  Caller holds
 * ksnd_global_lock exclusively. */
void
ksocknal_del_peer_locked (ksock_peer_t *peer, __u32 ip)
{
        ksock_conn_t     *conn;
        ksock_route_t    *route;
        struct list_head *tmp;
        struct list_head *nxt;
        int               nshared;

        LASSERT(!peer->ksnp_closing);

        /* Extra ref prevents peer disappearing until I'm done with it */
        ksocknal_peer_addref(peer);

        /* Pass 1: delete the routes matching 'ip'. */
        list_for_each_safe(tmp, nxt, &peer->ksnp_routes) {
                route = list_entry(tmp, ksock_route_t, ksnr_list);

                /* no match */
                if (!(ip == 0 || route->ksnr_ipaddr == ip))
                        continue;

                route->ksnr_share_count = 0;
                /* This deletes associated conns too */
                ksocknal_del_route_locked(route);
        }

        /* Pass 2: count the explicit shares left on surviving routes. */
        nshared = 0;
        list_for_each_safe(tmp, nxt, &peer->ksnp_routes) {
                route = list_entry(tmp, ksock_route_t, ksnr_list);
                nshared += route->ksnr_share_count;
        }

        if (nshared == 0) {
                /* remove everything else if there are no explicit entries
                 * left */

                list_for_each_safe(tmp, nxt, &peer->ksnp_routes) {
                        route = list_entry(tmp, ksock_route_t, ksnr_list);

                        /* we should only be removing auto-entries */
                        LASSERT(route->ksnr_share_count == 0);
                        ksocknal_del_route_locked(route);
                }

                list_for_each_safe(tmp, nxt, &peer->ksnp_conns) {
                        conn = list_entry(tmp, ksock_conn_t, ksnc_list);

                        ksocknal_close_conn_locked(conn, 0);
                }
        }

        ksocknal_peer_decref(peer);
                /* NB peer unlinks itself when last conn/route is removed */
}
565
/* Delete peers on 'ni' matching 'id' (LNET_NID_ANY / LNET_PID_ANY act as
 * wildcards), removing routes to 'ip' (0 = all).  Any transmits still
 * queued on a peer that ends up closing are collected and completed with
 * an error outside the lock.  Returns 0 if anything matched, -ENOENT
 * otherwise. */
int
ksocknal_del_peer (lnet_ni_t *ni, lnet_process_id_t id, __u32 ip)
{
        struct list_head  zombies = LIST_HEAD_INIT(zombies);
        struct list_head *ptmp;
        struct list_head *pnxt;
        ksock_peer_t     *peer;
        int               lo;
        int               hi;
        int               i;
        int               rc = -ENOENT;

        write_lock_bh(&ksocknal_data.ksnd_global_lock);

        /* A concrete nid restricts the scan to its single hash bucket;
         * otherwise scan the whole peer hash table. */
        if (id.nid != LNET_NID_ANY) {
                hi = (int)(ksocknal_nid2peerlist(id.nid) -
                           ksocknal_data.ksnd_peers);
                lo = hi;
        } else {
                lo = 0;
                hi = ksocknal_data.ksnd_peer_hash_size - 1;
        }

        for (i = lo; i <= hi; i++) {
                list_for_each_safe(ptmp, pnxt,
                                   &ksocknal_data.ksnd_peers[i]) {
                        peer = list_entry(ptmp, ksock_peer_t, ksnp_list);

                        if (peer->ksnp_ni != ni)
                                continue;

                        if (!((id.nid == LNET_NID_ANY ||
                               peer->ksnp_id.nid == id.nid) &&
                              (id.pid == LNET_PID_ANY ||
                               peer->ksnp_id.pid == id.pid)))
                                continue;

                        ksocknal_peer_addref(peer);     /* a ref for me... */

                        ksocknal_del_peer_locked(peer, ip);

                        /* Closing with txs still queued: steal them so
                         * they can be failed outside the lock. */
                        if (peer->ksnp_closing &&
                            !list_empty(&peer->ksnp_tx_queue)) {
                                LASSERT(list_empty(&peer->ksnp_conns));
                                LASSERT(list_empty(&peer->ksnp_routes));

                                list_splice_init(&peer->ksnp_tx_queue,
                                                 &zombies);
                        }

                        ksocknal_peer_decref(peer);     /* ...till here */

                        rc = 0;                         /* matched! */
                }
        }

        write_unlock_bh(&ksocknal_data.ksnd_global_lock);

        /* Complete the orphaned transmits with an error. */
        ksocknal_txlist_done(ni, &zombies, 1);

        return rc;
}
628
629 ksock_conn_t *
630 ksocknal_get_conn_by_idx (lnet_ni_t *ni, int index)
631 {
632         ksock_peer_t     *peer;
633         struct list_head *ptmp;
634         ksock_conn_t     *conn;
635         struct list_head *ctmp;
636         int               i;
637
638         read_lock(&ksocknal_data.ksnd_global_lock);
639
640         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
641                 list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
642                         peer = list_entry(ptmp, ksock_peer_t, ksnp_list);
643
644                         LASSERT(!peer->ksnp_closing);
645
646                         if (peer->ksnp_ni != ni)
647                                 continue;
648
649                         list_for_each(ctmp, &peer->ksnp_conns) {
650                                 if (index-- > 0)
651                                         continue;
652
653                                 conn = list_entry(ctmp, ksock_conn_t,
654                                                   ksnc_list);
655                                 ksocknal_conn_addref(conn);
656                                 read_unlock(&ksocknal_data. \
657                                             ksnd_global_lock);
658                                 return conn;
659                         }
660                 }
661         }
662
663         read_unlock(&ksocknal_data.ksnd_global_lock);
664         return NULL;
665 }
666
667 ksock_sched_t *
668 ksocknal_choose_scheduler_locked(unsigned int cpt)
669 {
670         struct ksock_sched_info *info = ksocknal_data.ksnd_sched_info[cpt];
671         ksock_sched_t           *sched;
672         int                     i;
673
674         LASSERT(info->ksi_nthreads > 0);
675
676         sched = &info->ksi_scheds[0];
677         /*
678          * NB: it's safe so far, but info->ksi_nthreads could be changed
679          * at runtime when we have dynamic LNet configuration, then we
680          * need to take care of this.
681          */
682         for (i = 1; i < info->ksi_nthreads; i++) {
683                 if (sched->kss_nconns > info->ksi_scheds[i].kss_nconns)
684                         sched = &info->ksi_scheds[i];
685         }
686
687         return sched;
688 }
689
690 int
691 ksocknal_local_ipvec (lnet_ni_t *ni, __u32 *ipaddrs)
692 {
693         ksock_net_t       *net = ni->ni_data;
694         int                i;
695         int                nip;
696
697         read_lock(&ksocknal_data.ksnd_global_lock);
698
699         nip = net->ksnn_ninterfaces;
700         LASSERT (nip <= LNET_MAX_INTERFACES);
701
702         /* Only offer interfaces for additional connections if I have 
703          * more than one. */
704         if (nip < 2) {
705                 read_unlock(&ksocknal_data.ksnd_global_lock);
706                 return 0;
707         }
708
709         for (i = 0; i < nip; i++) {
710                 ipaddrs[i] = net->ksnn_interfaces[i].ksni_ipaddr;
711                 LASSERT (ipaddrs[i] != 0);
712         }
713
714         read_unlock(&ksocknal_data.ksnd_global_lock);
715         return (nip);
716 }
717
718 int
719 ksocknal_match_peerip (ksock_interface_t *iface, __u32 *ips, int nips)
720 {
721         int   best_netmatch = 0;
722         int   best_xor      = 0;
723         int   best          = -1;
724         int   this_xor;
725         int   this_netmatch;
726         int   i;
727
728         for (i = 0; i < nips; i++) {
729                 if (ips[i] == 0)
730                         continue;
731
732                 this_xor = (ips[i] ^ iface->ksni_ipaddr);
733                 this_netmatch = ((this_xor & iface->ksni_netmask) == 0) ? 1 : 0;
734
735                 if (!(best < 0 ||
736                       best_netmatch < this_netmatch ||
737                       (best_netmatch == this_netmatch &&
738                        best_xor > this_xor)))
739                         continue;
740
741                 best = i;
742                 best_netmatch = this_netmatch;
743                 best_xor = this_xor;
744         }
745
746         LASSERT (best >= 0);
747         return (best);
748 }
749
/* Choose which of this node's interface addresses to advertise to 'peer',
 * pairing each chosen local interface with the best-matching entry in
 * peerips[].  Existing passive IPs are kept; new interfaces are chosen to
 * prefer subnet matches and then the least-loaded interface.  On return
 * peerips[] is overwritten with the selection and its size is returned. */
int
ksocknal_select_ips(ksock_peer_t *peer, __u32 *peerips, int n_peerips)
{
        rwlock_t                *global_lock = &ksocknal_data.ksnd_global_lock;
        ksock_net_t        *net = peer->ksnp_ni->ni_data;
        ksock_interface_t  *iface;
        ksock_interface_t  *best_iface;
        int                 n_ips;
        int                 i;
        int                 j;
        int                 k;
        __u32               ip;
        __u32               xor;
        int                 this_netmatch;
        int                 best_netmatch;
        int                 best_npeers;

        /* CAVEAT EMPTOR: We do all our interface matching with an
         * exclusive hold of global lock at IRQ priority.  We're only
         * expecting to be dealing with small numbers of interfaces, so the
         * O(n**3)-ness shouldn't matter */

        /* Also note that I'm not going to return more than n_peerips
         * interfaces, even if I have more myself */

        write_lock_bh(global_lock);

        LASSERT (n_peerips <= LNET_MAX_INTERFACES);
        LASSERT (net->ksnn_ninterfaces <= LNET_MAX_INTERFACES);

        /* Only match interfaces for additional connections 
         * if I have > 1 interface */
        n_ips = (net->ksnn_ninterfaces < 2) ? 0 :
                MIN(n_peerips, net->ksnn_ninterfaces);

        /* NB the loop bound is ksnp_n_passive_ips, which grows inside the
         * loop body as new interfaces are chosen — the loop runs until the
         * peer has n_ips passive IPs. */
        for (i = 0; peer->ksnp_n_passive_ips < n_ips; i++) {
                /*              ^ yes really... */

                /* If we have any new interfaces, first tick off all the
                 * peer IPs that match old interfaces, then choose new
                 * interfaces to match the remaining peer IPS.
                 * We don't forget interfaces we've stopped using; we might
                 * start using them again... */

                if (i < peer->ksnp_n_passive_ips) {
                        /* Old interface. */
                        ip = peer->ksnp_passive_ips[i];
                        best_iface = ksocknal_ip2iface(peer->ksnp_ni, ip);

                        /* peer passive ips are kept up to date */
                        LASSERT(best_iface != NULL);
                } else {
                        /* choose a new interface */
                        LASSERT (i == peer->ksnp_n_passive_ips);

                        best_iface = NULL;
                        best_netmatch = 0;
                        best_npeers = 0;

                        for (j = 0; j < net->ksnn_ninterfaces; j++) {
                                iface = &net->ksnn_interfaces[j];
                                ip = iface->ksni_ipaddr;

                                /* Skip interfaces already selected for
                                 * this peer. */
                                for (k = 0; k < peer->ksnp_n_passive_ips; k++)
                                        if (peer->ksnp_passive_ips[k] == ip)
                                                break;

                                if (k < peer->ksnp_n_passive_ips) /* using it already */
                                        continue;

                                /* Rank by subnet match against this
                                 * iface's best peer IP, then by fewest
                                 * peers already using the iface. */
                                k = ksocknal_match_peerip(iface, peerips, n_peerips);
                                xor = (ip ^ peerips[k]);
                                this_netmatch = ((xor & iface->ksni_netmask) == 0) ? 1 : 0;

                                if (!(best_iface == NULL ||
                                      best_netmatch < this_netmatch ||
                                      (best_netmatch == this_netmatch &&
                                       best_npeers > iface->ksni_npeers)))
                                        continue;

                                best_iface = iface;
                                best_netmatch = this_netmatch;
                                best_npeers = iface->ksni_npeers;
                        }

                        LASSERT(best_iface != NULL);

                        best_iface->ksni_npeers++;
                        ip = best_iface->ksni_ipaddr;
                        peer->ksnp_passive_ips[i] = ip;
                        peer->ksnp_n_passive_ips = i+1;
                }

                /* mark the best matching peer IP used */
                j = ksocknal_match_peerip(best_iface, peerips, n_peerips);
                peerips[j] = 0;
        }

        /* Overwrite input peer IP addresses */
        memcpy(peerips, peer->ksnp_passive_ips, n_ips * sizeof(*peerips));

        write_unlock_bh(global_lock);

        return (n_ips);
}
855
/*
 * Create a route from a local interface to each of the peer's IP
 * addresses in @peer_ipaddrs (at most LNET_MAX_INTERFACES entries),
 * connecting on @port.  For every peer IP that does not already have a
 * route, the local interface is chosen to prefer (a) one on the same
 * subnet as the peer IP and then (b) the one carrying the fewest
 * existing routes.  A no-op unless this net has more than one
 * interface.
 */
void
ksocknal_create_routes(ksock_peer_t *peer, int port,
                       __u32 *peer_ipaddrs, int npeer_ipaddrs)
{
        ksock_route_t           *newroute = NULL;
        rwlock_t                *global_lock = &ksocknal_data.ksnd_global_lock;
        lnet_ni_t               *ni = peer->ksnp_ni;
        ksock_net_t             *net = ni->ni_data;
        struct list_head        *rtmp;
        ksock_route_t           *route;
        ksock_interface_t       *iface;
        ksock_interface_t       *best_iface;
        int                     best_netmatch;
        int                     this_netmatch;
        int                     best_nroutes;
        int                     i;
        int                     j;

        /* CAVEAT EMPTOR: We do all our interface matching with an
         * exclusive hold of global lock at IRQ priority.  We're only
         * expecting to be dealing with small numbers of interfaces, so the
         * O(n**3)-ness here shouldn't matter */

        write_lock_bh(global_lock);

        if (net->ksnn_ninterfaces < 2) {
                /* Only create additional connections 
                 * if I have > 1 interface */
                write_unlock_bh(global_lock);
                return;
        }

        LASSERT (npeer_ipaddrs <= LNET_MAX_INTERFACES);

        for (i = 0; i < npeer_ipaddrs; i++) {
                /* Reuse the route left over from the previous iteration if
                 * it was never attached; otherwise drop the lock to allocate
                 * a fresh one (allocation may sleep). */
                if (newroute != NULL) {
                        newroute->ksnr_ipaddr = peer_ipaddrs[i];
                } else {
                        write_unlock_bh(global_lock);

                        newroute = ksocknal_create_route(peer_ipaddrs[i], port);
                        if (newroute == NULL)
                                return;

                        write_lock_bh(global_lock);
                }

                if (peer->ksnp_closing) {
                        /* peer got closed under me */
                        break;
                }

                /* Already got a route? */
                route = NULL;
                list_for_each(rtmp, &peer->ksnp_routes) {
                        route = list_entry(rtmp, ksock_route_t, ksnr_list);

                        if (route->ksnr_ipaddr == newroute->ksnr_ipaddr)
                                break;

                        route = NULL;
                }
                if (route != NULL)
                        continue;

                best_iface = NULL;
                best_nroutes = 0;
                best_netmatch = 0;

                LASSERT(net->ksnn_ninterfaces <= LNET_MAX_INTERFACES);

                /* Select interface to connect from */
                for (j = 0; j < net->ksnn_ninterfaces; j++) {
                        iface = &net->ksnn_interfaces[j];

                        /* Using this interface already? */
                        list_for_each(rtmp, &peer->ksnp_routes) {
                                route = list_entry(rtmp, ksock_route_t,
                                                   ksnr_list);

                                if (route->ksnr_myipaddr == iface->ksni_ipaddr)
                                        break;

                                route = NULL;
                        }
                        if (route != NULL)
                                continue;

                        /* 1 if this interface is on the peer IP's subnet */
                        this_netmatch = (((iface->ksni_ipaddr ^
                                           newroute->ksnr_ipaddr) &
                                           iface->ksni_netmask) == 0) ? 1 : 0;

                        /* keep the best subnet match; break ties by
                         * preferring the interface with fewest routes */
                        if (!(best_iface == NULL ||
                              best_netmatch < this_netmatch ||
                              (best_netmatch == this_netmatch &&
                               best_nroutes > iface->ksni_nroutes)))
                                continue;

                        best_iface = iface;
                        best_netmatch = this_netmatch;
                        best_nroutes = iface->ksni_nroutes;
                }

                if (best_iface == NULL)
                        continue;

                newroute->ksnr_myipaddr = best_iface->ksni_ipaddr;
                best_iface->ksni_nroutes++;

                ksocknal_add_route_locked(peer, newroute);
                newroute = NULL;
        }

        write_unlock_bh(global_lock);
        /* drop the last route if it was never attached to the peer */
        if (newroute != NULL)
                ksocknal_route_decref(newroute);
}
973
974 int
975 ksocknal_accept (lnet_ni_t *ni, cfs_socket_t *sock)
976 {
977         ksock_connreq_t    *cr;
978         int                 rc;
979         __u32               peer_ip;
980         int                 peer_port;
981
982         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
983         LASSERT (rc == 0);                      /* we succeeded before */
984
985         LIBCFS_ALLOC(cr, sizeof(*cr));
986         if (cr == NULL) {
987                 LCONSOLE_ERROR_MSG(0x12f, "Dropping connection request from "
988                                    "%u.%u.%u.%u: memory exhausted\n",
989                                    HIPQUAD(peer_ip));
990                 return -ENOMEM;
991         }
992
993         lnet_ni_addref(ni);
994         cr->ksncr_ni   = ni;
995         cr->ksncr_sock = sock;
996
997         spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
998
999         list_add_tail(&cr->ksncr_list, &ksocknal_data.ksnd_connd_connreqs);
1000         wake_up(&ksocknal_data.ksnd_connd_waitq);
1001
1002         spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
1003         return 0;
1004 }
1005
1006 int
1007 ksocknal_connecting (ksock_peer_t *peer, __u32 ipaddr)
1008 {
1009         ksock_route_t *route;
1010
1011         list_for_each_entry(route, &peer->ksnp_routes, ksnr_list) {
1012                 if (route->ksnr_ipaddr == ipaddr)
1013                         return route->ksnr_connecting;
1014         }
1015         return 0;
1016 }
1017
/*
 * Establish a new connection on @sock.  @route != NULL means an active
 * (outgoing) connection of the given @type; @route == NULL means a
 * passive (accepted) connection whose peer and type are learned from
 * the HELLO exchange.  Returns 0 on success or a negative errno on
 * failure; positive values internal to the HELLO protocol (EALREADY /
 * EPROTO / ESTALE) are handled on the failed_2 path by requesting a
 * retry from the peer.  On any failure path the socket is released.
 */
int
ksocknal_create_conn (lnet_ni_t *ni, ksock_route_t *route,
                      cfs_socket_t *sock, int type)
{
        rwlock_t                *global_lock = &ksocknal_data.ksnd_global_lock;
        struct list_head        zombies = LIST_HEAD_INIT(zombies);
        lnet_process_id_t       peerid;
        struct list_head        *tmp;
        __u64              incarnation;
        ksock_conn_t      *conn;
        ksock_conn_t      *conn2;
        ksock_peer_t      *peer = NULL;
        ksock_peer_t      *peer2;
        ksock_sched_t     *sched;
        ksock_hello_msg_t *hello;
        int                cpt;
        ksock_tx_t        *tx;
        ksock_tx_t        *txtmp;
        int                rc;
        int                active;
        char              *warn = NULL;

        active = (route != NULL);

        LASSERT (active == (type != SOCKLND_CONN_NONE));

        LIBCFS_ALLOC(conn, sizeof(*conn));
        if (conn == NULL) {
                rc = -ENOMEM;
                goto failed_0;
        }

        memset (conn, 0, sizeof (*conn));

        conn->ksnc_peer = NULL;
        conn->ksnc_route = NULL;
        conn->ksnc_sock = sock;
        /* 2 ref, 1 for conn, another extra ref prevents socket
         * being closed before establishment of connection */
        atomic_set (&conn->ksnc_sock_refcount, 2);
        conn->ksnc_type = type;
        ksocknal_lib_save_callback(sock, conn);
        atomic_set (&conn->ksnc_conn_refcount, 1); /* 1 ref for me */

        conn->ksnc_rx_ready = 0;
        conn->ksnc_rx_scheduled = 0;

        INIT_LIST_HEAD(&conn->ksnc_tx_queue);
        conn->ksnc_tx_ready = 0;
        conn->ksnc_tx_scheduled = 0;
        conn->ksnc_tx_carrier = NULL;
        atomic_set (&conn->ksnc_tx_nob, 0);

        /* HELLO buffer sized for the maximum interface vector */
        LIBCFS_ALLOC(hello, offsetof(ksock_hello_msg_t,
                                     kshm_ips[LNET_MAX_INTERFACES]));
        if (hello == NULL) {
                rc = -ENOMEM;
                goto failed_1;
        }

        /* stash conn's local and remote addrs */
        rc = ksocknal_lib_get_conn_addrs (conn);
        if (rc != 0)
                goto failed_1;

        /* Find out/confirm peer's NID and connection type and get the
         * vector of interfaces she's willing to let me connect to.
         * Passive connections use the listener timeout since the peer sends
         * eagerly */

        if (active) {
                peer = route->ksnr_peer;
                LASSERT(ni == peer->ksnp_ni);

                /* Active connection sends HELLO eagerly */
                hello->kshm_nips = ksocknal_local_ipvec(ni, hello->kshm_ips);
                peerid = peer->ksnp_id;

                write_lock_bh(global_lock);
                conn->ksnc_proto = peer->ksnp_proto;
                write_unlock_bh(global_lock);

                if (conn->ksnc_proto == NULL) {
                         conn->ksnc_proto = &ksocknal_protocol_v3x;
#if SOCKNAL_VERSION_DEBUG
                         if (*ksocknal_tunables.ksnd_protocol == 2)
                                 conn->ksnc_proto = &ksocknal_protocol_v2x;
                         else if (*ksocknal_tunables.ksnd_protocol == 1)
                                 conn->ksnc_proto = &ksocknal_protocol_v1x;
#endif
                }

                rc = ksocknal_send_hello (ni, conn, peerid.nid, hello);
                if (rc != 0)
                        goto failed_1;
        } else {
                peerid.nid = LNET_NID_ANY;
                peerid.pid = LNET_PID_ANY;

                /* Passive, get protocol from peer */
                conn->ksnc_proto = NULL;
        }

        rc = ksocknal_recv_hello (ni, conn, hello, &peerid, &incarnation);
        if (rc < 0)
                goto failed_1;

        LASSERT (rc == 0 || active);
        LASSERT (conn->ksnc_proto != NULL);
        LASSERT (peerid.nid != LNET_NID_ANY);

        /* CPT used below to pick this conn's scheduler */
        cpt = lnet_cpt_of_nid(peerid.nid);

        if (active) {
                ksocknal_peer_addref(peer);
                write_lock_bh(global_lock);
        } else {
                rc = ksocknal_create_peer(&peer, ni, peerid);
                if (rc != 0)
                        goto failed_1;

                write_lock_bh(global_lock);

                /* called with a ref on ni, so shutdown can't have started */
                LASSERT (((ksock_net_t *) ni->ni_data)->ksnn_shutdown == 0);

                peer2 = ksocknal_find_peer_locked(ni, peerid);
                if (peer2 == NULL) {
                        /* NB this puts an "empty" peer in the peer
                         * table (which takes my ref) */
                        list_add_tail(&peer->ksnp_list,
                                      ksocknal_nid2peerlist(peerid.nid));
                } else {
                        ksocknal_peer_decref(peer);
                        peer = peer2;
                }

                /* +1 ref for me */
                ksocknal_peer_addref(peer);
                peer->ksnp_accepting++;

                /* Am I already connecting to this guy?  Resolve in
                 * favour of higher NID... */
                if (peerid.nid < ni->ni_nid &&
                    ksocknal_connecting(peer, conn->ksnc_ipaddr)) {
                        rc = EALREADY;
                        warn = "connection race resolution";
                        goto failed_2;
                }
        }

        if (peer->ksnp_closing ||
            (active && route->ksnr_deleted)) {
                /* peer/route got closed under me */
                rc = -ESTALE;
                warn = "peer/route removed";
                goto failed_2;
        }

        if (peer->ksnp_proto == NULL) {
                /* Never connected before.
                 * NB recv_hello may have returned EPROTO to signal my peer
                 * wants a different protocol than the one I asked for.
                 */
                LASSERT(list_empty(&peer->ksnp_conns));

                peer->ksnp_proto = conn->ksnc_proto;
                peer->ksnp_incarnation = incarnation;
        }

        if (peer->ksnp_proto != conn->ksnc_proto ||
            peer->ksnp_incarnation != incarnation) {
                /* Peer rebooted or I've got the wrong protocol version */
                ksocknal_close_peer_conns_locked(peer, 0, 0);

                peer->ksnp_proto = NULL;
                rc = ESTALE;
                warn = peer->ksnp_incarnation != incarnation ?
                       "peer rebooted" :
                       "wrong proto version";
                goto failed_2;
        }

        /* rc here is the non-negative result of ksocknal_recv_hello() */
        switch (rc) {
        default:
                LBUG();
        case 0:
                break;
        case EALREADY:
                warn = "lost conn race";
                goto failed_2;
        case EPROTO:
                warn = "retry with different protocol version";
                goto failed_2;
        }

        /* Refuse to duplicate an existing connection, unless this is a
         * loopback connection */
        if (conn->ksnc_ipaddr != conn->ksnc_myipaddr) {
                list_for_each(tmp, &peer->ksnp_conns) {
                        conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);

                        if (conn2->ksnc_ipaddr != conn->ksnc_ipaddr ||
                            conn2->ksnc_myipaddr != conn->ksnc_myipaddr ||
                            conn2->ksnc_type != conn->ksnc_type)
                                continue;

                        /* Reply on a passive connection attempt so the peer
                         * realises we're connected. */
                        LASSERT (rc == 0);
                        if (!active)
                                rc = EALREADY;

                        warn = "duplicate";
                        goto failed_2;
                }
        }

        /* If the connection created by this route didn't bind to the IP
         * address the route connected to, the connection/route matching
         * code below probably isn't going to work. */
        if (active &&
            route->ksnr_ipaddr != conn->ksnc_ipaddr) {
                CERROR("Route %s %u.%u.%u.%u connected to %u.%u.%u.%u\n",
                       libcfs_id2str(peer->ksnp_id),
                       HIPQUAD(route->ksnr_ipaddr),
                       HIPQUAD(conn->ksnc_ipaddr));
        }

        /* Search for a route corresponding to the new connection and
         * create an association.  This allows incoming connections created
         * by routes in my peer to match my own route entries so I don't
         * continually create duplicate routes. */
        list_for_each(tmp, &peer->ksnp_routes) {
                route = list_entry(tmp, ksock_route_t, ksnr_list);

                if (route->ksnr_ipaddr != conn->ksnc_ipaddr)
                        continue;

                ksocknal_associate_route_conn_locked(route, conn);
                break;
        }

        conn->ksnc_peer = peer;                 /* conn takes my ref on peer */
        peer->ksnp_last_alive = cfs_time_current();
        peer->ksnp_send_keepalive = 0;
        peer->ksnp_error = 0;

        sched = ksocknal_choose_scheduler_locked(cpt);
        sched->kss_nconns++;
        conn->ksnc_scheduler = sched;

        conn->ksnc_tx_last_post = cfs_time_current();
        /* Set the deadline for the outgoing HELLO to drain */
        conn->ksnc_tx_bufnob = libcfs_sock_wmem_queued(sock);
        conn->ksnc_tx_deadline = cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
        smp_mb();   /* order with adding to peer's conn list */

        list_add(&conn->ksnc_list, &peer->ksnp_conns);
        ksocknal_conn_addref(conn);

        ksocknal_new_packet(conn, 0);

        conn->ksnc_zc_capable = ksocknal_lib_zc_capable(conn);

        /* Take packets blocking for this connection. */
        list_for_each_entry_safe(tx, txtmp, &peer->ksnp_tx_queue, tx_list) {
                if (conn->ksnc_proto->pro_match_tx(conn, tx, tx->tx_nonblk) ==
                    SOCKNAL_MATCH_NO)
                        continue;

                list_del(&tx->tx_list);
                ksocknal_queue_tx_locked(tx, conn);
        }

        write_unlock_bh(global_lock);

        /* We've now got a new connection.  Any errors from here on are just
         * like "normal" comms errors and we close the connection normally.
         * NB (a) we still have to send the reply HELLO for passive
         *        connections, 
         *    (b) normal I/O on the conn is blocked until I setup and call the
         *        socket callbacks.
         */

        CDEBUG(D_NET, "New conn %s p %d.x %u.%u.%u.%u -> %u.%u.%u.%u/%d"
               " incarnation:"LPD64" sched[%d:%d]\n",
               libcfs_id2str(peerid), conn->ksnc_proto->pro_version,
               HIPQUAD(conn->ksnc_myipaddr), HIPQUAD(conn->ksnc_ipaddr),
               conn->ksnc_port, incarnation, cpt,
               (int)(sched - &sched->kss_info->ksi_scheds[0]));

        if (active) {
                /* additional routes after interface exchange? */
                ksocknal_create_routes(peer, conn->ksnc_port,
                                       hello->kshm_ips, hello->kshm_nips);
        } else {
                hello->kshm_nips = ksocknal_select_ips(peer, hello->kshm_ips,
                                                       hello->kshm_nips);
                rc = ksocknal_send_hello(ni, conn, peerid.nid, hello);
        }

        LIBCFS_FREE(hello, offsetof(ksock_hello_msg_t,
                                    kshm_ips[LNET_MAX_INTERFACES]));

        /* setup the socket AFTER I've received hello (it disables
         * SO_LINGER).  I might call back to the acceptor who may want
         * to send a protocol version response and then close the
         * socket; this ensures the socket only tears down after the
         * response has been sent. */
        if (rc == 0)
                rc = ksocknal_lib_setup_sock(sock);

        write_lock_bh(global_lock);

        /* NB my callbacks block while I hold ksnd_global_lock */
        ksocknal_lib_set_callback(sock, conn);

        if (!active)
                peer->ksnp_accepting--;

        write_unlock_bh(global_lock);

        if (rc != 0) {
                write_lock_bh(global_lock);
                if (!conn->ksnc_closing) {
                        /* could be closed by another thread */
                        ksocknal_close_conn_locked(conn, rc);
                }
                write_unlock_bh(global_lock);
        } else if (ksocknal_connsock_addref(conn) == 0) {
                /* Allow I/O to proceed. */
                ksocknal_read_callback(conn);
                ksocknal_write_callback(conn);
                ksocknal_connsock_decref(conn);
        }

        /* drop my extra socket ref and my conn ref taken at allocation */
        ksocknal_connsock_decref(conn);
        ksocknal_conn_decref(conn);
        return rc;

failed_2:
        /* Unlink the peer if this failed attempt left it with no conns
         * and no routes; its blocked TXs are finalized below. */
        if (!peer->ksnp_closing &&
            list_empty(&peer->ksnp_conns) &&
            list_empty(&peer->ksnp_routes)) {
                list_add(&zombies, &peer->ksnp_tx_queue);
                list_del_init(&peer->ksnp_tx_queue);
                ksocknal_unlink_peer_locked(peer);
        }

        write_unlock_bh(global_lock);

        if (warn != NULL) {
                if (rc < 0)
                        CERROR("Not creating conn %s type %d: %s\n",
                               libcfs_id2str(peerid), conn->ksnc_type, warn);
                else
                        CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",
                              libcfs_id2str(peerid), conn->ksnc_type, warn);
        }

        if (!active) {
                if (rc > 0) {
                        /* Request retry by replying with CONN_NONE 
                         * ksnc_proto has been set already */
                        conn->ksnc_type = SOCKLND_CONN_NONE;
                        hello->kshm_nips = 0;
                        ksocknal_send_hello(ni, conn, peerid.nid, hello);
                }

                write_lock_bh(global_lock);
                peer->ksnp_accepting--;
                write_unlock_bh(global_lock);
        }

        ksocknal_txlist_done(ni, &zombies, 1);
        ksocknal_peer_decref(peer);

 failed_1:
        if (hello != NULL)
                LIBCFS_FREE(hello, offsetof(ksock_hello_msg_t,
                                            kshm_ips[LNET_MAX_INTERFACES]));

        LIBCFS_FREE (conn, sizeof(*conn));

 failed_0:
        libcfs_sock_release(sock);
        return rc;
}
1407
void
ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
{
        /* This just does the immediate housekeeping, and queues the
         * connection for the reaper to terminate.
         * Caller holds ksnd_global_lock exclusively in irq context */
        ksock_peer_t      *peer = conn->ksnc_peer;
        ksock_route_t     *route;
        ksock_conn_t      *conn2;
        struct list_head  *tmp;

        LASSERT(peer->ksnp_error == 0);
        LASSERT(!conn->ksnc_closing);
        conn->ksnc_closing = 1;

        /* ksnd_deathrow_conns takes over peer's ref */
        list_del(&conn->ksnc_list);

        route = conn->ksnc_route;
        if (route != NULL) {
                /* dissociate conn from route... */
                LASSERT(!route->ksnr_deleted);
                LASSERT((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);

                /* look for another conn of the same type still using
                 * this route before clearing its connected bit */
                conn2 = NULL;
                list_for_each(tmp, &peer->ksnp_conns) {
                        conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);

                        if (conn2->ksnc_route == route &&
                            conn2->ksnc_type == conn->ksnc_type)
                                break;

                        conn2 = NULL;
                }
                if (conn2 == NULL)
                        route->ksnr_connected &= ~(1 << conn->ksnc_type);

                conn->ksnc_route = NULL;

#if 0           /* irrelevent with only eager routes */
                /* make route least favourite */
                list_del(&route->ksnr_list);
                list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
#endif
                ksocknal_route_decref(route);   /* drop conn's ref on route */
        }

        if (list_empty(&peer->ksnp_conns)) {
                /* No more connections to this peer */

                if (!list_empty(&peer->ksnp_tx_queue)) {
                                ksock_tx_t *tx;

                        LASSERT(conn->ksnc_proto == &ksocknal_protocol_v3x);

                        /* throw them to the last connection...,
                         * these TXs will be send to /dev/null by scheduler */
                        list_for_each_entry(tx, &peer->ksnp_tx_queue,
                                            tx_list)
                                ksocknal_tx_prep(conn, tx);

                        spin_lock_bh(&conn->ksnc_scheduler->kss_lock);
                        list_splice_init(&peer->ksnp_tx_queue,
                                         &conn->ksnc_tx_queue);
                        spin_unlock_bh(&conn->ksnc_scheduler->kss_lock);
                }

                /* renegotiate protocol version */
                peer->ksnp_proto = NULL;
                /* stash last conn close reason */
                peer->ksnp_error = error;

                if (list_empty(&peer->ksnp_routes)) {
                        /* I've just closed last conn belonging to a
                         * peer with no routes to it */
                        ksocknal_unlink_peer_locked(peer);
                }
        }

        /* hand the conn to the reaper for termination */
        spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);

        list_add_tail(&conn->ksnc_list,
                      &ksocknal_data.ksnd_deathrow_conns);
        wake_up(&ksocknal_data.ksnd_reaper_waitq);

        spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
}
1495
1496 void
1497 ksocknal_peer_failed (ksock_peer_t *peer)
1498 {
1499         int        notify = 0;
1500         cfs_time_t last_alive = 0;
1501
1502         /* There has been a connection failure or comms error; but I'll only
1503          * tell LNET I think the peer is dead if it's to another kernel and
1504          * there are no connections or connection attempts in existance. */
1505
1506         read_lock(&ksocknal_data.ksnd_global_lock);
1507
1508         if ((peer->ksnp_id.pid & LNET_PID_USERFLAG) == 0 &&
1509              list_empty(&peer->ksnp_conns) &&
1510              peer->ksnp_accepting == 0 &&
1511              ksocknal_find_connecting_route_locked(peer) == NULL) {
1512                 notify = 1;
1513                 last_alive = peer->ksnp_last_alive;
1514         }
1515
1516         read_unlock(&ksocknal_data.ksnd_global_lock);
1517
1518         if (notify)
1519                 lnet_notify(peer->ksnp_ni, peer->ksnp_id.nid, 0,
1520                             last_alive);
1521 }
1522
1523 void
1524 ksocknal_finalize_zcreq(ksock_conn_t *conn)
1525 {
1526         ksock_peer_t     *peer = conn->ksnc_peer;
1527         ksock_tx_t       *tx;
1528         ksock_tx_t       *tmp;
1529         struct list_head  zlist = LIST_HEAD_INIT(zlist);
1530
1531         /* NB safe to finalize TXs because closing of socket will
1532          * abort all buffered data */
1533         LASSERT(conn->ksnc_sock == NULL);
1534
1535         spin_lock(&peer->ksnp_lock);
1536
1537         list_for_each_entry_safe(tx, tmp, &peer->ksnp_zc_req_list, tx_zc_list) {
1538                 if (tx->tx_conn != conn)
1539                         continue;
1540
1541                 LASSERT(tx->tx_msg.ksm_zc_cookies[0] != 0);
1542
1543                 tx->tx_msg.ksm_zc_cookies[0] = 0;
1544                 tx->tx_zc_aborted = 1;  /* mark it as not-acked */
1545                 list_del(&tx->tx_zc_list);
1546                 list_add(&tx->tx_zc_list, &zlist);
1547         }
1548
1549         spin_unlock(&peer->ksnp_lock);
1550
1551         while (!list_empty(&zlist)) {
1552                 tx = list_entry(zlist.next, ksock_tx_t, tx_zc_list);
1553
1554                 list_del(&tx->tx_zc_list);
1555                 ksocknal_tx_decref(tx);
1556         }
1557 }
1558
void
ksocknal_terminate_conn(ksock_conn_t *conn)
{
        /* This gets called by the reaper (guaranteed thread context) to
         * disengage the socket from its callbacks and close it.
         * ksnc_refcount will eventually hit zero, and then the reaper will
         * destroy it. */
        ksock_peer_t     *peer = conn->ksnc_peer;
        ksock_sched_t    *sched = conn->ksnc_scheduler;
        int               failed = 0;

        LASSERT(conn->ksnc_closing);

        /* wake up the scheduler to "send" all remaining packets to /dev/null */
        spin_lock_bh(&sched->kss_lock);

        /* a closing conn is always ready to tx */
        conn->ksnc_tx_ready = 1;

        if (!conn->ksnc_tx_scheduled &&
            !list_empty(&conn->ksnc_tx_queue)) {
                list_add_tail(&conn->ksnc_tx_list,
                               &sched->kss_tx_conns);
                conn->ksnc_tx_scheduled = 1;
                /* extra ref for scheduler */
                ksocknal_conn_addref(conn);

                wake_up (&sched->kss_waitq);
        }

        spin_unlock_bh(&sched->kss_lock);

        /* serialise with callbacks */
        write_lock_bh(&ksocknal_data.ksnd_global_lock);

        ksocknal_lib_reset_callback(conn->ksnc_sock, conn);

        /* OK, so this conn may not be completely disengaged from its
         * scheduler yet, but it _has_ committed to terminate... */
        conn->ksnc_scheduler->kss_nconns--;

        if (peer->ksnp_error != 0) {
                /* peer's last conn closed in error */
                LASSERT(list_empty(&peer->ksnp_conns));
                failed = 1;
                peer->ksnp_error = 0;     /* avoid multiple notifications */
        }

        write_unlock_bh(&ksocknal_data.ksnd_global_lock);

        /* notify LNET outside the lock */
        if (failed)
                ksocknal_peer_failed(peer);

        /* The socket is closed on the final put; either here, or in
         * ksocknal_{send,recv}msg().  Since we set up the linger2 option
         * when the connection was established, this will close the socket
         * immediately, aborting anything buffered in it. Any hung
         * zero-copy transmits will therefore complete in finite time. */
        ksocknal_connsock_decref(conn);
}
1619
void
ksocknal_queue_zombie_conn (ksock_conn_t *conn)
{
        /* Queue the conn for the reaper to destroy; only called once the
         * conn refcount has dropped to zero (asserted below). */

        LASSERT(atomic_read(&conn->ksnc_conn_refcount) == 0);
        spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);

        list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
        wake_up(&ksocknal_data.ksnd_reaper_waitq);

        spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
}
1633
void
ksocknal_destroy_conn (ksock_conn_t *conn)
{
        cfs_time_t      last_rcv;

        /* Final coup-de-grace of the reaper */
        CDEBUG (D_NET, "connection %p\n", conn);

        /* By now the conn must be completely disengaged: no conn/sock
         * refs, socket released, unbound from its route and off both
         * scheduler queues, with nothing left to transmit. */
        LASSERT (atomic_read (&conn->ksnc_conn_refcount) == 0);
        LASSERT (atomic_read (&conn->ksnc_sock_refcount) == 0);
        LASSERT (conn->ksnc_sock == NULL);
        LASSERT (conn->ksnc_route == NULL);
        LASSERT (!conn->ksnc_tx_scheduled);
        LASSERT (!conn->ksnc_rx_scheduled);
        LASSERT(list_empty(&conn->ksnc_tx_queue));

        /* complete current receive if any */
        switch (conn->ksnc_rx_state) {
        case SOCKNAL_RX_LNET_PAYLOAD:
                /* A message payload was cut short: log how stale the
                 * connection was and fail the LNet message with -EIO so
                 * upper layers see the incomplete receive. */
                last_rcv = conn->ksnc_rx_deadline -
                           cfs_time_seconds(*ksocknal_tunables.ksnd_timeout);
                CERROR("Completing partial receive from %s[%d]"
                       ", ip %d.%d.%d.%d:%d, with error, wanted: %d, left: %d, "
                       "last alive is %ld secs ago\n",
                       libcfs_id2str(conn->ksnc_peer->ksnp_id), conn->ksnc_type,
                       HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port,
                       conn->ksnc_rx_nob_wanted, conn->ksnc_rx_nob_left,
                       cfs_duration_sec(cfs_time_sub(cfs_time_current(),
                                        last_rcv)));
                lnet_finalize (conn->ksnc_peer->ksnp_ni,
                               conn->ksnc_cookie, -EIO);
                break;
        case SOCKNAL_RX_LNET_HEADER:
                /* only complain if some header bytes had actually arrived */
                if (conn->ksnc_rx_started)
                        CERROR("Incomplete receive of lnet header from %s"
                               ", ip %d.%d.%d.%d:%d, with error, protocol: %d.x.\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port,
                               conn->ksnc_proto->pro_version);
                break;
        case SOCKNAL_RX_KSM_HEADER:
                if (conn->ksnc_rx_started)
                        CERROR("Incomplete receive of ksock message from %s"
                               ", ip %d.%d.%d.%d:%d, with error, protocol: %d.x.\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port,
                               conn->ksnc_proto->pro_version);
                break;
        case SOCKNAL_RX_SLOP:
                /* discarding bytes we didn't want anyway */
                if (conn->ksnc_rx_started)
                        CERROR("Incomplete receive of slops from %s"
                               ", ip %d.%d.%d.%d:%d, with error\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
               break;
        default:
                LBUG ();
                break;
        }

        /* drop the ref the conn held on its peer, then free the conn */
        ksocknal_peer_decref(conn->ksnc_peer);

        LIBCFS_FREE (conn, sizeof (*conn));
}
1698
1699 int
1700 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
1701 {
1702         ksock_conn_t       *conn;
1703         struct list_head         *ctmp;
1704         struct list_head         *cnxt;
1705         int                 count = 0;
1706
1707         list_for_each_safe(ctmp, cnxt, &peer->ksnp_conns) {
1708                 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
1709
1710                 if (ipaddr == 0 ||
1711                     conn->ksnc_ipaddr == ipaddr) {
1712                         count++;
1713                         ksocknal_close_conn_locked (conn, why);
1714                 }
1715         }
1716
1717         return (count);
1718 }
1719
1720 int
1721 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why)
1722 {
1723         ksock_peer_t     *peer = conn->ksnc_peer;
1724         __u32             ipaddr = conn->ksnc_ipaddr;
1725         int               count;
1726
1727         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1728
1729         count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1730
1731         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1732
1733         return (count);
1734 }
1735
1736 int
1737 ksocknal_close_matching_conns (lnet_process_id_t id, __u32 ipaddr)
1738 {
1739         ksock_peer_t       *peer;
1740         struct list_head         *ptmp;
1741         struct list_head         *pnxt;
1742         int                 lo;
1743         int                 hi;
1744         int                 i;
1745         int                 count = 0;
1746
1747         write_lock_bh(&ksocknal_data.ksnd_global_lock);
1748
1749         if (id.nid != LNET_NID_ANY)
1750                 lo = hi = (int)(ksocknal_nid2peerlist(id.nid) - ksocknal_data.ksnd_peers);
1751         else {
1752                 lo = 0;
1753                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1754         }
1755
1756         for (i = lo; i <= hi; i++) {
1757                 list_for_each_safe(ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1758
1759                         peer = list_entry(ptmp, ksock_peer_t, ksnp_list);
1760
1761                         if (!((id.nid == LNET_NID_ANY || id.nid == peer->ksnp_id.nid) &&
1762                               (id.pid == LNET_PID_ANY || id.pid == peer->ksnp_id.pid)))
1763                                 continue;
1764
1765                         count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1766                 }
1767         }
1768
1769         write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1770
1771         /* wildcards always succeed */
1772         if (id.nid == LNET_NID_ANY || id.pid == LNET_PID_ANY || ipaddr == 0)
1773                 return (0);
1774
1775         return (count == 0 ? -ENOENT : 0);
1776 }
1777
1778 void
1779 ksocknal_notify (lnet_ni_t *ni, lnet_nid_t gw_nid, int alive)
1780 {
1781         /* The router is telling me she's been notified of a change in
1782          * gateway state.... */
1783         lnet_process_id_t  id = {0};
1784
1785         id.nid = gw_nid;
1786         id.pid = LNET_PID_ANY;
1787
1788         CDEBUG (D_NET, "gw %s %s\n", libcfs_nid2str(gw_nid),
1789                 alive ? "up" : "down");
1790
1791         if (!alive) {
1792                 /* If the gateway crashed, close all open connections... */
1793                 ksocknal_close_matching_conns (id, 0);
1794                 return;
1795         }
1796
1797         /* ...otherwise do nothing.  We can only establish new connections
1798          * if we have autroutes, and these connect on demand. */
1799 }
1800
void
ksocknal_query (lnet_ni_t *ni, lnet_nid_t nid, cfs_time_t *when)
{
        /* LND 'query' method: report in *when the time @nid was last
         * known alive, and kick off (re)connection attempts when a
         * connectable route to the peer exists. */
        int                connect = 1;
        cfs_time_t         last_alive = 0;
        cfs_time_t         now = cfs_time_current();
        ksock_peer_t      *peer = NULL;
        rwlock_t                *glock = &ksocknal_data.ksnd_global_lock;
        lnet_process_id_t  id = {
                .nid = nid,
                .pid = LNET_PID_LUSTRE,
        };

        read_lock(glock);

        peer = ksocknal_find_peer_locked(ni, id);
        if (peer != NULL) {
                struct list_head       *tmp;
                ksock_conn_t     *conn;
                int               bufnob;

                /* Liveness heuristic: if a conn's socket send buffer has
                 * drained since we last looked, the peer ACKed data, so
                 * it was alive "now"; refresh the tx deadline too. */
                list_for_each(tmp, &peer->ksnp_conns) {
                        conn = list_entry(tmp, ksock_conn_t, ksnc_list);
                        bufnob = libcfs_sock_wmem_queued(conn->ksnc_sock);

                        if (bufnob < conn->ksnc_tx_bufnob) {
                                /* something got ACKed */
                                conn->ksnc_tx_deadline =
                                        cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
                                peer->ksnp_last_alive = now;
                                conn->ksnc_tx_bufnob = bufnob;
                        }
                }

                last_alive = peer->ksnp_last_alive;
                /* no connectable route: nothing to launch below */
                if (ksocknal_find_connectable_route_locked(peer) == NULL)
                        connect = 0;
        }

        read_unlock(glock);

        /* only overwrite *when if we actually know a last-alive time */
        if (last_alive != 0)
                *when = last_alive;

        CDEBUG(D_NET, "Peer %s %p, alive %ld secs ago, connect %d\n",
               libcfs_nid2str(nid), peer,
               last_alive ? cfs_duration_sec(now - last_alive) : -1,
               connect);

        if (!connect)
                return;

        /* ensure the peer exists (creates routes as a side effect) ... */
        ksocknal_add_peer(ni, id, LNET_NIDADDR(nid), lnet_acceptor_port());

        write_lock_bh(glock);

        /* ... then re-look it up under the write lock (it may have been
         * created/removed meanwhile) and start connecting */
        peer = ksocknal_find_peer_locked(ni, id);
        if (peer != NULL)
                ksocknal_launch_all_connections_locked(peer);

        write_unlock_bh(glock);
        return;
}
1864
1865 void
1866 ksocknal_push_peer (ksock_peer_t *peer)
1867 {
1868         int               index;
1869         int               i;
1870         struct list_head       *tmp;
1871         ksock_conn_t     *conn;
1872
1873         for (index = 0; ; index++) {
1874                 read_lock(&ksocknal_data.ksnd_global_lock);
1875
1876                 i = 0;
1877                 conn = NULL;
1878
1879                 list_for_each(tmp, &peer->ksnp_conns) {
1880                         if (i++ == index) {
1881                                 conn = list_entry(tmp, ksock_conn_t,
1882                                                        ksnc_list);
1883                                 ksocknal_conn_addref(conn);
1884                                 break;
1885                         }
1886                 }
1887
1888                 read_unlock(&ksocknal_data.ksnd_global_lock);
1889
1890                 if (conn == NULL)
1891                         break;
1892
1893                 ksocknal_lib_push_conn (conn);
1894                 ksocknal_conn_decref(conn);
1895         }
1896 }
1897
1898 int
1899 ksocknal_push (lnet_ni_t *ni, lnet_process_id_t id)
1900 {
1901         ksock_peer_t      *peer;
1902         struct list_head        *tmp;
1903         int                index;
1904         int                i;
1905         int                j;
1906         int                rc = -ENOENT;
1907
1908         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1909                 for (j = 0; ; j++) {
1910                         read_lock(&ksocknal_data.ksnd_global_lock);
1911
1912                         index = 0;
1913                         peer = NULL;
1914
1915                         list_for_each(tmp, &ksocknal_data.ksnd_peers[i]) {
1916                                 peer = list_entry(tmp, ksock_peer_t,
1917                                                       ksnp_list);
1918
1919                                 if (!((id.nid == LNET_NID_ANY ||
1920                                        id.nid == peer->ksnp_id.nid) &&
1921                                       (id.pid == LNET_PID_ANY ||
1922                                        id.pid == peer->ksnp_id.pid))) {
1923                                         peer = NULL;
1924                                         continue;
1925                                 }
1926
1927                                 if (index++ == j) {
1928                                         ksocknal_peer_addref(peer);
1929                                         break;
1930                                 }
1931                         }
1932
1933                         read_unlock(&ksocknal_data.ksnd_global_lock);
1934
1935                         if (peer != NULL) {
1936                                 rc = 0;
1937                                 ksocknal_push_peer (peer);
1938                                 ksocknal_peer_decref(peer);
1939                         }
1940                 }
1941
1942         }
1943
1944         return (rc);
1945 }
1946
int
ksocknal_add_interface(lnet_ni_t *ni, __u32 ipaddress, __u32 netmask)
{
        ksock_net_t       *net = ni->ni_data;
        ksock_interface_t *iface;
        int                rc;
        int                i;
        int                j;
        struct list_head        *ptmp;
        ksock_peer_t      *peer;
        struct list_head        *rtmp;
        ksock_route_t     *route;

        /* Register local interface @ipaddress/@netmask with @ni's net.
         * Returns 0 on success (a duplicate is silently accepted),
         * -EINVAL for a zero address/mask, or -ENOSPC when the
         * per-net interface table is full. */
        if (ipaddress == 0 ||
            netmask == 0)
                return (-EINVAL);

        write_lock_bh(&ksocknal_data.ksnd_global_lock);

        iface = ksocknal_ip2iface(ni, ipaddress);
        if (iface != NULL) {
                /* silently ignore dups */
                rc = 0;
        } else if (net->ksnn_ninterfaces == LNET_MAX_INTERFACES) {
                rc = -ENOSPC;
        } else {
                /* claim the next free slot and initialise it */
                iface = &net->ksnn_interfaces[net->ksnn_ninterfaces++];

                iface->ksni_ipaddr = ipaddress;
                iface->ksni_netmask = netmask;
                iface->ksni_nroutes = 0;
                iface->ksni_npeers = 0;

                /* seed the usage counters from peers and routes that
                 * already reference this local IP */
                for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
                        list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
                                peer = list_entry(ptmp, ksock_peer_t,
                                                      ksnp_list);

                                for (j = 0; j < peer->ksnp_n_passive_ips; j++)
                                        if (peer->ksnp_passive_ips[j] == ipaddress)
                                                iface->ksni_npeers++;

                                list_for_each(rtmp, &peer->ksnp_routes) {
                                        route = list_entry(rtmp,
                                                               ksock_route_t,
                                                               ksnr_list);

                                        if (route->ksnr_myipaddr == ipaddress)
                                                iface->ksni_nroutes++;
                                }
                        }
                }

                rc = 0;
                /* NB only new connections will pay attention to the new interface! */
        }

        write_unlock_bh(&ksocknal_data.ksnd_global_lock);

        return (rc);
}
2008
2009 void
2010 ksocknal_peer_del_interface_locked(ksock_peer_t *peer, __u32 ipaddr)
2011 {
2012         struct list_head         *tmp;
2013         struct list_head         *nxt;
2014         ksock_route_t      *route;
2015         ksock_conn_t       *conn;
2016         int                 i;
2017         int                 j;
2018
2019         for (i = 0; i < peer->ksnp_n_passive_ips; i++)
2020                 if (peer->ksnp_passive_ips[i] == ipaddr) {
2021                         for (j = i+1; j < peer->ksnp_n_passive_ips; j++)
2022                                 peer->ksnp_passive_ips[j-1] =
2023                                         peer->ksnp_passive_ips[j];
2024                         peer->ksnp_n_passive_ips--;
2025                         break;
2026                 }
2027
2028         list_for_each_safe(tmp, nxt, &peer->ksnp_routes) {
2029                 route = list_entry(tmp, ksock_route_t, ksnr_list);
2030
2031                 if (route->ksnr_myipaddr != ipaddr)
2032                         continue;
2033
2034                 if (route->ksnr_share_count != 0) {
2035                         /* Manually created; keep, but unbind */
2036                         route->ksnr_myipaddr = 0;
2037                 } else {
2038                         ksocknal_del_route_locked(route);
2039                 }
2040         }
2041
2042         list_for_each_safe(tmp, nxt, &peer->ksnp_conns) {
2043                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
2044
2045                 if (conn->ksnc_myipaddr == ipaddr)
2046                         ksocknal_close_conn_locked (conn, 0);
2047         }
2048 }
2049
int
ksocknal_del_interface(lnet_ni_t *ni, __u32 ipaddress)
{
        ksock_net_t       *net = ni->ni_data;
        int                rc = -ENOENT;
        struct list_head        *tmp;
        struct list_head        *nxt;
        ksock_peer_t      *peer;
        __u32              this_ip;
        int                i;
        int                j;

        /* Unregister local interface @ipaddress from @ni's net (all
         * interfaces when @ipaddress is zero), detaching it from every
         * peer of this NI.  Returns 0 if anything was removed, -ENOENT
         * otherwise. */
        write_lock_bh(&ksocknal_data.ksnd_global_lock);

        for (i = 0; i < net->ksnn_ninterfaces; i++) {
                this_ip = net->ksnn_interfaces[i].ksni_ipaddr;

                if (!(ipaddress == 0 ||
                      ipaddress == this_ip))
                        continue;

                rc = 0;

                /* compact the interface table over the removed slot;
                 * NB the loop re-tests index i against the reduced
                 * count, so the entry shifted into slot i is examined
                 * on the next iteration of the outer loop */
                for (j = i+1; j < net->ksnn_ninterfaces; j++)
                        net->ksnn_interfaces[j-1] =
                                net->ksnn_interfaces[j];

                net->ksnn_ninterfaces--;

                /* purge this IP from every peer that belongs to @ni */
                for (j = 0; j < ksocknal_data.ksnd_peer_hash_size; j++) {
                        list_for_each_safe(tmp, nxt,
                                               &ksocknal_data.ksnd_peers[j]) {
                                peer = list_entry(tmp, ksock_peer_t,
                                                      ksnp_list);

                                if (peer->ksnp_ni != ni)
                                        continue;

                                ksocknal_peer_del_interface_locked(peer, this_ip);
                        }
                }
        }

        write_unlock_bh(&ksocknal_data.ksnd_global_lock);

        return (rc);
}
2097
int
ksocknal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
{
        /* LND ioctl dispatcher: decode @cmd with its libcfs_ioctl_data
         * payload and route to the matching socknal operation.  Returns
         * 0 on success or a negative errno. */
        lnet_process_id_t id = {0};
        struct libcfs_ioctl_data *data = arg;
        int rc;

        switch(cmd) {
        case IOC_LIBCFS_GET_INTERFACE: {
                /* report the ioc_count'th local interface of this net */
                ksock_net_t       *net = ni->ni_data;
                ksock_interface_t *iface;

                read_lock(&ksocknal_data.ksnd_global_lock);

                if (data->ioc_count >= (__u32)net->ksnn_ninterfaces) {
                        rc = -ENOENT;
                } else {
                        rc = 0;
                        iface = &net->ksnn_interfaces[data->ioc_count];

                        data->ioc_u32[0] = iface->ksni_ipaddr;
                        data->ioc_u32[1] = iface->ksni_netmask;
                        data->ioc_u32[2] = iface->ksni_npeers;
                        data->ioc_u32[3] = iface->ksni_nroutes;
                }

                read_unlock(&ksocknal_data.ksnd_global_lock);
                return rc;
        }

        case IOC_LIBCFS_ADD_INTERFACE:
                return ksocknal_add_interface(ni,
                                              data->ioc_u32[0], /* IP address */
                                              data->ioc_u32[1]); /* net mask */

        case IOC_LIBCFS_DEL_INTERFACE:
                return ksocknal_del_interface(ni,
                                              data->ioc_u32[0]); /* IP address */

        case IOC_LIBCFS_GET_PEER: {
                /* report the ioc_count'th known peer */
                __u32            myip = 0;
                __u32            ip = 0;
                int              port = 0;
                int              conn_count = 0;
                int              share_count = 0;

                rc = ksocknal_get_peer_info(ni, data->ioc_count,
                                            &id, &myip, &ip, &port,
                                            &conn_count,  &share_count);
                if (rc != 0)
                        return rc;

                data->ioc_nid    = id.nid;
                data->ioc_count  = share_count;
                data->ioc_u32[0] = ip;
                data->ioc_u32[1] = port;
                data->ioc_u32[2] = myip;
                data->ioc_u32[3] = conn_count;
                data->ioc_u32[4] = id.pid;
                return 0;
        }

        case IOC_LIBCFS_ADD_PEER:
                id.nid = data->ioc_nid;
                id.pid = LNET_PID_LUSTRE;
                return ksocknal_add_peer (ni, id,
                                          data->ioc_u32[0], /* IP */
                                          data->ioc_u32[1]); /* port */

        case IOC_LIBCFS_DEL_PEER:
                /* PID wildcard: remove the peer regardless of its PID */
                id.nid = data->ioc_nid;
                id.pid = LNET_PID_ANY;
                return ksocknal_del_peer (ni, id,
                                          data->ioc_u32[0]); /* IP */

        case IOC_LIBCFS_GET_CONN: {
                /* report the ioc_count'th connection; holds a conn ref
                 * across the tunable query, dropped before returning */
                int           txmem;
                int           rxmem;
                int           nagle;
                ksock_conn_t *conn = ksocknal_get_conn_by_idx (ni, data->ioc_count);

                if (conn == NULL)
                        return -ENOENT;

                ksocknal_lib_get_conn_tunables(conn, &txmem, &rxmem, &nagle);

                data->ioc_count  = txmem;
                data->ioc_nid    = conn->ksnc_peer->ksnp_id.nid;
                data->ioc_flags  = nagle;
                data->ioc_u32[0] = conn->ksnc_ipaddr;
                data->ioc_u32[1] = conn->ksnc_port;
                data->ioc_u32[2] = conn->ksnc_myipaddr;
                data->ioc_u32[3] = conn->ksnc_type;
                data->ioc_u32[4] = conn->ksnc_scheduler->kss_info->ksi_cpt;
                data->ioc_u32[5] = rxmem;
                data->ioc_u32[6] = conn->ksnc_peer->ksnp_id.pid;
                ksocknal_conn_decref(conn);
                return 0;
        }

        case IOC_LIBCFS_CLOSE_CONNECTION:
                id.nid = data->ioc_nid;
                id.pid = LNET_PID_ANY;
                return ksocknal_close_matching_conns (id,
                                                      data->ioc_u32[0]);

        case IOC_LIBCFS_REGISTER_MYNID:
                /* Ignore if this is a noop */
                if (data->ioc_nid == ni->ni_nid)
                        return 0;

                CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
                       libcfs_nid2str(data->ioc_nid),
                       libcfs_nid2str(ni->ni_nid));
                return -EINVAL;

        case IOC_LIBCFS_PUSH_CONNECTION:
                id.nid = data->ioc_nid;
                id.pid = LNET_PID_ANY;
                return ksocknal_push(ni, id);

        default:
                return -EINVAL;
        }
        /* not reached */
}
2224
void
ksocknal_free_buffers (void)
{
        /* Free global allocations made by ksocknal_base_startup():
         * per-CPT scheduler arrays, the peer hash table and the idle
         * noop-tx freelist.  No txs may still be in flight. */
        LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_txs) == 0);

        if (ksocknal_data.ksnd_sched_info != NULL) {
                struct ksock_sched_info *info;
                int                     i;

                /* free each CPT's scheduler array, then the per-CPT table */
                cfs_percpt_for_each(info, i, ksocknal_data.ksnd_sched_info) {
                        if (info->ksi_scheds != NULL) {
                                LIBCFS_FREE(info->ksi_scheds,
                                            info->ksi_nthreads_max *
                                            sizeof(info->ksi_scheds[0]));
                        }
                }
                cfs_percpt_free(ksocknal_data.ksnd_sched_info);
        }

        LIBCFS_FREE (ksocknal_data.ksnd_peers,
                     sizeof(struct list_head) *
                     ksocknal_data.ksnd_peer_hash_size);

        spin_lock(&ksocknal_data.ksnd_tx_lock);

        if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
                struct list_head        zlist;
                ksock_tx_t      *tx;

                /* steal the whole freelist onto local head zlist: insert
                 * zlist into the chain, then unlink the original head, so
                 * the txs can be freed with the lock dropped */
                list_add(&zlist, &ksocknal_data.ksnd_idle_noop_txs);
                list_del_init(&ksocknal_data.ksnd_idle_noop_txs);
                spin_unlock(&ksocknal_data.ksnd_tx_lock);

                while (!list_empty(&zlist)) {
                        tx = list_entry(zlist.next, ksock_tx_t, tx_list);
                        list_del(&tx->tx_list);
                        LIBCFS_FREE(tx, tx->tx_desc_size);
                }
        } else {
                spin_unlock(&ksocknal_data.ksnd_tx_lock);
        }
}
2267
void
ksocknal_base_shutdown(void)
{
        struct ksock_sched_info *info;
        ksock_sched_t           *sched;
        int                     i;
        int                     j;

        /* Tear down the global socknal state built by
         * ksocknal_base_startup().  Only legal once no networks remain
         * (asserted below), so all peers/conns/routes are already gone:
         * verify that, stop every thread, then free everything. */
        CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
               atomic_read (&libcfs_kmemory));
        LASSERT (ksocknal_data.ksnd_nnets == 0);

        switch (ksocknal_data.ksnd_init) {
        default:
                LASSERT (0);
                /* fallthrough - not reached after LASSERT */

        case SOCKNAL_INIT_ALL:
        case SOCKNAL_INIT_DATA:
                /* sanity: every global list must already be empty */
                LASSERT (ksocknal_data.ksnd_peers != NULL);
                for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
                        LASSERT(list_empty(&ksocknal_data.ksnd_peers[i]));
                }

                LASSERT(list_empty(&ksocknal_data.ksnd_nets));
                LASSERT(list_empty(&ksocknal_data.ksnd_enomem_conns));
                LASSERT(list_empty(&ksocknal_data.ksnd_zombie_conns));
                LASSERT(list_empty(&ksocknal_data.ksnd_connd_connreqs));
                LASSERT(list_empty(&ksocknal_data.ksnd_connd_routes));

                /* per-scheduler queues must be drained too */
                if (ksocknal_data.ksnd_sched_info != NULL) {
                        cfs_percpt_for_each(info, i,
                                            ksocknal_data.ksnd_sched_info) {
                                if (info->ksi_scheds == NULL)
                                        continue;

                                for (j = 0; j < info->ksi_nthreads_max; j++) {

                                        sched = &info->ksi_scheds[j];
                                        LASSERT(list_empty(&sched->\
                                                               kss_tx_conns));
                                        LASSERT(list_empty(&sched->\
                                                               kss_rx_conns));
                                        LASSERT(list_empty(&sched-> \
                                                  kss_zombie_noop_txs));
                                        LASSERT(sched->kss_nconns == 0);
                                }
                        }
                }

                /* flag threads to terminate; wake and wait for them to die */
                ksocknal_data.ksnd_shuttingdown = 1;
                wake_up_all(&ksocknal_data.ksnd_connd_waitq);
                wake_up_all(&ksocknal_data.ksnd_reaper_waitq);

                /* wake every scheduler thread so it notices the flag */
                if (ksocknal_data.ksnd_sched_info != NULL) {
                        cfs_percpt_for_each(info, i,
                                            ksocknal_data.ksnd_sched_info) {
                                if (info->ksi_scheds == NULL)
                                        continue;

                                for (j = 0; j < info->ksi_nthreads_max; j++) {
                                        sched = &info->ksi_scheds[j];
                                        wake_up_all(&sched->kss_waitq);
                                }
                        }
                }

                /* poll ksnd_nthreads (protected by the global lock) once
                 * a second until every thread has exited, logging at most
                 * every power-of-2 iterations */
                i = 4;
                read_lock(&ksocknal_data.ksnd_global_lock);
                while (ksocknal_data.ksnd_nthreads != 0) {
                        i++;
                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                               "waiting for %d threads to terminate\n",
                                ksocknal_data.ksnd_nthreads);
                        read_unlock(&ksocknal_data.ksnd_global_lock);
                        cfs_pause(cfs_time_seconds(1));
                        read_lock(&ksocknal_data.ksnd_global_lock);
                }
                read_unlock(&ksocknal_data.ksnd_global_lock);

                ksocknal_free_buffers();

                ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
                break;
        }

        CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
               atomic_read (&libcfs_kmemory));

        /* release the module ref taken by ksocknal_base_startup() */
        module_put(THIS_MODULE);
}
2359
2360 __u64 ksocknal_new_incarnation (void)
2361 {
2362         struct timeval tv;
2363
2364         /* The incarnation number is the time this module loaded and it
2365          * identifies this particular instance of the socknal.  Hopefully
2366          * we won't be able to reboot more frequently than 1MHz for the
2367          * forseeable future :) */
2368
2369         do_gettimeofday(&tv);
2370
2371         return (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
2372 }
2373
/*
 * One-time global initialisation for the socket LND.
 *
 * Zeroes and populates ksocknal_data: peer hash table, global/reaper/connd/tx
 * locks and lists, per-CPT scheduler info, and spawns the connd and reaper
 * threads.  On success ksnd_init is advanced to SOCKNAL_INIT_ALL.
 *
 * Returns 0 on success, -ENOMEM if the peer hash cannot be allocated, or
 * -ENETDOWN on any later failure (after tearing everything back down via
 * ksocknal_base_shutdown()).
 */
int
ksocknal_base_startup(void)
{
        struct ksock_sched_info *info;
        int                     rc;
        int                     i;

        LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
        LASSERT (ksocknal_data.ksnd_nnets == 0);

        memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */

        ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
        LIBCFS_ALLOC(ksocknal_data.ksnd_peers,
                     sizeof(struct list_head) *
                     ksocknal_data.ksnd_peer_hash_size);
        if (ksocknal_data.ksnd_peers == NULL)
                return -ENOMEM;

        for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
                INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);

        rwlock_init(&ksocknal_data.ksnd_global_lock);
        INIT_LIST_HEAD(&ksocknal_data.ksnd_nets);

        spin_lock_init(&ksocknal_data.ksnd_reaper_lock);
        INIT_LIST_HEAD(&ksocknal_data.ksnd_enomem_conns);
        INIT_LIST_HEAD(&ksocknal_data.ksnd_zombie_conns);
        INIT_LIST_HEAD(&ksocknal_data.ksnd_deathrow_conns);
        init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);

        spin_lock_init(&ksocknal_data.ksnd_connd_lock);
        INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_connreqs);
        INIT_LIST_HEAD(&ksocknal_data.ksnd_connd_routes);
        init_waitqueue_head(&ksocknal_data.ksnd_connd_waitq);

        spin_lock_init(&ksocknal_data.ksnd_tx_lock);
        INIT_LIST_HEAD(&ksocknal_data.ksnd_idle_noop_txs);

        /* NB memset above zeros whole of ksocknal_data */

        /* flag lists/ptrs/locks initialised */
        ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
        /* pin the module while threads exist; dropped on shutdown */
        try_module_get(THIS_MODULE);

        ksocknal_data.ksnd_sched_info = cfs_percpt_alloc(lnet_cpt_table(),
                                                         sizeof(*info));
        if (ksocknal_data.ksnd_sched_info == NULL)
                goto failed;

        /* size each CPT's scheduler array; threads themselves are started
         * lazily by ksocknal_start_schedulers() */
        cfs_percpt_for_each(info, i, ksocknal_data.ksnd_sched_info) {
                ksock_sched_t   *sched;
                int             nthrs;

                nthrs = cfs_cpt_weight(lnet_cpt_table(), i);
                if (*ksocknal_tunables.ksnd_nscheds > 0) {
                        nthrs = min(nthrs, *ksocknal_tunables.ksnd_nscheds);
                } else {
                        /* max to half of CPUs, assume another half should be
                         * reserved for upper layer modules */
                        nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
                }

                info->ksi_nthreads_max = nthrs;
                info->ksi_cpt = i;

                LIBCFS_CPT_ALLOC(info->ksi_scheds, lnet_cpt_table(), i,
                                 info->ksi_nthreads_max * sizeof(*sched));
                if (info->ksi_scheds == NULL)
                        goto failed;

                for (; nthrs > 0; nthrs--) {
                        sched = &info->ksi_scheds[nthrs - 1];

                        sched->kss_info = info;
                        spin_lock_init(&sched->kss_lock);
                        INIT_LIST_HEAD(&sched->kss_rx_conns);
                        INIT_LIST_HEAD(&sched->kss_tx_conns);
                        INIT_LIST_HEAD(&sched->kss_zombie_noop_txs);
                        init_waitqueue_head(&sched->kss_waitq);
                }
        }

        ksocknal_data.ksnd_connd_starting         = 0;
        ksocknal_data.ksnd_connd_failed_stamp     = 0;
        ksocknal_data.ksnd_connd_starting_stamp   = cfs_time_current_sec();
        /* must have at least 2 connds to remain responsive to accepts while
         * connecting */
        if (*ksocknal_tunables.ksnd_nconnds < SOCKNAL_CONND_RESV + 1)
                *ksocknal_tunables.ksnd_nconnds = SOCKNAL_CONND_RESV + 1;

        if (*ksocknal_tunables.ksnd_nconnds_max <
            *ksocknal_tunables.ksnd_nconnds) {
                /* NOTE(review): this copies the tunable POINTER, so
                 * ksnd_nconnds_max thereafter aliases ksnd_nconnds instead of
                 * having its own value raised -- presumably intentional (max
                 * then tracks nconnds), but worth confirming against the
                 * tunables definition */
                ksocknal_tunables.ksnd_nconnds_max =
                        ksocknal_tunables.ksnd_nconnds;
        }

        for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
                char name[16];
                /* account the starting connd under the connd lock so the
                 * connd pool bookkeeping stays consistent */
                spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
                ksocknal_data.ksnd_connd_starting++;
                spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);


                snprintf(name, sizeof(name), "socknal_cd%02d", i);
                rc = ksocknal_thread_start(ksocknal_connd,
                                           (void *)((ulong_ptr_t)i), name);
                if (rc != 0) {
                        /* undo the optimistic increment above */
                        spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
                        ksocknal_data.ksnd_connd_starting--;
                        spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
                        CERROR("Can't spawn socknal connd: %d\n", rc);
                        goto failed;
                }
        }

        rc = ksocknal_thread_start(ksocknal_reaper, NULL, "socknal_reaper");
        if (rc != 0) {
                CERROR ("Can't spawn socknal reaper: %d\n", rc);
                goto failed;
        }

        /* flag everything initialised */
        ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;

        return 0;

 failed:
        ksocknal_base_shutdown();
        return -ENETDOWN;
}
2505
2506 void
2507 ksocknal_debug_peerhash (lnet_ni_t *ni)
2508 {
2509         ksock_peer_t    *peer = NULL;
2510         struct list_head        *tmp;
2511         int             i;
2512
2513         read_lock(&ksocknal_data.ksnd_global_lock);
2514
2515         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
2516                 list_for_each(tmp, &ksocknal_data.ksnd_peers[i]) {
2517                         peer = list_entry(tmp, ksock_peer_t, ksnp_list);
2518
2519                         if (peer->ksnp_ni == ni) break;
2520
2521                         peer = NULL;
2522                 }
2523         }
2524
2525         if (peer != NULL) {
2526                 ksock_route_t *route;
2527                 ksock_conn_t  *conn;
2528
2529                 CWARN ("Active peer on shutdown: %s, ref %d, scnt %d, "
2530                        "closing %d, accepting %d, err %d, zcookie "LPU64", "
2531                        "txq %d, zc_req %d\n", libcfs_id2str(peer->ksnp_id),
2532                        atomic_read(&peer->ksnp_refcount),
2533                        peer->ksnp_sharecount, peer->ksnp_closing,
2534                        peer->ksnp_accepting, peer->ksnp_error,
2535                        peer->ksnp_zc_next_cookie,
2536                        !list_empty(&peer->ksnp_tx_queue),
2537                        !list_empty(&peer->ksnp_zc_req_list));
2538
2539                 list_for_each(tmp, &peer->ksnp_routes) {
2540                         route = list_entry(tmp, ksock_route_t, ksnr_list);
2541                         CWARN ("Route: ref %d, schd %d, conn %d, cnted %d, "
2542                                "del %d\n", atomic_read(&route->ksnr_refcount),
2543                                route->ksnr_scheduled, route->ksnr_connecting,
2544                                route->ksnr_connected, route->ksnr_deleted);
2545                 }
2546
2547                 list_for_each(tmp, &peer->ksnp_conns) {
2548                         conn = list_entry(tmp, ksock_conn_t, ksnc_list);
2549                         CWARN ("Conn: ref %d, sref %d, t %d, c %d\n",
2550                                atomic_read(&conn->ksnc_conn_refcount),
2551                                atomic_read(&conn->ksnc_sock_refcount),
2552                                conn->ksnc_type, conn->ksnc_closing);
2553                 }
2554         }
2555
2556         read_unlock(&ksocknal_data.ksnd_global_lock);
2557         return;
2558 }
2559
/*
 * Tear down one socklnd network interface (the inverse of
 * ksocknal_startup()): delete all its peers, wait for peer state to drain,
 * unlink and free the per-net state, and shut down the global state when the
 * last net goes away.
 */
void
ksocknal_shutdown (lnet_ni_t *ni)
{
        ksock_net_t      *net = ni->ni_data;
        int               i;
        lnet_process_id_t anyid = {0};

        /* wildcard id: match every peer on this NI */
        anyid.nid =  LNET_NID_ANY;
        anyid.pid =  LNET_PID_ANY;

        LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL);
        LASSERT(ksocknal_data.ksnd_nnets > 0);

        spin_lock_bh(&net->ksnn_lock);
        net->ksnn_shutdown = 1;                 /* prevent new peers */
        spin_unlock_bh(&net->ksnn_lock);

        /* Delete all peers */
        ksocknal_del_peer(ni, anyid, 0);

        /* Wait for all peer state to clean up */
        i = 2;
        spin_lock_bh(&net->ksnn_lock);
        while (net->ksnn_npeers != 0) {
                spin_unlock_bh(&net->ksnn_lock);

                i++;
                /* warn with exponentially decreasing frequency */
                CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                       "waiting for %d peers to disconnect\n",
                       net->ksnn_npeers);
                /* NOTE(review): ksnn_npeers is read here after dropping
                 * ksnn_lock -- a stale value only affects the log message,
                 * but confirm that was the intent */
                cfs_pause(cfs_time_seconds(1));

                ksocknal_debug_peerhash(ni);

                spin_lock_bh(&net->ksnn_lock);
        }
        spin_unlock_bh(&net->ksnn_lock);

        /* all peers gone: every interface must be idle */
        for (i = 0; i < net->ksnn_ninterfaces; i++) {
                LASSERT (net->ksnn_interfaces[i].ksni_npeers == 0);
                LASSERT (net->ksnn_interfaces[i].ksni_nroutes == 0);
        }

        list_del(&net->ksnn_list);
        LIBCFS_FREE(net, sizeof(*net));

        /* last net down: tear down threads, hash tables etc. */
        ksocknal_data.ksnd_nnets--;
        if (ksocknal_data.ksnd_nnets == 0)
                ksocknal_base_shutdown();
}
2610
2611 int
2612 ksocknal_enumerate_interfaces(ksock_net_t *net)
2613 {
2614         char      **names;
2615         int         i;
2616         int         j;
2617         int         rc;
2618         int         n;
2619
2620         n = libcfs_ipif_enumerate(&names);
2621         if (n <= 0) {
2622                 CERROR("Can't enumerate interfaces: %d\n", n);
2623                 return n;
2624         }
2625
2626         for (i = j = 0; i < n; i++) {
2627                 int        up;
2628                 __u32      ip;
2629                 __u32      mask;
2630
2631                 if (!strcmp(names[i], "lo")) /* skip the loopback IF */
2632                         continue;
2633
2634                 rc = libcfs_ipif_query(names[i], &up, &ip, &mask);
2635                 if (rc != 0) {
2636                         CWARN("Can't get interface %s info: %d\n",
2637                               names[i], rc);
2638                         continue;
2639                 }
2640
2641                 if (!up) {
2642                         CWARN("Ignoring interface %s (down)\n",
2643                               names[i]);
2644                         continue;
2645                 }
2646
2647                 if (j == LNET_MAX_INTERFACES) {
2648                         CWARN("Ignoring interface %s (too many interfaces)\n",
2649                               names[i]);
2650                         continue;
2651                 }
2652
2653                 net->ksnn_interfaces[j].ksni_ipaddr = ip;
2654                 net->ksnn_interfaces[j].ksni_netmask = mask;
2655                 strlcpy(net->ksnn_interfaces[j].ksni_name,
2656                         names[i], sizeof(net->ksnn_interfaces[j].ksni_name));
2657                 j++;
2658         }
2659
2660         libcfs_ipif_free_enumeration(names, n);
2661
2662         if (j == 0)
2663                 CERROR("Can't find any usable interfaces\n");
2664
2665         return j;
2666 }
2667
2668 int
2669 ksocknal_search_new_ipif(ksock_net_t *net)
2670 {
2671         int     new_ipif = 0;
2672         int     i;
2673
2674         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2675                 char            *ifnam = &net->ksnn_interfaces[i].ksni_name[0];
2676                 char            *colon = strchr(ifnam, ':');
2677                 int             found  = 0;
2678                 ksock_net_t     *tmp;
2679                 int             j;
2680
2681                 if (colon != NULL) /* ignore alias device */
2682                         *colon = 0;
2683
2684                 list_for_each_entry(tmp, &ksocknal_data.ksnd_nets,
2685                                         ksnn_list) {
2686                         for (j = 0; !found && j < tmp->ksnn_ninterfaces; j++) {
2687                                 char *ifnam2 = &tmp->ksnn_interfaces[j].\
2688                                              ksni_name[0];
2689                                 char *colon2 = strchr(ifnam2, ':');
2690
2691                                 if (colon2 != NULL)
2692                                         *colon2 = 0;
2693
2694                                 found = strcmp(ifnam, ifnam2) == 0;
2695                                 if (colon2 != NULL)
2696                                         *colon2 = ':';
2697                         }
2698                         if (found)
2699                                 break;
2700                 }
2701
2702                 new_ipif += !found;
2703                 if (colon != NULL)
2704                         *colon = ':';
2705         }
2706
2707         return new_ipif;
2708 }
2709
/*
 * Start scheduler threads for one CPT's scheduler info.
 *
 * On the first call for a CPT (ksi_nthreads == 0) the full complement is
 * started, capped by the nscheds tunable or by CPU count heuristics; on later
 * calls (a new interface appeared) at most two extra threads are added, never
 * exceeding ksi_nthreads_max.
 *
 * Returns 0 on success or the ksocknal_thread_start() error; ksi_nthreads is
 * updated by the number of threads actually started even on partial failure.
 */
int
ksocknal_start_schedulers(struct ksock_sched_info *info)
{
        int     nthrs;
        int     rc = 0;
        int     i;

        if (info->ksi_nthreads == 0) {
                if (*ksocknal_tunables.ksnd_nscheds > 0) {
                        nthrs = info->ksi_nthreads_max;
                } else {
                        /* heuristic: half the CPT's CPUs, at least
                         * SOCKNAL_NSCHEDS, at most SOCKNAL_NSCHEDS_HIGH */
                        nthrs = cfs_cpt_weight(lnet_cpt_table(),
                                               info->ksi_cpt);
                        nthrs = min(max(SOCKNAL_NSCHEDS, nthrs >> 1), nthrs);
                        nthrs = min(SOCKNAL_NSCHEDS_HIGH, nthrs);
                }
                nthrs = min(nthrs, info->ksi_nthreads_max);
        } else {
                LASSERT(info->ksi_nthreads <= info->ksi_nthreads_max);
                /* increase two threads if there is new interface */
                nthrs = min(2, info->ksi_nthreads_max - info->ksi_nthreads);
        }

        for (i = 0; i < nthrs; i++) {
                long            id;
                char            name[20];
                ksock_sched_t   *sched;
                /* id encodes (cpt, sid); KSOCK_THREAD_SID() recovers the
                 * per-CPT scheduler index */
                id = KSOCK_THREAD_ID(info->ksi_cpt, info->ksi_nthreads + i);
                sched = &info->ksi_scheds[KSOCK_THREAD_SID(id)];
                snprintf(name, sizeof(name), "socknal_sd%02d_%02d",
                         info->ksi_cpt, (int)(sched - &info->ksi_scheds[0]));

                rc = ksocknal_thread_start(ksocknal_scheduler,
                                           (void *)id, name);
                if (rc == 0)
                        continue;

                CERROR("Can't spawn thread %d for scheduler[%d]: %d\n",
                       info->ksi_cpt, info->ksi_nthreads + i, rc);
                break;
        }

        /* count only the threads that actually started */
        info->ksi_nthreads += i;
        return rc;
}
2755
2756 int
2757 ksocknal_net_start_threads(ksock_net_t *net, __u32 *cpts, int ncpts)
2758 {
2759         int     newif = ksocknal_search_new_ipif(net);
2760         int     rc;
2761         int     i;
2762
2763         LASSERT(ncpts > 0 && ncpts <= cfs_cpt_number(lnet_cpt_table()));
2764
2765         for (i = 0; i < ncpts; i++) {
2766                 struct ksock_sched_info *info;
2767                 int cpt = (cpts == NULL) ? i : cpts[i];
2768
2769                 LASSERT(cpt < cfs_cpt_number(lnet_cpt_table()));
2770                 info = ksocknal_data.ksnd_sched_info[cpt];
2771
2772                 if (!newif && info->ksi_nthreads > 0)
2773                         continue;
2774
2775                 rc = ksocknal_start_schedulers(info);
2776                 if (rc != 0)
2777                         return rc;
2778         }
2779         return 0;
2780 }
2781
/*
 * LND startup entry point: bring up one socklnd network interface.
 *
 * Brings up the global state on first use, allocates the per-net state,
 * copies tunables into the NI, selects interfaces (either the ones listed in
 * ni->ni_interfaces[] or a single autodetected one), starts scheduler
 * threads, derives the NID from the first interface's IP address and links
 * the net onto ksocknal_data.ksnd_nets.
 *
 * Returns 0 on success, the ksocknal_base_startup() error, or -ENETDOWN.
 */
int
ksocknal_startup (lnet_ni_t *ni)
{
        ksock_net_t  *net;
        int           rc;
        int           i;

        LASSERT (ni->ni_lnd == &the_ksocklnd);

        /* the first net to come up initialises the global state */
        if (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING) {
                rc = ksocknal_base_startup();
                if (rc != 0)
                        return rc;
        }

        LIBCFS_ALLOC(net, sizeof(*net));
        if (net == NULL)
                goto fail_0;

        spin_lock_init(&net->ksnn_lock);
        net->ksnn_incarnation = ksocknal_new_incarnation();
        ni->ni_data = net;
        ni->ni_peertimeout    = *ksocknal_tunables.ksnd_peertimeout;
        ni->ni_maxtxcredits   = *ksocknal_tunables.ksnd_credits;
        ni->ni_peertxcredits  = *ksocknal_tunables.ksnd_peertxcredits;
        ni->ni_peerrtrcredits = *ksocknal_tunables.ksnd_peerrtrcredits;

        if (ni->ni_interfaces[0] == NULL) {
                /* no interfaces configured: autodetect, but only the first
                 * detected interface is actually used */
                rc = ksocknal_enumerate_interfaces(net);
                if (rc <= 0)
                        goto fail_1;

                net->ksnn_ninterfaces = 1;
        } else {
                /* validate and record each configured interface; any
                 * unqueryable or down interface fails the whole startup */
                for (i = 0; i < LNET_MAX_INTERFACES; i++) {
                        int    up;

                        if (ni->ni_interfaces[i] == NULL)
                                break;

                        rc = libcfs_ipif_query(
                                ni->ni_interfaces[i], &up,
                                &net->ksnn_interfaces[i].ksni_ipaddr,
                                &net->ksnn_interfaces[i].ksni_netmask);

                        if (rc != 0) {
                                CERROR("Can't get interface %s info: %d\n",
                                       ni->ni_interfaces[i], rc);
                                goto fail_1;
                        }

                        if (!up) {
                                CERROR("Interface %s is down\n",
                                       ni->ni_interfaces[i]);
                                goto fail_1;
                        }

                        strlcpy(net->ksnn_interfaces[i].ksni_name,
                                ni->ni_interfaces[i],
                                sizeof(net->ksnn_interfaces[i].ksni_name));
                }
                net->ksnn_ninterfaces = i;
        }

        /* call it before add it to ksocknal_data.ksnd_nets */
        rc = ksocknal_net_start_threads(net, ni->ni_cpts, ni->ni_ncpts);
        if (rc != 0)
                goto fail_1;

        /* the NID's address part is the first interface's IP */
        ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid),
                                net->ksnn_interfaces[0].ksni_ipaddr);
        list_add(&net->ksnn_list, &ksocknal_data.ksnd_nets);

        ksocknal_data.ksnd_nnets++;

        return 0;

 fail_1:
        /* NOTE(review): ni->ni_data still points at the freed net after
         * this -- presumably the caller discards the NI on failure; confirm
         * against the LNet NI startup/teardown contract */
        LIBCFS_FREE(net, sizeof(*net));
 fail_0:
        if (ksocknal_data.ksnd_nnets == 0)
                ksocknal_base_shutdown();

        return -ENETDOWN;
}
2867
2868
/*
 * Module unload hook: unregister the LND from LNet first so no new nets can
 * start, then release the tunables state.
 */
void __exit
ksocknal_module_fini (void)
{
        lnet_unregister_lnd(&the_ksocklnd);
        ksocknal_tunables_fini();
}
2875
/*
 * Module load hook: populate the_ksocklnd's operation table, initialise the
 * tunables and register this LND with LNet.
 *
 * Returns 0 on success or the ksocknal_tunables_init() error.
 */
int __init
ksocknal_module_init (void)
{
        int    rc;

        /* check ksnr_connected/connecting field large enough */
        CLASSERT (SOCKLND_CONN_NTYPES <= 4);
        CLASSERT (SOCKLND_CONN_ACK == SOCKLND_CONN_BULK_IN);

        /* initialize the_ksocklnd */
        the_ksocklnd.lnd_type     = SOCKLND;
        the_ksocklnd.lnd_startup  = ksocknal_startup;
        the_ksocklnd.lnd_shutdown = ksocknal_shutdown;
        the_ksocklnd.lnd_ctl      = ksocknal_ctl;
        the_ksocklnd.lnd_send     = ksocknal_send;
        the_ksocklnd.lnd_recv     = ksocknal_recv;
        the_ksocklnd.lnd_notify   = ksocknal_notify;
        the_ksocklnd.lnd_query    = ksocknal_query;
        the_ksocklnd.lnd_accept   = ksocknal_accept;

        /* tunables must exist before the LND can be used */
        rc = ksocknal_tunables_init();
        if (rc != 0)
                return rc;

        lnet_register_lnd(&the_ksocklnd);

        return 0;
}
2904
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Kernel TCP Socket LND v3.0.0");
MODULE_LICENSE("GPL");

/* register the module entry/exit points with the libcfs module wrapper */
cfs_module(ksocknal, "3.0.0", ksocknal_module_init, ksocknal_module_fini);