Whamcloud - gitweb
448871e0aa225bfb1c53f0edc3f5502ca1128a9d
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 nal_t                   ksocknal_api;
29 ksock_nal_data_t        ksocknal_data;
30 ptl_handle_ni_t         ksocknal_ni;
31 ksock_tunables_t        ksocknal_tunables;
32
33 kpr_nal_interface_t ksocknal_router_interface = {
34         kprni_nalid:      SOCKNAL,
35         kprni_arg:        &ksocknal_data,
36         kprni_fwd:        ksocknal_fwd_packet,
37         kprni_notify:     ksocknal_notify,
38 };
39
40 int
41 ksocknal_set_mynid(ptl_nid_t nid)
42 {
43         lib_ni_t *ni = &ksocknal_lib.libnal_ni;
44
45         /* FIXME: we have to do this because we call lib_init() at module
46          * insertion time, which is before we have 'mynid' available.  lib_init
47          * sets the NAL's nid, which it uses to tell other nodes where packets
48          * are coming from.  This is not a very graceful solution to this
49          * problem. */
50
51         CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
52                nid, ni->ni_pid.nid);
53
54         ni->ni_pid.nid = nid;
55         return (0);
56 }
57
58 ksock_interface_t *
59 ksocknal_ip2iface(__u32 ip)
60 {
61         int                i;
62         ksock_interface_t *iface;
63
64         for (i = 0; i < ksocknal_data.ksnd_ninterfaces; i++) {
65                 LASSERT(i < SOCKNAL_MAX_INTERFACES);
66                 iface = &ksocknal_data.ksnd_interfaces[i];
67
68                 if (iface->ksni_ipaddr == ip)
69                         return (iface);
70         }
71
72         return (NULL);
73 }
74
75 ksock_route_t *
76 ksocknal_create_route (__u32 ipaddr, int port)
77 {
78         ksock_route_t *route;
79
80         PORTAL_ALLOC (route, sizeof (*route));
81         if (route == NULL)
82                 return (NULL);
83
84         atomic_set (&route->ksnr_refcount, 1);
85         route->ksnr_peer = NULL;
86         route->ksnr_timeout = cfs_time_current();
87         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
88         route->ksnr_ipaddr = ipaddr;
89         route->ksnr_port = port;
90         route->ksnr_connecting = 0;
91         route->ksnr_connected = 0;
92         route->ksnr_deleted = 0;
93         route->ksnr_conn_count = 0;
94         route->ksnr_share_count = 0;
95
96         return (route);
97 }
98
99 void
100 ksocknal_destroy_route (ksock_route_t *route)
101 {
102         if (route->ksnr_peer != NULL)
103                 ksocknal_put_peer (route->ksnr_peer);
104
105         PORTAL_FREE (route, sizeof (*route));
106 }
107
108 void
109 ksocknal_put_route (ksock_route_t *route)
110 {
111         CDEBUG (D_OTHER, "putting route[%p] (%d)\n",
112                 route, atomic_read (&route->ksnr_refcount));
113
114         LASSERT (atomic_read (&route->ksnr_refcount) > 0);
115         if (!atomic_dec_and_test (&route->ksnr_refcount))
116              return;
117
118         ksocknal_destroy_route (route);
119 }
120
121 ksock_peer_t *
122 ksocknal_create_peer (ptl_nid_t nid)
123 {
124         ksock_peer_t *peer;
125
126         LASSERT (nid != PTL_NID_ANY);
127
128         PORTAL_ALLOC (peer, sizeof (*peer));
129         if (peer == NULL)
130                 return (NULL);
131
132         memset (peer, 0, sizeof (*peer));       /* NULL pointers/clear flags etc */
133
134         peer->ksnp_nid = nid;
135         atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
136         peer->ksnp_closing = 0;
137         CFS_INIT_LIST_HEAD (&peer->ksnp_conns);
138         CFS_INIT_LIST_HEAD (&peer->ksnp_routes);
139         CFS_INIT_LIST_HEAD (&peer->ksnp_tx_queue);
140
141         atomic_inc (&ksocknal_data.ksnd_npeers);
142         return (peer);
143 }
144
145 void
146 ksocknal_destroy_peer (ksock_peer_t *peer)
147 {
148         CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
149
150         LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
151         LASSERT (list_empty (&peer->ksnp_conns));
152         LASSERT (list_empty (&peer->ksnp_routes));
153         LASSERT (list_empty (&peer->ksnp_tx_queue));
154
155         PORTAL_FREE (peer, sizeof (*peer));
156
157         /* NB a peer's connections and autoconnect routes keep a reference
158          * on their peer until they are destroyed, so we can be assured
159          * that _all_ state to do with this peer has been cleaned up when
160          * its refcount drops to zero. */
161         atomic_dec (&ksocknal_data.ksnd_npeers);
162 }
163
164 void
165 ksocknal_put_peer (ksock_peer_t *peer)
166 {
167         CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
168                 peer, peer->ksnp_nid,
169                 atomic_read (&peer->ksnp_refcount));
170
171         LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
172         if (!atomic_dec_and_test (&peer->ksnp_refcount))
173                 return;
174
175         ksocknal_destroy_peer (peer);
176 }
177
178 ksock_peer_t *
179 ksocknal_find_peer_locked (ptl_nid_t nid)
180 {
181         struct list_head *peer_list = ksocknal_nid2peerlist (nid);
182         struct list_head *tmp;
183         ksock_peer_t     *peer;
184
185         list_for_each (tmp, peer_list) {
186
187                 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
188
189                 LASSERT (!peer->ksnp_closing);
190
191                 if (peer->ksnp_nid != nid)
192                         continue;
193
194                 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
195                        peer, nid, atomic_read (&peer->ksnp_refcount));
196                 return (peer);
197         }
198         return (NULL);
199 }
200
201 ksock_peer_t *
202 ksocknal_get_peer (ptl_nid_t nid)
203 {
204         ksock_peer_t     *peer;
205
206         read_lock (&ksocknal_data.ksnd_global_lock);
207         peer = ksocknal_find_peer_locked (nid);
208         if (peer != NULL)                       /* +1 ref for caller? */
209                 atomic_inc (&peer->ksnp_refcount);
210         read_unlock (&ksocknal_data.ksnd_global_lock);
211
212         return (peer);
213 }
214
215 void
216 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
217 {
218         int                i;
219         __u32              ip;
220
221         for (i = 0; i < peer->ksnp_n_passive_ips; i++) {
222                 LASSERT (i < SOCKNAL_MAX_INTERFACES);
223                 ip = peer->ksnp_passive_ips[i];
224
225                 ksocknal_ip2iface(ip)->ksni_npeers--;
226         }
227
228         LASSERT (list_empty(&peer->ksnp_conns));
229         LASSERT (list_empty(&peer->ksnp_routes));
230         LASSERT (!peer->ksnp_closing);
231         peer->ksnp_closing = 1;
232         list_del (&peer->ksnp_list);
233         /* lose peerlist's ref */
234         ksocknal_put_peer (peer);
235 }
236
237 int
238 ksocknal_get_peer_info (int index, ptl_nid_t *nid,
239                         __u32 *myip, __u32 *peer_ip, int *port,
240                         int *conn_count, int *share_count)
241 {
242         ksock_peer_t      *peer;
243         struct list_head  *ptmp;
244         ksock_route_t     *route;
245         struct list_head  *rtmp;
246         int                i;
247         int                j;
248         int                rc = -ENOENT;
249
250         read_lock (&ksocknal_data.ksnd_global_lock);
251
252         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
253
254                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
255                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
256
257                         if (peer->ksnp_n_passive_ips == 0 &&
258                             list_empty(&peer->ksnp_routes)) {
259                                 if (index-- > 0)
260                                         continue;
261
262                                 *nid = peer->ksnp_nid;
263                                 *myip = 0;
264                                 *peer_ip = 0;
265                                 *port = 0;
266                                 *conn_count = 0;
267                                 *share_count = 0;
268                                 rc = 0;
269                                 goto out;
270                         }
271
272                         for (j = 0; j < peer->ksnp_n_passive_ips; j++) {
273                                 if (index-- > 0)
274                                         continue;
275
276                                 *nid = peer->ksnp_nid;
277                                 *myip = peer->ksnp_passive_ips[j];
278                                 *peer_ip = 0;
279                                 *port = 0;
280                                 *conn_count = 0;
281                                 *share_count = 0;
282                                 rc = 0;
283                                 goto out;
284                         }
285
286                         list_for_each (rtmp, &peer->ksnp_routes) {
287                                 if (index-- > 0)
288                                         continue;
289
290                                 route = list_entry(rtmp, ksock_route_t,
291                                                    ksnr_list);
292
293                                 *nid = peer->ksnp_nid;
294                                 *myip = route->ksnr_myipaddr;
295                                 *peer_ip = route->ksnr_ipaddr;
296                                 *port = route->ksnr_port;
297                                 *conn_count = route->ksnr_conn_count;
298                                 *share_count = route->ksnr_share_count;
299                                 rc = 0;
300                                 goto out;
301                         }
302                 }
303         }
304  out:
305         read_unlock (&ksocknal_data.ksnd_global_lock);
306         return (rc);
307 }
308
309 void
310 ksocknal_associate_route_conn_locked(ksock_route_t *route, ksock_conn_t *conn)
311 {
312         ksock_peer_t      *peer = route->ksnr_peer;
313         int                type = conn->ksnc_type;
314         ksock_interface_t *iface;
315
316         conn->ksnc_route = route;
317         atomic_inc (&route->ksnr_refcount);
318
319         if (route->ksnr_myipaddr != conn->ksnc_myipaddr) {
320                 if (route->ksnr_myipaddr == 0) {
321                         /* route wasn't bound locally yet (the initial route) */
322                         CWARN("Binding "LPX64" %u.%u.%u.%u to %u.%u.%u.%u\n",
323                               peer->ksnp_nid,
324                               HIPQUAD(route->ksnr_ipaddr),
325                               HIPQUAD(conn->ksnc_myipaddr));
326                 } else {
327                         CWARN("Rebinding "LPX64" %u.%u.%u.%u from "
328                               "%u.%u.%u.%u to %u.%u.%u.%u\n",
329                               peer->ksnp_nid,
330                               HIPQUAD(route->ksnr_ipaddr),
331                               HIPQUAD(route->ksnr_myipaddr),
332                               HIPQUAD(conn->ksnc_myipaddr));
333
334                         iface = ksocknal_ip2iface(route->ksnr_myipaddr);
335                         if (iface != NULL)
336                                 iface->ksni_nroutes--;
337                 }
338                 route->ksnr_myipaddr = conn->ksnc_myipaddr;
339                 iface = ksocknal_ip2iface(route->ksnr_myipaddr);
340                 if (iface != NULL)
341                         iface->ksni_nroutes++;
342         }
343
344         route->ksnr_connected |= (1<<type);
345         route->ksnr_conn_count++;
346
347         /* Successful connection => further attempts can
348          * proceed immediately */
349         route->ksnr_timeout = cfs_time_current();
350         route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
351 }
352
353 void
354 ksocknal_add_route_locked (ksock_peer_t *peer, ksock_route_t *route)
355 {
356         struct list_head  *tmp;
357         ksock_conn_t      *conn;
358         int                type;
359         ksock_route_t     *route2;
360
361         LASSERT (route->ksnr_peer == NULL);
362         LASSERT (!route->ksnr_connecting);
363         LASSERT (route->ksnr_connected == 0);
364
365         /* LASSERT(unique) */
366         list_for_each(tmp, &peer->ksnp_routes) {
367                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
368
369                 if (route2->ksnr_ipaddr == route->ksnr_ipaddr) {
370                         CERROR ("Duplicate route "LPX64" %u.%u.%u.%u\n",
371                                 peer->ksnp_nid, HIPQUAD(route->ksnr_ipaddr));
372                         LBUG();
373                 }
374         }
375
376         route->ksnr_peer = peer;
377         atomic_inc (&peer->ksnp_refcount);
378         /* peer's routelist takes over my ref on 'route' */
379         list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
380
381         list_for_each(tmp, &peer->ksnp_conns) {
382                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
383                 type = conn->ksnc_type;
384
385                 if (conn->ksnc_ipaddr != route->ksnr_ipaddr)
386                         continue;
387
388                 ksocknal_associate_route_conn_locked(route, conn);
389                 /* keep going (typed routes) */
390         }
391 }
392
393 void
394 ksocknal_del_route_locked (ksock_route_t *route)
395 {
396         ksock_peer_t      *peer = route->ksnr_peer;
397         ksock_interface_t *iface;
398         ksock_conn_t      *conn;
399         struct list_head  *ctmp;
400         struct list_head  *cnxt;
401
402         LASSERT (!route->ksnr_deleted);
403
404         /* Close associated conns */
405         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
406                 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
407
408                 if (conn->ksnc_route != route)
409                         continue;
410
411                 ksocknal_close_conn_locked (conn, 0);
412         }
413
414         if (route->ksnr_myipaddr != 0) {
415                 iface = ksocknal_ip2iface(route->ksnr_myipaddr);
416                 if (iface != NULL)
417                         iface->ksni_nroutes--;
418         }
419
420         route->ksnr_deleted = 1;
421         list_del (&route->ksnr_list);
422         ksocknal_put_route (route);             /* drop peer's ref */
423
424         if (list_empty (&peer->ksnp_routes) &&
425             list_empty (&peer->ksnp_conns)) {
426                 /* I've just removed the last autoconnect route of a peer
427                  * with no active connections */
428                 ksocknal_unlink_peer_locked (peer);
429         }
430 }
431
432 int
433 ksocknal_add_peer (ptl_nid_t nid, __u32 ipaddr, int port)
434 {
435         unsigned long      flags;
436         struct list_head  *tmp;
437         ksock_peer_t      *peer;
438         ksock_peer_t      *peer2;
439         ksock_route_t     *route;
440         ksock_route_t     *route2;
441
442         if (nid == PTL_NID_ANY)
443                 return (-EINVAL);
444
445         /* Have a brand new peer ready... */
446         peer = ksocknal_create_peer (nid);
447         if (peer == NULL)
448                 return (-ENOMEM);
449
450         route = ksocknal_create_route (ipaddr, port);
451         if (route == NULL) {
452                 ksocknal_put_peer (peer);
453                 return (-ENOMEM);
454         }
455
456         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
457
458         peer2 = ksocknal_find_peer_locked (nid);
459         if (peer2 != NULL) {
460                 ksocknal_put_peer (peer);
461                 peer = peer2;
462         } else {
463                 /* peer table takes my ref on peer */
464                 list_add_tail (&peer->ksnp_list,
465                                ksocknal_nid2peerlist (nid));
466         }
467
468         route2 = NULL;
469         list_for_each (tmp, &peer->ksnp_routes) {
470                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
471
472                 if (route2->ksnr_ipaddr == ipaddr)
473                         break;
474
475                 route2 = NULL;
476         }
477         if (route2 == NULL) {
478                 ksocknal_add_route_locked(peer, route);
479                 route->ksnr_share_count++;
480         } else {
481                 ksocknal_put_route(route);
482                 route2->ksnr_share_count++;
483         }
484
485         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
486
487         return (0);
488 }
489
490 void
491 ksocknal_del_peer_locked (ksock_peer_t *peer, __u32 ip, int single_share)
492 {
493         ksock_conn_t     *conn;
494         ksock_route_t    *route;
495         struct list_head *tmp;
496         struct list_head *nxt;
497         int               nshared;
498
499         LASSERT (!peer->ksnp_closing);
500
501         /* Extra ref prevents peer disappearing until I'm done with it */
502         atomic_inc(&peer->ksnp_refcount);
503
504         list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
505                 route = list_entry(tmp, ksock_route_t, ksnr_list);
506
507                 if (single_share && route->ksnr_share_count == 0)
508                         continue;
509
510                 /* no match */
511                 if (!(ip == 0 || route->ksnr_ipaddr == ip))
512                         continue;
513
514                 if (!single_share)
515                         route->ksnr_share_count = 0;
516                 else if (route->ksnr_share_count > 0)
517                         route->ksnr_share_count--;
518
519                 if (route->ksnr_share_count == 0) {
520                         /* This deletes associated conns too */
521                         ksocknal_del_route_locked (route);
522                 }
523
524                 if (single_share)
525                         break;
526         }
527
528         nshared = 0;
529         list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
530                 route = list_entry(tmp, ksock_route_t, ksnr_list);
531                 nshared += route->ksnr_share_count;
532         }
533
534         if (nshared == 0) {
535                 /* remove everything else if there are no explicit entries
536                  * left */
537
538                 list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
539                         route = list_entry(tmp, ksock_route_t, ksnr_list);
540
541                         /* we should only be removing auto-entries */
542                         LASSERT(route->ksnr_share_count == 0);
543                         ksocknal_del_route_locked (route);
544                 }
545
546                 list_for_each_safe (tmp, nxt, &peer->ksnp_conns) {
547                         conn = list_entry(tmp, ksock_conn_t, ksnc_list);
548
549                         ksocknal_close_conn_locked(conn, 0);
550                 }
551         }
552
553         ksocknal_put_peer(peer);
554         /* NB peer unlinks itself when last conn/route is removed */
555 }
556
557 int
558 ksocknal_del_peer (ptl_nid_t nid, __u32 ip, int single_share)
559 {
560         unsigned long      flags;
561         struct list_head  *ptmp;
562         struct list_head  *pnxt;
563         ksock_peer_t      *peer;
564         int                lo;
565         int                hi;
566         int                i;
567         int                rc = -ENOENT;
568
569         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
570
571         if (nid != PTL_NID_ANY)
572                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
573         else {
574                 lo = 0;
575                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
576         }
577
578         for (i = lo; i <= hi; i++) {
579                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
580                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
581
582                         if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
583                                 continue;
584
585                         ksocknal_del_peer_locked (peer, ip, single_share);
586                         rc = 0;                 /* matched! */
587
588                         if (single_share)
589                                 break;
590                 }
591         }
592
593         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
594
595         return (rc);
596 }
597
598 ksock_conn_t *
599 ksocknal_get_conn_by_idx (int index)
600 {
601         ksock_peer_t      *peer;
602         struct list_head  *ptmp;
603         ksock_conn_t      *conn;
604         struct list_head  *ctmp;
605         int                i;
606
607         read_lock (&ksocknal_data.ksnd_global_lock);
608
609         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
610                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
611                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
612
613                         LASSERT (!peer->ksnp_closing);
614
615                         list_for_each (ctmp, &peer->ksnp_conns) {
616                                 if (index-- > 0)
617                                         continue;
618
619                                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
620                                 atomic_inc (&conn->ksnc_refcount);
621                                 read_unlock (&ksocknal_data.ksnd_global_lock);
622                                 return (conn);
623                         }
624                 }
625         }
626
627         read_unlock (&ksocknal_data.ksnd_global_lock);
628         return (NULL);
629 }
630
631 ksock_sched_t *
632 ksocknal_choose_scheduler_locked (unsigned int irq)
633 {
634         ksock_sched_t    *sched;
635         ksock_irqinfo_t  *info;
636         int               i;
637
638         LASSERT (irq < NR_IRQS);
639         info = &ksocknal_data.ksnd_irqinfo[irq];
640
641         if (irq != 0 &&                         /* hardware NIC */
642             info->ksni_valid) {                 /* already set up */
643                 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
644         }
645
646         /* software NIC (irq == 0) || not associated with a scheduler yet.
647          * Choose the CPU with the fewest connections... */
648         sched = &ksocknal_data.ksnd_schedulers[0];
649         for (i = 1; i < ksocknal_data.ksnd_nschedulers; i++)
650                 if (sched->kss_nconns >
651                     ksocknal_data.ksnd_schedulers[i].kss_nconns)
652                         sched = &ksocknal_data.ksnd_schedulers[i];
653
654         if (irq != 0) {                         /* Hardware NIC */
655                 info->ksni_valid = 1;
656                 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
657
658                 /* no overflow... */
659                 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
660         }
661
662         return (sched);
663 }
664
665 int
666 ksocknal_local_ipvec (__u32 *ipaddrs)
667 {
668         int                i;
669         int                nip;
670
671         read_lock (&ksocknal_data.ksnd_global_lock);
672
673         nip = ksocknal_data.ksnd_ninterfaces;
674         for (i = 0; i < nip; i++) {
675                 LASSERT (i < SOCKNAL_MAX_INTERFACES);
676
677                 ipaddrs[i] = ksocknal_data.ksnd_interfaces[i].ksni_ipaddr;
678                 LASSERT (ipaddrs[i] != 0);
679         }
680
681         read_unlock (&ksocknal_data.ksnd_global_lock);
682         return (nip);
683 }
684
685 int
686 ksocknal_match_peerip (ksock_interface_t *iface, __u32 *ips, int nips)
687 {
688         int   best_netmatch = 0;
689         int   best_xor      = 0;
690         int   best          = -1;
691         int   this_xor;
692         int   this_netmatch;
693         int   i;
694
695         for (i = 0; i < nips; i++) {
696                 if (ips[i] == 0)
697                         continue;
698
699                 this_xor = (ips[i] ^ iface->ksni_ipaddr);
700                 this_netmatch = ((this_xor & iface->ksni_netmask) == 0) ? 1 : 0;
701
702                 if (!(best < 0 ||
703                       best_netmatch < this_netmatch ||
704                       (best_netmatch == this_netmatch &&
705                        best_xor > this_xor)))
706                         continue;
707
708                 best = i;
709                 best_netmatch = this_netmatch;
710                 best_xor = this_xor;
711         }
712
713         LASSERT (best >= 0);
714         return (best);
715 }
716
717 int
718 ksocknal_select_ips(ksock_peer_t *peer, __u32 *peerips, int n_peerips)
719 {
720         rwlock_t           *global_lock = &ksocknal_data.ksnd_global_lock;
721         unsigned long       flags;
722         ksock_interface_t  *iface;
723         ksock_interface_t  *best_iface;
724         int                 n_ips;
725         int                 i;
726         int                 j;
727         int                 k;
728         __u32               ip;
729         __u32               xor;
730         int                 this_netmatch;
731         int                 best_netmatch;
732         int                 best_npeers;
733
734         /* CAVEAT EMPTOR: We do all our interface matching with an
735          * exclusive hold of global lock at IRQ priority.  We're only
736          * expecting to be dealing with small numbers of interfaces, so the
737          * O(n**3)-ness shouldn't matter */
738
739         /* Also note that I'm not going to return more than n_peerips
740          * interfaces, even if I have more myself */
741
742         write_lock_irqsave(global_lock, flags);
743
744         LASSERT (n_peerips <= SOCKNAL_MAX_INTERFACES);
745         LASSERT (ksocknal_data.ksnd_ninterfaces <= SOCKNAL_MAX_INTERFACES);
746
747         n_ips = MIN(n_peerips, ksocknal_data.ksnd_ninterfaces);
748
749         for (i = 0; peer->ksnp_n_passive_ips < n_ips; i++) {
750                 /*              ^ yes really... */
751
752                 /* If we have any new interfaces, first tick off all the
753                  * peer IPs that match old interfaces, then choose new
754                  * interfaces to match the remaining peer IPS.
755                  * We don't forget interfaces we've stopped using; we might
756                  * start using them again... */
757
758                 if (i < peer->ksnp_n_passive_ips) {
759                         /* Old interface. */
760                         ip = peer->ksnp_passive_ips[i];
761                         best_iface = ksocknal_ip2iface(ip);
762
763                         /* peer passive ips are kept up to date */
764                         LASSERT(best_iface != NULL);
765                 } else {
766                         /* choose a new interface */
767                         LASSERT (i == peer->ksnp_n_passive_ips);
768
769                         best_iface = NULL;
770                         best_netmatch = 0;
771                         best_npeers = 0;
772
773                         for (j = 0; j < ksocknal_data.ksnd_ninterfaces; j++) {
774                                 iface = &ksocknal_data.ksnd_interfaces[j];
775                                 ip = iface->ksni_ipaddr;
776
777                                 for (k = 0; k < peer->ksnp_n_passive_ips; k++)
778                                         if (peer->ksnp_passive_ips[k] == ip)
779                                                 break;
780
781                                 if (k < peer->ksnp_n_passive_ips) /* using it already */
782                                         continue;
783
784                                 k = ksocknal_match_peerip(iface, peerips, n_peerips);
785                                 xor = (ip ^ peerips[k]);
786                                 this_netmatch = ((xor & iface->ksni_netmask) == 0) ? 1 : 0;
787
788                                 if (!(best_iface == NULL ||
789                                       best_netmatch < this_netmatch ||
790                                       (best_netmatch == this_netmatch &&
791                                        best_npeers > iface->ksni_npeers)))
792                                         continue;
793
794                                 best_iface = iface;
795                                 best_netmatch = this_netmatch;
796                                 best_npeers = iface->ksni_npeers;
797                         }
798
799                         best_iface->ksni_npeers++;
800                         ip = best_iface->ksni_ipaddr;
801                         peer->ksnp_passive_ips[i] = ip;
802                         peer->ksnp_n_passive_ips = i+1;
803                 }
804
805                 LASSERT (best_iface != NULL);
806
807                 /* mark the best matching peer IP used */
808                 j = ksocknal_match_peerip(best_iface, peerips, n_peerips);
809                 peerips[j] = 0;
810         }
811
812         /* Overwrite input peer IP addresses */
813         memcpy(peerips, peer->ksnp_passive_ips, n_ips * sizeof(*peerips));
814
815         write_unlock_irqrestore(global_lock, flags);
816
817         return (n_ips);
818 }
819
820 void
821 ksocknal_create_routes(ksock_peer_t *peer, int port,
822                        __u32 *peer_ipaddrs, int npeer_ipaddrs)
823 {
824         ksock_route_t      *newroute = NULL;
825         rwlock_t           *global_lock = &ksocknal_data.ksnd_global_lock;
826         unsigned long       flags;
827         struct list_head   *rtmp;
828         ksock_route_t      *route;
829         ksock_interface_t  *iface;
830         ksock_interface_t  *best_iface;
831         int                 best_netmatch;
832         int                 this_netmatch;
833         int                 best_nroutes;
834         int                 i;
835         int                 j;
836
837         /* CAVEAT EMPTOR: We do all our interface matching with an
838          * exclusive hold of global lock at IRQ priority.  We're only
839          * expecting to be dealing with small numbers of interfaces, so the
840          * O(n**3)-ness here shouldn't matter */
841
842         write_lock_irqsave(global_lock, flags);
843
844         LASSERT (npeer_ipaddrs <= SOCKNAL_MAX_INTERFACES);
845
846         for (i = 0; i < npeer_ipaddrs; i++) {
847                 if (newroute != NULL) {
848                         newroute->ksnr_ipaddr = peer_ipaddrs[i];
849                 } else {
850                         write_unlock_irqrestore(global_lock, flags);
851
852                         newroute = ksocknal_create_route(peer_ipaddrs[i], port);
853                         if (newroute == NULL)
854                                 return;
855
856                         write_lock_irqsave(global_lock, flags);
857                 }
858
859                 /* Already got a route? */
860                 route = NULL;
861                 list_for_each(rtmp, &peer->ksnp_routes) {
862                         route = list_entry(rtmp, ksock_route_t, ksnr_list);
863
864                         if (route->ksnr_ipaddr == newroute->ksnr_ipaddr)
865                                 break;
866
867                         route = NULL;
868                 }
869                 if (route != NULL)
870                         continue;
871
872                 best_iface = NULL;
873                 best_nroutes = 0;
874                 best_netmatch = 0;
875
876                 LASSERT (ksocknal_data.ksnd_ninterfaces <= SOCKNAL_MAX_INTERFACES);
877
878                 /* Select interface to connect from */
879                 for (j = 0; j < ksocknal_data.ksnd_ninterfaces; j++) {
880                         iface = &ksocknal_data.ksnd_interfaces[j];
881
882                         /* Using this interface already? */
883                         list_for_each(rtmp, &peer->ksnp_routes) {
884                                 route = list_entry(rtmp, ksock_route_t, ksnr_list);
885
886                                 if (route->ksnr_myipaddr == iface->ksni_ipaddr)
887                                         break;
888
889                                 route = NULL;
890                         }
891                         if (route != NULL)
892                                 continue;
893
894                         this_netmatch = (((iface->ksni_ipaddr ^
895                                            newroute->ksnr_ipaddr) &
896                                            iface->ksni_netmask) == 0) ? 1 : 0;
897
898                         if (!(best_iface == NULL ||
899                               best_netmatch < this_netmatch ||
900                               (best_netmatch == this_netmatch &&
901                                best_nroutes > iface->ksni_nroutes)))
902                                 continue;
903
904                         best_iface = iface;
905                         best_netmatch = this_netmatch;
906                         best_nroutes = iface->ksni_nroutes;
907                 }
908
909                 if (best_iface == NULL)
910                         continue;
911
912                 newroute->ksnr_myipaddr = best_iface->ksni_ipaddr;
913                 best_iface->ksni_nroutes++;
914
915                 ksocknal_add_route_locked(peer, newroute);
916                 newroute = NULL;
917         }
918
919         write_unlock_irqrestore(global_lock, flags);
920         if (newroute != NULL)
921                 ksocknal_put_route(newroute);
922 }
923
924 int
925 ksocknal_create_conn (ksock_route_t *route, struct socket *sock, int type)
926 {
927         int                passive = (type == SOCKNAL_CONN_NONE);
928         rwlock_t          *global_lock = &ksocknal_data.ksnd_global_lock;
929         __u32              ipaddrs[SOCKNAL_MAX_INTERFACES];
930         int                nipaddrs;
931         ptl_nid_t          nid;
932         struct list_head  *tmp;
933         __u64              incarnation;
934         unsigned long      flags;
935         ksock_conn_t      *conn;
936         ksock_conn_t      *conn2;
937         ksock_peer_t      *peer = NULL;
938         ksock_peer_t      *peer2;
939         ksock_sched_t     *sched;
940         unsigned int       irq;
941         ksock_tx_t        *tx;
942         int                rc;
943
944         /* NB, sock has an associated file since (a) this connection might
945          * have been created in userland and (b) we need to refcount the
946          * socket so that we don't close it while I/O is being done on
947          * it, and sock->file has that pre-cooked... */
948         LASSERT (KSN_SOCK2FILE(sock) != NULL);
949         LASSERT (cfs_file_count(KSN_SOCK2FILE(sock)) > 0);
950         LASSERT (route == NULL || !passive);
951
952         rc = ksocknal_lib_setup_sock (sock);
953         if (rc != 0)
954                 return (rc);
955
956         irq = ksocknal_lib_sock_irq (sock);
957
958         PORTAL_ALLOC(conn, sizeof(*conn));
959         if (conn == NULL)
960                 return (-ENOMEM);
961
962         memset (conn, 0, sizeof (*conn));
963         conn->ksnc_peer = NULL;
964         conn->ksnc_route = NULL;
965         conn->ksnc_sock = sock;
966         conn->ksnc_type = type;
967         ksocknal_lib_save_callback(sock, conn);
968         atomic_set (&conn->ksnc_refcount, 1);    /* 1 ref for me */
969
970         conn->ksnc_rx_ready = 0;
971         conn->ksnc_rx_scheduled = 0;
972         ksocknal_new_packet (conn, 0);
973
974         CFS_INIT_LIST_HEAD (&conn->ksnc_tx_queue);
975         conn->ksnc_tx_ready = 0;
976         conn->ksnc_tx_scheduled = 0;
977         atomic_set (&conn->ksnc_tx_nob, 0);
978
979         /* stash conn's local and remote addrs */
980         rc = ksocknal_lib_get_conn_addrs (conn);
981         if (rc != 0)
982                 goto failed_0;
983
984         if (!passive) {
985                 /* Active connection sends HELLO eagerly */
986                 rc = ksocknal_local_ipvec(ipaddrs);
987                 if (rc < 0)
988                         goto failed_0;
989                 nipaddrs = rc;
990
991                 rc = ksocknal_send_hello (conn, ipaddrs, nipaddrs);
992                 if (rc != 0)
993                         goto failed_0;
994         }
995
996         /* Find out/confirm peer's NID and connection type and get the
997          * vector of interfaces she's willing to let me connect to */
998         nid = (route == NULL) ? PTL_NID_ANY : route->ksnr_peer->ksnp_nid;
999         rc = ksocknal_recv_hello (conn, &nid, &incarnation, ipaddrs);
1000         if (rc < 0)
1001                 goto failed_0;
1002         nipaddrs = rc;
1003         LASSERT (nid != PTL_NID_ANY);
1004
1005         if (route != NULL) {
1006                 peer = route->ksnr_peer;
1007                 atomic_inc(&peer->ksnp_refcount);
1008         } else {
1009                 peer = ksocknal_create_peer(nid);
1010                 if (peer == NULL) {
1011                         rc = -ENOMEM;
1012                         goto failed_0;
1013                 }
1014
1015                 write_lock_irqsave(global_lock, flags);
1016
1017                 peer2 = ksocknal_find_peer_locked(nid);
1018                 if (peer2 == NULL) {
1019                         /* NB this puts an "empty" peer in the peer
1020                          * table (which takes my ref) */
1021                         list_add_tail(&peer->ksnp_list,
1022                                       ksocknal_nid2peerlist(nid));
1023                 } else  {
1024                         ksocknal_put_peer(peer);
1025                         peer = peer2;
1026                 }
1027                 /* +1 ref for me */
1028                 atomic_inc(&peer->ksnp_refcount);
1029
1030                 write_unlock_irqrestore(global_lock, flags);
1031         }
1032
1033         if (!passive) {
1034                 ksocknal_create_routes(peer, conn->ksnc_port,
1035                                        ipaddrs, nipaddrs);
1036                 rc = 0;
1037         } else {
1038                 rc = ksocknal_select_ips(peer, ipaddrs, nipaddrs);
1039                 LASSERT (rc >= 0);
1040                 rc = ksocknal_send_hello (conn, ipaddrs, rc);
1041         }
1042         if (rc < 0)
1043                 goto failed_1;
1044
1045         write_lock_irqsave (global_lock, flags);
1046
1047         if (peer->ksnp_closing ||
1048             (route != NULL && route->ksnr_deleted)) {
1049                 /* route/peer got closed under me */
1050                 rc = -ESTALE;
1051                 goto failed_2;
1052         }
1053
1054         /* Refuse to duplicate an existing connection (both sides might
1055          * autoconnect at once), unless this is a loopback connection */
1056         if (conn->ksnc_ipaddr != conn->ksnc_myipaddr) {
1057                 list_for_each(tmp, &peer->ksnp_conns) {
1058                         conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);
1059
1060                         if (conn2->ksnc_ipaddr != conn->ksnc_ipaddr ||
1061                             conn2->ksnc_myipaddr != conn->ksnc_myipaddr ||
1062                             conn2->ksnc_type != conn->ksnc_type ||
1063                             conn2->ksnc_incarnation != incarnation)
1064                                 continue;
1065
1066                         CWARN("Not creating duplicate connection to "
1067                               "%u.%u.%u.%u type %d\n",
1068                               HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_type);
1069                         rc = -EALREADY;
1070                         goto failed_2;
1071                 }
1072         }
1073
1074         /* If the connection created by this route didn't bind to the IP
1075          * address the route connected to, the connection/route matching
1076          * code below probably isn't going to work. */
1077         if (route != NULL &&
1078             route->ksnr_ipaddr != conn->ksnc_ipaddr) {
1079                 CERROR("Route "LPX64" %u.%u.%u.%u connected to %u.%u.%u.%u\n",
1080                        peer->ksnp_nid,
1081                        HIPQUAD(route->ksnr_ipaddr),
1082                        HIPQUAD(conn->ksnc_ipaddr));
1083         }
1084
1085         /* Search for a route corresponding to the new connection and
1086          * create an association.  This allows incoming connections created
1087          * by routes in my peer to match my own route entries so I don't
1088          * continually create duplicate routes. */
1089         list_for_each (tmp, &peer->ksnp_routes) {
1090                 route = list_entry(tmp, ksock_route_t, ksnr_list);
1091
1092                 if (route->ksnr_ipaddr != conn->ksnc_ipaddr)
1093                         continue;
1094
1095                 ksocknal_associate_route_conn_locked(route, conn);
1096                 break;
1097         }
1098
1099         /* Give conn a ref on sock->file since we're going to return success */
1100         cfs_get_file(KSN_SOCK2FILE(sock));
1101
1102         conn->ksnc_peer = peer;                 /* conn takes my ref on peer */
1103         conn->ksnc_incarnation = incarnation;
1104         peer->ksnp_last_alive = cfs_time_current();
1105         peer->ksnp_error = 0;
1106
1107         sched = ksocknal_choose_scheduler_locked (irq);
1108         sched->kss_nconns++;
1109         conn->ksnc_scheduler = sched;
1110
1111         /* Set the deadline for the outgoing HELLO to drain */
1112         conn->ksnc_tx_bufnob = SOCK_WMEM_QUEUED(sock);
1113         conn->ksnc_tx_deadline = cfs_time_shift(ksocknal_tunables.ksnd_io_timeout);
1114         mb();       /* order with adding to peer's conn list */
1115
1116         list_add (&conn->ksnc_list, &peer->ksnp_conns);
1117         atomic_inc (&conn->ksnc_refcount);
1118
1119         /* NB my callbacks block while I hold ksnd_global_lock */
1120         ksocknal_lib_set_callback(sock, conn);
1121
1122         /* Take all the packets blocking for a connection.
1123          * NB, it might be nicer to share these blocked packets among any
1124          * other connections that are becoming established. */
1125         while (!list_empty (&peer->ksnp_tx_queue)) {
1126                 tx = list_entry (peer->ksnp_tx_queue.next,
1127                                  ksock_tx_t, tx_list);
1128
1129                 list_del (&tx->tx_list);
1130                 ksocknal_queue_tx_locked (tx, conn);
1131         }
1132
1133         rc = ksocknal_close_stale_conns_locked(peer, incarnation);
1134         if (rc != 0)
1135                 CERROR ("Closed %d stale conns to nid "LPX64" ip %d.%d.%d.%d\n",
1136                         rc, conn->ksnc_peer->ksnp_nid,
1137                         HIPQUAD(conn->ksnc_ipaddr));
1138
1139         write_unlock_irqrestore (global_lock, flags);
1140
1141         ksocknal_lib_bind_irq (irq);
1142
1143         /* Call the callbacks right now to get things going. */
1144         if (ksocknal_getconnsock(conn) == 0) {
1145                 ksocknal_lib_act_callback(sock, conn);
1146                 ksocknal_putconnsock(conn);
1147         }
1148
1149         CWARN("New conn nid:"LPX64" %u.%u.%u.%u -> %u.%u.%u.%u/%d"
1150               " incarnation:"LPX64" sched[%d]/%d\n",
1151               nid, HIPQUAD(conn->ksnc_myipaddr),
1152               HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port, incarnation,
1153               (int)(conn->ksnc_scheduler - ksocknal_data.ksnd_schedulers), irq);
1154
1155         ksocknal_put_conn (conn);
1156         return (0);
1157
1158  failed_2:
1159         if (!peer->ksnp_closing &&
1160             list_empty (&peer->ksnp_conns) &&
1161             list_empty (&peer->ksnp_routes))
1162                 ksocknal_unlink_peer_locked(peer);
1163         write_unlock_irqrestore(global_lock, flags);
1164
1165  failed_1:
1166         ksocknal_put_peer (peer);
1167
1168  failed_0:
1169         PORTAL_FREE (conn, sizeof(*conn));
1170
1171         LASSERT (rc != 0);
1172         return (rc);
1173 }
1174
1175 void
1176 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
1177 {
1178         /* This just does the immmediate housekeeping, and queues the
1179          * connection for the reaper to terminate.
1180          * Caller holds ksnd_global_lock exclusively in irq context */
1181         ksock_peer_t      *peer = conn->ksnc_peer;
1182         ksock_route_t     *route;
1183         ksock_conn_t      *conn2;
1184         struct list_head  *tmp;
1185
1186         LASSERT (peer->ksnp_error == 0);
1187         LASSERT (!conn->ksnc_closing);
1188         conn->ksnc_closing = 1;
1189         atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
1190
1191         /* ksnd_deathrow_conns takes over peer's ref */
1192         list_del (&conn->ksnc_list);
1193
1194         route = conn->ksnc_route;
1195         if (route != NULL) {
1196                 /* dissociate conn from route... */
1197                 LASSERT (!route->ksnr_deleted);
1198                 LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);
1199
1200                 conn2 = NULL;
1201                 list_for_each(tmp, &peer->ksnp_conns) {
1202                         conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);
1203
1204                         if (conn2->ksnc_route == route &&
1205                             conn2->ksnc_type == conn->ksnc_type)
1206                                 break;
1207
1208                         conn2 = NULL;
1209                 }
1210                 if (conn2 == NULL)
1211                         route->ksnr_connected &= ~(1 << conn->ksnc_type);
1212
1213                 conn->ksnc_route = NULL;
1214
1215 #if 0           /* irrelevent with only eager routes */
1216                 list_del (&route->ksnr_list);   /* make route least favourite */
1217                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
1218 #endif
1219                 ksocknal_put_route (route);     /* drop conn's ref on route */
1220         }
1221
1222         if (list_empty (&peer->ksnp_conns)) {
1223                 /* No more connections to this peer */
1224
1225                 peer->ksnp_error = error;       /* stash last conn close reason */
1226
1227                 if (list_empty (&peer->ksnp_routes)) {
1228                         /* I've just closed last conn belonging to a
1229                          * non-autoconnecting peer */
1230                         ksocknal_unlink_peer_locked (peer);
1231                 }
1232         }
1233
1234         spin_lock (&ksocknal_data.ksnd_reaper_lock);
1235
1236         list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
1237         cfs_waitq_signal (&ksocknal_data.ksnd_reaper_waitq);
1238
1239         spin_unlock (&ksocknal_data.ksnd_reaper_lock);
1240 }
1241
1242 void
1243 ksocknal_terminate_conn (ksock_conn_t *conn)
1244 {
1245         /* This gets called by the reaper (guaranteed thread context) to
1246          * disengage the socket from its callbacks and close it.
1247          * ksnc_refcount will eventually hit zero, and then the reaper will
1248          * destroy it. */
1249         unsigned long   flags;
1250         ksock_peer_t   *peer = conn->ksnc_peer;
1251         ksock_sched_t  *sched = conn->ksnc_scheduler;
1252         struct timeval  now;
1253         time_t          then = 0;
1254         int             notify = 0;
1255
1256         LASSERT(conn->ksnc_closing);
1257
1258         /* wake up the scheduler to "send" all remaining packets to /dev/null */
1259         spin_lock_irqsave(&sched->kss_lock, flags);
1260
1261         if (!conn->ksnc_tx_scheduled &&
1262             !list_empty(&conn->ksnc_tx_queue)){
1263                 list_add_tail (&conn->ksnc_tx_list,
1264                                &sched->kss_tx_conns);
1265                 /* a closing conn is always ready to tx */
1266                 conn->ksnc_tx_ready = 1;
1267                 conn->ksnc_tx_scheduled = 1;
1268                 /* extra ref for scheduler */
1269                 atomic_inc (&conn->ksnc_refcount);
1270
1271                 cfs_waitq_signal (&sched->kss_waitq);
1272         }
1273
1274         spin_unlock_irqrestore (&sched->kss_lock, flags);
1275
1276         /* serialise with callbacks */
1277         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1278
1279         ksocknal_lib_reset_callback(conn->ksnc_sock, conn);
1280
1281         /* OK, so this conn may not be completely disengaged from its
1282          * scheduler yet, but it _has_ committed to terminate... */
1283         conn->ksnc_scheduler->kss_nconns--;
1284
1285         if (peer->ksnp_error != 0) {
1286                 /* peer's last conn closed in error */
1287                 LASSERT (list_empty (&peer->ksnp_conns));
1288
1289                 /* convert peer's last-known-alive timestamp from jiffies */
1290                 do_gettimeofday (&now);
1291                 then = now.tv_sec - cfs_duration_sec(cfs_time_sub(cfs_time_current(),
1292                                                                   peer->ksnp_last_alive));
1293                 notify = 1;
1294         }
1295
1296         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1297
1298         /* The socket is closed on the final put; either here, or in
1299          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1300          * when the connection was established, this will close the socket
1301          * immediately, aborting anything buffered in it. Any hung
1302          * zero-copy transmits will therefore complete in finite time. */
1303         ksocknal_putconnsock (conn);
1304
1305         if (notify)
1306                 kpr_notify (&ksocknal_data.ksnd_router, peer->ksnp_nid,
1307                             0, then);
1308 }
1309
1310 void
1311 ksocknal_destroy_conn (ksock_conn_t *conn)
1312 {
1313         /* Final coup-de-grace of the reaper */
1314         CDEBUG (D_NET, "connection %p\n", conn);
1315
1316         LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
1317         LASSERT (conn->ksnc_route == NULL);
1318         LASSERT (!conn->ksnc_tx_scheduled);
1319         LASSERT (!conn->ksnc_rx_scheduled);
1320         LASSERT (list_empty(&conn->ksnc_tx_queue));
1321
1322         /* complete current receive if any */
1323         switch (conn->ksnc_rx_state) {
1324         case SOCKNAL_RX_BODY:
1325                 CERROR("Completing partial receive from "LPX64
1326                        ", ip %d.%d.%d.%d:%d, with error\n",
1327                        conn->ksnc_peer->ksnp_nid,
1328                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
1329                 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_FAIL);
1330                 break;
1331         case SOCKNAL_RX_BODY_FWD:
1332                 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
1333                 break;
1334         case SOCKNAL_RX_HEADER:
1335         case SOCKNAL_RX_SLOP:
1336                 break;
1337         default:
1338                 LBUG ();
1339                 break;
1340         }
1341
1342         ksocknal_put_peer (conn->ksnc_peer);
1343
1344         PORTAL_FREE (conn, sizeof (*conn));
1345         atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
1346 }
1347
1348 void
1349 ksocknal_put_conn (ksock_conn_t *conn)
1350 {
1351         unsigned long flags;
1352
1353         CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
1354                 conn, conn->ksnc_peer->ksnp_nid,
1355                 atomic_read (&conn->ksnc_refcount));
1356
1357         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1358         if (!atomic_dec_and_test (&conn->ksnc_refcount))
1359                 return;
1360
1361         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
1362
1363         list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1364         cfs_waitq_signal (&ksocknal_data.ksnd_reaper_waitq);
1365
1366         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
1367 }
1368
1369 int
1370 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
1371 {
1372         ksock_conn_t       *conn;
1373         struct list_head   *ctmp;
1374         struct list_head   *cnxt;
1375         int                 count = 0;
1376
1377         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1378                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1379
1380                 if (ipaddr == 0 ||
1381                     conn->ksnc_ipaddr == ipaddr) {
1382                         count++;
1383                         ksocknal_close_conn_locked (conn, why);
1384                 }
1385         }
1386
1387         return (count);
1388 }
1389
1390 int
1391 ksocknal_close_stale_conns_locked (ksock_peer_t *peer, __u64 incarnation)
1392 {
1393         ksock_conn_t       *conn;
1394         struct list_head   *ctmp;
1395         struct list_head   *cnxt;
1396         int                 count = 0;
1397
1398         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1399                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1400
1401                 if (conn->ksnc_incarnation == incarnation)
1402                         continue;
1403
1404                 CWARN("Closing stale conn nid:"LPX64" ip:%08x/%d "
1405                       "incarnation:"LPX64"("LPX64")\n",
1406                       peer->ksnp_nid, conn->ksnc_ipaddr, conn->ksnc_port,
1407                       conn->ksnc_incarnation, incarnation);
1408
1409                 count++;
1410                 ksocknal_close_conn_locked (conn, -ESTALE);
1411         }
1412
1413         return (count);
1414 }
1415
1416 int
1417 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why)
1418 {
1419         ksock_peer_t     *peer = conn->ksnc_peer;
1420         __u32             ipaddr = conn->ksnc_ipaddr;
1421         unsigned long     flags;
1422         int               count;
1423
1424         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1425
1426         count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1427
1428         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1429
1430         return (count);
1431 }
1432
1433 int
1434 ksocknal_close_matching_conns (ptl_nid_t nid, __u32 ipaddr)
1435 {
1436         unsigned long       flags;
1437         ksock_peer_t       *peer;
1438         struct list_head   *ptmp;
1439         struct list_head   *pnxt;
1440         int                 lo;
1441         int                 hi;
1442         int                 i;
1443         int                 count = 0;
1444
1445         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1446
1447         if (nid != PTL_NID_ANY)
1448                 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
1449         else {
1450                 lo = 0;
1451                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1452         }
1453
1454         for (i = lo; i <= hi; i++) {
1455                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1456
1457                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1458
1459                         if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
1460                                 continue;
1461
1462                         count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1463                 }
1464         }
1465
1466         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1467
1468         /* wildcards always succeed */
1469         if (nid == PTL_NID_ANY || ipaddr == 0)
1470                 return (0);
1471
1472         return (count == 0 ? -ENOENT : 0);
1473 }
1474
1475 void
1476 ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive)
1477 {
1478         /* The router is telling me she's been notified of a change in
1479          * gateway state.... */
1480
1481         CDEBUG (D_NET, "gw "LPX64" %s\n", gw_nid, alive ? "up" : "down");
1482
1483         if (!alive) {
1484                 /* If the gateway crashed, close all open connections... */
1485                 ksocknal_close_matching_conns (gw_nid, 0);
1486                 return;
1487         }
1488
1489         /* ...otherwise do nothing.  We can only establish new connections
1490          * if we have autroutes, and these connect on demand. */
1491 }
1492
1493 void
1494 ksocknal_push_peer (ksock_peer_t *peer)
1495 {
1496         int               index;
1497         int               i;
1498         struct list_head *tmp;
1499         ksock_conn_t     *conn;
1500
1501         for (index = 0; ; index++) {
1502                 read_lock (&ksocknal_data.ksnd_global_lock);
1503
1504                 i = 0;
1505                 conn = NULL;
1506
1507                 list_for_each (tmp, &peer->ksnp_conns) {
1508                         if (i++ == index) {
1509                                 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1510                                 atomic_inc (&conn->ksnc_refcount);
1511                                 break;
1512                         }
1513                 }
1514
1515                 read_unlock (&ksocknal_data.ksnd_global_lock);
1516
1517                 if (conn == NULL)
1518                         break;
1519
1520                 ksocknal_lib_push_conn (conn);
1521                 ksocknal_put_conn (conn);
1522         }
1523 }
1524
1525 int
1526 ksocknal_push (ptl_nid_t nid)
1527 {
1528         ksock_peer_t      *peer;
1529         struct list_head  *tmp;
1530         int                index;
1531         int                i;
1532         int                j;
1533         int                rc = -ENOENT;
1534
1535         if (nid != PTL_NID_ANY) {
1536                 peer = ksocknal_get_peer (nid);
1537
1538                 if (peer != NULL) {
1539                         rc = 0;
1540                         ksocknal_push_peer (peer);
1541                         ksocknal_put_peer (peer);
1542                 }
1543                 return (rc);
1544         }
1545
1546         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1547                 for (j = 0; ; j++) {
1548                         read_lock (&ksocknal_data.ksnd_global_lock);
1549
1550                         index = 0;
1551                         peer = NULL;
1552
1553                         list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1554                                 if (index++ == j) {
1555                                         peer = list_entry(tmp, ksock_peer_t,
1556                                                           ksnp_list);
1557                                         atomic_inc (&peer->ksnp_refcount);
1558                                         break;
1559                                 }
1560                         }
1561
1562                         read_unlock (&ksocknal_data.ksnd_global_lock);
1563
1564                         if (peer != NULL) {
1565                                 rc = 0;
1566                                 ksocknal_push_peer (peer);
1567                                 ksocknal_put_peer (peer);
1568                         }
1569                 }
1570
1571         }
1572
1573         return (rc);
1574 }
1575
1576 int
1577 ksocknal_add_interface(__u32 ipaddress, __u32 netmask)
1578 {
1579         unsigned long      flags;
1580         ksock_interface_t *iface;
1581         int                rc;
1582         int                i;
1583         int                j;
1584         struct list_head  *ptmp;
1585         ksock_peer_t      *peer;
1586         struct list_head  *rtmp;
1587         ksock_route_t     *route;
1588
1589         if (ipaddress == 0 ||
1590             netmask == 0)
1591                 return (-EINVAL);
1592
1593         write_lock_irqsave(&ksocknal_data.ksnd_global_lock, flags);
1594
1595         iface = ksocknal_ip2iface(ipaddress);
1596         if (iface != NULL) {
1597                 /* silently ignore dups */
1598                 rc = 0;
1599         } else if (ksocknal_data.ksnd_ninterfaces == SOCKNAL_MAX_INTERFACES) {
1600                 rc = -ENOSPC;
1601         } else {
1602                 iface = &ksocknal_data.ksnd_interfaces[ksocknal_data.ksnd_ninterfaces++];
1603
1604                 iface->ksni_ipaddr = ipaddress;
1605                 iface->ksni_netmask = netmask;
1606                 iface->ksni_nroutes = 0;
1607                 iface->ksni_npeers = 0;
1608
1609                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1610                         list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
1611                                 peer = list_entry(ptmp, ksock_peer_t, ksnp_list);
1612
1613                                 for (j = 0; i < peer->ksnp_n_passive_ips; j++)
1614                                         if (peer->ksnp_passive_ips[j] == ipaddress)
1615                                                 iface->ksni_npeers++;
1616
1617                                 list_for_each(rtmp, &peer->ksnp_routes) {
1618                                         route = list_entry(rtmp, ksock_route_t, ksnr_list);
1619
1620                                         if (route->ksnr_myipaddr == ipaddress)
1621                                                 iface->ksni_nroutes++;
1622                                 }
1623                         }
1624                 }
1625
1626                 rc = 0;
1627                 /* NB only new connections will pay attention to the new interface! */
1628         }
1629
1630         write_unlock_irqrestore(&ksocknal_data.ksnd_global_lock, flags);
1631
1632         return (rc);
1633 }
1634
1635 void
1636 ksocknal_peer_del_interface_locked(ksock_peer_t *peer, __u32 ipaddr)
1637 {
1638         struct list_head   *tmp;
1639         struct list_head   *nxt;
1640         ksock_route_t      *route;
1641         ksock_conn_t       *conn;
1642         int                 i;
1643         int                 j;
1644
1645         for (i = 0; i < peer->ksnp_n_passive_ips; i++)
1646                 if (peer->ksnp_passive_ips[i] == ipaddr) {
1647                         for (j = i+1; j < peer->ksnp_n_passive_ips; j++)
1648                                 peer->ksnp_passive_ips[j-1] =
1649                                         peer->ksnp_passive_ips[j];
1650                         peer->ksnp_n_passive_ips--;
1651                         break;
1652                 }
1653
1654         list_for_each_safe(tmp, nxt, &peer->ksnp_routes) {
1655                 route = list_entry (tmp, ksock_route_t, ksnr_list);
1656
1657                 if (route->ksnr_myipaddr != ipaddr)
1658                         continue;
1659
1660                 if (route->ksnr_share_count != 0) {
1661                         /* Manually created; keep, but unbind */
1662                         route->ksnr_myipaddr = 0;
1663                 } else {
1664                         ksocknal_del_route_locked(route);
1665                 }
1666         }
1667
1668         list_for_each_safe(tmp, nxt, &peer->ksnp_conns) {
1669                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
1670
1671                 if (conn->ksnc_myipaddr == ipaddr)
1672                         ksocknal_close_conn_locked (conn, 0);
1673         }
1674 }
1675
1676 int
1677 ksocknal_del_interface(__u32 ipaddress)
1678 {
1679         int                rc = -ENOENT;
1680         unsigned long      flags;
1681         struct list_head  *tmp;
1682         struct list_head  *nxt;
1683         ksock_peer_t      *peer;
1684         __u32              this_ip;
1685         int                i;
1686         int                j;
1687
1688         write_lock_irqsave(&ksocknal_data.ksnd_global_lock, flags);
1689
1690         for (i = 0; i < ksocknal_data.ksnd_ninterfaces; i++) {
1691                 this_ip = ksocknal_data.ksnd_interfaces[i].ksni_ipaddr;
1692
1693                 if (!(ipaddress == 0 ||
1694                       ipaddress == this_ip))
1695                         continue;
1696
1697                 rc = 0;
1698
1699                 for (j = i+1; j < ksocknal_data.ksnd_ninterfaces; j++)
1700                         ksocknal_data.ksnd_interfaces[j-1] =
1701                                 ksocknal_data.ksnd_interfaces[j];
1702
1703                 ksocknal_data.ksnd_ninterfaces--;
1704
1705                 for (j = 0; j < ksocknal_data.ksnd_peer_hash_size; j++) {
1706                         list_for_each_safe(tmp, nxt, &ksocknal_data.ksnd_peers[j]) {
1707                                 peer = list_entry(tmp, ksock_peer_t, ksnp_list);
1708
1709                                 ksocknal_peer_del_interface_locked(peer, this_ip);
1710                         }
1711                 }
1712         }
1713
1714         write_unlock_irqrestore(&ksocknal_data.ksnd_global_lock, flags);
1715
1716         return (rc);
1717 }
1718
1719 int
1720 ksocknal_cmd(struct portals_cfg *pcfg, void * private)
1721 {
1722         int rc;
1723
1724         switch(pcfg->pcfg_command) {
1725         case NAL_CMD_GET_INTERFACE: {
1726                 ksock_interface_t *iface;
1727
1728                 read_lock (&ksocknal_data.ksnd_global_lock);
1729
1730                 if (pcfg->pcfg_count < 0 ||
1731                     pcfg->pcfg_count >= ksocknal_data.ksnd_ninterfaces) {
1732                         rc = -ENOENT;
1733                 } else {
1734                         rc = 0;
1735                         iface = &ksocknal_data.ksnd_interfaces[pcfg->pcfg_count];
1736
1737                         pcfg->pcfg_id    = iface->ksni_ipaddr;
1738                         pcfg->pcfg_misc  = iface->ksni_netmask;
1739                         pcfg->pcfg_fd    = iface->ksni_npeers;
1740                         pcfg->pcfg_count = iface->ksni_nroutes;
1741                 }
1742
1743                 read_unlock (&ksocknal_data.ksnd_global_lock);
1744                 break;
1745         }
1746         case NAL_CMD_ADD_INTERFACE: {
1747                 rc = ksocknal_add_interface(pcfg->pcfg_id, /* IP address */
1748                                             pcfg->pcfg_misc); /* net mask */
1749                 break;
1750         }
1751         case NAL_CMD_DEL_INTERFACE: {
1752                 rc = ksocknal_del_interface(pcfg->pcfg_id); /* IP address */
1753                 break;
1754         }
1755         case NAL_CMD_GET_PEER: {
1756                 ptl_nid_t    nid = 0;
1757                 __u32        myip = 0;
1758                 __u32        ip = 0;
1759                 int          port = 0;
1760                 int          conn_count = 0;
1761                 int          share_count = 0;
1762
1763                 rc = ksocknal_get_peer_info(pcfg->pcfg_count, &nid,
1764                                             &myip, &ip, &port,
1765                                             &conn_count,  &share_count);
1766                 pcfg->pcfg_nid   = nid;
1767                 pcfg->pcfg_size  = myip;
1768                 pcfg->pcfg_id    = ip;
1769                 pcfg->pcfg_misc  = port;
1770                 pcfg->pcfg_count = conn_count;
1771                 pcfg->pcfg_wait  = share_count;
1772                 break;
1773         }
1774         case NAL_CMD_ADD_PEER: {
1775                 rc = ksocknal_add_peer (pcfg->pcfg_nid,
1776                                         pcfg->pcfg_id, /* IP */
1777                                         pcfg->pcfg_misc); /* port */
1778                 break;
1779         }
1780         case NAL_CMD_DEL_PEER: {
1781                 rc = ksocknal_del_peer (pcfg->pcfg_nid,
1782                                         pcfg->pcfg_id, /* IP */
1783                                         pcfg->pcfg_flags); /* single_share? */
1784                 break;
1785         }
1786         case NAL_CMD_GET_CONN: {
1787                 ksock_conn_t *conn = ksocknal_get_conn_by_idx (pcfg->pcfg_count);
1788
1789                 if (conn == NULL)
1790                         rc = -ENOENT;
1791                 else {
1792                         int   txmem;
1793                         int   rxmem;
1794                         int   nagle;
1795
1796                         ksocknal_lib_get_conn_tunables(conn, &txmem, &rxmem, &nagle);
1797
1798                         rc = 0;
1799                         pcfg->pcfg_nid    = conn->ksnc_peer->ksnp_nid;
1800                         pcfg->pcfg_id     = conn->ksnc_ipaddr;
1801                         pcfg->pcfg_misc   = conn->ksnc_port;
1802                         pcfg->pcfg_fd     = conn->ksnc_myipaddr;
1803                         pcfg->pcfg_flags  = conn->ksnc_type;
1804                         pcfg->pcfg_gw_nal = conn->ksnc_scheduler -
1805                                             ksocknal_data.ksnd_schedulers;
1806                         pcfg->pcfg_count  = txmem;
1807                         pcfg->pcfg_size   = rxmem;
1808                         pcfg->pcfg_wait   = nagle;
1809                         ksocknal_put_conn (conn);
1810                 }
1811                 break;
1812         }
1813         case NAL_CMD_REGISTER_PEER_FD: {
1814                 struct socket *sock = sockfd_lookup (pcfg->pcfg_fd, &rc);
1815                 int            type = pcfg->pcfg_misc;
1816
1817                 if (sock == NULL)
1818                         break;
1819
1820                 switch (type) {
1821                 case SOCKNAL_CONN_NONE:
1822                 case SOCKNAL_CONN_ANY:
1823                 case SOCKNAL_CONN_CONTROL:
1824                 case SOCKNAL_CONN_BULK_IN:
1825                 case SOCKNAL_CONN_BULK_OUT:
1826                         rc = ksocknal_create_conn(NULL, sock, type);
1827                         break;
1828                 default:
1829                         rc = -EINVAL;
1830                         break;
1831                 }
1832                 cfs_put_file (KSN_SOCK2FILE(sock));
1833                 break;
1834         }
1835         case NAL_CMD_CLOSE_CONNECTION: {
1836                 rc = ksocknal_close_matching_conns (pcfg->pcfg_nid,
1837                                                     pcfg->pcfg_id);
1838                 break;
1839         }
1840         case NAL_CMD_REGISTER_MYNID: {
1841                 rc = ksocknal_set_mynid (pcfg->pcfg_nid);
1842                 break;
1843         }
1844         case NAL_CMD_PUSH_CONNECTION: {
1845                 rc = ksocknal_push (pcfg->pcfg_nid);
1846                 break;
1847         }
1848         default:
1849                 rc = -EINVAL;
1850                 break;
1851         }
1852
1853         return rc;
1854 }
1855
1856 void
1857 ksocknal_free_fmbs (ksock_fmb_pool_t *p)
1858 {
1859         int          npages = p->fmp_buff_pages;
1860         ksock_fmb_t *fmb;
1861         int          i;
1862
1863         LASSERT (list_empty(&p->fmp_blocked_conns));
1864         LASSERT (p->fmp_nactive_fmbs == 0);
1865
1866         while (!list_empty(&p->fmp_idle_fmbs)) {
1867
1868                 fmb = list_entry(p->fmp_idle_fmbs.next,
1869                                  ksock_fmb_t, fmb_list);
1870
1871                 for (i = 0; i < npages; i++)
1872                         if (fmb->fmb_kiov[i].kiov_page != NULL)
1873                                 cfs_free_page(fmb->fmb_kiov[i].kiov_page);
1874
1875                 list_del(&fmb->fmb_list);
1876                 PORTAL_FREE(fmb, offsetof(ksock_fmb_t, fmb_kiov[npages]));
1877         }
1878 }
1879
1880 void
1881 ksocknal_free_buffers (void)
1882 {
1883         ksocknal_free_fmbs(&ksocknal_data.ksnd_small_fmp);
1884         ksocknal_free_fmbs(&ksocknal_data.ksnd_large_fmp);
1885
1886         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_ltxs) == 0);
1887
1888         if (ksocknal_data.ksnd_schedulers != NULL)
1889                 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
1890                              sizeof (ksock_sched_t) * ksocknal_data.ksnd_nschedulers);
1891
1892         PORTAL_FREE (ksocknal_data.ksnd_peers,
1893                      sizeof (struct list_head) *
1894                      ksocknal_data.ksnd_peer_hash_size);
1895 }
1896
1897 void
1898 ksocknal_api_shutdown (nal_t *nal)
1899 {
1900         ksock_sched_t *sched;
1901         int            i;
1902
1903         if (nal->nal_refct != 0) {
1904                 /* This module got the first ref */
1905                 PORTAL_MODULE_UNUSE;
1906                 return;
1907         }
1908
1909         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1910                atomic_read (&portal_kmemory));
1911
1912         LASSERT(nal == &ksocknal_api);
1913
1914         switch (ksocknal_data.ksnd_init) {
1915         default:
1916                 LASSERT (0);
1917
1918         case SOCKNAL_INIT_ALL:
1919                 libcfs_nal_cmd_unregister(SOCKNAL);
1920
1921                 ksocknal_data.ksnd_init = SOCKNAL_INIT_LIB;
1922                 /* fall through */
1923
1924         case SOCKNAL_INIT_LIB:
1925                 /* No more calls to ksocknal_cmd() to create new
1926                  * autoroutes/connections since we're being unloaded. */
1927
1928                 /* Delete all peers */
1929                 ksocknal_del_peer(PTL_NID_ANY, 0, 0);
1930
1931                 /* Wait for all peer state to clean up */
1932                 i = 2;
1933                 while (atomic_read (&ksocknal_data.ksnd_npeers) != 0) {
1934                         i++;
1935                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1936                                "waiting for %d peers to disconnect\n",
1937                                atomic_read (&ksocknal_data.ksnd_npeers));
1938                         set_current_state (TASK_UNINTERRUPTIBLE);
1939                         schedule_timeout (cfs_time_seconds(1));
1940                 }
1941
1942                 /* Tell lib we've stopped calling into her. */
1943                 lib_fini(&ksocknal_lib);
1944
1945                 ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
1946                 /* fall through */
1947
1948         case SOCKNAL_INIT_DATA:
1949                 LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
1950                 LASSERT (ksocknal_data.ksnd_peers != NULL);
1951                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1952                         LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
1953                 }
1954                 LASSERT (list_empty (&ksocknal_data.ksnd_enomem_conns));
1955                 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
1956                 LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
1957                 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
1958                 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
1959
1960                 if (ksocknal_data.ksnd_schedulers != NULL)
1961                         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
1962                                 ksock_sched_t *kss =
1963                                         &ksocknal_data.ksnd_schedulers[i];
1964
1965                                 LASSERT (list_empty (&kss->kss_tx_conns));
1966                                 LASSERT (list_empty (&kss->kss_rx_conns));
1967                                 LASSERT (kss->kss_nconns == 0);
1968                         }
1969
1970                 /* stop router calling me */
1971                 kpr_shutdown (&ksocknal_data.ksnd_router);
1972
1973                 /* flag threads to terminate; wake and wait for them to die */
1974                 ksocknal_data.ksnd_shuttingdown = 1;
1975                 cfs_waitq_broadcast (&ksocknal_data.ksnd_autoconnectd_waitq);
1976                 cfs_waitq_broadcast (&ksocknal_data.ksnd_reaper_waitq);
1977
1978                 for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
1979                         sched = &ksocknal_data.ksnd_schedulers[i];
1980                         cfs_waitq_broadcast(&sched->kss_waitq);
1981                 }
1982
1983                 i = 4;
1984                 read_lock(&ksocknal_data.ksnd_global_lock);
1985                 while (ksocknal_data.ksnd_nthreads != 0) {
1986                         i++;
1987                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1988                                "waiting for %d threads to terminate\n",
1989                                 ksocknal_data.ksnd_nthreads);
1990                         read_unlock(&ksocknal_data.ksnd_global_lock);
1991                         set_current_state (TASK_UNINTERRUPTIBLE);
1992                         schedule_timeout (cfs_time_seconds(1));
1993                         read_lock(&ksocknal_data.ksnd_global_lock);
1994                 }
1995                 read_unlock(&ksocknal_data.ksnd_global_lock);
1996
1997                 kpr_deregister (&ksocknal_data.ksnd_router);
1998
1999                 ksocknal_free_buffers();
2000
2001                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
2002                 /* fall through */
2003
2004         case SOCKNAL_INIT_NOTHING:
2005                 break;
2006         }
2007
2008         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
2009                atomic_read (&portal_kmemory));
2010
2011         printk(KERN_INFO "Lustre: Routing socket NAL unloaded (final mem %d)\n",
2012                atomic_read(&portal_kmemory));
2013 }
2014
2015
2016 void
2017 ksocknal_init_incarnation (void)
2018 {
2019         struct timeval tv;
2020
2021         /* The incarnation number is the time this module loaded and it
2022          * identifies this particular instance of the socknal.  Hopefully
2023          * we won't be able to reboot more frequently than 1MHz for the
2024          * forseeable future :) */
2025
2026         do_gettimeofday(&tv);
2027
2028         ksocknal_data.ksnd_incarnation =
2029                 (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
2030 }
2031
2032 int
2033 ksocknal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
2034                       ptl_ni_limits_t *requested_limits,
2035                       ptl_ni_limits_t *actual_limits)
2036 {
2037         ptl_process_id_t  process_id;
2038         int               pkmem = atomic_read(&portal_kmemory);
2039         int               rc;
2040         int               i;
2041         int               j;
2042
2043         LASSERT (nal == &ksocknal_api);
2044
2045         if (nal->nal_refct != 0) {
2046                 if (actual_limits != NULL)
2047                         *actual_limits = ksocknal_lib.libnal_ni.ni_actual_limits;
2048                 /* This module got the first ref */
2049                 PORTAL_MODULE_USE;
2050                 return (PTL_OK);
2051         }
2052
2053         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
2054
2055         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
2056
2057         ksocknal_init_incarnation();
2058
2059         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
2060         PORTAL_ALLOC (ksocknal_data.ksnd_peers,
2061                       sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
2062         if (ksocknal_data.ksnd_peers == NULL)
2063                 return (-ENOMEM);
2064
2065         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
2066                 CFS_INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
2067
2068         rwlock_init(&ksocknal_data.ksnd_global_lock);
2069
2070         spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
2071         CFS_INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
2072         CFS_INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
2073         ksocknal_data.ksnd_small_fmp.fmp_buff_pages = SOCKNAL_SMALL_FWD_PAGES;
2074
2075         spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
2076         CFS_INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
2077         CFS_INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
2078         ksocknal_data.ksnd_large_fmp.fmp_buff_pages = SOCKNAL_LARGE_FWD_PAGES;
2079
2080         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
2081         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
2082         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
2083         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
2084         cfs_waitq_init(&ksocknal_data.ksnd_reaper_waitq);
2085
2086         spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
2087         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
2088         cfs_waitq_init(&ksocknal_data.ksnd_autoconnectd_waitq);
2089
2090         /* NB memset above zeros whole of ksocknal_data, including
2091          * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
2092
2093         /* flag lists/ptrs/locks initialised */
2094         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2095
2096         ksocknal_data.ksnd_nschedulers = ksocknal_nsched();
2097         PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
2098                      sizeof(ksock_sched_t) * ksocknal_data.ksnd_nschedulers);
2099         if (ksocknal_data.ksnd_schedulers == NULL) {
2100                 ksocknal_api_shutdown (nal);
2101                 return (-ENOMEM);
2102         }
2103
2104         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2105                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
2106
2107                 spin_lock_init (&kss->kss_lock);
2108                 CFS_INIT_LIST_HEAD (&kss->kss_rx_conns);
2109                 CFS_INIT_LIST_HEAD (&kss->kss_tx_conns);
2110 #if SOCKNAL_ZC
2111                 CFS_INIT_LIST_HEAD (&kss->kss_zctxdone_list);
2112 #endif
2113                 cfs_waitq_init (&kss->kss_waitq);
2114         }
2115
2116         /* NB we have to wait to be told our true NID... */
2117         process_id.pid = requested_pid;
2118         process_id.nid = 0;
2119
2120         rc = lib_init(&ksocknal_lib, nal, process_id,
2121                       requested_limits, actual_limits);
2122         if (rc != PTL_OK) {
2123                 CERROR("lib_init failed: error %d\n", rc);
2124                 ksocknal_api_shutdown (nal);
2125                 return (rc);
2126         }
2127
2128         ksocknal_data.ksnd_init = SOCKNAL_INIT_LIB; // flag lib_init() called
2129
2130         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2131                 rc = ksocknal_thread_start (ksocknal_scheduler,
2132                                             &ksocknal_data.ksnd_schedulers[i]);
2133                 if (rc != 0) {
2134                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
2135                                i, rc);
2136                         ksocknal_api_shutdown (nal);
2137                         return (rc);
2138                 }
2139         }
2140
2141         for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
2142                 rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
2143                 if (rc != 0) {
2144                         CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
2145                         ksocknal_api_shutdown (nal);
2146                         return (rc);
2147                 }
2148         }
2149
2150         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
2151         if (rc != 0) {
2152                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
2153                 ksocknal_api_shutdown (nal);
2154                 return (rc);
2155         }
2156
2157         rc = kpr_register(&ksocknal_data.ksnd_router,
2158                           &ksocknal_router_interface);
2159         if (rc != 0) {
2160                 CDEBUG(D_NET, "Can't initialise routing interface "
2161                        "(rc = %d): not routing\n", rc);
2162         } else {
2163                 /* Only allocate forwarding buffers if there's a router */
2164
2165                 for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
2166                                  SOCKNAL_LARGE_FWD_NMSGS); i++) {
2167                         ksock_fmb_t      *fmb;
2168                         ksock_fmb_pool_t *pool;
2169
2170
2171                         if (i < SOCKNAL_SMALL_FWD_NMSGS)
2172                                 pool = &ksocknal_data.ksnd_small_fmp;
2173                         else
2174                                 pool = &ksocknal_data.ksnd_large_fmp;
2175
2176                         PORTAL_ALLOC(fmb, offsetof(ksock_fmb_t,
2177                                                    fmb_kiov[pool->fmp_buff_pages]));
2178                         if (fmb == NULL) {
2179                                 ksocknal_api_shutdown(nal);
2180                                 return (-ENOMEM);
2181                         }
2182
2183                         fmb->fmb_pool = pool;
2184
2185                         for (j = 0; j < pool->fmp_buff_pages; j++) {
2186                                 fmb->fmb_kiov[j].kiov_page = cfs_alloc_page(CFS_ALLOC_STD);
2187
2188                                 if (fmb->fmb_kiov[j].kiov_page == NULL) {
2189                                         ksocknal_api_shutdown (nal);
2190                                         return (-ENOMEM);
2191                                 }
2192
2193                                 LASSERT(cfs_page_address(fmb->fmb_kiov[j].kiov_page) != NULL);
2194                         }
2195
2196                         list_add(&fmb->fmb_list, &pool->fmp_idle_fmbs);
2197                 }
2198         }
2199
2200         rc = libcfs_nal_cmd_register(SOCKNAL, &ksocknal_cmd, NULL);
2201         if (rc != 0) {
2202                 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
2203                 ksocknal_api_shutdown (nal);
2204                 return (rc);
2205         }
2206
2207         /* flag everything initialised */
2208         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
2209
2210         printk(KERN_INFO "Lustre: Routing socket NAL loaded "
2211                "(Routing %s, initial mem %d, incarnation "LPX64")\n",
2212                kpr_routing (&ksocknal_data.ksnd_router) ?
2213                "enabled" : "disabled", pkmem, ksocknal_data.ksnd_incarnation);
2214
2215         return (0);
2216 }
2217
2218 void __exit
2219 ksocknal_module_fini (void)
2220 {
2221 #ifdef CONFIG_SYSCTL
2222         if (ksocknal_tunables.ksnd_sysctl != NULL)
2223                 unregister_sysctl_table (ksocknal_tunables.ksnd_sysctl);
2224 #endif
2225         PtlNIFini(ksocknal_ni);
2226
2227         ptl_unregister_nal(SOCKNAL);
2228 }
2229
2230 extern cfs_sysctl_table_t ksocknal_top_ctl_table[];
2231
2232 int __init
2233 ksocknal_module_init (void)
2234 {
2235         int    rc;
2236
2237         /* packet descriptor must fit in a router descriptor's scratchpad */
2238         LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
2239         /* the following must be sizeof(int) for proc_dointvec() */
2240         LASSERT(sizeof (ksocknal_tunables.ksnd_io_timeout) == sizeof (int));
2241         LASSERT(sizeof (ksocknal_tunables.ksnd_eager_ack) == sizeof (int));
2242         LASSERT(sizeof (ksocknal_tunables.ksnd_typed_conns) == sizeof (int));
2243         LASSERT(sizeof (ksocknal_tunables.ksnd_min_bulk) == sizeof (int));
2244         LASSERT(sizeof (ksocknal_tunables.ksnd_buffer_size) == sizeof (int));
2245         LASSERT(sizeof (ksocknal_tunables.ksnd_nagle) == sizeof (int));
2246         LASSERT(sizeof (ksocknal_tunables.ksnd_keepalive_idle) == sizeof (int));
2247         LASSERT(sizeof (ksocknal_tunables.ksnd_keepalive_count) == sizeof (int));
2248         LASSERT(sizeof (ksocknal_tunables.ksnd_keepalive_intvl) == sizeof (int));
2249 #if CPU_AFFINITY
2250         LASSERT(sizeof (ksocknal_tunables.ksnd_irq_affinity) == sizeof (int));
2251 #endif
2252 #if SOCKNAL_ZC
2253         LASSERT(sizeof (ksocknal_tunables.ksnd_zc_min_frag) == sizeof (int));
2254 #endif
2255         /* check ksnr_connected/connecting field large enough */
2256         LASSERT(SOCKNAL_CONN_NTYPES <= 4);
2257
2258         ksocknal_api.nal_ni_init = ksocknal_api_startup;
2259         ksocknal_api.nal_ni_fini = ksocknal_api_shutdown;
2260
2261         /* Initialise dynamic tunables to defaults once only */
2262         ksocknal_tunables.ksnd_io_timeout      = SOCKNAL_IO_TIMEOUT;
2263         ksocknal_tunables.ksnd_eager_ack       = SOCKNAL_EAGER_ACK;
2264         ksocknal_tunables.ksnd_typed_conns     = SOCKNAL_TYPED_CONNS;
2265         ksocknal_tunables.ksnd_min_bulk        = SOCKNAL_MIN_BULK;
2266         ksocknal_tunables.ksnd_buffer_size     = SOCKNAL_BUFFER_SIZE;
2267         ksocknal_tunables.ksnd_nagle           = SOCKNAL_NAGLE;
2268         ksocknal_tunables.ksnd_keepalive_idle  = SOCKNAL_KEEPALIVE_IDLE;
2269         ksocknal_tunables.ksnd_keepalive_count = SOCKNAL_KEEPALIVE_COUNT;
2270         ksocknal_tunables.ksnd_keepalive_intvl = SOCKNAL_KEEPALIVE_INTVL;
2271 #if CPU_AFFINITY
2272         ksocknal_tunables.ksnd_irq_affinity = SOCKNAL_IRQ_AFFINITY;
2273 #endif
2274 #if SOCKNAL_ZC
2275         ksocknal_tunables.ksnd_zc_min_frag  = SOCKNAL_ZC_MIN_FRAG;
2276 #endif
2277
2278         rc = ptl_register_nal(SOCKNAL, &ksocknal_api);
2279         if (rc != PTL_OK) {
2280                 CERROR("Can't register SOCKNAL: %d\n", rc);
2281                 return (-ENOMEM);               /* or something... */
2282         }
2283
2284         /* Pure gateways want the NAL started up at module load time... */
2285         rc = PtlNIInit(SOCKNAL, LUSTRE_SRV_PTL_PID, NULL, NULL, &ksocknal_ni);
2286         if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
2287                 ptl_unregister_nal(SOCKNAL);
2288                 return (-ENODEV);
2289         }
2290
2291 #ifdef CONFIG_SYSCTL
2292         /* Press on regardless even if registering sysctl doesn't work */
2293         ksocknal_tunables.ksnd_sysctl =
2294                 register_sysctl_table (ksocknal_top_ctl_table, 0);
2295 #endif
2296         return (0);
2297 }
2298
2299 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2300 MODULE_DESCRIPTION("Kernel TCP Socket NAL v1.0.0");
2301 MODULE_LICENSE("GPL");
2302
2303 cfs_module(ksocknal, "1.0.0", ksocknal_module_init, ksocknal_module_fini);