RETURN(rc);
}
-/* Helper function for pair ldlm_run_{bl,cp}_ast_work().
+/* Helper function for ldlm_run_ast_work().
*
* Send an existing rpc set specified by @arg->set and then
* destroy it. Create new one if @do_create flag is set. */
static void
ldlm_send_and_maybe_create_set(struct ldlm_cb_set_arg *arg, int do_create)
{
- int rc;
+ ENTRY;
- rc = ptlrpc_set_wait(arg->set);
+ ptlrpc_set_wait(arg->set);
if (arg->type == LDLM_BL_CALLBACK)
OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
ptlrpc_set_destroy(arg->set);
if (do_create)
arg->set = ptlrpc_prep_set();
+
+ EXIT;
}
-int ldlm_run_bl_ast_work(struct list_head *rpc_list)
+static int
+ldlm_work_bl_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg)
{
- struct ldlm_cb_set_arg arg;
- struct list_head *tmp, *pos;
struct ldlm_lock_desc d;
- int ast_count;
- int rc = 0;
+ struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock, l_bl_ast);
ENTRY;
- arg.set = ptlrpc_prep_set();
- atomic_set(&arg.restart, 0);
- arg.type = LDLM_BL_CALLBACK;
-
- ast_count = 0;
- list_for_each_safe(tmp, pos, rpc_list) {
- struct ldlm_lock *lock =
- list_entry(tmp, struct ldlm_lock, l_bl_ast);
-
- /* nobody should touch l_bl_ast */
- lock_res_and_lock(lock);
- list_del_init(&lock->l_bl_ast);
-
- LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
- LASSERT(lock->l_bl_ast_run == 0);
- LASSERT(lock->l_blocking_lock);
- lock->l_bl_ast_run++;
- unlock_res_and_lock(lock);
-
- ldlm_lock2desc(lock->l_blocking_lock, &d);
+ /* nobody should touch l_bl_ast */
+ lock_res_and_lock(lock);
+ list_del_init(&lock->l_bl_ast);
- LDLM_LOCK_PUT(lock->l_blocking_lock);
- lock->l_blocking_lock = NULL;
- rc = lock->l_blocking_ast(lock, &d, (void *)&arg,
- LDLM_CB_BLOCKING);
- LDLM_LOCK_PUT(lock);
- ast_count++;
+ LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
+ LASSERT(lock->l_bl_ast_run == 0);
+ LASSERT(lock->l_blocking_lock);
+ lock->l_bl_ast_run++;
+ unlock_res_and_lock(lock);
- /* Send the request set if it exceeds the PARALLEL_AST_LIMIT,
- * and create a new set for requests that remained in
- * @rpc_list */
- if (unlikely(ast_count == PARALLEL_AST_LIMIT)) {
- ldlm_send_and_maybe_create_set(&arg, 1);
- ast_count = 0;
- }
- }
+ ldlm_lock2desc(lock->l_blocking_lock, &d);
- if (ast_count > 0)
- ldlm_send_and_maybe_create_set(&arg, 0);
- else
- /* In case when number of ASTs is multiply of
- * PARALLEL_AST_LIMIT or @rpc_list was initially empty,
- * @arg.set must be destroyed here, otherwise we get
- * write memory leaking. */
- ptlrpc_set_destroy(arg.set);
+ LDLM_LOCK_PUT(lock->l_blocking_lock);
+ lock->l_blocking_lock = NULL;
+ lock->l_blocking_ast(lock, &d, (void *)arg,
+ LDLM_CB_BLOCKING);
+ LDLM_LOCK_PUT(lock);
- RETURN(atomic_read(&arg.restart) ? -ERESTART : 0);
+ RETURN(1);
}
-int ldlm_run_cp_ast_work(struct list_head *rpc_list)
+static int
+ldlm_work_cp_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg)
{
- struct ldlm_cb_set_arg arg;
- struct list_head *tmp, *pos;
- int ast_count;
+ struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock, l_cp_ast);
+ ldlm_completion_callback completion_callback;
int rc = 0;
ENTRY;
- arg.set = ptlrpc_prep_set();
- atomic_set(&arg.restart, 0);
- arg.type = LDLM_CP_CALLBACK;
-
/* It's possible to receive a completion AST before we've set
* the l_completion_ast pointer: either because the AST arrived
* before the reply, or simply because there's a small race
* reader/writer reference, which we won't do until we get the
* reply and finish enqueueing. */
+ /* nobody should touch l_cp_ast */
+ lock_res_and_lock(lock);
+ list_del_init(&lock->l_cp_ast);
+ LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
+ /* save l_completion_ast since it can be changed by
+ * mds_intent_policy(), see bug 14225 */
+ completion_callback = lock->l_completion_ast;
+ lock->l_flags &= ~LDLM_FL_CP_REQD;
+ unlock_res_and_lock(lock);
+
+ if (completion_callback != NULL) {
+ completion_callback(lock, 0, (void *)arg);
+ rc = 1;
+ }
+ LDLM_LOCK_PUT(lock);
+
+ RETURN(rc);
+}
+
+static int
+ldlm_work_revoke_ast_lock(struct list_head *tmp, struct ldlm_cb_set_arg *arg)
+{
+ struct ldlm_lock_desc desc;
+ struct ldlm_lock *lock = list_entry(tmp, struct ldlm_lock, l_export_chain);
+ ENTRY;
+
+ list_del_init(&lock->l_export_chain);
+
+ /* the desc just pretend to exclusive */
+ ldlm_lock2desc(lock, &desc);
+ desc.l_req_mode = LCK_EX;
+ desc.l_granted_mode = 0;
+
+ lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
+ LDLM_LOCK_PUT(lock);
+
+ RETURN(1);
+}
+
+int ldlm_run_ast_work(struct list_head *rpc_list, ldlm_desc_ast_t ast_type)
+{
+ struct ldlm_cb_set_arg arg;
+ struct list_head *tmp, *pos;
+ int (*work_ast_lock)(struct list_head *tmp, struct ldlm_cb_set_arg *arg);
+ int ast_count;
+ ENTRY;
+
+ arg.set = ptlrpc_prep_set();
+ atomic_set(&arg.restart, 0);
+ switch (ast_type) {
+ case LDLM_WORK_BL_AST:
+ arg.type = LDLM_BL_CALLBACK;
+ work_ast_lock = ldlm_work_bl_ast_lock;
+ break;
+ case LDLM_WORK_CP_AST:
+ arg.type = LDLM_CP_CALLBACK;
+ work_ast_lock = ldlm_work_cp_ast_lock;
+ break;
+ case LDLM_WORK_REVOKE_AST:
+ arg.type = LDLM_BL_CALLBACK;
+ work_ast_lock = ldlm_work_revoke_ast_lock;
+ break;
+ default:
+ LBUG();
+ }
+
ast_count = 0;
list_for_each_safe(tmp, pos, rpc_list) {
- struct ldlm_lock *lock =
- list_entry(tmp, struct ldlm_lock, l_cp_ast);
- ldlm_completion_callback completion_callback;
-
- /* nobody should touch l_cp_ast */
- lock_res_and_lock(lock);
- list_del_init(&lock->l_cp_ast);
- LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
- /* save l_completion_ast since it can be changed by
- * mds_intent_policy(), see bug 14225 */
- completion_callback = lock->l_completion_ast;
- lock->l_flags &= ~LDLM_FL_CP_REQD;
- unlock_res_and_lock(lock);
-
- if (completion_callback != NULL) {
- rc = completion_callback(lock, 0, (void *)&arg);
- ast_count++;
- }
- LDLM_LOCK_PUT(lock);
+ ast_count += work_ast_lock(tmp, &arg);
/* Send the request set if it exceeds the PARALLEL_AST_LIMIT,
* and create a new set for requests that remained in
ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
unlock_res(res);
- rc = ldlm_run_cp_ast_work(&rpc_list);
+ rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_CP_AST);
if (rc == -ERESTART) {
LASSERT(list_empty(&rpc_list));
goto restart;
unlock_res_and_lock(lock);
if (granted)
- ldlm_run_cp_ast_work(&rpc_list);
+ ldlm_run_ast_work(&rpc_list, LDLM_WORK_CP_AST);
RETURN(res);
}