1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5 * Author: Zach Brown <zab@zabbo.net>
6 * Author: Peter J. Braam <braam@clusterfs.com>
7 * Author: Phil Schwan <phil@clusterfs.com>
8 * Author: Eric Barton <eric@bartonsoftware.com>
10 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
12 * Portals is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Portals is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Portals; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
/* Module tunables (visible fragment).
 * ksocknal_io_timeout: socket I/O timeout used for tx/rx deadlines.
 * ksocknal_do_zc:      enable zero-copy sends via tcp_sendpage_zccd (bool).
 * ksocknal_zc_min_frag: smallest fragment worth sending zero-copy.
 * Defaults come from SOCKNAL_* constants; presumably module parameters --
 * can't confirm from this chunk. */
28 int ksocknal_io_timeout = SOCKNAL_IO_TIMEOUT;
30 int ksocknal_do_zc = 1;
31 int ksocknal_zc_min_frag = SOCKNAL_ZC_MIN_FRAG;
35 * LIB functions follow
/* lib-NAL 'read' callback: copy 'len' bytes from 'src_addr' (same address
 * space in this kernel NAL) into 'dst_addr'.  NB several lines of this
 * function are elided in this chunk (return value not visible). */
39 ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr,
40 user_ptr src_addr, size_t len)
42 CDEBUG(D_NET, LPX64": reading %ld bytes from %p -> %p\n",
43 nal->ni.nid, (long)len, src_addr, dst_addr);
45 memcpy( dst_addr, src_addr, len );
/* lib-NAL 'write' callback: mirror of ksocknal_read() -- copy 'len' bytes
 * from 'src_addr' to 'dst_addr' with a straight memcpy, since kernel NAL
 * "user" pointers live in the same address space.  Lines elided here. */
50 ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
51 void *src_addr, size_t len)
53 CDEBUG(D_NET, LPX64": writing %ld bytes from %p -> %p\n",
54 nal->ni.nid, (long)len, src_addr, dst_addr);
56 memcpy( dst_addr, src_addr, len );
/* lib-NAL event callback: deliver event 'ev' to the event queue's
 * registered handler, if any.  (Event argument declaration is on an
 * elided line.) */
61 ksocknal_callback (nal_cb_t * nal, void *private, lib_eq_t *eq,
64 CDEBUG(D_NET, LPX64": callback eq %p ev %p\n",
67 if (eq->event_callback != NULL)
68 eq->event_callback(ev);
/* lib-NAL allocator hook: allocate 'len' bytes via PORTAL_ALLOC.
 * Declaration of 'buf' and the return are on elided lines. */
74 ksocknal_malloc(nal_cb_t *nal, size_t len)
78 PORTAL_ALLOC(buf, len);
/* lib-NAL free hook: release a buffer obtained from ksocknal_malloc().
 * PORTAL_FREE needs the original length, hence the 'len' parameter. */
87 ksocknal_free(nal_cb_t *nal, void *buf, size_t len)
89 PORTAL_FREE(buf, len);
/* lib-NAL printf hook: format into a fixed stack buffer 'msg' (declared on
 * an elided line) and emit through CDEBUG.  vsnprintf bounds the write and
 * the explicit NUL guards against pre-C99 vsnprintf truncation semantics. */
93 ksocknal_printf(nal_cb_t *nal, const char *fmt, ...)
99 vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
102 msg[sizeof (msg) - 1] = 0; /* ensure terminated */
104 CDEBUG (D_NET, "%s", msg);
/* lib-NAL "disable interrupts" hook: take the per-NAL callback spinlock.
 * NOTE(review): plain spin_lock() here vs. the *flags parameter suggests
 * callers never hold this from IRQ context -- confirm against lib-NAL docs. */
108 ksocknal_cli(nal_cb_t *nal, unsigned long *flags)
110 ksock_nal_data_t *data = nal->nal_data;
112 spin_lock(&data->ksnd_nal_cb_lock);
/* lib-NAL "restore interrupts" hook: release the lock taken by
 * ksocknal_cli().  'flags' is unused by the visible code. */
116 ksocknal_sti(nal_cb_t *nal, unsigned long *flags)
118 ksock_nal_data_t *data;
119 data = nal->nal_data;
121 spin_unlock(&data->ksnd_nal_cb_lock);
/* lib-NAL distance hook: report routing "distance" to 'nid' via *dist.
 * Visible branch handles the local-nid case; other cases are elided. */
125 ksocknal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
127 /* I would guess that if ksocknal_get_peer (nid) == NULL,
128 and we're not routing, then 'nid' is very distant :) */
129 if ( nal->ni.nid == nid ) {
/* Grab an idle local-transmit (ltx) descriptor.
 * Prefer the normal idle pool; the non-blocking pool is the fallback
 * (guard elided -- presumably taken when !may_block), and when blocking is
 * allowed the caller sleeps on ksnd_idle_ltx_waitq until a descriptor is
 * returned.  ksnd_active_ltxs counts descriptors currently checked out.
 * FIX(review): "&ltx->" had been corrupted to "<x->" by HTML-entity
 * mangling ("&lt" decoded as "<"); restored below. */
139 ksocknal_get_ltx (int may_block)
142 ksock_ltx_t *ltx = NULL;
145 spin_lock_irqsave (&ksocknal_data.ksnd_idle_ltx_lock, flags);
147 if (!list_empty (&ksocknal_data.ksnd_idle_ltx_list)) {
148 ltx = list_entry(ksocknal_data.ksnd_idle_ltx_list.next,
149 ksock_ltx_t, ltx_tx.tx_list);
150 list_del (&ltx->ltx_tx.tx_list);
151 ksocknal_data.ksnd_active_ltxs++;
156 if (!list_empty(&ksocknal_data.ksnd_idle_nblk_ltx_list)) {
157 ltx = list_entry(ksocknal_data.ksnd_idle_nblk_ltx_list.next,
158 ksock_ltx_t, ltx_tx.tx_list);
159 list_del (&ltx->ltx_tx.tx_list);
160 ksocknal_data.ksnd_active_ltxs++;
165 spin_unlock_irqrestore(&ksocknal_data.ksnd_idle_ltx_lock,
168 wait_event (ksocknal_data.ksnd_idle_ltx_waitq,
169 !list_empty (&ksocknal_data.ksnd_idle_ltx_list));
172 spin_unlock_irqrestore (&ksocknal_data.ksnd_idle_ltx_lock, flags);
/* Return an ltx descriptor to its home idle list (ltx->ltx_idle) and wake
 * any blocked ksocknal_get_ltx() waiter -- but only for the normal pool,
 * since only normal-pool allocations block.
 * FIX(review): "&ltx->" had been corrupted to "<x->" by HTML-entity
 * mangling ("&lt" decoded as "<"); restored below. */
178 ksocknal_put_ltx (ksock_ltx_t *ltx)
182 spin_lock_irqsave (&ksocknal_data.ksnd_idle_ltx_lock, flags);
184 ksocknal_data.ksnd_active_ltxs--;
185 list_add_tail (&ltx->ltx_tx.tx_list, ltx->ltx_idle);
187 /* normal tx desc => wakeup anyone blocking for one */
188 if (ltx->ltx_idle == &ksocknal_data.ksnd_idle_ltx_list &&
189 waitqueue_active (&ksocknal_data.ksnd_idle_ltx_waitq))
190 wake_up (&ksocknal_data.ksnd_idle_ltx_waitq);
192 spin_unlock_irqrestore (&ksocknal_data.ksnd_idle_ltx_lock, flags);
/* Map a kernel virtual address to its struct page, for zero-copy sends.
 * Three regions: vmalloc area, the persistent-kmap (highmem) window, and
 * directly-mapped lowmem.  vmalloc_to_page() is used for the kmap window
 * too because (per the inline note) in 2.4 it just walks the page tables.
 * Guard conditions / #ifdefs partially elided in this chunk. */
197 ksocknal_kvaddr_to_page (unsigned long vaddr)
201 if (vaddr >= VMALLOC_START &&
203 page = vmalloc_to_page ((void *)vaddr);
205 else if (vaddr >= PKMAP_BASE &&
206 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
207 page = vmalloc_to_page ((void *)vaddr);
208 /* in 2.4 ^ just walks the page tables */
211 page = virt_to_page (vaddr);
/* Send the first mapped (iovec) fragment of 'tx' on 'conn'.
 * Zero-copy path: if enabled, the NIC can do SG + csum, the in-page chunk
 * is big enough, and the kvaddr resolves to a page, use tcp_sendpage_zccd;
 * otherwise fall back to sock_sendmsg of one iovec.  Returns >0 bytes sent
 * (the rc<0 / partial-send bookkeeping is partially elided).
 * MSG_MORE is set when more data follows ('more' is a bitwise | of two
 * conditions; second operand on an elided line). */
222 ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
224 struct socket *sock = conn->ksnc_sock;
225 struct iovec *iov = tx->tx_iov;
226 int fragsize = iov->iov_len;
227 unsigned long vaddr = (unsigned long)iov->iov_base;
228 int more = !list_empty (&conn->ksnc_tx_queue) |
232 int offset = vaddr & (PAGE_SIZE - 1);
233 int zcsize = MIN (fragsize, PAGE_SIZE - offset);
238 /* NB we can't trust socket ops to either consume our iovs
239 * or leave them alone, so we only send 1 frag at a time. */
240 LASSERT (fragsize <= tx->tx_resid);
241 LASSERT (tx->tx_niov > 0);
244 if (ksocknal_do_zc &&
245 (sock->sk->route_caps & NETIF_F_SG) &&
246 (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
247 zcsize >= ksocknal_zc_min_frag &&
248 (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
250 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
251 (void *)vaddr, page, page_address(page), offset, zcsize);
253 if (fragsize > zcsize) {
258 rc = tcp_sendpage_zccd(sock, page, offset, zcsize,
259 more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
264 /* NB don't pass tx's iov; sendmsg may or may not update it */
265 struct iovec fragiov = { .iov_base = (void *)vaddr,
266 .iov_len = fragsize};
267 struct msghdr msg = {
274 .msg_flags = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
276 mm_segment_t oldmm = get_fs();
279 rc = sock_sendmsg(sock, &msg, fragsize);
288 if (rc < iov->iov_len) {
289 /* didn't send whole iov entry... */
290 iov->iov_base = (void *)(vaddr + rc);
292 /* ...but did we send everything we tried to send? */
293 return ((rc == fragsize) ? 1 : -EAGAIN);
/* Send the first page-based (kiov) fragment of 'tx' on 'conn'.
 * Page fragments can go zero-copy directly via tcp_sendpage_zccd (no
 * kvaddr-to-page lookup needed); the fallback kmap()s the page and uses
 * sock_sendmsg.  Partial sends update kiov_offset/kiov_len in place so the
 * next call resumes where this one stopped.  Error paths and the kunmap
 * are on elided lines. */
302 ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
304 struct socket *sock = conn->ksnc_sock;
305 ptl_kiov_t *kiov = tx->tx_kiov;
306 int fragsize = kiov->kiov_len;
307 struct page *page = kiov->kiov_page;
308 int offset = kiov->kiov_offset;
309 int more = !list_empty (&conn->ksnc_tx_queue) |
313 /* NB we can't trust socket ops to either consume our iovs
314 * or leave them alone, so we only send 1 frag at a time. */
315 LASSERT (fragsize <= tx->tx_resid);
316 LASSERT (offset + fragsize <= PAGE_SIZE);
317 LASSERT (tx->tx_niov == 0);
318 LASSERT (tx->tx_nkiov > 0);
321 if (ksocknal_do_zc &&
322 (sock->sk->route_caps & NETIF_F_SG) &&
323 (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
324 fragsize >= ksocknal_zc_min_frag) {
326 CDEBUG(D_NET, "page %p + offset %x for %d\n",
327 page, offset, fragsize);
329 rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
330 more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
335 char *addr = ((char *)kmap (page)) + offset;
336 struct iovec fragiov = {.iov_base = addr,
337 .iov_len = fragsize};
338 struct msghdr msg = {
345 .msg_flags = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
347 mm_segment_t oldmm = get_fs();
350 rc = sock_sendmsg(sock, &msg, fragsize);
361 /* didn't send whole frag */
362 kiov->kiov_offset = offset + rc;
363 kiov->kiov_len = fragsize - rc;
367 /* everything went */
368 LASSERT (rc == fragsize);
/* Push as much of 'tx' onto conn's socket as it will take, one fragment at
 * a time via ksocknal_send_iov/ksocknal_send_kiov.  Takes a conn-socket
 * ref for the duration (getconnsock/putconnsock pair) and bails early if
 * the conn is closing.  ksnd_stall_tx is a debug knob that delays sends.
 * The fragment loop and completion handling are partially elided. */
375 ksocknal_sendmsg (ksock_conn_t *conn, ksock_tx_t *tx)
377 /* Return 0 on success, < 0 on error.
378 * caller checks tx_resid to determine progress/completion */
382 if (ksocknal_data.ksnd_stall_tx != 0) {
383 set_current_state (TASK_UNINTERRUPTIBLE);
384 schedule_timeout (ksocknal_data.ksnd_stall_tx * HZ);
387 rc = ksocknal_getconnsock (conn);
392 LASSERT (tx->tx_resid != 0);
394 if (conn->ksnc_closing) {
399 if (tx->tx_niov != 0)
400 rc = ksocknal_send_iov (conn, tx);
402 rc = ksocknal_send_kiov (conn, tx);
404 if (rc <= 0) { /* error or socket full? */
405 /* NB: rc == 0 and rc == -EAGAIN both mean try
406 * again later (linux stack returns -EAGAIN for
407 * this, but Adaptech TOE returns 0) */
413 if (tx->tx_resid == 0) { /* sent everything */
419 ksocknal_putconnsock (conn);
/* Receive into conn's current mapped (iovec) rx fragment via sock_recvmsg.
 * A private copy of the iovec is passed because socket ops may or may not
 * consume it.  On partial receive the conn's iov is advanced in place; on
 * a complete fragment (else-branch elided) rx_iov/rx_niov move on. */
424 ksocknal_recv_iov (ksock_conn_t *conn)
426 struct iovec *iov = conn->ksnc_rx_iov;
427 int fragsize = iov->iov_len;
428 unsigned long vaddr = (unsigned long)iov->iov_base;
429 struct iovec fragiov = { .iov_base = (void *)vaddr,
430 .iov_len = fragsize};
431 struct msghdr msg = {
440 mm_segment_t oldmm = get_fs();
443 /* NB we can't trust socket ops to either consume our iovs
444 * or leave them alone, so we only receive 1 frag at a time. */
445 LASSERT (conn->ksnc_rx_niov > 0);
446 LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
449 rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
450 /* NB this is just a boolean............................^ */
456 conn->ksnc_rx_nob_wanted -= rc;
457 conn->ksnc_rx_nob_left -= rc;
460 iov->iov_base = (void *)(vaddr + rc);
461 iov->iov_len = fragsize - rc;
466 conn->ksnc_rx_niov--;
/* Receive into conn's current page-based (kiov) rx fragment: kmap the
 * page, build a one-entry iovec at the fragment's offset, sock_recvmsg it.
 * Partial receive advances kiov_offset/kiov_len in place; a full fragment
 * moves rx_kiov/rx_nkiov forward.  The kunmap and error path are on
 * elided lines. */
471 ksocknal_recv_kiov (ksock_conn_t *conn)
473 ptl_kiov_t *kiov = conn->ksnc_rx_kiov;
474 struct page *page = kiov->kiov_page;
475 int offset = kiov->kiov_offset;
476 int fragsize = kiov->kiov_len;
477 unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
478 struct iovec fragiov = { .iov_base = (void *)vaddr,
479 .iov_len = fragsize};
480 struct msghdr msg = {
489 mm_segment_t oldmm = get_fs();
492 /* NB we can't trust socket ops to either consume our iovs
493 * or leave them alone, so we only receive 1 frag at a time. */
494 LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
495 LASSERT (conn->ksnc_rx_nkiov > 0);
496 LASSERT (offset + fragsize <= PAGE_SIZE);
499 rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
500 /* NB this is just a boolean............................^ */
507 conn->ksnc_rx_nob_wanted -= rc;
508 conn->ksnc_rx_nob_left -= rc;
511 kiov->kiov_offset = offset + rc;
512 kiov->kiov_len = fragsize - rc;
516 conn->ksnc_rx_kiov++;
517 conn->ksnc_rx_nkiov--;
/* Drain the socket into conn's current rx descriptor, one fragment at a
 * time (mapped iovs first, then kiovs).  Holds a conn-socket ref for the
 * duration; ksnd_stall_rx is a debug knob that delays receives.  The
 * receive loop body and EOF handling are partially elided. */
522 ksocknal_recvmsg (ksock_conn_t *conn)
524 /* Return 1 on success, 0 on EOF, < 0 on error.
525 * Caller checks ksnc_rx_nob_wanted to determine
526 * progress/completion. */
530 if (ksocknal_data.ksnd_stall_rx != 0) {
531 set_current_state (TASK_UNINTERRUPTIBLE);
532 schedule_timeout (ksocknal_data.ksnd_stall_rx * HZ);
535 rc = ksocknal_getconnsock (conn);
540 if (conn->ksnc_closing) {
545 if (conn->ksnc_rx_niov != 0)
546 rc = ksocknal_recv_iov (conn);
548 rc = ksocknal_recv_kiov (conn);
551 /* error/EOF or partial receive */
557 if (conn->ksnc_rx_nob_wanted == 0) {
563 ksocknal_putconnsock (conn);
/* Zero-copy completion callback: all zccd skbufs for 'tx' have been
 * released by the stack.  Runs in a context where tx can't be finalized
 * directly (lock conflicts), so move it from the scheduler's pending list
 * to its zctxdone list and wake the scheduler to finish it. */
569 ksocknal_zc_callback (zccd_t *zcd)
571 ksock_tx_t *tx = KSOCK_ZCCD_2_TX(zcd);
572 ksock_sched_t *sched = tx->tx_conn->ksnc_scheduler;
576 /* Schedule tx for cleanup (can't do it now due to lock conflicts) */
578 spin_lock_irqsave (&sched->kss_lock, flags);
580 list_del (&tx->tx_list); /* remove from kss_zctxpending_list */
581 list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
582 if (waitqueue_active (&sched->kss_waitq))
583 wake_up (&sched->kss_waitq);
585 spin_unlock_irqrestore (&sched->kss_lock, flags);
/* Finalize a completed transmit: undo the conn's queued-byte accounting
 * and drop the conn ref ('asynch' guard elided -- zero-copy completions
 * hold an extra ref), then either tell the router a forwarded packet is
 * done, or lib_finalize() a locally-originated one and recycle its ltx. */
591 ksocknal_tx_done (ksock_tx_t *tx, int asynch)
596 if (tx->tx_conn != NULL) {
597 /* This tx got queued on a conn; do the accounting... */
598 atomic_sub (tx->tx_nob, &tx->tx_conn->ksnc_tx_nob);
600 /* zero copy completion isn't always from
601 * process_transmit() so it needs to keep a ref on
604 ksocknal_put_conn (tx->tx_conn);
610 if (tx->tx_isfwd) { /* was a forwarded packet? */
611 kpr_fwd_done (&ksocknal_data.ksnd_router,
612 KSOCK_TX_2_KPR_FWD_DESC (tx), 0);
618 ltx = KSOCK_TX_2_KSOCK_LTX (tx);
620 lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie);
622 ksocknal_put_ltx (ltx);
/* Called once tx has been handed to the stack.  If zero-copy skbufs are
 * still in flight (zccd count > initial 1), park the tx on the conn's
 * pending list for the timeout code and let ksocknal_zc_callback finish
 * it; otherwise complete it synchronously here. */
627 ksocknal_tx_launched (ksock_tx_t *tx)
630 if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
632 ksock_conn_t *conn = tx->tx_conn;
633 ksock_sched_t *sched = conn->ksnc_scheduler;
635 /* zccd skbufs are still in-flight. First take a ref on
636 * conn, so it hangs about for ksocknal_tx_done... */
637 atomic_inc (&conn->ksnc_refcount);
639 /* Stash it for timeout...
640 * NB We have to hold a lock to stash the tx, and we have
641 * stash it before we zcc_put(), but we have to _not_ hold
642 * this lock when we zcc_put(), otherwise we could deadlock
643 * if it turns out to be the last put. Aaaaarrrrggghhh! */
644 spin_lock_irqsave (&sched->kss_lock, flags);
645 list_add_tail (&tx->tx_list, &conn->ksnc_tx_pending);
646 spin_unlock_irqrestore (&sched->kss_lock, flags);
648 /* ...then drop the initial ref on zccd, so the zero copy
649 * callback can occur */
650 zccd_put (&tx->tx_zccd);
654 /* Any zero-copy-ness (if any) has completed; I can complete the
655 * transmit now, avoiding an extra schedule */
656 ksocknal_tx_done (tx, 0);
/* Scheduler worker: pop the next conn with queued tx work, send its head
 * tx with the scheduler lock dropped, then re-take the lock (via
 * *irq_flags, owned by the caller) and either retire the tx, close the
 * conn on error, or re-queue.  Finally the conn is descheduled if idle or
 * re-queued at the tail for fairness. */
660 ksocknal_process_transmit (ksock_sched_t *sched, unsigned long *irq_flags)
666 LASSERT (!list_empty (&sched->kss_tx_conns));
667 conn = list_entry(sched->kss_tx_conns.next, ksock_conn_t, ksnc_tx_list);
668 list_del (&conn->ksnc_tx_list);
670 LASSERT (conn->ksnc_tx_scheduled);
671 LASSERT (conn->ksnc_tx_ready);
672 LASSERT (!list_empty (&conn->ksnc_tx_queue));
673 tx = list_entry (conn->ksnc_tx_queue.next, ksock_tx_t, tx_list);
674 /* assume transmit will complete now, so dequeue while I've got lock */
675 list_del (&tx->tx_list);
677 spin_unlock_irqrestore (&sched->kss_lock, *irq_flags);
679 LASSERT (tx->tx_resid > 0);
681 conn->ksnc_tx_ready = 0;/* write_space may race with me and set ready */
682 mb(); /* => clear BEFORE trying to write */
684 rc = ksocknal_sendmsg (conn, tx);
686 CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
689 if (ksocknal_close_conn_unlocked (conn)) {
690 /* I'm the first to close */
691 CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n",
692 conn, rc, conn->ksnc_peer->ksnp_nid,
693 conn->ksnc_ipaddr, conn->ksnc_port);
695 ksocknal_tx_launched (tx);
696 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
698 } else if (tx->tx_resid == 0) {
700 /* everything went; assume more can go, and avoid
701 * write_space locking */
702 conn->ksnc_tx_ready = 1;
704 ksocknal_tx_launched (tx);
705 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
707 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
709 /* back onto HEAD of tx_queue */
710 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
713 /* no space to write, or nothing to write? */
714 if (!conn->ksnc_tx_ready ||
715 list_empty (&conn->ksnc_tx_queue)) {
716 /* mark not scheduled */
717 conn->ksnc_tx_scheduled = 0;
718 /* drop scheduler's ref */
719 ksocknal_put_conn (conn);
722 list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
/* Hand an unconnected route to the autoconnect daemon: mark it
 * connecting, take a ref for the daemon, queue it, and wake the daemon.
 * Caller must hold the global write lock (see inline note). */
727 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
731 /* called holding write lock on ksnd_global_lock */
733 LASSERT (route->ksnr_conn == NULL);
734 LASSERT (!route->ksnr_deleted && !route->ksnr_connecting);
736 route->ksnr_connecting = 1;
737 atomic_inc (&route->ksnr_refcount); /* extra ref for asynchd */
739 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
741 list_add_tail (&route->ksnr_connect_list,
742 &ksocknal_data.ksnd_autoconnectd_routes);
744 if (waitqueue_active (&ksocknal_data.ksnd_autoconnectd_waitq))
745 wake_up (&ksocknal_data.ksnd_autoconnectd_waitq);
747 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
/* Resolve the peer a tx for 'nid' should be sent to: direct peer lookup
 * first; otherwise ask the portals router (kpr_lookup) for a gateway nid
 * and look that up instead.  Error branches return NULL on elided lines;
 * caller holds the global lock. */
751 ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
753 ptl_nid_t target_nid;
755 ksock_peer_t *peer = ksocknal_find_peer_locked (nid);
761 CERROR ("Can't send packet to "LPX64
762 ": routed target is not a peer\n", nid);
766 rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, &target_nid);
768 CERROR ("Can't route to "LPX64": router error %d\n", nid, rc);
772 peer = ksocknal_find_peer_locked (target_nid);
776 CERROR ("Can't send packet to "LPX64": no peer entry\n", target_nid);
/* Pick the conn with the smallest queued-byte count (ksnc_tx_nob) among
 * the peer's conns; returns NULL when the peer has none.  NOTE(review):
 * the 'conn == NULL' part of the comparison guard is on an elided line --
 * presumably "if (conn == NULL || ..." preceding the atomic_read. */
781 ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer)
783 struct list_head *tmp;
784 ksock_conn_t *conn = NULL;
786 /* Find the conn with the shortest tx queue */
787 list_for_each (tmp, &peer->ksnp_conns) {
788 ksock_conn_t *c = list_entry (tmp, ksock_conn_t, ksnc_list);
790 LASSERT (!c->ksnc_closing);
793 atomic_read (&conn->ksnc_tx_nob) >
794 atomic_read (&c->ksnc_tx_nob))
/* Queue 'tx' on 'conn': account its bytes, init its zccd (with the extra
 * initial ref described inline), stamp its deadline, append it to the
 * conn's tx queue, and schedule the conn with its scheduler (taking a
 * scheduler ref) if it is writable and not already scheduled.
 * Caller holds the global lock, read or irq-write (see inline note). */
802 ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
805 ksock_sched_t *sched = conn->ksnc_scheduler;
807 /* called holding global lock (read or irq-write) */
809 CDEBUG (D_NET, "Sending to "LPX64" on port %d\n",
810 conn->ksnc_peer->ksnp_nid, conn->ksnc_port);
812 atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
813 tx->tx_resid = tx->tx_nob;
817 zccd_init (&tx->tx_zccd, ksocknal_zc_callback);
818 /* NB this sets 1 ref on zccd, so the callback can only occur after
819 * I've released this ref. */
822 spin_lock_irqsave (&sched->kss_lock, flags);
824 tx->tx_deadline = jiffies_64 + ksocknal_io_timeout;
825 list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
827 if (conn->ksnc_tx_ready && /* able to send */
828 !conn->ksnc_tx_scheduled) { /* not scheduled to send */
829 /* +1 ref for scheduler */
830 atomic_inc (&conn->ksnc_refcount);
831 list_add_tail (&conn->ksnc_tx_list,
832 &sched->kss_tx_conns);
833 conn->ksnc_tx_scheduled = 1;
834 if (waitqueue_active (&sched->kss_waitq))
835 wake_up (&sched->kss_waitq);
838 spin_unlock_irqrestore (&sched->kss_lock, flags);
/* Return the first route of 'peer' that is eligible to (re)connect: not
 * connected, not mid-connect, and past its retry-backoff deadline.
 * The 'return route' / 'return NULL' lines are elided. */
842 ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
844 struct list_head *tmp;
845 ksock_route_t *route;
847 list_for_each (tmp, &peer->ksnp_routes) {
848 route = list_entry (tmp, ksock_route_t, ksnr_list);
850 if (route->ksnr_conn == NULL && /* not connected */
851 !route->ksnr_connecting && /* not connecting */
852 route->ksnr_timeout <= jiffies_64) /* OK to retry */
/* Return the first route of 'peer' with a connect attempt currently in
 * flight (ksnr_connecting), or NULL -- used to decide whether a tx may be
 * parked on the peer awaiting a connection.  Returns elided. */
860 ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
862 struct list_head *tmp;
863 ksock_route_t *route;
865 list_for_each (tmp, &peer->ksnp_routes) {
866 route = list_entry (tmp, ksock_route_t, ksnr_list);
868 if (route->ksnr_connecting)
/* Route and queue 'tx' for 'nid'.  Fast path: under the global read lock,
 * find the target peer and, if no route needs connecting, queue on the
 * least-loaded conn.  Slow path: upgrade to the write lock (re-validating
 * the peer across the lock gap via a temporary ref), kick off autoconnects
 * for every connectable route, then either queue on a conn or park the tx
 * on the peer's queue while a connect is in flight.  Fails (elided return)
 * when nothing is connecting either. */
876 ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
881 ksock_route_t *route;
884 /* Ensure the frags we've been given EXACTLY match the number of
885 * bytes we want to send. Many TCP/IP stacks disregard any total
886 * size parameters passed to them and just look at the frags.
888 * We always expect at least 1 mapped fragment containing the
889 * complete portals header. */
890 LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) +
891 lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
892 LASSERT (tx->tx_niov >= 1);
893 LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t));
895 CDEBUG (D_NET, "packet %p type %d, nob %d niov %d nkiov %d\n",
896 tx, ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type,
897 tx->tx_nob, tx->tx_niov, tx->tx_nkiov);
899 tx->tx_conn = NULL; /* only set when assigned a conn */
901 g_lock = &ksocknal_data.ksnd_global_lock;
904 peer = ksocknal_find_target_peer_locked (tx, nid);
906 read_unlock (g_lock);
910 /* Any routes need to be connected? (need write lock if so) */
911 if (ksocknal_find_connectable_route_locked (peer) == NULL) {
912 conn = ksocknal_find_conn_locked (tx, peer);
914 ksocknal_queue_tx_locked (tx, conn);
915 read_unlock (g_lock);
920 /* need a write lock now to change peer state... */
922 atomic_inc (&peer->ksnp_refcount); /* +1 ref for me while I unlock */
923 read_unlock (g_lock);
924 write_lock_irqsave (g_lock, flags);
926 if (peer->ksnp_closing) { /* peer deleted as I blocked! */
927 write_unlock_irqrestore (g_lock, flags);
928 ksocknal_put_peer (peer);
931 ksocknal_put_peer (peer); /* drop ref I got above */
933 /* I may launch autoconnects, now we're write locked... */
934 while ((route = ksocknal_find_connectable_route_locked (peer)) != NULL)
935 ksocknal_launch_autoconnect_locked (route);
937 conn = ksocknal_find_conn_locked (tx, peer);
939 ksocknal_queue_tx_locked (tx, conn);
940 write_unlock_irqrestore (g_lock, flags);
944 if (ksocknal_find_connecting_route_locked (peer) == NULL) {
945 /* no routes actually connecting now */
946 write_unlock_irqrestore (g_lock, flags);
950 list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
952 write_unlock_irqrestore (g_lock, flags);
/* Allocate and initialize an ltx whose first (and only, so far) mapped
 * fragment is the portals header.  may_block is false for ACK/REPLY (and,
 * per the elided third condition, presumably interrupt context) so the
 * receiver / irq handler is never blocked waiting for a descriptor.
 * Returns NULL (elided) when no descriptor is available.
 * FIX(review): "&ltx->" had been corrupted to "<x->" by HTML-entity
 * mangling ("&lt" decoded as "<"); restored below. */
957 ksocknal_setup_hdr (nal_cb_t *nal, void *private, lib_msg_t *cookie,
958 ptl_hdr_t *hdr, int type)
962 /* I may not block for a transmit descriptor if I might block the
963 * receiver, or an interrupt handler. */
964 ltx = ksocknal_get_ltx (!(type == PTL_MSG_ACK ||
965 type == PTL_MSG_REPLY ||
968 CERROR ("Can't allocate tx desc\n");
972 /* Init local send packet (storage for hdr, finalize() args) */
974 ltx->ltx_private = private;
975 ltx->ltx_cookie = cookie;
977 /* Init common ltx_tx */
978 ltx->ltx_tx.tx_isfwd = 0;
979 ltx->ltx_tx.tx_nob = sizeof (*hdr);
981 /* We always have 1 mapped frag for the header */
982 ltx->ltx_tx.tx_niov = 1;
983 ltx->ltx_tx.tx_iov = &ltx->ltx_iov_space.hdr;
984 ltx->ltx_tx.tx_iov[0].iov_base = &ltx->ltx_hdr;
985 ltx->ltx_tx.tx_iov[0].iov_len = sizeof (ltx->ltx_hdr);
987 ltx->ltx_tx.tx_kiov = NULL;
988 ltx->ltx_tx.tx_nkiov = 0;
/* lib-NAL send with mapped (iovec) payload: build the header ltx, append
 * the caller's payload iovs after the header iov, and launch.  On launch
 * failure the ltx is recycled (return value handling elided).
 * FIX(review): "&ltx->" had been corrupted to "<x->" by HTML-entity
 * mangling ("&lt" decoded as "<"); restored below. */
994 ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
995 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
996 unsigned int payload_niov, struct iovec *payload_iov,
1002 /* NB 'private' is different depending on what we're sending.
1003 * Just ignore it until we can rely on it
1007 "sending "LPSZ" bytes in %d mapped frags to nid: "LPX64
1008 " pid %d\n", payload_len, payload_niov, nid, pid);
1010 ltx = ksocknal_setup_hdr (nal, private, cookie, hdr, type);
1014 /* append the payload_iovs to the one pointing at the header */
1015 LASSERT (ltx->ltx_tx.tx_niov == 1 && ltx->ltx_tx.tx_nkiov == 0);
1016 LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1018 memcpy (ltx->ltx_tx.tx_iov + 1, payload_iov,
1019 payload_niov * sizeof (*payload_iov));
1020 ltx->ltx_tx.tx_niov = 1 + payload_niov;
1021 ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_len;
1023 rc = ksocknal_launch_packet (&ltx->ltx_tx, nid);
1025 ksocknal_put_ltx (ltx);
/* lib-NAL send with page-based (kiov) payload: build the header ltx, copy
 * the caller's kiovs into the ltx's kiov space, and launch.  Mirror of
 * ksocknal_send() for kiov payloads.
 * FIX(review): "&ltx->" had been corrupted to "<x->" by HTML-entity
 * mangling ("&lt" decoded as "<"); restored below. */
1031 ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie,
1032 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1033 unsigned int payload_niov, ptl_kiov_t *payload_iov, size_t payload_len)
1038 /* NB 'private' is different depending on what we're sending.
1039 * Just ignore it until we can rely on it */
1042 "sending "LPSZ" bytes in %d mapped frags to nid: "LPX64" pid %d\n",
1043 payload_len, payload_niov, nid, pid);
1045 ltx = ksocknal_setup_hdr (nal, private, cookie, hdr, type);
1049 LASSERT (ltx->ltx_tx.tx_niov == 1 && ltx->ltx_tx.tx_nkiov == 0);
1050 LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1052 ltx->ltx_tx.tx_kiov = ltx->ltx_iov_space.payload.kiov;
1053 memcpy (ltx->ltx_tx.tx_kiov, payload_iov,
1054 payload_niov * sizeof (*payload_iov));
1055 ltx->ltx_tx.tx_nkiov = payload_niov;
1056 ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_len;
1058 rc = ksocknal_launch_packet (&ltx->ltx_tx, nid);
1060 ksocknal_put_ltx (ltx);
/* Router callback: forward a packet described by 'fwd'.  A ksock_tx_t is
 * overlaid on the fwd descriptor's scratch space; if this node is itself
 * the gateway, send straight to the final target instead.  On launch
 * failure, complete the forward immediately with -EHOSTUNREACH. */
1066 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1068 ptl_nid_t nid = fwd->kprfd_gateway_nid;
1069 ksock_tx_t *tx = (ksock_tx_t *)&fwd->kprfd_scratch;
1072 CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
1073 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
1075 /* I'm the gateway; must be the last hop */
1076 if (nid == ksocknal_lib.ni.nid)
1077 nid = fwd->kprfd_target_nid;
1079 tx->tx_isfwd = 1; /* This is a forwarding packet */
1080 tx->tx_nob = fwd->kprfd_nob;
1081 tx->tx_niov = fwd->kprfd_niov;
1082 tx->tx_iov = fwd->kprfd_iov;
1085 tx->tx_hdr = (ptl_hdr_t *)fwd->kprfd_iov[0].iov_base;
1087 rc = ksocknal_launch_packet (tx, nid);
1089 /* FIXME, could pass a better completion error */
1090 kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, -EHOSTUNREACH);
/* Spawn a kernel thread running fn(arg) and bump the module's thread
 * count.  The pid<0 error check between these lines is elided. */
1095 ksocknal_thread_start (int (*fn)(void *arg), void *arg)
1097 long pid = kernel_thread (fn, arg, 0);
1102 atomic_inc (&ksocknal_data.ksnd_nthreads);
/* Thread exit hook: decrement the live-thread count taken by
 * ksocknal_thread_start() (module unload presumably waits on this). */
1107 ksocknal_thread_fini (void)
1109 atomic_dec (&ksocknal_data.ksnd_nthreads);
/* Router completion callback for a forwarded packet buffer (fmb): log the
 * outcome, return the fmb to its pool's idle list, unblock (at most) one
 * conn that was waiting for an fmb, drop the peer ref taken at init, and
 * if a conn was unblocked move it to SOCKNAL_RX_GET_FMB and reschedule it
 * with its scheduler.  (The early 'conn == NULL' return is elided.) */
1113 ksocknal_fmb_callback (void *arg, int error)
1115 ksock_fmb_t *fmb = (ksock_fmb_t *)arg;
1116 ksock_fmb_pool_t *fmp = fmb->fmb_pool;
1117 ptl_hdr_t *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]);
1118 ksock_conn_t *conn = NULL;
1119 ksock_sched_t *sched;
1120 unsigned long flags;
1123 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
1124 NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),
1127 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
1128 NTOH__u64 (hdr->src_nid), NTOH__u64 (hdr->dest_nid));
1130 spin_lock_irqsave (&fmp->fmp_lock, flags);
1132 list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);
1134 if (!list_empty (&fmp->fmp_blocked_conns)) {
1135 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next,
1136 ksock_conn_t, ksnc_rx_list);
1137 list_del (&conn->ksnc_rx_list);
1140 spin_unlock_irqrestore (&fmp->fmp_lock, flags);
1142 /* drop peer ref taken on init */
1143 ksocknal_put_peer (fmb->fmb_peer);
1148 CDEBUG (D_NET, "Scheduling conn %p\n", conn);
1149 LASSERT (conn->ksnc_rx_scheduled);
1150 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
1152 conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
1154 sched = conn->ksnc_scheduler;
1156 spin_lock_irqsave (&sched->kss_lock, flags);
1158 list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
1160 if (waitqueue_active (&sched->kss_waitq))
1161 wake_up (&sched->kss_waitq);
1163 spin_unlock_irqrestore (&sched->kss_lock, flags);
/* Try to claim a forwarding buffer sized for conn's pending packet:
 * pick the small or large fmb pool by total packet size.  On success
 * return the fmb (returns elided); otherwise park the conn on the pool's
 * blocked list in SOCKNAL_RX_FMB_SLEEP and return NULL (elided) -- it is
 * rescheduled later by ksocknal_fmb_callback(). */
1167 ksocknal_get_idle_fmb (ksock_conn_t *conn)
1169 int payload_nob = conn->ksnc_rx_nob_left;
1170 int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
1171 unsigned long flags;
1172 ksock_fmb_pool_t *pool;
1175 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1176 LASSERT (ksocknal_data.ksnd_fmbs != NULL);
1178 if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
1179 pool = &ksocknal_data.ksnd_small_fmp;
1181 pool = &ksocknal_data.ksnd_large_fmp;
1183 spin_lock_irqsave (&pool->fmp_lock, flags);
1185 if (!list_empty (&pool->fmp_idle_fmbs)) {
1186 fmb = list_entry(pool->fmp_idle_fmbs.next,
1187 ksock_fmb_t, fmb_list);
1188 list_del (&fmb->fmb_list);
1189 spin_unlock_irqrestore (&pool->fmp_lock, flags);
1194 /* deschedule until fmb free */
1196 conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
1198 list_add_tail (&conn->ksnc_rx_list,
1199 &pool->fmp_blocked_conns);
1201 spin_unlock_irqrestore (&pool->fmp_lock, flags);
/* Prime a forwarding buffer with the header just read off 'conn' and, if
 * there is a payload, set up conn's rx iovs to read the body directly
 * into the fmb's pages (fmb_iov spans the whole packet; conn's rx_iov
 * spans just the payload, skipping the copied-in header).  A zero-payload
 * packet is forwarded immediately.  Takes a peer ref so the module can't
 * unload mid-forward; dropped in ksocknal_fmb_callback().  The
 * per-page iov fill loop's increment/termination lines are elided. */
1206 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
1208 int payload_nob = conn->ksnc_rx_nob_left;
1209 int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
1210 ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1211 int niov; /* at least the header */
1214 LASSERT (conn->ksnc_rx_scheduled);
1215 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1216 LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
1217 LASSERT (payload_nob >= 0);
1218 LASSERT (packet_nob <= fmb->fmb_npages * PAGE_SIZE);
1219 LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
1221 /* Got a forwarding buffer; copy the header we just read into the
1222 * forwarding buffer. If there's payload, start reading reading it
1223 * into the buffer, otherwise the forwarding buffer can be kicked
1226 * NB fmb->fmb_iov spans the WHOLE packet.
1227 * conn->ksnc_rx_iov spans just the payload.
1229 fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]);
1232 memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t));
1234 /* Take a ref on the conn's peer to prevent module unload before
1235 * forwarding completes. NB we ref peer and not conn since because
1236 * all refs on conn after it has been closed must remove themselves
1238 fmb->fmb_peer = conn->ksnc_peer;
1239 atomic_inc (&conn->ksnc_peer->ksnp_refcount);
1241 if (payload_nob == 0) { /* got complete packet already */
1242 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n",
1243 conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1244 dest_nid, packet_nob);
1246 fmb->fmb_iov[0].iov_len = sizeof (ptl_hdr_t);
1248 kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
1249 packet_nob, 1, fmb->fmb_iov,
1250 ksocknal_fmb_callback, fmb);
1252 /* forward it now */
1253 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1255 ksocknal_new_packet (conn, 0); /* on to next packet */
1260 if (packet_nob <= PAGE_SIZE) { /* whole packet fits in first page */
1261 fmb->fmb_iov[0].iov_len = packet_nob;
1263 fmb->fmb_iov[0].iov_len = PAGE_SIZE;
1264 nob = packet_nob - PAGE_SIZE;
1267 LASSERT (niov < fmb->fmb_npages);
1268 fmb->fmb_iov[niov].iov_base =
1269 page_address (fmb->fmb_pages[niov]);
1270 fmb->fmb_iov[niov].iov_len = MIN (PAGE_SIZE, nob);
1276 kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
1277 packet_nob, niov, fmb->fmb_iov,
1278 ksocknal_fmb_callback, fmb);
1280 conn->ksnc_cookie = fmb; /* stash fmb for later */
1281 conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
1282 conn->ksnc_rx_deadline = jiffies_64 + ksocknal_io_timeout; /* start timeout */
1284 /* payload is desc's iov-ed buffer, but skipping the hdr */
1285 LASSERT (niov <= sizeof (conn->ksnc_rx_iov_space) /
1286 sizeof (struct iovec));
1288 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1289 conn->ksnc_rx_iov[0].iov_base =
1290 (void *)(((unsigned long)fmb->fmb_iov[0].iov_base) +
1291 sizeof (ptl_hdr_t));
1292 conn->ksnc_rx_iov[0].iov_len =
1293 fmb->fmb_iov[0].iov_len - sizeof (ptl_hdr_t);
1296 memcpy(&conn->ksnc_rx_iov[1], &fmb->fmb_iov[1],
1297 (niov - 1) * sizeof (struct iovec));
1299 conn->ksnc_rx_niov = niov;
1301 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
1302 NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
/* Inspect the header just read on 'conn' and decide what to do with a
 * packet not addressed to us: drop it (corrupt length, forwarding
 * disabled, oversize, or the destination is a directly-connected peer
 * that should have gone direct), or move to SOCKNAL_RX_GET_FMB to claim
 * a forwarding buffer for the body.  NOTE(review): the drop-path CERRORs
 * print ksnc_hdr nids without NTOH__u64, unlike the parse banner above --
 * byte-order inconsistency in logging only, can't fix safely in a
 * comment-only pass. */
1307 ksocknal_fwd_parse (ksock_conn_t *conn)
1310 ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1311 int body_len = NTOH__u32 (PTL_HDR_LENGTH(&conn->ksnc_hdr));
1313 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
1314 NTOH__u64 (conn->ksnc_hdr.src_nid),
1315 dest_nid, conn->ksnc_rx_nob_left);
1317 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
1318 LASSERT (conn->ksnc_rx_scheduled);
1320 if (body_len < 0) { /* length corrupt (overflow) */
1321 CERROR("dropping packet from "LPX64" for "LPX64": packet "
1322 "size %d illegal\n", NTOH__u64 (conn->ksnc_hdr.src_nid),
1323 dest_nid, body_len);
1325 ksocknal_new_packet (conn, 0); /* on to new packet */
1326 ksocknal_close_conn_unlocked (conn); /* give up on conn */
1330 if (ksocknal_data.ksnd_fmbs == NULL) { /* not forwarding */
1331 CERROR("dropping packet from "LPX64" for "LPX64": not "
1332 "forwarding\n", conn->ksnc_hdr.src_nid,
1333 conn->ksnc_hdr.dest_nid);
1334 /* on to new packet (skip this one's body) */
1335 ksocknal_new_packet (conn, body_len);
1339 if (body_len > SOCKNAL_MAX_FWD_PAYLOAD) { /* too big to forward */
1340 CERROR ("dropping packet from "LPX64" for "LPX64
1341 ": packet size %d too big\n", conn->ksnc_hdr.src_nid,
1342 conn->ksnc_hdr.dest_nid, body_len);
1343 /* on to new packet (skip this one's body) */
1344 ksocknal_new_packet (conn, body_len);
1348 /* should have gone direct */
1349 peer = ksocknal_get_peer (conn->ksnc_hdr.dest_nid);
1351 CERROR ("dropping packet from "LPX64" for "LPX64
1352 ": target is a peer\n", conn->ksnc_hdr.src_nid,
1353 conn->ksnc_hdr.dest_nid);
1354 ksocknal_put_peer (peer); /* drop ref from get above */
1356 /* on to next packet (skip this one's body) */
1357 ksocknal_new_packet (conn, body_len);
1361 conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB; /* Getting FMB now */
1362 conn->ksnc_rx_nob_left = body_len; /* stash packet size */
1363 conn->ksnc_rx_nob_wanted = body_len; /* (no slop) */
/* Reset conn's rx descriptor for the next incoming packet.
 * nob_to_skip == 0: arm a single-iov read of the next portals header into
 * ksnc_hdr (state SOCKNAL_RX_HEADER).  nob_to_skip > 0: arm as many iov
 * entries as fit, all aimed at the shared static slop buffer, to discard
 * a body we're dropping (state SOCKNAL_RX_SLOP); if the iov space runs
 * out we'll be called again for the remainder.  The slop buffer is shared
 * and its contents are garbage by design -- it's only a sink.
 * (niov/skipped initialization lines are elided.) */
1367 ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
1369 static char ksocknal_slop_buffer[4096];
1375 if (nob_to_skip == 0) { /* right at next packet boundary now */
1376 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
1377 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
1378 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
1380 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1381 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
1382 conn->ksnc_rx_iov[0].iov_len = sizeof (ptl_hdr_t);
1383 conn->ksnc_rx_niov = 1;
1385 conn->ksnc_rx_kiov = NULL;
1386 conn->ksnc_rx_nkiov = 0;
1390 /* Set up to skip as much a possible now. If there's more left
1391 * (ran out of iov entries) we'll get called again */
1393 conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
1394 conn->ksnc_rx_nob_left = nob_to_skip;
1395 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1400 nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));
1402 conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
1403 conn->ksnc_rx_iov[niov].iov_len = nob;
1408 } while (nob_to_skip != 0 && /* mustn't overflow conn's rx iov */
1409 niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));
1411 conn->ksnc_rx_niov = niov;
1412 conn->ksnc_rx_kiov = NULL;
1413 conn->ksnc_rx_nkiov = 0;
1414 conn->ksnc_rx_nob_wanted = skipped;
/* Service one connection from this scheduler's receive queue.
 *
 * Entered with sched->kss_lock held; the lock is dropped around the
 * socket I/O and re-taken on the 'out' path.  Pops the first conn off
 * kss_rx_conns, reads from its socket, and advances the per-connection
 * receive state machine (HEADER -> BODY / BODY_FWD / SLOP / GET_FMB).
 *
 * NOTE(review): the goto targets (get_fmb, try_read, out) and several
 * interior lines (declarations, braces, rc checks) are elided from this
 * extract; visible code kept verbatim. */
1419 ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
1425 /* NB: sched->ksnc_lock lock held */
1427 LASSERT (!list_empty (&sched->kss_rx_conns));
1428 conn = list_entry(sched->kss_rx_conns.next, ksock_conn_t, ksnc_rx_list);
1429 list_del (&conn->ksnc_rx_list);
/* drop the scheduler lock for the (possibly blocking) socket work */
1431 spin_unlock_irqrestore (&sched->kss_lock, *irq_flags);
1433 CDEBUG(D_NET, "sched %p conn %p\n", sched, conn);
1434 LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1435 LASSERT (conn->ksnc_rx_scheduled);
1436 LASSERT (conn->ksnc_rx_ready);
1438 /* doesn't need a forwarding buffer */
1439 if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
/* forwarding path: may deschedule the conn until an fmb becomes idle */
1443 fmb = ksocknal_get_idle_fmb (conn);
1444 if (fmb == NULL) { /* conn descheduled waiting for idle fmb */
1445 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
1449 if (ksocknal_init_fmb (conn, fmb)) /* packet forwarded ? */
1450 goto out; /* come back later for next packet */
1453 /* NB: sched lock NOT held */
1454 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
1455 conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
1456 conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
1457 conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
1459 LASSERT (conn->ksnc_rx_nob_wanted > 0);
1461 conn->ksnc_rx_ready = 0;/* data ready may race with me and set ready */
1462 mb(); /* => clear BEFORE trying to read */
1464 rc = ksocknal_recvmsg(conn);
/* error or EOF from the socket: close the conn (first closer logs it) */
1467 if (ksocknal_close_conn_unlocked (conn)) {
1468 /* I'm the first to close */
1470 CERROR ("[%p] Error %d on read from "LPX64" ip %08x:%d\n",
1471 conn, rc, conn->ksnc_peer->ksnp_nid,
1472 conn->ksnc_ipaddr, conn->ksnc_port);
1474 CERROR ("[%p] EOF from "LPX64" ip %08x:%d\n",
1475 conn, conn->ksnc_peer->ksnp_nid,
1476 conn->ksnc_ipaddr, conn->ksnc_port);
1481 if (conn->ksnc_rx_nob_wanted != 0) /* short read */
1482 goto out; /* try again later */
1484 /* got all I wanted, assume there's more - prevent data_ready locking */
1485 conn->ksnc_rx_ready = 1;
1487 switch (conn->ksnc_rx_state) {
1488 case SOCKNAL_RX_HEADER:
/* HELLO packets are consumed locally; anything else addressed elsewhere
 * is handed to the forwarding parser */
1489 if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) &&
1490 NTOH__u64(conn->ksnc_hdr.dest_nid) != ksocknal_lib.ni.nid) {
1491 /* This packet isn't for me */
1492 ksocknal_fwd_parse (conn);
1493 switch (conn->ksnc_rx_state) {
1494 case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
1495 goto out; /* => come back later */
1496 case SOCKNAL_RX_SLOP: /* skipping packet's body */
1497 goto try_read; /* => go read it */
1498 case SOCKNAL_RX_GET_FMB: /* forwarding */
1499 goto get_fmb; /* => go get a fwd msg buffer */
1506 /* sets wanted_len, iovs etc */
1507 lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
1509 /* start timeout (lib is waiting for finalize) */
1510 conn->ksnc_rx_deadline = jiffies_64 + ksocknal_io_timeout;
1512 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
1513 conn->ksnc_rx_state = SOCKNAL_RX_BODY;
1514 goto try_read; /* go read the payload */
1516 /* Fall through (completed packet for me) */
1518 case SOCKNAL_RX_BODY:
1519 /* payload all received */
1520 conn->ksnc_rx_deadline = 0; /* cancel timeout */
1521 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie);
1524 case SOCKNAL_RX_SLOP:
1525 /* starting new packet? */
1526 if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
1527 goto out; /* come back later */
1528 goto try_read; /* try to finish reading slop now */
1530 case SOCKNAL_RX_BODY_FWD:
1531 /* payload all received */
1532 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n",
1533 conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1534 NTOH__u64 (conn->ksnc_hdr.dest_nid),
1535 conn->ksnc_rx_nob_left);
1537 /* cancel timeout (only needed it while fmb allocated) */
1538 conn->ksnc_rx_deadline = 0;
1540 /* forward the packet. NB ksocknal_init_fmb() put fmb into
1541 * conn->ksnc_cookie */
1542 fmb = (ksock_fmb_t *)conn->ksnc_cookie;
1543 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1545 /* no slop in forwarded packets */
1546 LASSERT (conn->ksnc_rx_nob_left == 0);
1548 ksocknal_new_packet (conn, 0); /* on to next packet */
1549 goto out; /* (later) */
/* 'out' path: re-take the lock, then either deschedule the conn (and
 * drop the scheduler's ref) or re-queue it while data remains */
1559 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
1561 /* no data there to read? */
1562 if (!conn->ksnc_rx_ready) {
1563 /* let socket callback schedule again */
1564 conn->ksnc_rx_scheduled = 0;
1565 /* drop scheduler's ref */
1566 ksocknal_put_conn (conn);
1568 /* stay scheduled */
1569 list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
/* lib cb_recv callback: describe where an incoming iovec payload lands.
 *
 * Copies the caller's iovec list into the conn's rx iov space and sets
 * nob_wanted = mlen (bytes to deliver) and nob_left = rlen (bytes on the
 * wire); the mlen..rlen excess is skipped later as slop.  'msg' is
 * stashed in ksnc_cookie for lib_finalize() when the body completes. */
1574 ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
1575 unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
1577 ksock_conn_t *conn = (ksock_conn_t *)private;
1579 LASSERT (mlen <= rlen);
1580 LASSERT (niov <= PTL_MD_MAX_IOV);
1582 conn->ksnc_cookie = msg;
1583 conn->ksnc_rx_nob_wanted = mlen;
1584 conn->ksnc_rx_nob_left = rlen;
/* iovec-only receive: no page (kiov) fragments */
1586 conn->ksnc_rx_nkiov = 0;
1587 conn->ksnc_rx_kiov = NULL;
1588 conn->ksnc_rx_niov = niov;
1589 conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
1590 memcpy (conn->ksnc_rx_iov, iov, niov * sizeof (*iov));
/* NOTE(review): the LASSERT wrapping this nob cross-check, and the
 * return statement, are elided from this extract */
1593 lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1594 lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
/* lib cb_recv_pages callback: page-fragment (kiov) twin of
 * ksocknal_recv().  Copies the caller's kiov list into the conn's rx
 * space and sets nob_wanted = mlen / nob_left = rlen; 'msg' is stashed
 * in ksnc_cookie for lib_finalize() when the body completes. */
1600 ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
1601 unsigned int niov, ptl_kiov_t *kiov, size_t mlen, size_t rlen)
1603 ksock_conn_t *conn = (ksock_conn_t *)private;
1605 LASSERT (mlen <= rlen);
1606 LASSERT (niov <= PTL_MD_MAX_IOV);
1608 conn->ksnc_cookie = msg;
1609 conn->ksnc_rx_nob_wanted = mlen;
1610 conn->ksnc_rx_nob_left = rlen;
/* kiov-only receive: no plain iovec fragments */
1612 conn->ksnc_rx_niov = 0;
1613 conn->ksnc_rx_iov = NULL;
1614 conn->ksnc_rx_nkiov = niov;
1615 conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1616 memcpy (conn->ksnc_rx_kiov, kiov, niov * sizeof (*kiov));
/* NOTE(review): the LASSERT wrapping this nob cross-check, and the
 * return statement, are elided from this extract */
1619 lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1620 lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
/* Scheduler thread body (one per scheduler slot).
 *
 * Loops under kss_lock, progressing rx conns, tx conns and completed
 * zero-copy txs semi-fairly; the lock is dropped around each unit of
 * work.  After SOCKNAL_RESCHED iterations (or when idle) it releases
 * the lock and, if nothing was done, sleeps interruptibly until new
 * work arrives or shutdown is flagged. */
1625 int ksocknal_scheduler (void *arg)
1627 ksock_sched_t *sched = (ksock_sched_t *)arg;
1628 unsigned long flags;
/* slot index: offset of this sched in the scheduler array */
1631 int id = sched - ksocknal_data.ksnd_schedulers;
1634 snprintf (name, sizeof (name),"ksocknald[%d]", id);
1635 kportal_daemonize (name);
1636 kportal_blockallsigs ();
/* pin this scheduler to its matching CPU when affinity is configured */
1638 #if (CONFIG_SMP && CPU_AFFINITY)
1639 if ((cpu_online_map & (1 << id)) != 0)
1640 current->cpus_allowed = (1 << id);
1642 CERROR ("Can't set CPU affinity for %s\n", name);
1643 #endif /* CONFIG_SMP && CPU_AFFINITY */
1645 spin_lock_irqsave (&sched->kss_lock, flags);
1647 while (!ksocknal_data.ksnd_shuttingdown) {
1648 int did_something = 0;
1650 /* Ensure I progress everything semi-fairly */
1652 if (!list_empty (&sched->kss_rx_conns)) {
1654 /* drops & regains kss_lock */
1655 ksocknal_process_receive (sched, &flags);
1658 if (!list_empty (&sched->kss_tx_conns)) {
1660 /* drops and regains kss_lock */
1661 ksocknal_process_transmit (sched, &flags);
/* complete one zero-copy tx outside the lock */
1664 if (!list_empty (&sched->kss_zctxdone_list)) {
1666 list_entry(sched->kss_zctxdone_list.next,
1667 ksock_tx_t, tx_list);
1670 list_del (&tx->tx_list);
1671 spin_unlock_irqrestore (&sched->kss_lock, flags);
1673 ksocknal_tx_done (tx, 1);
1675 spin_lock_irqsave (&sched->kss_lock, flags);
1678 if (!did_something || /* nothing to do */
1679 ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
1680 spin_unlock_irqrestore (&sched->kss_lock, flags);
1684 if (!did_something) { /* wait for something to do */
/* NOTE(review): the conditional selecting between the two waits below
 * is elided from this extract -- presumably zero-copy enabled vs not;
 * confirm against the full source */
1686 rc = wait_event_interruptible (sched->kss_waitq,
1687 ksocknal_data.ksnd_shuttingdown ||
1688 !list_empty(&sched->kss_rx_conns) ||
1689 !list_empty(&sched->kss_tx_conns) ||
1690 !list_empty(&sched->kss_zctxdone_list));
1692 rc = wait_event_interruptible (sched->kss_waitq,
1693 ksocknal_data.ksnd_shuttingdown ||
1694 !list_empty(&sched->kss_rx_conns) ||
1695 !list_empty(&sched->kss_tx_conns));
1701 spin_lock_irqsave (&sched->kss_lock, flags);
1705 spin_unlock_irqrestore (&sched->kss_lock, flags);
1706 ksocknal_thread_fini ();
/* Socket sk_data_ready callback: mark the conn rx-ready and, if it is
 * not already being progressed, queue it on its scheduler (taking an
 * extra conn ref for the scheduler) and wake the scheduler thread.
 * A NULL sk_user_data means we raced with ksocknal_close_sock, so the
 * socket's original callback is invoked instead. */
1711 ksocknal_data_ready (struct sock *sk, int n)
1713 unsigned long flags;
1715 ksock_sched_t *sched;
1718 /* interleave correctly with closing sockets... */
1719 read_lock (&ksocknal_data.ksnd_global_lock);
1721 conn = sk->sk_user_data;
1722 if (conn == NULL) { /* raced with ksocknal_close_sock */
1723 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
1724 sk->sk_data_ready (sk, n);
1725 } else if (!conn->ksnc_rx_ready) { /* new news */
1726 /* Set ASAP in case of concurrent calls to me */
1727 conn->ksnc_rx_ready = 1;
1729 sched = conn->ksnc_scheduler;
1731 spin_lock_irqsave (&sched->kss_lock, flags);
1733 /* Set again (process_receive may have cleared while I blocked for the lock) */
1734 conn->ksnc_rx_ready = 1;
1736 if (!conn->ksnc_rx_scheduled) { /* not being progressed */
1737 list_add_tail(&conn->ksnc_rx_list,
1738 &sched->kss_rx_conns);
1739 conn->ksnc_rx_scheduled = 1;
1740 /* extra ref for scheduler */
1741 atomic_inc (&conn->ksnc_refcount);
1743 if (waitqueue_active (&sched->kss_waitq))
1744 wake_up (&sched->kss_waitq);
1747 spin_unlock_irqrestore (&sched->kss_lock, flags);
1750 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Socket sk_write_space callback: when enough send space has opened up
 * (tcp_wspace >= low water), clear SOCK_NOSPACE, mark the conn
 * tx-ready and, if it has queued packets and is not already being
 * progressed, queue it on its scheduler (extra conn ref) and wake the
 * scheduler thread.  A NULL sk_user_data means we raced with
 * ksocknal_close_sock, so chain to the socket's original callback. */
1756 ksocknal_write_space (struct sock *sk)
1758 unsigned long flags;
1760 ksock_sched_t *sched;
1762 /* interleave correctly with closing sockets... */
1763 read_lock (&ksocknal_data.ksnd_global_lock);
1765 conn = sk->sk_user_data;
1767 CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
1768 sk, tcp_wspace(sk), SOCKNAL_TX_LOW_WATER(sk), conn,
1769 (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
1770 " ready" : " blocked"),
1771 (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
1772 " scheduled" : " idle"),
1773 (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ?
1774 " empty" : " queued"));
1776 if (conn == NULL) { /* raced with ksocknal_close_sock */
1777 LASSERT (sk->sk_write_space != &ksocknal_write_space);
1778 sk->sk_write_space (sk);
1779 } else if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
1780 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
1782 if (!conn->ksnc_tx_ready) { /* new news */
1783 /* Set ASAP in case of concurrent calls to me */
1784 conn->ksnc_tx_ready = 1;
1786 sched = conn->ksnc_scheduler;
1788 spin_lock_irqsave (&sched->kss_lock, flags);
1790 /* Set again (process_transmit may have
1791 cleared while I blocked for the lock) */
1792 conn->ksnc_tx_ready = 1;
1794 if (!conn->ksnc_tx_scheduled && // not being progressed
1795 !list_empty(&conn->ksnc_tx_queue)){//packets to send
1796 list_add_tail (&conn->ksnc_tx_list,
1797 &sched->kss_tx_conns);
1798 conn->ksnc_tx_scheduled = 1;
1799 /* extra ref for scheduler */
1800 atomic_inc (&conn->ksnc_refcount);
1802 if (waitqueue_active (&sched->kss_waitq))
1803 wake_up (&sched->kss_waitq);
1806 spin_unlock_irqrestore (&sched->kss_lock, flags);
1810 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Blocking write of the whole 'nob'-byte buffer to 'sock', advancing
 * 'buffer' past partial sends.  An unexpected zero-byte send is treated
 * as -ECONNABORTED.
 * NOTE(review): the iov/msg field initializers, set_fs() bracketing,
 * the retry loop framing and the success return are elided from this
 * extract; visible code kept verbatim. */
1814 ksocknal_sock_write (struct socket *sock, void *buffer, int nob)
1817 mm_segment_t oldmm = get_fs();
1820 struct iovec iov = {
1824 struct msghdr msg = {
1829 .msg_control = NULL,
1830 .msg_controllen = 0,
1835 rc = sock_sendmsg (sock, &msg, iov.iov_len);
1842 CERROR ("Unexpected zero rc\n");
1843 return (-ECONNABORTED);
/* partial send: advance past the bytes that went out */
1846 buffer = ((char *)buffer) + rc;
/* Blocking read of exactly 'nob' bytes from 'sock' into 'buffer',
 * advancing past partial reads.  EOF before the buffer fills yields
 * -ECONNABORTED.
 * NOTE(review): the iov/msg field initializers, set_fs() bracketing,
 * the retry loop framing and the success return are elided from this
 * extract; visible code kept verbatim. */
1854 ksocknal_sock_read (struct socket *sock, void *buffer, int nob)
1857 mm_segment_t oldmm = get_fs();
1860 struct iovec iov = {
1864 struct msghdr msg = {
1869 .msg_control = NULL,
1870 .msg_controllen = 0,
1875 rc = sock_recvmsg (sock, &msg, iov.iov_len, 0);
1882 return (-ECONNABORTED);
/* partial read: advance past the bytes received */
1884 buffer = ((char *)buffer) + rc;
/* HELLO handshake with the peer at 'nid' over a freshly-connected
 * socket.  Protocol version 0 encodes magic/version in the dest_nid
 * field of a 'hello' ptl_hdr_t: we send ours, read the peer's
 * magic/version first, then the rest of its header, and verify type,
 * zero payload length, and that src_nid matches the nid we dialled. */
1892 ksocknal_exchange_nids (struct socket *sock, ptl_nid_t nid)
/* overlay magic/version struct on the header's dest_nid field */
1896 ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
1898 LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
1900 memset (&hdr, 0, sizeof (hdr));
1901 hmv->magic = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
1902 hmv->version_major = __cpu_to_le32 (PORTALS_PROTO_VERSION_MAJOR);
1903 hmv->version_minor = __cpu_to_le32 (PORTALS_PROTO_VERSION_MINOR);
1905 hdr.src_nid = __cpu_to_le64 (ksocknal_lib.ni.nid);
1906 hdr.type = __cpu_to_le32 (PTL_MSG_HELLO);
1908 /* Assume sufficient socket buffering for this message */
1909 rc = ksocknal_sock_write (sock, &hdr, sizeof (hdr));
1911 CERROR ("Error %d sending HELLO to "LPX64"\n", rc, nid);
1915 rc = ksocknal_sock_read (sock, hmv, sizeof (*hmv));
1917 CERROR ("Error %d reading HELLO from "LPX64"\n", rc, nid);
/* NOTE(review): the checks below mix __le32_to_cpu/__cpu_to_le32
 * directions (harmless only because the byte swap is self-inverse) and
 * compare with __cpu_to_le16 fields that were written above with
 * __cpu_to_le32 -- confirm the field widths in ptl_magicversion_t */
1921 if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
1922 CERROR ("Bad magic %#08x (%#08x expected) from "LPX64"\n",
1923 __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, nid);
1927 if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
1928 hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
1929 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
1931 __le16_to_cpu (hmv->version_major),
1932 __le16_to_cpu (hmv->version_minor),
1933 PORTALS_PROTO_VERSION_MAJOR,
1934 PORTALS_PROTO_VERSION_MINOR,
1939 LASSERT (PORTALS_PROTO_VERSION_MAJOR == 0);
1940 /* version 0 sends magic/version as the dest_nid of a 'hello' header,
1941 * so read the rest of it in now... */
1943 rc = ksocknal_sock_read (sock, hmv + 1, sizeof (hdr) - sizeof (*hmv));
1945 CERROR ("Error %d reading rest of HELLO hdr from "LPX64"\n",
1950 /* ...and check we got what we expected */
1951 if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
1952 PTL_HDR_LENGTH (&hdr) != __cpu_to_le32 (0)) {
1953 CERROR ("Expecting a HELLO hdr with 0 payload,"
1954 " but got type %d with %d payload from "LPX64"\n",
1955 __le32_to_cpu (hdr.type),
1956 __le32_to_cpu (PTL_HDR_LENGTH (&hdr)), nid);
1960 if (__le64_to_cpu (hdr.src_nid) != nid) {
1961 CERROR ("Connected to nid "LPX64", but expecting "LPX64"\n",
1962 __le64_to_cpu (hdr.src_nid), nid);
/* Configure 'sock' so a close() aborts in-flight sends immediately:
 * SO_LINGER with a zero linger time, then TCP_LINGER2 (the value
 * assigned to 'option' is elided from this extract) to bound FIN_WAIT2
 * lingering.  Failures are logged but otherwise tolerated. */
1970 ksocknal_set_linger (struct socket *sock)
1972 mm_segment_t oldmm = get_fs ();
1975 struct linger linger;
1977 /* Ensure this socket aborts active sends immediately when we close
1981 linger.l_linger = 0;
1984 rc = sock_setsockopt (sock, SOL_SOCKET, SO_LINGER,
1985 (char *)&linger, sizeof (linger));
1988 CERROR ("Can't set SO_LINGER: %d\n", rc);
/* TCP-level option goes through the protocol ops, not sock_setsockopt */
1994 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_LINGER2,
1995 (char *)&option, sizeof (option));
1998 CERROR ("Can't set SO_LINGER2: %d\n", rc);
/* Create, configure and connect a TCP socket for an autoconnect route,
 * then hand it to ksocknal_create_conn().  Configuration applied:
 * send/receive timeouts derived from ksocknal_io_timeout, optional
 * TCP_NODELAY and SO_SNDBUF/SO_RCVBUF taken from the route, and an
 * optional NID exchange before the conn is created.
 * NOTE(review): error-path cleanup (fput/sock_release), set_fs()
 * bracketing and return statements are elided from this extract. */
2006 ksocknal_connect_peer (ksock_route_t *route)
2008 struct sockaddr_in peer_addr;
2009 mm_segment_t oldmm = get_fs();
2013 struct socket *sock;
2016 rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock);
2018 CERROR ("Can't create autoconnect socket: %d\n", rc);
2022 /* Ugh; have to map_fd for compatibility with sockets passed in
2023 * from userspace. And we actually need the refcounting that
2024 * this gives you :) */
2026 fd = sock_map_fd (sock);
2028 sock_release (sock);
2029 CERROR ("sock_map_fd error %d\n", fd);
2033 /* Set the socket timeouts, so our connection attempt completes in
/* split the jiffies timeout into whole seconds + microseconds,
 * rounding the sub-second part up */
2035 tv.tv_sec = ksocknal_io_timeout / HZ;
2036 n = ksocknal_io_timeout % HZ;
2037 n = n * 1000000 + HZ - 1;
2042 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO,
2043 (char *)&tv, sizeof (tv));
2046 CERROR ("Can't set send timeout %d (in HZ): %d\n",
2047 ksocknal_io_timeout, rc);
2052 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO,
2053 (char *)&tv, sizeof (tv));
2056 CERROR ("Can't set receive timeout %d (in HZ): %d\n",
2057 ksocknal_io_timeout, rc);
/* optionally disable Nagle for latency-sensitive routes */
2061 if (route->ksnr_nonagel) {
2065 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_NODELAY,
2066 (char *)&option, sizeof (option));
2069 CERROR ("Can't disable nagel: %d\n", rc);
/* optionally pin both socket buffer sizes to the route's setting */
2074 if (route->ksnr_buffer_size != 0) {
2075 int option = route->ksnr_buffer_size;
2078 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
2079 (char *)&option, sizeof (option));
2082 CERROR ("Can't set send buffer %d: %d\n",
2083 route->ksnr_buffer_size, rc);
2088 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
2089 (char *)&option, sizeof (option));
2092 CERROR ("Can't set receive buffer %d: %d\n",
2093 route->ksnr_buffer_size, rc);
2098 memset (&peer_addr, 0, sizeof (peer_addr));
2099 peer_addr.sin_family = AF_INET;
2100 peer_addr.sin_port = htons (route->ksnr_port);
2101 peer_addr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
2103 rc = sock->ops->connect (sock, (struct sockaddr *)&peer_addr,
2104 sizeof (peer_addr), sock->file->f_flags);
2106 CERROR ("Error %d connecting to "LPX64"\n", rc,
2107 route->ksnr_peer->ksnp_nid);
2111 if (route->ksnr_xchange_nids) {
2112 rc = ksocknal_exchange_nids (sock, route->ksnr_peer->ksnp_nid);
2117 rc = ksocknal_create_conn (route->ksnr_peer->ksnp_nid,
2118 route, sock, route->ksnr_irq_affinity);
/* Attempt to connect 'route'.  On success create_conn has already bound
 * the route/conn and scheduled blocked packets.  On failure: clear the
 * connecting flag, back off the retry interval (doubling, capped at
 * SOCKNAL_MAX_RECONNECT_INTERVAL) and, when no other route for the peer
 * is still connecting, drain the peer's queued packets onto a local
 * zombie list and complete them (as failures) outside the lock. */
2128 ksocknal_autoconnect (ksock_route_t *route)
2130 LIST_HEAD (zombies);
2133 unsigned long flags;
2136 rc = ksocknal_connect_peer (route);
2138 /* successfully autoconnected: create_conn did the
2139 * route/conn binding and scheduled any blocked packets,
2140 * so there's nothing left to do now. */
2144 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
2146 peer = route->ksnr_peer;
2147 route->ksnr_connecting = 0;
2149 LASSERT (route->ksnr_retry_interval != 0);
2150 route->ksnr_timeout = jiffies_64 + route->ksnr_retry_interval;
2151 route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2,
2152 SOCKNAL_MAX_RECONNECT_INTERVAL);
2154 if (!list_empty (&peer->ksnp_tx_queue) &&
2155 ksocknal_find_connecting_route_locked (peer) == NULL) {
2156 LASSERT (list_empty (&peer->ksnp_conns));
2158 /* None of the connections that the blocked packets are
2159 * waiting for have been successful. Complete them now... */
2161 tx = list_entry (peer->ksnp_tx_queue.next,
2162 ksock_tx_t, tx_list);
2163 list_del (&tx->tx_list);
2164 list_add_tail (&tx->tx_list, &zombies);
2165 } while (!list_empty (&peer->ksnp_tx_queue));
2168 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
/* fail the drained packets outside the global lock */
2170 while (!list_empty (&zombies)) {
2171 tx = list_entry (zombies.next, ksock_tx_t, tx_list);
2173 CERROR ("Deleting packet type %d len %d ("LPX64"->"LPX64")\n",
2174 NTOH__u32 (tx->tx_hdr->type),
2175 NTOH__u32 (PTL_HDR_LENGTH(tx->tx_hdr)),
2176 NTOH__u64 (tx->tx_hdr->src_nid),
2177 NTOH__u64 (tx->tx_hdr->dest_nid));
2179 list_del (&tx->tx_list);
/* 0 => completed unsuccessfully */
2181 ksocknal_tx_done (tx, 0);
/* Autoconnect daemon thread: pops routes off ksnd_autoconnectd_routes,
 * connects each one (dropping the list lock around the blocking work,
 * then releasing the route's list ref), and sleeps interruptibly when
 * the queue is empty.  Exits when ksnd_shuttingdown is set. */
2186 ksocknal_autoconnectd (void *arg)
2188 long id = (long)arg;
2190 unsigned long flags;
2191 ksock_route_t *route;
2194 snprintf (name, sizeof (name), "ksocknal_ad[%ld]", id);
2195 kportal_daemonize (name);
2196 kportal_blockallsigs ();
2198 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2200 while (!ksocknal_data.ksnd_shuttingdown) {
2202 if (!list_empty (&ksocknal_data.ksnd_autoconnectd_routes)) {
2203 route = list_entry (ksocknal_data.ksnd_autoconnectd_routes.next,
2204 ksock_route_t, ksnr_connect_list);
2206 list_del (&route->ksnr_connect_list);
/* do the (blocking) connect without holding the list lock */
2207 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2209 ksocknal_autoconnect (route);
2210 ksocknal_put_route (route);
2212 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
/* queue empty: sleep until new work or shutdown */
2216 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2218 rc = wait_event_interruptible (ksocknal_data.ksnd_autoconnectd_waitq,
2219 ksocknal_data.ksnd_shuttingdown ||
2220 !list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
2222 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2225 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2227 ksocknal_thread_fini ();
/* Return the first of 'peer's connections with an expired receive
 * deadline, or an expired tx on its queued/pending lists, taking a ref
 * on it (caller must put).  Returns NULL when nothing has timed out.
 * Caller holds ksnd_global_lock shared; the per-scheduler kss_lock is
 * taken to walk the tx lists safely.
 * NOTE(review): the 'timed_out_locked' label and the NULL/return paths
 * are elided from this extract; visible code kept verbatim. */
2232 ksocknal_find_timed_out_conn (ksock_peer_t *peer)
2234 /* We're called with a shared lock on ksnd_global_lock */
2235 unsigned long flags;
2237 struct list_head *ctmp;
2239 struct list_head *ttmp;
2240 ksock_sched_t *sched;
2242 list_for_each (ctmp, &peer->ksnp_conns) {
2243 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
2244 sched = conn->ksnc_scheduler;
/* rx deadline is only armed (non-zero) while lib awaits finalize */
2246 if (conn->ksnc_rx_deadline != 0 &&
2247 conn->ksnc_rx_deadline <= jiffies_64)
2250 spin_lock_irqsave (&sched->kss_lock, flags);
2252 list_for_each (ttmp, &conn->ksnc_tx_queue) {
2253 tx = list_entry (ttmp, ksock_tx_t, tx_list);
2254 LASSERT (tx->tx_deadline != 0);
2256 if (tx->tx_deadline <= jiffies_64)
2257 goto timed_out_locked;
2260 list_for_each (ttmp, &conn->ksnc_tx_pending) {
2261 tx = list_entry (ttmp, ksock_tx_t, tx_list);
2262 LASSERT (tx->tx_deadline != 0);
2264 if (tx->tx_deadline <= jiffies_64)
2265 goto timed_out_locked;
2268 spin_unlock_irqrestore (&sched->kss_lock, flags);
2272 spin_unlock_irqrestore (&sched->kss_lock, flags);
/* ref for the caller: it must ksocknal_put_conn() the victim */
2274 atomic_inc (&conn->ksnc_refcount);
/* Sweep one peer hash chain for a timed-out connection and close it.
 * Holds ksnd_global_lock shared on the optimistic assumption that
 * nothing has timed out; on finding a victim the lock is dropped to
 * close the conn, the ref from find_timed_out_conn is released, and the
 * scan restarts because the chain may have changed meanwhile. */
2282 ksocknal_check_peer_timeouts (struct list_head *peers)
2284 struct list_head *ptmp;
2289 /* NB. We expect to have a look at all the peers and not find any
2290 * connections to time out, so we just use a shared lock while we
2292 read_lock (&ksocknal_data.ksnd_global_lock);
2294 list_for_each (ptmp, peers) {
2295 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
2296 conn = ksocknal_find_timed_out_conn (peer);
/* victim found: drop the lock before the (heavier) close */
2299 read_unlock (&ksocknal_data.ksnd_global_lock);
2301 if (ksocknal_close_conn_unlocked (conn)) {
2302 /* I actually closed... */
2303 CERROR ("Timeout out conn->"LPX64" ip %x:%d\n",
2304 peer->ksnp_nid, conn->ksnc_ipaddr,
2308 /* NB we won't find this one again, but we can't
2309 * just proceed with the next peer, since we dropped
2310 * ksnd_global_lock and it might be dead already! */
2311 ksocknal_put_conn (conn);
2316 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Reaper thread: terminates connections on deathrow, destroys zombie
 * connections, and periodically sweeps the peer hash (one chain per
 * pass) for timed-out connections; sleeps interruptibly until the next
 * deadline when there is nothing to reap.
 * NOTE(review): the deadline-advance and timeout bookkeeping lines in
 * the sweep loop are elided from this extract; code kept verbatim. */
2320 ksocknal_reaper (void *arg)
2323 unsigned long flags;
2327 __u64 deadline = jiffies_64;
2329 kportal_daemonize ("ksocknal_reaper");
2330 kportal_blockallsigs ();
2332 init_waitqueue_entry (&wait, current);
2334 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2336 while (!ksocknal_data.ksnd_shuttingdown) {
/* terminate one deathrow conn per pass, outside the reaper lock */
2338 if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) {
2339 conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next,
2340 ksock_conn_t, ksnc_list);
2341 list_del (&conn->ksnc_list);
2343 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2345 ksocknal_terminate_conn (conn);
2346 ksocknal_put_conn (conn);
2348 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
/* destroy one zombie conn per pass, outside the reaper lock */
2352 if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) {
2353 conn = list_entry (ksocknal_data.ksnd_zombie_conns.next,
2354 ksock_conn_t, ksnc_list);
2355 list_del (&conn->ksnc_list);
2357 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2359 ksocknal_destroy_conn (conn);
2361 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2365 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
/* catch up on timeout sweeps until the deadline moves past 'now' */
2367 while ((timeout = deadline - jiffies_64) <= 0) {
2368 /* Time to check for timeouts on a few more peers */
2369 ksocknal_check_peer_timeouts (&ksocknal_data.ksnd_peers[peer_index]);
2371 peer_index = (peer_index + 1) % SOCKNAL_PEER_HASH_SIZE;
/* idle: sleep until the next deadline unless work arrived meanwhile */
2375 add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2376 set_current_state (TASK_INTERRUPTIBLE);
2378 if (!ksocknal_data.ksnd_shuttingdown &&
2379 list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
2380 list_empty (&ksocknal_data.ksnd_zombie_conns))
2381 schedule_timeout (timeout);
2383 set_current_state (TASK_RUNNING);
2384 remove_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2386 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2389 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2391 ksocknal_thread_fini ();
2395 nal_cb_t ksocknal_lib = {
2396 nal_data: &ksocknal_data, /* NAL private data */
2397 cb_send: ksocknal_send,
2398 cb_send_pages: ksocknal_send_pages,
2399 cb_recv: ksocknal_recv,
2400 cb_recv_pages: ksocknal_recv_pages,
2401 cb_read: ksocknal_read,
2402 cb_write: ksocknal_write,
2403 cb_callback: ksocknal_callback,
2404 cb_malloc: ksocknal_malloc,
2405 cb_free: ksocknal_free,
2406 cb_printf: ksocknal_printf,
2407 cb_cli: ksocknal_cli,
2408 cb_sti: ksocknal_sti,
2409 cb_dist: ksocknal_dist