1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5 * Author: Zach Brown <zab@zabbo.net>
6 * Author: Peter J. Braam <braam@clusterfs.com>
7 * Author: Phil Schwan <phil@clusterfs.com>
8 * Author: Eric Barton <eric@bartonsoftware.com>
10 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
12 * Portals is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Portals is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Portals; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
/* NOTE(review): this listing embeds original line numbers and has interior
 * lines elided (gaps in the numbering); annotations only, no code repaired. */
/* Global socknal state, the portals NI handle, and the tunables exposed
 * through sysctl below. */
29 ksock_nal_data_t ksocknal_data;
30 ptl_handle_ni_t ksocknal_ni;
31 ksock_tunables_t ksocknal_tunables;
/* Interface registered with the portals router (kpr): argument cookie,
 * packet-forwarding and gateway-notify callbacks.  Uses the old GCC
 * "field:" initializer syntax; closing brace elided in this listing. */
33 kpr_nal_interface_t ksocknal_router_interface = {
35 kprni_arg: &ksocknal_data,
36 kprni_fwd: ksocknal_fwd_packet,
37 kprni_notify: ksocknal_notify,
/* Numeric ids for the socknal sysctl directory and its entries. */
41 #define SOCKNAL_SYSCTL 200
43 #define SOCKNAL_SYSCTL_TIMEOUT 1
44 #define SOCKNAL_SYSCTL_EAGER_ACK 2
45 #define SOCKNAL_SYSCTL_ZERO_COPY 3
46 #define SOCKNAL_SYSCTL_TYPED 4
47 #define SOCKNAL_SYSCTL_MIN_BULK 5
/* sysctl table exposing the tunables as writable (0644) integer files,
 * all handled by proc_dointvec.  NOTE(review): the "zero_copy" entry is
 * backed by ksnd_zc_min_frag — presumably the minimum fragment size that
 * triggers zero-copy, not an on/off flag; confirm against the header.
 * Terminating null entries appear to be elided from this listing. */
49 static ctl_table ksocknal_ctl_table[] = {
50 {SOCKNAL_SYSCTL_TIMEOUT, "timeout",
51 &ksocknal_tunables.ksnd_io_timeout, sizeof (int),
52 0644, NULL, &proc_dointvec},
53 {SOCKNAL_SYSCTL_EAGER_ACK, "eager_ack",
54 &ksocknal_tunables.ksnd_eager_ack, sizeof (int),
55 0644, NULL, &proc_dointvec},
57 {SOCKNAL_SYSCTL_ZERO_COPY, "zero_copy",
58 &ksocknal_tunables.ksnd_zc_min_frag, sizeof (int),
59 0644, NULL, &proc_dointvec},
61 {SOCKNAL_SYSCTL_TYPED, "typed",
62 &ksocknal_tunables.ksnd_typed_conns, sizeof (int),
63 0644, NULL, &proc_dointvec},
64 {SOCKNAL_SYSCTL_MIN_BULK, "min_bulk",
65 &ksocknal_tunables.ksnd_min_bulk, sizeof (int),
66 0644, NULL, &proc_dointvec},
/* Top-level table: a read-only (0555) "socknal" directory holding the
 * entries above. */
70 static ctl_table ksocknal_top_ctl_table[] = {
71 {SOCKNAL_SYSCTL, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
/* Record this node's NID in the lib NI after module load.  The FIXME
 * below explains why this cannot happen at lib_init() time.  Body is
 * partially elided in this listing (return/braces not visible). */
77 ksocknal_set_mynid(ptl_nid_t nid)
79 lib_ni_t *ni = &ksocknal_lib.libnal_ni;
81 /* FIXME: we have to do this because we call lib_init() at module
82 * insertion time, which is before we have 'mynid' available. lib_init
83 * sets the NAL's nid, which it uses to tell other nodes where packets
84 * are coming from. This is not a very graceful solution to this
87 CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
/* Bind a hardware IRQ's handling to the CPU of the scheduler recorded in
 * ksnd_irqinfo, exactly once per IRQ (ksni_bound guards re-binding).
 * Compiled only on SMP kernels with CPU_AFFINITY; irq 0 means a software
 * NIC and is skipped.  The affinity is set by spawning a shell that
 * writes to /proc/irq/<n>/smp_affinity — see the FIXME. */
95 ksocknal_bind_irq (unsigned int irq)
97 #if (defined(CONFIG_SMP) && CPU_AFFINITY)
101 ksock_irqinfo_t *info;
102 char *argv[] = {"/bin/sh",
106 char *envp[] = {"HOME=/",
107 "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
110 LASSERT (irq < NR_IRQS);
111 if (irq == 0) /* software NIC */
114 info = &ksocknal_data.ksnd_irqinfo[irq];
116 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
118 LASSERT (info->ksni_valid);
119 bind = !info->ksni_bound;
120 info->ksni_bound = 1;
122 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
124 if (!bind) /* bound already */
127 snprintf (cmdline, sizeof (cmdline),
128 "echo %d > /proc/irq/%u/smp_affinity", 1 << info->ksni_sched, irq);
130 printk (KERN_INFO "Lustre: Binding irq %u to CPU %d with cmd: %s\n",
131 irq, info->ksni_sched, cmdline);
133 /* FIXME: Find a better method of setting IRQ affinity...
136 USERMODEHELPER(argv[0], argv, envp);
/* Allocate and initialise an autoconnect route to ipaddr:port.  Starts
 * with one reference for the caller, no peer attached, and the minimum
 * reconnect interval.  Allocation-failure path elided in this listing. */
141 ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
142 int irq_affinity, int eager)
144 ksock_route_t *route;
146 PORTAL_ALLOC (route, sizeof (*route));
150 atomic_set (&route->ksnr_refcount, 1);
151 route->ksnr_sharecount = 0;
152 route->ksnr_peer = NULL;
153 route->ksnr_timeout = jiffies;
154 route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
155 route->ksnr_ipaddr = ipaddr;
156 route->ksnr_port = port;
157 route->ksnr_buffer_size = buffer_size;
158 route->ksnr_irq_affinity = irq_affinity;
159 route->ksnr_eager = eager;
160 route->ksnr_connecting = 0;
161 route->ksnr_connected = 0;
162 route->ksnr_deleted = 0;
163 route->ksnr_conn_count = 0;
/* Free a route whose refcount has dropped; releases its reference on the
 * peer (if attached) before freeing the structure itself. */
169 ksocknal_destroy_route (ksock_route_t *route)
171 LASSERT (route->ksnr_sharecount == 0);
173 if (route->ksnr_peer != NULL)
174 ksocknal_put_peer (route->ksnr_peer);
176 PORTAL_FREE (route, sizeof (*route));
/* Drop one reference on a route; destroys it when the count hits zero. */
180 ksocknal_put_route (ksock_route_t *route)
182 CDEBUG (D_OTHER, "putting route[%p] (%d)\n",
183 route, atomic_read (&route->ksnr_refcount));
185 LASSERT (atomic_read (&route->ksnr_refcount) > 0);
186 if (!atomic_dec_and_test (&route->ksnr_refcount))
189 ksocknal_destroy_route (route);
/* Allocate a zeroed peer for 'nid' with one caller reference and empty
 * conn/route/tx lists, and bump the global peer count.  Allocation
 * failure path elided in this listing. */
193 ksocknal_create_peer (ptl_nid_t nid)
197 LASSERT (nid != PTL_NID_ANY);
199 PORTAL_ALLOC (peer, sizeof (*peer));
203 memset (peer, 0, sizeof (*peer));
205 peer->ksnp_nid = nid;
206 atomic_set (&peer->ksnp_refcount, 1); /* 1 ref for caller */
207 peer->ksnp_closing = 0;
208 INIT_LIST_HEAD (&peer->ksnp_conns);
209 INIT_LIST_HEAD (&peer->ksnp_routes);
210 INIT_LIST_HEAD (&peer->ksnp_tx_queue);
212 atomic_inc (&ksocknal_data.ksnd_npeers);
/* Free a peer whose refcount reached zero; all of its lists must already
 * be empty (conns/routes hold peer refs, so zero refs implies cleanup). */
217 ksocknal_destroy_peer (ksock_peer_t *peer)
219 CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
221 LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
222 LASSERT (list_empty (&peer->ksnp_conns));
223 LASSERT (list_empty (&peer->ksnp_routes));
224 LASSERT (list_empty (&peer->ksnp_tx_queue));
226 PORTAL_FREE (peer, sizeof (*peer));
228 /* NB a peer's connections and autoconnect routes keep a reference
229 * on their peer until they are destroyed, so we can be assured
230 * that _all_ state to do with this peer has been cleaned up when
231 * its refcount drops to zero. */
232 atomic_dec (&ksocknal_data.ksnd_npeers);
/* Drop one reference on a peer; destroys it when the count hits zero. */
236 ksocknal_put_peer (ksock_peer_t *peer)
238 CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
239 peer, peer->ksnp_nid,
240 atomic_read (&peer->ksnp_refcount));
242 LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
243 if (!atomic_dec_and_test (&peer->ksnp_refcount))
246 ksocknal_destroy_peer (peer);
/* Look up 'nid' in its peer hash chain.  Caller must hold
 * ksnd_global_lock (read or write).  Does NOT take a reference; the
 * not-found return path is elided in this listing. */
250 ksocknal_find_peer_locked (ptl_nid_t nid)
252 struct list_head *peer_list = ksocknal_nid2peerlist (nid);
253 struct list_head *tmp;
256 list_for_each (tmp, peer_list) {
258 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
260 LASSERT (!peer->ksnp_closing);
261 LASSERT (!(list_empty (&peer->ksnp_routes) &&
262 list_empty (&peer->ksnp_conns)));
264 if (peer->ksnp_nid != nid)
267 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
268 peer, nid, atomic_read (&peer->ksnp_refcount));
/* Locked wrapper around the lookup above that takes a reference for the
 * caller when the peer is found. */
275 ksocknal_get_peer (ptl_nid_t nid)
279 read_lock (&ksocknal_data.ksnd_global_lock);
280 peer = ksocknal_find_peer_locked (nid);
281 if (peer != NULL) /* +1 ref for caller? */
282 atomic_inc (&peer->ksnp_refcount);
283 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Remove the peer from the hash table and drop the table's reference;
 * marks it closing first so concurrent lookups assert/skip it.  Caller
 * holds ksnd_global_lock exclusively. */
289 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
291 LASSERT (!peer->ksnp_closing);
292 peer->ksnp_closing = 1;
293 list_del (&peer->ksnp_list);
294 /* lose peerlist's ref */
295 ksocknal_put_peer (peer);
/* Walk every peer hash chain and every peer's route list to find the
 * index'th route overall; returns it with a reference held (the index
 * countdown and return statements are elided in this listing).  Used by
 * ksocknal_cmd/NAL_CMD_GET_AUTOCONN to enumerate autoconnect routes. */
299 ksocknal_get_route_by_idx (int index)
302 struct list_head *ptmp;
303 ksock_route_t *route;
304 struct list_head *rtmp;
307 read_lock (&ksocknal_data.ksnd_global_lock);
309 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
310 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
311 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
313 LASSERT (!(list_empty (&peer->ksnp_routes) &&
314 list_empty (&peer->ksnp_conns)));
316 list_for_each (rtmp, &peer->ksnp_routes) {
320 route = list_entry (rtmp, ksock_route_t, ksnr_list);
321 atomic_inc (&route->ksnr_refcount);
322 read_unlock (&ksocknal_data.ksnd_global_lock);
328 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Add an autoconnect route nid -> ipaddr:port.  Optimistically creates a
 * peer and a route, then under the write lock either installs the new
 * peer or reuses an existing one, and either links the new route or just
 * bumps the share count on an existing route to the same ipaddr.
 * Several error/else branches are elided in this listing. */
333 ksocknal_add_route (ptl_nid_t nid, __u32 ipaddr, int port, int bufnob,
334 int bind_irq, int share, int eager)
339 ksock_route_t *route;
340 struct list_head *rtmp;
341 ksock_route_t *route2;
343 if (nid == PTL_NID_ANY)
346 /* Have a brand new peer ready... */
347 peer = ksocknal_create_peer (nid);
351 route = ksocknal_create_route (ipaddr, port, bufnob,
354 ksocknal_put_peer (peer);
358 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
360 peer2 = ksocknal_find_peer_locked (nid);
362 ksocknal_put_peer (peer);
365 /* peer table takes existing ref on peer */
366 list_add (&peer->ksnp_list,
367 ksocknal_nid2peerlist (nid));
372 /* check for existing route to this NID via this ipaddr */
373 list_for_each (rtmp, &peer->ksnp_routes) {
374 route2 = list_entry (rtmp, ksock_route_t, ksnr_list);
376 if (route2->ksnr_ipaddr == ipaddr)
383 if (route2 != NULL) {
/* Duplicate: drop the speculatively-created route. */
384 ksocknal_put_route (route);
387 /* route takes a ref on peer */
388 route->ksnr_peer = peer;
389 atomic_inc (&peer->ksnp_refcount);
390 /* peer's route list takes existing ref on route */
391 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
394 route->ksnr_sharecount++;
396 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
/* Remove one route under the write lock.  With 'share' the share count
 * is decremented and the route survives while shares remain; otherwise
 * it is forced to zero.  Connections made via the route are closed
 * unless keep_conn, in which case they are merely dissociated.  Unlinks
 * the peer too if this was its last route and it has no conns. */
402 ksocknal_del_route_locked (ksock_route_t *route, int share, int keep_conn)
404 ksock_peer_t *peer = route->ksnr_peer;
406 struct list_head *ctmp;
407 struct list_head *cnxt;
410 route->ksnr_sharecount = 0;
412 route->ksnr_sharecount--;
413 if (route->ksnr_sharecount != 0)
417 list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
418 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
420 if (conn->ksnc_route != route)
424 ksocknal_close_conn_locked (conn, 0);
428 /* keeping the conn; just dissociate it and route... */
429 conn->ksnc_route = NULL;
430 ksocknal_put_route (route); /* drop conn's ref on route */
433 route->ksnr_deleted = 1;
434 list_del (&route->ksnr_list);
435 ksocknal_put_route (route); /* drop peer's ref */
437 if (list_empty (&peer->ksnp_routes) &&
438 list_empty (&peer->ksnp_conns)) {
439 /* I've just removed the last autoconnect route of a peer
440 * with no active connections */
441 ksocknal_unlink_peer_locked (peer);
/* Delete all routes matching nid/ipaddr (PTL_NID_ANY scans the whole
 * hash; otherwise just that nid's chain).  Returns 0 if anything
 * matched — presumably -ENOENT otherwise; the init of 'rc' is elided. */
446 ksocknal_del_route (ptl_nid_t nid, __u32 ipaddr, int share, int keep_conn)
449 struct list_head *ptmp;
450 struct list_head *pnxt;
452 struct list_head *rtmp;
453 struct list_head *rnxt;
454 ksock_route_t *route;
460 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
462 if (nid != PTL_NID_ANY)
463 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
466 hi = ksocknal_data.ksnd_peer_hash_size - 1;
469 for (i = lo; i <= hi; i++) {
470 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
471 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
473 if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
476 list_for_each_safe (rtmp, rnxt, &peer->ksnp_routes) {
477 route = list_entry (rtmp, ksock_route_t,
481 route->ksnr_ipaddr == ipaddr))
484 ksocknal_del_route_locked (route, share, keep_conn);
485 rc = 0; /* matched something */
492 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
/* Find the index'th connection across all peers and return it with a
 * reference held (index countdown and returns elided in this listing).
 * Mirrors ksocknal_get_route_by_idx; used by NAL_CMD_GET_CONN. */
498 ksocknal_get_conn_by_idx (int index)
501 struct list_head *ptmp;
503 struct list_head *ctmp;
506 read_lock (&ksocknal_data.ksnd_global_lock);
508 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
509 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
510 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
512 LASSERT (!(list_empty (&peer->ksnp_routes) &&
513 list_empty (&peer->ksnp_conns)));
515 list_for_each (ctmp, &peer->ksnp_conns) {
519 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
520 atomic_inc (&conn->ksnc_refcount);
521 read_unlock (&ksocknal_data.ksnd_global_lock);
527 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Fill in conn->ksnc_ipaddr/ksnc_port (host byte order) from the
 * socket's remote address via getname(..., 2 == peer name). */
532 ksocknal_get_peer_addr (ksock_conn_t *conn)
534 struct sockaddr_in sin;
535 int len = sizeof (sin);
538 rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
539 (struct sockaddr *)&sin, &len, 2);
540 /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
541 LASSERT (!conn->ksnc_closing);
542 LASSERT (len <= sizeof (sin));
545 CERROR ("Error %d getting sock peer IP\n", rc);
549 conn->ksnc_ipaddr = ntohl (sin.sin_addr.s_addr);
550 conn->ksnc_port = ntohs (sin.sin_port);
/* Determine the IRQ feeding this connection's NIC by looking at the
 * socket's routing destination device (irq extraction and the 0/software
 * fallback path are elided in this listing). */
554 ksocknal_conn_irq (ksock_conn_t *conn)
557 struct dst_entry *dst;
559 dst = sk_dst_get (conn->ksnc_sock->sk);
561 if (dst->dev != NULL) {
563 if (irq >= NR_IRQS) {
564 CERROR ("Unexpected IRQ %x\n", irq);
571 /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
572 LASSERT (!conn->ksnc_closing);
/* Pick a scheduler for a new connection.  A hardware IRQ that already
 * has a scheduler keeps it (cache affinity); otherwise choose the
 * scheduler with the fewest connections and, for hardware IRQs, record
 * the pairing in ksnd_irqinfo for next time.  Caller holds
 * ksnd_global_lock exclusively (per the function's name). */
577 ksocknal_choose_scheduler_locked (unsigned int irq)
579 ksock_sched_t *sched;
580 ksock_irqinfo_t *info;
583 LASSERT (irq < NR_IRQS);
584 info = &ksocknal_data.ksnd_irqinfo[irq];
586 if (irq != 0 && /* hardware NIC */
587 info->ksni_valid) { /* already set up */
588 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
591 /* software NIC (irq == 0) || not associated with a scheduler yet.
592 * Choose the CPU with the fewest connections... */
593 sched = &ksocknal_data.ksnd_schedulers[0];
594 for (i = 1; i < SOCKNAL_N_SCHED; i++)
595 if (sched->kss_nconns >
596 ksocknal_data.ksnd_schedulers[i].kss_nconns)
597 sched = &ksocknal_data.ksnd_schedulers[i];
599 if (irq != 0) { /* Hardware NIC */
600 info->ksni_valid = 1;
601 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
604 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
/* Create a connection on an established socket.  route != NULL means an
 * autoconnect (the expected nid comes from the route); route == NULL is
 * an acceptor or explicit connect, where the HELLO exchange supplies the
 * nid/type and a fresh peer may be created.  Installs the socknal socket
 * callbacks, drains any queued tx for the peer onto the new conn, closes
 * stale conns from earlier peer incarnations, and optionally binds the
 * NIC's IRQ.  Many error-return branches are elided in this listing. */
611 ksocknal_create_conn (ksock_route_t *route, struct socket *sock,
612 int bind_irq, int type)
620 ksock_sched_t *sched;
625 /* NB, sock has an associated file since (a) this connection might
626 * have been created in userland and (b) we need to refcount the
627 * socket so that we don't close it while I/O is being done on
628 * it, and sock->file has that pre-cooked... */
629 LASSERT (sock->file != NULL);
630 LASSERT (file_count(sock->file) > 0);
632 rc = ksocknal_setup_sock (sock);
637 /* acceptor or explicit connect */
640 LASSERT (type != SOCKNAL_CONN_NONE);
641 /* autoconnect: expect this nid on exchange */
642 nid = route->ksnr_peer->ksnp_nid;
645 rc = ksocknal_hello (sock, &nid, &type, &incarnation);
650 if (route == NULL) { /* not autoconnect */
651 /* Assume this socket connects to a brand new peer */
652 peer = ksocknal_create_peer (nid);
657 PORTAL_ALLOC(conn, sizeof(*conn));
660 ksocknal_put_peer (peer);
/* Initialise the conn before taking the global lock. */
664 memset (conn, 0, sizeof (*conn));
665 conn->ksnc_peer = NULL;
666 conn->ksnc_route = NULL;
667 conn->ksnc_sock = sock;
668 conn->ksnc_type = type;
669 conn->ksnc_incarnation = incarnation;
670 conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
671 conn->ksnc_saved_write_space = sock->sk->sk_write_space;
672 atomic_set (&conn->ksnc_refcount, 1); /* 1 ref for me */
674 conn->ksnc_rx_ready = 0;
675 conn->ksnc_rx_scheduled = 0;
676 ksocknal_new_packet (conn, 0);
678 INIT_LIST_HEAD (&conn->ksnc_tx_queue);
679 conn->ksnc_tx_ready = 0;
680 conn->ksnc_tx_scheduled = 0;
681 atomic_set (&conn->ksnc_tx_nob, 0);
683 ksocknal_get_peer_addr (conn);
685 CWARN("New conn nid:"LPX64" ip:%08x/%d incarnation:"LPX64"\n",
686 nid, conn->ksnc_ipaddr, conn->ksnc_port, incarnation);
688 irq = ksocknal_conn_irq (conn);
690 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
/* Autoconnect path: this (route,type) must be mid-connect. */
694 LASSERT ((route->ksnr_connected & (1 << type)) == 0);
695 LASSERT ((route->ksnr_connecting & (1 << type)) != 0);
697 if (route->ksnr_deleted) {
698 /* This conn was autoconnected, but the autoconnect
699 * route got deleted while it was being
701 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock,
703 PORTAL_FREE (conn, sizeof (*conn));
708 /* associate conn/route */
709 conn->ksnc_route = route;
710 atomic_inc (&route->ksnr_refcount);
712 route->ksnr_connecting &= ~(1 << type);
713 route->ksnr_connected |= (1 << type);
714 route->ksnr_conn_count++;
715 route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
717 peer = route->ksnr_peer;
719 /* Not an autoconnected connection; see if there is an
720 * existing peer for this NID */
721 peer2 = ksocknal_find_peer_locked (nid);
723 ksocknal_put_peer (peer);
726 list_add (&peer->ksnp_list,
727 ksocknal_nid2peerlist (nid));
728 /* peer list takes over existing ref */
732 LASSERT (!peer->ksnp_closing);
734 conn->ksnc_peer = peer;
735 atomic_inc (&peer->ksnp_refcount);
736 peer->ksnp_last_alive = jiffies;
737 peer->ksnp_error = 0;
739 /* Set the deadline for the outgoing HELLO to drain */
740 conn->ksnc_tx_deadline = jiffies +
741 ksocknal_tunables.ksnd_io_timeout * HZ;
743 list_add (&conn->ksnc_list, &peer->ksnp_conns);
744 atomic_inc (&conn->ksnc_refcount);
746 sched = ksocknal_choose_scheduler_locked (irq);
748 conn->ksnc_scheduler = sched;
750 /* NB my callbacks block while I hold ksnd_global_lock */
751 sock->sk->sk_user_data = conn;
752 sock->sk->sk_data_ready = ksocknal_data_ready;
753 sock->sk->sk_write_space = ksocknal_write_space;
755 /* Take all the packets blocking for a connection.
756 * NB, it might be nicer to share these blocked packets among any
757 * other connections that are becoming established, however that
758 * confuses the normal packet launching operation, which selects a
759 * connection and queues the packet on it without needing an
760 * exclusive lock on ksnd_global_lock. */
761 while (!list_empty (&peer->ksnp_tx_queue)) {
762 tx = list_entry (peer->ksnp_tx_queue.next,
763 ksock_tx_t, tx_list);
765 list_del (&tx->tx_list);
766 ksocknal_queue_tx_locked (tx, conn);
769 rc = ksocknal_close_stale_conns_locked (peer, incarnation);
771 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
774 CERROR ("Closed %d stale conns to nid "LPX64" ip %d.%d.%d.%d\n",
775 rc, conn->ksnc_peer->ksnp_nid,
776 HIPQUAD(conn->ksnc_ipaddr));
778 if (bind_irq) /* irq binding required */
779 ksocknal_bind_irq (irq);
781 /* Call the callbacks right now to get things going. */
782 ksocknal_data_ready (sock->sk, 0);
783 ksocknal_write_space (sock->sk);
785 CDEBUG(D_IOCTL, "conn [%p] registered for nid "LPX64" ip %d.%d.%d.%d\n",
786 conn, conn->ksnc_peer->ksnp_nid, HIPQUAD(conn->ksnc_ipaddr));
/* Drop my ref; the peer's conn list keeps the conn alive. */
788 ksocknal_put_conn (conn);
793 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
795 /* This just does the immmediate housekeeping, and queues the
796 * connection for the reaper to terminate.
797 * Caller holds ksnd_global_lock exclusively in irq context */
798 ksock_peer_t *peer = conn->ksnc_peer;
799 ksock_route_t *route;
801 LASSERT (peer->ksnp_error == 0);
802 LASSERT (!conn->ksnc_closing);
803 conn->ksnc_closing = 1;
804 atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
806 route = conn->ksnc_route;
808 /* dissociate conn from route... */
809 LASSERT (!route->ksnr_deleted);
810 LASSERT ((route->ksnr_connecting & (1 << conn->ksnc_type)) == 0);
811 LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);
813 route->ksnr_connected &= ~(1 << conn->ksnc_type);
814 conn->ksnc_route = NULL;
816 list_del (&route->ksnr_list); /* make route least favourite */
817 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
819 ksocknal_put_route (route); /* drop conn's ref on route */
822 /* ksnd_deathrow_conns takes over peer's ref */
823 list_del (&conn->ksnc_list);
825 if (list_empty (&peer->ksnp_conns)) {
826 /* No more connections to this peer */
828 peer->ksnp_error = error; /* stash last conn close reason */
830 if (list_empty (&peer->ksnp_routes)) {
831 /* I've just closed last conn belonging to a
832 * non-autoconnecting peer */
833 ksocknal_unlink_peer_locked (peer);
/* Hand the conn to the reaper via deathrow and wake it. */
837 spin_lock (&ksocknal_data.ksnd_reaper_lock);
839 list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
840 wake_up (&ksocknal_data.ksnd_reaper_waitq);
842 spin_unlock (&ksocknal_data.ksnd_reaper_lock);
846 ksocknal_terminate_conn (ksock_conn_t *conn)
848 /* This gets called by the reaper (guaranteed thread context) to
849 * disengage the socket from its callbacks and close it.
850 * ksnc_refcount will eventually hit zero, and then the reaper will
853 ksock_peer_t *peer = conn->ksnc_peer;
854 ksock_sched_t *sched = conn->ksnc_scheduler;
859 LASSERT(conn->ksnc_closing);
861 /* wake up the scheduler to "send" all remaining packets to /dev/null */
862 spin_lock_irqsave(&sched->kss_lock, flags);
864 if (!conn->ksnc_tx_scheduled &&
865 !list_empty(&conn->ksnc_tx_queue)){
866 list_add_tail (&conn->ksnc_tx_list,
867 &sched->kss_tx_conns);
868 /* a closing conn is always ready to tx */
869 conn->ksnc_tx_ready = 1;
870 conn->ksnc_tx_scheduled = 1;
871 /* extra ref for scheduler */
872 atomic_inc (&conn->ksnc_refcount);
874 wake_up (&sched->kss_waitq);
877 spin_unlock_irqrestore (&sched->kss_lock, flags);
879 /* serialise with callbacks */
880 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
882 /* Remove conn's network callbacks.
883 * NB I _have_ to restore the callback, rather than storing a noop,
884 * since the socket could survive past this module being unloaded!! */
885 conn->ksnc_sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
886 conn->ksnc_sock->sk->sk_write_space = conn->ksnc_saved_write_space;
888 /* A callback could be in progress already; they hold a read lock
889 * on ksnd_global_lock (to serialise with me) and NOOP if
890 * sk_user_data is NULL. */
891 conn->ksnc_sock->sk->sk_user_data = NULL;
893 /* OK, so this conn may not be completely disengaged from its
894 * scheduler yet, but it _has_ committed to terminate... */
895 conn->ksnc_scheduler->kss_nconns--;
897 if (peer->ksnp_error != 0) {
898 /* peer's last conn closed in error */
899 LASSERT (list_empty (&peer->ksnp_conns));
901 /* convert peer's last-known-alive timestamp from jiffies */
902 do_gettimeofday (&now);
903 then = now.tv_sec - (jiffies - peer->ksnp_last_alive)/HZ;
907 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
909 /* The socket is closed on the final put; either here, or in
910 * ksocknal_{send,recv}msg(). Since we set up the linger2 option
911 * when the connection was established, this will close the socket
912 * immediately, aborting anything buffered in it. Any hung
913 * zero-copy transmits will therefore complete in finite time. */
914 ksocknal_putconnsock (conn);
/* NOTE(review): this notify appears to report the peer down to the
 * router after an error close — its guarding condition is elided here. */
917 kpr_notify (&ksocknal_data.ksnd_router, peer->ksnp_nid,
922 ksocknal_destroy_conn (ksock_conn_t *conn)
924 /* Final coup-de-grace of the reaper */
925 CDEBUG (D_NET, "connection %p\n", conn);
927 LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
928 LASSERT (conn->ksnc_route == NULL);
929 LASSERT (!conn->ksnc_tx_scheduled);
930 LASSERT (!conn->ksnc_rx_scheduled);
931 LASSERT (list_empty(&conn->ksnc_tx_queue));
933 /* complete current receive if any */
934 switch (conn->ksnc_rx_state) {
935 case SOCKNAL_RX_BODY:
936 CERROR("Completing partial receive from "LPX64
937 ", ip %d.%d.%d.%d:%d, with error\n",
938 conn->ksnc_peer->ksnp_nid,
939 HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
940 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_FAIL);
942 case SOCKNAL_RX_BODY_FWD:
943 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
945 case SOCKNAL_RX_HEADER:
946 case SOCKNAL_RX_SLOP:
/* Drop the conn's ref on its peer, free it, and decrement the
 * closing-conns counter bumped in ksocknal_close_conn_locked. */
953 ksocknal_put_peer (conn->ksnc_peer);
955 PORTAL_FREE (conn, sizeof (*conn));
956 atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
/* Drop one reference on a conn; the last put hands the conn to the
 * reaper's zombie list for final destruction (not destroyed inline). */
960 ksocknal_put_conn (ksock_conn_t *conn)
964 CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
965 conn, conn->ksnc_peer->ksnp_nid,
966 atomic_read (&conn->ksnc_refcount));
968 LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
969 if (!atomic_dec_and_test (&conn->ksnc_refcount))
972 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
974 list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
975 wake_up (&ksocknal_data.ksnd_reaper_waitq);
977 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
/* Close all of a peer's conns matching 'ipaddr' (the ipaddr==0 wildcard
 * branch is elided here) with close reason 'why'.  Caller holds
 * ksnd_global_lock exclusively; presumably returns a match count. */
981 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
984 struct list_head *ctmp;
985 struct list_head *cnxt;
988 list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
989 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
992 conn->ksnc_ipaddr == ipaddr) {
994 ksocknal_close_conn_locked (conn, why);
/* Close every conn of the peer whose incarnation differs from the one
 * just learned via HELLO — i.e. conns left over from before the peer
 * rebooted.  Returns the count (return elided in this listing). */
1002 ksocknal_close_stale_conns_locked (ksock_peer_t *peer, __u64 incarnation)
1005 struct list_head *ctmp;
1006 struct list_head *cnxt;
1009 list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1010 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1012 if (conn->ksnc_incarnation == incarnation)
1015 CWARN("Closing stale conn nid:"LPX64" ip:%08x/%d "
1016 "incarnation:"LPX64"("LPX64")\n",
1017 peer->ksnp_nid, conn->ksnc_ipaddr, conn->ksnc_port,
1018 conn->ksnc_incarnation, incarnation);
1021 ksocknal_close_conn_locked (conn, -ESTALE);
/* Take the write lock and close this conn plus any siblings to the same
 * peer ip address, with close reason 'why'. */
1028 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why)
1030 ksock_peer_t *peer = conn->ksnc_peer;
1031 __u32 ipaddr = conn->ksnc_ipaddr;
1032 unsigned long flags;
1035 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1037 count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1039 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
/* Close all conns matching nid/ipaddr; PTL_NID_ANY and ipaddr==0 act as
 * wildcards.  Wildcard calls always return success; an exact match that
 * closed nothing returns -ENOENT. */
1045 ksocknal_close_matching_conns (ptl_nid_t nid, __u32 ipaddr)
1047 unsigned long flags;
1049 struct list_head *ptmp;
1050 struct list_head *pnxt;
1056 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1058 if (nid != PTL_NID_ANY)
1059 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
1062 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1065 for (i = lo; i <= hi; i++) {
1066 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1068 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1070 if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
1073 count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1077 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1079 /* wildcards always succeed */
1080 if (nid == PTL_NID_ANY || ipaddr == 0)
1083 return (count == 0 ? -ENOENT : 0);
/* Router callback (see ksocknal_router_interface): on gateway-down,
 * close all conns to the gateway; on gateway-up, do nothing — new
 * autoroute conns are established on demand. */
1087 ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive)
1089 /* The router is telling me she's been notified of a change in
1090 * gateway state.... */
1092 CDEBUG (D_NET, "gw "LPX64" %s\n", gw_nid, alive ? "up" : "down");
1095 /* If the gateway crashed, close all open connections... */
1096 ksocknal_close_matching_conns (gw_nid, 0);
1100 /* ...otherwise do nothing. We can only establish new connections
1101 * if we have autroutes, and these connect on demand. */
/* Kernel-version shim: locate the TCP private state inside struct sock.
 * Pre-2.5 kernels embed it as tp_pinfo.af_tcp; later kernels cast the
 * sock to struct tcp_sock (remainder of the 2.5+ variant elided). */
1104 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1105 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1107 return &(sk->tp_pinfo.af_tcp);
1110 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1112 struct tcp_sock *s = (struct tcp_sock *)sk;
/* "Push" a conn: flush anything Nagle is buffering by toggling
 * TCP_NODELAY via setsockopt, then restoring the saved nonagle setting.
 * Takes/puts the conn's socket ref to guard against shutdown. */
1118 ksocknal_push_conn (ksock_conn_t *conn)
1127 rc = ksocknal_getconnsock (conn);
1128 if (rc != 0) /* being shut down */
1131 sk = conn->ksnc_sock->sk;
1132 tp = sock2tcp_opt(sk);
1135 nonagle = tp->nonagle;
1142 rc = sk->sk_prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
1143 (char *)&val, sizeof (val));
1149 tp->nonagle = nonagle;
1152 ksocknal_putconnsock (conn);
/* Push every conn of one peer.  Re-walks the conn list by index on each
 * iteration, taking a conn ref under the read lock and dropping the lock
 * before pushing — conns may come and go between iterations.  Loop
 * termination (index past end) is elided in this listing. */
1156 ksocknal_push_peer (ksock_peer_t *peer)
1160 struct list_head *tmp;
1163 for (index = 0; ; index++) {
1164 read_lock (&ksocknal_data.ksnd_global_lock);
1169 list_for_each (tmp, &peer->ksnp_conns) {
1171 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1172 atomic_inc (&conn->ksnc_refcount);
1177 read_unlock (&ksocknal_data.ksnd_global_lock);
1182 ksocknal_push_conn (conn);
1183 ksocknal_put_conn (conn);
/* Push one peer's conns (specific nid) or every peer's (PTL_NID_ANY),
 * using the same ref-under-lock / work-outside-lock pattern as above. */
1188 ksocknal_push (ptl_nid_t nid)
1191 struct list_head *tmp;
1197 if (nid != PTL_NID_ANY) {
1198 peer = ksocknal_get_peer (nid);
1202 ksocknal_push_peer (peer);
1203 ksocknal_put_peer (peer);
1208 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1209 for (j = 0; ; j++) {
1210 read_lock (&ksocknal_data.ksnd_global_lock);
1215 list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1217 peer = list_entry(tmp, ksock_peer_t,
1219 atomic_inc (&peer->ksnp_refcount);
1224 read_unlock (&ksocknal_data.ksnd_global_lock);
1228 ksocknal_push_peer (peer);
1229 ksocknal_put_peer (peer);
/* NAL command dispatcher: implements the portals_cfg ioctl-style command
 * set (enumerate/add/delete autoconnect routes, enumerate/close conns,
 * register a connected fd, set my nid, push conns).  break/return and
 * default-case lines are elided in this listing. */
1239 ksocknal_cmd(struct portals_cfg *pcfg, void * private)
1243 LASSERT (pcfg != NULL);
1245 switch(pcfg->pcfg_command) {
1246 case NAL_CMD_GET_AUTOCONN: {
1247 ksock_route_t *route = ksocknal_get_route_by_idx (pcfg->pcfg_count);
/* Report the route's endpoint and flags back through pcfg. */
1253 pcfg->pcfg_nid = route->ksnr_peer->ksnp_nid;
1254 pcfg->pcfg_id = route->ksnr_ipaddr;
1255 pcfg->pcfg_misc = route->ksnr_port;
1256 pcfg->pcfg_count = route->ksnr_conn_count;
1257 pcfg->pcfg_size = route->ksnr_buffer_size;
1258 pcfg->pcfg_wait = route->ksnr_sharecount;
1259 pcfg->pcfg_flags = (route->ksnr_irq_affinity ? 2 : 0) |
1260 (route->ksnr_eager ? 4 : 0);
1261 ksocknal_put_route (route);
1265 case NAL_CMD_ADD_AUTOCONN: {
1266 rc = ksocknal_add_route (pcfg->pcfg_nid, pcfg->pcfg_id,
1267 pcfg->pcfg_misc, pcfg->pcfg_size,
1268 (pcfg->pcfg_flags & 0x02) != 0,
1269 (pcfg->pcfg_flags & 0x04) != 0,
1270 (pcfg->pcfg_flags & 0x08) != 0);
1273 case NAL_CMD_DEL_AUTOCONN: {
1274 rc = ksocknal_del_route (pcfg->pcfg_nid, pcfg->pcfg_id,
1275 (pcfg->pcfg_flags & 1) != 0,
1276 (pcfg->pcfg_flags & 2) != 0);
1279 case NAL_CMD_GET_CONN: {
1280 ksock_conn_t *conn = ksocknal_get_conn_by_idx (pcfg->pcfg_count);
1286 pcfg->pcfg_nid = conn->ksnc_peer->ksnp_nid;
1287 pcfg->pcfg_id = conn->ksnc_ipaddr;
1288 pcfg->pcfg_misc = conn->ksnc_port;
1289 pcfg->pcfg_flags = conn->ksnc_type;
1290 ksocknal_put_conn (conn);
1294 case NAL_CMD_REGISTER_PEER_FD: {
1295 struct socket *sock = sockfd_lookup (pcfg->pcfg_fd, &rc);
1296 int type = pcfg->pcfg_misc;
/* Only known conn types may be registered from userland. */
1302 case SOCKNAL_CONN_NONE:
1303 case SOCKNAL_CONN_ANY:
1304 case SOCKNAL_CONN_CONTROL:
1305 case SOCKNAL_CONN_BULK_IN:
1306 case SOCKNAL_CONN_BULK_OUT:
1307 rc = ksocknal_create_conn(NULL, sock, pcfg->pcfg_flags, type);
1315 case NAL_CMD_CLOSE_CONNECTION: {
1316 rc = ksocknal_close_matching_conns (pcfg->pcfg_nid,
1320 case NAL_CMD_REGISTER_MYNID: {
1321 rc = ksocknal_set_mynid (pcfg->pcfg_nid);
1324 case NAL_CMD_PUSH_CONNECTION: {
1325 rc = ksocknal_push (pcfg->pcfg_nid);
/* Free every idle forwarding message buffer (fmb) in a pool, including
 * each fmb's pages; pool must have no blocked conns or active fmbs. */
1334 ksocknal_free_fmbs (ksock_fmb_pool_t *p)
1336 int npages = p->fmp_buff_pages;
1340 LASSERT (list_empty(&p->fmp_blocked_conns));
1341 LASSERT (p->fmp_nactive_fmbs == 0);
1343 while (!list_empty(&p->fmp_idle_fmbs)) {
1345 fmb = list_entry(p->fmp_idle_fmbs.next,
1346 ksock_fmb_t, fmb_list);
1348 for (i = 0; i < npages; i++)
1349 if (fmb->fmb_kiov[i].kiov_page != NULL)
1350 __free_page(fmb->fmb_kiov[i].kiov_page);
1352 list_del(&fmb->fmb_list);
/* fmb was allocated with a flexible kiov tail of npages entries. */
1353 PORTAL_FREE(fmb, offsetof(ksock_fmb_t, fmb_kiov[npages]));
/* Release all remaining dynamically-allocated NAL buffers at shutdown:
 * both fmb pools, the scheduler array and the peer hash table. */
1358 ksocknal_free_buffers (void)
1360 ksocknal_free_fmbs(&ksocknal_data.ksnd_small_fmp);
1361 ksocknal_free_fmbs(&ksocknal_data.ksnd_large_fmp);
1363 LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_ltxs) == 0);
1365 if (ksocknal_data.ksnd_schedulers != NULL)
1366 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
1367 sizeof (ksock_sched_t) * SOCKNAL_N_SCHED);
1369 PORTAL_FREE (ksocknal_data.ksnd_peers,
1370 sizeof (struct list_head) *
1371 ksocknal_data.ksnd_peer_hash_size);
/* Tear the NAL down in strict reverse order of ksocknal_api_startup().
 * The switch dispatches on how far startup got (ksnd_init) and each
 * case deliberately falls through into the next to undo the earlier
 * initialisation stages as well. */
ksocknal_api_shutdown (nal_t *nal)
        if (nal->nal_refct != 0) {
                /* This module got the first ref */
                PORTAL_MODULE_UNUSE;

        CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
               atomic_read (&portal_kmemory));

        LASSERT(nal == &ksocknal_api);

        switch (ksocknal_data.ksnd_init) {
        case SOCKNAL_INIT_ALL:
                /* stop accepting new commands first */
                libcfs_nal_cmd_unregister(SOCKNAL);
                ksocknal_data.ksnd_init = SOCKNAL_INIT_LIB;
                /* fall through */
        case SOCKNAL_INIT_LIB:
                /* No more calls to ksocknal_cmd() to create new
                 * autoroutes/connections since we're being unloaded. */

                /* Delete all autoroute entries */
                ksocknal_del_route(PTL_NID_ANY, 0, 0, 0);

                /* Delete all connections */
                ksocknal_close_matching_conns (PTL_NID_ANY, 0);

                /* Wait for all peer state to clean up */
                while (atomic_read (&ksocknal_data.ksnd_npeers) != 0) {
                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                               "waiting for %d peers to disconnect\n",
                               atomic_read (&ksocknal_data.ksnd_npeers));
                        /* poll roughly once a second rather than spin */
                        set_current_state (TASK_UNINTERRUPTIBLE);
                        schedule_timeout (HZ);

                /* Tell lib we've stopped calling into her. */
                lib_fini(&ksocknal_lib);

                ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
                /* fall through */
        case SOCKNAL_INIT_DATA:
                /* Module refcount only gets to zero when all peers
                 * have been closed so all lists must be empty */
                LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
                LASSERT (ksocknal_data.ksnd_peers != NULL);
                for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
                        LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
                LASSERT (list_empty (&ksocknal_data.ksnd_enomem_conns));
                LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
                LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
                LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
                LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));

                /* schedulers may be NULL if startup failed early */
                if (ksocknal_data.ksnd_schedulers != NULL)
                        for (i = 0; i < SOCKNAL_N_SCHED; i++) {
                                ksock_sched_t *kss =
                                        &ksocknal_data.ksnd_schedulers[i];

                                LASSERT (list_empty (&kss->kss_tx_conns));
                                LASSERT (list_empty (&kss->kss_rx_conns));
                                LASSERT (kss->kss_nconns == 0);

                /* stop router calling me */
                kpr_shutdown (&ksocknal_data.ksnd_router);

                /* flag threads to terminate; wake and wait for them to die */
                ksocknal_data.ksnd_shuttingdown = 1;

                wake_up_all (&ksocknal_data.ksnd_autoconnectd_waitq);
                wake_up_all (&ksocknal_data.ksnd_reaper_waitq);

                for (i = 0; i < SOCKNAL_N_SCHED; i++)
                        wake_up_all(&ksocknal_data.ksnd_schedulers[i].kss_waitq);

                while (atomic_read (&ksocknal_data.ksnd_nthreads) != 0) {
                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                               "waiting for %d threads to terminate\n",
                               atomic_read (&ksocknal_data.ksnd_nthreads));
                        set_current_state (TASK_UNINTERRUPTIBLE);
                        schedule_timeout (HZ);

                kpr_deregister (&ksocknal_data.ksnd_router);

                ksocknal_free_buffers();

                ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
                /* fall through */
        case SOCKNAL_INIT_NOTHING:

        CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
               atomic_read (&portal_kmemory));

        printk(KERN_INFO "Lustre: Routing socket NAL unloaded (final mem %d)\n",
               atomic_read(&portal_kmemory));
/* Record this socknal instance's incarnation number: the wall-clock
 * load time in microseconds.  Peers use it to detect that the stack
 * has restarted (NOTE(review): usage inferred from the comment below;
 * confirm against the connection-handshake code). */
ksocknal_init_incarnation (void)
        /* The incarnation number is the time this module loaded and it
         * identifies this particular instance of the socknal. Hopefully
         * we won't be able to reboot more frequently than 1MHz for the
         * forseeable future :) */
        do_gettimeofday(&tv);

        /* seconds scaled to usecs, plus the usec remainder */
        ksocknal_data.ksnd_incarnation =
                (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
/* Bring the socknal NI up.  First caller does the full initialisation
 * (data structures, lib_init(), scheduler/autoconnect/reaper threads,
 * router registration, forwarding buffers, command interface);
 * subsequent callers just get the already-established limits.  Any
 * failure unwinds via ksocknal_api_shutdown(), which is safe to call
 * at every intermediate ksnd_init stage. */
ksocknal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
                      ptl_ni_limits_t *requested_limits,
                      ptl_ni_limits_t *actual_limits)
        ptl_process_id_t process_id;
        int pkmem = atomic_read(&portal_kmemory);

        LASSERT (nal == &ksocknal_api);

        /* Already initialised by an earlier caller? */
        if (nal->nal_refct != 0) {
                if (actual_limits != NULL)
                        *actual_limits = ksocknal_lib.libnal_ni.ni_actual_limits;
                /* This module got the first ref */

        LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);

        memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */

        ksocknal_init_incarnation();

        /* peer hash table */
        ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
        PORTAL_ALLOC (ksocknal_data.ksnd_peers,
                      sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
        if (ksocknal_data.ksnd_peers == NULL)

        for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
                INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);

        rwlock_init(&ksocknal_data.ksnd_global_lock);

        /* small forwarding-buffer pool */
        spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
        INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
        INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
        ksocknal_data.ksnd_small_fmp.fmp_buff_pages = SOCKNAL_SMALL_FWD_PAGES;

        /* large forwarding-buffer pool */
        spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
        INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
        INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
        ksocknal_data.ksnd_large_fmp.fmp_buff_pages = SOCKNAL_LARGE_FWD_PAGES;

        /* reaper state */
        spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
        INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
        INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
        INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
        init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);

        /* autoconnect daemon state */
        spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
        INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
        init_waitqueue_head(&ksocknal_data.ksnd_autoconnectd_waitq);

        /* NB memset above zeros whole of ksocknal_data, including
         * ksocknal_data.ksnd_irqinfo[all].ksni_valid */

        /* flag lists/ptrs/locks initialised */
        ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;

        PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
                     sizeof(ksock_sched_t) * SOCKNAL_N_SCHED);
        if (ksocknal_data.ksnd_schedulers == NULL) {
                ksocknal_api_shutdown (nal);

        for (i = 0; i < SOCKNAL_N_SCHED; i++) {
                ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];

                spin_lock_init (&kss->kss_lock);
                INIT_LIST_HEAD (&kss->kss_rx_conns);
                INIT_LIST_HEAD (&kss->kss_tx_conns);
                INIT_LIST_HEAD (&kss->kss_zctxdone_list);
                init_waitqueue_head (&kss->kss_waitq);

        /* NB we have to wait to be told our true NID... */

        rc = lib_init(&ksocknal_lib, nal, process_id,
                      requested_limits, actual_limits);
                CERROR("lib_init failed: error %d\n", rc);
                ksocknal_api_shutdown (nal);

        ksocknal_data.ksnd_init = SOCKNAL_INIT_LIB; // flag lib_init() called

        /* one scheduler thread per configured scheduler slot */
        for (i = 0; i < SOCKNAL_N_SCHED; i++) {
                rc = ksocknal_thread_start (ksocknal_scheduler,
                                            &ksocknal_data.ksnd_schedulers[i]);
                        CERROR("Can't spawn socknal scheduler[%d]: %d\n",
                        ksocknal_api_shutdown (nal);

        /* autoconnect daemons; thread index passed as the argument */
        for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
                rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
                        CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
                        ksocknal_api_shutdown (nal);

        rc = ksocknal_thread_start (ksocknal_reaper, NULL);
                CERROR ("Can't spawn socknal reaper: %d\n", rc);
                ksocknal_api_shutdown (nal);

        rc = kpr_register(&ksocknal_data.ksnd_router,
                          &ksocknal_router_interface);
                /* not fatal: run without routing */
                CDEBUG(D_NET, "Can't initialise routing interface "
                       "(rc = %d): not routing\n", rc);

        /* Only allocate forwarding buffers if there's a router */

        for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
                         SOCKNAL_LARGE_FWD_NMSGS); i++) {
                ksock_fmb_pool_t *pool;

                /* first SOCKNAL_SMALL_FWD_NMSGS buffers are small */
                if (i < SOCKNAL_SMALL_FWD_NMSGS)
                        pool = &ksocknal_data.ksnd_small_fmp;
                        pool = &ksocknal_data.ksnd_large_fmp;

                /* fmb + its trailing kiov array in one allocation */
                PORTAL_ALLOC(fmb, offsetof(ksock_fmb_t,
                                           fmb_kiov[pool->fmp_buff_pages]));
                        ksocknal_api_shutdown(nal);

                fmb->fmb_pool = pool;

                for (j = 0; j < pool->fmp_buff_pages; j++) {
                        fmb->fmb_kiov[j].kiov_page = alloc_page(GFP_KERNEL);

                        if (fmb->fmb_kiov[j].kiov_page == NULL) {
                                ksocknal_api_shutdown (nal);

                        LASSERT(page_address(fmb->fmb_kiov[j].kiov_page) != NULL);

                list_add(&fmb->fmb_list, &pool->fmp_idle_fmbs);

        rc = libcfs_nal_cmd_register(SOCKNAL, &ksocknal_cmd, NULL);
                CERROR ("Can't initialise command interface (rc = %d)\n", rc);
                ksocknal_api_shutdown (nal);

        /* flag everything initialised */
        ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;

        printk(KERN_INFO "Lustre: Routing socket NAL loaded "
               "(Routing %s, initial mem %d, incarnation "LPX64")\n",
               kpr_routing (&ksocknal_data.ksnd_router) ?
               "enabled" : "disabled", pkmem, ksocknal_data.ksnd_incarnation);
/* Module unload hook: drop the sysctl table (if it was registered),
 * shut down our network interface and deregister the NAL. */
ksocknal_module_fini (void)
#ifdef CONFIG_SYSCTL
        /* registration is best-effort at load time, so may be NULL */
        if (ksocknal_tunables.ksnd_sysctl != NULL)
                unregister_sysctl_table (ksocknal_tunables.ksnd_sysctl);
        PtlNIFini(ksocknal_ni);
        ptl_unregister_nal(SOCKNAL);
/* Module load hook: sanity-check compile-time sizes, wire up the NAL
 * entry points, set tunable defaults, register the NAL, start the NI
 * for pure gateways, and (best-effort) register the sysctl table. */
ksocknal_module_init (void)
        /* packet descriptor must fit in a router descriptor's scratchpad */
        LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
        /* the following must be sizeof(int) for proc_dointvec() */
        LASSERT(sizeof (ksocknal_tunables.ksnd_io_timeout) == sizeof (int));
        LASSERT(sizeof (ksocknal_tunables.ksnd_eager_ack) == sizeof (int));
        LASSERT(sizeof (ksocknal_tunables.ksnd_typed_conns) == sizeof (int));
        LASSERT(sizeof (ksocknal_tunables.ksnd_min_bulk) == sizeof (int));
        LASSERT(sizeof (ksocknal_tunables.ksnd_zc_min_frag) == sizeof (int));

        /* check ksnr_connected/connecting field large enough */
        LASSERT(SOCKNAL_CONN_NTYPES <= 4);

        /* hook our startup/shutdown into the generic NAL dispatch */
        ksocknal_api.nal_ni_init = ksocknal_api_startup;
        ksocknal_api.nal_ni_fini = ksocknal_api_shutdown;

        /* Initialise dynamic tunables to defaults once only */
        ksocknal_tunables.ksnd_io_timeout = SOCKNAL_IO_TIMEOUT;
        ksocknal_tunables.ksnd_eager_ack = SOCKNAL_EAGER_ACK;
        ksocknal_tunables.ksnd_typed_conns = SOCKNAL_TYPED_CONNS;
        ksocknal_tunables.ksnd_min_bulk = SOCKNAL_MIN_BULK;
        ksocknal_tunables.ksnd_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;

        rc = ptl_register_nal(SOCKNAL, &ksocknal_api);
                CERROR("Can't register SOCKNAL: %d\n", rc);
                return (-ENOMEM); /* or something... */

        /* Pure gateways want the NAL started up at module load time... */
        rc = PtlNIInit(SOCKNAL, 0, NULL, NULL, &ksocknal_ni);
        if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
                ptl_unregister_nal(SOCKNAL);

#ifdef CONFIG_SYSCTL
        /* Press on regardless even if registering sysctl doesn't work */
        ksocknal_tunables.ksnd_sysctl =
                register_sysctl_table (ksocknal_top_ctl_table, 0);
/* Standard kernel-module boilerplate: metadata plus init/exit hooks. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel TCP Socket NAL v0.01");
MODULE_LICENSE("GPL");

module_init(ksocknal_module_init);
module_exit(ksocknal_module_fini);