b=20824
diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c
index 9c741f8..bc6ab44 100644
--- a/lustre/lov/lov_lock.c
+++ b/lustre/lov/lov_lock.c
 
 #include "lov_cl_internal.h"
 
-/** \addtogroup lov lov @{ */
+/** \addtogroup lov
+ *  @{
+ */
 
 static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
                                                struct cl_lock *parent);
 
+static int lov_lock_unuse(const struct lu_env *env,
+                          const struct cl_lock_slice *slice);
 /*****************************************************************************
  *
  * Lov lock operations.
@@ -78,7 +82,6 @@ static struct lov_sublock_env *lov_sublock_env_get(const struct lu_env *env,
                 subenv->lse_io  = io;
                 subenv->lse_sub = NULL;
         } else {
-                LASSERT(io != NULL);
                 sub = lov_sub_get(env, lio, lls->sub_stripe);
                 if (!IS_ERR(sub)) {
                         subenv->lse_env = sub->sub_env;
@@ -192,29 +195,46 @@ static void lov_sublock_unlock(const struct lu_env *env,
 }
 
 static int lov_sublock_lock(const struct lu_env *env,
+                            struct lov_lock *lck,
                             struct lov_lock_sub *lls,
                             struct cl_lock_closure *closure,
                             struct lov_sublock_env **lsep)
 {
-        struct cl_lock *child;
-        int             result = 0;
+        struct lovsub_lock *sublock;
+        struct cl_lock     *child;
+        int                 result = 0;
         ENTRY;
 
         LASSERT(list_empty(&closure->clc_list));
 
-        child = lls->sub_lock->lss_cl.cls_lock;
+        sublock = lls->sub_lock;
+        child = sublock->lss_cl.cls_lock;
         result = cl_lock_closure_build(env, child, closure);
         if (result == 0) {
                 struct cl_lock *parent = closure->clc_origin;
 
                 LASSERT(cl_lock_is_mutexed(child));
-                lls->sub_lock->lss_active = parent;
+                sublock->lss_active = parent;
 
-                if (lsep) {
+                if (unlikely(child->cll_state == CLS_FREEING)) {
+                        struct lov_lock_link *link;
+                        /*
+                         * We could race with lock deletion, which temporarily
+                         * puts the lock into CLS_FREEING state; see bug 19080.
+                         */
+                        LASSERT(!(lls->sub_flags & LSF_HELD));
+
+                        link = lov_lock_link_find(env, lck, sublock);
+                        LASSERT(link != NULL);
+                        lov_lock_unlink(env, link, sublock);
+                        lov_sublock_unlock(env, sublock, closure, NULL);
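+                        /* note the race: lov_lock_{unuse,use}() will fail
+                         * with -ESTALE */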
+                        lck->lls_cancel_race = 1;
+                        result = CLO_REPEAT;
+                } else if (lsep) {
                         struct lov_sublock_env *subenv;
                         subenv = lov_sublock_env_get(env, parent, lls);
                         if (IS_ERR(subenv)) {
-                                lov_sublock_unlock(env, lls->sub_lock,
+                                lov_sublock_unlock(env, sublock,
                                                    closure, NULL);
                                 result = PTR_ERR(subenv);
                         } else {
@@ -274,10 +294,7 @@ static int lov_lock_sub_init(const struct lu_env *env,
 {
         int result = 0;
         int i;
-        int j;
         int nr;
-        int stripe;
-        int start_stripe;
         obd_off start;
         obd_off end;
         obd_off file_start;
@@ -293,14 +310,12 @@ static int lov_lock_sub_init(const struct lu_env *env,
         file_start = cl_offset(lov2cl(loo), parent->cll_descr.cld_start);
         file_end   = cl_offset(lov2cl(loo), parent->cll_descr.cld_end + 1) - 1;
 
-        start_stripe = lov_stripe_number(r0->lo_lsm, file_start);
         for (i = 0, nr = 0; i < r0->lo_nr; i++) {
                 /*
                  * XXX for wide striping smarter algorithm is desirable,
                  * breaking out of the loop, early.
                  */
-                stripe = (start_stripe + i) % r0->lo_nr;
-                if (lov_stripe_intersects(r0->lo_lsm, stripe,
+                if (lov_stripe_intersects(r0->lo_lsm, i,
                                           file_start, file_end, &start, &end))
                         nr++;
         }
@@ -317,22 +332,22 @@ static int lov_lock_sub_init(const struct lu_env *env,
          * create sub-locks. At this moment, no other thread can access
          * top-lock.
          */
-        for (j = 0, nr = 0; j < i; ++j) {
-                stripe = (start_stripe + j) % r0->lo_nr;
-                if (lov_stripe_intersects(r0->lo_lsm, stripe,
+        for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
+                if (lov_stripe_intersects(r0->lo_lsm, i,
                                           file_start, file_end, &start, &end)) {
                         struct cl_lock_descr *descr;
 
                         descr = &lck->lls_sub[nr].sub_descr;
 
                         LASSERT(descr->cld_obj == NULL);
-                        descr->cld_obj   = lovsub2cl(r0->lo_sub[stripe]);
+                        descr->cld_obj   = lovsub2cl(r0->lo_sub[i]);
                         descr->cld_start = cl_index(descr->cld_obj, start);
                         descr->cld_end   = cl_index(descr->cld_obj, end);
                         descr->cld_mode  = parent->cll_descr.cld_mode;
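+                        /* propagate the group lock id to the sub-lock */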
+                        descr->cld_gid   = parent->cll_descr.cld_gid;
                         /* XXX has no effect */
                         lck->lls_sub[nr].sub_got = *descr;
-                        lck->lls_sub[nr].sub_stripe = stripe;
+                        lck->lls_sub[nr].sub_stripe = i;
                         nr++;
                 }
         }
@@ -362,6 +377,7 @@ static int lov_lock_sub_init(const struct lu_env *env,
                                 lov_sublock_adopt(env, lck, sublock, i, link);
                                 cl_lock_mutex_put(env, parent);
                         } else {
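+                                /* the link was not adopted: free it to
+                                 * avoid a leak */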
+                                OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
                                 cl_lock_mutex_put(env, parent);
                                 cl_lock_unhold(env, sublock,
                                                "lov-parent", parent);
@@ -402,6 +418,7 @@ static int lov_sublock_release(const struct lu_env *env, struct lov_lock *lck,
                  * while sub-lock is being paged out.
                  */
                 dying = (sublock->cll_descr.cld_mode == CLM_PHANTOM ||
+                         sublock->cll_descr.cld_mode == CLM_GROUP ||
                          (sublock->cll_flags & (CLF_CANCELPEND|CLF_DOOMED))) &&
                         sublock->cll_holds == 1;
                 if (dying)
@@ -523,6 +540,7 @@ static int lov_sublock_fill(const struct lu_env *env, struct cl_lock *parent,
                     lck->lls_sub[idx].sub_lock == NULL)
                         lov_sublock_adopt(env, lck, sublock, idx, link);
                 else {
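+                        /* the preallocated link is unused: free it to
+                         * avoid a leak */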
+                        OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
                         /* other thread allocated sub-lock, or enqueue is no
                          * longer going on */
                         cl_lock_mutex_put(env, parent);
@@ -586,7 +604,7 @@ static int lov_lock_enqueue(const struct lu_env *env,
                         break;
                 }
                 sublock = sub->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, lls, closure, &subenv);
+                rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
                 if (rc == 0) {
                         lov_sublock_hold(env, lck, i);
                         rc = lov_lock_enqueue_one(subenv->lse_env, lck, sublock,
@@ -602,7 +620,7 @@ static int lov_lock_enqueue(const struct lu_env *env,
                         lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
-                if (result < 0)
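+                /* break on CLO_REPEAT/CLO_WAIT as well as on error */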
+                if (result != 0)
                         break;
         }
         cl_lock_closure_fini(closure);
@@ -629,16 +647,16 @@ static int lov_lock_unuse(const struct lu_env *env,
                 /* top-lock state cannot change concurrently, because single
                  * thread (one that released the last hold) carries unlocking
                  * to the completion. */
-                LASSERT(slice->cls_lock->cll_state == CLS_UNLOCKING);
+                LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
                 lls = &lck->lls_sub[i];
                 sub = lls->sub_lock;
                 if (sub == NULL)
                         continue;
 
                 sublock = sub->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, lls, closure, &subenv);
+                rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
                 if (rc == 0) {
-                        if (lck->lls_sub[i].sub_flags & LSF_HELD) {
+                        if (lls->sub_flags & LSF_HELD) {
                                 LASSERT(sublock->cll_state == CLS_HELD);
                                 rc = cl_unuse_try(subenv->lse_env, sublock);
                                 if (rc != CLO_WAIT)
@@ -651,8 +669,9 @@ static int lov_lock_unuse(const struct lu_env *env,
                 if (result < 0)
                         break;
         }
-        if (result == 0 && lck->lls_unuse_race) {
-                lck->lls_unuse_race = 0;
+
+        if (result == 0 && lck->lls_cancel_race) {
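+                /* a sub-lock was cancelled concurrently: force a fresh
+                 * enqueue */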
+                lck->lls_cancel_race = 0;
                 result = -ESTALE;
         }
         cl_lock_closure_fini(closure);
@@ -681,7 +700,7 @@ static int lov_lock_wait(const struct lu_env *env,
                 sub = lls->sub_lock;
                 LASSERT(sub != NULL);
                 sublock = sub->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, lls, closure, &subenv);
+                rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
                 if (rc == 0) {
                         LASSERT(sublock->cll_state >= CLS_ENQUEUED);
                         if (sublock->cll_state < CLS_HELD)
@@ -691,7 +710,7 @@ static int lov_lock_wait(const struct lu_env *env,
                         lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
-                if (result < 0)
+                if (result != 0)
                         break;
         }
         cl_lock_closure_fini(closure);
@@ -706,7 +725,7 @@ static int lov_lock_use(const struct lu_env *env,
         int                     result;
         int                     i;
 
-        LASSERT(slice->cls_lock->cll_state == CLS_CACHED);
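+        /* a single thread holds the top-lock in the transitional
+         * CLS_INTRANSIT state while re-using it */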
+        LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
         ENTRY;
 
         for (result = 0, i = 0; i < lck->lls_nr; ++i) {
@@ -716,37 +735,48 @@ static int lov_lock_use(const struct lu_env *env,
                 struct lov_lock_sub    *lls;
                 struct lov_sublock_env *subenv;
 
-                if (slice->cls_lock->cll_state != CLS_CACHED) {
-                        /* see comment in lov_lock_enqueue(). */
-                        LASSERT(i > 0 && result != 0);
-                        break;
-                }
-                /*
-                 * if a sub-lock was destroyed while top-lock was in
-                 * CLS_CACHED state, top-lock would have been moved into
-                 * CLS_NEW state, so all sub-locks have to be in place.
-                 */
+                LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
+
                 lls = &lck->lls_sub[i];
                 sub = lls->sub_lock;
-                LASSERT(sub != NULL);
+                if (sub == NULL) {
+                        /*
+                         * The sub-lock might have been cancelled while
+                         * the top-lock was cached.
+                         */
+                        result = -ESTALE;
+                        break;
+                }
+
                 sublock = sub->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, lls, closure, &subenv);
+                rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
                 if (rc == 0) {
                         LASSERT(sublock->cll_state != CLS_FREEING);
                         lov_sublock_hold(env, lck, i);
                         if (sublock->cll_state == CLS_CACHED) {
-                                rc = cl_use_try(subenv->lse_env, sublock);
+                                rc = cl_use_try(subenv->lse_env, sublock, 0);
                                 if (rc != 0)
                                         rc = lov_sublock_release(env, lck,
                                                                  i, 1, rc);
-                        } else
-                                rc = 0;
+                        }
                         lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
-                if (result < 0)
+                if (result != 0)
                         break;
         }
+
+        if (lck->lls_cancel_race) {
+                /*
+                 * If there is unlocking happened at the same time, then
+                 * If an unlock raced with us, the sub-lock state was
+                 * FREEING and lov_sublock_lock() returned CLO_REPEAT. In
+                 * this case return -ESTALE, so that the upper layer resets
+                 * the top-lock state to CLS_NEW.
+                lck->lls_cancel_race = 0;
+                LASSERT(result != 0);
+                result = -ESTALE;
+        }
         cl_lock_closure_fini(closure);
         RETURN(result);
 }
@@ -824,6 +854,7 @@ static int lov_lock_stripe_is_matching(const struct lu_env *env,
 
                 subd->cld_obj  = NULL;   /* don't need sub object at all */
                 subd->cld_mode = descr->cld_mode;
+                subd->cld_gid  = descr->cld_gid;
                 result = lov_stripe_intersects(lsm, stripe, start, end,
                                                &sub_start, &sub_end);
                 LASSERT(result);
@@ -857,7 +888,12 @@ static int lov_lock_fits_into(const struct lu_env *env,
 
         ENTRY;
 
-        if (lov->lls_nr == 1) {
+        if (need->cld_mode == CLM_GROUP)
+                /*
+                 * Always allow matching against a group lock.
+                 */
+                result = cl_lock_ext_match(&lov->lls_orig, need);
+        else if (lov->lls_nr == 1) {
                 struct cl_lock_descr *got = &lov->lls_sub[0].sub_got;
                 result = lov_lock_stripe_is_matching(env,
                                                      cl2lov(slice->cls_obj),
@@ -961,9 +997,9 @@ static void lov_lock_delete(const struct lu_env *env,
                         continue;
 
                 sublock = lsl->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, lls, closure, NULL);
+                rc = lov_sublock_lock(env, lck, lls, closure, NULL);
                 if (rc == 0) {
-                        if (lck->lls_sub[i].sub_flags & LSF_HELD)
+                        if (lls->sub_flags & LSF_HELD)
                                 lov_sublock_release(env, lck, i, 1, 0);
                         if (sublock->cll_state < CLS_FREEING) {
                                 struct lov_lock_link *link;
@@ -1040,7 +1076,7 @@ static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
         struct cl_lock_closure *closure;
 
         closure = &lov_env_info(env)->lti_closure;
-        LINVRNT(list_empty(&closure->clc_list));
+        LASSERT(list_empty(&closure->clc_list));
         cl_lock_closure_init(env, closure, parent, 1);
         return closure;
 }