b=20824
author    jxiong <jxiong>    Tue, 20 Oct 2009 09:20:45 +0000 (09:20 +0000)
committer jxiong <jxiong>    Tue, 20 Oct 2009 09:20:45 +0000 (09:20 +0000)
r=wangdi,eric.mei

Reimplement cl_lock_peek().

lustre/include/cl_object.h
lustre/lov/lov_cl_internal.h
lustre/lov/lov_lock.c
lustre/lov/lovsub_lock.c
lustre/obdclass/cl_lock.c
lustre/obdclass/cl_object.c
lustre/osc/osc_lock.c

diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h
index 9fd2d88..7536f80 100644
@@ -1337,15 +1337,15 @@ const char *cl_lock_mode_name(const enum cl_lock_mode mode);
  *              |  |                 V
  *              |  |                HELD<---------+
  *              |  |                 |            |
- *              |  |                 |            |
+ *              |  |                 |            | cl_use_try()
  *              |  |  cl_unuse_try() |            |
  *              |  |                 |            |
- *              |  |                 V            | cached
- *              |  +------------>UNLOCKING (*)    | lock found
- *              |                    |            |
- *              |     cl_unuse_try() |            |
+ *              |  |                 V         ---+ 
+ *              |  +------------>INTRANSIT (D) <--+
  *              |                    |            |
+ *              |     cl_unuse_try() |            | cached lock found
  *              |                    |            | cl_use_try()
+ *              |                    |            |
  *              |                    V            |
  *              +------------------CACHED---------+
  *                                   |
@@ -1364,6 +1364,8 @@ const char *cl_lock_mode_name(const enum cl_lock_mode mode);
  *
  *         (C) is the point where Cancellation call-back is invoked.
  *
+ *         (D) is the transit state: the lock's state is being changed.
+ *
  *         Transition to FREEING state is possible from any other state in the
  *         diagram in case of unrecoverable error.
  * </pre>
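
The (D) transit state above is entered and left through the helper pair added later in this patch, cl_lock_intransit()/cl_lock_extransit(). A minimal sketch, not part of the patch, of how a state-machine step is expected to bracket its work with them; layer_work() is just a placeholder for the per-layer calls:

    static int example_use_step(const struct lu_env *env, struct cl_lock *lock)
    {
            enum cl_lock_state old;
            int result;

            cl_lock_mutex_get(env, lock);       /* both helpers require the mutex */
            old = cl_lock_intransit(env, lock); /* ENQUEUED/HELD/CACHED -> INTRANSIT */

            result = layer_work(env, lock);     /* placeholder for per-layer work */

            /* Leave transit: advance to HELD on success, fall back otherwise. */
            cl_lock_extransit(env, lock, result == 0 ? CLS_HELD : old);
            cl_lock_mutex_put(env, lock);
            return result;
    }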
@@ -1382,9 +1384,6 @@ const char *cl_lock_mode_name(const enum cl_lock_mode mode);
  * handled, and is in ENQUEUED state after enqueue to S2 has been sent (note
  * that in this case, sub-locks move from state to state, and top-lock remains
  * in the same state).
- *
- * Separate UNLOCKING state is needed to maintain an invariant that in HELD
- * state lock is immediately ready for use.
  */
 enum cl_lock_state {
         /**
@@ -1406,10 +1405,16 @@ enum cl_lock_state {
          */
         CLS_HELD,
         /**
-         * Lock is in the transition from CLS_HELD to CLS_CACHED. Lock is in
-         * this state only while cl_unuse() is executing against it.
+         * This state marks that the lock is being used, or unused.
+         * It is needed because the lock may have several sub-locks,
+         * so there is no atomic way to bring all sub-locks into
+         * CLS_HELD on use, or all sub-locks into CLS_CACHED on
+         * unuse.
+         * A thread that refers to a lock and sees it in this state
+         * must wait for the lock to leave this state.
+         * See the state diagram for details.
          */
-        CLS_UNLOCKING,
+        CLS_INTRANSIT,
         /**
          * Lock granted, not used.
          */
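
A thread that finds a lock in CLS_INTRANSIT backs off and waits, as the reworked cl_lock_peek() and cl_wait_try() below do. A sketch of that pattern, assuming the caller already holds the lock mutex (cl_lock_is_intransit() asserts it):

    /* Illustrative only: wait out a transit owned by another thread.
     * cl_lock_state_wait() sleeps until the lock state changes and
     * returns a negative value (e.g. -EINTR) if interrupted. */
    static int example_wait_out_transit(const struct lu_env *env,
                                        struct cl_lock *lock)
    {
            int rc = 0;

            while (rc == 0 && cl_lock_is_intransit(lock))
                    rc = cl_lock_state_wait(env, lock);
            return rc;
    }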
@@ -1430,9 +1435,7 @@ enum cl_lock_flags {
         /** cancellation is pending for this lock. */
         CLF_CANCELPEND = 1 << 1,
         /** destruction is pending for this lock. */
-        CLF_DOOMED     = 1 << 2,
-        /** State update is pending. */
-        CLF_STATE      = 1 << 3
+        CLF_DOOMED     = 1 << 2
 };
 
 /**
@@ -1530,6 +1533,10 @@ struct cl_lock {
         cfs_task_t           *cll_guarder;
         int                   cll_depth;
 
+        /**
+         * The task that owns the lock while it is in CLS_INTRANSIT state.
+         */
+        cfs_task_t           *cll_intransit_owner;
         int                   cll_error;
         /**
          * Number of holds on a lock. A hold prevents a lock from being
@@ -2779,6 +2786,14 @@ int   cl_lock_user_del  (const struct lu_env *env, struct cl_lock *lock);
 int   cl_lock_compatible(const struct cl_lock *lock1,
                          const struct cl_lock *lock2);
 
+enum cl_lock_state cl_lock_intransit(const struct lu_env *env,
+                                     struct cl_lock *lock);
+
+void cl_lock_extransit(const struct lu_env *env, struct cl_lock *lock,
+                       enum cl_lock_state state);
+
+int cl_lock_is_intransit(struct cl_lock *lock);
+
 /** \name statemachine statemachine
  * Interface to lock state machine consists of 3 parts:
  *
@@ -2819,7 +2834,7 @@ int   cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock,
                      struct cl_io *io, __u32 flags);
 int   cl_unuse_try  (const struct lu_env *env, struct cl_lock *lock);
 int   cl_wait_try   (const struct lu_env *env, struct cl_lock *lock);
-int   cl_use_try    (const struct lu_env *env, struct cl_lock *lock);
+int   cl_use_try    (const struct lu_env *env, struct cl_lock *lock, int atomic);
 /** @} statemachine */
 
 void cl_lock_signal      (const struct lu_env *env, struct cl_lock *lock);
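
With the prototype change above, every cl_use_try() caller now says whether a failed use must be rolled back. The two call styles, sketched as this patch itself uses them:

    /* Inside the enqueue state machine (atomic == 0): a partially completed
     * use is acceptable, the surrounding loop retries on CLO_REPEAT/CLO_WAIT. */
    rc = cl_use_try(env, lock, 0);

    /* In the reworked cl_lock_peek() (atomic == 1): on failure the lock is
     * unused again internally, so other threads never see a half-used lock. */
    rc = cl_use_try(env, lock, 1);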
diff --git a/lustre/lov/lov_cl_internal.h b/lustre/lov/lov_cl_internal.h
index 5401f6b..dbc63dc 100644
@@ -283,9 +283,9 @@ struct lov_lock {
         unsigned               lls_nr_filled;
         /**
          * Set when sub-lock was canceled, while top-lock was being
-         * unlocked.
+         * used or unused.
          */
-        int                    lls_unuse_race;
+        int                    lls_cancel_race:1;
         /**
          * An array of sub-locks
          *
diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c
index e2b1520..bc6ab44 100644
@@ -49,6 +49,8 @@
 static struct cl_lock_closure *lov_closure_get(const struct lu_env *env,
                                                struct cl_lock *parent);
 
+static int lov_lock_unuse(const struct lu_env *env,
+                          const struct cl_lock_slice *slice);
 /*****************************************************************************
  *
  * Lov lock operations.
@@ -226,6 +228,7 @@ static int lov_sublock_lock(const struct lu_env *env,
                         LASSERT(link != NULL);
                         lov_lock_unlink(env, link, sublock);
                         lov_sublock_unlock(env, sublock, closure, NULL);
+                        lck->lls_cancel_race = 1;
                         result = CLO_REPEAT;
                 } else if (lsep) {
                         struct lov_sublock_env *subenv;
@@ -644,7 +647,7 @@ static int lov_lock_unuse(const struct lu_env *env,
                 /* top-lock state cannot change concurrently, because single
                  * thread (one that released the last hold) carries unlocking
                  * to the completion. */
-                LASSERT(slice->cls_lock->cll_state == CLS_UNLOCKING);
+                LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
                 lls = &lck->lls_sub[i];
                 sub = lls->sub_lock;
                 if (sub == NULL)
@@ -653,7 +656,7 @@ static int lov_lock_unuse(const struct lu_env *env,
                 sublock = sub->lss_cl.cls_lock;
                 rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
                 if (rc == 0) {
-                        if (lck->lls_sub[i].sub_flags & LSF_HELD) {
+                        if (lls->sub_flags & LSF_HELD) {
                                 LASSERT(sublock->cll_state == CLS_HELD);
                                 rc = cl_unuse_try(subenv->lse_env, sublock);
                                 if (rc != CLO_WAIT)
@@ -666,8 +669,9 @@ static int lov_lock_unuse(const struct lu_env *env,
                 if (result < 0)
                         break;
         }
-        if (result == 0 && lck->lls_unuse_race) {
-                lck->lls_unuse_race = 0;
+
+        if (result == 0 && lck->lls_cancel_race) {
+                lck->lls_cancel_race = 0;
                 result = -ESTALE;
         }
         cl_lock_closure_fini(closure);
@@ -721,7 +725,7 @@ static int lov_lock_use(const struct lu_env *env,
         int                     result;
         int                     i;
 
-        LASSERT(slice->cls_lock->cll_state == CLS_CACHED);
+        LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
         ENTRY;
 
         for (result = 0, i = 0; i < lck->lls_nr; ++i) {
@@ -731,37 +735,48 @@ static int lov_lock_use(const struct lu_env *env,
                 struct lov_lock_sub    *lls;
                 struct lov_sublock_env *subenv;
 
-                if (slice->cls_lock->cll_state != CLS_CACHED) {
-                        /* see comment in lov_lock_enqueue(). */
-                        LASSERT(i > 0 && result != 0);
-                        break;
-                }
-                /*
-                 * if a sub-lock was destroyed while top-lock was in
-                 * CLS_CACHED state, top-lock would have been moved into
-                 * CLS_NEW state, so all sub-locks have to be in place.
-                 */
+                LASSERT(slice->cls_lock->cll_state == CLS_INTRANSIT);
+
                 lls = &lck->lls_sub[i];
                 sub = lls->sub_lock;
-                LASSERT(sub != NULL);
+                if (sub == NULL) {
+                        /*
+                         * Sub-lock might have been canceled while the
+                         * top-lock was cached.
+                         */
+                        result = -ESTALE;
+                        break;
+                }
+
                 sublock = sub->lss_cl.cls_lock;
                 rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
                 if (rc == 0) {
                         LASSERT(sublock->cll_state != CLS_FREEING);
                         lov_sublock_hold(env, lck, i);
                         if (sublock->cll_state == CLS_CACHED) {
-                                rc = cl_use_try(subenv->lse_env, sublock);
+                                rc = cl_use_try(subenv->lse_env, sublock, 0);
                                 if (rc != 0)
                                         rc = lov_sublock_release(env, lck,
                                                                  i, 1, rc);
-                        } else
-                                rc = 0;
+                        }
                         lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
                 if (result != 0)
                         break;
         }
+
+        if (lck->lls_cancel_race) {
+                /*
+                 * If an unlock raced with this use, the sub-lock state
+                 * should be FREEING and lov_sublock_lock() should have
+                 * returned CLO_REPEAT. In that case return -ESTALE so
+                 * that the upper layer resets the lock state to NEW.
+                 */
+                lck->lls_cancel_race = 0;
+                LASSERT(result != 0);
+                result = -ESTALE;
+        }
         cl_lock_closure_fini(closure);
         RETURN(result);
 }
@@ -984,7 +999,7 @@ static void lov_lock_delete(const struct lu_env *env,
                 sublock = lsl->lss_cl.cls_lock;
                 rc = lov_sublock_lock(env, lck, lls, closure, NULL);
                 if (rc == 0) {
-                        if (lck->lls_sub[i].sub_flags & LSF_HELD)
+                        if (lls->sub_flags & LSF_HELD)
                                 lov_sublock_release(env, lck, i, 1, 0);
                         if (sublock->cll_state < CLS_FREEING) {
                                 struct lov_lock_link *link;
diff --git a/lustre/lov/lovsub_lock.c b/lustre/lov/lovsub_lock.c
index c97cb35..e4ff065 100644
@@ -344,7 +344,7 @@ static int lovsub_lock_delete_one(const struct lu_env *env,
         case CLS_FREEING:
                 cl_lock_signal(env, parent);
                 break;
-        case CLS_UNLOCKING:
+        case CLS_INTRANSIT:
                 /*
                  * Here lies a problem: a sub-lock is canceled while top-lock
                  * is being unlocked. Top-lock cannot be moved into CLS_NEW
@@ -356,13 +356,14 @@ static int lovsub_lock_delete_one(const struct lu_env *env,
                  * to be reused immediately). Nor can we wait for top-lock
                  * state to change, because this can be synchronous to the
                  * current thread.
-                         *
+                 *
                  * We know for sure that lov_lock_unuse() will be called at
                  * least one more time to finish un-using, so leave a mark on
                  * the top-lock, that will be seen by the next call to
                  * lov_lock_unuse().
                  */
-                lov->lls_unuse_race = 1;
+                if (cl_lock_is_intransit(parent))
+                        lov->lls_cancel_race = 1;
                 break;
         case CLS_CACHED:
                 /*
diff --git a/lustre/obdclass/cl_lock.c b/lustre/obdclass/cl_lock.c
index 5316965..5b9ca93 100644
@@ -399,6 +399,57 @@ static struct cl_lock *cl_lock_alloc(const struct lu_env *env,
 }
 
 /**
+ * Transfer the lock into INTRANSIT state and return the original state.
+ *
+ * \pre  state: CLS_CACHED, CLS_HELD or CLS_ENQUEUED
+ * \post state: CLS_INTRANSIT
+ * \see CLS_INTRANSIT
+ */
+enum cl_lock_state cl_lock_intransit(const struct lu_env *env,
+                                     struct cl_lock *lock)
+{
+        enum cl_lock_state state = lock->cll_state;
+
+        LASSERT(cl_lock_is_mutexed(lock));
+        LASSERT(state != CLS_INTRANSIT);
+        LASSERTF(state >= CLS_ENQUEUED && state <= CLS_CACHED,
+                 "Malformed lock state %d.\n", state);
+
+        cl_lock_state_set(env, lock, CLS_INTRANSIT);
+        lock->cll_intransit_owner = cfs_current();
+        cl_lock_hold_add(env, lock, "intransit", cfs_current());
+        return state;
+}
+EXPORT_SYMBOL(cl_lock_intransit);
+
+/**
+ * Exit the INTRANSIT state and move the lock to the given state.
+ */
+void cl_lock_extransit(const struct lu_env *env, struct cl_lock *lock,
+                       enum cl_lock_state state)
+{
+        LASSERT(cl_lock_is_mutexed(lock));
+        LASSERT(lock->cll_state == CLS_INTRANSIT);
+        LASSERT(state != CLS_INTRANSIT);
+        LASSERT(lock->cll_intransit_owner == cfs_current());
+
+        lock->cll_intransit_owner = NULL;
+        cl_lock_state_set(env, lock, state);
+        cl_lock_unhold(env, lock, "intransit", cfs_current());
+}
+EXPORT_SYMBOL(cl_lock_extransit);
+
+/**
+ * Checks whether the lock is held in the INTRANSIT state by another thread.
+ */
+int cl_lock_is_intransit(struct cl_lock *lock)
+{
+        LASSERT(cl_lock_is_mutexed(lock));
+        return lock->cll_state == CLS_INTRANSIT &&
+               lock->cll_intransit_owner != cfs_current();
+}
+EXPORT_SYMBOL(cl_lock_is_intransit);
+/**
  * Returns true iff lock is "suitable" for given io. E.g., locks acquired by
  * truncate and O_APPEND cannot be reused for read/non-append-write, as they
  * cover multiple stripes and can trigger cascading timeouts.
@@ -524,6 +575,7 @@ struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io,
         struct cl_object_header *head;
         struct cl_object        *obj;
         struct cl_lock          *lock;
+        int ok;
 
         obj  = need->cld_obj;
         head = cl_object_header(obj);
@@ -532,24 +584,30 @@ struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io,
         lock = cl_lock_lookup(env, obj, io, need);
         spin_unlock(&head->coh_lock_guard);
 
-        if (lock != NULL) {
-                int ok;
+        if (lock == NULL)
+                return NULL;
 
-                cl_lock_mutex_get(env, lock);
-                if (lock->cll_state == CLS_CACHED)
-                        cl_use_try(env, lock);
-                ok = lock->cll_state == CLS_HELD;
-                if (ok) {
-                        cl_lock_hold_add(env, lock, scope, source);
-                        cl_lock_user_add(env, lock);
-                        cl_lock_put(env, lock);
-                }
-                cl_lock_mutex_put(env, lock);
-                if (!ok) {
-                        cl_lock_put(env, lock);
-                        lock = NULL;
-                }
+        cl_lock_mutex_get(env, lock);
+        if (lock->cll_state == CLS_INTRANSIT)
+                cl_lock_state_wait(env, lock); /* ignore the return value */
+        if (lock->cll_state == CLS_CACHED) {
+                int result;
+                result = cl_use_try(env, lock, 1);
+                if (result < 0)
+                        cl_lock_error(env, lock, result);
         }
+        ok = lock->cll_state == CLS_HELD;
+        if (ok) {
+                cl_lock_hold_add(env, lock, scope, source);
+                cl_lock_user_add(env, lock);
+                cl_lock_put(env, lock);
+        }
+        cl_lock_mutex_put(env, lock);
+        if (!ok) {
+                cl_lock_put(env, lock);
+                lock = NULL;
+        }
+
         return lock;
 }
 EXPORT_SYMBOL(cl_lock_peek);
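
For reference, a sketch of how a caller might consume the reworked cl_lock_peek(); the scope string, the cookie and the tear-down sequence (cl_unuse() followed by cl_lock_unhold()) are illustrative assumptions, not part of this patch:

    static void example_peek_user(const struct lu_env *env,
                                  const struct cl_io *io,
                                  const struct cl_lock_descr *need,
                                  const void *cookie)
    {
            struct cl_lock *lock;

            lock = cl_lock_peek(env, io, need, "example", cookie);
            if (lock == NULL)
                    return;         /* no usable cached lock */

            /* On success the lock is CLS_HELD and cl_lock_peek() has already
             * taken a hold and a user reference on it. */

            /* ... do the work covered by the lock ... */

            /* Assumed tear-down: drop the user, then the hold. */
            cl_unuse(env, lock);
            cl_lock_unhold(env, lock, "example", cookie);
    }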
@@ -666,7 +724,7 @@ int cl_lock_mutex_try(const struct lu_env *env, struct cl_lock *lock)
 EXPORT_SYMBOL(cl_lock_mutex_try);
 
 /**
- * Unlocks cl_lock object.
+ * Unlocks cl_lock object.
  *
  * \pre cl_lock_is_mutexed(lock)
  *
@@ -885,7 +943,7 @@ int cl_lock_state_wait(const struct lu_env *env, struct cl_lock *lock)
         LASSERT(lock->cll_state != CLS_FREEING); /* too late to wait */
 
         result = lock->cll_error;
-        if (result == 0 && !(lock->cll_flags & CLF_STATE)) {
+        if (result == 0) {
                 cfs_waitlink_init(&waiter);
                 cfs_waitq_add(&lock->cll_wq, &waiter);
                 set_current_state(CFS_TASK_INTERRUPTIBLE);
@@ -899,7 +957,6 @@ int cl_lock_state_wait(const struct lu_env *env, struct cl_lock *lock)
                 cfs_waitq_del(&lock->cll_wq, &waiter);
                 result = cfs_signal_pending() ? -EINTR : 0;
         }
-        lock->cll_flags &= ~CLF_STATE;
         RETURN(result);
 }
 EXPORT_SYMBOL(cl_lock_state_wait);
@@ -916,7 +973,6 @@ static void cl_lock_state_signal(const struct lu_env *env, struct cl_lock *lock,
         list_for_each_entry(slice, &lock->cll_layers, cls_linkage)
                 if (slice->cls_ops->clo_state != NULL)
                         slice->cls_ops->clo_state(env, slice, state);
-        lock->cll_flags |= CLF_STATE;
         cfs_waitq_broadcast(&lock->cll_wq);
         EXIT;
 }
@@ -955,9 +1011,10 @@ void cl_lock_state_set(const struct lu_env *env, struct cl_lock *lock,
         LASSERT(lock->cll_state <= state ||
                 (lock->cll_state == CLS_CACHED &&
                  (state == CLS_HELD || /* lock found in cache */
-                  state == CLS_NEW     /* sub-lock canceled */)) ||
-                /* sub-lock canceled during unlocking */
-                (lock->cll_state == CLS_UNLOCKING && state == CLS_NEW));
+                  state == CLS_NEW  ||   /* sub-lock canceled */
+                  state == CLS_INTRANSIT)) ||
+                /* lock is in transit state */
+                lock->cll_state == CLS_INTRANSIT);
 
         if (lock->cll_state != state) {
                 atomic_dec(&site->cs_locks_state[lock->cll_state]);
@@ -970,17 +1027,54 @@ void cl_lock_state_set(const struct lu_env *env, struct cl_lock *lock,
 }
 EXPORT_SYMBOL(cl_lock_state_set);
 
+static int cl_unuse_try_internal(const struct lu_env *env, struct cl_lock *lock)
+{
+        const struct cl_lock_slice *slice;
+        int result;
+
+        do {
+                result = 0;
+
+                if (lock->cll_error != 0)
+                        break;
+
+                LINVRNT(cl_lock_is_mutexed(lock));
+                LINVRNT(cl_lock_invariant(env, lock));
+                LASSERT(lock->cll_state == CLS_INTRANSIT);
+                LASSERT(lock->cll_users > 0);
+                LASSERT(lock->cll_holds > 0);
+
+                result = -ENOSYS;
+                list_for_each_entry_reverse(slice, &lock->cll_layers,
+                                            cls_linkage) {
+                        if (slice->cls_ops->clo_unuse != NULL) {
+                                result = slice->cls_ops->clo_unuse(env, slice);
+                                if (result != 0)
+                                        break;
+                        }
+                }
+                LASSERT(result != -ENOSYS);
+        } while (result == CLO_REPEAT);
+
+        return result ?: lock->cll_error;
+}
+
 /**
  * Yanks lock from the cache (cl_lock_state::CLS_CACHED state) by calling
  * cl_lock_operations::clo_use() top-to-bottom to notify layers.
+ * If @atomic is set and the use fails, the lock is unused again so that
+ * the whole use attempt stays atomic.
  */
-int cl_use_try(const struct lu_env *env, struct cl_lock *lock)
+int cl_use_try(const struct lu_env *env, struct cl_lock *lock, int atomic)
 {
-        int result;
         const struct cl_lock_slice *slice;
+        int result;
+        enum cl_lock_state state;
 
         ENTRY;
         result = -ENOSYS;
+
+        state = cl_lock_intransit(env, lock);
         list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
                 if (slice->cls_ops->clo_use != NULL) {
                         result = slice->cls_ops->clo_use(env, slice);
@@ -989,8 +1083,43 @@ int cl_use_try(const struct lu_env *env, struct cl_lock *lock)
                 }
         }
         LASSERT(result != -ENOSYS);
-        if (result == 0)
-                cl_lock_state_set(env, lock, CLS_HELD);
+
+        LASSERT(lock->cll_state == CLS_INTRANSIT);
+
+        if (result == 0) {
+                state = CLS_HELD;
+        } else {
+                if (result == -ESTALE) {
+                        /*
+                         * -ESTALE means a sub-lock was cancelled
+                         * in the meantime; set the lock state to
+                         * NEW and ask the caller to repeat.
+                         */
+                        state = CLS_NEW;
+                        result = CLO_REPEAT;
+                }
+
+                /* @atomic means back-off-on-failure. */
+                if (atomic) {
+                        int rc;
+
+                        do {
+                                rc = cl_unuse_try_internal(env, lock);
+                                if (rc == 0)
+                                        break;
+                                if (rc == CLO_WAIT)
+                                        rc = cl_lock_state_wait(env, lock);
+                                if (rc < 0)
+                                        break;
+                        } while(1);
+
+                        /* Vet the results. */
+                        if (rc < 0 && result > 0)
+                                result = rc;
+                }
+
+        }
+        cl_lock_extransit(env, lock, state);
         RETURN(result);
 }
 EXPORT_SYMBOL(cl_use_try);
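
Since cl_use_try() can now return CLO_REPEAT (the -ESTALE case above, after the lock was reset to CLS_NEW), callers drive it through the usual *_try() loop. A sketch of that convention, with step() as a placeholder for a *_try()-style call and the lock mutex assumed held:

    static int example_drive(const struct lu_env *env, struct cl_lock *lock,
                             int (*step)(const struct lu_env *, struct cl_lock *))
    {
            int rc;

            while (1) {
                    rc = step(env, lock);
                    if (rc <= 0)
                            break;                  /* done, or an error */
                    if (rc == CLO_WAIT) {
                            rc = cl_lock_state_wait(env, lock);
                            if (rc < 0)
                                    break;          /* interrupted */
                    }
                    /* CLO_REPEAT, or woken up: run the step again. */
            }
            return rc;
    }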
@@ -1056,14 +1185,13 @@ int cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock,
                         if (result == 0)
                                 cl_lock_state_set(env, lock, CLS_ENQUEUED);
                         break;
-                case CLS_UNLOCKING:
-                        /* wait until unlocking finishes, and enqueue lock
-                         * afresh. */
+                case CLS_INTRANSIT:
+                        LASSERT(cl_lock_is_intransit(lock));
                         result = CLO_WAIT;
                         break;
                 case CLS_CACHED:
                         /* yank lock from the cache. */
-                        result = cl_use_try(env, lock);
+                        result = cl_use_try(env, lock, 0);
                         break;
                 case CLS_ENQUEUED:
                 case CLS_HELD:
@@ -1150,7 +1278,7 @@ EXPORT_SYMBOL(cl_enqueue);
  * This function is called repeatedly by cl_unuse() until either lock is
  * unlocked, or error occurs.
  *
- * \pre  lock->cll_state <= CLS_HELD || lock->cll_state == CLS_UNLOCKING
+ * \pre  lock->cll_state <= CLS_HELD || cl_lock_is_intransit(lock)
  *
  * \post ergo(result == 0, lock->cll_state == CLS_CACHED)
  *
@@ -1159,11 +1287,11 @@ EXPORT_SYMBOL(cl_enqueue);
  */
 int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
 {
-        const struct cl_lock_slice *slice;
         int                         result;
+        enum cl_lock_state          state = CLS_NEW;
 
         ENTRY;
-        if (lock->cll_state != CLS_UNLOCKING) {
+        if (lock->cll_state != CLS_INTRANSIT) {
                 if (lock->cll_users > 1) {
                         cl_lock_user_del(env, lock);
                         RETURN(0);
@@ -1174,31 +1302,11 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
                  * CLS_CACHED, is reinitialized to CLS_NEW or fails into
                  * CLS_FREEING.
                  */
-                cl_lock_state_set(env, lock, CLS_UNLOCKING);
+                state = cl_lock_intransit(env, lock);
         }
-        do {
-                result = 0;
-
-                if (lock->cll_error != 0)
-                        break;
-
-                LINVRNT(cl_lock_is_mutexed(lock));
-                LINVRNT(cl_lock_invariant(env, lock));
-                LASSERT(lock->cll_state == CLS_UNLOCKING);
-                LASSERT(lock->cll_users > 0);
-                LASSERT(lock->cll_holds > 0);
 
-                result = -ENOSYS;
-                list_for_each_entry_reverse(slice, &lock->cll_layers,
-                                            cls_linkage) {
-                        if (slice->cls_ops->clo_unuse != NULL) {
-                                result = slice->cls_ops->clo_unuse(env, slice);
-                                if (result != 0)
-                                        break;
-                        }
-                }
-                LASSERT(result != -ENOSYS);
-        } while (result == CLO_REPEAT);
+        result = cl_unuse_try_internal(env, lock);
+        LASSERT(lock->cll_state == CLS_INTRANSIT);
         if (result != CLO_WAIT)
                 /*
                  * Once there is no more need to iterate ->clo_unuse() calls,
@@ -1208,8 +1316,6 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
                  */
                 cl_lock_user_del(env, lock);
         if (result == 0 || result == -ESTALE) {
-                enum cl_lock_state state;
-
                 /*
                  * Return lock back to the cache. This is the only
                  * place where lock is moved into CLS_CACHED state.
@@ -1220,7 +1326,7 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
                  * canceled while unlocking was in progress.
                  */
                 state = result == 0 ? CLS_CACHED : CLS_NEW;
-                cl_lock_state_set(env, lock, state);
+                cl_lock_extransit(env, lock, state);
 
                 /*
                  * Hide -ESTALE error.
@@ -1232,7 +1338,11 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
                  * pages won't be written to OSTs. -jay
                  */
                 result = 0;
+        } else {
+                CWARN("result = %d, this is unlikely!\n", result);
+                cl_lock_extransit(env, lock, state);
         }
+
         result = result ?: lock->cll_error;
         if (result < 0)
                 cl_lock_error(env, lock, result);
@@ -1292,13 +1402,20 @@ int cl_wait_try(const struct lu_env *env, struct cl_lock *lock)
                 LINVRNT(cl_lock_is_mutexed(lock));
                 LINVRNT(cl_lock_invariant(env, lock));
                 LASSERT(lock->cll_state == CLS_ENQUEUED ||
-                        lock->cll_state == CLS_HELD);
+                        lock->cll_state == CLS_HELD ||
+                        lock->cll_state == CLS_INTRANSIT);
                 LASSERT(lock->cll_users > 0);
                 LASSERT(lock->cll_holds > 0);
 
                 result = 0;
                 if (lock->cll_error != 0)
                         break;
+
+                if (cl_lock_is_intransit(lock)) {
+                        result = CLO_WAIT;
+                        break;
+                }
+
                 if (lock->cll_state == CLS_HELD)
                         /* nothing to do */
                         break;
diff --git a/lustre/obdclass/cl_object.c b/lustre/obdclass/cl_object.c
index 7ca27c6..4838a0a 100644
@@ -480,7 +480,7 @@ int cl_site_stats_print(const struct cl_site *site, char *page, int count)
                 [CLS_QUEUING]   = "q",
                 [CLS_ENQUEUED]  = "e",
                 [CLS_HELD]      = "h",
-                [CLS_UNLOCKING] = "u",
+                [CLS_INTRANSIT] = "t",
                 [CLS_CACHED]    = "c",
                 [CLS_FREEING]   = "f"
         };
diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c
index 2cc82bc..6ca2014 100644
@@ -1361,7 +1361,7 @@ static int osc_lock_use(const struct lu_env *env,
                  * cl_lock mutex.
                  */
                 lock = slice->cls_lock;
-                LASSERT(lock->cll_state == CLS_CACHED);
+                LASSERT(lock->cll_state == CLS_INTRANSIT);
                 LASSERT(lock->cll_users > 0);
                 /* set a flag for osc_dlm_blocking_ast0() to signal the
                  * lock.*/