Whamcloud - gitweb
* Cleanedup send path
authorpjkirner <pjkirner>
Wed, 21 Sep 2005 20:41:14 +0000 (20:41 +0000)
committerpjkirner <pjkirner>
Wed, 21 Sep 2005 20:41:14 +0000 (20:41 +0000)
* Fixed lock being heald while calling lnet_finalize (only in error path)
* Fixed calls to kptllnd_peer_queue_tx_locked(), to actually hold the lock.

lnet/klnds/ptllnd/ptllnd.h
lnet/klnds/ptllnd/ptllnd_cb.c
lnet/klnds/ptllnd/ptllnd_peer.c

index 883322a..2ab40ff 100755 (executable)
@@ -512,11 +512,6 @@ kptllnd_peer_cancel(
         kptl_peer_t *peer);
 
 void
-kptllnd_peer_queue_tx (
-        kptl_peer_t *peer,
-        kptl_tx_t *tx);
-
-void
 kptllnd_peer_queue_bulk_rdma_tx_locked(
         kptl_peer_t *peer,
         kptl_tx_t *tx);
index 817e5b7..7313758 100644 (file)
@@ -95,6 +95,9 @@ kptllnd_setup_md(
                         tempiovec[niov].iov_len  = min((int)(payload_iov->iov_len - payload_offset),
                                                 (int)payload_nob);
 
+                        PJK_UT_MSG("iov_base[%d]=%p\n",niov,tempiovec[niov].iov_base);
+                        PJK_UT_MSG("iov_len[%d] =%d\n",niov,tempiovec[niov].iov_len);
+
                         payload_offset = 0;
                         payload_nob -= tempiovec[niov].iov_len;
                         payload_iov++;
@@ -103,6 +106,9 @@ kptllnd_setup_md(
                 }
         }else{
 
+                PJK_UT_MSG_DATA("Mapping KIOVs tx=%p\n",tx);
+
+
                 while (payload_offset >= payload_kiov->kiov_len) {
                         payload_offset -= payload_kiov->kiov_len;
                         payload_kiov++;
@@ -119,12 +125,17 @@ kptllnd_setup_md(
 
 
                         ptr = cfs_kmap(payload_kiov->kiov_page);
-                        ptr += payload_kiov->kiov_offset + payload_offset;
+                        LASSERT( ptr != 0);
+                        ptr += payload_kiov->kiov_offset;
 
-                        tempiovec[niov].iov_base = ptr;
+                        tempiovec[niov].iov_base = ptr + payload_offset;
                         tempiovec[niov].iov_len = min((int)(payload_kiov->kiov_len - payload_offset),
                                                         (int)payload_nob);
 
+                        PJK_UT_MSG("iov_base[%d]=%p\n",niov,tempiovec[niov].iov_base);
+                        PJK_UT_MSG("iov_len[%d] =%d\n",niov,tempiovec[niov].iov_len);
+
+
                         payload_offset = 0;
                         payload_nob -= tempiovec[niov].iov_len;
                         payload_kiov++;
@@ -163,6 +174,8 @@ kptllnd_cleanup_kiov(
         if(tx->tx_mapped_kiov == 0)
                 return;
 
+        PJK_UT_MSG("Un Mapping KIOVs tx=%p\n",tx);
+
         if (payload_kiov != NULL){
 
                 LASSERT(tx->tx_payload_iov == NULL);
@@ -365,31 +378,24 @@ end:
 
 
 void
-kptlnd_do_put(
+kptllnd_do_put(
         kptl_tx_t       *tx,
         lnet_msg_t      *lntmsg,
-        lnet_hdr_t      *hdr,
-        kptl_data_t     *kptllnd_data,
-        lnet_process_id_t target,
-        unsigned int     payload_niov,
-        struct iovec    *payload_iov,
-        lnet_kiov_t     *payload_kiov,
-        unsigned int     payload_offset,
-        unsigned int     payload_nob)
+        kptl_data_t     *kptllnd_data)
 {
         LASSERT(tx != NULL);
 
-        tx->tx_payload_niov     = payload_niov;
-        tx->tx_payload_iov      = payload_iov;
-        tx->tx_payload_kiov     = payload_kiov;
-        tx->tx_payload_offset   = payload_offset;
-        tx->tx_payload_nob      = payload_nob;
+        tx->tx_payload_niov     = lntmsg->msg_niov;
+        tx->tx_payload_iov      = lntmsg->msg_iov;
+        tx->tx_payload_kiov     = lntmsg->msg_kiov;
+        tx->tx_payload_offset   = lntmsg->msg_offset;
+        tx->tx_payload_nob      = lntmsg->msg_len;
 
-        tx->tx_msg->ptlm_u.req.kptlrm_hdr = *hdr;
+        tx->tx_msg->ptlm_u.req.kptlrm_hdr = lntmsg->msg_hdr;
         kptllnd_init_msg (tx->tx_msg,
                           PLTLND_MSG_TYPE_PUT,
                           sizeof(kptl_request_msg_t));
-        kptllnd_tx_launch(tx, target.nid,lntmsg);
+        kptllnd_tx_launch(tx, lntmsg->msg_target.nid,lntmsg);
 }
 
 int
@@ -464,9 +470,7 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 if (nob <= *kptllnd_tunables.kptl_max_immd_size)
                         break;
 
-                kptlnd_do_put(tx,lntmsg,hdr,kptllnd_data,target,
-                                payload_niov,payload_iov,
-                                payload_kiov,payload_offset,payload_nob);
+                kptllnd_do_put(tx,lntmsg,kptllnd_data);
 
                 PJK_UT_MSG_DATA("<<< SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS\n");
                 return 0;
@@ -544,9 +548,7 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                         if (nob <= *kptllnd_tunables.kptl_max_immd_size)
                                 break;
 
-                        kptlnd_do_put(tx,lntmsg,hdr,kptllnd_data,target,
-                                        payload_niov,payload_iov,
-                                        payload_kiov,payload_offset,payload_nob);
+                        kptllnd_do_put(tx,lntmsg,kptllnd_data);
 
                         PJK_UT_MSG_DATA("<<< SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS\n");
                         return 0;
index 91f4066..2e6f879 100644 (file)
@@ -419,19 +419,6 @@ kptllnd_peer_queue_tx_locked (
 }
 
 void
-kptllnd_peer_queue_tx (
-        kptl_peer_t *peer,
-        kptl_tx_t *tx)
-{
-        spin_lock(&peer->peer_lock);
-        kptllnd_peer_queue_tx_locked (peer, tx);
-        spin_unlock(&peer->peer_lock);
-
-        kptllnd_peer_check_sends(peer);
-}
-
-
-void
 kptllnd_peer_queue_bulk_rdma_tx_locked(
         kptl_peer_t *peer,
         kptl_tx_t *tx)
@@ -567,14 +554,20 @@ kptllnd_peer_check_sends (
                  *
                  * THEN it is safe to simply discard this NOOP
                  * and continue one.
+                 *
+                 * NOTE: We can't be holding the lock while calling
+                 * kptllnd_tx_decref because that will call lnet_finalize()
+                 * which can not be called while loding a lock.
                  */
                 if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
                     (!list_empty(&peer->peer_pending_txs) ||
                      peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {
+                        spin_unlock(&peer->peer_lock);
                         /* redundant NOOP */
                         kptllnd_tx_decref(tx);
                         CDEBUG(D_NET, LPX64": redundant noop\n",
                                peer->peer_nid);
+                        spin_lock(&peer->peer_lock);
                         continue;
                 }
 
@@ -1220,7 +1213,10 @@ kptllnd_tx_launch (
          * (which could send it)
          */
         if (peer != NULL) {
-                kptllnd_peer_queue_tx ( peer, tx );
+                spin_lock(&peer->peer_lock);
+                kptllnd_peer_queue_tx_locked ( peer, tx );
+                spin_unlock(&peer->peer_lock);
+                kptllnd_peer_check_sends(peer);
                 kptllnd_peer_decref(peer,"find");
                 PJK_UT_MSG("<<< FOUND\n");
                 return;
@@ -1265,7 +1261,10 @@ kptllnd_tx_launch (
 
                 CDEBUG(D_TRACE,"HELLO message race occurred (nid="LPX64")\n",target_nid);
 
-                kptllnd_peer_queue_tx ( peer, tx );
+                spin_lock(&peer->peer_lock);
+                kptllnd_peer_queue_tx_locked ( peer, tx );
+                spin_unlock(&peer->peer_lock);
+                kptllnd_peer_check_sends(peer);
                 kptllnd_peer_decref(peer,"find");
 
                 /* and we don't need the connection tx*/
@@ -1302,8 +1301,11 @@ kptllnd_tx_launch (
          * the tx will wait for a reply.
          */
         PJK_UT_MSG("TXHello=%p\n",tx_hello);
+
+        spin_lock(&peer->peer_lock);
         kptllnd_peer_queue_tx_locked(peer,tx_hello);
         kptllnd_peer_queue_tx_locked(peer,tx);
+        spin_unlock(&peer->peer_lock);
 
         write_unlock_irqrestore(g_lock,flags);