Whamcloud - gitweb
LU-14182 lov: cancel layout lock on replay deadlock
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
index b53a858..55994cd 100644 (file)
@@ -552,8 +552,11 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
                 * bl_ast and -EINVAL reply is sent to server anyways.
                 * b=17645
                 */
-               lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_FAILED |
+               lock->l_flags |= LDLM_FL_FAILED |
                                 LDLM_FL_ATOMIC_CB | LDLM_FL_CBPENDING;
+               if (!(ldlm_is_bl_ast(lock) &&
+                     lock->l_remote_handle.cookie != 0))
+                       lock->l_flags |= LDLM_FL_LOCAL_ONLY;
                need_cancel = 1;
        }
        unlock_res_and_lock(lock);
@@ -587,9 +590,14 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
        }
 }
 
-static bool ldlm_request_slot_needed(enum ldlm_type type)
+static bool ldlm_request_slot_needed(struct ldlm_enqueue_info *einfo)
 {
-       return type == LDLM_FLOCK || type == LDLM_IBITS;
+       /* exclude EXTENT locks and DOM-only IBITS locks because they
+        * are asynchronous and don't wait on server being blocked.
+        */
+       return einfo->ei_type == LDLM_FLOCK ||
+              (einfo->ei_type == LDLM_IBITS &&
+               einfo->ei_inodebits != MDS_INODELOCK_DOM);
 }
 
 /**
@@ -598,21 +606,21 @@ static bool ldlm_request_slot_needed(enum ldlm_type type)
  * Called after receiving reply from server.
  */
 int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
-                         enum ldlm_type type, __u8 with_policy,
-                         enum ldlm_mode mode, __u64 *flags, void *lvb,
+                         struct ldlm_enqueue_info *einfo,
+                         __u8 with_policy, __u64 *ldlm_flags, void *lvb,
                          __u32 lvb_len, const struct lustre_handle *lockh,
                          int rc)
 {
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        const struct lu_env *env = NULL;
-       int is_replay = *flags & LDLM_FL_REPLAY;
+       int is_replay = *ldlm_flags & LDLM_FL_REPLAY;
        struct ldlm_lock *lock;
        struct ldlm_reply *reply;
        int cleanup_phase = 1;
 
        ENTRY;
 
-       if (ldlm_request_slot_needed(type))
+       if (ldlm_request_slot_needed(einfo))
                obd_put_request_slot(&req->rq_import->imp_obd->u.cli);
 
        ptlrpc_put_mod_rpc_slot(req);
@@ -623,7 +631,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
        lock = ldlm_handle2lock(lockh);
        /* ldlm_cli_enqueue is holding a reference on this lock. */
        if (!lock) {
-               LASSERT(type == LDLM_FLOCK);
+               LASSERT(einfo->ei_type == LDLM_FLOCK);
                RETURN(-ENOLCK);
        }
 
@@ -687,20 +695,20 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                lock->l_remote_handle = reply->lock_handle;
        }
 
-       *flags = ldlm_flags_from_wire(reply->lock_flags);
+       *ldlm_flags = ldlm_flags_from_wire(reply->lock_flags);
        lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
                                              LDLM_FL_INHERIT_MASK);
        unlock_res_and_lock(lock);
 
        CDEBUG(D_INFO, "local: %p, remote cookie: %#llx, flags: %#llx\n",
-              lock, reply->lock_handle.cookie, *flags);
+              lock, reply->lock_handle.cookie, *ldlm_flags);
 
        /*
         * If enqueue returned a blocked lock but the completion handler has
         * already run, then it fixed up the resource and we don't need to do it
         * again.
         */
-       if ((*flags) & LDLM_FL_LOCK_CHANGED) {
+       if ((*ldlm_flags) & LDLM_FL_LOCK_CHANGED) {
                int newmode = reply->lock_desc.l_req_mode;
 
                LASSERT(!is_replay);
@@ -732,12 +740,12 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                                                &lock->l_policy_data);
                }
 
-               if (type != LDLM_PLAIN)
+               if (einfo->ei_type != LDLM_PLAIN)
                        LDLM_DEBUG(lock,
                                   "client-side enqueue, new policy data");
        }
 
-       if ((*flags) & LDLM_FL_AST_SENT) {
+       if ((*ldlm_flags) & LDLM_FL_AST_SENT) {
                lock_res_and_lock(lock);
                ldlm_bl_desc2lock(&reply->lock_desc, lock);
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
@@ -768,9 +776,10 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
        }
 
        if (!is_replay) {
-               rc = ldlm_lock_enqueue(env, ns, &lock, NULL, flags);
+               rc = ldlm_lock_enqueue(env, ns, &lock, NULL, ldlm_flags);
                if (lock->l_completion_ast != NULL) {
-                       int err = lock->l_completion_ast(lock, *flags, NULL);
+                       int err = lock->l_completion_ast(lock, *ldlm_flags,
+                                                        NULL);
 
                        if (!rc)
                                rc = err;
@@ -791,7 +800,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
        EXIT;
 cleanup:
        if (cleanup_phase == 1 && rc)
-               failed_lock_cleanup(ns, lock, mode);
+               failed_lock_cleanup(ns, lock, einfo->ei_mode);
        /* Put lock 2 times, the second reference is held by ldlm_cli_enqueue */
        LDLM_LOCK_PUT(lock);
        LDLM_LOCK_RELEASE(lock);
@@ -1066,24 +1075,15 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 
        /* extended LDLM opcodes in client stats */
        if (exp->exp_obd->obd_svc_stats != NULL) {
-               bool glimpse = *flags & LDLM_FL_HAS_INTENT;
-
-               /* OST glimpse has no intent buffer */
-               if (req_capsule_has_field(&req->rq_pill, &RMF_LDLM_INTENT,
-                                         RCL_CLIENT)) {
-                       struct ldlm_intent *it;
-
-                       it = req_capsule_client_get(&req->rq_pill,
-                                                   &RMF_LDLM_INTENT);
-                       glimpse = (it && (it->opc == IT_GLIMPSE));
-               }
-
-               if (!glimpse)
-                       ldlm_svc_get_eopc(body, exp->exp_obd->obd_svc_stats);
-               else
+               /* glimpse is intent with no intent buffer */
+               if (*flags & LDLM_FL_HAS_INTENT &&
+                   !req_capsule_has_field(&req->rq_pill, &RMF_LDLM_INTENT,
+                                          RCL_CLIENT))
                        lprocfs_counter_incr(exp->exp_obd->obd_svc_stats,
                                             PTLRPC_LAST_CNTR +
                                             LDLM_GLIMPSE_ENQUEUE);
+               else
+                       ldlm_svc_get_eopc(body, exp->exp_obd->obd_svc_stats);
        }
 
        /* It is important to obtain modify RPC slot first (if applicable), so
@@ -1093,13 +1093,15 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
        if (einfo->ei_enq_slot)
                ptlrpc_get_mod_rpc_slot(req);
 
-       if (ldlm_request_slot_needed(einfo->ei_type)) {
+       if (ldlm_request_slot_needed(einfo)) {
                rc = obd_get_request_slot(&req->rq_import->imp_obd->u.cli);
                if (rc) {
                        if (einfo->ei_enq_slot)
                                ptlrpc_put_mod_rpc_slot(req);
                        failed_lock_cleanup(ns, lock, einfo->ei_mode);
                        LDLM_LOCK_RELEASE(lock);
+                       if (!req_passed_in)
+                               ptlrpc_req_finished(req);
                        GOTO(out, rc);
                }
        }
@@ -1113,9 +1115,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 
        rc = ptlrpc_queue_wait(req);
 
-       err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
-                                   einfo->ei_mode, flags, lvb, lvb_len,
-                                   lockh, rc);
+       err = ldlm_cli_enqueue_fini(exp, req, einfo, policy ? 1 : 0, flags,
+                                   lvb, lvb_len, lockh, rc);
 
        /*
         * If ldlm_cli_enqueue_fini did not find the lock, we need to free
@@ -2479,6 +2480,8 @@ static int replay_one_lock(struct obd_import *imp, struct ldlm_lock *lock)
 
        /* We're part of recovery, so don't wait for it. */
        req->rq_send_state = LUSTRE_IMP_REPLAY_LOCKS;
+       /* If the state changed while we were prepared, don't wait */
+       req->rq_no_delay = 1;
 
        body = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
        ldlm_lock2desc(lock, &body->lock_desc);
@@ -2528,6 +2531,8 @@ static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
               "Dropping as many unused locks as possible before replay for namespace %s (%d)\n",
               ldlm_ns_name(ns), ns->ns_nr_unused);
 
+       OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_REPLAY_PAUSE, cfs_fail_val);
+
        /*
         * We don't need to care whether or not LRU resize is enabled
         * because the LDLM_LRU_FLAG_NO_WAIT policy doesn't use the
@@ -2575,9 +2580,11 @@ int __ldlm_replay_locks(struct obd_import *imp, bool rate_limit)
 
        list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
                list_del_init(&lock->l_pending_chain);
-               if (rc) {
+               /* If we disconnected in the middle - cleanup and let
+                * reconnection to happen again. LU-14027 */
+               if (rc || (imp->imp_state != LUSTRE_IMP_REPLAY_LOCKS)) {
                        LDLM_LOCK_RELEASE(lock);
-                       continue; /* or try to do the rest? */
+                       continue;
                }
                rc = replay_one_lock(imp, lock);
                LDLM_LOCK_RELEASE(lock);