Whamcloud - gitweb
LU-13227 LDLM: update LVB if it is server lock
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
index 237fe67..aa2eeea 100644 (file)
@@ -136,8 +136,6 @@ const char *ldlm_it2str(enum ldlm_intent_flags it)
 }
 EXPORT_SYMBOL(ldlm_it2str);
 
-extern struct kmem_cache *ldlm_lock_slab;
-
 #ifdef HAVE_SERVER_SUPPORT
 static ldlm_processing_policy ldlm_processing_policy_table[] = {
        [LDLM_PLAIN]    = ldlm_process_plain_lock,
@@ -151,6 +149,19 @@ ldlm_processing_policy ldlm_get_processing_policy(struct ldlm_resource *res)
         return ldlm_processing_policy_table[res->lr_type];
 }
 EXPORT_SYMBOL(ldlm_get_processing_policy);
+
+static ldlm_reprocessing_policy ldlm_reprocessing_policy_table[] = {
+       [LDLM_PLAIN]    = ldlm_reprocess_queue,
+       [LDLM_EXTENT]   = ldlm_reprocess_queue,
+       [LDLM_FLOCK]    = ldlm_reprocess_queue,
+       [LDLM_IBITS]    = ldlm_reprocess_inodebits_queue,
+};
+
+ldlm_reprocessing_policy ldlm_get_reprocessing_policy(struct ldlm_resource *res)
+{
+       return ldlm_reprocessing_policy_table[res->lr_type];
+}
+
 #endif /* HAVE_SERVER_SUPPORT */
 
 void ldlm_register_intent(struct ldlm_namespace *ns, ldlm_res_policy arg)
@@ -174,11 +185,20 @@ EXPORT_SYMBOL(ldlm_register_intent);
  */
 struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
 {
-       atomic_inc(&lock->l_refc);
+       refcount_inc(&lock->l_handle.h_ref);
         return lock;
 }
 EXPORT_SYMBOL(ldlm_lock_get);
 
+static void lock_handle_free(struct rcu_head *rcu)
+{
+       struct ldlm_lock *lock = container_of(rcu, struct ldlm_lock,
+                                             l_handle.h_rcu);
+
+       OBD_FREE_PRE(lock, sizeof(*lock), "slab-freed");
+       kmem_cache_free(ldlm_lock_slab, lock);
+}
+
 /**
  * Release lock reference.
  *
@@ -189,8 +209,8 @@ void ldlm_lock_put(struct ldlm_lock *lock)
         ENTRY;
 
         LASSERT(lock->l_resource != LP_POISON);
-       LASSERT(atomic_read(&lock->l_refc) > 0);
-       if (atomic_dec_and_test(&lock->l_refc)) {
+       LASSERT(refcount_read(&lock->l_handle.h_ref) > 0);
+       if (refcount_dec_and_test(&lock->l_handle.h_ref)) {
                 struct ldlm_resource *res;
 
                 LDLM_DEBUG(lock,
@@ -205,8 +225,6 @@ void ldlm_lock_put(struct ldlm_lock *lock)
                 lprocfs_counter_decr(ldlm_res_to_ns(res)->ns_stats,
                                      LDLM_NSS_LOCKS);
                 lu_ref_del(&res->lr_reference, "lock", lock);
-                ldlm_resource_putref(res);
-                lock->l_resource = NULL;
                 if (lock->l_export) {
                         class_export_lock_put(lock->l_export, lock);
                         lock->l_export = NULL;
@@ -215,9 +233,17 @@ void ldlm_lock_put(struct ldlm_lock *lock)
                 if (lock->l_lvb_data != NULL)
                         OBD_FREE_LARGE(lock->l_lvb_data, lock->l_lvb_len);
 
-                ldlm_interval_free(ldlm_interval_detach(lock));
+               if (res->lr_type == LDLM_EXTENT) {
+                       ldlm_interval_free(ldlm_interval_detach(lock));
+               } else if (res->lr_type == LDLM_IBITS) {
+                       if (lock->l_ibits_node != NULL)
+                               OBD_SLAB_FREE_PTR(lock->l_ibits_node,
+                                                 ldlm_inodebits_slab);
+               }
+               ldlm_resource_putref(res);
+               lock->l_resource = NULL;
                 lu_ref_fini(&lock->l_reference);
-               OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
+               call_rcu(&lock->l_handle.h_rcu, lock_handle_free);
         }
 
         EXIT;
@@ -421,22 +447,7 @@ void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
         EXIT;
 }
 
-/* this is called by portals_handle2object with the handle lock taken */
-static void lock_handle_addref(void *lock)
-{
-        LDLM_LOCK_GET((struct ldlm_lock *)lock);
-}
-
-static void lock_handle_free(void *lock, int size)
-{
-       LASSERT(size == sizeof(struct ldlm_lock));
-       OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
-}
-
-static struct portals_handle_ops lock_handle_ops = {
-       .hop_addref = lock_handle_addref,
-       .hop_free   = lock_handle_free,
-};
+static const char lock_handle_owner[] = "ldlm";
 
 /**
  *
@@ -462,7 +473,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
        lock->l_resource = resource;
        lu_ref_add(&resource->lr_reference, "lock", lock);
 
-       atomic_set(&lock->l_refc, 2);
+       refcount_set(&lock->l_handle.h_ref, 2);
        INIT_LIST_HEAD(&lock->l_res_link);
        INIT_LIST_HEAD(&lock->l_lru);
        INIT_LIST_HEAD(&lock->l_pending_chain);
@@ -478,8 +489,8 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
 
         lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
                              LDLM_NSS_LOCKS);
-       INIT_LIST_HEAD_RCU(&lock->l_handle.h_link);
-       class_handle_hash(&lock->l_handle, &lock_handle_ops);
+       INIT_HLIST_NODE(&lock->l_handle.h_link);
+       class_handle_hash(&lock->l_handle, lock_handle_owner);
 
         lu_ref_init(&lock->l_reference);
         lu_ref_add(&lock->l_reference, "hash", lock);
@@ -589,7 +600,10 @@ struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
 
        LASSERT(handle);
 
-       lock = class_handle2object(handle->cookie, NULL);
+       if (!lustre_handle_is_used(handle))
+               RETURN(NULL);
+
+       lock = class_handle2object(handle->cookie, lock_handle_owner);
        if (lock == NULL)
                RETURN(NULL);
 
@@ -603,7 +617,7 @@ struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if ((flags == 0) && !ldlm_is_destroyed(lock)) {
-               lu_ref_add(&lock->l_reference, "handle", current);
+               lu_ref_add_atomic(&lock->l_reference, "handle", current);
                RETURN(lock);
        }
 
@@ -1035,7 +1049,6 @@ static void search_granted_lock(struct list_head *queue,
         prev->mode_link = &req->l_sl_mode;
         prev->policy_link = &req->l_sl_policy;
         EXIT;
-        return;
 }
 
 /**
@@ -1084,7 +1097,7 @@ void ldlm_grant_lock_with_skiplist(struct ldlm_lock *lock)
 {
        struct sl_insert_point prev;
 
-       LASSERT(lock->l_req_mode == lock->l_granted_mode);
+       LASSERT(ldlm_is_granted(lock));
 
        search_granted_lock(&lock->l_resource->lr_granted, lock, &prev);
        ldlm_granted_list_add_lock(lock, &prev);
@@ -1138,29 +1151,16 @@ void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
 }
 
 /**
- * Describe the overlap between two locks.  itree_overlap_cb data.
- */
-struct lock_match_data {
-       struct ldlm_lock        *lmd_old;
-       struct ldlm_lock        *lmd_lock;
-       enum ldlm_mode          *lmd_mode;
-       union ldlm_policy_data  *lmd_policy;
-       __u64                    lmd_flags;
-       __u64                    lmd_skip_flags;
-       int                      lmd_unref;
-};
-
-/**
  * Check if the given @lock meets the criteria for a match.
  * A reference on the lock is taken if matched.
  *
  * \param lock     test-against this lock
  * \param data    parameters
  */
-static int lock_matches(struct ldlm_lock *lock, struct lock_match_data *data)
+static int lock_matches(struct ldlm_lock *lock, struct ldlm_match_data *data)
 {
        union ldlm_policy_data *lpol = &lock->l_policy_data;
-       enum ldlm_mode match;
+       enum ldlm_mode match = LCK_MINMODE;
 
        if (lock == data->lmd_old)
                return INTERVAL_ITER_STOP;
@@ -1185,6 +1185,17 @@ static int lock_matches(struct ldlm_lock *lock, struct lock_match_data *data)
 
        if (!(lock->l_req_mode & *data->lmd_mode))
                return INTERVAL_ITER_CONT;
+
+       /* When we search for ast_data, we are not doing a traditional match,
+        * so we don't worry about IBITS or extent matching.
+        */
+       if (data->lmd_has_ast_data) {
+               if (!lock->l_ast_data)
+                       return INTERVAL_ITER_CONT;
+
+               goto matched;
+       }
+
        match = lock->l_req_mode;
 
        switch (lock->l_resource->lr_type) {
@@ -1222,6 +1233,7 @@ static int lock_matches(struct ldlm_lock *lock, struct lock_match_data *data)
        if (data->lmd_skip_flags & lock->l_flags)
                return INTERVAL_ITER_CONT;
 
+matched:
        if (data->lmd_flags & LDLM_FL_TEST_LOCK) {
                LDLM_LOCK_GET(lock);
                ldlm_lock_touch_in_lru(lock);
@@ -1238,7 +1250,7 @@ static int lock_matches(struct ldlm_lock *lock, struct lock_match_data *data)
 static unsigned int itree_overlap_cb(struct interval_node *in, void *args)
 {
        struct ldlm_interval *node = to_ldlm_interval(in);
-       struct lock_match_data *data = args;
+       struct ldlm_match_data *data = args;
        struct ldlm_lock *lock;
        int rc;
 
@@ -1258,8 +1270,8 @@ static unsigned int itree_overlap_cb(struct interval_node *in, void *args)
  *
  * \retval a referenced lock or NULL.
  */
-static struct ldlm_lock *search_itree(struct ldlm_resource *res,
-                                     struct lock_match_data *data)
+struct ldlm_lock *search_itree(struct ldlm_resource *res,
+                              struct ldlm_match_data *data)
 {
        struct interval_node_extent ext = {
                .start     = data->lmd_policy->l_extent.start,
@@ -1286,6 +1298,7 @@ static struct ldlm_lock *search_itree(struct ldlm_resource *res,
 
        return NULL;
 }
+EXPORT_SYMBOL(search_itree);
 
 
 /**
@@ -1297,7 +1310,7 @@ static struct ldlm_lock *search_itree(struct ldlm_resource *res,
  * \retval a referenced lock or NULL.
  */
 static struct ldlm_lock *search_queue(struct list_head *queue,
-                                     struct lock_match_data *data)
+                                     struct ldlm_match_data *data)
 {
        struct ldlm_lock *lock;
        int rc;
@@ -1393,7 +1406,7 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
                                         enum ldlm_mode mode,
                                         struct lustre_handle *lockh, int unref)
 {
-       struct lock_match_data data = {
+       struct ldlm_match_data data = {
                .lmd_old = NULL,
                .lmd_lock = NULL,
                .lmd_mode = &mode,
@@ -1401,6 +1414,7 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
                .lmd_flags = flags,
                .lmd_skip_flags = skip_flags,
                .lmd_unref = unref,
+               .lmd_has_ast_data = false,
        };
        struct ldlm_resource *res;
        struct ldlm_lock *lock;
@@ -1443,7 +1457,6 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
                    (!ldlm_is_lvb_ready(lock))) {
                        __u64 wait_flags = LDLM_FL_LVB_READY |
                                LDLM_FL_DESTROYED | LDLM_FL_FAIL_NOTIFIED;
-                       struct l_wait_info lwi;
 
                        if (lock->l_completion_ast) {
                                int err = lock->l_completion_ast(lock,
@@ -1453,12 +1466,11 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
                                        GOTO(out_fail_match, matched = 0);
                        }
 
-                       lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
-                                              NULL, LWI_ON_SIGNAL_NOOP, NULL);
+                       wait_event_idle_timeout(
+                               lock->l_waitq,
+                               lock->l_flags & wait_flags,
+                               cfs_time_seconds(obd_timeout));
 
-                       /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
-                       l_wait_event(lock->l_waitq, lock->l_flags & wait_flags,
-                                    &lwi);
                        if (!ldlm_is_lvb_ready(lock))
                                GOTO(out_fail_match, matched = 0);
                }
@@ -1668,11 +1680,18 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                lock->l_glimpse_ast = cbs->lcs_glimpse;
        }
 
-       lock->l_tree_node = NULL;
-       /* if this is the extent lock, allocate the interval tree node */
-       if (type == LDLM_EXTENT)
-               if (ldlm_interval_alloc(lock) == NULL)
-                       GOTO(out, rc = -ENOMEM);
+       switch (type) {
+       case LDLM_EXTENT:
+               rc = ldlm_extent_alloc_lock(lock);
+               break;
+       case LDLM_IBITS:
+               rc = ldlm_inodebits_alloc_lock(lock);
+               break;
+       default:
+               rc = 0;
+       }
+       if (rc)
+               GOTO(out, rc);
 
        if (lvb_len) {
                lock->l_lvb_len = lvb_len;
@@ -1699,11 +1718,12 @@ static enum ldlm_error ldlm_lock_enqueue_helper(struct ldlm_lock *lock,
 {
        struct ldlm_resource *res = lock->l_resource;
        enum ldlm_error rc = ELDLM_OK;
-       struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+       LIST_HEAD(rpc_list);
        ldlm_processing_policy policy;
+
        ENTRY;
 
-       policy = ldlm_processing_policy_table[res->lr_type];
+       policy = ldlm_get_processing_policy(res);
 restart:
        policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, &rpc_list);
        if (rc == ELDLM_OK && lock->l_granted_mode != lock->l_req_mode &&
@@ -1740,6 +1760,9 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
        int local = ns_is_client(ldlm_res_to_ns(res));
        enum ldlm_error rc = ELDLM_OK;
        struct ldlm_interval *node = NULL;
+#ifdef HAVE_SERVER_SUPPORT
+       bool reconstruct = false;
+#endif
        ENTRY;
 
         /* policies are not executed on the client or during replay */
@@ -1759,7 +1782,7 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
                         *flags |= LDLM_FL_LOCK_CHANGED;
                         RETURN(0);
                } else if (rc != ELDLM_OK &&
-                          lock->l_req_mode == lock->l_granted_mode) {
+                          ldlm_is_granted(lock)) {
                        LASSERT(*flags & LDLM_FL_RESENT);
                        /* It may happen that ns_policy returns an error in
                         * resend case, object may be unlinked or just some
@@ -1782,7 +1805,7 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
                 * Take NO_TIMEOUT from the lock as it is inherited through
                 * LDLM_FL_INHERIT_MASK */
                *flags |= LDLM_FL_LOCK_CHANGED;
-               if (lock->l_req_mode != lock->l_granted_mode)
+               if (!ldlm_is_granted(lock))
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                *flags |= lock->l_flags & LDLM_FL_NO_TIMEOUT;
                RETURN(ELDLM_OK);
@@ -1795,8 +1818,21 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
        if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
                OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
 
-        lock_res_and_lock(lock);
-        if (local && lock->l_req_mode == lock->l_granted_mode) {
+#ifdef HAVE_SERVER_SUPPORT
+       reconstruct = !local && res->lr_type == LDLM_FLOCK &&
+                     !(*flags & LDLM_FL_TEST_LOCK);
+       if (reconstruct) {
+               rc = req_can_reconstruct(cookie, NULL);
+               if (rc != 0) {
+                       if (rc == 1)
+                               rc = 0;
+                       RETURN(rc);
+               }
+       }
+#endif
+
+       lock_res_and_lock(lock);
+       if (local && ldlm_is_granted(lock)) {
                 /* The server returned a blocked lock, but it was granted
                  * before we got a chance to actually enqueue it.  We don't
                  * need to do anything else. */
@@ -1868,6 +1904,16 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
 
 out:
         unlock_res_and_lock(lock);
+
+#ifdef HAVE_SERVER_SUPPORT
+       if (reconstruct) {
+               struct ptlrpc_request *req = cookie;
+
+               tgt_mk_reply_data(NULL, NULL,
+                                 &req->rq_export->exp_target_data,
+                                 req, 0, NULL, false, 0);
+       }
+#endif
         if (node)
                 OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
         return rc;
@@ -1882,19 +1928,21 @@ out:
  */
 int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
                         struct list_head *work_list,
-                        enum ldlm_process_intention intention)
+                        enum ldlm_process_intention intention,
+                        struct ldlm_lock *hint)
 {
        struct list_head *tmp, *pos;
        ldlm_processing_policy policy;
        __u64 flags;
        int rc = LDLM_ITER_CONTINUE;
        enum ldlm_error err;
-       struct list_head bl_ast_list = LIST_HEAD_INIT(bl_ast_list);
+       LIST_HEAD(bl_ast_list);
+
        ENTRY;
 
        check_res_locked(res);
 
-       policy = ldlm_processing_policy_table[res->lr_type];
+       policy = ldlm_get_processing_policy(res);
        LASSERT(policy);
        LASSERT(intention == LDLM_PROCESS_RESCAN ||
                intention == LDLM_PROCESS_RECOVERY);
@@ -1902,7 +1950,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
 restart:
        list_for_each_safe(tmp, pos, queue) {
                struct ldlm_lock *pending;
-               struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+               LIST_HEAD(rpc_list);
 
                pending = list_entry(tmp, struct ldlm_lock, l_res_link);
 
@@ -1993,7 +2041,7 @@ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
                        RETURN(-EAGAIN);
 
                /* lock was granted while resource was unlocked. */
-               if (lock->l_granted_mode == lock->l_req_mode) {
+               if (ldlm_is_granted(lock)) {
                        /* bug 11300: if the lock has been granted,
                         * break earlier because otherwise, we will go
                         * to restart and ldlm_resource_unlink will be
@@ -2156,8 +2204,11 @@ int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
        arg->gl_interpret_data = gl_work->gl_interpret_data;
 
        /* invoke the actual glimpse callback */
-       if (lock->l_glimpse_ast(lock, (void*)arg) == 0)
-               rc = 1;
+       rc = lock->l_glimpse_ast(lock, (void *)arg);
+       if (rc == 0)
+               rc = 1; /* update LVB if this is server lock */
+       else if (rc == -ELDLM_NO_LOCK_DATA)
+               ldlm_lvbo_update(lock->l_resource, lock, NULL, 1);
 
        LDLM_LOCK_RELEASE(lock);
        if (gl_work->gl_flags & LDLM_GL_WORK_SLAB_ALLOCATED)
@@ -2289,20 +2340,22 @@ out:
  * if anything could be granted as a result of the cancellation.
  */
 static void __ldlm_reprocess_all(struct ldlm_resource *res,
-                                enum ldlm_process_intention intention)
+                                enum ldlm_process_intention intention,
+                                struct ldlm_lock *hint)
 {
-       struct list_head rpc_list;
+       LIST_HEAD(rpc_list);
 #ifdef HAVE_SERVER_SUPPORT
+       ldlm_reprocessing_policy reprocess;
        struct obd_device *obd;
-        int rc;
-        ENTRY;
+       int rc;
 
-       INIT_LIST_HEAD(&rpc_list);
-        /* Local lock trees don't get reprocessed. */
-        if (ns_is_client(ldlm_res_to_ns(res))) {
-                EXIT;
-                return;
-        }
+       ENTRY;
+
+       /* Local lock trees don't get reprocessed. */
+       if (ns_is_client(ldlm_res_to_ns(res))) {
+               EXIT;
+               return;
+       }
 
        /* Disable reprocess during lock replay stage but allow during
         * request replay stage.
@@ -2313,7 +2366,8 @@ static void __ldlm_reprocess_all(struct ldlm_resource *res,
                RETURN_EXIT;
 restart:
        lock_res(res);
-       ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list, intention);
+       reprocess = ldlm_get_reprocessing_policy(res);
+       reprocess(res, &res->lr_waiting, &rpc_list, intention, hint);
        unlock_res(res);
 
        rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
@@ -2323,21 +2377,20 @@ restart:
                goto restart;
        }
 #else
-        ENTRY;
+       ENTRY;
 
-       INIT_LIST_HEAD(&rpc_list);
-        if (!ns_is_client(ldlm_res_to_ns(res))) {
-                CERROR("This is client-side-only module, cannot handle "
-                       "LDLM_NAMESPACE_SERVER resource type lock.\n");
-                LBUG();
-        }
+       if (!ns_is_client(ldlm_res_to_ns(res))) {
+               CERROR("This is client-side-only module, cannot handle "
+                      "LDLM_NAMESPACE_SERVER resource type lock.\n");
+               LBUG();
+       }
 #endif
-        EXIT;
+       EXIT;
 }
 
-void ldlm_reprocess_all(struct ldlm_resource *res)
+void ldlm_reprocess_all(struct ldlm_resource *res, struct ldlm_lock *hint)
 {
-       __ldlm_reprocess_all(res, LDLM_PROCESS_RESCAN);
+       __ldlm_reprocess_all(res, LDLM_PROCESS_RESCAN, hint);
 }
 EXPORT_SYMBOL(ldlm_reprocess_all);
 
@@ -2347,7 +2400,7 @@ static int ldlm_reprocess_res(struct cfs_hash *hs, struct cfs_hash_bd *bd,
        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
 
        /* This is only called once after recovery done. LU-8306. */
-       __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY);
+       __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY, NULL);
        return 0;
 }
 
@@ -2376,24 +2429,22 @@ void ldlm_cancel_callback(struct ldlm_lock *lock)
        if (!ldlm_is_cancel(lock)) {
                ldlm_set_cancel(lock);
                if (lock->l_blocking_ast) {
-                        unlock_res_and_lock(lock);
-                        lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
-                                             LDLM_CB_CANCELING);
-                        lock_res_and_lock(lock);
-                } else {
-                        LDLM_DEBUG(lock, "no blocking ast");
-                }
+                       unlock_res_and_lock(lock);
+                       lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
+                                            LDLM_CB_CANCELING);
+                       lock_res_and_lock(lock);
+               } else {
+                       LDLM_DEBUG(lock, "no blocking ast");
+               }
 
                /* only canceller can set bl_done bit */
                ldlm_set_bl_done(lock);
                wake_up_all(&lock->l_waitq);
        } else if (!ldlm_is_bl_done(lock)) {
-               struct l_wait_info lwi = { 0 };
-
                /* The lock is guaranteed to have been canceled once
                 * returning from this function. */
                unlock_res_and_lock(lock);
-               l_wait_event(lock->l_waitq, is_bl_done(lock), &lwi);
+               wait_event_idle(lock->l_waitq, is_bl_done(lock));
                lock_res_and_lock(lock);
        }
 }
@@ -2447,8 +2498,8 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
         ldlm_resource_unlink_lock(lock);
         ldlm_lock_destroy_nolock(lock);
 
-        if (lock->l_granted_mode == lock->l_req_mode)
-                ldlm_pool_del(&ns->ns_pool, lock);
+       if (ldlm_is_granted(lock))
+               ldlm_pool_del(&ns->ns_pool, lock);
 
         /* Make sure we will not be called again for same lock what is possible
          * if not to zero out lock->l_granted_mode */
@@ -2493,10 +2544,10 @@ static void ldlm_cancel_lock_for_export(struct obd_export *exp,
 
        res = ldlm_resource_getref(lock->l_resource);
 
-       ldlm_lvbo_update(ecl->ecl_env, res, lock, NULL, 1);
+       ldlm_lvbo_update(res, lock, NULL, 1);
        ldlm_lock_cancel(lock);
        if (!exp->exp_obd->obd_stopping)
-               ldlm_reprocess_all(res);
+               ldlm_reprocess_all(res, lock);
        ldlm_resource_putref(res);
 
        ecl->ecl_loop++;
@@ -2611,7 +2662,7 @@ int ldlm_export_cancel_locks(struct obd_export *exp)
 }
 
 /**
- * Downgrade an PW/EX lock to COS mode.
+ * Downgrade an PW/EX lock to COS | CR mode.
  *
  * A lock mode convertion from PW/EX mode to less conflict mode. The
  * convertion may fail if lock was canceled before downgrade, but it doesn't
@@ -2623,6 +2674,8 @@ int ldlm_export_cancel_locks(struct obd_export *exp)
  * things are cleared, so any pending or new blocked lock on that lock will
  * cause new call to blocking_ast and force resource object commit.
  *
+ * Also used by layout_change to replace EX lock to CR lock.
+ *
  * \param lock A lock to convert
  * \param new_mode new lock mode
  */
@@ -2631,7 +2684,7 @@ void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
 #ifdef HAVE_SERVER_SUPPORT
        ENTRY;
 
-       LASSERT(new_mode == LCK_COS);
+       LASSERT(new_mode == LCK_COS || new_mode == LCK_CR);
 
        lock_res_and_lock(lock);
 
@@ -2659,7 +2712,7 @@ void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
        ldlm_grant_lock(lock, NULL);
        unlock_res_and_lock(lock);
 
-       ldlm_reprocess_all(lock->l_resource);
+       ldlm_reprocess_all(lock->l_resource, lock);
 
        EXIT;
 #endif
@@ -2699,6 +2752,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
         va_list args;
         struct obd_export *exp = lock->l_export;
        struct ldlm_resource *resource = NULL;
+       struct va_format vaf;
         char *nid = "local";
 
        /* on server-side resource of lock doesn't change */
@@ -2712,6 +2766,8 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
        }
 
         va_start(args, fmt);
+       vaf.fmt = fmt;
+       vaf.va = &args;
 
         if (exp && exp->exp_connection) {
                nid = obd_export_nid2str(exp);
@@ -2721,111 +2777,110 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
         }
 
         if (resource == NULL) {
-                libcfs_debug_vmsg2(msgdata, fmt, args,
-                      " ns: \?\? lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s "
-                      "res: \?\? rrc=\?\? type: \?\?\? flags: %#llx nid: %s "
-                      "remote: %#llx expref: %d pid: %u timeout: %lld "
-                      "lvb_type: %d\n",
-                       lock,
-                      lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
-                       lock->l_readers, lock->l_writers,
-                       ldlm_lockname[lock->l_granted_mode],
-                       ldlm_lockname[lock->l_req_mode],
-                       lock->l_flags, nid, lock->l_remote_handle.cookie,
-                      exp ? atomic_read(&exp->exp_refcount) : -99,
-                       lock->l_pid, lock->l_callback_timeout, lock->l_lvb_type);
+               libcfs_debug_msg(msgdata,
+                                "%pV ns: \?\? lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: \?\? rrc=\?\? type: \?\?\? flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
+                                &vaf,
+                                lock,
+                                lock->l_handle.h_cookie,
+                                refcount_read(&lock->l_handle.h_ref),
+                                lock->l_readers, lock->l_writers,
+                                ldlm_lockname[lock->l_granted_mode],
+                                ldlm_lockname[lock->l_req_mode],
+                                lock->l_flags, nid,
+                                lock->l_remote_handle.cookie,
+                                exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
+                                lock->l_pid, lock->l_callback_timeout,
+                                lock->l_lvb_type);
                 va_end(args);
                 return;
         }
 
        switch (resource->lr_type) {
        case LDLM_EXTENT:
-               libcfs_debug_vmsg2(msgdata, fmt, args,
-                       " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s "
-                       "res: "DLDLMRES" rrc: %d type: %s [%llu->%llu] "
-                       "(req %llu->%llu) flags: %#llx nid: %s remote: "
-                       "%#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
-                       ldlm_lock_to_ns_name(lock), lock,
-                       lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
-                       lock->l_readers, lock->l_writers,
-                       ldlm_lockname[lock->l_granted_mode],
-                       ldlm_lockname[lock->l_req_mode],
-                       PLDLMRES(resource),
-                       atomic_read(&resource->lr_refcount),
-                       ldlm_typename[resource->lr_type],
-                       lock->l_policy_data.l_extent.start,
-                       lock->l_policy_data.l_extent.end,
-                       lock->l_req_extent.start, lock->l_req_extent.end,
-                       lock->l_flags, nid, lock->l_remote_handle.cookie,
-                       exp ? atomic_read(&exp->exp_refcount) : -99,
-                       lock->l_pid, lock->l_callback_timeout,
-                       lock->l_lvb_type);
+               libcfs_debug_msg(msgdata,
+                                "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " rrc: %d type: %s [%llu->%llu] (req %llu->%llu) flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
+                                &vaf,
+                                ldlm_lock_to_ns_name(lock), lock,
+                                lock->l_handle.h_cookie,
+                                refcount_read(&lock->l_handle.h_ref),
+                                lock->l_readers, lock->l_writers,
+                                ldlm_lockname[lock->l_granted_mode],
+                                ldlm_lockname[lock->l_req_mode],
+                                PLDLMRES(resource),
+                                atomic_read(&resource->lr_refcount),
+                                ldlm_typename[resource->lr_type],
+                                lock->l_policy_data.l_extent.start,
+                                lock->l_policy_data.l_extent.end,
+                                lock->l_req_extent.start, lock->l_req_extent.end,
+                                lock->l_flags, nid,
+                                lock->l_remote_handle.cookie,
+                                exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
+                                lock->l_pid, lock->l_callback_timeout,
+                                lock->l_lvb_type);
                break;
 
        case LDLM_FLOCK:
-               libcfs_debug_vmsg2(msgdata, fmt, args,
-                       " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s "
-                       "res: "DLDLMRES" rrc: %d type: %s pid: %d "
-                       "[%llu->%llu] flags: %#llx nid: %s "
-                       "remote: %#llx expref: %d pid: %u timeout: %lld\n",
-                       ldlm_lock_to_ns_name(lock), lock,
-                       lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
-                       lock->l_readers, lock->l_writers,
-                       ldlm_lockname[lock->l_granted_mode],
-                       ldlm_lockname[lock->l_req_mode],
-                       PLDLMRES(resource),
-                       atomic_read(&resource->lr_refcount),
-                       ldlm_typename[resource->lr_type],
-                       lock->l_policy_data.l_flock.pid,
-                       lock->l_policy_data.l_flock.start,
-                       lock->l_policy_data.l_flock.end,
-                       lock->l_flags, nid, lock->l_remote_handle.cookie,
-                       exp ? atomic_read(&exp->exp_refcount) : -99,
-                       lock->l_pid, lock->l_callback_timeout);
+               libcfs_debug_msg(msgdata,
+                                "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " rrc: %d type: %s pid: %d [%llu->%llu] flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld\n",
+                                &vaf,
+                                ldlm_lock_to_ns_name(lock), lock,
+                                lock->l_handle.h_cookie,
+                                refcount_read(&lock->l_handle.h_ref),
+                                lock->l_readers, lock->l_writers,
+                                ldlm_lockname[lock->l_granted_mode],
+                                ldlm_lockname[lock->l_req_mode],
+                                PLDLMRES(resource),
+                                atomic_read(&resource->lr_refcount),
+                                ldlm_typename[resource->lr_type],
+                                lock->l_policy_data.l_flock.pid,
+                                lock->l_policy_data.l_flock.start,
+                                lock->l_policy_data.l_flock.end,
+                                lock->l_flags, nid,
+                                lock->l_remote_handle.cookie,
+                                exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
+                                lock->l_pid, lock->l_callback_timeout);
                break;
 
        case LDLM_IBITS:
-               libcfs_debug_vmsg2(msgdata, fmt, args,
-                       " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s "
-                       "res: "DLDLMRES" bits %#llx/%#llx rrc: %d type: %s "
-                       "flags: %#llx nid: %s remote: %#llx expref: %d "
-                       "pid: %u timeout: %lld lvb_type: %d\n",
-                       ldlm_lock_to_ns_name(lock),
-                       lock, lock->l_handle.h_cookie,
-                       atomic_read(&lock->l_refc),
-                       lock->l_readers, lock->l_writers,
-                       ldlm_lockname[lock->l_granted_mode],
-                       ldlm_lockname[lock->l_req_mode],
-                       PLDLMRES(resource),
-                       lock->l_policy_data.l_inodebits.bits,
-                       lock->l_policy_data.l_inodebits.try_bits,
-                       atomic_read(&resource->lr_refcount),
-                       ldlm_typename[resource->lr_type],
-                       lock->l_flags, nid, lock->l_remote_handle.cookie,
-                       exp ? atomic_read(&exp->exp_refcount) : -99,
-                       lock->l_pid, lock->l_callback_timeout,
-                       lock->l_lvb_type);
+               libcfs_debug_msg(msgdata,
+                                "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " bits %#llx/%#llx rrc: %d type: %s flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
+                                &vaf,
+                                ldlm_lock_to_ns_name(lock),
+                                lock, lock->l_handle.h_cookie,
+                                refcount_read(&lock->l_handle.h_ref),
+                                lock->l_readers, lock->l_writers,
+                                ldlm_lockname[lock->l_granted_mode],
+                                ldlm_lockname[lock->l_req_mode],
+                                PLDLMRES(resource),
+                                lock->l_policy_data.l_inodebits.bits,
+                                lock->l_policy_data.l_inodebits.try_bits,
+                                atomic_read(&resource->lr_refcount),
+                                ldlm_typename[resource->lr_type],
+                                lock->l_flags, nid,
+                                lock->l_remote_handle.cookie,
+                                exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
+                                lock->l_pid, lock->l_callback_timeout,
+                                lock->l_lvb_type);
                break;
 
        default:
-               libcfs_debug_vmsg2(msgdata, fmt, args,
-                       " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s "
-                       "res: "DLDLMRES" rrc: %d type: %s flags: %#llx "
-                       "nid: %s remote: %#llx expref: %d pid: %u "
-                       "timeout: %lld lvb_type: %d\n",
-                       ldlm_lock_to_ns_name(lock),
-                       lock, lock->l_handle.h_cookie,
-                       atomic_read(&lock->l_refc),
-                       lock->l_readers, lock->l_writers,
-                       ldlm_lockname[lock->l_granted_mode],
-                       ldlm_lockname[lock->l_req_mode],
-                       PLDLMRES(resource),
-                       atomic_read(&resource->lr_refcount),
-                       ldlm_typename[resource->lr_type],
-                       lock->l_flags, nid, lock->l_remote_handle.cookie,
-                       exp ? atomic_read(&exp->exp_refcount) : -99,
-                       lock->l_pid, lock->l_callback_timeout,
-                       lock->l_lvb_type);
+               libcfs_debug_msg(msgdata,
+                                "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " rrc: %d type: %s flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
+                                &vaf,
+                                ldlm_lock_to_ns_name(lock),
+                                lock, lock->l_handle.h_cookie,
+                                refcount_read(&lock->l_handle.h_ref),
+                                lock->l_readers, lock->l_writers,
+                                ldlm_lockname[lock->l_granted_mode],
+                                ldlm_lockname[lock->l_req_mode],
+                                PLDLMRES(resource),
+                                atomic_read(&resource->lr_refcount),
+                                ldlm_typename[resource->lr_type],
+                                lock->l_flags, nid,
+                                lock->l_remote_handle.cookie,
+                                exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
+                                lock->l_pid, lock->l_callback_timeout,
+                                lock->l_lvb_type);
                break;
        }
        va_end(args);