tempiov_t tempiov;
kptl_msg_t *rxmsg = rx->rx_msg;
kptl_peer_t *peer = rx->rx_peer;
+ unsigned long flags;
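+ /* interrupt state for the irq-safe peer_lock locking below */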
/*
payload_niov,payload_iov,payload_kiov,
payload_offset,payload_nob,&tempiov);
- spin_lock(&peer->peer_lock);
+ spin_lock_irqsave(&peer->peer_lock, flags);
/*
* Attach the MD
if(ptl_rc != PTL_OK){
CERROR("PtlMDBind failed %d\n",ptl_rc);
- spin_unlock(&peer->peer_lock);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
/*
* Just drop the ref for this MD because it was never
* posted to portals
*/
kptllnd_tx_addref(tx);
- spin_unlock(&peer->peer_lock);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
/*
CERROR("Ptl%s failed: %d\n",
op == PTL_MD_OP_GET ? "Get" : "Put",ptl_rc);
- spin_lock(&peer->peer_lock);
+ spin_lock_irqsave(&peer->peer_lock, flags);
/*
* Unlink the MD because it's not yet in use
kptllnd_peer_dequeue_tx_locked(peer,tx);
tx->tx_peer = NULL;
- spin_unlock(&peer->peer_lock);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
rc = -ENOMEM;
goto end;
cfs_time_t last_check = cfs_time_current();
cfs_duration_t duration;
time_t duration_sec;
+ unsigned long flags;
+ kptl_rx_t *rx = NULL;
+ kptl_rx_buffer_t *rxb = NULL;
+ kptl_tx_t *tx = NULL;
PJK_UT_MSG(">>>\n");
}
}
- /*
- * Drain the RX queue
- */
- while(kptllnd_process_scheduled_rx(kptllnd_data)!=0);
/*
- * Repost all RXBs
+ * Now service the queues
*/
- while(kptllnd_process_scheduled_rxb(kptllnd_data)!=0);
-
- /*
- * Drain the TX queue. Note RX's can cause new TX's
- * to be added to the queue.
- */
- while(kptllnd_process_scheduled_tx(kptllnd_data)!=0);
-
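+ /*
+ * Pop at most one entry from each scheduler queue while holding
+ * kptl_sched_lock, drop the lock, then process the entries
+ * outside it.  Loop until a pass finds all three queues empty.
+ */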
+ do{
+ spin_lock_irqsave(&kptllnd_data->kptl_sched_lock, flags);
+
+ /*
+ * Drain the RX queue
+ */
+ rx = NULL;
+ if(!list_empty(&kptllnd_data->kptl_sched_rxq)){
+ rx = list_entry (kptllnd_data->kptl_sched_rxq.next,
+ kptl_rx_t, rx_list);
+ list_del_init(&rx->rx_list);
+ }
+
+ /*
+ * Drain the RXB repost queue
+ */
+ rxb = NULL;
+ if(!list_empty(&kptllnd_data->kptl_sched_rxbq)){
+ rxb = list_entry (kptllnd_data->kptl_sched_rxbq.next,
+ kptl_rx_buffer_t, rxb_repost_list);
+ list_del_init(&rxb->rxb_repost_list);
+ }
+ /*
+ * Drain the TX queue. Note RX's can cause new TX's
+ * to be added to the queue.
+ */
+ tx = NULL;
+ if(!list_empty(&kptllnd_data->kptl_sched_txq)){
+ tx = list_entry (kptllnd_data->kptl_sched_txq.next,
+ kptl_tx_t, tx_schedlist);
+ list_del_init(&tx->tx_schedlist);
+ }
+
+ spin_unlock_irqrestore(&kptllnd_data->kptl_sched_lock, flags);
+
+ /*
+ * Process anything that came off the list
+ */
+ if(rx)
+ kptllnd_rx_scheduler_handler(rx);
+ if(rxb)
+ kptllnd_rx_buffer_post_handle_error(rxb);
+ if(tx){
+ PJK_UT_MSG(">>> tx=%p\n",tx);
+ kptllnd_tx_done(tx);
+ PJK_UT_MSG("<<<\n");
+ }
+
+ /*
+ * As long as we did something this time around
+ * try again.
+ */
+ }while(rx != NULL || rxb != NULL || tx != NULL);
- /*
- * Clean any canceled peers
- */
kptllnd_clean_canceled_peers(kptllnd_data);
}
kptl_peer_t *peer);
kptl_peer_t *
-kptllnd_peer_find_locked (
+kptllnd_peer_find_holding_list_lock (
kptl_data_t *kptllnd_data,
lnet_nid_t nid);
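+/*
+ * Attach an already-allocated peer to the global peer list.  The
+ * caller must hold kptl_peer_rw_lock for writing.  On success the
+ * peer moves from ALLOCATED to WAITING_HELLO and the list takes
+ * its own reference.
+ */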
+int
+kptllnd_peer_add_to_list_locked (
+ kptl_data_t *kptllnd_data,
+ kptl_peer_t *peer)
+{
+ /* QQQ - this got split out
+ * But first check we haven't exceeded our maximum
+ * number of peers
+ */
+ if (atomic_read(&kptllnd_data->kptl_npeers) >=
+ *kptllnd_tunables.kptl_concurrent_peers) {
+ STAT_UPDATE(kps_too_many_peers);
+ CERROR("Can't create peer: too many peers\n");
+ return -EOVERFLOW; /* !! but at least it distinguishes */
+ }
+
+ /*
+ * Update the state
+ */
+ LASSERT(peer->peer_state == PEER_STATE_ALLOCATED);
+ peer->peer_state = PEER_STATE_WAITING_HELLO;
+
+ /*
+ * +1 ref for the list
+ */
+ atomic_inc(&peer->peer_refcount);
+
+ /* npeers only grows with the global lock held */
+ atomic_inc(&kptllnd_data->kptl_npeers);
+
+ /* And add this to the list */
+ LASSERT(list_empty(&peer->peer_list));
+ list_add_tail (&peer->peer_list,
+ kptllnd_nid2peerlist (kptllnd_data,peer->peer_nid));
+
+ STAT_UPDATE(kps_peers_created);
+
+ return 0;
+}
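+
+/*
+ * Allocate and initialise a peer in the ALLOCATED state.  The peer
+ * is not put on the peer list here; the caller attaches it later
+ * via kptllnd_peer_add_to_list_locked(), so the allocation can be
+ * done without holding peer_rw_lock (i.e. outside atomic context).
+ */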
int
-kptllnd_peer_create_locked (
+kptllnd_peer_allocate (
kptl_data_t *kptllnd_data,
kptl_peer_t **peerp,
lnet_nid_t nid)
LASSERT (nid != PTL_NID_ANY);
- /*
- * But first check we haven't exceeded or maximum
- * number of peers
- */
- if (atomic_read(&kptllnd_data->kptl_npeers) >=
- *kptllnd_tunables.kptl_concurrent_peers) {
- STAT_UPDATE(kps_too_many_peers);
- CERROR("Can't create peer: too many peers\n");
- rc = -EOVERFLOW; /* !! but at least it distinguishes */
- }
-
LIBCFS_ALLOC(peer, sizeof (*peer));
if (peer == NULL) {
CERROR("Cannot allocate memory for peer\n");
spin_lock_init (&peer->peer_lock);
- peer->peer_state = PEER_STATE_WAITING_HELLO;
+ peer->peer_state = PEER_STATE_ALLOCATED;
peer->peer_kptllnd_data = kptllnd_data;
peer->peer_nid = nid;
//peer->peer_incarnation = 0;
peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
//peer->peer_last_matchbits_seen = 0;
-
/*
* Reserve space in the RX buffer pool for this new peer
*/
return rc;
}
- /*
- * 1 ref for the list
+ /* QQQ - we're not adding to the list here anymore, so take only
* 1 for the caller
*/
- atomic_set (&peer->peer_refcount, 2);
-
- /* npeers only grows with the global lock held */
- atomic_inc(&kptllnd_data->kptl_npeers);
-
- /* And add this to the list */
- list_add_tail (&peer->peer_list,
- kptllnd_nid2peerlist (kptllnd_data,nid));
-
- STAT_UPDATE(kps_peers_created);
+ atomic_set (&peer->peer_refcount, 1);
PJK_UT_MSG("<<< Peer=%p nid="LPX64"\n",peer,nid);
*peerp = peer;
kptllnd_rx_buffer_pool_unreserve(
&kptllnd_data->kptl_rx_buffer_pool,
*kptllnd_tunables.kptl_peercredits);
-
-
- /* NB a peer's connections keep a reference on their peer until
- * they are destroyed, so we can be assured that _all_ state to do
- * with this peer has been cleaned up when its refcount drops to
- * zero. */
- atomic_dec(&kptllnd_data->kptl_npeers);
+
+ /*
+ * If the peer is still in the ALLOCATED state it was never
+ * tracked in kptl_npeers, so there is nothing to undo here.
+ * In all other states we need to decrement the counter.
+ */
+ if(peer->peer_state != PEER_STATE_ALLOCATED)
+ atomic_dec(&kptllnd_data->kptl_npeers);
}
if(peer->peer_state == PEER_STATE_CANCELED)
kptllnd_data->kptl_canceled_peers_counter++;
write_unlock_irqrestore(&kptllnd_data->kptl_peer_rw_lock, flags);
-
+
kptllnd_peer_destroy(peer);
}
struct list_head *tx_temp;
struct list_head *tx_next;
kptl_tx_t *tx;
+ unsigned long flags;
INIT_LIST_HEAD (&list);
* Transfer all the PENDING TX's to a temporary list
* while holding the peer lock
*/
- spin_lock(&peer->peer_lock);
+ spin_lock_irqsave(&peer->peer_lock, flags);
if(!list_empty(&peer->peer_pending_txs))
PJK_UT_MSG("Clearing Pending TXs\n");
list_add(&tx->tx_list,&list);
}
- spin_unlock(&peer->peer_lock);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
/*
* Now release the references outside of the peer_lock
kptl_tx_t *tx;
ptl_err_t ptl_rc;
int counter;
+ unsigned long flags;
- spin_lock(&peer->peer_lock);
+ spin_lock_irqsave(&peer->peer_lock, flags);
if(!list_empty(&peer->peer_active_txs))
PJK_UT_MSG("Clearing Active TXs\n");
*/
kptllnd_tx_addref(tx);
- spin_unlock(&peer->peer_lock);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
/*
kptllnd_tx_decref(tx);
- spin_lock(&peer->peer_lock);
+ spin_lock_irqsave(&peer->peer_lock, flags);
/*
* If a change in the list has been detected
goto again;
}
- spin_unlock(&peer->peer_lock);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
}
void
kptl_peer_t *peer,
kptl_tx_t *tx)
{
- spin_lock(&peer->peer_lock);
+ unsigned long flags;
+ spin_lock_irqsave(&peer->peer_lock, flags);
kptllnd_peer_dequeue_tx_locked(peer,tx);
- spin_unlock(&peer->peer_lock);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
}
void
kptl_peer_t *peer )
{
- kptl_tx_t *tx;
- kptl_data_t *kptllnd_data = peer->peer_kptllnd_data;
- int rc,rc2;
- ptl_md_t md;
- ptl_handle_me_t meh;
+ kptl_tx_t *tx;
+ kptl_data_t *kptllnd_data = peer->peer_kptllnd_data;
+ int rc,rc2;
+ ptl_md_t md;
+ ptl_handle_me_t meh;
ptl_process_id_t target;
+ unsigned long flags;
+
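+ /* this path issues PtlPut and must never be called from interrupt context */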
+ LASSERT(!in_interrupt());
/*
* If there is nothing to send, and we have hit the credit
* high water mark, then send a no-op message
*/
- spin_lock(&peer->peer_lock);
+ spin_lock_irqsave(&peer->peer_lock, flags);
PJK_UT_MSG_DATA(">>>Peer=%p Credits=%d Outstanding=%d\n",
peer,peer->peer_credits,peer->peer_outstanding_credits);
if (tx->tx_msg->ptlm_type == PTLLND_MSG_TYPE_NOOP &&
(!list_empty(&peer->peer_pending_txs) ||
peer->peer_outstanding_credits < PTLLND_CREDIT_HIGHWATER)) {
- spin_unlock(&peer->peer_lock);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
/* redundant NOOP */
kptllnd_tx_decref(tx);
CDEBUG(D_NET, LPX64": redundant noop\n",
peer->peer_nid);
- spin_lock(&peer->peer_lock);
+ spin_lock_irqsave(&peer->peer_lock, flags);
continue;
}
*/
tx->tx_mdh = PTL_INVALID_HANDLE;
- spin_unlock(&peer->peer_lock);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
kptllnd_tx_decref(tx);
*/
kptllnd_tx_addref(tx);
- spin_unlock(&peer->peer_lock);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
rc = PtlPut (
tx->tx_mdh_msg,
*/
kptllnd_tx_decref(tx);
- spin_lock(&peer->peer_lock);
+ spin_lock_irqsave(&peer->peer_lock, flags);
}
- spin_unlock(&peer->peer_lock);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
PJK_UT_MSG_DATA("<<<\n");
return;
failed_with_lock:
- spin_unlock(&peer->peer_lock);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
failed_without_lock:
/*
int
kptllnd_peer_timedout(kptl_peer_t *peer)
{
- kptl_tx_t *tx;
- int rc = 0;
+ kptl_tx_t *tx;
+ int rc = 0;
+ unsigned long flags;
- spin_lock(&peer->peer_lock);
+ spin_lock_irqsave(&peer->peer_lock, flags);
/*
* Check the head of the pending list for expiration
}
}
- spin_unlock(&peer->peer_lock);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
return rc;
}
kptl_peer_t *peer;
unsigned long flags;
read_lock_irqsave(&kptllnd_data->kptl_peer_rw_lock, flags);
- peer = kptllnd_peer_find_locked(kptllnd_data,nid);
+ peer = kptllnd_peer_find_holding_list_lock(kptllnd_data,nid);
read_unlock_irqrestore(&kptllnd_data->kptl_peer_rw_lock, flags);
return peer;
}
kptl_peer_t *
-kptllnd_peer_find_locked (
+kptllnd_peer_find_holding_list_lock (
kptl_data_t *kptllnd_data,
lnet_nid_t nid)
{
lnet_nid_t nid,
kptl_msg_t *msg)
{
- kptl_peer_t *peer;
+ kptl_peer_t *peer = NULL;
+ kptl_peer_t *peer_allocated = NULL;
kptl_peer_t *peer_to_cancel = NULL;
unsigned long flags;
kptl_tx_t *tx_hello = NULL;
return 0;
}
+
+ /*
+ * Setup a connect HELLO message. We ultimately might not
+ * use it but likely we will.
+ */
+ tx_hello = kptllnd_get_idle_tx(kptllnd_data,TX_TYPE_SMALL_MESSAGE);
+ if( tx_hello == NULL) {
+ CERROR("Unable to allocate connect message for "LPX64"\n",nid);
+ return 0;
+ }
+
+ kptllnd_init_msg(
+ tx_hello->tx_msg,
+ PTLLND_MSG_TYPE_HELLO,
+ sizeof(kptl_hello_msg_t));
+
+ /*
+ * Allocate a peer, even though we might not ultimately use it.
+ * We do it here to avoid allocating while holding the
+ * peer_rw_lock and being forced into atomic context.
+ */
+ rc = kptllnd_peer_allocate ( kptllnd_data, &peer_allocated, nid);
+ if(rc != 0){
+ kptllnd_tx_decref(tx_hello);
+ CERROR("Failed to create peer (nid="LPX64")\n",nid);
+ return 0;
+ }
write_lock_irqsave(&kptllnd_data->kptl_peer_rw_lock, flags);
/*
* Look for peer because it could have been previously here
*/
- peer = kptllnd_peer_find_locked(kptllnd_data,nid);
+ peer = kptllnd_peer_find_holding_list_lock(kptllnd_data,nid);
/*
* If peer is already here
if( peer == NULL) {
/*
- * Setup a connect HELLO message. We ultimately might not
- * use it but likely we will.
- */
- tx_hello = kptllnd_get_idle_tx(kptllnd_data,TX_TYPE_SMALL_MESSAGE);
- if( tx_hello == NULL) {
- CERROR("Unable to allocate connect message for "LPX64"\n",nid);
- goto failed;
- }
-
- kptllnd_init_msg(
- tx_hello->tx_msg,
- PTLLND_MSG_TYPE_HELLO,
- sizeof(kptl_hello_msg_t));
- /*
* Put the match bits into the hello message
*/
tx_hello->tx_msg->ptlm_u.hello.kptlhm_matchbits =
tx_hello->tx_msg->ptlm_u.hello.kptlhm_max_immd_size =
*kptllnd_tunables.kptl_max_immd_size;
- rc = kptllnd_peer_create_locked ( kptllnd_data, &peer, nid);
+ /*
+ * Try and attach this peer to the list
+ */
+ rc = kptllnd_peer_add_to_list_locked ( kptllnd_data, peer_allocated);
if(rc != 0){
CERROR("Failed to create peer (nid="LPX64")\n",nid);
- peer = NULL;
goto failed;
}
+
+ peer = peer_allocated;
+ peer_allocated = NULL;
+
LASSERT(peer->peer_state == PEER_STATE_WAITING_HELLO);
peer->peer_state = PEER_STATE_ACTIVE;
* NB We don't need to hold the peer->peer_lock
* because we haven't released the kptl_peer_rw_lock which
* prevents anyone else from getting a pointer to
- * this newly created peer
+ * this newly added (to the list) peer
*/
/*
kptllnd_peer_cancel(peer_to_cancel);
kptllnd_peer_decref(peer_to_cancel,"find");
}
+
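+ /*
+ * If the pre-allocated peer was never attached to the list
+ * (e.g. the peer already existed) drop our only reference now.
+ */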
+ if(peer_allocated)
+ kptllnd_peer_decref(peer_allocated,"alloc");
PJK_UT_MSG("<<< Peer=%p\n",peer);
lnet_msg_t *ptlmsg )
{
kptl_data_t *kptllnd_data = tx->tx_po.po_kptllnd_data;
- kptl_peer_t *peer;
+ kptl_peer_t *peer = NULL;
+ kptl_peer_t *peer_allocated = NULL;
unsigned long flags;
rwlock_t *g_lock = &kptllnd_data->kptl_peer_rw_lock;
int rc;
- kptl_tx_t *tx_hello;
+ kptl_tx_t *tx_hello = NULL;
+
/* If I get here, I've committed to send, so I complete the tx with
* failure on any problems */
* (which could send it)
*/
if (peer != NULL) {
- spin_lock(&peer->peer_lock);
+ spin_lock_irqsave(&peer->peer_lock, flags);
kptllnd_peer_queue_tx_locked ( peer, tx );
- spin_unlock(&peer->peer_lock);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
kptllnd_peer_check_sends(peer);
kptllnd_peer_decref(peer,"find");
PJK_UT_MSG("<<< FOUND\n");
if( tx_hello == NULL) {
CERROR("Unable to allocate connect message for "LPX64"\n",target_nid);
kptllnd_tx_decref (tx);
- kptllnd_peer_decref(peer,"find");
return;
}
tx_hello->tx_msg,
PTLLND_MSG_TYPE_HELLO,
sizeof(kptl_hello_msg_t));
+
+ /*
+ * We've never seen this peer before. So setup
+ * a default message.
+ */
+ tx_hello->tx_msg->ptlm_u.hello.kptlhm_matchbits = 0;
+ tx_hello->tx_msg->ptlm_u.hello.kptlhm_max_immd_size =
+ *kptllnd_tunables.kptl_max_immd_size;
+
+ /*
+ * Allocate a new peer
+ * (it's not active until it's on the list)
+ */
+ PJK_UT_MSG("TX %p creating NEW PEER nid="LPX64"\n",tx,target_nid);
+ rc = kptllnd_peer_allocate ( kptllnd_data, &peer_allocated, target_nid);
+ if(rc != 0){
+ CERROR("Failed to create peer (nid="LPX64")\n",target_nid);
+ kptllnd_tx_decref (tx);
+ kptllnd_tx_decref (tx_hello);
+ return;
+ }
/*
*/
write_lock_irqsave(g_lock, flags);
- peer = kptllnd_peer_find_locked (kptllnd_data,target_nid);
+ peer = kptllnd_peer_find_holding_list_lock (kptllnd_data,target_nid);
/*
* If we find the peer
CDEBUG(D_TRACE,"HELLO message race occurred (nid="LPX64")\n",target_nid);
- spin_lock(&peer->peer_lock);
+ spin_lock_irqsave(&peer->peer_lock, flags);
kptllnd_peer_queue_tx_locked ( peer, tx );
- spin_unlock(&peer->peer_lock);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
+
kptllnd_peer_check_sends(peer);
kptllnd_peer_decref(peer,"find");
+ kptllnd_peer_decref(peer_allocated,"alloc");
/* and we don't need the connection tx*/
kptllnd_tx_decref(tx_hello);
PJK_UT_MSG("<<< FOUND2\n");
return;
}
-
- PJK_UT_MSG("TX %p creating NEW PEER nid="LPX64"\n",tx,target_nid);
- rc = kptllnd_peer_create_locked ( kptllnd_data, &peer, target_nid);
+
+ rc = kptllnd_peer_add_to_list_locked ( kptllnd_data, peer_allocated);
if(rc != 0){
- CERROR("Failed to create peer (nid="LPX64")\n",target_nid);
write_unlock_irqrestore(g_lock, flags);
+
+ CERROR("Failed to add peer to list (nid="LPX64")\n",target_nid);
+
+ /* Drop these TXs */
kptllnd_tx_decref (tx);
kptllnd_tx_decref (tx_hello);
- kptllnd_peer_decref(peer,"find");
+ kptllnd_peer_decref(peer_allocated,"create");
return;
}
+
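+ /* success: the freshly allocated peer is now on the list; use it from here on */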
+ peer = peer_allocated;
+ peer_allocated = NULL;
-
- /*
- * We've never seen this peer before. So setup
- * a default message.
- */
- tx_hello->tx_msg->ptlm_u.hello.kptlhm_matchbits = 0;
- tx_hello->tx_msg->ptlm_u.hello.kptlhm_max_immd_size =
- *kptllnd_tunables.kptl_max_immd_size;
-
+ write_unlock_irqrestore(g_lock,flags);
+
/*
* Queue the connection request
* and the actual tx. We have one credit so
* the tx will wait for a reply.
*/
PJK_UT_MSG("TXHello=%p\n",tx_hello);
-
- spin_lock(&peer->peer_lock);
+
+ spin_lock_irqsave(&peer->peer_lock, flags);
kptllnd_peer_queue_tx_locked(peer,tx_hello);
kptllnd_peer_queue_tx_locked(peer,tx);
- spin_unlock(&peer->peer_lock);
-
- write_unlock_irqrestore(g_lock,flags);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
kptllnd_peer_check_sends(peer);
kptllnd_peer_decref(peer,"find");
kptl_rx_buffer_t *rxb;
int rc;
int i;
+ unsigned long flags;
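+ /* NB rxbp_lock is also taken in kptllnd_rx_buffer_callback(), so the irq-save/restore variants are used throughout */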
PJK_UT_MSG("kptllnd_rx_buffer_pool_fini\n");
-
- spin_lock(&rxbp->rxbp_lock);
+
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/*
* Set the shutdown flag under the lock
*/
kptllnd_rx_buffer_addref(rxb,"temp");
- spin_unlock(&rxbp->rxbp_lock);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/*
* Unlink the MD
*/
kptllnd_rx_buffer_decref(rxb,"temp");
- spin_lock(&rxbp->rxbp_lock);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
}else{
PJK_UT_MSG("PtlMDUnlink(%p) rc=%d\n",rxb,rc);
/*
* The unlink failed, so put this back
* on the list for later
*/
- spin_lock(&rxbp->rxbp_lock);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
list_add_tail(&rxb->rxb_list,&rxbp->rxbp_list);
CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
"Waiting for %d Busy RX Buffers\n",
rxbp->rxbp_count);
- spin_unlock(&rxbp->rxbp_lock);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
cfs_pause(cfs_time_seconds(1));
- spin_lock(&rxbp->rxbp_lock);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
}
}
CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */
"Waiting for %d RX Buffers to unlink\n",
rxbp->rxbp_count);
- spin_unlock(&rxbp->rxbp_lock);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
cfs_pause(cfs_time_seconds(1));
- spin_lock(&rxbp->rxbp_lock);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
}
}
CDEBUG(D_TRACE,"|rxbp_count|=0\n");
- spin_unlock(&rxbp->rxbp_lock);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}
int rc;
kptl_rx_buffer_t *rxb;
int nbuffers;
+ unsigned long flags;
- spin_lock(&rxbp->rxbp_lock);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
PJK_UT_MSG("kptllnd_rx_buffer_pool_reserve(%d)\n",count);
* Prevent any more reservations while we are shutting down
*/
if(rxbp->rxbp_shutdown){
- spin_unlock(&rxbp->rxbp_lock);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
return -ESHUTDOWN;
}
* we'll subtract if we hit an error.
*/
rxbp->rxbp_count += add;
- spin_unlock(&rxbp->rxbp_lock);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
for(i=0;i<add;i++){
LIBCFS_ALLOC( rxb,sizeof(*rxb));
return 0;
failed:
- spin_lock(&rxbp->rxbp_lock);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/*
* We really didn't add as many
* Cancel this reservation
*/
rxbp->rxbp_reserved -= count;
- spin_unlock(&rxbp->rxbp_lock);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
if(rxb){
kptl_rx_buffer_pool_t *rxbp,
int count)
{
- spin_lock(&rxbp->rxbp_lock);
+ unsigned long flags;
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
PJK_UT_MSG("kptllnd_rx_buffer_pool_unreserve(%d)\n",count);
rxbp->rxbp_reserved -= count;
- spin_unlock(&rxbp->rxbp_lock);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}
void
ptl_process_id_t any;
kptl_data_t *kptllnd_data = rxb->rxb_po.po_kptllnd_data;
kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
+ unsigned long flags;
any.nid = PTL_NID_ANY;
any.pid = PTL_PID_ANY;
/*PJK_UT_MSG("rxb=%p\n",rxb);*/
- spin_lock(&rxbp->rxbp_lock);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/*
* No new RXB's can enter the POSTED state
*/
if(rxbp->rxbp_shutdown){
- spin_unlock(&rxbp->rxbp_lock);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
return -ESHUTDOWN;
}
atomic_set(&rxb->rxb_refcount,1);
rxb->rxb_state = RXB_STATE_POSTED;
- spin_unlock(&rxbp->rxbp_lock);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
/*
* Attach the ME
* to deal with a shutdown race on
* a partially constructed rxb
*/
- spin_lock(&rxbp->rxbp_lock);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
rxb->rxb_mdh = mdh;
- spin_unlock(&rxbp->rxbp_lock);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
return 0;
/*
* Cleanup on error
*/
- spin_lock(&rxbp->rxbp_lock);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
list_del_init(&rxb->rxb_list);
atomic_set(&rxb->rxb_refcount,0);
rxb->rxb_state = RXB_STATE_IDLE;
- spin_unlock(&rxbp->rxbp_lock);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
return rc;
}
kptl_rx_buffer_t *rxb)
{
kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
+ unsigned long flags;
LASSERT(atomic_read(&rxb->rxb_refcount) == 0);
LASSERT(rxb->rxb_state == RXB_STATE_IDLE);
LASSERT(PtlHandleIsEqual(rxb->rxb_mdh,PTL_INVALID_HANDLE));
- spin_lock(&rxbp->rxbp_lock);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
list_del(&rxb->rxb_list);
rxbp->rxbp_count--;
- spin_unlock(&rxbp->rxbp_lock);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
LIBCFS_FREE( rxb->rxb_buffer,PAGE_SIZE * *kptllnd_tunables.kptl_rxb_npages);
LIBCFS_FREE(rxb,sizeof(*rxb));
void
kptllnd_rx_buffer_callback(ptl_event_t *ev)
{
- kptl_rx_buffer_t *rxb = ev->md.user_ptr;
- kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
- /*kptl_data_t *kptllnd_data = rxb->rxb_po.po_kptllnd_data;*/
- kptl_rx_t *rx;
- int nob;
- int unlinked;
+ kptl_rx_buffer_t *rxb = ev->md.user_ptr;
+ kptl_rx_buffer_pool_t *rxbp = rxb->rxb_pool;
+ kptl_rx_t *rx;
+ int nob;
+ int unlinked;
+ unsigned long flags;
/*
* Set the local unlinked flag
nob = ev->mlength;
if(unlinked){
- spin_lock(&rxbp->rxbp_lock);
+ spin_lock_irqsave(&rxbp->rxbp_lock, flags);
/*
* Remove this from the list
rxb->rxb_mdh = PTL_INVALID_HANDLE;
if( rxbp->rxbp_shutdown){
- spin_unlock(&rxbp->rxbp_lock);
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
kptllnd_rx_buffer_decref(rxb,"portals");
return;
}
- spin_unlock(&rxbp->rxbp_lock);
-
-
+ spin_unlock_irqrestore(&rxbp->rxbp_lock, flags);
}
/*
int returned_credits = 0;
int type = msg->ptlm_type;
lnet_nid_t lnet_initiator_nid = ptl2lnetnid(kptllnd_data,rx->rx_initiator.nid);
+ unsigned long flags;
PJK_UT_MSG_DATA(">>> RXRXRXRXRXRXRXRXRXRXRXRX\n");
/*
* Save the last match bits used
*/
- spin_lock(&rx->rx_peer->peer_lock);
+ spin_lock_irqsave(&rx->rx_peer->peer_lock, flags);
if(msg->ptlm_u.req.kptlrm_matchbits > rx->rx_peer->peer_last_matchbits_seen)
rx->rx_peer->peer_last_matchbits_seen = msg->ptlm_u.req.kptlrm_matchbits;
- spin_unlock(&rx->rx_peer->peer_lock);
+ spin_unlock_irqrestore(&rx->rx_peer->peer_lock, flags);
rc = lnet_parse(kptllnd_data->kptl_ni,
&msg->ptlm_u.req.kptlrm_hdr,
void
kptllnd_rx_destroy(kptl_rx_t *rx,kptl_data_t *kptllnd_data)
{
- kptl_peer_t *peer = rx->rx_peer;
- kptl_msg_t *msg = rx->rx_msg;
- int returned_credits = msg->ptlm_credits;
+ kptl_peer_t *peer = rx->rx_peer;
+ kptl_msg_t *msg = rx->rx_msg;
+ int returned_credits = msg->ptlm_credits;
+ unsigned long flags;
PJK_UT_MSG(">>> rx=%p\n",rx);
* Update credits
* (Only after I've reposted the buffer)
*/
- spin_lock(&peer->peer_lock);
+ spin_lock_irqsave(&peer->peer_lock, flags);
peer->peer_credits += returned_credits;
LASSERT( peer->peer_credits <=
*kptllnd_tunables.kptl_peercredits);
peer->peer_outstanding_credits++;
LASSERT( peer->peer_outstanding_credits <=
*kptllnd_tunables.kptl_peercredits);
- spin_unlock(&peer->peer_lock);
+ spin_unlock_irqrestore(&peer->peer_lock, flags);
PJK_UT_MSG_DATA("Giving Back %d credits rx=%p\n",returned_credits,rx);