From b2de0e76a0a1ac27e26755805362df0f06f00dd4 Mon Sep 17 00:00:00 2001 From: pschwan Date: Mon, 11 Nov 2002 19:00:40 +0000 Subject: [PATCH 1/1] Fixes to the lock LRU --- lustre/include/linux/lustre_dlm.h | 1 + lustre/ldlm/ldlm_lock.c | 39 +++++++++++----------- lustre/ldlm/ldlm_lockd.c | 1 - lustre/ldlm/ldlm_request.c | 70 ++++++++++++++++++++++++++++++++++----- 4 files changed, 82 insertions(+), 29 deletions(-) diff --git a/lustre/include/linux/lustre_dlm.h b/lustre/include/linux/lustre_dlm.h index cc2c268..d63d095 100644 --- a/lustre/include/linux/lustre_dlm.h +++ b/lustre/include/linux/lustre_dlm.h @@ -410,6 +410,7 @@ int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *new, int ldlm_cli_convert(struct lustre_handle *, int new_mode, int *flags); int ldlm_cli_cancel(struct lustre_handle *lockh); int ldlm_cli_cancel_unused(struct ldlm_namespace *, __u64 *, int flags); +int ldlm_cancel_lru(struct ldlm_namespace *ns); /* mds/handler.c */ /* This has to be here because recurisve inclusion sucks. */ diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index 6ac4a4a..abb2a70 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -210,6 +210,7 @@ void ldlm_lock_destroy(struct ldlm_lock *lock) } if (lock->l_flags & LDLM_FL_DESTROYED) { + LASSERT(list_empty(&lock->l_lru)); l_unlock(&lock->l_resource->lr_namespace->ns_lock); EXIT; return; @@ -286,7 +287,7 @@ int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3]) { struct ldlm_namespace *ns = lock->l_resource->lr_namespace; struct ldlm_resource *oldres = lock->l_resource; - int type, i; + int i; ENTRY; l_lock(&ns->ns_lock); @@ -297,10 +298,13 @@ int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3]) RETURN(0); } - type = lock->l_resource->lr_type; - if (new_resid[0] == 0) - LBUG(); - lock->l_resource = ldlm_resource_get(ns, NULL, new_resid, type, 1); + LASSERT(new_resid[0] != 0); + + /* This function assumes that the lock isn't on any lists */ + LASSERT(list_empty(&lock->l_res_link)); + + lock->l_resource = ldlm_resource_get(ns, NULL, new_resid, + lock->l_resource->lr_type, 1); if (lock->l_resource == NULL) { LBUG(); RETURN(-ENOMEM); @@ -343,7 +347,7 @@ struct ldlm_lock *__ldlm_handle2lock(struct lustre_handle *handle, //spin_lock(&ldlm_handle_lock); lock = (struct ldlm_lock *)(unsigned long)(handle->addr); if (!kmem_cache_validate(ldlm_lock_slab, (void *)lock)) { - CERROR("bogus lock %p\n", lock); + //CERROR("bogus lock %p\n", lock); GOTO(out2, retval); } @@ -489,18 +493,13 @@ void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode) /* FIXME: need a real 'desc' here */ lock->l_blocking_ast(lock, NULL, lock->l_data, lock->l_data_len, LDLM_CB_BLOCKING); - } else if (!lock->l_readers && !lock->l_writers) { + } else if (ns->ns_client && !lock->l_readers && !lock->l_writers) { LASSERT(list_empty(&lock->l_lru)); LASSERT(ns->ns_nr_unused >= 0); list_add_tail(&lock->l_lru, &ns->ns_unused_list); ns->ns_nr_unused++; - if (ns->ns_client && ns->ns_nr_unused >= ns->ns_max_unused) { - CDEBUG(D_DLMTRACE, "%d unused (max %d), cancelling " - "LRU\n", ns->ns_nr_unused, ns->ns_max_unused); - ldlm_cli_cancel_unused_resource - (ns, lock->l_resource->lr_name, LDLM_FL_REDUCE); - } l_unlock(&lock->l_resource->lr_namespace->ns_lock); + ldlm_cancel_lru(ns); } else l_unlock(&lock->l_resource->lr_namespace->ns_lock); @@ -602,8 +601,6 @@ static struct ldlm_lock *search_queue(struct list_head *queue, ldlm_mode_t mode, if (lock->l_flags & (LDLM_FL_CBPENDING | LDLM_FL_DESTROYED)) continue; - /* lock_convert() takes the resource lock, so we're sure that - * req_mode and lr_type won't change beneath us */ if (lock->l_req_mode != mode) continue; @@ -654,7 +651,6 @@ int ldlm_lock_match(struct ldlm_namespace *ns, __u64 *res_id, __u32 type, RETURN(0); } - ns = res->lr_namespace; l_lock(&ns->ns_lock); if ((lock = search_queue(&res->lr_granted, mode, cookie, old_lock))) @@ -889,8 +885,12 @@ void ldlm_cancel_callback(struct ldlm_lock *lock) l_lock(&lock->l_resource->lr_namespace->ns_lock); if (!(lock->l_flags & LDLM_FL_CANCEL)) { lock->l_flags |= LDLM_FL_CANCEL; - lock->l_blocking_ast(lock, NULL, lock->l_data, - lock->l_data_len, LDLM_CB_CANCELING); + if (lock->l_blocking_ast) + lock->l_blocking_ast(lock, NULL, lock->l_data, + lock->l_data_len, + LDLM_CB_CANCELING); + else + LDLM_DEBUG(lock, "no blocking ast"); } l_unlock(&lock->l_resource->lr_namespace->ns_lock); } @@ -906,8 +906,7 @@ void ldlm_lock_cancel(struct ldlm_lock *lock) l_lock(&ns->ns_lock); if (lock->l_readers || lock->l_writers) - CDEBUG(D_INFO, "lock still has references (%d readers, %d " - "writers)\n", lock->l_readers, lock->l_writers); + LDLM_DEBUG(lock, "lock still has references"); ldlm_cancel_callback(lock); diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 3319f03..739b609 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -350,7 +350,6 @@ int ldlm_handle_cancel(struct ptlrpc_request *req) lock = ldlm_handle2lock(&dlm_req->lock_handle1); if (!lock) { - CERROR("bad lock handle\n"); LDLM_DEBUG_NOLOCK("server-side cancel handler stale lock (lock " "%p)", (void *)(unsigned long) dlm_req->lock_handle1.addr); diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 7aa9694..778e505 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -442,7 +442,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) * impossible (by adding a dec_and_cancel() or similar), then * we can put the LBUG back. */ //LBUG(); - RETURN(-EINVAL); + RETURN(0); } if (lock->l_connh) { @@ -492,6 +492,64 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) return rc; } +int ldlm_cancel_lru(struct ldlm_namespace *ns) +{ + struct list_head *tmp, *next, list = LIST_HEAD_INIT(list); + int count, rc = 0; + struct ldlm_ast_work *w; + ENTRY; + + l_lock(&ns->ns_lock); + count = ns->ns_nr_unused - ns->ns_max_unused; + + if (count <= 0) { + l_unlock(&ns->ns_lock); + RETURN(0); + } + + list_for_each_safe(tmp, next, &ns->ns_unused_list) { + struct ldlm_lock *lock; + lock = list_entry(tmp, struct ldlm_lock, l_lru); + + LASSERT(!lock->l_readers && !lock->l_writers); + + /* Setting the CBPENDING flag is a little misleading, but + * prevents an important race; namely, once CBPENDING is set, + * the lock can accumulate no more readers/writers. Since + * readers and writers are already zero here, ldlm_lock_decref + * won't see this flag and call l_blocking_ast */ + lock->l_flags |= LDLM_FL_CBPENDING; + + OBD_ALLOC(w, sizeof(*w)); + LASSERT(w); + + w->w_lock = LDLM_LOCK_GET(lock); + list_add(&w->w_list, &list); + list_del_init(&lock->l_lru); + + if (--count == 0) + break; + } + l_unlock(&ns->ns_lock); + + list_for_each_safe(tmp, next, &list) { + struct lustre_handle lockh; + int rc; + w = list_entry(tmp, struct ldlm_ast_work, w_list); + + ldlm_lock2handle(w->w_lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc != ELDLM_OK) + CDEBUG(D_INFO, "ldlm_cli_cancel: %d\n", rc); + + list_del(&w->w_list); + LDLM_LOCK_PUT(w->w_lock); + OBD_FREE(w, sizeof(*w)); + } + + RETURN(rc); +} + int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, __u64 *res_id, int flags) { @@ -519,14 +577,10 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, if (lock->l_readers || lock->l_writers) continue; - /* Setting the CBPENDING flag is a little misleading, but - * prevents an important race; namely, once CBPENDING is set, - * the lock can accumulate no more readers/writers. Since - * readers and writers are already zero here, ldlm_lock_decref - * won't see this flag and call l_blocking_ast */ + /* See CBPENDING comment in ldlm_cancel_lru */ lock->l_flags |= LDLM_FL_CBPENDING; - OBD_ALLOC(w, sizeof(*w)); + OBD_ALLOC(w, sizeof(*w)); LASSERT(w); w->w_lock = LDLM_LOCK_GET(lock); @@ -555,8 +609,8 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, if (rc != ELDLM_OK) CERROR("ldlm_cli_cancel: %d\n", rc); } - LDLM_LOCK_PUT(w->w_lock); list_del(&w->w_list); + LDLM_LOCK_PUT(w->w_lock); OBD_FREE(w, sizeof(*w)); } -- 1.8.3.1