Whamcloud - gitweb
LU-10841 ldlm: ASSERTION(lock->l_granted_mode!=lock->l_req_mode)
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
index 61324cc..b5c68b2 100644 (file)
@@ -1689,6 +1689,33 @@ out:
        RETURN(ERR_PTR(rc));
 }
 
+#ifdef HAVE_SERVER_SUPPORT
+static enum ldlm_error ldlm_lock_enqueue_helper(struct ldlm_lock *lock,
+                                            __u64 *flags)
+{
+       struct ldlm_resource *res = lock->l_resource;
+       enum ldlm_error rc = ELDLM_OK;
+       struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+       ldlm_processing_policy policy;
+       ENTRY;
+
+       policy = ldlm_processing_policy_table[res->lr_type];
+restart:
+       policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, &rpc_list);
+       if (rc == ELDLM_OK && lock->l_granted_mode != lock->l_req_mode &&
+           res->lr_type != LDLM_FLOCK) {
+               rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list);
+               if (rc == -ERESTART)
+                       GOTO(restart, rc);
+       }
+
+       if (!list_empty(&rpc_list))
+               ldlm_discard_bl_list(&rpc_list);
+
+       RETURN(rc);
+}
+#endif
+
 /**
  * Enqueue (request) a lock.
  *
@@ -1706,9 +1733,6 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns,
        struct ldlm_lock *lock = *lockp;
        struct ldlm_resource *res = lock->l_resource;
        int local = ns_is_client(ldlm_res_to_ns(res));
-#ifdef HAVE_SERVER_SUPPORT
-       ldlm_processing_policy policy;
-#endif
        enum ldlm_error rc = ELDLM_OK;
        struct ldlm_interval *node = NULL;
        ENTRY;
@@ -1827,8 +1851,7 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns,
                /* If no flags, fall through to normal enqueue path. */
        }
 
-       policy = ldlm_processing_policy_table[res->lr_type];
-       policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, NULL);
+       rc = ldlm_lock_enqueue_helper(lock, flags);
        GOTO(out, rc);
 #else
         } else {
@@ -1861,6 +1884,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
        __u64 flags;
        int rc = LDLM_ITER_CONTINUE;
        enum ldlm_error err;
+       struct list_head bl_ast_list = LIST_HEAD_INIT(bl_ast_list);
        ENTRY;
 
        check_res_locked(res);
@@ -1870,15 +1894,23 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
        LASSERT(intention == LDLM_PROCESS_RESCAN ||
                intention == LDLM_PROCESS_RECOVERY);
 
+restart:
        list_for_each_safe(tmp, pos, queue) {
                struct ldlm_lock *pending;
+               struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
 
                pending = list_entry(tmp, struct ldlm_lock, l_res_link);
 
                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
 
                 flags = 0;
-               rc = policy(pending, &flags, intention, &err, work_list);
+               rc = policy(pending, &flags, intention, &err, &rpc_list);
+               if (pending->l_granted_mode == pending->l_req_mode ||
+                   res->lr_type == LDLM_FLOCK) {
+                       list_splice(&rpc_list, work_list);
+               } else {
+                       list_splice(&rpc_list, &bl_ast_list);
+               }
                /*
                 * When this is called from recovery done, we always want
                 * to scan the whole list no matter what 'rc' is returned.
@@ -1888,6 +1920,22 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
                        break;
         }
 
+       if (!list_empty(&bl_ast_list)) {
+               unlock_res(res);
+
+               LASSERT(intention == LDLM_PROCESS_RECOVERY);
+
+               rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &bl_ast_list,
+                                      LDLM_WORK_BL_AST);
+
+               lock_res(res);
+               if (rc == -ERESTART)
+                       GOTO(restart, rc);
+       }
+
+       if (!list_empty(&bl_ast_list))
+               ldlm_discard_bl_list(&bl_ast_list);
+
         RETURN(intention == LDLM_PROCESS_RESCAN ? rc : LDLM_ITER_CONTINUE);
 }
 
@@ -1898,7 +1946,6 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
  * \param[in] lock             The lock to be enqueued.
  * \param[out] flags           Lock flags for the lock to be enqueued.
  * \param[in] rpc_list         Conflicting locks list.
- * \param[in] grant_flags      extra flags when granting a lock.
  *
  * \retval -ERESTART:  Some lock was instantly canceled while sending
  *                     blocking ASTs, caller needs to re-check conflicting
@@ -1907,7 +1954,7 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
  * \reval 0:           Lock is successfully added in waiting list.
  */
 int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
-                             struct list_head *rpc_list, __u64 grant_flags)
+                             struct list_head *rpc_list)
 {
        struct ldlm_resource *res = lock->l_resource;
        int rc;
@@ -1956,7 +2003,7 @@ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
 
                RETURN(rc);
        }
-       *flags |= (LDLM_FL_BLOCK_GRANTED | grant_flags);
+       *flags |= LDLM_FL_BLOCK_GRANTED;
 
        RETURN(0);
 }
@@ -2606,10 +2653,10 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
         va_start(args, fmt);
 
         if (exp && exp->exp_connection) {
-                nid = libcfs_nid2str(exp->exp_connection->c_peer.nid);
+               nid = obd_export_nid2str(exp);
         } else if (exp && exp->exp_obd != NULL) {
                 struct obd_import *imp = exp->exp_obd->u.cli.cl_import;
-                nid = libcfs_nid2str(imp->imp_connection->c_peer.nid);
+               nid = obd_import_nid2str(imp);
         }
 
         if (resource == NULL) {