Whamcloud - gitweb
LU-8306 ldlm: send blocking ASTs after lock replay 16/24716/13
authorNiu Yawei <yawei.niu@intel.com>
Wed, 25 Jan 2017 14:52:34 +0000 (22:52 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 14 Mar 2017 02:58:43 +0000 (02:58 +0000)
If a blocking AST wasn't received by the client before recovery,
we need to scan the whole waiting lock list and send blocking
ASTs after lock replay is done; otherwise, a client could be
evicted inadvertently, as in the following scenario:

- cl1 has a granted lock;
- cl2 has a waiting lock, BL AST is sent but lost on the way;
- failover, locks are replayed and applied on the server in
  the correct order;
- the waiting lock is simply put back on the resource; no new
  BL AST is re-sent, so no timeout can happen for the granted
  lock on the server and no timeout for the waiting lock on the
  client;
- cl2 will hang for a long time until cl1 cancels its aged
  lock; this may lead to cl2 eviction.

Signed-off-by: Niu Yawei <yawei.niu@intel.com>
Change-Id: I2a3fecf3b7fa79f96874d5ae21c599725334d9a5
Reviewed-on: https://review.whamcloud.com/24716
Reviewed-by: Vitaly Fertman <vitaly.fertman@seagate.com>
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
Reviewed-by: Mike Pershin <mike.pershin@intel.com>
lustre/include/lustre_dlm.h
lustre/ldlm/ldlm_extent.c
lustre/ldlm/ldlm_flock.c
lustre/ldlm/ldlm_inodebits.c
lustre/ldlm/ldlm_internal.h
lustre/ldlm/ldlm_lib.c
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_plain.c
lustre/ofd/ofd_dlm.c

index 571a16c..0cd6641 100644 (file)
@@ -1191,8 +1191,41 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
 # define LDLM_ERROR(lock, fmt, a...) ((void)0)
 #endif
 
 # define LDLM_ERROR(lock, fmt, a...) ((void)0)
 #endif
 
+/*
+ * Three intentions can be used for the policy functions in
+ * ldlm_processing_policy.
+ *
+ * LDLM_PROCESS_RESCAN:
+ *
+ * It's used when policy functions are called from ldlm_reprocess_queue() to
+ * reprocess the wait & convert list and try to grant locks, blocking ASTs
+ * have already been sent in this situation, completion ASTs need be sent for
+ * the locks being granted.
+ *
+ * LDLM_PROCESS_ENQUEUE:
+ *
+ * It's used when policy functions are called from ldlm_lock_enqueue() to
+ * process the wait & convert list for handling an enqueue request, blocking
+ * ASTs have not been sent yet, so list of conflicting locks would be
+ * collected and ASTs sent.
+ *
+ * LDLM_PROCESS_RECOVERY:
+ *
+ * It's used when policy functions are called from ldlm_reprocess_queue() to
+ * reprocess the wait & convert list when recovery done. In case of blocking
+ * ASTs are lost before recovery, it needs not only to grant locks if
+ * available, but also send blocking ASTs to the locks doesn't have AST sent
+ * flag. Completion ASTs need be sent for the locks being granted.
+ */
+enum ldlm_process_intention {
+       LDLM_PROCESS_RESCAN = 0,
+       LDLM_PROCESS_ENQUEUE = 1,
+       LDLM_PROCESS_RECOVERY = 2,
+};
+
 typedef int (*ldlm_processing_policy)(struct ldlm_lock *lock, __u64 *flags,
 typedef int (*ldlm_processing_policy)(struct ldlm_lock *lock, __u64 *flags,
-                                     int first_enq, enum ldlm_error *err,
+                                     enum ldlm_process_intention intention,
+                                     enum ldlm_error *err,
                                      struct list_head *work_list);
 
 /**
                                      struct list_head *work_list);
 
 /**
@@ -1421,7 +1454,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock,
 void ldlm_lock_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode);
 void ldlm_lock_cancel(struct ldlm_lock *lock);
 void ldlm_reprocess_all(struct ldlm_resource *res);
 void ldlm_lock_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode);
 void ldlm_lock_cancel(struct ldlm_lock *lock);
 void ldlm_reprocess_all(struct ldlm_resource *res);
-void ldlm_reprocess_all_ns(struct ldlm_namespace *ns);
+void ldlm_reprocess_recovery_done(struct ldlm_namespace *ns);
 void ldlm_lock_dump_handle(int level, const struct lustre_handle *lockh);
 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req);
 
 void ldlm_lock_dump_handle(int level, const struct lustre_handle *lockh);
 void ldlm_unlink_lock_skiplist(struct ldlm_lock *req);
 
index 3388876..a950b0b 100644 (file)
@@ -737,34 +737,6 @@ void ldlm_resource_prolong(struct ldlm_prolong_args *arg)
 }
 EXPORT_SYMBOL(ldlm_resource_prolong);
 
 }
 EXPORT_SYMBOL(ldlm_resource_prolong);
 
-
-/**
- * Discard all AST work items from list.
- *
- * If for whatever reason we do not want to send ASTs to conflicting locks
- * anymore, disassemble the list with this function.
- */
-static void discard_bl_list(struct list_head *bl_list)
-{
-       struct list_head *tmp, *pos;
-        ENTRY;
-
-       list_for_each_safe(pos, tmp, bl_list) {
-                struct ldlm_lock *lock =
-                       list_entry(pos, struct ldlm_lock, l_bl_ast);
-
-               list_del_init(&lock->l_bl_ast);
-               LASSERT(ldlm_is_ast_sent(lock));
-               ldlm_clear_ast_sent(lock);
-                LASSERT(lock->l_bl_ast_run == 0);
-                LASSERT(lock->l_blocking_lock);
-                LDLM_LOCK_RELEASE(lock->l_blocking_lock);
-                lock->l_blocking_lock = NULL;
-                LDLM_LOCK_RELEASE(lock);
-        }
-        EXIT;
-}
-
 /**
  * Process a granting attempt for extent lock.
  * Must be called with ns lock held.
 /**
  * Process a granting attempt for extent lock.
  * Must be called with ns lock held.
@@ -772,17 +744,10 @@ static void discard_bl_list(struct list_head *bl_list)
  * This function looks for any conflicts for \a lock in the granted or
  * waiting queues. The lock is granted if no conflicts are found in
  * either queue.
  * This function looks for any conflicts for \a lock in the granted or
  * waiting queues. The lock is granted if no conflicts are found in
  * either queue.
- *
- * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
- *   - blocking ASTs have already been sent
- *
- * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
- *   - blocking ASTs have not been sent yet, so list of conflicting locks
- *     would be collected and ASTs sent.
  */
 int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
  */
 int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
-                            int first_enq, enum ldlm_error *err,
-                            struct list_head *work_list)
+                            enum ldlm_process_intention intention,
+                            enum ldlm_error *err, struct list_head *work_list)
 {
        struct ldlm_resource *res = lock->l_resource;
        struct list_head rpc_list;
 {
        struct ldlm_resource *res = lock->l_resource;
        struct list_head rpc_list;
@@ -798,7 +763,7 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
        check_res_locked(res);
        *err = ELDLM_OK;
 
        check_res_locked(res);
        *err = ELDLM_OK;
 
-        if (!first_enq) {
+       if (intention == LDLM_PROCESS_RESCAN) {
                 /* Careful observers will note that we don't handle -EWOULDBLOCK
                  * here, but it's ok for a non-obvious reason -- compat_queue
                  * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
                 /* Careful observers will note that we don't handle -EWOULDBLOCK
                  * here, but it's ok for a non-obvious reason -- compat_queue
                  * can only return -EWOULDBLOCK if (flags & BLOCK_NOWAIT).
@@ -823,79 +788,44 @@ int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
                 RETURN(LDLM_ITER_CONTINUE);
         }
 
                 RETURN(LDLM_ITER_CONTINUE);
         }
 
+       LASSERT((intention == LDLM_PROCESS_ENQUEUE && work_list == NULL) ||
+               (intention == LDLM_PROCESS_RECOVERY && work_list != NULL));
  restart:
         contended_locks = 0;
         rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                       &rpc_list, &contended_locks);
  restart:
         contended_locks = 0;
         rc = ldlm_extent_compat_queue(&res->lr_granted, lock, flags, err,
                                       &rpc_list, &contended_locks);
-        if (rc < 0)
-                GOTO(out, rc); /* lock was destroyed */
-        if (rc == 2)
-                goto grant;
-
-        rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock, flags, err,
-                                       &rpc_list, &contended_locks);
-        if (rc2 < 0)
-                GOTO(out, rc = rc2); /* lock was destroyed */
-
-        if (rc + rc2 == 2) {
-        grant:
-                ldlm_extent_policy(res, lock, flags);
-                ldlm_resource_unlink_lock(lock);
-                ldlm_grant_lock(lock, NULL);
-        } else {
-                /* If either of the compat_queue()s returned failure, then we
-                 * have ASTs to send and must go onto the waiting list.
-                 *
-                 * bug 2322: we used to unlink and re-add here, which was a
-                 * terrible folly -- if we goto restart, we could get
-                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
-               if (list_empty(&lock->l_res_link))
-                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
-                unlock_res(res);
-                rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
-                                       LDLM_WORK_BL_AST);
-
-                if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
-                    !ns_is_client(ldlm_res_to_ns(res)))
-                        class_fail_export(lock->l_export);
-
-               lock_res(res);
-               if (rc == -ERESTART) {
-                       /* 15715: The lock was granted and destroyed after
-                        * resource lock was dropped. Interval node was freed
-                        * in ldlm_lock_destroy. Anyway, this always happens
-                        * when a client is being evicted. So it would be
-                        * ok to return an error. -jay */
-                       if (ldlm_is_destroyed(lock)) {
-                               *err = -EAGAIN;
-                               GOTO(out, rc = -EAGAIN);
-                       }
-
-                       /* lock was granted while resource was unlocked. */
-                       if (lock->l_granted_mode == lock->l_req_mode) {
-                               /* bug 11300: if the lock has been granted,
-                                * break earlier because otherwise, we will go
-                                * to restart and ldlm_resource_unlink will be
-                                * called and it causes the interval node to be
-                                * freed. Then we will fail at
-                                * ldlm_extent_add_lock() */
-                               *flags &= ~LDLM_FL_BLOCKED_MASK;
-                               GOTO(out, rc = 0);
-                       }
+       if (rc < 0)
+               GOTO(out_rpc_list, rc);
+
+       rc2 = 0;
+       if (rc != 2) {
+               rc2 = ldlm_extent_compat_queue(&res->lr_waiting, lock,
+                                              flags, err, &rpc_list,
+                                              &contended_locks);
+               if (rc2 < 0)
+                       GOTO(out_rpc_list, rc = rc2);
+       }
 
 
+       if (rc + rc2 != 2) {
+               /* Adding LDLM_FL_NO_TIMEOUT flag to granted lock to force
+                * client to wait for the lock endlessly once the lock is
+                * enqueued -bzzz */
+               rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list,
+                                              LDLM_FL_NO_TIMEOUT);
+               if (rc == -ERESTART)
                        GOTO(restart, rc);
                        GOTO(restart, rc);
-               }
-
-               /* this way we force client to wait for the lock
-                * endlessly once the lock is enqueued -bzzz */
-               *flags |= LDLM_FL_BLOCK_GRANTED | LDLM_FL_NO_TIMEOUT;
-
+               *err = rc;
+       } else {
+               ldlm_extent_policy(res, lock, flags);
+               ldlm_resource_unlink_lock(lock);
+               ldlm_grant_lock(lock, work_list);
+               rc = 0;
        }
        }
-       RETURN(0);
-out:
+
+out_rpc_list:
        if (!list_empty(&rpc_list)) {
                LASSERT(!ldlm_is_ast_discard_data(lock));
        if (!list_empty(&rpc_list)) {
                LASSERT(!ldlm_is_ast_discard_data(lock));
-               discard_bl_list(&rpc_list);
+               ldlm_discard_bl_list(&rpc_list);
        }
        RETURN(rc);
 }
        }
        RETURN(rc);
 }
index d78745a..b3d6697 100644 (file)
@@ -270,19 +270,10 @@ static void ldlm_flock_cancel_on_deadlock(struct ldlm_lock *lock,
  * This function looks for any conflicts for \a lock in the granted or
  * waiting queues. The lock is granted if no conflicts are found in
  * either queue.
  * This function looks for any conflicts for \a lock in the granted or
  * waiting queues. The lock is granted if no conflicts are found in
  * either queue.
- *
- * It is also responsible for splitting a lock if a portion of the lock
- * is released.
- *
- * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
- *   - blocking ASTs have already been sent
- *
- * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
- *   - blocking ASTs have not been sent yet, so list of conflicting locks
- *     would be collected and ASTs sent.
  */
 int
  */
 int
-ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags, int first_enq,
+ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
+                       enum ldlm_process_intention intention,
                        enum ldlm_error *err, struct list_head *work_list)
 {
        struct ldlm_resource *res = req->l_resource;
                        enum ldlm_error *err, struct list_head *work_list)
 {
        struct ldlm_resource *res = req->l_resource;
@@ -353,7 +344,7 @@ reprocess:
                         if (!ldlm_flocks_overlap(lock, req))
                                 continue;
 
                         if (!ldlm_flocks_overlap(lock, req))
                                 continue;
 
-                       if (!first_enq) {
+                       if (intention != LDLM_PROCESS_ENQUEUE) {
                                reprocess_failed = 1;
                                if (ldlm_flock_deadlock(req, lock)) {
                                        ldlm_flock_cancel_on_deadlock(req,
                                reprocess_failed = 1;
                                if (ldlm_flock_deadlock(req, lock)) {
                                        ldlm_flock_cancel_on_deadlock(req,
@@ -570,7 +561,7 @@ reprocess:
 
         if (*flags != LDLM_FL_WAIT_NOREPROC) {
 #ifdef HAVE_SERVER_SUPPORT
 
         if (*flags != LDLM_FL_WAIT_NOREPROC) {
 #ifdef HAVE_SERVER_SUPPORT
-                if (first_enq) {
+               if (intention == LDLM_PROCESS_ENQUEUE) {
                         /* If this is an unlock, reprocess the waitq and
                          * send completions ASTs for locks that can now be
                          * granted. The only problem with doing this
                         /* If this is an unlock, reprocess the waitq and
                          * send completions ASTs for locks that can now be
                          * granted. The only problem with doing this
@@ -578,16 +569,17 @@ reprocess:
                          * newly granted locks will be sent before the unlock
                          * completion is sent. It shouldn't be an issue. Also
                          * note that ldlm_process_flock_lock() will recurse,
                          * newly granted locks will be sent before the unlock
                          * completion is sent. It shouldn't be an issue. Also
                          * note that ldlm_process_flock_lock() will recurse,
-                         * but only once because first_enq will be false from
-                         * ldlm_reprocess_queue. */
+                        * but only once because 'intention' won't be
+                        * LDLM_PROCESS_ENQUEUE from ldlm_reprocess_queue. */
                        if ((mode == LCK_NL) && overlaps) {
                                struct list_head rpc_list;
                                 int rc;
 
                                INIT_LIST_HEAD(&rpc_list);
 restart:
                        if ((mode == LCK_NL) && overlaps) {
                                struct list_head rpc_list;
                                 int rc;
 
                                INIT_LIST_HEAD(&rpc_list);
 restart:
-                                ldlm_reprocess_queue(res, &res->lr_waiting,
-                                                     &rpc_list);
+                               ldlm_reprocess_queue(res, &res->lr_waiting,
+                                                    &rpc_list,
+                                                    LDLM_PROCESS_RESCAN);
 
                                 unlock_res_and_lock(req);
                                 rc = ldlm_run_ast_work(ns, &rpc_list,
 
                                 unlock_res_and_lock(req);
                                 rc = ldlm_run_ast_work(ns, &rpc_list,
index 636527b..90e34a6 100644 (file)
@@ -175,16 +175,10 @@ ldlm_inodebits_compat_queue(struct list_head *queue, struct ldlm_lock *req,
  * This function looks for any conflicts for \a lock in the granted or
  * waiting queues. The lock is granted if no conflicts are found in
  * either queue.
  * This function looks for any conflicts for \a lock in the granted or
  * waiting queues. The lock is granted if no conflicts are found in
  * either queue.
- *
- * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
- *   - blocking ASTs have already been sent
- *
- * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
- *   - blocking ASTs have not been sent yet, so list of conflicting locks
- *     would be collected and ASTs sent.
  */
 int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
  */
 int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
-                               int first_enq, enum ldlm_error *err,
+                               enum ldlm_process_intention intention,
+                               enum ldlm_error *err,
                                struct list_head *work_list)
 {
        struct ldlm_resource *res = lock->l_resource;
                                struct list_head *work_list)
 {
        struct ldlm_resource *res = lock->l_resource;
@@ -198,7 +192,8 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
        check_res_locked(res);
 
        /* (*flags & LDLM_FL_BLOCK_NOWAIT) is for layout lock right now. */
        check_res_locked(res);
 
        /* (*flags & LDLM_FL_BLOCK_NOWAIT) is for layout lock right now. */
-        if (!first_enq || (*flags & LDLM_FL_BLOCK_NOWAIT)) {
+       if (intention == LDLM_PROCESS_RESCAN ||
+           (*flags & LDLM_FL_BLOCK_NOWAIT)) {
                *err = ELDLM_LOCK_ABORTED;
                if (*flags & LDLM_FL_BLOCK_NOWAIT)
                        *err = ELDLM_LOCK_WOULDBLOCK;
                *err = ELDLM_LOCK_ABORTED;
                if (*flags & LDLM_FL_BLOCK_NOWAIT)
                        *err = ELDLM_LOCK_WOULDBLOCK;
@@ -217,44 +212,25 @@ int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
                RETURN(LDLM_ITER_CONTINUE);
        }
 
                RETURN(LDLM_ITER_CONTINUE);
        }
 
+       LASSERT((intention == LDLM_PROCESS_ENQUEUE && work_list == NULL) ||
+               (intention == LDLM_PROCESS_RECOVERY && work_list != NULL));
  restart:
         rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, &rpc_list);
         rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, &rpc_list);
 
         if (rc != 2) {
  restart:
         rc = ldlm_inodebits_compat_queue(&res->lr_granted, lock, &rpc_list);
         rc += ldlm_inodebits_compat_queue(&res->lr_waiting, lock, &rpc_list);
 
         if (rc != 2) {
-                /* If either of the compat_queue()s returned 0, then we
-                 * have ASTs to send and must go onto the waiting list.
-                 *
-                 * bug 2322: we used to unlink and re-add here, which was a
-                 * terrible folly -- if we goto restart, we could get
-                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
-               if (list_empty(&lock->l_res_link))
-                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
-                unlock_res(res);
-                rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
-                                       LDLM_WORK_BL_AST);
-                lock_res(res);
-               if (rc == -ERESTART) {
-                       /* We were granted while waiting, nothing left to do */
-                       if (lock->l_granted_mode == lock->l_req_mode)
-                               GOTO(out, rc = 0);
-                       /* Lock was destroyed while we were waiting, abort */
-                       if (ldlm_is_destroyed(lock))
-                               GOTO(out, rc = -EAGAIN);
-
-                       /* Otherwise try again */
+               rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list, 0);
+               if (rc == -ERESTART)
                        GOTO(restart, rc);
                        GOTO(restart, rc);
-               }
-                *flags |= LDLM_FL_BLOCK_GRANTED;
-        } else {
-                ldlm_resource_unlink_lock(lock);
-                ldlm_grant_lock(lock, NULL);
-        }
+               *err = rc;
+       } else {
+               ldlm_resource_unlink_lock(lock);
+               ldlm_grant_lock(lock, work_list);
+               rc = 0;
+       }
 
 
-       rc = 0;
-out:
-       *err = rc;
-       LASSERT(list_empty(&rpc_list));
+       if (!list_empty(&rpc_list))
+               ldlm_discard_bl_list(&rpc_list);
 
        RETURN(rc);
 }
 
        RETURN(rc);
 }
index 0f69104..cb6a923 100644 (file)
@@ -149,7 +149,11 @@ void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
                            struct list_head *work_list);
 #ifdef HAVE_SERVER_SUPPORT
 int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
                            struct list_head *work_list);
 #ifdef HAVE_SERVER_SUPPORT
 int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
-                        struct list_head *work_list);
+                        struct list_head *work_list,
+                        enum ldlm_process_intention intention);
+int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
+                             struct list_head *rpc_list, __u64 grant_flags);
+void ldlm_discard_bl_list(struct list_head *bl_list);
 #endif
 int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
                       ldlm_desc_ast_t ast_type);
 #endif
 int ldlm_run_ast_work(struct ldlm_namespace *ns, struct list_head *rpc_list,
                       ldlm_desc_ast_t ast_type);
@@ -181,28 +185,26 @@ void ldlm_handle_bl_callback(struct ldlm_namespace *ns,
 #ifdef HAVE_SERVER_SUPPORT
 /* ldlm_plain.c */
 int ldlm_process_plain_lock(struct ldlm_lock *lock, __u64 *flags,
 #ifdef HAVE_SERVER_SUPPORT
 /* ldlm_plain.c */
 int ldlm_process_plain_lock(struct ldlm_lock *lock, __u64 *flags,
-                           int first_enq, enum ldlm_error *err,
-                           struct list_head *work_list);
+                           enum ldlm_process_intention intention,
+                           enum ldlm_error *err, struct list_head *work_list);
 
 /* ldlm_inodebits.c */
 int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
 
 /* ldlm_inodebits.c */
 int ldlm_process_inodebits_lock(struct ldlm_lock *lock, __u64 *flags,
-                               int first_enq, enum ldlm_error *err,
+                               enum ldlm_process_intention intention,
+                               enum ldlm_error *err,
                                struct list_head *work_list);
                                struct list_head *work_list);
-#endif
-
 /* ldlm_extent.c */
 /* ldlm_extent.c */
-#ifdef HAVE_SERVER_SUPPORT
 int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
 int ldlm_process_extent_lock(struct ldlm_lock *lock, __u64 *flags,
-                            int first_enq, enum ldlm_error *err,
-                            struct list_head *work_list);
+                            enum ldlm_process_intention intention,
+                            enum ldlm_error *err, struct list_head *work_list);
 #endif
 void ldlm_extent_add_lock(struct ldlm_resource *res, struct ldlm_lock *lock);
 void ldlm_extent_unlink_lock(struct ldlm_lock *lock);
 
 /* ldlm_flock.c */
 int ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
 #endif
 void ldlm_extent_add_lock(struct ldlm_resource *res, struct ldlm_lock *lock);
 void ldlm_extent_unlink_lock(struct ldlm_lock *lock);
 
 /* ldlm_flock.c */
 int ldlm_process_flock_lock(struct ldlm_lock *req, __u64 *flags,
-                           int first_enq, enum ldlm_error *err,
-                           struct list_head *work_list);
+                           enum ldlm_process_intention intention,
+                           enum ldlm_error *err, struct list_head *work_list);
 int ldlm_init_flock_export(struct obd_export *exp);
 void ldlm_destroy_flock_export(struct obd_export *exp);
 
 int ldlm_init_flock_export(struct obd_export *exp);
 void ldlm_destroy_flock_export(struct obd_export *exp);
 
index 7ea5b43..e2b83d0 100644 (file)
@@ -1579,7 +1579,7 @@ static void target_finish_recovery(struct lu_target *lut)
                        obd->obd_stale_clients == 1 ? "was" : "were");
        }
 
                        obd->obd_stale_clients == 1 ? "was" : "were");
        }
 
-        ldlm_reprocess_all_ns(obd->obd_namespace);
+       ldlm_reprocess_recovery_done(obd->obd_namespace);
        spin_lock(&obd->obd_recovery_task_lock);
        if (!list_empty(&obd->obd_req_replay_queue) ||
            !list_empty(&obd->obd_lock_replay_queue) ||
        spin_lock(&obd->obd_recovery_task_lock);
        if (!list_empty(&obd->obd_req_replay_queue) ||
            !list_empty(&obd->obd_lock_replay_queue) ||
index c78c4bc..6e4663f 100644 (file)
@@ -1835,7 +1835,7 @@ enum ldlm_error ldlm_lock_enqueue(struct ldlm_namespace *ns,
         }
 
         policy = ldlm_processing_policy_table[res->lr_type];
         }
 
         policy = ldlm_processing_policy_table[res->lr_type];
-        policy(lock, flags, 1, &rc, NULL);
+       policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, NULL);
         GOTO(out, rc);
 #else
         } else {
         GOTO(out, rc);
 #else
         } else {
@@ -1860,7 +1860,8 @@ out:
  * Must be called with resource lock held.
  */
 int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
  * Must be called with resource lock held.
  */
 int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
-                        struct list_head *work_list)
+                        struct list_head *work_list,
+                        enum ldlm_process_intention intention)
 {
        struct list_head *tmp, *pos;
        ldlm_processing_policy policy;
 {
        struct list_head *tmp, *pos;
        ldlm_processing_policy policy;
@@ -1873,6 +1874,8 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
 
        policy = ldlm_processing_policy_table[res->lr_type];
        LASSERT(policy);
 
        policy = ldlm_processing_policy_table[res->lr_type];
        LASSERT(policy);
+       LASSERT(intention == LDLM_PROCESS_RESCAN ||
+               intention == LDLM_PROCESS_RECOVERY);
 
        list_for_each_safe(tmp, pos, queue) {
                struct ldlm_lock *pending;
 
        list_for_each_safe(tmp, pos, queue) {
                struct ldlm_lock *pending;
@@ -1882,13 +1885,116 @@ int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
 
                 flags = 0;
                 CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
 
                 flags = 0;
-                rc = policy(pending, &flags, 0, &err, work_list);
-                if (rc != LDLM_ITER_CONTINUE)
-                        break;
+               rc = policy(pending, &flags, intention, &err, work_list);
+               /*
+                * When this is called from recovery done, we always want
+                * to scan the whole list no matter what 'rc' is returned.
+                */
+               if (rc != LDLM_ITER_CONTINUE &&
+                   intention == LDLM_PROCESS_RESCAN)
+                       break;
         }
 
         }
 
-        RETURN(rc);
+        RETURN(intention == LDLM_PROCESS_RESCAN ? rc : LDLM_ITER_CONTINUE);
 }
 }
+
+/**
+ * Conflicting locks are detected for a lock to be enqueued, add the lock
+ * into waiting list and send blocking ASTs to the conflicting locks.
+ *
+ * \param[in] lock             The lock to be enqueued.
+ * \param[out] flags           Lock flags for the lock to be enqueued.
+ * \param[in] rpc_list         Conflicting locks list.
+ * \param[in] grant_flags      extra flags when granting a lock.
+ *
+ * \retval -ERESTART:  Some lock was instantly canceled while sending
+ *                     blocking ASTs, caller needs to re-check conflicting
+ *                     locks.
+ * \retval -EAGAIN:    Lock was destroyed, caller should return error.
+ * \reval 0:           Lock is successfully added in waiting list.
+ */
+int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
+                             struct list_head *rpc_list, __u64 grant_flags)
+{
+       struct ldlm_resource *res = lock->l_resource;
+       int rc;
+       ENTRY;
+
+       check_res_locked(res);
+
+       /* If either of the compat_queue()s returned failure, then we
+        * have ASTs to send and must go onto the waiting list.
+        *
+        * bug 2322: we used to unlink and re-add here, which was a
+        * terrible folly -- if we goto restart, we could get
+        * re-ordered!  Causes deadlock, because ASTs aren't sent! */
+       if (list_empty(&lock->l_res_link))
+               ldlm_resource_add_lock(res, &res->lr_waiting, lock);
+       unlock_res(res);
+
+       rc = ldlm_run_ast_work(ldlm_res_to_ns(res), rpc_list,
+                              LDLM_WORK_BL_AST);
+
+       if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
+           !ns_is_client(ldlm_res_to_ns(res)))
+               class_fail_export(lock->l_export);
+
+       lock_res(res);
+       if (rc == -ERESTART) {
+               /* 15715: The lock was granted and destroyed after
+                * resource lock was dropped. Interval node was freed
+                * in ldlm_lock_destroy. Anyway, this always happens
+                * when a client is being evicted. So it would be
+                * ok to return an error. -jay */
+               if (ldlm_is_destroyed(lock))
+                       RETURN(-EAGAIN);
+
+               /* lock was granted while resource was unlocked. */
+               if (lock->l_granted_mode == lock->l_req_mode) {
+                       /* bug 11300: if the lock has been granted,
+                        * break earlier because otherwise, we will go
+                        * to restart and ldlm_resource_unlink will be
+                        * called and it causes the interval node to be
+                        * freed. Then we will fail at
+                        * ldlm_extent_add_lock() */
+                       *flags &= ~LDLM_FL_BLOCKED_MASK;
+                       RETURN(0);
+               }
+
+               RETURN(rc);
+       }
+       *flags |= (LDLM_FL_BLOCK_GRANTED | grant_flags);
+
+       RETURN(0);
+}
+
+/**
+ * Discard all AST work items from list.
+ *
+ * If for whatever reason we do not want to send ASTs to conflicting locks
+ * anymore, disassemble the list with this function.
+ */
+void ldlm_discard_bl_list(struct list_head *bl_list)
+{
+       struct list_head *tmp, *pos;
+        ENTRY;
+
+       list_for_each_safe(pos, tmp, bl_list) {
+                struct ldlm_lock *lock =
+                       list_entry(pos, struct ldlm_lock, l_bl_ast);
+
+               list_del_init(&lock->l_bl_ast);
+               LASSERT(ldlm_is_ast_sent(lock));
+               ldlm_clear_ast_sent(lock);
+               LASSERT(lock->l_bl_ast_run == 0);
+               LASSERT(lock->l_blocking_lock);
+               LDLM_LOCK_RELEASE(lock->l_blocking_lock);
+               lock->l_blocking_lock = NULL;
+               LDLM_LOCK_RELEASE(lock);
+       }
+       EXIT;
+}
+
 #endif
 
 /**
 #endif
 
 /**
@@ -2102,38 +2208,6 @@ out:
        return rc;
 }
 
        return rc;
 }
 
-static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
-{
-        ldlm_reprocess_all(res);
-        return LDLM_ITER_CONTINUE;
-}
-
-static int ldlm_reprocess_res(struct cfs_hash *hs, struct cfs_hash_bd *bd,
-                             struct hlist_node *hnode, void *arg)
-{
-        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
-        int    rc;
-
-        rc = reprocess_one_queue(res, arg);
-
-        return rc == LDLM_ITER_STOP;
-}
-
-/**
- * Iterate through all resources on a namespace attempting to grant waiting
- * locks.
- */
-void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
-{
-       ENTRY;
-
-       if (ns != NULL) {
-               cfs_hash_for_each_nolock(ns->ns_rs_hash,
-                                        ldlm_reprocess_res, NULL, 0);
-       }
-       EXIT;
-}
-
 /**
  * Try to grant all waiting locks on a resource.
  *
 /**
  * Try to grant all waiting locks on a resource.
  *
@@ -2142,7 +2216,8 @@ void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
  * Typically called after some resource locks are cancelled to see
  * if anything could be granted as a result of the cancellation.
  */
  * Typically called after some resource locks are cancelled to see
  * if anything could be granted as a result of the cancellation.
  */
-void ldlm_reprocess_all(struct ldlm_resource *res)
+static void __ldlm_reprocess_all(struct ldlm_resource *res,
+                                enum ldlm_process_intention intention)
 {
        struct list_head rpc_list;
 #ifdef HAVE_SERVER_SUPPORT
 {
        struct list_head rpc_list;
 #ifdef HAVE_SERVER_SUPPORT
@@ -2165,11 +2240,13 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
            atomic_read(&obd->obd_req_replay_clients) == 0)
                RETURN_EXIT;
 restart:
            atomic_read(&obd->obd_req_replay_clients) == 0)
                RETURN_EXIT;
 restart:
-        lock_res(res);
-        rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
-        if (rc == LDLM_ITER_CONTINUE)
-                ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
-        unlock_res(res);
+       lock_res(res);
+       rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list,
+                                 intention);
+       if (rc == LDLM_ITER_CONTINUE)
+               ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list,
+                                    intention);
+       unlock_res(res);
 
         rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
                                LDLM_WORK_CP_AST);
 
         rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
                                LDLM_WORK_CP_AST);
@@ -2189,8 +2266,38 @@ restart:
 #endif
         EXIT;
 }
 #endif
         EXIT;
 }
+
+void ldlm_reprocess_all(struct ldlm_resource *res)
+{
+       __ldlm_reprocess_all(res, LDLM_PROCESS_RESCAN);
+}
 EXPORT_SYMBOL(ldlm_reprocess_all);
 
 EXPORT_SYMBOL(ldlm_reprocess_all);
 
+static int ldlm_reprocess_res(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+                             struct hlist_node *hnode, void *arg)
+{
+       struct ldlm_resource *res = cfs_hash_object(hs, hnode);
+
+       /* This is only called once after recovery is done. LU-8306. */
+       __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY);
+       return 0;
+}
+
+/**
+ * Iterate through all resources on a namespace attempting to grant waiting
+ * locks.
+ */
+void ldlm_reprocess_recovery_done(struct ldlm_namespace *ns)
+{
+       ENTRY;
+
+       if (ns != NULL) {
+               cfs_hash_for_each_nolock(ns->ns_rs_hash,
+                                        ldlm_reprocess_res, NULL, 0);
+       }
+       EXIT;
+}
+
 static bool is_bl_done(struct ldlm_lock *lock)
 {
        bool bl_done = true;
 static bool is_bl_done(struct ldlm_lock *lock)
 {
        bool bl_done = true;
@@ -2557,7 +2664,8 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock,
                ldlm_processing_policy policy;
 
                 policy = ldlm_processing_policy_table[res->lr_type];
                ldlm_processing_policy policy;
 
                 policy = ldlm_processing_policy_table[res->lr_type];
-                rc = policy(lock, &pflags, 0, &err, &rpc_list);
+               rc = policy(lock, &pflags, LDLM_PROCESS_RESCAN, &err,
+                           &rpc_list);
                 if (rc == LDLM_ITER_STOP) {
                         lock->l_req_mode = old_mode;
                         if (res->lr_type == LDLM_EXTENT)
                 if (rc == LDLM_ITER_STOP) {
                         lock->l_req_mode = old_mode;
                         if (res->lr_type == LDLM_EXTENT)
index 423412e..6453cab 100644 (file)
@@ -123,17 +123,10 @@ ldlm_plain_compat_queue(struct list_head *queue, struct ldlm_lock *req,
  * This function looks for any conflicts for \a lock in the granted or
  * waiting queues. The lock is granted if no conflicts are found in
  * either queue.
  * This function looks for any conflicts for \a lock in the granted or
  * waiting queues. The lock is granted if no conflicts are found in
  * either queue.
- *
- * If \a first_enq is 0 (ie, called from ldlm_reprocess_queue):
- *   - blocking ASTs have already been sent
- *
- * If \a first_enq is 1 (ie, called from ldlm_lock_enqueue):
- *   - blocking ASTs have not been sent yet, so list of conflicting locks
- *     would be collected and ASTs sent.
  */
 int ldlm_process_plain_lock(struct ldlm_lock *lock, __u64 *flags,
  */
 int ldlm_process_plain_lock(struct ldlm_lock *lock, __u64 *flags,
-                           int first_enq, enum ldlm_error *err,
-                           struct list_head *work_list)
+                           enum ldlm_process_intention intention,
+                           enum ldlm_error *err, struct list_head *work_list)
 {
        struct ldlm_resource *res = lock->l_resource;
        struct list_head rpc_list;
 {
        struct ldlm_resource *res = lock->l_resource;
        struct list_head rpc_list;
@@ -145,7 +138,7 @@ int ldlm_process_plain_lock(struct ldlm_lock *lock, __u64 *flags,
        LASSERT(list_empty(&res->lr_converting));
        INIT_LIST_HEAD(&rpc_list);
 
        LASSERT(list_empty(&res->lr_converting));
        INIT_LIST_HEAD(&rpc_list);
 
-        if (!first_enq) {
+       if (intention == LDLM_PROCESS_RESCAN) {
                 LASSERT(work_list != NULL);
                 rc = ldlm_plain_compat_queue(&res->lr_granted, lock, NULL);
                 if (!rc)
                 LASSERT(work_list != NULL);
                 rc = ldlm_plain_compat_queue(&res->lr_granted, lock, NULL);
                 if (!rc)
@@ -159,44 +152,25 @@ int ldlm_process_plain_lock(struct ldlm_lock *lock, __u64 *flags,
                 RETURN(LDLM_ITER_CONTINUE);
         }
 
                 RETURN(LDLM_ITER_CONTINUE);
         }
 
+       LASSERT((intention == LDLM_PROCESS_ENQUEUE && work_list == NULL) ||
+               (intention == LDLM_PROCESS_RECOVERY && work_list != NULL));
  restart:
         rc = ldlm_plain_compat_queue(&res->lr_granted, lock, &rpc_list);
         rc += ldlm_plain_compat_queue(&res->lr_waiting, lock, &rpc_list);
 
         if (rc != 2) {
  restart:
         rc = ldlm_plain_compat_queue(&res->lr_granted, lock, &rpc_list);
         rc += ldlm_plain_compat_queue(&res->lr_waiting, lock, &rpc_list);
 
         if (rc != 2) {
-                /* If either of the compat_queue()s returned 0, then we
-                 * have ASTs to send and must go onto the waiting list.
-                 *
-                 * bug 2322: we used to unlink and re-add here, which was a
-                 * terrible folly -- if we goto restart, we could get
-                 * re-ordered!  Causes deadlock, because ASTs aren't sent! */
-               if (list_empty(&lock->l_res_link))
-                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
-                unlock_res(res);
-                rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
-                                       LDLM_WORK_BL_AST);
-                lock_res(res);
-               if (rc == -ERESTART) {
-                       /* We were granted while waiting, nothing left to do */
-                       if (lock->l_granted_mode == lock->l_req_mode)
-                               GOTO(out, rc = 0);
-                       /* Lock was destroyed while we were waiting, abort */
-                       if (ldlm_is_destroyed(lock))
-                               GOTO(out, rc = -EAGAIN);
-
-                       /* Otherwise try again */
+               rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list, 0);
+               if (rc == -ERESTART)
                        GOTO(restart, rc);
                        GOTO(restart, rc);
-               }
-                *flags |= LDLM_FL_BLOCK_GRANTED;
-        } else {
-                ldlm_resource_unlink_lock(lock);
-                ldlm_grant_lock(lock, NULL);
-        }
-
-       rc = 0;
-out:
-       *err = rc;
-       LASSERT(list_empty(&rpc_list));
+               *err = rc;
+       } else {
+               ldlm_resource_unlink_lock(lock);
+               ldlm_grant_lock(lock, work_list);
+               rc = 0;
+       }
+
+       if (!list_empty(&rpc_list))
+               ldlm_discard_bl_list(&rpc_list);
 
        RETURN(rc);
 }
 
        RETURN(rc);
 }
index f7d20c5..4d86785 100644 (file)
@@ -189,7 +189,7 @@ int ofd_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp,
                rc = LDLM_ITER_CONTINUE;
        } else {
                __u64 tmpflags = 0;
                rc = LDLM_ITER_CONTINUE;
        } else {
                __u64 tmpflags = 0;
-               rc = policy(lock, &tmpflags, 0, &err, NULL);
+               rc = policy(lock, &tmpflags, LDLM_PROCESS_RESCAN, &err, NULL);
                check_res_locked(res);
        }
 
                check_res_locked(res);
        }