X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_request.c;h=400da088f8a14eabd8b385f815d39bc324f4ef3f;hb=7bdcf3a527a237ad0a1980b32e19045425cb6514;hp=eddf484d10660113d6e0a9b9f7fdeb5b86bb654e;hpb=9140bd3dae4ac7e447da4e7ab56e9d49c9fd29c3;p=fs%2Flustre-release.git diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index eddf484..400da08 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -12,9 +12,88 @@ #define DEBUG_SUBSYSTEM S_LDLM #include +#include -int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, - struct lustre_handle *connh, +int ldlm_completion_ast(struct ldlm_lock *lock, int flags) +{ + ENTRY; + + if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED | + LDLM_FL_BLOCK_CONV)) { + /* Go to sleep until the lock is granted. */ + /* FIXME: or cancelled. */ + LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock," + " sleeping"); + ldlm_lock_dump(lock); + ldlm_reprocess_all(lock->l_resource); + wait_event(lock->l_waitq, (lock->l_req_mode == + lock->l_granted_mode)); + LDLM_DEBUG(lock, "client-side enqueue waking up: granted"); + } else if (flags == LDLM_FL_WAIT_NOREPROC) { + wait_event(lock->l_waitq, (lock->l_req_mode == + lock->l_granted_mode)); + } else if (flags == 0) { + wake_up(&lock->l_waitq); + } + + RETURN(0); +} + +static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, + struct lustre_handle *parent_lockh, + __u64 *res_id, + __u32 type, + void *cookie, int cookielen, + ldlm_mode_t mode, + int *flags, + ldlm_completion_callback completion, + ldlm_blocking_callback blocking, + void *data, + __u32 data_len, + struct lustre_handle *lockh) +{ + struct ldlm_lock *lock; + int err; + + if (ns->ns_client) { + CERROR("Trying to cancel local lock\n"); + LBUG(); + } + + lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode, data, data_len); + if (!lock) + GOTO(out_nolock, err = -ENOMEM); + LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created"); + + ldlm_lock_addref_internal(lock, mode); + ldlm_lock2handle(lock, lockh); + lock->l_connh = NULL; + + err = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion, + blocking); + if (err != ELDLM_OK) + GOTO(out, err); + + if (type == LDLM_EXTENT) + memcpy(cookie, &lock->l_extent, sizeof(lock->l_extent)); + if ((*flags) & LDLM_FL_LOCK_CHANGED) + memcpy(res_id, lock->l_resource->lr_name, sizeof(*res_id)); + + LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)", + lock); + + if (lock->l_completion_ast) + lock->l_completion_ast(lock, *flags); + + LDLM_DEBUG(lock, "client-side local enqueue END"); + EXIT; + out: + LDLM_LOCK_PUT(lock); + out_nolock: + return err; +} + +int ldlm_cli_enqueue(struct lustre_handle *connh, struct ptlrpc_request *req, struct ldlm_namespace *ns, struct lustre_handle *parent_lock_handle, @@ -23,7 +102,8 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, void *cookie, int cookielen, ldlm_mode_t mode, int *flags, - ldlm_lock_callback callback, + ldlm_completion_callback completion, + ldlm_blocking_callback blocking, void *data, __u32 data_len, struct lustre_handle *lockh) @@ -34,19 +114,25 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, int rc, size = sizeof(*body), req_passed_in = 1; ENTRY; + if (connh == NULL) + return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id, + type, cookie, cookielen, mode, + flags, completion, blocking, data, + data_len, lockh); + *flags = 0; lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode, data, data_len); if (lock == NULL) - GOTO(out, rc = -ENOMEM); + GOTO(out_nolock, rc = -ENOMEM); LDLM_DEBUG(lock, "client-side enqueue START"); /* for the local lock, add the reference */ ldlm_lock_addref_internal(lock, mode); ldlm_lock2handle(lock, lockh); if (req == NULL) { - req = ptlrpc_prep_req2(cl, conn, connh, - LDLM_ENQUEUE, 1, &size, NULL); + req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_ENQUEUE, 1, + &size, NULL); if (!req) GOTO(out, rc = -ENOMEM); req_passed_in = 0; @@ -72,9 +158,8 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, size = sizeof(*reply); req->rq_replen = lustre_msg_size(1, &size); } - - lock->l_connection = ptlrpc_connection_addref(conn); - lock->l_client = cl; + lock->l_connh = connh; + lock->l_export = NULL; rc = ptlrpc_queue_wait(req); /* FIXME: status check here? */ @@ -83,7 +168,6 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, if (rc != ELDLM_OK) { LDLM_DEBUG(lock, "client-side enqueue END (%s)", rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED"); - LDLM_LOCK_PUT(lock); ldlm_lock_decref(lockh, mode); /* FIXME: if we've already received a completion AST, this will * LBUG! */ @@ -107,48 +191,48 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, /* If enqueue returned a blocked lock but the completion handler has * already run, then it fixed up the resource and we don't need to do it * again. */ - if ((*flags) & LDLM_FL_LOCK_CHANGED && - lock->l_req_mode != lock->l_granted_mode) { - CDEBUG(D_INFO, "remote intent success, locking %ld instead of" - "%ld\n", (long)reply->lock_resource_name[0], - (long)lock->l_resource->lr_name[0]); - - ldlm_lock_change_resource(lock, reply->lock_resource_name); - if (lock->l_resource == NULL) { - LBUG(); - RETURN(-ENOMEM); + if ((*flags) & LDLM_FL_LOCK_CHANGED) { + int newmode = reply->lock_mode; + if (newmode && newmode != lock->l_req_mode) { + LDLM_DEBUG(lock, "server returned different mode %s", + ldlm_lockname[newmode]); + lock->l_req_mode = newmode; + } + + if (reply->lock_resource_name[0] != + lock->l_resource->lr_name[0]) { + CDEBUG(D_INFO, "remote intent success, locking %ld " + "instead of %ld\n", + (long)reply->lock_resource_name[0], + (long)lock->l_resource->lr_name[0]); + + ldlm_lock_change_resource(lock, + reply->lock_resource_name); + if (lock->l_resource == NULL) { + LBUG(); + RETURN(-ENOMEM); + } + LDLM_DEBUG(lock, "client-side enqueue, new resource"); } - LDLM_DEBUG(lock, "client-side enqueue, new resource"); } if (!req_passed_in) ptlrpc_free_req(req); - rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, callback, - callback); + rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion, + blocking); + if (lock->l_completion_ast) + lock->l_completion_ast(lock, *flags); - if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED | - LDLM_FL_BLOCK_CONV)) { - /* Go to sleep until the lock is granted. */ - /* FIXME: or cancelled. */ - LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock," - " sleeping"); - ldlm_lock_dump(lock); -#warning ldlm needs to time out - wait_event(lock->l_waitq, - lock->l_req_mode == lock->l_granted_mode); - LDLM_DEBUG(lock, "client-side enqueue waking up: granted"); - } LDLM_DEBUG(lock, "client-side enqueue END"); - LDLM_LOCK_PUT(lock); EXIT; out: + LDLM_LOCK_PUT(lock); + out_nolock: return rc; } -int ldlm_match_or_enqueue(struct ptlrpc_client *cl, - struct ptlrpc_connection *conn, - struct lustre_handle *connh, +int ldlm_match_or_enqueue(struct lustre_handle *connh, struct ptlrpc_request *req, struct ldlm_namespace *ns, struct lustre_handle *parent_lock_handle, @@ -157,7 +241,8 @@ int ldlm_match_or_enqueue(struct ptlrpc_client *cl, void *cookie, int cookielen, ldlm_mode_t mode, int *flags, - ldlm_lock_callback callback, + ldlm_completion_callback completion, + ldlm_blocking_callback blocking, void *data, __u32 data_len, struct lustre_handle *lockh) @@ -166,10 +251,10 @@ int ldlm_match_or_enqueue(struct ptlrpc_client *cl, ENTRY; rc = ldlm_lock_match(ns, res_id, type, cookie, cookielen, mode, lockh); if (rc == 0) { - rc = ldlm_cli_enqueue(cl, conn, connh, req, ns, + rc = ldlm_cli_enqueue(connh, req, ns, parent_lock_handle, res_id, type, cookie, - cookielen, mode, flags, callback, data, - data_len, lockh); + cookielen, mode, flags, completion, + blocking, data, data_len, lockh); if (rc != ELDLM_OK) CERROR("ldlm_cli_enqueue: err: %d\n", rc); RETURN(rc); @@ -177,59 +262,28 @@ int ldlm_match_or_enqueue(struct ptlrpc_client *cl, RETURN(0); } -int ldlm_server_ast(struct lustre_handle *lockh, struct ldlm_lock_desc *desc, - void *data, __u32 data_len) +static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode, + int *flags) { - struct ldlm_lock *lock; - struct ldlm_request *body; - struct ptlrpc_request *req; - struct ptlrpc_client *cl; - int rc = 0, size = sizeof(*body); - ENTRY; - lock = ldlm_handle2lock(lockh); - if (lock == NULL) { + if (lock->l_resource->lr_namespace->ns_client) { + CERROR("Trying to cancel local lock\n"); LBUG(); - RETURN(-EINVAL); - } - cl = &lock->l_resource->lr_namespace->ns_rpc_client; - req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1, - &size, NULL); - if (!req) - GOTO(out, rc = -ENOMEM); - - body = lustre_msg_buf(req->rq_reqmsg, 0); - memcpy(&body->lock_handle1, &lock->l_remote_handle, - sizeof(body->lock_handle1)); - - if (desc == NULL) { - CDEBUG(D_NET, "Sending granted AST\n"); - ldlm_lock2desc(lock, &body->lock_desc); - } else { - CDEBUG(D_NET, "Sending blocked AST\n"); - memcpy(&body->lock_desc, desc, sizeof(*desc)); } + LDLM_DEBUG(lock, "client-side local convert"); - LDLM_DEBUG(lock, "server preparing %s AST", - desc == 0 ? "completion" : "blocked"); - - req->rq_replen = lustre_msg_size(0, NULL); - - rc = ptlrpc_queue_wait(req); - rc = ptlrpc_check_status(req, rc); - ptlrpc_free_req(req); + ldlm_lock_convert(lock, new_mode, flags); + ldlm_reprocess_all(lock->l_resource); - EXIT; - out: + LDLM_DEBUG(lock, "client-side local convert handler END"); LDLM_LOCK_PUT(lock); - return rc; + RETURN(0); } -int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh, - struct lustre_handle *connh, - int new_mode, int *flags) +int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags) { struct ldlm_request *body; + struct lustre_handle *connh; struct ldlm_reply *reply; struct ldlm_lock *lock; struct ldlm_resource *res; @@ -243,11 +297,15 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh, RETURN(-EINVAL); } *flags = 0; + connh = lock->l_connh; + + if (!connh) + return ldlm_cli_convert_local(lock, new_mode, flags); LDLM_DEBUG(lock, "client-side convert"); - req = ptlrpc_prep_req(cl, lock->l_connection, - LDLM_CONVERT, 1, &size, NULL); + req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_CONVERT, 1, &size, + NULL); if (!req) GOTO(out, rc = -ENOMEM); @@ -270,18 +328,13 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh, res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags); if (res != NULL) ldlm_reprocess_all(res); - if (lock->l_req_mode != lock->l_granted_mode) { - /* Go to sleep until the lock is granted. */ - /* FIXME: or cancelled. */ - CDEBUG(D_NET, "convert returned a blocked lock, " - "going to sleep.\n"); - wait_event(lock->l_waitq, - lock->l_req_mode == lock->l_granted_mode); - CDEBUG(D_NET, "waking up, the lock must be granted.\n"); - } - LDLM_LOCK_PUT(lock); + /* Go to sleep until the lock is granted. */ + /* FIXME: or cancelled. */ + if (lock->l_completion_ast) + lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC); EXIT; out: + LDLM_LOCK_PUT(lock); ptlrpc_free_req(req); return rc; } @@ -291,10 +344,10 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) struct ptlrpc_request *req; struct ldlm_lock *lock; struct ldlm_request *body; - int rc, size = sizeof(*body); + int rc = 0, size = sizeof(*body); ENTRY; - lock = ldlm_handle2lock(lockh); + lock = ldlm_handle2lock(lockh); if (!lock) { /* It's possible that the decref that we did just before this * cancel was the last reader/writer, and caused a cancel before @@ -305,27 +358,97 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) RETURN(-EINVAL); } - LDLM_DEBUG(lock, "client-side cancel"); - req = ptlrpc_prep_req(lock->l_client, lock->l_connection, - LDLM_CANCEL, 1, &size, NULL); - if (!req) - GOTO(out, rc = -ENOMEM); + if (lock->l_connh) { + LDLM_DEBUG(lock, "client-side cancel"); + /* Set this flag to prevent others from getting new references*/ + l_lock(&lock->l_resource->lr_namespace->ns_lock); + lock->l_flags |= LDLM_FL_CBPENDING; + l_unlock(&lock->l_resource->lr_namespace->ns_lock); - body = lustre_msg_buf(req->rq_reqmsg, 0); - memcpy(&body->lock_handle1, &lock->l_remote_handle, - sizeof(body->lock_handle1)); + req = ptlrpc_prep_req(class_conn2cliimp(lock->l_connh), + LDLM_CANCEL, 1, &size, NULL); + if (!req) + GOTO(out, rc = -ENOMEM); - req->rq_replen = lustre_msg_size(0, NULL); + body = lustre_msg_buf(req->rq_reqmsg, 0); + memcpy(&body->lock_handle1, &lock->l_remote_handle, + sizeof(body->lock_handle1)); - rc = ptlrpc_queue_wait(req); - rc = ptlrpc_check_status(req, rc); - ptlrpc_free_req(req); - if (rc != ELDLM_OK) - GOTO(out, rc); + req->rq_replen = lustre_msg_size(0, NULL); + + rc = ptlrpc_queue_wait(req); + rc = ptlrpc_check_status(req, rc); + ptlrpc_free_req(req); + if (rc != ELDLM_OK) + GOTO(out, rc); + + ldlm_lock_cancel(lock); + } else { + LDLM_DEBUG(lock, "client-side local cancel"); + if (lock->l_resource->lr_namespace->ns_client) { + CERROR("Trying to cancel local lock\n"); + LBUG(); + } + ldlm_lock_cancel(lock); + ldlm_reprocess_all(lock->l_resource); + LDLM_DEBUG(lock, "client-side local cancel handler END"); + } - ldlm_lock_cancel(lock); - LDLM_LOCK_PUT(lock); EXIT; out: - return 0; + LDLM_LOCK_PUT(lock); + return rc; +} + +/* Cancel all locks on a given resource that have 0 readers/writers */ +int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id) +{ + struct ldlm_resource *res; + struct list_head *tmp, *next, list = LIST_HEAD_INIT(list); + struct ldlm_ast_work *w; + ENTRY; + + res = ldlm_resource_get(ns, NULL, res_id, 0, 0); + if (res == NULL) + RETURN(-EINVAL); + + l_lock(&ns->ns_lock); + list_for_each(tmp, &res->lr_granted) { + struct ldlm_lock *lock; + lock = list_entry(tmp, struct ldlm_lock, l_res_link); + + if (lock->l_readers || lock->l_writers) + continue; + + /* Setting the CBPENDING flag is a little misleading, but + * prevents an important race; namely, once CBPENDING is set, + * the lock can accumulate no more readers/writers. Since + * readers and writers are already zero here, ldlm_lock_decref + * won't see this flag and call l_blocking_ast */ + lock->l_flags |= LDLM_FL_CBPENDING; + + OBD_ALLOC(w, sizeof(*w)); + LASSERT(w); + + w->w_lock = LDLM_LOCK_GET(lock); + list_add(&w->w_list, &list); + } + l_unlock(&ns->ns_lock); + + list_for_each_safe(tmp, next, &list) { + struct lustre_handle lockh; + int rc; + w = list_entry(tmp, struct ldlm_ast_work, w_list); + + ldlm_lock2handle(w->w_lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc != ELDLM_OK) + CERROR("ldlm_cli_cancel: %d\n", rc); + + LDLM_LOCK_PUT(w->w_lock); + list_del(&w->w_list); + OBD_FREE(w, sizeof(*w)); + } + + RETURN(0); }