Whamcloud - gitweb
LU-6032 ldlm: don't disable softirq for exp_rpc_lock 57/12957/12
author Liang Zhen <liang.zhen@intel.com>
Fri, 5 Dec 2014 14:13:17 +0000 (22:13 +0800)
committer Oleg Drokin <oleg.drokin@intel.com>
Mon, 9 Apr 2018 19:46:24 +0000 (19:46 +0000)
It is not necessary to call ldlm_lock_busy() in the context of the timer
callback; we can call it in the thread context of expired_lock_main() instead.
With this change, we no longer need to disable softirq for exp_rpc_lock.

Instead of moving busy locks to the end of the waiting list one
at a time in the context of the timer callback, move any locks
that may be expired onto the expired list.  If these locks are
still being used by RPCs being processed, then put them back
onto the end of the waiting list instead of evicting the client.

Signed-off-by: Liang Zhen <liang.zhen@intel.com>
Change-Id: Ic3da0dd4e81b758c7448d9613ccd4786693e075d
Reviewed-on: https://review.whamcloud.com/12957
Reviewed-by: Dmitry Eremin <dmitry.eremin@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/ldlm/ldlm_lockd.c
lustre/ptlrpc/service.c

index 7f6191b..b1def6d 100644 (file)
@@ -145,6 +145,10 @@ static enum elt_state expired_lock_thread_state = ELT_STOPPED;
 static int expired_lock_dump;
 static LIST_HEAD(expired_lock_list);
 
 static int expired_lock_dump;
 static LIST_HEAD(expired_lock_list);
 
+static int ldlm_lock_busy(struct ldlm_lock *lock);
+static int ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t timeout);
+static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t seconds);
+
 static inline int have_expired_locks(void)
 {
        int need_to_run;
 static inline int have_expired_locks(void)
 {
        int need_to_run;
@@ -226,14 +230,31 @@ static int expired_lock_main(void *arg)
                        export = class_export_lock_get(lock->l_export, lock);
                        spin_unlock_bh(&waiting_locks_spinlock);
 
                        export = class_export_lock_get(lock->l_export, lock);
                        spin_unlock_bh(&waiting_locks_spinlock);
 
-                       spin_lock_bh(&export->exp_bl_list_lock);
-                       list_del_init(&lock->l_exp_list);
-                       spin_unlock_bh(&export->exp_bl_list_lock);
-
-                       do_dump++;
-                       class_fail_export(export);
+                       /* Check if we need to prolong timeout */
+                       if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
+                           lock->l_callback_timeout != 0 && /* not AST error */
+                           ldlm_lock_busy(lock)) {
+                               LDLM_DEBUG(lock, "prolong the busy lock");
+                               lock_res_and_lock(lock);
+                               ldlm_add_waiting_lock(lock,
+                                               ldlm_bl_timeout(lock) >> 1);
+                               unlock_res_and_lock(lock);
+                       } else {
+                               spin_lock_bh(&export->exp_bl_list_lock);
+                               list_del_init(&lock->l_exp_list);
+                               spin_unlock_bh(&export->exp_bl_list_lock);
+
+                               LDLM_ERROR(lock,
+                                          "lock callback timer expired after "
+                                          "%llds: evicting client at %s ",
+                                          cfs_time_current_sec() -
+                                          lock->l_last_activity,
+                                          obd_export_nid2str(export));
+                               ldlm_lock_to_ns(lock)->ns_timeouts++;
+                               do_dump++;
+                               class_fail_export(export);
+                       }
                        class_export_lock_put(export, lock);
                        class_export_lock_put(export, lock);
-
                        /* release extra ref grabbed by ldlm_add_waiting_lock()
                         * or ldlm_failed_ast() */
                        LDLM_LOCK_RELEASE(lock);
                        /* release extra ref grabbed by ldlm_add_waiting_lock()
                         * or ldlm_failed_ast() */
                        LDLM_LOCK_RELEASE(lock);
@@ -256,9 +277,6 @@ static int expired_lock_main(void *arg)
        RETURN(0);
 }
 
        RETURN(0);
 }
 
-static int ldlm_add_waiting_lock(struct ldlm_lock *lock);
-static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t seconds);
-
 /**
  * Check if there is a request in the export request list
  * which prevents the lock canceling.
 /**
  * Check if there is a request in the export request list
  * which prevents the lock canceling.
@@ -272,7 +290,7 @@ static int ldlm_lock_busy(struct ldlm_lock *lock)
        if (lock->l_export == NULL)
                return 0;
 
        if (lock->l_export == NULL)
                return 0;
 
-       spin_lock_bh(&lock->l_export->exp_rpc_lock);
+       spin_lock(&lock->l_export->exp_rpc_lock);
        list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
                                rq_exp_list) {
                if (req->rq_ops->hpreq_lock_match) {
        list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
                                rq_exp_list) {
                if (req->rq_ops->hpreq_lock_match) {
@@ -281,7 +299,7 @@ static int ldlm_lock_busy(struct ldlm_lock *lock)
                                break;
                }
        }
                                break;
                }
        }
-       spin_unlock_bh(&lock->l_export->exp_rpc_lock);
+       spin_unlock(&lock->l_export->exp_rpc_lock);
        RETURN(match);
 }
 
        RETURN(match);
 }
 
@@ -299,37 +317,6 @@ static void waiting_locks_callback(unsigned long unused)
                    lock->l_req_mode == LCK_GROUP)
                        break;
 
                    lock->l_req_mode == LCK_GROUP)
                        break;
 
-                /* Check if we need to prolong timeout */
-                if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
-                    ldlm_lock_busy(lock)) {
-                        int cont = 1;
-
-                        if (lock->l_pending_chain.next == &waiting_locks_list)
-                                cont = 0;
-
-                        LDLM_LOCK_GET(lock);
-
-                       spin_unlock_bh(&waiting_locks_spinlock);
-                       LDLM_DEBUG(lock, "prolong the busy lock");
-                       ldlm_refresh_waiting_lock(lock,
-                                                 ldlm_bl_timeout(lock) >> 1);
-                       spin_lock_bh(&waiting_locks_spinlock);
-
-                        if (!cont) {
-                                LDLM_LOCK_RELEASE(lock);
-                                break;
-                        }
-
-                        LDLM_LOCK_RELEASE(lock);
-                        continue;
-                }
-                ldlm_lock_to_ns(lock)->ns_timeouts++;
-               LDLM_ERROR(lock, "lock callback timer expired after %llds: "
-                           "evicting client at %s ",
-                          ktime_get_real_seconds() - lock->l_last_activity,
-                           libcfs_nid2str(
-                                   lock->l_export->exp_connection->c_peer.nid));
-
                 /* no needs to take an extra ref on the lock since it was in
                  * the waiting_locks_list and ldlm_add_waiting_lock()
                  * already grabbed a ref */
                 /* no needs to take an extra ref on the lock since it was in
                  * the waiting_locks_list and ldlm_add_waiting_lock()
                  * already grabbed a ref */
@@ -424,9 +411,8 @@ static void ldlm_add_blocked_lock(struct ldlm_lock *lock)
                obd_stale_export_adjust(lock->l_export);
 }
 
                obd_stale_export_adjust(lock->l_export);
 }
 
-static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
+static int ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t timeout)
 {
 {
-       time64_t timeout = ldlm_bl_timeout(lock);
        int ret;
 
        /* NB: must be called with hold of lock_res_and_lock() */
        int ret;
 
        /* NB: must be called with hold of lock_res_and_lock() */
@@ -639,6 +625,7 @@ static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
                /* the lock was not in any list, grab an extra ref before adding
                 * the lock to the expired list */
                LDLM_LOCK_GET(lock);
                /* the lock was not in any list, grab an extra ref before adding
                 * the lock to the expired list */
                LDLM_LOCK_GET(lock);
+       lock->l_callback_timeout = 0; /* differentiate it from expired locks */
        list_add(&lock->l_pending_chain, &expired_lock_list);
        wake_up(&expired_lock_wait_queue);
        spin_unlock_bh(&waiting_locks_spinlock);
        list_add(&lock->l_pending_chain, &expired_lock_list);
        wake_up(&expired_lock_wait_queue);
        spin_unlock_bh(&waiting_locks_spinlock);
@@ -814,7 +801,7 @@ static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
                RETURN_EXIT;
        }
 
                RETURN_EXIT;
        }
 
-       spin_lock_bh(&lock->l_export->exp_rpc_lock);
+       spin_lock(&lock->l_export->exp_rpc_lock);
        list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
                            rq_exp_list) {
                /* Do not process requests that were not yet added to there
        list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
                            rq_exp_list) {
                /* Do not process requests that were not yet added to there
@@ -828,7 +815,7 @@ static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
                    req->rq_ops->hpreq_lock_match(req, lock))
                        ptlrpc_nrs_req_hp_move(req);
        }
                    req->rq_ops->hpreq_lock_match(req, lock))
                        ptlrpc_nrs_req_hp_move(req);
        }
-       spin_unlock_bh(&lock->l_export->exp_rpc_lock);
+       spin_unlock(&lock->l_export->exp_rpc_lock);
        EXIT;
 }
 
        EXIT;
 }
 
@@ -919,7 +906,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                req->rq_no_resend = 1;
        } else {
                LASSERT(lock->l_granted_mode == lock->l_req_mode);
                req->rq_no_resend = 1;
        } else {
                LASSERT(lock->l_granted_mode == lock->l_req_mode);
-               ldlm_add_waiting_lock(lock);
+               ldlm_add_waiting_lock(lock, ldlm_bl_timeout(lock));
                unlock_res_and_lock(lock);
 
                /* Do not resend after lock callback timeout */
                unlock_res_and_lock(lock);
 
                /* Do not resend after lock callback timeout */
@@ -1057,7 +1044,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
                        lock_res_and_lock(lock);
                } else {
                        /* start the lock-timeout clock */
                        lock_res_and_lock(lock);
                } else {
                        /* start the lock-timeout clock */
-                       ldlm_add_waiting_lock(lock);
+                       ldlm_add_waiting_lock(lock, ldlm_bl_timeout(lock));
                        /* Do not resend after lock callback timeout */
                        req->rq_delay_limit = ldlm_bl_timeout(lock);
                        req->rq_resend_cb = ldlm_update_resend;
                        /* Do not resend after lock callback timeout */
                        req->rq_delay_limit = ldlm_bl_timeout(lock);
                        req->rq_resend_cb = ldlm_update_resend;
@@ -1375,8 +1362,10 @@ existing_lock:
                                 unlock_res_and_lock(lock);
                                 ldlm_lock_cancel(lock);
                                 lock_res_and_lock(lock);
                                 unlock_res_and_lock(lock);
                                 ldlm_lock_cancel(lock);
                                 lock_res_and_lock(lock);
-                        } else
-                                ldlm_add_waiting_lock(lock);
+                       } else {
+                               ldlm_add_waiting_lock(lock,
+                                                     ldlm_bl_timeout(lock));
+                       }
                 }
         }
        unlock_res_and_lock(lock);
                 }
         }
        unlock_res_and_lock(lock);
index 0bb0175..4cbab10 100644 (file)
@@ -993,18 +993,18 @@ void ptlrpc_request_change_export(struct ptlrpc_request *req,
        if (req->rq_export != NULL) {
                LASSERT(!list_empty(&req->rq_exp_list));
                /* remove rq_exp_list from last export */
        if (req->rq_export != NULL) {
                LASSERT(!list_empty(&req->rq_exp_list));
                /* remove rq_exp_list from last export */
-               spin_lock_bh(&req->rq_export->exp_rpc_lock);
+               spin_lock(&req->rq_export->exp_rpc_lock);
                list_del_init(&req->rq_exp_list);
                list_del_init(&req->rq_exp_list);
-               spin_unlock_bh(&req->rq_export->exp_rpc_lock);
+               spin_unlock(&req->rq_export->exp_rpc_lock);
                /* export has one reference already, so it`s safe to
                 * add req to export queue here and get another
                 * reference for request later */
                /* export has one reference already, so it`s safe to
                 * add req to export queue here and get another
                 * reference for request later */
-               spin_lock_bh(&export->exp_rpc_lock);
+               spin_lock(&export->exp_rpc_lock);
                if (req->rq_ops != NULL) /* hp request */
                        list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
                else
                        list_add(&req->rq_exp_list, &export->exp_reg_rpcs);
                if (req->rq_ops != NULL) /* hp request */
                        list_add(&req->rq_exp_list, &export->exp_hp_rpcs);
                else
                        list_add(&req->rq_exp_list, &export->exp_reg_rpcs);
-               spin_unlock_bh(&export->exp_rpc_lock);
+               spin_unlock(&export->exp_rpc_lock);
 
                class_export_rpc_dec(req->rq_export);
                class_export_put(req->rq_export);
 
                class_export_rpc_dec(req->rq_export);
                class_export_put(req->rq_export);
@@ -1635,9 +1635,9 @@ static void ptlrpc_server_hpreq_fini(struct ptlrpc_request *req)
                if (req->rq_ops && req->rq_ops->hpreq_fini)
                        req->rq_ops->hpreq_fini(req);
 
                if (req->rq_ops && req->rq_ops->hpreq_fini)
                        req->rq_ops->hpreq_fini(req);
 
-               spin_lock_bh(&req->rq_export->exp_rpc_lock);
+               spin_lock(&req->rq_export->exp_rpc_lock);
                list_del_init(&req->rq_exp_list);
                list_del_init(&req->rq_exp_list);
-               spin_unlock_bh(&req->rq_export->exp_rpc_lock);
+               spin_unlock(&req->rq_export->exp_rpc_lock);
        }
        EXIT;
 }
        }
        EXIT;
 }