Whamcloud - gitweb
Update b_release_1_4_6 from b_hd_newconfig
author adilger <adilger>
Tue, 10 Jan 2006 12:48:27 +0000 (12:48 +0000)
committer adilger <adilger>
Tue, 10 Jan 2006 12:48:27 +0000 (12:48 +0000)
- add timestamps for liblustre debug messages
- use wait_event_interruptible_exclusive() in the LNDs so we don't wake
  too many threads and cause lock contention when waiting for events
- fix tx credits handling
- fix incorrect error for multiple networks and portals_compatible option
- add 2.4 compatibility macro that maps wait_event_interruptible_exclusive()
  to wait_event_interruptible()

33 files changed:
lnet/include/libcfs/libcfs.h
lnet/include/libcfs/linux/portals_compat25.h
lnet/include/libcfs/list.h
lnet/include/lnet/lib-lnet.h
lnet/include/lnet/lib-types.h
lnet/klnds/gmlnd/gmlnd_cb.c
lnet/klnds/gmlnd/gmlnd_comm.c
lnet/klnds/iiblnd/iiblnd.h
lnet/klnds/iiblnd/iiblnd_cb.c
lnet/klnds/openiblnd/openiblnd.h
lnet/klnds/openiblnd/openiblnd_cb.c
lnet/klnds/ptllnd/ptllnd.c
lnet/klnds/ptllnd/ptllnd_cb.c
lnet/klnds/ptllnd/ptllnd_rx_buf.c
lnet/klnds/ptllnd/ptllnd_tx.c
lnet/klnds/qswlnd/qswlnd_cb.c
lnet/klnds/ralnd/ralnd_cb.c
lnet/klnds/socklnd/socklnd_cb.c
lnet/klnds/viblnd/viblnd.h
lnet/klnds/viblnd/viblnd_cb.c
lnet/libcfs/debug.c
lnet/libcfs/linux/linux-debug.c
lnet/libcfs/nidstrings.c
lnet/lnet/api-ni.c
lnet/lnet/config.c
lnet/lnet/lib-move.c
lnet/lnet/lib-msg.c
lnet/lnet/lo.c
lnet/ulnds/ptllnd/ptllnd.h
lnet/ulnds/ptllnd/ptllnd_cb.c
lnet/ulnds/socklnd/tcplnd.c
lnet/utils/Makefile.am
lnet/utils/routerstat.c

index bd49c4a..898af0b 100644 (file)
@@ -188,9 +188,10 @@ do {                                                                    \
 do {                                                                    \
         if (((mask) & (D_ERROR | D_EMERG | D_WARNING | D_CONSOLE)) ||   \
             (libcfs_debug & (mask) &&                                   \
-             libcfs_subsystem_debug & DEBUG_SUBSYSTEM))                 \
-                fprintf(stderr, "(%s:%d:%s()) " format,                 \
-                        __FILE__, __LINE__, __FUNCTION__, ## a);        \
+             libcfs_subsystem_debug & DEBUG_SUBSYSTEM)) {               \
+                libcfs_debug_msg(DEBUG_SUBSYSTEM, mask, __FILE__,       \
+                                __FUNCTION__, __LINE__, 0, format, ## a);\
+        }                                                               \
 } while (0)
 #define CDEBUG_LIMIT CDEBUG
 
index 31658d5..4c5d748 100644 (file)
@@ -44,6 +44,8 @@
 # define RECALC_SIGPENDING         recalc_sigpending()
 # define CLEAR_SIGPENDING          (current->sigpending = 0)
 # define CURRENT_SECONDS           CURRENT_TIME
+# define wait_event_interruptible_exclusive(wq, condition)              \
+        wait_event_interruptible(wq, condition)
 
 #else /* 2.4.x */
 
@@ -56,6 +58,8 @@
 # define RECALC_SIGPENDING         recalc_sigpending(current)
 # define CLEAR_SIGPENDING          (current->sigpending = 0)
 # define CURRENT_SECONDS           CURRENT_TIME
+# define wait_event_interruptible_exclusive(wq, condition)              \
+        wait_event_interruptible(wq, condition)
 
 #endif
 
index badbd69..7367d14 100644 (file)
@@ -124,6 +124,8 @@ static inline void list_del_init(struct list_head *entry)
  * list_move - delete from one list and add as another's head
  * @list: the entry to move
  * @head: the head that will precede our entry
+ *
+ * This is not safe to use if @list is already on the same list as @head.
  */
 static inline void list_move(struct list_head *list, struct list_head *head)
 {
@@ -135,6 +137,8 @@ static inline void list_move(struct list_head *list, struct list_head *head)
  * list_move_tail - delete from one list and add as another's tail
  * @list: the entry to move
  * @head: the head that will follow our entry
+ *
+ * This is not safe to use if @list is already on the same list as @head.
  */
 static inline void list_move_tail(struct list_head *list,
                                  struct list_head *head)
@@ -223,6 +227,7 @@ static inline void list_splice_init(struct list_head *list,
 
 #define hlist_head     list_head
 #define hlist_node     list_head
+#define hlist_del_init list_del_init
 
 #endif /* __linux__*/
 
index 3f4e93c..0d93071 100644 (file)
@@ -519,8 +519,8 @@ void lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
                     unsigned int offset, unsigned int len);
 int lnet_send(lnet_nid_t nid, lnet_msg_t *msg);
 void lnet_return_credits_locked (lnet_msg_t *msg);
-int lnet_parse (lnet_ni_t *ni, lnet_hdr_t *hdr, 
-                lnet_nid_t fromnid, void *private);
+int lnet_parse (lnet_ni_t *ni, lnet_hdr_t *hdr,
+                lnet_nid_t fromnid, void *private, int rdma_req);
 void lnet_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
                unsigned int offset, unsigned int mlen, unsigned int rlen);
 lnet_msg_t *lnet_create_reply_msg (lnet_ni_t *ni, lnet_msg_t *get_msg);
index dfbefdd..b17b4f4 100644 (file)
@@ -145,7 +145,6 @@ typedef struct lnet_msg {
         int                 msg_ack:1;          /* ack on finalize (PUT) */
         int                 msg_sending:1;      /* outgoing message */
         int                 msg_receiving:1;    /* being received */
-        int                 msg_recvaftersend:1; /* lnd_recv() outstanding */
         int                 msg_delayed:1;      /* had to Q for buffer or tx credit */
         int                 msg_txcredit:1;     /* taken an NI send credit */
         int                 msg_peertxcredit:1; /* taken a peer send credit */
index 9a46978..679c324 100644 (file)
@@ -121,10 +121,6 @@ gmnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 
                 tx->tx_msgnob += len;
                 tx->tx_large_nob = 0;
-
-                /* We've copied everything... */
-                LASSERT(tx->tx_lntmsg == NULL);
-                lnet_finalize(ni, lntmsg, 0);
         } else {
                 /* stash payload pts to copy later */
                 tx->tx_large_nob = len;
@@ -134,10 +130,10 @@ gmnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                         tx->tx_large_frags.kiov = kiov;
                 else
                         tx->tx_large_frags.iov = iov;
-
-                /* finalize later */
-                tx->tx_lntmsg = lntmsg;
         }
+
+        LASSERT(tx->tx_lntmsg == NULL);
+        tx->tx_lntmsg = lntmsg;
         
         spin_lock(&gmni->gmni_tx_lock);
 
index 217bed9..a5a443a 100644 (file)
@@ -149,8 +149,9 @@ gmnal_tx_done(gmnal_tx_t *tx, int rc)
 {
        gmnal_ni_t *gmni = tx->tx_gmni;
         int         wake_sched = 0;
+        lnet_msg_t *lnetmsg = tx->tx_lntmsg;
         
-        LASSERT(tx->tx_lntmsg == NULL);
+        tx->tx_lntmsg = NULL;
 
         spin_lock(&gmni->gmni_tx_lock);
         
@@ -172,6 +173,10 @@ gmnal_tx_done(gmnal_tx_t *tx, int rc)
                 gmnal_check_txqueues_locked(gmni);
 
         spin_unlock(&gmni->gmni_tx_lock);
+
+        /* Delay finalize until tx is free */
+        if (lnetmsg != NULL)
+                lnet_finalize(gmni->gmni_ni, lnetmsg, 0);
 }
 
 void 
@@ -288,15 +293,9 @@ gmnal_check_txqueues_locked (gmnal_ni_t *gmni)
 
                         tx->tx_msgnob += tx->tx_large_nob;
 
-                        /* We've copied everything... */
-                        lnet_finalize(gmni->gmni_ni, tx->tx_lntmsg, 0);
-                        tx->tx_lntmsg = NULL;
-
                         spin_lock(&gmni->gmni_tx_lock);
                 }
 
-                LASSERT (tx->tx_lntmsg == NULL);
-
                 list_add_tail(&tx->tx_list, &gmni->gmni_cred_txq);
         }
 
@@ -434,7 +433,7 @@ gmnal_rx_thread(void *arg)
                         rc =  lnet_parse(gmni->gmni_ni, 
                                          &msg->gmm_u.immediate.gmim_hdr,
                                          msg->gmm_srcnid,
-                                         rx);
+                                         rx, 0);
                 }
 
                 if (rc < 0)                     /* parse failure */
index 69761c9..edb2865 100644 (file)
@@ -347,7 +347,6 @@ typedef struct kib_rx                           /* receive message */
 {
         struct list_head          rx_list;      /* queue for attention */
         struct kib_conn          *rx_conn;      /* owning conn */
-        int                       rx_responded; /* responded to peer? */
         int                       rx_nob;       /* # bytes received (-1 while posted) */
         __u64                     rx_hca_msg;   /* pre-mapped buffer (hca vaddr) */
         kib_msg_t                *rx_msg;       /* pre-mapped buffer (host vaddr) */
@@ -402,7 +401,6 @@ typedef struct kib_conn
         __u64               ibc_rxseq;          /* rx sequence number */
         atomic_t            ibc_refcount;       /* # users */
         int                 ibc_state;          /* what's happening */
-        atomic_t            ibc_nob;            /* # bytes buffered */
         int                 ibc_nsends_posted;  /* # uncompleted sends */
         int                 ibc_credits;        /* # credits I have */
         int                 ibc_outstanding_credits; /* # credits to return */
index a4b21a2..23a9577 100644 (file)
@@ -56,8 +56,9 @@ hexdump(char *string, void *ptr, int len)
 void
 kibnal_tx_done (kib_tx_t *tx)
 {
-        int        rc = tx->tx_status;
-        int        i;
+        lnet_msg_t *lntmsg[2];
+        int         rc = tx->tx_status;
+        int         i;
 
         LASSERT (!in_interrupt());
         LASSERT (!tx->tx_queued);               /* mustn't be queued for sending */
@@ -67,14 +68,9 @@ kibnal_tx_done (kib_tx_t *tx)
 #if IBNAL_USE_FMR
         /* Handle unmapping if required */
 #endif
-        for (i = 0; i < 2; i++) {
-                /* tx may have up to 2 ptlmsgs to finalise */
-                if (tx->tx_lntmsg[i] == NULL)
-                        continue;
-
-                lnet_finalize (kibnal_data.kib_ni, tx->tx_lntmsg[i], rc);
-                tx->tx_lntmsg[i] = NULL;
-        }
+        /* tx may have up to 2 lnet msgs to finalise */
+        lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
+        lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
         
         if (tx->tx_conn != NULL) {
                 kibnal_conn_decref(tx->tx_conn);
@@ -89,6 +85,14 @@ kibnal_tx_done (kib_tx_t *tx)
         list_add (&tx->tx_list, &kibnal_data.kib_idle_txs);
 
         spin_unlock(&kibnal_data.kib_tx_lock);
+
+        /* delay finalize until my descs have been freed */
+        for (i = 0; i < 2; i++) {
+                if (lntmsg[i] == NULL)
+                        continue;
+
+                lnet_finalize (kibnal_data.kib_ni, lntmsg[i], rc);
+        }
 }
 
 kib_tx_t *
@@ -318,9 +322,6 @@ kibnal_handle_rx (kib_rx_t *rx)
                 kibnal_check_sends(conn);
         }
 
-        /* clear flag so GET_REQ can see if it caused a REPLY */
-        rx->rx_responded = 0;
-
         switch (msg->ibm_type) {
         default:
                 CERROR("Bad IBNAL message type %x from %s\n",
@@ -333,13 +334,13 @@ kibnal_handle_rx (kib_rx_t *rx)
 
         case IBNAL_MSG_IMMEDIATE:
                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.immediate.ibim_hdr,
-                                msg->ibm_srcnid, rx);
+                                msg->ibm_srcnid, rx, 0);
                 repost = rc < 0;                /* repost on error */
                 break;
                 
         case IBNAL_MSG_PUT_REQ:
                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.putreq.ibprm_hdr,
-                                msg->ibm_srcnid, rx);
+                                msg->ibm_srcnid, rx, 1);
                 repost = rc < 0;                /* repost on error */
                 break;
 
@@ -396,7 +397,7 @@ kibnal_handle_rx (kib_rx_t *rx)
 
         case IBNAL_MSG_GET_REQ:
                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.get.ibgm_hdr,
-                                msg->ibm_srcnid, rx);
+                                msg->ibm_srcnid, rx, 1);
                 repost = rc < 0;                /* repost on error */
                 break;
 
@@ -1407,73 +1408,7 @@ kibnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 kibnal_launch_tx(tx, target.nid);
                 return 0;
 
-        case LNET_MSG_REPLY: {
-                /* reply's 'private' is the incoming receive */
-                kib_rx_t *rx = private;
-
-                LASSERT(routing || rx != NULL);
-
-                if (!routing && rx->rx_msg->ibm_type != IBNAL_MSG_IMMEDIATE) {
-                        /* Incoming message consistent with RDMA? */
-                        if (rx->rx_msg->ibm_type != IBNAL_MSG_GET_REQ) {
-                                CERROR("REPLY to %s bad msg type %x!!!\n",
-                                       libcfs_nid2str(target.nid), 
-                                       rx->rx_msg->ibm_type);
-                                return -EIO;
-                        }
-
-                        /* NB handle_rx() will send GET_NAK when I return to
-                         * it from here, unless I set rx_responded! */
-
-                        tx = kibnal_get_idle_tx();
-                        if (tx == NULL) {
-                                CERROR("Can't get tx for REPLY to %s\n",
-                                       libcfs_nid2str(target.nid));
-                                return -ENOMEM;
-                        }
-
-                        if (payload_nob == 0)
-                                rc = 0;
-                        else if (payload_kiov == NULL)
-                                rc = kibnal_setup_rd_iov(
-                                        tx, tx->tx_rd, 1, 
-                                        payload_niov, payload_iov, 
-                                        payload_offset, payload_nob);
-                        else
-                                rc = kibnal_setup_rd_kiov(
-                                        tx, tx->tx_rd, 1,
-                                        payload_niov, payload_kiov,
-                                        payload_offset, payload_nob);
-                        if (rc != 0) {
-                                CERROR("Can't setup GET src for %s: %d\n",
-                                       libcfs_nid2str(target.nid), rc);
-                                kibnal_tx_done(tx);
-                                return -EIO;
-                        }
-                
-                        rc = kibnal_init_rdma(tx, IBNAL_MSG_GET_DONE, 
-                                              payload_nob,
-                                              &rx->rx_msg->ibm_u.get.ibgm_rd,
-                                              rx->rx_msg->ibm_u.get.ibgm_cookie);
-                        if (rc < 0) {
-                                CERROR("Can't setup rdma for GET from %s: %d\n", 
-                                       libcfs_nid2str(target.nid), rc);
-                        } else if (rc == 0) {
-                                /* No RDMA: local completion may happen now! */
-                                lnet_finalize (kibnal_data.kib_ni, lntmsg, 0);
-                        } else {
-                                /* RDMA: lnet_finalize(lntmsg) when it
-                                 * completes */
-                                tx->tx_lntmsg[0] = lntmsg;
-                        }
-
-                        kibnal_queue_tx(tx, rx->rx_conn);
-                        rx->rx_responded = 1;
-                        return (rc >= 0) ? 0 : -EIO;
-                }
-                /* fall through to handle like PUT */
-        }
-
+        case LNET_MSG_REPLY: 
         case LNET_MSG_PUT:
                 /* Is the payload small enough not to need RDMA? */
                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
@@ -1548,6 +1483,67 @@ kibnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
         return 0;
 }
 
+void
+kibnal_reply(lnet_ni_t *ni, kib_rx_t *rx, lnet_msg_t *lntmsg)
+{
+        lnet_process_id_t target = lntmsg->msg_target;
+        unsigned int      niov = lntmsg->msg_niov; 
+        struct iovec     *iov = lntmsg->msg_iov; 
+        lnet_kiov_t      *kiov = lntmsg->msg_kiov;
+        unsigned int      offset = lntmsg->msg_offset;
+        unsigned int      nob = lntmsg->msg_len;
+        kib_tx_t         *tx;
+        int               rc;
+        
+        tx = kibnal_get_idle_tx();
+        if (tx == NULL) {
+                CERROR("Can't get tx for REPLY to %s\n",
+                       libcfs_nid2str(target.nid));
+                goto failed_0;
+        }
+
+        if (nob == 0)
+                rc = 0;
+        else if (kiov == NULL)
+                rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 1, 
+                                         niov, iov, offset, nob);
+        else
+                rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 1, 
+                                          niov, kiov, offset, nob);
+
+        if (rc != 0) {
+                CERROR("Can't setup GET src for %s: %d\n",
+                       libcfs_nid2str(target.nid), rc);
+                goto failed_1;
+        }
+        
+        rc = kibnal_init_rdma(tx, IBNAL_MSG_GET_DONE, nob,
+                              &rx->rx_msg->ibm_u.get.ibgm_rd,
+                              rx->rx_msg->ibm_u.get.ibgm_cookie);
+        if (rc < 0) {
+                CERROR("Can't setup rdma for GET from %s: %d\n", 
+                       libcfs_nid2str(target.nid), rc);
+                goto failed_1;
+        }
+        
+        if (rc == 0) {
+                /* No RDMA: local completion may happen now! */
+                lnet_finalize(ni, lntmsg, 0);
+        } else {
+                /* RDMA: lnet_finalize(lntmsg) when it
+                 * completes */
+                tx->tx_lntmsg[0] = lntmsg;
+        }
+        
+        kibnal_queue_tx(tx, rx->rx_conn);
+        return;
+        
+ failed_1:
+        kibnal_tx_done(tx);
+ failed_0:
+        lnet_finalize(ni, lntmsg, -EIO);
+}
+
 int
 kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
@@ -1650,8 +1646,10 @@ kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
                 break;
 
         case IBNAL_MSG_GET_REQ:
-                LASSERT (lntmsg == NULL);       /* no need to finalise */
-                if (!rx->rx_responded) {
+                if (lntmsg != NULL) {
+                        /* Optimized GET; RDMA lntmsg's payload */
+                        kibnal_reply(ni, rx, lntmsg);
+                } else {
                         /* GET didn't match anything */
                         kibnal_send_completion(rx->rx_conn, IBNAL_MSG_GET_DONE, 
                                                -ENODATA,
@@ -3086,7 +3084,7 @@ kibnal_scheduler(void *arg)
                 /* Nothing to do; sleep... */
 
                 set_current_state(TASK_INTERRUPTIBLE);
-                add_wait_queue(&kibnal_data.kib_sched_waitq, &wait);
+                add_wait_queue_exclusive(&kibnal_data.kib_sched_waitq, &wait);
                 spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                        flags);
 
index b4e6bcb..3b5959a 100644 (file)
@@ -312,7 +312,6 @@ typedef struct kib_rx                           /* receive message */
 {
         struct list_head          rx_list;      /* queue for attention */
         struct kib_conn          *rx_conn;      /* owning conn */
-        int                       rx_rdma;      /* RDMA completion posted? */
         int                       rx_nob;       /* # bytes received (-1 while posted) */
         __u64                     rx_vaddr;     /* pre-mapped buffer (hca vaddr) */
         kib_msg_t                *rx_msg;       /* pre-mapped buffer (host vaddr) */
@@ -363,7 +362,6 @@ typedef struct kib_conn
         __u64               ibc_incarnation;    /* which instance of the peer */
         atomic_t            ibc_refcount;       /* # users */
         int                 ibc_state;          /* what's happening */
-        atomic_t            ibc_nob;            /* # bytes buffered */
         int                 ibc_nsends_posted;  /* # uncompleted sends */
         int                 ibc_credits;        /* # credits I have */
         int                 ibc_outstanding_credits; /* # credits to return */
index afa5ace..0218743 100644 (file)
@@ -43,6 +43,7 @@ kibnal_schedule_tx_done (kib_tx_t *tx)
 void
 kibnal_tx_done (kib_tx_t *tx)
 {
+        lnet_msg_t      *lntmsg[2];
         unsigned long    flags;
         int              i;
         int              rc;
@@ -81,16 +82,11 @@ kibnal_tx_done (kib_tx_t *tx)
 #endif
         }
 
-        for (i = 0; i < 2; i++) {
-                /* tx may have up to 2 ptlmsgs to finalise */
-                if (tx->tx_lntmsg[i] == NULL)
-                        continue;
+        /* tx may have up to 2 ptlmsgs to finalise */
+        lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
+        lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
+        rc = tx->tx_status;
 
-                lnet_finalize (kibnal_data.kib_ni, tx->tx_lntmsg[i], 
-                               tx->tx_status);
-                tx->tx_lntmsg[i] = NULL;
-        }
-        
         if (tx->tx_conn != NULL) {
                 kibnal_put_conn (tx->tx_conn);
                 tx->tx_conn = NULL;
@@ -105,6 +101,14 @@ kibnal_tx_done (kib_tx_t *tx)
         list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);
 
         spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
+
+        /* delay finalize until my descs have been freed */
+        for (i = 0; i < 2; i++) {
+                if (lntmsg[i] == NULL)
+                        continue;
+
+                lnet_finalize (kibnal_data.kib_ni, lntmsg[i], rc);
+        }
 }
 
 kib_tx_t *
@@ -365,23 +369,20 @@ kibnal_rx (kib_rx_t *rx)
         int          rc = 0;
         kib_msg_t   *msg = rx->rx_msg;
 
-        /* Clear flag so I can detect if I've sent an RDMA completion */
-        rx->rx_rdma = 0;
-
         switch (msg->ibm_type) {
         case IBNAL_MSG_GET_RDMA:
                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.rdma.ibrm_hdr,
-                                msg->ibm_srcnid, rx);
+                                msg->ibm_srcnid, rx, 1);
                 break;
                 
         case IBNAL_MSG_PUT_RDMA:
                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.rdma.ibrm_hdr,
-                                msg->ibm_srcnid, rx);
+                                msg->ibm_srcnid, rx, 1);
                 break;
 
         case IBNAL_MSG_IMMEDIATE:
                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.immediate.ibim_hdr,
-                                msg->ibm_srcnid, rx);
+                                msg->ibm_srcnid, rx, 0);
                 break;
 
         default:
@@ -1085,12 +1086,6 @@ kibnal_start_active_rdma (int type, int status,
         LASSERT (type == IBNAL_MSG_GET_DONE ||
                  type == IBNAL_MSG_PUT_DONE);
 
-        /* Flag I'm completing the RDMA.  Even if I fail to send the
-         * completion message, I will have tried my best so further
-         * attempts shouldn't be tried. */
-        LASSERT (!rx->rx_rdma);
-        rx->rx_rdma = 1;
-
         if (type == IBNAL_MSG_GET_DONE) {
                 access   = 0;
                 rdma_op  = IB_OP_RDMA_WRITE;
@@ -1246,31 +1241,7 @@ kibnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                                                  NULL, lntmsg->msg_md->md_iov.kiov,
                                                  lntmsg->msg_md->md_length);
 
-        case LNET_MSG_REPLY: {
-                /* reply's 'private' is the incoming receive */
-                kib_rx_t *rx = private;
-
-                LASSERT (routing || rx != NULL);
-
-                /* RDMA reply expected? */
-                if (!routing && rx->rx_msg->ibm_type != IBNAL_MSG_IMMEDIATE) {
-                        /* Incoming message consistent with RDMA */
-                        if (rx->rx_msg->ibm_type != IBNAL_MSG_GET_RDMA) {
-                                CERROR ("REPLY to %s bad ibm type %d!!!\n",
-                                        libcfs_nid2str(target.nid), 
-                                        rx->rx_msg->ibm_type);
-                                return (-EIO);
-                        }
-
-                        kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0,
-                                                 rx, lntmsg, payload_niov, 
-                                                 payload_iov, payload_kiov,
-                                                 payload_offset, payload_nob);
-                        return (0);
-                }
-                /* Fall through to handle like PUT */
-        }
-
+        case LNET_MSG_REPLY:
         case LNET_MSG_PUT:
                 /* Is the payload small enough not to need RDMA? */
                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
@@ -1365,8 +1336,16 @@ kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
                 break;
 
         case IBNAL_MSG_GET_RDMA:
-                LASSERT (lntmsg == NULL);       /* no need to finalise */
-                if (!rx->rx_rdma) {
+                if (lntmsg != NULL) {
+                        /* GET matched: RDMA lntmsg's payload */
+                        kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0,
+                                                 rx, lntmsg, 
+                                                 lntmsg->msg_niov, 
+                                                 lntmsg->msg_iov, 
+                                                 lntmsg->msg_kiov,
+                                                 lntmsg->msg_offset, 
+                                                 lntmsg->msg_len);
+                } else {
                         /* GET didn't match anything */
                         kibnal_start_active_rdma (IBNAL_MSG_GET_DONE, -ENODATA,
                                                   rx, NULL, 0, NULL, NULL, 0, 0);
@@ -2408,7 +2387,7 @@ kibnal_connd (void *arg)
                         continue;
 
                 set_current_state (TASK_INTERRUPTIBLE);
-                add_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
+                add_wait_queue_exclusive(&kibnal_data.kib_connd_waitq, &wait);
 
                 spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
 
@@ -2480,7 +2459,7 @@ kibnal_scheduler(void *arg)
                         counter = 0;
 
                         if (!did_something) {
-                                rc = wait_event_interruptible(
+                                rc = wait_event_interruptible_exclusive(
                                         kibnal_data.kib_sched_waitq,
                                         !list_empty(&kibnal_data.kib_sched_txq) || 
                                         !list_empty(&kibnal_data.kib_sched_rxq) || 
index 2c641f9..1d5144f 100755 (executable)
@@ -480,8 +480,7 @@ kptllnd_startup (lnet_ni_t *ni)
                 "ptllnd_rx",
                 sizeof(kptl_rx_t) + *kptllnd_tunables.kptl_max_msg_size,
                 0, /* offset */
-                0, /* flags */
-                NULL,NULL); /* CTOR/DTOR */
+                0); /* flags */
         if( kptllnd_data->kptl_rx_cache == NULL ){
                 CERROR("Can't create slab for RX descriptrs\n");
                 goto failed;
index 4762d62..e5093e9 100644 (file)
@@ -412,7 +412,6 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
         kptl_tx_t        *tx = NULL;
         kptl_data_t      *kptllnd_data = ni->ni_data;
         int               nob;
-        int               rc;
 
         PJK_UT_MSG_DATA(">>> SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS\n");
         PJK_UT_MSG_DATA("nob=%d nov=%d offset=%d to %s\n",
@@ -452,8 +451,9 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 LBUG();
                 return -EINVAL;
 
+        case LNET_MSG_REPLY:
         case LNET_MSG_PUT:
-                PJK_UT_MSG_DATA("LNET_MSG_PUT\n");
+                PJK_UT_MSG_DATA("LNET_MSG_PUT/REPLY\n");
 
                 /*
                  * Get an idle tx descriptor
@@ -470,8 +470,10 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 if (nob <= *kptllnd_tunables.kptl_max_msg_size)
                         break;
 
-
-                STAT_UPDATE(kps_send_put);
+                if (type == LNET_MSG_REPLY)
+                        STAT_UPDATE(kps_send_reply);
+                else
+                        STAT_UPDATE(kps_send_put);
 
                 kptllnd_do_put(tx,lntmsg,kptllnd_data);
 
@@ -534,80 +536,6 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 PJK_UT_MSG_DATA("LNET_MSG_ACK\n");
                 LASSERT (payload_nob == 0);
                 break;
-
-        case LNET_MSG_REPLY:
-                PJK_UT_MSG_DATA("LNET_MSG_REPLY\n");
-
-                STAT_UPDATE(kps_send_reply);
-
-                if(routing!=0 || target_is_router!=0)
-                {
-                        /*
-                         * Get an idle tx descriptor
-                         */
-                        tx = kptllnd_get_idle_tx(kptllnd_data,TX_TYPE_LARGE_PUT);
-                        if(tx == NULL){
-                                CERROR ("Can't send %d to %s: tx descs exhausted\n",
-                                        type, libcfs_id2str(target));
-                                return -ENOMEM;
-                        }
-
-                        /* Is the payload small enough not to need RDMA? */
-                        nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]);
-                        if (nob <= *kptllnd_tunables.kptl_max_msg_size)
-                                break;
-
-                        kptllnd_do_put(tx,lntmsg,kptllnd_data);
-
-                        PJK_UT_MSG_DATA("<<< SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS\n");
-                        return 0;
-                }else{
-                        /*
-                         * Reply's private is the incoming rx descriptor
-                         */
-                        kptl_rx_t *rx = private;
-                        LASSERT(rx != NULL);
-
-                        /*
-                         * If the request was to NOT do RDMA
-                         * break out and just send back an IMMEDIATE message
-                         */
-                        if (rx->rx_msg->ptlm_type == PTLLND_MSG_TYPE_IMMEDIATE) {
-                                /* RDMA not expected */
-                                nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]);
-                                if (nob > *kptllnd_tunables.kptl_max_msg_size) {
-                                        CERROR("REPLY for %s too big but RDMA not requested:"
-                                               "%d (max for message is %d)\n",
-                                               libcfs_id2str(target), payload_nob,
-                                               *kptllnd_tunables.kptl_max_msg_size);
-                                        CERROR("Can't REPLY IMMEDIATE %d to %s\n",
-                                               nob, libcfs_id2str(target));
-                                        return -EINVAL;
-                                }
-                                break;
-                        }
-
-
-                        /* Incoming message consistent with RDMA? */
-                        if (rx->rx_msg->ptlm_type != PTLLND_MSG_TYPE_GET) {
-                                CERROR("REPLY to %s bad msg type %x!!!\n",
-                                       libcfs_id2str(target), rx->rx_msg->ptlm_type);
-                                return -EINVAL;
-                        }
-
-                        rc = kptllnd_start_bulk_rdma(
-                                kptllnd_data,
-                                rx,
-                                lntmsg,
-                                PTL_MD_OP_PUT,
-                                payload_niov,
-                                payload_iov,
-                                payload_kiov,
-                                payload_offset,
-                                payload_nob);
-                        PJK_UT_MSG_DATA("<<< SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS rc=%d\n",rc);
-                        return rc;
-                }
         }
 
 
@@ -785,13 +713,25 @@ int kptllnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
 
         case PTLLND_MSG_TYPE_GET:
                 PJK_UT_MSG_DATA("PTLLND_MSG_TYPE_GET\n");
-                /* We get called here just to discard any junk after the
-                 * GET hdr. */
-                LASSERT (lntmsg == NULL); /* What is this all about ???*/
-
-                lnet_finalize (ni, lntmsg, 0);
 
-                rc = 0;
+                if (lntmsg == NULL) {
+                        /* No match for the GET request */
+                        /* XXX should RDMA 0 bytes of payload + hdr data saying GET failed */
+                        rc = 0;
+                } else {
+                        /* GET matched */
+                        rc = kptllnd_start_bulk_rdma(
+                                kptllnd_data,
+                                rx,
+                                lntmsg,
+                                PTL_MD_OP_PUT,
+                                lntmsg->msg_niov,
+                                lntmsg->msg_iov,
+                                lntmsg->msg_kiov,
+                                lntmsg->msg_offset,
+                                lntmsg->msg_len);
+                        PJK_UT_MSG_DATA("<<< SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS rc=%d\n",rc);
+                }
                 break;
 
         case PTLLND_MSG_TYPE_PUT:
@@ -926,7 +866,8 @@ kptllnd_watchdog(void *arg)
 
                 set_current_state (TASK_INTERRUPTIBLE);
                 cfs_waitq_add(&kptllnd_data->kptl_sched_waitq, &waitlink);
-                cfs_waitq_timedwait(&waitlink,cfs_time_seconds(PTLLND_TIMEOUT_SEC));
+                cfs_waitq_timedwait(&waitlink,CFS_TASK_INTERRUPTIBLE,
+                                    cfs_time_seconds(PTLLND_TIMEOUT_SEC));
                 set_current_state (TASK_RUNNING);
                 cfs_waitq_del (&kptllnd_data->kptl_sched_waitq, &waitlink);
 
@@ -1003,8 +944,9 @@ kptllnd_scheduler(void *arg)
                  */
 
                 set_current_state (TASK_INTERRUPTIBLE);
-                cfs_waitq_add(&kptllnd_data->kptl_sched_waitq, &waitlink);
-                cfs_waitq_timedwait(&waitlink,cfs_time_seconds(PTLLND_TIMEOUT_SEC));
+                cfs_waitq_add_exclusive(&kptllnd_data->kptl_sched_waitq, &waitlink);
+                cfs_waitq_timedwait(&waitlink, CFS_TASK_INTERRUPTIBLE,
+                                    cfs_time_seconds(PTLLND_TIMEOUT_SEC));
                 set_current_state (TASK_RUNNING);
                 cfs_waitq_del (&kptllnd_data->kptl_sched_waitq, &waitlink);
 
index 55ecc16..241be98 100644 (file)
@@ -772,7 +772,7 @@ kptllnd_rx_scheduler_handler(kptl_rx_t *rx)
                 rc = lnet_parse(kptllnd_data->kptl_ni,
                         &msg->ptlm_u.immediate.kptlim_hdr,
                         msg->ptlm_srcnid,
-                        rx);
+                        rx, 0);
                 /* RX Completing asynchronously */
                 if( rc >= 0)
                         rx = 0;
@@ -795,7 +795,7 @@ kptllnd_rx_scheduler_handler(kptl_rx_t *rx)
                 rc = lnet_parse(kptllnd_data->kptl_ni,
                         &msg->ptlm_u.req.kptlrm_hdr,
                         msg->ptlm_srcnid,
-                        rx);
+                        rx, 1);
 
                 /* RX Completing asynchronously */
                 if( rc >= 0)
@@ -878,7 +878,7 @@ kptllnd_rx_alloc(
                 return 0;
         }
 
-        rx = cfs_mem_cache_alloc ( kptllnd_data->kptl_rx_cache , CFS_SLAB_ATOMIC);
+        rx = cfs_mem_cache_alloc(kptllnd_data->kptl_rx_cache , CFS_ALLOC_ATOMIC);
         if(rx == 0 ){
                 CERROR("Failed to allocate rx\n");
                 STAT_UPDATE(kps_rx_allocation_failed);
index 17994b6..5c8d8c7 100644 (file)
@@ -197,7 +197,10 @@ exit:
 void
 kptllnd_tx_done (kptl_tx_t *tx)
 {
+        lnet_msg_t  *lnetmsg[2];
+        int          status = tx->tx_status;
         kptl_data_t *kptllnd_data = tx->tx_po.po_kptllnd_data;
+
         LASSERT (!in_interrupt());
 
         PJK_UT_MSG(">>> tx=%p\n",tx);
@@ -208,16 +211,9 @@ kptllnd_tx_done (kptl_tx_t *tx)
         LASSERT(atomic_read(&tx->tx_refcount) == 0);
         LASSERT(list_empty(&tx->tx_schedlist)); /*not any the scheduler list*/
 
-        if(tx->tx_ptlmsg != NULL){
-                PJK_UT_MSG("tx=%p finalize\n",tx);
-                lnet_finalize (kptllnd_data->kptl_ni, tx->tx_ptlmsg, tx->tx_status);
-                tx->tx_ptlmsg = NULL;
-        }
-        if(tx->tx_ptlmsg_reply != NULL){
-                PJK_UT_MSG("tx=%p finalize reply\n",tx);
-                lnet_finalize (kptllnd_data->kptl_ni, tx->tx_ptlmsg_reply, tx->tx_status);
-                tx->tx_ptlmsg_reply = NULL;
-        }
+        /* stash lnet msgs for finalize AFTER I free this tx desc */
+        lnetmsg[0] = tx->tx_ptlmsg; tx->tx_ptlmsg = NULL;
+        lnetmsg[1] = tx->tx_ptlmsg_reply; tx->tx_ptlmsg_reply = NULL;
 
         /*
          * Release the associated RX if there is one
@@ -252,6 +248,12 @@ kptllnd_tx_done (kptl_tx_t *tx)
         list_add (&tx->tx_list, &kptllnd_data->kptl_idle_txs);
         STAT_UPDATE(kps_tx_released);
         spin_unlock(&kptllnd_data->kptl_tx_lock);
+        
+        if (lnetmsg[0] != NULL)
+                lnet_finalize(kptllnd_data->kptl_ni, lnetmsg[0], status);
+
+        if (lnetmsg[1] != NULL)
+                lnet_finalize(kptllnd_data->kptl_ni, lnetmsg[1], status);
 
         PJK_UT_MSG("<<< tx=%p\n",tx);
 }
index 7a49faa..23ddfd8 100644 (file)
@@ -355,6 +355,10 @@ kqswnal_get_idle_tx (void)
 void
 kqswnal_tx_done_in_thread_context (kqswnal_tx_t *ktx)
 {
+        lnet_msg_t    *lnetmsg[2];
+        int            status[2];
+        int            nlnetmsg = 0;
+        
         LASSERT (!in_interrupt());
         
         if (ktx->ktx_status == -EHOSTDOWN)
@@ -364,20 +368,20 @@ kqswnal_tx_done_in_thread_context (kqswnal_tx_t *ktx)
         case KTX_RDMAING:          /* optimized GET/PUT handled */
         case KTX_PUTTING:          /* optimized PUT sent */
         case KTX_SENDING:          /* normal send */
-                lnet_finalize (kqswnal_data.kqn_ni,
-                               (lnet_msg_t *)ktx->ktx_args[1],
-                               ktx->ktx_status);
+                lnetmsg[0] = (lnet_msg_t *)ktx->ktx_args[1];
+                status[0] = ktx->ktx_status;
+                nlnetmsg = 1;
                 break;
 
         case KTX_GETTING:          /* optimized GET sent & REPLY received */
                 /* Complete the GET with success since we can't avoid
                  * delivering a REPLY event; we committed to it when we
                  * launched the GET */
-                lnet_finalize (kqswnal_data.kqn_ni, 
-                               (lnet_msg_t *)ktx->ktx_args[1], 0);
-                lnet_finalize (kqswnal_data.kqn_ni,
-                               (lnet_msg_t *)ktx->ktx_args[2],
-                               ktx->ktx_status);
+                lnetmsg[0] = (lnet_msg_t *)ktx->ktx_args[1];
+                status[0] = 0;
+                lnetmsg[1] = (lnet_msg_t *)ktx->ktx_args[2];
+                status[1] = ktx->ktx_status;
+                nlnetmsg = 2;
                 break;
 
         default:
@@ -385,6 +389,10 @@ kqswnal_tx_done_in_thread_context (kqswnal_tx_t *ktx)
         }
 
         kqswnal_put_idle_tx (ktx);
+
+        while (nlnetmsg-- > 0)
+                lnet_finalize (kqswnal_data.kqn_ni, 
+                               lnetmsg[nlnetmsg], status[nlnetmsg]);
 }
 
 void
@@ -978,20 +986,6 @@ kqswnal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
         /* payload is either all vaddrs or all pages */
         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
 
-        if (type == LNET_MSG_REPLY) {
-                kqswnal_rx_t *rx = (kqswnal_rx_t *)private;
-                
-                LASSERT (routing || rx != NULL);
-                
-                if (!routing && rx->krx_rpc_reply_needed) { /* is it an RPC */
-                        /* Must be a REPLY for an optimized GET */
-                        return kqswnal_rdma (
-                                rx, lntmsg, LNET_MSG_GET,
-                                payload_niov, payload_iov, payload_kiov, 
-                                payload_offset, payload_nob);
-                }
-        }
-
         if (kqswnal_nid2elanid (target.nid) < 0) {
                 CERROR("%s not in my cluster\n", libcfs_nid2str(target.nid));
                 return -EIO;
@@ -1147,8 +1141,12 @@ kqswnal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                target_is_router ? "(router)" : "", rc);
 
         if (rc != 0) {
-                if (ktx->ktx_state == KTX_GETTING &&
-                    ktx->ktx_args[2] != NULL) {
+                lnet_msg_t *repmsg = (lnet_msg_t *)ktx->ktx_args[2];
+                int         state = ktx->ktx_state;
+                
+                kqswnal_put_idle_tx (ktx);
+
+                if (state == KTX_GETTING && repmsg != NULL) {
                         /* We committed to reply, but there was a problem
                          * launching the GET.  We can't avoid delivering a
                          * REPLY event since we committed above, so we
@@ -1156,11 +1154,9 @@ kqswnal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                          * failed. */
                         rc = 0;
                         lnet_finalize (kqswnal_data.kqn_ni, lntmsg, 0);
-                        lnet_finalize (kqswnal_data.kqn_ni,
-                                       (lnet_msg_t *)ktx->ktx_args[2], -EIO);
+                        lnet_finalize (kqswnal_data.kqn_ni, repmsg, -EIO);
                 }
                 
-                kqswnal_put_idle_tx (ktx);
         }
         
         atomic_dec(&kqswnal_data.kqn_pending_txs);
@@ -1273,7 +1269,8 @@ kqswnal_parse (kqswnal_rx_t *krx)
 
         fromnid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ep_rxd_node(krx->krx_rxd));
         
-        rc = lnet_parse(ni, hdr, kqswnal_rx_nid(krx), krx);
+        rc = lnet_parse(ni, hdr, kqswnal_rx_nid(krx), krx,
+                        krx->krx_rpc_reply_needed);
         if (rc < 0) {
                 kqswnal_rx_decref(krx);
                 return;
@@ -1359,12 +1356,38 @@ kqswnal_recv (lnet_ni_t      *ni,
 
         /* NB hdr still in network byte order */
 
-        if (krx->krx_rpc_reply_needed &&
-            (hdrtype == LNET_MSG_PUT ||
-             hdrtype == LNET_MSG_REPLY)) {
-                /* This is an optimized PUT/REPLY */
-                rc = kqswnal_rdma(krx, lntmsg, hdrtype,
-                                  niov, iov, kiov, offset, mlen);
+        if (krx->krx_rpc_reply_needed) {
+                /* optimized (rdma) request sent as RPC */
+                switch (hdrtype) {
+                case LNET_MSG_PUT:
+                case LNET_MSG_REPLY:
+                        /* This is an optimized PUT/REPLY */
+                        rc = kqswnal_rdma(krx, lntmsg, hdrtype,
+                                          niov, iov, kiov, offset, mlen);
+                        break;
+
+                case LNET_MSG_GET:
+                        if (lntmsg == NULL) {
+                                /* No buffer match: my decref will complete the
+                                 * RPC with failure */
+                                rc = 0;
+                        } else {
+                                /* Matched something! */
+                                rc = kqswnal_rdma (krx, lntmsg, 
+                                                   LNET_MSG_GET,
+                                                   lntmsg->msg_niov, 
+                                                   lntmsg->msg_iov, 
+                                                   lntmsg->msg_kiov, 
+                                                   lntmsg->msg_offset, 
+                                                   lntmsg->msg_len);
+                        }
+                        break;
+
+                default:
+                        CERROR("Bad RPC type %d\n", hdrtype);
+                        rc = -EPROTO;
+                        break;
+                }
                 kqswnal_rx_decref(krx);
                 return rc;
         }
@@ -1502,11 +1525,12 @@ kqswnal_scheduler (void *arg)
                                          * there's nothing left to do */
                                         break;
                                 }
-                                rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
-                                                               kqswnal_data.kqn_shuttingdown == 2 ||
-                                                               !list_empty(&kqswnal_data.kqn_readyrxds) ||
-                                                               !list_empty(&kqswnal_data.kqn_donetxds) ||
-                                                               !list_empty(&kqswnal_data.kqn_delayedtxds));
+                                rc = wait_event_interruptible_exclusive (
+                                        kqswnal_data.kqn_sched_waitq,
+                                        kqswnal_data.kqn_shuttingdown == 2 ||
+                                        !list_empty(&kqswnal_data.kqn_readyrxds) ||
+                                        !list_empty(&kqswnal_data.kqn_donetxds) ||
+                                        !list_empty(&kqswnal_data.kqn_delayedtxds));
                                 LASSERT (rc == 0);
                         } else if (need_resched())
                                 schedule ();
index a3ebbf9..2356ac1 100644 (file)
@@ -368,6 +368,7 @@ kranal_unmap_buffer (kra_tx_t *tx)
 void
 kranal_tx_done (kra_tx_t *tx, int completion)
 {
+        lnet_msg_t      *lnetmsg[2];
         unsigned long    flags;
         int              i;
 
@@ -375,15 +376,8 @@ kranal_tx_done (kra_tx_t *tx, int completion)
 
         kranal_unmap_buffer(tx);
 
-        for (i = 0; i < 2; i++) {
-                /* tx may have up to 2 lntmsgs to finalise */
-                if (tx->tx_lntmsg[i] == NULL)
-                        continue;
-
-                lnet_finalize(kranal_data.kra_ni, tx->tx_lntmsg[i], 
-                              completion);
-                tx->tx_lntmsg[i] = NULL;
-        }
+        lnetmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
+        lnetmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
 
         tx->tx_buftype = RANAL_BUF_NONE;
         tx->tx_msg.ram_type = RANAL_MSG_NONE;
@@ -394,6 +388,14 @@ kranal_tx_done (kra_tx_t *tx, int completion)
         list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
 
         spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
+
+        /* finalize lnet msgs AFTER freeing the tx desc */
+        for (i = 0; i < 2; i++) {
+                if (lnetmsg[i] == NULL)
+                        continue;
+
+                lnet_finalize(kranal_data.kra_ni, lnetmsg[i], completion);
+        }
 }
 
 kra_conn_t *
@@ -598,7 +600,6 @@ kranal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
         lnet_kiov_t      *kiov = lntmsg->msg_kiov;
         unsigned int      offset = lntmsg->msg_offset;
         unsigned int      nob = lntmsg->msg_len;
-        kra_conn_t       *conn;
         kra_tx_t         *tx;
         int               rc;
 
@@ -676,53 +677,6 @@ kranal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 return 0;
 
         case LNET_MSG_REPLY:
-                /* reply's 'private' is the conn that received the GET_REQ */
-                conn = private;
-
-                LASSERT (routing || conn != NULL);
-                
-                LASSERT (conn->rac_rxmsg != NULL);
-
-                if (!routing && conn->rac_rxmsg->ram_type != RANAL_MSG_IMMEDIATE) {
-                        /* Incoming message consistent with RDMA? */
-                        if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
-                                CERROR("REPLY to %s bad msg type %x!!!\n",
-                                       libcfs_nid2str(target.nid), 
-                                       conn->rac_rxmsg->ram_type);
-                                return -EIO;
-                        }
-
-                        tx = kranal_get_idle_tx();
-                        if (tx == NULL)
-                                return -EIO;
-
-                        rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, 
-                                                      offset, nob);
-                        if (rc != 0) {
-                                kranal_tx_done(tx, rc);
-                                return -EIO;
-                        }
-
-                        tx->tx_conn = conn;
-                        tx->tx_lntmsg[0] = lntmsg;
-
-                        rc = kranal_map_buffer(tx);
-                        if (rc != 0) {
-                                kranal_tx_done(tx, rc);
-                                return -EIO;
-                        }
-
-                        kranal_rdma(tx, RANAL_MSG_GET_DONE,
-                                    &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
-                                    conn->rac_rxmsg->ram_u.get.ragm_cookie);
-
-                        /* flag matched by consuming rx message */
-                        kranal_consume_rxmsg(conn, NULL, 0);
-                        return 0;
-                }
-
-                /* Fall through and handle like PUT */
-
         case LNET_MSG_PUT:
                 if (kiov == NULL &&             /* not paged */
                     nob <= RANAL_FMA_MAX_DATA && /* small enough */
@@ -767,6 +721,44 @@ kranal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
         return 0;
 }
 
+/*
+ * Respond to a matched GET_REQ: set up and map an RDMA buffer for the
+ * payload described by 'lntmsg', then start kranal_rdma() with a
+ * RANAL_MSG_GET_DONE message against the descriptor and cookie carried in
+ * the request currently held in conn->rac_rxmsg.
+ *
+ * ni      network interface used to finalize 'lntmsg' on failure
+ * conn    connection the GET_REQ arrived on (conn->rac_rxmsg is read)
+ * lntmsg  the matched LNET message supplying the reply payload
+ *
+ * Nothing is returned.  On any failure here 'lntmsg' is finalized exactly
+ * once with -EIO.  On success it is attached to the tx descriptor, and
+ * kranal_tx_done() finalizes whatever is attached when the tx completes.
+ */
+void
+kranal_reply(lnet_ni_t *ni, kra_conn_t *conn, lnet_msg_t *lntmsg)
+{
+        kra_msg_t     *rxmsg = conn->rac_rxmsg;
+        unsigned int   niov = lntmsg->msg_niov;
+        struct iovec  *iov = lntmsg->msg_iov;
+        lnet_kiov_t   *kiov = lntmsg->msg_kiov;
+        unsigned int   offset = lntmsg->msg_offset;
+        unsigned int   nob = lntmsg->msg_len;
+        kra_tx_t      *tx;
+        int            rc;
+
+        tx = kranal_get_idle_tx();
+        if (tx == NULL)
+                goto failed_0;
+
+        rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
+        if (rc != 0)
+                goto failed_1;
+
+        tx->tx_conn = conn;
+
+        rc = kranal_map_buffer(tx);
+        if (rc != 0)
+                goto failed_1;
+
+        /* Attach lntmsg only once nothing below can fail: kranal_tx_done()
+         * finalizes any attached message, and the failed_1 path falls
+         * through to failed_0 which finalizes lntmsg itself.  Attaching it
+         * any earlier would finalize it twice when mapping fails. */
+        tx->tx_lntmsg[0] = lntmsg;
+
+        kranal_rdma(tx, RANAL_MSG_GET_DONE,
+                    &rxmsg->ram_u.get.ragm_desc, nob,
+                    rxmsg->ram_u.get.ragm_cookie);
+        return;
+
+ failed_1:
+        /* tx carries no lnet msg here, so this only recycles the desc */
+        kranal_tx_done(tx, -EIO);
+ failed_0:
+        lnet_finalize(ni, lntmsg, -EIO);
+}
+
 int
 kranal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
              int delayed, unsigned int niov, 
@@ -786,13 +778,6 @@ kranal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
 
         CDEBUG(D_NET, "conn %p, rxmsg %p, lntmsg %p\n", conn, rxmsg, lntmsg);
 
-        if (rxmsg == NULL) {
-                /* already consumed: must have been a GET_REQ that matched */
-                LASSERT (mlen == 0);
-                LASSERT (lntmsg == NULL);       /* no need to finalise */
-                return 0;
-        }
-        
         switch(rxmsg->ram_type) {
         default:
                 LBUG();
@@ -858,14 +843,18 @@ kranal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
                 return 0;
 
         case RANAL_MSG_GET_REQ:
-                /* This one didn't match anything */
-                tx = kranal_new_tx_msg(RANAL_MSG_GET_NAK);
-                if (tx != NULL) {
-                        tx->tx_msg.ram_u.completion.racm_cookie = 
-                                rxmsg->ram_u.get.ragm_cookie;
-                        kranal_post_fma(conn, tx);
+                if (lntmsg != NULL) {
+                        /* Matched! */
+                        kranal_reply(ni, conn, lntmsg);
+                } else {
+                        /* No match */
+                        tx = kranal_new_tx_msg(RANAL_MSG_GET_NAK);
+                        if (tx != NULL) {
+                                tx->tx_msg.ram_u.completion.racm_cookie = 
+                                        rxmsg->ram_u.get.ragm_cookie;
+                                kranal_post_fma(conn, tx);
+                        }
                 }
-                LASSERT (lntmsg == NULL);       /* no need to finalise */
                 kranal_consume_rxmsg(conn, NULL, 0);
                 return 0;
         }
@@ -1087,7 +1076,7 @@ kranal_connd (void *arg)
                         continue;
 
                 set_current_state(TASK_INTERRUPTIBLE);
-                add_wait_queue(&kranal_data.kra_connd_waitq, &wait);
+                add_wait_queue_exclusive(&kranal_data.kra_connd_waitq, &wait);
 
                 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
 
@@ -1701,7 +1690,11 @@ kranal_check_fma_rx (kra_conn_t *conn)
         if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
                 /* This message signals RDMA completion... */
                 rrc = RapkFmaSyncWait(conn->rac_rihandle);
-                LASSERT (rrc == RAP_SUCCESS);
+                if (rrc != RAP_SUCCESS) {
+                        CERROR("RapkFmaSyncWait failed: %d\n", rrc);
+                        rc = -ENETDOWN;
+                        goto out;
+                }
         }
 
         if (conn->rac_close_recvd) {
@@ -1738,14 +1731,14 @@ kranal_check_fma_rx (kra_conn_t *conn)
         case RANAL_MSG_IMMEDIATE:
                 CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
                 rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.immediate.raim_hdr, 
-                                msg->ram_srcnid, conn);
+                                msg->ram_srcnid, conn, 0);
                 repost = rc < 0;
                 break;
 
         case RANAL_MSG_PUT_REQ:
                 CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
                 rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.putreq.raprm_hdr, 
-                                msg->ram_srcnid, conn);
+                                msg->ram_srcnid, conn, 1);
                 repost = rc < 0;
                 break;
 
@@ -1789,7 +1782,7 @@ kranal_check_fma_rx (kra_conn_t *conn)
         case RANAL_MSG_GET_REQ:
                 CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
                 rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.get.ragm_hdr, 
-                                msg->ram_srcnid, conn);
+                                msg->ram_srcnid, conn, 1);
                 repost = rc < 0;
                 break;
 
@@ -1825,10 +1818,10 @@ kranal_check_fma_rx (kra_conn_t *conn)
                 break;
         }
 
-        if (rc < 0)        /* lnet_parse() detected an error */
+ out:
+        if (rc < 0)                             /* protocol/comms error */
                 kranal_close_conn (conn, rc);
 
- out:
         if (repost && conn->rac_rxmsg != NULL)
                 kranal_consume_rxmsg(conn, NULL, 0);
 
@@ -2005,7 +1998,7 @@ kranal_scheduler (void *arg)
                         continue;
 
                 set_current_state(TASK_INTERRUPTIBLE);
-                add_wait_queue(&dev->rad_waitq, &wait);
+                add_wait_queue_exclusive(&dev->rad_waitq, &wait);
                 spin_unlock_irqrestore(&dev->rad_lock, flags);
 
                 if (nsoonest == 0) {
index 169a8e7..f6539fb 100644 (file)
@@ -359,6 +359,8 @@ ksocknal_zc_callback (zccd_t *zcd)
 void
 ksocknal_tx_done (lnet_ni_t *ni, ksock_tx_t *tx, int asynch)
 {
+        lnet_msg_t  *lnetmsg = tx->tx_lnetmsg;
+        int          rc = (tx->tx_resid == 0) ? 0 : -EIO;
         ENTRY;
 
         if (tx->tx_conn != NULL) {
@@ -373,8 +375,9 @@ ksocknal_tx_done (lnet_ni_t *ni, ksock_tx_t *tx, int asynch)
 #endif
         }
 
-        lnet_finalize (ni, tx->tx_lnetmsg, (tx->tx_resid == 0) ? 0 : -EIO);
         ksocknal_free_tx (tx);
+        lnet_finalize (ni, lnetmsg, rc);
+
         EXIT;
 }
 
@@ -1013,7 +1016,7 @@ ksocknal_process_receive (ksock_conn_t *conn)
                 ksocknal_conn_addref(conn);     /* ++ref while parsing */
                 
                 rc = lnet_parse(conn->ksnc_peer->ksnp_ni, &conn->ksnc_hdr, 
-                                conn->ksnc_peer->ksnp_id.nid, conn);
+                                conn->ksnc_peer->ksnp_id.nid, conn, 0);
                 if (rc < 0) {
                         /* I just received garbage: give up on this conn */
                         ksocknal_new_packet(conn, 0);
@@ -1279,8 +1282,9 @@ int ksocknal_scheduler (void *arg)
                         nloops = 0;
 
                         if (!did_something) {   /* wait for something to do */
-                                rc = wait_event_interruptible (sched->kss_waitq,
-                                                               !ksocknal_sched_cansleep(sched));
+                                rc = wait_event_interruptible_exclusive(
+                                        sched->kss_waitq,
+                                        !ksocknal_sched_cansleep(sched));
                                 LASSERT (rc == 0);
                         } else
                                our_cond_resched();
@@ -1799,10 +1803,11 @@ ksocknal_connd (void *arg)
                 spin_unlock_irqrestore(&ksocknal_data.ksnd_connd_lock,
                                        flags);
 
-                rc = wait_event_interruptible(ksocknal_data.ksnd_connd_waitq,
-                                              ksocknal_data.ksnd_shuttingdown ||
-                                              !list_empty(&ksocknal_data.ksnd_connd_connreqs) ||
-                                              !list_empty(&ksocknal_data.ksnd_connd_routes));
+                rc = wait_event_interruptible_exclusive(
+                        ksocknal_data.ksnd_connd_waitq,
+                        ksocknal_data.ksnd_shuttingdown ||
+                        !list_empty(&ksocknal_data.ksnd_connd_connreqs) ||
+                        !list_empty(&ksocknal_data.ksnd_connd_routes));
 
                 spin_lock_irqsave(&ksocknal_data.ksnd_connd_lock, flags);
         }
index f379b46..6a5aadd 100644 (file)
 #define IBNAL_PEER_HASH_SIZE         101        /* # peer lists */
 #define IBNAL_RESCHED                100        /* # scheduler loops before reschedule */
 #define IBNAL_MSG_QUEUE_SIZE         8          /* # messages/RDMAs in-flight */
-#define IBNAL_CREDIT_HIGHWATER       7          /* when to eagerly return credits */
+#define IBNAL_CREDIT_HIGHWATER       7          /* when eagerly to return credits */
 #define IBNAL_MSG_SIZE              (4<<10)     /* max size of queued messages (inc hdr) */
 
 /* constants derived from sdp-connection.c */
@@ -277,7 +277,6 @@ typedef struct kib_rx                           /* receive message */
 {
         struct list_head          rx_list;      /* queue for attention */
         struct kib_conn          *rx_conn;      /* owning conn */
-        int                       rx_responded; /* responded to peer? */
         int                       rx_nob;       /* # bytes received (-1 while posted) */
         vv_l_key_t                rx_lkey;      /* local key */
         kib_msg_t                *rx_msg;       /* pre-mapped buffer (host vaddr) */
@@ -346,7 +345,6 @@ typedef struct kib_conn
         __u64               ibc_rxseq;          /* rx sequence number */
         atomic_t            ibc_refcount;       /* # users */
         int                 ibc_state;          /* what's happening */
-        atomic_t            ibc_nob;            /* # bytes buffered */
         int                 ibc_nsends_posted;  /* # uncompleted sends */
         int                 ibc_credits;        /* # credits I have */
         int                 ibc_outstanding_credits; /* # credits to return */
@@ -504,8 +502,6 @@ kibnal_peer_active (kib_peer_t *peer)
 static inline void
 kibnal_queue_tx_locked (kib_tx_t *tx, kib_conn_t *conn)
 {
-        /* CAVEAT EMPTOR: tx takes caller's ref on conn */
-
         LASSERT (tx->tx_nwrq > 0);              /* work items set up */
         LASSERT (!tx->tx_queued);               /* not queued for sending already */
 
index 9fa2a8b..216fbe1 100644 (file)
@@ -27,8 +27,9 @@
 void
 kibnal_tx_done (kib_tx_t *tx)
 {
-        int        rc = tx->tx_status;
-        int        i;
+        lnet_msg_t *lntmsg[2];
+        int         rc = tx->tx_status;
+        int         i;
 
         LASSERT (!in_interrupt());
         LASSERT (!tx->tx_queued);               /* mustn't be queued for sending */
@@ -51,14 +52,10 @@ kibnal_tx_done (kib_tx_t *tx)
         }
         tx->tx_md.md_active = 0;
 #endif
-        for (i = 0; i < 2; i++) {
-                /* tx may have up to 2 ptlmsgs to finalise */
-                if (tx->tx_lntmsg[i] == NULL)
-                        continue;
 
-                lnet_finalize (kibnal_data.kib_ni, tx->tx_lntmsg[i], rc);
-                tx->tx_lntmsg[i] = NULL;
-        }
+        /* tx may have up to 2 lnet msgs to finalise */
+        lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
+        lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
         
         if (tx->tx_conn != NULL) {
                 kibnal_conn_decref(tx->tx_conn);
@@ -73,6 +70,14 @@ kibnal_tx_done (kib_tx_t *tx)
         list_add (&tx->tx_list, &kibnal_data.kib_idle_txs);
 
         spin_unlock(&kibnal_data.kib_tx_lock);
+
+        /* delay finalize until my descs have been freed */
+        for (i = 0; i < 2; i++) {
+                if (lntmsg[i] == NULL)
+                        continue;
+
+                lnet_finalize (kibnal_data.kib_ni, lntmsg[i], rc);
+        }
 }
 
 kib_tx_t *
@@ -307,9 +312,6 @@ kibnal_handle_rx (kib_rx_t *rx)
                 kibnal_check_sends(conn);
         }
 
-        /* clear flag so GET_REQ can see if it caused a REPLY */
-        rx->rx_responded = 0;
-
         switch (msg->ibm_type) {
         default:
                 CERROR("Bad IBNAL message type %x from %s\n",
@@ -322,13 +324,13 @@ kibnal_handle_rx (kib_rx_t *rx)
 
         case IBNAL_MSG_IMMEDIATE:
                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.immediate.ibim_hdr,
-                                msg->ibm_srcnid, rx);
+                                msg->ibm_srcnid, rx, 0);
                 repost = rc < 0;                /* repost on error */
                 break;
                 
         case IBNAL_MSG_PUT_REQ:
                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.putreq.ibprm_hdr,
-                                msg->ibm_srcnid, rx);
+                                msg->ibm_srcnid, rx, 1);
                 repost = rc < 0;                /* repost on error */
                 break;
 
@@ -385,7 +387,7 @@ kibnal_handle_rx (kib_rx_t *rx)
 
         case IBNAL_MSG_GET_REQ:
                 rc = lnet_parse(kibnal_data.kib_ni, &msg->ibm_u.get.ibgm_hdr,
-                                msg->ibm_srcnid, rx);
+                                msg->ibm_srcnid, rx, 1);
                 repost = rc < 0;                /* repost on error */
                 break;
 
@@ -1453,73 +1455,7 @@ kibnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 kibnal_launch_tx(tx, target.nid);
                 return 0;
 
-        case LNET_MSG_REPLY: {
-                /* reply's 'private' is the incoming receive */
-                kib_rx_t *rx = private;
-
-                LASSERT(routing || rx != NULL);
-
-                if (!routing && rx->rx_msg->ibm_type != IBNAL_MSG_IMMEDIATE) {
-                        /* Incoming message consistent with RDMA? */
-                        if (rx->rx_msg->ibm_type != IBNAL_MSG_GET_REQ) {
-                                CERROR("REPLY to %s bad msg type %x!!!\n",
-                                       libcfs_nid2str(target.nid), 
-                                       rx->rx_msg->ibm_type);
-                                return -EIO;
-                        }
-
-                        /* NB handle_rx() will send GET_NAK when I return to
-                         * it from here, unless I set rx_responded! */
-
-                        tx = kibnal_get_idle_tx();
-                        if (tx == NULL) {
-                                CERROR("Can't get tx for REPLY to %s\n",
-                                       libcfs_nid2str(target.nid));
-                                return -ENOMEM;
-                        }
-
-                        if (payload_nob == 0)
-                                rc = 0;
-                        else if (payload_kiov == NULL)
-                                rc = kibnal_setup_rd_iov(
-                                        tx, tx->tx_rd, 0, 
-                                        payload_niov, payload_iov, 
-                                        payload_offset, payload_nob);
-                        else
-                                rc = kibnal_setup_rd_kiov(
-                                        tx, tx->tx_rd, 0,
-                                        payload_niov, payload_kiov,
-                                        payload_offset, payload_nob);
-                        if (rc != 0) {
-                                CERROR("Can't setup GET src for %s: %d\n",
-                                       libcfs_nid2str(target.nid), rc);
-                                kibnal_tx_done(tx);
-                                return -EIO;
-                        }
-                
-                        rc = kibnal_init_rdma(tx, IBNAL_MSG_GET_DONE, 
-                                              payload_nob,
-                                              &rx->rx_msg->ibm_u.get.ibgm_rd,
-                                              rx->rx_msg->ibm_u.get.ibgm_cookie);
-                        if (rc < 0) {
-                                CERROR("Can't setup rdma for GET from %s: %d\n", 
-                                       libcfs_nid2str(target.nid), rc);
-                        } else if (rc == 0) {
-                                /* No RDMA: local completion may happen now! */
-                                lnet_finalize (kibnal_data.kib_ni, lntmsg, 0);
-                        } else {
-                                /* RDMA: lnet_finalize(lntmsg) when it
-                                 * completes */
-                                tx->tx_lntmsg[0] = lntmsg;
-                        }
-
-                        kibnal_queue_tx(tx, rx->rx_conn);
-                        rx->rx_responded = 1;
-                        return (rc >= 0) ? 0 : -EIO;
-                }
-                /* fall through to handle like PUT */
-        }
-
+        case LNET_MSG_REPLY:
         case LNET_MSG_PUT:
                 /* Is the payload small enough not to need RDMA? */
                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
@@ -1594,6 +1530,67 @@ kibnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
         return 0;
 }
 
+void
+kibnal_reply (lnet_ni_t *ni, kib_rx_t *rx, lnet_msg_t *lntmsg)
+{
+        lnet_process_id_t target = lntmsg->msg_target;
+        unsigned int      niov = lntmsg->msg_niov; 
+        struct iovec     *iov = lntmsg->msg_iov; 
+        lnet_kiov_t      *kiov = lntmsg->msg_kiov;
+        unsigned int      offset = lntmsg->msg_offset;
+        unsigned int      nob = lntmsg->msg_len;
+        kib_tx_t         *tx;
+        int               rc;
+        /* Answer the GET_REQ held in 'rx' with lntmsg's payload (GET_DONE) */
+        tx = kibnal_get_idle_tx();
+        if (tx == NULL) {
+                CERROR("Can't get tx for REPLY to %s\n",
+                       libcfs_nid2str(target.nid));
+                goto failed_0;          /* no tx to release */
+        }
+
+        if (nob == 0)                   /* empty payload: skip descriptor setup */
+                rc = 0;
+        else if (kiov == NULL)          /* payload described by 'iov' */
+                rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 0, 
+                                         niov, iov, offset, nob);
+        else                            /* payload described by 'kiov' */
+                rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 0,
+                                          niov, kiov, offset, nob);
+
+        if (rc != 0) {
+                CERROR("Can't setup GET src for %s: %d\n",
+                       libcfs_nid2str(target.nid), rc);
+                goto failed_1;          /* release tx, then finalize */
+        }
+        /* destination descriptor and cookie come from the peer's GET_REQ */
+        rc = kibnal_init_rdma(tx, IBNAL_MSG_GET_DONE, nob,
+                              &rx->rx_msg->ibm_u.get.ibgm_rd,
+                              rx->rx_msg->ibm_u.get.ibgm_cookie);
+        if (rc < 0) {
+                CERROR("Can't setup rdma for GET from %s: %d\n", 
+                       libcfs_nid2str(target.nid), rc);
+                goto failed_1;
+        }
+        /* rc == 0: no RDMA was set up; rc > 0: RDMA pending on tx */
+        if (rc == 0) {
+                /* No RDMA: local completion may happen now! */
+                lnet_finalize(ni, lntmsg, 0);
+        } else {
+                /* RDMA: lnet_finalize(lntmsg) when it
+                 * completes */
+                tx->tx_lntmsg[0] = lntmsg;
+        }
+        /* tx is queued on rx's connection in both of the cases above */
+        kibnal_queue_tx(tx, rx->rx_conn);
+        return;
+        /* Failure: the specific rc is dropped; lntmsg completes with -EIO */
+ failed_1:
+        kibnal_tx_done(tx);
+ failed_0:
+        lnet_finalize(ni, lntmsg, -EIO);
+}
+
 int
 kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
@@ -1696,8 +1693,10 @@ kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
                 break;
 
         case IBNAL_MSG_GET_REQ:
-                LASSERT (lntmsg == NULL);       /* no need to finalise */
-                if (!rx->rx_responded) {
+                if (lntmsg != NULL) {
+                        /* Optimized GET; RDMA lntmsg's payload */
+                        kibnal_reply(ni, rx, lntmsg);
+                } else {
                         /* GET didn't match anything */
                         kibnal_send_completion(rx->rx_conn, IBNAL_MSG_GET_DONE, 
                                                -ENODATA,
@@ -3370,7 +3369,7 @@ kibnal_scheduler(void *arg)
                 /* Nothing to do; sleep... */
 
                 set_current_state(TASK_INTERRUPTIBLE);
-                add_wait_queue(&kibnal_data.kib_sched_waitq, &wait);
+                add_wait_queue_exclusive(&kibnal_data.kib_sched_waitq, &wait);
                 spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
                                        flags);
 
index ba382c6..53ec89e 100644 (file)
@@ -228,14 +228,34 @@ void libcfs_debug_dumplog(void)
 
 
+/* Select the stream that libcfs_debug_msg() writes to; currently always
+ * stdout.  'bufsize' is not used.  Returns 0. */
 int libcfs_debug_init(unsigned long bufsize)
-{ 
+{
+        /* if (getenv("LIBLUSTRE_DEBUG_BASENAME")) {
+                struct FILE *newfile;
+                sprintf(filename, "%s-%s-%lu.log",
+                        getenv("LIBLUSTRE_DEBUG_BASENAME", uname(), time(0));
+                newfile = fopen(filename, "w");
+                if (newfile != NULL)
+                        debug_file_fd = newfile;
+        } else
+         */
         debug_file_fd = stdout;
+
         return 0;
 }
 
+/* Close the debug stream unless it is stdout.  A NULL stream is skipped
+ * because fclose(NULL) is undefined, and a stream that has been closed is
+ * reset to NULL so that libcfs_debug_msg() will not write to it
+ * afterwards.  Returns 0. */
 int libcfs_debug_cleanup(void)
 {
-        return 0; //close(portals_debug_fd);
+        if (debug_file_fd != NULL && debug_file_fd != stdout) {
+                fclose(debug_file_fd);
+                debug_file_fd = NULL;
+        }
+        return 0;
 }
 
 int libcfs_debug_clear_buffer(void)
@@ -253,39 +273,29 @@ int libcfs_debug_mark_buffer(char *text)
         return 0;
 }
 
-/* FIXME: I'm not very smart; someone smarter should make this better. */
+/* Write one debug message to debug_file_fd, prefixed with a
+ * "seconds.microseconds:(file:line:function()): " header taken from
+ * gettimeofday().  'subsys', 'mask' and 'stack' are not used by this
+ * implementation.  Does nothing when debug_file_fd is NULL. */
 void
-libcfs_debug_msg (int subsys, int mask, char *file, const char *fn, 
+libcfs_debug_msg (int subsys, int mask, char *file, const char *fn,
                   const int line, unsigned long stack, char *format, ...)
 {
         va_list       ap;
-        unsigned long flags;
         struct timeval tv;
-        int nob;
 
-
-        /* NB since we pass a non-zero sized buffer (at least) on the first
-         * print, we can be assured that by the end of all the snprinting,
-         * we _do_ have a terminated buffer, even if our message got truncated.
-         */
+        if (debug_file_fd == NULL)
+                return;
 
         gettimeofday(&tv, NULL);
 
-        nob += fprintf(debug_file_fd,
-                              "%02x:%06x:%d:%lu.%06lu ",
-                              subsys >> 24, mask, smp_processor_id,
-                              tv.tv_sec, tv.tv_usec);
-
-        nob += fprintf(debug_file_fd,
-                            "(%s:%d:%s() %d+%ld): ",
-                            file, line, fn, 0,
-                            8192 - ((unsigned long)&flags & 8191UL));
+        fprintf(debug_file_fd, "%lu.%06lu:(%s:%d:%s()): ",
+                (unsigned long)tv.tv_sec, (unsigned long)tv.tv_usec,
+                file, line, fn);
 
         va_start (ap, format);
-        nob += fprintf(debug_file_fd, format, ap);
+        vfprintf(debug_file_fd, format, ap);
         va_end (ap);
-
-
 }
 
 void
index 0c9856a..d3c8f7a 100644 (file)
@@ -118,26 +118,26 @@ void libcfs_run_lbug_upcall(char *file, const char *fn, const int line)
 
 void libcfs_debug_dumpstack(struct task_struct *tsk)
 {
-#if defined(__arch_um__) 
-        if (tsk != NULL) 
-                CWARN("stack dump for pid %d (%d) requested; wake up gdb.\n", 
-                      tsk->pid, UML_PID(tsk)); 
-        asm("int $3");
-#elif defined(HAVE_SHOW_TASK) 
-        /* this is exported by lustre kernel version 42 */ 
-        extern void show_task(struct task_struct *); 
-
-        if (tsk == NULL) 
-                tsk = current; 
-        CWARN("showing stack for process %d\n", tsk->pid); 
-        show_task(tsk); 
-#else 
+#if defined(__arch_um__)        /* this branch only logs the request */
+        if (tsk != NULL)
+                CWARN("stack dump for pid %d (%d) requested; wake up gdb.\n",
+                      tsk->pid, UML_PID(tsk));
+        //asm("int $3");  (breakpoint trap left disabled: no trap is raised here)
+#elif defined(HAVE_SHOW_TASK)   /* hand the task to the kernel's show_task() */
+        /* this is exported by lustre kernel version 42 */
+        extern void show_task(struct task_struct *);
+        /* fall back to 'current' when the caller passed no task */
+        if (tsk == NULL)
+                tsk = current;
+        CWARN("showing stack for process %d\n", tsk->pid);
+        show_task(tsk);
+#else                           /* neither facility: just report that */
         CWARN("can't show stack: kernel doesn't export show_task\n");
 #endif
 }
 
 cfs_task_t *libcfs_current(void)
-{ 
+{       /* Log the 'current' task pointer via CWARN, then return it. */
         CWARN("current task struct is %p\n", current);
         return current;
 }
index 612d306..e72e047 100644 (file)
@@ -358,7 +358,7 @@ libcfs_nid2str(lnet_nid_t nid)
         if (nid == LNET_NID_ANY)
                 return "LNET_NID_ANY";
 
-        nf = libcfs_lnd2netstrfns(LNET_NETTYP(net));
+        nf = libcfs_lnd2netstrfns(lnd);
         str = libcfs_next_nidstring();
 
         if (nf == NULL)
index 91e7f14..2b0905e 100644 (file)
@@ -1033,7 +1033,7 @@ lnet_startup_lndnis (void)
                         the_lnet.ln_loni = ni;
                         continue;
                 }
-                
+
 #ifndef __KERNEL__
                 if (lnd->lnd_wait != NULL) {
                         if (the_lnet.ln_eqwaitni == NULL) {
@@ -1079,7 +1079,7 @@ lnet_startup_lndnis (void)
         }
 
         return 0;
-        
+
  failed:
         lnet_shutdown_lndnis();
 
@@ -1088,7 +1088,7 @@ lnet_startup_lndnis (void)
                 list_del(&ni->ni_list);
                 LIBCFS_FREE(ni, sizeof(*ni));
         }
-        
+
         return -ENETDOWN;
 }
 
@@ -1105,7 +1105,7 @@ LNetInit(void)
         rc = lnet_get_portals_compatibility();
         if (rc < 0)
                 return rc;
-        
+
         lnet_init_locks();
         CFS_INIT_LIST_HEAD(&the_lnet.ln_lnds);
         the_lnet.ln_ptlcompat = rc;
@@ -1165,7 +1165,7 @@ LNetNIInit(lnet_pid_t requested_pid)
                 rc = -ENETDOWN;
                 goto failed0;
         }
-        
+
         rc = lnet_prepare(requested_pid);
         if (rc != 0)
                 goto failed0;
@@ -1173,7 +1173,7 @@ LNetNIInit(lnet_pid_t requested_pid)
         rc = lnet_startup_lndnis();
         if (rc != 0)
                 goto failed1;
-        
+
         rc = lnet_parse_routes(lnet_get_routes(), &im_a_router);
         if (rc != 0)
                 goto failed2;
index e872ddf..f8986a5 100644 (file)
@@ -157,7 +157,7 @@ lnet_parse_networks(struct list_head *nilist, char *networks)
         char      *str;
         lnet_ni_t *ni;
         __u32      net;
-        int        count = 0;
+        int        nnets = 0;
 
        if (strlen(networks) > LNET_SINGLE_TEXTBUF_NOB) {
                /* _WAY_ conservative */
@@ -222,12 +222,14 @@ lnet_parse_networks(struct list_head *nilist, char *networks)
                         goto failed;
                 } 
 
-                if (count++ > 0) {
+                if (nnets > 0 &&
+                    the_lnet.ln_ptlcompat > 0) {
                         LCONSOLE_ERROR("Only 1 network supported when "
                                        "'portals_compatible' is set\n");
                         goto failed;
                 }
 
+                nnets++;
                 ni = lnet_new_ni(net, nilist);
                 if (ni == NULL)
                         goto failed;
index a418824..22d49cf 100644 (file)
@@ -712,8 +712,8 @@ lnet_ni_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
         LASSERT (mlen == 0 || msg != NULL);
         
         if (msg != NULL) {
-                LASSERT(!msg->msg_recvaftersend);
                 LASSERT(msg->msg_receiving);
+                LASSERT(!msg->msg_sending);
                 msg->msg_receiving = 0;
 
                 if (mlen != 0) {
@@ -796,23 +796,9 @@ void
 lnet_ni_send(lnet_ni_t *ni, lnet_msg_t *msg) 
 {
         void   *priv = msg->msg_private;
-        int     recv = msg->msg_recvaftersend;
-        int     delayed = msg->msg_delayed;
         int     rc;
 
         LASSERT (!in_interrupt ());
-
-        /* On GET, call lnet_ni_recv() right after the send. The recv gets
-         * delayed until after the send to ensure the LND still has any RDMA
-         * descriptors associated with the incoming GET when lnd_send() calls
-         * in with the REPLY.  Note that if we actually had to pass 'msg' in to
-         * lnet_ni_recv() here, we'd be forking it (i.e. it would have 2 separate
-         * existances and we'd have to refcount it) */
-
-        LASSERT (!recv == !msg->msg_receiving);
-        msg->msg_recvaftersend = 0;
-        msg->msg_receiving = 0;
-
         LASSERT (LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND ||
                  (msg->msg_txcredit && msg->msg_peertxcredit));
 
@@ -820,9 +806,6 @@ lnet_ni_send(lnet_ni_t *ni, lnet_msg_t *msg)
         LASSERT(!in_interrupt());
         if (rc < 0)
                 lnet_finalize(ni, msg, rc);
-        
-        if (recv)
-                lnet_ni_recv(ni, priv, NULL, delayed, 0, 0, 0);
 }
 
 int
@@ -835,20 +818,13 @@ lnet_eager_recv_locked(lnet_msg_t *msg)
         LASSERT (!msg->msg_delayed);
         msg->msg_delayed = 1;
 
-        /* I might have to do an eager receive since I'm blocking */
-        if (!msg->msg_receiving)
-                return 0;
+        LASSERT (msg->msg_receiving);
+        LASSERT (msg->msg_routing);
+        LASSERT (!msg->msg_sending);
         
-        if (msg->msg_routing) {
-                peer = msg->msg_rxpeer;
-                LASSERT (!msg->msg_sending);
-        } else {
-                peer = msg->msg_txpeer;
-                LASSERT (msg->msg_recvaftersend);
-                LASSERT (msg->msg_type == LNET_MSG_REPLY);
-        }
-
+        peer = msg->msg_rxpeer;
         ni = peer->lp_ni;
+
         if (ni->ni_lnd->lnd_eager_recv != NULL) {
                 LNET_UNLOCK();
                         
@@ -877,18 +853,22 @@ lnet_post_send_locked (lnet_msg_t *msg, int do_send)
         lnet_peer_t *lp = msg->msg_txpeer;
         lnet_ni_t   *ni = lp->lp_ni;
 
+        /* non-lnet_send() callers have checked before */
+        LASSERT (!do_send || msg->msg_delayed);
+        LASSERT (!msg->msg_receiving);
+
         if (!msg->msg_peertxcredit) {
                 LASSERT ((lp->lp_txcredits < 0) == !list_empty(&lp->lp_txq));
 
                 msg->msg_peertxcredit = 1;
                 lp->lp_txqnob += msg->msg_len + sizeof(lnet_hdr_t);
                 lp->lp_txcredits--;
+
                 if (lp->lp_txcredits < lp->lp_mintxcredits)
                         lp->lp_mintxcredits = lp->lp_txcredits;
 
                 if (lp->lp_txcredits < 0) {
-                        /* must have checked eager_recv before here */
-                        LASSERT (msg->msg_delayed);
+                        msg->msg_delayed = 1;
                         list_add_tail (&msg->msg_list, &lp->lp_txq);
                         return EAGAIN;
                 }
@@ -904,8 +884,7 @@ lnet_post_send_locked (lnet_msg_t *msg, int do_send)
                         ni->ni_mintxcredits = ni->ni_txcredits;
 
                 if (ni->ni_txcredits < 0) {
-                        /* must have checkd eager_recv before here */
-                        LASSERT (msg->msg_delayed);
+                        msg->msg_delayed = 1;
                         list_add_tail (&msg->msg_list, &ni->ni_txq);
                         return EAGAIN;
                 }
@@ -913,8 +892,6 @@ lnet_post_send_locked (lnet_msg_t *msg, int do_send)
 
         if (do_send) {
                 LNET_UNLOCK();
-                /* non-lnet_send() callers always send delayed */
-                LASSERT (msg->msg_delayed);
                 lnet_ni_send(ni, msg);
                 LNET_LOCK();
         }
@@ -959,7 +936,7 @@ lnet_msg2bufpool(lnet_msg_t *msg)
 int
 lnet_post_routed_recv_locked (lnet_msg_t *msg, int do_recv)
 {
-        /* lnet_route is going to LNET_UNLOCK immediately after this, so it
+        /* lnet_parse is going to LNET_UNLOCK immediately after this, so it
          * sets do_recv FALSE and I don't do the unlock/send/lock bit.  I
          * return EAGAIN if msg blocked and 0 if sent or OK to send */
         lnet_peer_t         *lp = msg->msg_rxpeer;
@@ -971,6 +948,10 @@ lnet_post_routed_recv_locked (lnet_msg_t *msg, int do_recv)
         LASSERT (msg->msg_niov == 0);
         LASSERT (msg->msg_routing);
         LASSERT (msg->msg_receiving);
+        LASSERT (!msg->msg_sending);
+
+        /* non-lnet_parse callers only send delayed messages */
+        LASSERT (!do_recv || msg->msg_delayed);
 
         if (!msg->msg_peerrtrcredit) {
                 LASSERT ((lp->lp_rtrcredits < 0) == !list_empty(&lp->lp_rtrq));
@@ -1015,8 +996,6 @@ lnet_post_routed_recv_locked (lnet_msg_t *msg, int do_recv)
 
         if (do_recv) {
                 LNET_UNLOCK();
-                /* non-lnet_route() callers always send delayed  */
-                LASSERT (msg->msg_delayed);
                 lnet_ni_recv(lp->lp_ni, msg->msg_private, msg, 1,
                              0, msg->msg_len, msg->msg_len);
                 LNET_LOCK();
@@ -1107,8 +1086,6 @@ lnet_return_credits_locked (lnet_msg_t *msg)
                                           lnet_msg_t, msg_list);
                         list_del(&msg2->msg_list);
                         
-                        LASSERT (msg2->msg_delayed);
-
                         (void) lnet_post_routed_recv_locked(msg2, 1);
                 }
         }
@@ -1125,8 +1102,6 @@ lnet_return_credits_locked (lnet_msg_t *msg)
                                           lnet_msg_t, msg_list);
                         list_del(&msg2->msg_list);
                         
-                        LASSERT (msg2->msg_delayed);
-                        
                         (void) lnet_post_routed_recv_locked(msg2, 1);
                 }
         }
@@ -1159,6 +1134,7 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
         LASSERT (msg->msg_txpeer == NULL);
         LASSERT (!msg->msg_sending);
         LASSERT (!msg->msg_target_is_router);
+        LASSERT (!msg->msg_receiving);
 
         msg->msg_sending = 1;
 
@@ -1299,17 +1275,6 @@ lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg)
 
         msg->msg_txpeer = lp;                   /* msg takes my ref on lp */
 
-        if (!msg->msg_delayed &&
-            (lp->lp_txcredits <= 0 || src_ni->ni_txcredits <= 0)) {
-                rc = lnet_eager_recv_locked(msg);
-                if (rc != 0) {
-                        msg->msg_txpeer = NULL;
-                        lnet_peer_decref_locked(lp);
-                        LNET_UNLOCK();
-                        return rc;
-                }
-        }
-        
         rc = lnet_post_send_locked(msg, 0);
         LNET_UNLOCK();
 
@@ -1409,7 +1374,7 @@ lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
 }
 
 static int
-lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg)
+lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
 {
         lnet_hdr_t        *hdr = &msg->msg_hdr;
         unsigned int       mlength = 0;
@@ -1453,24 +1418,21 @@ lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg)
         msg->msg_ev.target.nid = hdr->dest_nid;
         msg->msg_ev.hdr_data = 0;
 
-        /* set msg_recvaftersend so the incoming message is consumed (by
-         * calling lnet_ni_recv()) in lnet_ni_send() AFTER lnd_send() has been
-         * called.  This ensures that the LND can rely on the recv happening
-         * after the send so any RDMA descriptors it has stashed are still
-         * valid. */
-        msg->msg_recvaftersend = 1;
-        
+        if (rdma_get) {
+                /* The LND completes the REPLY from her recv procedure */
+                lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, 0);
+                return 0;
+        }
+
+        lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
+        msg->msg_receiving = 0;
+                             
         rc = lnet_send(ni->ni_nid, msg);
         if (rc < 0) {
                 /* didn't get as far as lnet_ni_send() */
                 CERROR("%s: Unable to send REPLY for GET from %s: %d\n",
                        libcfs_nid2str(ni->ni_nid), libcfs_id2str(src), rc);
 
-                /* consume to release LND resources */
-                lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
-
-                msg->msg_recvaftersend = 0;
-                msg->msg_receiving = 0;
                 lnet_finalize(ni, msg, rc);
         }
 
@@ -1681,7 +1643,8 @@ lnet_print_hdr(lnet_hdr_t * hdr)
 
 
 int
-lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid, void *private)
+lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid, 
+           void *private, int rdma_req)
 {
         int            rc = 0;
         int            for_me;
@@ -1738,7 +1701,7 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid, void *private)
         if (!for_me) {
                 if (the_lnet.ln_ptlcompat > 0) {
                         /* portals compatibility is single-network */
-                        CERROR ("%s, %s: Bad dest nid %s "
+                        CERROR ("%s, src %s: Bad dest nid %s "
                                 "(routing not supported)\n",
                                 libcfs_nid2str(from_nid),
                                 libcfs_nid2str(src_nid),
@@ -1749,7 +1712,7 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid, void *private)
                 if (the_lnet.ln_ptlcompat == 0 &&
                     LNET_NIDNET(dest_nid) == LNET_NIDNET(ni->ni_nid)) {
                         /* should have gone direct */
-                        CERROR ("%s, %s: Bad dest nid %s "
+                        CERROR ("%s, src %s: Bad dest nid %s "
                                 "(should have been sent direct)\n",
                                 libcfs_nid2str(from_nid),
                                 libcfs_nid2str(src_nid),
@@ -1761,7 +1724,7 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid, void *private)
                     lnet_islocalnid(dest_nid)) {
                         /* dest is another local NI; sender should have used
                          * this node's NID on its own network */
-                        CERROR ("%s, %s: Bad dest nid %s "
+                        CERROR ("%s, src %s: Bad dest nid %s "
                                 "(it's my nid but on a different network)\n",
                                 libcfs_nid2str(from_nid),
                                 libcfs_nid2str(src_nid),
@@ -1769,8 +1732,17 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid, void *private)
                         return -EPROTO;
                 }
 
+                if (rdma_req && type == LNET_MSG_GET) {
+                        CERROR ("%s, src %s: Bad optimized GET for %s "
+                                "(final destination must be me)\n",
+                                libcfs_nid2str(from_nid),
+                                libcfs_nid2str(src_nid),
+                                libcfs_nid2str(dest_nid));
+                        return -EPROTO;
+                }
+                
                 if (!the_lnet.ln_routing) {
-                        CERROR ("%s, %s: Dropping message for %s "
+                        CERROR ("%s, src %s: Dropping message for %s "
                                 "(routing not enabled)\n",
                                 libcfs_nid2str(from_nid),
                                 libcfs_nid2str(src_nid),
@@ -1864,7 +1836,7 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid, void *private)
                 rc = lnet_parse_put(ni, msg);
                 break;
         case LNET_MSG_GET:
-                rc = lnet_parse_get(ni, msg);
+                rc = lnet_parse_get(ni, msg, rdma_req);
                 break;
         case LNET_MSG_REPLY:
                 rc = lnet_parse_reply(ni, msg);
index 45c2afe..f5a84ec 100644 (file)
@@ -145,14 +145,13 @@ lnet_finalize (lnet_ni_t *ni, lnet_msg_t *msg, int status)
         if (msg == NULL)
                 return;
 #if 0
-        CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
+        CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
                lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
                msg->msg_target_is_router ? "t" : "",
                msg->msg_routing ? "X" : "",
                msg->msg_ack ? "A" : "",
                msg->msg_sending ? "S" : "",
                msg->msg_receiving ? "R" : "",
-               msg->msg_recvaftersend ? "g" : "",
                msg->msg_delayed ? "d" : "",
                msg->msg_txcredit ? "C" : "",
                msg->msg_peertxcredit ? "c" : "",
index 43b4f0d..33e374f 100644 (file)
@@ -29,7 +29,7 @@ lolnd_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
         LASSERT (!lntmsg->msg_routing);
         LASSERT (!lntmsg->msg_target_is_router);
 
-        rc = lnet_parse(ni, &lntmsg->msg_hdr, ni->ni_nid, lntmsg);
+        rc = lnet_parse(ni, &lntmsg->msg_hdr, ni->ni_nid, lntmsg, 0);
         if (rc >= 0)
                 lnet_finalize(ni, lntmsg, 0);
         
index 0a2287e..ad98b33 100644 (file)
@@ -98,7 +98,6 @@ typedef struct
         ptllnd_peer_t             *rx_peer;
         kptl_msg_t                *rx_msg;
         int                        rx_nob;
-        int                        rx_replied;
         char                       rx_space[0];
 } ptllnd_rx_t;
 
index dfa2ce5..08f5ed3 100644 (file)
@@ -871,40 +871,7 @@ ptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg)
                 PJK_UT_MSG("<<< rc=%d\n",rc);
                 return rc;
 
-        case LNET_MSG_REPLY: {
-                ptllnd_rx_t *rx = private;      /* incoming GET */
-                LASSERT (rx != NULL);
-
-                PJK_UT_MSG("LNET_MSG_REPLY rx=%p\n",rx);
-
-                if (rx->rx_msg->ptlm_type == PTLLND_MSG_TYPE_GET) {
-                        __u64        matchbits;
-
-                        matchbits = rx->rx_msg->ptlm_u.req.kptlrm_matchbits;
-                        PJK_UT_MSG("matchbits="LPX64"\n",matchbits);
-
-                        LASSERT (!rx->rx_replied);
-                        rc = ptllnd_active_rdma(plp, PTLLND_RDMA_WRITE, msg,
-                                                matchbits,
-                                                msg->msg_niov, msg->msg_iov,
-                                                msg->msg_offset, msg->msg_len);
-                        rx->rx_replied = (rc == 0);
-                        ptllnd_peer_decref(plp);
-                        PJK_UT_MSG("<<< rc=%d\n",rc);
-                        return rc;
-                }
-
-                if (rx->rx_msg->ptlm_type != PTLLND_MSG_TYPE_IMMEDIATE) {
-                        CERROR("Reply to %s bad msg type %x!!!\n",
-                               libcfs_id2str(msg->msg_target),
-                               rx->rx_msg->ptlm_type);
-                        ptllnd_peer_decref(plp);
-                        return -EPROTO;
-                }
-
-                /* fall through to handle like PUT */
-        }
-
+        case LNET_MSG_REPLY:
         case LNET_MSG_PUT:
                 PJK_UT_MSG("LNET_MSG_PUT nob=%d\n",msg->msg_len);
                 nob = msg->msg_len;
@@ -972,6 +939,9 @@ ptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
         ptllnd_rx_t *stackrx = private;
         ptllnd_rx_t *heaprx;
 
+        /* Shouldn't get here; recvs only block for router buffers */
+        LBUG();
+        
         PJK_UT_MSG("rx=%p (stack)\n", stackrx);
 
         /* Don't ++plni_nrxs: heaprx replaces stackrx */
@@ -1039,9 +1009,20 @@ ptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,
 
         case PTLLND_MSG_TYPE_GET:
                 PJK_UT_MSG("PTLLND_MSG_TYPE_GET\n");
-                LASSERT (msg == NULL);          /* no need to finalize */
-                if (!rx->rx_replied)            /* peer will time out */
+                if (msg != NULL) {
+                        /* matched! */
+                        PJK_UT_MSG("matchbits="LPX64"\n",
+                                   rx->rx_msg->ptlm_u.req.kptlrm_matchbits);
+
+                        rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, msg,
+                                                rx->rx_msg->ptlm_u.req.kptlrm_matchbits,
+                                                msg->msg_niov, msg->msg_iov,
+                                                msg->msg_offset, msg->msg_len);
+                        PJK_UT_MSG("<<< rc=%d\n",rc);
+                        break;
+                } else {
                         ptllnd_close_peer(rx->rx_peer);
+                }
                 break;
         }
 
@@ -1236,7 +1217,6 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
         rx.rx_peer      = plp;
         rx.rx_msg       = msg;
         rx.rx_nob       = nob;
-        rx.rx_replied   = 0;
         plni->plni_nrxs++;
 
         PJK_UT_MSG("rx=%p type=%d\n",&rx,msg->ptlm_type);
@@ -1251,7 +1231,7 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
                 PJK_UT_MSG("PTLLND_MSG_TYPE_%s\n",
                         msg->ptlm_type==PTLLND_MSG_TYPE_PUT ? "PUT" : "GET");
                 rc = lnet_parse(ni, &msg->ptlm_u.req.kptlrm_hdr,
-                                msg->ptlm_srcnid, &rx);
+                                msg->ptlm_srcnid, &rx, 1);
                 PJK_UT_MSG("lnet_parse rc=%d\n",rc);
                 if (rc < 0)
                         ptllnd_rx_done(&rx);
@@ -1260,8 +1240,8 @@ ptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,
         case PTLLND_MSG_TYPE_IMMEDIATE:
                 PJK_UT_MSG("PTLLND_MSG_TYPE_IMMEDIATE\n");
                 rc = lnet_parse(ni, &msg->ptlm_u.immediate.kptlim_hdr,
-                                msg->ptlm_srcnid, &rx);
-                PJK_UT_MSG("lnet_parse rc=%d\n",rc);
+                                msg->ptlm_srcnid, &rx, 0);
+                PJK_UT_MSG("lnet_parse rc=%d\n",rc, 0);
                 if (rc < 0)
                         ptllnd_rx_done(&rx);
                 break;
index 742c087..2966c52 100644 (file)
@@ -210,7 +210,7 @@ static int from_connection(void *a, void *d)
                 hdr.dest_nid = cpu_to_le64(b->b_ni->ni_nid);
                 hdr.dest_pid = cpu_to_le32(the_lnet.ln_pid);
 
-                rc = lnet_parse(b->b_ni, &hdr, c->peer_nid, c);
+                rc = lnet_parse(b->b_ni, &hdr, c->peer_nid, c, 0);
                 if (rc < 0) {
                         CERROR("Error %d from lnet_parse\n", rc);
                         return 0;
index 9cd3f25..66d310e 100644 (file)
@@ -20,6 +20,8 @@ sbin_PROGRAMS = debugctl
 lib_LIBRARIES = libptlctl.a
 
 libptlctl_a_SOURCES = portals.c nidstrings.c debug.c l_ioctl.c parser.c parser.h
+libptlctl_a_CPPFLAGS = $(LLCPPFLAGS)
+libptlctl_a_CFLAGS = $(LLCFLAGS) -DLUSTRE_UTILS=1
 
 if UTILS
 sbin_PROGRAMS += ptlctl routerstat wirecheck 
index 3f6263b..febe89a 100644 (file)
@@ -115,7 +115,7 @@ do_stat (int fd)
           counter.route_length = subull(new_counter.route_length, old_counter.route_length);
           counter.drop_length = subull(new_counter.drop_length, old_counter.drop_length);
 
-          printf ("M %lu(%lu) E %0.0f S %7.2f/%6.0f R %7.2f/%6.0f F %7.2f/%6.0f D %4.2f/%0.0f\n",
+          printf ("M %3lu(%3lu) E %0.0f S %7.2f/%6.0f R %7.2f/%6.0f F %7.2f/%6.0f D %4.2f/%0.0f\n",
                   counter.msgs_alloc, counter.msgs_max,
                   rul(counter.errors,t),
                   rull(counter.send_length,t*1024.0*1024.0), rul(counter.send_count, t),