From: Mikhal Pershin Date: Wed, 26 Apr 2017 11:24:57 +0000 (+0300) Subject: LU-3285 lov: add MDT target to the LOV device X-Git-Tag: 2.10.56~64^2~20 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=8b352709a66f9079cadeda2e9af3834941ced969 LU-3285 lov: add MDT target to the LOV device MDC becomes LOV target like OSC for Data-on-MDT needs. Patch does the following: - new composite layout entry type is added - LLT_DOM to describe Data-on-MDT striping. - LOV process config log and checks for MDC targets organizing them separately from OSCs - LOV operations are changed where needed to understand new layout entry type Signed-off-by: Mikhail Pershin Change-Id: I8a985d66a5f283ed387a311ff46f307c60317a79 Reviewed-on: https://review.whamcloud.com/28010 Tested-by: Jenkins Reviewed-by: Jinshan Xiong Tested-by: Maloo Reviewed-by: Andreas Dilger --- diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 29a3cb9..ccd7fd9 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -372,6 +372,11 @@ struct lov_tgt_desc { ltd_reap:1; /* should this target be deleted */ }; +struct lov_md_tgt_desc { + struct obd_device *lmtd_mdc; + __u32 lmtd_index; +}; + struct lov_obd { struct lov_desc desc; struct lov_tgt_desc **lov_tgts; /* sparse array */ @@ -394,10 +399,13 @@ struct lov_obd { struct cl_client_cache *lov_cache; struct rw_semaphore lov_notify_lock; + /* Data-on-MDT: MDC array */ + struct lov_md_tgt_desc *lov_mdc_tgts; }; struct lmv_tgt_desc { struct obd_uuid ltd_uuid; + struct obd_device *ltd_obd; struct obd_export *ltd_exp; __u32 ltd_idx; struct mutex ltd_fid_mutex; diff --git a/lustre/include/uapi/linux/lustre/lustre_user.h b/lustre/include/uapi/linux/lustre/lustre_user.h index bb18450..f4adad5 100644 --- a/lustre/include/uapi/linux/lustre/lustre_user.h +++ b/lustre/include/uapi/linux/lustre/lustre_user.h @@ -432,7 +432,7 @@ enum ll_lease_type { #define LOV_PATTERN_NONE 0x000 #define LOV_PATTERN_RAID0 0x001 #define LOV_PATTERN_RAID1 
0x002 -#define LOV_PATTERN_FIRST 0x100 +#define LOV_PATTERN_MDT 0x100 #define LOV_PATTERN_CMOBD 0x200 #define LOV_PATTERN_F_MASK 0xffff0000 @@ -443,6 +443,7 @@ enum ll_lease_type { static inline bool lov_pattern_supported(__u32 pattern) { return pattern == LOV_PATTERN_RAID0 || + pattern == LOV_PATTERN_MDT || pattern == (LOV_PATTERN_RAID0 | LOV_PATTERN_F_RELEASED); } diff --git a/lustre/lmv/lmv_obd.c b/lustre/lmv/lmv_obd.c index 19bbe64..9034ba0 100644 --- a/lustre/lmv/lmv_obd.c +++ b/lustre/lmv/lmv_obd.c @@ -418,7 +418,7 @@ static int lmv_add_target(struct obd_device *obd, struct obd_uuid *uuidp, mutex_lock(&lmv->lmv_init_mutex); if ((index < lmv->tgts_size) && (lmv->tgts[index] != NULL)) { tgt = lmv->tgts[index]; - CERROR("%s: UUID %s already assigned at LOV target index %d:" + CERROR("%s: UUID %s already assigned at LMV target index %d:" " rc = %d\n", obd->obd_name, obd_uuid2str(&tgt->ltd_uuid), index, -EEXIST); mutex_unlock(&lmv->lmv_init_mutex); diff --git a/lustre/lov/lov_cl_internal.h b/lustre/lov/lov_cl_internal.h index ca40cc8..f97747d 100644 --- a/lustre/lov/lov_cl_internal.h +++ b/lustre/lov/lov_cl_internal.h @@ -91,6 +91,12 @@ enum lov_device_flags { * Upper half. */ +/* Data-on-MDT array item in lov_device::ld_md_tgts[] */ +struct lovdom_device { + struct cl_device *ldm_mdc; + int ldm_idx; +}; + struct lov_device { /* * XXX Locking of lov-private data is missing. @@ -101,6 +107,13 @@ struct lov_device { __u32 ld_target_nr; struct lovsub_device **ld_target; __u32 ld_flags; + + /* Data-on-MDT devices */ + __u32 ld_md_tgts_nr; + struct lovdom_device *ld_md_tgts; + struct obd_device *ld_lmv; + /* LU site for subdevices */ + struct lu_site ld_site; }; /** @@ -129,6 +142,34 @@ static inline char *llt2str(enum lov_layout_type llt) return ""; } +/** + * Return lov_layout_entry_type associated with a given composite layout + * entry. 
+ */ +static inline __u32 lov_entry_type(struct lov_stripe_md_entry *lsme) +{ + if ((lov_pattern(lsme->lsme_pattern) == LOV_PATTERN_RAID0) || + (lov_pattern(lsme->lsme_pattern) == LOV_PATTERN_MDT)) + return lov_pattern(lsme->lsme_pattern); + return 0; +} + +struct lov_layout_entry; +struct lov_object; +struct lov_lock_sub; + +struct lov_comp_layout_entry_ops { + int (*lco_init)(const struct lu_env *env, struct lov_device *dev, + struct lov_object *lov, unsigned int index, + const struct cl_object_conf *conf, + struct lov_layout_entry *lle); + void (*lco_fini)(const struct lu_env *env, + struct lov_layout_entry *lle); + int (*lco_getattr)(const struct lu_env *env, struct lov_object *obj, + unsigned int index, struct lov_layout_entry *lle, + struct cl_attr **attr); +}; + struct lov_layout_raid0 { unsigned lo_nr; /** @@ -165,6 +206,25 @@ struct lov_layout_raid0 { struct cl_attr lo_attr; }; +struct lov_layout_dom { + /* keep this always at first place so DOM layout entry + * can be addressed also as RAID0 after initialization. + */ + struct lov_layout_raid0 lo_dom_r0; + struct lovsub_object *lo_dom; + struct lov_oinfo *lo_loi; +}; + +struct lov_layout_entry { + __u32 lle_type; + struct lu_extent lle_extent; + struct lov_comp_layout_entry_ops *lle_comp_ops; + union { + struct lov_layout_raid0 lle_raid0; + struct lov_layout_dom lle_dom; + }; +}; + /** * lov-specific file state. * @@ -180,7 +240,7 @@ struct lov_layout_raid0 { * function corresponding to the current layout type. */ struct lov_object { - struct cl_object lo_cl; + struct cl_object lo_cl; /** * Serializes object operations with transitions between layout types. * @@ -220,13 +280,10 @@ struct lov_object { } released; struct lov_layout_composite { /** - * Current valid entry count of lo_entries. + * Current valid entry count of entries. 
*/ unsigned int lo_entry_count; - struct lov_layout_entry { - struct lu_extent lle_extent; - struct lov_layout_raid0 lle_raid0; - } *lo_entries; + struct lov_layout_entry *lo_entries; } composite; } u; /** @@ -634,6 +691,15 @@ static inline struct lov_thread_info *lov_env_info(const struct lu_env *env) return info; } +static inline struct lov_layout_entry *lov_entry(struct lov_object *lov, int i) +{ + LASSERT(lov->lo_type == LLT_COMP); + LASSERTF(i < lov->u.composite.lo_entry_count, + "entry %d entry_count %d", i, lov->u.composite.lo_entry_count); + + return &lov->u.composite.lo_entries[i]; +} + static inline struct lov_layout_raid0 *lov_r0(struct lov_object *lov, int i) { LASSERT(lov->lo_type == LLT_COMP); diff --git a/lustre/lov/lov_dev.c b/lustre/lov/lov_dev.c index 2506c39..bf9dba2 100644 --- a/lustre/lov/lov_dev.c +++ b/lustre/lov/lov_dev.c @@ -142,64 +142,114 @@ struct lu_context_key lov_session_key = { /* type constructor/destructor: lov_type_{init,fini,start,stop}() */ LU_TYPE_INIT_FINI(lov, &lov_key, &lov_session_key); + +static int lov_mdc_dev_init(const struct lu_env *env, struct lov_device *ld, + struct lu_device *mdc_dev, __u32 idx, __u32 nr) +{ + struct cl_device *cl; + + ENTRY; + cl = cl_type_setup(env, &ld->ld_site, &lovsub_device_type, + mdc_dev); + if (IS_ERR(cl)) + RETURN(PTR_ERR(cl)); + + ld->ld_md_tgts[nr].ldm_mdc = cl; + ld->ld_md_tgts[nr].ldm_idx = idx; + RETURN(0); +} + static struct lu_device *lov_device_fini(const struct lu_env *env, - struct lu_device *d) + struct lu_device *d) { - int i; - struct lov_device *ld = lu2lov_dev(d); + struct lov_device *ld = lu2lov_dev(d); + int i; - LASSERT(ld->ld_lov != NULL); - if (ld->ld_target == NULL) - RETURN(NULL); + LASSERT(ld->ld_lov != NULL); - lov_foreach_target(ld, i) { - struct lovsub_device *lsd; + if (ld->ld_lmv != NULL) { + class_decref(ld->ld_lmv, "lov", d); + ld->ld_lmv = NULL; + } - lsd = ld->ld_target[i]; - if (lsd != NULL) { - cl_stack_fini(env, lovsub2cl_dev(lsd)); - ld->ld_target[i] = 
NULL; - } - } - RETURN(NULL); + if (ld->ld_md_tgts != NULL) { + for (i = 0; i < ld->ld_md_tgts_nr; i++) { + if (ld->ld_md_tgts[i].ldm_mdc == NULL) + continue; + + cl_stack_fini(env, ld->ld_md_tgts[i].ldm_mdc); + ld->ld_md_tgts[i].ldm_mdc = NULL; + ld->ld_lov->lov_mdc_tgts[i].lmtd_mdc = NULL; + } + } + + if (ld->ld_target != NULL) { + lov_foreach_target(ld, i) { + struct lovsub_device *lsd; + + lsd = ld->ld_target[i]; + if (lsd != NULL) { + cl_stack_fini(env, lovsub2cl_dev(lsd)); + ld->ld_target[i] = NULL; + } + } + } + RETURN(NULL); } static int lov_device_init(const struct lu_env *env, struct lu_device *d, const char *name, struct lu_device *next) { - struct lov_device *ld = lu2lov_dev(d); - int i; - int rc = 0; - - LASSERT(d->ld_site != NULL); - if (ld->ld_target == NULL) - RETURN(rc); - - lov_foreach_target(ld, i) { - struct lovsub_device *lsd; - struct cl_device *cl; - struct lov_tgt_desc *desc; - - desc = ld->ld_lov->lov_tgts[i]; - if (desc == NULL) - continue; - - cl = cl_type_setup(env, d->ld_site, &lovsub_device_type, - desc->ltd_obd->obd_lu_dev); - if (IS_ERR(cl)) { - rc = PTR_ERR(cl); - break; - } - lsd = cl2lovsub_dev(cl); - ld->ld_target[i] = lsd; - } + struct lov_device *ld = lu2lov_dev(d); + int i; + int rc = 0; + + /* check all added already MDC subdevices and initialize them */ + for (i = 0; i < ld->ld_md_tgts_nr; i++) { + struct obd_device *mdc; + __u32 idx; + + mdc = ld->ld_lov->lov_mdc_tgts[i].lmtd_mdc; + idx = ld->ld_lov->lov_mdc_tgts[i].lmtd_index; + + if (mdc == NULL) + continue; + + rc = lov_mdc_dev_init(env, ld, mdc->obd_lu_dev, idx, i); + if (rc) { + CERROR("%s: failed to add MDC %s as target: rc = %d\n", + d->ld_obd->obd_name, + obd_uuid2str(&mdc->obd_uuid), rc); + GOTO(out_err, rc); + } + } - if (rc) - lov_device_fini(env, d); - else - ld->ld_flags |= LOV_DEV_INITIALIZED; + if (ld->ld_target == NULL) + RETURN(0); - RETURN(rc); + lov_foreach_target(ld, i) { + struct lovsub_device *lsd; + struct cl_device *cl; + struct lov_tgt_desc *desc; + 
+ desc = ld->ld_lov->lov_tgts[i]; + if (desc == NULL) + continue; + + cl = cl_type_setup(env, &ld->ld_site, &lovsub_device_type, + desc->ltd_obd->obd_lu_dev); + if (IS_ERR(cl)) + GOTO(out_err, rc = PTR_ERR(cl)); + + lsd = cl2lovsub_dev(cl); + ld->ld_target[i] = lsd; + } + ld->ld_flags |= LOV_DEV_INITIALIZED; + RETURN(0); + +out_err: + lu_device_fini(d); + RETURN(rc); } /* Free the lov specific data created for the back end lu_device. */ @@ -209,9 +259,24 @@ static struct lu_device *lov_device_free(const struct lu_env *env, struct lov_device *ld = lu2lov_dev(d); const int nr = ld->ld_target_nr; + lu_site_fini(&ld->ld_site); + cl_device_fini(lu2cl_dev(d)); - if (ld->ld_target != NULL) + if (ld->ld_target) { OBD_FREE(ld->ld_target, nr * sizeof ld->ld_target[0]); + ld->ld_target = NULL; + } + if (ld->ld_md_tgts) { + OBD_FREE(ld->ld_md_tgts, + sizeof(*ld->ld_md_tgts) * LOV_MDC_TGT_MAX); + ld->ld_md_tgts = NULL; + } + /* free array of MDCs */ + if (ld->ld_lov->lov_mdc_tgts) { + OBD_FREE(ld->ld_lov->lov_mdc_tgts, + sizeof(*ld->ld_lov->lov_mdc_tgts) * LOV_MDC_TGT_MAX); + ld->ld_lov->lov_mdc_tgts = NULL; + } OBD_FREE_PTR(ld); return NULL; @@ -283,11 +348,9 @@ static int lov_cl_add_target(const struct lu_env *env, struct lu_device *dev, RETURN(-EINVAL); } - rc = lov_expand_targets(env, ld); - if (rc == 0 && ld->ld_flags & LOV_DEV_INITIALIZED) { - LASSERT(dev->ld_site != NULL); - - cl = cl_type_setup(env, dev->ld_site, &lovsub_device_type, + rc = lov_expand_targets(env, ld); + if (rc == 0 && ld->ld_flags & LOV_DEV_INITIALIZED) { + cl = cl_type_setup(env, &ld->ld_site, &lovsub_device_type, tgt->ltd_obd->obd_lu_dev); if (!IS_ERR(cl)) { lsd = cl2lovsub_dev(cl); @@ -303,34 +366,139 @@ static int lov_cl_add_target(const struct lu_env *env, struct lu_device *dev, RETURN(rc); } -static int lov_process_config(const struct lu_env *env, - struct lu_device *d, struct lustre_cfg *cfg) +/** + * Add new MDC target device in LOV. 
+ * + * This function is part of the configuration log processing. It adds new MDC + * device to the MDC device array indexed by their indexes. + * + * \param[in] env execution environment + * \param[in] d LU device of LOV device + * \param[in] mdc MDC device to add + * \param[in] idx MDC device index + * + * \retval 0 if successful + * \retval negative value on error + */ +static int lov_add_mdc_target(const struct lu_env *env, struct lu_device *d, + struct obd_device *mdc, __u32 idx) { - struct obd_device *obd = d->ld_obd; - int cmd; - int rc; - int gen; - __u32 index; + struct lov_device *ld = lu2lov_dev(d); + struct obd_device *lov_obd = d->ld_obd; + struct obd_device *lmv_obd; + int next; + int rc = 0; - obd_getref(obd); + ENTRY; - cmd = cfg->lcfg_command; - rc = lov_process_config_base(d->ld_obd, cfg, &index, &gen); - if (rc == 0) { - switch(cmd) { - case LCFG_LOV_ADD_OBD: - case LCFG_LOV_ADD_INA: - rc = lov_cl_add_target(env, d, index); - if (rc != 0) - lov_del_target(d->ld_obd, index, NULL, 0); - break; - case LCFG_LOV_DEL_OBD: - lov_cl_del_target(env, d, index); - break; - } - } - obd_putref(obd); - RETURN(rc); + LASSERT(mdc != NULL); + if (ld->ld_md_tgts_nr == LOV_MDC_TGT_MAX) { + /* If the maximum value of LOV_MDC_TGT_MAX will become too + * small then all MD target handling must be rewritten in LOD + * manner, check lod_add_device() and related functionality. 
+ */ + CERROR("%s: cannot serve more than %d MDC devices\n", + lov_obd->obd_name, LOV_MDC_TGT_MAX); + RETURN(-ERANGE); + } + + /* grab FLD from lmv, do that here, when first MDC is added + * to be sure LMV is set up and can be found */ + if (ld->ld_lmv == NULL) { + next = 0; + while ((lmv_obd = class_devices_in_group(&lov_obd->obd_uuid, + &next)) != NULL) { + if ((strncmp(lmv_obd->obd_type->typ_name, + LUSTRE_LMV_NAME, + strlen(LUSTRE_LMV_NAME)) == 0)) + break; + } + if (lmv_obd == NULL) { + CERROR("%s: cannot find LMV OBD by UUID (%s)\n", + lov_obd->obd_name, + obd_uuid2str(&lov_obd->obd_uuid)); /* lmv_obd is NULL here; print the UUID that was searched for */ + RETURN(-ENODEV); + } + spin_lock(&lmv_obd->obd_dev_lock); + class_incref(lmv_obd, "lov", d); /* same source pointer as class_decref() in lov_device_fini() */ + spin_unlock(&lmv_obd->obd_dev_lock); + ld->ld_lmv = lmv_obd; + } + + LASSERT(lov_obd->u.lov.lov_mdc_tgts[ld->ld_md_tgts_nr].lmtd_mdc == + NULL); + + if (ld->ld_flags & LOV_DEV_INITIALIZED) { + rc = lov_mdc_dev_init(env, ld, mdc->obd_lu_dev, idx, + ld->ld_md_tgts_nr); + if (rc) { + CERROR("%s: failed to add MDC %s as target: rc = %d\n", + lov_obd->obd_name, obd_uuid2str(&mdc->obd_uuid), + rc); + RETURN(rc); + } + } + + lov_obd->u.lov.lov_mdc_tgts[ld->ld_md_tgts_nr].lmtd_mdc = mdc; + lov_obd->u.lov.lov_mdc_tgts[ld->ld_md_tgts_nr].lmtd_index = idx; + ld->ld_md_tgts_nr++; + + RETURN(rc); +} + +static int lov_process_config(const struct lu_env *env, + struct lu_device *d, struct lustre_cfg *cfg) +{ + struct obd_device *obd = d->ld_obd; + int cmd; + int rc; + int gen; + __u32 index; + + obd_getref(obd); + + cmd = cfg->lcfg_command; + + rc = lov_process_config_base(d->ld_obd, cfg, &index, &gen); + if (rc < 0) + GOTO(out, rc); + + switch (cmd) { + case LCFG_LOV_ADD_OBD: + case LCFG_LOV_ADD_INA: + rc = lov_cl_add_target(env, d, index); + if (rc != 0) + lov_del_target(d->ld_obd, index, NULL, 0); + break; + case LCFG_LOV_DEL_OBD: + lov_cl_del_target(env, d, index); + break; + case LCFG_ADD_MDC: + { + struct obd_device *mdc; + struct obd_uuid tgt_uuid; + + /* modify_mdc_tgts add 
0:lustre-clilmv 1:lustre-MDT0000_UUID + * 2:0 3:1 4:lustre-MDT0000-mdc_UUID */ + if (LUSTRE_CFG_BUFLEN(cfg, 1) > sizeof(tgt_uuid.uuid)) + GOTO(out, rc = -EINVAL); + + obd_str2uuid(&tgt_uuid, lustre_cfg_buf(cfg, 1)); + + if (sscanf(lustre_cfg_buf(cfg, 2), "%u", &index) != 1) /* index is __u32, so scan as unsigned */ + GOTO(out, rc = -EINVAL); + + mdc = class_find_client_obd(&tgt_uuid, LUSTRE_MDC_NAME, + &obd->obd_uuid); + if (mdc == NULL) + GOTO(out, rc = -ENODEV); + rc = lov_add_mdc_target(env, d, mdc, index); + break; + } + } +out: + obd_putref(obd); + RETURN(rc); } static const struct lu_device_operations lov_lu_ops = { @@ -359,13 +527,45 @@ static struct lu_device *lov_device_alloc(const struct lu_env *env, obd = class_name2obd(lustre_cfg_string(cfg, 0)); LASSERT(obd != NULL); rc = lov_setup(obd, cfg); - if (rc) { - lov_device_free(env, d); - RETURN(ERR_PTR(rc)); - } + if (rc) + GOTO(out, rc); + + /* Alloc MDC devices array */ + /* XXX: need dynamic allocation at some moment */ + OBD_ALLOC(ld->ld_md_tgts, sizeof(*ld->ld_md_tgts) * LOV_MDC_TGT_MAX); + if (ld->ld_md_tgts == NULL) + GOTO(out, rc = -ENOMEM); + + ld->ld_md_tgts_nr = 0; + + ld->ld_lov = &obd->u.lov; + OBD_ALLOC(ld->ld_lov->lov_mdc_tgts, + sizeof(*ld->ld_lov->lov_mdc_tgts) * LOV_MDC_TGT_MAX); + if (ld->ld_lov->lov_mdc_tgts == NULL) + GOTO(out_md_tgts, rc = -ENOMEM); + + rc = lu_site_init(&ld->ld_site, d); + if (rc != 0) + GOTO(out_mdc_tgts, rc); + + rc = lu_site_init_finish(&ld->ld_site); + if (rc != 0) + GOTO(out_site, rc); + + RETURN(d); +out_site: + lu_site_fini(&ld->ld_site); +out_mdc_tgts: + OBD_FREE(ld->ld_lov->lov_mdc_tgts, + sizeof(*ld->ld_lov->lov_mdc_tgts) * LOV_MDC_TGT_MAX); + ld->ld_lov->lov_mdc_tgts = NULL; +out_md_tgts: + OBD_FREE(ld->ld_md_tgts, sizeof(*ld->ld_md_tgts) * LOV_MDC_TGT_MAX); + ld->ld_md_tgts = NULL; +out: + OBD_FREE_PTR(ld); - ld->ld_lov = &obd->u.lov; - RETURN(d); + return ERR_PTR(rc); } static const struct lu_device_type_operations lov_device_type_ops = { diff --git a/lustre/lov/lov_ea.c b/lustre/lov/lov_ea.c 
index b29bb43..8025d1c 100644 --- a/lustre/lov/lov_ea.c +++ b/lustre/lov/lov_ea.c @@ -90,7 +90,8 @@ static int lsm_lmm_verify_v1v3(struct lov_mds_md *lmm, size_t lmm_size, return -EINVAL; } - if (lov_pattern(le32_to_cpu(lmm->lmm_pattern)) != LOV_PATTERN_RAID0) { + if (lov_pattern(le32_to_cpu(lmm->lmm_pattern)) != LOV_PATTERN_MDT && + lov_pattern(le32_to_cpu(lmm->lmm_pattern)) != LOV_PATTERN_RAID0) { CERROR("bad striping pattern\n"); lov_dump_lmm_common(D_WARNING, lmm); return -EINVAL; @@ -201,6 +202,12 @@ lsme_unpack(struct lov_obd *lov, struct lov_mds_md *lmm, size_t buf_size, GOTO(out_lsme, rc = -E2BIG); } + /* with Data-on-MDT set maxbytes to stripe size */ + if (lsme_is_dom(lsme)) { + lov_bytes = lsme->lsme_stripe_size; + goto out_dom; + } + for (i = 0; i < stripe_count; i++) { struct lov_oinfo *loi; struct lov_tgt_desc *ltd; @@ -244,6 +251,7 @@ lsme_unpack(struct lov_obd *lov, struct lov_mds_md *lmm, size_t buf_size, lov_bytes = min_stripe_maxbytes * stripe_count; +out_dom: if (maxbytes != NULL) { if (lov_bytes < min_stripe_maxbytes) /* handle overflow */ *maxbytes = MAX_LFS_FILESIZE; @@ -381,7 +389,8 @@ lsme_unpack_comp(struct lov_obd *lov, struct lov_mds_md *lmm, unsigned int stripe_count; stripe_count = le16_to_cpu(lmm->lmm_stripe_count); - if (stripe_count == 0) + if (stripe_count == 0 && + lov_pattern(le32_to_cpu(lmm->lmm_pattern)) != LOV_PATTERN_MDT) RETURN(ERR_PTR(-EINVAL)); /* un-instantiated lmm contains no ost id info, i.e. lov_ost_data_v1 */ if (!inited) @@ -467,9 +476,10 @@ lsm_unpackmd_comp_md_v1(struct lov_obd *lov, void *buf, size_t buf_size) maxbytes; /* the last component hasn't been defined, or * lsm_maxbytes overflowed. 
*/ - if (lsme->lsme_extent.e_end != LUSTRE_EOF || - lsm->lsm_maxbytes < - (loff_t)lsme->lsme_extent.e_start) + if (!lsme_is_dom(lsme) && + (lsme->lsme_extent.e_end != LUSTRE_EOF || + lsm->lsm_maxbytes < + (loff_t)lsme->lsme_extent.e_start)) lsm->lsm_maxbytes = MAX_LFS_FILESIZE; } } diff --git a/lustre/lov/lov_internal.h b/lustre/lov/lov_internal.h index aa400ad..a081b33 100644 --- a/lustre/lov/lov_internal.h +++ b/lustre/lov/lov_internal.h @@ -54,6 +54,11 @@ struct lov_stripe_md_entry { struct lov_oinfo *lsme_oinfo[]; }; +static inline bool lsme_is_dom(struct lov_stripe_md_entry *lsme) +{ + return (lov_pattern(lsme->lsme_pattern) == LOV_PATTERN_MDT); +} + static inline void copy_lsm_entry(struct lov_stripe_md_entry *dst, struct lov_stripe_md_entry *src) { @@ -306,6 +311,8 @@ extern struct lprocfs_vars lprocfs_lov_obd_vars[]; /* lov_cl.c */ extern struct lu_device_type lov_device_type; +#define LOV_MDC_TGT_MAX 256 + /* pools */ extern struct cfs_hash_ops pool_hash_operations; /* ost_pool methods */ diff --git a/lustre/lov/lov_io.c b/lustre/lov/lov_io.c index 577e7d1..acddf1d 100644 --- a/lustre/lov/lov_io.c +++ b/lustre/lov/lov_io.c @@ -571,7 +571,10 @@ static int lov_io_setattr_iter_init(const struct lu_env *env, if (cl_io_is_trunc(io) && lio->lis_pos > 0) { index = lov_lsm_entry(lsm, lio->lis_pos - 1); - if (index > 0 && !lsm_entry_inited(lsm, index)) { + /* no entry found for such offset */ + if (index < 0) { + RETURN(io->ci_result = -ENODATA); + } else if (!lsm_entry_inited(lsm, index)) { io->ci_need_write_intent = 1; RETURN(io->ci_result = -ENODATA); } diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 5204de9..0e645b7 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -899,7 +899,10 @@ int lov_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg, ENTRY; switch(cmd = lcfg->lcfg_command) { - case LCFG_LOV_ADD_OBD: + case LCFG_ADD_MDC: + case LCFG_DEL_MDC: + break; + case LCFG_LOV_ADD_OBD: case LCFG_LOV_ADD_INA: case 
LCFG_LOV_DEL_OBD: { __u32 index; @@ -1264,58 +1267,71 @@ static int lov_set_info_async(const struct lu_env *env, struct obd_export *exp, struct obd_device *obddev = class_exp2obd(exp); struct lov_obd *lov = &obddev->u.lov; struct lov_tgt_desc *tgt; - int do_inactive = 0; - int no_set = 0; - u32 count; + bool do_inactive = false, no_set = false; u32 i; int rc = 0; int err; - ENTRY; - if (set == NULL) { - no_set = 1; - set = ptlrpc_prep_set(); - if (!set) - RETURN(-ENOMEM); - } + ENTRY; - obd_getref(obddev); - count = lov->desc.ld_tgt_count; + if (set == NULL) { + no_set = true; + set = ptlrpc_prep_set(); + if (!set) + RETURN(-ENOMEM); + } + + obd_getref(obddev); if (KEY_IS(KEY_CHECKSUM)) { - do_inactive = 1; + do_inactive = true; } else if (KEY_IS(KEY_CACHE_SET)) { LASSERT(lov->lov_cache == NULL); lov->lov_cache = val; - do_inactive = 1; + do_inactive = true; cl_cache_incref(lov->lov_cache); } - for (i = 0; i < count; i++) { + for (i = 0; i < lov->desc.ld_tgt_count; i++) { tgt = lov->lov_tgts[i]; - /* OST was disconnected */ - if (!tgt || !tgt->ltd_exp) - continue; + /* OST was disconnected */ + if (tgt == NULL || tgt->ltd_exp == NULL) + continue; - /* OST is inactive and we don't want inactive OSCs */ - if (!tgt->ltd_active && !do_inactive) - continue; + /* OST is inactive and we don't want inactive OSCs */ + if (!tgt->ltd_active && !do_inactive) + continue; err = obd_set_info_async(env, tgt->ltd_exp, keylen, key, vallen, val, set); - if (!rc) - rc = err; - } - obd_putref(obddev); - if (no_set) { - err = ptlrpc_set_wait(set); - if (!rc) - rc = err; - ptlrpc_set_destroy(set); - } - RETURN(rc); + if (rc == 0) + rc = err; + } + + /* cycle through MDC target for Data-on-MDT */ + for (i = 0; i < LOV_MDC_TGT_MAX; i++) { + struct obd_device *mdc; + + mdc = lov->lov_mdc_tgts[i].lmtd_mdc; + if (mdc == NULL) + continue; + + err = obd_set_info_async(env, mdc->obd_self_export, + keylen, key, vallen, val, set); + if (rc == 0) + rc = err; + } + + obd_putref(obddev); + if 
(no_set) { + err = ptlrpc_set_wait(set); + if (rc == 0) + rc = err; + ptlrpc_set_destroy(set); + } + RETURN(rc); } void lov_stripe_lock(struct lov_stripe_md *md) diff --git a/lustre/lov/lov_object.c b/lustre/lov/lov_object.c index e26769c..216221a 100644 --- a/lustre/lov/lov_object.c +++ b/lustre/lov/lov_object.c @@ -89,30 +89,40 @@ static void lov_lsm_put(struct lov_stripe_md *lsm) * Lov object layout operations. * */ -static int lov_init_empty(const struct lu_env *env, struct lov_device *dev, - struct lov_object *lov, struct lov_stripe_md *lsm, - const struct cl_object_conf *conf, - union lov_layout_state *state) + +static struct cl_object *lov_sub_find(const struct lu_env *env, + struct cl_device *dev, + const struct lu_fid *fid, + const struct cl_object_conf *conf) { - return 0; + struct lu_object *o; + + ENTRY; + + o = lu_object_find_at(env, cl2lu_dev(dev), fid, &conf->coc_lu); + LASSERT(ergo(!IS_ERR(o), o->lo_dev->ld_type == &lovsub_device_type)); + RETURN(lu2cl(o)); } -static struct cl_object *lov_sub_find(const struct lu_env *env, - struct cl_device *dev, - const struct lu_fid *fid, - const struct cl_object_conf *conf) +static int lov_page_slice_fixup(struct lov_object *lov, + struct cl_object *stripe) { - struct lu_object *o; + struct cl_object_header *hdr = cl_object_header(&lov->lo_cl); + struct cl_object *o; - ENTRY; - o = lu_object_find_at(env, cl2lu_dev(dev), fid, &conf->coc_lu); - LASSERT(ergo(!IS_ERR(o), o->lo_dev->ld_type == &lovsub_device_type)); - RETURN(lu2cl(o)); + if (stripe == NULL) + return hdr->coh_page_bufsize - lov->lo_cl.co_slice_off - + cfs_size_round(sizeof(struct lov_page)); + + cl_object_for_each(o, stripe) + o->co_slice_off += hdr->coh_page_bufsize; + + return cl_object_header(stripe)->coh_page_bufsize; } static int lov_init_sub(const struct lu_env *env, struct lov_object *lov, - struct cl_object *subobj, struct lov_layout_raid0 *r0, - struct lov_oinfo *oinfo, int idx) + struct cl_object *subobj, struct lov_oinfo *oinfo, + int idx) 
{ struct cl_object_header *hdr; struct cl_object_header *subhdr; @@ -132,7 +142,7 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov, return -EIO; } - hdr = cl_object_header(lov2cl(lov)); + hdr = cl_object_header(lov2cl(lov)); subhdr = cl_object_header(subobj); CDEBUG(D_INODE, DFID"@%p[%d:%d] -> "DFID"@%p: ostid: "DOSTID @@ -145,13 +155,14 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov, spin_lock(&subhdr->coh_attr_guard); parent = subhdr->coh_parent; if (parent == NULL) { + struct lovsub_object *lso = cl2lovsub(subobj); + subhdr->coh_parent = hdr; spin_unlock(&subhdr->coh_attr_guard); subhdr->coh_nesting = hdr->coh_nesting + 1; lu_object_ref_add(&subobj->co_lu, "lov-parent", lov); - r0->lo_sub[stripe] = cl2lovsub(subobj); - r0->lo_sub[stripe]->lso_super = lov; - r0->lo_sub[stripe]->lso_index = idx; + lso->lso_super = lov; + lso->lso_index = idx; result = 0; } else { struct lu_object *old_obj; @@ -181,33 +192,19 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov, return result; } -static int lov_page_slice_fixup(struct lov_object *lov, - struct cl_object *stripe) -{ - struct cl_object_header *hdr = cl_object_header(&lov->lo_cl); - struct cl_object *o; - - if (stripe == NULL) - return hdr->coh_page_bufsize - lov->lo_cl.co_slice_off - - cfs_size_round(sizeof(struct lov_page)); - - cl_object_for_each(o, stripe) - o->co_slice_off += hdr->coh_page_bufsize; - - return cl_object_header(stripe)->coh_page_bufsize; -} - static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev, - struct lov_object *lov, int index, - struct lov_layout_raid0 *r0) + struct lov_object *lov, unsigned int index, + const struct cl_object_conf *conf, + struct lov_layout_entry *lle) { - struct lov_thread_info *lti = lov_env_info(env); - struct cl_object_conf *subconf = <i->lti_stripe_conf; - struct lu_fid *ofid = <i->lti_fid; - struct cl_object *stripe; + struct lov_layout_raid0 *r0 = &lle->lle_raid0; + 
struct lov_thread_info *lti = lov_env_info(env); + struct cl_object_conf *subconf = &lti->lti_stripe_conf; + struct lu_fid *ofid = &lti->lti_fid; + struct cl_object *stripe; struct lov_stripe_md_entry *lse = lov_lse(lov, index); int result; - int psz; + int psz, sz; int i; ENTRY; @@ -255,7 +252,7 @@ static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev, if (IS_ERR(stripe)) GOTO(out, result = PTR_ERR(stripe)); - result = lov_init_sub(env, lov, stripe, r0, oinfo, + result = lov_init_sub(env, lov, stripe, oinfo, lov_comp_index(index, i)); if (result == -EAGAIN) { /* try again */ --i; @@ -264,7 +261,9 @@ static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev, } if (result == 0) { - int sz = lov_page_slice_fixup(lov, stripe); + r0->lo_sub[i] = cl2lovsub(stripe); + + sz = lov_page_slice_fixup(lov, stripe); LASSERT(ergo(psz > 0, psz == sz)); psz = sz; } @@ -275,116 +274,6 @@ out: RETURN(result); } -static int lov_init_composite(const struct lu_env *env, struct lov_device *dev, - struct lov_object *lov, struct lov_stripe_md *lsm, - const struct cl_object_conf *conf, - union lov_layout_state *state) -{ - struct lov_layout_composite *comp = &state->composite; - unsigned int entry_count; - unsigned int psz = 0; - int result = 0; - int i; - - ENTRY; - - LASSERT(lsm->lsm_entry_count > 0); - LASSERT(lov->lo_lsm == NULL); - lov->lo_lsm = lsm_addref(lsm); - lov->lo_layout_invalid = true; - - entry_count = lsm->lsm_entry_count; - comp->lo_entry_count = entry_count; - - OBD_ALLOC(comp->lo_entries, entry_count * sizeof(*comp->lo_entries)); - if (comp->lo_entries == NULL) - RETURN(-ENOMEM); - - for (i = 0; i < entry_count; i++) { - struct lov_layout_entry *le = &comp->lo_entries[i]; - - le->lle_extent = lsm->lsm_entries[i]->lsme_extent; - /** - * If the component has not been init-ed on MDS side, for - * PFL layout, we'd know that the components beyond this one - * will be dynamically init-ed later on file write/trunc ops. 
- */ - if (!lsm_entry_inited(lsm, i)) - continue; - - result = lov_init_raid0(env, dev, lov, i, &le->lle_raid0); - if (result < 0) - break; - - LASSERT(ergo(psz > 0, psz == result)); - psz = result; - } - if (psz > 0) - cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz; - - return result > 0 ? 0 : result; -} - -static int lov_init_released(const struct lu_env *env, - struct lov_device *dev, struct lov_object *lov, - struct lov_stripe_md *lsm, - const struct cl_object_conf *conf, - union lov_layout_state *state) -{ - LASSERT(lsm != NULL); - LASSERT(lsm->lsm_is_released); - LASSERT(lov->lo_lsm == NULL); - - lov->lo_lsm = lsm_addref(lsm); - return 0; -} - -static struct cl_object *lov_find_subobj(const struct lu_env *env, - struct lov_object *lov, - struct lov_stripe_md *lsm, - int index) -{ - struct lov_device *dev = lu2lov_dev(lov2lu(lov)->lo_dev); - struct lov_thread_info *lti = lov_env_info(env); - struct lu_fid *ofid = &lti->lti_fid; - struct lov_oinfo *oinfo; - struct cl_device *subdev; - int entry = lov_comp_entry(index); - int stripe = lov_comp_stripe(index); - int ost_idx; - int rc; - struct cl_object *result; - - if (lov->lo_type != LLT_COMP) - GOTO(out, result = NULL); - - if (entry >= lsm->lsm_entry_count || - stripe >= lsm->lsm_entries[entry]->lsme_stripe_count) - GOTO(out, result = NULL); - - oinfo = lsm->lsm_entries[entry]->lsme_oinfo[stripe]; - ost_idx = oinfo->loi_ost_idx; - rc = ostid_to_fid(ofid, &oinfo->loi_oi, ost_idx); - if (rc != 0) - GOTO(out, result = NULL); - - subdev = lovsub2cl_dev(dev->ld_target[ost_idx]); - result = lov_sub_find(env, subdev, ofid, NULL); -out: - if (result == NULL) - result = ERR_PTR(-EINVAL); - return result; -} - -static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov, - union lov_layout_state *state) -{ - LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED); - - lov_layout_wait(env, lov); - return 0; -} - static void lov_subobject_kill(const struct lu_env *env, struct lov_object 
*lov, struct lov_layout_raid0 *r0, struct lovsub_object *los, int idx) @@ -405,8 +294,8 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov, lu_object_ref_del(&sub->co_lu, "lov-parent", lov); cl_object_put(env, sub); - /* ... wait until it is actually destroyed---sub-object clears its - * ->lo_sub[] slot in lovsub_object_fini() */ + /* ... wait until it is actually destroyed---sub-object clears its + * ->lo_sub[] slot in lovsub_object_free() */ if (r0->lo_sub[idx] == los) { waiter = &lov_env_info(env)->lti_waiter; init_waitqueue_entry(waiter, current); @@ -432,8 +321,10 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov, } static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov, - struct lov_layout_raid0 *r0) + struct lov_layout_entry *lle) { + struct lov_layout_raid0 *r0 = &lle->lle_raid0; + ENTRY; if (r0->lo_sub != NULL) { @@ -456,6 +347,386 @@ static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov, EXIT; } +static void lov_fini_raid0(const struct lu_env *env, + struct lov_layout_entry *lle) +{ + struct lov_layout_raid0 *r0 = &lle->lle_raid0; + + if (r0->lo_sub != NULL) { + OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]); + r0->lo_sub = NULL; + } +} + +static int lov_print_raid0(const struct lu_env *env, void *cookie, + lu_printer_t p, const struct lov_layout_entry *lle) +{ + const struct lov_layout_raid0 *r0 = &lle->lle_raid0; + int i; + + for (i = 0; i < r0->lo_nr; ++i) { + struct lu_object *sub; + + if (r0->lo_sub[i] != NULL) { + sub = lovsub2lu(r0->lo_sub[i]); + lu_object_print(env, cookie, p, sub); + } else { + (*p)(env, cookie, "sub %d absent\n", i); + } + } + return 0; +} + +static int lov_attr_get_raid0(const struct lu_env *env, struct lov_object *lov, + unsigned int index, struct lov_layout_entry *lle, + struct cl_attr **lov_attr) +{ + struct lov_layout_raid0 *r0 = &lle->lle_raid0; + struct lov_stripe_md *lsm = lov->lo_lsm; + struct 
ost_lvb *lvb = &lov_env_info(env)->lti_lvb; + struct cl_attr *attr = &r0->lo_attr; + __u64 kms = 0; + int result = 0; + + if (r0->lo_attr_valid) { + *lov_attr = attr; + return 0; + } + + memset(lvb, 0, sizeof(*lvb)); + + /* XXX: timestamps can be negative by sanity:test_39m, + * how can it be? */ + lvb->lvb_atime = LLONG_MIN; + lvb->lvb_ctime = LLONG_MIN; + lvb->lvb_mtime = LLONG_MIN; + + /* + * XXX that should be replaced with a loop over sub-objects, + * doing cl_object_attr_get() on them. But for now, let's + * reuse old lov code. + */ + + /* + * XXX take lsm spin-lock to keep lov_merge_lvb_kms() + * happy. It's not needed, because new code uses + * ->coh_attr_guard spin-lock to protect consistency of + * sub-object attributes. + */ + lov_stripe_lock(lsm); + result = lov_merge_lvb_kms(lsm, index, lvb, &kms); + lov_stripe_unlock(lsm); + if (result == 0) { + cl_lvb2attr(attr, lvb); + attr->cat_kms = kms; + r0->lo_attr_valid = 1; + *lov_attr = attr; + } + + return result; +} + +static struct lov_comp_layout_entry_ops raid0_ops = { + .lco_init = lov_init_raid0, + .lco_fini = lov_fini_raid0, + .lco_getattr = lov_attr_get_raid0, +}; + +static int lov_attr_get_dom(const struct lu_env *env, struct lov_object *lov, + unsigned int index, struct lov_layout_entry *lle, + struct cl_attr **lov_attr) +{ + struct lov_layout_dom *dom = &lle->lle_dom; + struct lov_oinfo *loi = dom->lo_loi; + struct cl_attr *attr = &dom->lo_dom_r0.lo_attr; + + if (dom->lo_dom_r0.lo_attr_valid) { + *lov_attr = attr; + return 0; + } + + if (OST_LVB_IS_ERR(loi->loi_lvb.lvb_blocks)) + return OST_LVB_GET_ERR(loi->loi_lvb.lvb_blocks); + + cl_lvb2attr(attr, &loi->loi_lvb); + attr->cat_kms = attr->cat_size > loi->loi_kms ? attr->cat_size : + loi->loi_kms; + dom->lo_dom_r0.lo_attr_valid = 1; + *lov_attr = attr; + + return 0; +} + +/** + * Lookup FLD to get MDS index of the given DOM object FID. 
+ * + * \param[in] ld LOV device + * \param[in] fid FID to lookup + * \param[out] nr index in MDC array to return back + * + * \retval 0 and \a mds filled with MDS index if successful + * \retval negative value on error + */ +static int lov_fld_lookup(struct lov_device *ld, const struct lu_fid *fid, + __u32 *nr) +{ + __u32 mds_idx; + int i, rc; + + ENTRY; + + rc = fld_client_lookup(&ld->ld_lmv->u.lmv.lmv_fld, fid_seq(fid), + &mds_idx, LU_SEQ_RANGE_MDT, NULL); + if (rc) { + CERROR("%s: error while looking for mds number. Seq %#llx" + ", err = %d\n", lu_dev_name(cl2lu_dev(&ld->ld_cl)), + fid_seq(fid), rc); + RETURN(rc); + } + + CDEBUG(D_INODE, "FLD lookup got mds #%x for fid="DFID"\n", + mds_idx, PFID(fid)); + + /* find proper MDC device in the array */ + for (i = 0; i < ld->ld_md_tgts_nr; i++) { + if (ld->ld_md_tgts[i].ldm_mdc != NULL && + ld->ld_md_tgts[i].ldm_idx == mds_idx) + break; + } + + if (i == ld->ld_md_tgts_nr) { + CERROR("%s: cannot find corresponding MDC device for mds #%x " + "for fid="DFID"\n", lu_dev_name(cl2lu_dev(&ld->ld_cl)), + mds_idx, PFID(fid)); + rc = -EINVAL; + } else { + *nr = i; + } + RETURN(rc); +} + +/** + * Implementation of lov_comp_layout_entry_ops::lco_init for DOM object. + * + * Init the DOM object for the first time. It prepares also RAID0 entry + * for it to use in common methods with ordinary RAID0 layout entries. 
+ * + * \param[in] env execution environment + * \param[in] dev LOV device + * \param[in] lov LOV object + * \param[in] index Composite layout entry index in LSM + * \param[in] lle Composite LOV layout entry + */ +static int lov_init_dom(const struct lu_env *env, struct lov_device *dev, + struct lov_object *lov, unsigned int index, + const struct cl_object_conf *conf, + struct lov_layout_entry *lle) +{ + struct lov_thread_info *lti = lov_env_info(env); + struct lov_stripe_md_entry *lsme = lov_lse(lov, index); + struct cl_object *clo; + struct lu_object *o = lov2lu(lov); + const struct lu_fid *fid = lu_object_fid(o); + struct cl_device *mdcdev; + struct lov_oinfo *loi = NULL; + struct cl_object_conf *sconf = &lti->lti_stripe_conf; + struct inode *inode = conf->coc_inode; + + int rc; + __u32 idx = 0; + + ENTRY; + + LASSERT(index == 0); + + /* find proper MDS device */ + rc = lov_fld_lookup(dev, fid, &idx); + if (rc) + RETURN(rc); + + LASSERTF(dev->ld_md_tgts[idx].ldm_mdc != NULL, + "LOV md target[%u] is NULL\n", idx); + + /* check lsm is DOM, more checks are needed */ + LASSERT(lsme->lsme_stripe_count == 0); + + /* + * Create lower cl_objects. 
+ */ + mdcdev = dev->ld_md_tgts[idx].ldm_mdc; + + LASSERTF(mdcdev != NULL, "non-initialized mdc subdev\n"); + + /* DoM object has no oinfo in LSM entry, create it exclusively */ + OBD_SLAB_ALLOC_PTR_GFP(loi, lov_oinfo_slab, GFP_NOFS); + if (loi == NULL) + RETURN(-ENOMEM); + + fid_to_ostid(lu_object_fid(lov2lu(lov)), &loi->loi_oi); + /* Initialize lvb structure */ + loi->loi_lvb.lvb_mtime = LTIME_S(inode->i_mtime); + loi->loi_lvb.lvb_atime = LTIME_S(inode->i_atime); + loi->loi_lvb.lvb_ctime = LTIME_S(inode->i_ctime); + loi->loi_lvb.lvb_blocks = inode->i_blocks; + loi->loi_lvb.lvb_size = i_size_read(inode); + if (loi->loi_lvb.lvb_size > lsme->lsme_stripe_size) + loi->loi_lvb.lvb_size = lsme->lsme_stripe_size; + loi_kms_set(loi, loi->loi_lvb.lvb_size); + + sconf->u.coc_oinfo = loi; +again: + clo = lov_sub_find(env, mdcdev, fid, sconf); + if (IS_ERR(clo)) + GOTO(out, rc = PTR_ERR(clo)); + + rc = lov_init_sub(env, lov, clo, loi, lov_comp_index(index, 0)); + if (rc == -EAGAIN) /* try again */ + goto again; + else if (rc != 0) + GOTO(out, rc); + + lle->lle_dom.lo_dom = cl2lovsub(clo); + spin_lock_init(&lle->lle_dom.lo_dom_r0.lo_sub_lock); + lle->lle_dom.lo_dom_r0.lo_nr = 1; + lle->lle_dom.lo_dom_r0.lo_sub = &lle->lle_dom.lo_dom; + lle->lle_dom.lo_loi = loi; + + rc = lov_page_slice_fixup(lov, clo); + RETURN(rc); + +out: + if (loi != NULL) + OBD_SLAB_FREE_PTR(loi, lov_oinfo_slab); + return rc; +} + +/** + * Implementation of lov_layout_operations::llo_fini for DOM object. + * + * Finish the DOM object and free related memory. 
+ * + * \param[in] env execution environment + * \param[in] lov LOV object + * \param[in] state LOV layout state + */ +static void lov_fini_dom(const struct lu_env *env, + struct lov_layout_entry *lle) +{ + if (lle->lle_dom.lo_dom != NULL) + lle->lle_dom.lo_dom = NULL; + if (lle->lle_dom.lo_loi != NULL) + OBD_SLAB_FREE_PTR(lle->lle_dom.lo_loi, lov_oinfo_slab); +} + +static struct lov_comp_layout_entry_ops dom_ops = { + .lco_init = lov_init_dom, + .lco_fini = lov_fini_dom, + .lco_getattr = lov_attr_get_dom, +}; + +static int lov_init_composite(const struct lu_env *env, struct lov_device *dev, + struct lov_object *lov, struct lov_stripe_md *lsm, + const struct cl_object_conf *conf, + union lov_layout_state *state) +{ + struct lov_layout_composite *comp = &state->composite; + struct lov_layout_entry *lle; + unsigned int entry_count; + unsigned int psz = 0; + int result = 0; + int i; + + ENTRY; + + LASSERT(lsm->lsm_entry_count > 0); + LASSERT(lov->lo_lsm == NULL); + lov->lo_lsm = lsm_addref(lsm); + lov->lo_layout_invalid = true; + + entry_count = lsm->lsm_entry_count; + comp->lo_entry_count = entry_count; + + OBD_ALLOC(comp->lo_entries, entry_count * sizeof(*comp->lo_entries)); + if (comp->lo_entries == NULL) + RETURN(-ENOMEM); + + /* Initiate all entry types and extents data at first */ + for (i = 0; i < entry_count; i++) { + lle = &comp->lo_entries[i]; + + lle->lle_type = lov_entry_type(lsm->lsm_entries[i]); + switch (lle->lle_type) { + case LOV_PATTERN_RAID0: + lle->lle_comp_ops = &raid0_ops; + break; + case LOV_PATTERN_MDT: + lle->lle_comp_ops = &dom_ops; + break; + default: + CERROR("%s: unknown composite layout entry type %i\n", + lov2obd(dev->ld_lov)->obd_name, + lsm->lsm_entries[i]->lsme_pattern); + dump_lsm(D_ERROR, lsm); + RETURN(-EIO); + } + lle->lle_extent = lsm->lsm_entries[i]->lsme_extent; + } + + i = 0; + lov_foreach_layout_entry(lov, lle) { + /** + * If the component has not been init-ed on MDS side, for + * PFL layout, we'd know that the components 
beyond this one + * will be dynamically init-ed later on file write/trunc ops. + */ + if (lsm_entry_inited(lsm, i)) { + result = lle->lle_comp_ops->lco_init(env, dev, lov, i, + conf, lle); + if (result < 0) + break; + + LASSERT(ergo(psz > 0, psz == result)); + psz = result; + } + i++; + } + if (psz > 0) + cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz; + + return result > 0 ? 0 : result; +} + +static int lov_init_empty(const struct lu_env *env, struct lov_device *dev, + struct lov_object *lov, struct lov_stripe_md *lsm, + const struct cl_object_conf *conf, + union lov_layout_state *state) +{ + return 0; +} + +static int lov_init_released(const struct lu_env *env, + struct lov_device *dev, struct lov_object *lov, + struct lov_stripe_md *lsm, + const struct cl_object_conf *conf, + union lov_layout_state *state) +{ + LASSERT(lsm != NULL); + LASSERT(lsm->lsm_is_released); + LASSERT(lov->lo_lsm == NULL); + + lov->lo_lsm = lsm_addref(lsm); + return 0; +} + +static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov, + union lov_layout_state *state) +{ + LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED); + + lov_layout_wait(env, lov); + return 0; +} + static int lov_delete_composite(const struct lu_env *env, struct lov_object *lov, union lov_layout_state *state) @@ -470,7 +741,7 @@ static int lov_delete_composite(const struct lu_env *env, lov_layout_wait(env, lov); if (comp->lo_entries) lov_foreach_layout_entry(lov, entry) - lov_delete_raid0(env, lov, &entry->lle_raid0); + lov_delete_raid0(env, lov, entry); RETURN(0); } @@ -481,15 +752,6 @@ static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov, LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED); } -static void lov_fini_raid0(const struct lu_env *env, - struct lov_layout_raid0 *r0) -{ - if (r0->lo_sub != NULL) { - OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]); - r0->lo_sub = NULL; - } -} - static void lov_fini_composite(const 
struct lu_env *env, struct lov_object *lov, union lov_layout_state *state) @@ -501,7 +763,7 @@ static void lov_fini_composite(const struct lu_env *env, struct lov_layout_entry *entry; lov_foreach_layout_entry(lov, entry) - lov_fini_raid0(env, &entry->lle_raid0); + entry->lle_comp_ops->lco_fini(env, entry); OBD_FREE(comp->lo_entries, comp->lo_entry_count * sizeof(*comp->lo_entries)); @@ -530,24 +792,6 @@ static int lov_print_empty(const struct lu_env *env, void *cookie, return 0; } -static int lov_print_raid0(const struct lu_env *env, void *cookie, - lu_printer_t p, struct lov_layout_raid0 *r0) -{ - int i; - - for (i = 0; i < r0->lo_nr; ++i) { - struct lu_object *sub; - - if (r0->lo_sub[i] != NULL) { - sub = lovsub2lu(r0->lo_sub[i]); - lu_object_print(env, cookie, p, sub); - } else { - (*p)(env, cookie, "sub %d absent\n", i); - } - } - return 0; -} - static int lov_print_composite(const struct lu_env *env, void *cookie, lu_printer_t p, const struct lu_object *o) { @@ -563,12 +807,15 @@ static int lov_print_composite(const struct lu_env *env, void *cookie, for (i = 0; i < lsm->lsm_entry_count; i++) { struct lov_stripe_md_entry *lse = lsm->lsm_entries[i]; + struct lov_layout_entry *lle = lov_entry(lov, i); - (*p)(env, cookie, DEXT ": { 0x%08X, %u, %u, %#x, %u, %u }\n", + (*p)(env, cookie, + DEXT ": { 0x%08X, %u, %#x, %u, %#x, %u, %u }\n", PEXT(&lse->lsme_extent), lse->lsme_magic, - lse->lsme_id, lse->lsme_layout_gen, lse->lsme_flags, - lse->lsme_stripe_count, lse->lsme_stripe_size); - lov_print_raid0(env, cookie, p, lov_r0(lov, i)); + lse->lsme_id, lse->lsme_pattern, lse->lsme_layout_gen, + lse->lsme_flags, lse->lsme_stripe_count, + lse->lsme_stripe_size); + lov_print_raid0(env, cookie, p, lle); } return 0; @@ -602,51 +849,6 @@ static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj, return 0; } -static int lov_attr_get_raid0(const struct lu_env *env, struct lov_object *lov, - unsigned int index, struct lov_layout_raid0 *r0) - -{ - struct 
lov_stripe_md *lsm = lov->lo_lsm; - struct ost_lvb *lvb = &lov_env_info(env)->lti_lvb; - struct cl_attr *attr = &r0->lo_attr; - __u64 kms = 0; - int result = 0; - - if (r0->lo_attr_valid) - return 0; - - memset(lvb, 0, sizeof(*lvb)); - - /* XXX: timestamps can be negative by sanity:test_39m, - * how can it be? */ - lvb->lvb_atime = LLONG_MIN; - lvb->lvb_ctime = LLONG_MIN; - lvb->lvb_mtime = LLONG_MIN; - - /* - * XXX that should be replaced with a loop over sub-objects, - * doing cl_object_attr_get() on them. But for now, let's - * reuse old lov code. - */ - - /* - * XXX take lsm spin-lock to keep lov_merge_lvb_kms() - * happy. It's not needed, because new code uses - * ->coh_attr_guard spin-lock to protect consistency of - * sub-object attributes. - */ - lov_stripe_lock(lsm); - result = lov_merge_lvb_kms(lsm, index, lvb, &kms); - lov_stripe_unlock(lsm); - if (result == 0) { - cl_lvb2attr(attr, lvb); - attr->cat_kms = kms; - r0->lo_attr_valid = 1; - } - - return result; -} - static int lov_attr_get_composite(const struct lu_env *env, struct cl_object *obj, struct cl_attr *attr) @@ -661,19 +863,22 @@ static int lov_attr_get_composite(const struct lu_env *env, attr->cat_size = 0; attr->cat_blocks = 0; lov_foreach_layout_entry(lov, entry) { - struct lov_layout_raid0 *r0 = &entry->lle_raid0; - struct cl_attr *lov_attr = &r0->lo_attr; + struct cl_attr *lov_attr = NULL; /* PFL: This component has not been init-ed. 
*/ if (!lsm_entry_inited(lov->lo_lsm, index)) break; - result = lov_attr_get_raid0(env, lov, index, r0); - if (result != 0) - break; + result = entry->lle_comp_ops->lco_getattr(env, lov, index, + entry, &lov_attr); + if (result < 0) + RETURN(result); index++; + if (lov_attr == NULL) + continue; + /* merge results */ attr->cat_blocks += lov_attr->cat_blocks; if (attr->cat_size < lov_attr->cat_size) @@ -687,28 +892,28 @@ static int lov_attr_get_composite(const struct lu_env *env, if (attr->cat_mtime < lov_attr->cat_mtime) attr->cat_mtime = lov_attr->cat_mtime; } - RETURN(result); + RETURN(0); } const static struct lov_layout_operations lov_dispatch[] = { - [LLT_EMPTY] = { - .llo_init = lov_init_empty, - .llo_delete = lov_delete_empty, - .llo_fini = lov_fini_empty, - .llo_print = lov_print_empty, - .llo_page_init = lov_page_init_empty, - .llo_lock_init = lov_lock_init_empty, - .llo_io_init = lov_io_init_empty, + [LLT_EMPTY] = { + .llo_init = lov_init_empty, + .llo_delete = lov_delete_empty, + .llo_fini = lov_fini_empty, + .llo_print = lov_print_empty, + .llo_page_init = lov_page_init_empty, + .llo_lock_init = lov_lock_init_empty, + .llo_io_init = lov_io_init_empty, .llo_getattr = lov_attr_get_empty, - }, - [LLT_RELEASED] = { - .llo_init = lov_init_released, - .llo_delete = lov_delete_empty, - .llo_fini = lov_fini_released, - .llo_print = lov_print_released, - .llo_page_init = lov_page_init_empty, - .llo_lock_init = lov_lock_init_empty, - .llo_io_init = lov_io_init_released, + }, + [LLT_RELEASED] = { + .llo_init = lov_init_released, + .llo_delete = lov_delete_empty, + .llo_fini = lov_fini_released, + .llo_print = lov_print_released, + .llo_page_init = lov_page_init_empty, + .llo_lock_init = lov_lock_init_empty, + .llo_io_init = lov_io_init_released, .llo_getattr = lov_attr_get_empty, }, [LLT_COMP] = { @@ -1253,6 +1458,43 @@ struct fiemap_state { bool fs_enough; }; +static struct cl_object *lov_find_subobj(const struct lu_env *env, + struct lov_object *lov, + struct 
lov_stripe_md *lsm, + int index) +{ + struct lov_device *dev = lu2lov_dev(lov2lu(lov)->lo_dev); + struct lov_thread_info *lti = lov_env_info(env); + struct lu_fid *ofid = &lti->lti_fid; + struct lov_oinfo *oinfo; + struct cl_device *subdev; + int entry = lov_comp_entry(index); + int stripe = lov_comp_stripe(index); + int ost_idx; + int rc; + struct cl_object *result; + + if (lov->lo_type != LLT_COMP) + GOTO(out, result = NULL); + + if (entry >= lsm->lsm_entry_count || + stripe >= lsm->lsm_entries[entry]->lsme_stripe_count) + GOTO(out, result = NULL); + + oinfo = lsm->lsm_entries[entry]->lsme_oinfo[stripe]; + ost_idx = oinfo->loi_ost_idx; + rc = ostid_to_fid(ofid, &oinfo->loi_oi, ost_idx); + if (rc != 0) + GOTO(out, result = NULL); + + subdev = lovsub2cl_dev(dev->ld_target[ost_idx]); + result = lov_sub_find(env, subdev, ofid, NULL); +out: + if (result == NULL) + result = ERR_PTR(-EINVAL); + return result; +} + int fiemap_for_stripe(const struct lu_env *env, struct cl_object *obj, struct lov_stripe_md *lsm, struct fiemap *fiemap, size_t *buflen, struct ll_fiemap_info_key *fmkey, @@ -1466,6 +1708,10 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj, GOTO(out_lsm, rc = -ENOTSUPP); } + /* No support for DOM layout yet. 
*/ + if (lsme_is_dom(lsm->lsm_entries[0])) + GOTO(out_lsm, rc = -ENOTSUPP); + if (lsm->lsm_is_released) { if (fiemap->fm_start < fmkey->lfik_oa.o_size) { /** diff --git a/lustre/lov/lov_offset.c b/lustre/lov/lov_offset.c index 3ff0a38..c5ba4eb 100644 --- a/lustre/lov/lov_offset.c +++ b/lustre/lov/lov_offset.c @@ -44,6 +44,9 @@ static loff_t stripe_width(struct lov_stripe_md *lsm, unsigned int index) LASSERT(index < lsm->lsm_entry_count); + if (lsme_is_dom(entry)) + return (loff_t)entry->lsme_stripe_size; + return (loff_t)entry->lsme_stripe_size * entry->lsme_stripe_count; } @@ -141,12 +144,12 @@ int lov_stripe_offset(struct lov_stripe_md *lsm, int index, loff_t lov_off, loff_t stripe_off; loff_t this_stripe; loff_t swidth; - int ret = 0; + int ret = 0; - if (lov_off == OBD_OBJECT_EOF) { - *obdoff = OBD_OBJECT_EOF; - return 0; - } + if (lov_off == OBD_OBJECT_EOF) { + *obdoff = OBD_OBJECT_EOF; + return 0; + } swidth = stripe_width(lsm, index); @@ -197,8 +200,8 @@ loff_t lov_size_to_stripe(struct lov_stripe_md *lsm, int index, u64 file_size, loff_t this_stripe; loff_t swidth; - if (file_size == OBD_OBJECT_EOF) - return OBD_OBJECT_EOF; + if (file_size == OBD_OBJECT_EOF) + return OBD_OBJECT_EOF; swidth = stripe_width(lsm, index); diff --git a/lustre/mdc/mdc_request.c b/lustre/mdc/mdc_request.c index 793ebc6..718d6ff 100644 --- a/lustre/mdc/mdc_request.c +++ b/lustre/mdc/mdc_request.c @@ -2264,7 +2264,12 @@ static int mdc_set_info_async(const struct lu_env *env, RETURN(0); } - CERROR("Unknown key %s\n", (char *)key); + /* TODO: these OSC-related keys are ignored for now */ + if (KEY_IS(KEY_CHECKSUM) || KEY_IS(KEY_CACHE_SET) || + KEY_IS(KEY_CACHE_LRU_SHRINK) || KEY_IS(KEY_GRANT_SHRINK)) + RETURN(0); + + CERROR("%s: Unknown key %s\n", exp->exp_obd->obd_name, (char *)key); RETURN(-EINVAL); } diff --git a/lustre/mgs/mgs_llog.c b/lustre/mgs/mgs_llog.c index a583d6d..135e78b 100644 --- a/lustre/mgs/mgs_llog.c +++ b/lustre/mgs/mgs_llog.c @@ -1910,7 +1910,8 @@ static int 
mgs_steal_client_llog_handler(const struct lu_env *env, if (lcfg->lcfg_command == LCFG_SPTLRPC_CONF) RETURN(rc); - if (lcfg->lcfg_command == LCFG_ADD_MDC) { + if (lcfg->lcfg_command == LCFG_ADD_MDC && + strstr(lustre_cfg_string(lcfg, 0), "-clilmv") != NULL) { int index; if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1) @@ -2546,17 +2547,14 @@ static int mgs_write_log_mdt(const struct lu_env *env, if (rc) GOTO(out_free, rc); - rc = record_marker(env, llh, fsdb, CM_START, cliname, - "mount opts"); + rc = record_marker(env, llh, fsdb, CM_START, cliname, "mount opts"); if (rc) GOTO(out_end, rc); rc = record_mount_opt(env, llh, cliname, fsdb->fsdb_clilov, fsdb->fsdb_clilmv); if (rc) GOTO(out_end, rc); - rc = record_marker(env, llh, fsdb, CM_END, cliname, - "mount opts"); - + rc = record_marker(env, llh, fsdb, CM_END, cliname, "mount opts"); if (rc) GOTO(out_end, rc); diff --git a/lustre/obdclass/obd_config.c b/lustre/obdclass/obd_config.c index bcb9375..84987d3 100644 --- a/lustre/obdclass/obd_config.c +++ b/lustre/obdclass/obd_config.c @@ -1295,7 +1295,6 @@ int class_process_config(struct lustre_cfg *lcfg) GOTO(out, err = -EINVAL); } - switch(lcfg->lcfg_command) { case LCFG_SETUP: { err = class_setup(obd, lcfg); @@ -1335,12 +1334,47 @@ int class_process_config(struct lustre_cfg *lcfg) err = obd_pool_del(obd, lustre_cfg_string(lcfg, 2)); GOTO(out, err = 0); } - default: { - err = obd_process_config(obd, sizeof(*lcfg), lcfg); - GOTO(out, err); + /* Process config log ADD_MDC record twice to add MDC also to LOV + * for Data-on-MDT: + * + * add 0:lustre-clilmv 1:lustre-MDT0000_UUID 2:0 3:1 + * 4:lustre-MDT0000-mdc_UUID + */ + case LCFG_ADD_MDC: { + struct obd_device *lov_obd; + char *clilmv; + + err = obd_process_config(obd, sizeof(*lcfg), lcfg); + if (err) + GOTO(out, err); + + /* make sure this is client LMV log entry */ + clilmv = strstr(lustre_cfg_string(lcfg, 0), "clilmv"); + if (!clilmv) + GOTO(out, err); + + /* replace 'lmv' with 'lov' name to address LOV 
device and + * process llog record to add MDC there. */ + clilmv[4] = 'o'; + lov_obd = class_name2obd(lustre_cfg_string(lcfg, 0)); + if (lov_obd == NULL) { + err = -ENOENT; + CERROR("%s: Cannot find LOV by %s name, rc = %d\n", + obd->obd_name, lustre_cfg_string(lcfg, 0), err); + } else { + err = obd_process_config(lov_obd, sizeof(*lcfg), lcfg); + } + /* restore 'lmv' name */ + clilmv[4] = 'm'; + GOTO(out, err); + } + default: { + err = obd_process_config(obd, sizeof(*lcfg), lcfg); + GOTO(out, err); } } + EXIT; out: if ((err < 0) && !(lcfg->lcfg_command & LCFG_REQUIRED)) { CWARN("Ignoring error %d on optional command %#x\n", err, diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index 1eb0078..f5bd241a 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -1672,8 +1672,8 @@ void lustre_assert_wire_constants(void) (unsigned)LOV_PATTERN_RAID0); LASSERTF(LOV_PATTERN_RAID1 == 0x00000002UL, "found 0x%.8xUL\n", (unsigned)LOV_PATTERN_RAID1); - LASSERTF(LOV_PATTERN_FIRST == 0x00000100UL, "found 0x%.8xUL\n", - (unsigned)LOV_PATTERN_FIRST); + LASSERTF(LOV_PATTERN_MDT == 0x00000100UL, "found 0x%.8xUL\n", + (unsigned)LOV_PATTERN_MDT); LASSERTF(LOV_PATTERN_CMOBD == 0x00000200UL, "found 0x%.8xUL\n", (unsigned)LOV_PATTERN_CMOBD); diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index dc3e115..2abe262 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -765,7 +765,7 @@ check_lov_mds_md_v3(void) CHECK_VALUE_X(LOV_PATTERN_RAID0); CHECK_VALUE_X(LOV_PATTERN_RAID1); - CHECK_VALUE_X(LOV_PATTERN_FIRST); + CHECK_VALUE_X(LOV_PATTERN_MDT); CHECK_VALUE_X(LOV_PATTERN_CMOBD); } diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index 3851c24..482d3d3 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -1690,8 +1690,8 @@ void lustre_assert_wire_constants(void) (unsigned)LOV_PATTERN_RAID0); LASSERTF(LOV_PATTERN_RAID1 == 0x00000002UL, "found 0x%.8xUL\n", (unsigned)LOV_PATTERN_RAID1); - 
LASSERTF(LOV_PATTERN_FIRST == 0x00000100UL, "found 0x%.8xUL\n", - (unsigned)LOV_PATTERN_FIRST); + LASSERTF(LOV_PATTERN_MDT == 0x00000100UL, "found 0x%.8xUL\n", + (unsigned)LOV_PATTERN_MDT); LASSERTF(LOV_PATTERN_CMOBD == 0x00000200UL, "found 0x%.8xUL\n", (unsigned)LOV_PATTERN_CMOBD);