tempiovec[niov].iov_len = min((int)(payload_iov->iov_len - payload_offset),
(int)payload_nob);
+ PJK_UT_MSG("iov_base[%d]=%p\n",niov,tempiovec[niov].iov_base);
+ PJK_UT_MSG("iov_len[%d] =%d\n",niov,tempiovec[niov].iov_len);
+
payload_offset = 0;
payload_nob -= tempiovec[niov].iov_len;
payload_iov++;
}
} else {
+ PJK_UT_MSG_DATA("Mapping KIOVs tx=%p\n",tx);
+
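+ /* first skip over any kiov fragments wholly consumed by the offset */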
while (payload_offset >= payload_kiov->kiov_len) {
payload_offset -= payload_kiov->kiov_len;
payload_kiov++;
ptr = cfs_kmap(payload_kiov->kiov_page);
- ptr += payload_kiov->kiov_offset + payload_offset;
+ LASSERT(ptr != NULL);
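+ /* keep ptr at the start of the fragment (page mapping plus
+  * kiov_offset); payload_offset only applies to this first
+  * fragment, so fold it into iov_base instead */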
+ ptr += payload_kiov->kiov_offset;
- tempiovec[niov].iov_base = ptr;
+ tempiovec[niov].iov_base = ptr + payload_offset;
tempiovec[niov].iov_len = min((int)(payload_kiov->kiov_len - payload_offset),
(int)payload_nob);
+ PJK_UT_MSG("iov_base[%d]=%p\n",niov,tempiovec[niov].iov_base);
+ PJK_UT_MSG("iov_len[%d] =%d\n",niov,tempiovec[niov].iov_len);
+
payload_offset = 0;
payload_nob -= tempiovec[niov].iov_len;
payload_kiov++;
if (tx->tx_mapped_kiov == 0)
return;
+ PJK_UT_MSG("Un Mapping KIOVs tx=%p\n",tx);
+
if (payload_kiov != NULL) {
LASSERT(tx->tx_payload_iov == NULL);
void
-kptlnd_do_put(
+kptllnd_do_put(
kptl_tx_t *tx,
lnet_msg_t *lntmsg,
- lnet_hdr_t *hdr,
- kptl_data_t *kptllnd_data,
- lnet_process_id_t target,
- unsigned int payload_niov,
- struct iovec *payload_iov,
- lnet_kiov_t *payload_kiov,
- unsigned int payload_offset,
- unsigned int payload_nob)
+ kptl_data_t *kptllnd_data)
{
LASSERT(tx != NULL);
- tx->tx_payload_niov = payload_niov;
- tx->tx_payload_iov = payload_iov;
- tx->tx_payload_kiov = payload_kiov;
- tx->tx_payload_offset = payload_offset;
- tx->tx_payload_nob = payload_nob;
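+ /* the payload description comes straight from lntmsg now; the
+  * separate parameters were redundant copies of the same fields */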
+ tx->tx_payload_niov = lntmsg->msg_niov;
+ tx->tx_payload_iov = lntmsg->msg_iov;
+ tx->tx_payload_kiov = lntmsg->msg_kiov;
+ tx->tx_payload_offset = lntmsg->msg_offset;
+ tx->tx_payload_nob = lntmsg->msg_len;
- tx->tx_msg->ptlm_u.req.kptlrm_hdr = *hdr;
+ tx->tx_msg->ptlm_u.req.kptlrm_hdr = lntmsg->msg_hdr;
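+ /* only this request message travels inline; the payload itself
+  * follows as a separate bulk transfer */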
kptllnd_init_msg (tx->tx_msg,
PTLLND_MSG_TYPE_PUT,
sizeof(kptl_request_msg_t));
- kptllnd_tx_launch(tx, target.nid,lntmsg);
+ kptllnd_tx_launch(tx, lntmsg->msg_target.nid, lntmsg);
}
int
if (nob <= *kptllnd_tunables.kptl_max_immd_size)
break;
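+ /* payload too large to send as an immediate message: arrange a PUT instead */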
- kptlnd_do_put(tx,lntmsg,hdr,kptllnd_data,target,
- payload_niov,payload_iov,
- payload_kiov,payload_offset,payload_nob);
+ kptllnd_do_put(tx, lntmsg, kptllnd_data);
PJK_UT_MSG_DATA("<<< SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS\n");
return 0;
if (nob <= *kptllnd_tunables.kptl_max_immd_size)
break;
- kptlnd_do_put(tx,lntmsg,hdr,kptllnd_data,target,
- payload_niov,payload_iov,
- payload_kiov,payload_offset,payload_nob);
+ kptllnd_do_put(tx, lntmsg, kptllnd_data);
PJK_UT_MSG_DATA("<<< SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS\n");
return 0;
}
void
-kptllnd_peer_queue_tx (
- kptl_peer_t *peer,
- kptl_tx_t *tx)
-{
- spin_lock(&peer->peer_lock);
- kptllnd_peer_queue_tx_locked (peer, tx);
- spin_unlock(&peer->peer_lock);
-
- kptllnd_peer_check_sends(peer);
-}
-
-
-void
kptllnd_peer_queue_bulk_rdma_tx_locked(
kptl_peer_t *peer,
kptl_tx_t *tx)
*
* THEN it is safe to simply discard this NOOP
* and continue on.
+ *
+ * NOTE: We can't be holding the lock while calling
+ * kptllnd_tx_decref because that will call lnet_finalize()
+ * which cannot be called while holding a lock.
*/
if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
(!list_empty(&peer->peer_pending_txs) ||
peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {
+ spin_unlock(&peer->peer_lock);
/* redundant NOOP */
kptllnd_tx_decref(tx);
CDEBUG(D_NET, LPX64": redundant noop\n",
peer->peer_nid);
+ spin_lock(&peer->peer_lock);
continue;
}
* (which could send it)
*/
if (peer != NULL) {
- kptllnd_peer_queue_tx ( peer, tx );
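+ /* queue under the peer lock, but call check_sends after dropping it */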
+ spin_lock(&peer->peer_lock);
+ kptllnd_peer_queue_tx_locked(peer, tx);
+ spin_unlock(&peer->peer_lock);
+ kptllnd_peer_check_sends(peer);
kptllnd_peer_decref(peer,"find");
PJK_UT_MSG("<<< FOUND\n");
return;
CDEBUG(D_TRACE,"HELLO message race occurred (nid="LPX64")\n",target_nid);
- kptllnd_peer_queue_tx ( peer, tx );
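+ /* same pattern as above: queue while locked, then kick the sender unlocked */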
+ spin_lock(&peer->peer_lock);
+ kptllnd_peer_queue_tx_locked(peer, tx);
+ spin_unlock(&peer->peer_lock);
+ kptllnd_peer_check_sends(peer);
kptllnd_peer_decref(peer,"find");
/* and we don't need the connection tx */
* the tx will wait for a reply.
*/
PJK_UT_MSG("TXHello=%p\n",tx_hello);
+
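+ /* take the lock once so the HELLO and the original tx are queued
+  * back-to-back and stay in order */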
+ spin_lock(&peer->peer_lock);
kptllnd_peer_queue_tx_locked(peer,tx_hello);
kptllnd_peer_queue_tx_locked(peer,tx);
+ spin_unlock(&peer->peer_lock);
write_unlock_irqrestore(g_lock,flags);