diff --git a/lustre/obdclass/cl_lock.c b/lustre/obdclass/cl_lock.c
index 6f6251d..f9a1925 100644
@@ -286,7 +286,7 @@ void cl_lock_put(const struct lu_env *env, struct cl_lock *lock)
         head = cl_object_header(obj);
         site = cl_object_site(obj);
 
-        CDEBUG(D_DLMTRACE, "releasing reference: %d %p %lu\n",
+        CDEBUG(D_TRACE, "releasing reference: %d %p %lu\n",
                atomic_read(&lock->cll_ref), lock, RETIP);
 
         if (atomic_dec_and_test(&lock->cll_ref)) {
@@ -311,7 +311,7 @@ EXPORT_SYMBOL(cl_lock_put);
 void cl_lock_get(struct cl_lock *lock)
 {
         LINVRNT(cl_lock_invariant(NULL, lock));
-        CDEBUG(D_DLMTRACE|D_TRACE, "acquiring reference: %d %p %lu\n",
+        CDEBUG(D_TRACE, "acquiring reference: %d %p %lu\n",
                atomic_read(&lock->cll_ref), lock, RETIP);
         atomic_inc(&lock->cll_ref);
 }
@@ -331,7 +331,7 @@ void cl_lock_get_trust(struct cl_lock *lock)
         struct cl_site *site = cl_object_site(lock->cll_descr.cld_obj);
 
         LASSERT(cl_is_lock(lock));
-        CDEBUG(D_DLMTRACE|D_TRACE, "acquiring trusted reference: %d %p %lu\n",
+        CDEBUG(D_TRACE, "acquiring trusted reference: %d %p %lu\n",
                atomic_read(&lock->cll_ref), lock, RETIP);
         if (atomic_inc_return(&lock->cll_ref) == 1)
                 atomic_inc(&site->cs_locks.cs_busy);
@@ -399,6 +399,57 @@ static struct cl_lock *cl_lock_alloc(const struct lu_env *env,
 }
 
 /**
+ * Transfer the lock into the INTRANSIT state and return the original state.
+ *
+ * \pre  state: CLS_CACHED, CLS_HELD or CLS_ENQUEUED
+ * \post state: CLS_INTRANSIT
+ * \see CLS_INTRANSIT
+ */
+enum cl_lock_state cl_lock_intransit(const struct lu_env *env,
+                                     struct cl_lock *lock)
+{
+        enum cl_lock_state state = lock->cll_state;
+
+        LASSERT(cl_lock_is_mutexed(lock));
+        LASSERT(state != CLS_INTRANSIT);
+        LASSERTF(state >= CLS_ENQUEUED && state <= CLS_CACHED,
+                 "Malformed lock state %d.\n", state);
+
+        cl_lock_state_set(env, lock, CLS_INTRANSIT);
+        lock->cll_intransit_owner = cfs_current();
+        cl_lock_hold_add(env, lock, "intransit", cfs_current());
+        return state;
+}
+EXPORT_SYMBOL(cl_lock_intransit);
+
+/**
+ * Exit the INTRANSIT state and restore the lock to its original state.
+ */
+void cl_lock_extransit(const struct lu_env *env, struct cl_lock *lock,
+                       enum cl_lock_state state)
+{
+        LASSERT(cl_lock_is_mutexed(lock));
+        LASSERT(lock->cll_state == CLS_INTRANSIT);
+        LASSERT(state != CLS_INTRANSIT);
+        LASSERT(lock->cll_intransit_owner == cfs_current());
+
+        lock->cll_intransit_owner = NULL;
+        cl_lock_state_set(env, lock, state);
+        cl_lock_unhold(env, lock, "intransit", cfs_current());
+}
+EXPORT_SYMBOL(cl_lock_extransit);
+
+/**
+ * Return true iff the lock is in transit and owned by another thread.
+ */
+int cl_lock_is_intransit(struct cl_lock *lock)
+{
+        LASSERT(cl_lock_is_mutexed(lock));
+        return lock->cll_state == CLS_INTRANSIT &&
+               lock->cll_intransit_owner != cfs_current();
+}
+EXPORT_SYMBOL(cl_lock_is_intransit);
+/**
  * Returns true iff lock is "suitable" for given io. E.g., locks acquired by
  * truncate and O_APPEND cannot be reused for read/non-append-write, as they
  * cover multiple stripes and can trigger cascading timeouts.
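
The three helpers added above implement a simple ownership hand-off: the thread driving a multi-step transition parks the lock in CLS_INTRANSIT (recording itself as cll_intransit_owner), may drop the lock mutex while it performs blocking work, and later restores a concrete state with cl_lock_extransit(). Any other thread that observes cl_lock_is_intransit() should wait via cl_lock_state_wait(), which is exactly what the reworked cl_lock_peek() and cl_enqueue_try() below do. The following is an illustrative sketch only, not part of this patch; it uses the signatures introduced above plus cl_lock_mutex_get()/cl_lock_mutex_put() from this file, and my_layer_blocking_op() is a hypothetical stand-in for a blocking layer operation.

    /* Illustrative sketch, not part of this patch. */
    extern int my_layer_blocking_op(struct cl_lock *lock);  /* hypothetical */

    static int my_layer_use(const struct lu_env *env, struct cl_lock *lock)
    {
            enum cl_lock_state saved;
            int rc;

            cl_lock_mutex_get(env, lock);
            /* Become the transit owner; other threads now see the lock as
             * "in transit" and wait for a state change broadcast. */
            saved = cl_lock_intransit(env, lock);

            cl_lock_mutex_put(env, lock);
            rc = my_layer_blocking_op(lock);        /* may sleep */
            cl_lock_mutex_get(env, lock);

            /* Leave the transit state: CLS_HELD on success, otherwise the
             * state saved before the transition. */
            cl_lock_extransit(env, lock, rc == 0 ? CLS_HELD : saved);
            cl_lock_mutex_put(env, lock);
            return rc;
    }
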
@@ -524,6 +575,7 @@ struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io,
         struct cl_object_header *head;
         struct cl_object        *obj;
         struct cl_lock          *lock;
+        int ok;
 
         obj  = need->cld_obj;
         head = cl_object_header(obj);
@@ -532,23 +584,30 @@ struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io,
         lock = cl_lock_lookup(env, obj, io, need);
         spin_unlock(&head->coh_lock_guard);
 
-        if (lock != NULL) {
-                int ok;
+        if (lock == NULL)
+                return NULL;
 
-                cl_lock_mutex_get(env, lock);
-                if (lock->cll_state == CLS_CACHED)
-                        cl_use_try(env, lock);
-                ok = lock->cll_state == CLS_HELD;
-                if (ok) {
-                        cl_lock_hold_add(env, lock, scope, source);
-                        cl_lock_user_add(env, lock);
-                }
-                cl_lock_mutex_put(env, lock);
-                if (!ok) {
-                        cl_lock_put(env, lock);
-                        lock = NULL;
-                }
+        cl_lock_mutex_get(env, lock);
+        if (lock->cll_state == CLS_INTRANSIT)
+                cl_lock_state_wait(env, lock); /* Don't care return value. */
+        if (lock->cll_state == CLS_CACHED) {
+                int result;
+                result = cl_use_try(env, lock, 1);
+                if (result < 0)
+                        cl_lock_error(env, lock, result);
+        }
+        ok = lock->cll_state == CLS_HELD;
+        if (ok) {
+                cl_lock_hold_add(env, lock, scope, source);
+                cl_lock_user_add(env, lock);
+                cl_lock_put(env, lock);
         }
+        cl_lock_mutex_put(env, lock);
+        if (!ok) {
+                cl_lock_put(env, lock);
+                lock = NULL;
+        }
+
         return lock;
 }
 EXPORT_SYMBOL(cl_lock_peek);
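
With this rewrite cl_lock_peek() either returns NULL or a lock that is already in CLS_HELD with both a hold and a user reference attached; an in-transit lock is waited out first, and the CLS_CACHED to CLS_HELD transition is driven atomically with cl_use_try(env, lock, 1). Below is a hedged caller-side sketch, not part of the patch; it assumes the pre-existing cl_unuse() and cl_lock_release() entry points of this file (outside the hunks shown) for putting the lock back.

    /* Illustrative sketch, not part of this patch. */
    static int my_try_cached_lock(const struct lu_env *env, struct cl_io *io,
                                  const struct cl_lock_descr *need,
                                  const void *source)
    {
            struct cl_lock *lock;

            lock = cl_lock_peek(env, io, need, "my_scope", source);
            if (lock == NULL)
                    return -ENODATA;        /* nothing cached and usable */

            /* The lock is CLS_HELD here; do the work under it ... */

            /* ... then drop the user and the hold taken by cl_lock_peek(). */
            cl_unuse(env, lock);
            cl_lock_release(env, lock, "my_scope", source);
            return 0;
    }
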
@@ -665,7 +724,7 @@ int cl_lock_mutex_try(const struct lu_env *env, struct cl_lock *lock)
 EXPORT_SYMBOL(cl_lock_mutex_try);
 
 /**
- * Unlocks cl_lock object.
+ * Unlocks cl_lock object.
  *
  * \pre cl_lock_is_mutexed(lock)
  *
@@ -757,13 +816,7 @@ static void cl_lock_delete0(const struct lu_env *env, struct cl_lock *lock)
 
                 spin_lock(&head->coh_lock_guard);
                 list_del_init(&lock->cll_linkage);
-                /*
-                 * No locks, no pages. This is only valid for bottom sub-locks
-                 * and head->coh_nesting == 1 check assumes two level top-sub
-                 * hierarchy.
-                 */
-                LASSERT(ergo(head->coh_nesting == 1 &&
-                             list_empty(&head->coh_locks), !head->coh_pages));
+
                 spin_unlock(&head->coh_lock_guard);
                 /*
                  * From now on, no new references to this lock can be acquired
@@ -890,7 +943,7 @@ int cl_lock_state_wait(const struct lu_env *env, struct cl_lock *lock)
         LASSERT(lock->cll_state != CLS_FREEING); /* too late to wait */
 
         result = lock->cll_error;
-        if (result == 0 && !(lock->cll_flags & CLF_STATE)) {
+        if (result == 0) {
                 cfs_waitlink_init(&waiter);
                 cfs_waitq_add(&lock->cll_wq, &waiter);
                 set_current_state(CFS_TASK_INTERRUPTIBLE);
@@ -904,7 +957,6 @@ int cl_lock_state_wait(const struct lu_env *env, struct cl_lock *lock)
                 cfs_waitq_del(&lock->cll_wq, &waiter);
                 result = cfs_signal_pending() ? -EINTR : 0;
         }
-        lock->cll_flags &= ~CLF_STATE;
         RETURN(result);
 }
 EXPORT_SYMBOL(cl_lock_state_wait);
@@ -921,7 +973,6 @@ static void cl_lock_state_signal(const struct lu_env *env, struct cl_lock *lock,
         list_for_each_entry(slice, &lock->cll_layers, cls_linkage)
                 if (slice->cls_ops->clo_state != NULL)
                         slice->cls_ops->clo_state(env, slice, state);
-        lock->cll_flags |= CLF_STATE;
         cfs_waitq_broadcast(&lock->cll_wq);
         EXIT;
 }
@@ -960,9 +1011,10 @@ void cl_lock_state_set(const struct lu_env *env, struct cl_lock *lock,
         LASSERT(lock->cll_state <= state ||
                 (lock->cll_state == CLS_CACHED &&
                  (state == CLS_HELD || /* lock found in cache */
-                  state == CLS_NEW     /* sub-lock canceled */)) ||
-                /* sub-lock canceled during unlocking */
-                (lock->cll_state == CLS_UNLOCKING && state == CLS_NEW));
+                  state == CLS_NEW  ||   /* sub-lock canceled */
+                  state == CLS_INTRANSIT)) ||
+                /* lock is in transit state */
+                lock->cll_state == CLS_INTRANSIT);
 
         if (lock->cll_state != state) {
                 atomic_dec(&site->cs_locks_state[lock->cll_state]);
@@ -975,17 +1027,54 @@ void cl_lock_state_set(const struct lu_env *env, struct cl_lock *lock,
 }
 EXPORT_SYMBOL(cl_lock_state_set);
 
+static int cl_unuse_try_internal(const struct lu_env *env, struct cl_lock *lock)
+{
+        const struct cl_lock_slice *slice;
+        int result;
+
+        do {
+                result = 0;
+
+                if (lock->cll_error != 0)
+                        break;
+
+                LINVRNT(cl_lock_is_mutexed(lock));
+                LINVRNT(cl_lock_invariant(env, lock));
+                LASSERT(lock->cll_state == CLS_INTRANSIT);
+                LASSERT(lock->cll_users > 0);
+                LASSERT(lock->cll_holds > 0);
+
+                result = -ENOSYS;
+                list_for_each_entry_reverse(slice, &lock->cll_layers,
+                                            cls_linkage) {
+                        if (slice->cls_ops->clo_unuse != NULL) {
+                                result = slice->cls_ops->clo_unuse(env, slice);
+                                if (result != 0)
+                                        break;
+                        }
+                }
+                LASSERT(result != -ENOSYS);
+        } while (result == CLO_REPEAT);
+
+        return result ?: lock->cll_error;
+}
+
 /**
  * Yanks lock from the cache (cl_lock_state::CLS_CACHED state) by calling
  * cl_lock_operations::clo_use() top-to-bottom to notify layers.
+ * If \a atomic is 1 and any layer fails, the lock is unused again so
+ * that the whole use operation is all-or-nothing.
  */
-int cl_use_try(const struct lu_env *env, struct cl_lock *lock)
+int cl_use_try(const struct lu_env *env, struct cl_lock *lock, int atomic)
 {
-        int result;
         const struct cl_lock_slice *slice;
+        int result;
+        enum cl_lock_state state;
 
         ENTRY;
         result = -ENOSYS;
+
+        state = cl_lock_intransit(env, lock);
         list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
                 if (slice->cls_ops->clo_use != NULL) {
                         result = slice->cls_ops->clo_use(env, slice);
@@ -994,8 +1083,43 @@ int cl_use_try(const struct lu_env *env, struct cl_lock *lock)
                 }
         }
         LASSERT(result != -ENOSYS);
-        if (result == 0)
-                cl_lock_state_set(env, lock, CLS_HELD);
+
+        LASSERT(lock->cll_state == CLS_INTRANSIT);
+
+        if (result == 0) {
+                state = CLS_HELD;
+        } else {
+                if (result == -ESTALE) {
+                        /*
+                         * -ESTALE means a sublock was cancelled while
+                         * the lock was being used; reset the state to
+                         * CLS_NEW and ask the caller to repeat.
+                         */
+                        state = CLS_NEW;
+                        result = CLO_REPEAT;
+                }
+
+                /* @atomic means back-off-on-failure. */
+                if (atomic) {
+                        int rc;
+
+                        do {
+                                rc = cl_unuse_try_internal(env, lock);
+                                if (rc == 0)
+                                        break;
+                                if (rc == CLO_WAIT)
+                                        rc = cl_lock_state_wait(env, lock);
+                                if (rc < 0)
+                                        break;
+                        } while (1);
+
+                        /* Vet the results. */
+                        if (rc < 0 && result > 0)
+                                result = rc;
+                }
+
+        }
+        cl_lock_extransit(env, lock, state);
         RETURN(result);
 }
 EXPORT_SYMBOL(cl_use_try);
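
With atomic set, cl_use_try() rolls a failed use back by running every layer's clo_unuse() through cl_unuse_try_internal() before leaving the transit state, so the lock never ends up half-used. The standalone program below (not Lustre code) illustrates the same all-or-nothing idea in miniature: layers are used in order and, on failure, the layers that were already used are unused again in reverse.

    #include <stdio.h>

    /* Standalone illustration of the "back off on failure" idea behind
     * cl_use_try(..., atomic = 1); not Lustre code. */

    #define NR_LAYERS 3

    static int use_layer(int i)    { return i == 2 ? -1 : 0; } /* layer 2 fails */
    static void unuse_layer(int i) { printf("unuse layer %d\n", i); }

    static int use_all_atomic(void)
    {
            int i, rc = 0;

            for (i = 0; i < NR_LAYERS; i++) {
                    rc = use_layer(i);
                    if (rc != 0)
                            break;
            }
            if (rc != 0)
                    while (--i >= 0)        /* roll back in reverse order */
                            unuse_layer(i);
            return rc;
    }

    int main(void)
    {
            printf("rc = %d\n", use_all_atomic());
            return 0;
    }
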
@@ -1061,14 +1185,13 @@ int cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock,
                         if (result == 0)
                                 cl_lock_state_set(env, lock, CLS_ENQUEUED);
                         break;
-                case CLS_UNLOCKING:
-                        /* wait until unlocking finishes, and enqueue lock
-                         * afresh. */
+                case CLS_INTRANSIT:
+                        LASSERT(cl_lock_is_intransit(lock));
                         result = CLO_WAIT;
                         break;
                 case CLS_CACHED:
                         /* yank lock from the cache. */
-                        result = cl_use_try(env, lock);
+                        result = cl_use_try(env, lock, 0);
                         break;
                 case CLS_ENQUEUED:
                 case CLS_HELD:
@@ -1155,7 +1278,7 @@ EXPORT_SYMBOL(cl_enqueue);
  * This function is called repeatedly by cl_unuse() until either lock is
  * unlocked, or error occurs.
  *
- * \ppre lock->cll_state <= CLS_HELD || lock->cll_state == CLS_UNLOCKING
+ * \pre  lock->cll_state <= CLS_HELD || cl_lock_is_intransit(lock)
  *
  * \post ergo(result == 0, lock->cll_state == CLS_CACHED)
  *
@@ -1164,11 +1287,11 @@ EXPORT_SYMBOL(cl_enqueue);
  */
 int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
 {
-        const struct cl_lock_slice *slice;
         int                         result;
+        enum cl_lock_state          state = CLS_NEW;
 
         ENTRY;
-        if (lock->cll_state != CLS_UNLOCKING) {
+        if (lock->cll_state != CLS_INTRANSIT) {
                 if (lock->cll_users > 1) {
                         cl_lock_user_del(env, lock);
                         RETURN(0);
@@ -1179,31 +1302,11 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
                  * CLS_CACHED, is reinitialized to CLS_NEW or fails into
                  * CLS_FREEING.
                  */
-                cl_lock_state_set(env, lock, CLS_UNLOCKING);
+                state = cl_lock_intransit(env, lock);
         }
-        do {
-                result = 0;
 
-                if (lock->cll_error != 0)
-                        break;
-
-                LINVRNT(cl_lock_is_mutexed(lock));
-                LINVRNT(cl_lock_invariant(env, lock));
-                LASSERT(lock->cll_state == CLS_UNLOCKING);
-                LASSERT(lock->cll_users > 0);
-                LASSERT(lock->cll_holds > 0);
-
-                result = -ENOSYS;
-                list_for_each_entry_reverse(slice, &lock->cll_layers,
-                                            cls_linkage) {
-                        if (slice->cls_ops->clo_unuse != NULL) {
-                                result = slice->cls_ops->clo_unuse(env, slice);
-                                if (result != 0)
-                                        break;
-                        }
-                }
-                LASSERT(result != -ENOSYS);
-        } while (result == CLO_REPEAT);
+        result = cl_unuse_try_internal(env, lock);
+        LASSERT(lock->cll_state == CLS_INTRANSIT);
         if (result != CLO_WAIT)
                 /*
                  * Once there is no more need to iterate ->clo_unuse() calls,
@@ -1213,8 +1316,6 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
                  */
                 cl_lock_user_del(env, lock);
         if (result == 0 || result == -ESTALE) {
-                enum cl_lock_state state;
-
                 /*
                  * Return lock back to the cache. This is the only
                  * place where lock is moved into CLS_CACHED state.
@@ -1225,7 +1326,7 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
                  * canceled while unlocking was in progress.
                  */
                 state = result == 0 ? CLS_CACHED : CLS_NEW;
-                cl_lock_state_set(env, lock, state);
+                cl_lock_extransit(env, lock, state);
 
                 /*
                  * Hide -ESTALE error.
@@ -1237,7 +1338,11 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
                  * pages won't be written to OSTs. -jay
                  */
                 result = 0;
+        } else {
+                CWARN("result = %d, this is unlikely!\n", result);
+                cl_lock_extransit(env, lock, state);
         }
+
         result = result ?: lock->cll_error;
         if (result < 0)
                 cl_lock_error(env, lock, result);
@@ -1297,13 +1402,20 @@ int cl_wait_try(const struct lu_env *env, struct cl_lock *lock)
                 LINVRNT(cl_lock_is_mutexed(lock));
                 LINVRNT(cl_lock_invariant(env, lock));
                 LASSERT(lock->cll_state == CLS_ENQUEUED ||
-                        lock->cll_state == CLS_HELD);
+                        lock->cll_state == CLS_HELD ||
+                        lock->cll_state == CLS_INTRANSIT);
                 LASSERT(lock->cll_users > 0);
                 LASSERT(lock->cll_holds > 0);
 
                 result = 0;
                 if (lock->cll_error != 0)
                         break;
+
+                if (cl_lock_is_intransit(lock)) {
+                        result = CLO_WAIT;
+                        break;
+                }
+
                 if (lock->cll_state == CLS_HELD)
                         /* nothing to do */
                         break;
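
cl_wait_try() now treats an in-transit lock like any other transient condition: it returns CLO_WAIT and lets the caller sleep until the transit owner broadcasts a state change. The sketch below shows the usual driver loop around a *_try function, in the style of cl_wait()/cl_enqueue_locked() in this file; it is illustrative only, and my_wait_for_lock() is a hypothetical name.

    /* Illustrative sketch, not part of this patch. */
    static int my_wait_for_lock(const struct lu_env *env, struct cl_lock *lock)
    {
            int result;

            cl_lock_mutex_get(env, lock);
            do {
                    result = cl_wait_try(env, lock);
                    if (result == CLO_WAIT) {
                            result = cl_lock_state_wait(env, lock);
                            if (result == 0)
                                    result = CLO_WAIT;      /* re-drive */
                    }
            } while (result == CLO_WAIT);
            cl_lock_mutex_put(env, lock);
            return result;
    }
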
@@ -1675,11 +1787,16 @@ struct cl_lock *cl_lock_at_page(const struct lu_env *env, struct cl_object *obj,
         need->cld_mode = CLM_READ; /* CLM_READ matches both READ & WRITE, but
                                     * not PHANTOM */
         need->cld_start = need->cld_end = page->cp_index;
+        need->cld_enq_flags = 0;
 
         spin_lock(&head->coh_lock_guard);
+        /* It is fine to match any group lock since there can be only one
+         * with a unique gid, and it conflicts with all other lock modes. */
         list_for_each_entry(scan, &head->coh_locks, cll_linkage) {
                 if (scan != except &&
-                    cl_lock_ext_match(&scan->cll_descr, need) &&
+                    (scan->cll_descr.cld_mode == CLM_GROUP ||
+                    cl_lock_ext_match(&scan->cll_descr, need)) &&
+                    scan->cll_state >= CLS_HELD &&
                     scan->cll_state < CLS_FREEING &&
                     /*
                      * This check is racy as the lock can be canceled right
@@ -1793,9 +1910,8 @@ int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock,
         struct cl_io          *io    = &info->clt_io;
         struct cl_2queue      *queue = &info->clt_queue;
         struct cl_lock_descr  *descr = &lock->cll_descr;
-        int                      result;
-        int                      rc0;
-        int                      rc1;
+        long page_count;
+        int result;
 
         LINVRNT(cl_lock_invariant(env, lock));
         ENTRY;
@@ -1803,24 +1919,38 @@ int cl_lock_page_out(const struct lu_env *env, struct cl_lock *lock,
         io->ci_obj = cl_object_top(descr->cld_obj);
         result = cl_io_init(env, io, CIT_MISC, io->ci_obj);
         if (result == 0) {
+                int nonblock = 1;
 
+restart:
                 cl_2queue_init(queue);
                 cl_page_gang_lookup(env, descr->cld_obj, io, descr->cld_start,
-                                    descr->cld_end, &queue->c2_qin);
-                if (queue->c2_qin.pl_nr > 0) {
+                                    descr->cld_end, &queue->c2_qin, nonblock);
+                page_count = queue->c2_qin.pl_nr;
+                if (page_count > 0) {
                         result = cl_page_list_unmap(env, io, &queue->c2_qin);
                         if (!discard) {
-                                rc0 = cl_io_submit_rw(env, io, CRT_WRITE,
-                                                      queue, CRP_CANCEL);
-                                rc1 = cl_page_list_own(env, io,
-                                                       &queue->c2_qout);
-                                result = result ?: rc0 ?: rc1;
+                                long timeout = 600; /* 10 minutes. */
+                                /* Debugging aid: if this request cannot be
+                                 * finished within 10 minutes, we want to be
+                                 * notified about it.
+                                 */
+                                result = cl_io_submit_sync(env, io, CRT_WRITE,
+                                                           queue, CRP_CANCEL,
+                                                           timeout);
+                                if (result)
+                                        CWARN("Writing %lu pages error: %d\n",
+                                              page_count, result);
                         }
                         cl_lock_page_list_fixup(env, io, lock, &queue->c2_qout);
                         cl_2queue_discard(env, io, queue);
                         cl_2queue_disown(env, io, queue);
                 }
                 cl_2queue_fini(env, queue);
+
+                if (nonblock) {
+                        nonblock = 0;
+                        goto restart;
+                }
         }
         cl_io_fini(env, io);
         RETURN(result);
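
cl_lock_page_out() now makes two passes over the extent: a non-blocking cl_page_gang_lookup() pass that flushes whatever pages can be grabbed immediately, then one blocking pass for the pages that were skipped. The standalone program below (not Lustre code) illustrates the same try-first-then-block idea, with pthread mutexes standing in for page locks.

    #include <pthread.h>
    #include <stdio.h>

    #define NR_ITEMS 4

    static pthread_mutex_t item_lock[NR_ITEMS];
    static int processed[NR_ITEMS];

    static void process(int i)
    {
            processed[i] = 1;
            pthread_mutex_unlock(&item_lock[i]);
    }

    int main(void)
    {
            int i, nonblock;

            for (i = 0; i < NR_ITEMS; i++)
                    pthread_mutex_init(&item_lock[i], NULL);

            /* First pass: take only what is available without blocking;
             * second pass: block for the stragglers. */
            for (nonblock = 1; nonblock >= 0; nonblock--) {
                    for (i = 0; i < NR_ITEMS; i++) {
                            if (processed[i])
                                    continue;
                            if (nonblock) {
                                    if (pthread_mutex_trylock(&item_lock[i]))
                                            continue;       /* busy, later */
                            } else {
                                    pthread_mutex_lock(&item_lock[i]);
                            }
                            process(i);
                    }
            }

            for (i = 0; i < NR_ITEMS; i++)
                    printf("item %d processed: %d\n", i, processed[i]);
            return 0;
    }
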
@@ -1942,7 +2072,6 @@ EXPORT_SYMBOL(cl_lock_hold);
  */
 struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
                                 const struct cl_lock_descr *need,
-                                __u32 enqflags,
                                 const char *scope, const void *source)
 {
         struct cl_lock       *lock;
@@ -1950,6 +2079,7 @@ struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
         int                   rc;
         int                   iter;
         int warn;
+        __u32                 enqflags = need->cld_enq_flags;
 
         ENTRY;
         fid = lu_object_fid(&io->ci_obj->co_lu);
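
The enqueue flags argument is gone from cl_lock_request(); callers now place the flags into the descriptor's new cld_enq_flags field (see the cl_lock_at_page() hunk above, which initializes it to 0). A hedged caller-side sketch follows; CEF_MUST is assumed to be one of the existing cl_enq_flags values and my_request_lock() is a hypothetical wrapper.

    /* Illustrative sketch, not part of this patch. */
    static struct cl_lock *my_request_lock(const struct lu_env *env,
                                           struct cl_io *io,
                                           struct cl_lock_descr *need,
                                           const void *source)
    {
            need->cld_enq_flags = CEF_MUST; /* was: passed as 4th argument */
            return cl_lock_request(env, io, need, "my_scope", source);
    }
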