Whamcloud - gitweb
LU-1346 libcfs: cleanup libcfs primitive (linux-prim.h)
[fs/lustre-release.git] / lnet / klnds / ptllnd / ptllnd.h
old mode 100755 (executable)
new mode 100644 (file)
index bf6127c..203fe06
@@ -1,6 +1,4 @@
-/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
- * vim:expandtab:shiftwidth=8:tabstop=8:
- *
+/*
  * GPL HEADER START
  *
  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  * GPL HEADER END
  */
 /*
- * Copyright  2008 Sun Microsystems, Inc. All rights reserved
+ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
  * Author: PJ Kirner <pjkirner@clusterfs.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-#ifndef AUTOCONF_INCLUDED
-#include <linux/config.h>
-#endif
 #include <linux/module.h>
 #include <linux/kernel.h>
 #include <linux/mm.h>
 #include <linux/string.h>
 #include <linux/stat.h>
 #include <linux/errno.h>
-#include <linux/smp_lock.h>
 #include <linux/unistd.h>
 #include <linux/uio.h>
 
-#include <asm/system.h>
 #include <asm/uaccess.h>
 #include <asm/io.h>
 
 #include <libcfs/libcfs.h>
 #include <lnet/lnet.h>
 #include <lnet/lib-lnet.h>
+#include <lnet/lnet-sysctl.h>
 #include <portals/p30.h>
-#ifdef CRAY_XT3
-#include <portals/ptltrace.h>
-#endif
 #include <lnet/ptllnd.h>        /* Depends on portals/p30.h */
 
 /*
 # define PTLLND_N_SCHED         1                   /* # schedulers */
 #endif
 
-#define PTLLND_CREDIT_HIGHWATER ((*kptllnd_tunables.kptl_peercredits)-1)
+#define PTLLND_CREDIT_HIGHWATER ((*kptllnd_tunables.kptl_peertxcredits)-1)
   /* when eagerly to return credits */
 
 typedef struct
 {
-        int             *kptl_ntx;              /* # tx descs to pre-allocate */
-        int             *kptl_max_nodes;        /* max # nodes all talking to me */
-        int             *kptl_max_procs_per_node; /* max # processes per node */
-        int             *kptl_checksum;         /* checksum kptl_msg_t? */
-        int             *kptl_timeout;          /* comms timeout (seconds) */
-        int             *kptl_portal;           /* portal number */
-        int             *kptl_pid;              /* portals PID (self + kernel peers) */
-        int             *kptl_rxb_npages;       /* number of pages for rx buffer */
-        int             *kptl_rxb_nspare;       /* number of spare rx buffers */
-        int             *kptl_credits;          /* number of credits */
-        int             *kptl_peercredits;      /* number of credits */
-        int             *kptl_max_msg_size;     /* max immd message size*/
-        int             *kptl_peer_hash_table_size; /* # slots in peer hash table */
-        int             *kptl_reschedule_loops; /* scheduler yield loops */
-        int             *kptl_ack_puts;         /* make portals ack PUTs */
-#ifdef CRAY_XT3
-        int             *kptl_ptltrace_on_timeout; /* dump pltrace on timeout? */
-        char           **kptl_ptltrace_basename;  /* ptltrace dump file basename */
-#endif
+       int             *kptl_ntx;              /* # tx descs to pre-allocate */
+       int             *kptl_max_nodes;        /* max # nodes all talking to me */
+       int             *kptl_max_procs_per_node; /* max # processes per node */
+       int             *kptl_checksum;         /* checksum kptl_msg_t? */
+       int             *kptl_timeout;          /* comms timeout (seconds) */
+       int             *kptl_portal;           /* portal number */
+       int             *kptl_pid;              /* portals PID (self + kernel peers) */
+       int             *kptl_rxb_npages;       /* number of pages for rx buffer */
+       int             *kptl_rxb_nspare;       /* number of spare rx buffers */
+       int             *kptl_credits;          /* number of credits */
+       int             *kptl_peertxcredits;    /* number of peer tx credits */
+       int             *kptl_peerrtrcredits;   /* number of peer router credits */
+       int             *kptl_max_msg_size;     /* max immd message size*/
+       int             *kptl_peer_hash_table_size; /* # slots in peer hash table */
+       int             *kptl_reschedule_loops; /* scheduler yield loops */
+       int             *kptl_ack_puts;         /* make portals ack PUTs */
 #ifdef PJK_DEBUGGING
-        int             *kptl_simulation_bitmap;/* simulation bitmap */
+       int             *kptl_simulation_bitmap;/* simulation bitmap */
 #endif
 
 #if defined(CONFIG_SYSCTL) && !CFS_SYSFS_MODULE_PARM
-        cfs_sysctl_table_header_t *kptl_sysctl; /* sysctl interface */
+       struct ctl_table_header *kptl_sysctl; /* sysctl interface */
 #endif
 } kptl_tunables_t;
 
@@ -132,6 +119,7 @@ typedef struct
 /***********************************************************************/
 
 typedef struct kptl_data kptl_data_t;
+typedef struct kptl_net kptl_net_t;
 typedef struct kptl_rx_buffer kptl_rx_buffer_t;
 typedef struct kptl_peer kptl_peer_t;
 
@@ -145,15 +133,12 @@ typedef struct {
 
 typedef struct kptl_rx                          /* receive message */
 {
-        struct list_head        rx_list;        /* queue for attention */
+        cfs_list_t              rx_list;        /* queue for attention */
         kptl_rx_buffer_t       *rx_rxb;         /* the rx buffer pointer */
         kptl_msg_t             *rx_msg;         /* received message */
         int                     rx_nob;         /* received message size */
         unsigned long           rx_treceived;   /* time received */
         ptl_process_id_t        rx_initiator;   /* sender's address */
-#ifdef CRAY_XT3
-        ptl_uid_t               rx_uid;         /* sender's uid */
-#endif
         kptl_peer_t            *rx_peer;        /* pointer to peer */
         char                    rx_space[0];    /* copy of incoming request */
 } kptl_rx_t;
@@ -164,8 +149,8 @@ typedef struct kptl_rx                          /* receive message */
 
 typedef struct kptl_rx_buffer_pool
 {
-        spinlock_t              rxbp_lock;
-        struct list_head        rxbp_list;      /* all allocated buffers */
+       spinlock_t              rxbp_lock;
+        cfs_list_t              rxbp_list;      /* all allocated buffers */
         int                     rxbp_count;     /* # allocated buffers */
         int                     rxbp_reserved;  /* # requests to buffer */
         int                     rxbp_shutdown;  /* shutdown flag */
@@ -173,15 +158,15 @@ typedef struct kptl_rx_buffer_pool
 
 struct kptl_rx_buffer
 {
-        kptl_rx_buffer_pool_t  *rxb_pool;
-        struct list_head        rxb_list;       /* for the rxb_pool list */
-        struct list_head        rxb_repost_list;/* for the kptl_sched_rxbq list */
-        int                     rxb_posted:1;   /* on the net */
-        int                     rxb_idle:1;     /* all done */
-        kptl_eventarg_t         rxb_eventarg;   /* event->md.user_ptr */
-        int                     rxb_refcount;   /* reference count */
-        ptl_handle_md_t         rxb_mdh;        /* the portals memory descriptor (MD) handle */
-        char                   *rxb_buffer;     /* the buffer */
+        kptl_rx_buffer_pool_t *rxb_pool;
+        cfs_list_t             rxb_list;       /* for the rxb_pool list */
+        cfs_list_t             rxb_repost_list;/* for the kptl_sched_rxbq list */
+        int                    rxb_posted:1;   /* on the net */
+        int                    rxb_idle:1;     /* all done */
+        kptl_eventarg_t        rxb_eventarg;   /* event->md.user_ptr */
+        int                    rxb_refcount;   /* reference count */
+        ptl_handle_md_t        rxb_mdh;        /* the portals memory descriptor (MD) handle */
+        char                  *rxb_buffer;     /* the buffer */
 
 };
 
@@ -206,8 +191,8 @@ typedef union {
 
 typedef struct kptl_tx                           /* transmit message */
 {
-        struct list_head        tx_list;      /* queue on idle_txs etc */
-        atomic_t                tx_refcount;  /* reference count*/
+        cfs_list_t              tx_list;      /* queue on idle_txs etc */
+        cfs_atomic_t            tx_refcount;  /* reference count*/
         enum kptl_tx_type       tx_type;      /* small msg/{put,get}{req,resp} */
         int                     tx_active:1;  /* queued on the peer */
         int                     tx_idle:1;    /* on the free list */
@@ -239,13 +224,13 @@ enum kptllnd_peer_state
 
 struct kptl_peer
 {
-        struct list_head        peer_list;
-        atomic_t                peer_refcount;          /* The current refrences */
+        cfs_list_t              peer_list;
+        cfs_atomic_t            peer_refcount;          /* The current references */
         enum kptllnd_peer_state peer_state;
-        spinlock_t              peer_lock;              /* serialize */
-        struct list_head        peer_noops;             /* PTLLND_MSG_TYPE_NOOP txs */
-        struct list_head        peer_sendq;             /* txs waiting for mh handles */
-        struct list_head        peer_activeq;           /* txs awaiting completion */
+       spinlock_t              peer_lock;              /* serialize */
+        cfs_list_t              peer_noops;             /* PTLLND_MSG_TYPE_NOOP txs */
+        cfs_list_t              peer_sendq;             /* txs waiting for mh handles */
+        cfs_list_t              peer_activeq;           /* txs awaiting completion */
         lnet_process_id_t       peer_id;                /* Peer's LNET id */
         ptl_process_id_t        peer_ptlid;             /* Peer's portals id */
         __u64                   peer_incarnation;       /* peer's incarnation */
@@ -267,39 +252,49 @@ struct kptl_data
 {
         int                     kptl_init;             /* initialisation state */
         volatile int            kptl_shutdown;         /* shut down? */
-        atomic_t                kptl_nthreads;         /* # live threads */
-        lnet_ni_t              *kptl_ni;               /* _the_ LND instance */
+        cfs_atomic_t            kptl_nthreads;         /* # live threads */
         ptl_handle_ni_t         kptl_nih;              /* network inteface handle */
         ptl_process_id_t        kptl_portals_id;       /* Portals ID of interface */
         __u64                   kptl_incarnation;      /* which one am I */
         ptl_handle_eq_t         kptl_eqh;              /* Event Queue (EQ) */
 
-        spinlock_t              kptl_sched_lock;       /* serialise... */
-        wait_queue_head_t       kptl_sched_waitq;      /* schedulers sleep here */
-        struct list_head        kptl_sched_txq;        /* tx requiring attention */
-        struct list_head        kptl_sched_rxq;        /* rx requiring attention */
-        struct list_head        kptl_sched_rxbq;       /* rxb requiring reposting */
+       rwlock_t                kptl_net_rw_lock;       /* serialise... */
+       cfs_list_t              kptl_nets;              /* kptl_net instance*/
+
+       spinlock_t              kptl_sched_lock;        /* serialise... */
+       wait_queue_head_t       kptl_sched_waitq;      /* schedulers sleep here */
+       cfs_list_t              kptl_sched_txq;        /* tx requiring attention */
+       cfs_list_t              kptl_sched_rxq;        /* rx requiring attention */
+       cfs_list_t              kptl_sched_rxbq;       /* rxb requiring reposting */
 
-        wait_queue_head_t       kptl_watchdog_waitq;   /* watchdog sleeps here */
+       wait_queue_head_t       kptl_watchdog_waitq;   /* watchdog sleeps here */
 
-        kptl_rx_buffer_pool_t   kptl_rx_buffer_pool;   /* rx buffer pool */
-        cfs_mem_cache_t*        kptl_rx_cache;         /* rx descripter cache */
+       kptl_rx_buffer_pool_t   kptl_rx_buffer_pool;   /* rx buffer pool */
+       struct kmem_cache       *kptl_rx_cache;         /* rx descripter cache */
 
-        atomic_t                kptl_ntx;              /* # tx descs allocated */
-        spinlock_t              kptl_tx_lock;          /* serialise idle tx list*/
-        struct list_head        kptl_idle_txs;         /* idle tx descriptors */
+        cfs_atomic_t            kptl_ntx;              /* # tx descs allocated */
+       spinlock_t              kptl_tx_lock;        /* serialise idle tx list*/
+       cfs_list_t              kptl_idle_txs;       /* idle tx descriptors */
 
-        rwlock_t                kptl_peer_rw_lock;     /* lock for peer table */
-        struct list_head       *kptl_peers;            /* hash table of all my known peers */
-        struct list_head        kptl_closing_peers;    /* peers being closed */
-        struct list_head        kptl_zombie_peers;     /* peers waiting for refs to drain */
+       rwlock_t                kptl_peer_rw_lock;   /* lock for peer table */
+        cfs_list_t             *kptl_peers;            /* hash table of all my known peers */
+        cfs_list_t              kptl_closing_peers;    /* peers being closed */
+        cfs_list_t              kptl_zombie_peers;     /* peers waiting for refs to drain */
         int                     kptl_peer_hash_size;   /* size of kptl_peers */
         int                     kptl_npeers;           /* # peers extant */
         int                     kptl_n_active_peers;   /* # active peers */
         int                     kptl_expected_peers;   /* # peers I can buffer HELLOs from */
 
         kptl_msg_t             *kptl_nak_msg;          /* common NAK message */
-        spinlock_t              kptl_ptlid2str_lock;   /* serialise str ops */
+       spinlock_t              kptl_ptlid2str_lock;    /* serialise str ops */
+};
+
+struct kptl_net
+{
+        cfs_list_t        net_list;      /* chain on kptl_data:: kptl_nets */
+        lnet_ni_t        *net_ni;
+        cfs_atomic_t      net_refcount;  /* # current references */
+        int               net_shutdown;  /* lnd_shutdown called */
 };
 
 enum 
@@ -313,14 +308,12 @@ extern kptl_tunables_t  kptllnd_tunables;
 extern kptl_data_t      kptllnd_data;
 
 static inline lnet_nid_t 
-kptllnd_ptl2lnetnid(ptl_nid_t ptl_nid)
+kptllnd_ptl2lnetnid(lnet_nid_t ni_nid, ptl_nid_t ptl_nid)
 {
 #ifdef _USING_LUSTRE_PORTALS_
-        return LNET_MKNID(LNET_NIDNET(kptllnd_data.kptl_ni->ni_nid), 
-                          LNET_NIDADDR(ptl_nid));
+        return LNET_MKNID(LNET_NIDNET(ni_nid), LNET_NIDADDR(ptl_nid));
 #else
-       return LNET_MKNID(LNET_NIDNET(kptllnd_data.kptl_ni->ni_nid), 
-                          ptl_nid);
+        return LNET_MKNID(LNET_NIDNET(ni_nid), ptl_nid);
 #endif
 }
 
@@ -338,6 +331,7 @@ kptllnd_lnet2ptlnid(lnet_nid_t lnet_nid)
 int  kptllnd_startup(lnet_ni_t *ni);
 void kptllnd_shutdown(lnet_ni_t *ni);
 int  kptllnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg);
+void kptllnd_query (struct lnet_ni *ni, lnet_nid_t nid, cfs_time_t *when);
 int  kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg);
 int  kptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
                   int delayed, unsigned int niov, 
@@ -363,11 +357,11 @@ kptllnd_eventarg2obj (kptl_eventarg_t *eva)
         default:
                 LBUG();
         case PTLLND_EVENTARG_TYPE_BUF:
-                return list_entry(eva, kptl_rx_buffer_t, rxb_eventarg);
+                return cfs_list_entry(eva, kptl_rx_buffer_t, rxb_eventarg);
         case PTLLND_EVENTARG_TYPE_RDMA:
-                return list_entry(eva, kptl_tx_t, tx_rdma_eventarg);
+                return cfs_list_entry(eva, kptl_tx_t, tx_rdma_eventarg);
         case PTLLND_EVENTARG_TYPE_MSG:
-                return list_entry(eva, kptl_tx_t, tx_msg_eventarg);
+                return cfs_list_entry(eva, kptl_tx_t, tx_msg_eventarg);
         }
 }
 
@@ -390,25 +384,25 @@ kptllnd_rx_buffer_size(void)
 static inline void
 kptllnd_rx_buffer_addref(kptl_rx_buffer_t *rxb)
 {
-        unsigned long flags;
-        
-        spin_lock_irqsave(&rxb->rxb_pool->rxbp_lock, flags);
-        rxb->rxb_refcount++;
-        spin_unlock_irqrestore(&rxb->rxb_pool->rxbp_lock, flags);
+       unsigned long flags;
+
+       spin_lock_irqsave(&rxb->rxb_pool->rxbp_lock, flags);
+       rxb->rxb_refcount++;
+       spin_unlock_irqrestore(&rxb->rxb_pool->rxbp_lock, flags);
 }
 
 static inline void
 kptllnd_rx_buffer_decref_locked(kptl_rx_buffer_t *rxb)
 {
-        if (--(rxb->rxb_refcount) == 0) {
-                spin_lock(&kptllnd_data.kptl_sched_lock);
-        
-                list_add_tail(&rxb->rxb_repost_list,
-                              &kptllnd_data.kptl_sched_rxbq);
-                wake_up(&kptllnd_data.kptl_sched_waitq);
-
-                spin_unlock(&kptllnd_data.kptl_sched_lock);
-        }
+       if (--(rxb->rxb_refcount) == 0) {
+               spin_lock(&kptllnd_data.kptl_sched_lock);
+
+               cfs_list_add_tail(&rxb->rxb_repost_list,
+                                 &kptllnd_data.kptl_sched_rxbq);
+               wake_up(&kptllnd_data.kptl_sched_waitq);
+
+               spin_unlock(&kptllnd_data.kptl_sched_lock);
+       }
 }
 
 static inline void
@@ -416,10 +410,10 @@ kptllnd_rx_buffer_decref(kptl_rx_buffer_t *rxb)
 {
         unsigned long flags;
         int           count;
-        
-        spin_lock_irqsave(&rxb->rxb_pool->rxbp_lock, flags);
-        count = --(rxb->rxb_refcount);
-        spin_unlock_irqrestore(&rxb->rxb_pool->rxbp_lock, flags);
+
+       spin_lock_irqsave(&rxb->rxb_pool->rxbp_lock, flags);
+       count = --(rxb->rxb_refcount);
+       spin_unlock_irqrestore(&rxb->rxb_pool->rxbp_lock, flags);
 
         if (count == 0)
                 kptllnd_rx_buffer_post(rxb);
@@ -450,8 +444,10 @@ int  kptllnd_peer_connect(kptl_tx_t *tx, lnet_nid_t nid);
 void kptllnd_peer_check_sends(kptl_peer_t *peer);
 void kptllnd_peer_check_bucket(int idx, int stamp);
 void kptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag);
-int  kptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target);
-kptl_peer_t *kptllnd_peer_handle_hello(ptl_process_id_t initiator,
+int  kptllnd_find_target(kptl_net_t *net, lnet_process_id_t target,
+                         kptl_peer_t **peerp);
+kptl_peer_t *kptllnd_peer_handle_hello(kptl_net_t *net,
+                                       ptl_process_id_t initiator,
                                        kptl_msg_t *msg);
 kptl_peer_t *kptllnd_id2peer_locked(lnet_process_id_t id);
 void kptllnd_peer_alive(kptl_peer_t *peer);
@@ -459,29 +455,45 @@ void kptllnd_peer_alive(kptl_peer_t *peer);
 static inline void
 kptllnd_peer_addref (kptl_peer_t *peer)
 {
-        atomic_inc(&peer->peer_refcount);
+        cfs_atomic_inc(&peer->peer_refcount);
 }
 
 static inline void
 kptllnd_peer_decref (kptl_peer_t *peer)
 {
-        if (atomic_dec_and_test(&peer->peer_refcount))
+        if (cfs_atomic_dec_and_test(&peer->peer_refcount))
                 kptllnd_peer_destroy(peer);
 }
 
 static inline void
-kptllnd_set_tx_peer(kptl_tx_t *tx, kptl_peer_t *peer) 
+kptllnd_net_addref (kptl_net_t *net)
+{
+        LASSERT (cfs_atomic_read(&net->net_refcount) > 0);
+        cfs_atomic_inc(&net->net_refcount);
+}
+
+static inline void
+kptllnd_net_decref (kptl_net_t *net)
+{
+        LASSERT (cfs_atomic_read(&net->net_refcount) > 0);
+        cfs_atomic_dec(&net->net_refcount);
+}
+
+static inline void
+kptllnd_set_tx_peer(kptl_tx_t *tx, kptl_peer_t *peer)
 {
         LASSERT (tx->tx_peer == NULL);
-        
+
         kptllnd_peer_addref(peer);
         tx->tx_peer = peer;
 }
 
-static inline struct list_head *
+static inline cfs_list_t *
 kptllnd_nid2peerlist(lnet_nid_t nid)
 {
-        unsigned int hash = ((unsigned int)nid) %
+        /* Only one copy of peer state for all logical peers, so the net part
+         * of NIDs is ignored; e.g. A@ptl0 and A@ptl2 share peer state */
+        unsigned int hash = ((unsigned int)LNET_NIDADDR(nid)) %
                             kptllnd_data.kptl_peer_hash_size;
 
         return &kptllnd_data.kptl_peers[hash];
@@ -493,9 +505,9 @@ kptllnd_id2peer(lnet_process_id_t id)
         kptl_peer_t   *peer;
         unsigned long  flags;
 
-        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
-        peer = kptllnd_id2peer_locked(id);
-        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
+       read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
+       peer = kptllnd_id2peer_locked(id);
+       read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);
 
         return peer;
 }
@@ -510,14 +522,14 @@ kptllnd_reserve_buffers(int n)
 static inline int
 kptllnd_peer_reserve_buffers(void)
 {
-        return kptllnd_reserve_buffers(*kptllnd_tunables.kptl_peercredits);
+        return kptllnd_reserve_buffers(*kptllnd_tunables.kptl_peertxcredits);
 }
 
 static inline void
 kptllnd_peer_unreserve_buffers(void)
 {
         kptllnd_rx_buffer_pool_unreserve(&kptllnd_data.kptl_rx_buffer_pool,
-                                         *kptllnd_tunables.kptl_peercredits);
+                                         *kptllnd_tunables.kptl_peertxcredits);
 }
 
 /*
@@ -526,6 +538,9 @@ kptllnd_peer_unreserve_buffers(void)
 int  kptllnd_setup_tx_descs(void);
 void kptllnd_cleanup_tx_descs(void);
 void kptllnd_tx_fini(kptl_tx_t *tx);
+void kptllnd_cancel_txlist(cfs_list_t *peerq, cfs_list_t *txs);
+void kptllnd_restart_txs(kptl_net_t *net, lnet_process_id_t id,
+                         cfs_list_t *restarts);
 kptl_tx_t *kptllnd_get_idle_tx(enum kptl_tx_type purpose);
 void kptllnd_tx_callback(ptl_event_t *ev);
 const char *kptllnd_tx_typestr(int type);
@@ -533,22 +548,23 @@ const char *kptllnd_tx_typestr(int type);
 static inline void
 kptllnd_tx_addref(kptl_tx_t *tx)
 {
-        atomic_inc(&tx->tx_refcount);
+        cfs_atomic_inc(&tx->tx_refcount);
 }
 
-static inline void 
+static inline void
 kptllnd_tx_decref(kptl_tx_t *tx)
 {
-        LASSERT (!in_interrupt());        /* Thread context only */
+       LASSERT (!in_interrupt());        /* Thread context only */
 
-        if (atomic_dec_and_test(&tx->tx_refcount))
-                kptllnd_tx_fini(tx);
+       if (cfs_atomic_dec_and_test(&tx->tx_refcount))
+               kptllnd_tx_fini(tx);
 }
 
 /*
  * MESSAGE SUPPORT FUNCTIONS
  */
-void kptllnd_init_msg(kptl_msg_t *msg, int type, int body_nob);
+void kptllnd_init_msg(kptl_msg_t *msg, int type,
+                      lnet_process_id_t target, int body_nob);
 void kptllnd_msg_pack(kptl_msg_t *msg, kptl_peer_t *peer);
 int  kptllnd_msg_unpack(kptl_msg_t *msg, int nob);