Whamcloud - gitweb
LU-14182 lov: cancel layout lock on replay deadlock
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
index 5522a80..55994cd 100644 (file)
@@ -552,8 +552,11 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
                 * bl_ast and -EINVAL reply is sent to server anyways.
                 * b=17645
                 */
-               lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_FAILED |
+               lock->l_flags |= LDLM_FL_FAILED |
                                 LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING;
+               if (!(ldlm_is_bl_ast(lock) &&
+                     lock->l_remote_handle.cookie != 0))
+                       lock->l_flags |= LDLM_FL_LOCAL_ONLY;
                need_cancel = 1;
        }
        unlock_res_and_lock(lock);
@@ -587,9 +590,14 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
        }
 }
 
-static bool ldlm_request_slot_needed(enum ldlm_type type)
+static bool ldlm_request_slot_needed(struct ldlm_enqueue_info *einfo)
 {
-       return type == LDLM_FLOCK || type == LDLM_IBITS;
+       /* exclude EXTENT locks and DOM-only IBITS locks because they
+        * are asynchronous and don't wait on server being blocked.
+        */
+       return einfo->ei_type == LDLM_FLOCK ||
+              (einfo->ei_type == LDLM_IBITS &&
+               einfo->ei_inodebits != MDS_INODELOCK_DOM);
 }
 
 /**
@@ -598,21 +606,21 @@ static bool ldlm_request_slot_needed(enum ldlm_type type)
  * Called after receiving reply from server.
  */
 int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
-                         enum ldlm_type type, __u8 with_policy,
-                         enum ldlm_mode mode, __u64 *flags, void *lvb,
+                         struct ldlm_enqueue_info *einfo,
+                         __u8 with_policy, __u64 *ldlm_flags, void *lvb,
                          __u32 lvb_len, const struct lustre_handle *lockh,
                          int rc)
 {
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        const struct lu_env *env = NULL;
-       int is_replay = *flags & LDLM_FL_REPLAY;
+       int is_replay = *ldlm_flags & LDLM_FL_REPLAY;
        struct ldlm_lock *lock;
        struct ldlm_reply *reply;
        int cleanup_phase = 1;
 
        ENTRY;
 
-       if (ldlm_request_slot_needed(type))
+       if (ldlm_request_slot_needed(einfo))
                obd_put_request_slot(&req->rq_import->imp_obd->u.cli);
 
        ptlrpc_put_mod_rpc_slot(req);
@@ -623,7 +631,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
        lock = ldlm_handle2lock(lockh);
        /* ldlm_cli_enqueue is holding a reference on this lock. */
        if (!lock) {
-               LASSERT(type == LDLM_FLOCK);
+               LASSERT(einfo->ei_type == LDLM_FLOCK);
                RETURN(-ENOLCK);
        }
 
@@ -687,20 +695,20 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                lock->l_remote_handle = reply->lock_handle;
        }
 
-       *flags = ldlm_flags_from_wire(reply->lock_flags);
+       *ldlm_flags = ldlm_flags_from_wire(reply->lock_flags);
        lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
                                              LDLM_FL_INHERIT_MASK);
        unlock_res_and_lock(lock);
 
        CDEBUG(D_INFO, "local: %p, remote cookie: %#llx, flags: %#llx\n",
-              lock, reply->lock_handle.cookie, *flags);
+              lock, reply->lock_handle.cookie, *ldlm_flags);
 
        /*
         * If enqueue returned a blocked lock but the completion handler has
         * already run, then it fixed up the resource and we don't need to do it
         * again.
         */
-       if ((*flags) & LDLM_FL_LOCK_CHANGED) {
+       if ((*ldlm_flags) & LDLM_FL_LOCK_CHANGED) {
                int newmode = reply->lock_desc.l_req_mode;
 
                LASSERT(!is_replay);
@@ -732,12 +740,12 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                                                &lock->l_policy_data);
                }
 
-               if (type != LDLM_PLAIN)
+               if (einfo->ei_type != LDLM_PLAIN)
                        LDLM_DEBUG(lock,
                                   "client-side enqueue, new policy data");
        }
 
-       if ((*flags) & LDLM_FL_AST_SENT) {
+       if ((*ldlm_flags) & LDLM_FL_AST_SENT) {
                lock_res_and_lock(lock);
                ldlm_bl_desc2lock(&reply->lock_desc, lock);
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
@@ -768,9 +776,10 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
        }
 
        if (!is_replay) {
-               rc = ldlm_lock_enqueue(env, ns, &lock, NULL, flags);
+               rc = ldlm_lock_enqueue(env, ns, &lock, NULL, ldlm_flags);
                if (lock->l_completion_ast != NULL) {
-                       int err = lock->l_completion_ast(lock, *flags, NULL);
+                       int err = lock->l_completion_ast(lock, *ldlm_flags,
+                                                        NULL);
 
                        if (!rc)
                                rc = err;
@@ -791,7 +800,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
        EXIT;
 cleanup:
        if (cleanup_phase == 1 && rc)
-               failed_lock_cleanup(ns, lock, mode);
+               failed_lock_cleanup(ns, lock, einfo->ei_mode);
        /* Put lock 2 times, the second reference is held by ldlm_cli_enqueue */
        LDLM_LOCK_PUT(lock);
        LDLM_LOCK_RELEASE(lock);
@@ -1066,24 +1075,15 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 
        /* extended LDLM opcodes in client stats */
        if (exp->exp_obd->obd_svc_stats != NULL) {
-               bool glimpse = *flags & LDLM_FL_HAS_INTENT;
-
-               /* OST glimpse has no intent buffer */
-               if (req_capsule_has_field(&req->rq_pill, &RMF_LDLM_INTENT,
-                                         RCL_CLIENT)) {
-                       struct ldlm_intent *it;
-
-                       it = req_capsule_client_get(&req->rq_pill,
-                                                   &RMF_LDLM_INTENT);
-                       glimpse = (it && (it->opc == IT_GLIMPSE));
-               }
-
-               if (!glimpse)
-                       ldlm_svc_get_eopc(body, exp->exp_obd->obd_svc_stats);
-               else
+               /* glimpse is intent with no intent buffer */
+               if (*flags & LDLM_FL_HAS_INTENT &&
+                   !req_capsule_has_field(&req->rq_pill, &RMF_LDLM_INTENT,
+                                          RCL_CLIENT))
                        lprocfs_counter_incr(exp->exp_obd->obd_svc_stats,
                                             PTLRPC_LAST_CNTR +
                                             LDLM_GLIMPSE_ENQUEUE);
+               else
+                       ldlm_svc_get_eopc(body, exp->exp_obd->obd_svc_stats);
        }
 
        /* It is important to obtain modify RPC slot first (if applicable), so
@@ -1093,13 +1093,15 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
        if (einfo->ei_enq_slot)
                ptlrpc_get_mod_rpc_slot(req);
 
-       if (ldlm_request_slot_needed(einfo->ei_type)) {
+       if (ldlm_request_slot_needed(einfo)) {
                rc = obd_get_request_slot(&req->rq_import->imp_obd->u.cli);
                if (rc) {
                        if (einfo->ei_enq_slot)
                                ptlrpc_put_mod_rpc_slot(req);
                        failed_lock_cleanup(ns, lock, einfo->ei_mode);
                        LDLM_LOCK_RELEASE(lock);
+                       if (!req_passed_in)
+                               ptlrpc_req_finished(req);
                        GOTO(out, rc);
                }
        }
@@ -1113,9 +1115,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 
        rc = ptlrpc_queue_wait(req);
 
-       err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
-                                   einfo->ei_mode, flags, lvb, lvb_len,
-                                   lockh, rc);
+       err = ldlm_cli_enqueue_fini(exp, req, einfo, policy ? 1 : 0, flags,
+                                   lvb, lvb_len, lockh, rc);
 
        /*
         * If ldlm_cli_enqueue_fini did not find the lock, we need to free
@@ -1418,8 +1419,9 @@ static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)
  */
 int ldlm_cli_update_pool(struct ptlrpc_request *req)
 {
+       struct ldlm_namespace *ns;
        struct obd_device *obd;
-       __u64 new_slv;
+       __u64 new_slv, ratio;
        __u32 new_limit;
 
        ENTRY;
@@ -1457,17 +1459,39 @@ int ldlm_cli_update_pool(struct ptlrpc_request *req)
        read_unlock(&obd->obd_pool_lock);
 
        /*
-        * Set new SLV and limit in OBD fields to make them accessible
-        * to the pool thread. We do not access obd_namespace and pool
-        * directly here as there is no reliable way to make sure that
-        * they are still alive at cleanup time. Evil races are possible
-        * which may cause Oops at that time.
+        * OBD device keeps the new pool attributes before they are handled by
+        * the pool.
         */
        write_lock(&obd->obd_pool_lock);
        obd->obd_pool_slv = new_slv;
        obd->obd_pool_limit = new_limit;
        write_unlock(&obd->obd_pool_lock);
 
+       /*
+        * Check if an urgent pool recalc is needed, let it to be a change of
+        * SLV on 10%. It is applicable to LRU resize enabled case only.
+        */
+       ns = obd->obd_namespace;
+       if (!ns_connect_lru_resize(ns) ||
+           ldlm_pool_get_slv(&ns->ns_pool) < new_slv)
+               RETURN(0);
+
+       ratio = 100 * new_slv / ldlm_pool_get_slv(&ns->ns_pool);
+       if (100 - ratio >= ns->ns_recalc_pct &&
+           !ns->ns_stopping && !ns->ns_rpc_recalc) {
+               bool recalc = false;
+
+               spin_lock(&ns->ns_lock);
+               if (!ns->ns_stopping && !ns->ns_rpc_recalc) {
+                       ldlm_namespace_get(ns);
+                       recalc = true;
+                       ns->ns_rpc_recalc = 1;
+               }
+               spin_unlock(&ns->ns_lock);
+               if (recalc)
+                       ldlm_bl_to_thread_ns(ns);
+       }
+
        RETURN(0);
 }
 
@@ -1679,7 +1703,7 @@ static enum ldlm_policy_res ldlm_cancel_lrur_policy(struct ldlm_namespace *ns,
        lvf = ldlm_pool_get_lvf(pl);
        la = div_u64(ktime_to_ns(ktime_sub(cur, lock->l_last_used)),
                     NSEC_PER_SEC);
-       lv = lvf * la * ns->ns_nr_unused;
+       lv = lvf * la * ns->ns_nr_unused >> 8;
 
        /* Inform pool about current CLV to see it via debugfs. */
        ldlm_pool_set_clv(pl, lv);
@@ -1768,6 +1792,7 @@ ldlm_cancel_lru_policy(struct ldlm_namespace *ns, enum ldlm_lru_flags lru_flags)
  *   redundant unused locks are canceled locally;
  * - also cancel locally unused aged locks;
  * - do not cancel more than \a max locks;
+ * - if some locks are cancelled, try to cancel at least \a batch locks
  * - GET the found locks and add them into the \a cancels list.
  *
  * A client lock can be added to the l_bl_ast list only when it is
@@ -1792,7 +1817,8 @@ ldlm_cancel_lru_policy(struct ldlm_namespace *ns, enum ldlm_lru_flags lru_flags)
  *                              discard those pages.
  */
 static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
-                                struct list_head *cancels, int min, int max,
+                                struct list_head *cancels,
+                                int min, int max, int batch,
                                 enum ldlm_lru_flags lru_flags)
 {
        ldlm_cancel_lru_policy_t pf;
@@ -1800,11 +1826,27 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
        int no_wait = lru_flags & LDLM_LRU_FLAG_NO_WAIT;
        ENTRY;
 
+       /*
+        * Let only 1 thread to proceed. However, not for those which have the
+        * @max limit given (ELC), as LRU may be left not cleaned up in full.
+        */
+       if (max == 0) {
+               if (test_and_set_bit(LDLM_LRU_CANCEL, &ns->ns_flags))
+                       RETURN(0);
+       } else if (test_bit(LDLM_LRU_CANCEL, &ns->ns_flags))
+               RETURN(0);
+
        LASSERT(ergo(max, min <= max));
+       /* No sense to give @batch for ELC */
+       LASSERT(ergo(max, batch == 0));
 
        if (!ns_connect_lru_resize(ns))
                min = max_t(int, min, ns->ns_nr_unused - ns->ns_max_unused);
 
+       /* If at least 1 lock is to be cancelled, cancel at least @batch locks */
+       if (min && min < batch)
+               min = batch;
+
        pf = ldlm_cancel_lru_policy(ns, lru_flags);
        LASSERT(pf != NULL);
 
@@ -1937,7 +1979,14 @@ static int ldlm_prepare_lru_list(struct ldlm_namespace *ns,
                unlock_res_and_lock(lock);
                lu_ref_del(&lock->l_reference, __FUNCTION__, current);
                added++;
+               /* Once a lock added, batch the requested amount */
+               if (min == 0)
+                       min = batch;
        }
+
+       if (max == 0)
+               clear_bit(LDLM_LRU_CANCEL, &ns->ns_flags);
+
        RETURN(added);
 }
 
@@ -1948,7 +1997,7 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, struct list_head *cancels,
 {
        int added;
 
-       added = ldlm_prepare_lru_list(ns, cancels, min, max, lru_flags);
+       added = ldlm_prepare_lru_list(ns, cancels, min, max, 0, lru_flags);
        if (added <= 0)
                return added;
 
@@ -1976,7 +2025,8 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns, int min,
         * Just prepare the list of locks, do not actually cancel them yet.
         * Locks are cancelled later in a separate thread.
         */
-       count = ldlm_prepare_lru_list(ns, &cancels, min, 0, lru_flags);
+       count = ldlm_prepare_lru_list(ns, &cancels, min, 0,
+                                     ns->ns_cancel_batch, lru_flags);
        rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags);
        if (rc == 0)
                RETURN(count);
@@ -2430,6 +2480,8 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 
        /* We're part of recovery, so don't wait for it. */
        req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
+       /* If the state changed while we were prepared, don't wait */
+       req->rq_no_delay = 1;
 
        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
        ldlm_lock2desc(lock, &body->lock_desc);
@@ -2479,6 +2531,8 @@ static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
               "Dropping as many unused locks as possible before replay for namespace %s (%d)\n",
               ldlm_ns_name(ns), ns->ns_nr_unused);
 
+       OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_REPLAY_PAUSE, cfs_fail_val);
+
        /*
         * We don't need to care whether or not LRU resize is enabled
         * because the LDLM_LRU_FLAG_NO_WAIT policy doesn't use the
@@ -2526,9 +2580,11 @@ int __ldlm_replay_locks(struct obd_import *imp, bool rate_limit)
 
        list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
                list_del_init(&lock->l_pending_chain);
-               if (rc) {
+               /* If we disconnected in the middle - cleanup and let
+                * reconnection to happen again. LU-14027 */
+               if (rc || (imp->imp_state != LUSTRE_IMP_REPLAY_LOCKS)) {
                        LDLM_LOCK_RELEASE(lock);
-                       continue; /* or try to do the rest? */
+                       continue;
                }
                rc = replay_one_lock(imp, lock);
                LDLM_LOCK_RELEASE(lock);