*
* You should have received a copy of the GNU General Public License
* version 2 along with this program; If not, see
- * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * http://www.gnu.org/licenses/gpl-2.0.html
*
* GPL HEADER END
*/
* Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2010, 2015, Intel Corporation.
+ * Copyright (c) 2010, 2016, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#include "ldlm_internal.h"
+struct kmem_cache *ldlm_glimpse_work_kmem;
+EXPORT_SYMBOL(ldlm_glimpse_work_kmem);
+
/* lock types */
char *ldlm_lockname[] = {
[0] = "--",
* otherwise, the lock hasn't been in the LRU list.
* \retval 1 the lock was in LRU list and removed.
*/
-int ldlm_lock_remove_from_lru_check(struct ldlm_lock *lock, cfs_time_t last_use)
+int ldlm_lock_remove_from_lru_check(struct ldlm_lock *lock, ktime_t last_use)
{
struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
int rc = 0;
}
spin_lock(&ns->ns_lock);
- if (last_use == 0 || last_use == lock->l_last_used)
+ if (!ktime_compare(last_use, ktime_set(0, 0)) ||
+ !ktime_compare(last_use, lock->l_last_used))
rc = ldlm_lock_remove_from_lru_nolock(lock);
spin_unlock(&ns->ns_lock);
{
struct ldlm_namespace *ns = ldlm_lock_to_ns(lock);
- lock->l_last_used = cfs_time_current();
+ lock->l_last_used = ktime_get();
LASSERT(list_empty(&lock->l_lru));
LASSERT(lock->l_resource->lr_type != LDLM_FLOCK);
list_add_tail(&lock->l_lru, &ns->ns_unused_list);
struct ldlm_lock *lock;
lock = ldlm_handle2lock(lockh);
- LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
+ LASSERTF(lock != NULL, "Non-existing lock: %#llx\n", lockh->cookie);
ldlm_lock_addref_internal(lock, mode);
LDLM_LOCK_PUT(lock);
}
void ldlm_lock_decref(const struct lustre_handle *lockh, enum ldlm_mode mode)
{
struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
- LASSERTF(lock != NULL, "Non-existing lock: "LPX64"\n", lockh->cookie);
+ LASSERTF(lock != NULL, "Non-existing lock: %#llx\n", lockh->cookie);
ldlm_lock_decref_internal(lock, mode);
LDLM_LOCK_PUT(lock);
}
}
out2:
if (rc) {
- LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
+ LDLM_DEBUG(lock, "matched (%llu %llu)",
(type == LDLM_PLAIN || type == LDLM_IBITS) ?
res_id->name[2] : policy->l_extent.start,
(type == LDLM_PLAIN || type == LDLM_IBITS) ?
} else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/
LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
- LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
+ "%llu/%llu (%llu %llu)", ns,
type, mode, res_id->name[0], res_id->name[1],
(type == LDLM_PLAIN || type == LDLM_IBITS) ?
res_id->name[2] :policy->l_extent.start,
}
policy = ldlm_processing_policy_table[res->lr_type];
- policy(lock, flags, 1, &rc, NULL);
+ policy(lock, flags, LDLM_PROCESS_ENQUEUE, &rc, NULL);
GOTO(out, rc);
#else
} else {
* Must be called with resource lock held.
*/
int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
- struct list_head *work_list)
+ struct list_head *work_list,
+ enum ldlm_process_intention intention)
{
struct list_head *tmp, *pos;
ldlm_processing_policy policy;
policy = ldlm_processing_policy_table[res->lr_type];
LASSERT(policy);
+ LASSERT(intention == LDLM_PROCESS_RESCAN ||
+ intention == LDLM_PROCESS_RECOVERY);
list_for_each_safe(tmp, pos, queue) {
struct ldlm_lock *pending;
CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);
flags = 0;
- rc = policy(pending, &flags, 0, &err, work_list);
- if (rc != LDLM_ITER_CONTINUE)
- break;
+ rc = policy(pending, &flags, intention, &err, work_list);
+ /*
+ * When this is called from recovery done, we always want
+ * to scan the whole list no matter what 'rc' is returned.
+ */
+ if (rc != LDLM_ITER_CONTINUE &&
+ intention == LDLM_PROCESS_RESCAN)
+ break;
}
- RETURN(rc);
+ RETURN(intention == LDLM_PROCESS_RESCAN ? rc : LDLM_ITER_CONTINUE);
+}
+
+/**
+ * Conflicting locks are detected for a lock to be enqueued, add the lock
+ * into waiting list and send blocking ASTs to the conflicting locks.
+ *
+ * \param[in] lock The lock to be enqueued.
+ * \param[out] flags Lock flags for the lock to be enqueued.
+ * \param[in] rpc_list Conflicting locks list.
+ * \param[in] grant_flags extra flags when granting a lock.
+ *
+ * \retval -ERESTART: Some lock was instantly canceled while sending
+ * blocking ASTs, caller needs to re-check conflicting
+ * locks.
+ * \retval -EAGAIN: Lock was destroyed, caller should return error.
+ * \reval 0: Lock is successfully added in waiting list.
+ */
+int ldlm_handle_conflict_lock(struct ldlm_lock *lock, __u64 *flags,
+ struct list_head *rpc_list, __u64 grant_flags)
+{
+ struct ldlm_resource *res = lock->l_resource;
+ int rc;
+ ENTRY;
+
+ check_res_locked(res);
+
+ /* If either of the compat_queue()s returned failure, then we
+ * have ASTs to send and must go onto the waiting list.
+ *
+ * bug 2322: we used to unlink and re-add here, which was a
+ * terrible folly -- if we goto restart, we could get
+ * re-ordered! Causes deadlock, because ASTs aren't sent! */
+ if (list_empty(&lock->l_res_link))
+ ldlm_resource_add_lock(res, &res->lr_waiting, lock);
+ unlock_res(res);
+
+ rc = ldlm_run_ast_work(ldlm_res_to_ns(res), rpc_list,
+ LDLM_WORK_BL_AST);
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_OST_FAIL_RACE) &&
+ !ns_is_client(ldlm_res_to_ns(res)))
+ class_fail_export(lock->l_export);
+
+ lock_res(res);
+ if (rc == -ERESTART) {
+ /* 15715: The lock was granted and destroyed after
+ * resource lock was dropped. Interval node was freed
+ * in ldlm_lock_destroy. Anyway, this always happens
+ * when a client is being evicted. So it would be
+ * ok to return an error. -jay */
+ if (ldlm_is_destroyed(lock))
+ RETURN(-EAGAIN);
+
+ /* lock was granted while resource was unlocked. */
+ if (lock->l_granted_mode == lock->l_req_mode) {
+ /* bug 11300: if the lock has been granted,
+ * break earlier because otherwise, we will go
+ * to restart and ldlm_resource_unlink will be
+ * called and it causes the interval node to be
+ * freed. Then we will fail at
+ * ldlm_extent_add_lock() */
+ *flags &= ~LDLM_FL_BLOCKED_MASK;
+ RETURN(0);
+ }
+
+ RETURN(rc);
+ }
+ *flags |= (LDLM_FL_BLOCK_GRANTED | grant_flags);
+
+ RETURN(0);
}
+
+/**
+ * Discard all AST work items from list.
+ *
+ * If for whatever reason we do not want to send ASTs to conflicting locks
+ * anymore, disassemble the list with this function.
+ */
+void ldlm_discard_bl_list(struct list_head *bl_list)
+{
+ struct list_head *tmp, *pos;
+ ENTRY;
+
+ list_for_each_safe(pos, tmp, bl_list) {
+ struct ldlm_lock *lock =
+ list_entry(pos, struct ldlm_lock, l_bl_ast);
+
+ /* Unlink the entry and undo the state set when it was queued. */
+ list_del_init(&lock->l_bl_ast);
+ LASSERT(ldlm_is_ast_sent(lock));
+ ldlm_clear_ast_sent(lock);
+ LASSERT(lock->l_bl_ast_run == 0);
+ LASSERT(lock->l_blocking_lock);
+ /* Drop the reference held on the blocking lock... */
+ LDLM_LOCK_RELEASE(lock->l_blocking_lock);
+ lock->l_blocking_lock = NULL;
+ /* ...and the reference the bl_ast list held on this lock. */
+ LDLM_LOCK_RELEASE(lock);
+ }
+ EXIT;
+}
+
#endif
/**
/* transfer the glimpse descriptor to ldlm_cb_set_arg */
arg->gl_desc = gl_work->gl_desc;
+ arg->gl_interpret_reply = gl_work->gl_interpret_reply;
+ arg->gl_interpret_data = gl_work->gl_interpret_data;
/* invoke the actual glimpse callback */
if (lock->l_glimpse_ast(lock, (void*)arg) == 0)
rc = 1;
LDLM_LOCK_RELEASE(lock);
-
- if ((gl_work->gl_flags & LDLM_GL_WORK_NOFREE) == 0)
+ if (gl_work->gl_flags & LDLM_GL_WORK_SLAB_ALLOCATED)
+ OBD_SLAB_FREE_PTR(gl_work, ldlm_glimpse_work_kmem);
+ else
OBD_FREE_PTR(gl_work);
RETURN(rc);
return rc;
}
-static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
-{
- ldlm_reprocess_all(res);
- return LDLM_ITER_CONTINUE;
-}
-
-static int ldlm_reprocess_res(struct cfs_hash *hs, struct cfs_hash_bd *bd,
- struct hlist_node *hnode, void *arg)
-{
- struct ldlm_resource *res = cfs_hash_object(hs, hnode);
- int rc;
-
- rc = reprocess_one_queue(res, arg);
-
- return rc == LDLM_ITER_STOP;
-}
-
-/**
- * Iterate through all resources on a namespace attempting to grant waiting
- * locks.
- */
-void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
-{
- ENTRY;
-
- if (ns != NULL) {
- cfs_hash_for_each_nolock(ns->ns_rs_hash,
- ldlm_reprocess_res, NULL, 0);
- }
- EXIT;
-}
-
/**
* Try to grant all waiting locks on a resource.
*
* Typically called after some resource locks are cancelled to see
* if anything could be granted as a result of the cancellation.
*/
-void ldlm_reprocess_all(struct ldlm_resource *res)
+static void __ldlm_reprocess_all(struct ldlm_resource *res,
+ enum ldlm_process_intention intention)
{
struct list_head rpc_list;
#ifdef HAVE_SERVER_SUPPORT
+ struct obd_device *obd;
int rc;
ENTRY;
return;
}
+ /* Disable reprocess during lock replay stage but allow during
+ * request replay stage.
+ */
+ obd = ldlm_res_to_ns(res)->ns_obd;
+ if (obd->obd_recovering &&
+ atomic_read(&obd->obd_req_replay_clients) == 0)
+ RETURN_EXIT;
restart:
- lock_res(res);
- rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
- if (rc == LDLM_ITER_CONTINUE)
- ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
- unlock_res(res);
+ lock_res(res);
+ rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list,
+ intention);
+ if (rc == LDLM_ITER_CONTINUE)
+ ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list,
+ intention);
+ unlock_res(res);
rc = ldlm_run_ast_work(ldlm_res_to_ns(res), &rpc_list,
LDLM_WORK_CP_AST);
#endif
EXIT;
}
+
+/* Normal-path wrapper: rescan the resource queues with the
+ * LDLM_PROCESS_RESCAN intention, which stops at the first lock
+ * that cannot be granted. */
+void ldlm_reprocess_all(struct ldlm_resource *res)
+{
+ __ldlm_reprocess_all(res, LDLM_PROCESS_RESCAN);
+}
EXPORT_SYMBOL(ldlm_reprocess_all);
+/* cfs_hash iterator callback used by ldlm_reprocess_recovery_done(). */
+static int ldlm_reprocess_res(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+ struct hlist_node *hnode, void *arg)
+{
+ struct ldlm_resource *res = cfs_hash_object(hs, hnode);
+
+ /* This is only called once after recovery done. LU-8306. */
+ __ldlm_reprocess_all(res, LDLM_PROCESS_RECOVERY);
+ /* Always return 0 so the hash walk visits every resource. */
+ return 0;
+}
+
+/**
+ * Iterate through all resources on a namespace attempting to grant waiting
+ * locks.
+ *
+ * Called once after recovery is done (LU-8306); each resource is
+ * reprocessed with the LDLM_PROCESS_RECOVERY intention via
+ * ldlm_reprocess_res().
+ */
+void ldlm_reprocess_recovery_done(struct ldlm_namespace *ns)
+{
+ ENTRY;
+
+ if (ns != NULL) {
+ cfs_hash_for_each_nolock(ns->ns_rs_hash,
+ ldlm_reprocess_res, NULL, 0);
+ }
+ EXIT;
+}
+
static bool is_bl_done(struct ldlm_lock *lock)
{
bool bl_done = true;
res = ldlm_resource_getref(lock->l_resource);
- ldlm_res_lvbo_update(res, NULL, 1);
+ ldlm_lvbo_update(res, lock, NULL, 1);
ldlm_lock_cancel(lock);
if (!exp->exp_obd->obd_stopping)
ldlm_reprocess_all(res);
"left on hash table %d.\n", exp, ecl.ecl_loop,
atomic_read(&exp->exp_lock_hash->hs_count));
+ if (ecl.ecl_loop > 0 &&
+ atomic_read(&exp->exp_lock_hash->hs_count) == 0 &&
+ exp->exp_obd->obd_stopping)
+ ldlm_reprocess_recovery_done(exp->exp_obd->obd_namespace);
+
return ecl.ecl_loop;
}
/**
* Downgrade an exclusive lock.
*
- * A fast variant of ldlm_lock_convert for convertion of exclusive
- * locks. The convertion is always successful.
+ * A fast variant of ldlm_lock_convert for conversion of exclusive locks. The
+ * conversion may fail if lock was canceled before downgrade, but it doesn't
+ * indicate any problem, because such lock has no reader or writer, and will
+ * be released soon.
* Used by Commit on Sharing (COS) code.
*
* \param lock A lock to convert
*/
void ldlm_lock_downgrade(struct ldlm_lock *lock, enum ldlm_mode new_mode)
{
- ENTRY;
+ ENTRY;
- LASSERT(lock->l_granted_mode & (LCK_PW | LCK_EX));
- LASSERT(new_mode == LCK_COS);
+ LASSERT(new_mode == LCK_COS);
- lock_res_and_lock(lock);
- ldlm_resource_unlink_lock(lock);
- /*
- * Remove the lock from pool as it will be added again in
- * ldlm_grant_lock() called below.
- */
- ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
+ lock_res_and_lock(lock);
- lock->l_req_mode = new_mode;
- ldlm_grant_lock(lock, NULL);
- unlock_res_and_lock(lock);
- ldlm_reprocess_all(lock->l_resource);
+ if (!(lock->l_granted_mode & (LCK_PW | LCK_EX))) {
+ unlock_res_and_lock(lock);
- EXIT;
+ LASSERT(lock->l_granted_mode == LCK_MINMODE);
+ LDLM_DEBUG(lock, "lock was canceled before downgrade");
+ RETURN_EXIT;
+ }
+
+ ldlm_resource_unlink_lock(lock);
+ /*
+ * Remove the lock from pool as it will be added again in
+ * ldlm_grant_lock() called below.
+ */
+ ldlm_pool_del(&ldlm_lock_to_ns(lock)->ns_pool, lock);
+ lock->l_req_mode = new_mode;
+ ldlm_grant_lock(lock, NULL);
+
+ unlock_res_and_lock(lock);
+
+ ldlm_reprocess_all(lock->l_resource);
+
+ EXIT;
}
EXPORT_SYMBOL(ldlm_lock_downgrade);
ldlm_processing_policy policy;
policy = ldlm_processing_policy_table[res->lr_type];
- rc = policy(lock, &pflags, 0, &err, &rpc_list);
+ rc = policy(lock, &pflags, LDLM_PROCESS_RESCAN, &err,
+ &rpc_list);
if (rc == LDLM_ITER_STOP) {
lock->l_req_mode = old_mode;
if (res->lr_type == LDLM_EXTENT)
{
va_list args;
struct obd_export *exp = lock->l_export;
- struct ldlm_resource *resource = lock->l_resource;
+ struct ldlm_resource *resource = NULL;
char *nid = "local";
+ /* on server-side resource of lock doesn't change */
+ if ((lock->l_flags & LDLM_FL_NS_SRV) != 0) {
+ if (lock->l_resource != NULL)
+ resource = ldlm_resource_getref(lock->l_resource);
+ } else if (spin_trylock(&lock->l_lock)) {
+ if (lock->l_resource != NULL)
+ resource = ldlm_resource_getref(lock->l_resource);
+ spin_unlock(&lock->l_lock);
+ }
+
va_start(args, fmt);
if (exp && exp->exp_connection) {
if (resource == NULL) {
libcfs_debug_vmsg2(msgdata, fmt, args,
- " ns: \?\? lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
- "res: \?\? rrc=\?\? type: \?\?\? flags: "LPX64" nid: %s "
- "remote: "LPX64" expref: %d pid: %u timeout: %lu "
+ " ns: \?\? lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s "
+ "res: \?\? rrc=\?\? type: \?\?\? flags: %#llx nid: %s "
+ "remote: %#llx expref: %d pid: %u timeout: %lu "
"lvb_type: %d\n",
lock,
lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
switch (resource->lr_type) {
case LDLM_EXTENT:
libcfs_debug_vmsg2(msgdata, fmt, args,
- " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
- "res: "DLDLMRES" rrc: %d type: %s ["LPU64"->"LPU64"] "
- "(req "LPU64"->"LPU64") flags: "LPX64" nid: %s remote: "
- LPX64" expref: %d pid: %u timeout: %lu lvb_type: %d\n",
+ " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s "
+ "res: "DLDLMRES" rrc: %d type: %s [%llu->%llu] "
+ "(req %llu->%llu) flags: %#llx nid: %s remote: "
+ "%#llx expref: %d pid: %u timeout: %lu lvb_type: %d\n",
ldlm_lock_to_ns_name(lock), lock,
lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
lock->l_readers, lock->l_writers,
case LDLM_FLOCK:
libcfs_debug_vmsg2(msgdata, fmt, args,
- " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
+ " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s "
"res: "DLDLMRES" rrc: %d type: %s pid: %d "
- "["LPU64"->"LPU64"] flags: "LPX64" nid: %s "
- "remote: "LPX64" expref: %d pid: %u timeout: %lu\n",
+ "[%llu->%llu] flags: %#llx nid: %s "
+ "remote: %#llx expref: %d pid: %u timeout: %lu\n",
ldlm_lock_to_ns_name(lock), lock,
lock->l_handle.h_cookie, atomic_read(&lock->l_refc),
lock->l_readers, lock->l_writers,
case LDLM_IBITS:
libcfs_debug_vmsg2(msgdata, fmt, args,
- " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
- "res: "DLDLMRES" bits "LPX64" rrc: %d type: %s "
- "flags: "LPX64" nid: %s remote: "LPX64" expref: %d "
+ " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s "
+ "res: "DLDLMRES" bits %#llx/%#llx rrc: %d type: %s "
+ "flags: %#llx nid: %s remote: %#llx expref: %d "
"pid: %u timeout: %lu lvb_type: %d\n",
ldlm_lock_to_ns_name(lock),
lock, lock->l_handle.h_cookie,
ldlm_lockname[lock->l_req_mode],
PLDLMRES(resource),
lock->l_policy_data.l_inodebits.bits,
+ lock->l_policy_data.l_inodebits.try_bits,
atomic_read(&resource->lr_refcount),
ldlm_typename[resource->lr_type],
lock->l_flags, nid, lock->l_remote_handle.cookie,
default:
libcfs_debug_vmsg2(msgdata, fmt, args,
- " ns: %s lock: %p/"LPX64" lrc: %d/%d,%d mode: %s/%s "
- "res: "DLDLMRES" rrc: %d type: %s flags: "LPX64" "
- "nid: %s remote: "LPX64" expref: %d pid: %u "
+ " ns: %s lock: %p/%#llx lrc: %d/%d,%d mode: %s/%s "
+ "res: "DLDLMRES" rrc: %d type: %s flags: %#llx "
+ "nid: %s remote: %#llx expref: %d pid: %u "
"timeout: %lu lvb_type: %d\n",
ldlm_lock_to_ns_name(lock),
lock, lock->l_handle.h_cookie,
break;
}
va_end(args);
+ ldlm_resource_putref(resource);
}
EXPORT_SYMBOL(_ldlm_lock_debug);