From 812be268bd6d0395172de5aa0491f5c161096228 Mon Sep 17 00:00:00 2001 From: pjkirner Date: Wed, 5 Oct 2005 18:36:35 +0000 Subject: [PATCH] * Add separate thread for non-critical work * Moved peer locks so they are not held when Ptl API calls are made * Fixed location of returning credits and checking sends (to before lnet_parse() is called) * Added TBD list --- lnet/klnds/ptllnd/ptllnd.c | 22 +++++- lnet/klnds/ptllnd/ptllnd.h | 4 +- lnet/klnds/ptllnd/ptllnd_cb.c | 154 ++++++++++++++++++++++---------------- lnet/klnds/ptllnd/ptllnd_peer.c | 52 +++++++++---- lnet/klnds/ptllnd/ptllnd_rx_buf.c | 32 +++++--- lnet/klnds/ptllnd/ptllnd_tx.c | 4 +- 6 files changed, 174 insertions(+), 94 deletions(-) diff --git a/lnet/klnds/ptllnd/ptllnd.c b/lnet/klnds/ptllnd/ptllnd.c index a865b15..a2dec41 100755 --- a/lnet/klnds/ptllnd/ptllnd.c +++ b/lnet/klnds/ptllnd/ptllnd.c @@ -18,6 +18,15 @@ #include "ptllnd.h" + +/* + * TBD List + * - Add code to prevent peers with different credits from connecting + * - peer->peer_outstanding_credits LASSERT is incorrect if peer is allowed + * to have a different number of credits configured + + */ + lnd_t kptllnd_lnd = { .lnd_type = PTLLND, .lnd_startup = kptllnd_startup, @@ -411,13 +420,22 @@ kptllnd_startup (lnet_ni_t *ni) for (i = 0; i < PTLLND_N_SCHED; i++) { rc = kptllnd_thread_start ( kptllnd_scheduler, - i, + i+1, kptllnd_data); if (rc != 0) { - CERROR("Can't spawn scheduler[%d]: %d\n", i, rc); + CERROR("Can't spawn scheduler[%d]: %d\n", i+1, rc); goto failed; } } + + rc = kptllnd_thread_start ( + kptllnd_watchdog, + 0, + kptllnd_data); + if (rc != 0) { + CERROR("Can't spawn watchdog[0]: %d\n", rc); + goto failed; + } /* * Allocate space for the tx descriptors diff --git a/lnet/klnds/ptllnd/ptllnd.h b/lnet/klnds/ptllnd/ptllnd.h index 00db185..0a847e9 100755 --- a/lnet/klnds/ptllnd/ptllnd.h +++ b/lnet/klnds/ptllnd/ptllnd.h @@ -81,7 +81,7 @@ #define PTLLND_PEER_HASH_SIZE 101 /* # of buckets in peer hash table */ /* tunables fixed at compile time 
*/ -#define PTLLND_CREDIT_HIGHWATER (*kptllnd_tunables.kptl_peercredits-1) /* when to eagerly return credits */ +#define PTLLND_CREDIT_HIGHWATER ((*kptllnd_tunables.kptl_peercredits)-1) /* when to eagerly return credits */ #define PTLLND_TIMEOUT_SEC 3 /* How often we check a subset of the peer hash table for timeout*/ typedef struct @@ -392,6 +392,8 @@ void kptllnd_eq_callback( int kptllnd_scheduler( void *arg); +int kptllnd_watchdog( + void *arg); int kptllnd_thread_start( int (*fn)(void *arg), diff --git a/lnet/klnds/ptllnd/ptllnd_cb.c b/lnet/klnds/ptllnd/ptllnd_cb.c index a4c5215..21e6b22 100644 --- a/lnet/klnds/ptllnd/ptllnd_cb.c +++ b/lnet/klnds/ptllnd/ptllnd_cb.c @@ -52,7 +52,7 @@ kptllnd_setup_md( /* * Get operations need threshold +1 to handle the - * reply operation. But only on the receiver side. QQQ + * reply operation. But only on the receiver side. */ if( op == PTL_MD_OP_GET && tx->tx_associated_rx != NULL) md->threshold++; @@ -219,7 +219,8 @@ kptllnd_start_bulk_rdma( tempiov_t tempiov; kptl_msg_t *rxmsg = rx->rx_msg; kptl_peer_t *peer = rx->rx_peer; - unsigned long flags; + unsigned long flags; + ptl_handle_md_t mdh; /* @@ -254,8 +255,6 @@ kptllnd_start_bulk_rdma( payload_niov,payload_iov,payload_kiov, payload_offset,payload_nob,&tempiov); - spin_lock_irqsave(&peer->peer_lock, flags); - /* * Attach the MD */ @@ -263,19 +262,17 @@ kptllnd_start_bulk_rdma( kptllnd_data->kptl_nih, md, PTL_UNLINK, - &tx->tx_mdh); + &mdh); if(ptl_rc != PTL_OK){ CERROR("PtlMDBind failed %d\n",ptl_rc); - - spin_unlock_irqrestore(&peer->peer_lock, flags); - /* - * Just drop the ref for this MD because it was never - * posted to portals - */ - tx->tx_mdh = PTL_INVALID_HANDLE; rc = -ENOMEM; goto end; } + + spin_lock_irqsave(&peer->peer_lock, flags); + + tx->tx_mdh = mdh; + STAT_UPDATE(kps_posted_tx_bulk_mds); /* @@ -324,7 +321,6 @@ kptllnd_start_bulk_rdma( CERROR("Ptl%s failed: %d\n", op == PTL_MD_OP_GET ? 
"Get" : "Put",ptl_rc); - spin_lock_irqsave(&peer->peer_lock, flags); /* * Unlink the MD because it's not yet in use @@ -342,6 +338,9 @@ kptllnd_start_bulk_rdma( tx->tx_mdh = PTL_INVALID_HANDLE; kptllnd_tx_decref(tx); #endif + + spin_lock_irqsave(&peer->peer_lock, flags); + /* * We are returning failure so we don't * want tx_done to finalize the message @@ -875,7 +874,83 @@ kptllnd_thread_start (int (*fn)(void *arg), int id,kptl_data_t *kptllnd_data) } } +int +kptllnd_watchdog(void *arg) +{ + kptllnd_thread_data_t *thread_data = arg; + int id = thread_data->id; + kptl_data_t *kptllnd_data = thread_data->kptllnd_data; + char name[16]; + cfs_waitlink_t waitlink; + int peer_index = 0; + unsigned long deadline = jiffies; + int timeout; + int i; + + PJK_UT_MSG(">>>\n"); + + /* + * Daemonize + */ + snprintf(name, sizeof(name), "kptllnd_wd_%02d", id); + libcfs_daemonize(name); + + cfs_waitlink_init(&waitlink); + + /* + * Keep going around + */ + while(!kptllnd_data->kptl_shutdown) { + + /* + * Wait on the scheduler waitq + */ + + set_current_state (TASK_INTERRUPTIBLE); + cfs_waitq_add(&kptllnd_data->kptl_sched_waitq, &waitlink); + cfs_waitq_timedwait(&waitlink,cfs_time_seconds(PTLLND_TIMEOUT_SEC)); + set_current_state (TASK_RUNNING); + cfs_waitq_del (&kptllnd_data->kptl_sched_waitq, &waitlink); + + + timeout = (int)(deadline - jiffies); + if (timeout <= 0) { + const int n = 4; + const int p = 1; + int chunk = kptllnd_data->kptl_peer_hash_size; + + + /* Time to check for RDMA timeouts on a few more + * peers: I do checks every 'p' seconds on a + * proportion of the peer table and I need to check + * every connection 'n' times within a timeout + * interval, to ensure I detect a timeout on any + * connection within (n+1)/n times the timeout + * interval. 
*/ + + if ((*kptllnd_tunables.kptl_timeout) > n * p) + chunk = (chunk * n * p) / + (*kptllnd_tunables.kptl_timeout); + if (chunk == 0) + chunk = 1; + + for (i = 0; i < chunk; i++) { + - STAT_UPDATE(kps_checking_buckets); + kptllnd_peer_check_bucket(peer_index,kptllnd_data); + peer_index = (peer_index + 1) % + kptllnd_data->kptl_peer_hash_size; + } + + deadline += p * HZ; + } + kptllnd_clean_canceled_peers(kptllnd_data); + } + + kptllnd_thread_fini(thread_data); + PJK_UT_MSG("<<<\n"); + return (0); +}; int kptllnd_scheduler(void *arg) @@ -885,11 +960,6 @@ kptllnd_scheduler(void *arg) kptl_data_t *kptllnd_data = thread_data->kptllnd_data; char name[16]; cfs_waitlink_t waitlink; - int bucket =0; - int buckets_to_check; - cfs_time_t last_check = cfs_time_current(); - cfs_duration_t duration; - time_t duration_sec; unsigned long flags; kptl_rx_t *rx = NULL; kptl_rx_buffer_t *rxb = NULL; @@ -920,52 +990,6 @@ kptllnd_scheduler(void *arg) set_current_state (TASK_RUNNING); cfs_waitq_del (&kptllnd_data->kptl_sched_waitq, &waitlink); - - duration = cfs_time_sub(cfs_time_current(),last_check); - duration_sec = cfs_duration_sec(duration); - - /* - * Check all the buckets over the kptl_timeout inteval - * but just determine what percenations we are supposed to be - * checking now. - * Example - * (duration/HZ) = 5 sec - * HASH_SHZE = 100 - * kptl_timeout = 60 sec. - * Result = 8 buckets to be checked (actually 8.3) - */ - buckets_to_check = duration_sec * kptllnd_data->kptl_peer_hash_size / - (*kptllnd_tunables.kptl_timeout); - - if(buckets_to_check){ - /*PJK_UT_MSG("buckets_to_check=%d\n",buckets_to_check);*/ - STAT_UPDATE(kps_checking_buckets); - - /* - * Because we round down the buckets we need to store - * the left over portion (.3 in the above example) - * somewhere so we don't - * lose it. Do this but updating the last check now - * to "now" but rather to some time less than "now" that - * takes into account the routing error. 
- */ - last_check = cfs_time_add( last_check, - cfs_time_seconds(buckets_to_check * - (*kptllnd_tunables.kptl_timeout))/ - kptllnd_data->kptl_peer_hash_size); - - /* - * If we are supposed to check buckets then - * do that here. - */ - while(buckets_to_check){ - kptllnd_peer_check_bucket(bucket,kptllnd_data); - bucket = (bucket+1) % kptllnd_data->kptl_peer_hash_size; - --buckets_to_check; - } - } - - /* * Now service the queuse */ @@ -1023,8 +1047,6 @@ kptllnd_scheduler(void *arg) * try again. */ }while(rx != NULL || rxb != NULL || tx != NULL); - - kptllnd_clean_canceled_peers(kptllnd_data); } kptllnd_thread_fini(thread_data); diff --git a/lnet/klnds/ptllnd/ptllnd_peer.c b/lnet/klnds/ptllnd/ptllnd_peer.c index d0e24df..2b35939 100644 --- a/lnet/klnds/ptllnd/ptllnd_peer.c +++ b/lnet/klnds/ptllnd/ptllnd_peer.c @@ -518,6 +518,8 @@ kptllnd_peer_check_sends ( int rc,rc2; ptl_md_t md; ptl_handle_me_t meh; + ptl_handle_md_t mdh; + ptl_handle_md_t mdh_msg; ptl_process_id_t target; unsigned long flags; @@ -619,6 +621,8 @@ kptllnd_peer_check_sends ( PJK_UT_MSG_DATA("Sending TX=%p Size=%d\n",tx,tx->tx_msg->ptlm_nob); PJK_UT_MSG_DATA("Target nid="LPX64"\n",peer->peer_nid); + mdh = PTL_INVALID_HANDLE; + mdh_msg =PTL_INVALID_HANDLE; /* * Assign matchbits for a put/get @@ -682,6 +686,9 @@ kptllnd_peer_check_sends ( */ peer->peer_credits--; + spin_unlock_irqrestore(&peer->peer_lock, flags); + + /* * Set the state before the PtlPut() because * we could get the PUT_END callback before PtlPut() @@ -698,7 +705,7 @@ kptllnd_peer_check_sends ( target.pid = PTLLND_PID; PJK_UT_MSG_DATA("Msg NOB = %d\n",tx->tx_msg->ptlm_nob); - PJK_UT_MSG_DATA("Returned Credits=%d\n",tx->tx_msg->ptlm_credits); + PJK_UT_MSG_DATA("Giving %d credits back to peer\n",tx->tx_msg->ptlm_credits); PJK_UT_MSG_DATA("Seq # = "LPX64"\n",tx->tx_msg->ptlm_seq); PJK_UT_MSG("lnet TX nid=" LPX64 "\n",peer->peer_nid); @@ -711,7 +718,6 @@ kptllnd_peer_check_sends ( PJK_UT_MSG_DATA("matchibts=" LPX64 "\n", 
tx->tx_msg->ptlm_u.req.kptlrm_matchbits); - /* * Attach the ME */ @@ -726,7 +732,7 @@ kptllnd_peer_check_sends ( &meh); if(rc != 0) { CERROR("PtlMeAttach failed %d\n",rc); - goto failed_with_lock; + goto failed_without_lock; } /* Setup the MD */ @@ -755,7 +761,7 @@ kptllnd_peer_check_sends ( meh, md, PTL_UNLINK, - &tx->tx_mdh); + &mdh); if(rc != 0){ CERROR("PtlMDAttach failed %d\n",rc); @@ -763,10 +769,6 @@ kptllnd_peer_check_sends ( * Just drop the ref for this MD because it was never * posted to portals */ - tx->tx_mdh = PTL_INVALID_HANDLE; - - spin_unlock_irqrestore(&peer->peer_lock, flags); - kptllnd_tx_decref(tx); rc2 = PtlMEUnlink(meh); @@ -799,17 +801,40 @@ kptllnd_peer_check_sends ( kptllnd_data->kptl_nih, md, PTL_UNLINK, - &tx->tx_mdh_msg); + &mdh_msg); if(rc != 0){ + if(!PtlHandleIsEqual(mdh,PTL_INVALID_HANDLE)){ + rc2 = PtlMDUnlink(mdh); + /* + * The unlink should succeed + */ + LASSERT( rc2 == 0); + } CERROR("PtlMDBind failed %d\n",rc); - tx->tx_mdh_msg = PTL_INVALID_HANDLE; - goto failed_with_lock; + goto failed_without_lock; } STAT_UPDATE(kps_posted_tx_msg_mds); + spin_lock_irqsave(&peer->peer_lock, flags); + + /* + * Assign the MDH's under lock + */ + LASSERT(PtlHandleIsEqual(tx->tx_mdh,PTL_INVALID_HANDLE)); + LASSERT(PtlHandleIsEqual(tx->tx_mdh_msg,PTL_INVALID_HANDLE)); +#ifdef _USING_LUSTRE_PORTALS_ + PJK_UT_MSG("tx_mdh = " LPX64 "\n",mdh.cookie); + PJK_UT_MSG("tx_mdh_msg = " LPX64 "\n",mdh_msg.cookie); +#endif + tx->tx_mdh = mdh; + tx->tx_mdh_msg = mdh_msg; + + if(tx->tx_type == TX_TYPE_SMALL_MESSAGE) + LASSERT(PtlHandleIsEqual(tx->tx_mdh,PTL_INVALID_HANDLE)); list_add_tail(&tx->tx_list, &peer->peer_active_txs); peer->peer_active_txs_change_counter++; + LASSERT(tx->tx_peer == peer); /* @@ -862,8 +887,6 @@ kptllnd_peer_check_sends ( PJK_UT_MSG_DATA("<<<\n"); return; -failed_with_lock: - spin_unlock_irqrestore(&peer->peer_lock, flags); failed_without_lock: /* @@ -955,6 +978,9 @@ kptllnd_peer_check_bucket (int idx, kptl_data_t *kptllnd_data) 
list_for_each (ptmp, peers) { peer = list_entry (ptmp, kptl_peer_t, peer_list); + PJK_UT_MSG("Peer=%p Credits=%d Outstanding=%d\n", + peer,peer->peer_credits,peer->peer_outstanding_credits); + /* In case we have enough credits to return via a * NOOP, but there were no non-blocking tx descs * free to do it last time... */ diff --git a/lnet/klnds/ptllnd/ptllnd_rx_buf.c b/lnet/klnds/ptllnd/ptllnd_rx_buf.c index 080af8b..2723d30 100644 --- a/lnet/klnds/ptllnd/ptllnd_rx_buf.c +++ b/lnet/klnds/ptllnd/ptllnd_rx_buf.c @@ -59,7 +59,7 @@ kptllnd_rx_buffer_pool_fini( unsigned long flags; PJK_UT_MSG("kptllnd_rx_buffer_pool_fini\n"); - + spin_lock_irqsave(&rxbp->rxbp_lock, flags); /* @@ -502,7 +502,7 @@ kptllnd_rx_buffer_callback(ptl_event_t *ev) int nob; int unlinked; unsigned long flags; - + /* * Set the local unlinked flag */ @@ -648,7 +648,7 @@ kptllnd_rx_scheduler_handler(kptl_rx_t *rx) PJK_UT_MSG_DATA("RX=%p Type=%s(%d)\n",rx, get_msg_type_string(type),type); PJK_UT_MSG_DATA("Msg NOB = %d\n",msg->ptlm_nob); - PJK_UT_MSG_DATA("Returned Credits=%d\n",msg->ptlm_credits); + PJK_UT_MSG_DATA("Credits back from peer=%d\n",msg->ptlm_credits); PJK_UT_MSG_DATA("Seq # ="LPX64"\n",msg->ptlm_seq); PJK_UT_MSG_DATA("lnet RX nid=" LPX64 "\n",lnet_initiator_nid); PJK_UT_MSG("ptl RX nid=" FMT_NID "\n",rx->rx_initiator.nid); @@ -718,6 +718,22 @@ kptllnd_rx_scheduler_handler(kptl_rx_t *rx) */ returned_credits = msg->ptlm_credits; + if (returned_credits != 0) { + + /* Have I received credits that will let me send? 
*/ + spin_lock_irqsave(&peer->peer_lock, flags); + peer->peer_credits += returned_credits; + LASSERT( peer->peer_credits <= + *kptllnd_tunables.kptl_peercredits); + spin_unlock_irqrestore(&peer->peer_lock, flags); + + PJK_UT_MSG("Peer=%p Credits=%d Outstanding=%d\n", + peer,peer->peer_credits,peer->peer_outstanding_credits); + PJK_UT_MSG_DATA("Getting %d credits back rx=%p\n",returned_credits,rx); + + kptllnd_peer_check_sends(peer); + } + /* * Attach the peer to the RX * it now is responsibly for releaseing the refrence @@ -877,7 +893,6 @@ kptllnd_rx_destroy(kptl_rx_t *rx,kptl_data_t *kptllnd_data) { kptl_peer_t *peer = rx->rx_peer; kptl_msg_t *msg = rx->rx_msg; - int returned_credits = msg->ptlm_credits; unsigned long flags; PJK_UT_MSG(">>> rx=%p\n",rx); @@ -901,19 +916,16 @@ kptllnd_rx_destroy(kptl_rx_t *rx,kptl_data_t *kptllnd_data) * (Only after I've reposted the buffer) */ spin_lock_irqsave(&peer->peer_lock, flags); - peer->peer_credits += returned_credits; - LASSERT( peer->peer_credits <= - *kptllnd_tunables.kptl_peercredits); peer->peer_outstanding_credits++; LASSERT( peer->peer_outstanding_credits <= *kptllnd_tunables.kptl_peercredits); spin_unlock_irqrestore(&peer->peer_lock, flags); - PJK_UT_MSG_DATA("Giving Back %d credits rx=%p\n",returned_credits,rx); + PJK_UT_MSG_ALWAYS("Peer=%p Credits=%d Outstanding=%d\n", + peer,peer->peer_credits,peer->peer_outstanding_credits); /* Have I received credits that will let me send? 
*/ - if (returned_credits != 0) - kptllnd_peer_check_sends(peer); + kptllnd_peer_check_sends(peer); kptllnd_peer_decref(peer,"lookup"); } diff --git a/lnet/klnds/ptllnd/ptllnd_tx.c b/lnet/klnds/ptllnd/ptllnd_tx.c index f7be29e..4d3b7b5 100644 --- a/lnet/klnds/ptllnd/ptllnd_tx.c +++ b/lnet/klnds/ptllnd/ptllnd_tx.c @@ -164,8 +164,8 @@ kptllnd_get_idle_tx( * Initialize the TX descriptor so that cleanup can be * handled easily even with a partially initialized descriptor */ - tx->tx_mdh = PTL_INVALID_HANDLE; - tx->tx_mdh_msg = PTL_INVALID_HANDLE; + tx->tx_mdh = PTL_INVALID_HANDLE; + tx->tx_mdh_msg = PTL_INVALID_HANDLE; tx->tx_ptlmsg = NULL; tx->tx_ptlmsg_reply = NULL; tx->tx_peer = NULL; -- 1.8.3.1