1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5 * Author: Zach Brown <zab@zabbo.net>
6 * Author: Peter J. Braam <braam@clusterfs.com>
7 * Author: Phil Schwan <phil@clusterfs.com>
8 * Author: Eric Barton <eric@bartonsoftware.com>
10 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
12 * Portals is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Portals is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Portals; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
/* Global state for the socket NAL (network abstraction layer).
 * NOTE(review): this chunk appears sampled — several original lines
 * (e.g. the #else/#endif of the version conditional, table terminators)
 * are not visible here. */
28 ptl_handle_ni_t ksocknal_ni;
29 static nal_t ksocknal_api;
/* On 2.5+ kernels the NAL data is exported (non-static); older kernels
 * keep it file-local — presumably an #else separates the two, TODO confirm. */
30 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
31 ksock_nal_data_t ksocknal_data;
33 static ksock_nal_data_t ksocknal_data;
/* Interface handed to the kernel portals router (kpr): forwarding and
 * liveness-notification callbacks, with ksocknal_data as the opaque arg.
 * Uses old GCC "field:" designated-initializer syntax. */
36 kpr_nal_interface_t ksocknal_router_interface = {
38 kprni_arg: &ksocknal_data,
39 kprni_fwd: ksocknal_fwd_packet,
40 kprni_notify: ksocknal_notify,
/* sysctl binary IDs: 200 is the socknal directory, 1..5 its tunables. */
43 #define SOCKNAL_SYSCTL 200
45 #define SOCKNAL_SYSCTL_TIMEOUT 1
46 #define SOCKNAL_SYSCTL_EAGER_ACK 2
47 #define SOCKNAL_SYSCTL_ZERO_COPY 3
48 #define SOCKNAL_SYSCTL_TYPED 4
49 #define SOCKNAL_SYSCTL_MIN_BULK 5
/* /proc/sys tunables for the socknal; each entry is an int with mode 0644
 * handled by proc_dointvec.
 * NOTE(review): the "zero_copy" entry points at ksnd_zc_min_frag — the
 * minimum fragment size for zero-copy, despite the sysctl's name. */
51 static ctl_table ksocknal_ctl_table[] = {
52 {SOCKNAL_SYSCTL_TIMEOUT, "timeout",
53 &ksocknal_data.ksnd_io_timeout, sizeof (int),
54 0644, NULL, &proc_dointvec},
55 {SOCKNAL_SYSCTL_EAGER_ACK, "eager_ack",
56 &ksocknal_data.ksnd_eager_ack, sizeof (int),
57 0644, NULL, &proc_dointvec},
59 {SOCKNAL_SYSCTL_ZERO_COPY, "zero_copy",
60 &ksocknal_data.ksnd_zc_min_frag, sizeof (int),
61 0644, NULL, &proc_dointvec},
63 {SOCKNAL_SYSCTL_TYPED, "typed",
64 &ksocknal_data.ksnd_typed_conns, sizeof (int),
65 0644, NULL, &proc_dointvec},
66 {SOCKNAL_SYSCTL_MIN_BULK, "min_bulk",
67 &ksocknal_data.ksnd_min_bulk, sizeof (int),
68 0644, NULL, &proc_dointvec},
/* Top-level table: a "socknal" directory (mode 0555) containing the above. */
72 static ctl_table ksocknal_top_ctl_table[] = {
73 {SOCKNAL_SYSCTL, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
/* nal_t API entry points (fragments — bodies are partially visible).
 * forward: dispatch an API-side request into the lib-side NAL via
 * lib_dispatch(); lock/unlock: take/release the lib-level state lock
 * through the cb_cli/cb_sti callbacks. */
78 ksocknal_api_forward(nal_t *nal, int id, void *args, size_t args_len,
79 void *ret, size_t ret_len)
85 nal_cb = k->ksnd_nal_cb;
87 lib_dispatch(nal_cb, k, id, args, ret); /* ksocknal_send needs k */
92 ksocknal_api_shutdown(nal_t *nal, int ni)
98 ksocknal_api_lock(nal_t *nal, unsigned long *flags)
104 nal_cb = k->ksnd_nal_cb;
105 nal_cb->cb_cli(nal_cb,flags);
109 ksocknal_api_unlock(nal_t *nal, unsigned long *flags)
115 nal_cb = k->ksnd_nal_cb;
116 nal_cb->cb_sti(nal_cb,flags);
/* Yield the statelock for up to 'milliseconds' (<0 = wait indefinitely,
 * 0 = just reschedule).  Sleeps on ksnd_yield_waitq with the lock dropped,
 * then returns the remaining budget (clamped — the clamp line is not
 * visible in this view). */
120 ksocknal_api_yield(nal_t *nal, unsigned long *flags, int milliseconds)
122 /* NB called holding statelock */
124 unsigned long now = jiffies;
126 CDEBUG (D_NET, "yield\n");
128 if (milliseconds == 0) {
133 init_waitqueue_entry(&wait, current);
134 set_current_state (TASK_INTERRUPTIBLE);
135 add_wait_queue (&ksocknal_data.ksnd_yield_waitq, &wait);
/* Drop the statelock across the sleep so others can make progress. */
137 ksocknal_api_unlock(nal, flags);
139 if (milliseconds < 0)
142 schedule_timeout((milliseconds * HZ) / 1000);
144 ksocknal_api_lock(nal, flags);
146 remove_wait_queue (&ksocknal_data.ksnd_yield_waitq, &wait);
/* Account elapsed time against the caller's timeout budget. */
148 if (milliseconds > 0) {
149 milliseconds -= ((jiffies - now) * 1000) / HZ;
150 if (milliseconds < 0)
154 return (milliseconds);
/* NAL bootstrap: initialise the lib side with a placeholder nid of 0
 * (the real nid arrives later via ksocknal_set_mynid) and return the
 * API handle. */
158 ksocknal_init(int interface, ptl_pt_index_t ptl_size,
159 ptl_ac_index_t ac_size, ptl_pid_t requested_pid)
161 CDEBUG(D_NET, "calling lib_init with nid "LPX64"\n", (ptl_nid_t)0);
162 lib_init(&ksocknal_lib, (ptl_nid_t)0, 0, 10, ptl_size, ac_size);
163 return (&ksocknal_api);
167 * EXTRA functions follow
/* Late-bind this node's NID into the lib NI, since lib_init() ran at
 * module load before 'mynid' was known (see FIXME below). */
171 ksocknal_set_mynid(ptl_nid_t nid)
173 lib_ni_t *ni = &ksocknal_lib.ni;
175 /* FIXME: we have to do this because we call lib_init() at module
176 * insertion time, which is before we have 'mynid' available. lib_init
177 * sets the NAL's nid, which it uses to tell other nodes where packets
178 * are coming from. This is not a very graceful solution to this
181 CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
/* Bind a NIC's IRQ to the CPU of its chosen scheduler by writing
 * /proc/irq/<irq>/smp_affinity via a usermode shell (see FIXME).
 * Only compiled in on SMP with CPU_AFFINITY; irq 0 means a software
 * NIC and is skipped.  Each IRQ is bound at most once (ksni_bound). */
189 ksocknal_bind_irq (unsigned int irq)
191 #if (defined(CONFIG_SMP) && CPU_AFFINITY)
195 ksock_irqinfo_t *info;
196 char *argv[] = {"/bin/sh",
200 char *envp[] = {"HOME=/",
201 "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
204 LASSERT (irq < NR_IRQS);
205 if (irq == 0) /* software NIC */
208 info = &ksocknal_data.ksnd_irqinfo[irq];
/* Check-and-set ksni_bound under the global lock so only one caller
 * actually issues the bind. */
210 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
212 LASSERT (info->ksni_valid);
213 bind = !info->ksni_bound;
214 info->ksni_bound = 1;
216 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
218 if (!bind) /* bound already */
221 snprintf (cmdline, sizeof (cmdline),
222 "echo %d > /proc/irq/%u/smp_affinity", 1 << info->ksni_sched, irq);
224 printk (KERN_INFO "Lustre: Binding irq %u to CPU %d with cmd: %s\n",
225 irq, info->ksni_sched, cmdline);
227 /* FIXME: Find a better method of setting IRQ affinity...
230 USERMODEHELPER(argv[0], argv, envp);
/* Allocate and initialise an autoconnect route to ipaddr:port.
 * Returns the new route with one reference held for the caller
 * (return statement not visible in this view).  ksnr_timeout/
 * ksnr_retry_interval seed the reconnect backoff. */
235 ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
236 int irq_affinity, int eager)
238 ksock_route_t *route;
240 PORTAL_ALLOC (route, sizeof (*route));
244 atomic_set (&route->ksnr_refcount, 1);
245 route->ksnr_sharecount = 0;
246 route->ksnr_peer = NULL;
247 route->ksnr_timeout = jiffies;
248 route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
249 route->ksnr_ipaddr = ipaddr;
250 route->ksnr_port = port;
251 route->ksnr_buffer_size = buffer_size;
252 route->ksnr_irq_affinity = irq_affinity;
253 route->ksnr_eager = eager;
254 route->ksnr_connecting = 0;
255 route->ksnr_connected = 0;
256 route->ksnr_deleted = 0;
257 route->ksnr_conn_count = 0;
/* Final teardown of a route: drop its ref on the peer (if attached)
 * and free it.  Must only run once sharing has ceased. */
263 ksocknal_destroy_route (ksock_route_t *route)
265 LASSERT (route->ksnr_sharecount == 0);
267 if (route->ksnr_peer != NULL)
268 ksocknal_put_peer (route->ksnr_peer);
270 PORTAL_FREE (route, sizeof (*route));
/* Drop one reference; destroy the route when the count hits zero. */
274 ksocknal_put_route (ksock_route_t *route)
276 CDEBUG (D_OTHER, "putting route[%p] (%d)\n",
277 route, atomic_read (&route->ksnr_refcount));
279 LASSERT (atomic_read (&route->ksnr_refcount) > 0);
280 if (!atomic_dec_and_test (&route->ksnr_refcount))
283 ksocknal_destroy_route (route);
/* Allocate a peer for 'nid', zeroed, with one caller reference and
 * empty conn/route/tx lists.  ksnd_npeers tracks the global population
 * so module unload can wait for peers to drain. */
287 ksocknal_create_peer (ptl_nid_t nid)
291 LASSERT (nid != PTL_NID_ANY);
293 PORTAL_ALLOC (peer, sizeof (*peer));
297 memset (peer, 0, sizeof (*peer));
299 peer->ksnp_nid = nid;
300 atomic_set (&peer->ksnp_refcount, 1); /* 1 ref for caller */
301 peer->ksnp_closing = 0;
302 INIT_LIST_HEAD (&peer->ksnp_conns);
303 INIT_LIST_HEAD (&peer->ksnp_routes);
304 INIT_LIST_HEAD (&peer->ksnp_tx_queue);
306 atomic_inc (&ksocknal_data.ksnd_npeers);
/* Final teardown: refcount has hit zero and all lists must be empty. */
311 ksocknal_destroy_peer (ksock_peer_t *peer)
313 CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
315 LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
316 LASSERT (list_empty (&peer->ksnp_conns));
317 LASSERT (list_empty (&peer->ksnp_routes));
318 LASSERT (list_empty (&peer->ksnp_tx_queue));
320 PORTAL_FREE (peer, sizeof (*peer));
322 /* NB a peer's connections and autoconnect routes keep a reference
323 * on their peer until they are destroyed, so we can be assured
324 * that _all_ state to do with this peer has been cleaned up when
325 * its refcount drops to zero. */
326 atomic_dec (&ksocknal_data.ksnd_npeers);
/* Drop one reference; destroy the peer when the count hits zero. */
330 ksocknal_put_peer (ksock_peer_t *peer)
332 CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
333 peer, peer->ksnp_nid,
334 atomic_read (&peer->ksnp_refcount));
336 LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
337 if (!atomic_dec_and_test (&peer->ksnp_refcount))
340 ksocknal_destroy_peer (peer);
/* Look up 'nid' in its peer hash chain.  Caller holds ksnd_global_lock;
 * returns the peer WITHOUT taking a reference (get_peer below adds one). */
344 ksocknal_find_peer_locked (ptl_nid_t nid)
346 struct list_head *peer_list = ksocknal_nid2peerlist (nid);
347 struct list_head *tmp;
350 list_for_each (tmp, peer_list) {
352 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
/* Listed peers are live: not closing, and have at least one conn or route. */
354 LASSERT (!peer->ksnp_closing);
355 LASSERT (!(list_empty (&peer->ksnp_routes) &&
356 list_empty (&peer->ksnp_conns)));
358 if (peer->ksnp_nid != nid)
361 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
362 peer, nid, atomic_read (&peer->ksnp_refcount));
/* Locked lookup plus a caller reference, under the read lock. */
369 ksocknal_get_peer (ptl_nid_t nid)
373 read_lock (&ksocknal_data.ksnd_global_lock);
374 peer = ksocknal_find_peer_locked (nid);
375 if (peer != NULL) /* +1 ref for caller? */
376 atomic_inc (&peer->ksnp_refcount);
377 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Remove the peer from the hash table and drop the table's reference.
 * Caller holds ksnd_global_lock for writing. */
383 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
385 LASSERT (!peer->ksnp_closing);
386 peer->ksnp_closing = 1;
387 list_del (&peer->ksnp_list);
388 /* lose peerlist's ref */
389 ksocknal_put_peer (peer);
/* Return the index'th route across all peers (for ioctl enumeration),
 * with a reference taken; NULL-return path is not visible in this view. */
393 ksocknal_get_route_by_idx (int index)
396 struct list_head *ptmp;
397 ksock_route_t *route;
398 struct list_head *rtmp;
401 read_lock (&ksocknal_data.ksnd_global_lock);
/* Walk every hash chain, every peer, every route, counting down 'index'. */
403 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
404 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
405 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
407 LASSERT (!(list_empty (&peer->ksnp_routes) &&
408 list_empty (&peer->ksnp_conns)));
410 list_for_each (rtmp, &peer->ksnp_routes) {
414 route = list_entry (rtmp, ksock_route_t, ksnr_list);
415 atomic_inc (&route->ksnr_refcount);
416 read_unlock (&ksocknal_data.ksnd_global_lock);
422 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Add (or share) an autoconnect route nid -> ipaddr:port.  Creates a
 * peer and route optimistically outside the lock, then under the write
 * lock either installs the new peer or reuses an existing one, and
 * either links the new route or bumps the sharecount of an existing
 * route to the same ipaddr. */
427 ksocknal_add_route (ptl_nid_t nid, __u32 ipaddr, int port, int bufnob,
428 int bind_irq, int share, int eager)
433 ksock_route_t *route;
434 struct list_head *rtmp;
435 ksock_route_t *route2;
437 if (nid == PTL_NID_ANY)
440 /* Have a brand new peer ready... */
441 peer = ksocknal_create_peer (nid);
445 route = ksocknal_create_route (ipaddr, port, bufnob,
448 ksocknal_put_peer (peer);
452 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
/* Race check: someone may have created this peer since we looked. */
454 peer2 = ksocknal_find_peer_locked (nid);
456 ksocknal_put_peer (peer);
459 /* peer table takes existing ref on peer */
460 list_add (&peer->ksnp_list,
461 ksocknal_nid2peerlist (nid));
466 /* check for existing route to this NID via this ipaddr */
467 list_for_each (rtmp, &peer->ksnp_routes) {
468 route2 = list_entry (rtmp, ksock_route_t, ksnr_list);
470 if (route2->ksnr_ipaddr == ipaddr)
477 if (route2 != NULL) {
478 ksocknal_put_route (route);
481 /* route takes a ref on peer */
482 route->ksnr_peer = peer;
483 atomic_inc (&peer->ksnp_refcount);
484 /* peer's route list takes existing ref on route */
485 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
488 route->ksnr_sharecount++;
490 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
/* Delete a route under the write lock.  'share' decides whether to
 * force sharecount to zero or just decrement it (returning early while
 * still shared).  Conns using the route are closed unless 'keep_conn',
 * in which case they are merely dissociated.  If this was the peer's
 * last route and it has no conns, the peer is unlinked too. */
496 ksocknal_del_route_locked (ksock_route_t *route, int share, int keep_conn)
498 ksock_peer_t *peer = route->ksnr_peer;
500 struct list_head *ctmp;
501 struct list_head *cnxt;
504 route->ksnr_sharecount = 0;
506 route->ksnr_sharecount--;
507 if (route->ksnr_sharecount != 0)
511 list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
512 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
514 if (conn->ksnc_route != route)
518 ksocknal_close_conn_locked (conn, 0);
522 /* keeping the conn; just dissociate it and route... */
523 conn->ksnc_route = NULL;
524 ksocknal_put_route (route); /* drop conn's ref on route */
527 route->ksnr_deleted = 1;
528 list_del (&route->ksnr_list);
529 ksocknal_put_route (route); /* drop peer's ref */
531 if (list_empty (&peer->ksnp_routes) &&
532 list_empty (&peer->ksnp_conns)) {
533 /* I've just removed the last autoconnect route of a peer
534 * with no active connections */
535 ksocknal_unlink_peer_locked (peer);
/* Delete all routes matching (nid, ipaddr); PTL_NID_ANY / ipaddr 0 act
 * as wildcards.  Scans either one hash chain or all of them, and
 * returns 0 if anything matched (error-return path not visible). */
540 ksocknal_del_route (ptl_nid_t nid, __u32 ipaddr, int share, int keep_conn)
543 struct list_head *ptmp;
544 struct list_head *pnxt;
546 struct list_head *rtmp;
547 struct list_head *rnxt;
548 ksock_route_t *route;
554 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
/* Narrow the scan to one hash bucket when nid is specific. */
556 if (nid != PTL_NID_ANY)
557 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
560 hi = ksocknal_data.ksnd_peer_hash_size - 1;
563 for (i = lo; i <= hi; i++) {
564 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
565 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
567 if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
570 list_for_each_safe (rtmp, rnxt, &peer->ksnp_routes) {
571 route = list_entry (rtmp, ksock_route_t,
575 route->ksnr_ipaddr == ipaddr))
578 ksocknal_del_route_locked (route, share, keep_conn);
579 rc = 0; /* matched something */
586 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
/* Return the index'th connection across all peers (ioctl enumeration),
 * with a reference taken; mirrors ksocknal_get_route_by_idx. */
592 ksocknal_get_conn_by_idx (int index)
595 struct list_head *ptmp;
597 struct list_head *ctmp;
600 read_lock (&ksocknal_data.ksnd_global_lock);
602 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
603 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
604 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
606 LASSERT (!(list_empty (&peer->ksnp_routes) &&
607 list_empty (&peer->ksnp_conns)));
609 list_for_each (ctmp, &peer->ksnp_conns) {
613 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
614 atomic_inc (&conn->ksnc_refcount);
615 read_unlock (&ksocknal_data.ksnd_global_lock);
621 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Fetch the remote address of the conn's socket via getname(peer=2)
 * and stash it (host byte order) in ksnc_ipaddr/ksnc_port. */
626 ksocknal_get_peer_addr (ksock_conn_t *conn)
628 struct sockaddr_in sin;
629 int len = sizeof (sin);
632 rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
633 (struct sockaddr *)&sin, &len, 2);
634 /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
635 LASSERT (!conn->ksnc_closing);
636 LASSERT (len <= sizeof (sin));
639 CERROR ("Error %d getting sock peer IP\n", rc);
643 conn->ksnc_ipaddr = ntohl (sin.sin_addr.s_addr);
644 conn->ksnc_port = ntohs (sin.sin_port);
/* Determine the IRQ of the NIC carrying this conn by inspecting the
 * socket's dst_entry device; presumably returns 0 for a software NIC —
 * the device->irq read and return are not visible in this view. */
648 ksocknal_conn_irq (ksock_conn_t *conn)
651 struct dst_entry *dst;
653 dst = sk_dst_get (conn->ksnc_sock->sk);
655 if (dst->dev != NULL) {
657 if (irq >= NR_IRQS) {
658 CERROR ("Unexpected IRQ %x\n", irq);
665 /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
666 LASSERT (!conn->ksnc_closing);
/* Pick the scheduler for a new conn.  A hardware IRQ that has already
 * been assigned keeps its scheduler (cache/IRQ affinity); otherwise
 * choose the least-loaded scheduler and, for hardware IRQs, record the
 * choice in ksnd_irqinfo for future conns on the same IRQ. */
671 ksocknal_choose_scheduler_locked (unsigned int irq)
673 ksock_sched_t *sched;
674 ksock_irqinfo_t *info;
677 LASSERT (irq < NR_IRQS);
678 info = &ksocknal_data.ksnd_irqinfo[irq];
680 if (irq != 0 && /* hardware NIC */
681 info->ksni_valid) { /* already set up */
682 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
685 /* software NIC (irq == 0) || not associated with a scheduler yet.
686 * Choose the CPU with the fewest connections... */
687 sched = &ksocknal_data.ksnd_schedulers[0];
688 for (i = 1; i < SOCKNAL_N_SCHED; i++)
689 if (sched->kss_nconns >
690 ksocknal_data.ksnd_schedulers[i].kss_nconns)
691 sched = &ksocknal_data.ksnd_schedulers[i];
693 if (irq != 0) { /* Hardware NIC */
694 info->ksni_valid = 1;
695 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
698 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
/* Build a ksock_conn_t around an established socket.  route != NULL
 * means an autoconnect (the expected nid comes from the route's peer);
 * route == NULL means an acceptor/explicit connection whose nid is
 * learned from the HELLO exchange.  On success the conn is wired to a
 * peer, a scheduler, and the socket's data_ready/write_space callbacks,
 * queued peer tx's are moved onto it, stale conns are closed, and the
 * creator's ref is dropped at the end. */
705 ksocknal_create_conn (ksock_route_t *route, struct socket *sock,
706 int bind_irq, int type)
714 ksock_sched_t *sched;
719 /* NB, sock has an associated file since (a) this connection might
720 * have been created in userland and (b) we need to refcount the
721 * socket so that we don't close it while I/O is being done on
722 * it, and sock->file has that pre-cooked... */
723 LASSERT (sock->file != NULL);
724 LASSERT (file_count(sock->file) > 0);
726 rc = ksocknal_setup_sock (sock);
731 /* acceptor or explicit connect */
734 LASSERT (type != SOCKNAL_CONN_NONE);
735 /* autoconnect: expect this nid on exchange */
736 nid = route->ksnr_peer->ksnp_nid;
/* Exchange HELLOs: validates/learns nid, type and peer incarnation. */
739 rc = ksocknal_hello (sock, &nid, &type, &incarnation);
744 if (route == NULL) { /* not autoconnect */
745 /* Assume this socket connects to a brand new peer */
746 peer = ksocknal_create_peer (nid);
751 PORTAL_ALLOC(conn, sizeof(*conn));
754 ksocknal_put_peer (peer);
758 memset (conn, 0, sizeof (*conn));
759 conn->ksnc_peer = NULL;
760 conn->ksnc_route = NULL;
761 conn->ksnc_sock = sock;
762 conn->ksnc_type = type;
763 conn->ksnc_incarnation = incarnation;
/* Save the socket's original callbacks so terminate_conn can restore them. */
764 conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
765 conn->ksnc_saved_write_space = sock->sk->sk_write_space;
766 atomic_set (&conn->ksnc_refcount, 1); /* 1 ref for me */
768 conn->ksnc_rx_ready = 0;
769 conn->ksnc_rx_scheduled = 0;
770 ksocknal_new_packet (conn, 0);
772 INIT_LIST_HEAD (&conn->ksnc_tx_queue);
773 conn->ksnc_tx_ready = 0;
774 conn->ksnc_tx_scheduled = 0;
775 atomic_set (&conn->ksnc_tx_nob, 0);
777 ksocknal_get_peer_addr (conn);
779 CWARN("New conn nid:"LPX64" ip:%08x/%d incarnation:"LPX64"\n",
780 nid, conn->ksnc_ipaddr, conn->ksnc_port, incarnation);
782 irq = ksocknal_conn_irq (conn);
784 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
/* Autoconnect path: this (route,type) must be mid-connect and not yet
 * connected. */
788 LASSERT ((route->ksnr_connected & (1 << type)) == 0);
789 LASSERT ((route->ksnr_connecting & (1 << type)) != 0);
791 if (route->ksnr_deleted) {
792 /* This conn was autoconnected, but the autoconnect
793 * route got deleted while it was being
795 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock,
797 PORTAL_FREE (conn, sizeof (*conn));
802 /* associate conn/route */
803 conn->ksnc_route = route;
804 atomic_inc (&route->ksnr_refcount);
806 route->ksnr_connecting &= ~(1 << type);
807 route->ksnr_connected |= (1 << type);
808 route->ksnr_conn_count++;
/* Successful connect resets the reconnect backoff. */
809 route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
811 peer = route->ksnr_peer;
813 /* Not an autoconnected connection; see if there is an
814 * existing peer for this NID */
815 peer2 = ksocknal_find_peer_locked (nid);
817 ksocknal_put_peer (peer);
820 list_add (&peer->ksnp_list,
821 ksocknal_nid2peerlist (nid));
822 /* peer list takes over existing ref */
826 LASSERT (!peer->ksnp_closing);
828 conn->ksnc_peer = peer;
829 atomic_inc (&peer->ksnp_refcount);
830 peer->ksnp_last_alive = jiffies;
831 peer->ksnp_error = 0;
833 /* Set the deadline for the outgoing HELLO to drain */
834 conn->ksnc_tx_deadline = jiffies +
835 ksocknal_data.ksnd_io_timeout * HZ;
837 list_add (&conn->ksnc_list, &peer->ksnp_conns);
838 atomic_inc (&conn->ksnc_refcount);
840 sched = ksocknal_choose_scheduler_locked (irq);
842 conn->ksnc_scheduler = sched;
844 /* NB my callbacks block while I hold ksnd_global_lock */
845 sock->sk->sk_user_data = conn;
846 sock->sk->sk_data_ready = ksocknal_data_ready;
847 sock->sk->sk_write_space = ksocknal_write_space;
849 /* Take all the packets blocking for a connection.
850 * NB, it might be nicer to share these blocked packets among any
851 * other connections that are becoming established, however that
852 * confuses the normal packet launching operation, which selects a
853 * connection and queues the packet on it without needing an
854 * exclusive lock on ksnd_global_lock. */
855 while (!list_empty (&peer->ksnp_tx_queue)) {
856 tx = list_entry (peer->ksnp_tx_queue.next,
857 ksock_tx_t, tx_list);
859 list_del (&tx->tx_list);
860 ksocknal_queue_tx_locked (tx, conn);
/* Close any conns from a previous incarnation of this peer. */
863 rc = ksocknal_close_stale_conns_locked (peer, incarnation);
865 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
868 CERROR ("Closed %d stale conns to nid "LPX64" ip %d.%d.%d.%d\n",
869 rc, conn->ksnc_peer->ksnp_nid,
870 HIPQUAD(conn->ksnc_ipaddr));
872 if (bind_irq) /* irq binding required */
873 ksocknal_bind_irq (irq);
875 /* Call the callbacks right now to get things going. */
876 ksocknal_data_ready (sock->sk, 0);
877 ksocknal_write_space (sock->sk);
879 CDEBUG(D_IOCTL, "conn [%p] registered for nid "LPX64" ip %d.%d.%d.%d\n",
880 conn, conn->ksnc_peer->ksnp_nid, HIPQUAD(conn->ksnc_ipaddr));
/* Drop my creator's ref; the peer's conn list and scheduler hold theirs. */
882 ksocknal_put_conn (conn);
887 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
889 /* This just does the immmediate housekeeping, and queues the
890 * connection for the reaper to terminate.
891 * Caller holds ksnd_global_lock exclusively in irq context */
892 ksock_peer_t *peer = conn->ksnc_peer;
893 ksock_route_t *route;
895 LASSERT (peer->ksnp_error == 0);
896 LASSERT (!conn->ksnc_closing);
897 conn->ksnc_closing = 1;
898 atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
900 route = conn->ksnc_route;
902 /* dissociate conn from route... */
903 LASSERT (!route->ksnr_deleted);
904 LASSERT ((route->ksnr_connecting & (1 << conn->ksnc_type)) == 0);
905 LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);
907 route->ksnr_connected &= ~(1 << conn->ksnc_type);
908 conn->ksnc_route = NULL;
910 list_del (&route->ksnr_list); /* make route least favourite */
911 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
913 ksocknal_put_route (route); /* drop conn's ref on route */
916 /* ksnd_deathrow_conns takes over peer's ref */
917 list_del (&conn->ksnc_list);
919 if (list_empty (&peer->ksnp_conns)) {
920 /* No more connections to this peer */
922 peer->ksnp_error = error; /* stash last conn close reason */
924 if (list_empty (&peer->ksnp_routes)) {
925 /* I've just closed last conn belonging to a
926 * non-autoconnecting peer */
927 ksocknal_unlink_peer_locked (peer);
/* Hand the conn to the reaper thread for actual termination. */
931 spin_lock (&ksocknal_data.ksnd_reaper_lock);
933 list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
934 wake_up (&ksocknal_data.ksnd_reaper_waitq);
936 spin_unlock (&ksocknal_data.ksnd_reaper_lock);
940 ksocknal_terminate_conn (ksock_conn_t *conn)
942 /* This gets called by the reaper (guaranteed thread context) to
943 * disengage the socket from its callbacks and close it.
944 * ksnc_refcount will eventually hit zero, and then the reaper will
947 ksock_peer_t *peer = conn->ksnc_peer;
948 ksock_sched_t *sched = conn->ksnc_scheduler;
953 LASSERT(conn->ksnc_closing);
955 /* wake up the scheduler to "send" all remaining packets to /dev/null */
956 spin_lock_irqsave(&sched->kss_lock, flags);
958 if (!conn->ksnc_tx_scheduled &&
959 !list_empty(&conn->ksnc_tx_queue)){
960 list_add_tail (&conn->ksnc_tx_list,
961 &sched->kss_tx_conns);
962 /* a closing conn is always ready to tx */
963 conn->ksnc_tx_ready = 1;
964 conn->ksnc_tx_scheduled = 1;
965 /* extra ref for scheduler */
966 atomic_inc (&conn->ksnc_refcount);
968 wake_up (&sched->kss_waitq);
971 spin_unlock_irqrestore (&sched->kss_lock, flags);
973 /* serialise with callbacks */
974 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
976 /* Remove conn's network callbacks.
977 * NB I _have_ to restore the callback, rather than storing a noop,
978 * since the socket could survive past this module being unloaded!! */
979 conn->ksnc_sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
980 conn->ksnc_sock->sk->sk_write_space = conn->ksnc_saved_write_space;
982 /* A callback could be in progress already; they hold a read lock
983 * on ksnd_global_lock (to serialise with me) and NOOP if
984 * sk_user_data is NULL. */
985 conn->ksnc_sock->sk->sk_user_data = NULL;
987 /* OK, so this conn may not be completely disengaged from its
988 * scheduler yet, but it _has_ committed to terminate... */
989 conn->ksnc_scheduler->kss_nconns--;
991 if (peer->ksnp_error != 0) {
992 /* peer's last conn closed in error */
993 LASSERT (list_empty (&peer->ksnp_conns));
995 /* convert peer's last-known-alive timestamp from jiffies */
996 do_gettimeofday (&now);
997 then = now.tv_sec - (jiffies - peer->ksnp_last_alive)/HZ;
1001 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1003 /* The socket is closed on the final put; either here, or in
1004 * ksocknal_{send,recv}msg(). Since we set up the linger2 option
1005 * when the connection was established, this will close the socket
1006 * immediately, aborting anything buffered in it. Any hung
1007 * zero-copy transmits will therefore complete in finite time. */
1008 ksocknal_putconnsock (conn);
/* Tell the router this gateway may be down (notify args partly not
 * visible in this view). */
1011 kpr_notify (&ksocknal_data.ksnd_router, peer->ksnp_nid,
1016 ksocknal_destroy_conn (ksock_conn_t *conn)
1018 /* Final coup-de-grace of the reaper */
1019 CDEBUG (D_NET, "connection %p\n", conn);
1021 LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
1022 LASSERT (conn->ksnc_route == NULL);
1023 LASSERT (!conn->ksnc_tx_scheduled);
1024 LASSERT (!conn->ksnc_rx_scheduled);
1025 LASSERT (list_empty(&conn->ksnc_tx_queue));
1027 /* complete current receive if any */
1028 switch (conn->ksnc_rx_state) {
1029 case SOCKNAL_RX_BODY:
/* A message body was mid-receive: fail the lib-level operation. */
1030 CERROR("Completing partial receive from "LPX64
1031 ", ip %d.%d.%d.%d:%d, with error\n",
1032 conn->ksnc_peer->ksnp_nid,
1033 HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
1034 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_FAIL);
1036 case SOCKNAL_RX_BODY_FWD:
/* A forwarded-message body was mid-receive: abort the forwarding buffer. */
1037 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
1039 case SOCKNAL_RX_HEADER:
1040 case SOCKNAL_RX_SLOP:
/* Drop the conn's ref on its peer, free it, and retire the closing count. */
1047 ksocknal_put_peer (conn->ksnc_peer);
1049 PORTAL_FREE (conn, sizeof (*conn));
1050 atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
/* Drop one conn reference.  When the count hits zero, queue the conn on
 * the reaper's zombie list for destruction (we may be in interrupt
 * context, so the reaper does the actual teardown). */
1054 ksocknal_put_conn (ksock_conn_t *conn)
1056 unsigned long flags;
1058 CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
1059 conn, conn->ksnc_peer->ksnp_nid,
1060 atomic_read (&conn->ksnc_refcount));
1062 LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1063 if (!atomic_dec_and_test (&conn->ksnc_refcount))
1066 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
1068 list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1069 wake_up (&ksocknal_data.ksnd_reaper_waitq);
1071 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
/* Close every conn of 'peer' matching 'ipaddr' (presumably 0 acts as a
 * wildcard — the comparison's first half is not visible here) with the
 * given close reason; returns the count closed. */
1075 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
1078 struct list_head *ctmp;
1079 struct list_head *cnxt;
1082 list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1083 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1086 conn->ksnc_ipaddr == ipaddr) {
1088 ksocknal_close_conn_locked (conn, why);
/* Close every conn of 'peer' whose incarnation differs from the one
 * just learned via HELLO — i.e. conns from before the peer rebooted. */
1096 ksocknal_close_stale_conns_locked (ksock_peer_t *peer, __u64 incarnation)
1099 struct list_head *ctmp;
1100 struct list_head *cnxt;
1103 list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1104 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1106 if (conn->ksnc_incarnation == incarnation)
1109 CWARN("Closing stale conn nid:"LPX64" ip:%08x/%d "
1110 "incarnation:"LPX64"("LPX64")\n",
1111 peer->ksnp_nid, conn->ksnc_ipaddr, conn->ksnc_port,
1112 conn->ksnc_incarnation, incarnation);
1115 ksocknal_close_conn_locked (conn, -ESTALE);
/* Close this conn and all of its peer's conns to the same IP address,
 * under the write lock. */
1122 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why)
1124 ksock_peer_t *peer = conn->ksnc_peer;
1125 __u32 ipaddr = conn->ksnc_ipaddr;
1126 unsigned long flags;
1129 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1131 count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1133 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
/* Close all conns matching (nid, ipaddr); PTL_NID_ANY / ipaddr 0 are
 * wildcards.  Returns 0 on any match or wildcard, -ENOENT otherwise. */
1139 ksocknal_close_matching_conns (ptl_nid_t nid, __u32 ipaddr)
1141 unsigned long flags;
1143 struct list_head *ptmp;
1144 struct list_head *pnxt;
1150 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
/* Narrow the scan to one hash bucket when nid is specific. */
1152 if (nid != PTL_NID_ANY)
1153 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
1156 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1159 for (i = lo; i <= hi; i++) {
1160 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1162 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1164 if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
1167 count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1171 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1173 /* wildcards always succeed */
1174 if (nid == PTL_NID_ANY || ipaddr == 0)
1177 return (count == 0 ? -ENOENT : 0);
/* kpr callback: the router reports a gateway state change.  On death,
 * drop all conns to the gateway; on revival, do nothing (autoroutes
 * reconnect on demand). */
1181 ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive)
1183 /* The router is telling me she's been notified of a change in
1184 * gateway state.... */
1186 CDEBUG (D_NET, "gw "LPX64" %s\n", gw_nid, alive ? "up" : "down");
1189 /* If the gateway crashed, close all open connections... */
1190 ksocknal_close_matching_conns (gw_nid, 0);
1194 /* ...otherwise do nothing. We can only establish new connections
1195 * if we have autroutes, and these connect on demand. */
/* Kernel-version shim: locate the TCP protocol state inside a struct
 * sock (tp_pinfo.af_tcp on 2.4, via struct tcp_sock on 2.5+). */
1198 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1199 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1201 return &(sk->tp_pinfo.af_tcp);
1204 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1206 struct tcp_sock *s = (struct tcp_sock *)sk;
/* Force a conn's socket to flush: temporarily set TCP_NODELAY (saving
 * and restoring the nonagle setting) so buffered data goes out now. */
1212 ksocknal_push_conn (ksock_conn_t *conn)
1221 rc = ksocknal_getconnsock (conn);
1222 if (rc != 0) /* being shut down */
1225 sk = conn->ksnc_sock->sk;
1226 tp = sock2tcp_opt(sk);
1229 nonagle = tp->nonagle;
1236 rc = sk->sk_prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
1237 (char *)&val, sizeof (val));
1243 tp->nonagle = nonagle;
1246 ksocknal_putconnsock (conn);
/* Push every conn of a peer.  Re-takes the read lock per iteration and
 * indexes into the conn list so conns can come and go between pushes. */
1250 ksocknal_push_peer (ksock_peer_t *peer)
1254 struct list_head *tmp;
1257 for (index = 0; ; index++) {
1258 read_lock (&ksocknal_data.ksnd_global_lock);
1263 list_for_each (tmp, &peer->ksnp_conns) {
1265 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1266 atomic_inc (&conn->ksnc_refcount);
1271 read_unlock (&ksocknal_data.ksnd_global_lock);
1276 ksocknal_push_conn (conn);
1277 ksocknal_put_conn (conn);
/* ioctl entry: push one peer's conns (specific nid) or every peer's
 * (PTL_NID_ANY), using the same index-under-read-lock pattern as
 * ksocknal_push_peer so the table may change between iterations. */
1282 ksocknal_push (ptl_nid_t nid)
1285 struct list_head *tmp;
1291 if (nid != PTL_NID_ANY) {
1292 peer = ksocknal_get_peer (nid);
1296 ksocknal_push_peer (peer);
1297 ksocknal_put_peer (peer);
1302 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1303 for (j = 0; ; j++) {
1304 read_lock (&ksocknal_data.ksnd_global_lock);
1309 list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1311 peer = list_entry(tmp, ksock_peer_t,
1313 atomic_inc (&peer->ksnp_refcount);
1318 read_unlock (&ksocknal_data.ksnd_global_lock);
1322 ksocknal_push_peer (peer);
1323 ksocknal_put_peer (peer);
/* NAL command dispatcher for portals configuration ioctls: enumerate/
 * add/delete autoconnect routes, enumerate/close connections, register
 * a connected fd as a conn, set mynid, and push (flush) connections. */
1333 ksocknal_cmd(struct portals_cfg *pcfg, void * private)
1337 LASSERT (pcfg != NULL);
1339 switch(pcfg->pcfg_command) {
1340 case NAL_CMD_GET_AUTOCONN: {
1341 ksock_route_t *route = ksocknal_get_route_by_idx (pcfg->pcfg_count);
/* Report the route's endpoint and state back through pcfg fields. */
1347 pcfg->pcfg_nid = route->ksnr_peer->ksnp_nid;
1348 pcfg->pcfg_id = route->ksnr_ipaddr;
1349 pcfg->pcfg_misc = route->ksnr_port;
1350 pcfg->pcfg_count = route->ksnr_conn_count;
1351 pcfg->pcfg_size = route->ksnr_buffer_size;
1352 pcfg->pcfg_wait = route->ksnr_sharecount;
1353 pcfg->pcfg_flags = (route->ksnr_irq_affinity ? 2 : 0) |
1354 (route->ksnr_eager ? 4 : 0);
1355 ksocknal_put_route (route);
1359 case NAL_CMD_ADD_AUTOCONN: {
/* Flag bits: 0x02 bind_irq, 0x04 share, 0x08 eager. */
1360 rc = ksocknal_add_route (pcfg->pcfg_nid, pcfg->pcfg_id,
1361 pcfg->pcfg_misc, pcfg->pcfg_size,
1362 (pcfg->pcfg_flags & 0x02) != 0,
1363 (pcfg->pcfg_flags & 0x04) != 0,
1364 (pcfg->pcfg_flags & 0x08) != 0);
1367 case NAL_CMD_DEL_AUTOCONN: {
/* Flag bits: 1 share, 2 keep_conn. */
1368 rc = ksocknal_del_route (pcfg->pcfg_nid, pcfg->pcfg_id,
1369 (pcfg->pcfg_flags & 1) != 0,
1370 (pcfg->pcfg_flags & 2) != 0);
1373 case NAL_CMD_GET_CONN: {
1374 ksock_conn_t *conn = ksocknal_get_conn_by_idx (pcfg->pcfg_count);
1380 pcfg->pcfg_nid = conn->ksnc_peer->ksnp_nid;
1381 pcfg->pcfg_id = conn->ksnc_ipaddr;
1382 pcfg->pcfg_misc = conn->ksnc_port;
1383 pcfg->pcfg_flags = conn->ksnc_type;
1384 ksocknal_put_conn (conn);
1388 case NAL_CMD_REGISTER_PEER_FD: {
/* Wrap a user-supplied connected socket fd as a conn of the given type. */
1389 struct socket *sock = sockfd_lookup (pcfg->pcfg_fd, &rc);
1390 int type = pcfg->pcfg_misc;
1396 case SOCKNAL_CONN_NONE:
1397 case SOCKNAL_CONN_ANY:
1398 case SOCKNAL_CONN_CONTROL:
1399 case SOCKNAL_CONN_BULK_IN:
1400 case SOCKNAL_CONN_BULK_OUT:
1401 rc = ksocknal_create_conn(NULL, sock, pcfg->pcfg_flags, type);
1409 case NAL_CMD_CLOSE_CONNECTION: {
1410 rc = ksocknal_close_matching_conns (pcfg->pcfg_nid,
1414 case NAL_CMD_REGISTER_MYNID: {
1415 rc = ksocknal_set_mynid (pcfg->pcfg_nid);
1418 case NAL_CMD_PUSH_CONNECTION: {
1419 rc = ksocknal_push (pcfg->pcfg_nid);
/* Free all idle forwarding message buffers (FMBs) in a pool: release
 * each FMB's pages, unlink it, and free the FMB itself (which ends in a
 * flexible kiov array of fmp_buff_pages entries).  The pool must have
 * no blocked conns and no active FMBs. */
1428 ksocknal_free_fmbs (ksock_fmb_pool_t *p)
1430 int npages = p->fmp_buff_pages;
1434 LASSERT (list_empty(&p->fmp_blocked_conns));
1435 LASSERT (p->fmp_nactive_fmbs == 0);
1437 while (!list_empty(&p->fmp_idle_fmbs)) {
1439 fmb = list_entry(p->fmp_idle_fmbs.next,
1440 ksock_fmb_t, fmb_list);
1442 for (i = 0; i < npages; i++)
1443 if (fmb->fmb_kiov[i].kiov_page != NULL)
1444 __free_page(fmb->fmb_kiov[i].kiov_page);
1446 list_del(&fmb->fmb_list);
1447 PORTAL_FREE(fmb, offsetof(ksock_fmb_t, fmb_kiov[npages]));
/* Release all module-level allocations: both FMB pools, the scheduler
 * array (if allocated), and the peer hash table. */
1452 ksocknal_free_buffers (void)
1454 ksocknal_free_fmbs(&ksocknal_data.ksnd_small_fmp);
1455 ksocknal_free_fmbs(&ksocknal_data.ksnd_large_fmp);
1457 LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_ltxs) == 0);
1459 if (ksocknal_data.ksnd_schedulers != NULL)
1460 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
1461 sizeof (ksock_sched_t) * SOCKNAL_N_SCHED);
1463 PORTAL_FREE (ksocknal_data.ksnd_peers,
1464 sizeof (struct list_head) *
1465 ksocknal_data.ksnd_peer_hash_size);
1469 ksocknal_module_fini (void)
1473 CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1474 atomic_read (&portal_kmemory));
1476 switch (ksocknal_data.ksnd_init) {
1480 case SOCKNAL_INIT_ALL:
1482 if (ksocknal_data.ksnd_sysctl != NULL)
1483 unregister_sysctl_table (ksocknal_data.ksnd_sysctl);
1485 kportal_nal_unregister(SOCKNAL);
1486 PORTAL_SYMBOL_UNREGISTER (ksocknal_ni);
1489 case SOCKNAL_INIT_PTL:
1490 /* No more calls to ksocknal_cmd() to create new
1491 * autoroutes/connections since we're being unloaded. */
1492 PtlNIFini(ksocknal_ni);
1494 /* Delete all autoroute entries */
1495 ksocknal_del_route(PTL_NID_ANY, 0, 0, 0);
1497 /* Delete all connections */
1498 ksocknal_close_matching_conns (PTL_NID_ANY, 0);
1500 /* Wait for all peer state to clean up */
1502 while (atomic_read (&ksocknal_data.ksnd_npeers) != 0) {
1504 CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
1505 "waiting for %d peers to disconnect\n",
1506 atomic_read (&ksocknal_data.ksnd_npeers));
1507 set_current_state (TASK_UNINTERRUPTIBLE);
1508 schedule_timeout (HZ);
1511 /* Tell lib we've stopped calling into her. */
1512 lib_fini(&ksocknal_lib);
1515 case SOCKNAL_INIT_DATA:
1516 /* Module refcount only gets to zero when all peers
1517 * have been closed so all lists must be empty */
1518 LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
1519 LASSERT (ksocknal_data.ksnd_peers != NULL);
1520 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1521 LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
1523 LASSERT (list_empty (&ksocknal_data.ksnd_enomem_conns));
1524 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
1525 LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
1526 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
1527 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
1529 if (ksocknal_data.ksnd_schedulers != NULL)
1530 for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1531 ksock_sched_t *kss =
1532 &ksocknal_data.ksnd_schedulers[i];
1534 LASSERT (list_empty (&kss->kss_tx_conns));
1535 LASSERT (list_empty (&kss->kss_rx_conns));
1536 LASSERT (kss->kss_nconns == 0);
1539 /* stop router calling me */
1540 kpr_shutdown (&ksocknal_data.ksnd_router);
1542 /* flag threads to terminate; wake and wait for them to die */
1543 ksocknal_data.ksnd_shuttingdown = 1;
1544 wake_up_all (&ksocknal_data.ksnd_autoconnectd_waitq);
1545 wake_up_all (&ksocknal_data.ksnd_reaper_waitq);
1547 for (i = 0; i < SOCKNAL_N_SCHED; i++)
1548 wake_up_all(&ksocknal_data.ksnd_schedulers[i].kss_waitq);
1550 while (atomic_read (&ksocknal_data.ksnd_nthreads) != 0) {
1551 CDEBUG (D_NET, "waitinf for %d threads to terminate\n",
1552 atomic_read (&ksocknal_data.ksnd_nthreads));
1553 set_current_state (TASK_UNINTERRUPTIBLE);
1554 schedule_timeout (HZ);
1557 kpr_deregister (&ksocknal_data.ksnd_router);
1559 ksocknal_free_buffers();
1562 case SOCKNAL_INIT_NOTHING:
1566 CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1567 atomic_read (&portal_kmemory));
1569 printk(KERN_INFO "Lustre: Routing socket NAL unloaded (final mem %d)\n",
1570 atomic_read(&portal_kmemory));
1575 ksocknal_init_incarnation (void)
1579 /* The incarnation number is the time this module loaded and it
1580 * identifies this particular instance of the socknal. Hopefully
1581 * we won't be able to reboot more frequently than 1MHz for the
1582 * forseeable future :) */
1584 do_gettimeofday(&tv);
1586 ksocknal_data.ksnd_incarnation =
1587 (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1591 ksocknal_module_init (void)
1593 int pkmem = atomic_read(&portal_kmemory);
1598 /* packet descriptor must fit in a router descriptor's scratchpad */
1599 LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
1600 /* the following must be sizeof(int) for proc_dointvec() */
1601 LASSERT(sizeof (ksocknal_data.ksnd_io_timeout) == sizeof (int));
1602 LASSERT(sizeof (ksocknal_data.ksnd_eager_ack) == sizeof (int));
1603 /* check ksnr_connected/connecting field large enough */
1604 LASSERT(SOCKNAL_CONN_NTYPES <= 4);
1606 LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
1608 ksocknal_api.forward = ksocknal_api_forward;
1609 ksocknal_api.shutdown = ksocknal_api_shutdown;
1610 ksocknal_api.validate = NULL; /* our api validate is a NOOP */
1611 ksocknal_api.lock = ksocknal_api_lock;
1612 ksocknal_api.unlock = ksocknal_api_unlock;
1613 ksocknal_api.nal_data = &ksocknal_data;
1615 ksocknal_lib.nal_data = &ksocknal_data;
1617 memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
1619 ksocknal_data.ksnd_io_timeout = SOCKNAL_IO_TIMEOUT;
1620 ksocknal_data.ksnd_eager_ack = SOCKNAL_EAGER_ACK;
1621 ksocknal_data.ksnd_typed_conns = SOCKNAL_TYPED_CONNS;
1622 ksocknal_data.ksnd_min_bulk = SOCKNAL_MIN_BULK;
1624 ksocknal_data.ksnd_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
1626 ksocknal_init_incarnation();
1628 ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
1629 PORTAL_ALLOC (ksocknal_data.ksnd_peers,
1630 sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
1631 if (ksocknal_data.ksnd_peers == NULL)
1634 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
1635 INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
1637 rwlock_init(&ksocknal_data.ksnd_global_lock);
1639 ksocknal_data.ksnd_nal_cb = &ksocknal_lib;
1640 spin_lock_init (&ksocknal_data.ksnd_nal_cb_lock);
1641 init_waitqueue_head(&ksocknal_data.ksnd_yield_waitq);
1643 spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
1644 INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
1645 INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
1646 ksocknal_data.ksnd_small_fmp.fmp_buff_pages = SOCKNAL_SMALL_FWD_PAGES;
1648 spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
1649 INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
1650 INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
1651 ksocknal_data.ksnd_large_fmp.fmp_buff_pages = SOCKNAL_LARGE_FWD_PAGES;
1653 spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
1654 INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
1655 INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
1656 INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
1657 init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
1659 spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
1660 INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
1661 init_waitqueue_head(&ksocknal_data.ksnd_autoconnectd_waitq);
1663 /* NB memset above zeros whole of ksocknal_data, including
1664 * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
1666 /* flag lists/ptrs/locks initialised */
1667 ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
1669 PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
1670 sizeof(ksock_sched_t) * SOCKNAL_N_SCHED);
1671 if (ksocknal_data.ksnd_schedulers == NULL) {
1672 ksocknal_module_fini ();
1676 for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1677 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
1679 spin_lock_init (&kss->kss_lock);
1680 INIT_LIST_HEAD (&kss->kss_rx_conns);
1681 INIT_LIST_HEAD (&kss->kss_tx_conns);
1683 INIT_LIST_HEAD (&kss->kss_zctxdone_list);
1685 init_waitqueue_head (&kss->kss_waitq);
1688 rc = PtlNIInit(ksocknal_init, 32, 4, 0, &ksocknal_ni);
1690 CERROR("ksocknal: PtlNIInit failed: error %d\n", rc);
1691 ksocknal_module_fini ();
1694 PtlNIDebug(ksocknal_ni, ~0);
1696 ksocknal_data.ksnd_init = SOCKNAL_INIT_PTL; // flag PtlNIInit() called
1698 for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1699 rc = ksocknal_thread_start (ksocknal_scheduler,
1700 &ksocknal_data.ksnd_schedulers[i]);
1702 CERROR("Can't spawn socknal scheduler[%d]: %d\n",
1704 ksocknal_module_fini ();
1709 for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
1710 rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
1712 CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
1713 ksocknal_module_fini ();
1718 rc = ksocknal_thread_start (ksocknal_reaper, NULL);
1720 CERROR ("Can't spawn socknal reaper: %d\n", rc);
1721 ksocknal_module_fini ();
1725 rc = kpr_register(&ksocknal_data.ksnd_router,
1726 &ksocknal_router_interface);
1728 CDEBUG(D_NET, "Can't initialise routing interface "
1729 "(rc = %d): not routing\n", rc);
1731 /* Only allocate forwarding buffers if I'm on a gateway */
1733 for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
1734 SOCKNAL_LARGE_FWD_NMSGS); i++) {
1736 ksock_fmb_pool_t *pool;
1739 if (i < SOCKNAL_SMALL_FWD_NMSGS)
1740 pool = &ksocknal_data.ksnd_small_fmp;
1742 pool = &ksocknal_data.ksnd_large_fmp;
1744 PORTAL_ALLOC(fmb, offsetof(ksock_fmb_t,
1745 fmb_kiov[pool->fmp_buff_pages]));
1747 ksocknal_module_fini();
1751 fmb->fmb_pool = pool;
1753 for (j = 0; j < pool->fmp_buff_pages; j++) {
1754 fmb->fmb_kiov[j].kiov_page = alloc_page(GFP_KERNEL);
1756 if (fmb->fmb_kiov[j].kiov_page == NULL) {
1757 ksocknal_module_fini ();
1761 LASSERT(page_address(fmb->fmb_kiov[j].kiov_page) != NULL);
1764 list_add(&fmb->fmb_list, &pool->fmp_idle_fmbs);
1768 rc = kportal_nal_register(SOCKNAL, &ksocknal_cmd, NULL);
1770 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
1771 ksocknal_module_fini ();
1775 PORTAL_SYMBOL_REGISTER(ksocknal_ni);
1777 #ifdef CONFIG_SYSCTL
1778 /* Press on regardless even if registering sysctl doesn't work */
1779 ksocknal_data.ksnd_sysctl = register_sysctl_table (ksocknal_top_ctl_table, 0);
1781 /* flag everything initialised */
1782 ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
1784 printk(KERN_INFO "Lustre: Routing socket NAL loaded "
1785 "(Routing %s, initial mem %d, incarnation "LPX64")\n",
1786 kpr_routing (&ksocknal_data.ksnd_router) ?
1787 "enabled" : "disabled", pkmem, ksocknal_data.ksnd_incarnation);
1792 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1793 MODULE_DESCRIPTION("Kernel TCP Socket NAL v0.01");
1794 MODULE_LICENSE("GPL");
1796 module_init(ksocknal_module_init);
1797 module_exit(ksocknal_module_fini);
1799 EXPORT_SYMBOL (ksocknal_ni);