From ecaba99677b28536f9c376b2b835b554a7792668 Mon Sep 17 00:00:00 2001 From: Jinshan Xiong Date: Tue, 6 Nov 2012 16:56:16 -0800 Subject: [PATCH] LU-1876 hsm: bugfix about layout lock on the client The following issues are fixed in this patch: * deadlock at add_lsmref * lov_delete_raid0 should wait for refcount of lsm to be zero * handle empty layout at lov layer since layout can be changed anytime so original assumption of skipping lov layer when lsm is NULL is no longer correct * other fixes Signed-off-by: Jinshan Xiong Change-Id: Ie7cd744b188f4d28fdbebda8870259b931328d00 Reviewed-on: http://review.whamcloud.com/4416 Tested-by: Hudson Reviewed-by: Johann Lombardi Reviewed-by: Niu Yawei Tested-by: Maloo --- lustre/include/cl_object.h | 28 +++---- lustre/lclient/glimpse.c | 2 + lustre/lclient/lcommon_cl.c | 8 ++ lustre/ldlm/ldlm_lock.c | 2 +- lustre/llite/dcache.c | 18 ++++- lustre/llite/file.c | 177 +++++++++++++++++++++--------------------- lustre/llite/llite_internal.h | 7 +- lustre/llite/llite_lib.c | 11 +-- lustre/llite/statahead.c | 5 +- lustre/llite/vvp_io.c | 18 +++-- lustre/llite/vvp_object.c | 13 +++- lustre/lov/lov_cl_internal.h | 2 + lustre/lov/lov_io.c | 19 +++-- lustre/lov/lov_lock.c | 35 +++++++++ lustre/lov/lov_object.c | 111 ++++++++++++++++---------- lustre/obdclass/cl_io.c | 7 +- lustre/obdclass/cl_lock.c | 2 +- 17 files changed, 285 insertions(+), 180 deletions(-) diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index 78f0b49..b4f7917 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -277,11 +277,6 @@ struct cl_object_conf { */ struct inode *coc_inode; /** - * Validate object conf. If object is using an invalid conf, - * then invalidate it and set the new layout. - */ - bool coc_validate_only; - /** * Invalidate the current stripe configuration due to losing * layout lock. */ @@ -2360,18 +2355,19 @@ struct cl_io { */ ci_need_restart:1, /** - * Ignore layout change. 
- * Most of the CIT_MISC operations can ignore layout change, because - * the purpose to create this kind of cl_io is to give an environment - * to run clio methods, for example: - * 1. request group lock; - * 2. flush caching pages by osc; - * 3. writepage - * 4. echo client - * So far, only direct IO and glimpse clio need restart if layout - * change during IO time. + * to not refresh layout - the IO issuer knows that the layout won't + * change(page operations, layout change causes all page to be + * discarded), or it doesn't matter if it changes(sync). + */ + ci_ignore_layout:1, + /** + * Check if layout changed after the IO finishes. Mainly for HSM + * requirement. If IO occurs to open files, it doesn't need to + * verify layout because HSM won't release open files. + * Right now, only two operations need to verify layout: glimpse + * and setattr. */ - ci_ignore_layout:1; + ci_verify_layout:1; /** * Number of pages owned by this IO. For invariant checking. */ diff --git a/lustre/lclient/glimpse.c b/lustre/lclient/glimpse.c index d03b855..58aad5e 100644 --- a/lustre/lclient/glimpse.c +++ b/lustre/lclient/glimpse.c @@ -173,6 +173,7 @@ int cl_glimpse_lock(const struct lu_env *env, struct cl_io *io, cl_lock_release(env, lock, "glimpse", cfs_current()); } else { CDEBUG(D_DLMTRACE, "No objects for inode\n"); + cl_merge_lvb(inode); } } @@ -225,6 +226,7 @@ int cl_glimpse_size0(struct inode *inode, int agl) result = cl_io_get(inode, &env, &io, &refcheck); if (result > 0) { again: + io->ci_verify_layout = 1; result = cl_io_init(env, io, CIT_MISC, io->ci_obj); if (result > 0) /* diff --git a/lustre/lclient/lcommon_cl.c b/lustre/lclient/lcommon_cl.c index 22e3526..9f97e75 100644 --- a/lustre/lclient/lcommon_cl.c +++ b/lustre/lclient/lcommon_cl.c @@ -1334,6 +1334,14 @@ __u32 cl_fid_build_gen(const struct lu_fid *fid) RETURN(gen); } +/* lsm is unreliable after hsm implementation as layout can be changed at + * any time.
This is only to support old, non-clio-ized interfaces. It will + * cause deadlock if clio operations are called with this extra layout refcount + * because in case the layout changed during the IO, ll_layout_refresh() will + * have to wait for the refcount to become zero to destroy the older layout. + * + * Notice that the lsm returned by this function may not be valid unless called + * inside layout lock - MDS_INODELOCK_LAYOUT. */ struct lov_stripe_md *ccc_inode_lsm_get(struct inode *inode) { return lov_lsm_get(cl_i2info(inode)->lli_clob); diff --git a/lustre/ldlm/ldlm_lock.c b/lustre/ldlm/ldlm_lock.c index f575bda..8c78ae6 100644 --- a/lustre/ldlm/ldlm_lock.c +++ b/lustre/ldlm/ldlm_lock.c @@ -1249,7 +1249,7 @@ ldlm_mode_t ldlm_lock_match(struct ldlm_namespace *ns, __u64 flags, /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */ l_wait_event(lock->l_waitq, lock->l_flags & LDLM_FL_LVB_READY || - lock->l_failed, + lock->l_destroyed || lock->l_failed, &lwi); if (!(lock->l_flags & LDLM_FL_LVB_READY)) { if (flags & LDLM_FL_TEST_LOCK) diff --git a/lustre/llite/dcache.c b/lustre/llite/dcache.c index 1edafba..f94d0de4 100644 --- a/lustre/llite/dcache.c +++ b/lustre/llite/dcache.c @@ -262,7 +262,20 @@ void ll_intent_drop_lock(struct lookup_intent *it) struct lustre_handle *handle; if (it->it_op && it->d.lustre.it_lock_mode) { - handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle; + struct ldlm_lock *lock; + + handle = (struct lustre_handle *)&it->d.lustre.it_lock_handle; + lock = ldlm_handle2lock(handle); + if (lock != NULL) { + /* it can only be allowed to match after layout is + * applied to inode otherwise false layout would be + * seen. Applying layout should happen before dropping + * the intent lock.
*/ + if (it->d.lustre.it_lock_bits & MDS_INODELOCK_LAYOUT) + ldlm_lock_allow_match(lock); + LDLM_LOCK_PUT(lock); + } + CDEBUG(D_DLMTRACE, "releasing lock with cookie "LPX64 " from it %p\n", handle->cookie, it); ldlm_lock_decref(handle, it->d.lustre.it_lock_mode); @@ -543,6 +556,9 @@ out: if (req != NULL && !it_disposition(it, DISP_ENQ_COMPLETE)) ptlrpc_req_finished(req); if (rc == 0) { + /* mdt may grant layout lock for the newly created file, so + * release the lock to avoid leaking */ + ll_intent_drop_lock(it); ll_invalidate_aliases(de->d_inode); } else { __u64 bits = 0; diff --git a/lustre/llite/file.c b/lustre/llite/file.c index 34c08cc..21011ae 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -779,11 +779,10 @@ int ll_merge_lvb(struct inode *inode) CDEBUG(D_VFSTRACE, DFID" updating i_size "LPU64"\n", PFID(&lli->lli_fid), lvb.lvb_size); inode->i_blocks = lvb.lvb_blocks; - - LTIME_S(inode->i_mtime) = lvb.lvb_mtime; - LTIME_S(inode->i_atime) = lvb.lvb_atime; - LTIME_S(inode->i_ctime) = lvb.lvb_ctime; } + LTIME_S(inode->i_mtime) = lvb.lvb_mtime; + LTIME_S(inode->i_atime) = lvb.lvb_atime; + LTIME_S(inode->i_ctime) = lvb.lvb_ctime; ll_inode_size_unlock(inode); ccc_inode_lsm_put(inode, lsm); @@ -2071,9 +2070,9 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data) struct ll_inode_info *lli = ll_i2info(inode); struct ptlrpc_request *req; struct obd_capa *oc; - struct lov_stripe_md *lsm; int rc, err; ENTRY; + CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p)\n", inode->i_ino, inode->i_generation, inode); ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_FSYNC, 1); @@ -2108,8 +2107,7 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data) if (!err) ptlrpc_req_finished(req); - lsm = ccc_inode_lsm_get(inode); - if (data && lsm) { + if (data) { struct ll_file_data *fd = LUSTRE_FPRIVATE(file); err = cl_sync_file_range(inode, 0, OBD_OBJECT_EOF, @@ -2121,7 +2119,6 @@ int ll_fsync(struct file *file, struct dentry *dentry, int data) else 
fd->fd_write_failed = false; } - ccc_inode_lsm_put(inode, lsm); #ifdef HAVE_FILE_FSYNC_4ARGS mutex_unlock(&inode->i_mutex); @@ -2308,19 +2305,18 @@ int ll_have_md_lock(struct inode *inode, __u64 *bits, ldlm_mode_t l_req_mode) } ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits, - struct lustre_handle *lockh) + struct lustre_handle *lockh, __u64 flags) { ldlm_policy_data_t policy = { .l_inodebits = {bits}}; struct lu_fid *fid; ldlm_mode_t rc; - __u64 flags; ENTRY; fid = &ll_i2info(inode)->lli_fid; CDEBUG(D_INFO, "trying to match res "DFID"\n", PFID(fid)); - flags = LDLM_FL_BLOCK_GRANTED | LDLM_FL_CBPENDING; - rc = md_lock_match(ll_i2mdexp(inode), flags, fid, LDLM_IBITS, &policy, + rc = md_lock_match(ll_i2mdexp(inode), LDLM_FL_BLOCK_GRANTED|flags, + fid, LDLM_IBITS, &policy, LCK_CR|LCK_CW|LCK_PR|LCK_PW, lockh); RETURN(rc); } @@ -2453,21 +2449,17 @@ int ll_inode_revalidate_it(struct dentry *dentry, struct lookup_intent *it, ENTRY; rc = __ll_inode_revalidate_it(dentry, it, ibits); - - /* if object not yet allocated, don't validate size */ - if (rc == 0 && !ll_i2info(dentry->d_inode)->lli_has_smd) { - LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime; - LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime; - LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime; - RETURN(0); - } - - /* ll_glimpse_size will prefer locally cached writes if they extend - * the file */ - - if (rc == 0) - rc = ll_glimpse_size(inode); - + if (rc != 0) + RETURN(rc); + + /* if object isn't regular file, don't validate size */ + if (!S_ISREG(inode->i_mode)) { + LTIME_S(inode->i_atime) = ll_i2info(inode)->lli_lvb.lvb_atime; + LTIME_S(inode->i_mtime) = ll_i2info(inode)->lli_lvb.lvb_mtime; + LTIME_S(inode->i_ctime) = ll_i2info(inode)->lli_lvb.lvb_ctime; + } else { + rc = ll_glimpse_size(inode); + } RETURN(rc); } @@ -2871,17 +2863,19 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen) struct ll_inode_info *lli = ll_i2info(inode); struct ll_sb_info *sbi = 
ll_i2sbi(inode); struct md_op_data *op_data = NULL; - struct ptlrpc_request *req = NULL; struct lookup_intent it = { .it_op = IT_LAYOUT }; - struct lustre_handle lockh; + struct lustre_handle lockh = { 0 }; ldlm_mode_t mode; - struct cl_object_conf conf = { .coc_inode = inode, - .coc_validate_only = true }; + struct ldlm_enqueue_info einfo = { .ei_type = LDLM_IBITS, + .ei_mode = LCK_CR, + .ei_cb_bl = ll_md_blocking_ast, + .ei_cb_cp = ldlm_completion_ast, + .ei_cbdata = inode }; int rc; ENTRY; *gen = 0; - if (!(ll_i2sbi(inode)->ll_flags & LL_SBI_LAYOUT_LOCK)) + if (!(sbi->ll_flags & LL_SBI_LAYOUT_LOCK)) RETURN(0); /* sanity checks */ @@ -2890,16 +2884,14 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen) /* mostly layout lock is caching on the local side, so try to match * it before grabbing layout lock mutex. */ - mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh); + mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, + LDLM_FL_LVB_READY); if (mode != 0) { /* hit cached lock */ - struct lov_stripe_md *lsm; + /* lsm_layout_gen is started from 0, plus 1 here to distinguish + * the cases of no layout and first layout. */ + *gen = lli->lli_layout_gen + 1; - lsm = ccc_inode_lsm_get(inode); - if (lsm != NULL) - *gen = lsm->lsm_layout_gen; - ccc_inode_lsm_put(inode, lsm); ldlm_lock_decref(&lockh, mode); - RETURN(0); } @@ -2911,60 +2903,71 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen) /* take layout lock mutex to enqueue layout lock exclusively. 
*/ cfs_mutex_lock(&lli->lli_layout_mutex); - /* make sure the old conf goes away */ - ll_layout_conf(inode, &conf); + /* try again inside layout mutex */ + mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, + LDLM_FL_LVB_READY); + if (mode != 0) { /* hit cached lock */ + *gen = lli->lli_layout_gen + 1; + + ldlm_lock_decref(&lockh, mode); + cfs_mutex_unlock(&lli->lli_layout_mutex); + ll_finish_md_op_data(op_data); + RETURN(0); + } + + /* have to enqueue one */ + rc = md_enqueue(sbi->ll_md_exp, &einfo, &it, op_data, &lockh, + NULL, 0, NULL, 0); + if (it.d.lustre.it_data != NULL) + ptlrpc_req_finished(it.d.lustre.it_data); + it.d.lustre.it_data = NULL; - /* enqueue layout lock */ - rc = md_intent_lock(sbi->ll_md_exp, op_data, NULL, 0, &it, 0, - &req, ll_md_blocking_ast, 0); if (rc == 0) { - /* we get a new lock, so update the lock data */ - lockh.cookie = it.d.lustre.it_lock_handle; - md_set_lock_data(sbi->ll_md_exp, &lockh.cookie, inode, NULL); - - /* req == NULL is when lock was found in client cache, without - * any request to server (but lsm can be canceled just after a - * release) */ - if (req != NULL) { - struct ldlm_lock *lock = ldlm_handle2lock(&lockh); - struct lustre_md md = { NULL }; - void *lmm; - int lmmsize; - - /* for IT_LAYOUT lock, lmm is returned in lock's lvb - * data via completion callback */ - LASSERT(lock != NULL); - lmm = lock->l_lvb_data; - lmmsize = lock->l_lvb_len; - if (lmm != NULL) - rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm, - lmm, lmmsize); - if (rc == 0) { + struct ldlm_lock *lock; + struct cl_object_conf conf; + struct lustre_md md = { NULL }; + void *lmm; + int lmmsize; + + LASSERT(lustre_handle_is_used(&lockh)); + + /* set lock data in case this is a new lock */ + ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL); + + lock = ldlm_handle2lock(&lockh); + LASSERT(lock != NULL); + + /* for IT_LAYOUT lock, lmm is returned in lock's lvb + * data via completion callback */ + lmm = lock->l_lvb_data; + lmmsize = lock->l_lvb_len; + 
if (lmm != NULL) { + rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm, + lmm, lmmsize); + if (rc >= 0) { if (md.lsm != NULL) - *gen = md.lsm->lsm_layout_gen; - - memset(&conf, 0, sizeof conf); - conf.coc_inode = inode; - conf.u.coc_md = &md; - ll_layout_conf(inode, &conf); - /* is this racy? */ - lli->lli_has_smd = md.lsm != NULL; + *gen = md.lsm->lsm_layout_gen + 1; + rc = 0; + } else { + CERROR("file: "DFID" unpackmd error: %d\n", + PFID(&lli->lli_fid), rc); } - if (md.lsm != NULL) - obd_free_memmd(sbi->ll_dt_exp, &md.lsm); - - LDLM_LOCK_PUT(lock); - ptlrpc_req_finished(req); - } else { /* hit caching lock */ - struct lov_stripe_md *lsm; - - lsm = ccc_inode_lsm_get(inode); - if (lsm != NULL) - *gen = lsm->lsm_layout_gen; - ccc_inode_lsm_put(inode, lsm); } - ll_intent_drop_lock(&it); + LDLM_LOCK_PUT(lock); + + /* set layout to file. This may cause lock expiration as we + * set layout inside layout ibits lock. */ + memset(&conf, 0, sizeof conf); + conf.coc_inode = inode; + conf.u.coc_md = &md; + ll_layout_conf(inode, &conf); + /* is this racy? */ + lli->lli_has_smd = md.lsm != NULL; + if (md.lsm != NULL) + obd_free_memmd(sbi->ll_dt_exp, &md.lsm); } + ll_intent_drop_lock(&it); + cfs_mutex_unlock(&lli->lli_layout_mutex); ll_finish_md_op_data(op_data); diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 8b3ab1b..b4eb309 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -165,8 +165,7 @@ struct ll_inode_info { __u64 lli_open_fd_read_count; __u64 lli_open_fd_write_count; __u64 lli_open_fd_exec_count; - /* Protects access to och pointers and their usage counters, also - * atomicity of check-update of lli_has_smd */ + /* Protects access to och pointers and their usage counters */ cfs_mutex_t lli_och_mutex; struct inode lli_vfs_inode; @@ -270,6 +269,8 @@ struct ll_inode_info { /* mutex to request for layout lock exclusively. 
*/ cfs_mutex_t lli_layout_mutex; + /* valid only inside LAYOUT ibits lock, protected by lli_layout_mutex */ + __u32 lli_layout_gen; }; /* @@ -704,7 +705,7 @@ extern int ll_inode_revalidate_it(struct dentry *, struct lookup_intent *, extern int ll_have_md_lock(struct inode *inode, __u64 *bits, ldlm_mode_t l_req_mode); extern ldlm_mode_t ll_take_md_lock(struct inode *inode, __u64 bits, - struct lustre_handle *lockh); + struct lustre_handle *lockh, __u64 flags); int __ll_inode_revalidate_it(struct dentry *, struct lookup_intent *, __u64 bits); int ll_revalidate_nd(struct dentry *dentry, struct nameidata *nd); diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 5ee8fb1..2d8665b 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -1479,6 +1479,8 @@ int ll_setattr_raw(struct dentry *dentry, struct iattr *attr) * resides on the MDS, ie, this file has no objects. */ if (lsm != NULL) attr->ia_valid &= ~ATTR_SIZE; + /* can't call ll_setattr_ost() while holding a refcount of lsm */ + ccc_inode_lsm_put(inode, lsm); memcpy(&op_data->op_attr, attr, sizeof(*attr)); @@ -1492,10 +1494,8 @@ int ll_setattr_raw(struct dentry *dentry, struct iattr *attr) GOTO(out, rc); ll_ioepoch_open(lli, op_data->op_ioepoch); - if (lsm == NULL || !S_ISREG(inode->i_mode)) { - CDEBUG(D_INODE, "no lsm: not setting attrs on OST\n"); + if (!S_ISREG(inode->i_mode)) GOTO(out, rc = 0); - } if (ia_valid & ATTR_SIZE) attr->ia_valid |= ATTR_SIZE; @@ -1511,7 +1511,6 @@ int ll_setattr_raw(struct dentry *dentry, struct iattr *attr) rc = ll_setattr_ost(inode, attr); EXIT; out: - ccc_inode_lsm_put(inode, lsm); if (op_data) { if (op_data->op_ioepoch) { rc1 = ll_setattr_done_writing(inode, op_data, mod); @@ -1678,7 +1677,6 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md) LASSERT ((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0)); if (lsm != NULL) { LASSERT(S_ISREG(inode->i_mode)); - cfs_mutex_lock(&lli->lli_och_mutex); CDEBUG(D_INODE, "adding lsm %p to 
inode %lu/%u(%p)\n", lsm, inode->i_ino, inode->i_generation, inode); /* cl_file_inode_init must go before lli_has_smd or a race @@ -1687,7 +1685,6 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md) * glimpse would try to use uninitialized lov */ if (cl_file_inode_init(inode, md) == 0) lli->lli_has_smd = true; - cfs_mutex_unlock(&lli->lli_och_mutex); lli->lli_maxbytes = lsm->lsm_maxbytes; if (lli->lli_maxbytes > MAX_LFS_FILESIZE) @@ -1777,7 +1774,7 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md) * lock on the client and set LLIF_MDS_SIZE_LOCK holding * it. */ mode = ll_take_md_lock(inode, MDS_INODELOCK_UPDATE, - &lockh); + &lockh, LDLM_FL_CBPENDING); if (mode) { if (lli->lli_flags & (LLIF_DONE_WRITING | LLIF_EPOCH_PENDING | diff --git a/lustre/llite/statahead.c b/lustre/llite/statahead.c index 5cf4b9c..3ac64c9 100644 --- a/lustre/llite/statahead.c +++ b/lustre/llite/statahead.c @@ -138,10 +138,7 @@ ll_sa_entry_unhash(struct ll_statahead_info *sai, struct ll_sa_entry *entry) static inline int agl_should_run(struct ll_statahead_info *sai, struct inode *inode) { - if (inode != NULL && S_ISREG(inode->i_mode) && - ll_i2info(inode)->lli_has_smd && sai->sai_agl_valid) - return 1; - return 0; + return (inode != NULL && S_ISREG(inode->i_mode) && sai->sai_agl_valid); } static inline struct ll_sa_entry * diff --git a/lustre/llite/vvp_io.c b/lustre/llite/vvp_io.c index 413fd94..f6f2271 100644 --- a/lustre/llite/vvp_io.c +++ b/lustre/llite/vvp_io.c @@ -88,14 +88,22 @@ static void vvp_io_fini(const struct lu_env *env, const struct cl_io_slice *ios) struct cl_io *io = ios->cis_io; struct cl_object *obj = io->ci_obj; struct ccc_io *cio = cl2ccc_io(env, ios); - __u32 gen; CLOBINVRNT(env, obj, ccc_object_invariant(obj)); - /* check layout version */ - ll_layout_refresh(ccc_object_inode(obj), &gen); - if (cio->cui_layout_gen > 0) - io->ci_need_restart = cio->cui_layout_gen == gen; + CDEBUG(D_VFSTRACE, "ignore/verify layout %d/%d, layout version 
%d.\n", + io->ci_ignore_layout, io->ci_verify_layout, cio->cui_layout_gen); + + if (!io->ci_ignore_layout && io->ci_verify_layout) { + __u32 gen = 0; + + /* check layout version */ + ll_layout_refresh(ccc_object_inode(obj), &gen); + io->ci_need_restart = cio->cui_layout_gen != gen; + if (io->ci_need_restart) + CDEBUG(D_VFSTRACE, "layout changed from %d to %d.\n", + cio->cui_layout_gen, gen); + } } static void vvp_io_fault_fini(const struct lu_env *env, diff --git a/lustre/llite/vvp_object.c b/lustre/llite/vvp_object.c index 85a8dd6..48132bd 100644 --- a/lustre/llite/vvp_object.c +++ b/lustre/llite/vvp_object.c @@ -121,13 +121,24 @@ static int vvp_attr_set(const struct lu_env *env, struct cl_object *obj, return 0; } +int vvp_conf_set(const struct lu_env *env, struct cl_object *obj, + const struct cl_object_conf *conf) +{ + struct ll_inode_info *lli = ll_i2info(conf->coc_inode); + + if (conf->u.coc_md != NULL && conf->u.coc_md->lsm != NULL) + lli->lli_layout_gen = conf->u.coc_md->lsm->lsm_layout_gen; + + return 0; +} + static const struct cl_object_operations vvp_ops = { .coo_page_init = vvp_page_init, .coo_lock_init = vvp_lock_init, .coo_io_init = vvp_io_init, .coo_attr_get = vvp_attr_get, .coo_attr_set = vvp_attr_set, - .coo_conf_set = ccc_conf_set, + .coo_conf_set = vvp_conf_set, .coo_glimpse = ccc_object_glimpse }; diff --git a/lustre/lov/lov_cl_internal.h b/lustre/lov/lov_cl_internal.h index 627abb2..b74df27 100644 --- a/lustre/lov/lov_cl_internal.h +++ b/lustre/lov/lov_cl_internal.h @@ -579,6 +579,8 @@ int lovsub_lock_init (const struct lu_env *env, struct cl_object *obj, int lov_lock_init_raid0 (const struct lu_env *env, struct cl_object *obj, struct cl_lock *lock, const struct cl_io *io); +int lov_lock_init_empty (const struct lu_env *env, struct cl_object *obj, + struct cl_lock *lock, const struct cl_io *io); int lov_io_init_raid0 (const struct lu_env *env, struct cl_object *obj, struct cl_io *io); int lov_io_init_empty (const struct lu_env *env, struct 
cl_object *obj, diff --git a/lustre/lov/lov_io.c b/lustre/lov/lov_io.c index e6578ed..e8a5c54 100644 --- a/lustre/lov/lov_io.c +++ b/lustre/lov/lov_io.c @@ -308,7 +308,8 @@ static void lov_io_slice_init(struct lov_io *lio, io->ci_result = 0; lio->lis_object = obj; - LASSERT(lio->lis_lsm != NULL); + LASSERT(obj->lo_lsm != NULL); + lio->lis_lsm = lsm_addref(obj->lo_lsm); lio->lis_stripe_count = lio->lis_lsm->lsm_stripe_count; switch (io->ci_type) { @@ -932,13 +933,15 @@ int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj, switch (io->ci_type) { default: LBUG(); + case CIT_MISC: + case CIT_READ: + result = 0; + break; case CIT_FSYNC: - case CIT_MISC: - case CIT_READ: - result = 0; - break; + case CIT_SETATTR: + result = +1; + break; case CIT_WRITE: - case CIT_SETATTR: result = -EBADF; break; case CIT_FAULT: @@ -949,8 +952,8 @@ int lov_io_init_empty(const struct lu_env *env, struct cl_object *obj, } if (result == 0) cl_io_slice_add(io, &lio->lis_cl, obj, &lov_empty_io_ops); - io->ci_result = result; - RETURN(result != 0); + io->ci_result = result < 0 ? 
result : 0; + RETURN(result != 0); } /** @} lov */ diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c index 6d66adb..2424581 100644 --- a/lustre/lov/lov_lock.c +++ b/lustre/lov/lov_lock.c @@ -1186,6 +1186,41 @@ int lov_lock_init_raid0(const struct lu_env *env, struct cl_object *obj, RETURN(result); } +static void lov_empty_lock_fini(const struct lu_env *env, + struct cl_lock_slice *slice) +{ + struct lov_lock *lck = cl2lov_lock(slice); + OBD_SLAB_FREE_PTR(lck, lov_lock_kmem); +} + +static int lov_empty_lock_print(const struct lu_env *env, void *cookie, + lu_printer_t p, const struct cl_lock_slice *slice) +{ + (*p)(env, cookie, "empty\n"); + return 0; +} + +static const struct cl_lock_operations lov_empty_lock_ops = { + .clo_fini = lov_empty_lock_fini, + .clo_print = lov_empty_lock_print +}; + +int lov_lock_init_empty(const struct lu_env *env, struct cl_object *obj, + struct cl_lock *lock, const struct cl_io *io) +{ + struct lov_lock *lck; + int result = -ENOMEM; + + ENTRY; + OBD_SLAB_ALLOC_PTR_GFP(lck, lov_lock_kmem, CFS_ALLOC_IO); + if (lck != NULL) { + cl_lock_slice_add(lock, &lck->lls_cl, obj, &lov_empty_lock_ops); + lck->lls_orig = lock->cll_descr; + result = 0; + } + RETURN(result); +} + static struct cl_lock_closure *lov_closure_get(const struct lu_env *env, struct cl_lock *parent) { diff --git a/lustre/lov/lov_object.c b/lustre/lov/lov_object.c index bef4eef..99d543a 100644 --- a/lustre/lov/lov_object.c +++ b/lustre/lov/lov_object.c @@ -233,6 +233,7 @@ static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov, union lov_layout_state *state) { LASSERT(lov->lo_type == LLT_EMPTY); + cl_object_prune(env, &lov->lo_cl); return 0; } @@ -300,12 +301,14 @@ static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov, for (i = 0; i < r0->lo_nr; ++i) { struct lovsub_object *los = r0->lo_sub[i]; - if (los != NULL) + if (los != NULL) { + cl_locks_prune(env, &los->lso_cl, 1); /* * If top-level object is to be evicted from * the 
cache, so are its sub-objects. */ lov_subobject_kill(env, lov, los, i); + } } } RETURN(0); @@ -388,8 +391,14 @@ static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj, ENTRY; /* this is called w/o holding type guard mutex, so it must be inside - * an on going IO otherwise lsm may be replaced. */ - LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 1); + * an on going IO otherwise lsm may be replaced. + * LU-2117: it turns out there exists one exception. For mmaped files, + * the lock of those files may be requested in the other file's IO + * context, and this function is called in ccc_lock_state(), it will + * hit this assertion. + * Anyway, it's still okay to call attr_get w/o type guard as layout + * can't go if locks exist. */ + /* LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 1); */ if (!r0->lo_attr_valid) { /* @@ -433,7 +442,7 @@ const static struct lov_layout_operations lov_dispatch[] = { .llo_install = lov_install_empty, .llo_print = lov_print_empty, .llo_page_init = lov_page_init_empty, - .llo_lock_init = NULL, + .llo_lock_init = lov_lock_init_empty, .llo_io_init = lov_io_init_empty, .llo_getattr = lov_attr_get_empty }, @@ -508,6 +517,20 @@ do { \ lov_conf_thaw(__obj); \ } while (0) +static void lov_conf_lock(struct lov_object *lov) +{ + LASSERT(lov->lo_owner != cfs_current()); + cfs_down_write(&lov->lo_type_guard); + LASSERT(lov->lo_owner == NULL); + lov->lo_owner = cfs_current(); +} + +static void lov_conf_unlock(struct lov_object *lov) +{ + lov->lo_owner = NULL; + cfs_up_write(&lov->lo_type_guard); +} + static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov) { struct l_wait_info lwi = { 0 }; @@ -517,11 +540,17 @@ static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov) if (!lov->lo_lsm_invalid || lsm == NULL) RETURN(0); - l_wait_event(lov->lo_waitq, cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi); + LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 0); + while (cfs_atomic_read(&lsm->lsm_refc) > 1) { + 
lov_conf_unlock(lov); + l_wait_event(lov->lo_waitq, + cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi); + lov_conf_lock(lov); + } RETURN(0); } -static int lov_layout_change(const struct lu_env *env, +static int lov_layout_change(const struct lu_env *unused, struct lov_object *lov, enum lov_layout_type llt, const struct cl_object_conf *conf) { @@ -532,7 +561,7 @@ static int lov_layout_change(const struct lu_env *env, struct cl_object_header *hdr = cl_object_header(&lov->lo_cl); void *cookie; - struct lu_env *nested; + struct lu_env *env; int refcheck; LASSERT(0 <= lov->lo_type && lov->lo_type < ARRAY_SIZE(lov_dispatch)); @@ -540,13 +569,11 @@ static int lov_layout_change(const struct lu_env *env, ENTRY; cookie = cl_env_reenter(); - nested = cl_env_get(&refcheck); - if (!IS_ERR(nested)) - cl_object_prune(nested, &lov->lo_cl); - else - result = PTR_ERR(nested); - cl_env_put(nested, &refcheck); - cl_env_reexit(cookie); + env = cl_env_get(&refcheck); + if (IS_ERR(env)) { + cl_env_reexit(cookie); + RETURN(PTR_ERR(env)); + } old_ops = &lov_dispatch[lov->lo_type]; new_ops = &lov_dispatch[llt]; @@ -571,6 +598,9 @@ static int lov_layout_change(const struct lu_env *env, /* this file becomes an EMPTY file. */ } } + + cl_env_put(env, &refcheck); + cl_env_reexit(cookie); RETURN(result); } @@ -606,32 +636,33 @@ int lov_object_init(const struct lu_env *env, struct lu_object *obj, static int lov_conf_set(const struct lu_env *env, struct cl_object *obj, const struct cl_object_conf *conf) { - struct lov_stripe_md *lsm = conf->u.coc_md->lsm; + struct lov_stripe_md *lsm = NULL; struct lov_object *lov = cl2lov(obj); int result = 0; ENTRY; - /* - * Only LLT_EMPTY <-> LLT_RAID0 transitions are supported. 
- */ - LASSERT(lov->lo_owner != cfs_current()); - cfs_down_write(&lov->lo_type_guard); - LASSERT(lov->lo_owner == NULL); - lov->lo_owner = cfs_current(); - + lov_conf_lock(lov); if (conf->coc_invalidate) { lov->lo_lsm_invalid = 1; GOTO(out, result = 0); } - if (conf->coc_validate_only) { - if (!lov->lo_lsm_invalid) - GOTO(out, result = 0); + if (conf->u.coc_md != NULL) + lsm = conf->u.coc_md->lsm; - lov_layout_wait(env, lov); - /* fall through to set up new layout */ + if ((lsm == NULL && lov->lo_lsm == NULL) || + (lsm != NULL && lov->lo_lsm != NULL && + lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen)) { + lov->lo_lsm_invalid = 0; + GOTO(out, result = 0); } + /* will change layout */ + lov_layout_wait(env, lov); + + /* + * Only LLT_EMPTY <-> LLT_RAID0 transitions are supported. + */ switch (lov->lo_type) { case LLT_EMPTY: if (lsm != NULL) @@ -650,8 +681,7 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj, EXIT; out: - lov->lo_owner = NULL; - cfs_up_write(&lov->lo_type_guard); + lov_conf_unlock(lov); RETURN(result); } @@ -684,8 +714,8 @@ static int lov_object_print(const struct lu_env *env, void *cookie, struct cl_page *lov_page_init(const struct lu_env *env, struct cl_object *obj, struct cl_page *page, cfs_page_t *vmpage) { - return LOV_2DISPATCH(cl2lov(obj), - llo_page_init, env, obj, page, vmpage); + return LOV_2DISPATCH_NOLOCK(cl2lov(obj), + llo_page_init, env, obj, page, vmpage); } /** @@ -695,15 +725,9 @@ struct cl_page *lov_page_init(const struct lu_env *env, struct cl_object *obj, int lov_io_init(const struct lu_env *env, struct cl_object *obj, struct cl_io *io) { - struct lov_io *lio = lov_env_io(env); - CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl); - - /* hold lsm before initializing because io relies on it */ - lio->lis_lsm = lov_lsm_addref(cl2lov(obj)); - - /* No need to lock because we've taken one refcount of layout. 
*/ - return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_io_init, env, obj, io); + return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init, + !io->ci_ignore_layout, env, obj, io); } /** @@ -784,10 +808,11 @@ struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov) struct lov_stripe_md *lsm = NULL; lov_conf_freeze(lov); - if (!lov->lo_lsm_invalid && lov->lo_lsm != NULL) { + if (lov->lo_lsm != NULL) { lsm = lsm_addref(lov->lo_lsm); - CDEBUG(D_INODE, "lsm %p addref %d by %p.\n", - lsm, cfs_atomic_read(&lsm->lsm_refc), cfs_current()); + CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n", + lsm, cfs_atomic_read(&lsm->lsm_refc), + lov->lo_lsm_invalid, cfs_current()); } lov_conf_thaw(lov); return lsm; diff --git a/lustre/obdclass/cl_io.c b/lustre/obdclass/cl_io.c index 6619073..5bae736 100644 --- a/lustre/obdclass/cl_io.c +++ b/lustre/obdclass/cl_io.c @@ -112,7 +112,7 @@ void cl_io_fini(const struct lu_env *env, struct cl_io *io) ENTRY; while (!cfs_list_empty(&io->ci_layers)) { - slice = container_of(io->ci_layers.next, struct cl_io_slice, + slice = container_of(io->ci_layers.prev, struct cl_io_slice, cis_linkage); cfs_list_del_init(&slice->cis_linkage); if (slice->cis_iop->op[io->ci_type].cio_fini != NULL) @@ -137,10 +137,11 @@ void cl_io_fini(const struct lu_env *env, struct cl_io *io) case CIT_FSYNC: LASSERT(!io->ci_need_restart); break; + case CIT_SETATTR: case CIT_MISC: /* Check ignore layout change conf */ - LASSERT(ergo(io->ci_ignore_layout, !io->ci_need_restart)); - case CIT_SETATTR: + LASSERT(ergo(io->ci_ignore_layout || !io->ci_verify_layout, + !io->ci_need_restart)); break; default: LBUG(); diff --git a/lustre/obdclass/cl_lock.c b/lustre/obdclass/cl_lock.c index c3ad51e..72f8a1a 100644 --- a/lustre/obdclass/cl_lock.c +++ b/lustre/obdclass/cl_lock.c @@ -828,8 +828,8 @@ static void cl_lock_delete0(const struct lu_env *env, struct cl_lock *lock) cfs_spin_lock(&head->coh_lock_guard); cfs_list_del_init(&lock->cll_linkage); - cfs_spin_unlock(&head->coh_lock_guard); + /* * 
From now on, no new references to this lock can be acquired * by cl_lock_lookup(). -- 1.8.3.1