b=20824
diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c
index e2b1520..bc6ab44 100644
--- a/lustre/lov/lov_lock.c
+++ b/lustre/lov/lov_lock.c
@@ -49,6 +49,8 @@
 static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
                                                struct cl_lock *parent);
 
+static int lov_lock_unuse(const struct lu_env *env,
+                          const struct cl_lock_slice *slice);
 /*****************************************************************************
  *
  * Lov lock operations.
@@ -226,6 +228,7 @@ static int lov_sublock_lock(const struct lu_env *env,
                         LASSERT(link != NULL);
                         lov_lock_unlink(env, link, sublock);
                         lov_sublock_unlock(env, sublock, closure, NULL);
+                        lck->lls_cancel_race = 1;
                         result = CLO_REPEAT;
                 } else if (lsep) {
                         struct lov_sublock_env *subenv;
@@ -644,7 +647,7 @@ static int lov_lock_unuse(const struct lu_env *env,
                 /* top-lock state cannot change concurrently, because a single
                  * thread (the one that released the last hold) carries the
                  * unlocking through to completion. */
-                LASSERT(slice->cls_lock->cll_state == CLS_UNLOCKING);
+                LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
                 lls = &lck->lls_sub[i];
                 sub = lls->sub_lock;
                 if (sub == NULL)
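
The assertion now names CLS_INTRANSIT, a single transitional state replacing
CLS_UNLOCKING: a lock in transit is driven only by the thread that put it
there, which is why the top-lock state cannot change concurrently at this
point. A sketch of that ownership invariant follows; lk, lk_set_intransit, and
lk_assert_driver are illustrative names, not Lustre identifiers.

    #include <assert.h>
    #include <pthread.h>

    enum lk_state { LK_CACHED, LK_INTRANSIT, LK_HELD };

    struct lk {
            enum lk_state st;
            pthread_t     owner;   /* meaningful only while LK_INTRANSIT */
    };

    /* Enter the transitional state; the calling thread becomes the only
     * one allowed to drive the lock until it leaves LK_INTRANSIT. */
    static void lk_set_intransit(struct lk *l)
    {
            l->st = LK_INTRANSIT;
            l->owner = pthread_self();
    }

    /* Mirrors the LASSERT above: the owning thread may rely on the state
     * staying put, because no other thread may touch an in-transit lock. */
    static void lk_assert_driver(const struct lk *l)
    {
            assert(l->st == LK_INTRANSIT);
            assert(pthread_equal(l->owner, pthread_self()));
    }
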
@@ -653,7 +656,7 @@ static int lov_lock_unuse(const struct lu_env *env,
                 sublock = sub->lss_cl.cls_lock;
                 rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
                 if (rc == 0) {
-                        if (lck->lls_sub[i].sub_flags & LSF_HELD) {
+                        if (lls->sub_flags & LSF_HELD) {
                                 LASSERT(sublock->cll_state == CLS_HELD);
                                 rc = cl_unuse_try(subenv->lse_env, sublock);
                                 if (rc != CLO_WAIT)
@@ -666,8 +669,9 @@ static int lov_lock_unuse(const struct lu_env *env,
                 if (result < 0)
                         break;
         }
-        if (result == 0 && lck->lls_unuse_race) {
-                lck->lls_unuse_race = 0;
+
+        if (result == 0 && lck->lls_cancel_race) {
+                lck->lls_cancel_race = 0;
                 result = -ESTALE;
         }
         cl_lock_closure_fini(closure);
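
Here the renamed flag is consumed: an unuse pass that otherwise succeeded is
downgraded to -ESTALE when a cancellation race was recorded, so the caller
restarts instead of trusting a partially torn-down lock. A simplified sketch of
the check-and-clear step (top_state and finish_unuse_sketch are hypothetical):

    #include <errno.h>

    struct top_state {
            int cancel_race;         /* set by the sub-lock path above */
    };

    static int finish_unuse_sketch(struct top_state *ts, int result)
    {
            if (result == 0 && ts->cancel_race) {
                    ts->cancel_race = 0; /* consume the one-shot flag */
                    result = -ESTALE;    /* caller must re-enqueue */
            }
            return result;
    }

Clearing the flag before returning keeps it one-shot: a retried pass starts
from a clean slate and only fails again if a new race occurs.
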
@@ -721,7 +725,7 @@ static int lov_lock_use(const struct lu_env *env,
         int                     result;
         int                     i;
 
-        LASSERT(slice->cls_lock->cll_state == CLS_CACHED);
+        LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
         ENTRY;
 
         for (result = 0, i = 0; i < lck->lls_nr; ++i) {
@@ -731,37 +735,48 @@ static int lov_lock_use(const struct lu_env *env,
                 struct lov_lock_sub    *lls;
                 struct lov_sublock_env *subenv;
 
-                if (slice->cls_lock->cll_state != CLS_CACHED) {
-                        /* see comment in lov_lock_enqueue(). */
-                        LASSERT(i > 0 && result != 0);
-                        break;
-                }
-                /*
-                 * if a sub-lock was destroyed while top-lock was in
-                 * CLS_CACHED state, top-lock would have been moved into
-                 * CLS_NEW state, so all sub-locks have to be in place.
-                 */
+                LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
+
                 lls = &lck->lls_sub[i];
                 sub = lls->sub_lock;
-                LASSERT(sub != NULL);
+                if (sub == NULL) {
+                        /*
+                         * The sub-lock might have been cancelled while the
+                         * top-lock was cached.
+                         */
+                        result = -ESTALE;
+                        break;
+                }
+
                 sublock = sub->lss_cl.cls_lock;
                 rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
                 if (rc == 0) {
                         LASSERT(sublock->cll_state != CLS_FREEING);
                         lov_sublock_hold(env, lck, i);
                         if (sublock->cll_state == CLS_CACHED) {
-                                rc = cl_use_try(subenv->lse_env, sublock);
+                                rc = cl_use_try(subenv->lse_env, sublock, 0);
                                 if (rc != 0)
                                         rc = lov_sublock_release(env, lck,
                                                                  i, 1, rc);
-                        } else
-                                rc = 0;
+                        }
                         lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
                 if (result != 0)
                         break;
         }
+
+        if (lck->lls_cancel_race) {
+                /*
+                 * If unlocking happened concurrently, the sub-lock's state
+                 * should be CLS_FREEING and lov_sublock_lock() should have
+                 * returned CLO_REPEAT. In that case return -ESTALE so that
+                 * the upper layer resets the lock state to CLS_NEW.
+                 */
+                lck->lls_cancel_race = 0;
+                LASSERT(result != 0);
+                result = -ESTALE;
+        }
         cl_lock_closure_fini(closure);
         RETURN(result);
 }
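
With this hunk, lov_lock_use() reports -ESTALE both when a sub-lock has
disappeared (sub == NULL) and when lls_cancel_race was set by a concurrent
unlock; per the comment above, the upper layer is then expected to reset the
top-lock to CLS_NEW and enqueue it again. A hypothetical caller-side sketch of
that contract (all names are illustrative, not the cl_lock API):

    #include <errno.h>

    enum tl_state { TL_NEW, TL_HELD, TL_CACHED, TL_INTRANSIT };

    struct top_lock {
            enum tl_state tl_state;
            int           tl_stale;  /* simulates a cancelled sub-lock */
    };

    static int top_lock_use(struct top_lock *tl)
    {
            if (tl->tl_stale)
                    return -ESTALE;  /* a sub-lock vanished from the cache */
            tl->tl_state = TL_HELD;
            return 0;
    }

    static int top_lock_enqueue(struct top_lock *tl)
    {
            tl->tl_stale = 0;        /* re-create the missing sub-locks */
            tl->tl_state = TL_HELD;
            return 0;
    }

    static int top_lock_use_or_requeue(struct top_lock *tl)
    {
            int rc = top_lock_use(tl);

            if (rc == -ESTALE) {
                    tl->tl_state = TL_NEW;   /* restart from scratch */
                    rc = top_lock_enqueue(tl);
            }
            return rc;
    }
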
@@ -984,7 +999,7 @@ static void lov_lock_delete(const struct lu_env *env,
                 sublock = lsl->lss_cl.cls_lock;
                 rc = lov_sublock_lock(env, lck, lls, closure, NULL);
                 if (rc == 0) {
-                        if (lck->lls_sub[i].sub_flags & LSF_HELD)
+                        if (lls->sub_flags & LSF_HELD)
                                 lov_sublock_release(env, lck, i, 1, 0);
                         if (sublock->cll_state < CLS_FREEING) {
                                 struct lov_lock_link *link;