Whamcloud - gitweb
LU-571 ldlm: add parallel ast flow control
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 6d2795f..d009a1f 100644 (file)
@@ -28,6 +28,9 @@
 /*
  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011 Whamcloud, Inc.
+ *
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -276,7 +279,7 @@ static int ldlm_lock_busy(struct ldlm_lock *lock)
 /* This is called from within a timer interrupt and cannot schedule */
 static void waiting_locks_callback(unsigned long unused)
 {
-        struct ldlm_lock *lock, *last = NULL;
+        struct ldlm_lock *lock;
 
 repeat:
         cfs_spin_lock_bh(&waiting_locks_spinlock);
@@ -352,8 +355,6 @@ repeat:
                            libcfs_nid2str(
                                    lock->l_export->exp_connection->c_peer.nid));
 
-                last = lock;
-
                 /* no needs to take an extra ref on the lock since it was in
                  * the waiting_locks_list and ldlm_add_waiting_lock()
                  * already grabbed a ref */
@@ -646,9 +647,9 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock,
 static int ldlm_cb_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *data, int rc)
 {
-        struct ldlm_cb_async_args *ca = data;
-        struct ldlm_cb_set_arg *arg = ca->ca_set_arg;
-        struct ldlm_lock *lock = ca->ca_lock;
+        struct ldlm_cb_async_args *ca   = data;
+        struct ldlm_lock          *lock = ca->ca_lock;
+        struct ldlm_cb_set_arg    *arg  = ca->ca_set_arg;
         ENTRY;
 
         LASSERT(lock != NULL);
@@ -656,17 +657,17 @@ static int ldlm_cb_interpret(const struct lu_env *env,
                 rc = ldlm_handle_ast_error(lock, req, rc,
                                            arg->type == LDLM_BL_CALLBACK
                                            ? "blocking" : "completion");
+                if (rc == -ERESTART)
+                        cfs_atomic_inc(&arg->restart);
         }
-
         LDLM_LOCK_RELEASE(lock);
 
-        if (rc == -ERESTART)
-                cfs_atomic_set(&arg->restart, 1);
-
+        if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold)
+                cfs_waitq_signal(&arg->waitq);
         RETURN(0);
 }
 
-static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req,
+static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req,
                                           struct ldlm_cb_set_arg *arg,
                                           struct ldlm_lock *lock,
                                           int instant_cancel)
@@ -678,12 +679,11 @@ static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req,
                 rc = ptl_send_rpc(req, 1);
                 ptlrpc_req_finished(req);
                 if (rc == 0)
-                        /* If we cancelled the lock, we need to restart
-                         * ldlm_reprocess_queue */
-                        cfs_atomic_set(&arg->restart, 1);
+                        cfs_atomic_inc(&arg->restart);
         } else {
                 LDLM_LOCK_GET(lock);
-                ptlrpc_set_add_req(arg->set, req);
+                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+                cfs_atomic_inc(&arg->rpcs);
         }
 
         RETURN(rc);
@@ -807,7 +807,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                      LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
 
-        rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
+        rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
 
         RETURN(rc);
 }
@@ -922,7 +922,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data)
                 lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
                                      LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
 
-        rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
+        rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
 
         RETURN(rc);
 }
@@ -966,12 +966,20 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
                                      LDLM_GL_CALLBACK - LDLM_FIRST_OPC);
 
         rc = ptlrpc_queue_wait(req);
-        if (rc == -ELDLM_NO_LOCK_DATA)
+        /* Update the LVB from disk if the AST failed (this is a legal race)
+         *
+         * - Glimpse callback of local lock just return -ELDLM_NO_LOCK_DATA.
+         * - Glimpse callback of remote lock might return -ELDLM_NO_LOCK_DATA
+         *   when inode is cleared. LU-274
+         */
+        if (rc == -ELDLM_NO_LOCK_DATA) {
                 LDLM_DEBUG(lock, "lost race - client has a lock but no inode");
-        else if (rc != 0)
+                ldlm_res_lvbo_update(res, NULL, 1);
+        } else if (rc != 0) {
                 rc = ldlm_handle_ast_error(lock, req, rc, "glimpse");
-        else
+        } else {
                 rc = ldlm_res_lvbo_update(res, req, 1);
+        }
 
         ptlrpc_req_finished(req);
         if (rc == -ERESTART)
@@ -1165,7 +1173,10 @@ existing_lock:
         }
 
         if (dlm_req->lock_desc.l_resource.lr_type != LDLM_PLAIN)
-                lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
+                ldlm_convert_policy_to_local(
+                                          dlm_req->lock_desc.l_resource.lr_type,
+                                          &dlm_req->lock_desc.l_policy_data,
+                                          &lock->l_policy_data);
         if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                 lock->l_req_extent = lock->l_policy_data.l_extent;
 
@@ -1537,7 +1548,10 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
         }
 
         if (lock->l_resource->lr_type != LDLM_PLAIN) {
-                lock->l_policy_data = dlm_req->lock_desc.l_policy_data;
+                ldlm_convert_policy_to_local(
+                                          dlm_req->lock_desc.l_resource.lr_type,
+                                          &dlm_req->lock_desc.l_policy_data,
+                                          &lock->l_policy_data);
                 LDLM_DEBUG(lock, "completion AST, new policy data");
         }
 
@@ -1587,7 +1601,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
          * l_ast_data */
         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
 
-        ldlm_run_ast_work(&ast_list, LDLM_WORK_CP_AST);
+        ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
 
         LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
                           lock);
@@ -2130,7 +2144,8 @@ void ldlm_revoke_export_locks(struct obd_export *exp)
         CFS_INIT_LIST_HEAD(&rpc_list);
         cfs_hash_for_each_empty(exp->exp_lock_hash,
                                 ldlm_revoke_lock_cb, &rpc_list);
-        ldlm_run_ast_work(&rpc_list, LDLM_WORK_REVOKE_AST);
+        ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
+                          LDLM_WORK_REVOKE_AST);
 
         EXIT;
 }
@@ -2179,7 +2194,7 @@ static int ldlm_bl_thread_start(struct ldlm_bl_pool *blp)
         int rc;
 
         cfs_init_completion(&bltd.bltd_comp);
-        rc = cfs_kernel_thread(ldlm_bl_thread_main, &bltd, 0);
+        rc = cfs_create_thread(ldlm_bl_thread_main, &bltd, 0);
         if (rc < 0) {
                 CERROR("cannot start LDLM thread ldlm_bl_%02d: rc %d\n",
                        cfs_atomic_read(&blp->blp_num_threads), rc);
@@ -2513,7 +2528,7 @@ static int ldlm_setup(void)
         cfs_spin_lock_init(&waiting_locks_spinlock);
         cfs_timer_init(&waiting_locks_timer, waiting_locks_callback, 0);
 
-        rc = cfs_kernel_thread(expired_lock_main, NULL, CLONE_VM | CLONE_FILES);
+        rc = cfs_create_thread(expired_lock_main, NULL, CFS_DAEMON_FLAGS);
         if (rc < 0) {
                 CERROR("Cannot start ldlm expired-lock thread: %d\n", rc);
                 GOTO(out_thread, rc);