Whamcloud - gitweb
* 12299 - fixed spurious timeout on userspace ptllnd NOOP messages.
[fs/lustre-release.git] / lnet / ulnds / ptllnd / ptllnd_cb.c
index 0114c42..0c6f521 100644 (file)
 
 #include "ptllnd.h"
 
+void
+ptllnd_set_tx_deadline(ptllnd_tx_t *tx)
+{
+        ptllnd_peer_t  *peer = tx->tx_peer;
+        lnet_ni_t      *ni = peer->plp_ni;
+        ptllnd_ni_t    *plni = ni->ni_data;
+
+        tx->tx_deadline = cfs_time_current_sec() + plni->plni_timeout;
+}
+
+void
+ptllnd_post_tx(ptllnd_tx_t *tx)
+{
+        ptllnd_peer_t  *peer = tx->tx_peer;
+
+        ptllnd_set_tx_deadline(tx);
+        list_add_tail(&tx->tx_list, &peer->plp_txq);
+        ptllnd_check_sends(peer);
+}
+
 char *
 ptllnd_ptlid2str(ptl_process_id_t id)
 {
@@ -38,6 +58,10 @@ ptllnd_destroy_peer(ptllnd_peer_t *peer)
 {
         lnet_ni_t         *ni = peer->plp_ni;
         ptllnd_ni_t       *plni = ni->ni_data;
+        int                nmsg = peer->plp_lazy_credits +
+                                  plni->plni_peer_credits;
+
+        ptllnd_size_buffers(ni, -nmsg);
 
         LASSERT (peer->plp_closing);
         LASSERT (plni->plni_npeers > 0);
@@ -73,8 +97,9 @@ ptllnd_close_peer(ptllnd_peer_t *peer, int error)
         if (!list_empty(&peer->plp_txq) ||
             !list_empty(&peer->plp_activeq) ||
             error != 0) {
-                CERROR("Closing %s\n", libcfs_id2str(peer->plp_id));
-                ptllnd_debug_peer(ni, peer->plp_id);
+                CWARN("Closing %s\n", libcfs_id2str(peer->plp_id));
+                if (plni->plni_debug)
+                        ptllnd_dump_debug(ni, peer->plp_id);
         }
         
         ptllnd_abort_txs(plni, &peer->plp_txq);
@@ -111,7 +136,7 @@ ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
 
         /* New peer: check first for enough posted buffers */
         plni->plni_npeers++;
-        rc = ptllnd_grow_buffers(ni);
+        rc = ptllnd_size_buffers(ni, plni->plni_peer_credits);
         if (rc != 0) {
                 plni->plni_npeers--;
                 return NULL;
@@ -121,19 +146,20 @@ ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
         if (plp == NULL) {
                 CERROR("Can't allocate new peer %s\n", libcfs_id2str(id));
                 plni->plni_npeers--;
+                ptllnd_size_buffers(ni, -plni->plni_peer_credits);
                 return NULL;
         }
 
-        CDEBUG(D_NET, "new peer=%p\n",plp);
-
         plp->plp_ni = ni;
         plp->plp_id = id;
         plp->plp_ptlid.nid = LNET_NIDADDR(id.nid);
         plp->plp_ptlid.pid = plni->plni_ptllnd_pid;
-        plp->plp_max_credits =
         plp->plp_credits = 1; /* add more later when she gives me credits */
         plp->plp_max_msg_size = plni->plni_max_msg_size; /* until I hear from her */
+        plp->plp_sent_credits = 1;              /* Implicit credit for HELLO */
         plp->plp_outstanding_credits = plni->plni_peer_credits - 1;
+        plp->plp_lazy_credits = 0;
+        plp->plp_extra_lazy_credits = 0;
         plp->plp_match = 0;
         plp->plp_stamp = 0;
         plp->plp_recvd_hello = 0;
@@ -157,6 +183,12 @@ ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
         tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS;
         tx->tx_msg.ptlm_u.hello.kptlhm_max_msg_size = plni->plni_max_msg_size;
 
+        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post hello %p", libcfs_id2str(id),
+                       tx->tx_peer->plp_credits,
+                       tx->tx_peer->plp_outstanding_credits,
+                       tx->tx_peer->plp_sent_credits,
+                       plni->plni_peer_credits + 
+                       tx->tx_peer->plp_lazy_credits, tx);
         ptllnd_post_tx(tx);
 
         return plp;
@@ -208,12 +240,14 @@ ptllnd_tx_typestr(int type)
 void
 ptllnd_debug_tx(ptllnd_tx_t *tx) 
 {
-        CDEBUG(D_WARNING, "%s %s b "DBGT_FMT"/"DBGT_FMT
-               " r "DBGT_FMT"/"DBGT_FMT" status %d\n",
+        CDEBUG(D_WARNING, "%s %s b %ld.%06ld/%ld.%06ld"
+               " r %ld.%06ld/%ld.%06ld status %d\n",
                ptllnd_tx_typestr(tx->tx_type),
-               libcfs_id2str(tx->tx_peer->plp_id)
-               DBGT_ARGS(tx->tx_bulk_posted) DBGT_ARGS(tx->tx_bulk_done)
-               DBGT_ARGS(tx->tx_req_posted) DBGT_ARGS(tx->tx_req_done),
+               libcfs_id2str(tx->tx_peer->plp_id),
+               tx->tx_bulk_posted.tv_sec, tx->tx_bulk_posted.tv_usec, 
+               tx->tx_bulk_done.tv_sec, tx->tx_bulk_done.tv_usec,
+               tx->tx_req_posted.tv_sec, tx->tx_req_posted.tv_usec,
+               tx->tx_req_done.tv_sec, tx->tx_req_done.tv_usec,
                tx->tx_status);
 }
 
@@ -230,7 +264,7 @@ ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
                 return;
         }
         
-        CDEBUG(D_WARNING, "%s %s%s [%d] "LPD64".%06d m "LPD64" q %d/%d c %d/%d(%d)\n",
+        CDEBUG(D_WARNING, "%s %s%s [%d] "LPU64".%06d m "LPU64" q %d/%d c %d/%d+%d(%d)\n",
                libcfs_id2str(id), 
                plp->plp_recvd_hello ? "H" : "_",
                plp->plp_closing     ? "C" : "_",
@@ -239,7 +273,8 @@ ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
                plp->plp_match,
                ptllnd_count_q(&plp->plp_txq),
                ptllnd_count_q(&plp->plp_activeq),
-               plp->plp_credits, plp->plp_outstanding_credits, plp->plp_max_credits);
+               plp->plp_credits, plp->plp_outstanding_credits, plp->plp_sent_credits,
+               plni->plni_peer_credits + plp->plp_lazy_credits);
 
         CDEBUG(D_WARNING, "txq:\n");
         list_for_each (tmp, &plp->plp_txq) {
@@ -277,18 +312,23 @@ ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
 }
 
 void
+ptllnd_dump_debug(lnet_ni_t *ni, lnet_process_id_t id)
+{
+        ptllnd_debug_peer(ni, id);
+        ptllnd_dump_history();
+}
+
+void
 ptllnd_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive)
 {
         lnet_process_id_t  id;
         ptllnd_peer_t     *peer;
         time_t             start = cfs_time_current_sec();
-        int                w = PTLLND_WARN_LONG_WAIT;
-        
+        ptllnd_ni_t       *plni = ni->ni_data;
+        int                w = plni->plni_long_wait;
+
         /* This is only actually used to connect to routers at startup! */
-        if (!alive) {
-                LBUG();
-                return;
-        }
+        LASSERT(alive);
 
         id.nid = nid;
         id.pid = LUSTRE_SRV_LNET_PID;
@@ -299,18 +339,59 @@ ptllnd_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive)
 
         /* wait for the peer to reply */
         while (!peer->plp_recvd_hello) {
-                if (cfs_time_current_sec() > start + w) {
+                if (w > 0 && cfs_time_current_sec() > start + w/1000) {
                         CWARN("Waited %ds to connect to %s\n",
-                              w, libcfs_id2str(id));
+                              (int)(cfs_time_current_sec() - start),
+                              libcfs_id2str(id));
                         w *= 2;
                 }
                 
-                ptllnd_wait(ni, w*1000);
+                ptllnd_wait(ni, w);
         }
         
         ptllnd_peer_decref(peer);
 }
 
+int
+ptllnd_setasync(lnet_ni_t *ni, lnet_process_id_t id, int nasync)
+{
+        ptllnd_peer_t *peer = ptllnd_find_peer(ni, id, nasync > 0);
+        int            rc;
+        
+        if (peer == NULL)
+                return -ENOMEM;
+
+        LASSERT (peer->plp_lazy_credits >= 0);
+        LASSERT (peer->plp_extra_lazy_credits >= 0);
+
+        /* If nasync < 0, we're being told we can reduce the total message
+         * headroom.  We can't do this right now because our peer might already
+         * have credits for the extra buffers, so we just account the extra
+         * headroom in case we need it later and only destroy buffers when the
+         * peer closes.
+         *
+         * Note that the following condition handles this case, where it
+         * actually increases the extra lazy credit counter. */
+
+        if (nasync <= peer->plp_extra_lazy_credits) {
+                peer->plp_extra_lazy_credits -= nasync;
+                return 0;
+        }
+
+        LASSERT (nasync > 0);
+
+        nasync -= peer->plp_extra_lazy_credits;
+        peer->plp_extra_lazy_credits = 0;
+        
+        rc = ptllnd_size_buffers(ni, nasync);
+        if (rc == 0) {
+                peer->plp_lazy_credits += nasync;
+                peer->plp_outstanding_credits += nasync;
+        }
+
+        return rc;
+}
+
 __u32
 ptllnd_cksum (void *ptr, int nob)
 {
@@ -332,7 +413,7 @@ ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
         ptllnd_tx_t *tx;
         int          msgsize;
 
-        CDEBUG(D_NET, "peer=%p type=%d payload=%d\n",peer,type,payload_nob);
+        CDEBUG(D_NET, "peer=%p type=%d payload=%d\n", peer, type, payload_nob);
 
         switch (type) {
         default:
@@ -371,8 +452,6 @@ ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
         msgsize = (msgsize + 7) & ~7;
         LASSERT (msgsize <= peer->plp_max_msg_size);
 
-        CDEBUG(D_NET, "msgsize=%d\n",msgsize);
-
         LIBCFS_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize);
 
         if (tx == NULL) {
@@ -393,10 +472,10 @@ ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
         tx->tx_completing = 0;
         tx->tx_status = 0;
 
-        PTLLND_DBGT_INIT(tx->tx_bulk_posted);
-        PTLLND_DBGT_INIT(tx->tx_bulk_done);
-        PTLLND_DBGT_INIT(tx->tx_req_posted);
-        PTLLND_DBGT_INIT(tx->tx_req_done);
+        memset(&tx->tx_bulk_posted, 0, sizeof(tx->tx_bulk_posted));
+        memset(&tx->tx_bulk_done, 0, sizeof(tx->tx_bulk_done));
+        memset(&tx->tx_req_posted, 0, sizeof(tx->tx_req_posted));
+        memset(&tx->tx_req_done, 0, sizeof(tx->tx_req_done));
 
         if (msgsize != 0) {
                 tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC;
@@ -428,7 +507,8 @@ ptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh)
         lnet_ni_t       *ni = peer->plp_ni;
         int              rc;
         time_t           start = cfs_time_current_sec();
-        int              w = PTLLND_WARN_LONG_WAIT;
+        ptllnd_ni_t     *plni = ni->ni_data;
+        int              w = plni->plni_long_wait;
 
         while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) {
                 rc = PtlMDUnlink(*mdh);
@@ -437,13 +517,14 @@ ptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh)
                         return;
                 LASSERT (rc == PTL_MD_IN_USE);
 #endif
-                if (cfs_time_current_sec() > start + w) {
+                if (w > 0 && cfs_time_current_sec() > start + w/1000) {
                         CWARN("Waited %ds to abort tx to %s\n",
-                              w, libcfs_id2str(peer->plp_id));
+                              (int)(cfs_time_current_sec() - start),
+                              libcfs_id2str(peer->plp_id));
                         w *= 2;
                 }
                 /* Wait for ptllnd_tx_event() to invalidate */
-                ptllnd_wait(ni, w*1000);
+                ptllnd_wait(ni, w);
         }
 }
 
@@ -487,8 +568,11 @@ ptllnd_tx_done(ptllnd_tx_t *tx)
                 list_del_init(&tx->tx_list);
 
         if (tx->tx_status != 0) {
-                CERROR("Completing tx with error\n");
-                ptllnd_debug_tx(tx);
+                if (plni->plni_debug) {
+                        CERROR("Completing tx for %s with error %d\n",
+                               libcfs_id2str(peer->plp_id), tx->tx_status);
+                        ptllnd_debug_tx(tx);
+                }
                 ptllnd_close_peer(peer, tx->tx_status);
         }
         
@@ -530,11 +614,6 @@ ptllnd_set_txiov(ptllnd_tx_t *tx,
                 return 0;
         }
 
-        CDEBUG(D_NET, "niov  =%d\n",niov);
-        CDEBUG(D_NET, "offset=%d\n",offset);
-        CDEBUG(D_NET, "len   =%d\n",len);
-
-
         /*
          * Remove iovec's at the beginning that
          * are skipped because of the offset.
@@ -549,10 +628,6 @@ ptllnd_set_txiov(ptllnd_tx_t *tx,
                 iov++;
         }
 
-        CDEBUG(D_NET, "niov  =%d (after)\n",niov);
-        CDEBUG(D_NET, "offset=%d (after)\n",offset);
-        CDEBUG(D_NET, "len   =%d (after)\n",len);
-
         for (;;) {
                 int temp_offset = offset;
                 int resid = len;
@@ -561,11 +636,6 @@ ptllnd_set_txiov(ptllnd_tx_t *tx,
                         return -ENOMEM;
 
                 for (npiov = 0;; npiov++) {
-                        CDEBUG(D_NET, "npiov=%d\n",npiov);
-                        CDEBUG(D_NET, "offset=%d\n",temp_offset);
-                        CDEBUG(D_NET, "len=%d\n",resid);
-                        CDEBUG(D_NET, "iov[npiov].iov_len=%d\n",iov[npiov].iov_len);
-
                         LASSERT (npiov < niov);
                         LASSERT (iov->iov_len >= temp_offset);
 
@@ -584,8 +654,6 @@ ptllnd_set_txiov(ptllnd_tx_t *tx,
                 if (npiov == niov) {
                         tx->tx_niov = niov;
                         tx->tx_iov = piov;
-                        CDEBUG(D_NET, "tx->tx_iov=%p\n",tx->tx_iov);
-                        CDEBUG(D_NET, "tx->tx_niov=%d\n",tx->tx_niov);
                         return 0;
                 }
 
@@ -645,7 +713,8 @@ ptllnd_post_buffer(ptllnd_buffer_t *buf)
                          anyid, LNET_MSG_MATCHBITS, 0,
                          PTL_UNLINK, PTL_INS_AFTER, &meh);
         if (rc != PTL_OK) {
-                CERROR("PtlMEAttach failed: %d\n", rc);
+                CERROR("PtlMEAttach failed: %s(%d)\n",
+                       ptllnd_errtype2str(rc), rc);
                 return -ENOMEM;
         }
 
@@ -656,7 +725,8 @@ ptllnd_post_buffer(ptllnd_buffer_t *buf)
         if (rc == PTL_OK)
                 return 0;
 
-        CERROR("PtlMDAttach failed: %d\n", rc);
+        CERROR("PtlMDAttach failed: %s(%d)\n",
+               ptllnd_errtype2str(rc), rc);
 
         buf->plb_posted = 0;
         plni->plni_nposted_buffers--;
@@ -677,11 +747,14 @@ ptllnd_check_sends(ptllnd_peer_t *peer)
         ptl_handle_md_t mdh;
         int             rc;
 
-        CDEBUG(D_NET, "plp_outstanding_credits=%d\n",peer->plp_outstanding_credits);
+        CDEBUG(D_NET, "%s: [%d/%d+%d(%d)\n",
+               libcfs_id2str(peer->plp_id), peer->plp_credits,
+               peer->plp_outstanding_credits, peer->plp_sent_credits,
+               plni->plni_peer_credits + peer->plp_lazy_credits);
 
         if (list_empty(&peer->plp_txq) &&
-            peer->plp_outstanding_credits >=
-            PTLLND_CREDIT_HIGHWATER(plni)) {
+            peer->plp_outstanding_credits >= PTLLND_CREDIT_HIGHWATER(plni) &&
+            peer->plp_credits != 0) {
 
                 tx = ptllnd_new_tx(peer, PTLLND_MSG_TYPE_NOOP, 0);
                 CDEBUG(D_NET, "NOOP tx=%p\n",tx);
@@ -689,6 +762,7 @@ ptllnd_check_sends(ptllnd_peer_t *peer)
                         CERROR("Can't return credits to %s\n",
                                libcfs_id2str(peer->plp_id));
                 } else {
+                        ptllnd_set_tx_deadline(tx);
                         list_add_tail(&tx->tx_list, &peer->plp_txq);
                 }
         }
@@ -696,25 +770,37 @@ ptllnd_check_sends(ptllnd_peer_t *peer)
         while (!list_empty(&peer->plp_txq)) {
                 tx = list_entry(peer->plp_txq.next, ptllnd_tx_t, tx_list);
 
-                CDEBUG(D_NET, "Looking at TX=%p\n",tx);
-                CDEBUG(D_NET, "plp_credits=%d\n",peer->plp_credits);
-                CDEBUG(D_NET, "plp_outstanding_credits=%d\n",peer->plp_outstanding_credits);
-
                 LASSERT (tx->tx_msgsize > 0);
 
                 LASSERT (peer->plp_outstanding_credits >= 0);
-                LASSERT (peer->plp_outstanding_credits <=
-                         plni->plni_peer_credits);
+                LASSERT (peer->plp_sent_credits >= 0);
+                LASSERT (peer->plp_outstanding_credits + peer->plp_sent_credits
+                         <= plni->plni_peer_credits + peer->plp_lazy_credits);
                 LASSERT (peer->plp_credits >= 0);
-                LASSERT (peer->plp_credits <= peer->plp_max_credits);
 
-                if (peer->plp_credits == 0)     /* no credits */
+                if (peer->plp_credits == 0) {   /* no credits */
+                        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: no creds for %p",
+                                       libcfs_id2str(peer->plp_id),
+                                       peer->plp_credits,
+                                       peer->plp_outstanding_credits,
+                                       peer->plp_sent_credits,
+                                       plni->plni_peer_credits +
+                                       peer->plp_lazy_credits, tx);
                         break;
-
+                }
+                
                 if (peer->plp_credits == 1 &&   /* last credit reserved for */
-                    peer->plp_outstanding_credits == 0) /* returning credits */
+                    peer->plp_outstanding_credits == 0) { /* returning credits */
+                        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: too few creds for %p",
+                                       libcfs_id2str(peer->plp_id),
+                                       peer->plp_credits,
+                                       peer->plp_outstanding_credits,
+                                       peer->plp_sent_credits,
+                                       plni->plni_peer_credits +
+                                       peer->plp_lazy_credits, tx);
                         break;
-
+                }
+                
                 list_del(&tx->tx_list);
                 list_add_tail(&tx->tx_list, &peer->plp_activeq);
 
@@ -734,12 +820,11 @@ ptllnd_check_sends(ptllnd_peer_t *peer)
                  * until I receive the HELLO back */
                 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
 
-                CDEBUG(D_NET, "Returning %d to peer\n",peer->plp_outstanding_credits);
-
                 /*
                  * Return all the credits we have
                  */
                 tx->tx_msg.ptlm_credits = peer->plp_outstanding_credits;
+                peer->plp_sent_credits += peer->plp_outstanding_credits;
                 peer->plp_outstanding_credits = 0;
 
                 /*
@@ -761,21 +846,36 @@ ptllnd_check_sends(ptllnd_peer_t *peer)
 
                 rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
                 if (rc != PTL_OK) {
-                        CERROR("PtlMDBind for %s failed: %d\n",
-                               libcfs_id2str(peer->plp_id), rc);
+                        CERROR("PtlMDBind for %s failed: %s(%d)\n",
+                               libcfs_id2str(peer->plp_id),
+                               ptllnd_errtype2str(rc), rc);
                         tx->tx_status = -EIO;
                         ptllnd_tx_done(tx);
                         break;
                 }
 
+                LASSERT (tx->tx_type != PTLLND_RDMA_WRITE &&
+                         tx->tx_type != PTLLND_RDMA_READ);
+                
                 tx->tx_reqmdh = mdh;
-                PTLLND_DBGT_STAMP(tx->tx_req_posted);
+                gettimeofday(&tx->tx_req_posted, NULL);
+
+                PTLLND_HISTORY("%s[%d/%d+%d(%d)]: %s %p c %d",
+                               libcfs_id2str(peer->plp_id),
+                               peer->plp_credits,
+                               peer->plp_outstanding_credits,
+                               peer->plp_sent_credits,
+                               plni->plni_peer_credits +
+                               peer->plp_lazy_credits,
+                               ptllnd_msgtype2str(tx->tx_type), tx,
+                               tx->tx_msg.ptlm_credits);
 
                 rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
                             plni->plni_portal, 0, LNET_MSG_MATCHBITS, 0, 0);
                 if (rc != PTL_OK) {
-                        CERROR("PtlPut for %s failed: %d\n",
-                               libcfs_id2str(peer->plp_id), rc);
+                        CERROR("PtlPut for %s failed: %s(%d)\n",
+                               libcfs_id2str(peer->plp_id),
+                               ptllnd_errtype2str(rc), rc);
                         tx->tx_status = -EIO;
                         ptllnd_tx_done(tx);
                         break;
@@ -832,49 +932,43 @@ ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
         ptllnd_set_md_buffer(&md, tx);
 
         start = cfs_time_current_sec();
-        w = PTLLND_WARN_LONG_WAIT;
+        w = plni->plni_long_wait;
 
         while (!peer->plp_recvd_hello) {        /* wait to validate plp_match */
                 if (peer->plp_closing) {
                         rc = -EIO;
                         goto failed;
                 }
-                if (cfs_time_current_sec() > start + w) {
+                if (w > 0 && cfs_time_current_sec() > start + w/1000) {
                         CWARN("Waited %ds to connect to %s\n",
-                              w, libcfs_id2str(peer->plp_id));
+                              (int)(cfs_time_current_sec() - start),
+                              libcfs_id2str(peer->plp_id));
                         w *= 2;
                 }
-                ptllnd_wait(ni, w*1000);
+                ptllnd_wait(ni, w);
         }
 
         if (peer->plp_match < PTL_RESERVED_MATCHBITS)
                 peer->plp_match = PTL_RESERVED_MATCHBITS;
         matchbits = peer->plp_match++;
-        CDEBUG(D_NET, "matchbits " LPX64 " %s\n", matchbits,
-               ptllnd_ptlid2str(peer->plp_ptlid));
 
         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid,
                          matchbits, 0, PTL_UNLINK, PTL_INS_BEFORE, &meh);
         if (rc != PTL_OK) {
-                CERROR("PtlMEAttach for %s failed: %d\n",
-                       libcfs_id2str(peer->plp_id), rc);
+                CERROR("PtlMEAttach for %s failed: %s(%d)\n",
+                       libcfs_id2str(peer->plp_id),
+                       ptllnd_errtype2str(rc), rc);
                 rc = -EIO;
                 goto failed;
         }
 
-        CDEBUG(D_NET, "md.start=%p\n",md.start);
-        CDEBUG(D_NET, "md.length=%d\n",md.length);
-        CDEBUG(D_NET, "md.threshold=%d\n",md.threshold);
-        CDEBUG(D_NET, "md.max_size=%d\n",md.max_size);
-        CDEBUG(D_NET, "md.options=0x%x\n",md.options);
-        CDEBUG(D_NET, "md.user_ptr=%p\n",md.user_ptr);
-
-        PTLLND_DBGT_STAMP(tx->tx_bulk_posted);
+        gettimeofday(&tx->tx_bulk_posted, NULL);
 
         rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);
         if (rc != PTL_OK) {
-                CERROR("PtlMDAttach for %s failed: %d\n",
-                       libcfs_id2str(peer->plp_id), rc);
+                CERROR("PtlMDAttach for %s failed: %s(%d)\n",
+                       libcfs_id2str(peer->plp_id),
+                       ptllnd_errtype2str(rc), rc);
                 rc2 = PtlMEUnlink(meh);
                 LASSERT (rc2 == PTL_OK);
                 rc = -EIO;
@@ -903,6 +997,17 @@ ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
         }
 
         tx->tx_lnetmsg = msg;
+        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post passive %s p %d %p",
+                       libcfs_id2str(msg->msg_target),
+                       peer->plp_credits, peer->plp_outstanding_credits,
+                       peer->plp_sent_credits,
+                       plni->plni_peer_credits + peer->plp_lazy_credits,
+                       lnet_msgtyp2str(msg->msg_type),
+                       (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ? 
+                       le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
+                       (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ? 
+                       le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
+                       tx);
         ptllnd_post_tx(tx);
         return 0;
 
@@ -953,8 +1058,9 @@ ptllnd_active_rdma(ptllnd_peer_t *peer, int type,
 
         rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
         if (rc != PTL_OK) {
-                CERROR("PtlMDBind for %s failed: %d\n",
-                       libcfs_id2str(peer->plp_id), rc);
+                CERROR("PtlMDBind for %s failed: %s(%d)\n",
+                       libcfs_id2str(peer->plp_id),
+                       ptllnd_errtype2str(rc), rc);
                 rc = -EIO;
                 goto failed;
         }
@@ -962,8 +1068,9 @@ ptllnd_active_rdma(ptllnd_peer_t *peer, int type,
         tx->tx_bulkmdh = mdh;
         tx->tx_lnetmsg = msg;
 
+        ptllnd_set_tx_deadline(tx);
         list_add_tail(&tx->tx_list, &peer->plp_activeq);
-        PTLLND_DBGT_STAMP(tx->tx_bulk_posted);
+        gettimeofday(&tx->tx_bulk_posted, NULL);
 
         if (type == PTLLND_RDMA_READ)
                 rc = PtlGet(mdh, peer->plp_ptlid,
@@ -976,8 +1083,9 @@ ptllnd_active_rdma(ptllnd_peer_t *peer, int type,
         if (rc == PTL_OK)
                 return 0;
 
-        CERROR("Can't initiate RDMA with %s: %d\n",
-               libcfs_id2str(peer->plp_id), rc);
+        CERROR("Can't initiate RDMA with %s: %s(%d)\n",
+               libcfs_id2str(peer->plp_id),
+               ptllnd_errtype2str(rc), rc);
 
         tx->tx_lnetmsg = NULL;
  failed:
@@ -1021,14 +1129,10 @@ ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
                 LBUG();
 
         case LNET_MSG_ACK:
-                CDEBUG(D_NET, "LNET_MSG_ACK\n");
-
                 LASSERT (msg->msg_len == 0);
                 break;                          /* send IMMEDIATE */
 
         case LNET_MSG_GET:
-                CDEBUG(D_NET, "LNET_MSG_GET nob=%d\n",msg->msg_md->md_length);
-
                 if (msg->msg_target_is_router)
                         break;                  /* send IMMEDIATE */
 
@@ -1047,10 +1151,8 @@ ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
 
         case LNET_MSG_REPLY:
         case LNET_MSG_PUT:
-                CDEBUG(D_NET, "LNET_MSG_PUT nob=%d\n",msg->msg_len);
                 nob = msg->msg_len;
                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
-                CDEBUG(D_NET, "msg_size=%d max=%d\n",msg->msg_len,plp->plp_max_msg_size);
                 if (nob <= plp->plp_max_msg_size)
                         break;                  /* send IMMEDIATE */
 
@@ -1064,7 +1166,6 @@ ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
         /* send IMMEDIATE
          * NB copy the payload so we don't have to do a fragmented send */
 
-        CDEBUG(D_NET, "IMMEDIATE len=%d\n", msg->msg_len);
         tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_IMMEDIATE, msg->msg_len);
         if (tx == NULL) {
                 CERROR("Can't allocate tx for lnet type %d to %s\n",
@@ -1080,6 +1181,17 @@ ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
         tx->tx_msg.ptlm_u.immediate.kptlim_hdr = msg->msg_hdr;
 
         tx->tx_lnetmsg = msg;
+        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post immediate %s p %d %p",
+                       libcfs_id2str(msg->msg_target),
+                       plp->plp_credits, plp->plp_outstanding_credits,
+                       plp->plp_sent_credits,
+                       plni->plni_peer_credits + plp->plp_lazy_credits,
+                       lnet_msgtyp2str(msg->msg_type),
+                       (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ? 
+                       le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
+                       (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ? 
+                       le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,
+                       tx);
         ptllnd_post_tx(tx);
         ptllnd_peer_decref(plp);
         return 0;
@@ -1092,9 +1204,14 @@ ptllnd_rx_done(ptllnd_rx_t *rx)
         lnet_ni_t     *ni = plp->plp_ni;
         ptllnd_ni_t   *plni = ni->ni_data;
 
-        CDEBUG(D_NET, "rx=%p\n", rx);
-
         plp->plp_outstanding_credits++;
+
+        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: rx=%p done\n",
+                       libcfs_id2str(plp->plp_id),
+                       plp->plp_credits, plp->plp_outstanding_credits, 
+                       plp->plp_sent_credits,
+                       plni->plni_peer_credits + plp->plp_lazy_credits, rx);
+
         ptllnd_check_sends(rx->rx_peer);
 
         LASSERT (plni->plni_nrxs > 0);
@@ -1129,7 +1246,6 @@ ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
 
         case PTLLND_MSG_TYPE_IMMEDIATE:
                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[mlen]);
-                CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE nob=%d\n",nob);
                 if (nob > rx->rx_nob) {
                         CERROR("Immediate message from %s too big: %d(%d)\n",
                                libcfs_id2str(rx->rx_peer->plp_id),
@@ -1145,14 +1261,12 @@ ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
                 break;
 
         case PTLLND_MSG_TYPE_PUT:
-                CDEBUG(D_NET, "PTLLND_MSG_TYPE_PUT offset=%d mlen=%d\n",offset,mlen);
                 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg,
                                         rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
                                         niov, iov, offset, mlen);
                 break;
 
         case PTLLND_MSG_TYPE_GET:
-                CDEBUG(D_NET, "PTLLND_MSG_TYPE_GET\n");
                 if (msg != NULL)
                         rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, msg,
                                                 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
@@ -1170,15 +1284,6 @@ ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
 }
 
 void
-ptllnd_abort_on_nak(lnet_ni_t *ni)
-{
-        ptllnd_ni_t      *plni = ni->ni_data;
-
-        if (plni->plni_abort_on_nak)
-                abort();
-}
-
-void
 ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
                      kptl_msg_t *msg, unsigned int nob)
 {
@@ -1210,9 +1315,12 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
         msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
 
         if (msg_version != PTLLND_MSG_VERSION) {
-                CERROR("Bad protocol version %04x from %s\n", 
-                       (__u32)msg_version, ptllnd_ptlid2str(initiator));
-                ptllnd_abort_on_nak(ni);
+                CERROR("Bad protocol version %04x from %s: %04x expected\n", 
+                       (__u32)msg_version, ptllnd_ptlid2str(initiator), PTLLND_MSG_VERSION);
+
+                if (plni->plni_abort_on_protocol_mismatch)
+                        abort();
+
                 return;
         }
 
@@ -1260,7 +1368,13 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
                 CERROR("NAK from %s (%s)\n", 
                        libcfs_id2str(srcid),
                        ptllnd_ptlid2str(initiator));
-                ptllnd_abort_on_nak(ni);
+
+                if (plni->plni_dump_on_nak)
+                        ptllnd_dump_debug(ni, srcid);
+                
+                if (plni->plni_abort_on_nak)
+                        abort();
+                
                 return;
         }
         
@@ -1283,12 +1397,14 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
                        libcfs_id2str(srcid));
                 return;
         }
-       
+
+        PTLLND_HISTORY("RX %s: %s %d %p", libcfs_id2str(srcid), 
+                       ptllnd_msgtype2str(msg->ptlm_type),
+                       msg->ptlm_credits, &rx);
+
         switch (msg->ptlm_type) {
         case PTLLND_MSG_TYPE_PUT:
         case PTLLND_MSG_TYPE_GET:
-                CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
-                        msg->ptlm_type==PTLLND_MSG_TYPE_PUT ? "PUT" : "GET");
                 if (nob < basenob + sizeof(kptl_rdma_msg_t)) {
                         CERROR("Short rdma request from %s(%s)\n",
                                libcfs_id2str(srcid),
@@ -1300,7 +1416,6 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
                 break;
 
         case PTLLND_MSG_TYPE_IMMEDIATE:
-                CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
                 if (nob < offsetof(kptl_msg_t,
                                    ptlm_u.immediate.kptlim_payload)) {
                         CERROR("Short immediate from %s(%s)\n",
@@ -1311,9 +1426,6 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
                 break;
 
         case PTLLND_MSG_TYPE_HELLO:
-                CDEBUG(D_NET, "PTLLND_MSG_TYPE_HELLO from %s(%s)\n",
-                               libcfs_id2str(srcid),
-                               ptllnd_ptlid2str(initiator));
                 if (nob < basenob + sizeof(kptl_hello_msg_t)) {
                         CERROR("Short hello from %s(%s)\n",
                                libcfs_id2str(srcid),
@@ -1327,9 +1439,6 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
                 break;
                 
         case PTLLND_MSG_TYPE_NOOP:
-                CDEBUG(D_NET, "PTLLND_MSG_TYPE_NOOP from %s(%s)\n",
-                               libcfs_id2str(srcid),
-                               ptllnd_ptlid2str(initiator));        
                 break;
 
         default:
@@ -1339,8 +1448,7 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
                 return;
         }
 
-        plp = ptllnd_find_peer(ni, srcid,
-                               msg->ptlm_type == PTLLND_MSG_TYPE_HELLO);
+        plp = ptllnd_find_peer(ni, srcid, 0);
         if (plp == NULL) {
                 CERROR("Can't find peer %s\n", libcfs_id2str(srcid));
                 return;
@@ -1354,20 +1462,11 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
                         return;
                 }
 
-                CDEBUG(D_NET, "maxsz %d match "LPX64" stamp "LPX64"\n",
-                       msg->ptlm_u.hello.kptlhm_max_msg_size,
-                       msg->ptlm_u.hello.kptlhm_matchbits,
-                       msg->ptlm_srcstamp);
-
-                plp->plp_max_msg_size = MAX(plni->plni_max_msg_size,
-                        msg->ptlm_u.hello.kptlhm_max_msg_size);
+                plp->plp_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
                 plp->plp_match = msg->ptlm_u.hello.kptlhm_matchbits;
                 plp->plp_stamp = msg->ptlm_srcstamp;
-                plp->plp_max_credits += msg->ptlm_credits;
                 plp->plp_recvd_hello = 1;
 
-                CDEBUG(D_NET, "plp_max_msg_size=%d\n",plp->plp_max_msg_size);
-
         } else if (!plp->plp_recvd_hello) {
 
                 CERROR("Bad message type %d (HELLO expected) from %s\n",
@@ -1384,18 +1483,21 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
                 return;
         }
 
+        /* Check peer only sends when I've sent her credits */
+        if (plp->plp_sent_credits == 0) {
+                CERROR("%s[%d/%d+%d(%d)]: unexpected message\n",
+                       libcfs_id2str(plp->plp_id),
+                       plp->plp_credits, plp->plp_outstanding_credits, 
+                       plp->plp_sent_credits,
+                       plni->plni_peer_credits + plp->plp_lazy_credits);
+                return;
+        }
+        plp->plp_sent_credits--;
+        
+        /* No check for credit overflow - the peer may post new buffers after
+         * the startup handshake. */
         if (msg->ptlm_credits > 0) {
-                CDEBUG(D_NET, "Getting back %d credits from peer\n",msg->ptlm_credits);
-                if (plp->plp_credits + msg->ptlm_credits >
-                    plp->plp_max_credits) {
-                        CWARN("Too many credits from %s: %d + %d > %d\n",
-                              libcfs_id2str(srcid),
-                              plp->plp_credits, msg->ptlm_credits,
-                              plp->plp_max_credits);
-                        plp->plp_credits = plp->plp_max_credits;
-                } else {
-                        plp->plp_credits += msg->ptlm_credits;
-                }
+                plp->plp_credits += msg->ptlm_credits;
                 ptllnd_check_sends(plp);
         }
 
@@ -1406,8 +1508,6 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
         rx.rx_nob       = nob;
         plni->plni_nrxs++;
 
-        CDEBUG(D_NET, "rx=%p type=%d\n",&rx,msg->ptlm_type);
-
         switch (msg->ptlm_type) {
         default: /* message types have been checked already */
                 ptllnd_rx_done(&rx);
@@ -1415,20 +1515,15 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
 
         case PTLLND_MSG_TYPE_PUT:
         case PTLLND_MSG_TYPE_GET:
-                CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
-                        msg->ptlm_type==PTLLND_MSG_TYPE_PUT ? "PUT" : "GET");
                 rc = lnet_parse(ni, &msg->ptlm_u.rdma.kptlrm_hdr,
                                 msg->ptlm_srcnid, &rx, 1);
-                CDEBUG(D_NET, "lnet_parse rc=%d\n",rc);
                 if (rc < 0)
                         ptllnd_rx_done(&rx);
                 break;
 
         case PTLLND_MSG_TYPE_IMMEDIATE:
-                CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
                 rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,
                                 msg->ptlm_srcnid, &rx, 0);
-                CDEBUG(D_NET, "lnet_parse rc=%d\n",rc);
                 if (rc < 0)
                         ptllnd_rx_done(&rx);
                 break;
@@ -1450,12 +1545,12 @@ ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
         LASSERT (event->type == PTL_EVENT_PUT_END ||
                  event->type == PTL_EVENT_UNLINK);
 
-        CDEBUG(D_NET, "buf=%p event=%d\n",buf,event->type);
-
         if (event->ni_fail_type != PTL_NI_OK) {
 
-                CERROR("event type %d, status %d from %s\n",
-                       event->type, event->ni_fail_type,
+                CERROR("event type %s(%d), status %s(%d) from %s\n",
+                       ptllnd_evtype2str(event->type), event->type,
+                       ptllnd_errtype2str(event->ni_fail_type), 
+                       event->ni_fail_type,
                        ptllnd_ptlid2str(event->initiator));
 
         } else if (event->type == PTL_EVENT_PUT_END) {
@@ -1463,7 +1558,7 @@ ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
                 /* Portals can't force message alignment - someone sending an
                  * odd-length message could misalign subsequent messages */
                 if ((event->mlength & 7) != 0) {
-                        CERROR("Message from %s has odd length %d: "
+                        CERROR("Message from %s has odd length %llu: "
                                "probable version incompatibility\n",
                                ptllnd_ptlid2str(event->initiator),
                                event->mlength);
@@ -1486,8 +1581,6 @@ ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
         repost = (event->type == PTL_EVENT_UNLINK);
 #endif
 
-        CDEBUG(D_NET, "repost=%d unlinked=%d\n",repost,unlinked);
-
         if (unlinked) {
                 LASSERT(buf->plb_posted);
                 buf->plb_posted = 0;
@@ -1513,36 +1606,41 @@ ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
 #endif
 
         if (error)
-                CERROR("Error event type %d for %s for %s\n",
-                       event->type, ptllnd_msgtype2str(tx->tx_type),
+                CERROR("Error %s(%d) event %s(%d) unlinked %d, %s(%d) for %s\n",
+                       ptllnd_errtype2str(event->ni_fail_type),
+                       event->ni_fail_type,
+                       ptllnd_evtype2str(event->type), event->type,
+                       unlinked, ptllnd_msgtype2str(tx->tx_type), tx->tx_type,
                        libcfs_id2str(tx->tx_peer->plp_id));
 
         LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE));
 
-        CDEBUG(D_NET, "tx=%p type=%s (%d)\n",tx,
-                ptllnd_msgtype2str(tx->tx_type),tx->tx_type);
-        CDEBUG(D_NET, "unlinked=%d\n",unlinked);
-        CDEBUG(D_NET, "error=%d\n",error);
-
         isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh);
-        CDEBUG(D_NET, "isreq=%d\n",isreq);
         if (isreq) {
                 LASSERT (event->md.start == (void *)&tx->tx_msg);
                 if (unlinked) {
                         tx->tx_reqmdh = PTL_INVALID_HANDLE;
-                        PTLLND_DBGT_STAMP(tx->tx_req_done);
+                        gettimeofday(&tx->tx_req_done, NULL);
                 }
         }
 
         isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);
-        CDEBUG(D_NET, "isbulk=%d\n",isbulk);
         if ( isbulk && unlinked ) {
                 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
-                PTLLND_DBGT_STAMP(tx->tx_bulk_done);
+                gettimeofday(&tx->tx_bulk_done, NULL);
         }
 
         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
 
+        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: TX done %p %s%s",
+                       libcfs_id2str(tx->tx_peer->plp_id), 
+                       tx->tx_peer->plp_credits,
+                       tx->tx_peer->plp_outstanding_credits,
+                       tx->tx_peer->plp_sent_credits,
+                       plni->plni_peer_credits + tx->tx_peer->plp_lazy_credits,
+                       tx, isreq ? "REQ" : "BULK", unlinked ? "(unlinked)" : "");
+
+        LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
         switch (tx->tx_type) {
         default:
                 LBUG();
@@ -1601,19 +1699,102 @@ ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
                         tx->tx_status = -EIO;
                 list_del(&tx->tx_list);
                 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
-                CDEBUG(D_NET, "tx=%p ONTO ZOMBIE LIST\n",tx);
         }
 }
 
+ptllnd_tx_t *
+ptllnd_find_timed_out_tx(ptllnd_peer_t *peer)
+{
+        time_t            now = cfs_time_current_sec();
+        struct list_head *tmp;
+
+        list_for_each(tmp, &peer->plp_txq) {
+                ptllnd_tx_t *tx = list_entry(tmp, ptllnd_tx_t, tx_list);
+                
+                if (tx->tx_deadline < now)
+                        return tx;
+        }
+        
+        list_for_each(tmp, &peer->plp_activeq) {
+                ptllnd_tx_t *tx = list_entry(tmp, ptllnd_tx_t, tx_list);
+                
+                if (tx->tx_deadline < now)
+                        return tx;
+        }
+
+        return NULL;
+}
+
+void
+ptllnd_check_peer(ptllnd_peer_t *peer)
+{
+        ptllnd_tx_t *tx = ptllnd_find_timed_out_tx(peer);
+        
+        if (tx == NULL)
+                return;
+        
+        CERROR("%s: timed out\n", libcfs_id2str(peer->plp_id));
+        ptllnd_close_peer(peer, -ETIMEDOUT);
+}
+
+void
+ptllnd_watchdog (lnet_ni_t *ni, time_t now)
+{
+        ptllnd_ni_t      *plni = ni->ni_data;
+        const int         n = 4;
+        int               p = plni->plni_watchdog_interval;
+        int               chunk = plni->plni_peer_hash_size;
+        int               interval = now - (plni->plni_watchdog_nextt - p);
+        int               i;
+        struct list_head *hashlist;
+        struct list_head *tmp;
+        struct list_head *nxt;
+
+        /* Time to check for RDMA timeouts on a few more peers: 
+         * I try to do checks every 'p' seconds on a proportion of the peer
+         * table and I need to check every connection 'n' times within a
+         * timeout interval, to ensure I detect a timeout on any connection
+         * within (n+1)/n times the timeout interval. */
+
+        LASSERT (now >= plni->plni_watchdog_nextt);
+
+        if (plni->plni_timeout > n * interval) { /* Scan less than the whole table? */
+                chunk = (chunk * n * interval) / plni->plni_timeout;
+                if (chunk == 0)
+                        chunk = 1;
+        }
+
+        for (i = 0; i < chunk; i++) {
+                hashlist = &plni->plni_peer_hash[plni->plni_watchdog_peeridx];
+                
+                list_for_each_safe(tmp, nxt, hashlist) {
+                        ptllnd_check_peer(list_entry(tmp, ptllnd_peer_t, plp_list));
+                }
+                
+                plni->plni_watchdog_peeridx = (plni->plni_watchdog_peeridx + 1) %
+                                              plni->plni_peer_hash_size;
+        }
+
+        plni->plni_watchdog_nextt = now + p;
+}
+
 void
 ptllnd_wait (lnet_ni_t *ni, int milliseconds)
 {
+        static struct timeval  prevt;
+        static int             prevt_count;
+        static int             call_count;
+
+        struct timeval         start;
+        struct timeval         then;
+        struct timeval         now;
+        struct timeval         deadline;
+        
         ptllnd_ni_t   *plni = ni->ni_data;
         ptllnd_tx_t   *tx;
         ptl_event_t    event;
         int            which;
         int            rc;
-        int            blocked = 0;
         int            found = 0;
         int            timeout = 0;
 
@@ -1621,46 +1802,70 @@ ptllnd_wait (lnet_ni_t *ni, int milliseconds)
          * Otherwise block for the timeout and handle all events queued
          * then. */
 
-        for (;;) {
-                time_t  then = cfs_time_current_sec();
+        gettimeofday(&start, NULL);
+        call_count++;
 
-                CDEBUG(D_NET, "Poll(%d)\n", timeout);
-                
-                rc = PtlEQPoll(&plni->plni_eqh, 1,
-                               (timeout < 0) ? PTL_TIME_FOREVER : timeout,
-                               &event, &which);
-
-                if (timeout >= 0 &&
-                    (cfs_time_current_sec() - then)*1000 > timeout + 1000) {
-                        /* 1000 mS grace.............................^ */
-                        CERROR("SLOW PtlEQPoll(%d): %d seconds\n", timeout,
-                               (int)(cfs_time_current_sec() - then));
+        if (milliseconds <= 0) {
+                deadline = start;
+        } else {
+                deadline.tv_sec  = start.tv_sec  +  milliseconds/1000;
+                deadline.tv_usec = start.tv_usec + (milliseconds % 1000)*1000;
+
+                if (deadline.tv_usec >= 1000000) {
+                        start.tv_usec -= 1000000;
+                        start.tv_sec++;
                 }
+        }
+
+        for (;;) {
+                gettimeofday(&then, NULL);
                 
-                CDEBUG(D_NET, "PtlEQPoll rc=%d\n",rc);
-                timeout = 0;
+                rc = PtlEQPoll(&plni->plni_eqh, 1, timeout, &event, &which);
+
+                gettimeofday(&now, NULL);
+
+                if ((now.tv_sec*1000 + now.tv_usec/1000) - 
+                    (then.tv_sec*1000 + then.tv_usec/1000) > timeout + 1000) {
+                        /* 1000 mS grace...........................^ */
+                        CERROR("SLOW PtlEQPoll(%d): %dmS elapsed\n", timeout,
+                               (int)(now.tv_sec*1000 + now.tv_usec/1000) - 
+                               (int)(then.tv_sec*1000 + then.tv_usec/1000));
+                }
 
                 if (rc == PTL_EQ_EMPTY) {
-                        if (found ||            /* handled some events */
-                            milliseconds == 0 || /* just checking */
-                            blocked)            /* blocked already */
+                        if (found)              /* handled some events */
+                                break;
+
+                        if (now.tv_sec >= plni->plni_watchdog_nextt) { /* check timeouts? */
+                                ptllnd_watchdog(ni, now.tv_sec);
+                                LASSERT (now.tv_sec < plni->plni_watchdog_nextt);
+                        }
+                        
+                        if (now.tv_sec > deadline.tv_sec || /* timeout expired */
+                            (now.tv_sec == deadline.tv_sec &&
+                             now.tv_usec >= deadline.tv_usec))
                                 break;
 
-                        blocked = 1;
-                        timeout = milliseconds;
+                        if (milliseconds < 0 ||
+                            plni->plni_watchdog_nextt <= deadline.tv_sec)  {
+                                timeout = (plni->plni_watchdog_nextt - now.tv_sec)*1000;
+                        } else {
+                                timeout = (deadline.tv_sec - now.tv_sec)*1000 +
+                                          (deadline.tv_usec - now.tv_usec)/1000;
+                        }
+
                         continue;
                 }
-
+                
                 LASSERT (rc == PTL_OK || rc == PTL_EQ_DROPPED);
 
                 if (rc == PTL_EQ_DROPPED)
                         CERROR("Event queue: size %d is too small\n",
                                plni->plni_eq_size);
 
-                CDEBUG(D_NET, "event.type=%s(%d)\n",
-                       ptllnd_evtype2str(event.type),event.type);
-
+                timeout = 0;
                 found = 1;
+
                 switch (ptllnd_eventarg2type(event.md.user_ptr)) {
                 default:
                         LBUG();
@@ -1678,7 +1883,15 @@ ptllnd_wait (lnet_ni_t *ni, int milliseconds)
         while (!list_empty(&plni->plni_zombie_txs)) {
                 tx = list_entry(plni->plni_zombie_txs.next,
                                 ptllnd_tx_t, tx_list);
-                CDEBUG(D_NET, "Process ZOMBIE tx=%p\n",tx);
+                list_del_init(&tx->tx_list);
                 ptllnd_tx_done(tx);
         }
+
+        if (prevt.tv_sec == 0 ||
+            prevt.tv_sec != now.tv_sec) {
+                PTLLND_HISTORY("%d wait entered at %d.%06d - prev %d %d.%06d", 
+                               call_count, (int)start.tv_sec, (int)start.tv_usec,
+                               prevt_count, (int)prevt.tv_sec, (int)prevt.tv_usec);
+                prevt = now;
+        }
 }