int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
ldlm_desc_ast_t ast_type)
{
- struct ldlm_cb_set_arg arg = { 0 };
struct l_wait_info lwi = { 0 };
+ struct ldlm_cb_set_arg *arg;
cfs_list_t *tmp, *pos;
int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg);
unsigned int max_ast_count;
+ int rc;
ENTRY;
if (cfs_list_empty(rpc_list))
RETURN(0);
- cfs_atomic_set(&arg.restart, 0);
- cfs_atomic_set(&arg.rpcs, 0);
- cfs_waitq_init(&arg.waitq);
+ OBD_ALLOC_PTR(arg);
+ if (arg == NULL)
+ RETURN(-ENOMEM);
+
+ cfs_atomic_set(&arg->restart, 0);
+ cfs_atomic_set(&arg->rpcs, 0);
+ cfs_atomic_set(&arg->refcount, 1);
+ cfs_waitq_init(&arg->waitq);
switch (ast_type) {
case LDLM_WORK_BL_AST:
- arg.type = LDLM_BL_CALLBACK;
+ arg->type = LDLM_BL_CALLBACK;
work_ast_lock = ldlm_work_bl_ast_lock;
break;
case LDLM_WORK_CP_AST:
- arg.type = LDLM_CP_CALLBACK;
+ arg->type = LDLM_CP_CALLBACK;
work_ast_lock = ldlm_work_cp_ast_lock;
break;
case LDLM_WORK_REVOKE_AST:
- arg.type = LDLM_BL_CALLBACK;
+ arg->type = LDLM_BL_CALLBACK;
work_ast_lock = ldlm_work_revoke_ast_lock;
break;
default:
}
max_ast_count = ns->ns_max_parallel_ast ? : UINT_MAX;
- arg.threshold = max_ast_count;
+ arg->threshold = max_ast_count;
cfs_list_for_each_safe(tmp, pos, rpc_list) {
- (void)work_ast_lock(tmp, &arg);
- if (cfs_atomic_read(&arg.rpcs) < max_ast_count)
+ (void)work_ast_lock(tmp, arg);
+ if (cfs_atomic_read(&arg->rpcs) < max_ast_count)
continue;
- l_wait_event(arg.waitq,
- cfs_atomic_read(&arg.rpcs) < arg.threshold,
+ l_wait_event(arg->waitq,
+ cfs_atomic_read(&arg->rpcs) < arg->threshold,
&lwi);
}
- arg.threshold = 1;
- l_wait_event(arg.waitq, cfs_atomic_read(&arg.rpcs) == 0, &lwi);
+ arg->threshold = 1;
+ l_wait_event(arg->waitq, cfs_atomic_read(&arg->rpcs) == 0, &lwi);
- RETURN(cfs_atomic_read(&arg.restart) ? -ERESTART : 0);
+ rc = cfs_atomic_read(&arg->restart) ? -ERESTART : 0;
+ ldlm_csa_put(arg);
+ RETURN(rc);
}
static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
struct ldlm_cb_async_args *ca = data;
struct ldlm_lock *lock = ca->ca_lock;
struct ldlm_cb_set_arg *arg = ca->ca_set_arg;
+ int count;
ENTRY;
LASSERT(lock != NULL);
}
LDLM_LOCK_RELEASE(lock);
- if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold)
+ count = cfs_atomic_dec_return(&arg->rpcs);
+ if (count < arg->threshold)
cfs_waitq_signal(&arg->waitq);
+ if (count == 0)
+ ldlm_csa_put(arg);
RETURN(0);
}
cfs_atomic_inc(&arg->restart);
} else {
LDLM_LOCK_GET(lock);
+ if (cfs_atomic_inc_return(&arg->rpcs) == 1)
+ cfs_atomic_inc(&arg->refcount);
ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
- cfs_atomic_inc(&arg->rpcs);
}
RETURN(rc);