From: pjkirner
Date: Thu, 29 Sep 2005 20:04:49 +0000 (+0000)
Subject: * make scheduler threads balance rx and tx more evenly
X-Git-Tag: v1_7_100~1^25~6^2~128
X-Git-Url: https://git.whamcloud.com/?a=commitdiff_plain;h=056532605a58b186e34646147e9432e2a98ad227;p=fs%2Flustre-release.git

* make scheduler threads balance rx and tx more evenly
* move peer allocation outside spin lock
* fix bug with irqs not being disabled on spinlocks that need
  synchronization with callbacks

(Illustrative sketches of these three changes follow the diff.)
---
diff --git a/lnet/klnds/ptllnd/ptllnd.h b/lnet/klnds/ptllnd/ptllnd.h
index 95f31c0..9806bc4 100755
--- a/lnet/klnds/ptllnd/ptllnd.h
+++ b/lnet/klnds/ptllnd/ptllnd.h
@@ -266,9 +266,10 @@ typedef struct kptl_tx /* transmit message */
 typedef enum
 {
         PEER_STATE_UNINITIALIZED = 0,
-        PEER_STATE_WAITING_HELLO = 1,
-        PEER_STATE_ACTIVE        = 2,
-        PEER_STATE_CANCELED      = 3,
+        PEER_STATE_ALLOCATED     = 1,   //QQQ
+        PEER_STATE_WAITING_HELLO = 2,
+        PEER_STATE_ACTIVE        = 3,
+        PEER_STATE_CANCELED      = 4,
 }kptllnd_peer_state_t;
 
 struct kptl_peer
@@ -313,7 +314,7 @@ struct kptl_data
         cfs_mem_cache_t         *kptl_rx_cache;      /* rx descripter cache */
         struct kptl_tx          *kptl_tx_descs;      /* the tx descriptors array */
-        spinlock_t               kptl_tx_lock;       /* serialise the next 4 members*/
+        spinlock_t               kptl_tx_lock;       /* serialise idle tx list*/
         struct list_head         kptl_idle_txs;      /* idle tx descriptors */
         rwlock_t                 kptl_peer_rw_lock;  /* lock for peer table */
@@ -658,10 +659,6 @@ kptllnd_setup_md(
         int payload_nob,
         tempiov_t *tempiov);
 
-int kptllnd_process_scheduled_tx(kptl_data_t *kptllnd_data);
-int kptllnd_process_scheduled_rx(kptl_data_t *kptllnd_data);
-int kptllnd_process_scheduled_rxb(kptl_data_t *kptllnd_data);
-
 static inline lnet_nid_t ptl2lnetnid(kptl_data_t *kptllnd_data,ptl_nid_t portals_nid)
 {
 #ifdef _USING_LUSTRE_PORTALS_
diff --git a/lnet/klnds/ptllnd/ptllnd_cb.c b/lnet/klnds/ptllnd/ptllnd_cb.c
index 408cc49..27b1f81 100644
--- a/lnet/klnds/ptllnd/ptllnd_cb.c
+++ b/lnet/klnds/ptllnd/ptllnd_cb.c
@@ -218,6 +218,7 @@ kptllnd_start_bulk_rdma(
         tempiov_t        tempiov;
         kptl_msg_t      *rxmsg = rx->rx_msg;
         kptl_peer_t     *peer = rx->rx_peer;
+        unsigned long    flags;
 
         /*
@@ -252,7 +253,7 @@ kptllnd_start_bulk_rdma(
                 payload_niov,payload_iov,payload_kiov,
                 payload_offset,payload_nob,&tempiov);
 
-        spin_lock(&peer->peer_lock);
+        spin_lock_irqsave(&peer->peer_lock, flags);
 
         /*
          * Attach the MD
@@ -265,7 +266,7 @@ kptllnd_start_bulk_rdma(
         if(ptl_rc != PTL_OK){
                 CERROR("PtlMDBind failed %d\n",ptl_rc);
-                spin_unlock(&peer->peer_lock);
+                spin_unlock_irqrestore(&peer->peer_lock, flags);
                 /*
                  * Just drop the ref for this MD because it was never
                  * posted to portals
@@ -290,7 +291,7 @@ kptllnd_start_bulk_rdma(
          */
         kptllnd_tx_addref(tx);
 
-        spin_unlock(&peer->peer_lock);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
 
         /*
@@ -321,7 +322,7 @@ kptllnd_start_bulk_rdma(
                 CERROR("Ptl%s failed: %d\n",
                        op == PTL_MD_OP_GET ? "Get" : "Put",ptl_rc);
"Get" : "Put",ptl_rc); - spin_lock(&peer->peer_lock); + spin_lock_irqsave(&peer->peer_lock, flags); /* * Unlink the MD because it's not yet in use @@ -349,7 +350,7 @@ kptllnd_start_bulk_rdma( kptllnd_peer_dequeue_tx_locked(peer,tx); tx->tx_peer = NULL; - spin_unlock(&peer->peer_lock); + spin_unlock_irqrestore(&peer->peer_lock, flags); rc = -ENOMEM; goto end; @@ -888,6 +889,10 @@ kptllnd_scheduler(void *arg) cfs_time_t last_check = cfs_time_current(); cfs_duration_t duration; time_t duration_sec; + unsigned long flags; + kptl_rx_t *rx = NULL; + kptl_rx_buffer_t *rxb = NULL; + kptl_tx_t *tx = NULL; PJK_UT_MSG(">>>\n"); @@ -959,26 +964,65 @@ kptllnd_scheduler(void *arg) } } - /* - * Drain the RX queue - */ - while(kptllnd_process_scheduled_rx(kptllnd_data)!=0); /* - * Repost all RXBs + * Now service the queuse */ - while(kptllnd_process_scheduled_rxb(kptllnd_data)!=0); - - /* - * Drain the TX queue. Note RX's can cause new TX's - * to be added to the queue. - */ - while(kptllnd_process_scheduled_tx(kptllnd_data)!=0); - + do{ + spin_lock_irqsave(&kptllnd_data->kptl_sched_lock, flags); + + /* + * Drain the RX queue + */ + rx = NULL; + if(!list_empty(&kptllnd_data->kptl_sched_rxq)){ + rx = list_entry (kptllnd_data->kptl_sched_rxq.next, + kptl_rx_t, rx_list); + list_del_init(&rx->rx_list); + } + + /* + * IDrain the RXB Repost queue + */ + rxb = NULL; + if(!list_empty(&kptllnd_data->kptl_sched_rxbq)){ + rxb = list_entry (kptllnd_data->kptl_sched_rxbq.next, + kptl_rx_buffer_t, rxb_repost_list); + list_del_init(&rxb->rxb_repost_list); + } + /* + * Drain the TX queue. Note RX's can cause new TX's + * to be added to the queue. + */ + tx = NULL; + if(!list_empty(&kptllnd_data->kptl_sched_txq)){ + tx = list_entry (kptllnd_data->kptl_sched_txq.next, + kptl_tx_t, tx_schedlist); + list_del_init(&tx->tx_schedlist); + } + + spin_unlock_irqrestore(&kptllnd_data->kptl_sched_lock, flags); + + + /* + * Process anything that came off the list + */ + if(rx) + kptllnd_rx_scheduler_handler(rx); + if(rxb) + kptllnd_rx_buffer_post_handle_error(rxb); + if(tx){ + PJK_UT_MSG(">>> tx=%p\n",tx); + kptllnd_tx_done(tx); + PJK_UT_MSG("<<<\n"); + } + + /* + * As long as we did something this time around + * try again. + */ + }while(rx != NULL || rxb != NULL || tx != NULL); - /* - * Clean any canceled peers - */ kptllnd_clean_canceled_peers(kptllnd_data); } diff --git a/lnet/klnds/ptllnd/ptllnd_peer.c b/lnet/klnds/ptllnd/ptllnd_peer.c index c5866e3..b5d0326 100644 --- a/lnet/klnds/ptllnd/ptllnd_peer.c +++ b/lnet/klnds/ptllnd/ptllnd_peer.c @@ -24,14 +24,53 @@ kptllnd_peer_destroy ( kptl_peer_t *peer); kptl_peer_t * -kptllnd_peer_find_locked ( +kptllnd_peer_find_holding_list_lock ( kptl_data_t *kptllnd_data, lnet_nid_t nid); +int +kptllnd_peer_add_to_list_locked ( + kptl_data_t *kptllnd_data, + kptl_peer_t *peer) +{ + /* QQQ - this got split out + * But first check we haven't exceeded or maximum + * number of peers + */ + if (atomic_read(&kptllnd_data->kptl_npeers) >= + *kptllnd_tunables.kptl_concurrent_peers) { + STAT_UPDATE(kps_too_many_peers); + CERROR("Can't create peer: too many peers\n"); + return -EOVERFLOW; /* !! 
+        }
+
+        /*
+         * Update the state
+         */
+        LASSERT(peer->peer_state == PEER_STATE_ALLOCATED);
+        peer->peer_state = PEER_STATE_WAITING_HELLO;
+
+        /*
+         * +1 ref for the list
+         */
+        atomic_inc(&peer->peer_refcount);
+
+        /* npeers only grows with the global lock held */
+        atomic_inc(&kptllnd_data->kptl_npeers);
+
+        /* And add this to the list */
+        LASSERT(list_empty(&peer->peer_list));
+        list_add_tail (&peer->peer_list,
+                       kptllnd_nid2peerlist (kptllnd_data,peer->peer_nid));
+
+        STAT_UPDATE(kps_peers_created);
+
+        return 0;
+}
 
 int
-kptllnd_peer_create_locked (
+kptllnd_peer_allocate (
         kptl_data_t *kptllnd_data,
         kptl_peer_t **peerp,
         lnet_nid_t nid)
@@ -43,17 +82,6 @@ kptllnd_peer_create_locked (
 
         LASSERT (nid != PTL_NID_ANY);
 
-        /*
-         * But first check we haven't exceeded or maximum
-         * number of peers
-         */
-        if (atomic_read(&kptllnd_data->kptl_npeers) >=
-            *kptllnd_tunables.kptl_concurrent_peers) {
-                STAT_UPDATE(kps_too_many_peers);
-                CERROR("Can't create peer: too many peers\n");
-                rc = -EOVERFLOW;        /* !! but at least it distinguishes */
-        }
-
         LIBCFS_ALLOC(peer, sizeof (*peer));
         if (peer == NULL) {
                 CERROR("Cannot allocate memory for peer\n");
@@ -68,7 +96,7 @@ kptllnd_peer_create_locked (
 
         spin_lock_init (&peer->peer_lock);
 
-        peer->peer_state = PEER_STATE_WAITING_HELLO;
+        peer->peer_state = PEER_STATE_ALLOCATED;
         peer->peer_kptllnd_data = kptllnd_data;
         peer->peer_nid = nid;
         //peer->peer_incarnation = 0;
@@ -89,7 +117,6 @@ kptllnd_peer_create_locked (
         peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
         //peer->peer_last_matchbits_seen = 0;
 
-
         /*
          * Reserve space in the RX buffer pool for this new peer
          */
@@ -103,20 +130,10 @@ kptllnd_peer_create_locked (
                 return rc;
         }
 
-        /*
-         * 1 ref for the list
+        /* QQQ - we're not adding to the list anymore
          * 1 for the caller
          */
-        atomic_set (&peer->peer_refcount, 2);
-
-        /* npeers only grows with the global lock held */
-        atomic_inc(&kptllnd_data->kptl_npeers);
-
-        /* And add this to the list */
-        list_add_tail (&peer->peer_list,
-                       kptllnd_nid2peerlist (kptllnd_data,nid));
-
-        STAT_UPDATE(kps_peers_created);
+        atomic_set (&peer->peer_refcount, 1);
 
         PJK_UT_MSG("<<< Peer=%p nid="LPX64"\n",peer,nid);
         *peerp = peer;
@@ -145,13 +162,15 @@ kptllnd_peer_destroy (
         kptllnd_rx_buffer_pool_unreserve(
                 &kptllnd_data->kptl_rx_buffer_pool,
                 *kptllnd_tunables.kptl_peercredits);
-
-
-        /* NB a peer's connections keep a reference on their peer until
-         * they are destroyed, so we can be assured that _all_ state to do
-         * with this peer has been cleaned up when its refcount drops to
-         * zero. */
-        atomic_dec(&kptllnd_data->kptl_npeers);
+
+        /*
+         * If the peer is only in the ALLOCATED state
+         * then it isn't yet tracked in kptl_npeers,
+         * so do nothing in that case.  In all other cases
+         * we need to decrement the counter.
+         */
+        if(peer->peer_state != PEER_STATE_ALLOCATED)
+                atomic_dec(&kptllnd_data->kptl_npeers);
 }
 
@@ -198,7 +217,7 @@ kptllnd_peer_decref (
                 if(peer->peer_state == PEER_STATE_CANCELED)
                         kptllnd_data->kptl_canceled_peers_counter++;
                 write_unlock_irqrestore(&kptllnd_data->kptl_peer_rw_lock, flags);
-
+
                 kptllnd_peer_destroy(peer);
         }
@@ -211,6 +230,7 @@ kptllnd_peer_cancel_pending_txs(
         struct list_head *tx_temp;
         struct list_head *tx_next;
         kptl_tx_t        *tx;
+        unsigned long     flags;
 
         INIT_LIST_HEAD (&list);
 
@@ -219,7 +239,7 @@
          * Tranfer all the PENDING TX's to a temporary list
          * while holding the peer lock
          */
-        spin_lock(&peer->peer_lock);
+        spin_lock_irqsave(&peer->peer_lock, flags);
 
         if(!list_empty(&peer->peer_pending_txs))
                 PJK_UT_MSG("Clearing Pending TXs\n");
@@ -231,7 +251,7 @@
                 list_add(&tx->tx_list,&list);
         }
 
-        spin_unlock(&peer->peer_lock);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
 
         /*
          * Now relese the refereces outside of the peer_lock
@@ -251,8 +271,9 @@ kptllnd_peer_cancel_active_txs(
         kptl_tx_t     *tx;
         ptl_err_t      ptl_rc;
         int            counter;
+        unsigned long  flags;
 
-        spin_lock(&peer->peer_lock);
+        spin_lock_irqsave(&peer->peer_lock, flags);
 
         if(!list_empty(&peer->peer_active_txs))
                 PJK_UT_MSG("Clearing Active TXs\n");
@@ -271,7 +292,7 @@ again:
                  */
                 kptllnd_tx_addref(tx);
 
-                spin_unlock(&peer->peer_lock);
+                spin_unlock_irqrestore(&peer->peer_lock, flags);
 
                 /*
@@ -306,7 +327,7 @@ again:
 
                 kptllnd_tx_decref(tx);
 
-                spin_lock(&peer->peer_lock);
+                spin_lock_irqsave(&peer->peer_lock, flags);
 
                 /*
                  * If a change in the list has be detected
@@ -316,7 +337,7 @@ again:
                         goto again;
                 }
 
-        spin_unlock(&peer->peer_lock);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
 }
 
 void
@@ -481,9 +502,10 @@ kptllnd_peer_dequeue_tx(
         kptl_peer_t *peer,
         kptl_tx_t *tx)
 {
-        spin_lock(&peer->peer_lock);
+        unsigned long flags;
+        spin_lock_irqsave(&peer->peer_lock, flags);
         kptllnd_peer_dequeue_tx_locked(peer,tx);
-        spin_unlock(&peer->peer_lock);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
 }
 
 void
@@ -491,18 +513,21 @@ kptllnd_peer_check_sends (
         kptl_peer_t *peer )
 {
-        kptl_tx_t *tx;
-        kptl_data_t *kptllnd_data = peer->peer_kptllnd_data;
-        int rc,rc2;
-        ptl_md_t md;
-        ptl_handle_me_t meh;
+        kptl_tx_t        *tx;
+        kptl_data_t      *kptllnd_data = peer->peer_kptllnd_data;
+        int               rc,rc2;
+        ptl_md_t          md;
+        ptl_handle_me_t   meh;
         ptl_process_id_t target;
+        unsigned long flags;
+
+        LASSERT(!in_interrupt());
 
         /*
          * If there is nothing to send, and we have hit the credit
          * high water mark, then send a no-op message
          */
-        spin_lock(&peer->peer_lock);
+        spin_lock_irqsave(&peer->peer_lock, flags);
 
         PJK_UT_MSG_DATA(">>>Peer=%p Credits=%d Outstanding=%d\n",
                 peer,peer->peer_credits,peer->peer_outstanding_credits);
@@ -580,12 +605,12 @@ kptllnd_peer_check_sends (
                 if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
                     (!list_empty(&peer->peer_pending_txs) ||
                      peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {
-                        spin_unlock(&peer->peer_lock);
+                        spin_unlock_irqrestore(&peer->peer_lock, flags);
 
                         /* redundant NOOP */
                         kptllnd_tx_decref(tx);
                         CDEBUG(D_NET, LPX64": redundant noop\n", peer->peer_nid);
 
-                        spin_lock(&peer->peer_lock);
+                        spin_lock_irqsave(&peer->peer_lock, flags);
                         continue;
                 }
@@ -739,7 +764,7 @@ kptllnd_peer_check_sends (
                  */
                 tx->tx_mdh = PTL_INVALID_HANDLE;
 
-                spin_unlock(&peer->peer_lock);
+                spin_unlock_irqrestore(&peer->peer_lock, flags);
 
                 kptllnd_tx_decref(tx);
@@ -789,7 +814,7 @@ kptllnd_peer_check_sends (
                  */
                 kptllnd_tx_addref(tx);
 
-                spin_unlock(&peer->peer_lock);
+                spin_unlock_irqrestore(&peer->peer_lock, flags);
 
         rc = PtlPut (
                     tx->tx_mdh_msg,
@@ -823,18 +848,18 @@ kptllnd_peer_check_sends (
                  */
                 kptllnd_tx_decref(tx);
 
-                spin_lock(&peer->peer_lock);
+                spin_lock_irqsave(&peer->peer_lock, flags);
         }
 
-        spin_unlock(&peer->peer_lock);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
 
         PJK_UT_MSG_DATA("<<<\n");
         return;
 
 failed_with_lock:
-        spin_unlock(&peer->peer_lock);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
 
 failed_without_lock:
 
         /*
@@ -869,10 +894,11 @@ failed_without_lock:
 int
 kptllnd_peer_timedout(kptl_peer_t *peer)
 {
-        kptl_tx_t *tx;
-        int rc = 0;
+        kptl_tx_t     *tx;
+        int            rc = 0;
+        unsigned long  flags;
 
-        spin_lock(&peer->peer_lock);
+        spin_lock_irqsave(&peer->peer_lock, flags);
 
         /*
          * Check the head of the pending list for expiration
@@ -900,7 +926,7 @@ kptllnd_peer_timedout(kptl_peer_t *peer)
                 }
         }
 
-        spin_unlock(&peer->peer_lock);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
 
         return rc;
 }
@@ -958,13 +984,13 @@ kptllnd_peer_find (
         kptl_peer_t *peer;
         unsigned long flags;
         read_lock_irqsave(&kptllnd_data->kptl_peer_rw_lock, flags);
-        peer = kptllnd_peer_find_locked(kptllnd_data,nid);
+        peer = kptllnd_peer_find_holding_list_lock(kptllnd_data,nid);
         read_unlock_irqrestore(&kptllnd_data->kptl_peer_rw_lock, flags);
         return peer;
 }
 
 kptl_peer_t *
-kptllnd_peer_find_locked (
+kptllnd_peer_find_holding_list_lock (
         kptl_data_t *kptllnd_data,
         lnet_nid_t nid)
 {
@@ -1001,7 +1027,8 @@ kptllnd_peer_handle_hello (
         lnet_nid_t nid,
         kptl_msg_t *msg)
 {
-        kptl_peer_t *peer;
+        kptl_peer_t *peer = NULL;
+        kptl_peer_t *peer_allocated = NULL;
         kptl_peer_t *peer_to_cancel = NULL;
         unsigned long flags;
         kptl_tx_t *tx_hello = NULL;
@@ -1026,13 +1053,40 @@ kptllnd_peer_handle_hello (
                 return 0;
         }
 
+
+        /*
+         * Setup a connect HELLO message.  We ultimately might not
+         * use it but likely we will.
+         */
+        tx_hello = kptllnd_get_idle_tx(kptllnd_data,TX_TYPE_SMALL_MESSAGE);
+        if( tx_hello == NULL) {
+                CERROR("Unable to allocate connect message for "LPX64"\n",nid);
+                return 0;
+        }
+
+        kptllnd_init_msg(
+                tx_hello->tx_msg,
+                PTLLND_MSG_TYPE_HELLO,
+                sizeof(kptl_hello_msg_t));
+
+        /*
+         * Allocate a peer, even though we might not ultimately use it;
+         * however, we want to avoid doing this while holding
+         * the peer_rw_lock and being forced into atomic context.
+         */
+        rc = kptllnd_peer_allocate ( kptllnd_data, &peer_allocated, nid);
+        if(rc != 0){
+                kptllnd_tx_decref(tx_hello);
+                CERROR("Failed to create peer (nid="LPX64")\n",nid);
+                return 0;
+        }
 
         write_lock_irqsave(&kptllnd_data->kptl_peer_rw_lock, flags);
 
         /*
          * Look for peer because it could have been previously here
          */
-        peer = kptllnd_peer_find_locked(kptllnd_data,nid);
+        peer = kptllnd_peer_find_holding_list_lock(kptllnd_data,nid);
 
         /*
          * If peer is already here
@@ -1089,20 +1143,6 @@ kptllnd_peer_handle_hello (
         if( peer == NULL) {
 
                 /*
-                 * Setup a connect HELLO message.  We ultimately might not
-                 * use it but likely we will.
-                 */
-                tx_hello = kptllnd_get_idle_tx(kptllnd_data,TX_TYPE_SMALL_MESSAGE);
-                if( tx_hello == NULL) {
-                        CERROR("Unable to allocate connect message for "LPX64"\n",nid);
-                        goto failed;
-                }
-
-                kptllnd_init_msg(
-                        tx_hello->tx_msg,
-                        PTLLND_MSG_TYPE_HELLO,
-                        sizeof(kptl_hello_msg_t));
-
-                /*
                  * Put the match bits into the hello message
                  */
                 tx_hello->tx_msg->ptlm_u.hello.kptlhm_matchbits =
@@ -1110,12 +1150,18 @@
                 tx_hello->tx_msg->ptlm_u.hello.kptlhm_max_immd_size =
                         *kptllnd_tunables.kptl_max_immd_size;
 
-                rc = kptllnd_peer_create_locked ( kptllnd_data, &peer, nid);
+                /*
+                 * Try and attach this peer to the list
+                 */
+                rc = kptllnd_peer_add_to_list_locked ( kptllnd_data, peer_allocated);
                 if(rc != 0){
                         CERROR("Failed to create peer (nid="LPX64")\n",nid);
-                        peer = NULL;
                         goto failed;
                 }
+
+                peer = peer_allocated;
+                peer_allocated = NULL;
+
+                LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO);
                 peer->peer_state = PEER_STATE_ACTIVE;
@@ -1124,7 +1170,7 @@
                  * NB We don't need to hold the peer->peer_lock
                  * because we haven't released the kptl_peer_rw_lock which
                  * holds prevents anyone else from getting a pointer to
-                 * this newly created peer
+                 * this newly added (to the list) peer
                  */
 
         /*
@@ -1176,6 +1222,9 @@ failed:
                 kptllnd_peer_cancel(peer_to_cancel);
                 kptllnd_peer_decref(peer_to_cancel,"find");
         }
+
+        if(peer_allocated)
+                kptllnd_peer_decref(peer_allocated,"alloc");
 
         PJK_UT_MSG("<<< Peer=%p\n",peer);
 
@@ -1189,11 +1238,13 @@ kptllnd_tx_launch (
         lnet_msg_t *ptlmsg )
 {
         kptl_data_t *kptllnd_data = tx->tx_po.po_kptllnd_data;
-        kptl_peer_t *peer;
+        kptl_peer_t *peer = NULL;
+        kptl_peer_t *peer_allocated = NULL;
         unsigned long flags;
         rwlock_t    *g_lock = &kptllnd_data->kptl_peer_rw_lock;
         int rc;
-        kptl_tx_t *tx_hello;
+        kptl_tx_t *tx_hello = NULL;
+
 
         /* If I get here, I've committed to send, so I complete the tx with
          * failure on any problems */
@@ -1218,9 +1269,9 @@ kptllnd_tx_launch (
          * (which could send it)
          */
         if (peer != NULL) {
-                spin_lock(&peer->peer_lock);
+                spin_lock_irqsave(&peer->peer_lock, flags);
                 kptllnd_peer_queue_tx_locked ( peer, tx );
-                spin_unlock(&peer->peer_lock);
+                spin_unlock_irqrestore(&peer->peer_lock, flags);
                 kptllnd_peer_check_sends(peer);
                 kptllnd_peer_decref(peer,"find");
                 PJK_UT_MSG("<<< FOUND\n");
@@ -1238,7 +1289,6 @@ kptllnd_tx_launch (
         if( tx_hello == NULL) {
                 CERROR("Unable to allocate connect message for "LPX64"\n",target_nid);
                 kptllnd_tx_decref (tx);
-                kptllnd_peer_decref(peer,"find");
                 return;
         }
 
         kptllnd_init_msg(
                 tx_hello->tx_msg,
                 PTLLND_MSG_TYPE_HELLO,
                 sizeof(kptl_hello_msg_t));
+
+        /*
+         * We've never seen this peer before.  So setup
+         * a default message.
+         */
+        tx_hello->tx_msg->ptlm_u.hello.kptlhm_matchbits = 0;
+        tx_hello->tx_msg->ptlm_u.hello.kptlhm_max_immd_size =
+                *kptllnd_tunables.kptl_max_immd_size;
+
+        /*
+         * Allocate a new peer
+         * (it's not active until it's on the list)
+         */
+        PJK_UT_MSG("TX %p creating NEW PEER nid="LPX64"\n",tx,target_nid);
+        rc = kptllnd_peer_allocate ( kptllnd_data, &peer_allocated, target_nid);
+        if(rc != 0){
+                CERROR("Failed to create peer (nid="LPX64")\n",target_nid);
+                kptllnd_tx_decref (tx);
+                kptllnd_tx_decref (tx_hello);
+                return;
+        }
 
         /*
@@ -1254,7 +1325,7 @@
          */
         write_lock_irqsave(g_lock, flags);
 
-        peer = kptllnd_peer_find_locked (kptllnd_data,target_nid);
+        peer = kptllnd_peer_find_holding_list_lock (kptllnd_data,target_nid);
 
         /*
          * If we find the peer
@@ -1266,11 +1337,13 @@
 
                 CDEBUG(D_TRACE,"HELLO message race occurred (nid="LPX64")\n",target_nid);
 
-                spin_lock(&peer->peer_lock);
+                spin_lock_irqsave(&peer->peer_lock, flags);
                 kptllnd_peer_queue_tx_locked ( peer, tx );
-                spin_unlock(&peer->peer_lock);
+                spin_unlock_irqrestore(&peer->peer_lock, flags);
+
                 kptllnd_peer_check_sends(peer);
                 kptllnd_peer_decref(peer,"find");
+                kptllnd_peer_decref(peer_allocated,"alloc");
 
                 /* and we don't need the connection tx*/
                 kptllnd_tx_decref(tx_hello);
@@ -1278,27 +1351,27 @@
                 PJK_UT_MSG("<<< FOUND2\n");
                 return;
         }
-
-        PJK_UT_MSG("TX %p creating NEW PEER nid="LPX64"\n",tx,target_nid);
-        rc = kptllnd_peer_create_locked ( kptllnd_data, &peer, target_nid);
+
+
+        rc = kptllnd_peer_add_to_list_locked ( kptllnd_data, peer_allocated);
         if(rc != 0){
-                CERROR("Failed to create peer (nid="LPX64")\n",target_nid);
                 write_unlock_irqrestore(g_lock, flags);
+
+                CERROR("Failed to add peer to list (nid="LPX64")\n",target_nid);
+
+                /* Drop these TXs */
                 kptllnd_tx_decref (tx);
                 kptllnd_tx_decref (tx_hello);
-                kptllnd_peer_decref(peer,"find");
+                kptllnd_peer_decref(peer_allocated,"create");
                 return;
         }
+
+        peer = peer_allocated;
+        peer_allocated = NULL;
 
-
-        /*
-         * We've never seen this peer before.  So setup
-         * a default message.
-         */
-        tx_hello->tx_msg->ptlm_u.hello.kptlhm_matchbits = 0;
-        tx_hello->tx_msg->ptlm_u.hello.kptlhm_max_immd_size =
-                *kptllnd_tunables.kptl_max_immd_size;
-
+        write_unlock_irqrestore(g_lock,flags);
+
+
         /*
          * Queue the connection request
          * and the actually tx.  We have one credit so
@@ -1306,13 +1379,12 @@
          * the tx will wait for a reply.
          */
         PJK_UT_MSG("TXHello=%p\n",tx_hello);
-
-        spin_lock(&peer->peer_lock);
+
+
+        spin_lock_irqsave(&peer->peer_lock, flags);
         kptllnd_peer_queue_tx_locked(peer,tx_hello);
         kptllnd_peer_queue_tx_locked(peer,tx);
-        spin_unlock(&peer->peer_lock);
-
-        write_unlock_irqrestore(g_lock,flags);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
 
         kptllnd_peer_check_sends(peer);
         kptllnd_peer_decref(peer,"find");
diff --git a/lnet/klnds/ptllnd/ptllnd_rx_buf.c b/lnet/klnds/ptllnd/ptllnd_rx_buf.c
index 45f0c15..5fae660 100644
--- a/lnet/klnds/ptllnd/ptllnd_rx_buf.c
+++ b/lnet/klnds/ptllnd/ptllnd_rx_buf.c
@@ -56,10 +56,11 @@ kptllnd_rx_buffer_pool_fini(
         kptl_rx_buffer_t *rxb;
         int               rc;
         int               i;
+        unsigned long     flags;
 
         PJK_UT_MSG("kptllnd_rx_buffer_pool_fini\n");
-
-        spin_lock(&rxbp->rxbp_lock);
+
+        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
 
         /*
          * Set the shutdown flag under the lock
@@ -110,7 +111,7 @@ kptllnd_rx_buffer_pool_fini(
                  */
                 kptllnd_rx_buffer_addref(rxb,"temp");
 
-                spin_unlock(&rxbp->rxbp_lock);
+                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 
                 /*
                  * Unlinked the MD
@@ -127,14 +128,14 @@ kptllnd_rx_buffer_pool_fini(
                          */
                         kptllnd_rx_buffer_decref(rxb,"temp");
 
-                        spin_lock(&rxbp->rxbp_lock);
+                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                 }else{
                         PJK_UT_MSG("PtlMDUnlink(%p) rc=%d\n",rxb,rc);
                         /*
                          * The unlinked failed so put this back
                          * on the list for later
                          */
-                        spin_lock(&rxbp->rxbp_lock);
+                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
 
                         list_add_tail(&rxb->rxb_list,&rxbp->rxbp_list);
@@ -155,9 +156,9 @@ kptllnd_rx_buffer_pool_fini(
                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                                "Waiting for %d Busy RX Buffers\n",
                                rxbp->rxbp_count);
-                        spin_unlock(&rxbp->rxbp_lock);
+                        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                         cfs_pause(cfs_time_seconds(1));
-                        spin_lock(&rxbp->rxbp_lock);
+                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                 }
         }
@@ -172,15 +173,15 @@ kptllnd_rx_buffer_pool_fini(
                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                                "Waiting for %d RX Buffers to unlink\n",
                                rxbp->rxbp_count);
-                        spin_unlock(&rxbp->rxbp_lock);
+                        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                         cfs_pause(cfs_time_seconds(1));
-                        spin_lock(&rxbp->rxbp_lock);
+                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                 }
         }
 
         CDEBUG(D_TRACE,"|rxbp_count|=0\n");
 
-        spin_unlock(&rxbp->rxbp_lock);
+        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 }
@@ -195,8 +196,9 @@ kptllnd_rx_buffer_pool_reserve(
         int rc;
         kptl_rx_buffer_t *rxb;
         int nbuffers;
+        unsigned long flags;
 
-        spin_lock(&rxbp->rxbp_lock);
+        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
 
         PJK_UT_MSG("kptllnd_rx_buffer_pool_reserve(%d)\n",count);
 
@@ -204,7 +206,7 @@
          * Prevent reservation of anymore while we are shutting down
          */
         if(rxbp->rxbp_shutdown){
-                spin_unlock(&rxbp->rxbp_lock);
+                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                 return -ESHUTDOWN;
         }
@@ -234,7 +236,7 @@
          * we'll subtract if we hit an error.
          */
         rxbp->rxbp_count += add;
-        spin_unlock(&rxbp->rxbp_lock);
+        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 
         for(i=0;i<add;i++){
-        spin_lock(&rxbp->rxbp_lock);
+        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
 
         /*
          * We really didn't add as many
@@ -284,7 +286,7 @@ failed:
          * Cancel this reservation
          */
         rxbp->rxbp_reserved -= count;
-        spin_unlock(&rxbp->rxbp_lock);
+        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 
         if(rxb){
@@ -301,10 +303,11 @@ kptllnd_rx_buffer_pool_unreserve(
         kptl_rx_buffer_pool_t *rxbp,
         int count)
 {
-        spin_lock(&rxbp->rxbp_lock);
+        unsigned long flags;
+        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
         PJK_UT_MSG("kptllnd_rx_buffer_pool_unreserve(%d)\n",count);
         rxbp->rxbp_reserved -= count;
-        spin_unlock(&rxbp->rxbp_lock);
+        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 }
 
 void
@@ -335,19 +338,20 @@ kptllnd_rx_buffer_post(
         ptl_process_id_t       any;
         kptl_data_t           *kptllnd_data = rxb->rxb_po.po_kptllnd_data;
         kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
+        unsigned long          flags;
 
         any.nid = PTL_NID_ANY;
         any.pid = PTL_PID_ANY;
 
         /*PJK_UT_MSG("rxb=%p\n",rxb);*/
 
-        spin_lock(&rxbp->rxbp_lock);
+        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
 
         /*
          * No new RXB's can enter the POSTED state
          */
         if(rxbp->rxbp_shutdown){
-                spin_unlock(&rxbp->rxbp_lock);
+                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                 return -ESHUTDOWN;
         }
@@ -361,7 +365,7 @@
         atomic_set(&rxb->rxb_refcount,1);
         rxb->rxb_state = RXB_STATE_POSTED;
 
-        spin_unlock(&rxbp->rxbp_lock);
+        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 
         /*
          * Attach the ME
@@ -416,9 +420,9 @@
          * to deal with shutdown race, of
          * a partially constructed rbx
          */
-        spin_lock(&rxbp->rxbp_lock);
+        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
         rxb->rxb_mdh = mdh;
-        spin_unlock(&rxbp->rxbp_lock);
+        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 
         return 0;
@@ -427,11 +431,11 @@ failure:
 
         /*
          * Cleanup on error
          */
-        spin_lock(&rxbp->rxbp_lock);
+        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
         list_del_init(&rxb->rxb_list);
         atomic_set(&rxb->rxb_refcount,0);
         rxb->rxb_state = RXB_STATE_IDLE;
-        spin_unlock(&rxbp->rxbp_lock);
+        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 
         return rc;
 }
@@ -472,15 +476,16 @@ kptllnd_rx_buffer_destroy(
         kptl_rx_buffer_t *rxb)
 {
         kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
+        unsigned long          flags;
 
         LASSERT(atomic_read(&rxb->rxb_refcount) == 0);
         LASSERT(rxb->rxb_state == RXB_STATE_IDLE);
         LASSERT(PtlHandleIsEqual(rxb->rxb_mdh,PTL_INVALID_HANDLE));
 
-        spin_lock(&rxbp->rxbp_lock);
+        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
         list_del(&rxb->rxb_list);
         rxbp->rxbp_count--;
-        spin_unlock(&rxbp->rxbp_lock);
+        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 
         LIBCFS_FREE( rxb->rxb_buffer,PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages);
         LIBCFS_FREE(rxb,sizeof(*rxb));
@@ -491,12 +496,12 @@
 void
 kptllnd_rx_buffer_callback(ptl_event_t *ev)
 {
-        kptl_rx_buffer_t *rxb = ev->md.user_ptr;
-        kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
-        /*kptl_data_t *kptllnd_data = rxb->rxb_po.po_kptllnd_data;*/
-        kptl_rx_t *rx;
-        int nob;
-        int unlinked;
+        kptl_rx_buffer_t      *rxb = ev->md.user_ptr;
+        kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
+        kptl_rx_t             *rx;
+        int                    nob;
+        int                    unlinked;
+        unsigned long          flags;
 
         /*
          * Set the local unlinked flag
@@ -525,7 +530,7 @@ kptllnd_rx_buffer_callback(ptl_event_t *ev)
         nob = ev->mlength;
 
         if(unlinked){
-                spin_lock(&rxbp->rxbp_lock);
+                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
 
                 /*
                  * Remove this from the list
@@ -537,14 +542,12 @@ kptllnd_rx_buffer_callback(ptl_event_t *ev)
                 rxb->rxb_mdh = PTL_INVALID_HANDLE;
 
                 if( rxbp->rxbp_shutdown){
-                        spin_unlock(&rxbp->rxbp_lock);
+                        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                         kptllnd_rx_buffer_decref(rxb,"portals");
                         return;
                 }
 
-                spin_unlock(&rxbp->rxbp_lock);
-
-
+                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
         }
 
         /*
@@ -619,6 +622,7 @@ kptllnd_rx_scheduler_handler(kptl_rx_t *rx)
         int returned_credits = 0;
         int type = msg->ptlm_type;
         lnet_nid_t lnet_initiator_nid = ptl2lnetnid(kptllnd_data,rx->rx_initiator.nid);
+        unsigned long flags;
 
         PJK_UT_MSG_DATA(">>> RXRXRXRXRXRXRXRXRXRXRXRX\n");
@@ -755,10 +759,10 @@ kptllnd_rx_scheduler_handler(kptl_rx_t *rx)
                 /*
                  * Save the last match bits used
                  */
-                spin_lock(&rx->rx_peer->peer_lock);
+                spin_lock_irqsave(&rx->rx_peer->peer_lock, flags);
                 if(msg->ptlm_u.req.kptlrm_matchbits > rx->rx_peer->peer_last_matchbits_seen)
                         rx->rx_peer->peer_last_matchbits_seen = msg->ptlm_u.req.kptlrm_matchbits;
-                spin_unlock(&rx->rx_peer->peer_lock);
+                spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);
 
                 rc = lnet_parse(kptllnd_data->kptl_ni,
                                 &msg->ptlm_u.req.kptlrm_hdr,
@@ -867,9 +871,10 @@ kptllnd_rx_alloc(
 void
 kptllnd_rx_destroy(kptl_rx_t *rx,kptl_data_t *kptllnd_data)
 {
-        kptl_peer_t *peer = rx->rx_peer;
-        kptl_msg_t *msg = rx->rx_msg;
-        int returned_credits = msg->ptlm_credits;
+        kptl_peer_t   *peer = rx->rx_peer;
+        kptl_msg_t    *msg = rx->rx_msg;
+        int            returned_credits = msg->ptlm_credits;
+        unsigned long  flags;
 
         PJK_UT_MSG(">>> rx=%p\n",rx);
@@ -891,14 +896,14 @@ kptllnd_rx_destroy(kptl_rx_t *rx,kptl_data_t *kptllnd_data)
          * Update credits
          * (Only after I've reposted the buffer)
          */
-        spin_lock(&peer->peer_lock);
+        spin_lock_irqsave(&peer->peer_lock, flags);
         peer->peer_credits += returned_credits;
         LASSERT( peer->peer_credits <= *kptllnd_tunables.kptl_peercredits);
         peer->peer_outstanding_credits++;
         LASSERT( peer->peer_outstanding_credits <= *kptllnd_tunables.kptl_peercredits);
-        spin_unlock(&peer->peer_lock);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
 
         PJK_UT_MSG_DATA("Giving Back %d credits rx=%p\n",returned_credits,rx);
 
diff --git a/lnet/klnds/ptllnd/ptllnd_tx.c b/lnet/klnds/ptllnd/ptllnd_tx.c
index 10dfc96..344f006 100644
--- a/lnet/klnds/ptllnd/ptllnd_tx.c
+++ b/lnet/klnds/ptllnd/ptllnd_tx.c
@@ -274,10 +274,11 @@ kptllnd_tx_schedule (kptl_tx_t *tx)
 void
 kptllnd_tx_callback(ptl_event_t *ev)
 {
-        kptl_tx_t *tx = ev->md.user_ptr;
-        kptl_peer_t *peer;
-        int rc;
-        int do_decref = 0;
+        kptl_tx_t     *tx = ev->md.user_ptr;
+        kptl_peer_t   *peer;
+        int            rc;
+        int            do_decref = 0;
+        unsigned long  flags;
 
         PJK_UT_MSG(">>> %s(%d) tx=%p fail=%d\n",
                 get_ev_type_string(ev->type),ev->type,tx,ev->ni_fail_type);
@@ -315,8 +316,8 @@ kptllnd_tx_callback(ptl_event_t *ev)
 
         LASSERT(tx->tx_peer != NULL);
         peer = tx->tx_peer;
-
-        spin_lock(&peer->peer_lock);
+
+        spin_lock_irqsave(&peer->peer_lock, flags);
 
         /*
          * Save the status flag
@@ -443,7 +444,7 @@ kptllnd_tx_callback(ptl_event_t *ev)
                 LBUG();
         }
 
-        spin_unlock(&peer->peer_lock);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
 
         if(do_decref)
                 kptllnd_tx_scheduled_decref(tx);
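
Sketch 1 (third bullet, irqs on callback-shared spinlocks): below is a minimal,
self-contained illustration of the locking rule this patch applies.  The demo_*
names are hypothetical, not driver code.  The point is that any lock also taken
from an event callback that can run in interrupt context must be taken with
spin_lock_irqsave() in process context; otherwise an interrupt arriving while
the lock is held on the same CPU re-enters the callback and deadlocks.

#include <linux/spinlock.h>

/* Hypothetical stand-in for kptl_peer_t: its lock is taken both on the
 * process-context send path and from the Portals event callback, which
 * can run in interrupt context.  (Initialize with spin_lock_init().) */
struct demo_peer {
        spinlock_t      lock;
        int             credits;
};

/* Process-context path.  A plain spin_lock() here could deadlock if the
 * interrupt running demo_event_callback() arrived while the lock was held
 * on this CPU; the irqsave variant disables local interrupts for the
 * critical section and restores the previous irq state afterwards. */
static void demo_return_credit(struct demo_peer *peer)
{
        unsigned long flags;

        spin_lock_irqsave(&peer->lock, flags);
        peer->credits++;
        spin_unlock_irqrestore(&peer->lock, flags);
}

/* Callback path taking the same lock.  Using irqsave here too is safe in
 * any context, which is why the patch converts both sides uniformly. */
static void demo_event_callback(struct demo_peer *peer)
{
        unsigned long flags;

        spin_lock_irqsave(&peer->lock, flags);
        peer->credits--;
        spin_unlock_irqrestore(&peer->lock, flags);
}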
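Sketch 2 (first bullet, balanced rx/tx scheduling): the scheduler change above
replaces three drain-this-queue-completely loops with a single pass that takes
at most one item from each queue per iteration, so a flood on one queue cannot
starve the others.  The same structure with hypothetical demo_* names (the real
code uses kptl_sched_rxq/rxbq/txq and the kptllnd_* handlers):

#include <linux/list.h>
#include <linux/spinlock.h>

/* Hypothetical scheduler state mirroring the three scheduler queues. */
struct demo_sched {
        spinlock_t       lock;
        struct list_head rxq;
        struct list_head rxbq;
        struct list_head txq;
};

struct demo_work { struct list_head link; };

/* Hypothetical work handlers standing in for the kptllnd_* handlers. */
static void demo_handle_rx(struct demo_work *w)  { (void)w; }
static void demo_handle_rxb(struct demo_work *w) { (void)w; }
static void demo_handle_tx(struct demo_work *w)  { (void)w; }

/* Pop the head of one queue, or NULL if empty; caller holds the lock. */
static struct demo_work *demo_pop(struct list_head *q)
{
        struct demo_work *w;

        if (list_empty(q))
                return NULL;
        w = list_entry(q->next, struct demo_work, link);
        list_del_init(&w->link);
        return w;
}

/* One balanced pass: at most one rx, one rxb and one tx per iteration,
 * with the handlers invoked outside the lock, exactly as in the patch. */
static void demo_scheduler_pass(struct demo_sched *s)
{
        struct demo_work *rx, *rxb, *tx;
        unsigned long flags;

        do {
                spin_lock_irqsave(&s->lock, flags);
                rx  = demo_pop(&s->rxq);
                rxb = demo_pop(&s->rxbq);
                tx  = demo_pop(&s->txq);
                spin_unlock_irqrestore(&s->lock, flags);

                if (rx)  demo_handle_rx(rx);
                if (rxb) demo_handle_rxb(rxb);
                if (tx)  demo_handle_tx(tx);
        } while (rx != NULL || rxb != NULL || tx != NULL);
}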
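Sketch 3 (second bullet, peer allocation outside the spin lock): the patch
splits kptllnd_peer_create_locked() into kptllnd_peer_allocate(), called before
taking kptl_peer_rw_lock, and kptllnd_peer_add_to_list_locked(), called under
it, which re-checks for a racing insert and drops the spare allocation if the
race is lost.  A hedged sketch of that two-phase pattern with hypothetical
demo_* names, not the driver's actual code:

#include <linux/list.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/types.h>

struct demo_peer2 {
        struct list_head list;
        u64              nid;
        int              refcount;
};

static DEFINE_RWLOCK(demo_peer_lock);
static LIST_HEAD(demo_peer_list);

/* Phase 1: allocate outside the lock, where GFP_KERNEL may sleep.
 * Allocating under the rwlock would force atomic context (GFP_ATOMIC),
 * which is exactly what the patch avoids. */
static struct demo_peer2 *demo_peer_alloc(u64 nid)
{
        struct demo_peer2 *p = kzalloc(sizeof(*p), GFP_KERNEL);

        if (p == NULL)
                return NULL;
        INIT_LIST_HEAD(&p->list);
        p->nid = nid;
        p->refcount = 1;        /* caller's ref; list ref added on insert */
        return p;
}

/* Phase 2: insert under the lock, re-checking for a racing insert. */
static struct demo_peer2 *demo_peer_add(struct demo_peer2 *newp)
{
        struct demo_peer2 *p;
        unsigned long flags;

        write_lock_irqsave(&demo_peer_lock, flags);
        list_for_each_entry(p, &demo_peer_list, list) {
                if (p->nid == newp->nid) {      /* lost the race */
                        p->refcount++;          /* take a ref on the winner */
                        write_unlock_irqrestore(&demo_peer_lock, flags);
                        kfree(newp);            /* discard the spare */
                        return p;
                }
        }
        newp->refcount++;                       /* +1 ref for the list */
        list_add_tail(&newp->list, &demo_peer_list);
        write_unlock_irqrestore(&demo_peer_lock, flags);
        return newp;
}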