RETURN(rc);
}
-/* Helper function for ldlm_run_ast_work().
- *
- * Send an existing rpc set specified by @arg->set and then
- * destroy it. Create new one if @do_create flag is set. */
-static int ldlm_deliver_cb_set(struct ldlm_cb_set_arg *arg, int do_create)
-{
- int rc = 0;
- ENTRY;
-
- if (arg->set) {
- ptlrpc_set_wait(arg->set);
- if (arg->type == LDLM_BL_CALLBACK)
- OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
- ptlrpc_set_destroy(arg->set);
- arg->set = NULL;
- arg->rpcs = 0;
- }
-
- if (do_create) {
- arg->set = ptlrpc_prep_set();
- if (arg->set == NULL)
- rc = -ENOMEM;
- }
-
- RETURN(rc);
-}
-
static int
ldlm_work_bl_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
{
ldlm_desc_ast_t ast_type)
{
struct ldlm_cb_set_arg arg = { 0 };
+ struct l_wait_info lwi = { 0 };
cfs_list_t *tmp, *pos;
int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg);
unsigned int max_ast_count;
- int rc;
ENTRY;
if (cfs_list_empty(rpc_list))
RETURN(0);
- rc = ldlm_deliver_cb_set(&arg, 1);
- if (rc != 0)
- RETURN(rc);
+ cfs_atomic_set(&arg.restart, 0);
+ cfs_atomic_set(&arg.rpcs, 0);
+ cfs_waitq_init(&arg.waitq);
switch (ast_type) {
case LDLM_WORK_BL_AST:
}
max_ast_count = ns->ns_max_parallel_ast ? : UINT_MAX;
+ arg.threshold = max_ast_count;
cfs_list_for_each_safe(tmp, pos, rpc_list) {
(void)work_ast_lock(tmp, &arg);
- if (arg.rpcs > max_ast_count) {
- rc = ldlm_deliver_cb_set(&arg, 1);
- if (rc != 0)
- break;
- }
- }
+ if (cfs_atomic_read(&arg.rpcs) < max_ast_count)
+ continue;
- (void)ldlm_deliver_cb_set(&arg, 0);
+ l_wait_event(arg.waitq,
+ cfs_atomic_read(&arg.rpcs) < arg.threshold,
+ &lwi);
+ }
- if (rc == 0 && cfs_atomic_read(&arg.restart))
- rc = -ERESTART;
+ arg.threshold = 1;
+ l_wait_event(arg.waitq, cfs_atomic_read(&arg.rpcs) == 0, &lwi);
- RETURN(rc);
+ RETURN(cfs_atomic_read(&arg.restart) ? -ERESTART : 0);
}
static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
struct ldlm_cb_async_args *ca = data;
struct ldlm_lock *lock = ca->ca_lock;
struct ldlm_cb_set_arg *arg = ca->ca_set_arg;
- struct ptlrpc_request_set *set = arg->set;
ENTRY;
LASSERT(lock != NULL);
}
LDLM_LOCK_RELEASE(lock);
- cfs_waitq_signal(&set->set_waitq);
+ if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold)
+ cfs_waitq_signal(&arg->waitq);
RETURN(0);
}
cfs_atomic_inc(&arg->restart);
} else {
LDLM_LOCK_GET(lock);
- ptlrpc_set_add_req(arg->set, req);
- ++arg->rpcs;
+ ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+ cfs_atomic_inc(&arg->rpcs);
}
RETURN(rc);