Whamcloud - gitweb
* userspace (catamount) ptllnd changes
[fs/lustre-release.git] / lnet / ulnds / ptllnd / ptllnd_cb.c
index 96b0345..ec6170f 100644 (file)
 
 #include "ptllnd.h"
 
 
 #include "ptllnd.h"
 
+void
+ptllnd_set_tx_deadline(ptllnd_tx_t *tx)
+{
+        ptllnd_peer_t  *peer = tx->tx_peer;
+        lnet_ni_t      *ni = peer->plp_ni;
+        ptllnd_ni_t    *plni = ni->ni_data;
+
+        tx->tx_deadline = cfs_time_current_sec() + plni->plni_timeout;
+}
+
+void
+ptllnd_post_tx(ptllnd_tx_t *tx)
+{
+        ptllnd_peer_t  *peer = tx->tx_peer;
+
+        ptllnd_set_tx_deadline(tx);
+        list_add_tail(&tx->tx_list, &peer->plp_txq);
+        ptllnd_check_sends(peer);
+}
+
 char *
 ptllnd_ptlid2str(ptl_process_id_t id)
 {
 char *
 ptllnd_ptlid2str(ptl_process_id_t id)
 {
@@ -77,8 +97,9 @@ ptllnd_close_peer(ptllnd_peer_t *peer, int error)
         if (!list_empty(&peer->plp_txq) ||
             !list_empty(&peer->plp_activeq) ||
             error != 0) {
         if (!list_empty(&peer->plp_txq) ||
             !list_empty(&peer->plp_activeq) ||
             error != 0) {
-                CERROR("Closing %s\n", libcfs_id2str(peer->plp_id));
-                ptllnd_debug_peer(ni, peer->plp_id);
+                CWARN("Closing %s\n", libcfs_id2str(peer->plp_id));
+                if (plni->plni_debug)
+                        ptllnd_dump_debug(ni, peer->plp_id);
         }
         
         ptllnd_abort_txs(plni, &peer->plp_txq);
         }
         
         ptllnd_abort_txs(plni, &peer->plp_txq);
@@ -219,12 +240,14 @@ ptllnd_tx_typestr(int type)
 void
 ptllnd_debug_tx(ptllnd_tx_t *tx) 
 {
 void
 ptllnd_debug_tx(ptllnd_tx_t *tx) 
 {
-        CDEBUG(D_WARNING, "%s %s b "DBGT_FMT"/"DBGT_FMT
-               " r "DBGT_FMT"/"DBGT_FMT" status %d\n",
+        CDEBUG(D_WARNING, "%s %s b %ld.%06ld/%ld.%06ld"
+               " r %ld.%06ld/%ld.%06ld status %d\n",
                ptllnd_tx_typestr(tx->tx_type),
                ptllnd_tx_typestr(tx->tx_type),
-               libcfs_id2str(tx->tx_peer->plp_id)
-               DBGT_ARGS(tx->tx_bulk_posted) DBGT_ARGS(tx->tx_bulk_done)
-               DBGT_ARGS(tx->tx_req_posted) DBGT_ARGS(tx->tx_req_done),
+               libcfs_id2str(tx->tx_peer->plp_id),
+               tx->tx_bulk_posted.tv_sec, tx->tx_bulk_posted.tv_usec, 
+               tx->tx_bulk_done.tv_sec, tx->tx_bulk_done.tv_usec,
+               tx->tx_req_posted.tv_sec, tx->tx_req_posted.tv_usec,
+               tx->tx_req_done.tv_sec, tx->tx_req_done.tv_usec,
                tx->tx_status);
 }
 
                tx->tx_status);
 }
 
@@ -241,7 +264,7 @@ ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
                 return;
         }
         
                 return;
         }
         
-        CDEBUG(D_WARNING, "%s %s%s [%d] "LPD64".%06d m "LPD64" q %d/%d c %d/%d+%d(%d)\n",
+        CDEBUG(D_WARNING, "%s %s%s [%d] "LPU64".%06d m "LPU64" q %d/%d c %d/%d+%d(%d)\n",
                libcfs_id2str(id), 
                plp->plp_recvd_hello ? "H" : "_",
                plp->plp_closing     ? "C" : "_",
                libcfs_id2str(id), 
                plp->plp_recvd_hello ? "H" : "_",
                plp->plp_closing     ? "C" : "_",
@@ -286,6 +309,12 @@ ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
         }
         
         ptllnd_peer_decref(plp);
         }
         
         ptllnd_peer_decref(plp);
+}
+
+void
+ptllnd_dump_debug(lnet_ni_t *ni, lnet_process_id_t id)
+{
+        ptllnd_debug_peer(ni, id);
         ptllnd_dump_history();
 }
 
         ptllnd_dump_history();
 }
 
@@ -295,13 +324,11 @@ ptllnd_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive)
         lnet_process_id_t  id;
         ptllnd_peer_t     *peer;
         time_t             start = cfs_time_current_sec();
         lnet_process_id_t  id;
         ptllnd_peer_t     *peer;
         time_t             start = cfs_time_current_sec();
-        int                w = PTLLND_WARN_LONG_WAIT;
+        ptllnd_ni_t       *plni = ni->ni_data;
+        int                w = plni->plni_long_wait;
 
         /* This is only actually used to connect to routers at startup! */
 
         /* This is only actually used to connect to routers at startup! */
-        if (!alive) {
-                LBUG();
-                return;
-        }
+        LASSERT(alive);
 
         id.nid = nid;
         id.pid = LUSTRE_SRV_LNET_PID;
 
         id.nid = nid;
         id.pid = LUSTRE_SRV_LNET_PID;
@@ -312,13 +339,14 @@ ptllnd_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive)
 
         /* wait for the peer to reply */
         while (!peer->plp_recvd_hello) {
 
         /* wait for the peer to reply */
         while (!peer->plp_recvd_hello) {
-                if (cfs_time_current_sec() > start + w) {
+                if (w > 0 && cfs_time_current_sec() > start + w/1000) {
                         CWARN("Waited %ds to connect to %s\n",
                         CWARN("Waited %ds to connect to %s\n",
-                              w, libcfs_id2str(id));
+                              (int)(cfs_time_current_sec() - start),
+                              libcfs_id2str(id));
                         w *= 2;
                 }
                 
                         w *= 2;
                 }
                 
-                ptllnd_wait(ni, w*1000);
+                ptllnd_wait(ni, w);
         }
         
         ptllnd_peer_decref(peer);
         }
         
         ptllnd_peer_decref(peer);
@@ -444,10 +472,10 @@ ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
         tx->tx_completing = 0;
         tx->tx_status = 0;
 
         tx->tx_completing = 0;
         tx->tx_status = 0;
 
-        PTLLND_DBGT_INIT(tx->tx_bulk_posted);
-        PTLLND_DBGT_INIT(tx->tx_bulk_done);
-        PTLLND_DBGT_INIT(tx->tx_req_posted);
-        PTLLND_DBGT_INIT(tx->tx_req_done);
+        memset(&tx->tx_bulk_posted, 0, sizeof(tx->tx_bulk_posted));
+        memset(&tx->tx_bulk_done, 0, sizeof(tx->tx_bulk_done));
+        memset(&tx->tx_req_posted, 0, sizeof(tx->tx_req_posted));
+        memset(&tx->tx_req_done, 0, sizeof(tx->tx_req_done));
 
         if (msgsize != 0) {
                 tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC;
 
         if (msgsize != 0) {
                 tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC;
@@ -479,7 +507,8 @@ ptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh)
         lnet_ni_t       *ni = peer->plp_ni;
         int              rc;
         time_t           start = cfs_time_current_sec();
         lnet_ni_t       *ni = peer->plp_ni;
         int              rc;
         time_t           start = cfs_time_current_sec();
-        int              w = PTLLND_WARN_LONG_WAIT;
+        ptllnd_ni_t     *plni = ni->ni_data;
+        int              w = plni->plni_long_wait;
 
         while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) {
                 rc = PtlMDUnlink(*mdh);
 
         while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) {
                 rc = PtlMDUnlink(*mdh);
@@ -488,13 +517,14 @@ ptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh)
                         return;
                 LASSERT (rc == PTL_MD_IN_USE);
 #endif
                         return;
                 LASSERT (rc == PTL_MD_IN_USE);
 #endif
-                if (cfs_time_current_sec() > start + w) {
+                if (w > 0 && cfs_time_current_sec() > start + w/1000) {
                         CWARN("Waited %ds to abort tx to %s\n",
                         CWARN("Waited %ds to abort tx to %s\n",
-                              w, libcfs_id2str(peer->plp_id));
+                              (int)(cfs_time_current_sec() - start),
+                              libcfs_id2str(peer->plp_id));
                         w *= 2;
                 }
                 /* Wait for ptllnd_tx_event() to invalidate */
                         w *= 2;
                 }
                 /* Wait for ptllnd_tx_event() to invalidate */
-                ptllnd_wait(ni, w*1000);
+                ptllnd_wait(ni, w);
         }
 }
 
         }
 }
 
@@ -538,8 +568,11 @@ ptllnd_tx_done(ptllnd_tx_t *tx)
                 list_del_init(&tx->tx_list);
 
         if (tx->tx_status != 0) {
                 list_del_init(&tx->tx_list);
 
         if (tx->tx_status != 0) {
-                CERROR("Completing tx with error\n");
-                ptllnd_debug_tx(tx);
+                if (plni->plni_debug) {
+                        CERROR("Completing tx for %s with error %d\n",
+                               libcfs_id2str(peer->plp_id), tx->tx_status);
+                        ptllnd_debug_tx(tx);
+                }
                 ptllnd_close_peer(peer, tx->tx_status);
         }
         
                 ptllnd_close_peer(peer, tx->tx_status);
         }
         
@@ -821,7 +854,7 @@ ptllnd_check_sends(ptllnd_peer_t *peer)
                          tx->tx_type != PTLLND_RDMA_READ);
                 
                 tx->tx_reqmdh = mdh;
                          tx->tx_type != PTLLND_RDMA_READ);
                 
                 tx->tx_reqmdh = mdh;
-                PTLLND_DBGT_STAMP(tx->tx_req_posted);
+                gettimeofday(&tx->tx_req_posted, NULL);
 
                 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: %s %p c %d",
                                libcfs_id2str(peer->plp_id),
 
                 PTLLND_HISTORY("%s[%d/%d+%d(%d)]: %s %p c %d",
                                libcfs_id2str(peer->plp_id),
@@ -894,26 +927,25 @@ ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
         ptllnd_set_md_buffer(&md, tx);
 
         start = cfs_time_current_sec();
         ptllnd_set_md_buffer(&md, tx);
 
         start = cfs_time_current_sec();
-        w = PTLLND_WARN_LONG_WAIT;
+        w = plni->plni_long_wait;
 
         while (!peer->plp_recvd_hello) {        /* wait to validate plp_match */
                 if (peer->plp_closing) {
                         rc = -EIO;
                         goto failed;
                 }
 
         while (!peer->plp_recvd_hello) {        /* wait to validate plp_match */
                 if (peer->plp_closing) {
                         rc = -EIO;
                         goto failed;
                 }
-                if (cfs_time_current_sec() > start + w) {
+                if (w > 0 && cfs_time_current_sec() > start + w/1000) {
                         CWARN("Waited %ds to connect to %s\n",
                         CWARN("Waited %ds to connect to %s\n",
-                              w, libcfs_id2str(peer->plp_id));
+                              (int)(cfs_time_current_sec() - start),
+                              libcfs_id2str(peer->plp_id));
                         w *= 2;
                 }
                         w *= 2;
                 }
-                ptllnd_wait(ni, w*1000);
+                ptllnd_wait(ni, w);
         }
 
         if (peer->plp_match < PTL_RESERVED_MATCHBITS)
                 peer->plp_match = PTL_RESERVED_MATCHBITS;
         matchbits = peer->plp_match++;
         }
 
         if (peer->plp_match < PTL_RESERVED_MATCHBITS)
                 peer->plp_match = PTL_RESERVED_MATCHBITS;
         matchbits = peer->plp_match++;
-        CDEBUG(D_NET, "matchbits " LPX64 " %s\n", matchbits,
-               ptllnd_ptlid2str(peer->plp_ptlid));
 
         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid,
                          matchbits, 0, PTL_UNLINK, PTL_INS_BEFORE, &meh);
 
         rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid,
                          matchbits, 0, PTL_UNLINK, PTL_INS_BEFORE, &meh);
@@ -924,7 +956,7 @@ ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
                 goto failed;
         }
 
                 goto failed;
         }
 
-        PTLLND_DBGT_STAMP(tx->tx_bulk_posted);
+        gettimeofday(&tx->tx_bulk_posted, NULL);
 
         rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);
         if (rc != PTL_OK) {
 
         rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);
         if (rc != PTL_OK) {
@@ -1028,8 +1060,9 @@ ptllnd_active_rdma(ptllnd_peer_t *peer, int type,
         tx->tx_bulkmdh = mdh;
         tx->tx_lnetmsg = msg;
 
         tx->tx_bulkmdh = mdh;
         tx->tx_lnetmsg = msg;
 
+        ptllnd_set_tx_deadline(tx);
         list_add_tail(&tx->tx_list, &peer->plp_activeq);
         list_add_tail(&tx->tx_list, &peer->plp_activeq);
-        PTLLND_DBGT_STAMP(tx->tx_bulk_posted);
+        gettimeofday(&tx->tx_bulk_posted, NULL);
 
         if (type == PTLLND_RDMA_READ)
                 rc = PtlGet(mdh, peer->plp_ptlid,
 
         if (type == PTLLND_RDMA_READ)
                 rc = PtlGet(mdh, peer->plp_ptlid,
@@ -1242,18 +1275,6 @@ ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
 }
 
 void
 }
 
 void
-ptllnd_abort_on_nak(lnet_ni_t *ni)
-{
-        ptllnd_ni_t      *plni = ni->ni_data;
-
-        if (plni->plni_dump_on_nak)
-                ptllnd_dump_history();
-
-        if (plni->plni_abort_on_nak)
-                abort();
-}
-
-void
 ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
                      kptl_msg_t *msg, unsigned int nob)
 {
 ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
                      kptl_msg_t *msg, unsigned int nob)
 {
@@ -1285,9 +1306,12 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
         msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
 
         if (msg_version != PTLLND_MSG_VERSION) {
         msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;
 
         if (msg_version != PTLLND_MSG_VERSION) {
-                CERROR("Bad protocol version %04x from %s\n", 
-                       (__u32)msg_version, ptllnd_ptlid2str(initiator));
-                ptllnd_abort_on_nak(ni);
+                CERROR("Bad protocol version %04x from %s: %04x expected\n", 
+                       (__u32)msg_version, ptllnd_ptlid2str(initiator), PTLLND_MSG_VERSION);
+
+                if (plni->plni_abort_on_protocol_mismatch)
+                        abort();
+
                 return;
         }
 
                 return;
         }
 
@@ -1335,7 +1359,13 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
                 CERROR("NAK from %s (%s)\n", 
                        libcfs_id2str(srcid),
                        ptllnd_ptlid2str(initiator));
                 CERROR("NAK from %s (%s)\n", 
                        libcfs_id2str(srcid),
                        ptllnd_ptlid2str(initiator));
-                ptllnd_abort_on_nak(ni);
+
+                if (plni->plni_dump_on_nak)
+                        ptllnd_dump_debug(ni, srcid);
+                
+                if (plni->plni_abort_on_nak)
+                        abort();
+                
                 return;
         }
         
                 return;
         }
         
@@ -1581,14 +1611,14 @@ ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
                 LASSERT (event->md.start == (void *)&tx->tx_msg);
                 if (unlinked) {
                         tx->tx_reqmdh = PTL_INVALID_HANDLE;
                 LASSERT (event->md.start == (void *)&tx->tx_msg);
                 if (unlinked) {
                         tx->tx_reqmdh = PTL_INVALID_HANDLE;
-                        PTLLND_DBGT_STAMP(tx->tx_req_done);
+                        gettimeofday(&tx->tx_req_done, NULL);
                 }
         }
 
         isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);
         if ( isbulk && unlinked ) {
                 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
                 }
         }
 
         isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);
         if ( isbulk && unlinked ) {
                 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
-                PTLLND_DBGT_STAMP(tx->tx_bulk_done);
+                gettimeofday(&tx->tx_bulk_done, NULL);
         }
 
         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
         }
 
         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
@@ -1663,6 +1693,82 @@ ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
         }
 }
 
         }
 }
 
+ptllnd_tx_t *
+ptllnd_find_timed_out_tx(ptllnd_peer_t *peer)
+{
+        time_t            now = cfs_time_current_sec();
+        struct list_head *tmp;
+
+        list_for_each(tmp, &peer->plp_txq) {
+                ptllnd_tx_t *tx = list_entry(tmp, ptllnd_tx_t, tx_list);
+                
+                if (tx->tx_deadline < now)
+                        return tx;
+        }
+        
+        list_for_each(tmp, &peer->plp_activeq) {
+                ptllnd_tx_t *tx = list_entry(tmp, ptllnd_tx_t, tx_list);
+                
+                if (tx->tx_deadline < now)
+                        return tx;
+        }
+
+        return NULL;
+}
+
+void
+ptllnd_check_peer(ptllnd_peer_t *peer)
+{
+        ptllnd_tx_t *tx = ptllnd_find_timed_out_tx(peer);
+        
+        if (tx == NULL)
+                return;
+        
+        CERROR("%s: timed out\n", libcfs_id2str(peer->plp_id));
+        ptllnd_close_peer(peer, -ETIMEDOUT);
+}
+
+void
+ptllnd_watchdog (lnet_ni_t *ni, time_t now)
+{
+        ptllnd_ni_t      *plni = ni->ni_data;
+        const int         n = 4;
+        int               p = plni->plni_watchdog_interval;
+        int               chunk = plni->plni_peer_hash_size;
+        int               interval = now - (plni->plni_watchdog_nextt - p);
+        int               i;
+        struct list_head *hashlist;
+        struct list_head *tmp;
+        struct list_head *nxt;
+
+        /* Time to check for RDMA timeouts on a few more peers: 
+         * I try to do checks every 'p' seconds on a proportion of the peer
+         * table and I need to check every connection 'n' times within a
+         * timeout interval, to ensure I detect a timeout on any connection
+         * within (n+1)/n times the timeout interval. */
+
+        LASSERT (now >= plni->plni_watchdog_nextt);
+
+        if (plni->plni_timeout > n * interval) { /* Scan less than the whole table? */
+                chunk = (chunk * n * interval) / plni->plni_timeout;
+                if (chunk == 0)
+                        chunk = 1;
+        }
+
+        for (i = 0; i < chunk; i++) {
+                hashlist = &plni->plni_peer_hash[plni->plni_watchdog_peeridx];
+                
+                list_for_each_safe(tmp, nxt, hashlist) {
+                        ptllnd_check_peer(list_entry(tmp, ptllnd_peer_t, plp_list));
+                }
+                
+                plni->plni_watchdog_peeridx = (plni->plni_watchdog_peeridx + 1) %
+                                              plni->plni_peer_hash_size;
+        }
+
+        plni->plni_watchdog_nextt = now + p;
+}
+
 void
 ptllnd_wait (lnet_ni_t *ni, int milliseconds)
 {
 void
 ptllnd_wait (lnet_ni_t *ni, int milliseconds)
 {
@@ -1670,15 +1776,16 @@ ptllnd_wait (lnet_ni_t *ni, int milliseconds)
         static int             prevt_count;
         static int             call_count;
 
         static int             prevt_count;
         static int             call_count;
 
-        struct timeval         t1;
-        struct timeval         t2;
+        struct timeval         start;
+        struct timeval         then;
+        struct timeval         now;
+        struct timeval         deadline;
         
         ptllnd_ni_t   *plni = ni->ni_data;
         ptllnd_tx_t   *tx;
         ptl_event_t    event;
         int            which;
         int            rc;
         
         ptllnd_ni_t   *plni = ni->ni_data;
         ptllnd_tx_t   *tx;
         ptl_event_t    event;
         int            which;
         int            rc;
-        int            blocked = 0;
         int            found = 0;
         int            timeout = 0;
 
         int            found = 0;
         int            timeout = 0;
 
@@ -1686,44 +1793,70 @@ ptllnd_wait (lnet_ni_t *ni, int milliseconds)
          * Otherwise block for the timeout and handle all events queued
          * then. */
 
          * Otherwise block for the timeout and handle all events queued
          * then. */
 
-        gettimeofday(&t1, NULL);
+        gettimeofday(&start, NULL);
         call_count++;
 
         call_count++;
 
+        if (milliseconds <= 0) {
+                deadline = start;
+        } else {
+                deadline.tv_sec  = start.tv_sec  +  milliseconds/1000;
+                deadline.tv_usec = start.tv_usec + (milliseconds % 1000)*1000;
+
+                if (deadline.tv_usec >= 1000000) {
+                        start.tv_usec -= 1000000;
+                        start.tv_sec++;
+                }
+        }
+
         for (;;) {
         for (;;) {
-                time_t  then = cfs_time_current_sec();
+                gettimeofday(&then, NULL);
+                
+                rc = PtlEQPoll(&plni->plni_eqh, 1, timeout, &event, &which);
 
 
-                rc = PtlEQPoll(&plni->plni_eqh, 1,
-                               (timeout < 0) ? PTL_TIME_FOREVER : timeout,
-                               &event, &which);
+                gettimeofday(&now, NULL);
 
 
-                if (timeout >= 0 &&
-                    (cfs_time_current_sec() - then)*1000 > timeout + 1000) {
-                        /* 1000 mS grace.............................^ */
-                        CERROR("SLOW PtlEQPoll(%d): %d seconds\n", timeout,
-                               (int)(cfs_time_current_sec() - then));
+                if ((now.tv_sec*1000 + now.tv_usec/1000) - 
+                    (then.tv_sec*1000 + then.tv_usec/1000) > timeout + 1000) {
+                        /* 1000 mS grace...........................^ */
+                        CERROR("SLOW PtlEQPoll(%d): %dmS elapsed\n", timeout,
+                               (int)(now.tv_sec*1000 + now.tv_usec/1000) - 
+                               (int)(then.tv_sec*1000 + then.tv_usec/1000));
                 }
                 }
-                
-                timeout = 0;
 
                 if (rc == PTL_EQ_EMPTY) {
 
                 if (rc == PTL_EQ_EMPTY) {
-                        if (found ||            /* handled some events */
-                            milliseconds == 0 || /* just checking */
-                            blocked)            /* blocked already */
+                        if (found)              /* handled some events */
                                 break;
 
                                 break;
 
-                        blocked = 1;
-                        timeout = (milliseconds < 0) ?
-                                  PTL_TIME_FOREVER : milliseconds;
+                        if (now.tv_sec >= plni->plni_watchdog_nextt) { /* check timeouts? */
+                                ptllnd_watchdog(ni, now.tv_sec);
+                                LASSERT (now.tv_sec < plni->plni_watchdog_nextt);
+                        }
+                        
+                        if (now.tv_sec > deadline.tv_sec || /* timeout expired */
+                            (now.tv_sec == deadline.tv_sec &&
+                             now.tv_usec >= deadline.tv_usec))
+                                break;
+
+                        if (milliseconds < 0 ||
+                            plni->plni_watchdog_nextt <= deadline.tv_sec)  {
+                                timeout = (plni->plni_watchdog_nextt - now.tv_sec)*1000;
+                        } else {
+                                timeout = (deadline.tv_sec - now.tv_sec)*1000 +
+                                          (deadline.tv_usec - now.tv_usec)/1000;
+                        }
+
                         continue;
                 }
                         continue;
                 }
-
+                
                 LASSERT (rc == PTL_OK || rc == PTL_EQ_DROPPED);
 
                 if (rc == PTL_EQ_DROPPED)
                         CERROR("Event queue: size %d is too small\n",
                                plni->plni_eq_size);
 
                 LASSERT (rc == PTL_OK || rc == PTL_EQ_DROPPED);
 
                 if (rc == PTL_EQ_DROPPED)
                         CERROR("Event queue: size %d is too small\n",
                                plni->plni_eq_size);
 
+                timeout = 0;
                 found = 1;
                 found = 1;
+
                 switch (ptllnd_eventarg2type(event.md.user_ptr)) {
                 default:
                         LBUG();
                 switch (ptllnd_eventarg2type(event.md.user_ptr)) {
                 default:
                         LBUG();
@@ -1741,16 +1874,15 @@ ptllnd_wait (lnet_ni_t *ni, int milliseconds)
         while (!list_empty(&plni->plni_zombie_txs)) {
                 tx = list_entry(plni->plni_zombie_txs.next,
                                 ptllnd_tx_t, tx_list);
         while (!list_empty(&plni->plni_zombie_txs)) {
                 tx = list_entry(plni->plni_zombie_txs.next,
                                 ptllnd_tx_t, tx_list);
+                list_del_init(&tx->tx_list);
                 ptllnd_tx_done(tx);
         }
 
                 ptllnd_tx_done(tx);
         }
 
-        gettimeofday(&t2, NULL);
-                
         if (prevt.tv_sec == 0 ||
         if (prevt.tv_sec == 0 ||
-            prevt.tv_sec != t2.tv_sec) {
+            prevt.tv_sec != now.tv_sec) {
                 PTLLND_HISTORY("%d wait entered at %d.%06d - prev %d %d.%06d", 
                 PTLLND_HISTORY("%d wait entered at %d.%06d - prev %d %d.%06d", 
-                               call_count, (int)t1.tv_sec, (int)t1.tv_usec,
+                               call_count, (int)start.tv_sec, (int)start.tv_usec,
                                prevt_count, (int)prevt.tv_sec, (int)prevt.tv_usec);
                                prevt_count, (int)prevt.tv_sec, (int)prevt.tv_usec);
-                prevt = t2;
+                prevt = now;
         }
 }
         }
 }