X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flod%2Flod_object.c;h=316d7ff00d01956cc6833314b4496b34d0839ef5;hp=bef2d6d044b262026da2ccd7d9c655755ce934a4;hb=2e2b16c28bcf4048ba4f34129b7fb91c36b55a71;hpb=03a4431dac1c59fa2b98501fc7dfb8451a0a2af8 diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index bef2d6d..316d7ff 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -1560,13 +1560,13 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, /* The on-disk LMV EA only contains header, but the * returned LMV EA size should contain the space for * the FIDs of all shards of the striped directory. */ - if (lmv_is_sane(lmv1)) + if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_V1) rc = lmv_mds_md_size( - le32_to_cpu(lmv1->lmv_stripe_count), - le32_to_cpu(lmv1->lmv_magic)); + le32_to_cpu(lmv1->lmv_stripe_count), + le32_to_cpu(lmv1->lmv_magic)); } else { - lfm = buf->lb_buf; - if (le32_to_cpu(lfm->lfm_magic) == LMV_MAGIC_FOREIGN) + lmv1 = buf->lb_buf; + if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1) RETURN(rc); if (rc != sizeof(*lmv1)) @@ -1712,7 +1712,8 @@ static int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC); lmm1->lmv_stripe_count = cpu_to_le32(stripe_count); lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type); - if (lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) { + lmm1->lmv_layout_version = cpu_to_le32(lo->ldo_dir_layout_version); + if (lod_is_layout_changing(lo)) { lmm1->lmv_migrate_hash = cpu_to_le32(lo->ldo_dir_migrate_hash); lmm1->lmv_migrate_offset = cpu_to_le32(lo->ldo_dir_migrate_offset); @@ -1818,6 +1819,8 @@ out: lo->ldo_dir_stripe_count = le32_to_cpu(lmv1->lmv_stripe_count); lo->ldo_dir_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count); lo->ldo_dir_layout_version = le32_to_cpu(lmv1->lmv_layout_version); + lo->ldo_dir_migrate_offset = le32_to_cpu(lmv1->lmv_migrate_offset); + lo->ldo_dir_migrate_hash = le32_to_cpu(lmv1->lmv_migrate_hash); lo->ldo_dir_hash_type = le32_to_cpu(lmv1->lmv_hash_type); if (rc != 0) lod_striping_free_nolock(env, lo); @@ -1889,31 +1892,74 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, if (!dto) continue; - rc = lod_sub_declare_create(env, dto, attr, NULL, dof, th); - if (rc != 0) - GOTO(out, rc); + /* directory split skip create for existing stripes */ + if (!(lod_is_splitting(lo) && i < lo->ldo_dir_split_offset)) { + rc = lod_sub_declare_create(env, dto, attr, NULL, dof, + th); + if (rc != 0) + GOTO(out, rc); - if (!dt_try_as_dir(env, dto)) - GOTO(out, rc = -EINVAL); + if (!dt_try_as_dir(env, dto)) + GOTO(out, rc = -EINVAL); - rc = lod_sub_declare_ref_add(env, dto, th); - if (rc != 0) - GOTO(out, rc); + rc = lod_sub_declare_ref_add(env, dto, th); + if (rc != 0) + GOTO(out, rc); - rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = lod_sub_declare_insert(env, dto, - (const struct dt_rec *)rec, - (const struct dt_key *)dot, th); - if (rc != 0) - GOTO(out, rc); + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dot, + th); + if (rc != 0) + GOTO(out, rc); - /* master stripe FID will be put to .. */ - rec->rec_fid = lu_object_fid(&dt->do_lu); - rc = lod_sub_declare_insert(env, dto, - (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, th); - if (rc != 0) - GOTO(out, rc); + /* master stripe FID will be put to .. */ + rec->rec_fid = lu_object_fid(&dt->do_lu); + rc = lod_sub_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dotdot, + th); + if (rc != 0) + GOTO(out, rc); + + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && + cfs_fail_val == i) + snprintf(stripe_name, sizeof(info->lti_key), + DFID":%u", + PFID(lu_object_fid(&dto->do_lu)), + i + 1); + else + snprintf(stripe_name, sizeof(info->lti_key), + DFID":%u", + PFID(lu_object_fid(&dto->do_lu)), i); + + sname = lod_name_get(env, stripe_name, + strlen(stripe_name)); + rc = linkea_links_new(&ldata, &info->lti_linkea_buf, + sname, lu_object_fid(&dt->do_lu)); + if (rc != 0) + GOTO(out, rc); + + linkea_buf.lb_buf = ldata.ld_buf->lb_buf; + linkea_buf.lb_len = ldata.ld_leh->leh_len; + rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); + if (rc != 0) + GOTO(out, rc); + + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_declare_insert(env, dt_object_child(dt), + (const struct dt_rec *)rec, + (const struct dt_key *)stripe_name, th); + if (rc != 0) + GOTO(out, rc); + + rc = lod_sub_declare_ref_add(env, dt_object_child(dt), + th); + if (rc != 0) + GOTO(out, rc); + } if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || cfs_fail_val != i) { @@ -1929,39 +1975,6 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, if (rc != 0) GOTO(out, rc); } - - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && - cfs_fail_val == i) - snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", - PFID(lu_object_fid(&dto->do_lu)), i + 1); - else - snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", - PFID(lu_object_fid(&dto->do_lu)), i); - - sname = lod_name_get(env, stripe_name, strlen(stripe_name)); - rc = linkea_links_new(&ldata, &info->lti_linkea_buf, - sname, lu_object_fid(&dt->do_lu)); - if (rc != 0) - GOTO(out, rc); - - linkea_buf.lb_buf = ldata.ld_buf->lb_buf; - linkea_buf.lb_len = ldata.ld_leh->leh_len; - rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf, - XATTR_NAME_LINK, 0, th); - if (rc != 0) - GOTO(out, rc); - - rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = lod_sub_declare_insert(env, dt_object_child(dt), - (const struct dt_rec *)rec, - (const struct dt_key *)stripe_name, - th); - if (rc != 0) - GOTO(out, rc); - - rc = lod_sub_declare_ref_add(env, dt_object_child(dt), th); - if (rc != 0) - GOTO(out, rc); } rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), @@ -2335,6 +2348,7 @@ static int lod_dir_layout_set(const struct lu_env *env, { struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); + struct lod_device *lod = lu2lod_dev(lod2lu_obj(lo)->lo_dev); struct lmv_mds_md_v1 *lmv = buf->lb_buf; struct lmv_mds_md_v1 *slave_lmv; struct lu_buf slave_buf; @@ -2343,10 +2357,29 @@ static int lod_dir_layout_set(const struct lu_env *env, ENTRY; + if (!lmv_is_sane2(lmv)) + RETURN(-EINVAL); + + /* adjust hash for dir merge, which may not be set in user command */ + if (lmv_is_merging(lmv) && !lmv->lmv_migrate_hash) + lmv->lmv_merge_hash = + lod->lod_mdt_descs.ltd_lmv_desc.ld_pattern; + + LMV_DEBUG(D_INFO, lmv, "set"); + rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LMV, fl, th); if (rc) RETURN(rc); + /* directory restripe may update stripe LMV directly */ + if (!lo->ldo_dir_stripe_count) + RETURN(0); + + lo->ldo_dir_hash_type = le32_to_cpu(lmv->lmv_hash_type); + lo->ldo_dir_migrate_offset = le32_to_cpu(lmv->lmv_migrate_offset); + lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_migrate_hash); + lo->ldo_dir_layout_version = le32_to_cpu(lmv->lmv_layout_version); + OBD_ALLOC_PTR(slave_lmv); if (!slave_lmv) RETURN(-ENOMEM); @@ -2368,7 +2401,6 @@ static int lod_dir_layout_set(const struct lu_env *env, break; } - lod_striping_free(env, lod_dt_obj(dt)); OBD_FREE_PTR(slave_lmv); RETURN(rc); @@ -3743,9 +3775,15 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, if (i && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_CREATE)) continue; - /* if it's source stripe of migrating directory, don't create */ - if (!((lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) && - i >= lo->ldo_dir_migrate_offset)) { + /* don't create stripe if: + * 1. it's source stripe of migrating directory + * 2. it's existed stripe of splitting directory + */ + if ((lod_is_migrating(lo) && i >= lo->ldo_dir_migrate_offset) || + (lod_is_splitting(lo) && i < lo->ldo_dir_split_offset)) { + if (!dt_object_exists(dto)) + GOTO(out, rc = -EINVAL); + } else { dt_write_lock(env, dto, DT_TGT_CHILD); rc = lod_sub_create(env, dto, attr, NULL, dof, th); if (rc != 0) { @@ -3766,12 +3804,6 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, GOTO(out, rc); } - rec->rec_fid = lu_object_fid(&dt->do_lu); - rc = lod_sub_insert(env, dto, (struct dt_rec *)rec, - (const struct dt_key *)dotdot, th); - if (rc != 0) - GOTO(out, rc); - if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || cfs_fail_val != i) { if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && @@ -3788,6 +3820,21 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, GOTO(out, rc); } + /* don't insert stripe if it's existed stripe of splitting + * directory (this directory is striped). + * NB, plain directory will insert itself as the first + * stripe in target. + */ + if (lod_is_splitting(lo) && lo->ldo_dir_split_offset > 1 && + lo->ldo_dir_split_offset > i) + continue; + + rec->rec_fid = lu_object_fid(&dt->do_lu); + rc = lod_sub_insert(env, dto, (struct dt_rec *)rec, + (const struct dt_key *)dotdot, th); + if (rc != 0) + GOTO(out, rc); + if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && cfs_fail_val == i) snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", @@ -7727,7 +7774,12 @@ static int lod_dir_declare_layout_attach(const struct lu_env *env, lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_hash_type); lo->ldo_dir_stripe_count += stripe_count; lo->ldo_dir_stripes_allocated += stripe_count; - lo->ldo_dir_hash_type |= LMV_HASH_FLAG_MIGRATION; + + /* plain directory split creates target as a plain directory, while + * after source attached as the first stripe, it becomes a striped + * directory, set correct do_index_ops, otherwise it can't be unlinked. + */ + dt->do_index_ops = &lod_striped_index_ops; RETURN(0); out: @@ -7906,6 +7958,86 @@ static int lod_dir_declare_layout_shrink(const struct lu_env *env, return rc; } +/** + * Allocate stripes for split directory. + * + * \param[in] env execution environment + * \param[in] dt target object + * \param[in] mlc layout change data + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_dir_declare_layout_split(const struct lu_env *env, + struct dt_object *dt, + const struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object_format *dof = &info->lti_format; + struct lmv_user_md_v1 *lum = mlc->mlc_spec->u.sp_ea.eadata; + struct dt_object **stripes; + u32 stripe_count; + u32 saved_count; + int i; + int rc; + + ENTRY; + + LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC); + LASSERT(le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT); + + saved_count = lo->ldo_dir_stripes_allocated; + stripe_count = le32_to_cpu(lum->lum_stripe_count); + if (stripe_count <= saved_count) + RETURN(-EINVAL); + + dof->dof_type = DFT_DIR; + + OBD_ALLOC(stripes, sizeof(*stripes) * stripe_count); + if (!stripes) + RETURN(-ENOMEM); + + for (i = 0; i < lo->ldo_dir_stripes_allocated; i++) + stripes[i] = lo->ldo_stripe[i]; + + lod_qos_statfs_update(env, lod, &lod->lod_mdt_descs); + rc = lod_mdt_alloc_qos(env, lo, stripes, saved_count, stripe_count); + if (rc == -EAGAIN) + rc = lod_mdt_alloc_rr(env, lo, stripes, saved_count, + stripe_count); + if (rc < 0) { + OBD_FREE(stripes, sizeof(*stripes) * stripe_count); + RETURN(rc); + } + + LASSERT(rc > saved_count); + OBD_FREE(lo->ldo_stripe, + sizeof(*stripes) * lo->ldo_dir_stripes_allocated); + lo->ldo_stripe = stripes; + lo->ldo_dir_striped = 1; + lo->ldo_dir_stripe_count = rc; + lo->ldo_dir_stripes_allocated = stripe_count; + lo->ldo_dir_split_hash = lo->ldo_dir_hash_type; + lo->ldo_dir_hash_type = le32_to_cpu(lum->lum_hash_type); + if (!lmv_is_known_hash_type(lo->ldo_dir_hash_type)) + lo->ldo_dir_hash_type = + lod->lod_mdt_descs.ltd_lmv_desc.ld_pattern; + lo->ldo_dir_hash_type |= LMV_HASH_FLAG_SPLIT | LMV_HASH_FLAG_MIGRATION; + lo->ldo_dir_split_offset = saved_count; + lo->ldo_dir_layout_version++; + lo->ldo_dir_stripe_loaded = 1; + + rc = lod_dir_declare_create_stripes(env, dt, mlc->mlc_attr, dof, th); + if (rc) + lod_striping_free(env, lo); + + RETURN(rc); +} + /* * detach all stripes from dir master object, NB, stripes are not destroyed, but * deleted from it's parent namespace, this function is called in two places: @@ -8093,6 +8225,7 @@ static mlc_handler dir_mlc_declare_ops[MD_LAYOUT_MAX] = { [MD_LAYOUT_ATTACH] = lod_dir_declare_layout_attach, [MD_LAYOUT_DETACH] = lod_dir_declare_layout_detach, [MD_LAYOUT_SHRINK] = lod_dir_declare_layout_shrink, + [MD_LAYOUT_SPLIT] = lod_dir_declare_layout_split, }; static mlc_handler dir_mlc_ops[MD_LAYOUT_MAX] = {