X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flod%2Flod_lov.c;h=c044bd140ae45d69bd3aabfe2669bea8bb4f18c2;hp=663cbb2fb9da01c41abf02a03ae2a3971de16854;hb=f843facff59226d3788d855d1d6948523ab8d944;hpb=6744eb8eeb9e0a7a745a9a42e5fe09b376e16a82

diff --git a/lustre/lod/lod_lov.c b/lustre/lod/lod_lov.c
index 663cbb2..c044bd1 100644
--- a/lustre/lod/lod_lov.c
+++ b/lustre/lod/lod_lov.c
@@ -23,7 +23,7 @@
  * Copyright 2009 Sun Microsystems, Inc. All rights reserved
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2016, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  */
 /*
  * lustre/lod/lod_lov.c
@@ -111,7 +111,7 @@ void lod_putref(struct lod_device *lod, struct lod_tgt_descs *ltd)
 			list_del(&tgt_desc->ltd_kill);
 			if (ltd == &lod->lod_ost_descs) {
 				/* remove from QoS structures */
-				rc = qos_del_tgt(lod, tgt_desc);
+				rc = lqos_del_tgt(&lod->lod_qos, tgt_desc);
 				if (rc)
 					CERROR("%s: qos_del_tgt(%s) failed:"
 					       "rc = %d\n",
@@ -220,6 +220,7 @@ int lod_add_device(const struct lu_env *env, struct lod_device *lod,
 	struct obd_uuid obd_uuid;
 	bool for_ost;
 	bool lock = false;
+	bool connected = false;
 	ENTRY;
 
 	CDEBUG(D_CONFIG, "osp:%s idx:%d gen:%d\n", osp, index, gen);
@@ -302,11 +303,12 @@ int lod_add_device(const struct lu_env *env, struct lod_device *lod,
 			obd->obd_name, osp, rc);
 		GOTO(out_cleanup, rc);
 	}
+	connected = true;
 
 	/* Allocate ost descriptor and fill it */
 	OBD_ALLOC_PTR(tgt_desc);
 	if (!tgt_desc)
-		GOTO(out_conn, rc = -ENOMEM);
+		GOTO(out_cleanup, rc = -ENOMEM);
 
 	tgt_desc->ltd_tgt = dt_dev;
 	tgt_desc->ltd_exp = exp;
@@ -362,7 +364,7 @@ int lod_add_device(const struct lu_env *env, struct lod_device *lod,
 		GOTO(out_mutex, rc);
 	}
 
-	rc = qos_add_tgt(lod, tgt_desc);
+	rc = lqos_add_tgt(&lod->lod_qos, tgt_desc);
 	if (rc) {
 		CERROR("%s: qos_add_tgt failed with %d\n",
 			obd->obd_name, rc);
@@ -426,8 +428,6 @@ out_mutex:
 	}
 out_desc:
 	OBD_FREE_PTR(tgt_desc);
-out_conn:
-	obd_disconnect(exp);
 out_cleanup:
 	/* XXX OSP needs us to send down LCFG_CLEANUP because it uses
 	 * objects from the MDT stack. See LU-7184. */
@@ -437,6 +437,9 @@ out_cleanup:
 	lcfg->lcfg_command = LCFG_CLEANUP;
 	lu_dev->ld_ops->ldo_process_config(env, lu_dev, lcfg);
 
+	if (connected)
+		obd_disconnect(exp);
+
 	return rc;
 }
 
@@ -597,8 +600,6 @@ int lod_ea_store_resize(struct lod_thread_info *info, size_t size)
 {
 	__u32 round = size_roundup_power2(size);
 
-	LASSERT(round <=
-		lov_mds_md_size(LOV_MAX_STRIPE_COUNT, LOV_MAGIC_V3));
 	if (info->lti_ea_store) {
 		LASSERT(info->lti_ea_store_size);
 		LASSERT(info->lti_ea_store_size < round);
@@ -694,6 +695,12 @@ int lod_def_striping_comp_resize(struct lod_default_striping *lds, __u16 count)
 
 void lod_free_comp_entries(struct lod_object *lo)
 {
+	if (lo->ldo_mirrors) {
+		OBD_FREE(lo->ldo_mirrors,
+			 sizeof(*lo->ldo_mirrors) * lo->ldo_mirror_count);
+		lo->ldo_mirrors = NULL;
+		lo->ldo_mirror_count = 0;
+	}
 	lod_free_comp_buffer(lo->ldo_comp_entries,
 			     lo->ldo_comp_cnt,
 			     sizeof(*lo->ldo_comp_entries) * lo->ldo_comp_cnt);
@@ -702,19 +709,78 @@ void lod_free_comp_entries(struct lod_object *lo)
 	lo->ldo_is_composite = 0;
 }
 
-int lod_alloc_comp_entries(struct lod_object *lo, int cnt)
+int lod_alloc_comp_entries(struct lod_object *lo,
+			   int mirror_count, int comp_count)
 {
-	LASSERT(cnt != 0);
+	LASSERT(comp_count != 0);
 	LASSERT(lo->ldo_comp_cnt == 0 && lo->ldo_comp_entries == NULL);
 
+	if (mirror_count > 0) {
+		OBD_ALLOC(lo->ldo_mirrors,
+			  sizeof(*lo->ldo_mirrors) * mirror_count);
+		if (!lo->ldo_mirrors)
+			return -ENOMEM;
+
+		lo->ldo_mirror_count = mirror_count;
+	}
+
 	OBD_ALLOC_LARGE(lo->ldo_comp_entries,
-			sizeof(*lo->ldo_comp_entries) * cnt);
-	if (lo->ldo_comp_entries == NULL)
+			sizeof(*lo->ldo_comp_entries) * comp_count);
+	if (lo->ldo_comp_entries == NULL) {
+		OBD_FREE(lo->ldo_mirrors,
+			 sizeof(*lo->ldo_mirrors) * mirror_count);
+		lo->ldo_mirror_count = 0;
 		return -ENOMEM;
-	lo->ldo_comp_cnt = cnt;
+	}
+
+	lo->ldo_comp_cnt = comp_count;
 	return 0;
 }
 
+int lod_fill_mirrors(struct lod_object *lo)
+{
+	struct lod_layout_component *lod_comp;
+	int mirror_idx = -1;
+	__u16 mirror_id = 0xffff;
+	int i;
+	ENTRY;
+
+	LASSERT(equi(!lo->ldo_is_composite, lo->ldo_mirror_count == 0));
+
+	if (!lo->ldo_is_composite)
+		RETURN(0);
+
+	lod_comp = &lo->ldo_comp_entries[0];
+	for (i = 0; i < lo->ldo_comp_cnt; i++, lod_comp++) {
+		int stale = !!(lod_comp->llc_flags & LCME_FL_STALE);
+		int preferred = !!(lod_comp->llc_flags & LCME_FL_PREF_WR);
+
+		if (mirror_id_of(lod_comp->llc_id) == mirror_id) {
+			lo->ldo_mirrors[mirror_idx].lme_stale |= stale;
+			lo->ldo_mirrors[mirror_idx].lme_primary |= preferred;
+			lo->ldo_mirrors[mirror_idx].lme_end = i;
+			continue;
+		}
+
+		/* new mirror */
+		++mirror_idx;
+		if (mirror_idx >= lo->ldo_mirror_count)
+			RETURN(-EINVAL);
+
+		mirror_id = mirror_id_of(lod_comp->llc_id);
+
+		lo->ldo_mirrors[mirror_idx].lme_id = mirror_id;
+		lo->ldo_mirrors[mirror_idx].lme_stale = stale;
+		lo->ldo_mirrors[mirror_idx].lme_primary = preferred;
+		lo->ldo_mirrors[mirror_idx].lme_start = i;
+		lo->ldo_mirrors[mirror_idx].lme_end = i;
+	}
+	if (mirror_idx != lo->ldo_mirror_count - 1)
+		RETURN(-EINVAL);
+
+	RETURN(0);
+}
+
 /**
  * Generate on-disk lov_mds_md structure for each layout component based on
  * the information in lod_object->ldo_comp_entries[i].
@@ -789,6 +855,11 @@ static int lod_gen_component_ea(const struct lu_env *env,
 		objs = &v3->lmm_objects[0];
 	}
 	stripe_count = lod_comp_entry_stripe_count(lo, lod_comp, is_dir);
+	if (stripe_count == 0 && !is_dir &&
+	    !(lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED) &&
+	    !(lod_comp->llc_pattern & LOV_PATTERN_MDT))
+		RETURN(-E2BIG);
+
 	if (!is_dir && lo->ldo_is_composite)
 		lod_comp_shrink_stripe_count(lod_comp, &stripe_count);
 
@@ -831,7 +902,8 @@ static int lod_gen_component_ea(const struct lu_env *env,
 				       PFID(&info->lti_fid), rc);
 				RETURN(rc);
 			}
-		} else if (lod_comp->llc_ostlist.op_array) {
+		} else if (lod_comp->llc_ostlist.op_array &&
+			   lod_comp->llc_ostlist.op_count) {
 			/* user specified ost list */
 			ost_idx = lod_comp->llc_ostlist.op_array[i];
 		}
@@ -848,52 +920,6 @@ done:
 }
 
 /**
- * Generate component ID for new created component.
- *
- * \param[in] lo		LOD object
- * \param[in] comp_idx	index of ldo_comp_entries
- *
- * \retval component ID on success
- * \retval LCME_ID_INVAL on failure
- */
-static __u32 lod_gen_component_id(struct lod_object *lo, int comp_idx)
-{
-	struct lod_layout_component *lod_comp;
-	__u32 id, start, end;
-	int i;
-
-	LASSERT(lo->ldo_comp_entries[comp_idx].llc_id == LCME_ID_INVAL);
-
-	lod_obj_inc_layout_gen(lo);
-	id = lo->ldo_layout_gen;
-	if (likely(id <= LCME_ID_MAX))
-		return id;
-
-	/* Layout generation wraps, need to check collisions. */
-	start = id & LCME_ID_MASK;
-	end = LCME_ID_MAX;
-again:
-	for (id = start; id <= end; id++) {
-		for (i = 0; i < lo->ldo_comp_cnt; i++) {
-			lod_comp = &lo->ldo_comp_entries[i];
-			if (id == lod_comp->llc_id)
-				break;
-		}
-		/* Found the ununsed ID */
-		if (i == lo->ldo_comp_cnt)
-			return id;
-	}
-	if (end == LCME_ID_MAX) {
-		start = 1;
-		end = min(lo->ldo_layout_gen & LCME_ID_MASK,
-			  (__u32)(LCME_ID_MAX - 1));
-		goto again;
-	}
-
-	return LCME_ID_INVAL;
-}
-
-/**
  * Generate on-disk lov_mds_md structure based on the information in
  * the lod_object->ldo_comp_entries.
  *
@@ -916,23 +942,43 @@ int lod_generate_lovea(const struct lu_env *env, struct lod_object *lo,
 	struct lov_comp_md_entry_v1 *lcme;
 	struct lov_comp_md_v1 *lcm;
 	struct lod_layout_component *comp_entries;
-	__u16 comp_cnt;
-	bool is_composite;
+	__u16 comp_cnt, mirror_cnt;
+	bool is_composite, is_foreign = false;
 	int i, rc = 0, offset;
 	ENTRY;
 
 	if (is_dir) {
 		comp_cnt = lo->ldo_def_striping->lds_def_comp_cnt;
+		mirror_cnt = lo->ldo_def_striping->lds_def_mirror_cnt;
 		comp_entries = lo->ldo_def_striping->lds_def_comp_entries;
 		is_composite =
 			lo->ldo_def_striping->lds_def_striping_is_composite;
 	} else {
 		comp_cnt = lo->ldo_comp_cnt;
+		mirror_cnt = lo->ldo_mirror_count;
 		comp_entries = lo->ldo_comp_entries;
 		is_composite = lo->ldo_is_composite;
+		is_foreign = lo->ldo_is_foreign;
 	}
 
 	LASSERT(lmm_size != NULL);
+
+	if (is_foreign) {
+		struct lov_foreign_md *lfm;
+
+		lfm = (struct lov_foreign_md *)lmm;
+		memcpy(lfm, lo->ldo_foreign_lov, lo->ldo_foreign_lov_size);
+		/* need to store little-endian */
+		if (cpu_to_le32(LOV_MAGIC_FOREIGN) != LOV_MAGIC_FOREIGN) {
+			__swab32s(&lfm->lfm_magic);
+			__swab32s(&lfm->lfm_length);
+			__swab32s(&lfm->lfm_type);
+			__swab32s(&lfm->lfm_flags);
+		}
+		*lmm_size = lo->ldo_foreign_lov_size;
+		RETURN(0);
+	}
+
 	LASSERT(comp_cnt != 0 && comp_entries != NULL);
 
 	if (!is_composite) {
@@ -941,8 +987,12 @@ int lod_generate_lovea(const struct lu_env *env, struct lod_object *lo,
 	}
 
 	lcm = (struct lov_comp_md_v1 *)lmm;
+	memset(lcm, 0, sizeof(*lcm));
+
 	lcm->lcm_magic = cpu_to_le32(LOV_MAGIC_COMP_V1);
 	lcm->lcm_entry_count = cpu_to_le16(comp_cnt);
+	lcm->lcm_mirror_count = cpu_to_le16(mirror_cnt - 1);
+	lcm->lcm_flags = cpu_to_le16(lo->ldo_flr_state);
 
 	offset = sizeof(*lcm) + sizeof(*lcme) * comp_cnt;
 	LASSERT(offset % sizeof(__u64) == 0);
@@ -955,15 +1005,17 @@ int lod_generate_lovea(const struct lu_env *env, struct lod_object *lo,
 		lod_comp = &comp_entries[i];
 		lcme = &lcm->lcm_entries[i];
 
-		if (lod_comp->llc_id == LCME_ID_INVAL && !is_dir) {
-			lod_comp->llc_id = lod_gen_component_id(lo, i);
-			if (lod_comp->llc_id == LCME_ID_INVAL)
-				GOTO(out, rc = -ERANGE);
-		}
+		LASSERT(ergo(!is_dir, lod_comp->llc_id != LCME_ID_INVAL));
 		lcme->lcme_id = cpu_to_le32(lod_comp->llc_id);
 
 		/* component could be un-inistantiated */
 		lcme->lcme_flags = cpu_to_le32(lod_comp->llc_flags);
+		if (lod_comp->llc_flags & LCME_FL_NOSYNC)
+			lcme->lcme_timestamp =
+				cpu_to_le64(lod_comp->llc_timestamp);
+		if (lod_comp->llc_flags & LCME_FL_EXTENSION && !is_dir)
+			lcm->lcm_magic = cpu_to_le32(LOV_MAGIC_SEL);
+
 		lcme->lcme_extent.e_start =
 			cpu_to_le64(lod_comp->llc_extent.e_start);
 		lcme->lcme_extent.e_end =
@@ -1100,15 +1152,16 @@ static int validate_lod_and_idx(struct lod_device *md, __u32 idx)
 int lod_initialize_objects(const struct lu_env *env, struct lod_object *lo,
 			   struct lov_ost_data_v1 *objs, int comp_idx)
 {
-	struct lod_layout_component *lod_comp;
-	struct lod_thread_info *info = lod_env_info(env);
-	struct lod_device *md;
-	struct lu_object *o, *n;
-	struct lu_device *nd;
-	struct dt_object **stripe;
-	int stripe_len;
-	int i, rc = 0;
-	__u32 idx;
+	struct lod_layout_component *lod_comp;
+	struct lod_thread_info *info = lod_env_info(env);
+	struct lod_device *md;
+	struct lu_object *o, *n;
+	struct lu_device *nd;
+	struct dt_object **stripe = NULL;
+	__u32 *ost_indices = NULL;
+	int stripe_len;
+	int i, rc = 0;
+	__u32 idx;
 	ENTRY;
 
 	LASSERT(lo != NULL);
@@ -1125,6 +1178,9 @@ int lod_initialize_objects(const struct lu_env *env, struct lod_object *lo,
 	OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_len);
 	if (stripe == NULL)
 		RETURN(-ENOMEM);
+	OBD_ALLOC(ost_indices, sizeof(*ost_indices) * stripe_len);
+	if (!ost_indices)
+		GOTO(out, rc = -ENOMEM);
 
 	for (i = 0; i < lod_comp->llc_stripe_count; i++) {
 		if (unlikely(lovea_slot_is_dummy(&objs[i])))
@@ -1159,6 +1215,7 @@ int lod_initialize_objects(const struct lu_env *env, struct lod_object *lo,
 		LASSERT(n);
 
 		stripe[i] = container_of(n, struct dt_object, do_lu);
+		ost_indices[i] = idx;
 	}
 
 out:
@@ -1169,8 +1226,12 @@ out:
 		OBD_FREE(stripe, sizeof(stripe[0]) * stripe_len);
 		lod_comp->llc_stripe_count = 0;
+		if (ost_indices)
+			OBD_FREE(ost_indices,
+				 sizeof(*ost_indices) * stripe_len);
 	} else {
 		lod_comp->llc_stripe = stripe;
+		lod_comp->llc_ost_indices = ost_indices;
 		lod_comp->llc_stripes_allocated = stripe_len;
 	}
 
@@ -1195,39 +1256,68 @@ int lod_parse_striping(const struct lu_env *env, struct lod_object *lo,
 {
 	struct lov_mds_md_v1 *lmm;
 	struct lov_comp_md_v1 *comp_v1 = NULL;
+	struct lov_foreign_md *foreign = NULL;
 	struct lov_ost_data_v1 *objs;
 	__u32 magic, pattern;
 	int i, j, rc = 0;
 	__u16 comp_cnt;
+	__u16 mirror_cnt = 0;
 	ENTRY;
 
 	LASSERT(buf);
 	LASSERT(buf->lb_buf);
 	LASSERT(buf->lb_len);
+	LASSERT(mutex_is_locked(&lo->ldo_layout_mutex));
 
 	lmm = (struct lov_mds_md_v1 *)buf->lb_buf;
 	magic = le32_to_cpu(lmm->lmm_magic);
 
 	if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3 &&
-	    magic != LOV_MAGIC_COMP_V1)
+	    magic != LOV_MAGIC_COMP_V1 && magic != LOV_MAGIC_FOREIGN &&
+	    magic != LOV_MAGIC_SEL)
 		GOTO(out, rc = -EINVAL);
 
-	lod_free_comp_entries(lo);
+	if (lo->ldo_is_foreign)
+		lod_free_foreign_lov(lo);
+	else
+		lod_free_comp_entries(lo);
 
-	if (magic == LOV_MAGIC_COMP_V1) {
+	if (magic == LOV_MAGIC_COMP_V1 || magic == LOV_MAGIC_SEL) {
 		comp_v1 = (struct lov_comp_md_v1 *)lmm;
 		comp_cnt = le16_to_cpu(comp_v1->lcm_entry_count);
 		if (comp_cnt == 0)
 			GOTO(out, rc = -EINVAL);
 		lo->ldo_layout_gen = le32_to_cpu(comp_v1->lcm_layout_gen);
 		lo->ldo_is_composite = 1;
+		lo->ldo_flr_state = le16_to_cpu(comp_v1->lcm_flags) &
+					LCM_FL_FLR_MASK;
+		mirror_cnt = le16_to_cpu(comp_v1->lcm_mirror_count) + 1;
+	} else if (magic == LOV_MAGIC_FOREIGN) {
+		size_t length;
+
+		foreign = (struct lov_foreign_md *)buf->lb_buf;
+		length = offsetof(typeof(*foreign), lfm_value);
+		if (buf->lb_len < length ||
+		    buf->lb_len < (length + le32_to_cpu(foreign->lfm_length))) {
+			CDEBUG(D_LAYOUT,
+			       "buf len %zu too small for lov_foreign_md\n",
+			       buf->lb_len);
+			GOTO(out, rc = -EINVAL);
+		}
+
+		/* just cache foreign LOV EA raw */
+		rc = lod_alloc_foreign_lov(lo, length);
+		if (rc)
+			GOTO(out, rc);
+		memcpy(lo->ldo_foreign_lov, buf->lb_buf, length);
+		GOTO(out, rc);
 	} else {
 		comp_cnt = 1;
 		lo->ldo_layout_gen = le16_to_cpu(lmm->lmm_layout_gen);
 		lo->ldo_is_composite = 0;
 	}
 
-	rc = lod_alloc_comp_entries(lo, comp_cnt);
+	rc = lod_alloc_comp_entries(lo, mirror_cnt, comp_cnt);
 	if (rc)
 		GOTO(out, rc);
 
@@ -1248,17 +1338,31 @@ int lod_parse_striping(const struct lu_env *env, struct lod_object *lo,
 			lod_comp->llc_extent.e_end = le64_to_cpu(ext->e_end);
 			lod_comp->llc_flags =
 				le32_to_cpu(comp_v1->lcm_entries[i].lcme_flags);
+			if (lod_comp->llc_flags & LCME_FL_NOSYNC)
+				lod_comp->llc_timestamp = le64_to_cpu(
+					comp_v1->lcm_entries[i].lcme_timestamp);
 			lod_comp->llc_id =
 				le32_to_cpu(comp_v1->lcm_entries[i].lcme_id);
 			if (lod_comp->llc_id == LCME_ID_INVAL)
 				GOTO(out, rc = -EINVAL);
+
+			if (comp_v1->lcm_entries[i].lcme_flags &
+			    cpu_to_le32(LCME_FL_EXTENSION) &&
+			    magic != LOV_MAGIC_SEL) {
+				struct lod_device *d =
+					lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+
+				CDEBUG(D_WARNING, "%s: not SEL magic on SEL "
+				       "file "DFID": %x\n",
+				       lod2obd(d)->obd_name,
+				       PFID(lod_object_fid(lo)), magic);
+			}
 		} else {
 			lod_comp_set_init(lod_comp);
 		}
 
 		pattern = le32_to_cpu(lmm->lmm_pattern);
-		if (lov_pattern(pattern) != LOV_PATTERN_RAID0 &&
-		    lov_pattern(pattern) != LOV_PATTERN_MDT)
+		if (!lov_pattern_supported(lov_pattern(pattern)))
 			GOTO(out, rc = -EINVAL);
 
 		lod_comp->llc_pattern = pattern;
@@ -1268,9 +1372,10 @@ int lod_parse_striping(const struct lu_env *env, struct lod_object *lo,
 		if (magic == LOV_MAGIC_V3) {
 			struct lov_mds_md_v3 *v3 = (struct lov_mds_md_v3 *)lmm;
 
+			lod_set_pool(&lod_comp->llc_pool, v3->lmm_pool_name);
 			objs = &v3->lmm_objects[0];
-			/* no need to set pool, which is used in create only */
 		} else {
+			lod_set_pool(&lod_comp->llc_pool, NULL);
 			objs = &lmm->lmm_objects[0];
 		}
 
@@ -1279,23 +1384,29 @@ int lod_parse_striping(const struct lu_env *env, struct lod_object *lo,
 		 * then user has specified ost list for this component.
 		 */
 		if (!lod_comp_inited(lod_comp)) {
+			__u16 stripe_count;
+
 			if (objs[0].l_ost_idx != (__u32)-1UL) {
+				stripe_count = lod_comp_entry_stripe_count(
+							lo, lod_comp, false);
+				if (stripe_count == 0 &&
+				    !(lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED) &&
+				    !(lod_comp->llc_pattern & LOV_PATTERN_MDT))
+					GOTO(out, rc = -E2BIG);
 				/**
 				 * load the user specified ost list, when this
 				 * component is instantiated later, it will be
 				 * used in lod_alloc_ost_list().
 				 */
-				lod_comp->llc_ostlist.op_count =
-					lod_comp->llc_stripe_count;
+				lod_comp->llc_ostlist.op_count = stripe_count;
 				lod_comp->llc_ostlist.op_size =
-					lod_comp->llc_stripe_count *
-					sizeof(__u32);
+					stripe_count * sizeof(__u32);
 				OBD_ALLOC(lod_comp->llc_ostlist.op_array,
 					  lod_comp->llc_ostlist.op_size);
 				if (!lod_comp->llc_ostlist.op_array)
 					GOTO(out, rc = -ENOMEM);
 
-				for (j = 0; j < lod_comp->llc_stripe_count; j++)
+				for (j = 0; j < stripe_count; j++)
 					lod_comp->llc_ostlist.op_array[j] =
 						le32_to_cpu(objs[j].l_ost_idx);
 
@@ -1326,9 +1437,14 @@ int lod_parse_striping(const struct lu_env *env, struct lod_object *lo,
 				GOTO(out, rc);
 		}
 	}
+
+	rc = lod_fill_mirrors(lo);
+	if (rc)
+		GOTO(out, rc);
+
 out:
 	if (rc)
-		lod_object_free_striping(env, lo);
+		lod_striping_free_nolock(env, lo);
 	RETURN(rc);
 }
 
@@ -1348,7 +1464,7 @@ static bool lod_striping_loaded(struct lod_object *lo)
 		return true;
 
 	if (S_ISDIR(lod2lu_obj(lo)->lo_header->loh_attr)) {
-		if (lo->ldo_stripe != NULL || lo->ldo_dir_stripe_loaded)
+		if (lo->ldo_dir_stripe_loaded)
 			return true;
 
 		/* Never load LMV stripe for slaves of striped dir */
@@ -1360,37 +1476,45 @@ static bool lod_striping_loaded(struct lod_object *lo)
 }
 
 /**
- * Initialize the object representing the stripes.
+ * A generic function to initialize the stripe objects.
  *
- * Unless the stripes are initialized already, fetch LOV (for regular
- * objects) or LMV (for directory objects) EA and call lod_parse_striping()
- * to instantiate the objects representing the stripes. Caller should
- * hold the dt_write_lock(next).
+ * A protected version of lod_striping_load_locked() - load the striping
+ * information from storage, parse that and instantiate LU objects to
+ * represent the stripes. The LOD object \a lo supplies a pointer to the
+ * next sub-object in the LU stack so we can lock it. Also use \a lo to
+ * return an array of references to the newly instantiated objects.
  *
  * \param[in] env	execution environment for this thread
- * \param[in,out] lo	LOD object
+ * \param[in,out] lo	LOD object, where striping is stored and
+ *			which gets an array of references
  *
  * \retval 0		if parsing and object creation succeed
  * \retval negative	error number on failure
- */
-int lod_load_striping_locked(const struct lu_env *env, struct lod_object *lo)
+ **/
+int lod_striping_load(const struct lu_env *env, struct lod_object *lo)
 {
-	struct lod_thread_info *info = lod_env_info(env);
-	struct lu_buf *buf = &info->lti_buf;
-	struct dt_object *next = dt_object_child(&lo->ldo_obj);
-	int rc = 0;
+	struct lod_thread_info *info = lod_env_info(env);
+	struct dt_object *next = dt_object_child(&lo->ldo_obj);
+	struct lu_buf *buf = &info->lti_buf;
+	int rc = 0;
+
 	ENTRY;
 
 	if (!dt_object_exists(next))
-		GOTO(out, rc = 0);
+		RETURN(0);
 
 	if (lod_striping_loaded(lo))
-		GOTO(out, rc = 0);
+		RETURN(0);
+
+	mutex_lock(&lo->ldo_layout_mutex);
+	if (lod_striping_loaded(lo))
+		GOTO(unlock, rc = 0);
 
 	if (S_ISREG(lod2lu_obj(lo)->lo_header->loh_attr)) {
 		rc = lod_get_lov_ea(env, lo);
 		if (rc <= 0)
-			GOTO(out, rc);
+			GOTO(unlock, rc);
+
 		/*
 		 * there is LOV EA (striping information) in this object
 		 * let's parse it and create in-core objects for the stripes
 		 */
@@ -1402,14 +1526,30 @@ int lod_load_striping_locked(const struct lu_env *env, struct lod_object *lo)
 		lo->ldo_comp_cached = 1;
 	} else if (S_ISDIR(lod2lu_obj(lo)->lo_header->loh_attr)) {
 		rc = lod_get_lmv_ea(env, lo);
-		if (rc < (typeof(rc))sizeof(struct lmv_mds_md_v1)) {
+		if (rc > sizeof(struct lmv_foreign_md)) {
+			struct lmv_foreign_md *lfm = info->lti_ea_store;
+
+			if (le32_to_cpu(lfm->lfm_magic) == LMV_MAGIC_FOREIGN) {
+				lo->ldo_foreign_lmv = info->lti_ea_store;
+				lo->ldo_foreign_lmv_size =
+					info->lti_ea_store_size;
+				info->lti_ea_store = NULL;
+				info->lti_ea_store_size = 0;
+
+				lo->ldo_dir_stripe_loaded = 1;
+				lo->ldo_dir_is_foreign = 1;
+				GOTO(unlock, rc = 0);
+			}
+		}
+
+		if (rc < (int)sizeof(struct lmv_mds_md_v1)) {
 			/* Let's set stripe_loaded to avoid further
 			 * stripe loading especially for non-stripe directory,
 			 * which can hurt performance. (See LU-9840)
 			 */
 			if (rc == 0)
 				lo->ldo_dir_stripe_loaded = 1;
-			GOTO(out, rc = rc > 0 ? -EINVAL : rc);
+			GOTO(unlock, rc = rc > 0 ? -EINVAL : rc);
 		}
 		buf->lb_buf = info->lti_ea_store;
 		buf->lb_len = info->lti_ea_store_size;
@@ -1423,7 +1563,7 @@ int lod_load_striping_locked(const struct lu_env *env, struct lod_object *lo)
 		}
 
 		if (rc < 0)
-			GOTO(out, rc);
+			GOTO(unlock, rc);
 	}
 
 	/*
@@ -1434,44 +1574,26 @@ int lod_load_striping_locked(const struct lu_env *env, struct lod_object *lo)
 		if (rc == 0)
 			lo->ldo_dir_stripe_loaded = 1;
 	}
-out:
-	RETURN(rc);
+	EXIT;
+unlock:
+	mutex_unlock(&lo->ldo_layout_mutex);
+
+	return rc;
 }
 
-/**
- * A generic function to initialize the stripe objects.
- *
- * A protected version of lod_load_striping_locked() - load the striping
- * information from storage, parse that and instantiate LU objects to
- * represent the stripes. The LOD object \a lo supplies a pointer to the
- * next sub-object in the LU stack so we can lock it. Also use \a lo to
- * return an array of references to the newly instantiated objects.
- *
- * \param[in] env	execution environment for this thread
- * \param[in,out] lo	LOD object, where striping is stored and
- *			which gets an array of references
- *
- * \retval 0		if parsing and object creation succeed
- * \retval negative	error number on failure
- **/
-int lod_load_striping(const struct lu_env *env, struct lod_object *lo)
+int lod_striping_reload(const struct lu_env *env, struct lod_object *lo,
+			const struct lu_buf *buf)
 {
-	struct dt_object *next = dt_object_child(&lo->ldo_obj);
-	int rc;
+	int rc;
 
-	if (!dt_object_exists(next))
-		return 0;
+	ENTRY;
 
-	/* Check without locking first */
-	if (lod_striping_loaded(lo))
-		return 0;
+	mutex_lock(&lo->ldo_layout_mutex);
+	lod_striping_free_nolock(env, lo);
+	rc = lod_parse_striping(env, lo, buf);
+	mutex_unlock(&lo->ldo_layout_mutex);
 
-	/* currently this code is supposed to be called from declaration
-	 * phase only, thus the object is not expected to be locked by caller */
-	dt_write_lock(env, next, 0);
-	rc = lod_load_striping_locked(env, lo);
-	dt_write_unlock(env, next);
-	return rc;
+	RETURN(rc);
 }
 
 /**
@@ -1513,7 +1635,7 @@ static int lod_verify_v1v3(struct lod_device *d, const struct lu_buf *buf,
 		GOTO(out, rc = -EINVAL);
 	}
 
-	magic = le32_to_cpu(lum->lmm_magic) & ~LOV_MAGIC_DEF;
+	magic = le32_to_cpu(lum->lmm_magic) & ~LOV_MAGIC_DEFINED;
 	if (magic != LOV_USER_MAGIC_V1 &&
 	    magic != LOV_USER_MAGIC_V3 &&
 	    magic != LOV_USER_MAGIC_SPECIFIC) {
@@ -1611,6 +1733,124 @@ out:
 	RETURN(rc);
 }
 
+static inline
+struct lov_comp_md_entry_v1 *comp_entry_v1(struct lov_comp_md_v1 *comp, int i)
+{
+	LASSERTF((le32_to_cpu(comp->lcm_magic) & ~LOV_MAGIC_DEFINED) ==
+		 LOV_USER_MAGIC_COMP_V1, "Wrong magic %x\n",
+		 le32_to_cpu(comp->lcm_magic));
+	LASSERTF(i >= 0 && i < le16_to_cpu(comp->lcm_entry_count),
+		 "bad index %d, max = %d\n",
+		 i, le16_to_cpu(comp->lcm_entry_count));
+
+	return &comp->lcm_entries[i];
+}
+
+#define for_each_comp_entry_v1(comp, entry) \
+	for (entry = comp_entry_v1(comp, 0); \
+	     entry <= comp_entry_v1(comp, \
+				    le16_to_cpu(comp->lcm_entry_count) - 1); \
+	     entry++)
+
+int lod_erase_dom_stripe(struct lov_comp_md_v1 *comp_v1)
+{
+	struct lov_comp_md_entry_v1 *ent, *dom_ent;
+	__u16 entries;
+	__u32 dom_off, dom_size, comp_size;
+	void *blob_src, *blob_dst;
+	unsigned int blob_size, blob_shift;
+
+	entries = le16_to_cpu(comp_v1->lcm_entry_count) - 1;
+	/* if file has only DoM stripe return just error */
+	if (entries == 0)
+		return -EFBIG;
+
+	comp_size = le32_to_cpu(comp_v1->lcm_size);
+	dom_ent = &comp_v1->lcm_entries[0];
+	dom_off = le32_to_cpu(dom_ent->lcme_offset);
+	dom_size = le32_to_cpu(dom_ent->lcme_size);
+
+	/* shift entries array first */
+	comp_v1->lcm_entry_count = cpu_to_le16(entries);
+	memmove(dom_ent, dom_ent + 1,
+		entries * sizeof(struct lov_comp_md_entry_v1));
+
+	/* now move blob of layouts */
+	blob_dst = (void *)comp_v1 + dom_off - sizeof(*dom_ent);
+	blob_src = (void *)comp_v1 + dom_off + dom_size;
+	blob_size = (unsigned long)((void *)comp_v1 + comp_size - blob_src);
+	blob_shift = sizeof(*dom_ent) + dom_size;
+
+	memmove(blob_dst, blob_src, blob_size);
+
+	for_each_comp_entry_v1(comp_v1, ent) {
+		__u32 off;
+
+		off = le32_to_cpu(ent->lcme_offset);
+		ent->lcme_offset = cpu_to_le32(off - blob_shift);
+	}
+
+	comp_v1->lcm_size = cpu_to_le32(comp_size - blob_shift);
+
+	/* notify a caller to re-check entry */
+	return -ERESTART;
+}
+
+int lod_fix_dom_stripe(struct lod_device *d, struct lov_comp_md_v1 *comp_v1)
+{
+	struct lov_comp_md_entry_v1 *ent, *dom_ent;
+	struct lu_extent *dom_ext, *ext;
+	struct lov_user_md_v1 *lum;
+	__u32 stripe_size;
+	__u16 mid, dom_mid;
+	int rc = 0;
+
+	dom_ent = &comp_v1->lcm_entries[0];
+	dom_ext = &dom_ent->lcme_extent;
+	dom_mid = mirror_id_of(le32_to_cpu(dom_ent->lcme_id));
+	stripe_size = d->lod_dom_max_stripesize;
+
+	lum = (void *)comp_v1 + le32_to_cpu(dom_ent->lcme_offset);
+	CDEBUG(D_LAYOUT, "DoM component size %u was bigger than MDT limit %u, "
+	       "new size is %u\n", le32_to_cpu(lum->lmm_stripe_size),
+	       d->lod_dom_max_stripesize, stripe_size);
+	lum->lmm_stripe_size = cpu_to_le32(stripe_size);
+
+	for_each_comp_entry_v1(comp_v1, ent) {
+		if (ent == dom_ent)
+			continue;
+
+		mid = mirror_id_of(le32_to_cpu(ent->lcme_id));
+		if (mid != dom_mid)
+			continue;
+
+		ext = &ent->lcme_extent;
+		if (ext->e_start != dom_ext->e_end)
+			continue;
+
+		/* Found next component after the DoM one with the same
+		 * mirror_id and adjust its start with DoM component end.
+		 *
+		 * NOTE: we are considering here that there can be only one
+		 * DoM component in a file, all replicas are located on OSTs
+		 * always and don't need adjustment since use own layouts.
+		 */
+		ext->e_start = cpu_to_le64(stripe_size);
+		break;
+	}
+
+	if (stripe_size == 0) {
+		/* DoM component size is zero due to server setting,
+		 * remove it from the layout */
+		rc = lod_erase_dom_stripe(comp_v1);
+	} else {
+		/* Update DoM extent end finally */
+		dom_ext->e_end = cpu_to_le64(stripe_size);
+	}
+
+	return rc;
+}
+
 /**
  * Verify LOV striping.
  *
@@ -1623,24 +1863,64 @@ out:
  * \retval 0 if the striping is valid
  * \retval -EINVAL if striping is invalid
  */
-int lod_verify_striping(struct lod_device *d, const struct lu_buf *buf,
-			bool is_from_disk, __u64 start)
+int lod_verify_striping(struct lod_device *d, struct lod_object *lo,
+			const struct lu_buf *buf, bool is_from_disk)
 {
-	struct lov_user_md_v1 *lum;
-	struct lov_comp_md_v1 *comp_v1;
-	__u32 magic;
-	int rc = 0, i;
+	struct lov_desc *desc = &d->lod_desc;
+	struct lov_user_md_v1 *lum;
+	struct lov_comp_md_v1 *comp_v1;
+	struct lov_comp_md_entry_v1 *ent;
+	struct lu_extent *ext;
+	struct lu_buf tmp;
+	__u64 prev_end = 0;
+	__u32 stripe_size = 0;
+	__u16 prev_mid = -1, mirror_id = -1;
+	__u32 mirror_count;
+	__u32 magic;
+	int rc = 0;
 	ENTRY;
 
+	if (buf->lb_len < sizeof(lum->lmm_magic)) {
+		CDEBUG(D_LAYOUT, "invalid buf len %zu\n", buf->lb_len);
+		RETURN(-EINVAL);
+	}
+
+	lum = buf->lb_buf;
+
+	magic = le32_to_cpu(lum->lmm_magic) & ~LOV_MAGIC_DEFINED;
+	/* treat foreign LOV EA/object case first
+	 * XXX is it expected to try setting again a foreign?
+	 * XXX should we care about different current vs new layouts ?
+	 */
+	if (unlikely(magic == LOV_USER_MAGIC_FOREIGN)) {
+		struct lov_foreign_md *lfm = buf->lb_buf;
+
+		if (buf->lb_len < offsetof(typeof(*lfm), lfm_value)) {
+			CDEBUG(D_LAYOUT,
+			       "buf len %zu < min lov_foreign_md size (%zu)\n",
+			       buf->lb_len, offsetof(typeof(*lfm),
+						     lfm_value));
+			RETURN(-EINVAL);
+		}
+
+		if (foreign_size_le(lfm) > buf->lb_len) {
+			CDEBUG(D_LAYOUT,
+			       "buf len %zu < this lov_foreign_md size (%zu)\n",
+			       buf->lb_len, foreign_size_le(lfm));
+			RETURN(-EINVAL);
+		}
+		/* Don't do anything with foreign layouts */
+		RETURN(0);
+	}
+
+	/* normal LOV/layout cases */
+
 	if (buf->lb_len < sizeof(*lum)) {
 		CDEBUG(D_LAYOUT, "buf len %zu too small for lov_user_md\n",
 		       buf->lb_len);
 		RETURN(-EINVAL);
 	}
 
-	magic = le32_to_cpu(lum->lmm_magic) & ~LOV_MAGIC_DEF;
 	if (magic != LOV_USER_MAGIC_V1 &&
 	    magic != LOV_USER_MAGIC_V3 &&
 	    magic != LOV_USER_MAGIC_SPECIFIC &&
@@ -1650,116 +1930,151 @@ int lod_verify_striping(struct lod_device *d, const struct lu_buf *buf,
 		RETURN(-EINVAL);
 	}
 
-	if (magic == LOV_USER_MAGIC_COMP_V1) {
-		struct lov_comp_md_entry_v1 *ent;
-		struct lu_extent *ext;
-		struct lov_desc *desc = &d->lod_desc;
-		struct lu_buf tmp;
-		__u32 stripe_size = 0;
-		__u64 prev_end = start;
-
-		comp_v1 = buf->lb_buf;
-		if (buf->lb_len < le32_to_cpu(comp_v1->lcm_size)) {
-			CDEBUG(D_LAYOUT, "buf len %zu is less than %u\n",
-			       buf->lb_len, le32_to_cpu(comp_v1->lcm_size));
-			RETURN(-EINVAL);
-		}
+	if (magic != LOV_USER_MAGIC_COMP_V1)
+		RETURN(lod_verify_v1v3(d, buf, is_from_disk));
+
+	/* magic == LOV_USER_MAGIC_COMP_V1 */
+	comp_v1 = buf->lb_buf;
+	if (buf->lb_len < le32_to_cpu(comp_v1->lcm_size)) {
+		CDEBUG(D_LAYOUT, "buf len %zu is less than %u\n",
+		       buf->lb_len, le32_to_cpu(comp_v1->lcm_size));
+		RETURN(-EINVAL);
+	}
+
+recheck:
+	mirror_count = 0;
+	if (le16_to_cpu(comp_v1->lcm_entry_count) == 0) {
+		CDEBUG(D_LAYOUT, "entry count is zero\n");
+		RETURN(-EINVAL);
+	}
 
-		if (le16_to_cpu(comp_v1->lcm_entry_count) == 0) {
-			CDEBUG(D_LAYOUT, "entry count is zero\n");
+	if (S_ISREG(lod2lu_obj(lo)->lo_header->loh_attr) &&
+	    lo->ldo_comp_cnt > 0) {
+		/* could be called from lustre.lov.add */
+		__u32 cnt = lo->ldo_comp_cnt;
+
+		ext = &lo->ldo_comp_entries[cnt - 1].llc_extent;
+		prev_end = ext->e_end;
+
+		++mirror_count;
+	}
+
+	for_each_comp_entry_v1(comp_v1, ent) {
+		ext = &ent->lcme_extent;
+
+		if (le64_to_cpu(ext->e_start) > le64_to_cpu(ext->e_end)) {
+			CDEBUG(D_LAYOUT, "invalid extent "DEXT"\n",
+			       le64_to_cpu(ext->e_start),
+			       le64_to_cpu(ext->e_end));
 			RETURN(-EINVAL);
 		}
 
-		for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) {
-			ent = &comp_v1->lcm_entries[i];
-			ext = &ent->lcme_extent;
-
-			if (is_from_disk &&
-			    (le32_to_cpu(ent->lcme_id) == 0 ||
-			     le32_to_cpu(ent->lcme_id) > LCME_ID_MAX)) {
+		if (is_from_disk) {
+			/* lcme_id contains valid value */
+			if (le32_to_cpu(ent->lcme_id) == 0 ||
+			    le32_to_cpu(ent->lcme_id) > LCME_ID_MAX) {
 				CDEBUG(D_LAYOUT, "invalid id %u\n",
 				       le32_to_cpu(ent->lcme_id));
 				RETURN(-EINVAL);
 			}
 
-			if (le64_to_cpu(ext->e_start) >=
-			    le64_to_cpu(ext->e_end)) {
-				CDEBUG(D_LAYOUT, "invalid extent "
-				       "[%llu, %llu)\n",
-				       le64_to_cpu(ext->e_start),
-				       le64_to_cpu(ext->e_end));
-				RETURN(-EINVAL);
-			}
+			if (le16_to_cpu(comp_v1->lcm_mirror_count) > 0) {
+				mirror_id = mirror_id_of(
+						le32_to_cpu(ent->lcme_id));
 
-			/* first component must start with 0, and the next
-			 * must be adjacent with the previous one */
-			if (le64_to_cpu(ext->e_start) != prev_end) {
-				CDEBUG(D_LAYOUT, "invalid start "
-				       "actual:%llu, expect:%llu\n",
-				       le64_to_cpu(ext->e_start), prev_end);
-				RETURN(-EINVAL);
-			}
-			prev_end = le64_to_cpu(ext->e_end);
-
-			tmp.lb_buf = (char *)comp_v1 +
-					le32_to_cpu(ent->lcme_offset);
-			tmp.lb_len = le32_to_cpu(ent->lcme_size);
-
-			/* Checks for DoM entry in composite layout. */
-			lum = tmp.lb_buf;
-			if (lov_pattern(le32_to_cpu(lum->lmm_pattern)) ==
-			    LOV_PATTERN_MDT) {
-				/* DoM component can be only the first entry */
-				if (i > 0) {
-					CDEBUG(D_LAYOUT, "invalid DoM layout "
-					       "entry found at %i index\n", i);
-					RETURN(-EINVAL);
-				}
-				stripe_size = le32_to_cpu(lum->lmm_stripe_size);
-				/* There is just one stripe on MDT and it must
-				 * cover whole component size. */
-				if (stripe_size != prev_end) {
-					CDEBUG(D_LAYOUT, "invalid DoM layout "
-					       "stripe size %u != %llu "
-					       "(component size)\n",
-					       stripe_size, prev_end);
-					RETURN(-EINVAL);
-				}
-				/* Check stripe size againts per-MDT limit */
-				if (stripe_size > d->lod_dom_max_stripesize) {
-					CDEBUG(D_LAYOUT, "DoM component size "
					       "%u is bigger than MDT limit "
-					       "%u, check dom_max_stripesize"
-					       " parameter\n",
-					       stripe_size,
-					       d->lod_dom_max_stripesize);
+				/* first component must start with 0 */
+				if (mirror_id != prev_mid &&
+				    le64_to_cpu(ext->e_start) != 0) {
+					CDEBUG(D_LAYOUT,
+					       "invalid start:%llu, expect:0\n",
+					       le64_to_cpu(ext->e_start));
 					RETURN(-EINVAL);
 				}
+
+				prev_mid = mirror_id;
 			}
-			rc = lod_verify_v1v3(d, &tmp, is_from_disk);
-			if (rc)
-				break;
+		}
-			lum = tmp.lb_buf;
+		if (le64_to_cpu(ext->e_start) == 0) {
+			++mirror_count;
+			prev_end = 0;
+		}
+
+		/* the next must be adjacent with the previous one */
+		if (le64_to_cpu(ext->e_start) != prev_end) {
+			CDEBUG(D_LAYOUT,
+			       "invalid start actual:%llu, expect:%llu\n",
+			       le64_to_cpu(ext->e_start), prev_end);
+			RETURN(-EINVAL);
+		}
-			/* extent end must be aligned with the stripe_size */
+		tmp.lb_buf = (char *)comp_v1 + le32_to_cpu(ent->lcme_offset);
+		tmp.lb_len = le32_to_cpu(ent->lcme_size);
+
+		/* Check DoM entry is always the first one */
+		lum = tmp.lb_buf;
+		if (lov_pattern(le32_to_cpu(lum->lmm_pattern)) ==
+		    LOV_PATTERN_MDT) {
+			/* DoM component can be only the first stripe */
+			if (le64_to_cpu(ext->e_start) > 0) {
+				CDEBUG(D_LAYOUT, "invalid DoM component "
+				       "with %llu extent start\n",
+				       le64_to_cpu(ext->e_start));
+				RETURN(-EINVAL);
+			}
 			stripe_size = le32_to_cpu(lum->lmm_stripe_size);
-			if (stripe_size == 0)
-				stripe_size = desc->ld_default_stripe_size;
-			if (stripe_size == 0 ||
-			    (prev_end != LUSTRE_EOF &&
-			     (prev_end & (stripe_size - 1)))) {
-				CDEBUG(D_LAYOUT, "stripe size isn't aligned. "
-				       " stripe_sz: %u, [%llu, %llu)\n",
-				       stripe_size, ext->e_start, prev_end);
+			/* There is just one stripe on MDT and it must
+			 * cover whole component size. */
+			if (stripe_size != le64_to_cpu(ext->e_end)) {
+				CDEBUG(D_LAYOUT, "invalid DoM layout "
+				       "stripe size %u != %llu "
+				       "(component size)\n",
+				       stripe_size, prev_end);
 				RETURN(-EINVAL);
 			}
+			/* Check stripe size againts per-MDT limit */
+			if (stripe_size > d->lod_dom_max_stripesize) {
+				CDEBUG(D_LAYOUT, "DoM component size "
+				       "%u is bigger than MDT limit %u, check "
+				       "dom_max_stripesize parameter\n",
+				       stripe_size, d->lod_dom_max_stripesize);
+				rc = lod_fix_dom_stripe(d, comp_v1);
+				if (rc == -ERESTART) {
+					/* DoM entry was removed, re-check
+					 * new layout from start */
+					goto recheck;
+				} else if (rc) {
+					RETURN(rc);
+				}
+			}
+		}
+
+		prev_end = le64_to_cpu(ext->e_end);
+
+		rc = lod_verify_v1v3(d, &tmp, is_from_disk);
+		if (rc)
+			RETURN(rc);
+
+		if (prev_end == LUSTRE_EOF)
+			continue;
+
+		/* extent end must be aligned with the stripe_size */
+		stripe_size = le32_to_cpu(lum->lmm_stripe_size);
+		if (stripe_size == 0)
+			stripe_size = desc->ld_default_stripe_size;
+		if (prev_end % stripe_size) {
+			CDEBUG(D_LAYOUT, "stripe size isn't aligned, "
+			       "stripe_sz: %u, [%llu, %llu)\n",
+			       stripe_size, ext->e_start, prev_end);
+			RETURN(-EINVAL);
 		}
-	} else {
-		rc = lod_verify_v1v3(d, buf, is_from_disk);
 	}
-	RETURN(rc);
+	/* make sure that the mirror_count is telling the truth */
+	if (mirror_count != le16_to_cpu(comp_v1->lcm_mirror_count) + 1)
+		RETURN(-EINVAL);
+
+	RETURN(0);
 }
 
 /**
@@ -1814,9 +2129,8 @@ void lod_fix_desc_stripe_count(__u32 *val)
 void lod_fix_desc_pattern(__u32 *val)
 {
 	/* from lov_setstripe */
-	if ((*val != 0) && (*val != LOV_PATTERN_RAID0) &&
-	    (*val != LOV_PATTERN_MDT)) {
-		LCONSOLE_WARN("Unknown stripe pattern: %#x\n", *val);
+	if ((*val != 0) && !lov_pattern_supported_normal_comp(*val)) {
+		LCONSOLE_WARN("lod: Unknown stripe pattern: %#x\n", *val);
 		*val = 0;
 	}
 }
@@ -1894,16 +2208,17 @@ int lod_pools_init(struct lod_device *lod, struct lustre_cfg *lcfg)
 	lod->lod_sp_me = LUSTRE_SP_CLI;
 
 	/* Set up allocation policy (QoS and RR) */
-	INIT_LIST_HEAD(&lod->lod_qos.lq_oss_list);
+	INIT_LIST_HEAD(&lod->lod_qos.lq_svr_list);
 	init_rwsem(&lod->lod_qos.lq_rw_sem);
 	lod->lod_qos.lq_dirty = 1;
-	lod->lod_qos.lq_rr.lqr_dirty = 1;
 	lod->lod_qos.lq_reset = 1;
 	/* Default priority is toward free space balance */
 	lod->lod_qos.lq_prio_free = 232;
 	/* Default threshold for rr (roughly 17%) */
 	lod->lod_qos.lq_threshold_rr = 43;
+	lu_qos_rr_init(&lod->lod_qos.lq_rr);
+
 	/* Set up OST pool environment */
 	lod->lod_pools_hash_body = cfs_hash_create("POOLS",
 						   HASH_POOLS_CUR_BITS,
						   HASH_POOLS_MAX_BITS,
@@ -1920,7 +2235,6 @@ int lod_pools_init(struct lod_device *lod, struct lustre_cfg *lcfg)
 	rc = lod_ost_pool_init(&lod->lod_pool_info, 0);
 	if (rc)
 		GOTO(out_hash, rc);
-	lod_qos_rr_init(&lod->lod_qos.lq_rr);
 	rc = lod_ost_pool_init(&lod->lod_qos.lq_rr.lqr_pool, 0);
 	if (rc)
 		GOTO(out_pool_info, rc);
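The hunks above add lod_fill_mirrors(), which walks the composite layout's components in order and groups consecutive components carrying the same mirror ID into one ldo_mirrors[] entry with a [start, end] component range. The standalone sketch below illustrates only that grouping rule; it assumes the mirror ID occupies the upper 16 bits of the 32-bit component ID (as mirror_id_of() suggests), and every name and type in it is an illustrative stand-in rather than Lustre API.

/* Illustrative sketch only -- not part of the patch above. */
#include <stdio.h>
#include <stdint.h>

struct toy_mirror_entry {
	uint16_t id;		/* mirror ID shared by the grouped components */
	int start, end;		/* first/last component index of this mirror */
};

static uint16_t toy_mirror_id_of(uint32_t comp_id)
{
	return (uint16_t)(comp_id >> 16);	/* assumption: upper 16 bits */
}

int main(void)
{
	/* component IDs as they might appear in a 2-mirror, 4-component file */
	uint32_t comp_ids[] = { 0x10001, 0x10002, 0x20003, 0x20004 };
	int comp_cnt = sizeof(comp_ids) / sizeof(comp_ids[0]);
	struct toy_mirror_entry mirrors[4];
	int mirror_idx = -1;
	uint16_t cur_id = 0xffff;
	int i;

	for (i = 0; i < comp_cnt; i++) {
		uint16_t mid = toy_mirror_id_of(comp_ids[i]);

		if (mirror_idx >= 0 && mid == cur_id) {
			mirrors[mirror_idx].end = i;	/* extend current mirror */
			continue;
		}
		/* a new mirror begins at this component */
		mirror_idx++;
		cur_id = mid;
		mirrors[mirror_idx].id = mid;
		mirrors[mirror_idx].start = i;
		mirrors[mirror_idx].end = i;
	}

	for (i = 0; i <= mirror_idx; i++)
		printf("mirror %u: components [%d, %d]\n",
		       mirrors[i].id, mirrors[i].start, mirrors[i].end);
	return 0;
}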
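The reworked lod_verify_striping() above also enforces an extent rule for mirrored composite layouts: the components of one mirror must tile the file contiguously (each lcme_extent.e_start equal to the previous e_end), every extent starting at offset 0 opens a new mirror, and the number of mirrors counted this way must equal lcm_mirror_count + 1. Below is a small standalone sketch of just that check, using simplified stand-in structures rather than the on-disk lov_comp_md_v1 format.

/* Illustrative sketch only -- not part of the patch above. */
#include <stdio.h>
#include <stdint.h>

struct toy_extent {
	uint64_t e_start;
	uint64_t e_end;
};

/* returns 0 if the extents describe a valid mirrored layout */
static int toy_verify_extents(const struct toy_extent *ext, int n,
			      int declared_mirror_count)
{
	uint64_t prev_end = 0;
	int mirrors = 0;
	int i;

	for (i = 0; i < n; i++) {
		if (ext[i].e_start > ext[i].e_end)
			return -1;		/* malformed extent */
		if (ext[i].e_start == 0) {
			mirrors++;		/* a new mirror begins */
			prev_end = 0;
		}
		if (ext[i].e_start != prev_end)
			return -1;		/* gap or overlap within a mirror */
		prev_end = ext[i].e_end;
	}
	return mirrors == declared_mirror_count ? 0 : -1;
}

int main(void)
{
	/* two mirrors, each covering [0, 1M) followed by [1M, EOF) */
	const uint64_t EOFF = ~0ULL;
	struct toy_extent ext[] = {
		{ 0, 1 << 20 }, { 1 << 20, EOFF },
		{ 0, 1 << 20 }, { 1 << 20, EOFF },
	};

	printf("layout is %s\n",
	       toy_verify_extents(ext, 4, 2) == 0 ? "valid" : "invalid");
	return 0;
}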