From: Fan Yong Date: Fri, 6 Jan 2012 06:49:42 +0000 (+0800) Subject: LU-925 agl: async glimpse lock process in CLIO stack X-Git-Tag: 2.1.54~5 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=6f5813d36102a19f314c9aab409972e8a9f1112b LU-925 agl: async glimpse lock process in CLIO stack Adjust CLIO lock state machine for supporting: 1. unuse lock in non-hold state. 2. re-enqueue non-granted glimpse lock. Signed-off-by: Fan Yong Change-Id: I9de8939a398d7b4c7062e6c5859bca06deddd089 Reviewed-on: http://review.whamcloud.com/1243 Reviewed-by: Niu Yawei Tested-by: Hudson Reviewed-by: Jinshan Xiong Tested-by: Maloo Reviewed-by: Oleg Drokin --- diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index cae226b..b4c9bd8 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -1616,9 +1616,11 @@ struct cl_lock_slice { */ enum cl_lock_transition { /** operation cannot be completed immediately. Wait for state change. */ - CLO_WAIT = 1, + CLO_WAIT = 1, /** operation had to release lock mutex, restart. */ - CLO_REPEAT = 2 + CLO_REPEAT = 2, + /** lower layer re-enqueued. */ + CLO_REENQUEUED = 3, }; /** @@ -2155,9 +2157,17 @@ enum cl_enq_flags { */ CEF_NEVER = 0x00000010, /** + * for async glimpse lock. + */ + CEF_AGL = 0x00000020, + /** + * do not trigger re-enqueue. + */ + CEF_NO_REENQUEUE = 0x00000040, + /** * mask of enq_flags. */ - CEF_MASK = 0x0000001f + CEF_MASK = 0x0000007f, }; /** diff --git a/lustre/include/lclient.h b/lustre/include/lclient.h index dd6721a..68b49e1 100644 --- a/lustre/include/lclient.h +++ b/lustre/include/lclient.h @@ -28,6 +28,8 @@ /* * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011 Whamcloud, Inc. 
*/ /* * This file is part of Lustre, http://www.lustre.org/ @@ -45,9 +47,19 @@ blkcnt_t dirty_cnt(struct inode *inode); -int cl_glimpse_size(struct inode *inode); +int cl_glimpse_size0(struct inode *inode, int agl); int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io, - struct inode *inode, struct cl_object *clob); + struct inode *inode, struct cl_object *clob, int agl); + +static inline int cl_glimpse_size(struct inode *inode) +{ + return cl_glimpse_size0(inode, 0); +} + +static inline int cl_agl(struct inode *inode) +{ + return cl_glimpse_size0(inode, 1); +} /** * Locking policy for setattr. diff --git a/lustre/include/lustre_dlm.h b/lustre/include/lustre_dlm.h index cf9c70d..1d4a2a7 100644 --- a/lustre/include/lustre_dlm.h +++ b/lustre/include/lustre_dlm.h @@ -122,14 +122,15 @@ typedef enum { #define LDLM_FL_HAS_INTENT 0x001000 /* lock request has intent */ #define LDLM_FL_CANCELING 0x002000 /* lock cancel has already been sent */ #define LDLM_FL_LOCAL 0x004000 /* local lock (ie, no srv/cli split) */ -/* was LDLM_FL_WARN until 2.0.0 0x008000 */ #define LDLM_FL_DISCARD_DATA 0x010000 /* discard (no writeback) on cancel */ #define LDLM_FL_NO_TIMEOUT 0x020000 /* Blocked by group lock - wait * indefinitely */ /* file & record locking */ -#define LDLM_FL_BLOCK_NOWAIT 0x040000 // server told not to wait if blocked +#define LDLM_FL_BLOCK_NOWAIT 0x040000 /* server told not to wait if blocked. + * For AGL, OST will not send glimpse + * callback. */ #define LDLM_FL_TEST_LOCK 0x080000 // return blocking lock /* XXX FIXME: This is being added to b_size as a low-risk fix to the fact that @@ -176,8 +177,6 @@ typedef enum { * w/o involving separate thread. in order to decrease cs rate */ #define LDLM_FL_ATOMIC_CB 0x4000000 -/* was LDLM_FL_ASYNC until 2.0.0 0x8000000 */ - /* It may happen that a client initiate 2 operations, e.g. 
unlink and mkdir, * such that server send blocking ast for conflict locks to this client for * the 1st operation, whereas the 2nd operation has canceled this lock and @@ -736,6 +735,7 @@ struct ldlm_lock { * Client-side-only members. */ + int l_fail_value; /** * Temporary storage for an LVB received during an enqueue operation. */ @@ -763,6 +763,7 @@ struct ldlm_lock { */ __u32 l_pid; + int l_bl_ast_run; /** * For ldlm_add_ast_work_item(). */ @@ -777,7 +778,6 @@ struct ldlm_lock { cfs_list_t l_rk_ast; struct ldlm_lock *l_blocking_lock; - int l_bl_ast_run; /** * Protected by lr_lock, linkages to "skip lists". @@ -1078,6 +1078,8 @@ void ldlm_lock_addref(struct lustre_handle *lockh, __u32 mode); int ldlm_lock_addref_try(struct lustre_handle *lockh, __u32 mode); void ldlm_lock_decref(struct lustre_handle *lockh, __u32 mode); void ldlm_lock_decref_and_cancel(struct lustre_handle *lockh, __u32 mode); +void ldlm_lock_fail_match_locked(struct ldlm_lock *lock, int rc); +void ldlm_lock_fail_match(struct ldlm_lock *lock, int rc); void ldlm_lock_allow_match(struct ldlm_lock *lock); void ldlm_lock_allow_match_locked(struct ldlm_lock *lock); ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags, diff --git a/lustre/include/obd_ost.h b/lustre/include/obd_ost.h index 81b5f44..08b470a 100644 --- a/lustre/include/obd_ost.h +++ b/lustre/include/obd_ost.h @@ -77,6 +77,7 @@ struct osc_enqueue_args { struct ost_lvb *oa_lvb; struct lustre_handle *oa_lockh; struct ldlm_enqueue_info *oa_ei; + unsigned int oa_agl:1; }; #if 0 diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index 739f2e0..91678ec 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -28,6 +28,8 @@ /* * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011 Whamcloud, Inc. 
*/ /* * This file is part of Lustre, http://www.lustre.org/ @@ -313,6 +315,8 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type, #define OBD_FAIL_LDLM_INTR_CP_AST 0x317 #define OBD_FAIL_LDLM_CP_BL_RACE 0x318 #define OBD_FAIL_LDLM_NEW_LOCK 0x319 +#define OBD_FAIL_LDLM_AGL_DELAY 0x31a +#define OBD_FAIL_LDLM_AGL_NOLOCK 0x31b /* LOCKLESS IO */ #define OBD_FAIL_LDLM_SET_CONTENTION 0x385 diff --git a/lustre/lclient/glimpse.c b/lustre/lclient/glimpse.c index c95ccce..a4f7f13 100644 --- a/lustre/lclient/glimpse.c +++ b/lustre/lclient/glimpse.c @@ -105,7 +105,7 @@ blkcnt_t dirty_cnt(struct inode *inode) } int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io, - struct inode *inode, struct cl_object *clob) + struct inode *inode, struct cl_object *clob, int agl) { struct cl_lock_descr *descr = &ccc_env_info(env)->cti_descr; struct cl_inode_info *lli = cl_i2info(inode); @@ -136,6 +136,8 @@ int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io, descr->cld_obj = clob; descr->cld_mode = CLM_PHANTOM; descr->cld_enq_flags = CEF_ASYNC | CEF_MUST; + if (agl) + descr->cld_enq_flags |= CEF_AGL; cio->cui_glimpse = 1; /* * CEF_ASYNC is used because glimpse sub-locks cannot @@ -149,9 +151,13 @@ int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io, cfs_current()); cio->cui_glimpse = 0; + if (lock == NULL) + RETURN(0); + if (IS_ERR(lock)) RETURN(PTR_ERR(lock)); + LASSERT(agl == 0); result = cl_wait(env, lock); if (result == 0) { cl_merge_lvb(inode); @@ -200,7 +206,7 @@ static int cl_io_get(struct inode *inode, struct lu_env **envout, return result; } -int cl_glimpse_size(struct inode *inode) +int cl_glimpse_size0(struct inode *inode, int agl) { /* * We don't need ast_flags argument to cl_glimpse_size(), because @@ -229,7 +235,8 @@ int cl_glimpse_size(struct inode *inode) */ result = io->ci_result; else if (result == 0) - result = cl_glimpse_lock(env, io, inode, io->ci_obj); + result = cl_glimpse_lock(env, io, inode, io->ci_obj, + agl); 
cl_io_fini(env, io); cl_env_put(env, &refcheck); } diff --git a/lustre/lclient/lcommon_cl.c b/lustre/lclient/lcommon_cl.c index 792dc2c..01b8459 100644 --- a/lustre/lclient/lcommon_cl.c +++ b/lustre/lclient/lcommon_cl.c @@ -894,7 +894,7 @@ int ccc_prep_size(const struct lu_env *env, struct cl_object *obj, * of the buffer (C) */ ccc_object_size_unlock(obj); - result = cl_glimpse_lock(env, io, inode, obj); + result = cl_glimpse_lock(env, io, inode, obj, 0); if (result == 0 && exceed != NULL) { /* If objective page index exceed end-of-file * page index, return directly. Do not expect diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index eaaaa43..9d95c13 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -1051,7 +1051,8 @@ static struct ldlm_lock *search_queue(cfs_list_t *queue, continue; if (!unref && - (lock->l_destroyed || (lock->l_flags & LDLM_FL_FAILED))) + (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED || + lock->l_fail_value != 0)) continue; if ((flags & LDLM_FL_LOCAL_ONLY) && @@ -1071,6 +1072,23 @@ static struct ldlm_lock *search_queue(cfs_list_t *queue, return NULL; } +void ldlm_lock_fail_match_locked(struct ldlm_lock *lock, int rc) +{ + if (lock->l_fail_value == 0) { + lock->l_fail_value = rc; + cfs_waitq_signal(&lock->l_waitq); + } +} +EXPORT_SYMBOL(ldlm_lock_fail_match_locked); + +void ldlm_lock_fail_match(struct ldlm_lock *lock, int rc) +{ + lock_res_and_lock(lock); + ldlm_lock_fail_match_locked(lock, rc); + unlock_res_and_lock(lock); +} +EXPORT_SYMBOL(ldlm_lock_fail_match); + void ldlm_lock_allow_match_locked(struct ldlm_lock *lock) { lock->l_flags |= LDLM_FL_LVB_READY; @@ -1183,7 +1201,16 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, int flags, /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */ l_wait_event(lock->l_waitq, - (lock->l_flags & LDLM_FL_LVB_READY), &lwi); + lock->l_flags & LDLM_FL_LVB_READY || + lock->l_fail_value != 0, + &lwi); + if (!(lock->l_flags & LDLM_FL_LVB_READY)) { + if (flags 
& LDLM_FL_TEST_LOCK) + LDLM_LOCK_RELEASE(lock); + else + ldlm_lock_decref_internal(lock, mode); + rc = 0; + } } } out2: @@ -1231,7 +1258,8 @@ ldlm_mode_t ldlm_revalidate_lock_handle(struct lustre_handle *lockh, lock = ldlm_handle2lock(lockh); if (lock != NULL) { lock_res_and_lock(lock); - if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED) + if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED || + lock->l_fail_value != 0) GOTO(out, mode); if (lock->l_flags & LDLM_FL_CBPENDING && diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index 537b44d..4f75f43 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -571,10 +571,6 @@ static void ll_sai_put(struct ll_statahead_info *sai) EXIT; } -#ifndef HAVE_AGL_SUPPORT -# define cl_agl(inode) do {} while (0) -#endif - /* Do NOT forget to drop inode refcount when into sai_entries_agl. */ static void ll_agl_trigger(struct inode *inode, struct ll_statahead_info *sai) { diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c index d633309..27310d1 100644 --- a/lustre/lov/lov_lock.c +++ b/lustre/lov/lov_lock.c @@ -28,6 +28,8 @@ /* * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. + * + * Copyright (c) 2011 Whamcloud, Inc. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -267,12 +269,12 @@ static int lov_subresult(int result, int rc) int result_rank; int rc_rank; + ENTRY; + LASSERT(result <= 0 || result == CLO_REPEAT || result == CLO_WAIT); LASSERT(rc <= 0 || rc == CLO_REPEAT || rc == CLO_WAIT); CLASSERT(CLO_WAIT < CLO_REPEAT); - ENTRY; - /* calculate ranks in the ordering above */ result_rank = result < 0 ? 1 + CLO_REPEAT : result; rc_rank = rc < 0 ? 1 + CLO_REPEAT : rc; @@ -524,7 +526,7 @@ static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck, /* first, try to enqueue a sub-lock ... 
*/ result = cl_enqueue_try(env, sublock, io, enqflags); - if (sublock->cll_state == CLS_ENQUEUED) + if ((sublock->cll_state == CLS_ENQUEUED) && !(enqflags & CEF_AGL)) /* if it is enqueued, try to `wait' on it---maybe it's already * granted */ result = cl_wait_try(env, sublock); @@ -533,8 +535,8 @@ static int lov_lock_enqueue_one(const struct lu_env *env, struct lov_lock *lck, * parallel, otherwise---enqueue has to wait until sub-lock is granted * before proceeding to the next one. */ - if (result == CLO_WAIT && sublock->cll_state <= CLS_HELD && - enqflags & CEF_ASYNC && !last) + if ((result == CLO_WAIT) && (sublock->cll_state <= CLS_HELD) && + (enqflags & CEF_ASYNC) && (!last || (enqflags & CEF_AGL))) result = 0; RETURN(result); } @@ -697,7 +699,21 @@ static int lov_lock_unuse(const struct lu_env *env, rc = lov_sublock_lock(env, lck, lls, closure, &subenv); if (rc == 0) { if (lls->sub_flags & LSF_HELD) { - LASSERT(sublock->cll_state == CLS_HELD); + LASSERT(sublock->cll_state == CLS_HELD || + sublock->cll_state == CLS_ENQUEUED); + /* For AGL case, the sublock state maybe not + * match the lower layer state, so sync them + * before unuse. 
*/ + if (sublock->cll_users == 1 && + sublock->cll_state == CLS_ENQUEUED) { + __u32 save; + + save = sublock->cll_descr.cld_enq_flags; + sublock->cll_descr.cld_enq_flags |= + CEF_NO_REENQUEUE; + cl_wait_try(env, sublock); + sublock->cll_descr.cld_enq_flags = save; + } rc = cl_unuse_try(subenv->lse_env, sublock); rc = lov_sublock_release(env, lck, i, 0, rc); } @@ -789,12 +805,15 @@ static int lov_lock_wait(const struct lu_env *env, struct lov_lock *lck = cl2lov_lock(slice); struct cl_lock_closure *closure = lov_closure_get(env, slice->cls_lock); enum cl_lock_state minstate; + int reenqueued; int result; int i; ENTRY; - for (result = 0, minstate = CLS_FREEING, i = 0; i < lck->lls_nr; ++i) { +again: + for (result = 0, minstate = CLS_FREEING, i = 0, reenqueued = 0; + i < lck->lls_nr; ++i) { int rc; struct lovsub_lock *sub; struct cl_lock *sublock; @@ -814,10 +833,18 @@ static int lov_lock_wait(const struct lu_env *env, minstate = min(minstate, sublock->cll_state); lov_sublock_unlock(env, sub, closure, subenv); } + if (rc == CLO_REENQUEUED) { + reenqueued++; + rc = 0; + } result = lov_subresult(result, rc); if (result != 0) break; } + /* Each sublock only can be reenqueued once, so will not loop for + * ever. */ + if (result == 0 && reenqueued != 0) + goto again; cl_lock_closure_fini(closure); RETURN(result ?: minstate >= CLS_HELD ? 0 : CLO_WAIT); } @@ -863,6 +890,11 @@ static int lov_lock_use(const struct lu_env *env, if (rc != 0) rc = lov_sublock_release(env, lck, i, 1, rc); + } else if (sublock->cll_state == CLS_NEW) { + /* Sub-lock might have been canceled, while + * top-lock was cached. 
*/ + result = -ESTALE; + lov_sublock_release(env, lck, i, 1, result); } lov_sublock_unlock(env, sub, closure, subenv); } diff --git a/lustre/obdclass/cl_lock.c b/lustre/obdclass/cl_lock.c index b8096b7..7d8fc62 100644 --- a/lustre/obdclass/cl_lock.c +++ b/lustre/obdclass/cl_lock.c @@ -925,7 +925,6 @@ static void cl_lock_hold_release(const struct lu_env *env, struct cl_lock *lock, EXIT; } - /** * Waits until lock state is changed. * @@ -1303,7 +1302,8 @@ static int cl_enqueue_locked(const struct lu_env *env, struct cl_lock *lock, cl_lock_user_del(env, lock); cl_lock_error(env, lock, result); } - LASSERT(ergo(result == 0, lock->cll_state == CLS_ENQUEUED || + LASSERT(ergo(result == 0 && !(enqflags & CEF_AGL), + lock->cll_state == CLS_ENQUEUED || lock->cll_state == CLS_HELD)); RETURN(result); } @@ -2150,25 +2150,34 @@ struct cl_lock *cl_lock_request(const struct lu_env *env, struct cl_io *io, ENTRY; do { lock = cl_lock_hold_mutex(env, io, need, scope, source); - if (!IS_ERR(lock)) { - rc = cl_enqueue_locked(env, lock, io, enqflags); - if (rc == 0) { - if (cl_lock_fits_into(env, lock, need, io)) { + if (IS_ERR(lock)) + break; + + rc = cl_enqueue_locked(env, lock, io, enqflags); + if (rc == 0) { + if (cl_lock_fits_into(env, lock, need, io)) { + if (!(enqflags & CEF_AGL)) { cl_lock_mutex_put(env, lock); - cl_lock_lockdep_acquire(env, - lock, enqflags); + cl_lock_lockdep_acquire(env, lock, + enqflags); break; } - cl_unuse_locked(env, lock); + rc = 1; } - cl_lock_trace(D_DLMTRACE, env, "enqueue failed", lock); - cl_lock_hold_release(env, lock, scope, source); - cl_lock_mutex_put(env, lock); - lu_ref_del(&lock->cll_reference, scope, source); - cl_lock_put(env, lock); + cl_unuse_locked(env, lock); + } + cl_lock_trace(D_DLMTRACE, env, + rc <= 0 ? 
"enqueue failed" : "agl succeed", lock); + cl_lock_hold_release(env, lock, scope, source); + cl_lock_mutex_put(env, lock); + lu_ref_del(&lock->cll_reference, scope, source); + cl_lock_put(env, lock); + if (rc > 0) { + LASSERT(enqflags & CEF_AGL); + lock = NULL; + } else if (rc != 0) { lock = ERR_PTR(rc); - } else - rc = PTR_ERR(lock); + } } while (rc == 0); RETURN(lock); } diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index ffd551e..9ba4d36 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -1693,7 +1693,6 @@ static int filter_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp, void *req_cookie, ldlm_mode_t mode, int flags, void *data) { - CFS_LIST_HEAD(rpc_list); struct ptlrpc_request *req = req_cookie; struct ldlm_lock *lock = *lockp, *l = NULL; struct ldlm_resource *res = lock->l_resource; @@ -1732,24 +1731,18 @@ static int filter_intent_policy(struct ldlm_namespace *ns, * lock, and should not be granted if the lock will be blocked. */ + if (flags & LDLM_FL_BLOCK_NOWAIT) { + OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_AGL_DELAY, 5); + + if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_AGL_NOLOCK)) + RETURN(ELDLM_LOCK_ABORTED); + } + LASSERT(ns == ldlm_res_to_ns(res)); lock_res(res); - rc = policy(lock, &tmpflags, 0, &err, &rpc_list); + rc = policy(lock, &tmpflags, 0, &err, NULL); check_res_locked(res); - /* FIXME: we should change the policy function slightly, to not make - * this list at all, since we just turn around and free it */ - while (!cfs_list_empty(&rpc_list)) { - struct ldlm_lock *wlock = - cfs_list_entry(rpc_list.next, struct ldlm_lock, - l_cp_ast); - LASSERT((lock->l_flags & LDLM_FL_AST_SENT) == 0); - LASSERT(lock->l_flags & LDLM_FL_CP_REQD); - lock->l_flags &= ~LDLM_FL_CP_REQD; - cfs_list_del_init(&wlock->l_cp_ast); - LDLM_LOCK_RELEASE(wlock); - } - /* The lock met with no resistance; we're finished. 
*/ if (rc == LDLM_ITER_CONTINUE) { /* do not grant locks to the liblustre clients: they cannot @@ -1766,6 +1759,12 @@ static int filter_intent_policy(struct ldlm_namespace *ns, } unlock_res(res); RETURN(err); + } else if (flags & LDLM_FL_BLOCK_NOWAIT) { + /* LDLM_FL_BLOCK_NOWAIT means it is for AGL. Do not send glimpse + * callback for glimpse size. The real size user will trigger + * the glimpse callback when necessary. */ + unlock_res(res); + RETURN(ELDLM_LOCK_ABORTED); } /* Do not grant any lock, but instead send GL callbacks. The extent diff --git a/lustre/osc/osc_cl_internal.h b/lustre/osc/osc_cl_internal.h index 6478c26..b0fbef6 100644 --- a/lustre/osc/osc_cl_internal.h +++ b/lustre/osc/osc_cl_internal.h @@ -253,7 +253,11 @@ struct osc_lock { * granted. * Glimpse lock should be destroyed immediately after use. */ - ols_glimpse:1; + ols_glimpse:1, + /** + * For async glimpse lock. + */ + ols_agl:1; /** * IO that owns this lock. This field is used for a dead-lock * avoidance by osc_lock_enqueue_wait(). 
diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h index 8e79f7a..1236240 100644 --- a/lustre/osc/osc_internal.h +++ b/lustre/osc/osc_internal.h @@ -132,7 +132,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, obd_enqueue_update_f upcall, void *cookie, struct ldlm_enqueue_info *einfo, struct lustre_handle *lockh, - struct ptlrpc_request_set *rqset, int async); + struct ptlrpc_request_set *rqset, int async, int agl); int osc_cancel_base(struct lustre_handle *lockh, __u32 mode); int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id, diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c index 42dd2c4..9aa7f59 100644 --- a/lustre/osc/osc_lock.c +++ b/lustre/osc/osc_lock.c @@ -196,23 +196,32 @@ static int osc_lock_unuse(const struct lu_env *env, { struct osc_lock *ols = cl2osc_lock(slice); - LASSERT(ols->ols_state == OLS_GRANTED || - ols->ols_state == OLS_UPCALL_RECEIVED); LINVRNT(osc_lock_invariant(ols)); - if (ols->ols_glimpse) { - LASSERT(ols->ols_hold == 0); + switch (ols->ols_state) { + case OLS_NEW: + LASSERT(!ols->ols_hold); + LASSERT(ols->ols_agl); + return 0; + case OLS_UPCALL_RECEIVED: + LASSERT(!ols->ols_hold); + ols->ols_state = OLS_NEW; return 0; + case OLS_GRANTED: + LASSERT(!ols->ols_glimpse); + LASSERT(ols->ols_hold); + /* + * Move lock into OLS_RELEASED state before calling + * osc_cancel_base() so that possible synchronous cancellation + * (that always happens e.g., for liblustre) sees that lock is + * released. + */ + ols->ols_state = OLS_RELEASED; + return osc_lock_unhold(ols); + default: + CERROR("Impossible state: %d\n", ols->ols_state); + LBUG(); } - LASSERT(ols->ols_hold); - - /* - * Move lock into OLS_RELEASED state before calling osc_cancel_base() - * so that possible synchronous cancellation (that always happens - * e.g., for liblustre) sees that lock is released. 
- */ - ols->ols_state = OLS_RELEASED; - return osc_lock_unhold(ols); } static void osc_lock_fini(const struct lu_env *env, @@ -346,10 +355,8 @@ static void osc_lock_lvb_update(const struct lu_env *env, struct osc_lock *olck, ENTRY; - if (!(olck->ols_flags & LDLM_FL_LVB_READY)) { - EXIT; - return; - } + if (!(olck->ols_flags & LDLM_FL_LVB_READY)) + RETURN_EXIT; lvb = &olck->ols_lvb; obj = olck->ols_cl.cls_obj; @@ -528,6 +535,7 @@ static int osc_lock_upcall(void *cookie, int errcode) dlmlock->l_ast_data = NULL; olck->ols_handle.cookie = 0ULL; cfs_spin_unlock(&osc_ast_guard); + ldlm_lock_fail_match_locked(dlmlock, rc); unlock_res_and_lock(dlmlock); LDLM_LOCK_PUT(dlmlock); } @@ -556,17 +564,22 @@ static int osc_lock_upcall(void *cookie, int errcode) rc = 0; } - if (rc == 0) - /* on error, lock was signaled by cl_lock_error() */ + if (rc == 0) { cl_lock_signal(env, lock); - else + /* del user for lock upcall cookie */ + cl_unuse_try(env, lock); + } else { + /* del user for lock upcall cookie */ + cl_lock_user_del(env, lock); cl_lock_error(env, lock, rc); + } cl_lock_mutex_put(env, lock); /* release cookie reference, acquired by osc_lock_enqueue() */ lu_ref_del(&lock->cll_reference, "upcall", lock); cl_lock_put(env, lock); + cl_env_nested_put(&nest, env); } else /* should never happen, similar to osc_ldlm_blocking_ast(). */ @@ -1052,7 +1065,6 @@ static int osc_lock_enqueue_wait(const struct lu_env *env, ENTRY; LASSERT(cl_lock_is_mutexed(lock)); - LASSERT(lock->cll_state == CLS_QUEUING); /* make it enqueue anyway for glimpse lock, because we actually * don't need to cancel any conflicting locks. 
*/ @@ -1156,10 +1168,14 @@ static int osc_lock_enqueue(const struct lu_env *env, ENTRY; LASSERT(cl_lock_is_mutexed(lock)); - LASSERT(lock->cll_state == CLS_QUEUING); - LASSERT(ols->ols_state == OLS_NEW); + LASSERTF(ols->ols_state == OLS_NEW, + "Impossible state: %d\n", ols->ols_state); ols->ols_flags = osc_enq2ldlm_flags(enqflags); + if (enqflags & CEF_AGL) { + ols->ols_flags |= LDLM_FL_BLOCK_NOWAIT; + ols->ols_agl = 1; + } if (ols->ols_flags & LDLM_FL_HAS_INTENT) ols->ols_glimpse = 1; if (!osc_lock_is_lockless(ols) && !(enqflags & CEF_MUST)) @@ -1181,6 +1197,8 @@ static int osc_lock_enqueue(const struct lu_env *env, /* a reference for lock, passed as an upcall cookie */ cl_lock_get(lock); lu_ref_add(&lock->cll_reference, "upcall", lock); + /* a user for lock also */ + cl_lock_user_add(env, lock); ols->ols_state = OLS_ENQUEUED; /* @@ -1196,11 +1214,16 @@ static int osc_lock_enqueue(const struct lu_env *env, obj->oo_oinfo->loi_kms_valid, osc_lock_upcall, ols, einfo, &ols->ols_handle, - PTLRPCD_SET, 1); + PTLRPCD_SET, 1, ols->ols_agl); if (result != 0) { + cl_lock_user_del(env, lock); lu_ref_del(&lock->cll_reference, "upcall", lock); cl_lock_put(env, lock); + if (unlikely(result == -ECANCELED)) { + ols->ols_state = OLS_NEW; + result = 0; + } } } else { ols->ols_state = OLS_GRANTED; @@ -1218,8 +1241,34 @@ static int osc_lock_wait(const struct lu_env *env, struct cl_lock *lock = olck->ols_cl.cls_lock; LINVRNT(osc_lock_invariant(olck)); - if (olck->ols_glimpse && olck->ols_state >= OLS_UPCALL_RECEIVED) - return 0; + + if (olck->ols_glimpse && olck->ols_state >= OLS_UPCALL_RECEIVED) { + if (olck->ols_flags & LDLM_FL_LVB_READY) { + return 0; + } else if (olck->ols_agl) { + olck->ols_state = OLS_NEW; + } else { + LASSERT(lock->cll_error); + return lock->cll_error; + } + } + + if (olck->ols_state == OLS_NEW) { + if (lock->cll_descr.cld_enq_flags & CEF_NO_REENQUEUE) { + return -ENAVAIL; + } else { + int rc; + + LASSERT(olck->ols_agl); + + rc = osc_lock_enqueue(env, slice, 
NULL, CEF_ASYNC | + CEF_MUST); + if (rc != 0) + return rc; + else + return CLO_REENQUEUED; + } + } LASSERT(equi(olck->ols_state >= OLS_UPCALL_RECEIVED && lock->cll_error == 0, olck->ols_lock != NULL)); @@ -1337,6 +1386,7 @@ static void osc_lock_cancel(const struct lu_env *env, lock, result); } olck->ols_state = OLS_CANCELLED; + olck->ols_flags &= ~LDLM_FL_LVB_READY; osc_lock_detach(env, olck); } @@ -1475,6 +1525,9 @@ static int osc_lock_fits_into(const struct lu_env *env, return 0; if (need->cld_mode == CLM_PHANTOM) { + if (ols->ols_agl) + return !(ols->ols_state > OLS_RELEASED); + /* * Note: the QUEUED lock can't be matched here, otherwise * it might cause the deadlocks. diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index f4f2c7a..1f25a97 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -3178,7 +3178,7 @@ static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm, static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb, obd_enqueue_update_f upcall, void *cookie, - int *flags, int rc) + int *flags, int agl, int rc) { int intent = *flags & LDLM_FL_HAS_INTENT; ENTRY; @@ -3196,7 +3196,8 @@ static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb, } } - if ((intent && rc == ELDLM_LOCK_ABORTED) || !rc) { + if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) || + (rc == 0)) { *flags |= LDLM_FL_LVB_READY; CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n", lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime); @@ -3214,6 +3215,9 @@ static int osc_enqueue_interpret(const struct lu_env *env, struct ldlm_lock *lock; struct lustre_handle handle; __u32 mode; + struct ost_lvb *lvb; + __u32 lvb_len; + int *flags = aa->oa_flags; /* Make a local copy of a lock handle and a mode, because aa->oa_* * might be freed anytime after lock upcall has been called. 
*/ @@ -3233,13 +3237,20 @@ static int osc_enqueue_interpret(const struct lu_env *env, /* Let CP AST to grant the lock first. */ OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1); + if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) { + lvb = NULL; + lvb_len = 0; + } else { + lvb = aa->oa_lvb; + lvb_len = sizeof(*aa->oa_lvb); + } + /* Complete obtaining the lock procedure. */ rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1, - mode, aa->oa_flags, aa->oa_lvb, - sizeof(*aa->oa_lvb), &handle, rc); + mode, flags, lvb, lvb_len, &handle, rc); /* Complete osc stuff. */ - rc = osc_enqueue_fini(req, aa->oa_lvb, - aa->oa_upcall, aa->oa_cookie, aa->oa_flags, rc); + rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie, + flags, aa->oa_agl, rc); OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10); @@ -3263,8 +3274,9 @@ void osc_update_enqueue(struct lustre_handle *lov_lockhp, struct lov_oinfo *loi, int flags, struct ost_lvb *lvb, __u32 mode, int rc) { + struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp); + if (rc == ELDLM_OK) { - struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp); __u64 tmp; LASSERT(lock != NULL); @@ -3285,13 +3297,21 @@ void osc_update_enqueue(struct lustre_handle *lov_lockhp, lock->l_policy_data.l_extent.end); } ldlm_lock_allow_match(lock); - LDLM_LOCK_PUT(lock); } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) { + LASSERT(lock != NULL); loi->loi_lvb = *lvb; + ldlm_lock_allow_match(lock); CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving" " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms); rc = ELDLM_OK; } + + if (lock != NULL) { + if (rc != ELDLM_OK) + ldlm_lock_fail_match(lock, rc); + + LDLM_LOCK_PUT(lock); + } } EXPORT_SYMBOL(osc_update_enqueue); @@ -3310,11 +3330,12 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, obd_enqueue_update_f upcall, void *cookie, struct ldlm_enqueue_info *einfo, struct lustre_handle *lockh, - struct ptlrpc_request_set *rqset, int async) + struct 
ptlrpc_request_set *rqset, int async, int agl) { struct obd_device *obd = exp->exp_obd; struct ptlrpc_request *req = NULL; int intent = *flags & LDLM_FL_HAS_INTENT; + int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY); ldlm_mode_t mode; int rc; ENTRY; @@ -3348,13 +3369,20 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, mode = einfo->ei_mode; if (einfo->ei_mode == LCK_PR) mode |= LCK_PW; - mode = ldlm_lock_match(obd->obd_namespace, - *flags | LDLM_FL_LVB_READY, res_id, + mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id, einfo->ei_type, policy, mode, lockh, 0); if (mode) { struct ldlm_lock *matched = ldlm_handle2lock(lockh); - if (osc_set_lock_data_with_check(matched, einfo)) { + if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) { + /* For AGL, if enqueue RPC is sent but the lock is not + * granted, then skip to process this strpe. + * Return -ECANCELED to tell the caller. */ + ldlm_lock_decref(lockh, mode); + LDLM_LOCK_PUT(matched); + RETURN(-ECANCELED); + } else if (osc_set_lock_data_with_check(matched, einfo)) { + *flags |= LDLM_FL_LVB_READY; /* addref the lock only if not async requests and PW * lock is matched whereas we asked for PR. */ if (!rqset && einfo->ei_mode != mode) @@ -3368,16 +3396,17 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, /* We already have a lock, and it's referenced */ (*upcall)(cookie, ELDLM_OK); - /* For async requests, decref the lock. */ if (einfo->ei_mode != mode) ldlm_lock_decref(lockh, LCK_PW); else if (rqset) + /* For async requests, decref the lock. 
*/ ldlm_lock_decref(lockh, einfo->ei_mode); LDLM_LOCK_PUT(matched); RETURN(ELDLM_OK); - } else + } else { ldlm_lock_decref(lockh, mode); - LDLM_LOCK_PUT(matched); + LDLM_LOCK_PUT(matched); + } } no_match: @@ -3416,6 +3445,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, aa->oa_cookie = cookie; aa->oa_lvb = lvb; aa->oa_lockh = lockh; + aa->oa_agl = !!agl; req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_enqueue_interpret; @@ -3429,7 +3459,7 @@ int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id, RETURN(rc); } - rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, rc); + rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc); if (intent) ptlrpc_req_finished(req); @@ -3451,7 +3481,7 @@ static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo, &oinfo->oi_md->lsm_oinfo[0]->loi_lvb, oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid, oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh, - rqset, rqset != NULL); + rqset, rqset != NULL, 0); RETURN(rc); } diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index d426214..a1c47f5 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -8387,6 +8387,50 @@ test_221() { } run_test 221 "make sure fault and truncate race to not cause OOM" +test_222a () { + rm -rf $DIR/$tdir + mkdir -p $DIR/$tdir + $LFS setstripe -c 1 -i 0 $DIR/$tdir + createmany -o $DIR/$tdir/$tfile 10 + cancel_lru_locks mdc + cancel_lru_locks osc + #define OBD_FAIL_LDLM_AGL_DELAY 0x31a + $LCTL set_param fail_loc=0x31a + ls -l $DIR/$tdir > /dev/null || error "AGL for ls failed" + $LCTL set_param fail_loc=0 + rm -r $DIR/$tdir +} +run_test 222a "AGL for ls should not trigger CLIO lock failure ================" + +test_222b () { + rm -rf $DIR/$tdir + mkdir -p $DIR/$tdir + $LFS setstripe -c 1 -i 0 $DIR/$tdir + createmany -o $DIR/$tdir/$tfile 10 + cancel_lru_locks mdc + cancel_lru_locks osc + #define OBD_FAIL_LDLM_AGL_DELAY 0x31a + $LCTL set_param fail_loc=0x31a + rm -r $DIR/$tdir || 
error "AGL for rmdir failed" + $LCTL set_param fail_loc=0 +} +run_test 222b "AGL for rmdir should not trigger CLIO lock failure =============" + +test_223 () { + rm -rf $DIR/$tdir + mkdir -p $DIR/$tdir + $LFS setstripe -c 1 -i 0 $DIR/$tdir + createmany -o $DIR/$tdir/$tfile 10 + cancel_lru_locks mdc + cancel_lru_locks osc + #define OBD_FAIL_LDLM_AGL_NOLOCK 0x31b + $LCTL set_param fail_loc=0x31b + ls -l $DIR/$tdir > /dev/null || error "reenqueue failed" + $LCTL set_param fail_loc=0 + rm -r $DIR/$tdir +} +run_test 223 "osc reenqueue if without AGL lock granted =======================" + # # tests that do cleanup/setup should be run at the end #