From 2989b9dab9e87529ccadfc5711960b71e5e57b18 Mon Sep 17 00:00:00 2001 From: Jinshan Xiong Date: Fri, 29 Jun 2012 15:52:19 +0800 Subject: [PATCH] LU-169 ldlm: add support for layout lock Add support for layout lock at client side. Signed-off-by: Jinshan Xiong Change-Id: Icbdf691134bfb403b0e2019ed364da3e3a11bf5c Reviewed-on: http://review.whamcloud.com/2025 Tested-by: Hudson Tested-by: Maloo Reviewed-by: jacques-Charles Lafoucriere Reviewed-by: Fan Yong Reviewed-by: Oleg Drokin --- lustre/include/cl_object.h | 10 ++ lustre/ldlm/ldlm_lockd.c | 28 +++++ lustre/llite/file.c | 140 ++++++++++++++++++++++++- lustre/llite/llite_internal.h | 10 +- lustre/llite/llite_lib.c | 46 ++++++--- lustre/llite/namei.c | 16 ++- lustre/lov/lov_cl_internal.h | 37 ++++--- lustre/lov/lov_io.c | 6 +- lustre/lov/lov_lock.c | 8 +- lustre/lov/lov_object.c | 234 ++++++++++++++++++++++++++---------------- lustre/lov/lov_page.c | 6 +- lustre/lov/lovsub_lock.c | 11 +- lustre/mdc/mdc_locks.c | 108 +++++++++++++------ 13 files changed, 489 insertions(+), 171 deletions(-) diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index 53bb5ca..8f27bd6 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -276,6 +276,16 @@ struct cl_object_conf { * VFS inode. This is consumed by vvp. */ struct inode *coc_inode; + /** + * Validate object conf. If object is using an invalid conf, + * then invalidate it and set the new layout. + */ + bool coc_validate_only; + /** + * Invalidate the current stripe configuration due to losing + * layout lock. + */ + bool coc_invalidate; }; /** diff --git a/lustre/ldlm/ldlm_lockd.c b/lustre/ldlm/ldlm_lockd.c index 7833e72..d4134b9 100644 --- a/lustre/ldlm/ldlm_lockd.c +++ b/lustre/ldlm/ldlm_lockd.c @@ -1553,6 +1553,7 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, struct ldlm_request *dlm_req, struct ldlm_lock *lock) { + int lvb_len; CFS_LIST_HEAD(ast_list); ENTRY; @@ -1569,6 +1570,33 @@ static void ldlm_handle_cp_callback(struct ptlrpc_request *req, } } + lvb_len = req_capsule_get_size(&req->rq_pill, &RMF_DLM_LVB, RCL_CLIENT); + if (lvb_len > 0) { + if (lock->l_lvb_len > 0) { + /* for extent lock, lvb contains ost_lvb{}. */ + LASSERT(lock->l_lvb_data != NULL); + LASSERTF(lock->l_lvb_len == lvb_len, + "preallocated %d, actual %d.\n", + lock->l_lvb_len, lvb_len); + } else { /* for layout lock, lvb has variable length */ + void *lvb_data; + + OBD_ALLOC(lvb_data, lvb_len); + if (lvb_data == NULL) + LDLM_ERROR(lock, "no memory.\n"); + + lock_res_and_lock(lock); + if (lvb_data == NULL) { + lock->l_flags |= LDLM_FL_FAILED; + } else { + LASSERT(lock->l_lvb_data == NULL); + lock->l_lvb_data = lvb_data; + lock->l_lvb_len = lvb_len; + } + unlock_res_and_lock(lock); + } + } + lock_res_and_lock(lock); if (lock->l_destroyed || lock->l_granted_mode == lock->l_req_mode) { diff --git a/lustre/llite/file.c b/lustre/llite/file.c index c9dad81..8ac9cdf 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -1487,7 +1487,7 @@ static int ll_lov_getstripe(struct inode *inode, unsigned long arg) lsm = ccc_inode_lsm_get(inode); if (lsm != NULL) rc = obd_iocontrol(LL_IOC_LOV_GETSTRIPE, ll_i2dtexp(inode), 0, - lsm, (void *)arg); + lsm, (void *)arg); ccc_inode_lsm_put(inode, lsm); RETURN(rc); } @@ -2800,3 +2800,141 @@ enum llioc_iter ll_iocontrol_call(struct inode *inode, struct file *file, *rcp = rc; return ret; } + +int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct cl_env_nest nest; + struct lu_env *env; + int result; + ENTRY; + + if (lli->lli_clob == NULL) + RETURN(0); + + env = cl_env_nested_get(&nest); + if (IS_ERR(env)) + RETURN(PTR_ERR(env)); + + result = cl_conf_set(env, lli->lli_clob, conf); + cl_env_nested_put(&nest, env); + RETURN(result); +} + +/** + * This function checks if there exists a LAYOUT lock on the client side, + * or enqueues it if it doesn't have one in cache. + * + * This function will not hold layout lock so it may be revoked any time after + * this function returns. Any operations depend on layout should be redone + * in that case. + * + * This function should be called before lov_io_init() to get an uptodate + * layout version, the caller should save the version number and after IO + * is finished, this function should be called again to verify that layout + * is not changed during IO time. + */ +int ll_layout_refresh(struct inode *inode, __u32 *gen) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct ll_sb_info *sbi = ll_i2sbi(inode); + struct md_op_data *op_data = NULL; + struct ptlrpc_request *req = NULL; + struct lookup_intent it = { .it_op = IT_LAYOUT }; + struct lustre_handle lockh; + ldlm_mode_t mode; + struct cl_object_conf conf = { .coc_inode = inode, + .coc_validate_only = true }; + int rc; + ENTRY; + + *gen = 0; + if (!(ll_i2sbi(inode)->ll_flags & LL_SBI_LAYOUT_LOCK)) + RETURN(0); + + /* sanity checks */ + LASSERT(fid_is_sane(ll_inode2fid(inode))); + LASSERT(S_ISREG(inode->i_mode)); + + /* mostly layout lock is caching on the local side, so try to match + * it before grabbing layout lock mutex. */ + mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh); + if (mode != 0) { /* hit cached lock */ + struct lov_stripe_md *lsm; + + lsm = ccc_inode_lsm_get(inode); + if (lsm != NULL) + *gen = lsm->lsm_layout_gen; + ccc_inode_lsm_put(inode, lsm); + ldlm_lock_decref(&lockh, mode); + + RETURN(0); + } + + op_data = ll_prep_md_op_data(NULL, inode, inode, NULL, + 0, 0, LUSTRE_OPC_ANY, NULL); + if (IS_ERR(op_data)) + RETURN(PTR_ERR(op_data)); + + /* take layout lock mutex to enqueue layout lock exclusively. */ + cfs_mutex_lock(&lli->lli_layout_mutex); + + /* make sure the old conf goes away */ + ll_layout_conf(inode, &conf); + + /* enqueue layout lock */ + rc = md_intent_lock(sbi->ll_md_exp, op_data, NULL, 0, &it, 0, + &req, ll_md_blocking_ast, 0); + if (rc == 0) { + /* we get a new lock, so update the lock data */ + lockh.cookie = it.d.lustre.it_lock_handle; + md_set_lock_data(sbi->ll_md_exp, &lockh.cookie, inode, NULL); + + /* req == NULL is when lock was found in client cache, without + * any request to server (but lsm can be canceled just after a + * release) */ + if (req != NULL) { + struct ldlm_lock *lock = ldlm_handle2lock(&lockh); + struct lustre_md md = { NULL }; + void *lmm; + int lmmsize; + + /* for IT_LAYOUT lock, lmm is returned in lock's lvb + * data via completion callback */ + LASSERT(lock != NULL); + lmm = lock->l_lvb_data; + lmmsize = lock->l_lvb_len; + if (lmm != NULL) + rc = obd_unpackmd(sbi->ll_dt_exp, &md.lsm, + lmm, lmmsize); + if (rc == 0) { + if (md.lsm != NULL) + *gen = md.lsm->lsm_layout_gen; + + memset(&conf, 0, sizeof conf); + conf.coc_inode = inode; + conf.u.coc_md = &md; + ll_layout_conf(inode, &conf); + /* is this racy? */ + lli->lli_has_smd = md.lsm != NULL; + } + if (md.lsm != NULL) + obd_free_memmd(sbi->ll_dt_exp, &md.lsm); + + LDLM_LOCK_PUT(lock); + ptlrpc_req_finished(req); + } else { /* hit caching lock */ + struct lov_stripe_md *lsm; + + lsm = ccc_inode_lsm_get(inode); + if (lsm != NULL) + *gen = lsm->lsm_layout_gen; + ccc_inode_lsm_put(inode, lsm); + } + ll_intent_drop_lock(&it); + } + cfs_mutex_unlock(&lli->lli_layout_mutex); + ll_finish_md_op_data(op_data); + + RETURN(rc); +} diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index ecc249e..510a9b9 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -265,7 +265,10 @@ struct ll_inode_info { * some of the following members can be moved into u.f. */ bool lli_has_smd; - struct cl_object *lli_clob; + struct cl_object *lli_clob; + + /* mutex to request for layout lock exclusively. */ + cfs_mutex_t lli_layout_mutex; }; /* @@ -387,6 +390,7 @@ enum stats_track_type { #define LL_SBI_64BIT_HASH 0x4000 /* support 64-bits dir hash/offset */ #define LL_SBI_AGL_ENABLED 0x8000 /* enable agl */ #define LL_SBI_VERBOSE 0x10000 /* verbose mount/umount */ +#define LL_SBI_LAYOUT_LOCK 0x20000 /* layout lock support */ /* default value for ll_sb_info->contention_time */ #define SBI_DEFAULT_CONTENTION_SECONDS 60 @@ -661,6 +665,7 @@ struct page *ll_get_dir_page(struct file *filp, struct inode *dir, __u64 hash, int ll_readdir(struct file *filp, void *cookie, filldir_t filldir); int ll_get_mdt_idx(struct inode *inode); +char *ll_get_fsname(struct inode *inode); /* llite/namei.c */ int ll_objects_destroy(struct ptlrpc_request *request, struct inode *dir); @@ -1536,4 +1541,7 @@ struct if_quotactl_18 { #warning "remove old LL_IOC_QUOTACTL_18 compatibility code" #endif /* LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2,7,50,0) */ +int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf); +int ll_layout_refresh(struct inode *inode, __u32 *gen); + #endif /* LLITE_INTERNAL_H */ diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index ec264c4..6da9064 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -372,6 +372,11 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt, else sbi->ll_md_brw_size = CFS_PAGE_SIZE; + if (data->ocd_connect_flags & OBD_CONNECT_LAYOUTLOCK) { + LCONSOLE_INFO("Layout lock feature supported.\n"); + sbi->ll_flags |= LL_SBI_LAYOUT_LOCK; + } + obd = class_name2obd(dt); if (!obd) { CERROR("DT %s: not setup or attached\n", dt); @@ -908,6 +913,7 @@ void ll_lli_init(struct ll_inode_info *lli) CFS_INIT_LIST_HEAD(&lli->lli_agl_list); lli->lli_agl_index = 0; } + cfs_mutex_init(&lli->lli_layout_mutex); } static inline int ll_bdi_register(struct backing_dev_info *bdi) @@ -1602,16 +1608,15 @@ void ll_inode_size_unlock(struct inode *inode) void ll_update_inode(struct inode *inode, struct lustre_md *md) { - struct ll_inode_info *lli = ll_i2info(inode); - struct mdt_body *body = md->body; - struct lov_stripe_md *lsm = md->lsm; - struct ll_sb_info *sbi = ll_i2sbi(inode); - - LASSERT ((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0)); - if (lsm != NULL) { - LASSERT(S_ISREG(inode->i_mode)); - - cfs_mutex_lock(&lli->lli_och_mutex); + struct ll_inode_info *lli = ll_i2info(inode); + struct mdt_body *body = md->body; + struct lov_stripe_md *lsm = md->lsm; + struct ll_sb_info *sbi = ll_i2sbi(inode); + + LASSERT ((lsm != NULL) == ((body->valid & OBD_MD_FLEASIZE) != 0)); + if (lsm != NULL) { + LASSERT(S_ISREG(inode->i_mode)); + cfs_mutex_lock(&lli->lli_och_mutex); CDEBUG(D_INODE, "adding lsm %p to inode %lu/%u(%p)\n", lsm, inode->i_ino, inode->i_generation, inode); /* cl_file_inode_init must go before lli_has_smd or a race @@ -1621,12 +1626,13 @@ void ll_update_inode(struct inode *inode, struct lustre_md *md) if (cl_file_inode_init(inode, md) == 0) lli->lli_has_smd = true; cfs_mutex_unlock(&lli->lli_och_mutex); + lli->lli_maxbytes = lsm->lsm_maxbytes; if (lli->lli_maxbytes > MAX_LFS_FILESIZE) lli->lli_maxbytes = MAX_LFS_FILESIZE; if (md->lsm != NULL) obd_free_memmd(ll_i2dtexp(inode), &md->lsm); - } + } if (sbi->ll_flags & LL_SBI_RMT_CLIENT) { if (body->valid & OBD_MD_FLRMTPERM) @@ -2064,8 +2070,9 @@ int ll_prep_inode(struct inode **inode, struct ptlrpc_request *req, struct super_block *sb) { - struct ll_sb_info *sbi = NULL; - struct lustre_md md; + struct ll_sb_info *sbi = NULL; + struct lustre_md md; + __u64 ibits; int rc; ENTRY; @@ -2104,9 +2111,18 @@ int ll_prep_inode(struct inode **inode, } } + /* sanity check for LAYOUT lock. */ + ibits = MDS_INODELOCK_LAYOUT; + if (S_ISREG(md.body->mode) && sbi->ll_flags & LL_SBI_LAYOUT_LOCK && + md.lsm != NULL && !ll_have_md_lock(*inode, &ibits, LCK_MINMODE)) { + CERROR("%s: inode "DFID" (%p) layout lock not granted.\n", + ll_get_fsname(*inode), PFID(ll_inode2fid(*inode)), + *inode); + } + out: - md_free_lustre_md(sbi->ll_md_exp, &md); - RETURN(rc); + md_free_lustre_md(sbi->ll_md_exp, &md); + RETURN(rc); } int ll_obd_statfs(struct inode *inode, void *arg) diff --git a/lustre/llite/namei.c b/lustre/llite/namei.c index 331db4e..1da7ddc 100644 --- a/lustre/llite/namei.c +++ b/lustre/llite/namei.c @@ -216,8 +216,10 @@ int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, break; LASSERT(lock->l_flags & LDLM_FL_CANCELING); - /* For OPEN locks we differentiate between lock modes - CR, CW. PR - bug 22891 */ - if (bits & (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE)) + /* For OPEN locks we differentiate between lock modes + * LCK_CR, LCK_CW, LCK_PR - bug 22891 */ + if (bits & (MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE | + MDS_INODELOCK_LAYOUT)) ll_have_md_lock(inode, &bits, LCK_MINMODE); if (bits & MDS_INODELOCK_OPEN) @@ -251,7 +253,15 @@ int ll_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc, ll_md_real_close(inode, flags); } - lli = ll_i2info(inode); + lli = ll_i2info(inode); + if (bits & MDS_INODELOCK_LAYOUT) { + struct cl_object_conf conf = { .coc_inode = inode, + .coc_invalidate = true }; + rc = ll_layout_conf(inode, &conf); + if (rc) + CDEBUG(D_INODE, "invaliding layout %d.\n", rc); + } + if (bits & MDS_INODELOCK_UPDATE) lli->lli_flags &= ~LLIF_MDS_SIZE_LOCK; diff --git a/lustre/lov/lov_cl_internal.h b/lustre/lov/lov_cl_internal.h index 06834f7..3870ddf 100644 --- a/lustre/lov/lov_cl_internal.h +++ b/lustre/lov/lov_cl_internal.h @@ -200,14 +200,29 @@ struct lov_object { */ enum lov_layout_type lo_type; /** + * True if layout is valid. This bit is cleared when layout lock + * is lost. + */ + unsigned lo_lsm_invalid:1; + /** + * Layout metadata. + */ + struct lov_stripe_md *lo_lsm; + /** * Waitq - wait for no one else is using lo_lsm */ - cfs_waitq_t lo_waitq; + cfs_waitq_t lo_waitq; union lov_layout_state { struct lov_layout_raid0 { unsigned lo_nr; - struct lov_stripe_md *lo_lsm; + /** + * When this is true, lov_object::lo_attr contains + * valid up to date attributes for a top-level + * object. This field is reset to 0 when attributes of + * any sub-object change. + */ + int lo_attr_valid; /** * Array of sub-objects. Allocated when top-object is * created (lov_init_raid0()). @@ -229,13 +244,6 @@ struct lov_object { */ cfs_spinlock_t lo_sub_lock; /** - * When this is true, lov_object::lo_attr contains - * valid up to date attributes for a top-level - * object. This field is reset to 0 when attributes of - * any sub-object change. - */ - int lo_attr_valid; - /** * Cached object attribute, built from sub-object * attributes. */ @@ -803,13 +811,10 @@ static inline struct lov_thread_info *lov_env_info(const struct lu_env *env) static inline struct lov_layout_raid0 *lov_r0(struct lov_object *lov) { - struct lov_layout_raid0 *raid0; - - LASSERT(lov->lo_type == LLT_RAID0); - raid0 = &lov->u.raid0; - LASSERT(raid0->lo_lsm->lsm_wire.lw_magic == LOV_MAGIC || - raid0->lo_lsm->lsm_wire.lw_magic == LOV_MAGIC_V3); - return raid0; + LASSERT(lov->lo_type == LLT_RAID0); + LASSERT(lov->lo_lsm->lsm_wire.lw_magic == LOV_MAGIC || + lov->lo_lsm->lsm_wire.lw_magic == LOV_MAGIC_V3); + return &lov->u.raid0; } /** @} lov */ diff --git a/lustre/lov/lov_io.c b/lustre/lov/lov_io.c index 12f620b..e6578ed 100644 --- a/lustre/lov/lov_io.c +++ b/lustre/lov/lov_io.c @@ -303,15 +303,13 @@ static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio, static void lov_io_slice_init(struct lov_io *lio, struct lov_object *obj, struct cl_io *io) { - struct lov_stripe_md *lsm = lov_lsm_addref(obj); ENTRY; io->ci_result = 0; lio->lis_object = obj; - LASSERT(lsm != NULL); - lio->lis_lsm = lsm; /* called inside lo_type_guard. */ - lio->lis_stripe_count = lsm->lsm_stripe_count; + LASSERT(lio->lis_lsm != NULL); + lio->lis_stripe_count = lio->lis_lsm->lsm_stripe_count; switch (io->ci_type) { case CIT_READ: diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c index 40192ce..1fc696c 100644 --- a/lustre/lov/lov_lock.c +++ b/lustre/lov/lov_lock.c @@ -316,7 +316,7 @@ static int lov_lock_sub_init(const struct lu_env *env, * XXX for wide striping smarter algorithm is desirable, * breaking out of the loop, early. */ - if (lov_stripe_intersects(r0->lo_lsm, i, + if (lov_stripe_intersects(loo->lo_lsm, i, file_start, file_end, &start, &end)) nr++; } @@ -334,7 +334,7 @@ static int lov_lock_sub_init(const struct lu_env *env, * top-lock. */ for (i = 0, nr = 0; i < r0->lo_nr; ++i) { - if (lov_stripe_intersects(r0->lo_lsm, i, + if (lov_stripe_intersects(loo->lo_lsm, i, file_start, file_end, &start, &end)) { struct cl_lock_descr *descr; @@ -919,7 +919,7 @@ static int lock_lock_multi_match() if (sub->sub_lock == NULL) continue; subobj = sub->sub_descr.cld_obj; - if (!lov_stripe_intersects(r0->lo_lsm, sub->sub_stripe, + if (!lov_stripe_intersects(loo->lo_lsm, sub->sub_stripe, fstart, fend, &start, &end)) continue; subneed->cld_start = cl_index(subobj, start); @@ -943,7 +943,7 @@ static int lov_lock_stripe_is_matching(const struct lu_env *env, const struct cl_lock_descr *child, const struct cl_lock_descr *descr) { - struct lov_stripe_md *lsm = lov_r0(lov)->lo_lsm; + struct lov_stripe_md *lsm = lov->lo_lsm; obd_off start; obd_off end; int result; diff --git a/lustre/lov/lov_object.c b/lustre/lov/lov_object.c index 33a47ed..7f92431 100644 --- a/lustre/lov/lov_object.c +++ b/lustre/lov/lov_object.c @@ -59,7 +59,7 @@ struct lov_layout_operations { struct lov_object *lov, const struct cl_object_conf *conf, union lov_layout_state *state); - void (*llo_delete)(const struct lu_env *env, struct lov_object *lov, + int (*llo_delete)(const struct lu_env *env, struct lov_object *lov, union lov_layout_state *state); void (*llo_fini)(const struct lu_env *env, struct lov_object *lov, union lov_layout_state *state); @@ -137,7 +137,7 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov, subhdr = cl_object_header(stripe); parent = subhdr->coh_parent; - oinfo = r0->lo_lsm->lsm_oinfo[idx]; + oinfo = lov->lo_lsm->lsm_oinfo[idx]; CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: id: "LPU64" seq: "LPU64 " idx: %d gen: %d\n", PFID(&subhdr->coh_lu.loh_fid), subhdr, idx, @@ -188,7 +188,8 @@ static int lov_init_raid0(const struct lu_env *env, LOV_MAGIC_V1, LOV_MAGIC_V3, lsm->lsm_magic); } - r0->lo_lsm = lsm_addref(lsm); + LASSERT(lov->lo_lsm == NULL); + lov->lo_lsm = lsm_addref(lsm); r0->lo_nr = lsm->lsm_stripe_count; LASSERT(r0->lo_nr <= lov_targets_nr(dev)); @@ -221,10 +222,11 @@ static int lov_init_raid0(const struct lu_env *env, RETURN(result); } -static void lov_delete_empty(const struct lu_env *env, struct lov_object *lov, - union lov_layout_state *state) +static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov, + union lov_layout_state *state) { - LASSERT(lov->lo_type == LLT_EMPTY); + LASSERT(lov->lo_type == LLT_EMPTY); + return 0; } static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov, @@ -274,19 +276,18 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov, LASSERT(r0->lo_sub[idx] == NULL); } -static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov, - union lov_layout_state *state) +static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov, + union lov_layout_state *state) { struct lov_layout_raid0 *r0 = &state->raid0; - struct lov_stripe_md *lsm = r0->lo_lsm; - struct l_wait_info lwi = { 0 }; - int i; + struct lov_stripe_md *lsm = lov->lo_lsm; + int i; ENTRY; - /* wait until there is no extra users. */ dump_lsm(D_INODE, lsm); - l_wait_event(lov->lo_waitq, cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi); + if (cfs_atomic_read(&lsm->lsm_refc) > 1) + RETURN(-EBUSY); if (r0->lo_sub != NULL) { for (i = 0; i < r0->lo_nr; ++i) { @@ -300,7 +301,7 @@ static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov, lov_subobject_kill(env, lov, los, i); } } - EXIT; + RETURN(0); } static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov, @@ -321,8 +322,11 @@ static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov, r0->lo_sub = NULL; } - LASSERT(cfs_atomic_read(&r0->lo_lsm->lsm_refc) == 1); - lov_free_memmd(&r0->lo_lsm); + LASSERTF(cfs_atomic_read(&lov->lo_lsm->lsm_refc) == 1, + "actual %d proc %p.\n", + cfs_atomic_read(&lov->lo_lsm->lsm_refc), cfs_current()); + lov_free_memmd(&lov->lo_lsm); + lov->lo_lsm = NULL; EXIT; } @@ -371,14 +375,19 @@ static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj, static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj, struct cl_attr *attr) { - struct lov_object *lov = cl2lov(obj); - struct lov_layout_raid0 *r0 = lov_r0(lov); - struct lov_stripe_md *lsm = lov->u.raid0.lo_lsm; + struct lov_object *lov = cl2lov(obj); + struct lov_layout_raid0 *r0 = lov_r0(lov); + struct lov_stripe_md *lsm = lov->lo_lsm; struct ost_lvb *lvb = &lov_env_info(env)->lti_lvb; __u64 kms; int result = 0; ENTRY; + + /* this is called w/o holding type guard mutex, so it must be inside + * an on going IO otherwise lsm may be replaced. */ + LASSERT(cfs_atomic_read(&lsm->lsm_refc) > 1); + if (!r0->lo_attr_valid) { /* * Fill LVB with attributes already initialized by the upper @@ -452,18 +461,29 @@ const static struct lov_layout_operations lov_dispatch[] = { lov_dispatch[__llt].op(__VA_ARGS__); \ }) +static inline void lov_conf_freeze(struct lov_object *lov) +{ + if (lov->lo_owner != cfs_current()) + cfs_down_read(&lov->lo_type_guard); +} + +static inline void lov_conf_thaw(struct lov_object *lov) +{ + if (lov->lo_owner != cfs_current()) + cfs_up_read(&lov->lo_type_guard); +} + #define LOV_2DISPATCH_MAYLOCK(obj, op, lock, ...) \ ({ \ struct lov_object *__obj = (obj); \ int __lock = !!(lock); \ typeof(lov_dispatch[0].op(__VA_ARGS__)) __result; \ \ - __lock &= __obj->lo_owner != cfs_current(); \ if (__lock) \ - cfs_down_read(&__obj->lo_type_guard); \ + lov_conf_freeze(__obj); \ __result = LOV_2DISPATCH_NOLOCK(obj, op, __VA_ARGS__); \ if (__lock) \ - cfs_up_read(&__obj->lo_type_guard); \ + lov_conf_thaw(__obj); \ __result; \ }) @@ -478,59 +498,72 @@ do { \ struct lov_object *__obj = (obj); \ enum lov_layout_type __llt; \ \ - if (__obj->lo_owner != cfs_current()) \ - cfs_down_read(&__obj->lo_type_guard); \ + lov_conf_freeze(__obj); \ __llt = __obj->lo_type; \ LASSERT(0 <= __llt && __llt < ARRAY_SIZE(lov_dispatch)); \ lov_dispatch[__llt].op(__VA_ARGS__); \ - if (__obj->lo_owner != cfs_current()) \ - cfs_up_read(&__obj->lo_type_guard); \ + lov_conf_thaw(__obj); \ } while (0) +static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov) +{ + struct l_wait_info lwi = { 0 }; + struct lov_stripe_md *lsm = lov->lo_lsm; + ENTRY; + + if (!lov->lo_lsm_invalid || lsm == NULL) + RETURN(0); + + l_wait_event(lov->lo_waitq, cfs_atomic_read(&lsm->lsm_refc) == 1, &lwi); + RETURN(0); +} + static int lov_layout_change(const struct lu_env *env, - struct lov_object *obj, enum lov_layout_type llt, + struct lov_object *lov, enum lov_layout_type llt, const struct cl_object_conf *conf) { - int result; - union lov_layout_state *state = &lov_env_info(env)->lti_state; - const struct lov_layout_operations *old_ops; - const struct lov_layout_operations *new_ops; + int result; + union lov_layout_state *state = &lov_env_info(env)->lti_state; + const struct lov_layout_operations *old_ops; + const struct lov_layout_operations *new_ops; - LASSERT(0 <= obj->lo_type && obj->lo_type < ARRAY_SIZE(lov_dispatch)); - LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch)); - ENTRY; + struct cl_object_header *hdr = cl_object_header(&lov->lo_cl); + void *cookie; + struct lu_env *nested; + int refcheck; - old_ops = &lov_dispatch[obj->lo_type]; - new_ops = &lov_dispatch[llt]; - - result = new_ops->llo_init(env, lu2lov_dev(obj->lo_cl.co_lu.lo_dev), - obj, conf, state); - if (result == 0) { - struct cl_object_header *hdr = cl_object_header(&obj->lo_cl); - void *cookie; - struct lu_env *nested; - int refcheck; - - cookie = cl_env_reenter(); - nested = cl_env_get(&refcheck); - if (!IS_ERR(nested)) - cl_object_prune(nested, &obj->lo_cl); - else - result = PTR_ERR(nested); - cl_env_put(nested, &refcheck); - cl_env_reexit(cookie); - - old_ops->llo_delete(env, obj, &obj->u); - old_ops->llo_fini(env, obj, &obj->u); - LASSERT(cfs_list_empty(&hdr->coh_locks)); - LASSERT(hdr->coh_tree.rnode == NULL); - LASSERT(hdr->coh_pages == 0); - - new_ops->llo_install(env, obj, state); - obj->lo_type = llt; - } else - new_ops->llo_fini(env, obj, state); - RETURN(result); + LASSERT(0 <= lov->lo_type && lov->lo_type < ARRAY_SIZE(lov_dispatch)); + LASSERT(0 <= llt && llt < ARRAY_SIZE(lov_dispatch)); + ENTRY; + + cookie = cl_env_reenter(); + nested = cl_env_get(&refcheck); + if (!IS_ERR(nested)) + cl_object_prune(nested, &lov->lo_cl); + else + result = PTR_ERR(nested); + cl_env_put(nested, &refcheck); + cl_env_reexit(cookie); + + old_ops = &lov_dispatch[lov->lo_type]; + new_ops = &lov_dispatch[llt]; + + result = old_ops->llo_delete(env, lov, &lov->u); + if (result == 0) { + old_ops->llo_fini(env, lov, &lov->u); + LASSERT(cfs_list_empty(&hdr->coh_locks)); + LASSERT(hdr->coh_tree.rnode == NULL); + LASSERT(hdr->coh_pages == 0); + + result = new_ops->llo_init(env, + lu2lov_dev(lov->lo_cl.co_lu.lo_dev), + lov, conf, state); + if (result == 0) { + new_ops->llo_install(env, lov, state); + lov->lo_type = llt; + } + } + RETURN(result); } /***************************************************************************** @@ -570,27 +603,47 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj, struct lov_stripe_md *lsm = conf->u.coc_md->lsm; struct lov_object *lov = cl2lov(obj); int result = 0; + ENTRY; + + /* + * Only LLT_EMPTY <-> LLT_RAID0 transitions are supported. + */ + LASSERT(lov->lo_owner != cfs_current()); + cfs_down_write(&lov->lo_type_guard); + LASSERT(lov->lo_owner == NULL); + lov->lo_owner = cfs_current(); + + if (conf->coc_invalidate) { + lov->lo_lsm_invalid = 1; + GOTO(out, result = 0); + } + + if (conf->coc_validate_only) { + if (!lov->lo_lsm_invalid) + GOTO(out, result = 0); + + lov_layout_wait(env, lov); + /* fall through to set up new layout */ + } - ENTRY; - /* - * Currently only LLT_EMPTY -> LLT_RAID0 transition is supported. - */ - LASSERT(lov->lo_owner != cfs_current()); - cfs_down_write(&lov->lo_type_guard); - LASSERT(lov->lo_owner == NULL); - lov->lo_owner = cfs_current(); switch (lov->lo_type) { case LLT_EMPTY: if (lsm != NULL) result = lov_layout_change(env, lov, LLT_RAID0, conf); break; case LLT_RAID0: - if (lsm == NULL || lov_stripe_md_cmp(lov->u.raid0.lo_lsm, lsm)) + if (lsm == NULL) + result = lov_layout_change(env, lov, LLT_EMPTY, conf); + else if (lov_stripe_md_cmp(lov->lo_lsm, lsm)) result = -EOPNOTSUPP; break; default: LBUG(); } + lov->lo_lsm_invalid = result != 0; + EXIT; + +out: lov->lo_owner = NULL; cfs_up_write(&lov->lo_type_guard); RETURN(result); @@ -636,7 +689,13 @@ struct cl_page *lov_page_init(const struct lu_env *env, struct cl_object *obj, int lov_io_init(const struct lu_env *env, struct cl_object *obj, struct cl_io *io) { + struct lov_io *lio = lov_env_io(env); + CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl); + + /* hold lsm before initializing because io relies on it */ + lio->lis_lsm = lov_lsm_addref(cl2lov(obj)); + /* * Do not take lock in case of CIT_MISC io, because * @@ -728,16 +787,13 @@ struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov) { struct lov_stripe_md *lsm = NULL; - cfs_down_read(&lov->lo_type_guard); - switch (lov->lo_type) { - case LLT_RAID0: - lsm = lsm_addref(lov->u.raid0.lo_lsm); - case LLT_EMPTY: - break; - default: - LBUG(); + lov_conf_freeze(lov); + if (!lov->lo_lsm_invalid && lov->lo_lsm != NULL) { + lsm = lsm_addref(lov->lo_lsm); + CDEBUG(D_INODE, "lsm %p addref %d by %p.\n", + lsm, cfs_atomic_read(&lsm->lsm_refc), cfs_current()); } - cfs_up_read(&lov->lo_type_guard); + lov_conf_thaw(lov); return lsm; } @@ -746,8 +802,10 @@ void lov_lsm_decref(struct lov_object *lov, struct lov_stripe_md *lsm) if (lsm == NULL) return; - lov_free_memmd(&lsm); - if (lov->lo_owner != NULL) + CDEBUG(D_INODE, "lsm %p decref %d by %p.\n", + lsm, cfs_atomic_read(&lsm->lsm_refc), cfs_current()); + + if (lov_free_memmd(&lsm) <= 1 && lov->lo_lsm_invalid) cfs_waitq_signal(&lov->lo_waitq); } @@ -793,13 +851,13 @@ int lov_read_and_clear_async_rc(struct cl_object *clob) if (luobj != NULL) { struct lov_object *lov = lu2lov(luobj); - cfs_down_read(&lov->lo_type_guard); + lov_conf_freeze(lov); switch (lov->lo_type) { case LLT_RAID0: { struct lov_stripe_md *lsm; int i; - lsm = lov->u.raid0.lo_lsm; + lsm = lov->lo_lsm; LASSERT(lsm != NULL); for (i = 0; i < lsm->lsm_stripe_count; i++) { struct lov_oinfo *loi = lsm->lsm_oinfo[i]; @@ -813,7 +871,7 @@ int lov_read_and_clear_async_rc(struct cl_object *clob) default: LBUG(); } - cfs_up_read(&lov->lo_type_guard); + lov_conf_thaw(lov); } RETURN(rc); } diff --git a/lustre/lov/lov_page.c b/lustre/lov/lov_page.c index 03335ff..9e39543 100644 --- a/lustre/lov/lov_page.c +++ b/lustre/lov/lov_page.c @@ -184,9 +184,9 @@ struct cl_page *lov_page_init_raid0(const struct lu_env *env, ENTRY; offset = cl_offset(obj, page->cp_index); - stripe = lov_stripe_number(r0->lo_lsm, offset); - LASSERT(stripe < r0->lo_nr); - rc = lov_stripe_offset(r0->lo_lsm, offset, stripe, + stripe = lov_stripe_number(loo->lo_lsm, offset); + LASSERT(stripe < r0->lo_nr); + rc = lov_stripe_offset(loo->lo_lsm, offset, stripe, &suboff); LASSERT(rc == 0); diff --git a/lustre/lov/lovsub_lock.c b/lustre/lov/lovsub_lock.c index ab1d9d9..3915be9 100644 --- a/lustre/lov/lovsub_lock.c +++ b/lustre/lov/lovsub_lock.c @@ -153,10 +153,9 @@ static unsigned long lovsub_lock_weigh(const struct lu_env *env, * Maps start/end offsets within a stripe, to offsets within a file. */ static void lovsub_lock_descr_map(const struct cl_lock_descr *in, - struct lov_object *obj, - int stripe, struct cl_lock_descr *out) + struct lov_object *lov, + int stripe, struct cl_lock_descr *out) { - struct lov_stripe_md *lsm = lov_r0(obj)->lo_lsm; pgoff_t size; /* stripe size in pages */ pgoff_t skip; /* how many pages in every stripe are occupied by * "other" stripes */ @@ -167,9 +166,9 @@ static void lovsub_lock_descr_map(const struct cl_lock_descr *in, start = in->cld_start; end = in->cld_end; - if (lsm->lsm_stripe_count > 1) { - size = cl_index(lov2cl(obj), lsm->lsm_stripe_size); - skip = (lsm->lsm_stripe_count - 1) * size; + if (lov->lo_lsm->lsm_stripe_count > 1) { + size = cl_index(lov2cl(lov), lov->lo_lsm->lsm_stripe_size); + skip = (lov->lo_lsm->lsm_stripe_count - 1) * size; /* XXX overflow check here? */ start += start/size * skip + stripe * size; diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index d9219e0..746012e 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -464,9 +464,11 @@ static int mdc_finish_enqueue(struct obd_export *exp, struct lustre_handle *lockh, int rc) { - struct req_capsule *pill = &req->rq_pill; - struct ldlm_request *lockreq; - struct ldlm_reply *lockrep; + struct req_capsule *pill = &req->rq_pill; + struct ldlm_request *lockreq; + struct ldlm_reply *lockrep; + __u64 bits = 0; + struct lustre_intent_data *intent = &it->d.lustre; ENTRY; LASSERT(rc >= 0); @@ -492,20 +494,21 @@ static int mdc_finish_enqueue(struct obd_export *exp, ldlm_lock_decref(lockh, einfo->ei_mode); einfo->ei_mode = lock->l_req_mode; } - LDLM_LOCK_PUT(lock); - } + bits = lock->l_policy_data.l_inodebits.bits; + LDLM_LOCK_PUT(lock); + } - lockrep = req_capsule_server_get(pill, &RMF_DLM_REP); - LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */ + lockrep = req_capsule_server_get(pill, &RMF_DLM_REP); + LASSERT(lockrep != NULL); /* checked by ldlm_cli_enqueue() */ - it->d.lustre.it_disposition = (int)lockrep->lock_policy_res1; - it->d.lustre.it_status = (int)lockrep->lock_policy_res2; - it->d.lustre.it_lock_mode = einfo->ei_mode; - it->d.lustre.it_lock_handle = lockh->cookie; - it->d.lustre.it_data = req; + intent->it_disposition = (int)lockrep->lock_policy_res1; + intent->it_status = (int)lockrep->lock_policy_res2; + intent->it_lock_mode = einfo->ei_mode; + intent->it_lock_handle = lockh->cookie; + intent->it_data = req; - if (it->d.lustre.it_status < 0 && req->rq_replay) - mdc_clear_replay_flag(req, it->d.lustre.it_status); + if (intent->it_status < 0 && req->rq_replay) + mdc_clear_replay_flag(req, intent->it_status); /* If we're doing an IT_OPEN which did not result in an actual * successful open, then we need to remove the bit which saves @@ -515,11 +518,11 @@ static int mdc_finish_enqueue(struct obd_export *exp, * function without doing so, and try to replay a failed create * (bug 3440) */ if (it->it_op & IT_OPEN && req->rq_replay && - (!it_disposition(it, DISP_OPEN_OPEN) ||it->d.lustre.it_status != 0)) - mdc_clear_replay_flag(req, it->d.lustre.it_status); + (!it_disposition(it, DISP_OPEN_OPEN) ||intent->it_status != 0)) + mdc_clear_replay_flag(req, intent->it_status); - DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d", - it->it_op,it->d.lustre.it_disposition,it->d.lustre.it_status); + DEBUG_REQ(D_RPCTRACE, req, "op: %d disposition: %x, status: %d", + it->it_op, intent->it_disposition, intent->it_status); /* We know what to expect, so we do any byte flipping required here */ if (it->it_op & (IT_OPEN | IT_UNLINK | IT_LOOKUP | IT_GETATTR)) { @@ -540,7 +543,9 @@ static int mdc_finish_enqueue(struct obd_export *exp, * is swabbed by that handler correctly. */ mdc_set_open_replay_data(NULL, NULL, req); - } + } + + /* TODO: make sure LAYOUT lock must be granted along with EA */ if ((body->valid & (OBD_MD_FLDIREA | OBD_MD_FLEASIZE)) != 0) { void *eadata; @@ -616,9 +621,47 @@ static int mdc_finish_enqueue(struct obd_export *exp, if (capa == NULL) RETURN(-EPROTO); } - } + } else if (it->it_op & IT_LAYOUT) { + struct ldlm_lock *lock = ldlm_handle2lock(lockh); - RETURN(rc); + if (lock != NULL && lock->l_lvb_data == NULL) { + int lvb_len; + + /* maybe the lock was granted right away and layout + * is packed into RMF_DLM_LVB of req */ + lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, + RCL_SERVER); + if (lvb_len > 0) { + void *lvb; + void *lmm; + + lvb = req_capsule_server_get(pill, + &RMF_DLM_LVB); + if (lvb == NULL) { + LDLM_LOCK_PUT(lock); + RETURN(-EPROTO); + } + + OBD_ALLOC_LARGE(lmm, lvb_len); + if (lmm == NULL) { + LDLM_LOCK_PUT(lock); + RETURN(-ENOMEM); + } + memcpy(lmm, lvb, lvb_len); + + /* install lvb_data */ + lock_res_and_lock(lock); + LASSERT(lock->l_lvb_data == NULL); + lock->l_lvb_data = lmm; + lock->l_lvb_len = lvb_len; + unlock_res_and_lock(lock); + } + } + if (lock != NULL) + LDLM_LOCK_PUT(lock); + } + + RETURN(rc); } /* We always reserve enough space in the reply packet for a stripe MD, because @@ -637,6 +680,8 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, { .l_inodebits = { MDS_INODELOCK_LOOKUP } }; static const ldlm_policy_data_t update_policy = { .l_inodebits = { MDS_INODELOCK_UPDATE } }; + static const ldlm_policy_data_t layout_policy = + { .l_inodebits = { MDS_INODELOCK_LAYOUT } }; ldlm_policy_data_t const *policy = &lookup_policy; int generation, resends = 0; struct ldlm_reply *lockrep; @@ -647,10 +692,13 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, fid_build_reg_res_name(&op_data->op_fid1, &res_id); - if (it) - saved_flags |= LDLM_FL_HAS_INTENT; - if (it && it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR)) - policy = &update_policy; + if (it) { + saved_flags |= LDLM_FL_HAS_INTENT; + if (it->it_op & (IT_UNLINK | IT_GETATTR | IT_READDIR)) + policy = &update_policy; + else if (it->it_op & IT_LAYOUT) + policy = &layout_policy; + } LASSERT(reqp == NULL); @@ -673,11 +721,11 @@ resend: lmm = NULL; } else if (it->it_op & IT_UNLINK) req = mdc_intent_unlink_pack(exp, it, op_data); - else if (it->it_op & (IT_GETATTR | IT_LOOKUP | IT_LAYOUT)) - req = mdc_intent_getattr_pack(exp, it, op_data); - else if (it->it_op == IT_READDIR) - req = ldlm_enqueue_pack(exp); - else { + else if (it->it_op & (IT_GETATTR | IT_LOOKUP)) + req = mdc_intent_getattr_pack(exp, it, op_data); + else if (it->it_op & (IT_READDIR | IT_LAYOUT)) + req = ldlm_enqueue_pack(exp); + else { LBUG(); RETURN(-EINVAL); } -- 1.8.3.1