Whamcloud - gitweb
LU-12931 ldlm: use proper units for timeouts
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 65509be..9b16cb6 100644 (file)
@@ -40,6 +40,7 @@
 #include <linux/kthread.h>
 #include <linux/list.h>
 #include <libcfs/libcfs.h>
+#include <libcfs/linux/linux-mem.h>
 #include <lustre_errno.h>
 #include <lustre_dlm.h>
 #include <obd_class.h>
@@ -72,12 +73,12 @@ static struct ldlm_state *ldlm_state;
 /*
  * timeout for initial callback (AST) reply (bz10399)
  * Due to having to send a 32 bit time value over the
- * wire return it as time_t instead of time64_t
+ * wire return it as timeout_t instead of time64_t
  */
-static inline time_t ldlm_get_rq_timeout(void)
+static inline timeout_t ldlm_get_rq_timeout(void)
 {
        /* Non-AT value */
-       time_t timeout = min(ldlm_timeout, obd_timeout / 3);
+       timeout_t timeout = min(ldlm_timeout, obd_timeout / 3);
 
        return timeout < 1 ? 1 : timeout;
 }
@@ -152,8 +153,8 @@ static int expired_lock_dump;
 static LIST_HEAD(expired_lock_list);
 
 static int ldlm_lock_busy(struct ldlm_lock *lock);
-static int ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t timeout);
-static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t seconds);
+static int ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t timeout);
+static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t delay);
 
 static inline int have_expired_locks(void)
 {
@@ -173,7 +174,6 @@ static inline int have_expired_locks(void)
 static int expired_lock_main(void *arg)
 {
        struct list_head *expired = &expired_lock_list;
-       struct l_wait_info lwi = { 0 };
        int do_dump;
 
        ENTRY;
@@ -182,10 +182,9 @@ static int expired_lock_main(void *arg)
        wake_up(&expired_lock_wait_queue);
 
        while (1) {
-               l_wait_event(expired_lock_wait_queue,
-                            have_expired_locks() ||
-                            expired_lock_thread_state == ELT_TERMINATE,
-                            &lwi);
+               wait_event_idle(expired_lock_wait_queue,
+                               have_expired_locks() ||
+                               expired_lock_thread_state == ELT_TERMINATE);
 
                spin_lock_bh(&waiting_locks_spinlock);
                if (expired_lock_dump) {
@@ -242,7 +241,7 @@ static int expired_lock_main(void *arg)
 
                        /* Check if we need to prolong timeout */
                        if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
-                           lock->l_callback_timeout != 0 && /* not AST error */
+                           lock->l_callback_timestamp != 0 && /* not AST error */
                            ldlm_lock_busy(lock)) {
                                LDLM_DEBUG(lock, "prolong the busy lock");
                                lock_res_and_lock(lock);
@@ -325,7 +324,7 @@ static void waiting_locks_callback(TIMER_DATA_TYPE unused)
        while (!list_empty(&waiting_locks_list)) {
                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
                                  l_pending_chain);
-               if (lock->l_callback_timeout > ktime_get_seconds() ||
+               if (lock->l_callback_timestamp > ktime_get_seconds() ||
                    lock->l_req_mode == LCK_GROUP)
                        break;
 
@@ -334,8 +333,7 @@ static void waiting_locks_callback(TIMER_DATA_TYPE unused)
                 * the waiting_locks_list and ldlm_add_waiting_lock()
                 * already grabbed a ref
                 */
-               list_del(&lock->l_pending_chain);
-               list_add(&lock->l_pending_chain, &expired_lock_list);
+               list_move(&lock->l_pending_chain, &expired_lock_list);
                need_dump = 1;
        }
 
@@ -351,12 +349,15 @@ static void waiting_locks_callback(TIMER_DATA_TYPE unused)
         * left.
         */
        if (!list_empty(&waiting_locks_list)) {
-               unsigned long timeout_jiffies;
+               time64_t now = ktime_get_seconds();
+               timeout_t delta = 0;
 
                lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
                                  l_pending_chain);
-               timeout_jiffies = cfs_time_seconds(lock->l_callback_timeout);
-               mod_timer(&waiting_locks_timer, timeout_jiffies);
+               if (lock->l_callback_timestamp - now > 0)
+                       delta = lock->l_callback_timestamp - now;
+               mod_timer(&waiting_locks_timer,
+                         jiffies + cfs_time_seconds(delta));
        }
        spin_unlock_bh(&waiting_locks_spinlock);
 }
@@ -373,23 +374,27 @@ static void waiting_locks_callback(TIMER_DATA_TYPE unused)
  *
  * Called with the namespace lock held.
  */
-static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t seconds)
+static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t delay)
 {
-       unsigned long timeout_jiffies;
-       time64_t timeout;
+       unsigned long timeout_jiffies = jiffies;
+       time64_t now = ktime_get_seconds();
+       time64_t deadline;
+       timeout_t timeout;
 
        if (!list_empty(&lock->l_pending_chain))
                return 0;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
            OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
-               seconds = 1;
+               delay = 1;
 
-       timeout = ktime_get_seconds() + seconds;
-       if (likely(timeout > lock->l_callback_timeout))
-               lock->l_callback_timeout = timeout;
+       deadline = now + delay;
+       if (likely(deadline > lock->l_callback_timestamp))
+               lock->l_callback_timestamp = deadline;
 
-       timeout_jiffies = cfs_time_seconds(lock->l_callback_timeout);
+       timeout = clamp_t(timeout_t, lock->l_callback_timestamp - now,
+                         0, delay);
+       timeout_jiffies += cfs_time_seconds(timeout);
 
        if (time_before(timeout_jiffies, waiting_locks_timer.expires) ||
            !timer_pending(&waiting_locks_timer))
@@ -428,7 +433,7 @@ static void ldlm_add_blocked_lock(struct ldlm_lock *lock)
                obd_stale_export_adjust(lock->l_export);
 }
 
-static int ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t timeout)
+static int ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t timeout)
 {
        int ret;
 
@@ -477,7 +482,7 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock, time64_t timeout)
        if (ret)
                ldlm_add_blocked_lock(lock);
 
-       LDLM_DEBUG(lock, "%sadding to wait list(timeout: %lld, AT: %s)",
+       LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
                   ret == 0 ? "not re-" : "", timeout,
                   AT_OFF ? "off" : "on");
        return ret;
@@ -506,12 +511,17 @@ static int __ldlm_del_waiting_lock(struct ldlm_lock *lock)
                        /* No more, just cancel. */
                        del_timer(&waiting_locks_timer);
                } else {
+                       time64_t now = ktime_get_seconds();
                        struct ldlm_lock *next;
+                       timeout_t delta = 0;
 
                        next = list_entry(list_next, struct ldlm_lock,
                                          l_pending_chain);
+                       if (next->l_callback_timestamp - now > 0)
+                               delta = next->l_callback_timestamp - now;
+
                        mod_timer(&waiting_locks_timer,
-                                 cfs_time_seconds(next->l_callback_timeout));
+                                 jiffies + cfs_time_seconds(delta));
                }
        }
        list_del_init(&lock->l_pending_chain);
@@ -556,7 +566,7 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock)
  *
  * Called with namespace lock held.
  */
-int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, time64_t timeout)
+int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, timeout_t timeout)
 {
        if (lock->l_export == NULL) {
                /* We don't have a "waiting locks list" on clients. */
@@ -598,7 +608,7 @@ int ldlm_del_waiting_lock(struct ldlm_lock *lock)
        RETURN(0);
 }
 
-int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, time64_t timeout)
+int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, timeout_t timeout)
 {
        RETURN(0);
 }
@@ -616,9 +626,9 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, time64_t timeout)
  *
  * \retval            timeout in seconds to wait for the client reply
  */
-time64_t ldlm_bl_timeout(struct ldlm_lock *lock)
+timeout_t ldlm_bl_timeout(struct ldlm_lock *lock)
 {
-       time64_t timeout;
+       timeout_t timeout;
 
        if (AT_OFF)
                return obd_timeout / 2;
@@ -630,7 +640,8 @@ time64_t ldlm_bl_timeout(struct ldlm_lock *lock)
         * lock callbacks too...
         */
        timeout = at_get(&lock->l_export->exp_bl_lock_at);
-       return max(timeout + (timeout >> 1), (time64_t)ldlm_enqueue_min);
+       return max_t(timeout_t, timeout + (timeout >> 1),
+                    (timeout_t)ldlm_enqueue_min);
 }
 EXPORT_SYMBOL(ldlm_bl_timeout);
 
@@ -654,7 +665,8 @@ static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
                 * the lock to the expired list
                 */
                LDLM_LOCK_GET(lock);
-       lock->l_callback_timeout = 0; /* differentiate it from expired locks */
+       /* differentiate it from expired locks */
+       lock->l_callback_timestamp = 0;
        list_add(&lock->l_pending_chain, &expired_lock_list);
        wake_up(&expired_lock_wait_queue);
        spin_unlock_bh(&waiting_locks_spinlock);
@@ -1282,7 +1294,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
        } else {
                if (ldlm_reclaim_full()) {
                        DEBUG_REQ(D_DLMTRACE, req,
-                                 "Too many granted locks, reject current enqueue request and let the client retry later.\n");
+                                 "Too many granted locks, reject current enqueue request and let the client retry later");
                        GOTO(out, rc = -EINPROGRESS);
                }
        }
@@ -1349,20 +1361,16 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                                     &lock->l_policy_data);
        if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
                lock->l_req_extent = lock->l_policy_data.l_extent;
+       else if (dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS)
+               lock->l_policy_data.l_inodebits.try_bits =
+                       dlm_req->lock_desc.l_policy_data.l_inodebits.try_bits;
 
 existing_lock:
-       if (flags & LDLM_FL_HAS_INTENT) {
-               /*
-                * In this case, the reply buffer is allocated deep in
-                * local_lock_enqueue by the policy function.
-                */
-               cookie = req;
-       } else {
-               /*
-                * based on the assumption that lvb size never changes during
+       cookie = req;
+       if (!(flags & LDLM_FL_HAS_INTENT)) {
+               /* based on the assumption that lvb size never changes during
                 * resource life time otherwise it need resource->lr_lock's
-                * protection
-                */
+                * protection */
                req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
                                     RCL_SERVER, ldlm_lvbo_size(lock));
 
@@ -1586,6 +1594,8 @@ int ldlm_handle_convert0(struct ptlrpc_request *req,
        struct obd_export *exp = req->rq_export;
        struct ldlm_reply *dlm_rep;
        struct ldlm_lock *lock;
+       __u64 bits;
+       __u64 new_bits;
        int rc;
 
        ENTRY;
@@ -1602,62 +1612,61 @@ int ldlm_handle_convert0(struct ptlrpc_request *req,
        dlm_rep->lock_flags = dlm_req->lock_flags;
 
        lock = ldlm_handle2lock(&dlm_req->lock_handle[0]);
-       if (lock) {
-               __u64 bits;
-               __u64 new;
+       if (!lock) {
+               LDLM_DEBUG_NOLOCK("server lock is canceled already");
+               req->rq_status = ELDLM_NO_LOCK_DATA;
+               RETURN(0);
+       }
 
-               bits = lock->l_policy_data.l_inodebits.bits;
-               new = dlm_req->lock_desc.l_policy_data.l_inodebits.bits;
-               LDLM_DEBUG(lock, "server-side convert handler START");
+       LDLM_DEBUG(lock, "server-side convert handler START");
 
-               if (ldlm_is_cancel(lock)) {
-                       LDLM_ERROR(lock, "convert on canceled lock!");
-                       rc = ELDLM_NO_LOCK_DATA;
-               } else if (dlm_req->lock_desc.l_req_mode !=
-                          lock->l_granted_mode) {
-                       LDLM_ERROR(lock, "lock mode differs!");
-                       rc = ELDLM_NO_LOCK_DATA;
-               } else if (bits == new) {
-                       /*
-                        * This can be valid situation if CONVERT RPCs are
-                        * re-ordered. Just finish silently
-                        */
-                       LDLM_DEBUG(lock, "lock is converted already!");
-                       rc = ELDLM_OK;
-               } else {
-                       lock_res_and_lock(lock);
-                       if (ldlm_is_waited(lock))
-                               ldlm_del_waiting_lock(lock);
+       lock_res_and_lock(lock);
+       bits = lock->l_policy_data.l_inodebits.bits;
+       new_bits = dlm_req->lock_desc.l_policy_data.l_inodebits.bits;
 
-                       ldlm_clear_cbpending(lock);
-                       lock->l_policy_data.l_inodebits.cancel_bits = 0;
-                       ldlm_inodebits_drop(lock, bits & ~new);
+       if (ldlm_is_cancel(lock)) {
+               LDLM_DEBUG(lock, "convert on canceled lock!");
+               unlock_res_and_lock(lock);
+               GOTO(out_put, rc = ELDLM_NO_LOCK_DATA);
+       }
 
-                       ldlm_clear_blocking_data(lock);
-                       unlock_res_and_lock(lock);
+       if (dlm_req->lock_desc.l_req_mode != lock->l_granted_mode) {
+               LDLM_ERROR(lock, "lock mode differs!");
+               unlock_res_and_lock(lock);
+               GOTO(out_put, rc = -EPROTO);
+       }
 
-                       ldlm_reprocess_all(lock->l_resource, NULL);
-                       rc = ELDLM_OK;
-               }
+       if (bits == new_bits) {
+               /*
+                * This can be valid situation if CONVERT RPCs are
+                * re-ordered. Just finish silently
+                */
+               LDLM_DEBUG(lock, "lock is converted already!");
+               unlock_res_and_lock(lock);
+       } else {
+               if (ldlm_is_waited(lock))
+                       ldlm_del_waiting_lock(lock);
 
-               if (rc == ELDLM_OK) {
-                       dlm_rep->lock_handle = lock->l_remote_handle;
-                       ldlm_ibits_policy_local_to_wire(&lock->l_policy_data,
-                                       &dlm_rep->lock_desc.l_policy_data);
-               }
+               ldlm_clear_cbpending(lock);
+               lock->l_policy_data.l_inodebits.cancel_bits = 0;
+               ldlm_inodebits_drop(lock, bits & ~new_bits);
 
-               LDLM_DEBUG(lock, "server-side convert handler END, rc = %d",
-                          rc);
-               LDLM_LOCK_PUT(lock);
-       } else {
-               rc = ELDLM_NO_LOCK_DATA;
-               LDLM_DEBUG_NOLOCK("server-side convert handler END, rc = %d",
-                                 rc);
+               ldlm_clear_blocking_data(lock);
+               unlock_res_and_lock(lock);
+
+               ldlm_reprocess_all(lock->l_resource, NULL);
        }
 
+       dlm_rep->lock_handle = lock->l_remote_handle;
+       ldlm_ibits_policy_local_to_wire(&lock->l_policy_data,
+                                       &dlm_rep->lock_desc.l_policy_data);
+       rc = ELDLM_OK;
+       EXIT;
+out_put:
+       LDLM_DEBUG(lock, "server-side convert handler END, rc = %d", rc);
+       LDLM_LOCK_PUT(lock);
        req->rq_status = rc;
-
-       RETURN(0);
+       return 0;
 }
 
 /**
@@ -1673,9 +1682,16 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
        struct ldlm_resource *res, *pres = NULL;
        struct ldlm_lock *lock;
        int i, count, done = 0;
+       unsigned int size;
 
        ENTRY;
 
+       size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT);
+       if (size <= offsetof(struct ldlm_request, lock_handle) ||
+           (size - offsetof(struct ldlm_request, lock_handle)) /
+            sizeof(struct lustre_handle) < dlm_req->lock_count)
+               RETURN(0);
+
        count = dlm_req->lock_count ? dlm_req->lock_count : 1;
        if (first >= count)
                RETURN(0);
@@ -1696,6 +1712,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
        for (i = first; i < count; i++) {
                lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);
                if (!lock) {
+                       /* below message checked in replay-single.sh test_36 */
                        LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (cookie %llu)",
                                          dlm_req->lock_handle[i].cookie);
                        continue;
@@ -1728,11 +1745,14 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
 
                if ((flags & LATF_STATS) && ldlm_is_ast_sent(lock) &&
                    lock->l_blast_sent != 0) {
-                       time64_t delay = ktime_get_real_seconds() -
-                                        lock->l_blast_sent;
+                       time64_t now = ktime_get_real_seconds();
+                       timeout_t delay = 0;
+
+                       if (now > lock->l_blast_sent)
+                               delay = now - lock->l_blast_sent;
                        LDLM_DEBUG(lock,
-                                  "server cancels blocked lock after %llds",
-                                  (s64)delay);
+                                  "server cancels blocked lock after %ds",
+                                  delay);
                        at_measured(&lock->l_export->exp_bl_lock_at, delay);
                }
                ldlm_lock_cancel(lock);
@@ -1766,6 +1786,10 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
                RETURN(-EFAULT);
        }
 
+       if (req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT) <
+           offsetof(struct ldlm_request, lock_handle[1]))
+               RETURN(-EPROTO);
+
        if (req->rq_export && req->rq_export->exp_nid_stats &&
            req->rq_export->exp_nid_stats->nid_ldlm_stats)
                lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
@@ -1783,23 +1807,17 @@ int ldlm_handle_cancel(struct ptlrpc_request *req)
 #endif /* HAVE_SERVER_SUPPORT */
 
 /**
- * Callback handler for receiving incoming blocking ASTs.
- *
- * This can only happen on client side.
+ * Server may pass additional information about blocking lock.
+ * For IBITS locks it is conflicting bits which can be used for
+ * lock convert instead of cancel.
  */
-void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
-                            struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
+void ldlm_bl_desc2lock(const struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
 {
-       int do_ast;
-
-       ENTRY;
-
-       LDLM_DEBUG(lock, "client blocking AST callback handler");
+       struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
 
-       lock_res_and_lock(lock);
-
-       /* set bits to cancel for this lock for possible lock convert */
-       if (ns_is_client(ns) && (lock->l_resource->lr_type == LDLM_IBITS)) {
+       check_res_locked(lock->l_resource);
+       if (ns_is_client(ns) && ld &&
+           (lock->l_resource->lr_type == LDLM_IBITS)) {
                /*
                 * Lock description contains policy of blocking lock,
                 * and its cancel_bits is used to pass conflicting bits.
@@ -1811,18 +1829,43 @@ void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
                 * never use cancel bits from different resource, full cancel
                 * is to be used.
                 */
-               if (ld && ld->l_policy_data.l_inodebits.bits &&
+               if (ld->l_policy_data.l_inodebits.cancel_bits &&
                    ldlm_res_eq(&ld->l_resource.lr_name,
-                               &lock->l_resource->lr_name))
-                       lock->l_policy_data.l_inodebits.cancel_bits =
+                               &lock->l_resource->lr_name) &&
+                   !(ldlm_is_cbpending(lock) &&
+                     lock->l_policy_data.l_inodebits.cancel_bits == 0)) {
+                       /* always combine conflicting ibits */
+                       lock->l_policy_data.l_inodebits.cancel_bits |=
                                ld->l_policy_data.l_inodebits.cancel_bits;
-               /*
-                * if there is no valid ld and lock is cbpending already
-                * then cancel_bits should be kept, otherwise it is zeroed.
-                */
-               else if (!ldlm_is_cbpending(lock))
+               } else {
+                       /* If cancel_bits are not obtained or
+                        * if the lock is already CBPENDING and
+                        * has no cancel_bits set
+                        * - the full lock is to be cancelled
+                        */
                        lock->l_policy_data.l_inodebits.cancel_bits = 0;
+               }
        }
+}
+
+/**
+ * Callback handler for receiving incoming blocking ASTs.
+ *
+ * This can only happen on client side.
+ */
+void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
+                            struct ldlm_lock_desc *ld, struct ldlm_lock *lock)
+{
+       int do_ast;
+
+       ENTRY;
+
+       LDLM_DEBUG(lock, "client blocking AST callback handler");
+
+       lock_res_and_lock(lock);
+
+       /* get extra information from desc if any */
+       ldlm_bl_desc2lock(ld, lock);
        ldlm_set_cbpending(lock);
 
        do_ast = (!lock->l_readers && !lock->l_writers);
@@ -1846,17 +1889,31 @@ void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
        EXIT;
 }
 
+static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
+{
+       if (req->rq_no_reply)
+               return 0;
+
+       req->rq_status = rc;
+       if (!req->rq_packed_final) {
+               rc = lustre_pack_reply(req, 1, NULL, NULL);
+               if (rc)
+                       return rc;
+       }
+       return ptlrpc_reply(req);
+}
+
 /**
  * Callback handler for receiving incoming completion ASTs.
  *
  * This only can happen on client side.
  */
-static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
-                                   struct ldlm_namespace *ns,
-                                   struct ldlm_request *dlm_req,
-                                   struct ldlm_lock *lock)
+static int ldlm_handle_cp_callback(struct ptlrpc_request *req,
+                                    struct ldlm_namespace *ns,
+                                    struct ldlm_request *dlm_req,
+                                    struct ldlm_lock *lock)
 {
-       struct list_head ast_list;
+       LIST_HEAD(ast_list);
        int lvb_len;
        int rc = 0;
 
@@ -1864,13 +1921,13 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
 
        LDLM_DEBUG(lock, "client completion callback handler START");
 
-       INIT_LIST_HEAD(&ast_list);
        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
                long to = cfs_time_seconds(1);
 
+               ldlm_callback_reply(req, 0);
+
                while (to > 0) {
-                       set_current_state(TASK_INTERRUPTIBLE);
-                       schedule_timeout(to);
+                       schedule_timeout_interruptible(to);
                        if (ldlm_is_granted(lock) ||
                            ldlm_is_destroyed(lock))
                                break;
@@ -1911,6 +1968,12 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                lock_res_and_lock(lock);
        }
 
+       if (ldlm_is_failed(lock)) {
+               unlock_res_and_lock(lock);
+               LDLM_LOCK_RELEASE(lock);
+               RETURN(-EINVAL);
+       }
+
        if (ldlm_is_destroyed(lock) ||
            ldlm_is_granted(lock)) {
                /* b=11300: the lock has already been granted */
@@ -1944,6 +2007,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req,
                 * Let ldlm_cancel_lru() be fast.
                 */
                ldlm_lock_remove_from_lru(lock);
+               ldlm_bl_desc2lock(&dlm_req->lock_desc, lock);
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
                LDLM_DEBUG(lock, "completion AST includes blocking AST");
        }
@@ -1982,6 +2046,8 @@ out:
                wake_up(&lock->l_waitq);
        }
        LDLM_LOCK_RELEASE(lock);
+
+       return 0;
 }
 
 /**
@@ -1996,6 +2062,7 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
                                    struct ldlm_request *dlm_req,
                                    struct ldlm_lock *lock)
 {
+       struct ldlm_lock_desc *ld = &dlm_req->lock_desc;
        int rc = -ENOSYS;
 
        ENTRY;
@@ -2016,11 +2083,17 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
        if (lock->l_granted_mode == LCK_PW &&
            !lock->l_readers && !lock->l_writers &&
            ktime_after(ktime_get(),
-                       ktime_add(lock->l_last_used,
-                                 ktime_set(ns->ns_dirty_age_limit, 0)))) {
+                       ktime_add(lock->l_last_used, ns->ns_dirty_age_limit))) {
                unlock_res_and_lock(lock);
-               if (ldlm_bl_to_thread_lock(ns, NULL, lock))
-                       ldlm_handle_bl_callback(ns, NULL, lock);
+
+               /* For MDS glimpse it is always DOM lock, set corresponding
+                * cancel_bits to perform lock convert if needed
+                */
+               if (lock->l_resource->lr_type == LDLM_IBITS)
+                       ld->l_policy_data.l_inodebits.cancel_bits =
+                                                       MDS_INODELOCK_DOM;
+               if (ldlm_bl_to_thread_lock(ns, ld, lock))
+                       ldlm_handle_bl_callback(ns, ld, lock);
 
                EXIT;
                return;
@@ -2030,20 +2103,6 @@ static void ldlm_handle_gl_callback(struct ptlrpc_request *req,
        EXIT;
 }
 
-static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
-{
-       if (req->rq_no_reply)
-               return 0;
-
-       req->rq_status = rc;
-       if (!req->rq_packed_final) {
-               rc = lustre_pack_reply(req, 1, NULL, NULL);
-               if (rc)
-                       return rc;
-       }
-       return ptlrpc_reply(req);
-}
-
 static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
                               enum ldlm_cancel_flags cancel_flags)
 {
@@ -2084,7 +2143,7 @@ static inline void init_blwi(struct ldlm_bl_work_item *blwi,
        init_completion(&blwi->blwi_comp);
        INIT_LIST_HEAD(&blwi->blwi_head);
 
-       if (memory_pressure_get())
+       if (current->flags & PF_MEMALLOC)
                blwi->blwi_mem_pressure = 1;
 
        blwi->blwi_ns = ns;
@@ -2092,8 +2151,7 @@ static inline void init_blwi(struct ldlm_bl_work_item *blwi,
        if (ld != NULL)
                blwi->blwi_ld = *ld;
        if (count) {
-               list_add(&blwi->blwi_head, cancels);
-               list_del_init(cancels);
+               list_splice_init(cancels, &blwi->blwi_head);
                blwi->blwi_count = count;
        } else {
                blwi->blwi_lock = lock;
@@ -2173,7 +2231,7 @@ static int ldlm_handle_setinfo(struct ptlrpc_request *req)
 
        ENTRY;
 
-       DEBUG_REQ(D_HSM, req, "%s: handle setinfo\n", obd->obd_name);
+       DEBUG_REQ(D_HSM, req, "%s: handle setinfo", obd->obd_name);
 
        req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
 
@@ -2202,7 +2260,7 @@ static int ldlm_handle_setinfo(struct ptlrpc_request *req)
                                        KEY_HSM_COPYTOOL_SEND,
                                        vallen, val, NULL);
        else
-               DEBUG_REQ(D_WARNING, req, "ignoring unknown key %s", key);
+               DEBUG_REQ(D_WARNING, req, "ignoring unknown key '%s'", key);
 
        return rc;
 }
@@ -2212,9 +2270,9 @@ static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
                                        const struct lustre_handle *handle)
 {
        DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
-                 "%s: [nid %s] [rc %d] [lock %#llx]",
-                 msg, libcfs_id2str(req->rq_peer), rc,
-                 handle ? handle->cookie : 0);
+                 "%s, NID=%s lock=%#llx: rc = %d",
+                 msg, libcfs_id2str(req->rq_peer),
+                 handle ? handle->cookie : 0, rc);
        if (req->rq_no_reply)
                CWARN("No reply was sent, maybe cause b=21636.\n");
        else if (rc)
@@ -2379,8 +2437,9 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
        case LDLM_CP_CALLBACK:
                CDEBUG(D_INODE, "completion ast\n");
                req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
-               ldlm_callback_reply(req, 0);
-               ldlm_handle_cp_callback(req, ns, dlm_req, lock);
+               rc = ldlm_handle_cp_callback(req, ns, dlm_req, lock);
+               if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE))
+                       ldlm_callback_reply(req, rc);
                break;
        case LDLM_GL_CALLBACK:
                CDEBUG(D_INODE, "glimpse ast\n");
@@ -2493,7 +2552,7 @@ static int ldlm_cancel_hpreq_lock_match(struct ptlrpc_request *req,
                if (lustre_handle_equal(&dlm_req->lock_handle[i],
                                        &lockh)) {
                        DEBUG_REQ(D_RPCTRACE, req,
-                                 "Prio raised by lock %#llx.", lockh.cookie);
+                                 "Prio raised by lock %#llx", lockh.cookie);
                        rc = 1;
                        break;
                }
@@ -2507,6 +2566,7 @@ static int ldlm_cancel_hpreq_check(struct ptlrpc_request *req)
        struct ldlm_request *dlm_req;
        int rc = 0;
        int i;
+       unsigned int size;
 
        ENTRY;
 
@@ -2518,6 +2578,12 @@ static int ldlm_cancel_hpreq_check(struct ptlrpc_request *req)
        if (dlm_req == NULL)
                RETURN(-EFAULT);
 
+       size = req_capsule_get_size(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT);
+       if (size <= offsetof(struct ldlm_request, lock_handle) ||
+           (size - offsetof(struct ldlm_request, lock_handle)) /
+            sizeof(struct lustre_handle) < dlm_req->lock_count)
+               RETURN(-EPROTO);
+
        for (i = 0; i < dlm_req->lock_count; i++) {
                struct ldlm_lock *lock;
 
@@ -2615,11 +2681,10 @@ static int ldlm_revoke_lock_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
 
 void ldlm_revoke_export_locks(struct obd_export *exp)
 {
-       struct list_head rpc_list;
+       LIST_HEAD(rpc_list);
 
        ENTRY;
 
-       INIT_LIST_HEAD(&rpc_list);
        cfs_hash_for_each_nolock(exp->exp_lock_hash,
                                 ldlm_revoke_lock_cb, &rpc_list, 0);
        ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
@@ -2740,6 +2805,9 @@ static int ldlm_bl_thread_need_create(struct ldlm_bl_pool *blp,
 static int ldlm_bl_thread_blwi(struct ldlm_bl_pool *blp,
                               struct ldlm_bl_work_item *blwi)
 {
+       /* '1' for consistency with code that checks !mpflag to restore */
+       unsigned int mpflags = 1;
+
        ENTRY;
 
        if (blwi->blwi_ns == NULL)
@@ -2747,7 +2815,7 @@ static int ldlm_bl_thread_blwi(struct ldlm_bl_pool *blp,
                RETURN(LDLM_ITER_STOP);
 
        if (blwi->blwi_mem_pressure)
-               memory_pressure_set();
+               mpflags = memalloc_noreclaim_save();
 
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
 
@@ -2769,7 +2837,7 @@ static int ldlm_bl_thread_blwi(struct ldlm_bl_pool *blp,
                                        blwi->blwi_lock);
        }
        if (blwi->blwi_mem_pressure)
-               memory_pressure_clr();
+               memalloc_noreclaim_restore(mpflags);
 
        if (blwi->blwi_flags & LCF_ASYNC)
                OBD_FREE(blwi, sizeof(*blwi));
@@ -2836,7 +2904,6 @@ static int ldlm_bl_thread_main(void *arg)
        /* cannot use bltd after this, it is only on caller's stack */
 
        while (1) {
-               struct l_wait_info lwi = { 0 };
                struct ldlm_bl_work_item *blwi = NULL;
                struct obd_export *exp = NULL;
                int rc;
@@ -2844,10 +2911,9 @@ static int ldlm_bl_thread_main(void *arg)
                rc = ldlm_bl_get_work(blp, &blwi, &exp);
 
                if (rc == 0)
-                       l_wait_event_exclusive(blp->blp_waitq,
-                                              ldlm_bl_get_work(blp, &blwi,
-                                                               &exp),
-                                              &lwi);
+                       wait_event_idle_exclusive(blp->blp_waitq,
+                                                 ldlm_bl_get_work(blp, &blwi,
+                                                                  &exp));
                atomic_inc(&blp->blp_busy_threads);
 
                if (ldlm_bl_thread_need_create(blp, blwi))