Commit {hash:
8c83e7d75989ef527e43a824a0dbe46bffabd07d} removed the
parallel AST limit on the server. However, if there are too many locks
to revoke, it will have to allocate too many RPCs.
Return to having an upper limit on the number of AST RPCs in flight by
adding a flow control algorithm that allows a configurable upper limit on
the number of RPCs in flight.
Change-Id: Ifb68991acf7a33119b334447aec50f7717ed546e
Signed-off-by: Jinshan Xiong <jay@whamcloud.com>
Reviewed-on: http://review.whamcloud.com/1608
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
#endif
#define LDLM_DEFAULT_MAX_ALIVE (cfs_time_seconds(36000))
#define LDLM_CTIME_AGE_LIMIT (10)
#endif
#define LDLM_DEFAULT_MAX_ALIVE (cfs_time_seconds(36000))
#define LDLM_CTIME_AGE_LIMIT (10)
+#define LDLM_DEFAULT_PARALLEL_AST_LIMIT 1024
typedef enum {
ELDLM_OK = 0,
typedef enum {
ELDLM_OK = 0,
/* ldlm_lock.c */
struct ldlm_cb_set_arg {
/* ldlm_lock.c */
struct ldlm_cb_set_arg {
- struct ptlrpc_request_set *set;
- cfs_atomic_t restart;
- int type; /* LDLM_BL_CALLBACK or LDLM_CP_CALLBACK */
- int rpcs; /* # of rpcs in set */
+ int type; /* LDLM_BL_CALLBACK or LDLM_CP_CALLBACK */
+ unsigned int threshold; /* threshold to wake up the waiting proc */
+ cfs_atomic_t rpcs; /* # of inflight rpcs in set */
+ cfs_atomic_t restart;
+ cfs_waitq_t waitq;
-/* Helper function for ldlm_run_ast_work().
- *
- * Send an existing rpc set specified by @arg->set and then
- * destroy it. Create new one if @do_create flag is set. */
-static int ldlm_deliver_cb_set(struct ldlm_cb_set_arg *arg, int do_create)
-{
- int rc = 0;
- ENTRY;
-
- if (arg->set) {
- ptlrpc_set_wait(arg->set);
- if (arg->type == LDLM_BL_CALLBACK)
- OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
- ptlrpc_set_destroy(arg->set);
- arg->set = NULL;
- arg->rpcs = 0;
- }
-
- if (do_create) {
- arg->set = ptlrpc_prep_set();
- if (arg->set == NULL)
- rc = -ENOMEM;
- }
-
- RETURN(rc);
-}
-
static int
ldlm_work_bl_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
{
static int
ldlm_work_bl_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
{
ldlm_desc_ast_t ast_type)
{
struct ldlm_cb_set_arg arg = { 0 };
ldlm_desc_ast_t ast_type)
{
struct ldlm_cb_set_arg arg = { 0 };
+ struct l_wait_info lwi = { 0 };
cfs_list_t *tmp, *pos;
int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg);
unsigned int max_ast_count;
cfs_list_t *tmp, *pos;
int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg);
unsigned int max_ast_count;
ENTRY;
if (cfs_list_empty(rpc_list))
RETURN(0);
ENTRY;
if (cfs_list_empty(rpc_list))
RETURN(0);
- rc = ldlm_deliver_cb_set(&arg, 1);
- if (rc != 0)
- RETURN(rc);
+ cfs_atomic_set(&arg.restart, 0);
+ cfs_atomic_set(&arg.rpcs, 0);
+ cfs_waitq_init(&arg.waitq);
switch (ast_type) {
case LDLM_WORK_BL_AST:
switch (ast_type) {
case LDLM_WORK_BL_AST:
}
max_ast_count = ns->ns_max_parallel_ast ? : UINT_MAX;
}
max_ast_count = ns->ns_max_parallel_ast ? : UINT_MAX;
+ arg.threshold = max_ast_count;
cfs_list_for_each_safe(tmp, pos, rpc_list) {
(void)work_ast_lock(tmp, &arg);
cfs_list_for_each_safe(tmp, pos, rpc_list) {
(void)work_ast_lock(tmp, &arg);
- if (arg.rpcs > max_ast_count) {
- rc = ldlm_deliver_cb_set(&arg, 1);
- if (rc != 0)
- break;
- }
- }
+ if (cfs_atomic_read(&arg.rpcs) < max_ast_count)
+ continue;
- (void)ldlm_deliver_cb_set(&arg, 0);
+ l_wait_event(arg.waitq,
+ cfs_atomic_read(&arg.rpcs) < arg.threshold,
+ &lwi);
+ }
- if (rc == 0 && cfs_atomic_read(&arg.restart))
- rc = -ERESTART;
+ arg.threshold = 1;
+ l_wait_event(arg.waitq, cfs_atomic_read(&arg.rpcs) == 0, &lwi);
+ RETURN(cfs_atomic_read(&arg.restart) ? -ERESTART : 0);
}
static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
}
static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
struct ldlm_cb_async_args *ca = data;
struct ldlm_lock *lock = ca->ca_lock;
struct ldlm_cb_set_arg *arg = ca->ca_set_arg;
struct ldlm_cb_async_args *ca = data;
struct ldlm_lock *lock = ca->ca_lock;
struct ldlm_cb_set_arg *arg = ca->ca_set_arg;
- struct ptlrpc_request_set *set = arg->set;
ENTRY;
LASSERT(lock != NULL);
ENTRY;
LASSERT(lock != NULL);
}
LDLM_LOCK_RELEASE(lock);
}
LDLM_LOCK_RELEASE(lock);
- cfs_waitq_signal(&set->set_waitq);
+ if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold)
+ cfs_waitq_signal(&arg->waitq);
cfs_atomic_inc(&arg->restart);
} else {
LDLM_LOCK_GET(lock);
cfs_atomic_inc(&arg->restart);
} else {
LDLM_LOCK_GET(lock);
- ptlrpc_set_add_req(arg->set, req);
- ++arg->rpcs;
+ ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+ cfs_atomic_inc(&arg->rpcs);
ns->ns_contention_time = NS_DEFAULT_CONTENTION_SECONDS;
ns->ns_contended_locks = NS_DEFAULT_CONTENDED_LOCKS;
ns->ns_contention_time = NS_DEFAULT_CONTENTION_SECONDS;
ns->ns_contended_locks = NS_DEFAULT_CONTENDED_LOCKS;
+ ns->ns_max_parallel_ast = LDLM_DEFAULT_PARALLEL_AST_LIMIT;
ns->ns_nr_unused = 0;
ns->ns_max_unused = LDLM_DEFAULT_LRU_SIZE;
ns->ns_max_age = LDLM_DEFAULT_MAX_ALIVE;
ns->ns_nr_unused = 0;
ns->ns_max_unused = LDLM_DEFAULT_LRU_SIZE;
ns->ns_max_age = LDLM_DEFAULT_MAX_ALIVE;