Whamcloud - gitweb
LU-13456 ldlm: fix reprocessing of locks with more bits
[fs/lustre-release.git] / lustre / ldlm / ldlm_request.c
index 46b3751..bb1fa10 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  */
 /**
  * This file contains Asynchronous System Trap (AST) handlers and related
@@ -237,9 +236,8 @@ int ldlm_completion_ast_async(struct ldlm_lock *lock, __u64 flags, void *data)
                RETURN(ldlm_completion_tail(lock, data));
        }
 
-       LDLM_DEBUG(lock,
-                  "client-side enqueue returned a blocked lock, going forward");
-       ldlm_reprocess_all(lock->l_resource, NULL);
+       LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, going forward");
+       ldlm_reprocess_all(lock->l_resource, 0);
        RETURN(0);
 }
 EXPORT_SYMBOL(ldlm_completion_ast_async);
@@ -590,9 +588,14 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
        }
 }
 
-static bool ldlm_request_slot_needed(enum ldlm_type type)
+static bool ldlm_request_slot_needed(struct ldlm_enqueue_info *einfo)
 {
-       return type == LDLM_FLOCK || type == LDLM_IBITS;
+       /* exclude EXTENT locks and DOM-only IBITS locks because they
+        * are asynchronous and don't wait on server being blocked.
+        */
+       return einfo->ei_type == LDLM_FLOCK ||
+              (einfo->ei_type == LDLM_IBITS &&
+               einfo->ei_inodebits != MDS_INODELOCK_DOM);
 }
 
 /**
@@ -601,21 +604,21 @@ static bool ldlm_request_slot_needed(enum ldlm_type type)
  * Called after receiving reply from server.
  */
 int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
-                         enum ldlm_type type, __u8 with_policy,
-                         enum ldlm_mode mode, __u64 *flags, void *lvb,
+                         struct ldlm_enqueue_info *einfo,
+                         __u8 with_policy, __u64 *ldlm_flags, void *lvb,
                          __u32 lvb_len, const struct lustre_handle *lockh,
                          int rc)
 {
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        const struct lu_env *env = NULL;
-       int is_replay = *flags & LDLM_FL_REPLAY;
+       int is_replay = *ldlm_flags & LDLM_FL_REPLAY;
        struct ldlm_lock *lock;
        struct ldlm_reply *reply;
        int cleanup_phase = 1;
 
        ENTRY;
 
-       if (ldlm_request_slot_needed(type))
+       if (ldlm_request_slot_needed(einfo))
                obd_put_request_slot(&req->rq_import->imp_obd->u.cli);
 
        ptlrpc_put_mod_rpc_slot(req);
@@ -626,7 +629,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
        lock = ldlm_handle2lock(lockh);
        /* ldlm_cli_enqueue is holding a reference on this lock. */
        if (!lock) {
-               LASSERT(type == LDLM_FLOCK);
+               LASSERT(einfo->ei_type == LDLM_FLOCK);
                RETURN(-ENOLCK);
        }
 
@@ -690,20 +693,20 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                lock->l_remote_handle = reply->lock_handle;
        }
 
-       *flags = ldlm_flags_from_wire(reply->lock_flags);
+       *ldlm_flags = ldlm_flags_from_wire(reply->lock_flags);
        lock->l_flags |= ldlm_flags_from_wire(reply->lock_flags &
                                              LDLM_FL_INHERIT_MASK);
        unlock_res_and_lock(lock);
 
        CDEBUG(D_INFO, "local: %p, remote cookie: %#llx, flags: %#llx\n",
-              lock, reply->lock_handle.cookie, *flags);
+              lock, reply->lock_handle.cookie, *ldlm_flags);
 
        /*
         * If enqueue returned a blocked lock but the completion handler has
         * already run, then it fixed up the resource and we don't need to do it
         * again.
         */
-       if ((*flags) & LDLM_FL_LOCK_CHANGED) {
+       if ((*ldlm_flags) & LDLM_FL_LOCK_CHANGED) {
                int newmode = reply->lock_desc.l_req_mode;
 
                LASSERT(!is_replay);
@@ -735,12 +738,12 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                                                &lock->l_policy_data);
                }
 
-               if (type != LDLM_PLAIN)
+               if (einfo->ei_type != LDLM_PLAIN)
                        LDLM_DEBUG(lock,
                                   "client-side enqueue, new policy data");
        }
 
-       if ((*flags) & LDLM_FL_AST_SENT) {
+       if ((*ldlm_flags) & LDLM_FL_AST_SENT) {
                lock_res_and_lock(lock);
                ldlm_bl_desc2lock(&reply->lock_desc, lock);
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
@@ -771,9 +774,10 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
        }
 
        if (!is_replay) {
-               rc = ldlm_lock_enqueue(env, ns, &lock, NULL, flags);
+               rc = ldlm_lock_enqueue(env, ns, &lock, NULL, ldlm_flags);
                if (lock->l_completion_ast != NULL) {
-                       int err = lock->l_completion_ast(lock, *flags, NULL);
+                       int err = lock->l_completion_ast(lock, *ldlm_flags,
+                                                        NULL);
 
                        if (!rc)
                                rc = err;
@@ -794,7 +798,7 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
        EXIT;
 cleanup:
        if (cleanup_phase == 1 && rc)
-               failed_lock_cleanup(ns, lock, mode);
+               failed_lock_cleanup(ns, lock, einfo->ei_mode);
        /* Put lock 2 times, the second reference is held by ldlm_cli_enqueue */
        LDLM_LOCK_PUT(lock);
        LDLM_LOCK_RELEASE(lock);
@@ -1069,24 +1073,15 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 
        /* extended LDLM opcodes in client stats */
        if (exp->exp_obd->obd_svc_stats != NULL) {
-               bool glimpse = *flags & LDLM_FL_HAS_INTENT;
-
-               /* OST glimpse has no intent buffer */
-               if (req_capsule_has_field(&req->rq_pill, &RMF_LDLM_INTENT,
-                                         RCL_CLIENT)) {
-                       struct ldlm_intent *it;
-
-                       it = req_capsule_client_get(&req->rq_pill,
-                                                   &RMF_LDLM_INTENT);
-                       glimpse = (it && (it->opc == IT_GLIMPSE));
-               }
-
-               if (!glimpse)
-                       ldlm_svc_get_eopc(body, exp->exp_obd->obd_svc_stats);
-               else
+               /* glimpse is intent with no intent buffer */
+               if (*flags & LDLM_FL_HAS_INTENT &&
+                   !req_capsule_has_field(&req->rq_pill, &RMF_LDLM_INTENT,
+                                          RCL_CLIENT))
                        lprocfs_counter_incr(exp->exp_obd->obd_svc_stats,
                                             PTLRPC_LAST_CNTR +
                                             LDLM_GLIMPSE_ENQUEUE);
+               else
+                       ldlm_svc_get_eopc(body, exp->exp_obd->obd_svc_stats);
        }
 
        /* It is important to obtain modify RPC slot first (if applicable), so
@@ -1096,13 +1091,15 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
        if (einfo->ei_enq_slot)
                ptlrpc_get_mod_rpc_slot(req);
 
-       if (ldlm_request_slot_needed(einfo->ei_type)) {
+       if (ldlm_request_slot_needed(einfo)) {
                rc = obd_get_request_slot(&req->rq_import->imp_obd->u.cli);
                if (rc) {
                        if (einfo->ei_enq_slot)
                                ptlrpc_put_mod_rpc_slot(req);
                        failed_lock_cleanup(ns, lock, einfo->ei_mode);
                        LDLM_LOCK_RELEASE(lock);
+                       if (!req_passed_in)
+                               ptlrpc_req_finished(req);
                        GOTO(out, rc);
                }
        }
@@ -1116,9 +1113,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
 
        rc = ptlrpc_queue_wait(req);
 
-       err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
-                                   einfo->ei_mode, flags, lvb, lvb_len,
-                                   lockh, rc);
+       err = ldlm_cli_enqueue_fini(exp, req, einfo, policy ? 1 : 0, flags,
+                                   lvb, lvb_len, lockh, rc);
 
        /*
         * If ldlm_cli_enqueue_fini did not find the lock, we need to free
@@ -1258,7 +1254,8 @@ static __u64 ldlm_cli_cancel_local(struct ldlm_lock *lock)
                }
                LDLM_DEBUG(lock, "server-side local cancel");
                ldlm_lock_cancel(lock);
-               ldlm_reprocess_all(lock->l_resource, lock);
+               ldlm_reprocess_all(lock->l_resource,
+                                  lock->l_policy_data.l_inodebits.bits);
        }
 
        RETURN(rc);
@@ -2533,6 +2530,8 @@ static void ldlm_cancel_unused_locks_for_replay(struct ldlm_namespace *ns)
               "Dropping as many unused locks as possible before replay for namespace %s (%d)\n",
               ldlm_ns_name(ns), ns->ns_nr_unused);
 
+       OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_REPLAY_PAUSE, cfs_fail_val);
+
        /*
         * We don't need to care whether or not LRU resize is enabled
         * because the LDLM_LRU_FLAG_NO_WAIT policy doesn't use the
@@ -2567,7 +2566,8 @@ int __ldlm_replay_locks(struct obd_import *imp, bool rate_limit)
 
        ENTRY;
 
-       LASSERT(atomic_read(&imp->imp_replay_inflight) == 1);
+       while (atomic_read(&imp->imp_replay_inflight) != 1)
+               cond_resched();
 
        /* don't replay locks if import failed recovery */
        if (imp->imp_vbr_failed)
@@ -2580,9 +2580,11 @@ int __ldlm_replay_locks(struct obd_import *imp, bool rate_limit)
 
        list_for_each_entry_safe(lock, next, &list, l_pending_chain) {
                list_del_init(&lock->l_pending_chain);
-               if (rc) {
+               /* If we disconnected in the middle - cleanup and let
+                * reconnection to happen again. LU-14027 */
+               if (rc || (imp->imp_state != LUSTRE_IMP_REPLAY_LOCKS)) {
                        LDLM_LOCK_RELEASE(lock);
-                       continue; /* or try to do the rest? */
+                       continue;
                }
                rc = replay_one_lock(imp, lock);
                LDLM_LOCK_RELEASE(lock);
@@ -2620,9 +2622,12 @@ int ldlm_replay_locks(struct obd_import *imp)
        struct task_struct *task;
        int rc = 0;
 
-       class_import_get(imp);
        /* ensure this doesn't fall to 0 before all have been queued */
-       atomic_inc(&imp->imp_replay_inflight);
+       if (atomic_inc_return(&imp->imp_replay_inflight) > 1) {
+               atomic_dec(&imp->imp_replay_inflight);
+               return 0;
+       }
+       class_import_get(imp);
 
        task = kthread_run(ldlm_lock_replay_thread, imp, "ldlm_lock_replay");
        if (IS_ERR(task)) {