X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_request.c;h=3731f7dc0f56791fcbc9285c60175c99e34deb73;hb=4b5bb6b7cd2b2e7fad8dae3c5e1e7a0bdc488f62;hp=709b4281d116128f7b4c47eb5b9b37daded888d0;hpb=13c84f1514db1fdeacd361f63737ba86718f36e2;p=fs%2Flustre-release.git diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index 709b428..3731f7d 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -12,8 +12,88 @@ #define DEBUG_SUBSYSTEM S_LDLM #include +#include -int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, +int ldlm_completion_ast(struct ldlm_lock *lock, int flags) +{ + ENTRY; + + if (flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED | + LDLM_FL_BLOCK_CONV)) { + /* Go to sleep until the lock is granted. */ + /* FIXME: or cancelled. */ + LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock," + " sleeping"); + ldlm_lock_dump(lock); + ldlm_reprocess_all(lock->l_resource); + wait_event(lock->l_waitq, (lock->l_req_mode == + lock->l_granted_mode)); + LDLM_DEBUG(lock, "client-side enqueue waking up: granted"); + } else if (flags == LDLM_FL_WAIT_NOREPROC) { + wait_event(lock->l_waitq, (lock->l_req_mode == + lock->l_granted_mode)); + } else if (flags == 0) { + wake_up(&lock->l_waitq); + } + + RETURN(0); +} + +static int ldlm_cli_enqueue_local(struct ldlm_namespace *ns, + struct lustre_handle *parent_lockh, + __u64 *res_id, + __u32 type, + void *cookie, int cookielen, + ldlm_mode_t mode, + int *flags, + ldlm_completion_callback completion, + ldlm_blocking_callback blocking, + void *data, + __u32 data_len, + struct lustre_handle *lockh) +{ + struct ldlm_lock *lock; + int err; + + if (ns->ns_client) { + CERROR("Trying to cancel local lock\n"); + LBUG(); + } + + lock = ldlm_lock_create(ns, parent_lockh, res_id, type, mode, NULL, 0); + if (!lock) + GOTO(out_nolock, err = -ENOMEM); + LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created"); + + ldlm_lock_addref_internal(lock, mode); + ldlm_lock2handle(lock, lockh); + lock->l_connh = NULL; + + err = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion, + blocking); + if (err != ELDLM_OK) + GOTO(out, err); + + if (type == LDLM_EXTENT) + memcpy(cookie, &lock->l_extent, sizeof(lock->l_extent)); + if ((*flags) & LDLM_FL_LOCK_CHANGED) + memcpy(res_id, lock->l_resource->lr_name, sizeof(*res_id)); + + LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)", + lock); + + if (lock->l_completion_ast) + lock->l_completion_ast(lock, *flags); + + LDLM_DEBUG(lock, "client-side local enqueue END"); + EXIT; + out: + LDLM_LOCK_PUT(lock); + out_nolock: + return err; +} + +int ldlm_cli_enqueue(struct lustre_handle *connh, struct ptlrpc_request *req, struct ldlm_namespace *ns, struct lustre_handle *parent_lock_handle, @@ -22,7 +102,8 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, void *cookie, int cookielen, ldlm_mode_t mode, int *flags, - ldlm_lock_callback callback, + ldlm_completion_callback completion, + ldlm_blocking_callback blocking, void *data, __u32 data_len, struct lustre_handle *lockh) @@ -33,17 +114,25 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, int rc, size = sizeof(*body), req_passed_in = 1; ENTRY; - *flags = 0; - rc = ldlm_local_lock_create(ns, parent_lock_handle, res_id, type, mode, - data, data_len, lockh); - if (rc != ELDLM_OK) - GOTO(out, rc); + if (connh == NULL) + return ldlm_cli_enqueue_local(ns, parent_lock_handle, res_id, + type, cookie, cookielen, mode, + flags, completion, blocking, data, + data_len, lockh); - lock = lustre_handle2object(lockh); + *flags = 0; + lock = ldlm_lock_create(ns, parent_lock_handle, res_id, type, mode, + data, data_len); + if (lock == NULL) + GOTO(out_nolock, rc = -ENOMEM); LDLM_DEBUG(lock, "client-side enqueue START"); + /* for the local lock, add the reference */ + ldlm_lock_addref_internal(lock, mode); + ldlm_lock2handle(lock, lockh); if (req == NULL) { - req = ptlrpc_prep_req(cl, conn, LDLM_ENQUEUE, 1, &size, NULL); + req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_ENQUEUE, 1, + &size, NULL); if (!req) GOTO(out, rc = -ENOMEM); req_passed_in = 0; @@ -52,18 +141,14 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, /* Dump all of this data into the request buffer */ body = lustre_msg_buf(req->rq_reqmsg, 0); - body->lock_desc.l_resource.lr_type = type; - memcpy(body->lock_desc.l_resource.lr_name, res_id, - sizeof(body->lock_desc.l_resource.lr_name)); - - body->lock_desc.l_req_mode = mode; + ldlm_lock2desc(lock, &body->lock_desc); + /* Phil: make this part of ldlm_lock2desc */ if (type == LDLM_EXTENT) memcpy(&body->lock_desc.l_extent, cookie, sizeof(body->lock_desc.l_extent)); body->lock_flags = *flags; - memcpy(&body->lock_handle1, lockh, sizeof(body->lock_handle1)); - + memcpy(&body->lock_handle1, lockh, sizeof(*lockh)); if (parent_lock_handle) memcpy(&body->lock_handle2, parent_lock_handle, sizeof(body->lock_handle2)); @@ -73,21 +158,20 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, size = sizeof(*reply); req->rq_replen = lustre_msg_size(1, &size); } - - lock->l_connection = conn; - lock->l_client = cl; + lock->l_connh = connh; + lock->l_export = NULL; rc = ptlrpc_queue_wait(req); + /* FIXME: status check here? */ rc = ptlrpc_check_status(req, rc); - spin_lock(&lock->l_lock); if (rc != ELDLM_OK) { LDLM_DEBUG(lock, "client-side enqueue END (%s)", rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED"); - spin_lock(&lock->l_resource->lr_lock); - ldlm_resource_put(lock->l_resource); - spin_unlock(&lock->l_resource->lr_lock); - ldlm_lock_free(lock); + ldlm_lock_decref(lockh, mode); + /* FIXME: if we've already received a completion AST, this will + * LBUG! */ + ldlm_lock_destroy(lock); GOTO(out, rc); } @@ -107,99 +191,99 @@ int ldlm_cli_enqueue(struct ptlrpc_client *cl, struct ptlrpc_connection *conn, /* If enqueue returned a blocked lock but the completion handler has * already run, then it fixed up the resource and we don't need to do it * again. */ - if (*flags & LDLM_FL_LOCK_CHANGED && - lock->l_req_mode != lock->l_granted_mode) { - CDEBUG(D_INFO, "remote intent success, locking %ld instead of" - "%ld\n", (long)reply->lock_resource_name[0], - (long)lock->l_resource->lr_name[0]); - spin_lock(&lock->l_resource->lr_lock); - if (!ldlm_resource_put(lock->l_resource)) - spin_unlock(&lock->l_resource->lr_lock); - - lock->l_resource = - ldlm_resource_get(ns, NULL, reply->lock_resource_name, - type, 1); - if (lock->l_resource == NULL) { - LBUG(); - RETURN(-ENOMEM); + if ((*flags) & LDLM_FL_LOCK_CHANGED) { + int newmode = reply->lock_mode; + if (newmode && newmode != lock->l_req_mode) { + LDLM_DEBUG(lock, "server returned different mode %s", + ldlm_lockname[newmode]); + lock->l_req_mode = newmode; + } + + if (reply->lock_resource_name[0] != + lock->l_resource->lr_name[0]) { + CDEBUG(D_INFO, "remote intent success, locking %ld " + "instead of %ld\n", + (long)reply->lock_resource_name[0], + (long)lock->l_resource->lr_name[0]); + + ldlm_lock_change_resource(lock, + reply->lock_resource_name); + if (lock->l_resource == NULL) { + LBUG(); + RETURN(-ENOMEM); + } + LDLM_DEBUG(lock, "client-side enqueue, new resource"); } - LDLM_DEBUG(lock, "client-side enqueue, new resource"); } if (!req_passed_in) ptlrpc_free_req(req); - spin_unlock(&lock->l_lock); - rc = ldlm_local_lock_enqueue(lockh, cookie, cookielen, flags, callback, - callback); + rc = ldlm_lock_enqueue(lock, cookie, cookielen, flags, completion, + blocking); + if (lock->l_completion_ast) + lock->l_completion_ast(lock, *flags); LDLM_DEBUG(lock, "client-side enqueue END"); - if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED | - LDLM_FL_BLOCK_CONV)) { - /* Go to sleep until the lock is granted. */ - /* FIXME: or cancelled. */ - CDEBUG(D_NET, "enqueue returned a blocked lock (%p), " - "going to sleep.\n", lock); - ldlm_lock_dump(lock); - wait_event_interruptible(lock->l_waitq, lock->l_req_mode == - lock->l_granted_mode); - CDEBUG(D_NET, "waking up, the lock must be granted.\n"); - } EXIT; out: + LDLM_LOCK_PUT(lock); + out_nolock: return rc; } -int ldlm_cli_callback(struct ldlm_lock *lock, struct ldlm_lock *new, - void *data, __u32 data_len, struct ptlrpc_request **reqp) +int ldlm_match_or_enqueue(struct lustre_handle *connh, + struct ptlrpc_request *req, + struct ldlm_namespace *ns, + struct lustre_handle *parent_lock_handle, + __u64 *res_id, + __u32 type, + void *cookie, int cookielen, + ldlm_mode_t mode, + int *flags, + ldlm_completion_callback completion, + ldlm_blocking_callback blocking, + void *data, + __u32 data_len, + struct lustre_handle *lockh) { - struct ldlm_request *body; - struct ptlrpc_request *req; - struct ptlrpc_client *cl = - &lock->l_resource->lr_namespace->ns_rpc_client; - int rc = 0, size = sizeof(*body); + int rc; ENTRY; + rc = ldlm_lock_match(ns, res_id, type, cookie, cookielen, mode, lockh); + if (rc == 0) { + rc = ldlm_cli_enqueue(connh, req, ns, + parent_lock_handle, res_id, type, cookie, + cookielen, mode, flags, completion, + blocking, data, data_len, lockh); + if (rc != ELDLM_OK) + CERROR("ldlm_cli_enqueue: err: %d\n", rc); + RETURN(rc); + } else + RETURN(0); +} - req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CALLBACK, 1, - &size, NULL); - if (!req) - GOTO(out, rc = -ENOMEM); - - body = lustre_msg_buf(req->rq_reqmsg, 0); - memcpy(&body->lock_handle1, &lock->l_remote_handle, - sizeof(body->lock_handle1)); +static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode, + int *flags) +{ - if (new == NULL) { - CDEBUG(D_NET, "Sending granted AST\n"); - ldlm_lock2desc(lock, &body->lock_desc); - } else { - CDEBUG(D_NET, "Sending blocked AST\n"); - ldlm_lock2desc(new, &body->lock_desc); - ldlm_object2handle(new, &body->lock_handle2); + if (lock->l_resource->lr_namespace->ns_client) { + CERROR("Trying to cancel local lock\n"); + LBUG(); } + LDLM_DEBUG(lock, "client-side local convert"); - LDLM_DEBUG(lock, "server preparing %s AST", - new == NULL ? "completion" : "blocked"); - - req->rq_replen = lustre_msg_size(0, NULL); - - if (reqp == NULL) { - LBUG(); - rc = ptlrpc_queue_wait(req); - rc = ptlrpc_check_status(req, rc); - ptlrpc_free_req(req); - } else - *reqp = req; + ldlm_lock_convert(lock, new_mode, flags); + ldlm_reprocess_all(lock->l_resource); - EXIT; - out: - return rc; + LDLM_DEBUG(lock, "client-side local convert handler END"); + LDLM_LOCK_PUT(lock); + RETURN(0); } -int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh, - int new_mode, int *flags) +int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, int *flags) { struct ldlm_request *body; + struct lustre_handle *connh; struct ldlm_reply *reply; struct ldlm_lock *lock; struct ldlm_resource *res; @@ -207,12 +291,20 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh, int rc, size = sizeof(*body); ENTRY; - lock = lustre_handle2object(lockh); + lock = ldlm_handle2lock(lockh); + if (!lock) { + LBUG(); + RETURN(-EINVAL); + } *flags = 0; + connh = lock->l_connh; + + if (!connh) + return ldlm_cli_convert_local(lock, new_mode, flags); LDLM_DEBUG(lock, "client-side convert"); - req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CONVERT, 1, &size, + req = ptlrpc_prep_req(class_conn2cliimp(connh), LDLM_CONVERT, 1, &size, NULL); if (!req) GOTO(out, rc = -ENOMEM); @@ -233,56 +325,130 @@ int ldlm_cli_convert(struct ptlrpc_client *cl, struct lustre_handle *lockh, GOTO(out, rc); reply = lustre_msg_buf(req->rq_repmsg, 0); - res = ldlm_local_lock_convert(lockh, new_mode, &reply->lock_flags); + res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags); if (res != NULL) ldlm_reprocess_all(res); - if (lock->l_req_mode != lock->l_granted_mode) { - /* Go to sleep until the lock is granted. */ - /* FIXME: or cancelled. */ - CDEBUG(D_NET, "convert returned a blocked lock, " - "going to sleep.\n"); - wait_event_interruptible(lock->l_waitq, lock->l_req_mode == - lock->l_granted_mode); - CDEBUG(D_NET, "waking up, the lock must be granted.\n"); - } + /* Go to sleep until the lock is granted. */ + /* FIXME: or cancelled. */ + if (lock->l_completion_ast) + lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC); EXIT; out: + LDLM_LOCK_PUT(lock); ptlrpc_free_req(req); return rc; } -int ldlm_cli_cancel(struct ptlrpc_client *cl, struct ldlm_lock *lock) +int ldlm_cli_cancel(struct lustre_handle *lockh) { struct ptlrpc_request *req; + struct ldlm_lock *lock; struct ldlm_request *body; - struct ldlm_resource *res; - int rc, size = sizeof(*body); + int rc = 0, size = sizeof(*body); ENTRY; - LDLM_DEBUG(lock, "client-side cancel"); - req = ptlrpc_prep_req(cl, lock->l_connection, LDLM_CANCEL, 1, &size, - NULL); - if (!req) - GOTO(out, rc = -ENOMEM); + lock = ldlm_handle2lock(lockh); + if (!lock) { + /* It's possible that the decref that we did just before this + * cancel was the last reader/writer, and caused a cancel before + * we could call this function. If we want to make this + * impossible (by adding a dec_and_cancel() or similar), then + * we can put the LBUG back. */ + //LBUG(); + RETURN(-EINVAL); + } - body = lustre_msg_buf(req->rq_reqmsg, 0); - memcpy(&body->lock_handle1, &lock->l_remote_handle, - sizeof(body->lock_handle1)); + if (lock->l_connh) { + LDLM_DEBUG(lock, "client-side cancel"); + /* Set this flag to prevent others from getting new references*/ + l_lock(&lock->l_resource->lr_namespace->ns_lock); + lock->l_flags |= LDLM_FL_CBPENDING; + l_unlock(&lock->l_resource->lr_namespace->ns_lock); - req->rq_replen = lustre_msg_size(0, NULL); + req = ptlrpc_prep_req(class_conn2cliimp(lock->l_connh), + LDLM_CANCEL, 1, &size, NULL); + if (!req) + GOTO(out, rc = -ENOMEM); - rc = ptlrpc_queue_wait(req); - rc = ptlrpc_check_status(req, rc); - ptlrpc_free_req(req); - if (rc != ELDLM_OK) - GOTO(out, rc); + body = lustre_msg_buf(req->rq_reqmsg, 0); + memcpy(&body->lock_handle1, &lock->l_remote_handle, + sizeof(body->lock_handle1)); + + req->rq_replen = lustre_msg_size(0, NULL); + + rc = ptlrpc_queue_wait(req); + rc = ptlrpc_check_status(req, rc); + ptlrpc_free_req(req); + if (rc != ELDLM_OK) + GOTO(out, rc); + + ldlm_lock_cancel(lock); + } else { + LDLM_DEBUG(lock, "client-side local cancel"); + if (lock->l_resource->lr_namespace->ns_client) { + CERROR("Trying to cancel local lock\n"); + LBUG(); + } + ldlm_lock_cancel(lock); + ldlm_reprocess_all(lock->l_resource); + LDLM_DEBUG(lock, "client-side local cancel handler END"); + } - res = ldlm_local_lock_cancel(lock); - if (res != NULL) - ldlm_reprocess_all(res); - else - rc = ELDLM_RESOURCE_FREED; EXIT; out: + LDLM_LOCK_PUT(lock); return rc; } + +/* Cancel all locks on a given resource that have 0 readers/writers */ +int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, __u64 *res_id) +{ + struct ldlm_resource *res; + struct list_head *tmp, *next, list = LIST_HEAD_INIT(list); + struct ldlm_ast_work *w; + ENTRY; + + res = ldlm_resource_get(ns, NULL, res_id, 0, 0); + if (res == NULL) + RETURN(-EINVAL); + + l_lock(&ns->ns_lock); + list_for_each(tmp, &res->lr_granted) { + struct ldlm_lock *lock; + lock = list_entry(tmp, struct ldlm_lock, l_res_link); + + if (lock->l_readers || lock->l_writers) + continue; + + /* Setting the CBPENDING flag is a little misleading, but + * prevents an important race; namely, once CBPENDING is set, + * the lock can accumulate no more readers/writers. Since + * readers and writers are already zero here, ldlm_lock_decref + * won't see this flag and call l_blocking_ast */ + lock->l_flags |= LDLM_FL_CBPENDING; + + OBD_ALLOC(w, sizeof(*w)); + LASSERT(w); + + w->w_lock = LDLM_LOCK_GET(lock); + list_add(&w->w_list, &list); + } + l_unlock(&ns->ns_lock); + + list_for_each_safe(tmp, next, &list) { + struct lustre_handle lockh; + int rc; + w = list_entry(tmp, struct ldlm_ast_work, w_list); + + ldlm_lock2handle(w->w_lock, &lockh); + rc = ldlm_cli_cancel(&lockh); + if (rc != ELDLM_OK) + CERROR("ldlm_cli_cancel: %d\n", rc); + + LDLM_LOCK_PUT(w->w_lock); + list_del(&w->w_list); + OBD_FREE(w, sizeof(*w)); + } + + RETURN(0); +}