Whamcloud - gitweb
LU-571 ldlm: add parallel ast flow control
authorJinshan Xiong <jay@whamcloud.com>
Wed, 26 Oct 2011 19:48:17 +0000 (13:48 -0600)
committerOleg Drokin <green@whamcloud.com>
Wed, 4 Jan 2012 01:42:45 +0000 (20:42 -0500)
Commit {hash: 8c83e7d75989ef527e43a824a0dbe46bffabd07d} removed the
parallel AST limit on the server. However, if there are too many locks
to revoke, it will have to allocate too many RPCs.

Return to having an upper limit on the number of AST RPCs in flight by
adding a flow control algorithm that allows a configurable upper limit on
the number of RPCs in flight.

Change-Id: Ifb68991acf7a33119b334447aec50f7717ed546e
Signed-off-by: Jinshan Xiong <jay@whamcloud.com>
Reviewed-on: http://review.whamcloud.com/1608
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_resource.c

index 91c715c..cf9c70d 100644 (file)
@@ -75,6 +75,7 @@ struct obd_device;
 #endif
 #define LDLM_DEFAULT_MAX_ALIVE (cfs_time_seconds(36000))
 #define LDLM_CTIME_AGE_LIMIT (10)
 #endif
 #define LDLM_DEFAULT_MAX_ALIVE (cfs_time_seconds(36000))
 #define LDLM_CTIME_AGE_LIMIT (10)
+#define LDLM_DEFAULT_PARALLEL_AST_LIMIT 1024
 
 typedef enum {
         ELDLM_OK = 0,
 
 typedef enum {
         ELDLM_OK = 0,
index 12ff5ce..f6bb70e 100644 (file)
@@ -104,10 +104,11 @@ void ldlm_namespace_free_post(struct ldlm_namespace *ns);
 /* ldlm_lock.c */
 
 struct ldlm_cb_set_arg {
 /* ldlm_lock.c */
 
 struct ldlm_cb_set_arg {
-        struct ptlrpc_request_set *set;
-        cfs_atomic_t    restart;
-        int             type;  /* LDLM_BL_CALLBACK or LDLM_CP_CALLBACK */
-        int             rpcs;  /* # of rpcs in set */
+        int          type;      /* LDLM_BL_CALLBACK or LDLM_CP_CALLBACK */
+        unsigned int threshold; /* threshold to wake up the waiting proc */
+        cfs_atomic_t rpcs;      /* # of inflight rpcs in set */
+        cfs_atomic_t restart;
+        cfs_waitq_t  waitq;
 };
 
 typedef enum {
 };
 
 typedef enum {
index 9378385..eaaaa43 100644 (file)
@@ -1455,33 +1455,6 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, cfs_list_t *queue,
         RETURN(rc);
 }
 
         RETURN(rc);
 }
 
-/* Helper function for ldlm_run_ast_work().
- *
- * Send an existing rpc set specified by @arg->set and then
- * destroy it. Create new one if @do_create flag is set. */
-static int ldlm_deliver_cb_set(struct ldlm_cb_set_arg *arg, int do_create)
-{
-        int rc = 0;
-        ENTRY;
-
-        if (arg->set) {
-                ptlrpc_set_wait(arg->set);
-                if (arg->type == LDLM_BL_CALLBACK)
-                        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
-                ptlrpc_set_destroy(arg->set);
-                arg->set = NULL;
-                arg->rpcs = 0;
-        }
-
-        if (do_create) {
-                arg->set = ptlrpc_prep_set();
-                if (arg->set == NULL)
-                        rc = -ENOMEM;
-        }
-
-        RETURN(rc);
-}
-
 static int
 ldlm_work_bl_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
 {
 static int
 ldlm_work_bl_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
 {
@@ -1574,18 +1547,18 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
                       ldlm_desc_ast_t ast_type)
 {
         struct ldlm_cb_set_arg arg = { 0 };
                       ldlm_desc_ast_t ast_type)
 {
         struct ldlm_cb_set_arg arg = { 0 };
+        struct l_wait_info     lwi = { 0 };
         cfs_list_t *tmp, *pos;
         int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg);
         unsigned int max_ast_count;
         cfs_list_t *tmp, *pos;
         int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg);
         unsigned int max_ast_count;
-        int rc;
         ENTRY;
 
         if (cfs_list_empty(rpc_list))
                 RETURN(0);
 
         ENTRY;
 
         if (cfs_list_empty(rpc_list))
                 RETURN(0);
 
-        rc = ldlm_deliver_cb_set(&arg, 1);
-        if (rc != 0)
-                RETURN(rc);
+        cfs_atomic_set(&arg.restart, 0);
+        cfs_atomic_set(&arg.rpcs, 0);
+        cfs_waitq_init(&arg.waitq);
 
         switch (ast_type) {
         case LDLM_WORK_BL_AST:
 
         switch (ast_type) {
         case LDLM_WORK_BL_AST:
@@ -1605,22 +1578,22 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
         }
 
         max_ast_count = ns->ns_max_parallel_ast ? : UINT_MAX;
         }
 
         max_ast_count = ns->ns_max_parallel_ast ? : UINT_MAX;
+        arg.threshold = max_ast_count;
 
         cfs_list_for_each_safe(tmp, pos, rpc_list) {
                 (void)work_ast_lock(tmp, &arg);
 
         cfs_list_for_each_safe(tmp, pos, rpc_list) {
                 (void)work_ast_lock(tmp, &arg);
-                if (arg.rpcs > max_ast_count) {
-                        rc = ldlm_deliver_cb_set(&arg, 1);
-                        if (rc != 0)
-                                break;
-                }
-        }
+                if (cfs_atomic_read(&arg.rpcs) < max_ast_count)
+                        continue;
 
 
-        (void)ldlm_deliver_cb_set(&arg, 0);
+                l_wait_event(arg.waitq,
+                             cfs_atomic_read(&arg.rpcs) < arg.threshold,
+                             &lwi);
+        }
 
 
-        if (rc == 0 && cfs_atomic_read(&arg.restart))
-                rc = -ERESTART;
+        arg.threshold = 1;
+        l_wait_event(arg.waitq, cfs_atomic_read(&arg.rpcs) == 0, &lwi);
 
 
-        RETURN(rc);
+        RETURN(cfs_atomic_read(&arg.restart) ? -ERESTART : 0);
 }
 
 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
 }
 
 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
index af7c6c4..d009a1f 100644 (file)
@@ -650,7 +650,6 @@ static int ldlm_cb_interpret(const struct lu_env *env,
         struct ldlm_cb_async_args *ca   = data;
         struct ldlm_lock          *lock = ca->ca_lock;
         struct ldlm_cb_set_arg    *arg  = ca->ca_set_arg;
         struct ldlm_cb_async_args *ca   = data;
         struct ldlm_lock          *lock = ca->ca_lock;
         struct ldlm_cb_set_arg    *arg  = ca->ca_set_arg;
-        struct ptlrpc_request_set *set  = arg->set;
         ENTRY;
 
         LASSERT(lock != NULL);
         ENTRY;
 
         LASSERT(lock != NULL);
@@ -663,7 +662,8 @@ static int ldlm_cb_interpret(const struct lu_env *env,
         }
         LDLM_LOCK_RELEASE(lock);
 
         }
         LDLM_LOCK_RELEASE(lock);
 
-        cfs_waitq_signal(&set->set_waitq);
+        if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold)
+                cfs_waitq_signal(&arg->waitq);
         RETURN(0);
 }
 
         RETURN(0);
 }
 
@@ -682,8 +682,8 @@ static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req,
                         cfs_atomic_inc(&arg->restart);
         } else {
                 LDLM_LOCK_GET(lock);
                         cfs_atomic_inc(&arg->restart);
         } else {
                 LDLM_LOCK_GET(lock);
-                ptlrpc_set_add_req(arg->set, req);
-                ++arg->rpcs;
+                ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+                cfs_atomic_inc(&arg->rpcs);
         }
 
         RETURN(rc);
         }
 
         RETURN(rc);
index eb28062..4d6dab4 100644 (file)
@@ -619,6 +619,7 @@ struct ldlm_namespace *ldlm_namespace_new(struct obd_device *obd, char *name,
         ns->ns_contention_time    = NS_DEFAULT_CONTENTION_SECONDS;
         ns->ns_contended_locks    = NS_DEFAULT_CONTENDED_LOCKS;
 
         ns->ns_contention_time    = NS_DEFAULT_CONTENTION_SECONDS;
         ns->ns_contended_locks    = NS_DEFAULT_CONTENDED_LOCKS;
 
+        ns->ns_max_parallel_ast   = LDLM_DEFAULT_PARALLEL_AST_LIMIT;
         ns->ns_nr_unused          = 0;
         ns->ns_max_unused         = LDLM_DEFAULT_LRU_SIZE;
         ns->ns_max_age            = LDLM_DEFAULT_MAX_ALIVE;
         ns->ns_nr_unused          = 0;
         ns->ns_max_unused         = LDLM_DEFAULT_LRU_SIZE;
         ns->ns_max_age            = LDLM_DEFAULT_MAX_ALIVE;