Whamcloud - gitweb
LU-8347 ldlm: granting conflicting locks
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
index 28c2039..03ba8c2 100644 (file)
  *
  * You should have received a copy of the GNU General Public License
  * version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
@@ -383,16 +379,6 @@ static int ldlm_lock_destroy_internal(struct ldlm_lock *lock)
         ldlm_lock_remove_from_lru(lock);
         class_handle_unhash(&lock->l_handle);
 
-#if 0
-        /* Wake anyone waiting for this lock */
-        /* FIXME: I should probably add yet another flag, instead of using
-         * l_export to only call this on clients */
-        if (lock->l_export)
-                class_export_put(lock->l_export);
-        lock->l_export = NULL;
-        if (lock->l_export && lock->l_completion_ast)
-                lock->l_completion_ast(lock, 0);
-#endif
         EXIT;
         return 1;
 }
@@ -603,6 +589,13 @@ struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
        if (lock == NULL)
                RETURN(NULL);
 
+       if (lock->l_export != NULL && lock->l_export->exp_failed) {
+               CDEBUG(D_INFO, "lock export failed: lock %p, exp %p\n",
+                      lock, lock->l_export);
+               LDLM_LOCK_PUT(lock);
+               RETURN(NULL);
+       }
+
        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if ((flags == 0) && !ldlm_is_destroyed(lock)) {
@@ -714,12 +707,12 @@ void ldlm_add_ast_work_item(struct ldlm_lock *lock, struct ldlm_lock *new,
  * r/w reference type is determined by \a mode
  * Calls ldlm_lock_addref_internal.
  */
-void ldlm_lock_addref(struct lustre_handle *lockh, enum ldlm_mode mode)
+void ldlm_lock_addref(const struct lustre_handle *lockh, enum ldlm_mode mode)
 {
        struct ldlm_lock *lock;
 
        lock = ldlm_handle2lock(lockh);
-       LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
+       LASSERTF(lock != NULL, "Non-existing lock: %#llx\n", lockh->cookie);
        ldlm_lock_addref_internal(lock, mode);
        LDLM_LOCK_PUT(lock);
 }
@@ -757,7 +750,7 @@ void ldlm_lock_addref_internal_nolock(struct ldlm_lock *lock,
  *
  * \retval -EAGAIN lock is being canceled.
  */
-int ldlm_lock_addref_try(struct lustre_handle *lockh, enum ldlm_mode mode)
+int ldlm_lock_addref_try(const struct lustre_handle *lockh, enum ldlm_mode mode)
 {
         struct ldlm_lock *lock;
         int               result;
@@ -900,10 +893,10 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, enum ldlm_mode mode)
 /**
  * Decrease reader/writer refcount for LDLM lock with handle \a lockh
  */
-void ldlm_lock_decref(struct lustre_handle *lockh, enum ldlm_mode mode)
+void ldlm_lock_decref(const struct lustre_handle *lockh, enum ldlm_mode mode)
 {
         struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
-        LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
+       LASSERTF(lock != NULL, "Non-existing lock: %#llx\n", lockh->cookie);
         ldlm_lock_decref_internal(lock, mode);
         LDLM_LOCK_PUT(lock);
 }
@@ -915,7 +908,7 @@ EXPORT_SYMBOL(ldlm_lock_decref);
  * drops to zero instead of putting into LRU.
  *
  */
-void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh,
+void ldlm_lock_decref_and_cancel(const struct lustre_handle *lockh,
                                 enum ldlm_mode mode)
 {
         struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
@@ -1110,23 +1103,26 @@ void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
        if (work_list && lock->l_completion_ast != NULL)
                ldlm_add_ast_work_item(lock, NULL, work_list);
 
-       /* We should not add locks to granted list in the following cases:
-        * - this is an UNLOCK but not a real lock;
-        * - this is a TEST lock;
-        * - this is a F_CANCELLK lock (async flock has req_mode == 0)
-        * - this is a deadlock (flock cannot be granted) */
-       if (lock->l_req_mode == 0 ||
-           lock->l_req_mode == LCK_NL ||
-           ldlm_is_test_lock(lock) ||
-           ldlm_is_flock_deadlock(lock))
-               RETURN_EXIT;
-
         if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS)
                 ldlm_grant_lock_with_skiplist(lock);
         else if (res->lr_type == LDLM_EXTENT)
                 ldlm_extent_add_lock(res, lock);
-        else
-                ldlm_resource_add_lock(res, &res->lr_granted, lock);
+       else if (res->lr_type == LDLM_FLOCK) {
+               /* We should not add locks to granted list in the following
+                * cases:
+                * - this is an UNLOCK but not a real lock;
+                * - this is a TEST lock;
+                * - this is a F_CANCELLK lock (async flock has req_mode == 0)
+                * - this is a deadlock (flock cannot be granted) */
+               if (lock->l_req_mode == 0 ||
+                   lock->l_req_mode == LCK_NL ||
+                   ldlm_is_test_lock(lock) ||
+                   ldlm_is_flock_deadlock(lock))
+                       RETURN_EXIT;
+               ldlm_resource_add_lock(res, &res->lr_granted, lock);
+       } else {
+               LBUG();
+       }
 
         ldlm_pool_add(&ldlm_res_to_ns(res)->ns_pool, lock);
         EXIT;
@@ -1467,7 +1463,7 @@ enum ldlm_mode ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags,
         }
  out2:
         if (rc) {
-                LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
+               LDLM_DEBUG(lock, "matched (%llu %llu)",
                            (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                 res_id->name[2] : policy->l_extent.start,
                            (type == LDLM_PLAIN || type == LDLM_IBITS) ?
@@ -1487,7 +1483,7 @@ enum ldlm_mode ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags,
 
         } else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/
                 LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
-                                  LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
+                                 "%llu/%llu (%llu %llu)", ns,
                                   type, mode, res_id->name[0], res_id->name[1],
                                   (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                         res_id->name[2] :policy->l_extent.start,
@@ -1501,7 +1497,7 @@ enum ldlm_mode ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags,
 }
 EXPORT_SYMBOL(ldlm_lock_match);
 
-enum ldlm_mode ldlm_revalidate_lock_handle(struct lustre_handle *lockh,
+enum ldlm_mode ldlm_revalidate_lock_handle(const struct lustre_handle *lockh,
                                           __u64 *bits)
 {
        struct ldlm_lock *lock;
@@ -2148,6 +2144,7 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
 {
        struct list_head rpc_list;
 #ifdef HAVE_SERVER_SUPPORT
+       struct obd_device *obd;
         int rc;
         ENTRY;
 
@@ -2158,6 +2155,13 @@ void ldlm_reprocess_all(struct ldlm_resource *res)
                 return;
         }
 
+       /* Disable reprocess during lock replay stage but allow during
+        * request replay stage.
+        */
+       obd = ldlm_res_to_ns(res)->ns_obd;
+       if (obd->obd_recovering &&
+           atomic_read(&obd->obd_req_replay_clients) == 0)
+               RETURN_EXIT;
 restart:
         lock_res(res);
         rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
@@ -2270,7 +2274,10 @@ void ldlm_lock_cancel(struct ldlm_lock *lock)
         /* Releases cancel callback. */
         ldlm_cancel_callback(lock);
 
-       LASSERT(!ldlm_is_waited(lock));
+       /* Yes, second time, just in case it was added again while we were
+        * running with no res lock in ldlm_cancel_callback */
+       if (ldlm_is_waited(lock))
+               ldlm_del_waiting_lock(lock);
 
         ldlm_resource_unlink_lock(lock);
         ldlm_lock_destroy_nolock(lock);
@@ -2290,7 +2297,7 @@ EXPORT_SYMBOL(ldlm_lock_cancel);
 /**
  * Set opaque data into the lock that only makes sense to upper layer.
  */
-int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
+int ldlm_lock_set_data(const struct lustre_handle *lockh, void *data)
 {
         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
         int rc = -EINVAL;
@@ -2583,7 +2590,7 @@ struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock,
  *
  * Used when printing all locks on a resource for debug purposes.
  */
-void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
+void ldlm_lock_dump_handle(int level, const struct lustre_handle *lockh)
 {
         struct ldlm_lock *lock;
 
@@ -2624,9 +2631,9 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
 
         if (resource == NULL) {
                 libcfs_debug_vmsg2(msgdata, fmt, args,
-                       " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
-                       "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" nid: %s "
-                       "remote: "LPX64" expref: %d pid: %u timeout: %lu "
+                      " ns: \?\? lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s "
+                      "res: \?\? rrc=\?\? type: \?\?\? flags: %#llx nid: %s "
+                      "remote: %#llx expref: %d pid: %u timeout: %lu "
                       "lvb_type: %d\n",
                        lock,
                       lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
@@ -2643,10 +2650,10 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
        switch (resource->lr_type) {
        case LDLM_EXTENT:
                libcfs_debug_vmsg2(msgdata, fmt, args,
-                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
-                       "res: "DLDLMRES" rrc: %d type: %s ["LPU64"->"LPU64"] "
-                       "(req "LPU64"->"LPU64") flags: "LPX64" nid: %s remote: "
-                       LPX64" expref: %d pid: %u timeout: %lu lvb_type: %d\n",
+                       " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s "
+                       "res: "DLDLMRES" rrc: %d type: %s [%llu->%llu] "
+                       "(req %llu->%llu) flags: %#llx nid: %s remote: "
+                       "%#llx expref: %d pid: %u timeout: %lu lvb_type: %d\n",
                        ldlm_lock_to_ns_name(lock), lock,
                        lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
                        lock->l_readers, lock->l_writers,
@@ -2666,10 +2673,10 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
 
        case LDLM_FLOCK:
                libcfs_debug_vmsg2(msgdata, fmt, args,
-                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
+                       " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s "
                        "res: "DLDLMRES" rrc: %d type: %s pid: %d "
-                       "["LPU64"->"LPU64"] flags: "LPX64" nid: %s "
-                       "remote: "LPX64" expref: %d pid: %u timeout: %lu\n",
+                       "[%llu->%llu] flags: %#llx nid: %s "
+                       "remote: %#llx expref: %d pid: %u timeout: %lu\n",
                        ldlm_lock_to_ns_name(lock), lock,
                        lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
                        lock->l_readers, lock->l_writers,
@@ -2688,9 +2695,9 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
 
        case LDLM_IBITS:
                libcfs_debug_vmsg2(msgdata, fmt, args,
-                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
-                       "res: "DLDLMRES" bits "LPX64" rrc: %d type: %s "
-                       "flags: "LPX64" nid: %s remote: "LPX64" expref: %d "
+                       " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s "
+                       "res: "DLDLMRES" bits %#llx rrc: %d type: %s "
+                       "flags: %#llx nid: %s remote: %#llx expref: %d "
                        "pid: %u timeout: %lu lvb_type: %d\n",
                        ldlm_lock_to_ns_name(lock),
                        lock, lock->l_handle.h_cookie,
@@ -2710,9 +2717,9 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
 
        default:
                libcfs_debug_vmsg2(msgdata, fmt, args,
-                       " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
-                       "res: "DLDLMRES" rrc: %d type: %s flags: "LPX64" "
-                       "nid: %s remote: "LPX64" expref: %d pid: %u "
+                       " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s "
+                       "res: "DLDLMRES" rrc: %d type: %s flags: %#llx "
+                       "nid: %s remote: %#llx expref: %d pid: %u "
                        "timeout: %lu lvb_type: %d\n",
                        ldlm_lock_to_ns_name(lock),
                        lock, lock->l_handle.h_cookie,