LU-1046 ldlm: hold ldlm_cb_set_arg until the last user exit
author Fan Yong <yong.fan@whamcloud.com>
Tue, 31 Jan 2012 05:51:41 +0000 (13:51 +0800)
committer Oleg Drokin <green@whamcloud.com>
Thu, 9 Feb 2012 07:47:43 +0000 (02:47 -0500)
There is a race condition between ldlm_cb_interpret() and
ldlm_run_ast_work() when testing/releasing "ldlm_cb_set_arg":
ldlm_run_ast_work() can exit before ldlm_cb_interpret() has
finished processing "ldlm_cb_set_arg::waitq", which leads to
invalid memory access.
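
The sketch below is a minimal, self-contained illustration of the
pattern this patch adopts; it is plain C11/pthreads, not Lustre code,
and names such as cb_arg, csa_put() and async_callback() are
illustrative stand-ins for ldlm_cb_set_arg, ldlm_csa_put() and
ldlm_cb_interpret(). The completion argument is heap-allocated and
reference-counted, so whichever side finishes last -- the waiter or
the final callback -- frees it, instead of the waiter returning while
a callback may still touch a stack-allocated struct.

/*
 * Illustration only (not Lustre code): last user of the shared
 * completion argument frees it, so no callback can touch freed memory.
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

struct cb_arg {
        atomic_int      rpcs;      /* # of in-flight callbacks */
        atomic_int      refcount;  /* # of users of this struct */
        pthread_mutex_t lock;
        pthread_cond_t  waitq;     /* stands in for cfs_waitq_t */
};

/* Drop one reference; the last holder frees the argument. */
static void csa_put(struct cb_arg *arg)
{
        if (atomic_fetch_sub(&arg->refcount, 1) == 1) {
                pthread_mutex_destroy(&arg->lock);
                pthread_cond_destroy(&arg->waitq);
                free(arg);
        }
}

/* Plays the role of ldlm_cb_interpret(): runs asynchronously. */
static void *async_callback(void *data)
{
        struct cb_arg *arg = data;
        int count;

        usleep(1000);              /* simulate RPC completion latency */

        pthread_mutex_lock(&arg->lock);
        count = atomic_fetch_sub(&arg->rpcs, 1) - 1;
        pthread_cond_signal(&arg->waitq);
        pthread_mutex_unlock(&arg->lock);

        if (count == 0)            /* last callback drops its reference */
                csa_put(arg);
        return NULL;
}

/* Plays the role of ldlm_run_ast_work(): issues work, then waits. */
int main(void)
{
        struct cb_arg *arg = calloc(1, sizeof(*arg));
        pthread_t tid[4];
        int i;

        if (arg == NULL)
                return 1;

        atomic_init(&arg->rpcs, 0);
        atomic_init(&arg->refcount, 1);        /* the issuer's reference */
        pthread_mutex_init(&arg->lock, NULL);
        pthread_cond_init(&arg->waitq, NULL);

        for (i = 0; i < 4; i++) {
                /* first in-flight callback takes the callbacks' reference */
                if (atomic_fetch_add(&arg->rpcs, 1) == 0)
                        atomic_fetch_add(&arg->refcount, 1);
                pthread_create(&tid[i], NULL, async_callback, arg);
        }

        pthread_mutex_lock(&arg->lock);
        while (atomic_load(&arg->rpcs) != 0)
                pthread_cond_wait(&arg->waitq, &arg->lock);
        pthread_mutex_unlock(&arg->lock);

        csa_put(arg);              /* issuer drops its reference */

        for (i = 0; i < 4; i++)
                pthread_join(tid[i], NULL);
        printf("all callbacks done, argument freed by last user\n");
        return 0;
}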

Signed-off-by: Fan Yong <yong.fan@whamcloud.com>
Change-Id: Idfb3f41739e02a79fe4310ecbc7bb842e75a82d8
Reviewed-on: http://review.whamcloud.com/2065
Tested-by: Hudson
Reviewed-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c

diff --git a/lustre/ldlm/ldlm_internal.h b/lustre/ldlm/ldlm_internal.h
index f6bb70e..aa0faef 100644
--- a/lustre/ldlm/ldlm_internal.h
+++ b/lustre/ldlm/ldlm_internal.h
@@ -108,9 +108,19 @@ struct ldlm_cb_set_arg {
         unsigned int threshold; /* threshold to wake up the waiting proc */
         cfs_atomic_t rpcs;      /* # of inflight rpcs in set */
         cfs_atomic_t restart;
+        cfs_atomic_t refcount;
         cfs_waitq_t  waitq;
 };
 
+static inline void ldlm_csa_put(struct ldlm_cb_set_arg *arg)
+{
+        if (cfs_atomic_dec_and_test(&arg->refcount)) {
+                LASSERT(cfs_atomic_read(&arg->rpcs) == 0);
+
+                OBD_FREE_PTR(arg);
+        }
+}
+
 typedef enum {
         LDLM_WORK_BL_AST,
         LDLM_WORK_CP_AST,
diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c
index fe7aac9..8e34904 100644
--- a/lustre/ldlm/ldlm_lock.c
+++ b/lustre/ldlm/ldlm_lock.c
@@ -1577,31 +1577,37 @@ ldlm_work_revoke_ast_lock(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg)
 int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
                       ldlm_desc_ast_t ast_type)
 {
-        struct ldlm_cb_set_arg arg = { 0 };
         struct l_wait_info     lwi = { 0 };
+        struct ldlm_cb_set_arg *arg;
         cfs_list_t *tmp, *pos;
         int (*work_ast_lock)(cfs_list_t *tmp, struct ldlm_cb_set_arg *arg);
         unsigned int max_ast_count;
+        int rc;
         ENTRY;
 
         if (cfs_list_empty(rpc_list))
                 RETURN(0);
 
-        cfs_atomic_set(&arg.restart, 0);
-        cfs_atomic_set(&arg.rpcs, 0);
-        cfs_waitq_init(&arg.waitq);
+        OBD_ALLOC_PTR(arg);
+        if (arg == NULL)
+                RETURN(-ENOMEM);
+
+        cfs_atomic_set(&arg->restart, 0);
+        cfs_atomic_set(&arg->rpcs, 0);
+        cfs_atomic_set(&arg->refcount, 1);
+        cfs_waitq_init(&arg->waitq);
 
         switch (ast_type) {
         case LDLM_WORK_BL_AST:
-                arg.type = LDLM_BL_CALLBACK;
+                arg->type = LDLM_BL_CALLBACK;
                 work_ast_lock = ldlm_work_bl_ast_lock;
                 break;
         case LDLM_WORK_CP_AST:
-                arg.type = LDLM_CP_CALLBACK;
+                arg->type = LDLM_CP_CALLBACK;
                 work_ast_lock = ldlm_work_cp_ast_lock;
                 break;
         case LDLM_WORK_REVOKE_AST:
-                arg.type = LDLM_BL_CALLBACK;
+                arg->type = LDLM_BL_CALLBACK;
                 work_ast_lock = ldlm_work_revoke_ast_lock;
                 break;
         default:
@@ -1609,22 +1615,24 @@ int ldlm_run_ast_work(struct ldlm_namespace *ns, cfs_list_t *rpc_list,
         }
 
         max_ast_count = ns->ns_max_parallel_ast ? : UINT_MAX;
-        arg.threshold = max_ast_count;
+        arg->threshold = max_ast_count;
 
         cfs_list_for_each_safe(tmp, pos, rpc_list) {
-                (void)work_ast_lock(tmp, &arg);
-                if (cfs_atomic_read(&arg.rpcs) < max_ast_count)
+                (void)work_ast_lock(tmp, arg);
+                if (cfs_atomic_read(&arg->rpcs) < max_ast_count)
                         continue;
 
-                l_wait_event(arg.waitq,
-                             cfs_atomic_read(&arg.rpcs) < arg.threshold,
+                l_wait_event(arg->waitq,
+                             cfs_atomic_read(&arg->rpcs) < arg->threshold,
                              &lwi);
         }
 
-        arg.threshold = 1;
-        l_wait_event(arg.waitq, cfs_atomic_read(&arg.rpcs) == 0, &lwi);
+        arg->threshold = 1;
+        l_wait_event(arg->waitq, cfs_atomic_read(&arg->rpcs) == 0, &lwi);
 
-        RETURN(cfs_atomic_read(&arg.restart) ? -ERESTART : 0);
+        rc = cfs_atomic_read(&arg->restart) ? -ERESTART : 0;
+        ldlm_csa_put(arg);
+        RETURN(rc);
 }
 
 static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c
index d77a893..5de675d 100644
--- a/lustre/ldlm/ldlm_lockd.c
+++ b/lustre/ldlm/ldlm_lockd.c
@@ -650,6 +650,7 @@ static int ldlm_cb_interpret(const struct lu_env *env,
         struct ldlm_cb_async_args *ca   = data;
         struct ldlm_lock          *lock = ca->ca_lock;
         struct ldlm_cb_set_arg    *arg  = ca->ca_set_arg;
+        int count;
         ENTRY;
 
         LASSERT(lock != NULL);
@@ -662,8 +663,11 @@ static int ldlm_cb_interpret(const struct lu_env *env,
         }
         LDLM_LOCK_RELEASE(lock);
 
-        if (cfs_atomic_dec_return(&arg->rpcs) < arg->threshold)
+        count = cfs_atomic_dec_return(&arg->rpcs);
+        if (count < arg->threshold)
                 cfs_waitq_signal(&arg->waitq);
+        if (count == 0)
+                ldlm_csa_put(arg);
         RETURN(0);
 }
 
@@ -682,8 +686,9 @@ static inline int ldlm_bl_and_cp_ast_tail(struct ptlrpc_request *req,
                         cfs_atomic_inc(&arg->restart);
         } else {
                 LDLM_LOCK_GET(lock);
+                if (cfs_atomic_inc_return(&arg->rpcs) == 1)
+                        cfs_atomic_inc(&arg->refcount);
                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
-                cfs_atomic_inc(&arg->rpcs);
         }
 
         RETURN(rc);