b=19906
author     jxiong <jxiong>
           Thu, 19 Nov 2009 03:39:07 +0000 (03:39 +0000)
committer  jxiong <jxiong>
           Thu, 19 Nov 2009 03:39:07 +0000 (03:39 +0000)
r=wangdi,ericm

Make racer work at the client side.

lustre/include/cl_object.h
lustre/lclient/glimpse.c
lustre/lclient/lcommon_cl.c
lustre/lclient/lcommon_misc.c
lustre/llite/vvp_io.c
lustre/lov/lov_lock.c
lustre/lov/lovsub_lock.c
lustre/obdclass/cl_io.c
lustre/obdclass/cl_lock.c
lustre/obdecho/echo_client.c
lustre/osc/osc_lock.c

diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h
index ba66ba7..563edd1 100644
@@ -1301,6 +1301,11 @@ struct cl_lock_descr {
         __u64             cld_gid;
         /** Lock mode. */
         enum cl_lock_mode cld_mode;
+        /**
+         * flags to enqueue lock. A combination of bit-flags from
+         * enum cl_enq_flags.
+         */
+        __u32             cld_enq_flags;
 };
 
 #define DDESCR "%s(%d):[%lu, %lu]"
@@ -1666,6 +1671,7 @@ struct cl_lock_operations {
          * usual return values of lock state-machine methods, this can return
          * -ESTALE to indicate that lock cannot be returned to the cache, and
          * has to be re-initialized.
+         * unuse is a one-shot operation, so it must NOT return CLO_WAIT.
          *
          * \see ccc_lock_unuse(), lov_lock_unuse(), osc_lock_unuse()
          */
@@ -2156,11 +2162,6 @@ struct cl_io_lock_link {
         struct list_head     cill_linkage;
         struct cl_lock_descr cill_descr;
         struct cl_lock      *cill_lock;
-        /**
-         * flags to enqueue lock for this IO. A combination of bit-flags from
-         * enum cl_enq_flags.
-         */
-        __u32                cill_enq_flags;
         /** optional destructor */
         void               (*cill_fini)(const struct lu_env *env,
                                         struct cl_io_lock_link *link);
@@ -2763,7 +2764,6 @@ struct cl_lock *cl_lock_peek(const struct lu_env *env, const struct cl_io *io,
                              const char *scope, const void *source);
 struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
                                 const struct cl_lock_descr *need,
-                                __u32 enqflags,
                                 const char *scope, const void *source);
 struct cl_lock *cl_lock_at_page(const struct lu_env *env, struct cl_object *obj,
                                 struct cl_page *page, struct cl_lock *except,
@@ -2901,7 +2901,7 @@ void  cl_io_end          (const struct lu_env *env, struct cl_io *io);
 int   cl_io_lock_add     (const struct lu_env *env, struct cl_io *io,
                           struct cl_io_lock_link *link);
 int   cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
-                           struct cl_lock_descr *descr, int enqflags);
+                           struct cl_lock_descr *descr);
 int   cl_io_read_page    (const struct lu_env *env, struct cl_io *io,
                           struct cl_page *page);
 int   cl_io_prepare_write(const struct lu_env *env, struct cl_io *io,
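A minimal sketch (not part of the patch) of the calling convention the header changes above introduce: enqueue flags now travel in cl_lock_descr::cld_enq_flags, and cl_lock_request() no longer takes a separate enqflags argument. The helper name and the obj/start/end parameters are hypothetical; only the fields and functions shown in this commit are assumed.

    /* Hypothetical caller: fill the descriptor, including the enqueue
     * flags, then request the lock without passing flags separately. */
    static struct cl_lock *example_lock_request(const struct lu_env *env,
                                                struct cl_io *io,
                                                struct cl_lock_descr *descr,
                                                struct cl_object *obj,
                                                pgoff_t start, pgoff_t end)
    {
            descr->cld_obj       = obj;
            descr->cld_mode      = CLM_READ;
            descr->cld_start     = start;
            descr->cld_end       = end;
            descr->cld_enq_flags = CEF_MUST;  /* flags live in the descriptor */

            return cl_lock_request(env, io, descr, "example", cfs_current());
    }
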
diff --git a/lustre/lclient/glimpse.c b/lustre/lclient/glimpse.c
index ed81f15..1527635 100644
@@ -118,6 +118,7 @@ int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io,
                         *descr = whole_file;
                         descr->cld_obj   = clob;
                         descr->cld_mode  = CLM_PHANTOM;
+                        descr->cld_enq_flags = CEF_ASYNC | CEF_MUST;
                         cio->cui_glimpse = 1;
                         /*
                          * CEF_ASYNC is used because glimpse sub-locks cannot
@@ -127,9 +128,8 @@ int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io,
                          * CEF_MUST protects glimpse lock from conversion into
                          * a lockless mode.
                          */
-                        lock = cl_lock_request(env, io, descr,
-                                               CEF_ASYNC|CEF_MUST,
-                                               "glimpse", cfs_current());
+                        lock = cl_lock_request(env, io, descr, "glimpse",
+                                               cfs_current());
                         cio->cui_glimpse = 0;
                         if (!IS_ERR(lock)) {
                                 result = cl_wait(env, lock);
diff --git a/lustre/lclient/lcommon_cl.c b/lustre/lclient/lcommon_cl.c
index 195ce87..3d8ea67 100644
@@ -736,8 +736,8 @@ int ccc_io_one_lock_index(const struct lu_env *env, struct cl_io *io,
         descr->cld_obj   = obj;
         descr->cld_start = start;
         descr->cld_end   = end;
+        descr->cld_enq_flags = enqflags;
 
-        cio->cui_link.cill_enq_flags = enqflags;
         cl_io_lock_add(env, io, &cio->cui_link);
         RETURN(0);
 }
diff --git a/lustre/lclient/lcommon_misc.c b/lustre/lclient/lcommon_misc.c
index 24e896d..ee4fef3 100644
@@ -154,8 +154,9 @@ int cl_get_grouplock(struct cl_object *obj, unsigned long gid, int nonblock,
         descr->cld_mode = CLM_GROUP;
 
         enqflags = CEF_MUST | (nonblock ? CEF_NONBLOCK : 0);
-        lock = cl_lock_request(env, io, descr, enqflags,
-                               GROUPLOCK_SCOPE, cfs_current());
+        descr->cld_enq_flags = enqflags;
+
+        lock = cl_lock_request(env, io, descr, GROUPLOCK_SCOPE, cfs_current());
         if (IS_ERR(lock)) {
                 cl_io_fini(env, io);
                 cl_env_put(env, &refcheck);
diff --git a/lustre/llite/vvp_io.c b/lustre/llite/vvp_io.c
index a602672..5107795 100644
@@ -178,7 +178,8 @@ static int vvp_mmap_locks(const struct lu_env *env,
                                                     policy.l_extent.start);
                         descr->cld_end = cl_index(descr->cld_obj,
                                                   policy.l_extent.end);
-                        result = cl_io_lock_alloc_add(env, io, descr, flags);
+                        descr->cld_enq_flags = flags;
+                        result = cl_io_lock_alloc_add(env, io, descr);
                         if (result < 0)
                                 RETURN(result);
 
diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c
index 4a02d03..fae9437 100644
@@ -216,7 +216,8 @@ static int lov_sublock_lock(const struct lu_env *env,
                 LASSERT(cl_lock_is_mutexed(child));
                 sublock->lss_active = parent;
 
-                if (unlikely(child->cll_state == CLS_FREEING)) {
+                if (unlikely((child->cll_state == CLS_FREEING) ||
+                             (child->cll_flags & CLF_CANCELLED))) {
                         struct lov_lock_link *link;
                         /*
                          * we could race with lock deletion which temporarily
@@ -345,6 +346,7 @@ static int lov_lock_sub_init(const struct lu_env *env,
                         descr->cld_end   = cl_index(descr->cld_obj, end);
                         descr->cld_mode  = parent->cll_descr.cld_mode;
                         descr->cld_gid   = parent->cll_descr.cld_gid;
+                        descr->cld_enq_flags   = parent->cll_descr.cld_enq_flags;
                         /* XXX has no effect */
                         lck->lls_sub[nr].sub_got = *descr;
                         lck->lls_sub[nr].sub_stripe = i;
@@ -366,6 +368,7 @@ static int lov_lock_sub_init(const struct lu_env *env,
                                 result = PTR_ERR(sublock);
                                 break;
                         }
+                        cl_lock_get_trust(sublock);
                         cl_lock_mutex_get(env, sublock);
                         cl_lock_mutex_get(env, parent);
                         /*
@@ -383,6 +386,7 @@ static int lov_lock_sub_init(const struct lu_env *env,
                                                "lov-parent", parent);
                         }
                         cl_lock_mutex_put(env, sublock);
+                        cl_lock_put(env, sublock);
                 }
         }
         /*
@@ -536,10 +540,11 @@ static int lov_sublock_fill(const struct lu_env *env, struct cl_lock *parent,
         cl_lock_mutex_get(env, parent);
 
         if (!IS_ERR(sublock)) {
+                cl_lock_get_trust(sublock);
                 if (parent->cll_state == CLS_QUEUING &&
-                    lck->lls_sub[idx].sub_lock == NULL)
+                    lck->lls_sub[idx].sub_lock == NULL) {
                         lov_sublock_adopt(env, lck, sublock, idx, link);
-                else {
+                } else {
                         OBD_SLAB_FREE_PTR(link, lov_lock_link_kmem);
                         /* other thread allocated sub-lock, or enqueue is no
                          * longer going on */
@@ -548,6 +553,7 @@ static int lov_sublock_fill(const struct lu_env *env, struct cl_lock *parent,
                         cl_lock_mutex_get(env, parent);
                 }
                 cl_lock_mutex_put(env, sublock);
+                cl_lock_put(env, sublock);
                 result = CLO_REPEAT;
         } else
                 result = PTR_ERR(sublock);
@@ -659,15 +665,11 @@ static int lov_lock_unuse(const struct lu_env *env,
                         if (lls->sub_flags & LSF_HELD) {
                                 LASSERT(sublock->cll_state == CLS_HELD);
                                 rc = cl_unuse_try(subenv->lse_env, sublock);
-                                if (rc != CLO_WAIT)
-                                        rc = lov_sublock_release(env, lck,
-                                                                 i, 0, rc);
+                                rc = lov_sublock_release(env, lck, i, 0, rc);
                         }
                         lov_sublock_unlock(env, sub, closure, subenv);
                 }
                 result = lov_subresult(result, rc);
-                if (result < 0)
-                        break;
         }
 
         if (result == 0 && lck->lls_cancel_race) {
@@ -678,6 +680,75 @@ static int lov_lock_unuse(const struct lu_env *env,
         RETURN(result);
 }
 
+
+static void lov_lock_cancel(const struct lu_env *env,
+                           const struct cl_lock_slice *slice)
+{
+        struct lov_lock        *lck     = cl2lov_lock(slice);
+        struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock);
+        int i;
+        int result;
+
+        ENTRY;
+
+        for (result = 0, i = 0; i < lck->lls_nr; ++i) {
+                int rc;
+                struct lovsub_lock     *sub;
+                struct cl_lock         *sublock;
+                struct lov_lock_sub    *lls;
+                struct lov_sublock_env *subenv;
+
+                /* top-lock state cannot change concurrently, because single
+                 * thread (one that released the last hold) carries unlocking
+                 * to the completion. */
+                lls = &lck->lls_sub[i];
+                sub = lls->sub_lock;
+                if (sub == NULL)
+                        continue;
+
+                sublock = sub->lss_cl.cls_lock;
+                rc = lov_sublock_lock(env, lck, lls, closure, &subenv);
+                if (rc == 0) {
+                        if (!(lls->sub_flags & LSF_HELD)) {
+                                lov_sublock_unlock(env, sub, closure, subenv);
+                                continue;
+                        }
+
+                        switch(sublock->cll_state) {
+                        case CLS_HELD:
+                                rc = cl_unuse_try(subenv->lse_env,
+                                                  sublock);
+                                lov_sublock_release(env, lck, i, 0, 0);
+                                break;
+                        case CLS_ENQUEUED:
+                                /* TODO: cancelling this lock is not ideal,
+                                 * because the lock itself is innocent; it is
+                                 * acceptable for now. A better way would be
+                                 * to define a new lock method that only
+                                 * unholds the dlm lock. */
+                                cl_lock_cancel(env, sublock);
+                        default:
+                                lov_sublock_release(env, lck, i, 1, 0);
+                                break;
+                        }
+                        lov_sublock_unlock(env, sub, closure, subenv);
+                }
+
+                if (rc == CLO_REPEAT) {
+                        --i;
+                        continue;
+                }
+
+                result = lov_subresult(result, rc);
+        }
+
+        if (result)
+                CL_LOCK_DEBUG(D_ERROR, env, slice->cls_lock,
+                              "lov_lock_cancel fails with %d.\n", result);
+
+        cl_lock_closure_fini(closure);
+}
+
 static int lov_lock_wait(const struct lu_env *env,
                          const struct cl_lock_slice *slice)
 {
@@ -1049,6 +1120,7 @@ static const struct cl_lock_operations lov_lock_ops = {
         .clo_wait      = lov_lock_wait,
         .clo_use       = lov_lock_use,
         .clo_unuse     = lov_lock_unuse,
+        .clo_cancel    = lov_lock_cancel,
         .clo_fits_into = lov_lock_fits_into,
         .clo_delete    = lov_lock_delete,
         .clo_print     = lov_lock_print
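The two lov_lock.c hunks above (in lov_lock_sub_init() and lov_sublock_fill()) add the same reference pattern around the sub-lock mutex section; a condensed sketch of that pattern, for illustration only:

    /* Pin the sub-lock with a trusted reference before taking its mutex,
     * so it cannot be freed while the parent lock's mutex is dropped and
     * re-acquired; drop the reference only after the sub-lock mutex is
     * released. */
    cl_lock_get_trust(sublock);
    cl_lock_mutex_get(env, sublock);
    /* ... adopt or link the sub-lock while both mutexes are held ... */
    cl_lock_mutex_put(env, sublock);
    cl_lock_put(env, sublock);
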
diff --git a/lustre/lov/lovsub_lock.c b/lustre/lov/lovsub_lock.c
index f682995..0d75bbe 100644
@@ -331,9 +331,11 @@ static int lovsub_lock_delete_one(const struct lu_env *env,
         int             result;
         ENTRY;
 
-        parent  = lov->lls_cl.cls_lock;
-        result = 0;
+        parent = lov->lls_cl.cls_lock;
+        if (parent->cll_error)
+                RETURN(0);
 
+        result = 0;
         switch (parent->cll_state) {
         case CLS_NEW:
         case CLS_QUEUING:
diff --git a/lustre/obdclass/cl_io.c b/lustre/obdclass/cl_io.c
index fffe551..e386396 100644
@@ -323,12 +323,11 @@ static int cl_lockset_lock_one(const struct lu_env *env,
 
         ENTRY;
 
-        lock = cl_lock_request(env, io, &link->cill_descr, link->cill_enq_flags,
-                               "io", io);
+        lock = cl_lock_request(env, io, &link->cill_descr, "io", io);
         if (!IS_ERR(lock)) {
                 link->cill_lock = lock;
                 list_move(&link->cill_linkage, &set->cls_curr);
-                if (!(link->cill_enq_flags & CEF_ASYNC)) {
+                if (!(link->cill_descr.cld_enq_flags & CEF_ASYNC)) {
                         result = cl_wait(env, lock);
                         if (result == 0)
                                 list_move(&link->cill_linkage, &set->cls_done);
@@ -573,7 +572,7 @@ static void cl_free_io_lock_link(const struct lu_env *env,
  * Allocates new lock link, and uses it to add a lock to a lockset.
  */
 int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
-                         struct cl_lock_descr *descr, int enqflags)
+                         struct cl_lock_descr *descr)
 {
         struct cl_io_lock_link *link;
         int result;
@@ -582,7 +581,6 @@ int cl_io_lock_alloc_add(const struct lu_env *env, struct cl_io *io,
         OBD_ALLOC_PTR(link);
         if (link != NULL) {
                 link->cill_descr     = *descr;
-                link->cill_enq_flags = enqflags;
                 link->cill_fini      = cl_free_io_lock_link;
                 result = cl_io_lock_add(env, io, link);
                 if (result) /* lock match */
diff --git a/lustre/obdclass/cl_lock.c b/lustre/obdclass/cl_lock.c
index b85becc..50d44f5 100644
@@ -365,6 +365,7 @@ EXPORT_SYMBOL(cl_lock_get_trust);
 static void cl_lock_finish(const struct lu_env *env, struct cl_lock *lock)
 {
         cl_lock_mutex_get(env, lock);
+        cl_lock_cancel(env, lock);
         cl_lock_delete(env, lock);
         cl_lock_mutex_put(env, lock);
         cl_lock_put(env, lock);
@@ -509,9 +510,10 @@ static struct cl_lock *cl_lock_lookup(const struct lu_env *env,
 
                 LASSERT(cl_is_lock(lock));
                 matched = cl_lock_ext_match(&lock->cll_descr, need) &&
-                        lock->cll_state < CLS_FREEING &&
-                        !(lock->cll_flags & CLF_CANCELLED) &&
-                        cl_lock_fits_into(env, lock, need, io);
+                          lock->cll_state < CLS_FREEING &&
+                          lock->cll_error == 0 &&
+                          !(lock->cll_flags & CLF_CANCELLED) &&
+                          cl_lock_fits_into(env, lock, need, io);
                 CDEBUG(D_DLMTRACE, "has: "DDESCR"(%i) need: "DDESCR": %d\n",
                        PDESCR(&lock->cll_descr), lock->cll_state, PDESCR(need),
                        matched);
@@ -820,6 +822,7 @@ static void cl_lock_delete0(const struct lu_env *env, struct cl_lock *lock)
 
         ENTRY;
         if (lock->cll_state < CLS_FREEING) {
+                LASSERT(lock->cll_state != CLS_INTRANSIT);
                 cl_lock_state_set(env, lock, CLS_FREEING);
 
                 head = cl_object_header(lock->cll_descr.cld_obj);
@@ -1048,9 +1051,6 @@ static int cl_unuse_try_internal(const struct lu_env *env, struct cl_lock *lock)
         do {
                 result = 0;
 
-                if (lock->cll_error != 0)
-                        break;
-
                 LINVRNT(cl_lock_is_mutexed(lock));
                 LINVRNT(cl_lock_invariant(env, lock));
                 LASSERT(lock->cll_state == CLS_INTRANSIT);
@@ -1069,7 +1069,7 @@ static int cl_unuse_try_internal(const struct lu_env *env, struct cl_lock *lock)
                 LASSERT(result != -ENOSYS);
         } while (result == CLO_REPEAT);
 
-        return result ?: lock->cll_error;
+        return result;
 }
 
 /**
@@ -1087,6 +1087,10 @@ int cl_use_try(const struct lu_env *env, struct cl_lock *lock, int atomic)
         ENTRY;
         cl_lock_trace(D_DLMTRACE, env, "use lock", lock);
 
+        LASSERT(lock->cll_state == CLS_CACHED);
+        if (lock->cll_error)
+                RETURN(lock->cll_error);
+
         result = -ENOSYS;
         state = cl_lock_intransit(env, lock);
         list_for_each_entry(slice, &lock->cll_layers, cls_linkage) {
@@ -1098,7 +1102,8 @@ int cl_use_try(const struct lu_env *env, struct cl_lock *lock, int atomic)
         }
         LASSERT(result != -ENOSYS);
 
-        LASSERT(lock->cll_state == CLS_INTRANSIT);
+        LASSERTF(lock->cll_state == CLS_INTRANSIT, "Wrong state %d.\n",
+                 lock->cll_state);
 
         if (result == 0) {
                 state = CLS_HELD;
@@ -1116,17 +1121,7 @@ int cl_use_try(const struct lu_env *env, struct cl_lock *lock, int atomic)
                 /* @atomic means back-off-on-failure. */
                 if (atomic) {
                         int rc;
-
-                        do {
-                                rc = cl_unuse_try_internal(env, lock);
-                                if (rc == 0)
-                                        break;
-                                if (rc == CLO_WAIT)
-                                        rc = cl_lock_state_wait(env, lock);
-                                if (rc < 0)
-                                        break;
-                        } while(1);
-
+                        rc = cl_unuse_try_internal(env, lock);
                         /* Vet the results. */
                         if (rc < 0 && result > 0)
                                 result = rc;
@@ -1251,8 +1246,7 @@ static int cl_enqueue_locked(const struct lu_env *env, struct cl_lock *lock,
         } while (1);
         if (result != 0) {
                 cl_lock_user_del(env, lock);
-                if (result != -EINTR)
-                        cl_lock_error(env, lock, result);
+                cl_lock_error(env, lock, result);
         }
         LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED ||
                      lock->cll_state == CLS_HELD));
@@ -1292,8 +1286,9 @@ EXPORT_SYMBOL(cl_enqueue);
  *
  * This function is called repeatedly by cl_unuse() until either lock is
  * unlocked, or error occurs.
+ * cl_unuse_try is a one-shot operation, so it must NOT return CLO_WAIT.
  *
- * \pre  lock->cll_state <= CLS_HELD || cl_lock_is_intransit(lock)
+ * \pre  lock->cll_state == CLS_HELD
  *
  * \post ergo(result == 0, lock->cll_state == CLS_CACHED)
  *
@@ -1308,30 +1303,24 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
         ENTRY;
         cl_lock_trace(D_DLMTRACE, env, "unuse lock", lock);
 
-        if (lock->cll_state != CLS_INTRANSIT) {
-                if (lock->cll_users > 1) {
-                        cl_lock_user_del(env, lock);
-                        RETURN(0);
-                }
-                /*
-                 * New lock users (->cll_users) are not protecting unlocking
-                 * from proceeding. From this point, lock eventually reaches
-                 * CLS_CACHED, is reinitialized to CLS_NEW or fails into
-                 * CLS_FREEING.
-                 */
-                state = cl_lock_intransit(env, lock);
+        LASSERT(lock->cll_state == CLS_HELD || lock->cll_state == CLS_ENQUEUED);
+        if (lock->cll_users > 1) {
+                cl_lock_user_del(env, lock);
+                RETURN(0);
         }
 
+        /*
+         * New lock users (->cll_users) are not protecting unlocking
+         * from proceeding. From this point, lock eventually reaches
+         * CLS_CACHED, is reinitialized to CLS_NEW or fails into
+         * CLS_FREEING.
+         */
+        state = cl_lock_intransit(env, lock);
+
         result = cl_unuse_try_internal(env, lock);
         LASSERT(lock->cll_state == CLS_INTRANSIT);
-        if (result != CLO_WAIT)
-                /*
-                 * Once there is no more need to iterate ->clo_unuse() calls,
-                 * remove lock user. This is done even if unrecoverable error
-                 * happened during unlocking, because nothing else can be
-                 * done.
-                 */
-                cl_lock_user_del(env, lock);
+        LASSERT(result != CLO_WAIT);
+        cl_lock_user_del(env, lock);
         if (result == 0 || result == -ESTALE) {
                 /*
                  * Return lock back to the cache. This is the only
@@ -1342,7 +1331,10 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
                  * re-initialized. This happens e.g., when a sub-lock was
                  * canceled while unlocking was in progress.
                  */
-                state = result == 0 ? CLS_CACHED : CLS_NEW;
+                if (state == CLS_HELD && result == 0)
+                        state = CLS_CACHED;
+                else
+                        state = CLS_NEW;
                 cl_lock_extransit(env, lock, state);
 
                 /*
@@ -1356,7 +1348,7 @@ int cl_unuse_try(const struct lu_env *env, struct cl_lock *lock)
                  */
                 result = 0;
         } else {
-                CWARN("result = %d, this is unlikely!\n", result);
+                CERROR("result = %d, this is unlikely!\n", result);
                 cl_lock_extransit(env, lock, state);
         }
 
@@ -1369,19 +1361,13 @@ EXPORT_SYMBOL(cl_unuse_try);
 
 static void cl_unuse_locked(const struct lu_env *env, struct cl_lock *lock)
 {
+        int result;
         ENTRY;
-        LASSERT(lock->cll_state <= CLS_HELD);
-        do {
-                int result;
 
-                result = cl_unuse_try(env, lock);
-                if (result == CLO_WAIT) {
-                        result = cl_lock_state_wait(env, lock);
-                        if (result == 0)
-                                continue;
-                }
-                break;
-        } while (1);
+        result = cl_unuse_try(env, lock);
+        if (result)
+                CL_LOCK_DEBUG(D_ERROR, env, lock, "unuse return %d\n", result);
+
         EXIT;
 }
 
@@ -1447,8 +1433,10 @@ int cl_wait_try(const struct lu_env *env, struct cl_lock *lock)
                         }
                 }
                 LASSERT(result != -ENOSYS);
-                if (result == 0)
+                if (result == 0) {
+                        LASSERT(lock->cll_state != CLS_INTRANSIT);
                         cl_lock_state_set(env, lock, CLS_HELD);
+                }
         } while (result == CLO_REPEAT);
         RETURN(result ?: lock->cll_error);
 }
@@ -1471,7 +1459,8 @@ int cl_wait(const struct lu_env *env, struct cl_lock *lock)
         cl_lock_mutex_get(env, lock);
 
         LINVRNT(cl_lock_invariant(env, lock));
-        LASSERT(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD);
+        LASSERTF(lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD,
+                 "Wrong state %d \n", lock->cll_state);
         LASSERT(lock->cll_holds > 0);
         cl_lock_trace(D_DLMTRACE, env, "wait lock", lock);
 
@@ -1486,8 +1475,7 @@ int cl_wait(const struct lu_env *env, struct cl_lock *lock)
         } while (1);
         if (result < 0) {
                 cl_lock_user_del(env, lock);
-                if (result != -EINTR)
-                        cl_lock_error(env, lock, result);
+                cl_lock_error(env, lock, result);
                 cl_lock_lockdep_release(env, lock);
         }
         cl_lock_mutex_put(env, lock);
@@ -1812,6 +1800,7 @@ struct cl_lock *cl_lock_at_page(const struct lu_env *env, struct cl_object *obj,
         need->cld_mode = CLM_READ; /* CLM_READ matches both READ & WRITE, but
                                     * not PHANTOM */
         need->cld_start = need->cld_end = page->cp_index;
+        need->cld_enq_flags = 0;
 
         spin_lock(&head->coh_lock_guard);
         /* It is fine to match any group lock since there could be only one
@@ -2056,7 +2045,8 @@ static struct cl_lock *cl_lock_hold_mutex(const struct lu_env *env,
                 if (IS_ERR(lock))
                         break;
                 cl_lock_mutex_get(env, lock);
-                if (lock->cll_state < CLS_FREEING) {
+                if (lock->cll_state < CLS_FREEING &&
+                    !(lock->cll_flags & CLF_CANCELLED)) {
                         cl_lock_hold_mod(env, lock, +1);
                         lu_ref_add(&lock->cll_holders, scope, source);
                         lu_ref_add(&lock->cll_reference, scope, source);
@@ -2096,23 +2086,18 @@ EXPORT_SYMBOL(cl_lock_hold);
  */
 struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
                                 const struct cl_lock_descr *need,
-                                __u32 enqflags,
                                 const char *scope, const void *source)
 {
         struct cl_lock       *lock;
         const struct lu_fid  *fid;
         int                   rc;
         int                   iter;
-        int warn;
+        __u32                 enqflags = need->cld_enq_flags;
 
         ENTRY;
         fid = lu_object_fid(&io->ci_obj->co_lu);
         iter = 0;
         do {
-                warn = iter >= 16 && IS_PO2(iter);
-                CDEBUG(warn ? D_WARNING : D_DLMTRACE,
-                       DDESCR"@"DFID" %i %08x `%s'\n",
-                       PDESCR(need), PFID(fid), iter, enqflags, scope);
                 lock = cl_lock_hold_mutex(env, io, need, scope, source);
                 if (!IS_ERR(lock)) {
                         rc = cl_enqueue_locked(env, lock, io, enqflags);
@@ -2122,9 +2107,7 @@ struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io,
                                         cl_lock_lockdep_acquire(env,
                                                                 lock, enqflags);
                                         break;
-                                } else if (warn)
-                                        CL_LOCK_DEBUG(D_WARNING, env, lock,
-                                                      "got (see bug 17665)\n");
+                                }
                                 cl_unuse_locked(env, lock);
                         }
                         cl_lock_trace(D_DLMTRACE, env, "enqueue failed", lock);
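Given the tightened contract documented above (both ->clo_unuse() and cl_unuse_try() are now one-shot and must not return CLO_WAIT), a layer's unuse method reduces to a single-pass operation. A hypothetical skeleton, assuming a layer-private my_lock_unuse() name; the signature follows the clo_unuse method declared in cl_object.h:

    static int my_lock_unuse(const struct lu_env *env,
                             const struct cl_lock_slice *slice)
    {
            /* One-shot: release whatever this layer holds for the lock.
             * Return 0 on success, or -ESTALE if the lock cannot be
             * returned to the cache; CLO_WAIT is no longer a valid
             * result here. */
            return 0;
    }
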
diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c
index 504196a..3f54f2e 100644
@@ -995,10 +995,10 @@ static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
         descr->cld_start = cl_index(obj, start);
         descr->cld_end   = cl_index(obj, end);
         descr->cld_mode  = mode == LCK_PW ? CLM_WRITE : CLM_READ;
+        descr->cld_enq_flags = CEF_ASYNC | enqflags;
         io->ci_obj = obj;
 
-        lck = cl_lock_request(env, io, descr, CEF_ASYNC | enqflags,
-                              "ec enqueue", eco);
+        lck = cl_lock_request(env, io, descr, "ec enqueue", eco);
         if (lck) {
                 struct echo_client_obd *ec = eco->eo_dev->ed_ec;
                 struct echo_lock *el;
diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c
index e9d49fe..b9e711b 100644
@@ -1591,24 +1591,40 @@ static int osc_lock_fits_into(const struct lu_env *env,
 {
         struct osc_lock *ols = cl2osc_lock(slice);
 
-        /* If the lock hasn't ever enqueued, it can't be matched because
-         * enqueue process brings in many information which can be used to
-         * determine things such as lockless, CEF_MUST, etc.
-         */
-        if (ols->ols_state < OLS_ENQUEUED)
-                return 0;
-
-        /* Don't match this lock if the lock is able to become lockless lock.
-         * This is because the new lock might be covering a mmap region and
-         * so that it must have a cached at the local side. */
-        if (ols->ols_state < OLS_UPCALL_RECEIVED && ols->ols_locklessable)
-                return 0;
-
-        /* If the lock is going to be canceled, no reason to match it as well */
-        if (ols->ols_state > OLS_RELEASED)
+        if (need->cld_enq_flags & CEF_NEVER)
                 return 0;
 
-        /* go for it. */
+        if (need->cld_mode == CLM_PHANTOM) {
+                /*
+                 * Note: a QUEUED lock can't be matched here, otherwise
+                 * it might cause deadlocks. Consider a read process:
+                 * P1: enqueues a read lock, creating sublock1.
+                 * P2: enqueues a write lock, creating sublock2
+                 *     (conflicting with sublock1).
+                 * P1: read lock is granted.
+                 * P1: enqueues a glimpse lock (while still holding
+                 *     sublock1 in read mode), matches sublock2, and waits
+                 *     for sublock2 to be granted. But sublock2 can never
+                 *     be granted, because P1 will not release sublock1.
+                 *     Deadlock.
+                 */
+                if (ols->ols_state < OLS_GRANTED ||
+                        ols->ols_state > OLS_RELEASED)
+                        return 0;
+        } else if (need->cld_enq_flags & CEF_MUST) {
+                /*
+                 * If the lock has never been enqueued, it can't be
+                 * matched, because the enqueue process brings in
+                 * information needed to determine things such as
+                 * lockless mode, CEF_MUST, etc.
+                 */
+                if (ols->ols_state < OLS_GRANTED ||
+                        ols->ols_state > OLS_RELEASED)
+                        return 0;
+                if (ols->ols_state < OLS_UPCALL_RECEIVED &&
+                        ols->ols_locklessable)
+                        return 0;
+        }
         return 1;
 }
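
For reference, a condensed restatement (illustration only, not part of the patch) of the matching rules the rewritten osc_lock_fits_into() implements:

    /*
     * Summary of the new matching rules:
     *
     *   need->cld_enq_flags & CEF_NEVER   -> never match
     *   need->cld_mode == CLM_PHANTOM     -> match only locks whose state is
     *       (glimpse)                        within [OLS_GRANTED, OLS_RELEASED]
     *   need->cld_enq_flags & CEF_MUST    -> same state window, and skip locks
     *                                        that are still locklessable before
     *                                        the upcall is received
     *   otherwise                         -> match
     */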