Whamcloud - gitweb
add macro LCONSOLE_ERROR_MSG with extra parameter and map
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socklnd.h"
27
28 lnd_t the_ksocklnd = {
29         .lnd_type       = SOCKLND,
30         .lnd_startup    = ksocknal_startup,
31         .lnd_shutdown   = ksocknal_shutdown,
32         .lnd_ctl        = ksocknal_ctl,
33         .lnd_send       = ksocknal_send,
34         .lnd_recv       = ksocknal_recv,
35         .lnd_notify     = ksocknal_notify,
36         .lnd_accept     = ksocknal_accept,
37 };
38
39 ksock_nal_data_t        ksocknal_data;
40
41 ksock_interface_t *
42 ksocknal_ip2iface(lnet_ni_t *ni, __u32 ip)
43 {
44         ksock_net_t       *net = ni->ni_data;
45         int                i;
46         ksock_interface_t *iface;
47
48         for (i = 0; i < net->ksnn_ninterfaces; i++) {
49                 LASSERT(i < LNET_MAX_INTERFACES);
50                 iface = &net->ksnn_interfaces[i];
51
52                 if (iface->ksni_ipaddr == ip)
53                         return (iface);
54         }
55
56         return (NULL);
57 }
58
59 ksock_route_t *
60 ksocknal_create_route (__u32 ipaddr, int port)
61 {
62         ksock_route_t *route;
63
64         LIBCFS_ALLOC (route, sizeof (*route));
65         if (route == NULL)
66                 return (NULL);
67
68         atomic_set (&route->ksnr_refcount, 1);
69         route->ksnr_peer = NULL;
70         route->ksnr_retry_interval = 0;         /* OK to connect at any time */
71         route->ksnr_ipaddr = ipaddr;
72         route->ksnr_port = port;
73         route->ksnr_scheduled = 0;
74         route->ksnr_connecting = 0;
75         route->ksnr_connected = 0;
76         route->ksnr_deleted = 0;
77         route->ksnr_conn_count = 0;
78         route->ksnr_share_count = 0;
79         route->ksnr_proto = &ksocknal_protocol_v2x;
80
81         return (route);
82 }
83
84 void
85 ksocknal_destroy_route (ksock_route_t *route)
86 {
87         LASSERT (atomic_read(&route->ksnr_refcount) == 0);
88
89         if (route->ksnr_peer != NULL)
90                 ksocknal_peer_decref(route->ksnr_peer);
91
92         LIBCFS_FREE (route, sizeof (*route));
93 }
94
95 int
96 ksocknal_create_peer (ksock_peer_t **peerp, lnet_ni_t *ni, lnet_process_id_t id)
97 {
98         ksock_net_t   *net = ni->ni_data;
99         ksock_peer_t  *peer;
100
101         LASSERT (id.nid != LNET_NID_ANY);
102         LASSERT (id.pid != LNET_PID_ANY);
103         LASSERT (!in_interrupt());
104
105         LIBCFS_ALLOC (peer, sizeof (*peer));
106         if (peer == NULL)
107                 return -ENOMEM;
108
109         memset (peer, 0, sizeof (*peer));       /* NULL pointers/clear flags etc */
110
111         peer->ksnp_ni = ni;
112         peer->ksnp_id = id;
113         atomic_set (&peer->ksnp_refcount, 1);   /* 1 ref for caller */
114         peer->ksnp_closing = 0;
115         peer->ksnp_accepting = 0;
116         peer->ksnp_zc_next_cookie = 1;
117         CFS_INIT_LIST_HEAD (&peer->ksnp_conns);
118         CFS_INIT_LIST_HEAD (&peer->ksnp_routes);
119         CFS_INIT_LIST_HEAD (&peer->ksnp_tx_queue);
120         CFS_INIT_LIST_HEAD (&peer->ksnp_zc_req_list);
121         spin_lock_init(&peer->ksnp_lock);
122
123         spin_lock_bh (&net->ksnn_lock);
124
125         if (net->ksnn_shutdown) {
126                 spin_unlock_bh (&net->ksnn_lock);
127                 
128                 LIBCFS_FREE(peer, sizeof(*peer));
129                 CERROR("Can't create peer: network shutdown\n");
130                 return -ESHUTDOWN;
131         }
132
133         net->ksnn_npeers++;
134
135         spin_unlock_bh (&net->ksnn_lock);
136
137         *peerp = peer;
138         return 0;
139 }
140
141 void
142 ksocknal_destroy_peer (ksock_peer_t *peer)
143 {
144         ksock_net_t    *net = peer->ksnp_ni->ni_data;
145
146         CDEBUG (D_NET, "peer %s %p deleted\n", 
147                 libcfs_id2str(peer->ksnp_id), peer);
148
149         LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
150         LASSERT (peer->ksnp_accepting == 0);
151         LASSERT (list_empty (&peer->ksnp_conns));
152         LASSERT (list_empty (&peer->ksnp_routes));
153         LASSERT (list_empty (&peer->ksnp_tx_queue));
154         LASSERT (list_empty (&peer->ksnp_zc_req_list));
155
156         LIBCFS_FREE (peer, sizeof (*peer));
157
158         /* NB a peer's connections and routes keep a reference on their peer
159          * until they are destroyed, so we can be assured that _all_ state to
160          * do with this peer has been cleaned up when its refcount drops to
161          * zero. */
162         spin_lock_bh (&net->ksnn_lock);
163         net->ksnn_npeers--;
164         spin_unlock_bh (&net->ksnn_lock);
165 }
166
167 ksock_peer_t *
168 ksocknal_find_peer_locked (lnet_ni_t *ni, lnet_process_id_t id)
169 {
170         struct list_head *peer_list = ksocknal_nid2peerlist(id.nid);
171         struct list_head *tmp;
172         ksock_peer_t     *peer;
173
174         list_for_each (tmp, peer_list) {
175
176                 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
177
178                 LASSERT (!peer->ksnp_closing);
179
180                 if (peer->ksnp_ni != ni)
181                         continue;
182
183                 if (peer->ksnp_id.nid != id.nid ||
184                     peer->ksnp_id.pid != id.pid)
185                         continue;
186
187                 CDEBUG(D_NET, "got peer [%p] -> %s (%d)\n",
188                        peer, libcfs_id2str(id), 
189                        atomic_read(&peer->ksnp_refcount));
190                 return (peer);
191         }
192         return (NULL);
193 }
194
195 ksock_peer_t *
196 ksocknal_find_peer (lnet_ni_t *ni, lnet_process_id_t id)
197 {
198         ksock_peer_t     *peer;
199
200         read_lock (&ksocknal_data.ksnd_global_lock);
201         peer = ksocknal_find_peer_locked (ni, id);
202         if (peer != NULL)                       /* +1 ref for caller? */
203                 ksocknal_peer_addref(peer);
204         read_unlock (&ksocknal_data.ksnd_global_lock);
205
206         return (peer);
207 }
208
209 void
210 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
211 {
212         int                i;
213         __u32              ip;
214
215         for (i = 0; i < peer->ksnp_n_passive_ips; i++) {
216                 LASSERT (i < LNET_MAX_INTERFACES);
217                 ip = peer->ksnp_passive_ips[i];
218
219                 ksocknal_ip2iface(peer->ksnp_ni, ip)->ksni_npeers--;
220         }
221
222         LASSERT (list_empty(&peer->ksnp_conns));
223         LASSERT (list_empty(&peer->ksnp_routes));
224         LASSERT (!peer->ksnp_closing);
225         peer->ksnp_closing = 1;
226         list_del (&peer->ksnp_list);
227         /* lose peerlist's ref */
228         ksocknal_peer_decref(peer);
229 }
230
231 int
232 ksocknal_get_peer_info (lnet_ni_t *ni, int index, 
233                         lnet_process_id_t *id, __u32 *myip, __u32 *peer_ip, int *port,
234                         int *conn_count, int *share_count)
235 {
236         ksock_peer_t      *peer;
237         struct list_head  *ptmp;
238         ksock_route_t     *route;
239         struct list_head  *rtmp;
240         int                i;
241         int                j;
242         int                rc = -ENOENT;
243
244         read_lock (&ksocknal_data.ksnd_global_lock);
245
246         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
247
248                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
249                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
250
251                         if (peer->ksnp_ni != ni)
252                                 continue;
253
254                         if (peer->ksnp_n_passive_ips == 0 &&
255                             list_empty(&peer->ksnp_routes)) {
256                                 if (index-- > 0)
257                                         continue;
258
259                                 *id = peer->ksnp_id;
260                                 *myip = 0;
261                                 *peer_ip = 0;
262                                 *port = 0;
263                                 *conn_count = 0;
264                                 *share_count = 0;
265                                 rc = 0;
266                                 goto out;
267                         }
268
269                         for (j = 0; j < peer->ksnp_n_passive_ips; j++) {
270                                 if (index-- > 0)
271                                         continue;
272
273                                 *id = peer->ksnp_id;
274                                 *myip = peer->ksnp_passive_ips[j];
275                                 *peer_ip = 0;
276                                 *port = 0;
277                                 *conn_count = 0;
278                                 *share_count = 0;
279                                 rc = 0;
280                                 goto out;
281                         }
282
283                         list_for_each (rtmp, &peer->ksnp_routes) {
284                                 if (index-- > 0)
285                                         continue;
286
287                                 route = list_entry(rtmp, ksock_route_t,
288                                                    ksnr_list);
289
290                                 *id = peer->ksnp_id;
291                                 *myip = route->ksnr_myipaddr;
292                                 *peer_ip = route->ksnr_ipaddr;
293                                 *port = route->ksnr_port;
294                                 *conn_count = route->ksnr_conn_count;
295                                 *share_count = route->ksnr_share_count;
296                                 rc = 0;
297                                 goto out;
298                         }
299                 }
300         }
301  out:
302         read_unlock (&ksocknal_data.ksnd_global_lock);
303         return (rc);
304 }
305
306 void
307 ksocknal_associate_route_conn_locked(ksock_route_t *route, ksock_conn_t *conn)
308 {
309         ksock_peer_t      *peer = route->ksnr_peer;
310         int                type = conn->ksnc_type;
311         ksock_interface_t *iface;
312
313         conn->ksnc_route = route;
314         ksocknal_route_addref(route);
315
316         if (route->ksnr_myipaddr != conn->ksnc_myipaddr) {
317                 if (route->ksnr_myipaddr == 0) {
318                         /* route wasn't bound locally yet (the initial route) */
319                         CDEBUG(D_NET, "Binding %s %u.%u.%u.%u to %u.%u.%u.%u\n",
320                                libcfs_id2str(peer->ksnp_id),
321                                HIPQUAD(route->ksnr_ipaddr),
322                                HIPQUAD(conn->ksnc_myipaddr));
323                 } else {
324                         CDEBUG(D_NET, "Rebinding %s %u.%u.%u.%u from "
325                                "%u.%u.%u.%u to %u.%u.%u.%u\n",
326                                libcfs_id2str(peer->ksnp_id),
327                                HIPQUAD(route->ksnr_ipaddr),
328                                HIPQUAD(route->ksnr_myipaddr),
329                                HIPQUAD(conn->ksnc_myipaddr));
330
331                         iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
332                                                   route->ksnr_myipaddr);
333                         if (iface != NULL)
334                                 iface->ksni_nroutes--;
335                 }
336                 route->ksnr_myipaddr = conn->ksnc_myipaddr;
337                 iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
338                                           route->ksnr_myipaddr);
339                 if (iface != NULL)
340                         iface->ksni_nroutes++;
341         }
342
343         route->ksnr_connected |= (1<<type);
344         route->ksnr_conn_count++;
345
346         /* Successful connection => further attempts can
347          * proceed immediately */
348         route->ksnr_retry_interval = 0;
349 }
350
351 void
352 ksocknal_add_route_locked (ksock_peer_t *peer, ksock_route_t *route)
353 {
354         struct list_head  *tmp;
355         ksock_conn_t      *conn;
356         ksock_route_t     *route2;
357
358         LASSERT (!peer->ksnp_closing);
359         LASSERT (route->ksnr_peer == NULL);
360         LASSERT (!route->ksnr_scheduled);
361         LASSERT (!route->ksnr_connecting);
362         LASSERT (route->ksnr_connected == 0);
363
364         /* LASSERT(unique) */
365         list_for_each(tmp, &peer->ksnp_routes) {
366                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
367
368                 if (route2->ksnr_ipaddr == route->ksnr_ipaddr) {
369                         CERROR ("Duplicate route %s %u.%u.%u.%u\n",
370                                 libcfs_id2str(peer->ksnp_id), 
371                                 HIPQUAD(route->ksnr_ipaddr));
372                         LBUG();
373                 }
374         }
375
376         route->ksnr_peer = peer;
377         ksocknal_peer_addref(peer);
378         /* peer's routelist takes over my ref on 'route' */
379         list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
380
381         list_for_each(tmp, &peer->ksnp_conns) {
382                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
383
384                 if (conn->ksnc_ipaddr != route->ksnr_ipaddr)
385                         continue;
386
387                 ksocknal_associate_route_conn_locked(route, conn);
388                 /* keep going (typed routes) */
389         }
390 }
391
392 void
393 ksocknal_del_route_locked (ksock_route_t *route)
394 {
395         ksock_peer_t      *peer = route->ksnr_peer;
396         ksock_interface_t *iface;
397         ksock_conn_t      *conn;
398         struct list_head  *ctmp;
399         struct list_head  *cnxt;
400
401         LASSERT (!route->ksnr_deleted);
402
403         /* Close associated conns */
404         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
405                 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
406
407                 if (conn->ksnc_route != route)
408                         continue;
409
410                 ksocknal_close_conn_locked (conn, 0);
411         }
412
413         if (route->ksnr_myipaddr != 0) {
414                 iface = ksocknal_ip2iface(route->ksnr_peer->ksnp_ni,
415                                           route->ksnr_myipaddr);
416                 if (iface != NULL)
417                         iface->ksni_nroutes--;
418         }
419
420         route->ksnr_deleted = 1;
421         list_del (&route->ksnr_list);
422         ksocknal_route_decref(route);             /* drop peer's ref */
423
424         if (list_empty (&peer->ksnp_routes) &&
425             list_empty (&peer->ksnp_conns)) {
426                 /* I've just removed the last route to a peer with no active
427                  * connections */
428                 ksocknal_unlink_peer_locked (peer);
429         }
430 }
431
432 int
433 ksocknal_add_peer (lnet_ni_t *ni, lnet_process_id_t id, __u32 ipaddr, int port)
434 {
435         struct list_head  *tmp;
436         ksock_peer_t      *peer;
437         ksock_peer_t      *peer2;
438         ksock_route_t     *route;
439         ksock_route_t     *route2;
440         int                rc;
441
442         if (id.nid == LNET_NID_ANY ||
443             id.pid == LNET_PID_ANY)
444                 return (-EINVAL);
445
446         /* Have a brand new peer ready... */
447         rc = ksocknal_create_peer(&peer, ni, id);
448         if (rc != 0)
449                 return rc;
450
451         route = ksocknal_create_route (ipaddr, port);
452         if (route == NULL) {
453                 ksocknal_peer_decref(peer);
454                 return (-ENOMEM);
455         }
456
457         write_lock_bh (&ksocknal_data.ksnd_global_lock);
458
459         peer2 = ksocknal_find_peer_locked (ni, id);
460         if (peer2 != NULL) {
461                 ksocknal_peer_decref(peer);
462                 peer = peer2;
463         } else {
464                 /* peer table takes my ref on peer */
465                 list_add_tail (&peer->ksnp_list,
466                                ksocknal_nid2peerlist (id.nid));
467         }
468
469         route2 = NULL;
470         list_for_each (tmp, &peer->ksnp_routes) {
471                 route2 = list_entry(tmp, ksock_route_t, ksnr_list);
472
473                 if (route2->ksnr_ipaddr == ipaddr)
474                         break;
475
476                 route2 = NULL;
477         }
478         if (route2 == NULL) {
479                 ksocknal_add_route_locked(peer, route);
480                 route->ksnr_share_count++;
481         } else {
482                 ksocknal_route_decref(route);
483                 route2->ksnr_share_count++;
484         }
485
486         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
487
488         return (0);
489 }
490
491 void
492 ksocknal_del_peer_locked (ksock_peer_t *peer, __u32 ip)
493 {
494         ksock_conn_t     *conn;
495         ksock_route_t    *route;
496         struct list_head *tmp;
497         struct list_head *nxt;
498         int               nshared;
499
500         LASSERT (!peer->ksnp_closing);
501
502         /* Extra ref prevents peer disappearing until I'm done with it */
503         ksocknal_peer_addref(peer);
504
505         list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
506                 route = list_entry(tmp, ksock_route_t, ksnr_list);
507
508                 /* no match */
509                 if (!(ip == 0 || route->ksnr_ipaddr == ip))
510                         continue;
511
512                 route->ksnr_share_count = 0;
513                 /* This deletes associated conns too */
514                 ksocknal_del_route_locked (route);
515         }
516
517         nshared = 0;
518         list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
519                 route = list_entry(tmp, ksock_route_t, ksnr_list);
520                 nshared += route->ksnr_share_count;
521         }
522
523         if (nshared == 0) {
524                 /* remove everything else if there are no explicit entries
525                  * left */
526
527                 list_for_each_safe (tmp, nxt, &peer->ksnp_routes) {
528                         route = list_entry(tmp, ksock_route_t, ksnr_list);
529
530                         /* we should only be removing auto-entries */
531                         LASSERT(route->ksnr_share_count == 0);
532                         ksocknal_del_route_locked (route);
533                 }
534
535                 list_for_each_safe (tmp, nxt, &peer->ksnp_conns) {
536                         conn = list_entry(tmp, ksock_conn_t, ksnc_list);
537
538                         ksocknal_close_conn_locked(conn, 0);
539                 }
540         }
541
542         ksocknal_peer_decref(peer);
543         /* NB peer unlinks itself when last conn/route is removed */
544 }
545
546 int
547 ksocknal_del_peer (lnet_ni_t *ni, lnet_process_id_t id, __u32 ip)
548 {
549         CFS_LIST_HEAD     (zombies);
550         struct list_head  *ptmp;
551         struct list_head  *pnxt;
552         ksock_peer_t      *peer;
553         int                lo;
554         int                hi;
555         int                i;
556         int                rc = -ENOENT;
557
558         write_lock_bh (&ksocknal_data.ksnd_global_lock);
559
560         if (id.nid != LNET_NID_ANY)
561                 lo = hi = ksocknal_nid2peerlist(id.nid) - ksocknal_data.ksnd_peers;
562         else {
563                 lo = 0;
564                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
565         }
566
567         for (i = lo; i <= hi; i++) {
568                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
569                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
570
571                         if (peer->ksnp_ni != ni)
572                                 continue;
573
574                         if (!((id.nid == LNET_NID_ANY || peer->ksnp_id.nid == id.nid) &&
575                               (id.pid == LNET_PID_ANY || peer->ksnp_id.pid == id.pid)))
576                                 continue;
577
578                         ksocknal_peer_addref(peer);     /* a ref for me... */
579
580                         ksocknal_del_peer_locked (peer, ip);
581
582                         if (peer->ksnp_closing && !list_empty(&peer->ksnp_tx_queue)) {
583                                 LASSERT (list_empty(&peer->ksnp_conns));
584                                 LASSERT (list_empty(&peer->ksnp_routes));
585
586                                 list_splice_init(&peer->ksnp_tx_queue, &zombies);
587                         }
588
589                         ksocknal_peer_decref(peer);     /* ...till here */
590
591                         rc = 0;                 /* matched! */
592                 }
593         }
594
595         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
596
597         ksocknal_txlist_done(ni, &zombies, 1);
598
599         return (rc);
600 }
601
602 ksock_conn_t *
603 ksocknal_get_conn_by_idx (lnet_ni_t *ni, int index)
604 {
605         ksock_peer_t      *peer;
606         struct list_head  *ptmp;
607         ksock_conn_t      *conn;
608         struct list_head  *ctmp;
609         int                i;
610
611         read_lock (&ksocknal_data.ksnd_global_lock);
612
613         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
614                 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
615                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
616
617                         LASSERT (!peer->ksnp_closing);
618
619                         if (peer->ksnp_ni != ni)
620                                 continue;
621
622                         list_for_each (ctmp, &peer->ksnp_conns) {
623                                 if (index-- > 0)
624                                         continue;
625
626                                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
627                                 ksocknal_conn_addref(conn);
628                                 read_unlock (&ksocknal_data.ksnd_global_lock);
629                                 return (conn);
630                         }
631                 }
632         }
633
634         read_unlock (&ksocknal_data.ksnd_global_lock);
635         return (NULL);
636 }
637
638 ksock_sched_t *
639 ksocknal_choose_scheduler_locked (unsigned int irq)
640 {
641         ksock_sched_t    *sched;
642         ksock_irqinfo_t  *info;
643         int               i;
644
645         LASSERT (irq < NR_IRQS);
646         info = &ksocknal_data.ksnd_irqinfo[irq];
647
648         if (irq != 0 &&                         /* hardware NIC */
649             info->ksni_valid) {                 /* already set up */
650                 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
651         }
652
653         /* software NIC (irq == 0) || not associated with a scheduler yet.
654          * Choose the CPU with the fewest connections... */
655         sched = &ksocknal_data.ksnd_schedulers[0];
656         for (i = 1; i < ksocknal_data.ksnd_nschedulers; i++)
657                 if (sched->kss_nconns >
658                     ksocknal_data.ksnd_schedulers[i].kss_nconns)
659                         sched = &ksocknal_data.ksnd_schedulers[i];
660
661         if (irq != 0) {                         /* Hardware NIC */
662                 info->ksni_valid = 1;
663                 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
664
665                 /* no overflow... */
666                 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
667         }
668
669         return (sched);
670 }
671
672 int
673 ksocknal_local_ipvec (lnet_ni_t *ni, __u32 *ipaddrs)
674 {
675         ksock_net_t       *net = ni->ni_data;
676         int                i;
677         int                nip;
678
679         read_lock (&ksocknal_data.ksnd_global_lock);
680
681         nip = net->ksnn_ninterfaces;
682         LASSERT (nip < LNET_MAX_INTERFACES);
683
684         /* Only offer interfaces for additional connections if I have 
685          * more than one. */
686         if (nip < 2) {
687                 read_unlock (&ksocknal_data.ksnd_global_lock);
688                 return 0;
689         }
690         
691         for (i = 0; i < nip; i++) {
692                 ipaddrs[i] = net->ksnn_interfaces[i].ksni_ipaddr;
693                 LASSERT (ipaddrs[i] != 0);
694         }
695
696         read_unlock (&ksocknal_data.ksnd_global_lock);
697         return (nip);
698 }
699
700 int
701 ksocknal_match_peerip (ksock_interface_t *iface, __u32 *ips, int nips)
702 {
703         int   best_netmatch = 0;
704         int   best_xor      = 0;
705         int   best          = -1;
706         int   this_xor;
707         int   this_netmatch;
708         int   i;
709
710         for (i = 0; i < nips; i++) {
711                 if (ips[i] == 0)
712                         continue;
713
714                 this_xor = (ips[i] ^ iface->ksni_ipaddr);
715                 this_netmatch = ((this_xor & iface->ksni_netmask) == 0) ? 1 : 0;
716
717                 if (!(best < 0 ||
718                       best_netmatch < this_netmatch ||
719                       (best_netmatch == this_netmatch &&
720                        best_xor > this_xor)))
721                         continue;
722
723                 best = i;
724                 best_netmatch = this_netmatch;
725                 best_xor = this_xor;
726         }
727
728         LASSERT (best >= 0);
729         return (best);
730 }
731
732 int
733 ksocknal_select_ips(ksock_peer_t *peer, __u32 *peerips, int n_peerips)
734 {
735         rwlock_t           *global_lock = &ksocknal_data.ksnd_global_lock;
736         ksock_net_t        *net = peer->ksnp_ni->ni_data;
737         ksock_interface_t  *iface;
738         ksock_interface_t  *best_iface;
739         int                 n_ips;
740         int                 i;
741         int                 j;
742         int                 k;
743         __u32               ip;
744         __u32               xor;
745         int                 this_netmatch;
746         int                 best_netmatch;
747         int                 best_npeers;
748
749         /* CAVEAT EMPTOR: We do all our interface matching with an
750          * exclusive hold of global lock at IRQ priority.  We're only
751          * expecting to be dealing with small numbers of interfaces, so the
752          * O(n**3)-ness shouldn't matter */
753
754         /* Also note that I'm not going to return more than n_peerips
755          * interfaces, even if I have more myself */
756
757         write_lock_bh (global_lock);
758
759         LASSERT (n_peerips <= LNET_MAX_INTERFACES);
760         LASSERT (net->ksnn_ninterfaces <= LNET_MAX_INTERFACES);
761
762         /* Only match interfaces for additional connections 
763          * if I have > 1 interface */
764         n_ips = (net->ksnn_ninterfaces < 2) ? 0 :
765                 MIN(n_peerips, net->ksnn_ninterfaces);
766
767         for (i = 0; peer->ksnp_n_passive_ips < n_ips; i++) {
768                 /*              ^ yes really... */
769
770                 /* If we have any new interfaces, first tick off all the
771                  * peer IPs that match old interfaces, then choose new
772                  * interfaces to match the remaining peer IPS.
773                  * We don't forget interfaces we've stopped using; we might
774                  * start using them again... */
775
776                 if (i < peer->ksnp_n_passive_ips) {
777                         /* Old interface. */
778                         ip = peer->ksnp_passive_ips[i];
779                         best_iface = ksocknal_ip2iface(peer->ksnp_ni, ip);
780
781                         /* peer passive ips are kept up to date */
782                         LASSERT(best_iface != NULL);
783                 } else {
784                         /* choose a new interface */
785                         LASSERT (i == peer->ksnp_n_passive_ips);
786
787                         best_iface = NULL;
788                         best_netmatch = 0;
789                         best_npeers = 0;
790
791                         for (j = 0; j < net->ksnn_ninterfaces; j++) {
792                                 iface = &net->ksnn_interfaces[j];
793                                 ip = iface->ksni_ipaddr;
794
795                                 for (k = 0; k < peer->ksnp_n_passive_ips; k++)
796                                         if (peer->ksnp_passive_ips[k] == ip)
797                                                 break;
798
799                                 if (k < peer->ksnp_n_passive_ips) /* using it already */
800                                         continue;
801
802                                 k = ksocknal_match_peerip(iface, peerips, n_peerips);
803                                 xor = (ip ^ peerips[k]);
804                                 this_netmatch = ((xor & iface->ksni_netmask) == 0) ? 1 : 0;
805
806                                 if (!(best_iface == NULL ||
807                                       best_netmatch < this_netmatch ||
808                                       (best_netmatch == this_netmatch &&
809                                        best_npeers > iface->ksni_npeers)))
810                                         continue;
811
812                                 best_iface = iface;
813                                 best_netmatch = this_netmatch;
814                                 best_npeers = iface->ksni_npeers;
815                         }
816
817                         best_iface->ksni_npeers++;
818                         ip = best_iface->ksni_ipaddr;
819                         peer->ksnp_passive_ips[i] = ip;
820                         peer->ksnp_n_passive_ips = i+1;
821                 }
822
823                 LASSERT (best_iface != NULL);
824
825                 /* mark the best matching peer IP used */
826                 j = ksocknal_match_peerip(best_iface, peerips, n_peerips);
827                 peerips[j] = 0;
828         }
829
830         /* Overwrite input peer IP addresses */
831         memcpy(peerips, peer->ksnp_passive_ips, n_ips * sizeof(*peerips));
832
833         write_unlock_bh (global_lock);
834
835         return (n_ips);
836 }
837
838 void
839 ksocknal_create_routes(ksock_peer_t *peer, int port,
840                        __u32 *peer_ipaddrs, int npeer_ipaddrs)
841 {
842         ksock_route_t      *newroute = NULL;
843         rwlock_t           *global_lock = &ksocknal_data.ksnd_global_lock;
844         lnet_ni_t          *ni = peer->ksnp_ni;
845         ksock_net_t        *net = ni->ni_data;
846         struct list_head   *rtmp;
847         ksock_route_t      *route;
848         ksock_interface_t  *iface;
849         ksock_interface_t  *best_iface;
850         int                 best_netmatch;
851         int                 this_netmatch;
852         int                 best_nroutes;
853         int                 i;
854         int                 j;
855
856         /* CAVEAT EMPTOR: We do all our interface matching with an
857          * exclusive hold of global lock at IRQ priority.  We're only
858          * expecting to be dealing with small numbers of interfaces, so the
859          * O(n**3)-ness here shouldn't matter */
860
861         write_lock_bh (global_lock);
862
863         if (net->ksnn_ninterfaces < 2) {
864                 /* Only create additional connections 
865                  * if I have > 1 interface */
866                 write_unlock_bh (global_lock);
867                 return;
868         }
869         
870         LASSERT (npeer_ipaddrs <= LNET_MAX_INTERFACES);
871
872         for (i = 0; i < npeer_ipaddrs; i++) {
873                 if (newroute != NULL) {
874                         newroute->ksnr_ipaddr = peer_ipaddrs[i];
875                 } else {
876                         write_unlock_bh (global_lock);
877
878                         newroute = ksocknal_create_route(peer_ipaddrs[i], port);
879                         if (newroute == NULL)
880                                 return;
881
882                         write_lock_bh (global_lock);
883                 }
884
885                 if (peer->ksnp_closing) {
886                         /* peer got closed under me */
887                         break;
888                 }
889
890                 /* Already got a route? */
891                 route = NULL;
892                 list_for_each(rtmp, &peer->ksnp_routes) {
893                         route = list_entry(rtmp, ksock_route_t, ksnr_list);
894
895                         if (route->ksnr_ipaddr == newroute->ksnr_ipaddr)
896                                 break;
897
898                         route = NULL;
899                 }
900                 if (route != NULL)
901                         continue;
902
903                 best_iface = NULL;
904                 best_nroutes = 0;
905                 best_netmatch = 0;
906
907                 LASSERT (net->ksnn_ninterfaces <= LNET_MAX_INTERFACES);
908
909                 /* Select interface to connect from */
910                 for (j = 0; j < net->ksnn_ninterfaces; j++) {
911                         iface = &net->ksnn_interfaces[j];
912
913                         /* Using this interface already? */
914                         list_for_each(rtmp, &peer->ksnp_routes) {
915                                 route = list_entry(rtmp, ksock_route_t, ksnr_list);
916
917                                 if (route->ksnr_myipaddr == iface->ksni_ipaddr)
918                                         break;
919
920                                 route = NULL;
921                         }
922                         if (route != NULL)
923                                 continue;
924
925                         this_netmatch = (((iface->ksni_ipaddr ^
926                                            newroute->ksnr_ipaddr) &
927                                            iface->ksni_netmask) == 0) ? 1 : 0;
928
929                         if (!(best_iface == NULL ||
930                               best_netmatch < this_netmatch ||
931                               (best_netmatch == this_netmatch &&
932                                best_nroutes > iface->ksni_nroutes)))
933                                 continue;
934
935                         best_iface = iface;
936                         best_netmatch = this_netmatch;
937                         best_nroutes = iface->ksni_nroutes;
938                 }
939
940                 if (best_iface == NULL)
941                         continue;
942
943                 newroute->ksnr_myipaddr = best_iface->ksni_ipaddr;
944                 best_iface->ksni_nroutes++;
945
946                 ksocknal_add_route_locked(peer, newroute);
947                 newroute = NULL;
948         }
949
950         write_unlock_bh (global_lock);
951         if (newroute != NULL)
952                 ksocknal_route_decref(newroute);
953 }
954
955 int
956 ksocknal_accept (lnet_ni_t *ni, cfs_socket_t *sock)
957 {
958         ksock_connreq_t    *cr;
959         int                 rc;
960         __u32               peer_ip;
961         int                 peer_port;
962
963         rc = libcfs_sock_getaddr(sock, 1, &peer_ip, &peer_port);
964         LASSERT (rc == 0);                      /* we succeeded before */
965
966         LIBCFS_ALLOC(cr, sizeof(*cr));
967         if (cr == NULL) {
968                 LCONSOLE_ERROR_MSG(0x12f, "Dropping connection request from "
969                                    "%u.%u.%u.%u: memory exhausted\n",
970                                    HIPQUAD(peer_ip));
971                 return -ENOMEM;
972         }
973
974         lnet_ni_addref(ni);
975         cr->ksncr_ni   = ni;
976         cr->ksncr_sock = sock;
977
978         spin_lock_bh (&ksocknal_data.ksnd_connd_lock);
979
980         list_add_tail(&cr->ksncr_list, &ksocknal_data.ksnd_connd_connreqs);
981         cfs_waitq_signal(&ksocknal_data.ksnd_connd_waitq);
982                         
983         spin_unlock_bh (&ksocknal_data.ksnd_connd_lock);
984         return 0;
985 }
986
987 int
988 ksocknal_create_conn (lnet_ni_t *ni, ksock_route_t *route, 
989                       cfs_socket_t *sock, int type)
990 {
991         rwlock_t          *global_lock = &ksocknal_data.ksnd_global_lock;
992         CFS_LIST_HEAD     (zombies);
993         lnet_process_id_t  peerid;
994         struct list_head  *tmp;
995         __u64              incarnation;
996         ksock_conn_t      *conn;
997         ksock_conn_t      *conn2;
998         ksock_peer_t      *peer = NULL;
999         ksock_peer_t      *peer2;
1000         ksock_sched_t     *sched;
1001         ksock_hello_msg_t *hello;
1002         unsigned int       irq;
1003         ksock_tx_t        *tx;
1004         int                rc;
1005         int                active;
1006         char              *warn = NULL;
1007
1008         active = (route != NULL);
1009
1010         LASSERT (active == (type != SOCKLND_CONN_NONE));
1011         LASSERT (route == NULL || route->ksnr_proto != NULL);
1012
1013         irq = ksocknal_lib_sock_irq (sock);
1014
1015         LIBCFS_ALLOC(conn, sizeof(*conn));
1016         if (conn == NULL) {
1017                 rc = -ENOMEM;
1018                 goto failed_0;
1019         }
1020
1021         memset (conn, 0, sizeof (*conn));
1022         conn->ksnc_peer = NULL;
1023         conn->ksnc_route = NULL;
1024         conn->ksnc_sock = sock;
1025         atomic_set (&conn->ksnc_sock_refcount, 1); /* 1 ref for conn */
1026         conn->ksnc_type = type;
1027         ksocknal_lib_save_callback(sock, conn);
1028         atomic_set (&conn->ksnc_conn_refcount, 1); /* 1 ref for me */
1029
1030         conn->ksnc_zc_capable = ksocknal_lib_zc_capable(sock);
1031
1032         conn->ksnc_rx_ready = 0;
1033         conn->ksnc_rx_scheduled = 0;
1034
1035         CFS_INIT_LIST_HEAD (&conn->ksnc_tx_queue);
1036         conn->ksnc_tx_ready = 0;
1037         conn->ksnc_tx_scheduled = 0;
1038         conn->ksnc_tx_mono = NULL;
1039         atomic_set (&conn->ksnc_tx_nob, 0);
1040
1041         LIBCFS_ALLOC(hello, offsetof(ksock_hello_msg_t,
1042                                      kshm_ips[LNET_MAX_INTERFACES]));
1043         if (hello == NULL) {
1044                 rc = -ENOMEM;
1045                 goto failed_1;
1046         }
1047
1048         /* stash conn's local and remote addrs */
1049         rc = ksocknal_lib_get_conn_addrs (conn);
1050         if (rc != 0)
1051                 goto failed_1;
1052
1053         /* Find out/confirm peer's NID and connection type and get the
1054          * vector of interfaces she's willing to let me connect to.
1055          * Passive connections use the listener timeout since the peer sends
1056          * eagerly */
1057
1058         if (active) {
1059                 LASSERT(ni == route->ksnr_peer->ksnp_ni);
1060
1061                 /* Active connection sends HELLO eagerly */
1062                 hello->kshm_nips = ksocknal_local_ipvec(ni, hello->kshm_ips);
1063                 peerid = route->ksnr_peer->ksnp_id;
1064                 conn->ksnc_proto = route->ksnr_proto;
1065
1066                 rc = ksocknal_send_hello (ni, conn, peerid.nid, hello);
1067                 if (rc != 0)
1068                         goto failed_1;
1069         } else {
1070                 peerid.nid = LNET_NID_ANY;
1071                 peerid.pid = LNET_PID_ANY;
1072
1073                 /* Passive, get protocol from peer */
1074                 conn->ksnc_proto = NULL;
1075         }
1076
1077         rc = ksocknal_recv_hello (ni, conn, hello, &peerid, &incarnation);
1078         if (rc < 0) {
1079                 if (rc == -EALREADY) {
1080                         /* only active connection loses conn race */
1081                         LASSERT (active);
1082
1083                         CDEBUG(D_NET, "Lost connection race with %s\n", 
1084                                libcfs_id2str(peerid));
1085                         /* Not an actual failure: return +ve RC so active
1086                          * connector can back off */
1087                         rc = EALREADY;
1088                 }
1089                 goto failed_1;
1090         }
1091
1092         if (active && route->ksnr_proto != conn->ksnc_proto) {
1093                 /* Active connecting, and different protocol is returned */
1094                 CDEBUG(D_NET, "Connecting by %d.x protocol is rejected,"
1095                               " compatible version %d.x found.\n",
1096                        route->ksnr_proto->pro_version,
1097                        conn->ksnc_proto->pro_version);
1098                 /* Not an actual failure: return +ve RC so active
1099                  * connector can back off */
1100                 rc = EPROTO;
1101
1102                 /* Retry with peer's protocol later */
1103                 route->ksnr_proto = conn->ksnc_proto;
1104
1105                 goto failed_1;
1106         }
1107         
1108         LASSERT (peerid.nid != LNET_NID_ANY);
1109
1110         if (active) {
1111                 peer = route->ksnr_peer;
1112                 ksocknal_peer_addref(peer);
1113
1114                 /* additional routes after interface exchange? */
1115                 ksocknal_create_routes(peer, conn->ksnc_port,
1116                                        hello->kshm_ips, hello->kshm_nips);
1117
1118                 /* setup the socket AFTER I've received hello (it disables
1119                  * SO_LINGER).  I might call back to the acceptor who may want
1120                  * to send a protocol version response and then close the
1121                  * socket; this ensures the socket only tears down after the
1122                  * response has been sent. */
1123                 rc = ksocknal_lib_setup_sock(sock);
1124
1125                 write_lock_bh (global_lock);
1126
1127                 if (rc != 0)
1128                         goto failed_2;
1129         } else {
1130                 rc = ksocknal_create_peer(&peer, ni, peerid);
1131                 if (rc != 0)
1132                         goto failed_1;
1133
1134                 write_lock_bh (global_lock);
1135
1136                 peer2 = ksocknal_find_peer_locked(ni, peerid);
1137                 if (peer2 == NULL) {
1138                         /* NB this puts an "empty" peer in the peer
1139                          * table (which takes my ref) */
1140                         list_add_tail(&peer->ksnp_list,
1141                                       ksocknal_nid2peerlist(peerid.nid));
1142                 } else {
1143                         ksocknal_peer_decref(peer);
1144                         peer = peer2;
1145                 }
1146
1147                 /* +1 ref for me */
1148                 ksocknal_peer_addref(peer);
1149                 peer->ksnp_accepting++;
1150                 
1151                 /* Am I already connecting to this guy?  Resolve in
1152                  * favour of higher NID... */
1153                 rc = 0;
1154                 if (peerid.nid < ni->ni_nid) {
1155                         list_for_each(tmp, &peer->ksnp_routes) {
1156                                 route = list_entry(tmp, ksock_route_t, 
1157                                                    ksnr_list);
1158
1159                                 if (route->ksnr_ipaddr != conn->ksnc_ipaddr)
1160                                         continue;
1161                         
1162                                 if (route->ksnr_connecting) {
1163                                         rc = EALREADY;  /* not a failure */
1164                                         warn = "connection race";
1165                                 }
1166
1167                                 break;
1168                         }
1169                 }
1170                 route = NULL;
1171                 
1172                 write_unlock_bh (global_lock);
1173
1174                 if (rc != 0) {
1175                         /* set CONN_NONE makes returned HELLO acknowledge I
1176                          * lost a connection race */
1177                         conn->ksnc_type = SOCKLND_CONN_NONE;
1178                         hello->kshm_nips = 0;
1179                         ksocknal_send_hello(ni, conn, peerid.nid, hello);
1180                 } else {
1181                         hello->kshm_nips = ksocknal_select_ips(peer, hello->kshm_ips,
1182                                                                hello->kshm_nips);
1183                         rc = ksocknal_send_hello(ni, conn, peerid.nid, hello);
1184
1185                         /* Setup the socket (it disables SO_LINGER).  I don't
1186                          * do it if I'm sending a negative response to ensure
1187                          * the response isn't discarded when I close the socket
1188                          * immediately after sending it. */
1189                         if (rc == 0)
1190                                 rc = ksocknal_lib_setup_sock(sock);
1191                 }
1192                 
1193                 write_lock_bh (global_lock);
1194                 peer->ksnp_accepting--;
1195                 
1196                 if (rc != 0)
1197                         goto failed_2;
1198         }
1199
1200         if (peer->ksnp_closing ||
1201             (active && route->ksnr_deleted)) {
1202                 /* peer/route got closed under me */
1203                 rc = -ESTALE;
1204                 warn = "peer/route removed";
1205                 goto failed_2;
1206         }
1207
1208         /* Refuse to duplicate an existing connection, unless this is a
1209          * loopback connection */
1210         if (conn->ksnc_ipaddr != conn->ksnc_myipaddr) {
1211                 list_for_each(tmp, &peer->ksnp_conns) {
1212                         conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);
1213
1214                         if (conn2->ksnc_ipaddr != conn->ksnc_ipaddr ||
1215                             conn2->ksnc_myipaddr != conn->ksnc_myipaddr ||
1216                             conn2->ksnc_type != conn->ksnc_type ||
1217                             conn2->ksnc_incarnation != incarnation)
1218                                 continue;
1219
1220                         rc = 0;    /* more of a NOOP than a failure */
1221                         warn = "duplicate";
1222                         goto failed_2;
1223                 }
1224         }
1225
1226         /* If the connection created by this route didn't bind to the IP
1227          * address the route connected to, the connection/route matching
1228          * code below probably isn't going to work. */
1229         if (active &&
1230             route->ksnr_ipaddr != conn->ksnc_ipaddr) {
1231                 CERROR("Route %s %u.%u.%u.%u connected to %u.%u.%u.%u\n",
1232                        libcfs_id2str(peer->ksnp_id),
1233                        HIPQUAD(route->ksnr_ipaddr),
1234                        HIPQUAD(conn->ksnc_ipaddr));
1235         }
1236
1237         /* Search for a route corresponding to the new connection and
1238          * create an association.  This allows incoming connections created
1239          * by routes in my peer to match my own route entries so I don't
1240          * continually create duplicate routes. */
1241         list_for_each (tmp, &peer->ksnp_routes) {
1242                 route = list_entry(tmp, ksock_route_t, ksnr_list);
1243
1244                 if (route->ksnr_ipaddr != conn->ksnc_ipaddr)
1245                         continue;
1246
1247                 ksocknal_associate_route_conn_locked(route, conn);
1248                 break;
1249         }
1250
1251         conn->ksnc_peer = peer;                 /* conn takes my ref on peer */
1252         conn->ksnc_incarnation = incarnation;
1253         peer->ksnp_last_alive = cfs_time_current();
1254         peer->ksnp_error = 0;
1255
1256         sched = ksocknal_choose_scheduler_locked (irq);
1257         sched->kss_nconns++;
1258         conn->ksnc_scheduler = sched;
1259
1260         /* Set the deadline for the outgoing HELLO to drain */
1261         conn->ksnc_tx_bufnob = SOCK_WMEM_QUEUED(sock);
1262         conn->ksnc_tx_deadline = cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
1263         mb();       /* order with adding to peer's conn list */
1264
1265         list_add (&conn->ksnc_list, &peer->ksnp_conns);
1266         ksocknal_conn_addref(conn);
1267
1268         ksocknal_new_packet(conn, 0);
1269
1270         /* NB my callbacks block while I hold ksnd_global_lock */
1271         ksocknal_lib_set_callback(sock, conn);
1272
1273         /* Take all the packets blocking for a connection.
1274          * NB, it might be nicer to share these blocked packets among any
1275          * other connections that are becoming established. */
1276         while (!list_empty (&peer->ksnp_tx_queue)) {
1277                 tx = list_entry (peer->ksnp_tx_queue.next,
1278                                  ksock_tx_t, tx_list);
1279
1280                 list_del (&tx->tx_list);
1281                 ksocknal_queue_tx_locked (tx, conn);
1282         }
1283
1284         rc = ksocknal_close_stale_conns_locked(peer, incarnation);
1285         write_unlock_bh (global_lock);
1286
1287         if (rc != 0)
1288                 CDEBUG(D_NET, "Closed %d stale conns to %s ip %d.%d.%d.%d\n",
1289                        rc, libcfs_id2str(conn->ksnc_peer->ksnp_id),
1290                        HIPQUAD(conn->ksnc_ipaddr));
1291
1292         ksocknal_lib_bind_irq (irq);
1293
1294         /* Call the callbacks right now to get things going. */
1295         if (ksocknal_connsock_addref(conn) == 0) {
1296                 ksocknal_read_callback(conn);
1297                 ksocknal_write_callback(conn);
1298                 ksocknal_connsock_decref(conn);
1299         }
1300
1301         CDEBUG(D_NET, "New conn %s %u.%u.%u.%u -> %u.%u.%u.%u/%d"
1302                " incarnation:"LPD64" sched[%d]/%d\n",
1303                libcfs_id2str(peerid), HIPQUAD(conn->ksnc_myipaddr),
1304                HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port, incarnation,
1305                (int)(conn->ksnc_scheduler - ksocknal_data.ksnd_schedulers), irq);
1306
1307         LIBCFS_FREE(hello, offsetof(ksock_hello_msg_t,
1308                                     kshm_ips[LNET_MAX_INTERFACES]));
1309
1310         ksocknal_conn_decref(conn);
1311         return (0);
1312
1313  failed_2:
1314         if (!peer->ksnp_closing &&
1315             list_empty (&peer->ksnp_conns) &&
1316             list_empty (&peer->ksnp_routes)) {
1317                 list_add(&zombies, &peer->ksnp_tx_queue);
1318                 list_del_init(&peer->ksnp_tx_queue);
1319                 ksocknal_unlink_peer_locked(peer);
1320         }
1321         
1322         write_unlock_bh (global_lock);
1323
1324         if (warn != NULL) {
1325                 if (rc < 0)
1326                         CERROR("Not creating conn %s type %d: %s\n",
1327                                libcfs_id2str(peerid), conn->ksnc_type, warn);
1328                 else
1329                         CDEBUG(D_NET, "Not creating conn %s type %d: %s\n",
1330                               libcfs_id2str(peerid), conn->ksnc_type, warn);
1331         }
1332
1333         ksocknal_txlist_done(ni, &zombies, 1);
1334         ksocknal_peer_decref(peer);
1335
1336  failed_1:
1337         if (hello != NULL)
1338                 LIBCFS_FREE(hello, offsetof(ksock_hello_msg_t,
1339                                             kshm_ips[LNET_MAX_INTERFACES]));
1340
1341         LIBCFS_FREE (conn, sizeof(*conn));
1342
1343  failed_0:
1344         libcfs_sock_release(sock);
1345         return rc;
1346 }
1347
1348 void
1349 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
1350 {
1351         /* This just does the immmediate housekeeping, and queues the
1352          * connection for the reaper to terminate.
1353          * Caller holds ksnd_global_lock exclusively in irq context */
1354         ksock_peer_t      *peer = conn->ksnc_peer;
1355         ksock_route_t     *route;
1356         ksock_conn_t      *conn2;
1357         struct list_head  *tmp;
1358
1359         LASSERT (peer->ksnp_error == 0);
1360         LASSERT (!conn->ksnc_closing);
1361         conn->ksnc_closing = 1;
1362
1363         /* ksnd_deathrow_conns takes over peer's ref */
1364         list_del (&conn->ksnc_list);
1365
1366         route = conn->ksnc_route;
1367         if (route != NULL) {
1368                 /* dissociate conn from route... */
1369                 LASSERT (!route->ksnr_deleted);
1370                 LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);
1371
1372                 conn2 = NULL;
1373                 list_for_each(tmp, &peer->ksnp_conns) {
1374                         conn2 = list_entry(tmp, ksock_conn_t, ksnc_list);
1375
1376                         if (conn2->ksnc_route == route &&
1377                             conn2->ksnc_type == conn->ksnc_type)
1378                                 break;
1379
1380                         conn2 = NULL;
1381                 }
1382                 if (conn2 == NULL)
1383                         route->ksnr_connected &= ~(1 << conn->ksnc_type);
1384
1385                 conn->ksnc_route = NULL;
1386
1387 #if 0           /* irrelevent with only eager routes */
1388                 list_del (&route->ksnr_list);   /* make route least favourite */
1389                 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
1390 #endif
1391                 ksocknal_route_decref(route);     /* drop conn's ref on route */
1392         }
1393
1394         if (list_empty (&peer->ksnp_conns)) {
1395                 /* No more connections to this peer */
1396
1397                 peer->ksnp_error = error;       /* stash last conn close reason */
1398
1399                 if (list_empty (&peer->ksnp_routes)) {
1400                         /* I've just closed last conn belonging to a
1401                          * peer with no routes to it */
1402                         ksocknal_unlink_peer_locked (peer);
1403                 }
1404         }
1405
1406         spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);
1407
1408         list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
1409         cfs_waitq_signal (&ksocknal_data.ksnd_reaper_waitq);
1410
1411         spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);
1412 }
1413
1414 void
1415 ksocknal_peer_failed (ksock_peer_t *peer)
1416 {
1417         time_t    last_alive = 0;
1418         int       notify = 0;
1419
1420         /* There has been a connection failure or comms error; but I'll only
1421          * tell LNET I think the peer is dead if it's to another kernel and
1422          * there are no connections or connection attempts in existance. */
1423         
1424         read_lock (&ksocknal_data.ksnd_global_lock);
1425
1426         if ((peer->ksnp_id.pid & LNET_PID_USERFLAG) == 0 &&
1427             list_empty(&peer->ksnp_conns) &&
1428             peer->ksnp_accepting == 0 &&
1429             ksocknal_find_connecting_route_locked(peer) == NULL) {
1430                 notify = 1;
1431                 last_alive = cfs_time_current_sec() - 
1432                              cfs_duration_sec(cfs_time_current() - 
1433                                               peer->ksnp_last_alive);
1434         }
1435         
1436         read_unlock (&ksocknal_data.ksnd_global_lock);
1437
1438         if (notify)
1439                 lnet_notify (peer->ksnp_ni, peer->ksnp_id.nid, 0,
1440                              last_alive);
1441 }
1442
1443 void
1444 ksocknal_terminate_conn (ksock_conn_t *conn)
1445 {
1446         /* This gets called by the reaper (guaranteed thread context) to
1447          * disengage the socket from its callbacks and close it.
1448          * ksnc_refcount will eventually hit zero, and then the reaper will
1449          * destroy it. */
1450         ksock_peer_t     *peer = conn->ksnc_peer;
1451         ksock_sched_t    *sched = conn->ksnc_scheduler;
1452         int               failed = 0;
1453         struct list_head *tmp;
1454         struct list_head *nxt;
1455         ksock_tx_t       *tx;
1456         LIST_HEAD        (zlist);
1457
1458         LASSERT(conn->ksnc_closing);
1459
1460         /* wake up the scheduler to "send" all remaining packets to /dev/null */
1461         spin_lock_bh (&sched->kss_lock);
1462
1463         if (!conn->ksnc_tx_scheduled &&
1464             !list_empty(&conn->ksnc_tx_queue)){
1465                 list_add_tail (&conn->ksnc_tx_list,
1466                                &sched->kss_tx_conns);
1467                 /* a closing conn is always ready to tx */
1468                 conn->ksnc_tx_ready = 1;
1469                 conn->ksnc_tx_scheduled = 1;
1470                 /* extra ref for scheduler */
1471                 ksocknal_conn_addref(conn);
1472
1473                 cfs_waitq_signal (&sched->kss_waitq);
1474         }
1475
1476         spin_unlock_bh (&sched->kss_lock);
1477
1478         spin_lock(&peer->ksnp_lock);
1479
1480         list_for_each_safe(tmp, nxt, &peer->ksnp_zc_req_list) {
1481                 tx = list_entry(tmp, ksock_tx_t, tx_zc_list);
1482
1483                 if (tx->tx_conn != conn)
1484                         continue;
1485
1486                 LASSERT (tx->tx_msg.ksm_zc_req_cookie != 0);
1487
1488                 tx->tx_msg.ksm_zc_req_cookie = 0;
1489                 list_del(&tx->tx_zc_list);
1490                 list_add(&tx->tx_zc_list, &zlist);
1491         }
1492
1493         spin_unlock(&peer->ksnp_lock);
1494
1495         list_for_each_safe(tmp, nxt, &zlist) {
1496                 tx = list_entry(tmp, ksock_tx_t, tx_zc_list);
1497
1498                 list_del(&tx->tx_zc_list);
1499                 ksocknal_tx_decref(tx);
1500         }
1501
1502         /* serialise with callbacks */
1503         write_lock_bh (&ksocknal_data.ksnd_global_lock);
1504
1505         ksocknal_lib_reset_callback(conn->ksnc_sock, conn);
1506
1507         /* OK, so this conn may not be completely disengaged from its
1508          * scheduler yet, but it _has_ committed to terminate... */
1509         conn->ksnc_scheduler->kss_nconns--;
1510
1511         if (peer->ksnp_error != 0) {
1512                 /* peer's last conn closed in error */
1513                 LASSERT (list_empty (&peer->ksnp_conns));
1514                 failed = 1;
1515                 peer->ksnp_error = 0;     /* avoid multiple notifications */
1516         }
1517
1518         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1519
1520         if (failed)
1521                 ksocknal_peer_failed(peer);
1522
1523         /* The socket is closed on the final put; either here, or in
1524          * ksocknal_{send,recv}msg().  Since we set up the linger2 option
1525          * when the connection was established, this will close the socket
1526          * immediately, aborting anything buffered in it. Any hung
1527          * zero-copy transmits will therefore complete in finite time. */
1528         ksocknal_connsock_decref(conn);
1529 }
1530
1531 void
1532 ksocknal_queue_zombie_conn (ksock_conn_t *conn)
1533 {
1534         /* Queue the conn for the reaper to destroy */
1535
1536         LASSERT (atomic_read(&conn->ksnc_conn_refcount) == 0);
1537         spin_lock_bh (&ksocknal_data.ksnd_reaper_lock);
1538
1539         list_add_tail(&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1540         cfs_waitq_signal(&ksocknal_data.ksnd_reaper_waitq);
1541         
1542         spin_unlock_bh (&ksocknal_data.ksnd_reaper_lock);
1543 }
1544
1545 void
1546 ksocknal_destroy_conn (ksock_conn_t *conn)
1547 {
1548         /* Final coup-de-grace of the reaper */
1549         CDEBUG (D_NET, "connection %p\n", conn);
1550
1551         LASSERT (atomic_read (&conn->ksnc_conn_refcount) == 0);
1552         LASSERT (atomic_read (&conn->ksnc_sock_refcount) == 0);
1553         LASSERT (conn->ksnc_sock == NULL);
1554         LASSERT (conn->ksnc_route == NULL);
1555         LASSERT (!conn->ksnc_tx_scheduled);
1556         LASSERT (!conn->ksnc_rx_scheduled);
1557         LASSERT (list_empty(&conn->ksnc_tx_queue));
1558
1559         /* complete current receive if any */
1560         switch (conn->ksnc_rx_state) {
1561         case SOCKNAL_RX_LNET_PAYLOAD:
1562                 CERROR("Completing partial receive from %s"
1563                        ", ip %d.%d.%d.%d:%d, with error\n",
1564                        libcfs_id2str(conn->ksnc_peer->ksnp_id),
1565                        HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
1566                 lnet_finalize (conn->ksnc_peer->ksnp_ni, 
1567                                conn->ksnc_cookie, -EIO);
1568                 break;
1569         case SOCKNAL_RX_LNET_HEADER:
1570                 if (conn->ksnc_rx_started)
1571                         CERROR("Incomplete receive of lnet header from %s"
1572                                ", ip %d.%d.%d.%d:%d, with error, protocol: %d.x.\n",
1573                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1574                                HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port,
1575                                conn->ksnc_proto->pro_version);
1576                 break;
1577         case SOCKNAL_RX_KSM_HEADER:
1578                 if (conn->ksnc_rx_started)
1579                         CERROR("Incomplete receive of ksock message from %s"
1580                                ", ip %d.%d.%d.%d:%d, with error, protocol: %d.x.\n",
1581                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1582                                HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port,
1583                                conn->ksnc_proto->pro_version);
1584                 break;
1585         case SOCKNAL_RX_SLOP:
1586                 if (conn->ksnc_rx_started)
1587                         CERROR("Incomplete receive of slops from %s"
1588                                ", ip %d.%d.%d.%d:%d, with error\n",
1589                                libcfs_id2str(conn->ksnc_peer->ksnp_id),
1590                                HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
1591                break;
1592         default:
1593                 LBUG ();
1594                 break;
1595         }
1596
1597         ksocknal_peer_decref(conn->ksnc_peer);
1598
1599         LIBCFS_FREE (conn, sizeof (*conn));
1600 }
1601
1602 int
1603 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
1604 {
1605         ksock_conn_t       *conn;
1606         struct list_head   *ctmp;
1607         struct list_head   *cnxt;
1608         int                 count = 0;
1609
1610         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1611                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1612
1613                 if (ipaddr == 0 ||
1614                     conn->ksnc_ipaddr == ipaddr) {
1615                         count++;
1616                         ksocknal_close_conn_locked (conn, why);
1617                 }
1618         }
1619
1620         return (count);
1621 }
1622
1623 int
1624 ksocknal_close_stale_conns_locked (ksock_peer_t *peer, __u64 incarnation)
1625 {
1626         ksock_conn_t       *conn;
1627         struct list_head   *ctmp;
1628         struct list_head   *cnxt;
1629         int                 count = 0;
1630
1631         list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1632                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1633
1634                 if (conn->ksnc_incarnation == incarnation)
1635                         continue;
1636
1637                 CDEBUG(D_NET, "Closing stale conn %s ip:%08x/%d "
1638                        "incarnation:"LPD64"("LPD64")\n",
1639                        libcfs_id2str(peer->ksnp_id), 
1640                        conn->ksnc_ipaddr, conn->ksnc_port,
1641                        conn->ksnc_incarnation, incarnation);
1642
1643                 count++;
1644                 ksocknal_close_conn_locked (conn, -ESTALE);
1645         }
1646
1647         return (count);
1648 }
1649
1650 int
1651 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why)
1652 {
1653         ksock_peer_t     *peer = conn->ksnc_peer;
1654         __u32             ipaddr = conn->ksnc_ipaddr;
1655         int               count;
1656
1657         write_lock_bh (&ksocknal_data.ksnd_global_lock);
1658
1659         count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1660
1661         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1662
1663         return (count);
1664 }
1665
1666 int
1667 ksocknal_close_matching_conns (lnet_process_id_t id, __u32 ipaddr)
1668 {
1669         ksock_peer_t       *peer;
1670         struct list_head   *ptmp;
1671         struct list_head   *pnxt;
1672         int                 lo;
1673         int                 hi;
1674         int                 i;
1675         int                 count = 0;
1676
1677         write_lock_bh (&ksocknal_data.ksnd_global_lock);
1678
1679         if (id.nid != LNET_NID_ANY)
1680                 lo = hi = ksocknal_nid2peerlist(id.nid) - ksocknal_data.ksnd_peers;
1681         else {
1682                 lo = 0;
1683                 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1684         }
1685
1686         for (i = lo; i <= hi; i++) {
1687                 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1688
1689                         peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1690
1691                         if (!((id.nid == LNET_NID_ANY || id.nid == peer->ksnp_id.nid) &&
1692                               (id.pid == LNET_PID_ANY || id.pid == peer->ksnp_id.pid)))
1693                                 continue;
1694
1695                         count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1696                 }
1697         }
1698
1699         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1700
1701         /* wildcards always succeed */
1702         if (id.nid == LNET_NID_ANY || id.pid == LNET_PID_ANY || ipaddr == 0)
1703                 return (0);
1704
1705         return (count == 0 ? -ENOENT : 0);
1706 }
1707
1708 void
1709 ksocknal_notify (lnet_ni_t *ni, lnet_nid_t gw_nid, int alive)
1710 {
1711         /* The router is telling me she's been notified of a change in
1712          * gateway state.... */
1713         lnet_process_id_t  id = {.nid = gw_nid, .pid = LNET_PID_ANY};
1714
1715         CDEBUG (D_NET, "gw %s %s\n", libcfs_nid2str(gw_nid), 
1716                 alive ? "up" : "down");
1717
1718         if (!alive) {
1719                 /* If the gateway crashed, close all open connections... */
1720                 ksocknal_close_matching_conns (id, 0);
1721                 return;
1722         }
1723
1724         /* ...otherwise do nothing.  We can only establish new connections
1725          * if we have autroutes, and these connect on demand. */
1726 }
1727
1728 void
1729 ksocknal_push_peer (ksock_peer_t *peer)
1730 {
1731         int               index;
1732         int               i;
1733         struct list_head *tmp;
1734         ksock_conn_t     *conn;
1735
1736         for (index = 0; ; index++) {
1737                 read_lock (&ksocknal_data.ksnd_global_lock);
1738
1739                 i = 0;
1740                 conn = NULL;
1741
1742                 list_for_each (tmp, &peer->ksnp_conns) {
1743                         if (i++ == index) {
1744                                 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1745                                 ksocknal_conn_addref(conn);
1746                                 break;
1747                         }
1748                 }
1749
1750                 read_unlock (&ksocknal_data.ksnd_global_lock);
1751
1752                 if (conn == NULL)
1753                         break;
1754
1755                 ksocknal_lib_push_conn (conn);
1756                 ksocknal_conn_decref(conn);
1757         }
1758 }
1759
1760 int
1761 ksocknal_push (lnet_ni_t *ni, lnet_process_id_t id)
1762 {
1763         ksock_peer_t      *peer;
1764         struct list_head  *tmp;
1765         int                index;
1766         int                i;
1767         int                j;
1768         int                rc = -ENOENT;
1769
1770         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1771                 for (j = 0; ; j++) {
1772                         read_lock (&ksocknal_data.ksnd_global_lock);
1773
1774                         index = 0;
1775                         peer = NULL;
1776
1777                         list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1778                                 peer = list_entry(tmp, ksock_peer_t,
1779                                                   ksnp_list);
1780
1781                                 if (!((id.nid == LNET_NID_ANY ||
1782                                        id.nid == peer->ksnp_id.nid) &&
1783                                       (id.pid == LNET_PID_ANY ||
1784                                        id.pid == peer->ksnp_id.pid))) {
1785                                         peer = NULL;
1786                                         continue;
1787                                 }
1788
1789                                 if (index++ == j) {
1790                                         ksocknal_peer_addref(peer);
1791                                         break;
1792                                 }
1793                         }
1794
1795                         read_unlock (&ksocknal_data.ksnd_global_lock);
1796
1797                         if (peer != NULL) {
1798                                 rc = 0;
1799                                 ksocknal_push_peer (peer);
1800                                 ksocknal_peer_decref(peer);
1801                         }
1802                 }
1803
1804         }
1805
1806         return (rc);
1807 }
1808
1809 int
1810 ksocknal_add_interface(lnet_ni_t *ni, __u32 ipaddress, __u32 netmask)
1811 {
1812         ksock_net_t       *net = ni->ni_data;
1813         ksock_interface_t *iface;
1814         int                rc;
1815         int                i;
1816         int                j;
1817         struct list_head  *ptmp;
1818         ksock_peer_t      *peer;
1819         struct list_head  *rtmp;
1820         ksock_route_t     *route;
1821
1822         if (ipaddress == 0 ||
1823             netmask == 0)
1824                 return (-EINVAL);
1825
1826         write_lock_bh (&ksocknal_data.ksnd_global_lock);
1827
1828         iface = ksocknal_ip2iface(ni, ipaddress);
1829         if (iface != NULL) {
1830                 /* silently ignore dups */
1831                 rc = 0;
1832         } else if (net->ksnn_ninterfaces == LNET_MAX_INTERFACES) {
1833                 rc = -ENOSPC;
1834         } else {
1835                 iface = &net->ksnn_interfaces[net->ksnn_ninterfaces++];
1836
1837                 iface->ksni_ipaddr = ipaddress;
1838                 iface->ksni_netmask = netmask;
1839                 iface->ksni_nroutes = 0;
1840                 iface->ksni_npeers = 0;
1841
1842                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1843                         list_for_each(ptmp, &ksocknal_data.ksnd_peers[i]) {
1844                                 peer = list_entry(ptmp, ksock_peer_t, ksnp_list);
1845
1846                                 for (j = 0; i < peer->ksnp_n_passive_ips; j++)
1847                                         if (peer->ksnp_passive_ips[j] == ipaddress)
1848                                                 iface->ksni_npeers++;
1849
1850                                 list_for_each(rtmp, &peer->ksnp_routes) {
1851                                         route = list_entry(rtmp, ksock_route_t, ksnr_list);
1852
1853                                         if (route->ksnr_myipaddr == ipaddress)
1854                                                 iface->ksni_nroutes++;
1855                                 }
1856                         }
1857                 }
1858
1859                 rc = 0;
1860                 /* NB only new connections will pay attention to the new interface! */
1861         }
1862
1863         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1864
1865         return (rc);
1866 }
1867
1868 void
1869 ksocknal_peer_del_interface_locked(ksock_peer_t *peer, __u32 ipaddr)
1870 {
1871         struct list_head   *tmp;
1872         struct list_head   *nxt;
1873         ksock_route_t      *route;
1874         ksock_conn_t       *conn;
1875         int                 i;
1876         int                 j;
1877
1878         for (i = 0; i < peer->ksnp_n_passive_ips; i++)
1879                 if (peer->ksnp_passive_ips[i] == ipaddr) {
1880                         for (j = i+1; j < peer->ksnp_n_passive_ips; j++)
1881                                 peer->ksnp_passive_ips[j-1] =
1882                                         peer->ksnp_passive_ips[j];
1883                         peer->ksnp_n_passive_ips--;
1884                         break;
1885                 }
1886
1887         list_for_each_safe(tmp, nxt, &peer->ksnp_routes) {
1888                 route = list_entry (tmp, ksock_route_t, ksnr_list);
1889
1890                 if (route->ksnr_myipaddr != ipaddr)
1891                         continue;
1892
1893                 if (route->ksnr_share_count != 0) {
1894                         /* Manually created; keep, but unbind */
1895                         route->ksnr_myipaddr = 0;
1896                 } else {
1897                         ksocknal_del_route_locked(route);
1898                 }
1899         }
1900
1901         list_for_each_safe(tmp, nxt, &peer->ksnp_conns) {
1902                 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
1903
1904                 if (conn->ksnc_myipaddr == ipaddr)
1905                         ksocknal_close_conn_locked (conn, 0);
1906         }
1907 }
1908
1909 int
1910 ksocknal_del_interface(lnet_ni_t *ni, __u32 ipaddress)
1911 {
1912         ksock_net_t       *net = ni->ni_data;
1913         int                rc = -ENOENT;
1914         struct list_head  *tmp;
1915         struct list_head  *nxt;
1916         ksock_peer_t      *peer;
1917         __u32              this_ip;
1918         int                i;
1919         int                j;
1920
1921         write_lock_bh (&ksocknal_data.ksnd_global_lock);
1922
1923         for (i = 0; i < net->ksnn_ninterfaces; i++) {
1924                 this_ip = net->ksnn_interfaces[i].ksni_ipaddr;
1925
1926                 if (!(ipaddress == 0 ||
1927                       ipaddress == this_ip))
1928                         continue;
1929
1930                 rc = 0;
1931
1932                 for (j = i+1; j < net->ksnn_ninterfaces; j++)
1933                         net->ksnn_interfaces[j-1] =
1934                                 net->ksnn_interfaces[j];
1935
1936                 net->ksnn_ninterfaces--;
1937
1938                 for (j = 0; j < ksocknal_data.ksnd_peer_hash_size; j++) {
1939                         list_for_each_safe(tmp, nxt, &ksocknal_data.ksnd_peers[j]) {
1940                                 peer = list_entry(tmp, ksock_peer_t, ksnp_list);
1941
1942                                 if (peer->ksnp_ni != ni)
1943                                         continue;
1944
1945                                 ksocknal_peer_del_interface_locked(peer, this_ip);
1946                         }
1947                 }
1948         }
1949
1950         write_unlock_bh (&ksocknal_data.ksnd_global_lock);
1951
1952         return (rc);
1953 }
1954
1955 int
1956 ksocknal_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1957 {
1958         struct libcfs_ioctl_data *data = arg;
1959         int rc;
1960
1961         switch(cmd) {
1962         case IOC_LIBCFS_GET_INTERFACE: {
1963                 ksock_net_t       *net = ni->ni_data;
1964                 ksock_interface_t *iface;
1965
1966                 read_lock (&ksocknal_data.ksnd_global_lock);
1967
1968                 if (data->ioc_count < 0 ||
1969                     data->ioc_count >= net->ksnn_ninterfaces) {
1970                         rc = -ENOENT;
1971                 } else {
1972                         rc = 0;
1973                         iface = &net->ksnn_interfaces[data->ioc_count];
1974
1975                         data->ioc_u32[0] = iface->ksni_ipaddr;
1976                         data->ioc_u32[1] = iface->ksni_netmask;
1977                         data->ioc_u32[2] = iface->ksni_npeers;
1978                         data->ioc_u32[3] = iface->ksni_nroutes;
1979                 }
1980
1981                 read_unlock (&ksocknal_data.ksnd_global_lock);
1982                 return rc;
1983         }
1984
1985         case IOC_LIBCFS_ADD_INTERFACE:
1986                 return ksocknal_add_interface(ni,
1987                                               data->ioc_u32[0], /* IP address */
1988                                               data->ioc_u32[1]); /* net mask */
1989
1990         case IOC_LIBCFS_DEL_INTERFACE:
1991                 return ksocknal_del_interface(ni, 
1992                                               data->ioc_u32[0]); /* IP address */
1993
1994         case IOC_LIBCFS_GET_PEER: {
1995                 lnet_process_id_t id = {0,};
1996                 __u32            myip = 0;
1997                 __u32            ip = 0;
1998                 int              port = 0;
1999                 int              conn_count = 0;
2000                 int              share_count = 0;
2001
2002                 rc = ksocknal_get_peer_info(ni, data->ioc_count,
2003                                             &id, &myip, &ip, &port,
2004                                             &conn_count,  &share_count);
2005                 if (rc != 0)
2006                         return rc;
2007                         
2008                 data->ioc_nid    = id.nid;
2009                 data->ioc_count  = share_count;
2010                 data->ioc_u32[0] = ip;
2011                 data->ioc_u32[1] = port;
2012                 data->ioc_u32[2] = myip;
2013                 data->ioc_u32[3] = conn_count;
2014                 data->ioc_u32[4] = id.pid;
2015                 return 0;
2016         }
2017
2018         case IOC_LIBCFS_ADD_PEER: {
2019                 lnet_process_id_t  id = {.nid = data->ioc_nid,
2020                                          .pid = LUSTRE_SRV_LNET_PID};
2021                 return ksocknal_add_peer (ni, id,
2022                                           data->ioc_u32[0], /* IP */
2023                                           data->ioc_u32[1]); /* port */
2024         }
2025         case IOC_LIBCFS_DEL_PEER: {
2026                 lnet_process_id_t  id = {.nid = data->ioc_nid,
2027                                          .pid = LNET_PID_ANY};
2028                 return ksocknal_del_peer (ni, id,
2029                                           data->ioc_u32[0]); /* IP */
2030         }
2031         case IOC_LIBCFS_GET_CONN: {
2032                 int           txmem;
2033                 int           rxmem;
2034                 int           nagle;
2035                 ksock_conn_t *conn = ksocknal_get_conn_by_idx (ni, data->ioc_count);
2036
2037                 if (conn == NULL)
2038                         return -ENOENT;
2039
2040                 ksocknal_lib_get_conn_tunables(conn, &txmem, &rxmem, &nagle);
2041
2042                 data->ioc_count  = txmem;
2043                 data->ioc_nid    = conn->ksnc_peer->ksnp_id.nid;
2044                 data->ioc_flags  = nagle;
2045                 data->ioc_u32[0] = conn->ksnc_ipaddr;
2046                 data->ioc_u32[1] = conn->ksnc_port;
2047                 data->ioc_u32[2] = conn->ksnc_myipaddr;
2048                 data->ioc_u32[3] = conn->ksnc_type;
2049                 data->ioc_u32[4] = conn->ksnc_scheduler -
2050                                    ksocknal_data.ksnd_schedulers;
2051                 data->ioc_u32[5] = rxmem;
2052                 data->ioc_u32[6] = conn->ksnc_peer->ksnp_id.pid;
2053                 ksocknal_conn_decref(conn);
2054                 return 0;
2055         }
2056
2057         case IOC_LIBCFS_CLOSE_CONNECTION: {
2058                 lnet_process_id_t  id = {.nid = data->ioc_nid,
2059                                         .pid = LNET_PID_ANY};
2060
2061                 return ksocknal_close_matching_conns (id,
2062                                                       data->ioc_u32[0]);
2063         }
2064         case IOC_LIBCFS_REGISTER_MYNID:
2065                 /* Ignore if this is a noop */
2066                 if (data->ioc_nid == ni->ni_nid)
2067                         return 0;
2068
2069                 CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID: %s(%s)\n",
2070                        libcfs_nid2str(data->ioc_nid),
2071                        libcfs_nid2str(ni->ni_nid));
2072                 return -EINVAL;
2073
2074         case IOC_LIBCFS_PUSH_CONNECTION: {
2075                 lnet_process_id_t  id = {.nid = data->ioc_nid,
2076                                         .pid = LNET_PID_ANY};
2077                 
2078                 return ksocknal_push(ni, id);
2079         }
2080         default:
2081                 return -EINVAL;
2082         }
2083         /* not reached */
2084 }
2085
2086 void
2087 ksocknal_free_buffers (void)
2088 {
2089         LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_txs) == 0);
2090
2091         if (ksocknal_data.ksnd_schedulers != NULL)
2092                 LIBCFS_FREE (ksocknal_data.ksnd_schedulers,
2093                              sizeof (ksock_sched_t) * ksocknal_data.ksnd_nschedulers);
2094
2095         LIBCFS_FREE (ksocknal_data.ksnd_peers,
2096                      sizeof (struct list_head) *
2097                      ksocknal_data.ksnd_peer_hash_size);
2098
2099         spin_lock(&ksocknal_data.ksnd_tx_lock);
2100
2101         if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
2102                 struct list_head  zlist;
2103                 ksock_tx_t       *tx;
2104
2105                 list_add(&zlist, &ksocknal_data.ksnd_idle_noop_txs);
2106                 list_del_init(&ksocknal_data.ksnd_idle_noop_txs);
2107                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
2108
2109                 while(!list_empty(&zlist)) {
2110                         tx = list_entry(zlist.next, ksock_tx_t, tx_list);
2111                         list_del(&tx->tx_list);
2112                         LIBCFS_FREE(tx, tx->tx_desc_size);
2113                 }
2114         } else {
2115                 spin_unlock(&ksocknal_data.ksnd_tx_lock);
2116         }
2117 }
2118
2119 void
2120 ksocknal_base_shutdown (void)
2121 {
2122         ksock_sched_t *sched;
2123         int            i;
2124
2125         CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
2126                atomic_read (&libcfs_kmemory));
2127         LASSERT (ksocknal_data.ksnd_nnets == 0);
2128
2129         switch (ksocknal_data.ksnd_init) {
2130         default:
2131                 LASSERT (0);
2132
2133         case SOCKNAL_INIT_ALL:
2134         case SOCKNAL_INIT_DATA:
2135                 LASSERT (ksocknal_data.ksnd_peers != NULL);
2136                 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
2137                         LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
2138                 }
2139                 LASSERT (list_empty (&ksocknal_data.ksnd_enomem_conns));
2140                 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
2141                 LASSERT (list_empty (&ksocknal_data.ksnd_connd_connreqs));
2142                 LASSERT (list_empty (&ksocknal_data.ksnd_connd_routes));
2143
2144                 if (ksocknal_data.ksnd_schedulers != NULL)
2145                         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2146                                 ksock_sched_t *kss =
2147                                         &ksocknal_data.ksnd_schedulers[i];
2148
2149                                 LASSERT (list_empty (&kss->kss_tx_conns));
2150                                 LASSERT (list_empty (&kss->kss_rx_conns));
2151                                 LASSERT (list_empty (&kss->kss_zombie_noop_txs));
2152                                 LASSERT (kss->kss_nconns == 0);
2153                         }
2154
2155                 /* flag threads to terminate; wake and wait for them to die */
2156                 ksocknal_data.ksnd_shuttingdown = 1;
2157                 cfs_waitq_broadcast (&ksocknal_data.ksnd_connd_waitq);
2158                 cfs_waitq_broadcast (&ksocknal_data.ksnd_reaper_waitq);
2159
2160                 if (ksocknal_data.ksnd_schedulers != NULL)
2161                         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2162                                 sched = &ksocknal_data.ksnd_schedulers[i];
2163                                 cfs_waitq_broadcast(&sched->kss_waitq);
2164                         }
2165
2166                 i = 4;
2167                 read_lock (&ksocknal_data.ksnd_global_lock);
2168                 while (ksocknal_data.ksnd_nthreads != 0) {
2169                         i++;
2170                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
2171                                "waiting for %d threads to terminate\n",
2172                                 ksocknal_data.ksnd_nthreads);
2173                         read_unlock (&ksocknal_data.ksnd_global_lock);
2174                         cfs_pause(cfs_time_seconds(1));
2175                         read_lock (&ksocknal_data.ksnd_global_lock);
2176                 }
2177                 read_unlock (&ksocknal_data.ksnd_global_lock);
2178
2179                 ksocknal_free_buffers();
2180
2181                 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
2182                 break;
2183         }
2184
2185         CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
2186                atomic_read (&libcfs_kmemory));
2187
2188         PORTAL_MODULE_UNUSE;
2189 }
2190
2191
2192 __u64
2193 ksocknal_new_incarnation (void)
2194 {
2195         struct timeval tv;
2196
2197         /* The incarnation number is the time this module loaded and it
2198          * identifies this particular instance of the socknal.  Hopefully
2199          * we won't be able to reboot more frequently than 1MHz for the
2200          * forseeable future :) */
2201
2202         do_gettimeofday(&tv);
2203
2204         return (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
2205 }
2206
2207 int
2208 ksocknal_base_startup (void)
2209 {
2210         int               rc;
2211         int               i;
2212
2213         LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
2214         LASSERT (ksocknal_data.ksnd_nnets == 0);
2215
2216         memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
2217
2218         ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
2219         LIBCFS_ALLOC (ksocknal_data.ksnd_peers,
2220                       sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
2221         if (ksocknal_data.ksnd_peers == NULL)
2222                 return -ENOMEM;
2223
2224         for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
2225                 CFS_INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
2226
2227         rwlock_init(&ksocknal_data.ksnd_global_lock);
2228
2229         spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
2230         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
2231         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
2232         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
2233         cfs_waitq_init(&ksocknal_data.ksnd_reaper_waitq);
2234
2235         spin_lock_init (&ksocknal_data.ksnd_connd_lock);
2236         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_connd_connreqs);
2237         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_connd_routes);
2238         cfs_waitq_init(&ksocknal_data.ksnd_connd_waitq);
2239
2240         spin_lock_init (&ksocknal_data.ksnd_tx_lock);
2241         CFS_INIT_LIST_HEAD (&ksocknal_data.ksnd_idle_noop_txs);
2242
2243         /* NB memset above zeros whole of ksocknal_data, including
2244          * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
2245
2246         /* flag lists/ptrs/locks initialised */
2247         ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
2248         PORTAL_MODULE_USE;
2249
2250         ksocknal_data.ksnd_nschedulers = ksocknal_nsched();
2251         LIBCFS_ALLOC(ksocknal_data.ksnd_schedulers,
2252                      sizeof(ksock_sched_t) * ksocknal_data.ksnd_nschedulers);
2253         if (ksocknal_data.ksnd_schedulers == NULL)
2254                 goto failed;
2255
2256         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2257                 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
2258
2259                 spin_lock_init (&kss->kss_lock);
2260                 CFS_INIT_LIST_HEAD (&kss->kss_rx_conns);
2261                 CFS_INIT_LIST_HEAD (&kss->kss_tx_conns);
2262                 CFS_INIT_LIST_HEAD (&kss->kss_zombie_noop_txs);
2263                 cfs_waitq_init (&kss->kss_waitq);
2264         }
2265
2266         for (i = 0; i < ksocknal_data.ksnd_nschedulers; i++) {
2267                 rc = ksocknal_thread_start (ksocknal_scheduler,
2268                                             &ksocknal_data.ksnd_schedulers[i]);
2269                 if (rc != 0) {
2270                         CERROR("Can't spawn socknal scheduler[%d]: %d\n",
2271                                i, rc);
2272                         goto failed;
2273                 }
2274         }
2275
2276         /* must have at least 2 connds to remain responsive to accepts while
2277          * connecting */
2278         if (*ksocknal_tunables.ksnd_nconnds < 2)
2279                 *ksocknal_tunables.ksnd_nconnds = 2;
2280         
2281         for (i = 0; i < *ksocknal_tunables.ksnd_nconnds; i++) {
2282                 rc = ksocknal_thread_start (ksocknal_connd, (void *)((long)i));
2283                 if (rc != 0) {
2284                         CERROR("Can't spawn socknal connd: %d\n", rc);
2285                         goto failed;
2286                 }
2287         }
2288
2289         rc = ksocknal_thread_start (ksocknal_reaper, NULL);
2290         if (rc != 0) {
2291                 CERROR ("Can't spawn socknal reaper: %d\n", rc);
2292                 goto failed;
2293         }
2294
2295         /* flag everything initialised */
2296         ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
2297
2298         return 0;
2299
2300  failed:
2301         ksocknal_base_shutdown();
2302         return -ENETDOWN;
2303 }
2304
2305 void
2306 ksocknal_shutdown (lnet_ni_t *ni)
2307 {
2308         ksock_net_t      *net = ni->ni_data;
2309         int               i;
2310         lnet_process_id_t  anyid = {.nid = LNET_NID_ANY,
2311                                    .pid = LNET_PID_ANY};
2312
2313         LASSERT(ksocknal_data.ksnd_init == SOCKNAL_INIT_ALL);
2314         LASSERT(ksocknal_data.ksnd_nnets > 0);
2315
2316         spin_lock_bh (&net->ksnn_lock);
2317         net->ksnn_shutdown = 1;                 /* prevent new peers */
2318         spin_unlock_bh (&net->ksnn_lock);
2319
2320         /* Delete all peers */
2321         ksocknal_del_peer(ni, anyid, 0);
2322
2323         /* Wait for all peer state to clean up */
2324         i = 2;
2325         spin_lock_bh (&net->ksnn_lock);
2326         while (net->ksnn_npeers != 0) {
2327                 spin_unlock_bh (&net->ksnn_lock);
2328
2329                 i++;
2330                 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
2331                        "waiting for %d peers to disconnect\n",
2332                        net->ksnn_npeers);
2333                 cfs_pause(cfs_time_seconds(1));
2334
2335                 spin_lock_bh (&net->ksnn_lock);
2336         }
2337         spin_unlock_bh (&net->ksnn_lock);
2338
2339         for (i = 0; i < net->ksnn_ninterfaces; i++) {
2340                 LASSERT (net->ksnn_interfaces[i].ksni_npeers == 0);
2341                 LASSERT (net->ksnn_interfaces[i].ksni_nroutes == 0);
2342         }
2343
2344         LIBCFS_FREE(net, sizeof(*net));
2345         
2346         ksocknal_data.ksnd_nnets--;
2347         if (ksocknal_data.ksnd_nnets == 0)
2348                 ksocknal_base_shutdown();
2349 }
2350
2351 int
2352 ksocknal_enumerate_interfaces(ksock_net_t *net)
2353 {
2354         char      **names;
2355         int         i;
2356         int         j;
2357         int         rc;
2358         int         n;
2359                 
2360         n = libcfs_ipif_enumerate(&names);
2361         if (n <= 0) {
2362                 CERROR("Can't enumerate interfaces: %d\n", n);
2363                 return n;
2364         }
2365
2366         for (i = j = 0; i < n; i++) {
2367                 int        up;
2368                 __u32      ip;
2369                 __u32      mask;
2370
2371                 if (!strcmp(names[i], "lo")) /* skip the loopback IF */
2372                         continue;
2373
2374                 rc = libcfs_ipif_query(names[i], &up, &ip, &mask);
2375                 if (rc != 0) {
2376                         CWARN("Can't get interface %s info: %d\n",
2377                               names[i], rc);
2378                         continue;
2379                 }
2380                 
2381                 if (!up) {
2382                         CWARN("Ignoring interface %s (down)\n",
2383                               names[i]);
2384                         continue;
2385                 }
2386
2387                 if (j == LNET_MAX_INTERFACES) {
2388                         CWARN("Ignoring interface %s (too many interfaces)\n",
2389                               names[i]);
2390                         continue;
2391                 }
2392
2393                 net->ksnn_interfaces[j].ksni_ipaddr = ip;
2394                 net->ksnn_interfaces[j].ksni_netmask = mask;
2395                 j++;
2396         }
2397
2398         libcfs_ipif_free_enumeration(names, n);
2399         
2400         if (j == 0)
2401                 CERROR("Can't find any usable interfaces\n");
2402         
2403         return j;
2404 }
2405
2406 int
2407 ksocknal_startup (lnet_ni_t *ni)
2408 {
2409         ksock_net_t  *net;
2410         int           rc;
2411         int           i;
2412
2413         LASSERT (ni->ni_lnd == &the_ksocklnd);
2414
2415         if (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING) {
2416                 rc = ksocknal_base_startup();
2417                 if (rc != 0)
2418                         return rc;
2419         }
2420         
2421         LIBCFS_ALLOC(net, sizeof(*net));
2422         if (net == NULL)
2423                 goto fail_0;
2424                 
2425         memset(net, 0, sizeof(*net));
2426         spin_lock_init(&net->ksnn_lock);
2427         net->ksnn_incarnation = ksocknal_new_incarnation();
2428         ni->ni_data = net;
2429         ni->ni_maxtxcredits = *ksocknal_tunables.ksnd_credits;
2430         ni->ni_peertxcredits = *ksocknal_tunables.ksnd_peercredits;
2431         
2432         if (ni->ni_interfaces[0] == NULL) {
2433                 rc = ksocknal_enumerate_interfaces(net);
2434                 if (rc <= 0)
2435                         goto fail_1;
2436
2437                 net->ksnn_ninterfaces = 1;
2438         } else {
2439                 for (i = 0; i < LNET_MAX_INTERFACES; i++) {
2440                         int    up;
2441
2442                         if (ni->ni_interfaces[i] == NULL)
2443                                 break;
2444
2445                         rc = libcfs_ipif_query(
2446                                 ni->ni_interfaces[i], &up,
2447                                 &net->ksnn_interfaces[i].ksni_ipaddr,
2448                                 &net->ksnn_interfaces[i].ksni_netmask);
2449                         
2450                         if (rc != 0) {
2451                                 CERROR("Can't get interface %s info: %d\n",
2452                                        ni->ni_interfaces[i], rc);
2453                                 goto fail_1;
2454                         }
2455                         
2456                         if (!up) {
2457                                 CERROR("Interface %s is down\n",
2458                                        ni->ni_interfaces[i]);
2459                                 goto fail_1;
2460                         }
2461                 }
2462                 net->ksnn_ninterfaces = i;
2463         }
2464
2465         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid),
2466                                 net->ksnn_interfaces[0].ksni_ipaddr);
2467
2468         ksocknal_data.ksnd_nnets++;
2469
2470         return 0;
2471         
2472  fail_1:
2473         LIBCFS_FREE(net, sizeof(*net));
2474  fail_0:
2475         if (ksocknal_data.ksnd_nnets == 0)
2476                 ksocknal_base_shutdown();
2477
2478         return -ENETDOWN;
2479 }
2480
2481
2482 void __exit
2483 ksocknal_module_fini (void)
2484 {
2485         lnet_unregister_lnd(&the_ksocklnd);
2486         ksocknal_lib_tunables_fini();
2487 }
2488
2489 int __init
2490 ksocknal_module_init (void)
2491 {
2492         int    rc;
2493
2494         /* check ksnr_connected/connecting field large enough */
2495         CLASSERT(SOCKLND_CONN_NTYPES <= 4);
2496         
2497         rc = ksocknal_lib_tunables_init();
2498         if (rc != 0)
2499                 return rc;
2500
2501         lnet_register_lnd(&the_ksocklnd);
2502
2503         return 0;
2504 }
2505
2506 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2507 MODULE_DESCRIPTION("Kernel TCP Socket LND v2.0.0");
2508 MODULE_LICENSE("GPL");
2509
2510 cfs_module(ksocknal, "2.0.0", ksocknal_module_init, ksocknal_module_fini);