X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_request.c;h=ec5f8d46f83880f807cb5b8e273ccee4d95c7a04;hp=a996da6acf80b4daaeb415f00c64673c76c0327c;hb=926c6309185a25a8ac1541cfa67910325ed8626f;hpb=7ff01fc9e03faeb8c23182e50b6b85107c598628;ds=sidebyside diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index a996da6..ec5f8d4 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -106,8 +106,15 @@ noreproc: lwd.lwd_lock = lock; - lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, ldlm_expired_completion_wait, - interrupted_completion_wait, &lwd); + if (flags & LDLM_FL_NO_TIMEOUT) { + LDLM_DEBUG(lock, "waiting indefinitely for group lock\n"); + lwi = LWI_INTR(interrupted_completion_wait, &lwd); + } else { + lwi = LWI_TIMEOUT_INTR(obd_timeout * HZ, + ldlm_expired_completion_wait, + interrupted_completion_wait, &lwd); + } + if (imp != NULL) { spin_lock_irqsave(&imp->imp_lock, irqflags); lwd.lwd_generation = imp->imp_generation; @@ -117,9 +124,9 @@ noreproc: /* Go to sleep until the lock is granted or cancelled. */ rc = l_wait_event(lock->l_waitq, ((lock->l_req_mode == lock->l_granted_mode) || - (lock->l_flags & LDLM_FL_CANCEL)), &lwi); + (lock->l_flags & LDLM_FL_FAILED)), &lwi); - if (lock->l_destroyed) { + if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED) { LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed"); RETURN(-EIO); } @@ -230,6 +237,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ldlm_reply *reply; int rc, size[2] = {sizeof(*body), lvb_len}, req_passed_in = 1; int is_replay = *flags & LDLM_FL_REPLAY; + int cleanup_phase = 0; ENTRY; if (exp == NULL) { @@ -251,7 +259,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, lock = ldlm_lock_create(ns, NULL, res_id, type, mode, blocking, completion, glimpse, data, lvb_len); if (lock == NULL) - GOTO(out_nolock, rc = -ENOMEM); + RETURN(-ENOMEM); /* for the local lock, add the reference */ ldlm_lock_addref_internal(lock, mode); ldlm_lock2handle(lock, lockh); @@ -264,11 +272,14 @@ int ldlm_cli_enqueue(struct obd_export *exp, LDLM_DEBUG(lock, "client-side enqueue START"); } + /* lock not sent to server yet */ + cleanup_phase = 2; + if (req == NULL) { req = ptlrpc_prep_req(class_exp2cliimp(exp), LDLM_ENQUEUE, 1, size, NULL); if (req == NULL) - GOTO(out_lock, rc = -ENOMEM); + GOTO(cleanup, rc = -ENOMEM); req_passed_in = 0; } else if (req->rq_reqmsg->buflens[0] != sizeof(*body)) LBUG(); @@ -312,21 +323,29 @@ int ldlm_cli_enqueue(struct obd_export *exp, tmplvb = lustre_swab_repbuf(req, 1, lvb_len, lvb_swabber); if (tmplvb == NULL) - GOTO(out_lock, rc = -EPROTO); + GOTO(cleanup, rc = -EPROTO); if (lvb != NULL) memcpy(lvb, tmplvb, lvb_len); } } - GOTO(out_lock, rc); + GOTO(cleanup, rc); } reply = lustre_swab_repbuf(req, 0, sizeof(*reply), lustre_swab_ldlm_reply); if (reply == NULL) { CERROR("Can't unpack ldlm_reply\n"); - GOTO(out_lock, rc = -EPROTO); + GOTO(cleanup, rc = -EPROTO); } + /* XXX - Phil, wasn't sure if this shoiuld go before or after the + /* lustre_swab_repbuf() ? If we can't unpack the reply then we + /* don't know what occurred on the server so I think the safest + /* bet is to cleanup the lock as if it didn't make it ? */ + + /* lock enqueued on the server */ + cleanup_phase = 1; + memcpy(&lock->l_remote_handle, &reply->lock_handle, sizeof(lock->l_remote_handle)); *flags = reply->lock_flags; @@ -347,7 +366,9 @@ int ldlm_cli_enqueue(struct obd_export *exp, } if (reply->lock_desc.l_resource.lr_name.name[0] != - lock->l_resource->lr_name.name[0]) { + lock->l_resource->lr_name.name[0] || + reply->lock_desc.l_resource.lr_name.name[1] != + lock->l_resource->lr_name.name[1]) { CDEBUG(D_INFO, "remote intent success, locking %ld " "instead of %ld\n", (long)reply->lock_desc.l_resource.lr_name.name[0], @@ -357,7 +378,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, reply->lock_desc.l_resource.lr_name); if (lock->l_resource == NULL) { LBUG(); - GOTO(out_lock, rc = -ENOMEM); + GOTO(cleanup, rc = -ENOMEM); } LDLM_DEBUG(lock, "client-side enqueue, new resource"); } @@ -382,7 +403,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, void *tmplvb; tmplvb = lustre_swab_repbuf(req, 1, lvb_len, lvb_swabber); if (tmplvb == NULL) - GOTO(out_lock, rc = -EPROTO); + GOTO(cleanup, rc = -EPROTO); memcpy(lock->l_lvb_data, tmplvb, lvb_len); } @@ -403,13 +424,17 @@ int ldlm_cli_enqueue(struct obd_export *exp, LDLM_DEBUG(lock, "client-side enqueue END"); EXIT; - out_lock: - if (rc) - failed_lock_cleanup(ns, lock, lockh, mode); - if (!req_passed_in && req != NULL) - ptlrpc_req_finished(req); +cleanup: + switch (cleanup_phase) { + case 2: + if (rc) + failed_lock_cleanup(ns, lock, lockh, mode); + case 1: + if (!req_passed_in && req != NULL) + ptlrpc_req_finished(req); + } + LDLM_LOCK_PUT(lock); - out_nolock: return rc; } @@ -552,9 +577,11 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) rc = ptlrpc_queue_wait(req); if (rc == ESTALE) { - CERROR("client/server (nid "LPU64") out of sync--not " - "fatal\n", - req->rq_import->imp_connection->c_peer.peer_nid); + char str[PTL_NALFMT_SIZE]; + CERROR("client/server (nid %s) out of sync" + " -- not fatal\n", + ptlrpc_peernid2str(&req->rq_import-> + imp_connection->c_peer, str)); } else if (rc == -ETIMEDOUT) { ptlrpc_req_finished(req); GOTO(restart, rc); @@ -583,10 +610,16 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) return rc; } -int ldlm_cancel_lru(struct ldlm_namespace *ns) +/* when called with LDLM_ASYNC the blocking callback will be handled + * in a thread and this function will return after the thread has been + * asked to call the callback. when called with LDLM_SYNC the blocking + * callback will be performed in this function. */ +int ldlm_cancel_lru(struct ldlm_namespace *ns, ldlm_sync_t sync) { struct list_head *tmp, *next; + struct ldlm_lock *lock; int count, rc = 0; + LIST_HEAD(cblist); ENTRY; l_lock(&ns->ns_lock); @@ -598,7 +631,7 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns) } list_for_each_safe(tmp, next, &ns->ns_unused_list) { - struct ldlm_lock *lock; + lock = list_entry(tmp, struct ldlm_lock, l_lru); LASSERT(!lock->l_readers && !lock->l_writers); @@ -612,12 +645,21 @@ int ldlm_cancel_lru(struct ldlm_namespace *ns) LDLM_LOCK_GET(lock); /* dropped by bl thread */ ldlm_lock_remove_from_lru(lock); - ldlm_bl_to_thread(ns, NULL, lock); + if (sync == LDLM_ASYNC) + ldlm_bl_to_thread(ns, NULL, lock); + else + list_add(&lock->l_lru, &cblist); if (--count == 0) break; } l_unlock(&ns->ns_lock); + + list_for_each_safe(tmp, next, &cblist) { + lock = list_entry(tmp, struct ldlm_lock, l_lru); + list_del_init(&lock->l_lru); + ldlm_handle_bl_callback(ns, NULL, lock); + } RETURN(rc); } @@ -645,15 +687,14 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, if (opaque != NULL && lock->l_ast_data != opaque) { LDLM_ERROR(lock, "data %p doesn't match opaque %p", lock->l_ast_data, opaque); - //LBUG(); continue; } if (lock->l_readers || lock->l_writers) { - if (flags & LDLM_FL_WARN) { + if (flags & LDLM_FL_CONFIG_CHANGE) + lock->l_flags |= LDLM_FL_CBPENDING; + else if (flags & LDLM_FL_WARN) LDLM_ERROR(lock, "lock in use"); - //LBUG(); - } continue; } @@ -665,11 +706,6 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, w->w_lock = LDLM_LOCK_GET(lock); - /* Prevent the cancel callback from being called by setting - * LDLM_FL_CANCEL in the lock. Very sneaky. -p */ - if (flags & LDLM_FL_NO_CALLBACK) - w->w_lock->l_flags |= LDLM_FL_CANCEL; - list_add(&w->w_list, &list); } l_unlock(&ns->ns_lock); @@ -697,17 +733,32 @@ static int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, RETURN(0); } +static inline int have_no_nsresource(struct ldlm_namespace *ns) +{ + int no_resource = 0; + + spin_lock(&ns->ns_counter_lock); + if (ns->ns_resources == 0) + no_resource = 1; + spin_unlock(&ns->ns_counter_lock); + + RETURN(no_resource); +} + /* Cancel all locks on a namespace (or a specific resource, if given) * that have 0 readers/writers. * * If flags & LDLM_FL_LOCAL_ONLY, throw the locks away without trying * to notify the server. * If flags & LDLM_FL_NO_CALLBACK, don't run the cancel callback. - * If flags & LDLM_FL_WARN, print a warning if some locks are still in use. */ + * If flags & LDLM_FL_WARN, print a warning if some locks are still in use. + * If flags & LDLM_FL_CONFIG_CHANGE, mark all locks as having a pending callback + */ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, struct ldlm_res_id *res_id, int flags, void *opaque) { int i; + struct l_wait_info lwi = { 0 }; ENTRY; if (ns == NULL) @@ -719,23 +770,28 @@ int ldlm_cli_cancel_unused(struct ldlm_namespace *ns, l_lock(&ns->ns_lock); for (i = 0; i < RES_HASH_SIZE; i++) { - struct list_head *tmp, *pos; - list_for_each_safe(tmp, pos, &(ns->ns_hash[i])) { + struct list_head *tmp, *next; + list_for_each_safe(tmp, next, &(ns->ns_hash[i])) { int rc; struct ldlm_resource *res; res = list_entry(tmp, struct ldlm_resource, lr_hash); ldlm_resource_getref(res); + l_unlock(&ns->ns_lock); rc = ldlm_cli_cancel_unused_resource(ns, res->lr_name, flags, opaque); - if (rc) CERROR("cancel_unused_res ("LPU64"): %d\n", res->lr_name.name[0], rc); + + l_lock(&ns->ns_lock); + next = tmp->next; ldlm_resource_putref(res); } } l_unlock(&ns->ns_lock); + if (flags & LDLM_FL_CONFIG_CHANGE) + l_wait_event(ns->ns_waitq, have_no_nsresource(ns), &lwi); RETURN(ELDLM_OK); } @@ -965,6 +1021,7 @@ int ldlm_replay_locks(struct obd_import *imp) INIT_LIST_HEAD(&list); LASSERT(atomic_read(&imp->imp_replay_inflight) == 0); + LASSERT(ns != NULL); /* ensure this doesn't fall to 0 before all have been queued */ atomic_inc(&imp->imp_replay_inflight);