Whamcloud - gitweb
* Bug 5676 fix: keep extra ref on peer until all done in
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 nal_t                   ksocknal_api;
29 ksock_nal_data_t        ksocknal_data;
30 ptl_handle_ni_t         ksocknal_ni;
31 ksock_tunables_t        ksocknal_tunables;
32
33 kpr_nal_interface_t ksocknal_router_interface = {
34         kprni_nalid:      SOCKNAL,
35         kprni_arg:        &ksocknal_data,
36         kprni_fwd:        ksocknal_fwd_packet,
37         kprni_notify:     ksocknal_notify,
38 };
39
40 int
41 ksocknal_set_mynid(ptl_nid_t nid)
42 {
43         lib_ni_t *ni = &ksocknal_lib.libnal_ni;
44
45         /* FIXME: we have to do this because we call lib_init() at module
46          * insertion time, which is before we have 'mynid' available.  lib_init
47          * sets the NAL's nid, which it uses to tell other nodes where packets
48          * are coming from.  This is not a very graceful solution to this
49          * problem. */
50
51         CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
52                nid, ni->ni_pid.nid);
53
54         ni->ni_pid.nid = nid;
55         return (0);
56 }
57
58 ksock_interface_t *
59 ksocknal_ip2iface(__u32 ip)
60 {
61         int                i;
62         ksock_interface_t *iface;
63
64         for (i = 0; i < ksocknal_data.ksnd_ninterfaces; i++) {
65                 LASSERT(i < SOCKNAL_MAX_INTERFACES);
66                 iface = &ksocknal_data.ksnd_interfaces[i];
67
68                 if (iface->ksni_ipaddr == ip)
69                         return (iface);
70         }
71
72         return (NULL);
73 }
74
75 ksock_route_t *
76 ksocknal_create_route (__u32 ipaddr, int port)
77 {
78         ksock_route_t *route;
79
80         PORTAL_ALLOC (route, sizeof (*route));
81         if (route == NULL)
82                 return (NULL);
83
84         atomic_set (&route->ksnr_refcount, 1);
85         route->ksnr_peer = NULL;
86         route->ksnr_timeout = cfs_time_current();
87         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
88         route->ksnr_ipaddr = ipaddr;
89         route->ksnr_port = port;
90         route->ksnr_connecting = 0;
91         route->ksnr_connected = 0;
92         route->ksnr_deleted = 0;
93         route->ksnr_conn_count = 0;
94         route->ksnr_share_count = 0;
95
96         return (route);
97 }
98
99 void
100 ksocknal_destroy_route (ksock_route_t *route)
101 {
102         if (route->ksnr_peer != NULL)
103                 ksocknal_put_peer (route->ksnr_peer);
104
105         PORTAL_FREE (route, sizeof (*route));
106 }
107
108 void
109 ksocknal_put_route (ksock_route_t *route)
110 {
111         CDEBUG (D_OTHER, "putting route[%p] (%d)\n",
112                 route, atomic_read (&route->ksnr_refcount));
113
114         LASSERT (atomic_read (&route->ksnr_refcount) > 0);
115         if (!atomic_dec_and_test (&route->ksnr_refcount))
116              return;
117
118         ksocknal_destroy_route (route);
119 }
120
121 ksock_peer_t *
122 ksocknal_create_peer (ptl_nid_t nid)
123 {
124         ksock_peer_t *peer;
125
126         LASSERT (nid != PTL_NID_ANY);
127
128         PORTAL_ALLOC (peer, sizeof (*peer));
129         if (peer == NULL)
130                 return (NULL);
131
132         memset (peer, 0, sizeof (*peer));       /* NULL pointers/clear flags etc */
133
134         peer->ksnp_nid = nid;
135         atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
136         peer->ksnp_closing = 0;
137         CFS_INIT_LIST_HEAD (&peer->ksnp_conns);
138         CFS_INIT_LIST_HEAD (&peer->ksnp_routes);
139         CFS_INIT_LIST_HEAD (&peer->ksnp_tx_queue);
140
141         atomic_inc (&ksocknal_data.ksnd_npeers);
142         return (peer);
143 }
144
145 void
146 ksocknal_destroy_peer (ksock_peer_t *peer)
147 {
148         CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
149
150         LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
151         LASSERT (list_empty (&peer->ksnp_conns));
152         LASSERT (list_empty (&peer->ksnp_routes));
153         LASSERT (list_empty (&peer->ksnp_tx_queue));
154
155         PORTAL_FREE (peer, sizeof (*peer));
156
157         /* NB a peer's connections and autoconnect routes keep a reference
158          * on their peer until they are destroyed, so we can be assured
159          * that _all_ state to do with this peer has been cleaned up when
160          * its refcount drops to zero. */
161         atomic_dec (&ksocknal_data.ksnd_npeers);
162 }
163
164 void
165 ksocknal_put_peer (ksock_peer_t *peer)
166 {
167         CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
168                 peer, peer->ksnp_nid,
169                 atomic_read (&peer->ksnp_refcount));
170
171         LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
172         if (!atomic_dec_and_test (&peer->ksnp_refcount))
173                 return;
174
175         ksocknal_destroy_peer (peer);
176 }
177
178 ksock_peer_t *
179 ksocknal_find_peer_locked (ptl_nid_t nid)
180 {
181         struct list_head *peer_list = ksocknal_nid2peerlist (nid);
182         struct list_head *tmp;
183         ksock_peer_t     *peer;
184
185         list_for_each (tmp, peer_list) {
186
187                 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
188
189                 LASSERT (!peer->ksnp_closing);
190
191                 if (peer->ksnp_nid != nid)
192                         continue;
193
194                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
195                        peer, nid, atomic_read (&peer->ksnp_refcount));
196                 return (peer);
197         }
198         return (NULL);
199 }
200
201 ksock_peer_t *
202 ksocknal_get_peer (ptl_nid_t nid)
203 {
204         ksock_peer_t     *peer;
205
206         read_lock (&ksocknal_data.ksnd_global_lock);
207         peer = ksocknal_find_peer_locked (nid);
208         if (peer != NULL)                       /* +1 ref for caller? */
209                 atomic_inc (&peer->ksnp_refcount);
210         read_unlock (&ksocknal_data.ksnd_global_lock);
211
212         return (peer);
213 }
214
215 void
216 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
217 {
218         int                i;
219         __u32              ip;
220
221         for (i = 0; i < peer->ksnp_n_passive_ips; i++) {
222                 LASSERT (i < SOCKNAL_MAX_INTERFACES);
223                 ip = peer->ksnp_passive_ips[i];
224
225                 ksocknal_ip2iface(ip)->ksni_npeers--;
226         }
227
228         LASSERT (list_empty(&peer->ksnp_conns));
229         LASSERT (list_empty(&peer->ksnp_routes));
230         LASSERT (!peer->ksnp_closing);
231         peer->ksnp_closing = 1;
232         list_del (&peer->ksnp_list);
233         /* lose peerlist's ref */
234         ksocknal_put_peer (peer);
235 }
236
237 int
238 ksocknal_get_peer_info (int index, ptl_nid_t *nid,
239                         __u32 *myip, __u32 *peer_ip, int *port,
240                         int *conn_count, int *share_count)
241 {
242         ksock_peer_t      *peer;
243         struct list_head  *ptmp;
244         ksock_route_t     *route;
245         struct list_head  *rtmp;
246         int                i;
247         int                j;
248         int                rc = -ENOENT;
249
250         read_lock (&ksocknal_data.ksnd_global_lock);
251
252         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
253
254                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
255                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
256
257                         if (peer->ksnp_n_passive_ips == 0 &&
258                             list_empty(&peer->ksnp_routes)) {
259                                 if (index-- > 0)
260                                         continue;
261
262                                 *nid = peer->ksnp_nid;
263                                 *myip = 0;
264                                 *peer_ip = 0;
265                                 *port = 0;
266                                 *conn_count = 0;
267                                 *share_count = 0;
268                                 rc = 0;
269                                 goto out;
270                         }
271
272                         for (j = 0; j < peer->ksnp_n_passive_ips; j++) {
273                                 if (index-- > 0)
274                                         continue;
275
276                                 *nid = peer->ksnp_nid;
277                                 *myip = peer->ksnp_passive_ips[j];
278                                 *peer_ip = 0;
279                                 *port = 0;
280                                 *conn_count = 0;
281                                 *share_count = 0;
282                                 rc = 0;
283                                 goto out;
284                         }
285
286                         list_for_each (rtmp, &peer->ksnp_routes) {
287                                 if (index-- > 0)
288                                         continue;
289
290                                 route = list_entry(rtmp, ksock_route_t,
291                                                    ksnr_list);
292
293                                 *nid = peer->ksnp_nid;
294                                 *myip = route->ksnr_myipaddr;
295                                 *peer_ip = route->ksnr_ipaddr;
296                                 *port = route->ksnr_port;
297                                 *conn_count = route->ksnr_conn_count;
298                                 *share_count = route->ksnr_share_count;
299                                 rc = 0;
300                                 goto out;
301                         }
302                 }
303         }
304  out:
305         read_unlock (&ksocknal_data.ksnd_global_lock);
306         return (rc);
307 }
308
309 void
310 ksocknal_associate_route_conn_locked(ksock_route_t *route, ksock_conn_t *conn)
311 {
312         ksock_peer_t      *peer = route->ksnr_peer;
313         int                type = conn->ksnc_type;
314         ksock_interface_t *iface;
315
316         conn->ksnc_route = route;
317         atomic_inc (&route->ksnr_refcount);
318
319         if (route->ksnr_myipaddr != conn->ksnc_myipaddr) {
320                 if (route->ksnr_myipaddr == 0) {
321                         /* route wasn't bound locally yet (the initial route) */
322                         CWARN("Binding "LPX64" %u.%u.%u.%u to %u.%u.%u.%u\n",
323                               peer->ksnp_nid,
324                               HIPQUAD(route->ksnr_ipaddr),
325                               HIPQUAD(conn->ksnc_myipaddr));
326                 } else {
327                         CWARN("Rebinding "LPX64" %u.%u.%u.%u from "
328                               "%u.%u.%u.%u to %u.%u.%u.%u\n",
329                               peer->ksnp_nid,
330                               HIPQUAD(route->ksnr_ipaddr),
331                               HIPQUAD(route->ksnr_myipaddr),
332                               HIPQUAD(conn->ksnc_myipaddr));
333
334                         iface = ksocknal_ip2iface(route->ksnr_myipaddr);
335                         if (iface != NULL)
336                                 iface->ksni_nroutes--;
337                 }
338                 route->ksnr_myipaddr = conn->ksnc_myipaddr;
339                 iface = ksocknal_ip2iface(route->ksnr_myipaddr);
340                 if (iface != NULL)
341                         iface->ksni_nroutes++;
342         }
343
344         route->ksnr_connected |= (1<<type);
345         route->ksnr_connecting &= ~(1<<type);
346         route->ksnr_conn_count++;
347
348         /* Successful connection => further attempts can
349          * proceed immediately */
350         route->ksnr_timeout = cfs_time_current();
351         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
352 }
353
354 void
355 ksocknal_add_route_locked (ksock_peer_t *peer, ksock_route_t *route)
356 {
357         struct list_head  *tmp;
358         ksock_conn_t      *conn;
359         int                type;
360         ksock_route_t     *route2;
361
362         LASSERT (route->ksnr_peer == NULL);
363         LASSERT (route->ksnr_connecting == 0);
364         LASSERT (route->ksnr_connected == 0);
365
366         /* LASSERT(unique) */
367         list_for_each(tmp, &peer->ksnp_routes) {
368                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
369
370                 if (route2->ksnr_ipaddr == route->ksnr_ipaddr) {
371                         CERROR ("Duplicate route "LPX64" %u.%u.%u.%u\n",
372                                 peer->ksnp_nid, HIPQUAD(route->ksnr_ipaddr));
373                         LBUG();
374                 }
375         }
376
377         route->ksnr_peer = peer;
378         atomic_inc (&peer->ksnp_refcount);
379         /* peer's routelist takes over my ref on 'route' */
380         list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
381
382         list_for_each(tmp, &peer->ksnp_conns) {
383                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
384                 type = conn->ksnc_type;
385
386                 if (conn->ksnc_ipaddr != route->ksnr_ipaddr)
387                         continue;
388
389                 ksocknal_associate_route_conn_locked(route, conn);
390                 /* keep going (typed routes) */
391         }
392 }
393
394 void
395 ksocknal_del_route_locked (ksock_route_t *route)
396 {
397         ksock_peer_t      *peer = route->ksnr_peer;
398         ksock_interface_t *iface;
399         ksock_conn_t      *conn;
400         struct list_head  *ctmp;
401         struct list_head  *cnxt;
402
403         LASSERT (!route->ksnr_deleted);
404
405         /* Close associated conns */
406         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
407                 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
408
409                 if (conn->ksnc_route != route)
410                         continue;
411
412                 ksocknal_close_conn_locked (conn, 0);
413         }
414
415         if (route->ksnr_myipaddr != 0) {
416                 iface = ksocknal_ip2iface(route->ksnr_myipaddr);
417                 if (iface != NULL)
418                         iface->ksni_nroutes--;
419         }
420
421         route->ksnr_deleted = 1;
422         list_del (&route->ksnr_list);
423         ksocknal_put_route (route);             /* drop peer's ref */
424
425         if (list_empty (&peer->ksnp_routes) &&
426             list_empty (&peer->ksnp_conns)) {
427                 /* I've just removed the last autoconnect route of a peer
428                  * with no active connections */
429                 ksocknal_unlink_peer_locked (peer);
430         }
431 }
432
433 int
434 ksocknal_add_peer (ptl_nid_t nid, __u32 ipaddr, int port)
435 {
436         unsigned long      flags;
437         struct list_head  *tmp;
438         ksock_peer_t      *peer;
439         ksock_peer_t      *peer2;
440         ksock_route_t     *route;
441         ksock_route_t     *route2;
442
443         if (nid == PTL_NID_ANY)
444                 return (-EINVAL);
445
446         /* Have a brand new peer ready... */
447         peer = ksocknal_create_peer (nid);
448         if (peer == NULL)
449                 return (-ENOMEM);
450
451         route = ksocknal_create_route (ipaddr, port);
452         if (route == NULL) {
453                 ksocknal_put_peer (peer);
454                 return (-ENOMEM);
455         }
456
457         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
458
459         peer2 = ksocknal_find_peer_locked (nid);
460         if (peer2 != NULL) {
461                 ksocknal_put_peer (peer);
462                 peer = peer2;
463         } else {
464                 /* peer table takes my ref on peer */
465                 list_add_tail (&peer->ksnp_list,
466                                ksocknal_nid2peerlist (nid));
467         }
468
469         route2 = NULL;
470         list_for_each (tmp, &peer->ksnp_routes) {
471                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
472
473                 if (route2->ksnr_ipaddr == ipaddr)
474                         break;
475
476                 route2 = NULL;
477         }
478         if (route2 == NULL) {
479                 ksocknal_add_route_locked(peer, route);
480                 route->ksnr_share_count++;
481         } else {
482                 ksocknal_put_route(route);
483                 route2->ksnr_share_count++;
484         }
485
486         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
487
488         return (0);
489 }
490
491 void
492 ksocknal_del_peer_locked (ksock_peer_t *peer, __u32 ip, int single_share)
493 {
494         ksock_conn_t     *conn;
495         ksock_route_t    *route;
496         struct list_head *tmp;
497         struct list_head *nxt;
498         int               nshared;
499
500         LASSERT (!peer->ksnp_closing);
501
502         /* Extra ref prevents peer disappearing until I'm done with it */
503         atomic_inc(&peer->ksnp_refcount);
504
505         list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
506                 route = list_entry(tmp, ksock_route_t, ksnr_list);
507
508                 if (single_share && route->ksnr_share_count == 0)
509                         continue;
510
511                 /* no match */
512                 if (!(ip == 0 || route->ksnr_ipaddr == ip))
513                         continue;
514
515                 if (!single_share)
516                         route->ksnr_share_count = 0;
517                 else if (route->ksnr_share_count > 0)
518                         route->ksnr_share_count--;
519
520                 if (route->ksnr_share_count == 0) {
521                         /* This deletes associated conns too */
522                         ksocknal_del_route_locked (route);
523                 }
524
525                 if (single_share)
526                         break;
527         }
528
529         nshared = 0;
530         list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
531                 route = list_entry(tmp, ksock_route_t, ksnr_list);
532                 nshared += route->ksnr_share_count;
533         }
534
535         if (nshared == 0) {
536                 /* remove everything else if there are no explicit entries
537                  * left */
538
539                 list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
540                         route = list_entry(tmp, ksock_route_t, ksnr_list);
541
542                         /* we should only be removing auto-entries */
543                         LASSERT(route->ksnr_share_count == 0);
544                         ksocknal_del_route_locked (route);
545                 }
546
547                 list_for_each_safe (tmp, nxt, &peer->ksnp_conns) {
548                         conn = list_entry(tmp, ksock_conn_t, ksnc_list);
549
550                         ksocknal_close_conn_locked(conn, 0);
551                 }
552         }
553
554         ksocknal_put_peer(peer);
555         /* NB peer unlinks itself when last conn/route is removed */
556 }
557
558 int
559 ksocknal_del_peer (ptl_nid_t nid, __u32 ip, int single_share)
560 {
561         unsigned long      flags;
562         struct list_head  *ptmp;
563         struct list_head  *pnxt;
564         ksock_peer_t      *peer;
565         int                lo;
566         int                hi;
567         int                i;
568         int                rc = -ENOENT;
569
570         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
571
572         if (nid != PTL_NID_ANY)
573                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
574         else {
575                 lo = 0;
576                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
577         }
578
579         for (i = lo; i <= hi; i++) {
580                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
581                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
582
583                         if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
584                                 continue;
585
586                         ksocknal_del_peer_locked (peer, ip, single_share);
587                         rc = 0;                 /* matched! */
588
589                         if (single_share)
590                                 break;
591                 }
592         }
593
594         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
595
596         return (rc);
597 }
598
599 ksock_conn_t *
600 ksocknal_get_conn_by_idx (int index)
601 {
602         ksock_peer_t      *peer;
603         struct list_head  *ptmp;
604         ksock_conn_t      *conn;
605         struct list_head  *ctmp;
606         int                i;
607
608         read_lock (&ksocknal_data.ksnd_global_lock);
609
610         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
611                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
612                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
613
614                         LASSERT (!peer->ksnp_closing);
615
616                         list_for_each (ctmp, &peer->ksnp_conns) {
617                                 if (index-- > 0)
618                                         continue;
619
620                                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
621                                 atomic_inc (&conn->ksnc_refcount);
622                                 read_unlock (&ksocknal_data.ksnd_global_lock);
623                                 return (conn);
624                         }
625                 }
626         }
627
628         read_unlock (&ksocknal_data.ksnd_global_lock);
629         return (NULL);
630 }
631
632 ksock_sched_t *
633 ksocknal_choose_scheduler_locked (unsigned int irq)
634 {
635         ksock_sched_t    *sched;
636         ksock_irqinfo_t  *info;
637         int               i;
638
639         LASSERT (irq < NR_IRQS);
640         info = &ksocknal_data.ksnd_irqinfo[irq];
641
642         if (irq != 0 &&                         /* hardware NIC */
643             info->ksni_valid) {                 /* already set up */
644                 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
645         }
646
647         /* software NIC (irq == 0) || not associated with a scheduler yet.
648          * Choose the CPU with the fewest connections... */
649         sched = &ksocknal_data.ksnd_schedulers[0];
650         for (i = 1; i < ksocknal_data.ksnd_nschedulers; i++)
651                 if (sched->kss_nconns >
652                     ksocknal_data.ksnd_schedulers[i].kss_nconns)
653                         sched = &ksocknal_data.ksnd_schedulers[i];
654
655         if (irq != 0) {                         /* Hardware NIC */
656                 info->ksni_valid = 1;
657                 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
658
659                 /* no overflow... */
660                 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
661         }
662
663         return (sched);
664 }
665
666 int
667 ksocknal_local_ipvec (__u32 *ipaddrs)
668 {
669         int                i;
670         int                nip;
671
672         read_lock (&ksocknal_data.ksnd_global_lock);
673
674         nip = ksocknal_data.ksnd_ninterfaces;
675         for (i = 0; i < nip; i++) {
676                 LASSERT (i < SOCKNAL_MAX_INTERFACES);
677
678                 ipaddrs[i] = ksocknal_data.ksnd_interfaces[i].ksni_ipaddr;
679                 LASSERT (ipaddrs[i] != 0);
680         }
681
682         read_unlock (&ksocknal_data.ksnd_global_lock);
683         return (nip);
684 }
685
686 int
687 ksocknal_match_peerip (ksock_interface_t *iface, __u32 *ips, int nips)
688 {
689         int   best_netmatch = 0;
690         int   best_xor      = 0;
691         int   best          = -1;
692         int   this_xor;
693         int   this_netmatch;
694         int   i;
695
696         for (i = 0; i < nips; i++) {
697                 if (ips[i] == 0)
698                         continue;
699
700                 this_xor = (ips[i] ^ iface->ksni_ipaddr);
701                 this_netmatch = ((this_xor & iface->ksni_netmask) == 0) ? 1 : 0;
702
703                 if (!(best < 0 ||
704                       best_netmatch < this_netmatch ||
705                       (best_netmatch == this_netmatch &&
706                        best_xor > this_xor)))
707                         continue;
708
709                 best = i;
710                 best_netmatch = this_netmatch;
711                 best_xor = this_xor;
712         }
713
714         LASSERT (best >= 0);
715         return (best);
716 }
717
718 int
719 ksocknal_select_ips(ksock_peer_t *peer, __u32 *peerips, int n_peerips)
720 {
721         rwlock_t           *global_lock = &ksocknal_data.ksnd_global_lock;
722         unsigned long       flags;
723         ksock_interface_t  *iface;
724         ksock_interface_t  *best_iface;
725         int                 n_ips;
726         int                 i;
727         int                 j;
728         int                 k;
729         __u32               ip;
730         __u32               xor;
731         int                 this_netmatch;
732         int                 best_netmatch;
733         int                 best_npeers;
734
735         /* CAVEAT EMPTOR: We do all our interface matching with an
736          * exclusive hold of global lock at IRQ priority.  We're only
737          * expecting to be dealing with small numbers of interfaces, so the
738          * O(n**3)-ness shouldn't matter */
739
740         /* Also note that I'm not going to return more than n_peerips
741          * interfaces, even if I have more myself */
742
743         write_lock_irqsave(global_lock, flags);
744
745         LASSERT (n_peerips <= SOCKNAL_MAX_INTERFACES);
746         LASSERT (ksocknal_data.ksnd_ninterfaces <= SOCKNAL_MAX_INTERFACES);
747
748         n_ips = MIN(n_peerips, ksocknal_data.ksnd_ninterfaces);
749
750         for (i = 0; peer->ksnp_n_passive_ips < n_ips; i++) {
751                 /*              ^ yes really... */
752
753                 /* If we have any new interfaces, first tick off all the
754                  * peer IPs that match old interfaces, then choose new
755                  * interfaces to match the remaining peer IPS.
756                  * We don't forget interfaces we've stopped using; we might
757                  * start using them again... */
758
759                 if (i < peer->ksnp_n_passive_ips) {
760                         /* Old interface. */
761                         ip = peer->ksnp_passive_ips[i];
762                         best_iface = ksocknal_ip2iface(ip);
763
764                         /* peer passive ips are kept up to date */
765                         LASSERT(best_iface != NULL);
766                 } else {
767                         /* choose a new interface */
768                         LASSERT (i == peer->ksnp_n_passive_ips);
769
770                         best_iface = NULL;
771                         best_netmatch = 0;
772                         best_npeers = 0;
773
774                         for (j = 0; j < ksocknal_data.ksnd_ninterfaces; j++) {
775                                 iface = &ksocknal_data.ksnd_interfaces[j];
776                                 ip = iface->ksni_ipaddr;
777
778                                 for (k = 0; k < peer->ksnp_n_passive_ips; k++)
779                                         if (peer->ksnp_passive_ips[k] == ip)
780                                                 break;
781
782                                 if (k < peer->ksnp_n_passive_ips) /* using it already */
783                                         continue;
784
785                                 k = ksocknal_match_peerip(iface, peerips, n_peerips);
786                                 xor = (ip ^ peerips[k]);
787                                 this_netmatch = ((xor & iface->ksni_netmask) == 0) ? 1 : 0;
788
789                                 if (!(best_iface == NULL ||
790                                       best_netmatch < this_netmatch ||
791                                       (best_netmatch == this_netmatch &&
792                                        best_npeers > iface->ksni_npeers)))
793                                         continue;
794
795                                 best_iface = iface;
796                                 best_netmatch = this_netmatch;
797                                 best_npeers = iface->ksni_npeers;
798                         }
799
800                         best_iface->ksni_npeers++;
801                         ip = best_iface->ksni_ipaddr;
802                         peer->ksnp_passive_ips[i] = ip;
803                         peer->ksnp_n_passive_ips = i+1;
804                 }
805
806                 LASSERT (best_iface != NULL);
807
808                 /* mark the best matching peer IP used */
809                 j = ksocknal_match_peerip(best_iface, peerips, n_peerips);
810                 peerips[j] = 0;
811         }
812
813         /* Overwrite input peer IP addresses */
814         memcpy(peerips, peer->ksnp_passive_ips, n_ips * sizeof(*peerips));
815
816         write_unlock_irqrestore(global_lock, flags);
817
818         return (n_ips);
819 }
820
821 void
822 ksocknal_create_routes(ksock_peer_t *peer, int port,
823                        __u32 *peer_ipaddrs, int npeer_ipaddrs)
824 {
825         ksock_route_t      *newroute = NULL;
826         rwlock_t           *global_lock = &ksocknal_data.ksnd_global_lock;
827         unsigned long       flags;
828         struct list_head   *rtmp;
829         ksock_route_t      *route;
830         ksock_interface_t  *iface;
831         ksock_interface_t  *best_iface;
832         int                 best_netmatch;
833         int                 this_netmatch;
834         int                 best_nroutes;
835         int                 i;
836         int                 j;
837
838         /* CAVEAT EMPTOR: We do all our interface matching with an
839          * exclusive hold of global lock at IRQ priority.  We're only
840          * expecting to be dealing with small numbers of interfaces, so the
841          * O(n**3)-ness here shouldn't matter */
842
843         write_lock_irqsave(global_lock, flags);
844
845         LASSERT (npeer_ipaddrs <= SOCKNAL_MAX_INTERFACES);
846
847         for (i = 0; i < npeer_ipaddrs; i++) {
848                 if (newroute != NULL) {
849                         newroute->ksnr_ipaddr = peer_ipaddrs[i];
850                 } else {
851                         write_unlock_irqrestore(global_lock, flags);
852
853                         newroute = ksocknal_create_route(peer_ipaddrs[i], port);
854                         if (newroute == NULL)
855                                 return;
856
857                         write_lock_irqsave(global_lock, flags);
858                 }
859
860                 /* Already got a route? */
861                 route = NULL;
862                 list_for_each(rtmp, &peer->ksnp_routes) {
863                         route = list_entry(rtmp, ksock_route_t, ksnr_list);
864
865                         if (route->ksnr_ipaddr == newroute->ksnr_ipaddr)
866                                 break;
867
868                         route = NULL;
869                 }
870                 if (route != NULL)
871                         continue;
872
873                 best_iface = NULL;
874                 best_nroutes = 0;
875                 best_netmatch = 0;
876
877                 LASSERT (ksocknal_data.ksnd_ninterfaces <= SOCKNAL_MAX_INTERFACES);
878
879                 /* Select interface to connect from */
880                 for (j = 0; j < ksocknal_data.ksnd_ninterfaces; j++) {
881                         iface = &ksocknal_data.ksnd_interfaces[j];
882
883                         /* Using this interface already? */
884                         list_for_each(rtmp, &peer->ksnp_routes) {
885                                 route = list_entry(rtmp, ksock_route_t, ksnr_list);
886
887                                 if (route->ksnr_myipaddr == iface->ksni_ipaddr)
888                                         break;
889
890                                 route = NULL;
891                         }
892                         if (route != NULL)
893                                 continue;
894
895                         this_netmatch = (((iface->ksni_ipaddr ^
896                                            newroute->ksnr_ipaddr) &
897                                            iface->ksni_netmask) == 0) ? 1 : 0;
898
899                         if (!(best_iface == NULL ||
900                               best_netmatch < this_netmatch ||
901                               (best_netmatch == this_netmatch &&
902                                best_nroutes > iface->ksni_nroutes)))
903                                 continue;
904
905                         best_iface = iface;
906                         best_netmatch = this_netmatch;
907                         best_nroutes = iface->ksni_nroutes;
908                 }
909
910                 if (best_iface == NULL)
911                         continue;
912
913                 newroute->ksnr_myipaddr = best_iface->ksni_ipaddr;
914                 best_iface->ksni_nroutes++;
915
916                 ksocknal_add_route_locked(peer, newroute);
917                 newroute = NULL;
918         }
919
920         write_unlock_irqrestore(global_lock, flags);
921         if (newroute != NULL)
922                 ksocknal_put_route(newroute);
923 }
924
925 int
926 ksocknal_create_conn (ksock_route_t *route, struct socket *sock, int type)
927 {
928         int                passive = (type == SOCKNAL_CONN_NONE);
929         rwlock_t          *global_lock = &ksocknal_data.ksnd_global_lock;
930         __u32              ipaddrs[SOCKNAL_MAX_INTERFACES];
931         int                nipaddrs;
932         ptl_nid_t          nid;
933         struct list_head  *tmp;
934         __u64              incarnation;
935         unsigned long      flags;
936         ksock_conn_t      *conn;
937         ksock_conn_t      *conn2;
938         ksock_peer_t      *peer = NULL;
939         ksock_peer_t      *peer2;
940         ksock_sched_t     *sched;
941         unsigned int       irq;
942         ksock_tx_t        *tx;
943         int                rc;
944
945         /* NB, sock has an associated file since (a) this connection might
946          * have been created in userland and (b) we need to refcount the
947          * socket so that we don't close it while I/O is being done on
948          * it, and sock->file has that pre-cooked... */
949         LASSERT (KSN_SOCK2FILE(sock) != NULL);
950         LASSERT (cfs_file_count(KSN_SOCK2FILE(sock)) > 0);
951         LASSERT (route == NULL || !passive);
952
953         rc = ksocknal_lib_setup_sock (sock);
954         if (rc != 0)
955                 return (rc);
956
957         irq = ksocknal_lib_sock_irq (sock);
958
959         PORTAL_ALLOC(conn, sizeof(*conn));
960         if (conn == NULL)
961                 return (-ENOMEM);
962
963         memset (conn, 0, sizeof (*conn));
964         conn->ksnc_peer = NULL;
965         conn->ksnc_route = NULL;
966         conn->ksnc_sock = sock;
967         conn->ksnc_type = type;
968         ksocknal_lib_save_callback(sock, conn);
969         atomic_set (&conn->ksnc_refcount, 1);    /* 1 ref for me */
970
971         conn->ksnc_rx_ready = 0;
972         conn->ksnc_rx_scheduled = 0;
973         ksocknal_new_packet (conn, 0);
974
975         CFS_INIT_LIST_HEAD (&conn->ksnc_tx_queue);
976         conn->ksnc_tx_ready = 0;
977         conn->ksnc_tx_scheduled = 0;
978         atomic_set (&conn->ksnc_tx_nob, 0);
979
980         /* stash conn's local and remote addrs */
981         rc = ksocknal_lib_get_conn_addrs (conn);
982         if (rc != 0)
983                 goto failed_0;
984
985         if (!passive) {
986                 /* Active connection sends HELLO eagerly */
987                 rc = ksocknal_local_ipvec(ipaddrs);
988                 if (rc < 0)
989                         goto failed_0;
990                 nipaddrs = rc;
991
992                 rc = ksocknal_send_hello (conn, ipaddrs, nipaddrs);
993                 if (rc != 0)
994                         goto failed_0;
995         }
996
997         /* Find out/confirm peer's NID and connection type and get the
998          * vector of interfaces she's willing to let me connect to */
999         nid = (route == NULL) ? PTL_NID_ANY : route->ksnr_peer->ksnp_nid;
1000         rc = ksocknal_recv_hello (conn, &nid, &incarnation, ipaddrs);
1001         if (rc < 0)
1002                 goto failed_0;
1003         nipaddrs = rc;
1004         LASSERT (nid != PTL_NID_ANY);
1005
1006         if (route != NULL) {
1007                 peer = route->ksnr_peer;
1008                 atomic_inc(&peer->ksnp_refcount);
1009         } else {
1010                 peer = ksocknal_create_peer(nid);
1011                 if (peer == NULL) {
1012                         rc = -ENOMEM;
1013                         goto failed_0;
1014                 }
1015
1016                 write_lock_irqsave(global_lock, flags);
1017
1018                 peer2 = ksocknal_find_peer_locked(nid);
1019                 if (peer2 == NULL) {
1020                         /* NB this puts an "empty" peer in the peer
1021                          * table (which takes my ref) */
1022                         list_add_tail(&peer->ksnp_list,
1023                                       ksocknal_nid2peerlist(nid));
1024                 } else  {
1025                         ksocknal_put_peer(peer);
1026                         peer = peer2;
1027                 }
1028                 /* +1 ref for me */
1029                 atomic_inc(&peer->ksnp_refcount);
1030
1031                 write_unlock_irqrestore(global_lock, flags);
1032         }
1033
1034         if (!passive) {
1035                 ksocknal_create_routes(peer, conn->ksnc_port,
1036                                        ipaddrs, nipaddrs);
1037                 rc = 0;
1038         } else {
1039                 rc = ksocknal_select_ips(peer, ipaddrs, nipaddrs);
1040                 LASSERT (rc >= 0);
1041                 rc = ksocknal_send_hello (conn, ipaddrs, rc);
1042         }
1043         if (rc < 0)
1044                 goto failed_1;
1045
1046         write_lock_irqsave (global_lock, flags);
1047
1048         if (peer->ksnp_closing ||
1049             (route != NULL && route->ksnr_deleted)) {
1050                 /* route/peer got closed under me */
1051                 rc = -ESTALE;
1052                 goto failed_2;
1053         }
1054
1055         /* Refuse to duplicate an existing connection (both sides might
1056          * autoconnect at once), unless this is a loopback connection */
1057         if (conn->ksnc_ipaddr != conn->ksnc_myipaddr) {
1058                 list_for_each(tmp, &peer->ksnp_conns) {
1059                         conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);
1060
1061                         if (conn2->ksnc_ipaddr != conn->ksnc_ipaddr ||
1062                             conn2->ksnc_myipaddr != conn->ksnc_myipaddr ||
1063                             conn2->ksnc_type != conn->ksnc_type ||
1064                             conn2->ksnc_incarnation != incarnation)
1065                                 continue;
1066
1067                         CWARN("Not creating duplicate connection to "
1068                               "%u.%u.%u.%u type %d\n",
1069                               HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_type);
1070                         rc = -EALREADY;
1071                         goto failed_2;
1072                 }
1073         }
1074
1075         /* If the connection created by this route didn't bind to the IP
1076          * address the route connected to, the connection/route matching
1077          * code below probably isn't going to work. */
1078         if (route != NULL &&
1079             route->ksnr_ipaddr != conn->ksnc_ipaddr) {
1080                 CERROR("Route "LPX64" %u.%u.%u.%u connected to %u.%u.%u.%u\n",
1081                        peer->ksnp_nid,
1082                        HIPQUAD(route->ksnr_ipaddr),
1083                        HIPQUAD(conn->ksnc_ipaddr));
1084         }
1085
1086         /* Search for a route corresponding to the new connection and
1087          * create an association.  This allows incoming connections created
1088          * by routes in my peer to match my own route entries so I don't
1089          * continually create duplicate routes. */
1090         list_for_each (tmp, &peer->ksnp_routes) {
1091                 route = list_entry(tmp, ksock_route_t, ksnr_list);
1092
1093                 if (route->ksnr_ipaddr != conn->ksnc_ipaddr)
1094                         continue;
1095
1096                 ksocknal_associate_route_conn_locked(route, conn);
1097                 break;
1098         }
1099
1100         /* Give conn a ref on sock->file since we're going to return success */
1101         cfs_get_file(KSN_SOCK2FILE(sock));
1102
1103         conn->ksnc_peer = peer;                 /* conn takes my ref on peer */
1104         conn->ksnc_incarnation = incarnation;
1105         peer->ksnp_last_alive = cfs_time_current();
1106         peer->ksnp_error = 0;
1107
1108         sched = ksocknal_choose_scheduler_locked (irq);
1109         sched->kss_nconns++;
1110         conn->ksnc_scheduler = sched;
1111
1112         /* Set the deadline for the outgoing HELLO to drain */
1113         conn->ksnc_tx_bufnob = SOCK_WMEM_QUEUED(sock);
1114         conn->ksnc_tx_deadline = cfs_time_shift(ksocknal_tunables.ksnd_io_timeout);
1115         mb();       /* order with adding to peer's conn list */
1116
1117         list_add (&conn->ksnc_list, &peer->ksnp_conns);
1118         atomic_inc (&conn->ksnc_refcount);
1119
1120         /* NB my callbacks block while I hold ksnd_global_lock */
1121         ksocknal_lib_set_callback(sock, conn);
1122
1123         /* Take all the packets blocking for a connection.
1124          * NB, it might be nicer to share these blocked packets among any
1125          * other connections that are becoming established. */
1126         while (!list_empty (&peer->ksnp_tx_queue)) {
1127                 tx = list_entry (peer->ksnp_tx_queue.next,
1128                                  ksock_tx_t, tx_list);
1129
1130                 list_del (&tx->tx_list);
1131                 ksocknal_queue_tx_locked (tx, conn);
1132         }
1133
1134         rc = ksocknal_close_stale_conns_locked(peer, incarnation);
1135         if (rc != 0)
1136                 CERROR ("Closed %d stale conns to nid "LPX64" ip %d.%d.%d.%d\n",
1137                         rc, conn->ksnc_peer->ksnp_nid,
1138                         HIPQUAD(conn->ksnc_ipaddr));
1139
1140         write_unlock_irqrestore (global_lock, flags);
1141
1142         ksocknal_lib_bind_irq (irq);
1143
1144         /* Call the callbacks right now to get things going. */
1145         if (ksocknal_getconnsock(conn) == 0) {
1146                 ksocknal_lib_act_callback(sock, conn);
1147                 ksocknal_putconnsock(conn);
1148         }
1149
1150         CWARN("New conn nid:"LPX64" %u.%u.%u.%u -> %u.%u.%u.%u/%d"
1151               " incarnation:"LPX64" sched[%d]/%d\n",
1152               nid, HIPQUAD(conn->ksnc_myipaddr),
1153               HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port, incarnation,
1154               (int)(conn->ksnc_scheduler - ksocknal_data.ksnd_schedulers), irq);
1155
1156         ksocknal_put_conn (conn);
1157         return (0);
1158
1159  failed_2:
1160         if (!peer->ksnp_closing &&
1161             list_empty (&peer->ksnp_conns) &&
1162             list_empty (&peer->ksnp_routes))
1163                 ksocknal_unlink_peer_locked(peer);
1164         write_unlock_irqrestore(global_lock, flags);
1165
1166  failed_1:
1167         ksocknal_put_peer (peer);
1168
1169  failed_0:
1170         PORTAL_FREE (conn, sizeof(*conn));
1171
1172         LASSERT (rc != 0);
1173         return (rc);
1174 }
1175
1176 void
1177 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
1178 {
1179         /* This just does the immmediate housekeeping, and queues the
1180          * connection for the reaper to terminate.
1181          * Caller holds ksnd_global_lock exclusively in irq context */
1182         ksock_peer_t      *peer = conn->ksnc_peer;
1183         ksock_route_t     *route;
1184         ksock_conn_t      *conn2;
1185         struct list_head  *tmp;
1186
1187         LASSERT (peer->ksnp_error == 0);
1188         LASSERT (!conn->ksnc_closing);
1189         conn->ksnc_closing = 1;
1190         atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
1191
1192         /* ksnd_deathrow_conns takes over peer's ref */
1193         list_del (&conn->ksnc_list);
1194
1195         route = conn->ksnc_route;
1196         if (route != NULL) {
1197                 /* dissociate conn from route... */
1198                 LASSERT (!route->ksnr_deleted);
1199                 LASSERT ((route->ksnr_connecting & (1 << conn->ksnc_type)) == 0);
1200                 LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);
1201
1202                 conn2 = NULL;
1203                 list_for_each(tmp, &peer->ksnp_conns) {
1204                         conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);
1205
1206                         if (conn2->ksnc_route == route &&
1207                             conn2->ksnc_type == conn->ksnc_type)
1208                                 break;
1209
1210                         conn2 = NULL;
1211                 }
1212                 if (conn2 == NULL)
1213                         route->ksnr_connected &= ~(1 << conn->ksnc_type);
1214
1215                 conn->ksnc_route = NULL;
1216
1217 #if 0           /* irrelevent with only eager routes */
1218                 list_del (&route->ksnr_list);   /* make route least favourite */
1219                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
1220 #endif
1221                 ksocknal_put_route (route);     /* drop conn's ref on route */
1222         }
1223
1224         if (list_empty (&peer->ksnp_conns)) {
1225                 /* No more connections to this peer */
1226
1227                 peer->ksnp_error = error;       /* stash last conn close reason */
1228
1229                 if (list_empty (&peer->ksnp_routes)) {
1230                         /* I've just closed last conn belonging to a
1231                          * non-autoconnecting peer */
1232                         ksocknal_unlink_peer_locked (peer);
1233                 }
1234         }
1235
1236         spin_lock (&ksocknal_data.ksnd_reaper_lock);
1237
1238         list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
1239         cfs_waitq_signal (&ksocknal_data.ksnd_reaper_waitq);
1240
1241         spin_unlock (&ksocknal_data.ksnd_reaper_lock);
1242 }
1243
1244 void
1245 ksocknal_terminate_conn (ksock_conn_t *conn)
1246 {
1247         /* This gets called by the reaper (guaranteed thread context) to
1248          * disengage the socket from its callbacks and close it.
1249          * ksnc_refcount will eventually hit zero, and then the reaper will
1250          * destroy it. */
1251         unsigned long   flags;
1252         ksock_peer_t   *peer = conn->ksnc_peer;
1253         ksock_sched_t  *sched = conn->ksnc_scheduler;
1254         struct timeval  now;
1255         time_t          then = 0;
1256         int             notify = 0;
1257
1258         LASSERT(conn->ksnc_closing);
1259
1260         /* wake up the scheduler to "send" all remaining packets to /dev/null */
1261         spin_lock_irqsave(&sched->kss_lock, flags);
1262
1263         if (!conn->ksnc_tx_scheduled &&
1264             !list_empty(&conn->ksnc_tx_queue)){
1265                 list_add_tail (&conn->ksnc_tx_list,
1266                                &sched->kss_tx_conns);
1267                 /* a closing conn is always ready to tx */
1268                 conn->ksnc_tx_ready = 1;
1269                 conn->ksnc_tx_scheduled = 1;
1270                 /* extra ref for scheduler */
1271                 atomic_inc (&conn->ksnc_refcount);
1272
1273                 cfs_waitq_signal (&sched->kss_waitq);
1274         }
1275
1276         spin_unlock_irqrestore (&sched->kss_lock, flags);
1277
1278         /* serialise with callbacks */
1279         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1280
1281         ksocknal_lib_reset_callback(conn->ksnc_sock, conn);
1282
1283         /* OK, so this conn may not be completely disengaged from its
1284          * scheduler yet, but it _has_ committed to terminate... */
1285         conn->ksnc_scheduler->kss_nconns--;
1286
1287         if (peer->ksnp_error != 0) {
1288                 /* peer's last conn closed in error */
1289                 LASSERT (list_empty (&peer->ksnp_conns));
1290
1291                 /* convert peer's last-known-alive timestamp from jiffies */
1292                 do_gettimeofday (&now);
1293                 then = now.tv_sec - cfs_duration_sec(cfs_time_sub(cfs_time_current(),
1294                                                                   peer->ksnp_last_alive));
1295                 notify = 1;
1296         }
1297
1298         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1299
1300         /* The socket is closed on the final put; either here, or in
1301          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1302          * when the connection was established, this will close the socket
1303          * immediately, aborting anything buffered in it. Any hung
1304          * zero-copy transmits will therefore complete in finite time. */
1305         ksocknal_putconnsock (conn);
1306
1307         if (notify)
1308                 kpr_notify (&ksocknal_data.ksnd_router, peer->ksnp_nid,
1309                             0, then);
1310 }
1311
1312 void
1313 ksocknal_destroy_conn (ksock_conn_t *conn)
1314 {
1315         /* Final coup-de-grace of the reaper */
1316         CDEBUG (D_NET, "connection %p\n", conn);
1317
1318         LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
1319         LASSERT (conn->ksnc_route == NULL);
1320         LASSERT (!conn->ksnc_tx_scheduled);
1321         LASSERT (!conn->ksnc_rx_scheduled);
1322         LASSERT (list_empty(&conn->ksnc_tx_queue));
1323
1324         /* complete current receive if any */
1325         switch (conn->ksnc_rx_state) {
1326         case SOCKNAL_RX_BODY:
1327                 CERROR("Completing partial receive from "LPX64
1328                        ", ip %d.%d.%d.%d:%d, with error\n",
1329                        conn->ksnc_peer->ksnp_nid,
1330                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
1331                 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_FAIL);
1332                 break;
1333         case SOCKNAL_RX_BODY_FWD:
1334                 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
1335                 break;
1336         case SOCKNAL_RX_HEADER:
1337         case SOCKNAL_RX_SLOP:
1338                 break;
1339         default:
1340                 LBUG ();
1341                 break;
1342         }
1343
1344         ksocknal_put_peer (conn->ksnc_peer);
1345
1346         PORTAL_FREE (conn, sizeof (*conn));
1347         atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
1348 }
1349
1350 void
1351 ksocknal_put_conn (ksock_conn_t *conn)
1352 {
1353         unsigned long flags;
1354
1355         CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
1356                 conn, conn->ksnc_peer->ksnp_nid,
1357                 atomic_read (&conn->ksnc_refcount));
1358
1359         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1360         if (!atomic_dec_and_test (&conn->ksnc_refcount))
1361                 return;
1362
1363         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
1364
1365         list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1366         cfs_waitq_signal (&ksocknal_data.ksnd_reaper_waitq);
1367
1368         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
1369 }
1370
1371 int
1372 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
1373 {
1374         ksock_conn_t       *conn;
1375         struct list_head   *ctmp;
1376         struct list_head   *cnxt;
1377         int                 count = 0;
1378
1379         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1380                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1381
1382                 if (ipaddr == 0 ||
1383                     conn->ksnc_ipaddr == ipaddr) {
1384                         count++;
1385                         ksocknal_close_conn_locked (conn, why);
1386                 }
1387         }
1388
1389         return (count);
1390 }
1391
1392 int
1393 ksocknal_close_stale_conns_locked (ksock_peer_t *peer, __u64 incarnation)
1394 {
1395         ksock_conn_t       *conn;
1396         struct list_head   *ctmp;
1397         struct list_head   *cnxt;
1398         int                 count = 0;
1399
1400         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1401                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1402
1403                 if (conn->ksnc_incarnation == incarnation)
1404                         continue;
1405
1406                 CWARN("Closing stale conn nid:"LPX64" ip:%08x/%d "
1407                       "incarnation:"LPX64"("LPX64")\n",
1408                       peer->ksnp_nid, conn->ksnc_ipaddr, conn->ksnc_port,
1409                       conn->ksnc_incarnation, incarnation);
1410
1411                 count++;
1412                 ksocknal_close_conn_locked (conn, -ESTALE);
1413         }
1414
1415         return (count);
1416 }
1417
1418 int
1419 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why)
1420 {
1421         ksock_peer_t     *peer = conn->ksnc_peer;
1422         __u32             ipaddr = conn->ksnc_ipaddr;
1423         unsigned long     flags;
1424         int               count;
1425
1426         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1427
1428         count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1429
1430         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1431
1432         return (count);
1433 }
1434
1435 int
1436 ksocknal_close_matching_conns (ptl_nid_t nid, __u32 ipaddr)
1437 {
1438         unsigned long       flags;
1439         ksock_peer_t       *peer;
1440         struct list_head   *ptmp;
1441         struct list_head   *pnxt;
1442         int                 lo;
1443         int                 hi;
1444         int                 i;
1445         int                 count = 0;
1446
1447         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1448
1449         if (nid != PTL_NID_ANY)
1450                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
1451         else {
1452                 lo = 0;
1453                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1454         }
1455
1456         for (i = lo; i <= hi; i++) {
1457                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1458
1459                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1460
1461                         if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
1462                                 continue;
1463
1464                         count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1465                 }
1466         }
1467
1468         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1469
1470         /* wildcards always succeed */
1471         if (nid == PTL_NID_ANY || ipaddr == 0)
1472                 return (0);
1473
1474         return (count == 0 ? -ENOENT : 0);
1475 }
1476
1477 void
1478 ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive)
1479 {
1480         /* The router is telling me she's been notified of a change in
1481          * gateway state.... */
1482
1483         CDEBUG (D_NET, "gw "LPX64" %s\n", gw_nid, alive ? "up" : "down");
1484
1485         if (!alive) {
1486                 /* If the gateway crashed, close all open connections... */
1487                 ksocknal_close_matching_conns (gw_nid, 0);
1488                 return;
1489         }
1490
1491         /* ...otherwise do nothing.  We can only establish new connections
1492          * if we have autroutes, and these connect on demand. */
1493 }
1494
1495 void
1496 ksocknal_push_peer (ksock_peer_t *peer)
1497 {
1498         int               index;
1499         int               i;
1500         struct list_head *tmp;
1501         ksock_conn_t     *conn;
1502
1503         for (index = 0; ; index++) {
1504                 read_lock (&ksocknal_data.ksnd_global_lock);
1505
1506                 i = 0;
1507                 conn = NULL;
1508
1509                 list_for_each (tmp, &peer->ksnp_conns) {
1510                         if (i++ == index) {
1511                                 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1512                                 atomic_inc (&conn->ksnc_refcount);
1513                                 break;
1514                         }
1515                 }
1516
1517                 read_unlock (&ksocknal_data.ksnd_global_lock);
1518
1519                 if (conn == NULL)
1520                         break;
1521
1522                 ksocknal_lib_push_conn (conn);
1523                 ksocknal_put_conn (conn);
1524         }
1525 }
1526
1527 int
1528 ksocknal_push (ptl_nid_t nid)
1529 {
1530         ksock_peer_t      *peer;
1531         struct list_head  *tmp;
1532         int                index;
1533         int                i;
1534         int                j;
1535         int                rc = -ENOENT;
1536
1537         if (nid != PTL_NID_ANY) {
1538                 peer = ksocknal_get_peer (nid);
1539
1540                 if (peer != NULL) {
1541                         rc = 0;
1542                         ksocknal_push_peer (peer);
1543                         ksocknal_put_peer (peer);
1544                 }
1545                 return (rc);
1546         }
1547
1548         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1549                 for (j = 0; ; j++) {
1550                         read_lock (&ksocknal_data.ksnd_global_lock);
1551
1552                         index = 0;
1553                         peer = NULL;
1554
1555                         list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1556                                 if (index++ == j) {
1557                                         peer = list_entry(tmp, ksock_peer_t,
1558                                                           ksnp_list);
1559                                         atomic_inc (&peer->ksnp_refcount);
1560                                         break;
1561                                 }
1562                         }
1563
1564                         read_unlock (&ksocknal_data.ksnd_global_lock);
1565
1566                         if (peer != NULL) {
1567                                 rc = 0;
1568                                 ksocknal_push_peer (peer);
1569                                 ksocknal_put_peer (peer);
1570                         }
1571                 }
1572
1573         }
1574
1575         return (rc);
1576 }
1577
1578 int
1579 ksocknal_add_interface(__u32 ipaddress, __u32 netmask)
1580 {
1581         unsigned long      flags;
1582         ksock_interface_t *iface;
1583         int                rc;
1584         int                i;
1585         int                j;
1586         struct list_head  *ptmp;
1587         ksock_peer_t      *peer;
1588         struct list_head  *rtmp;
1589         ksock_route_t     *route;
1590
1591         if (ipaddress == 0 ||
1592             netmask == 0)
1593                 return (-EINVAL);
1594
1595         write_lock_irqsave(&ksocknal_data.ksnd_global_lock, flags);
1596
1597         iface = ksocknal_ip2iface(ipaddress);
1598         if (iface != NULL) {
1599                 /* silently ignore dups */
1600                 rc = 0;
1601         } else if (ksocknal_data.ksnd_ninterfaces == SOCKNAL_MAX_INTERFACES) {
1602                 rc = -ENOSPC;
1603         } else {
1604                 iface = &ksocknal_data.ksnd_interfaces[ksocknal_data.ksnd_ninterfaces++];
1605
1606                 iface->ksni_ipaddr = ipaddress;
1607                 iface->ksni_netmask = netmask;
1608                 iface->ksni_nroutes = 0;
1609                 iface->ksni_npeers = 0;
1610
1611                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1612                         list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
1613                                 peer = list_entry(ptmp, ksock_peer_t, ksnp_list);
1614
1615                                 for (j = 0; i < peer->ksnp_n_passive_ips; j++)
1616                                         if (peer->ksnp_passive_ips[j] == ipaddress)
1617                                                 iface->ksni_npeers++;
1618
1619                                 list_for_each(rtmp, &peer->ksnp_routes) {
1620                                         route = list_entry(rtmp, ksock_route_t, ksnr_list);
1621
1622                                         if (route->ksnr_myipaddr == ipaddress)
1623                                                 iface->ksni_nroutes++;
1624                                 }
1625                         }
1626                 }
1627
1628                 rc = 0;
1629                 /* NB only new connections will pay attention to the new interface! */
1630         }
1631
1632         write_unlock_irqrestore(&ksocknal_data.ksnd_global_lock, flags);
1633
1634         return (rc);
1635 }
1636
1637 void
1638 ksocknal_peer_del_interface_locked(ksock_peer_t *peer, __u32 ipaddr)
1639 {
1640         struct list_head   *tmp;
1641         struct list_head   *nxt;
1642         ksock_route_t      *route;
1643         ksock_conn_t       *conn;
1644         int                 i;
1645         int                 j;
1646
1647         for (i = 0; i < peer->ksnp_n_passive_ips; i++)
1648                 if (peer->ksnp_passive_ips[i] == ipaddr) {
1649                         for (j = i+1; j < peer->ksnp_n_passive_ips; j++)
1650                                 peer->ksnp_passive_ips[j-1] =
1651                                         peer->ksnp_passive_ips[j];
1652                         peer->ksnp_n_passive_ips--;
1653                         break;
1654                 }
1655
1656         list_for_each_safe(tmp, nxt, &peer->ksnp_routes) {
1657                 route = list_entry (tmp, ksock_route_t, ksnr_list);
1658
1659                 if (route->ksnr_myipaddr != ipaddr)
1660                         continue;
1661
1662                 if (route->ksnr_share_count != 0) {
1663                         /* Manually created; keep, but unbind */
1664                         route->ksnr_myipaddr = 0;
1665                 } else {
1666                         ksocknal_del_route_locked(route);
1667                 }
1668         }
1669
1670         list_for_each_safe(tmp, nxt, &peer->ksnp_conns) {
1671                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
1672
1673                 if (conn->ksnc_myipaddr == ipaddr)
1674                         ksocknal_close_conn_locked (conn, 0);
1675         }
1676 }
1677
1678 int
1679 ksocknal_del_interface(__u32 ipaddress)
1680 {
1681         int                rc = -ENOENT;
1682         unsigned long      flags;
1683         struct list_head  *tmp;
1684         struct list_head  *nxt;
1685         ksock_peer_t      *peer;
1686         __u32              this_ip;
1687         int                i;
1688         int                j;
1689
1690         write_lock_irqsave(&ksocknal_data.ksnd_global_lock, flags);
1691
1692         for (i = 0; i < ksocknal_data.ksnd_ninterfaces; i++) {
1693                 this_ip = ksocknal_data.ksnd_interfaces[i].ksni_ipaddr;
1694
1695                 if (!(ipaddress == 0 ||
1696                       ipaddress == this_ip))
1697                         continue;
1698
1699                 rc = 0;
1700
1701                 for (j = i+1; j < ksocknal_data.ksnd_ninterfaces; j++)
1702                         ksocknal_data.ksnd_interfaces[j-1] =
1703                                 ksocknal_data.ksnd_interfaces[j];
1704
1705                 ksocknal_data.ksnd_ninterfaces--;
1706
1707                 for (j = 0; j < ksocknal_data.ksnd_peer_hash_size; j++) {
1708                         list_for_each_safe(tmp, nxt, &ksocknal_data.ksnd_peers[j]) {
1709                                 peer = list_entry(tmp, ksock_peer_t, ksnp_list);
1710
1711                                 ksocknal_peer_del_interface_locked(peer, this_ip);
1712                         }
1713                 }
1714         }
1715
1716         write_unlock_irqrestore(&ksocknal_data.ksnd_global_lock, flags);
1717
1718         return (rc);
1719 }
1720
1721 int
1722 ksocknal_cmd(struct portals_cfg *pcfg, void * private)
1723 {
1724         int rc;
1725
1726         switch(pcfg->pcfg_command) {
1727         case NAL_CMD_GET_INTERFACE: {
1728                 ksock_interface_t *iface;
1729
1730                 read_lock (&ksocknal_data.ksnd_global_lock);
1731
1732                 if (pcfg->pcfg_count < 0 ||
1733                     pcfg->pcfg_count >= ksocknal_data.ksnd_ninterfaces) {
1734                         rc = -ENOENT;
1735                 } else {
1736                         rc = 0;
1737                         iface = &ksocknal_data.ksnd_interfaces[pcfg->pcfg_count];
1738
1739                         pcfg->pcfg_id    = iface->ksni_ipaddr;
1740                         pcfg->pcfg_misc  = iface->ksni_netmask;
1741                         pcfg->pcfg_fd    = iface->ksni_npeers;
1742                         pcfg->pcfg_count = iface->ksni_nroutes;
1743                 }
1744
1745                 read_unlock (&ksocknal_data.ksnd_global_lock);
1746                 break;
1747         }
1748         case NAL_CMD_ADD_INTERFACE: {
1749                 rc = ksocknal_add_interface(pcfg->pcfg_id, /* IP address */
1750                                             pcfg->pcfg_misc); /* net mask */
1751                 break;
1752         }
1753         case NAL_CMD_DEL_INTERFACE: {
1754                 rc = ksocknal_del_interface(pcfg->pcfg_id); /* IP address */
1755                 break;
1756         }
1757         case NAL_CMD_GET_PEER: {
1758                 ptl_nid_t    nid = 0;
1759                 __u32        myip = 0;
1760                 __u32        ip = 0;
1761                 int          port = 0;
1762                 int          conn_count = 0;
1763                 int          share_count = 0;
1764
1765                 rc = ksocknal_get_peer_info(pcfg->pcfg_count, &nid,
1766                                             &myip, &ip, &port,
1767                                             &conn_count,  &share_count);
1768                 pcfg->pcfg_nid   = nid;
1769                 pcfg->pcfg_size  = myip;
1770                 pcfg->pcfg_id    = ip;
1771                 pcfg->pcfg_misc  = port;
1772                 pcfg->pcfg_count = conn_count;
1773                 pcfg->pcfg_wait  = share_count;
1774                 break;
1775         }
1776         case NAL_CMD_ADD_PEER: {
1777                 rc = ksocknal_add_peer (pcfg->pcfg_nid,
1778                                         pcfg->pcfg_id, /* IP */
1779                                         pcfg->pcfg_misc); /* port */
1780                 break;
1781         }
1782         case NAL_CMD_DEL_PEER: {
1783                 rc = ksocknal_del_peer (pcfg->pcfg_nid,
1784                                         pcfg->pcfg_id, /* IP */
1785                                         pcfg->pcfg_flags); /* single_share? */
1786                 break;
1787         }
1788         case NAL_CMD_GET_CONN: {
1789                 ksock_conn_t *conn = ksocknal_get_conn_by_idx (pcfg->pcfg_count);
1790
1791                 if (conn == NULL)
1792                         rc = -ENOENT;
1793                 else {
1794                         int   txmem;
1795                         int   rxmem;
1796                         int   nagle;
1797
1798                         ksocknal_get_conn_tunables(conn, &txmem, &rxmem, &nagle);
1799
1800                         rc = 0;
1801                         pcfg->pcfg_nid    = conn->ksnc_peer->ksnp_nid;
1802                         pcfg->pcfg_id     = conn->ksnc_ipaddr;
1803                         pcfg->pcfg_misc   = conn->ksnc_port;
1804                         pcfg->pcfg_fd     = conn->ksnc_myipaddr;
1805                         pcfg->pcfg_flags  = conn->ksnc_type;
1806                         pcfg->pcfg_gw_nal = conn->ksnc_scheduler -
1807                                             ksocknal_data.ksnd_schedulers;
1808                         pcfg->pcfg_count  = txmem;
1809                         pcfg->pcfg_size   = rxmem;
1810                         pcfg->pcfg_wait   = nagle;
1811                         ksocknal_put_conn (conn);
1812                 }
1813                 break;
1814         }
1815         case NAL_CMD_REGISTER_PEER_FD: {
1816                 struct socket *sock = sockfd_lookup (pcfg->pcfg_fd, &rc);
1817                 int            type = pcfg->pcfg_misc;
1818
1819                 if (sock == NULL)
1820                         break;
1821
1822                 switch (type) {
1823                 case SOCKNAL_CONN_NONE:
1824                 case SOCKNAL_CONN_ANY:
1825                 case SOCKNAL_CONN_CONTROL:
1826                 case SOCKNAL_CONN_BULK_IN:
1827                 case SOCKNAL_CONN_BULK_OUT:
1828                         rc = ksocknal_create_conn(NULL, sock, type);
1829                         break;
1830                 default:
1831                         rc = -EINVAL;
1832                         break;
1833                 }
1834                 cfs_put_file (KSN_SOCK2FILE(sock));
1835                 break;
1836         }
1837         case NAL_CMD_CLOSE_CONNECTION: {
1838                 rc = ksocknal_close_matching_conns (pcfg->pcfg_nid,
1839                                                     pcfg->pcfg_id);
1840                 break;
1841         }
1842         case NAL_CMD_REGISTER_MYNID: {
1843                 rc = ksocknal_set_mynid (pcfg->pcfg_nid);
1844                 break;
1845         }
1846         case NAL_CMD_PUSH_CONNECTION: {
1847                 rc = ksocknal_push (pcfg->pcfg_nid);
1848                 break;
1849         }
1850         default:
1851                 rc = -EINVAL;
1852                 break;
1853         }
1854
1855         return rc;
1856 }
1857
1858 void
1859 ksocknal_free_fmbs (ksock_fmb_pool_t *p)
1860 {
1861         int          npages = p->fmp_buff_pages;
1862         ksock_fmb_t *fmb;
1863         int          i;
1864
1865         LASSERT (list_empty(&p->fmp_blocked_conns));
1866         LASSERT (p->fmp_nactive_fmbs == 0);
1867
1868         while (!list_empty(&p->fmp_idle_fmbs)) {
1869
1870                 fmb = list_entry(p->fmp_idle_fmbs.next,
1871                                  ksock_fmb_t, fmb_list);
1872
1873                 for (i = 0; i < npages; i++)
1874                         if (fmb->fmb_kiov[i].kiov_page != NULL)
1875                                 cfs_free_page(fmb->fmb_kiov[i].kiov_page);
1876
1877                 list_del(&fmb->fmb_list);
1878                 PORTAL_FREE(fmb, offsetof(ksock_fmb_t, fmb_kiov[npages]));
1879         }
1880 }
1881
1882 void
1883 ksocknal_free_buffers (void)
1884 {
1885         ksocknal_free_fmbs(&ksocknal_data.ksnd_small_fmp);
1886         ksocknal_free_fmbs(&ksocknal_data.ksnd_large_fmp);
1887
1888         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_ltxs) == 0);
1889
1890         if (ksocknal_data.ksnd_schedulers != NULL)
1891                 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
1892                              sizeof (ksock_sched_t) * ksocknal_data.ksnd_nschedulers);
1893
1894         PORTAL_FREE (ksocknal_data.ksnd_peers,
1895                      sizeof (struct list_head) *
1896                      ksocknal_data.ksnd_peer_hash_size);
1897 }
1898
1899 void
1900 ksocknal_api_shutdown (nal_t *nal)
1901 {
1902         ksock_sched_t *sched;
1903         int            i;
1904
1905         if (nal->nal_refct != 0) {
1906                 /* This module got the first ref */
1907                 PORTAL_MODULE_UNUSE;
1908                 return;
1909         }
1910
1911         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1912                atomic_read (&portal_kmemory));
1913
1914         LASSERT(nal == &ksocknal_api);
1915
1916         switch (ksocknal_data.ksnd_init) {
1917         default:
1918                 LASSERT (0);
1919
1920         case SOCKNAL_INIT_ALL:
1921                 libcfs_nal_cmd_unregister(SOCKNAL);
1922
1923                 ksocknal_data.ksnd_init = SOCKNAL_INIT_LIB;
1924                 /* fall through */
1925
1926         case SOCKNAL_INIT_LIB:
1927                 /* No more calls to ksocknal_cmd() to create new
1928                  * autoroutes/connections since we're being unloaded. */
1929
1930                 /* Delete all peers */
1931                 ksocknal_del_peer(PTL_NID_ANY, 0, 0);
1932
1933                 /* Wait for all peer state to clean up */
1934                 i = 2;
1935                 while (atomic_read (&ksocknal_data.ksnd_npeers) != 0) {
1936                         i++;
1937                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1938                                "waiting for %d peers to disconnect\n",
1939                                atomic_read (&ksocknal_data.ksnd_npeers));
1940                         set_current_state (TASK_UNINTERRUPTIBLE);
1941                         schedule_timeout (cfs_time_seconds(1));
1942                 }
1943
1944                 /* Tell lib we've stopped calling into her. */
1945                 lib_fini(&ksocknal_lib);
1946
1947                 ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
1948                 /* fall through */
1949
1950         case SOCKNAL_INIT_DATA:
1951                 LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
1952                 LASSERT (ksocknal_data.ksnd_peers != NULL);
1953                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1954                         LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
1955                 }
1956                 LASSERT (list_empty (&ksocknal_data.ksnd_enomem_conns));
1957                 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
1958                 LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
1959                 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
1960                 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
1961
1962                 if (ksocknal_data.ksnd_schedulers != NULL)
1963                         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
1964                                 ksock_sched_t *kss =
1965                                         &ksocknal_data.ksnd_schedulers[i];
1966
1967                                 LASSERT (list_empty (&kss->kss_tx_conns));
1968                                 LASSERT (list_empty (&kss->kss_rx_conns));
1969                                 LASSERT (kss->kss_nconns == 0);
1970                         }
1971
1972                 /* stop router calling me */
1973                 kpr_shutdown (&ksocknal_data.ksnd_router);
1974
1975                 /* flag threads to terminate; wake and wait for them to die */
1976                 ksocknal_data.ksnd_shuttingdown = 1;
1977                 cfs_waitq_broadcast (&ksocknal_data.ksnd_autoconnectd_waitq);
1978                 cfs_waitq_broadcast (&ksocknal_data.ksnd_reaper_waitq);
1979
1980                 for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
1981                         sched = &ksocknal_data.ksnd_schedulers[i];
1982                         cfs_waitq_broadcast(&sched->kss_waitq);
1983                 }
1984
1985                 i = 4;
1986                 read_lock(&ksocknal_data.ksnd_global_lock);
1987                 while (ksocknal_data.ksnd_nthreads != 0) {
1988                         i++;
1989                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1990                                "waiting for %d threads to terminate\n",
1991                                 ksocknal_data.ksnd_nthreads);
1992                         read_unlock(&ksocknal_data.ksnd_global_lock);
1993                         set_current_state (TASK_UNINTERRUPTIBLE);
1994                         schedule_timeout (cfs_time_seconds(1));
1995                         read_lock(&ksocknal_data.ksnd_global_lock);
1996                 }
1997                 read_unlock(&ksocknal_data.ksnd_global_lock);
1998
1999                 kpr_deregister (&ksocknal_data.ksnd_router);
2000
2001                 ksocknal_free_buffers();
2002
2003                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
2004                 /* fall through */
2005
2006         case SOCKNAL_INIT_NOTHING:
2007                 break;
2008         }
2009
2010         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
2011                atomic_read (&portal_kmemory));
2012
2013         printk(KERN_INFO "Lustre: Routing socket NAL unloaded (final mem %d)\n",
2014                atomic_read(&portal_kmemory));
2015 }
2016
2017
2018 void
2019 ksocknal_init_incarnation (void)
2020 {
2021         struct timeval tv;
2022
2023         /* The incarnation number is the time this module loaded and it
2024          * identifies this particular instance of the socknal.  Hopefully
2025          * we won't be able to reboot more frequently than 1MHz for the
2026          * forseeable future :) */
2027
2028         do_gettimeofday(&tv);
2029
2030         ksocknal_data.ksnd_incarnation =
2031                 (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
2032 }
2033
2034 int
2035 ksocknal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
2036                       ptl_ni_limits_t *requested_limits,
2037                       ptl_ni_limits_t *actual_limits)
2038 {
2039         ptl_process_id_t  process_id;
2040         int               pkmem = atomic_read(&portal_kmemory);
2041         int               rc;
2042         int               i;
2043         int               j;
2044
2045         LASSERT (nal == &ksocknal_api);
2046
2047         if (nal->nal_refct != 0) {
2048                 if (actual_limits != NULL)
2049                         *actual_limits = ksocknal_lib.libnal_ni.ni_actual_limits;
2050                 /* This module got the first ref */
2051                 PORTAL_MODULE_USE;
2052                 return (PTL_OK);
2053         }
2054
2055         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
2056
2057         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
2058
2059         ksocknal_init_incarnation();
2060
2061         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
2062         PORTAL_ALLOC (ksocknal_data.ksnd_peers,
2063                       sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
2064         if (ksocknal_data.ksnd_peers == NULL)
2065                 return (-ENOMEM);
2066
2067         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
2068                 CFS_INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
2069
2070         rwlock_init(&ksocknal_data.ksnd_global_lock);
2071
2072         spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
2073         CFS_INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
2074         CFS_INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
2075         ksocknal_data.ksnd_small_fmp.fmp_buff_pages = SOCKNAL_SMALL_FWD_PAGES;
2076
2077         spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
2078         CFS_INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
2079         CFS_INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
2080         ksocknal_data.ksnd_large_fmp.fmp_buff_pages = SOCKNAL_LARGE_FWD_PAGES;
2081
2082         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
2083         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
2084         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
2085         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
2086         cfs_waitq_init(&ksocknal_data.ksnd_reaper_waitq);
2087
2088         spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
2089         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
2090         cfs_waitq_init(&ksocknal_data.ksnd_autoconnectd_waitq);
2091
2092         /* NB memset above zeros whole of ksocknal_data, including
2093          * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
2094
2095         /* flag lists/ptrs/locks initialised */
2096         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2097
2098         ksocknal_data.ksnd_nschedulers = ksocknal_nsched();
2099         PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
2100                      sizeof(ksock_sched_t) * ksocknal_data.ksnd_nschedulers);
2101         if (ksocknal_data.ksnd_schedulers == NULL) {
2102                 ksocknal_api_shutdown (nal);
2103                 return (-ENOMEM);
2104         }
2105
2106         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2107                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
2108
2109                 spin_lock_init (&kss->kss_lock);
2110                 CFS_INIT_LIST_HEAD (&kss->kss_rx_conns);
2111                 CFS_INIT_LIST_HEAD (&kss->kss_tx_conns);
2112 #if SOCKNAL_ZC
2113                 CFS_INIT_LIST_HEAD (&kss->kss_zctxdone_list);
2114 #endif
2115                 cfs_waitq_init (&kss->kss_waitq);
2116         }
2117
2118         /* NB we have to wait to be told our true NID... */
2119         process_id.pid = requested_pid;
2120         process_id.nid = 0;
2121
2122         rc = lib_init(&ksocknal_lib, nal, process_id,
2123                       requested_limits, actual_limits);
2124         if (rc != PTL_OK) {
2125                 CERROR("lib_init failed: error %d\n", rc);
2126                 ksocknal_api_shutdown (nal);
2127                 return (rc);
2128         }
2129
2130         ksocknal_data.ksnd_init = SOCKNAL_INIT_LIB; // flag lib_init() called
2131
2132         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2133                 rc = ksocknal_thread_start (ksocknal_scheduler,
2134                                             &ksocknal_data.ksnd_schedulers[i]);
2135                 if (rc != 0) {
2136                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
2137                                i, rc);
2138                         ksocknal_api_shutdown (nal);
2139                         return (rc);
2140                 }
2141         }
2142
2143         for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
2144                 rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
2145                 if (rc != 0) {
2146                         CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
2147                         ksocknal_api_shutdown (nal);
2148                         return (rc);
2149                 }
2150         }
2151
2152         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
2153         if (rc != 0) {
2154                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
2155                 ksocknal_api_shutdown (nal);
2156                 return (rc);
2157         }
2158
2159         rc = kpr_register(&ksocknal_data.ksnd_router,
2160                           &ksocknal_router_interface);
2161         if (rc != 0) {
2162                 CDEBUG(D_NET, "Can't initialise routing interface "
2163                        "(rc = %d): not routing\n", rc);
2164         } else {
2165                 /* Only allocate forwarding buffers if there's a router */
2166
2167                 for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
2168                                  SOCKNAL_LARGE_FWD_NMSGS); i++) {
2169                         ksock_fmb_t      *fmb;
2170                         ksock_fmb_pool_t *pool;
2171
2172
2173                         if (i < SOCKNAL_SMALL_FWD_NMSGS)
2174                                 pool = &ksocknal_data.ksnd_small_fmp;
2175                         else
2176                                 pool = &ksocknal_data.ksnd_large_fmp;
2177
2178                         PORTAL_ALLOC(fmb, offsetof(ksock_fmb_t,
2179                                                    fmb_kiov[pool->fmp_buff_pages]));
2180                         if (fmb == NULL) {
2181                                 ksocknal_api_shutdown(nal);
2182                                 return (-ENOMEM);
2183                         }
2184
2185                         fmb->fmb_pool = pool;
2186
2187                         for (j = 0; j < pool->fmp_buff_pages; j++) {
2188                                 fmb->fmb_kiov[j].kiov_page = cfs_alloc_page(CFS_ALLOC_STD);
2189
2190                                 if (fmb->fmb_kiov[j].kiov_page == NULL) {
2191                                         ksocknal_api_shutdown (nal);
2192                                         return (-ENOMEM);
2193                                 }
2194
2195                                 LASSERT(cfs_page_address(fmb->fmb_kiov[j].kiov_page) != NULL);
2196                         }
2197
2198                         list_add(&fmb->fmb_list, &pool->fmp_idle_fmbs);
2199                 }
2200         }
2201
2202         rc = libcfs_nal_cmd_register(SOCKNAL, &ksocknal_cmd, NULL);
2203         if (rc != 0) {
2204                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
2205                 ksocknal_api_shutdown (nal);
2206                 return (rc);
2207         }
2208
2209         /* flag everything initialised */
2210         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
2211
2212         printk(KERN_INFO "Lustre: Routing socket NAL loaded "
2213                "(Routing %s, initial mem %d, incarnation "LPX64")\n",
2214                kpr_routing (&ksocknal_data.ksnd_router) ?
2215                "enabled" : "disabled", pkmem, ksocknal_data.ksnd_incarnation);
2216
2217         return (0);
2218 }
2219
2220 void __exit
2221 ksocknal_module_fini (void)
2222 {
2223 #ifdef CONFIG_SYSCTL
2224         if (ksocknal_tunables.ksnd_sysctl != NULL)
2225                 unregister_sysctl_table (ksocknal_tunables.ksnd_sysctl);
2226 #endif
2227         PtlNIFini(ksocknal_ni);
2228
2229         ptl_unregister_nal(SOCKNAL);
2230 }
2231
2232 extern cfs_sysctl_table_t ksocknal_top_ctl_table[];
2233
2234 int __init
2235 ksocknal_module_init (void)
2236 {
2237         int    rc;
2238
2239         /* packet descriptor must fit in a router descriptor's scratchpad */
2240         LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
2241         /* the following must be sizeof(int) for proc_dointvec() */
2242         LASSERT(sizeof (ksocknal_tunables.ksnd_io_timeout) == sizeof (int));
2243         LASSERT(sizeof (ksocknal_tunables.ksnd_eager_ack) == sizeof (int));
2244         LASSERT(sizeof (ksocknal_tunables.ksnd_typed_conns) == sizeof (int));
2245         LASSERT(sizeof (ksocknal_tunables.ksnd_min_bulk) == sizeof (int));
2246         LASSERT(sizeof (ksocknal_tunables.ksnd_buffer_size) == sizeof (int));
2247         LASSERT(sizeof (ksocknal_tunables.ksnd_nagle) == sizeof (int));
2248         LASSERT(sizeof (ksocknal_tunables.ksnd_keepalive_idle) == sizeof (int));
2249         LASSERT(sizeof (ksocknal_tunables.ksnd_keepalive_count) == sizeof (int));
2250         LASSERT(sizeof (ksocknal_tunables.ksnd_keepalive_intvl) == sizeof (int));
2251 #if CPU_AFFINITY
2252         LASSERT(sizeof (ksocknal_tunables.ksnd_irq_affinity) == sizeof (int));
2253 #endif
2254 #if SOCKNAL_ZC
2255         LASSERT(sizeof (ksocknal_tunables.ksnd_zc_min_frag) == sizeof (int));
2256 #endif
2257         /* check ksnr_connected/connecting field large enough */
2258         LASSERT(SOCKNAL_CONN_NTYPES <= 4);
2259
2260         ksocknal_api.nal_ni_init = ksocknal_api_startup;
2261         ksocknal_api.nal_ni_fini = ksocknal_api_shutdown;
2262
2263         /* Initialise dynamic tunables to defaults once only */
2264         ksocknal_tunables.ksnd_io_timeout      = SOCKNAL_IO_TIMEOUT;
2265         ksocknal_tunables.ksnd_eager_ack       = SOCKNAL_EAGER_ACK;
2266         ksocknal_tunables.ksnd_typed_conns     = SOCKNAL_TYPED_CONNS;
2267         ksocknal_tunables.ksnd_min_bulk        = SOCKNAL_MIN_BULK;
2268         ksocknal_tunables.ksnd_buffer_size     = SOCKNAL_BUFFER_SIZE;
2269         ksocknal_tunables.ksnd_nagle           = SOCKNAL_NAGLE;
2270         ksocknal_tunables.ksnd_keepalive_idle  = SOCKNAL_KEEPALIVE_IDLE;
2271         ksocknal_tunables.ksnd_keepalive_count = SOCKNAL_KEEPALIVE_COUNT;
2272         ksocknal_tunables.ksnd_keepalive_intvl = SOCKNAL_KEEPALIVE_INTVL;
2273 #if CPU_AFFINITY
2274         ksocknal_tunables.ksnd_irq_affinity = SOCKNAL_IRQ_AFFINITY;
2275 #endif
2276 #if SOCKNAL_ZC
2277         ksocknal_tunables.ksnd_zc_min_frag  = SOCKNAL_ZC_MIN_FRAG;
2278 #endif
2279
2280         rc = ptl_register_nal(SOCKNAL, &ksocknal_api);
2281         if (rc != PTL_OK) {
2282                 CERROR("Can't register SOCKNAL: %d\n", rc);
2283                 return (-ENOMEM);               /* or something... */
2284         }
2285
2286         /* Pure gateways want the NAL started up at module load time... */
2287         rc = PtlNIInit(SOCKNAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &ksocknal_ni);
2288         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
2289                 ptl_unregister_nal(SOCKNAL);
2290                 return (-ENODEV);
2291         }
2292
2293 #ifdef CONFIG_SYSCTL
2294         /* Press on regardless even if registering sysctl doesn't work */
2295         ksocknal_tunables.ksnd_sysctl =
2296                 register_sysctl_table (ksocknal_top_ctl_table, 0);
2297 #endif
2298         return (0);
2299 }
2300
2301 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2302 MODULE_DESCRIPTION("Kernel TCP Socket NAL v1.0.0");
2303 MODULE_LICENSE("GPL");
2304
2305 cfs_module(ksocknal, "1.0.0", ksocknal_module_init, ksocknal_module_fini);