From 034f478ed4aee79dbf7d4f42ae0079a677cf093b Mon Sep 17 00:00:00 2001 From: braam Date: Mon, 24 Jun 2002 05:10:40 +0000 Subject: [PATCH] Lots of reorg in the lock manager, dealing with the updated design. See the Lustre documentation for more details. --- lustre/ldlm/ldlm_lock.c | 121 +++++++++++++++++++-------------------------- lustre/ldlm/ldlm_lockd.c | 86 +++++++++++++++++--------------- lustre/ldlm/ldlm_request.c | 28 ++++++----- 3 files changed, 114 insertions(+), 121 deletions(-) diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index e54a0a9..c526c6d 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -18,7 +18,7 @@ #include #include -static kmem_cache_t *ldlm_lock_slab; +kmem_cache_t *ldlm_lock_slab; int (*mds_reint_p)(int offset, struct ptlrpc_request *req) = NULL; int (*mds_getattr_name_p)(int offset, struct ptlrpc_request *req) = NULL; @@ -175,21 +175,20 @@ int ldlm_lock_change_resource(struct ldlm_lock *lock, __u64 new_resid[3]) l_lock(&ns->ns_lock); type = lock->l_resource->lr_type; - for (i = 0; i < lock->l_refc; i++) { - int rc; - rc = ldlm_resource_put(lock->l_resource); - if (rc == 1 && i != lock->l_refc - 1) - LBUG(); - } - lock->l_resource = ldlm_resource_get(ns, NULL, new_resid, type, 1); if (lock->l_resource == NULL) { LBUG(); RETURN(-ENOMEM); } - for (i = 1; i < lock->l_refc; i++) + /* move references over */ + for (i = 0; i < lock->l_refc; i++) { + int rc; ldlm_resource_addref(lock->l_resource); + rc = ldlm_resource_put(lock->l_resource); + if (rc == 1 && i != lock->l_refc - 1) + LBUG(); + } l_unlock(&ns->ns_lock); RETURN(0); @@ -348,26 +347,6 @@ static int ldlm_plain_compat(struct ldlm_lock *a, struct ldlm_lock *b) return lockmode_compat(a->l_req_mode, b->l_req_mode); } -/* Args: unreferenced, locked lock - * - * Caller must do its own ldlm_resource_put() on lock->l_resource */ -void ldlm_lock_free(struct ldlm_lock *lock) -{ - if (!list_empty(&lock->l_children)) { - CERROR("lock %p still has children!\n", lock); - ldlm_lock_dump(lock); - LBUG(); - } - - if (lock->l_readers || lock->l_writers) - CDEBUG(D_INFO, "lock still has references (%d readers, %d " - "writers)\n", lock->l_readers, lock->l_writers); - - if (lock->l_connection) - ptlrpc_put_connection(lock->l_connection); - kmem_cache_free(ldlm_lock_slab, lock); -} - void ldlm_lock2desc(struct ldlm_lock *lock, struct ldlm_lock_desc *desc) { ldlm_res2desc(lock->l_resource, &desc->l_resource); @@ -389,7 +368,7 @@ static int ldlm_send_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock *new) } lock->l_flags |= LDLM_FL_AST_SENT; - + /* FIXME: this should merely add the lock to the lr_tmp list */ lock->l_blocking_ast(lock, new, lock->l_data, lock->l_data_len, &req); l_unlock(&lock->l_resource->lr_namespace->ns_lock); @@ -429,31 +408,31 @@ void ldlm_lock_decref(struct ldlm_lock *lock, __u32 mode) /* If we received a blocked AST and this was the last reference, * run the callback. */ if (!lock->l_readers && !lock->l_writers && - (lock->l_flags & LDLM_FL_BLOCKED_PENDING)) { + (lock->l_flags & LDLM_FL_CBPENDING)) { if (!lock->l_resource->lr_namespace->ns_client) { - CERROR("LDLM_FL_DYING set on non-local lock!\n"); + CERROR("LDLM_FL_CBPENDING set on non-local lock!\n"); LBUG(); } - CDEBUG(D_INFO, "final decref done on dying lock, " + CDEBUG(D_INFO, "final decref done on cbpending lock, " "calling callback.\n"); - l_lock(&lock->l_resource->lr_namespace->ns_lock); + l_unlock(&lock->l_resource->lr_namespace->ns_lock); lock->l_blocking_ast(lock, NULL, lock->l_data, lock->l_data_len, NULL); } else - l_lock(&lock->l_resource->lr_namespace->ns_lock); + l_unlock(&lock->l_resource->lr_namespace->ns_lock); ldlm_lock_put(lock); EXIT; } -static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs, +static int ldlm_lock_compat_list(struct ldlm_lock *lock, int send_cbs, struct list_head *queue) { struct list_head *tmp, *pos; - int rc = 0; + int rc = 1; list_for_each_safe(tmp, pos, queue) { struct ldlm_lock *child; @@ -464,7 +443,7 @@ static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs, continue; compat = ldlm_res_compat_table[child->l_resource->lr_type]; - if (compat(child, lock)) { + if (compat && compat(child, lock)) { CDEBUG(D_OTHER, "compat function succeded, next.\n"); continue; } @@ -473,7 +452,7 @@ static int _ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs, continue; } - rc = 1; + rc = 0; CDEBUG(D_OTHER, "compat function failed and lock modes " "incompat\n"); @@ -497,20 +476,26 @@ static int ldlm_lock_compat(struct ldlm_lock *lock, int send_cbs) ENTRY; l_lock(&lock->l_resource->lr_namespace->ns_lock); - rc = _ldlm_lock_compat(lock, send_cbs, &lock->l_resource->lr_granted); + rc = ldlm_lock_compat_list(lock, send_cbs, &lock->l_resource->lr_granted); /* FIXME: should we be sending ASTs to converting? */ - rc |= _ldlm_lock_compat(lock, send_cbs, - &lock->l_resource->lr_converting); - l_unlock(&lock->l_resource->lr_namespace->ns_lock); + if (rc) + rc = ldlm_lock_compat_list + (lock, send_cbs, &lock->l_resource->lr_converting); + l_unlock(&lock->l_resource->lr_namespace->ns_lock); RETURN(rc); } -void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock) +/* NOTE: called by + - ldlm_handle_enqueuque - resource +*/ +void ldlm_grant_lock(struct ldlm_lock *lock) { + struct ldlm_resource *res = lock->l_resource; struct ptlrpc_request *req = NULL; ENTRY; + l_lock(&lock->l_resource->lr_namespace->ns_lock); ldlm_resource_add_lock(res, &res->lr_granted, lock); lock->l_granted_mode = lock->l_req_mode; @@ -518,10 +503,11 @@ void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock) res->lr_most_restr = lock->l_granted_mode; if (lock->l_completion_ast) { + /* FIXME: this should merely add lock to lr_tmp list */ lock->l_completion_ast(lock, NULL, lock->l_data, lock->l_data_len, &req); if (req != NULL) { - struct list_head *list = res->lr_tmp; + struct list_head *list = &res->lr_tmp; if (list == NULL) { LBUG(); return; @@ -529,6 +515,7 @@ void ldlm_grant_lock(struct ldlm_resource *res, struct ldlm_lock *lock) list_add(&req->rq_multi, list); } } + l_unlock(&lock->l_resource->lr_namespace->ns_lock); EXIT; } @@ -636,11 +623,11 @@ ldlm_local_lock_create(struct ldlm_namespace *ns, } /* Must be called with lock->l_lock and lock->l_resource->lr_lock not held */ -ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh, - void *cookie, int cookie_len, - int *flags, - ldlm_lock_callback completion, - ldlm_lock_callback blocking) +ldlm_error_t ldlm_lock_enqueue(struct lustre_lock *lock, + void *cookie, int cookie_len, + int *flags, + ldlm_lock_callback completion, + ldlm_lock_callback blocking) { struct ldlm_resource *res; struct ldlm_lock *lock; @@ -715,8 +702,7 @@ ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh, *flags |= LDLM_FL_BLOCK_WAIT; GOTO(out, ELDLM_OK); } - incompat = ldlm_lock_compat(lock, 0); - if (incompat) { + if (!ldlm_lock_compat(lock,0)) { ldlm_resource_add_lock(res, res->lr_waiting.prev, lock); *flags |= LDLM_FL_BLOCK_GRANTED; GOTO(out, ELDLM_OK); @@ -731,20 +717,20 @@ ldlm_error_t ldlm_local_lock_enqueue(struct lustre_handle *lockh, return ELDLM_OK; } -/* Must be called with resource->lr_lock taken. */ +/* Must be called with namespace taken: queue is waiting or converting. */ static int ldlm_reprocess_queue(struct ldlm_resource *res, - struct list_head *converting) + struct list_head *queue) { struct list_head *tmp, *pos; ENTRY; - list_for_each_safe(tmp, pos, converting) { + list_for_each_safe(tmp, pos, queue) { struct ldlm_lock *pending; pending = list_entry(tmp, struct ldlm_lock, l_res_link); CDEBUG(D_INFO, "Reprocessing lock %p\n", pending); - if (ldlm_lock_compat(pending, 1)) + if (!ldlm_lock_compat(pending, 1)) RETURN(1); list_del_init(&pending->l_res_link); @@ -805,7 +791,7 @@ void ldlm_reprocess_all(struct ldlm_resource *res) } /* Must be called with lock and lock->l_resource unlocked */ -struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock) +void ldlm_lock_cancel(struct ldlm_lock *lock) { struct ldlm_resource *res; struct ldlm_namespace *ns; @@ -819,17 +805,13 @@ struct ldlm_resource *ldlm_local_lock_cancel(struct ldlm_lock *lock) CDEBUG(D_INFO, "lock still has references (%d readers, %d " "writers)\n", lock->l_readers, lock->l_writers); - ldlm_resource_del_lock(lock); + ldlm_resource_unlink_lock(lock); ldlm_lock_destroy(lock); - l_unlock(&ns->ns_lock); - - RETURN(res); } /* Must be called with lock and lock->l_resource unlocked */ -struct ldlm_resource *ldlm_local_lock_convert(struct lustre_handle *lockh, - int new_mode, int *flags) +struct ldlm_resource *ldlm_convert(struct lustre_handle *lockh, int new_mode, int *flags) { struct ldlm_lock *lock; struct ldlm_resource *res; @@ -843,17 +825,18 @@ struct ldlm_resource *ldlm_local_lock_convert(struct lustre_handle *lockh, l_lock(&ns->ns_lock); lock->l_req_mode = new_mode; - ldlm_resource_del_lock(lock); + ldlm_resource_unlink_lock(lock); /* If this is a local resource, put it on the appropriate list. */ if (res->lr_namespace->ns_client) { - if (*flags & LDLM_FL_BLOCK_CONV) + if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) ldlm_resource_add_lock(res, res->lr_converting.prev, lock); - else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED)) - ldlm_resource_add_lock(res, res->lr_waiting.prev, lock); - else - ldlm_grant_lock(res, lock); + else { + ldlm_grant_lock(lock); + /* FIXME: completion handling not with ns_lock held ! */ + wake_up(&lock->l_waitq); + } } else { list_add(&lock->l_res_link, res->lr_converting.prev); } diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index f10e876..5bb4a37 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -64,13 +64,12 @@ static int ldlm_handle_enqueue(struct obd_device *obddev, struct ptlrpc_service if (!lock) GOTO(out, -ENOMEM); - ldlm_lock2handle(&lockh); memcpy(&lock->l_remote_handle, &dlm_req->lock_handle1, sizeof(lock->l_remote_handle)); LDLM_DEBUG(lock, "server-side enqueue handler, new lock created"); flags = dlm_req->lock_flags; - err = ldlm_local_lock_enqueue(&lockh, cookie, cookielen, &flags, + err = ldlm_lock_enqueue(lock, cookie, cookielen, &flags, ldlm_cli_callback, ldlm_cli_callback); if (err != ELDLM_OK) GOTO(out, err); @@ -78,7 +77,7 @@ static int ldlm_handle_enqueue(struct obd_device *obddev, struct ptlrpc_service dlm_rep = lustre_msg_buf(req->rq_repmsg, 0); dlm_rep->lock_flags = flags; - memcpy(&dlm_rep->lock_handle, &lockh, sizeof(lockh)); + ldlm_lock2handle(lock, &dlm_rep->lock_handle); if (dlm_req->lock_desc.l_resource.lr_type == LDLM_EXTENT) memcpy(&dlm_rep->lock_extent, &lock->l_extent, sizeof(lock->l_extent)); @@ -126,14 +125,15 @@ static int ldlm_handle_convert(struct ptlrpc_service *svc, struct ptlrpc_request dlm_rep->lock_flags = dlm_req->lock_flags; lock = ldlm_handle2lock(&dlm_req->lock_handle1); - if (!lock) - RETURN(0); - - LDLM_DEBUG(lock, "server-side convert handler START"); - ldlm_local_lock_convert(&dlm_req->lock_handle1, - dlm_req->lock_desc.l_req_mode, - &dlm_rep->lock_flags); - req->rq_status = 0; + if (!lock) { + req->rq_status = EINVAL; + } else { + LDLM_DEBUG(lock, "server-side convert handler START"); + ldlm_lock_convert(&dlm_req->lock_handle1, + dlm_req->lock_desc.l_req_mode, + &dlm_rep->lock_flags); + req->rq_status = 0; + } if (ptlrpc_reply(svc, req) != 0) LBUG(); @@ -160,12 +160,13 @@ static int ldlm_handle_cancel(struct ptlrpc_service *svc, struct ptlrpc_request dlm_req = lustre_msg_buf(req->rq_reqmsg, 0); lock = ldlm_handle2lock(&dlm_req->lock_handle1); - if (!lock) - RETURN(0); - - LDLM_DEBUG(lock, "server-side cancel handler START"); - ldlm_local_lock_cancel(lock); - req->rq_status = 0; + if (!lock) { + req->rq_status = ESTALE; + } else { + LDLM_DEBUG(lock, "server-side cancel handler START"); + ldlm_local_lock_cancel(lock); + req->rq_status = 0; + } if (ptlrpc_reply(svc, req) != 0) LBUG(); @@ -181,7 +182,10 @@ static int ldlm_handle_callback(struct ptlrpc_service *svc, struct ptlrpc_request *req) { struct ldlm_request *dlm_req; - struct ldlm_lock *lock, *new; + struct ldlm_lock_desc desc; + struct ldlm_lock_desc *descp; + struct ldlm_lock *lock; + __u64 is_blocking_ast; int rc; ENTRY; @@ -199,7 +203,13 @@ static int ldlm_handle_callback(struct ptlrpc_service *svc, RETURN(rc); lock = ldlm_handle2lock(&dlm_req->lock_handle1); - new = ldlm_handle2lock(&dlm_req->lock_handle2); + is_blocking_ast = dlm_req->lock_handle2.addr; + if (is_blocking_ast) { + /* FIXME: copy lock information into desc here */ + descp = &desc; + } else + descp = NULL; + if (!lock) { CERROR("callback on lock %Lx - lock disappeared\n", dlm_req->lock_handle.addr); @@ -209,25 +219,7 @@ static int ldlm_handle_callback(struct ptlrpc_service *svc, LDLM_DEBUG(lock, "client %s callback handler START", new == NULL ? "completion" : "blocked"); - spin_lock(&lock->l_resource->lr_lock); - spin_lock(&lock->l_lock); - if (!new) { - lock->l_req_mode = dlm_req->lock_desc.l_granted_mode; - - /* If we receive the completion AST before the actual enqueue - * returned, then we might need to switch resources. */ - if (memcmp(dlm_req->lock_desc.l_resource.lr_name, - lock->l_resource->lr_name, - sizeof(__u64) * RES_NAME_SIZE) != 0) { - - ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name); - LDLM_DEBUG(lock, "completion AST, new resource"); - } - ldlm_grant_lock(lock); - /* XXX Fixme: we want any completion function, not just wake_up */ - wake_up(&lock->l_waitq); - ldlm_lock_put(lock); - } else { + if (is_blocking_ast) { int do_ast; l_lock(&lock->l_resource->lr_namespace->ns_lock); lock->l_flags |= LDLM_FL_CBPENDING; @@ -238,18 +230,32 @@ static int ldlm_handle_callback(struct ptlrpc_service *svc, CDEBUG(D_INFO, "Lock already unused, calling " "callback (%p).\n", lock->l_blocking_ast); if (lock->l_blocking_ast != NULL) - lock->l_blocking_ast(lock, new, lock->l_data, + lock->l_blocking_ast(lock, descp, lock->l_data, lock->l_data_len, NULL); } else { LDLM_DEBUG(lock, "Lock still has references, will be" " cancelled later"); } ldlm_lock_put(lock); + } else { + lock->l_req_mode = dlm_req->lock_desc.l_granted_mode; + + /* If we receive the completion AST before the actual enqueue + * returned, then we might need to switch resources. */ + if (memcmp(dlm_req->lock_desc.l_resource.lr_name, + lock->l_resource->lr_name, + sizeof(__u64) * RES_NAME_SIZE) != 0) { + ldlm_lock_change_resource(lock, dlm_req->lock_desc.l_resource.lr_name); + LDLM_DEBUG(lock, "completion AST, new resource"); + } + ldlm_grant_lock(lock); + /* FIXME: we want any completion function, not just wake_up */ + wake_up(&lock->l_waitq); + ldlm_lock_put(lock); } LDLM_DEBUG_NOLOCK("client %s callback handler END (lock: %p)", new == NULL ? "completion" : "blocked", lock); - RETURN(0); } diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index a0781c9..0917dd1 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -39,7 +39,6 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, if (lock == NULL) GOTO(out, rc = -ENOMEM); - ldlm_lock2handle(lock, &lockh); LDLM_DEBUG(lock, "client-side enqueue START"); if (req == NULL) { @@ -62,8 +61,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, sizeof(body->lock_desc.l_extent)); body->lock_flags = *flags; - memcpy(&body->lock_handle1, lockh, sizeof(body->lock_handle1)); - + ldlm_lock2handle(lock, &body->lock_handle1); if (parent_lock_handle) memcpy(&body->lock_handle2, parent_lock_handle, sizeof(body->lock_handle2)); @@ -120,7 +118,7 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, if (!req_passed_in) ptlrpc_free_req(req); - rc = ldlm_local_lock_enqueue(lockh, cookie, cookielen, flags, callback, + rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, callback, callback); LDLM_DEBUG(lock, "client-side enqueue END"); @@ -198,7 +196,9 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh, int rc, size = sizeof(*body); ENTRY; - lock = lustre_handle2object(lockh); + lock = ldlm_handle2lock(lockh); + if (!lock) + LBUG(); *flags = 0; LDLM_DEBUG(lock, "client-side convert"); @@ -236,20 +236,26 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh, lock->l_granted_mode); CDEBUG(D_NET, "waking up, the lock must be granted.\n"); } + ldlm_put_lock(lock); EXIT; out: ptlrpc_free_req(req); return rc; } -int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_lock *lock) +int ldlm_cli_cancel(struct ptlrpc_client *cl, struct lustre_handle *lockh) { struct ptlrpc_request *req; + struct ldlm_lock *lock; struct ldlm_request *body; struct ldlm_resource *res; int rc, size = sizeof(*body); ENTRY; + lock = ldlm_handle2lock(lockh); + if (!lock) + LBUG(); + LDLM_DEBUG(lock, "client-side cancel"); req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 1, &size, NULL); @@ -268,12 +274,10 @@ int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_lock *lock) if (rc != ELDLM_OK) GOTO(out, rc); - res = ldlm_local_lock_cancel(lock); - if (res != NULL) - ldlm_reprocess_all(res); - else - rc = ELDLM_RESOURCE_FREED; + ldlm_lock_cancel(lock); + ldlm_reprocess_all(res); + ldlm_lock_put(lock); EXIT; out: - return rc; + return 0; } -- 1.8.3.1