Whamcloud - gitweb
LU-17705 ptlrpc: replace synchronize_rcu() with rcu_barrier()
[fs/lustre-release.git] / lustre / ldlm / ldlm_lockd.c
index 0315364..c4e553c 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/ldlm/ldlm_lockd.c
  *
@@ -84,27 +83,29 @@ static inline timeout_t ldlm_get_rq_timeout(void)
 }
 
 struct ldlm_bl_pool {
-       spinlock_t blp_lock;
+       spinlock_t              blp_lock;
 
        /*
         * blp_prio_list is used for callbacks that should be handled
         * as a priority. It is used for LDLM_FL_DISCARD_DATA requests.
         * see b=13843
         */
-       struct list_head blp_prio_list;
+       struct list_head        blp_prio_list;
 
        /*
         * blp_list is used for all other callbacks which are likely
         * to take longer to process.
         */
-       struct list_head blp_list;
-
-       wait_queue_head_t blp_waitq;
-       struct completion blp_comp;
-       atomic_t blp_num_threads;
-       atomic_t blp_busy_threads;
-       int blp_min_threads;
-       int blp_max_threads;
+       struct list_head        blp_list;
+
+       wait_queue_head_t       blp_waitq;
+       struct completion       blp_comp;
+       atomic_t                blp_num_threads;
+       atomic_t                blp_busy_threads;
+       int                     blp_min_threads;
+       int                     blp_max_threads;
+       int                     blp_total_locks;
+       int                     blp_total_blwis;
 };
 
 struct ldlm_bl_work_item {
@@ -203,8 +204,8 @@ static int expired_lock_main(void *arg)
                        struct obd_export *export;
                        struct ldlm_lock *lock;
 
-                       lock = list_entry(expired->next, struct ldlm_lock,
-                                         l_pending_chain);
+                       lock = list_first_entry(expired, struct ldlm_lock,
+                                               l_pending_chain);
                        if ((void *)lock < LP_POISON + PAGE_SIZE &&
                            (void *)lock >= LP_POISON) {
                                spin_unlock_bh(&waiting_locks_spinlock);
@@ -240,8 +241,8 @@ static int expired_lock_main(void *arg)
                        spin_unlock_bh(&waiting_locks_spinlock);
 
                        /* Check if we need to prolong timeout */
-                       if (!OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
-                           lock->l_callback_timestamp != 0 && /* not AST error */
+                       if (!CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT) &&
+                           lock->l_callback_timestamp != 0 && /* not AST err */
                            ldlm_lock_busy(lock)) {
                                LDLM_DEBUG(lock, "prolong the busy lock");
                                lock_res_and_lock(lock);
@@ -255,11 +256,12 @@ static int expired_lock_main(void *arg)
 
                                LDLM_ERROR(lock,
                                           "lock callback timer expired after %llds: evicting client at %s ",
-                                          ktime_get_real_seconds() -
+                                          ktime_get_seconds() -
                                           lock->l_blast_sent,
                                           obd_export_nid2str(export));
                                ldlm_lock_to_ns(lock)->ns_timeouts++;
-                               do_dump++;
+                               if (do_dump_on_eviction(export->exp_obd))
+                                       do_dump++;
                                class_fail_export(export);
                        }
                        class_export_lock_put(export, lock);
@@ -273,7 +275,7 @@ static int expired_lock_main(void *arg)
                }
                spin_unlock_bh(&waiting_locks_spinlock);
 
-               if (do_dump && obd_dump_on_eviction) {
+               if (do_dump) {
                        CERROR("dump the log upon eviction\n");
                        libcfs_debug_dumplog();
                }
@@ -322,8 +324,8 @@ static void waiting_locks_callback(TIMER_DATA_TYPE unused)
 
        spin_lock_bh(&waiting_locks_spinlock);
        while (!list_empty(&waiting_locks_list)) {
-               lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
-                                 l_pending_chain);
+               lock = list_first_entry(&waiting_locks_list, struct ldlm_lock,
+                                       l_pending_chain);
                if (lock->l_callback_timestamp > ktime_get_seconds() ||
                    lock->l_req_mode == LCK_GROUP)
                        break;
@@ -352,8 +354,8 @@ static void waiting_locks_callback(TIMER_DATA_TYPE unused)
                time64_t now = ktime_get_seconds();
                timeout_t delta = 0;
 
-               lock = list_entry(waiting_locks_list.next, struct ldlm_lock,
-                                 l_pending_chain);
+               lock = list_first_entry(&waiting_locks_list, struct ldlm_lock,
+                                       l_pending_chain);
                if (lock->l_callback_timestamp - now > 0)
                        delta = lock->l_callback_timestamp - now;
                mod_timer(&waiting_locks_timer,
@@ -377,22 +379,23 @@ static void waiting_locks_callback(TIMER_DATA_TYPE unused)
 static int __ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t delay)
 {
        unsigned long timeout_jiffies = jiffies;
-       time64_t now = ktime_get_seconds();
        time64_t deadline;
        timeout_t timeout;
 
+       lock->l_blast_sent = ktime_get_seconds();
        if (!list_empty(&lock->l_pending_chain))
                return 0;
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
-           OBD_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
+       if (CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_NOTIMEOUT) ||
+           CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_HPREQ_TIMEOUT))
                delay = 1;
 
-       deadline = now + delay;
+       deadline = lock->l_blast_sent + delay;
        if (likely(deadline > lock->l_callback_timestamp))
                lock->l_callback_timestamp = deadline;
 
-       timeout = clamp_t(timeout_t, lock->l_callback_timestamp - now,
+       timeout = clamp_t(timeout_t,
+                         lock->l_callback_timestamp - lock->l_blast_sent,
                          0, delay);
        timeout_jiffies += cfs_time_seconds(timeout);
 
@@ -435,7 +438,8 @@ static void ldlm_add_blocked_lock(struct ldlm_lock *lock)
 
 static int ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t timeout)
 {
-       int ret;
+       struct obd_device *obd = NULL;
+       int at_off, ret;
 
        /* NB: must be called with hold of lock_res_and_lock() */
        LASSERT(ldlm_is_res_locked(lock));
@@ -445,9 +449,12 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t timeout)
         * Do not put cross-MDT lock in the waiting list, since we
         * will not evict it due to timeout for now
         */
-       if (lock->l_export != NULL &&
-           (exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS))
-               return 0;
+       if (lock->l_export != NULL) {
+               obd = lock->l_export->exp_obd;
+
+               if (exp_connect_flags(lock->l_export) & OBD_CONNECT_MDS_MDS)
+                       return 0;
+       }
 
        spin_lock_bh(&waiting_locks_spinlock);
        if (ldlm_is_cancel(lock)) {
@@ -462,13 +469,12 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t timeout)
                LDLM_ERROR(lock, "not waiting on destroyed lock (b=5653)");
                if (ktime_get_seconds() > next) {
                        next = ktime_get_seconds() + 14400;
-                       libcfs_debug_dumpstack(NULL);
+                       dump_stack();
                }
                return 0;
        }
 
        ldlm_set_waited(lock);
-       lock->l_blast_sent = ktime_get_real_seconds();
        ret = __ldlm_add_waiting_lock(lock, timeout);
        if (ret) {
                /*
@@ -482,9 +488,10 @@ static int ldlm_add_waiting_lock(struct ldlm_lock *lock, timeout_t timeout)
        if (ret)
                ldlm_add_blocked_lock(lock);
 
+       at_off = obd_at_off(obd);
        LDLM_DEBUG(lock, "%sadding to wait list(timeout: %d, AT: %s)",
                   ret == 0 ? "not re-" : "", timeout,
-                  AT_OFF ? "off" : "on");
+                  at_off ? "off" : "on");
        return ret;
 }
 
@@ -509,7 +516,7 @@ static int __ldlm_del_waiting_lock(struct ldlm_lock *lock)
                /* Removing the head of the list, adjust timer. */
                if (list_next == &waiting_locks_list) {
                        /* No more, just cancel. */
-                       del_timer(&waiting_locks_timer);
+                       timer_delete(&waiting_locks_timer);
                } else {
                        time64_t now = ktime_get_seconds();
                        struct ldlm_lock *next;
@@ -596,7 +603,7 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, timeout_t timeout)
        __ldlm_add_waiting_lock(lock, timeout);
        spin_unlock_bh(&waiting_locks_spinlock);
 
-       LDLM_DEBUG(lock, "refreshed");
+       LDLM_DEBUG(lock, "refreshed to %ds", timeout);
        return 1;
 }
 EXPORT_SYMBOL(ldlm_refresh_waiting_lock);
@@ -629,8 +636,9 @@ int ldlm_refresh_waiting_lock(struct ldlm_lock *lock, timeout_t timeout)
 timeout_t ldlm_bl_timeout(struct ldlm_lock *lock)
 {
        timeout_t timeout;
+       struct obd_device *obd = lock->l_export->exp_obd;
 
-       if (AT_OFF)
+       if (obd_at_off(obd))
                return obd_timeout / 2;
 
        /*
@@ -639,13 +647,66 @@ timeout_t ldlm_bl_timeout(struct ldlm_lock *lock)
         * It would be nice to have some kind of "early reply" mechanism for
         * lock callbacks too...
         */
-       timeout = at_get(&lock->l_export->exp_bl_lock_at);
+       timeout = obd_at_get(obd, &lock->l_export->exp_bl_lock_at);
        return max_t(timeout_t, timeout + (timeout >> 1),
-                    (timeout_t)ldlm_enqueue_min);
+                    (timeout_t)obd_get_ldlm_enqueue_min(obd));
 }
 EXPORT_SYMBOL(ldlm_bl_timeout);
 
 /**
+ * Calculate the per-export Blocking timeout by the given RPC (covering the
+ * reply to this RPC and the next RPC). The next RPC could be still not CANCEL,
+ * but having the lock refresh mechanism it is enough.
+ *
+ * Used for lock refresh timeout when we are in the middle of the process -
+ * BL AST is sent, CANCEL is ahead - it is still 1 reply for the current RPC
+ * and at least 1 RPC (which will trigger another refresh if it will be not
+ * CANCEL) - but more accurate than ldlm_bl_timeout as the timeout taken
+ * from the RPC (i.e. the view of the client on the current AT) is taken
+ * into account.
+ *
+ * \param[in] req     req which export needs the timeout calculation
+ *
+ * \retval            timeout in seconds to wait for the next client's RPC
+ */
+timeout_t ldlm_bl_timeout_by_rpc(struct ptlrpc_request *req)
+{
+       struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
+       timeout_t timeout, req_timeout, at_timeout, netl;
+       struct obd_device *obd = req->rq_export->exp_obd;
+
+       if (obd_at_off(obd))
+               return obd_timeout / 2;
+
+       /* A blocked lock means somebody in the cluster is waiting, and we
+        * should not consider the worst ever case, consisting of a chain of
+        * failures on each step, however this timeout should survive a
+        * recovery of at least 1 failure, let this one to be the worst one:
+        * in case a server NID is dead first re-connect is done through the
+        * same router and also times out.
+        *
+        * Either this on the next RPC times out, take the max.
+        * Considering the current RPC, take just the left time.
+        */
+       netl = obd_at_get(obd,
+                         &req->rq_export->exp_imp_reverse->imp_at.iat_net_latency);
+       req_timeout = req->rq_deadline - ktime_get_real_seconds() + netl;
+       at_timeout = at_est2timeout(obd_at_get(obd, &svcpt->scp_at_estimate))
+                                   + netl;
+       req_timeout = max(req_timeout, at_timeout);
+
+       /* Take 1 re-connect failure and 1 re-connect success into account. */
+       timeout = at_timeout + INITIAL_CONNECT_TIMEOUT + netl + req_timeout;
+
+       /* Client's timeout is calculated as at_est2timeout(), let's be a bit
+        * more conservative than client
+        */
+       return max(timeout + (timeout >> 4),
+                  (timeout_t)obd_get_ldlm_enqueue_min(obd));
+}
+EXPORT_SYMBOL(ldlm_bl_timeout_by_rpc);
+
+/**
  * Perform lock cleanup if AST sending failed.
  */
 static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,
@@ -679,14 +740,14 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock,
                                 struct ptlrpc_request *req, int rc,
                                 const char *ast_type)
 {
-       struct lnet_process_id peer = req->rq_import->imp_connection->c_peer;
+       struct lnet_processid *peer = &req->rq_import->imp_connection->c_peer;
 
        if (!req->rq_replied || (rc && rc != -EINVAL)) {
                if (ldlm_is_cancel(lock)) {
                        LDLM_DEBUG(lock,
                                   "%s AST (req@%p x%llu) timeout from nid %s, but cancel was received (AST reply lost?)",
                                   ast_type, req, req->rq_xid,
-                                  libcfs_nid2str(peer.nid));
+                                  libcfs_nidstr(&peer->nid));
                        ldlm_lock_cancel(lock);
                        rc = -ERESTART;
                } else if (rc == -ENODEV || rc == -ESHUTDOWN ||
@@ -704,13 +765,13 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock,
                         * In all such cases errors are ignored.
                         */
                        LDLM_DEBUG(lock,
-                                  "%s AST can't be sent due to a server %s failure or umount process: rc = %d\n",
+                                  "%s AST can't be sent due to a server %s failure or umount process: rc = %d",
                                    ast_type,
                                     req->rq_import->imp_obd->obd_name, rc);
                } else {
                        LDLM_ERROR(lock,
                                   "client (nid %s) %s %s AST (req@%p x%llu status %d rc %d), evict it",
-                                  libcfs_nid2str(peer.nid),
+                                  libcfs_nidstr(&peer->nid),
                                   req->rq_replied ? "returned error from" :
                                   "failed to reply to",
                                   ast_type, req, req->rq_xid,
@@ -727,7 +788,7 @@ static int ldlm_handle_ast_error(struct ldlm_lock *lock,
 
                LDLM_DEBUG(lock,
                           "client (nid %s) returned %d from %s AST (req@%p x%llu) - normal race",
-                          libcfs_nid2str(peer.nid),
+                          libcfs_nidstr(&peer->nid),
                           req->rq_repmsg ?
                           lustre_msg_get_status(req->rq_repmsg) : -1,
                           ast_type, req, req->rq_xid);
@@ -886,6 +947,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
        struct ptlrpc_request  *req;
        int instant_cancel = 0;
        int rc = 0;
+       struct obd_device *obd;
 
        ENTRY;
 
@@ -893,14 +955,16 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
                /* Don't need to do anything here. */
                RETURN(0);
 
-       if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_BL_AST)) {
+       if (CFS_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_BL_AST)) {
                LDLM_DEBUG(lock, "dropping BL AST");
                RETURN(0);
        }
 
        LASSERT(lock);
        LASSERT(data != NULL);
-       if (lock->l_export->exp_obd->obd_recovering != 0)
+
+       obd = lock->l_export->exp_obd;
+       if (obd->obd_recovering != 0)
                LDLM_ERROR(lock, "BUG 6063: lock collide during recovery");
 
        ldlm_lock_reorder_req(lock);
@@ -946,7 +1010,8 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
        body->lock_handle[0] = lock->l_remote_handle;
        body->lock_handle[1].cookie = lock->l_handle.h_cookie;
        body->lock_desc = *desc;
-       body->lock_flags |= ldlm_flags_to_wire(lock->l_flags & LDLM_FL_AST_MASK);
+       body->lock_flags |= ldlm_flags_to_wire(lock->l_flags &
+                                              LDLM_FL_AST_MASK);
 
        LDLM_DEBUG(lock, "server preparing blocking AST");
 
@@ -969,7 +1034,7 @@ int ldlm_server_blocking_ast(struct ldlm_lock *lock,
 
        req->rq_send_state = LUSTRE_IMP_FULL;
        /* ptlrpc_request_alloc_pack already set timeout */
-       if (AT_OFF)
+       if (obd_at_off(obd))
                req->rq_timeout = ldlm_get_rq_timeout();
 
        if (lock->l_export && lock->l_export->exp_nid_stats &&
@@ -998,17 +1063,19 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
        int instant_cancel = 0;
        int rc = 0;
        int lvb_len;
+       struct obd_device *obd;
 
        ENTRY;
 
        LASSERT(lock != NULL);
        LASSERT(data != NULL);
 
-       if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_CP_AST)) {
+       if (CFS_FAIL_PRECHECK(OBD_FAIL_LDLM_SRV_CP_AST)) {
                LDLM_DEBUG(lock, "dropping CP AST");
                RETURN(0);
        }
 
+       obd = lock->l_export->exp_obd;
        req = ptlrpc_request_alloc(lock->l_export->exp_imp_reverse,
                                   &RQF_LDLM_CP_CALLBACK);
        if (req == NULL)
@@ -1045,6 +1112,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
        ldlm_lock2desc(lock, &body->lock_desc);
        if (lvb_len > 0) {
                void *lvb = req_capsule_client_get(&req->rq_pill, &RMF_DLM_LVB);
+
                lvb_len = ldlm_lvbo_fill(lock, lvb, &lvb_len);
                if (lvb_len < 0) {
                        /*
@@ -1070,7 +1138,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 
        req->rq_send_state = LUSTRE_IMP_FULL;
        /* ptlrpc_request_pack already set timeout */
-       if (AT_OFF)
+       if (obd_at_off(obd))
                req->rq_timeout = ldlm_get_rq_timeout();
 
        /* We only send real blocking ASTs after the lock is granted */
@@ -1116,6 +1184,7 @@ int ldlm_server_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 
        RETURN(lvb_len < 0 ? lvb_len : rc);
 }
+EXPORT_SYMBOL(ldlm_server_completion_ast);
 
 /**
  * Server side ->l_glimpse_ast handler for client locks.
@@ -1131,6 +1200,7 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
        struct ldlm_cb_async_args *ca;
        int rc;
        struct req_format *req_fmt;
+       struct obd_device *obd = lock->l_export->exp_obd;
 
        ENTRY;
 
@@ -1172,7 +1242,7 @@ int ldlm_server_glimpse_ast(struct ldlm_lock *lock, void *data)
 
        req->rq_send_state = LUSTRE_IMP_FULL;
        /* ptlrpc_request_alloc_pack already set timeout */
-       if (AT_OFF)
+       if (obd_at_off(obd))
                req->rq_timeout = ldlm_get_rq_timeout();
 
        req->rq_interpret_reply = ldlm_cb_interpret;
@@ -1200,7 +1270,7 @@ int ldlm_glimpse_locks(struct ldlm_resource *res,
        rc = ldlm_run_ast_work(ldlm_res_to_ns(res), gl_work_list,
                               LDLM_WORK_GL_AST);
        if (rc == -ERESTART)
-               ldlm_reprocess_all(res, NULL);
+               ldlm_reprocess_all(res, 0);
 
        RETURN(rc);
 }
@@ -1227,10 +1297,10 @@ EXPORT_SYMBOL(ldlm_request_lock);
  * Main server-side entry point into LDLM for enqueue. This is called by ptlrpc
  * service threads to carry out client lock enqueueing requests.
  */
-int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
-                        struct ptlrpc_request *req,
-                        const struct ldlm_request *dlm_req,
-                        const struct ldlm_callback_suite *cbs)
+int ldlm_handle_enqueue(struct ldlm_namespace *ns,
+                       struct req_capsule *pill,
+                       const struct ldlm_request *dlm_req,
+                       const struct ldlm_callback_suite *cbs)
 {
        struct ldlm_reply *dlm_rep;
        __u64 flags;
@@ -1239,23 +1309,27 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
        void *cookie = NULL;
        int rc = 0;
        struct ldlm_resource *res = NULL;
+       struct ptlrpc_request *req = pill->rc_req;
        const struct lu_env *env = req->rq_svc_thread->t_env;
 
        ENTRY;
 
        LDLM_DEBUG_NOLOCK("server-side enqueue handler START");
 
-       ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF, LATF_SKIP);
-       flags = ldlm_flags_from_wire(dlm_req->lock_flags);
+       LASSERT(req && req->rq_export);
 
-       LASSERT(req->rq_export);
+       if (req_capsule_ptlreq(pill))
+               ldlm_request_cancel(req, dlm_req, LDLM_ENQUEUE_CANCEL_OFF,
+                                   LATF_SKIP);
+
+       flags = ldlm_flags_from_wire(dlm_req->lock_flags);
 
        /* for intent enqueue the stat will be updated inside intent policy */
        if (ptlrpc_req2svc(req)->srv_stats != NULL &&
            !(dlm_req->lock_flags & LDLM_FL_HAS_INTENT))
                ldlm_svc_get_eopc(dlm_req, ptlrpc_req2svc(req)->srv_stats);
 
-       if (req->rq_export && req->rq_export->exp_nid_stats &&
+       if (req->rq_export->exp_nid_stats &&
            req->rq_export->exp_nid_stats->nid_ldlm_stats)
                lprocfs_counter_incr(req->rq_export->exp_nid_stats->nid_ldlm_stats,
                                     LDLM_ENQUEUE - LDLM_FIRST_OPC);
@@ -1283,7 +1357,6 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                 * In the function below, .hs_keycmp resolves to
                 * ldlm_export_lock_keycmp()
                 */
-               /* coverity[overrun-buffer-val] */
                lock = cfs_hash_lookup(req->rq_export->exp_lock_hash,
                                       (void *)&dlm_req->lock_handle[0]);
                if (lock != NULL) {
@@ -1331,7 +1404,7 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                }
        }
 
-       OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
+       CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_ENQUEUE_BLOCKED, obd_timeout * 2);
        /*
         * Don't enqueue a lock onto the export if it is been disonnected
         * due to eviction (b=3822) or server umount (b=24324).
@@ -1361,25 +1434,29 @@ int ldlm_handle_enqueue0(struct ldlm_namespace *ns,
                                     dlm_req->lock_desc.l_resource.lr_type,
                                     &dlm_req->lock_desc.l_policy_data,
                                     &lock->l_policy_data);
-       if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT)
+       if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) {
                lock->l_req_extent = lock->l_policy_data.l_extent;
-       else if (dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS)
+       } else if (dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS) {
                lock->l_policy_data.l_inodebits.try_bits =
                        dlm_req->lock_desc.l_policy_data.l_inodebits.try_bits;
+               lock->l_policy_data.l_inodebits.li_gid =
+                       dlm_req->lock_desc.l_policy_data.l_inodebits.li_gid;
+       }
 
 existing_lock:
        cookie = req;
        if (!(flags & LDLM_FL_HAS_INTENT)) {
                /* based on the assumption that lvb size never changes during
                 * resource life time otherwise it need resource->lr_lock's
-                * protection */
-               req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB,
+                * protection
+                */
+               req_capsule_set_size(pill, &RMF_DLM_LVB,
                                     RCL_SERVER, ldlm_lvbo_size(lock));
 
-               if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
+               if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_EXTENT_ERR))
                        GOTO(out, rc = -ENOMEM);
 
-               rc = req_capsule_server_pack(&req->rq_pill);
+               rc = req_capsule_server_pack(pill);
                if (rc)
                        GOTO(out, rc);
        }
@@ -1391,13 +1468,13 @@ existing_lock:
                GOTO(out, err);
        }
 
-       dlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+       dlm_rep = req_capsule_server_get(pill, &RMF_DLM_REP);
 
        ldlm_lock2desc(lock, &dlm_rep->lock_desc);
        ldlm_lock2handle(lock, &dlm_rep->lock_handle);
 
-       if (lock && lock->l_resource->lr_type == LDLM_EXTENT)
-               OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 6);
+       if (lock->l_resource->lr_type == LDLM_EXTENT)
+               CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 6);
 
        /*
         * We never send a blocking AST until the lock is granted, but
@@ -1418,7 +1495,7 @@ existing_lock:
         * Cancel it now instead.
         */
        if (unlikely(req->rq_export->exp_disconnected ||
-                    OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT))) {
+                    CFS_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT))) {
                LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);
                rc = -ENOTCONN;
        } else if (ldlm_is_ast_sent(lock)) {
@@ -1462,11 +1539,14 @@ existing_lock:
 
        EXIT;
 out:
-       req->rq_status = rc ?: err; /* return either error - b=11190 */
-       if (!req->rq_packed_final) {
-               err = lustre_pack_reply(req, 1, NULL, NULL);
-               if (rc == 0)
-                       rc = err;
+       if (req_capsule_ptlreq(pill)) {
+               req->rq_status = rc ?: err; /* return either error - b=11190 */
+               if (!req->rq_packed_final) {
+                       int rc1 = lustre_pack_reply(req, 1, NULL, NULL);
+
+                       if (rc == 0)
+                               rc = rc1;
+               }
        }
 
        /*
@@ -1479,18 +1559,17 @@ out:
                           err, rc);
 
                if (rc == 0 &&
-                   req_capsule_has_field(&req->rq_pill, &RMF_DLM_LVB,
+                   req_capsule_has_field(pill, &RMF_DLM_LVB,
                                          RCL_SERVER) &&
                    ldlm_lvbo_size(lock) > 0) {
                        void *buf;
                        int buflen;
 
 retry:
-                       buf = req_capsule_server_get(&req->rq_pill,
-                                                    &RMF_DLM_LVB);
-                       LASSERTF(buf != NULL, "req %p, lock %p\n", req, lock);
-                       buflen = req_capsule_get_size(&req->rq_pill,
-                                       &RMF_DLM_LVB, RCL_SERVER);
+                       buf = req_capsule_server_get(pill, &RMF_DLM_LVB);
+                       LASSERTF(buf != NULL, "req %px, lock %px\n", req, lock);
+                       buflen = req_capsule_get_size(pill, &RMF_DLM_LVB,
+                                                     RCL_SERVER);
                        /*
                         * non-replayed lock, delayed lvb init may
                         * need to be occur now
@@ -1500,13 +1579,12 @@ retry:
 
                                rc2 = ldlm_lvbo_fill(lock, buf, &buflen);
                                if (rc2 >= 0) {
-                                       req_capsule_shrink(&req->rq_pill,
-                                                          &RMF_DLM_LVB,
+                                       req_capsule_shrink(pill, &RMF_DLM_LVB,
                                                           rc2, RCL_SERVER);
                                } else if (rc2 == -ERANGE) {
                                        rc2 = req_capsule_server_grow(
-                                                       &req->rq_pill,
-                                                       &RMF_DLM_LVB, buflen);
+                                                       pill, &RMF_DLM_LVB,
+                                                       buflen);
                                        if (!rc2) {
                                                goto retry;
                                        } else {
@@ -1516,8 +1594,7 @@ retry:
                                                 * to client.
                                                 */
                                                req_capsule_shrink(
-                                                       &req->rq_pill,
-                                                       &RMF_DLM_LVB, 0,
+                                                       pill, &RMF_DLM_LVB, 0,
                                                        RCL_SERVER);
                                        }
                                } else {
@@ -1526,8 +1603,7 @@ retry:
                        } else if (flags & LDLM_FL_REPLAY) {
                                /* no LVB resend upon replay */
                                if (buflen > 0)
-                                       req_capsule_shrink(&req->rq_pill,
-                                                          &RMF_DLM_LVB,
+                                       req_capsule_shrink(pill, &RMF_DLM_LVB,
                                                           0, RCL_SERVER);
                                else
                                        rc = buflen;
@@ -1544,13 +1620,15 @@ retry:
                                ldlm_resource_unlink_lock(lock);
                                ldlm_lock_destroy_nolock(lock);
                                unlock_res_and_lock(lock);
-
                        }
+                       ldlm_reprocess_all(lock->l_resource,
+                                          lock->l_policy_data.l_inodebits.bits);
                }
 
                if (!err && !ldlm_is_cbpending(lock) &&
                    dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)
-                       ldlm_reprocess_all(lock->l_resource, lock);
+                       ldlm_reprocess_all(lock->l_resource,
+                                          lock->l_policy_data.l_inodebits.bits);
 
                LDLM_LOCK_RELEASE(lock);
        }
@@ -1560,6 +1638,7 @@ retry:
 
        return rc;
 }
+EXPORT_SYMBOL(ldlm_handle_enqueue);
 
 /*
  * Clear the blocking lock, the race is possible between ldlm_handle_convert0()
@@ -1587,9 +1666,7 @@ void ldlm_clear_blocking_data(struct ldlm_lock *lock)
        ldlm_clear_blocking_lock(lock);
 }
 
-/**
- * Main LDLM entry point for server code to process lock conversion requests.
- */
+/* Main LDLM entry point for server code to process lock conversion requests */
 int ldlm_handle_convert0(struct ptlrpc_request *req,
                         const struct ldlm_request *dlm_req)
 {
@@ -1656,7 +1733,10 @@ int ldlm_handle_convert0(struct ptlrpc_request *req,
                ldlm_clear_blocking_data(lock);
                unlock_res_and_lock(lock);
 
-               ldlm_reprocess_all(lock->l_resource, NULL);
+               /* All old bits should be reprocessed to send new BL AST if
+                * it wasn't sent earlier due to LDLM_FL_AST_SENT bit set.
+                */
+               ldlm_reprocess_all(lock->l_resource, bits);
        }
 
        dlm_rep->lock_handle = lock->l_remote_handle;
@@ -1730,7 +1810,7 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
                 */
                if (res != pres) {
                        if (pres != NULL) {
-                               ldlm_reprocess_all(pres, NULL);
+                               ldlm_reprocess_all(pres, 0);
                                LDLM_RESOURCE_DELREF(pres);
                                ldlm_resource_putref(pres);
                        }
@@ -1749,19 +1829,21 @@ int ldlm_request_cancel(struct ptlrpc_request *req,
                    lock->l_blast_sent != 0) {
                        timeout_t delay = 0;
 
-                       if (ktime_get_real_seconds() > lock->l_blast_sent)
-                               delay = ktime_get_real_seconds() -
+                       if (ktime_get_seconds() > lock->l_blast_sent)
+                               delay = ktime_get_seconds() -
                                        lock->l_blast_sent;
                        LDLM_DEBUG(lock,
                                   "server cancels blocked lock after %ds",
                                   delay);
-                       at_measured(&lock->l_export->exp_bl_lock_at, delay);
+                       obd_at_measure(lock->l_export->exp_obd,
+                                      &lock->l_export->exp_bl_lock_at,
+                                      delay);
                }
                ldlm_lock_cancel(lock);
                LDLM_LOCK_PUT(lock);
        }
        if (pres != NULL) {
-               ldlm_reprocess_all(pres, NULL);
+               ldlm_reprocess_all(pres, 0);
                LDLM_RESOURCE_DELREF(pres);
                ldlm_resource_putref(pres);
        }
@@ -1911,9 +1993,9 @@ static int ldlm_callback_reply(struct ptlrpc_request *req, int rc)
  * This only can happen on client side.
  */
 static int ldlm_handle_cp_callback(struct ptlrpc_request *req,
-                                    struct ldlm_namespace *ns,
-                                    struct ldlm_request *dlm_req,
-                                    struct ldlm_lock *lock)
+                                  struct ldlm_namespace *ns,
+                                  struct ldlm_request *dlm_req,
+                                  struct ldlm_lock *lock)
 {
        LIST_HEAD(ast_list);
        int lvb_len;
@@ -1923,7 +2005,7 @@ static int ldlm_handle_cp_callback(struct ptlrpc_request *req,
 
        LDLM_DEBUG(lock, "client completion callback handler START");
 
-       if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
+       if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {
                long to = cfs_time_seconds(1);
 
                ldlm_callback_reply(req, 0);
@@ -2032,7 +2114,7 @@ static int ldlm_handle_cp_callback(struct ptlrpc_request *req,
         * Let Enqueue to call osc_lock_upcall() and initialize
         * l_ast_data
         */
-       OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
+       CFS_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 2);
 
        ldlm_run_ast_work(ns, &ast_list, LDLM_WORK_CP_AST);
 
@@ -2109,22 +2191,41 @@ static int __ldlm_bl_to_thread(struct ldlm_bl_work_item *blwi,
                               enum ldlm_cancel_flags cancel_flags)
 {
        struct ldlm_bl_pool *blp = ldlm_state->ldlm_bl_pool;
+       char *prio = "regular";
+       int count;
 
        ENTRY;
 
        spin_lock(&blp->blp_lock);
+       /* cannot access blwi after added to list and lock is dropped */
+       count = blwi->blwi_lock ? 1 : blwi->blwi_count;
+
+       /* if the server is waiting on a lock to be cancelled (bl_ast), this is
+        * an urgent request and should go in the priority queue so it doesn't
+        * get stuck behind non-priority work (eg, lru size management)
+        *
+        * We also prioritize discard_data, which is for eviction handling
+        */
        if (blwi->blwi_lock &&
-           ldlm_is_discard_data(blwi->blwi_lock)) {
-               /* add LDLM_FL_DISCARD_DATA requests to the priority list */
+           (ldlm_is_discard_data(blwi->blwi_lock) ||
+            ldlm_is_bl_ast(blwi->blwi_lock))) {
                list_add_tail(&blwi->blwi_entry, &blp->blp_prio_list);
+               prio = "priority";
        } else {
                /* other blocking callbacks are added to the regular list */
                list_add_tail(&blwi->blwi_entry, &blp->blp_list);
        }
+       blp->blp_total_locks += count;
+       blp->blp_total_blwis++;
        spin_unlock(&blp->blp_lock);
 
        wake_up(&blp->blp_waitq);
 
+       /* unlocked read of blp values is intentional - OK for debug */
+       CDEBUG(D_DLMTRACE,
+              "added %d/%d locks to %s blp list, %d blwis in pool\n",
+              count, blp->blp_total_locks, prio, blp->blp_total_blwis);
+
        /*
         * can not check blwi->blwi_flags as blwi could be already freed in
         * LCF_ASYNC mode
@@ -2278,7 +2379,7 @@ static inline void ldlm_callback_errmsg(struct ptlrpc_request *req,
 {
        DEBUG_REQ((req->rq_no_reply || rc) ? D_WARNING : D_DLMTRACE, req,
                  "%s, NID=%s lock=%#llx: rc = %d",
-                 msg, libcfs_id2str(req->rq_peer),
+                 msg, libcfs_idstr(&req->rq_peer),
                  handle ? handle->cookie : 0, rc);
        if (req->rq_no_reply)
                CWARN("No reply was sent, maybe cause b=21636.\n");
@@ -2321,18 +2422,18 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
 
        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
        case LDLM_BL_CALLBACK:
-               if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET)) {
+               if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK_NET)) {
                        if (cfs_fail_err)
                                ldlm_callback_reply(req, -(int)cfs_fail_err);
                        RETURN(0);
                }
                break;
        case LDLM_CP_CALLBACK:
-               if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
+               if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CP_CALLBACK_NET))
                        RETURN(0);
                break;
        case LDLM_GL_CALLBACK:
-               if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
+               if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_GL_CALLBACK_NET))
                        RETURN(0);
                break;
        case LDLM_SET_INFO:
@@ -2363,7 +2464,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
         * Force a known safe race, send a cancel to the server for a lock
         * which the server has already started a blocking callback on.
         */
-       if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
+       if (CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE) &&
            lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
                rc = ldlm_cli_cancel(&dlm_req->lock_handle[0], 0);
                if (rc < 0)
@@ -2383,7 +2484,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
 
        if (ldlm_is_fail_loc(lock) &&
            lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
-               OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
+               CFS_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
 
        /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
        lock_res_and_lock(lock);
@@ -2432,7 +2533,7 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
 
        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
        case LDLM_BL_CALLBACK:
-               CDEBUG(D_INODE, "blocking ast\n");
+               LDLM_DEBUG(lock, "blocking ast");
                req_capsule_extend(&req->rq_pill, &RQF_LDLM_BL_CALLBACK);
                if (!ldlm_is_cancel_on_block(lock)) {
                        rc = ldlm_callback_reply(req, 0);
@@ -2444,14 +2545,14 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                        ldlm_handle_bl_callback(ns, &dlm_req->lock_desc, lock);
                break;
        case LDLM_CP_CALLBACK:
-               CDEBUG(D_INODE, "completion ast\n");
+               LDLM_DEBUG(lock, "completion ast");
                req_capsule_extend(&req->rq_pill, &RQF_LDLM_CP_CALLBACK);
                rc = ldlm_handle_cp_callback(req, ns, dlm_req, lock);
-               if (!OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE))
+               if (!CFS_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE))
                        ldlm_callback_reply(req, rc);
                break;
        case LDLM_GL_CALLBACK:
-               CDEBUG(D_INODE, "glimpse ast\n");
+               LDLM_DEBUG(lock, "glimpse ast");
                req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
                ldlm_handle_gl_callback(req, ns, dlm_req, lock);
                break;
@@ -2488,7 +2589,7 @@ static int ldlm_cancel_handler(struct ptlrpc_request *req)
 
                CERROR("%s from %s arrived at %llu with bad export cookie %llu\n",
                       ll_opcode2str(lustre_msg_get_opc(req->rq_reqmsg)),
-                      libcfs_nid2str(req->rq_peer.nid),
+                      libcfs_nidstr(&req->rq_peer.nid),
                       (unsigned long long)req->rq_arrival_time.tv_sec,
                       lustre_msg_get_handle(req->rq_reqmsg)->cookie);
 
@@ -2676,7 +2777,6 @@ static int ldlm_revoke_lock_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
                 * In the function below, .hs_keycmp resolves to
                 * ldlm_export_lock_keycmp()
                 */
-               /* coverity[overrun-buffer-val] */
                cfs_hash_del(lock->l_export->exp_lock_hash,
                             &lock->l_remote_handle, &lock->l_exp_hash);
        }
@@ -2690,15 +2790,19 @@ static int ldlm_revoke_lock_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
 
 void ldlm_revoke_export_locks(struct obd_export *exp)
 {
+       int rc;
        LIST_HEAD(rpc_list);
 
        ENTRY;
 
        cfs_hash_for_each_nolock(exp->exp_lock_hash,
                                 ldlm_revoke_lock_cb, &rpc_list, 0);
-       ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
+       rc = ldlm_run_ast_work(exp->exp_obd->obd_namespace, &rpc_list,
                          LDLM_WORK_REVOKE_AST);
 
+       if (rc == -ERESTART)
+               ldlm_reprocess_recovery_done(exp->exp_obd->obd_namespace);
+
        EXIT;
 }
 EXPORT_SYMBOL(ldlm_revoke_export_locks);
@@ -2727,22 +2831,35 @@ static int ldlm_bl_get_work(struct ldlm_bl_pool *blp,
        /* process a request from the blp_list at least every blp_num_threads */
        if (!list_empty(&blp->blp_list) &&
            (list_empty(&blp->blp_prio_list) || num_bl == 0))
-               blwi = list_entry(blp->blp_list.next,
-                                 struct ldlm_bl_work_item, blwi_entry);
+               blwi = list_first_entry(&blp->blp_list,
+                                       struct ldlm_bl_work_item, blwi_entry);
        else
                if (!list_empty(&blp->blp_prio_list))
-                       blwi = list_entry(blp->blp_prio_list.next,
-                                         struct ldlm_bl_work_item,
-                                         blwi_entry);
+                       blwi = list_first_entry(&blp->blp_prio_list,
+                                               struct ldlm_bl_work_item,
+                                               blwi_entry);
 
        if (blwi) {
                if (++num_bl >= num_th)
                        num_bl = 0;
                list_del(&blwi->blwi_entry);
+               blp->blp_total_locks -= blwi->blwi_lock ? 1 : blwi->blwi_count;
+               blp->blp_total_blwis--;
        }
        spin_unlock(&blp->blp_lock);
        *p_blwi = blwi;
 
+       /* intentional unlocked read of blp values - OK for debug */
+       if (blwi) {
+               CDEBUG(D_DLMTRACE,
+                      "Got %d locks of %d total in blp.  (%d blwis in pool)\n",
+                      blwi->blwi_lock ? 1 : blwi->blwi_count,
+                      blp->blp_total_locks, blp->blp_total_blwis);
+       } else {
+               CDEBUG(D_DLMTRACE,
+                      "No blwi found in queue (no bl locks in queue)\n");
+       }
+
        if (*p_exp != NULL && *p_blwi != NULL) {
                obd_stale_export_put(*p_exp);
                *p_exp = NULL;
@@ -2826,7 +2943,7 @@ static int ldlm_bl_thread_blwi(struct ldlm_bl_pool *blp,
        if (blwi->blwi_mem_pressure)
                mpflags = memalloc_noreclaim_save();
 
-       OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
+       CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL2, 4);
 
        if (blwi->blwi_count) {
                int count;
@@ -2876,7 +2993,7 @@ static int ldlm_bl_thread_exports(struct ldlm_bl_pool *blp,
 
        ENTRY;
 
-       OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 4);
+       CFS_FAIL_TIMEOUT(OBD_FAIL_LDLM_BL_EVICT, 4);
 
        num = ldlm_export_cancel_blocked_locks(exp);
        if (num == 0)
@@ -3004,13 +3121,12 @@ void ldlm_put_ref(void)
        EXIT;
 }
 
-/*
- * Export handle<->lock hash operations.
- */
+/* Export handle<->lock hash operations. */
 static unsigned
-ldlm_export_lock_hash(struct cfs_hash *hs, const void *key, unsigned int mask)
+ldlm_export_lock_hash(struct cfs_hash *hs, const void *key,
+                     const unsigned int bits)
 {
-       return cfs_hash_u64_hash(((struct lustre_handle *)key)->cookie, mask);
+       return cfs_hash_64(((struct lustre_handle *)key)->cookie, bits);
 }
 
 static void *
@@ -3216,9 +3332,9 @@ static int ldlm_setup(void)
                        .so_req_handler         = ldlm_callback_handler,
                },
        };
-       ldlm_state->ldlm_cb_service = \
-                       ptlrpc_register_service(&conf, ldlm_svc_kset,
-                                               ldlm_svc_debugfs_dir);
+       ldlm_state->ldlm_cb_service = ptlrpc_register_service(&conf,
+                                                             ldlm_svc_kset,
+                                                             ldlm_svc_debugfs_dir);
        if (IS_ERR(ldlm_state->ldlm_cb_service)) {
                CERROR("failed to start service\n");
                rc = PTR_ERR(ldlm_state->ldlm_cb_service);
@@ -3248,8 +3364,8 @@ static int ldlm_setup(void)
                        .tc_nthrs_max           = LDLM_NTHRS_MAX,
                        .tc_nthrs_user          = ldlm_num_threads,
                        .tc_cpu_bind            = ldlm_cpu_bind,
-                       .tc_ctx_tags            = LCT_MD_THREAD | \
-                                                 LCT_DT_THREAD | \
+                       .tc_ctx_tags            = LCT_MD_THREAD |
+                                                 LCT_DT_THREAD |
                                                  LCT_CL_THREAD,
                },
                .psc_cpt                = {
@@ -3261,7 +3377,7 @@ static int ldlm_setup(void)
                        .so_hpreq_handler       = ldlm_hpreq_handler,
                },
        };
-       ldlm_state->ldlm_cancel_service = \
+       ldlm_state->ldlm_cancel_service =
                        ptlrpc_register_service(&conf, ldlm_svc_kset,
                                                ldlm_svc_debugfs_dir);
        if (IS_ERR(ldlm_state->ldlm_cancel_service)) {
@@ -3283,12 +3399,14 @@ static int ldlm_setup(void)
        init_waitqueue_head(&blp->blp_waitq);
        atomic_set(&blp->blp_num_threads, 0);
        atomic_set(&blp->blp_busy_threads, 0);
+       blp->blp_total_locks = 0;
+       blp->blp_total_blwis = 0;
 
        if (ldlm_num_threads == 0) {
                blp->blp_min_threads = LDLM_NTHRS_INIT;
                blp->blp_max_threads = LDLM_NTHRS_MAX;
        } else {
-               blp->blp_min_threads = blp->blp_max_threads = \
+               blp->blp_min_threads = blp->blp_max_threads =
                        min_t(int, LDLM_NTHRS_MAX, max_t(int, LDLM_NTHRS_INIT,
                                                         ldlm_num_threads));
        }
@@ -3459,7 +3577,8 @@ out_resource:
 void ldlm_exit(void)
 {
        if (ldlm_refcount)
-               CERROR("ldlm_refcount is %d in ldlm_exit!\n", ldlm_refcount);
+               CERROR("ldlm_refcount is %d in %s\n", ldlm_refcount, __func__);
+       rcu_barrier();
        kmem_cache_destroy(ldlm_resource_slab);
        /*
         * ldlm_lock_put() use RCU to call ldlm_lock_free, so need call