Whamcloud - gitweb
LU-1061 agl: cl_locks_prune() waits for the last user
authorFan Yong <yong.fan@whamcloud.com>
Thu, 2 Feb 2012 11:17:41 +0000 (19:17 +0800)
committerOleg Drokin <green@whamcloud.com>
Mon, 13 Feb 2012 17:29:27 +0000 (12:29 -0500)
The AGL sponsor holds user reference count on the cl_lock before
triggering AGL RPC. The user reference count on the cl_lock will
be released by AGL RPC reply upcall. Such AGL mechanism conflict
with cl_locks_prune(), which requires no lock is in active using
when the last iput() called.

So the cl_locks_prune() should wait for the last user reference
count to be released by the enqueue reply upcall.

Signed-off-by: Fan Yong <yong.fan@whamcloud.com>
Change-Id: I8998c0a247448b1b6c6e99995c9d956b1666279b
Reviewed-on: http://review.whamcloud.com/2079
Tested-by: Hudson
Reviewed-by: James Simmons <uja.ornl@gmail.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Reviewed-by: Niu Yawei <niu@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/cl_object.h
lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_lock.c
lustre/obdclass/cl_lock.c
lustre/osc/osc_lock.c
lustre/osc/osc_request.c

index 628bfbf..0760bf8 100644 (file)
@@ -2815,7 +2815,7 @@ void  cl_lock_unhold    (const struct lu_env *env, struct cl_lock *lock,
 void  cl_lock_release   (const struct lu_env *env, struct cl_lock *lock,
                          const char *scope, const void *source);
 void  cl_lock_user_add  (const struct lu_env *env, struct cl_lock *lock);
 void  cl_lock_release   (const struct lu_env *env, struct cl_lock *lock,
                          const char *scope, const void *source);
 void  cl_lock_user_add  (const struct lu_env *env, struct cl_lock *lock);
-int   cl_lock_user_del  (const struct lu_env *env, struct cl_lock *lock);
+void  cl_lock_user_del  (const struct lu_env *env, struct cl_lock *lock);
 
 enum cl_lock_state cl_lock_intransit(const struct lu_env *env,
                                      struct cl_lock *lock);
 
 enum cl_lock_state cl_lock_intransit(const struct lu_env *env,
                                      struct cl_lock *lock);
index 3f6af4f..bd4d87d 100644 (file)
@@ -699,18 +699,6 @@ struct ldlm_lock {
         __u64                 l_flags;
         __u32                 l_readers;
         __u32                 l_writers;
         __u64                 l_flags;
         __u32                 l_readers;
         __u32                 l_writers;
-        /*
-         * Set for locks that were removed from class hash table and will be
-         * destroyed when last reference to them is released. Set by
-         * ldlm_lock_destroy_internal().
-         *
-         * Protected by lock and resource locks.
-         */
-        __u8                  l_destroyed;
-        /**
-         * flag whether this is a server namespace lock
-         */
-        __u8                  l_ns_srv;
         /**
          * If the lock is granted, a process sleeps on this waitq to learn when
          * it's no longer in use.  If the lock is not granted, a process sleeps
         /**
          * If the lock is granted, a process sleeps on this waitq to learn when
          * it's no longer in use.  If the lock is not granted, a process sleeps
@@ -731,11 +719,24 @@ struct ldlm_lock {
 
         struct ldlm_extent    l_req_extent;
 
 
         struct ldlm_extent    l_req_extent;
 
+        unsigned int          l_failed:1,
+        /*
+         * Set for locks that were removed from class hash table and will be
+         * destroyed when last reference to them is released. Set by
+         * ldlm_lock_destroy_internal().
+         *
+         * Protected by lock and resource locks.
+         */
+                              l_destroyed:1,
+        /**
+         * flag whether this is a server namespace lock.
+         */
+                              l_ns_srv:1;
+
         /*
          * Client-side-only members.
          */
 
         /*
          * Client-side-only members.
          */
 
-        int                   l_fail_value;
         /**
          * Temporary storage for an LVB received during an enqueue operation.
          */
         /**
          * Temporary storage for an LVB received during an enqueue operation.
          */
@@ -1083,8 +1084,8 @@ void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode);
 int  ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode);
 int  ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode);
 void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode);
-void ldlm_lock_fail_match_locked(struct ldlm_lock *lock, int rc);
-void ldlm_lock_fail_match(struct ldlm_lock *lock, int rc);
+void ldlm_lock_fail_match_locked(struct ldlm_lock *lock);
+void ldlm_lock_fail_match(struct ldlm_lock *lock);
 void ldlm_lock_allow_match(struct ldlm_lock *lock);
 void ldlm_lock_allow_match_locked(struct ldlm_lock *lock);
 ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
 void ldlm_lock_allow_match(struct ldlm_lock *lock);
 void ldlm_lock_allow_match_locked(struct ldlm_lock *lock);
 ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
index f72248a..4c124d4 100644 (file)
@@ -1056,7 +1056,7 @@ static struct ldlm_lock *search_queue(cfs_list_t *queue,
 
                 if (!unref &&
                     (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
 
                 if (!unref &&
                     (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
-                     lock->l_fail_value != 0))
+                     lock->l_failed))
                         continue;
 
                 if ((flags & LDLM_FL_LOCAL_ONLY) &&
                         continue;
 
                 if ((flags & LDLM_FL_LOCAL_ONLY) &&
@@ -1076,19 +1076,19 @@ static struct ldlm_lock *search_queue(cfs_list_t *queue,
         return NULL;
 }
 
         return NULL;
 }
 
-void ldlm_lock_fail_match_locked(struct ldlm_lock *lock, int rc)
+void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
 {
 {
-        if (lock->l_fail_value == 0) {
-                lock->l_fail_value = rc;
-                cfs_waitq_signal(&lock->l_waitq);
+        if (!lock->l_failed) {
+                lock->l_failed = 1;
+                cfs_waitq_broadcast(&lock->l_waitq);
         }
 }
 EXPORT_SYMBOL(ldlm_lock_fail_match_locked);
 
         }
 }
 EXPORT_SYMBOL(ldlm_lock_fail_match_locked);
 
-void ldlm_lock_fail_match(struct ldlm_lock *lock, int rc)
+void ldlm_lock_fail_match(struct ldlm_lock *lock)
 {
         lock_res_and_lock(lock);
 {
         lock_res_and_lock(lock);
-        ldlm_lock_fail_match_locked(lock, rc);
+        ldlm_lock_fail_match_locked(lock);
         unlock_res_and_lock(lock);
 }
 EXPORT_SYMBOL(ldlm_lock_fail_match);
         unlock_res_and_lock(lock);
 }
 EXPORT_SYMBOL(ldlm_lock_fail_match);
@@ -1096,7 +1096,7 @@ EXPORT_SYMBOL(ldlm_lock_fail_match);
 void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
 {
         lock->l_flags |= LDLM_FL_LVB_READY;
 void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
 {
         lock->l_flags |= LDLM_FL_LVB_READY;
-        cfs_waitq_signal(&lock->l_waitq);
+        cfs_waitq_broadcast(&lock->l_waitq);
 }
 
 void ldlm_lock_allow_match(struct ldlm_lock *lock)
 }
 
 void ldlm_lock_allow_match(struct ldlm_lock *lock)
@@ -1206,7 +1206,7 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags,
                         /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                         l_wait_event(lock->l_waitq,
                                      lock->l_flags & LDLM_FL_LVB_READY ||
                         /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                         l_wait_event(lock->l_waitq,
                                      lock->l_flags & LDLM_FL_LVB_READY ||
-                                     lock->l_fail_value != 0,
+                                     lock->l_failed,
                                      &lwi);
                         if (!(lock->l_flags & LDLM_FL_LVB_READY)) {
                                 if (flags & LDLM_FL_TEST_LOCK)
                                      &lwi);
                         if (!(lock->l_flags & LDLM_FL_LVB_READY)) {
                                 if (flags & LDLM_FL_TEST_LOCK)
@@ -1263,7 +1263,7 @@ ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
         if (lock != NULL) {
                 lock_res_and_lock(lock);
                 if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
         if (lock != NULL) {
                 lock_res_and_lock(lock);
                 if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED ||
-                    lock->l_fail_value != 0)
+                    lock->l_failed)
                         GOTO(out, mode);
 
                 if (lock->l_flags & LDLM_FL_CBPENDING &&
                         GOTO(out, mode);
 
                 if (lock->l_flags & LDLM_FL_CBPENDING &&
@@ -1311,7 +1311,7 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
         lock->l_req_mode = mode;
         lock->l_ast_data = data;
         lock->l_pid = cfs_curproc_pid();
         lock->l_req_mode = mode;
         lock->l_ast_data = data;
         lock->l_pid = cfs_curproc_pid();
-        lock->l_ns_srv = ns_is_server(ns);
+        lock->l_ns_srv = !!ns_is_server(ns);
         if (cbs) {
                 lock->l_blocking_ast = cbs->lcs_blocking;
                 lock->l_completion_ast = cbs->lcs_completion;
         if (cbs) {
                 lock->l_blocking_ast = cbs->lcs_blocking;
                 lock->l_completion_ast = cbs->lcs_completion;
index 70d94a9..4202603 100644 (file)
@@ -2076,10 +2076,22 @@ void cl_locks_prune(const struct lu_env *env, struct cl_object *obj, int cancel)
                 cl_lock_get_trust(lock);
                 cfs_spin_unlock(&head->coh_lock_guard);
                 lu_ref_add(&lock->cll_reference, "prune", cfs_current());
                 cl_lock_get_trust(lock);
                 cfs_spin_unlock(&head->coh_lock_guard);
                 lu_ref_add(&lock->cll_reference, "prune", cfs_current());
+
+again:
                 cl_lock_mutex_get(env, lock);
                 if (lock->cll_state < CLS_FREEING) {
                         LASSERT(lock->cll_holds == 0);
                 cl_lock_mutex_get(env, lock);
                 if (lock->cll_state < CLS_FREEING) {
                         LASSERT(lock->cll_holds == 0);
-                        LASSERT(lock->cll_users == 0);
+                        LASSERT(lock->cll_users <= 1);
+                        if (unlikely(lock->cll_users == 1)) {
+                                struct l_wait_info lwi = { 0 };
+
+                                cl_lock_mutex_put(env, lock);
+                                l_wait_event(lock->cll_wq,
+                                             lock->cll_users == 0,
+                                             &lwi);
+                                goto again;
+                        }
+
                         if (cancel)
                                 cl_lock_cancel(env, lock);
                         cl_lock_delete(env, lock);
                         if (cancel)
                                 cl_lock_cancel(env, lock);
                         cl_lock_delete(env, lock);
@@ -2255,7 +2267,7 @@ void cl_lock_user_add(const struct lu_env *env, struct cl_lock *lock)
 }
 EXPORT_SYMBOL(cl_lock_user_add);
 
 }
 EXPORT_SYMBOL(cl_lock_user_add);
 
-int cl_lock_user_del(const struct lu_env *env, struct cl_lock *lock)
+void cl_lock_user_del(const struct lu_env *env, struct cl_lock *lock)
 {
         LINVRNT(cl_lock_is_mutexed(lock));
         LINVRNT(cl_lock_invariant(env, lock));
 {
         LINVRNT(cl_lock_is_mutexed(lock));
         LINVRNT(cl_lock_invariant(env, lock));
@@ -2263,7 +2275,9 @@ int cl_lock_user_del(const struct lu_env *env, struct cl_lock *lock)
 
         ENTRY;
         cl_lock_used_mod(env, lock, -1);
 
         ENTRY;
         cl_lock_used_mod(env, lock, -1);
-        RETURN(lock->cll_users == 0);
+        if (lock->cll_users == 0)
+                cfs_waitq_broadcast(&lock->cll_wq);
+        EXIT;
 }
 EXPORT_SYMBOL(cl_lock_user_del);
 
 }
 EXPORT_SYMBOL(cl_lock_user_del);
 
index f239956..a1dc1bf 100644 (file)
@@ -535,13 +535,15 @@ static int osc_lock_upcall(void *cookie, int errcode)
                                 dlmlock->l_ast_data = NULL;
                                 olck->ols_handle.cookie = 0ULL;
                                 cfs_spin_unlock(&osc_ast_guard);
                                 dlmlock->l_ast_data = NULL;
                                 olck->ols_handle.cookie = 0ULL;
                                 cfs_spin_unlock(&osc_ast_guard);
-                                ldlm_lock_fail_match_locked(dlmlock, rc);
+                                ldlm_lock_fail_match_locked(dlmlock);
                                 unlock_res_and_lock(dlmlock);
                                 LDLM_LOCK_PUT(dlmlock);
                         }
                 } else {
                                 unlock_res_and_lock(dlmlock);
                                 LDLM_LOCK_PUT(dlmlock);
                         }
                 } else {
-                        if (olck->ols_glimpse)
+                        if (olck->ols_glimpse) {
                                 olck->ols_glimpse = 0;
                                 olck->ols_glimpse = 0;
+                                olck->ols_agl = 0 ;
+                        }
                         osc_lock_upcall0(env, olck);
                 }
 
                         osc_lock_upcall0(env, olck);
                 }
 
index c4ea288..2c4cef7 100644 (file)
@@ -3332,7 +3332,7 @@ void osc_update_enqueue(struct lustre_handle *lov_lockhp,
 
         if (lock != NULL) {
                 if (rc != ELDLM_OK)
 
         if (lock != NULL) {
                 if (rc != ELDLM_OK)
-                        ldlm_lock_fail_match(lock, rc);
+                        ldlm_lock_fail_match(lock);
 
                 LDLM_LOCK_PUT(lock);
         }
 
                 LDLM_LOCK_PUT(lock);
         }