Whamcloud - gitweb
* Landed portals:b_port_step as follows...
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 nal_t                   ksocknal_api;
29 ksock_nal_data_t        ksocknal_data;
30 ptl_handle_ni_t         ksocknal_ni;
31 ksock_tunables_t        ksocknal_tunables;
32
33 kpr_nal_interface_t ksocknal_router_interface = {
34         kprni_nalid:      SOCKNAL,
35         kprni_arg:        &ksocknal_data,
36         kprni_fwd:        ksocknal_fwd_packet,
37         kprni_notify:     ksocknal_notify,
38 };
39
40 int
41 ksocknal_set_mynid(ptl_nid_t nid)
42 {
43         lib_ni_t *ni = &ksocknal_lib.libnal_ni;
44
45         /* FIXME: we have to do this because we call lib_init() at module
46          * insertion time, which is before we have 'mynid' available.  lib_init
47          * sets the NAL's nid, which it uses to tell other nodes where packets
48          * are coming from.  This is not a very graceful solution to this
49          * problem. */
50
51         CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
52                nid, ni->ni_pid.nid);
53
54         ni->ni_pid.nid = nid;
55         return (0);
56 }
57
58 ksock_interface_t *
59 ksocknal_ip2iface(__u32 ip)
60 {
61         int                i;
62         ksock_interface_t *iface;
63
64         for (i = 0; i < ksocknal_data.ksnd_ninterfaces; i++) {
65                 LASSERT(i < SOCKNAL_MAX_INTERFACES);
66                 iface = &ksocknal_data.ksnd_interfaces[i];
67
68                 if (iface->ksni_ipaddr == ip)
69                         return (iface);
70         }
71
72         return (NULL);
73 }
74
75 ksock_route_t *
76 ksocknal_create_route (__u32 ipaddr, int port)
77 {
78         ksock_route_t *route;
79
80         PORTAL_ALLOC (route, sizeof (*route));
81         if (route == NULL)
82                 return (NULL);
83
84         atomic_set (&route->ksnr_refcount, 1);
85         route->ksnr_peer = NULL;
86         route->ksnr_timeout = cfs_time_current();
87         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
88         route->ksnr_ipaddr = ipaddr;
89         route->ksnr_port = port;
90         route->ksnr_connecting = 0;
91         route->ksnr_connected = 0;
92         route->ksnr_deleted = 0;
93         route->ksnr_conn_count = 0;
94         route->ksnr_share_count = 0;
95
96         return (route);
97 }
98
99 void
100 ksocknal_destroy_route (ksock_route_t *route)
101 {
102         if (route->ksnr_peer != NULL)
103                 ksocknal_put_peer (route->ksnr_peer);
104
105         PORTAL_FREE (route, sizeof (*route));
106 }
107
108 void
109 ksocknal_put_route (ksock_route_t *route)
110 {
111         CDEBUG (D_OTHER, "putting route[%p] (%d)\n",
112                 route, atomic_read (&route->ksnr_refcount));
113
114         LASSERT (atomic_read (&route->ksnr_refcount) > 0);
115         if (!atomic_dec_and_test (&route->ksnr_refcount))
116              return;
117
118         ksocknal_destroy_route (route);
119 }
120
121 ksock_peer_t *
122 ksocknal_create_peer (ptl_nid_t nid)
123 {
124         ksock_peer_t *peer;
125
126         LASSERT (nid != PTL_NID_ANY);
127
128         PORTAL_ALLOC (peer, sizeof (*peer));
129         if (peer == NULL)
130                 return (NULL);
131
132         memset (peer, 0, sizeof (*peer));       /* NULL pointers/clear flags etc */
133
134         peer->ksnp_nid = nid;
135         atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
136         peer->ksnp_closing = 0;
137         CFS_INIT_LIST_HEAD (&peer->ksnp_conns);
138         CFS_INIT_LIST_HEAD (&peer->ksnp_routes);
139         CFS_INIT_LIST_HEAD (&peer->ksnp_tx_queue);
140
141         atomic_inc (&ksocknal_data.ksnd_npeers);
142         return (peer);
143 }
144
145 void
146 ksocknal_destroy_peer (ksock_peer_t *peer)
147 {
148         CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
149
150         LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
151         LASSERT (list_empty (&peer->ksnp_conns));
152         LASSERT (list_empty (&peer->ksnp_routes));
153         LASSERT (list_empty (&peer->ksnp_tx_queue));
154
155         PORTAL_FREE (peer, sizeof (*peer));
156
157         /* NB a peer's connections and autoconnect routes keep a reference
158          * on their peer until they are destroyed, so we can be assured
159          * that _all_ state to do with this peer has been cleaned up when
160          * its refcount drops to zero. */
161         atomic_dec (&ksocknal_data.ksnd_npeers);
162 }
163
164 void
165 ksocknal_put_peer (ksock_peer_t *peer)
166 {
167         CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
168                 peer, peer->ksnp_nid,
169                 atomic_read (&peer->ksnp_refcount));
170
171         LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
172         if (!atomic_dec_and_test (&peer->ksnp_refcount))
173                 return;
174
175         ksocknal_destroy_peer (peer);
176 }
177
178 ksock_peer_t *
179 ksocknal_find_peer_locked (ptl_nid_t nid)
180 {
181         struct list_head *peer_list = ksocknal_nid2peerlist (nid);
182         struct list_head *tmp;
183         ksock_peer_t     *peer;
184
185         list_for_each (tmp, peer_list) {
186
187                 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
188
189                 LASSERT (!peer->ksnp_closing);
190
191                 if (peer->ksnp_nid != nid)
192                         continue;
193
194                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
195                        peer, nid, atomic_read (&peer->ksnp_refcount));
196                 return (peer);
197         }
198         return (NULL);
199 }
200
201 ksock_peer_t *
202 ksocknal_get_peer (ptl_nid_t nid)
203 {
204         ksock_peer_t     *peer;
205
206         read_lock (&ksocknal_data.ksnd_global_lock);
207         peer = ksocknal_find_peer_locked (nid);
208         if (peer != NULL)                       /* +1 ref for caller? */
209                 atomic_inc (&peer->ksnp_refcount);
210         read_unlock (&ksocknal_data.ksnd_global_lock);
211
212         return (peer);
213 }
214
215 void
216 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
217 {
218         int                i;
219         __u32              ip;
220
221         for (i = 0; i < peer->ksnp_n_passive_ips; i++) {
222                 LASSERT (i < SOCKNAL_MAX_INTERFACES);
223                 ip = peer->ksnp_passive_ips[i];
224
225                 ksocknal_ip2iface(ip)->ksni_npeers--;
226         }
227
228         LASSERT (list_empty(&peer->ksnp_conns));
229         LASSERT (list_empty(&peer->ksnp_routes));
230         LASSERT (!peer->ksnp_closing);
231         peer->ksnp_closing = 1;
232         list_del (&peer->ksnp_list);
233         /* lose peerlist's ref */
234         ksocknal_put_peer (peer);
235 }
236
237 int
238 ksocknal_get_peer_info (int index, ptl_nid_t *nid,
239                         __u32 *myip, __u32 *peer_ip, int *port,
240                         int *conn_count, int *share_count)
241 {
242         ksock_peer_t      *peer;
243         struct list_head  *ptmp;
244         ksock_route_t     *route;
245         struct list_head  *rtmp;
246         int                i;
247         int                j;
248         int                rc = -ENOENT;
249
250         read_lock (&ksocknal_data.ksnd_global_lock);
251
252         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
253
254                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
255                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
256
257                         if (peer->ksnp_n_passive_ips == 0 &&
258                             list_empty(&peer->ksnp_routes)) {
259                                 if (index-- > 0)
260                                         continue;
261
262                                 *nid = peer->ksnp_nid;
263                                 *myip = 0;
264                                 *peer_ip = 0;
265                                 *port = 0;
266                                 *conn_count = 0;
267                                 *share_count = 0;
268                                 rc = 0;
269                                 goto out;
270                         }
271
272                         for (j = 0; j < peer->ksnp_n_passive_ips; j++) {
273                                 if (index-- > 0)
274                                         continue;
275
276                                 *nid = peer->ksnp_nid;
277                                 *myip = peer->ksnp_passive_ips[j];
278                                 *peer_ip = 0;
279                                 *port = 0;
280                                 *conn_count = 0;
281                                 *share_count = 0;
282                                 rc = 0;
283                                 goto out;
284                         }
285
286                         list_for_each (rtmp, &peer->ksnp_routes) {
287                                 if (index-- > 0)
288                                         continue;
289
290                                 route = list_entry(rtmp, ksock_route_t,
291                                                    ksnr_list);
292
293                                 *nid = peer->ksnp_nid;
294                                 *myip = route->ksnr_myipaddr;
295                                 *peer_ip = route->ksnr_ipaddr;
296                                 *port = route->ksnr_port;
297                                 *conn_count = route->ksnr_conn_count;
298                                 *share_count = route->ksnr_share_count;
299                                 rc = 0;
300                                 goto out;
301                         }
302                 }
303         }
304  out:
305         read_unlock (&ksocknal_data.ksnd_global_lock);
306         return (rc);
307 }
308
309 void
310 ksocknal_associate_route_conn_locked(ksock_route_t *route, ksock_conn_t *conn)
311 {
312         ksock_peer_t      *peer = route->ksnr_peer;
313         int                type = conn->ksnc_type;
314         ksock_interface_t *iface;
315
316         conn->ksnc_route = route;
317         atomic_inc (&route->ksnr_refcount);
318
319         if (route->ksnr_myipaddr != conn->ksnc_myipaddr) {
320                 if (route->ksnr_myipaddr == 0) {
321                         /* route wasn't bound locally yet (the initial route) */
322                         CWARN("Binding "LPX64" %u.%u.%u.%u to %u.%u.%u.%u\n",
323                               peer->ksnp_nid,
324                               HIPQUAD(route->ksnr_ipaddr),
325                               HIPQUAD(conn->ksnc_myipaddr));
326                 } else {
327                         CWARN("Rebinding "LPX64" %u.%u.%u.%u from "
328                               "%u.%u.%u.%u to %u.%u.%u.%u\n",
329                               peer->ksnp_nid,
330                               HIPQUAD(route->ksnr_ipaddr),
331                               HIPQUAD(route->ksnr_myipaddr),
332                               HIPQUAD(conn->ksnc_myipaddr));
333
334                         iface = ksocknal_ip2iface(route->ksnr_myipaddr);
335                         if (iface != NULL)
336                                 iface->ksni_nroutes--;
337                 }
338                 route->ksnr_myipaddr = conn->ksnc_myipaddr;
339                 iface = ksocknal_ip2iface(route->ksnr_myipaddr);
340                 if (iface != NULL)
341                         iface->ksni_nroutes++;
342         }
343
344         route->ksnr_connected |= (1<<type);
345         route->ksnr_connecting &= ~(1<<type);
346         route->ksnr_conn_count++;
347
348         /* Successful connection => further attempts can
349          * proceed immediately */
350         route->ksnr_timeout = cfs_time_current();
351         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
352 }
353
354 void
355 ksocknal_add_route_locked (ksock_peer_t *peer, ksock_route_t *route)
356 {
357         struct list_head  *tmp;
358         ksock_conn_t      *conn;
359         int                type;
360         ksock_route_t     *route2;
361
362         LASSERT (route->ksnr_peer == NULL);
363         LASSERT (route->ksnr_connecting == 0);
364         LASSERT (route->ksnr_connected == 0);
365
366         /* LASSERT(unique) */
367         list_for_each(tmp, &peer->ksnp_routes) {
368                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
369
370                 if (route2->ksnr_ipaddr == route->ksnr_ipaddr) {
371                         CERROR ("Duplicate route "LPX64" %u.%u.%u.%u\n",
372                                 peer->ksnp_nid, HIPQUAD(route->ksnr_ipaddr));
373                         LBUG();
374                 }
375         }
376
377         route->ksnr_peer = peer;
378         atomic_inc (&peer->ksnp_refcount);
379         /* peer's routelist takes over my ref on 'route' */
380         list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
381
382         list_for_each(tmp, &peer->ksnp_conns) {
383                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
384                 type = conn->ksnc_type;
385
386                 if (conn->ksnc_ipaddr != route->ksnr_ipaddr)
387                         continue;
388
389                 ksocknal_associate_route_conn_locked(route, conn);
390                 /* keep going (typed routes) */
391         }
392 }
393
394 void
395 ksocknal_del_route_locked (ksock_route_t *route)
396 {
397         ksock_peer_t      *peer = route->ksnr_peer;
398         ksock_interface_t *iface;
399         ksock_conn_t      *conn;
400         struct list_head  *ctmp;
401         struct list_head  *cnxt;
402
403         LASSERT (!route->ksnr_deleted);
404
405         /* Close associated conns */
406         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
407                 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
408
409                 if (conn->ksnc_route != route)
410                         continue;
411
412                 ksocknal_close_conn_locked (conn, 0);
413         }
414
415         if (route->ksnr_myipaddr != 0) {
416                 iface = ksocknal_ip2iface(route->ksnr_myipaddr);
417                 if (iface != NULL)
418                         iface->ksni_nroutes--;
419         }
420
421         route->ksnr_deleted = 1;
422         list_del (&route->ksnr_list);
423         ksocknal_put_route (route);             /* drop peer's ref */
424
425         if (list_empty (&peer->ksnp_routes) &&
426             list_empty (&peer->ksnp_conns)) {
427                 /* I've just removed the last autoconnect route of a peer
428                  * with no active connections */
429                 ksocknal_unlink_peer_locked (peer);
430         }
431 }
432
433 int
434 ksocknal_add_peer (ptl_nid_t nid, __u32 ipaddr, int port)
435 {
436         unsigned long      flags;
437         struct list_head  *tmp;
438         ksock_peer_t      *peer;
439         ksock_peer_t      *peer2;
440         ksock_route_t     *route;
441         ksock_route_t     *route2;
442
443         if (nid == PTL_NID_ANY)
444                 return (-EINVAL);
445
446         /* Have a brand new peer ready... */
447         peer = ksocknal_create_peer (nid);
448         if (peer == NULL)
449                 return (-ENOMEM);
450
451         route = ksocknal_create_route (ipaddr, port);
452         if (route == NULL) {
453                 ksocknal_put_peer (peer);
454                 return (-ENOMEM);
455         }
456
457         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
458
459         peer2 = ksocknal_find_peer_locked (nid);
460         if (peer2 != NULL) {
461                 ksocknal_put_peer (peer);
462                 peer = peer2;
463         } else {
464                 /* peer table takes my ref on peer */
465                 list_add_tail (&peer->ksnp_list,
466                                ksocknal_nid2peerlist (nid));
467         }
468
469         route2 = NULL;
470         list_for_each (tmp, &peer->ksnp_routes) {
471                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
472
473                 if (route2->ksnr_ipaddr == ipaddr)
474                         break;
475
476                 route2 = NULL;
477         }
478         if (route2 == NULL) {
479                 ksocknal_add_route_locked(peer, route);
480                 route->ksnr_share_count++;
481         } else {
482                 ksocknal_put_route(route);
483                 route2->ksnr_share_count++;
484         }
485
486         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
487
488         return (0);
489 }
490
491 void
492 ksocknal_del_peer_locked (ksock_peer_t *peer, __u32 ip, int single_share)
493 {
494         ksock_conn_t     *conn;
495         ksock_route_t    *route;
496         struct list_head *tmp;
497         struct list_head *nxt;
498         int               nshared;
499
500         LASSERT (!peer->ksnp_closing);
501
502         list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
503                 route = list_entry(tmp, ksock_route_t, ksnr_list);
504
505                 if (single_share && route->ksnr_share_count == 0)
506                         continue;
507
508                 /* no match */
509                 if (!(ip == 0 || route->ksnr_ipaddr == ip))
510                         continue;
511
512                 if (!single_share)
513                         route->ksnr_share_count = 0;
514                 else if (route->ksnr_share_count > 0)
515                         route->ksnr_share_count--;
516
517                 if (route->ksnr_share_count == 0) {
518                         /* This deletes associated conns too */
519                         ksocknal_del_route_locked (route);
520                 }
521
522                 if (single_share)
523                         break;
524         }
525
526         nshared = 0;
527         list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
528                 route = list_entry(tmp, ksock_route_t, ksnr_list);
529                 nshared += route->ksnr_share_count;
530         }
531
532         if (nshared == 0) {
533                 /* remove everything else if there are no explicit entries
534                  * left */
535
536                 list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
537                         route = list_entry(tmp, ksock_route_t, ksnr_list);
538
539                         /* we should only be removing auto-entries */
540                         LASSERT(route->ksnr_share_count == 0);
541                         ksocknal_del_route_locked (route);
542                 }
543
544                 list_for_each_safe (tmp, nxt, &peer->ksnp_conns) {
545                         conn = list_entry(tmp, ksock_conn_t, ksnc_list);
546
547                         ksocknal_close_conn_locked(conn, 0);
548                 }
549         }
550
551         /* NB peer unlinks itself when last conn/route is removed */
552 }
553
554 int
555 ksocknal_del_peer (ptl_nid_t nid, __u32 ip, int single_share)
556 {
557         unsigned long      flags;
558         struct list_head  *ptmp;
559         struct list_head  *pnxt;
560         ksock_peer_t      *peer;
561         int                lo;
562         int                hi;
563         int                i;
564         int                rc = -ENOENT;
565
566         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
567
568         if (nid != PTL_NID_ANY)
569                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
570         else {
571                 lo = 0;
572                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
573         }
574
575         for (i = lo; i <= hi; i++) {
576                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
577                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
578
579                         if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
580                                 continue;
581
582                         ksocknal_del_peer_locked (peer, ip, single_share);
583                         rc = 0;                 /* matched! */
584
585                         if (single_share)
586                                 break;
587                 }
588         }
589
590         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
591
592         return (rc);
593 }
594
595 ksock_conn_t *
596 ksocknal_get_conn_by_idx (int index)
597 {
598         ksock_peer_t      *peer;
599         struct list_head  *ptmp;
600         ksock_conn_t      *conn;
601         struct list_head  *ctmp;
602         int                i;
603
604         read_lock (&ksocknal_data.ksnd_global_lock);
605
606         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
607                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
608                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
609
610                         LASSERT (!peer->ksnp_closing);
611
612                         list_for_each (ctmp, &peer->ksnp_conns) {
613                                 if (index-- > 0)
614                                         continue;
615
616                                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
617                                 atomic_inc (&conn->ksnc_refcount);
618                                 read_unlock (&ksocknal_data.ksnd_global_lock);
619                                 return (conn);
620                         }
621                 }
622         }
623
624         read_unlock (&ksocknal_data.ksnd_global_lock);
625         return (NULL);
626 }
627
628 ksock_sched_t *
629 ksocknal_choose_scheduler_locked (unsigned int irq)
630 {
631         ksock_sched_t    *sched;
632         ksock_irqinfo_t  *info;
633         int               i;
634
635         LASSERT (irq < NR_IRQS);
636         info = &ksocknal_data.ksnd_irqinfo[irq];
637
638         if (irq != 0 &&                         /* hardware NIC */
639             info->ksni_valid) {                 /* already set up */
640                 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
641         }
642
643         /* software NIC (irq == 0) || not associated with a scheduler yet.
644          * Choose the CPU with the fewest connections... */
645         sched = &ksocknal_data.ksnd_schedulers[0];
646         for (i = 1; i < ksocknal_data.ksnd_nschedulers; i++)
647                 if (sched->kss_nconns >
648                     ksocknal_data.ksnd_schedulers[i].kss_nconns)
649                         sched = &ksocknal_data.ksnd_schedulers[i];
650
651         if (irq != 0) {                         /* Hardware NIC */
652                 info->ksni_valid = 1;
653                 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
654
655                 /* no overflow... */
656                 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
657         }
658
659         return (sched);
660 }
661
662 int
663 ksocknal_local_ipvec (__u32 *ipaddrs)
664 {
665         int                i;
666         int                nip;
667
668         read_lock (&ksocknal_data.ksnd_global_lock);
669
670         nip = ksocknal_data.ksnd_ninterfaces;
671         for (i = 0; i < nip; i++) {
672                 LASSERT (i < SOCKNAL_MAX_INTERFACES);
673
674                 ipaddrs[i] = ksocknal_data.ksnd_interfaces[i].ksni_ipaddr;
675                 LASSERT (ipaddrs[i] != 0);
676         }
677
678         read_unlock (&ksocknal_data.ksnd_global_lock);
679         return (nip);
680 }
681
682 int
683 ksocknal_match_peerip (ksock_interface_t *iface, __u32 *ips, int nips)
684 {
685         int   best_netmatch = 0;
686         int   best_xor      = 0;
687         int   best          = -1;
688         int   this_xor;
689         int   this_netmatch;
690         int   i;
691
692         for (i = 0; i < nips; i++) {
693                 if (ips[i] == 0)
694                         continue;
695
696                 this_xor = (ips[i] ^ iface->ksni_ipaddr);
697                 this_netmatch = ((this_xor & iface->ksni_netmask) == 0) ? 1 : 0;
698
699                 if (!(best < 0 ||
700                       best_netmatch < this_netmatch ||
701                       (best_netmatch == this_netmatch &&
702                        best_xor > this_xor)))
703                         continue;
704
705                 best = i;
706                 best_netmatch = this_netmatch;
707                 best_xor = this_xor;
708         }
709
710         LASSERT (best >= 0);
711         return (best);
712 }
713
714 int
715 ksocknal_select_ips(ksock_peer_t *peer, __u32 *peerips, int n_peerips)
716 {
717         rwlock_t           *global_lock = &ksocknal_data.ksnd_global_lock;
718         unsigned long       flags;
719         ksock_interface_t  *iface;
720         ksock_interface_t  *best_iface;
721         int                 n_ips;
722         int                 i;
723         int                 j;
724         int                 k;
725         __u32               ip;
726         __u32               xor;
727         int                 this_netmatch;
728         int                 best_netmatch;
729         int                 best_npeers;
730
731         /* CAVEAT EMPTOR: We do all our interface matching with an
732          * exclusive hold of global lock at IRQ priority.  We're only
733          * expecting to be dealing with small numbers of interfaces, so the
734          * O(n**3)-ness shouldn't matter */
735
736         /* Also note that I'm not going to return more than n_peerips
737          * interfaces, even if I have more myself */
738
739         write_lock_irqsave(global_lock, flags);
740
741         LASSERT (n_peerips <= SOCKNAL_MAX_INTERFACES);
742         LASSERT (ksocknal_data.ksnd_ninterfaces <= SOCKNAL_MAX_INTERFACES);
743
744         n_ips = MIN(n_peerips, ksocknal_data.ksnd_ninterfaces);
745
746         for (i = 0; peer->ksnp_n_passive_ips < n_ips; i++) {
747                 /*              ^ yes really... */
748
749                 /* If we have any new interfaces, first tick off all the
750                  * peer IPs that match old interfaces, then choose new
751                  * interfaces to match the remaining peer IPS.
752                  * We don't forget interfaces we've stopped using; we might
753                  * start using them again... */
754
755                 if (i < peer->ksnp_n_passive_ips) {
756                         /* Old interface. */
757                         ip = peer->ksnp_passive_ips[i];
758                         best_iface = ksocknal_ip2iface(ip);
759
760                         /* peer passive ips are kept up to date */
761                         LASSERT(best_iface != NULL);
762                 } else {
763                         /* choose a new interface */
764                         LASSERT (i == peer->ksnp_n_passive_ips);
765
766                         best_iface = NULL;
767                         best_netmatch = 0;
768                         best_npeers = 0;
769
770                         for (j = 0; j < ksocknal_data.ksnd_ninterfaces; j++) {
771                                 iface = &ksocknal_data.ksnd_interfaces[j];
772                                 ip = iface->ksni_ipaddr;
773
774                                 for (k = 0; k < peer->ksnp_n_passive_ips; k++)
775                                         if (peer->ksnp_passive_ips[k] == ip)
776                                                 break;
777
778                                 if (k < peer->ksnp_n_passive_ips) /* using it already */
779                                         continue;
780
781                                 k = ksocknal_match_peerip(iface, peerips, n_peerips);
782                                 xor = (ip ^ peerips[k]);
783                                 this_netmatch = ((xor & iface->ksni_netmask) == 0) ? 1 : 0;
784
785                                 if (!(best_iface == NULL ||
786                                       best_netmatch < this_netmatch ||
787                                       (best_netmatch == this_netmatch &&
788                                        best_npeers > iface->ksni_npeers)))
789                                         continue;
790
791                                 best_iface = iface;
792                                 best_netmatch = this_netmatch;
793                                 best_npeers = iface->ksni_npeers;
794                         }
795
796                         best_iface->ksni_npeers++;
797                         ip = best_iface->ksni_ipaddr;
798                         peer->ksnp_passive_ips[i] = ip;
799                         peer->ksnp_n_passive_ips = i+1;
800                 }
801
802                 LASSERT (best_iface != NULL);
803
804                 /* mark the best matching peer IP used */
805                 j = ksocknal_match_peerip(best_iface, peerips, n_peerips);
806                 peerips[j] = 0;
807         }
808
809         /* Overwrite input peer IP addresses */
810         memcpy(peerips, peer->ksnp_passive_ips, n_ips * sizeof(*peerips));
811
812         write_unlock_irqrestore(global_lock, flags);
813
814         return (n_ips);
815 }
816
817 void
818 ksocknal_create_routes(ksock_peer_t *peer, int port,
819                        __u32 *peer_ipaddrs, int npeer_ipaddrs)
820 {
821         ksock_route_t      *newroute = NULL;
822         rwlock_t           *global_lock = &ksocknal_data.ksnd_global_lock;
823         unsigned long       flags;
824         struct list_head   *rtmp;
825         ksock_route_t      *route;
826         ksock_interface_t  *iface;
827         ksock_interface_t  *best_iface;
828         int                 best_netmatch;
829         int                 this_netmatch;
830         int                 best_nroutes;
831         int                 i;
832         int                 j;
833
834         /* CAVEAT EMPTOR: We do all our interface matching with an
835          * exclusive hold of global lock at IRQ priority.  We're only
836          * expecting to be dealing with small numbers of interfaces, so the
837          * O(n**3)-ness here shouldn't matter */
838
839         write_lock_irqsave(global_lock, flags);
840
841         LASSERT (npeer_ipaddrs <= SOCKNAL_MAX_INTERFACES);
842
843         for (i = 0; i < npeer_ipaddrs; i++) {
844                 if (newroute != NULL) {
845                         newroute->ksnr_ipaddr = peer_ipaddrs[i];
846                 } else {
847                         write_unlock_irqrestore(global_lock, flags);
848
849                         newroute = ksocknal_create_route(peer_ipaddrs[i], port);
850                         if (newroute == NULL)
851                                 return;
852
853                         write_lock_irqsave(global_lock, flags);
854                 }
855
856                 /* Already got a route? */
857                 route = NULL;
858                 list_for_each(rtmp, &peer->ksnp_routes) {
859                         route = list_entry(rtmp, ksock_route_t, ksnr_list);
860
861                         if (route->ksnr_ipaddr == newroute->ksnr_ipaddr)
862                                 break;
863
864                         route = NULL;
865                 }
866                 if (route != NULL)
867                         continue;
868
869                 best_iface = NULL;
870                 best_nroutes = 0;
871                 best_netmatch = 0;
872
873                 LASSERT (ksocknal_data.ksnd_ninterfaces <= SOCKNAL_MAX_INTERFACES);
874
875                 /* Select interface to connect from */
876                 for (j = 0; j < ksocknal_data.ksnd_ninterfaces; j++) {
877                         iface = &ksocknal_data.ksnd_interfaces[j];
878
879                         /* Using this interface already? */
880                         list_for_each(rtmp, &peer->ksnp_routes) {
881                                 route = list_entry(rtmp, ksock_route_t, ksnr_list);
882
883                                 if (route->ksnr_myipaddr == iface->ksni_ipaddr)
884                                         break;
885
886                                 route = NULL;
887                         }
888                         if (route != NULL)
889                                 continue;
890
891                         this_netmatch = (((iface->ksni_ipaddr ^
892                                            newroute->ksnr_ipaddr) &
893                                            iface->ksni_netmask) == 0) ? 1 : 0;
894
895                         if (!(best_iface == NULL ||
896                               best_netmatch < this_netmatch ||
897                               (best_netmatch == this_netmatch &&
898                                best_nroutes > iface->ksni_nroutes)))
899                                 continue;
900
901                         best_iface = iface;
902                         best_netmatch = this_netmatch;
903                         best_nroutes = iface->ksni_nroutes;
904                 }
905
906                 if (best_iface == NULL)
907                         continue;
908
909                 newroute->ksnr_myipaddr = best_iface->ksni_ipaddr;
910                 best_iface->ksni_nroutes++;
911
912                 ksocknal_add_route_locked(peer, newroute);
913                 newroute = NULL;
914         }
915
916         write_unlock_irqrestore(global_lock, flags);
917         if (newroute != NULL)
918                 ksocknal_put_route(newroute);
919 }
920
921 int
922 ksocknal_create_conn (ksock_route_t *route, struct socket *sock, int type)
923 {
924         int                passive = (type == SOCKNAL_CONN_NONE);
925         rwlock_t          *global_lock = &ksocknal_data.ksnd_global_lock;
926         __u32              ipaddrs[SOCKNAL_MAX_INTERFACES];
927         int                nipaddrs;
928         ptl_nid_t          nid;
929         struct list_head  *tmp;
930         __u64              incarnation;
931         unsigned long      flags;
932         ksock_conn_t      *conn;
933         ksock_conn_t      *conn2;
934         ksock_peer_t      *peer = NULL;
935         ksock_peer_t      *peer2;
936         ksock_sched_t     *sched;
937         unsigned int       irq;
938         ksock_tx_t        *tx;
939         int                rc;
940
941         /* NB, sock has an associated file since (a) this connection might
942          * have been created in userland and (b) we need to refcount the
943          * socket so that we don't close it while I/O is being done on
944          * it, and sock->file has that pre-cooked... */
945         LASSERT (KSN_SOCK2FILE(sock) != NULL);
946         LASSERT (cfs_file_count(KSN_SOCK2FILE(sock)) > 0);
947         LASSERT (route == NULL || !passive);
948
949         rc = ksocknal_lib_setup_sock (sock);
950         if (rc != 0)
951                 return (rc);
952
953         irq = ksocknal_lib_sock_irq (sock);
954
955         PORTAL_ALLOC(conn, sizeof(*conn));
956         if (conn == NULL)
957                 return (-ENOMEM);
958
959         memset (conn, 0, sizeof (*conn));
960         conn->ksnc_peer = NULL;
961         conn->ksnc_route = NULL;
962         conn->ksnc_sock = sock;
963         conn->ksnc_type = type;
964         ksocknal_lib_save_callback(sock, conn);
965         atomic_set (&conn->ksnc_refcount, 1);    /* 1 ref for me */
966
967         conn->ksnc_rx_ready = 0;
968         conn->ksnc_rx_scheduled = 0;
969         ksocknal_new_packet (conn, 0);
970
971         CFS_INIT_LIST_HEAD (&conn->ksnc_tx_queue);
972         conn->ksnc_tx_ready = 0;
973         conn->ksnc_tx_scheduled = 0;
974         atomic_set (&conn->ksnc_tx_nob, 0);
975
976         /* stash conn's local and remote addrs */
977         rc = ksocknal_lib_get_conn_addrs (conn);
978         if (rc != 0)
979                 goto failed_0;
980
981         if (!passive) {
982                 /* Active connection sends HELLO eagerly */
983                 rc = ksocknal_local_ipvec(ipaddrs);
984                 if (rc < 0)
985                         goto failed_0;
986                 nipaddrs = rc;
987
988                 rc = ksocknal_send_hello (conn, ipaddrs, nipaddrs);
989                 if (rc != 0)
990                         goto failed_0;
991         }
992
993         /* Find out/confirm peer's NID and connection type and get the
994          * vector of interfaces she's willing to let me connect to */
995         nid = (route == NULL) ? PTL_NID_ANY : route->ksnr_peer->ksnp_nid;
996         rc = ksocknal_recv_hello (conn, &nid, &incarnation, ipaddrs);
997         if (rc < 0)
998                 goto failed_0;
999         nipaddrs = rc;
1000         LASSERT (nid != PTL_NID_ANY);
1001
1002         if (route != NULL) {
1003                 peer = route->ksnr_peer;
1004                 atomic_inc(&peer->ksnp_refcount);
1005         } else {
1006                 peer = ksocknal_create_peer(nid);
1007                 if (peer == NULL) {
1008                         rc = -ENOMEM;
1009                         goto failed_0;
1010                 }
1011
1012                 write_lock_irqsave(global_lock, flags);
1013
1014                 peer2 = ksocknal_find_peer_locked(nid);
1015                 if (peer2 == NULL) {
1016                         /* NB this puts an "empty" peer in the peer
1017                          * table (which takes my ref) */
1018                         list_add_tail(&peer->ksnp_list,
1019                                       ksocknal_nid2peerlist(nid));
1020                 } else  {
1021                         ksocknal_put_peer(peer);
1022                         peer = peer2;
1023                 }
1024                 /* +1 ref for me */
1025                 atomic_inc(&peer->ksnp_refcount);
1026
1027                 write_unlock_irqrestore(global_lock, flags);
1028         }
1029
1030         if (!passive) {
1031                 ksocknal_create_routes(peer, conn->ksnc_port,
1032                                        ipaddrs, nipaddrs);
1033                 rc = 0;
1034         } else {
1035                 rc = ksocknal_select_ips(peer, ipaddrs, nipaddrs);
1036                 LASSERT (rc >= 0);
1037                 rc = ksocknal_send_hello (conn, ipaddrs, rc);
1038         }
1039         if (rc < 0)
1040                 goto failed_1;
1041
1042         write_lock_irqsave (global_lock, flags);
1043
1044         if (peer->ksnp_closing ||
1045             (route != NULL && route->ksnr_deleted)) {
1046                 /* route/peer got closed under me */
1047                 rc = -ESTALE;
1048                 goto failed_2;
1049         }
1050
1051         /* Refuse to duplicate an existing connection (both sides might
1052          * autoconnect at once), unless this is a loopback connection */
1053         if (conn->ksnc_ipaddr != conn->ksnc_myipaddr) {
1054                 list_for_each(tmp, &peer->ksnp_conns) {
1055                         conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);
1056
1057                         if (conn2->ksnc_ipaddr != conn->ksnc_ipaddr ||
1058                             conn2->ksnc_myipaddr != conn->ksnc_myipaddr ||
1059                             conn2->ksnc_type != conn->ksnc_type ||
1060                             conn2->ksnc_incarnation != incarnation)
1061                                 continue;
1062
1063                         CWARN("Not creating duplicate connection to "
1064                               "%u.%u.%u.%u type %d\n",
1065                               HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_type);
1066                         rc = -EALREADY;
1067                         goto failed_2;
1068                 }
1069         }
1070
1071         /* If the connection created by this route didn't bind to the IP
1072          * address the route connected to, the connection/route matching
1073          * code below probably isn't going to work. */
1074         if (route != NULL &&
1075             route->ksnr_ipaddr != conn->ksnc_ipaddr) {
1076                 CERROR("Route "LPX64" %u.%u.%u.%u connected to %u.%u.%u.%u\n",
1077                        peer->ksnp_nid,
1078                        HIPQUAD(route->ksnr_ipaddr),
1079                        HIPQUAD(conn->ksnc_ipaddr));
1080         }
1081
1082         /* Search for a route corresponding to the new connection and
1083          * create an association.  This allows incoming connections created
1084          * by routes in my peer to match my own route entries so I don't
1085          * continually create duplicate routes. */
1086         list_for_each (tmp, &peer->ksnp_routes) {
1087                 route = list_entry(tmp, ksock_route_t, ksnr_list);
1088
1089                 if (route->ksnr_ipaddr != conn->ksnc_ipaddr)
1090                         continue;
1091
1092                 ksocknal_associate_route_conn_locked(route, conn);
1093                 break;
1094         }
1095
1096         /* Give conn a ref on sock->file since we're going to return success */
1097         cfs_get_file(KSN_SOCK2FILE(sock));
1098
1099         conn->ksnc_peer = peer;                 /* conn takes my ref on peer */
1100         conn->ksnc_incarnation = incarnation;
1101         peer->ksnp_last_alive = cfs_time_current();
1102         peer->ksnp_error = 0;
1103
1104         sched = ksocknal_choose_scheduler_locked (irq);
1105         sched->kss_nconns++;
1106         conn->ksnc_scheduler = sched;
1107
1108         /* Set the deadline for the outgoing HELLO to drain */
1109         conn->ksnc_tx_bufnob = SOCK_WMEM_QUEUED(sock);
1110         conn->ksnc_tx_deadline = cfs_time_shift(ksocknal_tunables.ksnd_io_timeout);
1111         mb();       /* order with adding to peer's conn list */
1112
1113         list_add (&conn->ksnc_list, &peer->ksnp_conns);
1114         atomic_inc (&conn->ksnc_refcount);
1115
1116         /* NB my callbacks block while I hold ksnd_global_lock */
1117         ksocknal_lib_set_callback(sock, conn);
1118
1119         /* Take all the packets blocking for a connection.
1120          * NB, it might be nicer to share these blocked packets among any
1121          * other connections that are becoming established. */
1122         while (!list_empty (&peer->ksnp_tx_queue)) {
1123                 tx = list_entry (peer->ksnp_tx_queue.next,
1124                                  ksock_tx_t, tx_list);
1125
1126                 list_del (&tx->tx_list);
1127                 ksocknal_queue_tx_locked (tx, conn);
1128         }
1129
1130         rc = ksocknal_close_stale_conns_locked(peer, incarnation);
1131         if (rc != 0)
1132                 CERROR ("Closed %d stale conns to nid "LPX64" ip %d.%d.%d.%d\n",
1133                         rc, conn->ksnc_peer->ksnp_nid,
1134                         HIPQUAD(conn->ksnc_ipaddr));
1135
1136         write_unlock_irqrestore (global_lock, flags);
1137
1138         ksocknal_lib_bind_irq (irq);
1139
1140         /* Call the callbacks right now to get things going. */
1141         if (ksocknal_getconnsock(conn) == 0) {
1142                 ksocknal_lib_act_callback(sock, conn);
1143                 ksocknal_putconnsock(conn);
1144         }
1145
1146         CWARN("New conn nid:"LPX64" %u.%u.%u.%u -> %u.%u.%u.%u/%d"
1147               " incarnation:"LPX64" sched[%d]/%d\n",
1148               nid, HIPQUAD(conn->ksnc_myipaddr),
1149               HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port, incarnation,
1150               (int)(conn->ksnc_scheduler - ksocknal_data.ksnd_schedulers), irq);
1151
1152         ksocknal_put_conn (conn);
1153         return (0);
1154
1155  failed_2:
1156         if (!peer->ksnp_closing &&
1157             list_empty (&peer->ksnp_conns) &&
1158             list_empty (&peer->ksnp_routes))
1159                 ksocknal_unlink_peer_locked(peer);
1160         write_unlock_irqrestore(global_lock, flags);
1161
1162  failed_1:
1163         ksocknal_put_peer (peer);
1164
1165  failed_0:
1166         PORTAL_FREE (conn, sizeof(*conn));
1167
1168         LASSERT (rc != 0);
1169         return (rc);
1170 }
1171
1172 void
1173 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
1174 {
1175         /* This just does the immmediate housekeeping, and queues the
1176          * connection for the reaper to terminate.
1177          * Caller holds ksnd_global_lock exclusively in irq context */
1178         ksock_peer_t      *peer = conn->ksnc_peer;
1179         ksock_route_t     *route;
1180         ksock_conn_t      *conn2;
1181         struct list_head  *tmp;
1182
1183         LASSERT (peer->ksnp_error == 0);
1184         LASSERT (!conn->ksnc_closing);
1185         conn->ksnc_closing = 1;
1186         atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
1187
1188         /* ksnd_deathrow_conns takes over peer's ref */
1189         list_del (&conn->ksnc_list);
1190
1191         route = conn->ksnc_route;
1192         if (route != NULL) {
1193                 /* dissociate conn from route... */
1194                 LASSERT (!route->ksnr_deleted);
1195                 LASSERT ((route->ksnr_connecting & (1 << conn->ksnc_type)) == 0);
1196                 LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);
1197
1198                 conn2 = NULL;
1199                 list_for_each(tmp, &peer->ksnp_conns) {
1200                         conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);
1201
1202                         if (conn2->ksnc_route == route &&
1203                             conn2->ksnc_type == conn->ksnc_type)
1204                                 break;
1205
1206                         conn2 = NULL;
1207                 }
1208                 if (conn2 == NULL)
1209                         route->ksnr_connected &= ~(1 << conn->ksnc_type);
1210
1211                 conn->ksnc_route = NULL;
1212
1213 #if 0           /* irrelevent with only eager routes */
1214                 list_del (&route->ksnr_list);   /* make route least favourite */
1215                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
1216 #endif
1217                 ksocknal_put_route (route);     /* drop conn's ref on route */
1218         }
1219
1220         if (list_empty (&peer->ksnp_conns)) {
1221                 /* No more connections to this peer */
1222
1223                 peer->ksnp_error = error;       /* stash last conn close reason */
1224
1225                 if (list_empty (&peer->ksnp_routes)) {
1226                         /* I've just closed last conn belonging to a
1227                          * non-autoconnecting peer */
1228                         ksocknal_unlink_peer_locked (peer);
1229                 }
1230         }
1231
1232         spin_lock (&ksocknal_data.ksnd_reaper_lock);
1233
1234         list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
1235         cfs_waitq_signal (&ksocknal_data.ksnd_reaper_waitq);
1236
1237         spin_unlock (&ksocknal_data.ksnd_reaper_lock);
1238 }
1239
1240 void
1241 ksocknal_terminate_conn (ksock_conn_t *conn)
1242 {
1243         /* This gets called by the reaper (guaranteed thread context) to
1244          * disengage the socket from its callbacks and close it.
1245          * ksnc_refcount will eventually hit zero, and then the reaper will
1246          * destroy it. */
1247         unsigned long   flags;
1248         ksock_peer_t   *peer = conn->ksnc_peer;
1249         ksock_sched_t  *sched = conn->ksnc_scheduler;
1250         struct timeval  now;
1251         time_t          then = 0;
1252         int             notify = 0;
1253
1254         LASSERT(conn->ksnc_closing);
1255
1256         /* wake up the scheduler to "send" all remaining packets to /dev/null */
1257         spin_lock_irqsave(&sched->kss_lock, flags);
1258
1259         if (!conn->ksnc_tx_scheduled &&
1260             !list_empty(&conn->ksnc_tx_queue)){
1261                 list_add_tail (&conn->ksnc_tx_list,
1262                                &sched->kss_tx_conns);
1263                 /* a closing conn is always ready to tx */
1264                 conn->ksnc_tx_ready = 1;
1265                 conn->ksnc_tx_scheduled = 1;
1266                 /* extra ref for scheduler */
1267                 atomic_inc (&conn->ksnc_refcount);
1268
1269                 cfs_waitq_signal (&sched->kss_waitq);
1270         }
1271
1272         spin_unlock_irqrestore (&sched->kss_lock, flags);
1273
1274         /* serialise with callbacks */
1275         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1276
1277         ksocknal_lib_reset_callback(conn->ksnc_sock, conn);
1278
1279         /* OK, so this conn may not be completely disengaged from its
1280          * scheduler yet, but it _has_ committed to terminate... */
1281         conn->ksnc_scheduler->kss_nconns--;
1282
1283         if (peer->ksnp_error != 0) {
1284                 /* peer's last conn closed in error */
1285                 LASSERT (list_empty (&peer->ksnp_conns));
1286
1287                 /* convert peer's last-known-alive timestamp from jiffies */
1288                 do_gettimeofday (&now);
1289                 then = now.tv_sec - cfs_duration_sec(cfs_time_sub(cfs_time_current(),
1290                                                                   peer->ksnp_last_alive));
1291                 notify = 1;
1292         }
1293
1294         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1295
1296         /* The socket is closed on the final put; either here, or in
1297          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1298          * when the connection was established, this will close the socket
1299          * immediately, aborting anything buffered in it. Any hung
1300          * zero-copy transmits will therefore complete in finite time. */
1301         ksocknal_putconnsock (conn);
1302
1303         if (notify)
1304                 kpr_notify (&ksocknal_data.ksnd_router, peer->ksnp_nid,
1305                             0, then);
1306 }
1307
1308 void
1309 ksocknal_destroy_conn (ksock_conn_t *conn)
1310 {
1311         /* Final coup-de-grace of the reaper */
1312         CDEBUG (D_NET, "connection %p\n", conn);
1313
1314         LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
1315         LASSERT (conn->ksnc_route == NULL);
1316         LASSERT (!conn->ksnc_tx_scheduled);
1317         LASSERT (!conn->ksnc_rx_scheduled);
1318         LASSERT (list_empty(&conn->ksnc_tx_queue));
1319
1320         /* complete current receive if any */
1321         switch (conn->ksnc_rx_state) {
1322         case SOCKNAL_RX_BODY:
1323                 CERROR("Completing partial receive from "LPX64
1324                        ", ip %d.%d.%d.%d:%d, with error\n",
1325                        conn->ksnc_peer->ksnp_nid,
1326                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
1327                 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_FAIL);
1328                 break;
1329         case SOCKNAL_RX_BODY_FWD:
1330                 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
1331                 break;
1332         case SOCKNAL_RX_HEADER:
1333         case SOCKNAL_RX_SLOP:
1334                 break;
1335         default:
1336                 LBUG ();
1337                 break;
1338         }
1339
1340         ksocknal_put_peer (conn->ksnc_peer);
1341
1342         PORTAL_FREE (conn, sizeof (*conn));
1343         atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
1344 }
1345
1346 void
1347 ksocknal_put_conn (ksock_conn_t *conn)
1348 {
1349         unsigned long flags;
1350
1351         CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
1352                 conn, conn->ksnc_peer->ksnp_nid,
1353                 atomic_read (&conn->ksnc_refcount));
1354
1355         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1356         if (!atomic_dec_and_test (&conn->ksnc_refcount))
1357                 return;
1358
1359         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
1360
1361         list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1362         cfs_waitq_signal (&ksocknal_data.ksnd_reaper_waitq);
1363
1364         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
1365 }
1366
1367 int
1368 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
1369 {
1370         ksock_conn_t       *conn;
1371         struct list_head   *ctmp;
1372         struct list_head   *cnxt;
1373         int                 count = 0;
1374
1375         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1376                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1377
1378                 if (ipaddr == 0 ||
1379                     conn->ksnc_ipaddr == ipaddr) {
1380                         count++;
1381                         ksocknal_close_conn_locked (conn, why);
1382                 }
1383         }
1384
1385         return (count);
1386 }
1387
1388 int
1389 ksocknal_close_stale_conns_locked (ksock_peer_t *peer, __u64 incarnation)
1390 {
1391         ksock_conn_t       *conn;
1392         struct list_head   *ctmp;
1393         struct list_head   *cnxt;
1394         int                 count = 0;
1395
1396         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1397                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1398
1399                 if (conn->ksnc_incarnation == incarnation)
1400                         continue;
1401
1402                 CWARN("Closing stale conn nid:"LPX64" ip:%08x/%d "
1403                       "incarnation:"LPX64"("LPX64")\n",
1404                       peer->ksnp_nid, conn->ksnc_ipaddr, conn->ksnc_port,
1405                       conn->ksnc_incarnation, incarnation);
1406
1407                 count++;
1408                 ksocknal_close_conn_locked (conn, -ESTALE);
1409         }
1410
1411         return (count);
1412 }
1413
1414 int
1415 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why)
1416 {
1417         ksock_peer_t     *peer = conn->ksnc_peer;
1418         __u32             ipaddr = conn->ksnc_ipaddr;
1419         unsigned long     flags;
1420         int               count;
1421
1422         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1423
1424         count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1425
1426         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1427
1428         return (count);
1429 }
1430
1431 int
1432 ksocknal_close_matching_conns (ptl_nid_t nid, __u32 ipaddr)
1433 {
1434         unsigned long       flags;
1435         ksock_peer_t       *peer;
1436         struct list_head   *ptmp;
1437         struct list_head   *pnxt;
1438         int                 lo;
1439         int                 hi;
1440         int                 i;
1441         int                 count = 0;
1442
1443         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1444
1445         if (nid != PTL_NID_ANY)
1446                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
1447         else {
1448                 lo = 0;
1449                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1450         }
1451
1452         for (i = lo; i <= hi; i++) {
1453                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1454
1455                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1456
1457                         if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
1458                                 continue;
1459
1460                         count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1461                 }
1462         }
1463
1464         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1465
1466         /* wildcards always succeed */
1467         if (nid == PTL_NID_ANY || ipaddr == 0)
1468                 return (0);
1469
1470         return (count == 0 ? -ENOENT : 0);
1471 }
1472
1473 void
1474 ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive)
1475 {
1476         /* The router is telling me she's been notified of a change in
1477          * gateway state.... */
1478
1479         CDEBUG (D_NET, "gw "LPX64" %s\n", gw_nid, alive ? "up" : "down");
1480
1481         if (!alive) {
1482                 /* If the gateway crashed, close all open connections... */
1483                 ksocknal_close_matching_conns (gw_nid, 0);
1484                 return;
1485         }
1486
1487         /* ...otherwise do nothing.  We can only establish new connections
1488          * if we have autroutes, and these connect on demand. */
1489 }
1490
1491 void
1492 ksocknal_push_peer (ksock_peer_t *peer)
1493 {
1494         int               index;
1495         int               i;
1496         struct list_head *tmp;
1497         ksock_conn_t     *conn;
1498
1499         for (index = 0; ; index++) {
1500                 read_lock (&ksocknal_data.ksnd_global_lock);
1501
1502                 i = 0;
1503                 conn = NULL;
1504
1505                 list_for_each (tmp, &peer->ksnp_conns) {
1506                         if (i++ == index) {
1507                                 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1508                                 atomic_inc (&conn->ksnc_refcount);
1509                                 break;
1510                         }
1511                 }
1512
1513                 read_unlock (&ksocknal_data.ksnd_global_lock);
1514
1515                 if (conn == NULL)
1516                         break;
1517
1518                 ksocknal_lib_push_conn (conn);
1519                 ksocknal_put_conn (conn);
1520         }
1521 }
1522
1523 int
1524 ksocknal_push (ptl_nid_t nid)
1525 {
1526         ksock_peer_t      *peer;
1527         struct list_head  *tmp;
1528         int                index;
1529         int                i;
1530         int                j;
1531         int                rc = -ENOENT;
1532
1533         if (nid != PTL_NID_ANY) {
1534                 peer = ksocknal_get_peer (nid);
1535
1536                 if (peer != NULL) {
1537                         rc = 0;
1538                         ksocknal_push_peer (peer);
1539                         ksocknal_put_peer (peer);
1540                 }
1541                 return (rc);
1542         }
1543
1544         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1545                 for (j = 0; ; j++) {
1546                         read_lock (&ksocknal_data.ksnd_global_lock);
1547
1548                         index = 0;
1549                         peer = NULL;
1550
1551                         list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1552                                 if (index++ == j) {
1553                                         peer = list_entry(tmp, ksock_peer_t,
1554                                                           ksnp_list);
1555                                         atomic_inc (&peer->ksnp_refcount);
1556                                         break;
1557                                 }
1558                         }
1559
1560                         read_unlock (&ksocknal_data.ksnd_global_lock);
1561
1562                         if (peer != NULL) {
1563                                 rc = 0;
1564                                 ksocknal_push_peer (peer);
1565                                 ksocknal_put_peer (peer);
1566                         }
1567                 }
1568
1569         }
1570
1571         return (rc);
1572 }
1573
1574 int
1575 ksocknal_add_interface(__u32 ipaddress, __u32 netmask)
1576 {
1577         unsigned long      flags;
1578         ksock_interface_t *iface;
1579         int                rc;
1580         int                i;
1581         int                j;
1582         struct list_head  *ptmp;
1583         ksock_peer_t      *peer;
1584         struct list_head  *rtmp;
1585         ksock_route_t     *route;
1586
1587         if (ipaddress == 0 ||
1588             netmask == 0)
1589                 return (-EINVAL);
1590
1591         write_lock_irqsave(&ksocknal_data.ksnd_global_lock, flags);
1592
1593         iface = ksocknal_ip2iface(ipaddress);
1594         if (iface != NULL) {
1595                 /* silently ignore dups */
1596                 rc = 0;
1597         } else if (ksocknal_data.ksnd_ninterfaces == SOCKNAL_MAX_INTERFACES) {
1598                 rc = -ENOSPC;
1599         } else {
1600                 iface = &ksocknal_data.ksnd_interfaces[ksocknal_data.ksnd_ninterfaces++];
1601
1602                 iface->ksni_ipaddr = ipaddress;
1603                 iface->ksni_netmask = netmask;
1604                 iface->ksni_nroutes = 0;
1605                 iface->ksni_npeers = 0;
1606
1607                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1608                         list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
1609                                 peer = list_entry(ptmp, ksock_peer_t, ksnp_list);
1610
1611                                 for (j = 0; i < peer->ksnp_n_passive_ips; j++)
1612                                         if (peer->ksnp_passive_ips[j] == ipaddress)
1613                                                 iface->ksni_npeers++;
1614
1615                                 list_for_each(rtmp, &peer->ksnp_routes) {
1616                                         route = list_entry(rtmp, ksock_route_t, ksnr_list);
1617
1618                                         if (route->ksnr_myipaddr == ipaddress)
1619                                                 iface->ksni_nroutes++;
1620                                 }
1621                         }
1622                 }
1623
1624                 rc = 0;
1625                 /* NB only new connections will pay attention to the new interface! */
1626         }
1627
1628         write_unlock_irqrestore(&ksocknal_data.ksnd_global_lock, flags);
1629
1630         return (rc);
1631 }
1632
1633 void
1634 ksocknal_peer_del_interface_locked(ksock_peer_t *peer, __u32 ipaddr)
1635 {
1636         struct list_head   *tmp;
1637         struct list_head   *nxt;
1638         ksock_route_t      *route;
1639         ksock_conn_t       *conn;
1640         int                 i;
1641         int                 j;
1642
1643         for (i = 0; i < peer->ksnp_n_passive_ips; i++)
1644                 if (peer->ksnp_passive_ips[i] == ipaddr) {
1645                         for (j = i+1; j < peer->ksnp_n_passive_ips; j++)
1646                                 peer->ksnp_passive_ips[j-1] =
1647                                         peer->ksnp_passive_ips[j];
1648                         peer->ksnp_n_passive_ips--;
1649                         break;
1650                 }
1651
1652         list_for_each_safe(tmp, nxt, &peer->ksnp_routes) {
1653                 route = list_entry (tmp, ksock_route_t, ksnr_list);
1654
1655                 if (route->ksnr_myipaddr != ipaddr)
1656                         continue;
1657
1658                 if (route->ksnr_share_count != 0) {
1659                         /* Manually created; keep, but unbind */
1660                         route->ksnr_myipaddr = 0;
1661                 } else {
1662                         ksocknal_del_route_locked(route);
1663                 }
1664         }
1665
1666         list_for_each_safe(tmp, nxt, &peer->ksnp_conns) {
1667                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
1668
1669                 if (conn->ksnc_myipaddr == ipaddr)
1670                         ksocknal_close_conn_locked (conn, 0);
1671         }
1672 }
1673
1674 int
1675 ksocknal_del_interface(__u32 ipaddress)
1676 {
1677         int                rc = -ENOENT;
1678         unsigned long      flags;
1679         struct list_head  *tmp;
1680         struct list_head  *nxt;
1681         ksock_peer_t      *peer;
1682         __u32              this_ip;
1683         int                i;
1684         int                j;
1685
1686         write_lock_irqsave(&ksocknal_data.ksnd_global_lock, flags);
1687
1688         for (i = 0; i < ksocknal_data.ksnd_ninterfaces; i++) {
1689                 this_ip = ksocknal_data.ksnd_interfaces[i].ksni_ipaddr;
1690
1691                 if (!(ipaddress == 0 ||
1692                       ipaddress == this_ip))
1693                         continue;
1694
1695                 rc = 0;
1696
1697                 for (j = i+1; j < ksocknal_data.ksnd_ninterfaces; j++)
1698                         ksocknal_data.ksnd_interfaces[j-1] =
1699                                 ksocknal_data.ksnd_interfaces[j];
1700
1701                 ksocknal_data.ksnd_ninterfaces--;
1702
1703                 for (j = 0; j < ksocknal_data.ksnd_peer_hash_size; j++) {
1704                         list_for_each_safe(tmp, nxt, &ksocknal_data.ksnd_peers[j]) {
1705                                 peer = list_entry(tmp, ksock_peer_t, ksnp_list);
1706
1707                                 ksocknal_peer_del_interface_locked(peer, this_ip);
1708                         }
1709                 }
1710         }
1711
1712         write_unlock_irqrestore(&ksocknal_data.ksnd_global_lock, flags);
1713
1714         return (rc);
1715 }
1716
1717 int
1718 ksocknal_cmd(struct portals_cfg *pcfg, void * private)
1719 {
1720         int rc;
1721
1722         switch(pcfg->pcfg_command) {
1723         case NAL_CMD_GET_INTERFACE: {
1724                 ksock_interface_t *iface;
1725
1726                 read_lock (&ksocknal_data.ksnd_global_lock);
1727
1728                 if (pcfg->pcfg_count < 0 ||
1729                     pcfg->pcfg_count >= ksocknal_data.ksnd_ninterfaces) {
1730                         rc = -ENOENT;
1731                 } else {
1732                         rc = 0;
1733                         iface = &ksocknal_data.ksnd_interfaces[pcfg->pcfg_count];
1734
1735                         pcfg->pcfg_id    = iface->ksni_ipaddr;
1736                         pcfg->pcfg_misc  = iface->ksni_netmask;
1737                         pcfg->pcfg_fd    = iface->ksni_npeers;
1738                         pcfg->pcfg_count = iface->ksni_nroutes;
1739                 }
1740
1741                 read_unlock (&ksocknal_data.ksnd_global_lock);
1742                 break;
1743         }
1744         case NAL_CMD_ADD_INTERFACE: {
1745                 rc = ksocknal_add_interface(pcfg->pcfg_id, /* IP address */
1746                                             pcfg->pcfg_misc); /* net mask */
1747                 break;
1748         }
1749         case NAL_CMD_DEL_INTERFACE: {
1750                 rc = ksocknal_del_interface(pcfg->pcfg_id); /* IP address */
1751                 break;
1752         }
1753         case NAL_CMD_GET_PEER: {
1754                 ptl_nid_t    nid = 0;
1755                 __u32        myip = 0;
1756                 __u32        ip = 0;
1757                 int          port = 0;
1758                 int          conn_count = 0;
1759                 int          share_count = 0;
1760
1761                 rc = ksocknal_get_peer_info(pcfg->pcfg_count, &nid,
1762                                             &myip, &ip, &port,
1763                                             &conn_count,  &share_count);
1764                 pcfg->pcfg_nid   = nid;
1765                 pcfg->pcfg_size  = myip;
1766                 pcfg->pcfg_id    = ip;
1767                 pcfg->pcfg_misc  = port;
1768                 pcfg->pcfg_count = conn_count;
1769                 pcfg->pcfg_wait  = share_count;
1770                 break;
1771         }
1772         case NAL_CMD_ADD_PEER: {
1773                 rc = ksocknal_add_peer (pcfg->pcfg_nid,
1774                                         pcfg->pcfg_id, /* IP */
1775                                         pcfg->pcfg_misc); /* port */
1776                 break;
1777         }
1778         case NAL_CMD_DEL_PEER: {
1779                 rc = ksocknal_del_peer (pcfg->pcfg_nid,
1780                                         pcfg->pcfg_id, /* IP */
1781                                         pcfg->pcfg_flags); /* single_share? */
1782                 break;
1783         }
1784         case NAL_CMD_GET_CONN: {
1785                 ksock_conn_t *conn = ksocknal_get_conn_by_idx (pcfg->pcfg_count);
1786
1787                 if (conn == NULL)
1788                         rc = -ENOENT;
1789                 else {
1790                         int   txmem;
1791                         int   rxmem;
1792                         int   nagle;
1793
1794                         ksocknal_get_conn_tunables(conn, &txmem, &rxmem, &nagle);
1795
1796                         rc = 0;
1797                         pcfg->pcfg_nid    = conn->ksnc_peer->ksnp_nid;
1798                         pcfg->pcfg_id     = conn->ksnc_ipaddr;
1799                         pcfg->pcfg_misc   = conn->ksnc_port;
1800                         pcfg->pcfg_fd     = conn->ksnc_myipaddr;
1801                         pcfg->pcfg_flags  = conn->ksnc_type;
1802                         pcfg->pcfg_gw_nal = conn->ksnc_scheduler -
1803                                             ksocknal_data.ksnd_schedulers;
1804                         pcfg->pcfg_count  = txmem;
1805                         pcfg->pcfg_size   = rxmem;
1806                         pcfg->pcfg_wait   = nagle;
1807                         ksocknal_put_conn (conn);
1808                 }
1809                 break;
1810         }
1811         case NAL_CMD_REGISTER_PEER_FD: {
1812                 struct socket *sock = sockfd_lookup (pcfg->pcfg_fd, &rc);
1813                 int            type = pcfg->pcfg_misc;
1814
1815                 if (sock == NULL)
1816                         break;
1817
1818                 switch (type) {
1819                 case SOCKNAL_CONN_NONE:
1820                 case SOCKNAL_CONN_ANY:
1821                 case SOCKNAL_CONN_CONTROL:
1822                 case SOCKNAL_CONN_BULK_IN:
1823                 case SOCKNAL_CONN_BULK_OUT:
1824                         rc = ksocknal_create_conn(NULL, sock, type);
1825                         break;
1826                 default:
1827                         rc = -EINVAL;
1828                         break;
1829                 }
1830                 cfs_put_file (KSN_SOCK2FILE(sock));
1831                 break;
1832         }
1833         case NAL_CMD_CLOSE_CONNECTION: {
1834                 rc = ksocknal_close_matching_conns (pcfg->pcfg_nid,
1835                                                     pcfg->pcfg_id);
1836                 break;
1837         }
1838         case NAL_CMD_REGISTER_MYNID: {
1839                 rc = ksocknal_set_mynid (pcfg->pcfg_nid);
1840                 break;
1841         }
1842         case NAL_CMD_PUSH_CONNECTION: {
1843                 rc = ksocknal_push (pcfg->pcfg_nid);
1844                 break;
1845         }
1846         default:
1847                 rc = -EINVAL;
1848                 break;
1849         }
1850
1851         return rc;
1852 }
1853
1854 void
1855 ksocknal_free_fmbs (ksock_fmb_pool_t *p)
1856 {
1857         int          npages = p->fmp_buff_pages;
1858         ksock_fmb_t *fmb;
1859         int          i;
1860
1861         LASSERT (list_empty(&p->fmp_blocked_conns));
1862         LASSERT (p->fmp_nactive_fmbs == 0);
1863
1864         while (!list_empty(&p->fmp_idle_fmbs)) {
1865
1866                 fmb = list_entry(p->fmp_idle_fmbs.next,
1867                                  ksock_fmb_t, fmb_list);
1868
1869                 for (i = 0; i < npages; i++)
1870                         if (fmb->fmb_kiov[i].kiov_page != NULL)
1871                                 cfs_free_page(fmb->fmb_kiov[i].kiov_page);
1872
1873                 list_del(&fmb->fmb_list);
1874                 PORTAL_FREE(fmb, offsetof(ksock_fmb_t, fmb_kiov[npages]));
1875         }
1876 }
1877
1878 void
1879 ksocknal_free_buffers (void)
1880 {
1881         ksocknal_free_fmbs(&ksocknal_data.ksnd_small_fmp);
1882         ksocknal_free_fmbs(&ksocknal_data.ksnd_large_fmp);
1883
1884         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_ltxs) == 0);
1885
1886         if (ksocknal_data.ksnd_schedulers != NULL)
1887                 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
1888                              sizeof (ksock_sched_t) * ksocknal_data.ksnd_nschedulers);
1889
1890         PORTAL_FREE (ksocknal_data.ksnd_peers,
1891                      sizeof (struct list_head) *
1892                      ksocknal_data.ksnd_peer_hash_size);
1893 }
1894
1895 void
1896 ksocknal_api_shutdown (nal_t *nal)
1897 {
1898         ksock_sched_t *sched;
1899         int            i;
1900
1901         if (nal->nal_refct != 0) {
1902                 /* This module got the first ref */
1903                 PORTAL_MODULE_UNUSE;
1904                 return;
1905         }
1906
1907         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1908                atomic_read (&portal_kmemory));
1909
1910         LASSERT(nal == &ksocknal_api);
1911
1912         switch (ksocknal_data.ksnd_init) {
1913         default:
1914                 LASSERT (0);
1915
1916         case SOCKNAL_INIT_ALL:
1917                 libcfs_nal_cmd_unregister(SOCKNAL);
1918
1919                 ksocknal_data.ksnd_init = SOCKNAL_INIT_LIB;
1920                 /* fall through */
1921
1922         case SOCKNAL_INIT_LIB:
1923                 /* No more calls to ksocknal_cmd() to create new
1924                  * autoroutes/connections since we're being unloaded. */
1925
1926                 /* Delete all peers */
1927                 ksocknal_del_peer(PTL_NID_ANY, 0, 0);
1928
1929                 /* Wait for all peer state to clean up */
1930                 i = 2;
1931                 while (atomic_read (&ksocknal_data.ksnd_npeers) != 0) {
1932                         i++;
1933                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1934                                "waiting for %d peers to disconnect\n",
1935                                atomic_read (&ksocknal_data.ksnd_npeers));
1936                         set_current_state (TASK_UNINTERRUPTIBLE);
1937                         schedule_timeout (cfs_time_seconds(1));
1938                 }
1939
1940                 /* Tell lib we've stopped calling into her. */
1941                 lib_fini(&ksocknal_lib);
1942
1943                 ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
1944                 /* fall through */
1945
1946         case SOCKNAL_INIT_DATA:
1947                 LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
1948                 LASSERT (ksocknal_data.ksnd_peers != NULL);
1949                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1950                         LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
1951                 }
1952                 LASSERT (list_empty (&ksocknal_data.ksnd_enomem_conns));
1953                 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
1954                 LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
1955                 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
1956                 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
1957
1958                 if (ksocknal_data.ksnd_schedulers != NULL)
1959                         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
1960                                 ksock_sched_t *kss =
1961                                         &ksocknal_data.ksnd_schedulers[i];
1962
1963                                 LASSERT (list_empty (&kss->kss_tx_conns));
1964                                 LASSERT (list_empty (&kss->kss_rx_conns));
1965                                 LASSERT (kss->kss_nconns == 0);
1966                         }
1967
1968                 /* stop router calling me */
1969                 kpr_shutdown (&ksocknal_data.ksnd_router);
1970
1971                 /* flag threads to terminate; wake and wait for them to die */
1972                 ksocknal_data.ksnd_shuttingdown = 1;
1973                 cfs_waitq_broadcast (&ksocknal_data.ksnd_autoconnectd_waitq);
1974                 cfs_waitq_broadcast (&ksocknal_data.ksnd_reaper_waitq);
1975
1976                 for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
1977                         sched = &ksocknal_data.ksnd_schedulers[i];
1978                         cfs_waitq_broadcast(&sched->kss_waitq);
1979                 }
1980
1981                 i = 4;
1982                 read_lock(&ksocknal_data.ksnd_global_lock);
1983                 while (ksocknal_data.ksnd_nthreads != 0) {
1984                         i++;
1985                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1986                                "waiting for %d threads to terminate\n",
1987                                 ksocknal_data.ksnd_nthreads);
1988                         read_unlock(&ksocknal_data.ksnd_global_lock);
1989                         set_current_state (TASK_UNINTERRUPTIBLE);
1990                         schedule_timeout (cfs_time_seconds(1));
1991                         read_lock(&ksocknal_data.ksnd_global_lock);
1992                 }
1993                 read_unlock(&ksocknal_data.ksnd_global_lock);
1994
1995                 kpr_deregister (&ksocknal_data.ksnd_router);
1996
1997                 ksocknal_free_buffers();
1998
1999                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
2000                 /* fall through */
2001
2002         case SOCKNAL_INIT_NOTHING:
2003                 break;
2004         }
2005
2006         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
2007                atomic_read (&portal_kmemory));
2008
2009         printk(KERN_INFO "Lustre: Routing socket NAL unloaded (final mem %d)\n",
2010                atomic_read(&portal_kmemory));
2011 }
2012
2013
2014 void
2015 ksocknal_init_incarnation (void)
2016 {
2017         struct timeval tv;
2018
2019         /* The incarnation number is the time this module loaded and it
2020          * identifies this particular instance of the socknal.  Hopefully
2021          * we won't be able to reboot more frequently than 1MHz for the
2022          * forseeable future :) */
2023
2024         do_gettimeofday(&tv);
2025
2026         ksocknal_data.ksnd_incarnation =
2027                 (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
2028 }
2029
2030 int
2031 ksocknal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
2032                       ptl_ni_limits_t *requested_limits,
2033                       ptl_ni_limits_t *actual_limits)
2034 {
2035         ptl_process_id_t  process_id;
2036         int               pkmem = atomic_read(&portal_kmemory);
2037         int               rc;
2038         int               i;
2039         int               j;
2040
2041         LASSERT (nal == &ksocknal_api);
2042
2043         if (nal->nal_refct != 0) {
2044                 if (actual_limits != NULL)
2045                         *actual_limits = ksocknal_lib.libnal_ni.ni_actual_limits;
2046                 /* This module got the first ref */
2047                 PORTAL_MODULE_USE;
2048                 return (PTL_OK);
2049         }
2050
2051         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
2052
2053         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
2054
2055         ksocknal_init_incarnation();
2056
2057         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
2058         PORTAL_ALLOC (ksocknal_data.ksnd_peers,
2059                       sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
2060         if (ksocknal_data.ksnd_peers == NULL)
2061                 return (-ENOMEM);
2062
2063         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
2064                 CFS_INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
2065
2066         rwlock_init(&ksocknal_data.ksnd_global_lock);
2067
2068         spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
2069         CFS_INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
2070         CFS_INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
2071         ksocknal_data.ksnd_small_fmp.fmp_buff_pages = SOCKNAL_SMALL_FWD_PAGES;
2072
2073         spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
2074         CFS_INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
2075         CFS_INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
2076         ksocknal_data.ksnd_large_fmp.fmp_buff_pages = SOCKNAL_LARGE_FWD_PAGES;
2077
2078         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
2079         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
2080         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
2081         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
2082         cfs_waitq_init(&ksocknal_data.ksnd_reaper_waitq);
2083
2084         spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
2085         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
2086         cfs_waitq_init(&ksocknal_data.ksnd_autoconnectd_waitq);
2087
2088         /* NB memset above zeros whole of ksocknal_data, including
2089          * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
2090
2091         /* flag lists/ptrs/locks initialised */
2092         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2093
2094         ksocknal_data.ksnd_nschedulers = ksocknal_nsched();
2095         PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
2096                      sizeof(ksock_sched_t) * ksocknal_data.ksnd_nschedulers);
2097         if (ksocknal_data.ksnd_schedulers == NULL) {
2098                 ksocknal_api_shutdown (nal);
2099                 return (-ENOMEM);
2100         }
2101
2102         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2103                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
2104
2105                 spin_lock_init (&kss->kss_lock);
2106                 CFS_INIT_LIST_HEAD (&kss->kss_rx_conns);
2107                 CFS_INIT_LIST_HEAD (&kss->kss_tx_conns);
2108 #if SOCKNAL_ZC
2109                 CFS_INIT_LIST_HEAD (&kss->kss_zctxdone_list);
2110 #endif
2111                 cfs_waitq_init (&kss->kss_waitq);
2112         }
2113
2114         /* NB we have to wait to be told our true NID... */
2115         process_id.pid = requested_pid;
2116         process_id.nid = 0;
2117
2118         rc = lib_init(&ksocknal_lib, nal, process_id,
2119                       requested_limits, actual_limits);
2120         if (rc != PTL_OK) {
2121                 CERROR("lib_init failed: error %d\n", rc);
2122                 ksocknal_api_shutdown (nal);
2123                 return (rc);
2124         }
2125
2126         ksocknal_data.ksnd_init = SOCKNAL_INIT_LIB; // flag lib_init() called
2127
2128         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2129                 rc = ksocknal_thread_start (ksocknal_scheduler,
2130                                             &ksocknal_data.ksnd_schedulers[i]);
2131                 if (rc != 0) {
2132                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
2133                                i, rc);
2134                         ksocknal_api_shutdown (nal);
2135                         return (rc);
2136                 }
2137         }
2138
2139         for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
2140                 rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
2141                 if (rc != 0) {
2142                         CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
2143                         ksocknal_api_shutdown (nal);
2144                         return (rc);
2145                 }
2146         }
2147
2148         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
2149         if (rc != 0) {
2150                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
2151                 ksocknal_api_shutdown (nal);
2152                 return (rc);
2153         }
2154
2155         rc = kpr_register(&ksocknal_data.ksnd_router,
2156                           &ksocknal_router_interface);
2157         if (rc != 0) {
2158                 CDEBUG(D_NET, "Can't initialise routing interface "
2159                        "(rc = %d): not routing\n", rc);
2160         } else {
2161                 /* Only allocate forwarding buffers if there's a router */
2162
2163                 for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
2164                                  SOCKNAL_LARGE_FWD_NMSGS); i++) {
2165                         ksock_fmb_t      *fmb;
2166                         ksock_fmb_pool_t *pool;
2167
2168
2169                         if (i < SOCKNAL_SMALL_FWD_NMSGS)
2170                                 pool = &ksocknal_data.ksnd_small_fmp;
2171                         else
2172                                 pool = &ksocknal_data.ksnd_large_fmp;
2173
2174                         PORTAL_ALLOC(fmb, offsetof(ksock_fmb_t,
2175                                                    fmb_kiov[pool->fmp_buff_pages]));
2176                         if (fmb == NULL) {
2177                                 ksocknal_api_shutdown(nal);
2178                                 return (-ENOMEM);
2179                         }
2180
2181                         fmb->fmb_pool = pool;
2182
2183                         for (j = 0; j < pool->fmp_buff_pages; j++) {
2184                                 fmb->fmb_kiov[j].kiov_page = cfs_alloc_page(CFS_ALLOC_STD);
2185
2186                                 if (fmb->fmb_kiov[j].kiov_page == NULL) {
2187                                         ksocknal_api_shutdown (nal);
2188                                         return (-ENOMEM);
2189                                 }
2190
2191                                 LASSERT(cfs_page_address(fmb->fmb_kiov[j].kiov_page) != NULL);
2192                         }
2193
2194                         list_add(&fmb->fmb_list, &pool->fmp_idle_fmbs);
2195                 }
2196         }
2197
2198         rc = libcfs_nal_cmd_register(SOCKNAL, &ksocknal_cmd, NULL);
2199         if (rc != 0) {
2200                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
2201                 ksocknal_api_shutdown (nal);
2202                 return (rc);
2203         }
2204
2205         /* flag everything initialised */
2206         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
2207
2208         printk(KERN_INFO "Lustre: Routing socket NAL loaded "
2209                "(Routing %s, initial mem %d, incarnation "LPX64")\n",
2210                kpr_routing (&ksocknal_data.ksnd_router) ?
2211                "enabled" : "disabled", pkmem, ksocknal_data.ksnd_incarnation);
2212
2213         return (0);
2214 }
2215
2216 void __exit
2217 ksocknal_module_fini (void)
2218 {
2219 #ifdef CONFIG_SYSCTL
2220         if (ksocknal_tunables.ksnd_sysctl != NULL)
2221                 unregister_sysctl_table (ksocknal_tunables.ksnd_sysctl);
2222 #endif
2223         PtlNIFini(ksocknal_ni);
2224
2225         ptl_unregister_nal(SOCKNAL);
2226 }
2227
2228 extern cfs_sysctl_table_t ksocknal_top_ctl_table[];
2229
2230 int __init
2231 ksocknal_module_init (void)
2232 {
2233         int    rc;
2234
2235         /* packet descriptor must fit in a router descriptor's scratchpad */
2236         LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
2237         /* the following must be sizeof(int) for proc_dointvec() */
2238         LASSERT(sizeof (ksocknal_tunables.ksnd_io_timeout) == sizeof (int));
2239         LASSERT(sizeof (ksocknal_tunables.ksnd_eager_ack) == sizeof (int));
2240         LASSERT(sizeof (ksocknal_tunables.ksnd_typed_conns) == sizeof (int));
2241         LASSERT(sizeof (ksocknal_tunables.ksnd_min_bulk) == sizeof (int));
2242         LASSERT(sizeof (ksocknal_tunables.ksnd_buffer_size) == sizeof (int));
2243         LASSERT(sizeof (ksocknal_tunables.ksnd_nagle) == sizeof (int));
2244         LASSERT(sizeof (ksocknal_tunables.ksnd_keepalive_idle) == sizeof (int));
2245         LASSERT(sizeof (ksocknal_tunables.ksnd_keepalive_count) == sizeof (int));
2246         LASSERT(sizeof (ksocknal_tunables.ksnd_keepalive_intvl) == sizeof (int));
2247 #if CPU_AFFINITY
2248         LASSERT(sizeof (ksocknal_tunables.ksnd_irq_affinity) == sizeof (int));
2249 #endif
2250 #if SOCKNAL_ZC
2251         LASSERT(sizeof (ksocknal_tunables.ksnd_zc_min_frag) == sizeof (int));
2252 #endif
2253         /* check ksnr_connected/connecting field large enough */
2254         LASSERT(SOCKNAL_CONN_NTYPES <= 4);
2255
2256         ksocknal_api.nal_ni_init = ksocknal_api_startup;
2257         ksocknal_api.nal_ni_fini = ksocknal_api_shutdown;
2258
2259         /* Initialise dynamic tunables to defaults once only */
2260         ksocknal_tunables.ksnd_io_timeout      = SOCKNAL_IO_TIMEOUT;
2261         ksocknal_tunables.ksnd_eager_ack       = SOCKNAL_EAGER_ACK;
2262         ksocknal_tunables.ksnd_typed_conns     = SOCKNAL_TYPED_CONNS;
2263         ksocknal_tunables.ksnd_min_bulk        = SOCKNAL_MIN_BULK;
2264         ksocknal_tunables.ksnd_buffer_size     = SOCKNAL_BUFFER_SIZE;
2265         ksocknal_tunables.ksnd_nagle           = SOCKNAL_NAGLE;
2266         ksocknal_tunables.ksnd_keepalive_idle  = SOCKNAL_KEEPALIVE_IDLE;
2267         ksocknal_tunables.ksnd_keepalive_count = SOCKNAL_KEEPALIVE_COUNT;
2268         ksocknal_tunables.ksnd_keepalive_intvl = SOCKNAL_KEEPALIVE_INTVL;
2269 #if CPU_AFFINITY
2270         ksocknal_tunables.ksnd_irq_affinity = SOCKNAL_IRQ_AFFINITY;
2271 #endif
2272 #if SOCKNAL_ZC
2273         ksocknal_tunables.ksnd_zc_min_frag  = SOCKNAL_ZC_MIN_FRAG;
2274 #endif
2275
2276         rc = ptl_register_nal(SOCKNAL, &ksocknal_api);
2277         if (rc != PTL_OK) {
2278                 CERROR("Can't register SOCKNAL: %d\n", rc);
2279                 return (-ENOMEM);               /* or something... */
2280         }
2281
2282         /* Pure gateways want the NAL started up at module load time... */
2283         rc = PtlNIInit(SOCKNAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &ksocknal_ni);
2284         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
2285                 ptl_unregister_nal(SOCKNAL);
2286                 return (-ENODEV);
2287         }
2288
2289 #ifdef CONFIG_SYSCTL
2290         /* Press on regardless even if registering sysctl doesn't work */
2291         ksocknal_tunables.ksnd_sysctl =
2292                 register_sysctl_table (ksocknal_top_ctl_table, 0);
2293 #endif
2294         return (0);
2295 }
2296
2297 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2298 MODULE_DESCRIPTION("Kernel TCP Socket NAL v1.0.0");
2299 MODULE_LICENSE("GPL");
2300
2301 cfs_module(ksocknal, "1.0.0", ksocknal_module_init, ksocknal_module_fini);