}
if (!msg->msg_peertxcredit) {
+ spin_lock(&lp->lpni_lock);
LASSERT((lp->lpni_txcredits < 0) ==
!list_empty(&lp->lpni_txq));
if (lp->lpni_txcredits < 0) {
msg->msg_tx_delayed = 1;
list_add_tail(&msg->msg_list, &lp->lpni_txq);
+ spin_unlock(&lp->lpni_lock);
return LNET_CREDIT_WAIT;
}
+ spin_unlock(&lp->lpni_lock);
}
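The hunk above establishes the pattern this patch applies throughout: lpni_txcredits and lpni_txq must be read and updated together, so the credit check and the queueing decision both happen under the new lpni_lock, and the lock is released on every exit path. Below is a minimal user-space sketch of that pattern, with simplified types and a pthread mutex standing in for the kernel spinlock; none of these names are the real LNet structures.

#include <pthread.h>
#include <stddef.h>

struct msg {
	struct msg *next;
};

struct peer {
	pthread_mutex_t lock;	/* stands in for lpni_lock */
	int txcredits;		/* stands in for lpni_txcredits */
	struct msg *txq;	/* stands in for lpni_txq */
};

/* Returns 1 if m was parked to wait for a credit, 0 if it may be sent.
 * Mirrors the hunk above: lock, update credit and queue atomically,
 * unlock on both the early-return and the fall-through path. */
static int consume_tx_credit(struct peer *lp, struct msg *m)
{
	pthread_mutex_lock(&lp->lock);
	lp->txcredits--;
	if (lp->txcredits < 0) {
		/* over-subscribed: park the message; the invariant
		 * (txcredits < 0) == (txq non-empty) holds at unlock */
		m->next = lp->txq;
		lp->txq = m;
		pthread_mutex_unlock(&lp->lock);
		return 1;	/* LNET_CREDIT_WAIT in the real code */
	}
	pthread_mutex_unlock(&lp->lock);
	return 0;
}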
if (!msg->msg_txcredit) {
LASSERT(!do_recv || msg->msg_rx_delayed);
if (!msg->msg_peerrtrcredit) {
+ spin_lock(&lp->lpni_lock);
LASSERT((lp->lpni_rtrcredits < 0) ==
!list_empty(&lp->lpni_rtrq));
if (lp->lpni_rtrcredits < 0) {
LASSERT(msg->msg_rx_ready_delay);
msg->msg_rx_delayed = 1;
list_add_tail(&msg->msg_list, &lp->lpni_rtrq);
+ spin_unlock(&lp->lpni_lock);
return LNET_CREDIT_WAIT;
}
+ spin_unlock(&lp->lpni_lock);
}
rbp = lnet_msg2bufpool(msg);
LASSERT(msg2->msg_txni == ni);
LASSERT(msg2->msg_tx_delayed);
+ LASSERT(msg2->msg_tx_cpt == msg->msg_tx_cpt);
(void) lnet_post_send_locked(msg2, 1);
}
/* give back peer txcredits */
msg->msg_peertxcredit = 0;
+ spin_lock(&txpeer->lpni_lock);
LASSERT((txpeer->lpni_txcredits < 0) ==
!list_empty(&txpeer->lpni_txq));
txpeer->lpni_txqnob -= msg->msg_len + sizeof(lnet_hdr_t);
- LASSERT (txpeer->lpni_txqnob >= 0);
+ LASSERT(txpeer->lpni_txqnob >= 0);
txpeer->lpni_txcredits++;
if (txpeer->lpni_txcredits <= 0) {
msg2 = list_entry(txpeer->lpni_txq.next,
lnet_msg_t, msg_list);
list_del(&msg2->msg_list);
+ spin_unlock(&txpeer->lpni_lock);
LASSERT(msg2->msg_txpeer == txpeer);
LASSERT(msg2->msg_tx_delayed);
- (void) lnet_post_send_locked(msg2, 1);
+ if (msg2->msg_tx_cpt != msg->msg_tx_cpt) {
+ lnet_net_unlock(msg->msg_tx_cpt);
+ lnet_net_lock(msg2->msg_tx_cpt);
+ }
+ (void) lnet_post_send_locked(msg2, 1);
+ if (msg2->msg_tx_cpt != msg->msg_tx_cpt) {
+ lnet_net_unlock(msg2->msg_tx_cpt);
+ lnet_net_lock(msg->msg_tx_cpt);
+ }
+ } else {
+ spin_unlock(&txpeer->lpni_lock);
}
- }
+ }
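The interesting addition here is the lock swap around lnet_post_send_locked: the dequeued msg2 may live on a different CPT than the one whose net lock the caller holds, so the patch drops the current CPT lock, takes msg2's, posts, and swaps back so the caller still holds the lock it entered with. A sketch of that discipline, using hypothetical helpers rather than the real lnet_net_lock/lnet_net_unlock:

#include <pthread.h>

#define NCPT 4

static pthread_mutex_t cpt_lock[NCPT] = {
	PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER,
	PTHREAD_MUTEX_INITIALIZER, PTHREAD_MUTEX_INITIALIZER,
};

static void net_lock(int cpt)	{ pthread_mutex_lock(&cpt_lock[cpt]); }
static void net_unlock(int cpt)	{ pthread_mutex_unlock(&cpt_lock[cpt]); }

/* Called with cpt_lock[cur_cpt] held; returns with it held again.
 * post() runs under the lock of the CPT the message belongs to. */
static void post_on_msg_cpt(int cur_cpt, int msg_cpt, void (*post)(void))
{
	if (msg_cpt != cur_cpt) {
		net_unlock(cur_cpt);
		net_lock(msg_cpt);
	}
	post();
	if (msg_cpt != cur_cpt) {
		net_unlock(msg_cpt);
		net_lock(cur_cpt);
	}
}

Note that in the hunk above msg2 is removed from lpni_txq and lpni_lock is dropped before the swap; holding a spinlock across another lock acquisition that can spin or block would be a bug.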
if (txni != NULL) {
msg->msg_txni = NULL;
void
lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
{
- lnet_msg_t *msg;
- lnet_msg_t *tmp;
- struct list_head drop;
-
- INIT_LIST_HEAD(&drop);
-
- list_splice_init(list, &drop);
+ lnet_msg_t *msg;
+ lnet_msg_t *tmp;
lnet_net_unlock(cpt);
- list_for_each_entry_safe(msg, tmp, &drop, msg_list) {
+ list_for_each_entry_safe(msg, tmp, list, msg_list) {
lnet_ni_recv(msg->msg_rxni, msg->msg_private, NULL,
0, 0, 0, msg->msg_hdr.payload_length);
list_del_init(&msg->msg_list);
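lnet_drop_routed_msgs_locked no longer splices the queue itself: its caller now moves the peer's rtrq onto a private list while still holding lpni_lock (see the hunk below), so this function only has to drop the net lock and drain a list no other thread can reach. A user-space sketch of that splice-then-drain idea, with a simplified singly linked list and a mutex in place of the spinlock; all names here are illustrative:

#include <pthread.h>
#include <stddef.h>

struct node {
	struct node *next;
};

struct peer_q {
	pthread_mutex_t lock;	/* stands in for lpni_lock */
	struct node *head;	/* stands in for lpni_rtrq */
};

/* Steal the whole queue under the lock, then process it unlocked:
 * the private list is invisible to every other thread. */
static void drain_queue(struct peer_q *q, void (*drop)(struct node *))
{
	struct node *priv;
	struct node *n;

	pthread_mutex_lock(&q->lock);
	priv = q->head;		/* splice: take everything at once */
	q->head = NULL;
	pthread_mutex_unlock(&q->lock);

	while ((n = priv) != NULL) {
		priv = n->next;
		drop(n);	/* may block; no lock is held here */
	}
}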
/* give back peer router credits */
msg->msg_peerrtrcredit = 0;
+ spin_lock(&rxpeer->lpni_lock);
LASSERT((rxpeer->lpni_rtrcredits < 0) ==
!list_empty(&rxpeer->lpni_rtrq));
/* drop all messages which are queued to be routed on that
* peer. */
if (!the_lnet.ln_routing) {
- lnet_drop_routed_msgs_locked(&rxpeer->lpni_rtrq,
- msg->msg_rx_cpt);
+ struct list_head drop;
+
+ INIT_LIST_HEAD(&drop);
+ list_splice_init(&rxpeer->lpni_rtrq, &drop);
+ spin_unlock(&rxpeer->lpni_lock);
+ lnet_drop_routed_msgs_locked(&drop, msg->msg_rx_cpt);
} else if (rxpeer->lpni_rtrcredits <= 0) {
msg2 = list_entry(rxpeer->lpni_rtrq.next,
lnet_msg_t, msg_list);
list_del(&msg2->msg_list);
-
+ spin_unlock(&rxpeer->lpni_lock);
(void) lnet_post_routed_recv_locked(msg2, 1);
+ } else {
+ spin_unlock(&rxpeer->lpni_lock);
}
}
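The rx give-back mirrors the tx give-back above: the credit is returned under lpni_lock, and if the count is still non-positive the LASSERTed invariant guarantees a delayed message is queued, so exactly one is dequeued and the lock is dropped before it is resubmitted. A compact sketch of that path, with hypothetical names and a mutex in place of the spinlock:

#include <pthread.h>
#include <stddef.h>

struct waiter {
	struct waiter *next;
};

struct credit_peer {
	pthread_mutex_t lock;
	int credits;		/* negative: |credits| waiters are queued */
	struct waiter *waitq;
};

/* Bank one credit; if the peer is still over-subscribed, pop exactly
 * one waiter for the caller to resubmit outside the lock. */
static struct waiter *return_credit(struct credit_peer *p)
{
	struct waiter *woken = NULL;

	pthread_mutex_lock(&p->lock);
	p->credits++;
	if (p->credits <= 0) {
		/* invariant: credits < 0 iff waitq non-empty, so
		 * waitq cannot be NULL here */
		woken = p->waitq;
		p->waitq = woken->next;
	}
	pthread_mutex_unlock(&p->lock);
	return woken;
}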
if (rxni != NULL) {
* it.
*/
continue;
- } if (lpni->lpni_txcredits < best_lpni_credits)
+ } else if (lpni->lpni_txcredits < best_lpni_credits) {
/*
* We already have a peer that has more credits
* available than this one. No need to consider
* this peer further.
*/
continue;
- else if (lpni->lpni_txcredits == best_lpni_credits) {
+ } else if (lpni->lpni_txcredits == best_lpni_credits) {
/*
* The best peer found so far and the current peer
* have the same number of available credits let's