From: Bobi Jam Date: Thu, 6 Apr 2017 00:13:41 +0000 (+0800) Subject: LU-9008 pfl: dynamic layout modification with write/truncate X-Git-Tag: 2.9.56~46^2~2 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=7107e54983b0a701c5c02a1e3c521302e8f79810 LU-9008 pfl: dynamic layout modification with write/truncate * in lov_init_composite(), skip initializing the sub-objects of a layout component that lacks LCME_FL_INIT. * issue layout intent RPC during write/trunc ops when trying to write to an un-init-ed component (even if at the lock stage). * After the layout intent RPC is issued, restart the IO. * get rid of unused lov_layout_operations::llo_install() interface. * add an empty mdt_layout_change() interface to handle intent layout write RPC. Reviewed-on: https://review.whamcloud.com/25317 Signed-off-by: Bobi Jam Change-Id: I2f79482187d2af2660dd86e55da3f5dc0138e94a Reviewed-by: Niu Yawei Reviewed-by: Jinshan Xiong --- diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index 655eafb..655e6b1 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -1829,6 +1829,11 @@ struct cl_io { */ ci_ignore_layout:1, /** + * Need MDS intervention to complete a write. This usually means the + * corresponding component is not initialized for the writing extent. + */ + ci_need_write_intent:1, + /** * Check if layout changed after the IO finishes. Mainly for HSM * requirement. If IO occurs to openning files, it doesn't need to * verify layout because HSM won't release openning files. 
diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index d3f0f75..df22acd 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -3119,22 +3119,22 @@ struct getparent { } __attribute__((packed)); enum { - LAYOUT_INTENT_ACCESS = 0, - LAYOUT_INTENT_READ = 1, - LAYOUT_INTENT_WRITE = 2, - LAYOUT_INTENT_GLIMPSE = 3, - LAYOUT_INTENT_TRUNC = 4, - LAYOUT_INTENT_RELEASE = 5, - LAYOUT_INTENT_RESTORE = 6 + LAYOUT_INTENT_ACCESS = 0, /** generic access */ + LAYOUT_INTENT_READ = 1, /** not used */ + LAYOUT_INTENT_WRITE = 2, /** write file, for comp layout */ + LAYOUT_INTENT_GLIMPSE = 3, /** not used */ + LAYOUT_INTENT_TRUNC = 4, /** truncate file, for comp layout */ + LAYOUT_INTENT_RELEASE = 5, /** reserved for HSM release */ + LAYOUT_INTENT_RESTORE = 6, /** reserved for HSM restore */ }; /* enqueue layout lock with intent */ struct layout_intent { - __u32 li_opc; /* intent operation for enqueue, read, write etc */ + __u32 li_opc; /* intent operation for enqueue, read, write etc */ __u32 li_flags; __u64 li_start; __u64 li_end; -}; +} __attribute__((packed)); /** * On the wire version of hsm_progress structure. 
diff --git a/lustre/include/lustre_sec.h b/lustre/include/lustre_sec.h index 8d49d38..7e6f490 100644 --- a/lustre/include/lustre_sec.h +++ b/lustre/include/lustre_sec.h @@ -63,6 +63,7 @@ struct ptlrpc_sec; struct ptlrpc_svc_ctx; struct ptlrpc_cli_ctx; struct ptlrpc_ctx_ops; +struct req_msg_field; /** * \addtogroup flavor flavor @@ -1084,7 +1085,8 @@ void sptlrpc_cli_free_reqbuf(struct ptlrpc_request *req); int sptlrpc_cli_alloc_repbuf(struct ptlrpc_request *req, int msgsize); void sptlrpc_cli_free_repbuf(struct ptlrpc_request *req); int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req, - int segment, int newsize); + const struct req_msg_field *field, + int newsize); int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req, struct ptlrpc_request **req_ret); void sptlrpc_cli_finish_early_reply(struct ptlrpc_request *early_req); diff --git a/lustre/llite/file.c b/lustre/llite/file.c index c3b8cef..68ac435 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -1191,7 +1191,6 @@ restart: if (count > 0 && args->via_io_subtype == IO_NORMAL) args->u.normal.via_iter = vio->vui_iter; } - GOTO(out, rc); out: cl_io_fini(env, io); @@ -1226,7 +1225,7 @@ out: CDEBUG(D_VFSTRACE, "iot: %d, result: %zd\n", iot, result); - return result > 0 ? result : rc; + RETURN(result > 0 ? result : rc); } /** @@ -4121,9 +4120,9 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, enum ldlm_mode mode, lock_res_and_lock(lock); lvb_ready = ldlm_is_lvb_ready(lock); unlock_res_and_lock(lock); + /* checking lvb_ready is racy but this is okay. The worst case is * that multi processes may configure the file on the same time. 
*/ - if (lvb_ready) GOTO(out, rc = 0); @@ -4148,7 +4147,6 @@ static int ll_layout_lock_set(struct lustre_handle *lockh, enum ldlm_mode mode, /* refresh layout failed, need to wait */ wait_layout = rc == -EBUSY; EXIT; - out: LDLM_LOCK_PUT(lock); ldlm_lock_decref(lockh, mode); @@ -4173,39 +4171,37 @@ out: RETURN(rc); } -static int ll_layout_refresh_locked(struct inode *inode) +/** + * Issue layout intent RPC to MDS. + * \param inode [in] file inode + * \param intent [in] layout intent + * + * \retval 0 on success + * \retval < 0 error code + */ +static int ll_layout_intent(struct inode *inode, struct layout_intent *intent) { struct ll_inode_info *lli = ll_i2info(inode); struct ll_sb_info *sbi = ll_i2sbi(inode); struct md_op_data *op_data; - struct lookup_intent it; - struct lustre_handle lockh; - enum ldlm_mode mode; + struct lookup_intent it; struct ptlrpc_request *req; int rc; ENTRY; -again: - /* mostly layout lock is caching on the local side, so try to match - * it before grabbing layout lock mutex. 
*/ - mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0, - LCK_CR | LCK_CW | LCK_PR | LCK_PW); - if (mode != 0) { /* hit cached lock */ - rc = ll_layout_lock_set(&lockh, mode, inode); - if (rc == -EAGAIN) - goto again; - - RETURN(rc); - } - op_data = ll_prep_md_op_data(NULL, inode, inode, NULL, 0, 0, LUSTRE_OPC_ANY, NULL); if (IS_ERR(op_data)) RETURN(PTR_ERR(op_data)); - /* have to enqueue one */ + op_data->op_data = intent; + op_data->op_data_size = sizeof(*intent); + memset(&it, 0, sizeof(it)); it.it_op = IT_LAYOUT; + if (intent->li_opc == LAYOUT_INTENT_WRITE || + intent->li_opc == LAYOUT_INTENT_TRUNC) + it.it_flags = FMODE_WRITE; LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file "DFID"(%p)", ll_get_fsname(inode->i_sb, NULL, 0), @@ -4219,18 +4215,11 @@ again: ll_finish_md_op_data(op_data); - mode = it.it_lock_mode; - it.it_lock_mode = 0; - ll_intent_drop_lock(&it); - - if (rc == 0) { - /* set lock data in case this is a new lock */ + /* set lock data in case this is a new lock */ + if (!rc) ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL); - lockh.cookie = it.it_lock_handle; - rc = ll_layout_lock_set(&lockh, mode, inode); - if (rc == -EAGAIN) - goto again; - } + + ll_intent_drop_lock(&it); RETURN(rc); } @@ -4252,6 +4241,11 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen) { struct ll_inode_info *lli = ll_i2info(inode); struct ll_sb_info *sbi = ll_i2sbi(inode); + struct lustre_handle lockh; + struct layout_intent intent = { + .li_opc = LAYOUT_INTENT_ACCESS, + }; + enum ldlm_mode mode; int rc; ENTRY; @@ -4266,18 +4260,57 @@ int ll_layout_refresh(struct inode *inode, __u32 *gen) /* take layout lock mutex to enqueue layout lock exclusively. */ mutex_lock(&lli->lli_layout_mutex); - rc = ll_layout_refresh_locked(inode); - if (rc < 0) - GOTO(out, rc); + while (1) { + /* mostly layout lock is caching on the local side, so try to + * match it before grabbing layout lock mutex. 
*/ + mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0, + LCK_CR | LCK_CW | LCK_PR | LCK_PW); + if (mode != 0) { /* hit cached lock */ + rc = ll_layout_lock_set(&lockh, mode, inode); + if (rc == -EAGAIN) + continue; + break; + } - *gen = ll_layout_version_get(lli); -out: + rc = ll_layout_intent(inode, &intent); + if (rc != 0) + break; + } + + if (rc == 0) + *gen = ll_layout_version_get(lli); mutex_unlock(&lli->lli_layout_mutex); RETURN(rc); } /** + * Issue layout intent RPC indicating where in a file an IO is about to write. + * + * \param[in] inode file inode. + * \param[in] start start offset of file in bytes where an IO is about to + * write. + * \param[in] end exclusive end offset in bytes of the write range. + * + * \retval 0 on success + * \retval < 0 error code + */ +int ll_layout_write_intent(struct inode *inode, __u64 start, __u64 end) +{ + struct layout_intent intent = { + .li_opc = LAYOUT_INTENT_WRITE, + .li_start = start, + .li_end = end, + }; + int rc; + ENTRY; + + rc = ll_layout_intent(inode, &intent); + + RETURN(rc); +} + +/** * This function send a restore request to the MDT */ int ll_layout_restore(struct inode *inode, loff_t offset, __u64 length) diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 854c6cb5..8426ad2 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -1419,6 +1419,7 @@ static inline void d_lustre_revalidate(struct dentry *dentry) int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf); int ll_layout_refresh(struct inode *inode, __u32 *gen); int ll_layout_restore(struct inode *inode, loff_t start, __u64 length); +int ll_layout_write_intent(struct inode *inode, __u64 start, __u64 end); int ll_xattr_init(void); void ll_xattr_fini(void); diff --git a/lustre/llite/vvp_io.c b/lustre/llite/vvp_io.c index 50ca3f1..50afff6 100644 --- a/lustre/llite/vvp_io.c +++ b/lustre/llite/vvp_io.c @@ -298,18 +298,18 @@ static void vvp_io_fini(const struct lu_env 
*env, const struct cl_io_slice *ios) struct cl_object *obj = io->ci_obj; struct vvp_io *vio = cl2vvp_io(env, ios); struct inode *inode = vvp_object_inode(obj); + int rc; CLOBINVRNT(env, obj, vvp_object_invariant(obj)); CDEBUG(D_VFSTRACE, DFID" ignore/verify layout %d/%d, layout version %d " - "restore needed %d\n", + "need write layout %d, restore needed %d\n", PFID(lu_object_fid(&obj->co_lu)), io->ci_ignore_layout, io->ci_verify_layout, - vio->vui_layout_gen, io->ci_restore_needed); + vio->vui_layout_gen, io->ci_need_write_intent, + io->ci_restore_needed); if (io->ci_restore_needed) { - int rc; - /* file was detected release, we need to restore it * before finishing the io */ @@ -334,6 +334,31 @@ static void vvp_io_fini(const struct lu_env *env, const struct cl_io_slice *ios) } } + /** + * dynamic layout change needed, send layout intent + * RPC. + */ + if (io->ci_need_write_intent) { + loff_t start = 0; + loff_t end = 0; + + LASSERT(io->ci_type == CIT_WRITE || cl_io_is_trunc(io)); + + io->ci_need_write_intent = 0; + + if (io->ci_type == CIT_WRITE) { + start = io->u.ci_rw.crw_pos; + end = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count; + } else { + end = io->u.ci_setattr.sa_attr.lvb_size; + } + + rc = ll_layout_write_intent(inode, start, end); + io->ci_result = rc; + if (!rc) + io->ci_need_restart = 1; + } + if (!io->ci_ignore_layout && io->ci_verify_layout) { __u32 gen = 0; diff --git a/lustre/lov/lov_ea.c b/lustre/lov/lov_ea.c index 4f8271e..04624a3 100644 --- a/lustre/lov/lov_ea.c +++ b/lustre/lov/lov_ea.c @@ -115,6 +115,9 @@ static void lsme_free(struct lov_stripe_md_entry *lsme) unsigned int i; size_t lsme_size; + if (!lsme_inited(lsme) || + lsme->lsme_pattern & LOV_PATTERN_F_RELEASED) + stripe_count = 0; for (i = 0; i < stripe_count; i++) OBD_SLAB_FREE_PTR(lsme->lsme_oinfo[i], lov_oinfo_slab); @@ -142,7 +145,7 @@ void lsm_free(struct lov_stripe_md *lsm) */ static struct lov_stripe_md_entry * lsme_unpack(struct lov_obd *lov, struct lov_mds_md *lmm, size_t 
buf_size, - const char *pool_name, struct lov_ost_data_v1 *objects, + const char *pool_name, bool inited, struct lov_ost_data_v1 *objects, loff_t *maxbytes) { struct lov_stripe_md_entry *lsme; @@ -160,7 +163,7 @@ lsme_unpack(struct lov_obd *lov, struct lov_mds_md *lmm, size_t buf_size, RETURN(ERR_PTR(-EINVAL)); pattern = le32_to_cpu(lmm->lmm_pattern); - if (pattern & LOV_PATTERN_F_RELEASED) + if (pattern & LOV_PATTERN_F_RELEASED || !inited) stripe_count = 0; else stripe_count = le16_to_cpu(lmm->lmm_stripe_count); @@ -186,8 +189,10 @@ lsme_unpack(struct lov_obd *lov, struct lov_mds_md *lmm, size_t buf_size, lsme->lsme_magic = magic; lsme->lsme_pattern = pattern; + lsme->lsme_flags = 0; lsme->lsme_stripe_size = le32_to_cpu(lmm->lmm_stripe_size); - lsme->lsme_stripe_count = stripe_count; + /* preserve the possible -1 stripe count for uninstantiated component */ + lsme->lsme_stripe_count = le16_to_cpu(lmm->lmm_stripe_count); lsme->lsme_layout_gen = le16_to_cpu(lmm->lmm_layout_gen); if (pool_name != NULL) { @@ -278,10 +283,12 @@ lsm_unpackmd_v1v3(struct lov_obd *lov, pattern = le32_to_cpu(lmm->lmm_pattern); - lsme = lsme_unpack(lov, lmm, buf_size, pool_name, objects, &maxbytes); + lsme = lsme_unpack(lov, lmm, buf_size, pool_name, true, objects, + &maxbytes); if (IS_ERR(lsme)) RETURN(ERR_CAST(lsme)); + lsme->lsme_flags = LCME_FL_INIT; lsme->lsme_extent.e_start = 0; lsme->lsme_extent.e_end = LUSTRE_EOF; @@ -371,7 +378,7 @@ static int lsm_verify_comp_md_v1(struct lov_comp_md_v1 *lcm, static struct lov_stripe_md_entry * lsme_unpack_comp(struct lov_obd *lov, struct lov_mds_md *lmm, - size_t lmm_buf_size, loff_t *maxbytes) + size_t lmm_buf_size, bool inited, loff_t *maxbytes) { unsigned int magic; unsigned int stripe_count; @@ -379,6 +386,9 @@ lsme_unpack_comp(struct lov_obd *lov, struct lov_mds_md *lmm, stripe_count = le16_to_cpu(lmm->lmm_stripe_count); if (stripe_count == 0) RETURN(ERR_PTR(-EINVAL)); + /* un-instantiated lmm contains no ost id info, i.e. 
lov_ost_data_v1 */ + if (!inited) + stripe_count = 0; magic = le32_to_cpu(lmm->lmm_magic); if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3) @@ -389,12 +399,12 @@ lsme_unpack_comp(struct lov_obd *lov, struct lov_mds_md *lmm, if (magic == LOV_MAGIC_V1) { return lsme_unpack(lov, lmm, lmm_buf_size, NULL, - lmm->lmm_objects, maxbytes); + inited, lmm->lmm_objects, maxbytes); } else { struct lov_mds_md_v3 *lmm3 = (struct lov_mds_md_v3 *)lmm; return lsme_unpack(lov, lmm, lmm_buf_size, lmm3->lmm_pool_name, - lmm3->lmm_objects, maxbytes); + inited, lmm3->lmm_objects, maxbytes); } } @@ -440,6 +450,8 @@ lsm_unpackmd_comp_md_v1(struct lov_obd *lov, void *buf, size_t buf_size) blob = (char *)lcm + blob_offset; lsme = lsme_unpack_comp(lov, blob, blob_size, + le32_to_cpu(lcme->lcme_flags) & + LCME_FL_INIT, (i == entry_count - 1) ? &maxbytes : NULL); if (IS_ERR(lsme)) @@ -450,6 +462,7 @@ lsm_unpackmd_comp_md_v1(struct lov_obd *lov, void *buf, size_t buf_size) lsm->lsm_entries[i] = lsme; lsme->lsme_id = le32_to_cpu(lcme->lcme_id); + lsme->lsme_flags = le32_to_cpu(lcme->lcme_flags); lu_extent_le_to_cpu(&lsme->lsme_extent, &lcme->lcme_extent); if (i == entry_count - 1) { @@ -482,7 +495,7 @@ const struct lsm_operations lsm_comp_md_v1_ops = { void dump_lsm(unsigned int level, const struct lov_stripe_md *lsm) { - int i; + int i, j; CDEBUG(level, "lsm %p, objid "DOSTID", maxbytes %#llx, magic 0x%08X, " "refc: %d, entry: %u, layout_gen %u\n", @@ -493,12 +506,25 @@ void dump_lsm(unsigned int level, const struct lov_stripe_md *lsm) for (i = 0; i < lsm->lsm_entry_count; i++) { struct lov_stripe_md_entry *lse = lsm->lsm_entries[i]; - CDEBUG(level, - DEXT ": id: %u, magic 0x%08X, stripe count %u, " - "size %u, layout_gen %u, pool: ["LOV_POOLNAMEF"]\n", - PEXT(&lse->lsme_extent), lse->lsme_id, lse->lsme_magic, + CDEBUG(level, DEXT ": id: %u, flags: %x, " + "magic 0x%08X, layout_gen %u, " + "stripe count %u, sstripe size %u, " + "pool: ["LOV_POOLNAMEF"]\n", + PEXT(&lse->lsme_extent), 
lse->lsme_id, lse->lsme_flags, + lse->lsme_magic, lse->lsme_layout_gen, lse->lsme_stripe_count, lse->lsme_stripe_size, - lse->lsme_layout_gen, lse->lsme_pool_name); + lse->lsme_pool_name); + if (!lsme_inited(lse) || + lse->lsme_pattern & LOV_PATTERN_F_RELEASED) + break; + for (j = 0; j < lse->lsme_stripe_count; j++) { + CDEBUG(level, " oinfo:%p: ostid: "DOSTID + " ost idx: %d gen: %d\n", + lse->lsme_oinfo[j], + POSTID(&lse->lsme_oinfo[j]->loi_oi), + lse->lsme_oinfo[j]->loi_ost_idx, + lse->lsme_oinfo[j]->loi_ost_gen); + } } } diff --git a/lustre/lov/lov_internal.h b/lustre/lov/lov_internal.h index b6a21c4..548fd50 100644 --- a/lustre/lov/lov_internal.h +++ b/lustre/lov/lov_internal.h @@ -45,6 +45,7 @@ struct lov_stripe_md_entry { struct lu_extent lsme_extent; u32 lsme_id; u32 lsme_magic; + u32 lsme_flags; u32 lsme_pattern; u32 lsme_stripe_size; u16 lsme_stripe_count; @@ -53,6 +54,16 @@ struct lov_stripe_md_entry { struct lov_oinfo *lsme_oinfo[]; }; +static inline void copy_lsm_entry(struct lov_stripe_md_entry *dst, + struct lov_stripe_md_entry *src) +{ + unsigned i; + + for (i = 0; i < src->lsme_stripe_count; i++) + *dst->lsme_oinfo[i] = *src->lsme_oinfo[i]; + memcpy(dst, src, offsetof(typeof(*src), lsme_oinfo)); +} + struct lov_stripe_md { atomic_t lsm_refc; spinlock_t lsm_lock; @@ -328,4 +339,14 @@ static inline void lov_lsm2layout(struct lov_stripe_md *lsm, ol->ol_comp_id = 0; } } + +static inline bool lsme_inited(const struct lov_stripe_md_entry *lsme) +{ + return lsme->lsme_flags & LCME_FL_INIT; +} + +static inline bool lsm_entry_inited(const struct lov_stripe_md *lsm, int index) +{ + return lsme_inited(lsm->lsm_entries[index]); +} #endif diff --git a/lustre/lov/lov_io.c b/lustre/lov/lov_io.c index 2e21bfb..aa32f58 100644 --- a/lustre/lov/lov_io.c +++ b/lustre/lov/lov_io.c @@ -404,6 +404,11 @@ static int lov_io_iter_init(const struct lu_env *env, u64 end; int stripe; + CDEBUG(D_VFSTRACE, "component[%d] flags %#x\n", + index, 
lsm->lsm_entries[index]->lsme_flags); + if (!lsm_entry_inited(lsm, index)) + break; + index++; if (!lu_extent_is_overlapped(&ext, &le->lle_extent)) continue; @@ -453,6 +458,7 @@ static int lov_io_rw_iter_init(const struct lu_env *env, { struct lov_io *lio = cl2lov_io(env, ios); struct cl_io *io = ios->cis_io; + struct lov_stripe_md *lsm = lio->lis_object->lo_lsm; struct lov_stripe_md_entry *lse; loff_t start = io->u.ci_rw.crw_pos; loff_t next; @@ -465,7 +471,7 @@ static int lov_io_rw_iter_init(const struct lu_env *env, if (cl_io_is_append(io)) RETURN(lov_io_iter_init(env, ios)); - index = lov_lsm_entry(lio->lis_object->lo_lsm, io->u.ci_rw.crw_pos); + index = lov_lsm_entry(lsm, io->u.ci_rw.crw_pos); if (index < 0) { /* non-existing layout component */ if (io->ci_type == CIT_READ) { /* TODO: it needs to detect the next component and @@ -486,7 +492,9 @@ static int lov_io_rw_iter_init(const struct lu_env *env, if (next <= start * ssize) next = ~0ull; - LASSERT(io->u.ci_rw.crw_pos >= lse->lsme_extent.e_start); + LASSERTF(io->u.ci_rw.crw_pos >= lse->lsme_extent.e_start, + "pos %lld, [%lld, %lld)\n", io->u.ci_rw.crw_pos, + lse->lsme_extent.e_start, lse->lsme_extent.e_end); next = min_t(__u64, next, lse->lsme_extent.e_end); next = min_t(loff_t, next, lio->lis_io_endpos); @@ -499,6 +507,12 @@ static int lov_io_rw_iter_init(const struct lu_env *env, (__u64)start, lio->lis_pos, lio->lis_endpos, (__u64)lio->lis_io_endpos, io->u.ci_rw.crw_count); + index = lov_lsm_entry(lsm, lio->lis_endpos - 1); + if (index > 0 && !lsm_entry_inited(lsm, index)) { + io->ci_need_write_intent = 1; + RETURN(io->ci_result = -ENODATA); + } + /* * XXX The following call should be optimized: we know, that * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe. 
@@ -506,6 +520,26 @@ static int lov_io_rw_iter_init(const struct lu_env *env, RETURN(lov_io_iter_init(env, ios)); } +static int lov_io_setattr_iter_init(const struct lu_env *env, + const struct cl_io_slice *ios) +{ + struct lov_io *lio = cl2lov_io(env, ios); + struct cl_io *io = ios->cis_io; + struct lov_stripe_md *lsm = lio->lis_object->lo_lsm; + int index; + ENTRY; + + if (cl_io_is_trunc(io) && lio->lis_pos) { + index = lov_lsm_entry(lsm, lio->lis_pos - 1); + if (index > 0 && !lsm_entry_inited(lsm, index)) { + io->ci_need_write_intent = 1; + RETURN(io->ci_result = -ENODATA); + } + } + + RETURN(lov_io_iter_init(env, ios)); +} + static int lov_io_call(const struct lu_env *env, struct lov_io *lio, int (*iofunc)(const struct lu_env *, struct cl_io *)) { @@ -638,7 +672,7 @@ static int lov_io_read_ahead(const struct lu_env *env, offset = cl_offset(obj, start); index = lov_lsm_entry(loo->lo_lsm, offset); - if (index < 0) + if (index < 0 || !lsm_entry_inited(loo->lo_lsm, index)) RETURN(-ENODATA); stripe = lov_stripe_number(loo->lo_lsm, index, offset); @@ -893,15 +927,15 @@ static const struct cl_io_operations lov_io_ops = { .cio_start = lov_io_start, .cio_end = lov_io_end }, - [CIT_SETATTR] = { - .cio_fini = lov_io_fini, - .cio_iter_init = lov_io_iter_init, - .cio_iter_fini = lov_io_iter_fini, - .cio_lock = lov_io_lock, - .cio_unlock = lov_io_unlock, - .cio_start = lov_io_start, - .cio_end = lov_io_end - }, + [CIT_SETATTR] = { + .cio_fini = lov_io_fini, + .cio_iter_init = lov_io_setattr_iter_init, + .cio_iter_fini = lov_io_iter_fini, + .cio_lock = lov_io_lock, + .cio_unlock = lov_io_unlock, + .cio_start = lov_io_start, + .cio_end = lov_io_end + }, [CIT_DATA_VERSION] = { .cio_fini = lov_io_fini, .cio_iter_init = lov_io_iter_init, diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c index 9c4855c..efa4cc1 100644 --- a/lustre/lov/lov_lock.c +++ b/lustre/lov/lov_lock.c @@ -134,7 +134,7 @@ static struct lov_lock *lov_lock_sub_init(const struct lu_env *env, nr = 0; for 
(index = lov_lsm_entry(lov->lo_lsm, ext.e_start); - index != -1 && index < lov->lo_lsm->lsm_entry_count; index++) { + index >= 0 && index < lov->lo_lsm->lsm_entry_count; index++) { struct lov_layout_raid0 *r0 = lov_r0(lov, index); /* assume lsm entries are sorted. */ @@ -149,8 +149,11 @@ static struct lov_lock *lov_lock_sub_init(const struct lu_env *env, nr++; } } - if (nr == 0) - RETURN(ERR_PTR(-EINVAL)); + /** + * Aggressive lock request (from cl_setattr_ost) which asks for + * [eof, -1) lock, could come across uninstantiated layout extent, + * hence a 0 nr is possible. + */ OBD_ALLOC_LARGE(lovlck, offsetof(struct lov_lock, lls_sub[nr])); if (lovlck == NULL) @@ -159,7 +162,7 @@ static struct lov_lock *lov_lock_sub_init(const struct lu_env *env, lovlck->lls_nr = nr; nr = 0; for (index = lov_lsm_entry(lov->lo_lsm, ext.e_start); - index < lov->lo_lsm->lsm_entry_count; index++) { + index >= 0 && index < lov->lo_lsm->lsm_entry_count; index++) { struct lov_layout_raid0 *r0 = lov_r0(lov, index); /* assume lsm entries are sorted. */ diff --git a/lustre/lov/lov_object.c b/lustre/lov/lov_object.c index da3fc21..1d6c8d5 100644 --- a/lustre/lov/lov_object.c +++ b/lustre/lov/lov_object.c @@ -63,8 +63,6 @@ struct lov_layout_operations { union lov_layout_state *state); void (*llo_fini)(const struct lu_env *env, struct lov_object *lov, union lov_layout_state *state); - void (*llo_install)(const struct lu_env *env, struct lov_object *lov, - union lov_layout_state *state); int (*llo_print)(const struct lu_env *env, void *cookie, lu_printer_t p, const struct lu_object *o); int (*llo_page_init)(const struct lu_env *env, struct cl_object *obj, @@ -91,16 +89,6 @@ static void lov_lsm_put(struct lov_stripe_md *lsm) * Lov object layout operations. * */ - -static void lov_install_empty(const struct lu_env *env, - struct lov_object *lov, - union lov_layout_state *state) -{ - /* - * File without objects. 
- */ -} - static int lov_init_empty(const struct lu_env *env, struct lov_device *dev, struct lov_object *lov, struct lov_stripe_md *lsm, const struct cl_object_conf *conf, @@ -109,12 +97,6 @@ static int lov_init_empty(const struct lu_env *env, struct lov_device *dev, return 0; } -static void lov_install_composite(const struct lu_env *env, - struct lov_object *lov, - union lov_layout_state *state) -{ -} - static struct cl_object *lov_sub_find(const struct lu_env *env, struct cl_device *dev, const struct lu_fid *fid, @@ -322,6 +304,14 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev, struct lov_layout_entry *le = &comp->lo_entries[i]; le->lle_extent = lsm->lsm_entries[i]->lsme_extent; + /** + * If the component has not been init-ed on MDS side, for + * PFL layout, we'd know that the components beyond this one + * will be dynamically init-ed later on file write/trunc ops. + */ + if (!lsm_entry_inited(lsm, i)) + break; + result = lov_init_raid0(env, dev, lov, i, &le->lle_raid0); if (result < 0) break; @@ -572,9 +562,9 @@ static int lov_print_composite(const struct lu_env *env, void *cookie, for (i = 0; i < lsm->lsm_entry_count; i++) { struct lov_stripe_md_entry *lse = lsm->lsm_entries[i]; - (*p)(env, cookie, DEXT ": { 0x%08X, %u, %u, %u, %u }\n", + (*p)(env, cookie, DEXT ": { 0x%08X, %u, %u, %#x, %u, %u }\n", PEXT(&lse->lsme_extent), lse->lsme_magic, - lse->lsme_id, lse->lsme_layout_gen, + lse->lsme_id, lse->lsme_layout_gen, lse->lsme_flags, lse->lsme_stripe_count, lse->lsme_stripe_size); lov_print_raid0(env, cookie, p, lov_r0(lov, i)); } @@ -672,6 +662,10 @@ static int lov_attr_get_composite(const struct lu_env *env, struct lov_layout_raid0 *r0 = &entry->lle_raid0; struct cl_attr *lov_attr = &r0->lo_attr; + /* PFL: This component has not been init-ed. 
*/ + if (!lsm_entry_inited(lov->lo_lsm, index)) + break; + result = lov_attr_get_raid0(env, lov, index, r0); if (result != 0) break; @@ -699,7 +693,6 @@ const static struct lov_layout_operations lov_dispatch[] = { .llo_init = lov_init_empty, .llo_delete = lov_delete_empty, .llo_fini = lov_fini_empty, - .llo_install = lov_install_empty, .llo_print = lov_print_empty, .llo_page_init = lov_page_init_empty, .llo_lock_init = lov_lock_init_empty, @@ -710,7 +703,6 @@ const static struct lov_layout_operations lov_dispatch[] = { .llo_init = lov_init_released, .llo_delete = lov_delete_empty, .llo_fini = lov_fini_released, - .llo_install = lov_install_empty, .llo_print = lov_print_released, .llo_page_init = lov_page_init_empty, .llo_lock_init = lov_lock_init_empty, @@ -721,7 +713,6 @@ const static struct lov_layout_operations lov_dispatch[] = { .llo_init = lov_init_composite, .llo_delete = lov_delete_composite, .llo_fini = lov_fini_composite, - .llo_install = lov_install_composite, .llo_print = lov_print_composite, .llo_page_init = lov_page_init_composite, .llo_lock_init = lov_lock_init_composite, @@ -906,7 +897,6 @@ static int lov_layout_change(const struct lu_env *unused, GOTO(out, rc); } - new_ops->llo_install(env, lov, state); lov->lo_type = llt; out: @@ -954,8 +944,6 @@ int lov_object_init(const struct lu_env *env, struct lu_object *obj, if (rc != 0) GOTO(out_lsm, rc); - ops->llo_install(env, lov, set); - out_lsm: lov_lsm_put(lsm); @@ -977,6 +965,7 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj, conf->u.coc_layout.lb_len); if (IS_ERR(lsm)) RETURN(PTR_ERR(lsm)); + dump_lsm(D_INODE, lsm); } lov_conf_lock(lov); @@ -1544,6 +1533,9 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj, for (entry = start_entry; entry <= end_entry; entry++) { lsme = lsm->lsm_entries[entry]; + if (!lsme_inited(lsme)) + break; + if (entry == start_entry) fs.fs_ext.e_start = whole_start; else @@ -1752,6 +1744,9 @@ int 
lov_read_and_clear_async_rc(struct cl_object *clob) lsm->lsm_entries[i]; int j; + if (!lsme_inited(lse)) + break; + for (j = 0; j < lse->lsme_stripe_count; j++) { struct lov_oinfo *loi = lse->lsme_oinfo[j]; diff --git a/lustre/lov/lov_pack.c b/lustre/lov/lov_pack.c index db005e5..f79827b 100644 --- a/lustre/lov/lov_pack.c +++ b/lustre/lov/lov_pack.c @@ -169,6 +169,9 @@ ssize_t lov_lsm_pack_v1v3(const struct lov_stripe_md *lsm, void *buf, lmm_objects = lmmv1->lmm_objects; } + if (lsm->lsm_is_released) + RETURN(lmm_size); + for (i = 0; i < lsm->lsm_entries[0]->lsme_stripe_count; i++) { struct lov_oinfo *loi = lsm->lsm_entries[0]->lsme_oinfo[i]; @@ -213,11 +216,13 @@ ssize_t lov_lsm_pack(const struct lov_stripe_md *lsm, void *buf, for (entry = 0; entry < lsm->lsm_entry_count; entry++) { struct lov_stripe_md_entry *lsme; struct lov_mds_md *lmm; + __u16 stripecnt; lsme = lsm->lsm_entries[entry]; lcme = &lcmv1->lcm_entries[entry]; lcme->lcme_id = cpu_to_le32(lsme->lsme_id); + lcme->lcme_flags = cpu_to_le32(lsme->lsme_flags); lcme->lcme_extent.e_start = cpu_to_le64(lsme->lsme_extent.e_start); lcme->lcme_extent.e_end = @@ -244,7 +249,13 @@ ssize_t lov_lsm_pack(const struct lov_stripe_md *lsm, void *buf, ((struct lov_mds_md_v1 *)lmm)->lmm_objects; } - for (i = 0; i < lsme->lsme_stripe_count; i++) { + if (lsme_inited(lsme) && + !(lsme->lsme_pattern & LOV_PATTERN_F_RELEASED)) + stripecnt = lsme->lsme_stripe_count; + else + stripecnt = 0; + + for (i = 0; i < stripecnt; i++) { struct lov_oinfo *loi = lsme->lsme_oinfo[i]; ostid_cpu_to_le(&loi->loi_oi, &lmm_objects[i].l_ost_oi); @@ -254,8 +265,7 @@ ssize_t lov_lsm_pack(const struct lov_stripe_md *lsm, void *buf, cpu_to_le32(loi->loi_ost_idx); } - size = lov_mds_md_size(lsme->lsme_stripe_count, - lsme->lsme_magic); + size = lov_mds_md_size(stripecnt, lsme->lsme_magic); lcme->lcme_size = cpu_to_le32(size); offset += size; } /* for each layout component */ diff --git a/lustre/lov/lov_page.c b/lustre/lov/lov_page.c index 
ae74d25..19a908f 100644 --- a/lustre/lov/lov_page.c +++ b/lustre/lov/lov_page.c @@ -81,7 +81,7 @@ int lov_page_init_composite(const struct lu_env *env, struct cl_object *obj, offset = cl_offset(obj, index); entry = lov_lsm_entry(loo->lo_lsm, offset); - if (entry < 0) { + if (entry < 0 || !lsm_entry_inited(loo->lo_lsm, entry)) { /* non-existing layout component */ lov_page_init_empty(env, obj, page, index); RETURN(0); diff --git a/lustre/mdc/mdc_internal.h b/lustre/mdc/mdc_internal.h index e723b55..5ac91ef 100644 --- a/lustre/mdc/mdc_internal.h +++ b/lustre/mdc/mdc_internal.h @@ -89,6 +89,9 @@ int mdc_enqueue(struct obd_export *exp, struct ldlm_enqueue_info *einfo, int mdc_resource_get_unused(struct obd_export *exp, const struct lu_fid *fid, struct list_head *cancels, enum ldlm_mode mode, __u64 bits); +int mdc_save_lovea(struct ptlrpc_request *req, + const struct req_msg_field *field, + void *data, u32 size); /* mdc/mdc_request.c */ int mdc_fid_alloc(const struct lu_env *env, struct obd_export *exp, struct lu_fid *fid, struct md_op_data *op_data); diff --git a/lustre/mdc/mdc_locks.c b/lustre/mdc/mdc_locks.c index f683f02..35b96ce 100644 --- a/lustre/mdc/mdc_locks.c +++ b/lustre/mdc/mdc_locks.c @@ -214,20 +214,32 @@ static inline void mdc_clear_replay_flag(struct ptlrpc_request *req, int rc) * original open if the MDS crashed just when this client also OOM'd) * but this is incredibly unlikely, and questionable whether the client * could do MDS recovery under OOM anyways... */ -static void mdc_realloc_openmsg(struct ptlrpc_request *req, - struct mdt_body *body) +int mdc_save_lovea(struct ptlrpc_request *req, + const struct req_msg_field *field, + void *data, u32 size) { - int rc; + struct req_capsule *pill = &req->rq_pill; + void *lmm; + int rc = 0; - /* FIXME: remove this explicit offset. 
*/ - rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4, - body->mbo_eadatasize); - if (rc) { - CERROR("Can't enlarge segment %d size to %d\n", - DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize); - body->mbo_valid &= ~OBD_MD_FLEASIZE; - body->mbo_eadatasize = 0; + if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) { + rc = sptlrpc_cli_enlarge_reqbuf(req, field, size); + if (rc) { + CERROR("%s: Can't enlarge ea size to %d: rc = %d\n", + req->rq_export->exp_obd->obd_name, + size, rc); + return rc; + } + } else { + req_capsule_shrink(pill, field, size, RCL_CLIENT); } + + req_capsule_set_size(pill, field, RCL_CLIENT, size); + lmm = req_capsule_client_get(pill, field); + if (lmm) + memcpy(lmm, data, size); + + return rc; } static struct ptlrpc_request * @@ -454,7 +466,7 @@ static struct ptlrpc_request *mdc_intent_getattr_pack(struct obd_export *exp, static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp, struct lookup_intent *it, - struct md_op_data *unused) + struct md_op_data *op_data) { struct obd_device *obd = class_exp2obd(exp); struct ptlrpc_request *req; @@ -481,9 +493,9 @@ static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp, /* pack the layout intent request */ layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT); - /* LAYOUT_INTENT_ACCESS is generic, specific operation will be - * set for replication */ - layout->li_opc = LAYOUT_INTENT_ACCESS; + LASSERT(op_data->op_data != NULL); + LASSERT(op_data->op_data_size == sizeof(*layout)); + memcpy(layout, op_data->op_data, sizeof(*layout)); req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, obd->u.cli.cl_default_mds_easize); @@ -632,27 +644,16 @@ static int mdc_finish_enqueue(struct obd_export *exp, * (for example error one). 
*/ if ((it->it_op & IT_OPEN) && req->rq_replay) { - void *lmm; - if (req_capsule_get_size(pill, &RMF_EADATA, - RCL_CLIENT) < - body->mbo_eadatasize) - mdc_realloc_openmsg(req, body); - else - req_capsule_shrink(pill, &RMF_EADATA, - body->mbo_eadatasize, - RCL_CLIENT); - - req_capsule_set_size(pill, &RMF_EADATA, - RCL_CLIENT, - body->mbo_eadatasize); - - lmm = req_capsule_client_get(pill, &RMF_EADATA); - if (lmm) - memcpy(lmm, eadata, - body->mbo_eadatasize); + rc = mdc_save_lovea(req, &RMF_EADATA, eadata, + body->mbo_eadatasize); + if (rc) { + body->mbo_valid &= ~OBD_MD_FLEASIZE; + body->mbo_eadatasize = 0; + rc = 0; + } } } - } else if (it->it_op & IT_LAYOUT) { + } else if (it->it_op & IT_LAYOUT) { /* maybe the lock was granted right away and layout * is packed into RMF_DLM_LVB of req */ lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER); @@ -661,6 +662,15 @@ static int mdc_finish_enqueue(struct obd_export *exp, &RMF_DLM_LVB, lvb_len); if (lvb_data == NULL) RETURN(-EPROTO); + + /** + * save replied layout data to the request buffer for + * recovery consideration (lest MDS reinitialize + * another set of OST objects). 
+ */ + if (req->rq_transno) + (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data, + lvb_len); } } @@ -1035,13 +1045,13 @@ int mdc_revalidate_lock(struct obd_export *exp, struct lookup_intent *it, case IT_READDIR: policy.l_inodebits.bits = MDS_INODELOCK_UPDATE; break; - case IT_LAYOUT: - policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT; - break; - default: - policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP; - break; - } + case IT_LAYOUT: + policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT; + break; + default: + policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP; + break; + } mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid, LDLM_IBITS, &policy, diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 6f9105e..97404f6 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -384,32 +384,17 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, GOTO(out, rc = -EPROTO); if (body->mbo_valid & OBD_MD_FLEASIZE) { - void *eadata, *lmm; + void *eadata; eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD, body->mbo_eadatasize); if (eadata == NULL) GOTO(out, rc = -EPROTO); - if (req_capsule_get_size(pill, &RMF_EADATA, - RCL_CLIENT) < - body->mbo_eadatasize) { - rc = sptlrpc_cli_enlarge_reqbuf(req, 4, - body->mbo_eadatasize); - if (rc) - GOTO(out, rc = -ENOMEM); - } else { - req_capsule_shrink(pill, &RMF_EADATA, - body->mbo_eadatasize, - RCL_CLIENT); - } - - req_capsule_set_size(pill, &RMF_EADATA, RCL_CLIENT, - body->mbo_eadatasize); - - lmm = req_capsule_client_get(pill, &RMF_EADATA); - if (lmm) - memcpy(lmm, eadata, body->mbo_eadatasize); + rc = mdc_save_lovea(req, &RMF_EADATA, eadata, + body->mbo_eadatasize); + if (rc) + GOTO(out, rc); } } out: diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index dd3e494..57b1234 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -1227,6 +1227,24 @@ out: } /** + * Handler of layout intent RPC requiring the layout modification + * + * \param info [in] 
thread environment + * \param obj [in] object + * \param layout [in] layout intent + * + * \retval 0 on success + * \retval < 0 error code + */ +static int mdt_layout_change(struct mdt_thread_info *info, + struct mdt_object *obj, + struct layout_intent *layout) +{ + /* XXX: to do */ + return 0; +} + +/** * Exchange MOF_LOV_CREATED flags between two objects after a * layout swap. No assumption is made on whether o1 or o2 have * created objects or not. @@ -3437,6 +3455,7 @@ static int mdt_intent_layout(enum mdt_it_code opcode, struct layout_intent *layout; struct lu_fid *fid; struct mdt_object *obj = NULL; + bool layout_change = false; int layout_size = 0; int rc = 0; ENTRY; @@ -3451,11 +3470,29 @@ static int mdt_intent_layout(enum mdt_it_code opcode, if (layout == NULL) RETURN(-EPROTO); - if (layout->li_opc != LAYOUT_INTENT_ACCESS) { + switch (layout->li_opc) { + case LAYOUT_INTENT_TRUNC: + case LAYOUT_INTENT_WRITE: + layout_change = true; + break; + case LAYOUT_INTENT_ACCESS: + break; + case LAYOUT_INTENT_READ: + case LAYOUT_INTENT_GLIMPSE: + case LAYOUT_INTENT_RELEASE: + case LAYOUT_INTENT_RESTORE: CERROR("%s: Unsupported layout intent opc %d\n", mdt_obd_name(info->mti_mdt), layout->li_opc); - RETURN(-EINVAL); + rc = -ENOTSUPP; + break; + default: + CERROR("%s: Unknown layout intent opc %d\n", + mdt_obd_name(info->mti_mdt), layout->li_opc); + rc = -EINVAL; + break; } + if (rc < 0) + RETURN(rc); fid = &info->mti_tmp_fid2; fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name); @@ -3480,8 +3517,14 @@ static int mdt_intent_layout(enum mdt_it_code opcode, req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER, layout_size); rc = req_capsule_server_pack(info->mti_pill); - GOTO(out_obj, rc); + if (rc) + GOTO(out_obj, rc); + if (layout_change) { + rc = mdt_layout_change(info, obj, layout); + if (rc) + GOTO(out_obj, rc); + } out_obj: mdt_object_put(info->mti_env, obj); diff --git a/lustre/obdclass/genops.c b/lustre/obdclass/genops.c index 3ed4c3a..c62e2e9 
100644 --- a/lustre/obdclass/genops.c +++ b/lustre/obdclass/genops.c @@ -2224,6 +2224,16 @@ static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli, return avail; } +static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it) +{ + if (it != NULL && + (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP || + it->it_op == IT_READDIR || + (it->it_op == IT_LAYOUT && !(it->it_flags & FMODE_WRITE)))) + return true; + return false; +} + /* Get a modify RPC slot from the obd client @cli according * to the kind of operation @opc that is going to be sent * and the intent @it of the operation if it applies. @@ -2242,8 +2252,7 @@ __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc, /* read-only metadata RPCs don't consume a slot on MDT * for reply reconstruction */ - if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP || - it->it_op == IT_LAYOUT || it->it_op == IT_READDIR)) + if (obd_skip_mod_rpc_slot(it)) return 0; if (opc == MDS_CLOSE) @@ -2289,8 +2298,7 @@ void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, { bool close_req = false; - if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP || - it->it_op == IT_LAYOUT || it->it_op == IT_READDIR)) + if (obd_skip_mod_rpc_slot(it)) return; if (opc == MDS_CLOSE) diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index 1a7f069..5928306 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -1929,17 +1929,17 @@ EXPORT_SYMBOL(req_capsule_server_pack); * Returns the PTLRPC request or reply (\a loc) buffer offset of a \a pill * corresponding to the given RMF (\a field). 
*/ -static __u32 __req_capsule_offset(const struct req_capsule *pill, - const struct req_msg_field *field, - enum req_location loc) +__u32 __req_capsule_offset(const struct req_capsule *pill, + const struct req_msg_field *field, + enum req_location loc) { unsigned int offset; - offset = field->rmf_offset[pill->rc_fmt->rf_idx][loc]; - LASSERTF(offset > 0, "%s:%s, off=%d, loc=%d\n", - pill->rc_fmt->rf_name, - field->rmf_name, offset, loc); - offset --; + offset = field->rmf_offset[pill->rc_fmt->rf_idx][loc]; + LASSERTF(offset > 0, "%s:%s, off=%d, loc=%d\n", + pill->rc_fmt->rf_name, + field->rmf_name, offset, loc); + offset--; LASSERT(offset < REQ_MAX_FIELD_NR); return offset; diff --git a/lustre/ptlrpc/ptlrpc_internal.h b/lustre/ptlrpc/ptlrpc_internal.h index 1dbc82d..5da292f 100644 --- a/lustre/ptlrpc/ptlrpc_internal.h +++ b/lustre/ptlrpc/ptlrpc_internal.h @@ -308,6 +308,11 @@ void sptlrpc_conf_fini(void); int sptlrpc_init(void); void sptlrpc_fini(void); +/* layout.c */ +__u32 __req_capsule_offset(const struct req_capsule *pill, + const struct req_msg_field *field, + enum req_location loc); + static inline bool ptlrpc_recoverable_error(int rc) { return (rc == -ENOTCONN || rc == -ENODEV); diff --git a/lustre/ptlrpc/sec.c b/lustre/ptlrpc/sec.c index 79237a0..7037b91 100644 --- a/lustre/ptlrpc/sec.c +++ b/lustre/ptlrpc/sec.c @@ -1657,11 +1657,14 @@ EXPORT_SYMBOL(_sptlrpc_enlarge_msg_inplace); * so caller should refresh its local pointers if needed. 
*/ int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req, - int segment, int newsize) -{ - struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx; - struct ptlrpc_sec_cops *cops; - struct lustre_msg *msg = req->rq_reqmsg; + const struct req_msg_field *field, + int newsize) +{ + struct req_capsule *pill = &req->rq_pill; + struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx; + struct ptlrpc_sec_cops *cops; + struct lustre_msg *msg = req->rq_reqmsg; + int segment = __req_capsule_offset(pill, field, RCL_CLIENT); LASSERT(ctx); LASSERT(msg); diff --git a/lustre/tests/sanity-pfl.sh b/lustre/tests/sanity-pfl.sh index 2720a1a..805344a 100644 --- a/lustre/tests/sanity-pfl.sh +++ b/lustre/tests/sanity-pfl.sh @@ -47,6 +47,9 @@ test_0() { $LFS setstripe -E 1m -S 1M -c 1 -E -1 -c 1 $comp_file || error "Create $comp_file failed" + #instantiate all components, so that objs are allocated + dd if=/dev/zero of=$comp_file bs=1k count=1 seek=1k + local ost_idx1=$($LFS getstripe -I 1 -i $comp_file) local ost_idx2=$($LFS getstripe -I 2 -i $comp_file) @@ -67,6 +70,9 @@ test_1() { $LFS setstripe -E 1m -S 1m -o 0 -E -1 -o 0 $comp_file || error "Create $comp_file failed" + #instantiate all components, so that objs are allocated + dd if=/dev/zero of=$comp_file bs=1k count=1 seek=1k + local ost_idx1=$($LFS getstripe -I 1 -i $comp_file) local ost_idx2=$($LFS getstripe -I 2 -i $comp_file) @@ -97,8 +103,8 @@ test_2() { dd if=/dev/zero of=$comp_file bs=1M count=2 > /dev/null 2>&1 && error "Write beyond component should fail" - dd if=$comp_file of=/dev/null bs=1M count=2 > /dev/null 2>&1 && - error "Read beyond component should fail" + dd if=$comp_file of=/dev/null bs=1M count=2 > /dev/null 2>&1 || + error "Read beyond component should short read, not fail" $LFS setstripe --component-add -E 2M -c 1 $comp_file || error "Add component to $comp_file failed" @@ -166,6 +172,9 @@ test_3() { $LFS setstripe -E 1M -E 16M -E -1 $comp_file || error "Create second $comp_file failed" + #instantiate all components, so that 
objs are allocated + dd if=/dev/zero of=$comp_file bs=1k count=1 seek=16k + del_comp_and_verify $comp_file "init" 0 0 rm -f $comp_file || error "Delete second $comp_file failed" } @@ -195,6 +204,9 @@ test_5() { local comp_cnt=$($LFS getstripe --component-count $comp_file) [ $comp_cnt -ne 2 ] && error "file $comp_cnt != 2" + #instantiate all components, so that objs are allocated + dd if=/dev/zero of=$comp_file bs=1k count=1 seek=64k + local ost_idx=$($LFS getstripe -I 1 -i $comp_file) [ $ost_idx -ne 0 ] && error "component 1 ost_idx $ost_idx != 0" diff --git a/lustre/utils/liblustreapi.c b/lustre/utils/liblustreapi.c index abcf0df..04bf813 100644 --- a/lustre/utils/liblustreapi.c +++ b/lustre/utils/liblustreapi.c @@ -2105,6 +2105,7 @@ enum lov_dump_flags { LDF_IS_DIR = 0x0001, LDF_IS_RAW = 0x0002, LDF_INDENT = 0x0004, + LDF_SKIP_OBJS = 0x0008, }; static void lov_dump_user_lmm_header(struct lov_user_md *lum, char *path, @@ -2115,6 +2116,7 @@ static void lov_dump_user_lmm_header(struct lov_user_md *lum, char *path, bool is_dir = flags & LDF_IS_DIR; bool is_raw = flags & LDF_IS_RAW; bool indent = flags & LDF_INDENT; + bool skip_objs = flags & LDF_SKIP_OBJS; char *prefix = is_dir ? "" : "lmm_"; char *separator = ""; char *space = indent ? " " : ""; @@ -2245,7 +2247,7 @@ static void lov_dump_user_lmm_header(struct lov_user_md *lum, char *path, if (verbose & ~VERBOSE_OFFSET) llapi_printf(LLAPI_MSG_NORMAL, "%s%sstripe_offset: ", space, prefix); - if (is_dir) + if (is_dir || skip_objs) llapi_printf(LLAPI_MSG_NORMAL, "%d", lum->lmm_stripe_offset == (typeof(lum->lmm_stripe_offset))(-1) ? -1 : @@ -2276,6 +2278,7 @@ void lov_dump_user_lmm_v1v3(struct lov_user_md *lum, char *pool_name, { bool is_dir = flags & LDF_IS_DIR; bool indent = flags & LDF_INDENT; + bool skip_objs = flags & LDF_SKIP_OBJS; int i, obdstripe = (obdindex != OBD_NOT_FOUND) ? 
0 : 1; if (!obdstripe) { @@ -2293,7 +2296,7 @@ void lov_dump_user_lmm_v1v3(struct lov_user_md *lum, char *pool_name, lov_dump_user_lmm_header(lum, path, objects, header, depth, pool_name, flags); - if (!is_dir && (header & VERBOSE_OBJID) && + if (!is_dir && !skip_objs && (header & VERBOSE_OBJID) && !(lum->lmm_pattern & LOV_PATTERN_F_RELEASED)) { char *space = " - "; @@ -2720,6 +2723,11 @@ static void lov_dump_comp_v1(struct find_param *param, char *path, !(param->fp_comp_flags & entry->lcme_flags)) continue; + if (entry->lcme_flags & LCME_FL_INIT) + flags &= ~LDF_SKIP_OBJS; + else + flags |= LDF_SKIP_OBJS; + if (param->fp_check_comp_id && param->fp_comp_id != entry->lcme_id) continue; diff --git a/lustre/utils/liblustreapi_layout.c b/lustre/utils/liblustreapi_layout.c index 813e400..6136cff 100644 --- a/lustre/utils/liblustreapi_layout.c +++ b/lustre/utils/liblustreapi_layout.c @@ -799,8 +799,13 @@ static bool llapi_layout_lum_valid(struct lov_user_md *lum, int lum_size) } obj_count = llapi_layout_objects_in_lum(lum, lum_size); - if (obj_count != lum->lmm_stripe_count) + if (comp_v1) { + if (!(comp_v1->lcm_entries[i].lcme_flags & + LCME_FL_INIT) && obj_count != 0) + return false; + } else if (obj_count != lum->lmm_stripe_count) { return false; + } } return true; } @@ -1710,7 +1715,6 @@ int llapi_layout_comp_add(struct llapi_layout *layout) llc_list); /* Inherit some attributes from existing component */ - new->llc_pattern = comp->llc_pattern; new->llc_stripe_size = comp->llc_stripe_size; new->llc_stripe_count = comp->llc_stripe_count; if (new->llc_extent.e_end <= last->llc_extent.e_end) {