Whamcloud - gitweb
Revert "b=19808 2.6.29-fc11 patchless client support"
[fs/lustre-release.git] / lustre / obdclass / cl_lock.c
index f9a1925..ce219a2 100644 (file)
@@ -127,6 +127,23 @@ static struct cl_thread_counters *cl_lock_counters(const struct lu_env *env,
         return &info->clt_counters[nesting];
 }
 
+static void cl_lock_trace0(int level, const struct lu_env *env,
+                           const char *prefix, const struct cl_lock *lock,
+                           const char *func, const int line)
+{ /* one CDEBUG line at mask 'level': lock state, tagged by 'prefix' */
+        struct cl_object_header *h = cl_object_header(lock->cll_descr.cld_obj);
+        CDEBUG(level, "%s: %p@(%i %p %i %d %d %d %d %lx)"
+                      "(%p/%d/%i) at %s():%d\n",
+               prefix, lock,
+               atomic_read(&lock->cll_ref), lock->cll_guarder, lock->cll_depth,
+               lock->cll_state, lock->cll_error, lock->cll_holds,
+               lock->cll_users, lock->cll_flags, /* end of per-lock fields */
+               env, h->coh_nesting, cl_lock_nr_mutexed(env), /* env part */
+               func, line); /* call site supplied by the caller */
+}
+#define cl_lock_trace(level, env, prefix, lock) /* adds caller func/line */ \
+        cl_lock_trace0(level, env, prefix, lock, __FUNCTION__, __LINE__)
+
 #define RETIP ((unsigned long)__builtin_return_address(0))
 
 #ifdef CONFIG_LOCKDEP
@@ -244,6 +261,7 @@ static void cl_lock_free(const struct lu_env *env, struct cl_lock *lock)
         LINVRNT(!cl_lock_is_mutexed(lock));
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "free lock", lock);
         might_sleep();
         while (!list_empty(&lock->cll_layers)) {
                 struct cl_lock_slice *slice;
@@ -347,6 +365,7 @@ EXPORT_SYMBOL(cl_lock_get_trust);
 static void cl_lock_finish(const struct lu_env *env, struct cl_lock *lock)
 {
         cl_lock_mutex_get(env, lock);
+        cl_lock_cancel(env, lock);
         cl_lock_delete(env, lock);
         cl_lock_mutex_put(env, lock);
         cl_lock_put(env, lock);
@@ -491,16 +510,15 @@ static struct cl_lock *cl_lock_lookup(const struct lu_env *env,
 
                 LASSERT(cl_is_lock(lock));
                 matched = cl_lock_ext_match(&lock->cll_descr, need) &&
-                        lock->cll_state < CLS_FREEING &&
-                        !(lock->cll_flags & CLF_CANCELLED) &&
-                        cl_lock_fits_into(env, lock, need, io);
+                          lock->cll_state < CLS_FREEING &&
+                          lock->cll_error == 0 &&
+                          !(lock->cll_flags & CLF_CANCELLED) &&
+                          cl_lock_fits_into(env, lock, need, io);
                 CDEBUG(D_DLMTRACE, "has: "DDESCR"(%i) need: "DDESCR": %d\n",
                        PDESCR(&lock->cll_descr), lock->cll_state, PDESCR(need),
                        matched);
                 if (matched) {
                         cl_lock_get_trust(lock);
-                        /* move the lock to the LRU head */
-                        list_move(&lock->cll_linkage, &head->coh_locks);
                         atomic_inc(&cl_object_site(obj)->cs_locks.cs_hit);
                         RETURN(lock);
                 }
@@ -545,7 +563,7 @@ static struct cl_lock *cl_lock_find(const struct lu_env *env,
                         spin_lock(&head->coh_lock_guard);
                         ghost = cl_lock_lookup(env, obj, io, need);
                         if (ghost == NULL) {
-                                list_add(&lock->cll_linkage, &head->coh_locks);
+                                list_add_tail(&lock->cll_linkage, &head->coh_locks);
                                 spin_unlock(&head->coh_lock_guard);
                                 atomic_inc(&site->cs_locks.cs_busy);
                         } else {
@@ -634,14 +652,6 @@ const struct cl_lock_slice *cl_lock_at(const struct cl_lock *lock,
 }
 EXPORT_SYMBOL(cl_lock_at);
 
-static void cl_lock_trace(struct cl_thread_counters *counters,
-                          const char *prefix, const struct cl_lock *lock)
-{
-        CDEBUG(D_DLMTRACE|D_TRACE, "%s: %i@%p %p %i %i\n", prefix,
-               atomic_read(&lock->cll_ref), lock, lock->cll_guarder,
-               lock->cll_depth, counters->ctc_nr_locks_locked);
-}
-
 static void cl_lock_mutex_tail(const struct lu_env *env, struct cl_lock *lock)
 {
         struct cl_thread_counters *counters;
@@ -650,7 +660,7 @@ static void cl_lock_mutex_tail(const struct lu_env *env, struct cl_lock *lock)
         lock->cll_depth++;
         counters->ctc_nr_locks_locked++;
         lu_ref_add(&counters->ctc_locks_locked, "cll_guard", lock);
-        cl_lock_trace(counters, "got mutex", lock);
+        cl_lock_trace(D_TRACE, env, "got mutex", lock);
 }
 
 /**
@@ -742,7 +752,7 @@ void cl_lock_mutex_put(const struct lu_env *env, struct cl_lock *lock)
         counters = cl_lock_counters(env, lock);
         LINVRNT(counters->ctc_nr_locks_locked > 0);
 
-        cl_lock_trace(counters, "put mutex", lock);
+        cl_lock_trace(D_TRACE, env, "put mutex", lock);
         lu_ref_del(&counters->ctc_locks_locked, "cll_guard", lock);
         counters->ctc_nr_locks_locked--;
         if (--lock->cll_depth == 0) {
@@ -810,6 +820,7 @@ static void cl_lock_delete0(const struct lu_env *env, struct cl_lock *lock)
 
         ENTRY;
         if (lock->cll_state < CLS_FREEING) {
+                LASSERT(lock->cll_state != CLS_INTRANSIT);
                 cl_lock_state_set(env, lock, CLS_FREEING);
 
                 head = cl_object_header(lock->cll_descr.cld_obj);
@@ -888,6 +899,7 @@ static void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock,
         LASSERT(lock->cll_holds > 0);
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "hold release lock", lock);
         lu_ref_del(&lock->cll_holders, scope, source);
         cl_lock_hold_mod(env, lock, -1);
         if (lock->cll_holds == 0) {
@@ -942,6 +954,7 @@ int cl_lock_state_wait(const struct lu_env *env, struct cl_lock *lock)
         LASSERT(lock->cll_depth == 1);
         LASSERT(lock->cll_state != CLS_FREEING); /* too late to wait */
 
+        cl_lock_trace(D_DLMTRACE, env, "state wait lock", lock);
         result = lock->cll_error;
         if (result == 0) {
                 cfs_waitlink_init(&waiter);
@@ -987,6 +1000,7 @@ static void cl_lock_state_signal(const struct lu_env *env, struct cl_lock *lock,
 void cl_lock_signal(const struct lu_env *env, struct cl_lock *lock)
 {
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "state signal lock", lock); /* trace first, then signal the current state */
         cl_lock_state_signal(env, lock, lock->cll_state);
         EXIT;
 }
@@ -1035,14 +1049,9 @@ static int cl_unuse_try_internal(const struct lu_env *env, struct cl_lock *lock)
         do {
                 result = 0;
 
-                if (lock->cll_error != 0)
-                        break;
-
                 LINVRNT(cl_lock_is_mutexed(lock));
                 LINVRNT(cl_lock_invariant(env, lock));
                 LASSERT(lock->cll_state == CLS_INTRANSIT);
-                LASSERT(lock->cll_users > 0);
-                LASSERT(lock->cll_holds > 0);
 
                 result = -ENOSYS;
                 list_for_each_entry_reverse(slice, &lock->cll_layers,
@@ -1056,7 +1065,7 @@ static int cl_unuse_try_internal(const struct lu_env *env, struct cl_lock *lock)
                 LASSERT(result != -ENOSYS);
         } while (result == CLO_REPEAT);
 
-        return result ?: lock->cll_error;
+        return result;
 }
 
 /**
@@ -1072,8 +1081,13 @@ int cl_use_try(const struct lu_env *env, struct cl_lock *lock, int atomic)
         enum cl_lock_state state;
 
         ENTRY;
-        result = -ENOSYS;
+        cl_lock_trace(D_DLMTRACE, env, "use lock", lock);
+
+        LASSERT(lock->cll_state == CLS_CACHED);
+        if (lock->cll_error)
+                RETURN(lock->cll_error);
 
+        result = -ENOSYS;
         state = cl_lock_intransit(env, lock);
         list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
                 if (slice->cls_ops->clo_use != NULL) {
@@ -1084,7 +1098,8 @@ int cl_use_try(const struct lu_env *env, struct cl_lock *lock, int atomic)
         }
         LASSERT(result != -ENOSYS);
 
-        LASSERT(lock->cll_state == CLS_INTRANSIT);
+        LASSERTF(lock->cll_state == CLS_INTRANSIT, "Wrong state %d.\n",
+                 lock->cll_state);
 
         if (result == 0) {
                 state = CLS_HELD;
@@ -1102,17 +1117,7 @@ int cl_use_try(const struct lu_env *env, struct cl_lock *lock, int atomic)
                 /* @atomic means back-off-on-failure. */
                 if (atomic) {
                         int rc;
-
-                        do {
-                                rc = cl_unuse_try_internal(env, lock);
-                                if (rc == 0)
-                                        break;
-                                if (rc == CLO_WAIT)
-                                        rc = cl_lock_state_wait(env, lock);
-                                if (rc < 0)
-                                        break;
-                        } while(1);
-
+                        rc = cl_unuse_try_internal(env, lock);
                         /* Vet the results. */
                         if (rc < 0 && result > 0)
                                 result = rc;
@@ -1168,6 +1173,7 @@ int cl_enqueue_try(const struct lu_env *env, struct cl_lock *lock,
         int result;
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "enqueue lock", lock);
         do {
                 result = 0;
 
@@ -1236,8 +1242,7 @@ static int cl_enqueue_locked(const struct lu_env *env, struct cl_lock *lock,
         } while (1);
         if (result != 0) {
                 cl_lock_user_del(env, lock);
-                if (result != -EINTR)
-                        cl_lock_error(env, lock, result);
+                cl_lock_error(env, lock, result);
         }
         LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
                      lock->cll_state == CLS_HELD));
@@ -1277,8 +1282,9 @@ EXPORT_SYMBOL(cl_enqueue);
  *
  * This function is called repeatedly by cl_unuse() until either lock is
  * unlocked, or error occurs.
+ * cl_unuse_try is a one-shot operation, so it must NOT return CLO_WAIT.
  *
- * \pre  lock->cll_state <= CLS_HELD || cl_lock_is_intransit(lock)
+ * \pre  lock->cll_state == CLS_HELD || lock->cll_state == CLS_ENQUEUED
  *
  * \post ergo(result == 0, lock->cll_state == CLS_CACHED)
  *
@@ -1291,30 +1297,26 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
         enum cl_lock_state          state = CLS_NEW;
 
         ENTRY;
-        if (lock->cll_state != CLS_INTRANSIT) {
-                if (lock->cll_users > 1) {
-                        cl_lock_user_del(env, lock);
-                        RETURN(0);
-                }
-                /*
-                 * New lock users (->cll_users) are not protecting unlocking
-                 * from proceeding. From this point, lock eventually reaches
-                 * CLS_CACHED, is reinitialized to CLS_NEW or fails into
-                 * CLS_FREEING.
-                 */
-                state = cl_lock_intransit(env, lock);
+        cl_lock_trace(D_DLMTRACE, env, "unuse lock", lock);
+
+        LASSERT(lock->cll_state == CLS_HELD || lock->cll_state == CLS_ENQUEUED);
+        if (lock->cll_users > 1) {
+                cl_lock_user_del(env, lock);
+                RETURN(0);
         }
 
+        /*
+         * New lock users (->cll_users) are not protecting unlocking
+         * from proceeding. From this point, lock eventually reaches
+         * CLS_CACHED, is reinitialized to CLS_NEW or fails into
+         * CLS_FREEING.
+         */
+        state = cl_lock_intransit(env, lock);
+
         result = cl_unuse_try_internal(env, lock);
         LASSERT(lock->cll_state == CLS_INTRANSIT);
-        if (result != CLO_WAIT)
-                /*
-                 * Once there is no more need to iterate ->clo_unuse() calls,
-                 * remove lock user. This is done even if unrecoverable error
-                 * happened during unlocking, because nothing else can be
-                 * done.
-                 */
-                cl_lock_user_del(env, lock);
+        LASSERT(result != CLO_WAIT);
+        cl_lock_user_del(env, lock);
         if (result == 0 || result == -ESTALE) {
                 /*
                  * Return lock back to the cache. This is the only
@@ -1325,7 +1327,10 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
                  * re-initialized. This happens e.g., when a sub-lock was
                  * canceled while unlocking was in progress.
                  */
-                state = result == 0 ? CLS_CACHED : CLS_NEW;
+                if (state == CLS_HELD && result == 0)
+                        state = CLS_CACHED;
+                else
+                        state = CLS_NEW;
                 cl_lock_extransit(env, lock, state);
 
                 /*
@@ -1339,7 +1344,7 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
                  */
                 result = 0;
         } else {
-                CWARN("result = %d, this is unlikely!\n", result);
+                CERROR("result = %d, this is unlikely!\n", result);
                 cl_lock_extransit(env, lock, state);
         }
 
@@ -1352,19 +1357,13 @@ EXPORT_SYMBOL(cl_unuse_try);
 
 static void cl_unuse_locked(const struct lu_env *env, struct cl_lock *lock)
 {
+        int result; /* outcome of the one cl_unuse_try() attempt */
         ENTRY;
-        LASSERT(lock->cll_state <= CLS_HELD);
-        do {
-                int result;
 
-                result = cl_unuse_try(env, lock);
-                if (result == CLO_WAIT) {
-                        result = cl_lock_state_wait(env, lock);
-                        if (result == 0)
-                                continue;
-                }
-                break;
-        } while (1);
+        result = cl_unuse_try(env, lock);
+        if (result) /* non-zero is only logged; this function returns void */
+                CL_LOCK_DEBUG(D_ERROR, env, lock, "unuse return %d\n", result);
+
         EXIT;
 }
 
@@ -1398,6 +1397,7 @@ int cl_wait_try(const struct lu_env *env, struct cl_lock *lock)
         int                         result;
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "wait lock try", lock);
         do {
                 LINVRNT(cl_lock_is_mutexed(lock));
                 LINVRNT(cl_lock_invariant(env, lock));
@@ -1429,8 +1429,10 @@ int cl_wait_try(const struct lu_env *env, struct cl_lock *lock)
                         }
                 }
                 LASSERT(result != -ENOSYS);
-                if (result == 0)
+                if (result == 0) {
+                        LASSERT(lock->cll_state != CLS_INTRANSIT);
                         cl_lock_state_set(env, lock, CLS_HELD);
+                }
         } while (result == CLO_REPEAT);
         RETURN(result ?: lock->cll_error);
 }
@@ -1453,8 +1455,10 @@ int cl_wait(const struct lu_env *env, struct cl_lock *lock)
         cl_lock_mutex_get(env, lock);
 
         LINVRNT(cl_lock_invariant(env, lock));
-        LASSERT(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD);
+        LASSERTF(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD,
+                 "Wrong state %d \n", lock->cll_state);
         LASSERT(lock->cll_holds > 0);
+        cl_lock_trace(D_DLMTRACE, env, "wait lock", lock);
 
         do {
                 result = cl_wait_try(env, lock);
@@ -1467,8 +1471,7 @@ int cl_wait(const struct lu_env *env, struct cl_lock *lock)
         } while (1);
         if (result < 0) {
                 cl_lock_user_del(env, lock);
-                if (result != -EINTR)
-                        cl_lock_error(env, lock, result);
+                cl_lock_error(env, lock, result);
                 cl_lock_lockdep_release(env, lock);
         }
         cl_lock_mutex_put(env, lock);
@@ -1523,6 +1526,7 @@ int cl_lock_modify(const struct lu_env *env, struct cl_lock *lock,
         int result;
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "modify lock", lock);
         /* don't allow object to change */
         LASSERT(obj == desc->cld_obj);
         LINVRNT(cl_lock_is_mutexed(lock));
@@ -1615,8 +1619,9 @@ EXPORT_SYMBOL(cl_lock_closure_build);
 int cl_lock_enclosure(const struct lu_env *env, struct cl_lock *lock,
                       struct cl_lock_closure *closure)
 {
-        int result;
+        int result = 0;
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "enclosure lock", lock);
         if (!cl_lock_mutex_try(env, lock)) {
                 /*
                  * If lock->cll_inclosure is not empty, lock is already in
@@ -1658,6 +1663,7 @@ void cl_lock_disclosure(const struct lu_env *env,
         struct cl_lock *scan;
         struct cl_lock *temp;
 
+        cl_lock_trace(D_DLMTRACE, env, "disclosure lock", closure->clc_origin);
         list_for_each_entry_safe(scan, temp, &closure->clc_list, cll_inclosure){
                 list_del_init(&scan->cll_inclosure);
                 cl_lock_mutex_put(env, scan);
@@ -1706,6 +1712,7 @@ void cl_lock_delete(const struct lu_env *env, struct cl_lock *lock)
                      cl_lock_nr_mutexed(env) == 1));
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "delete lock", lock);
         if (lock->cll_holds == 0)
                 cl_lock_delete0(env, lock);
         else
@@ -1730,6 +1737,7 @@ void cl_lock_error(const struct lu_env *env, struct cl_lock *lock, int error)
         LINVRNT(cl_lock_invariant(env, lock));
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "set lock error", lock);
         if (lock->cll_error == 0 && error != 0) {
                 lock->cll_error = error;
                 cl_lock_signal(env, lock);
@@ -1757,6 +1765,7 @@ void cl_lock_cancel(const struct lu_env *env, struct cl_lock *lock)
         LINVRNT(cl_lock_invariant(env, lock));
 
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "cancel lock", lock);
         if (lock->cll_holds == 0)
                 cl_lock_cancel0(env, lock);
         else
@@ -2032,7 +2041,8 @@ static struct cl_lock *cl_lock_hold_mutex(const struct lu_env *env,
                 if (IS_ERR(lock))
                         break;
                 cl_lock_mutex_get(env, lock);
-                if (lock->cll_state < CLS_FREEING) {
+                if (lock->cll_state < CLS_FREEING &&
+                    !(lock->cll_flags & CLF_CANCELLED)) {
                         cl_lock_hold_mod(env, lock, +1);
                         lu_ref_add(&lock->cll_holders, scope, source);
                         lu_ref_add(&lock->cll_reference, scope, source);
@@ -2078,17 +2088,12 @@ struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
         const struct lu_fid  *fid;
         int                   rc;
         int                   iter;
-        int warn;
         __u32                 enqflags = need->cld_enq_flags;
 
         ENTRY;
         fid = lu_object_fid(&io->ci_obj->co_lu);
         iter = 0;
         do {
-                warn = iter >= 16 && IS_PO2(iter);
-                CDEBUG(warn ? D_WARNING : D_DLMTRACE,
-                       DDESCR"@"DFID" %i %08x `%s'\n",
-                       PDESCR(need), PFID(fid), iter, enqflags, scope);
                 lock = cl_lock_hold_mutex(env, io, need, scope, source);
                 if (!IS_ERR(lock)) {
                         rc = cl_enqueue_locked(env, lock, io, enqflags);
@@ -2098,11 +2103,10 @@ struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
                                         cl_lock_lockdep_acquire(env,
                                                                 lock, enqflags);
                                         break;
-                                } else if (warn)
-                                        CL_LOCK_DEBUG(D_WARNING, env, lock,
-                                                      "got (see bug 17665)\n");
+                                }
                                 cl_unuse_locked(env, lock);
                         }
+                        cl_lock_trace(D_DLMTRACE, env, "enqueue failed", lock);
                         cl_lock_hold_release(env, lock, scope, source);
                         cl_lock_mutex_put(env, lock);
                         lu_ref_del(&lock->cll_reference, scope, source);
@@ -2159,6 +2163,7 @@ void cl_lock_release(const struct lu_env *env, struct cl_lock *lock,
 {
         LINVRNT(cl_lock_invariant(env, lock));
         ENTRY;
+        cl_lock_trace(D_DLMTRACE, env, "release lock", lock);
         cl_lock_mutex_get(env, lock);
         cl_lock_hold_release(env, lock, scope, source);
         cl_lock_mutex_put(env, lock);
@@ -2191,37 +2196,18 @@ int cl_lock_user_del(const struct lu_env *env, struct cl_lock *lock)
 }
 EXPORT_SYMBOL(cl_lock_user_del);
 
-/**
- * Check if two lock's mode are compatible.
- *
- * This returns true iff en-queuing \a lock2 won't cause cancellation of \a
- * lock1 even when these locks overlap.
- */
-int cl_lock_compatible(const struct cl_lock *lock1, const struct cl_lock *lock2)
-{
-        enum cl_lock_mode mode1;
-        enum cl_lock_mode mode2;
-
-        ENTRY;
-        mode1 = lock1->cll_descr.cld_mode;
-        mode2 = lock2->cll_descr.cld_mode;
-        RETURN(mode2 == CLM_PHANTOM ||
-               (mode1 == CLM_READ && mode2 == CLM_READ));
-}
-EXPORT_SYMBOL(cl_lock_compatible);
-
 const char *cl_lock_mode_name(const enum cl_lock_mode mode)
 {
         static const char *names[] = {
-                [CLM_PHANTOM] = "PHANTOM",
-                [CLM_READ]    = "READ",
-                [CLM_WRITE]   = "WRITE",
-                [CLM_GROUP]   = "GROUP"
+                [CLM_PHANTOM] = "P", /* one-letter tag per lock mode */
+                [CLM_READ]    = "R",
+                [CLM_WRITE]   = "W",
+                [CLM_GROUP]   = "G"
         };
         if (0 <= mode && mode < ARRAY_SIZE(names))
                 return names[mode];
         else
-                return "UNKNW";
+                return "U"; /* mode index falls outside names[] */
 }
 EXPORT_SYMBOL(cl_lock_mode_name);