Whamcloud - gitweb
LU-2177 ldlm: flock completion fixes.
[fs/lustre-release.git] / lustre / ldlm / ldlm_flock.c
index 09655be..930db54 100644 (file)
@@ -146,7 +146,7 @@ ldlm_flock_destroy(struct ldlm_lock *lock, ldlm_mode_t mode, __u64 flags)
        LASSERT(cfs_hlist_unhashed(&lock->l_exp_flock_hash));
 
        cfs_list_del_init(&lock->l_res_link);
-       if (flags == LDLM_FL_WAIT_NOREPROC && !ldlm_is_failed(lock)) {
+       if (flags == LDLM_FL_WAIT_NOREPROC) {
                /* client side - set a flag to prevent sending a CANCEL */
                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_CBPENDING;
 
@@ -673,27 +673,21 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
         int                             rc = 0;
         ENTRY;
 
+       OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT2, 4);
+       if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT3)) {
+               lock_res_and_lock(lock);
+               lock->l_flags |= LDLM_FL_FAIL_LOC;
+               unlock_res_and_lock(lock);
+               OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT3, 4);
+       }
        CDEBUG(D_DLMTRACE, "flags: "LPX64" data: %p getlk: %p\n",
               flags, data, getlk);
 
-       /* Import invalidation. We need to actually release the lock
-        * references being held, so that it can go away. No point in
-        * holding the lock even if app still believes it has it, since
-        * server already dropped it anyway. Only for granted locks too. */
-       if ((lock->l_flags & (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) ==
-           (LDLM_FL_FAILED|LDLM_FL_LOCAL_ONLY)) {
-               if (lock->l_req_mode == lock->l_granted_mode &&
-                   lock->l_granted_mode != LCK_NL &&
-                   NULL == data)
-                       ldlm_lock_decref_internal(lock, lock->l_req_mode);
-
-               /* Need to wake up the waiter if we were evicted */
-               wake_up(&lock->l_waitq);
-               RETURN(0);
-       }
-
        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);
 
+       if (flags & LDLM_FL_FAILED)
+               goto granted;
+
        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV))) {
                if (NULL == data)
@@ -733,12 +727,21 @@ ldlm_flock_completion_ast(struct ldlm_lock *lock, __u64 flags, void *data)
 granted:
         OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT, 10);
 
-        if (ldlm_is_failed(lock)) {
-                LDLM_DEBUG(lock, "client-side enqueue waking up: failed");
-                RETURN(-EIO);
-        }
-
-        LDLM_DEBUG(lock, "client-side enqueue granted");
+       if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT4)) {
+               lock_res_and_lock(lock);
+               /* DEADLOCK is always set with CBPENDING */
+               lock->l_flags |= LDLM_FL_FLOCK_DEADLOCK | LDLM_FL_CBPENDING;
+               unlock_res_and_lock(lock);
+               OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT4, 4);
+       }
+       if (OBD_FAIL_PRECHECK(OBD_FAIL_LDLM_CP_CB_WAIT5)) {
+               lock_res_and_lock(lock);
+               /* DEADLOCK is always set with CBPENDING */
+               lock->l_flags |= LDLM_FL_FAIL_LOC |
+                                LDLM_FL_FLOCK_DEADLOCK | LDLM_FL_CBPENDING;
+               unlock_res_and_lock(lock);
+               OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_CP_CB_WAIT5, 4);
+       }
 
        lock_res_and_lock(lock);
 
@@ -749,22 +752,54 @@ granted:
        if (ldlm_is_destroyed(lock)) {
                unlock_res_and_lock(lock);
                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
-               RETURN(0);
-       }
 
-       /* take lock off the deadlock detection hash list. */
-        ldlm_flock_blocking_unlink(lock);
+               /* An error is still to be returned, to propagate it up to
+                * ldlm_cli_enqueue_fini() caller. */
+               RETURN(-EIO);
+       }
 
         /* ldlm_lock_enqueue() has already placed lock on the granted list. */
-        cfs_list_del_init(&lock->l_res_link);
+       ldlm_resource_unlink_lock(lock);
+
+       /* Import invalidation. We need to actually release the lock
+        * references being held, so that it can go away. No point in
+        * holding the lock even if app still believes it has it, since
+        * server already dropped it anyway. Only for granted locks too. */
+       /* Do the same for DEADLOCK'ed locks. */
+       if (ldlm_is_failed(lock) || ldlm_is_flock_deadlock(lock)) {
+               int mode;
+
+               if (flags & LDLM_FL_TEST_LOCK)
+                       LASSERT(ldlm_is_test_lock(lock));
+
+               if (ldlm_is_test_lock(lock) || ldlm_is_flock_deadlock(lock))
+                       mode = flock_type(getlk);
+               else
+                       mode = lock->l_granted_mode;
+
+               if (ldlm_is_flock_deadlock(lock)) {
+                       LDLM_DEBUG(lock, "client-side enqueue deadlock "
+                                  "received");
+                       rc = -EDEADLK;
+               }
+               ldlm_flock_destroy(lock, mode, LDLM_FL_WAIT_NOREPROC);
+               unlock_res_and_lock(lock);
+
+               /* Need to wake up the waiter if we were evicted */
+               wake_up(&lock->l_waitq);
+
+               /* An error is still to be returned, to propagate it up to
+                * ldlm_cli_enqueue_fini() caller. */
+               RETURN(rc ? : -EIO);
+       }
+
+       LDLM_DEBUG(lock, "client-side enqueue granted");
 
-       if (ldlm_is_flock_deadlock(lock)) {
-               LDLM_DEBUG(lock, "client-side enqueue deadlock received");
-               rc = -EDEADLK;
-       } else if (flags & LDLM_FL_TEST_LOCK) {
+       if (flags & LDLM_FL_TEST_LOCK) {
                 /* fcntl(F_GETLK) request */
                 /* The old mode was saved in getlk->fl_type so that if the mode
                  * in the lock changes we can decref the appropriate refcount.*/
+               LASSERT(ldlm_is_test_lock(lock));
                ldlm_flock_destroy(lock, flock_type(getlk),
                                   LDLM_FL_WAIT_NOREPROC);
                switch (lock->l_granted_mode) {