1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5 * Author: Zach Brown <zab@zabbo.net>
6 * Author: Peter J. Braam <braam@clusterfs.com>
7 * Author: Phil Schwan <phil@clusterfs.com>
8 * Author: Eric Barton <eric@bartonsoftware.com>
10 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
12 * Portals is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Portals is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Portals; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
29 * LIB functions follow
33 ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr,
34 user_ptr src_addr, size_t len)
36 CDEBUG(D_NET, LPX64": reading %ld bytes from %p -> %p\n",
37 nal->ni.nid, (long)len, src_addr, dst_addr);
39 memcpy( dst_addr, src_addr, len );
44 ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
45 void *src_addr, size_t len)
47 CDEBUG(D_NET, LPX64": writing %ld bytes from %p -> %p\n",
48 nal->ni.nid, (long)len, src_addr, dst_addr);
50 memcpy( dst_addr, src_addr, len );
55 ksocknal_callback (nal_cb_t * nal, void *private, lib_eq_t *eq,
58 CDEBUG(D_NET, LPX64": callback eq %p ev %p\n",
61 if (eq->event_callback != NULL)
62 eq->event_callback(ev);
68 ksocknal_malloc(nal_cb_t *nal, size_t len)
72 PORTAL_ALLOC(buf, len);
81 ksocknal_free(nal_cb_t *nal, void *buf, size_t len)
83 PORTAL_FREE(buf, len);
87 ksocknal_printf(nal_cb_t *nal, const char *fmt, ...)
93 vsnprintf (msg, sizeof (msg), fmt, ap); /* sprintf safely */
96 msg[sizeof (msg) - 1] = 0; /* ensure terminated */
98 CDEBUG (D_NET, "%s", msg);
102 ksocknal_cli(nal_cb_t *nal, unsigned long *flags)
104 ksock_nal_data_t *data = nal->nal_data;
106 spin_lock(&data->ksnd_nal_cb_lock);
110 ksocknal_sti(nal_cb_t *nal, unsigned long *flags)
112 ksock_nal_data_t *data;
113 data = nal->nal_data;
115 spin_unlock(&data->ksnd_nal_cb_lock);
119 ksocknal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
121 /* I would guess that if ksocknal_get_peer (nid) == NULL,
122 and we're not routing, then 'nid' is very distant :) */
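/* A hedged sketch of the intent (the body below is partly elided):
 * the local NID presumably reports distance 0 and everything else
 * distance 1, since the socknal has no notion of topology beyond
 * "me" and "not me". */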
123 if ( nal->ni.nid == nid ) {
133 ksocknal_free_ltx (ksock_ltx_t *ltx)
135 atomic_dec(&ksocknal_data.ksnd_nactive_ltxs);
136 PORTAL_FREE(ltx, ltx->ltx_desc_size);
141 ksocknal_kvaddr_to_page (unsigned long vaddr)
145 if (vaddr >= VMALLOC_START &&
147 page = vmalloc_to_page ((void *)vaddr);
149 else if (vaddr >= PKMAP_BASE &&
150 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
151 page = vmalloc_to_page ((void *)vaddr);
152 /* in 2.4 ^ just walks the page tables */
155 page = virt_to_page (vaddr);
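/* Summary (some branches are elided here): vmalloc'd and kmapped
 * addresses are resolved with vmalloc_to_page(), anything else is
 * assumed to sit in the kernel direct mapping and goes through
 * virt_to_page(); presumably NULL is returned when the address can't
 * be resolved, in which case ksocknal_send_iov() below falls back to
 * a plain sock_sendmsg() instead of zero-copy. */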
166 ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
168 struct socket *sock = conn->ksnc_sock;
169 struct iovec *iov = tx->tx_iov;
170 int fragsize = iov->iov_len;
171 unsigned long vaddr = (unsigned long)iov->iov_base;
172 int more = (tx->tx_niov > 1) ||
173 (tx->tx_nkiov > 0) ||
174 (!list_empty (&conn->ksnc_tx_queue));
176 int offset = vaddr & (PAGE_SIZE - 1);
177 int zcsize = MIN (fragsize, PAGE_SIZE - offset);
182 /* NB we can't trust socket ops to either consume our iovs
183 * or leave them alone, so we only send 1 frag at a time. */
184 LASSERT (fragsize <= tx->tx_resid);
185 LASSERT (tx->tx_niov > 0);
188 if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
189 (sock->sk->route_caps & NETIF_F_SG) &&
190 (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
191 (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
193 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
194 (void *)vaddr, page, page_address(page), offset, zcsize);
196 if (fragsize > zcsize) {
201 rc = tcp_sendpage_zccd(sock, page, offset, zcsize,
202 more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
207 /* NB don't pass tx's iov; sendmsg may or may not update it */
208 struct iovec fragiov = { .iov_base = (void *)vaddr,
209 .iov_len = fragsize};
210 struct msghdr msg = {
217 .msg_flags = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
219 mm_segment_t oldmm = get_fs();
222 rc = sock_sendmsg(sock, &msg, fragsize);
231 if (rc < iov->iov_len) {
232 /* didn't send whole iov entry... */
233 iov->iov_base = (void *)(vaddr + rc);
235 /* ...but did we send everything we tried to send? */
236 return ((rc == fragsize) ? 1 : -EAGAIN);
245 ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
247 struct socket *sock = conn->ksnc_sock;
248 ptl_kiov_t *kiov = tx->tx_kiov;
249 int fragsize = kiov->kiov_len;
250 struct page *page = kiov->kiov_page;
251 int offset = kiov->kiov_offset;
252 int more = (tx->tx_nkiov > 1) ||
253 (!list_empty (&conn->ksnc_tx_queue));
256 /* NB we can't trust socket ops to either consume our iovs
257 * or leave them alone, so we only send 1 frag at a time. */
258 LASSERT (fragsize <= tx->tx_resid);
259 LASSERT (offset + fragsize <= PAGE_SIZE);
260 LASSERT (tx->tx_niov == 0);
261 LASSERT (tx->tx_nkiov > 0);
264 if (fragsize >= ksocknal_data.ksnd_zc_min_frag &&
265 (sock->sk->route_caps & NETIF_F_SG) &&
266 (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
268 CDEBUG(D_NET, "page %p + offset %x for %d\n",
269 page, offset, fragsize);
271 rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
272 more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
277 char *addr = ((char *)kmap (page)) + offset;
278 struct iovec fragiov = {.iov_base = addr,
279 .iov_len = fragsize};
280 struct msghdr msg = {
287 .msg_flags = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
289 mm_segment_t oldmm = get_fs();
292 rc = sock_sendmsg(sock, &msg, fragsize);
304 /* didn't send whole frag */
305 kiov->kiov_offset = offset + rc;
306 kiov->kiov_len = fragsize - rc;
310 /* everything went */
311 LASSERT (rc == fragsize);
318 ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
320 /* Return 0 on success, < 0 on error.
321 * caller checks tx_resid to determine progress/completion */
325 if (ksocknal_data.ksnd_stall_tx != 0) {
326 set_current_state (TASK_UNINTERRUPTIBLE);
327 schedule_timeout (ksocknal_data.ksnd_stall_tx * HZ);
330 rc = ksocknal_getconnsock (conn);
332 LASSERT (conn->ksnc_closing);
337 LASSERT (tx->tx_resid != 0);
339 if (tx->tx_niov != 0)
340 rc = ksocknal_send_iov (conn, tx);
342 rc = ksocknal_send_kiov (conn, tx);
344 if (rc <= 0) { /* error or socket full? */
345 /* NB: rc == 0 and rc == -EAGAIN both mean try
346 * again later (linux stack returns -EAGAIN for
347 * this, but Adaptech TOE returns 0) */
353 /* Consider the connection alive since we managed to chuck
354 * more data into it. Really, we'd like to consider it
355 * alive only when the peer ACKs something, but
356 * write_space() only gets called back while SOCK_NOSPACE
357 * is set. Instead, we presume peer death has occurred if
358 * the socket doesn't drain within a timeout */
359 conn->ksnc_tx_deadline = jiffies +
360 ksocknal_data.ksnd_io_timeout * HZ;
361 conn->ksnc_peer->ksnp_last_alive = jiffies;
363 if (tx->tx_resid == 0) { /* sent everything */
369 ksocknal_putconnsock (conn);
374 ksocknal_eager_ack (ksock_conn_t *conn)
377 mm_segment_t oldmm = get_fs();
378 struct socket *sock = conn->ksnc_sock;
380 /* Remind the socket to ACK eagerly. If I don't, the socket might
381 * think I'm about to send something it could piggy-back the ACK
382 * on, introducing delay in completing zero-copy sends in my
386 sock->ops->setsockopt (sock, SOL_TCP, TCP_QUICKACK,
387 (char *)&opt, sizeof (opt));
392 ksocknal_recv_iov (ksock_conn_t *conn)
394 struct iovec *iov = conn->ksnc_rx_iov;
395 int fragsize = iov->iov_len;
396 unsigned long vaddr = (unsigned long)iov->iov_base;
397 struct iovec fragiov = { .iov_base = (void *)vaddr,
398 .iov_len = fragsize};
399 struct msghdr msg = {
408 mm_segment_t oldmm = get_fs();
411 /* NB we can't trust socket ops to either consume our iovs
412 * or leave them alone, so we only receive 1 frag at a time. */
413 LASSERT (conn->ksnc_rx_niov > 0);
414 LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
417 rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
418 /* NB this is just a boolean............................^ */
424 /* received something... */
425 conn->ksnc_peer->ksnp_last_alive = jiffies;
426 conn->ksnc_rx_deadline = jiffies +
427 ksocknal_data.ksnd_io_timeout * HZ;
428 mb(); /* order with setting rx_started */
429 conn->ksnc_rx_started = 1;
431 conn->ksnc_rx_nob_wanted -= rc;
432 conn->ksnc_rx_nob_left -= rc;
435 iov->iov_base = (void *)(vaddr + rc);
436 iov->iov_len = fragsize - rc;
441 conn->ksnc_rx_niov--;
446 ksocknal_recv_kiov (ksock_conn_t *conn)
448 ptl_kiov_t *kiov = conn->ksnc_rx_kiov;
449 struct page *page = kiov->kiov_page;
450 int offset = kiov->kiov_offset;
451 int fragsize = kiov->kiov_len;
452 unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
453 struct iovec fragiov = { .iov_base = (void *)vaddr,
454 .iov_len = fragsize};
455 struct msghdr msg = {
464 mm_segment_t oldmm = get_fs();
467 /* NB we can't trust socket ops to either consume our iovs
468 * or leave them alone, so we only receive 1 frag at a time. */
469 LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
470 LASSERT (conn->ksnc_rx_nkiov > 0);
471 LASSERT (offset + fragsize <= PAGE_SIZE);
474 rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
475 /* NB this is just a boolean............................^ */
483 /* received something... */
484 conn->ksnc_peer->ksnp_last_alive = jiffies;
485 conn->ksnc_rx_deadline = jiffies +
486 ksocknal_data.ksnd_io_timeout * HZ;
487 mb(); /* order with setting rx_started */
488 conn->ksnc_rx_started = 1;
490 conn->ksnc_rx_nob_wanted -= rc;
491 conn->ksnc_rx_nob_left -= rc;
494 kiov->kiov_offset = offset + rc;
495 kiov->kiov_len = fragsize - rc;
499 conn->ksnc_rx_kiov++;
500 conn->ksnc_rx_nkiov--;
505 ksocknal_receive (ksock_conn_t *conn)
507 /* Return 1 on success, 0 on EOF, < 0 on error.
508 * Caller checks ksnc_rx_nob_wanted to determine
509 * progress/completion. */
513 if (ksocknal_data.ksnd_stall_rx != 0) {
514 set_current_state (TASK_UNINTERRUPTIBLE);
515 schedule_timeout (ksocknal_data.ksnd_stall_rx * HZ);
518 rc = ksocknal_getconnsock (conn);
520 LASSERT (conn->ksnc_closing);
525 if (conn->ksnc_rx_niov != 0)
526 rc = ksocknal_recv_iov (conn);
528 rc = ksocknal_recv_kiov (conn);
531 /* error/EOF or partial receive */
534 } else if (rc == 0 && conn->ksnc_rx_started) {
535 /* EOF in the middle of a message */
541 /* Completed a fragment */
543 if (conn->ksnc_rx_nob_wanted == 0) {
544 /* Completed a message segment (header or payload) */
545 if ((ksocknal_data.ksnd_eager_ack & conn->ksnc_type) != 0 &&
546 (conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
547 conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD)) {
548 /* Remind the socket to ack eagerly... */
549 ksocknal_eager_ack(conn);
556 ksocknal_putconnsock (conn);
562 ksocknal_zc_callback (zccd_t *zcd)
564 ksock_tx_t *tx = KSOCK_ZCCD_2_TX(zcd);
565 ksock_sched_t *sched = tx->tx_conn->ksnc_scheduler;
569 /* Schedule tx for cleanup (can't do it now due to lock conflicts) */
571 spin_lock_irqsave (&sched->kss_lock, flags);
573 list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
574 wake_up (&sched->kss_waitq);
576 spin_unlock_irqrestore (&sched->kss_lock, flags);
582 ksocknal_tx_done (ksock_tx_t *tx, int asynch)
587 if (tx->tx_conn != NULL) {
588 /* This tx got queued on a conn; do the accounting... */
589 atomic_sub (tx->tx_nob, &tx->tx_conn->ksnc_tx_nob);
591 /* zero copy completion isn't always from
592 * process_transmit() so it needs to keep a ref on
595 ksocknal_put_conn (tx->tx_conn);
601 if (tx->tx_isfwd) { /* was a forwarded packet? */
602 kpr_fwd_done (&ksocknal_data.ksnd_router,
603 KSOCK_TX_2_KPR_FWD_DESC (tx), 0);
609 ltx = KSOCK_TX_2_KSOCK_LTX (tx);
611 lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie);
613 ksocknal_free_ltx (ltx);
618 ksocknal_tx_launched (ksock_tx_t *tx)
621 if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
622 ksock_conn_t *conn = tx->tx_conn;
624 /* zccd skbufs are still in-flight. First take a ref on
625 * conn, so it hangs about for ksocknal_tx_done... */
626 atomic_inc (&conn->ksnc_refcount);
628 /* ...then drop the initial ref on zccd, so the zero copy
629 * callback can occur */
630 zccd_put (&tx->tx_zccd);
634 /* Any zero-copy-ness (if any) has completed; I can complete the
635 * transmit now, avoiding an extra schedule */
636 ksocknal_tx_done (tx, 0);
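/* Sketch of the assumed zccd life cycle: zccd_init() in
 * ksocknal_queue_tx_locked() leaves one reference held by this code;
 * each zero-copy skb created by tcp_sendpage_zccd() takes its own
 * reference and drops it once the data has really gone.  Dropping the
 * initial reference above therefore lets ksocknal_zc_callback() fire
 * as soon as the last in-flight skb completes; if no skb ever took a
 * reference the count is still 1 and the tx is finalized right here
 * without an extra trip through the scheduler. */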
640 ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
644 rc = ksocknal_transmit (conn, tx);
646 CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
647 LASSERT (rc != -EAGAIN);
651 if (tx->tx_resid != 0) {
652 /* didn't send everything */
656 ksocknal_tx_launched (tx);
660 if (!conn->ksnc_closing)
661 CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n",
662 conn, rc, conn->ksnc_peer->ksnp_nid,
663 conn->ksnc_ipaddr, conn->ksnc_port);
665 ksocknal_close_conn_and_siblings (conn, rc);
666 ksocknal_tx_launched (tx);
672 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
676 /* called holding write lock on ksnd_global_lock */
678 LASSERT (!route->ksnr_deleted);
679 LASSERT ((route->ksnr_connected & (1 << SOCKNAL_CONN_ANY)) == 0);
680 LASSERT ((route->ksnr_connected & KSNR_TYPED_ROUTES) != KSNR_TYPED_ROUTES);
681 LASSERT (!route->ksnr_connecting);
683 if (ksocknal_data.ksnd_typed_conns)
684 route->ksnr_connecting =
685 KSNR_TYPED_ROUTES & ~route->ksnr_connected;
687 route->ksnr_connecting = (1 << SOCKNAL_CONN_ANY);
689 atomic_inc (&route->ksnr_refcount); /* extra ref for asynchd */
691 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
693 list_add_tail (&route->ksnr_connect_list,
694 &ksocknal_data.ksnd_autoconnectd_routes);
695 wake_up (&ksocknal_data.ksnd_autoconnectd_waitq);
697 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
701 ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
703 ptl_nid_t target_nid;
705 ksock_peer_t *peer = ksocknal_find_peer_locked (nid);
711 CERROR ("Can't send packet to "LPX64
712 ": routed target is not a peer\n", nid);
716 rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, tx->tx_nob,
719 CERROR ("Can't route to "LPX64": router error %d\n", nid, rc);
723 peer = ksocknal_find_peer_locked (target_nid);
727 CERROR ("Can't send packet to "LPX64": no peer entry\n", target_nid);
732 ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer)
734 struct list_head *tmp;
735 ksock_conn_t *typed = NULL;
737 ksock_conn_t *fallback = NULL;
740 /* Find the conn with the shortest tx queue */
741 list_for_each (tmp, &peer->ksnp_conns) {
742 ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
743 int nob = atomic_read(&c->ksnc_tx_nob);
745 LASSERT (!c->ksnc_closing);
747 if (fallback == NULL || nob < fnob) {
752 if (!ksocknal_data.ksnd_typed_conns)
755 switch (c->ksnc_type) {
758 case SOCKNAL_CONN_ANY:
760 case SOCKNAL_CONN_BULK_IN:
762 case SOCKNAL_CONN_BULK_OUT:
763 if (tx->tx_nob < ksocknal_data.ksnd_min_bulk)
766 case SOCKNAL_CONN_CONTROL:
767 if (tx->tx_nob >= ksocknal_data.ksnd_min_bulk)
772 if (typed == NULL || nob < tnob) {
778 /* prefer the typed selection */
779 return ((typed != NULL) ? typed : fallback);
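/* Selection policy, as read from the loop above (some case labels are
 * elided): 'fallback' is simply the connection with the fewest queued
 * bytes regardless of type; 'typed' is the least-loaded connection
 * whose type suits this message - ANY takes anything, BULK_OUT only
 * messages of at least ksnd_min_bulk bytes, CONTROL only smaller ones,
 * and BULK_IN is never used for sending.  A typed match wins whenever
 * one exists. */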
783 ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
786 ksock_sched_t *sched = conn->ksnc_scheduler;
788 /* called holding global lock (read or irq-write) and caller may
789 * not have dropped this lock between finding conn and calling me,
790 * so we don't need the {get,put}connsock dance to deref
792 LASSERT(!conn->ksnc_closing);
793 LASSERT(tx->tx_resid == tx->tx_nob);
795 CDEBUG (D_NET, "Sending to "LPX64" on port %d\n",
796 conn->ksnc_peer->ksnp_nid, conn->ksnc_port);
798 atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
802 zccd_init (&tx->tx_zccd, ksocknal_zc_callback);
803 /* NB this sets 1 ref on zccd, so the callback can only occur after
804 * I've released this ref. */
806 spin_lock_irqsave (&sched->kss_lock, flags);
808 conn->ksnc_tx_deadline = jiffies +
809 ksocknal_data.ksnd_io_timeout * HZ;
810 mb(); /* order with list_add_tail */
812 list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
814 if (conn->ksnc_tx_ready && /* able to send */
815 !conn->ksnc_tx_scheduled) { /* not scheduled to send */
816 /* +1 ref for scheduler */
817 atomic_inc (&conn->ksnc_refcount);
818 list_add_tail (&conn->ksnc_tx_list,
819 &sched->kss_tx_conns);
820 conn->ksnc_tx_scheduled = 1;
821 wake_up (&sched->kss_waitq);
824 spin_unlock_irqrestore (&sched->kss_lock, flags);
828 ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
830 struct list_head *tmp;
831 ksock_route_t *route;
832 ksock_route_t *candidate = NULL;
836 list_for_each (tmp, &peer->ksnp_routes) {
837 route = list_entry (tmp, ksock_route_t, ksnr_list);
838 bits = route->ksnr_connected;
840 if ((bits & KSNR_TYPED_ROUTES) == KSNR_TYPED_ROUTES ||
841 (bits & (1 << SOCKNAL_CONN_ANY)) != 0 ||
842 route->ksnr_connecting != 0) {
843 /* All typed connections have been established, or
844 * an untyped connection has been established, or
845 * connections are currently being established */
850 /* too soon to retry this guy? */
851 if (!time_after_eq (jiffies, route->ksnr_timeout))
854 /* always do eager routes */
855 if (route->ksnr_eager)
858 if (candidate == NULL) {
859 /* If we don't find any other route that is fully
860 * connected or connecting, the first connectable
861 * route is returned. If it fails to connect, it
862 * will get placed at the end of the list */
867 return (found ? NULL : candidate);
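/* Net effect: an 'eager' route whose retry timeout has expired is
 * returned immediately; otherwise the first retry-eligible route
 * becomes 'candidate' and is returned only if no route was seen that
 * is already connected or connecting ('found'), in which case there is
 * nothing new worth starting and NULL is returned. */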
871 ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
873 struct list_head *tmp;
874 ksock_route_t *route;
876 list_for_each (tmp, &peer->ksnp_routes) {
877 route = list_entry (tmp, ksock_route_t, ksnr_list);
879 if (route->ksnr_connecting != 0)
887 ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
892 ksock_route_t *route;
895 /* Ensure the frags we've been given EXACTLY match the number of
896 * bytes we want to send. Many TCP/IP stacks disregard any total
897 * size parameters passed to them and just look at the frags.
899 * We always expect at least 1 mapped fragment containing the
900 * complete portals header. */
901 LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) +
902 lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
903 LASSERT (tx->tx_niov >= 1);
904 LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t));
906 CDEBUG (D_NET, "packet %p type %d, nob %d niov %d nkiov %d\n",
907 tx, ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type,
908 tx->tx_nob, tx->tx_niov, tx->tx_nkiov);
910 tx->tx_conn = NULL; /* only set when assigned a conn */
911 tx->tx_resid = tx->tx_nob;
912 tx->tx_hdr = (ptl_hdr_t *)tx->tx_iov[0].iov_base;
914 g_lock = &ksocknal_data.ksnd_global_lock;
917 peer = ksocknal_find_target_peer_locked (tx, nid);
919 read_unlock (g_lock);
920 return (-EHOSTUNREACH);
923 if (ksocknal_find_connectable_route_locked(peer) == NULL) {
924 conn = ksocknal_find_conn_locked (tx, peer);
926 /* I've got no autoconnect routes that need to be
927 * connecting and I do have an actual connection... */
928 ksocknal_queue_tx_locked (tx, conn);
929 read_unlock (g_lock);
934 /* Making one or more connections; I'll need a write lock... */
936 atomic_inc (&peer->ksnp_refcount); /* +1 ref for me while I unlock */
937 read_unlock (g_lock);
938 write_lock_irqsave (g_lock, flags);
940 if (peer->ksnp_closing) { /* peer deleted as I blocked! */
941 write_unlock_irqrestore (g_lock, flags);
942 ksocknal_put_peer (peer);
943 return (-EHOSTUNREACH);
945 ksocknal_put_peer (peer); /* drop ref I got above */
948 /* launch any/all autoconnections that need it */
949 route = ksocknal_find_connectable_route_locked (peer);
953 ksocknal_launch_autoconnect_locked (route);
956 conn = ksocknal_find_conn_locked (tx, peer);
958 /* Connection exists; queue message on it */
959 ksocknal_queue_tx_locked (tx, conn);
960 write_unlock_irqrestore (g_lock, flags);
964 route = ksocknal_find_connecting_route_locked (peer);
966 /* At least 1 connection is being established; queue the
968 list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
969 write_unlock_irqrestore (g_lock, flags);
973 write_unlock_irqrestore (g_lock, flags);
974 return (-EHOSTUNREACH);
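/* So ksocknal_launch_packet() ends in one of three ways: the tx is
 * queued on an existing connection, it is parked on ksnp_tx_queue
 * while one or more autoconnects are in flight, or -EHOSTUNREACH comes
 * back because the peer is unknown/unroutable or was deleted and no
 * connection could be started.  Callers (ksocknal_sendmsg(),
 * ksocknal_fwd_packet()) treat any non-zero return as failure. */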
978 ksocknal_sendmsg(nal_cb_t *nal,
985 unsigned int payload_niov,
986 struct iovec *payload_iov,
987 ptl_kiov_t *payload_kiov,
994 /* NB 'private' is different depending on what we're sending.
995 * Just ignore it... */
997 CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
998 " pid %d\n", payload_nob, payload_niov, nid , pid);
1000 LASSERT (payload_nob == 0 || payload_niov > 0);
1001 LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1003 /* It must be OK to kmap() if required */
1004 LASSERT (payload_kiov == NULL || !in_interrupt ());
1005 /* payload is either all vaddrs or all pages */
1006 LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1008 if (payload_iov != NULL)
1009 desc_size = offsetof(ksock_ltx_t, ltx_iov[1 + payload_niov]);
1011 desc_size = offsetof(ksock_ltx_t, ltx_kiov[payload_niov]);
1013 if (in_interrupt() ||
1014 type == PTL_MSG_ACK ||
1015 type == PTL_MSG_REPLY) {
1016 /* Can't block if in interrupt or responding to an incoming
1018 PORTAL_ALLOC_ATOMIC(ltx, desc_size);
1020 PORTAL_ALLOC(ltx, desc_size);
1024 CERROR("Can't allocate tx desc type %d size %d %s\n",
1025 type, desc_size, in_interrupt() ? "(intr)" : "");
1026 return (PTL_NOSPACE);
1029 atomic_inc(&ksocknal_data.ksnd_nactive_ltxs);
1031 ltx->ltx_desc_size = desc_size;
1033 /* We always have 1 mapped frag for the header */
1034 ltx->ltx_tx.tx_iov = ltx->ltx_iov;
1035 ltx->ltx_iov[0].iov_base = &ltx->ltx_hdr;
1036 ltx->ltx_iov[0].iov_len = sizeof(*hdr);
1037 ltx->ltx_hdr = *hdr;
1039 ltx->ltx_private = private;
1040 ltx->ltx_cookie = cookie;
1042 ltx->ltx_tx.tx_isfwd = 0;
1043 ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_nob;
1045 if (payload_iov != NULL) {
1046 /* payload is all mapped */
1047 ltx->ltx_tx.tx_kiov = NULL;
1048 ltx->ltx_tx.tx_nkiov = 0;
1050 ltx->ltx_tx.tx_niov = 1 + payload_niov;
1052 memcpy(ltx->ltx_iov + 1, payload_iov,
1053 payload_niov * sizeof (*payload_iov));
1056 /* payload is all pages */
1057 ltx->ltx_tx.tx_kiov = ltx->ltx_kiov;
1058 ltx->ltx_tx.tx_nkiov = payload_niov;
1060 ltx->ltx_tx.tx_niov = 1;
1062 memcpy(ltx->ltx_kiov, payload_kiov,
1063 payload_niov * sizeof (*payload_kiov));
1066 rc = ksocknal_launch_packet(&ltx->ltx_tx, nid);
1070 ksocknal_free_ltx(ltx);
1075 ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
1076 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1077 unsigned int payload_niov, struct iovec *payload_iov,
1080 return (ksocknal_sendmsg(nal, private, cookie,
1081 hdr, type, nid, pid,
1082 payload_niov, payload_iov, NULL,
1087 ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie,
1088 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1089 unsigned int payload_niov, ptl_kiov_t *payload_kiov,
1092 return (ksocknal_sendmsg(nal, private, cookie,
1093 hdr, type, nid, pid,
1094 payload_niov, NULL, payload_kiov,
1099 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1101 ptl_nid_t nid = fwd->kprfd_gateway_nid;
1102 ksock_tx_t *tx = (ksock_tx_t *)&fwd->kprfd_scratch;
1105 CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
1106 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
1108 /* I'm the gateway; must be the last hop */
1109 if (nid == ksocknal_lib.ni.nid)
1110 nid = fwd->kprfd_target_nid;
1112 tx->tx_isfwd = 1; /* This is a forwarding packet */
1113 tx->tx_nob = fwd->kprfd_nob;
1114 tx->tx_niov = fwd->kprfd_niov;
1115 tx->tx_iov = fwd->kprfd_iov;
1119 rc = ksocknal_launch_packet (tx, nid);
1121 kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, rc);
1125 ksocknal_thread_start (int (*fn)(void *arg), void *arg)
1127 long pid = kernel_thread (fn, arg, 0);
1132 atomic_inc (&ksocknal_data.ksnd_nthreads);
1137 ksocknal_thread_fini (void)
1139 atomic_dec (&ksocknal_data.ksnd_nthreads);
1143 ksocknal_fmb_callback (void *arg, int error)
1145 ksock_fmb_t *fmb = (ksock_fmb_t *)arg;
1146 ksock_fmb_pool_t *fmp = fmb->fmb_pool;
1147 ptl_hdr_t *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]);
1148 ksock_conn_t *conn = NULL;
1149 ksock_sched_t *sched;
1150 unsigned long flags;
1153 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
1154 NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),
1157 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
1158 NTOH__u64 (hdr->src_nid), NTOH__u64 (hdr->dest_nid));
1160 /* drop peer ref taken on init */
1161 ksocknal_put_peer (fmb->fmb_peer);
1163 spin_lock_irqsave (&fmp->fmp_lock, flags);
1165 list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);
1166 fmp->fmp_nactive_fmbs--;
1168 if (!list_empty (&fmp->fmp_blocked_conns)) {
1169 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next,
1170 ksock_conn_t, ksnc_rx_list);
1171 list_del (&conn->ksnc_rx_list);
1174 spin_unlock_irqrestore (&fmp->fmp_lock, flags);
1179 CDEBUG (D_NET, "Scheduling conn %p\n", conn);
1180 LASSERT (conn->ksnc_rx_scheduled);
1181 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
1183 conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
1185 sched = conn->ksnc_scheduler;
1187 spin_lock_irqsave (&sched->kss_lock, flags);
1189 list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
1190 wake_up (&sched->kss_waitq);
1192 spin_unlock_irqrestore (&sched->kss_lock, flags);
1196 ksocknal_get_idle_fmb (ksock_conn_t *conn)
1198 int payload_nob = conn->ksnc_rx_nob_left;
1199 int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
1200 unsigned long flags;
1201 ksock_fmb_pool_t *pool;
1204 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1205 LASSERT (kpr_routing(&ksocknal_data.ksnd_router));
1207 if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
1208 pool = &ksocknal_data.ksnd_small_fmp;
1210 pool = &ksocknal_data.ksnd_large_fmp;
1212 spin_lock_irqsave (&pool->fmp_lock, flags);
1214 if (!list_empty (&pool->fmp_idle_fmbs)) {
1215 fmb = list_entry(pool->fmp_idle_fmbs.next,
1216 ksock_fmb_t, fmb_list);
1217 list_del (&fmb->fmb_list);
1218 pool->fmp_nactive_fmbs++;
1219 spin_unlock_irqrestore (&pool->fmp_lock, flags);
1224 /* deschedule until fmb free */
1226 conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
1228 list_add_tail (&conn->ksnc_rx_list,
1229 &pool->fmp_blocked_conns);
1231 spin_unlock_irqrestore (&pool->fmp_lock, flags);
1236 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
1238 int payload_nob = conn->ksnc_rx_nob_left;
1239 int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
1240 ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1241 int niov; /* at least the header */
1244 LASSERT (conn->ksnc_rx_scheduled);
1245 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1246 LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
1247 LASSERT (payload_nob >= 0);
1248 LASSERT (packet_nob <= fmb->fmb_npages * PAGE_SIZE);
1249 LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
1251 /* Got a forwarding buffer; copy the header we just read into the
1252 * forwarding buffer. If there's payload, start reading it
1253 * into the buffer, otherwise the forwarding buffer can be kicked
1256 * NB fmb->fmb_iov spans the WHOLE packet.
1257 * conn->ksnc_rx_iov spans just the payload.
1259 fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]);
1262 memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t));
1264 /* Take a ref on the conn's peer to prevent module unload before
1265 * forwarding completes. NB we ref peer rather than conn, because
1266 * all refs on conn after it has been closed must remove themselves
1268 fmb->fmb_peer = conn->ksnc_peer;
1269 atomic_inc (&conn->ksnc_peer->ksnp_refcount);
1271 if (payload_nob == 0) { /* got complete packet already */
1272 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n",
1273 conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1274 dest_nid, packet_nob);
1276 fmb->fmb_iov[0].iov_len = sizeof (ptl_hdr_t);
1278 kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
1279 packet_nob, 1, fmb->fmb_iov,
1280 ksocknal_fmb_callback, fmb);
1282 /* forward it now */
1283 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1285 ksocknal_new_packet (conn, 0); /* on to next packet */
1290 if (packet_nob <= PAGE_SIZE) { /* whole packet fits in first page */
1291 fmb->fmb_iov[0].iov_len = packet_nob;
1293 fmb->fmb_iov[0].iov_len = PAGE_SIZE;
1294 nob = packet_nob - PAGE_SIZE;
1297 LASSERT (niov < fmb->fmb_npages);
1298 fmb->fmb_iov[niov].iov_base =
1299 page_address (fmb->fmb_pages[niov]);
1300 fmb->fmb_iov[niov].iov_len = MIN (PAGE_SIZE, nob);
1306 kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
1307 packet_nob, niov, fmb->fmb_iov,
1308 ksocknal_fmb_callback, fmb);
1310 conn->ksnc_cookie = fmb; /* stash fmb for later */
1311 conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
1313 /* payload is desc's iov-ed buffer, but skipping the hdr */
1314 LASSERT (niov <= sizeof (conn->ksnc_rx_iov_space) /
1315 sizeof (struct iovec));
1317 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1318 conn->ksnc_rx_iov[0].iov_base =
1319 (void *)(((unsigned long)fmb->fmb_iov[0].iov_base) +
1320 sizeof (ptl_hdr_t));
1321 conn->ksnc_rx_iov[0].iov_len =
1322 fmb->fmb_iov[0].iov_len - sizeof (ptl_hdr_t);
1325 memcpy(&conn->ksnc_rx_iov[1], &fmb->fmb_iov[1],
1326 (niov - 1) * sizeof (struct iovec));
1328 conn->ksnc_rx_niov = niov;
1330 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
1331 NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
1336 ksocknal_fwd_parse (ksock_conn_t *conn)
1339 ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1340 ptl_nid_t src_nid = NTOH__u64 (conn->ksnc_hdr.src_nid);
1341 int body_len = NTOH__u32 (conn->ksnc_hdr.payload_length);
1342 char str[PTL_NALFMT_SIZE];
1344 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
1345 src_nid, dest_nid, conn->ksnc_rx_nob_left);
1347 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
1348 LASSERT (conn->ksnc_rx_scheduled);
1350 if (body_len < 0) { /* length corrupt (overflow) */
1351 CERROR("dropping packet from "LPX64" (%s) for "LPX64" (%s): "
1352 "packet size %d illegal\n",
1353 src_nid, portals_nid2str(TCPNAL, src_nid, str),
1354 dest_nid, portals_nid2str(TCPNAL, dest_nid, str),
1357 ksocknal_new_packet (conn, 0); /* on to new packet */
1361 if (!kpr_routing(&ksocknal_data.ksnd_router)) { /* not forwarding */
1362 CERROR("dropping packet from "LPX64" (%s) for "LPX64
1363 " (%s): not forwarding\n",
1364 src_nid, portals_nid2str(TCPNAL, src_nid, str),
1365 dest_nid, portals_nid2str(TCPNAL, dest_nid, str));
1366 /* on to new packet (skip this one's body) */
1367 ksocknal_new_packet (conn, body_len);
1371 if (body_len > PTL_MTU) { /* too big to forward */
1372 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1373 "(%s): packet size %d too big\n",
1374 src_nid, portals_nid2str(TCPNAL, src_nid, str),
1375 dest_nid, portals_nid2str(TCPNAL, dest_nid, str),
1377 /* on to new packet (skip this one's body) */
1378 ksocknal_new_packet (conn, body_len);
1382 /* should have gone direct */
1383 peer = ksocknal_get_peer (conn->ksnc_hdr.dest_nid);
1385 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1386 "(%s): target is a peer\n",
1387 src_nid, portals_nid2str(TCPNAL, src_nid, str),
1388 dest_nid, portals_nid2str(TCPNAL, dest_nid, str));
1389 ksocknal_put_peer (peer); /* drop ref from get above */
1391 /* on to next packet (skip this one's body) */
1392 ksocknal_new_packet (conn, body_len);
1396 conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB; /* Getting FMB now */
1397 conn->ksnc_rx_nob_left = body_len; /* stash packet size */
1398 conn->ksnc_rx_nob_wanted = body_len; /* (no slop) */
1402 ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
1404 static char ksocknal_slop_buffer[4096];
1410 if (nob_to_skip == 0) { /* right at next packet boundary now */
1411 conn->ksnc_rx_started = 0;
1412 mb (); /* racing with timeout thread */
1414 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
1415 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
1416 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
1418 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1419 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
1420 conn->ksnc_rx_iov[0].iov_len = sizeof (ptl_hdr_t);
1421 conn->ksnc_rx_niov = 1;
1423 conn->ksnc_rx_kiov = NULL;
1424 conn->ksnc_rx_nkiov = 0;
1428 /* Set up to skip as much as possible now. If there's more left
1429 * (ran out of iov entries) we'll get called again */
1431 conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
1432 conn->ksnc_rx_nob_left = nob_to_skip;
1433 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1438 nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));
1440 conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
1441 conn->ksnc_rx_iov[niov].iov_len = nob;
1446 } while (nob_to_skip != 0 && /* mustn't overflow conn's rx iov */
1447 niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));
1449 conn->ksnc_rx_niov = niov;
1450 conn->ksnc_rx_kiov = NULL;
1451 conn->ksnc_rx_nkiov = 0;
1452 conn->ksnc_rx_nob_wanted = skipped;
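/* Skipping works by pointing as many rx iov entries as fit at the
 * single static 4KB ksocknal_slop_buffer and letting the normal
 * receive path "read" the unwanted bytes into it; if the body is too
 * big for the available iov space, ksocknal_process_receive() simply
 * lands back here in SOCKNAL_RX_SLOP to set up the next round. */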
1457 ksocknal_process_receive (ksock_conn_t *conn)
1462 LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1464 /* doesn't need a forwarding buffer */
1465 if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
1469 fmb = ksocknal_get_idle_fmb (conn);
1471 /* conn descheduled waiting for idle fmb */
1475 if (ksocknal_init_fmb (conn, fmb)) {
1476 /* packet forwarded */
1481 /* NB: sched lock NOT held */
1482 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
1483 conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
1484 conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
1485 conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
1487 LASSERT (conn->ksnc_rx_nob_wanted > 0);
1489 rc = ksocknal_receive(conn);
1492 LASSERT (rc != -EAGAIN);
1495 CWARN ("[%p] EOF from "LPX64" ip %08x:%d\n",
1496 conn, conn->ksnc_peer->ksnp_nid,
1497 conn->ksnc_ipaddr, conn->ksnc_port);
1498 else if (!conn->ksnc_closing)
1499 CERROR ("[%p] Error %d on read from "LPX64" ip %08x:%d\n",
1500 conn, rc, conn->ksnc_peer->ksnp_nid,
1501 conn->ksnc_ipaddr, conn->ksnc_port);
1503 ksocknal_close_conn_and_siblings (conn, rc);
1504 return (rc == 0 ? -ESHUTDOWN : rc);
1507 if (conn->ksnc_rx_nob_wanted != 0) {
1512 switch (conn->ksnc_rx_state) {
1513 case SOCKNAL_RX_HEADER:
1514 if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) &&
1515 NTOH__u64(conn->ksnc_hdr.dest_nid) != ksocknal_lib.ni.nid) {
1516 /* This packet isn't for me */
1517 ksocknal_fwd_parse (conn);
1518 switch (conn->ksnc_rx_state) {
1519 case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
1520 return (0); /* => come back later */
1521 case SOCKNAL_RX_SLOP: /* skipping packet's body */
1522 goto try_read; /* => go read it */
1523 case SOCKNAL_RX_GET_FMB: /* forwarding */
1524 goto get_fmb; /* => go get a fwd msg buffer */
1531 /* sets wanted_len, iovs etc */
1532 lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
1534 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
1535 conn->ksnc_rx_state = SOCKNAL_RX_BODY;
1536 goto try_read; /* go read the payload */
1538 /* Fall through (completed packet for me) */
1540 case SOCKNAL_RX_BODY:
1541 /* payload all received */
1542 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie);
1545 case SOCKNAL_RX_SLOP:
1546 /* starting new packet? */
1547 if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
1548 return (0); /* come back later */
1549 goto try_read; /* try to finish reading slop now */
1551 case SOCKNAL_RX_BODY_FWD:
1552 /* payload all received */
1553 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n",
1554 conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1555 NTOH__u64 (conn->ksnc_hdr.dest_nid),
1556 conn->ksnc_rx_nob_left);
1558 /* forward the packet. NB ksocknal_init_fmb() put fmb into
1559 * conn->ksnc_cookie */
1560 fmb = (ksock_fmb_t *)conn->ksnc_cookie;
1561 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1563 /* no slop in forwarded packets */
1564 LASSERT (conn->ksnc_rx_nob_left == 0);
1566 ksocknal_new_packet (conn, 0); /* on to next packet */
1567 return (0); /* (later) */
1575 return (-EINVAL); /* keep gcc happy */
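/* Receive state machine in brief: every packet starts in
 * SOCKNAL_RX_HEADER.  Headers addressed to this NID go to lib_parse()
 * and any payload is read in SOCKNAL_RX_BODY; packets for other NIDs
 * go through ksocknal_fwd_parse() into SOCKNAL_RX_SLOP (drop the
 * body), SOCKNAL_RX_GET_FMB/SOCKNAL_RX_BODY_FWD (forward via the
 * router), or straight back to SOCKNAL_RX_HEADER if there was nothing
 * to skip.  Every completed packet ends with ksocknal_new_packet()
 * rearming the next header read. */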
1579 ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
1580 unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
1582 ksock_conn_t *conn = (ksock_conn_t *)private;
1584 LASSERT (mlen <= rlen);
1585 LASSERT (niov <= PTL_MD_MAX_IOV);
1587 conn->ksnc_cookie = msg;
1588 conn->ksnc_rx_nob_wanted = mlen;
1589 conn->ksnc_rx_nob_left = rlen;
1591 conn->ksnc_rx_nkiov = 0;
1592 conn->ksnc_rx_kiov = NULL;
1593 conn->ksnc_rx_niov = niov;
1594 conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
1595 memcpy (conn->ksnc_rx_iov, iov, niov * sizeof (*iov));
1598 lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1599 lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1605 ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
1606 unsigned int niov, ptl_kiov_t *kiov, size_t mlen, size_t rlen)
1608 ksock_conn_t *conn = (ksock_conn_t *)private;
1610 LASSERT (mlen <= rlen);
1611 LASSERT (niov <= PTL_MD_MAX_IOV);
1613 conn->ksnc_cookie = msg;
1614 conn->ksnc_rx_nob_wanted = mlen;
1615 conn->ksnc_rx_nob_left = rlen;
1617 conn->ksnc_rx_niov = 0;
1618 conn->ksnc_rx_iov = NULL;
1619 conn->ksnc_rx_nkiov = niov;
1620 conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1621 memcpy (conn->ksnc_rx_kiov, kiov, niov * sizeof (*kiov));
1624 lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1625 lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1630 int ksocknal_scheduler (void *arg)
1632 ksock_sched_t *sched = (ksock_sched_t *)arg;
1635 unsigned long flags;
1638 int id = sched - ksocknal_data.ksnd_schedulers;
1641 snprintf (name, sizeof (name),"ksocknald_%02d", id);
1642 kportal_daemonize (name);
1643 kportal_blockallsigs ();
1645 current->flags |= PF_MEMALLOC;
1647 #if (CONFIG_SMP && CPU_AFFINITY)
1648 if ((cpu_online_map & (1 << id)) != 0) {
1650 current->cpus_allowed = (1 << id);
1652 set_cpus_allowed (current, 1<<id);
1655 CERROR ("Can't set CPU affinity for %s\n", name);
1657 #endif /* CONFIG_SMP && CPU_AFFINITY */
1659 spin_lock_irqsave (&sched->kss_lock, flags);
1661 while (!ksocknal_data.ksnd_shuttingdown) {
1662 int did_something = 0;
1664 /* Ensure I progress everything semi-fairly */
1666 if (!list_empty (&sched->kss_rx_conns)) {
1667 conn = list_entry(sched->kss_rx_conns.next,
1668 ksock_conn_t, ksnc_rx_list);
1669 list_del(&conn->ksnc_rx_list);
1671 LASSERT(conn->ksnc_rx_scheduled);
1672 LASSERT(conn->ksnc_rx_ready);
1674 /* clear rx_ready in case receive isn't complete.
1675 * Do it BEFORE we call process_recv, since
1676 * data_ready can set it any time after we release
1678 conn->ksnc_rx_ready = 0;
1679 spin_unlock_irqrestore(&sched->kss_lock, flags);
1681 rc = ksocknal_process_receive(conn);
1683 spin_lock_irqsave(&sched->kss_lock, flags);
1685 /* I'm the only one that can clear this flag */
1686 LASSERT(conn->ksnc_rx_scheduled);
1688 /* Did process_receive get everything it wanted? */
1690 conn->ksnc_rx_ready = 1;
1692 if (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP ||
1693 conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB) {
1694 /* Conn blocked for a forwarding buffer.
1695 * It will get queued for my attention when
1696 * one becomes available (and it might just
1697 * already have been!). Meanwhile my ref
1698 * on it stays put. */
1699 } else if (conn->ksnc_rx_ready) {
1700 /* reschedule for rx */
1701 list_add_tail (&conn->ksnc_rx_list,
1702 &sched->kss_rx_conns);
1704 conn->ksnc_rx_scheduled = 0;
1706 ksocknal_put_conn(conn);
1712 if (!list_empty (&sched->kss_tx_conns)) {
1713 conn = list_entry(sched->kss_tx_conns.next,
1714 ksock_conn_t, ksnc_tx_list);
1715 list_del (&conn->ksnc_tx_list);
1717 LASSERT(conn->ksnc_tx_scheduled);
1718 LASSERT(conn->ksnc_tx_ready);
1719 LASSERT(!list_empty(&conn->ksnc_tx_queue));
1721 tx = list_entry(conn->ksnc_tx_queue.next,
1722 ksock_tx_t, tx_list);
1723 /* dequeue now so empty list => more to send */
1724 list_del(&tx->tx_list);
1726 /* Clear tx_ready in case send isn't complete. Do
1727 * it BEFORE we call process_transmit, since
1728 * write_space can set it any time after we release
1730 conn->ksnc_tx_ready = 0;
1731 spin_unlock_irqrestore (&sched->kss_lock, flags);
1733 rc = ksocknal_process_transmit(conn, tx);
1735 spin_lock_irqsave (&sched->kss_lock, flags);
1737 if (rc != -EAGAIN) {
1738 /* error or everything went: assume more can go */
1739 conn->ksnc_tx_ready = 1;
1741 /* back onto HEAD of tx_queue */
1742 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
1745 if (conn->ksnc_tx_ready &&
1746 !list_empty (&conn->ksnc_tx_queue)) {
1747 /* reschedule for tx */
1748 list_add_tail (&conn->ksnc_tx_list,
1749 &sched->kss_tx_conns);
1751 conn->ksnc_tx_scheduled = 0;
1753 ksocknal_put_conn (conn);
1759 if (!list_empty (&sched->kss_zctxdone_list)) {
1761 list_entry(sched->kss_zctxdone_list.next,
1762 ksock_tx_t, tx_list);
1765 list_del (&tx->tx_list);
1766 spin_unlock_irqrestore (&sched->kss_lock, flags);
1768 ksocknal_tx_done (tx, 1);
1770 spin_lock_irqsave (&sched->kss_lock, flags);
1773 if (!did_something || /* nothing to do */
1774 ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
1775 spin_unlock_irqrestore (&sched->kss_lock, flags);
1779 if (!did_something) { /* wait for something to do */
1781 rc = wait_event_interruptible (sched->kss_waitq,
1782 ksocknal_data.ksnd_shuttingdown ||
1783 !list_empty(&sched->kss_rx_conns) ||
1784 !list_empty(&sched->kss_tx_conns) ||
1785 !list_empty(&sched->kss_zctxdone_list));
1787 rc = wait_event_interruptible (sched->kss_waitq,
1788 ksocknal_data.ksnd_shuttingdown ||
1789 !list_empty(&sched->kss_rx_conns) ||
1790 !list_empty(&sched->kss_tx_conns));
1796 spin_lock_irqsave (&sched->kss_lock, flags);
1800 spin_unlock_irqrestore (&sched->kss_lock, flags);
1801 ksocknal_thread_fini ();
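/* Each scheduler thread round-robins three work sources per loop
 * iteration: one connection with data to receive, one connection with
 * packets to transmit, and one zero-copy tx whose skbs have all
 * completed.  After SOCKNAL_RESCHED iterations it drops the lock (and
 * presumably reschedules) to avoid hogging the CPU, and when nothing
 * was done it sleeps on kss_waitq until data_ready(), write_space() or
 * the zero-copy callback wakes it again. */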
1806 ksocknal_data_ready (struct sock *sk, int n)
1808 unsigned long flags;
1810 ksock_sched_t *sched;
1813 /* interleave correctly with closing sockets... */
1814 read_lock (&ksocknal_data.ksnd_global_lock);
1816 conn = sk->sk_user_data;
1817 if (conn == NULL) { /* raced with ksocknal_terminate_conn */
1818 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
1819 sk->sk_data_ready (sk, n);
1821 sched = conn->ksnc_scheduler;
1823 spin_lock_irqsave (&sched->kss_lock, flags);
1825 conn->ksnc_rx_ready = 1;
1827 if (!conn->ksnc_rx_scheduled) { /* not being progressed */
1828 list_add_tail(&conn->ksnc_rx_list,
1829 &sched->kss_rx_conns);
1830 conn->ksnc_rx_scheduled = 1;
1831 /* extra ref for scheduler */
1832 atomic_inc (&conn->ksnc_refcount);
1834 wake_up (&sched->kss_waitq);
1837 spin_unlock_irqrestore (&sched->kss_lock, flags);
1840 read_unlock (&ksocknal_data.ksnd_global_lock);
1846 ksocknal_write_space (struct sock *sk)
1848 unsigned long flags;
1850 ksock_sched_t *sched;
1852 /* interleave correctly with closing sockets... */
1853 read_lock (&ksocknal_data.ksnd_global_lock);
1855 conn = sk->sk_user_data;
1857 CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
1858 sk, tcp_wspace(sk), SOCKNAL_TX_LOW_WATER(sk), conn,
1859 (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
1860 " ready" : " blocked"),
1861 (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
1862 " scheduled" : " idle"),
1863 (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ?
1864 " empty" : " queued"));
1866 if (conn == NULL) { /* raced with ksocknal_terminate_conn */
1867 LASSERT (sk->sk_write_space != &ksocknal_write_space);
1868 sk->sk_write_space (sk);
1870 read_unlock (&ksocknal_data.ksnd_global_lock);
1874 if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
1875 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
1877 sched = conn->ksnc_scheduler;
1879 spin_lock_irqsave (&sched->kss_lock, flags);
1881 conn->ksnc_tx_ready = 1;
1883 if (!conn->ksnc_tx_scheduled && /* not being progressed */
1884 !list_empty(&conn->ksnc_tx_queue)) { /* packets to send */
1885 list_add_tail (&conn->ksnc_tx_list,
1886 &sched->kss_tx_conns);
1887 conn->ksnc_tx_scheduled = 1;
1888 /* extra ref for scheduler */
1889 atomic_inc (&conn->ksnc_refcount);
1891 wake_up (&sched->kss_waitq);
1894 spin_unlock_irqrestore (&sched->kss_lock, flags);
1897 read_unlock (&ksocknal_data.ksnd_global_lock);
1901 ksocknal_sock_write (struct socket *sock, void *buffer, int nob)
1904 mm_segment_t oldmm = get_fs();
1907 struct iovec iov = {
1911 struct msghdr msg = {
1916 .msg_control = NULL,
1917 .msg_controllen = 0,
1922 rc = sock_sendmsg (sock, &msg, iov.iov_len);
1929 CERROR ("Unexpected zero rc\n");
1930 return (-ECONNABORTED);
1933 buffer = ((char *)buffer) + rc;
1941 ksocknal_sock_read (struct socket *sock, void *buffer, int nob)
1944 mm_segment_t oldmm = get_fs();
1947 struct iovec iov = {
1951 struct msghdr msg = {
1956 .msg_control = NULL,
1957 .msg_controllen = 0,
1962 rc = sock_recvmsg (sock, &msg, iov.iov_len, 0);
1969 return (-ECONNABORTED);
1971 buffer = ((char *)buffer) + rc;
1979 ksocknal_hello (struct socket *sock, ptl_nid_t *nid, int *type, __u64 *incarnation)
1983 ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
1985 LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
1987 memset (&hdr, 0, sizeof (hdr));
1988 hmv->magic = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
1989 hmv->version_major = __cpu_to_le32 (PORTALS_PROTO_VERSION_MAJOR);
1990 hmv->version_minor = __cpu_to_le32 (PORTALS_PROTO_VERSION_MINOR);
1992 hdr.src_nid = __cpu_to_le64 (ksocknal_lib.ni.nid);
1993 hdr.type = __cpu_to_le32 (PTL_MSG_HELLO);
1995 hdr.msg.hello.type = __cpu_to_le32 (*type);
1996 hdr.msg.hello.incarnation =
1997 __cpu_to_le64 (ksocknal_data.ksnd_incarnation);
1999 /* Assume sufficient socket buffering for this message */
2000 rc = ksocknal_sock_write (sock, &hdr, sizeof (hdr));
2002 CERROR ("Error %d sending HELLO to "LPX64"\n", rc, *nid);
2006 rc = ksocknal_sock_read (sock, hmv, sizeof (*hmv));
2008 CERROR ("Error %d reading HELLO from "LPX64"\n", rc, *nid);
2012 if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
2013 CERROR ("Bad magic %#08x (%#08x expected) from "LPX64"\n",
2014 __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, *nid);
2018 if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
2019 hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
2020 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
2022 __le16_to_cpu (hmv->version_major),
2023 __le16_to_cpu (hmv->version_minor),
2024 PORTALS_PROTO_VERSION_MAJOR,
2025 PORTALS_PROTO_VERSION_MINOR,
2030 #if (PORTALS_PROTO_VERSION_MAJOR != 0)
2031 # error "This code only understands protocol version 0.x"
2033 /* version 0 sends magic/version as the dest_nid of a 'hello' header,
2034 * so read the rest of it in now... */
2036 rc = ksocknal_sock_read (sock, hmv + 1, sizeof (hdr) - sizeof (*hmv));
2038 CERROR ("Error %d reading rest of HELLO hdr from "LPX64"\n",
2043 /* ...and check we got what we expected */
2044 if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
2045 hdr.payload_length != __cpu_to_le32 (0)) {
2046 CERROR ("Expecting a HELLO hdr with 0 payload,"
2047 " but got type %d with %d payload from "LPX64"\n",
2048 __le32_to_cpu (hdr.type),
2049 __le32_to_cpu (hdr.payload_length), *nid);
2053 if (__le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
2054 CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
2058 if (*nid == PTL_NID_ANY) { /* don't know peer's nid yet */
2059 *nid = __le64_to_cpu(hdr.src_nid);
2060 } else if (*nid != __le64_to_cpu (hdr.src_nid)) {
2061 CERROR ("Connected to nid "LPX64", but expecting "LPX64"\n",
2062 __le64_to_cpu (hdr.src_nid), *nid);
2066 if (*type == SOCKNAL_CONN_NONE) {
2067 /* I've accepted this connection; peer determines type */
2068 *type = __le32_to_cpu(hdr.msg.hello.type);
2070 case SOCKNAL_CONN_ANY:
2071 case SOCKNAL_CONN_CONTROL:
2073 case SOCKNAL_CONN_BULK_IN:
2074 *type = SOCKNAL_CONN_BULK_OUT;
2076 case SOCKNAL_CONN_BULK_OUT:
2077 *type = SOCKNAL_CONN_BULK_IN;
2080 CERROR ("Unexpected type %d from "LPX64"\n", *type, *nid);
2083 } else if (__le32_to_cpu(hdr.msg.hello.type) != SOCKNAL_CONN_NONE) {
2084 CERROR ("Mismatched types: me %d "LPX64" %d\n",
2085 *type, *nid, __le32_to_cpu(hdr.msg.hello.type));
2089 *incarnation = __le64_to_cpu(hdr.msg.hello.incarnation);
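/* A sketch of the version-0 handshake implemented above: each side
 * sends a complete ptl_hdr_t of type PTL_MSG_HELLO whose dest_nid
 * field is overlaid with a ptl_magicversion_t (magic + major/minor
 * version).  The receiver therefore reads just that overlay first,
 * validates magic and version, then reads the rest of the header to
 * learn the peer's NID, requested connection type and incarnation:
 *
 *      send:  [ hdr | dest_nid := {magic, ver_major, ver_minor} ]
 *      recv:  [ magic/version ] -> validate -> [ rest of hdr ]
 */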
2095 ksocknal_setup_sock (struct socket *sock)
2097 mm_segment_t oldmm = get_fs ();
2100 struct linger linger;
2102 sock->sk->allocation = GFP_MEMALLOC;
2104 /* Ensure this socket aborts active sends immediately when we close
2108 linger.l_linger = 0;
2111 rc = sock_setsockopt (sock, SOL_SOCKET, SO_LINGER,
2112 (char *)&linger, sizeof (linger));
2115 CERROR ("Can't set SO_LINGER: %d\n", rc);
2121 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_LINGER2,
2122 (char *)&option, sizeof (option));
2125 CERROR ("Can't set SO_LINGER2: %d\n", rc);
2129 #if SOCKNAL_USE_KEEPALIVES
2130 /* Keepalives: If 3/4 of the timeout elapses, start probing every
2131 * second until the timeout elapses. */
2133 option = (ksocknal_data.ksnd_io_timeout * 3) / 4;
2135 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE,
2136 (char *)&option, sizeof (option));
2139 CERROR ("Can't set TCP_KEEPIDLE: %d\n", rc);
2145 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPINTVL,
2146 (char *)&option, sizeof (option));
2149 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2153 option = ksocknal_data.ksnd_io_timeout / 4;
2155 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT,
2156 (char *)&option, sizeof (option));
2159 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2165 rc = sock_setsockopt (sock, SOL_SOCKET, SO_KEEPALIVE,
2166 (char *)&option, sizeof (option));
2169 CERROR ("Can't set SO_KEEPALIVE: %d\n", rc);
2177 ksocknal_connect_peer (ksock_route_t *route, int type)
2179 struct sockaddr_in peer_addr;
2180 mm_segment_t oldmm = get_fs();
2183 struct socket *sock;
2186 rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock);
2188 CERROR ("Can't create autoconnect socket: %d\n", rc);
2192 /* Ugh; have to map_fd for compatibility with sockets passed in
2193 * from userspace. And we actually need the sock->file refcounting
2194 * that this gives you :) */
2196 fd = sock_map_fd (sock);
2198 sock_release (sock);
2199 CERROR ("sock_map_fd error %d\n", fd);
2203 /* NB the fd now owns the ref on sock->file */
2204 LASSERT (sock->file != NULL);
2205 LASSERT (file_count(sock->file) == 1);
2207 /* Set the socket timeouts, so our connection attempt completes in
2209 tv.tv_sec = ksocknal_data.ksnd_io_timeout;
2213 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO,
2214 (char *)&tv, sizeof (tv));
2217 CERROR ("Can't set send timeout %d: %d\n",
2218 ksocknal_data.ksnd_io_timeout, rc);
2223 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO,
2224 (char *)&tv, sizeof (tv));
2227 CERROR ("Can't set receive timeout %d: %d\n",
2228 ksocknal_data.ksnd_io_timeout, rc);
2232 if (route->ksnr_nonagel) {
2236 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_NODELAY,
2237 (char *)&option, sizeof (option));
2240 CERROR ("Can't disable nagel: %d\n", rc);
2245 if (route->ksnr_buffer_size != 0) {
2246 int option = route->ksnr_buffer_size;
2249 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
2250 (char *)&option, sizeof (option));
2253 CERROR ("Can't set send buffer %d: %d\n",
2254 route->ksnr_buffer_size, rc);
2259 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
2260 (char *)&option, sizeof (option));
2263 CERROR ("Can't set receive buffer %d: %d\n",
2264 route->ksnr_buffer_size, rc);
2269 memset (&peer_addr, 0, sizeof (peer_addr));
2270 peer_addr.sin_family = AF_INET;
2271 peer_addr.sin_port = htons (route->ksnr_port);
2272 peer_addr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
2274 rc = sock->ops->connect (sock, (struct sockaddr *)&peer_addr,
2275 sizeof (peer_addr), sock->file->f_flags);
2277 CERROR ("Error %d connecting to "LPX64"\n", rc,
2278 route->ksnr_peer->ksnp_nid);
2282 rc = ksocknal_create_conn (route, sock, route->ksnr_irq_affinity, type);
2284 /* Take an extra ref on sock->file to compensate for the
2285 * upcoming close which will lose fd's ref on it. */
2286 get_file (sock->file);
2295 ksocknal_autoconnect (ksock_route_t *route)
2297 LIST_HEAD (zombies);
2300 unsigned long flags;
2305 for (type = 0; type < SOCKNAL_CONN_NTYPES; type++)
2306 if ((route->ksnr_connecting & (1 << type)) != 0)
2308 LASSERT (type < SOCKNAL_CONN_NTYPES);
2310 rc = ksocknal_connect_peer (route, type);
2315 /* successfully autoconnected: create_conn did the
2316 * route/conn binding and scheduled any blocked packets */
2318 if (route->ksnr_connecting == 0) {
2319 /* No more connections required */
2324 /* Connection attempt failed */
2326 write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
2328 peer = route->ksnr_peer;
2329 route->ksnr_connecting = 0;
2331 /* This is a retry rather than a new connection */
2332 LASSERT (route->ksnr_retry_interval != 0);
2333 route->ksnr_timeout = jiffies + route->ksnr_retry_interval;
2334 route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2,
2335 SOCKNAL_MAX_RECONNECT_INTERVAL);
2337 if (!list_empty (&peer->ksnp_tx_queue) &&
2338 ksocknal_find_connecting_route_locked (peer) == NULL) {
2339 LASSERT (list_empty (&peer->ksnp_conns));
2341 /* None of the connections that the blocked packets are
2342 * waiting for have been successful. Complete them now... */
2344 tx = list_entry (peer->ksnp_tx_queue.next,
2345 ksock_tx_t, tx_list);
2346 list_del (&tx->tx_list);
2347 list_add_tail (&tx->tx_list, &zombies);
2348 } while (!list_empty (&peer->ksnp_tx_queue));
2351 /* make this route least-favourite for re-selection */
2352 if (!route->ksnr_deleted) {
2353 list_del(&route->ksnr_list);
2354 list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
2357 write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
2359 while (!list_empty (&zombies)) {
2360 tx = list_entry (zombies.next, ksock_tx_t, tx_list);
2362 CERROR ("Deleting packet type %d len %d ("LPX64"->"LPX64")\n",
2363 NTOH__u32 (tx->tx_hdr->type),
2364 NTOH__u32 (tx->tx_hdr->payload_length),
2365 NTOH__u64 (tx->tx_hdr->src_nid),
2366 NTOH__u64 (tx->tx_hdr->dest_nid));
2368 list_del (&tx->tx_list);
2370 ksocknal_tx_done (tx, 0);
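/* Failure handling in summary: the route backs off exponentially
 * (ksnr_retry_interval doubled up to SOCKNAL_MAX_RECONNECT_INTERVAL),
 * is demoted to the tail of ksnp_routes so other routes get tried
 * first, and if no other connection attempt is still in flight the
 * packets parked on ksnp_tx_queue are completed immediately rather
 * than being left to wait forever. */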
2375 ksocknal_autoconnectd (void *arg)
2377 long id = (long)arg;
2379 unsigned long flags;
2380 ksock_route_t *route;
2383 snprintf (name, sizeof (name), "ksocknal_ad%02ld", id);
2384 kportal_daemonize (name);
2385 kportal_blockallsigs ();
2387 current->flags |= PF_MEMALLOC;
2389 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2391 while (!ksocknal_data.ksnd_shuttingdown) {
2393 if (!list_empty (&ksocknal_data.ksnd_autoconnectd_routes)) {
2394 route = list_entry (ksocknal_data.ksnd_autoconnectd_routes.next,
2395 ksock_route_t, ksnr_connect_list);
2397 list_del (&route->ksnr_connect_list);
2398 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2400 ksocknal_autoconnect (route);
2401 ksocknal_put_route (route);
2403 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2407 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2409 rc = wait_event_interruptible (ksocknal_data.ksnd_autoconnectd_waitq,
2410 ksocknal_data.ksnd_shuttingdown ||
2411 !list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
2413 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2416 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2418 ksocknal_thread_fini ();
2423 ksocknal_find_timed_out_conn (ksock_peer_t *peer)
2425 /* We're called with a shared lock on ksnd_global_lock */
2427 struct list_head *ctmp;
2428 ksock_sched_t *sched;
2430 list_for_each (ctmp, &peer->ksnp_conns) {
2431 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
2432 sched = conn->ksnc_scheduler;
2434 /* Don't need the {get,put}connsock dance to deref ksnc_sock... */
2435 LASSERT (!conn->ksnc_closing);
2437 if (conn->ksnc_rx_started &&
2438 time_after_eq (jiffies, conn->ksnc_rx_deadline)) {
2439 /* Timed out incomplete incoming message */
2440 atomic_inc (&conn->ksnc_refcount);
2441 CERROR ("Timed out RX from "LPX64" %p\n",
2442 peer->ksnp_nid, conn);
2446 if ((!list_empty (&conn->ksnc_tx_queue) ||
2447 conn->ksnc_sock->sk->sk_wmem_queued != 0) &&
2448 time_after_eq (jiffies, conn->ksnc_tx_deadline)) {
2449 /* Timed out messages queued for sending, or
2450 * messages buffered in the socket's send buffer */
2451 atomic_inc (&conn->ksnc_refcount);
2452 CERROR ("Timed out TX to "LPX64" %s%d %p\n",
2454 list_empty (&conn->ksnc_tx_queue) ? "" : "Q ",
2455 conn->ksnc_sock->sk->sk_wmem_queued, conn);
2464 ksocknal_check_peer_timeouts (int idx)
2466 struct list_head *peers = &ksocknal_data.ksnd_peers[idx];
2467 struct list_head *ptmp;
2472 /* NB. We expect to have a look at all the peers and not find any
2473 * connections to time out, so we just use a shared lock while we
2475 read_lock (&ksocknal_data.ksnd_global_lock);
2477 list_for_each (ptmp, peers) {
2478 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
2479 conn = ksocknal_find_timed_out_conn (peer);
2482 read_unlock (&ksocknal_data.ksnd_global_lock);
2484 CERROR ("Timeout out conn->"LPX64" ip %x:%d\n",
2485 peer->ksnp_nid, conn->ksnc_ipaddr,
2487 ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT);
2489 /* NB we won't find this one again, but we can't
2490 * just proceed with the next peer, since we dropped
2491 * ksnd_global_lock and it might be dead already! */
2492 ksocknal_put_conn (conn);
2497 read_unlock (&ksocknal_data.ksnd_global_lock);
2501 ksocknal_reaper (void *arg)
2504 unsigned long flags;
2509 unsigned long deadline = jiffies;
2511 kportal_daemonize ("ksocknal_reaper");
2512 kportal_blockallsigs ();
2514 init_waitqueue_entry (&wait, current);
2516 current->flags |= PF_MEMALLOC;
2518 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2520 while (!ksocknal_data.ksnd_shuttingdown) {
2522 if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) {
2523 conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next,
2524 ksock_conn_t, ksnc_list);
2525 list_del (&conn->ksnc_list);
2527 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2529 ksocknal_terminate_conn (conn);
2530 ksocknal_put_conn (conn);
2532 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2536 if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) {
2537 conn = list_entry (ksocknal_data.ksnd_zombie_conns.next,
2538 ksock_conn_t, ksnc_list);
2539 list_del (&conn->ksnc_list);
2541 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2543 ksocknal_destroy_conn (conn);
2545 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2549 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2551 /* careful with the jiffy wrap... */
2552 while ((timeout = ((int)deadline - (int)jiffies)) <= 0) {
2555 int chunk = ksocknal_data.ksnd_peer_hash_size;
2557 /* Time to check for timeouts on a few more peers: I do
2558 * checks every 'p' seconds on a proportion of the peer
2559 * table and I need to check every connection 'n' times
2560 * within a timeout interval, to ensure I detect a
2561 * timeout on any connection within (n+1)/n times the
2562 * timeout interval. */
2564 if (ksocknal_data.ksnd_io_timeout > n * p)
2565 chunk = (chunk * n * p) /
2566 ksocknal_data.ksnd_io_timeout;
2570 for (i = 0; i < chunk; i++) {
2571 ksocknal_check_peer_timeouts (peer_index);
2572 peer_index = (peer_index + 1) %
2573 ksocknal_data.ksnd_peer_hash_size;
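/* Worked example of the chunk calculation (n and p are set in elided
 * lines; the comment above reads as n checks per timeout with one pass
 * every p seconds): with a 101-entry peer hash, n = 4, p = 1 and
 * ksnd_io_timeout = 60s, chunk = (101 * 4 * 1) / 60 ~= 6, so about six
 * hash buckets are scanned per pass and the whole table is covered
 * three to four times per 60s timeout interval. */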
2579 add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2580 set_current_state (TASK_INTERRUPTIBLE);
2582 if (!ksocknal_data.ksnd_shuttingdown &&
2583 list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
2584 list_empty (&ksocknal_data.ksnd_zombie_conns))
2585 schedule_timeout (timeout);
2587 set_current_state (TASK_RUNNING);
2588 remove_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2590 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2593 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2595 ksocknal_thread_fini ();
2599 nal_cb_t ksocknal_lib = {
2600 nal_data: &ksocknal_data, /* NAL private data */
2601 cb_send: ksocknal_send,
2602 cb_send_pages: ksocknal_send_pages,
2603 cb_recv: ksocknal_recv,
2604 cb_recv_pages: ksocknal_recv_pages,
2605 cb_read: ksocknal_read,
2606 cb_write: ksocknal_write,
2607 cb_callback: ksocknal_callback,
2608 cb_malloc: ksocknal_malloc,
2609 cb_free: ksocknal_free,
2610 cb_printf: ksocknal_printf,
2611 cb_cli: ksocknal_cli,
2612 cb_sti: ksocknal_sti,
2613 cb_dist: ksocknal_dist