1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5 * Author: Zach Brown <zab@zabbo.net>
6 * Author: Peter J. Braam <braam@clusterfs.com>
7 * Author: Phil Schwan <phil@clusterfs.com>
8 * Author: Eric Barton <eric@bartonsoftware.com>
10 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
12 * Portals is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Portals is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Portals; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
28 # include <linux/syscalls.h>
32 * LIB functions follow
/* Distance callback for the lib NAL: reports how "far" 'nid' is from this
 * node via *dist.  Only the local-NID comparison is visible in this
 * fragment; intervening listing lines are elided. */
36 ksocknal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
38 /* I would guess that if ksocknal_get_peer (nid) == NULL,
39 and we're not routing, then 'nid' is very distant :) */
40 if (nal->libnal_ni.ni_pid.nid == nid) {
/* Release a local tx descriptor: drop it from the active-ltx count and
 * free the allocation made in ksocknal_sendmsg() (size was stashed in
 * ltx_desc_size at allocation time). */
50 ksocknal_free_ltx (ksock_ltx_t *ltx)
52 atomic_dec(&ksocknal_data.ksnd_nactive_ltxs);
53 PORTAL_FREE(ltx, ltx->ltx_desc_size);
/* Map a kernel virtual address to its struct page, for zero-copy sends.
 * Handles vmalloc space, (on 2.4) the kmap window, and lowmem direct
 * mapping.  Only compiled when vaddr-based zero-copy is enabled. */
56 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
58 ksocknal_kvaddr_to_page (unsigned long vaddr)
62 if (vaddr >= VMALLOC_START &&
64 page = vmalloc_to_page ((void *)vaddr);
66 else if (vaddr >= PKMAP_BASE &&
67 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
68 page = vmalloc_to_page ((void *)vaddr);
69 /* in 2.4 ^ just walks the page tables */
72 page = virt_to_page (vaddr);
/* Send as much as possible of tx's mapped (iovec) fragments on conn's
 * socket without blocking.  Tries the zero-copy tcp_sendpage path when
 * the fragment is large enough and the NIC supports SG+checksum;
 * otherwise falls back to sock_sendmsg() on a scratch copy of the iovs
 * (socket ops may consume the iovs, so the originals must be preserved).
 * On partial sends the iov base/len are advanced by the bytes consumed. */
83 ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
85 struct socket *sock = conn->ksnc_sock;
86 struct iovec *iov = tx->tx_iov;
87 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
88 unsigned long vaddr = (unsigned long)iov->iov_base;
89 int offset = vaddr & (PAGE_SIZE - 1);
90 int zcsize = MIN (iov->iov_len, PAGE_SIZE - offset);
96 /* NB we can't trust socket ops to either consume our iovs
97 * or leave them alone. */
98 LASSERT (tx->tx_niov > 0);
100 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
101 if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
102 (sock->sk->route_caps & NETIF_F_SG) &&
103 (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
104 (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
105 int msgflg = MSG_DONTWAIT;
107 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
108 (void *)vaddr, page, page_address(page), offset, zcsize);
110 if (!list_empty (&conn->ksnc_tx_queue) ||
111 zcsize < tx->tx_resid)
114 rc = tcp_sendpage_zccd(sock, page, offset, zcsize, msgflg, &tx->tx_zccd);
118 #if SOCKNAL_SINGLE_FRAG_TX
119 struct iovec scratch;
120 struct iovec *scratchiov = &scratch;
123 struct iovec *scratchiov = conn->ksnc_tx_scratch_iov;
124 int niov = tx->tx_niov;
126 struct msghdr msg = {
129 .msg_iov = scratchiov,
133 .msg_flags = MSG_DONTWAIT
135 mm_segment_t oldmm = get_fs();
138 for (nob = i = 0; i < niov; i++) {
139 scratchiov[i] = tx->tx_iov[i];
140 nob += scratchiov[i].iov_len;
143 if (!list_empty(&conn->ksnc_tx_queue) ||
145 msg.msg_flags |= MSG_MORE;
148 rc = sock_sendmsg(sock, &msg, nob);
152 if (rc <= 0) /* sent nothing? */
156 LASSERT (nob <= tx->tx_resid);
161 LASSERT (tx->tx_niov > 0);
163 if (nob < iov->iov_len) {
164 iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
/* Send as much as possible of tx's page (kiov) fragments on conn's
 * socket without blocking.  Prefers the zero-copy tcp_sendpage path
 * when the fragment is big enough and the NIC supports SG+checksum;
 * otherwise kmaps each page into a scratch iovec and uses
 * sock_sendmsg() (pages are kunmapped again after the call).  On
 * partial sends the kiov offset/len are advanced by bytes consumed. */
178 ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
180 struct socket *sock = conn->ksnc_sock;
181 ptl_kiov_t *kiov = tx->tx_kiov;
185 /* NB we can't trust socket ops to either consume our iovs
186 * or leave them alone. */
187 LASSERT (tx->tx_niov == 0);
188 LASSERT (tx->tx_nkiov > 0);
191 if (kiov->kiov_len >= ksocknal_tunables.ksnd_zc_min_frag &&
192 (sock->sk->route_caps & NETIF_F_SG) &&
193 (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
194 struct page *page = kiov->kiov_page;
195 int offset = kiov->kiov_offset;
196 int fragsize = kiov->kiov_len;
197 int msgflg = MSG_DONTWAIT;
199 CDEBUG(D_NET, "page %p + offset %x for %d\n",
200 page, offset, kiov->kiov_len);
202 if (!list_empty(&conn->ksnc_tx_queue) ||
203 fragsize < tx->tx_resid)
206 rc = tcp_sendpage_zccd(sock, page, offset, fragsize, msgflg,
211 #if SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_RISK_KMAP_DEADLOCK
212 struct iovec scratch;
213 struct iovec *scratchiov = &scratch;
216 #ifdef CONFIG_HIGHMEM
217 #warning "XXX risk of kmap deadlock on multiple frags..."
219 struct iovec *scratchiov = conn->ksnc_tx_scratch_iov;
220 int niov = tx->tx_nkiov;
222 struct msghdr msg = {
225 .msg_iov = scratchiov,
229 .msg_flags = MSG_DONTWAIT
231 mm_segment_t oldmm = get_fs();
234 for (nob = i = 0; i < niov; i++) {
235 scratchiov[i].iov_base = kmap(kiov[i].kiov_page) +
237 nob += scratchiov[i].iov_len = kiov[i].kiov_len;
240 if (!list_empty(&conn->ksnc_tx_queue) ||
/* MSG_DONTWAIT is already set at msg init above; when more data is queued
 * (or this fragment doesn't complete the tx) we must hint the stack not
 * to flush, exactly as ksocknal_send_iov() does: use MSG_MORE here. */
242 msg.msg_flags |= MSG_MORE;
245 rc = sock_sendmsg(sock, &msg, nob);
248 for (i = 0; i < niov; i++)
249 kunmap(kiov[i].kiov_page);
252 if (rc <= 0) /* sent nothing? */
256 LASSERT (nob <= tx->tx_resid);
260 LASSERT(tx->tx_nkiov > 0);
262 if (nob < kiov->kiov_len) {
263 kiov->kiov_offset += nob;
264 kiov->kiov_len -= nob;
268 nob -= kiov->kiov_len;
269 tx->tx_kiov = ++kiov;
/* Drive a tx to completion (or EAGAIN/error) on conn.  Loops over
 * ksocknal_send_iov()/ksocknal_send_kiov() while tx_resid > 0, accounts
 * sent bytes, refreshes the tx deadline when the socket's wmem level
 * shows progress (data ACKed), and distinguishes "socket full" from
 * memory pressure when nothing was written.  ksnd_stall_tx is a
 * debug-only artificial delay. */
277 ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
282 if (ksocknal_data.ksnd_stall_tx != 0) {
283 set_current_state (TASK_UNINTERRUPTIBLE);
284 schedule_timeout (ksocknal_data.ksnd_stall_tx * HZ);
287 LASSERT (tx->tx_resid != 0);
289 rc = ksocknal_getconnsock (conn);
291 LASSERT (conn->ksnc_closing);
/* ksnd_enomem_tx > 0 simulates memory pressure for testing; the counter
 * decrements until the fault injection expires. */
296 if (ksocknal_data.ksnd_enomem_tx > 0) {
298 ksocknal_data.ksnd_enomem_tx--;
300 } else if (tx->tx_niov != 0) {
301 rc = ksocknal_send_iov (conn, tx);
303 rc = ksocknal_send_kiov (conn, tx);
306 bufnob = conn->ksnc_sock->sk->sk_wmem_queued;
307 if (rc > 0) /* sent something? */
308 conn->ksnc_tx_bufnob += rc; /* account it */
310 if (bufnob < conn->ksnc_tx_bufnob) {
311 /* allocated send buffer bytes < computed; infer
312 * something got ACKed */
313 conn->ksnc_tx_deadline = jiffies +
314 ksocknal_tunables.ksnd_io_timeout * HZ;
315 conn->ksnc_peer->ksnp_last_alive = jiffies;
316 conn->ksnc_tx_bufnob = bufnob;
320 if (rc <= 0) { /* Didn't write anything? */
322 ksock_sched_t *sched;
324 if (rc == 0) /* some stacks return 0 instead of -EAGAIN */
330 /* Check if EAGAIN is due to memory pressure */
332 sched = conn->ksnc_scheduler;
333 spin_lock_irqsave(&sched->kss_lock, flags);
335 if (!test_bit(SOCK_NOSPACE, &conn->ksnc_sock->flags) &&
336 !conn->ksnc_tx_ready) {
337 /* SOCK_NOSPACE is set when the socket fills
338 * and cleared in the write_space callback
339 * (which also sets ksnc_tx_ready). If
340 * SOCK_NOSPACE and ksnc_tx_ready are BOTH
341 * zero, I didn't fill the socket and
342 * write_space won't reschedule me, so I
343 * return -ENOMEM to get my caller to retry
348 spin_unlock_irqrestore(&sched->kss_lock, flags);
/* rc bytes are now owned by the socket; reduce the conn's queued count */
352 /* socket's wmem_queued now includes 'rc' bytes */
353 atomic_sub (rc, &conn->ksnc_tx_nob);
356 } while (tx->tx_resid != 0);
358 ksocknal_putconnsock (conn);
/* Ask TCP to ACK immediately (TCP_QUICKACK) so the peer's zero-copy
 * sends complete promptly instead of waiting for a piggy-backed ACK. */
363 ksocknal_eager_ack (ksock_conn_t *conn)
366 mm_segment_t oldmm = get_fs();
367 struct socket *sock = conn->ksnc_sock;
369 /* Remind the socket to ACK eagerly. If I don't, the socket might
370 * think I'm about to send something it could piggy-back the ACK
371 * on, introducing delay in completing zero-copy sends in my
375 sock->ops->setsockopt (sock, SOL_TCP, TCP_QUICKACK,
376 (char *)&opt, sizeof (opt));
/* Receive into conn's mapped (iovec) rx fragments via sock_recvmsg()
 * without blocking.  Copies the iovs to scratch first (socket ops may
 * consume them), then on success refreshes liveness/deadline, marks
 * rx_started, and advances the iov base/len past the bytes received. */
381 ksocknal_recv_iov (ksock_conn_t *conn)
383 #if SOCKNAL_SINGLE_FRAG_RX
384 struct iovec scratch;
385 struct iovec *scratchiov = &scratch;
388 struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
389 int niov = conn->ksnc_rx_niov;
391 struct iovec *iov = conn->ksnc_rx_iov;
392 struct msghdr msg = {
395 .msg_iov = scratchiov,
401 mm_segment_t oldmm = get_fs();
406 /* NB we can't trust socket ops to either consume our iovs
407 * or leave them alone. */
410 for (nob = i = 0; i < niov; i++) {
411 scratchiov[i] = iov[i];
412 nob += scratchiov[i].iov_len;
414 LASSERT (nob <= conn->ksnc_rx_nob_wanted);
417 rc = sock_recvmsg (conn->ksnc_sock, &msg, nob, MSG_DONTWAIT);
418 /* NB this is just a boolean..........................^ */
424 /* received something... */
427 conn->ksnc_peer->ksnp_last_alive = jiffies;
428 conn->ksnc_rx_deadline = jiffies +
429 ksocknal_tunables.ksnd_io_timeout * HZ;
430 mb(); /* order with setting rx_started */
431 conn->ksnc_rx_started = 1;
433 conn->ksnc_rx_nob_wanted -= nob;
434 conn->ksnc_rx_nob_left -= nob;
437 LASSERT (conn->ksnc_rx_niov > 0);
439 if (nob < iov->iov_len) {
441 iov->iov_base = (void *)(((unsigned long)iov->iov_base) + nob);
446 conn->ksnc_rx_iov = ++iov;
447 conn->ksnc_rx_niov--;
/* Receive into conn's page (kiov) rx fragments.  kmaps each page into a
 * scratch iovec for sock_recvmsg(), kunmaps afterwards, then updates
 * liveness/deadline and advances kiov offset/len past bytes received.
 * On HIGHMEM configs multiple simultaneous kmaps risk deadlock (see
 * #warning), hence the single-frag fallback. */
454 ksocknal_recv_kiov (ksock_conn_t *conn)
456 #if SOCKNAL_SINGLE_FRAG_RX || !SOCKNAL_RISK_KMAP_DEADLOCK
457 struct iovec scratch;
458 struct iovec *scratchiov = &scratch;
461 #ifdef CONFIG_HIGHMEM
462 #warning "XXX risk of kmap deadlock on multiple frags..."
464 struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
465 int niov = conn->ksnc_rx_nkiov;
467 ptl_kiov_t *kiov = conn->ksnc_rx_kiov;
468 struct msghdr msg = {
471 .msg_iov = scratchiov,
477 mm_segment_t oldmm = get_fs();
482 LASSERT (conn->ksnc_rx_nkiov > 0);
484 /* NB we can't trust socket ops to either consume our iovs
485 * or leave them alone. */
486 for (nob = i = 0; i < niov; i++) {
487 scratchiov[i].iov_base = kmap(kiov[i].kiov_page) + kiov[i].kiov_offset;
488 nob += scratchiov[i].iov_len = kiov[i].kiov_len;
490 LASSERT (nob <= conn->ksnc_rx_nob_wanted);
493 rc = sock_recvmsg (conn->ksnc_sock, &msg, nob, MSG_DONTWAIT);
494 /* NB this is just a boolean.......................^ */
497 for (i = 0; i < niov; i++)
498 kunmap(kiov[i].kiov_page);
503 /* received something... */
506 conn->ksnc_peer->ksnp_last_alive = jiffies;
507 conn->ksnc_rx_deadline = jiffies +
508 ksocknal_tunables.ksnd_io_timeout * HZ;
509 mb(); /* order with setting rx_started */
510 conn->ksnc_rx_started = 1;
512 conn->ksnc_rx_nob_wanted -= nob;
513 conn->ksnc_rx_nob_left -= nob;
516 LASSERT (conn->ksnc_rx_nkiov > 0);
518 if (nob < kiov->kiov_len) {
519 kiov->kiov_offset += nob;
520 kiov->kiov_len -= nob;
524 nob -= kiov->kiov_len;
525 conn->ksnc_rx_kiov = ++kiov;
526 conn->ksnc_rx_nkiov--;
/* Top-level receive step for a conn: dispatches to recv_iov/recv_kiov,
 * and when a whole message segment (header or body) completes, nudges
 * the peer with an eager ACK if tunables request it for this conn type.
 * ksnd_stall_rx is a debug-only artificial delay. */
533 ksocknal_receive (ksock_conn_t *conn)
535 /* Return 1 on success, 0 on EOF, < 0 on error.
536 * Caller checks ksnc_rx_nob_wanted to determine
537 * progress/completion. */
541 if (ksocknal_data.ksnd_stall_rx != 0) {
542 set_current_state (TASK_UNINTERRUPTIBLE);
543 schedule_timeout (ksocknal_data.ksnd_stall_rx * HZ);
546 rc = ksocknal_getconnsock (conn);
548 LASSERT (conn->ksnc_closing);
553 if (conn->ksnc_rx_niov != 0)
554 rc = ksocknal_recv_iov (conn);
556 rc = ksocknal_recv_kiov (conn);
559 /* error/EOF or partial receive */
562 } else if (rc == 0 && conn->ksnc_rx_started) {
563 /* EOF in the middle of a message */
569 /* Completed a fragment */
571 if (conn->ksnc_rx_nob_wanted == 0) {
572 /* Completed a message segment (header or payload) */
573 if ((ksocknal_tunables.ksnd_eager_ack & conn->ksnc_type) != 0 &&
574 (conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
575 conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD)) {
576 /* Remind the socket to ack eagerly... */
577 ksocknal_eager_ack(conn);
584 ksocknal_putconnsock (conn);
/* Zero-copy completion callback: runs when all zccd skbufs for a tx are
 * done.  Can't finalize here (lock context), so queue the tx on the
 * scheduler's zctxdone list and wake the scheduler to clean it up. */
590 ksocknal_zc_callback (zccd_t *zcd)
592 ksock_tx_t *tx = KSOCK_ZCCD_2_TX(zcd);
593 ksock_sched_t *sched = tx->tx_conn->ksnc_scheduler;
597 /* Schedule tx for cleanup (can't do it now due to lock conflicts) */
599 spin_lock_irqsave (&sched->kss_lock, flags);
601 list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
602 wake_up (&sched->kss_waitq);
604 spin_unlock_irqrestore (&sched->kss_lock, flags);
/* Final completion of a tx: drop the conn ref held for async zero-copy
 * completion, then either complete the router forwarding descriptor
 * (forwarded packets) or lib_finalize() the local send and free its
 * ltx.  Success/failure is judged by tx_resid == 0. */
610 ksocknal_tx_done (ksock_tx_t *tx, int asynch)
615 if (tx->tx_conn != NULL) {
617 /* zero copy completion isn't always from
618 * process_transmit() so it needs to keep a ref on
621 ksocknal_put_conn (tx->tx_conn);
627 if (tx->tx_isfwd) { /* was a forwarded packet? */
628 kpr_fwd_done (&ksocknal_data.ksnd_router,
629 KSOCK_TX_2_KPR_FWD_DESC (tx),
630 (tx->tx_resid == 0) ? 0 : -ECONNABORTED);
636 ltx = KSOCK_TX_2_KSOCK_LTX (tx);
638 lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie,
639 (tx->tx_resid == 0) ? PTL_OK : PTL_FAIL);
641 ksocknal_free_ltx (ltx);
/* Called when a tx has been handed fully to the stack.  If zero-copy
 * skbufs are still in flight (zccd count > 1), take a conn ref for the
 * eventual callback and drop our initial zccd ref so the callback can
 * fire; otherwise complete the tx synchronously. */
646 ksocknal_tx_launched (ksock_tx_t *tx)
649 if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
650 ksock_conn_t *conn = tx->tx_conn;
652 /* zccd skbufs are still in-flight. First take a ref on
653 * conn, so it hangs about for ksocknal_tx_done... */
654 atomic_inc (&conn->ksnc_refcount);
656 /* ...then drop the initial ref on zccd, so the zero copy
657 * callback can occur */
658 zccd_put (&tx->tx_zccd);
662 /* Any zero-copy-ness (if any) has completed; I can complete the
663 * transmit now, avoiding an extra schedule */
664 ksocknal_tx_done (tx, 0);
/* Scheduler-side transmit step: call ksocknal_transmit() and classify
 * the outcome — complete (launch done), -ENOMEM (park the conn on the
 * reaper's enomem list for retry, with rate-limited warnings), or a
 * hard error (close the conn and all its siblings). */
668 ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
673 rc = ksocknal_transmit (conn, tx);
675 CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
677 if (tx->tx_resid == 0) {
678 /* Sent everything OK */
681 ksocknal_tx_launched (tx);
691 counter++; /* exponential backoff warnings */
/* (counter & -counter) == counter is true only for powers of two, so
 * the warning rate halves as failures accumulate */
692 if ((counter & (-counter)) == counter)
693 CWARN("%d ENOMEM tx %p\n", counter, conn);
695 /* Queue on ksnd_enomem_conns for retry after a timeout */
696 spin_lock_irqsave(&ksocknal_data.ksnd_reaper_lock, flags);
698 /* enomem list takes over scheduler's ref... */
699 LASSERT (conn->ksnc_tx_scheduled);
700 list_add_tail(&conn->ksnc_tx_list,
701 &ksocknal_data.ksnd_enomem_conns);
702 if (!time_after_eq(jiffies + SOCKNAL_ENOMEM_RETRY,
703 ksocknal_data.ksnd_reaper_waketime))
704 wake_up (&ksocknal_data.ksnd_reaper_waitq);
706 spin_unlock_irqrestore(&ksocknal_data.ksnd_reaper_lock, flags);
713 if (!conn->ksnc_closing)
714 CERROR("[%p] Error %d on write to "LPX64
715 " ip %d.%d.%d.%d:%d\n", conn, rc,
716 conn->ksnc_peer->ksnp_nid,
717 HIPQUAD(conn->ksnc_ipaddr),
720 ksocknal_close_conn_and_siblings (conn, rc);
721 ksocknal_tx_launched (tx);
/* Mark a route as connecting (all missing typed connections, or the
 * single ANY connection when typed conns are disabled) and hand it to
 * the autoconnect daemon.  Caller holds the global write lock. */
727 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
731 /* called holding write lock on ksnd_global_lock */
733 LASSERT (!route->ksnr_deleted);
734 LASSERT ((route->ksnr_connected & (1 << SOCKNAL_CONN_ANY)) == 0);
735 LASSERT ((route->ksnr_connected & KSNR_TYPED_ROUTES) != KSNR_TYPED_ROUTES);
736 LASSERT (route->ksnr_connecting == 0);
738 if (ksocknal_tunables.ksnd_typed_conns)
739 route->ksnr_connecting =
740 KSNR_TYPED_ROUTES & ~route->ksnr_connected;
742 route->ksnr_connecting = (1 << SOCKNAL_CONN_ANY);
744 atomic_inc (&route->ksnr_refcount); /* extra ref for asynchd */
746 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
748 list_add_tail (&route->ksnr_connect_list,
749 &ksocknal_data.ksnd_autoconnectd_routes);
750 wake_up (&ksocknal_data.ksnd_autoconnectd_waitq);
752 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
/* Resolve the peer to send tx to: the peer for 'nid' directly, or —
 * when routing — the peer for the gateway NID the router chooses.
 * Returns NULL (with an error logged) when no usable peer exists.
 * Caller holds the global lock. */
756 ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
758 char ipbuf[PTL_NALFMT_SIZE];
759 ptl_nid_t target_nid;
761 ksock_peer_t *peer = ksocknal_find_peer_locked (nid);
767 CERROR ("Can't send packet to "LPX64
768 " %s: routed target is not a peer\n",
769 nid, portals_nid2str(SOCKNAL, nid, ipbuf));
773 rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, tx->tx_nob,
776 CERROR ("Can't route to "LPX64" %s: router error %d\n",
777 nid, portals_nid2str(SOCKNAL, nid, ipbuf), rc);
781 peer = ksocknal_find_peer_locked (target_nid);
785 CERROR ("Can't send packet to "LPX64" %s: no peer entry\n",
786 target_nid, portals_nid2str(SOCKNAL, target_nid, ipbuf));
/* Pick the best established conn on 'peer' for tx.  Prefers a conn of
 * the matching type (bulk vs control, judged against ksnd_min_bulk)
 * with the least queued data; falls back to the least-loaded conn of
 * any type.  With SOCKNAL_ROUND_ROBIN the chosen conn is rotated to
 * the list tail to spread load.  Caller holds the global lock. */
791 ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer)
793 struct list_head *tmp;
794 ksock_conn_t *typed = NULL;
796 ksock_conn_t *fallback = NULL;
800 list_for_each (tmp, &peer->ksnp_conns) {
801 ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
802 #if SOCKNAL_ROUND_ROBIN
/* load metric: bytes queued on the conn plus bytes still in the socket */
805 int nob = atomic_read(&c->ksnc_tx_nob) +
806 c->ksnc_sock->sk->sk_wmem_queued;
808 LASSERT (!c->ksnc_closing);
810 if (fallback == NULL || nob < fnob) {
815 if (!ksocknal_tunables.ksnd_typed_conns)
818 switch (c->ksnc_type) {
821 case SOCKNAL_CONN_ANY:
823 case SOCKNAL_CONN_BULK_IN:
825 case SOCKNAL_CONN_BULK_OUT:
826 if (tx->tx_nob < ksocknal_tunables.ksnd_min_bulk)
829 case SOCKNAL_CONN_CONTROL:
830 if (tx->tx_nob >= ksocknal_tunables.ksnd_min_bulk)
835 if (typed == NULL || nob < tnob) {
841 /* prefer the typed selection */
842 conn = (typed != NULL) ? typed : fallback;
844 #if SOCKNAL_ROUND_ROBIN
846 /* round-robin all else being equal */
847 list_del (&conn->ksnc_list);
848 list_add_tail (&conn->ksnc_list, &peer->ksnp_conns);
/* Queue tx on an established conn: account its bytes, init the
 * zero-copy descriptor, start the tx timeout if the queue and socket
 * were previously empty, append to the conn's tx queue, and schedule
 * the conn on its scheduler (taking a ref) if it is sendable and not
 * already scheduled.  Caller holds the global lock (read or irq-write). */
855 ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
858 ksock_sched_t *sched = conn->ksnc_scheduler;
860 /* called holding global lock (read or irq-write) and caller may
861 * not have dropped this lock between finding conn and calling me,
862 * so we don't need the {get,put}connsock dance to deref
864 LASSERT(!conn->ksnc_closing);
865 LASSERT(tx->tx_resid == tx->tx_nob);
867 CDEBUG (D_NET, "Sending to "LPX64" ip %d.%d.%d.%d:%d\n",
868 conn->ksnc_peer->ksnp_nid,
869 HIPQUAD(conn->ksnc_ipaddr),
872 atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
876 zccd_init (&tx->tx_zccd, ksocknal_zc_callback);
877 /* NB this sets 1 ref on zccd, so the callback can only occur after
878 * I've released this ref. */
880 spin_lock_irqsave (&sched->kss_lock, flags);
882 if (list_empty(&conn->ksnc_tx_queue) &&
883 conn->ksnc_sock->sk->sk_wmem_queued == 0) {
884 /* First packet starts the timeout */
885 conn->ksnc_tx_deadline = jiffies +
886 ksocknal_tunables.ksnd_io_timeout * HZ;
887 conn->ksnc_tx_bufnob = 0;
888 mb(); /* order with adding to tx_queue */
891 list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
893 if (conn->ksnc_tx_ready && /* able to send */
894 !conn->ksnc_tx_scheduled) { /* not scheduled to send */
895 /* +1 ref for scheduler */
896 atomic_inc (&conn->ksnc_refcount);
897 list_add_tail (&conn->ksnc_tx_list,
898 &sched->kss_tx_conns);
899 conn->ksnc_tx_scheduled = 1;
900 wake_up (&sched->kss_waitq);
903 spin_unlock_irqrestore (&sched->kss_lock, flags);
/* Find a route on 'peer' that still needs a connection attempt: skips
 * routes that are fully connected (typed or ANY), already connecting,
 * or inside their retry-backoff window.  Caller holds the global lock. */
907 ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
909 struct list_head *tmp;
910 ksock_route_t *route;
913 list_for_each (tmp, &peer->ksnp_routes) {
914 route = list_entry (tmp, ksock_route_t, ksnr_list);
915 bits = route->ksnr_connected;
917 /* All typed connections established? */
918 if ((bits & KSNR_TYPED_ROUTES) == KSNR_TYPED_ROUTES)
921 /* Untyped connection established? */
922 if ((bits & (1 << SOCKNAL_CONN_ANY)) != 0)
925 /* connection being established? */
926 if (route->ksnr_connecting != 0)
929 /* too soon to retry this guy? */
930 if (!time_after_eq (jiffies, route->ksnr_timeout))
/* Return a route on 'peer' with a connection attempt currently in
 * flight, if any.  Caller holds the global lock. */
940 ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
942 struct list_head *tmp;
943 ksock_route_t *route;
945 list_for_each (tmp, &peer->ksnp_routes) {
946 route = list_entry (tmp, ksock_route_t, ksnr_list);
948 if (route->ksnr_connecting != 0)
/* Route a fully-described tx to 'nid'.  Fast path: under the read lock,
 * if no route needs connecting and a conn exists, queue immediately.
 * Otherwise retake the write lock, kick off any needed autoconnects,
 * then either queue on an existing conn, park the tx on the peer's
 * queue while a connection is being established, or fail with
 * -EHOSTUNREACH.  tx must carry the complete portals header in its
 * first mapped fragment. */
956 ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
961 ksock_route_t *route;
964 /* Ensure the frags we've been given EXACTLY match the number of
965 * bytes we want to send. Many TCP/IP stacks disregard any total
966 * size parameters passed to them and just look at the frags.
968 * We always expect at least 1 mapped fragment containing the
969 * complete portals header. */
970 LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) +
971 lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
972 LASSERT (tx->tx_niov >= 1);
973 LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t));
975 CDEBUG (D_NET, "packet %p type %d, nob %d niov %d nkiov %d\n",
976 tx, ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type,
977 tx->tx_nob, tx->tx_niov, tx->tx_nkiov);
979 tx->tx_conn = NULL; /* only set when assigned a conn */
980 tx->tx_resid = tx->tx_nob;
981 tx->tx_hdr = (ptl_hdr_t *)tx->tx_iov[0].iov_base;
983 g_lock = &ksocknal_data.ksnd_global_lock;
984 #if !SOCKNAL_ROUND_ROBIN
987 peer = ksocknal_find_target_peer_locked (tx, nid);
989 read_unlock (g_lock);
990 return (-EHOSTUNREACH);
993 if (ksocknal_find_connectable_route_locked(peer) == NULL) {
994 conn = ksocknal_find_conn_locked (tx, peer);
996 /* I've got no autoconnect routes that need to be
997 * connecting and I do have an actual connection... */
998 ksocknal_queue_tx_locked (tx, conn);
999 read_unlock (g_lock);
1004 /* I'll need a write lock... */
1005 read_unlock (g_lock);
1007 write_lock_irqsave(g_lock, flags);
/* peer may have vanished while the lock was dropped; look it up again */
1009 peer = ksocknal_find_target_peer_locked (tx, nid);
1011 write_unlock_irqrestore(g_lock, flags);
1012 return (-EHOSTUNREACH);
1016 /* launch any/all autoconnections that need it */
1017 route = ksocknal_find_connectable_route_locked (peer);
1021 ksocknal_launch_autoconnect_locked (route);
1024 conn = ksocknal_find_conn_locked (tx, peer);
1026 /* Connection exists; queue message on it */
1027 ksocknal_queue_tx_locked (tx, conn);
1028 write_unlock_irqrestore (g_lock, flags);
1032 route = ksocknal_find_connecting_route_locked (peer);
1033 if (route != NULL) {
1034 /* At least 1 connection is being established; queue the
1036 list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
1037 write_unlock_irqrestore (g_lock, flags);
1041 write_unlock_irqrestore (g_lock, flags);
1042 return (-EHOSTUNREACH);
/* Common send path for ksocknal_send()/ksocknal_send_pages(): allocate
 * an ltx descriptor sized for the payload frags (atomically when in
 * interrupt or replying, since those contexts must not block), copy in
 * the header as the first mapped fragment, attach either the iovec or
 * the kiov payload, and launch the packet.  The descriptor is freed on
 * launch failure.
 * NB: several lines of this listing had "&ltx" mangled to "<x" by an
 * HTML-entity transcription ("&lt" rendered as "<"); restored below. */
1046 ksocknal_sendmsg(lib_nal_t *nal,
1053 unsigned int payload_niov,
1054 struct iovec *payload_iov,
1055 ptl_kiov_t *payload_kiov,
1056 size_t payload_offset,
1063 /* NB 'private' is different depending on what we're sending.
1064 * Just ignore it... */
1066 CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
1067 " pid %d\n", payload_nob, payload_niov, nid , pid);
1069 LASSERT (payload_nob == 0 || payload_niov > 0);
1070 LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1072 /* It must be OK to kmap() if required */
1073 LASSERT (payload_kiov == NULL || !in_interrupt ());
1074 /* payload is either all vaddrs or all pages */
1075 LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1077 if (payload_iov != NULL)
1078 desc_size = offsetof(ksock_ltx_t, ltx_iov[1 + payload_niov]);
1080 desc_size = offsetof(ksock_ltx_t, ltx_kiov[payload_niov]);
1082 if (in_interrupt() ||
1083 type == PTL_MSG_ACK ||
1084 type == PTL_MSG_REPLY) {
1085 /* Can't block if in interrupt or responding to an incoming
1087 PORTAL_ALLOC_ATOMIC(ltx, desc_size);
1089 PORTAL_ALLOC(ltx, desc_size);
1093 CERROR("Can't allocate tx desc type %d size %d %s\n",
1094 type, desc_size, in_interrupt() ? "(intr)" : "");
1095 return (PTL_NO_SPACE);
1098 atomic_inc(&ksocknal_data.ksnd_nactive_ltxs);
1100 ltx->ltx_desc_size = desc_size;
1102 /* We always have 1 mapped frag for the header */
1103 ltx->ltx_tx.tx_iov = ltx->ltx_iov;
1104 ltx->ltx_iov[0].iov_base = &ltx->ltx_hdr;
1105 ltx->ltx_iov[0].iov_len = sizeof(*hdr);
1106 ltx->ltx_hdr = *hdr;
1108 ltx->ltx_private = private;
1109 ltx->ltx_cookie = cookie;
1111 ltx->ltx_tx.tx_isfwd = 0;
1112 ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_nob;
1114 if (payload_iov != NULL) {
1115 /* payload is all mapped */
1116 ltx->ltx_tx.tx_kiov = NULL;
1117 ltx->ltx_tx.tx_nkiov = 0;
1119 ltx->ltx_tx.tx_niov =
1120 1 + lib_extract_iov(payload_niov, &ltx->ltx_iov[1],
1121 payload_niov, payload_iov,
1122 payload_offset, payload_nob);
1124 /* payload is all pages */
1125 ltx->ltx_tx.tx_niov = 1;
1127 ltx->ltx_tx.tx_kiov = ltx->ltx_kiov;
1128 ltx->ltx_tx.tx_nkiov =
1129 lib_extract_kiov(payload_niov, ltx->ltx_kiov,
1130 payload_niov, payload_kiov,
1131 payload_offset, payload_nob);
1134 rc = ksocknal_launch_packet(&ltx->ltx_tx, nid);
1138 ksocknal_free_ltx(ltx);
/* lib NAL send entry point for mapped (iovec) payloads; thin wrapper
 * around ksocknal_sendmsg() with payload_kiov == NULL. */
1143 ksocknal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1144 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1145 unsigned int payload_niov, struct iovec *payload_iov,
1146 size_t payload_offset, size_t payload_len)
1148 return (ksocknal_sendmsg(nal, private, cookie,
1149 hdr, type, nid, pid,
1150 payload_niov, payload_iov, NULL,
1151 payload_offset, payload_len));
/* lib NAL send entry point for page (kiov) payloads; thin wrapper
 * around ksocknal_sendmsg() with payload_iov == NULL. */
1155 ksocknal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1156 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1157 unsigned int payload_niov, ptl_kiov_t *payload_kiov,
1158 size_t payload_offset, size_t payload_len)
1160 return (ksocknal_sendmsg(nal, private, cookie,
1161 hdr, type, nid, pid,
1162 payload_niov, NULL, payload_kiov,
1163 payload_offset, payload_len));
/* Router callback: forward a packet described by 'fwd'.  Builds a tx
 * in the descriptor's scratch space (header as 1 mapped frag, payload
 * as the router's kiovs) and launches it toward the gateway — or the
 * final target if we ARE the gateway.  Failure is reported back to the
 * router via kpr_fwd_done(). */
1167 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1169 ptl_nid_t nid = fwd->kprfd_gateway_nid;
1170 ksock_ftx_t *ftx = (ksock_ftx_t *)&fwd->kprfd_scratch;
1173 CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
1174 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
1176 /* I'm the gateway; must be the last hop */
1177 if (nid == ksocknal_lib.libnal_ni.ni_pid.nid)
1178 nid = fwd->kprfd_target_nid;
1180 /* setup iov for hdr */
1181 ftx->ftx_iov.iov_base = fwd->kprfd_hdr;
1182 ftx->ftx_iov.iov_len = sizeof(ptl_hdr_t);
1184 ftx->ftx_tx.tx_isfwd = 1; /* This is a forwarding packet */
1185 ftx->ftx_tx.tx_nob = sizeof(ptl_hdr_t) + fwd->kprfd_nob;
1186 ftx->ftx_tx.tx_niov = 1;
1187 ftx->ftx_tx.tx_iov = &ftx->ftx_iov;
1188 ftx->ftx_tx.tx_nkiov = fwd->kprfd_niov;
1189 ftx->ftx_tx.tx_kiov = fwd->kprfd_kiov;
1191 rc = ksocknal_launch_packet (&ftx->ftx_tx, nid);
1193 kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, rc);
/* Spawn a kernel thread running fn(arg) and bump the live-thread count
 * under the global write lock (so shutdown can wait for it). */
1197 ksocknal_thread_start (int (*fn)(void *arg), void *arg)
1199 long pid = kernel_thread (fn, arg, 0);
1200 unsigned long flags;
1205 write_lock_irqsave(&ksocknal_data.ksnd_global_lock, flags);
1206 ksocknal_data.ksnd_nthreads++;
1207 write_unlock_irqrestore(&ksocknal_data.ksnd_global_lock, flags);
/* Called by a ksocknal thread on exit: decrement the live-thread count
 * under the global write lock. */
1212 ksocknal_thread_fini (void)
1214 unsigned long flags;
1216 write_lock_irqsave(&ksocknal_data.ksnd_global_lock, flags);
1217 ksocknal_data.ksnd_nthreads--;
1218 write_unlock_irqrestore(&ksocknal_data.ksnd_global_lock, flags);
/* Router completion callback for a forwarding buffer (fmb): log the
 * outcome, drop the peer ref taken at init, return the fmb to its
 * pool's idle list, and — if a conn was blocked waiting for an fmb —
 * unblock it by rescheduling it in GET_FMB state. */
1222 ksocknal_fmb_callback (void *arg, int error)
1224 ksock_fmb_t *fmb = (ksock_fmb_t *)arg;
1225 ksock_fmb_pool_t *fmp = fmb->fmb_pool;
1226 ptl_hdr_t *hdr = &fmb->fmb_hdr;
1227 ksock_conn_t *conn = NULL;
1228 ksock_sched_t *sched;
1229 unsigned long flags;
1230 char ipbuf[PTL_NALFMT_SIZE];
1231 char ipbuf2[PTL_NALFMT_SIZE];
1234 CERROR("Failed to route packet from "
1235 LPX64" %s to "LPX64" %s: %d\n",
1236 le64_to_cpu(hdr->src_nid),
1237 portals_nid2str(SOCKNAL, le64_to_cpu(hdr->src_nid), ipbuf),
1238 le64_to_cpu(hdr->dest_nid),
1239 portals_nid2str(SOCKNAL, le64_to_cpu(hdr->dest_nid), ipbuf2),
1242 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
1243 le64_to_cpu(hdr->src_nid), le64_to_cpu(hdr->dest_nid));
1245 /* drop peer ref taken on init */
1246 ksocknal_put_peer (fmb->fmb_peer);
1248 spin_lock_irqsave (&fmp->fmp_lock, flags);
1250 list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);
1251 fmp->fmp_nactive_fmbs--;
1253 if (!list_empty (&fmp->fmp_blocked_conns)) {
1254 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next,
1255 ksock_conn_t, ksnc_rx_list);
1256 list_del (&conn->ksnc_rx_list);
1259 spin_unlock_irqrestore (&fmp->fmp_lock, flags);
1264 CDEBUG (D_NET, "Scheduling conn %p\n", conn);
1265 LASSERT (conn->ksnc_rx_scheduled);
1266 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
1268 conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
1270 sched = conn->ksnc_scheduler;
1272 spin_lock_irqsave (&sched->kss_lock, flags);
1274 list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
1275 wake_up (&sched->kss_waitq);
1277 spin_unlock_irqrestore (&sched->kss_lock, flags);
/* Take an idle forwarding buffer from the small or large pool (chosen
 * by payload size).  If none is available, park the conn on the pool's
 * blocked list in FMB_SLEEP state; ksocknal_fmb_callback() will wake
 * it when an fmb is returned. */
1281 ksocknal_get_idle_fmb (ksock_conn_t *conn)
1283 int payload_nob = conn->ksnc_rx_nob_left;
1284 unsigned long flags;
1285 ksock_fmb_pool_t *pool;
1288 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1289 LASSERT (kpr_routing(&ksocknal_data.ksnd_router));
1291 if (payload_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
1292 pool = &ksocknal_data.ksnd_small_fmp;
1294 pool = &ksocknal_data.ksnd_large_fmp;
1296 spin_lock_irqsave (&pool->fmp_lock, flags);
1298 if (!list_empty (&pool->fmp_idle_fmbs)) {
1299 fmb = list_entry(pool->fmp_idle_fmbs.next,
1300 ksock_fmb_t, fmb_list);
1301 list_del (&fmb->fmb_list);
1302 pool->fmp_nactive_fmbs++;
1303 spin_unlock_irqrestore (&pool->fmp_lock, flags);
1308 /* deschedule until fmb free */
1310 conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
1312 list_add_tail (&conn->ksnc_rx_list,
1313 &pool->fmp_blocked_conns);
1315 spin_unlock_irqrestore (&pool->fmp_lock, flags);
/* Prepare an fmb for the packet just parsed on conn: take a peer ref
 * (prevents module unload mid-forward), copy the header in, size the
 * fmb kiovs, and init the router forward descriptor.  With no payload
 * the forward starts immediately and the conn moves to the next
 * packet; otherwise the conn's rx kiovs are pointed at the fmb pages
 * and rx state becomes BODY_FWD to read the payload in. */
1320 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
1322 int payload_nob = conn->ksnc_rx_nob_left;
1323 ptl_nid_t dest_nid = le64_to_cpu(conn->ksnc_hdr.dest_nid);
1325 int nob = payload_nob;
1327 LASSERT (conn->ksnc_rx_scheduled);
1328 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1329 LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
1330 LASSERT (payload_nob >= 0);
1331 LASSERT (payload_nob <= fmb->fmb_pool->fmp_buff_pages * PAGE_SIZE);
1332 LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
1333 LASSERT (fmb->fmb_kiov[0].kiov_offset == 0);
1335 /* Take a ref on the conn's peer to prevent module unload before
1336 * forwarding completes. */
1337 fmb->fmb_peer = conn->ksnc_peer;
1338 atomic_inc (&conn->ksnc_peer->ksnp_refcount);
1340 /* Copy the header we just read into the forwarding buffer. If
1341 * there's payload, start reading reading it into the buffer,
1342 * otherwise the forwarding buffer can be kicked off
1344 fmb->fmb_hdr = conn->ksnc_hdr;
1347 LASSERT (niov < fmb->fmb_pool->fmp_buff_pages);
1348 LASSERT (fmb->fmb_kiov[niov].kiov_offset == 0);
1349 fmb->fmb_kiov[niov].kiov_len = MIN (PAGE_SIZE, nob);
1354 kpr_fwd_init(&fmb->fmb_fwd, dest_nid, &fmb->fmb_hdr,
1355 payload_nob, niov, fmb->fmb_kiov,
1356 ksocknal_fmb_callback, fmb);
1358 if (payload_nob == 0) { /* got complete packet already */
1359 CDEBUG (D_NET, "%p "LPX64"->"LPX64" fwd_start (immediate)\n",
1360 conn, le64_to_cpu(conn->ksnc_hdr.src_nid), dest_nid);
1362 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1364 ksocknal_new_packet (conn, 0); /* on to next packet */
1368 conn->ksnc_cookie = fmb; /* stash fmb for later */
1369 conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
1371 /* Set up conn->ksnc_rx_kiov to read the payload into fmb's kiov-ed
1373 LASSERT (niov <= sizeof(conn->ksnc_rx_iov_space)/sizeof(ptl_kiov_t));
1375 conn->ksnc_rx_niov = 0;
1376 conn->ksnc_rx_nkiov = niov;
1377 conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1378 memcpy(conn->ksnc_rx_kiov, fmb->fmb_kiov, niov * sizeof(ptl_kiov_t));
1380 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
1381 le64_to_cpu(conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
/* Validate a just-received header destined for another NID.  Drops the
 * packet (skipping its body via ksocknal_new_packet) when the length
 * is corrupt, we aren't routing, it exceeds PTL_MTU, or the target is
 * actually a directly-connected peer (it should have gone direct).
 * Otherwise stash the body length and move to GET_FMB state. */
1386 ksocknal_fwd_parse (ksock_conn_t *conn)
1389 ptl_nid_t dest_nid = le64_to_cpu(conn->ksnc_hdr.dest_nid);
1390 ptl_nid_t src_nid = le64_to_cpu(conn->ksnc_hdr.src_nid);
1391 int body_len = le32_to_cpu(conn->ksnc_hdr.payload_length);
1392 char str[PTL_NALFMT_SIZE];
1393 char str2[PTL_NALFMT_SIZE];
1395 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
1396 src_nid, dest_nid, conn->ksnc_rx_nob_left);
1398 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
1399 LASSERT (conn->ksnc_rx_scheduled);
1401 if (body_len < 0) { /* length corrupt (overflow) */
1402 CERROR("dropping packet from "LPX64" (%s) for "LPX64" (%s): "
1403 "packet size %d illegal\n",
1404 src_nid, portals_nid2str(TCPNAL, src_nid, str),
1405 dest_nid, portals_nid2str(TCPNAL, dest_nid, str2),
1408 ksocknal_new_packet (conn, 0); /* on to new packet */
1412 if (!kpr_routing(&ksocknal_data.ksnd_router)) { /* not forwarding */
1413 CERROR("dropping packet from "LPX64" (%s) for "LPX64
1414 " (%s): not forwarding\n",
1415 src_nid, portals_nid2str(TCPNAL, src_nid, str),
1416 dest_nid, portals_nid2str(TCPNAL, dest_nid, str2));
1417 /* on to new packet (skip this one's body) */
1418 ksocknal_new_packet (conn, body_len);
1422 if (body_len > PTL_MTU) { /* too big to forward */
1423 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1424 "(%s): packet size %d too big\n",
1425 src_nid, portals_nid2str(TCPNAL, src_nid, str),
1426 dest_nid, portals_nid2str(TCPNAL, dest_nid, str2),
1428 /* on to new packet (skip this one's body) */
1429 ksocknal_new_packet (conn, body_len);
1433 /* should have gone direct */
1434 peer = ksocknal_get_peer (conn->ksnc_hdr.dest_nid);
1436 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1437 "(%s): target is a peer\n",
1438 src_nid, portals_nid2str(TCPNAL, src_nid, str),
1439 dest_nid, portals_nid2str(TCPNAL, dest_nid, str2));
1440 ksocknal_put_peer (peer); /* drop ref from get above */
1442 /* on to next packet (skip this one's body) */
1443 ksocknal_new_packet (conn, body_len);
1447 conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB; /* Getting FMB now */
1448 conn->ksnc_rx_nob_left = body_len; /* stash packet size */
1449 conn->ksnc_rx_nob_wanted = body_len; /* (no slop) */
/* Reset conn's rx machinery for the next incoming packet.
 * nob_to_skip == 0: set up to read the next portals header into ksnc_hdr.
 * nob_to_skip  > 0: discard that many body bytes by reading them into a
 * shared static slop buffer (SOCKNAL_RX_SLOP state); if the iov can't
 * cover it all, the caller gets called again for the remainder.
 * NOTE(review): return statements are among the lines missing from this
 * excerpt — confirm the 0/1 return convention against the full source. */
1453 ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
/* Shared discard sink: contents are never examined, so concurrent
 * writers are harmless. */
1455 static char ksocknal_slop_buffer[4096];
1461 if (nob_to_skip == 0) { /* right at next packet boundary now */
1462 conn->ksnc_rx_started = 0;
/* Memory barrier: rx_started is also examined by the timeout thread,
 * which must not see stale ordering of these stores. */
1463 mb (); /* racing with timeout thread */
1465 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
1466 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
1467 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
/* Single iovec pointing straight at the conn's header buffer. */
1469 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1470 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
1471 conn->ksnc_rx_iov[0].iov_len = sizeof (ptl_hdr_t);
1472 conn->ksnc_rx_niov = 1;
1474 conn->ksnc_rx_kiov = NULL;
1475 conn->ksnc_rx_nkiov = 0;
1479 /* Set up to skip as much a possible now. If there's more left
1480 * (ran out of iov entries) we'll get called again */
1482 conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
1483 conn->ksnc_rx_nob_left = nob_to_skip;
1484 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
/* Fill iov entries with the slop buffer until either the skip amount
 * is covered or the conn's iov space is exhausted. */
1489 nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));
1491 conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
1492 conn->ksnc_rx_iov[niov].iov_len = nob;
1497 } while (nob_to_skip != 0 && /* mustn't overflow conn's rx iov */
1498 niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));
1500 conn->ksnc_rx_niov = niov;
1501 conn->ksnc_rx_kiov = NULL;
1502 conn->ksnc_rx_nkiov = 0;
/* 'skipped' (accumulated above, line missing here) is what this pass
 * will actually consume. */
1503 conn->ksnc_rx_nob_wanted = skipped;
/* Per-scheduler rx state machine: drives one conn through (a) acquiring a
 * forwarding buffer if it is waiting for one, (b) reading bytes off the
 * socket, and (c) acting on the completed header/body/slop according to
 * ksnc_rx_state.  Returns <0 on fatal conn error (conn closed), 0 when it
 * should be re-driven later.
 * NOTE(review): the 'get_fmb:' and 'try_read:' labels referenced by the
 * gotos below are among the lines missing from this excerpt. */
1508 ksocknal_process_receive (ksock_conn_t *conn)
/* Caller (scheduler) must hold a ref on conn. */
1513 LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1515 /* doesn't need a forwarding buffer */
1516 if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
1520 fmb = ksocknal_get_idle_fmb (conn);
/* NULL fmb => conn was queued to wait for a buffer; come back later. */
1522 /* conn descheduled waiting for idle fmb */
1526 if (ksocknal_init_fmb (conn, fmb)) {
1527 /* packet forwarded */
1532 /* NB: sched lock NOT held */
1533 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
1534 conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
1535 conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
1536 conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
1538 LASSERT (conn->ksnc_rx_nob_wanted > 0);
/* Pull bytes off the socket into the conn's current iov/kiov. */
1540 rc = ksocknal_receive(conn);
1543 LASSERT (rc != -EAGAIN);
/* rc == 0 here means the peer closed the connection (EOF). */
1546 CWARN ("[%p] EOF from "LPX64" ip %d.%d.%d.%d:%d\n",
1547 conn, conn->ksnc_peer->ksnp_nid,
1548 HIPQUAD(conn->ksnc_ipaddr),
1550 else if (!conn->ksnc_closing)
1551 CERROR ("[%p] Error %d on read from "LPX64
1552 " ip %d.%d.%d.%d:%d\n",
1553 conn, rc, conn->ksnc_peer->ksnp_nid,
1554 HIPQUAD(conn->ksnc_ipaddr),
/* Any rx error (or EOF) kills this conn and its sibling conns. */
1557 ksocknal_close_conn_and_siblings (conn, rc);
1558 return (rc == 0 ? -ESHUTDOWN : rc);
/* Short read: more bytes still wanted; wait for more data. */
1561 if (conn->ksnc_rx_nob_wanted != 0) {
/* Everything wanted has arrived — act on the completed unit. */
1566 switch (conn->ksnc_rx_state) {
1567 case SOCKNAL_RX_HEADER:
/* Packets not addressed to me (and not HELLOs) go to the
 * forwarding parser, which re-sets ksnc_rx_state. */
1568 if (conn->ksnc_hdr.type != cpu_to_le32(PTL_MSG_HELLO) &&
1569 le64_to_cpu(conn->ksnc_hdr.dest_nid) !=
1570 ksocknal_lib.libnal_ni.ni_pid.nid) {
1571 /* This packet isn't for me */
1572 ksocknal_fwd_parse (conn);
1573 switch (conn->ksnc_rx_state) {
1574 case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
1575 return (0); /* => come back later */
1576 case SOCKNAL_RX_SLOP: /* skipping packet's body */
1577 goto try_read; /* => go read it */
1578 case SOCKNAL_RX_GET_FMB: /* forwarding */
1579 goto get_fmb; /* => go get a fwd msg buffer */
1586 /* sets wanted_len, iovs etc */
1587 rc = lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
1590 /* I just received garbage: give up on this conn */
1591 ksocknal_close_conn_and_siblings (conn, rc);
1595 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
1596 conn->ksnc_rx_state = SOCKNAL_RX_BODY;
1597 goto try_read; /* go read the payload */
1599 /* Fall through (completed packet for me) */
1601 case SOCKNAL_RX_BODY:
1602 /* payload all received */
1603 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_OK);
1606 case SOCKNAL_RX_SLOP:
1607 /* starting new packet? */
1608 if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
1609 return (0); /* come back later */
1610 goto try_read; /* try to finish reading slop now */
1612 case SOCKNAL_RX_BODY_FWD:
1613 /* payload all received */
1614 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n",
1615 conn, le64_to_cpu(conn->ksnc_hdr.src_nid),
1616 le64_to_cpu(conn->ksnc_hdr.dest_nid),
1617 conn->ksnc_rx_nob_left);
1619 /* forward the packet. NB ksocknal_init_fmb() put fmb into
1620 * conn->ksnc_cookie */
1621 fmb = (ksock_fmb_t *)conn->ksnc_cookie;
1622 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1624 /* no slop in forwarded packets */
1625 LASSERT (conn->ksnc_rx_nob_left == 0);
1627 ksocknal_new_packet (conn, 0); /* on to next packet */
1628 return (0); /* (later) */
1636 return (-EINVAL); /* keep gcc happy */
/* lib_parse() receive callback for iovec (virtually-addressed) payloads:
 * points conn's rx descriptors at the caller's iov so the socket reader
 * delivers 'mlen' payload bytes straight into the message buffers.
 * 'private' is the ksock_conn_t the header arrived on; 'rlen' is the
 * total wire payload ('rlen - mlen' trailing bytes become slop). */
1640 ksocknal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
1641 unsigned int niov, struct iovec *iov,
1642 size_t offset, size_t mlen, size_t rlen)
1644 ksock_conn_t *conn = (ksock_conn_t *)private;
1646 LASSERT (mlen <= rlen);
1647 LASSERT (niov <= PTL_MD_MAX_IOV);
/* Stash msg so lib_finalize() can be called when the body completes. */
1649 conn->ksnc_cookie = msg;
1650 conn->ksnc_rx_nob_wanted = mlen;
1651 conn->ksnc_rx_nob_left = rlen;
/* iovec path: no page (kiov) fragments in use. */
1653 conn->ksnc_rx_nkiov = 0;
1654 conn->ksnc_rx_kiov = NULL;
1655 conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
/* Copy/clip the caller's iov into conn-local space, skipping 'offset'
 * bytes and limiting to 'mlen'. */
1656 conn->ksnc_rx_niov =
1657 lib_extract_iov(PTL_MD_MAX_IOV, conn->ksnc_rx_iov,
1658 niov, iov, offset, mlen);
/* Sanity: descriptors must account for exactly the wanted bytes
 * (LASSERT line itself is missing from this excerpt). */
1661 lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1662 lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
/* Page-based (kiov) twin of ksocknal_recv(): receive 'mlen' payload bytes
 * directly into the caller's page fragments instead of iovecs. */
1668 ksocknal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
1669 unsigned int niov, ptl_kiov_t *kiov,
1670 size_t offset, size_t mlen, size_t rlen)
1672 ksock_conn_t *conn = (ksock_conn_t *)private;
1674 LASSERT (mlen <= rlen);
1675 LASSERT (niov <= PTL_MD_MAX_IOV);
/* Stash msg for lib_finalize() on body completion. */
1677 conn->ksnc_cookie = msg;
1678 conn->ksnc_rx_nob_wanted = mlen;
1679 conn->ksnc_rx_nob_left = rlen;
/* kiov path: no iovec fragments in use. */
1681 conn->ksnc_rx_niov = 0;
1682 conn->ksnc_rx_iov = NULL;
1683 conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1684 conn->ksnc_rx_nkiov =
1685 lib_extract_kiov(PTL_MD_MAX_IOV, conn->ksnc_rx_kiov,
1686 niov, kiov, offset, mlen);
/* Sanity: fragment totals must equal the wanted byte count
 * (LASSERT line itself is missing from this excerpt). */
1689 lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1690 lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
/* Return nonzero iff this scheduler thread has nothing to do and may
 * sleep: not shutting down and all three work queues (zero-copy tx
 * completions, rx conns, tx conns) are empty.  Takes kss_lock so the
 * answer is consistent with concurrent enqueuers. */
1696 ksocknal_sched_cansleep(ksock_sched_t *sched)
1698 unsigned long flags;
1701 spin_lock_irqsave(&sched->kss_lock, flags);
1703 rc = (!ksocknal_data.ksnd_shuttingdown &&
1705 list_empty(&sched->kss_zctxdone_list) &&
1707 list_empty(&sched->kss_rx_conns) &&
1708 list_empty(&sched->kss_tx_conns));
1710 spin_unlock_irqrestore(&sched->kss_lock, flags);
/* Main loop of a scheduler kernel thread.  Holds kss_lock while choosing
 * work, drops it around each (potentially blocking) rx/tx/completion
 * operation, and sleeps via ksocknal_sched_cansleep() when idle.  Also
 * yields every SOCKNAL_RESCHED iterations to avoid hogging the CPU.
 * 'arg' is this thread's slot in ksocknal_data.ksnd_schedulers. */
1714 int ksocknal_scheduler (void *arg)
1716 ksock_sched_t *sched = (ksock_sched_t *)arg;
1719 unsigned long flags;
1722 int id = sched - ksocknal_data.ksnd_schedulers;
1725 snprintf (name, sizeof (name),"ksocknald_%02d", id);
1726 kportal_daemonize (name);
1727 kportal_blockallsigs ();
/* Optionally pin each scheduler thread to a CPU (compile-time option). */
1729 #if (CONFIG_SMP && CPU_AFFINITY)
1730 id = ksocknal_sched2cpu(id);
1731 if (cpu_online(id)) {
1734 set_cpus_allowed(current, m);
1736 CERROR ("Can't set CPU affinity for %s to %d\n", name, id);
1738 #endif /* CONFIG_SMP && CPU_AFFINITY */
1740 spin_lock_irqsave (&sched->kss_lock, flags);
1742 while (!ksocknal_data.ksnd_shuttingdown) {
1743 int did_something = 0;
1745 /* Ensure I progress everything semi-fairly */
/* --- rx work: service one conn from the rx queue --- */
1747 if (!list_empty (&sched->kss_rx_conns)) {
1748 conn = list_entry(sched->kss_rx_conns.next,
1749 ksock_conn_t, ksnc_rx_list);
1750 list_del(&conn->ksnc_rx_list);
1752 LASSERT(conn->ksnc_rx_scheduled);
1753 LASSERT(conn->ksnc_rx_ready);
1755 /* clear rx_ready in case receive isn't complete.
1756 * Do it BEFORE we call process_recv, since
1757 * data_ready can set it any time after we release
1759 conn->ksnc_rx_ready = 0;
1760 spin_unlock_irqrestore(&sched->kss_lock, flags);
1762 rc = ksocknal_process_receive(conn);
1764 spin_lock_irqsave(&sched->kss_lock, flags);
1766 /* I'm the only one that can clear this flag */
1767 LASSERT(conn->ksnc_rx_scheduled);
1769 /* Did process_receive get everything it wanted? */
/* (condition line missing here) mark ready again on incomplete rx. */
1771 conn->ksnc_rx_ready = 1;
1773 if (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP ||
1774 conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB) {
1775 /* Conn blocked for a forwarding buffer.
1776 * It will get queued for my attention when
1777 * one becomes available (and it might just
1778 * already have been!). Meanwhile my ref
1779 * on it stays put. */
1780 } else if (conn->ksnc_rx_ready) {
1781 /* reschedule for rx */
1782 list_add_tail (&conn->ksnc_rx_list,
1783 &sched->kss_rx_conns);
/* else: rx done for now — unschedule and drop the scheduler's ref. */
1785 conn->ksnc_rx_scheduled = 0;
1787 ksocknal_put_conn(conn);
/* --- tx work: send one tx from one conn on the tx queue --- */
1793 if (!list_empty (&sched->kss_tx_conns)) {
1794 conn = list_entry(sched->kss_tx_conns.next,
1795 ksock_conn_t, ksnc_tx_list);
1796 list_del (&conn->ksnc_tx_list);
1798 LASSERT(conn->ksnc_tx_scheduled);
1799 LASSERT(conn->ksnc_tx_ready);
1800 LASSERT(!list_empty(&conn->ksnc_tx_queue));
1802 tx = list_entry(conn->ksnc_tx_queue.next,
1803 ksock_tx_t, tx_list);
1804 /* dequeue now so empty list => more to send */
1805 list_del(&tx->tx_list);
1807 /* Clear tx_ready in case send isn't complete. Do
1808 * it BEFORE we call process_transmit, since
1809 * write_space can set it any time after we release
1811 conn->ksnc_tx_ready = 0;
1812 spin_unlock_irqrestore (&sched->kss_lock, flags);
1814 rc = ksocknal_process_transmit(conn, tx);
1816 spin_lock_irqsave (&sched->kss_lock, flags);
1818 if (rc == -ENOMEM || rc == -EAGAIN) {
1819 /* Incomplete send: replace tx on HEAD of tx_queue */
1820 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
1822 /* Complete send; assume space for more */
1823 conn->ksnc_tx_ready = 1;
1826 if (rc == -ENOMEM) {
1827 /* Do nothing; after a short timeout, this
1828 * conn will be reposted on kss_tx_conns. */
1829 } else if (conn->ksnc_tx_ready &&
1830 !list_empty (&conn->ksnc_tx_queue)) {
1831 /* reschedule for tx */
1832 list_add_tail (&conn->ksnc_tx_list,
1833 &sched->kss_tx_conns);
/* else: nothing more to send — unschedule and drop scheduler's ref. */
1835 conn->ksnc_tx_scheduled = 0;
1837 ksocknal_put_conn (conn);
/* --- zero-copy tx completion work --- */
1843 if (!list_empty (&sched->kss_zctxdone_list)) {
1845 list_entry(sched->kss_zctxdone_list.next,
1846 ksock_tx_t, tx_list);
1849 list_del (&tx->tx_list);
1850 spin_unlock_irqrestore (&sched->kss_lock, flags);
1852 ksocknal_tx_done (tx, 1);
1854 spin_lock_irqsave (&sched->kss_lock, flags);
/* --- idle / fairness handling --- */
1857 if (!did_something || /* nothing to do */
1858 ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
1859 spin_unlock_irqrestore (&sched->kss_lock, flags);
1863 if (!did_something) { /* wait for something to do */
1864 rc = wait_event_interruptible (sched->kss_waitq,
1865 !ksocknal_sched_cansleep(sched));
1870 spin_lock_irqsave (&sched->kss_lock, flags);
1874 spin_unlock_irqrestore (&sched->kss_lock, flags);
1875 ksocknal_thread_fini ();
/* Socket data_ready callback (softirq context): mark the owning conn
 * rx-ready and, if it isn't already scheduled, queue it on its scheduler's
 * rx list (taking an extra conn ref for the scheduler) and wake the
 * scheduler thread. */
1880 ksocknal_data_ready (struct sock *sk, int n)
1882 unsigned long flags;
1884 ksock_sched_t *sched;
1887 /* interleave correctly with closing sockets... */
1888 read_lock (&ksocknal_data.ksnd_global_lock);
1890 conn = sk->sk_user_data;
1891 if (conn == NULL) { /* raced with ksocknal_terminate_conn */
/* terminate_conn restored the original callback; chain to it. */
1892 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
1893 sk->sk_data_ready (sk, n);
1895 sched = conn->ksnc_scheduler;
1897 spin_lock_irqsave (&sched->kss_lock, flags);
1899 conn->ksnc_rx_ready = 1;
1901 if (!conn->ksnc_rx_scheduled) { /* not being progressed */
1902 list_add_tail(&conn->ksnc_rx_list,
1903 &sched->kss_rx_conns);
1904 conn->ksnc_rx_scheduled = 1;
1905 /* extra ref for scheduler */
1906 atomic_inc (&conn->ksnc_refcount);
1908 wake_up (&sched->kss_waitq);
1911 spin_unlock_irqrestore (&sched->kss_lock, flags);
1914 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Socket write_space callback (softirq context): when enough send-buffer
 * space has opened up (>= low-water mark), mark the conn tx-ready and, if
 * it has queued packets and isn't scheduled, queue it on its scheduler's
 * tx list (extra conn ref for the scheduler) and wake the scheduler. */
1920 ksocknal_write_space (struct sock *sk)
1922 unsigned long flags;
1924 ksock_sched_t *sched;
1926 /* interleave correctly with closing sockets... */
1927 read_lock (&ksocknal_data.ksnd_global_lock);
1929 conn = sk->sk_user_data;
1931 CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
1932 sk, tcp_wspace(sk), SOCKNAL_TX_LOW_WATER(sk), conn,
1933 (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
1934 " ready" : " blocked"),
1935 (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
1936 " scheduled" : " idle"),
1937 (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ?
1938 " empty" : " queued"));
1940 if (conn == NULL) { /* raced with ksocknal_terminate_conn */
/* terminate_conn restored the original callback; chain to it. */
1941 LASSERT (sk->sk_write_space != &ksocknal_write_space);
1942 sk->sk_write_space (sk);
1944 read_unlock (&ksocknal_data.ksnd_global_lock);
1948 if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
1949 sched = conn->ksnc_scheduler;
1951 spin_lock_irqsave (&sched->kss_lock, flags);
/* Space is available again; clear the flow-control bit. */
1953 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
1954 conn->ksnc_tx_ready = 1;
1956 if (!conn->ksnc_tx_scheduled && // not being progressed
1957 !list_empty(&conn->ksnc_tx_queue)){//packets to send
1958 list_add_tail (&conn->ksnc_tx_list,
1959 &sched->kss_tx_conns);
1960 conn->ksnc_tx_scheduled = 1;
1961 /* extra ref for scheduler */
1962 atomic_inc (&conn->ksnc_refcount);
1964 wake_up (&sched->kss_waitq);
1967 spin_unlock_irqrestore (&sched->kss_lock, flags);
1970 read_unlock (&ksocknal_data.ksnd_global_lock);
/* Blocking helper: write exactly 'nob' bytes from 'buffer' to 'sock',
 * looping over short sends.  Uses get_fs()/set_fs() (the set_fs calls are
 * among the missing lines) so sock_sendmsg accepts a kernel buffer.
 * Returns 0 on success or a negative errno. */
2052 ksocknal_sock_write (struct socket *sock, void *buffer, int nob)
2055 mm_segment_t oldmm = get_fs();
2058 struct iovec iov = {
2062 struct msghdr msg = {
2067 .msg_control = NULL,
2068 .msg_controllen = 0,
2073 rc = sock_sendmsg (sock, &msg, iov.iov_len);
/* A zero return would loop forever; treat it as a dead connection. */
2080 CERROR ("Unexpected zero rc\n");
2081 return (-ECONNABORTED);
/* Advance past the bytes actually sent and retry the remainder. */
2084 buffer = ((char *)buffer) + rc;
/* Blocking helper: read exactly 'nob' bytes from 'sock' into 'buffer',
 * looping over short reads; mirror of ksocknal_sock_write().  A zero-byte
 * read (peer EOF) aborts with -ECONNABORTED.  Returns 0 on success or a
 * negative errno. */
2014 ksocknal_sock_read (struct socket *sock, void *buffer, int nob)
2017 mm_segment_t oldmm = get_fs();
2020 struct iovec iov = {
2024 struct msghdr msg = {
2029 .msg_control = NULL,
2030 .msg_controllen = 0,
2035 rc = sock_recvmsg (sock, &msg, iov.iov_len, 0);
/* EOF (rc == 0) or error path — give up on the connection. */
2042 return (-ECONNABORTED);
/* Advance past the bytes actually received and retry the remainder. */
2044 buffer = ((char *)buffer) + rc;
/* Send the HELLO handshake on a new connection: a ptl_hdr_t whose dest_nid
 * field is overlaid with protocol magic/version, followed by this node's
 * interface IP addresses as the payload.
 * CAVEAT EMPTOR: byte-flips the caller's 'ipaddrs' array in place. */
2052 ksocknal_send_hello (ksock_conn_t *conn, __u32 *ipaddrs, int nipaddrs)
2054 /* CAVEAT EMPTOR: this byte flips 'ipaddrs' */
2055 struct socket *sock = conn->ksnc_sock;
/* Overlay magic/version on the header's dest_nid slot (protocol v1 trick). */
2057 ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
2061 LASSERT (conn->ksnc_type != SOCKNAL_CONN_NONE);
2062 LASSERT (nipaddrs <= SOCKNAL_MAX_INTERFACES);
2064 /* No need for getconnsock/putconnsock */
2065 LASSERT (!conn->ksnc_closing);
2067 LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
2068 hmv->magic = cpu_to_le32 (PORTALS_PROTO_MAGIC);
2069 hmv->version_major = cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR);
2070 hmv->version_minor = cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR);
/* All header fields travel little-endian on the wire. */
2072 hdr.src_nid = cpu_to_le64 (ksocknal_lib.libnal_ni.ni_pid.nid);
2073 hdr.type = cpu_to_le32 (PTL_MSG_HELLO);
2074 hdr.payload_length = cpu_to_le32 (nipaddrs * sizeof(*ipaddrs));
2076 hdr.msg.hello.type = cpu_to_le32 (conn->ksnc_type);
2077 hdr.msg.hello.incarnation =
2078 cpu_to_le64 (ksocknal_data.ksnd_incarnation);
2080 /* Receiver is eager */
2081 rc = ksocknal_sock_write (sock, &hdr, sizeof(hdr));
2083 CERROR ("Error %d sending HELLO hdr to %u.%u.%u.%u/%d\n",
2084 rc, HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
/* Flip the payload to wire order (mutates caller's array — see caveat). */
2091 for (i = 0; i < nipaddrs; i++) {
2092 ipaddrs[i] = __cpu_to_le32 (ipaddrs[i]);
2095 rc = ksocknal_sock_write (sock, ipaddrs, nipaddrs * sizeof(*ipaddrs));
2097 CERROR ("Error %d sending HELLO payload (%d)"
2098 " to %u.%u.%u.%u/%d\n", rc, nipaddrs,
2099 HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
/* Map a peer's advertised connection type to the type this end should use:
 * bulk-in pairs with bulk-out (and vice versa); ANY/CONTROL are symmetric
 * (their return is among the missing lines).  Unknown types map to
 * SOCKNAL_CONN_NONE, which callers treat as a protocol error. */
2104 ksocknal_invert_type(int type)
2108 case SOCKNAL_CONN_ANY:
2109 case SOCKNAL_CONN_CONTROL:
2111 case SOCKNAL_CONN_BULK_IN:
2112 return SOCKNAL_CONN_BULK_OUT;
2113 case SOCKNAL_CONN_BULK_OUT:
2114 return SOCKNAL_CONN_BULK_IN;
2116 return (SOCKNAL_CONN_NONE);
/* Receive and validate the peer's HELLO: check magic and protocol version,
 * verify/learn the peer NID (in/out via *nid), negotiate the connection
 * type, and read the peer's incarnation and interface IP list.
 * On entry *nid may be PTL_NID_ANY (passive accept: learn the NID).
 * 'ipaddrs' must hold SOCKNAL_MAX_INTERFACES entries; returned addresses
 * are converted to host byte order. */
2121 ksocknal_recv_hello (ksock_conn_t *conn, ptl_nid_t *nid,
2122 __u64 *incarnation, __u32 *ipaddrs)
2124 struct socket *sock = conn->ksnc_sock;
2130 ptl_magicversion_t *hmv;
/* Protocol v1 overlays magic/version on the header's dest_nid slot. */
2132 hmv = (ptl_magicversion_t *)&hdr.dest_nid;
2133 LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
2135 rc = ksocknal_sock_read (sock, hmv, sizeof (*hmv));
2137 CERROR ("Error %d reading HELLO from %u.%u.%u.%u\n",
2138 rc, HIPQUAD(conn->ksnc_ipaddr));
2142 if (hmv->magic != le32_to_cpu (PORTALS_PROTO_MAGIC)) {
2143 CERROR ("Bad magic %#08x (%#08x expected) from %u.%u.%u.%u\n",
2144 __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC,
2145 HIPQUAD(conn->ksnc_ipaddr));
2149 if (hmv->version_major != cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
2150 hmv->version_minor != cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
2151 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
2152 " from %u.%u.%u.%u\n",
2153 le16_to_cpu (hmv->version_major),
2154 le16_to_cpu (hmv->version_minor),
2155 PORTALS_PROTO_VERSION_MAJOR,
2156 PORTALS_PROTO_VERSION_MINOR,
2157 HIPQUAD(conn->ksnc_ipaddr));
/* Compile-time guard: the overlay trick above is a v1-only format. */
2161 #if (PORTALS_PROTO_VERSION_MAJOR != 1)
2162 # error "This code only understands protocol version 1.x"
2164 /* version 1 sends magic/version as the dest_nid of a 'hello'
2165 * header, followed by payload full of interface IP addresses.
2166 * Read the rest of it in now... */
2168 rc = ksocknal_sock_read (sock, hmv + 1, sizeof (hdr) - sizeof (*hmv));
2170 CERROR ("Error %d reading rest of HELLO hdr from %u.%u.%u.%u\n",
2171 rc, HIPQUAD(conn->ksnc_ipaddr));
2175 /* ...and check we got what we expected */
2176 if (hdr.type != cpu_to_le32 (PTL_MSG_HELLO)) {
2177 CERROR ("Expecting a HELLO hdr,"
2178 " but got type %d from %u.%u.%u.%u\n",
2179 le32_to_cpu (hdr.type),
2180 HIPQUAD(conn->ksnc_ipaddr));
2184 if (le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
2185 CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY"
2186 "from %u.%u.%u.%u\n", HIPQUAD(conn->ksnc_ipaddr));
/* Learn the peer's NID on passive accept; otherwise it must match. */
2190 if (*nid == PTL_NID_ANY) { /* don't know peer's nid yet */
2191 *nid = le64_to_cpu(hdr.src_nid);
2192 } else if (*nid != le64_to_cpu (hdr.src_nid)) {
2193 CERROR ("Connected to nid "LPX64"@%u.%u.%u.%u "
2194 "but expecting "LPX64"\n",
2195 le64_to_cpu (hdr.src_nid),
2196 HIPQUAD(conn->ksnc_ipaddr), *nid);
2200 type = __le32_to_cpu(hdr.msg.hello.type);
2202 if (conn->ksnc_type == SOCKNAL_CONN_NONE) {
2203 /* I've accepted this connection; peer determines type */
2204 conn->ksnc_type = ksocknal_invert_type(type);
2205 if (conn->ksnc_type == SOCKNAL_CONN_NONE) {
2206 CERROR ("Unexpected type %d from "LPX64"@%u.%u.%u.%u\n",
2207 type, *nid, HIPQUAD(conn->ksnc_ipaddr));
2210 } else if (ksocknal_invert_type(type) != conn->ksnc_type) {
2211 CERROR ("Mismatched types: me %d, "LPX64"@%u.%u.%u.%u %d\n",
2212 conn->ksnc_type, *nid, HIPQUAD(conn->ksnc_ipaddr),
2213 le32_to_cpu(hdr.msg.hello.type));
2217 *incarnation = le64_to_cpu(hdr.msg.hello.incarnation);
/* Payload is an array of __u32 IPs; size must divide evenly and fit. */
2219 nips = __le32_to_cpu (hdr.payload_length) / sizeof (__u32);
2221 if (nips > SOCKNAL_MAX_INTERFACES ||
2222 nips * sizeof(__u32) != __le32_to_cpu (hdr.payload_length)) {
2223 CERROR("Bad payload length %d from "LPX64"@%u.%u.%u.%u\n",
2224 __le32_to_cpu (hdr.payload_length),
2225 *nid, HIPQUAD(conn->ksnc_ipaddr));
2231 rc = ksocknal_sock_read (sock, ipaddrs, nips * sizeof(*ipaddrs));
2233 CERROR ("Error %d reading IPs from "LPX64"@%u.%u.%u.%u\n",
2234 rc, *nid, HIPQUAD(conn->ksnc_ipaddr));
/* Convert each IP to host order; zero addresses are protocol errors. */
2238 for (i = 0; i < nips; i++) {
2239 ipaddrs[i] = __le32_to_cpu(ipaddrs[i]);
2241 if (ipaddrs[i] == 0) {
2242 CERROR("Zero IP[%d] from "LPX64"@%u.%u.%u.%u\n",
2243 i, *nid, HIPQUAD(conn->ksnc_ipaddr));
/* Query a conn's socket for its send/receive buffer sizes (SO_SNDBUF /
 * SO_RCVBUF) and Nagle setting (TCP_NODELAY).  Pins the socket with
 * getconnsock first; if the conn is already closing, zero the outputs and
 * return -ESHUTDOWN.  NOTE(review): set_fs() calls and intermediate error
 * checks are among the lines missing from this excerpt. */
2252 ksocknal_get_conn_tunables (ksock_conn_t *conn, int *txmem, int *rxmem, int *nagle)
2254 mm_segment_t oldmm = get_fs ();
2255 struct socket *sock = conn->ksnc_sock;
2259 rc = ksocknal_getconnsock (conn);
2261 LASSERT (conn->ksnc_closing);
2262 *txmem = *rxmem = *nagle = 0;
2263 return (-ESHUTDOWN);
2268 len = sizeof(*txmem);
2269 rc = sock_getsockopt(sock, SOL_SOCKET, SO_SNDBUF,
2270 (char *)txmem, &len);
2272 len = sizeof(*rxmem);
2273 rc = sock_getsockopt(sock, SOL_SOCKET, SO_RCVBUF,
2274 (char *)rxmem, &len);
2277 len = sizeof(*nagle);
/* TCP_NODELAY must go through the protocol's own getsockopt. */
2278 rc = sock->ops->getsockopt(sock, SOL_TCP, TCP_NODELAY,
2279 (char *)nagle, &len);
2283 ksocknal_putconnsock (conn);
/* Error path: zero the outputs so callers never see stale values. */
2288 *txmem = *rxmem = *nagle = 0;
/* Configure a freshly connected/accepted socket for socknal use: abortive
 * close (SO_LINGER off / TCP_LINGER2), optional Nagle disable, tunable
 * send/receive buffer sizes, and TCP keepalive parameters from the module
 * tunables.  NOTE(review): the set_fs() bracketing and per-call return
 * checks are among the lines missing from this excerpt. */
2294 ksocknal_setup_sock (struct socket *sock)
2296 mm_segment_t oldmm = get_fs ();
2303 struct linger linger;
/* Socket allocations must not recurse into the filesystem. */
2305 sock->sk->sk_allocation = GFP_NOFS;
2307 /* Ensure this socket aborts active sends immediately when we close
2311 linger.l_linger = 0;
2314 rc = sock_setsockopt (sock, SOL_SOCKET, SO_LINGER,
2315 (char *)&linger, sizeof (linger));
2318 CERROR ("Can't set SO_LINGER: %d\n", rc);
2324 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_LINGER2,
2325 (char *)&option, sizeof (option));
2328 CERROR ("Can't set SO_LINGER2: %d\n", rc);
/* Nagle off (TCP_NODELAY on) unless the tunable asks for it. */
2332 if (!ksocknal_tunables.ksnd_nagle) {
2336 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_NODELAY,
2337 (char *)&option, sizeof (option));
2340 CERROR ("Can't disable nagle: %d\n", rc);
/* Optional explicit socket buffer sizing from tunables. */
2345 if (ksocknal_tunables.ksnd_buffer_size > 0) {
2346 option = ksocknal_tunables.ksnd_buffer_size;
2349 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
2350 (char *)&option, sizeof (option));
2353 CERROR ("Can't set send buffer %d: %d\n",
2359 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
2360 (char *)&option, sizeof (option));
2363 CERROR ("Can't set receive buffer %d: %d\n",
2369 /* snapshot tunables */
2370 keep_idle = ksocknal_tunables.ksnd_keepalive_idle;
2371 keep_count = ksocknal_tunables.ksnd_keepalive_count;
2372 keep_intvl = ksocknal_tunables.ksnd_keepalive_intvl;
/* Keepalive is enabled only when all three parameters are positive. */
2374 do_keepalive = (keep_idle > 0 && keep_count > 0 && keep_intvl > 0);
2376 option = (do_keepalive ? 1 : 0);
2378 rc = sock_setsockopt (sock, SOL_SOCKET, SO_KEEPALIVE,
2379 (char *)&option, sizeof (option));
2382 CERROR ("Can't set SO_KEEPALIVE: %d\n", rc);
2390 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE,
2391 (char *)&keep_idle, sizeof (keep_idle));
2394 CERROR ("Can't set TCP_KEEPIDLE: %d\n", rc);
2399 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPINTVL,
2400 (char *)&keep_intvl, sizeof (keep_intvl));
2403 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2408 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT,
2409 (char *)&keep_count, sizeof (keep_count));
2412 CERROR ("Can't set TCP_KEEPCNT: %d\n", rc);
/* Create a socket, bind it to the given reserved local port (privileged
 * <1024 range) on the route's local interface, set I/O timeouts, and
 * connect to the route's peer address.  On failure *may_retry tells the
 * caller whether trying a different local port is worthwhile
 * (-EADDRINUSE / -EADDRNOTAVAIL).  On success *sockp holds the connected
 * socket (its ref held via sock->file). */
2420 ksocknal_connect_sock(struct socket **sockp, int *may_retry,
2421 ksock_route_t *route, int local_port)
2423 struct sockaddr_in locaddr;
2424 struct sockaddr_in srvaddr;
2425 struct socket *sock;
2428 mm_segment_t oldmm = get_fs();
/* Local address: requested reserved port, bound to the route's own
 * interface when one is configured (else INADDR_ANY — line missing). */
2431 memset(&locaddr, 0, sizeof(locaddr));
2432 locaddr.sin_family = AF_INET;
2433 locaddr.sin_port = htons(local_port);
2434 locaddr.sin_addr.s_addr =
2435 (route->ksnr_myipaddr != 0) ? htonl(route->ksnr_myipaddr)
/* Peer address from the route. */
2438 memset (&srvaddr, 0, sizeof (srvaddr));
2439 srvaddr.sin_family = AF_INET;
2440 srvaddr.sin_port = htons (route->ksnr_port);
2441 srvaddr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
2445 rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock);
2448 CERROR ("Can't create autoconnect socket: %d\n", rc);
2452 /* Ugh; have to map_fd for compatibility with sockets passed in
2453 * from userspace. And we actually need the sock->file refcounting
2454 * that this gives you :) */
2456 rc = sock_map_fd (sock);
2458 sock_release (sock);
2459 CERROR ("sock_map_fd error %d\n", rc);
2463 /* NB the file descriptor (rc) now owns the ref on sock->file */
2464 LASSERT (sock->file != NULL);
2465 LASSERT (file_count(sock->file) == 1);
/* Keep sock->file alive across closing the fd we don't want to keep. */
2467 get_file(sock->file); /* extra ref makes sock->file */
2468 sys_close(rc); /* survive this close */
2470 /* Still got a single ref on sock->file */
2471 LASSERT (file_count(sock->file) == 1);
2473 /* Set the socket timeouts, so our connection attempt completes in
2475 tv.tv_sec = ksocknal_tunables.ksnd_io_timeout;
2479 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO,
2480 (char *)&tv, sizeof (tv));
2483 CERROR ("Can't set send timeout %d: %d\n",
2484 ksocknal_tunables.ksnd_io_timeout, rc);
2489 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO,
2490 (char *)&tv, sizeof (tv));
2493 CERROR ("Can't set receive timeout %d: %d\n",
2494 ksocknal_tunables.ksnd_io_timeout, rc);
/* Allow rebinding a recently-used reserved port. */
2500 rc = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
2501 (char *)&option, sizeof (option));
2504 CERROR("Can't set SO_REUSEADDR for socket: %d\n", rc);
2508 rc = sock->ops->bind(sock,
2509 (struct sockaddr *)&locaddr, sizeof(locaddr));
2510 if (rc == -EADDRINUSE) {
/* Expected when walking the reserved port range — caller retries. */
2511 CDEBUG(D_NET, "Port %d already in use\n", local_port);
2516 CERROR("Error trying to bind to reserved port %d: %d\n",
2521 rc = sock->ops->connect(sock,
2522 (struct sockaddr *)&srvaddr, sizeof(srvaddr),
2523 sock->file->f_flags);
2527 /* EADDRNOTAVAIL probably means we're already connected to the same
2528 * peer/port on the same local port on a differently typed
2529 * connection. Let our caller retry with a different local
2531 *may_retry = (rc == -EADDRNOTAVAIL);
2533 CDEBUG(*may_retry ? D_NET : D_ERROR,
2534 "Error %d connecting %u.%u.%u.%u/%d -> %u.%u.%u.%u/%d\n", rc,
2535 HIPQUAD(route->ksnr_myipaddr), local_port,
2536 HIPQUAD(route->ksnr_ipaddr), route->ksnr_port);
/* Establish a connection of the given type on 'route' by walking down
 * the reserved port range (1023..513), retrying ksocknal_connect_sock()
 * on each port until one succeeds or retrying becomes pointless.
 * Returns -EADDRINUSE if every reserved port is exhausted. */
2544 ksocknal_connect_peer (ksock_route_t *route, int type)
2546 struct socket *sock;
2551 /* Iterate through reserved ports. When typed connections are
2552 * used, we will need to bind to multiple ports, but we only know
2553 * this at connect time. But, by that time we've already called
2554 * bind() so we need a new socket. */
2556 for (port = 1023; port > 512; --port) {
2558 rc = ksocknal_connect_sock(&sock, &may_retry, route, port);
/* Connected: hand the socket to conn creation (binds route/conn). */
2561 rc = ksocknal_create_conn(route, sock, type);
2570 CERROR("Out of ports trying to bind to a reserved port\n");
2571 return (-EADDRINUSE);
/* Perform one autoconnect attempt for 'route': pick the first pending
 * connection type, try to connect, and on failure apply exponential
 * retry backoff.  If no other route is still connecting and packets are
 * queued on the peer, those packets are completed (failed) as zombies. */
2575 ksocknal_autoconnect (ksock_route_t *route)
2577 LIST_HEAD (zombies);
2580 unsigned long flags;
/* Find the lowest-numbered connection type still marked as pending. */
2585 for (type = 0; type < SOCKNAL_CONN_NTYPES; type++)
2586 if ((route->ksnr_connecting & (1 << type)) != 0)
2588 LASSERT (type < SOCKNAL_CONN_NTYPES);
2590 rc = ksocknal_connect_peer (route, type);
2594 /* successfully autoconnected: create_conn did the
2595 * route/conn binding and scheduled any blocked packets */
2597 if (route->ksnr_connecting == 0) {
2598 /* No more connections required */
2603 /* Connection attempt failed */
2605 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
2607 peer = route->ksnr_peer;
2608 route->ksnr_connecting = 0;
2610 /* This is a retry rather than a new connection */
2611 LASSERT (route->ksnr_retry_interval != 0);
/* Exponential backoff, capped at the configured maximum. */
2612 route->ksnr_timeout = jiffies + route->ksnr_retry_interval;
2613 route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2,
2614 SOCKNAL_MAX_RECONNECT_INTERVAL);
2616 if (!list_empty (&peer->ksnp_tx_queue) &&
2617 ksocknal_find_connecting_route_locked (peer) == NULL) {
2618 LASSERT (list_empty (&peer->ksnp_conns));
2620 /* None of the connections that the blocked packets are
2621 * waiting for have been successful. Complete them now... */
/* Move the whole tx queue onto the local zombie list; they are
 * finalized (with failure) outside the lock below. */
2623 tx = list_entry (peer->ksnp_tx_queue.next,
2624 ksock_tx_t, tx_list);
2625 list_del (&tx->tx_list);
2626 list_add_tail (&tx->tx_list, &zombies);
2627 } while (!list_empty (&peer->ksnp_tx_queue));
2630 #if 0 /* irrelevent with only eager routes */
2631 if (!route->ksnr_deleted) {
2632 /* make this route least-favourite for re-selection */
2633 list_del(&route->ksnr_list);
2634 list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
2637 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
/* Fail every zombie tx, logging source/destination for diagnosis. */
2639 while (!list_empty (&zombies)) {
2640 char ipbuf[PTL_NALFMT_SIZE];
2641 char ipbuf2[PTL_NALFMT_SIZE];
2642 tx = list_entry (zombies.next, ksock_tx_t, tx_list);
2644 CERROR ("Deleting packet type %d len %d ("LPX64" %s->"LPX64" %s)\n",
2645 le32_to_cpu (tx->tx_hdr->type),
2646 le32_to_cpu (tx->tx_hdr->payload_length),
2647 le64_to_cpu (tx->tx_hdr->src_nid),
2648 portals_nid2str(SOCKNAL,
2649 le64_to_cpu(tx->tx_hdr->src_nid),
2651 le64_to_cpu (tx->tx_hdr->dest_nid),
/* NOTE(review): this formats src_nid where dest_nid seems intended —
 * likely a copy/paste slip in the log message; verify in full source. */
2652 portals_nid2str(SOCKNAL,
2653 le64_to_cpu(tx->tx_hdr->src_nid),
2656 list_del (&tx->tx_list);
/* 0 => finalize with failure status. */
2658 ksocknal_tx_done (tx, 0);
/* Autoconnect daemon thread: repeatedly pop a route from the global
 * autoconnect queue, attempt it (outside the lock), drop the route ref,
 * and sleep when the queue is empty.  'arg' is the daemon's index,
 * used only to name the kernel thread. */
2663 ksocknal_autoconnectd (void *arg)
2665 long id = (long)arg;
2667 unsigned long flags;
2668 ksock_route_t *route;
2671 snprintf (name, sizeof (name), "ksocknal_ad%02ld", id);
2672 kportal_daemonize (name);
2673 kportal_blockallsigs ();
2675 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2677 while (!ksocknal_data.ksnd_shuttingdown) {
2679 if (!list_empty (&ksocknal_data.ksnd_autoconnectd_routes)) {
2680 route = list_entry (ksocknal_data.ksnd_autoconnectd_routes.next,
2681 ksock_route_t, ksnr_connect_list);
2683 list_del (&route->ksnr_connect_list);
/* Drop the lock around the (blocking) connection attempt. */
2684 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2686 ksocknal_autoconnect (route);
2687 ksocknal_put_route (route);
2689 spin_lock_irqsave(&ksocknal_data.ksnd_autoconnectd_lock,
/* Queue empty: sleep until shutdown or new work arrives. */
2694 spin_unlock_irqrestore(&ksocknal_data.ksnd_autoconnectd_lock,
2697 rc = wait_event_interruptible(ksocknal_data.ksnd_autoconnectd_waitq,
2698 ksocknal_data.ksnd_shuttingdown ||
2699 !list_empty(&ksocknal_data.ksnd_autoconnectd_routes));
2701 spin_lock_irqsave(&ksocknal_data.ksnd_autoconnectd_lock, flags);
2704 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2706 ksocknal_thread_fini ();
/* Scan a peer's connections for one that should be killed: socket error
 * set, incomplete rx past its deadline, or queued/buffered tx past its
 * deadline.  Returns the offending conn with an extra ref taken (caller
 * must put it), or NULL (return lines are among the missing lines).
 * Called with ksnd_global_lock held for read. */
2711 ksocknal_find_timed_out_conn (ksock_peer_t *peer)
2713 /* We're called with a shared lock on ksnd_global_lock */
2715 struct list_head *ctmp;
2717 list_for_each (ctmp, &peer->ksnp_conns) {
2718 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
2720 /* Don't need the {get,put}connsock dance to deref ksnc_sock... */
2721 LASSERT (!conn->ksnc_closing);
2723 if (conn->ksnc_sock->sk->sk_err != 0) {
2724 /* Something (e.g. failed keepalive) set the socket error */
2725 atomic_inc (&conn->ksnc_refcount);
2726 CERROR ("Socket error %d: "LPX64" %p %d.%d.%d.%d\n",
2727 conn->ksnc_sock->sk->sk_err, peer->ksnp_nid,
2728 conn, HIPQUAD(conn->ksnc_ipaddr));
2732 if (conn->ksnc_rx_started &&
2733 time_after_eq (jiffies, conn->ksnc_rx_deadline)) {
2734 /* Timed out incomplete incoming message */
2735 atomic_inc (&conn->ksnc_refcount);
2736 CERROR ("Timed out RX from "LPX64" %p %d.%d.%d.%d\n",
2737 peer->ksnp_nid,conn,HIPQUAD(conn->ksnc_ipaddr));
/* tx timeout triggers if packets are queued OR bytes linger
 * unacknowledged in the socket's send buffer. */
2741 if ((!list_empty (&conn->ksnc_tx_queue) ||
2742 conn->ksnc_sock->sk->sk_wmem_queued != 0) &&
2743 time_after_eq (jiffies, conn->ksnc_tx_deadline)) {
2744 /* Timed out messages queued for sending or
2745 * buffered in the socket's send buffer */
2746 atomic_inc (&conn->ksnc_refcount);
2747 CERROR ("Timed out TX to "LPX64" %s%d %p %d.%d.%d.%d\n",
2749 list_empty (&conn->ksnc_tx_queue) ? "" : "Q ",
2750 conn->ksnc_sock->sk->sk_wmem_queued, conn,
2751 HIPQUAD(conn->ksnc_ipaddr));
/* Check every peer in hash bucket 'idx' for a timed-out connection and
 * close any found (plus its siblings).  Uses the shared global lock for
 * the scan; on a hit it must drop the lock to close, after which the
 * list position is unsafe, so the scan restarts (restart lines are among
 * the missing lines in this excerpt). */
2760 ksocknal_check_peer_timeouts (int idx)
2762 struct list_head *peers = &ksocknal_data.ksnd_peers[idx];
2763 struct list_head *ptmp;
2768 /* NB. We expect to have a look at all the peers and not find any
2769 * connections to time out, so we just use a shared lock while we
2771 read_lock (&ksocknal_data.ksnd_global_lock);
2773 list_for_each (ptmp, peers) {
2774 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
/* Returns the culprit with an extra ref held, or NULL. */
2775 conn = ksocknal_find_timed_out_conn (peer);
2778 read_unlock (&ksocknal_data.ksnd_global_lock);
2780 CERROR ("Timeout out conn->"LPX64" ip %d.%d.%d.%d:%d\n",
2782 HIPQUAD(conn->ksnc_ipaddr),
2784 ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT);
2786 /* NB we won't find this one again, but we can't
2787 * just proceed with the next peer, since we dropped
2788 * ksnd_global_lock and it might be dead already! */
/* Drop the ref taken by find_timed_out_conn. */
2789 ksocknal_put_conn (conn);
2794 read_unlock (&ksocknal_data.ksnd_global_lock);
2798 ksocknal_reaper (void *arg)
/* Connection reaper thread.  Loops until ksnd_shuttingdown, doing four
 * jobs per pass: (1) terminate connections queued on the deathrow list,
 * (2) destroy zombie connections, (3) re-queue connections that stalled
 * with ENOMEM back onto their scheduler, and (4) periodically check a
 * proportion of the peer hash table for timed-out connections.  Sleeps
 * interruptibly on ksnd_reaper_waitq between passes.
 * NOTE(review): several locals (conn, wait, timeout, i, peer_index, n,
 * p, nenomem_conns) are declared outside this excerpt — confirm against
 * the full source. */
2801         unsigned long flags;
2803         ksock_sched_t *sched;
2804         struct list_head enomem_conns;
             /* 'deadline' is the next time peer timeouts should be checked. */
2809         unsigned long deadline = jiffies;
2811         kportal_daemonize ("ksocknal_reaper");
2812         kportal_blockallsigs ();
2814         INIT_LIST_HEAD(&enomem_conns);
2815         init_waitqueue_entry (&wait, current);
             /* The reaper lock guards the deathrow/zombie/enomem lists and is
              * dropped around every call that may block or take other locks. */
2817         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2819         while (!ksocknal_data.ksnd_shuttingdown) {
                 /* (1) Terminate one deathrow connection per iteration,
                  * outside the reaper lock. */
2821                 if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) {
2822                         conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next,
2823                                            ksock_conn_t, ksnc_list);
2824                         list_del (&conn->ksnc_list);
2826                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2828                         ksocknal_terminate_conn (conn);
2829                         ksocknal_put_conn (conn);
2831                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
                 /* (2) Destroy one zombie connection per iteration, also
                  * outside the reaper lock. */
2835                 if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) {
2836                         conn = list_entry (ksocknal_data.ksnd_zombie_conns.next,
2837                                            ksock_conn_t, ksnc_list);
2838                         list_del (&conn->ksnc_list);
2840                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2842                         ksocknal_destroy_conn (conn);
2844                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
                 /* (3) Grab the whole ENOMEM list in one splice while still
                  * holding the reaper lock, then work on the private copy. */
2848                 if (!list_empty (&ksocknal_data.ksnd_enomem_conns)) {
2849                         list_add(&enomem_conns, &ksocknal_data.ksnd_enomem_conns);
2850                         list_del_init(&ksocknal_data.ksnd_enomem_conns);
2853                 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2855                 /* reschedule all the connections that stalled with ENOMEM... */
2857                 while (!list_empty (&enomem_conns)) {
2858                         conn = list_entry (enomem_conns.next,
2859                                            ksock_conn_t, ksnc_tx_list);
2860                         list_del (&conn->ksnc_tx_list);
2862                         sched = conn->ksnc_scheduler;
                             /* Re-queue on the conn's own scheduler and wake it;
                              * the conn must already be marked tx_scheduled. */
2864                         spin_lock_irqsave (&sched->kss_lock, flags);
2866                         LASSERT (conn->ksnc_tx_scheduled);
2867                         conn->ksnc_tx_ready = 1;
2868                         list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
2869                         wake_up (&sched->kss_waitq);
2871                         spin_unlock_irqrestore (&sched->kss_lock, flags);
                 /* (4) Peer timeout checks, rate-limited by 'deadline'. */
2875                 /* careful with the jiffy wrap... */
2876                 while ((timeout = (int)(deadline - jiffies)) <= 0) {
2879                         int chunk = ksocknal_data.ksnd_peer_hash_size;
2881                         /* Time to check for timeouts on a few more peers: I do
2882                          * checks every 'p' seconds on a proportion of the peer
2883                          * table and I need to check every connection 'n' times
2884                          * within a timeout interval, to ensure I detect a
2885                          * timeout on any connection within (n+1)/n times the
2886                          * timeout interval. */
2888                         if (ksocknal_tunables.ksnd_io_timeout > n * p)
2889                                 chunk = (chunk * n * p) /
2890                                         ksocknal_tunables.ksnd_io_timeout;
2894                         for (i = 0; i < chunk; i++) {
2895                                 ksocknal_check_peer_timeouts (peer_index);
2896                                 peer_index = (peer_index + 1) %
2897                                              ksocknal_data.ksnd_peer_hash_size;
                 /* nenomem_conns presumably counts the conns re-queued in (3)
                  * above — its update is outside this excerpt; verify. */
2903                 if (nenomem_conns != 0) {
2904                         /* Reduce my timeout if I rescheduled ENOMEM conns.
2905                          * This also prevents me getting woken immediately
2906                          * if any go back on my enomem list. */
2907                         timeout = SOCKNAL_ENOMEM_RETRY;
2909                 ksocknal_data.ksnd_reaper_waketime = jiffies + timeout;
                 /* Sleep until woken or 'timeout', but skip the sleep entirely
                  * if shutdown started or more deathrow/zombie work arrived. */
2911                 set_current_state (TASK_INTERRUPTIBLE);
2912                 add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2914                 if (!ksocknal_data.ksnd_shuttingdown &&
2915                     list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
2916                     list_empty (&ksocknal_data.ksnd_zombie_conns))
2917                         schedule_timeout (timeout);
2919                 set_current_state (TASK_RUNNING);
2920                 remove_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2922                 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2925         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2927         ksocknal_thread_fini ();
2931 lib_nal_t ksocknal_lib = {
2932 libnal_data: &ksocknal_data, /* NAL private data */
2933 libnal_send: ksocknal_send,
2934 libnal_send_pages: ksocknal_send_pages,
2935 libnal_recv: ksocknal_recv,
2936 libnal_recv_pages: ksocknal_recv_pages,
2937 libnal_dist: ksocknal_dist