Whamcloud - gitweb
LU-13456 ldlm: fix reprocessing of locks with more bits
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
index 546f351..d9b1ca2 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/ldlm/ldlm_lock.c
  *
@@ -469,8 +468,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
        if (lock == NULL)
                RETURN(NULL);
 
-       spin_lock_init(&lock->l_lock);
-       lock->l_resource = resource;
+       RCU_INIT_POINTER(lock->l_resource, resource);
        lu_ref_add(&resource->lr_reference, "lock", lock);
 
        refcount_set(&lock->l_handle.h_ref, 2);
@@ -487,24 +485,24 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
        INIT_HLIST_NODE(&lock->l_exp_hash);
        INIT_HLIST_NODE(&lock->l_exp_flock_hash);
 
-        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
-                             LDLM_NSS_LOCKS);
+       lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
+                            LDLM_NSS_LOCKS);
        INIT_HLIST_NODE(&lock->l_handle.h_link);
        class_handle_hash(&lock->l_handle, lock_handle_owner);
 
-        lu_ref_init(&lock->l_reference);
-        lu_ref_add(&lock->l_reference, "hash", lock);
+       lu_ref_init(&lock->l_reference);
+       lu_ref_add(&lock->l_reference, "hash", lock);
        lock->l_callback_timestamp = 0;
        lock->l_activity = 0;
 
 #if LUSTRE_TRACKS_LOCK_EXP_REFS
        INIT_LIST_HEAD(&lock->l_exp_refs_link);
-        lock->l_exp_refs_nr = 0;
-        lock->l_exp_refs_target = NULL;
+       lock->l_exp_refs_nr = 0;
+       lock->l_exp_refs_target = NULL;
 #endif
        INIT_LIST_HEAD(&lock->l_exp_list);
 
-        RETURN(lock);
+       RETURN(lock);
 }
 
 /**
@@ -544,12 +542,13 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
 
         lu_ref_add(&newres->lr_reference, "lock", lock);
         /*
-         * To flip the lock from the old to the new resource, lock, oldres and
-         * newres have to be locked. Resource spin-locks are nested within
-         * lock->l_lock, and are taken in the memory address order to avoid
-         * dead-locks.
+        * To flip the lock from the old to the new resource, oldres
+        * and newres have to be locked. Resource spin-locks are taken
+        * in the memory address order to avoid dead-locks.
+        * As this is the only circumstance where ->l_resource
+        * can change, and this cannot race with itself, it is safe
+        * to access lock->l_resource without being careful about locking.
          */
-       spin_lock(&lock->l_lock);
         oldres = lock->l_resource;
         if (oldres < newres) {
                 lock_res(oldres);
@@ -560,9 +559,9 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
         }
         LASSERT(memcmp(new_resid, &oldres->lr_name,
                        sizeof oldres->lr_name) != 0);
-        lock->l_resource = newres;
+       rcu_assign_pointer(lock->l_resource, newres);
         unlock_res(oldres);
-        unlock_res_and_lock(lock);
+       unlock_res(newres);
 
         /* ...and the flowers are still standing! */
         lu_ref_del(&oldres->lr_reference, "lock", lock);
@@ -1152,21 +1151,23 @@ void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
  * Check if the given @lock meets the criteria for a match.
  * A reference on the lock is taken if matched.
  *
- * \param lock     test-against this lock
- * \param data    parameters
+ * @lock       test-against this lock
+ * @data       parameters
+ *
+ * RETURN      true to end the search (@lock matched or is lmd_old), else false
  */
-static int lock_matches(struct ldlm_lock *lock, struct ldlm_match_data *data)
+static bool lock_matches(struct ldlm_lock *lock, struct ldlm_match_data *data)
 {
        union ldlm_policy_data *lpol = &lock->l_policy_data;
        enum ldlm_mode match = LCK_MINMODE;
 
        if (lock == data->lmd_old)
-               return INTERVAL_ITER_STOP;
+               return true;
 
        /* Check if this lock can be matched.
         * Used by LU-2919(exclusive open) for open lease lock */
        if (ldlm_is_excl(lock))
-               return INTERVAL_ITER_CONT;
+               return false;
 
        /* llite sometimes wants to match locks that will be
         * canceled when their users drop, but we allow it to match
@@ -1176,20 +1177,21 @@ static int lock_matches(struct ldlm_lock *lock, struct ldlm_match_data *data)
         * can still happen. */
        if (ldlm_is_cbpending(lock) &&
            !(data->lmd_flags & LDLM_FL_CBPENDING))
-               return INTERVAL_ITER_CONT;
+               return false;
+
        if (!(data->lmd_match & LDLM_MATCH_UNREF) && ldlm_is_cbpending(lock) &&
            lock->l_readers == 0 && lock->l_writers == 0)
-               return INTERVAL_ITER_CONT;
+               return false;
 
        if (!(lock->l_req_mode & *data->lmd_mode))
-               return INTERVAL_ITER_CONT;
+               return false;
 
        /* When we search for ast_data, we are not doing a traditional match,
         * so we don't worry about IBITS or extent matching.
         */
        if (data->lmd_match & (LDLM_MATCH_AST | LDLM_MATCH_AST_ANY)) {
                if (!lock->l_ast_data)
-                       return INTERVAL_ITER_CONT;
+                       return false;
 
                if (data->lmd_match & LDLM_MATCH_AST_ANY)
                        goto matched;
@@ -1199,14 +1201,15 @@ static int lock_matches(struct ldlm_lock *lock, struct ldlm_match_data *data)
 
        switch (lock->l_resource->lr_type) {
        case LDLM_EXTENT:
-               if (lpol->l_extent.start > data->lmd_policy->l_extent.start ||
-                   lpol->l_extent.end < data->lmd_policy->l_extent.end)
-                       return INTERVAL_ITER_CONT;
+               if (!(data->lmd_match & LDLM_MATCH_RIGHT) &&
+                   (lpol->l_extent.start > data->lmd_policy->l_extent.start ||
+                    lpol->l_extent.end < data->lmd_policy->l_extent.end))
+                       return false;
 
                if (unlikely(match == LCK_GROUP) &&
                    data->lmd_policy->l_extent.gid != LDLM_GID_ANY &&
                    lpol->l_extent.gid != data->lmd_policy->l_extent.gid)
-                       return INTERVAL_ITER_CONT;
+                       return false;
                break;
        case LDLM_IBITS:
                /* We match if we have existing lock with same or wider set
@@ -1214,13 +1217,13 @@ static int lock_matches(struct ldlm_lock *lock, struct ldlm_match_data *data)
                if ((lpol->l_inodebits.bits &
                     data->lmd_policy->l_inodebits.bits) !=
                    data->lmd_policy->l_inodebits.bits)
-                       return INTERVAL_ITER_CONT;
+                       return false;
 
                if (unlikely(match == LCK_GROUP) &&
                    data->lmd_policy->l_inodebits.li_gid != LDLM_GID_ANY &&
                    lpol->l_inodebits.li_gid !=
                    data->lmd_policy->l_inodebits.li_gid)
-                       return INTERVAL_ITER_CONT;
+                       return false;
                break;
        default:
                ;
@@ -1229,14 +1232,14 @@ static int lock_matches(struct ldlm_lock *lock, struct ldlm_match_data *data)
        /* We match if we have existing lock with same or wider set
           of bits. */
        if (!(data->lmd_match & LDLM_MATCH_UNREF) && LDLM_HAVE_MASK(lock, GONE))
-               return INTERVAL_ITER_CONT;
+               return false;
 
        if (!equi(data->lmd_flags & LDLM_FL_LOCAL_ONLY, ldlm_is_local(lock)))
-               return INTERVAL_ITER_CONT;
+               return false;
 
        /* Filter locks by skipping flags */
        if (data->lmd_skip_flags & lock->l_flags)
-               return INTERVAL_ITER_CONT;
+               return false;
 
 matched:
        if (data->lmd_flags & LDLM_FL_TEST_LOCK) {
@@ -1249,7 +1252,7 @@ matched:
        *data->lmd_mode = match;
        data->lmd_lock = lock;
 
-       return INTERVAL_ITER_STOP;
+       return true;
 }
 
 static unsigned int itree_overlap_cb(struct interval_node *in, void *args)
@@ -1257,11 +1260,9 @@ static unsigned int itree_overlap_cb(struct interval_node *in, void *args)
        struct ldlm_interval *node = to_ldlm_interval(in);
        struct ldlm_match_data *data = args;
        struct ldlm_lock *lock;
-       int rc;
 
        list_for_each_entry(lock, &node->li_group, l_sl_policy) {
-               rc = lock_matches(lock, data);
-               if (rc == INTERVAL_ITER_STOP)
+               if (lock_matches(lock, data))
                        return INTERVAL_ITER_STOP;
        }
        return INTERVAL_ITER_CONT;
@@ -1286,6 +1287,9 @@ struct ldlm_lock *search_itree(struct ldlm_resource *res,
 
        data->lmd_lock = NULL;
 
+       if (data->lmd_match & LDLM_MATCH_RIGHT)
+               ext.end = OBD_OBJECT_EOF;
+
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                struct ldlm_interval_tree *tree = &res->lr_itree[idx];
 
@@ -1318,15 +1322,12 @@ static struct ldlm_lock *search_queue(struct list_head *queue,
                                      struct ldlm_match_data *data)
 {
        struct ldlm_lock *lock;
-       int rc;
 
        data->lmd_lock = NULL;
 
-       list_for_each_entry(lock, queue, l_res_link) {
-               rc = lock_matches(lock, data);
-               if (rc == INTERVAL_ITER_STOP)
+       list_for_each_entry(lock, queue, l_res_link)
+               if (lock_matches(lock, data))
                        return data->lmd_lock;
-       }
 
        return NULL;
 }
@@ -1335,7 +1336,7 @@ void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
 {
        if ((lock->l_flags & LDLM_FL_FAIL_NOTIFIED) == 0) {
                lock->l_flags |= LDLM_FL_FAIL_NOTIFIED;
-               wake_up_all(&lock->l_waitq);
+               wake_up(&lock->l_waitq);
        }
 }
 EXPORT_SYMBOL(ldlm_lock_fail_match_locked);
@@ -1357,7 +1358,7 @@ void ldlm_lock_fail_match(struct ldlm_lock *lock)
 void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
 {
        ldlm_set_lvb_ready(lock);
-       wake_up_all(&lock->l_waitq);
+       wake_up(&lock->l_waitq);
 }
 EXPORT_SYMBOL(ldlm_lock_allow_match_locked);
 
@@ -1934,8 +1935,7 @@ out:
  */
 int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
                         struct list_head *work_list,
-                        enum ldlm_process_intention intention,
-                        struct ldlm_lock *hint)
+                        enum ldlm_process_intention intention, __u64 hint)
 {
        struct list_head *tmp, *pos;
        ldlm_processing_policy policy;
@@ -2037,7 +2037,7 @@ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
                class_fail_export(lock->l_export);
 
        if (rc == -ERESTART)
-               ldlm_reprocess_all(res, NULL);
+               ldlm_reprocess_all(res, 0);
 
        lock_res(res);
        if (rc == -ERESTART) {
@@ -2348,7 +2348,7 @@ out:
  */
 static void __ldlm_reprocess_all(struct ldlm_resource *res,
                                 enum ldlm_process_intention intention,
-                                struct ldlm_lock *hint)
+                                __u64 hint)
 {
        LIST_HEAD(rpc_list);
 #ifdef HAVE_SERVER_SUPPORT
@@ -2381,6 +2381,7 @@ restart:
                               LDLM_WORK_CP_AST);
        if (rc == -ERESTART) {
                LASSERT(list_empty(&rpc_list));
+               hint = 0;
                goto restart;
        }
 #else
@@ -2395,7 +2396,7 @@ restart:
        EXIT;
 }
 
-void ldlm_reprocess_all(struct ldlm_resource *res, struct ldlm_lock *hint)
+void ldlm_reprocess_all(struct ldlm_resource *res, __u64 hint)
 {
        __ldlm_reprocess_all(res, LDLM_PROCESS_RESCAN, hint);
 }
@@ -2407,7 +2408,7 @@ static int ldlm_reprocess_res(struct cfs_hash *hs, struct cfs_hash_bd *bd,
        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
 
        /* This is only called once after recovery done. LU-8306. */
-       __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY, NULL);
+       __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY, 0);
        return 0;
 }
 
@@ -2446,7 +2447,7 @@ void ldlm_cancel_callback(struct ldlm_lock *lock)
 
                /* only canceller can set bl_done bit */
                ldlm_set_bl_done(lock);
-               wake_up_all(&lock->l_waitq);
+               wake_up(&lock->l_waitq);
        } else if (!ldlm_is_bl_done(lock)) {
                /* The lock is guaranteed to have been canceled once
                 * returning from this function. */
@@ -2554,7 +2555,7 @@ static void ldlm_cancel_lock_for_export(struct obd_export *exp,
        ldlm_lvbo_update(res, lock, NULL, 1);
        ldlm_lock_cancel(lock);
        if (!exp->exp_obd->obd_stopping)
-               ldlm_reprocess_all(res, lock);
+               ldlm_reprocess_all(res, lock->l_policy_data.l_inodebits.bits);
        ldlm_resource_putref(res);
 
        ecl->ecl_loop++;
@@ -2719,7 +2720,8 @@ void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
        ldlm_grant_lock(lock, NULL);
        unlock_res_and_lock(lock);
 
-       ldlm_reprocess_all(lock->l_resource, lock);
+       ldlm_reprocess_all(lock->l_resource,
+                          lock->l_policy_data.l_inodebits.bits);
 
        EXIT;
 #endif
@@ -2762,15 +2764,11 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
        struct va_format vaf;
         char *nid = "local";
 
-       /* on server-side resource of lock doesn't change */
-       if ((lock->l_flags & LDLM_FL_NS_SRV) != 0) {
-               if (lock->l_resource != NULL)
-                       resource = ldlm_resource_getref(lock->l_resource);
-       } else if (spin_trylock(&lock->l_lock)) {
-               if (lock->l_resource != NULL)
-                       resource = ldlm_resource_getref(lock->l_resource);
-               spin_unlock(&lock->l_lock);
-       }
+       rcu_read_lock();
+       resource = rcu_dereference(lock->l_resource);
+       if (resource && !atomic_inc_not_zero(&resource->lr_refcount))
+               resource = NULL;
+       rcu_read_unlock();
 
         va_start(args, fmt);
        vaf.fmt = fmt;