Imperative Recovery needs to notify clients as soon as possible.
However, limiting the AST count to 200 will definitely block it.
This may cause the recovery window to pass before the MGS
has notified all clients.
In this patch, I revised the above limitation to be configurable
(via the ns_max_parallel_ast tunable), with no limit imposed by default.
Change-Id: I141da9dbd8b282a76252da15b93c432ac5253bfd
Signed-off-by: Jinshan Xiong <jay@whamcloud.com>
Reviewed-on: http://review.whamcloud.com/1190
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
* Limit size of nolock requests, in bytes.
*/
unsigned ns_max_nolock_size;
+
+ /**
+ * Limit of parallel AST RPC count.
+ */
+ unsigned ns_max_parallel_ast;
+
/* callback to cancel locks before replaying it during recovery */
ldlm_cancel_for_recovery ns_cancel_for_recovery;
/**
if (cfs_list_empty(&lock->l_res_link))
ldlm_resource_add_lock(res, &res->lr_waiting, lock);
unlock_res(res);
- rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);
+ rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
+ LDLM_WORK_BL_AST);
if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
!ns_is_client(ldlm_res_to_ns(res)))
class_fail_export(lock->l_export);
-
+
lock_res(res);
if (rc == -ERESTART) {
&rpc_list);
unlock_res_and_lock(req);
- rc = ldlm_run_ast_work(&rpc_list,
+ rc = ldlm_run_ast_work(ns, &rpc_list,
LDLM_WORK_CP_AST);
lock_res_and_lock(req);
if (rc == -ERESTART)
if (cfs_list_empty(&lock->l_res_link))
ldlm_resource_add_lock(res, &res->lr_waiting, lock);
unlock_res(res);
- rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);
+ rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
+ LDLM_WORK_BL_AST);
lock_res(res);
if (rc == -ERESTART)
GOTO(restart, -ERESTART);
void ldlm_namespace_free_post(struct ldlm_namespace *ns);
/* ldlm_lock.c */
-/* Number of blocking/completion callbacks that will be sent in
- * parallel (see bug 11301). */
-#define PARALLEL_AST_LIMIT 200
-
struct ldlm_cb_set_arg {
struct ptlrpc_request_set *set;
- cfs_atomic_t restart;
- __u32 type; /* LDLM_BL_CALLBACK or LDLM_CP_CALLBACK */
+ cfs_atomic_t restart;
+ int type; /* LDLM_BL_CALLBACK or LDLM_CP_CALLBACK */
+ int rpcs; /* # of rpcs in set */
};
typedef enum {
cfs_list_t *work_list);
int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue,
cfs_list_t *work_list);
-int ldlm_run_ast_work(cfs_list_t *rpc_list, ldlm_desc_ast_t ast_type);
+int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
+ ldlm_desc_ast_t ast_type);
int ldlm_lock_remove_from_lru(struct ldlm_lock *lock);
int ldlm_lock_remove_from_lru_nolock(struct ldlm_lock *lock);
void ldlm_lock_add_to_lru_nolock(struct ldlm_lock *lock);
*
* Send an existing rpc set specified by @arg->set and then
* destroy it. Create new one if @do_create flag is set. */
-static void
-ldlm_send_and_maybe_create_set(struct ldlm_cb_set_arg *arg, int do_create)
+static int ldlm_deliver_cb_set(struct ldlm_cb_set_arg *arg, int do_create)
{
+ int rc = 0;
ENTRY;
- ptlrpc_set_wait(arg->set);
- if (arg->type == LDLM_BL_CALLBACK)
- OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
- ptlrpc_set_destroy(arg->set);
+ if (arg->set) {
+ ptlrpc_set_wait(arg->set);
+ if (arg->type == LDLM_BL_CALLBACK)
+ OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
+ ptlrpc_set_destroy(arg->set);
+ arg->set = NULL;
+ arg->rpcs = 0;
+ }
- if (do_create)
+ if (do_create) {
arg->set = ptlrpc_prep_set();
+ if (arg->set == NULL)
+ rc = -ENOMEM;
+ }
- EXIT;
+ RETURN(rc);
}
static int
struct ldlm_lock_desc d;
struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock,
l_bl_ast);
+ int rc;
ENTRY;
/* nobody should touch l_bl_ast */
ldlm_lock2desc(lock->l_blocking_lock, &d);
- lock->l_blocking_ast(lock, &d, (void *)arg,
- LDLM_CB_BLOCKING);
+ rc = lock->l_blocking_ast(lock, &d, (void *)arg,
+ LDLM_CB_BLOCKING);
LDLM_LOCK_RELEASE(lock->l_blocking_lock);
lock->l_blocking_lock = NULL;
LDLM_LOCK_RELEASE(lock);
- RETURN(1);
+ RETURN(rc);
}
static int
lock->l_flags &= ~LDLM_FL_CP_REQD;
unlock_res_and_lock(lock);
- if (completion_callback != NULL) {
- completion_callback(lock, 0, (void *)arg);
- rc = 1;
- }
+ if (completion_callback != NULL)
+ rc = completion_callback(lock, 0, (void *)arg);
LDLM_LOCK_RELEASE(lock);
RETURN(rc);
struct ldlm_lock_desc desc;
struct ldlm_lock *lock = cfs_list_entry(tmp, struct ldlm_lock,
l_rk_ast);
+ int rc;
ENTRY;
cfs_list_del_init(&lock->l_rk_ast);
desc.l_req_mode = LCK_EX;
desc.l_granted_mode = 0;
- lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
+ rc = lock->l_blocking_ast(lock, &desc, (void*)arg, LDLM_CB_BLOCKING);
LDLM_LOCK_RELEASE(lock);
- RETURN(1);
+ RETURN(rc);
}
-int ldlm_run_ast_work(cfs_list_t *rpc_list, ldlm_desc_ast_t ast_type)
+int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
+ ldlm_desc_ast_t ast_type)
{
- struct ldlm_cb_set_arg arg;
+ struct ldlm_cb_set_arg arg = { 0 };
cfs_list_t *tmp, *pos;
int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg);
- int ast_count;
+ unsigned int max_ast_count;
+ int rc;
ENTRY;
if (cfs_list_empty(rpc_list))
RETURN(0);
- arg.set = ptlrpc_prep_set();
- if (NULL == arg.set)
- RETURN(-ERESTART);
- cfs_atomic_set(&arg.restart, 0);
+ rc = ldlm_deliver_cb_set(&arg, 1);
+ if (rc != 0)
+ RETURN(rc);
+
switch (ast_type) {
case LDLM_WORK_BL_AST:
arg.type = LDLM_BL_CALLBACK;
LBUG();
}
- ast_count = 0;
+ max_ast_count = ns->ns_max_parallel_ast ? : UINT_MAX;
+
cfs_list_for_each_safe(tmp, pos, rpc_list) {
- ast_count += work_ast_lock(tmp, &arg);
-
- /* Send the request set if it exceeds the PARALLEL_AST_LIMIT,
- * and create a new set for requests that remained in
- * @rpc_list */
- if (unlikely(ast_count == PARALLEL_AST_LIMIT)) {
- ldlm_send_and_maybe_create_set(&arg, 1);
- ast_count = 0;
+ (void)work_ast_lock(tmp, &arg);
+ if (arg.rpcs > max_ast_count) {
+ rc = ldlm_deliver_cb_set(&arg, 1);
+ if (rc != 0)
+ break;
}
}
- if (ast_count > 0)
- ldlm_send_and_maybe_create_set(&arg, 0);
- else
- /* In case when number of ASTs is multiply of
- * PARALLEL_AST_LIMIT or @rpc_list was initially empty,
- * @arg.set must be destroyed here, otherwise we get
- * write memory leaking. */
- ptlrpc_set_destroy(arg.set);
+ (void)ldlm_deliver_cb_set(&arg, 0);
- RETURN(cfs_atomic_read(&arg.restart) ? -ERESTART : 0);
+ if (rc == 0 && cfs_atomic_read(&arg.restart))
+ rc = -ERESTART;
+
+ RETURN(rc);
}
static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
unlock_res(res);
- rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_CP_AST);
+ rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
+ LDLM_WORK_CP_AST);
if (rc == -ERESTART) {
LASSERT(cfs_list_empty(&rpc_list));
goto restart;
unlock_res_and_lock(lock);
if (granted)
- ldlm_run_ast_work(&rpc_list, LDLM_WORK_CP_AST);
+ ldlm_run_ast_work(ns, &rpc_list, LDLM_WORK_CP_AST);
if (node)
OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
RETURN(res);
static int ldlm_cb_interpret(const struct lu_env *env,
struct ptlrpc_request *req, void *data, int rc)
{
- struct ldlm_cb_async_args *ca = data;
- struct ldlm_cb_set_arg *arg = ca->ca_set_arg;
- struct ldlm_lock *lock = ca->ca_lock;
+ struct ldlm_cb_async_args *ca = data;
+ struct ldlm_lock *lock = ca->ca_lock;
+ struct ldlm_cb_set_arg *arg = ca->ca_set_arg;
+ struct ptlrpc_request_set *set = arg->set;
ENTRY;
LASSERT(lock != NULL);
rc = ldlm_handle_ast_error(lock, req, rc,
arg->type == LDLM_BL_CALLBACK
? "blocking" : "completion");
+ if (rc == -ERESTART)
+ cfs_atomic_inc(&arg->restart);
}
-
LDLM_LOCK_RELEASE(lock);
- if (rc == -ERESTART)
- cfs_atomic_set(&arg->restart, 1);
-
+ cfs_waitq_signal(&set->set_waitq);
RETURN(0);
}
-static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req,
+static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req,
struct ldlm_cb_set_arg *arg,
struct ldlm_lock *lock,
int instant_cancel)
rc = ptl_send_rpc(req, 1);
ptlrpc_req_finished(req);
if (rc == 0)
- /* If we cancelled the lock, we need to restart
- * ldlm_reprocess_queue */
- cfs_atomic_set(&arg->restart, 1);
+ cfs_atomic_inc(&arg->restart);
} else {
LDLM_LOCK_GET(lock);
ptlrpc_set_add_req(arg->set, req);
+ ++arg->rpcs;
}
RETURN(rc);
lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
LDLM_BL_CALLBACK - LDLM_FIRST_OPC);
- rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
+ rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
RETURN(rc);
}
lprocfs_counter_incr(lock->l_export->exp_nid_stats->nid_ldlm_stats,
LDLM_CP_CALLBACK - LDLM_FIRST_OPC);
- rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);
+ rc = ldlm_bl_and_cp_ast_tail(req, arg, lock, instant_cancel);
RETURN(rc);
}
* l_ast_data */
OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
- ldlm_run_ast_work(&ast_list, LDLM_WORK_CP_AST);
+ ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
LDLM_DEBUG_NOLOCK("client completion callback handler END (lock %p)",
lock);
CFS_INIT_LIST_HEAD(&rpc_list);
cfs_hash_for_each_empty(exp->exp_lock_hash,
ldlm_revoke_lock_cb, &rpc_list);
- ldlm_run_ast_work(&rpc_list, LDLM_WORK_REVOKE_AST);
+ ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
+ LDLM_WORK_REVOKE_AST);
EXIT;
}
if (cfs_list_empty(&lock->l_res_link))
ldlm_resource_add_lock(res, &res->lr_waiting, lock);
unlock_res(res);
- rc = ldlm_run_ast_work(&rpc_list, LDLM_WORK_BL_AST);
+ rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
+ LDLM_WORK_BL_AST);
lock_res(res);
if (rc == -ERESTART)
GOTO(restart, -ERESTART);
lock_vars[0].read_fptr = lprocfs_rd_uint;
lock_vars[0].write_fptr = lprocfs_wr_uint;
lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
+
+ snprintf(lock_name, MAX_STRING_SIZE, "%s/max_parallel_ast",
+ ldlm_ns_name(ns));
+ lock_vars[0].data = &ns->ns_max_parallel_ast;
+ lock_vars[0].read_fptr = lprocfs_rd_uint;
+ lock_vars[0].write_fptr = lprocfs_wr_uint;
+ lprocfs_add_vars(ldlm_ns_proc_dir, lock_vars, 0);
}
return 0;
}