Whamcloud - gitweb
LU-571 ldlm: add parallel ast flow control
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
index 9378385..eaaaa43 100644 (file)
@@ -1455,33 +1455,6 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue,
         RETURN(rc);
 }
 
-/* Helper function for ldlm_run_ast_work().
- *
- * Send an existing rpc set specified by @arg->set and then
- * destroy it. Create new one if @do_create flag is set. */
-static int ldlm_deliver_cb_set(struct ldlm_cb_set_arg *arg, int do_create)
-{
-        int rc = 0;
-        ENTRY;
-
-        if (arg->set) {
-                ptlrpc_set_wait(arg->set);
-                if (arg->type == LDLM_BL_CALLBACK)
-                        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
-                ptlrpc_set_destroy(arg->set);
-                arg->set = NULL;
-                arg->rpcs = 0;
-        }
-
-        if (do_create) {
-                arg->set = ptlrpc_prep_set();
-                if (arg->set == NULL)
-                        rc = -ENOMEM;
-        }
-
-        RETURN(rc);
-}
-
 static int
 ldlm_work_bl_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
 {
@@ -1574,18 +1547,18 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
                       ldlm_desc_ast_t ast_type)
 {
         struct ldlm_cb_set_arg arg = { 0 };
+        struct l_wait_info     lwi = { 0 };
         cfs_list_t *tmp, *pos;
         int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg);
         unsigned int max_ast_count;
-        int rc;
         ENTRY;
 
         if (cfs_list_empty(rpc_list))
                 RETURN(0);
 
-        rc = ldlm_deliver_cb_set(&arg, 1);
-        if (rc != 0)
-                RETURN(rc);
+        cfs_atomic_set(&arg.restart, 0);
+        cfs_atomic_set(&arg.rpcs, 0);
+        cfs_waitq_init(&arg.waitq);
 
         switch (ast_type) {
         case LDLM_WORK_BL_AST:
@@ -1605,22 +1578,22 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
         }
 
         max_ast_count = ns->ns_max_parallel_ast ? : UINT_MAX;
+        arg.threshold = max_ast_count;
 
         cfs_list_for_each_safe(tmp, pos, rpc_list) {
                 (void)work_ast_lock(tmp, &arg);
-                if (arg.rpcs > max_ast_count) {
-                        rc = ldlm_deliver_cb_set(&arg, 1);
-                        if (rc != 0)
-                                break;
-                }
-        }
+                if (cfs_atomic_read(&arg.rpcs) < max_ast_count)
+                        continue;
 
-        (void)ldlm_deliver_cb_set(&arg, 0);
+                l_wait_event(arg.waitq,
+                             cfs_atomic_read(&arg.rpcs) < arg.threshold,
+                             &lwi);
+        }
 
-        if (rc == 0 && cfs_atomic_read(&arg.restart))
-                rc = -ERESTART;
+        arg.threshold = 1;
+        l_wait_event(arg.waitq, cfs_atomic_read(&arg.rpcs) == 0, &lwi);
 
-        RETURN(rc);
+        RETURN(cfs_atomic_read(&arg.restart) ? -ERESTART : 0);
 }
 
 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)