From 232f9d2f504d94744737e685f56a804d4b4d71a9 Mon Sep 17 00:00:00 2001
From: pjkirner
Date: Wed, 21 Sep 2005 20:41:14 +0000
Subject: [PATCH] * Cleaned up send path
 * Fixed lock being held while calling lnet_finalize (only in error path)
 * Fixed calls to kptllnd_peer_queue_tx_locked() to actually hold the lock.

---
 lnet/klnds/ptllnd/ptllnd.h      |  5 -----
 lnet/klnds/ptllnd/ptllnd_cb.c   | 50 +++++++++++++++++++++--------------------
 lnet/klnds/ptllnd/ptllnd_peer.c | 32 +++++++++++++-------------
 3 files changed, 43 insertions(+), 44 deletions(-)

diff --git a/lnet/klnds/ptllnd/ptllnd.h b/lnet/klnds/ptllnd/ptllnd.h
index 883322a..2ab40ff 100755
--- a/lnet/klnds/ptllnd/ptllnd.h
+++ b/lnet/klnds/ptllnd/ptllnd.h
@@ -512,11 +512,6 @@ kptllnd_peer_cancel(
         kptl_peer_t *peer);
 
 void
-kptllnd_peer_queue_tx (
-        kptl_peer_t *peer,
-        kptl_tx_t *tx);
-
-void
 kptllnd_peer_queue_bulk_rdma_tx_locked(
         kptl_peer_t *peer,
         kptl_tx_t *tx);
diff --git a/lnet/klnds/ptllnd/ptllnd_cb.c b/lnet/klnds/ptllnd/ptllnd_cb.c
index 817e5b7..7313758 100644
--- a/lnet/klnds/ptllnd/ptllnd_cb.c
+++ b/lnet/klnds/ptllnd/ptllnd_cb.c
@@ -95,6 +95,9 @@ kptllnd_setup_md(
                         tempiovec[niov].iov_len = min((int)(payload_iov->iov_len - payload_offset),
                                 (int)payload_nob);
 
+                        PJK_UT_MSG("iov_base[%d]=%p\n",niov,tempiovec[niov].iov_base);
+                        PJK_UT_MSG("iov_len[%d] =%d\n",niov,tempiovec[niov].iov_len);
+
                         payload_offset = 0;
                         payload_nob -= tempiovec[niov].iov_len;
                         payload_iov++;
@@ -103,6 +106,9 @@
                 }
         }else{
 
+                PJK_UT_MSG_DATA("Mapping KIOVs tx=%p\n",tx);
+
+
                 while (payload_offset >= payload_kiov->kiov_len) {
                         payload_offset -= payload_kiov->kiov_len;
                         payload_kiov++;
@@ -119,12 +125,17 @@
 
                         ptr = cfs_kmap(payload_kiov->kiov_page);
 
-                        ptr += payload_kiov->kiov_offset + payload_offset;
+                        LASSERT( ptr != 0);
+                        ptr += payload_kiov->kiov_offset;
 
-                        tempiovec[niov].iov_base = ptr;
+                        tempiovec[niov].iov_base = ptr + payload_offset;
                         tempiovec[niov].iov_len = min((int)(payload_kiov->kiov_len - payload_offset),
                                 (int)payload_nob);
 
+                        PJK_UT_MSG("iov_base[%d]=%p\n",niov,tempiovec[niov].iov_base);
+                        PJK_UT_MSG("iov_len[%d] =%d\n",niov,tempiovec[niov].iov_len);
+
+
                         payload_offset = 0;
                         payload_nob -= tempiovec[niov].iov_len;
                         payload_kiov++;
@@ -163,6 +174,8 @@ kptllnd_cleanup_kiov(
         if(tx->tx_mapped_kiov == 0)
                 return;
 
+        PJK_UT_MSG("Unmapping KIOVs tx=%p\n",tx);
+
         if (payload_kiov != NULL){
 
                 LASSERT(tx->tx_payload_iov == NULL);
@@ -365,31 +378,24 @@ end:
 
 void
-kptlnd_do_put(
+kptllnd_do_put(
         kptl_tx_t *tx,
         lnet_msg_t *lntmsg,
-        lnet_hdr_t *hdr,
-        kptl_data_t *kptllnd_data,
-        lnet_process_id_t target,
-        unsigned int payload_niov,
-        struct iovec *payload_iov,
-        lnet_kiov_t *payload_kiov,
-        unsigned int payload_offset,
-        unsigned int payload_nob)
+        kptl_data_t *kptllnd_data)
 {
         LASSERT(tx != NULL);
 
-        tx->tx_payload_niov = payload_niov;
-        tx->tx_payload_iov = payload_iov;
-        tx->tx_payload_kiov = payload_kiov;
-        tx->tx_payload_offset = payload_offset;
-        tx->tx_payload_nob = payload_nob;
+        tx->tx_payload_niov = lntmsg->msg_niov;
+        tx->tx_payload_iov = lntmsg->msg_iov;
+        tx->tx_payload_kiov = lntmsg->msg_kiov;
+        tx->tx_payload_offset = lntmsg->msg_offset;
+        tx->tx_payload_nob = lntmsg->msg_len;
 
-        tx->tx_msg->ptlm_u.req.kptlrm_hdr = *hdr;
+        tx->tx_msg->ptlm_u.req.kptlrm_hdr = lntmsg->msg_hdr;
 
         kptllnd_init_msg (tx->tx_msg, PLTLND_MSG_TYPE_PUT,
                 sizeof(kptl_request_msg_t));
 
-        kptllnd_tx_launch(tx, target.nid,lntmsg);
+        kptllnd_tx_launch(tx, lntmsg->msg_target.nid,lntmsg);
 }
 
 int
@@ -464,9 +470,7 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 if (nob <= *kptllnd_tunables.kptl_max_immd_size)
                         break;
 
-                kptlnd_do_put(tx,lntmsg,hdr,kptllnd_data,target,
-                        payload_niov,payload_iov,
-                        payload_kiov,payload_offset,payload_nob);
+                kptllnd_do_put(tx,lntmsg,kptllnd_data);
 
                 PJK_UT_MSG_DATA("<<< SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS\n");
                 return 0;
@@ -544,9 +548,7 @@ kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
                 if (nob <= *kptllnd_tunables.kptl_max_immd_size)
                         break;
 
-                kptlnd_do_put(tx,lntmsg,hdr,kptllnd_data,target,
-                        payload_niov,payload_iov,
-                        payload_kiov,payload_offset,payload_nob);
+                kptllnd_do_put(tx,lntmsg,kptllnd_data);
 
                 PJK_UT_MSG_DATA("<<< SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS\n");
                 return 0;
diff --git a/lnet/klnds/ptllnd/ptllnd_peer.c b/lnet/klnds/ptllnd/ptllnd_peer.c
index 91f4066..2e6f879 100644
--- a/lnet/klnds/ptllnd/ptllnd_peer.c
+++ b/lnet/klnds/ptllnd/ptllnd_peer.c
@@ -419,19 +419,6 @@ kptllnd_peer_queue_tx_locked (
 }
 
 void
-kptllnd_peer_queue_tx (
-        kptl_peer_t *peer,
-        kptl_tx_t *tx)
-{
-        spin_lock(&peer->peer_lock);
-        kptllnd_peer_queue_tx_locked (peer, tx);
-        spin_unlock(&peer->peer_lock);
-
-        kptllnd_peer_check_sends(peer);
-}
-
-
-void
 kptllnd_peer_queue_bulk_rdma_tx_locked(
         kptl_peer_t *peer,
         kptl_tx_t *tx)
@@ -567,14 +554,20 @@ kptllnd_peer_check_sends (
                  *
                  * THEN it is safe to simply discard this NOOP
                  * and continue one.
+                 *
+                 * NOTE: We can't be holding the lock while calling
+                 * kptllnd_tx_decref, because that will call lnet_finalize(),
+                 * which cannot be called while holding a lock.
                  */
                 if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
                     (!list_empty(&peer->peer_pending_txs) ||
                      peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {
+                        spin_unlock(&peer->peer_lock);
                         /* redundant NOOP */
                         kptllnd_tx_decref(tx);
                         CDEBUG(D_NET, LPX64": redundant noop\n",
                                peer->peer_nid);
+                        spin_lock(&peer->peer_lock);
                         continue;
                 }
 
@@ -1220,7 +1213,10 @@
          * (which could send it)
          */
         if (peer != NULL) {
-                kptllnd_peer_queue_tx ( peer, tx );
+                spin_lock(&peer->peer_lock);
+                kptllnd_peer_queue_tx_locked ( peer, tx );
+                spin_unlock(&peer->peer_lock);
+                kptllnd_peer_check_sends(peer);
                 kptllnd_peer_decref(peer,"find");
                 PJK_UT_MSG("<<< FOUND\n");
                 return;
@@ -1265,7 +1261,10 @@
 
         CDEBUG(D_TRACE,"HELLO message race occurred (nid="LPX64")\n",target_nid);
 
-        kptllnd_peer_queue_tx ( peer, tx );
+        spin_lock(&peer->peer_lock);
+        kptllnd_peer_queue_tx_locked ( peer, tx );
+        spin_unlock(&peer->peer_lock);
+        kptllnd_peer_check_sends(peer);
         kptllnd_peer_decref(peer,"find");
 
         /* and we don't need the connection tx*/
@@ -1302,8 +1301,11 @@
          * the tx will wait for a reply.
          */
         PJK_UT_MSG("TXHello=%p\n",tx_hello);
+
+        spin_lock(&peer->peer_lock);
         kptllnd_peer_queue_tx_locked(peer,tx_hello);
         kptllnd_peer_queue_tx_locked(peer,tx);
+        spin_unlock(&peer->peer_lock);
 
         write_unlock_irqrestore(g_lock,flags);
-- 
1.8.3.1
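
Note on the locking rule this patch enforces: queue a tx while holding peer->peer_lock, but call kptllnd_peer_check_sends() and anything that can reach lnet_finalize() only after dropping it. The pattern can be illustrated with a small user-space analogue. What follows is a minimal sketch using pthreads; every name in it (struct peer, tx_finalize, peer_launch_tx, peer_drain) is hypothetical and merely stands in for the corresponding ptllnd construct. It is not code from the Lustre tree.

/*
 * User-space analogue of the locking discipline above.
 * All names are illustrative; none of this is ptllnd code.
 */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct tx {
        struct tx *next;
        int id;
};

struct peer {
        pthread_mutex_t lock;   /* plays the role of peer->peer_lock */
        struct tx *pending;     /* plays the role of peer_pending_txs */
};

/* Stands in for lnet_finalize(): must never run with peer->lock held,
 * because in the real stack it may take other locks or sleep. */
static void tx_finalize(struct tx *tx)
{
        printf("finalized tx %d\n", tx->id);
        free(tx);
}

/* Analogue of kptllnd_peer_queue_tx_locked(): caller holds peer->lock. */
static void peer_queue_tx_locked(struct peer *peer, struct tx *tx)
{
        tx->next = peer->pending;
        peer->pending = tx;
}

/* Analogue of the fixed kptllnd_tx_launch() paths: hold the lock only
 * around the queue manipulation, then kick the sender unlocked. */
static void peer_launch_tx(struct peer *peer, struct tx *tx)
{
        pthread_mutex_lock(&peer->lock);
        peer_queue_tx_locked(peer, tx);
        pthread_mutex_unlock(&peer->lock);
        /* kptllnd_peer_check_sends() would run here, without the lock */
}

/* Analogue of the fixed redundant-NOOP path in kptllnd_peer_check_sends():
 * drop the lock across the finalize call, retake it to continue the scan. */
static void peer_drain(struct peer *peer)
{
        pthread_mutex_lock(&peer->lock);
        while (peer->pending != NULL) {
                struct tx *tx = peer->pending;
                peer->pending = tx->next;

                pthread_mutex_unlock(&peer->lock);
                tx_finalize(tx);                /* unlocked, as required */
                pthread_mutex_lock(&peer->lock);
        }
        pthread_mutex_unlock(&peer->lock);
}

int main(void)
{
        struct peer peer = { PTHREAD_MUTEX_INITIALIZER, NULL };

        for (int i = 0; i < 3; i++) {
                struct tx *tx = calloc(1, sizeof(*tx));
                tx->id = i;
                peer_launch_tx(&peer, tx);
        }
        peer_drain(&peer);
        return 0;
}

The property the error path previously violated is visible in peer_drain(): the finalize callback always runs with the mutex released, and after the lock is retaken the loop re-reads the list head rather than trusting anything observed before the unlock. That re-examination is why the kernel code uses continue to restart its scan after dropping and retaking peer_lock.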