From 292aa42e0897ec4db0c07d00a3f050c19b3f908d Mon Sep 17 00:00:00 2001
From: Liang Zhen
Date: Fri, 5 Dec 2014 22:13:17 +0800
Subject: [PATCH] LU-6032 ldlm: don't disable softirq for exp_rpc_lock

It is not necessary to call ldlm_lock_busy() in the context of the
timer callback; it can be called in the thread context of
expired_lock_main() instead. With this change, we no longer need to
disable softirq for exp_rpc_lock.

Instead of moving busy locks to the end of the waiting list one at a
time in the context of the timer callback, move any locks that may
have expired onto the expired list. If these locks are still being
used by RPCs that are being processed, put them back onto the end of
the waiting list instead of evicting the client.

Signed-off-by: Liang Zhen
Change-Id: Ic3da0dd4e81b758c7448d9613ccd4786693e075d
Reviewed-on: https://review.whamcloud.com/12957
Reviewed-by: Dmitry Eremin
Reviewed-by: Andreas Dilger
Tested-by: Jenkins
Tested-by: Maloo
Reviewed-by: Oleg Drokin
---
 lustre/ldlm/ldlm_lockd.c | 91 +++++++++++++++++++++---------------------------
 lustre/ptlrpc/service.c  | 12 +++----
 2 files changed, 46 insertions(+), 57 deletions(-)

diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index 7f6191b..b1def6d 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -145,6 +145,10 @@ static enum elt_state expired_lock_thread_state = ELT_STOPPED;
 static int expired_lock_dump;
 static LIST_HEAD(expired_lock_list);
 
+static int ldlm_lock_busy(struct ldlm_lock *lock);
+static int ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t timeout);
+static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t seconds);
+
 static inline int have_expired_locks(void)
 {
 	int need_to_run;
@@ -226,14 +230,31 @@ static int expired_lock_main(void *arg)
 			export = class_export_lock_get(lock->l_export, lock);
 			spin_unlock_bh(&waiting_locks_spinlock);
 
-			spin_lock_bh(&export->exp_bl_list_lock);
-			list_del_init(&lock->l_exp_list);
-			spin_unlock_bh(&export->exp_bl_list_lock);
-
-			do_dump++;
-			class_fail_export(export);
+			/* Check if we need to prolong timeout */
+			if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
+			    lock->l_callback_timeout != 0 && /* not AST error */
+			    ldlm_lock_busy(lock)) {
+				LDLM_DEBUG(lock, "prolong the busy lock");
+				lock_res_and_lock(lock);
+				ldlm_add_waiting_lock(lock,
+						ldlm_bl_timeout(lock) >> 1);
+				unlock_res_and_lock(lock);
+			} else {
+				spin_lock_bh(&export->exp_bl_list_lock);
+				list_del_init(&lock->l_exp_list);
+				spin_unlock_bh(&export->exp_bl_list_lock);
+
+				LDLM_ERROR(lock,
+					   "lock callback timer expired after "
+					   "%llds: evicting client at %s ",
+					   cfs_time_current_sec() -
+					   lock->l_last_activity,
+					   obd_export_nid2str(export));
+				ldlm_lock_to_ns(lock)->ns_timeouts++;
+				do_dump++;
+				class_fail_export(export);
+			}
 			class_export_lock_put(export, lock);
-
 			/* release extra ref grabbed by ldlm_add_waiting_lock()
 			 * or ldlm_failed_ast() */
 			LDLM_LOCK_RELEASE(lock);
@@ -256,9 +277,6 @@ static int expired_lock_main(void *arg)
 	RETURN(0);
 }
 
-static int ldlm_add_waiting_lock(struct ldlm_lock *lock);
-static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t seconds);
-
 /**
  * Check if there is a request in the export request list
  * which prevents the lock canceling.
@@ -272,7 +290,7 @@ static int ldlm_lock_busy(struct ldlm_lock *lock)
 	if (lock->l_export == NULL)
 		return 0;
 
-	spin_lock_bh(&lock->l_export->exp_rpc_lock);
+	spin_lock(&lock->l_export->exp_rpc_lock);
 	list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
 			    rq_exp_list) {
 		if (req->rq_ops->hpreq_lock_match) {
@@ -281,7 +299,7 @@ static int ldlm_lock_busy(struct ldlm_lock *lock)
 				break;
 		}
 	}
-	spin_unlock_bh(&lock->l_export->exp_rpc_lock);
+	spin_unlock(&lock->l_export->exp_rpc_lock);
 	RETURN(match);
 }
 
@@ -299,37 +317,6 @@ static void waiting_locks_callback(unsigned long unused)
 		    lock->l_req_mode == LCK_GROUP)
 			break;
 
-		/* Check if we need to prolong timeout */
-		if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
-		    ldlm_lock_busy(lock)) {
-			int cont = 1;
-
-			if (lock->l_pending_chain.next == &waiting_locks_list)
-				cont = 0;
-
-			LDLM_LOCK_GET(lock);
-
-			spin_unlock_bh(&waiting_locks_spinlock);
-			LDLM_DEBUG(lock, "prolong the busy lock");
-			ldlm_refresh_waiting_lock(lock,
-						  ldlm_bl_timeout(lock) >> 1);
-			spin_lock_bh(&waiting_locks_spinlock);
-
-			if (!cont) {
-				LDLM_LOCK_RELEASE(lock);
-				break;
-			}
-
-			LDLM_LOCK_RELEASE(lock);
-			continue;
-		}
-		ldlm_lock_to_ns(lock)->ns_timeouts++;
-		LDLM_ERROR(lock, "lock callback timer expired after %llds: "
-			   "evicting client at %s ",
-			   ktime_get_real_seconds() - lock->l_last_activity,
-			   libcfs_nid2str(
-				lock->l_export->exp_connection->c_peer.nid));
-
 		/* no needs to take an extra ref on the lock since it was in
 		 * the waiting_locks_list and ldlm_add_waiting_lock()
 		 * already grabbed a ref */
@@ -424,9 +411,8 @@ static void ldlm_add_blocked_lock(struct ldlm_lock *lock)
 	obd_stale_export_adjust(lock->l_export);
 }
 
-static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
+static int ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t timeout)
 {
-	time64_t timeout = ldlm_bl_timeout(lock);
 	int ret;
 
 	/* NB: must be called with hold of lock_res_and_lock() */
@@ -639,6 +625,7 @@ static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
 		/* the lock was not in any list, grab an extra ref before adding
 		 * the lock to the expired list */
 		LDLM_LOCK_GET(lock);
+	lock->l_callback_timeout = 0; /* differentiate it from expired locks */
 	list_add(&lock->l_pending_chain, &expired_lock_list);
 	wake_up(&expired_lock_wait_queue);
 	spin_unlock_bh(&waiting_locks_spinlock);
@@ -814,7 +801,7 @@ static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
 		RETURN_EXIT;
 	}
 
-	spin_lock_bh(&lock->l_export->exp_rpc_lock);
+	spin_lock(&lock->l_export->exp_rpc_lock);
 	list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
 			    rq_exp_list) {
 		/* Do not process requests that were not yet added to there
@@ -828,7 +815,7 @@ static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
 		    req->rq_ops->hpreq_lock_match(req, lock))
 			ptlrpc_nrs_req_hp_move(req);
 	}
-	spin_unlock_bh(&lock->l_export->exp_rpc_lock);
+	spin_unlock(&lock->l_export->exp_rpc_lock);
 	EXIT;
 }
 
@@ -919,7 +906,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 		req->rq_no_resend = 1;
 	} else {
 		LASSERT(lock->l_granted_mode == lock->l_req_mode);
-		ldlm_add_waiting_lock(lock);
+		ldlm_add_waiting_lock(lock, ldlm_bl_timeout(lock));
 		unlock_res_and_lock(lock);
 
 		/* Do not resend after lock callback timeout */
@@ -1057,7 +1044,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 		lock_res_and_lock(lock);
 	} else {
 		/* start the lock-timeout clock */
-		ldlm_add_waiting_lock(lock);
+		ldlm_add_waiting_lock(lock, ldlm_bl_timeout(lock));
 		/* Do not resend after lock callback timeout */
 		req->rq_delay_limit = ldlm_bl_timeout(lock);
 		req->rq_resend_cb = ldlm_update_resend;
@@ -1375,8 +1362,10 @@ existing_lock:
 				unlock_res_and_lock(lock);
 				ldlm_lock_cancel(lock);
 				lock_res_and_lock(lock);
-			} else
-				ldlm_add_waiting_lock(lock);
+			} else {
+				ldlm_add_waiting_lock(lock,
+						      ldlm_bl_timeout(lock));
+			}
 		}
 	}
 	unlock_res_and_lock(lock);
diff --git a/lustre/ptlrpc/service.c b/lustre/ptlrpc/service.c
index 0bb0175..4cbab10 100644
--- a/lustre/ptlrpc/service.c
+++ b/lustre/ptlrpc/service.c
@@ -993,18 +993,18 @@ void ptlrpc_request_change_export(struct ptlrpc_request *req,
 	if (req->rq_export != NULL) {
 		LASSERT(!list_empty(&req->rq_exp_list));
 		/* remove rq_exp_list from last export */
-		spin_lock_bh(&req->rq_export->exp_rpc_lock);
+		spin_lock(&req->rq_export->exp_rpc_lock);
 		list_del_init(&req->rq_exp_list);
-		spin_unlock_bh(&req->rq_export->exp_rpc_lock);
+		spin_unlock(&req->rq_export->exp_rpc_lock);
 		/* export has one reference already, so it`s safe to
 		 * add req to export queue here and get another
 		 * reference for request later */
-		spin_lock_bh(&export->exp_rpc_lock);
+		spin_lock(&export->exp_rpc_lock);
 		if (req->rq_ops != NULL) /* hp request */
 			list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
 		else
 			list_add(&req->rq_exp_list, &export->exp_reg_rpcs);
-		spin_unlock_bh(&export->exp_rpc_lock);
+		spin_unlock(&export->exp_rpc_lock);
 
 		class_export_rpc_dec(req->rq_export);
 		class_export_put(req->rq_export);
@@ -1635,9 +1635,9 @@ static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req)
 		if (req->rq_ops && req->rq_ops->hpreq_fini)
 			req->rq_ops->hpreq_fini(req);
 
-		spin_lock_bh(&req->rq_export->exp_rpc_lock);
+		spin_lock(&req->rq_export->exp_rpc_lock);
 		list_del_init(&req->rq_exp_list);
-		spin_unlock_bh(&req->rq_export->exp_rpc_lock);
+		spin_unlock(&req->rq_export->exp_rpc_lock);
 	}
 	EXIT;
 }
-- 
1.8.3.1
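
The sketch below is a simplified userspace model of the decision this patch
moves into expired_lock_main(); it is illustration only, not Lustre code.
The mock_lock type, the flags standing in for ldlm_lock_busy() and
l_callback_timeout, and the main() driver are all invented for the example.
It shows the pattern the patch adopts: the timer callback only queues
candidates onto the expired list, while the busy check and the
re-queue-or-evict decision run in ordinary thread context, which is why
exp_rpc_lock no longer needs the softirq-disabling spin_lock_bh() variants.

/*
 * Simplified userspace model of the new expired_lock_main() flow.
 * NOT the real Lustre code: mock types and main() are for illustration.
 */
#include <stdbool.h>
#include <stdio.h>

struct mock_lock {
	const char *name;
	bool	    in_use_by_rpc;	/* stands in for ldlm_lock_busy() */
	bool	    ast_error;		/* stands in for l_callback_timeout == 0 */
};

/* Model of ldlm_lock_busy(): the kernel walks exp_hp_rpcs under
 * exp_rpc_lock; here it is just a flag. */
static bool lock_busy(const struct mock_lock *lock)
{
	return lock->in_use_by_rpc;
}

/* Model of the decision now made in expired_lock_main() thread context. */
static void handle_expired(struct mock_lock *lock)
{
	if (!lock->ast_error && lock_busy(lock)) {
		/* Busy lock: prolong it and put it back on the waiting
		 * list, i.e. ldlm_add_waiting_lock() with half of
		 * ldlm_bl_timeout(). */
		printf("%s: busy, re-queued on waiting list\n", lock->name);
	} else {
		/* Idle lock or AST failure: evict the client,
		 * i.e. class_fail_export(). */
		printf("%s: expired, evicting client\n", lock->name);
	}
}

int main(void)
{
	struct mock_lock locks[] = {
		{ .name = "lock-A", .in_use_by_rpc = true  },
		{ .name = "lock-B", .in_use_by_rpc = false },
		{ .name = "lock-C", .in_use_by_rpc = true, .ast_error = true },
	};

	for (unsigned int i = 0; i < sizeof(locks) / sizeof(locks[0]); i++)
		handle_expired(&locks[i]);

	return 0;
}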