X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flod%2Flod_object.c;h=f4829a3ac0a86c8ed4546d816328f20c61f9c0b3;hp=d19f91d6b98ad2fae6f10bd563e9904b14f247b3;hb=bd7a2f9938a7edf09afd133601ca4181e109a7d0;hpb=b25e8a4bc86ee245be19dc05c085a2b0f4fe4a43 diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index d19f91d..f4829a3 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -344,7 +344,7 @@ static int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di, key_rec); } -static struct dt_index_operations lod_index_ops = { +static const struct dt_index_operations lod_index_ops = { .dio_lookup = lod_lookup, .dio_declare_insert = lod_declare_insert, .dio_insert = lod_insert, @@ -367,6 +367,50 @@ static struct dt_index_operations lod_index_ops = { }; /** + * Implementation of dt_index_operations::dio_lookup + * + * Used with striped directories. + * + * \see dt_index_operations::dio_lookup() in the API description for details. + */ +static int lod_striped_lookup(const struct lu_env *env, struct dt_object *dt, + struct dt_rec *rec, const struct dt_key *key) +{ + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next; + const char *name = (const char *)key; + + LASSERT(lo->ldo_dir_stripe_count > 0); + + if (strcmp(name, dot) == 0) { + struct lu_fid *fid = (struct lu_fid *)rec; + + *fid = *lod_object_fid(lo); + return 1; + } + + if (strcmp(name, dotdot) == 0) { + next = dt_object_child(dt); + } else { + int index; + + index = __lmv_name_to_stripe_index(lo->ldo_dir_hash_type, + lo->ldo_dir_stripe_count, + lo->ldo_dir_migrate_hash, + lo->ldo_dir_migrate_offset, + name, strlen(name), true); + if (index < 0) + return index; + + next = lo->ldo_stripe[index]; + if (!next || !dt_object_exists(next)) + return -ENODEV; + } + + return next->do_index_ops->dio_lookup(env, next, rec, key); +} + +/** * Implementation of dt_it_ops::init. * * Used with striped objects. 
Internally just initializes the iterator @@ -743,8 +787,8 @@ static int lod_striped_it_load(const struct lu_env *env, return next->do_index_ops->dio_it.load(env, it->lit_it, hash); } -static struct dt_index_operations lod_striped_index_ops = { - .dio_lookup = lod_lookup, +static const struct dt_index_operations lod_striped_index_ops = { + .dio_lookup = lod_striped_lookup, .dio_declare_insert = lod_declare_insert, .dio_insert = lod_insert, .dio_declare_delete = lod_declare_delete, @@ -876,13 +920,12 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, /* The ent->lde_name is composed of ${FID}:${index} */ if (ent->lde_namelen < len + 1 || memcmp(ent->lde_name, name, len) != 0) { - CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO, - "%s: invalid shard name %.*s with the FID "DFID - " for the striped directory "DFID", %s\n", - lod2obd(lod)->obd_name, ent->lde_namelen, - ent->lde_name, PFID(&fid), - PFID(lu_object_fid(&obj->do_lu)), - lod->lod_lmv_failout ? "failout" : "skip"); + CDEBUG_LIMIT(lod->lod_lmv_failout ? D_ERROR : D_INFO, + "%s: invalid shard name %.*s with the FID "DFID" for the striped directory "DFID", %s\n", + lod2obd(lod)->obd_name, ent->lde_namelen, + ent->lde_name, PFID(&fid), + PFID(lu_object_fid(&obj->do_lu)), + lod->lod_lmv_failout ? "failout" : "skip"); if (lod->lod_lmv_failout) break; @@ -894,15 +937,15 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, do { if (ent->lde_name[len] < '0' || ent->lde_name[len] > '9') { - CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO, - "%s: invalid shard name %.*s with the " - "FID "DFID" for the striped directory " - DFID", %s\n", - lod2obd(lod)->obd_name, ent->lde_namelen, - ent->lde_name, PFID(&fid), - PFID(lu_object_fid(&obj->do_lu)), - lod->lod_lmv_failout ? - "failout" : "skip"); + CDEBUG_LIMIT(lod->lod_lmv_failout ? 
+ D_ERROR : D_INFO, + "%s: invalid shard name %.*s with the FID "DFID" for the striped directory "DFID", %s\n", + lod2obd(lod)->obd_name, + ent->lde_namelen, + ent->lde_name, PFID(&fid), + PFID(lu_object_fid(&obj->do_lu)), + lod->lod_lmv_failout ? + "failout" : "skip"); if (lod->lod_lmv_failout) break; @@ -1075,6 +1118,43 @@ static int lod_attr_get(const struct lu_env *env, return dt_attr_get(env, dt_object_child(dt), attr); } +void lod_adjust_stripe_size(struct lod_layout_component *comp, + __u32 def_stripe_size) +{ + __u64 comp_end = comp->llc_extent.e_end; + + /* Choose stripe size if not set. Note that default stripe size can't + * be used as is, because it must be multiplier of given component end. + * - first check if default stripe size can be used + * - if not than select the lowest set bit from component end and use + * that value as stripe size + */ + if (!comp->llc_stripe_size) { + if (comp_end == LUSTRE_EOF || !(comp_end % def_stripe_size)) + comp->llc_stripe_size = def_stripe_size; + else + comp->llc_stripe_size = comp_end & ~(comp_end - 1); + } else { + if (comp_end != LUSTRE_EOF && + comp_end & (LOV_MIN_STRIPE_SIZE - 1)) { + CWARN("Component end %llu is not a multiple of min size %u\n", + comp_end, LOV_MIN_STRIPE_SIZE); + comp_end = round_up(comp_end, LOV_MIN_STRIPE_SIZE); + } + /* check stripe size is multiplier of comp_end */ + if (comp_end != LUSTRE_EOF && + comp_end != comp->llc_extent.e_start && + comp_end % comp->llc_stripe_size) { + /* fix that even for defined stripe size but warn + * about the problem, that must not happen + */ + CWARN("Component end %llu is not aligned by the stripe size %u\n", + comp_end, comp->llc_stripe_size); + comp->llc_stripe_size = comp_end & ~(comp_end - 1); + } + } +} + static inline void lod_adjust_stripe_info(struct lod_layout_component *comp, struct lov_desc *desc, int append_stripes) @@ -1087,8 +1167,8 @@ static inline void lod_adjust_stripe_info(struct lod_layout_component *comp, desc->ld_default_stripe_count; 
} } - if (comp->llc_stripe_size <= 0) - comp->llc_stripe_size = desc->ld_default_stripe_size; + + lod_adjust_stripe_size(comp, desc->ld_default_stripe_size); } int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, @@ -1096,10 +1176,10 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, struct lod_obj_stripe_cb_data *data) { struct lod_layout_component *lod_comp; - int i, j, rc; + int i, j, rc = 0; ENTRY; - LASSERT(lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL); + mutex_lock(&lo->ldo_layout_mutex); for (i = 0; i < lo->ldo_comp_cnt; i++) { lod_comp = &lo->ldo_comp_entries[i]; @@ -1119,7 +1199,7 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, if (data->locd_comp_cb) { rc = data->locd_comp_cb(env, lo, i, data); if (rc) - RETURN(rc); + GOTO(unlock, rc); } /* could used just to do sth about component, not each @@ -1136,10 +1216,12 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, continue; rc = data->locd_stripe_cb(env, lo, dt, th, i, j, data); if (rc != 0) - RETURN(rc); + GOTO(unlock, rc); } } - RETURN(0); +unlock: + mutex_unlock(&lo->ldo_layout_mutex); + RETURN(rc); } static bool lod_obj_attr_set_comp_skip_cb(const struct lu_env *env, @@ -1529,13 +1611,13 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, /* The on-disk LMV EA only contains header, but the * returned LMV EA size should contain the space for * the FIDs of all shards of the striped directory. 
*/ - if (lmv_is_sane(lmv1)) + if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_V1) rc = lmv_mds_md_size( le32_to_cpu(lmv1->lmv_stripe_count), le32_to_cpu(lmv1->lmv_magic)); } else { - lfm = buf->lb_buf; - if (le32_to_cpu(lfm->lfm_magic) == LMV_MAGIC_FOREIGN) + lmv1 = buf->lb_buf; + if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1) RETURN(rc); if (rc != sizeof(*lmv1)) @@ -1681,7 +1763,8 @@ static int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC); lmm1->lmv_stripe_count = cpu_to_le32(stripe_count); lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type); - if (lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) { + lmm1->lmv_layout_version = cpu_to_le32(lo->ldo_dir_layout_version); + if (lod_is_layout_changing(lo)) { lmm1->lmv_migrate_hash = cpu_to_le32(lo->ldo_dir_migrate_hash); lmm1->lmv_migrate_offset = cpu_to_le32(lo->ldo_dir_migrate_offset); @@ -1742,8 +1825,7 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo, RETURN(-EINVAL); LASSERT(lo->ldo_stripe == NULL); - OBD_ALLOC(stripe, sizeof(stripe[0]) * - (le32_to_cpu(lmv1->lmv_stripe_count))); + OBD_ALLOC_PTR_ARRAY(stripe, le32_to_cpu(lmv1->lmv_stripe_count)); if (stripe == NULL) RETURN(-ENOMEM); @@ -1787,6 +1869,8 @@ out: lo->ldo_dir_stripe_count = le32_to_cpu(lmv1->lmv_stripe_count); lo->ldo_dir_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count); lo->ldo_dir_layout_version = le32_to_cpu(lmv1->lmv_layout_version); + lo->ldo_dir_migrate_offset = le32_to_cpu(lmv1->lmv_migrate_offset); + lo->ldo_dir_migrate_hash = le32_to_cpu(lmv1->lmv_migrate_hash); lo->ldo_dir_hash_type = le32_to_cpu(lmv1->lmv_hash_type); if (rc != 0) lod_striping_free_nolock(env, lo); @@ -1858,31 +1942,74 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, if (!dto) continue; - rc = lod_sub_declare_create(env, dto, attr, NULL, dof, th); - if (rc != 0) - GOTO(out, rc); + /* directory split skip create for existing stripes */ + if 
(!(lod_is_splitting(lo) && i < lo->ldo_dir_split_offset)) { + rc = lod_sub_declare_create(env, dto, attr, NULL, dof, + th); + if (rc != 0) + GOTO(out, rc); - if (!dt_try_as_dir(env, dto)) - GOTO(out, rc = -EINVAL); + if (!dt_try_as_dir(env, dto)) + GOTO(out, rc = -EINVAL); - rc = lod_sub_declare_ref_add(env, dto, th); - if (rc != 0) - GOTO(out, rc); + rc = lod_sub_declare_ref_add(env, dto, th); + if (rc != 0) + GOTO(out, rc); - rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = lod_sub_declare_insert(env, dto, - (const struct dt_rec *)rec, - (const struct dt_key *)dot, th); - if (rc != 0) - GOTO(out, rc); + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dot, + th); + if (rc != 0) + GOTO(out, rc); - /* master stripe FID will be put to .. */ - rec->rec_fid = lu_object_fid(&dt->do_lu); - rc = lod_sub_declare_insert(env, dto, - (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, th); - if (rc != 0) - GOTO(out, rc); + /* master stripe FID will be put to .. 
*/ + rec->rec_fid = lu_object_fid(&dt->do_lu); + rc = lod_sub_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dotdot, + th); + if (rc != 0) + GOTO(out, rc); + + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && + cfs_fail_val == i) + snprintf(stripe_name, sizeof(info->lti_key), + DFID":%u", + PFID(lu_object_fid(&dto->do_lu)), + i + 1); + else + snprintf(stripe_name, sizeof(info->lti_key), + DFID":%u", + PFID(lu_object_fid(&dto->do_lu)), i); + + sname = lod_name_get(env, stripe_name, + strlen(stripe_name)); + rc = linkea_links_new(&ldata, &info->lti_linkea_buf, + sname, lu_object_fid(&dt->do_lu)); + if (rc != 0) + GOTO(out, rc); + + linkea_buf.lb_buf = ldata.ld_buf->lb_buf; + linkea_buf.lb_len = ldata.ld_leh->leh_len; + rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); + if (rc != 0) + GOTO(out, rc); + + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_declare_insert(env, dt_object_child(dt), + (const struct dt_rec *)rec, + (const struct dt_key *)stripe_name, th); + if (rc != 0) + GOTO(out, rc); + + rc = lod_sub_declare_ref_add(env, dt_object_child(dt), + th); + if (rc != 0) + GOTO(out, rc); + } if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || cfs_fail_val != i) { @@ -1898,39 +2025,6 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, if (rc != 0) GOTO(out, rc); } - - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && - cfs_fail_val == i) - snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", - PFID(lu_object_fid(&dto->do_lu)), i + 1); - else - snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", - PFID(lu_object_fid(&dto->do_lu)), i); - - sname = lod_name_get(env, stripe_name, strlen(stripe_name)); - rc = linkea_links_new(&ldata, &info->lti_linkea_buf, - sname, lu_object_fid(&dt->do_lu)); - if (rc != 0) - GOTO(out, rc); - - linkea_buf.lb_buf = ldata.ld_buf->lb_buf; - linkea_buf.lb_len = ldata.ld_leh->leh_len; - rc = lod_sub_declare_xattr_set(env, dto, 
&linkea_buf, - XATTR_NAME_LINK, 0, th); - if (rc != 0) - GOTO(out, rc); - - rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = lod_sub_declare_insert(env, dt_object_child(dt), - (const struct dt_rec *)rec, - (const struct dt_key *)stripe_name, - th); - if (rc != 0) - GOTO(out, rc); - - rc = lod_sub_declare_ref_add(env, dt_object_child(dt), th); - if (rc != 0) - GOTO(out, rc); } rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), @@ -1973,7 +2067,6 @@ static int lod_mdt_alloc_specific(const struct lu_env *env, struct dt_object **stripes, __u32 *mdt_indices, bool is_specific) { - struct lod_thread_info *info = lod_env_info(env); struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); struct lu_tgt_descs *ltd = &lod->lod_mdt_descs; struct lu_tgt_desc *tgt = NULL; @@ -1989,7 +2082,7 @@ static int lod_mdt_alloc_specific(const struct lu_env *env, int rc; master_index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id; - if (stripe_count > 1) + if (!is_specific && stripe_count > 1) /* Set the start index for the 2nd stripe allocation */ mdt_indices[1] = (mdt_indices[0] + 1) % (lod->lod_remote_mdt_count + 1); @@ -2022,17 +2115,17 @@ static int lod_mdt_alloc_specific(const struct lu_env *env, /* Sigh, this index is not in the bitmap, let's check * next available target */ - if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx) && + if (!test_bit(idx, ltd->ltd_tgt_bitmap) && idx != master_index) continue; if (idx == master_index) { /* Allocate the FID locally */ - rc = obd_fid_alloc(env, lod->lod_child_exp, - &fid, NULL); + tgt_dt = lod->lod_child; + rc = dt_fid_alloc(env, tgt_dt, &fid, NULL, + NULL); if (rc < 0) continue; - tgt_dt = lod->lod_child; break; } @@ -2042,12 +2135,11 @@ static int lod_mdt_alloc_specific(const struct lu_env *env, continue; tgt_dt = tgt->ltd_tgt; - rc = dt_statfs(env, tgt_dt, &info->lti_osfs); - if (rc) + if (!tgt->ltd_active) /* this OSP doesn't feel well */ continue; - rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL); + rc = 
dt_fid_alloc(env, tgt_dt, &fid, NULL, NULL); if (rc < 0) continue; @@ -2132,12 +2224,12 @@ static int lod_prep_md_striped_create(const struct lu_env *env, stripe_count = lo->ldo_dir_stripe_count; - OBD_ALLOC(stripes, sizeof(stripes[0]) * stripe_count); + OBD_ALLOC_PTR_ARRAY(stripes, stripe_count); if (!stripes) RETURN(-ENOMEM); /* Allocate the first stripe locally */ - rc = obd_fid_alloc(env, lod->lod_child_exp, &fid, NULL); + rc = dt_fid_alloc(env, lod->lod_child, &fid, NULL, NULL); if (rc < 0) GOTO(out, rc); @@ -2148,14 +2240,15 @@ static int lod_prep_md_striped_create(const struct lu_env *env, if (lo->ldo_dir_stripe_offset == LMV_OFFSET_DEFAULT) { lod_qos_statfs_update(env, lod, &lod->lod_mdt_descs); - rc = lod_mdt_alloc_qos(env, lo, stripes); + rc = lod_mdt_alloc_qos(env, lo, stripes, 1, stripe_count); if (rc == -EAGAIN) - rc = lod_mdt_alloc_rr(env, lo, stripes); + rc = lod_mdt_alloc_rr(env, lo, stripes, 1, + stripe_count); } else { int *idx_array; bool is_specific = false; - OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count); + OBD_ALLOC_PTR_ARRAY(idx_array, stripe_count); if (!idx_array) GOTO(out, rc = -ENOMEM); @@ -2171,7 +2264,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env, lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id; rc = lod_mdt_alloc_specific(env, lo, stripes, idx_array, is_specific); - OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count); + OBD_FREE_PTR_ARRAY(idx_array, stripe_count); } if (rc < 0) @@ -2198,7 +2291,7 @@ out: dt_object_put(env, stripes[0]); for (i = 1; i < stripe_count; i++) LASSERT(!stripes[i]); - OBD_FREE(stripes, sizeof(stripes[0]) * stripe_count); + OBD_FREE_PTR_ARRAY(stripes, stripe_count); return rc; } @@ -2305,6 +2398,7 @@ static int lod_dir_layout_set(const struct lu_env *env, { struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); + struct lod_device *lod = lu2lod_dev(lod2lu_obj(lo)->lo_dev); struct lmv_mds_md_v1 *lmv = buf->lb_buf; struct lmv_mds_md_v1 
*slave_lmv; struct lu_buf slave_buf; @@ -2313,10 +2407,29 @@ static int lod_dir_layout_set(const struct lu_env *env, ENTRY; + if (!lmv_is_sane2(lmv)) + RETURN(-EINVAL); + + /* adjust hash for dir merge, which may not be set in user command */ + if (lmv_is_merging(lmv) && !lmv->lmv_migrate_hash) + lmv->lmv_merge_hash = + lod->lod_mdt_descs.ltd_lmv_desc.ld_pattern; + + LMV_DEBUG(D_INFO, lmv, "set"); + rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LMV, fl, th); if (rc) RETURN(rc); + /* directory restripe may update stripe LMV directly */ + if (!lo->ldo_dir_stripe_count) + RETURN(0); + + lo->ldo_dir_hash_type = le32_to_cpu(lmv->lmv_hash_type); + lo->ldo_dir_migrate_offset = le32_to_cpu(lmv->lmv_migrate_offset); + lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_migrate_hash); + lo->ldo_dir_layout_version = le32_to_cpu(lmv->lmv_layout_version); + OBD_ALLOC_PTR(slave_lmv); if (!slave_lmv) RETURN(-ENOMEM); @@ -2338,7 +2451,6 @@ static int lod_dir_layout_set(const struct lu_env *env, break; } - lod_striping_free(env, lod_dt_obj(dt)); OBD_FREE_PTR(slave_lmv); RETURN(rc); @@ -2376,7 +2488,7 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env, if (rc != 0) RETURN(rc); } else if (strcmp(name, XATTR_NAME_LOV) == 0) { - rc = lod_verify_striping(d, lo, buf, false); + rc = lod_verify_striping(env, d, lo, buf, false); if (rc != 0) RETURN(rc); } @@ -2526,20 +2638,22 @@ static int lod_replace_parent_fid(const struct lu_env *env, RETURN(rc); } -inline __u16 lod_comp_entry_stripe_count(struct lod_object *lo, - struct lod_layout_component *entry, - bool is_dir) +__u16 lod_comp_entry_stripe_count(struct lod_object *lo, + int comp_idx, bool is_dir) { struct lod_device *lod = lu2lod_dev(lod2lu_obj(lo)->lo_dev); + struct lod_layout_component *entry; if (is_dir) return 0; - else if (lod_comp_inited(entry)) + + entry = &lo->ldo_comp_entries[comp_idx]; + if (lod_comp_inited(entry)) return entry->llc_stripe_count; else if ((__u16)-1 == entry->llc_stripe_count) return 
lod->lod_ost_count; else - return lod_get_stripe_count(lod, lo, + return lod_get_stripe_count(lod, lo, comp_idx, entry->llc_stripe_count, false); } @@ -2576,8 +2690,7 @@ static int lod_comp_md_size(struct lod_object *lo, bool is_dir) __u16 stripe_count; magic = comp_entries[i].llc_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1; - stripe_count = lod_comp_entry_stripe_count(lo, &comp_entries[i], - is_dir); + stripe_count = lod_comp_entry_stripe_count(lo, i, is_dir); if (!is_dir && is_composite) lod_comp_shrink_stripe_count(&comp_entries[i], &stripe_count); @@ -2623,7 +2736,7 @@ static int lod_declare_layout_add(const struct lu_env *env, if (lo->ldo_flr_state != LCM_FL_NONE) RETURN(-EBUSY); - rc = lod_verify_striping(d, lo, buf, false); + rc = lod_verify_striping(env, d, lo, buf, false); if (rc != 0) RETURN(rc); @@ -2636,10 +2749,15 @@ static int lod_declare_layout_add(const struct lu_env *env, if (magic != LOV_USER_MAGIC_COMP_V1) RETURN(-EINVAL); + mutex_lock(&lo->ldo_layout_mutex); + array_cnt = lo->ldo_comp_cnt + comp_v1->lcm_entry_count; - OBD_ALLOC(comp_array, sizeof(*comp_array) * array_cnt); - if (comp_array == NULL) + OBD_ALLOC_PTR_ARRAY(comp_array, array_cnt); + if (comp_array == NULL) { + mutex_unlock(&lo->ldo_layout_mutex); RETURN(-ENOMEM); + } + memcpy(comp_array, lo->ldo_comp_entries, sizeof(*comp_array) * lo->ldo_comp_cnt); @@ -2691,11 +2809,13 @@ static int lod_declare_layout_add(const struct lu_env *env, GOTO(error, rc); } - OBD_FREE(old_array, sizeof(*lod_comp) * old_array_cnt); + OBD_FREE_PTR_ARRAY(old_array, old_array_cnt); LASSERT(lo->ldo_mirror_count == 1); lo->ldo_mirrors[0].lme_end = array_cnt - 1; + mutex_unlock(&lo->ldo_layout_mutex); + RETURN(0); error: @@ -2707,7 +2827,9 @@ error: lod_comp->llc_pool = NULL; } } - OBD_FREE(comp_array, sizeof(*comp_array) * array_cnt); + OBD_FREE_PTR_ARRAY(comp_array, array_cnt); + mutex_unlock(&lo->ldo_layout_mutex); + RETURN(rc); } @@ -2803,6 +2925,7 @@ static int lod_declare_layout_set(const struct lu_env *env, 
RETURN(-EINVAL); } + mutex_lock(&lo->ldo_layout_mutex); for (i = 0; i < comp_v1->lcm_entry_count; i++) { __u32 id = comp_v1->lcm_entries[i].lcme_id; __u32 flags = comp_v1->lcm_entries[i].lcme_flags; @@ -2812,7 +2935,8 @@ static int lod_declare_layout_set(const struct lu_env *env, if (flags & LCME_FL_INIT) { if (changed) - lod_striping_free(env, lo); + lod_striping_free_nolock(env, lo); + mutex_unlock(&lo->ldo_layout_mutex); RETURN(-EINVAL); } @@ -2835,8 +2959,11 @@ static int lod_declare_layout_set(const struct lu_env *env, if (flags) { if ((flags & LCME_FL_STALE) && lod_last_non_stale_mirror(mirror_id, - lo)) + lo)) { + mutex_unlock( + &lo->ldo_layout_mutex); RETURN(-EUCLEAN); + } lod_comp->llc_flags |= flags; } if (mirror_flag) { @@ -2849,6 +2976,7 @@ static int lod_declare_layout_set(const struct lu_env *env, changed = true; } } + mutex_unlock(&lo->ldo_layout_mutex); if (!changed) { CDEBUG(D_LAYOUT, "%s: requested component(s) not found.\n", @@ -2931,9 +3059,13 @@ static int lod_declare_layout_del(const struct lu_env *env, flags = 0; } + mutex_lock(&lo->ldo_layout_mutex); + left = lo->ldo_comp_cnt; - if (left <= 0) + if (left <= 0) { + mutex_unlock(&lo->ldo_layout_mutex); RETURN(-EINVAL); + } for (i = (lo->ldo_comp_cnt - 1); i >= 0; i--) { struct lod_layout_component *lod_comp; @@ -2950,6 +3082,7 @@ static int lod_declare_layout_del(const struct lu_env *env, if (left != (i + 1)) { CDEBUG(D_LAYOUT, "%s: this deletion will create " "a hole.\n", lod2obd(d)->obd_name); + mutex_unlock(&lo->ldo_layout_mutex); RETURN(-EINVAL); } left--; @@ -2968,8 +3101,10 @@ static int lod_declare_layout_del(const struct lu_env *env, if (obj == NULL) continue; rc = lod_sub_declare_destroy(env, obj, th); - if (rc) + if (rc) { + mutex_unlock(&lo->ldo_layout_mutex); RETURN(rc); + } } } @@ -2977,9 +3112,12 @@ static int lod_declare_layout_del(const struct lu_env *env, if (left == lo->ldo_comp_cnt) { CDEBUG(D_LAYOUT, "%s: requested component id:%#x not found\n", lod2obd(d)->obd_name, id); 
+ mutex_unlock(&lo->ldo_layout_mutex); RETURN(-EINVAL); } + mutex_unlock(&lo->ldo_layout_mutex); + memset(attr, 0, sizeof(*attr)); attr->la_valid = LA_SIZE; rc = lod_sub_declare_attr_set(env, next, attr, th); @@ -3101,13 +3239,13 @@ static int lod_layout_convert(struct lod_thread_info *info) } lcm = info->lti_ea_store; + memset(lcm, 0, sizeof(*lcm) + sizeof(*lcme)); lcm->lcm_magic = cpu_to_le32(LOV_MAGIC_COMP_V1); lcm->lcm_size = cpu_to_le32(size); lcm->lcm_layout_gen = cpu_to_le32(le16_to_cpu( lmm_save->lmm_layout_gen)); lcm->lcm_flags = cpu_to_le16(LCM_FL_NONE); lcm->lcm_entry_count = cpu_to_le16(1); - lcm->lcm_mirror_count = 0; lcme = &lcm->lcm_entries[0]; lcme->lcme_flags = cpu_to_le32(LCME_FL_INIT); @@ -3298,18 +3436,209 @@ static int lod_declare_layout_split(const struct lu_env *env, int rc; ENTRY; - lod_obj_inc_layout_gen(lo); - lcm->lcm_layout_gen = cpu_to_le32(lo->ldo_layout_gen); - rc = lod_striping_reload(env, lo, mbuf); if (rc) RETURN(rc); + lod_obj_inc_layout_gen(lo); + /* fix on-disk layout gen */ + lcm->lcm_layout_gen = cpu_to_le32(lo->ldo_layout_gen); + rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), mbuf, XATTR_NAME_LOV, LU_XATTR_REPLACE, th); RETURN(rc); } +static int lod_layout_declare_or_purge_mirror(const struct lu_env *env, + struct dt_object *dt, const struct lu_buf *buf, + struct thandle *th, bool declare) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_object *lo = lod_dt_obj(dt); + struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; + struct lov_comp_md_entry_v1 *entry; + struct lov_mds_md_v1 *lmm; + struct dt_object **sub_objs = NULL; + int rc = 0, i, k, array_count = 0; + + ENTRY; + + /** + * other ops (like lod_declare_destroy) could destroying sub objects + * as well. 
+ */ + mutex_lock(&lo->ldo_layout_mutex); + + if (!declare) { + /* prepare sub-objects array */ + for (i = 0; i < comp_v1->lcm_entry_count; i++) { + entry = &comp_v1->lcm_entries[i]; + + if (!(entry->lcme_flags & LCME_FL_INIT)) + continue; + + lmm = (struct lov_mds_md_v1 *) + ((char *)comp_v1 + entry->lcme_offset); + array_count += lmm->lmm_stripe_count; + } + OBD_ALLOC_PTR_ARRAY(sub_objs, array_count); + if (sub_objs == NULL) { + mutex_unlock(&lo->ldo_layout_mutex); + RETURN(-ENOMEM); + } + } + + k = 0; /* sub_objs index */ + for (i = 0; i < comp_v1->lcm_entry_count; i++) { + struct lov_ost_data_v1 *objs; + struct lu_object *o, *n; + struct dt_object *dto; + struct lu_device *nd; + struct lov_mds_md_v3 *v3; + __u32 idx; + int j; + + entry = &comp_v1->lcm_entries[i]; + + if (!(entry->lcme_flags & LCME_FL_INIT)) + continue; + + lmm = (struct lov_mds_md_v1 *) + ((char *)comp_v1 + entry->lcme_offset); + v3 = (struct lov_mds_md_v3 *)lmm; + if (lmm->lmm_magic == LOV_MAGIC_V3) + objs = &v3->lmm_objects[0]; + else + objs = &lmm->lmm_objects[0]; + + for (j = 0; j < lmm->lmm_stripe_count; j++) { + idx = objs[j].l_ost_idx; + rc = ostid_to_fid(&info->lti_fid, &objs[j].l_ost_oi, + idx); + if (rc) + GOTO(out, rc); + + if (!fid_is_sane(&info->lti_fid)) { + CERROR("%s: sub-object insane fid "DFID"\n", + lod2obd(d)->obd_name, + PFID(&info->lti_fid)); + GOTO(out, rc = -EINVAL); + } + + lod_getref(&d->lod_ost_descs); + + rc = validate_lod_and_idx(d, idx); + if (unlikely(rc)) { + lod_putref(d, &d->lod_ost_descs); + GOTO(out, rc); + } + + nd = &OST_TGT(d, idx)->ltd_tgt->dd_lu_dev; + lod_putref(d, &d->lod_ost_descs); + + o = lu_object_find_at(env, nd, &info->lti_fid, NULL); + if (IS_ERR(o)) + GOTO(out, rc = PTR_ERR(o)); + + n = lu_object_locate(o->lo_header, nd->ld_type); + if (unlikely(!n)) { + lu_object_put(env, n); + GOTO(out, rc = -ENOENT); + } + + dto = container_of(n, struct dt_object, do_lu); + + if (declare) { + rc = lod_sub_declare_destroy(env, dto, th); + dt_object_put(env, 
dto); + if (rc) + GOTO(out, rc); + } else { + /** + * collect to-be-destroyed sub objects, the + * reference would be released after actual + * deletion. + */ + sub_objs[k] = dto; + k++; + } + } /* for each stripe */ + } /* for each component in the mirror */ +out: + if (!declare) { + i = 0; + if (!rc) { + /* destroy the sub objects */ + for (; i < k; i++) { + rc = lod_sub_destroy(env, sub_objs[i], th); + if (rc) + break; + dt_object_put(env, sub_objs[i]); + } + } + /** + * if a sub object destroy failed, we'd release sub objects + * reference get from above sub_objs collection. + */ + for (; i < k; i++) + dt_object_put(env, sub_objs[i]); + + OBD_FREE_PTR_ARRAY(sub_objs, array_count); + } + mutex_unlock(&lo->ldo_layout_mutex); + + RETURN(rc); +} + +/** + * Purge layouts, delete sub objects in the mirror stored in the vic_buf, + * and set the LOVEA with the layout from mbuf. + */ +static int lod_declare_layout_purge(const struct lu_env *env, + struct dt_object *dt, const struct lu_buf *buf, + struct thandle *th) +{ + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; + int rc; + + ENTRY; + + if (le32_to_cpu(comp_v1->lcm_magic) != LOV_MAGIC_COMP_V1) { + CERROR("%s: invalid layout magic %#x != %#x\n", + lod2obd(d)->obd_name, le32_to_cpu(comp_v1->lcm_magic), + LOV_MAGIC_COMP_V1); + RETURN(-EINVAL); + } + + if (cpu_to_le32(LOV_MAGIC_COMP_V1) != LOV_MAGIC_COMP_V1) + lustre_swab_lov_comp_md_v1(comp_v1); + + /* from now on, @buf contains cpu endian data */ + + if (comp_v1->lcm_mirror_count != 0) { + CERROR("%s: can only purge one mirror from "DFID"\n", + lod2obd(d)->obd_name, PFID(lu_object_fid(&dt->do_lu))); + RETURN(-EINVAL); + } + + /* delcare sub objects deletion in the mirror stored in @buf */ + rc = lod_layout_declare_or_purge_mirror(env, dt, buf, th, true); + RETURN(rc); +} + +/* delete sub objects from the mirror stored in @buf */ +static int lod_layout_purge(const struct lu_env *env, struct dt_object *dt, + const 
struct lu_buf *buf, struct thandle *th) +{ + int rc; + + ENTRY; + rc = lod_layout_declare_or_purge_mirror(env, dt, buf, th, false); + RETURN(rc); +} + /** * Implementation of dt_object_operations::do_declare_xattr_set. * @@ -3334,7 +3663,8 @@ static int lod_declare_xattr_set(const struct lu_env *env, mode = dt->do_lu.lo_header->loh_attr & S_IFMT; if ((S_ISREG(mode) || mode == 0) && - !(fl & (LU_XATTR_REPLACE | LU_XATTR_MERGE | LU_XATTR_SPLIT)) && + !(fl & (LU_XATTR_REPLACE | LU_XATTR_MERGE | LU_XATTR_SPLIT | + LU_XATTR_PURGE)) && (strcmp(name, XATTR_NAME_LOV) == 0 || strcmp(name, XATTR_LUSTRE_LOV) == 0)) { /* @@ -3364,6 +3694,10 @@ static int lod_declare_xattr_set(const struct lu_env *env, LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 || strcmp(name, XATTR_LUSTRE_LOV) == 0); rc = lod_declare_layout_split(env, dt, buf, th); + } else if (fl & LU_XATTR_PURGE) { + LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 || + strcmp(name, XATTR_LUSTRE_LOV) == 0); + rc = lod_declare_layout_purge(env, dt, buf, th); } else if (S_ISREG(mode) && strlen(name) >= sizeof(XATTR_LUSTRE_LOV) + 3 && allowed_lustre_lov(name)) { @@ -3457,10 +3791,11 @@ static int lod_xattr_del_internal(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th) { - struct dt_object *next = dt_object_child(dt); - struct lod_object *lo = lod_dt_obj(dt); - int rc; - int i; + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + int i; + int rc; + ENTRY; rc = lod_sub_xattr_del(env, next, name, th); @@ -3471,7 +3806,11 @@ static int lod_xattr_del_internal(const struct lu_env *env, RETURN(rc); for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - LASSERT(lo->ldo_stripe[i]); + if (!lo->ldo_stripe[i]) + continue; + + if (!dt_object_exists(lo->ldo_stripe[i])) + continue; rc = lod_sub_xattr_del(env, lo->ldo_stripe[i], name, th); if (rc != 0) @@ -3680,7 +4019,7 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, if (rc != 0) RETURN(rc); - 
attr->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | + attr->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_FLAGS | LA_MODE | LA_UID | LA_GID | LA_TYPE | LA_PROJID; dof->dof_type = DFT_DIR; @@ -3713,9 +4052,15 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, if (i && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_CREATE)) continue; - /* if it's source stripe of migrating directory, don't create */ - if (!((lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) && - i >= lo->ldo_dir_migrate_offset)) { + /* don't create stripe if: + * 1. it's a source stripe of a migrating directory + * 2. it's an existing stripe of a splitting directory + */ + if ((lod_is_migrating(lo) && i >= lo->ldo_dir_migrate_offset) || + (lod_is_splitting(lo) && i < lo->ldo_dir_split_offset)) { + if (!dt_object_exists(dto)) + GOTO(out, rc = -EINVAL); + } else { dt_write_lock(env, dto, DT_TGT_CHILD); rc = lod_sub_create(env, dto, attr, NULL, dof, th); if (rc != 0) { @@ -3736,12 +4081,6 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, GOTO(out, rc); } - rec->rec_fid = lu_object_fid(&dt->do_lu); - rc = lod_sub_insert(env, dto, (struct dt_rec *)rec, - (const struct dt_key *)dotdot, th); - if (rc != 0) - GOTO(out, rc); - if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || cfs_fail_val != i) { if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && @@ -3758,6 +4097,21 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, GOTO(out, rc); } + /* don't insert stripe if it's an existing stripe of a splitting + * directory (this directory is striped). + * NB, plain directory will insert itself as the first + * stripe in target.
+ */ + if (lod_is_splitting(lo) && lo->ldo_dir_split_offset > 1 && + lo->ldo_dir_split_offset > i) + continue; + + rec->rec_fid = lu_object_fid(&dt->do_lu); + rc = lod_sub_insert(env, dto, (struct dt_rec *)rec, + (const struct dt_key *)dotdot, th); + if (rc != 0) + GOTO(out, rc); + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && cfs_fail_val == i) snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", @@ -4018,7 +4372,7 @@ static int lod_generate_and_set_lovea(const struct lu_env *env, LASSERT(lo); if (lo->ldo_comp_cnt == 0 && !lo->ldo_is_foreign) { - lod_striping_free(env, lo); + lod_striping_free_nolock(env, lo); rc = lod_sub_xattr_del(env, next, XATTR_NAME_LOV, th); RETURN(rc); } @@ -4081,7 +4435,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env, CDEBUG(D_LAYOUT, "repeating component %d\n", index); - OBD_ALLOC(comp_array, sizeof(*comp_array) * new_cnt); + OBD_ALLOC_PTR_ARRAY(comp_array, new_cnt); if (comp_array == NULL) GOTO(out, rc = -ENOMEM); @@ -4103,6 +4457,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env, new_comp->llc_flags &= ~LCME_FL_INIT; new_comp->llc_stripe = NULL; new_comp->llc_stripes_allocated = 0; + new_comp->llc_ost_indices = NULL; new_comp->llc_stripe_offset = LOV_OFFSET_DEFAULT; /* for uninstantiated components, layout gen stores default stripe * offset */ @@ -4128,8 +4483,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env, new_comp->llc_ostlist.op_array = op_array; } - OBD_FREE(lo->ldo_comp_entries, - sizeof(*comp_array) * lo->ldo_comp_cnt); + OBD_FREE_PTR_ARRAY(lo->ldo_comp_entries, lo->ldo_comp_cnt); lo->ldo_comp_entries = comp_array; lo->ldo_comp_cnt = new_cnt; @@ -4143,7 +4497,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env, EXIT; out: if (rc) - OBD_FREE(comp_array, sizeof(*comp_array) * new_cnt); + OBD_FREE_PTR_ARRAY(comp_array, new_cnt); return rc; } @@ -4160,12 +4514,11 @@ static int lod_layout_data_init(struct lod_thread_info *info, __u32 comp_cnt) RETURN(0); if 
(info->lti_comp_size > 0) { - OBD_FREE(info->lti_comp_idx, - info->lti_comp_size * sizeof(__u32)); + OBD_FREE_PTR_ARRAY(info->lti_comp_idx, info->lti_comp_size); info->lti_comp_size = 0; } - OBD_ALLOC(info->lti_comp_idx, comp_cnt * sizeof(__u32)); + OBD_ALLOC_PTR_ARRAY(info->lti_comp_idx, comp_cnt); if (!info->lti_comp_idx) RETURN(-ENOMEM); @@ -4252,11 +4605,11 @@ static int lod_layout_del_prep_layout(const struct lu_env *env, lu_object_put(env, &obj->do_lu); lod_comp->llc_stripe[j] = NULL; } - OBD_FREE(lod_comp->llc_stripe, sizeof(*lod_comp->llc_stripe) * - lod_comp->llc_stripes_allocated); + OBD_FREE_PTR_ARRAY(lod_comp->llc_stripe, + lod_comp->llc_stripes_allocated); lod_comp->llc_stripe = NULL; - OBD_FREE(lod_comp->llc_ost_indices, - sizeof(__u32) * lod_comp->llc_stripes_allocated); + OBD_FREE_PTR_ARRAY(lod_comp->llc_ost_indices, + lod_comp->llc_stripes_allocated); lod_comp->llc_ost_indices = NULL; lod_comp->llc_stripes_allocated = 0; } @@ -4269,7 +4622,7 @@ static int lod_layout_del_prep_layout(const struct lu_env *env, if (info->lti_count > 0) { struct lod_layout_component *comp_array; - OBD_ALLOC(comp_array, sizeof(*comp_array) * info->lti_count); + OBD_ALLOC_PTR_ARRAY(comp_array, info->lti_count); if (comp_array == NULL) GOTO(out, rc = -ENOMEM); @@ -4279,8 +4632,7 @@ static int lod_layout_del_prep_layout(const struct lu_env *env, sizeof(*comp_array)); } - OBD_FREE(lo->ldo_comp_entries, - sizeof(*comp_array) * lo->ldo_comp_cnt); + OBD_FREE_PTR_ARRAY(lo->ldo_comp_entries, lo->ldo_comp_cnt); lo->ldo_comp_entries = comp_array; lo->ldo_comp_cnt = info->lti_count; } else { @@ -4316,6 +4668,8 @@ static int lod_layout_del(const struct lu_env *env, struct dt_object *dt, LASSERT(lo->ldo_mirror_count == 1); + mutex_lock(&lo->ldo_layout_mutex); + rc = lod_layout_del_prep_layout(env, lo, th); if (rc < 0) GOTO(out, rc); @@ -4343,7 +4697,10 @@ static int lod_layout_del(const struct lu_env *env, struct dt_object *dt, EXIT; out: if (rc) - lod_striping_free(env, lo); + 
lod_striping_free_nolock(env, lo); + + mutex_unlock(&lo->ldo_layout_mutex); + return rc; } @@ -4469,6 +4826,8 @@ static int lod_xattr_set(const struct lu_env *env, lod_striping_free(env, lod_dt_obj(dt)); rc = lod_sub_xattr_set(env, next, buf, name, fl, th); + } else if (fl & LU_XATTR_PURGE) { + rc = lod_layout_purge(env, dt, buf, th); } else if (dt_object_remote(dt)) { /* This only happens during migration, see * mdd_migrate_create(), in which Master MDT will @@ -4554,6 +4913,9 @@ static int lod_declare_xattr_del(const struct lu_env *env, if (!dto) continue; + if (!dt_object_exists(dto)) + continue; + rc = lod_sub_declare_xattr_del(env, dto, name, th); if (rc != 0) break; @@ -4573,35 +4935,14 @@ static int lod_declare_xattr_del(const struct lu_env *env, static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th) { - struct dt_object *next = dt_object_child(dt); - struct lod_object *lo = lod_dt_obj(dt); - int rc; - int i; + int rc; + ENTRY; if (!strcmp(name, XATTR_NAME_LOV) || !strcmp(name, XATTR_NAME_LMV)) lod_striping_free(env, lod_dt_obj(dt)); - rc = lod_sub_xattr_del(env, next, name, th); - if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) - RETURN(rc); - - if (!strcmp(name, XATTR_NAME_LMV)) - RETURN(0); - - if (lo->ldo_dir_stripe_count == 0) - RETURN(0); - - for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - struct dt_object *dto = lo->ldo_stripe[i]; - - if (!dto) - continue; - - rc = lod_sub_xattr_del(env, dto, name, th); - if (rc != 0) - break; - } + rc = lod_xattr_del_internal(env, dt, name, th); RETURN(rc); } @@ -5235,7 +5576,6 @@ out: EXIT; } -#define ll_do_div64(aaa,bbb) do_div((aaa), (bbb)) /** * Size initialization on late striping. 
* @@ -5299,14 +5639,13 @@ static int lod_declare_init_size(const struct lu_env *env, continue; LASSERT(objects != NULL && stripe_size != 0); - /* ll_do_div64(a, b) returns a % b, and a = a / b */ - ll_do_div64(size, (__u64)stripe_size); - stripe = ll_do_div64(size, (__u64)stripe_count); + do_div(size, stripe_size); + stripe = do_div(size, stripe_count); LASSERT(objects[stripe] != NULL); size = size * stripe_size; offs = attr->la_size; - size += ll_do_div64(offs, stripe_size); + size += do_div(offs, stripe_size); attr->la_valid = LA_SIZE; attr->la_size = size; @@ -5631,6 +5970,8 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt, int rc = 0, i, j; ENTRY; + mutex_lock(&lo->ldo_layout_mutex); + LASSERT((lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL) || lo->ldo_is_foreign); @@ -5689,15 +6030,20 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt, if (rc) GOTO(out, rc); + lo->ldo_comp_cached = 1; + rc = lod_generate_and_set_lovea(env, lo, th); if (rc) GOTO(out, rc); - lo->ldo_comp_cached = 1; + mutex_unlock(&lo->ldo_layout_mutex); + RETURN(0); out: - lod_striping_free(env, lo); + lod_striping_free_nolock(env, lo); + mutex_unlock(&lo->ldo_layout_mutex); + RETURN(rc); } @@ -5757,11 +6103,12 @@ lod_obj_stripe_destroy_cb(const struct lu_env *env, struct lod_object *lo, { if (data->locd_declare) return lod_sub_declare_destroy(env, dt, th); - else if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || - stripe_idx == cfs_fail_val) + + if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || + stripe_idx == cfs_fail_val) return lod_sub_destroy(env, dt, th); - else - return 0; + + return 0; } /** @@ -6175,7 +6522,9 @@ static int lod_invalidate(const struct lu_env *env, struct dt_object *dt) } static int lod_declare_instantiate_components(const struct lu_env *env, - struct lod_object *lo, struct thandle *th) + struct lod_object *lo, + struct thandle *th, + __u64 reserve) { struct lod_thread_info *info = lod_env_info(env); int i; @@ 
-6186,7 +6535,7 @@ static int lod_declare_instantiate_components(const struct lu_env *env, for (i = 0; i < info->lti_count; i++) { rc = lod_qos_prep_create(env, lo, NULL, th, - info->lti_comp_idx[i]); + info->lti_comp_idx[i], reserve); if (rc) break; } @@ -6222,15 +6571,15 @@ static int lod_declare_instantiate_components(const struct lu_env *env, */ static bool lod_sel_osts_allowed(const struct lu_env *env, struct lod_object *lo, - int index, __u64 extension_size, + int index, __u64 reserve, struct lu_extent *extent, struct lu_extent *comp_extent, int write) { struct lod_layout_component *lod_comp = &lo->ldo_comp_entries[index]; struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); - struct obd_statfs *sfs = &lod_env_info(env)->lti_osfs; + struct lod_thread_info *tinfo = lod_env_info(env); + struct obd_statfs *sfs = &tinfo->lti_osfs; __u64 available = 0; - __u64 size; bool ret = true; int i, rc; @@ -6238,21 +6587,6 @@ static bool lod_sel_osts_allowed(const struct lu_env *env, LASSERT(lod_comp->llc_stripe_count != 0); - if (write == 0 || - (extent->e_start == 0 && extent->e_end == OBD_OBJECT_EOF)) { - /* truncate or append */ - size = extension_size; - } else { - /* In case of write op, check the real write extent, - * it may be larger than the extension_size */ - size = roundup(min(extent->e_end, comp_extent->e_end) - - max(extent->e_start, comp_extent->e_start), - extension_size); - } - /* extension_size is file level, so we must divide by stripe count to - * compare it to available space on a single OST */ - size /= lod_comp->llc_stripe_count; - lod_getref(&lod->lod_ost_descs); for (i = 0; i < lod_comp->llc_stripe_count; i++) { int index = lod_comp->llc_ost_indices[i]; @@ -6279,7 +6613,7 @@ static bool lod_sel_osts_allowed(const struct lu_env *env, if (j < lod_comp->llc_stripe_count) continue; - if (!cfs_bitmap_check(lod->lod_ost_bitmap, index)) { + if (!test_bit(index, lod->lod_ost_bitmap)) { CDEBUG(D_LAYOUT, "ost %d no longer present\n", index); ret = 
false; break; @@ -6293,9 +6627,9 @@ static bool lod_sel_osts_allowed(const struct lu_env *env, break; } - if (sfs->os_state & OS_STATE_ENOSPC || - sfs->os_state & OS_STATE_READONLY || - sfs->os_state & OS_STATE_DEGRADED) { + if (sfs->os_state & OS_STATFS_ENOSPC || + sfs->os_state & OS_STATFS_READONLY || + sfs->os_state & OS_STATFS_DEGRADED) { CDEBUG(D_LAYOUT, "ost %d is not availble for SEL " "extension, state %u\n", index, sfs->os_state); ret = false; @@ -6313,11 +6647,11 @@ static bool lod_sel_osts_allowed(const struct lu_env *env, (100ull * sfs->os_bavail) / sfs->os_blocks, (100ull * sfs->os_bfree) / sfs->os_blocks); - if (size * repeated > available) { + if (reserve * repeated > available) { ret = false; CDEBUG(D_LAYOUT, "low space on ost %d, available %llu " - "< extension size %llu\n", index, available, - extension_size); + "< extension size %llu repeated %d\n", index, + available, reserve, repeated); break; } } @@ -6423,6 +6757,26 @@ static __u64 lod_extension_new_end(__u64 extension_size, __u64 extent_end, return new_end; } +/** + * Calculate the exact reservation (per-OST extension_size) on the OSTs being + * instantiated. It needs to be calculated in advance and taken into account at + * the instantiation time, because otherwise lod_statfs_and_check() may consider + * an OST as OK, but SEL needs its extension_size to fit the free space and the + * OST may turn out to be low-on-space, thus an inappropriate OST may be used and + * ENOSPC occurs. + * + * \param[in] lod_comp lod component we are checking + * + * \retval size to be reserved on each OST of lod_comp's stripe.
+ */ +static __u64 lod_sel_stripe_reserved(struct lod_layout_component *lod_comp) +{ + /* extension_size is file level, so we must divide by stripe count to + * compare it to available space on a single OST */ + return lod_comp->llc_stripe_size * SEL_UNIT_SIZE / + lod_comp->llc_stripe_count; +} + /* As lod_sel_handler() could be re-entered for the same component several * times, this is the data for the next call. Fields could be changed to * component indexes when needed, (e.g. if there is no need to instantiate @@ -6504,7 +6858,7 @@ static int lod_sel_handler(const struct lu_env *env, struct lod_layout_component *lod_comp; struct lod_layout_component *prev; struct lod_layout_component *next = NULL; - __u64 extension_size; + __u64 extension_size, reserve; __u64 new_end = 0; bool repeated; int change = 0; @@ -6541,11 +6895,13 @@ static int lod_sel_handler(const struct lu_env *env, RETURN(-EINVAL); } + reserve = lod_sel_stripe_reserved(lod_comp); + if (!prev->llc_stripe) { CDEBUG(D_LAYOUT, "Previous component not inited\n"); info->lti_count = 1; info->lti_comp_idx[0] = index - 1; - rc = lod_declare_instantiate_components(env, lo, th); + rc = lod_declare_instantiate_components(env, lo, th, reserve); /* ENOSPC tells us we can't use this component. If there is * a next or we are repeating, we either spill over (next) or * extend the original comp (repeat). 
Otherwise, return the @@ -6557,8 +6913,7 @@ static int lod_sel_handler(const struct lu_env *env, } if (sd->sd_force == 0 && rc == 0) - rc = !lod_sel_osts_allowed(env, lo, index - 1, - extension_size, extent, + rc = !lod_sel_osts_allowed(env, lo, index - 1, reserve, extent, &lod_comp->llc_extent, write); repeated = !!(sd->sd_repeat); @@ -6872,7 +7227,7 @@ static int lod_declare_update_plain(const struct lu_env *env, RETURN(-EALREADY); lod_obj_inc_layout_gen(lo); - rc = lod_declare_instantiate_components(env, lo, th); + rc = lod_declare_instantiate_components(env, lo, th, 0); EXIT; out: if (rc) @@ -6982,8 +7337,8 @@ static inline int lod_check_ost_avail(const struct lu_env *env, ost = OST_TGT(lod, idx); if (ost->ltd_statfs.os_state & - (OS_STATE_READONLY | OS_STATE_ENOSPC | OS_STATE_ENOINO | - OS_STATE_NOPRECREATE) || + (OS_STATFS_READONLY | OS_STATFS_ENOSPC | OS_STATFS_ENOINO | + OS_STATFS_NOPRECREATE) || ost->ltd_active == 0) { CDEBUG(D_LAYOUT, DFID ": mirror %d OST%d unavail, rc = %d\n", PFID(lod_object_fid(lo)), index, idx, rc); @@ -7032,7 +7387,7 @@ static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, } /* 2nd pick is for the primary mirror containing unavail OST */ - if (lo->ldo_mirrors[index].lme_primary && second_pick < 0) + if (lo->ldo_mirrors[index].lme_prefer && second_pick < 0) second_pick = index; /* 3rd pick is for non-primary mirror containing unavail OST */ @@ -7043,7 +7398,7 @@ static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, * we found a non-primary 1st pick, we'd like to find a * potential pirmary mirror. */ - if (picked >= 0 && !lo->ldo_mirrors[index].lme_primary) + if (picked >= 0 && !lo->ldo_mirrors[index].lme_prefer) continue; /* check the availability of OSTs */ @@ -7080,7 +7435,7 @@ static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, * primary with all OSTs are available, this is the perfect * 1st pick. 
*/ - if (lo->ldo_mirrors[index].lme_primary) + if (lo->ldo_mirrors[index].lme_prefer) break; } /* for all mirrors */ @@ -7289,7 +7644,7 @@ static int lod_declare_update_rdonly(const struct lu_env *env, lo->ldo_layout_gen = layout_version & 0xffff; } - rc = lod_declare_instantiate_components(env, lo, th); + rc = lod_declare_instantiate_components(env, lo, th, 0); if (rc) GOTO(out, rc); @@ -7324,22 +7679,29 @@ static int lod_declare_update_write_pending(const struct lu_env *env, LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE || mlc->mlc_opc == MD_LAYOUT_RESYNC); - /* look for the primary mirror */ + /* look for the first preferred mirror */ for (i = 0; i < lo->ldo_mirror_count; i++) { if (lo->ldo_mirrors[i].lme_stale) continue; - - LASSERTF(primary < 0, DFID " has multiple primary: %u / %u", - PFID(lod_object_fid(lo)), - lo->ldo_mirrors[i].lme_id, - lo->ldo_mirrors[primary].lme_id); + if (lo->ldo_mirrors[i].lme_prefer == 0) + continue; primary = i; + break; } if (primary < 0) { - CERROR(DFID ": doesn't have a primary mirror\n", - PFID(lod_object_fid(lo))); - GOTO(out, rc = -ENODATA); + /* no primary, use any in-sync */ + for (i = 0; i < lo->ldo_mirror_count; i++) { + if (lo->ldo_mirrors[i].lme_stale) + continue; + primary = i; + break; + } + if (primary < 0) { + CERROR(DFID ": doesn't have a primary mirror\n", + PFID(lod_object_fid(lo))); + GOTO(out, rc = -ENODATA); + } } CDEBUG(D_LAYOUT, DFID": found primary %u\n", @@ -7433,7 +7795,7 @@ static int lod_declare_update_write_pending(const struct lu_env *env, lo->ldo_flr_state = LCM_FL_SYNC_PENDING; } - rc = lod_declare_instantiate_components(env, lo, th); + rc = lod_declare_instantiate_components(env, lo, th, 0); if (rc) GOTO(out, rc); @@ -7600,8 +7962,7 @@ static int lod_dir_declare_layout_attach(const struct lu_env *env, dof->dof_type = DFT_DIR; - OBD_ALLOC(stripes, - sizeof(*stripes) * (lo->ldo_dir_stripe_count + stripe_count)); + OBD_ALLOC_PTR_ARRAY(stripes, (lo->ldo_dir_stripe_count + stripe_count)); if (!stripes) 
RETURN(-ENOMEM); @@ -7692,14 +8053,19 @@ static int lod_dir_declare_layout_attach(const struct lu_env *env, } if (lo->ldo_stripe) - OBD_FREE(lo->ldo_stripe, - sizeof(*stripes) * lo->ldo_dir_stripes_allocated); + OBD_FREE_PTR_ARRAY(lo->ldo_stripe, + lo->ldo_dir_stripes_allocated); lo->ldo_stripe = stripes; lo->ldo_dir_migrate_offset = lo->ldo_dir_stripe_count; lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_hash_type); lo->ldo_dir_stripe_count += stripe_count; lo->ldo_dir_stripes_allocated += stripe_count; - lo->ldo_dir_hash_type |= LMV_HASH_FLAG_MIGRATION; + + /* plain directory split creates target as a plain directory, while + * after source attached as the first stripe, it becomes a striped + * directory, set correct do_index_ops, otherwise it can't be unlinked. + */ + dt->do_index_ops = &lod_striped_index_ops; RETURN(0); out: @@ -7707,8 +8073,7 @@ out: while (i < lo->ldo_dir_stripe_count + stripe_count && stripes[i]) dt_object_put(env, stripes[i++]); - OBD_FREE(stripes, - sizeof(*stripes) * (stripe_count + lo->ldo_dir_stripe_count)); + OBD_FREE_PTR_ARRAY(stripes, stripe_count + lo->ldo_dir_stripe_count); return rc; } @@ -7878,6 +8243,86 @@ static int lod_dir_declare_layout_shrink(const struct lu_env *env, return rc; } +/** + * Allocate stripes for split directory. 
+ * + * \param[in] env execution environment + * \param[in] dt target object + * \param[in] mlc layout change data + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_dir_declare_layout_split(const struct lu_env *env, + struct dt_object *dt, + const struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object_format *dof = &info->lti_format; + struct lmv_user_md_v1 *lum = mlc->mlc_spec->u.sp_ea.eadata; + struct dt_object **stripes; + u32 stripe_count; + u32 saved_count; + int i; + int rc; + + ENTRY; + + LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC); + LASSERT(le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT); + + saved_count = lo->ldo_dir_stripes_allocated; + stripe_count = le32_to_cpu(lum->lum_stripe_count); + if (stripe_count <= saved_count) + RETURN(-EINVAL); + + dof->dof_type = DFT_DIR; + + OBD_ALLOC(stripes, sizeof(*stripes) * stripe_count); + if (!stripes) + RETURN(-ENOMEM); + + for (i = 0; i < lo->ldo_dir_stripes_allocated; i++) + stripes[i] = lo->ldo_stripe[i]; + + lod_qos_statfs_update(env, lod, &lod->lod_mdt_descs); + rc = lod_mdt_alloc_qos(env, lo, stripes, saved_count, stripe_count); + if (rc == -EAGAIN) + rc = lod_mdt_alloc_rr(env, lo, stripes, saved_count, + stripe_count); + if (rc < 0) { + OBD_FREE(stripes, sizeof(*stripes) * stripe_count); + RETURN(rc); + } + + LASSERT(rc > saved_count); + OBD_FREE(lo->ldo_stripe, + sizeof(*stripes) * lo->ldo_dir_stripes_allocated); + lo->ldo_stripe = stripes; + lo->ldo_dir_striped = 1; + lo->ldo_dir_stripe_count = rc; + lo->ldo_dir_stripes_allocated = stripe_count; + lo->ldo_dir_split_hash = lo->ldo_dir_hash_type; + lo->ldo_dir_hash_type = le32_to_cpu(lum->lum_hash_type); + if (!lmv_is_known_hash_type(lo->ldo_dir_hash_type)) + lo->ldo_dir_hash_type = + 
lod->lod_mdt_descs.ltd_lmv_desc.ld_pattern; + lo->ldo_dir_hash_type |= LMV_HASH_FLAG_SPLIT | LMV_HASH_FLAG_MIGRATION; + lo->ldo_dir_split_offset = saved_count; + lo->ldo_dir_layout_version++; + lo->ldo_dir_stripe_loaded = 1; + + rc = lod_dir_declare_create_stripes(env, dt, mlc->mlc_attr, dof, th); + if (rc) + lod_striping_free(env, lo); + + RETURN(rc); +} + /* * detach all stripes from dir master object, NB, stripes are not destroyed, but * deleted from it's parent namespace, this function is called in two places: @@ -7944,11 +8389,11 @@ static int lod_dir_layout_detach(const struct lu_env *env, if (dto) dt_object_put(env, dto); } - OBD_FREE(lo->ldo_stripe, - sizeof(struct dt_object *) * lo->ldo_dir_stripes_allocated); + OBD_FREE_PTR_ARRAY(lo->ldo_stripe, lo->ldo_dir_stripes_allocated); lo->ldo_stripe = NULL; lo->ldo_dir_stripes_allocated = 0; lo->ldo_dir_stripe_count = 0; + dt->do_index_ops = &lod_index_ops; RETURN(rc); } @@ -8065,6 +8510,7 @@ static mlc_handler dir_mlc_declare_ops[MD_LAYOUT_MAX] = { [MD_LAYOUT_ATTACH] = lod_dir_declare_layout_attach, [MD_LAYOUT_DETACH] = lod_dir_declare_layout_detach, [MD_LAYOUT_SHRINK] = lod_dir_declare_layout_shrink, + [MD_LAYOUT_SPLIT] = lod_dir_declare_layout_split, }; static mlc_handler dir_mlc_ops[MD_LAYOUT_MAX] = { @@ -8152,7 +8598,7 @@ static int lod_layout_change(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } -struct dt_object_operations lod_obj_ops = { +const struct dt_object_operations lod_obj_ops = { .do_read_lock = lod_read_lock, .do_write_lock = lod_write_lock, .do_read_unlock = lod_read_unlock, @@ -8253,7 +8699,7 @@ static int lod_punch(const struct lu_env *env, struct dt_object *dt, * body_ops themselves will check file type inside, see lod_read/write/punch for * details. 
*/ -const struct dt_body_operations lod_body_ops = { +static const struct dt_body_operations lod_body_ops = { .dbo_read = lod_read, .dbo_declare_write = lod_declare_write, .dbo_write = lod_write, @@ -8304,7 +8750,7 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *lo, if (ltd != NULL) { if (ltd->ltd_tgts_size > idx && - cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) { + test_bit(idx, ltd->ltd_tgt_bitmap)) { tgt = LTD_TGT(ltd, idx); LASSERT(tgt != NULL); @@ -8428,13 +8874,11 @@ void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo) lu_object_put(env, &lod_comp->llc_stripe[j]->do_lu); } - OBD_FREE(lod_comp->llc_stripe, - sizeof(struct dt_object *) * - lod_comp->llc_stripes_allocated); + OBD_FREE_PTR_ARRAY(lod_comp->llc_stripe, + lod_comp->llc_stripes_allocated); lod_comp->llc_stripe = NULL; - OBD_FREE(lod_comp->llc_ost_indices, - sizeof(__u32) * - lod_comp->llc_stripes_allocated); + OBD_FREE_PTR_ARRAY(lod_comp->llc_ost_indices, + lod_comp->llc_stripes_allocated); lod_comp->llc_ost_indices = NULL; lod_comp->llc_stripes_allocated = 0; } @@ -8493,7 +8937,7 @@ static int lod_object_print(const struct lu_env *env, void *cookie, return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o); } -struct lu_object_operations lod_lu_obj_ops = { +const struct lu_object_operations lod_lu_obj_ops = { .loo_object_init = lod_object_init, .loo_object_free = lod_object_free, .loo_object_release = lod_object_release,