From 89693927f0b065d44fdc496f6b49539118570104 Mon Sep 17 00:00:00 2001 From: Bobi Jam Date: Thu, 6 Apr 2017 07:56:43 +0800 Subject: [PATCH] LU-8998 lod: accomodate to composite layout Modify the LOD to make it support the composite layout: - Introduced lod_layout_component structure to represent each individual layout component, and added ldo_comp_entries and ldo_comp_cnt in the lod_object structure to hold the striping information for composite layout. - Object allocation code is adjusted to not only check the used OSTs in this round of allocation, but also the used OSTs in the existing layout components. - Support special xattr names to add/set/delete layout component(s). - Store default layout xattr on the filesystem root inode. Reviewed-on: https://review.whamcloud.com/24823 Change-Id: I73736df1cd529a3b0e32b2bb6a09b5c436e08c86 Signed-off-by: Niu Yawei Signed-off-by: Bobi Jam Reviewed-by: Andreas Dilger Reviewed-by: Lai Siyao Reviewed-by: Jinshan Xiong --- libcfs/include/libcfs/libcfs_debug.h | 4 +- lustre/include/lustre/lustre_idl.h | 9 +- lustre/include/lustre/lustre_user.h | 8 +- lustre/lod/lod_dev.c | 7 + lustre/lod/lod_internal.h | 194 +++-- lustre/lod/lod_lov.c | 705 +++++++++++++--- lustre/lod/lod_object.c | 1505 +++++++++++++++++++++++++++------- lustre/lod/lod_qos.c | 648 ++++++++++----- lustre/mdc/mdc_request.c | 41 +- lustre/mdd/mdd_object.c | 135 ++- lustre/mdt/mdt_handler.c | 53 +- lustre/mdt/mdt_internal.h | 1 + lustre/mdt/mdt_lib.c | 16 +- lustre/mdt/mdt_open.c | 40 +- lustre/mdt/mdt_xattr.c | 58 +- lustre/ptlrpc/layout.c | 8 +- lustre/ptlrpc/pack_generic.c | 55 +- 17 files changed, 2756 insertions(+), 731 deletions(-) diff --git a/libcfs/include/libcfs/libcfs_debug.h b/libcfs/include/libcfs/libcfs_debug.h index eff16dc..2eb6b7a 100644 --- a/libcfs/include/libcfs/libcfs_debug.h +++ b/libcfs/include/libcfs/libcfs_debug.h @@ -154,13 +154,15 @@ struct ptldebug_header { #define D_LFSCK 0x10000000 /* For both OI scrub and LFSCK */ #define D_HSM 
0x20000000 #define D_SNAPSHOT 0x40000000 /* snapshot */ +#define D_LAYOUT 0x80000000 #define LIBCFS_DEBUG_MASKS_NAMES { \ "trace", "inode", "super", "ext2", "malloc", "cache", "info", \ "ioctl", "neterror", "net", "warning", "buffs", "other", \ "dentry", "nettrace", "page", "dlmtrace", "error", "emerg", \ "ha", "rpctrace", "vfstrace", "reada", "mmap", "config", \ - "console", "quota", "sec", "lfsck", "hsm", "snapshot", NULL } + "console", "quota", "sec", "lfsck", "hsm", "snapshot", "layout",\ + NULL } #define D_CANTMASK (D_ERROR | D_EMERG | D_WARNING | D_CONSOLE) diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index ea1814c..c31a3c4 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -962,9 +962,14 @@ enum obdo_flags { * depending on the case (replay uses ready striping, non-replay req uses * hints), so MDT replaces magic with appropriate one and now LOD can * easily understand what's inside -bzzz + * + * those *_DEF magics are only used on server side internally, they + * won't be put on wire or disk. 
*/ -#define LOV_MAGIC_V1_DEF 0x0CD10BD0 -#define LOV_MAGIC_V3_DEF 0x0CD30BD0 +#define LOV_MAGIC_DEF 0x10000000 +#define LOV_MAGIC_V1_DEF (LOV_MAGIC_DEF | LOV_MAGIC_V1) +#define LOV_MAGIC_V3_DEF (LOV_MAGIC_DEF | LOV_MAGIC_V3) +#define LOV_MAGIC_COMP_V1_DEF (LOV_MAGIC_DEF | LOV_MAGIC_COMP_V1) #define lov_pattern(pattern) (pattern & ~LOV_PATTERN_F_MASK) #define lov_pattern_flags(pattern) (pattern & LOV_PATTERN_F_MASK) diff --git a/lustre/include/lustre/lustre_user.h b/lustre/include/lustre/lustre_user.h index 6b8df13..4e30017 100644 --- a/lustre/include/lustre/lustre_user.h +++ b/lustre/include/lustre/lustre_user.h @@ -347,6 +347,12 @@ enum ll_lease_type { #define LOV_PATTERN_F_HOLE 0x40000000 /* there is hole in LOV EA */ #define LOV_PATTERN_F_RELEASED 0x80000000 /* HSM released file */ +static inline bool lov_pattern_supported(__u32 pattern) +{ + return pattern == LOV_PATTERN_RAID0 || + pattern == (LOV_PATTERN_RAID0 | LOV_PATTERN_F_RELEASED); +} + #define LOV_MAXPOOLNAME 15 #define LOV_POOLNAMEF "%.15s" @@ -425,7 +431,7 @@ enum lov_comp_md_entry_flags { /* lcme_id can be specified as certain flags, and the the first * bit of lcme_id is used to indicate that the ID is representing - * certain lcme_flags but not a real ID. Which implies we can have + * certain LCME_FL_* but not a real ID. Which implies we can have * at most 31 flags (see LCME_FL_XXX). */ enum lcme_id { LCME_ID_INVAL = 0x0, diff --git a/lustre/lod/lod_dev.c b/lustre/lod/lod_dev.c index 0e2d0c2..68065a5 100644 --- a/lustre/lod/lod_dev.c +++ b/lustre/lod/lod_dev.c @@ -1829,6 +1829,9 @@ static void lod_key_fini(const struct lu_context *ctx, struct lu_context_key *key, void *data) { struct lod_thread_info *info = data; + struct lod_layout_component *lds = + info->lti_def_striping.lds_def_comp_entries; + /* allocated in lod_get_lov_ea * XXX: this is overload, a tread may have such store but used only * once. Probably better would be pool of such stores per LOD. 
@@ -1839,6 +1842,10 @@ static void lod_key_fini(const struct lu_context *ctx, info->lti_ea_store_size = 0; } lu_buf_free(&info->lti_linkea_buf); + + if (lds != NULL) + lod_free_def_comp_entries(&info->lti_def_striping); + OBD_FREE_PTR(info); } diff --git a/lustre/lod/lod_internal.h b/lustre/lod/lod_internal.h index 0cc8d8b..90f25f7 100644 --- a/lustre/lod/lod_internal.h +++ b/lustre/lod/lod_internal.h @@ -229,78 +229,126 @@ struct lod_device { #define ltd_mdt ltd_tgt #define lod_mdt_desc lod_tgt_desc +struct lod_layout_component { + struct lu_extent llc_extent; + __u32 llc_id; + __u32 llc_flags; + __u32 llc_stripe_size; + __u32 llc_pattern; + __u16 llc_layout_gen; + __u16 llc_stripe_offset; + __u16 llc_stripenr; + __u16 llc_stripes_allocated; + char *llc_pool; + struct dt_object **llc_stripe; +}; + struct lod_default_striping { /* default LOV */ - __u32 lds_def_stripe_size; - __u16 lds_def_stripenr; - __u16 lds_def_stripe_offset; - char lds_def_pool[LOV_MAXPOOLNAME + 1]; + /* current layout component count */ + __u16 lds_def_comp_cnt; + /* the largest comp count ever used */ + __u32 lds_def_comp_size_cnt; + struct lod_layout_component *lds_def_comp_entries; /* default LMV */ - __u32 lds_dir_def_stripenr; - __u32 lds_dir_def_stripe_offset; - __u32 lds_dir_def_hash_type; - /* flags whether default striping is set */ - __u32 lds_def_striping_set:1, - lds_dir_def_striping_set:1; + __u32 lds_dir_def_stripenr; + __u32 lds_dir_def_stripe_offset; + __u32 lds_dir_def_hash_type; + /* default file striping flags (LOV) */ + __u32 lds_def_striping_set:1, + lds_def_striping_is_composite:1, + /* default dir striping flags (LMV) */ + lds_dir_def_striping_set:1; }; struct lod_object { - struct dt_object ldo_obj; + struct dt_object ldo_obj; union { - /* file stripe */ + /* file stripe (LOV) */ struct { - /* - * don't change field order, because both file and - * directory use ldo_stripenr/ldo_stripes_allocated - * to access stripe number. 
- */ - __u16 ldo_stripenr; - __u16 ldo_stripes_allocated; - __u16 ldo_layout_gen; - __u16 ldo_released_stripenr; - __u32 ldo_pattern; - __u32 ldo_stripe_size; - __u16 ldo_stripe_offset; - char *ldo_pool; + __u32 ldo_layout_gen; + /* Layout component count for a regular file. + * It equals to 1 for non-composite layout. */ + __u16 ldo_comp_cnt; + __u32 ldo_is_composite:1, + ldo_comp_cached:1; }; - /* directory stripe */ + /* directory stripe (LMV) */ struct { - __u16 ldo_dir_stripenr; - __u16 ldo_dir_stripes_allocated; - __u32 ldo_dir_stripe_offset; - __u32 ldo_dir_hash_type; - __u32 ldo_dir_slave_stripe:1, - ldo_dir_striped:1; + /* Slave stripe count for striped directory. */ + __u16 ldo_dir_stripenr; + /* How many stripes allocated for a striped directory */ + __u16 ldo_dir_stripes_allocated; + __u32 ldo_dir_stripe_offset; + __u32 ldo_dir_hash_type; + /* Is a slave stripe of striped directory? */ + __u32 ldo_dir_slave_stripe:1, + ldo_dir_striped:1; /* * default striping is not cached, so this field is * invalid after create, make sure it's used by * lod_dir_striping_create_internal() only. 
*/ - struct lod_default_striping *ldo_def_striping; + struct lod_default_striping *ldo_def_striping; }; }; - struct dt_object **ldo_stripe; + /* file stripe (LOV) */ + struct lod_layout_component *ldo_comp_entries; + /* slave stripes of striped directory (LMV)*/ + struct dt_object **ldo_stripe; }; -static inline int lod_object_set_pool(struct lod_object *lo, const char *pool) +static inline int lod_set_pool(char **pool, const char *new_pool) { int len; - if (lo->ldo_pool != NULL) { - len = strlen(lo->ldo_pool) + 1; - OBD_FREE(lo->ldo_pool, len); - lo->ldo_pool = NULL; + if (*pool == new_pool) + return 0; + + if (*pool != NULL) { + len = strlen(*pool) + 1; + OBD_FREE(*pool, len); + *pool = NULL; } - if (pool != NULL) { - len = strlen(pool) + 1; - OBD_ALLOC(lo->ldo_pool, len); - if (lo->ldo_pool == NULL) + if (new_pool != NULL) { + len = strlen(new_pool) + 1; + OBD_ALLOC(*pool, len); + if (*pool == NULL) return -ENOMEM; - strlcpy(lo->ldo_pool, pool, len); + strlcpy(*pool, new_pool, len); } return 0; } +static inline int lod_set_def_pool(struct lod_default_striping *lds, + int i, const char *new_pool) +{ + return lod_set_pool(&lds->lds_def_comp_entries[i].llc_pool, + new_pool); +} + +static inline int lod_obj_set_pool(struct lod_object *lo, int i, + const char *new_pool) +{ + return lod_set_pool(&lo->ldo_comp_entries[i].llc_pool, + new_pool); +} + +/* + * Layout generation is used to generate unique component ID, to check ID + * collision, we preserve the highest bit of the layout generation when it + * wrapped. 
+ */ +static inline void lod_obj_inc_layout_gen(struct lod_object *lo) +{ + __u32 preserve = lo->ldo_layout_gen & LCME_ID_NONE; + lo->ldo_layout_gen++; + lo->ldo_layout_gen |= preserve; + /* Zero is not a valid component ID */ + if (unlikely((lo->ldo_layout_gen & LCME_ID_MASK) == 0)) + lo->ldo_layout_gen++; +} + struct lod_it { struct dt_object *lit_obj; /* object from the layer below */ /* stripe offset of iteration */ @@ -392,6 +440,26 @@ static inline struct dt_object* lod_object_child(struct lod_object *o) struct dt_object, do_lu); } +static inline bool lod_obj_is_striped(struct dt_object *dt) +{ + struct lod_object *lo = lod_dt_obj(dt); + int i; + + if (!dt_object_exists(dt_object_child(dt))) + return false; + + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) + return lo->ldo_dir_stripenr != 0; + + for (i = 0; i < lo->ldo_comp_cnt; i++) { + if (lo->ldo_comp_entries[i].llc_stripe == NULL) + continue; + LASSERT(lo->ldo_comp_entries[i].llc_stripenr > 0); + return true; + } + return false; +} + extern struct lu_context_key lod_thread_key; static inline struct lod_thread_info *lod_env_info(const struct lu_env *env) @@ -477,12 +545,17 @@ int lod_parse_striping(const struct lu_env *env, struct lod_object *mo, int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo, const struct lu_buf *buf); int lod_initialize_objects(const struct lu_env *env, struct lod_object *mo, - struct lov_ost_data_v1 *objs); + struct lov_ost_data_v1 *objs, int index); int lod_verify_striping(struct lod_device *d, const struct lu_buf *buf, - bool is_from_disk); -int lod_generate_and_set_lovea(const struct lu_env *env, - struct lod_object *mo, struct thandle *th); + bool is_from_disk, __u64 start); +int lod_generate_lovea(const struct lu_env *env, struct lod_object *lo, + struct lov_mds_md *lmm, int *lmm_size, bool is_dir); int lod_ea_store_resize(struct lod_thread_info *info, size_t size); +int lod_def_striping_comp_resize(struct lod_default_striping *lds, __u16 count); +void 
lod_free_def_comp_entries(struct lod_default_striping *lds); +void lod_free_comp_entries(struct lod_object *lo); +int lod_alloc_comp_entries(struct lod_object *lo, int cnt); + /* lod_pool.c */ int lod_ost_pool_add(struct ost_pool *op, __u32 idx, unsigned int min_count); int lod_ost_pool_remove(struct ost_pool *op, __u32 idx); @@ -499,12 +572,14 @@ int lod_pool_add(struct obd_device *obd, char *poolname, char *ostname); int lod_pool_remove(struct obd_device *obd, char *poolname, char *ostname); /* lod_qos.c */ -int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo, - struct lu_attr *attr, const struct lu_buf *buf, - struct thandle *th); +int lod_prepare_create(const struct lu_env *env, struct lod_object *lo, + struct lu_attr *attr, const struct lu_buf *buf, + struct thandle *th); int qos_add_tgt(struct lod_device*, struct lod_tgt_desc *); int qos_del_tgt(struct lod_device *, struct lod_tgt_desc *); void lod_qos_rr_init(struct lod_qos_rr *lqr); +int lod_use_defined_striping(const struct lu_env *, struct lod_object *, + const struct lu_buf *); /* lproc_lod.c */ int lod_procfs_init(struct lod_device *lod); @@ -523,6 +598,23 @@ int lod_striping_create(const struct lu_env *env, struct dt_object *dt, struct thandle *th); void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo); +struct lod_obj_stripe_cb_data { + union { + const struct lu_attr *locd_attr; + struct ost_pool *locd_inuse; + }; + bool locd_declare; +}; + +typedef int (*lod_obj_stripe_cb_t)(const struct lu_env *env, + struct lod_object *lo, struct dt_object *dt, + struct thandle *th, int stripe_idx, + struct lod_obj_stripe_cb_data *data); + +int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, + struct thandle *th, lod_obj_stripe_cb_t cb, + struct lod_obj_stripe_cb_data *data); + /* lod_sub_object.c */ struct thandle *lod_sub_get_thandle(const struct lu_env *env, struct thandle *th, diff --git a/lustre/lod/lod_lov.c b/lustre/lod/lod_lov.c index 
fea2d33..dad82bc 100644 --- a/lustre/lod/lod_lov.c +++ b/lustre/lod/lod_lov.c @@ -617,117 +617,354 @@ int lod_ea_store_resize(struct lod_thread_info *info, size_t size) RETURN(0); } +static void lod_free_comp_buffer(struct lod_layout_component *entries, + __u16 count, __u32 bufsize) +{ + struct lod_layout_component *entry; + int i; + + for (i = 0; i < count; i++) { + entry = &entries[i]; + if (entry->llc_pool != NULL) + lod_set_pool(&entry->llc_pool, NULL); + LASSERT(entry->llc_stripe == NULL); + LASSERT(entry->llc_stripes_allocated == 0); + } + + if (bufsize != 0) + OBD_FREE_LARGE(entries, bufsize); +} + +void lod_free_def_comp_entries(struct lod_default_striping *lds) +{ + lod_free_comp_buffer(lds->lds_def_comp_entries, + lds->lds_def_comp_size_cnt, + size_roundup_power2( + sizeof(*lds->lds_def_comp_entries) * + lds->lds_def_comp_size_cnt)); + lds->lds_def_comp_entries = NULL; + lds->lds_def_comp_cnt = 0; + lds->lds_def_striping_is_composite = 0; + lds->lds_def_comp_size_cnt = 0; +} + /** - * Make LOV EA for striped object. + * Resize per-thread storage to hold default striping component entries + * + * A helper function to resize per-thread temporary storage. This storage + * is used to hold default LOV/LVM EAs and may be quite large. We do not want + * to allocate/release it every time, so instead we put it into the env and + * reallocate it on demand. The memory is released when the correspondent + * thread is finished. * - * Generate striping information and store it in the LOV EA of the given - * object. The caller must ensure nobody else is calling the function - * against the object concurrently. The transaction must be started. - * FLDB service must be running as well; it's used to map FID to the target, - * which is stored in LOV EA. 
+ * \param[in,out] lds default striping + * \param[in] count new component count to grow the buffer to + + * \retval 0 on success, -ENOMEM if reallocation failed + */ +int lod_def_striping_comp_resize(struct lod_default_striping *lds, __u16 count) +{ + struct lod_layout_component *entries; + __u32 new = size_roundup_power2(sizeof(*lds->lds_def_comp_entries) * + count); + __u32 old = size_roundup_power2(sizeof(*lds->lds_def_comp_entries) * + lds->lds_def_comp_size_cnt); + + if (new <= old) + return 0; + + OBD_ALLOC_LARGE(entries, new); + if (entries == NULL) + return -ENOMEM; + + if (lds->lds_def_comp_entries != NULL) { + CDEBUG(D_INFO, "default striping component size %d is not " + "enough, need %d\n", old, new); + lod_free_def_comp_entries(lds); + } + + lds->lds_def_comp_entries = entries; + lds->lds_def_comp_size_cnt = count; + + RETURN(0); +} + +void lod_free_comp_entries(struct lod_object *lo) +{ + lod_free_comp_buffer(lo->ldo_comp_entries, + lo->ldo_comp_cnt, + sizeof(*lo->ldo_comp_entries) * lo->ldo_comp_cnt); + lo->ldo_comp_entries = NULL; + lo->ldo_comp_cnt = 0; + lo->ldo_is_composite = 0; +} + +int lod_alloc_comp_entries(struct lod_object *lo, int cnt) +{ + LASSERT(cnt != 0); + LASSERT(lo->ldo_comp_cnt == 0 && lo->ldo_comp_entries == NULL); + + OBD_ALLOC_LARGE(lo->ldo_comp_entries, + sizeof(*lo->ldo_comp_entries) * cnt); + if (lo->ldo_comp_entries == NULL) + return -ENOMEM; + lo->ldo_comp_cnt = cnt; + return 0; +} + +/** + * Generate on-disk lov_mds_md structure for each layout component based on + * the information in lod_object->ldo_comp_entries[i]. * * \param[in] env execution environment for this thread * \param[in] lo LOD object - * \param[in] th transaction handle + * \param[in] comp_idx index of ldo_comp_entries + * \param[in] lmm buffer to contain the on-disk lov_mds_md + * \param[in|out] lmm_size buffer size/lmm size + * \param[in] is_dir generate lov ea for dir or file? 
For dir case, + * the stripe info is from the default stripe + * template, which is collected in lod_ah_init(), + * either from parent object or root object; for + * file case, it's from the @lo object * - * \retval 0 if LOV EA is stored successfully + * \retval 0 if on disk structure is created successfully * \retval negative error number on failure */ -int lod_generate_and_set_lovea(const struct lu_env *env, - struct lod_object *lo, struct thandle *th) +static int lod_gen_component_ea(const struct lu_env *env, + struct lod_object *lo, int comp_idx, + struct lov_mds_md *lmm, int *lmm_size, + bool is_dir) { struct lod_thread_info *info = lod_env_info(env); - struct dt_object *next = dt_object_child(&lo->ldo_obj); const struct lu_fid *fid = lu_object_fid(&lo->ldo_obj.do_lu); - struct lov_mds_md_v1 *lmm; + struct lod_device *lod; struct lov_ost_data_v1 *objs; - __u32 magic; - int i, rc; - size_t lmm_size; + struct lod_layout_component *lod_comp; + __u32 magic; + int i, rc = 0; ENTRY; LASSERT(lo); + if (is_dir) + lod_comp = + &lo->ldo_def_striping->lds_def_comp_entries[comp_idx]; + else + lod_comp = &lo->ldo_comp_entries[comp_idx]; - magic = lo->ldo_pool != NULL ? LOV_MAGIC_V3 : LOV_MAGIC_V1; - lmm_size = lov_mds_md_size(lo->ldo_stripenr, magic); - if (info->lti_ea_store_size < lmm_size) { - rc = lod_ea_store_resize(info, lmm_size); - if (rc) - RETURN(rc); - } - - if (lo->ldo_pattern == 0) /* default striping */ - lo->ldo_pattern = LOV_PATTERN_RAID0; - - lmm = info->lti_ea_store; + magic = lod_comp->llc_pool != NULL ? 
LOV_MAGIC_V3 : LOV_MAGIC_V1; + if (lod_comp->llc_pattern == 0) /* default striping */ + lod_comp->llc_pattern = LOV_PATTERN_RAID0; lmm->lmm_magic = cpu_to_le32(magic); - lmm->lmm_pattern = cpu_to_le32(lo->ldo_pattern); + lmm->lmm_pattern = cpu_to_le32(lod_comp->llc_pattern); fid_to_lmm_oi(fid, &lmm->lmm_oi); if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_LMMOI)) lmm->lmm_oi.oi.oi_id++; lmm_oi_cpu_to_le(&lmm->lmm_oi, &lmm->lmm_oi); - lmm->lmm_stripe_size = cpu_to_le32(lo->ldo_stripe_size); - lmm->lmm_stripe_count = cpu_to_le16(lo->ldo_stripenr); - if (lo->ldo_pattern & LOV_PATTERN_F_RELEASED) - lmm->lmm_stripe_count = cpu_to_le16(lo->ldo_released_stripenr); - lmm->lmm_layout_gen = 0; + + lmm->lmm_stripe_size = cpu_to_le32(lod_comp->llc_stripe_size); + lmm->lmm_stripe_count = cpu_to_le16(lod_comp->llc_stripenr); + /* for dir, lmm_layout_gen stores default stripe offset. */ + lmm->lmm_layout_gen = is_dir ? + cpu_to_le16(lod_comp->llc_stripe_offset) : + cpu_to_le16(lod_comp->llc_layout_gen); + if (magic == LOV_MAGIC_V1) { objs = &lmm->lmm_objects[0]; } else { - struct lov_mds_md_v3 *v3 = (struct lov_mds_md_v3 *) lmm; - size_t cplen = strlcpy(v3->lmm_pool_name, lo->ldo_pool, - sizeof(v3->lmm_pool_name)); + struct lov_mds_md_v3 *v3 = (struct lov_mds_md_v3 *)lmm; + size_t cplen = strlcpy(v3->lmm_pool_name, + lod_comp->llc_pool, + sizeof(v3->lmm_pool_name)); if (cplen >= sizeof(v3->lmm_pool_name)) RETURN(-E2BIG); objs = &v3->lmm_objects[0]; } - for (i = 0; i < lo->ldo_stripenr; i++) { - struct lu_fid *fid = &info->lti_fid; - struct lod_device *lod; - __u32 index; - int type = LU_SEQ_RANGE_OST; + if (is_dir || lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED) + GOTO(done, rc = 0); + + lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + for (i = 0; i < lod_comp->llc_stripenr; i++) { + struct dt_object *object; + __u32 ost_idx; + int type = LU_SEQ_RANGE_OST; - lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); - LASSERT(lo->ldo_stripe[i]); + object = lod_comp->llc_stripe[i]; + LASSERT(object != 
NULL); + info->lti_fid = *lu_object_fid(&object->do_lu); - *fid = *lu_object_fid(&lo->ldo_stripe[i]->do_lu); if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MULTIPLE_REF)) { if (cfs_fail_val == 0) - cfs_fail_val = fid->f_oid; + cfs_fail_val = info->lti_fid.f_oid; else - fid->f_oid = cfs_fail_val; + info->lti_fid.f_oid = cfs_fail_val; } - rc = fid_to_ostid(fid, &info->lti_ostid); + rc = fid_to_ostid(&info->lti_fid, &info->lti_ostid); LASSERT(rc == 0); ostid_cpu_to_le(&info->lti_ostid, &objs[i].l_ost_oi); - objs[i].l_ost_gen = cpu_to_le32(0); + objs[i].l_ost_gen = cpu_to_le32(0); if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FLD_LOOKUP)) rc = -ENOENT; else - rc = lod_fld_lookup(env, lod, fid, - &index, &type); + rc = lod_fld_lookup(env, lod, &info->lti_fid, + &ost_idx, &type); if (rc < 0) { CERROR("%s: Can not locate "DFID": rc = %d\n", - lod2obd(lod)->obd_name, PFID(fid), rc); - lod_object_free_striping(env, lo); + lod2obd(lod)->obd_name, PFID(&info->lti_fid), + rc); RETURN(rc); } - objs[i].l_ost_idx = cpu_to_le32(index); + objs[i].l_ost_idx = cpu_to_le32(ost_idx); + } +done: + if (lmm_size != NULL) + *lmm_size = lov_mds_md_size(is_dir ? + 0 : lod_comp->llc_stripenr, magic); + RETURN(rc); +} + +/** + * Generate component ID for new created component. + * + * \param[in] lo LOD object + * \param[in] comp_idx index of ldo_comp_entries + * + * \retval component ID on success + * \retval LCME_ID_INVAL on failure + */ +static __u32 lod_gen_component_id(struct lod_object *lo, int comp_idx) +{ + struct lod_layout_component *lod_comp; + __u32 id, start, end; + int i; + + LASSERT(lo->ldo_comp_entries[comp_idx].llc_id == LCME_ID_INVAL); + + lod_obj_inc_layout_gen(lo); + id = lo->ldo_layout_gen; + if (likely(id <= LCME_ID_MAX)) + return id; + + /* Layout generation wraps, need to check collisions. 
*/ + start = id & LCME_ID_MASK; + end = (__u32)LCME_ID_MAX + 1; +again: + for (id = start; id < end; id++) { + for (i = 0; i < lo->ldo_comp_cnt; i++) { + lod_comp = &lo->ldo_comp_entries[i]; + if (id == lod_comp->llc_id) + break; + } + /* Found the unused ID */ + if (i == lo->ldo_comp_cnt) + return id; + } + if (end == (__u32)LCME_ID_MAX + 1) { + start = 0; + end = lo->ldo_layout_gen & LCME_ID_MASK; + goto again; } - info->lti_buf.lb_buf = lmm; - info->lti_buf.lb_len = lmm_size; - rc = lod_sub_object_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV, - 0, th); - if (rc < 0) { - lod_object_free_striping(env, lo); + return LCME_ID_INVAL; +} + +/** + * Generate on-disk lov_mds_md structure based on the information in + * the lod_object->ldo_comp_entries. + * + * \param[in] env execution environment for this thread + * \param[in] lo LOD object + * \param[in] lmm buffer to contain the on-disk lov_mds_md + * \param[in|out] lmm_size buffer size/lmm size + * \param[in] is_dir generate lov ea for dir or file? 
For dir case, + * the stripe info is from the default stripe + * template, which is collected in lod_ah_init(), + * either from parent object or root object; for + * file case, it's from the @lo object + * + * \retval 0 if on disk structure is created successfully + * \retval negative error number on failure + */ +int lod_generate_lovea(const struct lu_env *env, struct lod_object *lo, + struct lov_mds_md *lmm, int *lmm_size, bool is_dir) +{ + struct lov_comp_md_entry_v1 *lcme; + struct lov_comp_md_v1 *lcm; + struct lod_layout_component *comp_entries; + __u16 comp_cnt; + bool is_composite; + int i, rc = 0, offset; + ENTRY; + + if (is_dir) { + comp_cnt = lo->ldo_def_striping->lds_def_comp_cnt; + comp_entries = lo->ldo_def_striping->lds_def_comp_entries; + is_composite = + lo->ldo_def_striping->lds_def_striping_is_composite; + } else { + comp_cnt = lo->ldo_comp_cnt; + comp_entries = lo->ldo_comp_entries; + is_composite = lo->ldo_is_composite; + } + + LASSERT(lmm_size != NULL); + LASSERT(comp_cnt != 0 && comp_entries != NULL); + + if (!is_composite) { + rc = lod_gen_component_ea(env, lo, 0, lmm, lmm_size, is_dir); RETURN(rc); } + lcm = (struct lov_comp_md_v1 *)lmm; + lcm->lcm_magic = cpu_to_le32(LOV_MAGIC_COMP_V1); + lcm->lcm_entry_count = cpu_to_le16(comp_cnt); + + offset = sizeof(*lcm) + sizeof(*lcme) * comp_cnt; + LASSERT(offset % sizeof(__u64) == 0); + + for (i = 0; i < comp_cnt; i++) { + struct lod_layout_component *lod_comp; + struct lov_mds_md *sub_md; + int size; + + lod_comp = &comp_entries[i]; + lcme = &lcm->lcm_entries[i]; + + if (lod_comp->llc_id == LCME_ID_INVAL && !is_dir) { + lod_comp->llc_id = lod_gen_component_id(lo, i); + if (lod_comp->llc_id == LCME_ID_INVAL) + GOTO(out, rc = -ERANGE); + } + lcme->lcme_id = cpu_to_le32(lod_comp->llc_id); + /* component must have been instantiated */ + LASSERT(ergo(!is_dir, lod_comp->llc_flags & LCME_FL_INIT)); + lcme->lcme_flags = cpu_to_le32(lod_comp->llc_flags); + lcme->lcme_extent.e_start = + 
cpu_to_le64(lod_comp->llc_extent.e_start); + lcme->lcme_extent.e_end = + cpu_to_le64(lod_comp->llc_extent.e_end); + lcme->lcme_offset = cpu_to_le32(offset); + + sub_md = (struct lov_mds_md *)((char *)lcm + offset); + rc = lod_gen_component_ea(env, lo, i, sub_md, &size, is_dir); + if (rc) + GOTO(out, rc); + lcme->lcme_size = cpu_to_le32(size); + offset += size; + LASSERTF((offset <= *lmm_size) && (offset % sizeof(__u64) == 0), + "offset:%d lmm_size:%d\n", offset, *lmm_size); + } + lcm->lcm_size = cpu_to_le32(offset); + lcm->lcm_layout_gen = cpu_to_le32(is_dir ? 0 : lo->ldo_layout_gen); + + lustre_print_user_md(D_LAYOUT, (struct lov_user_md *)lmm, + "generate lum"); +out: + if (rc == 0) + *lmm_size = offset; RETURN(rc); } @@ -832,13 +1069,15 @@ static int validate_lod_and_idx(struct lod_device *md, __u32 idx) * \param[in] env execution environment for this thread * \param[in,out] lo LOD object * \param[in] objs an array of IDs to creates the objects from + * \param[in] comp_idx index of ldo_comp_entries * * \retval 0 if the objects are instantiated successfully * \retval negative error number on failure */ int lod_initialize_objects(const struct lu_env *env, struct lod_object *lo, - struct lov_ost_data_v1 *objs) + struct lov_ost_data_v1 *objs, int comp_idx) { + struct lod_layout_component *lod_comp; struct lod_thread_info *info = lod_env_info(env); struct lod_device *md; struct lu_object *o, *n; @@ -851,16 +1090,20 @@ int lod_initialize_objects(const struct lu_env *env, struct lod_object *lo, LASSERT(lo != NULL); md = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); - LASSERT(lo->ldo_stripe == NULL); - LASSERT(lo->ldo_stripenr > 0); - LASSERT(lo->ldo_stripe_size > 0); - stripe_len = lo->ldo_stripenr; + LASSERT(lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL); + lod_comp = &lo->ldo_comp_entries[comp_idx]; + + LASSERT(lod_comp->llc_stripe == NULL); + LASSERT(lod_comp->llc_stripenr > 0); + LASSERT(lod_comp->llc_stripe_size > 0); + + stripe_len = lod_comp->llc_stripenr; 
OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_len); if (stripe == NULL) RETURN(-ENOMEM); - for (i = 0; i < lo->ldo_stripenr; i++) { + for (i = 0; i < lod_comp->llc_stripenr; i++) { if (unlikely(lovea_slot_is_dummy(&objs[i]))) continue; @@ -902,10 +1145,10 @@ out: dt_object_put(env, stripe[i]); OBD_FREE(stripe, sizeof(stripe[0]) * stripe_len); - lo->ldo_stripenr = 0; + lod_comp->llc_stripenr = 0; } else { - lo->ldo_stripe = stripe; - lo->ldo_stripes_allocated = stripe_len; + lod_comp->llc_stripe = stripe; + lod_comp->llc_stripes_allocated = stripe_len; } RETURN(rc); @@ -928,56 +1171,131 @@ int lod_parse_striping(const struct lu_env *env, struct lod_object *lo, const struct lu_buf *buf) { struct lov_mds_md_v1 *lmm; + struct lov_comp_md_v1 *comp_v1 = NULL; struct lov_ost_data_v1 *objs; - __u32 magic; - __u32 pattern; - int rc = 0; + __u32 magic, pattern; + int i, rc = 0; + __u16 comp_cnt; ENTRY; LASSERT(buf); LASSERT(buf->lb_buf); LASSERT(buf->lb_len); - lmm = (struct lov_mds_md_v1 *) buf->lb_buf; + lmm = (struct lov_mds_md_v1 *)buf->lb_buf; magic = le32_to_cpu(lmm->lmm_magic); - pattern = le32_to_cpu(lmm->lmm_pattern); - if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3) + if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3 && + magic != LOV_MAGIC_COMP_V1) GOTO(out, rc = -EINVAL); - if (lov_pattern(pattern) != LOV_PATTERN_RAID0) - GOTO(out, rc = -EINVAL); - - lo->ldo_pattern = pattern; - lo->ldo_stripe_size = le32_to_cpu(lmm->lmm_stripe_size); - lo->ldo_layout_gen = le16_to_cpu(lmm->lmm_layout_gen); - lo->ldo_stripenr = le16_to_cpu(lmm->lmm_stripe_count); - /* released file stripenr fixup. 
*/ - if (pattern & LOV_PATTERN_F_RELEASED) - lo->ldo_stripenr = 0; - LASSERT(buf->lb_len >= lov_mds_md_size(lo->ldo_stripenr, magic)); + lod_free_comp_entries(lo); - if (magic == LOV_MAGIC_V3) { - struct lov_mds_md_v3 *v3 = (struct lov_mds_md_v3 *) lmm; - objs = &v3->lmm_objects[0]; - /* no need to set pool, which is used in create only */ + if (magic == LOV_MAGIC_COMP_V1) { + comp_v1 = (struct lov_comp_md_v1 *)lmm; + comp_cnt = le16_to_cpu(comp_v1->lcm_entry_count); + if (comp_cnt == 0) + GOTO(out, rc = -EINVAL); + lo->ldo_layout_gen = le32_to_cpu(comp_v1->lcm_layout_gen); + lo->ldo_is_composite = 1; } else { - objs = &lmm->lmm_objects[0]; + comp_cnt = 1; + lo->ldo_layout_gen = le16_to_cpu(lmm->lmm_layout_gen); + lo->ldo_is_composite = 0; } - if (lo->ldo_stripenr > 0) - rc = lod_initialize_objects(env, lo, objs); + rc = lod_alloc_comp_entries(lo, comp_cnt); + if (rc) + GOTO(out, rc); + + for (i = 0; i < comp_cnt; i++) { + struct lod_layout_component *lod_comp; + struct lu_extent *ext; + __u32 offs; + + lod_comp = &lo->ldo_comp_entries[i]; + if (lo->ldo_is_composite) { + offs = le32_to_cpu(comp_v1->lcm_entries[i].lcme_offset); + lmm = (struct lov_mds_md_v1 *)((char *)comp_v1 + offs); + magic = le32_to_cpu(lmm->lmm_magic); + + ext = &comp_v1->lcm_entries[i].lcme_extent; + lod_comp->llc_extent.e_start = + le64_to_cpu(ext->e_start); + lod_comp->llc_extent.e_end = le64_to_cpu(ext->e_end); + lod_comp->llc_flags = + le32_to_cpu(comp_v1->lcm_entries[i].lcme_flags); + lod_comp->llc_id = + le32_to_cpu(comp_v1->lcm_entries[i].lcme_id); + if (lod_comp->llc_id == LCME_ID_INVAL) + GOTO(out, rc = -EINVAL); + } else { + lod_comp->llc_flags = LCME_FL_INIT; + } + + pattern = le32_to_cpu(lmm->lmm_pattern); + if (lov_pattern(pattern) != LOV_PATTERN_RAID0) + GOTO(out, rc = -EINVAL); + + lod_comp->llc_pattern = pattern; + lod_comp->llc_stripe_size = le32_to_cpu(lmm->lmm_stripe_size); + lod_comp->llc_stripenr = le16_to_cpu(lmm->lmm_stripe_count); + lod_comp->llc_layout_gen = 
le16_to_cpu(lmm->lmm_layout_gen); + if (magic == LOV_MAGIC_V3) { + struct lov_mds_md_v3 *v3 = (struct lov_mds_md_v3 *)lmm; + objs = &v3->lmm_objects[0]; + /* no need to set pool, which is used in create only */ + } else { + objs = &lmm->lmm_objects[0]; + } + + if (!(lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)) { + rc = lod_initialize_objects(env, lo, objs, i); + if (rc) + GOTO(out, rc); + } + } out: + if (rc) + lod_object_free_striping(env, lo); RETURN(rc); } /** + * Check whether the striping (LOVEA for regular file, LMVEA for directory) + * is already cached. + * + * \param[in] lo LOD object + * + * \retval True if the striping is cached, otherwise + * return false. + */ +static bool lod_striping_loaded(struct lod_object *lo) +{ + if (S_ISREG(lod2lu_obj(lo)->lo_header->loh_attr) && + lo->ldo_comp_cached) + return true; + + if (S_ISDIR(lod2lu_obj(lo)->lo_header->loh_attr)) { + if (lo->ldo_stripe != NULL) + return true; + + /* Never load LMV stripe for slaves of striped dir */ + if (lo->ldo_dir_slave_stripe) + return true; + } + + return false; +} + +/** * Initialize the object representing the stripes. * * Unless the stripes are initialized already, fetch LOV (for regular * objects) or LMV (for directory objects) EA and call lod_parse_striping() - * to instantiate the objects representing the stripes. + * to instantiate the objects representing the stripes. Caller should + * hold the dt_write_lock(next). * * \param[in] env execution environment for this thread * \param[in,out] lo LOD object @@ -993,18 +1311,13 @@ int lod_load_striping_locked(const struct lu_env *env, struct lod_object *lo) int rc = 0; ENTRY; - /* already initialized? 
*/ - if (lo->ldo_stripe != NULL) - GOTO(out, rc = 0); - if (!dt_object_exists(next)) GOTO(out, rc = 0); - /* Do not load stripe for slaves of striped dir */ - if (lo->ldo_dir_slave_stripe) + if (lod_striping_loaded(lo)) GOTO(out, rc = 0); - if (S_ISREG(lu_object_attr(lod2lu_obj(lo)))) { + if (S_ISREG(lod2lu_obj(lo)->lo_header->loh_attr)) { rc = lod_get_lov_ea(env, lo); if (rc <= 0) GOTO(out, rc); @@ -1015,7 +1328,9 @@ int lod_load_striping_locked(const struct lu_env *env, struct lod_object *lo) buf->lb_buf = info->lti_ea_store; buf->lb_len = info->lti_ea_store_size; rc = lod_parse_striping(env, lo, buf); - } else if (S_ISDIR(lu_object_attr(lod2lu_obj(lo)))) { + if (rc == 0) + lo->ldo_comp_cached = 1; + } else if (S_ISDIR(lod2lu_obj(lo)->lo_header->loh_attr)) { rc = lod_get_lmv_ea(env, lo); if (rc < (typeof(rc))sizeof(struct lmv_mds_md_v1)) GOTO(out, rc = rc > 0 ? -EINVAL : rc); @@ -1036,7 +1351,7 @@ int lod_load_striping_locked(const struct lu_env *env, struct lod_object *lo) } /* - * there is LOV EA (striping information) in this object + * there is LMV EA (striping information) in this object * let's parse it and create in-core objects for the stripes */ rc = lod_parse_dir_striping(env, lo, buf); @@ -1064,7 +1379,14 @@ out: int lod_load_striping(const struct lu_env *env, struct lod_object *lo) { struct dt_object *next = dt_object_child(&lo->ldo_obj); - int rc = 0; + int rc; + + if (!dt_object_exists(next)) + return 0; + + /* Check without locking first */ + if (lod_striping_loaded(lo)) + return 0; /* currently this code is supposed to be called from declaration * phase only, thus the object is not expected to be locked by caller */ @@ -1075,7 +1397,7 @@ int lod_load_striping(const struct lu_env *env, struct lod_object *lo) } /** - * Verify striping. + * Verify lov_user_md_v1/v3 striping. * * Check the validity of all fields including the magic, stripe size, * stripe count, stripe offset and that the pool is present. 
Also check @@ -1091,8 +1413,8 @@ int lod_load_striping(const struct lu_env *env, struct lod_object *lo) * \retval 0 if the striping is valid * \retval -EINVAL if striping is invalid */ -int lod_verify_striping(struct lod_device *d, const struct lu_buf *buf, - bool is_from_disk) +static int lod_verify_v1v3(struct lod_device *d, const struct lu_buf *buf, + bool is_from_disk) { struct lov_user_md_v1 *lum; struct lov_user_md_v3 *lum3; @@ -1107,20 +1429,18 @@ int lod_verify_striping(struct lod_device *d, const struct lu_buf *buf, lum = buf->lb_buf; - LASSERT(sizeof(*lum) < sizeof(*lum3)); - if (buf->lb_len < sizeof(*lum)) { - CDEBUG(D_IOCTL, "buf len %zu too small for lov_user_md\n", + CDEBUG(D_LAYOUT, "buf len %zu too small for lov_user_md\n", buf->lb_len); GOTO(out, rc = -EINVAL); } - magic = le32_to_cpu(lum->lmm_magic); + magic = le32_to_cpu(lum->lmm_magic) & ~LOV_MAGIC_DEF; if (magic != LOV_USER_MAGIC_V1 && magic != LOV_USER_MAGIC_V3 && - magic != LOV_MAGIC_V1_DEF && - magic != LOV_MAGIC_V3_DEF) { - CDEBUG(D_IOCTL, "bad userland LOV MAGIC: %#x\n", magic); + magic != LOV_USER_MAGIC_SPECIFIC) { + CDEBUG(D_LAYOUT, "bad userland LOV MAGIC: %#x\n", + le32_to_cpu(lum->lmm_magic)); GOTO(out, rc = -EINVAL); } @@ -1128,65 +1448,64 @@ int lod_verify_striping(struct lod_device *d, const struct lu_buf *buf, if (!is_from_disk && lum->lmm_pattern == 0) lum->lmm_pattern = cpu_to_le32(LOV_PATTERN_RAID0); - if (le32_to_cpu(lum->lmm_pattern) != LOV_PATTERN_RAID0) { - CDEBUG(D_IOCTL, "bad userland stripe pattern: %#x\n", + if (!lov_pattern_supported(le32_to_cpu(lum->lmm_pattern))) { + CDEBUG(D_LAYOUT, "bad userland stripe pattern: %#x\n", le32_to_cpu(lum->lmm_pattern)); GOTO(out, rc = -EINVAL); } + /* a released lum comes from creating orphan on hsm release, + * doesn't make sense to verify it. 
*/ + if (le32_to_cpu(lum->lmm_pattern) & LOV_PATTERN_F_RELEASED) + GOTO(out, rc = 0); + /* 64kB is the largest common page size we see (ia64), and matches the * check in lfs */ stripe_size = le32_to_cpu(lum->lmm_stripe_size); if (stripe_size & (LOV_MIN_STRIPE_SIZE - 1)) { - CDEBUG(D_IOCTL, "stripe size %u not a multiple of %u\n", + CDEBUG(D_LAYOUT, "stripe size %u not a multiple of %u\n", stripe_size, LOV_MIN_STRIPE_SIZE); GOTO(out, rc = -EINVAL); } stripe_offset = le16_to_cpu(lum->lmm_stripe_offset); - if (stripe_offset != LOV_OFFSET_DEFAULT) { + if (!is_from_disk && stripe_offset != LOV_OFFSET_DEFAULT) { /* if offset is not within valid range [0, osts_size) */ if (stripe_offset >= d->lod_osts_size) { - CDEBUG(D_IOCTL, "stripe offset %u >= bitmap size %u\n", + CDEBUG(D_LAYOUT, "stripe offset %u >= bitmap size %u\n", stripe_offset, d->lod_osts_size); GOTO(out, rc = -EINVAL); } /* if lmm_stripe_offset is *not* in bitmap */ if (!cfs_bitmap_check(d->lod_ost_bitmap, stripe_offset)) { - CDEBUG(D_IOCTL, "stripe offset %u not in bitmap\n", + CDEBUG(D_LAYOUT, "stripe offset %u not in bitmap\n", stripe_offset); GOTO(out, rc = -EINVAL); } } - if (magic == LOV_USER_MAGIC_V1 || magic == LOV_MAGIC_V1_DEF) + if (magic == LOV_USER_MAGIC_V1) lum_size = offsetof(struct lov_user_md_v1, lmm_objects[0]); - else if (magic == LOV_USER_MAGIC_V3 || magic == LOV_MAGIC_V3_DEF) + else if (magic == LOV_USER_MAGIC_V3 || magic == LOV_USER_MAGIC_SPECIFIC) lum_size = offsetof(struct lov_user_md_v3, lmm_objects[0]); else GOTO(out, rc = -EINVAL); stripe_count = le16_to_cpu(lum->lmm_stripe_count); - if (buf->lb_len != lum_size) { - CDEBUG(D_IOCTL, "invalid buf len %zu for lov_user_md with " + if (buf->lb_len < lum_size) { + CDEBUG(D_LAYOUT, "invalid buf len %zu/%zu for lov_user_md with " "magic %#x and stripe_count %u\n", - buf->lb_len, magic, stripe_count); + buf->lb_len, lum_size, magic, stripe_count); GOTO(out, rc = -EINVAL); } - if (!(magic == LOV_USER_MAGIC_V3 || magic == LOV_MAGIC_V3_DEF)) + 
if (!(magic == LOV_USER_MAGIC_V3 || magic == LOV_USER_MAGIC_SPECIFIC)) goto out; lum3 = buf->lb_buf; - if (buf->lb_len < sizeof(*lum3)) { - CDEBUG(D_IOCTL, "buf len %zu too small for lov_user_md_v3\n", - buf->lb_len); - GOTO(out, rc = -EINVAL); - } - /* In the function below, .hs_keycmp resolves to * pool_hashkey_keycmp() */ /* coverity[overrun-buffer-val] */ @@ -1194,15 +1513,14 @@ int lod_verify_striping(struct lod_device *d, const struct lu_buf *buf, if (pool == NULL) goto out; - if (stripe_offset != LOV_OFFSET_DEFAULT) { + if (!is_from_disk && stripe_offset != LOV_OFFSET_DEFAULT) { rc = lod_check_index_in_pool(stripe_offset, pool); if (rc < 0) GOTO(out, rc = -EINVAL); } if (is_from_disk && stripe_count > pool_tgt_count(pool)) { - CDEBUG(D_IOCTL, - "stripe count %u > # OSTs %u in the pool\n", + CDEBUG(D_LAYOUT, "stripe count %u > # OSTs %u in the pool\n", stripe_count, pool_tgt_count(pool)); GOTO(out, rc = -EINVAL); } @@ -1214,6 +1532,125 @@ out: RETURN(rc); } +/** + * Verify LOV striping. 
+ * + * \param[in] d LOD device + * \param[in] buf buffer with LOV EA to verify + * \param[in] is_from_disk 0 - from user, allow some fields to be 0 + * 1 - from disk, do not allow + * \param[in] start extent start for composite layout + * + * \retval 0 if the striping is valid + * \retval -EINVAL if striping is invalid + */ +int lod_verify_striping(struct lod_device *d, const struct lu_buf *buf, + bool is_from_disk, __u64 start) +{ + struct lov_user_md_v1 *lum; + struct lov_comp_md_v1 *comp_v1; + __u32 magic; + int rc = 0, i; + ENTRY; + + lum = buf->lb_buf; + + if (buf->lb_len < sizeof(*lum)) { + CDEBUG(D_LAYOUT, "buf len %zu too small for lov_user_md\n", + buf->lb_len); + RETURN(-EINVAL); + } + + magic = le32_to_cpu(lum->lmm_magic) & ~LOV_MAGIC_DEF; + if (magic != LOV_USER_MAGIC_V1 && + magic != LOV_USER_MAGIC_V3 && + magic != LOV_USER_MAGIC_SPECIFIC && + magic != LOV_USER_MAGIC_COMP_V1) { + CDEBUG(D_LAYOUT, "bad userland LOV MAGIC: %#x\n", + le32_to_cpu(lum->lmm_magic)); + RETURN(-EINVAL); + } + + if (magic == LOV_USER_MAGIC_COMP_V1) { + struct lov_comp_md_entry_v1 *ent; + struct lu_extent *ext; + struct lov_desc *desc = &d->lod_desc; + struct lu_buf tmp; + __u32 stripe_size = 0; + __u64 prev_end = start; + + comp_v1 = buf->lb_buf; + if (buf->lb_len < le32_to_cpu(comp_v1->lcm_size)) { + CDEBUG(D_LAYOUT, "buf len %zu is less than %u\n", + buf->lb_len, le32_to_cpu(comp_v1->lcm_size)); + RETURN(-EINVAL); + } + + if (le32_to_cpu(comp_v1->lcm_entry_count) == 0) { + CDEBUG(D_LAYOUT, "entry count is zero\n"); + RETURN(-EINVAL); + } + + for (i = 0; i < le32_to_cpu(comp_v1->lcm_entry_count); i++) { + ent = &comp_v1->lcm_entries[i]; + ext = &ent->lcme_extent; + + if (is_from_disk && + (le32_to_cpu(ent->lcme_id) == 0 || + le32_to_cpu(ent->lcme_id) > LCME_ID_MAX)) { + CDEBUG(D_LAYOUT, "invalid id %u\n", + le32_to_cpu(ent->lcme_id)); + RETURN(-EINVAL); + } + + if (le64_to_cpu(ext->e_start) >= + le64_to_cpu(ext->e_end)) { + CDEBUG(D_LAYOUT, "invalid extent " + "[%llu, 
%llu)\n", + le64_to_cpu(ext->e_start), + le64_to_cpu(ext->e_end)); + RETURN(-EINVAL); + } + + /* first component must start with 0, and the next + * must be adjacent with the previous one */ + if (le64_to_cpu(ext->e_start) != prev_end) { + CDEBUG(D_LAYOUT, "invalid start " + "actual:%llu, expect:%llu\n", + le64_to_cpu(ext->e_start), prev_end); + RETURN(-EINVAL); + } + prev_end = le64_to_cpu(ext->e_end); + + tmp.lb_buf = (char *)comp_v1 + + le32_to_cpu(ent->lcme_offset); + tmp.lb_len = le32_to_cpu(ent->lcme_size); + rc = lod_verify_v1v3(d, &tmp, is_from_disk); + if (rc) + break; + + lum = tmp.lb_buf; + + /* extent end must be aligned with the stripe_size */ + stripe_size = le32_to_cpu(lum->lmm_stripe_size); + if (stripe_size == 0) + stripe_size = desc->ld_default_stripe_size; + if (stripe_size == 0 || + (prev_end != LUSTRE_EOF && + (prev_end & (stripe_size - 1)))) { + CDEBUG(D_LAYOUT, "stripe size isn't aligned. " + " stripe_sz: %u, [%llu, %llu)\n", + stripe_size, ext->e_start, prev_end); + RETURN(-EINVAL); + } + } + } else { + rc = lod_verify_v1v3(d, buf, is_from_disk); + } + + RETURN(rc); +} + void lod_fix_desc_stripe_size(__u64 *val) { if (*val < LOV_MIN_STRIPE_SIZE) { diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index 1e939b0..f0d467c 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -396,7 +396,7 @@ static struct dt_it *lod_striped_it_init(const struct lu_env *env, struct dt_it *it_next; ENTRY; - LASSERT(lo->ldo_stripenr > 0); + LASSERT(lo->ldo_dir_stripenr > 0); next = lo->ldo_stripe[0]; LASSERT(next != NULL); LASSERT(next->do_index_ops != NULL); @@ -419,12 +419,12 @@ static struct dt_it *lod_striped_it_init(const struct lu_env *env, return (struct dt_it *)it; } -#define LOD_CHECK_STRIPED_IT(env, it, lo) \ -do { \ - LASSERT((it)->lit_obj != NULL); \ - LASSERT((it)->lit_it != NULL); \ - LASSERT((lo)->ldo_stripenr > 0); \ - LASSERT((it)->lit_stripe_index < (lo)->ldo_stripenr); \ +#define LOD_CHECK_STRIPED_IT(env, it, lo) \ 
+do { \ + LASSERT((it)->lit_obj != NULL); \ + LASSERT((it)->lit_it != NULL); \ + LASSERT((lo)->ldo_dir_stripenr > 0); \ + LASSERT((it)->lit_stripe_index < (lo)->ldo_dir_stripenr); \ } while (0) /** @@ -560,7 +560,7 @@ again: } /* go to next stripe */ - if (it->lit_stripe_index + 1 >= lo->ldo_stripenr) + if (it->lit_stripe_index + 1 >= lo->ldo_dir_stripenr) RETURN(1); it->lit_stripe_index++; @@ -970,10 +970,10 @@ static int lod_index_try(const struct lu_env *env, struct dt_object *dt, if (rc != 0) RETURN(rc); - if (lo->ldo_stripenr > 0) { + if (lo->ldo_dir_stripenr > 0) { int i; - for (i = 0; i < lo->ldo_stripenr; i++) { + for (i = 0; i < lo->ldo_dir_stripenr; i++) { if (dt_object_exists(lo->ldo_stripe[i]) == 0) continue; rc = lo->ldo_stripe[i]->do_ops->do_index_try(env, @@ -1064,6 +1064,47 @@ static int lod_attr_get(const struct lu_env *env, return dt_attr_get(env, dt_object_child(dt), attr); } +int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, + struct thandle *th, lod_obj_stripe_cb_t cb, + struct lod_obj_stripe_cb_data *data) +{ + struct lod_layout_component *lod_comp; + int i, j, rc; + ENTRY; + + LASSERT(lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL); + for (i = 0; i < lo->ldo_comp_cnt; i++) { + lod_comp = &lo->ldo_comp_entries[i]; + + if (lod_comp->llc_stripe == NULL) + continue; + + LASSERT(lod_comp->llc_stripenr > 0); + for (j = 0; j < lod_comp->llc_stripenr; j++) { + struct dt_object *dt = lod_comp->llc_stripe[j]; + + if (dt == NULL) + continue; + rc = cb(env, lo, dt, th, j, data); + if (rc != 0) + RETURN(rc); + } + } + RETURN(0); +} + +static inline int +lod_obj_stripe_attr_set_cb(const struct lu_env *env, struct lod_object *lo, + struct dt_object *dt, struct thandle *th, + int stripe_idx, struct lod_obj_stripe_cb_data *data) +{ + if (data->locd_declare) + return lod_sub_object_declare_attr_set(env, dt, + data->locd_attr, th); + else + return lod_sub_object_attr_set(env, dt, data->locd_attr, th); +} + /** * Implementation 
of dt_object_operations::do_declare_attr_set. * @@ -1117,21 +1158,30 @@ static int lod_declare_attr_set(const struct lu_env *env, if (rc) RETURN(rc); - if (lo->ldo_stripenr == 0) + if (!lod_obj_is_striped(dt)) RETURN(0); /* * if object is striped declare changes on the stripes */ - LASSERT(lo->ldo_stripe); - for (i = 0; i < lo->ldo_stripenr; i++) { - if (lo->ldo_stripe[i] == NULL) - continue; - rc = lod_sub_object_declare_attr_set(env, + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { + LASSERT(lo->ldo_stripe); + for (i = 0; i < lo->ldo_dir_stripenr; i++) { + if (lo->ldo_stripe[i] == NULL) + continue; + rc = lod_sub_object_declare_attr_set(env, lo->ldo_stripe[i], attr, th); - if (rc != 0) - RETURN(rc); + if (rc != 0) + RETURN(rc); + } + } else { + struct lod_obj_stripe_cb_data data; + + data.locd_attr = attr; + data.locd_declare = true; + rc = lod_obj_for_each_stripe(env, lo, th, + lod_obj_stripe_attr_set_cb, &data); } if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) && @@ -1194,24 +1244,33 @@ static int lod_attr_set(const struct lu_env *env, RETURN(rc); } - if (lo->ldo_stripenr == 0) + if (!lod_obj_is_striped(dt)) RETURN(0); /* * if object is striped, apply changes to all the stripes */ - LASSERT(lo->ldo_stripe); - for (i = 0; i < lo->ldo_stripenr; i++) { - if (unlikely(lo->ldo_stripe[i] == NULL)) - continue; + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { + LASSERT(lo->ldo_stripe); + for (i = 0; i < lo->ldo_dir_stripenr; i++) { + if (unlikely(lo->ldo_stripe[i] == NULL)) + continue; - if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && - (dt_object_exists(lo->ldo_stripe[i]) == 0)) - continue; + if ((dt_object_exists(lo->ldo_stripe[i]) == 0)) + continue; - rc = lod_sub_object_attr_set(env, lo->ldo_stripe[i], attr, th); - if (rc != 0) - break; + rc = lod_sub_object_attr_set(env, lo->ldo_stripe[i], + attr, th); + if (rc != 0) + break; + } + } else { + struct lod_obj_stripe_cb_data data; + + data.locd_attr = attr; + data.locd_declare = false; + rc = lod_obj_for_each_stripe(env, 
lo, th, + lod_obj_stripe_attr_set_cb, &data); } if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE) && @@ -1267,9 +1326,10 @@ static int lod_attr_set(const struct lu_env *env, static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, struct lu_buf *buf, const char *name) { - struct lod_thread_info *info = lod_env_info(env); - struct lod_device *dev = lu2lod_dev(dt->do_lu.lo_dev); - int rc, is_root; + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *dev = lu2lod_dev(dt->do_lu.lo_dev); + int is_root; + int rc; ENTRY; rc = dt_xattr_get(env, dt_object_child(dt), buf, name); @@ -1313,6 +1373,8 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, RETURN(rc); /* + * XXX: Only used by lfsck + * * lod returns default striping on the real root of the device * this is like the root stores default striping for the whole * filesystem. historically we've been using a different approach @@ -1417,8 +1479,8 @@ static int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, ENTRY; LASSERT(lo->ldo_dir_striped != 0); - LASSERT(lo->ldo_stripenr > 0); - stripe_count = lo->ldo_stripenr; + LASSERT(lo->ldo_dir_stripenr > 0); + stripe_count = lo->ldo_dir_stripenr; /* Only store the LMV EA heahder on the disk. 
*/ if (info->lti_ea_store_size < sizeof(*lmm1)) { rc = lod_ea_store_resize(info, sizeof(*lmm1)); @@ -1528,8 +1590,8 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo, } out: lo->ldo_stripe = stripe; - lo->ldo_stripenr = le32_to_cpu(lmv1->lmv_stripe_count); - lo->ldo_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count); + lo->ldo_dir_stripenr = le32_to_cpu(lmv1->lmv_stripe_count); + lo->ldo_dir_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count); if (rc != 0) lod_object_free_striping(env, lo); @@ -1589,7 +1651,7 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, GOTO(out, rc = -EINVAL); rec->rec_type = S_IFDIR; - for (i = 0; i < lo->ldo_stripenr; i++) { + for (i = 0; i < lo->ldo_dir_stripenr; i++) { struct dt_object *dto = lo->ldo_stripe[i]; char *stripe_name = info->lti_key; struct lu_name *sname; @@ -1817,10 +1879,10 @@ static int lod_prep_md_striped_create(const struct lu_env *env, lo->ldo_dir_striped = 1; lo->ldo_stripe = stripe; - lo->ldo_stripenr = i; - lo->ldo_stripes_allocated = stripe_count; + lo->ldo_dir_stripenr = i; + lo->ldo_dir_stripes_allocated = stripe_count; - if (lo->ldo_stripenr == 0) + if (lo->ldo_dir_stripenr == 0) GOTO(out_put, rc = -ENOSPC); rc = lod_dir_declare_create_stripes(env, dt, attr, dof, th); @@ -1833,8 +1895,8 @@ out_put: if (stripe[i] != NULL) dt_object_put(env, stripe[i]); OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count); - lo->ldo_stripenr = 0; - lo->ldo_stripes_allocated = 0; + lo->ldo_dir_stripenr = 0; + lo->ldo_dir_stripes_allocated = 0; lo->ldo_stripe = NULL; } @@ -1933,6 +1995,10 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env, rc = lod_verify_md_striping(d, lum); if (rc != 0) RETURN(rc); + } else if (strcmp(name, XATTR_NAME_LOV) == 0) { + rc = lod_verify_striping(d, buf, false, 0); + if (rc != 0) + RETURN(rc); } rc = lod_sub_object_declare_xattr_set(env, next, buf, name, fl, th); @@ -1951,10 +2017,10 @@ static int lod_dir_declare_xattr_set(const struct 
lu_env *env, if (rc != 0) RETURN(rc); - if (lo->ldo_stripenr == 0) + if (lo->ldo_dir_stripenr == 0) RETURN(0); - for (i = 0; i < lo->ldo_stripenr; i++) { + for (i = 0; i < lo->ldo_dir_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); rc = lod_sub_object_declare_xattr_set(env, lo->ldo_stripe[i], @@ -1966,6 +2032,39 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env, RETURN(rc); } +static int +lod_obj_stripe_replace_parent_fid_cb(const struct lu_env *env, + struct lod_object *lo, + struct dt_object *dt, struct thandle *th, + int stripe_idx, + struct lod_obj_stripe_cb_data *data) +{ + struct lod_thread_info *info = lod_env_info(env); + struct dt_object *obj = &lo->ldo_obj; + struct lu_buf *buf = &info->lti_buf; + struct filter_fid *ff = buf->lb_buf; + int rc; + + rc = dt_xattr_get(env, dt, buf, XATTR_NAME_FID); + if (rc < 0) + return 0; + + fid_le_to_cpu(&ff->ff_parent, &ff->ff_parent); + ff->ff_parent.f_seq = lu_object_fid(&obj->do_lu)->f_seq; + ff->ff_parent.f_oid = lu_object_fid(&obj->do_lu)->f_oid; + fid_cpu_to_le(&ff->ff_parent, &ff->ff_parent); + + if (data->locd_declare) + rc = lod_sub_object_declare_xattr_set(env, dt, buf, + XATTR_NAME_FID, + LU_XATTR_REPLACE, th); + else + rc = lod_sub_object_xattr_set(env, dt, buf, XATTR_NAME_FID, + LU_XATTR_REPLACE, th); + + return rc; +} + /** * Reset parent FID on OST object * @@ -1979,7 +2078,7 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env, * \param[in] declare if it is declare * * \retval 0 if reset succeeds - * \retval negative errno if reset fais + * \retval negative errno if reset fails */ static int lod_object_replace_parent_fid(const struct lu_env *env, struct dt_object *dt, @@ -1989,7 +2088,8 @@ static int lod_object_replace_parent_fid(const struct lu_env *env, struct lod_thread_info *info = lod_env_info(env); struct lu_buf *buf = &info->lti_buf; struct filter_fid *ff; - int i, rc; + struct lod_obj_stripe_cb_data data; + int rc; ENTRY; LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr)); @@ 
-1999,7 +2099,7 @@ static int lod_object_replace_parent_fid(const struct lu_env *env, if (rc != 0) RETURN(rc); - if (lo->ldo_stripenr == 0) + if (!lod_obj_is_striped(dt)) RETURN(0); if (info->lti_ea_store_size < sizeof(*ff)) { @@ -2011,37 +2111,420 @@ static int lod_object_replace_parent_fid(const struct lu_env *env, buf->lb_buf = info->lti_ea_store; buf->lb_len = info->lti_ea_store_size; - for (i = 0; i < lo->ldo_stripenr; i++) { - if (lo->ldo_stripe[i] == NULL) - continue; + data.locd_declare = declare; + rc = lod_obj_for_each_stripe(env, lo, th, + lod_obj_stripe_replace_parent_fid_cb, &data); - rc = dt_xattr_get(env, lo->ldo_stripe[i], buf, - XATTR_NAME_FID); - if (rc < 0) { - rc = 0; + RETURN(rc); +} + +/** + * Declare component add. The xattr name is XATTR_LUSTRE_LOV.add, and + * the xattr value is binary lov_comp_md_v1 which contains component(s) + * to be added. + * + * \param[in] env execution environment + * \param[in] dt dt_object to add components on + * \param[in] buf buffer contains components to be added + * \param[in] th thandle + * + * \retval 0 on success + * \retval negative errno on failure + */ +static int lod_declare_layout_add(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + struct thandle *th) +{ + struct lod_layout_component *comp_array, *lod_comp; + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct lov_desc *desc = &d->lod_desc; + struct lod_object *lo = lod_dt_obj(dt); + struct lov_user_md_v1 *v1; + struct lov_user_md_v3 *v3; + struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; + struct lu_extent *ext; + __u32 magic; + __u64 prev_end; + int i, rc, array_cnt; + ENTRY; + + LASSERT(lo->ldo_is_composite); + + magic = comp_v1->lcm_magic; + /* Replay request, see comment for LOV_MAGIC_DEF */ + if (unlikely(le32_to_cpu(magic) == LOV_MAGIC_COMP_V1_DEF)) { + struct dt_object *next = dt_object_child(dt); + + lod_object_free_striping(env, lo); + rc = lod_use_defined_striping(env, lo, buf); + if (rc == 0) { +
lo->ldo_comp_cached = 1; + rc = lod_sub_object_declare_xattr_set(env, next, buf, + XATTR_NAME_LOV, + 0, th); + } + RETURN(rc); + } + + prev_end = lo->ldo_comp_entries[lo->ldo_comp_cnt - 1].llc_extent.e_end; + rc = lod_verify_striping(d, buf, false, prev_end); + if (rc != 0) + RETURN(rc); + + if (magic == __swab32(LOV_USER_MAGIC_COMP_V1)) { + lustre_swab_lov_comp_md_v1(comp_v1); + magic = comp_v1->lcm_magic; + } + + if (magic != LOV_USER_MAGIC_COMP_V1) + RETURN(-EINVAL); + + array_cnt = lo->ldo_comp_cnt + comp_v1->lcm_entry_count; + OBD_ALLOC(comp_array, sizeof(*comp_array) * array_cnt); + if (comp_array == NULL) + RETURN(-ENOMEM); + + memcpy(comp_array, lo->ldo_comp_entries, + sizeof(*comp_array) * lo->ldo_comp_cnt); + + for (i = 0; i < comp_v1->lcm_entry_count; i++) { + v1 = (struct lov_user_md *)((char *)comp_v1 + + comp_v1->lcm_entries[i].lcme_offset); + ext = &comp_v1->lcm_entries[i].lcme_extent; + + lod_comp = &comp_array[lo->ldo_comp_cnt + i]; + lod_comp->llc_extent.e_start = ext->e_start; + lod_comp->llc_extent.e_end = ext->e_end; + lod_comp->llc_stripe_offset = v1->lmm_stripe_offset; + + lod_comp->llc_stripenr = v1->lmm_stripe_count; + if (lod_comp->llc_stripenr <= 0) + lod_comp->llc_stripenr = desc->ld_default_stripe_count; + lod_comp->llc_stripe_size = v1->lmm_stripe_size; + if (lod_comp->llc_stripe_size <= 0) + lod_comp->llc_stripe_size = + desc->ld_default_stripe_size; + + if (v1->lmm_magic == LOV_USER_MAGIC_V3) { + int len; + v3 = (struct lov_user_md_v3 *) v1; + if (v3->lmm_pool_name[0] != '\0') { + len = strlen(v3->lmm_pool_name); + OBD_ALLOC(lod_comp->llc_pool, len + 1); + if (lod_comp->llc_pool == NULL) + GOTO(error, rc = -ENOMEM); + strncpy(lod_comp->llc_pool, v3->lmm_pool_name, + len + 1); + } + } + } + + OBD_FREE(lo->ldo_comp_entries, sizeof(*lod_comp) * lo->ldo_comp_cnt); + lo->ldo_comp_entries = comp_array; + lo->ldo_comp_cnt = array_cnt; + /* No need to increase layout generation here, it will be increased + * later when generating component 
ID for the new components */ + + rc = lod_declare_striped_object(env, dt, NULL, NULL, th); + RETURN(rc); + +error: + for (i = lo->ldo_comp_cnt; i < array_cnt; i++) { + lod_comp = &comp_array[i]; + if (lod_comp->llc_pool != NULL) { + OBD_FREE(lod_comp->llc_pool, + strlen(lod_comp->llc_pool) + 1); + lod_comp->llc_pool = NULL; + } + } + OBD_FREE(comp_array, sizeof(*comp_array) * array_cnt); + RETURN(rc); +} + +static int lod_comp_md_size(struct lod_object *lo, bool is_dir) +{ + int magic, size = 0, i; + struct lod_layout_component *comp_entries; + __u16 comp_cnt; + bool is_composite; + + if (is_dir) { + comp_cnt = lo->ldo_def_striping->lds_def_comp_cnt; + comp_entries = lo->ldo_def_striping->lds_def_comp_entries; + is_composite = + lo->ldo_def_striping->lds_def_striping_is_composite; + } else { + comp_cnt = lo->ldo_comp_cnt; + comp_entries = lo->ldo_comp_entries; + is_composite = lo->ldo_is_composite; + } + + + LASSERT(comp_cnt != 0 && comp_entries != NULL); + if (is_composite) { + size = sizeof(struct lov_comp_md_v1) + + sizeof(struct lov_comp_md_entry_v1) * comp_cnt; + LASSERT(size % sizeof(__u64) == 0); + } + + for (i = 0; i < comp_cnt; i++) { + magic = comp_entries[i].llc_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1; + + size += lov_user_md_size( + is_dir ? 0 : comp_entries[i].llc_stripenr, + magic); + LASSERT(size % sizeof(__u64) == 0); + } + return size; +} + +/** + * Declare component set. The xattr name is XATTR_LUSTRE_LOV.set.$field, + * the '$field' can only be 'flags' now. The xattr value is binary + * lov_comp_md_v1 which contains the component ID(s) and the value of + * the field to be modified.
+ * + * \param[in] env execution environment + * \param[in] dt dt_object to be modified + * \param[in] op operation string, like "set.flags" + * \param[in] buf buffer contains components to be set + * \param[in] th thandle + * + * \retval 0 on success + * \retval negative errno on failure + */ +static int lod_declare_layout_set(const struct lu_env *env, + struct dt_object *dt, + char *op, const struct lu_buf *buf, + struct thandle *th) +{ + struct lod_layout_component *lod_comp; + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_object *lo = lod_dt_obj(dt); + struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; + __u32 magic, id; + int i, j, rc; + bool changed = false; + ENTRY; + + if (strcmp(op, "set.flags") != 0) { + CDEBUG(D_LAYOUT, "%s: operation (%s) not supported.\n", + lod2obd(d)->obd_name, op); + RETURN(-ENOTSUPP); + } + + magic = comp_v1->lcm_magic; + if (magic == __swab32(LOV_USER_MAGIC_COMP_V1)) { + lustre_swab_lov_comp_md_v1(comp_v1); + magic = comp_v1->lcm_magic; + } + + if (magic != LOV_USER_MAGIC_COMP_V1) + RETURN(-EINVAL); + + if (comp_v1->lcm_entry_count == 0) { + CDEBUG(D_LAYOUT, "%s: entry count is zero.\n", + lod2obd(d)->obd_name); + RETURN(-EINVAL); + } + + for (i = 0; i < comp_v1->lcm_entry_count; i++) { + id = comp_v1->lcm_entries[i].lcme_id; + + for (j = 0; j < lo->ldo_comp_cnt; j++) { + lod_comp = &lo->ldo_comp_entries[j]; + if (id == lod_comp->llc_id || id == LCME_ID_ALL) { + lod_comp->llc_flags = + comp_v1->lcm_entries[i].lcme_flags; + changed = true; + } + } + } + + if (!changed) { + CDEBUG(D_LAYOUT, "%s: requested component(s) not found.\n", + lod2obd(d)->obd_name); + RETURN(-EINVAL); + } + + lod_obj_inc_layout_gen(lo); + + info->lti_buf.lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_object_declare_xattr_set(env, dt, &info->lti_buf, + XATTR_NAME_LOV, 0, th); + RETURN(rc); +} + +/** + * Declare component deletion.
The xattr name is XATTR_LUSTRE_LOV.del, + * and the xattr value is a unique component ID or a special lcme_id. + * + * \param[in] env execution environment + * \param[in] dt dt_object to be operated on + * \param[in] buf buffer contains component ID or lcme_id + * \param[in] th thandle + * + * \retval 0 on success + * \retval negative errno on failure + */ +static int lod_declare_layout_del(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct dt_object *next = dt_object_child(dt); + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_object *lo = lod_dt_obj(dt); + struct lu_attr *attr = &lod_env_info(env)->lti_attr; + __u32 id; + int rc, i, j, left; + ENTRY; + + LASSERT(lo->ldo_is_composite); + + id = *(__u32 *)buf->lb_buf; + if (id == 0 || id == LCME_ID_NONE) { + CDEBUG(D_LAYOUT, "%s: invalid component id %#x\n", + lod2obd(d)->obd_name, id); + RETURN(-EINVAL); + } + + left = lo->ldo_comp_cnt; + if (left <= 0) + RETURN(-EINVAL); + + for (i = (lo->ldo_comp_cnt - 1); i >= 0; i--) { + struct lod_layout_component *lod_comp; + + lod_comp = &lo->ldo_comp_entries[i]; + + if (id <= LCME_ID_MAX && id != lod_comp->llc_id) + continue; + else if (id > LCME_ID_MAX && id < LCME_ID_ALL && + !(id & lod_comp->llc_flags)) continue; + + if (left != (i + 1)) { + CDEBUG(D_LAYOUT, "%s: this deletion will create " + "a hole.\n", lod2obd(d)->obd_name); + RETURN(-EINVAL); } + left--; - ff = buf->lb_buf; - fid_le_to_cpu(&ff->ff_parent, &ff->ff_parent); - ff->ff_parent.f_seq = lu_object_fid(&dt->do_lu)->f_seq; - ff->ff_parent.f_oid = lu_object_fid(&dt->do_lu)->f_oid; - fid_cpu_to_le(&ff->ff_parent, &ff->ff_parent); - - if (declare) { - rc = lod_sub_object_declare_xattr_set(env, - lo->ldo_stripe[i], buf, - XATTR_NAME_FID, - LU_XATTR_REPLACE, th); - } else { - rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i], - buf, XATTR_NAME_FID, - LU_XATTR_REPLACE, th); + /* Mark
the component as deleted */ + lod_comp->llc_id = LCME_ID_INVAL; + + /* Not instantiated component */ + if (lod_comp->llc_stripe == NULL) + continue; + + LASSERT(lod_comp->llc_stripenr > 0); + for (j = 0; j < lod_comp->llc_stripenr; j++) { + struct dt_object *obj = lod_comp->llc_stripe[j]; + + if (obj == NULL) + continue; + rc = lod_sub_object_declare_destroy(env, obj, th); + if (rc) + RETURN(rc); } - if (rc < 0) - break; } + LASSERTF(left >= 0, "left = %d\n", left); + if (left == lo->ldo_comp_cnt) { + CDEBUG(D_LAYOUT, "%s: requested component id:%#x not found\n", + lod2obd(d)->obd_name, id); + RETURN(-EINVAL); + } + + memset(attr, 0, sizeof(*attr)); + attr->la_valid = LA_SIZE; + rc = lod_sub_object_declare_attr_set(env, next, attr, th); + if (rc) + RETURN(rc); + + if (left > 0) { + info->lti_buf.lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_object_declare_xattr_set(env, next, &info->lti_buf, + XATTR_NAME_LOV, 0, th); + } else { + rc = lod_sub_object_declare_xattr_del(env, next, XATTR_NAME_LOV, + th); + } + + RETURN(rc); +} + +/** + * Declare layout add/set/del operations issued by special xattr names: + * + * XATTR_LUSTRE_LOV.add add component(s) to existing file + * XATTR_LUSTRE_LOV.del delete component(s) from existing file + * XATTR_LUSTRE_LOV.set.$field set specified field of certain component(s) + * + * \param[in] env execution environment + * \param[in] dt object + * \param[in] name name of xattr + * \param[in] buf lu_buf contains xattr value + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_declare_modify_layout(const struct lu_env *env, + struct dt_object *dt, + const char *name, + const struct lu_buf *buf, + struct thandle *th) +{ + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(&lo->ldo_obj); + char *op; + int rc, len = strlen(XATTR_LUSTRE_LOV); + ENTRY; + + LASSERT(dt_object_exists(dt)); + + 
if (strlen(name) <= len || name[len] != '.') { + CDEBUG(D_LAYOUT, "%s: invalid xattr name: %s\n", + lod2obd(d)->obd_name, name); + RETURN(-EINVAL); + } + len++; + + dt_write_lock(env, next, 0); + rc = lod_load_striping_locked(env, lo); + if (rc) + GOTO(unlock, rc); + + /* the layout to be modified must be a composite layout */ + if (!lo->ldo_is_composite) { + CDEBUG(D_LAYOUT, "%s: object "DFID" isn't a composite file.\n", + lod2obd(d)->obd_name, PFID(lu_object_fid(&dt->do_lu))); + GOTO(unlock, rc = -EINVAL); + } + + op = (char *)name + len; + if (strcmp(op, "add") == 0) { + rc = lod_declare_layout_add(env, dt, buf, th); + } else if (strcmp(op, "del") == 0) { + rc = lod_declare_layout_del(env, dt, buf, th); + } else if (strncmp(op, "set", strlen("set")) == 0) { + rc = lod_declare_layout_set(env, dt, op, buf, th); + } else { + CDEBUG(D_LAYOUT, "%s: unsupported xattr name:%s\n", + lod2obd(d)->obd_name, name); + GOTO(unlock, rc = -ENOTSUPP); + } +unlock: + if (rc) + lod_object_free_striping(env, lo); + dt_write_unlock(env, next); + RETURN(rc); } @@ -2067,17 +2550,18 @@ static int lod_declare_xattr_set(const struct lu_env *env, int rc; ENTRY; - /* - * allow to declare predefined striping on a new (!mode) object - * which is supposed to be replay of regular file creation - * (when LOV setting is declared) - * LU_XATTR_REPLACE is set to indicate a layout swap - */ mode = dt->do_lu.lo_header->loh_attr & S_IFMT; - if ((S_ISREG(mode) || mode == 0) && strcmp(name, XATTR_NAME_LOV) == 0 && - !(fl & LU_XATTR_REPLACE)) { + if ((S_ISREG(mode) || mode == 0) && !(fl & LU_XATTR_REPLACE) && + (strcmp(name, XATTR_NAME_LOV) == 0 || + strcmp(name, XATTR_LUSTRE_LOV) == 0)) { /* - * this is a request to manipulate object's striping + * this is a request to create object's striping. 
+ * + * allow to declare predefined striping on a new (!mode) object + * which is supposed to be replay of regular file creation + * (when LOV setting is declared) + * + * LU_XATTR_REPLACE is set to indicate a layout swap */ if (dt_object_exists(dt)) { rc = dt_attr_get(env, next, attr); @@ -2089,6 +2573,18 @@ static int lod_declare_xattr_set(const struct lu_env *env, attr->la_mode = S_IFREG; } rc = lod_declare_striped_object(env, dt, attr, buf, th); + } else if (S_ISREG(mode) && + strlen(name) > strlen(XATTR_LUSTRE_LOV) + 1 && + strncmp(name, XATTR_LUSTRE_LOV, + strlen(XATTR_LUSTRE_LOV)) == 0) { + /* + * this is a request to modify object's striping. + * add/set/del component(s). + */ + if (!dt_object_exists(dt)) + RETURN(-ENOENT); + + rc = lod_declare_modify_layout(env, dt, name, buf, th); } else if (S_ISDIR(mode)) { rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th); } else if (strcmp(name, XATTR_NAME_FID) == 0) { @@ -2136,10 +2632,10 @@ static int lod_xattr_set_internal(const struct lu_env *env, * it will confuse the fid2path process(see mdt_path_current()). * The linkEA between master and sub-stripes is set in * lod_xattr_set_lmv(). 
*/ - if (lo->ldo_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0) + if (lo->ldo_dir_stripenr == 0 || strcmp(name, XATTR_NAME_LINK) == 0) RETURN(0); - for (i = 0; i < lo->ldo_stripenr; i++) { + for (i = 0; i < lo->ldo_dir_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); rc = lod_sub_object_xattr_set(env, lo->ldo_stripe[i], buf, name, @@ -2178,10 +2674,10 @@ static int lod_xattr_del_internal(const struct lu_env *env, if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) RETURN(rc); - if (lo->ldo_stripenr == 0) + if (lo->ldo_dir_stripenr == 0) RETURN(rc); - for (i = 0; i < lo->ldo_stripenr; i++) { + for (i = 0; i < lo->ldo_dir_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); rc = lod_sub_object_xattr_del(env, lo->ldo_stripe[i], name, @@ -2217,38 +2713,47 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, const char *name, int fl, struct thandle *th) { - struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); struct lov_user_md_v1 *lum; struct lov_user_md_v3 *v3 = NULL; const char *pool_name = NULL; int rc; + bool is_del; ENTRY; LASSERT(buf != NULL && buf->lb_buf != NULL); lum = buf->lb_buf; - rc = lod_verify_striping(d, buf, false); - if (rc) - RETURN(rc); - - if (lum->lmm_magic == LOV_USER_MAGIC_V3) { + switch (lum->lmm_magic) { + case LOV_USER_MAGIC_V3: v3 = buf->lb_buf; if (v3->lmm_pool_name[0] != '\0') pool_name = v3->lmm_pool_name; + /* fall through */ + case LOV_USER_MAGIC_V1: + /* if { size, offset, count } = { 0, -1, 0 } and no pool + * (i.e. all default values specified) then delete default + * striping from dir. */ + CDEBUG(D_LAYOUT, + "set default striping: sz %u # %u offset %d %s %s\n", + (unsigned)lum->lmm_stripe_size, + (unsigned)lum->lmm_stripe_count, + (int)lum->lmm_stripe_offset, + v3 ? "from" : "", v3 ? 
v3->lmm_pool_name : ""); + + is_del = LOVEA_DELETE_VALUES(lum->lmm_stripe_size, + lum->lmm_stripe_count, + lum->lmm_stripe_offset, + pool_name); + break; + case LOV_USER_MAGIC_COMP_V1: + is_del = false; + break; + default: + CERROR("Invalid magic %x\n", lum->lmm_magic); + RETURN(-EINVAL); } - /* if { size, offset, count } = { 0, -1, 0 } and no pool - * (i.e. all default values specified) then delete default - * striping from dir. */ - CDEBUG(D_OTHER, - "set default striping: sz %u # %u offset %d %s %s\n", - (unsigned)lum->lmm_stripe_size, - (unsigned)lum->lmm_stripe_count, - (int)lum->lmm_stripe_offset, - v3 ? "from" : "", v3 ? v3->lmm_pool_name : ""); - - if (LOVEA_DELETE_VALUES(lum->lmm_stripe_size, lum->lmm_stripe_count, - lum->lmm_stripe_offset, pool_name)) { + if (is_del) { rc = lod_xattr_del_internal(env, dt, name, th); if (rc == -ENODATA) rc = 0; @@ -2350,7 +2855,7 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, /* The stripes are supposed to be allocated in declare phase, * if there are no stripes being allocated, it will skip */ - if (lo->ldo_stripenr == 0) + if (lo->ldo_dir_stripenr == 0) RETURN(0); rc = dt_attr_get(env, dt_object_child(dt), attr); @@ -2375,7 +2880,7 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, slave_lmv_buf.lb_len = sizeof(*slave_lmm); rec->rec_type = S_IFDIR; - for (i = 0; i < lo->ldo_stripenr; i++) { + for (i = 0; i < lo->ldo_dir_stripenr; i++) { struct dt_object *dto; char *stripe_name = info->lti_key; struct lu_name *sname; @@ -2501,7 +3006,6 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, struct lod_thread_info *info = lod_env_info(env); struct lod_object *lo = lod_dt_obj(dt); const struct lod_default_striping *lds = lo->ldo_def_striping; - const char *poolname = NULL; int rc; ENTRY; @@ -2509,10 +3013,10 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, lds->lds_def_striping_set || lds->lds_dir_def_striping_set)); - if 
(!LMVEA_DELETE_VALUES(lo->ldo_stripenr, + if (!LMVEA_DELETE_VALUES(lo->ldo_dir_stripenr, lo->ldo_dir_stripe_offset)) { struct lmv_user_md_v1 *v1 = info->lti_ea_store; - int stripe_count = lo->ldo_stripenr; + int stripe_count = lo->ldo_dir_stripenr; if (info->lti_ea_store_size < sizeof(*v1)) { rc = lod_ea_store_resize(info, sizeof(*v1)); @@ -2576,35 +3080,25 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, RETURN(rc); } - if (lds != NULL && lds->lds_def_pool[0] != '\0') - poolname = lds->lds_def_pool; - /* Transfer default LOV striping from the parent */ if (lds != NULL && lds->lds_def_striping_set && - !LOVEA_DELETE_VALUES(lds->lds_def_stripe_size, - lds->lds_def_stripenr, - lds->lds_def_stripe_offset, - poolname)) { - struct lov_user_md_v3 *v3 = info->lti_ea_store; - - if (info->lti_ea_store_size < sizeof(*v3)) { - rc = lod_ea_store_resize(info, sizeof(*v3)); + lds->lds_def_comp_cnt != 0) { + struct lov_mds_md *lmm; + int lmm_size = lod_comp_md_size(lo, true); + + if (info->lti_ea_store_size < lmm_size) { + rc = lod_ea_store_resize(info, lmm_size); if (rc != 0) RETURN(rc); - v3 = info->lti_ea_store; } + lmm = info->lti_ea_store; - memset(v3, 0, sizeof(*v3)); - v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); - v3->lmm_stripe_count = cpu_to_le16(lds->lds_def_stripenr); - v3->lmm_stripe_offset = cpu_to_le16(lds->lds_def_stripe_offset); - v3->lmm_stripe_size = cpu_to_le32(lds->lds_def_stripe_size); - if (poolname != NULL) - strlcpy(v3->lmm_pool_name, poolname, - sizeof(v3->lmm_pool_name)); + rc = lod_generate_lovea(env, lo, lmm, &lmm_size, true); + if (rc != 0) + RETURN(rc); - info->lti_buf.lb_buf = v3; - info->lti_buf.lb_len = sizeof(*v3); + info->lti_buf.lb_buf = lmm; + info->lti_buf.lb_len = lmm_size; if (declare) rc = lod_dir_declare_xattr_set(env, dt, &info->lti_buf, @@ -2638,6 +3132,154 @@ static int lod_dir_striping_create(const struct lu_env *env, } /** + * Make LOV EA for striped object. 
+ * + * Generate striping information and store it in the LOV EA of the given + * object. The caller must ensure nobody else is calling the function + * against the object concurrently. The transaction must be started. + * FLDB service must be running as well; it's used to map FID to the target, + * which is stored in LOV EA. + * + * \param[in] env execution environment for this thread + * \param[in] lo LOD object + * \param[in] th transaction handle + * + * \retval 0 if LOV EA is stored successfully + * \retval negative error number on failure + */ +static int lod_generate_and_set_lovea(const struct lu_env *env, + struct lod_object *lo, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct dt_object *next = dt_object_child(&lo->ldo_obj); + struct lov_mds_md_v1 *lmm; + int rc, lmm_size; + ENTRY; + + LASSERT(lo); + + if (lo->ldo_comp_cnt == 0) { + lod_object_free_striping(env, lo); + rc = lod_sub_object_xattr_del(env, next, XATTR_NAME_LOV, th); + RETURN(rc); + } + + lmm_size = lod_comp_md_size(lo, false); + if (info->lti_ea_store_size < lmm_size) { + rc = lod_ea_store_resize(info, lmm_size); + if (rc) + RETURN(rc); + } + lmm = info->lti_ea_store; + + rc = lod_generate_lovea(env, lo, lmm, &lmm_size, false); + if (rc) + RETURN(rc); + + info->lti_buf.lb_buf = lmm; + info->lti_buf.lb_len = lmm_size; + rc = lod_sub_object_xattr_set(env, next, &info->lti_buf, + XATTR_NAME_LOV, 0, th); + RETURN(rc); +} + +/** + * Delete layout component(s) + * + * \param[in] env execution environment for this thread + * \param[in] dt object + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative error number on failure + */ +static int lod_layout_del(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) +{ + struct lod_layout_component *lod_comp; + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + struct lu_attr *attr = &lod_env_info(env)->lti_attr; + int rc, i, j, left; + 
+ LASSERT(lo->ldo_is_composite); + LASSERT(lo->ldo_comp_cnt > 0 && lo->ldo_comp_entries != NULL); + + left = lo->ldo_comp_cnt; + for (i = (lo->ldo_comp_cnt - 1); i >= 0; i--) { + lod_comp = &lo->ldo_comp_entries[i]; + + if (lod_comp->llc_id != LCME_ID_INVAL) + break; + left--; + + /* Not instantiated component */ + if (lod_comp->llc_stripe == NULL) + continue; + + LASSERT(lod_comp->llc_stripenr > 0); + for (j = 0; j < lod_comp->llc_stripenr; j++) { + struct dt_object *obj = lod_comp->llc_stripe[j]; + + if (obj == NULL) + continue; + rc = lod_sub_object_destroy(env, obj, th); + if (rc) + GOTO(out, rc); + + lu_object_put(env, &obj->do_lu); + lod_comp->llc_stripe[j] = NULL; + } + OBD_FREE(lod_comp->llc_stripe, sizeof(struct dt_object *) * + lod_comp->llc_stripes_allocated); + lod_comp->llc_stripe = NULL; + lod_comp->llc_stripes_allocated = 0; + lod_obj_set_pool(lo, i, NULL); + } + + LASSERTF(left >= 0 && left < lo->ldo_comp_cnt, "left = %d\n", left); + if (left > 0) { + struct lod_layout_component *comp_array; + + OBD_ALLOC(comp_array, sizeof(*comp_array) * left); + if (comp_array == NULL) + GOTO(out, rc = -ENOMEM); + + memcpy(&comp_array[0], &lo->ldo_comp_entries[0], + sizeof(*comp_array) * left); + + OBD_FREE(lo->ldo_comp_entries, + sizeof(*comp_array) * lo->ldo_comp_cnt); + lo->ldo_comp_entries = comp_array; + lo->ldo_comp_cnt = left; + lod_obj_inc_layout_gen(lo); + } else { + lod_free_comp_entries(lo); + } + + LASSERT(dt_object_exists(dt)); + rc = dt_attr_get(env, next, attr); + if (rc) + GOTO(out, rc); + + if (attr->la_size > 0) { + attr->la_size = 0; + attr->la_valid = LA_SIZE; + rc = lod_sub_object_attr_set(env, next, attr, th); + if (rc) + GOTO(out, rc); + } + + rc = lod_generate_and_set_lovea(env, lo, th); + EXIT; +out: + if (rc) + lod_object_free_striping(env, lo); + return rc; +} + +/** * Implementation of dt_object_operations::do_xattr_set. * * Sets specified extended attribute on the object. 
Three types of EAs are @@ -2689,7 +3331,9 @@ static int lod_xattr_set(const struct lu_env *env, th); RETURN(rc); } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) && - !strcmp(name, XATTR_NAME_LOV)) { + (!strcmp(name, XATTR_NAME_LOV) || + !strncmp(name, XATTR_LUSTRE_LOV, + strlen(XATTR_LUSTRE_LOV)))) { /* in case of lov EA swap, just set it * if not, it is a replay so check striping match what we * already have during req replay, declare_xattr_set() @@ -2708,7 +3352,25 @@ static int lod_xattr_set(const struct lu_env *env, * and does not need creating each stripes. */ rc = lod_sub_object_xattr_set(env, next, buf, name, fl, th); + } else if (strcmp(name, XATTR_LUSTRE_LOV".del") == 0) { + /* delete component(s) */ + LASSERT(lod_dt_obj(dt)->ldo_comp_cached); + rc = lod_layout_del(env, dt, th); } else { + /* + * When 'name' is XATTR_LUSTRE_LOV or XATTR_NAME_LOV, + * it's going to create a file with the specified + * component(s), the striping must not have been + * cached in this case; + * + * Otherwise, it's going to add/change component(s) to + * an existing file, the striping must have been cached + * in this case. 
+ */ + LASSERT(equi(!strcmp(name, XATTR_LUSTRE_LOV) || + !strcmp(name, XATTR_NAME_LOV), + !lod_dt_obj(dt)->ldo_comp_cached)); + rc = lod_striping_create(env, dt, NULL, NULL, th); } RETURN(rc); @@ -2752,10 +3414,10 @@ static int lod_declare_xattr_del(const struct lu_env *env, if (rc != 0) RETURN(rc); - if (lo->ldo_stripenr == 0) + if (lo->ldo_dir_stripenr == 0) RETURN(0); - for (i = 0; i < lo->ldo_stripenr; i++) { + for (i = 0; i < lo->ldo_dir_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); rc = lod_sub_object_declare_xattr_del(env, lo->ldo_stripe[i], name, th); @@ -2790,10 +3452,10 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) RETURN(rc); - if (lo->ldo_stripenr == 0) + if (lo->ldo_dir_stripenr == 0) RETURN(0); - for (i = 0; i < lo->ldo_stripenr; i++) { + for (i = 0; i < lo->ldo_dir_stripenr; i++) { LASSERT(lo->ldo_stripe[i]); rc = lod_sub_object_xattr_del(env, lo->ldo_stripe[i], name, th); @@ -2839,14 +3501,20 @@ static int lod_get_default_lov_striping(const struct lu_env *env, struct lod_thread_info *info = lod_env_info(env); struct lov_user_md_v1 *v1 = NULL; struct lov_user_md_v3 *v3 = NULL; - int rc; + struct lov_comp_md_v1 *comp_v1 = NULL; + __u16 comp_cnt; + bool composite; + int rc, i; + ENTRY; + + lds->lds_def_striping_set = 0; rc = lod_get_lov_ea(env, lo); if (rc < 0) - return rc; + RETURN(rc); if (rc < (typeof(rc))sizeof(struct lov_user_md)) - return 0; + RETURN(0); v1 = info->lti_ea_store; if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) { @@ -2854,26 +3522,82 @@ static int lod_get_default_lov_striping(const struct lu_env *env, } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) { v3 = (struct lov_user_md_v3 *)v1; lustre_swab_lov_user_md_v3(v3); + } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_COMP_V1)) { + comp_v1 = (struct lov_comp_md_v1 *)v1; + lustre_swab_lov_comp_md_v1(comp_v1); } - if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1) - return 
0; + if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1 && + v1->lmm_magic != LOV_MAGIC_COMP_V1) + RETURN(-ENOTSUPP); - if (v1->lmm_pattern != LOV_PATTERN_RAID0 && v1->lmm_pattern != 0) - return 0; + if (v1->lmm_magic == LOV_MAGIC_COMP_V1) { + comp_v1 = (struct lov_comp_md_v1 *)v1; + comp_cnt = comp_v1->lcm_entry_count; + if (comp_cnt == 0) + RETURN(-EINVAL); + composite = true; + } else { + comp_cnt = 1; + composite = false; + } - lds->lds_def_stripenr = v1->lmm_stripe_count; - lds->lds_def_stripe_size = v1->lmm_stripe_size; - lds->lds_def_stripe_offset = v1->lmm_stripe_offset; - lds->lds_def_striping_set = 1; - if (v1->lmm_magic == LOV_USER_MAGIC_V3) { - v3 = (struct lov_user_md_v3 *)v1; - if (v3->lmm_pool_name[0] != '\0') - strlcpy(lds->lds_def_pool, v3->lmm_pool_name, - sizeof(lds->lds_def_pool)); + /* realloc default comp entries if necessary */ + rc = lod_def_striping_comp_resize(lds, comp_cnt); + if (rc < 0) + RETURN(rc); + + lds->lds_def_comp_cnt = comp_cnt; + lds->lds_def_striping_is_composite = composite ? 1 : 0; + + for (i = 0; i < comp_cnt; i++) { + struct lod_layout_component *lod_comp; + struct lu_extent *ext; + char *pool; + + lod_comp = &lds->lds_def_comp_entries[i]; + /* + * reset lod_comp values, llc_stripes is always NULL in + * the default striping template, llc_pool will be reset + * later below. 
+ */ + memset(lod_comp, 0, offsetof(typeof(*lod_comp), llc_pool)); + + if (composite) { + v1 = (struct lov_user_md *)((char *)comp_v1 + + comp_v1->lcm_entries[i].lcme_offset); + ext = &comp_v1->lcm_entries[i].lcme_extent; + lod_comp->llc_extent = *ext; + } + + if (v1->lmm_pattern != LOV_PATTERN_RAID0 && + v1->lmm_pattern != 0) { + lod_free_def_comp_entries(lds); + RETURN(-EINVAL); + } + + CDEBUG(D_LAYOUT, DFID" stripe_count=%d stripe_size=%d " + "stripe_offset=%d\n", + PFID(lu_object_fid(&lo->ldo_obj.do_lu)), + (int)v1->lmm_stripe_count, (int)v1->lmm_stripe_size, + (int)v1->lmm_stripe_offset); + + lod_comp->llc_stripenr = v1->lmm_stripe_count; + lod_comp->llc_stripe_size = v1->lmm_stripe_size; + lod_comp->llc_stripe_offset = v1->lmm_stripe_offset; + + pool = NULL; + if (v1->lmm_magic == LOV_USER_MAGIC_V3) { + /* XXX: sanity check here */ + v3 = (struct lov_user_md_v3 *) v1; + if (v3->lmm_pool_name[0] != '\0') + pool = v3->lmm_pool_name; + } + lod_set_def_pool(lds, i, pool); } - return 0; + lds->lds_def_striping_set = 1; + RETURN(rc); } /** @@ -2893,13 +3617,15 @@ static int lod_get_default_lmv_striping(const struct lu_env *env, struct lod_thread_info *info = lod_env_info(env); struct lmv_user_md_v1 *v1 = NULL; int rc; + ENTRY; + lds->lds_dir_def_striping_set = 0; rc = lod_get_default_lmv_ea(env, lo); if (rc < 0) - return rc; + RETURN(rc); if (rc < (typeof(rc))sizeof(struct lmv_user_md)) - return 0; + RETURN(0); v1 = info->lti_ea_store; @@ -2908,7 +3634,7 @@ static int lod_get_default_lmv_striping(const struct lu_env *env, lds->lds_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type); lds->lds_dir_def_striping_set = 1; - return 0; + RETURN(0); } /** @@ -2927,11 +3653,13 @@ static int lod_get_default_striping(const struct lu_env *env, struct lod_object *lo, struct lod_default_striping *lds) { - int rc; + int rc, rc1; rc = lod_get_default_lov_striping(env, lo, lds); - if (rc == 0) - rc = lod_get_default_lmv_striping(env, lo, lds); + rc1 = 
lod_get_default_lmv_striping(env, lo, lds); + if (rc == 0 && rc1 < 0) + rc = rc1; + return rc; } @@ -2949,36 +3677,94 @@ static void lod_striping_from_default(struct lod_object *lo, const struct lod_default_striping *lds, umode_t mode) { + struct lod_device *d = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + struct lov_desc *desc = &d->lod_desc; + int i, rc; + if (lds->lds_def_striping_set && S_ISREG(mode)) { - if (lo->ldo_stripenr == 0) - lo->ldo_stripenr = lds->lds_def_stripenr; - if (lo->ldo_stripe_size == 0) - lo->ldo_stripe_size = lds->lds_def_stripe_size; - if (lo->ldo_stripe_offset == LOV_OFFSET_DEFAULT) - lo->ldo_stripe_offset = lds->lds_def_stripe_offset; - if (lo->ldo_pool == NULL && lds->lds_def_pool[0] != '\0') - lod_object_set_pool(lo, lds->lds_def_pool); - - CDEBUG(D_INFO, "striping from default: count %hu, size %u, " - "offset %d, pool %s\n", - lo->ldo_stripenr, lo->ldo_stripe_size, - (int)lo->ldo_stripe_offset, lo->ldo_pool ?: ""); + rc = lod_alloc_comp_entries(lo, lds->lds_def_comp_cnt); + if (rc != 0) + return; + + lo->ldo_is_composite = lds->lds_def_striping_is_composite; + + for (i = 0; i < lo->ldo_comp_cnt; i++) { + struct lod_layout_component *obj_comp = + &lo->ldo_comp_entries[i]; + struct lod_layout_component *def_comp = + &lds->lds_def_comp_entries[i]; + + CDEBUG(D_LAYOUT, "Inherite from default: size:%hu " + "nr:%u offset:%u %s\n", + def_comp->llc_stripe_size, + def_comp->llc_stripenr, + def_comp->llc_stripe_offset, + def_comp->llc_pool ?: ""); + + *obj_comp = *def_comp; + if (def_comp->llc_pool != NULL) { + /* pointer was copied from def_comp */ + obj_comp->llc_pool = NULL; + lod_obj_set_pool(lo, i, def_comp->llc_pool); + } + + /* + * Don't initialize these fields for plain layout + * (v1/v3) here, they are inherited in the order of + * 'parent' -> 'fs default (root)' -> 'global default + * values for stripe_count & stripe_size'. + * + * see lod_ah_init(). 
+ */ + if (!lo->ldo_is_composite) + continue; + + if (obj_comp->llc_stripenr <= 0) + obj_comp->llc_stripenr = + desc->ld_default_stripe_count; + if (obj_comp->llc_stripe_size <= 0) + obj_comp->llc_stripe_size = + desc->ld_default_stripe_size; + } } else if (lds->lds_dir_def_striping_set && S_ISDIR(mode)) { - if (lo->ldo_stripenr == 0) - lo->ldo_stripenr = lds->lds_dir_def_stripenr; + if (lo->ldo_dir_stripenr == 0) + lo->ldo_dir_stripenr = lds->lds_dir_def_stripenr; if (lo->ldo_dir_stripe_offset == -1) lo->ldo_dir_stripe_offset = lds->lds_dir_def_stripe_offset; if (lo->ldo_dir_hash_type == 0) lo->ldo_dir_hash_type = lds->lds_dir_def_hash_type; - CDEBUG(D_INFO, "striping from default: count %hu, offset %d, " - "hash_type %u\n", - lo->ldo_stripenr, (int)lo->ldo_dir_stripe_offset, - lo->ldo_dir_hash_type); + CDEBUG(D_LAYOUT, "striping from default dir: nr:%hu, " + "offset:%u, hash_type:%u\n", + lo->ldo_dir_stripenr, lo->ldo_dir_stripe_offset, + lo->ldo_dir_hash_type); } } +static inline bool lod_need_inherit_more(struct lod_object *lo, bool from_root) +{ + struct lod_layout_component *lod_comp; + + if (lo->ldo_comp_cnt == 0) + return true; + + if (lo->ldo_is_composite) + return false; + + lod_comp = &lo->ldo_comp_entries[0]; + + if (lod_comp->llc_stripenr <= 0 || + lod_comp->llc_stripe_size <= 0) + return true; + + if (from_root && (lod_comp->llc_pool == NULL || + lod_comp->llc_stripe_offset == LOV_OFFSET_DEFAULT)) + return true; + + return false; +} + /** * Implementation of dt_object_operations::do_ah_init. 
* @@ -3004,6 +3790,8 @@ static void lod_ah_init(const struct lu_env *env, struct lod_object *lp = NULL; struct lod_object *lc; struct lov_desc *desc; + struct lod_layout_component *lod_comp; + int rc; ENTRY; LASSERT(child); @@ -3016,8 +3804,11 @@ static void lod_ah_init(const struct lu_env *env, nextc = dt_object_child(child); lc = lod_dt_obj(child); - LASSERT(lc->ldo_stripenr == 0); - LASSERT(lc->ldo_stripe == NULL); + LASSERT(!lod_obj_is_striped(child)); + /* default layout template may have been set on the regular file + * when this is called from mdd_create_data() */ + if (S_ISREG(child_mode)) + lod_free_comp_entries(lc); if (!dt_object_exists(nextc)) nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode); @@ -3026,10 +3817,11 @@ static void lod_ah_init(const struct lu_env *env, /* other default values are 0 */ lc->ldo_dir_stripe_offset = -1; - memset(lds, 0, sizeof(*lds)); - lod_get_default_striping(env, lp, lds); + /* get default striping from parent object */ + if (likely(lp != NULL)) + lod_get_default_striping(env, lp, lds); - /* inherit parent default striping */ + /* set child default striping info, default value is NULL */ if (lds->lds_def_striping_set || lds->lds_dir_def_striping_set) lc->ldo_def_striping = lds; @@ -3038,58 +3830,60 @@ static void lod_ah_init(const struct lu_env *env, lod_verify_md_striping(d, ah->dah_eadata) == 0) { const struct lmv_user_md_v1 *lum1 = ah->dah_eadata; - lc->ldo_stripenr = le32_to_cpu(lum1->lum_stripe_count); + lc->ldo_dir_stripenr = + le32_to_cpu(lum1->lum_stripe_count); lc->ldo_dir_stripe_offset = - le32_to_cpu(lum1->lum_stripe_offset); + le32_to_cpu(lum1->lum_stripe_offset); lc->ldo_dir_hash_type = - le32_to_cpu(lum1->lum_hash_type); + le32_to_cpu(lum1->lum_hash_type); CDEBUG(D_INFO, "set dir stripe: count %hu, offset %d, " "hash_type %u\n", - lc->ldo_stripenr, + lc->ldo_dir_stripenr, (int)lc->ldo_dir_stripe_offset, lc->ldo_dir_hash_type); } else { + /* transfer defaults LMV to new directory */ 
lod_striping_from_default(lc, lds, child_mode); } /* shrink the stripe_count to the avaible MDT count */ - if (lc->ldo_stripenr > d->lod_remote_mdt_count + 1 && + if (lc->ldo_dir_stripenr > d->lod_remote_mdt_count + 1 && !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) - lc->ldo_stripenr = d->lod_remote_mdt_count + 1; + lc->ldo_dir_stripenr = d->lod_remote_mdt_count + 1; /* Directory will be striped only if stripe_count > 1, if * stripe_count == 1, let's reset stripenr = 0 to avoid * create single master stripe and also help to unify the * stripe handling of directories and files */ - if (lc->ldo_stripenr == 1) - lc->ldo_stripenr = 0; + if (lc->ldo_dir_stripenr == 1) + lc->ldo_dir_stripenr = 0; CDEBUG(D_INFO, "final dir stripe [%hu %d %u]\n", - lc->ldo_stripenr, (int)lc->ldo_dir_stripe_offset, + lc->ldo_dir_stripenr, (int)lc->ldo_dir_stripe_offset, lc->ldo_dir_hash_type); RETURN_EXIT; } - /* - * if object is going to be striped over OSTs, transfer default - * striping information to the child, so that we can use it - * during declaration and creation - */ + /* child object regular file*/ + if (!lod_object_will_be_striped(S_ISREG(child_mode), lu_object_fid(&child->do_lu))) RETURN_EXIT; - /* other default values are 0 */ - lc->ldo_stripe_offset = LOV_OFFSET_DEFAULT; - - /* striping from parent default */ - if (likely(parent)) { - memset(lds, 0, sizeof(*lds)); - lod_get_default_lov_striping(env, lp, lds); - lod_striping_from_default(lc, lds, child_mode); + /* If object is going to be striped over OSTs, transfer default + * striping information to the child, so that we can use it + * during declaration and creation. + * + * Try from the parent first. 
+ */ + if (likely(lp != NULL)) { + rc = lod_get_default_lov_striping(env, lp, lds); + if (rc == 0) + lod_striping_from_default(lc, lds, child_mode); } + /* Initialize lod_device::lod_md_root object reference */ if (d->lod_md_root == NULL) { struct dt_object *root; struct lod_object *lroot; @@ -3107,30 +3901,63 @@ static void lod_ah_init(const struct lu_env *env, } } - /* if parent doesn't provide all defaults, striping from fs default */ - if (d->lod_md_root != NULL && - (lc->ldo_stripenr == 0 || - lc->ldo_stripe_size == 0 || - lc->ldo_stripe_offset == LOV_OFFSET_DEFAULT || - lc->ldo_pool == NULL)) { - memset(lds, 0, sizeof(*lds)); - lod_get_default_lov_striping(env, d->lod_md_root, lds); - lod_striping_from_default(lc, lds, child_mode); + /* try inherit layout from the root object (fs default) when: + * - parent does not have default layout; or + * - parent has plain(v1/v3) default layout, and some attributes + * are not specified in the default layout; + */ + if (d->lod_md_root != NULL && lod_need_inherit_more(lc, true)) { + rc = lod_get_default_lov_striping(env, d->lod_md_root, lds); + if (rc) + goto out; + if (lc->ldo_comp_cnt == 0) { + lod_striping_from_default(lc, lds, child_mode); + } else if (!lds->lds_def_striping_is_composite) { + struct lod_layout_component *def_comp; + + LASSERT(!lc->ldo_is_composite); + lod_comp = &lc->ldo_comp_entries[0]; + def_comp = &lds->lds_def_comp_entries[0]; + + if (lod_comp->llc_stripenr <= 0) + lod_comp->llc_stripenr = def_comp->llc_stripenr; + if (lod_comp->llc_stripe_size <= 0) + lod_comp->llc_stripe_size = + def_comp->llc_stripe_size; + if (lod_comp->llc_stripe_offset == LOV_OFFSET_DEFAULT) + lod_comp->llc_stripe_offset = + def_comp->llc_stripe_offset; + if (lod_comp->llc_pool == NULL) + lod_obj_set_pool(lc, 0, def_comp->llc_pool); + } } - +out: /* * fs default striping may not be explicitly set, or historically set - * in config log, check striping sanity here and fix to sane values. + * in config log, use them. 
*/ - desc = &d->lod_desc; - if (lc->ldo_stripenr == 0) - lc->ldo_stripenr = desc->ld_default_stripe_count; - if (lc->ldo_stripe_size == 0) - lc->ldo_stripe_size = desc->ld_default_stripe_size; - - CDEBUG(D_INFO, "final striping [%hu %u %d %s]\n", - lc->ldo_stripenr, lc->ldo_stripe_size, - (int)lc->ldo_stripe_offset, lc->ldo_pool ?: ""); + if (lod_need_inherit_more(lc, false)) { + + if (lc->ldo_comp_cnt == 0) { + rc = lod_alloc_comp_entries(lc, 1); + if (rc) + /* fail to allocate memory, will create a + * non-striped file. */ + RETURN_EXIT; + lc->ldo_is_composite = 0; + lod_comp = &lc->ldo_comp_entries[0]; + lod_comp->llc_stripe_offset = LOV_OFFSET_DEFAULT; + } + LASSERT(!lc->ldo_is_composite); + lod_comp = &lc->ldo_comp_entries[0]; + desc = &d->lod_desc; + if (lod_comp->llc_stripenr <= 0) + lod_comp->llc_stripenr = desc->ld_default_stripe_count; + if (lod_comp->llc_stripe_size <= 0) + lod_comp->llc_stripe_size = + desc->ld_default_stripe_size; + } + EXIT; } @@ -3154,18 +3981,15 @@ static void lod_ah_init(const struct lu_env *env, static int lod_declare_init_size(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - struct dt_object *next = dt_object_child(dt); - struct lod_object *lo = lod_dt_obj(dt); - struct lu_attr *attr = &lod_env_info(env)->lti_attr; - uint64_t size, offs; - int rc, stripe; + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object **objects = NULL; + struct lu_attr *attr = &lod_env_info(env)->lti_attr; + uint64_t size, offs; + int i, rc, stripe, stripenr = 0, stripe_size = 0; ENTRY; - /* XXX: we support the simplest (RAID0) striping so far */ - LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0); - LASSERT(lo->ldo_stripe_size > 0); - - if (lo->ldo_stripenr == 0) + if (!lod_obj_is_striped(dt)) RETURN(0); rc = dt_attr_get(env, next, attr); @@ -3177,19 +4001,43 @@ static int lod_declare_init_size(const struct lu_env *env, if (size == 0) RETURN(0); + for (i = 0; i < 
lo->ldo_comp_cnt; i++) { + struct lod_layout_component *lod_comp; + struct lu_extent *extent; + + lod_comp = &lo->ldo_comp_entries[i]; + + if (lod_comp->llc_stripe == NULL) + continue; + + extent = &lod_comp->llc_extent; + if (!lo->ldo_is_composite || + (size >= extent->e_start && size < extent->e_end)) { + objects = lod_comp->llc_stripe; + stripenr = lod_comp->llc_stripenr; + stripe_size = lod_comp->llc_stripe_size; + break; + } + } + + if (stripenr == 0) + RETURN(0); + + LASSERT(objects != NULL && stripe_size != 0); + /* ll_do_div64(a, b) returns a % b, and a = a / b */ - ll_do_div64(size, (__u64) lo->ldo_stripe_size); - stripe = ll_do_div64(size, (__u64) lo->ldo_stripenr); + ll_do_div64(size, (__u64)stripe_size); + stripe = ll_do_div64(size, (__u64)stripenr); + LASSERT(objects[stripe] != NULL); - size = size * lo->ldo_stripe_size; + size = size * stripe_size; offs = attr->la_size; - size += ll_do_div64(offs, lo->ldo_stripe_size); + size += ll_do_div64(offs, stripe_size); attr->la_valid = LA_SIZE; attr->la_size = size; - rc = lod_sub_object_declare_attr_set(env, lo->ldo_stripe[stripe], attr, - th); + rc = lod_sub_object_declare_attr_set(env, objects[stripe], attr, th); RETURN(rc); } @@ -3228,15 +4076,14 @@ int lod_declare_striped_object(const struct lu_env *env, struct dt_object *dt, if (!dt_object_remote(next)) { /* choose OST and generate appropriate objects */ - rc = lod_qos_prep_create(env, lo, attr, lovea, th); + rc = lod_prepare_create(env, lo, attr, lovea, th); if (rc) GOTO(out, rc); /* * declare storage for striping data */ - info->lti_buf.lb_len = lov_mds_md_size(lo->ldo_stripenr, - lo->ldo_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1); + info->lti_buf.lb_len = lod_comp_md_size(lo, false); } else { /* LOD can not choose OST objects for remote objects, i.e. * stripes must be ready before that. 
Right now, it can only @@ -3316,9 +4163,7 @@ static int lod_declare_object_create(const struct lu_env *env, /* XXX: all tricky interactions with ->ah_make_hint() decided * to use striping, then ->declare_create() behaving differently * should be cleaned */ - if (dof->u.dof_reg.striped == 0) - lo->ldo_stripenr = 0; - if (lo->ldo_stripenr > 0) + if (dof->u.dof_reg.striped != 0) rc = lod_declare_striped_object(env, dt, attr, NULL, th); } else if (dof->dof_type == DFT_DIR) { @@ -3377,6 +4222,10 @@ static int lod_declare_object_create(const struct lu_env *env, rc = lod_declare_dir_striping_create(env, dt, attr, dof, th); } out: + /* failed to create striping or to set initial size, let's reset + * config so that others don't get confused */ + if (rc) + lod_object_free_striping(env, lo); RETURN(rc); } @@ -3403,22 +4252,44 @@ int lod_striping_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, struct dt_object_format *dof, struct thandle *th) { - struct lod_object *lo = lod_dt_obj(dt); - int rc = 0, i; + struct lod_layout_component *lod_comp; + struct lod_object *lo = lod_dt_obj(dt); + int rc = 0, i, j; ENTRY; + LASSERT(lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL); + /* create all underlying objects */ - for (i = 0; i < lo->ldo_stripenr; i++) { - LASSERT(lo->ldo_stripe[i]); - rc = lod_sub_object_create(env, lo->ldo_stripe[i], attr, NULL, - dof, th); - if (rc) - break; + for (i = 0; i < lo->ldo_comp_cnt; i++) { + lod_comp = &lo->ldo_comp_entries[i]; + + if (lod_comp->llc_flags & LCME_FL_INIT) + continue; + + lod_comp->llc_flags |= LCME_FL_INIT; + + if (lod_comp->llc_stripe == NULL) + continue; + + LASSERT(lod_comp->llc_stripenr > 0); + for (j = 0; j < lod_comp->llc_stripenr; j++) { + struct dt_object *object = lod_comp->llc_stripe[j]; + LASSERT(object != NULL); + rc = lod_sub_object_create(env, object, attr, NULL, + dof, th); + if (rc) + break; + } } if (rc == 0) rc = lod_generate_and_set_lovea(env, lo, th); + if (rc == 0) + 
lo->ldo_comp_cached = 1; + else + lod_object_free_striping(env, lo); + RETURN(rc); } @@ -3436,7 +4307,6 @@ static int lod_object_create(const struct lu_env *env, struct dt_object *dt, struct dt_allocation_hint *hint, struct dt_object_format *dof, struct thandle *th) { - struct lod_object *lo = lod_dt_obj(dt); int rc; ENTRY; @@ -3447,12 +4317,28 @@ static int lod_object_create(const struct lu_env *env, struct dt_object *dt, RETURN(rc); if (S_ISREG(dt->do_lu.lo_header->loh_attr) && - lo->ldo_stripe && dof->u.dof_reg.striped != 0) + lod_obj_is_striped(dt) && dof->u.dof_reg.striped != 0) { + LASSERT(lod_dt_obj(dt)->ldo_comp_cached == 0); rc = lod_striping_create(env, dt, attr, dof, th); + } RETURN(rc); } +static inline int +lod_obj_stripe_destroy_cb(const struct lu_env *env, struct lod_object *lo, + struct dt_object *dt, struct thandle *th, + int stripe_idx, struct lod_obj_stripe_cb_data *data) +{ + if (data->locd_declare) + return lod_sub_object_declare_destroy(env, dt, th); + else if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || + stripe_idx == cfs_fail_val) + return lod_sub_object_destroy(env, dt, th); + else + return 0; +} + /** * Implementation of dt_object_operations::do_declare_destroy. 
* @@ -3491,7 +4377,7 @@ static int lod_declare_object_destroy(const struct lu_env *env, if (rc != 0) RETURN(rc); - for (i = 0; i < lo->ldo_stripenr; i++) { + for (i = 0; i < lo->ldo_dir_stripenr; i++) { rc = lod_sub_object_declare_ref_del(env, next, th); if (rc != 0) RETURN(rc); @@ -3517,19 +4403,29 @@ static int lod_declare_object_destroy(const struct lu_env *env, OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ2)) RETURN(0); + if (!lod_obj_is_striped(dt)) + RETURN(0); + /* declare destroy all striped objects */ - for (i = 0; i < lo->ldo_stripenr; i++) { - if (lo->ldo_stripe[i] == NULL) - continue; + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { + for (i = 0; i < lo->ldo_dir_stripenr; i++) { + if (lo->ldo_stripe[i] == NULL) + continue; - if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) rc = lod_sub_object_declare_ref_del(env, lo->ldo_stripe[i], th); - rc = lod_sub_object_declare_destroy(env, lo->ldo_stripe[i], - th); - if (rc != 0) - break; + rc = lod_sub_object_declare_destroy(env, + lo->ldo_stripe[i], th); + if (rc != 0) + break; + } + } else { + struct lod_obj_stripe_cb_data data; + + data.locd_declare = true; + rc = lod_obj_for_each_stripe(env, lo, th, + lod_obj_stripe_destroy_cb, &data); } RETURN(rc); @@ -3562,7 +4458,7 @@ static int lod_object_destroy(const struct lu_env *env, if (rc != 0) RETURN(rc); - for (i = 0; i < lo->ldo_stripenr; i++) { + for (i = 0; i < lo->ldo_dir_stripenr; i++) { rc = lod_sub_object_ref_del(env, next, th); if (rc != 0) RETURN(rc); @@ -3590,12 +4486,16 @@ static int lod_object_destroy(const struct lu_env *env, OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ2)) RETURN(0); + if (!lod_obj_is_striped(dt)) + RETURN(0); + /* destroy all striped objects */ - for (i = 0; i < lo->ldo_stripenr; i++) { - if (likely(lo->ldo_stripe[i] != NULL) && - (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || - i == cfs_fail_val)) { - if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { + for (i = 0; i < lo->ldo_dir_stripenr; i++) { + 
if (lo->ldo_stripe[i] == NULL) + continue; + if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || + i == cfs_fail_val) { dt_write_lock(env, lo->ldo_stripe[i], MOR_TGT_CHILD); rc = lod_sub_object_ref_del(env, @@ -3603,12 +4503,19 @@ static int lod_object_destroy(const struct lu_env *env, dt_write_unlock(env, lo->ldo_stripe[i]); if (rc != 0) break; - } - rc = lod_sub_object_destroy(env, lo->ldo_stripe[i], th); - if (rc != 0) - break; + rc = lod_sub_object_destroy(env, + lo->ldo_stripe[i], th); + if (rc != 0) + break; + } } + } else { + struct lod_obj_stripe_cb_data data; + + data.locd_declare = false; + rc = lod_obj_for_each_stripe(env, lo, th, + lod_obj_stripe_destroy_cb, &data); } RETURN(rc); @@ -3730,7 +4637,7 @@ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt, RETURN(0); LASSERT(S_ISDIR(dt->do_lu.lo_header->loh_attr)); - LASSERT(lo->ldo_stripenr > 1); + LASSERT(lo->ldo_dir_stripenr > 1); /* Note: for remote lock for single stripe dir, MDT will cancel * the lock by lockh directly */ LASSERT(!dt_object_remote(dt_object_child(dt))); @@ -3785,7 +4692,7 @@ static int lod_object_lock(const struct lu_env *env, GOTO(out, rc); /* No stripes */ - if (lo->ldo_stripenr <= 1) { + if (lo->ldo_dir_stripenr <= 1) { /* * NB, ei_cbdata stores pointer to slave locks, if no locks * taken, make sure it's set to NULL, otherwise MDT will try to @@ -3795,16 +4702,16 @@ static int lod_object_lock(const struct lu_env *env, GOTO(out, rc = 0); } - slave_locks_size = sizeof(*slave_locks) + lo->ldo_stripenr * + slave_locks_size = sizeof(*slave_locks) + lo->ldo_dir_stripenr * sizeof(slave_locks->handles[0]); /* Freed in lod_object_unlock */ OBD_ALLOC(slave_locks, slave_locks_size); if (slave_locks == NULL) GOTO(out, rc = -ENOMEM); - slave_locks->count = lo->ldo_stripenr; + slave_locks->count = lo->ldo_dir_stripenr; /* striped directory lock */ - for (i = 1; i < lo->ldo_stripenr; i++) { + for (i = 1; i < lo->ldo_dir_stripenr; i++) { struct lustre_handle lockh; struct 
ldlm_res_id *res_id; @@ -4055,24 +4962,45 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *lo, */ void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo) { - int len; + struct lod_layout_component *lod_comp; + int i, j; if (lo->ldo_stripe != NULL) { - int i; - - LASSERT(lo->ldo_stripes_allocated > 0); + LASSERT(lo->ldo_comp_entries == NULL); + LASSERT(lo->ldo_dir_stripes_allocated > 0); - for (i = 0; i < lo->ldo_stripenr; i++) { + for (i = 0; i < lo->ldo_dir_stripenr; i++) { if (lo->ldo_stripe[i]) dt_object_put(env, lo->ldo_stripe[i]); } - len = sizeof(struct dt_object *) * lo->ldo_stripes_allocated; - OBD_FREE(lo->ldo_stripe, len); + j = sizeof(struct dt_object *) * lo->ldo_dir_stripes_allocated; + OBD_FREE(lo->ldo_stripe, j); lo->ldo_stripe = NULL; - lo->ldo_stripes_allocated = 0; + lo->ldo_dir_stripes_allocated = 0; + lo->ldo_dir_stripenr = 0; + } else if (lo->ldo_comp_entries != NULL) { + for (i = 0; i < lo->ldo_comp_cnt; i++) { + /* free lod_layout_component::llc_stripe array */ + lod_comp = &lo->ldo_comp_entries[i]; + + if (lod_comp->llc_stripe == NULL) + continue; + LASSERT(lod_comp->llc_stripes_allocated != 0); + for (j = 0; j < lod_comp->llc_stripes_allocated; j++) { + if (lod_comp->llc_stripe[j] != NULL) + lu_object_put(env, + &lod_comp->llc_stripe[j]->do_lu); + } + OBD_FREE(lod_comp->llc_stripe, + sizeof(struct dt_object *) * + lod_comp->llc_stripes_allocated); + lod_comp->llc_stripe = NULL; + lod_comp->llc_stripes_allocated = 0; + } + lod_free_comp_entries(lo); + lo->ldo_comp_cached = 0; } - lo->ldo_stripenr = 0; } /** @@ -4106,7 +5034,6 @@ static void lod_object_free(const struct lu_env *env, struct lu_object *o) { struct lod_object *lo = lu2lod_obj(o); - lod_object_set_pool(lo, NULL); /* release all underlying object pinned */ lod_object_free_striping(env, lo); lu_object_fini(o); diff --git a/lustre/lod/lod_qos.c b/lustre/lod/lod_qos.c index 8621376..4930dfa 100644 --- a/lustre/lod/lod_qos.c +++ 
b/lustre/lod/lod_qos.c @@ -819,12 +819,54 @@ static int lod_qos_is_ost_used(const struct lu_env *env, int ost, __u32 stripes) return 0; } +/** + * Check whether an OST is already used in a composite layout + * + * \param[in] inuse all in-use OST indexes + * \param[in] ost OST target index to check + * + * \retval 0 not used + * \retval 1 used + */ +static inline int lod_comp_is_ost_used(struct ost_pool *inuse, int ost) +{ + __u32 j; + LASSERT(inuse != NULL); + + if (inuse->op_size == 0) + return 0; + + LASSERT(inuse->op_count <= inuse->op_size); + for (j = 0; j < inuse->op_count; j++) { + if (inuse->op_array[j] == ost) + return 1; + } + return 0; +} + +/** + * Mark the given target as used for a composite layout + * + * \param[in,out] inuse in-use OST index array + * \param[in] ost OST target index to mark as used + */ +static inline void lod_comp_ost_in_use(struct ost_pool *inuse, int ost) +{ + LASSERT(inuse != NULL); + if (inuse->op_size && !lod_comp_is_ost_used(inuse, ost)) { + LASSERT(inuse->op_count < inuse->op_size); + inuse->op_array[inuse->op_count] = ost; + inuse->op_count++; + } +} + static int lod_check_and_reserve_ost(const struct lu_env *env, struct lod_device *m, struct obd_statfs *sfs, __u32 ost_idx, __u32 speed, __u32 *s_idx, struct dt_object **stripe, - struct thandle *th) + struct thandle *th, + struct ost_pool *inuse) { struct dt_object *o; __u32 stripe_idx = *s_idx; @@ -854,6 +896,15 @@ static int lod_check_and_reserve_ost(const struct lu_env *env, } /* + * try not to allocate on an OST which has been used by other + * component + */ + if (speed == 0 && lod_comp_is_ost_used(inuse, ost_idx)) { + QOS_DEBUG("#%d: used by other component\n", ost_idx); + goto out_return; + } + + /* * do not put >1 objects on a single OST */ if (lod_qos_is_ost_used(env, ost_idx, stripe_idx)) @@ -871,6 +922,7 @@ static int lod_check_and_reserve_ost(const struct lu_env *env, * We've successfully declared (reserved) an object */ lod_qos_ost_in_use(env, stripe_idx, ost_idx); + lod_comp_ost_in_use(inuse, ost_idx); 
stripe[stripe_idx] = o; OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_LOV_CREATE_RACE, 2); stripe_idx++; @@ -899,6 +951,8 @@ out_return: * \param[out] stripe striping created * \param[in] flags allocation flags (0 or LOV_USES_DEFAULT_STRIPE) * \param[in] th transaction handle + * \param[in] comp_idx index of ldo_comp_entries + * \param[in|out] inuse array of inuse ost index * * \retval 0 on success * \retval -ENOSPC if not enough OSTs are found @@ -906,26 +960,29 @@ out_return: */ static int lod_alloc_rr(const struct lu_env *env, struct lod_object *lo, struct dt_object **stripe, int flags, - struct thandle *th) + struct thandle *th, int comp_idx, + struct ost_pool *inuse) { + struct lod_layout_component *lod_comp; struct lod_device *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); struct obd_statfs *sfs = &lod_env_info(env)->lti_osfs; struct pool_desc *pool = NULL; struct ost_pool *osts; struct lod_qos_rr *lqr; - unsigned int i, array_idx; - int rc; - __u32 ost_start_idx_temp; - int speed = 0; - int ost_connecting = 0; - __u32 stripe_idx = 0; - __u32 stripe_cnt = lo->ldo_stripenr; - __u32 stripe_cnt_min = min_stripe_count(stripe_cnt, flags); - __u32 ost_idx; + unsigned int i, array_idx; + __u32 ost_start_idx_temp; + __u32 stripe_idx = 0; + __u32 stripe_cnt, stripe_cnt_min, ost_idx; + int rc, speed = 0, ost_connecting = 0; ENTRY; - if (lo->ldo_pool) - pool = lod_find_pool(m, lo->ldo_pool); + LASSERT(lo->ldo_comp_cnt > comp_idx && lo->ldo_comp_entries != NULL); + lod_comp = &lo->ldo_comp_entries[comp_idx]; + stripe_cnt = lod_comp->llc_stripenr; + stripe_cnt_min = min_stripe_count(stripe_cnt, flags); + + if (lod_comp->llc_pool != NULL) + pool = lod_find_pool(m, lod_comp->llc_pool); if (pool != NULL) { down_read(&pool_tgt_rw_sem(pool)); @@ -940,7 +997,7 @@ static int lod_alloc_rr(const struct lu_env *env, struct lod_object *lo, if (rc) GOTO(out, rc); - rc = lod_qos_ost_in_use_clear(env, lo->ldo_stripenr); + rc = lod_qos_ost_in_use_clear(env, stripe_cnt); if (rc) GOTO(out, rc); @@ -966,11 
+1023,11 @@ repeat_find: QOS_DEBUG("pool '%s' want %d startidx %d startcnt %d offset %d " "active %d count %d\n", - lo->ldo_pool ? lo->ldo_pool : "", + lod_comp->llc_pool ? lod_comp->llc_pool : "", stripe_cnt, lqr->lqr_start_idx, lqr->lqr_start_count, lqr->lqr_offset_idx, osts->op_count, osts->op_count); - for (i = 0; i < osts->op_count && stripe_idx < lo->ldo_stripenr; i++) { + for (i = 0; i < osts->op_count && stripe_idx < stripe_cnt; i++) { array_idx = (lqr->lqr_start_idx + lqr->lqr_offset_idx) % osts->op_count; ++lqr->lqr_start_idx; @@ -991,7 +1048,7 @@ repeat_find: spin_unlock(&lqr->lqr_alloc); rc = lod_check_and_reserve_ost(env, m, sfs, ost_idx, speed, - &stripe_idx, stripe, th); + &stripe_idx, stripe, th, inuse); spin_lock(&lqr->lqr_alloc); if (rc != 0 && OST_TGT(m, ost_idx)->ltd_connecting) @@ -1010,7 +1067,7 @@ repeat_find: up_read(&m->lod_qos.lq_rw_sem); if (stripe_idx) { - lo->ldo_stripenr = stripe_idx; + lod_comp->llc_stripenr = stripe_idx; /* at least one stripe is allocated */ rc = 0; } else { @@ -1049,6 +1106,8 @@ out: * \param[out] stripe striping created * \param[in] lum stripe md to specify list of OSTs * \param[in] th transaction handle + * \param[in] comp_idx index of ldo_comp_entries + * \param[in|out] inuse array of inuse ost index * * \retval 0 on success * \retval -ENODEV OST index does not exist on file system @@ -1057,8 +1116,10 @@ out: */ static int lod_alloc_ost_list(const struct lu_env *env, struct lod_object *lo, struct dt_object **stripe, - struct lov_user_md *lum, struct thandle *th) + struct lov_user_md *lum, struct thandle *th, + int comp_idx, struct ost_pool *inuse) { + struct lod_layout_component *lod_comp; struct lod_device *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); struct obd_statfs *sfs = &lod_env_info(env)->lti_osfs; struct dt_object *o; @@ -1066,33 +1127,37 @@ static int lod_alloc_ost_list(const struct lu_env *env, unsigned int array_idx = 0; int stripe_count = 0; int i; - int rc; + int rc = -EINVAL; ENTRY; /* for specific 
OSTs layout */ LASSERT(lum != NULL && lum->lmm_magic == LOV_USER_MAGIC_SPECIFIC); lustre_print_user_md(D_OTHER, lum, __func__); - rc = lod_qos_ost_in_use_clear(env, lo->ldo_stripenr); + LASSERT(lo->ldo_comp_cnt > comp_idx && lo->ldo_comp_entries != NULL); + lod_comp = &lo->ldo_comp_entries[comp_idx]; + + rc = lod_qos_ost_in_use_clear(env, lod_comp->llc_stripenr); if (rc < 0) RETURN(rc); v3 = (struct lov_user_md_v3 *)lum; - for (i = 0; i < lo->ldo_stripenr; i++) { - if (v3->lmm_objects[i].l_ost_idx == lo->ldo_stripe_offset) { + for (i = 0; i < lod_comp->llc_stripenr; i++) { + if (v3->lmm_objects[i].l_ost_idx == + lod_comp->llc_stripe_offset) { array_idx = i; break; } } - if (i == lo->ldo_stripenr) { + if (i == lod_comp->llc_stripenr) { CDEBUG(D_OTHER, "%s: start index %d not in the specified list of OSTs\n", - lod2obd(m)->obd_name, lo->ldo_stripe_offset); + lod2obd(m)->obd_name, lod_comp->llc_stripe_offset); RETURN(-EINVAL); } - for (i = 0; i < lo->ldo_stripenr; - i++, array_idx = (array_idx + 1) % lo->ldo_stripenr) { + for (i = 0; i < lod_comp->llc_stripenr; + i++, array_idx = (array_idx + 1) % lod_comp->llc_stripenr) { __u32 ost_idx = v3->lmm_objects[array_idx].l_ost_idx; if (!cfs_bitmap_check(m->lod_ost_bitmap, ost_idx)) { @@ -1125,6 +1190,7 @@ static int lod_alloc_ost_list(const struct lu_env *env, * We've successfully declared (reserved) an object */ lod_qos_ost_in_use(env, stripe_count, ost_idx); + lod_comp_ost_in_use(inuse, ost_idx); stripe[stripe_count] = o; stripe_count++; } @@ -1149,6 +1215,8 @@ static int lod_alloc_ost_list(const struct lu_env *env, * \param[out] stripe striping created * \param[in] flags not used * \param[in] th transaction handle + * \param[in] comp_idx index of ldo_comp_entries + * \param[in|out]inuse array of inuse ost index * * \retval 0 on success * \retval -ENOSPC if no OST objects are available at all @@ -1158,8 +1226,10 @@ static int lod_alloc_ost_list(const struct lu_env *env, */ static int lod_alloc_specific(const struct lu_env 
*env, struct lod_object *lo, struct dt_object **stripe, int flags, - struct thandle *th) + struct thandle *th, int comp_idx, + struct ost_pool *inuse) { + struct lod_layout_component *lod_comp; struct lod_device *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); struct obd_statfs *sfs = &lod_env_info(env)->lti_osfs; struct dt_object *o; @@ -1171,12 +1241,15 @@ static int lod_alloc_specific(const struct lu_env *env, struct lod_object *lo, struct ost_pool *osts; ENTRY; - rc = lod_qos_ost_in_use_clear(env, lo->ldo_stripenr); + LASSERT(lo->ldo_comp_cnt > comp_idx && lo->ldo_comp_entries != NULL); + lod_comp = &lo->ldo_comp_entries[comp_idx]; + + rc = lod_qos_ost_in_use_clear(env, lod_comp->llc_stripenr); if (rc) GOTO(out, rc); - if (lo->ldo_pool) - pool = lod_find_pool(m, lo->ldo_pool); + if (lod_comp->llc_pool != NULL) + pool = lod_find_pool(m, lod_comp->llc_pool); if (pool != NULL) { down_read(&pool_tgt_rw_sem(pool)); @@ -1191,14 +1264,15 @@ repeat_find: /* search loi_ost_idx in ost array */ array_idx = 0; for (i = 0; i < ost_count; i++) { - if (osts->op_array[i] == lo->ldo_stripe_offset) { + if (osts->op_array[i] == lod_comp->llc_stripe_offset) { array_idx = i; break; } } if (i == ost_count) { CERROR("Start index %d not found in pool '%s'\n", - lo->ldo_stripe_offset, lo->ldo_pool ?: ""); + lod_comp->llc_stripe_offset, + lod_comp->llc_pool ? lod_comp->llc_pool : ""); GOTO(out, rc = -EINVAL); } @@ -1220,6 +1294,13 @@ repeat_find: if (lod_qos_is_ost_used(env, ost_idx, stripe_num)) continue; + /* + * try not allocate on the OST used by other component + */ + if (speed == 0 && i != 0 && + lod_comp_is_ost_used(inuse, ost_idx)) + continue; + /* Drop slow OSCs if we can, but not for requested start idx. 
* * This means "if OSC is slow and it is not the requested @@ -1251,11 +1332,12 @@ repeat_find: * We've successfully declared (reserved) an object */ lod_qos_ost_in_use(env, stripe_num, ost_idx); + lod_comp_ost_in_use(inuse, ost_idx); stripe[stripe_num] = o; stripe_num++; /* We have enough stripes */ - if (stripe_num == lo->ldo_stripenr) + if (stripe_num == lod_comp->llc_stripenr) GOTO(out, rc = 0); } if (speed < 2) { @@ -1270,7 +1352,7 @@ repeat_find: */ CERROR("can't lstripe objid "DFID": have %d want %u\n", PFID(lu_object_fid(lod2lu_obj(lo))), stripe_num, - lo->ldo_stripenr); + lod_comp->llc_stripenr); rc = stripe_num == 0 ? -ENOSPC : -EFBIG; out: if (pool != NULL) { @@ -1339,6 +1421,8 @@ static inline int lod_qos_is_usable(struct lod_device *lod) * \param[out] stripe striping created * \param[in] flags 0 or LOV_USES_DEFAULT_STRIPE * \param[in] th transaction handle + * \param[in] comp_idx index of ldo_comp_entries + * \param[in|out]inuse array of inuse ost index * * \retval 0 on success * \retval -EAGAIN not enough OSTs are found for specified stripe count @@ -1347,28 +1431,31 @@ static inline int lod_qos_is_usable(struct lod_device *lod) */ static int lod_alloc_qos(const struct lu_env *env, struct lod_object *lo, struct dt_object **stripe, int flags, - struct thandle *th) + struct thandle *th, int comp_idx, + struct ost_pool *inuse) { + struct lod_layout_component *lod_comp; struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); struct obd_statfs *sfs = &lod_env_info(env)->lti_osfs; struct lod_tgt_desc *ost; struct dt_object *o; __u64 total_weight = 0; - __u32 nfound, good_osts; - __u32 stripe_cnt = lo->ldo_stripenr; - __u32 stripe_cnt_min; struct pool_desc *pool = NULL; struct ost_pool *osts; unsigned int i; + __u32 nfound, good_osts, stripe_cnt, stripe_cnt_min; int rc = 0; ENTRY; + LASSERT(lo->ldo_comp_cnt > comp_idx && lo->ldo_comp_entries != NULL); + lod_comp = &lo->ldo_comp_entries[comp_idx]; + stripe_cnt = lod_comp->llc_stripenr; stripe_cnt_min = 
min_stripe_count(stripe_cnt, flags); if (stripe_cnt_min < 1) RETURN(-EINVAL); - if (lo->ldo_pool) - pool = lod_find_pool(lod, lo->ldo_pool); + if (lod_comp->llc_pool != NULL) + pool = lod_find_pool(lod, lod_comp->llc_pool); if (pool != NULL) { down_read(&pool_tgt_rw_sem(pool)); @@ -1395,7 +1482,7 @@ static int lod_alloc_qos(const struct lu_env *env, struct lod_object *lo, if (rc) GOTO(out, rc); - rc = lod_qos_ost_in_use_clear(env, lo->ldo_stripenr); + rc = lod_qos_ost_in_use_clear(env, lod_comp->llc_stripenr); if (rc) GOTO(out, rc); @@ -1499,9 +1586,11 @@ static int lod_alloc_qos(const struct lu_env *env, struct lod_object *lo, /* * do not put >1 objects on a single OST */ - if (lod_qos_is_ost_used(env, idx, nfound)) + if (lod_qos_is_ost_used(env, idx, nfound) || + lod_comp_is_ost_used(inuse, idx)) continue; lod_qos_ost_in_use(env, nfound, idx); + lod_comp_ost_in_use(inuse, idx); o = lod_qos_declare_object_on(env, lod, idx, th); if (IS_ERR(o)) { @@ -1536,6 +1625,9 @@ static int lod_alloc_qos(const struct lu_env *env, struct lod_object *lo, dt_object_put(env, stripe[i]); stripe[i] = NULL; } + LASSERTF(nfound <= inuse->op_count, + "nfound:%d, op_count:%u\n", nfound, inuse->op_count); + inuse->op_count -= nfound; /* makes sense to rebalance next time */ lod->lod_qos.lq_dirty = 1; @@ -1566,12 +1658,12 @@ out_nolock: * If the passed one is 0, then the filesystem's default one is used. 
* * \param[in] lod LOD device - * \param[in] magic the format if striping + * \param[in] lo The lod_object * \param[in] stripe_count count the caller would like to use * * \retval the maximum usable stripe count */ -static __u16 lod_get_stripecnt(struct lod_device *lod, __u32 magic, +static __u16 lod_get_stripecnt(struct lod_device *lod, struct lod_object *lo, __u16 stripe_count) { __u32 max_stripes = LOV_MAX_STRIPE_COUNT_OLD; @@ -1584,9 +1676,30 @@ static __u16 lod_get_stripecnt(struct lod_device *lod, __u32 magic, stripe_count = 1; /* stripe count is based on whether OSD can handle larger EA sizes */ - if (lod->lod_osd_max_easize > 0) - max_stripes = lov_mds_md_max_stripe_count( - lod->lod_osd_max_easize, magic); + if (lod->lod_osd_max_easize > 0) { + unsigned int easize = lod->lod_osd_max_easize; + int i; + + if (lo->ldo_is_composite) { + struct lod_layout_component *lod_comp; + unsigned int header_sz = sizeof(struct lov_comp_md_v1); + + header_sz += sizeof(struct lov_comp_md_entry_v1) * + lo->ldo_comp_cnt; + for (i = 0; i < lo->ldo_comp_cnt; i++) { + lod_comp = &lo->ldo_comp_entries[i]; + if (lod_comp->llc_flags & LCME_FL_INIT) + header_sz += lov_mds_md_size( + lod_comp->llc_stripenr, LOV_MAGIC_V3); + } + if (easize > header_sz) + easize -= header_sz; + else + easize = 0; + } + + max_stripes = lov_mds_md_max_stripe_count(easize, LOV_MAGIC_V3); + } return (stripe_count < max_stripes) ? 
stripe_count : max_stripes; } @@ -1606,46 +1719,93 @@ static __u16 lod_get_stripecnt(struct lod_device *lod, __u32 magic, * \retval 0 on success * \retval negative negated errno on error */ -static int lod_use_defined_striping(const struct lu_env *env, - struct lod_object *mo, - const struct lu_buf *buf) +int lod_use_defined_striping(const struct lu_env *env, + struct lod_object *mo, + const struct lu_buf *buf) { + struct lod_layout_component *lod_comp; struct lov_mds_md_v1 *v1 = buf->lb_buf; struct lov_mds_md_v3 *v3 = buf->lb_buf; + struct lov_comp_md_v1 *comp_v1 = NULL; struct lov_ost_data_v1 *objs; - __u32 magic; - int rc = 0; + __u32 magic; + __u16 comp_cnt; + int rc = 0, i; ENTRY; - magic = le32_to_cpu(v1->lmm_magic); - if (magic == LOV_MAGIC_V1_DEF) { - magic = LOV_MAGIC_V1; - objs = &v1->lmm_objects[0]; - } else if (magic == LOV_MAGIC_V3_DEF) { - magic = LOV_MAGIC_V3; - objs = &v3->lmm_objects[0]; - lod_object_set_pool(mo, v3->lmm_pool_name); + magic = le32_to_cpu(v1->lmm_magic) & ~LOV_MAGIC_DEF; + + if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3 && + magic != LOV_MAGIC_COMP_V1) + RETURN(-EINVAL); + + if (magic == LOV_MAGIC_COMP_V1) { + comp_v1 = buf->lb_buf; + comp_cnt = le16_to_cpu(comp_v1->lcm_entry_count); + if (comp_cnt == 0) + RETURN(-EINVAL); + mo->ldo_is_composite = 1; } else { - GOTO(out, rc = -EINVAL); + mo->ldo_is_composite = 0; + comp_cnt = 1; } - mo->ldo_pattern = le32_to_cpu(v1->lmm_pattern); - mo->ldo_stripe_size = le32_to_cpu(v1->lmm_stripe_size); - mo->ldo_stripenr = le16_to_cpu(v1->lmm_stripe_count); - mo->ldo_layout_gen = le16_to_cpu(v1->lmm_layout_gen); + rc = lod_alloc_comp_entries(mo, comp_cnt); + if (rc) + RETURN(rc); - /* fixup for released file before object initialization */ - if (mo->ldo_pattern & LOV_PATTERN_F_RELEASED) { - mo->ldo_released_stripenr = mo->ldo_stripenr; - mo->ldo_stripenr = 0; - } + for (i = 0; i < comp_cnt; i++) { + struct lu_extent *ext; + char *pool_name; + __u32 offs; + + lod_comp = 
&mo->ldo_comp_entries[i]; + + if (mo->ldo_is_composite) { + offs = le32_to_cpu(comp_v1->lcm_entries[i].lcme_offset); + v1 = (struct lov_mds_md_v1 *)((char *)comp_v1 + offs); + magic = le32_to_cpu(v1->lmm_magic); + + ext = &comp_v1->lcm_entries[i].lcme_extent; + lod_comp->llc_extent.e_start = + le64_to_cpu(ext->e_start); + lod_comp->llc_extent.e_end = le64_to_cpu(ext->e_end); + lod_comp->llc_flags = + le32_to_cpu(comp_v1->lcm_entries[i].lcme_flags); + lod_comp->llc_id = + le32_to_cpu(comp_v1->lcm_entries[i].lcme_id); + if (lod_comp->llc_id == LCME_ID_INVAL) + GOTO(out, rc = -EINVAL); + } - LASSERT(buf->lb_len >= lov_mds_md_size(mo->ldo_stripenr, magic)); + pool_name = NULL; + if (magic == LOV_MAGIC_V1) { + objs = &v1->lmm_objects[0]; + } else if (magic == LOV_MAGIC_V3) { + objs = &v3->lmm_objects[0]; + if (v3->lmm_pool_name[0] != '\0') + pool_name = v3->lmm_pool_name; + } else { + CDEBUG(D_LAYOUT, "Invalid magic %x\n", magic); + GOTO(out, rc = -EINVAL); + } - if (mo->ldo_stripenr > 0) - rc = lod_initialize_objects(env, mo, objs); + lod_comp->llc_pattern = le32_to_cpu(v1->lmm_pattern); + lod_comp->llc_stripe_size = le32_to_cpu(v1->lmm_stripe_size); + lod_comp->llc_stripenr = le16_to_cpu(v1->lmm_stripe_count); + lod_comp->llc_layout_gen = le16_to_cpu(v1->lmm_layout_gen); + lod_obj_set_pool(mo, i, pool_name); + if (!(lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)) { + rc = lod_initialize_objects(env, mo, objs, i); + if (rc) + GOTO(out, rc); + } + } out: + if (rc) + lod_object_free_striping(env, mo); + RETURN(rc); } @@ -1670,23 +1830,32 @@ static int lod_qos_parse_config(const struct lu_env *env, struct lod_object *lo, const struct lu_buf *buf) { - struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev); - struct lov_user_md_v1 *v1 = NULL; - struct lov_user_md_v3 *v3 = NULL; - char *pool_name = NULL; - __u32 magic; - int rc; - unsigned int size; + struct lod_layout_component *lod_comp; + struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev); + struct lov_desc 
*desc = &d->lod_desc; + struct lov_user_md_v1 *v1 = NULL; + struct lov_user_md_v3 *v3 = NULL; + struct lov_comp_md_v1 *comp_v1 = NULL; + __u32 magic; + __u16 comp_cnt; + int i, rc; ENTRY; if (buf == NULL || buf->lb_buf == NULL || buf->lb_len == 0) RETURN(0); + rc = lod_verify_striping(d, buf, false, 0); + if (rc) + RETURN(-EINVAL); + + lod_free_comp_entries(lo); + v3 = buf->lb_buf; v1 = buf->lb_buf; + comp_v1 = buf->lb_buf; magic = v1->lmm_magic; - if (unlikely(magic == LOV_MAGIC_V1_DEF || magic == LOV_MAGIC_V3_DEF)) { + if (unlikely(le32_to_cpu(magic) & LOV_MAGIC_DEF)) { /* try to use as fully defined striping */ rc = lod_use_defined_striping(env, lo, buf); RETURN(rc); @@ -1698,18 +1867,13 @@ static int lod_qos_parse_config(const struct lu_env *env, magic = v1->lmm_magic; /* fall through */ case LOV_USER_MAGIC_V1: - size = sizeof(*v1); break; - case __swab32(LOV_USER_MAGIC_V3): lustre_swab_lov_user_md_v3(v3); magic = v3->lmm_magic; /* fall through */ case LOV_USER_MAGIC_V3: - size = sizeof(*v3); - pool_name = v3->lmm_pool_name; break; - case __swab32(LOV_USER_MAGIC_SPECIFIC): lustre_swab_lov_user_md_v3(v3); lustre_swab_lov_user_md_objects(v3->lmm_objects, @@ -1717,83 +1881,112 @@ static int lod_qos_parse_config(const struct lu_env *env, magic = v3->lmm_magic; /* fall through */ case LOV_USER_MAGIC_SPECIFIC: - if (v3->lmm_stripe_offset == LOV_OFFSET_DEFAULT) - v3->lmm_stripe_offset = v3->lmm_objects[0].l_ost_idx; - if (v3->lmm_pool_name[0] != '\0') - pool_name = v3->lmm_pool_name; - size = lov_user_md_size(v3->lmm_stripe_count, - LOV_USER_MAGIC_SPECIFIC); break; - + case __swab32(LOV_USER_MAGIC_COMP_V1): + lustre_swab_lov_comp_md_v1(comp_v1); + magic = comp_v1->lcm_magic; + /* fall through */ + case LOV_USER_MAGIC_COMP_V1: + break; default: CERROR("%s: unrecognized magic %X\n", lod2obd(d)->obd_name, magic); RETURN(-EINVAL); } - if (unlikely(buf->lb_len < size)) { - CERROR("%s: wrong size: %zd, expect: %u\n", - lod2obd(d)->obd_name, buf->lb_len, size); - 
RETURN(-EINVAL); - } - lustre_print_user_md(D_OTHER, v1, "parse config"); - v1->lmm_magic = magic; - if (v1->lmm_pattern == 0) - v1->lmm_pattern = LOV_PATTERN_RAID0; - if (lov_pattern(v1->lmm_pattern) != LOV_PATTERN_RAID0) { - CERROR("%s: invalid pattern: %x\n", - lod2obd(d)->obd_name, v1->lmm_pattern); - RETURN(-EINVAL); + if (magic == LOV_USER_MAGIC_COMP_V1) { + comp_cnt = comp_v1->lcm_entry_count; + if (comp_cnt == 0) + RETURN(-EINVAL); + lo->ldo_is_composite = 1; + } else { + comp_cnt = 1; + lo->ldo_is_composite = 0; } - lo->ldo_pattern = v1->lmm_pattern; - if (v1->lmm_stripe_size > 0) - lo->ldo_stripe_size = v1->lmm_stripe_size; + rc = lod_alloc_comp_entries(lo, comp_cnt); + if (rc) + RETURN(rc); + + for (i = 0; i < comp_cnt; i++) { + struct pool_desc *pool; + struct lu_extent *ext; + char *pool_name; - if (lo->ldo_stripe_size & (LOV_MIN_STRIPE_SIZE - 1)) - lo->ldo_stripe_size = LOV_MIN_STRIPE_SIZE; + lod_comp = &lo->ldo_comp_entries[i]; - if (v1->lmm_stripe_count > 0) - lo->ldo_stripenr = v1->lmm_stripe_count; + if (lo->ldo_is_composite) { + v1 = (struct lov_user_md *)((char *)comp_v1 + + comp_v1->lcm_entries[i].lcme_offset); + ext = &comp_v1->lcm_entries[i].lcme_extent; + lod_comp->llc_extent = *ext; + } - lo->ldo_stripe_offset = v1->lmm_stripe_offset; + pool_name = NULL; + if (v1->lmm_magic == LOV_USER_MAGIC_V3 || + v1->lmm_magic == LOV_USER_MAGIC_SPECIFIC) { + v3 = (struct lov_user_md_v3 *)v1; - lod_object_set_pool(lo, NULL); - if (pool_name != NULL) { - struct pool_desc *pool; + if (v3->lmm_pool_name[0] != '\0') + pool_name = v3->lmm_pool_name; + + if (v3->lmm_magic == LOV_USER_MAGIC_SPECIFIC && + v3->lmm_stripe_offset == LOV_OFFSET_DEFAULT) + v3->lmm_stripe_offset = + v3->lmm_objects[0].l_ost_idx; + } + + if (v1->lmm_pattern == 0) + v1->lmm_pattern = LOV_PATTERN_RAID0; + if (lov_pattern(v1->lmm_pattern) != LOV_PATTERN_RAID0) { + CDEBUG(D_LAYOUT, "%s: invalid pattern: %x\n", + lod2obd(d)->obd_name, v1->lmm_pattern); + lod_free_comp_entries(lo); + 
RETURN(-EINVAL); + } + + lod_comp->llc_pattern = v1->lmm_pattern; + + lod_comp->llc_stripe_size = desc->ld_default_stripe_size; + if (v1->lmm_stripe_size > 0) + lod_comp->llc_stripe_size = v1->lmm_stripe_size; + + lod_comp->llc_stripenr = desc->ld_default_stripe_count; + if (v1->lmm_stripe_count > 0) + lod_comp->llc_stripenr = v1->lmm_stripe_count; + + lod_comp->llc_stripe_offset = v1->lmm_stripe_offset; + lod_obj_set_pool(lo, i, pool_name); + + if (pool_name == NULL) + continue; /* In the function below, .hs_keycmp resolves to * pool_hashkey_keycmp() */ /* coverity[overrun-buffer-val] */ pool = lod_find_pool(d, pool_name); - if (pool != NULL) { - if (lo->ldo_stripe_offset != LOV_OFFSET_DEFAULT) { - rc = lod_check_index_in_pool( - lo->ldo_stripe_offset, pool); - if (rc < 0) { - lod_pool_putref(pool); - CERROR("%s: invalid offset, %u\n", - lod2obd(d)->obd_name, - lo->ldo_stripe_offset); - RETURN(-EINVAL); - } - } - - if (lo->ldo_stripenr > pool_tgt_count(pool)) - lo->ldo_stripenr = pool_tgt_count(pool); + if (pool == NULL) + continue; - lod_pool_putref(pool); + if (lod_comp->llc_stripe_offset != LOV_OFFSET_DEFAULT) { + rc = lod_check_index_in_pool( + lod_comp->llc_stripe_offset, pool); + if (rc < 0) { + lod_pool_putref(pool); + CDEBUG(D_LAYOUT, "%s: invalid offset, %u\n", + lod2obd(d)->obd_name, + lod_comp->llc_stripe_offset); + lod_free_comp_entries(lo); + RETURN(-EINVAL); + } } - lod_object_set_pool(lo, pool_name); - } + if (lod_comp->llc_stripenr > pool_tgt_count(pool)) + lod_comp->llc_stripenr = pool_tgt_count(pool); - /* fixup for released file */ - if (lo->ldo_pattern & LOV_PATTERN_F_RELEASED) { - lo->ldo_released_stripenr = lo->ldo_stripenr; - lo->ldo_stripenr = 0; + lod_pool_putref(pool); } RETURN(0); @@ -1814,61 +2007,49 @@ static int lod_qos_parse_config(const struct lu_env *env, * \param[in] attr attributes OST objects will be declared with * \param[in] buf suggested striping configuration or NULL * \param[in] th transaction handle + * \param[in] comp_idx 
index of ldo_comp_entries + * \param[in|out]inuse array of inuse ost index * * \retval 0 on success * \retval negative negated errno on error */ -int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo, - struct lu_attr *attr, const struct lu_buf *buf, - struct thandle *th) +static int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo, + struct lu_attr *attr, const struct lu_buf *buf, + struct thandle *th, int comp_idx, + struct ost_pool *inuse) { + struct lod_layout_component *lod_comp; struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev); struct dt_object **stripe; int stripe_len; int flag = LOV_USES_ASSIGNED_STRIPE; - int i, rc; + int i, rc = 0; ENTRY; LASSERT(lo); + LASSERT(lo->ldo_comp_cnt > comp_idx && lo->ldo_comp_entries != NULL); + lod_comp = &lo->ldo_comp_entries[comp_idx]; - /* no OST available */ - /* XXX: should we be waiting a bit to prevent failures during - * cluster initialization? */ - if (d->lod_ostnr == 0) - GOTO(out, rc = -EIO); - - /* - * by this time, the object's ldo_stripenr and ldo_stripe_size - * contain default value for striping: taken from the parent - * or from filesystem defaults - * - * in case the caller is passing lovea with new striping config, - * we may need to parse lovea and apply new configuration - */ - rc = lod_qos_parse_config(env, lo, buf); - if (rc) - GOTO(out, rc); - - /* A released file is being created */ - if (lo->ldo_stripenr == 0) - GOTO(out, rc = 0); + /* A released component is being created */ + if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED) + RETURN(0); - if (likely(lo->ldo_stripe == NULL)) { + if (likely(lod_comp->llc_stripe == NULL)) { struct lov_user_md *lum = NULL; /* * no striping has been created so far */ - LASSERT(lo->ldo_stripenr > 0); + LASSERT(lod_comp->llc_stripenr > 0); /* * statfs and check OST targets now, since ld_active_tgt_count * could be changed if some OSTs are [de]activated manually. 
*/ lod_qos_statfs_update(env, d); - lo->ldo_stripenr = lod_get_stripecnt(d, LOV_MAGIC, - lo->ldo_stripenr); - - stripe_len = lo->ldo_stripenr; + stripe_len = lod_get_stripecnt(d, lo, lod_comp->llc_stripenr); + if (stripe_len == 0) + GOTO(out, rc = -ERANGE); + lod_comp->llc_stripenr = stripe_len; OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_len); if (stripe == NULL) GOTO(out, rc = -ENOMEM); @@ -1878,17 +2059,29 @@ int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo, CDEBUG(D_OTHER, "tgt_count %d stripenr %d\n", d->lod_desc.ld_tgt_count, stripe_len); - if (buf != NULL && buf->lb_buf != NULL) + if (buf != NULL && buf->lb_buf != NULL) { lum = buf->lb_buf; + if (lum->lmm_magic == LOV_USER_MAGIC_COMP_V1) { + struct lov_comp_md_v1 *comp_v1; + + comp_v1 = (struct lov_comp_md_v1 *)lum; + lum = (struct lov_user_md *)((char *)comp_v1 + + comp_v1->lcm_entries[comp_idx].lcme_offset); + } + } if (lum != NULL && lum->lmm_magic == LOV_USER_MAGIC_SPECIFIC) { - rc = lod_alloc_ost_list(env, lo, stripe, lum, th); - } else if (lo->ldo_stripe_offset == LOV_OFFSET_DEFAULT) { - rc = lod_alloc_qos(env, lo, stripe, flag, th); + rc = lod_alloc_ost_list(env, lo, stripe, lum, th, + comp_idx, inuse); + } else if (lod_comp->llc_stripe_offset == LOV_OFFSET_DEFAULT) { + rc = lod_alloc_qos(env, lo, stripe, flag, th, + comp_idx, inuse); if (rc == -EAGAIN) - rc = lod_alloc_rr(env, lo, stripe, flag, th); + rc = lod_alloc_rr(env, lo, stripe, flag, th, + comp_idx, inuse); } else { - rc = lod_alloc_specific(env, lo, stripe, flag, th); + rc = lod_alloc_specific(env, lo, stripe, flag, th, + comp_idx, inuse); } lod_putref(d, &d->lod_ost_descs); @@ -1898,21 +2091,21 @@ int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo, dt_object_put(env, stripe[i]); OBD_FREE(stripe, sizeof(stripe[0]) * stripe_len); - lo->ldo_stripenr = 0; + lod_comp->llc_stripenr = 0; } else { - lo->ldo_stripe = stripe; - lo->ldo_stripes_allocated = stripe_len; + lod_comp->llc_stripe = stripe; + 
lod_comp->llc_stripes_allocated = stripe_len; } - } else { + } else if (!(lod_comp->llc_flags & LCME_FL_INIT)) { /* * lod_qos_parse_config() found supplied buf as a predefined * striping (not a hint), so it allocated all the object * now we need to create them */ - for (i = 0; i < lo->ldo_stripenr; i++) { + for (i = 0; i < lod_comp->llc_stripenr; i++) { struct dt_object *o; - o = lo->ldo_stripe[i]; + o = lod_comp->llc_stripe[i]; LASSERT(o); rc = lod_sub_object_declare_create(env, o, attr, NULL, @@ -1928,3 +2121,92 @@ out: RETURN(rc); } +static int +lod_obj_stripe_set_inuse_cb(const struct lu_env *env, struct lod_object *lo, + struct dt_object *dt, struct thandle *th, + int stripe_idx, struct lod_obj_stripe_cb_data *data) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev); + struct lu_fid *fid = &info->lti_fid; + __u32 index; + int rc, type = LU_SEQ_RANGE_OST; + + *fid = *lu_object_fid(&dt->do_lu); + rc = lod_fld_lookup(env, d, fid, &index, &type); + if (rc < 0) { + CERROR("%s: fail to locate "DFID": rc = %d\n", + lod2obd(d)->obd_name, PFID(fid), rc); + return rc; + } + lod_comp_ost_in_use(data->locd_inuse, index); + return 0; +} + +int lod_prepare_create(const struct lu_env *env, struct lod_object *lo, + struct lu_attr *attr, const struct lu_buf *buf, + struct thandle *th) + +{ + struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev); + struct ost_pool inuse; + int i, rc, comp_cnt; + ENTRY; + + LASSERT(lo); + + /* no OST available */ + /* XXX: should we be waiting a bit to prevent failures during + * cluster initialization? 
*/ + if (d->lod_ostnr == 0) + RETURN(-EIO); + + /* + * by this time, the object's ldo_stripenr and ldo_stripe_size + * contain default value for striping: taken from the parent + * or from filesystem defaults + * + * in case the caller is passing lovea with new striping config, + * we may need to parse lovea and apply new configuration + */ + rc = lod_qos_parse_config(env, lo, buf); + if (rc) + RETURN(rc); + + memset(&inuse, 0, sizeof(inuse)); + init_rwsem(&inuse.op_rw_sem); + comp_cnt = lo->ldo_comp_cnt; + + /* Prepare inuse array for composite file */ + if (lo->ldo_is_composite) { + struct lod_obj_stripe_cb_data data; + + inuse.op_size = comp_cnt * LOV_MAX_STRIPE_COUNT_OLD * + sizeof(__u32); + if (d->lod_osd_max_easize > 0 && + inuse.op_size > d->lod_osd_max_easize) + inuse.op_size = d->lod_osd_max_easize; + OBD_ALLOC(inuse.op_array, inuse.op_size); + if (inuse.op_array == NULL) + RETURN(-ENOMEM); + + data.locd_inuse = &inuse; + rc = lod_obj_for_each_stripe(env, lo, NULL, + lod_obj_stripe_set_inuse_cb, &data); + if (rc) { + OBD_FREE(inuse.op_array, inuse.op_size); + RETURN(rc); + } + } + + /* prepare OST object creation */ + for (i = 0; i < comp_cnt; i++) { + rc = lod_qos_prep_create(env, lo, attr, buf, th, i, &inuse); + if (rc) + break; + } + + if (inuse.op_size) + OBD_FREE(inuse.op_array, inuse.op_size); + RETURN(rc); +} diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 2c45545..6f9105e 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -371,9 +371,48 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt, rc = ptlrpc_queue_wait(req); - if (opcode == MDS_REINT) + if (opcode == MDS_REINT) mdc_put_mod_rpc_slot(req, NULL); + /* For XATTR_LUSTRE_LOV.add, we'd save the LOVEA for replay. 
*/ + if (opcode == MDS_REINT && rc == 0) { + struct mdt_body *body; + struct req_capsule *pill = &req->rq_pill; + + body = req_capsule_server_get(pill, &RMF_MDT_BODY); + if (body == NULL) + GOTO(out, rc = -EPROTO); + + if (body->mbo_valid & OBD_MD_FLEASIZE) { + void *eadata, *lmm; + + eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD, + body->mbo_eadatasize); + if (eadata == NULL) + GOTO(out, rc = -EPROTO); + + if (req_capsule_get_size(pill, &RMF_EADATA, + RCL_CLIENT) < + body->mbo_eadatasize) { + rc = sptlrpc_cli_enlarge_reqbuf(req, 4, + body->mbo_eadatasize); + if (rc) + GOTO(out, rc = -ENOMEM); + } else { + req_capsule_shrink(pill, &RMF_EADATA, + body->mbo_eadatasize, + RCL_CLIENT); + } + + req_capsule_set_size(pill, &RMF_EADATA, RCL_CLIENT, + body->mbo_eadatasize); + + lmm = req_capsule_client_get(pill, &RMF_EADATA); + if (lmm) + memcpy(lmm, eadata, body->mbo_eadatasize); + } + } +out: if (rc) ptlrpc_req_finished(req); else diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index 2f8e51a..18f5ce3 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -974,7 +974,9 @@ mdd_xattr_changelog_type(const struct lu_env *env, struct mdd_device *mdd, const char *xattr_name) { /* Layout changes systematically recorded */ - if (strcmp(XATTR_NAME_LOV, xattr_name) == 0) + if (strcmp(XATTR_NAME_LOV, xattr_name) == 0 || + strncmp(XATTR_LUSTRE_LOV, xattr_name, + strlen(XATTR_LUSTRE_LOV)) == 0) return CL_LAYOUT; /* HSM information changes systematically recorded */ @@ -1353,6 +1355,90 @@ static int mdd_layout_swap_allowed(const struct lu_env *env, RETURN(0); } +/* XXX To set the proper lmm_oi & lmm_layout_gen when swap layouts, we have to + * look into the layout in MDD layer. 
\a get reads the first component's lmm_oi into \a oi; otherwise \a oi is written to every component. Returns 0, or -EINVAL for an empty composite layout or an unknown magic. */ +static int mdd_lmm_oi(struct lov_mds_md *lmm, struct ost_id *oi, bool get) +{ + struct lov_comp_md_v1 *comp_v1; + struct lov_mds_md *v1; + int i, ent_count; + __u32 off; + + if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_COMP_V1) { + comp_v1 = (struct lov_comp_md_v1 *)lmm; + ent_count = le16_to_cpu(comp_v1->lcm_entry_count); + + if (ent_count == 0) + return -EINVAL; + + if (get) { + off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset); + v1 = (struct lov_mds_md *)((char *)comp_v1 + off); + *oi = v1->lmm_oi; + } else { + for (i = 0; i < ent_count; i++) { + off = le32_to_cpu(comp_v1->lcm_entries[i]. + lcme_offset); + v1 = (struct lov_mds_md *)((char *)comp_v1 + + off); + v1->lmm_oi = *oi; + } + } + } else if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1 || + le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3) { + if (get) + *oi = lmm->lmm_oi; + else + lmm->lmm_oi = *oi; + } else { + return -EINVAL; + } + return 0; +} + +static inline int mdd_get_lmm_oi(struct lov_mds_md *lmm, struct ost_id *oi) +{ + return mdd_lmm_oi(lmm, oi, true); +} + +static inline int mdd_set_lmm_oi(struct lov_mds_md *lmm, struct ost_id *oi) +{ + return mdd_lmm_oi(lmm, oi, false); +} + +static int mdd_lmm_gen(struct lov_mds_md *lmm, __u32 *gen, bool get) +{ + struct lov_comp_md_v1 *comp_v1; + + if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_COMP_V1) { + comp_v1 = (struct lov_comp_md_v1 *)lmm; + if (get) + *gen = le32_to_cpu(comp_v1->lcm_layout_gen); + else + comp_v1->lcm_layout_gen = cpu_to_le32(*gen); + } else if (le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V1 || + le32_to_cpu(lmm->lmm_magic) == LOV_MAGIC_V3) { + __u16 tmp_gen = *gen; + if (get) + *gen = le16_to_cpu(lmm->lmm_layout_gen); + else + lmm->lmm_layout_gen = cpu_to_le16(tmp_gen); + } else { + return -EINVAL; + } + return 0; +} + +static inline int mdd_get_lmm_gen(struct lov_mds_md *lmm, __u32 *gen) +{ + return mdd_lmm_gen(lmm, gen, true); +} + +static inline int mdd_set_lmm_gen(struct lov_mds_md *lmm, __u32 *gen) +{ + return
mdd_lmm_gen(lmm, gen, false); +} + /** * swap layouts between 2 lustre objects */ @@ -1372,7 +1458,7 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1, struct lu_buf *snd_hsm_buf = &info->mti_buf[3]; struct ost_id *saved_oi = NULL; struct thandle *handle; - __u16 fst_gen, snd_gen; + __u32 fst_gen, snd_gen, saved_gen; int fst_fl; int rc; int rc2; @@ -1430,10 +1516,11 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1, swap(fst_buf, snd_buf); } + fst_gen = snd_gen = 0; /* lmm and generation layout initialization */ if (fst_buf->lb_buf != NULL) { fst_lmm = fst_buf->lb_buf; - fst_gen = le16_to_cpu(fst_lmm->lmm_layout_gen); + mdd_get_lmm_gen(fst_lmm, &fst_gen); fst_fl = LU_XATTR_REPLACE; } else { fst_lmm = NULL; @@ -1442,29 +1529,45 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1, } snd_lmm = snd_buf->lb_buf; - snd_gen = le16_to_cpu(snd_lmm->lmm_layout_gen); + mdd_get_lmm_gen(snd_lmm, &snd_gen); + saved_gen = fst_gen; /* increase the generation layout numbers */ snd_gen++; fst_gen++; + /* + * XXX The layout generation is used to generate component IDs for + * the composite file, we have to do some special tweaks to make + * sure the layout generation is always adequate for that job. 
+ */ + + /* Skip invalid generation number for composite layout */ + if ((snd_gen & LCME_ID_MASK) == 0) + snd_gen++; + if ((fst_gen & LCME_ID_MASK) == 0) + fst_gen++; + /* Make sure the generation is greater than all the component IDs */ + if (fst_gen < snd_gen) + fst_gen = snd_gen; + else if (fst_gen > snd_gen) + snd_gen = fst_gen; + /* set the file specific informations in lmm */ if (fst_lmm != NULL) { saved_oi = &info->mti_oa.o_oi; - - *saved_oi = fst_lmm->lmm_oi; - fst_lmm->lmm_layout_gen = cpu_to_le16(snd_gen); - fst_lmm->lmm_oi = snd_lmm->lmm_oi; - snd_lmm->lmm_oi = *saved_oi; + mdd_get_lmm_oi(fst_lmm, saved_oi); + mdd_set_lmm_gen(fst_lmm, &snd_gen); + mdd_set_lmm_oi(fst_lmm, &snd_lmm->lmm_oi); + mdd_set_lmm_oi(snd_lmm, saved_oi); } else { - if (snd_lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V1)) - snd_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1_DEF); - else if (snd_lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_V3)) - snd_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V3_DEF); + if ((snd_lmm->lmm_magic & cpu_to_le32(LOV_MAGIC_MASK)) == + cpu_to_le32(LOV_MAGIC_MAGIC)) + snd_lmm->lmm_magic |= cpu_to_le32(LOV_MAGIC_DEF); else GOTO(stop, rc = -EPROTO); } - snd_lmm->lmm_layout_gen = cpu_to_le16(fst_gen); + mdd_set_lmm_gen(snd_lmm, &fst_gen); /* Prepare HSM attribute if it's required */ if (flags & SWAP_LAYOUTS_MDS_HSM) { @@ -1554,8 +1657,8 @@ static int mdd_swap_layouts(const struct lu_env *env, struct md_object *obj1, /* failure on second file, but first was done, so we have * to roll back first. 
*/ if (fst_buf->lb_buf != NULL) { - fst_lmm->lmm_oi = *saved_oi; - fst_lmm->lmm_layout_gen = cpu_to_le16(fst_gen - 1); + mdd_set_lmm_oi(fst_lmm, saved_oi); + mdd_set_lmm_gen(fst_lmm, &saved_gen); rc2 = mdo_xattr_set(env, fst_o, fst_buf, XATTR_NAME_LOV, LU_XATTR_REPLACE, handle); } else { diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index c732096..dd3e494 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -522,6 +522,30 @@ int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody, } #endif +/* XXX Look into layout in MDT layer. */ +static inline bool mdt_hsm_is_released(struct lov_mds_md *lmm) +{ + struct lov_comp_md_v1 *comp_v1; + struct lov_mds_md *v1; + int i; + + if (lmm->lmm_magic == LOV_MAGIC_COMP_V1) { + comp_v1 = (struct lov_comp_md_v1 *)lmm; + + for (i = 0; i < comp_v1->lcm_entry_count; i++) { + v1 = (struct lov_mds_md *)((char *)comp_v1 + + comp_v1->lcm_entries[i].lcme_offset); + /* We don't support partial release for now */ + if (!(v1->lmm_pattern & LOV_PATTERN_F_RELEASED)) + return false; + } + return true; + } else { + return (lmm->lmm_pattern & LOV_PATTERN_F_RELEASED) ? + true : false; + } +} + void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, const struct lu_attr *attr, const struct lu_fid *fid) { @@ -600,7 +624,7 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, * b=22272 */ b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS; } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL && - ma->ma_lmm->lmm_pattern & LOV_PATTERN_F_RELEASED) { + mdt_hsm_is_released(ma->ma_lmm)) { /* A released file stores its size on MDS. */ /* But return 1 block for released file, unless tools like tar * will consider it fully sparse. 
(LU-3864) @@ -947,7 +971,6 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, struct lu_buf *buffer = &info->mti_buf; struct obd_export *exp = info->mti_exp; int rc; - int is_root; ENTRY; if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) @@ -1029,32 +1052,6 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, repbody->mbo_t_state = MS_RESTORE; } - is_root = lu_fid_eq(mdt_object_fid(o), &info->mti_mdt->mdt_md_root_fid); - - /* the Lustre protocol supposes to return default striping - * on the user-visible root if explicitly requested */ - if ((ma->ma_valid & MA_LOV) == 0 && S_ISDIR(la->la_mode) && - (ma->ma_need & MA_LOV_DEF && is_root) && ma->ma_need & MA_LOV) { - struct lu_fid rootfid; - struct mdt_object *root; - struct mdt_device *mdt = info->mti_mdt; - - rc = dt_root_get(env, mdt->mdt_bottom, &rootfid); - if (rc) - RETURN(rc); - root = mdt_object_find(env, mdt, &rootfid); - if (IS_ERR(root)) - RETURN(PTR_ERR(root)); - rc = mdt_stripe_get(info, root, ma, XATTR_NAME_LOV); - mdt_object_put(info->mti_env, root); - if (unlikely(rc)) { - CERROR("%s: getattr error for "DFID": rc = %d\n", - mdt_obd_name(info->mti_mdt), - PFID(mdt_object_fid(o)), rc); - RETURN(rc); - } - } - if (likely(ma->ma_valid & MA_INODE)) mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o)); else diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index 98ce152..f2fe618 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -638,6 +638,7 @@ int mdt_name_unpack(struct req_capsule *pill, enum mdt_name_flags flags); int mdt_close_unpack(struct mdt_thread_info *info); int mdt_reint_unpack(struct mdt_thread_info *info, __u32 op); +void mdt_fix_lov_magic(struct mdt_thread_info *info); int mdt_reint_rec(struct mdt_thread_info *, struct mdt_lock_handle *); #ifdef CONFIG_FS_POSIX_ACL int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody, diff --git a/lustre/mdt/mdt_lib.c b/lustre/mdt/mdt_lib.c index 23726da..a070d3b 100644 
--- a/lustre/mdt/mdt_lib.c +++ b/lustre/mdt/mdt_lib.c @@ -1292,7 +1292,7 @@ static int mdt_rename_unpack(struct mdt_thread_info *info) /* * please see comment above LOV_MAGIC_V1_DEF */ -static void mdt_fix_lov_magic(struct mdt_thread_info *info) +void mdt_fix_lov_magic(struct mdt_thread_info *info) { struct mdt_reint_record *rr = &info->mti_rr; struct lov_user_md_v1 *v1; @@ -1301,15 +1301,11 @@ static void mdt_fix_lov_magic(struct mdt_thread_info *info) LASSERT(v1); if (unlikely(req_is_replay(mdt_info_req(info)))) { - if (v1->lmm_magic == LOV_USER_MAGIC_V1) { - v1->lmm_magic = LOV_MAGIC_V1_DEF; - } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) { - v1->lmm_magic = __swab32(LOV_MAGIC_V1_DEF); - } else if (v1->lmm_magic == LOV_USER_MAGIC_V3) { - v1->lmm_magic = LOV_MAGIC_V3_DEF; - } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) { - v1->lmm_magic = __swab32(LOV_MAGIC_V3_DEF); - } + if ((v1->lmm_magic & LOV_MAGIC_MASK) == LOV_MAGIC_MAGIC) + v1->lmm_magic |= LOV_MAGIC_DEF; + else if ((v1->lmm_magic & __swab32(LOV_MAGIC_MAGIC)) == + __swab32(LOV_MAGIC_MAGIC)) + v1->lmm_magic |= __swab32(LOV_MAGIC_DEF); } } diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index 799b145..934c6b8 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -1665,6 +1665,31 @@ out: return obj; } +/* XXX Look into layout in MDT layer. 
Sets LOV_PATTERN_F_RELEASED on every component of a composite layout, or on the plain layout; returns -EINVAL if a composite layout has no entries. */ +static inline int mdt_hsm_set_released(struct lov_mds_md *lmm) +{ + struct lov_comp_md_v1 *comp_v1; + struct lov_mds_md *v1; + __u32 off; + int i; + + if (lmm->lmm_magic == cpu_to_le32(LOV_MAGIC_COMP_V1_DEF)) { + comp_v1 = (struct lov_comp_md_v1 *)lmm; + + if (comp_v1->lcm_entry_count == 0) + return -EINVAL; + + for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) { + off = le32_to_cpu(comp_v1->lcm_entries[i].lcme_offset); + v1 = (struct lov_mds_md *)((char *)comp_v1 + off); + v1->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_RELEASED); + } + } else { + lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_RELEASED); + } + return 0; +} + static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o, struct md_attr *ma) { @@ -1758,19 +1783,20 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o, ma->ma_lmm->lmm_stripe_size = cpu_to_le32(LOV_MIN_STRIPE_SIZE); ma->ma_lmm_size = sizeof(*ma->ma_lmm); } else { - /* Magic must be LOV_MAGIC_Vx_DEF otherwise LOD will interpret + /* Magic must be LOV_MAGIC_*_DEF otherwise LOD will interpret * ma_lmm as lov_user_md, then it will be confused by union of * layout_gen and stripe_offset. */ - if (le32_to_cpu(ma->ma_lmm->lmm_magic) == LOV_MAGIC_V1) - ma->ma_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V1_DEF); - else if (le32_to_cpu(ma->ma_lmm->lmm_magic) == LOV_MAGIC_V3) - ma->ma_lmm->lmm_magic = cpu_to_le32(LOV_MAGIC_V3_DEF); + if ((le32_to_cpu(ma->ma_lmm->lmm_magic) & LOV_MAGIC_MASK) == + LOV_MAGIC_MAGIC) + ma->ma_lmm->lmm_magic |= cpu_to_le32(LOV_MAGIC_DEF); else GOTO(out_unlock, rc = -EINVAL); } - /* Set file as released */ - ma->ma_lmm->lmm_pattern |= cpu_to_le32(LOV_PATTERN_F_RELEASED); + /* Set file as released.
*/ + rc = mdt_hsm_set_released(ma->ma_lmm); + if (rc) + GOTO(out_unlock, rc); orp_ma = &info->mti_u.hsm.attr; orp_ma->ma_attr.la_mode = S_IFREG | S_IWUSR; diff --git a/lustre/mdt/mdt_xattr.c b/lustre/mdt/mdt_xattr.c index 890f075..ec2cb6a 100644 --- a/lustre/mdt/mdt_xattr.c +++ b/lustre/mdt/mdt_xattr.c @@ -261,8 +261,9 @@ int mdt_reint_setxattr(struct mdt_thread_info *info, __u64 valid = attr->la_valid; const char *xattr_name = rr->rr_name.ln_name; int xattr_len = rr->rr_eadatalen; - __u64 lockpart; + __u64 lockpart = MDS_INODELOCK_UPDATE; int rc; + bool reply_ea = false; ENTRY; CDEBUG(D_INODE, "setxattr for "DFID": %s %s\n", PFID(rr->rr_fid1), @@ -320,9 +321,41 @@ int mdt_reint_setxattr(struct mdt_thread_info *info, /* ACLs were mapped out, return an error so the user knows */ if (rc != xattr_len) GOTO(out, rc = -EPERM); + } else if ((strlen(xattr_name) > strlen(XATTR_LUSTRE_LOV) + 1) && + strncmp(xattr_name, XATTR_LUSTRE_LOV, + strlen(XATTR_LUSTRE_LOV)) == 0) { + + if (strncmp(xattr_name, XATTR_LUSTRE_LOV".add", + strlen(XATTR_LUSTRE_LOV".add")) && + strncmp(xattr_name, XATTR_LUSTRE_LOV".set", + strlen(XATTR_LUSTRE_LOV".set")) && + strncmp(xattr_name, XATTR_LUSTRE_LOV".del", + strlen(XATTR_LUSTRE_LOV".del"))) { + CERROR("%s: invalid xattr name: %s\n", + mdt_obd_name(info->mti_mdt), xattr_name); + GOTO(out, rc = -EINVAL); + } + + lockpart |= MDS_INODELOCK_LAYOUT; + + /* + * For XATTR_LUSTRE_LOV.add, we'd reply LOVEA to client, + * client will save it for replay. + */ + if (strncmp(xattr_name, XATTR_LUSTRE_LOV".add", + strlen(XATTR_LUSTRE_LOV".add")) == 0 && + req_capsule_has_field(&req->rq_pill, &RMF_MDT_MD, + RCL_SERVER)) { + /* + * Don't need to reply LOVEA for replay request, + * it's already stored in client request. + */ + if (!req_is_replay(req)) + reply_ea = true; + mdt_fix_lov_magic(info); + } } - lockpart = MDS_INODELOCK_UPDATE; /* Revoke all clients' lookup lock, since the access * permissions for this inode is changed when ACL_ACCESS is * set. 
This isn't needed for ACL_DEFAULT, since that does @@ -392,6 +425,27 @@ int mdt_reint_setxattr(struct mdt_thread_info *info, CDEBUG(D_INFO, "valid bits: %#llx\n", valid); rc = -EINVAL; } + + if (reply_ea && rc == 0) { + ma->ma_lmm = req_capsule_server_get(&req->rq_pill, &RMF_MDT_MD); + ma->ma_lmm_size = req_capsule_get_size(&req->rq_pill, + &RMF_MDT_MD, RCL_SERVER); + ma->ma_need = MA_LOV; + ma->ma_valid = 0; + if (ma->ma_lmm_size > 0) + rc = mdt_attr_get_complex(info, obj, ma); + + if (ma->ma_valid & MA_LOV) { + struct mdt_body *repbody; + + repbody = req_capsule_server_get(&req->rq_pill, + &RMF_MDT_BODY); + LASSERT(ma->ma_lmm_size != 0); + repbody->mbo_eadatasize = ma->ma_lmm_size; + repbody->mbo_valid |= OBD_MD_FLEASIZE; + } + } + if (rc == 0) mdt_counter_incr(req, LPROC_MDT_SETXATTR); diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index 332d1cc..1a7f069 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -319,6 +319,12 @@ static const struct req_msg_field *mds_reint_setxattr_client[] = { &RMF_DLM_REQ }; +static const struct req_msg_field *mds_reint_setxattr_server[] = { + &RMF_PTLRPC_BODY, + &RMF_MDT_BODY, + &RMF_MDT_MD +}; + static const struct req_msg_field *mdt_swap_layouts[] = { &RMF_PTLRPC_BODY, &RMF_MDT_BODY, @@ -1450,7 +1456,7 @@ EXPORT_SYMBOL(RQF_MDS_REINT_SETATTR); struct req_format RQF_MDS_REINT_SETXATTR = DEFINE_REQ_FMT0("MDS_REINT_SETXATTR", - mds_reint_setxattr_client, mdt_body_only); + mds_reint_setxattr_client, mds_reint_setxattr_server); EXPORT_SYMBOL(RQF_MDS_REINT_SETXATTR); struct req_format RQF_MDS_CONNECT = diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index e9ee41f..231a94b 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -2115,12 +2115,9 @@ void lustre_swab_lmv_user_md(struct lmv_user_md *lum) } EXPORT_SYMBOL(lustre_swab_lmv_user_md); -void lustre_print_user_md(unsigned int lvl, struct lov_user_md *lum, - const char *msg) +static void 
lustre_print_v1v3(unsigned int lvl, struct lov_user_md *lum, + const char *msg) { - if (likely(!cfs_cdebug_show(lvl, DEBUG_SUBSYSTEM))) - return; - CDEBUG(lvl, "%s lov_user_md %p:\n", msg, lum); CDEBUG(lvl, "\tlmm_magic: %#x\n", lum->lmm_magic); CDEBUG(lvl, "\tlmm_pattern: %#x\n", lum->lmm_pattern); @@ -2146,6 +2143,54 @@ void lustre_print_user_md(unsigned int lvl, struct lov_user_md *lum, CDEBUG(lvl, "\t\t%u\n", v3->lmm_objects[i].l_ost_idx); } } + +void lustre_print_user_md(unsigned int lvl, struct lov_user_md *lum, + const char *msg) +{ + struct lov_comp_md_v1 *comp_v1; + int i; + + if (likely(!cfs_cdebug_show(lvl, DEBUG_SUBSYSTEM))) + return; + + if (lum->lmm_magic == LOV_USER_MAGIC_V1 || + lum->lmm_magic == LOV_USER_MAGIC_V3) { + lustre_print_v1v3(lvl, lum, msg); + return; + } + + if (lum->lmm_magic != LOV_USER_MAGIC_COMP_V1) { + CDEBUG(lvl, "%s: bad magic: %x\n", msg, lum->lmm_magic); + return; + } + + comp_v1 = (struct lov_comp_md_v1 *)lum; + CDEBUG(lvl, "%s: lov_comp_md_v1 %p:\n", msg, lum); + CDEBUG(lvl, "\tlcm_magic: %#x\n", comp_v1->lcm_magic); + CDEBUG(lvl, "\tlcm_size: %#x\n", comp_v1->lcm_size); + CDEBUG(lvl, "\tlcm_layout_gen: %#x\n", comp_v1->lcm_layout_gen); + CDEBUG(lvl, "\tlcm_flags: %#x\n", comp_v1->lcm_flags); + CDEBUG(lvl, "\tlcm_entry_count: %#x\n\n", comp_v1->lcm_entry_count); + + for (i = 0; i < comp_v1->lcm_entry_count; i++) { + struct lov_comp_md_entry_v1 *ent = &comp_v1->lcm_entries[i]; + struct lov_user_md *v1; + + CDEBUG(lvl, "\tentry %d:\n", i); + CDEBUG(lvl, "\tlcme_id: %#x\n", ent->lcme_id); + CDEBUG(lvl, "\tlcme_flags: %#x\n", ent->lcme_flags); + CDEBUG(lvl, "\tlcme_extent.e_start: %llu\n", + ent->lcme_extent.e_start); + CDEBUG(lvl, "\tlcme_extent.e_end: %llu\n", + ent->lcme_extent.e_end); + CDEBUG(lvl, "\tlcme_offset: %#x\n", ent->lcme_offset); + CDEBUG(lvl, "\tlcme_size: %#x\n\n", ent->lcme_size); + + v1 = (struct lov_user_md *)((char *)comp_v1 + + comp_v1->lcm_entries[i].lcme_offset); + lustre_print_v1v3(lvl, v1, msg); + } +} 
EXPORT_SYMBOL(lustre_print_user_md); static void lustre_swab_lmm_oi(struct ost_id *oi) -- 1.8.3.1