#include <linux/slab.h>
#include <linux/module.h>
-#include <linux/random.h>
#include <linux/lustre_dlm.h>
#include <linux/lustre_mds.h>
#include <linux/obd_class.h>
static ldlm_res_policy ldlm_intent_policy_func;
-static int ldlm_plain_policy(struct ldlm_lock *lock, void *req_cookie,
- ldlm_mode_t mode, int flags, void *data)
+static int ldlm_plain_policy(struct ldlm_namespace *ns, struct ldlm_lock *lock,
+ void *req_cookie, ldlm_mode_t mode, int flags,
+ void *data)
{
if ((flags & LDLM_FL_HAS_INTENT) && ldlm_intent_policy_func) {
- return ldlm_intent_policy_func(lock, req_cookie, mode, flags,
- data);
+ return ldlm_intent_policy_func(ns, lock, req_cookie, mode,
+ flags, data);
}
return ELDLM_OK;
EXIT;
}
+/* Only called with strict == 0 by recovery, to mark in-use locks as
+ * should-be-destroyed */
void ldlm_lock_destroy(struct ldlm_lock *lock)
{
ENTRY;
if (!list_empty(&lock->l_children)) {
LDLM_DEBUG(lock, "still has children (%p)!",
lock->l_children.next);
- ldlm_lock_dump(lock);
+ ldlm_lock_dump(D_ERROR, lock);
LBUG();
}
if (lock->l_readers || lock->l_writers) {
LDLM_DEBUG(lock, "lock still has references");
- ldlm_lock_dump(lock);
+ ldlm_lock_dump(D_OTHER, lock);
}
if (!list_empty(&lock->l_res_link)) {
- ldlm_lock_dump(lock);
+ ldlm_lock_dump(D_ERROR, lock);
LBUG();
}
list_del_init(&lock->l_export_chain);
ldlm_lock_remove_from_lru(lock);
+ portals_handle_unhash(&lock->l_handle);
#if 0
/* Wake anyone waiting for this lock */
if (lock == NULL)
RETURN(NULL);
- get_random_bytes(&lock->l_random, sizeof(__u64));
lock->l_resource = ldlm_resource_getref(resource);
atomic_set(&lock->l_refc, 2);
l_unlock(&parent->l_resource->lr_namespace->ns_lock);
}
+ INIT_LIST_HEAD(&lock->l_handle.h_link);
+ portals_handle_hash(&lock->l_handle, lock_handle_addref);
+
RETURN(lock);
}
-int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3])
+int ldlm_lock_change_resource(struct ldlm_namespace *ns, struct ldlm_lock *lock,
+ __u64 new_resid[3])
{
- struct ldlm_namespace *ns = lock->l_resource->lr_namespace;
struct ldlm_resource *oldres = lock->l_resource;
ENTRY;
void ldlm_lock2handle(struct ldlm_lock *lock, struct lustre_handle *lockh)
{
- lockh->addr = (__u64) (unsigned long)lock;
- lockh->cookie = lock->l_random;
+ //lockh->addr = (__u64)(unsigned long)lock;
+ memset(&lockh->addr, 0x69, sizeof(lockh->addr));
+ lockh->cookie = lock->l_handle.h_cookie;
}
-/*
- * if flags: atomically get the lock and set the flags.
- * Return NULL if flag already set
+/* if flags: atomically get the lock and set the flags.
+ * Return NULL if flag already set
*/
-struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int strict,
- int flags)
+struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, int flags)
{
struct ldlm_lock *lock = NULL, *retval = NULL;
ENTRY;
LASSERT(handle);
- if (!handle->addr)
+ lock = portals_handle2object(handle->cookie);
+ if (lock == NULL)
RETURN(NULL);
- lock = (struct ldlm_lock *)(unsigned long)(handle->addr);
- if (!kmem_cache_validate(ldlm_lock_slab, (void *)lock)) {
- //CERROR("bogus lock %p\n", lock);
- GOTO(out2, retval);
- }
-
- if (lock->l_random != handle->cookie) {
- //CERROR("bogus cookie: lock %p has "LPX64" vs. handle "LPX64
- // "\n", lock, lock->l_random, handle->cookie);
- GOTO(out2, NULL);
- }
- if (!lock->l_resource) {
- CERROR("trying to lock bogus resource: lock %p\n", lock);
- //LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock);
- GOTO(out2, retval);
- }
- if (!lock->l_resource->lr_namespace) {
- CERROR("trying to lock bogus namespace: lock %p\n", lock);
- //LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock);
- GOTO(out2, retval);
- }
+ LASSERT(lock->l_resource != NULL);
+ LASSERT(lock->l_resource->lr_namespace != NULL);
l_lock(&lock->l_resource->lr_namespace->ns_lock);
- if (strict && lock->l_destroyed) {
+
+ /* It's unlikely but possible that someone marked the lock as
+ * destroyed after we did handle2object on it */
+ if (lock->l_destroyed) {
CERROR("lock already destroyed: lock %p\n", lock);
- //LDLM_DEBUG(lock, "ldlm_handle2lock(%p)", lock);
- GOTO(out, NULL);
+ LDLM_LOCK_PUT(lock);
+ GOTO(out, retval);
}
- if (flags && (lock->l_flags & flags))
- GOTO(out, NULL);
+ if (flags && (lock->l_flags & flags)) {
+ LDLM_LOCK_PUT(lock);
+ GOTO(out, retval);
+ }
if (flags)
lock->l_flags |= flags;
- retval = LDLM_LOCK_GET(lock);
+ retval = lock;
EXIT;
out:
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
- out2:
+ return retval;
+}
+
+struct ldlm_lock *ldlm_handle2lock_ns(struct ldlm_namespace *ns,
+ struct lustre_handle *handle)
+{
+ struct ldlm_lock *retval = NULL;
+
+ l_lock(&ns->ns_lock);
+ retval = __ldlm_handle2lock(handle, 0);
+ l_unlock(&ns->ns_lock);
+
return retval;
}
void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode)
{
- struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0, 0);
+ struct ldlm_lock *lock = __ldlm_handle2lock(lockh, 0);
struct ldlm_namespace *ns;
ENTRY;
LDLM_DEBUG(lock, "ldlm_lock_decref(%s)", ldlm_lockname[mode]);
ns = lock->l_resource->lr_namespace;
l_lock(&lock->l_resource->lr_namespace->ns_lock);
- if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR)
+ if (mode == LCK_NL || mode == LCK_CR || mode == LCK_PR) {
+ LASSERT(lock->l_readers > 0);
lock->l_readers--;
- else
+ } else {
+ LASSERT(lock->l_writers > 0);
lock->l_writers--;
+ }
/* If we received a blocked AST and this was the last reference,
* run the callback. */
ns->ns_nr_unused++;
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
ldlm_cancel_lru(ns);
- } else
+ } else {
l_unlock(&lock->l_resource->lr_namespace->ns_lock);
+ }
LDLM_LOCK_PUT(lock); /* matches the ldlm_lock_get in addref */
LDLM_LOCK_PUT(lock); /* matches the handle2lock above */
return lock;
}
-/* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */
-ldlm_error_t ldlm_lock_enqueue(struct ldlm_lock * lock,
+ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
+ struct ldlm_lock *lock,
void *cookie, int cookie_len,
int *flags,
ldlm_completion_callback completion,
if (!local && !(*flags & LDLM_FL_REPLAY) &&
(policy = ldlm_res_policy_table[res->lr_type])) {
int rc;
- rc = policy(lock, cookie, lock->l_req_mode, *flags, NULL);
+ rc = policy(ns, lock, cookie, lock->l_req_mode, *flags, NULL);
if (rc == ELDLM_LOCK_CHANGED) {
res = lock->l_resource;
}
}
- l_lock(&res->lr_namespace->ns_lock);
+ l_lock(&ns->ns_lock);
if (local && lock->l_req_mode == lock->l_granted_mode) {
/* The server returned a blocked lock, but it was granted before
* we got a chance to actually enqueue it. We don't need to do
* FIXME (bug 268): Detect obvious lies by checking compatibility in
* granted/converting queues. */
ldlm_resource_unlink_lock(lock);
- if (local || (*flags & LDLM_FL_REPLAY)) {
+ if (local) {
if (*flags & LDLM_FL_BLOCK_CONV)
ldlm_resource_add_lock(res, res->lr_converting.prev,
lock);
else
ldlm_grant_lock(lock);
GOTO(out, ELDLM_OK);
+ } else if (*flags & LDLM_FL_REPLAY) {
+ if (*flags & LDLM_FL_BLOCK_CONV) {
+ ldlm_resource_add_lock(res, res->lr_converting.prev,
+ lock);
+ GOTO(out, ELDLM_OK);
+ } else if (*flags & LDLM_FL_BLOCK_WAIT) {
+ ldlm_resource_add_lock(res, res->lr_waiting.prev, lock);
+ GOTO(out, ELDLM_OK);
+ } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
+ ldlm_grant_lock(lock);
+ GOTO(out, ELDLM_OK);
+ }
+ /* If no flags, fall through to normal enqueue path. */
}
/* FIXME: We may want to optimize by checking lr_most_restr */
ldlm_grant_lock(lock);
EXIT;
out:
- l_unlock(&res->lr_namespace->ns_lock);
+ l_unlock(&ns->ns_lock);
/* Don't set 'completion_ast' until here so that if the lock is granted
* immediately we don't do an unnecessary completion call. */
lock->l_completion_ast = completion;
RETURN(0);
}
-void ldlm_run_ast_work(struct list_head *rpc_list)
+int ldlm_run_ast_work(struct list_head *rpc_list)
{
struct list_head *tmp, *pos;
- int rc;
+ int rc, retval = 0;
ENTRY;
list_for_each_safe(tmp, pos, rpc_list) {
w->w_datalen, LDLM_CB_BLOCKING);
else
rc = w->w_lock->l_completion_ast(w->w_lock, w->w_flags);
- if (rc)
+ if (rc == -ERESTART)
+ retval = rc;
+ else if (rc)
CERROR("Failed AST - should clean & disconnect "
"client\n");
LDLM_LOCK_PUT(w->w_lock);
list_del(&w->w_list);
OBD_FREE(w, sizeof(*w));
}
- EXIT;
+ RETURN(retval);
+}
+
+static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
+{
+ ldlm_reprocess_all(res);
+ return LDLM_ITER_CONTINUE;
+}
+
+void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
+{
+ (void)ldlm_namespace_foreach_res(ns, reprocess_one_queue, NULL);
}
/* Must be called with resource->lr_lock not taken. */
void ldlm_reprocess_all(struct ldlm_resource *res)
{
struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
+ int rc;
ENTRY;
/* Local lock trees don't get reprocessed. */
return;
}
+ restart:
l_lock(&res->lr_namespace->ns_lock);
res->lr_tmp = &rpc_list;
res->lr_tmp = NULL;
l_unlock(&res->lr_namespace->ns_lock);
- ldlm_run_ast_work(&rpc_list);
+ rc = ldlm_run_ast_work(&rpc_list);
+ if (rc == -ERESTART)
+ goto restart;
EXIT;
}
ns = res->lr_namespace;
l_lock(&ns->ns_lock);
+ /* Please do not, no matter how tempting, remove this LBUG without
+ * talking to me first. -phik */
if (lock->l_readers || lock->l_writers) {
LDLM_DEBUG(lock, "lock still has references");
- ldlm_lock_dump(lock);
- //LBUG();
+ ldlm_lock_dump(D_OTHER, lock);
+ LBUG();
}
ldlm_cancel_callback(lock);
RETURN(res);
}
-void ldlm_lock_dump(struct ldlm_lock *lock)
+void ldlm_lock_dump(int level, struct ldlm_lock *lock)
{
char ver[128];
- if (!(portal_debug & D_OTHER))
+ if (!(portal_debug & level))
return;
if (RES_VERSION_SIZE != 4)
LBUG();
if (!lock) {
- CDEBUG(D_OTHER, " NULL LDLM lock\n");
+ CDEBUG(level, " NULL LDLM lock\n");
return;
}
lock->l_version[0], lock->l_version[1],
lock->l_version[2], lock->l_version[3]);
- CDEBUG(D_OTHER, " -- Lock dump: %p (%s)\n", lock, ver);
+ CDEBUG(level, " -- Lock dump: %p (%s)\n", lock, ver);
if (lock->l_export && lock->l_export->exp_connection)
- CDEBUG(D_OTHER, " Node: NID %x (rhandle: "LPX64")\n",
+ CDEBUG(level, " Node: NID %x (rhandle: "LPX64")\n",
lock->l_export->exp_connection->c_peer.peer_nid,
- lock->l_remote_handle.addr);
+ lock->l_remote_handle.cookie);
else
- CDEBUG(D_OTHER, " Node: local\n");
- CDEBUG(D_OTHER, " Parent: %p\n", lock->l_parent);
- CDEBUG(D_OTHER, " Resource: %p ("LPD64")\n", lock->l_resource,
+ CDEBUG(level, " Node: local\n");
+ CDEBUG(level, " Parent: %p\n", lock->l_parent);
+ CDEBUG(level, " Resource: %p ("LPD64")\n", lock->l_resource,
lock->l_resource->lr_name[0]);
- CDEBUG(D_OTHER, " Requested mode: %d, granted mode: %d\n",
+ CDEBUG(level, " Requested mode: %d, granted mode: %d\n",
(int)lock->l_req_mode, (int)lock->l_granted_mode);
- CDEBUG(D_OTHER, " Readers: %u ; Writers; %u\n",
+ CDEBUG(level, " Readers: %u ; Writers; %u\n",
lock->l_readers, lock->l_writers);
if (lock->l_resource->lr_type == LDLM_EXTENT)
- CDEBUG(D_OTHER, " Extent: %Lu -> %Lu\n",
- (unsigned long long)lock->l_extent.start,
- (unsigned long long)lock->l_extent.end);
+ CDEBUG(level, " Extent: "LPU64" -> "LPU64"\n",
+ lock->l_extent.start, lock->l_extent.end);
}
-void ldlm_lock_dump_handle(struct lustre_handle *lockh)
+void ldlm_lock_dump_handle(int level, struct lustre_handle *lockh)
{
struct ldlm_lock *lock;
if (lock == NULL)
return;
- ldlm_lock_dump(lock);
+ ldlm_lock_dump(D_OTHER, lock);
LDLM_LOCK_PUT(lock);
}