Whamcloud - gitweb
LU-1222 ldlm: Fix the race in AST sender vs multiple arriving RPCs
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 55e10d8..558eee3 100644 (file)
@@ -29,8 +29,7 @@
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011 Whamcloud, Inc.
- *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -180,13 +179,15 @@ static int expired_lock_main(void *arg)
 
                 cfs_spin_lock_bh(&waiting_locks_spinlock);
                 if (expired_lock_thread.elt_dump) {
+                        struct libcfs_debug_msg_data msgdata = {
+                                .msg_file = __FILE__,
+                                .msg_fn = "waiting_locks_callback",
+                                .msg_line = expired_lock_thread.elt_dump };
                         cfs_spin_unlock_bh(&waiting_locks_spinlock);
 
                         /* from waiting_locks_callback, but not in timer */
                         libcfs_debug_dumplog();
-                        libcfs_run_lbug_upcall(__FILE__,
-                                                "waiting_locks_callback",
-                                                expired_lock_thread.elt_dump);
+                        libcfs_run_lbug_upcall(&msgdata);
 
                         cfs_spin_lock_bh(&waiting_locks_spinlock);
                         expired_lock_thread.elt_dump = 0;
@@ -264,7 +265,7 @@ static int ldlm_lock_busy(struct ldlm_lock *lock)
                 return 0;
 
         cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock);
-        cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc,
+        cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
                                 rq_exp_list) {
                 if (req->rq_ops->hpreq_lock_match) {
                         match = req->rq_ops->hpreq_lock_match(req, lock);
@@ -444,12 +445,21 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock)
         }
 
         ret = __ldlm_add_waiting_lock(lock, timeout);
-        if (ret)
+        if (ret) {
                 /* grab ref on the lock if it has been added to the
                  * waiting list */
                 LDLM_LOCK_GET(lock);
+        }
         cfs_spin_unlock_bh(&waiting_locks_spinlock);
 
+        if (ret) {
+                cfs_spin_lock_bh(&lock->l_export->exp_bl_list_lock);
+                if (cfs_list_empty(&lock->l_exp_list))
+                        cfs_list_add(&lock->l_exp_list,
+                                     &lock->l_export->exp_bl_list);
+                cfs_spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
+        }
+
         LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
                    ret == 0 ? "not re-" : "", timeout,
                    AT_OFF ? "off" : "on");
@@ -504,10 +514,17 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock)
         cfs_spin_lock_bh(&waiting_locks_spinlock);
         ret = __ldlm_del_waiting_lock(lock);
         cfs_spin_unlock_bh(&waiting_locks_spinlock);
-        if (ret)
+
+        /* remove the lock out of export blocking list */
+        cfs_spin_lock_bh(&lock->l_export->exp_bl_list_lock);
+        cfs_list_del_init(&lock->l_exp_list);
+        cfs_spin_unlock_bh(&lock->l_export->exp_bl_list_lock);
+
+        if (ret) {
                 /* release lock ref if it has indeed been removed
                  * from a list */
                 LDLM_LOCK_RELEASE(lock);
+        }
 
         LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed");
         return ret;
@@ -664,6 +681,8 @@ static int ldlm_cb_interpret(const struct lu_env *env,
 
         if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold)
                 cfs_waitq_signal(&arg->waitq);
+
+        ldlm_csa_put(arg);
         RETURN(0);
 }
 
@@ -682,8 +701,9 @@ static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req,
                         cfs_atomic_inc(&arg->restart);
         } else {
                 LDLM_LOCK_GET(lock);
-                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
                 cfs_atomic_inc(&arg->rpcs);
+                cfs_atomic_inc(&arg->refcount);
+                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
         }
 
         RETURN(rc);
@@ -704,7 +724,7 @@ static void ldlm_lock_reorder_req(struct ldlm_lock *lock)
         }
 
         cfs_spin_lock_bh(&lock->l_export->exp_rpc_lock);
-        cfs_list_for_each_entry(req, &lock->l_export->exp_queued_rpc,
+        cfs_list_for_each_entry(req, &lock->l_export->exp_hp_rpcs,
                                 rq_exp_list) {
                 /* Do not process requests that were not yet added to there
                  * incoming queue or were already removed from there for
@@ -2099,6 +2119,90 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
         RETURN(0);
 }
 
+static int ldlm_cancel_hpreq_lock_match(struct ptlrpc_request *req,
+                                        struct ldlm_lock *lock)
+{
+        struct ldlm_request *dlm_req;
+        struct lustre_handle lockh;
+        int rc = 0;
+        int i;
+        ENTRY;
+
+        dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+        if (dlm_req == NULL)
+                RETURN(0);
+
+        ldlm_lock2handle(lock, &lockh);
+        for (i = 0; i < dlm_req->lock_count; i++) {
+                if (lustre_handle_equal(&dlm_req->lock_handle[i],
+                                        &lockh)) {
+                        DEBUG_REQ(D_RPCTRACE, req,
+                                  "Prio raised by lock "LPX64".", lockh.cookie);
+
+                        rc = 1;
+                        break;
+                }
+        }
+
+        RETURN(rc);
+
+}
+
+static int ldlm_cancel_hpreq_check(struct ptlrpc_request *req)
+{
+        struct ldlm_request *dlm_req;
+        int rc = 0;
+        int i;
+        ENTRY;
+
+        /* no prolong in recovery */
+        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)
+                RETURN(0);
+
+        dlm_req = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
+        if (dlm_req == NULL)
+                RETURN(-EFAULT);
+
+        for (i = 0; i < dlm_req->lock_count; i++) {
+                struct ldlm_lock *lock;
+
+                lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
+                if (lock == NULL)
+                        continue;
+
+                rc = !!(lock->l_flags & LDLM_FL_AST_SENT);
+                if (rc)
+                        LDLM_DEBUG(lock, "hpreq cancel lock");
+                LDLM_LOCK_PUT(lock);
+
+                if (rc)
+                        break;
+        }
+
+        RETURN(rc);
+}
+
+static struct ptlrpc_hpreq_ops ldlm_cancel_hpreq_ops = {
+        .hpreq_lock_match = ldlm_cancel_hpreq_lock_match,
+        .hpreq_check      = ldlm_cancel_hpreq_check
+};
+
+static int ldlm_hpreq_handler(struct ptlrpc_request *req)
+{
+        ENTRY;
+
+        req_capsule_init(&req->rq_pill, req, RCL_SERVER);
+
+        if (req->rq_export == NULL)
+                RETURN(0);
+
+        if (LDLM_CANCEL == lustre_msg_get_opc(req->rq_reqmsg)) {
+                req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
+                req->rq_ops = &ldlm_cancel_hpreq_ops;
+        }
+        RETURN(0);
+}
+
 int ldlm_revoke_lock_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
                         cfs_hlist_node_t *hnode, void *data)
 
@@ -2486,7 +2590,7 @@ static int ldlm_setup(void)
                                 ldlm_min_threads, ldlm_max_threads,
                                 "ldlm_cn",
                                 LCT_MD_THREAD|LCT_DT_THREAD|LCT_CL_THREAD,
-                                NULL);
+                                ldlm_hpreq_handler);
 
         if (!ldlm_state->ldlm_cancel_service) {
                 CERROR("failed to start service\n");