LU-1683 agl: increase lock cll_holds for AGL upcall
author Fan Yong <yong.fan@whamcloud.com>
Thu, 2 Aug 2012 01:30:45 +0000 (09:30 +0800)
committer Oleg Drokin <green@whamcloud.com>
Mon, 13 Aug 2012 05:10:01 +0000 (01:10 -0400)
Without an additional cll_holds reference held for the AGL upcall,
the AGL lock may be cancelled/deleted by the AGL thread if the lock
cannot be granted at once.

osc_lock_wait() re-triggers the lock enqueue for a non-granted AGL
lock and returns 'CLO_REENQUEUED' to the caller. The original lov
lock enqueue logic in lov_lock_enqueue_one() ignored this case,
which could trigger an unexpected LASSERT when the result was
checked.
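
As background for the fix, here is a minimal userspace model of the
race (toy names throughout, not the Lustre API): unless enqueue takes
an extra hold that the upcall later drops, the AGL thread is free to
cancel and delete the lock before the upcall ever runs.

#include <assert.h>
#include <stdio.h>

/* Toy model of cl_lock hold counting; every name here is invented
 * for illustration and is not part of the Lustre API. */
struct toy_lock {
        int holds;              /* models cl_lock::cll_holds */
        int freed;
};

static void hold_add(struct toy_lock *lk)
{
        lk->holds++;
}

static void hold_release(struct toy_lock *lk)
{
        assert(lk->holds > 0);
        lk->holds--;
}

/* models the AGL thread cancelling a lock it could not grant at once:
 * cancellation may only free the lock when no holds are outstanding */
static void agl_try_cancel(struct toy_lock *lk)
{
        if (lk->holds == 0) {
                lk->freed = 1;
                printf("lock cancelled/deleted\n");
        } else {
                printf("cancel deferred: %d hold(s) outstanding\n",
                       lk->holds);
        }
}

int main(void)
{
        struct toy_lock lk = { 0, 0 };

        hold_add(&lk);          /* enqueue: extra hold for the upcall cookie */
        agl_try_cancel(&lk);    /* AGL thread races in: cancel is deferred */
        assert(!lk.freed);      /* the upcall can still use the lock safely */
        hold_release(&lk);      /* upcall done: drop the hold */
        agl_try_cancel(&lk);    /* now the lock may really go away */
        return 0;
}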

Signed-off-by: Fan Yong <yong.fan@whamcloud.com>
Change-Id: I37305aba9d9f9ad525decc20badac4afbe7aedb0
Reviewed-on: http://review.whamcloud.com/3249
Reviewed-by: Jinshan Xiong <jinshan.xiong@whamcloud.com>
Tested-by: Hudson
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: James Simmons <uja.ornl@gmail.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/cl_object.h
lustre/lov/lov_lock.c
lustre/obdclass/cl_lock.c
lustre/osc/osc_lock.c
lustre/osc/osc_request.c

diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h
index d8f6f17..b0c11e3 100644
@@ -2885,6 +2885,8 @@ void  cl_lock_get_trust (struct cl_lock *lock);
 void  cl_lock_put       (const struct lu_env *env, struct cl_lock *lock);
 void  cl_lock_hold_add  (const struct lu_env *env, struct cl_lock *lock,
                          const char *scope, const void *source);
+void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
+                         const char *scope, const void *source);
 void  cl_lock_unhold    (const struct lu_env *env, struct cl_lock *lock,
                          const char *scope, const void *source);
 void  cl_lock_release   (const struct lu_env *env, struct cl_lock *lock,
diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c
index 1fc696c..907baae 100644
@@ -269,8 +269,10 @@ static int lov_subresult(int result, int rc)
 
         ENTRY;
 
-        LASSERT(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT);
-        LASSERT(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT);
+       LASSERTF(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT,
+                "result = %d", result);
+       LASSERTF(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT,
+                "rc = %d\n", rc);
         CLASSERT(CLO_WAIT < CLO_REPEAT);
 
         /* calculate ranks in the ordering above */
@@ -524,10 +526,13 @@ static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck,
 
         /* first, try to enqueue a sub-lock ... */
         result = cl_enqueue_try(env, sublock, io, enqflags);
-        if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL))
-                /* if it is enqueued, try to `wait' on it---maybe it's already
-                 * granted */
-                result = cl_wait_try(env, sublock);
+       if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL)) {
+               /* if it is enqueued, try to `wait' on it---maybe it's already
+                * granted */
+               result = cl_wait_try(env, sublock);
+               if (result == CLO_REENQUEUED)
+                       result = CLO_WAIT;
+       }
         /*
          * If CEF_ASYNC flag is set, then all sub-locks can be enqueued in
          * parallel, otherwise---enqueue has to wait until sub-lock is granted
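
The CLO_REENQUEUED-to-CLO_WAIT remap above exists because
lov_subresult() (whose LASSERTFs this same patch strengthens) only
tolerates errors, CLO_REPEAT and CLO_WAIT when merging per-sub-lock
results. A standalone sketch of that merge rule, with invented
TOY_CLO_* stand-ins for the real constants; the real lov_subresult()
ranks errors above both:

#include <assert.h>
#include <stdio.h>

enum { TOY_CLO_WAIT = 1, TOY_CLO_REPEAT = 2, TOY_CLO_REENQUEUED = 3 };

/* toy version of the lov_subresult() merge rule: an error dominates,
 * then CLO_REPEAT, then CLO_WAIT, then 0; anything else is a bug */
static int toy_subresult(int result, int rc)
{
        assert(result <= 0 || result == TOY_CLO_REPEAT ||
               result == TOY_CLO_WAIT);
        assert(rc <= 0 || rc == TOY_CLO_REPEAT || rc == TOY_CLO_WAIT);

        if (rc < 0 || result < 0)
                return rc < 0 ? rc : result;    /* errors win */
        return rc > result ? rc : result;       /* higher rank wins */
}

int main(void)
{
        /* a re-enqueued sub-lock must be reported as "wait", or the
         * assertion above fires when the results are merged */
        int rc = TOY_CLO_REENQUEUED;

        if (rc == TOY_CLO_REENQUEUED)
                rc = TOY_CLO_WAIT;              /* the fix, in miniature */
        printf("merged: %d\n", toy_subresult(0, rc));
        return 0;
}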
diff --git a/lustre/obdclass/cl_lock.c b/lustre/obdclass/cl_lock.c
index d5efa9b..84ee04f 100644
@@ -884,8 +884,8 @@ static void cl_lock_used_mod(const struct lu_env *env, struct cl_lock *lock,
         }
 }
 
-static void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
-                                 const char *scope, const void *source)
+void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
+                         const char *scope, const void *source)
 {
         LINVRNT(cl_lock_is_mutexed(lock));
         LINVRNT(cl_lock_invariant(env, lock));
@@ -917,6 +917,7 @@ static void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
         }
         EXIT;
 }
+EXPORT_SYMBOL(cl_lock_hold_release);
 
 /**
  * Waits until lock state is changed.
@@ -1196,7 +1197,9 @@ int cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock,
                 case CLS_QUEUING:
                         /* kick layers. */
                         result = cl_enqueue_kick(env, lock, io, flags);
-                        if (result == 0)
+                       /* For the AGL case, the cl_lock::cll_state may
+                        * already be CLS_HELD. */
+                       if (result == 0 && lock->cll_state == CLS_QUEUING)
                                 cl_lock_state_set(env, lock, CLS_ENQUEUED);
                         break;
                 case CLS_INTRANSIT:
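
The re-check of cll_state in the hunk above matters because, for AGL,
the upcall can run inside cl_enqueue_kick() and move the lock straight
to CLS_HELD; unconditionally setting CLS_ENQUEUED would step the state
backwards. A toy model of that guard (invented names, not the Lustre
API):

#include <stdio.h>

enum toy_state { TOY_QUEUING, TOY_ENQUEUED, TOY_HELD };

/* models cl_enqueue_kick(); for AGL the upcall may run synchronously
 * and advance the lock straight to HELD */
static int toy_kick(enum toy_state *state, int agl_granted)
{
        if (agl_granted)
                *state = TOY_HELD;
        return 0;
}

int main(void)
{
        enum toy_state state = TOY_QUEUING;
        int rc = toy_kick(&state, 1);

        /* the fix: only advance QUEUING -> ENQUEUED when nothing
         * changed the state behind our back */
        if (rc == 0 && state == TOY_QUEUING)
                state = TOY_ENQUEUED;
        printf("state = %d\n", state);          /* prints 2 (TOY_HELD) */
        return 0;
}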
@@ -1456,9 +1459,11 @@ int cl_wait_try(const struct lu_env *env, struct cl_lock *lock)
         do {
                 LINVRNT(cl_lock_is_mutexed(lock));
                 LINVRNT(cl_lock_invariant(env, lock));
-                LASSERT(lock->cll_state == CLS_ENQUEUED ||
-                        lock->cll_state == CLS_HELD ||
-                        lock->cll_state == CLS_INTRANSIT);
+               LASSERTF(lock->cll_state == CLS_QUEUING ||
+                        lock->cll_state == CLS_ENQUEUED ||
+                        lock->cll_state == CLS_HELD ||
+                        lock->cll_state == CLS_INTRANSIT,
+                        "lock state: %d\n", lock->cll_state);
                 LASSERT(lock->cll_users > 0);
                 LASSERT(lock->cll_holds > 0);
 
@@ -2038,7 +2043,6 @@ void cl_locks_prune(const struct lu_env *env, struct cl_object *obj, int cancel)
 again:
                 cl_lock_mutex_get(env, lock);
                 if (lock->cll_state < CLS_FREEING) {
-                        LASSERT(lock->cll_holds == 0);
                         LASSERT(lock->cll_users <= 1);
                         if (unlikely(lock->cll_users == 1)) {
                                 struct l_wait_info lwi = { 0 };
diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c
index 467295f..a4b9be0 100644
@@ -583,17 +583,21 @@ static int osc_lock_upcall(void *cookie, int errcode)
                         cl_lock_error(env, lock, rc);
                 }
 
-                cl_lock_mutex_put(env, lock);
-
-                /* release cookie reference, acquired by osc_lock_enqueue() */
-                lu_ref_del(&lock->cll_reference, "upcall", lock);
-                cl_lock_put(env, lock);
-
-                cl_env_nested_put(&nest, env);
-        } else
-                /* should never happen, similar to osc_ldlm_blocking_ast(). */
-                LBUG();
-        RETURN(errcode);
+               /* release cookie reference, acquired by osc_lock_enqueue() */
+               cl_lock_hold_release(env, lock, "upcall", lock);
+               cl_lock_mutex_put(env, lock);
+
+               lu_ref_del(&lock->cll_reference, "upcall", lock);
+               /* This may be the last reference, so it must be called
+                * after cl_lock_mutex_put(). */
+               cl_lock_put(env, lock);
+
+               cl_env_nested_put(&nest, env);
+       } else {
+               /* should never happen, similar to osc_ldlm_blocking_ast(). */
+               LBUG();
+       }
+       RETURN(errcode);
 }
 
 /**
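
The ordering in the rewritten osc_lock_upcall() above is deliberate:
the hold is dropped while the lock mutex is still held, the mutex is
released, and only then is the possibly-last reference put, because
the final put frees the lock together with its mutex. A minimal
pthread model of that rule (invented names, not the Lustre API):

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct toy_lock {
        pthread_mutex_t mutex;  /* models cl_lock_mutex_get/put */
        int refs;               /* models the cl_lock reference count */
};

static struct toy_lock *toy_lock_new(void)
{
        struct toy_lock *lk = malloc(sizeof(*lk));

        pthread_mutex_init(&lk->mutex, NULL);
        lk->refs = 1;
        return lk;
}

/* models cl_lock_put(): dropping the last reference frees the lock,
 * including its mutex, so the caller must not hold the mutex here */
static void toy_lock_put(struct toy_lock *lk)
{
        if (--lk->refs == 0) {
                pthread_mutex_destroy(&lk->mutex);
                free(lk);
                printf("last reference dropped: lock freed\n");
        }
}

int main(void)
{
        struct toy_lock *lk = toy_lock_new();

        pthread_mutex_lock(&lk->mutex);
        /* ... release the hold while still serialized ... */
        pthread_mutex_unlock(&lk->mutex);       /* drop the mutex first */
        toy_lock_put(lk);                       /* ... then the last ref */
        return 0;
}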
@@ -1179,7 +1183,9 @@ static int osc_lock_enqueue(const struct lu_env *env,
         if (enqflags & CEF_AGL) {
                 ols->ols_flags |= LDLM_FL_BLOCK_NOWAIT;
                 ols->ols_agl = 1;
-        }
+       } else {
+               ols->ols_agl = 0;
+       }
         if (ols->ols_flags & LDLM_FL_HAS_INTENT)
                 ols->ols_glimpse = 1;
         if (!osc_lock_is_lockless(ols) && !(enqflags & CEF_MUST))
@@ -1198,9 +1204,9 @@ static int osc_lock_enqueue(const struct lu_env *env,
                         if (ols->ols_locklessable)
                                 ols->ols_flags |= LDLM_FL_DENY_ON_CONTENTION;
 
-                        /* a reference for lock, passed as an upcall cookie */
-                        cl_lock_get(lock);
-                        lu_ref_add(&lock->cll_reference, "upcall", lock);
+                       /* lock will be passed as the upcall cookie,
+                        * hold a ref to prevent it from being released. */
+                       cl_lock_hold_add(env, lock, "upcall", lock);
                         /* a user for lock also */
                         cl_lock_user_add(env, lock);
                         ols->ols_state = OLS_ENQUEUED;
@@ -1221,9 +1227,7 @@ static int osc_lock_enqueue(const struct lu_env *env,
                                           PTLRPCD_SET, 1, ols->ols_agl);
                         if (result != 0) {
                                 cl_lock_user_del(env, lock);
-                                lu_ref_del(&lock->cll_reference,
-                                           "upcall", lock);
-                                cl_lock_put(env, lock);
+                               cl_lock_unhold(env, lock, "upcall", lock);
                                 if (unlikely(result == -ECANCELED)) {
                                         ols->ols_state = OLS_NEW;
                                         result = 0;
diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index b288b04..d693fb6 100644
@@ -2495,7 +2495,10 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
                                  * are explained in lov_enqueue() */
                         }
 
-                        /* We already have a lock, and it's referenced */
+                       /* We already have a lock, and it's referenced.
+                        *
+                        * At this point, the cl_lock::cll_state is CLS_QUEUING;
+                        * the AGL upcall may change it to CLS_HELD directly. */
                         (*upcall)(cookie, ELDLM_OK);
 
                         if (einfo->ei_mode != mode)