Whamcloud - gitweb
Land b_smallfix onto HEAD (20040414_1359)
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_cb.c
index 0678d41..21e0abe 100644 (file)
@@ -24,6 +24,9 @@
  */
 
 #include "socknal.h"
+#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
+# include <linux/syscalls.h>
+#endif
 
 /*
  *  LIB functions follow
@@ -90,6 +93,8 @@ ksocknal_cli(nal_cb_t *nal, unsigned long *flags)
 {
         ksock_nal_data_t *data = nal->nal_data;
 
+        /* OK to ignore 'flags'; we're only ever serialise threads and
+         * never need to lock out interrupts */
         spin_lock(&data->ksnd_nal_cb_lock);
 }
 
@@ -99,9 +104,23 @@ ksocknal_sti(nal_cb_t *nal, unsigned long *flags)
         ksock_nal_data_t *data;
         data = nal->nal_data;
 
+        /* OK to ignore 'flags'; we're only ever serialise threads and
+         * never need to lock out interrupts */
         spin_unlock(&data->ksnd_nal_cb_lock);
 }
 
+void
+ksocknal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev)
+{
+        /* holding ksnd_nal_cb_lock */
+
+        if (eq->event_callback != NULL)
+                eq->event_callback(ev);
+        
+        if (waitqueue_active(&ksocknal_data.ksnd_yield_waitq))
+                wake_up_all(&ksocknal_data.ksnd_yield_waitq);
+}
+
 int
 ksocknal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
 {
@@ -123,7 +142,7 @@ ksocknal_free_ltx (ksock_ltx_t *ltx)
         PORTAL_FREE(ltx, ltx->ltx_desc_size);
 }
 
-#if SOCKNAL_ZC
+#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
 struct page *
 ksocknal_kvaddr_to_page (unsigned long vaddr)
 {
@@ -159,7 +178,7 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
         int            more = (tx->tx_niov > 1) || 
                               (tx->tx_nkiov > 0) ||
                               (!list_empty (&conn->ksnc_tx_queue));
-#if SOCKNAL_ZC
+#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
         int            offset = vaddr & (PAGE_SIZE - 1);
         int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
         struct page   *page;
@@ -171,7 +190,7 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
         LASSERT (fragsize <= tx->tx_resid);
         LASSERT (tx->tx_niov > 0);
         
-#if SOCKNAL_ZC
+#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
         if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
             (sock->sk->route_caps & NETIF_F_SG) &&
             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
@@ -246,7 +265,7 @@ ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
         LASSERT (tx->tx_nkiov > 0);
 
 #if SOCKNAL_ZC
-        if (fragsize >= ksocknal_data.ksnd_zc_min_frag &&
+        if (fragsize >= ksocknal_tunables.ksnd_zc_min_frag &&
             (sock->sk->route_caps & NETIF_F_SG) &&
             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
 
@@ -365,7 +384,7 @@ ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
                  * is set.  Instead, we presume peer death has occurred if
                  * the socket doesn't drain within a timout */
                 conn->ksnc_tx_deadline = jiffies + 
-                                         ksocknal_data.ksnd_io_timeout * HZ;
+                                         ksocknal_tunables.ksnd_io_timeout * HZ;
                 conn->ksnc_peer->ksnp_last_alive = jiffies;
 
         } while (tx->tx_resid != 0);
@@ -428,7 +447,7 @@ ksocknal_recv_iov (ksock_conn_t *conn)
         /* received something... */
         conn->ksnc_peer->ksnp_last_alive = jiffies;
         conn->ksnc_rx_deadline = jiffies + 
-                                 ksocknal_data.ksnd_io_timeout * HZ;
+                                 ksocknal_tunables.ksnd_io_timeout * HZ;
         mb();                           /* order with setting rx_started */
         conn->ksnc_rx_started = 1;
 
@@ -487,7 +506,7 @@ ksocknal_recv_kiov (ksock_conn_t *conn)
         /* received something... */
         conn->ksnc_peer->ksnp_last_alive = jiffies;
         conn->ksnc_rx_deadline = jiffies + 
-                                 ksocknal_data.ksnd_io_timeout * HZ;
+                                 ksocknal_tunables.ksnd_io_timeout * HZ;
         mb();                           /* order with setting rx_started */
         conn->ksnc_rx_started = 1;
 
@@ -546,7 +565,7 @@ ksocknal_receive (ksock_conn_t *conn)
 
                 if (conn->ksnc_rx_nob_wanted == 0) {
                         /* Completed a message segment (header or payload) */
-                        if ((ksocknal_data.ksnd_eager_ack & conn->ksnc_type) != 0 &&
+                        if ((ksocknal_tunables.ksnd_eager_ack & conn->ksnc_type) != 0 &&
                             (conn->ksnc_rx_state ==  SOCKNAL_RX_BODY ||
                              conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD)) {
                                 /* Remind the socket to ack eagerly... */
@@ -707,7 +726,7 @@ ksocknal_launch_autoconnect_locked (ksock_route_t *route)
         LASSERT ((route->ksnr_connected & KSNR_TYPED_ROUTES) != KSNR_TYPED_ROUTES);
         LASSERT (!route->ksnr_connecting);
         
-        if (ksocknal_data.ksnd_typed_conns)
+        if (ksocknal_tunables.ksnd_typed_conns)
                 route->ksnr_connecting = 
                         KSNR_TYPED_ROUTES & ~route->ksnr_connected;
         else
@@ -760,18 +779,19 @@ ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
 }
 
 ksock_conn_t *
-ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer) 
+ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer)
 {
         struct list_head *tmp;
         ksock_conn_t     *typed = NULL;
         int               tnob  = 0;
         ksock_conn_t     *fallback = NULL;
         int               fnob     = 0;
-        
+
         /* Find the conn with the shortest tx queue */
         list_for_each (tmp, &peer->ksnp_conns) {
                 ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
-                int           nob = atomic_read(&c->ksnc_tx_nob);
+                int           nob = atomic_read(&c->ksnc_tx_nob) +
+                                        c->ksnc_sock->sk->sk_wmem_queued;
 
                 LASSERT (!c->ksnc_closing);
 
@@ -780,7 +800,7 @@ ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer)
                         fnob     = nob;
                 }
 
-                if (!ksocknal_data.ksnd_typed_conns)
+                if (!ksocknal_tunables.ksnd_typed_conns)
                         continue;
 
                 switch (c->ksnc_type) {
@@ -791,11 +811,11 @@ ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer)
                 case SOCKNAL_CONN_BULK_IN:
                         continue;
                 case SOCKNAL_CONN_BULK_OUT:
-                        if (tx->tx_nob < ksocknal_data.ksnd_min_bulk)
+                        if (tx->tx_nob < ksocknal_tunables.ksnd_min_bulk)
                                 continue;
                         break;
                 case SOCKNAL_CONN_CONTROL:
-                        if (tx->tx_nob >= ksocknal_data.ksnd_min_bulk)
+                        if (tx->tx_nob >= ksocknal_tunables.ksnd_min_bulk)
                                 continue;
                         break;
                 }
@@ -839,7 +859,7 @@ ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
         spin_lock_irqsave (&sched->kss_lock, flags);
 
         conn->ksnc_tx_deadline = jiffies + 
-                                 ksocknal_data.ksnd_io_timeout * HZ;
+                                 ksocknal_tunables.ksnd_io_timeout * HZ;
         mb();                                   /* order with list_add_tail */
 
         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
@@ -1057,7 +1077,7 @@ ksocknal_sendmsg(nal_cb_t     *nal,
         if (ltx == NULL) {
                 CERROR("Can't allocate tx desc type %d size %d %s\n",
                        type, desc_size, in_interrupt() ? "(intr)" : "");
-                return (PTL_NOSPACE);
+                return (PTL_NO_SPACE);
         }
 
         atomic_inc(&ksocknal_data.ksnd_nactive_ltxs);
@@ -1132,7 +1152,7 @@ void
 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 {
         ptl_nid_t     nid = fwd->kprfd_gateway_nid;
-        ksock_tx_t   *tx  = (ksock_tx_t *)&fwd->kprfd_scratch;
+        ksock_ftx_t  *ftx = (ksock_ftx_t *)&fwd->kprfd_scratch;
         int           rc;
         
         CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
@@ -1142,14 +1162,18 @@ ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
         if (nid == ksocknal_lib.ni.nid)
                 nid = fwd->kprfd_target_nid;
 
-        tx->tx_isfwd = 1;                   /* This is a forwarding packet */
-        tx->tx_nob   = fwd->kprfd_nob;
-        tx->tx_niov  = fwd->kprfd_niov;
-        tx->tx_iov   = fwd->kprfd_iov;
-        tx->tx_nkiov = 0;
-        tx->tx_kiov  = NULL;
+        /* setup iov for hdr */
+        ftx->ftx_iov.iov_base = fwd->kprfd_hdr;
+        ftx->ftx_iov.iov_len = sizeof(ptl_hdr_t);
+
+        ftx->ftx_tx.tx_isfwd = 1;                  /* This is a forwarding packet */
+        ftx->ftx_tx.tx_nob   = sizeof(ptl_hdr_t) + fwd->kprfd_nob;
+        ftx->ftx_tx.tx_niov  = 1;
+        ftx->ftx_tx.tx_iov   = &ftx->ftx_iov;
+        ftx->ftx_tx.tx_nkiov = fwd->kprfd_niov;
+        ftx->ftx_tx.tx_kiov  = fwd->kprfd_kiov;
 
-        rc = ksocknal_launch_packet (tx, nid);
+        rc = ksocknal_launch_packet (&ftx->ftx_tx, nid);
         if (rc != 0)
                 kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, rc);
 }
@@ -1177,7 +1201,7 @@ ksocknal_fmb_callback (void *arg, int error)
 {
         ksock_fmb_t       *fmb = (ksock_fmb_t *)arg;
         ksock_fmb_pool_t  *fmp = fmb->fmb_pool;
-        ptl_hdr_t         *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]);
+        ptl_hdr_t         *hdr = (ptl_hdr_t *)page_address(fmb->fmb_kiov[0].kiov_page);
         ksock_conn_t      *conn = NULL;
         ksock_sched_t     *sched;
         unsigned long      flags;
@@ -1235,7 +1259,6 @@ ksock_fmb_t *
 ksocknal_get_idle_fmb (ksock_conn_t *conn)
 {
         int               payload_nob = conn->ksnc_rx_nob_left;
-        int               packet_nob = sizeof (ptl_hdr_t) + payload_nob;
         unsigned long     flags;
         ksock_fmb_pool_t *pool;
         ksock_fmb_t      *fmb;
@@ -1243,7 +1266,7 @@ ksocknal_get_idle_fmb (ksock_conn_t *conn)
         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
         LASSERT (kpr_routing(&ksocknal_data.ksnd_router));
 
-        if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
+        if (payload_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
                 pool = &ksocknal_data.ksnd_small_fmp;
         else
                 pool = &ksocknal_data.ksnd_large_fmp;
@@ -1274,98 +1297,64 @@ ksocknal_get_idle_fmb (ksock_conn_t *conn)
 int
 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
 {
-        int payload_nob = conn->ksnc_rx_nob_left;
-        int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
+        int       payload_nob = conn->ksnc_rx_nob_left;
         ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
-        int niov;                               /* at least the header */
-        int nob;
+        int       niov = 0;
+        int       nob = payload_nob;
 
         LASSERT (conn->ksnc_rx_scheduled);
         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
         LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
         LASSERT (payload_nob >= 0);
-        LASSERT (packet_nob <= fmb->fmb_npages * PAGE_SIZE);
+        LASSERT (payload_nob <= fmb->fmb_pool->fmp_buff_pages * PAGE_SIZE);
         LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
-
-        /* Got a forwarding buffer; copy the header we just read into the
-         * forwarding buffer.  If there's payload, start reading reading it
-         * into the buffer, otherwise the forwarding buffer can be kicked
-         * off immediately.
-         *
-         * NB fmb->fmb_iov spans the WHOLE packet.
-         *    conn->ksnc_rx_iov spans just the payload.
-         */
-        fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]);
-
-        /* copy header */
-        memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t));
+        LASSERT (fmb->fmb_kiov[0].kiov_offset == 0);
 
         /* Take a ref on the conn's peer to prevent module unload before
-         * forwarding completes.  NB we ref peer and not conn since because
-         * all refs on conn after it has been closed must remove themselves
-         * in finite time */
+         * forwarding completes. */
         fmb->fmb_peer = conn->ksnc_peer;
         atomic_inc (&conn->ksnc_peer->ksnp_refcount);
 
-        if (payload_nob == 0) {         /* got complete packet already */
-                CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n",
-                        conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
-                        dest_nid, packet_nob);
+        /* Copy the header we just read into the forwarding buffer.  If
+         * there's payload, start reading reading it into the buffer,
+         * otherwise the forwarding buffer can be kicked off
+         * immediately. */
+        fmb->fmb_hdr = conn->ksnc_hdr;
 
-                fmb->fmb_iov[0].iov_len = sizeof (ptl_hdr_t);
+        while (nob > 0) {
+                LASSERT (niov < fmb->fmb_pool->fmp_buff_pages);
+                LASSERT (fmb->fmb_kiov[niov].kiov_offset == 0);
+                fmb->fmb_kiov[niov].kiov_len = MIN (PAGE_SIZE, nob);
+                nob -= PAGE_SIZE;
+                niov++;
+        }
+
+        kpr_fwd_init(&fmb->fmb_fwd, dest_nid, &fmb->fmb_hdr,
+                     payload_nob, niov, fmb->fmb_kiov,
+                     ksocknal_fmb_callback, fmb);
 
-                kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
-                              packet_nob, 1, fmb->fmb_iov,
-                              ksocknal_fmb_callback, fmb);
+        if (payload_nob == 0) {         /* got complete packet already */
+                CDEBUG (D_NET, "%p "LPX64"->"LPX64" fwd_start (immediate)\n",
+                        conn, NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid);
 
-                /* forward it now */
                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
 
                 ksocknal_new_packet (conn, 0);  /* on to next packet */
                 return (1);
         }
 
-        niov = 1;
-        if (packet_nob <= PAGE_SIZE) {  /* whole packet fits in first page */
-                fmb->fmb_iov[0].iov_len = packet_nob;
-        } else {
-                fmb->fmb_iov[0].iov_len = PAGE_SIZE;
-                nob = packet_nob - PAGE_SIZE;
-
-                do {
-                        LASSERT (niov < fmb->fmb_npages);
-                        fmb->fmb_iov[niov].iov_base =
-                                page_address (fmb->fmb_pages[niov]);
-                        fmb->fmb_iov[niov].iov_len = MIN (PAGE_SIZE, nob);
-                        nob -= PAGE_SIZE;
-                        niov++;
-                } while (nob > 0);
-        }
-
-        kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
-                      packet_nob, niov, fmb->fmb_iov,
-                      ksocknal_fmb_callback, fmb);
-
         conn->ksnc_cookie = fmb;                /* stash fmb for later */
         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
         
-        /* payload is desc's iov-ed buffer, but skipping the hdr */
-        LASSERT (niov <= sizeof (conn->ksnc_rx_iov_space) /
-                 sizeof (struct iovec));
-
-        conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
-        conn->ksnc_rx_iov[0].iov_base =
-                (void *)(((unsigned long)fmb->fmb_iov[0].iov_base) +
-                         sizeof (ptl_hdr_t));
-        conn->ksnc_rx_iov[0].iov_len =
-                fmb->fmb_iov[0].iov_len - sizeof (ptl_hdr_t);
-
-        if (niov > 1)
-                memcpy(&conn->ksnc_rx_iov[1], &fmb->fmb_iov[1],
-                       (niov - 1) * sizeof (struct iovec));
-
-        conn->ksnc_rx_niov = niov;
+        /* Set up conn->ksnc_rx_kiov to read the payload into fmb's kiov-ed
+         * buffer */
+        LASSERT (niov <= sizeof(conn->ksnc_rx_iov_space)/sizeof(ptl_kiov_t));
 
+        conn->ksnc_rx_niov = 0;
+        conn->ksnc_rx_nkiov = niov;
+        conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
+        memcpy(conn->ksnc_rx_kiov, fmb->fmb_kiov, niov * sizeof(ptl_kiov_t));
+        
         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
                 NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
         return (0);
@@ -2196,7 +2185,7 @@ ksocknal_setup_sock (struct socket *sock)
         /* Keepalives: If 3/4 of the timeout elapses, start probing every
          * second until the timeout elapses. */
 
-        option = (ksocknal_data.ksnd_io_timeout * 3) / 4;
+        option = (ksocknal_tunables.ksnd_io_timeout * 3) / 4;
         set_fs (KERNEL_DS);
         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE,
                                     (char *)&option, sizeof (option));
@@ -2216,7 +2205,7 @@ ksocknal_setup_sock (struct socket *sock)
                 return (rc);
         }
         
-        option = ksocknal_data.ksnd_io_timeout / 4;
+        option = ksocknal_tunables.ksnd_io_timeout / 4;
         set_fs (KERNEL_DS);
         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT,
                                     (char *)&option, sizeof (option));
@@ -2273,7 +2262,7 @@ ksocknal_connect_peer (ksock_route_t *route, int type)
 
         /* Set the socket timeouts, so our connection attempt completes in
          * finite time */
-        tv.tv_sec = ksocknal_data.ksnd_io_timeout;
+        tv.tv_sec = ksocknal_tunables.ksnd_io_timeout;
         tv.tv_usec = 0;
 
         set_fs (KERNEL_DS);
@@ -2282,7 +2271,7 @@ ksocknal_connect_peer (ksock_route_t *route, int type)
         set_fs (oldmm);
         if (rc != 0) {
                 CERROR ("Can't set send timeout %d: %d\n", 
-                        ksocknal_data.ksnd_io_timeout, rc);
+                        ksocknal_tunables.ksnd_io_timeout, rc);
                 goto out;
         }
         
@@ -2292,7 +2281,7 @@ ksocknal_connect_peer (ksock_route_t *route, int type)
         set_fs (oldmm);
         if (rc != 0) {
                 CERROR ("Can't set receive timeout %d: %d\n",
-                        ksocknal_data.ksnd_io_timeout, rc);
+                        ksocknal_tunables.ksnd_io_timeout, rc);
                 goto out;
         }
 
@@ -2666,9 +2655,9 @@ ksocknal_reaper (void *arg)
                          * timeout on any connection within (n+1)/n times the
                          * timeout interval. */
 
-                        if (ksocknal_data.ksnd_io_timeout > n * p)
+                        if (ksocknal_tunables.ksnd_io_timeout > n * p)
                                 chunk = (chunk * n * p) / 
-                                        ksocknal_data.ksnd_io_timeout;
+                                        ksocknal_tunables.ksnd_io_timeout;
                         if (chunk == 0)
                                 chunk = 1;
 
@@ -2689,8 +2678,8 @@ ksocknal_reaper (void *arg)
                 }
                 ksocknal_data.ksnd_reaper_waketime = jiffies + timeout;
 
-                add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
                 set_current_state (TASK_INTERRUPTIBLE);
+                add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
 
                 if (!ksocknal_data.ksnd_shuttingdown &&
                     list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
@@ -2722,5 +2711,6 @@ nal_cb_t ksocknal_lib = {
         cb_printf:       ksocknal_printf,
         cb_cli:          ksocknal_cli,
         cb_sti:          ksocknal_sti,
+        cb_callback:     ksocknal_callback,
         cb_dist:         ksocknal_dist
 };