1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5 * Author: Zach Brown <zab@zabbo.net>
6 * Author: Peter J. Braam <braam@clusterfs.com>
7 * Author: Phil Schwan <phil@clusterfs.com>
8 * Author: Eric Barton <eric@bartonsoftware.com>
10 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
12 * Portals is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Portals is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Portals; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
28 # include <linux/syscalls.h>
32 * LIB functions follow
/* Distance callback for the LIB layer: a NID equal to our own is distance 0.
 * NOTE(review): body is truncated in this extract — confirm against full source. */
36 ksocknal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
38 /* I would guess that if ksocknal_get_peer (nid) == NULL,
39 and we're not routing, then 'nid' is very distant :) */
40 if (nal->libnal_ni.ni_pid.nid == nid) {
/* Release a local tx descriptor: drop the active-ltx counter and free the
 * buffer using the size recorded at allocation time (ltx_desc_size). */
50 ksocknal_free_ltx (ksock_ltx_t *ltx)
52 atomic_dec(&ksocknal_data.ksnd_nactive_ltxs);
53 PORTAL_FREE(ltx, ltx->ltx_desc_size);
/* Map a kernel virtual address to its struct page, for zero-copy sends.
 * Handles vmalloc, pkmap (highmem) and lowmem direct-mapped ranges.
 * Only compiled when both zero-copy and vaddr zero-copy are enabled. */
56 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
58 ksocknal_kvaddr_to_page (unsigned long vaddr)
62 if (vaddr >= VMALLOC_START &&
64 page = vmalloc_to_page ((void *)vaddr);
66 else if (vaddr >= PKMAP_BASE &&
67 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
68 page = vmalloc_to_page ((void *)vaddr);
69 /* in 2.4 ^ just walks the page tables */
72 page = virt_to_page (vaddr);
/* Send as much of tx's mapped-iov fragments as the socket will take.
 * Tries zero-copy (tcp_sendpage_zccd) when enabled and the NIC supports
 * SG + checksum offload; otherwise copies the iovs to a scratch array
 * (socket ops may consume/modify them) and uses sock_sendmsg().
 * On partial send, advances tx_iov/tx_niov past the bytes written. */
83 ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
85 struct socket *sock = conn->ksnc_sock;
86 struct iovec *iov = tx->tx_iov;
87 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
88 unsigned long vaddr = (unsigned long)iov->iov_base
89 int offset = vaddr & (PAGE_SIZE - 1);
90 int zcsize = MIN (iov->iov_len, PAGE_SIZE - offset);
96 /* NB we can't trust socket ops to either consume our iovs
97 * or leave them alone. */
98 LASSERT (tx->tx_niov > 0);
100 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
101 if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
102 (sock->sk->route_caps & NETIF_F_SG) &&
103 (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
104 (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
105 int msgflg = MSG_DONTWAIT;
107 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
108 (void *)vaddr, page, page_address(page), offset, zcsize);
/* More data follows (queued txs or an incomplete tx): tell TCP so with
 * MSG_MORE — presumably set on the elided line below; confirm in full source */
110 if (!list_empty (&conn->ksnc_tx_queue) ||
111 zcsize < tx->tx_resid)
114 rc = tcp_sendpage_zccd(sock, page, offset, zcsize, msgflg, &tx->tx_zccd);
118 #if SOCKNAL_SINGLE_FRAG_TX
119 struct iovec scratch;
120 struct iovec *scratchiov = &scratch;
123 struct iovec *scratchiov = conn->ksnc_tx_scratch_iov;
124 int niov = tx->tx_niov;
126 struct msghdr msg = {
129 .msg_iov = scratchiov,
133 .msg_flags = MSG_DONTWAIT
135 mm_segment_t oldmm = get_fs();
/* Copy iovs to scratch; the socket may consume them in place. */
138 for (nob = i = 0; i < niov; i++) {
139 scratchiov[i] = tx->tx_iov[i];
140 nob += scratchiov[i].iov_len;
143 if (!list_empty(&conn->ksnc_tx_queue) ||
145 msg.msg_flags |= MSG_MORE;
148 rc = sock_sendmsg(sock, &msg, nob);
152 if (rc <= 0) /* sent nothing? */
156 LASSERT (nob <= tx->tx_resid);
/* Consume the bytes actually sent from the front of the iov list. */
161 LASSERT (tx->tx_niov > 0);
163 if (nob < iov->iov_len) {
164 iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
/* Send as much of tx's page (kiov) fragments as the socket will take.
 * Tries zero-copy (tcp_sendpage_zccd) when the frag is large enough and the
 * NIC supports SG + checksum offload; otherwise kmaps each page into a
 * scratch iovec and uses sock_sendmsg().  On partial send, advances
 * tx_kiov/tx_nkiov past the bytes written.
 * FIX(review): the non-ZC path set MSG_DONTWAIT again (a no-op — msg_flags
 * is already initialised to MSG_DONTWAIT); the intent, as in
 * ksocknal_send_iov(), is MSG_MORE when further data will follow. */
178 ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
180 struct socket *sock = conn->ksnc_sock;
181 ptl_kiov_t *kiov = tx->tx_kiov;
185 /* NB we can't trust socket ops to either consume our iovs
186 * or leave them alone. */
187 LASSERT (tx->tx_niov == 0);
188 LASSERT (tx->tx_nkiov > 0);
191 if (kiov->kiov_len >= ksocknal_tunables.ksnd_zc_min_frag &&
192 (sock->sk->route_caps & NETIF_F_SG) &&
193 (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
194 struct page *page = kiov->kiov_page;
195 int offset = kiov->kiov_offset;
196 int fragsize = kiov->kiov_len;
197 int msgflg = MSG_DONTWAIT;
199 CDEBUG(D_NET, "page %p + offset %x for %d\n",
200 page, offset, kiov->kiov_len);
202 if (!list_empty(&conn->ksnc_tx_queue) ||
203 fragsize < tx->tx_resid)
206 rc = tcp_sendpage_zccd(sock, page, offset, fragsize, msgflg,
211 #if SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_RISK_KMAP_DEADLOCK
212 struct iovec scratch;
213 struct iovec *scratchiov = &scratch;
216 #warning "XXX risk of kmap deadlock on multiple frags..."
217 struct iovec *scratchiov = conn->ksnc_tx_scratch_iov;
218 int niov = tx->tx_nkiov;
220 struct msghdr msg = {
223 .msg_iov = scratchiov,
227 .msg_flags = MSG_DONTWAIT
229 mm_segment_t oldmm = get_fs();
/* kmap every page into the scratch iovec (unmapped again below). */
232 for (nob = i = 0; i < niov; i++) {
233 scratchiov[i].iov_base = kmap(kiov[i].kiov_page) +
235 nob += scratchiov[i].iov_len = kiov[i].kiov_len;
238 if (!list_empty(&conn->ksnc_tx_queue) ||
240 msg.msg_flags |= MSG_MORE; /* more to come: let TCP coalesce */
243 rc = sock_sendmsg(sock, &msg, nob);
246 for (i = 0; i < niov; i++)
247 kunmap(kiov[i].kiov_page);
250 if (rc <= 0) /* sent nothing? */
254 LASSERT (nob <= tx->tx_resid);
/* Consume the bytes actually sent from the front of the kiov list. */
258 LASSERT(tx->tx_nkiov > 0);
260 if (nob < kiov->kiov_len) {
261 kiov->kiov_offset += nob;
262 kiov->kiov_len -= nob;
266 nob -= kiov->kiov_len;
267 tx->tx_kiov = ++kiov;
/* Drive a tx to completion on conn: loop sending iov then kiov fragments
 * until tx_resid hits 0 or the socket pushes back.  Maintains the tx
 * deadline / keepalive bookkeeping from sk_wmem_queued, and converts a
 * memory-pressure EAGAIN into -ENOMEM so the caller retries via the reaper. */
275 ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
280 if (ksocknal_data.ksnd_stall_tx != 0) {
281 set_current_state (TASK_UNINTERRUPTIBLE);
282 schedule_timeout (ksocknal_data.ksnd_stall_tx * HZ);
285 LASSERT (tx->tx_resid != 0);
287 rc = ksocknal_getconnsock (conn);
289 LASSERT (conn->ksnc_closing);
/* Fault injection: force -ENOMEM on the first ksnd_enomem_tx sends. */
294 if (ksocknal_data.ksnd_enomem_tx > 0) {
296 ksocknal_data.ksnd_enomem_tx--;
298 } else if (tx->tx_niov != 0) {
299 rc = ksocknal_send_iov (conn, tx);
301 rc = ksocknal_send_kiov (conn, tx);
304 bufnob = conn->ksnc_sock->sk->sk_wmem_queued;
305 if (rc > 0) /* sent something? */
306 conn->ksnc_tx_bufnob += rc; /* account it */
308 if (bufnob < conn->ksnc_tx_bufnob) {
309 /* allocated send buffer bytes < computed; infer
310 * something got ACKed */
311 conn->ksnc_tx_deadline = jiffies +
312 ksocknal_tunables.ksnd_io_timeout * HZ;
313 conn->ksnc_peer->ksnp_last_alive = jiffies;
314 conn->ksnc_tx_bufnob = bufnob;
318 if (rc <= 0) { /* Didn't write anything? */
320 ksock_sched_t *sched;
322 if (rc == 0) /* some stacks return 0 instead of -EAGAIN */
328 /* Check if EAGAIN is due to memory pressure */
330 sched = conn->ksnc_scheduler;
331 spin_lock_irqsave(&sched->kss_lock, flags);
333 if (!test_bit(SOCK_NOSPACE, &conn->ksnc_sock->flags) &&
334 !conn->ksnc_tx_ready) {
335 /* SOCK_NOSPACE is set when the socket fills
336 * and cleared in the write_space callback
337 * (which also sets ksnc_tx_ready). If
338 * SOCK_NOSPACE and ksnc_tx_ready are BOTH
339 * zero, I didn't fill the socket and
340 * write_space won't reschedule me, so I
341 * return -ENOMEM to get my caller to retry
346 spin_unlock_irqrestore(&sched->kss_lock, flags);
350 /* socket's wmem_queued now includes 'rc' bytes */
351 atomic_sub (rc, &conn->ksnc_tx_nob);
354 } while (tx->tx_resid != 0);
356 ksocknal_putconnsock (conn);
/* Ask TCP to ACK immediately (TCP_QUICKACK) so the peer's zero-copy sends
 * complete promptly instead of waiting for a piggy-backed ACK. */
361 ksocknal_eager_ack (ksock_conn_t *conn)
364 mm_segment_t oldmm = get_fs();
365 struct socket *sock = conn->ksnc_sock;
367 /* Remind the socket to ACK eagerly. If I don't, the socket might
368 * think I'm about to send something it could piggy-back the ACK
369 * on, introducing delay in completing zero-copy sends in my
373 sock->ops->setsockopt (sock, SOL_TCP, TCP_QUICKACK,
374 (char *)&opt, sizeof (opt));
/* Receive into the connection's mapped-iov fragments via sock_recvmsg().
 * Copies the iovs to scratch first (socket ops may consume them), then
 * updates liveness/deadline state and advances rx_iov/rx_niov past the
 * bytes received. */
379 ksocknal_recv_iov (ksock_conn_t *conn)
381 #if SOCKNAL_SINGLE_FRAG_RX
382 struct iovec scratch;
383 struct iovec *scratchiov = &scratch;
386 struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
387 int niov = conn->ksnc_rx_niov;
389 struct iovec *iov = conn->ksnc_rx_iov;
390 struct msghdr msg = {
393 .msg_iov = scratchiov,
399 mm_segment_t oldmm = get_fs();
404 /* NB we can't trust socket ops to either consume our iovs
405 * or leave them alone. */
408 for (nob = i = 0; i < niov; i++) {
409 scratchiov[i] = iov[i];
410 nob += scratchiov[i].iov_len;
412 LASSERT (nob <= conn->ksnc_rx_nob_wanted);
415 rc = sock_recvmsg (conn->ksnc_sock, &msg, nob, MSG_DONTWAIT);
416 /* NB this is just a boolean..........................^ */
422 /* received something... */
425 conn->ksnc_peer->ksnp_last_alive = jiffies;
426 conn->ksnc_rx_deadline = jiffies +
427 ksocknal_tunables.ksnd_io_timeout * HZ;
428 mb(); /* order with setting rx_started */
429 conn->ksnc_rx_started = 1;
431 conn->ksnc_rx_nob_wanted -= nob;
432 conn->ksnc_rx_nob_left -= nob;
/* Consume the bytes received from the front of the rx iov list. */
435 LASSERT (conn->ksnc_rx_niov > 0);
437 if (nob < iov->iov_len) {
439 iov->iov_base = (void *)(((unsigned long)iov->iov_base) + nob);
444 conn->ksnc_rx_iov = ++iov;
445 conn->ksnc_rx_niov--;
/* Receive into the connection's page (kiov) fragments: kmap each page into
 * a scratch iovec, sock_recvmsg(), kunmap, then update liveness state and
 * advance rx_kiov/rx_nkiov past the bytes received. */
452 ksocknal_recv_kiov (ksock_conn_t *conn)
454 #if SOCKNAL_SINGLE_FRAG_RX || !SOCKNAL_RISK_KMAP_DEADLOCK
455 struct iovec scratch;
456 struct iovec *scratchiov = &scratch;
459 #warning "XXX risk of kmap deadlock on multiple frags..."
460 struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
461 int niov = conn->ksnc_rx_nkiov;
463 ptl_kiov_t *kiov = conn->ksnc_rx_kiov;
464 struct msghdr msg = {
467 .msg_iov = scratchiov,
473 mm_segment_t oldmm = get_fs();
478 LASSERT (conn->ksnc_rx_nkiov > 0);
480 /* NB we can't trust socket ops to either consume our iovs
481 * or leave them alone. */
482 for (nob = i = 0; i < niov; i++) {
483 scratchiov[i].iov_base = kmap(kiov[i].kiov_page) + kiov[i].kiov_offset;
484 nob += scratchiov[i].iov_len = kiov[i].kiov_len;
486 LASSERT (nob <= conn->ksnc_rx_nob_wanted);
489 rc = sock_recvmsg (conn->ksnc_sock, &msg, nob, MSG_DONTWAIT);
490 /* NB this is just a boolean.......................^ */
493 for (i = 0; i < niov; i++)
494 kunmap(kiov[i].kiov_page);
499 /* received something... */
502 conn->ksnc_peer->ksnp_last_alive = jiffies;
503 conn->ksnc_rx_deadline = jiffies +
504 ksocknal_tunables.ksnd_io_timeout * HZ;
505 mb(); /* order with setting rx_started */
506 conn->ksnc_rx_started = 1;
508 conn->ksnc_rx_nob_wanted -= nob;
509 conn->ksnc_rx_nob_left -= nob;
/* Consume the bytes received from the front of the rx kiov list. */
512 LASSERT (conn->ksnc_rx_nkiov > 0);
514 if (nob < kiov->kiov_len) {
515 kiov->kiov_offset += nob;
516 kiov->kiov_len -= nob;
520 nob -= kiov->kiov_len;
521 conn->ksnc_rx_kiov = ++kiov;
522 conn->ksnc_rx_nkiov--;
/* Top-level receive: dispatch to recv_iov/recv_kiov depending on which
 * fragment list is pending, and trigger an eager ACK when a message body
 * segment completes (for connection types configured in ksnd_eager_ack). */
529 ksocknal_receive (ksock_conn_t *conn)
531 /* Return 1 on success, 0 on EOF, < 0 on error.
532 * Caller checks ksnc_rx_nob_wanted to determine
533 * progress/completion. */
537 if (ksocknal_data.ksnd_stall_rx != 0) {
538 set_current_state (TASK_UNINTERRUPTIBLE);
539 schedule_timeout (ksocknal_data.ksnd_stall_rx * HZ);
542 rc = ksocknal_getconnsock (conn);
544 LASSERT (conn->ksnc_closing);
549 if (conn->ksnc_rx_niov != 0)
550 rc = ksocknal_recv_iov (conn);
552 rc = ksocknal_recv_kiov (conn);
555 /* error/EOF or partial receive */
558 } else if (rc == 0 && conn->ksnc_rx_started) {
559 /* EOF in the middle of a message */
565 /* Completed a fragment */
567 if (conn->ksnc_rx_nob_wanted == 0) {
568 /* Completed a message segment (header or payload) */
569 if ((ksocknal_tunables.ksnd_eager_ack & conn->ksnc_type) != 0 &&
570 (conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
571 conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD)) {
572 /* Remind the socket to ack eagerly... */
573 ksocknal_eager_ack(conn);
580 ksocknal_putconnsock (conn);
/* Zero-copy completion callback (may run in softirq context): queue the tx
 * on the scheduler's zctxdone list and wake the scheduler to finalize it —
 * finalization can't run here because of lock ordering. */
586 ksocknal_zc_callback (zccd_t *zcd)
588 ksock_tx_t *tx = KSOCK_ZCCD_2_TX(zcd);
589 ksock_sched_t *sched = tx->tx_conn->ksnc_scheduler;
593 /* Schedule tx for cleanup (can't do it now due to lock conflicts) */
595 spin_lock_irqsave (&sched->kss_lock, flags);
597 list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
598 wake_up (&sched->kss_waitq);
600 spin_unlock_irqrestore (&sched->kss_lock, flags);
/* Complete a tx: drop the conn ref held for async (zero-copy) completion,
 * then either notify the router (forwarded packet) or finalize the LIB
 * message and free the local tx descriptor.  Success iff tx_resid == 0. */
606 ksocknal_tx_done (ksock_tx_t *tx, int asynch)
611 if (tx->tx_conn != NULL) {
613 /* zero copy completion isn't always from
614 * process_transmit() so it needs to keep a ref on
617 ksocknal_put_conn (tx->tx_conn);
623 if (tx->tx_isfwd) { /* was a forwarded packet? */
624 kpr_fwd_done (&ksocknal_data.ksnd_router,
625 KSOCK_TX_2_KPR_FWD_DESC (tx),
626 (tx->tx_resid == 0) ? 0 : -ECONNABORTED);
632 ltx = KSOCK_TX_2_KSOCK_LTX (tx);
634 lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie,
635 (tx->tx_resid == 0) ? PTL_OK : PTL_FAIL);
637 ksocknal_free_ltx (ltx);
/* Called once a tx has been handed to the socket.  If zero-copy skbufs are
 * still in flight (zccd count > 1), take a conn ref for the deferred
 * ksocknal_tx_done and drop the initial zccd ref so the ZC callback can
 * fire; otherwise complete the tx synchronously. */
642 ksocknal_tx_launched (ksock_tx_t *tx)
645 if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
646 ksock_conn_t *conn = tx->tx_conn;
648 /* zccd skbufs are still in-flight. First take a ref on
649 * conn, so it hangs about for ksocknal_tx_done... */
650 atomic_inc (&conn->ksnc_refcount);
652 /* ...then drop the initial ref on zccd, so the zero copy
653 * callback can occur */
654 zccd_put (&tx->tx_zccd);
658 /* Any zero-copy-ness (if any) has completed; I can complete the
659 * transmit now, avoiding an extra schedule */
660 ksocknal_tx_done (tx, 0);
/* Scheduler-side transmit step: run ksocknal_transmit() and handle the
 * outcome — complete on success, park the conn on the reaper's enomem list
 * for retry on -ENOMEM, or close the conn (and siblings) on hard error. */
664 ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
669 rc = ksocknal_transmit (conn, tx);
671 CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
673 if (tx->tx_resid == 0) {
674 /* Sent everything OK */
677 ksocknal_tx_launched (tx);
/* Warn only when 'counter' is a power of two: exponential backoff. */
687 counter++; /* exponential backoff warnings */
688 if ((counter & (-counter)) == counter)
689 CWARN("%d ENOMEM tx %p\n", counter, conn);
691 /* Queue on ksnd_enomem_conns for retry after a timeout */
692 spin_lock_irqsave(&ksocknal_data.ksnd_reaper_lock, flags);
694 /* enomem list takes over scheduler's ref... */
695 LASSERT (conn->ksnc_tx_scheduled);
696 list_add_tail(&conn->ksnc_tx_list,
697 &ksocknal_data.ksnd_enomem_conns);
698 if (!time_after_eq(jiffies + SOCKNAL_ENOMEM_RETRY,
699 ksocknal_data.ksnd_reaper_waketime))
700 wake_up (&ksocknal_data.ksnd_reaper_waitq);
702 spin_unlock_irqrestore(&ksocknal_data.ksnd_reaper_lock, flags);
709 if (!conn->ksnc_closing)
710 CERROR("[%p] Error %d on write to "LPX64
711 " ip %d.%d.%d.%d:%d\n", conn, rc,
712 conn->ksnc_peer->ksnp_nid,
713 HIPQUAD(conn->ksnc_ipaddr),
716 ksocknal_close_conn_and_siblings (conn, rc);
717 ksocknal_tx_launched (tx);
/* Mark the route as connecting (all missing typed connections, or the
 * single 'any' connection) and hand it to the autoconnect daemon.
 * Caller must hold the global lock for writing. */
723 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
727 /* called holding write lock on ksnd_global_lock */
729 LASSERT (!route->ksnr_deleted);
730 LASSERT ((route->ksnr_connected & (1 << SOCKNAL_CONN_ANY)) == 0);
731 LASSERT ((route->ksnr_connected & KSNR_TYPED_ROUTES) != KSNR_TYPED_ROUTES);
732 LASSERT (route->ksnr_connecting == 0);
734 if (ksocknal_tunables.ksnd_typed_conns)
735 route->ksnr_connecting =
736 KSNR_TYPED_ROUTES & ~route->ksnr_connected;
738 route->ksnr_connecting = (1 << SOCKNAL_CONN_ANY);
740 atomic_inc (&route->ksnr_refcount); /* extra ref for asynchd */
742 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
744 list_add_tail (&route->ksnr_connect_list,
745 &ksocknal_data.ksnd_autoconnectd_routes);
746 wake_up (&ksocknal_data.ksnd_autoconnectd_waitq);
748 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
/* Resolve the peer to send tx to: the NID itself, or — when not directly
 * known — the gateway returned by the router.  Returns NULL (with an error
 * logged) when neither a direct peer nor a routable gateway exists. */
752 ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
754 char ipbuf[PTL_NALFMT_SIZE];
755 ptl_nid_t target_nid;
757 ksock_peer_t *peer = ksocknal_find_peer_locked (nid);
763 CERROR ("Can't send packet to "LPX64
764 " %s: routed target is not a peer\n",
765 nid, portals_nid2str(SOCKNAL, nid, ipbuf));
769 rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, tx->tx_nob,
772 CERROR ("Can't route to "LPX64" %s: router error %d\n",
773 nid, portals_nid2str(SOCKNAL, nid, ipbuf), rc);
777 peer = ksocknal_find_peer_locked (target_nid);
781 CERROR ("Can't send packet to "LPX64" %s: no peer entry\n",
782 target_nid, portals_nid2str(SOCKNAL, target_nid, ipbuf));
/* Pick the best established connection on 'peer' for tx: prefer a
 * type-appropriate conn (bulk vs control, chosen by tx size against
 * ksnd_min_bulk), falling back to the least-loaded untyped conn.  Load is
 * queued-not-yet-sent bytes plus the socket's send buffer backlog.  With
 * SOCKNAL_ROUND_ROBIN, rotate the chosen conn to the list tail. */
787 ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer)
789 struct list_head *tmp;
790 ksock_conn_t *typed = NULL;
792 ksock_conn_t *fallback = NULL;
796 list_for_each (tmp, &peer->ksnp_conns) {
797 ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
798 #if SOCKNAL_ROUND_ROBIN
801 int nob = atomic_read(&c->ksnc_tx_nob) +
802 c->ksnc_sock->sk->sk_wmem_queued;
804 LASSERT (!c->ksnc_closing);
806 if (fallback == NULL || nob < fnob) {
811 if (!ksocknal_tunables.ksnd_typed_conns)
814 switch (c->ksnc_type) {
817 case SOCKNAL_CONN_ANY:
819 case SOCKNAL_CONN_BULK_IN:
821 case SOCKNAL_CONN_BULK_OUT:
822 if (tx->tx_nob < ksocknal_tunables.ksnd_min_bulk)
825 case SOCKNAL_CONN_CONTROL:
826 if (tx->tx_nob >= ksocknal_tunables.ksnd_min_bulk)
831 if (typed == NULL || nob < tnob) {
837 /* prefer the typed selection */
838 conn = (typed != NULL) ? typed : fallback;
840 #if SOCKNAL_ROUND_ROBIN
842 /* round-robin all else being equal */
843 list_del (&conn->ksnc_list);
844 list_add_tail (&conn->ksnc_list, &peer->ksnp_conns);
/* Queue tx on conn's send queue and, if the conn is writable but not yet
 * scheduled, put it on the scheduler's tx list (taking a conn ref) and wake
 * the scheduler.  The first packet on an empty, drained socket (re)starts
 * the tx deadline timer.  Caller holds the global lock. */
851 ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
854 ksock_sched_t *sched = conn->ksnc_scheduler;
856 /* called holding global lock (read or irq-write) and caller may
857 * not have dropped this lock between finding conn and calling me,
858 * so we don't need the {get,put}connsock dance to deref
860 LASSERT(!conn->ksnc_closing);
861 LASSERT(tx->tx_resid == tx->tx_nob);
863 CDEBUG (D_NET, "Sending to "LPX64" ip %d.%d.%d.%d:%d\n",
864 conn->ksnc_peer->ksnp_nid,
865 HIPQUAD(conn->ksnc_ipaddr),
868 atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
872 zccd_init (&tx->tx_zccd, ksocknal_zc_callback);
873 /* NB this sets 1 ref on zccd, so the callback can only occur after
874 * I've released this ref. */
876 spin_lock_irqsave (&sched->kss_lock, flags);
878 if (list_empty(&conn->ksnc_tx_queue) &&
879 conn->ksnc_sock->sk->sk_wmem_queued == 0) {
880 /* First packet starts the timeout */
881 conn->ksnc_tx_deadline = jiffies +
882 ksocknal_tunables.ksnd_io_timeout * HZ;
883 conn->ksnc_tx_bufnob = 0;
884 mb(); /* order with adding to tx_queue */
887 list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
889 if (conn->ksnc_tx_ready && /* able to send */
890 !conn->ksnc_tx_scheduled) { /* not scheduled to send */
891 /* +1 ref for scheduler */
892 atomic_inc (&conn->ksnc_refcount);
893 list_add_tail (&conn->ksnc_tx_list,
894 &sched->kss_tx_conns);
895 conn->ksnc_tx_scheduled = 1;
896 wake_up (&sched->kss_waitq);
899 spin_unlock_irqrestore (&sched->kss_lock, flags);
/* Return the first route on 'peer' that still needs a connection attempt:
 * not fully connected (typed or untyped), not already connecting, and past
 * its retry-backoff timeout.  Caller holds the global lock. */
903 ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
905 struct list_head *tmp;
906 ksock_route_t *route;
909 list_for_each (tmp, &peer->ksnp_routes) {
910 route = list_entry (tmp, ksock_route_t, ksnr_list);
911 bits = route->ksnr_connected;
913 /* All typed connections established? */
914 if ((bits & KSNR_TYPED_ROUTES) == KSNR_TYPED_ROUTES)
917 /* Untyped connection established? */
918 if ((bits & (1 << SOCKNAL_CONN_ANY)) != 0)
921 /* connection being established? */
922 if (route->ksnr_connecting != 0)
925 /* too soon to retry this guy? */
926 if (!time_after_eq (jiffies, route->ksnr_timeout))
/* Return a route on 'peer' with a connection attempt currently in
 * progress, or NULL.  Caller holds the global lock. */
936 ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
938 struct list_head *tmp;
939 ksock_route_t *route;
941 list_for_each (tmp, &peer->ksnp_routes) {
942 route = list_entry (tmp, ksock_route_t, ksnr_list);
944 if (route->ksnr_connecting != 0)
/* Route tx to 'nid': find the target peer, launch any autoconnects the
 * peer's routes need, then either queue on an established conn, park the
 * tx on the peer's queue while a connection is being established, or fail
 * with -EHOSTUNREACH.  Takes the read lock first and upgrades to the write
 * lock only when autoconnect bookkeeping is required. */
952 ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
957 ksock_route_t *route;
960 /* Ensure the frags we've been given EXACTLY match the number of
961 * bytes we want to send. Many TCP/IP stacks disregard any total
962 * size parameters passed to them and just look at the frags.
964 * We always expect at least 1 mapped fragment containing the
965 * complete portals header. */
966 LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) +
967 lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
968 LASSERT (tx->tx_niov >= 1);
969 LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t));
971 CDEBUG (D_NET, "packet %p type %d, nob %d niov %d nkiov %d\n",
972 tx, ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type,
973 tx->tx_nob, tx->tx_niov, tx->tx_nkiov);
975 tx->tx_conn = NULL; /* only set when assigned a conn */
976 tx->tx_resid = tx->tx_nob;
977 tx->tx_hdr = (ptl_hdr_t *)tx->tx_iov[0].iov_base;
979 g_lock = &ksocknal_data.ksnd_global_lock;
980 #if !SOCKNAL_ROUND_ROBIN
/* Fast path: read lock suffices if nothing needs connecting. */
983 peer = ksocknal_find_target_peer_locked (tx, nid);
985 read_unlock (g_lock);
986 return (-EHOSTUNREACH);
989 if (ksocknal_find_connectable_route_locked(peer) == NULL) {
990 conn = ksocknal_find_conn_locked (tx, peer);
992 /* I've got no autoconnect routes that need to be
993 * connecting and I do have an actual connection... */
994 ksocknal_queue_tx_locked (tx, conn);
995 read_unlock (g_lock);
1000 /* I'll need a write lock... */
1001 read_unlock (g_lock);
1003 write_lock_irqsave(g_lock, flags);
/* NB peer must be re-looked-up: it may have gone while unlocked. */
1005 peer = ksocknal_find_target_peer_locked (tx, nid);
1007 write_unlock_irqrestore(g_lock, flags);
1008 return (-EHOSTUNREACH);
1012 /* launch any/all autoconnections that need it */
1013 route = ksocknal_find_connectable_route_locked (peer);
1017 ksocknal_launch_autoconnect_locked (route);
1020 conn = ksocknal_find_conn_locked (tx, peer);
1022 /* Connection exists; queue message on it */
1023 ksocknal_queue_tx_locked (tx, conn);
1024 write_unlock_irqrestore (g_lock, flags);
1028 route = ksocknal_find_connecting_route_locked (peer);
1029 if (route != NULL) {
1030 /* At least 1 connection is being established; queue the
1032 list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
1033 write_unlock_irqrestore (g_lock, flags);
1037 write_unlock_irqrestore (g_lock, flags);
1038 return (-EHOSTUNREACH);
/* Common send path for ksocknal_send()/ksocknal_send_pages(): allocate a
 * local tx descriptor (atomically when in interrupt or replying), copy the
 * portals header into it as mapped frag 0, attach the payload as either
 * mapped iovs or page kiovs, and launch the packet.
 * FIX(review): three occurrences of '&ltx->...' had been mangled to
 * '<x->...' (an '&lt' HTML-entity corruption); restored below. */
1042 ksocknal_sendmsg(lib_nal_t *nal,
1049 unsigned int payload_niov,
1050 struct iovec *payload_iov,
1051 ptl_kiov_t *payload_kiov,
1052 size_t payload_offset,
1059 /* NB 'private' is different depending on what we're sending.
1060 * Just ignore it... */
1062 CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
1063 " pid %d\n", payload_nob, payload_niov, nid , pid);
1065 LASSERT (payload_nob == 0 || payload_niov > 0);
1066 LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1068 /* It must be OK to kmap() if required */
1069 LASSERT (payload_kiov == NULL || !in_interrupt ());
1070 /* payload is either all vaddrs or all pages */
1071 LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1073 if (payload_iov != NULL)
1074 desc_size = offsetof(ksock_ltx_t, ltx_iov[1 + payload_niov]);
1076 desc_size = offsetof(ksock_ltx_t, ltx_kiov[payload_niov]);
1078 if (in_interrupt() ||
1079 type == PTL_MSG_ACK ||
1080 type == PTL_MSG_REPLY) {
1081 /* Can't block if in interrupt or responding to an incoming
1083 PORTAL_ALLOC_ATOMIC(ltx, desc_size);
1085 PORTAL_ALLOC(ltx, desc_size);
1089 CERROR("Can't allocate tx desc type %d size %d %s\n",
1090 type, desc_size, in_interrupt() ? "(intr)" : "");
1091 return (PTL_NO_SPACE);
1094 atomic_inc(&ksocknal_data.ksnd_nactive_ltxs);
1096 ltx->ltx_desc_size = desc_size;
1098 /* We always have 1 mapped frag for the header */
1099 ltx->ltx_tx.tx_iov = ltx->ltx_iov;
1100 ltx->ltx_iov[0].iov_base = &ltx->ltx_hdr;
1101 ltx->ltx_iov[0].iov_len = sizeof(*hdr);
1102 ltx->ltx_hdr = *hdr;
1104 ltx->ltx_private = private;
1105 ltx->ltx_cookie = cookie;
1107 ltx->ltx_tx.tx_isfwd = 0;
1108 ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_nob;
1110 if (payload_iov != NULL) {
1111 /* payload is all mapped */
1112 ltx->ltx_tx.tx_kiov = NULL;
1113 ltx->ltx_tx.tx_nkiov = 0;
1115 ltx->ltx_tx.tx_niov =
1116 1 + lib_extract_iov(payload_niov, &ltx->ltx_iov[1],
1117 payload_niov, payload_iov,
1118 payload_offset, payload_nob);
1120 /* payload is all pages */
1121 ltx->ltx_tx.tx_niov = 1;
1123 ltx->ltx_tx.tx_kiov = ltx->ltx_kiov;
1124 ltx->ltx_tx.tx_nkiov =
1125 lib_extract_kiov(payload_niov, ltx->ltx_kiov,
1126 payload_niov, payload_kiov,
1127 payload_offset, payload_nob);
1130 rc = ksocknal_launch_packet(&ltx->ltx_tx, nid);
1134 ksocknal_free_ltx(ltx);
/* LIB send entry point for mapped-iov payloads: thin wrapper that forwards
 * to ksocknal_sendmsg() with payload_kiov == NULL. */
1139 ksocknal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1140 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1141 unsigned int payload_niov, struct iovec *payload_iov,
1142 size_t payload_offset, size_t payload_len)
1144 return (ksocknal_sendmsg(nal, private, cookie,
1145 hdr, type, nid, pid,
1146 payload_niov, payload_iov, NULL,
1147 payload_offset, payload_len));
/* LIB send entry point for page (kiov) payloads: thin wrapper that forwards
 * to ksocknal_sendmsg() with payload_iov == NULL. */
1151 ksocknal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1152 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1153 unsigned int payload_niov, ptl_kiov_t *payload_kiov,
1154 size_t payload_offset, size_t payload_len)
1156 return (ksocknal_sendmsg(nal, private, cookie,
1157 hdr, type, nid, pid,
1158 payload_niov, NULL, payload_kiov,
1159 payload_offset, payload_len));
/* Router callback: forward a packet described by 'fwd'.  Builds a tx using
 * the router's scratch space (header as the single mapped iov, payload as
 * the router's kiovs) and launches it toward the gateway — or toward the
 * final target when we ourselves are the gateway (last hop). */
1163 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1165 ptl_nid_t nid = fwd->kprfd_gateway_nid;
1166 ksock_ftx_t *ftx = (ksock_ftx_t *)&fwd->kprfd_scratch;
1169 CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
1170 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
1172 /* I'm the gateway; must be the last hop */
1173 if (nid == ksocknal_lib.libnal_ni.ni_pid.nid)
1174 nid = fwd->kprfd_target_nid;
1176 /* setup iov for hdr */
1177 ftx->ftx_iov.iov_base = fwd->kprfd_hdr;
1178 ftx->ftx_iov.iov_len = sizeof(ptl_hdr_t);
1180 ftx->ftx_tx.tx_isfwd = 1; /* This is a forwarding packet */
1181 ftx->ftx_tx.tx_nob = sizeof(ptl_hdr_t) + fwd->kprfd_nob;
1182 ftx->ftx_tx.tx_niov = 1;
1183 ftx->ftx_tx.tx_iov = &ftx->ftx_iov;
1184 ftx->ftx_tx.tx_nkiov = fwd->kprfd_niov;
1185 ftx->ftx_tx.tx_kiov = fwd->kprfd_kiov;
1187 rc = ksocknal_launch_packet (&ftx->ftx_tx, nid);
1189 kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, rc);
/* Spawn a kernel thread running fn(arg) and bump the live-thread count
 * under the global write lock. */
1193 ksocknal_thread_start (int (*fn)(void *arg), void *arg)
1195 long pid = kernel_thread (fn, arg, 0);
1196 unsigned long flags;
1201 write_lock_irqsave(&ksocknal_data.ksnd_global_lock, flags);
1202 ksocknal_data.ksnd_nthreads++;
1203 write_unlock_irqrestore(&ksocknal_data.ksnd_global_lock, flags);
/* Called by each NAL thread on exit: decrement the live-thread count under
 * the global write lock (shutdown waits for this to reach zero). */
1208 ksocknal_thread_fini (void)
1210 unsigned long flags;
1212 write_lock_irqsave(&ksocknal_data.ksnd_global_lock, flags);
1213 ksocknal_data.ksnd_nthreads--;
1214 write_unlock_irqrestore(&ksocknal_data.ksnd_global_lock, flags);
/* Router completion callback for a forwarding buffer (fmb): log the
 * outcome, drop the peer ref taken at init, return the fmb to its pool's
 * idle list, and if a conn was blocked waiting for an fmb, reschedule it
 * on its scheduler to retry SOCKNAL_RX_GET_FMB. */
1218 ksocknal_fmb_callback (void *arg, int error)
1220 ksock_fmb_t *fmb = (ksock_fmb_t *)arg;
1221 ksock_fmb_pool_t *fmp = fmb->fmb_pool;
1222 ptl_hdr_t *hdr = &fmb->fmb_hdr;
1223 ksock_conn_t *conn = NULL;
1224 ksock_sched_t *sched;
1225 unsigned long flags;
1226 char ipbuf[PTL_NALFMT_SIZE];
1227 char ipbuf2[PTL_NALFMT_SIZE];
1230 CERROR("Failed to route packet from "
1231 LPX64" %s to "LPX64" %s: %d\n",
1232 le64_to_cpu(hdr->src_nid),
1233 portals_nid2str(SOCKNAL, le64_to_cpu(hdr->src_nid), ipbuf),
1234 le64_to_cpu(hdr->dest_nid),
1235 portals_nid2str(SOCKNAL, le64_to_cpu(hdr->dest_nid), ipbuf2),
1238 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
1239 le64_to_cpu(hdr->src_nid), le64_to_cpu(hdr->dest_nid));
1241 /* drop peer ref taken on init */
1242 ksocknal_put_peer (fmb->fmb_peer);
1244 spin_lock_irqsave (&fmp->fmp_lock, flags);
1246 list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);
1247 fmp->fmp_nactive_fmbs--;
1249 if (!list_empty (&fmp->fmp_blocked_conns)) {
1250 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next,
1251 ksock_conn_t, ksnc_rx_list);
1252 list_del (&conn->ksnc_rx_list);
1255 spin_unlock_irqrestore (&fmp->fmp_lock, flags);
1260 CDEBUG (D_NET, "Scheduling conn %p\n", conn);
1261 LASSERT (conn->ksnc_rx_scheduled);
1262 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
1264 conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
1266 sched = conn->ksnc_scheduler;
1268 spin_lock_irqsave (&sched->kss_lock, flags);
1270 list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
1271 wake_up (&sched->kss_waitq);
1273 spin_unlock_irqrestore (&sched->kss_lock, flags);
/* Grab an idle forwarding buffer from the small or large pool (chosen by
 * the payload size still to read).  If none is free, park the conn on the
 * pool's blocked list in SOCKNAL_RX_FMB_SLEEP; ksocknal_fmb_callback()
 * wakes it when an fmb is returned. */
1277 ksocknal_get_idle_fmb (ksock_conn_t *conn)
1279 int payload_nob = conn->ksnc_rx_nob_left;
1280 unsigned long flags;
1281 ksock_fmb_pool_t *pool;
1284 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1285 LASSERT (kpr_routing(&ksocknal_data.ksnd_router));
1287 if (payload_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
1288 pool = &ksocknal_data.ksnd_small_fmp;
1290 pool = &ksocknal_data.ksnd_large_fmp;
1292 spin_lock_irqsave (&pool->fmp_lock, flags);
1294 if (!list_empty (&pool->fmp_idle_fmbs)) {
1295 fmb = list_entry(pool->fmp_idle_fmbs.next,
1296 ksock_fmb_t, fmb_list);
1297 list_del (&fmb->fmb_list);
1298 pool->fmp_nactive_fmbs++;
1299 spin_unlock_irqrestore (&pool->fmp_lock, flags);
1304 /* deschedule until fmb free */
1306 conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
1308 list_add_tail (&conn->ksnc_rx_list,
1309 &pool->fmp_blocked_conns);
1311 spin_unlock_irqrestore (&pool->fmp_lock, flags);
/* Initialise a forwarding buffer for the packet just parsed on conn: take
 * a peer ref (pins the module until forwarding completes), copy the header
 * in, size the fmb's kiovs to the payload, and set up the router forward
 * descriptor.  A zero-payload packet is forwarded immediately; otherwise
 * the conn's rx kiovs are pointed into the fmb to read the body. */
1316 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
1318 int payload_nob = conn->ksnc_rx_nob_left;
1319 ptl_nid_t dest_nid = le64_to_cpu(conn->ksnc_hdr.dest_nid);
1321 int nob = payload_nob;
1323 LASSERT (conn->ksnc_rx_scheduled);
1324 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1325 LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
1326 LASSERT (payload_nob >= 0);
1327 LASSERT (payload_nob <= fmb->fmb_pool->fmp_buff_pages * PAGE_SIZE);
1328 LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
1329 LASSERT (fmb->fmb_kiov[0].kiov_offset == 0);
1331 /* Take a ref on the conn's peer to prevent module unload before
1332 * forwarding completes. */
1333 fmb->fmb_peer = conn->ksnc_peer;
1334 atomic_inc (&conn->ksnc_peer->ksnp_refcount);
1336 /* Copy the header we just read into the forwarding buffer. If
1337 * there's payload, start reading reading it into the buffer,
1338 * otherwise the forwarding buffer can be kicked off
1340 fmb->fmb_hdr = conn->ksnc_hdr;
1343 LASSERT (niov < fmb->fmb_pool->fmp_buff_pages);
1344 LASSERT (fmb->fmb_kiov[niov].kiov_offset == 0);
1345 fmb->fmb_kiov[niov].kiov_len = MIN (PAGE_SIZE, nob);
1350 kpr_fwd_init(&fmb->fmb_fwd, dest_nid, &fmb->fmb_hdr,
1351 payload_nob, niov, fmb->fmb_kiov,
1352 ksocknal_fmb_callback, fmb);
1354 if (payload_nob == 0) { /* got complete packet already */
1355 CDEBUG (D_NET, "%p "LPX64"->"LPX64" fwd_start (immediate)\n",
1356 conn, le64_to_cpu(conn->ksnc_hdr.src_nid), dest_nid);
1358 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1360 ksocknal_new_packet (conn, 0); /* on to next packet */
1364 conn->ksnc_cookie = fmb; /* stash fmb for later */
1365 conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
1367 /* Set up conn->ksnc_rx_kiov to read the payload into fmb's kiov-ed
1369 LASSERT (niov <= sizeof(conn->ksnc_rx_iov_space)/sizeof(ptl_kiov_t));
1371 conn->ksnc_rx_niov = 0;
1372 conn->ksnc_rx_nkiov = niov;
1373 conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1374 memcpy(conn->ksnc_rx_kiov, fmb->fmb_kiov, niov * sizeof(ptl_kiov_t));
1376 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
1377 le64_to_cpu(conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
/* Validate a header addressed to another NID before forwarding: reject
 * corrupt lengths, non-routing configuration, oversized packets, and
 * targets that are direct peers (they should have connected directly) —
 * each rejection skips the body via ksocknal_new_packet().  On success,
 * switch the conn to SOCKNAL_RX_GET_FMB to acquire a forwarding buffer. */
1382 ksocknal_fwd_parse (ksock_conn_t *conn)
1385 ptl_nid_t dest_nid = le64_to_cpu(conn->ksnc_hdr.dest_nid);
1386 ptl_nid_t src_nid = le64_to_cpu(conn->ksnc_hdr.src_nid);
1387 int body_len = le32_to_cpu(conn->ksnc_hdr.payload_length);
1388 char str[PTL_NALFMT_SIZE];
1389 char str2[PTL_NALFMT_SIZE];
1391 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
1392 src_nid, dest_nid, conn->ksnc_rx_nob_left);
1394 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
1395 LASSERT (conn->ksnc_rx_scheduled);
1397 if (body_len < 0) { /* length corrupt (overflow) */
1398 CERROR("dropping packet from "LPX64" (%s) for "LPX64" (%s): "
1399 "packet size %d illegal\n",
1400 src_nid, portals_nid2str(TCPNAL, src_nid, str),
1401 dest_nid, portals_nid2str(TCPNAL, dest_nid, str2),
1404 ksocknal_new_packet (conn, 0); /* on to new packet */
1408 if (!kpr_routing(&ksocknal_data.ksnd_router)) { /* not forwarding */
1409 CERROR("dropping packet from "LPX64" (%s) for "LPX64
1410 " (%s): not forwarding\n",
1411 src_nid, portals_nid2str(TCPNAL, src_nid, str),
1412 dest_nid, portals_nid2str(TCPNAL, dest_nid, str2));
1413 /* on to new packet (skip this one's body) */
1414 ksocknal_new_packet (conn, body_len);
1418 if (body_len > PTL_MTU) { /* too big to forward */
1419 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1420 "(%s): packet size %d too big\n",
1421 src_nid, portals_nid2str(TCPNAL, src_nid, str),
1422 dest_nid, portals_nid2str(TCPNAL, dest_nid, str2),
1424 /* on to new packet (skip this one's body) */
1425 ksocknal_new_packet (conn, body_len);
1429 /* should have gone direct */
1430 peer = ksocknal_get_peer (conn->ksnc_hdr.dest_nid);
1432 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1433 "(%s): target is a peer\n",
1434 src_nid, portals_nid2str(TCPNAL, src_nid, str),
1435 dest_nid, portals_nid2str(TCPNAL, dest_nid, str2));
1436 ksocknal_put_peer (peer); /* drop ref from get above */
1438 /* on to next packet (skip this one's body) */
1439 ksocknal_new_packet (conn, body_len);
1443 conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB; /* Getting FMB now */
1444 conn->ksnc_rx_nob_left = body_len; /* stash packet size */
1445 conn->ksnc_rx_nob_wanted = body_len; /* (no slop) */
/* Prepare conn to receive the next incoming packet.
 * nob_to_skip == 0: we are exactly on a packet boundary, so set up the rx
 * iov to read a fresh ptl_hdr_t into conn->ksnc_hdr (state RX_HEADER).
 * nob_to_skip  > 0: the previous packet's body must be discarded; point as
 * many iov entries as needed/available at the shared static slop buffer
 * (state RX_SLOP).  If the skip doesn't fit the iov space we get called
 * again for the remainder.
 * NOTE(review): loop opening (do {) and the niov/skipped declarations are
 * elided in this excerpt. */
1449 ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
/* shared by all conns: contents are irrelevant, it's only a sink to read into */
1451 static char ksocknal_slop_buffer[4096];
1457 if (nob_to_skip == 0) { /* right at next packet boundary now */
1458 conn->ksnc_rx_started = 0;
1459 mb (); /* racing with timeout thread */
1461 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
1462 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
1463 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
1465 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1466 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
1467 conn->ksnc_rx_iov[0].iov_len = sizeof (ptl_hdr_t);
1468 conn->ksnc_rx_niov = 1;
1470 conn->ksnc_rx_kiov = NULL;
1471 conn->ksnc_rx_nkiov = 0;
1475 /* Set up to skip as much as possible now. If there's more left
1476 * (ran out of iov entries) we'll get called again */
1478 conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
1479 conn->ksnc_rx_nob_left = nob_to_skip;
1480 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1485 nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));
1487 conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
1488 conn->ksnc_rx_iov[niov].iov_len = nob;
1493 } while (nob_to_skip != 0 && /* mustn't overflow conn's rx iov */
1494 niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));
1496 conn->ksnc_rx_niov = niov;
1497 conn->ksnc_rx_kiov = NULL;
1498 conn->ksnc_rx_nkiov = 0;
1499 conn->ksnc_rx_nob_wanted = skipped;
/* Per-connection receive state machine, driven by the scheduler thread.
 * Obtains a forwarding buffer if the conn is in RX_GET_FMB, reads from the
 * socket, then dispatches on rx_state: HEADER (parse, possibly forward),
 * BODY (finalize delivery to the local NAL), SLOP (discard), BODY_FWD
 * (hand completed packet to the router).  Returns 0 to be rescheduled
 * later, negative errno on fatal connection error.
 * NOTE(review): labels (get_fmb/try_read), some returns and closing braces
 * are elided in this excerpt. */
1504 ksocknal_process_receive (ksock_conn_t *conn)
1509 LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1511 /* doesn't need a forwarding buffer */
1512 if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
1516 fmb = ksocknal_get_idle_fmb (conn);
1518 /* conn descheduled waiting for idle fmb */
1522 if (ksocknal_init_fmb (conn, fmb)) {
1523 /* packet forwarded */
1528 /* NB: sched lock NOT held */
1529 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
1530 conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
1531 conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
1532 conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
1534 LASSERT (conn->ksnc_rx_nob_wanted > 0);
1536 rc = ksocknal_receive(conn);
/* rc <= 0 here is EOF (0) or a socket error: shut the conn down */
1539 LASSERT (rc != -EAGAIN);
1542 CWARN ("[%p] EOF from "LPX64" ip %d.%d.%d.%d:%d\n",
1543 conn, conn->ksnc_peer->ksnp_nid,
1544 HIPQUAD(conn->ksnc_ipaddr),
1546 else if (!conn->ksnc_closing)
1547 CERROR ("[%p] Error %d on read from "LPX64
1548 " ip %d.%d.%d.%d:%d\n",
1549 conn, rc, conn->ksnc_peer->ksnp_nid,
1550 HIPQUAD(conn->ksnc_ipaddr),
1553 ksocknal_close_conn_and_siblings (conn, rc);
1554 return (rc == 0 ? -ESHUTDOWN : rc);
/* partial read: come back when more data is ready */
1557 if (conn->ksnc_rx_nob_wanted != 0) {
1562 switch (conn->ksnc_rx_state) {
1563 case SOCKNAL_RX_HEADER:
1564 if (conn->ksnc_hdr.type != cpu_to_le32(PTL_MSG_HELLO) &&
1565 le64_to_cpu(conn->ksnc_hdr.dest_nid) !=
1566 ksocknal_lib.libnal_ni.ni_pid.nid) {
1567 /* This packet isn't for me */
1568 ksocknal_fwd_parse (conn);
1569 switch (conn->ksnc_rx_state) {
1570 case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
1571 return (0); /* => come back later */
1572 case SOCKNAL_RX_SLOP: /* skipping packet's body */
1573 goto try_read; /* => go read it */
1574 case SOCKNAL_RX_GET_FMB: /* forwarding */
1575 goto get_fmb; /* => go get a fwd msg buffer */
1582 /* sets wanted_len, iovs etc */
1583 rc = lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
1586 /* I just received garbage: give up on this conn */
1587 ksocknal_close_conn_and_siblings (conn, rc);
1591 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
1592 conn->ksnc_rx_state = SOCKNAL_RX_BODY;
1593 goto try_read; /* go read the payload */
1595 /* Fall through (completed packet for me) */
1597 case SOCKNAL_RX_BODY:
1598 /* payload all received */
1599 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_OK);
1602 case SOCKNAL_RX_SLOP:
1603 /* starting new packet? */
1604 if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
1605 return (0); /* come back later */
1606 goto try_read; /* try to finish reading slop now */
1608 case SOCKNAL_RX_BODY_FWD:
1609 /* payload all received */
1610 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n",
1611 conn, le64_to_cpu(conn->ksnc_hdr.src_nid),
1612 le64_to_cpu(conn->ksnc_hdr.dest_nid),
1613 conn->ksnc_rx_nob_left);
1615 /* forward the packet. NB ksocknal_init_fmb() put fmb into
1616 * conn->ksnc_cookie */
1617 fmb = (ksock_fmb_t *)conn->ksnc_cookie;
1618 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1620 /* no slop in forwarded packets */
1621 LASSERT (conn->ksnc_rx_nob_left == 0);
1623 ksocknal_new_packet (conn, 0); /* on to next packet */
1624 return (0); /* (later) */
1632 return (-EINVAL); /* keep gcc happy */
/* lib_nal recv callback for iovec (virtual address) payloads.
 * 'private' is the ksock_conn_t the header arrived on; stash the lib msg
 * as the completion cookie and point the conn's rx iov at the caller's
 * buffers (mlen bytes wanted out of rlen on the wire; the difference is
 * discarded later as slop). */
1636 ksocknal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
1637 unsigned int niov, struct iovec *iov,
1638 size_t offset, size_t mlen, size_t rlen)
1640 ksock_conn_t *conn = (ksock_conn_t *)private;
1642 LASSERT (mlen <= rlen);
1643 LASSERT (niov <= PTL_MD_MAX_IOV);
1645 conn->ksnc_cookie = msg;
1646 conn->ksnc_rx_nob_wanted = mlen;
1647 conn->ksnc_rx_nob_left = rlen;
1649 conn->ksnc_rx_nkiov = 0;
1650 conn->ksnc_rx_kiov = NULL;
1651 conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
1652 conn->ksnc_rx_niov =
1653 lib_extract_iov(PTL_MD_MAX_IOV, conn->ksnc_rx_iov,
1654 niov, iov, offset, mlen);
/* sanity: iov totals must equal the bytes we said we wanted */
1657 lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1658 lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
/* lib_nal recv callback for page-based (kiov) payloads; mirror image of
 * ksocknal_recv() above but fills the conn's rx kiov instead of its iov. */
1664 ksocknal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
1665 unsigned int niov, ptl_kiov_t *kiov,
1666 size_t offset, size_t mlen, size_t rlen)
1668 ksock_conn_t *conn = (ksock_conn_t *)private;
1670 LASSERT (mlen <= rlen);
1671 LASSERT (niov <= PTL_MD_MAX_IOV);
1673 conn->ksnc_cookie = msg;
1674 conn->ksnc_rx_nob_wanted = mlen;
1675 conn->ksnc_rx_nob_left = rlen;
1677 conn->ksnc_rx_niov = 0;
1678 conn->ksnc_rx_iov = NULL;
1679 conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1680 conn->ksnc_rx_nkiov =
1681 lib_extract_kiov(PTL_MD_MAX_IOV, conn->ksnc_rx_kiov,
1682 niov, kiov, offset, mlen);
/* sanity: kiov totals must equal the bytes we said we wanted */
1685 lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1686 lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov);
/* Return nonzero when the scheduler thread may sleep: not shutting down
 * and no zero-copy completions, rx conns or tx conns queued.  Takes
 * kss_lock so the check is atomic w.r.t. the enqueue paths. */
1692 ksocknal_sched_cansleep(ksock_sched_t *sched)
1694 unsigned long flags;
1697 spin_lock_irqsave(&sched->kss_lock, flags);
1699 rc = (!ksocknal_data.ksnd_shuttingdown &&
1701 list_empty(&sched->kss_zctxdone_list) &&
1703 list_empty(&sched->kss_rx_conns) &&
1704 list_empty(&sched->kss_tx_conns));
1706 spin_unlock_irqrestore(&sched->kss_lock, flags);
/* Scheduler daemon thread: one per ksock_sched_t.  Optionally pins itself
 * to a CPU, then loops under kss_lock servicing, in round-robin fashion,
 * (1) one conn from the rx queue, (2) one tx from one conn on the tx
 * queue, (3) one zero-copy tx completion -- dropping the lock around the
 * actual socket work.  Sleeps via ksocknal_sched_cansleep() when idle and
 * yields every SOCKNAL_RESCHED iterations to avoid hogging the CPU.
 * NOTE(review): several lines (e.g. 'goto try_read'-style branches, some
 * closing braces, did_something updates) are elided in this excerpt. */
1710 int ksocknal_scheduler (void *arg)
1712 ksock_sched_t *sched = (ksock_sched_t *)arg;
1715 unsigned long flags;
1718 int id = sched - ksocknal_data.ksnd_schedulers;
1721 snprintf (name, sizeof (name),"ksocknald_%02d", id);
1722 kportal_daemonize (name);
1723 kportal_blockallsigs ();
1725 #if (CONFIG_SMP && CPU_AFFINITY)
/* bind this daemon to its designated CPU, if that CPU is online */
1726 id = ksocknal_sched2cpu(id);
1727 if (cpu_online(id)) {
1730 set_cpus_allowed(current, m);
1732 CERROR ("Can't set CPU affinity for %s to %d\n", name, id);
1734 #endif /* CONFIG_SMP && CPU_AFFINITY */
1736 spin_lock_irqsave (&sched->kss_lock, flags);
1738 while (!ksocknal_data.ksnd_shuttingdown) {
1739 int did_something = 0;
1741 /* Ensure I progress everything semi-fairly */
1743 if (!list_empty (&sched->kss_rx_conns)) {
1744 conn = list_entry(sched->kss_rx_conns.next,
1745 ksock_conn_t, ksnc_rx_list);
1746 list_del(&conn->ksnc_rx_list);
1748 LASSERT(conn->ksnc_rx_scheduled);
1749 LASSERT(conn->ksnc_rx_ready);
1751 /* clear rx_ready in case receive isn't complete.
1752 * Do it BEFORE we call process_recv, since
1753 * data_ready can set it any time after we release
1755 conn->ksnc_rx_ready = 0;
1756 spin_unlock_irqrestore(&sched->kss_lock, flags);
1758 rc = ksocknal_process_receive(conn);
1760 spin_lock_irqsave(&sched->kss_lock, flags);
1762 /* I'm the only one that can clear this flag */
1763 LASSERT(conn->ksnc_rx_scheduled);
1765 /* Did process_receive get everything it wanted? */
1767 conn->ksnc_rx_ready = 1;
1769 if (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP ||
1770 conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB) {
1771 /* Conn blocked for a forwarding buffer.
1772 * It will get queued for my attention when
1773 * one becomes available (and it might just
1774 * already have been!). Meanwhile my ref
1775 * on it stays put. */
1776 } else if (conn->ksnc_rx_ready) {
1777 /* reschedule for rx */
1778 list_add_tail (&conn->ksnc_rx_list,
1779 &sched->kss_rx_conns);
/* nothing more to read: release the scheduler's conn ref */
1781 conn->ksnc_rx_scheduled = 0;
1783 ksocknal_put_conn(conn);
1789 if (!list_empty (&sched->kss_tx_conns)) {
1790 conn = list_entry(sched->kss_tx_conns.next,
1791 ksock_conn_t, ksnc_tx_list);
1792 list_del (&conn->ksnc_tx_list);
1794 LASSERT(conn->ksnc_tx_scheduled);
1795 LASSERT(conn->ksnc_tx_ready);
1796 LASSERT(!list_empty(&conn->ksnc_tx_queue));
1798 tx = list_entry(conn->ksnc_tx_queue.next,
1799 ksock_tx_t, tx_list);
1800 /* dequeue now so empty list => more to send */
1801 list_del(&tx->tx_list);
1803 /* Clear tx_ready in case send isn't complete. Do
1804 * it BEFORE we call process_transmit, since
1805 * write_space can set it any time after we release
1807 conn->ksnc_tx_ready = 0;
1808 spin_unlock_irqrestore (&sched->kss_lock, flags);
1810 rc = ksocknal_process_transmit(conn, tx);
1812 spin_lock_irqsave (&sched->kss_lock, flags);
1814 if (rc == -ENOMEM || rc == -EAGAIN) {
1815 /* Incomplete send: replace tx on HEAD of tx_queue */
1816 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
1818 /* Complete send; assume space for more */
1819 conn->ksnc_tx_ready = 1;
1822 if (rc == -ENOMEM) {
1823 /* Do nothing; after a short timeout, this
1824 * conn will be reposted on kss_tx_conns. */
1825 } else if (conn->ksnc_tx_ready &&
1826 !list_empty (&conn->ksnc_tx_queue)) {
1827 /* reschedule for tx */
1828 list_add_tail (&conn->ksnc_tx_list,
1829 &sched->kss_tx_conns);
/* tx queue drained: release the scheduler's conn ref */
1831 conn->ksnc_tx_scheduled = 0;
1833 ksocknal_put_conn (conn);
1839 if (!list_empty (&sched->kss_zctxdone_list)) {
1841 list_entry(sched->kss_zctxdone_list.next,
1842 ksock_tx_t, tx_list);
1845 list_del (&tx->tx_list);
1846 spin_unlock_irqrestore (&sched->kss_lock, flags);
/* complete the zero-copy send outside the lock */
1848 ksocknal_tx_done (tx, 1);
1850 spin_lock_irqsave (&sched->kss_lock, flags);
1853 if (!did_something || /* nothing to do */
1854 ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
1855 spin_unlock_irqrestore (&sched->kss_lock, flags);
1859 if (!did_something) { /* wait for something to do */
1860 rc = wait_event_interruptible (sched->kss_waitq,
1861 !ksocknal_sched_cansleep(sched));
1866 spin_lock_irqsave (&sched->kss_lock, flags);
1870 spin_unlock_irqrestore (&sched->kss_lock, flags);
1871 ksocknal_thread_fini ();
/* Socket data_ready callback (softirq context).  Marks the conn rx-ready
 * and, if it is not already being serviced, queues it on its scheduler's
 * rx list (taking an extra conn ref for the scheduler) and wakes the
 * scheduler.  ksnd_global_lock read-lock interleaves us safely with
 * socket teardown. */
1876 ksocknal_data_ready (struct sock *sk, int n)
1878 unsigned long flags;
1880 ksock_sched_t *sched;
1883 /* interleave correctly with closing sockets... */
1884 read_lock (&ksocknal_data.ksnd_global_lock);
1886 conn = sk->sk_user_data;
1887 if (conn == NULL) { /* raced with ksocknal_terminate_conn */
1888 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
/* chain to the socket's original (restored) callback */
1889 sk->sk_data_ready (sk, n);
1891 sched = conn->ksnc_scheduler;
1893 spin_lock_irqsave (&sched->kss_lock, flags);
1895 conn->ksnc_rx_ready = 1;
1897 if (!conn->ksnc_rx_scheduled) { /* not being progressed */
1898 list_add_tail(&conn->ksnc_rx_list,
1899 &sched->kss_rx_conns);
1900 conn->ksnc_rx_scheduled = 1;
1901 /* extra ref for scheduler */
1902 atomic_inc (&conn->ksnc_refcount);
1904 wake_up (&sched->kss_waitq);
1907 spin_unlock_irqrestore (&sched->kss_lock, flags);
1910 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Socket write_space callback (softirq context).  When enough send-buffer
 * space is available (SOCKNAL_TX_LOW_WATER), marks the conn tx-ready and,
 * if it has queued packets and is not already being progressed, schedules
 * it for transmit (extra conn ref for the scheduler) and wakes the
 * scheduler. */
1916 ksocknal_write_space (struct sock *sk)
1918 unsigned long flags;
1920 ksock_sched_t *sched;
1922 /* interleave correctly with closing sockets... */
1923 read_lock (&ksocknal_data.ksnd_global_lock);
1925 conn = sk->sk_user_data;
1927 CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
1928 sk, tcp_wspace(sk), SOCKNAL_TX_LOW_WATER(sk), conn,
1929 (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
1930 " ready" : " blocked"),
1931 (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
1932 " scheduled" : " idle"),
1933 (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ?
1934 " empty" : " queued"));
1936 if (conn == NULL) { /* raced with ksocknal_terminate_conn */
1937 LASSERT (sk->sk_write_space != &ksocknal_write_space);
/* chain to the socket's original (restored) callback */
1938 sk->sk_write_space (sk);
1940 read_unlock (&ksocknal_data.ksnd_global_lock);
1944 if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
1945 sched = conn->ksnc_scheduler;
1947 spin_lock_irqsave (&sched->kss_lock, flags);
1949 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
1950 conn->ksnc_tx_ready = 1;
1952 if (!conn->ksnc_tx_scheduled && // not being progressed
1953 !list_empty(&conn->ksnc_tx_queue)){//packets to send
1954 list_add_tail (&conn->ksnc_tx_list,
1955 &sched->kss_tx_conns);
1956 conn->ksnc_tx_scheduled = 1;
1957 /* extra ref for scheduler */
1958 atomic_inc (&conn->ksnc_refcount);
1960 wake_up (&sched->kss_waitq);
1963 spin_unlock_irqrestore (&sched->kss_lock, flags);
1966 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Synchronously write exactly 'nob' bytes from 'buffer' to 'sock',
 * looping over partial sends; treats a zero-byte send as -ECONNABORTED.
 * NOTE(review): the iov/msg initializers, set_fs dance and loop structure
 * are partly elided in this excerpt. */
1970 ksocknal_sock_write (struct socket *sock, void *buffer, int nob)
1973 mm_segment_t oldmm = get_fs();
1976 struct iovec iov = {
1980 struct msghdr msg = {
1985 .msg_control = NULL,
1986 .msg_controllen = 0,
1991 rc = sock_sendmsg (sock, &msg, iov.iov_len);
1998 CERROR ("Unexpected zero rc\n");
1999 return (-ECONNABORTED);
/* partial send: advance past the rc bytes just written and retry */
2002 buffer = ((char *)buffer) + rc;
/* Synchronously read exactly 'nob' bytes from 'sock' into 'buffer',
 * looping over partial reads; EOF surfaces as -ECONNABORTED.
 * NOTE(review): the iov/msg initializers and loop structure are partly
 * elided in this excerpt. */
2010 ksocknal_sock_read (struct socket *sock, void *buffer, int nob)
2013 mm_segment_t oldmm = get_fs();
2016 struct iovec iov = {
2020 struct msghdr msg = {
2025 .msg_control = NULL,
2026 .msg_controllen = 0,
2031 rc = sock_recvmsg (sock, &msg, iov.iov_len, 0);
2038 return (-ECONNABORTED);
/* partial read: advance past the rc bytes just received and retry */
2040 buffer = ((char *)buffer) + rc;
/* Send the HELLO handshake: a ptl_hdr_t whose dest_nid field carries the
 * protocol magic/version, followed by this node's interface IP addresses.
 * CAVEAT EMPTOR: byte-flips the caller's 'ipaddrs' array in place to
 * little-endian before sending. */
2048 ksocknal_send_hello (ksock_conn_t *conn, __u32 *ipaddrs, int nipaddrs)
2050 /* CAVEAT EMPTOR: this byte flips 'ipaddrs' */
2051 struct socket *sock = conn->ksnc_sock;
2053 ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
2057 LASSERT (conn->ksnc_type != SOCKNAL_CONN_NONE);
2058 LASSERT (nipaddrs <= SOCKNAL_MAX_INTERFACES);
2060 /* No need for getconnsock/putconnsock */
2061 LASSERT (!conn->ksnc_closing);
2063 LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
/* magic/version ride in dest_nid -- see ksocknal_recv_hello() */
2064 hmv->magic = cpu_to_le32 (PORTALS_PROTO_MAGIC);
2065 hmv->version_major = cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR);
2066 hmv->version_minor = cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR);
2068 hdr.src_nid = cpu_to_le64 (ksocknal_lib.libnal_ni.ni_pid.nid);
2069 hdr.type = cpu_to_le32 (PTL_MSG_HELLO);
2070 hdr.payload_length = cpu_to_le32 (nipaddrs * sizeof(*ipaddrs));
2072 hdr.msg.hello.type = cpu_to_le32 (conn->ksnc_type);
2073 hdr.msg.hello.incarnation =
2074 cpu_to_le64 (ksocknal_data.ksnd_incarnation);
2076 /* Receiver is eager */
2077 rc = ksocknal_sock_write (sock, &hdr, sizeof(hdr));
2079 CERROR ("Error %d sending HELLO hdr to %u.%u.%u.%u/%d\n",
2080 rc, HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
2087 for (i = 0; i < nipaddrs; i++) {
2088 ipaddrs[i] = __cpu_to_le32 (ipaddrs[i]);
2091 rc = ksocknal_sock_write (sock, ipaddrs, nipaddrs * sizeof(*ipaddrs));
2093 CERROR ("Error %d sending HELLO payload (%d)"
2094 " to %u.%u.%u.%u/%d\n", rc, nipaddrs,
2095 HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
/* Map a connection type to the type the peer should be using at its end
 * (BULK_IN <-> BULK_OUT); SOCKNAL_CONN_NONE for an unrecognised type.
 * NOTE(review): the ANY/CONTROL return and the switch scaffolding are
 * elided in this excerpt. */
2100 ksocknal_invert_type(int type)
2104 case SOCKNAL_CONN_ANY:
2105 case SOCKNAL_CONN_CONTROL:
2107 case SOCKNAL_CONN_BULK_IN:
2108 return SOCKNAL_CONN_BULK_OUT;
2109 case SOCKNAL_CONN_BULK_OUT:
2110 return SOCKNAL_CONN_BULK_IN;
2112 return (SOCKNAL_CONN_NONE);
/* Receive and validate the peer's HELLO: checks protocol magic and
 * version (carried in the hdr's dest_nid field), header type, source NID
 * (learning it if *nid == PTL_NID_ANY, else verifying it), negotiates the
 * connection type via ksocknal_invert_type(), records the peer's
 * incarnation, then reads the payload of interface IP addresses into
 * 'ipaddrs' (host byte order, zero addresses rejected).
 * NOTE(review): error-return statements between the CERROR blocks are
 * elided in this excerpt. */
2117 ksocknal_recv_hello (ksock_conn_t *conn, ptl_nid_t *nid,
2118 __u64 *incarnation, __u32 *ipaddrs)
2120 struct socket *sock = conn->ksnc_sock;
2126 ptl_magicversion_t *hmv;
2128 hmv = (ptl_magicversion_t *)&hdr.dest_nid;
2129 LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
2131 rc = ksocknal_sock_read (sock, hmv, sizeof (*hmv));
2133 CERROR ("Error %d reading HELLO from %u.%u.%u.%u\n",
2134 rc, HIPQUAD(conn->ksnc_ipaddr));
2138 if (hmv->magic != le32_to_cpu (PORTALS_PROTO_MAGIC)) {
2139 CERROR ("Bad magic %#08x (%#08x expected) from %u.%u.%u.%u\n",
2140 __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC,
2141 HIPQUAD(conn->ksnc_ipaddr));
2145 if (hmv->version_major != cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
2146 hmv->version_minor != cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
2147 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
2148 " from %u.%u.%u.%u\n",
2149 le16_to_cpu (hmv->version_major),
2150 le16_to_cpu (hmv->version_minor),
2151 PORTALS_PROTO_VERSION_MAJOR,
2152 PORTALS_PROTO_VERSION_MINOR,
2153 HIPQUAD(conn->ksnc_ipaddr));
2157 #if (PORTALS_PROTO_VERSION_MAJOR != 1)
2158 # error "This code only understands protocol version 1.x"
2160 /* version 1 sends magic/version as the dest_nid of a 'hello'
2161 * header, followed by payload full of interface IP addresses.
2162 * Read the rest of it in now... */
2164 rc = ksocknal_sock_read (sock, hmv + 1, sizeof (hdr) - sizeof (*hmv));
2166 CERROR ("Error %d reading rest of HELLO hdr from %u.%u.%u.%u\n",
2167 rc, HIPQUAD(conn->ksnc_ipaddr));
2171 /* ...and check we got what we expected */
2172 if (hdr.type != cpu_to_le32 (PTL_MSG_HELLO)) {
2173 CERROR ("Expecting a HELLO hdr,"
2174 " but got type %d from %u.%u.%u.%u\n",
2175 le32_to_cpu (hdr.type),
2176 HIPQUAD(conn->ksnc_ipaddr));
2180 if (le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
2181 CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY"
2182 "from %u.%u.%u.%u\n", HIPQUAD(conn->ksnc_ipaddr));
2186 if (*nid == PTL_NID_ANY) { /* don't know peer's nid yet */
2187 *nid = le64_to_cpu(hdr.src_nid);
2188 } else if (*nid != le64_to_cpu (hdr.src_nid)) {
2189 CERROR ("Connected to nid "LPX64"@%u.%u.%u.%u "
2190 "but expecting "LPX64"\n",
2191 le64_to_cpu (hdr.src_nid),
2192 HIPQUAD(conn->ksnc_ipaddr), *nid);
2196 type = __le32_to_cpu(hdr.msg.hello.type);
2198 if (conn->ksnc_type == SOCKNAL_CONN_NONE) {
2199 /* I've accepted this connection; peer determines type */
2200 conn->ksnc_type = ksocknal_invert_type(type);
2201 if (conn->ksnc_type == SOCKNAL_CONN_NONE) {
2202 CERROR ("Unexpected type %d from "LPX64"@%u.%u.%u.%u\n",
2203 type, *nid, HIPQUAD(conn->ksnc_ipaddr));
2206 } else if (ksocknal_invert_type(type) != conn->ksnc_type) {
2207 CERROR ("Mismatched types: me %d, "LPX64"@%u.%u.%u.%u %d\n",
2208 conn->ksnc_type, *nid, HIPQUAD(conn->ksnc_ipaddr),
2209 le32_to_cpu(hdr.msg.hello.type));
2213 *incarnation = le64_to_cpu(hdr.msg.hello.incarnation);
2215 nips = __le32_to_cpu (hdr.payload_length) / sizeof (__u32);
/* payload must be an exact, bounded array of __u32 addresses */
2217 if (nips > SOCKNAL_MAX_INTERFACES ||
2218 nips * sizeof(__u32) != __le32_to_cpu (hdr.payload_length)) {
2219 CERROR("Bad payload length %d from "LPX64"@%u.%u.%u.%u\n",
2220 __le32_to_cpu (hdr.payload_length),
2221 *nid, HIPQUAD(conn->ksnc_ipaddr));
2227 rc = ksocknal_sock_read (sock, ipaddrs, nips * sizeof(*ipaddrs));
2229 CERROR ("Error %d reading IPs from "LPX64"@%u.%u.%u.%u\n",
2230 rc, *nid, HIPQUAD(conn->ksnc_ipaddr));
2234 for (i = 0; i < nips; i++) {
2235 ipaddrs[i] = __le32_to_cpu(ipaddrs[i]);
2237 if (ipaddrs[i] == 0) {
2238 CERROR("Zero IP[%d] from "LPX64"@%u.%u.%u.%u\n",
2239 i, *nid, HIPQUAD(conn->ksnc_ipaddr));
/* Read back the conn's socket buffer sizes (SO_SNDBUF/SO_RCVBUF) and
 * nagle setting (TCP_NODELAY) into *txmem/*rxmem/*nagle.  Outputs are
 * zeroed and -ESHUTDOWN returned if the conn is already closing
 * (getconnsock fails).
 * NOTE(review): the error checks between the getsockopt calls and the
 * set_fs handling are elided in this excerpt. */
2248 ksocknal_get_conn_tunables (ksock_conn_t *conn, int *txmem, int *rxmem, int *nagle)
2250 mm_segment_t oldmm = get_fs ();
2251 struct socket *sock = conn->ksnc_sock;
2255 rc = ksocknal_getconnsock (conn);
2257 LASSERT (conn->ksnc_closing);
2258 *txmem = *rxmem = *nagle = 0;
2259 return (-ESHUTDOWN);
2264 len = sizeof(*txmem);
2265 rc = sock_getsockopt(sock, SOL_SOCKET, SO_SNDBUF,
2266 (char *)txmem, &len);
2268 len = sizeof(*rxmem);
2269 rc = sock_getsockopt(sock, SOL_SOCKET, SO_RCVBUF,
2270 (char *)rxmem, &len);
2273 len = sizeof(*nagle);
/* TCP_NODELAY lives at the TCP level, hence ops->getsockopt */
2274 rc = sock->ops->getsockopt(sock, SOL_TCP, TCP_NODELAY,
2275 (char *)nagle, &len);
2279 ksocknal_putconnsock (conn);
2284 *txmem = *rxmem = *nagle = 0;
/* Configure a freshly-established socket for NAL use: abortive close
 * (SO_LINGER=0, TCP_LINGER2), optionally disable nagle, apply the tunable
 * send/receive buffer sizes, and enable TCP keepalives when all three
 * keepalive tunables are positive.
 * NOTE(review): option-value assignments and error-return lines between
 * the setsockopt calls are elided in this excerpt. */
2290 ksocknal_setup_sock (struct socket *sock)
2292 mm_segment_t oldmm = get_fs ();
2299 struct linger linger;
2301 sock->sk->sk_allocation = GFP_NOFS;
2303 /* Ensure this socket aborts active sends immediately when we close
2307 linger.l_linger = 0;
2310 rc = sock_setsockopt (sock, SOL_SOCKET, SO_LINGER,
2311 (char *)&linger, sizeof (linger));
2314 CERROR ("Can't set SO_LINGER: %d\n", rc);
2320 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_LINGER2,
2321 (char *)&option, sizeof (option));
2324 CERROR ("Can't set SO_LINGER2: %d\n", rc);
2328 if (!ksocknal_tunables.ksnd_nagle) {
2332 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_NODELAY,
2333 (char *)&option, sizeof (option));
2336 CERROR ("Can't disable nagle: %d\n", rc);
2341 if (ksocknal_tunables.ksnd_buffer_size > 0) {
2342 option = ksocknal_tunables.ksnd_buffer_size;
2345 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
2346 (char *)&option, sizeof (option));
2349 CERROR ("Can't set send buffer %d: %d\n",
2355 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
2356 (char *)&option, sizeof (option));
2359 CERROR ("Can't set receive buffer %d: %d\n",
2365 /* snapshot tunables */
2366 keep_idle = ksocknal_tunables.ksnd_keepalive_idle;
2367 keep_count = ksocknal_tunables.ksnd_keepalive_count;
2368 keep_intvl = ksocknal_tunables.ksnd_keepalive_intvl;
2370 do_keepalive = (keep_idle > 0 && keep_count > 0 && keep_intvl > 0);
2372 option = (do_keepalive ? 1 : 0);
2374 rc = sock_setsockopt (sock, SOL_SOCKET, SO_KEEPALIVE,
2375 (char *)&option, sizeof (option));
2378 CERROR ("Can't set SO_KEEPALIVE: %d\n", rc);
2386 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE,
2387 (char *)&keep_idle, sizeof (keep_idle));
2390 CERROR ("Can't set TCP_KEEPIDLE: %d\n", rc);
2395 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPINTVL,
2396 (char *)&keep_intvl, sizeof (keep_intvl));
2399 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2404 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT,
2405 (char *)&keep_count, sizeof (keep_count));
2408 CERROR ("Can't set TCP_KEEPCNT: %d\n", rc);
/* Create a TCP socket bound to the given reserved local_port (and the
 * route's local interface, if set), apply I/O timeouts and SO_REUSEADDR,
 * then connect to the route's peer address.  Sets *may_retry so the
 * caller knows whether trying the next local port is worthwhile
 * (-EADDRINUSE on bind, -EADDRNOTAVAIL on connect).
 * NOTE(review): several return/cleanup (sys_close) lines between the
 * error checks are elided in this excerpt. */
2416 ksocknal_connect_sock(struct socket **sockp, int *may_retry,
2417 ksock_route_t *route, int local_port)
2419 struct sockaddr_in locaddr;
2420 struct sockaddr_in srvaddr;
2421 struct socket *sock;
2424 mm_segment_t oldmm = get_fs();
2427 memset(&locaddr, 0, sizeof(locaddr));
2428 locaddr.sin_family = AF_INET;
2429 locaddr.sin_port = htons(local_port);
2430 locaddr.sin_addr.s_addr =
2431 (route->ksnr_myipaddr != 0) ? htonl(route->ksnr_myipaddr)
2434 memset (&srvaddr, 0, sizeof (srvaddr));
2435 srvaddr.sin_family = AF_INET;
2436 srvaddr.sin_port = htons (route->ksnr_port);
2437 srvaddr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
2441 rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock);
2444 CERROR ("Can't create autoconnect socket: %d\n", rc);
2448 /* Ugh; have to map_fd for compatibility with sockets passed in
2449 * from userspace. And we actually need the sock->file refcounting
2450 * that this gives you :) */
2452 rc = sock_map_fd (sock);
2454 sock_release (sock);
2455 CERROR ("sock_map_fd error %d\n", rc);
2459 /* NB the file descriptor (rc) now owns the ref on sock->file */
2460 LASSERT (sock->file != NULL);
2461 LASSERT (file_count(sock->file) == 1);
2463 get_file(sock->file); /* extra ref makes sock->file */
2464 sys_close(rc); /* survive this close */
2466 /* Still got a single ref on sock->file */
2467 LASSERT (file_count(sock->file) == 1);
2469 /* Set the socket timeouts, so our connection attempt completes in
2471 tv.tv_sec = ksocknal_tunables.ksnd_io_timeout;
2475 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO,
2476 (char *)&tv, sizeof (tv));
2479 CERROR ("Can't set send timeout %d: %d\n",
2480 ksocknal_tunables.ksnd_io_timeout, rc);
2485 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO,
2486 (char *)&tv, sizeof (tv));
2489 CERROR ("Can't set receive timeout %d: %d\n",
2490 ksocknal_tunables.ksnd_io_timeout, rc);
/* reserved ports get reused quickly; REUSEADDR avoids TIME_WAIT stalls */
2496 rc = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
2497 (char *)&option, sizeof (option));
2500 CERROR("Can't set SO_REUSEADDR for socket: %d\n", rc);
2504 rc = sock->ops->bind(sock,
2505 (struct sockaddr *)&locaddr, sizeof(locaddr));
2506 if (rc == -EADDRINUSE) {
2507 CDEBUG(D_NET, "Port %d already in use\n", local_port);
2512 CERROR("Error trying to bind to reserved port %d: %d\n",
2517 rc = sock->ops->connect(sock,
2518 (struct sockaddr *)&srvaddr, sizeof(srvaddr),
2519 sock->file->f_flags);
2523 /* EADDRNOTAVAIL probably means we're already connected to the same
2524 * peer/port on the same local port on a differently typed
2525 * connection. Let our caller retry with a different local
2527 *may_retry = (rc == -EADDRNOTAVAIL);
2529 CDEBUG(*may_retry ? D_NET : D_ERROR,
2530 "Error %d connecting %u.%u.%u.%u/%d -> %u.%u.%u.%u/%d\n", rc,
2531 HIPQUAD(route->ksnr_myipaddr), local_port,
2532 HIPQUAD(route->ksnr_ipaddr), route->ksnr_port);
/* Establish a connection of the given type to the route's peer, iterating
 * down the reserved local ports (1023..513) until a bind+connect attempt
 * succeeds (then hand the socket to ksocknal_create_conn) or the
 * ports are exhausted (-EADDRINUSE).
 * NOTE(review): the early-return on non-retryable connect failure is
 * elided in this excerpt. */
2540 ksocknal_connect_peer (ksock_route_t *route, int type)
2542 struct socket *sock;
2547 /* Iterate through reserved ports. When typed connections are
2548 * used, we will need to bind to multiple ports, but we only know
2549 * this at connect time. But, by that time we've already called
2550 * bind() so we need a new socket. */
2552 for (port = 1023; port > 512; --port) {
2554 rc = ksocknal_connect_sock(&sock, &may_retry, route, port);
2557 rc = ksocknal_create_conn(route, sock, type);
2566 CERROR("Out of ports trying to bind to a reserved port\n");
2567 return (-EADDRINUSE);
/* Attempt the connection(s) a route has been queued for (one bit per conn
 * type in ksnr_connecting).  On failure: clear the connecting state, arm
 * the route's retry timer with exponential backoff (capped at
 * SOCKNAL_MAX_RECONNECT_INTERVAL), and if no other route is still
 * connecting for this peer, complete all its blocked packets with
 * failure (the 'zombies' list) outside the global lock.
 * NOTE(review): loop framing around the retry-on-success path is elided
 * in this excerpt. */
2571 ksocknal_autoconnect (ksock_route_t *route)
2573 LIST_HEAD (zombies);
2576 unsigned long flags;
/* pick the lowest-numbered conn type this route was queued to establish */
2581 for (type = 0; type < SOCKNAL_CONN_NTYPES; type++)
2582 if ((route->ksnr_connecting & (1 << type)) != 0)
2584 LASSERT (type < SOCKNAL_CONN_NTYPES);
2586 rc = ksocknal_connect_peer (route, type);
2590 /* successfully autoconnected: create_conn did the
2591 * route/conn binding and scheduled any blocked packets */
2593 if (route->ksnr_connecting == 0) {
2594 /* No more connections required */
2599 /* Connection attempt failed */
2601 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
2603 peer = route->ksnr_peer;
2604 route->ksnr_connecting = 0;
2606 /* This is a retry rather than a new connection */
2607 LASSERT (route->ksnr_retry_interval != 0);
2608 route->ksnr_timeout = jiffies + route->ksnr_retry_interval;
2609 route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2,
2610 SOCKNAL_MAX_RECONNECT_INTERVAL);
2612 if (!list_empty (&peer->ksnp_tx_queue) &&
2613 ksocknal_find_connecting_route_locked (peer) == NULL) {
2614 LASSERT (list_empty (&peer->ksnp_conns));
2616 /* None of the connections that the blocked packets are
2617 * waiting for have been successful. Complete them now... */
2619 tx = list_entry (peer->ksnp_tx_queue.next,
2620 ksock_tx_t, tx_list);
2621 list_del (&tx->tx_list);
2622 list_add_tail (&tx->tx_list, &zombies);
2623 } while (!list_empty (&peer->ksnp_tx_queue));
2626 #if 0 /* irrelevent with only eager routes */
2627 if (!route->ksnr_deleted) {
2628 /* make this route least-favourite for re-selection */
2629 list_del(&route->ksnr_list);
2630 list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
2633 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
/* fail the zombie txs outside the lock; tx_done(tx, 0) => not sent */
2635 while (!list_empty (&zombies)) {
2636 char ipbuf[PTL_NALFMT_SIZE];
2637 char ipbuf2[PTL_NALFMT_SIZE];
2638 tx = list_entry (zombies.next, ksock_tx_t, tx_list);
2640 CERROR ("Deleting packet type %d len %d ("LPX64" %s->"LPX64" %s)\n",
2641 le32_to_cpu (tx->tx_hdr->type),
2642 le32_to_cpu (tx->tx_hdr->payload_length),
2643 le64_to_cpu (tx->tx_hdr->src_nid),
2644 portals_nid2str(SOCKNAL,
2645 le64_to_cpu(tx->tx_hdr->src_nid),
2647 le64_to_cpu (tx->tx_hdr->dest_nid),
2648 portals_nid2str(SOCKNAL,
2649 le64_to_cpu(tx->tx_hdr->src_nid),
2652 list_del (&tx->tx_list);
2654 ksocknal_tx_done (tx, 0);
/* Autoconnect daemon thread: drains ksnd_autoconnectd_routes, calling
 * ksocknal_autoconnect() on each queued route (dropping the list lock
 * around the connection attempt and releasing the route's queue ref),
 * and sleeps interruptibly when the queue is empty. */
2659 ksocknal_autoconnectd (void *arg)
2661 long id = (long)arg;
2663 unsigned long flags;
2664 ksock_route_t *route;
2667 snprintf (name, sizeof (name), "ksocknal_ad%02ld", id);
2668 kportal_daemonize (name);
2669 kportal_blockallsigs ();
2671 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2673 while (!ksocknal_data.ksnd_shuttingdown) {
2675 if (!list_empty (&ksocknal_data.ksnd_autoconnectd_routes)) {
2676 route = list_entry (ksocknal_data.ksnd_autoconnectd_routes.next,
2677 ksock_route_t, ksnr_connect_list);
2679 list_del (&route->ksnr_connect_list);
2680 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2682 ksocknal_autoconnect (route);
2683 ksocknal_put_route (route);
2685 spin_lock_irqsave(&ksocknal_data.ksnd_autoconnectd_lock,
2690 spin_unlock_irqrestore(&ksocknal_data.ksnd_autoconnectd_lock,
/* queue empty: sleep until shutdown or a route is queued */
2693 rc = wait_event_interruptible(ksocknal_data.ksnd_autoconnectd_waitq,
2694 ksocknal_data.ksnd_shuttingdown ||
2695 !list_empty(&ksocknal_data.ksnd_autoconnectd_routes));
2697 spin_lock_irqsave(&ksocknal_data.ksnd_autoconnectd_lock, flags);
2700 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2702 ksocknal_thread_fini ();
/* Scan a peer's connections for one that should be closed: a pending
 * socket error, an incomplete incoming message past its rx deadline, or
 * queued/buffered outgoing data past its tx deadline.  Returns the conn
 * with a reference taken (atomic_inc) for the caller, or NULL.
 * Called with ksnd_global_lock held shared.
 * NOTE(review): the 'return (conn)' lines after each atomic_inc and the
 * trailing 'return NULL' are elided in this excerpt. */
2707 ksocknal_find_timed_out_conn (ksock_peer_t *peer)
2709 /* We're called with a shared lock on ksnd_global_lock */
2711 struct list_head *ctmp;
2713 list_for_each (ctmp, &peer->ksnp_conns) {
2714 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
2716 /* Don't need the {get,put}connsock dance to deref ksnc_sock... */
2717 LASSERT (!conn->ksnc_closing);
2719 if (conn->ksnc_sock->sk->sk_err != 0) {
2720 /* Something (e.g. failed keepalive) set the socket error */
2721 atomic_inc (&conn->ksnc_refcount);
2722 CERROR ("Socket error %d: "LPX64" %p %d.%d.%d.%d\n",
2723 conn->ksnc_sock->sk->sk_err, peer->ksnp_nid,
2724 conn, HIPQUAD(conn->ksnc_ipaddr));
2728 if (conn->ksnc_rx_started &&
2729 time_after_eq (jiffies, conn->ksnc_rx_deadline)) {
2730 /* Timed out incomplete incoming message */
2731 atomic_inc (&conn->ksnc_refcount);
2732 CERROR ("Timed out RX from "LPX64" %p %d.%d.%d.%d\n",
2733 peer->ksnp_nid,conn,HIPQUAD(conn->ksnc_ipaddr));
2737 if ((!list_empty (&conn->ksnc_tx_queue) ||
2738 conn->ksnc_sock->sk->sk_wmem_queued != 0) &&
2739 time_after_eq (jiffies, conn->ksnc_tx_deadline)) {
2740 /* Timed out messages queued for sending or
2741 * buffered in the socket's send buffer */
2742 atomic_inc (&conn->ksnc_refcount);
2743 CERROR ("Timed out TX to "LPX64" %s%d %p %d.%d.%d.%d\n",
2745 list_empty (&conn->ksnc_tx_queue) ? "" : "Q ",
2746 conn->ksnc_sock->sk->sk_wmem_queued, conn,
2747 HIPQUAD(conn->ksnc_ipaddr));
/* Walk one hash bucket of the peer table looking for timed-out
 * connections.  Uses a shared lock for the common no-timeout case; when a
 * stale conn is found, drops the lock, closes the conn and its siblings,
 * releases the ref taken by find_timed_out_conn, and restarts the bucket
 * scan (the list may have changed while unlocked). */
2756 ksocknal_check_peer_timeouts (int idx)
2758 struct list_head *peers = &ksocknal_data.ksnd_peers[idx];
2759 struct list_head *ptmp;
2764 /* NB. We expect to have a look at all the peers and not find any
2765 * connections to time out, so we just use a shared lock while we
2767 read_lock (&ksocknal_data.ksnd_global_lock);
2769 list_for_each (ptmp, peers) {
2770 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
2771 conn = ksocknal_find_timed_out_conn (peer);
2774 read_unlock (&ksocknal_data.ksnd_global_lock);
2776 CERROR ("Timeout out conn->"LPX64" ip %d.%d.%d.%d:%d\n",
2778 HIPQUAD(conn->ksnc_ipaddr),
2780 ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT);
2782 /* NB we won't find this one again, but we can't
2783 * just proceed with the next peer, since we dropped
2784 * ksnd_global_lock and it might be dead already! */
2785 ksocknal_put_conn (conn);
2790 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Reaper daemon: the single kernel thread that (a) tears down dying
 * connections, (b) frees fully-dead ones, (c) reschedules connections
 * stalled on ENOMEM, and (d) periodically sweeps the peer table for
 * timeouts.  Runs until ksnd_shuttingdown is set.
 * NOTE(review): lines are elided in this view (gaps in the embedded
 * numbering) — several local declarations ('conn', 'wait', 'timeout',
 * 'i', 'peer_index', 'nenomem_conns', 'p', 'n', …) and some closing
 * braces are not visible.  Comments only; code lines are byte-identical. */
2794 ksocknal_reaper (void *arg)
2797         unsigned long flags;
2799         ksock_sched_t *sched;
2800         struct list_head enomem_conns;
2805         unsigned long deadline = jiffies;
2807         kportal_daemonize ("ksocknal_reaper");
2808         kportal_blockallsigs ();
2810         INIT_LIST_HEAD(&enomem_conns);
2811         init_waitqueue_entry (&wait, current);
         /* The reaper lock guards the three work lists below; it is held
          * on loop entry and re-taken before each iteration ends. */
2813         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2815         while (!ksocknal_data.ksnd_shuttingdown) {
                 /* Phase 1: deathrow — connections being closed; sever
                  * them from the network, then drop the list's ref. */
2817                 if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) {
2818                         conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next,
2819                                            ksock_conn_t, ksnc_list);
2820                         list_del (&conn->ksnc_list);
2822                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2824                         ksocknal_terminate_conn (conn);
2825                         ksocknal_put_conn (conn);
2827                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
                 /* Phase 2: zombies — refcount has hit zero; free them. */
2831                 if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) {
2832                         conn = list_entry (ksocknal_data.ksnd_zombie_conns.next,
2833                                            ksock_conn_t, ksnc_list);
2834                         list_del (&conn->ksnc_list);
2836                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2838                         ksocknal_destroy_conn (conn);
2840                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
                 /* Phase 3: grab the whole ENOMEM list in O(1) (splice the
                  * head in, then detach the shared list head) so it can be
                  * walked after dropping the reaper lock. */
2844                 if (!list_empty (&ksocknal_data.ksnd_enomem_conns)) {
2845                         list_add(&enomem_conns, &ksocknal_data.ksnd_enomem_conns);
2846                         list_del_init(&ksocknal_data.ksnd_enomem_conns);
2849                 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2851                 /* reschedule all the connections that stalled with ENOMEM... */
2853                 while (!list_empty (&enomem_conns)) {
2854                         conn = list_entry (enomem_conns.next,
2855                                            ksock_conn_t, ksnc_tx_list);
2856                         list_del (&conn->ksnc_tx_list);
2858                         sched = conn->ksnc_scheduler;
2860                         spin_lock_irqsave (&sched->kss_lock, flags);
                         /* Conn stayed "tx scheduled" while parked on the
                          * ENOMEM list; just mark it ready and hand it
                          * back to its scheduler. */
2862                         LASSERT (conn->ksnc_tx_scheduled);
2863                         conn->ksnc_tx_ready = 1;
2864                         list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
2865                         wake_up (&sched->kss_waitq);
2867                         spin_unlock_irqrestore (&sched->kss_lock, flags);
2871                 /* careful with the jiffy wrap... */
2872                 while ((timeout = (int)(deadline - jiffies)) <= 0) {
2875                         int chunk = ksocknal_data.ksnd_peer_hash_size;
2877                         /* Time to check for timeouts on a few more peers: I do
2878                          * checks every 'p' seconds on a proportion of the peer
2879                          * table and I need to check every connection 'n' times
2880                          * within a timeout interval, to ensure I detect a
2881                          * timeout on any connection within (n+1)/n times the
2882                          * timeout interval. */
2884                         if (ksocknal_tunables.ksnd_io_timeout > n * p)
2885                                 chunk = (chunk * n * p) /
2886                                         ksocknal_tunables.ksnd_io_timeout;
                         /* Sweep 'chunk' buckets, remembering where we got
                          * to so the next pass continues round-robin. */
2890                         for (i = 0; i < chunk; i++) {
2891                                 ksocknal_check_peer_timeouts (peer_index);
2892                                 peer_index = (peer_index + 1) %
2893                                              ksocknal_data.ksnd_peer_hash_size;
2899                 if (nenomem_conns != 0) {
2900                         /* Reduce my timeout if I rescheduled ENOMEM conns.
2901                          * This also prevents me getting woken immediately
2902                          * if any go back on my enomem list. */
2903                         timeout = SOCKNAL_ENOMEM_RETRY;
                 /* Record when we intend to wake, then sleep unless new
                  * work arrived (or shutdown started) in the meantime. */
2905                 ksocknal_data.ksnd_reaper_waketime = jiffies + timeout;
2907                 set_current_state (TASK_INTERRUPTIBLE);
2908                 add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2910                 if (!ksocknal_data.ksnd_shuttingdown &&
2911                     list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
2912                     list_empty (&ksocknal_data.ksnd_zombie_conns))
2913                         schedule_timeout (timeout);
2915                 set_current_state (TASK_RUNNING);
2916                 remove_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2918                 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2921         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2923         ksocknal_thread_fini ();
2927 lib_nal_t ksocknal_lib = {
2928 libnal_data: &ksocknal_data, /* NAL private data */
2929 libnal_send: ksocknal_send,
2930 libnal_send_pages: ksocknal_send_pages,
2931 libnal_recv: ksocknal_recv,
2932 libnal_recv_pages: ksocknal_recv_pages,
2933 libnal_dist: ksocknal_dist