X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fldlm%2Fldlm_request.c;h=95b2f6368dc65b643039f670753fd8285743f7ad;hb=a1052417b78bc18898161c4ed44d79de4a1a2f23;hp=bc78f928a263a2c9685040fe28cc058f65394784;hpb=a5b6ed491a5fcbe593f0fb8162b7be11d07ba4de;p=fs%2Flustre-release.git diff --git a/lustre/ldlm/ldlm_request.c b/lustre/ldlm/ldlm_request.c index bc78f92..95b2f63 100644 --- a/lustre/ldlm/ldlm_request.c +++ b/lustre/ldlm/ldlm_request.c @@ -27,7 +27,7 @@ * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2010, 2012, Intel Corporation. + * Copyright (c) 2010, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -105,12 +105,17 @@ int ldlm_expired_completion_wait(void *data) if (ptlrpc_check_suspend()) RETURN(0); - LDLM_ERROR(lock, "lock timed out (enqueued at "CFS_TIME_T", " - CFS_DURATION_T"s ago); not entering recovery in " - "server code, just going back to sleep", - lock->l_last_activity, - cfs_time_sub(cfs_time_current_sec(), - lock->l_last_activity)); + LCONSOLE_WARN("lock timed out (enqueued at "CFS_TIME_T", " + CFS_DURATION_T"s ago)\n", + lock->l_last_activity, + cfs_time_sub(cfs_time_current_sec(), + lock->l_last_activity)); + LDLM_DEBUG(lock, "lock timed out (enqueued at "CFS_TIME_T", " + CFS_DURATION_T"s ago); not entering recovery in " + "server code, just going back to sleep", + lock->l_last_activity, + cfs_time_sub(cfs_time_current_sec(), + lock->l_last_activity)); if (cfs_time_after(cfs_time_current(), next_dump)) { last_dump = next_dump; next_dump = cfs_time_shift(300); @@ -156,24 +161,24 @@ EXPORT_SYMBOL(ldlm_get_enq_timeout); */ static int ldlm_completion_tail(struct ldlm_lock *lock) { - long delay; - int result; - - if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED) { - LDLM_DEBUG(lock, "client-side enqueue: destroyed"); - result = -EIO; - } else { - delay = cfs_time_sub(cfs_time_current_sec(), - lock->l_last_activity); - LDLM_DEBUG(lock, "client-side enqueue: granted after " - CFS_DURATION_T"s", delay); - - /* Update our time estimate */ - at_measured(ldlm_lock_to_ns_at(lock), - delay); - result = 0; - } - return result; + long delay; + int result; + + if (lock->l_flags & (LDLM_FL_DESTROYED | LDLM_FL_FAILED)) { + LDLM_DEBUG(lock, "client-side enqueue: destroyed"); + result = -EIO; + } else { + delay = cfs_time_sub(cfs_time_current_sec(), + lock->l_last_activity); + LDLM_DEBUG(lock, "client-side enqueue: granted after " + CFS_DURATION_T"s", delay); + + /* Update our time estimate */ + at_measured(ldlm_lock_to_ns_at(lock), + delay); + result = 0; + } + return result; } /** @@ -327,7 +332,7 @@ int ldlm_blocking_ast_nocheck(struct ldlm_lock *lock) LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel"); ldlm_lock2handle(lock, &lockh); - rc = ldlm_cli_cancel(&lockh); + rc = ldlm_cli_cancel(&lockh, LCF_ASYNC); if (rc < 0) CERROR("ldlm_cli_cancel: %d\n", rc); } else { @@ -619,33 +624,27 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, lock->l_req_mode = newmode; } - if (memcmp(reply->lock_desc.l_resource.lr_name.name, - lock->l_resource->lr_name.name, - sizeof(struct ldlm_res_id))) { - CDEBUG(D_INFO, "remote intent success, locking " - "(%ld,%ld,%ld) instead of " - "(%ld,%ld,%ld)\n", - (long)reply->lock_desc.l_resource.lr_name.name[0], - (long)reply->lock_desc.l_resource.lr_name.name[1], - (long)reply->lock_desc.l_resource.lr_name.name[2], - (long)lock->l_resource->lr_name.name[0], - (long)lock->l_resource->lr_name.name[1], - (long)lock->l_resource->lr_name.name[2]); - - rc = ldlm_lock_change_resource(ns, lock, - &reply->lock_desc.l_resource.lr_name); - if (rc || lock->l_resource == NULL) - GOTO(cleanup, rc = -ENOMEM); - LDLM_DEBUG(lock, "client-side enqueue, new resource"); - } - if (with_policy) - if (!(type == LDLM_IBITS && !(exp->exp_connect_flags & - OBD_CONNECT_IBITS))) - /* We assume lock type cannot change on server*/ - ldlm_convert_policy_to_local(exp, - lock->l_resource->lr_type, - &reply->lock_desc.l_policy_data, - &lock->l_policy_data); + if (!ldlm_res_eq(&reply->lock_desc.l_resource.lr_name, + &lock->l_resource->lr_name)) { + CDEBUG(D_INFO, "remote intent success, locking "DLDLMRES + " instead of "DLDLMRES"\n", + PLDLMRES(&reply->lock_desc.l_resource), + PLDLMRES(lock->l_resource)); + + rc = ldlm_lock_change_resource(ns, lock, + &reply->lock_desc.l_resource.lr_name); + if (rc || lock->l_resource == NULL) + GOTO(cleanup, rc = -ENOMEM); + LDLM_DEBUG(lock, "client-side enqueue, new resource"); + } + if (with_policy) + if (!(type == LDLM_IBITS && + !(exp_connect_flags(exp) & OBD_CONNECT_IBITS))) + /* We assume lock type cannot change on server*/ + ldlm_convert_policy_to_local(exp, + lock->l_resource->lr_type, + &reply->lock_desc.l_policy_data, + &lock->l_policy_data); if (type != LDLM_PLAIN) LDLM_DEBUG(lock,"client-side enqueue, new policy data"); } @@ -673,8 +672,10 @@ int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req, rc = ldlm_fill_lvb(lock, &req->rq_pill, RCL_SERVER, lock->l_lvb_data, size); unlock_res_and_lock(lock); - if (rc < 0) + if (rc < 0) { + cleanup_phase = 1; GOTO(cleanup, rc); + } } if (!is_replay) { @@ -716,7 +717,7 @@ static inline int ldlm_req_handles_avail(int req_size, int off) { int avail; - avail = min_t(int, LDLM_MAXREQSIZE, CFS_PAGE_SIZE - 512) - req_size; + avail = min_t(int, LDLM_MAXREQSIZE, PAGE_CACHE_SIZE - 512) - req_size; if (likely(avail >= 0)) avail /= (int)sizeof(struct lustre_handle); else @@ -823,6 +824,28 @@ int ldlm_prep_enqueue_req(struct obd_export *exp, struct ptlrpc_request *req, } EXPORT_SYMBOL(ldlm_prep_enqueue_req); +struct ptlrpc_request *ldlm_enqueue_pack(struct obd_export *exp, int lvb_len) +{ + struct ptlrpc_request *req; + int rc; + ENTRY; + + req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_LDLM_ENQUEUE); + if (req == NULL) + RETURN(ERR_PTR(-ENOMEM)); + + rc = ldlm_prep_enqueue_req(exp, req, NULL, 0); + if (rc) { + ptlrpc_request_free(req); + RETURN(ERR_PTR(rc)); + } + + req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, lvb_len); + ptlrpc_request_set_replen(req); + RETURN(req); +} +EXPORT_SYMBOL(ldlm_enqueue_pack); + /** * Client-side lock enqueue. * @@ -861,17 +884,16 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, LDLM_DEBUG(lock, "client-side enqueue START"); LASSERT(exp == lock->l_conn_export); } else { - const struct ldlm_callback_suite cbs = { - .lcs_completion = einfo->ei_cb_cp, - .lcs_blocking = einfo->ei_cb_bl, - .lcs_glimpse = einfo->ei_cb_gl, - .lcs_weigh = einfo->ei_cb_wg - }; - lock = ldlm_lock_create(ns, res_id, einfo->ei_type, - einfo->ei_mode, &cbs, einfo->ei_cbdata, + const struct ldlm_callback_suite cbs = { + .lcs_completion = einfo->ei_cb_cp, + .lcs_blocking = einfo->ei_cb_bl, + .lcs_glimpse = einfo->ei_cb_gl + }; + lock = ldlm_lock_create(ns, res_id, einfo->ei_type, + einfo->ei_mode, &cbs, einfo->ei_cbdata, lvb_len, lvb_type); - if (lock == NULL) - RETURN(-ENOMEM); + if (lock == NULL) + RETURN(-ENOMEM); /* for the local lock, add the reference */ ldlm_lock_addref_internal(lock, einfo->ei_mode); ldlm_lock2handle(lock, lockh); @@ -882,7 +904,8 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, * inodebits lock internally with both bits set. */ if (einfo->ei_type == LDLM_IBITS && - !(exp->exp_connect_flags & OBD_CONNECT_IBITS)) + !(exp_connect_flags(exp) & + OBD_CONNECT_IBITS)) lock->l_policy_data.l_inodebits.bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE; @@ -899,7 +922,7 @@ int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp, lock->l_conn_export = exp; lock->l_export = NULL; lock->l_blocking_ast = einfo->ei_cb_bl; - lock->l_flags |= (*flags & LDLM_FL_NO_LRU); + lock->l_flags |= (*flags & (LDLM_FL_NO_LRU | LDLM_FL_EXCL)); /* lock not sent to server yet */ @@ -997,7 +1020,7 @@ static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode, ldlm_reprocess_all(res); rc = 0; } else { - rc = EDEADLOCK; + rc = LUSTRE_EDEADLK; } LDLM_DEBUG(lock, "client-side local convert handler END"); LDLM_LOCK_PUT(lock); @@ -1069,7 +1092,7 @@ int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags) GOTO(out, rc); } } else { - rc = EDEADLOCK; + rc = LUSTRE_EDEADLK; } EXIT; out: @@ -1222,7 +1245,7 @@ int ldlm_cli_cancel_req(struct obd_export *exp, cfs_list_t *cancels, } else { rc = ptlrpc_queue_wait(req); } - if (rc == ESTALE) { + if (rc == LUSTRE_ESTALE) { CDEBUG(D_DLMTRACE, "client/server (nid %s) " "out of sync -- not fatal\n", libcfs_nid2str(req->rq_import-> @@ -1233,10 +1256,11 @@ int ldlm_cli_cancel_req(struct obd_export *exp, cfs_list_t *cancels, ptlrpc_req_finished(req); continue; } else if (rc != ELDLM_OK) { - if (rc != -ESHUTDOWN) - CERROR("Got rc %d from cancel RPC: canceling " - "anyway\n", rc); - break; + /* -ESHUTDOWN is common on umount */ + CDEBUG_LIMIT(rc == -ESHUTDOWN ? D_DLMTRACE : D_ERROR, + "Got rc %d from cancel RPC: " + "canceling anyway\n", rc); + break; } sent = count; break; @@ -1310,7 +1334,8 @@ EXPORT_SYMBOL(ldlm_cli_update_pool); * * Lock must not have any readers or writers by this time. */ -int ldlm_cli_cancel(struct lustre_handle *lockh) +int ldlm_cli_cancel(struct lustre_handle *lockh, + ldlm_cancel_flags_t cancel_flags) { struct obd_export *exp; int avail, flags, count = 1; @@ -1328,10 +1353,10 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) } rc = ldlm_cli_cancel_local(lock); - if (rc == LDLM_FL_LOCAL_ONLY) { - LDLM_LOCK_RELEASE(lock); + if (rc == LDLM_FL_LOCAL_ONLY || cancel_flags & LCF_LOCAL) { + LDLM_LOCK_RELEASE(lock); RETURN(0); - } + } /* Even if the lock is marked as LDLM_FL_BL_AST, this is a LDLM_CANCEL * RPC which goes to canceld portal, so we can cancel other LRU locks * here and send them all as one LDLM_CANCEL RPC. */ @@ -1351,7 +1376,7 @@ int ldlm_cli_cancel(struct lustre_handle *lockh) count += ldlm_cancel_lru_local(ns, &cancels, 0, avail - 1, LCF_BL_AST, flags); } - ldlm_cli_cancel_list(&cancels, count, NULL, 0); + ldlm_cli_cancel_list(&cancels, count, NULL, cancel_flags); RETURN(0); } EXPORT_SYMBOL(ldlm_cli_cancel); @@ -1743,29 +1768,30 @@ int ldlm_cancel_lru_local(struct ldlm_namespace *ns, cfs_list_t *cancels, /** * Cancel at least \a nr locks from given namespace LRU. * - * When called with LDLM_ASYNC the blocking callback will be handled + * When called with LCF_ASYNC the blocking callback will be handled * in a thread and this function will return after the thread has been - * asked to call the callback. When called with LDLM_SYNC the blocking + * asked to call the callback. When called with LCF_ASYNC the blocking * callback will be performed in this function. */ -int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, ldlm_sync_t mode, - int flags) +int ldlm_cancel_lru(struct ldlm_namespace *ns, int nr, + ldlm_cancel_flags_t cancel_flags, + int flags) { - CFS_LIST_HEAD(cancels); - int count, rc; - ENTRY; + CFS_LIST_HEAD(cancels); + int count, rc; + ENTRY; #ifndef __KERNEL__ - mode = LDLM_SYNC; /* force to be sync in user space */ + cancel_flags &= ~LCF_ASYNC; /* force to be sync in user space */ #endif - /* Just prepare the list of locks, do not actually cancel them yet. - * Locks are cancelled later in a separate thread. */ - count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, flags); - rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, mode); - if (rc == 0) - RETURN(count); - - RETURN(0); + /* Just prepare the list of locks, do not actually cancel them yet. + * Locks are cancelled later in a separate thread. */ + count = ldlm_prepare_lru_list(ns, &cancels, nr, 0, flags); + rc = ldlm_bl_to_thread_list(ns, NULL, &cancels, count, cancel_flags); + if (rc == 0) + RETURN(count); + + RETURN(0); } /** @@ -1870,11 +1896,11 @@ int ldlm_cli_cancel_list(cfs_list_t *cancels, int count, cancels, 1, flags); } - if (res < 0) { - if (res != -ESHUTDOWN) - CERROR("ldlm_cli_cancel_list: %d\n", res); - res = count; - } + if (res < 0) { + CDEBUG_LIMIT(res == -ESHUTDOWN ? D_DLMTRACE : D_ERROR, + "ldlm_cli_cancel_list: %d\n", res); + res = count; + } count -= res; ldlm_lock_list_put(cancels, l_bl_ast, res); @@ -1914,7 +1940,8 @@ int ldlm_cli_cancel_unused_resource(struct ldlm_namespace *ns, 0, flags | LCF_BL_AST, opaque); rc = ldlm_cli_cancel_list(&cancels, count, NULL, flags); if (rc != ELDLM_OK) - CERROR("ldlm_cli_cancel_unused_resource: %d\n", rc); + CERROR("canceling unused lock "DLDLMRES": rc = %d\n", + PLDLMRES(res), rc); LDLM_RESOURCE_DELREF(res); ldlm_resource_putref(res); @@ -1928,21 +1955,16 @@ struct ldlm_cli_cancel_arg { }; static int ldlm_cli_hash_cancel_unused(cfs_hash_t *hs, cfs_hash_bd_t *bd, - cfs_hlist_node_t *hnode, void *arg) + cfs_hlist_node_t *hnode, void *arg) { - struct ldlm_resource *res = cfs_hash_object(hs, hnode); - struct ldlm_cli_cancel_arg *lc = arg; - int rc; - - rc = ldlm_cli_cancel_unused_resource(ldlm_res_to_ns(res), &res->lr_name, - NULL, LCK_MINMODE, - lc->lc_flags, lc->lc_opaque); - if (rc != 0) { - CERROR("ldlm_cli_cancel_unused ("LPU64"): %d\n", - res->lr_name.name[0], rc); - } - /* must return 0 for hash iteration */ - return 0; + struct ldlm_resource *res = cfs_hash_object(hs, hnode); + struct ldlm_cli_cancel_arg *lc = arg; + + ldlm_cli_cancel_unused_resource(ldlm_res_to_ns(res), &res->lr_name, + NULL, LCK_MINMODE, lc->lc_flags, + lc->lc_opaque); + /* must return 0 for hash iteration */ + return 0; } /**