Whamcloud - gitweb
* first cut ulnds/ptllnd
authoreeb <eeb>
Tue, 27 Sep 2005 15:33:01 +0000 (15:33 +0000)
committereeb <eeb>
Tue, 27 Sep 2005 15:33:01 +0000 (15:33 +0000)
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/ptllnd_wire.h
lnet/klnds/iiblnd/iiblnd_cb.c
lnet/klnds/openiblnd/openiblnd_cb.c
lnet/klnds/viblnd/viblnd_cb.c
lnet/ulnds/ptllnd/ptllnd.c
lnet/ulnds/ptllnd/ptllnd.h
lnet/ulnds/ptllnd/ptllnd_cb.c

index cf2faa1..46164b4 100644 (file)
@@ -534,20 +534,22 @@ void lnet_copy_kiov2kiov (unsigned int ndkiov, lnet_kiov_t *dkiov,
 
 static inline void
 lnet_copy_iov2flat(int dlen, void *dest, unsigned int doffset,
-                   unsigned int nsiov, struct iovec *siov,
-                   unsigned int soffset, unsigned int nob)
+                   unsigned int nsiov, struct iovec *siov, unsigned int soffset,
+                   unsigned int nob)
 {
         struct iovec diov = {.iov_base = dest, .iov_len = dlen};
+
         lnet_copy_iov2iov(1, &diov, doffset,
                           nsiov, siov, soffset, nob);
 }
 
 static inline void
 lnet_copy_kiov2flat(int dlen, void *dest, unsigned int doffset,
-                    unsigned int nsiov, lnet_kiov_t *skiov,
-                    unsigned int soffset, unsigned int nob)
+                    unsigned int nsiov, lnet_kiov_t *skiov, unsigned int soffset,
+                    unsigned int nob)
 {
         struct iovec diov = {.iov_base = dest, .iov_len = dlen};
+
         lnet_copy_kiov2iov(1, &diov, doffset,
                            nsiov, skiov, soffset, nob);
 }
index a14a4d2..0c724d5 100644 (file)
@@ -4,18 +4,18 @@
 
 #define PTLLND_PORTAL           9          /* The same portal PTLPRC used when talking to cray portals */
 #define PTLLND_PID              9          /* The Portals PID */
-#define PTLLND_PEERCREDITS      8          /* concurrent sends to 1 peer*/
+#define PTLLND_PEERCREDITS      8          /* concurrent sends to 1 peer */
 #define PTLLND_MAX_MSG_SIZE     512        /* Maximum message size */
 
 
 /************************************************************************
- * Portal NAL Wire message format.
+ * Portals LNS Wire message format.
  * These are sent in sender's byte order (i.e. receiver flips).
  */
 
-#define PTL_RESERVED_MATCHBITS  0x100   /* below this value is reserved
-                                         * above is for bult data transfer */
-#define LNET_MSG_MATCHBITS       0       /* the value for the message channel */
+#define PTL_RESERVED_MATCHBITS  0x100  /* below this value is reserved
+                                         * above is for bulk data transfer */
+#define LNET_MSG_MATCHBITS       0      /* the value for the message channel */
 
 typedef struct
 {
@@ -38,7 +38,7 @@ typedef struct
 typedef struct kptl_msg
 {
         /* First 2 fields fixed FOR ALL TIME */
-        __u32           ptlm_magic;     /* I'm an ptl NAL message */
+        __u32           ptlm_magic;     /* I'm a Portals LND message */
         __u16           ptlm_version;   /* this is my version number */
         __u8            ptlm_type;      /* the message type */
         __u8            ptlm_credits;   /* returned credits */
index 08474b5..21bd3bc 100644 (file)
@@ -1605,7 +1605,7 @@ kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
                 if (msg_nob > rx->rx_nob) {
                         CERROR ("Immediate message from %s too big: %d(%d)\n",
                                 libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
-                                rlen, rx->rx_nob);
+                                msg_nob, rx->rx_nob);
                         rc = -EPROTO;
                         break;
                 }
index 509a9f4..067ab2b 100644 (file)
@@ -1337,7 +1337,7 @@ kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
                 if (msg_nob > rx->rx_nob) {
                         CERROR ("Immediate message from %s too big: %d(%d)\n",
                                 libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
-                                rlen, rx->rx_nob);
+                                msg_nob, rx->rx_nob);
                         rc = -EPROTO;
                         break;
                 }
index 439d836..948f235 100644 (file)
@@ -1623,7 +1623,7 @@ kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
                 if (nob > rx->rx_nob) {
                         CERROR ("Immediate message from %s too big: %d(%d)\n",
                                 libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
-                                rlen, rx->rx_nob);
+                                nob, rx->rx_nob);
                         rc = -EPROTO;
                         break;
                 }
@@ -2350,11 +2350,19 @@ kibnal_recv_connreq(cm_cep_handle_t *cep, cm_request_data_t *cmreq)
 
         vvrc = gid2gid_index(kibnal_data.kib_hca, cv->cv_port,
                              &cv->cv_path.sgid, &cv->cv_sgid_index);
-        LASSERT (vvrc == vv_return_ok);
+        if (vvrc != vv_return_ok) {
+                CERROR("gid2gid_index failed for %s: %d\n",
+                       libcfs_nid2str(rxmsg.ibm_srcnid), vvrc);
+                goto reject;
+        }
         
         vvrc = pkey2pkey_index(kibnal_data.kib_hca, cv->cv_port,
                                cv->cv_path.pkey, &cv->cv_pkey_index);
-        LASSERT (vvrc == vv_return_ok);
+        if (vvrc != vv_return_ok) {
+                CERROR("pkey2pkey_index failed for %s: %d\n",
+                       libcfs_nid2str(rxmsg.ibm_srcnid), vvrc);
+                goto reject;
+        }
 
         rc = kibnal_set_qp_state(conn, vv_qp_state_init);
         if (rc != 0)
@@ -2729,7 +2737,6 @@ kibnal_arp_done (kib_conn_t *conn)
                       cv->cv_arprc);
                 goto failed;
         }
-        
 
         if ((arp->mask & IBAT_PRI_PATH_VALID) != 0) {
                 CDEBUG(D_NET, "Got valid path for %s\n",
index 6e40ea7..234cffb 100644 (file)
@@ -32,17 +32,359 @@ lnd_t               the_ptllnd = {
         .lnd_eager_recv = ptllnd_eager_recv,
 };
 
+static int ptllnd_ni_count = 0;
 
-void ptllnd_shutdown(struct lnet_ni *ni)
+int
+ptllnd_parse_int_tunable(int *value, char *name, int dflt)
 {
+        char    *env = getenv(name);
+        char    *end;
+
+        if (env == NULL) {
+                *value = dflt;
+                return 0;
+        }
+        
+        *value = strtoull(env, &end, 0);
+        if (*end == 0)
+                return 0;
+        
+        CERROR("Can't parse tunable %s=%s\n", name, env);
+        return -EINVAL;
 }
 
-int ptllnd_startup(struct lnet_ni *ni)
+int
+ptllnd_get_tunables(lnet_ni_t *ni)
 {
+        ptllnd_ni_t *plni = ni->ni_data;
+        int          max_immediate;
+        int          msgs_per_buffer;
+        int          rc;
+
+        rc = ptllnd_parse_int_tunable(&plni->plni_portal,
+                                      "PTLLND_PORTAL", PTLLND_PORTAL);
+        if (rc != 0)
+                return rc;
+
+        rc = ptllnd_parse_int_tunable(&plni->plni_pid,
+                                      "PTLLND_PID", PTLLND_PID);
+        if (rc != 0)
+                return rc;
+
+        rc = ptllnd_parse_int_tunable(&plni->plni_peer_credits,
+                                      "PTLLND_PEERCREDITS", PTLLND_PEERCREDITS);
+        if (rc != 0)
+                return rc;
+
+        rc = ptllnd_parse_int_tunable(&max_immediate,
+                                      "PTLLND_MAX_MSG_SIZE", 
+                                      PTLLND_MAX_MSG_SIZE);
+        if (rc != 0)
+                return rc;
+        
+        rc = ptllnd_parse_int_tunable(&msgs_per_buffer,
+                                      "PTLLND_MSGS_PER_BUFFER", 
+                                      PTLLND_MSGS_PER_BUFFER);
+        if (rc != 0)
+                return rc;
+
+        rc = ptllnd_parse_int_tunable(&plni->plni_msgs_spare,
+                                      "PTLLND_MSGS_SPARE", 
+                                      PTLLND_MSGS_SPARE);
+        if (rc != 0)
+                return rc;
+
+        rc = ptllnd_parse_int_tunable(&plni->plni_peer_hash_size,
+                                      "PTLLND_PEER_HASH_SIZE", 
+                                      PTLLND_PEER_HASH_SIZE);
+        if (rc != 0)
+                return rc;
+        
+        rc = ptllnd_parse_int_tunable(&plni->plni_eq_size,
+                                      "PTLLND_EQ_SIZE", PTLLND_EQ_SIZE);
+        if (rc != 0)
+                return rc;
+
+        plni->plni_max_msg_size =
+                offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[max_immediate]);
+        if (plni->plni_max_msg_size < sizeof(kptl_msg_t))
+                plni->plni_max_msg_size = sizeof(kptl_msg_t);
+        
+        plni->plni_buffer_size = plni->plni_max_msg_size * msgs_per_buffer;
+        return 0;
+}
+
+ptllnd_buffer_t *
+ptllnd_create_buffer (lnet_ni_t *ni) 
+{
+        ptllnd_ni_t     *plni = ni->ni_data;
+        ptllnd_buffer_t *buf;
+        
+        PORTAL_ALLOC(buf, sizeof(*buf));
+        if (buf == NULL) {
+                CERROR("Can't allocate buffer descriptor\n");
+                return NULL;
+        }
+
+        buf->plb_ni = ni;
+        buf->plb_posted = 0;
+        
+        PORTAL_ALLOC(buf->plb_buffer, plni->plni_buffer_size);
+        if (buf->plb_buffer == NULL) {
+                CERROR("Can't allocate buffer size %d\n",
+                       plni->plni_buffer_size);
+                PORTAL_FREE(buf, sizeof(*buf));
+                return NULL;
+        }
+        
+        list_add(&buf->plb_list, &plni->plni_buffers);
+        plni->plni_nbuffers++;
+
+        return buf;
+}
+
+void
+ptllnd_destroy_buffer (ptllnd_buffer_t *buf)
+{
+        ptllnd_ni_t     *plni = buf->plb_ni->ni_data;
+
+        LASSERT (!buf->plb_posted);
+
+        plni->plni_nbuffers--;
+        list_del(&buf->plb_list);
+        PORTAL_FREE(buf->plb_buffer, plni->plni_buffer_size);
+        PORTAL_FREE(buf, sizeof(*buf));
+}
+
+int
+ptllnd_grow_buffers (lnet_ni_t *ni)
+{
+        ptllnd_ni_t     *plni = ni->ni_data;
+        ptllnd_buffer_t *buf;
+        int              nmsgs;
+        int              nbufs;
+        int              rc;
+        
+        nmsgs = plni->plni_npeers * plni->plni_peer_credits +
+                plni->plni_msgs_spare;
+
+        nbufs = (nmsgs * plni->plni_max_msg_size + plni->plni_buffer_size - 1) /
+                plni->plni_buffer_size;
+        
+        while (nbufs > plni->plni_nbuffers) {
+                buf = ptllnd_create_buffer(ni);
+                
+                if (buf == NULL)
+                        return -ENOMEM;
+
+                rc = ptllnd_post_buffer(buf);
+                if (rc != 0)
+                        return rc;
+        }
+        
+        return 0;
+}
+
+void
+ptllnd_destroy_buffers (lnet_ni_t *ni)
+{
+        ptllnd_ni_t       *plni = ni->ni_data;
+        ptllnd_buffer_t   *buf;
+        struct list_head  *tmp;
+        struct list_head  *nxt;
+        
+        list_for_each_safe(tmp, nxt, &plni->plni_buffers) {
+                buf = list_entry(tmp, ptllnd_buffer_t, plb_list);
+                
+                LASSERT (plni->plni_nbuffers > 0);
+                if (buf->plb_posted) {
+                        LASSERT (plni->plni_nposted_buffers > 0);
+
+#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS                        
+                        (void) PtlMDUnlink(buf->plb_md);
+                        while (!buf->plb_posted)
+                                ptllnd_wait(ni, -1);
+#else
+                        while (buf->plb_posted) {
+                                rc = PtlMDUnlink(buf->plb_md);
+                                if (rc == PTL_OK) {
+                                        buf->plb_posted = 0;
+                                        plni->plni_nposted_buffers--;
+                                        break;
+                                }
+                                LASSERT (rc == PTL_MD_IN_USE);
+                                ptllnd_wait(ni, -1);
+                        }
+#endif
+                }
+                ptllnd_destroy_buffer(buf);
+        }
+
+        LASSERT (plni->plni_nposted_buffers == 0);
+        LASSERT (plni->plni_nbuffers == 0);
+}
+
+int
+ptllnd_create_peer_hash (lnet_ni_t *ni)
+{
+        ptllnd_ni_t *plni = ni->ni_data;
+        int          i;
+
+        PORTAL_ALLOC(plni->plni_peer_hash, 
+                     plni->plni_peer_hash_size * sizeof(*plni->plni_peer_hash));
+        if (plni->plni_peer_hash == NULL) {
+                CERROR("Can't allocate ptllnd peer hash (size %d)\n",
+                       plni->plni_peer_hash_size);
+                return -ENOMEM;
+        }
+
+        for (i = 0; i < plni->plni_peer_hash_size; i++)
+                CFS_INIT_LIST_HEAD(&plni->plni_peer_hash[i]);
+        
+        return 0;
+}
+
+void 
+ptllnd_destroy_peer_hash (lnet_ni_t *ni)
+{
+        ptllnd_ni_t    *plni = ni->ni_data;
+        int             i;
+
+        for (i = 0; i < plni->plni_peer_hash_size; i++)
+                LASSERT (list_empty(&plni->plni_peer_hash[i]));
+        
+        PORTAL_FREE(plni->plni_peer_hash,
+                    plni->plni_peer_hash_size * sizeof(*plni->plni_peer_hash));
+}
+
+void
+ptllnd_close_peers (lnet_ni_t *ni)
+{
+        ptllnd_ni_t    *plni = ni->ni_data;
+        ptllnd_peer_t  *plp;
+        int             i;
+
+        for (i = 0; i < plni->plni_peer_hash_size; i++)
+                while (!list_empty(&plni->plni_peer_hash[i])) {
+                        plp = list_entry(plni->plni_peer_hash[i].next,
+                                         ptllnd_peer_t, plp_list);
+
+                        ptllnd_close_peer(plp);
+                }
+}
+
+__u64
+ptllnd_get_timestamp(void)
+{
+        struct timeval  tv;
+        int             rc = gettimeofday(&tv, NULL);
+
+        LASSERT (rc == 0);
+        return ((__u64)tv.tv_sec) * 1000000 + tv.tv_usec;
+}
+
+void 
+ptllnd_shutdown (lnet_ni_t *ni)
+{
+        ptllnd_ni_t *plni = ni->ni_data;
+        int          rc;
+
+        LASSERT (ptllnd_ni_count == 1);
+
+        ptllnd_destroy_buffers(ni);
+        ptllnd_close_peers(ni);
+        ptllnd_abort_txs(ni);
+
+        while (plni->plni_npeers > 0)
+                ptllnd_wait(ni, -1);
+
+        LASSERT (plni->plni_ntxs == 0);
+        LASSERT (plni->plni_nrxs == 0);
+        
+        rc = PtlEQFree(plni->plni_eqh);
+        LASSERT (rc == PTL_OK);
+        
+        rc = PtlNIFini(plni->plni_nih);
+        LASSERT (rc == PTL_OK);
+        
+        ptllnd_destroy_peer_hash(ni);
+        PORTAL_FREE(plni, sizeof(*plni));
+        ptllnd_ni_count--;
+}
+
+int
+ptllnd_startup (lnet_ni_t *ni)
+{
+        ptllnd_ni_t *plni;
+        int          rc;
+        
        /* could get limits from portals I guess... */
        ni->ni_maxtxcredits = 
        ni->ni_peertxcredits = 1000;
 
+        if (ptllnd_ni_count != 0) {
+                CERROR("Can't have > 1 instance of ptllnd\n");
+                return -EPERM;
+        }
+
+        ptllnd_ni_count++;
+        
+        PORTAL_ALLOC(plni, sizeof(*plni));
+        if (plni == NULL) {
+                CERROR("Can't allocate ptllnd state\n");
+                rc = -ENOMEM;
+                goto failed0;
+        }
+        
+        ni->ni_data = plni;
+
+        plni->plni_stamp = ptllnd_get_timestamp();
+        plni->plni_nrxs = 0;
+        plni->plni_ntxs = 0;
+        CFS_INIT_LIST_HEAD(&plni->plni_active_txs);
+        CFS_INIT_LIST_HEAD(&plni->plni_zombie_txs);
+
+        rc = ptllnd_get_tunables(ni);
+        if (rc != 0)
+                goto failed1;
+
+        rc = ptllnd_create_peer_hash(ni);
+        if (rc != 0)
+                goto failed1;
+
+        rc = PtlNIInit(PTL_IFACE_DEFAULT, plni->plni_pid,
+                       NULL, NULL, &plni->plni_nih);
+        if (rc != PTL_OK && rc != PTL_IFACE_DUP) {
+                CERROR("PtlNIInit failed: %d\n", rc);
+                rc = -ENODEV;
+                goto failed2;
+        }
+
+        rc = PtlEQAlloc(plni->plni_nih, plni->plni_eq_size,
+                        PTL_EQ_HANDLER_NONE, &plni->plni_eqh);
+        if (rc != PTL_OK) {
+                CERROR("PtlEQAlloc failed: %d\n", rc);
+                rc = -ENODEV;
+                goto failed3;
+        }
+
+        rc = ptllnd_grow_buffers(ni);
+        if (rc != 0)
+                goto failed4;
+
        return 0;
+
+ failed4:
+        ptllnd_destroy_buffers(ni);
+        PtlEQFree(plni->plni_eqh);
+ failed3:
+        PtlNIFini(plni->plni_nih);
+ failed2:
+        ptllnd_destroy_peer_hash(ni);
+ failed1:
+        PORTAL_FREE(plni, sizeof(*plni));
+ failed0:
+        ptllnd_ni_count--;
+        return rc;
 }
 
index af64fef..ab48eb1 100644 (file)
 
 #include <portals/p30.h>
 
+#define LUSTRE_PORTALS_UNLINK_SEMANTICS
+
+#define PTLLND_MSGS_PER_BUFFER     64
+#define PTLLND_MSGS_SPARE          256
+#define PTLLND_PEER_HASH_SIZE      101
+#define PTLLND_EQ_SIZE             1024
+#define PTLLND_NTX                 256
+
+#ifdef PTL_MD_LUSTRE_COMPLETION_SEMANTICS
+# define PTLLND_MD_OPTIONS        (PTL_MD_LUSTRE_COMPLETION_SEMANTICS |\
+                                   PTL_MD_EVENT_START_DISABLE)
+#else
+# define PTLLND_MD_OPTIONS         PTL_MD_EVENT_START_DISABLE
+#endif   
+                                     
+typedef struct
+{
+        int                        plni_portal;
+        ptl_pid_t                  plni_pid;
+        int                        plni_peer_credits;
+        int                        plni_max_msg_size;
+        int                        plni_buffer_size;
+        int                        plni_msgs_spare;
+        int                        plni_peer_hash_size;
+        int                        plni_eq_size;
+
+        __u64                      plni_stamp;
+        struct list_head           plni_active_txs;
+        struct list_head           plni_zombie_txs;
+        int                        plni_ntxs;
+        int                        plni_nrxs;
+
+        ptl_handle_ni_t            plni_nih;
+        ptl_handle_eq_t            plni_eqh;
+
+        struct list_head          *plni_peer_hash;
+        int                        plni_npeers;
+
+        struct list_head           plni_buffers;
+        int                        plni_nbuffers;
+        int                        plni_nposted_buffers;
+} ptllnd_ni_t;
+
+#define PTLLND_CREDIT_HIGHWATER(plni) ((plni)->plni_peer_credits - 1)
+
+typedef struct
+{
+        struct list_head           plp_list;
+        lnet_ni_t                 *plp_ni;
+        lnet_nid_t                 plp_nid;
+        ptl_process_id_t           plp_ptlid;
+        int                        plp_credits;
+        int                        plp_max_credits;
+        int                        plp_outstanding_credits;
+        int                        plp_max_msg_size;
+        int                        plp_refcount;
+        int                        plp_recvd_hello:1;
+        int                        plp_closing:1;
+        __u64                      plp_match;
+        __u64                      plp_stamp;
+        __u64                      plp_seq;
+        struct list_head           plp_txq;
+} ptllnd_peer_t;
+
+typedef struct
+{
+        struct list_head           plb_list;
+        lnet_ni_t                 *plb_ni;
+        int                        plb_posted;
+        int                        plb_refcount;
+        ptl_handle_md_t            plb_md;
+        char                      *plb_buffer;
+} ptllnd_buffer_t;
+
+typedef struct
+{
+        ptllnd_peer_t             *rx_peer;
+        kptl_msg_t                *rx_msg;
+        int                        rx_nob;
+        int                        rx_replied;
+        char                       rx_space[0];
+} ptllnd_rx_t;
+
 typedef struct
 {
-        ptl_handle_ni_t            pls_ni;
-        ptl_handle_eq_t            pls_eq;
+        struct list_head           tx_list;
+        int                        tx_type;
+        int                        tx_status;
+        ptllnd_peer_t             *tx_peer;
+        lnet_msg_t                *tx_lnetmsg;
+        lnet_msg_t                *tx_lnetreplymsg;
+        unsigned int               tx_niov;
+        ptl_md_iovec_t            *tx_iov;
+        ptl_handle_md_t            tx_bulkmdh;
+        ptl_handle_md_t            tx_reqmdh;
+        int                        tx_completing; /* someone already completing */
+        int                        tx_msgsize;  /* # bytes in tx_msg */
+        kptl_msg_t                 tx_msg;      /* message to send */
+} ptllnd_tx_t;
+
+#define PTLLND_RDMA_WRITE           0x100       /* pseudo message type */
+#define PTLLND_RDMA_READ            0x101       /* (no msg actually sent) */
+
+/* Hack to extract object type from event's user_ptr relies on (and checks)
+ * that structs are somewhat aligned. */
+#define PTLLND_EVENTARG_TYPE_TX     0x1
+#define PTLLND_EVENTARG_TYPE_BUF    0x2
+#define PTLLND_EVENTARG_TYPE_MASK   0x3
+
+static inline void *
+ptllnd_obj2eventarg (void *obj, int type) 
+{
+        unsigned long ptr = (unsigned long)obj;
+        
+        LASSERT ((ptr & PTLLND_EVENTARG_TYPE_MASK) == 0);
+        LASSERT ((type & ~PTLLND_EVENTARG_TYPE_MASK) == 0);
+        
+        return (void *)(ptr | type);
+}
+
+static inline int
+ptllnd_eventarg2type (void *arg) 
+{
+        unsigned long ptr = (unsigned long)arg;
+        
+        return (ptr & PTLLND_EVENTARG_TYPE_MASK);
+}
+
+static inline void *
+ptllnd_eventarg2obj (void *arg) 
+{
+        unsigned long ptr = (unsigned long)arg;
+        
+        return (void *)(ptr & ~PTLLND_EVENTARG_TYPE_MASK);
+}
+
+int ptllnd_startup(lnet_ni_t *ni);
+void ptllnd_shutdown(lnet_ni_t *ni);
+int ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg);
+int ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
+                int delayed, unsigned int niov, 
+                struct iovec *iov, lnet_kiov_t *kiov,
+                unsigned int offset, unsigned int mlen, unsigned int rlen);
+int ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
+                      void **new_privatep);
+ptllnd_tx_t *ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob);
+void ptllnd_wait(lnet_ni_t *ni, int milliseconds);
+void ptllnd_check_sends(ptllnd_peer_t *peer);
+void ptllnd_destroy_peer(ptllnd_peer_t *peer);
+void ptllnd_abort_txs(lnet_ni_t *ni);
+void ptllnd_close_peer(ptllnd_peer_t *peer);
+int ptllnd_post_buffer(ptllnd_buffer_t *buf);
+int ptllnd_grow_buffers (lnet_ni_t *ni);
+
+static inline void
+ptllnd_peer_addref (ptllnd_peer_t *peer) 
+{
+        LASSERT (peer->plp_refcount > 0);
+        peer->plp_refcount++;
+}
+
+static inline void
+ptllnd_peer_decref (ptllnd_peer_t *peer)
+{
+        LASSERT (peer->plp_refcount > 0);
+        peer->plp_refcount--;
+        if (peer->plp_refcount == 0)
+                ptllnd_destroy_peer(peer);
+}
+
+static inline void
+ptllnd_post_tx(ptllnd_tx_t *tx)
+{
+        ptllnd_peer_t *peer = tx->tx_peer;
 
-} ptllnd_state;
+        list_add_tail(&tx->tx_list, &peer->plp_txq);
+        ptllnd_check_sends(peer);
+}
 
-extern int ptllnd_startup(struct lnet_ni *ni);
-extern void ptllnd_shutdown(struct lnet_ni *ni);
-extern int ptllnd_send(struct lnet_ni *ni, void *private, lnet_msg_t *msg);
-extern int ptllnd_recv(struct lnet_ni *ni, void *private, lnet_msg_t *msg,
-                      int delayed, unsigned int niov, 
-                      struct iovec *iov, lnet_kiov_t *kiov,
-                      unsigned int offset, unsigned int mlen, unsigned int rlen);
-extern int ptllnd_eager_recv(struct lnet_ni *ni, void *private, lnet_msg_t *msg,
-                            void **new_privatep);
index f5ceb33..7a0cfe9 100644 (file)
 
 #include "ptllnd.h"
 
-int pltlnd_send(struct lnet_ni *ni, void *private, lnet_msg_t *msg)
+char *
+ptllnd_ptlid2str(ptl_process_id_t id)
 {
-       return -EIO;
+        static char strs[32][16];
+        static int  idx = 0;
+        
+        snprintf(strs[idx], sizeof(strs[0]), 
+                 "%d-"LPD64, id.pid, (__u64)id.nid);
+        
+        return strs[idx++];
 }
 
-int pltlnd_recv(struct lnet_ni *ni, void *private, lnet_msg_t *msg,
-               int delayed, unsigned int niov, 
-               struct iovec *iov, lnet_kiov_t *kiov,
-               unsigned int offset, unsigned int mlen, unsigned int rlen)
+void
+ptllnd_destroy_peer(ptllnd_peer_t *peer)
 {
-       return -EIO;
+        lnet_ni_t         *ni = peer->plp_ni;
+        ptllnd_ni_t       *plni = ni->ni_data;
+
+        LASSERT (peer->plp_closing);
+        LASSERT (plni->plni_npeers > 0);
+        plni->plni_npeers--;
+        PORTAL_FREE(peer, sizeof(*peer));
+}
+
+void
+ptllnd_close_peer(ptllnd_peer_t *peer)
+{
+        lnet_ni_t   *ni = peer->plp_ni;
+        ptllnd_ni_t *plni = ni->ni_data;
+
+        if (peer->plp_closing)
+                return;
+
+        peer->plp_closing = 1;
+
+        list_del(&peer->plp_list);
+        ptllnd_peer_decref(peer);
+
+        while (!list_empty(&peer->plp_txq)) {
+                ptllnd_tx_t *tx = list_entry(peer->plp_txq.next,
+                                             ptllnd_tx_t, tx_list);
+                tx->tx_status = -ESHUTDOWN;
+                list_del(&tx->tx_list);
+                list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
+        }
+}
+
+ptllnd_peer_t *
+ptllnd_find_peer(lnet_ni_t *ni, lnet_nid_t nid, int create)
+{
+        ptllnd_ni_t       *plni = ni->ni_data;
+        unsigned int       hash = PTL_NIDADDR(nid) % plni->plni_peer_hash_size;
+        struct list_head  *tmp;
+        ptllnd_peer_t     *plp;
+        ptllnd_tx_t       *tx;
+        int                rc;
+
+        LASSERT (PTL_NIDNET(nid) == PTL_NIDNET(ni->ni_nid));
+
+        list_for_each(tmp, &plni->plni_peer_hash[hash]) {
+                plp = list_entry(tmp, ptllnd_peer_t, plp_list);
+                
+                if (plp->plp_nid == nid) {
+                        ptllnd_peer_addref(plp);
+                        return plp;
+                }
+        }
+
+        if (!create)
+                return NULL;
+        
+        /* New peer: check first for enough posted buffers */
+        plni->plni_npeers++;
+        rc = ptllnd_grow_buffers(ni);
+        if (rc != 0) {
+                plni->plni_npeers--;
+                return NULL;
+        }
+        
+        PORTAL_ALLOC(plp, sizeof(*plp));
+        if (plp == NULL) {
+                CERROR("Can't allocate new peer %s\n",
+                       libcfs_nid2str(nid));
+                plni->plni_npeers--;
+                return NULL;
+        }
+
+        plp->plp_ni = ni;
+        plp->plp_nid = nid;
+        plp->plp_ptlid.nid = PTL_NIDADDR(nid);
+        plp->plp_ptlid.pid = plni->plni_pid;
+        plp->plp_max_credits =
+        plp->plp_credits = 1; /* add more later when she gives me credits */
+        plp->plp_max_msg_size = sizeof(kptl_msg_t); /* until I hear from her */
+        plp->plp_outstanding_credits = plni->plni_peer_credits - 1;
+        plp->plp_match = 0;
+        plp->plp_recvd_hello = 0;
+        plp->plp_closing = 0;
+        plp->plp_refcount = 1;
+
+        ptllnd_peer_addref(plp);
+        list_add_tail(&plp->plp_list, &plni->plni_peer_hash[hash]);
+        
+        tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_HELLO, 0);
+        if (tx == NULL) {
+                CERROR("Can't send HELLO to %s\n", libcfs_nid2str(nid));
+                ptllnd_close_peer(plp);
+                ptllnd_peer_decref(plp);
+                return NULL;
+        }
+
+        tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS;
+        tx->tx_msg.ptlm_u.hello.kptlhm_max_immd_size = 
+                plni->plni_max_msg_size -
+                offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload);
+
+        ptllnd_post_tx(tx);
+        return plp;
+}
+
+ptllnd_tx_t *
+ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob)
+{
+        lnet_ni_t   *ni = peer->plp_ni;
+        ptllnd_ni_t *plni = ni->ni_data;
+        ptllnd_tx_t *tx;
+        int          msgsize;
+        
+        switch (type) {
+        default:
+                LBUG();
+
+        case PTLLND_RDMA_WRITE:
+        case PTLLND_RDMA_READ:
+                LASSERT (payload_nob == 0);
+                msgsize = 0;
+                break;
+                
+        case PTLLND_MSG_TYPE_PUT:
+        case PTLLND_MSG_TYPE_GET:
+                LASSERT (payload_nob == 0);
+                msgsize = offsetof(kptl_msg_t, ptlm_u) +
+                          sizeof(kptl_request_msg_t);
+                break;
+                
+        case PTLLND_MSG_TYPE_IMMEDIATE:
+                msgsize = offsetof(kptl_msg_t, 
+                                   ptlm_u.immediate.kptlim_payload[payload_nob]);
+                break;
+                
+        case PTLLND_MSG_TYPE_NOOP:
+                LASSERT (payload_nob == 0);
+                msgsize = offsetof(kptl_msg_t, ptlm_u);
+                break;
+                
+        case PTLLND_MSG_TYPE_HELLO:
+                LASSERT (payload_nob == 0);
+                msgsize = offsetof(kptl_msg_t, ptlm_u) +
+                          sizeof(kptl_hello_msg_t);
+                break;
+        }
+
+        LASSERT (msgsize <= peer->plp_max_msg_size);
+
+        PORTAL_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize);
+        
+        if (tx == NULL) {
+                CERROR("Can't allocate msg type %d for %s\n",
+                       type, libcfs_nid2str(peer->plp_nid));
+                return NULL;
+        }
+
+        CFS_INIT_LIST_HEAD(&tx->tx_list);
+        tx->tx_peer = peer;
+        tx->tx_type = type;
+        tx->tx_lnetmsg = tx->tx_lnetreplymsg = NULL;
+        tx->tx_niov = 0;
+        tx->tx_iov = NULL;
+        tx->tx_reqmdh = PTL_INVALID_HANDLE;
+        tx->tx_bulkmdh = PTL_INVALID_HANDLE;
+        tx->tx_msgsize = msgsize;
+        tx->tx_status = 0;
+
+        if (msgsize != 0) {
+                tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC;
+                tx->tx_msg.ptlm_version = PTLLND_MSG_VERSION;
+                tx->tx_msg.ptlm_type = type;
+                tx->tx_msg.ptlm_credits = 0;
+                tx->tx_msg.ptlm_nob = msgsize;
+                tx->tx_msg.ptlm_cksum = 0;
+                tx->tx_msg.ptlm_srcnid = ni->ni_nid;
+                tx->tx_msg.ptlm_srcstamp = plni->plni_stamp;
+                tx->tx_msg.ptlm_dstnid = peer->plp_nid;
+                tx->tx_msg.ptlm_dststamp = peer->plp_stamp;
+                tx->tx_msg.ptlm_seq = peer->plp_seq++;
+        }
+        
+        ptllnd_peer_addref(peer);
+        plni->plni_ntxs++;
+
+        return tx;
+}
+
+void
+ptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh)
+{
+        ptllnd_peer_t   *peer = tx->tx_peer;
+        lnet_ni_t       *ni = peer->plp_ni;
+        int              rc;
+
+        while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) {
+                rc = PtlMDUnlink(*mdh);
+#ifndef LUSTRE_PORTALS_UNLINK_SEMANTICS
+                if (rc == PTL_OK) /* unlink successful => no unlinked event */
+                        return;
+                LASSERT (rc == PTL_MD_IN_USE);
+#endif
+                /* Wait for ptllnd_tx_event() to invalidate */
+                ptllnd_wait(ni, -1);
+        }
+}
+
+void
+ptllnd_tx_done(ptllnd_tx_t *tx)
+{
+        ptllnd_peer_t   *peer = tx->tx_peer;
+        lnet_ni_t       *ni = peer->plp_ni;
+        ptllnd_ni_t     *plni = ni->ni_data;
+
+        /* CAVEAT EMPTOR: If this tx is being aborted, I'll continue to get
+         * events for this tx until it's unlinked.  So I set tx_completing to
+         * flag the tx is getting handled */
+
+        if (tx->tx_completing)
+                return;
+        
+        tx->tx_completing = 1;
+
+        if (!list_empty(&tx->tx_list))
+                list_del_init(&tx->tx_list);
+
+        if (tx->tx_status != 0)
+                ptllnd_close_peer(peer);
+        
+        ptllnd_abort_tx(tx, &tx->tx_reqmdh);
+        ptllnd_abort_tx(tx, &tx->tx_bulkmdh);
+
+        if (tx->tx_niov > 0) {
+                PORTAL_FREE(tx->tx_iov, tx->tx_niov * sizeof(*tx->tx_iov));
+                tx->tx_niov = 0;
+        }
+
+        if (tx->tx_lnetreplymsg != NULL) {
+                LASSERT (tx->tx_type == PTLLND_MSG_TYPE_GET);
+                LASSERT (tx->tx_lnetmsg != NULL);
+                /* Simulate GET success always  */
+                lnet_finalize(ni, tx->tx_lnetmsg, 0);
+                lnet_finalize(ni, tx->tx_lnetreplymsg, tx->tx_status);
+        } else if (tx->tx_lnetmsg != NULL) {
+                lnet_finalize(ni, tx->tx_lnetmsg, tx->tx_status);
+        }
+
+        ptllnd_peer_decref(peer);
+
+        LASSERT (plni->plni_ntxs > 0);
+        plni->plni_ntxs--;
+        PORTAL_FREE(tx, offsetof(ptllnd_tx_t, tx_msg) + tx->tx_msgsize);
+}
+
+void
+ptllnd_abort_txs(lnet_ni_t *ni)
+{
+        ptllnd_ni_t   *plni = ni->ni_data;
+        
+        while (!list_empty(&plni->plni_active_txs)) {
+                ptllnd_tx_t *tx = list_entry(plni->plni_active_txs.next,
+                                             ptllnd_tx_t, tx_list);
+                tx->tx_status = -ESHUTDOWN;
+                ptllnd_tx_done(tx);
+        }
+}
+
+int
+ptllnd_set_txiov(ptllnd_tx_t *tx,
+                 unsigned int niov, struct iovec *iov,
+                 unsigned int offset, unsigned int len)
+{
+        ptl_md_iovec_t *piov;
+        int             npiov;
+
+        if (len == 0) {
+                tx->tx_niov = 0;
+                return 0;
+        }
+        
+        for (;;) {
+                LASSERT (niov > 0);
+                if (offset < iov->iov_len)
+                        break;
+                offset -= iov->iov_len;
+                niov--;
+                iov++;
+        }
+
+        for (;;) {
+                PORTAL_ALLOC(piov, niov * sizeof(*piov));
+                if (piov == NULL)
+                        return -ENOMEM;
+
+                for (npiov = 0;; npiov++) {
+                        LASSERT (npiov < niov);
+                        LASSERT (iov->iov_len >= offset);
+                        
+                        piov[npiov].iov_base = iov[npiov].iov_base + offset;
+                        piov[npiov].iov_len = iov[npiov].iov_len - offset;
+                        
+                        if (piov[npiov].iov_len >= len) {
+                                piov[npiov].iov_len = len;
+                                npiov++;
+                                break;
+                        }
+                        iov++;
+                        offset = 0;
+                }
+
+                if (npiov == niov) {
+                        tx->tx_niov = niov;
+                        tx->tx_iov = piov;
+                        return 0;
+                }
+                
+                /* Dang! The piov I allocated was too big and it's a drag to
+                 * have to maintain separate 'allocated' and 'used' sizes, so
+                 * I'll just do it again; NB this doesn't happen normally... */
+                PORTAL_FREE(piov, niov * sizeof(*piov));
+                niov = npiov;
+        }
+}
+
+void
+ptllnd_set_md_buffer(ptl_md_t *md, ptllnd_tx_t *tx)
+{
+        unsigned int    niov = tx->tx_niov;
+        ptl_md_iovec_t *iov = tx->tx_iov;
+        
+        LASSERT ((md->options & PTL_MD_IOVEC) == 0);
+
+        if (niov == 0) {
+                md->start = NULL;
+                md->length = 0;
+        } else if (niov == 1) {
+                md->start = iov[0].iov_base;
+                md->length = iov[0].iov_len;
+        } else {
+                md->start = iov;
+                md->length = niov;
+                md->options |= PTL_MD_IOVEC;
+        }
+}
+
+int
+ptllnd_post_buffer(ptllnd_buffer_t *buf)
+{
+        lnet_ni_t        *ni = buf->plb_ni;
+        ptllnd_ni_t      *plni = ni->ni_data;
+        ptl_process_id_t  anyid = {
+                .nid       = PTL_NID_ANY,
+                .pid       = PTL_PID_ANY};
+        ptl_md_t          md = {
+                .start     = buf->plb_buffer,
+                .length    = plni->plni_buffer_size,
+                .threshold = PTL_MD_THRESH_INF,
+                .max_size  = plni->plni_max_msg_size,
+                .options   = (PTLLND_MD_OPTIONS | 
+                              PTL_MD_OP_PUT | PTL_MD_MAX_SIZE),
+                .user_ptr  = ptllnd_obj2eventarg(buf, PTLLND_EVENTARG_TYPE_BUF),
+                .eq_handle = plni->plni_eqh};
+        ptl_handle_me_t meh;
+        int             rc;
+
+        LASSERT (!buf->plb_posted);
+        
+        rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, 
+                         anyid, LNET_MSG_MATCHBITS, 0,
+                         PTL_UNLINK, PTL_INS_AFTER, &meh);
+        if (rc != PTL_OK) {
+                CERROR("PtlMEAttach failed: %d\n", rc);
+                return -ENOMEM;
+        }
+
+        buf->plb_posted = 1;
+        plni->plni_nposted_buffers++;
+        
+        rc = PtlMDAttach(meh, md, LNET_UNLINK, &buf->plb_md);
+        if (rc == PTL_OK)
+                return 0;
+
+        CERROR("PtlMDAttach failed: %d\n", rc);
+        
+        buf->plb_posted = 0;
+        plni->plni_nposted_buffers--;
+
+        rc = PtlMEUnlink(meh);
+        LASSERT (rc == PTL_OK);
+        
+        return -ENOMEM;
+}
+
+void
+ptllnd_check_sends(ptllnd_peer_t *peer)
+{
+        lnet_ni_t      *ni = peer->plp_ni;
+        ptllnd_ni_t    *plni = ni->ni_data;
+        ptllnd_tx_t    *tx;
+        ptl_md_t        md;
+        ptl_handle_md_t mdh;
+        int             rc;
+        
+        if (list_empty(&peer->plp_txq) &&
+            peer->plp_outstanding_credits >= 
+            PTLLND_CREDIT_HIGHWATER(plni)) {
+
+                tx = ptllnd_new_tx(peer, PTLLND_MSG_TYPE_NOOP, 0);
+                if (tx == NULL) {
+                        CERROR("Can't return credits to %s\n",
+                               libcfs_nid2str(peer->plp_nid));
+                } else {
+                        list_add_tail(&tx->tx_list, &peer->plp_txq);
+                }
+        }
+
+        while (!list_empty(&peer->plp_txq)) {
+                tx = list_entry(peer->plp_txq.next, ptllnd_tx_t, tx_list);
+                
+                LASSERT (tx->tx_msgsize > 0);
+                
+                LASSERT (peer->plp_outstanding_credits >= 0);
+                LASSERT (peer->plp_outstanding_credits <=
+                         plni->plni_peer_credits);
+                LASSERT (peer->plp_credits >= 0);
+                LASSERT (peer->plp_credits <= peer->plp_max_credits);
+
+                if (peer->plp_credits == 0)     /* no credits */
+                        break;
+
+                if (peer->plp_credits == 1 &&   /* last credit reserved for */
+                    peer->plp_outstanding_credits == 0) /* returning credits */
+                        break;
+                
+                list_del_init(&tx->tx_list);
+                
+                if (tx->tx_type == PTLLND_MSG_TYPE_NOOP &&
+                    (!list_empty(&peer->plp_txq) ||
+                     peer->plp_outstanding_credits <
+                     PTLLND_CREDIT_HIGHWATER(plni))) {
+                        /* redundant NOOP */
+                        ptllnd_tx_done(tx);
+                        continue;
+                }
+
+                md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
+                md.eq_handle = plni->plni_eqh;
+                md.threshold = 1;
+                md.options = PTLLND_MD_OPTIONS;
+                md.start = &tx->tx_msg;
+                md.length = tx->tx_msgsize;
+                
+                rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
+                if (rc != PTL_OK) {
+                        CERROR("PtlMDBind for %s failed: %d\n",
+                               libcfs_nid2str(peer->plp_nid), rc);
+                        tx->tx_status = -EIO;
+                        ptllnd_tx_done(tx);
+                        break;
+                }
+                
+                tx->tx_reqmdh = mdh;
+                rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
+                            plni->plni_portal, 0, LNET_MSG_MATCHBITS, 0, 0);
+                if (rc != PTL_OK) {
+                        CERROR("PtlPut for %s failed: %d\n",
+                               libcfs_nid2str(peer->plp_nid), rc);
+                        tx->tx_status = -EIO;
+                        ptllnd_tx_done(tx);
+                        break;
+                }
+
+                list_add_tail(&tx->tx_list, &plni->plni_active_txs);
+        }
+}
+
+int
+ptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,
+                    unsigned int niov, struct iovec *iov,
+                    unsigned int offset, unsigned int len)
+{
+        lnet_ni_t      *ni = peer->plp_ni;
+        ptllnd_ni_t    *plni = ni->ni_data;
+        ptllnd_tx_t    *tx = ptllnd_new_tx(peer, type, 0);
+        __u64           matchbits;
+        ptl_md_t        md;
+        ptl_handle_md_t mdh;
+        ptl_handle_me_t meh;
+        int             rc;
+        int             rc2;
+
+        LASSERT (type == PTLLND_MSG_TYPE_GET ||
+                 type == PTLLND_MSG_TYPE_PUT);
+
+        if (tx == NULL) {
+                CERROR("Can't allocate %s tx for %s\n",
+                       type == PTLLND_MSG_TYPE_GET ? "GET" : "PUT/REPLY",
+                       libcfs_nid2str(peer->plp_nid));
+                return -ENOMEM;
+        }
+
+        rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
+        if (rc != 0) {
+                CERROR ("Can't allocate iov %d for %s\n", 
+                        niov, libcfs_nid2str(peer->plp_nid));
+                rc = -ENOMEM;
+                goto failed;
+        }
+
+        md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
+        md.eq_handle = plni->plni_eqh;
+        md.threshold = 1;
+        md.options = PTLLND_MD_OPTIONS |
+                     (type == PTLLND_MSG_TYPE_GET) ? PTL_MD_OP_PUT : PTL_MD_OP_GET;
+        ptllnd_set_md_buffer(&md, tx);
+
+        while (!peer->plp_recvd_hello) {        /* wait to validate plp_match */
+                if (peer->plp_closing) {
+                        rc = -EIO;
+                        goto failed;
+                }
+                ptllnd_wait(ni, -1);
+        }
+        
+        matchbits = peer->plp_match++;
+        LASSERT (matchbits >= PTL_RESERVED_MATCHBITS);
+        
+        rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid, 
+                         matchbits, 0, PTL_UNLINK, PTL_INS_AFTER, &meh);
+        if (rc != PTL_OK) {
+                CERROR("PtlMEAttach for %s failed: %d\n", 
+                       libcfs_nid2str(peer->plp_nid), rc);
+                rc = -EIO;
+                goto failed;
+        }
+
+        rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);
+        if (rc != PTL_OK) {
+                CERROR("PtlMDAttach for %s failed: %d\n",
+                       libcfs_nid2str(peer->plp_nid), rc);
+                rc2 = PtlMEUnlink(meh);
+                LASSERT (rc2 == PTL_OK);
+                rc = -EIO;
+                goto failed;
+        }
+        tx->tx_bulkmdh = mdh;
+
+        tx->tx_msg.ptlm_u.req.kptlrm_hdr = msg->msg_hdr;
+        tx->tx_msg.ptlm_u.req.kptlrm_matchbits = matchbits;
+
+        if (type == PTLLND_MSG_TYPE_GET) {
+                tx->tx_lnetreplymsg = lnet_create_reply_msg(ni, msg);
+                if (tx->tx_lnetreplymsg == NULL) {
+                        CERROR("Can't create reply for GET to %s\n",
+                               libcfs_id2str(msg->msg_target));
+                        rc = -ENOMEM;
+                        goto failed;
+                }
+        }
+        
+        tx->tx_lnetmsg = msg;
+        ptllnd_post_tx(tx);
+        return 0;
+        
+ failed:
+        ptllnd_tx_done(tx);
+        return rc;
+}
+
+int
+ptllnd_active_rdma(ptllnd_peer_t *peer, int type,
+                   lnet_msg_t *msg, __u64 matchbits,
+                   unsigned int niov, struct iovec *iov,
+                   unsigned int offset, unsigned int len)
+{
+        lnet_ni_t       *ni = peer->plp_ni;
+        ptllnd_ni_t     *plni = ni->ni_data;
+        ptllnd_tx_t     *tx = ptllnd_new_tx(peer, type, 0);
+        ptl_md_t         md;
+        ptl_handle_md_t  mdh;
+        int              rc;
+
+        LASSERT (type == PTLLND_RDMA_READ ||
+                 type == PTLLND_RDMA_WRITE);
+        
+        if (tx == NULL) {
+                CERROR("Can't allocate tx for RDMA %s with %s\n",
+                       (type == PTLLND_RDMA_WRITE) ? "write" : "read",
+                       libcfs_nid2str(peer->plp_nid));
+                ptllnd_close_peer(peer);
+                return -ENOMEM;
+        }
+
+        rc = ptllnd_set_txiov(tx, niov, iov, offset, len);
+        if (rc != 0) {
+                CERROR ("Can't allocate iov %d for %s\n", 
+                        niov, libcfs_nid2str(peer->plp_nid));
+                rc = -ENOMEM;
+                goto failed;
+        }
+
+        md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);
+        md.eq_handle = plni->plni_eqh;
+        /* If I've received a PUT, I fetch the data */
+        md.threshold = (type == PTLLND_MSG_TYPE_PUT) ? 2 : 1;
+        md.options = PTLLND_MD_OPTIONS;
+        ptllnd_set_md_buffer(&md, tx);
+        
+        rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);
+        if (rc != PTL_OK) {
+                CERROR("PtlMDBind for %s failed: %d\n",
+                       libcfs_nid2str(peer->plp_nid), rc);
+                rc = -EIO;
+                goto failed;
+        }
+
+        tx->tx_bulkmdh = mdh;
+        tx->tx_lnetmsg = msg;
+        
+        if (type == PTLLND_MSG_TYPE_PUT)
+                rc = PtlGet(mdh, peer->plp_ptlid,
+                            plni->plni_portal, 0, matchbits, 0);
+        else
+                rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,
+                            plni->plni_portal, 0, matchbits, 0, 0);
+        if (rc == 0)
+                return 0;
+        
+        tx->tx_lnetmsg = NULL;
+ failed:
+        tx->tx_status = rc;
+        ptllnd_tx_done(tx);    /* this will close peer */
+        return rc;
+}
+
+int 
+ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
+{
+        ptllnd_ni_t    *plni = ni->ni_data;
+        ptllnd_peer_t  *plp;
+        ptllnd_tx_t    *tx;
+        int             nob;
+        int             rc;
+
+        LASSERT (!msg->msg_routing);
+        LASSERT (msg->msg_kiov == NULL);
+
+        plp = ptllnd_find_peer(ni, msg->msg_target.nid, 1);
+        if (plp == NULL)
+                return -ENOMEM;
+        
+        switch (msg->msg_type) {
+        default:
+                LBUG();
+                
+        case LNET_MSG_ACK:
+                LASSERT (msg->msg_len == 0);
+                break;                          /* send IMMEDIATE */
+                
+        case LNET_MSG_GET:
+                if (msg->msg_target_is_router)
+                        break;                  /* send IMMEDIATE */
+
+                nob = msg->msg_md->md_length;
+                nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
+                if (nob <= plni->plni_max_msg_size)
+                        break;
+
+                LASSERT ((msg->msg_md->md_options & LNET_MD_KIOV) == 0);
+                rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_GET, msg,
+                                         msg->msg_md->md_niov,
+                                         msg->msg_md->md_iov.iov,
+                                         0, msg->msg_md->md_length);
+                ptllnd_peer_decref(plp);
+                return rc;
+
+        case LNET_MSG_REPLY: {
+                ptllnd_rx_t *rx = private;      /* incoming GET */
+                __u64        match;
+
+                LASSERT (rx != NULL);
+                match = rx->rx_msg->ptlm_u.req.kptlrm_matchbits;
+                
+                if (rx->rx_msg->ptlm_type == PTLLND_MSG_TYPE_GET) {
+                        LASSERT (!rx->rx_replied);
+                        rc = ptllnd_active_rdma(plp, PTLLND_RDMA_WRITE, msg,
+                                                match,
+                                                msg->msg_niov, msg->msg_iov, 
+                                                msg->msg_offset, msg->msg_len);
+                        rx->rx_replied = (rc == 0);
+                        ptllnd_peer_decref(plp);
+                        return rc;
+                }
+                
+                if (rx->rx_msg->ptlm_type != PTLLND_MSG_TYPE_IMMEDIATE) {
+                        CERROR("Reply to %s bad msg type %x!!!\n",
+                               libcfs_id2str(msg->msg_target),
+                               rx->rx_msg->ptlm_type);
+                        ptllnd_peer_decref(plp);
+                        return -EPROTO;
+                }
+
+                /* fall through to handle like PUT */
+        }
+                
+        case LNET_MSG_PUT:
+                nob = msg->msg_len;
+                nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);
+                if (nob <= plp->plp_max_msg_size)
+                        break;                  /* send IMMEDIATE */
+
+                rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_PUT, msg,
+                                         msg->msg_niov, msg->msg_iov,
+                                         msg->msg_offset, msg->msg_len);
+                ptllnd_peer_decref(plp);
+                return rc;
+        }
+
+        /* send IMMEDIATE 
+         * NB copy the payload so we don't have to do a fragmented send */
+        
+        tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_IMMEDIATE, msg->msg_len);
+        if (tx == NULL) {
+                CERROR("Can't allocate tx for lnet type %d to %s\n",
+                       msg->msg_type, libcfs_id2str(msg->msg_target));
+                ptllnd_peer_decref(plp);
+                return -ENOMEM;
+        }
+        
+        lnet_copy_iov2flat(tx->tx_msgsize, &tx->tx_msg,
+                           offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
+                           msg->msg_niov, msg->msg_iov, msg->msg_offset, 
+                           msg->msg_len);
+        tx->tx_msg.ptlm_u.immediate.kptlim_hdr = msg->msg_hdr;
+
+        tx->tx_lnetmsg = msg;
+        ptllnd_post_tx(tx);
+        ptllnd_peer_decref(plp);
+        return 0;
+}
+
+void
+ptllnd_rx_done(ptllnd_rx_t *rx)
+{
+        ptllnd_peer_t *plp = rx->rx_peer;
+        lnet_ni_t     *ni = plp->plp_ni;
+        ptllnd_ni_t   *plni = ni->ni_data;
+        
+        plp->plp_outstanding_credits++;
+        ptllnd_check_sends(rx->rx_peer);
+        
+        if (rx->rx_msg != (kptl_msg_t *)rx->rx_space)
+                PORTAL_FREE(rx, offsetof(ptllnd_rx_t, rx_space[rx->rx_nob]));
+
+        LASSERT (plni->plni_nrxs > 0);
+        plni->plni_nrxs--;
+}
+
+int 
+ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
+                  void **new_privatep)
+{
+        ptllnd_rx_t *stackrx = private;
+        ptllnd_rx_t *heaprx;
+
+        /* Don't ++plni_nrxs: heaprx replaces stackrx */
+
+        LASSERT (stackrx->rx_msg != (kptl_msg_t *)stackrx->rx_space);
+        
+        PORTAL_ALLOC(heaprx, offsetof(ptllnd_rx_t, rx_space[stackrx->rx_nob]));
+        if (heaprx == NULL)
+                return -ENOMEM;
+
+        heaprx->rx_msg = (kptl_msg_t *)heaprx->rx_space;
+        memcpy(&heaprx->rx_msg, stackrx->rx_msg, stackrx->rx_nob);
+
+        return 0;
+}
+
+int 
+ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
+            int delayed, unsigned int niov, 
+            struct iovec *iov, lnet_kiov_t *kiov,
+            unsigned int offset, unsigned int mlen, unsigned int rlen)
+{
+        ptllnd_rx_t    *rx = private;
+        int             rc = 0;
+        int             nob;
+
+        LASSERT (kiov == NULL);
+
+        switch (rx->rx_msg->ptlm_type) {
+        default:
+                LBUG();
+
+        case PTLLND_MSG_TYPE_IMMEDIATE:
+                nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[rlen]);
+                if (nob > rx->rx_nob) {
+                        CERROR("Immediate message from %s too big: %d(%d)\n",
+                               libcfs_nid2str(rx->rx_peer->plp_nid),
+                               nob, rx->rx_nob);
+                        rc = -EPROTO;
+                        break;
+                }
+                lnet_copy_flat2iov(niov, iov, offset,
+                                   rx->rx_nob, rx->rx_msg,
+                                   offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),
+                                   mlen);
+                lnet_finalize(ni, msg, 0);
+                break;
+                
+        case PTLLND_MSG_TYPE_PUT:
+                rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg, 
+                                        rx->rx_msg->ptlm_u.req.kptlrm_matchbits,
+                                        niov, iov, offset, mlen);
+                break;
+                
+        case PTLLND_MSG_TYPE_GET:
+                LASSERT (msg == NULL);          /* no need to finalize */
+                if (!rx->rx_replied)            /* peer will time out */
+                        ptllnd_close_peer(rx->rx_peer);
+                break;
+        }
+        
+        ptllnd_rx_done(rx);
+        return rc;
 }
 
-int ptllnd_eager_recv(struct lnet_ni *ni, void *private, lnet_msg_t *msg,
-                     void **new_privatep)
+void
+ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
+                     kptl_msg_t *msg, unsigned int nob)
 {
-       return -EIO;
+        ptllnd_ni_t   *plni = ni->ni_data;
+        const int      basenob = offsetof(kptl_msg_t, ptlm_u);
+        ptllnd_rx_t    rx;
+        int            flip;
+        ptllnd_peer_t *plp;
+        int            rc;
+        
+        if (nob < basenob) {
+                CERROR("Short receive from %s\n",
+                       ptllnd_ptlid2str(initiator));
+                return;
+        }
+
+        flip = msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC);
+        if (!flip && msg->ptlm_magic != PTLLND_MSG_MAGIC) {
+                CERROR("Bad magic %08x from %s\n", msg->ptlm_magic, 
+                       ptllnd_ptlid2str(initiator));
+                return;
+        }
+        
+        if (flip) {
+                /* NB stamps are opaque cookies */
+                __swab16s(&msg->ptlm_version);
+                __swab32s(&msg->ptlm_nob);
+                __swab32s(&msg->ptlm_cksum);
+                __swab64s(&msg->ptlm_srcnid);
+                __swab64s(&msg->ptlm_dstnid);
+                __swab64s(&msg->ptlm_seq);
+        }
+        
+        if (msg->ptlm_version != PTLLND_MSG_VERSION) {
+                CERROR("Bad version %d from %s\n", (__u32)msg->ptlm_version, 
+                       ptllnd_ptlid2str(initiator));
+                return;
+        }
+
+        if (msg->ptlm_dstnid != ni->ni_nid) {
+                CERROR("Bad dstnid %s (%s expected) from %s\n",
+                       libcfs_nid2str(msg->ptlm_dstnid),
+                       libcfs_nid2str(ni->ni_nid),
+                       libcfs_nid2str(msg->ptlm_srcnid));
+                return;
+        }
+        
+        if (msg->ptlm_dststamp != plni->plni_stamp) {
+                CERROR("Bad dststamp "LPX64"("LPX64" expected) from %s\n",
+                       msg->ptlm_dststamp, plni->plni_stamp,
+                       libcfs_nid2str(msg->ptlm_srcnid));
+                return;
+        }
+
+        switch (msg->ptlm_type) {
+        case PTLLND_MSG_TYPE_PUT:
+        case PTLLND_MSG_TYPE_GET:
+                if (nob < basenob + sizeof(kptl_request_msg_t)) {
+                        CERROR("Short rdma request from %s(%s)\n",
+                               libcfs_nid2str(msg->ptlm_srcnid),
+                               ptllnd_ptlid2str(initiator));
+                        return;
+                }
+                if (flip)
+                        __swab64s(&msg->ptlm_u.req.kptlrm_matchbits);
+                break;
+
+        case PTLLND_MSG_TYPE_IMMEDIATE:
+                if (nob < offsetof(kptl_msg_t, 
+                                   ptlm_u.immediate.kptlim_payload)) {
+                        CERROR("Short immediate from %s(%s)\n",
+                               libcfs_nid2str(msg->ptlm_srcnid),
+                               ptllnd_ptlid2str(initiator));
+                        return;
+                }
+                break;
+                
+        case PTLLND_MSG_TYPE_HELLO:
+                if (nob < basenob + sizeof(kptl_hello_msg_t)) {
+                        CERROR("Short hello from %s(%s)\n",
+                               libcfs_nid2str(msg->ptlm_srcnid),
+                               ptllnd_ptlid2str(initiator));
+                        return;
+                }
+                __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);
+                __swab32s(&msg->ptlm_u.hello.kptlhm_max_immd_size);
+                break;
+                
+        default:
+                CERROR("Bad message type %d from %s(%s)\n", msg->ptlm_type,
+                       libcfs_nid2str(msg->ptlm_srcnid),
+                       ptllnd_ptlid2str(initiator));
+                return;
+        }
+
+        plp = ptllnd_find_peer(ni, msg->ptlm_srcnid,
+                               msg->ptlm_type == PTLLND_MSG_TYPE_HELLO);
+        if (plp == NULL) {
+                CERROR("Can't find peer %s\n", 
+                       libcfs_nid2str(msg->ptlm_srcnid));
+                return;
+        }
+        
+        if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {
+                int n;
+                
+                if (plp->plp_recvd_hello) {
+                        CERROR("Unexpected HELLO from %s\n",
+                               libcfs_nid2str(msg->ptlm_srcnid));
+                        ptllnd_peer_decref(plp);
+                        return;
+                }
+
+                n = msg->ptlm_u.hello.kptlhm_max_immd_size;
+                n = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[n]);
+                plp->plp_max_msg_size = MAX(sizeof(kptl_msg_t), n);
+                plp->plp_match = msg->ptlm_u.hello.kptlhm_matchbits;
+                plp->plp_stamp = msg->ptlm_srcstamp;
+                plp->plp_max_credits += msg->ptlm_credits;
+                plp->plp_recvd_hello = 1;
+
+        } else if (!plp->plp_recvd_hello) {
+
+                CERROR("Bad message type %d (HELLO expected) from %s\n",
+                       msg->ptlm_type, libcfs_nid2str(msg->ptlm_srcnid));
+                ptllnd_peer_decref(plp);
+                return;
+
+        } else if (msg->ptlm_srcstamp != plp->plp_stamp) {
+
+                CERROR("Bad srcstamp "LPX64"("LPX64" expected) from %s\n", 
+                       msg->ptlm_srcstamp, plp->plp_stamp,
+                       libcfs_nid2str(msg->ptlm_srcnid));
+                ptllnd_peer_decref(plp);
+                return;
+        }
+        
+        if (msg->ptlm_credits > 0) {
+                if (plp->plp_credits + msg->ptlm_credits >
+                    plp->plp_max_credits) {
+                        CWARN("Too many credits from %s: %d + %d > %d\n",
+                              libcfs_nid2str(msg->ptlm_srcnid),
+                              plp->plp_credits, msg->ptlm_credits,
+                              plp->plp_max_credits);
+                        plp->plp_credits = plp->plp_max_credits;
+                } else {
+                        plp->plp_credits += msg->ptlm_credits;
+                }
+                ptllnd_check_sends(plp);
+        }
+        
+        /* All OK so far; assume the message is good... */
+
+        rx.rx_peer = plp;
+        rx.rx_msg  = msg;
+        rx.rx_nob  = nob;
+        plni->plni_nrxs++;
+        
+        switch (msg->ptlm_type) {
+        default: /* message types have been checked already */
+                ptllnd_rx_done(&rx);
+                break;
+                
+        case PTLLND_MSG_TYPE_PUT:
+        case PTLLND_MSG_TYPE_GET:
+                rc = lnet_parse(ni, &msg->ptlm_u.req.kptlrm_hdr,
+                                msg->ptlm_srcnid, &rx);
+                if (rc < 0)
+                        ptllnd_rx_done(&rx);
+                break;
+
+        case PTLLND_MSG_TYPE_IMMEDIATE:
+                rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,
+                                msg->ptlm_srcnid, &rx);
+                if (rc < 0)
+                        ptllnd_rx_done(&rx);
+                break;
+        }
+
+        ptllnd_peer_decref(plp);
+}
+
+void
+ptllnd_buf_event (lnet_ni_t *ni, ptl_event_t *event)
+{
+        ptllnd_buffer_t *buf = ptllnd_eventarg2obj(event->md.user_ptr);
+        char            *msg = &buf->plb_buffer[event->offset];
+        int              repost;
+
+        LASSERT (buf->plb_ni == ni);
+        LASSERT (event->type == PTL_EVENT_PUT_END ||
+                 event->type == PTL_EVENT_UNLINK);
+
+        if (event->type == PTL_EVENT_PUT_END)
+                ptllnd_parse_request(ni, event->initiator, 
+                                     (kptl_msg_t *)msg, event->mlength);
+
+#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
+        /* UNLINK event only on explicit unlink */
+        repost = (event->unlinked && event->type != PTL_EVENT_UNLINK);
+#else
+        /* UNLINK event only on implicit unlink */
+        repost = (event->type == PTL_EVENT_UNLINK);
+#endif
+        if (repost)
+                (void) ptllnd_post_buffer(buf);
+}
+
+void
+ptllnd_tx_event (lnet_ni_t *ni, ptl_event_t *event)
+{
+        ptllnd_ni_t *plni = ni->ni_data;
+        ptllnd_tx_t *tx = ptllnd_eventarg2obj(event->md.user_ptr);
+        int          error = (event->ni_fail_type != PTL_NI_OK);
+        int          isreq;
+        int          isbulk;
+#ifdef LUSTRE_PORTALS_UNLINK_SEMANTICS
+        int          exhausted = event->unlinked;
+#else
+        int          exhausted = (event->type == PTL_EVENT_UNLINK);
+#endif
+
+        LASSERT (!PtlHandleIsEqual(event->md_handle, PTL_INVALID_HANDLE));
+
+        isreq = PtlHandleIsEqual(event->md_handle, tx->tx_reqmdh);
+        if (isreq) {
+                LASSERT (event->md.start == (void *)&tx->tx_msg);
+                if (exhausted)
+                        tx->tx_reqmdh = PTL_INVALID_HANDLE;
+        }
+            
+        isbulk = PtlHandleIsEqual(event->md_handle, tx->tx_bulkmdh);
+        if (isbulk) {
+                void *ptr;
+
+                if ((event->md.options & PTL_MD_IOVEC) == 0)
+                        ptr = tx->tx_iov[0].iov_base;
+                else
+                        ptr = tx->tx_iov;
+                        
+                LASSERT (event->md.start == ptr);
+                if (exhausted)
+                        tx->tx_bulkmdh = PTL_INVALID_HANDLE;
+        }
+
+        LASSERT (!isreq != !isbulk);            /* always one and only 1 match */
+        
+        switch (tx->tx_type) {
+        default:
+                LBUG();
+                
+        case PTLLND_MSG_TYPE_NOOP:
+        case PTLLND_MSG_TYPE_HELLO:
+        case PTLLND_MSG_TYPE_IMMEDIATE:
+                LASSERT (event->type == PTL_EVENT_UNLINK ||
+                         event->type == PTL_EVENT_SEND_END);
+                LASSERT (isreq);
+                break;
+
+        case PTLLND_MSG_TYPE_GET:
+                LASSERT (event->type == PTL_EVENT_UNLINK ||
+                         (isreq && event->type == PTL_EVENT_SEND_END) ||
+                         (isbulk && event->type == PTL_EVENT_REPLY_END));
+
+                if (isbulk && event->type == PTL_EVENT_REPLY_END) {
+                        LASSERT (tx->tx_lnetreplymsg != NULL);
+                        tx->tx_lnetreplymsg->msg_ev.mlength =
+                                event->mlength;
+                }
+                break;
+                
+        case PTLLND_MSG_TYPE_PUT:
+                LASSERT (event->type == PTL_EVENT_UNLINK ||
+                         (isreq && event->type == PTL_EVENT_SEND_END) ||
+                         (isbulk && event->type == PTL_EVENT_GET_END));
+                break;
+                
+        case PTLLND_RDMA_READ:
+                LASSERT (event->type == PTL_EVENT_UNLINK ||
+                         event->type == PTL_EVENT_SEND_END ||
+                         event->type == PTL_EVENT_REPLY_END);
+                LASSERT (isbulk);
+                break;
+                
+        case PTLLND_RDMA_WRITE:
+                LASSERT (event->type == PTL_EVENT_UNLINK ||
+                         event->type == PTL_EVENT_SEND_END);
+                LASSERT (isbulk);
+        }
+
+        /* Schedule ptllnd_tx_done() on error last completion event */
+        if (error ||
+            (PtlHandleIsEqual(tx->tx_bulkmdh, PTL_INVALID_HANDLE) &&
+             PtlHandleIsEqual(tx->tx_reqmdh, PTL_INVALID_HANDLE))) {
+                if (error)
+                        tx->tx_status = -EIO;
+                list_del(&tx->tx_list);
+                list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
+        }
+}
+
+void
+ptllnd_wait (lnet_ni_t *ni, int milliseconds)
+{
+        ptllnd_ni_t   *plni = ni->ni_data;
+        ptllnd_tx_t   *tx;
+        ptl_event_t    event;
+        int            which;
+        int            rc;
+        int            blocked = 0;
+        int            found = 0;
+        int            timeout = 0;
+
+        /* Handle any currently queued events, returning immediately if any.
+         * Otherwise block for the timeout and handle all events queued
+         * then. */
+
+        for (;;) {
+                rc = PtlEQPoll(&plni->plni_eqh, 1, timeout, &event, &which);
+                timeout = 0;
+
+                if (rc == PTL_EQ_EMPTY) {
+                        if (found ||            /* handled some events */
+                            milliseconds == 0 || /* just checking */
+                            blocked)            /* blocked already */
+                                break;
+
+                        blocked = 1;
+                        timeout = milliseconds;
+                        continue;
+                }
+
+                LASSERT (rc == PTL_OK || rc == PTL_EQ_DROPPED);
+                
+                if (rc == PTL_EQ_DROPPED)
+                        CERROR("Event queue: size %d is too small\n",
+                               plni->plni_eq_size);
+
+                found = 1;
+                switch (ptllnd_eventarg2type(event.md.user_ptr)) {
+                default:
+                        LBUG();
+                                
+                case PTLLND_EVENTARG_TYPE_TX:
+                        ptllnd_tx_event(ni, &event);
+                        break;
+                                
+                case PTLLND_EVENTARG_TYPE_BUF:
+                        ptllnd_buf_event(ni, &event);
+                        break;
+                }
+        }
+
+        while (!list_empty(&plni->plni_zombie_txs)) {
+                tx = list_entry(plni->plni_zombie_txs.next,
+                                ptllnd_tx_t, tx_list);
+
+                ptllnd_tx_done(tx);
+        }
 }