Whamcloud - gitweb
Severity : major
author eeb <eeb>
Tue, 8 May 2007 14:52:08 +0000 (14:52 +0000)
committer eeb <eeb>
Tue, 8 May 2007 14:52:08 +0000 (14:52 +0000)
Frequency  : rare
Bugzilla   : 11706
Description: Added LNetSetAsync() to ensure single-threaded userspace
             clients can be eager LNET receivers even when the application
     is not executing in the filesystem.

16 files changed:
lnet/ChangeLog
lnet/include/lnet/api.h
lnet/include/lnet/lib-types.h
lnet/include/lnet/ptllnd_wire.h
lnet/klnds/ptllnd/ptllnd.c
lnet/klnds/ptllnd/ptllnd.h
lnet/klnds/ptllnd/ptllnd_cb.c
lnet/klnds/ptllnd/ptllnd_modparams.c
lnet/klnds/ptllnd/ptllnd_peer.c
lnet/klnds/ptllnd/ptllnd_rx_buf.c
lnet/klnds/ptllnd/ptllnd_tx.c
lnet/lnet/lib-move.c
lnet/lnet/module.c
lnet/ulnds/ptllnd/ptllnd.c
lnet/ulnds/ptllnd/ptllnd.h
lnet/ulnds/ptllnd/ptllnd_cb.c

index 3a39bf2..0995305 100644 (file)
 
 Severity   : major
 Frequency  : rare
+Bugzilla   : 11706
+Description: Added LNetSetAsync() to ensure single-threaded userspace
+             clients can be eager LNET receivers even when the application
+            is not executing in the filesystem.
+       
+Severity   : major
+Frequency  : rare
 Bugzilla   : 12016
 Description: node crash on socket teardown race
 
index 481a0fd..c240aa2 100644 (file)
@@ -96,4 +96,7 @@ int LNetGet(lnet_nid_t        self,
            __u64             match_bits_in, 
            unsigned int      offset_in);
 
+
+int LNetSetAsync(lnet_process_id_t id, int nasync);
+
 #endif
index b7d09cd..6c6dfd3 100644 (file)
@@ -332,6 +332,9 @@ typedef struct lnet_lnd
 #else
         /* wait for something to happen */
         void (*lnd_wait)(struct lnet_ni *ni, int milliseconds);
+
+        /* ensure non-RDMA messages can be received outside liblustre */
+        int (*lnd_setasync)(struct lnet_ni *ni, lnet_process_id_t id, int nasync);
 #endif
 } lnd_t;
 
index e5b5410..ca9046c 100644 (file)
@@ -16,6 +16,9 @@
  *
  */
  
+/* Minimum buffer size that any peer will post to receive ptllnd messages */
+#define PTLLND_MIN_BUFFER_SIZE  256
+
 /************************************************************************
  * Tunable defaults that {u,k}lnds/ptllnd should have in common.
  */
 #define PTLLND_PORTAL           9          /* The same portal PTLPRC used when talking to cray portals */
 #define PTLLND_PID              9          /* The Portals PID */
 #define PTLLND_PEERCREDITS      8          /* concurrent sends to 1 peer */
-#define PTLLND_MAX_MSG_SIZE     512        /* Maximum message size */
+
+/* Default buffer size for kernel ptllnds (guaranteed eager) */
+#define PTLLND_MAX_KLND_MSG_SIZE 512
+
+/* Default buffer size for catamount ptllnds (not guaranteed eager) - large
+ * enough to avoid RDMA for anything sent while control is not in liblustre */
+#define PTLLND_MAX_ULND_MSG_SIZE 512
 
 
 /************************************************************************
- * Portals LNS Wire message format.
+ * Portals LND Wire message format.
  * These are sent in sender's byte order (i.e. receiver flips).
  */
 
index 5723c8a..f9361f9 100755 (executable)
@@ -166,6 +166,50 @@ const char *kptllnd_msgtype2str(int type)
 #undef DO_TYPE
 }
 
+const char *kptllnd_errtype2str(int type)       /* name of the Portals return code 'type' */
+{                                               /* always returns a string literal, never NULL */
+#define DO_TYPE(x) case x: return #x;           /* '#x' yields the spelling of the constant itself */
+        switch(type)
+        {
+                DO_TYPE(PTL_OK);
+                DO_TYPE(PTL_SEGV);
+                DO_TYPE(PTL_NO_SPACE);
+                DO_TYPE(PTL_ME_IN_USE);
+                DO_TYPE(PTL_NAL_FAILED);
+                DO_TYPE(PTL_NO_INIT);
+                DO_TYPE(PTL_IFACE_DUP);
+                DO_TYPE(PTL_IFACE_INVALID);
+                DO_TYPE(PTL_HANDLE_INVALID);
+                DO_TYPE(PTL_MD_INVALID);
+                DO_TYPE(PTL_ME_INVALID);
+                DO_TYPE(PTL_PROCESS_INVALID);
+                DO_TYPE(PTL_PT_INDEX_INVALID);
+                DO_TYPE(PTL_SR_INDEX_INVALID);
+                DO_TYPE(PTL_EQ_INVALID);
+                DO_TYPE(PTL_EQ_DROPPED);
+                DO_TYPE(PTL_EQ_EMPTY);
+                DO_TYPE(PTL_MD_NO_UPDATE);
+                DO_TYPE(PTL_FAIL);
+                DO_TYPE(PTL_AC_INDEX_INVALID);
+                DO_TYPE(PTL_MD_ILLEGAL);
+                DO_TYPE(PTL_ME_LIST_TOO_LONG);
+                DO_TYPE(PTL_MD_IN_USE);
+                DO_TYPE(PTL_NI_INVALID);
+                DO_TYPE(PTL_PID_INVALID);
+                DO_TYPE(PTL_PT_FULL);
+                DO_TYPE(PTL_VAL_FAILED);
+                DO_TYPE(PTL_NOT_IMPLEMENTED);
+                DO_TYPE(PTL_NO_ACK);
+                DO_TYPE(PTL_EQ_IN_USE);
+                DO_TYPE(PTL_PID_IN_USE);
+                DO_TYPE(PTL_INV_EQ_SIZE);
+                DO_TYPE(PTL_AGAIN);
+        default:                                /* value matches none of the codes listed above */
+                return "<unknown error type>";
+        }
+#undef DO_TYPE
+}
+
 __u32
 kptllnd_cksum (void *ptr, int nob)
 {
@@ -432,9 +476,12 @@ kptllnd_startup (lnet_ni_t *ni)
         }
 
         *kptllnd_tunables.kptl_max_msg_size &= ~7;
-        if (*kptllnd_tunables.kptl_max_msg_size < sizeof(kptl_msg_t))
-                *kptllnd_tunables.kptl_max_msg_size =
-                        (sizeof(kptl_msg_t) + 7) & ~7;
+        if (*kptllnd_tunables.kptl_max_msg_size < PTLLND_MIN_BUFFER_SIZE)
+                *kptllnd_tunables.kptl_max_msg_size = PTLLND_MIN_BUFFER_SIZE;
+
+        CLASSERT ((PTLLND_MIN_BUFFER_SIZE & 7) == 0);
+        CLASSERT (sizeof(kptl_msg_t) <= PTLLND_MIN_BUFFER_SIZE);
+
         /*
          * zero pointers, flags etc
          * put everything into a known state.
index 598c4b8..4ea88f4 100755 (executable)
@@ -90,6 +90,7 @@ typedef struct
         int             *kptl_max_msg_size;     /* max immd message size*/
         int             *kptl_peer_hash_table_size; /* # slots in peer hash table */
         int             *kptl_reschedule_loops; /* scheduler yield loops */
+        int             *kptl_ack_puts;         /* make portals ack PUTs */
 #ifdef CRAY_XT3
         int             *kptl_ptltrace_on_timeout; /* dump pltrace on timeout? */
         char           **kptl_ptltrace_basename;  /* ptltrace dump file basename */
@@ -125,6 +126,7 @@ typedef struct kptl_rx                          /* receive message */
         kptl_rx_buffer_t       *rx_rxb;         /* the rx buffer pointer */
         kptl_msg_t             *rx_msg;         /* received message */
         int                     rx_nob;         /* received message size */
+        unsigned long           rx_treceived;   /* time received */
         ptl_process_id_t        rx_initiator;   /* sender's address */
 #ifdef CRAY_XT3
         ptl_uid_t               rx_uid;         /* sender's uid */
@@ -182,6 +184,7 @@ typedef struct kptl_tx                           /* transmit message */
         enum kptl_tx_type       tx_type;      /* small msg/{put,get}{req,resp} */
         int                     tx_active:1;  /* queued on the peer */
         int                     tx_idle:1;    /* on the free list */
+        int                     tx_acked:1;   /* portals ACK wanted (for debug only) */
         kptl_eventarg_t         tx_msg_eventarg; /* event->md.user_ptr */
         kptl_eventarg_t         tx_rdma_eventarg; /* event->md.user_ptr */
         int                     tx_status;    /* the status of this tx descriptor */
@@ -192,8 +195,9 @@ typedef struct kptl_tx                           /* transmit message */
         kptl_msg_t             *tx_msg;       /* the message data */
         kptl_peer_t            *tx_peer;      /* the peer this is waiting on */
         unsigned long           tx_deadline;  /* deadline */
-        ptl_md_t                tx_rdma_md;   /* rdma buffer */
-        kptl_fragvec_t         *tx_rdma_frags; /* buffer fragments */
+        unsigned long           tx_tposted;   /* time posted */
+        ptl_md_t                tx_rdma_md;   /* rdma descriptor */
+        kptl_fragvec_t         *tx_frags;     /* buffer fragments */
 } kptl_tx_t;
 
 enum kptllnd_peer_state
@@ -221,7 +225,8 @@ struct kptl_peer
         int                     peer_sent_hello;        /* have I sent HELLO? */
         int                     peer_credits;           /* number of send credits */
         int                     peer_outstanding_credits;/* number of peer credits to return */
-        int                     peer_active_rxs;        /* # rx-es being handled */
+        int                     peer_sent_credits;      /* #msg buffers posted for peer */
+        int                     peer_max_msg_size;      /* peer's rx buffer size */
         int                     peer_error;             /* errno on closing this peer */
         cfs_time_t              peer_last_alive;        /* when (in jiffies) I was last alive */
         __u64                   peer_next_matchbits;    /* Next value to register RDMA from peer */
@@ -319,6 +324,7 @@ void kptllnd_tunables_fini(void);
 
 const char *kptllnd_evtype2str(int evtype);
 const char *kptllnd_msgtype2str(int msgtype);
+const char *kptllnd_errtype2str(int errtype);
 
 static inline void *
 kptllnd_eventarg2obj (kptl_eventarg_t *eva)
@@ -413,7 +419,8 @@ void kptllnd_handle_closing_peers(void);
 int  kptllnd_peer_connect(kptl_tx_t *tx, lnet_nid_t nid);
 void kptllnd_peer_check_sends(kptl_peer_t *peer);
 void kptllnd_peer_check_bucket(int idx);
-void kptllnd_tx_launch(kptl_tx_t *tx, lnet_process_id_t target);
+void kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag);
+int  kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target);
 kptl_peer_t *kptllnd_peer_handle_hello(ptl_process_id_t initiator,
                                        kptl_msg_t *msg);
 kptl_peer_t *kptllnd_id2peer_locked(lnet_process_id_t id);
index 22adc85..91772f9 100644 (file)
@@ -127,7 +127,7 @@ kptllnd_init_rdma_md(kptl_tx_t *tx, unsigned int niov,
         
         memset(&tx->tx_rdma_md, 0, sizeof(tx->tx_rdma_md));
 
-        tx->tx_rdma_md.start     = tx->tx_rdma_frags;
+        tx->tx_rdma_md.start     = tx->tx_frags;
         tx->tx_rdma_md.user_ptr  = &tx->tx_rdma_eventarg;
         tx->tx_rdma_md.eq_handle = kptllnd_data.kptl_eqh;
         tx->tx_rdma_md.options   = PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
@@ -151,7 +151,7 @@ kptllnd_init_rdma_md(kptl_tx_t *tx, unsigned int niov,
                 break;
                 
         case TX_TYPE_GET_RESPONSE:              /* active: I put */
-                tx->tx_rdma_md.threshold = 1;   /* SEND */
+                tx->tx_rdma_md.threshold = tx->tx_acked ? 2 : 1;   /* SEND + ACK? */
                 break;
         }
 
@@ -164,7 +164,7 @@ kptllnd_init_rdma_md(kptl_tx_t *tx, unsigned int niov,
         if (iov != NULL) {
                 tx->tx_rdma_md.options |= PTL_MD_IOVEC;
                 tx->tx_rdma_md.length = 
-                        lnet_extract_iov(PTL_MD_MAX_IOV, tx->tx_rdma_frags->iov,
+                        lnet_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
                                          niov, iov, offset, nob);
                 return;
         }
@@ -180,20 +180,20 @@ kptllnd_init_rdma_md(kptl_tx_t *tx, unsigned int niov,
         
         tx->tx_rdma_md.options |= PTL_MD_KIOV;
         tx->tx_rdma_md.length = 
-                lnet_extract_kiov(PTL_MD_MAX_IOV, tx->tx_rdma_frags->kiov,
+                lnet_extract_kiov(PTL_MD_MAX_IOV, tx->tx_frags->kiov,
                                   niov, kiov, offset, nob);
 #else
         if (iov != NULL) {
                 tx->tx_rdma_md.options |= PTL_MD_IOVEC;
                 tx->tx_rdma_md.length = 
-                        kptllnd_extract_iov(PTL_MD_MAX_IOV, tx->tx_rdma_frags->iov,
+                        kptllnd_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
                                             niov, iov, offset, nob);
                 return;
         }
 
         tx->tx_rdma_md.options |= PTL_MD_IOVEC | PTL_MD_PHYS;
         tx->tx_rdma_md.length =
-                kptllnd_extract_phys(PTL_MD_MAX_IOV, tx->tx_rdma_frags->iov,
+                kptllnd_extract_phys(PTL_MD_MAX_IOV, tx->tx_frags->iov,
                                      niov, kiov, offset, nob);
 #endif
 }
@@ -249,9 +249,11 @@ kptllnd_active_rdma(kptl_rx_t *rx, lnet_msg_t *lntmsg, int type,
 
         spin_unlock_irqrestore(&peer->peer_lock, flags);
 
+        tx->tx_tposted = jiffies;
+
         if (type == TX_TYPE_GET_RESPONSE)
                 ptlrc = PtlPut(mdh,
-                               PTL_NOACK_REQ,
+                               tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
                                rx->rx_initiator,
                                *kptllnd_tunables.kptl_portal,
                                0,                     /* acl cookie */
@@ -293,8 +295,11 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
         lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
         unsigned int      payload_offset = lntmsg->msg_offset;
         unsigned int      payload_nob = lntmsg->msg_len;
+        kptl_peer_t      *peer;
         kptl_tx_t        *tx;
         int               nob;
+        int               nfrag;
+        int               rc;
 
         LASSERT (payload_nob == 0 || payload_niov > 0);
         LASSERT (payload_niov <= LNET_MAX_IOV);
@@ -302,6 +307,10 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
         LASSERT (!in_interrupt());
 
+        rc = kptllnd_find_target(&peer, target);
+        if (rc != 0)
+                return rc;
+        
         switch (type) {
         default:
                 LBUG();
@@ -309,9 +318,10 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
 
         case LNET_MSG_REPLY:
         case LNET_MSG_PUT:
-                /* Is the payload small enough not to need RDMA? */
+                /* Should the payload avoid RDMA? */
                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]);
-                if (nob <= *kptllnd_tunables.kptl_max_msg_size)
+                if (payload_kiov == NULL && 
+                    nob <= peer->peer_max_msg_size)
                         break;
 
                 tx = kptllnd_get_idle_tx(TX_TYPE_PUT_REQUEST);
@@ -319,7 +329,8 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                         CERROR("Can't send %s to %s: can't allocate descriptor\n",
                                lnet_msgtyp2str(type),
                                libcfs_id2str(target));
-                        return -ENOMEM;
+                        rc = -ENOMEM;
+                        goto out;
                 }
 
                 kptllnd_init_rdma_md(tx, payload_niov, 
@@ -335,8 +346,8 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                        libcfs_id2str(target),
                        le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);
 
-                kptllnd_tx_launch(tx, target);
-                return 0;
+                kptllnd_tx_launch(peer, tx, 0);
+                goto out;
 
         case LNET_MSG_GET:
                 /* routed gets don't RDMA */
@@ -347,14 +358,15 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 nob = lntmsg->msg_md->md_length;
                 nob = offsetof(kptl_msg_t, 
                                ptlm_u.immediate.kptlim_payload[nob]);
-                if (nob <= *kptllnd_tunables.kptl_max_msg_size)
+                if (nob <= peer->peer_max_msg_size)
                         break;
 
                 tx = kptllnd_get_idle_tx(TX_TYPE_GET_REQUEST);
                 if (tx == NULL) {
                         CERROR("Can't send GET to %s: can't allocate descriptor\n",
                                libcfs_id2str(target));
-                        return -ENOMEM;
+                        rc = -ENOMEM;
+                        goto out;
                 }
 
                 tx->tx_lnet_replymsg =
@@ -363,7 +375,8 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                         CERROR("Failed to allocate LNET reply for %s\n",
                                libcfs_id2str(target));
                         kptllnd_tx_decref(tx);
-                        return -ENOMEM;
+                        rc = -ENOMEM;
+                        goto out;
                 }
 
                 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
@@ -384,8 +397,8 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                        libcfs_id2str(target),
                        le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);
 
-                kptllnd_tx_launch(tx, target);
-                return 0;
+                kptllnd_tx_launch(peer, tx, 0);
+                goto out;
 
         case LNET_MSG_ACK:
                 CDEBUG(D_NET, "LNET_MSG_ACK\n");
@@ -393,29 +406,42 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 break;
         }
 
+        /* I don't have to handle kiovs */
+        LASSERT (payload_nob == 0 || payload_iov != NULL);
+
         tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
         if (tx == NULL) {
                 CERROR("Can't send %s to %s: can't allocate descriptor\n",
                        lnet_msgtyp2str(type), libcfs_id2str(target));
-                        return -ENOMEM;
+                rc = -ENOMEM;
+                goto out;
         }
 
         tx->tx_lnet_msg = lntmsg;
         tx->tx_msg->ptlm_u.immediate.kptlim_hdr = *hdr;
 
-        if (payload_kiov != NULL)
-                lnet_copy_kiov2flat(*kptllnd_tunables.kptl_max_msg_size,
-                                    tx->tx_msg->ptlm_u.immediate.kptlim_payload,
-                                    0,
-                                    payload_niov, payload_kiov,
-                                    payload_offset, payload_nob);
-        else
-                lnet_copy_iov2flat(*kptllnd_tunables.kptl_max_msg_size,
-                                   tx->tx_msg->ptlm_u.immediate.kptlim_payload,
-                                   0,
-                                   payload_niov, payload_iov,
-                                   payload_offset, payload_nob);
+        if (payload_nob == 0) {
+                nfrag = 0;
+        } else {
+                tx->tx_frags->iov[0].iov_base = tx->tx_msg;
+                tx->tx_frags->iov[0].iov_len = offsetof(kptl_msg_t,
+                                                        ptlm_u.immediate.kptlim_payload);
 
+                /* NB relying on lustre not asking for PTL_MD_MAX_IOV
+                 * fragments!! */
+#ifdef _USING_LUSTRE_PORTALS_
+                nfrag = 1 + lnet_extract_iov(PTL_MD_MAX_IOV - 1, 
+                                             &tx->tx_frags->iov[1],
+                                             payload_niov, payload_iov,
+                                             payload_offset, payload_nob);
+#else
+                nfrag = 1 + kptllnd_extract_iov(PTL_MD_MAX_IOV - 1,
+                                                &tx->tx_frags->iov[1],
+                                                payload_niov, payload_iov,
+                                                payload_offset, payload_nob);
+#endif
+        }
+        
         nob = offsetof(kptl_immediate_msg_t, kptlim_payload[payload_nob]);
         kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, nob);
 
@@ -428,8 +454,11 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                le32_to_cpu(lntmsg->msg_hdr.msg.get.ptl_index) : -1,
                tx);
 
-        kptllnd_tx_launch(tx, target);
-        return 0;
+        kptllnd_tx_launch(peer, tx, nfrag);
+
+ out:
+        kptllnd_peer_decref(peer);
+        return rc;
 }
 
 int 
index 7678748..0313413 100644 (file)
@@ -63,7 +63,7 @@ static int peercredits = PTLLND_PEERCREDITS;    /* <lnet/ptllnd_wire.h> */
 CFS_MODULE_PARM(peercredits, "i", int, 0444,
                "concurrent sends to 1 peer");
 
-static int max_msg_size = PTLLND_MAX_MSG_SIZE;  /* <lnet/ptllnd_wire.h> */
+static int max_msg_size = PTLLND_MAX_KLND_MSG_SIZE;  /* <lnet/ptllnd_wire.h> */
 CFS_MODULE_PARM(max_msg_size, "i", int, 0444,
                "max size of immediate message");
 
@@ -75,6 +75,10 @@ static int reschedule_loops = 100;
 CFS_MODULE_PARM(reschedule_loops, "i", int, 0644,
                 "# of loops before scheduler does cond_resched()");
 
+static int ack_puts = 0;
+CFS_MODULE_PARM(ack_puts, "i", int, 0644,
+               "get portals to ack all PUTs");
+
 #ifdef CRAY_XT3
 static int ptltrace_on_timeout = 1;
 CFS_MODULE_PARM(ptltrace_on_timeout, "i", int, 0644,
@@ -106,6 +110,7 @@ kptl_tunables_t kptllnd_tunables = {
         .kptl_max_msg_size           = &max_msg_size,
         .kptl_peer_hash_table_size   = &peer_hash_table_size,
         .kptl_reschedule_loops       = &reschedule_loops,
+        .kptl_ack_puts               = &ack_puts,
 #ifdef CRAY_XT3
         .kptl_ptltrace_on_timeout    = &ptltrace_on_timeout,
         .kptl_ptltrace_basename      = &ptltrace_basename,
@@ -156,15 +161,17 @@ static ctl_table kptllnd_ctl_table[] = {
         sizeof(int), 0444, NULL, &proc_dointvec},
        {13, "reschedule_loops", &reschedule_loops,
         sizeof(int), 0444, NULL, &proc_dointvec},
+       {14, "ack_puts", &ack_puts,
+        sizeof(int), 0644, NULL, &proc_dointvec},
 #ifdef CRAY_XT3
-       {14, "ptltrace_on_timeout", &ptltrace_on_timeout,
+       {15, "ptltrace_on_timeout", &ptltrace_on_timeout,
         sizeof(int), 0644, NULL, &proc_dointvec},
-       {15, "ptltrace_basename", ptltrace_basename_space,
+       {16, "ptltrace_basename", ptltrace_basename_space,
         sizeof(ptltrace_basename_space), 0644, NULL, &proc_dostring,
         &sysctl_string},
 #endif
 #ifdef PJK_DEBUGGING
-       {16, "simulation_bitmap", &simulation_bitmap,
+       {17, "simulation_bitmap", &simulation_bitmap,
         sizeof(int), 0444, NULL, &proc_dointvec},
 #endif
 
index 0f9e7e0..86a21f1 100644 (file)
@@ -169,7 +169,8 @@ kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
         peer->peer_credits = 1;                 /* enough for HELLO */
         peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
         peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;
-        peer->peer_active_rxs = 0;
+        peer->peer_sent_credits = 1;           /* HELLO credit is implicit */
+        peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */
 
         atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */
 
@@ -201,7 +202,6 @@ kptllnd_peer_destroy (kptl_peer_t *peer)
 
         LASSERT (!in_interrupt());
         LASSERT (atomic_read(&peer->peer_refcount) == 0);
-        LASSERT (peer->peer_active_rxs == 0);
         LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
                  peer->peer_state == PEER_STATE_ZOMBIE);
         LASSERT (list_empty(&peer->peer_sendq));
@@ -461,7 +461,7 @@ again:
 }
 
 void
-kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx)
+kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
 {
         /* CAVEAT EMPTOR: I take over caller's ref on 'tx' */
         ptl_handle_md_t  rdma_mdh = PTL_INVALID_HANDLE;
@@ -524,16 +524,26 @@ kptllnd_post_tx(kptl_peer_t *peer, kptl_tx_t *tx)
         }
 
         memset(&md, 0, sizeof(md));
-        
-        md.start = tx->tx_msg;
-        md.length = tx->tx_msg->ptlm_nob;
-        md.threshold = 1;
+
+        md.threshold = tx->tx_acked ? 2 : 1;    /* SEND END + ACK? */
         md.options = PTL_MD_OP_PUT |
                      PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                      PTL_MD_EVENT_START_DISABLE;
         md.user_ptr = &tx->tx_msg_eventarg;
         md.eq_handle = kptllnd_data.kptl_eqh;
 
+        if (nfrag == 0) {
+                md.start = tx->tx_msg;
+                md.length = tx->tx_msg->ptlm_nob;
+        } else {
+                LASSERT (nfrag > 1);
+                LASSERT (tx->tx_frags->iov[0].iov_base == (void *)tx->tx_msg);
+
+                md.start = tx->tx_frags;
+                md.length = nfrag;
+                md.options |= PTL_MD_IOVEC;
+        }
+
         prc = PtlMDBind(kptllnd_data.kptl_nih, md, PTL_UNLINK, &msg_mdh);
         if (prc != PTL_OK) {
                 msg_mdh = PTL_INVALID_HANDLE;
@@ -593,7 +603,7 @@ kptllnd_peer_check_sends (kptl_peer_t *peer)
                                libcfs_id2str(peer->peer_id));
                 } else {
                         kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_NOOP, 0);
-                        kptllnd_post_tx(peer, tx);
+                        kptllnd_post_tx(peer, tx, 0);
                 }
 
                 spin_lock_irqsave(&peer->peer_lock, flags);
@@ -608,11 +618,11 @@ kptllnd_peer_check_sends (kptl_peer_t *peer)
                          !PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE));
 
                 LASSERT (peer->peer_outstanding_credits >= 0);
-                LASSERT (peer->peer_outstanding_credits <= 
+                LASSERT (peer->peer_sent_credits >= 0);
+                LASSERT (peer->peer_sent_credits +
+                         peer->peer_outstanding_credits <=
                          *kptllnd_tunables.kptl_peercredits);
                 LASSERT (peer->peer_credits >= 0);
-                LASSERT (peer->peer_credits <= 
-                         *kptllnd_tunables.kptl_peercredits);
 
                /* Ensure HELLO is sent first */
                if (!peer->peer_sent_hello) {
@@ -622,9 +632,11 @@ kptllnd_peer_check_sends (kptl_peer_t *peer)
                }
 
                 if (peer->peer_credits == 0) {
-                        CDEBUG(D_NETTRACE, "%s[%d/%d]: no credits for %p\n",
-                               libcfs_id2str(peer->peer_id),
-                               peer->peer_credits, peer->peer_outstanding_credits, tx);
+                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: no credits for %p\n",
+                               libcfs_id2str(peer->peer_id), 
+                               peer->peer_credits,
+                               peer->peer_outstanding_credits, 
+                               peer->peer_sent_credits, tx);
                         break;
                 }
 
@@ -632,9 +644,12 @@ kptllnd_peer_check_sends (kptl_peer_t *peer)
                  * return */
                 if (peer->peer_credits == 1 &&
                     peer->peer_outstanding_credits == 0) {
-                        CDEBUG(D_NETTRACE, "%s[%d/%d]: not using last credit for %p\n",
-                               libcfs_id2str(peer->peer_id),
-                               peer->peer_credits, peer->peer_outstanding_credits, tx);
+                        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: "
+                               "not using last credit for %p\n",
+                               libcfs_id2str(peer->peer_id), 
+                               peer->peer_credits,
+                               peer->peer_outstanding_credits,
+                               peer->peer_sent_credits, tx);
                         break;
                 }
 
@@ -661,12 +676,13 @@ kptllnd_peer_check_sends (kptl_peer_t *peer)
                 /* fill last-minute msg header fields */
                 kptllnd_msg_pack(tx->tx_msg, peer);
 
+                peer->peer_sent_credits += peer->peer_outstanding_credits;
                 peer->peer_outstanding_credits = 0;
                 peer->peer_credits--;
 
-                CDEBUG(D_NETTRACE, "%s[%d/%d]: %s tx=%p nob=%d cred=%d\n",
-                       libcfs_id2str(peer->peer_id),
-                       peer->peer_credits, peer->peer_outstanding_credits,
+                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s tx=%p nob=%d cred=%d\n",
+                       libcfs_id2str(peer->peer_id), peer->peer_credits,
+                       peer->peer_outstanding_credits, peer->peer_sent_credits,
                        kptllnd_msgtype2str(tx->tx_msg->ptlm_type),
                        tx, tx->tx_msg->ptlm_nob,
                        tx->tx_msg->ptlm_credits);
@@ -677,8 +693,10 @@ kptllnd_peer_check_sends (kptl_peer_t *peer)
 
                 spin_unlock_irqrestore(&peer->peer_lock, flags);
 
+                tx->tx_tposted = jiffies;       /* going on the wire */
+
                 rc = PtlPut (tx->tx_msg_mdh,
-                             PTL_NOACK_REQ,
+                             tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
                              peer->peer_ptlid,
                              *kptllnd_tunables.kptl_portal,
                              0,                 /* acl cookie */
@@ -756,9 +774,9 @@ kptllnd_peer_check_bucket (int idx)
         list_for_each (ptmp, peers) {
                 peer = list_entry (ptmp, kptl_peer_t, peer_list);
 
-                CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d\n",
-                       libcfs_id2str(peer->peer_id),
-                       peer->peer_credits, peer->peer_outstanding_credits);
+                CDEBUG(D_NET, "Peer=%s Credits=%d Outstanding=%d Send=%d\n",
+                       libcfs_id2str(peer->peer_id), peer->peer_credits, 
+                       peer->peer_outstanding_credits, peer->peer_sent_credits);
 
                 /* In case we have enough credits to return via a
                  * NOOP, but there were no non-blocking tx descs
@@ -779,20 +797,26 @@ kptllnd_peer_check_bucket (int idx)
                 nactive = kptllnd_count_queue(&peer->peer_activeq);
                 spin_unlock_irqrestore(&peer->peer_lock, flags);
 
-                LCONSOLE_ERROR("Timing out %s: please check Portals\n",
-                               libcfs_id2str(peer->peer_id));
-
-                CERROR("%s timed out: cred %d outstanding %d sendq %d "
-                       "activeq %d Tx %s (%s%s%s) status %d T/O %ds\n",
-                       libcfs_id2str(peer->peer_id),
-                       peer->peer_credits, peer->peer_outstanding_credits,
-                       nsend, nactive, kptllnd_tx_typestr(tx->tx_type),
+                LCONSOLE_ERROR("Timing out %s: %s\n",
+                               libcfs_id2str(peer->peer_id),
+                               (tx->tx_tposted == 0) ? 
+                               "no free peer buffers" : "please check Portals");
+
+                CERROR("%s timed out: cred %d outstanding %d, sent %d, "
+                       "sendq %d, activeq %d Tx %p %s (%s%s%s) status %d "
+                       "%sposted %lu T/O %ds\n",
+                       libcfs_id2str(peer->peer_id), peer->peer_credits,
+                       peer->peer_outstanding_credits, peer->peer_sent_credits,
+                       nsend, nactive, tx, kptllnd_tx_typestr(tx->tx_type),
                        tx->tx_active ? "A" : "",
                        PtlHandleIsEqual(tx->tx_msg_mdh, PTL_INVALID_HANDLE) ?
                        "" : "M",
                        PtlHandleIsEqual(tx->tx_rdma_mdh, PTL_INVALID_HANDLE) ?
                        "" : "D",
-                       tx->tx_status, *kptllnd_tunables.kptl_timeout);
+                       tx->tx_status,
+                       (tx->tx_tposted == 0) ? "not " : "",
+                       (tx->tx_tposted == 0) ? 0UL : (jiffies - tx->tx_tposted),
+                       *kptllnd_tunables.kptl_timeout);
 
                 kptllnd_dump_ptltrace();
 
@@ -928,22 +952,17 @@ kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
                return NULL;
        }
        
-        if (msg->ptlm_u.hello.kptlhm_max_msg_size !=
-            *kptllnd_tunables.kptl_max_msg_size) {
-                CERROR("max message size MUST be equal for all peers: "
-                       "got %d expected %d from %s\n",
+        if (msg->ptlm_u.hello.kptlhm_max_msg_size < PTLLND_MIN_BUFFER_SIZE) {
+                CERROR("%s: max message size %d < MIN %d",
+                       libcfs_id2str(lpid),
                        msg->ptlm_u.hello.kptlhm_max_msg_size,
-                       *kptllnd_tunables.kptl_max_msg_size,
-                       libcfs_id2str(lpid));
+                       *kptllnd_tunables.kptl_max_msg_size);
                 return NULL;
         }
 
-        if (msg->ptlm_credits + 1 != *kptllnd_tunables.kptl_peercredits) {
-                CERROR("peercredits MUST be equal on all peers: "
-                       "got %d expected %d from %s\n",
-                       msg->ptlm_credits + 1,
-                       *kptllnd_tunables.kptl_peercredits,
-                       libcfs_id2str(lpid));
+        if (msg->ptlm_credits <= 1) {
+                CERROR("Need more than 1+%d credits from %s\n",
+                       msg->ptlm_credits, libcfs_id2str(lpid));
                 return NULL;
         }
         
@@ -972,6 +991,8 @@ kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
                         peer->peer_state = PEER_STATE_ACTIVE;
                         peer->peer_incarnation = msg->ptlm_srcstamp;
                         peer->peer_next_matchbits = safe_matchbits;
+                        peer->peer_max_msg_size =
+                                msg->ptlm_u.hello.kptlhm_max_msg_size;
                         
                         write_unlock_irqrestore(g_lock, flags);
                         return peer;
@@ -1034,18 +1055,19 @@ kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
         }
 
         write_lock_irqsave(g_lock, flags);
-
+ again:
         peer = kptllnd_id2peer_locked(lpid);
         if (peer != NULL) {
                 if (peer->peer_state == PEER_STATE_WAITING_HELLO) {
-                        /* An outgoing message instantiated 'peer' for me and
-                        * presumably provoked this reply */
+                        /* An outgoing message instantiated 'peer' for me */
                         CWARN("Outgoing instantiated peer %s\n", libcfs_id2str(lpid));
                         LASSERT(peer->peer_incarnation == 0);
 
                         peer->peer_state = PEER_STATE_ACTIVE;
                         peer->peer_incarnation = msg->ptlm_srcstamp;
                         peer->peer_next_matchbits = safe_matchbits;
+                        peer->peer_max_msg_size =
+                                msg->ptlm_u.hello.kptlhm_max_msg_size;
                } else {
                        LASSERT (peer->peer_state == PEER_STATE_ACTIVE);
                        /* WOW!  Somehow this peer completed the HELLO
@@ -1084,6 +1106,7 @@ kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
                 
                 write_lock_irqsave(g_lock, flags);
                 kptllnd_data.kptl_expected_peers++;
+                goto again;
         }
 
         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);
@@ -1096,6 +1119,7 @@ kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
         new_peer->peer_incarnation = msg->ptlm_srcstamp;
         new_peer->peer_next_matchbits = safe_matchbits;
         new_peer->peer_last_matchbits_seen = last_matchbits_seen;
+        new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
 
         kptllnd_peer_add_peertable_locked(new_peer);
 
@@ -1107,41 +1131,42 @@ kptllnd_peer_handle_hello (ptl_process_id_t  initiator,
         CDEBUG(D_NETTRACE, "%s: post response hello %p\n",
                libcfs_id2str(new_peer->peer_id), hello_tx);
 
-        kptllnd_post_tx(new_peer, hello_tx);
+        kptllnd_post_tx(new_peer, hello_tx, 0);
         kptllnd_peer_check_sends(new_peer);
 
         return new_peer;
 }
 
 void
-kptllnd_tx_launch(kptl_tx_t *tx, lnet_process_id_t target)
+kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag)
+{
+        kptllnd_post_tx(peer, tx, nfrag);
+        kptllnd_peer_check_sends(peer);
+}
+
+int
+kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target)
 {
         rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
         ptl_process_id_t  ptl_id;
-        kptl_peer_t      *peer;
-        kptl_peer_t      *new_peer = NULL;
-        kptl_tx_t        *hello_tx = NULL;
+        kptl_peer_t      *new_peer;
+        kptl_tx_t        *hello_tx;
         unsigned long     flags;
         int               rc;
         __u64             last_matchbits_seen;
 
-        LASSERT (tx->tx_lnet_msg != NULL);
-        LASSERT (tx->tx_peer == NULL);
-
         /* I expect to find the peer, so I only take a read lock... */
         read_lock_irqsave(g_lock, flags);
-        peer = kptllnd_id2peer_locked(target);
+        *peerp = kptllnd_id2peer_locked(target);
         read_unlock_irqrestore(g_lock, flags);
 
-        if (peer != NULL) {
-                goto post;
-        }
+        if (*peerp != NULL)
+                return 0;
         
         if ((target.pid & LNET_PID_USERFLAG) != 0) {
                 CWARN("Refusing to create a new connection to %s "
                       "(non-kernel peer)\n", libcfs_id2str(target));
-                tx->tx_status = -EHOSTUNREACH;
-                goto failed;
+                return -EHOSTUNREACH;
         }
 
         /* The new peer is a kernel ptllnd, and kernel ptllnds all have
@@ -1149,24 +1174,11 @@ kptllnd_tx_launch(kptl_tx_t *tx, lnet_process_id_t target)
         ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);
         ptl_id.pid = kptllnd_data.kptl_portals_id.pid;
 
-        write_lock_irqsave(g_lock, flags);
-
-        peer = kptllnd_id2peer_locked(target);
-        if (peer != NULL) {
-                write_unlock_irqrestore(g_lock, flags);
-                goto post;
-        }
-        
-        kptllnd_cull_peertable_locked(target);
-
-        write_unlock_irqrestore(g_lock, flags);
-                
         hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
         if (hello_tx == NULL) {
                 CERROR("Unable to allocate connect message for %s\n",
                        libcfs_id2str(target));
-                tx->tx_status = -ENOMEM;
-                goto failed;
+                return -ENOMEM;
         }
 
         kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,
@@ -1174,28 +1186,24 @@ kptllnd_tx_launch(kptl_tx_t *tx, lnet_process_id_t target)
 
         new_peer = kptllnd_peer_allocate(target, ptl_id);
         if (new_peer == NULL) {
-                tx->tx_status = -ENOMEM;
-                goto failed;
+                rc = -ENOMEM;
+                goto unwind_0;
         }
 
         rc = kptllnd_peer_reserve_buffers();
-        if (rc != 0) {
-                tx->tx_status = rc;
-                goto failed;
-        }
+        if (rc != 0)
+                goto unwind_1;
 
         write_lock_irqsave(g_lock, flags);
-
-        peer = kptllnd_id2peer_locked(target);
-        if (peer != NULL) {                     /* someone else beat me to it */
+ again:
+        *peerp = kptllnd_id2peer_locked(target);
+        if (*peerp != NULL) {
                 write_unlock_irqrestore(g_lock, flags);
-
-                kptllnd_peer_unreserve_buffers();
-                kptllnd_peer_decref(new_peer);
-                kptllnd_tx_decref(hello_tx);
-                goto post;
+                goto unwind_2;
         }
-                
+
+        kptllnd_cull_peertable_locked(target);
+
         if (kptllnd_data.kptl_n_active_peers ==
             kptllnd_data.kptl_expected_peers) {
                 /* peer table full */
@@ -1207,12 +1215,12 @@ kptllnd_tx_launch(kptl_tx_t *tx, lnet_process_id_t target)
                 if (rc != 0) {
                         CERROR("Can't create connection to %s\n",
                                libcfs_id2str(target));
-                        kptllnd_peer_unreserve_buffers();
-                        tx->tx_status = -ENOMEM;
-                        goto failed;
+                        rc = -ENOMEM;
+                        goto unwind_2;
                 }
                 write_lock_irqsave(g_lock, flags);
                 kptllnd_data.kptl_expected_peers++;
+                goto again;
         }
 
         last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);
@@ -1234,23 +1242,18 @@ kptllnd_tx_launch(kptl_tx_t *tx, lnet_process_id_t target)
         CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",
                libcfs_id2str(new_peer->peer_id), hello_tx);
 
-        peer = new_peer;
-        kptllnd_post_tx(peer, hello_tx);
-
- post:
-        kptllnd_post_tx(peer, tx);
-        kptllnd_peer_check_sends(peer);
-        kptllnd_peer_decref(peer);
-        return;
+        kptllnd_post_tx(new_peer, hello_tx, 0);
+        kptllnd_peer_check_sends(new_peer);
+       
+        *peerp = new_peer;
+        return 0;
         
- failed:
-        if (hello_tx != NULL)
-                kptllnd_tx_decref(hello_tx);
-
-        if (new_peer != NULL)
-                kptllnd_peer_decref(new_peer);
+ unwind_2:
+        kptllnd_peer_unreserve_buffers();
+ unwind_1:
+        kptllnd_peer_decref(new_peer);
+ unwind_0:
+        kptllnd_tx_decref(hello_tx);
 
-        LASSERT (tx->tx_status != 0);
-        kptllnd_tx_decref(tx);
-        
+        return rc;
 }
index ad0f05d..364540b 100644 (file)
@@ -344,16 +344,15 @@ kptllnd_rx_done(kptl_rx_t *rx)
                 /* Update credits (after I've decref-ed the buffer) */
                 spin_lock_irqsave(&peer->peer_lock, flags);
 
-                peer->peer_active_rxs--;
-                LASSERT (peer->peer_active_rxs >= 0);
-
                 peer->peer_outstanding_credits++;
-                LASSERT (peer->peer_outstanding_credits <=
+                LASSERT (peer->peer_outstanding_credits +
+                         peer->peer_sent_credits <=
                          *kptllnd_tunables.kptl_peercredits);
 
-                CDEBUG(D_NETTRACE, "%s[%d/%d]: rx %p done\n",
-                       libcfs_id2str(peer->peer_id),
-                       peer->peer_credits, peer->peer_outstanding_credits, rx);
+                CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: rx %p done\n",
+                       libcfs_id2str(peer->peer_id), peer->peer_credits,
+                       peer->peer_outstanding_credits, peer->peer_sent_credits,
+                       rx);
 
                 spin_unlock_irqrestore(&peer->peer_lock, flags);
 
@@ -381,10 +380,11 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev)
         unlinked = ev->type == PTL_EVENT_UNLINK;
 #endif
 
-        CDEBUG(D_NET, "RXB Callback %s(%d) rxb=%p id=%s unlink=%d rc %d\n",
-               kptllnd_evtype2str(ev->type), ev->type, rxb, 
+        CDEBUG(D_NET, "%s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
                kptllnd_ptlid2str(ev->initiator), 
-               unlinked, ev->ni_fail_type);
+               kptllnd_evtype2str(ev->type), ev->type, rxb, 
+               kptllnd_errtype2str(ev->ni_fail_type), ev->ni_fail_type,
+               unlinked);
 
         LASSERT (!rxb->rxb_idle);
         LASSERT (ev->md.start == rxb->rxb_buffer);
@@ -396,9 +396,11 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev)
                  ev->match_bits == LNET_MSG_MATCHBITS);
 
         if (ev->ni_fail_type != PTL_NI_OK)
-                CERROR("event type %d, status %d from %s\n",
-                       ev->type, ev->ni_fail_type,
-                       kptllnd_ptlid2str(ev->initiator));
+                CERROR("Portals error from %s: %s(%d) rxb=%p fail=%s(%d) unlink=%d\n",
+                       kptllnd_ptlid2str(ev->initiator),
+                       kptllnd_evtype2str(ev->type), ev->type, rxb,
+                       kptllnd_errtype2str(ev->ni_fail_type),
+                       ev->ni_fail_type, unlinked);
 
         if (ev->type == PTL_EVENT_PUT_END &&
             ev->ni_fail_type == PTL_NI_OK &&
@@ -446,6 +448,7 @@ kptllnd_rx_buffer_callback (ptl_event_t *ev)
                         }
 
                         rx->rx_initiator = ev->initiator;
+                        rx->rx_treceived = jiffies;
 #ifdef CRAY_XT3
                         rx->rx_uid = ev->uid;
 #endif
@@ -511,7 +514,6 @@ kptllnd_rx_parse(kptl_rx_t *rx)
         kptl_msg_t             *msg = rx->rx_msg;
         kptl_peer_t            *peer;
         int                     rc;
-        int                     credits;
         unsigned long           flags;
         lnet_process_id_t       srcid;
 
@@ -546,8 +548,9 @@ kptllnd_rx_parse(kptl_rx_t *rx)
         srcid.nid = msg->ptlm_srcnid;
         srcid.pid = msg->ptlm_srcpid;
 
-        CDEBUG(D_NETTRACE, "%s: RX %s c %d %p\n", libcfs_id2str(srcid),
-               kptllnd_msgtype2str(msg->ptlm_type), msg->ptlm_credits, rx);
+        CDEBUG(D_NETTRACE, "%s: RX %s c %d %p rxb %p queued %lu ticks\n",
+               libcfs_id2str(srcid), kptllnd_msgtype2str(msg->ptlm_type),
+               msg->ptlm_credits, rx, rx->rx_rxb, jiffies - rx->rx_treceived);
 
         if (srcid.nid != kptllnd_ptl2lnetnid(rx->rx_initiator.nid)) {
                 CERROR("Bad source id %s from %s\n",
@@ -629,33 +632,23 @@ kptllnd_rx_parse(kptl_rx_t *rx)
 
         spin_lock_irqsave(&peer->peer_lock, flags);
 
-        if (peer->peer_active_rxs == *kptllnd_tunables.kptl_peercredits) {
-                spin_unlock_irqrestore(&peer->peer_lock, flags);
-                        
-                CERROR("Message overflow from %s: handling %d already\n",
-                       libcfs_id2str(peer->peer_id),
-                       *kptllnd_tunables.kptl_peercredits);
-                rc = -EPROTO;
-                goto failed;
-        }
-        
-        if (msg->ptlm_credits != 0 &&
-            peer->peer_credits + msg->ptlm_credits >
-            *kptllnd_tunables.kptl_peercredits) {
-                credits = peer->peer_credits;
+        /* Check peer only sends when I've sent her credits */
+        if (peer->peer_sent_credits == 0) {
+                int  c = peer->peer_credits;
+                int oc = peer->peer_outstanding_credits;
+                int sc = peer->peer_sent_credits;
+                
                 spin_unlock_irqrestore(&peer->peer_lock, flags);
 
-                CERROR("Credit overflow from %s: %d + %d > %d\n",
-                       libcfs_id2str(peer->peer_id),
-                       credits, msg->ptlm_credits,
-                       *kptllnd_tunables.kptl_peercredits);
-                rc = -EPROTO;
+                CERROR("%s: buffer overrun [%d/%d+%d]\n",
+                       libcfs_id2str(peer->peer_id), c, oc, sc);
                 goto failed;
         }
+        peer->peer_sent_credits--;
 
-        /* ptllnd-level protocol correct: account credits */
+        /* No check for credit overflow - the peer may post new
+         * buffers after the startup handshake. */
         peer->peer_credits += msg->ptlm_credits;
-        peer->peer_active_rxs++;
 
         spin_unlock_irqrestore(&peer->peer_lock, flags);
 
@@ -663,7 +656,9 @@ kptllnd_rx_parse(kptl_rx_t *rx)
         if (msg->ptlm_credits != 0)
                 kptllnd_peer_check_sends(peer);
 
-        rx->rx_peer = peer;                /* do buffer accounting on rxdone */
+        /* ptllnd-level protocol correct - rx takes my ref on peer and increments
+         * peer_outstanding_credits when it completes */
+        rx->rx_peer = peer;
         kptllnd_peer_alive(peer);
 
         switch (msg->ptlm_type) {
index 43ca82a..814a7d9 100644 (file)
@@ -22,12 +22,10 @@ void
 kptllnd_free_tx(kptl_tx_t *tx)
 {
         if (tx->tx_msg != NULL)
-                LIBCFS_FREE(tx->tx_msg, 
-                            *kptllnd_tunables.kptl_max_msg_size);
+                LIBCFS_FREE(tx->tx_msg, sizeof(*tx->tx_msg));
                         
-        if (tx->tx_rdma_frags != NULL)
-                LIBCFS_FREE(tx->tx_rdma_frags, 
-                            sizeof(*tx->tx_rdma_frags));
+        if (tx->tx_frags != NULL)
+                LIBCFS_FREE(tx->tx_frags, sizeof(*tx->tx_frags));
 
         LIBCFS_FREE(tx, sizeof(*tx));
 
@@ -59,16 +57,16 @@ kptllnd_alloc_tx(void)
         tx->tx_rdma_eventarg.eva_type = PTLLND_EVENTARG_TYPE_RDMA;
         tx->tx_msg_eventarg.eva_type = PTLLND_EVENTARG_TYPE_MSG;
         tx->tx_msg = NULL;
-        tx->tx_rdma_frags = NULL;
+        tx->tx_frags = NULL;
                 
-        LIBCFS_ALLOC(tx->tx_msg, *kptllnd_tunables.kptl_max_msg_size);
+        LIBCFS_ALLOC(tx->tx_msg, sizeof(*tx->tx_msg));
         if (tx->tx_msg == NULL) {
                 CERROR("Failed to allocate TX payload\n");
                 goto failed;
         }
 
-        LIBCFS_ALLOC(tx->tx_rdma_frags, sizeof(*tx->tx_rdma_frags));
-        if (tx->tx_rdma_frags == NULL) {
+        LIBCFS_ALLOC(tx->tx_frags, sizeof(*tx->tx_frags));
+        if (tx->tx_frags == NULL) {
                 CERROR("Failed to allocate TX frags\n");
                 goto failed;
         }
@@ -172,6 +170,8 @@ kptllnd_get_idle_tx(enum kptl_tx_type type)
         atomic_set(&tx->tx_refcount, 1);
         tx->tx_status = 0;
         tx->tx_idle = 0;
+        tx->tx_tposted = 0;
+        tx->tx_acked = *kptllnd_tunables.kptl_ack_puts;
 
         CDEBUG(D_NET, "tx=%p\n", tx);
         return tx;
@@ -401,11 +401,12 @@ kptllnd_tx_callback(ptl_event_t *ev)
 #else
         unlinked = (ev->type == PTL_EVENT_UNLINK);
 #endif
-        CDEBUG(D_NETTRACE, "%s[%d/%d]: %s(%d) tx=%p fail=%d unlinked=%d\n",
-               libcfs_id2str(peer->peer_id),
-               peer->peer_credits, peer->peer_outstanding_credits,
+        CDEBUG(D_NETTRACE, "%s[%d/%d+%d]: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",
+               libcfs_id2str(peer->peer_id), peer->peer_credits,
+               peer->peer_outstanding_credits, peer->peer_sent_credits,
                kptllnd_evtype2str(ev->type), ev->type, 
-               tx, ev->ni_fail_type, unlinked);
+               tx, kptllnd_errtype2str(ev->ni_fail_type),
+               ev->ni_fail_type, unlinked);
 
         switch (tx->tx_type) {
         default:
@@ -414,18 +415,21 @@ kptllnd_tx_callback(ptl_event_t *ev)
         case TX_TYPE_SMALL_MESSAGE:
                 LASSERT (ismsg);
                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
-                         ev->type == PTL_EVENT_SEND_END);
+                         ev->type == PTL_EVENT_SEND_END ||
+                         (ev->type == PTL_EVENT_ACK && tx->tx_acked));
                 break;
 
         case TX_TYPE_PUT_REQUEST:
                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
                          (ismsg && ev->type == PTL_EVENT_SEND_END) ||
+                         (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||
                          (!ismsg && ev->type == PTL_EVENT_GET_END));
                 break;
 
         case TX_TYPE_GET_REQUEST:
                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
                          (ismsg && ev->type == PTL_EVENT_SEND_END) ||
+                         (ismsg && ev->type == PTL_EVENT_ACK && tx->tx_acked) ||
                          (!ismsg && ev->type == PTL_EVENT_PUT_END));
 
                 if (!ismsg && ok && ev->type == PTL_EVENT_PUT_END) {
@@ -451,21 +455,23 @@ kptllnd_tx_callback(ptl_event_t *ev)
         case TX_TYPE_GET_RESPONSE:
                 LASSERT (!ismsg);
                 LASSERT (ev->type == PTL_EVENT_UNLINK ||
-                         ev->type == PTL_EVENT_SEND_END);
+                         ev->type == PTL_EVENT_SEND_END ||
+                         (ev->type == PTL_EVENT_ACK && tx->tx_acked));
                 break;
         }
 
         if (ok) {
                 kptllnd_peer_alive(peer);
         } else {
-                CDEBUG(D_NETERROR, "%s: %s network error %d, t=%d\n",
+                CERROR("Portals error to %s: %s(%d) tx=%p fail=%s(%d) unlinked=%d\n",
                        libcfs_id2str(peer->peer_id),
-                       ismsg ? "msg" : "bulk",
-                       ev->ni_fail_type, tx->tx_type);
-                tx->tx_status = -EIO;
+                       kptllnd_evtype2str(ev->type), ev->type, 
+                       tx, kptllnd_errtype2str(ev->ni_fail_type),
+                       ev->ni_fail_type, unlinked);
+                tx->tx_status = -EIO; 
                 kptllnd_peer_close(peer, -EIO);
         }
-        
+
         if (!unlinked)
                 return;
 
index 1627064..f598c0b 100644 (file)
@@ -2500,3 +2500,76 @@ LNetDist (lnet_nid_t dstnid, lnet_nid_t *srcnidp, int *orderp)
         return -EHOSTUNREACH;
 }
 
+int
+LNetSetAsync(lnet_process_id_t id, int nasync)
+{
+#ifdef __KERNEL__
+        return 0;
+#else
+        lnet_ni_t        *ni;
+        lnet_remotenet_t *rnet;
+        struct list_head *tmp;
+        lnet_route_t     *route;
+        lnet_nid_t       *nids;
+        int               nnids;
+        int               maxnids = 256;
+        int               rc = 0;
+        int               rc2;
+        
+        /* Target on a local network? */ 
+        
+        ni = lnet_net2ni(LNET_NIDNET(id.nid));
+        if (ni != NULL) {
+                if (ni->ni_lnd->lnd_setasync != NULL) 
+                        rc = (ni->ni_lnd->lnd_setasync)(ni, id, nasync);
+                lnet_ni_decref(ni);
+                return rc;
+        }
+
+        /* Target on a remote network: apply to routers */
+ again:
+        LIBCFS_ALLOC(nids, maxnids * sizeof(*nids));
+        if (nids == NULL)
+                return -ENOMEM;
+        nnids = 0;
+
+        /* Snapshot all the router NIDs */
+        LNET_LOCK();
+        rnet = lnet_find_net_locked(LNET_NIDNET(id.nid));
+        if (rnet != NULL) {
+                list_for_each(tmp, &rnet->lrn_routes) {
+                        if (nnids == maxnids) {
+                                LNET_UNLOCK();
+                                LIBCFS_FREE(nids, maxnids * sizeof(*nids));
+                                maxnids *= 2;
+                                goto again;
+                        }
+                        
+                        route = list_entry(tmp, lnet_route_t, lr_list);
+                        nids[nnids++] = route->lr_gateway->lp_nid;
+                }
+        }
+        LNET_UNLOCK();
+
+        /* set async on all the routers */
+        while (nnids-- > 0) {
+                id.pid = LUSTRE_SRV_LNET_PID;
+                id.nid = nids[nnids];
+
+                ni = lnet_net2ni(LNET_NIDNET(id.nid));
+                if (ni == NULL)
+                        continue;
+                
+                if (ni->ni_lnd->lnd_setasync != NULL) {
+                        rc2 = (ni->ni_lnd->lnd_setasync)(ni, id, nasync);
+                        if (rc2 != 0)
+                                rc = rc2;
+                }
+                lnet_ni_decref(ni);
+        }
+
+        LIBCFS_FREE(nids, maxnids * sizeof(*nids));
+        return rc;
+#endif
+}
+
index eff8daa..d612faf 100644 (file)
@@ -157,6 +157,7 @@ EXPORT_SYMBOL(LNetEQGet);
 EXPORT_SYMBOL(LNetGetId);
 EXPORT_SYMBOL(LNetMDBind);
 EXPORT_SYMBOL(LNetDist);
+EXPORT_SYMBOL(LNetSetAsync);
 EXPORT_SYMBOL(LNetCtl);
 EXPORT_SYMBOL(LNetSetLazyPortal);
 EXPORT_SYMBOL(LNetClearLazyPortal);
index e36301b..92a436f 100644 (file)
@@ -29,6 +29,7 @@ lnd_t               the_ptllnd = {
         .lnd_eager_recv = ptllnd_eager_recv,
         .lnd_notify     = ptllnd_notify,
         .lnd_wait       = ptllnd_wait,
+       .lnd_setasync   = ptllnd_setasync,
 };
 
 static int ptllnd_ni_count = 0;
@@ -83,6 +84,8 @@ ptllnd_history_init(void)
                list_add(&he->he_list, &ptllnd_idle_history);
        }
 
+       PTLLND_HISTORY("Init");
+
        return 0;
 }
 
@@ -123,6 +126,8 @@ void
 ptllnd_dump_history(void)
 {
        ptllnd_he_t    *he;
+
+       PTLLND_HISTORY("dumping...");
        
        while (!list_empty(&ptllnd_history_list)) {
                he = list_entry(ptllnd_history_list.next,
@@ -136,6 +141,8 @@ ptllnd_dump_history(void)
 
                list_add_tail(&he->he_list, &ptllnd_idle_history);
        }
+
+       PTLLND_HISTORY("complete");
 }
 
 void 
@@ -262,7 +269,7 @@ ptllnd_get_tunables(lnet_ni_t *ni)
 
         rc = ptllnd_parse_int_tunable(&max_msg_size,
                                       "PTLLND_MAX_MSG_SIZE",
-                                      PTLLND_MAX_MSG_SIZE);
+                                      PTLLND_MAX_ULND_MSG_SIZE);
         if (rc != 0)
                 return rc;
 
@@ -306,9 +313,17 @@ ptllnd_get_tunables(lnet_ni_t *ni)
        if (rc != 0)
                return rc;
 
+       rc = ptllnd_parse_int_tunable(&plni->plni_dump_on_nak,
+                                     "PTLLND_DUMP_ON_NAK",
+                                     PTLLND_DUMP_ON_NAK);
+       if (rc != 0)
+               return rc;
+
         plni->plni_max_msg_size = max_msg_size & ~7;
-        if (plni->plni_max_msg_size < sizeof(kptl_msg_t))
-                plni->plni_max_msg_size = (sizeof(kptl_msg_t) + 7) & ~7;
+        if (plni->plni_max_msg_size < PTLLND_MIN_BUFFER_SIZE)
+                plni->plni_max_msg_size = PTLLND_MIN_BUFFER_SIZE;
+       CLASSERT ((PTLLND_MIN_BUFFER_SIZE & 7) == 0);
+       CLASSERT (sizeof(kptl_msg_t) <= PTLLND_MIN_BUFFER_SIZE);
 
         plni->plni_buffer_size = plni->plni_max_msg_size * msgs_per_buffer;
 
@@ -369,7 +384,7 @@ ptllnd_destroy_buffer (ptllnd_buffer_t *buf)
 }
 
 int
-ptllnd_grow_buffers (lnet_ni_t *ni)
+ptllnd_size_buffers (lnet_ni_t *ni, int delta)
 {
         ptllnd_ni_t     *plni = ni->ni_data;
         ptllnd_buffer_t *buf;
@@ -380,8 +395,10 @@ ptllnd_grow_buffers (lnet_ni_t *ni)
         CDEBUG(D_NET, "nposted_buffers = %d (before)\n",plni->plni_nposted_buffers);
         CDEBUG(D_NET, "nbuffers = %d (before)\n",plni->plni_nbuffers);
 
-        nmsgs = plni->plni_npeers * plni->plni_peer_credits +
-                plni->plni_msgs_spare;
+       plni->plni_nmsgs += delta;
+       LASSERT(plni->plni_nmsgs >= 0);
+       
+        nmsgs = plni->plni_nmsgs + plni->plni_msgs_spare;
 
         nbufs = (nmsgs * plni->plni_max_msg_size + plni->plni_buffer_size - 1) /
                 plni->plni_buffer_size;
@@ -393,7 +410,7 @@ ptllnd_grow_buffers (lnet_ni_t *ni)
                         return -ENOMEM;
 
                 rc = ptllnd_post_buffer(buf);
-                if (rc != 0){
+                if (rc != 0) {
                         /* TODO - this path seems to orpahn the buffer
                          * in a state where its not posted and will never be
                          * However it does not leak the buffer as it's
@@ -558,8 +575,8 @@ ptllnd_shutdown (lnet_ni_t *ni)
 
        ptllnd_cull_tx_history(plni);
 
-        ptllnd_destroy_buffers(ni);
         ptllnd_close_peers(ni);
+        ptllnd_destroy_buffers(ni);
 
         while (plni->plni_npeers > 0) {
                if (cfs_time_current_sec() > start + w) {
@@ -679,7 +696,7 @@ ptllnd_startup (lnet_ni_t *ni)
                libcfs_id2str((lnet_process_id_t) {
                        .nid = ni->ni_nid, .pid = the_lnet.ln_pid}));
 
-        rc = ptllnd_grow_buffers(ni);
+        rc = ptllnd_size_buffers(ni, 0);
         if (rc != 0)
                 goto failed4;
 
@@ -717,7 +734,7 @@ const char *ptllnd_evtype2str(int type)
                 DO_TYPE(PTL_EVENT_SEND_END);
                 DO_TYPE(PTL_EVENT_UNLINK);
         default:
-                return "";
+                return "<unknown event type>";
         }
 #undef DO_TYPE
 }
@@ -735,7 +752,51 @@ const char *ptllnd_msgtype2str(int type)
                 DO_TYPE(PTLLND_MSG_TYPE_NOOP);
                 DO_TYPE(PTLLND_MSG_TYPE_NAK);
         default:
-                return "";
+                return "<unknown msg type>";
+        }
+#undef DO_TYPE
+}
+
+const char *ptllnd_errtype2str(int type)
+{
+#define DO_TYPE(x) case x: return #x;
+        switch(type)
+        {
+                DO_TYPE(PTL_OK);
+                DO_TYPE(PTL_SEGV);
+                DO_TYPE(PTL_NO_SPACE);
+                DO_TYPE(PTL_ME_IN_USE);
+                DO_TYPE(PTL_NAL_FAILED);
+                DO_TYPE(PTL_NO_INIT);
+                DO_TYPE(PTL_IFACE_DUP);
+                DO_TYPE(PTL_IFACE_INVALID);
+                DO_TYPE(PTL_HANDLE_INVALID);
+                DO_TYPE(PTL_MD_INVALID);
+                DO_TYPE(PTL_ME_INVALID);
+                DO_TYPE(PTL_PROCESS_INVALID);
+                DO_TYPE(PTL_PT_INDEX_INVALID);
+                DO_TYPE(PTL_SR_INDEX_INVALID);
+                DO_TYPE(PTL_EQ_INVALID);
+                DO_TYPE(PTL_EQ_DROPPED);
+                DO_TYPE(PTL_EQ_EMPTY);
+                DO_TYPE(PTL_MD_NO_UPDATE);
+                DO_TYPE(PTL_FAIL);
+                DO_TYPE(PTL_AC_INDEX_INVALID);
+                DO_TYPE(PTL_MD_ILLEGAL);
+                DO_TYPE(PTL_ME_LIST_TOO_LONG);
+                DO_TYPE(PTL_MD_IN_USE);
+                DO_TYPE(PTL_NI_INVALID);
+                DO_TYPE(PTL_PID_INVALID);
+                DO_TYPE(PTL_PT_FULL);
+                DO_TYPE(PTL_VAL_FAILED);
+                DO_TYPE(PTL_NOT_IMPLEMENTED);
+                DO_TYPE(PTL_NO_ACK);
+                DO_TYPE(PTL_EQ_IN_USE);
+                DO_TYPE(PTL_PID_IN_USE);
+                DO_TYPE(PTL_INV_EQ_SIZE);
+                DO_TYPE(PTL_AGAIN);
+        default:
+                return "<unknown error type>";
         }
 #undef DO_TYPE
 }
index 09c0c89..fa71506 100644 (file)
@@ -38,7 +38,8 @@
 # define PTLLND_TX_HISTORY         0
 #endif
 #define PTLLND_WARN_LONG_WAIT      5 /* seconds */
-#define PTLLND_ABORT_ON_NAK        1 /* abort app on protocol version mismatch */
+#define PTLLND_ABORT_ON_NAK        1 /* abort app on (e.g.) protocol version mismatch */
+#define PTLLND_DUMP_ON_NAK         0 /* dump debug? */
 
 
 /* Hack to record history 
@@ -76,6 +77,7 @@ typedef struct
         int                        plni_checksum;
         int                        plni_max_tx_history;
         int                        plni_abort_on_nak;
+        int                        plni_dump_on_nak;
 
         __u64                      plni_stamp;
         struct list_head           plni_active_txs;
@@ -96,6 +98,7 @@ typedef struct
         struct list_head           plni_buffers;
         int                        plni_nbuffers;
         int                        plni_nposted_buffers;
+        int                        plni_nmsgs;
 } ptllnd_ni_t;
 
 #define PTLLND_CREDIT_HIGHWATER(plni) ((plni)->plni_peer_credits - 1)
@@ -106,9 +109,21 @@ typedef struct
         lnet_ni_t                 *plp_ni;
         lnet_process_id_t          plp_id;
         ptl_process_id_t           plp_ptlid;
-        int                        plp_credits;
-        int                        plp_max_credits;
+        int                        plp_credits; /* # msg buffers reserved for me at peer */
+
+        /* credits for msg buffers I've posted for this peer...
+         * outstanding - free buffers I've still to inform my peer about
+         * sent        - free buffers I've told my peer about
+         * lazy        - additional buffers (over and above plni_peer_credits)
+         *               posted to prevent peer blocking on sending non-RDMA
+         *               messages to me when LNET isn't eagerly responsive to
+         *               the network (i.e. liblustre doesn't have control). 
+         * extra_lazy  - lazy credits not required any more. */
         int                        plp_outstanding_credits;
+        int                        plp_sent_credits;
+        int                        plp_lazy_credits;
+        int                        plp_extra_lazy_credits;
+
         int                        plp_max_msg_size;
         int                        plp_refcount;
         int                        plp_recvd_hello:1;
@@ -221,15 +236,17 @@ int ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
 
 ptllnd_tx_t *ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob);
 void ptllnd_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive);
+int  ptllnd_setasync(lnet_ni_t *ni, lnet_process_id_t id, int n);
 void ptllnd_wait(lnet_ni_t *ni, int milliseconds);
 void ptllnd_check_sends(ptllnd_peer_t *peer);
 void ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id);
 void ptllnd_destroy_peer(ptllnd_peer_t *peer);
 void ptllnd_close_peer(ptllnd_peer_t *peer, int error);
 int ptllnd_post_buffer(ptllnd_buffer_t *buf);
-int ptllnd_grow_buffers (lnet_ni_t *ni);
+int ptllnd_size_buffers (lnet_ni_t *ni, int delta);
 const char *ptllnd_evtype2str(int type);
 const char *ptllnd_msgtype2str(int type);
+const char *ptllnd_errtype2str(int type);
 char *ptllnd_ptlid2str(ptl_process_id_t id);
 
 static inline void
index bc62e80..96b0345 100644 (file)
@@ -38,6 +38,10 @@ ptllnd_destroy_peer(ptllnd_peer_t *peer)
 {
         lnet_ni_t         *ni = peer->plp_ni;
         ptllnd_ni_t       *plni = ni->ni_data;
+        int                nmsg = peer->plp_lazy_credits +
+                                  plni->plni_peer_credits;
+
+        ptllnd_size_buffers(ni, -nmsg);
 
         LASSERT (peer->plp_closing);
         LASSERT (plni->plni_npeers > 0);
@@ -111,7 +115,7 @@ ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
 
         /* New peer: check first for enough posted buffers */
         plni->plni_npeers++;
-        rc = ptllnd_grow_buffers(ni);
+        rc = ptllnd_size_buffers(ni, plni->plni_peer_credits);
         if (rc != 0) {
                 plni->plni_npeers--;
                 return NULL;
@@ -121,19 +125,20 @@ ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
         if (plp == NULL) {
                 CERROR("Can't allocate new peer %s\n", libcfs_id2str(id));
                 plni->plni_npeers--;
+                ptllnd_size_buffers(ni, -plni->plni_peer_credits);
                 return NULL;
         }
 
-        CDEBUG(D_NET, "new peer=%p\n",plp);
-
         plp->plp_ni = ni;
         plp->plp_id = id;
         plp->plp_ptlid.nid = LNET_NIDADDR(id.nid);
         plp->plp_ptlid.pid = plni->plni_ptllnd_pid;
-        plp->plp_max_credits =
         plp->plp_credits = 1; /* add more later when she gives me credits */
         plp->plp_max_msg_size = plni->plni_max_msg_size; /* until I hear from her */
+        plp->plp_sent_credits = 1;              /* Implicit credit for HELLO */
         plp->plp_outstanding_credits = plni->plni_peer_credits - 1;
+        plp->plp_lazy_credits = 0;
+        plp->plp_extra_lazy_credits = 0;
         plp->plp_match = 0;
         plp->plp_stamp = 0;
         plp->plp_recvd_hello = 0;
@@ -157,9 +162,12 @@ ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
         tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS;
         tx->tx_msg.ptlm_u.hello.kptlhm_max_msg_size = plni->plni_max_msg_size;
 
-        PTLLND_HISTORY("%s[%d/%d]: post hello %p", libcfs_id2str(id),
+        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post hello %p", libcfs_id2str(id),
                        tx->tx_peer->plp_credits,
-                       tx->tx_peer->plp_outstanding_credits, tx);
+                       tx->tx_peer->plp_outstanding_credits,
+                       tx->tx_peer->plp_sent_credits,
+                       plni->plni_peer_credits + 
+                       tx->tx_peer->plp_lazy_credits, tx);
         ptllnd_post_tx(tx);
 
         return plp;
@@ -233,7 +241,7 @@ ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
                 return;
         }
         
-        CDEBUG(D_WARNING, "%s %s%s [%d] "LPD64".%06d m "LPD64" q %d/%d c %d/%d(%d)\n",
+        CDEBUG(D_WARNING, "%s %s%s [%d] "LPD64".%06d m "LPD64" q %d/%d c %d/%d+%d(%d)\n",
                libcfs_id2str(id), 
                plp->plp_recvd_hello ? "H" : "_",
                plp->plp_closing     ? "C" : "_",
@@ -242,7 +250,8 @@ ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
                plp->plp_match,
                ptllnd_count_q(&plp->plp_txq),
                ptllnd_count_q(&plp->plp_activeq),
-               plp->plp_credits, plp->plp_outstanding_credits, plp->plp_max_credits);
+               plp->plp_credits, plp->plp_outstanding_credits, plp->plp_sent_credits,
+               plni->plni_peer_credits + plp->plp_lazy_credits);
 
         CDEBUG(D_WARNING, "txq:\n");
         list_for_each (tmp, &plp->plp_txq) {
@@ -287,7 +296,7 @@ ptllnd_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive)
         ptllnd_peer_t     *peer;
         time_t             start = cfs_time_current_sec();
         int                w = PTLLND_WARN_LONG_WAIT;
-        
+
         /* This is only actually used to connect to routers at startup! */
         if (!alive) {
                 LBUG();
@@ -315,6 +324,46 @@ ptllnd_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive)
         ptllnd_peer_decref(peer);
 }
 
+/* Adjust the lazy (async) message headroom posted for peer 'id' by 'nasync'.
+ * Returns 0 on success, -ENOMEM if ptllnd_find_peer() yields no peer, or the
+ * error from ptllnd_size_buffers().  Drops the peer ref on every exit path. */
+int
+ptllnd_setasync(lnet_ni_t *ni, lnet_process_id_t id, int nasync)
+{
+        ptllnd_peer_t *peer = ptllnd_find_peer(ni, id, nasync > 0);
+        int            rc = 0;
+
+        if (peer == NULL)
+                return -ENOMEM;
+
+        LASSERT (peer->plp_lazy_credits >= 0);
+        LASSERT (peer->plp_extra_lazy_credits >= 0);
+
+        /* nasync < 0 means the total message headroom may shrink.  We can't
+         * do that now, since the peer might already hold credits for the extra
+         * buffers; just account the surplus in case it's needed later and only
+         * destroy buffers when the peer closes.  The test below covers this:
+         * subtracting a negative nasync grows the extra lazy credit counter. */
+        if (nasync <= peer->plp_extra_lazy_credits) {
+                peer->plp_extra_lazy_credits -= nasync;
+                goto out;
+        }
+
+        LASSERT (nasync > 0);
+        /* Use up spare lazy credits first; only size buffers for the rest */
+        nasync -= peer->plp_extra_lazy_credits;
+        peer->plp_extra_lazy_credits = 0;
+
+        rc = ptllnd_size_buffers(ni, nasync);
+        if (rc == 0) {
+                peer->plp_lazy_credits += nasync;
+                peer->plp_outstanding_credits += nasync;
+        }
+out:
+        ptllnd_peer_decref(peer);       /* ref taken by ptllnd_find_peer() */
+        return rc;
+}
+
 __u32
 ptllnd_cksum (void *ptr, int nob)
 {
@@ -336,7 +385,7 @@ ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
         ptllnd_tx_t *tx;
         int          msgsize;
 
-        CDEBUG(D_NET, "peer=%p type=%d payload=%d\n",peer,type,payload_nob);
+        CDEBUG(D_NET, "peer=%p type=%d payload=%d\n", peer, type, payload_nob);
 
         switch (type) {
         default:
@@ -375,8 +424,6 @@ ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
         msgsize = (msgsize + 7) & ~7;
         LASSERT (msgsize <= peer->plp_max_msg_size);
 
-        CDEBUG(D_NET, "msgsize=%d\n",msgsize);
-
         LIBCFS_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize);
 
         if (tx == NULL) {
@@ -534,11 +581,6 @@ ptllnd_set_txiov(ptllnd_tx_t *tx,
                 return 0;
         }
 
-        CDEBUG(D_NET, "niov  =%d\n",niov);
-        CDEBUG(D_NET, "offset=%d\n",offset);
-        CDEBUG(D_NET, "len   =%d\n",len);
-
-
         /*
          * Remove iovec's at the beginning that
          * are skipped because of the offset.
@@ -553,10 +595,6 @@ ptllnd_set_txiov(ptllnd_tx_t *tx,
                 iov++;
         }
 
-        CDEBUG(D_NET, "niov  =%d (after)\n",niov);
-        CDEBUG(D_NET, "offset=%d (after)\n",offset);
-        CDEBUG(D_NET, "len   =%d (after)\n",len);
-
         for (;;) {
                 int temp_offset = offset;
                 int resid = len;
@@ -565,11 +603,6 @@ ptllnd_set_txiov(ptllnd_tx_t *tx,
                         return -ENOMEM;
 
                 for (npiov = 0;; npiov++) {
-                        CDEBUG(D_NET, "npiov=%d\n",npiov);
-                        CDEBUG(D_NET, "offset=%d\n",temp_offset);
-                        CDEBUG(D_NET, "len=%d\n",resid);
-                        CDEBUG(D_NET, "iov[npiov].iov_len=%lu\n",iov[npiov].iov_len);
-
                         LASSERT (npiov < niov);
                         LASSERT (iov->iov_len >= temp_offset);
 
@@ -588,8 +621,6 @@ ptllnd_set_txiov(ptllnd_tx_t *tx,
                 if (npiov == niov) {
                         tx->tx_niov = niov;
                         tx->tx_iov = piov;
-                        CDEBUG(D_NET, "tx->tx_iov=%p\n",tx->tx_iov);
-                        CDEBUG(D_NET, "tx->tx_niov=%d\n",tx->tx_niov);
                         return 0;
                 }
 
@@ -681,7 +712,10 @@ ptllnd_check_sends(ptllnd_peer_t *peer)
         ptl_handle_md_t mdh;
         int             rc;
 
-        CDEBUG(D_NET, "plp_outstanding_credits=%d\n",peer->plp_outstanding_credits);
+        CDEBUG(D_NET, "%s: [%d/%d+%d(%d)\n",
+               libcfs_id2str(peer->plp_id), peer->plp_credits,
+               peer->plp_outstanding_credits, peer->plp_sent_credits,
+               plni->plni_peer_credits + peer->plp_lazy_credits);
 
         if (list_empty(&peer->plp_txq) &&
             peer->plp_outstanding_credits >= PTLLND_CREDIT_HIGHWATER(plni) &&
@@ -700,32 +734,34 @@ ptllnd_check_sends(ptllnd_peer_t *peer)
         while (!list_empty(&peer->plp_txq)) {
                 tx = list_entry(peer->plp_txq.next, ptllnd_tx_t, tx_list);
 
-                CDEBUG(D_NET, "Looking at TX=%p\n",tx);
-                CDEBUG(D_NET, "plp_credits=%d\n",peer->plp_credits);
-                CDEBUG(D_NET, "plp_outstanding_credits=%d\n",peer->plp_outstanding_credits);
-
                 LASSERT (tx->tx_msgsize > 0);
 
                 LASSERT (peer->plp_outstanding_credits >= 0);
-                LASSERT (peer->plp_outstanding_credits <=
-                         plni->plni_peer_credits);
+                LASSERT (peer->plp_sent_credits >= 0);
+                LASSERT (peer->plp_outstanding_credits + peer->plp_sent_credits
+                         <= plni->plni_peer_credits + peer->plp_lazy_credits);
                 LASSERT (peer->plp_credits >= 0);
-                LASSERT (peer->plp_credits <= peer->plp_max_credits);
 
                 if (peer->plp_credits == 0) {   /* no credits */
-                        PTLLND_HISTORY("%s[%d/%d]: no creds for %p",
+                        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: no creds for %p",
                                        libcfs_id2str(peer->plp_id),
                                        peer->plp_credits,
-                                       peer->plp_outstanding_credits, tx);
+                                       peer->plp_outstanding_credits,
+                                       peer->plp_sent_credits,
+                                       plni->plni_peer_credits +
+                                       peer->plp_lazy_credits, tx);
                         break;
                 }
                 
                 if (peer->plp_credits == 1 &&   /* last credit reserved for */
                     peer->plp_outstanding_credits == 0) { /* returning credits */
-                        PTLLND_HISTORY("%s[%d/%d]: too few creds for %p",
+                        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: too few creds for %p",
                                        libcfs_id2str(peer->plp_id),
                                        peer->plp_credits,
-                                       peer->plp_outstanding_credits, tx);
+                                       peer->plp_outstanding_credits,
+                                       peer->plp_sent_credits,
+                                       plni->plni_peer_credits +
+                                       peer->plp_lazy_credits, tx);
                         break;
                 }
                 
@@ -748,12 +784,11 @@ ptllnd_check_sends(ptllnd_peer_t *peer)
                  * until I receive the HELLO back */
                 tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
 
-                CDEBUG(D_NET, "Returning %d to peer\n",peer->plp_outstanding_credits);
-
                 /*
                  * Return all the credits we have
                  */
                 tx->tx_msg.ptlm_credits = peer->plp_outstanding_credits;
+                peer->plp_sent_credits += peer->plp_outstanding_credits;
                 peer->plp_outstanding_credits = 0;
 
                 /*
@@ -782,11 +817,19 @@ ptllnd_check_sends(ptllnd_peer_t *peer)
                         break;
                 }
 
+                LASSERT (tx->tx_type != PTLLND_RDMA_WRITE &&
+                         tx->tx_type != PTLLND_RDMA_READ);
+                
                 tx->tx_reqmdh = mdh;
                 PTLLND_DBGT_STAMP(tx->tx_req_posted);
 
-                PTLLND_HISTORY("%s[%d/%d]: %s %p c %d", libcfs_id2str(peer->plp_id),
-                               peer->plp_credits, peer->plp_outstanding_credits,
+                PTLLND_HISTORY("%s[%d/%d+%d(%d)]: %s %p c %d",
+                               libcfs_id2str(peer->plp_id),
+                               peer->plp_credits,
+                               peer->plp_outstanding_credits,
+                               peer->plp_sent_credits,
+                               plni->plni_peer_credits +
+                               peer->plp_lazy_credits,
                                ptllnd_msgtype2str(tx->tx_type), tx,
                                tx->tx_msg.ptlm_credits);
 
@@ -881,13 +924,6 @@ ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
                 goto failed;
         }
 
-        CDEBUG(D_NET, "md.start=%p\n",md.start);
-        CDEBUG(D_NET, "md.length=%llu\n",md.length);
-        CDEBUG(D_NET, "md.threshold=%d\n",md.threshold);
-        CDEBUG(D_NET, "md.max_size=%d\n",md.max_size);
-        CDEBUG(D_NET, "md.options=0x%x\n",md.options);
-        CDEBUG(D_NET, "md.user_ptr=%p\n",md.user_ptr);
-
         PTLLND_DBGT_STAMP(tx->tx_bulk_posted);
 
         rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);
@@ -922,9 +958,11 @@ ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
         }
 
         tx->tx_lnetmsg = msg;
-        PTLLND_HISTORY("%s[%d/%d]: post passive %s p %d %p",
+        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post passive %s p %d %p",
                        libcfs_id2str(msg->msg_target),
                        peer->plp_credits, peer->plp_outstanding_credits,
+                       peer->plp_sent_credits,
+                       plni->plni_peer_credits + peer->plp_lazy_credits,
                        lnet_msgtyp2str(msg->msg_type),
                        (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ? 
                        le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
@@ -1049,14 +1087,10 @@ ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
                 LBUG();
 
         case LNET_MSG_ACK:
-                CDEBUG(D_NET, "LNET_MSG_ACK\n");
-
                 LASSERT (msg->msg_len == 0);
                 break;                          /* send IMMEDIATE */
 
         case LNET_MSG_GET:
-                CDEBUG(D_NET, "LNET_MSG_GET nob=%d\n",msg->msg_md->md_length);
-
                 if (msg->msg_target_is_router)
                         break;                  /* send IMMEDIATE */
 
@@ -1075,10 +1109,8 @@ ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
 
         case LNET_MSG_REPLY:
         case LNET_MSG_PUT:
-                CDEBUG(D_NET, "LNET_MSG_PUT nob=%d\n",msg->msg_len);
                 nob = msg->msg_len;
                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
-                CDEBUG(D_NET, "msg_size=%d max=%d\n",msg->msg_len,plp->plp_max_msg_size);
                 if (nob <= plp->plp_max_msg_size)
                         break;                  /* send IMMEDIATE */
 
@@ -1092,7 +1124,6 @@ ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
         /* send IMMEDIATE
          * NB copy the payload so we don't have to do a fragmented send */
 
-        CDEBUG(D_NET, "IMMEDIATE len=%d\n", msg->msg_len);
         tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_IMMEDIATE, msg->msg_len);
         if (tx == NULL) {
                 CERROR("Can't allocate tx for lnet type %d to %s\n",
@@ -1108,9 +1139,11 @@ ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
         tx->tx_msg.ptlm_u.immediate.kptlim_hdr = msg->msg_hdr;
 
         tx->tx_lnetmsg = msg;
-        PTLLND_HISTORY("%s[%d/%d]: post immediate %s p %d %p",
+        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post immediate %s p %d %p",
                        libcfs_id2str(msg->msg_target),
                        plp->plp_credits, plp->plp_outstanding_credits,
+                       plp->plp_sent_credits,
+                       plni->plni_peer_credits + plp->plp_lazy_credits,
                        lnet_msgtyp2str(msg->msg_type),
                        (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ? 
                        le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :
@@ -1131,8 +1164,11 @@ ptllnd_rx_done(ptllnd_rx_t *rx)
 
         plp->plp_outstanding_credits++;
 
-        PTLLND_HISTORY("%s[%d/%d]: rx=%p done\n", libcfs_id2str(plp->plp_id),
-                       plp->plp_credits, plp->plp_outstanding_credits, rx);
+        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: rx=%p done\n",
+                       libcfs_id2str(plp->plp_id),
+                       plp->plp_credits, plp->plp_outstanding_credits, 
+                       plp->plp_sent_credits,
+                       plni->plni_peer_credits + plp->plp_lazy_credits, rx);
 
         ptllnd_check_sends(rx->rx_peer);
 
@@ -1168,7 +1204,6 @@ ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
 
         case PTLLND_MSG_TYPE_IMMEDIATE:
                 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[mlen]);
-                CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE nob=%d\n",nob);
                 if (nob > rx->rx_nob) {
                         CERROR("Immediate message from %s too big: %d(%d)\n",
                                libcfs_id2str(rx->rx_peer->plp_id),
@@ -1184,14 +1219,12 @@ ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
                 break;
 
         case PTLLND_MSG_TYPE_PUT:
-                CDEBUG(D_NET, "PTLLND_MSG_TYPE_PUT offset=%d mlen=%d\n",offset,mlen);
                 rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg,
                                         rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
                                         niov, iov, offset, mlen);
                 break;
 
         case PTLLND_MSG_TYPE_GET:
-                CDEBUG(D_NET, "PTLLND_MSG_TYPE_GET\n");
                 if (msg != NULL)
                         rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, msg,
                                                 rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,
@@ -1213,6 +1246,9 @@ ptllnd_abort_on_nak(lnet_ni_t *ni)
 {
         ptllnd_ni_t      *plni = ni->ni_data;
 
+        if (plni->plni_dump_on_nak)
+                ptllnd_dump_history();
+
         if (plni->plni_abort_on_nak)
                 abort();
 }
@@ -1324,13 +1360,12 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
         }
 
         PTLLND_HISTORY("RX %s: %s %d %p", libcfs_id2str(srcid), 
-                       ptllnd_msgtype2str(msg->ptlm_type), msg->ptlm_credits, &rx);
+                       ptllnd_msgtype2str(msg->ptlm_type),
+                       msg->ptlm_credits, &rx);
 
         switch (msg->ptlm_type) {
         case PTLLND_MSG_TYPE_PUT:
         case PTLLND_MSG_TYPE_GET:
-                CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
-                        msg->ptlm_type==PTLLND_MSG_TYPE_PUT ? "PUT" : "GET");
                 if (nob < basenob + sizeof(kptl_rdma_msg_t)) {
                         CERROR("Short rdma request from %s(%s)\n",
                                libcfs_id2str(srcid),
@@ -1342,7 +1377,6 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
                 break;
 
         case PTLLND_MSG_TYPE_IMMEDIATE:
-                CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
                 if (nob < offsetof(kptl_msg_t,
                                    ptlm_u.immediate.kptlim_payload)) {
                         CERROR("Short immediate from %s(%s)\n",
@@ -1353,9 +1387,6 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
                 break;
 
         case PTLLND_MSG_TYPE_HELLO:
-                CDEBUG(D_NET, "PTLLND_MSG_TYPE_HELLO from %s(%s)\n",
-                               libcfs_id2str(srcid),
-                               ptllnd_ptlid2str(initiator));
                 if (nob < basenob + sizeof(kptl_hello_msg_t)) {
                         CERROR("Short hello from %s(%s)\n",
                                libcfs_id2str(srcid),
@@ -1369,9 +1400,6 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
                 break;
                 
         case PTLLND_MSG_TYPE_NOOP:
-                CDEBUG(D_NET, "PTLLND_MSG_TYPE_NOOP from %s(%s)\n",
-                               libcfs_id2str(srcid),
-                               ptllnd_ptlid2str(initiator));        
                 break;
 
         default:
@@ -1381,8 +1409,7 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
                 return;
         }
 
-        plp = ptllnd_find_peer(ni, srcid,
-                               msg->ptlm_type == PTLLND_MSG_TYPE_HELLO);
+        plp = ptllnd_find_peer(ni, srcid, 0);
         if (plp == NULL) {
                 CERROR("Can't find peer %s\n", libcfs_id2str(srcid));
                 return;
@@ -1396,20 +1423,11 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
                         return;
                 }
 
-                CDEBUG(D_NET, "maxsz %d match "LPX64" stamp "LPX64"\n",
-                       msg->ptlm_u.hello.kptlhm_max_msg_size,
-                       msg->ptlm_u.hello.kptlhm_matchbits,
-                       msg->ptlm_srcstamp);
-
-                plp->plp_max_msg_size = MAX(plni->plni_max_msg_size,
-                        msg->ptlm_u.hello.kptlhm_max_msg_size);
+                plp->plp_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;
                 plp->plp_match = msg->ptlm_u.hello.kptlhm_matchbits;
                 plp->plp_stamp = msg->ptlm_srcstamp;
-                plp->plp_max_credits += msg->ptlm_credits;
                 plp->plp_recvd_hello = 1;
 
-                CDEBUG(D_NET, "plp_max_msg_size=%d\n",plp->plp_max_msg_size);
-
         } else if (!plp->plp_recvd_hello) {
 
                 CERROR("Bad message type %d (HELLO expected) from %s\n",
@@ -1426,18 +1444,21 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
                 return;
         }
 
+        /* Check peer only sends when I've sent her credits */
+        if (plp->plp_sent_credits == 0) {
+                CERROR("%s[%d/%d+%d(%d)]: unexpected message\n",
+                       libcfs_id2str(plp->plp_id),
+                       plp->plp_credits, plp->plp_outstanding_credits, 
+                       plp->plp_sent_credits,
+                       plni->plni_peer_credits + plp->plp_lazy_credits);
+                return;
+        }
+        plp->plp_sent_credits--;
+        
+        /* No check for credit overflow - the peer may post new buffers after
+         * the startup handshake. */
         if (msg->ptlm_credits > 0) {
-                CDEBUG(D_NET, "Getting back %d credits from peer\n",msg->ptlm_credits);
-                if (plp->plp_credits + msg->ptlm_credits >
-                    plp->plp_max_credits) {
-                        CWARN("Too many credits from %s: %d + %d > %d\n",
-                              libcfs_id2str(srcid),
-                              plp->plp_credits, msg->ptlm_credits,
-                              plp->plp_max_credits);
-                        plp->plp_credits = plp->plp_max_credits;
-                } else {
-                        plp->plp_credits += msg->ptlm_credits;
-                }
+                plp->plp_credits += msg->ptlm_credits;
                 ptllnd_check_sends(plp);
         }
 
@@ -1448,8 +1469,6 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
         rx.rx_nob       = nob;
         plni->plni_nrxs++;
 
-        CDEBUG(D_NET, "rx=%p type=%d\n",&rx,msg->ptlm_type);
-
         switch (msg->ptlm_type) {
         default: /* message types have been checked already */
                 ptllnd_rx_done(&rx);
@@ -1457,20 +1476,15 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
 
         case PTLLND_MSG_TYPE_PUT:
         case PTLLND_MSG_TYPE_GET:
-                CDEBUG(D_NET, "PTLLND_MSG_TYPE_%s\n",
-                        msg->ptlm_type==PTLLND_MSG_TYPE_PUT ? "PUT" : "GET");
                 rc = lnet_parse(ni, &msg->ptlm_u.rdma.kptlrm_hdr,
                                 msg->ptlm_srcnid, &rx, 1);
-                CDEBUG(D_NET, "lnet_parse rc=%d\n",rc);
                 if (rc < 0)
                         ptllnd_rx_done(&rx);
                 break;
 
         case PTLLND_MSG_TYPE_IMMEDIATE:
-                CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE\n");
                 rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,
                                 msg->ptlm_srcnid, &rx, 0);
-                CDEBUG(D_NET, "lnet_parse rc=%d\n",rc);
                 if (rc < 0)
                         ptllnd_rx_done(&rx);
                 break;
@@ -1492,12 +1506,12 @@ ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
         LASSERT (event->type == PTL_EVENT_PUT_END ||
                  event->type == PTL_EVENT_UNLINK);
 
-        CDEBUG(D_NET, "buf=%p event=%d\n",buf,event->type);
-
         if (event->ni_fail_type != PTL_NI_OK) {
 
-                CERROR("event type %d, status %d from %s\n",
-                       event->type, event->ni_fail_type,
+                CERROR("event type %s(%d), status %s(%d) from %s\n",
+                       ptllnd_evtype2str(event->type), event->type,
+                       ptllnd_errtype2str(event->ni_fail_type), 
+                       event->ni_fail_type,
                        ptllnd_ptlid2str(event->initiator));
 
         } else if (event->type == PTL_EVENT_PUT_END) {
@@ -1528,8 +1542,6 @@ ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
         repost = (event->type == PTL_EVENT_UNLINK);
 #endif
 
-        CDEBUG(D_NET, "repost=%d unlinked=%d\n",repost,unlinked);
-
         if (unlinked) {
                 LASSERT(buf->plb_posted);
                 buf->plb_posted = 0;
@@ -1555,19 +1567,16 @@ ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
 #endif
 
         if (error)
-                CERROR("Error event type %d for %s for %s\n",
-                       event->type, ptllnd_msgtype2str(tx->tx_type),
+                CERROR("Error %s(%d) event %s(%d) unlinked %d, %s(%d) for %s\n",
+                       ptllnd_errtype2str(event->ni_fail_type),
+                       event->ni_fail_type,
+                       ptllnd_evtype2str(event->type), event->type,
+                       unlinked, ptllnd_msgtype2str(tx->tx_type), tx->tx_type,
                        libcfs_id2str(tx->tx_peer->plp_id));
 
         LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE));
 
-        CDEBUG(D_NET, "tx=%p type=%s (%d)\n",tx,
-                ptllnd_msgtype2str(tx->tx_type),tx->tx_type);
-        CDEBUG(D_NET, "unlinked=%d\n",unlinked);
-        CDEBUG(D_NET, "error=%d\n",error);
-
         isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh);
-        CDEBUG(D_NET, "isreq=%d\n",isreq);
         if (isreq) {
                 LASSERT (event->md.start == (void *)&tx->tx_msg);
                 if (unlinked) {
@@ -1577,7 +1586,6 @@ ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
         }
 
         isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);
-        CDEBUG(D_NET, "isbulk=%d\n",isbulk);
         if ( isbulk && unlinked ) {
                 tx->tx_bulkmdh = PTL_INVALID_HANDLE;
                 PTLLND_DBGT_STAMP(tx->tx_bulk_done);
@@ -1585,10 +1593,12 @@ ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
 
         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
 
-        PTLLND_HISTORY("%s[%d/%d]: TX done %p %s%s",
+        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: TX done %p %s%s",
                        libcfs_id2str(tx->tx_peer->plp_id), 
                        tx->tx_peer->plp_credits,
                        tx->tx_peer->plp_outstanding_credits,
+                       tx->tx_peer->plp_sent_credits,
+                       plni->plni_peer_credits + tx->tx_peer->plp_lazy_credits,
                        tx, isreq ? "REQ" : "BULK", unlinked ? "(unlinked)" : "");
 
         LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
@@ -1650,7 +1660,6 @@ ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
                         tx->tx_status = -EIO;
                 list_del(&tx->tx_list);
                 list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
-                CDEBUG(D_NET, "tx=%p ONTO ZOMBIE LIST\n",tx);
         }
 }
 
@@ -1683,8 +1692,6 @@ ptllnd_wait (lnet_ni_t *ni, int milliseconds)
         for (;;) {
                 time_t  then = cfs_time_current_sec();
 
-                CDEBUG(D_NET, "Poll(%d)\n", timeout);
-                
                 rc = PtlEQPoll(&plni->plni_eqh, 1,
                                (timeout < 0) ? PTL_TIME_FOREVER : timeout,
                                &event, &which);
@@ -1696,7 +1703,6 @@ ptllnd_wait (lnet_ni_t *ni, int milliseconds)
                                (int)(cfs_time_current_sec() - then));
                 }
                 
-                CDEBUG(D_NET, "PtlEQPoll rc=%d\n",rc);
                 timeout = 0;
 
                 if (rc == PTL_EQ_EMPTY) {
@@ -1717,9 +1723,6 @@ ptllnd_wait (lnet_ni_t *ni, int milliseconds)
                         CERROR("Event queue: size %d is too small\n",
                                plni->plni_eq_size);
 
-                CDEBUG(D_NET, "event.type=%s(%d)\n",
-                       ptllnd_evtype2str(event.type),event.type);
-
                 found = 1;
                 switch (ptllnd_eventarg2type(event.md.user_ptr)) {
                 default:
@@ -1738,7 +1741,6 @@ ptllnd_wait (lnet_ni_t *ni, int milliseconds)
         while (!list_empty(&plni->plni_zombie_txs)) {
                 tx = list_entry(plni->plni_zombie_txs.next,
                                 ptllnd_tx_t, tx_list);
-                CDEBUG(D_NET, "Process ZOMBIE tx=%p\n",tx);
                 ptllnd_tx_done(tx);
         }