Whamcloud - gitweb
branch: HEAD
[fs/lustre-release.git] / lustre / lov / lov_lock.c
index 14ecd68..79a6942 100644 (file)
@@ -53,6 +53,50 @@ static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
  *
  */
 
+static struct lov_sublock_env *lov_sublock_env_get(const struct lu_env *env,
+                                                   struct cl_lock *parent,
+                                                   struct lov_lock_sub *lls)
+{
+        struct lov_sublock_env *subenv;
+        struct lov_io          *lio    = lov_env_io(env);
+        struct cl_io           *io     = lio->lis_cl.cis_io;
+        struct lov_io_sub      *sub;
+
+        subenv = &lov_env_session(env)->ls_subenv;
+
+        /*
+         * FIXME: We tend to use the subio's env & io to call the sublock
+         * lock operations because osc lock sometimes stores some control
+         * variables in thread's IO information (now only lockless information).
+         * However, if the lock's host(object) is different from the object
+         * for current IO, we have no way to get the subenv and subio because
+         * they are not initialized at all. As a temp fix, in this case,
+         * we still borrow the parent's env to call sublock operations.
+         */
+        if (!cl_object_same(io->ci_obj, parent->cll_descr.cld_obj)) {
+                subenv->lse_env = env;
+                subenv->lse_io  = io;
+                subenv->lse_sub = NULL;
+        } else {
+                LASSERT(io != NULL);
+                sub = lov_sub_get(env, lio, lls->sub_stripe);
+                if (!IS_ERR(sub)) {
+                        subenv->lse_env = sub->sub_env;
+                        subenv->lse_io  = sub->sub_io;
+                        subenv->lse_sub = sub;
+                } else {
+                        subenv = (void*)sub;
+                }
+        }
+        return subenv;
+}
+
+static void lov_sublock_env_put(struct lov_sublock_env *subenv)
+{
+        if (subenv && subenv->lse_sub)
+                lov_sub_put(subenv->lse_sub);
+}
+
 static void lov_sublock_adopt(const struct lu_env *env, struct lov_lock *lck,
                               struct cl_lock *sublock, int idx,
                               struct lov_lock_link *link)
@@ -100,17 +144,32 @@ static struct cl_lock *lov_sublock_alloc(const struct lu_env *env,
         LASSERT(idx < lck->lls_nr);
         ENTRY;
 
-        OBD_SLAB_ALLOC_PTR(link, lov_lock_link_kmem);
+        OBD_SLAB_ALLOC_PTR_GFP(link, lov_lock_link_kmem, CFS_ALLOC_IO);
         if (link != NULL) {
-                struct lov_lock_sub  *sub;
+                struct lov_sublock_env *subenv;
+                struct lov_lock_sub  *lls;
                 struct cl_lock_descr *descr;
 
                 parent = lck->lls_cl.cls_lock;
-                sub    = &lck->lls_sub[idx];
-                descr  = &sub->sub_descr;
+                lls    = &lck->lls_sub[idx];
+                descr  = &lls->sub_descr;
+
+                subenv = lov_sublock_env_get(env, parent, lls);
+                if (!IS_ERR(subenv)) {
+                        /* CAVEAT: Don't try to add a field in lov_lock_sub
+                         * to remember the subio. This is because lock is able
+                         * to be cached, but this is not true for IO. This
+                         * further means a sublock might be referenced in
+                         * different io context. -jay */
+
+                        sublock = cl_lock_hold(subenv->lse_env, subenv->lse_io,
+                                               descr, "lov-parent", parent);
+                        lov_sublock_env_put(subenv);
+                } else {
+                        /* error occurs. */
+                        sublock = (void*)subenv;
+                }
 
-                /* XXX maybe sub-io? */
-                sublock = cl_lock_hold(env, io, descr, "lov-parent", parent);
                 if (!IS_ERR(sublock))
                         *out = link;
                 else
@@ -122,28 +181,46 @@ static struct cl_lock *lov_sublock_alloc(const struct lu_env *env,
 
 static void lov_sublock_unlock(const struct lu_env *env,
                                struct lovsub_lock *lsl,
-                               struct cl_lock_closure *closure)
+                               struct cl_lock_closure *closure,
+                               struct lov_sublock_env *subenv)
 {
         ENTRY;
+        lov_sublock_env_put(subenv);
         lsl->lss_active = NULL;
         cl_lock_disclosure(env, closure);
         EXIT;
 }
 
-static int lov_sublock_lock(const struct lu_env *env, struct lovsub_lock *lsl,
-                            struct cl_lock_closure *closure)
+static int lov_sublock_lock(const struct lu_env *env,
+                            struct lov_lock_sub *lls,
+                            struct cl_lock_closure *closure,
+                            struct lov_sublock_env **lsep)
 {
         struct cl_lock *child;
-        int             result;
+        int             result = 0;
+        ENTRY;
 
         LASSERT(list_empty(&closure->clc_list));
 
-        ENTRY;
-        child = lsl->lss_cl.cls_lock;
+        child = lls->sub_lock->lss_cl.cls_lock;
         result = cl_lock_closure_build(env, child, closure);
         if (result == 0) {
+                struct cl_lock *parent = closure->clc_origin;
+
                 LASSERT(cl_lock_is_mutexed(child));
-                lsl->lss_active = closure->clc_origin;
+                lls->sub_lock->lss_active = parent;
+
+                if (lsep) {
+                        struct lov_sublock_env *subenv;
+                        subenv = lov_sublock_env_get(env, parent, lls);
+                        if (IS_ERR(subenv)) {
+                                lov_sublock_unlock(env, lls->sub_lock,
+                                                   closure, NULL);
+                                result = PTR_ERR(subenv);
+                        } else {
+                                *lsep = subenv;
+                        }
+                }
         }
         RETURN(result);
 }
@@ -253,6 +330,8 @@ static int lov_lock_sub_init(const struct lu_env *env,
                         descr->cld_start = cl_index(descr->cld_obj, start);
                         descr->cld_end   = cl_index(descr->cld_obj, end);
                         descr->cld_mode  = parent->cll_descr.cld_mode;
+                        descr->cld_gid   = parent->cll_descr.cld_gid;
+                        /* XXX has no effect */
                         lck->lls_sub[nr].sub_got = *descr;
                         lck->lls_sub[nr].sub_stripe = stripe;
                         nr++;
@@ -308,7 +387,7 @@ static int lov_sublock_release(const struct lu_env *env, struct lov_lock *lck,
         ENTRY;
 
         if (lck->lls_sub[i].sub_flags & LSF_HELD) {
-                struct cl_lock *sublock;
+                struct cl_lock    *sublock;
                 int dying;
 
                 LASSERT(lck->lls_sub[i].sub_lock != NULL);
@@ -324,6 +403,7 @@ static int lov_sublock_release(const struct lu_env *env, struct lov_lock *lck,
                  * while sub-lock is being paged out.
                  */
                 dying = (sublock->cll_descr.cld_mode == CLM_PHANTOM ||
+                         sublock->cll_descr.cld_mode == CLM_GROUP ||
                          (sublock->cll_flags & (CLF_CANCELPEND|CLF_DOOMED))) &&
                         sublock->cll_holds == 1;
                 if (dying)
@@ -404,8 +484,8 @@ static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck,
                                 struct cl_io *io, __u32 enqflags, int last)
 {
         int result;
-
         ENTRY;
+
         /* first, try to enqueue a sub-lock ... */
         result = cl_enqueue_try(env, sublock, io, enqflags);
         if (sublock->cll_state == CLS_ENQUEUED)
@@ -480,8 +560,10 @@ static int lov_lock_enqueue(const struct lu_env *env,
 
         for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
                 int rc;
-                struct lovsub_lock *sub;
-                struct cl_lock *sublock;
+                struct lovsub_lock     *sub;
+                struct lov_lock_sub    *lls;
+                struct cl_lock         *sublock;
+                struct lov_sublock_env *subenv;
 
                 if (lock->cll_state != CLS_QUEUING) {
                         /*
@@ -493,7 +575,8 @@ static int lov_lock_enqueue(const struct lu_env *env,
                         break;
                 }
 
-                sub = lck->lls_sub[i].sub_lock;
+                lls = &lck->lls_sub[i];
+                sub = lls->sub_lock;
                 /*
                  * Sub-lock might have been canceled, while top-lock was
                  * cached.
@@ -505,11 +588,11 @@ static int lov_lock_enqueue(const struct lu_env *env,
                         break;
                 }
                 sublock = sub->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, sub, closure);
+                rc = lov_sublock_lock(env, lls, closure, &subenv);
                 if (rc == 0) {
                         lov_sublock_hold(env, lck, i);
-                        rc = lov_lock_enqueue_one(env, lck, sublock, io,
-                                                  enqflags,
+                        rc = lov_lock_enqueue_one(subenv->lse_env, lck, sublock,
+                                                  subenv->lse_io, enqflags,
                                                   i == lck->lls_nr - 1);
                         minstate = min(minstate, sublock->cll_state);
                         /*
@@ -518,7 +601,7 @@ static int lov_lock_enqueue(const struct lu_env *env,
                          */
                         if (sublock->cll_state > CLS_HELD)
                                 rc = lov_sublock_release(env, lck, i, 1, rc);
-                        lov_sublock_unlock(env, sub, closure);
+                        lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
                 if (result < 0)
@@ -540,28 +623,31 @@ static int lov_lock_unuse(const struct lu_env *env,
 
         for (result = 0, i = 0; i < lck->lls_nr; ++i) {
                 int rc;
-                struct lovsub_lock *sub;
-                struct cl_lock *sublock;
+                struct lovsub_lock     *sub;
+                struct cl_lock         *sublock;
+                struct lov_lock_sub    *lls;
+                struct lov_sublock_env *subenv;
 
                 /* top-lock state cannot change concurrently, because single
                  * thread (one that released the last hold) carries unlocking
                  * to the completion. */
                 LASSERT(slice->cls_lock->cll_state == CLS_UNLOCKING);
-                sub = lck->lls_sub[i].sub_lock;
+                lls = &lck->lls_sub[i];
+                sub = lls->sub_lock;
                 if (sub == NULL)
                         continue;
 
                 sublock = sub->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, sub, closure);
+                rc = lov_sublock_lock(env, lls, closure, &subenv);
                 if (rc == 0) {
                         if (lck->lls_sub[i].sub_flags & LSF_HELD) {
                                 LASSERT(sublock->cll_state == CLS_HELD);
-                                rc = cl_unuse_try(env, sublock);
+                                rc = cl_unuse_try(subenv->lse_env, sublock);
                                 if (rc != CLO_WAIT)
                                         rc = lov_sublock_release(env, lck,
                                                                  i, 0, rc);
                         }
-                        lov_sublock_unlock(env, sub, closure);
+                        lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
                 if (result < 0)
@@ -588,19 +674,23 @@ static int lov_lock_wait(const struct lu_env *env,
 
         for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
                 int rc;
-                struct lovsub_lock *sub;
-                struct cl_lock *sublock;
+                struct lovsub_lock     *sub;
+                struct cl_lock         *sublock;
+                struct lov_lock_sub    *lls;
+                struct lov_sublock_env *subenv;
 
-                sub = lck->lls_sub[i].sub_lock;
+                lls = &lck->lls_sub[i];
+                sub = lls->sub_lock;
                 LASSERT(sub != NULL);
                 sublock = sub->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, sub, closure);
+                rc = lov_sublock_lock(env, lls, closure, &subenv);
                 if (rc == 0) {
                         LASSERT(sublock->cll_state >= CLS_ENQUEUED);
                         if (sublock->cll_state < CLS_HELD)
                                 rc = cl_wait_try(env, sublock);
+
                         minstate = min(minstate, sublock->cll_state);
-                        lov_sublock_unlock(env, sub, closure);
+                        lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
                 if (result < 0)
@@ -623,8 +713,10 @@ static int lov_lock_use(const struct lu_env *env,
 
         for (result = 0, i = 0; i < lck->lls_nr; ++i) {
                 int rc;
-                struct lovsub_lock *sub;
-                struct cl_lock *sublock;
+                struct lovsub_lock     *sub;
+                struct cl_lock         *sublock;
+                struct lov_lock_sub    *lls;
+                struct lov_sublock_env *subenv;
 
                 if (slice->cls_lock->cll_state != CLS_CACHED) {
                         /* see comment in lov_lock_enqueue(). */
@@ -636,21 +728,22 @@ static int lov_lock_use(const struct lu_env *env,
                  * CLS_CACHED state, top-lock would have been moved into
                  * CLS_NEW state, so all sub-locks have to be in place.
                  */
-                sub = lck->lls_sub[i].sub_lock;
+                lls = &lck->lls_sub[i];
+                sub = lls->sub_lock;
                 LASSERT(sub != NULL);
                 sublock = sub->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, sub, closure);
+                rc = lov_sublock_lock(env, lls, closure, &subenv);
                 if (rc == 0) {
                         LASSERT(sublock->cll_state != CLS_FREEING);
                         lov_sublock_hold(env, lck, i);
                         if (sublock->cll_state == CLS_CACHED) {
-                                rc = cl_use_try(env, sublock);
+                                rc = cl_use_try(subenv->lse_env, sublock);
                                 if (rc != 0)
                                         rc = lov_sublock_release(env, lck,
                                                                  i, 1, rc);
                         } else
                                 rc = 0;
-                        lov_sublock_unlock(env, sub, closure);
+                        lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
                 if (result < 0)
@@ -699,19 +792,49 @@ static int lock_lock_multi_match()
 }
 #endif
 
-static int lov_is_same_stripe(struct lov_object *lov, int stripe,
-                              const struct cl_lock_descr *descr)
+/**
+ * Check if the extent region \a descr is covered by \a child against the
+ * specific \a stripe.
+ */
+static int lov_lock_stripe_is_matching(const struct lu_env *env,
+                                       struct lov_object *lov, int stripe,
+                                       const struct cl_lock_descr *child,
+                                       const struct cl_lock_descr *descr)
 {
         struct lov_stripe_md *lsm = lov_r0(lov)->lo_lsm;
         obd_off start;
         obd_off end;
+        int result;
 
+        if (lov_r0(lov)->lo_nr == 1)
+                return cl_lock_ext_match(child, descr);
+
+        /*
+         * For a multi-stripes object:
+         * - make sure the descr only covers child's stripe, and
+         * - check if extent is matching.
+         */
         start = cl_offset(&lov->lo_cl, descr->cld_start);
         end   = cl_offset(&lov->lo_cl, descr->cld_end + 1) - 1;
-        return
-                end - start <= lsm->lsm_stripe_size &&
-                stripe == lov_stripe_number(lsm, start) &&
-                stripe == lov_stripe_number(lsm, end);
+        result = end - start <= lsm->lsm_stripe_size &&
+                 stripe == lov_stripe_number(lsm, start) &&
+                 stripe == lov_stripe_number(lsm, end);
+        if (result) {
+                struct cl_lock_descr *subd = &lov_env_info(env)->lti_ldescr;
+                obd_off sub_start;
+                obd_off sub_end;
+
+                subd->cld_obj  = NULL;   /* don't need sub object at all */
+                subd->cld_mode = descr->cld_mode;
+                subd->cld_gid  = descr->cld_gid;
+                result = lov_stripe_intersects(lsm, stripe, start, end,
+                                               &sub_start, &sub_end);
+                LASSERT(result);
+                subd->cld_start = cl_index(child->cld_obj, sub_start);
+                subd->cld_end   = cl_index(child->cld_obj, sub_end);
+                result = cl_lock_ext_match(child, subd);
+        }
+        return result;
 }
 
 /**
@@ -737,20 +860,17 @@ static int lov_lock_fits_into(const struct lu_env *env,
 
         ENTRY;
 
-        if (lov->lls_nr == 1) {
+        if (need->cld_mode == CLM_GROUP)
                 /*
-                 * If a lock is on a single stripe, it's enough to check that
-                 * @need lock matches actually granted stripe lock, and...
+                 * always allow a group lock to match.
                  */
-                result = cl_lock_ext_match(&lov->lls_sub[0].sub_got, need);
-                if (result && lov_r0(obj)->lo_nr > 1)
-                        /*
-                         * ... @need is on the same stripe, if multiple
-                         * stripes are possible at all for this object.
-                         */
-                        result = lov_is_same_stripe(cl2lov(slice->cls_obj),
-                                                    lov->lls_sub[0].sub_stripe,
-                                                    need);
+                result = cl_lock_ext_match(&lov->lls_orig, need);
+        else if (lov->lls_nr == 1) {
+                struct cl_lock_descr *got = &lov->lls_sub[0].sub_got;
+                result = lov_lock_stripe_is_matching(env,
+                                                     cl2lov(slice->cls_obj),
+                                                     lov->lls_sub[0].sub_stripe,
+                                                     got, need);
         } else if (io->ci_type != CIT_TRUNC && io->ci_type != CIT_MISC &&
                    !cl_io_is_append(io) && need->cld_mode != CLM_PHANTOM)
                 /*
@@ -838,16 +958,18 @@ static void lov_lock_delete(const struct lu_env *env,
         ENTRY;
 
         for (i = 0; i < lck->lls_nr; ++i) {
-                struct lovsub_lock *lsl;
-                struct cl_lock *sublock;
+                struct lov_lock_sub *lls;
+                struct lovsub_lock  *lsl;
+                struct cl_lock      *sublock;
                 int rc;
 
-                lsl = lck->lls_sub[i].sub_lock;
+                lls = &lck->lls_sub[i];
+                lsl = lls->sub_lock;
                 if (lsl == NULL)
                         continue;
 
                 sublock = lsl->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, lsl, closure);
+                rc = lov_sublock_lock(env, lls, closure, NULL);
                 if (rc == 0) {
                         if (lck->lls_sub[i].sub_flags & LSF_HELD)
                                 lov_sublock_release(env, lck, i, 1, 0);
@@ -859,7 +981,7 @@ static void lov_lock_delete(const struct lu_env *env,
                                 lov_lock_unlink(env, link, lsl);
                                 LASSERT(lck->lls_sub[i].sub_lock == NULL);
                         }
-                        lov_sublock_unlock(env, lsl, closure);
+                        lov_sublock_unlock(env, lsl, closure, NULL);
                 } else if (rc == CLO_REPEAT) {
                         --i; /* repeat with this lock */
                 } else {
@@ -911,7 +1033,7 @@ int lov_lock_init_raid0(const struct lu_env *env, struct cl_object *obj,
         int result;
 
         ENTRY;
-        OBD_SLAB_ALLOC_PTR(lck, lov_lock_kmem);
+        OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, CFS_ALLOC_IO);
         if (lck != NULL) {
                 cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_lock_ops);
                 result = lov_lock_sub_init(env, lck, io);
@@ -926,7 +1048,7 @@ static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
         struct cl_lock_closure *closure;
 
         closure = &lov_env_info(env)->lti_closure;
-        LINVRNT(list_empty(&closure->clc_list));
+        LASSERT(list_empty(&closure->clc_list));
         cl_lock_closure_init(env, closure, parent, 1);
         return closure;
 }