Whamcloud - gitweb
b=17633
[fs/lustre-release.git] / lustre / lov / lov_lock.c
index 14ecd68..905b6cc 100644 (file)
@@ -53,6 +53,50 @@ static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
  *
  */
 
+static struct lov_sublock_env *lov_sublock_env_get(const struct lu_env *env,
+                                                   struct cl_lock *parent,
+                                                   struct lov_lock_sub *lls)
+{
+        struct lov_sublock_env *subenv;
+        struct lov_io          *lio    = lov_env_io(env);
+        struct cl_io           *io     = lio->lis_cl.cis_io;
+        struct lov_io_sub      *sub;
+
+        subenv = &lov_env_session(env)->ls_subenv;
+
+        /*
+         * FIXME: We tend to use the subio's env & io to call the sublock
+         * lock operations because osc lock sometimes stores some control
+         * variables in thread's IO infomation(Now only lockless information).
+         * However, if the lock's host(object) is different from the object
+         * for current IO, we have no way to get the subenv and subio because
+         * they are not initialized at all. As a temp fix, in this case,
+         * we still borrow the parent's env to call sublock operations.
+         */
+        if (!cl_object_same(io->ci_obj, parent->cll_descr.cld_obj)) {
+                subenv->lse_env = env;
+                subenv->lse_io  = io;
+                subenv->lse_sub = NULL;
+        } else {
+                LASSERT(io != NULL);
+                sub = lov_sub_get(env, lio, lls->sub_stripe);
+                if (!IS_ERR(sub)) {
+                        subenv->lse_env = sub->sub_env;
+                        subenv->lse_io  = sub->sub_io;
+                        subenv->lse_sub = sub;
+                } else {
+                        subenv = (void*)sub;
+                }
+        }
+        return subenv;
+}
+
+static void lov_sublock_env_put(struct lov_sublock_env *subenv)
+{
+        if (subenv && subenv->lse_sub)
+                lov_sub_put(subenv->lse_sub);
+}
+
 static void lov_sublock_adopt(const struct lu_env *env, struct lov_lock *lck,
                               struct cl_lock *sublock, int idx,
                               struct lov_lock_link *link)
@@ -102,15 +146,30 @@ static struct cl_lock *lov_sublock_alloc(const struct lu_env *env,
 
         OBD_SLAB_ALLOC_PTR(link, lov_lock_link_kmem);
         if (link != NULL) {
-                struct lov_lock_sub  *sub;
+                struct lov_sublock_env *subenv;
+                struct lov_lock_sub  *lls;
                 struct cl_lock_descr *descr;
 
                 parent = lck->lls_cl.cls_lock;
-                sub    = &lck->lls_sub[idx];
-                descr  = &sub->sub_descr;
+                lls    = &lck->lls_sub[idx];
+                descr  = &lls->sub_descr;
+
+                subenv = lov_sublock_env_get(env, parent, lls);
+                if (!IS_ERR(subenv)) {
+                        /* CAVEAT: Don't try to add a field in lov_lock_sub
+                         * to remember the subio. This is because lock is able
+                         * to be cached, but this is not true for IO. This
+                         * further means a sublock might be referenced in
+                         * different io context. -jay */
+
+                        sublock = cl_lock_hold(subenv->lse_env, subenv->lse_io,
+                                               descr, "lov-parent", parent);
+                        lov_sublock_env_put(subenv);
+                } else {
+                        /* error occurs. */
+                        sublock = (void*)subenv;
+                }
 
-                /* XXX maybe sub-io? */
-                sublock = cl_lock_hold(env, io, descr, "lov-parent", parent);
                 if (!IS_ERR(sublock))
                         *out = link;
                 else
@@ -122,28 +181,46 @@ static struct cl_lock *lov_sublock_alloc(const struct lu_env *env,
 
 static void lov_sublock_unlock(const struct lu_env *env,
                                struct lovsub_lock *lsl,
-                               struct cl_lock_closure *closure)
+                               struct cl_lock_closure *closure,
+                               struct lov_sublock_env *subenv)
 {
         ENTRY;
+        lov_sublock_env_put(subenv);
         lsl->lss_active = NULL;
         cl_lock_disclosure(env, closure);
         EXIT;
 }
 
-static int lov_sublock_lock(const struct lu_env *env, struct lovsub_lock *lsl,
-                            struct cl_lock_closure *closure)
+static int lov_sublock_lock(const struct lu_env *env,
+                            struct lov_lock_sub *lls,
+                            struct cl_lock_closure *closure,
+                            struct lov_sublock_env **lsep)
 {
         struct cl_lock *child;
-        int             result;
+        int             result = 0;
+        ENTRY;
 
         LASSERT(list_empty(&closure->clc_list));
 
-        ENTRY;
-        child = lsl->lss_cl.cls_lock;
+        child = lls->sub_lock->lss_cl.cls_lock;
         result = cl_lock_closure_build(env, child, closure);
         if (result == 0) {
+                struct cl_lock *parent = closure->clc_origin;
+
                 LASSERT(cl_lock_is_mutexed(child));
-                lsl->lss_active = closure->clc_origin;
+                lls->sub_lock->lss_active = parent;
+
+                if (lsep) {
+                        struct lov_sublock_env *subenv;
+                        subenv = lov_sublock_env_get(env, parent, lls);
+                        if (IS_ERR(subenv)) {
+                                lov_sublock_unlock(env, lls->sub_lock,
+                                                   closure, NULL);
+                                result = PTR_ERR(subenv);
+                        } else {
+                                *lsep = subenv;
+                        }
+                }
         }
         RETURN(result);
 }
@@ -308,7 +385,7 @@ static int lov_sublock_release(const struct lu_env *env, struct lov_lock *lck,
         ENTRY;
 
         if (lck->lls_sub[i].sub_flags & LSF_HELD) {
-                struct cl_lock *sublock;
+                struct cl_lock    *sublock;
                 int dying;
 
                 LASSERT(lck->lls_sub[i].sub_lock != NULL);
@@ -404,8 +481,8 @@ static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck,
                                 struct cl_io *io, __u32 enqflags, int last)
 {
         int result;
-
         ENTRY;
+
         /* first, try to enqueue a sub-lock ... */
         result = cl_enqueue_try(env, sublock, io, enqflags);
         if (sublock->cll_state == CLS_ENQUEUED)
@@ -480,8 +557,10 @@ static int lov_lock_enqueue(const struct lu_env *env,
 
         for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
                 int rc;
-                struct lovsub_lock *sub;
-                struct cl_lock *sublock;
+                struct lovsub_lock     *sub;
+                struct lov_lock_sub    *lls;
+                struct cl_lock         *sublock;
+                struct lov_sublock_env *subenv;
 
                 if (lock->cll_state != CLS_QUEUING) {
                         /*
@@ -493,7 +572,8 @@ static int lov_lock_enqueue(const struct lu_env *env,
                         break;
                 }
 
-                sub = lck->lls_sub[i].sub_lock;
+                lls = &lck->lls_sub[i];
+                sub = lls->sub_lock;
                 /*
                  * Sub-lock might have been canceled, while top-lock was
                  * cached.
@@ -505,11 +585,11 @@ static int lov_lock_enqueue(const struct lu_env *env,
                         break;
                 }
                 sublock = sub->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, sub, closure);
+                rc = lov_sublock_lock(env, lls, closure, &subenv);
                 if (rc == 0) {
                         lov_sublock_hold(env, lck, i);
-                        rc = lov_lock_enqueue_one(env, lck, sublock, io,
-                                                  enqflags,
+                        rc = lov_lock_enqueue_one(subenv->lse_env, lck, sublock,
+                                                  subenv->lse_io, enqflags,
                                                   i == lck->lls_nr - 1);
                         minstate = min(minstate, sublock->cll_state);
                         /*
@@ -518,7 +598,7 @@ static int lov_lock_enqueue(const struct lu_env *env,
                          */
                         if (sublock->cll_state > CLS_HELD)
                                 rc = lov_sublock_release(env, lck, i, 1, rc);
-                        lov_sublock_unlock(env, sub, closure);
+                        lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
                 if (result < 0)
@@ -540,28 +620,31 @@ static int lov_lock_unuse(const struct lu_env *env,
 
         for (result = 0, i = 0; i < lck->lls_nr; ++i) {
                 int rc;
-                struct lovsub_lock *sub;
-                struct cl_lock *sublock;
+                struct lovsub_lock     *sub;
+                struct cl_lock         *sublock;
+                struct lov_lock_sub    *lls;
+                struct lov_sublock_env *subenv;
 
                 /* top-lock state cannot change concurrently, because single
                  * thread (one that released the last hold) carries unlocking
                  * to the completion. */
                 LASSERT(slice->cls_lock->cll_state == CLS_UNLOCKING);
-                sub = lck->lls_sub[i].sub_lock;
+                lls = &lck->lls_sub[i];
+                sub = lls->sub_lock;
                 if (sub == NULL)
                         continue;
 
                 sublock = sub->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, sub, closure);
+                rc = lov_sublock_lock(env, lls, closure, &subenv);
                 if (rc == 0) {
                         if (lck->lls_sub[i].sub_flags & LSF_HELD) {
                                 LASSERT(sublock->cll_state == CLS_HELD);
-                                rc = cl_unuse_try(env, sublock);
+                                rc = cl_unuse_try(subenv->lse_env, sublock);
                                 if (rc != CLO_WAIT)
                                         rc = lov_sublock_release(env, lck,
                                                                  i, 0, rc);
                         }
-                        lov_sublock_unlock(env, sub, closure);
+                        lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
                 if (result < 0)
@@ -588,19 +671,23 @@ static int lov_lock_wait(const struct lu_env *env,
 
         for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) {
                 int rc;
-                struct lovsub_lock *sub;
-                struct cl_lock *sublock;
+                struct lovsub_lock     *sub;
+                struct cl_lock         *sublock;
+                struct lov_lock_sub    *lls;
+                struct lov_sublock_env *subenv;
 
-                sub = lck->lls_sub[i].sub_lock;
+                lls = &lck->lls_sub[i];
+                sub = lls->sub_lock;
                 LASSERT(sub != NULL);
                 sublock = sub->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, sub, closure);
+                rc = lov_sublock_lock(env, lls, closure, &subenv);
                 if (rc == 0) {
                         LASSERT(sublock->cll_state >= CLS_ENQUEUED);
                         if (sublock->cll_state < CLS_HELD)
                                 rc = cl_wait_try(env, sublock);
+
                         minstate = min(minstate, sublock->cll_state);
-                        lov_sublock_unlock(env, sub, closure);
+                        lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
                 if (result < 0)
@@ -623,8 +710,10 @@ static int lov_lock_use(const struct lu_env *env,
 
         for (result = 0, i = 0; i < lck->lls_nr; ++i) {
                 int rc;
-                struct lovsub_lock *sub;
-                struct cl_lock *sublock;
+                struct lovsub_lock     *sub;
+                struct cl_lock         *sublock;
+                struct lov_lock_sub    *lls;
+                struct lov_sublock_env *subenv;
 
                 if (slice->cls_lock->cll_state != CLS_CACHED) {
                         /* see comment in lov_lock_enqueue(). */
@@ -636,21 +725,22 @@ static int lov_lock_use(const struct lu_env *env,
                  * CLS_CACHED state, top-lock would have been moved into
                  * CLS_NEW state, so all sub-locks have to be in place.
                  */
-                sub = lck->lls_sub[i].sub_lock;
+                lls = &lck->lls_sub[i];
+                sub = lls->sub_lock;
                 LASSERT(sub != NULL);
                 sublock = sub->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, sub, closure);
+                rc = lov_sublock_lock(env, lls, closure, &subenv);
                 if (rc == 0) {
                         LASSERT(sublock->cll_state != CLS_FREEING);
                         lov_sublock_hold(env, lck, i);
                         if (sublock->cll_state == CLS_CACHED) {
-                                rc = cl_use_try(env, sublock);
+                                rc = cl_use_try(subenv->lse_env, sublock);
                                 if (rc != 0)
                                         rc = lov_sublock_release(env, lck,
                                                                  i, 1, rc);
                         } else
                                 rc = 0;
-                        lov_sublock_unlock(env, sub, closure);
+                        lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
                 if (result < 0)
@@ -838,16 +928,18 @@ static void lov_lock_delete(const struct lu_env *env,
         ENTRY;
 
         for (i = 0; i < lck->lls_nr; ++i) {
-                struct lovsub_lock *lsl;
-                struct cl_lock *sublock;
+                struct lov_lock_sub *lls;
+                struct lovsub_lock  *lsl;
+                struct cl_lock      *sublock;
                 int rc;
 
-                lsl = lck->lls_sub[i].sub_lock;
+                lls = &lck->lls_sub[i];
+                lsl = lls->sub_lock;
                 if (lsl == NULL)
                         continue;
 
                 sublock = lsl->lss_cl.cls_lock;
-                rc = lov_sublock_lock(env, lsl, closure);
+                rc = lov_sublock_lock(env, lls, closure, NULL);
                 if (rc == 0) {
                         if (lck->lls_sub[i].sub_flags & LSF_HELD)
                                 lov_sublock_release(env, lck, i, 1, 0);
@@ -859,7 +951,7 @@ static void lov_lock_delete(const struct lu_env *env,
                                 lov_lock_unlink(env, link, lsl);
                                 LASSERT(lck->lls_sub[i].sub_lock == NULL);
                         }
-                        lov_sublock_unlock(env, lsl, closure);
+                        lov_sublock_unlock(env, lsl, closure, NULL);
                 } else if (rc == CLO_REPEAT) {
                         --i; /* repeat with this lock */
                 } else {