Whamcloud - gitweb
b=20826
[fs/lustre-release.git] / lustre / lov / lov_lock.c
index 2917d78..e2b1520 100644 (file)
@@ -42,7 +42,9 @@
 
 #include "lov_cl_internal.h"
 
-/** \addtogroup lov lov @{ */
+/** \addtogroup lov
+ *  @{
+ */
 
 static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
                                                struct cl_lock *parent);
@@ -78,7 +80,6 @@ static struct lov_sublock_env *lov_sublock_env_get(const struct lu_env *env,
                 subenv->lse_io  = io;
                 subenv->lse_sub = NULL;
         } else {
-                LASSERT(io != NULL);
                 sub = lov_sub_get(env, lio, lls->sub_stripe);
                 if (!IS_ERR(sub)) {
                         subenv->lse_env = sub->sub_env;
@@ -144,7 +145,7 @@ static struct cl_lock *lov_sublock_alloc(const struct lu_env *env,
         LASSERT(idx < lck->lls_nr);
         ENTRY;
 
-        OBD_SLAB_ALLOC_PTR(link, lov_lock_link_kmem);
+        OBD_SLAB_ALLOC_PTR_GFP(link, lov_lock_link_kmem, CFS_ALLOC_IO);
         if (link != NULL) {
                 struct lov_sublock_env *subenv;
                 struct lov_lock_sub  *lls;
@@ -192,29 +193,45 @@ static void lov_sublock_unlock(const struct lu_env *env,
 }
 
 static int lov_sublock_lock(const struct lu_env *env,
+                            struct lov_lock *lck,
                             struct lov_lock_sub *lls,
                             struct cl_lock_closure *closure,
                             struct lov_sublock_env **lsep)
 {
-        struct cl_lock *child;
-        int             result = 0;
+        struct lovsub_lock *sublock;
+        struct cl_lock     *child;
+        int                 result = 0;
         ENTRY;
 
         LASSERT(list_empty(&closure->clc_list));
 
-        child = lls->sub_lock->lss_cl.cls_lock;
+        sublock = lls->sub_lock;
+        child = sublock->lss_cl.cls_lock;
         result = cl_lock_closure_build(env, child, closure);
         if (result == 0) {
                 struct cl_lock *parent = closure->clc_origin;
 
                 LASSERT(cl_lock_is_mutexed(child));
-                lls->sub_lock->lss_active = parent;
+                sublock->lss_active = parent;
 
-                if (lsep) {
+                if (unlikely(child->cll_state == CLS_FREEING)) {
+                        struct lov_lock_link *link;
+                        /*
+                         * we could race with lock deletion which temporarily
+                         * put the lock in freeing state, bug 19080.
+                         */
+                        LASSERT(!(lls->sub_flags & LSF_HELD));
+
+                        link = lov_lock_link_find(env, lck, sublock);
+                        LASSERT(link != NULL);
+                        lov_lock_unlink(env, link, sublock);
+                        lov_sublock_unlock(env, sublock, closure, NULL);
+                        result = CLO_REPEAT;
+                } else if (lsep) {
                         struct lov_sublock_env *subenv;
                         subenv = lov_sublock_env_get(env, parent, lls);
                         if (IS_ERR(subenv)) {
-                                lov_sublock_unlock(env, lls->sub_lock,
+                                lov_sublock_unlock(env, sublock,
                                                    closure, NULL);
                                 result = PTR_ERR(subenv);
                         } else {
@@ -274,10 +291,7 @@ static int lov_lock_sub_init(const struct lu_env *env,
 {
         int result = 0;
         int i;
-        int j;
         int nr;
-        int stripe;
-        int start_stripe;
         obd_off start;
         obd_off end;
         obd_off file_start;
@@ -293,14 +307,12 @@ static int lov_lock_sub_init(const struct lu_env *env,
         file_start = cl_offset(lov2cl(loo), parent->cll_descr.cld_start);
         file_end   = cl_offset(lov2cl(loo), parent->cll_descr.cld_end + 1) - 1;
 
-        start_stripe = lov_stripe_number(r0->lo_lsm, file_start);
         for (i = 0, nr = 0; i < r0->lo_nr; i++) {
                 /*
                  * XXX for wide striping smarter algorithm is desirable,
                  * breaking out of the loop, early.
                  */
-                stripe = (start_stripe + i) % r0->lo_nr;
-                if (lov_stripe_intersects(r0->lo_lsm, stripe,
+                if (lov_stripe_intersects(r0->lo_lsm, i,
                                           file_start, file_end, &start, &end))
                         nr++;
         }
@@ -317,21 +329,22 @@ static int lov_lock_sub_init(const struct lu_env *env,
          * create sub-locks. At this moment, no other thread can access
          * top-lock.
          */
-        for (j = 0, nr = 0; j < i; ++j) {
-                stripe = (start_stripe + j) % r0->lo_nr;
-                if (lov_stripe_intersects(r0->lo_lsm, stripe,
+        for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
+                if (lov_stripe_intersects(r0->lo_lsm, i,
                                           file_start, file_end, &start, &end)) {
                         struct cl_lock_descr *descr;
 
                         descr = &lck->lls_sub[nr].sub_descr;
 
                         LASSERT(descr->cld_obj == NULL);
-                        descr->cld_obj   = lovsub2cl(r0->lo_sub[stripe]);
+                        descr->cld_obj   = lovsub2cl(r0->lo_sub[i]);
                         descr->cld_start = cl_index(descr->cld_obj, start);
                         descr->cld_end   = cl_index(descr->cld_obj, end);
                         descr->cld_mode  = parent->cll_descr.cld_mode;
+                        descr->cld_gid   = parent->cll_descr.cld_gid;
+                        /* XXX has no effect */
                         lck->lls_sub[nr].sub_got = *descr;
-                        lck->lls_sub[nr].sub_stripe = stripe;
+                        lck->lls_sub[nr].sub_stripe = i;
                         nr++;
                 }
         }
@@ -361,6 +374,7 @@ static int lov_lock_sub_init(const struct lu_env *env,
                                 lov_sublock_adopt(env, lck, sublock, i, link);
                                 cl_lock_mutex_put(env, parent);
                         } else {
+                                OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
                                 cl_lock_mutex_put(env, parent);
                                 cl_lock_unhold(env, sublock,
                                                "lov-parent", parent);
@@ -401,6 +415,7 @@ static int lov_sublock_release(const struct lu_env *env, struct lov_lock *lck,
                  * while sub-lock is being paged out.
                  */
                 dying = (sublock->cll_descr.cld_mode == CLM_PHANTOM ||
+                         sublock->cll_descr.cld_mode == CLM_GROUP ||
                          (sublock->cll_flags & (CLF_CANCELPEND|CLF_DOOMED))) &&
                         sublock->cll_holds == 1;
                 if (dying)
@@ -522,6 +537,7 @@ static int lov_sublock_fill(const struct lu_env *env, struct cl_lock *parent,
                     lck->lls_sub[idx].sub_lock == NULL)
                         lov_sublock_adopt(env, lck, sublock, idx, link);
                 else {
+                        OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
                         /* other thread allocated sub-lock, or enqueue is no
                          * longer going on */
                         cl_lock_mutex_put(env, parent);
@@ -585,7 +601,7 @@ static int lov_lock_enqueue(const struct lu_env *env,
                         break;
                 }
                 sublock = sub->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, lls, closure, &subenv);
+                rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
                 if (rc == 0) {
                         lov_sublock_hold(env, lck, i);
                         rc = lov_lock_enqueue_one(subenv->lse_env, lck, sublock,
@@ -601,7 +617,7 @@ static int lov_lock_enqueue(const struct lu_env *env,
                         lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
-                if (result < 0)
+                if (result != 0)
                         break;
         }
         cl_lock_closure_fini(closure);
@@ -635,7 +651,7 @@ static int lov_lock_unuse(const struct lu_env *env,
                         continue;
 
                 sublock = sub->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, lls, closure, &subenv);
+                rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
                 if (rc == 0) {
                         if (lck->lls_sub[i].sub_flags & LSF_HELD) {
                                 LASSERT(sublock->cll_state == CLS_HELD);
@@ -680,7 +696,7 @@ static int lov_lock_wait(const struct lu_env *env,
                 sub = lls->sub_lock;
                 LASSERT(sub != NULL);
                 sublock = sub->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, lls, closure, &subenv);
+                rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
                 if (rc == 0) {
                         LASSERT(sublock->cll_state >= CLS_ENQUEUED);
                         if (sublock->cll_state < CLS_HELD)
@@ -690,7 +706,7 @@ static int lov_lock_wait(const struct lu_env *env,
                         lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
-                if (result < 0)
+                if (result != 0)
                         break;
         }
         cl_lock_closure_fini(closure);
@@ -729,7 +745,7 @@ static int lov_lock_use(const struct lu_env *env,
                 sub = lls->sub_lock;
                 LASSERT(sub != NULL);
                 sublock = sub->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, lls, closure, &subenv);
+                rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
                 if (rc == 0) {
                         LASSERT(sublock->cll_state != CLS_FREEING);
                         lov_sublock_hold(env, lck, i);
@@ -743,7 +759,7 @@ static int lov_lock_use(const struct lu_env *env,
                         lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
-                if (result < 0)
+                if (result != 0)
                         break;
         }
         cl_lock_closure_fini(closure);
@@ -823,6 +839,7 @@ static int lov_lock_stripe_is_matching(const struct lu_env *env,
 
                 subd->cld_obj  = NULL;   /* don't need sub object at all */
                 subd->cld_mode = descr->cld_mode;
+                subd->cld_gid  = descr->cld_gid;
                 result = lov_stripe_intersects(lsm, stripe, start, end,
                                                &sub_start, &sub_end);
                 LASSERT(result);
@@ -856,7 +873,12 @@ static int lov_lock_fits_into(const struct lu_env *env,
 
         ENTRY;
 
-        if (lov->lls_nr == 1) {
+        if (need->cld_mode == CLM_GROUP)
+                /*
+                 * always allow to match group lock.
+                 */
+                result = cl_lock_ext_match(&lov->lls_orig, need);
+        else if (lov->lls_nr == 1) {
                 struct cl_lock_descr *got = &lov->lls_sub[0].sub_got;
                 result = lov_lock_stripe_is_matching(env,
                                                      cl2lov(slice->cls_obj),
@@ -960,7 +982,7 @@ static void lov_lock_delete(const struct lu_env *env,
                         continue;
 
                 sublock = lsl->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, lls, closure, NULL);
+                rc = lov_sublock_lock(env, lck, lls, closure, NULL);
                 if (rc == 0) {
                         if (lck->lls_sub[i].sub_flags & LSF_HELD)
                                 lov_sublock_release(env, lck, i, 1, 0);
@@ -1024,7 +1046,7 @@ int lov_lock_init_raid0(const struct lu_env *env, struct cl_object *obj,
         int result;
 
         ENTRY;
-        OBD_SLAB_ALLOC_PTR(lck, lov_lock_kmem);
+        OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, CFS_ALLOC_IO);
         if (lck != NULL) {
                 cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_lock_ops);
                 result = lov_lock_sub_init(env, lck, io);
@@ -1039,7 +1061,7 @@ static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
         struct cl_lock_closure *closure;
 
         closure = &lov_env_info(env)->lti_closure;
-        LINVRNT(list_empty(&closure->clc_list));
+        LASSERT(list_empty(&closure->clc_list));
         cl_lock_closure_init(env, closure, parent, 1);
         return closure;
 }