1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5 * Author: Zach Brown <zab@zabbo.net>
6 * Author: Peter J. Braam <braam@clusterfs.com>
7 * Author: Phil Schwan <phil@clusterfs.com>
8 * Author: Eric Barton <eric@bartonsoftware.com>
10 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
12 * Portals is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Portals is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Portals; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
28 # include <linux/syscalls.h>
32 * LIB functions follow
/* Report the routing "distance" from this NI to 'nid' for the LIB layer.
 * NOTE(review): listing is incomplete here -- only the local-NID test is
 * visible; the remote/routed cases and return value are not shown. */
36 ksocknal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
38 /* I would guess that if ksocknal_get_peer (nid) == NULL,
39 and we're not routing, then 'nid' is very distant :) */
        /* 'nid' is our own NID: distance 0 (local) */
40 if (nal->libnal_ni.ni_pid.nid == nid) {
/* Free a locally-allocated tx descriptor (ltx) and drop the active-ltx
 * accounting count.  ltx_desc_size was recorded at allocation time in
 * ksocknal_sendmsg(), so the exact allocation size is returned. */
50 ksocknal_free_ltx (ksock_ltx_t *ltx)
52 atomic_dec(&ksocknal_data.ksnd_nactive_ltxs);
53 PORTAL_FREE(ltx, ltx->ltx_desc_size);
/* Translate a kernel virtual address to its struct page, for zero-copy
 * sends.  Handles vmalloc space, the kmap (PKMAP) window, and (fallthrough,
 * not fully visible) directly-mapped memory via virt_to_page().
 * Only compiled when both SOCKNAL_ZC and SOCKNAL_VADDR_ZC are enabled. */
56 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
58 ksocknal_kvaddr_to_page (unsigned long vaddr)
62 if (vaddr >= VMALLOC_START &&
64 page = vmalloc_to_page ((void *)vaddr);
66 else if (vaddr >= PKMAP_BASE &&
67 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
68 page = vmalloc_to_page ((void *)vaddr);
69 /* in 2.4 ^ just walks the page tables */
72 page = virt_to_page (vaddr);
/* Send (part of) the current mapped-iovec fragment of 'tx' on 'conn'.
 * Attempts a zero-copy tcp_sendpage_zccd() when ZC is configured, the
 * fragment is large enough, and the NIC advertises SG + checksum offload;
 * otherwise falls back to an ordinary non-blocking sock_sendmsg().
 * On a partial send the iov is advanced in place so the caller can retry.
 * Returns the sock_sendmsg()/sendpage rc (bytes sent or -ve errno) --
 * full return path not visible in this listing. */
83 ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
85 struct socket *sock = conn->ksnc_sock;
86 struct iovec *iov = tx->tx_iov;
87 int fragsize = iov->iov_len;
88 unsigned long vaddr = (unsigned long)iov->iov_base;
        /* MSG_MORE if further frags or queued tx's will follow */
89 int more = (tx->tx_niov > 1) ||
91 (!list_empty (&conn->ksnc_tx_queue));
92 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
        /* ZC can only cover what remains of the page containing vaddr */
93 int offset = vaddr & (PAGE_SIZE - 1);
94 int zcsize = MIN (fragsize, PAGE_SIZE - offset);
99 /* NB we can't trust socket ops to either consume our iovs
100 * or leave them alone, so we only send 1 frag at a time. */
101 LASSERT (fragsize <= tx->tx_resid);
102 LASSERT (tx->tx_niov > 0);
104 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
        /* NOTE(review): this reads ksocknal_data.ksnd_zc_min_frag while
         * ksocknal_send_kiov() reads ksocknal_tunables.ksnd_zc_min_frag --
         * confirm which is the intended home of this tunable. */
105 if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
106 (sock->sk->route_caps & NETIF_F_SG) &&
107 (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
108 (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
110 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
111 (void *)vaddr, page, page_address(page), offset, zcsize);
113 if (fragsize > zcsize) {
118 rc = tcp_sendpage_zccd(sock, page, offset, zcsize,
119 more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
124 /* NB don't pass tx's iov; sendmsg may or may not update it */
125 struct iovec fragiov = { .iov_base = (void *)vaddr,
126 .iov_len = fragsize};
127 struct msghdr msg = {
134 .msg_flags = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
136 mm_segment_t oldmm = get_fs();
139 rc = sock_sendmsg(sock, &msg, fragsize);
146 if (rc < iov->iov_len) {
147 /* didn't send whole iov entry... */
148 iov->iov_base = (void *)(vaddr + rc);
/* Send (part of) the current page-based (kiov) fragment of 'tx' on 'conn'.
 * Like ksocknal_send_iov() but the fragment is a struct page + offset:
 * zero-copy tcp_sendpage_zccd() when eligible, otherwise kmap() the page
 * and fall back to sock_sendmsg().  On a partial send the kiov's offset
 * and length are advanced in place for retry. */
160 ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
162 struct socket *sock = conn->ksnc_sock;
163 ptl_kiov_t *kiov = tx->tx_kiov;
164 int fragsize = kiov->kiov_len;
165 struct page *page = kiov->kiov_page;
166 int offset = kiov->kiov_offset;
        /* MSG_MORE if further kiov frags or queued tx's will follow */
167 int more = (tx->tx_nkiov > 1) ||
168 (!list_empty (&conn->ksnc_tx_queue));
171 /* NB we can't trust socket ops to either consume our iovs
172 * or leave them alone, so we only send 1 frag at a time. */
173 LASSERT (fragsize <= tx->tx_resid);
174 LASSERT (offset + fragsize <= PAGE_SIZE);
        /* mapped iovs (the header) must already be fully sent */
175 LASSERT (tx->tx_niov == 0);
176 LASSERT (tx->tx_nkiov > 0);
179 if (fragsize >= ksocknal_tunables.ksnd_zc_min_frag &&
180 (sock->sk->route_caps & NETIF_F_SG) &&
181 (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
183 CDEBUG(D_NET, "page %p + offset %x for %d\n",
184 page, offset, fragsize);
186 rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
187 more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
        /* non-ZC path: temporarily map the page into kernel VA space */
192 char *addr = ((char *)kmap (page)) + offset;
193 struct iovec fragiov = {.iov_base = addr,
194 .iov_len = fragsize};
195 struct msghdr msg = {
202 .msg_flags = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
204 mm_segment_t oldmm = get_fs();
207 rc = sock_sendmsg(sock, &msg, fragsize);
        /* partial send: bump offset/len so the next call resumes here */
217 kiov->kiov_offset = offset + rc;
218 kiov->kiov_len = fragsize - rc;
/* Drive the socket until 'tx' is completely sent, an error occurs, or the
 * socket backs up.  Dispatches each fragment to ksocknal_send_iov() (mapped
 * iovs first, i.e. the header) or ksocknal_send_kiov() (page payload).
 * Distinguishes a genuinely full socket (SOCK_NOSPACE -> wait for
 * write_space) from a transient allocation failure (reported as -ENOMEM
 * to the caller, since the stack won't call us back in that case).
 * Also refreshes the conn's tx deadline / peer liveness on progress. */
229 ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
        /* debug aid: optionally stall the transmit path (tunable) */
233 if (ksocknal_data.ksnd_stall_tx != 0) {
234 set_current_state (TASK_UNINTERRUPTIBLE);
235 schedule_timeout (ksocknal_data.ksnd_stall_tx * HZ);
238 LASSERT (tx->tx_resid != 0);
        /* take a ref on the socket; fails only if the conn is closing */
240 rc = ksocknal_getconnsock (conn);
242 LASSERT (conn->ksnc_closing);
        /* debug aid: fake ENOMEM on the first n sends (tunable) */
247 if (ksocknal_data.ksnd_enomem_tx > 0) {
249 ksocknal_data.ksnd_enomem_tx--;
251 } else if (tx->tx_niov != 0) {
252 rc = ksocknal_send_iov (conn, tx);
254 rc = ksocknal_send_kiov (conn, tx);
258 /* Didn't write anything.
260 * NB: rc == 0 and rc == -EAGAIN both mean try
261 * again later (linux stack returns -EAGAIN for
262 * this, but Adaptech TOE returns 0).
264 * Also, sends never fail with -ENOMEM, just
265 * -EAGAIN, but with the added bonus that we can't
266 * expect write_space() to call us back to tell us
267 * when to try sending again. We use the
268 * SOCK_NOSPACE flag to diagnose... */
270 LASSERT(rc != -ENOMEM);
272 if (rc == 0 || rc == -EAGAIN) {
273 if (test_bit(SOCK_NOSPACE,
274 &conn->ksnc_sock->flags)) {
                /* rate-limit the warning: log only at powers of two */
280 if ((counter & (-counter)) == counter)
281 CWARN("%d ENOMEM tx %p\n",
291 /* Consider the connection alive since we managed to chuck
292 * more data into it. Really, we'd like to consider it
293 * alive only when the peer ACKs something, but
294 * write_space() only gets called back while SOCK_NOSPACE
295 * is set. Instead, we presume peer death has occurred if
296 * the socket doesn't drain within a timeout */
297 conn->ksnc_tx_deadline = jiffies +
298 ksocknal_tunables.ksnd_io_timeout * HZ;
299 conn->ksnc_peer->ksnp_last_alive = jiffies;
301 } while (tx->tx_resid != 0);
303 ksocknal_putconnsock (conn);
/* Set TCP_QUICKACK on the conn's socket so the peer's data is ACKed
 * immediately rather than delayed/piggy-backed, which would otherwise
 * stall completion of our zero-copy sends. */
308 ksocknal_eager_ack (ksock_conn_t *conn)
311 mm_segment_t oldmm = get_fs();
312 struct socket *sock = conn->ksnc_sock;
314 /* Remind the socket to ACK eagerly. If I don't, the socket might
315 * think I'm about to send something it could piggy-back the ACK
316 * on, introducing delay in completing zero-copy sends in my
        /* call setsockopt directly on the socket ops (kernel context) */
320 sock->ops->setsockopt (sock, SOL_TCP, TCP_QUICKACK,
321 (char *)&opt, sizeof (opt));
/* Receive into the conn's current mapped-iovec fragment with a single
 * non-blocking sock_recvmsg().  On progress: refresh peer liveness and the
 * rx deadline, mark rx as started, decrement the wanted/left byte counts,
 * and either advance the iov in place (partial frag) or move on to the
 * next iov entry (frag complete).  Returns the sock_recvmsg() rc. */
326 ksocknal_recv_iov (ksock_conn_t *conn)
328 struct iovec *iov = conn->ksnc_rx_iov;
329 int fragsize = iov->iov_len;
330 unsigned long vaddr = (unsigned long)iov->iov_base;
        /* local copy: don't let the socket op mutate the conn's iov */
331 struct iovec fragiov = { .iov_base = (void *)vaddr,
332 .iov_len = fragsize};
333 struct msghdr msg = {
342 mm_segment_t oldmm = get_fs();
345 /* NB we can't trust socket ops to either consume our iovs
346 * or leave them alone, so we only receive 1 frag at a time. */
347 LASSERT (conn->ksnc_rx_niov > 0);
348 LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
351 rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
352 /* NB this is just a boolean............................^ */
358 /* received something... */
359 conn->ksnc_peer->ksnp_last_alive = jiffies;
360 conn->ksnc_rx_deadline = jiffies +
361 ksocknal_tunables.ksnd_io_timeout * HZ;
362 mb(); /* order with setting rx_started */
363 conn->ksnc_rx_started = 1;
365 conn->ksnc_rx_nob_wanted -= rc;
366 conn->ksnc_rx_nob_left -= rc;
        /* partial fragment: advance base/len so the next call resumes here */
369 iov->iov_base = (void *)(vaddr + rc);
370 iov->iov_len = fragsize - rc;
375 conn->ksnc_rx_niov--;
/* Receive into the conn's current page-based (kiov) fragment: kmap() the
 * page and issue a single non-blocking sock_recvmsg() into it.  Bookkeeping
 * mirrors ksocknal_recv_iov(): refresh liveness/deadline, mark rx started,
 * decrement wanted/left, and advance either the offset within this kiov
 * (partial) or on to the next kiov entry (complete). */
380 ksocknal_recv_kiov (ksock_conn_t *conn)
382 ptl_kiov_t *kiov = conn->ksnc_rx_kiov;
383 struct page *page = kiov->kiov_page;
384 int offset = kiov->kiov_offset;
385 int fragsize = kiov->kiov_len;
        /* map the destination page; kunmap presumably happens in the
         * lines not visible here -- TODO confirm */
386 unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
387 struct iovec fragiov = { .iov_base = (void *)vaddr,
388 .iov_len = fragsize};
389 struct msghdr msg = {
398 mm_segment_t oldmm = get_fs();
401 /* NB we can't trust socket ops to either consume our iovs
402 * or leave them alone, so we only receive 1 frag at a time. */
403 LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
404 LASSERT (conn->ksnc_rx_nkiov > 0);
405 LASSERT (offset + fragsize <= PAGE_SIZE);
408 rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
409 /* NB this is just a boolean............................^ */
417 /* received something... */
418 conn->ksnc_peer->ksnp_last_alive = jiffies;
419 conn->ksnc_rx_deadline = jiffies +
420 ksocknal_tunables.ksnd_io_timeout * HZ;
421 mb(); /* order with setting rx_started */
422 conn->ksnc_rx_started = 1;
424 conn->ksnc_rx_nob_wanted -= rc;
425 conn->ksnc_rx_nob_left -= rc;
        /* partial fragment: bump offset/len so the next call resumes here */
428 kiov->kiov_offset = offset + rc;
429 kiov->kiov_len = fragsize - rc;
433 conn->ksnc_rx_kiov++;
434 conn->ksnc_rx_nkiov--;
/* Top-level receive step for 'conn': loop pulling fragments via
 * ksocknal_recv_iov()/ksocknal_recv_kiov().  Also requests an eager ACK
 * when a whole message body completes on an eligible connection type. */
439 ksocknal_receive (ksock_conn_t *conn)
441 /* Return 1 on success, 0 on EOF, < 0 on error.
442 * Caller checks ksnc_rx_nob_wanted to determine
443 * progress/completion. */
        /* debug aid: optionally stall the receive path (tunable) */
447 if (ksocknal_data.ksnd_stall_rx != 0) {
448 set_current_state (TASK_UNINTERRUPTIBLE);
449 schedule_timeout (ksocknal_data.ksnd_stall_rx * HZ);
        /* take a socket ref; fails only if the conn is closing */
452 rc = ksocknal_getconnsock (conn);
454 LASSERT (conn->ksnc_closing);
        /* mapped iovs (header/slop) drain before page kiovs (payload) */
459 if (conn->ksnc_rx_niov != 0)
460 rc = ksocknal_recv_iov (conn);
462 rc = ksocknal_recv_kiov (conn);
465 /* error/EOF or partial receive */
468 } else if (rc == 0 && conn->ksnc_rx_started) {
469 /* EOF in the middle of a message */
475 /* Completed a fragment */
477 if (conn->ksnc_rx_nob_wanted == 0) {
478 /* Completed a message segment (header or payload) */
            /* eager-ack only for conn types selected by the tunable bitmask */
479 if ((ksocknal_tunables.ksnd_eager_ack & conn->ksnc_type) != 0 &&
480 (conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
481 conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD)) {
482 /* Remind the socket to ack eagerly... */
483 ksocknal_eager_ack(conn);
490 ksocknal_putconnsock (conn);
/* Zero-copy completion callback (may run in softirq context): the last
 * ZC skb referencing 'zcd' has been released.  Queue the owning tx on its
 * scheduler's zctxdone list and wake the scheduler to finalize it there. */
496 ksocknal_zc_callback (zccd_t *zcd)
498 ksock_tx_t *tx = KSOCK_ZCCD_2_TX(zcd);
499 ksock_sched_t *sched = tx->tx_conn->ksnc_scheduler;
503 /* Schedule tx for cleanup (can't do it now due to lock conflicts) */
505 spin_lock_irqsave (&sched->kss_lock, flags);
507 list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
508 wake_up (&sched->kss_waitq);
510 spin_unlock_irqrestore (&sched->kss_lock, flags);
/* Finalize a completed (or aborted) transmit.  Undo the conn's tx-byte
 * accounting and conn ref, then complete the tx either as a forwarded
 * packet (tell the router) or a local send (tell LIB and free the ltx).
 * Success vs failure is judged by whether tx_resid reached zero. */
516 ksocknal_tx_done (ksock_tx_t *tx, int asynch)
521 if (tx->tx_conn != NULL) {
522 /* This tx got queued on a conn; do the accounting... */
523 atomic_sub (tx->tx_nob, &tx->tx_conn->ksnc_tx_nob);
525 /* zero copy completion isn't always from
526 * process_transmit() so it needs to keep a ref on
529 ksocknal_put_conn (tx->tx_conn);
535 if (tx->tx_isfwd) { /* was a forwarded packet? */
536 kpr_fwd_done (&ksocknal_data.ksnd_router,
537 KSOCK_TX_2_KPR_FWD_DESC (tx),
538 (tx->tx_resid == 0) ? 0 : -ECONNABORTED);
        /* locally-originated packet: complete it back to the LIB layer */
544 ltx = KSOCK_TX_2_KSOCK_LTX (tx);
546 lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie,
547 (tx->tx_resid == 0) ? PTL_OK : PTL_FAIL);
549 ksocknal_free_ltx (ltx);
/* Called when a tx has been handed to the socket.  If zero-copy skbs are
 * still in flight (zccd count > 1), defer completion to the ZC callback:
 * pin the conn with an extra ref and drop our initial zccd ref so the
 * callback can fire.  Otherwise complete the tx synchronously. */
554 ksocknal_tx_launched (ksock_tx_t *tx)
557 if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
558 ksock_conn_t *conn = tx->tx_conn;
560 /* zccd skbufs are still in-flight. First take a ref on
561 * conn, so it hangs about for ksocknal_tx_done... */
562 atomic_inc (&conn->ksnc_refcount);
564 /* ...then drop the initial ref on zccd, so the zero copy
565 * callback can occur */
566 zccd_put (&tx->tx_zccd);
570 /* Any zero-copy-ness (if any) has completed; I can complete the
571 * transmit now, avoiding an extra schedule */
572 ksocknal_tx_done (tx, 0);
/* Scheduler-side transmit step: push 'tx' on 'conn' and classify the
 * outcome.  Complete sends launch immediately; -ENOMEM parks the conn on
 * the reaper's enomem list for a timed retry (the list inherits the
 * scheduler's conn ref); any other error closes the conn and its sibling
 * connections before completing the tx as failed. */
576 ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
581 rc = ksocknal_transmit (conn, tx);
583 CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
585 if (tx->tx_resid == 0) {
586 /* Sent everything OK */
589 ksocknal_tx_launched (tx);
597 /* Queue on ksnd_enomem_conns for retry after a timeout */
598 spin_lock_irqsave(&ksocknal_data.ksnd_reaper_lock, flags);
600 /* enomem list takes over scheduler's ref... */
601 LASSERT (conn->ksnc_tx_scheduled);
602 list_add_tail(&conn->ksnc_tx_list,
603 &ksocknal_data.ksnd_enomem_conns);
        /* wake the reaper only if it would otherwise sleep past the retry */
604 if (!time_after_eq(jiffies + SOCKNAL_ENOMEM_RETRY,
605 ksocknal_data.ksnd_reaper_waketime))
606 wake_up (&ksocknal_data.ksnd_reaper_waitq);
608 spin_unlock_irqrestore(&ksocknal_data.ksnd_reaper_lock, flags);
        /* real error: log (unless already closing) and tear down */
615 if (!conn->ksnc_closing)
616 CERROR("[%p] Error %d on write to "LPX64
617 " ip %d.%d.%d.%d:%d\n", conn, rc,
618 conn->ksnc_peer->ksnp_nid,
619 HIPQUAD(conn->ksnc_ipaddr),
622 ksocknal_close_conn_and_siblings (conn, rc);
623 ksocknal_tx_launched (tx);
/* Queue 'route' for the autoconnect daemon.  Caller holds the global
 * write lock.  Marks which connection types are being established (all
 * missing typed connections, or a single untyped one), takes a route ref
 * for the daemon, and wakes it. */
629 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
633 /* called holding write lock on ksnd_global_lock */
635 LASSERT (!route->ksnr_deleted);
636 LASSERT ((route->ksnr_connected & (1 << SOCKNAL_CONN_ANY)) == 0);
637 LASSERT ((route->ksnr_connected & KSNR_TYPED_ROUTES) != KSNR_TYPED_ROUTES);
638 LASSERT (!route->ksnr_connecting);
640 if (ksocknal_tunables.ksnd_typed_conns)
        /* connect every typed connection not yet established */
641 route->ksnr_connecting =
642 KSNR_TYPED_ROUTES & ~route->ksnr_connected;
644 route->ksnr_connecting = (1 << SOCKNAL_CONN_ANY);
646 atomic_inc (&route->ksnr_refcount); /* extra ref for asynchd */
648 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
650 list_add_tail (&route->ksnr_connect_list,
651 &ksocknal_data.ksnd_autoconnectd_routes);
652 wake_up (&ksocknal_data.ksnd_autoconnectd_waitq);
654 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
/* Resolve the peer that should carry 'tx' destined for 'nid': a direct
 * peer if one exists, otherwise ask the router for a gateway NID and look
 * that up.  Caller holds the global lock.  Returns NULL (with a CERROR)
 * when no usable peer can be found -- exact return path not fully visible
 * in this listing. */
658 ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
660 char ipbuf[PTL_NALFMT_SIZE];
661 ptl_nid_t target_nid;
663 ksock_peer_t *peer = ksocknal_find_peer_locked (nid);
669 CERROR ("Can't send packet to "LPX64
670 " %s: routed target is not a peer\n",
671 nid, portals_nid2str(SOCKNAL, nid, ipbuf));
        /* no direct peer: ask the portals router for a gateway */
675 rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, tx->tx_nob,
678 CERROR ("Can't route to "LPX64" %s: router error %d\n",
679 nid, portals_nid2str(SOCKNAL, nid, ipbuf), rc);
683 peer = ksocknal_find_peer_locked (target_nid);
687 CERROR ("Can't send packet to "LPX64" %s: no peer entry\n",
688 target_nid, portals_nid2str(SOCKNAL, target_nid, ipbuf));
/* Pick the best established connection on 'peer' for sending 'tx':
 * among type-appropriate conns choose the one with the least queued data
 * (tx bytes accounted plus the socket's own write queue); keep the
 * least-loaded conn of any type as a fallback.  Caller holds the global
 * lock.  Returns NULL if the peer has no usable connection. */
693 ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer)
695 struct list_head *tmp;
696 ksock_conn_t *typed = NULL;
698 ksock_conn_t *fallback = NULL;
701 /* Find the conn with the shortest tx queue */
702 list_for_each (tmp, &peer->ksnp_conns) {
703 ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
        /* load estimate: our accounted bytes + kernel socket backlog */
704 int nob = atomic_read(&c->ksnc_tx_nob) +
705 c->ksnc_sock->sk->sk_wmem_queued;
707 LASSERT (!c->ksnc_closing);
709 if (fallback == NULL || nob < fnob) {
        /* untyped mode: the fallback choice is the final choice */
714 if (!ksocknal_tunables.ksnd_typed_conns)
        /* typed mode: only consider conns whose type suits this tx's size */
717 switch (c->ksnc_type) {
720 case SOCKNAL_CONN_ANY:
722 case SOCKNAL_CONN_BULK_IN:
724 case SOCKNAL_CONN_BULK_OUT:
            /* bulk-out conns carry only large messages */
725 if (tx->tx_nob < ksocknal_tunables.ksnd_min_bulk)
728 case SOCKNAL_CONN_CONTROL:
            /* control conns carry only small messages */
729 if (tx->tx_nob >= ksocknal_tunables.ksnd_min_bulk)
734 if (typed == NULL || nob < tnob) {
740 /* prefer the typed selection */
741 return ((typed != NULL) ? typed : fallback);
/* Queue 'tx' on an established 'conn' and make sure its scheduler will
 * service it.  Caller holds the global lock continuously since finding
 * the conn, so the conn cannot be closing under us.  Initializes the ZC
 * completion descriptor, refreshes the tx deadline, and schedules the
 * conn (taking a scheduler ref) if it's writable but not yet queued. */
745 ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
748 ksock_sched_t *sched = conn->ksnc_scheduler;
750 /* called holding global lock (read or irq-write) and caller may
751 * not have dropped this lock between finding conn and calling me,
752 * so we don't need the {get,put}connsock dance to deref
754 LASSERT(!conn->ksnc_closing);
755 LASSERT(tx->tx_resid == tx->tx_nob);
757 CDEBUG (D_NET, "Sending to "LPX64" ip %d.%d.%d.%d:%d\n",
758 conn->ksnc_peer->ksnp_nid,
759 HIPQUAD(conn->ksnc_ipaddr),
        /* account these bytes against the conn (used for load balancing) */
762 atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
766 zccd_init (&tx->tx_zccd, ksocknal_zc_callback);
767 /* NB this sets 1 ref on zccd, so the callback can only occur after
768 * I've released this ref. */
770 spin_lock_irqsave (&sched->kss_lock, flags);
772 conn->ksnc_tx_deadline = jiffies +
773 ksocknal_tunables.ksnd_io_timeout * HZ;
774 mb(); /* order with list_add_tail */
776 list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
778 if (conn->ksnc_tx_ready && /* able to send */
779 !conn->ksnc_tx_scheduled) { /* not scheduled to send */
780 /* +1 ref for scheduler */
781 atomic_inc (&conn->ksnc_refcount);
782 list_add_tail (&conn->ksnc_tx_list,
783 &sched->kss_tx_conns);
784 conn->ksnc_tx_scheduled = 1;
785 wake_up (&sched->kss_waitq);
788 spin_unlock_irqrestore (&sched->kss_lock, flags);
/* Find a route on 'peer' that should be (auto)connected now.  Eager
 * routes win immediately; lazy routes are only returned (the first one
 * found) when nothing is connected or connecting and no conn exists.
 * Routes inside their retry-backoff window are skipped.  Caller holds
 * the global lock. */
792 ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
794 struct list_head *tmp;
795 ksock_route_t *route;
796 ksock_route_t *first_lazy = NULL;
797 int found_connecting_or_connected = 0;
800 list_for_each (tmp, &peer->ksnp_routes) {
801 route = list_entry (tmp, ksock_route_t, ksnr_list);
802 bits = route->ksnr_connected;
804 if ((bits & KSNR_TYPED_ROUTES) == KSNR_TYPED_ROUTES ||
805 (bits & (1 << SOCKNAL_CONN_ANY)) != 0 ||
806 route->ksnr_connecting != 0) {
807 /* All typed connections have been established, or
808 * an untyped connection has been established, or
809 * connections are currently being established */
810 found_connecting_or_connected = 1;
814 /* too soon to retry this guy? */
815 if (!time_after_eq (jiffies, route->ksnr_timeout))
818 /* eager routes always want to be connected */
819 if (route->ksnr_eager)
822 if (first_lazy == NULL)
826 /* No eager routes need to be connected. If some connection has
827 * already been established, or is being established there's nothing to
828 * do. Otherwise we return the first lazy route we found. If it fails
829 * to connect, it will go to the end of the list. */
831 if (!list_empty (&peer->ksnp_conns) ||
832 found_connecting_or_connected)
/* Return the first route on 'peer' with a connection attempt currently
 * in progress (ksnr_connecting != 0), or fall through if none.  Caller
 * holds the global lock. */
839 ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
841 struct list_head *tmp;
842 ksock_route_t *route;
844 list_for_each (tmp, &peer->ksnp_routes) {
845 route = list_entry (tmp, ksock_route_t, ksnr_list);
847 if (route->ksnr_connecting != 0)
/* Launch 'tx' towards 'nid'.  Fast path: under the global read lock,
 * find the target peer and, if no route needs connecting and a conn
 * exists, queue directly.  Slow path: retake the lock for writing
 * (re-validating the peer across the lock gap), kick off any needed
 * autoconnects, then queue on an existing conn, park the tx on the
 * peer's queue while a connect is in flight, or fail with
 * -EHOSTUNREACH.  Returns 0 on queue/park (not all returns visible). */
855 ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
860 ksock_route_t *route;
863 /* Ensure the frags we've been given EXACTLY match the number of
864 * bytes we want to send. Many TCP/IP stacks disregard any total
865 * size parameters passed to them and just look at the frags.
867 * We always expect at least 1 mapped fragment containing the
868 * complete portals header. */
869 LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) +
870 lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
871 LASSERT (tx->tx_niov >= 1);
872 LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t));
874 CDEBUG (D_NET, "packet %p type %d, nob %d niov %d nkiov %d\n",
875 tx, ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type,
876 tx->tx_nob, tx->tx_niov, tx->tx_nkiov);
878 tx->tx_conn = NULL; /* only set when assigned a conn */
879 tx->tx_resid = tx->tx_nob;
880 tx->tx_hdr = (ptl_hdr_t *)tx->tx_iov[0].iov_base;
882 g_lock = &ksocknal_data.ksnd_global_lock;
885 peer = ksocknal_find_target_peer_locked (tx, nid);
887 read_unlock (g_lock);
888 return (-EHOSTUNREACH);
        /* fast path: nothing to connect and a conn is ready */
891 if (ksocknal_find_connectable_route_locked(peer) == NULL) {
892 conn = ksocknal_find_conn_locked (tx, peer);
894 /* I've got no autoconnect routes that need to be
895 * connecting and I do have an actual connection... */
896 ksocknal_queue_tx_locked (tx, conn);
897 read_unlock (g_lock);
902 /* Making one or more connections; I'll need a write lock... */
904 atomic_inc (&peer->ksnp_refcount); /* +1 ref for me while I unlock */
905 read_unlock (g_lock);
906 write_lock_irqsave (g_lock, flags);
        /* the peer may have vanished while we held no lock */
908 if (peer->ksnp_closing) { /* peer deleted as I blocked! */
909 write_unlock_irqrestore (g_lock, flags);
910 ksocknal_put_peer (peer);
911 return (-EHOSTUNREACH);
913 ksocknal_put_peer (peer); /* drop ref I got above */
916 /* launch any/all autoconnections that need it */
917 route = ksocknal_find_connectable_route_locked (peer);
921 ksocknal_launch_autoconnect_locked (route);
        /* a conn may have appeared while we reacquired the lock */
924 conn = ksocknal_find_conn_locked (tx, peer);
926 /* Connection exists; queue message on it */
927 ksocknal_queue_tx_locked (tx, conn);
928 write_unlock_irqrestore (g_lock, flags);
932 route = ksocknal_find_connecting_route_locked (peer);
934 /* At least 1 connection is being established; queue the
936 list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
937 write_unlock_irqrestore (g_lock, flags);
        /* nothing connected, connecting, or connectable */
941 write_unlock_irqrestore (g_lock, flags);
942 return (-EHOSTUNREACH);
/* Common send entry point for both iovec and kiov payloads (exactly one
 * of payload_iov/payload_kiov is non-NULL).  Allocates an ltx descriptor
 * sized for the frag count (atomically when blocking is forbidden),
 * copies the portals header into frag 0, extracts the payload frags,
 * and hands the tx to ksocknal_launch_packet().  On launch failure the
 * ltx is freed.  Returns a ptl_err_t (PTL_NO_SPACE on alloc failure;
 * remaining returns not visible in this listing). */
946 ksocknal_sendmsg(lib_nal_t *nal,
953 unsigned int payload_niov,
954 struct iovec *payload_iov,
955 ptl_kiov_t *payload_kiov,
956 size_t payload_offset,
963 /* NB 'private' is different depending on what we're sending.
964 * Just ignore it... */
966 CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
967 " pid %d\n", payload_nob, payload_niov, nid , pid);
969 LASSERT (payload_nob == 0 || payload_niov > 0);
970 LASSERT (payload_niov <= PTL_MD_MAX_IOV);
972 /* It must be OK to kmap() if required */
973 LASSERT (payload_kiov == NULL || !in_interrupt ());
974 /* payload is either all vaddrs or all pages */
975 LASSERT (!(payload_kiov != NULL && payload_iov != NULL))
        /* size the descriptor for header frag + payload frags */;
977 if (payload_iov != NULL)
978 desc_size = offsetof(ksock_ltx_t, ltx_iov[1 + payload_niov]);
980 desc_size = offsetof(ksock_ltx_t, ltx_kiov[payload_niov]);
982 if (in_interrupt() ||
983 type == PTL_MSG_ACK ||
984 type == PTL_MSG_REPLY) {
985 /* Can't block if in interrupt or responding to an incoming
987 PORTAL_ALLOC_ATOMIC(ltx, desc_size);
989 PORTAL_ALLOC(ltx, desc_size);
993 CERROR("Can't allocate tx desc type %d size %d %s\n",
994 type, desc_size, in_interrupt() ? "(intr)" : "");
995 return (PTL_NO_SPACE);
998 atomic_inc(&ksocknal_data.ksnd_nactive_ltxs);
1000 ltx->ltx_desc_size = desc_size;
1002 /* We always have 1 mapped frag for the header */
1003 ltx->ltx_tx.tx_iov = ltx->ltx_iov;
1004 ltx->ltx_iov[0].iov_base = &ltx->ltx_hdr;
1005 ltx->ltx_iov[0].iov_len = sizeof(*hdr);
1006 ltx->ltx_hdr = *hdr;
1008 ltx->ltx_private = private;
1009 ltx->ltx_cookie = cookie;
1011 ltx->ltx_tx.tx_isfwd = 0;
1012 ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_nob;
1014 if (payload_iov != NULL) {
1015 /* payload is all mapped */
1016 ltx->ltx_tx.tx_kiov = NULL;
1017 ltx->ltx_tx.tx_nkiov = 0;
1019 ltx->ltx_tx.tx_niov =
1020 1 + lib_extract_iov(payload_niov, &ltx->ltx_iov[1],
1021 payload_niov, payload_iov,
1022 payload_offset, payload_nob);
1024 /* payload is all pages */
1025 ltx->ltx_tx.tx_niov = 1;
1027 ltx->ltx_tx.tx_kiov = ltx->ltx_kiov;
1028 ltx->ltx_tx.tx_nkiov =
1029 lib_extract_kiov(payload_niov, ltx->ltx_kiov,
1030 payload_niov, payload_kiov,
1031 payload_offset, payload_nob);
1034 rc = ksocknal_launch_packet(&ltx->ltx_tx, nid);
        /* launch failed: reclaim the descriptor */
1038 ksocknal_free_ltx(ltx);
/* LIB NAL 'send' method for mapped-iovec payloads: thin wrapper that
 * forwards to ksocknal_sendmsg() with payload_kiov == NULL. */
1043 ksocknal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1044 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1045 unsigned int payload_niov, struct iovec *payload_iov,
1046 size_t payload_offset, size_t payload_len)
1048 return (ksocknal_sendmsg(nal, private, cookie,
1049 hdr, type, nid, pid,
1050 payload_niov, payload_iov, NULL,
1051 payload_offset, payload_len));
/* LIB NAL 'send_pages' method for page-based (kiov) payloads: thin
 * wrapper that forwards to ksocknal_sendmsg() with payload_iov == NULL. */
1055 ksocknal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1056 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1057 unsigned int payload_niov, ptl_kiov_t *payload_kiov,
1058 size_t payload_offset, size_t payload_len)
1060 return (ksocknal_sendmsg(nal, private, cookie,
1061 hdr, type, nid, pid,
1062 payload_niov, NULL, payload_kiov,
1063 payload_offset, payload_len));
/* Router callback: forward a packet to its gateway (or, if we ARE the
 * gateway, to the final target).  Builds a forwarding tx (ftx) in the
 * router descriptor's scratch space -- header in one mapped iov, payload
 * as the router-supplied kiovs -- and launches it.  On launch failure
 * the router is notified via kpr_fwd_done(). */
1067 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1069 ptl_nid_t nid = fwd->kprfd_gateway_nid;
1070 ksock_ftx_t *ftx = (ksock_ftx_t *)&fwd->kprfd_scratch;
1073 CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
1074 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
1076 /* I'm the gateway; must be the last hop */
1077 if (nid == ksocknal_lib.libnal_ni.ni_pid.nid)
1078 nid = fwd->kprfd_target_nid;
1080 /* setup iov for hdr */
1081 ftx->ftx_iov.iov_base = fwd->kprfd_hdr;
1082 ftx->ftx_iov.iov_len = sizeof(ptl_hdr_t);
1084 ftx->ftx_tx.tx_isfwd = 1; /* This is a forwarding packet */
1085 ftx->ftx_tx.tx_nob = sizeof(ptl_hdr_t) + fwd->kprfd_nob;
1086 ftx->ftx_tx.tx_niov = 1;
1087 ftx->ftx_tx.tx_iov = &ftx->ftx_iov;
1088 ftx->ftx_tx.tx_nkiov = fwd->kprfd_niov;
1089 ftx->ftx_tx.tx_kiov = fwd->kprfd_kiov;
1091 rc = ksocknal_launch_packet (&ftx->ftx_tx, nid);
1093 kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, rc);
/* Spawn a kernel thread running fn(arg) and bump the module's thread
 * count on success.  (Error handling for kernel_thread() failure is in
 * lines not visible here.) */
1097 ksocknal_thread_start (int (*fn)(void *arg), void *arg)
1099 long pid = kernel_thread (fn, arg, 0);
1104 atomic_inc (&ksocknal_data.ksnd_nthreads);
/* Called by each socknal thread on exit: drop the module's thread count
 * (shutdown waits for this to reach zero). */
1109 ksocknal_thread_fini (void)
1111 atomic_dec (&ksocknal_data.ksnd_nthreads);
/* Router callback invoked when forwarding of the packet held in fmb
 * completes (error != 0 on failure).  Drops the peer ref taken at init,
 * returns the forwarding buffer to its pool's idle list, and if a conn
 * was blocked waiting for an fmb, unblocks it and reschedules it on its
 * scheduler's rx list. */
1115 ksocknal_fmb_callback (void *arg, int error)
1117 ksock_fmb_t *fmb = (ksock_fmb_t *)arg;
1118 ksock_fmb_pool_t *fmp = fmb->fmb_pool;
        /* the forwarded header lives at the start of the fmb's first page */
1119 ptl_hdr_t *hdr = (ptl_hdr_t *)page_address(fmb->fmb_kiov[0].kiov_page);
1120 ksock_conn_t *conn = NULL;
1121 ksock_sched_t *sched;
1122 unsigned long flags;
1123 char ipbuf[PTL_NALFMT_SIZE];
1124 char ipbuf2[PTL_NALFMT_SIZE];
1127 CERROR("Failed to route packet from "
1128 LPX64" %s to "LPX64" %s: %d\n",
1129 NTOH__u64(hdr->src_nid),
1130 portals_nid2str(SOCKNAL, NTOH__u64(hdr->src_nid), ipbuf),
1131 NTOH__u64(hdr->dest_nid),
1132 portals_nid2str(SOCKNAL, NTOH__u64(hdr->dest_nid), ipbuf2),
1135 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
1136 NTOH__u64 (hdr->src_nid), NTOH__u64 (hdr->dest_nid));
1138 /* drop peer ref taken on init */
1139 ksocknal_put_peer (fmb->fmb_peer);
1141 spin_lock_irqsave (&fmp->fmp_lock, flags);
1143 list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);
1144 fmp->fmp_nactive_fmbs--;
        /* hand the freed fmb straight to the first blocked conn, if any */
1146 if (!list_empty (&fmp->fmp_blocked_conns)) {
1147 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next,
1148 ksock_conn_t, ksnc_rx_list);
1149 list_del (&conn->ksnc_rx_list);
1152 spin_unlock_irqrestore (&fmp->fmp_lock, flags);
1157 CDEBUG (D_NET, "Scheduling conn %p\n", conn);
1158 LASSERT (conn->ksnc_rx_scheduled);
1159 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
1161 conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
1163 sched = conn->ksnc_scheduler;
1165 spin_lock_irqsave (&sched->kss_lock, flags);
1167 list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
1168 wake_up (&sched->kss_waitq);
1170 spin_unlock_irqrestore (&sched->kss_lock, flags);
/* Grab an idle forwarding buffer for the packet being received on
 * 'conn', choosing the small or large pool by payload size.  If the pool
 * is empty, deschedule the conn: park it on the pool's blocked list
 * (state FMB_SLEEP) until ksocknal_fmb_callback() frees a buffer, and
 * fall through returning no fmb. */
1174 ksocknal_get_idle_fmb (ksock_conn_t *conn)
1176 int payload_nob = conn->ksnc_rx_nob_left;
1177 unsigned long flags;
1178 ksock_fmb_pool_t *pool;
1181 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1182 LASSERT (kpr_routing(&ksocknal_data.ksnd_router));
1184 if (payload_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
1185 pool = &ksocknal_data.ksnd_small_fmp;
1187 pool = &ksocknal_data.ksnd_large_fmp;
1189 spin_lock_irqsave (&pool->fmp_lock, flags);
1191 if (!list_empty (&pool->fmp_idle_fmbs)) {
1192 fmb = list_entry(pool->fmp_idle_fmbs.next,
1193 ksock_fmb_t, fmb_list);
1194 list_del (&fmb->fmb_list);
1195 pool->fmp_nactive_fmbs++;
1196 spin_unlock_irqrestore (&pool->fmp_lock, flags);
1201 /* deschedule until fmb free */
1203 conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
1205 list_add_tail (&conn->ksnc_rx_list,
1206 &pool->fmp_blocked_conns);
1208 spin_unlock_irqrestore (&pool->fmp_lock, flags);
/* Initialize forwarding buffer 'fmb' for the packet whose header has
 * just been read on 'conn'.  Pins the peer, copies the header into the
 * fmb, sizes the fmb's kiovs to the payload, and registers the forward
 * with the router.  Zero-payload packets are forwarded immediately and
 * the conn moves on to the next packet; otherwise the conn's rx kiovs
 * are pointed at the fmb pages and state becomes RX_BODY_FWD. */
1213 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
1215 int payload_nob = conn->ksnc_rx_nob_left;
1216 ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1218 int nob = payload_nob;
1220 LASSERT (conn->ksnc_rx_scheduled);
1221 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1222 LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
1223 LASSERT (payload_nob >= 0);
1224 LASSERT (payload_nob <= fmb->fmb_pool->fmp_buff_pages * PAGE_SIZE);
1225 LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
1226 LASSERT (fmb->fmb_kiov[0].kiov_offset == 0);
1228 /* Take a ref on the conn's peer to prevent module unload before
1229 * forwarding completes. */
1230 fmb->fmb_peer = conn->ksnc_peer;
1231 atomic_inc (&conn->ksnc_peer->ksnp_refcount);
1233 /* Copy the header we just read into the forwarding buffer. If
1234 * there's payload, start reading it into the buffer,
1235 * otherwise the forwarding buffer can be kicked off
1237 fmb->fmb_hdr = conn->ksnc_hdr;
        /* size each kiov page to what's left of the payload */
1240 LASSERT (niov < fmb->fmb_pool->fmp_buff_pages);
1241 LASSERT (fmb->fmb_kiov[niov].kiov_offset == 0);
1242 fmb->fmb_kiov[niov].kiov_len = MIN (PAGE_SIZE, nob);
1247 kpr_fwd_init(&fmb->fmb_fwd, dest_nid, &fmb->fmb_hdr,
1248 payload_nob, niov, fmb->fmb_kiov,
1249 ksocknal_fmb_callback, fmb);
1251 if (payload_nob == 0) { /* got complete packet already */
1252 CDEBUG (D_NET, "%p "LPX64"->"LPX64" fwd_start (immediate)\n",
1253 conn, NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid);
1255 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1257 ksocknal_new_packet (conn, 0); /* on to next packet */
1261 conn->ksnc_cookie = fmb; /* stash fmb for later */
1262 conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
1264 /* Set up conn->ksnc_rx_kiov to read the payload into fmb's kiov-ed
1266 LASSERT (niov <= sizeof(conn->ksnc_rx_iov_space)/sizeof(ptl_kiov_t));
1268 conn->ksnc_rx_niov = 0;
1269 conn->ksnc_rx_nkiov = niov;
1270 conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1271 memcpy(conn->ksnc_rx_kiov, fmb->fmb_kiov, niov * sizeof(ptl_kiov_t));
1273 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
1274 NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
/* Parse the just-received header of a packet that is NOT addressed to us
 * and decide whether it can be forwarded.  Drops the packet (skipping its
 * body via ksocknal_new_packet()) on: corrupt length, routing disabled,
 * body larger than PTL_MTU, or a destination that is a direct peer (it
 * should have gone direct).  Otherwise moves the conn to RX_GET_FMB with
 * the body size stashed in rx_nob_left/wanted. */
1279 ksocknal_fwd_parse (ksock_conn_t *conn)
1282 ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1283 ptl_nid_t src_nid = NTOH__u64 (conn->ksnc_hdr.src_nid);
1284 int body_len = NTOH__u32 (conn->ksnc_hdr.payload_length);
1285 char str[PTL_NALFMT_SIZE];
1286 char str2[PTL_NALFMT_SIZE];
1288 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
1289 src_nid, dest_nid, conn->ksnc_rx_nob_left);
1291 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
1292 LASSERT (conn->ksnc_rx_scheduled);
1294 if (body_len < 0) { /* length corrupt (overflow) */
1295 CERROR("dropping packet from "LPX64" (%s) for "LPX64" (%s): "
1296 "packet size %d illegal\n",
1297 src_nid, portals_nid2str(TCPNAL, src_nid, str),
1298 dest_nid, portals_nid2str(TCPNAL, dest_nid, str2),
1301 ksocknal_new_packet (conn, 0); /* on to new packet */
1305 if (!kpr_routing(&ksocknal_data.ksnd_router)) { /* not forwarding */
1306 CERROR("dropping packet from "LPX64" (%s) for "LPX64
1307 " (%s): not forwarding\n",
1308 src_nid, portals_nid2str(TCPNAL, src_nid, str),
1309 dest_nid, portals_nid2str(TCPNAL, dest_nid, str2));
1310 /* on to new packet (skip this one's body) */
1311 ksocknal_new_packet (conn, body_len);
1315 if (body_len > PTL_MTU) { /* too big to forward */
1316 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1317 "(%s): packet size %d too big\n",
1318 src_nid, portals_nid2str(TCPNAL, src_nid, str),
1319 dest_nid, portals_nid2str(TCPNAL, dest_nid, str2),
1321 /* on to new packet (skip this one's body) */
1322 ksocknal_new_packet (conn, body_len);
1326 /* should have gone direct */
1327 peer = ksocknal_get_peer (conn->ksnc_hdr.dest_nid);
1329 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1330 "(%s): target is a peer\n",
1331 src_nid, portals_nid2str(TCPNAL, src_nid, str),
1332 dest_nid, portals_nid2str(TCPNAL, dest_nid, str2));
1333 ksocknal_put_peer (peer); /* drop ref from get above */
1335 /* on to next packet (skip this one's body) */
1336 ksocknal_new_packet (conn, body_len);
1340 conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB; /* Getting FMB now */
1341 conn->ksnc_rx_nob_left = body_len; /* stash packet size */
1342 conn->ksnc_rx_nob_wanted = body_len; /* (no slop) */
/* Reset 'conn' to receive the next packet.  With nob_to_skip == 0, arm
 * the rx iov to read the next portals header into ksnc_hdr (state
 * RX_HEADER).  Otherwise set up iovs over a shared static slop buffer to
 * discard nob_to_skip bytes (state RX_SLOP); if the skip doesn't fit in
 * the iov space we'll be called again for the remainder.
 * NB the slop buffer is shared by all conns -- safe only because its
 * contents are discarded. */
1346 ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
1348 static char ksocknal_slop_buffer[4096];
1354 if (nob_to_skip == 0) { /* right at next packet boundary now */
1355 conn->ksnc_rx_started = 0;
1356 mb (); /* racing with timeout thread */
1358 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
1359 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
1360 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
1362 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1363 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
1364 conn->ksnc_rx_iov[0].iov_len = sizeof (ptl_hdr_t);
1365 conn->ksnc_rx_niov = 1;
1367 conn->ksnc_rx_kiov = NULL;
1368 conn->ksnc_rx_nkiov = 0;
1372 /* Set up to skip as much as possible now. If there's more left
1373 * (ran out of iov entries) we'll get called again */
1375 conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
1376 conn->ksnc_rx_nob_left = nob_to_skip;
1377 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
        /* fill iov entries with slop-buffer-sized chunks until done or full */
1382 nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));
1384 conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
1385 conn->ksnc_rx_iov[niov].iov_len = nob;
1390 } while (nob_to_skip != 0 && /* mustn't overflow conn's rx iov */
1391 niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));
1393 conn->ksnc_rx_niov = niov;
1394 conn->ksnc_rx_kiov = NULL;
1395 conn->ksnc_rx_nkiov = 0;
1396 conn->ksnc_rx_nob_wanted = skipped;
/* Drive one connection's receive state machine: read available data,
 * then act on the rx state (header parse, body delivery via lib_finalize,
 * slop skipping, or packet forwarding through an FMB).  Called from the
 * scheduler with the conn dequeued and no sched lock held.
 * Returns 0 when progress stalls (come back later) or a -ve errno after
 * closing the conn.  NOTE(review): this numbered listing has gaps, so
 * labels (try_read/get_fmb), some braces and returns are not visible. */
1401 ksocknal_process_receive (ksock_conn_t *conn)
1406 LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1408 /* doesn't need a forwarding buffer */
1409 if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
/* get_fmb path: blocked forwarding — try to claim an idle fwd buffer */
1413 fmb = ksocknal_get_idle_fmb (conn);
1415 /* conn descheduled waiting for idle fmb */
1419 if (ksocknal_init_fmb (conn, fmb)) {
1420 /* packet forwarded */
1425 /* NB: sched lock NOT held */
1426 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
1427 conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
1428 conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
1429 conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
1431 LASSERT (conn->ksnc_rx_nob_wanted > 0);
1433 rc = ksocknal_receive(conn);
/* rc <= 0: EOF (rc == 0) or hard error — close conn and siblings */
1436 LASSERT (rc != -EAGAIN);
1439 CWARN ("[%p] EOF from "LPX64" ip %d.%d.%d.%d:%d\n",
1440 conn, conn->ksnc_peer->ksnp_nid,
1441 HIPQUAD(conn->ksnc_ipaddr),
1443 else if (!conn->ksnc_closing)
1444 CERROR ("[%p] Error %d on read from "LPX64
1445 " ip %d.%d.%d.%d:%d\n",
1446 conn, rc, conn->ksnc_peer->ksnp_nid,
1447 HIPQUAD(conn->ksnc_ipaddr),
1450 ksocknal_close_conn_and_siblings (conn, rc);
1451 return (rc == 0 ? -ESHUTDOWN : rc);
1454 if (conn->ksnc_rx_nob_wanted != 0) {
/* partial read — more wanted; resume on next data_ready */
1459 switch (conn->ksnc_rx_state) {
1460 case SOCKNAL_RX_HEADER:
1461 if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) &&
1462 NTOH__u64(conn->ksnc_hdr.dest_nid) !=
1463 ksocknal_lib.libnal_ni.ni_pid.nid) {
1464 /* This packet isn't for me */
1465 ksocknal_fwd_parse (conn);
1466 switch (conn->ksnc_rx_state) {
1467 case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
1468 return (0); /* => come back later */
1469 case SOCKNAL_RX_SLOP: /* skipping packet's body */
1470 goto try_read; /* => go read it */
1471 case SOCKNAL_RX_GET_FMB: /* forwarding */
1472 goto get_fmb; /* => go get a fwd msg buffer */
1479 /* sets wanted_len, iovs etc */
1480 rc = lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
1483 /* I just received garbage: give up on this conn */
1484 ksocknal_close_conn_and_siblings (conn, rc);
1488 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
1489 conn->ksnc_rx_state = SOCKNAL_RX_BODY;
1490 goto try_read; /* go read the payload */
1492 /* Fall through (completed packet for me) */
1494 case SOCKNAL_RX_BODY:
1495 /* payload all received */
1496 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_OK);
/* fall through to start the next packet */
1499 case SOCKNAL_RX_SLOP:
1500 /* starting new packet? */
1501 if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
1502 return (0); /* come back later */
1503 goto try_read; /* try to finish reading slop now */
1505 case SOCKNAL_RX_BODY_FWD:
1506 /* payload all received */
1507 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n",
1508 conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1509 NTOH__u64 (conn->ksnc_hdr.dest_nid),
1510 conn->ksnc_rx_nob_left);
1512 /* forward the packet. NB ksocknal_init_fmb() put fmb into
1513 * conn->ksnc_cookie */
1514 fmb = (ksock_fmb_t *)conn->ksnc_cookie;
1515 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1517 /* no slop in forwarded packets */
1518 LASSERT (conn->ksnc_rx_nob_left == 0);
1520 ksocknal_new_packet (conn, 0); /* on to next packet */
1521 return (0); /* (later) */
1529 return (-EINVAL); /* keep gcc happy */
/* lib_nal recv callback (iovec flavour): stash the lib_msg cookie and
 * point conn's rx iovs at the caller's memory descriptor so the payload
 * (mlen of rlen bytes, starting at 'offset') is read directly into it.
 * NOTE(review): listing has gaps — the trailing LASSERT's opening line
 * and the return are not visible here. */
1533 ksocknal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
1534 unsigned int niov, struct iovec *iov,
1535 size_t offset, size_t mlen, size_t rlen)
1537 ksock_conn_t *conn = (ksock_conn_t *)private;
1539 LASSERT (mlen <= rlen);
1540 LASSERT (niov <= PTL_MD_MAX_IOV);
1542 conn->ksnc_cookie = msg;
1543 conn->ksnc_rx_nob_wanted = mlen;
1544 conn->ksnc_rx_nob_left = rlen;
1546 conn->ksnc_rx_nkiov = 0;
1547 conn->ksnc_rx_kiov = NULL;
1548 conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
1549 conn->ksnc_rx_niov =
1550 lib_extract_iov(PTL_MD_MAX_IOV, conn->ksnc_rx_iov,
1551 niov, iov, offset, mlen);
/* sanity: extracted iovs must account for exactly nob_wanted bytes */
1554 lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1555 lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
/* lib_nal recv callback (kiov/page flavour): mirror of ksocknal_recv()
 * for page-based memory descriptors — payload is received straight into
 * the caller's kiov pages.  NOTE(review): listing has gaps — the
 * trailing LASSERT's opening line and the return are not visible. */
1561 ksocknal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
1562 unsigned int niov, ptl_kiov_t *kiov,
1563 size_t offset, size_t mlen, size_t rlen)
1565 ksock_conn_t *conn = (ksock_conn_t *)private;
1567 LASSERT (mlen <= rlen);
1568 LASSERT (niov <= PTL_MD_MAX_IOV);
1570 conn->ksnc_cookie = msg;
1571 conn->ksnc_rx_nob_wanted = mlen;
1572 conn->ksnc_rx_nob_left = rlen;
1574 conn->ksnc_rx_niov = 0;
1575 conn->ksnc_rx_iov = NULL;
1576 conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1577 conn->ksnc_rx_nkiov =
1578 lib_extract_kiov(PTL_MD_MAX_IOV, conn->ksnc_rx_kiov,
1579 niov, kiov, offset, mlen);
/* sanity: extracted kiovs must account for exactly nob_wanted bytes */
1582 lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1583 lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov);
/* Per-CPU scheduler daemon: loops servicing one scheduler's rx conns,
 * tx conns and (zero-copy builds) completed ZC transmits, one item per
 * pass for semi-fairness.  kss_lock is held across queue manipulation
 * and dropped around the actual process_receive/process_transmit work.
 * Exits when ksnd_shuttingdown is set.  NOTE(review): this numbered
 * listing has gaps — some braces, 'did_something' updates and the
 * SOCKNAL_ZC conditionals are not visible here. */
int ksocknal_scheduler (void *arg)
1590 ksock_sched_t *sched = (ksock_sched_t *)arg;
1593 unsigned long flags;
1596 int id = sched - ksocknal_data.ksnd_schedulers;
1599 snprintf (name, sizeof (name),"ksocknald_%02d", id);
1600 kportal_daemonize (name);
1601 kportal_blockallsigs ();
1603 #if (CONFIG_SMP && CPU_AFFINITY)
/* bind this daemon to CPU 'id' if that CPU is online (API differs
 * between 2.4 and 2.5+ kernels, hence the two assignment forms) */
1604 if ((cpu_online_map & (1 << id)) != 0) {
1606 current->cpus_allowed = (1 << id);
1608 set_cpus_allowed (current, 1<<id);
1611 CERROR ("Can't set CPU affinity for %s\n", name);
1613 #endif /* CONFIG_SMP && CPU_AFFINITY */
1615 spin_lock_irqsave (&sched->kss_lock, flags);
1617 while (!ksocknal_data.ksnd_shuttingdown) {
1618 int did_something = 0;
1620 /* Ensure I progress everything semi-fairly */
1622 if (!list_empty (&sched->kss_rx_conns)) {
1623 conn = list_entry(sched->kss_rx_conns.next,
1624 ksock_conn_t, ksnc_rx_list);
1625 list_del(&conn->ksnc_rx_list);
1627 LASSERT(conn->ksnc_rx_scheduled);
1628 LASSERT(conn->ksnc_rx_ready);
1630 /* clear rx_ready in case receive isn't complete.
1631 * Do it BEFORE we call process_recv, since
1632 * data_ready can set it any time after we release
1634 conn->ksnc_rx_ready = 0;
1635 spin_unlock_irqrestore(&sched->kss_lock, flags);
1637 rc = ksocknal_process_receive(conn);
1639 spin_lock_irqsave(&sched->kss_lock, flags);
1641 /* I'm the only one that can clear this flag */
1642 LASSERT(conn->ksnc_rx_scheduled);
1644 /* Did process_receive get everything it wanted? */
1646 conn->ksnc_rx_ready = 1;
1648 if (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP ||
1649 conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB) {
1650 /* Conn blocked for a forwarding buffer.
1651 * It will get queued for my attention when
1652 * one becomes available (and it might just
1653 * already have been!). Meanwhile my ref
1654 * on it stays put. */
1655 } else if (conn->ksnc_rx_ready) {
1656 /* reschedule for rx */
1657 list_add_tail (&conn->ksnc_rx_list,
1658 &sched->kss_rx_conns);
/* nothing more to read: drop schedule flag and scheduler's ref */
1660 conn->ksnc_rx_scheduled = 0;
1662 ksocknal_put_conn(conn);
1668 if (!list_empty (&sched->kss_tx_conns)) {
1669 conn = list_entry(sched->kss_tx_conns.next,
1670 ksock_conn_t, ksnc_tx_list);
1671 list_del (&conn->ksnc_tx_list);
1673 LASSERT(conn->ksnc_tx_scheduled);
1674 LASSERT(conn->ksnc_tx_ready);
1675 LASSERT(!list_empty(&conn->ksnc_tx_queue));
1677 tx = list_entry(conn->ksnc_tx_queue.next,
1678 ksock_tx_t, tx_list);
1679 /* dequeue now so empty list => more to send */
1680 list_del(&tx->tx_list);
1682 /* Clear tx_ready in case send isn't complete. Do
1683 * it BEFORE we call process_transmit, since
1684 * write_space can set it any time after we release
1686 conn->ksnc_tx_ready = 0;
1687 spin_unlock_irqrestore (&sched->kss_lock, flags);
1689 rc = ksocknal_process_transmit(conn, tx);
1691 spin_lock_irqsave (&sched->kss_lock, flags);
1693 if (rc == -ENOMEM || rc == -EAGAIN) {
1694 /* Incomplete send: replace tx on HEAD of tx_queue */
1695 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
1697 /* Complete send; assume space for more */
1698 conn->ksnc_tx_ready = 1;
1701 if (rc == -ENOMEM) {
1702 /* Do nothing; after a short timeout, this
1703 * conn will be reposted on kss_tx_conns. */
1704 } else if (conn->ksnc_tx_ready &&
1705 !list_empty (&conn->ksnc_tx_queue)) {
1706 /* reschedule for tx */
1707 list_add_tail (&conn->ksnc_tx_list,
1708 &sched->kss_tx_conns);
/* queue drained: drop schedule flag and scheduler's ref */
1710 conn->ksnc_tx_scheduled = 0;
1712 ksocknal_put_conn (conn);
/* zero-copy build only: retire completed ZC transmits */
1718 if (!list_empty (&sched->kss_zctxdone_list)) {
1720 list_entry(sched->kss_zctxdone_list.next,
1721 ksock_tx_t, tx_list);
1724 list_del (&tx->tx_list);
1725 spin_unlock_irqrestore (&sched->kss_lock, flags);
1727 ksocknal_tx_done (tx, 1);
1729 spin_lock_irqsave (&sched->kss_lock, flags);
1732 if (!did_something || /* nothing to do */
1733 ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
1734 spin_unlock_irqrestore (&sched->kss_lock, flags);
1738 if (!did_something) { /* wait for something to do */
/* two wait variants: with/without the ZC-done list (SOCKNAL_ZC) */
1740 rc = wait_event_interruptible (sched->kss_waitq,
1741 ksocknal_data.ksnd_shuttingdown ||
1742 !list_empty(&sched->kss_rx_conns) ||
1743 !list_empty(&sched->kss_tx_conns) ||
1744 !list_empty(&sched->kss_zctxdone_list));
1746 rc = wait_event_interruptible (sched->kss_waitq,
1747 ksocknal_data.ksnd_shuttingdown ||
1748 !list_empty(&sched->kss_rx_conns) ||
1749 !list_empty(&sched->kss_tx_conns));
1755 spin_lock_irqsave (&sched->kss_lock, flags);
1759 spin_unlock_irqrestore (&sched->kss_lock, flags);
1760 ksocknal_thread_fini ();
/* Socket sk_data_ready callback: mark the conn rx-ready and, if it is
 * not already being progressed, queue it on its scheduler (taking an
 * extra conn ref for the scheduler) and wake the scheduler.  Takes the
 * global lock shared to interleave safely with connection teardown;
 * a NULL sk_user_data means we raced with ksocknal_terminate_conn and
 * the original (restored) callback is invoked instead. */
1765 ksocknal_data_ready (struct sock *sk, int n)
1767 unsigned long flags;
1769 ksock_sched_t *sched;
1772 /* interleave correctly with closing sockets... */
1773 read_lock (&ksocknal_data.ksnd_global_lock);
1775 conn = sk->sk_user_data;
1776 if (conn == NULL) { /* raced with ksocknal_terminate_conn */
1777 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
1778 sk->sk_data_ready (sk, n);
1780 sched = conn->ksnc_scheduler;
1782 spin_lock_irqsave (&sched->kss_lock, flags);
1784 conn->ksnc_rx_ready = 1;
1786 if (!conn->ksnc_rx_scheduled) { /* not being progressed */
1787 list_add_tail(&conn->ksnc_rx_list,
1788 &sched->kss_rx_conns);
1789 conn->ksnc_rx_scheduled = 1;
1790 /* extra ref for scheduler */
1791 atomic_inc (&conn->ksnc_refcount);
1793 wake_up (&sched->kss_waitq);
1796 spin_unlock_irqrestore (&sched->kss_lock, flags);
1799 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Socket sk_write_space callback: when enough send-buffer space opens up
 * (>= low-water mark), clear SOCK_NOSPACE, mark the conn tx-ready and —
 * if it has queued packets and is not already scheduled — queue it on
 * its scheduler (extra conn ref) and wake the scheduler.  As with
 * data_ready, a NULL sk_user_data means teardown raced us and the
 * restored callback is chained instead. */
1805 ksocknal_write_space (struct sock *sk)
1807 unsigned long flags;
1809 ksock_sched_t *sched;
1811 /* interleave correctly with closing sockets... */
1812 read_lock (&ksocknal_data.ksnd_global_lock);
1814 conn = sk->sk_user_data;
1816 CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
1817 sk, tcp_wspace(sk), SOCKNAL_TX_LOW_WATER(sk), conn,
1818 (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
1819 " ready" : " blocked"),
1820 (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
1821 " scheduled" : " idle"),
1822 (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ?
1823 " empty" : " queued"));
1825 if (conn == NULL) { /* raced with ksocknal_terminate_conn */
1826 LASSERT (sk->sk_write_space != &ksocknal_write_space);
1827 sk->sk_write_space (sk);
1829 read_unlock (&ksocknal_data.ksnd_global_lock);
1833 if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
1834 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
1836 sched = conn->ksnc_scheduler;
1838 spin_lock_irqsave (&sched->kss_lock, flags);
1840 conn->ksnc_tx_ready = 1;
1842 if (!conn->ksnc_tx_scheduled && // not being progressed
1843 !list_empty(&conn->ksnc_tx_queue)){//packets to send
1844 list_add_tail (&conn->ksnc_tx_list,
1845 &sched->kss_tx_conns);
1846 conn->ksnc_tx_scheduled = 1;
1847 /* extra ref for scheduler */
1848 atomic_inc (&conn->ksnc_refcount);
1850 wake_up (&sched->kss_waitq);
1853 spin_unlock_irqrestore (&sched->kss_lock, flags);
1856 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Blocking helper: write exactly 'nob' bytes from 'buffer' to 'sock',
 * looping over sock_sendmsg() (with set_fs(KERNEL_DS) around the call —
 * the set_fs lines are elided from this listing) and advancing the
 * buffer by each partial write.  A zero return from sendmsg is treated
 * as connection abort.  NOTE(review): loop structure, iov/msg field
 * initialisers and the return are only partially visible here. */
1860 ksocknal_sock_write (struct socket *sock, void *buffer, int nob)
1863 mm_segment_t oldmm = get_fs();
1866 struct iovec iov = {
1870 struct msghdr msg = {
1875 .msg_control = NULL,
1876 .msg_controllen = 0,
1881 rc = sock_sendmsg (sock, &msg, iov.iov_len);
1888 CERROR ("Unexpected zero rc\n");
1889 return (-ECONNABORTED);
1892 buffer = ((char *)buffer) + rc;
/* Blocking helper: read exactly 'nob' bytes from 'sock' into 'buffer',
 * looping over sock_recvmsg() and advancing the buffer by each partial
 * read; EOF mid-read yields -ECONNABORTED.  Counterpart of
 * ksocknal_sock_write above.  NOTE(review): loop structure and field
 * initialisers are only partially visible in this gapped listing. */
1900 ksocknal_sock_read (struct socket *sock, void *buffer, int nob)
1903 mm_segment_t oldmm = get_fs();
1906 struct iovec iov = {
1910 struct msghdr msg = {
1915 .msg_control = NULL,
1916 .msg_controllen = 0,
1921 rc = sock_recvmsg (sock, &msg, iov.iov_len, 0);
1928 return (-ECONNABORTED);
1930 buffer = ((char *)buffer) + rc;
/* Exchange HELLO handshakes on a fresh connection: send our HELLO header
 * (protocol magic/version packed into hdr.dest_nid per protocol v0),
 * then read and validate the peer's — magic, version, type, zero
 * payload, NID match — and negotiate the connection type (an accepting
 * side with CONN_NONE adopts/complements the peer's requested type:
 * BULK_IN <-> BULK_OUT).  Outputs *nid (if previously PTL_NID_ANY),
 * *type and *incarnation.  NOTE(review): listing has gaps — error-path
 * returns and some braces are not visible. */
1938 ksocknal_hello (struct socket *sock, ptl_nid_t *nid, int *type,
1943 ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
1944 char ipbuf[PTL_NALFMT_SIZE];
1945 char ipbuf2[PTL_NALFMT_SIZE];
1947 LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
1949 memset (&hdr, 0, sizeof (hdr));
1950 hmv->magic = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
1951 hmv->version_major = __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR);
1952 hmv->version_minor = __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR);
1954 hdr.src_nid = __cpu_to_le64 (ksocknal_lib.libnal_ni.ni_pid.nid);
1955 hdr.type = __cpu_to_le32 (PTL_MSG_HELLO);
1957 hdr.msg.hello.type = __cpu_to_le32 (*type);
1958 hdr.msg.hello.incarnation =
1959 __cpu_to_le64 (ksocknal_data.ksnd_incarnation);
1961 /* Assume sufficient socket buffering for this message */
1962 rc = ksocknal_sock_write (sock, &hdr, sizeof (hdr));
1964 CERROR ("Error %d sending HELLO to "LPX64" %s\n",
1965 rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
/* read peer's magic/version (arrives first, as its hdr.dest_nid) */
1969 rc = ksocknal_sock_read (sock, hmv, sizeof (*hmv));
1971 CERROR ("Error %d reading HELLO from "LPX64" %s\n",
1972 rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
1976 if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
1977 CERROR ("Bad magic %#08x (%#08x expected) from "LPX64" %s\n",
1978 __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, *nid,
1979 portals_nid2str(SOCKNAL, *nid, ipbuf));
1983 if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
1984 hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
1985 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
1986 " from "LPX64" %s\n",
1987 __le16_to_cpu (hmv->version_major),
1988 __le16_to_cpu (hmv->version_minor),
1989 PORTALS_PROTO_VERSION_MAJOR,
1990 PORTALS_PROTO_VERSION_MINOR,
1991 *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
1995 #if (PORTALS_PROTO_VERSION_MAJOR != 0)
1996 # error "This code only understands protocol version 0.x"
1998 /* version 0 sends magic/version as the dest_nid of a 'hello' header,
1999 * so read the rest of it in now... */
2001 rc = ksocknal_sock_read (sock, hmv + 1, sizeof (hdr) - sizeof (*hmv));
2003 CERROR ("Error %d reading rest of HELLO hdr from "LPX64" %s\n",
2004 rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
2008 /* ...and check we got what we expected */
2009 if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
2010 hdr.payload_length != __cpu_to_le32 (0)) {
2011 CERROR ("Expecting a HELLO hdr with 0 payload,"
2012 " but got type %d with %d payload from "LPX64" %s\n",
2013 __le32_to_cpu (hdr.type),
2014 __le32_to_cpu (hdr.payload_length), *nid,
2015 portals_nid2str(SOCKNAL, *nid, ipbuf));
2019 if (__le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
2020 CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
2024 if (*nid == PTL_NID_ANY) { /* don't know peer's nid yet */
2025 *nid = __le64_to_cpu(hdr.src_nid);
2026 } else if (*nid != __le64_to_cpu (hdr.src_nid)) {
2027 CERROR ("Connected to nid "LPX64" %s, but expecting "LPX64" %s\n",
2028 __le64_to_cpu (hdr.src_nid),
2029 portals_nid2str(SOCKNAL,
2030 __le64_to_cpu(hdr.src_nid),
2032 *nid, portals_nid2str(SOCKNAL, *nid, ipbuf2));
2036 if (*type == SOCKNAL_CONN_NONE) {
2037 /* I've accepted this connection; peer determines type */
2038 *type = __le32_to_cpu(hdr.msg.hello.type);
/* switch on peer's requested type (switch line elided in listing) */
2040 case SOCKNAL_CONN_ANY:
2041 case SOCKNAL_CONN_CONTROL:
2043 case SOCKNAL_CONN_BULK_IN:
2044 *type = SOCKNAL_CONN_BULK_OUT;
2046 case SOCKNAL_CONN_BULK_OUT:
2047 *type = SOCKNAL_CONN_BULK_IN;
2050 CERROR ("Unexpected type %d from "LPX64" %s\n",
2052 portals_nid2str(SOCKNAL, *nid, ipbuf));
2055 } else if (__le32_to_cpu(hdr.msg.hello.type) != SOCKNAL_CONN_NONE) {
2056 CERROR ("Mismatched types: me %d "LPX64" %s %d\n",
2057 *type, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf),
2058 __le32_to_cpu(hdr.msg.hello.type));
2062 *incarnation = __le64_to_cpu(hdr.msg.hello.incarnation);
/* Configure a freshly-created/accepted socket for NAL use: GFP_NOFS
 * allocation, SO_LINGER/TCP_LINGER2 disabled so close aborts promptly,
 * and (optionally) keepalive probing derived from the I/O timeout.
 * NOTE(review): this numbered listing has gaps — 'option' assignments
 * and error-path returns between the visible lines are elided.
 * FIX: two error messages mislabelled the option being set — the
 * TCP_LINGER2 failure reported "SO_LINGER2" and the TCP_KEEPCNT failure
 * reported "TCP_KEEPINTVL" (copy-paste); both now name the real option. */
2068 ksocknal_setup_sock (struct socket *sock)
2070 mm_segment_t oldmm = get_fs ();
2073 struct linger linger;
2075 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2076 sock->sk->sk_allocation = GFP_NOFS;
2078 sock->sk->allocation = GFP_NOFS;
2081 /* Ensure this socket aborts active sends immediately when we close
2085 linger.l_linger = 0;
2088 rc = sock_setsockopt (sock, SOL_SOCKET, SO_LINGER,
2089 (char *)&linger, sizeof (linger));
2092 CERROR ("Can't set SO_LINGER: %d\n", rc);
2098 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_LINGER2,
2099 (char *)&option, sizeof (option));
2102 CERROR ("Can't set TCP_LINGER2: %d\n", rc);
2106 #if SOCKNAL_USE_KEEPALIVES
2107 /* Keepalives: If 3/4 of the timeout elapses, start probing every
2108 * second until the timeout elapses. */
2110 option = (ksocknal_tunables.ksnd_io_timeout * 3) / 4;
2112 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE,
2113 (char *)&option, sizeof (option));
2116 CERROR ("Can't set TCP_KEEPIDLE: %d\n", rc);
2122 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPINTVL,
2123 (char *)&option, sizeof (option));
2126 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2130 option = ksocknal_tunables.ksnd_io_timeout / 4;
2132 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT,
2133 (char *)&option, sizeof (option));
2136 CERROR ("Can't set TCP_KEEPCNT: %d\n", rc);
2142 rc = sock_setsockopt (sock, SOL_SOCKET, SO_KEEPALIVE,
2143 (char *)&option, sizeof (option));
2146 CERROR ("Can't set SO_KEEPALIVE: %d\n", rc);
/* Actively connect to the peer described by 'route': create a TCP
 * socket, map it to an fd (for sock->file refcounting compatibility
 * with userspace-passed sockets), set send/receive timeouts, disable
 * Nagle, apply the route's optional buffer sizes, connect(), then hand
 * the socket to ksocknal_create_conn().  NOTE(review): listing has gaps
 * — error-path gotos/returns and the final cleanup are not visible. */
2154 ksocknal_connect_peer (ksock_route_t *route, int type)
2156 struct sockaddr_in peer_addr;
2157 mm_segment_t oldmm = get_fs();
2160 struct socket *sock;
2162 char ipbuf[PTL_NALFMT_SIZE];
2164 rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock);
2166 CERROR ("Can't create autoconnect socket: %d\n", rc);
2170 /* Ugh; have to map_fd for compatibility with sockets passed in
2171 * from userspace. And we actually need the sock->file refcounting
2172 * that this gives you :) */
2174 fd = sock_map_fd (sock);
2176 sock_release (sock);
2177 CERROR ("sock_map_fd error %d\n", fd);
2181 /* NB the fd now owns the ref on sock->file */
2182 LASSERT (sock->file != NULL);
2183 LASSERT (file_count(sock->file) == 1);
2185 /* Set the socket timeouts, so our connection attempt completes in
2187 tv.tv_sec = ksocknal_tunables.ksnd_io_timeout;
2191 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO,
2192 (char *)&tv, sizeof (tv));
2195 CERROR ("Can't set send timeout %d: %d\n",
2196 ksocknal_tunables.ksnd_io_timeout, rc);
2201 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO,
2202 (char *)&tv, sizeof (tv));
2205 CERROR ("Can't set receive timeout %d: %d\n",
2206 ksocknal_tunables.ksnd_io_timeout, rc);
/* disable Nagle — latency matters more than coalescing here */
2214 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_NODELAY,
2215 (char *)&option, sizeof (option));
2218 CERROR ("Can't disable nagle: %d\n", rc);
2223 if (route->ksnr_buffer_size != 0) {
2224 int option = route->ksnr_buffer_size;
2227 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
2228 (char *)&option, sizeof (option));
2231 CERROR ("Can't set send buffer %d: %d\n",
2232 route->ksnr_buffer_size, rc);
2237 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
2238 (char *)&option, sizeof (option));
2241 CERROR ("Can't set receive buffer %d: %d\n",
2242 route->ksnr_buffer_size, rc);
2247 memset (&peer_addr, 0, sizeof (peer_addr));
2248 peer_addr.sin_family = AF_INET;
2249 peer_addr.sin_port = htons (route->ksnr_port);
2250 peer_addr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
2252 rc = sock->ops->connect (sock, (struct sockaddr *)&peer_addr,
2253 sizeof (peer_addr), sock->file->f_flags);
2255 CERROR ("Error %d connecting to "LPX64" %s\n", rc,
2256 route->ksnr_peer->ksnp_nid,
2257 portals_nid2str(SOCKNAL,
2258 route->ksnr_peer->ksnp_nid,
2263 rc = ksocknal_create_conn (route, sock, route->ksnr_irq_affinity, type);
2265 /* Take an extra ref on sock->file to compensate for the
2266 * upcoming close which will lose fd's ref on it. */
2267 get_file (sock->file);
/* Attempt the connection(s) a route has pending: pick the first
 * connection type flagged in ksnr_connecting and try to connect.  On
 * failure, back off (double the retry interval up to the max), demote
 * the route to the tail of the peer's route list, and if no other route
 * is still connecting, fail all of the peer's queued packets (moved to
 * a local zombie list and completed outside the lock).
 * NOTE(review): this numbered listing has gaps — some braces, gotos and
 * the success path's detail are elided.
 * FIX: the "Deleting packet" CERROR formatted the packet's *source* NID
 * into the buffer printed for the *destination* NID (copy-paste of
 * src_nid); it now formats dest_nid. */
2276 ksocknal_autoconnect (ksock_route_t *route)
2278 LIST_HEAD (zombies);
2281 unsigned long flags;
2286 for (type = 0; type < SOCKNAL_CONN_NTYPES; type++)
2287 if ((route->ksnr_connecting & (1 << type)) != 0)
2289 LASSERT (type < SOCKNAL_CONN_NTYPES);
2291 rc = ksocknal_connect_peer (route, type);
2296 /* successfully autoconnected: create_conn did the
2297 * route/conn binding and scheduled any blocked packets */
2299 if (route->ksnr_connecting == 0) {
2300 /* No more connections required */
2305 /* Connection attempt failed */
2307 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
2309 peer = route->ksnr_peer;
2310 route->ksnr_connecting = 0;
2312 /* This is a retry rather than a new connection */
2313 LASSERT (route->ksnr_retry_interval != 0);
2314 route->ksnr_timeout = jiffies + route->ksnr_retry_interval;
2315 route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2,
2316 SOCKNAL_MAX_RECONNECT_INTERVAL);
2318 if (!list_empty (&peer->ksnp_tx_queue) &&
2319 ksocknal_find_connecting_route_locked (peer) == NULL) {
2320 LASSERT (list_empty (&peer->ksnp_conns));
2322 /* None of the connections that the blocked packets are
2323 * waiting for have been successful. Complete them now... */
2325 tx = list_entry (peer->ksnp_tx_queue.next,
2326 ksock_tx_t, tx_list);
2327 list_del (&tx->tx_list);
2328 list_add_tail (&tx->tx_list, &zombies);
2329 } while (!list_empty (&peer->ksnp_tx_queue));
2332 /* make this route least-favourite for re-selection */
2333 if (!route->ksnr_deleted) {
2334 list_del(&route->ksnr_list);
2335 list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
2338 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
2340 while (!list_empty (&zombies)) {
2341 char ipbuf[PTL_NALFMT_SIZE];
2342 char ipbuf2[PTL_NALFMT_SIZE];
2343 tx = list_entry (zombies.next, ksock_tx_t, tx_list);
2345 CERROR ("Deleting packet type %d len %d ("LPX64" %s->"LPX64" %s)\n",
2346 NTOH__u32 (tx->tx_hdr->type),
2347 NTOH__u32 (tx->tx_hdr->payload_length),
2348 NTOH__u64 (tx->tx_hdr->src_nid),
2349 portals_nid2str(SOCKNAL,
2350 NTOH__u64(tx->tx_hdr->src_nid),
2352 NTOH__u64 (tx->tx_hdr->dest_nid),
2353 portals_nid2str(SOCKNAL,
2354 NTOH__u64(tx->tx_hdr->dest_nid),
2357 list_del (&tx->tx_list);
/* complete the tx with failure (asuccess == 0) */
2359 ksocknal_tx_done (tx, 0);
/* Autoconnect daemon: loop pulling routes off the global autoconnect
 * queue (under ksnd_autoconnectd_lock, dropped around the actual
 * connect attempt), calling ksocknal_autoconnect() on each and dropping
 * the queue's route ref; sleep interruptibly when the queue is empty.
 * Exits on ksnd_shuttingdown. */
2364 ksocknal_autoconnectd (void *arg)
2366 long id = (long)arg;
2368 unsigned long flags;
2369 ksock_route_t *route;
2372 snprintf (name, sizeof (name), "ksocknal_ad%02ld", id);
2373 kportal_daemonize (name);
2374 kportal_blockallsigs ();
2376 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2378 while (!ksocknal_data.ksnd_shuttingdown) {
2380 if (!list_empty (&ksocknal_data.ksnd_autoconnectd_routes)) {
2381 route = list_entry (ksocknal_data.ksnd_autoconnectd_routes.next,
2382 ksock_route_t, ksnr_connect_list);
2384 list_del (&route->ksnr_connect_list);
2385 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2387 ksocknal_autoconnect (route);
2388 ksocknal_put_route (route);
2390 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2394 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2396 rc = wait_event_interruptible (ksocknal_data.ksnd_autoconnectd_waitq,
2397 ksocknal_data.ksnd_shuttingdown ||
2398 !list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
2400 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2403 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2405 ksocknal_thread_fini ();
/* Scan a peer's connections for one that has timed out: either an
 * incomplete incoming message past its rx deadline, or queued/buffered
 * outgoing data past its tx deadline.  Returns the conn with an extra
 * ref taken (caller must ksocknal_put_conn), or NULL.  Caller holds
 * ksnd_global_lock shared.  NOTE(review): the returns inside each
 * branch are elided from this gapped listing. */
2410 ksocknal_find_timed_out_conn (ksock_peer_t *peer)
2412 /* We're called with a shared lock on ksnd_global_lock */
2414 struct list_head *ctmp;
2415 ksock_sched_t *sched;
2417 list_for_each (ctmp, &peer->ksnp_conns) {
2418 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
2419 sched = conn->ksnc_scheduler;
2421 /* Don't need the {get,put}connsock dance to deref ksnc_sock... */
2422 LASSERT (!conn->ksnc_closing);
2424 if (conn->ksnc_rx_started &&
2425 time_after_eq (jiffies, conn->ksnc_rx_deadline)) {
2426 /* Timed out incomplete incoming message */
2427 atomic_inc (&conn->ksnc_refcount);
2428 CERROR ("Timed out RX from "LPX64" %p %d.%d.%d.%d\n",
2429 peer->ksnp_nid, conn, HIPQUAD(conn->ksnc_ipaddr));
2433 if ((!list_empty (&conn->ksnc_tx_queue) ||
2434 conn->ksnc_sock->sk->sk_wmem_queued != 0) &&
2435 time_after_eq (jiffies, conn->ksnc_tx_deadline)) {
2436 /* Timed out messages queued for sending, or
2437 * messages buffered in the socket's send buffer */
2438 atomic_inc (&conn->ksnc_refcount);
2439 CERROR ("Timed out TX to "LPX64" %s%d %p %d.%d.%d.%d\n",
2441 list_empty (&conn->ksnc_tx_queue) ? "" : "Q ",
2442 conn->ksnc_sock->sk->sk_wmem_queued, conn,
2443 HIPQUAD(conn->ksnc_ipaddr));
/* Check one peer hash bucket for timed-out connections.  Holds the
 * global lock shared (the expected case finds nothing); when a timed-out
 * conn IS found the lock is dropped, the conn and its siblings are
 * closed, and the whole bucket is rescanned from the top, since the
 * peer list may have changed while unlocked. */
2452 ksocknal_check_peer_timeouts (int idx)
2454 struct list_head *peers = &ksocknal_data.ksnd_peers[idx];
2455 struct list_head *ptmp;
2460 /* NB. We expect to have a look at all the peers and not find any
2461 * connections to time out, so we just use a shared lock while we
2463 read_lock (&ksocknal_data.ksnd_global_lock);
2465 list_for_each (ptmp, peers) {
2466 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
2467 conn = ksocknal_find_timed_out_conn (peer);
2470 read_unlock (&ksocknal_data.ksnd_global_lock);
/* NOTE(review): "Timeout out" below looks like a typo for "Timed
 * out" — left as-is since it is a runtime log string */
2472 CERROR ("Timeout out conn->"LPX64" ip %d.%d.%d.%d:%d\n",
2474 HIPQUAD(conn->ksnc_ipaddr),
2476 ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT);
2478 /* NB we won't find this one again, but we can't
2479 * just proceed with the next peer, since we dropped
2480 * ksnd_global_lock and it might be dead already! */
2481 ksocknal_put_conn (conn);
/* goto back to the top of the bucket scan (elided in listing) */
2486 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Reaper daemon: terminates deathrow conns, destroys zombie conns,
 * reschedules conns that stalled with ENOMEM, and periodically sweeps a
 * proportion of the peer hash table for timeouts (sized so every conn
 * is checked ~n times per timeout interval).  Sleeps until the next
 * deadline or until woken.  Exits on ksnd_shuttingdown.
 * NOTE(review): gapped listing — 'nenomem_conns', 'timeout', 'n', 'p'
 * and some loop/brace lines are elided. */
2490 ksocknal_reaper (void *arg)
2493 unsigned long flags;
2495 ksock_sched_t *sched;
2496 struct list_head enomem_conns;
2501 unsigned long deadline = jiffies;
2503 kportal_daemonize ("ksocknal_reaper");
2504 kportal_blockallsigs ();
2506 INIT_LIST_HEAD(&enomem_conns);
2507 init_waitqueue_entry (&wait, current);
2509 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2511 while (!ksocknal_data.ksnd_shuttingdown) {
2513 if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) {
2514 conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next,
2515 ksock_conn_t, ksnc_list);
2516 list_del (&conn->ksnc_list);
2518 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2520 ksocknal_terminate_conn (conn);
2521 ksocknal_put_conn (conn);
2523 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2527 if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) {
2528 conn = list_entry (ksocknal_data.ksnd_zombie_conns.next,
2529 ksock_conn_t, ksnc_list);
2530 list_del (&conn->ksnc_list);
2532 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2534 ksocknal_destroy_conn (conn);
2536 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
/* grab the whole enomem list in one splice under the lock */
2540 if (!list_empty (&ksocknal_data.ksnd_enomem_conns)) {
2541 list_add(&enomem_conns, &ksocknal_data.ksnd_enomem_conns);
2542 list_del_init(&ksocknal_data.ksnd_enomem_conns);
2545 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2547 /* reschedule all the connections that stalled with ENOMEM... */
2549 while (!list_empty (&enomem_conns)) {
2550 conn = list_entry (enomem_conns.next,
2551 ksock_conn_t, ksnc_tx_list);
2552 list_del (&conn->ksnc_tx_list);
2554 sched = conn->ksnc_scheduler;
2556 spin_lock_irqsave (&sched->kss_lock, flags);
2558 LASSERT (conn->ksnc_tx_scheduled);
2559 conn->ksnc_tx_ready = 1;
2560 list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
2561 wake_up (&sched->kss_waitq);
2563 spin_unlock_irqrestore (&sched->kss_lock, flags);
2567 /* careful with the jiffy wrap... */
2568 while ((timeout = (int)(deadline - jiffies)) <= 0) {
2571 int chunk = ksocknal_data.ksnd_peer_hash_size;
2573 /* Time to check for timeouts on a few more peers: I do
2574 * checks every 'p' seconds on a proportion of the peer
2575 * table and I need to check every connection 'n' times
2576 * within a timeout interval, to ensure I detect a
2577 * timeout on any connection within (n+1)/n times the
2578 * timeout interval. */
2580 if (ksocknal_tunables.ksnd_io_timeout > n * p)
2581 chunk = (chunk * n * p) /
2582 ksocknal_tunables.ksnd_io_timeout;
2586 for (i = 0; i < chunk; i++) {
2587 ksocknal_check_peer_timeouts (peer_index);
2588 peer_index = (peer_index + 1) %
2589 ksocknal_data.ksnd_peer_hash_size;
2595 if (nenomem_conns != 0) {
2596 /* Reduce my timeout if I rescheduled ENOMEM conns.
2597 * This also prevents me getting woken immediately
2598 * if any go back on my enomem list. */
2599 timeout = SOCKNAL_ENOMEM_RETRY;
2601 ksocknal_data.ksnd_reaper_waketime = jiffies + timeout;
2603 set_current_state (TASK_INTERRUPTIBLE);
2604 add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2606 if (!ksocknal_data.ksnd_shuttingdown &&
2607 list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
2608 list_empty (&ksocknal_data.ksnd_zombie_conns))
2609 schedule_timeout (timeout);
2611 set_current_state (TASK_RUNNING);
2612 remove_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2614 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2617 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2619 ksocknal_thread_fini ();
2623 lib_nal_t ksocknal_lib = {
2624 libnal_data: &ksocknal_data, /* NAL private data */
2625 libnal_send: ksocknal_send,
2626 libnal_send_pages: ksocknal_send_pages,
2627 libnal_recv: ksocknal_recv,
2628 libnal_recv_pages: ksocknal_recv_pages,
2629 libnal_dist: ksocknal_dist