Whamcloud - gitweb
LU-13456 ldlm: fix reprocessing of locks with more bits
[fs/lustre-release.git] / lustre / ldlm / ldlm_lock.c
index b8e3989..d9b1ca2 100644 (file)
@@ -27,7 +27,6 @@
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  *
  * lustre/ldlm/ldlm_lock.c
  *
@@ -190,6 +189,15 @@ struct ldlm_lock *ldlm_lock_get(struct ldlm_lock *lock)
 }
 EXPORT_SYMBOL(ldlm_lock_get);
 
+static void lock_handle_free(struct rcu_head *rcu)
+{
+       struct ldlm_lock *lock = container_of(rcu, struct ldlm_lock,
+                                             l_handle.h_rcu);
+
+       OBD_FREE_PRE(lock, sizeof(*lock), "slab-freed");
+       kmem_cache_free(ldlm_lock_slab, lock);
+}
+
 /**
  * Release lock reference.
  *
@@ -234,7 +242,7 @@ void ldlm_lock_put(struct ldlm_lock *lock)
                ldlm_resource_putref(res);
                lock->l_resource = NULL;
                 lu_ref_fini(&lock->l_reference);
-               OBD_FREE_RCU(lock, sizeof(*lock), &lock->l_handle);
+               call_rcu(&lock->l_handle.h_rcu, lock_handle_free);
         }
 
         EXIT;
@@ -438,16 +446,7 @@ void ldlm_lock_destroy_nolock(struct ldlm_lock *lock)
         EXIT;
 }
 
-static void lock_handle_free(void *lock, int size)
-{
-       LASSERT(size == sizeof(struct ldlm_lock));
-       OBD_SLAB_FREE(lock, ldlm_lock_slab, size);
-}
-
-static struct portals_handle_ops lock_handle_ops = {
-       .hop_free   = lock_handle_free,
-       .hop_type       = "ldlm",
-};
+static const char lock_handle_owner[] = "ldlm";
 
 /**
  *
@@ -469,8 +468,7 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
        if (lock == NULL)
                RETURN(NULL);
 
-       spin_lock_init(&lock->l_lock);
-       lock->l_resource = resource;
+       RCU_INIT_POINTER(lock->l_resource, resource);
        lu_ref_add(&resource->lr_reference, "lock", lock);
 
        refcount_set(&lock->l_handle.h_ref, 2);
@@ -487,24 +485,24 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
        INIT_HLIST_NODE(&lock->l_exp_hash);
        INIT_HLIST_NODE(&lock->l_exp_flock_hash);
 
-        lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
-                             LDLM_NSS_LOCKS);
-       INIT_LIST_HEAD_RCU(&lock->l_handle.h_link);
-       class_handle_hash(&lock->l_handle, &lock_handle_ops);
+       lprocfs_counter_incr(ldlm_res_to_ns(resource)->ns_stats,
+                            LDLM_NSS_LOCKS);
+       INIT_HLIST_NODE(&lock->l_handle.h_link);
+       class_handle_hash(&lock->l_handle, lock_handle_owner);
 
-        lu_ref_init(&lock->l_reference);
-        lu_ref_add(&lock->l_reference, "hash", lock);
-        lock->l_callback_timeout = 0;
+       lu_ref_init(&lock->l_reference);
+       lu_ref_add(&lock->l_reference, "hash", lock);
+       lock->l_callback_timestamp = 0;
        lock->l_activity = 0;
 
 #if LUSTRE_TRACKS_LOCK_EXP_REFS
        INIT_LIST_HEAD(&lock->l_exp_refs_link);
-        lock->l_exp_refs_nr = 0;
-        lock->l_exp_refs_target = NULL;
+       lock->l_exp_refs_nr = 0;
+       lock->l_exp_refs_target = NULL;
 #endif
        INIT_LIST_HEAD(&lock->l_exp_list);
 
-        RETURN(lock);
+       RETURN(lock);
 }
 
 /**
@@ -515,16 +513,16 @@ static struct ldlm_lock *ldlm_lock_new(struct ldlm_resource *resource)
 int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
                               const struct ldlm_res_id *new_resid)
 {
-        struct ldlm_resource *oldres = lock->l_resource;
+       struct ldlm_resource *oldres;
         struct ldlm_resource *newres;
         int type;
         ENTRY;
 
         LASSERT(ns_is_client(ns));
 
-        lock_res_and_lock(lock);
-        if (memcmp(new_resid, &lock->l_resource->lr_name,
-                   sizeof(lock->l_resource->lr_name)) == 0) {
+       oldres = lock_res_and_lock(lock);
+       if (memcmp(new_resid, &oldres->lr_name,
+                  sizeof(oldres->lr_name)) == 0) {
                 /* Nothing to do */
                 unlock_res_and_lock(lock);
                 RETURN(0);
@@ -544,12 +542,13 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
 
         lu_ref_add(&newres->lr_reference, "lock", lock);
         /*
-         * To flip the lock from the old to the new resource, lock, oldres and
-         * newres have to be locked. Resource spin-locks are nested within
-         * lock->l_lock, and are taken in the memory address order to avoid
-         * dead-locks.
+        * To flip the lock from the old to the new resource, oldres
+        * and newres have to be locked. Resource spin-locks are taken
+        * in the memory address order to avoid dead-locks.
+        * As this is the only circumstance where ->l_resource
+        * can change, and this cannot race with itself, it is safe
+        * to access lock->l_resource without being careful about locking.
          */
-       spin_lock(&lock->l_lock);
         oldres = lock->l_resource;
         if (oldres < newres) {
                 lock_res(oldres);
@@ -560,9 +559,9 @@ int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
         }
         LASSERT(memcmp(new_resid, &oldres->lr_name,
                        sizeof oldres->lr_name) != 0);
-        lock->l_resource = newres;
+       rcu_assign_pointer(lock->l_resource, newres);
         unlock_res(oldres);
-        unlock_res_and_lock(lock);
+       unlock_res(newres);
 
         /* ...and the flowers are still standing! */
         lu_ref_del(&oldres->lr_reference, "lock", lock);
@@ -603,8 +602,7 @@ struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
        if (!lustre_handle_is_used(handle))
                RETURN(NULL);
 
-       lock = class_handle2object(handle->cookie, &lock_handle_ops);
-
+       lock = class_handle2object(handle->cookie, lock_handle_owner);
        if (lock == NULL)
                RETURN(NULL);
 
@@ -618,7 +616,7 @@ struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
        /* It's unlikely but possible that someone marked the lock as
         * destroyed after we did handle2object on it */
        if ((flags == 0) && !ldlm_is_destroyed(lock)) {
-               lu_ref_add(&lock->l_reference, "handle", current);
+               lu_ref_add_atomic(&lock->l_reference, "handle", lock);
                RETURN(lock);
        }
 
@@ -626,7 +624,7 @@ struct ldlm_lock *__ldlm_handle2lock(const struct lustre_handle *handle,
 
        LASSERT(lock->l_resource != NULL);
 
-       lu_ref_add_atomic(&lock->l_reference, "handle", current);
+       lu_ref_add_atomic(&lock->l_reference, "handle", lock);
        if (unlikely(ldlm_is_destroyed(lock))) {
                unlock_res_and_lock(lock);
                CDEBUG(D_INFO, "lock already destroyed: lock %p\n", lock);
@@ -845,14 +843,15 @@ void ldlm_lock_decref_internal_nolock(struct ldlm_lock *lock,
  */
 void ldlm_lock_decref_internal(struct ldlm_lock *lock, enum ldlm_mode mode)
 {
-        struct ldlm_namespace *ns;
-        ENTRY;
+       struct ldlm_namespace *ns;
 
-        lock_res_and_lock(lock);
+       ENTRY;
 
-        ns = ldlm_lock_to_ns(lock);
+       lock_res_and_lock(lock);
+
+       ns = ldlm_lock_to_ns(lock);
 
-        ldlm_lock_decref_internal_nolock(lock, mode);
+       ldlm_lock_decref_internal_nolock(lock, mode);
 
        if ((ldlm_is_local(lock) || lock->l_req_mode == LCK_GROUP) &&
            !lock->l_readers && !lock->l_writers) {
@@ -869,52 +868,49 @@ void ldlm_lock_decref_internal(struct ldlm_lock *lock, enum ldlm_mode mode)
        }
 
        if (!lock->l_readers && !lock->l_writers && ldlm_is_cbpending(lock)) {
+               unsigned int mask = D_DLMTRACE;
+
                /* If we received a blocked AST and this was the last reference,
                 * run the callback. */
                if (ldlm_is_ns_srv(lock) && lock->l_export)
-                        CERROR("FL_CBPENDING set on non-local lock--just a "
-                               "warning\n");
-
-                LDLM_DEBUG(lock, "final decref done on cbpending lock");
+                       mask |= D_WARNING;
+               LDLM_DEBUG_LIMIT(mask, lock,
+                                "final decref done on %sCBPENDING lock",
+                                mask & D_WARNING ? "non-local " : "");
 
-                LDLM_LOCK_GET(lock); /* dropped by bl thread */
-                ldlm_lock_remove_from_lru(lock);
-                unlock_res_and_lock(lock);
+               LDLM_LOCK_GET(lock); /* dropped by bl thread */
+               ldlm_lock_remove_from_lru(lock);
+               unlock_res_and_lock(lock);
 
                if (ldlm_is_fail_loc(lock))
-                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
+                       OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
 
                if (ldlm_is_atomic_cb(lock) ||
                     ldlm_bl_to_thread_lock(ns, NULL, lock) != 0)
-                        ldlm_handle_bl_callback(ns, NULL, lock);
+                       ldlm_handle_bl_callback(ns, NULL, lock);
         } else if (ns_is_client(ns) &&
-                   !lock->l_readers && !lock->l_writers &&
+                  !lock->l_readers && !lock->l_writers &&
                   !ldlm_is_no_lru(lock) &&
                   !ldlm_is_bl_ast(lock) &&
                   !ldlm_is_converting(lock)) {
 
-                LDLM_DEBUG(lock, "add lock into lru list");
-
-                /* If this is a client-side namespace and this was the last
-                 * reference, put it on the LRU. */
-                ldlm_lock_add_to_lru(lock);
-                unlock_res_and_lock(lock);
+               /* If this is a client-side namespace and this was the last
+                * reference, put it on the LRU.
+                */
+               ldlm_lock_add_to_lru(lock);
+               unlock_res_and_lock(lock);
+               LDLM_DEBUG(lock, "add lock into lru list");
 
                if (ldlm_is_fail_loc(lock))
-                        OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
-
-                /* Call ldlm_cancel_lru() only if EARLY_CANCEL and LRU RESIZE
-                 * are not supported by the server, otherwise, it is done on
-                 * enqueue. */
-                if (!exp_connect_cancelset(lock->l_conn_export) &&
-                    !ns_connect_lru_resize(ns))
-                       ldlm_cancel_lru(ns, 0, LCF_ASYNC, 0);
-        } else {
-                LDLM_DEBUG(lock, "do not add lock into lru list");
-                unlock_res_and_lock(lock);
-        }
+                       OBD_RACE(OBD_FAIL_LDLM_CP_BL_RACE);
 
-        EXIT;
+               ldlm_pool_recalc(&ns->ns_pool, true);
+       } else {
+               LDLM_DEBUG(lock, "do not add lock into lru list");
+               unlock_res_and_lock(lock);
+       }
+
+       EXIT;
 }
 
 /**
@@ -1155,21 +1151,23 @@ void ldlm_grant_lock(struct ldlm_lock *lock, struct list_head *work_list)
  * Check if the given @lock meets the criteria for a match.
  * A reference on the lock is taken if matched.
  *
- * \param lock     test-against this lock
- * \param data    parameters
+ * @lock       test-against this lock
+ * @data       parameters
+ *
+ * RETURN      returns true if @lock matches @data, false otherwise
  */
-static int lock_matches(struct ldlm_lock *lock, struct ldlm_match_data *data)
+static bool lock_matches(struct ldlm_lock *lock, struct ldlm_match_data *data)
 {
        union ldlm_policy_data *lpol = &lock->l_policy_data;
        enum ldlm_mode match = LCK_MINMODE;
 
        if (lock == data->lmd_old)
-               return INTERVAL_ITER_STOP;
+               return true;
 
        /* Check if this lock can be matched.
         * Used by LU-2919(exclusive open) for open lease lock */
        if (ldlm_is_excl(lock))
-               return INTERVAL_ITER_CONT;
+               return false;
 
        /* llite sometimes wants to match locks that will be
         * canceled when their users drop, but we allow it to match
@@ -1179,36 +1177,39 @@ static int lock_matches(struct ldlm_lock *lock, struct ldlm_match_data *data)
         * can still happen. */
        if (ldlm_is_cbpending(lock) &&
            !(data->lmd_flags & LDLM_FL_CBPENDING))
-               return INTERVAL_ITER_CONT;
-       if (!data->lmd_unref && ldlm_is_cbpending(lock) &&
+               return false;
+
+       if (!(data->lmd_match & LDLM_MATCH_UNREF) && ldlm_is_cbpending(lock) &&
            lock->l_readers == 0 && lock->l_writers == 0)
-               return INTERVAL_ITER_CONT;
+               return false;
 
        if (!(lock->l_req_mode & *data->lmd_mode))
-               return INTERVAL_ITER_CONT;
+               return false;
 
        /* When we search for ast_data, we are not doing a traditional match,
         * so we don't worry about IBITS or extent matching.
         */
-       if (data->lmd_has_ast_data) {
+       if (data->lmd_match & (LDLM_MATCH_AST | LDLM_MATCH_AST_ANY)) {
                if (!lock->l_ast_data)
-                       return INTERVAL_ITER_CONT;
+                       return false;
 
-               goto matched;
+               if (data->lmd_match & LDLM_MATCH_AST_ANY)
+                       goto matched;
        }
 
        match = lock->l_req_mode;
 
        switch (lock->l_resource->lr_type) {
        case LDLM_EXTENT:
-               if (lpol->l_extent.start > data->lmd_policy->l_extent.start ||
-                   lpol->l_extent.end < data->lmd_policy->l_extent.end)
-                       return INTERVAL_ITER_CONT;
+               if (!(data->lmd_match & LDLM_MATCH_RIGHT) &&
+                   (lpol->l_extent.start > data->lmd_policy->l_extent.start ||
+                    lpol->l_extent.end < data->lmd_policy->l_extent.end))
+                       return false;
 
                if (unlikely(match == LCK_GROUP) &&
                    data->lmd_policy->l_extent.gid != LDLM_GID_ANY &&
                    lpol->l_extent.gid != data->lmd_policy->l_extent.gid)
-                       return INTERVAL_ITER_CONT;
+                       return false;
                break;
        case LDLM_IBITS:
                /* We match if we have existing lock with same or wider set
@@ -1216,7 +1217,13 @@ static int lock_matches(struct ldlm_lock *lock, struct ldlm_match_data *data)
                if ((lpol->l_inodebits.bits &
                     data->lmd_policy->l_inodebits.bits) !=
                    data->lmd_policy->l_inodebits.bits)
-                       return INTERVAL_ITER_CONT;
+                       return false;
+
+               if (unlikely(match == LCK_GROUP) &&
+                   data->lmd_policy->l_inodebits.li_gid != LDLM_GID_ANY &&
+                   lpol->l_inodebits.li_gid !=
+                   data->lmd_policy->l_inodebits.li_gid)
+                       return false;
                break;
        default:
                ;
@@ -1224,15 +1231,15 @@ static int lock_matches(struct ldlm_lock *lock, struct ldlm_match_data *data)
 
        /* We match if we have existing lock with same or wider set
           of bits. */
-       if (!data->lmd_unref && LDLM_HAVE_MASK(lock, GONE))
-               return INTERVAL_ITER_CONT;
+       if (!(data->lmd_match & LDLM_MATCH_UNREF) && LDLM_HAVE_MASK(lock, GONE))
+               return false;
 
        if (!equi(data->lmd_flags & LDLM_FL_LOCAL_ONLY, ldlm_is_local(lock)))
-               return INTERVAL_ITER_CONT;
+               return false;
 
        /* Filter locks by skipping flags */
        if (data->lmd_skip_flags & lock->l_flags)
-               return INTERVAL_ITER_CONT;
+               return false;
 
 matched:
        if (data->lmd_flags & LDLM_FL_TEST_LOCK) {
@@ -1245,7 +1252,7 @@ matched:
        *data->lmd_mode = match;
        data->lmd_lock = lock;
 
-       return INTERVAL_ITER_STOP;
+       return true;
 }
 
 static unsigned int itree_overlap_cb(struct interval_node *in, void *args)
@@ -1253,11 +1260,9 @@ static unsigned int itree_overlap_cb(struct interval_node *in, void *args)
        struct ldlm_interval *node = to_ldlm_interval(in);
        struct ldlm_match_data *data = args;
        struct ldlm_lock *lock;
-       int rc;
 
        list_for_each_entry(lock, &node->li_group, l_sl_policy) {
-               rc = lock_matches(lock, data);
-               if (rc == INTERVAL_ITER_STOP)
+               if (lock_matches(lock, data))
                        return INTERVAL_ITER_STOP;
        }
        return INTERVAL_ITER_CONT;
@@ -1282,6 +1287,9 @@ struct ldlm_lock *search_itree(struct ldlm_resource *res,
 
        data->lmd_lock = NULL;
 
+       if (data->lmd_match & LDLM_MATCH_RIGHT)
+               ext.end = OBD_OBJECT_EOF;
+
        for (idx = 0; idx < LCK_MODE_NUM; idx++) {
                struct ldlm_interval_tree *tree = &res->lr_itree[idx];
 
@@ -1314,15 +1322,12 @@ static struct ldlm_lock *search_queue(struct list_head *queue,
                                      struct ldlm_match_data *data)
 {
        struct ldlm_lock *lock;
-       int rc;
 
        data->lmd_lock = NULL;
 
-       list_for_each_entry(lock, queue, l_res_link) {
-               rc = lock_matches(lock, data);
-               if (rc == INTERVAL_ITER_STOP)
+       list_for_each_entry(lock, queue, l_res_link)
+               if (lock_matches(lock, data))
                        return data->lmd_lock;
-       }
 
        return NULL;
 }
@@ -1331,7 +1336,7 @@ void ldlm_lock_fail_match_locked(struct ldlm_lock *lock)
 {
        if ((lock->l_flags & LDLM_FL_FAIL_NOTIFIED) == 0) {
                lock->l_flags |= LDLM_FL_FAIL_NOTIFIED;
-               wake_up_all(&lock->l_waitq);
+               wake_up(&lock->l_waitq);
        }
 }
 EXPORT_SYMBOL(ldlm_lock_fail_match_locked);
@@ -1353,7 +1358,7 @@ void ldlm_lock_fail_match(struct ldlm_lock *lock)
 void ldlm_lock_allow_match_locked(struct ldlm_lock *lock)
 {
        ldlm_set_lvb_ready(lock);
-       wake_up_all(&lock->l_waitq);
+       wake_up(&lock->l_waitq);
 }
 EXPORT_SYMBOL(ldlm_lock_allow_match_locked);
 
@@ -1405,7 +1410,8 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
                                         enum ldlm_type type,
                                         union ldlm_policy_data *policy,
                                         enum ldlm_mode mode,
-                                        struct lustre_handle *lockh, int unref)
+                                        struct lustre_handle *lockh,
+                                        enum ldlm_match_flags match_flags)
 {
        struct ldlm_match_data data = {
                .lmd_old = NULL,
@@ -1414,8 +1420,7 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
                .lmd_policy = policy,
                .lmd_flags = flags,
                .lmd_skip_flags = skip_flags,
-               .lmd_unref = unref,
-               .lmd_has_ast_data = false,
+               .lmd_match = match_flags,
        };
        struct ldlm_resource *res;
        struct ldlm_lock *lock;
@@ -1458,7 +1463,6 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
                    (!ldlm_is_lvb_ready(lock))) {
                        __u64 wait_flags = LDLM_FL_LVB_READY |
                                LDLM_FL_DESTROYED | LDLM_FL_FAIL_NOTIFIED;
-                       struct l_wait_info lwi;
 
                        if (lock->l_completion_ast) {
                                int err = lock->l_completion_ast(lock,
@@ -1468,12 +1472,11 @@ enum ldlm_mode ldlm_lock_match_with_skip(struct ldlm_namespace *ns,
                                        GOTO(out_fail_match, matched = 0);
                        }
 
-                       lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout),
-                                              NULL, LWI_ON_SIGNAL_NOOP, NULL);
+                       wait_event_idle_timeout(
+                               lock->l_waitq,
+                               lock->l_flags & wait_flags,
+                               cfs_time_seconds(obd_timeout));
 
-                       /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
-                       l_wait_event(lock->l_waitq, lock->l_flags & wait_flags,
-                                    &lwi);
                        if (!ldlm_is_lvb_ready(lock))
                                GOTO(out_fail_match, matched = 0);
                }
@@ -1674,7 +1677,7 @@ struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
 
        lock->l_req_mode = mode;
        lock->l_ast_data = data;
-       lock->l_pid = current_pid();
+       lock->l_pid = current->pid;
        if (ns_is_server(ns))
                ldlm_set_ns_srv(lock);
        if (cbs) {
@@ -1727,14 +1730,10 @@ static enum ldlm_error ldlm_lock_enqueue_helper(struct ldlm_lock *lock,
        ENTRY;
 
        policy = ldlm_get_processing_policy(res);
-restart:
        policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, &rpc_list);
        if (rc == ELDLM_OK && lock->l_granted_mode != lock->l_req_mode &&
-           res->lr_type != LDLM_FLOCK) {
+           res->lr_type != LDLM_FLOCK)
                rc = ldlm_handle_conflict_lock(lock, flags, &rpc_list);
-               if (rc == -ERESTART)
-                       GOTO(restart, rc);
-       }
 
        if (!list_empty(&rpc_list))
                ldlm_discard_bl_list(&rpc_list);
@@ -1759,8 +1758,8 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
                                  void *cookie, __u64 *flags)
 {
        struct ldlm_lock *lock = *lockp;
-       struct ldlm_resource *res = lock->l_resource;
-       int local = ns_is_client(ldlm_res_to_ns(res));
+       struct ldlm_resource *res;
+       int local = ns_is_client(ns);
        enum ldlm_error rc = ELDLM_OK;
        struct ldlm_interval *node = NULL;
 #ifdef HAVE_SERVER_SUPPORT
@@ -1814,15 +1813,21 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
                RETURN(ELDLM_OK);
        }
 
+#ifdef HAVE_SERVER_SUPPORT
        /* For a replaying lock, it might be already in granted list. So
         * unlinking the lock will cause the interval node to be freed, we
         * have to allocate the interval node early otherwise we can't regrant
-        * this lock in the future. - jay */
-       if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
+        * this lock in the future. - jay
+        *
+        * The only time the ldlm_resource changes for the ldlm_lock is when
+        * ldlm_lock_change_resource() is called and that only happens for
+        * the Lustre client case.
+        */
+       if (!local && (*flags & LDLM_FL_REPLAY) &&
+           lock->l_resource->lr_type == LDLM_EXTENT)
                OBD_SLAB_ALLOC_PTR_GFP(node, ldlm_interval_slab, GFP_NOFS);
 
-#ifdef HAVE_SERVER_SUPPORT
-       reconstruct = !local && res->lr_type == LDLM_FLOCK &&
+       reconstruct = !local && lock->l_resource->lr_type == LDLM_FLOCK &&
                      !(*flags & LDLM_FL_TEST_LOCK);
        if (reconstruct) {
                rc = req_can_reconstruct(cookie, NULL);
@@ -1833,8 +1838,7 @@ enum ldlm_error ldlm_lock_enqueue(const struct lu_env *env,
                }
        }
 #endif
-
-       lock_res_and_lock(lock);
+       res = lock_res_and_lock(lock);
        if (local && ldlm_is_granted(lock)) {
                 /* The server returned a blocked lock, but it was granted
                  * before we got a chance to actually enqueue it.  We don't
@@ -1931,8 +1935,7 @@ out:
  */
 int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
                         struct list_head *work_list,
-                        enum ldlm_process_intention intention,
-                        struct ldlm_lock *hint)
+                        enum ldlm_process_intention intention, __u64 hint)
 {
        struct list_head *tmp, *pos;
        ldlm_processing_policy policy;
@@ -2033,6 +2036,9 @@ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
            !ns_is_client(ldlm_res_to_ns(res)))
                class_fail_export(lock->l_export);
 
+       if (rc == -ERESTART)
+               ldlm_reprocess_all(res, 0);
+
        lock_res(res);
        if (rc == -ERESTART) {
                /* 15715: The lock was granted and destroyed after
@@ -2052,10 +2058,8 @@ int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
                         * freed. Then we will fail at
                         * ldlm_extent_add_lock() */
                        *flags &= ~LDLM_FL_BLOCKED_MASK;
-                       RETURN(0);
                }
 
-               RETURN(rc);
        }
        *flags |= LDLM_FL_BLOCK_GRANTED;
 
@@ -2207,8 +2211,11 @@ int ldlm_work_gl_ast_lock(struct ptlrpc_request_set *rqset, void *opaq)
        arg->gl_interpret_data = gl_work->gl_interpret_data;
 
        /* invoke the actual glimpse callback */
-       if (lock->l_glimpse_ast(lock, (void*)arg) == 0)
-               rc = 1;
+       rc = lock->l_glimpse_ast(lock, (void *)arg);
+       if (rc == 0)
+               rc = 1; /* update LVB if this is server lock */
+       else if (rc == -ELDLM_NO_LOCK_DATA)
+               ldlm_lvbo_update(lock->l_resource, lock, NULL, 1);
 
        LDLM_LOCK_RELEASE(lock);
        if (gl_work->gl_flags & LDLM_GL_WORK_SLAB_ALLOCATED)
@@ -2341,9 +2348,9 @@ out:
  */
 static void __ldlm_reprocess_all(struct ldlm_resource *res,
                                 enum ldlm_process_intention intention,
-                                struct ldlm_lock *hint)
+                                __u64 hint)
 {
-       struct list_head rpc_list;
+       LIST_HEAD(rpc_list);
 #ifdef HAVE_SERVER_SUPPORT
        ldlm_reprocessing_policy reprocess;
        struct obd_device *obd;
@@ -2351,7 +2358,6 @@ static void __ldlm_reprocess_all(struct ldlm_resource *res,
 
        ENTRY;
 
-       INIT_LIST_HEAD(&rpc_list);
        /* Local lock trees don't get reprocessed. */
        if (ns_is_client(ldlm_res_to_ns(res))) {
                EXIT;
@@ -2375,12 +2381,12 @@ restart:
                               LDLM_WORK_CP_AST);
        if (rc == -ERESTART) {
                LASSERT(list_empty(&rpc_list));
+               hint = 0;
                goto restart;
        }
 #else
        ENTRY;
 
-       INIT_LIST_HEAD(&rpc_list);
        if (!ns_is_client(ldlm_res_to_ns(res))) {
                CERROR("This is client-side-only module, cannot handle "
                       "LDLM_NAMESPACE_SERVER resource type lock.\n");
@@ -2390,7 +2396,7 @@ restart:
        EXIT;
 }
 
-void ldlm_reprocess_all(struct ldlm_resource *res, struct ldlm_lock *hint)
+void ldlm_reprocess_all(struct ldlm_resource *res, __u64 hint)
 {
        __ldlm_reprocess_all(res, LDLM_PROCESS_RESCAN, hint);
 }
@@ -2402,7 +2408,7 @@ static int ldlm_reprocess_res(struct cfs_hash *hs, struct cfs_hash_bd *bd,
        struct ldlm_resource *res = cfs_hash_object(hs, hnode);
 
        /* This is only called once after recovery done. LU-8306. */
-       __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY, NULL);
+       __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY, 0);
        return 0;
 }
 
@@ -2441,7 +2447,7 @@ void ldlm_cancel_callback(struct ldlm_lock *lock)
 
                /* only canceller can set bl_done bit */
                ldlm_set_bl_done(lock);
-               wake_up_all(&lock->l_waitq);
+               wake_up(&lock->l_waitq);
        } else if (!ldlm_is_bl_done(lock)) {
                /* The lock is guaranteed to have been canceled once
                 * returning from this function. */
@@ -2549,7 +2555,7 @@ static void ldlm_cancel_lock_for_export(struct obd_export *exp,
        ldlm_lvbo_update(res, lock, NULL, 1);
        ldlm_lock_cancel(lock);
        if (!exp->exp_obd->obd_stopping)
-               ldlm_reprocess_all(res, lock);
+               ldlm_reprocess_all(res, lock->l_policy_data.l_inodebits.bits);
        ldlm_resource_putref(res);
 
        ecl->ecl_loop++;
@@ -2714,7 +2720,8 @@ void ldlm_lock_mode_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
        ldlm_grant_lock(lock, NULL);
        unlock_res_and_lock(lock);
 
-       ldlm_reprocess_all(lock->l_resource, lock);
+       ldlm_reprocess_all(lock->l_resource,
+                          lock->l_policy_data.l_inodebits.bits);
 
        EXIT;
 #endif
@@ -2757,15 +2764,11 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
        struct va_format vaf;
         char *nid = "local";
 
-       /* on server-side resource of lock doesn't change */
-       if ((lock->l_flags & LDLM_FL_NS_SRV) != 0) {
-               if (lock->l_resource != NULL)
-                       resource = ldlm_resource_getref(lock->l_resource);
-       } else if (spin_trylock(&lock->l_lock)) {
-               if (lock->l_resource != NULL)
-                       resource = ldlm_resource_getref(lock->l_resource);
-               spin_unlock(&lock->l_lock);
-       }
+       rcu_read_lock();
+       resource = rcu_dereference(lock->l_resource);
+       if (resource && !atomic_inc_not_zero(&resource->lr_refcount))
+               resource = NULL;
+       rcu_read_unlock();
 
         va_start(args, fmt);
        vaf.fmt = fmt;
@@ -2791,7 +2794,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
                                 lock->l_flags, nid,
                                 lock->l_remote_handle.cookie,
                                 exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
-                                lock->l_pid, lock->l_callback_timeout,
+                                lock->l_pid, lock->l_callback_timestamp,
                                 lock->l_lvb_type);
                 va_end(args);
                 return;
@@ -2800,7 +2803,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
        switch (resource->lr_type) {
        case LDLM_EXTENT:
                libcfs_debug_msg(msgdata,
-                                "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " rrc: %d type: %s [%llu->%llu] (req %llu->%llu) flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
+                                "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " rrc: %d type: %s [%llu->%llu] (req %llu->%llu) gid %llu flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
                                 &vaf,
                                 ldlm_lock_to_ns_name(lock), lock,
                                 lock->l_handle.h_cookie,
@@ -2814,10 +2817,11 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
                                 lock->l_policy_data.l_extent.start,
                                 lock->l_policy_data.l_extent.end,
                                 lock->l_req_extent.start, lock->l_req_extent.end,
+                                lock->l_req_extent.gid,
                                 lock->l_flags, nid,
                                 lock->l_remote_handle.cookie,
                                 exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
-                                lock->l_pid, lock->l_callback_timeout,
+                                lock->l_pid, lock->l_callback_timestamp,
                                 lock->l_lvb_type);
                break;
 
@@ -2840,12 +2844,12 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
                                 lock->l_flags, nid,
                                 lock->l_remote_handle.cookie,
                                 exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
-                                lock->l_pid, lock->l_callback_timeout);
+                                lock->l_pid, lock->l_callback_timestamp);
                break;
 
        case LDLM_IBITS:
                libcfs_debug_msg(msgdata,
-                                "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " bits %#llx/%#llx rrc: %d type: %s flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
+                                "%pV ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s res: " DLDLMRES " bits %#llx/%#llx rrc: %d type: %s gid %llu flags: %#llx nid: %s remote: %#llx expref: %d pid: %u timeout: %lld lvb_type: %d\n",
                                 &vaf,
                                 ldlm_lock_to_ns_name(lock),
                                 lock, lock->l_handle.h_cookie,
@@ -2858,10 +2862,11 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
                                 lock->l_policy_data.l_inodebits.try_bits,
                                 atomic_read(&resource->lr_refcount),
                                 ldlm_typename[resource->lr_type],
+                                lock->l_policy_data.l_inodebits.li_gid,
                                 lock->l_flags, nid,
                                 lock->l_remote_handle.cookie,
                                 exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
-                                lock->l_pid, lock->l_callback_timeout,
+                                lock->l_pid, lock->l_callback_timestamp,
                                 lock->l_lvb_type);
                break;
 
@@ -2881,7 +2886,7 @@ void _ldlm_lock_debug(struct ldlm_lock *lock,
                                 lock->l_flags, nid,
                                 lock->l_remote_handle.cookie,
                                 exp ? refcount_read(&exp->exp_handle.h_ref) : -99,
-                                lock->l_pid, lock->l_callback_timeout,
+                                lock->l_pid, lock->l_callback_timestamp,
                                 lock->l_lvb_type);
                break;
        }