/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
 *   Author: Zach Brown <zab@zabbo.net>
 *   Author: Peter J. Braam <braam@clusterfs.com>
 *   Author: Phil Schwan <phil@clusterfs.com>
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include "socknal.h"
atomic_t   ksocknal_packets_received;
atomic_t   ksocknal_packets_launched;
atomic_t   ksocknal_packets_being_sent;

int        ksocknal_do_zc = 1;
int        ksocknal_zc_min_frag = 2048;
/*
 *  LIB functions follow
 *
 */
int
ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr,
              user_ptr src_addr, size_t len)
{
        CDEBUG(D_NET, LPX64": reading %ld bytes from %p -> %p\n",
               nal->ni.nid, (long)len, src_addr, dst_addr);

        memcpy( dst_addr, src_addr, len );
        return 0;
}
int
ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
               void *src_addr, size_t len)
{
        CDEBUG(D_NET, LPX64": writing %ld bytes from %p -> %p\n",
               nal->ni.nid, (long)len, src_addr, dst_addr);

        memcpy( dst_addr, src_addr, len );
        return 0;
}
int
ksocknal_callback (nal_cb_t * nal, void *private, lib_eq_t *eq,
                   ptl_event_t *ev)
{
        CDEBUG(D_NET, LPX64": callback eq %p ev %p\n",
               nal->ni.nid, eq, ev);

        if (eq->event_callback != NULL)
                eq->event_callback(ev);

        return 0;
}
void *
ksocknal_malloc(nal_cb_t *nal, size_t len)
{
        void *buf;

        PORTAL_ALLOC(buf, len);
        return (buf);
}
void
ksocknal_free(nal_cb_t *nal, void *buf, size_t len)
{
        PORTAL_FREE(buf, len);
}
void
ksocknal_printf(nal_cb_t *nal, const char *fmt, ...)
{
        va_list  ap;
        char     msg[256];

        va_start (ap, fmt);
        vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
        va_end (ap);

        msg[sizeof (msg) - 1] = 0;              /* ensure terminated */

        CDEBUG (D_NET, "%s", msg);
}
void
ksocknal_cli(nal_cb_t *nal, unsigned long *flags)
{
        ksock_nal_data_t *data = nal->nal_data;

        spin_lock(&data->ksnd_nal_cb_lock);
}
void
ksocknal_sti(nal_cb_t *nal, unsigned long *flags)
{
        ksock_nal_data_t *data;
        data = nal->nal_data;

        spin_unlock(&data->ksnd_nal_cb_lock);
}
int
ksocknal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        /* I would guess that if ksocknal_get_conn(nid) == NULL,
           and we're not routing, then 'nid' is very distant :) */
        if ( nal->ni.nid == nid ) {
                *dist = 0;
        } else {
                *dist = 1;
        }

        return 0;
}
ksock_ltx_t *
ksocknal_get_ltx (int may_block)
{
        unsigned long flags;
        ksock_ltx_t  *ltx = NULL;

        for (;;) {
                spin_lock_irqsave (&ksocknal_data.ksnd_idle_ltx_lock, flags);

                if (!list_empty (&ksocknal_data.ksnd_idle_ltx_list)) {
                        ltx = list_entry(ksocknal_data.ksnd_idle_ltx_list.next,
                                         ksock_ltx_t, ltx_tx.tx_list);
                        list_del (&ltx->ltx_tx.tx_list);
                        break;
                }

                if (!may_block) {
                        if (!list_empty(&ksocknal_data.ksnd_idle_nblk_ltx_list)) {
                                ltx = list_entry(ksocknal_data.ksnd_idle_nblk_ltx_list.next,
                                                 ksock_ltx_t, ltx_tx.tx_list);
                                list_del (&ltx->ltx_tx.tx_list);
                        }
                        break;
                }

                spin_unlock_irqrestore(&ksocknal_data.ksnd_idle_ltx_lock,
                                       flags);
                wait_event (ksocknal_data.ksnd_idle_ltx_waitq,
                            !list_empty (&ksocknal_data.ksnd_idle_ltx_list));
        }

        spin_unlock_irqrestore (&ksocknal_data.ksnd_idle_ltx_lock, flags);

        return (ltx);
}
struct page *
ksocknal_kvaddr_to_page (unsigned long vaddr)
{
        struct page *page;

        if (vaddr >= VMALLOC_START &&
            vaddr < VMALLOC_END)
                page = vmalloc_to_page ((void *)vaddr);
        else if (vaddr >= PKMAP_BASE &&
                 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
                page = vmalloc_to_page ((void *)vaddr);
                /* in 2.4 ^ just walks the page tables */
        else
                page = virt_to_page (vaddr);

        if (page == NULL ||
            !VALID_PAGE (page))
                return (NULL);

        return (page);
}
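/* A minimal sketch, for illustration only (the helper name is hypothetical
 * and nothing here calls it): the page/offset arithmetic the zero-copy
 * sender builds on.  Assuming PAGE_SIZE == 4096, a fragment at kernel
 * vaddr 0xc0012345 sits at offset 0x345 within its page, so at most
 * 4096 - 0x345 == 3259 bytes of it can go out in one sendpage call. */
#if 0
static int
example_zc_prefix (unsigned long vaddr, int fragsize, int *offsetp)
{
        int offset = vaddr & (PAGE_SIZE - 1);   /* byte offset within page */

        *offsetp = offset;
        /* no more than the remainder of this page is zero-copyable at once */
        return (MIN (fragsize, (int)PAGE_SIZE - offset));
}
#endif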
int
ksocknal_send_iov (struct socket *sock, ksock_tx_t *tx, int more)
{
        struct iovec  *iov = tx->tx_iov;
        int            fragsize = iov->iov_len;
        unsigned long  vaddr = (unsigned long)iov->iov_base;
        int            offset = vaddr & (PAGE_SIZE - 1);
        int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
        struct page   *page;
        int            rc;

        /* NB we can't trust socket ops to either consume our iovs
         * or leave them alone, so we only send 1 frag at a time. */
        LASSERT (fragsize <= tx->tx_nob);
        LASSERT (tx->tx_niov > 0);
        more |= (tx->tx_niov > 1);

        if (ksocknal_do_zc &&
            (sock->sk->route_caps & NETIF_F_SG) &&
            (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
            zcsize >= ksocknal_zc_min_frag &&
            (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {

                CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
                       (void *)vaddr, page, page_address(page), offset, zcsize);

                more |= (zcsize < fragsize);

                rc = tcp_sendpage_zccd(sock, page, offset, zcsize,
                                       more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
                                       &tx->tx_zccd);
        } else {
                /* NB don't pass tx's iov; sendmsg may or may not update it */
                struct iovec fragiov = { .iov_base = (void *)vaddr,
                                         .iov_len  = fragsize};
                struct msghdr msg = {
                        .msg_name       = NULL,
                        .msg_namelen    = 0,
                        .msg_iov        = &fragiov,
                        .msg_iovlen     = 1,
                        .msg_control    = NULL,
                        .msg_controllen = 0,
                        .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
                };
                mm_segment_t oldmm = get_fs();

                set_fs (KERNEL_DS);
                rc = sock->sk->prot->sendmsg(sock->sk, &msg, fragsize);
                set_fs (oldmm);
        }

        if (rc <= 0)
                return (rc);

        tx->tx_nob -= rc;

        if (rc < fragsize) {
                /* didn't send whole frag */
                iov->iov_base = (void *)(vaddr + rc);
                iov->iov_len = fragsize - rc;
                return (rc);
        }

        /* everything went */
        LASSERT (rc == fragsize);
        tx->tx_iov++;
        tx->tx_niov--;
        return (rc);
}
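/* Illustration only (hypothetical helper, never called): the fix-up rule
 * the senders apply after a partial write.  Because the socket may or may
 * not have updated the iovec, the remaining bytes are recorded explicitly:
 * e.g. 1000 bytes sent of a 4096-byte frag leaves iov_base advanced by
 * 1000 and iov_len == 3096, and the same frag is retried next pass. */
#if 0
static void
example_partial_send_fixup (struct iovec *iov, int nsent)
{
        LASSERT (nsent > 0 && nsent < iov->iov_len);

        iov->iov_base = (void *)((unsigned long)iov->iov_base + nsent);
        iov->iov_len -= nsent;
}
#endif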
int
ksocknal_send_kiov (struct socket *sock, ksock_tx_t *tx, int more)
{
        ptl_kiov_t    *kiov = tx->tx_kiov;
        int            fragsize = kiov->kiov_len;
        struct page   *page = kiov->kiov_page;
        int            offset = kiov->kiov_offset;
        int            rc;

        /* NB we can't trust socket ops to either consume our iovs
         * or leave them alone, so we only send 1 frag at a time. */
        LASSERT (fragsize <= tx->tx_nob);
        LASSERT (offset + fragsize <= PAGE_SIZE);
        LASSERT (tx->tx_nkiov > 0);
        more |= (tx->tx_nkiov > 1);

        if (ksocknal_do_zc &&
            (sock->sk->route_caps & NETIF_F_SG) &&
            (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
            fragsize >= ksocknal_zc_min_frag) {

                CDEBUG(D_NET, "page %p + offset %x for %d\n",
                       page, offset, fragsize);

                rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
                                       more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
                                       &tx->tx_zccd);
        } else {
                char *addr = ((char *)kmap (page)) + offset;
                struct iovec fragiov = {.iov_base = addr,
                                        .iov_len  = fragsize};
                struct msghdr msg = {
                        .msg_name       = NULL,
                        .msg_namelen    = 0,
                        .msg_iov        = &fragiov,
                        .msg_iovlen     = 1,
                        .msg_control    = NULL,
                        .msg_controllen = 0,
                        .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
                };
                mm_segment_t oldmm = get_fs();

                set_fs (KERNEL_DS);
                rc = sock->sk->prot->sendmsg(sock->sk, &msg, fragsize);
                set_fs (oldmm);

                kunmap (page);
        }

        if (rc <= 0)
                return (rc);

        tx->tx_nob -= rc;

        if (rc < fragsize) {
                /* didn't send whole frag */
                kiov->kiov_offset = offset + rc;
                kiov->kiov_len = fragsize - rc;
                return (rc);
        }

        /* everything went */
        LASSERT (rc == fragsize);
        tx->tx_kiov++;
        tx->tx_nkiov--;
        return (rc);
}
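/* Illustration only (hypothetical predicate): the zero-copy gate both
 * senders apply.  All conditions must hold: zc enabled, the device can do
 * scatter/gather, it can checksum in hardware (or needs none), and the
 * frag is big enough that pinning pages beats copying (>=
 * ksocknal_zc_min_frag, 2048 by default).  The iov sender additionally
 * requires ksocknal_kvaddr_to_page() to resolve the fragment's address. */
#if 0
static int
example_zc_eligible (struct sock *sk, int fragnob)
{
        return (ksocknal_do_zc &&
                (sk->route_caps & NETIF_F_SG) != 0 &&
                (sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM |
                                   NETIF_F_HW_CSUM)) != 0 &&
                fragnob >= ksocknal_zc_min_frag);
}
#endif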
int
ksocknal_sendmsg (struct socket *sock, ksock_tx_t *tx, int more)
{
        int    rc;
        int    sent_some = 0;

        LASSERT (!in_interrupt());

        for (;;) {
                if (tx->tx_niov != 0)
                        rc = ksocknal_send_iov (sock, tx, more || tx->tx_nkiov != 0);
                else
                        rc = ksocknal_send_kiov (sock, tx, more);

                /* Interpret a zero rc the same as -EAGAIN (Adaptech TOE) */
                if (rc <= 0)                    /* error or partial send */
                        RETURN ((sent_some || rc == -EAGAIN) ? 0 : rc);

                sent_some = 1;

                if (tx->tx_nob == 0)            /* sent everything */
                        RETURN (0);
        }
}
int
ksocknal_recv_iov (ksock_conn_t *conn)
{
        struct iovec *iov = conn->ksnc_rx_iov;
        int           fragsize = iov->iov_len;
        unsigned long vaddr = (unsigned long)iov->iov_base;
        struct iovec  fragiov = { .iov_base = (void *)vaddr,
                                  .iov_len  = fragsize};
        struct msghdr msg = {
                .msg_name       = NULL,
                .msg_namelen    = 0,
                .msg_iov        = &fragiov,
                .msg_iovlen     = 1,
                .msg_control    = NULL,
                .msg_controllen = 0,
                .msg_flags      = 0
        };
        mm_segment_t oldmm = get_fs();
        int          rc;

        /* NB we can't trust socket ops to either consume our iovs
         * or leave them alone, so we only receive 1 frag at a time. */
        LASSERT (conn->ksnc_rx_niov > 0);
        LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);

        set_fs (KERNEL_DS);
        rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
        /* NB this is just a boolean............................^ */
        set_fs (oldmm);

        if (rc <= 0)
                return (rc);

        conn->ksnc_rx_nob_wanted -= rc;
        conn->ksnc_rx_nob_left -= rc;

        if (rc < fragsize) {
                iov->iov_base = (void *)(vaddr + rc);
                iov->iov_len = fragsize - rc;
                return (rc);
        }

        LASSERT (rc == fragsize);
        conn->ksnc_rx_iov++;
        conn->ksnc_rx_niov--;
        return (rc);
}
int
ksocknal_recv_kiov (ksock_conn_t *conn)
{
        ptl_kiov_t   *kiov = conn->ksnc_rx_kiov;
        struct page  *page = kiov->kiov_page;
        int           offset = kiov->kiov_offset;
        int           fragsize = kiov->kiov_len;
        unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
        struct iovec  fragiov = { .iov_base = (void *)vaddr,
                                  .iov_len  = fragsize};
        struct msghdr msg = {
                .msg_name       = NULL,
                .msg_namelen    = 0,
                .msg_iov        = &fragiov,
                .msg_iovlen     = 1,
                .msg_control    = NULL,
                .msg_controllen = 0,
                .msg_flags      = 0
        };
        mm_segment_t oldmm = get_fs();
        int          rc;

        /* NB we can't trust socket ops to either consume our iovs
         * or leave them alone, so we only receive 1 frag at a time. */
        LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
        LASSERT (conn->ksnc_rx_nkiov > 0);
        LASSERT (offset + fragsize <= PAGE_SIZE);

        set_fs (KERNEL_DS);
        rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
        /* NB this is just a boolean............................^ */
        set_fs (oldmm);

        kunmap (page);

        if (rc <= 0)
                return (rc);

        conn->ksnc_rx_nob_wanted -= rc;
        conn->ksnc_rx_nob_left -= rc;

        if (rc < fragsize) {
                kiov->kiov_offset = offset + rc;
                kiov->kiov_len = fragsize - rc;
                return (rc);
        }

        LASSERT (rc == fragsize);
        conn->ksnc_rx_kiov++;
        conn->ksnc_rx_nkiov--;
        return (rc);
}
int
ksocknal_recvmsg (ksock_conn_t *conn)
{
        int    rc;
        int    got_some = 0;

        LASSERT (!in_interrupt ());

        for (;;) {
                LASSERT (conn->ksnc_rx_nob_wanted > 0);

                if (conn->ksnc_rx_niov != 0)
                        rc = ksocknal_recv_iov (conn);
                else
                        rc = ksocknal_recv_kiov (conn);

                /* CAVEAT EMPTOR: we return...
                 * <= 0 for error (0 == EOF) and > 0 for success (unlike sendmsg()) */

                if (rc <= 0)                    /* error/EOF or partial receive */
                        RETURN ((got_some || rc == -EAGAIN) ? 1 : rc);

                got_some = 1;

                if (conn->ksnc_rx_nob_wanted == 0)
                        RETURN (1);
        }
}
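/* Illustration only (hypothetical helper): how the receive path folds
 * sock_recvmsg() results into its own convention.  Note that rc == 0 means
 * EOF here, not "try again", so it must propagate as an error unless
 * earlier frags already arrived in this call. */
#if 0
static int
example_classify_recv_rc (int rc, int got_some)
{
        if (rc > 0)                     /* moved some bytes */
                return (1);
        /* error or EOF (rc == 0); -EAGAIN just means "no data yet" */
        return ((got_some || rc == -EAGAIN) ? 1 : rc);
}
#endif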
void
ksocknal_zc_callback (zccd_t *zcd)
{
        ksock_tx_t    *tx = KSOCK_ZCCD_2_TX(zcd);
        ksock_sched_t *sched = tx->tx_sched;
        unsigned long  flags;

        /* Schedule tx for cleanup (can't do it now due to lock conflicts) */

        spin_lock_irqsave (&sched->kss_lock, flags);

        list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
        if (waitqueue_active (&sched->kss_waitq))
                wake_up (&sched->kss_waitq);

        spin_unlock_irqrestore (&sched->kss_lock, flags);
}
void
ksocknal_tx_done (ksock_tx_t *tx)
{
        unsigned long flags;
        ksock_ltx_t  *ltx;

        atomic_dec (&ksocknal_packets_being_sent);

        if (tx->tx_isfwd) {             /* was a forwarded packet? */
                kpr_fwd_done (&ksocknal_data.ksnd_router,
                              KSOCK_TX_2_KPR_FWD_DESC (tx), 0);
                return;
        }

        /* local send: tell LIB and put the ltx back on its idle list */
        ltx = KSOCK_TX_2_KSOCK_LTX (tx);

        lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie);

        spin_lock_irqsave (&ksocknal_data.ksnd_idle_ltx_lock, flags);

        list_add_tail (&ltx->ltx_tx.tx_list, ltx->ltx_idle);

        /* normal tx desc => wakeup anyone blocking for one */
        if (ltx->ltx_idle == &ksocknal_data.ksnd_idle_ltx_list &&
            waitqueue_active (&ksocknal_data.ksnd_idle_ltx_waitq))
                wake_up (&ksocknal_data.ksnd_idle_ltx_waitq);

        spin_unlock_irqrestore (&ksocknal_data.ksnd_idle_ltx_lock, flags);
}
void
ksocknal_process_transmit (ksock_sched_t *sched, long *irq_flags)
{
        ksock_conn_t *conn;
        ksock_tx_t   *tx;
        int           rc;

        LASSERT (!list_empty (&sched->kss_tx_conns));
        conn = list_entry(sched->kss_tx_conns.next, ksock_conn_t, ksnc_tx_list);
        list_del (&conn->ksnc_tx_list);

        LASSERT (conn->ksnc_tx_scheduled);
        LASSERT (conn->ksnc_tx_ready);
        LASSERT (!list_empty (&conn->ksnc_tx_queue));
        tx = list_entry (conn->ksnc_tx_queue.next, ksock_tx_t, tx_list);
        /* assume transmit will complete now, so dequeue while I've got lock */
        list_del (&tx->tx_list);

        spin_unlock_irqrestore (&sched->kss_lock, *irq_flags);

        LASSERT (tx->tx_nob > 0);

        conn->ksnc_tx_ready = 0;/* write_space may race with me and set ready */
        mb();                   /* => clear BEFORE trying to write */

        rc = ksocknal_sendmsg (conn->ksnc_sock, tx,
                               !list_empty (&conn->ksnc_tx_queue)); /* more to come? */

        CDEBUG (D_NET, "send(%d) %d\n", tx->tx_nob, rc);

        if (rc != 0) {
#warning FIXME: handle socket errors properly
                CERROR("Error socknal send(%d) %p: %d\n", tx->tx_nob, conn, rc);
                /* kid on for now the whole packet went.
                 * NB when we handle the error better, we'll still need to
                 * block for zccd completion.
                 */
                tx->tx_nob = 0;
        }

        if (tx->tx_nob == 0) {                  /* nothing left to send */
                /* everything went; assume more can go, so prevent write_space locking */
                conn->ksnc_tx_ready = 1;

                ksocknal_put_conn (conn);       /* release packet's ref */
                atomic_inc (&ksocknal_packets_being_sent);

                if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
                        /* zccd skbufs are still in-flight.  Release my
                         * initial ref on zccd, so callback can occur */
                        zccd_put (&tx->tx_zccd);
                } else
                        ksocknal_tx_done (tx);

                spin_lock_irqsave (&sched->kss_lock, *irq_flags);
        } else {
                spin_lock_irqsave (&sched->kss_lock, *irq_flags);

                /* back onto HEAD of tx_queue */
                list_add (&tx->tx_list, &conn->ksnc_tx_queue);
        }

        if (!conn->ksnc_tx_ready ||             /* no space to write now */
            list_empty (&conn->ksnc_tx_queue)) {/* nothing to write */
                conn->ksnc_tx_scheduled = 0;    /* not being scheduled */
                ksocknal_put_conn (conn);       /* release scheduler's ref */
        } else                                  /* let scheduler call me again */
                list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
}
void
ksocknal_launch_packet (ksock_conn_t *conn, ksock_tx_t *tx)
{
        unsigned long  flags;
        ksock_sched_t *sched = conn->ksnc_scheduler;

        /* Ensure the frags we've been given EXACTLY match the number of
         * bytes we want to send.  Many TCP/IP stacks disregard any total
         * size parameters passed to them and just look at the frags.
         *
         * We always expect at least 1 mapped fragment containing the
         * complete portals header. */
        LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) +
                 lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
        LASSERT (tx->tx_niov >= 1);
        LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t));

        CDEBUG (D_NET, "type %d, nob %d niov %d nkiov %d\n",
                ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type, tx->tx_nob,
                tx->tx_niov, tx->tx_nkiov);

        zccd_init (&tx->tx_zccd, ksocknal_zc_callback);
        /* NB this sets 1 ref on zccd, so the callback can only occur
         * after I've released this ref */
        tx->tx_sched = sched;

        spin_lock_irqsave (&sched->kss_lock, flags);

        list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);

        if (conn->ksnc_tx_ready &&              /* able to send */
            !conn->ksnc_tx_scheduled) {         /* not scheduled to send */
                list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
                conn->ksnc_tx_scheduled = 1;
                atomic_inc (&conn->ksnc_refcount); /* extra ref for scheduler */
                if (waitqueue_active (&sched->kss_waitq))
                        wake_up (&sched->kss_waitq);
        }

        spin_unlock_irqrestore (&sched->kss_lock, flags);

        atomic_inc (&ksocknal_packets_launched);
}
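/* Illustration only (hypothetical helper, mirroring what lib_iov_nob()
 * plus lib_kiov_nob() compute): the invariant ksocknal_launch_packet()
 * asserts.  Since many stacks believe the frags rather than any total
 * passed alongside, the frag lengths must sum exactly to tx_nob. */
#if 0
static int
example_tx_frag_total (ksock_tx_t *tx)
{
        int i;
        int nob = 0;

        for (i = 0; i < tx->tx_niov; i++)       /* mapped frags */
                nob += tx->tx_iov[i].iov_len;
        for (i = 0; i < tx->tx_nkiov; i++)      /* page frags */
                nob += tx->tx_kiov[i].kiov_len;

        return (nob);                           /* must equal tx->tx_nob */
}
#endif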
ksock_conn_t *
ksocknal_send_target (ptl_nid_t nid)
{
        ptl_nid_t     gatewaynid;
        ksock_conn_t *conn;
        int           rc;

        if ((conn = ksocknal_get_conn (nid)) == NULL) {
                /* It's not a peer; try to find a gateway */
                rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, &gatewaynid);
                if (rc != 0) {
                        CERROR("Can't route to "LPX64": router error %d\n",
                               nid, rc);
                        return (NULL);
                }

                if ((conn = ksocknal_get_conn (gatewaynid)) == NULL) {
                        CERROR ("Can't route to "LPX64": gateway "LPX64
                                " is not a peer\n", nid, gatewaynid);
                        return (NULL);
                }
        }

        return (conn);
}
ksock_ltx_t *
ksocknal_setup_hdr (nal_cb_t *nal, void *private, lib_msg_t *cookie,
                    ptl_hdr_t *hdr, int type)
{
        ksock_ltx_t *ltx;

        /* I may not block for a transmit descriptor if I might block the
         * receiver, or an interrupt handler. */
        ltx = ksocknal_get_ltx (!(type == PTL_MSG_ACK ||
                                  type == PTL_MSG_REPLY ||
                                  in_interrupt ()));
        if (ltx == NULL) {
                CERROR ("Can't allocate tx desc\n");
                return (NULL);
        }

        /* Init local send packet (storage for hdr, finalize() args) */
        ltx->ltx_hdr = *hdr;
        ltx->ltx_private = private;
        ltx->ltx_cookie = cookie;

        /* Init common ltx_tx */
        ltx->ltx_tx.tx_isfwd = 0;
        ltx->ltx_tx.tx_nob = sizeof (*hdr);

        /* We always have 1 mapped frag for the header */
        ltx->ltx_tx.tx_niov = 1;
        ltx->ltx_tx.tx_iov = &ltx->ltx_iov_space.hdr;
        ltx->ltx_tx.tx_iov[0].iov_base = &ltx->ltx_hdr;
        ltx->ltx_tx.tx_iov[0].iov_len = sizeof (ltx->ltx_hdr);

        ltx->ltx_tx.tx_kiov = NULL;
        ltx->ltx_tx.tx_nkiov = 0;

        return (ltx);
}
int
ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
               ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
               unsigned int payload_niov, struct iovec *payload_iov,
               size_t payload_len)
{
        ksock_ltx_t  *ltx;
        ksock_conn_t *conn;

        /* NB 'private' is different depending on what we're sending.
         * Just ignore it until we can rely on it
         *
         * Also, the return code from this procedure is ignored.
         * If we can't send, we must still complete with lib_finalize().
         * We'll have to wait for 3.2 to return an error event.
         */

        CDEBUG(D_NET,
               "sending "LPSZ" bytes in %d mapped frags to nid: "LPX64
               " pid %d\n", payload_len, payload_niov, nid, pid);

        conn = ksocknal_send_target (nid);
        if (conn == NULL) {
                lib_finalize (&ksocknal_lib, private, cookie);
                return (-1);
        }

        ltx = ksocknal_setup_hdr (nal, private, cookie, hdr, type);
        if (ltx == NULL) {
                ksocknal_put_conn (conn);
                lib_finalize (&ksocknal_lib, private, cookie);
                return (-1);
        }

        /* append the payload_iovs to the one pointing at the header */
        LASSERT (ltx->ltx_tx.tx_niov == 1 && ltx->ltx_tx.tx_nkiov == 0);
        LASSERT (payload_niov <= PTL_MD_MAX_IOV);

        memcpy (ltx->ltx_tx.tx_iov + 1, payload_iov,
                payload_niov * sizeof (*payload_iov));
        ltx->ltx_tx.tx_niov = 1 + payload_niov;
        ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_len;

        ksocknal_launch_packet (conn, &ltx->ltx_tx);
        return (0);
}
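/* Illustration only (worked example with hypothetical values): the iov
 * layout ksocknal_send() builds.  With a 2-frag, 1024-byte payload the
 * descriptor ends up as tx_iov[0] = header (sizeof (ptl_hdr_t) bytes),
 * tx_iov[1..2] = payload frags, tx_niov == 3 and tx_nob ==
 * sizeof (ptl_hdr_t) + 1024 -- one contiguous wire image in three frags. */
#if 0
static void
example_append_payload (ksock_ltx_t *ltx, struct iovec *payload_iov,
                        unsigned int payload_niov, size_t payload_len)
{
        /* slot 0 already points at the header */
        LASSERT (ltx->ltx_tx.tx_niov == 1);

        memcpy (ltx->ltx_tx.tx_iov + 1, payload_iov,
                payload_niov * sizeof (*payload_iov));
        ltx->ltx_tx.tx_niov = 1 + payload_niov;
        ltx->ltx_tx.tx_nob = sizeof (ltx->ltx_hdr) + payload_len;
}
#endif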
int
ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie,
                     ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                     unsigned int payload_niov, ptl_kiov_t *payload_iov, size_t payload_len)
{
        ksock_ltx_t  *ltx;
        ksock_conn_t *conn;

        /* NB 'private' is different depending on what we're sending.
         * Just ignore it until we can rely on it */

        CDEBUG(D_NET,
               "sending "LPSZ" bytes in %d mapped frags to nid: "LPX64" pid %d\n",
               payload_len, payload_niov, nid, pid);

        conn = ksocknal_send_target (nid);
        if (conn == NULL) {
                lib_finalize (&ksocknal_lib, private, cookie);
                return (-1);
        }

        ltx = ksocknal_setup_hdr (nal, private, cookie, hdr, type);
        if (ltx == NULL) {
                ksocknal_put_conn (conn);
                lib_finalize (&ksocknal_lib, private, cookie);
                return (-1);
        }

        /* attach the payload pages after the header frag */
        LASSERT (ltx->ltx_tx.tx_niov == 1 && ltx->ltx_tx.tx_nkiov == 0);
        LASSERT (payload_niov <= PTL_MD_MAX_IOV);

        ltx->ltx_tx.tx_kiov = ltx->ltx_iov_space.payload.kiov;
        memcpy (ltx->ltx_tx.tx_kiov, payload_iov,
                payload_niov * sizeof (*payload_iov));
        ltx->ltx_tx.tx_nkiov = payload_niov;
        ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_len;

        ksocknal_launch_packet (conn, &ltx->ltx_tx);
        return (0);
}
void
ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
{
        ksock_conn_t *conn;
        ptl_nid_t     nid = fwd->kprfd_gateway_nid;
        ksock_tx_t   *tx = (ksock_tx_t *)&fwd->kprfd_scratch;

        CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64")\n", fwd,
                fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);

        /* I'm the gateway; must be the last hop */
        if (nid == ksocknal_lib.ni.nid)
                nid = fwd->kprfd_target_nid;

        conn = ksocknal_get_conn (nid);
        if (conn == NULL) {
                CERROR ("[%p] fwd to "LPX64" isn't a peer\n", fwd, nid);
                kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, -EHOSTUNREACH);
                return;
        }

        /* This forward has now got a ref on conn */

        tx->tx_isfwd = 1;                   /* This is a forwarding packet */
        tx->tx_nob = fwd->kprfd_nob;
        tx->tx_niov = fwd->kprfd_niov;
        tx->tx_iov = fwd->kprfd_iov;
        tx->tx_nkiov = 0;
        tx->tx_kiov = NULL;

        ksocknal_launch_packet (conn, tx);
}
int
ksocknal_thread_start (int (*fn)(void *arg), void *arg)
{
        long    pid = kernel_thread (fn, arg, 0);

        if (pid < 0)
                return ((int)pid);

        atomic_inc (&ksocknal_data.ksnd_nthreads);
        return (0);
}
void
ksocknal_thread_fini (void)
{
        atomic_dec (&ksocknal_data.ksnd_nthreads);
}
void
ksocknal_fmb_callback (void *arg, int error)
{
        ksock_fmb_t      *fmb = (ksock_fmb_t *)arg;
        ksock_fmb_pool_t *fmp = fmb->fmb_pool;
        ptl_hdr_t        *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]);
        ksock_conn_t     *conn = NULL;
        ksock_sched_t    *sched;
        unsigned long     flags;

        if (error != 0)
                CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
                       NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),
                       error);
        else
                CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
                        NTOH__u64 (hdr->src_nid), NTOH__u64 (hdr->dest_nid));

        spin_lock_irqsave (&fmp->fmp_lock, flags);

        list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);

        if (!list_empty (&fmp->fmp_blocked_conns)) {
                conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next,
                                   ksock_conn_t, ksnc_rx_list);
                list_del (&conn->ksnc_rx_list);
        }

        spin_unlock_irqrestore (&fmp->fmp_lock, flags);

        if (conn == NULL)
                return;

        CDEBUG (D_NET, "Scheduling conn %p\n", conn);
        LASSERT (conn->ksnc_rx_scheduled);
        LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);

        conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;

        sched = conn->ksnc_scheduler;

        spin_lock_irqsave (&sched->kss_lock, flags);

        list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);

        if (waitqueue_active (&sched->kss_waitq))
                wake_up (&sched->kss_waitq);

        spin_unlock_irqrestore (&sched->kss_lock, flags);
}
ksock_fmb_t *
ksocknal_get_idle_fmb (ksock_conn_t *conn)
{
        int               payload_nob = conn->ksnc_rx_nob_left;
        int               packet_nob = sizeof (ptl_hdr_t) + payload_nob;
        unsigned long     flags;
        ksock_fmb_pool_t *pool;
        ksock_fmb_t      *fmb;

        LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
        LASSERT (ksocknal_data.ksnd_fmbs != NULL);

        if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
                pool = &ksocknal_data.ksnd_small_fmp;
        else
                pool = &ksocknal_data.ksnd_large_fmp;

        spin_lock_irqsave (&pool->fmp_lock, flags);

        if (!list_empty (&pool->fmp_idle_fmbs)) {
                fmb = list_entry(pool->fmp_idle_fmbs.next,
                                 ksock_fmb_t, fmb_list);
                list_del (&fmb->fmb_list);
                spin_unlock_irqrestore (&pool->fmp_lock, flags);

                return (fmb);
        }

        /* deschedule until fmb free */

        conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;

        list_add_tail (&conn->ksnc_rx_list,
                       &pool->fmp_blocked_conns);

        spin_unlock_irqrestore (&pool->fmp_lock, flags);
        return (NULL);
}
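/* Illustration only, with assumed constants (the real values live in
 * socknal.h): if SOCKNAL_SMALL_FWD_PAGES were 1 and PAGE_SIZE 4096, a
 * packet whose header plus payload totals at most 4096 bytes would draw
 * from the small pool, and anything larger from the large pool. */
#if 0
static ksock_fmb_pool_t *
example_choose_fmp (int packet_nob)
{
        if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
                return (&ksocknal_data.ksnd_small_fmp);
        return (&ksocknal_data.ksnd_large_fmp);
}
#endif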
int
ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
{
        int        payload_nob = conn->ksnc_rx_nob_left;
        int        packet_nob = sizeof (ptl_hdr_t) + payload_nob;
        ptl_nid_t  dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
        int        niov;                        /* at least the header */
        int        nob;

        LASSERT (conn->ksnc_rx_scheduled);
        LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
        LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
        LASSERT (payload_nob >= 0);
        LASSERT (packet_nob <= fmb->fmb_npages * PAGE_SIZE);
        LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);

        /* Got a forwarding buffer; copy the header we just read into the
         * forwarding buffer.  If there's payload, start reading it
         * into the buffer, otherwise the forwarding buffer can be kicked
         * off immediately.
         *
         * NB fmb->fmb_iov spans the WHOLE packet.
         *    conn->ksnc_rx_iov spans just the payload.
         */
        fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]);

        /* copy header */
        memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t));

        if (payload_nob == 0) {         /* got complete packet already */
                atomic_inc (&ksocknal_packets_received);

                CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n",
                        conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
                        dest_nid, packet_nob);

                fmb->fmb_iov[0].iov_len = sizeof (ptl_hdr_t);

                kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
                              packet_nob, 1, fmb->fmb_iov,
                              ksocknal_fmb_callback, fmb);

                /* forward it now */
                kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);

                ksocknal_new_packet (conn, 0);  /* on to next packet */
                return (1);
        }

        niov = 1;
        if (packet_nob <= PAGE_SIZE) {  /* whole packet fits in first page */
                fmb->fmb_iov[0].iov_len = packet_nob;
        } else {
                fmb->fmb_iov[0].iov_len = PAGE_SIZE;
                nob = packet_nob - PAGE_SIZE;

                do {
                        LASSERT (niov < fmb->fmb_npages);
                        fmb->fmb_iov[niov].iov_base =
                                page_address (fmb->fmb_pages[niov]);
                        fmb->fmb_iov[niov].iov_len = MIN (PAGE_SIZE, nob);
                        nob -= PAGE_SIZE;
                        niov++;
                } while (nob > 0);
        }

        kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
                      packet_nob, niov, fmb->fmb_iov,
                      ksocknal_fmb_callback, fmb);

        /* stash router's descriptor ready for call to kpr_fwd_start */
        conn->ksnc_cookie = &fmb->fmb_fwd;

        conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */

        /* payload is desc's iov-ed buffer, but skipping the hdr */
        LASSERT (niov <= sizeof (conn->ksnc_rx_iov_space) /
                 sizeof (struct iovec));

        conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
        conn->ksnc_rx_iov[0].iov_base =
                (void *)(((unsigned long)fmb->fmb_iov[0].iov_base) +
                         sizeof (ptl_hdr_t));
        conn->ksnc_rx_iov[0].iov_len =
                fmb->fmb_iov[0].iov_len - sizeof (ptl_hdr_t);

        if (niov > 1)
                memcpy(&conn->ksnc_rx_iov[1], &fmb->fmb_iov[1],
                       (niov - 1) * sizeof (struct iovec));

        conn->ksnc_rx_niov = niov;

        CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
                NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
        return (0);
}
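/* Illustration only (hypothetical helper): how a packet_nob-byte buffer is
 * described as page-sized iov entries, e.g. 9000 bytes with PAGE_SIZE 4096
 * becomes frags of 4096, 4096 and 808 spanning three pages. */
#if 0
static int
example_span_pages (struct iovec *iov, struct page **pages, int nob)
{
        int niov = 0;

        do {
                iov[niov].iov_base = page_address (pages[niov]);
                iov[niov].iov_len = MIN ((int)PAGE_SIZE, nob);
                nob -= PAGE_SIZE;
                niov++;
        } while (nob > 0);

        return (niov);
}
#endif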
void
ksocknal_fwd_parse (ksock_conn_t *conn)
{
        ksock_conn_t *conn2;
        ptl_nid_t     dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
        int           body_len = NTOH__u32 (PTL_HDR_LENGTH(&conn->ksnc_hdr));

        CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
                NTOH__u64 (conn->ksnc_hdr.src_nid),
                dest_nid, conn->ksnc_rx_nob_left);

        LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
        LASSERT (conn->ksnc_rx_scheduled);

        if (body_len < 0) {             /* length corrupt (overflow) */
                CERROR("dropping packet from "LPX64" for "LPX64": packet "
                       "size %d illegal\n", NTOH__u64 (conn->ksnc_hdr.src_nid),
                       dest_nid, body_len);
                ksocknal_new_packet (conn, 0);  /* on to new packet */
                return;
        }

        if (ksocknal_data.ksnd_fmbs == NULL) {  /* not forwarding */
                CERROR("dropping packet from "LPX64" for "LPX64": not "
                       "forwarding\n", conn->ksnc_hdr.src_nid,
                       conn->ksnc_hdr.dest_nid);
                /* on to new packet (skip this one's body) */
                ksocknal_new_packet (conn, body_len);
                return;
        }

        if (body_len > SOCKNAL_MAX_FWD_PAYLOAD) { /* too big to forward */
                CERROR ("dropping packet from "LPX64" for "LPX64
                        ": packet size %d too big\n", conn->ksnc_hdr.src_nid,
                        conn->ksnc_hdr.dest_nid, body_len);
                /* on to new packet (skip this one's body) */
                ksocknal_new_packet (conn, body_len);
                return;
        }

        /* should have gone direct */
        conn2 = ksocknal_get_conn (conn->ksnc_hdr.dest_nid);
        if (conn2 != NULL) {
                CERROR ("dropping packet from "LPX64" for "LPX64
                        ": target is a peer\n", conn->ksnc_hdr.src_nid,
                        conn->ksnc_hdr.dest_nid);
                ksocknal_put_conn (conn2);      /* drop ref from get above */

                /* on to next packet (skip this one's body) */
                ksocknal_new_packet (conn, body_len);
                return;
        }

        conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;       /* Getting FMB now */
        conn->ksnc_rx_nob_left = body_len;              /* stash packet size */
        conn->ksnc_rx_nob_wanted = body_len;            /* (no slop) */
}
int
ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
{
        static char ksocknal_slop_buffer[4096];

        int   nob;
        int   niov;
        int   skipped;

        if (nob_to_skip == 0) {         /* right at next packet boundary now */
                conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
                conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
                conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);

                conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
                conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
                conn->ksnc_rx_iov[0].iov_len  = sizeof (ptl_hdr_t);
                conn->ksnc_rx_niov = 1;

                conn->ksnc_rx_kiov = NULL;
                conn->ksnc_rx_nkiov = 0;
                return (1);
        }

        /* Set up to skip as much as possible now.  If there's more left
         * (ran out of iov entries) we'll get called again */

        conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
        conn->ksnc_rx_nob_left = nob_to_skip;
        conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
        skipped = 0;
        niov = 0;

        do {
                nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));

                conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
                conn->ksnc_rx_iov[niov].iov_len  = nob;
                niov++;
                skipped += nob;
                nob_to_skip -= nob;

        } while (nob_to_skip != 0 &&            /* mustn't overflow conn's rx iov */
                 niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));

        conn->ksnc_rx_niov = niov;
        conn->ksnc_rx_kiov = NULL;
        conn->ksnc_rx_nkiov = 0;
        conn->ksnc_rx_nob_wanted = skipped;
        return (0);
}
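/* Illustration only (worked numbers, hypothetical helper): skipping slop
 * re-uses one static 4096-byte bounce buffer by aiming several iov entries
 * at it, so skipping 10000 bytes needs iovs of 4096, 4096 and 1808; any
 * leftover beyond the iov space is skipped on a later call. */
#if 0
static int
example_slop_iovs (int nob_to_skip)
{
        int slop = 4096;        /* sizeof (ksocknal_slop_buffer) above */

        /* iov entries needed: 10000 bytes -> 3 (4096 + 4096 + 1808) */
        return ((nob_to_skip + slop - 1) / slop);
}
#endif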
void
ksocknal_process_receive (ksock_sched_t *sched, long *irq_flags)
{
        ksock_conn_t *conn;
        ksock_fmb_t  *fmb;
        int           rc;

        /* NB: sched->kss_lock held */

        LASSERT (!list_empty (&sched->kss_rx_conns));
        conn = list_entry(sched->kss_rx_conns.next, ksock_conn_t, ksnc_rx_list);
        list_del (&conn->ksnc_rx_list);

        spin_unlock_irqrestore (&sched->kss_lock, *irq_flags);

        CDEBUG(D_NET, "sched %p conn %p\n", sched, conn);
        LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
        LASSERT (conn->ksnc_rx_scheduled);
        LASSERT (conn->ksnc_rx_ready);

        /* doesn't need a forwarding buffer */
        if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
                goto try_read;

 get_fmb:
        fmb = ksocknal_get_idle_fmb (conn);
        if (fmb == NULL) {      /* conn descheduled waiting for idle fmb */
                spin_lock_irqsave (&sched->kss_lock, *irq_flags);
                return;
        }

        if (ksocknal_init_fmb (conn, fmb)) /* packet forwarded ? */
                goto out;               /* come back later for next packet */

 try_read:
        /* NB: sched lock NOT held */
        LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
                 conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
                 conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
                 conn->ksnc_rx_state == SOCKNAL_RX_SLOP);

        LASSERT (conn->ksnc_rx_nob_wanted > 0);

        conn->ksnc_rx_ready = 0;/* data ready may race with me and set ready */
        mb();                   /* => clear BEFORE trying to read */

        rc = ksocknal_recvmsg(conn);

        if (rc <= 0) {
#warning FIXME: handle socket errors properly
                CERROR ("Error socknal read %p: %d\n", conn, rc);
                goto out;
        }

        if (conn->ksnc_rx_nob_wanted != 0)      /* short read */
                goto out;                       /* try again later */

        /* got all I wanted, assume there's more - prevent data_ready locking */
        conn->ksnc_rx_ready = 1;

        switch (conn->ksnc_rx_state) {
        case SOCKNAL_RX_HEADER:
                /* It's not for me */
                if (conn->ksnc_hdr.type != PTL_MSG_HELLO &&
                    NTOH__u64(conn->ksnc_hdr.dest_nid) != ksocknal_lib.ni.nid) {
                        ksocknal_fwd_parse (conn);
                        switch (conn->ksnc_rx_state) {
                        case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
                                goto out;       /* => come back later */
                        case SOCKNAL_RX_SLOP:   /* skipping packet's body */
                                goto try_read;  /* => go read it */
                        case SOCKNAL_RX_GET_FMB: /* forwarding */
                                goto get_fmb;   /* => go get a fwd msg buffer */
                        default:
                                LBUG ();
                        }
                        /* Not Reached */
                }

                PROF_START(lib_parse);
                /* sets wanted_len, iovs etc */
                lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
                PROF_FINISH(lib_parse);

                if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
                        conn->ksnc_rx_state = SOCKNAL_RX_BODY;
                        goto try_read;          /* go read the payload */
                }
                /* Fall through (completed packet for me) */

        case SOCKNAL_RX_BODY:
                atomic_inc (&ksocknal_packets_received);
                /* packet is done now */
                lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie);
                /* Fall through */

        case SOCKNAL_RX_SLOP:
                /* starting new packet? */
                if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
                        goto out;               /* come back later */
                goto try_read;                  /* try to finish reading slop now */

        case SOCKNAL_RX_BODY_FWD:
                CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n",
                        conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
                        NTOH__u64 (conn->ksnc_hdr.dest_nid),
                        conn->ksnc_rx_nob_left);

                atomic_inc (&ksocknal_packets_received);

                /* ksocknal_init_fmb() put router desc. in conn->ksnc_cookie */
                kpr_fwd_start (&ksocknal_data.ksnd_router,
                               (kpr_fwd_desc_t *)conn->ksnc_cookie);

                /* no slop in forwarded packets */
                LASSERT (conn->ksnc_rx_nob_left == 0);

                ksocknal_new_packet (conn, 0);  /* on to next packet */
                goto out;                       /* (later) */

        default:
                LBUG ();
        }

 out:
        spin_lock_irqsave (&sched->kss_lock, *irq_flags);

        /* no data there to read? */
        if (!conn->ksnc_rx_ready) {
                /* let socket callback schedule again */
                conn->ksnc_rx_scheduled = 0;
                ksocknal_put_conn (conn);       /* release scheduler's ref */
        } else                                  /* let scheduler call me again */
                list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
}
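/* Illustration only (hypothetical pair of helpers): the lock-free
 * handshake between the scheduler and ksocknal_data_ready().  The
 * scheduler clears ksnc_rx_ready with a barrier BEFORE reading; if data
 * arrives during the read, the callback re-sets the flag, so the final
 * check under kss_lock chooses between requeueing and descheduling and no
 * wakeup is lost. */
#if 0
static void
example_scheduler_side (ksock_conn_t *conn)
{
        conn->ksnc_rx_ready = 0;        /* expect to drain the socket */
        mb();                           /* clear BEFORE reading */
        /* ... read from the socket here ... */
}

static void
example_callback_side (ksock_conn_t *conn)
{
        conn->ksnc_rx_ready = 1;        /* survives the scheduler's read */
        /* then requeue conn if !ksnc_rx_scheduled, under kss_lock */
}
#endif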
int
ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
               unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
{
        ksock_conn_t *conn = (ksock_conn_t *)private;

        LASSERT (mlen <= rlen);
        LASSERT (niov <= PTL_MD_MAX_IOV);

        conn->ksnc_cookie = msg;
        conn->ksnc_rx_nob_wanted = mlen;
        conn->ksnc_rx_nob_left = rlen;

        conn->ksnc_rx_nkiov = 0;
        conn->ksnc_rx_kiov = NULL;
        conn->ksnc_rx_niov = niov;
        conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
        memcpy (conn->ksnc_rx_iov, iov, niov * sizeof (*iov));

        LASSERT (mlen ==
                 lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
                 lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));

        return (rlen);
}
int
ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
                     unsigned int niov, ptl_kiov_t *kiov, size_t mlen, size_t rlen)
{
        ksock_conn_t *conn = (ksock_conn_t *)private;

        LASSERT (mlen <= rlen);
        LASSERT (niov <= PTL_MD_MAX_IOV);

        conn->ksnc_cookie = msg;
        conn->ksnc_rx_nob_wanted = mlen;
        conn->ksnc_rx_nob_left = rlen;

        conn->ksnc_rx_niov = 0;
        conn->ksnc_rx_iov = NULL;
        conn->ksnc_rx_nkiov = niov;
        conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
        memcpy (conn->ksnc_rx_kiov, kiov, niov * sizeof (*kiov));

        LASSERT (mlen ==
                 lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
                 lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));

        return (rlen);
}
int ksocknal_scheduler (void *arg)
{
        ksock_sched_t *sched = (ksock_sched_t *)arg;
        unsigned long  flags;
        int            rc;
        int            nloops = 0;
        int            id = sched - ksocknal_data.ksnd_schedulers;
        char           name[16];

#if (CONFIG_SMP && CPU_AFFINITY)
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
        int cpu = cpu_logical_map(id % num_online_cpus());
#else
#warning "Take care of architecture specific logical APIC map"
        int cpu = 1;    /* Have to change later. */
#endif /* LINUX_VERSION_CODE */

        set_cpus_allowed (current, 1 << cpu);
#endif /* CONFIG_SMP && CPU_AFFINITY */

        snprintf (name, sizeof (name),"ksocknald[%d]", id);
        kportal_daemonize (name);
        kportal_blockallsigs ();

        spin_lock_irqsave (&sched->kss_lock, flags);

        while (!ksocknal_data.ksnd_shuttingdown) {
                int did_something = 0;

                /* Ensure I progress everything semi-fairly */

                if (!list_empty (&sched->kss_rx_conns)) {
                        did_something = 1;
                        /* drops & regains kss_lock */
                        ksocknal_process_receive (sched, &flags);
                }

                if (!list_empty (&sched->kss_tx_conns)) {
                        did_something = 1;
                        /* drops and regains kss_lock */
                        ksocknal_process_transmit (sched, &flags);
                }

                if (!list_empty (&sched->kss_zctxdone_list)) {
                        ksock_tx_t *tx =
                                list_entry(sched->kss_zctxdone_list.next,
                                           ksock_tx_t, tx_list);
                        did_something = 1;

                        list_del (&tx->tx_list);
                        spin_unlock_irqrestore (&sched->kss_lock, flags);

                        ksocknal_tx_done (tx);

                        spin_lock_irqsave (&sched->kss_lock, flags);
                }

                if (!did_something ||           /* nothing to do */
                    ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
                        spin_unlock_irqrestore (&sched->kss_lock, flags);

                        nloops = 0;

                        if (!did_something) {   /* wait for something to do */
#if SOCKNAL_ZC
                                rc = wait_event_interruptible (sched->kss_waitq,
                                                               ksocknal_data.ksnd_shuttingdown ||
                                                               !list_empty(&sched->kss_rx_conns) ||
                                                               !list_empty(&sched->kss_tx_conns) ||
                                                               !list_empty(&sched->kss_zctxdone_list));
#else
                                rc = wait_event_interruptible (sched->kss_waitq,
                                                               ksocknal_data.ksnd_shuttingdown ||
                                                               !list_empty(&sched->kss_rx_conns) ||
                                                               !list_empty(&sched->kss_tx_conns));
#endif
                                LASSERT (rc == 0);
                        } else
                                our_cond_resched();

                        spin_lock_irqsave (&sched->kss_lock, flags);
                }
        }

        spin_unlock_irqrestore (&sched->kss_lock, flags);
        ksocknal_thread_fini ();
        return (0);
}
void
ksocknal_data_ready (struct sock *sk, int n)
{
        unsigned long  flags;
        ksock_conn_t  *conn;
        ksock_sched_t *sched;

        /* interleave correctly with closing sockets... */
        read_lock (&ksocknal_data.ksnd_socklist_lock);

        conn = sk->user_data;
        if (conn == NULL) {             /* raced with ksocknal_close_sock */
                LASSERT (sk->data_ready != &ksocknal_data_ready);
                sk->data_ready (sk, n);
        } else if (!conn->ksnc_rx_ready) {      /* new news */
                /* Set ASAP in case of concurrent calls to me */
                conn->ksnc_rx_ready = 1;

                sched = conn->ksnc_scheduler;

                spin_lock_irqsave (&sched->kss_lock, flags);

                /* Set again (process_receive may have
                   cleared while I blocked for the lock) */
                conn->ksnc_rx_ready = 1;

                if (!conn->ksnc_rx_scheduled) { /* not being progressed */
                        list_add_tail(&conn->ksnc_rx_list,
                                      &sched->kss_rx_conns);
                        conn->ksnc_rx_scheduled = 1;
                        /* extra ref for scheduler */
                        atomic_inc (&conn->ksnc_refcount);

                        if (waitqueue_active (&sched->kss_waitq))
                                wake_up (&sched->kss_waitq);
                }

                spin_unlock_irqrestore (&sched->kss_lock, flags);
        }

        read_unlock (&ksocknal_data.ksnd_socklist_lock);
}
void
ksocknal_write_space (struct sock *sk)
{
        unsigned long  flags;
        ksock_conn_t  *conn;
        ksock_sched_t *sched;

        /* interleave correctly with closing sockets... */
        read_lock (&ksocknal_data.ksnd_socklist_lock);

        conn = sk->user_data;

        CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
               sk, tcp_wspace(sk), SOCKNAL_TX_LOW_WATER(sk), conn,
               (conn == NULL) ? "" : (test_bit (0, &conn->ksnc_tx_ready) ?
                                      " ready" : " blocked"),
               (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
                                      " scheduled" : " idle"),
               (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ?
                                      " empty" : " queued"));

        if (conn == NULL) {             /* raced with ksocknal_close_sock */
                LASSERT (sk->write_space != &ksocknal_write_space);
                sk->write_space (sk);
        } else if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
                clear_bit (SOCK_NOSPACE, &sk->socket->flags);

                if (!conn->ksnc_tx_ready) {     /* new news */
                        /* Set ASAP in case of concurrent calls to me */
                        conn->ksnc_tx_ready = 1;

                        sched = conn->ksnc_scheduler;

                        spin_lock_irqsave (&sched->kss_lock, flags);

                        /* Set again (process_transmit may have
                           cleared while I blocked for the lock) */
                        conn->ksnc_tx_ready = 1;

                        if (!conn->ksnc_tx_scheduled &&   /* not being progressed */
                            !list_empty(&conn->ksnc_tx_queue)){ /* packets to send */
                                list_add_tail (&conn->ksnc_tx_list,
                                               &sched->kss_tx_conns);
                                conn->ksnc_tx_scheduled = 1;
                                /* extra ref for scheduler */
                                atomic_inc (&conn->ksnc_refcount);

                                if (waitqueue_active (&sched->kss_waitq))
                                        wake_up (&sched->kss_waitq);
                        }

                        spin_unlock_irqrestore (&sched->kss_lock, flags);
                }
        }

        read_unlock (&ksocknal_data.ksnd_socklist_lock);
}
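/* Illustration only (hypothetical predicate): the wakeup gate applied
 * above.  write_space fires on every ACK, so a connection is only
 * rescheduled once free socket buffer space crosses the low-water mark
 * and something is actually queued to send. */
#if 0
static int
example_should_wake_writer (struct sock *sk, ksock_conn_t *conn)
{
        return (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk) &&
                !conn->ksnc_tx_scheduled &&
                !list_empty (&conn->ksnc_tx_queue));
}
#endif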
int
ksocknal_reaper (void *arg)
{
        unsigned long  flags;
        ksock_conn_t  *conn;
        int            rc;

        kportal_daemonize ("ksocknal_reaper");
        kportal_blockallsigs ();

        while (!ksocknal_data.ksnd_shuttingdown) {
                spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);

                if (list_empty (&ksocknal_data.ksnd_reaper_list)) {
                        conn = NULL;
                } else {
                        conn = list_entry (ksocknal_data.ksnd_reaper_list.next,
                                           ksock_conn_t, ksnc_list);
                        list_del (&conn->ksnc_list);
                }

                spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);

                if (conn != NULL)
                        ksocknal_close_conn (conn);
                else {
                        rc = wait_event_interruptible (ksocknal_data.ksnd_reaper_waitq,
                                                       ksocknal_data.ksnd_shuttingdown ||
                                                       !list_empty(&ksocknal_data.ksnd_reaper_list));
                        LASSERT (rc == 0);
                }
        }

        ksocknal_thread_fini ();
        return (0);
}
nal_cb_t ksocknal_lib = {
        nal_data:       &ksocknal_data,         /* NAL private data */
        cb_send:        ksocknal_send,
        cb_send_pages:  ksocknal_send_pages,
        cb_recv:        ksocknal_recv,
        cb_recv_pages:  ksocknal_recv_pages,
        cb_read:        ksocknal_read,
        cb_write:       ksocknal_write,
        cb_callback:    ksocknal_callback,
        cb_malloc:      ksocknal_malloc,
        cb_free:        ksocknal_free,
        cb_printf:      ksocknal_printf,
        cb_cli:         ksocknal_cli,
        cb_sti:         ksocknal_sti,
        cb_dist:        ksocknal_dist
};