Whamcloud - gitweb
Land b1_2 onto HEAD (20040304_171022)
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_cb.c
index 985b432..c89e20e 100644 (file)
@@ -29,7 +29,7 @@
  *  LIB functions follow
  *
  */
-int
+ptl_err_t
 ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr,
               user_ptr src_addr, size_t len)
 {
@@ -37,10 +37,10 @@ ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr,
                nal->ni.nid, (long)len, src_addr, dst_addr);
 
         memcpy( dst_addr, src_addr, len );
-        return 0;
+        return PTL_OK;
 }
 
-int
+ptl_err_t
 ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
                void *src_addr, size_t len)
 {
@@ -48,20 +48,7 @@ ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
                nal->ni.nid, (long)len, src_addr, dst_addr);
 
         memcpy( dst_addr, src_addr, len );
-        return 0;
-}
-
-int
-ksocknal_callback (nal_cb_t * nal, void *private, lib_eq_t *eq,
-                         ptl_event_t *ev)
-{
-        CDEBUG(D_NET, LPX64": callback eq %p ev %p\n",
-               nal->ni.nid, eq, ev);
-
-        if (eq->event_callback != NULL)
-                eq->event_callback(ev);
-
-        return 0;
+        return PTL_OK;
 }
 
 void *
@@ -136,7 +123,7 @@ ksocknal_free_ltx (ksock_ltx_t *ltx)
         PORTAL_FREE(ltx, ltx->ltx_desc_size);
 }
 
-#if SOCKNAL_ZC
+#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
 struct page *
 ksocknal_kvaddr_to_page (unsigned long vaddr)
 {
@@ -172,7 +159,7 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
         int            more = (tx->tx_niov > 1) || 
                               (tx->tx_nkiov > 0) ||
                               (!list_empty (&conn->ksnc_tx_queue));
-#if SOCKNAL_ZC
+#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
         int            offset = vaddr & (PAGE_SIZE - 1);
         int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
         struct page   *page;
@@ -184,7 +171,7 @@ ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
         LASSERT (fragsize <= tx->tx_resid);
         LASSERT (tx->tx_niov > 0);
         
-#if SOCKNAL_ZC
+#if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
         if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
             (sock->sk->route_caps & NETIF_F_SG) &&
             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
@@ -617,7 +604,8 @@ ksocknal_tx_done (ksock_tx_t *tx, int asynch)
 
         if (tx->tx_isfwd) {             /* was a forwarded packet? */
                 kpr_fwd_done (&ksocknal_data.ksnd_router,
-                              KSOCK_TX_2_KPR_FWD_DESC (tx), 0);
+                              KSOCK_TX_2_KPR_FWD_DESC (tx), 
+                              (tx->tx_resid == 0) ? 0 : -ECONNABORTED);
                 EXIT;
                 return;
         }
@@ -625,7 +613,8 @@ ksocknal_tx_done (ksock_tx_t *tx, int asynch)
         /* local send */
         ltx = KSOCK_TX_2_KSOCK_LTX (tx);
 
-        lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie);
+        lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie,
+                      (tx->tx_resid == 0) ? PTL_OK : PTL_FAIL);
 
         ksocknal_free_ltx (ltx);
         EXIT;
@@ -694,16 +683,17 @@ ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
         LASSERT (rc < 0);
 
         if (!conn->ksnc_closing)
-                CERROR ("[%p] Error %d on write to "LPX64
-                        " ip %08x:%d\n",conn, rc, 
-                        conn->ksnc_peer->ksnp_nid,
-                        conn->ksnc_ipaddr, conn->ksnc_port);
+                CERROR("[%p] Error %d on write to "LPX64
+                       " ip %d.%d.%d.%d:%d\n", conn, rc,
+                       conn->ksnc_peer->ksnp_nid,
+                       HIPQUAD(conn->ksnc_ipaddr),
+                       conn->ksnc_port);
 
         ksocknal_close_conn_and_siblings (conn, rc);
         ksocknal_tx_launched (tx);
-        
+
         return (rc);
-} 
+}
 
 void
 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
@@ -737,23 +727,26 @@ ksocknal_launch_autoconnect_locked (ksock_route_t *route)
 ksock_peer_t *
 ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
 {
+        char          ipbuf[PTL_NALFMT_SIZE];
         ptl_nid_t     target_nid;
         int           rc;
         ksock_peer_t *peer = ksocknal_find_peer_locked (nid);
-        
+
         if (peer != NULL)
                 return (peer);
-        
+
         if (tx->tx_isfwd) {
                 CERROR ("Can't send packet to "LPX64
-                        ": routed target is not a peer\n", nid);
+                       " %s: routed target is not a peer\n",
+                        nid, portals_nid2str(SOCKNAL, nid, ipbuf));
                 return (NULL);
         }
-        
+
         rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, tx->tx_nob,
                          &target_nid);
         if (rc != 0) {
-                CERROR ("Can't route to "LPX64": router error %d\n", nid, rc);
+                CERROR ("Can't route to "LPX64" %s: router error %d\n",
+                        nid, portals_nid2str(SOCKNAL, nid, ipbuf), rc);
                 return (NULL);
         }
 
@@ -761,23 +754,25 @@ ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
         if (peer != NULL)
                 return (peer);
 
-        CERROR ("Can't send packet to "LPX64": no peer entry\n", target_nid);
+        CERROR ("Can't send packet to "LPX64" %s: no peer entry\n",
+                target_nid, portals_nid2str(SOCKNAL, target_nid, ipbuf));
         return (NULL);
 }
 
 ksock_conn_t *
-ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer) 
+ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer)
 {
         struct list_head *tmp;
         ksock_conn_t     *typed = NULL;
         int               tnob  = 0;
         ksock_conn_t     *fallback = NULL;
         int               fnob     = 0;
-        
+
         /* Find the conn with the shortest tx queue */
         list_for_each (tmp, &peer->ksnp_conns) {
                 ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
-                int           nob = atomic_read(&c->ksnc_tx_nob);
+                int           nob = atomic_read(&c->ksnc_tx_nob) +
+                                        c->ksnc_sock->sk->sk_wmem_queued;
 
                 LASSERT (!c->ksnc_closing);
 
@@ -829,8 +824,10 @@ ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
         LASSERT(!conn->ksnc_closing);
         LASSERT(tx->tx_resid == tx->tx_nob);
         
-        CDEBUG (D_NET, "Sending to "LPX64" on port %d\n", 
-                conn->ksnc_peer->ksnp_nid, conn->ksnc_port);
+        CDEBUG (D_NET, "Sending to "LPX64" ip %d.%d.%d.%d:%d\n", 
+                conn->ksnc_peer->ksnp_nid,
+                HIPQUAD(conn->ksnc_ipaddr),
+                conn->ksnc_port);
 
         atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
         tx->tx_conn = conn;
@@ -1011,7 +1008,7 @@ ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
         return (-EHOSTUNREACH);
 }
 
-int
+ptl_err_t
 ksocknal_sendmsg(nal_cb_t     *nal, 
                  void         *private, 
                  lib_msg_t    *cookie,
@@ -1022,6 +1019,7 @@ ksocknal_sendmsg(nal_cb_t     *nal,
                  unsigned int  payload_niov, 
                  struct iovec *payload_iov, 
                  ptl_kiov_t   *payload_kiov,
+                 size_t        payload_offset,
                  size_t        payload_nob)
 {
         ksock_ltx_t  *ltx;
@@ -1084,20 +1082,19 @@ ksocknal_sendmsg(nal_cb_t     *nal,
                 ltx->ltx_tx.tx_kiov  = NULL;
                 ltx->ltx_tx.tx_nkiov = 0;
 
-                ltx->ltx_tx.tx_niov = 1 + payload_niov;
-
-                memcpy(ltx->ltx_iov + 1, payload_iov,
-                       payload_niov * sizeof (*payload_iov));
-
+                ltx->ltx_tx.tx_niov = 
+                        1 + lib_extract_iov(payload_niov, &ltx->ltx_iov[1],
+                                            payload_niov, payload_iov,
+                                            payload_offset, payload_nob);
         } else {
                 /* payload is all pages */
-                ltx->ltx_tx.tx_kiov = ltx->ltx_kiov;
-                ltx->ltx_tx.tx_nkiov = payload_niov;
-
                 ltx->ltx_tx.tx_niov = 1;
 
-                memcpy(ltx->ltx_kiov, payload_kiov, 
-                       payload_niov * sizeof (*payload_kiov));
+                ltx->ltx_tx.tx_kiov = ltx->ltx_kiov;
+                ltx->ltx_tx.tx_nkiov =
+                        lib_extract_kiov(payload_niov, ltx->ltx_kiov,
+                                         payload_niov, payload_kiov,
+                                         payload_offset, payload_nob);
         }
 
         rc = ksocknal_launch_packet(&ltx->ltx_tx, nid);
@@ -1108,35 +1105,35 @@ ksocknal_sendmsg(nal_cb_t     *nal,
         return (PTL_FAIL);
 }
 
-int
+ptl_err_t
 ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                unsigned int payload_niov, struct iovec *payload_iov,
-               size_t payload_len)
+               size_t payload_offset, size_t payload_len)
 {
         return (ksocknal_sendmsg(nal, private, cookie,
                                  hdr, type, nid, pid,
                                  payload_niov, payload_iov, NULL,
-                                 payload_len));
+                                 payload_offset, payload_len));
 }
 
-int
+ptl_err_t
 ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie, 
                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
-                     size_t payload_len)
+                     size_t payload_offset, size_t payload_len)
 {
         return (ksocknal_sendmsg(nal, private, cookie,
                                  hdr, type, nid, pid,
                                  payload_niov, NULL, payload_kiov,
-                                 payload_len));
+                                 payload_offset, payload_len));
 }
 
 void
 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
 {
         ptl_nid_t     nid = fwd->kprfd_gateway_nid;
-        ksock_tx_t   *tx  = (ksock_tx_t *)&fwd->kprfd_scratch;
+        ksock_ftx_t  *ftx = (ksock_ftx_t *)&fwd->kprfd_scratch;
         int           rc;
         
         CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
@@ -1146,14 +1143,18 @@ ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
         if (nid == ksocknal_lib.ni.nid)
                 nid = fwd->kprfd_target_nid;
 
-        tx->tx_isfwd = 1;                   /* This is a forwarding packet */
-        tx->tx_nob   = fwd->kprfd_nob;
-        tx->tx_niov  = fwd->kprfd_niov;
-        tx->tx_iov   = fwd->kprfd_iov;
-        tx->tx_nkiov = 0;
-        tx->tx_kiov  = NULL;
+        /* setup iov for hdr */
+        ftx->ftx_iov.iov_base = fwd->kprfd_hdr;
+        ftx->ftx_iov.iov_len = sizeof(ptl_hdr_t);
 
-        rc = ksocknal_launch_packet (tx, nid);
+        ftx->ftx_tx.tx_isfwd = 1;                  /* This is a forwarding packet */
+        ftx->ftx_tx.tx_nob   = sizeof(ptl_hdr_t) + fwd->kprfd_nob;
+        ftx->ftx_tx.tx_niov  = 1;
+        ftx->ftx_tx.tx_iov   = &ftx->ftx_iov;
+        ftx->ftx_tx.tx_nkiov = fwd->kprfd_niov;
+        ftx->ftx_tx.tx_kiov  = fwd->kprfd_kiov;
+
+        rc = ksocknal_launch_packet (&ftx->ftx_tx, nid);
         if (rc != 0)
                 kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, rc);
 }
@@ -1181,14 +1182,20 @@ ksocknal_fmb_callback (void *arg, int error)
 {
         ksock_fmb_t       *fmb = (ksock_fmb_t *)arg;
         ksock_fmb_pool_t  *fmp = fmb->fmb_pool;
-        ptl_hdr_t         *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]);
+        ptl_hdr_t         *hdr = (ptl_hdr_t *)page_address(fmb->fmb_kiov[0].kiov_page);
         ksock_conn_t      *conn = NULL;
         ksock_sched_t     *sched;
         unsigned long      flags;
+        char               ipbuf[PTL_NALFMT_SIZE];
+        char               ipbuf2[PTL_NALFMT_SIZE];
 
         if (error != 0)
-                CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
-                       NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),
+                CERROR("Failed to route packet from "
+                       LPX64" %s to "LPX64" %s: %d\n",
+                       NTOH__u64(hdr->src_nid),
+                       portals_nid2str(SOCKNAL, NTOH__u64(hdr->src_nid), ipbuf),
+                       NTOH__u64(hdr->dest_nid),
+                       portals_nid2str(SOCKNAL, NTOH__u64(hdr->dest_nid), ipbuf2),
                        error);
         else
                 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
@@ -1196,7 +1203,7 @@ ksocknal_fmb_callback (void *arg, int error)
 
         /* drop peer ref taken on init */
         ksocknal_put_peer (fmb->fmb_peer);
-        
+
         spin_lock_irqsave (&fmp->fmp_lock, flags);
 
         list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);
@@ -1233,7 +1240,6 @@ ksock_fmb_t *
 ksocknal_get_idle_fmb (ksock_conn_t *conn)
 {
         int               payload_nob = conn->ksnc_rx_nob_left;
-        int               packet_nob = sizeof (ptl_hdr_t) + payload_nob;
         unsigned long     flags;
         ksock_fmb_pool_t *pool;
         ksock_fmb_t      *fmb;
@@ -1241,7 +1247,7 @@ ksocknal_get_idle_fmb (ksock_conn_t *conn)
         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
         LASSERT (kpr_routing(&ksocknal_data.ksnd_router));
 
-        if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
+        if (payload_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
                 pool = &ksocknal_data.ksnd_small_fmp;
         else
                 pool = &ksocknal_data.ksnd_large_fmp;
@@ -1272,98 +1278,64 @@ ksocknal_get_idle_fmb (ksock_conn_t *conn)
 int
 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
 {
-        int payload_nob = conn->ksnc_rx_nob_left;
-        int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
+        int       payload_nob = conn->ksnc_rx_nob_left;
         ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
-        int niov;                               /* at least the header */
-        int nob;
+        int       niov = 0;
+        int       nob = payload_nob;
 
         LASSERT (conn->ksnc_rx_scheduled);
         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
         LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
         LASSERT (payload_nob >= 0);
-        LASSERT (packet_nob <= fmb->fmb_npages * PAGE_SIZE);
+        LASSERT (payload_nob <= fmb->fmb_pool->fmp_buff_pages * PAGE_SIZE);
         LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
-
-        /* Got a forwarding buffer; copy the header we just read into the
-         * forwarding buffer.  If there's payload, start reading reading it
-         * into the buffer, otherwise the forwarding buffer can be kicked
-         * off immediately.
-         *
-         * NB fmb->fmb_iov spans the WHOLE packet.
-         *    conn->ksnc_rx_iov spans just the payload.
-         */
-        fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]);
-
-        /* copy header */
-        memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t));
+        LASSERT (fmb->fmb_kiov[0].kiov_offset == 0);
 
         /* Take a ref on the conn's peer to prevent module unload before
-         * forwarding completes.  NB we ref peer and not conn since because
-         * all refs on conn after it has been closed must remove themselves
-         * in finite time */
+         * forwarding completes. */
         fmb->fmb_peer = conn->ksnc_peer;
         atomic_inc (&conn->ksnc_peer->ksnp_refcount);
 
-        if (payload_nob == 0) {         /* got complete packet already */
-                CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n",
-                        conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
-                        dest_nid, packet_nob);
+        /* Copy the header we just read into the forwarding buffer.  If
+         * there's payload, start reading reading it into the buffer,
+         * otherwise the forwarding buffer can be kicked off
+         * immediately. */
+        fmb->fmb_hdr = conn->ksnc_hdr;
 
-                fmb->fmb_iov[0].iov_len = sizeof (ptl_hdr_t);
+        while (nob > 0) {
+                LASSERT (niov < fmb->fmb_pool->fmp_buff_pages);
+                LASSERT (fmb->fmb_kiov[niov].kiov_offset == 0);
+                fmb->fmb_kiov[niov].kiov_len = MIN (PAGE_SIZE, nob);
+                nob -= PAGE_SIZE;
+                niov++;
+        }
 
-                kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
-                              packet_nob, 1, fmb->fmb_iov,
-                              ksocknal_fmb_callback, fmb);
+        kpr_fwd_init(&fmb->fmb_fwd, dest_nid, &fmb->fmb_hdr,
+                     payload_nob, niov, fmb->fmb_kiov,
+                     ksocknal_fmb_callback, fmb);
+
+        if (payload_nob == 0) {         /* got complete packet already */
+                CDEBUG (D_NET, "%p "LPX64"->"LPX64" fwd_start (immediate)\n",
+                        conn, NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid);
 
-                /* forward it now */
                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
 
                 ksocknal_new_packet (conn, 0);  /* on to next packet */
                 return (1);
         }
 
-        niov = 1;
-        if (packet_nob <= PAGE_SIZE) {  /* whole packet fits in first page */
-                fmb->fmb_iov[0].iov_len = packet_nob;
-        } else {
-                fmb->fmb_iov[0].iov_len = PAGE_SIZE;
-                nob = packet_nob - PAGE_SIZE;
-
-                do {
-                        LASSERT (niov < fmb->fmb_npages);
-                        fmb->fmb_iov[niov].iov_base =
-                                page_address (fmb->fmb_pages[niov]);
-                        fmb->fmb_iov[niov].iov_len = MIN (PAGE_SIZE, nob);
-                        nob -= PAGE_SIZE;
-                        niov++;
-                } while (nob > 0);
-        }
-
-        kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
-                      packet_nob, niov, fmb->fmb_iov,
-                      ksocknal_fmb_callback, fmb);
-
         conn->ksnc_cookie = fmb;                /* stash fmb for later */
         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
         
-        /* payload is desc's iov-ed buffer, but skipping the hdr */
-        LASSERT (niov <= sizeof (conn->ksnc_rx_iov_space) /
-                 sizeof (struct iovec));
-
-        conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
-        conn->ksnc_rx_iov[0].iov_base =
-                (void *)(((unsigned long)fmb->fmb_iov[0].iov_base) +
-                         sizeof (ptl_hdr_t));
-        conn->ksnc_rx_iov[0].iov_len =
-                fmb->fmb_iov[0].iov_len - sizeof (ptl_hdr_t);
-
-        if (niov > 1)
-                memcpy(&conn->ksnc_rx_iov[1], &fmb->fmb_iov[1],
-                       (niov - 1) * sizeof (struct iovec));
-
-        conn->ksnc_rx_niov = niov;
+        /* Set up conn->ksnc_rx_kiov to read the payload into fmb's kiov-ed
+         * buffer */
+        LASSERT (niov <= sizeof(conn->ksnc_rx_iov_space)/sizeof(ptl_kiov_t));
 
+        conn->ksnc_rx_niov = 0;
+        conn->ksnc_rx_nkiov = niov;
+        conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
+        memcpy(conn->ksnc_rx_kiov, fmb->fmb_kiov, niov * sizeof(ptl_kiov_t));
+        
         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
                 NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
         return (0);
@@ -1377,6 +1349,7 @@ ksocknal_fwd_parse (ksock_conn_t *conn)
         ptl_nid_t     src_nid = NTOH__u64 (conn->ksnc_hdr.src_nid);
         int           body_len = NTOH__u32 (conn->ksnc_hdr.payload_length);
         char str[PTL_NALFMT_SIZE];
+        char str2[PTL_NALFMT_SIZE];
 
         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
                 src_nid, dest_nid, conn->ksnc_rx_nob_left);
@@ -1388,7 +1361,7 @@ ksocknal_fwd_parse (ksock_conn_t *conn)
                 CERROR("dropping packet from "LPX64" (%s) for "LPX64" (%s): "
                        "packet size %d illegal\n",
                        src_nid, portals_nid2str(TCPNAL, src_nid, str),
-                       dest_nid, portals_nid2str(TCPNAL, dest_nid, str),
+                       dest_nid, portals_nid2str(TCPNAL, dest_nid, str2),
                        body_len);
 
                 ksocknal_new_packet (conn, 0);  /* on to new packet */
@@ -1399,7 +1372,7 @@ ksocknal_fwd_parse (ksock_conn_t *conn)
                 CERROR("dropping packet from "LPX64" (%s) for "LPX64
                        " (%s): not forwarding\n",
                        src_nid, portals_nid2str(TCPNAL, src_nid, str),
-                       dest_nid, portals_nid2str(TCPNAL, dest_nid, str));
+                       dest_nid, portals_nid2str(TCPNAL, dest_nid, str2));
                 /* on to new packet (skip this one's body) */
                 ksocknal_new_packet (conn, body_len);
                 return;
@@ -1409,7 +1382,7 @@ ksocknal_fwd_parse (ksock_conn_t *conn)
                 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
                         "(%s): packet size %d too big\n",
                         src_nid, portals_nid2str(TCPNAL, src_nid, str),
-                        dest_nid, portals_nid2str(TCPNAL, dest_nid, str),
+                        dest_nid, portals_nid2str(TCPNAL, dest_nid, str2),
                         body_len);
                 /* on to new packet (skip this one's body) */
                 ksocknal_new_packet (conn, body_len);
@@ -1422,7 +1395,7 @@ ksocknal_fwd_parse (ksock_conn_t *conn)
                 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
                         "(%s): target is a peer\n",
                         src_nid, portals_nid2str(TCPNAL, src_nid, str),
-                        dest_nid, portals_nid2str(TCPNAL, dest_nid, str));
+                        dest_nid, portals_nid2str(TCPNAL, dest_nid, str2));
                 ksocknal_put_peer (peer);  /* drop ref from get above */
 
                 /* on to next packet (skip this one's body) */
@@ -1529,13 +1502,16 @@ ksocknal_process_receive (ksock_conn_t *conn)
                 LASSERT (rc != -EAGAIN);
 
                 if (rc == 0)
-                        CWARN ("[%p] EOF from "LPX64" ip %08x:%d\n",
+                        CWARN ("[%p] EOF from "LPX64" ip %d.%d.%d.%d:%d\n",
                                conn, conn->ksnc_peer->ksnp_nid,
-                               conn->ksnc_ipaddr, conn->ksnc_port);
+                               HIPQUAD(conn->ksnc_ipaddr),
+                               conn->ksnc_port);
                 else if (!conn->ksnc_closing)
-                        CERROR ("[%p] Error %d on read from "LPX64" ip %08x:%d\n",
+                        CERROR ("[%p] Error %d on read from "LPX64
+                                " ip %d.%d.%d.%d:%d\n",
                                 conn, rc, conn->ksnc_peer->ksnp_nid,
-                                conn->ksnc_ipaddr, conn->ksnc_port);
+                                HIPQUAD(conn->ksnc_ipaddr),
+                                conn->ksnc_port);
 
                 ksocknal_close_conn_and_siblings (conn, rc);
                 return (rc == 0 ? -ESHUTDOWN : rc);
@@ -1576,7 +1552,7 @@ ksocknal_process_receive (ksock_conn_t *conn)
 
         case SOCKNAL_RX_BODY:
                 /* payload all received */
-                lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie);
+                lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_OK);
                 /* Fall through */
 
         case SOCKNAL_RX_SLOP:
@@ -1612,9 +1588,10 @@ ksocknal_process_receive (ksock_conn_t *conn)
         return (-EINVAL);                       /* keep gcc happy */
 }
 
-int
+ptl_err_t
 ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
-               unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
+               unsigned int niov, struct iovec *iov, 
+               size_t offset, size_t mlen, size_t rlen)
 {
         ksock_conn_t *conn = (ksock_conn_t *)private;
 
@@ -1627,20 +1604,22 @@ ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
 
         conn->ksnc_rx_nkiov = 0;
         conn->ksnc_rx_kiov = NULL;
-        conn->ksnc_rx_niov = niov;
         conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
-        memcpy (conn->ksnc_rx_iov, iov, niov * sizeof (*iov));
+        conn->ksnc_rx_niov =
+                lib_extract_iov(PTL_MD_MAX_IOV, conn->ksnc_rx_iov,
+                                niov, iov, offset, mlen);
 
         LASSERT (mlen == 
                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
 
-        return (rlen);
+        return (PTL_OK);
 }
 
-int
+ptl_err_t
 ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
-                     unsigned int niov, ptl_kiov_t *kiov, size_t mlen, size_t rlen)
+                     unsigned int niov, ptl_kiov_t *kiov, 
+                     size_t offset, size_t mlen, size_t rlen)
 {
         ksock_conn_t *conn = (ksock_conn_t *)private;
 
@@ -1653,15 +1632,16 @@ ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
 
         conn->ksnc_rx_niov = 0;
         conn->ksnc_rx_iov  = NULL;
-        conn->ksnc_rx_nkiov = niov;
         conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
-        memcpy (conn->ksnc_rx_kiov, kiov, niov * sizeof (*kiov));
+        conn->ksnc_rx_nkiov = 
+                lib_extract_kiov(PTL_MD_MAX_IOV, conn->ksnc_rx_kiov,
+                                 niov, kiov, offset, mlen);
 
         LASSERT (mlen == 
                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
 
-        return (rlen);
+        return (PTL_OK);
 }
 
 int ksocknal_scheduler (void *arg)
@@ -2014,54 +1994,60 @@ ksocknal_sock_read (struct socket *sock, void *buffer, int nob)
 }
 
 int
-ksocknal_hello (struct socket *sock, ptl_nid_t *nid, int *type, __u64 *incarnation)
+ksocknal_hello (struct socket *sock, ptl_nid_t *nid, int *type,
+                __u64 *incarnation)
 {
         int                 rc;
         ptl_hdr_t           hdr;
         ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
+        char                ipbuf[PTL_NALFMT_SIZE];
+        char                ipbuf2[PTL_NALFMT_SIZE];
 
         LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
 
         memset (&hdr, 0, sizeof (hdr));
         hmv->magic         = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
-        hmv->version_major = __cpu_to_le32 (PORTALS_PROTO_VERSION_MAJOR);
-        hmv->version_minor = __cpu_to_le32 (PORTALS_PROTO_VERSION_MINOR);
-        
+        hmv->version_major = __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR);
+        hmv->version_minor = __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR);
+
         hdr.src_nid = __cpu_to_le64 (ksocknal_lib.ni.nid);
         hdr.type    = __cpu_to_le32 (PTL_MSG_HELLO);
 
         hdr.msg.hello.type = __cpu_to_le32 (*type);
-        hdr.msg.hello.incarnation = 
+        hdr.msg.hello.incarnation =
                 __cpu_to_le64 (ksocknal_data.ksnd_incarnation);
 
         /* Assume sufficient socket buffering for this message */
         rc = ksocknal_sock_write (sock, &hdr, sizeof (hdr));
         if (rc != 0) {
-                CERROR ("Error %d sending HELLO to "LPX64"\n", rc, *nid);
+                CERROR ("Error %d sending HELLO to "LPX64" %s\n",
+                        rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
                 return (rc);
         }
 
         rc = ksocknal_sock_read (sock, hmv, sizeof (*hmv));
         if (rc != 0) {
-                CERROR ("Error %d reading HELLO from "LPX64"\n", rc, *nid);
+                CERROR ("Error %d reading HELLO from "LPX64" %s\n",
+                        rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
                 return (rc);
         }
-        
+
         if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
-                CERROR ("Bad magic %#08x (%#08x expected) from "LPX64"\n",
-                        __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, *nid);
+                CERROR ("Bad magic %#08x (%#08x expected) from "LPX64" %s\n",
+                        __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, *nid,
+                        portals_nid2str(SOCKNAL, *nid, ipbuf));
                 return (-EPROTO);
         }
 
         if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
             hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
                 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
-                        " from "LPX64"\n",
+                        " from "LPX64" %s\n",
                         __le16_to_cpu (hmv->version_major),
                         __le16_to_cpu (hmv->version_minor),
                         PORTALS_PROTO_VERSION_MAJOR,
                         PORTALS_PROTO_VERSION_MINOR,
-                        *nid);
+                        *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
                 return (-EPROTO);
         }
 
@@ -2073,8 +2059,8 @@ ksocknal_hello (struct socket *sock, ptl_nid_t *nid, int *type, __u64 *incarnati
 
         rc = ksocknal_sock_read (sock, hmv + 1, sizeof (hdr) - sizeof (*hmv));
         if (rc != 0) {
-                CERROR ("Error %d reading rest of HELLO hdr from "LPX64"\n",
-                        rc, *nid);
+                CERROR ("Error %d reading rest of HELLO hdr from "LPX64" %s\n",
+                        rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
                 return (rc);
         }
 
@@ -2082,9 +2068,10 @@ ksocknal_hello (struct socket *sock, ptl_nid_t *nid, int *type, __u64 *incarnati
         if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
             hdr.payload_length != __cpu_to_le32 (0)) {
                 CERROR ("Expecting a HELLO hdr with 0 payload,"
-                        " but got type %d with %d payload from "LPX64"\n",
+                        " but got type %d with %d payload from "LPX64" %s\n",
                         __le32_to_cpu (hdr.type),
-                        __le32_to_cpu (hdr.payload_length), *nid);
+                        __le32_to_cpu (hdr.payload_length), *nid,
+                        portals_nid2str(SOCKNAL, *nid, ipbuf));
                 return (-EPROTO);
         }
 
@@ -2096,8 +2083,12 @@ ksocknal_hello (struct socket *sock, ptl_nid_t *nid, int *type, __u64 *incarnati
         if (*nid == PTL_NID_ANY) {              /* don't know peer's nid yet */
                 *nid = __le64_to_cpu(hdr.src_nid);
         } else if (*nid != __le64_to_cpu (hdr.src_nid)) {
-                CERROR ("Connected to nid "LPX64", but expecting "LPX64"\n",
-                        __le64_to_cpu (hdr.src_nid), *nid);
+                CERROR ("Connected to nid "LPX64" %s, but expecting "LPX64" %s\n",
+                        __le64_to_cpu (hdr.src_nid),
+                        portals_nid2str(SOCKNAL,
+                                        __le64_to_cpu(hdr.src_nid),
+                                        ipbuf),
+                        *nid, portals_nid2str(SOCKNAL, *nid, ipbuf2));
                 return (-EPROTO);
         }
 
@@ -2115,12 +2106,15 @@ ksocknal_hello (struct socket *sock, ptl_nid_t *nid, int *type, __u64 *incarnati
                         *type = SOCKNAL_CONN_BULK_IN;
                         break;
                 default:
-                        CERROR ("Unexpected type %d from "LPX64"\n", *type, *nid);
+                        CERROR ("Unexpected type %d from "LPX64" %s\n",
+                                *type, *nid,
+                                portals_nid2str(SOCKNAL, *nid, ipbuf));
                         return (-EPROTO);
                 }
         } else if (__le32_to_cpu(hdr.msg.hello.type) != SOCKNAL_CONN_NONE) {
-                CERROR ("Mismatched types: me %d "LPX64" %d\n",
-                        *type, *nid, __le32_to_cpu(hdr.msg.hello.type));
+                CERROR ("Mismatched types: me %d "LPX64" %s %d\n",
+                        *type, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf),
+                        __le32_to_cpu(hdr.msg.hello.type));
                 return (-EPROTO);
         }
 
@@ -2224,6 +2218,7 @@ ksocknal_connect_peer (ksock_route_t *route, int type)
         int                 fd;
         struct socket      *sock;
         int                 rc;
+        char                ipbuf[PTL_NALFMT_SIZE];
 
         rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock);
         if (rc != 0) {
@@ -2271,7 +2266,7 @@ ksocknal_connect_peer (ksock_route_t *route, int type)
                 goto out;
         }
 
-        if (route->ksnr_nonagel) {
+        {
                 int  option = 1;
                 
                 set_fs (KERNEL_DS);
@@ -2279,7 +2274,7 @@ ksocknal_connect_peer (ksock_route_t *route, int type)
                                             (char *)&option, sizeof (option));
                 set_fs (oldmm);
                 if (rc != 0) {
-                        CERROR ("Can't disable nagel: %d\n", rc);
+                        CERROR ("Can't disable nagle: %d\n", rc);
                         goto out;
                 }
         }
@@ -2316,8 +2311,11 @@ ksocknal_connect_peer (ksock_route_t *route, int type)
         rc = sock->ops->connect (sock, (struct sockaddr *)&peer_addr, 
                                  sizeof (peer_addr), sock->file->f_flags);
         if (rc != 0) {
-                CERROR ("Error %d connecting to "LPX64"\n", rc,
-                        route->ksnr_peer->ksnp_nid);
+                CERROR ("Error %d connecting to "LPX64" %s\n", rc,
+                        route->ksnr_peer->ksnp_nid,
+                        portals_nid2str(SOCKNAL,
+                                        route->ksnr_peer->ksnp_nid,
+                                        ipbuf));
                 goto out;
         }
         
@@ -2399,13 +2397,21 @@ ksocknal_autoconnect (ksock_route_t *route)
         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
 
         while (!list_empty (&zombies)) {
+                char ipbuf[PTL_NALFMT_SIZE];
+                char ipbuf2[PTL_NALFMT_SIZE];
                 tx = list_entry (zombies.next, ksock_tx_t, tx_list);
-                
-                CERROR ("Deleting packet type %d len %d ("LPX64"->"LPX64")\n",
+
+                CERROR ("Deleting packet type %d len %d ("LPX64" %s->"LPX64" %s)\n",
                         NTOH__u32 (tx->tx_hdr->type),
                         NTOH__u32 (tx->tx_hdr->payload_length),
                         NTOH__u64 (tx->tx_hdr->src_nid),
-                        NTOH__u64 (tx->tx_hdr->dest_nid));
+                        portals_nid2str(SOCKNAL,
+                                        NTOH__u64(tx->tx_hdr->src_nid),
+                                        ipbuf),
+                        NTOH__u64 (tx->tx_hdr->dest_nid),
+                        portals_nid2str(SOCKNAL,
+                                        NTOH__u64(tx->tx_hdr->src_nid),
+                                        ipbuf2));
 
                 list_del (&tx->tx_list);
                 /* complete now */
@@ -2478,8 +2484,8 @@ ksocknal_find_timed_out_conn (ksock_peer_t *peer)
                     time_after_eq (jiffies, conn->ksnc_rx_deadline)) {
                         /* Timed out incomplete incoming message */
                         atomic_inc (&conn->ksnc_refcount);
-                        CERROR ("Timed out RX from "LPX64" %p\n", 
-                                peer->ksnp_nid, conn);
+                        CERROR ("Timed out RX from "LPX64" %p %d.%d.%d.%d\n",
+                                peer->ksnp_nid, conn, HIPQUAD(conn->ksnc_ipaddr));
                         return (conn);
                 }
                 
@@ -2489,10 +2495,11 @@ ksocknal_find_timed_out_conn (ksock_peer_t *peer)
                         /* Timed out messages queued for sending, or
                          * messages buffered in the socket's send buffer */
                         atomic_inc (&conn->ksnc_refcount);
-                        CERROR ("Timed out TX to "LPX64" %s%d %p\n", 
+                        CERROR ("Timed out TX to "LPX64" %s%d %p %d.%d.%d.%d\n", 
                                 peer->ksnp_nid, 
                                 list_empty (&conn->ksnc_tx_queue) ? "" : "Q ",
-                                conn->ksnc_sock->sk->sk_wmem_queued, conn);
+                                conn->ksnc_sock->sk->sk_wmem_queued, conn,
+                                HIPQUAD(conn->ksnc_ipaddr));
                         return (conn);
                 }
         }
@@ -2521,8 +2528,9 @@ ksocknal_check_peer_timeouts (int idx)
                 if (conn != NULL) {
                         read_unlock (&ksocknal_data.ksnd_global_lock);
 
-                        CERROR ("Timeout out conn->"LPX64" ip %x:%d\n",
-                                peer->ksnp_nid, conn->ksnc_ipaddr,
+                        CERROR ("Timeout out conn->"LPX64" ip %d.%d.%d.%d:%d\n",
+                                peer->ksnp_nid,
+                                HIPQUAD(conn->ksnc_ipaddr),
                                 conn->ksnc_port);
                         ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT);
                         
@@ -2679,7 +2687,6 @@ nal_cb_t ksocknal_lib = {
         cb_recv_pages:   ksocknal_recv_pages,
         cb_read:         ksocknal_read,
         cb_write:        ksocknal_write,
-        cb_callback:     ksocknal_callback,
         cb_malloc:       ksocknal_malloc,
         cb_free:         ksocknal_free,
         cb_printf:       ksocknal_printf,