1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5 * Author: Zach Brown <zab@zabbo.net>
6 * Author: Peter J. Braam <braam@clusterfs.com>
7 * Author: Phil Schwan <phil@clusterfs.com>
8 * Author: Eric Barton <eric@bartonsoftware.com>
10 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
12 * Portals is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Portals is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Portals; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
/* NOTE(review): this listing has line-number gaps throughout — lines have been
 * elided; braces/terminators missing below are in the original file. */

/* Global state shared by the whole socknal module. */
29 ksock_nal_data_t ksocknal_data;
/* Handle on the network interface registered with the Portals library. */
30 ptl_handle_ni_t ksocknal_ni;
/* Runtime tunables (exported via the sysctl table below). */
31 ksock_tunables_t ksocknal_tunables;
/* Interface handed to the portals router (kpr): forwarding + gateway
 * notification callbacks.  Uses old GCC "label:" initializer syntax. */
33 kpr_nal_interface_t ksocknal_router_interface = {
35 kprni_arg: &ksocknal_data,
36 kprni_fwd: ksocknal_fwd_packet,
37 kprni_notify: ksocknal_notify,
/* sysctl binary IDs for the "socknal" directory and its entries. */
41 #define SOCKNAL_SYSCTL 200
43 #define SOCKNAL_SYSCTL_TIMEOUT 1
44 #define SOCKNAL_SYSCTL_EAGER_ACK 2
45 #define SOCKNAL_SYSCTL_ZERO_COPY 3
46 #define SOCKNAL_SYSCTL_TYPED 4
47 #define SOCKNAL_SYSCTL_MIN_BULK 5
/* sysctl entries exposing the tunables above as writable (0644) integers
 * under /proc/sys.  All use proc_dointvec as the handler. */
49 static ctl_table ksocknal_ctl_table[] = {
50 {SOCKNAL_SYSCTL_TIMEOUT, "timeout",
51 &ksocknal_tunables.ksnd_io_timeout, sizeof (int),
52 0644, NULL, &proc_dointvec},
53 {SOCKNAL_SYSCTL_EAGER_ACK, "eager_ack",
54 &ksocknal_tunables.ksnd_eager_ack, sizeof (int),
55 0644, NULL, &proc_dointvec},
/* NOTE(review): entry named "zero_copy" is wired to ksnd_zc_min_frag —
 * presumably the minimum fragment size for zero-copy; confirm naming. */
57 {SOCKNAL_SYSCTL_ZERO_COPY, "zero_copy",
58 &ksocknal_tunables.ksnd_zc_min_frag, sizeof (int),
59 0644, NULL, &proc_dointvec},
61 {SOCKNAL_SYSCTL_TYPED, "typed",
62 &ksocknal_tunables.ksnd_typed_conns, sizeof (int),
63 0644, NULL, &proc_dointvec},
64 {SOCKNAL_SYSCTL_MIN_BULK, "min_bulk",
65 &ksocknal_tunables.ksnd_min_bulk, sizeof (int),
66 0644, NULL, &proc_dointvec},
/* Top-level directory node "socknal" (read-only, 0555) containing the
 * table above. */
70 static ctl_table ksocknal_top_ctl_table[] = {
71 {SOCKNAL_SYSCTL, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
/* nal_t API glue: forward an API-level call into the library dispatcher.
 * (Elided lines presumably declare 'k' and 'nal_cb' — confirm in full file.) */
77 ksocknal_api_forward(nal_t *nal, int id, void *args, size_t args_len,
78 void *ret, size_t ret_len)
84 nal_cb = k->ksnd_nal_cb;
86 lib_dispatch(nal_cb, k, id, args, ret); /* ksocknal_send needs k */
/* Acquire the NAL's library lock via the cb_cli callback. */
91 ksocknal_api_lock(nal_t *nal, unsigned long *flags)
97 nal_cb = k->ksnd_nal_cb;
98 nal_cb->cb_cli(nal_cb,flags);
/* Release the NAL's library lock via the cb_sti callback. */
102 ksocknal_api_unlock(nal_t *nal, unsigned long *flags)
108 nal_cb = k->ksnd_nal_cb;
109 nal_cb->cb_sti(nal_cb,flags);
/* Yield the CPU for up to 'milliseconds' (<0 = indefinitely, 0 = just
 * reschedule), sleeping on ksnd_yield_waitq with the lib lock dropped.
 * Returns the milliseconds remaining of the requested interval. */
113 ksocknal_api_yield(nal_t *nal, unsigned long *flags, int milliseconds)
115 /* NB called holding statelock */
117 unsigned long now = jiffies;
119 CDEBUG (D_NET, "yield\n");
121 if (milliseconds == 0) {
126 init_waitqueue_entry(&wait, current);
127 set_current_state (TASK_INTERRUPTIBLE);
128 add_wait_queue (&ksocknal_data.ksnd_yield_waitq, &wait);
/* Must drop the lib lock before sleeping. */
130 ksocknal_api_unlock(nal, flags);
132 if (milliseconds < 0)
135 schedule_timeout((milliseconds * HZ) / 1000);
137 ksocknal_api_lock(nal, flags);
139 remove_wait_queue (&ksocknal_data.ksnd_yield_waitq, &wait);
/* Account for time actually slept; clamp handled on elided line 144. */
141 if (milliseconds > 0) {
142 milliseconds -= ((jiffies - now) * 1000) / HZ;
143 if (milliseconds < 0)
147 return (milliseconds);
/* Record this node's NID in the library NI after module load; see the
 * FIXME below for why this can't happen at lib_init() time. */
151 ksocknal_set_mynid(ptl_nid_t nid)
153 lib_ni_t *ni = &ksocknal_lib.ni;
155 /* FIXME: we have to do this because we call lib_init() at module
156 * insertion time, which is before we have 'mynid' available. lib_init
157 * sets the NAL's nid, which it uses to tell other nodes where packets
158 * are coming from. This is not a very graceful solution to this
161 CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
/* Bind 'irq' to the CPU of its chosen scheduler by spawning a shell that
 * writes /proc/irq/<irq>/smp_affinity.  Only compiled for SMP+affinity;
 * a no-op for irq 0 (software NIC) or if the irq was already bound. */
169 ksocknal_bind_irq (unsigned int irq)
171 #if (defined(CONFIG_SMP) && CPU_AFFINITY)
175 ksock_irqinfo_t *info;
176 char *argv[] = {"/bin/sh",
180 char *envp[] = {"HOME=/",
181 "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
184 LASSERT (irq < NR_IRQS);
185 if (irq == 0) /* software NIC */
188 info = &ksocknal_data.ksnd_irqinfo[irq];
/* Check-and-set ksni_bound under the global lock so only the first
 * caller actually performs the bind. */
190 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
192 LASSERT (info->ksni_valid);
193 bind = !info->ksni_bound;
194 info->ksni_bound = 1;
196 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
198 if (!bind) /* bound already */
201 snprintf (cmdline, sizeof (cmdline),
202 "echo %d > /proc/irq/%u/smp_affinity", 1 << info->ksni_sched, irq);
204 printk (KERN_INFO "Lustre: Binding irq %u to CPU %d with cmd: %s\n",
205 irq, info->ksni_sched, cmdline);
207 /* FIXME: Find a better method of setting IRQ affinity...
210 USERMODEHELPER(argv[0], argv, envp);
/* Allocate and initialise an autoconnect route (one ref for the caller;
 * peer association happens later in ksocknal_add_route). */
215 ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
216 int irq_affinity, int eager)
218 ksock_route_t *route;
220 PORTAL_ALLOC (route, sizeof (*route));
224 atomic_set (&route->ksnr_refcount, 1);
225 route->ksnr_sharecount = 0;
226 route->ksnr_peer = NULL;
/* Eligible to (re)connect immediately, with minimum backoff. */
227 route->ksnr_timeout = jiffies;
228 route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
229 route->ksnr_ipaddr = ipaddr;
230 route->ksnr_port = port;
231 route->ksnr_buffer_size = buffer_size;
232 route->ksnr_irq_affinity = irq_affinity;
233 route->ksnr_eager = eager;
234 route->ksnr_connecting = 0;
235 route->ksnr_connected = 0;
236 route->ksnr_deleted = 0;
237 route->ksnr_conn_count = 0;
/* Free a route once fully unshared; drops its ref on the peer. */
243 ksocknal_destroy_route (ksock_route_t *route)
245 LASSERT (route->ksnr_sharecount == 0);
247 if (route->ksnr_peer != NULL)
248 ksocknal_put_peer (route->ksnr_peer);
250 PORTAL_FREE (route, sizeof (*route));
/* Drop a route reference; destroys the route when it hits zero. */
254 ksocknal_put_route (ksock_route_t *route)
256 CDEBUG (D_OTHER, "putting route[%p] (%d)\n",
257 route, atomic_read (&route->ksnr_refcount));
259 LASSERT (atomic_read (&route->ksnr_refcount) > 0);
260 if (!atomic_dec_and_test (&route->ksnr_refcount))
263 ksocknal_destroy_route (route);
/* Allocate a zeroed peer for 'nid' with one ref for the caller and empty
 * conn/route/tx lists; bumps the global peer count. */
267 ksocknal_create_peer (ptl_nid_t nid)
271 LASSERT (nid != PTL_NID_ANY);
273 PORTAL_ALLOC (peer, sizeof (*peer));
277 memset (peer, 0, sizeof (*peer));
279 peer->ksnp_nid = nid;
280 atomic_set (&peer->ksnp_refcount, 1); /* 1 ref for caller */
281 peer->ksnp_closing = 0;
282 INIT_LIST_HEAD (&peer->ksnp_conns);
283 INIT_LIST_HEAD (&peer->ksnp_routes);
284 INIT_LIST_HEAD (&peer->ksnp_tx_queue);
286 atomic_inc (&ksocknal_data.ksnd_npeers);
/* Free a peer whose refcount reached zero; all lists must be empty. */
291 ksocknal_destroy_peer (ksock_peer_t *peer)
293 CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
295 LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
296 LASSERT (list_empty (&peer->ksnp_conns));
297 LASSERT (list_empty (&peer->ksnp_routes));
298 LASSERT (list_empty (&peer->ksnp_tx_queue));
300 PORTAL_FREE (peer, sizeof (*peer));
302 /* NB a peer's connections and autoconnect routes keep a reference
303 * on their peer until they are destroyed, so we can be assured
304 * that _all_ state to do with this peer has been cleaned up when
305 * its refcount drops to zero. */
306 atomic_dec (&ksocknal_data.ksnd_npeers);
/* Drop a peer reference; destroys the peer when it hits zero. */
310 ksocknal_put_peer (ksock_peer_t *peer)
312 CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
313 peer, peer->ksnp_nid,
314 atomic_read (&peer->ksnp_refcount));
316 LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
317 if (!atomic_dec_and_test (&peer->ksnp_refcount))
320 ksocknal_destroy_peer (peer);
/* Look up a peer by NID in its hash bucket.  Caller must hold
 * ksnd_global_lock; no ref is taken here. */
324 ksocknal_find_peer_locked (ptl_nid_t nid)
326 struct list_head *peer_list = ksocknal_nid2peerlist (nid);
327 struct list_head *tmp;
330 list_for_each (tmp, peer_list) {
332 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
/* Invariant: peers in the table are live and have at least one
 * route or conn. */
334 LASSERT (!peer->ksnp_closing);
335 LASSERT (!(list_empty (&peer->ksnp_routes) &&
336 list_empty (&peer->ksnp_conns)));
338 if (peer->ksnp_nid != nid)
341 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
342 peer, nid, atomic_read (&peer->ksnp_refcount));
/* Look up a peer by NID, taking a ref for the caller (read lock held
 * only for the lookup). */
349 ksocknal_get_peer (ptl_nid_t nid)
353 read_lock (&ksocknal_data.ksnd_global_lock);
354 peer = ksocknal_find_peer_locked (nid);
355 if (peer != NULL) /* +1 ref for caller? */
356 atomic_inc (&peer->ksnp_refcount);
357 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Remove a peer from the hash table (marking it closing) and drop the
 * table's reference.  Caller must hold ksnd_global_lock for writing. */
363 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
365 LASSERT (!peer->ksnp_closing);
366 peer->ksnp_closing = 1;
367 list_del (&peer->ksnp_list);
368 /* lose peerlist's ref */
369 ksocknal_put_peer (peer);
/* Walk every peer's route list to find the 'index'-th route overall
 * (used by the GET_AUTOCONN ioctl).  Returns it with a ref held, or
 * (presumably, on the elided tail) NULL when index runs off the end. */
373 ksocknal_get_route_by_idx (int index)
376 struct list_head *ptmp;
377 ksock_route_t *route;
378 struct list_head *rtmp;
381 read_lock (&ksocknal_data.ksnd_global_lock);
383 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
384 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
385 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
387 LASSERT (!(list_empty (&peer->ksnp_routes) &&
388 list_empty (&peer->ksnp_conns)));
390 list_for_each (rtmp, &peer->ksnp_routes) {
/* Found it: take a ref and return with the lock dropped. */
394 route = list_entry (rtmp, ksock_route_t, ksnr_list);
395 atomic_inc (&route->ksnr_refcount);
396 read_unlock (&ksocknal_data.ksnd_global_lock);
402 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Create (or share) an autoconnect route to 'nid' via 'ipaddr:port'.
 * Allocates a fresh peer+route optimistically, then under the global
 * write lock either reuses an existing peer/route or installs the new
 * ones.  'share' handling is on elided lines around 468. */
407 ksocknal_add_route (ptl_nid_t nid, __u32 ipaddr, int port, int bufnob,
408 int bind_irq, int share, int eager)
413 ksock_route_t *route;
414 struct list_head *rtmp;
415 ksock_route_t *route2;
417 if (nid == PTL_NID_ANY)
420 /* Have a brand new peer ready... */
421 peer = ksocknal_create_peer (nid);
425 route = ksocknal_create_route (ipaddr, port, bufnob,
/* Route creation failed: drop our speculative peer. */
428 ksocknal_put_peer (peer);
432 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
434 peer2 = ksocknal_find_peer_locked (nid);
/* Peer already exists: discard our new one (use peer2 — the swap is
 * on an elided line). */
436 ksocknal_put_peer (peer);
439 /* peer table takes existing ref on peer */
440 list_add (&peer->ksnp_list,
441 ksocknal_nid2peerlist (nid));
446 /* check for existing route to this NID via this ipaddr */
447 list_for_each (rtmp, &peer->ksnp_routes) {
448 route2 = list_entry (rtmp, ksock_route_t, ksnr_list);
450 if (route2->ksnr_ipaddr == ipaddr)
457 if (route2 != NULL) {
458 ksocknal_put_route (route);
461 /* route takes a ref on peer */
462 route->ksnr_peer = peer;
463 atomic_inc (&peer->ksnp_refcount);
464 /* peer's route list takes existing ref on route */
465 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
468 route->ksnr_sharecount++;
470 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
/* Delete one route (global write lock held).  With 'share' the
 * sharecount is decremented and the route survives until it reaches 0;
 * without it the sharecount is forced to 0.  Associated conns are
 * closed unless 'keep_conn', in which case they are just dissociated. */
476 ksocknal_del_route_locked (ksock_route_t *route, int share, int keep_conn)
478 ksock_peer_t *peer = route->ksnr_peer;
480 struct list_head *ctmp;
481 struct list_head *cnxt;
484 route->ksnr_sharecount = 0;
486 route->ksnr_sharecount--;
487 if (route->ksnr_sharecount != 0)
491 list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
492 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
494 if (conn->ksnc_route != route)
498 ksocknal_close_conn_locked (conn, 0);
502 /* keeping the conn; just dissociate it and route... */
503 conn->ksnc_route = NULL;
504 ksocknal_put_route (route); /* drop conn's ref on route */
507 route->ksnr_deleted = 1;
508 list_del (&route->ksnr_list);
509 ksocknal_put_route (route); /* drop peer's ref */
511 if (list_empty (&peer->ksnp_routes) &&
512 list_empty (&peer->ksnp_conns)) {
513 /* I've just removed the last autoconnect route of a peer
514 * with no active connections */
515 ksocknal_unlink_peer_locked (peer);
/* Delete routes matching (nid, ipaddr); PTL_NID_ANY scans every hash
 * bucket.  Returns 0 if anything matched (rc init on an elided line). */
520 ksocknal_del_route (ptl_nid_t nid, __u32 ipaddr, int share, int keep_conn)
523 struct list_head *ptmp;
524 struct list_head *pnxt;
526 struct list_head *rtmp;
527 struct list_head *rnxt;
528 ksock_route_t *route;
534 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
536 if (nid != PTL_NID_ANY)
537 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
540 hi = ksocknal_data.ksnd_peer_hash_size - 1;
543 for (i = lo; i <= hi; i++) {
544 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
545 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
547 if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
550 list_for_each_safe (rtmp, rnxt, &peer->ksnp_routes) {
551 route = list_entry (rtmp, ksock_route_t,
555 route->ksnr_ipaddr == ipaddr))
558 ksocknal_del_route_locked (route, share, keep_conn);
559 rc = 0; /* matched something */
566 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
/* Walk every peer's conn list to find the 'index'-th connection overall
 * (used by the GET_CONN ioctl); returns it with a ref held. */
572 ksocknal_get_conn_by_idx (int index)
575 struct list_head *ptmp;
577 struct list_head *ctmp;
580 read_lock (&ksocknal_data.ksnd_global_lock);
582 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
583 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
584 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
586 LASSERT (!(list_empty (&peer->ksnp_routes) &&
587 list_empty (&peer->ksnp_conns)));
589 list_for_each (ctmp, &peer->ksnp_conns) {
/* Found it: take a ref and return with the lock dropped. */
593 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
594 atomic_inc (&conn->ksnc_refcount);
595 read_unlock (&ksocknal_data.ksnd_global_lock);
601 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Cache the remote IP/port of conn's socket (getname with peer=2)
 * into ksnc_ipaddr/ksnc_port, converting to host byte order. */
606 ksocknal_get_peer_addr (ksock_conn_t *conn)
608 struct sockaddr_in sin;
609 int len = sizeof (sin);
612 rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
613 (struct sockaddr *)&sin, &len, 2);
614 /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
615 LASSERT (!conn->ksnc_closing);
616 LASSERT (len <= sizeof (sin));
619 CERROR ("Error %d getting sock peer IP\n", rc);
623 conn->ksnc_ipaddr = ntohl (sin.sin_addr.s_addr);
624 conn->ksnc_port = ntohs (sin.sin_port);
/* Determine the IRQ associated with conn's socket via its routing
 * destination's device (irq extraction is on an elided line). */
628 ksocknal_conn_irq (ksock_conn_t *conn)
631 struct dst_entry *dst;
633 dst = sk_dst_get (conn->ksnc_sock->sk);
635 if (dst->dev != NULL) {
637 if (irq >= NR_IRQS) {
638 CERROR ("Unexpected IRQ %x\n", irq);
645 /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
646 LASSERT (!conn->ksnc_closing);
/* Pick a scheduler (CPU) for a new conn on 'irq'.  A hardware IRQ
 * sticks to the scheduler it was first assigned; otherwise choose the
 * scheduler with the fewest connections.  Global lock must be held. */
651 ksocknal_choose_scheduler_locked (unsigned int irq)
653 ksock_sched_t *sched;
654 ksock_irqinfo_t *info;
657 LASSERT (irq < NR_IRQS);
658 info = &ksocknal_data.ksnd_irqinfo[irq];
660 if (irq != 0 && /* hardware NIC */
661 info->ksni_valid) { /* already set up */
662 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
665 /* software NIC (irq == 0) || not associated with a scheduler yet.
666 * Choose the CPU with the fewest connections... */
667 sched = &ksocknal_data.ksnd_schedulers[0];
668 for (i = 1; i < SOCKNAL_N_SCHED; i++)
669 if (sched->kss_nconns >
670 ksocknal_data.ksnd_schedulers[i].kss_nconns)
671 sched = &ksocknal_data.ksnd_schedulers[i];
/* First conn on this hardware IRQ: remember the choice. */
673 if (irq != 0) { /* Hardware NIC */
674 info->ksni_valid = 1;
675 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
678 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
/* Bring up a new connection on 'sock'.  'route' != NULL means this is
 * an autoconnect (peer NID known in advance); route == NULL means an
 * acceptor/explicit connect whose peer NID comes from the HELLO
 * exchange.  Installs socket callbacks, attaches to a scheduler, drains
 * any queued tx for the peer and closes stale conns. */
685 ksocknal_create_conn (ksock_route_t *route, struct socket *sock,
686 int bind_irq, int type)
694 ksock_sched_t *sched;
699 /* NB, sock has an associated file since (a) this connection might
700 * have been created in userland and (b) we need to refcount the
701 * socket so that we don't close it while I/O is being done on
702 * it, and sock->file has that pre-cooked... */
703 LASSERT (sock->file != NULL);
704 LASSERT (file_count(sock->file) > 0);
706 rc = ksocknal_setup_sock (sock);
711 /* acceptor or explicit connect */
714 LASSERT (type != SOCKNAL_CONN_NONE);
715 /* autoconnect: expect this nid on exchange */
716 nid = route->ksnr_peer->ksnp_nid;
/* Exchange HELLO: verifies/learns nid, type and incarnation. */
719 rc = ksocknal_hello (sock, &nid, &type, &incarnation);
724 if (route == NULL) { /* not autoconnect */
725 /* Assume this socket connects to a brand new peer */
726 peer = ksocknal_create_peer (nid);
731 PORTAL_ALLOC(conn, sizeof(*conn));
/* Conn allocation failed: drop the speculative peer ref. */
734 ksocknal_put_peer (peer);
738 memset (conn, 0, sizeof (*conn));
739 conn->ksnc_peer = NULL;
740 conn->ksnc_route = NULL;
741 conn->ksnc_sock = sock;
742 conn->ksnc_type = type;
743 conn->ksnc_incarnation = incarnation;
/* Save original socket callbacks so they can be restored when the
 * conn is terminated (see ksocknal_terminate_conn). */
744 conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
745 conn->ksnc_saved_write_space = sock->sk->sk_write_space;
746 atomic_set (&conn->ksnc_refcount, 1); /* 1 ref for me */
748 conn->ksnc_rx_ready = 0;
749 conn->ksnc_rx_scheduled = 0;
750 ksocknal_new_packet (conn, 0);
752 INIT_LIST_HEAD (&conn->ksnc_tx_queue);
753 conn->ksnc_tx_ready = 0;
754 conn->ksnc_tx_scheduled = 0;
755 atomic_set (&conn->ksnc_tx_nob, 0);
757 ksocknal_get_peer_addr (conn);
759 CWARN("New conn nid:"LPX64" ip:%08x/%d incarnation:"LPX64"\n",
760 nid, conn->ksnc_ipaddr, conn->ksnc_port, incarnation);
762 irq = ksocknal_conn_irq (conn);
764 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
/* Autoconnect path: validate/advance the route's connecting state. */
768 LASSERT ((route->ksnr_connected & (1 << type)) == 0);
769 LASSERT ((route->ksnr_connecting & (1 << type)) != 0);
771 if (route->ksnr_deleted) {
772 /* This conn was autoconnected, but the autoconnect
773 * route got deleted while it was being
775 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock,
777 PORTAL_FREE (conn, sizeof (*conn));
782 /* associate conn/route */
783 conn->ksnc_route = route;
784 atomic_inc (&route->ksnr_refcount);
786 route->ksnr_connecting &= ~(1 << type);
787 route->ksnr_connected |= (1 << type);
788 route->ksnr_conn_count++;
789 route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
791 peer = route->ksnr_peer;
793 /* Not an autoconnected connection; see if there is an
794 * existing peer for this NID */
795 peer2 = ksocknal_find_peer_locked (nid);
797 ksocknal_put_peer (peer);
800 list_add (&peer->ksnp_list,
801 ksocknal_nid2peerlist (nid));
802 /* peer list takes over existing ref */
806 LASSERT (!peer->ksnp_closing);
808 conn->ksnc_peer = peer;
809 atomic_inc (&peer->ksnp_refcount);
810 peer->ksnp_last_alive = jiffies;
811 peer->ksnp_error = 0;
813 /* Set the deadline for the outgoing HELLO to drain */
814 conn->ksnc_tx_deadline = jiffies +
815 ksocknal_tunables.ksnd_io_timeout * HZ;
817 list_add (&conn->ksnc_list, &peer->ksnp_conns);
818 atomic_inc (&conn->ksnc_refcount);
820 sched = ksocknal_choose_scheduler_locked (irq);
822 conn->ksnc_scheduler = sched;
824 /* NB my callbacks block while I hold ksnd_global_lock */
825 sock->sk->sk_user_data = conn;
826 sock->sk->sk_data_ready = ksocknal_data_ready;
827 sock->sk->sk_write_space = ksocknal_write_space;
829 /* Take all the packets blocking for a connection.
830 * NB, it might be nicer to share these blocked packets among any
831 * other connections that are becoming established, however that
832 * confuses the normal packet launching operation, which selects a
833 * connection and queues the packet on it without needing an
834 * exclusive lock on ksnd_global_lock. */
835 while (!list_empty (&peer->ksnp_tx_queue)) {
836 tx = list_entry (peer->ksnp_tx_queue.next,
837 ksock_tx_t, tx_list);
839 list_del (&tx->tx_list);
840 ksocknal_queue_tx_locked (tx, conn);
/* Kill conns from an earlier incarnation of this peer. */
843 rc = ksocknal_close_stale_conns_locked (peer, incarnation);
845 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
848 CERROR ("Closed %d stale conns to nid "LPX64" ip %d.%d.%d.%d\n",
849 rc, conn->ksnc_peer->ksnp_nid,
850 HIPQUAD(conn->ksnc_ipaddr));
852 if (bind_irq) /* irq binding required */
853 ksocknal_bind_irq (irq);
855 /* Call the callbacks right now to get things going. */
856 ksocknal_data_ready (sock->sk, 0);
857 ksocknal_write_space (sock->sk);
859 CDEBUG(D_IOCTL, "conn [%p] registered for nid "LPX64" ip %d.%d.%d.%d\n",
860 conn, conn->ksnc_peer->ksnp_nid, HIPQUAD(conn->ksnc_ipaddr));
/* Drop my initial ref; the peer's conn list keeps the conn alive. */
862 ksocknal_put_conn (conn);
867 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
869 /* This just does the immmediate housekeeping, and queues the
870 * connection for the reaper to terminate.
871 * Caller holds ksnd_global_lock exclusively in irq context */
872 ksock_peer_t *peer = conn->ksnc_peer;
873 ksock_route_t *route;
875 LASSERT (peer->ksnp_error == 0);
876 LASSERT (!conn->ksnc_closing);
877 conn->ksnc_closing = 1;
878 atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
880 route = conn->ksnc_route;
882 /* dissociate conn from route... */
883 LASSERT (!route->ksnr_deleted);
884 LASSERT ((route->ksnr_connecting & (1 << conn->ksnc_type)) == 0);
885 LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);
887 route->ksnr_connected &= ~(1 << conn->ksnc_type);
888 conn->ksnc_route = NULL;
890 list_del (&route->ksnr_list); /* make route least favourite */
891 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
893 ksocknal_put_route (route); /* drop conn's ref on route */
896 /* ksnd_deathrow_conns takes over peer's ref */
897 list_del (&conn->ksnc_list);
899 if (list_empty (&peer->ksnp_conns)) {
900 /* No more connections to this peer */
902 peer->ksnp_error = error; /* stash last conn close reason */
904 if (list_empty (&peer->ksnp_routes)) {
905 /* I've just closed last conn belonging to a
906 * non-autoconnecting peer */
907 ksocknal_unlink_peer_locked (peer);
/* Hand the conn to the reaper thread for actual termination. */
911 spin_lock (&ksocknal_data.ksnd_reaper_lock);
913 list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
914 wake_up (&ksocknal_data.ksnd_reaper_waitq);
916 spin_unlock (&ksocknal_data.ksnd_reaper_lock);
920 ksocknal_terminate_conn (ksock_conn_t *conn)
922 /* This gets called by the reaper (guaranteed thread context) to
923 * disengage the socket from its callbacks and close it.
924 * ksnc_refcount will eventually hit zero, and then the reaper will
927 ksock_peer_t *peer = conn->ksnc_peer;
928 ksock_sched_t *sched = conn->ksnc_scheduler;
933 LASSERT(conn->ksnc_closing);
935 /* wake up the scheduler to "send" all remaining packets to /dev/null */
936 spin_lock_irqsave(&sched->kss_lock, flags);
938 if (!conn->ksnc_tx_scheduled &&
939 !list_empty(&conn->ksnc_tx_queue)){
940 list_add_tail (&conn->ksnc_tx_list,
941 &sched->kss_tx_conns);
942 /* a closing conn is always ready to tx */
943 conn->ksnc_tx_ready = 1;
944 conn->ksnc_tx_scheduled = 1;
945 /* extra ref for scheduler */
946 atomic_inc (&conn->ksnc_refcount);
948 wake_up (&sched->kss_waitq);
951 spin_unlock_irqrestore (&sched->kss_lock, flags);
953 /* serialise with callbacks */
954 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
956 /* Remove conn's network callbacks.
957 * NB I _have_ to restore the callback, rather than storing a noop,
958 * since the socket could survive past this module being unloaded!! */
959 conn->ksnc_sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
960 conn->ksnc_sock->sk->sk_write_space = conn->ksnc_saved_write_space;
962 /* A callback could be in progress already; they hold a read lock
963 * on ksnd_global_lock (to serialise with me) and NOOP if
964 * sk_user_data is NULL. */
965 conn->ksnc_sock->sk->sk_user_data = NULL;
967 /* OK, so this conn may not be completely disengaged from its
968 * scheduler yet, but it _has_ committed to terminate... */
969 conn->ksnc_scheduler->kss_nconns--;
971 if (peer->ksnp_error != 0) {
972 /* peer's last conn closed in error */
973 LASSERT (list_empty (&peer->ksnp_conns));
975 /* convert peer's last-known-alive timestamp from jiffies */
976 do_gettimeofday (&now);
977 then = now.tv_sec - (jiffies - peer->ksnp_last_alive)/HZ;
981 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
983 /* The socket is closed on the final put; either here, or in
984 * ksocknal_{send,recv}msg(). Since we set up the linger2 option
985 * when the connection was established, this will close the socket
986 * immediately, aborting anything buffered in it. Any hung
987 * zero-copy transmits will therefore complete in finite time. */
988 ksocknal_putconnsock (conn);
/* Tell the router the peer may be dead (condition on elided lines). */
991 kpr_notify (&ksocknal_data.ksnd_router, peer->ksnp_nid,
996 ksocknal_destroy_conn (ksock_conn_t *conn)
998 /* Final coup-de-grace of the reaper */
999 CDEBUG (D_NET, "connection %p\n", conn);
1001 LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
1002 LASSERT (conn->ksnc_route == NULL);
1003 LASSERT (!conn->ksnc_tx_scheduled);
1004 LASSERT (!conn->ksnc_rx_scheduled);
1005 LASSERT (list_empty(&conn->ksnc_tx_queue));
1007 /* complete current receive if any */
1008 switch (conn->ksnc_rx_state) {
1009 case SOCKNAL_RX_BODY:
1010 CERROR("Completing partial receive from "LPX64
1011 ", ip %d.%d.%d.%d:%d, with error\n",
1012 conn->ksnc_peer->ksnp_nid,
1013 HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
/* Fail the half-received message up to the library layer. */
1014 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_FAIL);
1016 case SOCKNAL_RX_BODY_FWD:
1017 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
1019 case SOCKNAL_RX_HEADER:
1020 case SOCKNAL_RX_SLOP:
/* Drop the conn's ref on its peer and free the conn itself. */
1027 ksocknal_put_peer (conn->ksnc_peer);
1029 PORTAL_FREE (conn, sizeof (*conn));
1030 atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
/* Drop a conn reference; on the last put, queue the conn as a zombie
 * for the reaper to destroy (not destroyed inline). */
1034 ksocknal_put_conn (ksock_conn_t *conn)
1036 unsigned long flags;
1038 CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
1039 conn, conn->ksnc_peer->ksnp_nid,
1040 atomic_read (&conn->ksnc_refcount));
1042 LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1043 if (!atomic_dec_and_test (&conn->ksnc_refcount))
1046 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
1048 list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1049 wake_up (&ksocknal_data.ksnd_reaper_waitq);
1051 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
/* Close peer conns matching 'ipaddr' (0 presumably matches all — the
 * wildcard half of the condition is on an elided line) with close
 * reason 'why'.  Caller holds the global write lock. */
1055 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
1058 struct list_head *ctmp;
1059 struct list_head *cnxt;
1062 list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1063 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1066 conn->ksnc_ipaddr == ipaddr) {
1068 ksocknal_close_conn_locked (conn, why);
/* Close all conns belonging to an incarnation other than the given one
 * (i.e. the peer rebooted); returns the number closed. */
1076 ksocknal_close_stale_conns_locked (ksock_peer_t *peer, __u64 incarnation)
1079 struct list_head *ctmp;
1080 struct list_head *cnxt;
1083 list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1084 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1086 if (conn->ksnc_incarnation == incarnation)
1089 CWARN("Closing stale conn nid:"LPX64" ip:%08x/%d "
1090 "incarnation:"LPX64"("LPX64")\n",
1091 peer->ksnp_nid, conn->ksnc_ipaddr, conn->ksnc_port,
1092 conn->ksnc_incarnation, incarnation);
1095 ksocknal_close_conn_locked (conn, -ESTALE);
/* Close this conn and all of its peer's conns on the same IP, taking
 * the global lock itself. */
1102 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why)
1104 ksock_peer_t *peer = conn->ksnc_peer;
1105 __u32 ipaddr = conn->ksnc_ipaddr;
1106 unsigned long flags;
1109 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1111 count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1113 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
/* Close all conns matching (nid, ipaddr); PTL_NID_ANY / ipaddr 0 act as
 * wildcards.  Returns 0 on any match or wildcard, -ENOENT otherwise. */
1119 ksocknal_close_matching_conns (ptl_nid_t nid, __u32 ipaddr)
1121 unsigned long flags;
1123 struct list_head *ptmp;
1124 struct list_head *pnxt;
1130 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1132 if (nid != PTL_NID_ANY)
1133 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
1136 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1139 for (i = lo; i <= hi; i++) {
1140 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1142 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1144 if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
1147 count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1151 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1153 /* wildcards always succeed */
1154 if (nid == PTL_NID_ANY || ipaddr == 0)
1157 return (count == 0 ? -ENOENT : 0);
/* kpr callback: gateway 'gw_nid' changed state; on death, close all of
 * its connections (the alive-check branch is on an elided line). */
1161 ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive)
1163 /* The router is telling me she's been notified of a change in
1164 * gateway state.... */
1166 CDEBUG (D_NET, "gw "LPX64" %s\n", gw_nid, alive ? "up" : "down");
1169 /* If the gateway crashed, close all open connections... */
1170 ksocknal_close_matching_conns (gw_nid, 0);
1174 /* ...otherwise do nothing. We can only establish new connections
1175 * if we have autroutes, and these connect on demand. */
/* Kernel-version compatibility: locate the TCP private state inside a
 * struct sock (layout differs before/after 2.5). */
1178 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1179 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1181 return &(sk->tp_pinfo.af_tcp);
1184 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1186 struct tcp_sock *s = (struct tcp_sock *)sk;
/* "Push" a conn: toggle TCP_NODELAY to flush any Nagle-buffered data,
 * restoring the original nonagle setting afterwards. */
1192 ksocknal_push_conn (ksock_conn_t *conn)
1201 rc = ksocknal_getconnsock (conn);
1202 if (rc != 0) /* being shut down */
1205 sk = conn->ksnc_sock->sk;
1206 tp = sock2tcp_opt(sk);
1209 nonagle = tp->nonagle;
1216 rc = sk->sk_prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
1217 (char *)&val, sizeof (val));
/* Restore the saved Nagle setting (lock handling on elided lines). */
1223 tp->nonagle = nonagle;
1226 ksocknal_putconnsock (conn);
/* Push every conn of 'peer', indexing by position each pass because the
 * list can change while the lock is dropped for the push itself. */
1230 ksocknal_push_peer (ksock_peer_t *peer)
1234 struct list_head *tmp;
1237 for (index = 0; ; index++) {
1238 read_lock (&ksocknal_data.ksnd_global_lock);
1243 list_for_each (tmp, &peer->ksnp_conns) {
1245 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1246 atomic_inc (&conn->ksnc_refcount);
1251 read_unlock (&ksocknal_data.ksnd_global_lock);
1256 ksocknal_push_conn (conn);
1257 ksocknal_put_conn (conn);
/* Push one peer by NID, or every peer when nid == PTL_NID_ANY (same
 * index-based re-scan pattern as above). */
1262 ksocknal_push (ptl_nid_t nid)
1265 struct list_head *tmp;
1271 if (nid != PTL_NID_ANY) {
1272 peer = ksocknal_get_peer (nid);
1276 ksocknal_push_peer (peer);
1277 ksocknal_put_peer (peer);
1282 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1283 for (j = 0; ; j++) {
1284 read_lock (&ksocknal_data.ksnd_global_lock);
1289 list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1291 peer = list_entry(tmp, ksock_peer_t,
1293 atomic_inc (&peer->ksnp_refcount);
1298 read_unlock (&ksocknal_data.ksnd_global_lock);
1302 ksocknal_push_peer (peer);
1303 ksocknal_put_peer (peer);
/* NAL command dispatcher: handles configuration ioctls (autoconnect
 * route management, conn inspection, socket registration, etc.). */
1313 ksocknal_cmd(struct portals_cfg *pcfg, void * private)
1317 LASSERT (pcfg != NULL);
1319 switch(pcfg->pcfg_command) {
1320 case NAL_CMD_GET_AUTOCONN: {
1321 ksock_route_t *route = ksocknal_get_route_by_idx (pcfg->pcfg_count);
/* Marshal route fields into the cfg reply; flag bits 2/4 encode
 * irq_affinity/eager (matching ADD_AUTOCONN's 0x02/0x04 below). */
1327 pcfg->pcfg_nid = route->ksnr_peer->ksnp_nid;
1328 pcfg->pcfg_id = route->ksnr_ipaddr;
1329 pcfg->pcfg_misc = route->ksnr_port;
1330 pcfg->pcfg_count = route->ksnr_conn_count;
1331 pcfg->pcfg_size = route->ksnr_buffer_size;
1332 pcfg->pcfg_wait = route->ksnr_sharecount;
1333 pcfg->pcfg_flags = (route->ksnr_irq_affinity ? 2 : 0) |
1334 (route->ksnr_eager ? 4 : 0);
1335 ksocknal_put_route (route);
1339 case NAL_CMD_ADD_AUTOCONN: {
1340 rc = ksocknal_add_route (pcfg->pcfg_nid, pcfg->pcfg_id,
1341 pcfg->pcfg_misc, pcfg->pcfg_size,
1342 (pcfg->pcfg_flags & 0x02) != 0,
1343 (pcfg->pcfg_flags & 0x04) != 0,
1344 (pcfg->pcfg_flags & 0x08) != 0);
1347 case NAL_CMD_DEL_AUTOCONN: {
1348 rc = ksocknal_del_route (pcfg->pcfg_nid, pcfg->pcfg_id,
1349 (pcfg->pcfg_flags & 1) != 0,
1350 (pcfg->pcfg_flags & 2) != 0);
1353 case NAL_CMD_GET_CONN: {
1354 ksock_conn_t *conn = ksocknal_get_conn_by_idx (pcfg->pcfg_count);
1360 pcfg->pcfg_nid = conn->ksnc_peer->ksnp_nid;
1361 pcfg->pcfg_id = conn->ksnc_ipaddr;
1362 pcfg->pcfg_misc = conn->ksnc_port;
1363 pcfg->pcfg_flags = conn->ksnc_type;
1364 ksocknal_put_conn (conn);
/* Adopt a userland-created socket fd as a connection of the given
 * type; only the listed conn types are accepted. */
1368 case NAL_CMD_REGISTER_PEER_FD: {
1369 struct socket *sock = sockfd_lookup (pcfg->pcfg_fd, &rc);
1370 int type = pcfg->pcfg_misc;
1376 case SOCKNAL_CONN_NONE:
1377 case SOCKNAL_CONN_ANY:
1378 case SOCKNAL_CONN_CONTROL:
1379 case SOCKNAL_CONN_BULK_IN:
1380 case SOCKNAL_CONN_BULK_OUT:
1381 rc = ksocknal_create_conn(NULL, sock, pcfg->pcfg_flags, type);
1389 case NAL_CMD_CLOSE_CONNECTION: {
1390 rc = ksocknal_close_matching_conns (pcfg->pcfg_nid,
1394 case NAL_CMD_REGISTER_MYNID: {
1395 rc = ksocknal_set_mynid (pcfg->pcfg_nid);
1398 case NAL_CMD_PUSH_CONNECTION: {
1399 rc = ksocknal_push (pcfg->pcfg_nid);
/* Free every idle forwarding message buffer (fmb) in pool 'p',
 * releasing each fmb's pages and the fmb struct itself.  Pool must be
 * quiescent (no blocked conns, no active fmbs). */
1408 ksocknal_free_fmbs (ksock_fmb_pool_t *p)
1410 int npages = p->fmp_buff_pages;
1414 LASSERT (list_empty(&p->fmp_blocked_conns));
1415 LASSERT (p->fmp_nactive_fmbs == 0);
1417 while (!list_empty(&p->fmp_idle_fmbs)) {
1419 fmb = list_entry(p->fmp_idle_fmbs.next,
1420 ksock_fmb_t, fmb_list);
1422 for (i = 0; i < npages; i++)
1423 if (fmb->fmb_kiov[i].kiov_page != NULL)
1424 __free_page(fmb->fmb_kiov[i].kiov_page);
1426 list_del(&fmb->fmb_list);
/* fmb was allocated with a flexible kiov tail of npages entries. */
1427 PORTAL_FREE(fmb, offsetof(ksock_fmb_t, fmb_kiov[npages]));
/* Release all module-level buffers at shutdown: both fmb pools, the
 * scheduler array and the peer hash table. */
1432 ksocknal_free_buffers (void)
1434 ksocknal_free_fmbs(&ksocknal_data.ksnd_small_fmp);
1435 ksocknal_free_fmbs(&ksocknal_data.ksnd_large_fmp);
1437 LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_ltxs) == 0);
1439 if (ksocknal_data.ksnd_schedulers != NULL)
1440 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
1441 sizeof (ksock_sched_t) * SOCKNAL_N_SCHED);
1443 PORTAL_FREE (ksocknal_data.ksnd_peers,
1444 sizeof (struct list_head) *
1445 ksocknal_data.ksnd_peer_hash_size);
/* Shut the socknal down, undoing however much of startup completed.
 * ksnd_init records the stage reached; each case below reverses one
 * stage and deliberately falls through to reverse the earlier ones. */
1449 ksocknal_api_shutdown (nal_t *nal)
1453 if (nal->nal_refct != 0) {
1454 /* This module got the first ref */
1455 PORTAL_MODULE_UNUSE;
1459 CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1460 atomic_read (&portal_kmemory));
1462 LASSERT(nal == &ksocknal_api);
1464 switch (ksocknal_data.ksnd_init) {
1468 case SOCKNAL_INIT_ALL:
/* Stop accepting new NAL commands first. */
1469 libcfs_nal_cmd_unregister(SOCKNAL);
1471 ksocknal_data.ksnd_init = SOCKNAL_INIT_LIB;
/* fallthrough */
1474 case SOCKNAL_INIT_LIB:
1475 /* No more calls to ksocknal_cmd() to create new
1476 * autoroutes/connections since we're being unloaded. */
1478 /* Delete all autoroute entries */
1479 ksocknal_del_route(PTL_NID_ANY, 0, 0, 0);
1481 /* Delete all connections */
1482 ksocknal_close_matching_conns (PTL_NID_ANY, 0);
1484 /* Wait for all peer state to clean up */
1486 while (atomic_read (&ksocknal_data.ksnd_npeers) != 0) {
1488 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1489 "waiting for %d peers to disconnect\n",
1490 atomic_read (&ksocknal_data.ksnd_npeers));
1491 set_current_state (TASK_UNINTERRUPTIBLE);
1492 schedule_timeout (HZ);
1495 /* Tell lib we've stopped calling into her. */
1496 lib_fini(&ksocknal_lib);
1498 ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
/* fallthrough */
1501 case SOCKNAL_INIT_DATA:
1502 /* Module refcount only gets to zero when all peers
1503 * have been closed so all lists must be empty */
1504 LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
1505 LASSERT (ksocknal_data.ksnd_peers != NULL);
1506 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1507 LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
1509 LASSERT (list_empty (&ksocknal_data.ksnd_enomem_conns));
1510 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
1511 LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
1512 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
1513 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
1515 if (ksocknal_data.ksnd_schedulers != NULL)
1516 for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1517 ksock_sched_t *kss =
1518 &ksocknal_data.ksnd_schedulers[i];
1520 LASSERT (list_empty (&kss->kss_tx_conns));
1521 LASSERT (list_empty (&kss->kss_rx_conns));
1522 LASSERT (kss->kss_nconns == 0);
1525 /* stop router calling me */
1526 kpr_shutdown (&ksocknal_data.ksnd_router);
1528 /* flag threads to terminate; wake and wait for them to die */
1529 ksocknal_data.ksnd_shuttingdown = 1;
1530 wake_up_all (&ksocknal_data.ksnd_autoconnectd_waitq);
1531 wake_up_all (&ksocknal_data.ksnd_reaper_waitq);
1533 for (i = 0; i < SOCKNAL_N_SCHED; i++)
1534 wake_up_all(&ksocknal_data.ksnd_schedulers[i].kss_waitq);
/* Fixed typo in debug message: "waitinf" -> "waiting". */
1536 while (atomic_read (&ksocknal_data.ksnd_nthreads) != 0) {
1537 CDEBUG (D_NET, "waiting for %d threads to terminate\n",
1538 atomic_read (&ksocknal_data.ksnd_nthreads));
1539 set_current_state (TASK_UNINTERRUPTIBLE);
1540 schedule_timeout (HZ);
1543 kpr_deregister (&ksocknal_data.ksnd_router);
1545 ksocknal_free_buffers();
1547 ksocknal_data.ksnd_init = SOCKNAL_INIT_NOTHING;
/* fallthrough */
1550 case SOCKNAL_INIT_NOTHING:
1554 CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1555 atomic_read (&portal_kmemory));
1557 printk(KERN_INFO "Lustre: Routing socket NAL unloaded (final mem %d)\n",
1558 atomic_read(&portal_kmemory));
/* Stamp this instance of the socknal with a unique incarnation number
 * derived from the current wall-clock time in microseconds. */
1563 ksocknal_init_incarnation (void)
1567 /* The incarnation number is the time this module loaded and it
1568 * identifies this particular instance of the socknal. Hopefully
1569 * we won't be able to reboot more frequently than 1MHz for the
1570 * forseeable future :) */
1572 do_gettimeofday(&tv);
1574 ksocknal_data.ksnd_incarnation =
1575 (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
/* Bring the socknal up: initialise global state, allocate the peer
 * hash and scheduler array, start lib, spawn scheduler/autoconnectd/
 * reaper threads, register with the router (allocating forwarding
 * buffers only if routing is active), and finally register the NAL
 * command handler.  Every failure path calls ksocknal_api_shutdown(),
 * which unwinds exactly as far as ksnd_init says we got. */
1579 ksocknal_api_startup (nal_t *nal, ptl_pid_t requested_pid,
1580 ptl_ni_limits_t *requested_limits,
1581 ptl_ni_limits_t *actual_limits)
1583 ptl_process_id_t process_id;
1584 int pkmem = atomic_read(&portal_kmemory);
1589 LASSERT (nal == &ksocknal_api);
/* Already started: just report the limits negotiated first time. */
1591 if (nal->nal_refct != 0) {
1592 if (actual_limits != NULL)
1593 *actual_limits = ksocknal_lib.ni.actual_limits;
1594 /* This module got the first ref */
1599 LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
1601 memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
1603 ksocknal_init_incarnation();
1605 ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
1606 PORTAL_ALLOC (ksocknal_data.ksnd_peers,
1607 sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
1608 if (ksocknal_data.ksnd_peers == NULL)
1611 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
1612 INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
1614 rwlock_init(&ksocknal_data.ksnd_global_lock);
1616 ksocknal_data.ksnd_nal_cb = &ksocknal_lib;
1617 spin_lock_init (&ksocknal_data.ksnd_nal_cb_lock);
1618 init_waitqueue_head(&ksocknal_data.ksnd_yield_waitq);
/* Forwarding-buffer pools: small and large, distinguished only by
 * buffer size in pages. */
1620 spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
1621 INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
1622 INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
1623 ksocknal_data.ksnd_small_fmp.fmp_buff_pages = SOCKNAL_SMALL_FWD_PAGES;
1625 spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
1626 INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
1627 INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
1628 ksocknal_data.ksnd_large_fmp.fmp_buff_pages = SOCKNAL_LARGE_FWD_PAGES;
1630 spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
1631 INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
1632 INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
1633 INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
1634 init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
1636 spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
1637 INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
1638 init_waitqueue_head(&ksocknal_data.ksnd_autoconnectd_waitq);
1640 /* NB memset above zeros whole of ksocknal_data, including
1641 * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
1643 /* flag lists/ptrs/locks initialised */
1644 ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
1646 PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
1647 sizeof(ksock_sched_t) * SOCKNAL_N_SCHED);
1648 if (ksocknal_data.ksnd_schedulers == NULL) {
1649 ksocknal_api_shutdown (&ksocknal_api);
1653 for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1654 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
1656 spin_lock_init (&kss->kss_lock);
1657 INIT_LIST_HEAD (&kss->kss_rx_conns);
1658 INIT_LIST_HEAD (&kss->kss_tx_conns);
1660 INIT_LIST_HEAD (&kss->kss_zctxdone_list);
1662 init_waitqueue_head (&kss->kss_waitq);
1665 /* NB we have to wait to be told our true NID... */
1669 rc = lib_init(&ksocknal_lib, process_id,
1670 requested_limits, actual_limits);
1672 CERROR("lib_init failed: error %d\n", rc);
1673 ksocknal_api_shutdown (&ksocknal_api);
1677 ksocknal_data.ksnd_init = SOCKNAL_INIT_LIB; // flag lib_init() called
/* One scheduler thread per scheduler slot. */
1679 for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1680 rc = ksocknal_thread_start (ksocknal_scheduler,
1681 &ksocknal_data.ksnd_schedulers[i]);
1683 CERROR("Can't spawn socknal scheduler[%d]: %d\n",
1685 ksocknal_api_shutdown (&ksocknal_api);
/* Autoconnect daemons; each gets its index as its argument. */
1690 for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
1691 rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
1693 CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
1694 ksocknal_api_shutdown (&ksocknal_api);
1699 rc = ksocknal_thread_start (ksocknal_reaper, NULL);
1701 CERROR ("Can't spawn socknal reaper: %d\n", rc);
1702 ksocknal_api_shutdown (&ksocknal_api);
/* Router registration failing is non-fatal: we just don't route. */
1706 rc = kpr_register(&ksocknal_data.ksnd_router,
1707 &ksocknal_router_interface);
1709 CDEBUG(D_NET, "Can't initialise routing interface "
1710 "(rc = %d): not routing\n", rc);
1712 /* Only allocate forwarding buffers if there's a router */
1714 for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
1715 SOCKNAL_LARGE_FWD_NMSGS); i++) {
1717 ksock_fmb_pool_t *pool;
/* First SOCKNAL_SMALL_FWD_NMSGS buffers go to the small pool,
 * the rest to the large pool. */
1720 if (i < SOCKNAL_SMALL_FWD_NMSGS)
1721 pool = &ksocknal_data.ksnd_small_fmp;
1723 pool = &ksocknal_data.ksnd_large_fmp;
/* Descriptor plus flexible kiov array sized for this pool. */
1725 PORTAL_ALLOC(fmb, offsetof(ksock_fmb_t,
1726 fmb_kiov[pool->fmp_buff_pages]));
1728 ksocknal_api_shutdown(&ksocknal_api);
1732 fmb->fmb_pool = pool;
1734 for (j = 0; j < pool->fmp_buff_pages; j++) {
1735 fmb->fmb_kiov[j].kiov_page = alloc_page(GFP_KERNEL);
1737 if (fmb->fmb_kiov[j].kiov_page == NULL) {
1738 ksocknal_api_shutdown (&ksocknal_api);
1742 LASSERT(page_address(fmb->fmb_kiov[j].kiov_page) != NULL);
1745 list_add(&fmb->fmb_list, &pool->fmp_idle_fmbs);
1749 rc = libcfs_nal_cmd_register(SOCKNAL, &ksocknal_cmd, NULL);
1751 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
1752 ksocknal_api_shutdown (&ksocknal_api);
1756 /* flag everything initialised */
1757 ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
1759 printk(KERN_INFO "Lustre: Routing socket NAL loaded "
1760 "(Routing %s, initial mem %d, incarnation "LPX64")\n",
1761 kpr_routing (&ksocknal_data.ksnd_router) ?
1762 "enabled" : "disabled", pkmem, ksocknal_data.ksnd_incarnation);
/* Module unload hook: drop the sysctl table (if it registered), tear
 * down the network interface and deregister the NAL. */
1768 ksocknal_module_fini (void)
1770 #ifdef CONFIG_SYSCTL
/* Registration is best-effort at load time, so may be NULL here. */
1771 if (ksocknal_tunables.ksnd_sysctl != NULL)
1772 unregister_sysctl_table (ksocknal_tunables.ksnd_sysctl);
1774 PtlNIFini(ksocknal_ni);
1776 ptl_unregister_nal(SOCKNAL);
/* Module load hook: sanity-check compile-time assumptions, wire up the
 * NAL api vector, set tunable defaults, register the NAL, start the NI
 * (pure gateways need it at load time) and register the sysctl table. */
1780 ksocknal_module_init (void)
1784 /* packet descriptor must fit in a router descriptor's scratchpad */
1785 LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
1786 /* the following must be sizeof(int) for proc_dointvec() */
1787 LASSERT(sizeof (ksocknal_tunables.ksnd_io_timeout) == sizeof (int));
1788 LASSERT(sizeof (ksocknal_tunables.ksnd_eager_ack) == sizeof (int));
1789 LASSERT(sizeof (ksocknal_tunables.ksnd_typed_conns) == sizeof (int));
1790 LASSERT(sizeof (ksocknal_tunables.ksnd_min_bulk) == sizeof (int));
1792 LASSERT(sizeof (ksocknal_tunables.ksnd_zc_min_frag) == sizeof (int));
1794 /* check ksnr_connected/connecting field large enough */
1795 LASSERT(SOCKNAL_CONN_NTYPES <= 4);
1797 ksocknal_api.startup = ksocknal_api_startup;
1798 ksocknal_api.forward = ksocknal_api_forward;
1799 ksocknal_api.shutdown = ksocknal_api_shutdown;
1800 ksocknal_api.lock = ksocknal_api_lock;
1801 ksocknal_api.unlock = ksocknal_api_unlock;
1802 ksocknal_api.nal_data = &ksocknal_data;
1804 ksocknal_lib.nal_data = &ksocknal_data;
1806 /* Initialise dynamic tunables to defaults once only */
1807 ksocknal_tunables.ksnd_io_timeout = SOCKNAL_IO_TIMEOUT;
1808 ksocknal_tunables.ksnd_eager_ack = SOCKNAL_EAGER_ACK;
1809 ksocknal_tunables.ksnd_typed_conns = SOCKNAL_TYPED_CONNS;
1810 ksocknal_tunables.ksnd_min_bulk = SOCKNAL_MIN_BULK;
1812 ksocknal_tunables.ksnd_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
1815 rc = ptl_register_nal(SOCKNAL, &ksocknal_api);
1817 CERROR("Can't register SOCKNAL: %d\n", rc);
/* NOTE(review): rc here is a portals status, not an errno; -ENOMEM
 * is an acknowledged stand-in (see original comment). */
1818 return (-ENOMEM); /* or something... */
1821 /* Pure gateways want the NAL started up at module load time... */
1822 rc = PtlNIInit(SOCKNAL, 0, NULL, NULL, &ksocknal_ni);
1823 if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
1824 ptl_unregister_nal(SOCKNAL);
1828 #ifdef CONFIG_SYSCTL
1829 /* Press on regardless even if registering sysctl doesn't work */
1830 ksocknal_tunables.ksnd_sysctl =
1831 register_sysctl_table (ksocknal_top_ctl_table, 0);
/* Standard kernel module metadata and entry points. */
1836 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1837 MODULE_DESCRIPTION("Kernel TCP Socket NAL v0.01");
1838 MODULE_LICENSE("GPL");
1840 module_init(ksocknal_module_init);
1841 module_exit(ksocknal_module_fini);