X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flod%2Flod_object.c;h=0381a6e99107923887e030088c45a6b01548b71b;hp=2fcb93b91da5143121afa4eb4c042db85762b995;hb=2d2dac3ae77a3cbdc505328b6cbf648323a0795c;hpb=11f2c86650fda8f1df606686799752223339b9e9 diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index 2fcb93b..0381a6e 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -367,6 +367,50 @@ static struct dt_index_operations lod_index_ops = { }; /** + * Implementation of dt_index_operations::dio_lookup + * + * Used with striped directories. + * + * \see dt_index_operations::dio_lookup() in the API description for details. + */ +static int lod_striped_lookup(const struct lu_env *env, struct dt_object *dt, + struct dt_rec *rec, const struct dt_key *key) +{ + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next; + const char *name = (const char *)key; + + LASSERT(lo->ldo_dir_stripe_count > 0); + + if (strcmp(name, dot) == 0) { + struct lu_fid *fid = (struct lu_fid *)rec; + + *fid = *lod_object_fid(lo); + return 1; + } + + if (strcmp(name, dotdot) == 0) { + next = dt_object_child(dt); + } else { + int index; + + index = __lmv_name_to_stripe_index(lo->ldo_dir_hash_type, + lo->ldo_dir_stripe_count, + lo->ldo_dir_migrate_hash, + lo->ldo_dir_migrate_offset, + name, strlen(name), true); + if (index < 0) + return index; + + next = lo->ldo_stripe[index]; + if (!next || !dt_object_exists(next)) + return -ENODEV; + } + + return next->do_index_ops->dio_lookup(env, next, rec, key); +} + +/** * Implementation of dt_it_ops::init. * * Used with striped objects. Internally just initializes the iterator @@ -744,7 +788,7 @@ static int lod_striped_it_load(const struct lu_env *env, } static struct dt_index_operations lod_striped_index_ops = { - .dio_lookup = lod_lookup, + .dio_lookup = lod_striped_lookup, .dio_declare_insert = lod_declare_insert, .dio_insert = lod_insert, .dio_declare_delete = lod_declare_delete, @@ -871,8 +915,8 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, goto next; } - len = snprintf(name, sizeof(name), - DFID":", PFID(&ent->lde_fid)); + len = scnprintf(name, sizeof(name), + DFID":", PFID(&ent->lde_fid)); /* The ent->lde_name is composed of ${FID}:${index} */ if (ent->lde_namelen < len + 1 || memcmp(ent->lde_name, name, len) != 0) { @@ -1075,6 +1119,37 @@ static int lod_attr_get(const struct lu_env *env, return dt_attr_get(env, dt_object_child(dt), attr); } +void lod_adjust_stripe_size(struct lod_layout_component *comp, + __u32 def_stripe_size) +{ + __u64 comp_end = comp->llc_extent.e_end; + + /* Choose stripe size if not set. Note that default stripe size can't + * be used as is, because it must be multiplier of given component end. + * - first check if default stripe size can be used + * - if not than select the lowest set bit from component end and use + * that value as stripe size + */ + if (!comp->llc_stripe_size) { + if (comp_end == LUSTRE_EOF || !(comp_end % def_stripe_size)) + comp->llc_stripe_size = def_stripe_size; + else + comp->llc_stripe_size = comp_end & ~(comp_end - 1); + } else { + /* check stripe size is multiplier of comp_end */ + if (comp_end != LUSTRE_EOF && + comp_end % comp->llc_stripe_size) { + /* fix that even for defined stripe size but warn + * about the problem, that must not happen + */ + CWARN("Component end %llu is not aligned by the stripe size %u\n", + comp_end, comp->llc_stripe_size); + dump_stack(); + comp->llc_stripe_size = comp_end & ~(comp_end - 1); + } + } +} + static inline void lod_adjust_stripe_info(struct lod_layout_component *comp, struct lov_desc *desc, int append_stripes) @@ -1087,8 +1162,8 @@ static inline void lod_adjust_stripe_info(struct lod_layout_component *comp, desc->ld_default_stripe_count; } } - if (comp->llc_stripe_size <= 0) - comp->llc_stripe_size = desc->ld_default_stripe_size; + + lod_adjust_stripe_size(comp, desc->ld_default_stripe_size); } int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, @@ -1532,10 +1607,10 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_V1) rc = lmv_mds_md_size( le32_to_cpu(lmv1->lmv_stripe_count), - LMV_MAGIC_V1); + le32_to_cpu(lmv1->lmv_magic)); } else { - lfm = buf->lb_buf; - if (le32_to_cpu(lfm->lfm_magic) == LMV_MAGIC_FOREIGN) + lmv1 = buf->lb_buf; + if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1) RETURN(rc); if (rc != sizeof(*lmv1)) @@ -1681,7 +1756,8 @@ static int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC); lmm1->lmv_stripe_count = cpu_to_le32(stripe_count); lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type); - if (lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) { + lmm1->lmv_layout_version = cpu_to_le32(lo->ldo_dir_layout_version); + if (lod_is_layout_changing(lo)) { lmm1->lmv_migrate_hash = cpu_to_le32(lo->ldo_dir_migrate_hash); lmm1->lmv_migrate_offset = cpu_to_le32(lo->ldo_dir_migrate_offset); @@ -1738,15 +1814,11 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo, RETURN(0); } - if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1) + if (!lmv_is_sane(lmv1)) RETURN(-EINVAL); - if (le32_to_cpu(lmv1->lmv_stripe_count) < 1) - RETURN(0); - LASSERT(lo->ldo_stripe == NULL); - OBD_ALLOC(stripe, sizeof(stripe[0]) * - (le32_to_cpu(lmv1->lmv_stripe_count))); + OBD_ALLOC_PTR_ARRAY(stripe, le32_to_cpu(lmv1->lmv_stripe_count)); if (stripe == NULL) RETURN(-ENOMEM); @@ -1789,6 +1861,10 @@ out: lo->ldo_stripe = stripe; lo->ldo_dir_stripe_count = le32_to_cpu(lmv1->lmv_stripe_count); lo->ldo_dir_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count); + lo->ldo_dir_layout_version = le32_to_cpu(lmv1->lmv_layout_version); + lo->ldo_dir_migrate_offset = le32_to_cpu(lmv1->lmv_migrate_offset); + lo->ldo_dir_migrate_hash = le32_to_cpu(lmv1->lmv_migrate_hash); + lo->ldo_dir_hash_type = le32_to_cpu(lmv1->lmv_hash_type); if (rc != 0) lod_striping_free_nolock(env, lo); @@ -1859,31 +1935,74 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, if (!dto) continue; - rc = lod_sub_declare_create(env, dto, attr, NULL, dof, th); - if (rc != 0) - GOTO(out, rc); + /* directory split skip create for existing stripes */ + if (!(lod_is_splitting(lo) && i < lo->ldo_dir_split_offset)) { + rc = lod_sub_declare_create(env, dto, attr, NULL, dof, + th); + if (rc != 0) + GOTO(out, rc); - if (!dt_try_as_dir(env, dto)) - GOTO(out, rc = -EINVAL); + if (!dt_try_as_dir(env, dto)) + GOTO(out, rc = -EINVAL); - rc = lod_sub_declare_ref_add(env, dto, th); - if (rc != 0) - GOTO(out, rc); + rc = lod_sub_declare_ref_add(env, dto, th); + if (rc != 0) + GOTO(out, rc); - rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = lod_sub_declare_insert(env, dto, - (const struct dt_rec *)rec, - (const struct dt_key *)dot, th); - if (rc != 0) - GOTO(out, rc); + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dot, + th); + if (rc != 0) + GOTO(out, rc); - /* master stripe FID will be put to .. */ - rec->rec_fid = lu_object_fid(&dt->do_lu); - rc = lod_sub_declare_insert(env, dto, - (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, th); - if (rc != 0) - GOTO(out, rc); + /* master stripe FID will be put to .. */ + rec->rec_fid = lu_object_fid(&dt->do_lu); + rc = lod_sub_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dotdot, + th); + if (rc != 0) + GOTO(out, rc); + + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && + cfs_fail_val == i) + snprintf(stripe_name, sizeof(info->lti_key), + DFID":%u", + PFID(lu_object_fid(&dto->do_lu)), + i + 1); + else + snprintf(stripe_name, sizeof(info->lti_key), + DFID":%u", + PFID(lu_object_fid(&dto->do_lu)), i); + + sname = lod_name_get(env, stripe_name, + strlen(stripe_name)); + rc = linkea_links_new(&ldata, &info->lti_linkea_buf, + sname, lu_object_fid(&dt->do_lu)); + if (rc != 0) + GOTO(out, rc); + + linkea_buf.lb_buf = ldata.ld_buf->lb_buf; + linkea_buf.lb_len = ldata.ld_leh->leh_len; + rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); + if (rc != 0) + GOTO(out, rc); + + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_declare_insert(env, dt_object_child(dt), + (const struct dt_rec *)rec, + (const struct dt_key *)stripe_name, th); + if (rc != 0) + GOTO(out, rc); + + rc = lod_sub_declare_ref_add(env, dt_object_child(dt), + th); + if (rc != 0) + GOTO(out, rc); + } if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || cfs_fail_val != i) { @@ -1899,39 +2018,6 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, if (rc != 0) GOTO(out, rc); } - - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && - cfs_fail_val == i) - snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", - PFID(lu_object_fid(&dto->do_lu)), i + 1); - else - snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", - PFID(lu_object_fid(&dto->do_lu)), i); - - sname = lod_name_get(env, stripe_name, strlen(stripe_name)); - rc = linkea_links_new(&ldata, &info->lti_linkea_buf, - sname, lu_object_fid(&dt->do_lu)); - if (rc != 0) - GOTO(out, rc); - - linkea_buf.lb_buf = ldata.ld_buf->lb_buf; - linkea_buf.lb_len = ldata.ld_leh->leh_len; - rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf, - XATTR_NAME_LINK, 0, th); - if (rc != 0) - GOTO(out, rc); - - rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = lod_sub_declare_insert(env, dt_object_child(dt), - (const struct dt_rec *)rec, - (const struct dt_key *)stripe_name, - th); - if (rc != 0) - GOTO(out, rc); - - rc = lod_sub_declare_ref_add(env, dt_object_child(dt), th); - if (rc != 0) - GOTO(out, rc); } rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), @@ -1974,7 +2060,6 @@ static int lod_mdt_alloc_specific(const struct lu_env *env, struct dt_object **stripes, __u32 *mdt_indices, bool is_specific) { - struct lod_thread_info *info = lod_env_info(env); struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); struct lu_tgt_descs *ltd = &lod->lod_mdt_descs; struct lu_tgt_desc *tgt = NULL; @@ -2023,17 +2108,17 @@ static int lod_mdt_alloc_specific(const struct lu_env *env, /* Sigh, this index is not in the bitmap, let's check * next available target */ - if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx) && + if (!test_bit(idx, ltd->ltd_tgt_bitmap) && idx != master_index) continue; if (idx == master_index) { /* Allocate the FID locally */ - rc = obd_fid_alloc(env, lod->lod_child_exp, - &fid, NULL); + tgt_dt = lod->lod_child; + rc = dt_fid_alloc(env, tgt_dt, &fid, NULL, + NULL); if (rc < 0) continue; - tgt_dt = lod->lod_child; break; } @@ -2043,12 +2128,11 @@ static int lod_mdt_alloc_specific(const struct lu_env *env, continue; tgt_dt = tgt->ltd_tgt; - rc = dt_statfs(env, tgt_dt, &info->lti_osfs); - if (rc) + if (!tgt->ltd_active) /* this OSP doesn't feel well */ continue; - rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL); + rc = dt_fid_alloc(env, tgt_dt, &fid, NULL, NULL); if (rc < 0) continue; @@ -2133,12 +2217,12 @@ static int lod_prep_md_striped_create(const struct lu_env *env, stripe_count = lo->ldo_dir_stripe_count; - OBD_ALLOC(stripes, sizeof(stripes[0]) * stripe_count); + OBD_ALLOC_PTR_ARRAY(stripes, stripe_count); if (!stripes) RETURN(-ENOMEM); /* Allocate the first stripe locally */ - rc = obd_fid_alloc(env, lod->lod_child_exp, &fid, NULL); + rc = dt_fid_alloc(env, lod->lod_child, &fid, NULL, NULL); if (rc < 0) GOTO(out, rc); @@ -2149,14 +2233,15 @@ static int lod_prep_md_striped_create(const struct lu_env *env, if (lo->ldo_dir_stripe_offset == LMV_OFFSET_DEFAULT) { lod_qos_statfs_update(env, lod, &lod->lod_mdt_descs); - rc = lod_mdt_alloc_qos(env, lo, stripes); + rc = lod_mdt_alloc_qos(env, lo, stripes, 1, stripe_count); if (rc == -EAGAIN) - rc = lod_mdt_alloc_rr(env, lo, stripes); + rc = lod_mdt_alloc_rr(env, lo, stripes, 1, + stripe_count); } else { int *idx_array; bool is_specific = false; - OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count); + OBD_ALLOC_PTR_ARRAY(idx_array, stripe_count); if (!idx_array) GOTO(out, rc = -ENOMEM); @@ -2172,7 +2257,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env, lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id; rc = lod_mdt_alloc_specific(env, lo, stripes, idx_array, is_specific); - OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count); + OBD_FREE_PTR_ARRAY(idx_array, stripe_count); } if (rc < 0) @@ -2199,7 +2284,7 @@ out: dt_object_put(env, stripes[0]); for (i = 1; i < stripe_count; i++) LASSERT(!stripes[i]); - OBD_FREE(stripes, sizeof(stripes[0]) * stripe_count); + OBD_FREE_PTR_ARRAY(stripes, stripe_count); return rc; } @@ -2286,344 +2371,143 @@ out: } /** - * Append source stripes after target stripes for migrating directory. NB, we - * only need to declare this, the append is done inside lod_xattr_set_lmv(). + * Set or replace striped directory layout, and LFSCK may set layout on a plain + * directory, so don't check stripe count. * * \param[in] env execution environment * \param[in] dt target object * \param[in] buf LMV buf which contains source stripe fids + * \param[in] fl set or replace * \param[in] th transaction handle * * \retval 0 on success * \retval negative if failed */ -static int lod_dir_declare_layout_add(const struct lu_env *env, - struct dt_object *dt, - const struct lu_buf *buf, - struct thandle *th) +static int lod_dir_layout_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + int fl, + struct thandle *th) { - struct lod_thread_info *info = lod_env_info(env); - struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); - struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; - struct lod_object *lo = lod_dt_obj(dt); struct dt_object *next = dt_object_child(dt); - struct dt_object_format *dof = &info->lti_format; + struct lod_object *lo = lod_dt_obj(dt); + struct lod_device *lod = lu2lod_dev(lod2lu_obj(lo)->lo_dev); struct lmv_mds_md_v1 *lmv = buf->lb_buf; - struct dt_object **stripe; - __u32 stripe_count = le32_to_cpu(lmv->lmv_stripe_count); - struct lu_fid *fid = &info->lti_fid; - struct lod_tgt_desc *tgt; - struct dt_object *dto; - struct dt_device *tgt_dt; - int type = LU_SEQ_RANGE_ANY; - struct dt_insert_rec *rec = &info->lti_dt_rec; - char *stripe_name = info->lti_key; - struct lu_name *sname; - struct linkea_data ldata = { NULL }; - struct lu_buf linkea_buf; - __u32 idx; + struct lmv_mds_md_v1 *slave_lmv; + struct lu_buf slave_buf; int i; int rc; ENTRY; - if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1) + if (!lmv_is_sane2(lmv)) RETURN(-EINVAL); - if (stripe_count == 0) - RETURN(-EINVAL); + /* adjust hash for dir merge, which may not be set in user command */ + if (lmv_is_merging(lmv) && !lmv->lmv_migrate_hash) + lmv->lmv_merge_hash = + lod->lod_mdt_descs.ltd_lmv_desc.ld_pattern; - dof->dof_type = DFT_DIR; + LMV_DEBUG(D_INFO, lmv, "set"); - OBD_ALLOC(stripe, - sizeof(*stripe) * (lo->ldo_dir_stripe_count + stripe_count)); - if (stripe == NULL) + rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LMV, fl, th); + if (rc) + RETURN(rc); + + /* directory restripe may update stripe LMV directly */ + if (!lo->ldo_dir_stripe_count) + RETURN(0); + + lo->ldo_dir_hash_type = le32_to_cpu(lmv->lmv_hash_type); + lo->ldo_dir_migrate_offset = le32_to_cpu(lmv->lmv_migrate_offset); + lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_migrate_hash); + lo->ldo_dir_layout_version = le32_to_cpu(lmv->lmv_layout_version); + + OBD_ALLOC_PTR(slave_lmv); + if (!slave_lmv) RETURN(-ENOMEM); - for (i = 0; i < lo->ldo_dir_stripe_count; i++) - stripe[i] = lo->ldo_stripe[i]; + lod_prep_slave_lmv_md(slave_lmv, lmv); + slave_buf.lb_buf = slave_lmv; + slave_buf.lb_len = sizeof(*slave_lmv); - for (i = 0; i < stripe_count; i++) { - fid_le_to_cpu(fid, - &lmv->lmv_stripe_fids[i]); - if (!fid_is_sane(fid)) + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + if (!lo->ldo_stripe[i]) continue; - rc = lod_fld_lookup(env, lod, fid, &idx, &type); + if (!dt_object_exists(lo->ldo_stripe[i])) + continue; + + rc = lod_sub_xattr_set(env, lo->ldo_stripe[i], &slave_buf, + XATTR_NAME_LMV, fl, th); if (rc) - GOTO(out, rc); + break; + } - if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) { - tgt_dt = lod->lod_child; - } else { - tgt = LTD_TGT(ltd, idx); - if (tgt == NULL) - GOTO(out, rc = -ESTALE); - tgt_dt = tgt->ltd_tgt; - } + OBD_FREE_PTR(slave_lmv); - dto = dt_locate_at(env, tgt_dt, fid, - lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev, - NULL); - if (IS_ERR(dto)) - GOTO(out, rc = PTR_ERR(dto)); + RETURN(rc); +} - stripe[i + lo->ldo_dir_stripe_count] = dto; +/** + * Implementation of dt_object_operations::do_declare_xattr_set. + * + * Used with regular (non-striped) objects. Basically it + * initializes the striping information and applies the + * change to all the stripes. + * + * \see dt_object_operations::do_declare_xattr_set() in the API description + * for details. + */ +static int lod_dir_declare_xattr_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, + struct thandle *th) +{ + struct dt_object *next = dt_object_child(dt); + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_object *lo = lod_dt_obj(dt); + int i; + int rc; + ENTRY; - if (!dt_try_as_dir(env, dto)) - GOTO(out, rc = -ENOTDIR); + if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { + struct lmv_user_md_v1 *lum; - rc = lod_sub_declare_ref_add(env, dto, th); - if (rc) - GOTO(out, rc); + LASSERT(buf != NULL && buf->lb_buf != NULL); + lum = buf->lb_buf; + rc = lod_verify_md_striping(d, lum); + if (rc != 0) + RETURN(rc); + } else if (strcmp(name, XATTR_NAME_LOV) == 0) { + rc = lod_verify_striping(env, d, lo, buf, false); + if (rc != 0) + RETURN(rc); + } - rc = lod_sub_declare_insert(env, dto, - (const struct dt_rec *)rec, - (const struct dt_key *)dot, th); - if (rc) - GOTO(out, rc); + rc = lod_sub_declare_xattr_set(env, next, buf, name, fl, th); + if (rc != 0) + RETURN(rc); - rc = lod_sub_declare_insert(env, dto, - (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, th); - if (rc) - GOTO(out, rc); + /* Note: Do not set LinkEA on sub-stripes, otherwise + * it will confuse the fid2path process(see mdt_path_current()). + * The linkEA between master and sub-stripes is set in + * lod_xattr_set_lmv(). */ + if (strcmp(name, XATTR_NAME_LINK) == 0) + RETURN(0); - rc = lod_sub_declare_xattr_set(env, dto, buf, - XATTR_NAME_LMV, 0, th); - if (rc) - GOTO(out, rc); + /* set xattr to each stripes, if needed */ + rc = lod_striping_load(env, lo); + if (rc != 0) + RETURN(rc); - snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", - PFID(lu_object_fid(&dto->do_lu)), - i + lo->ldo_dir_stripe_count); + if (lo->ldo_dir_stripe_count == 0) + RETURN(0); - sname = lod_name_get(env, stripe_name, strlen(stripe_name)); - rc = linkea_links_new(&ldata, &info->lti_linkea_buf, - sname, lu_object_fid(&dt->do_lu)); - if (rc) - GOTO(out, rc); - - linkea_buf.lb_buf = ldata.ld_buf->lb_buf; - linkea_buf.lb_len = ldata.ld_leh->leh_len; - rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf, - XATTR_NAME_LINK, 0, th); - if (rc) - GOTO(out, rc); - - rc = lod_sub_declare_insert(env, next, - (const struct dt_rec *)rec, - (const struct dt_key *)stripe_name, - th); - if (rc) - GOTO(out, rc); - - rc = lod_sub_declare_ref_add(env, next, th); - if (rc) - GOTO(out, rc); - } - - if (lo->ldo_stripe) - OBD_FREE(lo->ldo_stripe, - sizeof(*stripe) * lo->ldo_dir_stripes_allocated); - lo->ldo_stripe = stripe; - lo->ldo_dir_migrate_offset = lo->ldo_dir_stripe_count; - lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_hash_type); - lo->ldo_dir_stripe_count += stripe_count; - lo->ldo_dir_stripes_allocated += stripe_count; - lo->ldo_dir_hash_type |= LMV_HASH_FLAG_MIGRATION; - - RETURN(0); -out: - i = lo->ldo_dir_stripe_count; - while (i < lo->ldo_dir_stripe_count + stripe_count && stripe[i]) - dt_object_put(env, stripe[i++]); - - OBD_FREE(stripe, - sizeof(*stripe) * (stripe_count + lo->ldo_dir_stripe_count)); - RETURN(rc); -} - -static int lod_dir_declare_layout_delete(const struct lu_env *env, - struct dt_object *dt, - const struct lu_buf *buf, - struct thandle *th) -{ - struct lod_thread_info *info = lod_env_info(env); - struct lod_object *lo = lod_dt_obj(dt); - struct dt_object *next = dt_object_child(dt); - struct lmv_user_md *lmu = buf->lb_buf; - __u32 final_stripe_count; - char *stripe_name = info->lti_key; - struct dt_object *dto; - int i; - int rc = 0; - - if (!lmu) - return -EINVAL; - - final_stripe_count = le32_to_cpu(lmu->lum_stripe_count); - if (final_stripe_count >= lo->ldo_dir_stripe_count) - return -EINVAL; - - for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) { - dto = lo->ldo_stripe[i]; - if (!dto) - continue; - - if (!dt_try_as_dir(env, dto)) - return -ENOTDIR; - - rc = lod_sub_declare_delete(env, dto, - (const struct dt_key *)dot, th); - if (rc) - return rc; - - rc = lod_sub_declare_ref_del(env, dto, th); - if (rc) - return rc; - - rc = lod_sub_declare_delete(env, dto, - (const struct dt_key *)dotdot, th); - if (rc) - return rc; - - snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", - PFID(lu_object_fid(&dto->do_lu)), i); - - rc = lod_sub_declare_delete(env, next, - (const struct dt_key *)stripe_name, th); - if (rc) - return rc; - - rc = lod_sub_declare_ref_del(env, next, th); - if (rc) - return rc; - } - - return 0; -} - -/* - * delete stripes from dir master object, the lum_stripe_count in argument is - * the final stripe count, the stripes after that will be deleted, NB, they - * are not destroyed, but deleted from it's parent namespace, this function - * will be called in two places: - * 1. mdd_migrate_create() delete stripes from source, and append them to - * target. - * 2. mdd_dir_layout_shrink() delete stripes from source, and destroy them. - */ -static int lod_dir_layout_delete(const struct lu_env *env, - struct dt_object *dt, - const struct lu_buf *buf, - struct thandle *th) -{ - struct lod_thread_info *info = lod_env_info(env); - struct lod_object *lo = lod_dt_obj(dt); - struct dt_object *next = dt_object_child(dt); - struct lmv_user_md *lmu = buf->lb_buf; - __u32 final_stripe_count; - char *stripe_name = info->lti_key; - struct dt_object *dto; - int i; - int rc = 0; - - ENTRY; - - if (!lmu) - RETURN(-EINVAL); - - final_stripe_count = le32_to_cpu(lmu->lum_stripe_count); - if (final_stripe_count >= lo->ldo_dir_stripe_count) - RETURN(-EINVAL); - - for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) { - dto = lo->ldo_stripe[i]; - if (!dto) - continue; - - rc = lod_sub_delete(env, dto, - (const struct dt_key *)dotdot, th); - if (rc) - break; - - snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", - PFID(lu_object_fid(&dto->do_lu)), i); - - rc = lod_sub_delete(env, next, - (const struct dt_key *)stripe_name, th); - if (rc) - break; - - rc = lod_sub_ref_del(env, next, th); - if (rc) - break; - } - - lod_striping_free(env, lod_dt_obj(dt)); - - RETURN(rc); -} - -/** - * Implementation of dt_object_operations::do_declare_xattr_set. - * - * Used with regular (non-striped) objects. Basically it - * initializes the striping information and applies the - * change to all the stripes. - * - * \see dt_object_operations::do_declare_xattr_set() in the API description - * for details. - */ -static int lod_dir_declare_xattr_set(const struct lu_env *env, - struct dt_object *dt, - const struct lu_buf *buf, - const char *name, int fl, - struct thandle *th) -{ - struct dt_object *next = dt_object_child(dt); - struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); - struct lod_object *lo = lod_dt_obj(dt); - int i; - int rc; - ENTRY; - - if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { - struct lmv_user_md_v1 *lum; - - LASSERT(buf != NULL && buf->lb_buf != NULL); - lum = buf->lb_buf; - rc = lod_verify_md_striping(d, lum); - if (rc != 0) - RETURN(rc); - } else if (strcmp(name, XATTR_NAME_LOV) == 0) { - rc = lod_verify_striping(d, lo, buf, false); - if (rc != 0) - RETURN(rc); - } - - rc = lod_sub_declare_xattr_set(env, next, buf, name, fl, th); - if (rc != 0) - RETURN(rc); - - /* Note: Do not set LinkEA on sub-stripes, otherwise - * it will confuse the fid2path process(see mdt_path_current()). - * The linkEA between master and sub-stripes is set in - * lod_xattr_set_lmv(). */ - if (strcmp(name, XATTR_NAME_LINK) == 0) - RETURN(0); - - /* set xattr to each stripes, if needed */ - rc = lod_striping_load(env, lo); - if (rc != 0) - RETURN(rc); - - if (lo->ldo_dir_stripe_count == 0) - RETURN(0); - - for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - if (!lo->ldo_stripe[i]) - continue; + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + if (!lo->ldo_stripe[i]) + continue; if (!dt_object_exists(lo->ldo_stripe[i])) continue; @@ -2844,7 +2728,7 @@ static int lod_declare_layout_add(const struct lu_env *env, if (lo->ldo_flr_state != LCM_FL_NONE) RETURN(-EBUSY); - rc = lod_verify_striping(d, lo, buf, false); + rc = lod_verify_striping(env, d, lo, buf, false); if (rc != 0) RETURN(rc); @@ -2858,7 +2742,7 @@ static int lod_declare_layout_add(const struct lu_env *env, RETURN(-EINVAL); array_cnt = lo->ldo_comp_cnt + comp_v1->lcm_entry_count; - OBD_ALLOC(comp_array, sizeof(*comp_array) * array_cnt); + OBD_ALLOC_PTR_ARRAY(comp_array, array_cnt); if (comp_array == NULL) RETURN(-ENOMEM); @@ -2912,7 +2796,7 @@ static int lod_declare_layout_add(const struct lu_env *env, GOTO(error, rc); } - OBD_FREE(old_array, sizeof(*lod_comp) * old_array_cnt); + OBD_FREE_PTR_ARRAY(old_array, old_array_cnt); LASSERT(lo->ldo_mirror_count == 1); lo->ldo_mirrors[0].lme_end = array_cnt - 1; @@ -2928,7 +2812,7 @@ error: lod_comp->llc_pool = NULL; } } - OBD_FREE(comp_array, sizeof(*comp_array) * array_cnt); + OBD_FREE_PTR_ARRAY(comp_array, array_cnt); RETURN(rc); } @@ -2973,6 +2857,8 @@ bool lod_last_non_stale_mirror(__u16 mirror_id, struct lod_object *lo) * the '$field' can only be 'flags' now. The xattr value is binary * lov_comp_md_v1 which contains the component ID(s) and the value of * the field to be modified. + * Please update allowed_lustre_lov macro if $field groks more values + * in the future. * * \param[in] env execution environment * \param[in] dt dt_object to be modified @@ -2998,6 +2884,9 @@ static int lod_declare_layout_set(const struct lu_env *env, bool changed = false; ENTRY; + /* Please update allowed_lustre_lov macro if op + * groks more values in the future + */ if (strcmp(op, "set.flags") != 0) { CDEBUG(D_LAYOUT, "%s: operation (%s) not supported.\n", lod2obd(d)->obd_name, op); @@ -3449,7 +3338,7 @@ static int lod_declare_layout_merge(const struct lu_env *env, lcme->lcme_id = cpu_to_le32(id); } - id = MAX(le32_to_cpu(lcme->lcme_id), id); + id = max(le32_to_cpu(lcme->lcme_id), id); } mirror_id = mirror_id_of(id) + 1; @@ -3581,9 +3470,8 @@ static int lod_declare_xattr_set(const struct lu_env *env, strcmp(name, XATTR_LUSTRE_LOV) == 0); rc = lod_declare_layout_split(env, dt, buf, th); } else if (S_ISREG(mode) && - strlen(name) > strlen(XATTR_LUSTRE_LOV) + 1 && - strncmp(name, XATTR_LUSTRE_LOV, - strlen(XATTR_LUSTRE_LOV)) == 0) { + strlen(name) >= sizeof(XATTR_LUSTRE_LOV) + 3 && + allowed_lustre_lov(name)) { /* * this is a request to modify object's striping. * add/set/del component(s). @@ -3592,20 +3480,6 @@ static int lod_declare_xattr_set(const struct lu_env *env, RETURN(-ENOENT); rc = lod_declare_modify_layout(env, dt, name, buf, th); - } else if (strncmp(name, XATTR_NAME_LMV, strlen(XATTR_NAME_LMV)) == 0 && - strlen(name) > strlen(XATTR_NAME_LMV) + 1) { - const char *op = name + strlen(XATTR_NAME_LMV) + 1; - - rc = -ENOTSUPP; - if (strcmp(op, "add") == 0) - rc = lod_dir_declare_layout_add(env, dt, buf, th); - else if (strcmp(op, "del") == 0) - rc = lod_dir_declare_layout_delete(env, dt, buf, th); - else if (strcmp(op, "set") == 0) - rc = lod_sub_declare_xattr_set(env, next, buf, - XATTR_NAME_LMV, fl, th); - - RETURN(rc); } else if (S_ISDIR(mode)) { rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th); } else if (strcmp(name, XATTR_NAME_FID) == 0) { @@ -3944,9 +3818,15 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, if (i && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_CREATE)) continue; - /* if it's source stripe of migrating directory, don't create */ - if (!((lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) && - i >= lo->ldo_dir_migrate_offset)) { + /* don't create stripe if: + * 1. it's source stripe of migrating directory + * 2. it's existed stripe of splitting directory + */ + if ((lod_is_migrating(lo) && i >= lo->ldo_dir_migrate_offset) || + (lod_is_splitting(lo) && i < lo->ldo_dir_split_offset)) { + if (!dt_object_exists(dto)) + GOTO(out, rc = -EINVAL); + } else { dt_write_lock(env, dto, DT_TGT_CHILD); rc = lod_sub_create(env, dto, attr, NULL, dof, th); if (rc != 0) { @@ -3967,12 +3847,6 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, GOTO(out, rc); } - rec->rec_fid = lu_object_fid(&dt->do_lu); - rc = lod_sub_insert(env, dto, (struct dt_rec *)rec, - (const struct dt_key *)dotdot, th); - if (rc != 0) - GOTO(out, rc); - if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || cfs_fail_val != i) { if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && @@ -3984,11 +3858,26 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, cpu_to_le32(i); rc = lod_sub_xattr_set(env, dto, &slave_lmv_buf, - XATTR_NAME_LMV, fl, th); + XATTR_NAME_LMV, 0, th); if (rc != 0) GOTO(out, rc); } + /* don't insert stripe if it's existed stripe of splitting + * directory (this directory is striped). + * NB, plain directory will insert itself as the first + * stripe in target. + */ + if (lod_is_splitting(lo) && lo->ldo_dir_split_offset > 1 && + lo->ldo_dir_split_offset > i) + continue; + + rec->rec_fid = lu_object_fid(&dt->do_lu); + rc = lod_sub_insert(env, dto, (struct dt_rec *)rec, + (const struct dt_key *)dotdot, th); + if (rc != 0) + GOTO(out, rc); + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && cfs_fail_val == i) snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", @@ -4312,7 +4201,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env, CDEBUG(D_LAYOUT, "repeating component %d\n", index); - OBD_ALLOC(comp_array, sizeof(*comp_array) * new_cnt); + OBD_ALLOC_PTR_ARRAY(comp_array, new_cnt); if (comp_array == NULL) GOTO(out, rc = -ENOMEM); @@ -4359,8 +4248,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env, new_comp->llc_ostlist.op_array = op_array; } - OBD_FREE(lo->ldo_comp_entries, - sizeof(*comp_array) * lo->ldo_comp_cnt); + OBD_FREE_PTR_ARRAY(lo->ldo_comp_entries, lo->ldo_comp_cnt); lo->ldo_comp_entries = comp_array; lo->ldo_comp_cnt = new_cnt; @@ -4374,7 +4262,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env, EXIT; out: if (rc) - OBD_FREE(comp_array, sizeof(*comp_array) * new_cnt); + OBD_FREE_PTR_ARRAY(comp_array, new_cnt); return rc; } @@ -4391,12 +4279,11 @@ static int lod_layout_data_init(struct lod_thread_info *info, __u32 comp_cnt) RETURN(0); if (info->lti_comp_size > 0) { - OBD_FREE(info->lti_comp_idx, - info->lti_comp_size * sizeof(__u32)); + OBD_FREE_PTR_ARRAY(info->lti_comp_idx, info->lti_comp_size); info->lti_comp_size = 0; } - OBD_ALLOC(info->lti_comp_idx, comp_cnt * sizeof(__u32)); + OBD_ALLOC_PTR_ARRAY(info->lti_comp_idx, comp_cnt); if (!info->lti_comp_idx) RETURN(-ENOMEM); @@ -4483,11 +4370,11 @@ static int lod_layout_del_prep_layout(const struct lu_env *env, lu_object_put(env, &obj->do_lu); lod_comp->llc_stripe[j] = NULL; } - OBD_FREE(lod_comp->llc_stripe, sizeof(*lod_comp->llc_stripe) * - lod_comp->llc_stripes_allocated); + OBD_FREE_PTR_ARRAY(lod_comp->llc_stripe, + lod_comp->llc_stripes_allocated); lod_comp->llc_stripe = NULL; - OBD_FREE(lod_comp->llc_ost_indices, - sizeof(__u32) * lod_comp->llc_stripes_allocated); + OBD_FREE_PTR_ARRAY(lod_comp->llc_ost_indices, + lod_comp->llc_stripes_allocated); lod_comp->llc_ost_indices = NULL; lod_comp->llc_stripes_allocated = 0; } @@ -4500,7 +4387,7 @@ static int lod_layout_del_prep_layout(const struct lu_env *env, if (info->lti_count > 0) { struct lod_layout_component *comp_array; - OBD_ALLOC(comp_array, sizeof(*comp_array) * info->lti_count); + OBD_ALLOC_PTR_ARRAY(comp_array, info->lti_count); if (comp_array == NULL) GOTO(out, rc = -ENOMEM); @@ -4510,8 +4397,7 @@ static int lod_layout_del_prep_layout(const struct lu_env *env, sizeof(*comp_array)); } - OBD_FREE(lo->ldo_comp_entries, - sizeof(*comp_array) * lo->ldo_comp_cnt); + OBD_FREE_PTR_ARRAY(lo->ldo_comp_entries, lo->ldo_comp_cnt); lo->ldo_comp_entries = comp_array; lo->ldo_comp_cnt = info->lti_count; } else { @@ -4605,33 +4491,28 @@ static int lod_xattr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, struct thandle *th) { - struct dt_object *next = dt_object_child(dt); - int rc; + struct dt_object *next = dt_object_child(dt); + int rc; + ENTRY; if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && - strcmp(name, XATTR_NAME_LMV) == 0) { - rc = lod_dir_striping_create(env, dt, NULL, NULL, th); - RETURN(rc); - } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && - strncmp(name, XATTR_NAME_LMV, strlen(XATTR_NAME_LMV)) == 0 && - strlen(name) > strlen(XATTR_NAME_LMV) + 1) { - const char *op = name + strlen(XATTR_NAME_LMV) + 1; - - rc = -ENOTSUPP; - /* - * XATTR_NAME_LMV".add" is never called, but only declared, - * because lod_xattr_set_lmv() will do the addition. - */ - if (strcmp(op, "del") == 0) - rc = lod_dir_layout_delete(env, dt, buf, th); - else if (strcmp(op, "set") == 0) - rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LMV, - fl, th); + !strcmp(name, XATTR_NAME_LMV)) { + switch (fl) { + case LU_XATTR_CREATE: + rc = lod_dir_striping_create(env, dt, NULL, NULL, th); + break; + case 0: + case LU_XATTR_REPLACE: + rc = lod_dir_layout_set(env, dt, buf, fl, th); + break; + default: + LBUG(); + } RETURN(rc); } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && - strcmp(name, XATTR_NAME_LOV) == 0) { + strcmp(name, XATTR_NAME_LOV) == 0) { struct lod_default_striping *lds = lod_lds_buf_get(env); struct lov_user_md_v1 *v1 = buf->lb_buf; char pool[LOV_MAXPOOLNAME + 1]; @@ -4693,9 +4574,9 @@ static int lod_xattr_set(const struct lu_env *env, th); RETURN(rc); } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) && - (!strcmp(name, XATTR_NAME_LOV) || - !strncmp(name, XATTR_LUSTRE_LOV, - strlen(XATTR_LUSTRE_LOV)))) { + (strcmp(name, XATTR_NAME_LOV) == 0 || + strcmp(name, XATTR_LUSTRE_LOV) == 0 || + allowed_lustre_lov(name))) { /* in case of lov EA swap, just set it * if not, it is a replay so check striping match what we * already have during req replay, declare_xattr_set() @@ -5363,6 +5244,10 @@ static void lod_ah_init(const struct lu_env *env, lc->ldo_dir_stripe_count = 0; } + if (!(lc->ldo_dir_hash_type & LMV_HASH_TYPE_MASK)) + lc->ldo_dir_hash_type |= + d->lod_mdt_descs.ltd_lmv_desc.ld_pattern; + CDEBUG(D_INFO, "final dir stripe [%hu %d %u]\n", lc->ldo_dir_stripe_count, (int)lc->ldo_dir_stripe_offset, lc->ldo_dir_hash_type); @@ -5467,7 +5352,6 @@ out: EXIT; } -#define ll_do_div64(aaa,bbb) do_div((aaa), (bbb)) /** * Size initialization on late striping. * @@ -5531,14 +5415,13 @@ static int lod_declare_init_size(const struct lu_env *env, continue; LASSERT(objects != NULL && stripe_size != 0); - /* ll_do_div64(a, b) returns a % b, and a = a / b */ - ll_do_div64(size, (__u64)stripe_size); - stripe = ll_do_div64(size, (__u64)stripe_count); + do_div(size, stripe_size); + stripe = do_div(size, stripe_count); LASSERT(objects[stripe] != NULL); size = size * stripe_size; offs = attr->la_size; - size += ll_do_div64(offs, stripe_size); + size += do_div(offs, stripe_size); attr->la_valid = LA_SIZE; attr->la_size = size; @@ -6511,7 +6394,7 @@ static bool lod_sel_osts_allowed(const struct lu_env *env, if (j < lod_comp->llc_stripe_count) continue; - if (!cfs_bitmap_check(lod->lod_ost_bitmap, index)) { + if (!test_bit(index, lod->lod_ost_bitmap)) { CDEBUG(D_LAYOUT, "ost %d no longer present\n", index); ret = false; break; @@ -6525,9 +6408,9 @@ static bool lod_sel_osts_allowed(const struct lu_env *env, break; } - if (sfs->os_state & OS_STATE_ENOSPC || - sfs->os_state & OS_STATE_READONLY || - sfs->os_state & OS_STATE_DEGRADED) { + if (sfs->os_state & OS_STATFS_ENOSPC || + sfs->os_state & OS_STATFS_READONLY || + sfs->os_state & OS_STATFS_DEGRADED) { CDEBUG(D_LAYOUT, "ost %d is not availble for SEL " "extension, state %u\n", index, sfs->os_state); ret = false; @@ -7214,8 +7097,8 @@ static inline int lod_check_ost_avail(const struct lu_env *env, ost = OST_TGT(lod, idx); if (ost->ltd_statfs.os_state & - (OS_STATE_READONLY | OS_STATE_ENOSPC | OS_STATE_ENOINO | - OS_STATE_NOPRECREATE) || + (OS_STATFS_READONLY | OS_STATFS_ENOSPC | OS_STATFS_ENOINO | + OS_STATFS_NOPRECREATE) || ost->ltd_active == 0) { CDEBUG(D_LAYOUT, DFID ": mirror %d OST%d unavail, rc = %d\n", PFID(lod_object_fid(lo)), index, idx, rc); @@ -7561,7 +7444,7 @@ static int lod_declare_update_write_pending(const struct lu_env *env, if (lo->ldo_mirrors[i].lme_stale) continue; - LASSERTF(primary < 0, DFID " has multiple primary: %u / %u", + LASSERTF(primary < 0, DFID " has multiple primary: %u / %u\n", PFID(lod_object_fid(lo)), lo->ldo_mirrors[i].lme_id, lo->ldo_mirrors[primary].lme_id); @@ -7777,15 +7660,632 @@ out: RETURN(rc); } -static int lod_declare_layout_change(const struct lu_env *env, - struct dt_object *dt, struct md_layout_change *mlc, - struct thandle *th) +typedef int (*mlc_handler)(const struct lu_env *env, struct dt_object *dt, + const struct md_layout_change *mlc, + struct thandle *th); + +/** + * Attach stripes after target's for migrating directory. NB, we + * only need to declare this, the actual work is done inside + * lod_xattr_set_lmv(). + * + * \param[in] env execution environment + * \param[in] dt target object + * \param[in] mlc layout change data + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_dir_declare_layout_attach(const struct lu_env *env, + struct dt_object *dt, + const struct md_layout_change *mlc, + struct thandle *th) { - struct lod_thread_info *info = lod_env_info(env); + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; struct lod_object *lo = lod_dt_obj(dt); - int rc; - ENTRY; - + struct dt_object *next = dt_object_child(dt); + struct dt_object_format *dof = &info->lti_format; + struct lmv_mds_md_v1 *lmv = mlc->mlc_buf.lb_buf; + struct dt_object **stripes; + __u32 stripe_count = le32_to_cpu(lmv->lmv_stripe_count); + struct lu_fid *fid = &info->lti_fid; + struct lod_tgt_desc *tgt; + struct dt_object *dto; + struct dt_device *tgt_dt; + int type = LU_SEQ_RANGE_ANY; + struct dt_insert_rec *rec = &info->lti_dt_rec; + char *stripe_name = info->lti_key; + struct lu_name *sname; + struct linkea_data ldata = { NULL }; + struct lu_buf linkea_buf; + __u32 idx; + int i; + int rc; + + ENTRY; + + if (!lmv_is_sane(lmv)) + RETURN(-EINVAL); + + if (!dt_try_as_dir(env, dt)) + return -ENOTDIR; + + dof->dof_type = DFT_DIR; + + OBD_ALLOC_PTR_ARRAY(stripes, (lo->ldo_dir_stripe_count + stripe_count)); + if (!stripes) + RETURN(-ENOMEM); + + for (i = 0; i < lo->ldo_dir_stripe_count; i++) + stripes[i] = lo->ldo_stripe[i]; + + rec->rec_type = S_IFDIR; + + for (i = 0; i < stripe_count; i++) { + fid_le_to_cpu(fid, + &lmv->lmv_stripe_fids[i]); + if (!fid_is_sane(fid)) + continue; + + rc = lod_fld_lookup(env, lod, fid, &idx, &type); + if (rc) + GOTO(out, rc); + + if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) { + tgt_dt = lod->lod_child; + } else { + tgt = LTD_TGT(ltd, idx); + if (tgt == NULL) + GOTO(out, rc = -ESTALE); + tgt_dt = tgt->ltd_tgt; + } + + dto = dt_locate_at(env, tgt_dt, fid, + lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev, + NULL); + if (IS_ERR(dto)) + GOTO(out, rc = PTR_ERR(dto)); + + stripes[i + lo->ldo_dir_stripe_count] = dto; + + if (!dt_try_as_dir(env, dto)) + GOTO(out, rc = -ENOTDIR); + + rc = lod_sub_declare_ref_add(env, dto, th); + if (rc) + GOTO(out, rc); + + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dot, th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dotdot, th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_xattr_set(env, dto, &mlc->mlc_buf, + XATTR_NAME_LMV, 0, th); + if (rc) + GOTO(out, rc); + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", + PFID(lu_object_fid(&dto->do_lu)), + i + lo->ldo_dir_stripe_count); + + sname = lod_name_get(env, stripe_name, strlen(stripe_name)); + rc = linkea_links_new(&ldata, &info->lti_linkea_buf, + sname, lu_object_fid(&dt->do_lu)); + if (rc) + GOTO(out, rc); + + linkea_buf.lb_buf = ldata.ld_buf->lb_buf; + linkea_buf.lb_len = ldata.ld_leh->leh_len; + rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_insert(env, next, + (const struct dt_rec *)rec, + (const struct dt_key *)stripe_name, + th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_ref_add(env, next, th); + if (rc) + GOTO(out, rc); + } + + if (lo->ldo_stripe) + OBD_FREE_PTR_ARRAY(lo->ldo_stripe, + lo->ldo_dir_stripes_allocated); + lo->ldo_stripe = stripes; + lo->ldo_dir_migrate_offset = lo->ldo_dir_stripe_count; + lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_hash_type); + lo->ldo_dir_stripe_count += stripe_count; + lo->ldo_dir_stripes_allocated += stripe_count; + + /* plain directory split creates target as a plain directory, while + * after source attached as the first stripe, it becomes a striped + * directory, set correct do_index_ops, otherwise it can't be unlinked. + */ + dt->do_index_ops = &lod_striped_index_ops; + + RETURN(0); +out: + i = lo->ldo_dir_stripe_count; + while (i < lo->ldo_dir_stripe_count + stripe_count && stripes[i]) + dt_object_put(env, stripes[i++]); + + OBD_FREE_PTR_ARRAY(stripes, stripe_count + lo->ldo_dir_stripe_count); + return rc; +} + +static int lod_dir_declare_layout_detach(const struct lu_env *env, + struct dt_object *dt, + const struct md_layout_change *unused, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + char *stripe_name = info->lti_key; + struct dt_object *dto; + int i; + int rc = 0; + + if (!dt_try_as_dir(env, dt)) + return -ENOTDIR; + + if (!lo->ldo_dir_stripe_count) + return lod_sub_declare_delete(env, next, + (const struct dt_key *)dotdot, th); + + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + dto = lo->ldo_stripe[i]; + if (!dto) + continue; + + if (!dt_try_as_dir(env, dto)) + return -ENOTDIR; + + rc = lod_sub_declare_delete(env, dto, + (const struct dt_key *)dotdot, th); + if (rc) + return rc; + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&dto->do_lu)), i); + + rc = lod_sub_declare_delete(env, next, + (const struct dt_key *)stripe_name, th); + if (rc) + return rc; + + rc = lod_sub_declare_ref_del(env, next, th); + if (rc) + return rc; + } + + return 0; +} + +static int dt_dir_is_empty(const struct lu_env *env, + struct dt_object *obj) +{ + struct dt_it *it; + const struct dt_it_ops *iops; + int rc; + + ENTRY; + + if (!dt_try_as_dir(env, obj)) + RETURN(-ENOTDIR); + + iops = &obj->do_index_ops->dio_it; + it = iops->init(env, obj, LUDA_64BITHASH); + if (IS_ERR(it)) + RETURN(PTR_ERR(it)); + + rc = iops->get(env, it, (const struct dt_key *)""); + if (rc > 0) { + int i; + + for (rc = 0, i = 0; rc == 0 && i < 3; ++i) + rc = iops->next(env, it); + if (!rc) + rc = -ENOTEMPTY; + else if (rc == 1) + rc = 0; + } else if (!rc) { + /* Huh? Index contains no zero key? */ + rc = -EIO; + } + + iops->put(env, it); + iops->fini(env, it); + + RETURN(rc); +} + +static int lod_dir_declare_layout_shrink(const struct lu_env *env, + struct dt_object *dt, + const struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + struct lmv_user_md *lmu = mlc->mlc_buf.lb_buf; + __u32 final_stripe_count; + char *stripe_name = info->lti_key; + struct lu_buf *lmv_buf = &info->lti_buf; + struct dt_object *dto; + int i; + int rc; + + LASSERT(lmu); + + if (!dt_try_as_dir(env, dt)) + return -ENOTDIR; + + /* shouldn't be called on plain directory */ + LASSERT(lo->ldo_dir_stripe_count); + + lmv_buf->lb_buf = &info->lti_lmv.lmv_md_v1; + lmv_buf->lb_len = sizeof(info->lti_lmv.lmv_md_v1); + + final_stripe_count = le32_to_cpu(lmu->lum_stripe_count); + LASSERT(final_stripe_count && + final_stripe_count < lo->ldo_dir_stripe_count); + + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + dto = lo->ldo_stripe[i]; + if (!dto) + continue; + + if (i < final_stripe_count) { + if (final_stripe_count == 1) + continue; + + rc = lod_sub_declare_xattr_set(env, dto, lmv_buf, + XATTR_NAME_LMV, + LU_XATTR_REPLACE, th); + if (rc) + return rc; + + continue; + } + + rc = dt_dir_is_empty(env, dto); + if (rc < 0) + return rc; + + rc = lod_sub_declare_ref_del(env, dto, th); + if (rc) + return rc; + + rc = lod_sub_declare_destroy(env, dto, th); + if (rc) + return rc; + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&dto->do_lu)), i); + + rc = lod_sub_declare_delete(env, next, + (const struct dt_key *)stripe_name, th); + if (rc) + return rc; + + rc = lod_sub_declare_ref_del(env, next, th); + if (rc) + return rc; + } + + rc = lod_sub_declare_xattr_set(env, next, lmv_buf, XATTR_NAME_LMV, + LU_XATTR_REPLACE, th); + return rc; +} + +/** + * Allocate stripes for split directory. + * + * \param[in] env execution environment + * \param[in] dt target object + * \param[in] mlc layout change data + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_dir_declare_layout_split(const struct lu_env *env, + struct dt_object *dt, + const struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object_format *dof = &info->lti_format; + struct lmv_user_md_v1 *lum = mlc->mlc_spec->u.sp_ea.eadata; + struct dt_object **stripes; + u32 stripe_count; + u32 saved_count; + int i; + int rc; + + ENTRY; + + LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC); + LASSERT(le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT); + + saved_count = lo->ldo_dir_stripes_allocated; + stripe_count = le32_to_cpu(lum->lum_stripe_count); + if (stripe_count <= saved_count) + RETURN(-EINVAL); + + dof->dof_type = DFT_DIR; + + OBD_ALLOC(stripes, sizeof(*stripes) * stripe_count); + if (!stripes) + RETURN(-ENOMEM); + + for (i = 0; i < lo->ldo_dir_stripes_allocated; i++) + stripes[i] = lo->ldo_stripe[i]; + + lod_qos_statfs_update(env, lod, &lod->lod_mdt_descs); + rc = lod_mdt_alloc_qos(env, lo, stripes, saved_count, stripe_count); + if (rc == -EAGAIN) + rc = lod_mdt_alloc_rr(env, lo, stripes, saved_count, + stripe_count); + if (rc < 0) { + OBD_FREE(stripes, sizeof(*stripes) * stripe_count); + RETURN(rc); + } + + LASSERT(rc > saved_count); + OBD_FREE(lo->ldo_stripe, + sizeof(*stripes) * lo->ldo_dir_stripes_allocated); + lo->ldo_stripe = stripes; + lo->ldo_dir_striped = 1; + lo->ldo_dir_stripe_count = rc; + lo->ldo_dir_stripes_allocated = stripe_count; + lo->ldo_dir_split_hash = lo->ldo_dir_hash_type; + lo->ldo_dir_hash_type = le32_to_cpu(lum->lum_hash_type); + if (!lmv_is_known_hash_type(lo->ldo_dir_hash_type)) + lo->ldo_dir_hash_type = + lod->lod_mdt_descs.ltd_lmv_desc.ld_pattern; + lo->ldo_dir_hash_type |= LMV_HASH_FLAG_SPLIT | LMV_HASH_FLAG_MIGRATION; + lo->ldo_dir_split_offset = saved_count; + lo->ldo_dir_layout_version++; + lo->ldo_dir_stripe_loaded = 1; + + rc = lod_dir_declare_create_stripes(env, dt, mlc->mlc_attr, dof, th); + if (rc) + lod_striping_free(env, lo); + + RETURN(rc); +} + +/* + * detach all stripes from dir master object, NB, stripes are not destroyed, but + * deleted from it's parent namespace, this function is called in two places: + * 1. mdd_migrate_mdt() detach stripes from source, and attach them to + * target. + * 2. mdd_dir_layout_update() detach stripe before turning 1-stripe directory to + * a plain directory. + * + * \param[in] env execution environment + * \param[in] dt target object + * \param[in] mlc layout change data + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_dir_layout_detach(const struct lu_env *env, + struct dt_object *dt, + const struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + char *stripe_name = info->lti_key; + struct dt_object *dto; + int i; + int rc = 0; + + ENTRY; + + if (!lo->ldo_dir_stripe_count) { + /* plain directory delete .. */ + rc = lod_sub_delete(env, next, + (const struct dt_key *)dotdot, th); + RETURN(rc); + } + + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + dto = lo->ldo_stripe[i]; + if (!dto) + continue; + + rc = lod_sub_delete(env, dto, + (const struct dt_key *)dotdot, th); + if (rc) + break; + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&dto->do_lu)), i); + + rc = lod_sub_delete(env, next, + (const struct dt_key *)stripe_name, th); + if (rc) + break; + + rc = lod_sub_ref_del(env, next, th); + if (rc) + break; + } + + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + dto = lo->ldo_stripe[i]; + if (dto) + dt_object_put(env, dto); + } + OBD_FREE_PTR_ARRAY(lo->ldo_stripe, lo->ldo_dir_stripes_allocated); + lo->ldo_stripe = NULL; + lo->ldo_dir_stripes_allocated = 0; + lo->ldo_dir_stripe_count = 0; + + RETURN(rc); +} + +static int lod_dir_layout_shrink(const struct lu_env *env, + struct dt_object *dt, + const struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + struct dt_object *next = dt_object_child(dt); + struct lmv_user_md *lmu = mlc->mlc_buf.lb_buf; + __u32 final_stripe_count; + char *stripe_name = info->lti_key; + struct dt_object *dto; + struct lu_buf *lmv_buf = &info->lti_buf; + struct lmv_mds_md_v1 *lmv = &info->lti_lmv.lmv_md_v1; + u32 mdtidx; + int type = LU_SEQ_RANGE_ANY; + int i; + int rc; + + ENTRY; + + final_stripe_count = le32_to_cpu(lmu->lum_stripe_count); + + lmv_buf->lb_buf = lmv; + lmv_buf->lb_len = sizeof(*lmv); + lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE); + lmv->lmv_stripe_count = cpu_to_le32(final_stripe_count); + lmv->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type) & + cpu_to_le32(LMV_HASH_TYPE_MASK); + lmv->lmv_layout_version = + cpu_to_le32(lo->ldo_dir_layout_version + 1); + + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + dto = lo->ldo_stripe[i]; + if (!dto) + continue; + + if (i < final_stripe_count) { + /* if only one stripe left, no need to update + * LMV because this stripe will replace master + * object and act as a plain directory. + */ + if (final_stripe_count == 1) + continue; + + + rc = lod_fld_lookup(env, lod, + lu_object_fid(&dto->do_lu), + &mdtidx, &type); + if (rc) + RETURN(rc); + + lmv->lmv_master_mdt_index = cpu_to_le32(mdtidx); + rc = lod_sub_xattr_set(env, dto, lmv_buf, + XATTR_NAME_LMV, + LU_XATTR_REPLACE, th); + if (rc) + RETURN(rc); + + continue; + } + + dt_write_lock(env, dto, DT_TGT_CHILD); + rc = lod_sub_ref_del(env, dto, th); + dt_write_unlock(env, dto); + if (rc) + RETURN(rc); + + rc = lod_sub_destroy(env, dto, th); + if (rc) + RETURN(rc); + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&dto->do_lu)), i); + + rc = lod_sub_delete(env, next, + (const struct dt_key *)stripe_name, th); + if (rc) + RETURN(rc); + + rc = lod_sub_ref_del(env, next, th); + if (rc) + RETURN(rc); + } + + rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu), &mdtidx, + &type); + if (rc) + RETURN(rc); + + lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1); + lmv->lmv_master_mdt_index = cpu_to_le32(mdtidx); + rc = lod_sub_xattr_set(env, next, lmv_buf, XATTR_NAME_LMV, + LU_XATTR_REPLACE, th); + if (rc) + RETURN(rc); + + for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) { + dto = lo->ldo_stripe[i]; + if (dto) + dt_object_put(env, dto); + } + lo->ldo_dir_stripe_count = final_stripe_count; + + RETURN(rc); +} + +static mlc_handler dir_mlc_declare_ops[MD_LAYOUT_MAX] = { + [MD_LAYOUT_ATTACH] = lod_dir_declare_layout_attach, + [MD_LAYOUT_DETACH] = lod_dir_declare_layout_detach, + [MD_LAYOUT_SHRINK] = lod_dir_declare_layout_shrink, + [MD_LAYOUT_SPLIT] = lod_dir_declare_layout_split, +}; + +static mlc_handler dir_mlc_ops[MD_LAYOUT_MAX] = { + [MD_LAYOUT_DETACH] = lod_dir_layout_detach, + [MD_LAYOUT_SHRINK] = lod_dir_layout_shrink, +}; + +static int lod_declare_layout_change(const struct lu_env *env, + struct dt_object *dt, struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + + ENTRY; + + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { + LASSERT(dir_mlc_declare_ops[mlc->mlc_opc]); + rc = dir_mlc_declare_ops[mlc->mlc_opc](env, dt, mlc, th); + RETURN(rc); + } + if (!S_ISREG(dt->do_lu.lo_header->loh_attr) || !dt_object_exists(dt) || dt_object_remote(dt_object_child(dt))) RETURN(-EINVAL); @@ -7833,13 +8333,21 @@ static int lod_layout_change(const struct lu_env *env, struct dt_object *dt, struct lod_object *lo = lod_dt_obj(dt); int rc; + ENTRY; + + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { + LASSERT(dir_mlc_ops[mlc->mlc_opc]); + rc = dir_mlc_ops[mlc->mlc_opc](env, dt, mlc, th); + RETURN(rc); + } + rc = lod_striped_create(env, dt, attr, NULL, th); if (!rc && layout_attr->la_valid & LA_LAYOUT_VERSION) { layout_attr->la_layout_version |= lo->ldo_layout_gen; rc = lod_attr_set(env, dt, layout_attr, th); } - return rc; + RETURN(rc); } struct dt_object_operations lod_obj_ops = { @@ -7994,7 +8502,7 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *lo, if (ltd != NULL) { if (ltd->ltd_tgts_size > idx && - cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) { + test_bit(idx, ltd->ltd_tgt_bitmap)) { tgt = LTD_TGT(ltd, idx); LASSERT(tgt != NULL); @@ -8118,13 +8626,11 @@ void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo) lu_object_put(env, &lod_comp->llc_stripe[j]->do_lu); } - OBD_FREE(lod_comp->llc_stripe, - sizeof(struct dt_object *) * - lod_comp->llc_stripes_allocated); + OBD_FREE_PTR_ARRAY(lod_comp->llc_stripe, + lod_comp->llc_stripes_allocated); lod_comp->llc_stripe = NULL; - OBD_FREE(lod_comp->llc_ost_indices, - sizeof(__u32) * - lod_comp->llc_stripes_allocated); + OBD_FREE_PTR_ARRAY(lod_comp->llc_ost_indices, + lod_comp->llc_stripes_allocated); lod_comp->llc_ost_indices = NULL; lod_comp->llc_stripes_allocated = 0; } @@ -8153,6 +8659,7 @@ static void lod_object_free(const struct lu_env *env, struct lu_object *o) /* release all underlying object pinned */ lod_striping_free(env, lo); lu_object_fini(o); + /* lo doesn't contain a lu_object_header, so we don't need call_rcu */ OBD_SLAB_FREE_PTR(lo, lod_object_kmem); }