* make scheduler threads balance rx and tx more evenly
author    pjkirner <pjkirner>
          Thu, 29 Sep 2005 20:04:49 +0000 (20:04 +0000)
committer pjkirner <pjkirner>
          Thu, 29 Sep 2005 20:04:49 +0000 (20:04 +0000)
* move peer allocation outside the spin lock
* fix a bug where irqs were not disabled on spinlocks that need synchronization with callbacks
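
The irq fix applies the standard Linux rule: a spinlock that is also taken
from a Portals event callback (which may run in interrupt context) must
disable local interrupts while held, otherwise a callback arriving on the
same CPU can deadlock trying to re-take the lock.  A minimal sketch of the
conversion made throughout this patch:

        unsigned long flags;

        spin_lock_irqsave(&peer->peer_lock, flags);      /* irqs off on this CPU */
        /* ... touch state shared with the event callback ... */
        spin_unlock_irqrestore(&peer->peer_lock, flags); /* restore saved irq state */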

lnet/klnds/ptllnd/ptllnd.h
lnet/klnds/ptllnd/ptllnd_cb.c
lnet/klnds/ptllnd/ptllnd_peer.c
lnet/klnds/ptllnd/ptllnd_rx_buf.c
lnet/klnds/ptllnd/ptllnd_tx.c

lnet/klnds/ptllnd/ptllnd.h
index 95f31c0..9806bc4 100755 (executable)
@@ -266,9 +266,10 @@ typedef struct kptl_tx                           /* transmit message */
 typedef enum
 {
         PEER_STATE_UNINITIALIZED        = 0,
-        PEER_STATE_WAITING_HELLO        = 1,
-        PEER_STATE_ACTIVE               = 2,
-        PEER_STATE_CANCELED             = 3,
+        PEER_STATE_ALLOCATED            = 1,    //QQQ
+        PEER_STATE_WAITING_HELLO        = 2,
+        PEER_STATE_ACTIVE               = 3,
+        PEER_STATE_CANCELED             = 4,
 }kptllnd_peer_state_t;
 
 struct kptl_peer
@@ -313,7 +314,7 @@ struct kptl_data
         cfs_mem_cache_t*        kptl_rx_cache;         /* rx descriptor cache */
 
         struct kptl_tx         *kptl_tx_descs;         /* the tx descriptors array */
-        spinlock_t              kptl_tx_lock;          /* serialise the next 4 members*/
+        spinlock_t              kptl_tx_lock;          /* serialise idle tx list */
         struct list_head        kptl_idle_txs;         /* idle tx descriptors */
 
         rwlock_t                kptl_peer_rw_lock;     /* lock for peer table */
@@ -658,10 +659,6 @@ kptllnd_setup_md(
         int              payload_nob,
         tempiov_t       *tempiov);
 
-int kptllnd_process_scheduled_tx(kptl_data_t *kptllnd_data);
-int kptllnd_process_scheduled_rx(kptl_data_t *kptllnd_data);
-int kptllnd_process_scheduled_rxb(kptl_data_t *kptllnd_data);
-
 static inline lnet_nid_t ptl2lnetnid(kptl_data_t *kptllnd_data,ptl_nid_t portals_nid)
 {
 #ifdef _USING_LUSTRE_PORTALS_
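
The new PEER_STATE_ALLOCATED state splits peer creation in two, so the
allocation itself no longer runs in atomic context.  A condensed sketch of
the intended flow, using the functions introduced in ptllnd_peer.c below
(error handling elided):

        kptl_peer_t   *peer;
        unsigned long  flags;

        /* phase 1: allocate with no locks held; peer is ALLOCATED */
        rc = kptllnd_peer_allocate(kptllnd_data, &peer, nid);

        /* phase 2: publish under the peer table write lock;
         * ALLOCATED -> WAITING_HELLO */
        write_lock_irqsave(&kptllnd_data->kptl_peer_rw_lock, flags);
        rc = kptllnd_peer_add_to_list_locked(kptllnd_data, peer);
        write_unlock_irqrestore(&kptllnd_data->kptl_peer_rw_lock, flags);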
lnet/klnds/ptllnd/ptllnd_cb.c
index 408cc49..27b1f81 100644 (file)
@@ -218,6 +218,7 @@ kptllnd_start_bulk_rdma(
         tempiov_t        tempiov;
         kptl_msg_t      *rxmsg = rx->rx_msg;
         kptl_peer_t     *peer = rx->rx_peer;
+        unsigned long   flags;
 
 
         /*
@@ -252,7 +253,7 @@ kptllnd_start_bulk_rdma(
                 payload_niov,payload_iov,payload_kiov,
                 payload_offset,payload_nob,&tempiov);
 
-        spin_lock(&peer->peer_lock);
+        spin_lock_irqsave(&peer->peer_lock, flags);
 
         /*
          * Attach the MD
@@ -265,7 +266,7 @@ kptllnd_start_bulk_rdma(
         if(ptl_rc != PTL_OK){
                 CERROR("PtlMDBind failed %d\n",ptl_rc);
 
-                spin_unlock(&peer->peer_lock);
+                spin_unlock_irqrestore(&peer->peer_lock, flags);
                 /*
                  * Just drop the ref for this MD because it was never
                  * posted to portals
@@ -290,7 +291,7 @@ kptllnd_start_bulk_rdma(
          */
         kptllnd_tx_addref(tx);
 
-        spin_unlock(&peer->peer_lock);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
 
 
         /*
@@ -321,7 +322,7 @@ kptllnd_start_bulk_rdma(
                 CERROR("Ptl%s failed: %d\n",
                         op == PTL_MD_OP_GET ? "Get" : "Put",ptl_rc);
 
-                spin_lock(&peer->peer_lock);
+                spin_lock_irqsave(&peer->peer_lock, flags);
 
                 /*
                  * Unlink the MD because it's not yet in use
@@ -349,7 +350,7 @@ kptllnd_start_bulk_rdma(
                 kptllnd_peer_dequeue_tx_locked(peer,tx);
                 tx->tx_peer = NULL;
 
-                spin_unlock(&peer->peer_lock);
+                spin_unlock_irqrestore(&peer->peer_lock, flags);
 
                 rc = -ENOMEM;
                 goto end;
@@ -888,6 +889,10 @@ kptllnd_scheduler(void *arg)
         cfs_time_t      last_check = cfs_time_current();
         cfs_duration_t  duration;
         time_t          duration_sec;
+        unsigned long           flags;
+        kptl_rx_t               *rx = NULL;
+        kptl_rx_buffer_t        *rxb = NULL;
+        kptl_tx_t               *tx = NULL;
 
         PJK_UT_MSG(">>>\n");
 
@@ -959,26 +964,65 @@ kptllnd_scheduler(void *arg)
                         }
                 }
 
-                /*
-                 * Drain the RX queue
-                 */
-                while(kptllnd_process_scheduled_rx(kptllnd_data)!=0);
 
                 /*
-                 * Repost all RXBs
+                 * Now service the queues
                  */
-                while(kptllnd_process_scheduled_rxb(kptllnd_data)!=0);
-
-                /*
-                 * Drain the TX queue.  Note RX's can cause new TX's
-                 * to be added to the queue.
-                 */
-                while(kptllnd_process_scheduled_tx(kptllnd_data)!=0);
-
+                do{
+                        spin_lock_irqsave(&kptllnd_data->kptl_sched_lock, flags);
+                        
+                        /*
+                         * Drain the RX queue
+                         */     
+                        rx = NULL;                   
+                        if(!list_empty(&kptllnd_data->kptl_sched_rxq)){
+                                rx = list_entry (kptllnd_data->kptl_sched_rxq.next,
+                                                 kptl_rx_t, rx_list);
+                                list_del_init(&rx->rx_list);
+                        }
+                        
+                        /*
+                         * Drain the RXB repost queue
+                         */
+                        rxb = NULL;
+                        if(!list_empty(&kptllnd_data->kptl_sched_rxbq)){
+                                rxb = list_entry (kptllnd_data->kptl_sched_rxbq.next,
+                                                 kptl_rx_buffer_t, rxb_repost_list);
+                                list_del_init(&rxb->rxb_repost_list);
+                        }
+                        /*
+                         * Drain the TX queue.  Note RX's can cause new TX's
+                         * to be added to the queue.
+                         */         
+                        tx = NULL;               
+                        if(!list_empty(&kptllnd_data->kptl_sched_txq)){
+                                tx = list_entry (kptllnd_data->kptl_sched_txq.next,
+                                                 kptl_tx_t, tx_schedlist);
+                                list_del_init(&tx->tx_schedlist);
+                        }       
+                        
+                        spin_unlock_irqrestore(&kptllnd_data->kptl_sched_lock, flags);                  
+                        
+                        
+                        /*
+                         * Process anything that came off the list
+                         */         
+                        if(rx)
+                                kptllnd_rx_scheduler_handler(rx);
+                        if(rxb)
+                                kptllnd_rx_buffer_post_handle_error(rxb);
+                        if(tx){
+                                PJK_UT_MSG(">>> tx=%p\n",tx);
+                                kptllnd_tx_done(tx);
+                                PJK_UT_MSG("<<<\n");
+                        }                                
+                
+                        /*
+                         * As long as we did something this time around
+                         * try again.
+                         */                
+                }while(rx != NULL || rxb != NULL || tx != NULL);
 
-                /*
-                 * Clean any canceled peers
-                 */
                 kptllnd_clean_canceled_peers(kptllnd_data);
         }
 
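Where the old scheduler drained the RX queue to empty, then the RXB queue,
then the TX queue, the loop above now dequeues at most one item from each
queue per pass, so a burst on one queue can no longer starve the others.
The shape of the loop, condensed (dequeue_one() is a hypothetical helper
standing in for the list_empty/list_entry/list_del_init sequence):

        do {
                spin_lock_irqsave(&kptllnd_data->kptl_sched_lock, flags);
                rx  = dequeue_one(&kptllnd_data->kptl_sched_rxq);
                rxb = dequeue_one(&kptllnd_data->kptl_sched_rxbq);
                tx  = dequeue_one(&kptllnd_data->kptl_sched_txq);
                spin_unlock_irqrestore(&kptllnd_data->kptl_sched_lock, flags);

                /* handle whatever came off, with no locks held */
                if (rx  != NULL) kptllnd_rx_scheduler_handler(rx);
                if (rxb != NULL) kptllnd_rx_buffer_post_handle_error(rxb);
                if (tx  != NULL) kptllnd_tx_done(tx);
        } while (rx != NULL || rxb != NULL || tx != NULL);
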
lnet/klnds/ptllnd/ptllnd_peer.c
index c5866e3..b5d0326 100644 (file)
@@ -24,14 +24,53 @@ kptllnd_peer_destroy (
         kptl_peer_t *peer);
 
 kptl_peer_t *
-kptllnd_peer_find_locked (
+kptllnd_peer_find_holding_list_lock (
         kptl_data_t *kptllnd_data,
         lnet_nid_t nid);
 
 
+int 
+kptllnd_peer_add_to_list_locked (
+        kptl_data_t *kptllnd_data,
+        kptl_peer_t *peer)
+{
+        /* QQQ - this got split out
+         * But first check we haven't exceeded our maximum
+         * number of peers
+         */
+        if (atomic_read(&kptllnd_data->kptl_npeers) >=
+            *kptllnd_tunables.kptl_concurrent_peers) {
+                STAT_UPDATE(kps_too_many_peers);
+                CERROR("Can't create peer: too many peers\n");
+                return -EOVERFLOW;      /* !! but at least it distinguishes */
+        }
+        
+        /*
+         * Update the state
+         */
+        LASSERT(peer->peer_state == PEER_STATE_ALLOCATED);
+        peer->peer_state = PEER_STATE_WAITING_HELLO;
+        
+        /*
+         * +1 ref for the list
+         */
+        atomic_inc(&peer->peer_refcount);
+
+        /* npeers only grows with the global lock held */
+        atomic_inc(&kptllnd_data->kptl_npeers);
+
+        /* And add this to the list */
+        LASSERT(list_empty(&peer->peer_list));
+        list_add_tail (&peer->peer_list,
+                       kptllnd_nid2peerlist (kptllnd_data,peer->peer_nid));
+
+        STAT_UPDATE(kps_peers_created);
+        
+        return 0;
+}
 
 int
-kptllnd_peer_create_locked (
+kptllnd_peer_allocate (
         kptl_data_t *kptllnd_data,
         kptl_peer_t **peerp,
         lnet_nid_t nid)
@@ -43,17 +82,6 @@ kptllnd_peer_create_locked (
 
         LASSERT (nid != PTL_NID_ANY);
 
-        /*
-         * But first check we haven't exceeded or maximum
-         * number of peers
-         */
-        if (atomic_read(&kptllnd_data->kptl_npeers) >=
-            *kptllnd_tunables.kptl_concurrent_peers) {
-                STAT_UPDATE(kps_too_many_peers);
-                CERROR("Can't create peer: too many peers\n");
-                rc = -EOVERFLOW;        /* !! but at least it distinguishes */
-        }
-
         LIBCFS_ALLOC(peer, sizeof (*peer));
         if (peer == NULL) {
                 CERROR("Cannot allocate memory for peer\n");
@@ -68,7 +96,7 @@ kptllnd_peer_create_locked (
         spin_lock_init (&peer->peer_lock);
 
 
-        peer->peer_state = PEER_STATE_WAITING_HELLO;
+        peer->peer_state = PEER_STATE_ALLOCATED;
         peer->peer_kptllnd_data = kptllnd_data;
         peer->peer_nid = nid;
         //peer->peer_incarnation = 0;
@@ -89,7 +117,6 @@ kptllnd_peer_create_locked (
         peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
         //peer->peer_last_matchbits_seen = 0;
 
-
         /*
          * Reserve space in the RX buffer pool for this new peer
          */
@@ -103,20 +130,10 @@ kptllnd_peer_create_locked (
                 return rc;
         }
 
-        /*
-         * 1 ref for the list
+        /* QQQ - we're not adding to the list anymore
          * 1 for the caller
          */
-        atomic_set (&peer->peer_refcount, 2);
-
-        /* npeers only grows with the global lock held */
-        atomic_inc(&kptllnd_data->kptl_npeers);
-
-        /* And add this to the list */
-        list_add_tail (&peer->peer_list,
-                       kptllnd_nid2peerlist (kptllnd_data,nid));
-
-        STAT_UPDATE(kps_peers_created);
+        atomic_set (&peer->peer_refcount, 1);
 
         PJK_UT_MSG("<<< Peer=%p nid="LPX64"\n",peer,nid);
         *peerp = peer;
@@ -145,13 +162,15 @@ kptllnd_peer_destroy (
         kptllnd_rx_buffer_pool_unreserve(
                 &kptllnd_data->kptl_rx_buffer_pool,
                 *kptllnd_tunables.kptl_peercredits);
-
-
-        /* NB a peer's connections keep a reference on their peer until
-         * they are destroyed, so we can be assured that _all_ state to do
-         * with this peer has been cleaned up when its refcount drops to
-         * zero. */
-        atomic_dec(&kptllnd_data->kptl_npeers);
+                
+        /*
+         * If the peer is only in the ALLOCATED state
+         * then it isn't yet tracked in kptl_npeers,
+         * so do nothing in that case.  In all other cases
+         * we need to decrement the counter.
+         */         
+        if(peer->peer_state != PEER_STATE_ALLOCATED) 
+                atomic_dec(&kptllnd_data->kptl_npeers);                        
 }
 
 
@@ -198,7 +217,7 @@ kptllnd_peer_decref (
         if(peer->peer_state == PEER_STATE_CANCELED)
                 kptllnd_data->kptl_canceled_peers_counter++;
         write_unlock_irqrestore(&kptllnd_data->kptl_peer_rw_lock, flags);
-
+        
         kptllnd_peer_destroy(peer);
 }
 
@@ -211,6 +230,7 @@ kptllnd_peer_cancel_pending_txs(
         struct list_head  *tx_temp;
         struct list_head  *tx_next;
         kptl_tx_t         *tx;
+        unsigned long      flags;
 
 
         INIT_LIST_HEAD (&list);
@@ -219,7 +239,7 @@ kptllnd_peer_cancel_pending_txs(
          * Transfer all the PENDING TX's to a temporary list
          * while holding the peer lock
          */
-        spin_lock(&peer->peer_lock);
+        spin_lock_irqsave(&peer->peer_lock, flags);
 
         if(!list_empty(&peer->peer_pending_txs))
                 PJK_UT_MSG("Clearing Pending TXs\n");
@@ -231,7 +251,7 @@ kptllnd_peer_cancel_pending_txs(
                 list_add(&tx->tx_list,&list);
         }
 
-        spin_unlock(&peer->peer_lock);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
 
         /*
          * Now release the references outside of the peer_lock
@@ -251,8 +271,9 @@ kptllnd_peer_cancel_active_txs(
         kptl_tx_t         *tx;
         ptl_err_t          ptl_rc;
         int                counter;
+        unsigned long      flags;
 
-        spin_lock(&peer->peer_lock);
+        spin_lock_irqsave(&peer->peer_lock, flags);
 
         if(!list_empty(&peer->peer_active_txs))
                 PJK_UT_MSG("Clearing Active TXs\n");
@@ -271,7 +292,7 @@ again:
                  */
                 kptllnd_tx_addref(tx);
 
-                spin_unlock(&peer->peer_lock);
+                spin_unlock_irqrestore(&peer->peer_lock, flags);
 
 
                 /*
@@ -306,7 +327,7 @@ again:
 
                 kptllnd_tx_decref(tx);
 
-                spin_lock(&peer->peer_lock);
+                spin_lock_irqsave(&peer->peer_lock, flags);
 
                 /*
                  * If a change in the list has been detected
@@ -316,7 +337,7 @@ again:
                         goto again;
         }
 
-        spin_unlock(&peer->peer_lock);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
 }
 
 void
@@ -481,9 +502,10 @@ kptllnd_peer_dequeue_tx(
         kptl_peer_t *peer,
         kptl_tx_t *tx)
 {
-        spin_lock(&peer->peer_lock);
+        unsigned long   flags;
+        spin_lock_irqsave(&peer->peer_lock, flags);
         kptllnd_peer_dequeue_tx_locked(peer,tx);
-        spin_unlock(&peer->peer_lock);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
 }
 
 void
@@ -491,18 +513,21 @@ kptllnd_peer_check_sends (
         kptl_peer_t *peer )
 {
 
-        kptl_tx_t      *tx;
-        kptl_data_t    *kptllnd_data = peer->peer_kptllnd_data;
-        int             rc,rc2;
-        ptl_md_t        md;
-        ptl_handle_me_t meh;
+        kptl_tx_t       *tx;
+        kptl_data_t     *kptllnd_data = peer->peer_kptllnd_data;
+        int              rc,rc2;
+        ptl_md_t         md;
+        ptl_handle_me_t  meh;
         ptl_process_id_t target;
+        unsigned long    flags;
+        
+        LASSERT(!in_interrupt());
 
         /*
          * If there is nothing to send, and we have hit the credit
          * high water mark, then send a no-op message
          */
-        spin_lock(&peer->peer_lock);
+        spin_lock_irqsave(&peer->peer_lock, flags);
 
         PJK_UT_MSG_DATA(">>>Peer=%p Credits=%d Outstanding=%d\n",
                 peer,peer->peer_credits,peer->peer_outstanding_credits);
@@ -580,12 +605,12 @@ kptllnd_peer_check_sends (
                 if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
                     (!list_empty(&peer->peer_pending_txs) ||
                      peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {
-                        spin_unlock(&peer->peer_lock);
+                        spin_unlock_irqrestore(&peer->peer_lock, flags);
                         /* redundant NOOP */
                         kptllnd_tx_decref(tx);
                         CDEBUG(D_NET, LPX64": redundant noop\n",
                                peer->peer_nid);
-                        spin_lock(&peer->peer_lock);
+                        spin_lock_irqsave(&peer->peer_lock, flags);
                         continue;
                 }
 
@@ -739,7 +764,7 @@ kptllnd_peer_check_sends (
                                  */
                                 tx->tx_mdh = PTL_INVALID_HANDLE;
 
-                                spin_unlock(&peer->peer_lock);
+                                spin_unlock_irqrestore(&peer->peer_lock, flags);
 
                                 kptllnd_tx_decref(tx);
 
@@ -789,7 +814,7 @@ kptllnd_peer_check_sends (
                  */
                 kptllnd_tx_addref(tx);
 
-                spin_unlock(&peer->peer_lock);
+                spin_unlock_irqrestore(&peer->peer_lock, flags);
 
                 rc = PtlPut (
                             tx->tx_mdh_msg,
@@ -823,18 +848,18 @@ kptllnd_peer_check_sends (
                  */
                 kptllnd_tx_decref(tx);
 
-                spin_lock(&peer->peer_lock);
+                spin_lock_irqsave(&peer->peer_lock, flags);
 
         }
 
 
-        spin_unlock(&peer->peer_lock);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
 
         PJK_UT_MSG_DATA("<<<\n");
         return;
 
 failed_with_lock:
-        spin_unlock(&peer->peer_lock);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
 failed_without_lock:
 
         /*
@@ -869,10 +894,11 @@ failed_without_lock:
 int
 kptllnd_peer_timedout(kptl_peer_t *peer)
 {
-        kptl_tx_t          *tx;
-        int rc = 0;
+        kptl_tx_t      *tx;
+        int             rc = 0;
+        unsigned long   flags;
 
-        spin_lock(&peer->peer_lock);
+        spin_lock_irqsave(&peer->peer_lock, flags);
 
         /*
          * Check the head of the pending list for expiration
@@ -900,7 +926,7 @@ kptllnd_peer_timedout(kptl_peer_t *peer)
                 }
         }
 
-        spin_unlock(&peer->peer_lock);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
         return rc;
 }
 
@@ -958,13 +984,13 @@ kptllnd_peer_find (
         kptl_peer_t *peer;
         unsigned long flags;
         read_lock_irqsave(&kptllnd_data->kptl_peer_rw_lock, flags);
-        peer = kptllnd_peer_find_locked(kptllnd_data,nid);
+        peer = kptllnd_peer_find_holding_list_lock(kptllnd_data,nid);
         read_unlock_irqrestore(&kptllnd_data->kptl_peer_rw_lock, flags);
         return peer;
 }
 
 kptl_peer_t *
-kptllnd_peer_find_locked (
+kptllnd_peer_find_holding_list_lock (
         kptl_data_t *kptllnd_data,
         lnet_nid_t nid)
 {
@@ -1001,7 +1027,8 @@ kptllnd_peer_handle_hello (
         lnet_nid_t nid,
         kptl_msg_t *msg)
 {
-        kptl_peer_t    *peer;
+        kptl_peer_t    *peer           = NULL;
+        kptl_peer_t    *peer_allocated = NULL;
         kptl_peer_t    *peer_to_cancel = NULL;
         unsigned long   flags;
         kptl_tx_t      *tx_hello = NULL;
@@ -1026,13 +1053,40 @@ kptllnd_peer_handle_hello (
 
                 return 0;
         }
+        
+        /*
+         * Setup a connect HELLO message.  We ultimately might not
+         * use it but likely we will.
+         */
+        tx_hello = kptllnd_get_idle_tx(kptllnd_data,TX_TYPE_SMALL_MESSAGE);
+        if( tx_hello == NULL) {
+                CERROR("Unable to allocate connect message for "LPX64"\n",nid);
+                return 0;
+        }
+
+        kptllnd_init_msg(
+                tx_hello->tx_msg,
+                PTLLND_MSG_TYPE_HELLO,
+                sizeof(kptl_hello_msg_t));        
+         
+        /*
+         * Allocate a peer, even though we might not ultimately use it;
+         * however we want to avoid doing this while holding
+         * the peer_rw_lock and being forced into atomic context
+         */                
+        rc = kptllnd_peer_allocate ( kptllnd_data, &peer_allocated, nid);
+        if(rc != 0){
+                kptllnd_tx_decref(tx_hello);
+                CERROR("Failed to create peer (nid="LPX64")\n",nid);
+                return 0;
+        }                
 
         write_lock_irqsave(&kptllnd_data->kptl_peer_rw_lock, flags);
 
         /*
          * Look for peer because it could have been previously here
          */
-        peer = kptllnd_peer_find_locked(kptllnd_data,nid);
+        peer = kptllnd_peer_find_holding_list_lock(kptllnd_data,nid);
 
         /*
          * If peer is already here
@@ -1089,20 +1143,6 @@ kptllnd_peer_handle_hello (
         if( peer == NULL) {
 
                 /*
-                 * Setup a connect HELLO message.  We ultimately might not
-                 * use it but likely we will.
-                 */
-                tx_hello = kptllnd_get_idle_tx(kptllnd_data,TX_TYPE_SMALL_MESSAGE);
-                if( tx_hello == NULL) {
-                        CERROR("Unable to allocate connect message for "LPX64"\n",nid);
-                        goto failed;
-                }
-
-                kptllnd_init_msg(
-                        tx_hello->tx_msg,
-                        PTLLND_MSG_TYPE_HELLO,
-                        sizeof(kptl_hello_msg_t));
-                /*
                  * Put the match bits into the hello message
                  */
                 tx_hello->tx_msg->ptlm_u.hello.kptlhm_matchbits =
@@ -1110,12 +1150,18 @@ kptllnd_peer_handle_hello (
                 tx_hello->tx_msg->ptlm_u.hello.kptlhm_max_immd_size =
                         *kptllnd_tunables.kptl_max_immd_size;
 
-                rc = kptllnd_peer_create_locked ( kptllnd_data, &peer, nid);
+                /*
+                 * Try and attach this peer to the list
+                 */
+                rc = kptllnd_peer_add_to_list_locked ( kptllnd_data, peer_allocated);
                 if(rc != 0){
                         CERROR("Failed to create peer (nid="LPX64")\n",nid);
-                        peer = NULL;
                         goto failed;
                 }
+                
+                peer = peer_allocated;
+                peer_allocated = NULL;
+                
 
                 LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO);
                 peer->peer_state = PEER_STATE_ACTIVE;
@@ -1124,7 +1170,7 @@ kptllnd_peer_handle_hello (
                  * NB We don't need to hold the peer->peer_lock
                  * because we haven't released the kptl_peer_rw_lock which
                 * prevents anyone else from getting a pointer to
-                 * this newly created peer
+                 * this newly added (to the list) peer
                  */
 
                 /*
@@ -1176,6 +1222,9 @@ failed:
                 kptllnd_peer_cancel(peer_to_cancel);
                 kptllnd_peer_decref(peer_to_cancel,"find");
         }
+        
+        if(peer_allocated)
+                kptllnd_peer_decref(peer_allocated,"alloc");
 
         PJK_UT_MSG("<<< Peer=%p\n",peer);
 
@@ -1189,11 +1238,13 @@ kptllnd_tx_launch (
         lnet_msg_t *ptlmsg )
 {
         kptl_data_t     *kptllnd_data = tx->tx_po.po_kptllnd_data;
-        kptl_peer_t     *peer;
+        kptl_peer_t     *peer = NULL;
+        kptl_peer_t     *peer_allocated = NULL;
         unsigned long    flags;
         rwlock_t        *g_lock = &kptllnd_data->kptl_peer_rw_lock;
         int              rc;
-        kptl_tx_t       *tx_hello;
+        kptl_tx_t       *tx_hello = NULL;
+        
 
         /* If I get here, I've committed to send, so I complete the tx with
          * failure on any problems */
@@ -1218,9 +1269,9 @@ kptllnd_tx_launch (
          * (which could send it)
          */
         if (peer != NULL) {
-                spin_lock(&peer->peer_lock);
+                spin_lock_irqsave(&peer->peer_lock, flags);
                 kptllnd_peer_queue_tx_locked ( peer, tx );
-                spin_unlock(&peer->peer_lock);
+                spin_unlock_irqrestore(&peer->peer_lock, flags);
                 kptllnd_peer_check_sends(peer);
                 kptllnd_peer_decref(peer,"find");
                 PJK_UT_MSG("<<< FOUND\n");
@@ -1238,7 +1289,6 @@ kptllnd_tx_launch (
         if( tx_hello == NULL) {
                 CERROR("Unable to allocate connect message for "LPX64"\n",target_nid);
                 kptllnd_tx_decref (tx);
-                kptllnd_peer_decref(peer,"find");
                 return;
         }
 
@@ -1246,6 +1296,27 @@ kptllnd_tx_launch (
                 tx_hello->tx_msg,
                 PTLLND_MSG_TYPE_HELLO,
                 sizeof(kptl_hello_msg_t));
+                
+        /*
+         * We've never seen this peer before.  So setup
+         * a default message.
+         */
+        tx_hello->tx_msg->ptlm_u.hello.kptlhm_matchbits = 0;
+        tx_hello->tx_msg->ptlm_u.hello.kptlhm_max_immd_size =
+                *kptllnd_tunables.kptl_max_immd_size;
+
+        /* 
+         * Allocate a new peer
+         * (it's not active until it's on the list)
+         */       
+        PJK_UT_MSG("TX %p creating NEW PEER nid="LPX64"\n",tx,target_nid);         
+        rc = kptllnd_peer_allocate ( kptllnd_data, &peer_allocated, target_nid);
+        if(rc != 0){
+                CERROR("Failed to create peer (nid="LPX64")\n",target_nid);
+                kptllnd_tx_decref (tx);
+                kptllnd_tx_decref (tx_hello);
+                return;
+        }                     
 
 
         /*
@@ -1254,7 +1325,7 @@ kptllnd_tx_launch (
          */
         write_lock_irqsave(g_lock, flags);
 
-        peer = kptllnd_peer_find_locked (kptllnd_data,target_nid);
+        peer = kptllnd_peer_find_holding_list_lock (kptllnd_data,target_nid);
 
         /*
          * If we find the peer
@@ -1266,11 +1337,13 @@ kptllnd_tx_launch (
 
                 CDEBUG(D_TRACE,"HELLO message race occurred (nid="LPX64")\n",target_nid);
 
-                spin_lock(&peer->peer_lock);
+                spin_lock_irqsave(&peer->peer_lock, flags);
                 kptllnd_peer_queue_tx_locked ( peer, tx );
-                spin_unlock(&peer->peer_lock);
+                spin_unlock_irqrestore(&peer->peer_lock, flags);
+                
                 kptllnd_peer_check_sends(peer);
                 kptllnd_peer_decref(peer,"find");
+                kptllnd_peer_decref(peer_allocated,"alloc");
 
                /* and we don't need the connection tx */
                 kptllnd_tx_decref(tx_hello);
@@ -1278,27 +1351,27 @@ kptllnd_tx_launch (
                 PJK_UT_MSG("<<< FOUND2\n");
                 return;
         }
-
-        PJK_UT_MSG("TX %p creating NEW PEER nid="LPX64"\n",tx,target_nid);
-        rc = kptllnd_peer_create_locked ( kptllnd_data, &peer, target_nid);
+        
+      
+        rc = kptllnd_peer_add_to_list_locked ( kptllnd_data, peer_allocated);
         if(rc != 0){
-                CERROR("Failed to create peer (nid="LPX64")\n",target_nid);
                 write_unlock_irqrestore(g_lock, flags);
+                
+                CERROR("Failed to add peer to list (nid="LPX64")\n",target_nid);
+                
+                /* Drop these TXs */
                 kptllnd_tx_decref (tx);
                 kptllnd_tx_decref (tx_hello);
-                kptllnd_peer_decref(peer,"find");
+                kptllnd_peer_decref(peer_allocated,"create");
                 return;
         }
+        
+        peer = peer_allocated;
+        peer_allocated = NULL;
 
-
-        /*
-         * We've never seen this peer before.  So setup
-         * a default message.
-         */
-        tx_hello->tx_msg->ptlm_u.hello.kptlhm_matchbits = 0;
-        tx_hello->tx_msg->ptlm_u.hello.kptlhm_max_immd_size =
-                *kptllnd_tunables.kptl_max_immd_size;
-
+        write_unlock_irqrestore(g_lock,flags);
+        
+        
         /*
          * Queue the connection request
          * and the actual tx.  We have one credit so
@@ -1306,13 +1379,12 @@ kptllnd_tx_launch (
          * the tx will wait for a reply.
          */
         PJK_UT_MSG("TXHello=%p\n",tx_hello);
-
-        spin_lock(&peer->peer_lock);
+        
+        
+        spin_lock_irqsave(&peer->peer_lock, flags);
         kptllnd_peer_queue_tx_locked(peer,tx_hello);
         kptllnd_peer_queue_tx_locked(peer,tx);
-        spin_unlock(&peer->peer_lock);
-
-        write_unlock_irqrestore(g_lock,flags);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
 
         kptllnd_peer_check_sends(peer);
         kptllnd_peer_decref(peer,"find");
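
Because the peer is now allocated before the peer table lock is taken, two
threads can race to create the same peer.  Whichever thread finds an
existing entry under the write lock simply discards its pre-allocated copy;
condensed from kptllnd_tx_launch() above:

        rc = kptllnd_peer_allocate(kptllnd_data, &peer_allocated, target_nid);

        write_lock_irqsave(g_lock, flags);
        peer = kptllnd_peer_find_holding_list_lock(kptllnd_data, target_nid);
        if (peer == NULL) {             /* won the race: publish ours */
                rc = kptllnd_peer_add_to_list_locked(kptllnd_data, peer_allocated);
                peer = peer_allocated;
                peer_allocated = NULL;
        }
        write_unlock_irqrestore(g_lock, flags);

        if (peer_allocated != NULL)     /* lost the race: drop the spare */
                kptllnd_peer_decref(peer_allocated, "alloc");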
lnet/klnds/ptllnd/ptllnd_rx_buf.c
index 45f0c15..5fae660 100644 (file)
@@ -56,10 +56,11 @@ kptllnd_rx_buffer_pool_fini(
         kptl_rx_buffer_t       *rxb;
         int                     rc;
         int                     i;
+        unsigned long           flags;
 
         PJK_UT_MSG("kptllnd_rx_buffer_pool_fini\n");
-
-        spin_lock(&rxbp->rxbp_lock);
+        
+        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
 
         /*
          * Set the shutdown flag under the lock
@@ -110,7 +111,7 @@ kptllnd_rx_buffer_pool_fini(
                          */
                         kptllnd_rx_buffer_addref(rxb,"temp");
 
-                        spin_unlock(&rxbp->rxbp_lock);
+                        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 
                         /*
                         * Unlink the MD
@@ -127,14 +128,14 @@ kptllnd_rx_buffer_pool_fini(
                                  */
                                 kptllnd_rx_buffer_decref(rxb,"temp");
 
-                                spin_lock(&rxbp->rxbp_lock);
+                                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                         }else{
                                 PJK_UT_MSG("PtlMDUnlink(%p) rc=%d\n",rxb,rc);
                                 /*
                                 * The unlink failed so put this back
                                  * on the list for later
                                  */
-                                spin_lock(&rxbp->rxbp_lock);
+                                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
 
                                 list_add_tail(&rxb->rxb_list,&rxbp->rxbp_list);
 
@@ -155,9 +156,9 @@ kptllnd_rx_buffer_pool_fini(
                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                                "Waiting for %d Busy RX Buffers\n",
                                rxbp->rxbp_count);
-                        spin_unlock(&rxbp->rxbp_lock);
+                        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                         cfs_pause(cfs_time_seconds(1));
-                        spin_lock(&rxbp->rxbp_lock);
+                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                 }
         }
 
@@ -172,15 +173,15 @@ kptllnd_rx_buffer_pool_fini(
                         CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
                                "Waiting for %d RX Buffers to unlink\n",
                                rxbp->rxbp_count);
-                        spin_unlock(&rxbp->rxbp_lock);
+                        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                         cfs_pause(cfs_time_seconds(1));
-                        spin_lock(&rxbp->rxbp_lock);
+                        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
                 }
         }
 
         CDEBUG(D_TRACE,"|rxbp_count|=0\n");
 
-        spin_unlock(&rxbp->rxbp_lock);
+        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 }
 
 
@@ -195,8 +196,9 @@ kptllnd_rx_buffer_pool_reserve(
         int                     rc;
         kptl_rx_buffer_t       *rxb;
         int                     nbuffers;
+        unsigned long           flags;
 
-        spin_lock(&rxbp->rxbp_lock);
+        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
 
         PJK_UT_MSG("kptllnd_rx_buffer_pool_reserve(%d)\n",count);
 
@@ -204,7 +206,7 @@ kptllnd_rx_buffer_pool_reserve(
          * Prevent any further reservations while we are shutting down
          */
         if(rxbp->rxbp_shutdown){
-                spin_unlock(&rxbp->rxbp_lock);
+                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                 return -ESHUTDOWN;
         }
 
@@ -234,7 +236,7 @@ kptllnd_rx_buffer_pool_reserve(
          * we'll subtract if we hit an error.
          */
         rxbp->rxbp_count += add;
-        spin_unlock(&rxbp->rxbp_lock);
+        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 
         for(i=0;i<add;i++){
                 LIBCFS_ALLOC( rxb,sizeof(*rxb));
@@ -272,7 +274,7 @@ kptllnd_rx_buffer_pool_reserve(
         return 0;
 
 failed:
-        spin_lock(&rxbp->rxbp_lock);
+        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
 
         /*
          * We really didn't add as many
@@ -284,7 +286,7 @@ failed:
          * Cancel this reservation
          */
         rxbp->rxbp_reserved -= count;
-        spin_unlock(&rxbp->rxbp_lock);
+        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 
 
         if(rxb){
@@ -301,10 +303,11 @@ kptllnd_rx_buffer_pool_unreserve(
         kptl_rx_buffer_pool_t *rxbp,
         int count)
 {
-        spin_lock(&rxbp->rxbp_lock);
+        unsigned long flags;
+        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
         PJK_UT_MSG("kptllnd_rx_buffer_pool_unreserve(%d)\n",count);
         rxbp->rxbp_reserved -= count;
-        spin_unlock(&rxbp->rxbp_lock);
+        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 }
 
 void
@@ -335,19 +338,20 @@ kptllnd_rx_buffer_post(
         ptl_process_id_t        any;
         kptl_data_t            *kptllnd_data = rxb->rxb_po.po_kptllnd_data;
         kptl_rx_buffer_pool_t  *rxbp = rxb->rxb_pool;
+        unsigned long           flags;
 
         any.nid = PTL_NID_ANY;
         any.pid = PTL_PID_ANY;
 
         /*PJK_UT_MSG("rxb=%p\n",rxb);*/
 
-        spin_lock(&rxbp->rxbp_lock);
+        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
 
         /*
          * No new RXB's can enter the POSTED state
          */
         if(rxbp->rxbp_shutdown){
-                spin_unlock(&rxbp->rxbp_lock);
+                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                 return -ESHUTDOWN;
         }
 
@@ -361,7 +365,7 @@ kptllnd_rx_buffer_post(
         atomic_set(&rxb->rxb_refcount,1);
         rxb->rxb_state = RXB_STATE_POSTED;
 
-        spin_unlock(&rxbp->rxbp_lock);
+        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 
         /*
          * Attach the ME
@@ -416,9 +420,9 @@ kptllnd_rx_buffer_post(
          * to deal with the shutdown race of
          * a partially constructed rxb
          */
-        spin_lock(&rxbp->rxbp_lock);
+        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
         rxb->rxb_mdh = mdh;
-        spin_unlock(&rxbp->rxbp_lock);
+        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 
         return 0;
 
@@ -427,11 +431,11 @@ failure:
         /*
          * Cleanup on error
          */
-        spin_lock(&rxbp->rxbp_lock);
+        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
         list_del_init(&rxb->rxb_list);
         atomic_set(&rxb->rxb_refcount,0);
         rxb->rxb_state = RXB_STATE_IDLE;
-        spin_unlock(&rxbp->rxbp_lock);
+        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 
         return rc;
 }
@@ -472,15 +476,16 @@ kptllnd_rx_buffer_destroy(
         kptl_rx_buffer_t *rxb)
 {
         kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
+        unsigned long          flags;
 
         LASSERT(atomic_read(&rxb->rxb_refcount) == 0);
         LASSERT(rxb->rxb_state == RXB_STATE_IDLE);
         LASSERT(PtlHandleIsEqual(rxb->rxb_mdh,PTL_INVALID_HANDLE));
 
-        spin_lock(&rxbp->rxbp_lock);
+        spin_lock_irqsave(&rxbp->rxbp_lock, flags);
         list_del(&rxb->rxb_list);
         rxbp->rxbp_count--;
-        spin_unlock(&rxbp->rxbp_lock);
+        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
 
         LIBCFS_FREE( rxb->rxb_buffer,PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages);
         LIBCFS_FREE(rxb,sizeof(*rxb));
@@ -491,12 +496,12 @@ kptllnd_rx_buffer_destroy(
 void
 kptllnd_rx_buffer_callback(ptl_event_t *ev)
 {
-        kptl_rx_buffer_t *rxb = ev->md.user_ptr;
-        kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
-        /*kptl_data_t  *kptllnd_data = rxb->rxb_po.po_kptllnd_data;*/
-        kptl_rx_t *rx;
-        int nob;
-        int unlinked;
+        kptl_rx_buffer_t       *rxb = ev->md.user_ptr;
+        kptl_rx_buffer_pool_t  *rxbp = rxb->rxb_pool;
+        kptl_rx_t              *rx;
+        int                     nob;
+        int                     unlinked;
+        unsigned long           flags;
 
         /*
          * Set the local unlinked flag
@@ -525,7 +530,7 @@ kptllnd_rx_buffer_callback(ptl_event_t *ev)
         nob = ev->mlength;
 
         if(unlinked){
-                spin_lock(&rxbp->rxbp_lock);
+                spin_lock_irqsave(&rxbp->rxbp_lock, flags);
 
                 /*
                  * Remove this from the list
@@ -537,14 +542,12 @@ kptllnd_rx_buffer_callback(ptl_event_t *ev)
                 rxb->rxb_mdh = PTL_INVALID_HANDLE;
 
                 if( rxbp->rxbp_shutdown){
-                        spin_unlock(&rxbp->rxbp_lock);
+                        spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
                         kptllnd_rx_buffer_decref(rxb,"portals");
                         return;
                 }
 
-                spin_unlock(&rxbp->rxbp_lock);
-
-
+                spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
         }
 
         /*
@@ -619,6 +622,7 @@ kptllnd_rx_scheduler_handler(kptl_rx_t *rx)
         int                     returned_credits = 0;
         int                     type = msg->ptlm_type;
         lnet_nid_t              lnet_initiator_nid = ptl2lnetnid(kptllnd_data,rx->rx_initiator.nid);
+        unsigned long           flags;
 
 
         PJK_UT_MSG_DATA(">>> RXRXRXRXRXRXRXRXRXRXRXRX\n");
@@ -755,10 +759,10 @@ kptllnd_rx_scheduler_handler(kptl_rx_t *rx)
                 /*
                  * Save the last match bits used
                  */
-                spin_lock(&rx->rx_peer->peer_lock);
+                spin_lock_irqsave(&rx->rx_peer->peer_lock, flags);
                 if(msg->ptlm_u.req.kptlrm_matchbits > rx->rx_peer->peer_last_matchbits_seen)
                         rx->rx_peer->peer_last_matchbits_seen = msg->ptlm_u.req.kptlrm_matchbits;
-                spin_unlock(&rx->rx_peer->peer_lock);
+                spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);
 
                 rc = lnet_parse(kptllnd_data->kptl_ni,
                         &msg->ptlm_u.req.kptlrm_hdr,
@@ -867,9 +871,10 @@ kptllnd_rx_alloc(
 void
 kptllnd_rx_destroy(kptl_rx_t *rx,kptl_data_t *kptllnd_data)
 {
-        kptl_peer_t  *peer = rx->rx_peer;
-        kptl_msg_t   *msg = rx->rx_msg;
-        int returned_credits = msg->ptlm_credits;
+        kptl_peer_t    *peer = rx->rx_peer;
+        kptl_msg_t     *msg = rx->rx_msg;
+        int             returned_credits = msg->ptlm_credits;
+        unsigned long   flags;
 
         PJK_UT_MSG(">>> rx=%p\n",rx);
 
@@ -891,14 +896,14 @@ kptllnd_rx_destroy(kptl_rx_t *rx,kptl_data_t *kptllnd_data)
                  * Update credits
                  * (Only after I've reposted the buffer)
                  */
-                spin_lock(&peer->peer_lock);
+                spin_lock_irqsave(&peer->peer_lock, flags);
                 peer->peer_credits += returned_credits;
                 LASSERT( peer->peer_credits <=
                         *kptllnd_tunables.kptl_peercredits);
                 peer->peer_outstanding_credits++;
                 LASSERT( peer->peer_outstanding_credits <=
                         *kptllnd_tunables.kptl_peercredits);
-                spin_unlock(&peer->peer_lock);
+                spin_unlock_irqrestore(&peer->peer_lock, flags);
 
                 PJK_UT_MSG_DATA("Giving Back %d credits rx=%p\n",returned_credits,rx);
 
lnet/klnds/ptllnd/ptllnd_tx.c
index 10dfc96..344f006 100644 (file)
@@ -274,10 +274,11 @@ kptllnd_tx_schedule (kptl_tx_t *tx)
 void
 kptllnd_tx_callback(ptl_event_t *ev)
 {
-        kptl_tx_t *tx = ev->md.user_ptr;
-        kptl_peer_t *peer;
-        int rc;
-        int do_decref = 0;
+        kptl_tx_t       *tx = ev->md.user_ptr;
+        kptl_peer_t     *peer;
+        int              rc;
+        int              do_decref = 0;
+        unsigned long    flags;
 
         PJK_UT_MSG(">>> %s(%d) tx=%p fail=%d\n",
                 get_ev_type_string(ev->type),ev->type,tx,ev->ni_fail_type);
@@ -315,8 +316,8 @@ kptllnd_tx_callback(ptl_event_t *ev)
 
         LASSERT(tx->tx_peer != NULL);
         peer = tx->tx_peer;
-
-        spin_lock(&peer->peer_lock);
+        
+        spin_lock_irqsave(&peer->peer_lock, flags);
 
         /*
          * Save the status flag
@@ -443,7 +444,7 @@ kptllnd_tx_callback(ptl_event_t *ev)
                 LBUG();
         }
 
-        spin_unlock(&peer->peer_lock);
+        spin_unlock_irqrestore(&peer->peer_lock, flags);
 
         if(do_decref)
                 kptllnd_tx_scheduled_decref(tx);