1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5 * Author: Zach Brown <zab@zabbo.net>
6 * Author: Peter J. Braam <braam@clusterfs.com>
7 * Author: Phil Schwan <phil@clusterfs.com>
8 * Author: Eric Barton <eric@bartonsoftware.com>
10 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
12 * Portals is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Portals is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Portals; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
/* Global socknal state: the Portals network-interface handle and the
 * NAL API object registered with the Portals library. */
28 ptl_handle_ni_t ksocknal_ni;
29 static nal_t ksocknal_api;
/* ksnd_data is non-static on 2.5+ kernels, static otherwise
 * (presumably for symbol-export differences — TODO confirm). */
30 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
31 ksock_nal_data_t ksocknal_data;
33 static ksock_nal_data_t ksocknal_data;
/* Interface table handed to the kernel portals router (kpr): the
 * forwarding and gateway-notification entry points of this NAL. */
36 kpr_nal_interface_t ksocknal_router_interface = {
38 kprni_arg: &ksocknal_data,
39 kprni_fwd: ksocknal_fwd_packet,
40 kprni_notify: ksocknal_notify,
/* Binary sysctl ids: SOCKNAL_SYSCTL is the top-level directory id,
 * the rest are per-knob ids used in ksocknal_ctl_table below. */
43 #define SOCKNAL_SYSCTL 200
45 #define SOCKNAL_SYSCTL_TIMEOUT 1
46 #define SOCKNAL_SYSCTL_EAGER_ACK 2
47 #define SOCKNAL_SYSCTL_ZERO_COPY 3
48 #define SOCKNAL_SYSCTL_TYPED 4
49 #define SOCKNAL_SYSCTL_MIN_BULK 5
/* /proc/sys tuning knobs for the socknal; each entry maps a sysctl
 * name to an int field of ksocknal_data, writable by root (0644). */
51 static ctl_table ksocknal_ctl_table[] = {
52 {SOCKNAL_SYSCTL_TIMEOUT, "timeout",
53 &ksocknal_data.ksnd_io_timeout, sizeof (int),
54 0644, NULL, &proc_dointvec},
55 {SOCKNAL_SYSCTL_EAGER_ACK, "eager_ack",
56 &ksocknal_data.ksnd_eager_ack, sizeof (int),
57 0644, NULL, &proc_dointvec},
/* BUG FIX: this entry previously reused SOCKNAL_SYSCTL_EAGER_ACK as
 * its binary sysctl id, colliding with "eager_ack" above; "zero_copy"
 * must use its own id, SOCKNAL_SYSCTL_ZERO_COPY. */
59 {SOCKNAL_SYSCTL_ZERO_COPY, "zero_copy",
60 &ksocknal_data.ksnd_zc_min_frag, sizeof (int),
61 0644, NULL, &proc_dointvec},
63 {SOCKNAL_SYSCTL_TYPED, "typed",
64 &ksocknal_data.ksnd_typed_conns, sizeof (int),
65 0644, NULL, &proc_dointvec},
66 {SOCKNAL_SYSCTL_MIN_BULK, "min_bulk",
67 &ksocknal_data.ksnd_min_bulk, sizeof (int),
68 0644, NULL, &proc_dointvec},
/* Top-level sysctl directory "socknal" (read-only dir, mode 0555)
 * containing the knobs in ksocknal_ctl_table. */
72 static ctl_table ksocknal_top_ctl_table[] = {
73 {SOCKNAL_SYSCTL, "socknal", NULL, 0, 0555, ksocknal_ctl_table},
/* NAL API: forward a library call into lib_dispatch() against this
 * NAL's nal_cb.  'k' (ksocknal_data) is passed as the private arg
 * because ksocknal_send needs it. */
78 ksocknal_api_forward(nal_t *nal, int id, void *args, size_t args_len,
79 void *ret, size_t ret_len)
85 nal_cb = k->ksnd_nal_cb;
87 lib_dispatch(nal_cb, k, id, args, ret); /* ksocknal_send needs k */
/* NAL API: shut the interface down by deleting every autoconnect
 * route and closing every connection (wildcard NID). */
92 ksocknal_api_shutdown(nal_t *nal, int ni)
94 CDEBUG (D_NET, "closing all connections\n");
96 ksocknal_del_route (PTL_NID_ANY, 0, 0, 0);
97 ksocknal_close_matching_conns (PTL_NID_ANY, 0);
/* NAL API: yield the processor (body not visible here). */
102 ksocknal_api_yield(nal_t *nal)
/* NAL API: enter the library critical section via the nal_cb
 * cb_cli callback. */
109 ksocknal_api_lock(nal_t *nal, unsigned long *flags)
115 nal_cb = k->ksnd_nal_cb;
116 nal_cb->cb_cli(nal_cb,flags);
/* NAL API: leave the library critical section via cb_sti. */
120 ksocknal_api_unlock(nal_t *nal, unsigned long *flags)
126 nal_cb = k->ksnd_nal_cb;
127 nal_cb->cb_sti(nal_cb,flags);
/* NAL bootstrap: initialise the portals library for this NAL with a
 * placeholder NID of 0 (the real NID is set later by
 * ksocknal_set_mynid) and return the API object. */
131 ksocknal_init(int interface, ptl_pt_index_t ptl_size,
132 ptl_ac_index_t ac_size, ptl_pid_t requested_pid)
134 CDEBUG(D_NET, "calling lib_init with nid "LPX64"\n", (ptl_nid_t)0);
135 lib_init(&ksocknal_lib, (ptl_nid_t)0, 0, 10, ptl_size, ac_size);
136 return (&ksocknal_api);
140 * EXTRA functions follow
/* Patch this node's NID into the already-initialised library NI.
 * Needed because lib_init() ran at module load, before 'mynid' was
 * known (see FIXME below). */
144 ksocknal_set_mynid(ptl_nid_t nid)
146 lib_ni_t *ni = &ksocknal_lib.ni;
148 /* FIXME: we have to do this because we call lib_init() at module
149 * insertion time, which is before we have 'mynid' available. lib_init
150 * sets the NAL's nid, which it uses to tell other nodes where packets
151 * are coming from. This is not a very graceful solution to this
154 CDEBUG(D_IOCTL, "setting mynid to "LPX64" (old nid="LPX64")\n",
/* Bind a NIC's hardware IRQ to the CPU of its chosen scheduler, at
 * most once per IRQ.  Only active on SMP builds with CPU_AFFINITY;
 * irq==0 means a software NIC and is skipped. */
162 ksocknal_bind_irq (unsigned int irq)
164 #if (defined(CONFIG_SMP) && CPU_AFFINITY)
168 ksock_irqinfo_t *info;
169 char *argv[] = {"/bin/sh",
173 char *envp[] = {"HOME=/",
174 "PATH=/sbin:/bin:/usr/sbin:/usr/bin",
177 LASSERT (irq < NR_IRQS);
178 if (irq == 0) /* software NIC */
181 info = &ksocknal_data.ksnd_irqinfo[irq];
/* take the global lock to test-and-set ksni_bound atomically */
183 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
185 LASSERT (info->ksni_valid);
186 bind = !info->ksni_bound;
187 info->ksni_bound = 1;
189 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
191 if (!bind) /* bound already */
/* set affinity by writing a CPU mask into /proc/irq/<n>/smp_affinity
 * from a usermode shell — see FIXME below */
194 snprintf (cmdline, sizeof (cmdline),
195 "echo %d > /proc/irq/%u/smp_affinity", 1 << info->ksni_sched, irq);
197 printk (KERN_INFO "Lustre: Binding irq %u to CPU %d with cmd: %s\n",
198 irq, info->ksni_sched, cmdline);
200 /* FIXME: Find a better method of setting IRQ affinity...
203 call_usermodehelper (argv[0], argv, envp);
/* Allocate and initialise an autoconnect route to ipaddr:port.
 * Returns the route with 1 ref for the caller; it is not yet
 * attached to any peer (ksnr_peer == NULL). */
208 ksocknal_create_route (__u32 ipaddr, int port, int buffer_size,
209 int nonagel, int irq_affinity, int eager)
211 ksock_route_t *route;
213 PORTAL_ALLOC (route, sizeof (*route));
217 atomic_set (&route->ksnr_refcount, 1);
218 route->ksnr_sharecount = 0;
219 route->ksnr_peer = NULL;
/* eligible to connect immediately; back-off starts at the minimum */
220 route->ksnr_timeout = jiffies;
221 route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
222 route->ksnr_ipaddr = ipaddr;
223 route->ksnr_port = port;
224 route->ksnr_buffer_size = buffer_size;
225 route->ksnr_irq_affinity = irq_affinity;
226 route->ksnr_nonagel = nonagel;
227 route->ksnr_eager = eager;
228 route->ksnr_connecting = 0;
229 route->ksnr_connected = 0;
230 route->ksnr_deleted = 0;
231 route->ksnr_conn_count = 0;
/* Free a route whose refcount has hit zero, dropping its ref on the
 * peer if it was ever attached to one. */
237 ksocknal_destroy_route (ksock_route_t *route)
239 LASSERT (route->ksnr_sharecount == 0);
241 if (route->ksnr_peer != NULL)
242 ksocknal_put_peer (route->ksnr_peer);
244 PORTAL_FREE (route, sizeof (*route));
/* Drop one reference on a route; destroy it on the last put. */
248 ksocknal_put_route (ksock_route_t *route)
250 CDEBUG (D_OTHER, "putting route[%p] (%d)\n",
251 route, atomic_read (&route->ksnr_refcount));
253 LASSERT (atomic_read (&route->ksnr_refcount) > 0);
254 if (!atomic_dec_and_test (&route->ksnr_refcount))
257 ksocknal_destroy_route (route);
/* Allocate and initialise a peer for 'nid' with 1 ref for the caller.
 * Also bumps ksnd_npeers, which pins the module: it cannot unload
 * while any peer exists. */
261 ksocknal_create_peer (ptl_nid_t nid)
265 LASSERT (nid != PTL_NID_ANY);
267 PORTAL_ALLOC (peer, sizeof (*peer));
271 memset (peer, 0, sizeof (*peer));
273 peer->ksnp_nid = nid;
274 atomic_set (&peer->ksnp_refcount, 1); /* 1 ref for caller */
275 peer->ksnp_closing = 0;
276 INIT_LIST_HEAD (&peer->ksnp_conns);
277 INIT_LIST_HEAD (&peer->ksnp_routes);
278 INIT_LIST_HEAD (&peer->ksnp_tx_queue);
280 /* Can't unload while peers exist; ensures all I/O has terminated
281 * before unload attempts */
283 atomic_inc (&ksocknal_data.ksnd_npeers);
/* Free a peer whose refcount has hit zero; by then its conn, route
 * and tx lists must all be empty. */
288 ksocknal_destroy_peer (ksock_peer_t *peer)
290 CDEBUG (D_NET, "peer "LPX64" %p deleted\n", peer->ksnp_nid, peer);
292 LASSERT (atomic_read (&peer->ksnp_refcount) == 0);
293 LASSERT (list_empty (&peer->ksnp_conns));
294 LASSERT (list_empty (&peer->ksnp_routes));
295 LASSERT (list_empty (&peer->ksnp_tx_queue));
297 PORTAL_FREE (peer, sizeof (*peer));
299 /* NB a peer's connections and autoconnect routes keep a reference
300 * on their peer until they are destroyed, so we can be assured
301 * that _all_ state to do with this peer has been cleaned up when
302 * its refcount drops to zero. */
303 atomic_dec (&ksocknal_data.ksnd_npeers);
/* Drop one reference on a peer; destroy it on the last put. */
308 ksocknal_put_peer (ksock_peer_t *peer)
310 CDEBUG (D_OTHER, "putting peer[%p] -> "LPX64" (%d)\n",
311 peer, peer->ksnp_nid,
312 atomic_read (&peer->ksnp_refcount));
314 LASSERT (atomic_read (&peer->ksnp_refcount) > 0);
315 if (!atomic_dec_and_test (&peer->ksnp_refcount))
318 ksocknal_destroy_peer (peer);
/* Look up a peer by NID in its hash chain.  Caller must hold
 * ksnd_global_lock; no ref is taken for the caller. */
322 ksocknal_find_peer_locked (ptl_nid_t nid)
324 struct list_head *peer_list = ksocknal_nid2peerlist (nid);
325 struct list_head *tmp;
328 list_for_each (tmp, peer_list) {
330 peer = list_entry (tmp, ksock_peer_t, ksnp_list);
/* peers on the hash are live: not closing, and holding at least
 * one route or conn */
332 LASSERT (!peer->ksnp_closing);
333 LASSERT (!(list_empty (&peer->ksnp_routes) &&
334 list_empty (&peer->ksnp_conns)));
336 if (peer->ksnp_nid != nid)
339 CDEBUG(D_NET, "got peer [%p] -> "LPX64" (%d)\n",
340 peer, nid, atomic_read (&peer->ksnp_refcount));
/* Locked wrapper around ksocknal_find_peer_locked() that takes a
 * reference for the caller (caller must ksocknal_put_peer). */
347 ksocknal_get_peer (ptl_nid_t nid)
351 read_lock (&ksocknal_data.ksnd_global_lock);
352 peer = ksocknal_find_peer_locked (nid);
353 if (peer != NULL) /* +1 ref for caller */
354 atomic_inc (&peer->ksnp_refcount);
355 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Remove a peer from the hash table and drop the table's ref.
 * Caller must hold ksnd_global_lock exclusively. */
361 ksocknal_unlink_peer_locked (ksock_peer_t *peer)
363 LASSERT (!peer->ksnp_closing);
364 peer->ksnp_closing = 1;
365 list_del (&peer->ksnp_list);
366 /* lose peerlist's ref */
367 ksocknal_put_peer (peer);
/* Walk every peer's route list and return the index'th route overall
 * (used to enumerate routes from the NAL_CMD_GET_AUTOCONN ioctl).
 * The returned route carries +1 ref for the caller. */
371 ksocknal_get_route_by_idx (int index)
374 struct list_head *ptmp;
375 ksock_route_t *route;
376 struct list_head *rtmp;
379 read_lock (&ksocknal_data.ksnd_global_lock);
381 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
382 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
383 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
385 LASSERT (!(list_empty (&peer->ksnp_routes) &&
386 list_empty (&peer->ksnp_conns)));
388 list_for_each (rtmp, &peer->ksnp_routes) {
/* found it: take a ref and return under no lock */
392 route = list_entry (rtmp, ksock_route_t, ksnr_list);
393 atomic_inc (&route->ksnr_refcount);
394 read_unlock (&ksocknal_data.ksnd_global_lock);
400 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Add an autoconnect route nid -> ipaddr:port.  Creates peer and
 * route optimistically before taking the lock, then reuses any
 * existing peer/route found under the lock; sharecount tracks how
 * many identical additions have been made. */
405 ksocknal_add_route (ptl_nid_t nid, __u32 ipaddr, int port, int bufnob,
406 int nonagle, int bind_irq, int share, int eager)
411 ksock_route_t *route;
412 struct list_head *rtmp;
413 ksock_route_t *route2;
415 if (nid == PTL_NID_ANY)
418 /* Have a brand new peer ready... */
419 peer = ksocknal_create_peer (nid);
423 route = ksocknal_create_route (ipaddr, port, bufnob,
424 nonagle, bind_irq, eager);
/* route creation failed: drop the speculative peer */
426 ksocknal_put_peer (peer);
430 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
432 peer2 = ksocknal_find_peer_locked (nid);
/* peer already existed: discard the new one and use peer2 */
434 ksocknal_put_peer (peer);
437 /* peer table takes existing ref on peer */
438 list_add (&peer->ksnp_list,
439 ksocknal_nid2peerlist (nid));
444 /* check for existing route to this NID via this ipaddr */
445 list_for_each (rtmp, &peer->ksnp_routes) {
446 route2 = list_entry (rtmp, ksock_route_t, ksnr_list);
448 if (route2->ksnr_ipaddr == ipaddr)
455 if (route2 != NULL) {
456 ksocknal_put_route (route);
459 /* route takes a ref on peer */
460 route->ksnr_peer = peer;
461 atomic_inc (&peer->ksnp_refcount);
462 /* peer's route list takes existing ref on route */
463 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
466 route->ksnr_sharecount++;
468 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
/* Delete one autoconnect route.  If 'share', just decrement the
 * sharecount and only really delete when it reaches zero; otherwise
 * force sharecount to 0.  Connections made via the route are closed
 * (or merely dissociated when keep_conn).  Caller must hold
 * ksnd_global_lock exclusively. */
474 ksocknal_del_route_locked (ksock_route_t *route, int share, int keep_conn)
476 ksock_peer_t *peer = route->ksnr_peer;
478 struct list_head *ctmp;
479 struct list_head *cnxt;
482 route->ksnr_sharecount = 0;
484 route->ksnr_sharecount--;
485 if (route->ksnr_sharecount != 0)
/* close (or detach) any conns established via this route */
489 list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
490 conn = list_entry(ctmp, ksock_conn_t, ksnc_list);
492 if (conn->ksnc_route != route)
496 ksocknal_close_conn_locked (conn, 0);
500 /* keeping the conn; just dissociate it and route... */
501 conn->ksnc_route = NULL;
502 ksocknal_put_route (route); /* drop conn's ref on route */
505 route->ksnr_deleted = 1;
506 list_del (&route->ksnr_list);
507 ksocknal_put_route (route); /* drop peer's ref */
509 if (list_empty (&peer->ksnp_routes) &&
510 list_empty (&peer->ksnp_conns)) {
511 /* I've just removed the last autoconnect route of a peer
512 * with no active connections */
513 ksocknal_unlink_peer_locked (peer);
/* Delete all routes matching (nid, ipaddr); PTL_NID_ANY / ipaddr 0
 * act as wildcards.  Returns 0 if anything matched (rc is set per
 * match; its initial value is not visible in this fragment). */
518 ksocknal_del_route (ptl_nid_t nid, __u32 ipaddr, int share, int keep_conn)
521 struct list_head *ptmp;
522 struct list_head *pnxt;
524 struct list_head *rtmp;
525 struct list_head *rnxt;
526 ksock_route_t *route;
532 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
/* restrict the scan to one hash chain when the NID is given */
534 if (nid != PTL_NID_ANY)
535 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
538 hi = ksocknal_data.ksnd_peer_hash_size - 1;
541 for (i = lo; i <= hi; i++) {
542 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
543 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
545 if (!(nid == PTL_NID_ANY || peer->ksnp_nid == nid))
548 list_for_each_safe (rtmp, rnxt, &peer->ksnp_routes) {
549 route = list_entry (rtmp, ksock_route_t,
553 route->ksnr_ipaddr == ipaddr))
556 ksocknal_del_route_locked (route, share, keep_conn);
557 rc = 0; /* matched something */
564 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
/* Walk every peer's conn list and return the index'th connection
 * overall (used by the NAL_CMD_GET_CONN ioctl).  The returned conn
 * carries +1 ref for the caller. */
570 ksocknal_get_conn_by_idx (int index)
573 struct list_head *ptmp;
575 struct list_head *ctmp;
578 read_lock (&ksocknal_data.ksnd_global_lock);
580 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
581 list_for_each (ptmp, &ksocknal_data.ksnd_peers[i]) {
582 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
584 LASSERT (!(list_empty (&peer->ksnp_routes) &&
585 list_empty (&peer->ksnp_conns)));
587 list_for_each (ctmp, &peer->ksnp_conns) {
/* found it: take a ref and return under no lock */
591 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
592 atomic_inc (&conn->ksnc_refcount);
593 read_unlock (&ksocknal_data.ksnd_global_lock);
599 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Record the remote IP/port of a freshly-connected socket into the
 * conn, via the socket's getname (peer = 2nd arg == 2). */
604 ksocknal_get_peer_addr (ksock_conn_t *conn)
606 struct sockaddr_in sin;
607 int len = sizeof (sin);
610 rc = conn->ksnc_sock->ops->getname (conn->ksnc_sock,
611 (struct sockaddr *)&sin, &len, 2);
612 /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
613 LASSERT (!conn->ksnc_closing);
614 LASSERT (len <= sizeof (sin));
617 CERROR ("Error %d getting sock peer IP\n", rc);
/* store in host byte order */
621 conn->ksnc_ipaddr = ntohl (sin.sin_addr.s_addr);
622 conn->ksnc_port = ntohs (sin.sin_port);
/* Determine the IRQ of the NIC this conn's socket routes through,
 * by inspecting the socket's cached dst entry. */
626 ksocknal_conn_irq (ksock_conn_t *conn)
629 struct dst_entry *dst;
631 dst = sk_dst_get (conn->ksnc_sock->sk);
633 if (dst->dev != NULL) {
635 if (irq >= NR_IRQS) {
636 CERROR ("Unexpected IRQ %x\n", irq);
643 /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
644 LASSERT (!conn->ksnc_closing);
/* Pick the scheduler for a new conn.  A hardware IRQ that already
 * has a scheduler keeps it; otherwise choose the scheduler with the
 * fewest conns and (for hardware IRQs) remember the choice in
 * ksnd_irqinfo.  Caller must hold ksnd_global_lock exclusively. */
649 ksocknal_choose_scheduler_locked (unsigned int irq)
651 ksock_sched_t *sched;
652 ksock_irqinfo_t *info;
655 LASSERT (irq < NR_IRQS);
656 info = &ksocknal_data.ksnd_irqinfo[irq];
658 if (irq != 0 && /* hardware NIC */
659 info->ksni_valid) { /* already set up */
660 return (&ksocknal_data.ksnd_schedulers[info->ksni_sched]);
663 /* software NIC (irq == 0) || not associated with a scheduler yet.
664 * Choose the CPU with the fewest connections... */
665 sched = &ksocknal_data.ksnd_schedulers[0];
666 for (i = 1; i < SOCKNAL_N_SCHED; i++)
667 if (sched->kss_nconns >
668 ksocknal_data.ksnd_schedulers[i].kss_nconns)
669 sched = &ksocknal_data.ksnd_schedulers[i];
671 if (irq != 0) { /* Hardware NIC */
672 info->ksni_valid = 1;
673 info->ksni_sched = sched - ksocknal_data.ksnd_schedulers;
/* sanity: sticky choice must be self-consistent */
676 LASSERT (info->ksni_sched == sched - ksocknal_data.ksnd_schedulers);
/* Establish a new connection on 'sock'.  route != NULL means an
 * autoconnect (expected NID comes from the route); route == NULL
 * means accepted/explicit (NID learned in the hello exchange).
 * Sets up the conn, attaches it to its peer (creating one if
 * needed), hooks the socket callbacks, drains the peer's blocked tx
 * queue onto the new conn and closes any stale conns. */
683 ksocknal_create_conn (ksock_route_t *route, struct socket *sock,
684 int bind_irq, int type)
692 ksock_sched_t *sched;
697 /* NB, sock has an associated file since (a) this connection might
698 * have been created in userland and (b) we need to refcount the
699 * socket so that we don't close it while I/O is being done on
700 * it, and sock->file has that pre-cooked... */
701 LASSERT (sock->file != NULL);
702 LASSERT (file_count(sock->file) > 0);
704 rc = ksocknal_setup_sock (sock);
709 /* acceptor or explicit connect */
712 LASSERT (type != SOCKNAL_CONN_NONE);
713 /* autoconnect: expect this nid on exchange */
714 nid = route->ksnr_peer->ksnp_nid;
/* exchange hellos: agrees nid/type and learns peer incarnation */
717 rc = ksocknal_hello (sock, &nid, &type, &incarnation);
722 if (route == NULL) { /* not autoconnect */
723 /* Assume this socket connects to a brand new peer */
724 peer = ksocknal_create_peer (nid);
729 PORTAL_ALLOC(conn, sizeof(*conn));
/* conn allocation failed: drop the speculative peer */
732 ksocknal_put_peer (peer);
736 memset (conn, 0, sizeof (*conn));
737 conn->ksnc_peer = NULL;
738 conn->ksnc_route = NULL;
739 conn->ksnc_sock = sock;
740 conn->ksnc_type = type;
741 conn->ksnc_incarnation = incarnation;
/* save the socket's callbacks so they can be restored at close */
742 conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
743 conn->ksnc_saved_write_space = sock->sk->sk_write_space;
744 atomic_set (&conn->ksnc_refcount, 1); /* 1 ref for me */
746 conn->ksnc_rx_ready = 0;
747 conn->ksnc_rx_scheduled = 0;
748 ksocknal_new_packet (conn, 0);
750 INIT_LIST_HEAD (&conn->ksnc_tx_queue);
751 conn->ksnc_tx_ready = 0;
752 conn->ksnc_tx_scheduled = 0;
753 atomic_set (&conn->ksnc_tx_nob, 0);
755 ksocknal_get_peer_addr (conn);
757 irq = ksocknal_conn_irq (conn);
759 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
/* autoconnect path: this conn type must be pending, not yet up */
763 LASSERT ((route->ksnr_connected & (1 << type)) == 0);
764 LASSERT ((route->ksnr_connecting & (1 << type)) != 0);
766 if (route->ksnr_deleted) {
767 /* This conn was autoconnected, but the autoconnect
768 * route got deleted while it was being
770 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock,
772 PORTAL_FREE (conn, sizeof (*conn));
777 /* associate conn/route */
778 conn->ksnc_route = route;
779 atomic_inc (&route->ksnr_refcount);
781 route->ksnr_connecting &= ~(1 << type);
782 route->ksnr_connected |= (1 << type);
783 route->ksnr_conn_count++;
/* success: reset the reconnect back-off */
784 route->ksnr_retry_interval = SOCKNAL_MIN_RECONNECT_INTERVAL;
786 peer = route->ksnr_peer;
788 /* Not an autoconnected connection; see if there is an
789 * existing peer for this NID */
790 peer2 = ksocknal_find_peer_locked (nid);
792 ksocknal_put_peer (peer);
795 list_add (&peer->ksnp_list,
796 ksocknal_nid2peerlist (nid));
797 /* peer list takes over existing ref */
801 LASSERT (!peer->ksnp_closing);
803 conn->ksnc_peer = peer;
804 atomic_inc (&peer->ksnp_refcount);
805 peer->ksnp_last_alive = jiffies;
806 peer->ksnp_error = 0;
808 list_add (&conn->ksnc_list, &peer->ksnp_conns);
809 atomic_inc (&conn->ksnc_refcount);
811 sched = ksocknal_choose_scheduler_locked (irq);
813 conn->ksnc_scheduler = sched;
815 /* NB my callbacks block while I hold ksnd_global_lock */
816 sock->sk->sk_user_data = conn;
817 sock->sk->sk_data_ready = ksocknal_data_ready;
818 sock->sk->sk_write_space = ksocknal_write_space;
820 /* Take all the packets blocking for a connection.
821 * NB, it might be nicer to share these blocked packets among any
822 * other connections that are becoming established, however that
823 * confuses the normal packet launching operation, which selects a
824 * connection and queues the packet on it without needing an
825 * exclusive lock on ksnd_global_lock. */
826 while (!list_empty (&peer->ksnp_tx_queue)) {
827 tx = list_entry (peer->ksnp_tx_queue.next,
828 ksock_tx_t, tx_list);
830 list_del (&tx->tx_list);
831 ksocknal_queue_tx_locked (tx, conn);
/* drop conns belonging to an earlier incarnation of this peer */
834 rc = ksocknal_close_stale_conns_locked (peer, incarnation);
836 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
839 CERROR ("Closed %d stale conns to "LPX64"\n", rc, nid);
841 if (bind_irq) /* irq binding required */
842 ksocknal_bind_irq (irq);
844 /* Call the callbacks right now to get things going. */
845 ksocknal_data_ready (sock->sk, 0);
846 ksocknal_write_space (sock->sk);
848 CDEBUG(D_IOCTL, "conn [%p] registered for nid "LPX64"\n",
849 conn, conn->ksnc_peer->ksnp_nid);
/* drop my initial ref; the peer's conn list keeps its own */
851 ksocknal_put_conn (conn);
856 ksocknal_close_conn_locked (ksock_conn_t *conn, int error)
858 /* This just does the immediate housekeeping, and queues the
859 * connection for the reaper to terminate.
860 * Caller holds ksnd_global_lock exclusively in irq context */
861 ksock_peer_t *peer = conn->ksnc_peer;
862 ksock_route_t *route;
864 LASSERT (peer->ksnp_error == 0);
865 LASSERT (!conn->ksnc_closing);
866 conn->ksnc_closing = 1;
867 atomic_inc (&ksocknal_data.ksnd_nclosing_conns);
869 route = conn->ksnc_route;
871 /* dissociate conn from route... */
872 LASSERT (!route->ksnr_deleted);
873 LASSERT ((route->ksnr_connecting & (1 << conn->ksnc_type)) == 0);
874 LASSERT ((route->ksnr_connected & (1 << conn->ksnc_type)) != 0);
876 route->ksnr_connected &= ~(1 << conn->ksnc_type);
877 conn->ksnc_route = NULL;
879 list_del (&route->ksnr_list); /* make route least favourite */
880 list_add_tail (&route->ksnr_list, &peer->ksnp_routes);
882 ksocknal_put_route (route); /* drop conn's ref on route */
885 /* ksnd_deathrow_conns takes over peer's ref */
886 list_del (&conn->ksnc_list);
888 if (list_empty (&peer->ksnp_conns)) {
889 /* No more connections to this peer */
891 peer->ksnp_error = error; /* stash last conn close reason */
893 if (list_empty (&peer->ksnp_routes)) {
894 /* I've just closed last conn belonging to a
895 * non-autoconnecting peer */
896 ksocknal_unlink_peer_locked (peer);
/* hand the conn to the reaper for actual termination */
900 spin_lock (&ksocknal_data.ksnd_reaper_lock);
902 list_add_tail (&conn->ksnc_list, &ksocknal_data.ksnd_deathrow_conns);
903 wake_up (&ksocknal_data.ksnd_reaper_waitq);
905 spin_unlock (&ksocknal_data.ksnd_reaper_lock);
909 ksocknal_terminate_conn (ksock_conn_t *conn)
911 /* This gets called by the reaper (guaranteed thread context) to
912 * disengage the socket from its callbacks and close it.
913 * ksnc_refcount will eventually hit zero, and then the reaper will
916 ksock_peer_t *peer = conn->ksnc_peer;
917 ksock_sched_t *sched = conn->ksnc_scheduler;
922 LASSERT(conn->ksnc_closing);
924 /* wake up the scheduler to "send" all remaining packets to /dev/null */
925 spin_lock_irqsave(&sched->kss_lock, flags);
927 if (!conn->ksnc_tx_scheduled &&
928 !list_empty(&conn->ksnc_tx_queue)){
929 list_add_tail (&conn->ksnc_tx_list,
930 &sched->kss_tx_conns);
931 /* a closing conn is always ready to tx */
932 conn->ksnc_tx_ready = 1;
933 conn->ksnc_tx_scheduled = 1;
934 /* extra ref for scheduler */
935 atomic_inc (&conn->ksnc_refcount);
937 wake_up (&sched->kss_waitq);
940 spin_unlock_irqrestore (&sched->kss_lock, flags);
942 /* serialise with callbacks */
943 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
945 /* Remove conn's network callbacks.
946 * NB I _have_ to restore the callback, rather than storing a noop,
947 * since the socket could survive past this module being unloaded!! */
948 conn->ksnc_sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
949 conn->ksnc_sock->sk->sk_write_space = conn->ksnc_saved_write_space;
951 /* A callback could be in progress already; they hold a read lock
952 * on ksnd_global_lock (to serialise with me) and NOOP if
953 * sk_user_data is NULL. */
954 conn->ksnc_sock->sk->sk_user_data = NULL;
956 /* OK, so this conn may not be completely disengaged from its
957 * scheduler yet, but it _has_ committed to terminate... */
958 conn->ksnc_scheduler->kss_nconns--;
960 if (peer->ksnp_error != 0) {
961 /* peer's last conn closed in error */
962 LASSERT (list_empty (&peer->ksnp_conns));
964 /* convert peer's last-known-alive timestamp from jiffies */
965 do_gettimeofday (&now);
966 then = now.tv_sec - (jiffies - peer->ksnp_last_alive)/HZ;
970 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
972 /* The socket is closed on the final put; either here, or in
973 * ksocknal_{send,recv}msg(). Since we set up the linger2 option
974 * when the connection was established, this will close the socket
975 * immediately, aborting anything buffered in it. Any hung
976 * zero-copy transmits will therefore complete in finite time. */
977 ksocknal_putconnsock (conn);
/* tell the router the gateway may be down */
980 kpr_notify (&ksocknal_data.ksnd_router, peer->ksnp_nid,
985 ksocknal_destroy_conn (ksock_conn_t *conn)
987 /* Final coup-de-grace of the reaper */
988 CDEBUG (D_NET, "connection %p\n", conn);
990 LASSERT (atomic_read (&conn->ksnc_refcount) == 0);
991 LASSERT (conn->ksnc_route == NULL);
992 LASSERT (!conn->ksnc_tx_scheduled);
993 LASSERT (!conn->ksnc_rx_scheduled);
994 LASSERT (list_empty(&conn->ksnc_tx_queue));
996 /* complete current receive if any */
997 switch (conn->ksnc_rx_state) {
998 case SOCKNAL_RX_BODY:
/* partially-received message body: cannot complete it cleanly */
1000 lib_finalize (&ksocknal_lib, NULL, conn->ksnc_cookie);
1002 CERROR ("Refusing to complete a partial receive from "
1003 LPX64", ip %08x\n", conn->ksnc_peer->ksnp_nid,
1005 CERROR ("This may hang communications and "
1006 "prevent modules from unloading\n");
1009 case SOCKNAL_RX_BODY_FWD:
/* in-flight forwarded body: abort the forwarding buffer */
1010 ksocknal_fmb_callback (conn->ksnc_cookie, -ECONNABORTED);
1012 case SOCKNAL_RX_HEADER:
1013 case SOCKNAL_RX_SLOP:
/* drop the conn's ref on its peer and free the conn itself */
1020 ksocknal_put_peer (conn->ksnc_peer);
1022 PORTAL_FREE (conn, sizeof (*conn));
1023 atomic_dec (&ksocknal_data.ksnd_nclosing_conns);
/* Drop one reference on a conn; on the last put, queue it on the
 * reaper's zombie list for destruction in thread context. */
1027 ksocknal_put_conn (ksock_conn_t *conn)
1029 unsigned long flags;
1031 CDEBUG (D_OTHER, "putting conn[%p] -> "LPX64" (%d)\n",
1032 conn, conn->ksnc_peer->ksnp_nid,
1033 atomic_read (&conn->ksnc_refcount));
1035 LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1036 if (!atomic_dec_and_test (&conn->ksnc_refcount))
1039 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
1041 list_add (&conn->ksnc_list, &ksocknal_data.ksnd_zombie_conns);
1042 wake_up (&ksocknal_data.ksnd_reaper_waitq);
1044 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
/* Close all of a peer's conns matching ipaddr (0 presumably acts as
 * a wildcard — the guard is not visible in this fragment).  Caller
 * holds ksnd_global_lock exclusively. */
1048 ksocknal_close_peer_conns_locked (ksock_peer_t *peer, __u32 ipaddr, int why)
1051 struct list_head *ctmp;
1052 struct list_head *cnxt;
1055 list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1056 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1059 conn->ksnc_ipaddr == ipaddr) {
1061 ksocknal_close_conn_locked (conn, why);
/* Close all of a peer's conns that do NOT belong to the given peer
 * incarnation (i.e. left over from before the peer rebooted).
 * Caller holds ksnd_global_lock exclusively. */
1069 ksocknal_close_stale_conns_locked (ksock_peer_t *peer, __u64 incarnation)
1072 struct list_head *ctmp;
1073 struct list_head *cnxt;
1076 list_for_each_safe (ctmp, cnxt, &peer->ksnp_conns) {
1077 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
1079 if (conn->ksnc_incarnation == incarnation)
1083 ksocknal_close_conn_locked (conn, -ESTALE);
/* Close this conn and all its peer's conns to the same IP address. */
1090 ksocknal_close_conn_and_siblings (ksock_conn_t *conn, int why)
1092 ksock_peer_t *peer = conn->ksnc_peer;
1093 __u32 ipaddr = conn->ksnc_ipaddr;
1094 unsigned long flags;
1097 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
1099 count = ksocknal_close_peer_conns_locked (peer, ipaddr, why);
1101 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
/* Close every conn matching (nid, ipaddr); PTL_NID_ANY / ipaddr 0
 * are wildcards and always "succeed"; a specific match that closes
 * nothing returns -ENOENT. */
1107 ksocknal_close_matching_conns (ptl_nid_t nid, __u32 ipaddr)
1109 unsigned long flags;
1111 struct list_head *ptmp;
1112 struct list_head *pnxt;
1118 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
/* restrict the scan to one hash chain when the NID is given */
1120 if (nid != PTL_NID_ANY)
1121 lo = hi = ksocknal_nid2peerlist(nid) - ksocknal_data.ksnd_peers;
1124 hi = ksocknal_data.ksnd_peer_hash_size - 1;
1127 for (i = lo; i <= hi; i++) {
1128 list_for_each_safe (ptmp, pnxt, &ksocknal_data.ksnd_peers[i]) {
1130 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
1132 if (!(nid == PTL_NID_ANY || nid == peer->ksnp_nid))
1135 count += ksocknal_close_peer_conns_locked (peer, ipaddr, 0);
1139 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
1141 /* wildcards always succeed */
1142 if (nid == PTL_NID_ANY || ipaddr == 0)
1145 return (count == 0 ? -ENOENT : 0);
/* Router callback: a gateway's liveness changed.  On death, close
 * all conns to it; on revival do nothing (autoconnect routes will
 * reconnect on demand). */
1149 ksocknal_notify (void *arg, ptl_nid_t gw_nid, int alive)
1151 /* The router is telling me she's been notified of a change in
1152 * gateway state.... */
1154 CDEBUG (D_NET, "gw "LPX64" %s\n", gw_nid, alive ? "up" : "down");
1157 /* If the gateway crashed, close all open connections... */
1158 ksocknal_close_matching_conns (gw_nid, 0);
1162 /* ...otherwise do nothing. We can only establish new connections
1163 * if we have autoconnect routes, and these connect on demand. */
/* Kernel-version compatibility shim: locate the TCP options struct
 * inside a struct sock (layout changed in 2.5). */
1166 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1167 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1169 return &(sk->tp_pinfo.af_tcp);
1172 struct tcp_opt *sock2tcp_opt(struct sock *sk)
1174 struct tcp_sock *s = (struct tcp_sock *)sk;
/* "Push" a conn: toggle TCP_NODELAY to flush anything Nagle is
 * holding, then restore the socket's original nonagle setting. */
1180 ksocknal_push_conn (ksock_conn_t *conn)
1189 rc = ksocknal_getconnsock (conn);
1190 if (rc != 0) /* being shut down */
1193 sk = conn->ksnc_sock->sk;
1194 tp = sock2tcp_opt(sk);
/* remember current Nagle state before forcing NODELAY */
1197 nonagle = tp->nonagle;
1204 rc = sk->sk_prot->setsockopt (sk, SOL_TCP, TCP_NODELAY,
1205 (char *)&val, sizeof (val));
/* put the original Nagle setting back */
1211 tp->nonagle = nonagle;
1214 ksocknal_putconnsock (conn);
/* Push every conn of a peer.  Re-scans from the top each pass
 * (index) because the conn list can change while unlocked. */
1218 ksocknal_push_peer (ksock_peer_t *peer)
1222 struct list_head *tmp;
1225 for (index = 0; ; index++) {
1226 read_lock (&ksocknal_data.ksnd_global_lock);
1231 list_for_each (tmp, &peer->ksnp_conns) {
1233 conn = list_entry (tmp, ksock_conn_t, ksnc_list);
1234 atomic_inc (&conn->ksnc_refcount);
1239 read_unlock (&ksocknal_data.ksnd_global_lock);
1244 ksocknal_push_conn (conn);
1245 ksocknal_put_conn (conn);
/* Push one peer's conns (specific nid) or every peer's conns
 * (PTL_NID_ANY), taking a peer ref across each unlocked push. */
1250 ksocknal_push (ptl_nid_t nid)
1253 struct list_head *tmp;
1259 if (nid != PTL_NID_ANY) {
1260 peer = ksocknal_get_peer (nid);
1264 ksocknal_push_peer (peer);
1265 ksocknal_put_peer (peer);
/* wildcard: walk the whole peer hash, re-scanning each chain (j)
 * because it may change while the lock is dropped */
1270 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1271 for (j = 0; ; j++) {
1272 read_lock (&ksocknal_data.ksnd_global_lock);
1277 list_for_each (tmp, &ksocknal_data.ksnd_peers[i]) {
1279 peer = list_entry(tmp, ksock_peer_t,
1281 atomic_inc (&peer->ksnp_refcount);
1286 read_unlock (&ksocknal_data.ksnd_global_lock);
1290 ksocknal_push_peer (peer);
1291 ksocknal_put_peer (peer);
/* ioctl-style command dispatcher for the portals configuration
 * interface: enumerate/add/delete autoconnect routes, enumerate and
 * register connections, close connections, set my NID, and push. */
1301 ksocknal_cmd(struct portals_cfg *pcfg, void * private)
1305 LASSERT (pcfg != NULL);
1307 switch(pcfg->pcfg_command) {
1308 case NAL_CMD_GET_AUTOCONN: {
1309 ksock_route_t *route = ksocknal_get_route_by_idx (pcfg->pcfg_count);
/* report the route's parameters back through pcfg */
1315 pcfg->pcfg_nid = route->ksnr_peer->ksnp_nid;
1316 pcfg->pcfg_id = route->ksnr_ipaddr;
1317 pcfg->pcfg_misc = route->ksnr_port;
1318 pcfg->pcfg_count = route->ksnr_conn_count;
1319 pcfg->pcfg_size = route->ksnr_buffer_size;
1320 pcfg->pcfg_wait = route->ksnr_sharecount;
1321 pcfg->pcfg_flags = (route->ksnr_nonagel ? 1 : 0) |
1322 (route->ksnr_irq_affinity ? 2 : 0) |
1323 (route->ksnr_eager ? 4 : 0);
1324 ksocknal_put_route (route);
1328 case NAL_CMD_ADD_AUTOCONN: {
/* flags bits: 1=nonagle, 2=bind_irq, 4=share, 8=eager */
1329 rc = ksocknal_add_route (pcfg->pcfg_nid, pcfg->pcfg_id,
1330 pcfg->pcfg_misc, pcfg->pcfg_size,
1331 (pcfg->pcfg_flags & 0x01) != 0,
1332 (pcfg->pcfg_flags & 0x02) != 0,
1333 (pcfg->pcfg_flags & 0x04) != 0,
1334 (pcfg->pcfg_flags & 0x08) != 0);
1337 case NAL_CMD_DEL_AUTOCONN: {
/* flags bits: 1=share, 2=keep_conn */
1338 rc = ksocknal_del_route (pcfg->pcfg_nid, pcfg->pcfg_id,
1339 (pcfg->pcfg_flags & 1) != 0,
1340 (pcfg->pcfg_flags & 2) != 0);
1343 case NAL_CMD_GET_CONN: {
1344 ksock_conn_t *conn = ksocknal_get_conn_by_idx (pcfg->pcfg_count);
1350 pcfg->pcfg_nid = conn->ksnc_peer->ksnp_nid;
1351 pcfg->pcfg_id = conn->ksnc_ipaddr;
1352 pcfg->pcfg_misc = conn->ksnc_port;
1353 pcfg->pcfg_flags = conn->ksnc_type;
1354 ksocknal_put_conn (conn);
1358 case NAL_CMD_REGISTER_PEER_FD: {
/* adopt a userland-created socket fd as a connection */
1359 struct socket *sock = sockfd_lookup (pcfg->pcfg_fd, &rc);
1360 int type = pcfg->pcfg_misc;
1366 case SOCKNAL_CONN_NONE:
1367 case SOCKNAL_CONN_ANY:
1368 case SOCKNAL_CONN_CONTROL:
1369 case SOCKNAL_CONN_BULK_IN:
1370 case SOCKNAL_CONN_BULK_OUT:
1371 rc = ksocknal_create_conn(NULL, sock, pcfg->pcfg_flags, type);
1379 case NAL_CMD_CLOSE_CONNECTION: {
1380 rc = ksocknal_close_matching_conns (pcfg->pcfg_nid,
1384 case NAL_CMD_REGISTER_MYNID: {
1385 rc = ksocknal_set_mynid (pcfg->pcfg_nid);
1388 case NAL_CMD_PUSH_CONNECTION: {
1389 rc = ksocknal_push (pcfg->pcfg_nid);
/* Free every idle forwarding message buffer (fmb) in a pool, page by
 * page.  Pool must be quiescent: no blocked conns, no active fmbs. */
1398 ksocknal_free_fmbs (ksock_fmb_pool_t *p)
1403 LASSERT (list_empty(&p->fmp_blocked_conns));
1404 LASSERT (p->fmp_nactive_fmbs == 0);
1406 while (!list_empty(&p->fmp_idle_fmbs)) {
1408 fmb = list_entry(p->fmp_idle_fmbs.next,
1409 ksock_fmb_t, fmb_list);
1411 for (i = 0; i < fmb->fmb_npages; i++)
1412 if (fmb->fmb_pages[i] != NULL)
1413 __free_page(fmb->fmb_pages[i]);
1415 list_del(&fmb->fmb_list);
1416 PORTAL_FREE(fmb, sizeof(*fmb));
/* Free all remaining module-owned buffers at shutdown: both fmb
 * pools, the scheduler array and the peer hash table. */
1421 ksocknal_free_buffers (void)
1423 ksocknal_free_fmbs(&ksocknal_data.ksnd_small_fmp);
1424 ksocknal_free_fmbs(&ksocknal_data.ksnd_large_fmp);
1426 LASSERT (atomic_read(&ksocknal_data.ksnd_nactive_ltxs) == 0);
1428 if (ksocknal_data.ksnd_schedulers != NULL)
1429 PORTAL_FREE (ksocknal_data.ksnd_schedulers,
1430 sizeof (ksock_sched_t) * SOCKNAL_N_SCHED);
1432 PORTAL_FREE (ksocknal_data.ksnd_peers,
1433 sizeof (struct list_head) *
1434 ksocknal_data.ksnd_peer_hash_size);
/* Module unload: tear down in reverse order of initialisation.
 * The switch falls through from the furthest-reached init stage
 * (ksnd_init) down to SOCKNAL_INIT_NOTHING. */
1438 ksocknal_module_fini (void)
1442 CDEBUG(D_MALLOC, "before NAL cleanup: kmem %d\n",
1443 atomic_read (&portal_kmemory));
1445 switch (ksocknal_data.ksnd_init) {
1449 case SOCKNAL_INIT_ALL:
1451 if (ksocknal_data.ksnd_sysctl != NULL)
1452 unregister_sysctl_table (ksocknal_data.ksnd_sysctl)
1454 kportal_nal_unregister(SOCKNAL);
1455 PORTAL_SYMBOL_UNREGISTER (ksocknal_ni);
1458 case SOCKNAL_INIT_PTL:
1459 PtlNIFini(ksocknal_ni);
1460 lib_fini(&ksocknal_lib);
1463 case SOCKNAL_INIT_DATA:
1464 /* Module refcount only gets to zero when all peers
1465 * have been closed so all lists must be empty */
1466 LASSERT (atomic_read (&ksocknal_data.ksnd_npeers) == 0);
1467 LASSERT (ksocknal_data.ksnd_peers != NULL);
1468 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++) {
1469 LASSERT (list_empty (&ksocknal_data.ksnd_peers[i]));
1471 LASSERT (list_empty (&ksocknal_data.ksnd_enomem_conns));
1472 LASSERT (list_empty (&ksocknal_data.ksnd_zombie_conns));
1473 LASSERT (list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
1474 LASSERT (list_empty (&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns));
1475 LASSERT (list_empty (&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns));
1477 if (ksocknal_data.ksnd_schedulers != NULL)
1478 for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1479 ksock_sched_t *kss =
1480 &ksocknal_data.ksnd_schedulers[i];
/* schedulers must be idle before the threads are stopped */
1482 LASSERT (list_empty (&kss->kss_tx_conns));
1483 LASSERT (list_empty (&kss->kss_rx_conns));
1484 LASSERT (kss->kss_nconns == 0);
1487 /* stop router calling me */
1488 kpr_shutdown (&ksocknal_data.ksnd_router);
1490 /* flag threads to terminate; wake and wait for them to die */
1491 ksocknal_data.ksnd_shuttingdown = 1;
1492 wake_up_all (&ksocknal_data.ksnd_autoconnectd_waitq);
1493 wake_up_all (&ksocknal_data.ksnd_reaper_waitq);
1495 for (i = 0; i < SOCKNAL_N_SCHED; i++)
1496 wake_up_all(&ksocknal_data.ksnd_schedulers[i].kss_waitq);
/* poll once a second until every NAL thread has exited.
 * BUG FIX: debug message said "waitinf" — corrected to "waiting". */
1498 while (atomic_read (&ksocknal_data.ksnd_nthreads) != 0) {
1499 CDEBUG (D_NET, "waiting for %d threads to terminate\n",
1500 atomic_read (&ksocknal_data.ksnd_nthreads));
1501 set_current_state (TASK_UNINTERRUPTIBLE);
1502 schedule_timeout (HZ);
1505 kpr_deregister (&ksocknal_data.ksnd_router);
1507 ksocknal_free_buffers();
1510 case SOCKNAL_INIT_NOTHING:
1514 CDEBUG(D_MALLOC, "after NAL cleanup: kmem %d\n",
1515 atomic_read (&portal_kmemory));
1517 printk(KERN_INFO "Lustre: Routing socket NAL unloaded (final mem %d)\n",
1518 atomic_read(&portal_kmemory));
1523 ksocknal_init_incarnation (void)
1527 /* The incarnation number is the time this module loaded and it
1528 * identifies this particular instance of the socknal. Hopefully
1529 * we won't be able to reboot more frequently than 1MHz for the
1530 * forseeable future :) */
1532 do_gettimeofday(&tv);
1534 ksocknal_data.ksnd_incarnation =
1535 (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
1539 ksocknal_module_init (void)
1541 int pkmem = atomic_read(&portal_kmemory);
1546 /* packet descriptor must fit in a router descriptor's scratchpad */
1547 LASSERT(sizeof (ksock_tx_t) <= sizeof (kprfd_scratch_t));
1548 /* the following must be sizeof(int) for proc_dointvec() */
1549 LASSERT(sizeof (ksocknal_data.ksnd_io_timeout) == sizeof (int));
1550 LASSERT(sizeof (ksocknal_data.ksnd_eager_ack) == sizeof (int));
1551 /* check ksnr_connected/connecting field large enough */
1552 LASSERT(SOCKNAL_CONN_NTYPES <= 4);
1554 LASSERT (ksocknal_data.ksnd_init == SOCKNAL_INIT_NOTHING);
1556 ksocknal_api.forward = ksocknal_api_forward;
1557 ksocknal_api.shutdown = ksocknal_api_shutdown;
1558 ksocknal_api.yield = ksocknal_api_yield;
1559 ksocknal_api.validate = NULL; /* our api validate is a NOOP */
1560 ksocknal_api.lock = ksocknal_api_lock;
1561 ksocknal_api.unlock = ksocknal_api_unlock;
1562 ksocknal_api.nal_data = &ksocknal_data;
1564 ksocknal_lib.nal_data = &ksocknal_data;
1566 memset (&ksocknal_data, 0, sizeof (ksocknal_data)); /* zero pointers */
1568 ksocknal_data.ksnd_io_timeout = SOCKNAL_IO_TIMEOUT;
1569 ksocknal_data.ksnd_eager_ack = SOCKNAL_EAGER_ACK;
1570 ksocknal_data.ksnd_typed_conns = SOCKNAL_TYPED_CONNS;
1571 ksocknal_data.ksnd_min_bulk = SOCKNAL_MIN_BULK;
1573 ksocknal_data.ksnd_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
1575 ksocknal_init_incarnation();
1577 ksocknal_data.ksnd_peer_hash_size = SOCKNAL_PEER_HASH_SIZE;
1578 PORTAL_ALLOC (ksocknal_data.ksnd_peers,
1579 sizeof (struct list_head) * ksocknal_data.ksnd_peer_hash_size);
1580 if (ksocknal_data.ksnd_peers == NULL)
1583 for (i = 0; i < ksocknal_data.ksnd_peer_hash_size; i++)
1584 INIT_LIST_HEAD(&ksocknal_data.ksnd_peers[i]);
1586 rwlock_init(&ksocknal_data.ksnd_global_lock);
1588 ksocknal_data.ksnd_nal_cb = &ksocknal_lib;
1589 spin_lock_init (&ksocknal_data.ksnd_nal_cb_lock);
1591 spin_lock_init(&ksocknal_data.ksnd_small_fmp.fmp_lock);
1592 INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_idle_fmbs);
1593 INIT_LIST_HEAD(&ksocknal_data.ksnd_small_fmp.fmp_blocked_conns);
1595 spin_lock_init(&ksocknal_data.ksnd_large_fmp.fmp_lock);
1596 INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_idle_fmbs);
1597 INIT_LIST_HEAD(&ksocknal_data.ksnd_large_fmp.fmp_blocked_conns);
1599 spin_lock_init (&ksocknal_data.ksnd_reaper_lock);
1600 INIT_LIST_HEAD (&ksocknal_data.ksnd_enomem_conns);
1601 INIT_LIST_HEAD (&ksocknal_data.ksnd_zombie_conns);
1602 INIT_LIST_HEAD (&ksocknal_data.ksnd_deathrow_conns);
1603 init_waitqueue_head(&ksocknal_data.ksnd_reaper_waitq);
1605 spin_lock_init (&ksocknal_data.ksnd_autoconnectd_lock);
1606 INIT_LIST_HEAD (&ksocknal_data.ksnd_autoconnectd_routes);
1607 init_waitqueue_head(&ksocknal_data.ksnd_autoconnectd_waitq);
1609 /* NB memset above zeros whole of ksocknal_data, including
1610 * ksocknal_data.ksnd_irqinfo[all].ksni_valid */
1612 /* flag lists/ptrs/locks initialised */
1613 ksocknal_data.ksnd_init = SOCKNAL_INIT_DATA;
1615 PORTAL_ALLOC(ksocknal_data.ksnd_schedulers,
1616 sizeof(ksock_sched_t) * SOCKNAL_N_SCHED);
1617 if (ksocknal_data.ksnd_schedulers == NULL) {
1618 ksocknal_module_fini ();
1622 for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1623 ksock_sched_t *kss = &ksocknal_data.ksnd_schedulers[i];
1625 spin_lock_init (&kss->kss_lock);
1626 INIT_LIST_HEAD (&kss->kss_rx_conns);
1627 INIT_LIST_HEAD (&kss->kss_tx_conns);
1629 INIT_LIST_HEAD (&kss->kss_zctxdone_list);
1631 init_waitqueue_head (&kss->kss_waitq);
1634 rc = PtlNIInit(ksocknal_init, 32, 4, 0, &ksocknal_ni);
1636 CERROR("ksocknal: PtlNIInit failed: error %d\n", rc);
1637 ksocknal_module_fini ();
1640 PtlNIDebug(ksocknal_ni, ~0);
1642 ksocknal_data.ksnd_init = SOCKNAL_INIT_PTL; // flag PtlNIInit() called
1644 for (i = 0; i < SOCKNAL_N_SCHED; i++) {
1645 rc = ksocknal_thread_start (ksocknal_scheduler,
1646 &ksocknal_data.ksnd_schedulers[i]);
1648 CERROR("Can't spawn socknal scheduler[%d]: %d\n",
1650 ksocknal_module_fini ();
1655 for (i = 0; i < SOCKNAL_N_AUTOCONNECTD; i++) {
1656 rc = ksocknal_thread_start (ksocknal_autoconnectd, (void *)((long)i));
1658 CERROR("Can't spawn socknal autoconnectd: %d\n", rc);
1659 ksocknal_module_fini ();
1664 rc = ksocknal_thread_start (ksocknal_reaper, NULL);
1666 CERROR ("Can't spawn socknal reaper: %d\n", rc);
1667 ksocknal_module_fini ();
1671 rc = kpr_register(&ksocknal_data.ksnd_router,
1672 &ksocknal_router_interface);
1674 CDEBUG(D_NET, "Can't initialise routing interface "
1675 "(rc = %d): not routing\n", rc);
1677 /* Only allocate forwarding buffers if I'm on a gateway */
1679 for (i = 0; i < (SOCKNAL_SMALL_FWD_NMSGS +
1680 SOCKNAL_LARGE_FWD_NMSGS); i++) {
1683 PORTAL_ALLOC(fmb, sizeof(*fmb));
1685 ksocknal_module_fini();
1689 if (i < SOCKNAL_SMALL_FWD_NMSGS) {
1690 fmb->fmb_npages = SOCKNAL_SMALL_FWD_PAGES;
1691 fmb->fmb_pool = &ksocknal_data.ksnd_small_fmp;
1693 fmb->fmb_npages = SOCKNAL_LARGE_FWD_PAGES;
1694 fmb->fmb_pool = &ksocknal_data.ksnd_large_fmp;
1697 for (j = 0; j < fmb->fmb_npages; j++) {
1698 fmb->fmb_pages[j] = alloc_page(GFP_KERNEL);
1700 if (fmb->fmb_pages[j] == NULL) {
1701 ksocknal_module_fini ();
1705 LASSERT(page_address(fmb->fmb_pages[j]) != NULL);
1708 list_add(&fmb->fmb_list, &fmb->fmb_pool->fmp_idle_fmbs);
1712 rc = kportal_nal_register(SOCKNAL, &ksocknal_cmd, NULL);
1714 CERROR ("Can't initialise command interface (rc = %d)\n", rc);
1715 ksocknal_module_fini ();
1719 PORTAL_SYMBOL_REGISTER(ksocknal_ni);
1721 #ifdef CONFIG_SYSCTL
1722 /* Press on regardless even if registering sysctl doesn't work */
1723 ksocknal_data.ksnd_sysctl = register_sysctl_table (ksocknal_top_ctl_table, 0);
1725 /* flag everything initialised */
1726 ksocknal_data.ksnd_init = SOCKNAL_INIT_ALL;
1728 printk(KERN_INFO "Lustre: Routing socket NAL loaded "
1729 "(Routing %s, initial mem %d)\n",
1730 kpr_routing (&ksocknal_data.ksnd_router) ?
1731 "enabled" : "disabled", pkmem);
/* Module metadata and entry points: init/exit hooks registered above,
 * and the NI handle exported for other Portals modules. */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel TCP Socket NAL v0.01");
MODULE_LICENSE("GPL");
module_init(ksocknal_module_init);
module_exit(ksocknal_module_fini);
EXPORT_SYMBOL (ksocknal_ni);