From 1115184f2a0d38886239f61e553c8c5f992e0de4 Mon Sep 17 00:00:00 2001 From: pschwan Date: Fri, 21 Jun 2002 18:43:43 +0000 Subject: [PATCH] - If a completion callback arrives before the reply for the enqueue, AND the completion callback specifies a different resource than the original enqueue, fixup the resource. - When the enqueue reply does come back, recognize that we've already fixed up the resource and avoid doing it again. - I am ---> <--- this close to replacing all of the DLM locking, which is almost certainly a cesspool of deadlock opportunities, with lock_kernel() until lustre lite performance --- lustre/ldlm/ldlm_lockd.c | 125 ++++++++++++++++++++++---------------------- lustre/ldlm/ldlm_request.c | 18 +++++-- lustre/ldlm/ldlm_resource.c | 9 ++-- 3 files changed, 83 insertions(+), 69 deletions(-) diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 9f726e5..d313fcc 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -21,60 +21,6 @@ extern kmem_cache_t *ldlm_lock_slab; extern int (*mds_reint_p)(int offset, struct ptlrpc_request *req); extern int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req); -/* _ldlm_callback and local_callback setup the variables then call this common - * code */ -static int common_callback(struct ldlm_lock *lock, struct ldlm_lock *new, - ldlm_mode_t mode, void *data, __u32 data_len, - struct ptlrpc_request **reqp) -{ - ENTRY; - - if (!lock) - LBUG(); - if (!lock->l_resource) - LBUG(); - - ldlm_lock_dump(lock); - - spin_lock(&lock->l_resource->lr_lock); - spin_lock(&lock->l_lock); - if (!new) { - CDEBUG(D_INFO, "Got local completion AST for lock %p.\n", lock); - lock->l_req_mode = mode; - - /* FIXME: the API is flawed if I have to do these refcount - * acrobatics (along with the _put() below). */ - lock->l_resource->lr_refcount++; - - /* _del_lock is safe for half-created locks that are not yet on - * a list. */ - ldlm_resource_del_lock(lock); - ldlm_grant_lock(lock->l_resource, lock); - - ldlm_resource_put(lock->l_resource); - - wake_up(&lock->l_waitq); - spin_unlock(&lock->l_lock); - spin_unlock(&lock->l_resource->lr_lock); - } else { - CDEBUG(D_INFO, "Got local blocking AST for lock %p.\n", lock); - lock->l_flags |= LDLM_FL_DYING; - spin_unlock(&lock->l_lock); - spin_unlock(&lock->l_resource->lr_lock); - if (!lock->l_readers && !lock->l_writers) { - CDEBUG(D_INFO, "Lock already unused, calling " - "callback (%p).\n", lock->l_blocking_ast); - if (lock->l_blocking_ast != NULL) - lock->l_blocking_ast(lock, new, lock->l_data, - lock->l_data_len, reqp); - } else { - CDEBUG(D_INFO, "Lock still has references; lock will be" - " cancelled later.\n"); - } - } - RETURN(0); -} - static int _ldlm_enqueue(struct obd_device *obddev, struct ptlrpc_service *svc, struct ptlrpc_request *req) { @@ -227,7 +173,7 @@ static int _ldlm_callback(struct ptlrpc_service *svc, struct ptlrpc_request *req) { struct ldlm_request *dlm_req; - struct ldlm_lock *lock1, *lock2; + struct ldlm_lock *lock, *new; int rc; ENTRY; @@ -244,17 +190,72 @@ static int _ldlm_callback(struct ptlrpc_service *svc, if (rc != 0) RETURN(rc); - lock1 = lustre_handle2object(&dlm_req->lock_handle1); - lock2 = lustre_handle2object(&dlm_req->lock_handle2); + lock = lustre_handle2object(&dlm_req->lock_handle1); + new = lustre_handle2object(&dlm_req->lock_handle2); + + LDLM_DEBUG(lock, "client %s callback handler START", + new == NULL ? "completion" : "blocked"); + + spin_lock(&lock->l_resource->lr_lock); + spin_lock(&lock->l_lock); + if (!new) { + CDEBUG(D_INFO, "Got local completion AST for lock %p.\n", lock); + lock->l_req_mode = dlm_req->lock_desc.l_granted_mode; + + /* If we receive the completion AST before the actual enqueue + * returned, then we might need to switch resources. */ + if (memcmp(dlm_req->lock_desc.l_resource.lr_name, + lock->l_resource->lr_name, + sizeof(__u64) * RES_NAME_SIZE) != 0) { + struct ldlm_namespace *ns = + lock->l_resource->lr_namespace; + int type = lock->l_resource->lr_type; + + if (!ldlm_resource_put(lock->l_resource)) + spin_unlock(&lock->l_resource->lr_lock); + + lock->l_resource = ldlm_resource_get(ns, NULL, dlm_req->lock_desc.l_resource.lr_name, type, 1); + if (lock->l_resource == NULL) { + LBUG(); + RETURN(-ENOMEM); + } + spin_lock(&lock->l_resource->lr_lock); + LDLM_DEBUG(lock, "completion AST, new resource"); + } + + /* FIXME: the API is flawed if I have to do these refcount + * acrobatics (along with the _put() below). */ + lock->l_resource->lr_refcount++; + + /* _del_lock is safe for half-created locks that are not yet on + * a list. */ + ldlm_resource_del_lock(lock); + ldlm_grant_lock(lock->l_resource, lock); - LDLM_DEBUG(lock1, "client %s callback handler START", - lock2 == NULL ? "completion" : "blocked"); + ldlm_resource_put(lock->l_resource); - common_callback(lock1, lock2, dlm_req->lock_desc.l_granted_mode, NULL, - 0, NULL); + wake_up(&lock->l_waitq); + spin_unlock(&lock->l_lock); + spin_unlock(&lock->l_resource->lr_lock); + } else { + CDEBUG(D_INFO, "Got local blocking AST for lock %p.\n", lock); + lock->l_flags |= LDLM_FL_DYING; + spin_unlock(&lock->l_lock); + spin_unlock(&lock->l_resource->lr_lock); + if (!lock->l_readers && !lock->l_writers) { + CDEBUG(D_INFO, "Lock already unused, calling " + "callback (%p).\n", lock->l_blocking_ast); + if (lock->l_blocking_ast != NULL) + lock->l_blocking_ast(lock, new, lock->l_data, + lock->l_data_len, NULL); + } else { + CDEBUG(D_INFO, "Lock still has references; lock will be" + " cancelled later.\n"); + } + } LDLM_DEBUG_NOLOCK("client %s callback handler END (lock: %p)", - lock2 == NULL ? "completion" : "blocked", lock1); + new == NULL ? "completion" : "blocked", lock); RETURN(0); } diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 2942d3f..dff9907 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -74,8 +74,13 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, req->rq_replen = lustre_msg_size(1, &size); } + lock->l_connection = conn; + lock->l_client = cl; + rc = ptlrpc_queue_wait(req); rc = ptlrpc_check_status(req, rc); + + spin_lock(&lock->l_lock); if (rc != ELDLM_OK) { spin_lock(&lock->l_resource->lr_lock); ldlm_resource_put(lock->l_resource); @@ -86,8 +91,6 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, GOTO(out, rc); } - lock->l_connection = conn; - lock->l_client = cl; reply = lustre_msg_buf(req->rq_repmsg, 0); memcpy(&lock->l_remote_handle, &reply->lock_handle, sizeof(lock->l_remote_handle)); @@ -101,11 +104,17 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, (unsigned long long)reply->lock_extent.start, (unsigned long long)reply->lock_extent.end); - if (*flags & LDLM_FL_LOCK_CHANGED) { + /* If enqueue returned a blocked lock but the completion handler has + * already run, then it fixed up the resource and we don't need to do it + * again. */ + if (*flags & LDLM_FL_LOCK_CHANGED && + lock->l_req_mode != lock->l_granted_mode) { CDEBUG(D_INFO, "remote intent success, locking %ld instead of" "%ld\n", (long)reply->lock_resource_name[0], (long)lock->l_resource->lr_name[0]); - ldlm_resource_put(lock->l_resource); + spin_lock(&lock->l_resource->lr_lock); + if (!ldlm_resource_put(lock->l_resource)) + spin_unlock(&lock->l_resource->lr_lock); lock->l_resource = ldlm_resource_get(ns, NULL, reply->lock_resource_name, @@ -120,6 +129,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, if (!req_passed_in) ptlrpc_free_req(req); + spin_unlock(&lock->l_lock); rc = ldlm_local_lock_enqueue(lockh, cookie, cookielen, flags, callback, callback); diff --git a/lustre/ldlm/ldlm_resource.c b/lustre/ldlm/ldlm_resource.c index ebda971..39f2720 100644 --- a/lustre/ldlm/ldlm_resource.c +++ b/lustre/ldlm/ldlm_resource.c @@ -116,8 +116,10 @@ int ldlm_namespace_free(struct ldlm_namespace *ns) rc = cleanup_resource(res, &res->lr_waiting); if (rc == 0) { - CERROR("Resource refcount nonzero after lock " - "cleanup; forcing cleanup.\n"); + CERROR("Resource refcount nonzero (%d) after " + "lock cleanup; forcing cleanup.\n", + res->lr_refcount); + ldlm_resource_dump(res); res->lr_refcount = 1; rc = ldlm_resource_put(res); } @@ -352,7 +354,8 @@ void ldlm_resource_dump(struct ldlm_resource *res) (unsigned long long)res->lr_name[1], (unsigned long long)res->lr_name[2]); - CDEBUG(D_OTHER, "--- Resource: %p (%s)\n", res, name); + CDEBUG(D_OTHER, "--- Resource: %p (%s) (rc: %d)\n", res, name, + res->lr_refcount); CDEBUG(D_OTHER, "Namespace: %p (%s)\n", res->lr_namespace, res->lr_namespace->ns_name); CDEBUG(D_OTHER, "Parent: %p, root: %p\n", res->lr_parent, res->lr_root); -- 1.8.3.1