Whamcloud - gitweb
b=18028
authoryury <yury>
Thu, 18 Dec 2008 09:18:41 +0000 (09:18 +0000)
committeryury <yury>
Thu, 18 Dec 2008 09:18:41 +0000 (09:18 +0000)
- handle failed cp_ast vs. bl_ast race correctly. Failed lock should not be found anymore. Almost the same patch as for 17645

lustre/include/lustre_dlm.h
lustre/include/obd_support.h
lustre/ldlm/ldlm_lock.c
lustre/ldlm/ldlm_lockd.c
lustre/ldlm/ldlm_request.c
lustre/tests/sanityN.sh

index 39326dc..96c5543 100644 (file)
@@ -154,11 +154,6 @@ typedef enum {
 /* Flags flags inherited from parent lock when doing intents. */
 #define LDLM_INHERIT_FLAGS     (LDLM_FL_CANCEL_ON_BLOCK)
 
-/* These are flags that are mapped into the flags and ASTs of blocking locks */
-#define LDLM_AST_DISCARD_DATA  0x80000000 /* Add FL_DISCARD to blocking ASTs */
-/* Flags sent in AST lock_flags to be mapped into the receiving lock. */
-#define LDLM_AST_FLAGS         (LDLM_FL_DISCARD_DATA)
-
 /* completion ast to be executed */
 #define LDLM_FL_CP_REQD        0x1000000
 
@@ -188,6 +183,16 @@ typedef enum {
 /* measure lock contention and return -EUSERS if locking contention is high */
 #define LDLM_FL_DENY_ON_CONTENTION 0x40000000
 
+/* These are flags that are mapped into the flags and ASTs of blocking locks */
+#define LDLM_AST_DISCARD_DATA  0x80000000 /* Add FL_DISCARD to blocking ASTs */
+
+/* Flags sent in AST lock_flags to be mapped into the receiving lock. */
+#define LDLM_AST_FLAGS         (LDLM_FL_DISCARD_DATA)
+
+/* Used for marking lock as an target for -EINTR while cp_ast sleep situation
+ * emulation + race with upcoming bl_ast.  */
+#define LDLM_FL_FAIL_LOC       0x100000000ULL
+
 /* The blocking callback is overloaded to perform two functions.  These flags
  * indicate which operation should be performed. */
 #define LDLM_CB_BLOCKING    1
@@ -621,7 +626,7 @@ struct ldlm_lock {
         /*
          * Protected by lr_lock. Various counters: readers, writers, etc.
          */
-        __u32                 l_flags;
+        __u64                 l_flags;
         __u32                 l_readers;
         __u32                 l_writers;
         /*
index bcb16c9..c6ab400 100644 (file)
@@ -258,6 +258,8 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 #define OBD_FAIL_LDLM_CANCEL_BL_CB_RACE  0x314
 #define OBD_FAIL_LDLM_CP_CB_WAIT         0x315
 #define OBD_FAIL_LDLM_OST_FAIL_RACE      0x316
+#define OBD_FAIL_LDLM_INTR_CP_AST        0x317
+#define OBD_FAIL_LDLM_CP_BL_RACE         0x318
 
 /* LOCKLESS IO */
 #define OBD_FAIL_LDLM_SET_CONTENTION     0x315
index 97d680c..50dcf54 100644 (file)
@@ -694,6 +694,10 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, __u32 mode)
                 LDLM_LOCK_GET(lock); /* dropped by bl thread */
                 ldlm_lock_remove_from_lru(lock);
                 unlock_res_and_lock(lock);
+
+                if (lock->l_flags & LDLM_FL_FAIL_LOC)
+                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
+
                 if ((lock->l_flags & LDLM_FL_ATOMIC_CB) ||
                     ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
                         ldlm_handle_bl_callback(ns, NULL, lock);
@@ -1839,7 +1843,7 @@ void ldlm_lock_dump(int level, struct ldlm_lock *lock, int pos)
                   lock->l_resource->lr_name.name[1],
                   lock->l_resource->lr_name.name[2]);
         CDEBUG(level, "  Req mode: %s, grant mode: %s, rc: %u, read: %d, "
-               "write: %d flags: %#x\n", ldlm_lockname[lock->l_req_mode],
+               "write: %d flags: "LPX64"\n", ldlm_lockname[lock->l_req_mode],
                ldlm_lockname[lock->l_granted_mode],
                atomic_read(&lock->l_refc), lock->l_readers, lock->l_writers,
                lock->l_flags);
@@ -1888,7 +1892,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level,
                 libcfs_debug_vmsg2(cdls, data->msg_subsys, level,data->msg_file,
                                    data->msg_fn, data->msg_line, fmt, args,
                        " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
-                       "res: \?\? rrc=\?\? type: \?\?\? flags: %x remote: "
+                                   "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" remote: "
                        LPX64" expref: %d pid: %u timeout: %lu\n", lock,
                        lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
                        lock->l_readers, lock->l_writers,
@@ -1908,7 +1912,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level,
                                    data->msg_fn, data->msg_line, fmt, args,
                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                        "res: "LPU64"/"LPU64" rrc: %d type: %s ["LPU64"->"LPU64
-                       "] (req "LPU64"->"LPU64") flags: %x remote: "LPX64
+                                   "] (req "LPU64"->"LPU64") flags: "LPX64" remote: "LPX64
                        " expref: %d pid: %u timeout %lu\n",
                        lock->l_resource->lr_namespace->ns_name, lock,
                        lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
@@ -1933,7 +1937,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level,
                                    data->msg_fn, data->msg_line, fmt, args,
                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                        "res: "LPU64"/"LPU64" rrc: %d type: %s pid: %d "
-                       "["LPU64"->"LPU64"] flags: %x remote: "LPX64
+                                   "["LPU64"->"LPU64"] flags: "LPX64" remote: "LPX64
                        " expref: %d pid: %u timeout: %lu\n",
                        lock->l_resource->lr_namespace->ns_name, lock,
                        lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
@@ -1958,7 +1962,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level,
                                    data->msg_fn, data->msg_line, fmt, args,
                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
                        "res: "LPU64"/"LPU64" bits "LPX64" rrc: %d type: %s "
-                       "flags: %x remote: "LPX64" expref: %d "
+                                   "flags: "LPX64" remote: "LPX64" expref: %d "
                        "pid: %u timeout: %lu\n",
                        lock->l_resource->lr_namespace->ns_name,
                        lock, lock->l_handle.h_cookie,
@@ -1981,7 +1985,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock, __u32 level,
                 libcfs_debug_vmsg2(cdls, data->msg_subsys, level,data->msg_file,
                                    data->msg_fn, data->msg_line, fmt, args,
                        " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
-                       "res: "LPU64"/"LPU64" rrc: %d type: %s flags: %x "
+                                   "res: "LPU64"/"LPU64" rrc: %d type: %s flags: "LPX64" "
                        "remote: "LPX64" expref: %d pid: %u timeout %lu\n",
                        lock->l_resource->lr_namespace->ns_name,
                        lock, lock->l_handle.h_cookie,
index 13ee9e1..102538d 100644 (file)
@@ -1187,7 +1187,7 @@ existing_lock:
                 if (unlikely(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) ||
                              !(dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK))){
                         CERROR("Granting sync lock to libclient. "
-                               "req fl %d, rep fl %d, lock fl %d\n",
+                               "req fl %d, rep fl %d, lock fl "LPX64"\n",
                                dlm_req->lock_flags, dlm_rep->lock_flags,
                                lock->l_flags);
                         LDLM_ERROR(lock, "sync lock");
@@ -1812,15 +1812,21 @@ static int ldlm_callback_handler(struct ptlrpc_request *req)
                 RETURN(0);
         }
 
+        if ((lock->l_flags & LDLM_FL_FAIL_LOC) && 
+            lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK)
+                OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
+
         /* Copy hints/flags (e.g. LDLM_FL_DISCARD_DATA) from AST. */
         lock_res_and_lock(lock);
         lock->l_flags |= (dlm_req->lock_flags & LDLM_AST_FLAGS);
         if (lustre_msg_get_opc(req->rq_reqmsg) == LDLM_BL_CALLBACK) {
-                /* If somebody cancels locks and cache is already droped,
+                /* If somebody cancels lock and cache is already droped,
+                 * or lock is failed before cp_ast received on client,
                  * we can tell the server we have no lock. Otherwise, we
                  * should send cancel after dropping the cache. */
-                if ((lock->l_flags & LDLM_FL_CANCELING) &&
-                    (lock->l_flags & LDLM_FL_BL_DONE)) {
+                if (((lock->l_flags & LDLM_FL_CANCELING) &&
+                    (lock->l_flags & LDLM_FL_BL_DONE)) ||
+                    (lock->l_flags & LDLM_FL_FAILED)) {
                         LDLM_DEBUG(lock, "callback on lock "
                                    LPX64" - lock disappeared\n",
                                    dlm_req->lock_handle[0].cookie);
index 1317844..7b84a27 100644 (file)
@@ -266,8 +266,16 @@ noreproc:
                 spin_unlock(&imp->imp_lock);
         }
 
-        /* Go to sleep until the lock is granted or cancelled. */
-        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);
+        if (ns_is_client(lock->l_resource->lr_namespace) && 
+            OBD_FAIL_CHECK_RESET(OBD_FAIL_LDLM_INTR_CP_AST,
+                                 OBD_FAIL_LDLM_CP_BL_RACE | OBD_FAIL_ONCE)) {
+                lock->l_flags |= LDLM_FL_FAIL_LOC;
+                rc = -EINTR;
+        } else {
+                /* Go to sleep until the lock is granted or cancelled. */
+                rc = l_wait_event(lock->l_waitq, 
+                                  is_granted_or_cancelled(lock), &lwi);
+        }
 
         if (rc) {
                 LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
@@ -447,13 +455,31 @@ static void failed_lock_cleanup(struct ldlm_namespace *ns,
                                 struct ldlm_lock *lock,
                                 struct lustre_handle *lockh, int mode)
 {
+        int need_cancel = 0;
+
         /* Set a flag to prevent us from sending a CANCEL (bug 407) */
         lock_res_and_lock(lock);
-        lock->l_flags |= LDLM_FL_LOCAL_ONLY;
+        /* Check that lock is not granted or failed, we might race. */
+        if ((lock->l_req_mode != lock->l_granted_mode) && 
+            !(lock->l_flags & LDLM_FL_FAILED)) {
+                /* Make sure that this lock will not be found by raced
+                 * bl_ast and -EINVAL reply is sent to server anyways. 
+                 * bug 17645 */
+                lock->l_flags |= LDLM_FL_LOCAL_ONLY | LDLM_FL_FAILED | 
+                                 LDLM_FL_ATOMIC_CB;
+                need_cancel = 1;
+        }
         unlock_res_and_lock(lock);
-        LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");
-
-        ldlm_lock_decref_and_cancel(lockh, mode);
+  
+        if (need_cancel) {
+                LDLM_DEBUG(lock, 
+                           "setting FL_LOCAL_ONLY | LDLM_FL_FAILED | " 
+                           "LDLM_FL_ATOMIC_CB");
+                ldlm_lock_decref_and_cancel(lockh, mode);
+        } else {
+                LDLM_DEBUG(lock, "lock was granted or failed in race");
+                ldlm_lock_decref(lockh, mode);
+        }
 
         /* XXX - HACK because we shouldn't call ldlm_lock_destroy()
          *       from llite/file.c/ll_file_flock(). */
index 1dba529..89cbd93 100644 (file)
@@ -826,6 +826,45 @@ test_34() { #16129
 }
 run_test 34 "no lock timeout under IO"
 
+test_35() { # bug 17645
+        local generation=[]
+        local count=0
+        for imp in /proc/fs/lustre/mdc/$FSNAME-MDT*-mdc-*; do
+            g=$(awk '/generation/{print $2}' $imp/import)
+            generation[count]=$g
+            let count=count+1
+        done
+
+        mkdir -p $MOUNT1/$tfile
+        createmany -o $MOUNT1/$tfile/a 2000
+        sync
+        cancel_lru_locks mdc
+
+        # Let's initiate -EINTR situation by setting fail_loc and take
+        # write lock on same file from same client. This will not cause
+        # bl_ast yet as lock is already in local cache.
+#define OBD_FAIL_LDLM_INTR_CP_AST        0x317
+        do_facet client "lctl set_param fail_loc=0x80000317"
+        ls -la $MOUNT1/$tfile > /dev/null &
+        sleep 1
+        
+        # Let's take write lock on same file from another mount. This
+        # should cause conflict and bl_ast
+        createmany -o $MOUNT2/$tfile/a 2000 &
+        wait
+        do_facet client "lctl set_param fail_loc=0x0"
+        df -h $MOUNT1 $MOUNT2
+        count=0
+        for imp in /proc/fs/lustre/mdc/$FSNAME-MDT*-mdc-*; do
+            g=$(awk '/generation/{print $2}' $imp/import)
+            if ! test "$g" -eq "${generation[count]}"; then
+                error "Eviction happened on import $(basename $imp)"
+            fi
+            let count=count+1
+        done
+}
+run_test 35 "-EINTR cp_ast vs. bl_ast race does not evict client"
+
 log "cleanup: ======================================================"
 
 check_and_cleanup_lustre