X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flod%2Flod_object.c;h=f25a4903b0f242beb2b07454f5e646cd7e38e693;hp=c4a405a378d2c679c537d2ca642540c0ed6c498b;hb=c1d0a355a6a64ec97c9f56c38ba036e5e50cd8c4;hpb=a277952c65d4aad1abb9ac9f759af16a43902068 diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index c4a405a..f25a490 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -98,9 +98,9 @@ static int lod_declare_insert(const struct lu_env *env, struct dt_object *dt, */ static int lod_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, const struct dt_key *key, - struct thandle *th, int ign) + struct thandle *th) { - return lod_sub_insert(env, dt_object_child(dt), rec, key, th, ign); + return lod_sub_insert(env, dt_object_child(dt), rec, key, th); } /** @@ -377,15 +377,24 @@ static struct dt_index_operations lod_index_ops = { static struct dt_it *lod_striped_it_init(const struct lu_env *env, struct dt_object *dt, __u32 attr) { - struct lod_object *lo = lod_dt_obj(dt); - struct dt_object *next; - struct lod_it *it = &lod_env_info(env)->lti_it; - struct dt_it *it_next; - ENTRY; + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next; + struct lod_it *it = &lod_env_info(env)->lti_it; + struct dt_it *it_next; + __u16 index = 0; LASSERT(lo->ldo_dir_stripe_count > 0); - next = lo->ldo_stripe[0]; - LASSERT(next != NULL); + + do { + next = lo->ldo_stripe[index]; + if (next && dt_object_exists(next)) + break; + } while (++index < lo->ldo_dir_stripe_count); + + /* no valid stripe */ + if (!next || !dt_object_exists(next)) + return ERR_PTR(-ENODEV); + LASSERT(next->do_index_ops != NULL); it_next = next->do_index_ops->dio_it.init(env, next, attr); @@ -398,7 +407,7 @@ static struct dt_it *lod_striped_it_init(const struct lu_env *env, * additional ones */ LASSERT(it->lit_obj == NULL); - it->lit_stripe_index = 0; + it->lit_stripe_index = index; it->lit_attr = attr; it->lit_it = it_next; it->lit_obj = dt; @@ -433,10 +442,10 @@ static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di) LOD_CHECK_STRIPED_IT(env, it, lo); next = lo->ldo_stripe[it->lit_stripe_index]; - LASSERT(next != NULL); - LASSERT(next->do_index_ops != NULL); - - next->do_index_ops->dio_it.fini(env, it->lit_it); + if (next) { + LASSERT(next->do_index_ops != NULL); + next->do_index_ops->dio_it.fini(env, it->lit_it); + } } /* the iterator not in use any more */ @@ -457,15 +466,15 @@ static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di) static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di, const struct dt_key *key) { - const struct lod_it *it = (const struct lod_it *)di; - struct lod_object *lo = lod_dt_obj(it->lit_obj); - struct dt_object *next; - ENTRY; + const struct lod_it *it = (const struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; LOD_CHECK_STRIPED_IT(env, it, lo); next = lo->ldo_stripe[it->lit_stripe_index]; LASSERT(next != NULL); + LASSERT(dt_object_exists(next)); LASSERT(next->do_index_ops != NULL); return next->do_index_ops->dio_it.get(env, it->lit_it, key); @@ -480,9 +489,16 @@ static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di, */ static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di) { - struct lod_it *it = (struct lod_it *)di; - struct lod_object *lo = lod_dt_obj(it->lit_obj); - struct dt_object *next; + struct lod_it *it = (struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + + /* + * If lit_it == NULL, then it means the sub_it has been finished, + * which only happens in failure cases, see lod_striped_it_next() + */ + if (!it->lit_it) + return; LOD_CHECK_STRIPED_IT(env, it, lo); @@ -503,17 +519,20 @@ static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di) */ static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di) { - struct lod_it *it = (struct lod_it *)di; - struct lod_object *lo = lod_dt_obj(it->lit_obj); - struct dt_object *next; - struct dt_it *it_next; - int rc; + struct lod_it *it = (struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + struct dt_it *it_next; + __u32 index; + int rc; + ENTRY; LOD_CHECK_STRIPED_IT(env, it, lo); next = lo->ldo_stripe[it->lit_stripe_index]; LASSERT(next != NULL); + LASSERT(dt_object_exists(next)); LASSERT(next->do_index_ops != NULL); again: rc = next->do_index_ops->dio_it.next(env, it->lit_it); @@ -546,33 +565,44 @@ again: RETURN(rc); } - /* go to next stripe */ - if (it->lit_stripe_index + 1 >= lo->ldo_dir_stripe_count) - RETURN(1); - - it->lit_stripe_index++; - next->do_index_ops->dio_it.put(env, it->lit_it); next->do_index_ops->dio_it.fini(env, it->lit_it); it->lit_it = NULL; - next = lo->ldo_stripe[it->lit_stripe_index]; - LASSERT(next != NULL); - rc = next->do_ops->do_index_try(env, next, &dt_directory_features); - if (rc != 0) - RETURN(rc); + /* go to next stripe */ + index = it->lit_stripe_index; + while (++index < lo->ldo_dir_stripe_count) { + next = lo->ldo_stripe[index]; + if (!next) + continue; - LASSERT(next->do_index_ops != NULL); + if (!dt_object_exists(next)) + continue; + + rc = next->do_ops->do_index_try(env, next, + &dt_directory_features); + if (rc != 0) + RETURN(rc); + + LASSERT(next->do_index_ops != NULL); + + it_next = next->do_index_ops->dio_it.init(env, next, + it->lit_attr); + if (IS_ERR(it_next)) + RETURN(PTR_ERR(it_next)); + + rc = next->do_index_ops->dio_it.get(env, it_next, + (const struct dt_key *)""); + if (rc <= 0) + RETURN(rc == 0 ? -EIO : rc); - it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr); - if (!IS_ERR(it_next)) { it->lit_it = it_next; + it->lit_stripe_index = index; goto again; - } else { - rc = PTR_ERR(it_next); + } - RETURN(rc); + RETURN(1); } /** @@ -776,14 +806,9 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, int rc; ENTRY; - /* If it is not a striped directory, then load nothing. */ if (magic != LMV_MAGIC_V1) RETURN(0); - /* If it is in migration (or failure), then load nothing. */ - if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION) - RETURN(0); - stripes = le32_to_cpu(lmv1->lmv_stripe_count); if (stripes < 1) RETURN(0); @@ -961,7 +986,9 @@ static int lod_index_try(const struct lu_env *env, struct dt_object *dt, int i; for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - if (dt_object_exists(lo->ldo_stripe[i]) == 0) + if (!lo->ldo_stripe[i]) + continue; + if (!dt_object_exists(lo->ldo_stripe[i])) continue; rc = lo->ldo_stripe[i]->do_ops->do_index_try(env, lo->ldo_stripe[i], feat); @@ -1049,12 +1076,16 @@ static int lod_attr_get(const struct lu_env *env, } static inline void lod_adjust_stripe_info(struct lod_layout_component *comp, - struct lov_desc *desc) + struct lov_desc *desc, + int append_stripes) { if (comp->llc_pattern != LOV_PATTERN_MDT) { - if (!comp->llc_stripe_count) + if (append_stripes) { + comp->llc_stripe_count = append_stripes; + } else if (!comp->llc_stripe_count) { comp->llc_stripe_count = desc->ld_default_stripe_count; + } } if (comp->llc_stripe_size <= 0) comp->llc_stripe_size = desc->ld_default_stripe_size; @@ -1246,6 +1277,8 @@ static int lod_declare_attr_set(const struct lu_env *env, for (i = 0; i < lo->ldo_dir_stripe_count; i++) { if (lo->ldo_stripe[i] == NULL) continue; + if (!dt_object_exists(lo->ldo_stripe[i])) + continue; rc = lod_sub_declare_attr_set(env, lo->ldo_stripe[i], attr, th); if (rc != 0) @@ -1390,7 +1423,7 @@ static int lod_attr_set(const struct lu_env *env, buf->lb_len = info->lti_ea_store_size; lmm = info->lti_ea_store; magic = le32_to_cpu(lmm->lmm_magic); - if (magic == LOV_MAGIC_COMP_V1) { + if (magic == LOV_MAGIC_COMP_V1 || magic == LOV_MAGIC_SEL) { struct lov_comp_md_v1 *lcm = buf->lb_buf; struct lov_comp_md_entry_v1 *lcme = &lcm->lcm_entries[0]; @@ -1424,7 +1457,8 @@ static int lod_attr_set(const struct lu_env *env, buf->lb_buf = info->lti_ea_store; buf->lb_len = info->lti_ea_store_size; lcm = buf->lb_buf; - if (le32_to_cpu(lcm->lcm_magic) != LOV_MAGIC_COMP_V1) + if (le32_to_cpu(lcm->lcm_magic) != LOV_MAGIC_COMP_V1 && + le32_to_cpu(lcm->lcm_magic) != LOV_MAGIC_SEL) RETURN(-EINVAL); le32_add_cpu(&lcm->lcm_layout_gen, 1); @@ -1459,21 +1493,35 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, rc = dt_xattr_get(env, dt_object_child(dt), buf, name); if (strcmp(name, XATTR_NAME_LMV) == 0) { struct lmv_mds_md_v1 *lmv1; + struct lmv_foreign_md *lfm; int rc1 = 0; if (rc > (typeof(rc))sizeof(*lmv1)) RETURN(rc); - if (rc < (typeof(rc))sizeof(*lmv1)) + /* short (<= sizeof(struct lmv_mds_md_v1)) foreign LMV case */ + /* XXX empty foreign LMV is not allowed */ + if (rc <= offsetof(typeof(*lfm), lfm_value)) RETURN(rc = rc > 0 ? -EINVAL : rc); if (buf->lb_buf == NULL || buf->lb_len == 0) { CLASSERT(sizeof(*lmv1) <= sizeof(info->lti_key)); + /* lti_buf is large enough for *lmv1 or a short + * (<= sizeof(struct lmv_mds_md_v1)) foreign LMV + */ info->lti_buf.lb_buf = info->lti_key; info->lti_buf.lb_len = sizeof(*lmv1); rc = dt_xattr_get(env, dt_object_child(dt), &info->lti_buf, name); + if (unlikely(rc <= offsetof(typeof(*lfm), + lfm_value))) + RETURN(rc = rc > 0 ? -EINVAL : rc); + + lfm = info->lti_buf.lb_buf; + if (le32_to_cpu(lfm->lfm_magic) == LMV_MAGIC_FOREIGN) + RETURN(rc); + if (unlikely(rc != sizeof(*lmv1))) RETURN(rc = rc > 0 ? -EINVAL : rc); @@ -1486,6 +1534,13 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, le32_to_cpu(lmv1->lmv_stripe_count), LMV_MAGIC_V1); } else { + lfm = buf->lb_buf; + if (le32_to_cpu(lfm->lfm_magic) == LMV_MAGIC_FOREIGN) + RETURN(rc); + + if (rc != sizeof(*lmv1)) + RETURN(rc = rc > 0 ? -EINVAL : rc); + rc1 = lod_load_lmv_shards(env, lod_dt_obj(dt), buf, false); } @@ -1493,6 +1548,13 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, RETURN(rc = rc1 != 0 ? rc1 : rc); } + if ((rc > 0) && buf->lb_buf && strcmp(name, XATTR_NAME_LOV) == 0) { + struct lov_comp_md_v1 *lcm = buf->lb_buf; + + if (lcm->lcm_magic == cpu_to_le32(LOV_MAGIC_SEL)) + lcm->lcm_magic = cpu_to_le32(LOV_MAGIC_COMP_V1); + } + if (rc != -ENODATA || !S_ISDIR(dt->do_lu.lo_header->loh_attr & S_IFMT)) RETURN(rc); @@ -1509,7 +1571,7 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) { struct lov_user_md *lum = buf->lb_buf; - struct lov_desc *desc = &dev->lod_desc; + struct lov_desc *desc = &dev->lod_ost_descs.ltd_lov_desc; if (buf->lb_buf == NULL) { rc = sizeof(*lum); @@ -1615,9 +1677,15 @@ static int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, } lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store; + memset(lmm1, 0, sizeof(*lmm1)); lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC); lmm1->lmv_stripe_count = cpu_to_le32(stripe_count); lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type); + if (lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) { + lmm1->lmv_migrate_hash = cpu_to_le32(lo->ldo_dir_migrate_hash); + lmm1->lmv_migrate_offset = + cpu_to_le32(lo->ldo_dir_migrate_offset); + } rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu), &mdtidx, &type); if (rc != 0) @@ -1661,7 +1729,8 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo, LASSERT(mutex_is_locked(&lo->ldo_layout_mutex)); - if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION) + /* XXX may be useless as not called for foreign LMV ?? */ + if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_FOREIGN) RETURN(0); if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) { @@ -1688,8 +1757,10 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo, __u32 idx; fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]); - if (!fid_is_sane(fid)) - GOTO(out, rc = -ESTALE); + if (!fid_is_sane(fid)) { + stripe[i] = NULL; + continue; + } rc = lod_fld_lookup(env, lod, fid, &idx, &type); if (rc != 0) @@ -1784,6 +1855,10 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, struct linkea_data ldata = { NULL }; struct lu_buf linkea_buf; + /* OBD_FAIL_MDS_STRIPE_FID may leave stripe uninitialized */ + if (!dto) + continue; + rc = lod_sub_declare_create(env, dto, attr, NULL, dof, th); if (rc != 0) GOTO(out, rc); @@ -1870,74 +1945,73 @@ out: RETURN(rc); } -static int lod_prep_md_striped_create(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - const struct lmv_user_md_v1 *lum, - struct dt_object_format *dof, - struct thandle *th) +/** + * Allocate a striping on a predefined set of MDTs. + * + * Allocates new striping using the MDT index range provided by the data from + * the lum_obejcts contained in the lmv_user_md passed to this method if + * \a is_specific is true; or allocates new layout starting from MDT index in + * lo->ldo_dir_stripe_offset. The exact order of MDTs is not important and + * varies depending on MDT status. The number of stripes needed and stripe + * offset are taken from the object. If that number cannot be met, then the + * function returns an error and then it's the caller's responsibility to + * release the stripes allocated. All the internal structures are protected, + * but no concurrent allocation is allowed on the same objects. + * + * \param[in] env execution environment for this thread + * \param[in] lo LOD object + * \param[out] stripes striping created + * \param[out] mdt_indices MDT indices of striping created + * \param[in] is_specific true if the MDTs are provided by lum; false if + * only the starting MDT index is provided + * + * \retval positive stripes allocated, including the first stripe allocated + * outside + * \retval negative errno on failure + */ +static int lod_mdt_alloc_specific(const struct lu_env *env, + struct lod_object *lo, + struct dt_object **stripes, + __u32 *mdt_indices, bool is_specific) { - struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); - struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; - struct lod_object *lo = lod_dt_obj(dt); - struct dt_object **stripe; - __u32 stripe_count; - int *idx_array; - __u32 master_index; - int rc = 0; - __u32 i; - __u32 j; - bool is_specific = false; - ENTRY; - - /* The lum has been verifed in lod_verify_md_striping */ - LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC || - le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC); - LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0); - - stripe_count = le32_to_cpu(lum->lum_stripe_count); - - OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count); - if (idx_array == NULL) - RETURN(-ENOMEM); - - OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count); - if (stripe == NULL) - GOTO(out_free, rc = -ENOMEM); + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + struct lu_tgt_descs *ltd = &lod->lod_mdt_descs; + struct lu_tgt_desc *tgt = NULL; + struct lu_object_conf conf = { .loc_flags = LOC_F_NEW }; + struct dt_device *tgt_dt = NULL; + struct lu_fid fid = { 0 }; + struct dt_object *dto; + u32 master_index; + u32 stripe_count = lo->ldo_dir_stripe_count; + int stripe_idx = 1; + int j; + int idx; + int rc; - /* Start index must be the master MDT */ master_index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id; - idx_array[0] = master_index; - if (le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC) { - is_specific = true; - for (i = 1; i < stripe_count; i++) - idx_array[i] = le32_to_cpu(lum->lum_objects[i].lum_mds); - } - - for (i = 0; i < stripe_count; i++) { - struct lod_tgt_desc *tgt = NULL; - struct dt_object *dto; - struct lu_fid fid = { 0 }; - int idx; - struct lu_object_conf conf = { 0 }; - struct dt_device *tgt_dt = NULL; + if (stripe_count > 1) + /* Set the start index for the 2nd stripe allocation */ + mdt_indices[1] = (mdt_indices[0] + 1) % + (lod->lod_remote_mdt_count + 1); + for (; stripe_idx < stripe_count; stripe_idx++) { /* Try to find next avaible target */ - idx = idx_array[i]; + idx = mdt_indices[stripe_idx]; for (j = 0; j < lod->lod_remote_mdt_count; j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) { bool already_allocated = false; __u32 k; CDEBUG(D_INFO, "try idx %d, mdt cnt %u, allocated %u\n", - idx, lod->lod_remote_mdt_count + 1, i); + idx, lod->lod_remote_mdt_count + 1, stripe_idx); if (likely(!is_specific && !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) { /* check whether the idx already exists * in current allocated array */ - for (k = 0; k < i; k++) { - if (idx_array[k] == idx) { + for (k = 0; k < stripe_idx; k++) { + if (mdt_indices[k] == idx) { already_allocated = true; break; } @@ -1958,29 +2032,25 @@ static int lod_prep_md_striped_create(const struct lu_env *env, rc = obd_fid_alloc(env, lod->lod_child_exp, &fid, NULL); if (rc < 0) - GOTO(out_put, rc); + continue; tgt_dt = lod->lod_child; break; } /* check the status of the OSP */ tgt = LTD_TGT(ltd, idx); - if (tgt == NULL) + if (!tgt) continue; tgt_dt = tgt->ltd_tgt; - rc = dt_statfs(env, tgt_dt, NULL); - if (rc) { + rc = dt_statfs(env, tgt_dt, &info->lti_osfs); + if (rc) /* this OSP doesn't feel well */ - rc = 0; continue; - } rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL); - if (rc < 0) { - rc = 0; + if (rc < 0) continue; - } break; } @@ -1988,15 +2058,16 @@ static int lod_prep_md_striped_create(const struct lu_env *env, /* Can not allocate more stripes */ if (j == lod->lod_remote_mdt_count) { CDEBUG(D_INFO, "%s: require stripes %u only get %d\n", - lod2obd(lod)->obd_name, stripe_count, i); + lod2obd(lod)->obd_name, stripe_count, + stripe_idx); break; } CDEBUG(D_INFO, "Get idx %d, for stripe %d "DFID"\n", - idx, i, PFID(&fid)); - idx_array[i] = idx; + idx, stripe_idx, PFID(&fid)); + mdt_indices[stripe_idx] = idx; /* Set the start index for next stripe allocation */ - if (!is_specific && i < stripe_count - 1) { + if (!is_specific && stripe_idx < stripe_count - 1) { /* * for large dir test, put all other slaves on one * remote MDT, otherwise we may save too many local @@ -2004,51 +2075,154 @@ static int lod_prep_md_striped_create(const struct lu_env *env, */ if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) idx = master_index; - idx_array[i + 1] = (idx + 1) % + mdt_indices[stripe_idx + 1] = (idx + 1) % (lod->lod_remote_mdt_count + 1); } /* tgt_dt and fid must be ready after search avaible OSP * in the above loop */ LASSERT(tgt_dt != NULL); LASSERT(fid_is_sane(&fid)); - conf.loc_flags = LOC_F_NEW; + + /* fail a remote stripe FID allocation */ + if (stripe_idx && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_FID)) + continue; + dto = dt_locate_at(env, tgt_dt, &fid, - dt->do_lu.lo_dev->ld_site->ls_top_dev, - &conf); - if (IS_ERR(dto)) - GOTO(out_put, rc = PTR_ERR(dto)); - stripe[i] = dto; + lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev, + &conf); + if (IS_ERR(dto)) { + rc = PTR_ERR(dto); + goto error; + } + + stripes[stripe_idx] = dto; + } + + return stripe_idx; + +error: + for (j = 1; j < stripe_idx; j++) { + LASSERT(stripes[j] != NULL); + dt_object_put(env, stripes[j]); + stripes[j] = NULL; + } + return rc; +} + +static int lod_prep_md_striped_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + const struct lmv_user_md_v1 *lum, + struct dt_object_format *dof, + struct thandle *th) +{ + struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object **stripes; + struct lu_object_conf conf = { .loc_flags = LOC_F_NEW }; + struct lu_fid fid = { 0 }; + __u32 stripe_count; + int i; + int rc = 0; + + ENTRY; + + /* The lum has been verifed in lod_verify_md_striping */ + LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC || + le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC); + + stripe_count = lo->ldo_dir_stripe_count; + + OBD_ALLOC(stripes, sizeof(stripes[0]) * stripe_count); + if (!stripes) + RETURN(-ENOMEM); + + /* Allocate the first stripe locally */ + rc = obd_fid_alloc(env, lod->lod_child_exp, &fid, NULL); + if (rc < 0) + GOTO(out, rc); + + stripes[0] = dt_locate_at(env, lod->lod_child, &fid, + dt->do_lu.lo_dev->ld_site->ls_top_dev, &conf); + if (IS_ERR(stripes[0])) + GOTO(out, rc = PTR_ERR(stripes[0])); + + if (lo->ldo_dir_stripe_offset == LMV_OFFSET_DEFAULT) { + lod_qos_statfs_update(env, lod, &lod->lod_mdt_descs); + rc = lod_mdt_alloc_qos(env, lo, stripes); + if (rc == -EAGAIN) + rc = lod_mdt_alloc_rr(env, lo, stripes); + } else { + int *idx_array; + bool is_specific = false; + + OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count); + if (!idx_array) + GOTO(out, rc = -ENOMEM); + + if (le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC) { + is_specific = true; + for (i = 0; i < stripe_count; i++) + idx_array[i] = + le32_to_cpu(lum->lum_objects[i].lum_mds); + } + + /* stripe 0 is local */ + idx_array[0] = + lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id; + rc = lod_mdt_alloc_specific(env, lo, stripes, idx_array, + is_specific); + OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count); } + if (rc < 0) + GOTO(out, rc); + + LASSERT(rc > 0); + lo->ldo_dir_striped = 1; - lo->ldo_stripe = stripe; - lo->ldo_dir_stripe_count = i; + lo->ldo_stripe = stripes; + lo->ldo_dir_stripe_count = rc; lo->ldo_dir_stripes_allocated = stripe_count; smp_mb(); lo->ldo_dir_stripe_loaded = 1; - if (lo->ldo_dir_stripe_count == 0) - GOTO(out_put, rc = -ENOSPC); - rc = lod_dir_declare_create_stripes(env, dt, attr, dof, th); - if (rc != 0) - GOTO(out_put, rc); + if (rc < 0) + lod_striping_free(env, lo); -out_put: - if (rc < 0) { - for (i = 0; i < stripe_count; i++) - if (stripe[i] != NULL) - dt_object_put(env, stripe[i]); - OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count); - lo->ldo_dir_stripe_count = 0; - lo->ldo_dir_stripes_allocated = 0; - lo->ldo_stripe = NULL; - } + RETURN(rc); -out_free: - OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count); +out: + LASSERT(rc < 0); + if (!IS_ERR_OR_NULL(stripes[0])) + dt_object_put(env, stripes[0]); + for (i = 1; i < stripe_count; i++) + LASSERT(!stripes[i]); + OBD_FREE(stripes, sizeof(stripes[0]) * stripe_count); - RETURN(rc); + return rc; +} + +/** + * + * Alloc cached foreign LMV + * + * \param[in] lo object + * \param[in] size size of foreign LMV + * + * \retval 0 on success + * \retval negative if failed + */ +int lod_alloc_foreign_lmv(struct lod_object *lo, size_t size) +{ + OBD_ALLOC_LARGE(lo->ldo_foreign_lmv, size); + if (lo->ldo_foreign_lmv == NULL) + return -ENOMEM; + lo->ldo_foreign_lmv_size = size; + lo->ldo_dir_is_foreign = 1; + + return 0; } /** @@ -2088,8 +2262,16 @@ static int lod_declare_xattr_set_lmv(const struct lu_env *env, le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count), (int)le32_to_cpu(lum->lum_stripe_offset)); - if (lo->ldo_dir_stripe_count == 0) + if (lo->ldo_dir_stripe_count == 0) { + if (lo->ldo_dir_is_foreign) { + rc = lod_alloc_foreign_lmv(lo, lum_buf->lb_len); + if (rc != 0) + GOTO(out, rc); + memcpy(lo->ldo_foreign_lmv, lum, lum_buf->lb_len); + lo->ldo_dir_stripe_loaded = 1; + } GOTO(out, rc = 0); + } /* prepare dir striped objects */ rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th); @@ -2104,51 +2286,331 @@ out: } /** - * Implementation of dt_object_operations::do_declare_xattr_set. + * Append source stripes after target stripes for migrating directory. NB, we + * only need to declare this, the append is done inside lod_xattr_set_lmv(). * - * Used with regular (non-striped) objects. Basically it - * initializes the striping information and applies the - * change to all the stripes. + * \param[in] env execution environment + * \param[in] dt target object + * \param[in] buf LMV buf which contains source stripe fids + * \param[in] th transaction handle * - * \see dt_object_operations::do_declare_xattr_set() in the API description - * for details. + * \retval 0 on success + * \retval negative if failed */ -static int lod_dir_declare_xattr_set(const struct lu_env *env, - struct dt_object *dt, - const struct lu_buf *buf, - const char *name, int fl, - struct thandle *th) +static int lod_dir_declare_layout_add(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + struct thandle *th) { - struct dt_object *next = dt_object_child(dt); - struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); - struct lod_object *lo = lod_dt_obj(dt); - int i; - int rc; + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + struct dt_object_format *dof = &info->lti_format; + struct lmv_mds_md_v1 *lmv = buf->lb_buf; + struct dt_object **stripe; + __u32 stripe_count = le32_to_cpu(lmv->lmv_stripe_count); + struct lu_fid *fid = &info->lti_fid; + struct lod_tgt_desc *tgt; + struct dt_object *dto; + struct dt_device *tgt_dt; + int type = LU_SEQ_RANGE_ANY; + struct dt_insert_rec *rec = &info->lti_dt_rec; + char *stripe_name = info->lti_key; + struct lu_name *sname; + struct linkea_data ldata = { NULL }; + struct lu_buf linkea_buf; + __u32 idx; + int i; + int rc; + ENTRY; - if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { - struct lmv_user_md_v1 *lum; + if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1) + RETURN(-EINVAL); - LASSERT(buf != NULL && buf->lb_buf != NULL); - lum = buf->lb_buf; - rc = lod_verify_md_striping(d, lum); - if (rc != 0) - RETURN(rc); - } else if (strcmp(name, XATTR_NAME_LOV) == 0) { - rc = lod_verify_striping(d, lo, buf, false); - if (rc != 0) - RETURN(rc); - } + if (stripe_count == 0) + RETURN(-EINVAL); - rc = lod_sub_declare_xattr_set(env, next, buf, name, fl, th); - if (rc != 0) - RETURN(rc); + dof->dof_type = DFT_DIR; - /* Note: Do not set LinkEA on sub-stripes, otherwise - * it will confuse the fid2path process(see mdt_path_current()). - * The linkEA between master and sub-stripes is set in - * lod_xattr_set_lmv(). */ - if (strcmp(name, XATTR_NAME_LINK) == 0) + OBD_ALLOC(stripe, + sizeof(*stripe) * (lo->ldo_dir_stripe_count + stripe_count)); + if (stripe == NULL) + RETURN(-ENOMEM); + + for (i = 0; i < lo->ldo_dir_stripe_count; i++) + stripe[i] = lo->ldo_stripe[i]; + + for (i = 0; i < stripe_count; i++) { + fid_le_to_cpu(fid, + &lmv->lmv_stripe_fids[i]); + if (!fid_is_sane(fid)) + continue; + + rc = lod_fld_lookup(env, lod, fid, &idx, &type); + if (rc) + GOTO(out, rc); + + if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) { + tgt_dt = lod->lod_child; + } else { + tgt = LTD_TGT(ltd, idx); + if (tgt == NULL) + GOTO(out, rc = -ESTALE); + tgt_dt = tgt->ltd_tgt; + } + + dto = dt_locate_at(env, tgt_dt, fid, + lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev, + NULL); + if (IS_ERR(dto)) + GOTO(out, rc = PTR_ERR(dto)); + + stripe[i + lo->ldo_dir_stripe_count] = dto; + + if (!dt_try_as_dir(env, dto)) + GOTO(out, rc = -ENOTDIR); + + rc = lod_sub_declare_ref_add(env, dto, th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dot, th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dotdot, th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_xattr_set(env, dto, buf, + XATTR_NAME_LMV, 0, th); + if (rc) + GOTO(out, rc); + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", + PFID(lu_object_fid(&dto->do_lu)), + i + lo->ldo_dir_stripe_count); + + sname = lod_name_get(env, stripe_name, strlen(stripe_name)); + rc = linkea_links_new(&ldata, &info->lti_linkea_buf, + sname, lu_object_fid(&dt->do_lu)); + if (rc) + GOTO(out, rc); + + linkea_buf.lb_buf = ldata.ld_buf->lb_buf; + linkea_buf.lb_len = ldata.ld_leh->leh_len; + rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_insert(env, next, + (const struct dt_rec *)rec, + (const struct dt_key *)stripe_name, + th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_ref_add(env, next, th); + if (rc) + GOTO(out, rc); + } + + if (lo->ldo_stripe) + OBD_FREE(lo->ldo_stripe, + sizeof(*stripe) * lo->ldo_dir_stripes_allocated); + lo->ldo_stripe = stripe; + lo->ldo_dir_migrate_offset = lo->ldo_dir_stripe_count; + lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_hash_type); + lo->ldo_dir_stripe_count += stripe_count; + lo->ldo_dir_stripes_allocated += stripe_count; + lo->ldo_dir_hash_type |= LMV_HASH_FLAG_MIGRATION; + + RETURN(0); +out: + i = lo->ldo_dir_stripe_count; + while (i < lo->ldo_dir_stripe_count + stripe_count && stripe[i]) + dt_object_put(env, stripe[i++]); + + OBD_FREE(stripe, + sizeof(*stripe) * (stripe_count + lo->ldo_dir_stripe_count)); + RETURN(rc); +} + +static int lod_dir_declare_layout_delete(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + struct lmv_user_md *lmu = buf->lb_buf; + __u32 final_stripe_count; + char *stripe_name = info->lti_key; + struct dt_object *dto; + int i; + int rc = 0; + + if (!lmu) + return -EINVAL; + + final_stripe_count = le32_to_cpu(lmu->lum_stripe_count); + if (final_stripe_count >= lo->ldo_dir_stripe_count) + return -EINVAL; + + for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) { + dto = lo->ldo_stripe[i]; + if (!dto) + continue; + + if (!dt_try_as_dir(env, dto)) + return -ENOTDIR; + + rc = lod_sub_declare_delete(env, dto, + (const struct dt_key *)dot, th); + if (rc) + return rc; + + rc = lod_sub_declare_ref_del(env, dto, th); + if (rc) + return rc; + + rc = lod_sub_declare_delete(env, dto, + (const struct dt_key *)dotdot, th); + if (rc) + return rc; + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&dto->do_lu)), i); + + rc = lod_sub_declare_delete(env, next, + (const struct dt_key *)stripe_name, th); + if (rc) + return rc; + + rc = lod_sub_declare_ref_del(env, next, th); + if (rc) + return rc; + } + + return 0; +} + +/* + * delete stripes from dir master object, the lum_stripe_count in argument is + * the final stripe count, the stripes after that will be deleted, NB, they + * are not destroyed, but deleted from it's parent namespace, this function + * will be called in two places: + * 1. mdd_migrate_create() delete stripes from source, and append them to + * target. + * 2. mdd_dir_layout_shrink() delete stripes from source, and destroy them. + */ +static int lod_dir_layout_delete(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + struct lmv_user_md *lmu = buf->lb_buf; + __u32 final_stripe_count; + char *stripe_name = info->lti_key; + struct dt_object *dto; + int i; + int rc = 0; + + ENTRY; + + if (!lmu) + RETURN(-EINVAL); + + final_stripe_count = le32_to_cpu(lmu->lum_stripe_count); + if (final_stripe_count >= lo->ldo_dir_stripe_count) + RETURN(-EINVAL); + + for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) { + dto = lo->ldo_stripe[i]; + if (!dto) + continue; + + rc = lod_sub_delete(env, dto, + (const struct dt_key *)dotdot, th); + if (rc) + break; + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&dto->do_lu)), i); + + rc = lod_sub_delete(env, next, + (const struct dt_key *)stripe_name, th); + if (rc) + break; + + rc = lod_sub_ref_del(env, next, th); + if (rc) + break; + } + + lod_striping_free(env, lod_dt_obj(dt)); + + RETURN(rc); +} + +/** + * Implementation of dt_object_operations::do_declare_xattr_set. + * + * Used with regular (non-striped) objects. Basically it + * initializes the striping information and applies the + * change to all the stripes. + * + * \see dt_object_operations::do_declare_xattr_set() in the API description + * for details. + */ +static int lod_dir_declare_xattr_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, + struct thandle *th) +{ + struct dt_object *next = dt_object_child(dt); + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_object *lo = lod_dt_obj(dt); + int i; + int rc; + ENTRY; + + if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { + struct lmv_user_md_v1 *lum; + + LASSERT(buf != NULL && buf->lb_buf != NULL); + lum = buf->lb_buf; + rc = lod_verify_md_striping(d, lum); + if (rc != 0) + RETURN(rc); + } else if (strcmp(name, XATTR_NAME_LOV) == 0) { + rc = lod_verify_striping(d, lo, buf, false); + if (rc != 0) + RETURN(rc); + } + + rc = lod_sub_declare_xattr_set(env, next, buf, name, fl, th); + if (rc != 0) + RETURN(rc); + + /* Note: Do not set LinkEA on sub-stripes, otherwise + * it will confuse the fid2path process(see mdt_path_current()). + * The linkEA between master and sub-stripes is set in + * lod_xattr_set_lmv(). */ + if (strcmp(name, XATTR_NAME_LINK) == 0) RETURN(0); /* set xattr to each stripes, if needed */ @@ -2160,7 +2622,11 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env, RETURN(0); for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - LASSERT(lo->ldo_stripe[i]); + if (!lo->ldo_stripe[i]) + continue; + + if (!dt_object_exists(lo->ldo_stripe[i])) + continue; rc = lod_sub_declare_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th); @@ -2193,14 +2659,25 @@ lod_obj_stripe_replace_parent_fid_cb(const struct lu_env *env, return rc; } - filter_fid_le_to_cpu(ff, ff, sizeof(*ff)); - if (lu_fid_eq(lu_object_fid(&lo->ldo_obj.do_lu), &ff->ff_parent) && - ff->ff_layout.ol_comp_id == comp->llc_id) - return 0; + /* + * locd_buf is set if it's called by dir migration, which doesn't check + * pfid and comp id. + */ + if (data->locd_buf) { + memset(ff, 0, sizeof(*ff)); + ff->ff_parent = *(struct lu_fid *)data->locd_buf->lb_buf; + } else { + filter_fid_le_to_cpu(ff, ff, sizeof(*ff)); + + if (lu_fid_eq(lod_object_fid(lo), &ff->ff_parent) && + ff->ff_layout.ol_comp_id == comp->llc_id) + return 0; + + memset(ff, 0, sizeof(*ff)); + ff->ff_parent = *lu_object_fid(&lo->ldo_obj.do_lu); + } /* rewrite filter_fid */ - memset(ff, 0, sizeof(*ff)); - ff->ff_parent = *lu_object_fid(&lo->ldo_obj.do_lu); ff->ff_parent.f_ver = stripe_idx; ff->ff_layout.ol_stripe_size = comp->llc_stripe_size; ff->ff_layout.ol_stripe_count = comp->llc_stripe_count; @@ -2236,11 +2713,11 @@ lod_obj_stripe_replace_parent_fid_cb(const struct lu_env *env, */ static int lod_replace_parent_fid(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, struct thandle *th, bool declare) { struct lod_object *lo = lod_dt_obj(dt); struct lod_thread_info *info = lod_env_info(env); - struct lu_buf *buf = &info->lti_buf; struct filter_fid *ff; struct lod_obj_stripe_cb_data data = { { 0 } }; int rc; @@ -2262,11 +2739,9 @@ static int lod_replace_parent_fid(const struct lu_env *env, RETURN(rc); } - buf->lb_buf = info->lti_ea_store; - buf->lb_len = info->lti_ea_store_size; - data.locd_declare = declare; data.locd_stripe_cb = lod_obj_stripe_replace_parent_fid_cb; + data.locd_buf = buf; rc = lod_obj_for_each_stripe(env, lo, th, &data); RETURN(rc); @@ -2283,9 +2758,10 @@ inline __u16 lod_comp_entry_stripe_count(struct lod_object *lo, else if (lod_comp_inited(entry)) return entry->llc_stripe_count; else if ((__u16)-1 == entry->llc_stripe_count) - return lod->lod_desc.ld_tgt_count; + return lod->lod_ost_count; else - return lod_get_stripe_count(lod, lo, entry->llc_stripe_count); + return lod_get_stripe_count(lod, lo, + entry->llc_stripe_count, false); } static int lod_comp_md_size(struct lod_object *lo, bool is_dir) @@ -2293,7 +2769,7 @@ static int lod_comp_md_size(struct lod_object *lo, bool is_dir) int magic, size = 0, i; struct lod_layout_component *comp_entries; __u16 comp_cnt; - bool is_composite; + bool is_composite, is_foreign = false; if (is_dir) { comp_cnt = lo->ldo_def_striping->lds_def_comp_cnt; @@ -2304,8 +2780,11 @@ static int lod_comp_md_size(struct lod_object *lo, bool is_dir) comp_cnt = lo->ldo_comp_cnt; comp_entries = lo->ldo_comp_entries; is_composite = lo->ldo_is_composite; + is_foreign = lo->ldo_is_foreign; } + if (is_foreign) + return lo->ldo_foreign_lov_size; LASSERT(comp_cnt != 0 && comp_entries != NULL); if (is_composite) { @@ -2350,14 +2829,14 @@ static int lod_declare_layout_add(const struct lu_env *env, { struct lod_thread_info *info = lod_env_info(env); struct lod_layout_component *comp_array, *lod_comp, *old_array; - struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); struct dt_object *next = dt_object_child(dt); - struct lov_desc *desc = &d->lod_desc; - struct lod_object *lo = lod_dt_obj(dt); - struct lov_user_md_v3 *v3; - struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; - __u32 magic; - int i, rc, array_cnt, old_array_cnt; + struct lov_desc *desc = &d->lod_ost_descs.ltd_lov_desc; + struct lod_object *lo = lod_dt_obj(dt); + struct lov_user_md_v3 *v3; + struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; + __u32 magic; + int i, rc, array_cnt, old_array_cnt; ENTRY; LASSERT(lo->ldo_is_composite); @@ -2402,7 +2881,7 @@ static int lod_declare_layout_add(const struct lu_env *env, lod_comp->llc_stripe_count = v1->lmm_stripe_count; lod_comp->llc_stripe_size = v1->lmm_stripe_size; - lod_adjust_stripe_info(lod_comp, desc); + lod_adjust_stripe_info(lod_comp, desc, 0); if (v1->lmm_magic == LOV_USER_MAGIC_V3) { v3 = (struct lov_user_md_v3 *) v1; @@ -2454,6 +2933,42 @@ error: } /** + * lod_last_non_stale_mirror() - Check if a mirror is the last non-stale mirror. + * @mirror_id: Mirror id to be checked. + * @lo: LOD object. + * + * This function checks if a mirror with specified @mirror_id is the last + * non-stale mirror of a LOD object @lo. + * + * Return: true or false. + */ +static inline +bool lod_last_non_stale_mirror(__u16 mirror_id, struct lod_object *lo) +{ + struct lod_layout_component *lod_comp; + bool has_stale_flag; + int i; + + for (i = 0; i < lo->ldo_mirror_count; i++) { + if (lo->ldo_mirrors[i].lme_id == mirror_id || + lo->ldo_mirrors[i].lme_stale) + continue; + + has_stale_flag = false; + lod_foreach_mirror_comp(lod_comp, lo, i) { + if (lod_comp->llc_flags & LCME_FL_STALE) { + has_stale_flag = true; + break; + } + } + if (!has_stale_flag) + return false; + } + + return true; +} + +/** * Declare component set. The xattr is name XATTR_LUSTRE_LOV.set.$field, * the '$field' can only be 'flags' now. The xattr value is binary * lov_comp_md_v1 which contains the component ID(s) and the value of @@ -2507,6 +3022,9 @@ static int lod_declare_layout_set(const struct lu_env *env, for (i = 0; i < comp_v1->lcm_entry_count; i++) { __u32 id = comp_v1->lcm_entries[i].lcme_id; __u32 flags = comp_v1->lcm_entries[i].lcme_flags; + __u32 mirror_flag = flags & LCME_MIRROR_FLAGS; + __u16 mirror_id = mirror_id_of(id); + bool neg = flags & LCME_FL_NEG; if (flags & LCME_FL_INIT) { if (changed) @@ -2514,16 +3032,35 @@ static int lod_declare_layout_set(const struct lu_env *env, RETURN(-EINVAL); } + flags &= ~(LCME_MIRROR_FLAGS | LCME_FL_NEG); for (j = 0; j < lo->ldo_comp_cnt; j++) { lod_comp = &lo->ldo_comp_entries[j]; - if (id != lod_comp->llc_id) + + /* lfs only put one flag in each entry */ + if ((flags && id != lod_comp->llc_id) || + (mirror_flag && mirror_id != + mirror_id_of(lod_comp->llc_id))) continue; - if (flags & LCME_FL_NEG) { - flags &= ~LCME_FL_NEG; - lod_comp->llc_flags &= ~flags; + if (neg) { + if (flags) + lod_comp->llc_flags &= ~flags; + if (mirror_flag) + lod_comp->llc_flags &= ~mirror_flag; } else { - lod_comp->llc_flags |= flags; + if (flags) { + if ((flags & LCME_FL_STALE) && + lod_last_non_stale_mirror(mirror_id, + lo)) + RETURN(-EUCLEAN); + lod_comp->llc_flags |= flags; + } + if (mirror_flag) { + lod_comp->llc_flags |= mirror_flag; + if (mirror_flag & LCME_FL_NOSYNC) + lod_comp->llc_timestamp = + ktime_get_real_seconds(); + } } changed = true; } @@ -2818,6 +3355,7 @@ static int lod_declare_layout_merge(const struct lu_env *env, struct lov_comp_md_v1 *cur_lcm; struct lov_comp_md_v1 *merge_lcm; struct lov_comp_md_entry_v1 *lcme; + struct lov_mds_md_v1 *lmm; size_t size = 0; size_t offset; __u16 cur_entry_count; @@ -2826,6 +3364,8 @@ static int lod_declare_layout_merge(const struct lu_env *env, __u16 mirror_id = 0; __u32 mirror_count; int rc, i; + bool merge_has_dom; + ENTRY; merge_lcm = mbuf->lb_buf; @@ -2854,6 +3394,7 @@ static int lod_declare_layout_merge(const struct lu_env *env, rc = lod_layout_convert(info); break; case LOV_MAGIC_COMP_V1: + case LOV_MAGIC_SEL: rc = 0; break; default: @@ -2898,8 +3439,12 @@ static int lod_declare_layout_merge(const struct lu_env *env, offset += le32_to_cpu(lcme->lcme_size); - if (mirror_count == 1) { - /* new mirrored file, create new mirror ID */ + if (mirror_count == 1 && + mirror_id_of(le32_to_cpu(lcme->lcme_id)) == 0) { + /* Add mirror from a non-flr file, create new mirror ID. + * Otherwise, keep existing mirror's component ID, used + * for mirror extension. + */ id = pflr_id(1, i + 1); lcme->lcme_id = cpu_to_le32(id); } @@ -2908,6 +3453,13 @@ static int lod_declare_layout_merge(const struct lu_env *env, } mirror_id = mirror_id_of(id) + 1; + + /* check if first entry in new layout is DOM */ + lmm = (struct lov_mds_md_v1 *)((char *)merge_lcm + + merge_lcm->lcm_entries[0].lcme_offset); + merge_has_dom = lov_pattern(le32_to_cpu(lmm->lmm_pattern)) == + LOV_PATTERN_MDT; + for (i = 0; i < merge_entry_count; i++) { struct lov_comp_md_entry_v1 *merge_lcme; @@ -2916,6 +3468,8 @@ static int lod_declare_layout_merge(const struct lu_env *env, *lcme = *merge_lcme; lcme->lcme_offset = cpu_to_le32(offset); + if (merge_has_dom && i == 0) + lcme->lcme_flags |= cpu_to_le32(LCME_FL_STALE); id = pflr_id(mirror_id, i + 1); lcme->lcme_id = cpu_to_le32(id); @@ -3038,10 +3592,24 @@ static int lod_declare_xattr_set(const struct lu_env *env, RETURN(-ENOENT); rc = lod_declare_modify_layout(env, dt, name, buf, th); + } else if (strncmp(name, XATTR_NAME_LMV, strlen(XATTR_NAME_LMV)) == 0 && + strlen(name) > strlen(XATTR_NAME_LMV) + 1) { + const char *op = name + strlen(XATTR_NAME_LMV) + 1; + + rc = -ENOTSUPP; + if (strcmp(op, "add") == 0) + rc = lod_dir_declare_layout_add(env, dt, buf, th); + else if (strcmp(op, "del") == 0) + rc = lod_dir_declare_layout_delete(env, dt, buf, th); + else if (strcmp(op, "set") == 0) + rc = lod_sub_declare_xattr_set(env, next, buf, + XATTR_NAME_LMV, fl, th); + + RETURN(rc); } else if (S_ISDIR(mode)) { rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th); } else if (strcmp(name, XATTR_NAME_FID) == 0) { - rc = lod_replace_parent_fid(env, dt, th, true); + rc = lod_replace_parent_fid(env, dt, buf, th, true); } else { rc = lod_sub_declare_xattr_set(env, next, buf, name, fl, th); } @@ -3088,7 +3656,11 @@ static int lod_xattr_set_internal(const struct lu_env *env, RETURN(0); for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - LASSERT(lo->ldo_stripe[i]); + if (!lo->ldo_stripe[i]) + continue; + + if (!dt_object_exists(lo->ldo_stripe[i])) + continue; rc = lod_sub_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th); @@ -3175,6 +3747,7 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, lum = buf->lb_buf; switch (lum->lmm_magic) { + case LOV_USER_MAGIC_SPECIFIC: case LOV_USER_MAGIC_V3: v3 = buf->lb_buf; if (v3->lmm_pool_name[0] != '\0') @@ -3197,9 +3770,24 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, pool_name); break; case LOV_USER_MAGIC_COMP_V1: - is_del = false; - break; - default: + { + struct lov_comp_md_v1 *lcm = (struct lov_comp_md_v1 *)lum; + struct lov_comp_md_entry_v1 *lcme; + int i, comp_cnt; + + comp_cnt = le16_to_cpu(lcm->lcm_entry_count); + for (i = 0; i < comp_cnt; i++) { + lcme = &lcm->lcm_entries[i]; + if (lcme->lcme_flags & cpu_to_le32(LCME_FL_EXTENSION)) { + lcm->lcm_magic = cpu_to_le32(LOV_MAGIC_SEL); + break; + } + } + + is_del = false; + break; + } + default: CERROR("Invalid magic %x\n", lum->lmm_magic); RETURN(-EINVAL); } @@ -3239,20 +3827,23 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, const char *name, int fl, struct thandle *th) { - struct lmv_user_md_v1 *lum; - int rc; + struct lmv_user_md_v1 *lum; + int rc; + ENTRY; LASSERT(buf != NULL && buf->lb_buf != NULL); lum = buf->lb_buf; - CDEBUG(D_OTHER, "set default stripe_count # %u stripe_offset %d\n", + CDEBUG(D_INFO, + "set default stripe_count # %u stripe_offset %d hash %u\n", le32_to_cpu(lum->lum_stripe_count), - (int)le32_to_cpu(lum->lum_stripe_offset)); + (int)le32_to_cpu(lum->lum_stripe_offset), + le32_to_cpu(lum->lum_hash_type)); if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)), le32_to_cpu(lum->lum_stripe_offset)) && - le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) { + le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) { rc = lod_xattr_del_internal(env, dt, name, th); if (rc == -ENODATA) rc = 0; @@ -3306,8 +3897,15 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, /* The stripes are supposed to be allocated in declare phase, * if there are no stripes being allocated, it will skip */ - if (lo->ldo_dir_stripe_count == 0) + if (lo->ldo_dir_stripe_count == 0) { + if (lo->ldo_dir_is_foreign) { + rc = lod_sub_xattr_set(env, dt_object_child(dt), buf, + XATTR_NAME_LMV, fl, th); + if (rc != 0) + RETURN(rc); + } RETURN(0); + } rc = dt_attr_get(env, dt_object_child(dt), attr); if (rc != 0) @@ -3332,35 +3930,46 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, rec->rec_type = S_IFDIR; for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - struct dt_object *dto; - char *stripe_name = info->lti_key; - struct lu_name *sname; - struct linkea_data ldata = { NULL }; - struct lu_buf linkea_buf; + struct dt_object *dto = lo->ldo_stripe[i]; + char *stripe_name = info->lti_key; + struct lu_name *sname; + struct linkea_data ldata = { NULL }; + struct lu_buf linkea_buf; + + /* OBD_FAIL_MDS_STRIPE_FID may leave stripe uninitialized */ + if (!dto) + continue; - dto = lo->ldo_stripe[i]; + /* fail a remote stripe creation */ + if (i && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_CREATE)) + continue; - dt_write_lock(env, dto, MOR_TGT_CHILD); - rc = lod_sub_create(env, dto, attr, NULL, dof, th); - if (rc != 0) { - dt_write_unlock(env, dto); - GOTO(out, rc); - } + /* if it's source stripe of migrating directory, don't create */ + if (!((lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) && + i >= lo->ldo_dir_migrate_offset)) { + dt_write_lock(env, dto, DT_TGT_CHILD); + rc = lod_sub_create(env, dto, attr, NULL, dof, th); + if (rc != 0) { + dt_write_unlock(env, dto); + GOTO(out, rc); + } - rc = lod_sub_ref_add(env, dto, th); - dt_write_unlock(env, dto); - if (rc != 0) - GOTO(out, rc); + rc = lod_sub_ref_add(env, dto, th); + dt_write_unlock(env, dto); + if (rc != 0) + GOTO(out, rc); - rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = lod_sub_insert(env, dto, (const struct dt_rec *)rec, - (const struct dt_key *)dot, th, 0); - if (rc != 0) - GOTO(out, rc); + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dot, th); + if (rc != 0) + GOTO(out, rc); + } rec->rec_fid = lu_object_fid(&dt->do_lu); rc = lod_sub_insert(env, dto, (struct dt_rec *)rec, - (const struct dt_key *)dotdot, th, 0); + (const struct dt_key *)dotdot, th); if (rc != 0) GOTO(out, rc); @@ -3404,7 +4013,7 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, rec->rec_fid = lu_object_fid(&dto->do_lu); rc = lod_sub_insert(env, dt_object_child(dt), (const struct dt_rec *)rec, - (const struct dt_key *)stripe_name, th, 0); + (const struct dt_key *)stripe_name, th); if (rc != 0) GOTO(out, rc); @@ -3496,12 +4105,34 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, th); if (rc != 0) RETURN(rc); + } else { + /* foreign LMV EA case */ + if (lmu) { + struct lmv_foreign_md *lfm = lmu->lb_buf; + + if (lfm->lfm_magic == LMV_MAGIC_FOREIGN) { + rc = lod_declare_xattr_set_lmv(env, dt, attr, + lmu, dof, th); + } + } else { + if (lo->ldo_dir_is_foreign) { + LASSERT(lo->ldo_foreign_lmv != NULL && + lo->ldo_foreign_lmv_size > 0); + info->lti_buf.lb_buf = lo->ldo_foreign_lmv; + info->lti_buf.lb_len = lo->ldo_foreign_lmv_size; + lmu = &info->lti_buf; + rc = lod_xattr_set_lmv(env, dt, lmu, + XATTR_NAME_LMV, 0, th); + } + } } /* Transfer default LMV striping from the parent */ if (lds != NULL && lds->lds_dir_def_striping_set && - !LMVEA_DELETE_VALUES(lds->lds_dir_def_stripe_count, - lds->lds_dir_def_stripe_offset)) { + !(LMVEA_DELETE_VALUES(lds->lds_dir_def_stripe_count, + lds->lds_dir_def_stripe_offset) && + le32_to_cpu(lds->lds_dir_def_hash_type) != + LMV_HASH_TYPE_UNKNOWN)) { struct lmv_user_md_v1 *v1 = info->lti_ea_store; if (info->lti_ea_store_size < sizeof(*v1)) { @@ -3617,7 +4248,7 @@ static int lod_generate_and_set_lovea(const struct lu_env *env, LASSERT(lo); - if (lo->ldo_comp_cnt == 0) { + if (lo->ldo_comp_cnt == 0 && !lo->ldo_is_foreign) { lod_striping_free(env, lo); rc = lod_sub_xattr_del(env, next, XATTR_NAME_LOV, th); RETURN(rc); @@ -3642,37 +4273,195 @@ static int lod_generate_and_set_lovea(const struct lu_env *env, RETURN(rc); } +static __u32 lod_gen_component_id(struct lod_object *lo, + int mirror_id, int comp_idx); + /** - * Delete layout component(s) + * Repeat an existing component + * + * Creates a new layout by replicating an existing component. Uses striping + * policy from previous component as a template for the striping for the new + * new component. + * + * New component starts with zero length, will be extended (or removed) before + * returning layout to client. + * + * NB: Reallocates layout components array (lo->ldo_comp_entries), invalidating + * any pre-existing pointers to components. Handle with care. * * \param[in] env execution environment for this thread - * \param[in] dt object - * \param[in] th transaction handle + * \param[in,out] lo object to update the layout of + * \param[in] index index of component to copy * * \retval 0 on success - * \retval negative error number on failure + * \retval negative errno on error */ -static int lod_layout_del(const struct lu_env *env, struct dt_object *dt, - struct thandle *th) +static int lod_layout_repeat_comp(const struct lu_env *env, + struct lod_object *lo, int index) { - struct lod_layout_component *lod_comp; - struct lod_object *lo = lod_dt_obj(dt); - struct dt_object *next = dt_object_child(dt); - struct lu_attr *attr = &lod_env_info(env)->lti_attr; - int rc, i, j, left; + struct lod_layout_component *lod_comp; + struct lod_layout_component *new_comp = NULL; + struct lod_layout_component *comp_array; + int rc = 0, i, new_cnt = lo->ldo_comp_cnt + 1; + __u16 mirror_id; + int offset = 0; + ENTRY; + + lod_comp = &lo->ldo_comp_entries[index]; + LASSERT(lod_comp_inited(lod_comp) && lod_comp->llc_id != LCME_ID_INVAL); + + CDEBUG(D_LAYOUT, "repeating component %d\n", index); + + OBD_ALLOC(comp_array, sizeof(*comp_array) * new_cnt); + if (comp_array == NULL) + GOTO(out, rc = -ENOMEM); + + for (i = 0; i < lo->ldo_comp_cnt; i++) { + memcpy(&comp_array[i + offset], &lo->ldo_comp_entries[i], + sizeof(*comp_array)); + + /* Duplicate this component in to the next slot */ + if (i == index) { + new_comp = &comp_array[i + 1]; + memcpy(&comp_array[i + 1], &lo->ldo_comp_entries[i], + sizeof(*comp_array)); + /* We must now skip this new component when copying */ + offset = 1; + } + } + + /* Set up copied component */ + new_comp->llc_flags &= ~LCME_FL_INIT; + new_comp->llc_stripe = NULL; + new_comp->llc_stripes_allocated = 0; + new_comp->llc_stripe_offset = LOV_OFFSET_DEFAULT; + /* for uninstantiated components, layout gen stores default stripe + * offset */ + new_comp->llc_layout_gen = lod_comp->llc_stripe_offset; + /* This makes the repeated component zero-length, placed at the end of + * the preceding component */ + new_comp->llc_extent.e_start = new_comp->llc_extent.e_end; + new_comp->llc_timestamp = lod_comp->llc_timestamp; + new_comp->llc_pool = NULL; + + rc = lod_set_pool(&new_comp->llc_pool, lod_comp->llc_pool); + if (rc) + GOTO(out, rc); + + if (new_comp->llc_ostlist.op_array) { + __u32 *op_array = NULL; + + OBD_ALLOC(op_array, new_comp->llc_ostlist.op_size); + if (!op_array) + GOTO(out, rc = -ENOMEM); + memcpy(op_array, &new_comp->llc_ostlist.op_array, + new_comp->llc_ostlist.op_size); + new_comp->llc_ostlist.op_array = op_array; + } + + OBD_FREE(lo->ldo_comp_entries, + sizeof(*comp_array) * lo->ldo_comp_cnt); + lo->ldo_comp_entries = comp_array; + lo->ldo_comp_cnt = new_cnt; + + /* Generate an id for the new component */ + mirror_id = mirror_id_of(new_comp->llc_id); + new_comp->llc_id = LCME_ID_INVAL; + new_comp->llc_id = lod_gen_component_id(lo, mirror_id, index + 1); + if (new_comp->llc_id == LCME_ID_INVAL) + GOTO(out, rc = -ERANGE); + + EXIT; +out: + if (rc) + OBD_FREE(comp_array, sizeof(*comp_array) * new_cnt); + + return rc; +} + +static int lod_layout_data_init(struct lod_thread_info *info, __u32 comp_cnt) +{ + ENTRY; + + /* clear memory region that will be used for layout change */ + memset(&info->lti_layout_attr, 0, sizeof(struct lu_attr)); + info->lti_count = 0; + + if (info->lti_comp_size >= comp_cnt) + RETURN(0); + + if (info->lti_comp_size > 0) { + OBD_FREE(info->lti_comp_idx, + info->lti_comp_size * sizeof(__u32)); + info->lti_comp_size = 0; + } + + OBD_ALLOC(info->lti_comp_idx, comp_cnt * sizeof(__u32)); + if (!info->lti_comp_idx) + RETURN(-ENOMEM); + + info->lti_comp_size = comp_cnt; + RETURN(0); +} + +/** + * Prepare new layout minus deleted components + * + * Removes components marked for deletion (LCME_ID_INVAL) by copying to a new + * layout and skipping those components. Removes stripe objects if any exist. + * + * NB: + * Reallocates layout components array (lo->ldo_comp_entries), invalidating + * any pre-existing pointers to components. + * + * Caller is responsible for updating mirror end (ldo_mirror[].lme_end). + * + * \param[in] env execution environment for this thread + * \param[in,out] lo object to update the layout of + * \param[in] th transaction handle for this operation + * + * \retval # of components deleted + * \retval negative errno on error + */ +static int lod_layout_del_prep_layout(const struct lu_env *env, + struct lod_object *lo, + struct thandle *th) +{ + struct lod_layout_component *lod_comp; + struct lod_thread_info *info = lod_env_info(env); + int rc = 0, i, j, deleted = 0; + + ENTRY; LASSERT(lo->ldo_is_composite); LASSERT(lo->ldo_comp_cnt > 0 && lo->ldo_comp_entries != NULL); - left = lo->ldo_comp_cnt; - for (i = (lo->ldo_comp_cnt - 1); i >= 0; i--) { + rc = lod_layout_data_init(info, lo->ldo_comp_cnt); + if (rc) + RETURN(rc); + + for (i = 0; i < lo->ldo_comp_cnt; i++) { lod_comp = &lo->ldo_comp_entries[i]; - if (lod_comp->llc_id != LCME_ID_INVAL) - break; - left--; + if (lod_comp->llc_id != LCME_ID_INVAL) { + /* Build array of things to keep */ + info->lti_comp_idx[info->lti_count++] = i; + continue; + } - /* Not instantiated component */ + lod_obj_set_pool(lo, i, NULL); + if (lod_comp->llc_ostlist.op_array) { + OBD_FREE(lod_comp->llc_ostlist.op_array, + lod_comp->llc_ostlist.op_size); + lod_comp->llc_ostlist.op_array = NULL; + lod_comp->llc_ostlist.op_size = 0; + } + + deleted++; + CDEBUG(D_LAYOUT, "deleting comp %d, left %d\n", i, + lo->ldo_comp_cnt - deleted); + + /* No striping info for this component */ if (lod_comp->llc_stripe == NULL) continue; @@ -3682,9 +4471,14 @@ static int lod_layout_del(const struct lu_env *env, struct dt_object *dt, if (obj == NULL) continue; - rc = lod_sub_destroy(env, obj, th); - if (rc) - GOTO(out, rc); + + /* components which are not init have no sub objects + * to destroy */ + if (lod_comp_inited(lod_comp)) { + rc = lod_sub_destroy(env, obj, th); + if (rc) + GOTO(out, rc); + } lu_object_put(env, &obj->do_lu); lod_comp->llc_stripe[j] = NULL; @@ -3696,38 +4490,73 @@ static int lod_layout_del(const struct lu_env *env, struct dt_object *dt, sizeof(__u32) * lod_comp->llc_stripes_allocated); lod_comp->llc_ost_indices = NULL; lod_comp->llc_stripes_allocated = 0; - lod_obj_set_pool(lo, i, NULL); - if (lod_comp->llc_ostlist.op_array) { - OBD_FREE(lod_comp->llc_ostlist.op_array, - lod_comp->llc_ostlist.op_size); - lod_comp->llc_ostlist.op_array = NULL; - lod_comp->llc_ostlist.op_size = 0; - } } - LASSERTF(left >= 0 && left < lo->ldo_comp_cnt, "left = %d\n", left); - if (left > 0) { - struct lod_layout_component *comp_array; + /* info->lti_count has the amount of left components */ + LASSERTF(info->lti_count >= 0 && info->lti_count < lo->ldo_comp_cnt, + "left = %d, lo->ldo_comp_cnt %d\n", (int)info->lti_count, + (int)lo->ldo_comp_cnt); + + if (info->lti_count > 0) { + struct lod_layout_component *comp_array; - OBD_ALLOC(comp_array, sizeof(*comp_array) * left); + OBD_ALLOC(comp_array, sizeof(*comp_array) * info->lti_count); if (comp_array == NULL) GOTO(out, rc = -ENOMEM); - memcpy(&comp_array[0], &lo->ldo_comp_entries[0], - sizeof(*comp_array) * left); + for (i = 0; i < info->lti_count; i++) { + memcpy(&comp_array[i], + &lo->ldo_comp_entries[info->lti_comp_idx[i]], + sizeof(*comp_array)); + } OBD_FREE(lo->ldo_comp_entries, sizeof(*comp_array) * lo->ldo_comp_cnt); lo->ldo_comp_entries = comp_array; - lo->ldo_comp_cnt = left; - - LASSERT(lo->ldo_mirror_count == 1); - lo->ldo_mirrors[0].lme_end = left - 1; - lod_obj_inc_layout_gen(lo); + lo->ldo_comp_cnt = info->lti_count; } else { lod_free_comp_entries(lo); } + EXIT; +out: + return rc ? rc : deleted; +} + +/** + * Delete layout component(s) + * + * This function sets up the layout data in the env and does the setattrs + * required to write out the new layout. The layout itself is modified in + * lod_layout_del_prep_layout. + * + * \param[in] env execution environment for this thread + * \param[in] dt object + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative error number on failure + */ +static int lod_layout_del(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) +{ + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + struct lu_attr *attr = &lod_env_info(env)->lti_attr; + int rc; + + LASSERT(lo->ldo_mirror_count == 1); + + rc = lod_layout_del_prep_layout(env, lo, th); + if (rc < 0) + GOTO(out, rc); + + /* Only do this if we didn't delete all components */ + if (lo->ldo_comp_cnt > 0) { + lo->ldo_mirrors[0].lme_end = lo->ldo_comp_cnt - 1; + lod_obj_inc_layout_gen(lo); + } + LASSERT(dt_object_exists(dt)); rc = dt_attr_get(env, next, attr); if (rc) @@ -3752,7 +4581,8 @@ out: static int lod_get_default_lov_striping(const struct lu_env *env, struct lod_object *lo, - struct lod_default_striping *lds); + struct lod_default_striping *lds, + struct dt_allocation_hint *ah); /** * Implementation of dt_object_operations::do_xattr_set. * @@ -3781,27 +4611,35 @@ static int lod_xattr_set(const struct lu_env *env, if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && strcmp(name, XATTR_NAME_LMV) == 0) { - struct lmv_mds_md_v1 *lmm = buf->lb_buf; + rc = lod_dir_striping_create(env, dt, NULL, NULL, th); + RETURN(rc); + } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && + strncmp(name, XATTR_NAME_LMV, strlen(XATTR_NAME_LMV)) == 0 && + strlen(name) > strlen(XATTR_NAME_LMV) + 1) { + const char *op = name + strlen(XATTR_NAME_LMV) + 1; - if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) & - LMV_HASH_FLAG_MIGRATION) - rc = lod_sub_xattr_set(env, next, buf, name, fl, th); - else - rc = lod_dir_striping_create(env, dt, NULL, NULL, th); + rc = -ENOTSUPP; + /* + * XATTR_NAME_LMV".add" is never called, but only declared, + * because lod_xattr_set_lmv() will do the addition. + */ + if (strcmp(op, "del") == 0) + rc = lod_dir_layout_delete(env, dt, buf, th); + else if (strcmp(op, "set") == 0) + rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LMV, + fl, th); RETURN(rc); - } - - if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && + } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && strcmp(name, XATTR_NAME_LOV) == 0) { - struct lod_thread_info *info = lod_env_info(env); - struct lod_default_striping *lds = &info->lti_def_striping; + struct lod_default_striping *lds = lod_lds_buf_get(env); struct lov_user_md_v1 *v1 = buf->lb_buf; char pool[LOV_MAXPOOLNAME + 1]; bool is_del; /* get existing striping config */ - rc = lod_get_default_lov_striping(env, lod_dt_obj(dt), lds); + rc = lod_get_default_lov_striping(env, lod_dt_obj(dt), lds, + NULL); if (rc) RETURN(rc); @@ -3898,7 +4736,7 @@ static int lod_xattr_set(const struct lu_env *env, } RETURN(rc); } else if (strcmp(name, XATTR_NAME_FID) == 0) { - rc = lod_replace_parent_fid(env, dt, th, false); + rc = lod_replace_parent_fid(env, dt, buf, th, false); RETURN(rc); } @@ -3919,19 +4757,26 @@ static int lod_declare_xattr_del(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th) { - struct lod_object *lo = lod_dt_obj(dt); - int rc; - int i; + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + int i; + int rc; ENTRY; - rc = lod_sub_declare_xattr_del(env, dt_object_child(dt), name, th); + rc = lod_sub_declare_xattr_del(env, next, name, th); if (rc != 0) RETURN(rc); if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) RETURN(0); - /* set xattr to each stripes, if needed */ + /* NB: don't delete stripe LMV, because when we do this, normally we + * will remove stripes, besides, if directory LMV is corrupt, this will + * prevent deleting its LMV and fixing it (via LFSCK). + */ + if (!strcmp(name, XATTR_NAME_LMV)) + RETURN(0); + rc = lod_striping_load(env, lo); if (rc != 0) RETURN(rc); @@ -3940,9 +4785,12 @@ static int lod_declare_xattr_del(const struct lu_env *env, RETURN(0); for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - LASSERT(lo->ldo_stripe[i]); - rc = lod_sub_declare_xattr_del(env, lo->ldo_stripe[i], - name, th); + struct dt_object *dto = lo->ldo_stripe[i]; + + if (!dto) + continue; + + rc = lod_sub_declare_xattr_del(env, dto, name, th); if (rc != 0) break; } @@ -3967,20 +4815,26 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, int i; ENTRY; - if (!strcmp(name, XATTR_NAME_LOV)) + if (!strcmp(name, XATTR_NAME_LOV) || !strcmp(name, XATTR_NAME_LMV)) lod_striping_free(env, lod_dt_obj(dt)); rc = lod_sub_xattr_del(env, next, name, th); if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) RETURN(rc); + if (!strcmp(name, XATTR_NAME_LMV)) + RETURN(0); + if (lo->ldo_dir_stripe_count == 0) RETURN(0); for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - LASSERT(lo->ldo_stripe[i]); + struct dt_object *dto = lo->ldo_stripe[i]; - rc = lod_sub_xattr_del(env, lo->ldo_stripe[i], name, th); + if (!dto) + continue; + + rc = lod_sub_xattr_del(env, dto, name, th); if (rc != 0) break; } @@ -4005,6 +4859,52 @@ static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fi return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE); } +/** + * Copy OST list from layout provided by user. + * + * \param[in] lod_comp layout_component to be filled + * \param[in] v3 LOV EA V3 user data + * + * \retval 0 on success + * \retval negative if failed + */ +int lod_comp_copy_ost_lists(struct lod_layout_component *lod_comp, + struct lov_user_md_v3 *v3) +{ + int j; + + ENTRY; + + if (v3->lmm_stripe_offset == LOV_OFFSET_DEFAULT) + v3->lmm_stripe_offset = v3->lmm_objects[0].l_ost_idx; + + if (lod_comp->llc_ostlist.op_array) { + if (lod_comp->llc_ostlist.op_size >= + v3->lmm_stripe_count * sizeof(__u32)) { + lod_comp->llc_ostlist.op_count = + v3->lmm_stripe_count; + goto skip; + } + OBD_FREE(lod_comp->llc_ostlist.op_array, + lod_comp->llc_ostlist.op_size); + } + + /* copy ost list from lmm */ + lod_comp->llc_ostlist.op_count = v3->lmm_stripe_count; + lod_comp->llc_ostlist.op_size = v3->lmm_stripe_count * sizeof(__u32); + OBD_ALLOC(lod_comp->llc_ostlist.op_array, + lod_comp->llc_ostlist.op_size); + if (!lod_comp->llc_ostlist.op_array) + RETURN(-ENOMEM); +skip: + for (j = 0; j < v3->lmm_stripe_count; j++) { + lod_comp->llc_ostlist.op_array[j] = + v3->lmm_objects[j].l_ost_idx; + } + + RETURN(0); +} + /** * Get default striping. @@ -4018,19 +4918,19 @@ static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fi */ static int lod_get_default_lov_striping(const struct lu_env *env, struct lod_object *lo, - struct lod_default_striping *lds) + struct lod_default_striping *lds, + struct dt_allocation_hint *ah) { struct lod_thread_info *info = lod_env_info(env); struct lov_user_md_v1 *v1 = NULL; struct lov_user_md_v3 *v3 = NULL; struct lov_comp_md_v1 *comp_v1 = NULL; - __u16 comp_cnt; - __u16 mirror_cnt; - bool composite; - int rc, i; - ENTRY; + __u16 comp_cnt; + __u16 mirror_cnt; + bool composite; + int rc, i, j; - lds->lds_def_striping_set = 0; + ENTRY; rc = lod_get_lov_ea(env, lo); if (rc < 0) @@ -4045,16 +4945,26 @@ static int lod_get_default_lov_striping(const struct lu_env *env, } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) { v3 = (struct lov_user_md_v3 *)v1; lustre_swab_lov_user_md_v3(v3); - } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_COMP_V1)) { + } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_SPECIFIC)) { + v3 = (struct lov_user_md_v3 *)v1; + lustre_swab_lov_user_md_v3(v3); + lustre_swab_lov_user_md_objects(v3->lmm_objects, + v3->lmm_stripe_count); + } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_COMP_V1) || + v1->lmm_magic == __swab32(LOV_USER_MAGIC_SEL)) { comp_v1 = (struct lov_comp_md_v1 *)v1; lustre_swab_lov_comp_md_v1(comp_v1); } if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1 && - v1->lmm_magic != LOV_MAGIC_COMP_V1) + v1->lmm_magic != LOV_MAGIC_COMP_V1 && + v1->lmm_magic != LOV_MAGIC_SEL && + v1->lmm_magic != LOV_USER_MAGIC_SPECIFIC) RETURN(-ENOTSUPP); - if (v1->lmm_magic == LOV_MAGIC_COMP_V1) { + if ((v1->lmm_magic == LOV_MAGIC_COMP_V1 || + v1->lmm_magic == LOV_MAGIC_SEL) && + !(ah && ah->dah_append_stripes)) { comp_v1 = (struct lov_comp_md_v1 *)v1; comp_cnt = comp_v1->lcm_entry_count; if (comp_cnt == 0) @@ -4078,7 +4988,6 @@ static int lod_get_default_lov_striping(const struct lu_env *env, for (i = 0; i < comp_cnt; i++) { struct lod_layout_component *lod_comp; - struct lu_extent *ext; char *pool; lod_comp = &lds->lds_def_comp_entries[i]; @@ -4092,36 +5001,55 @@ static int lod_get_default_lov_striping(const struct lu_env *env, if (composite) { v1 = (struct lov_user_md *)((char *)comp_v1 + comp_v1->lcm_entries[i].lcme_offset); - ext = &comp_v1->lcm_entries[i].lcme_extent; - lod_comp->llc_extent = *ext; + lod_comp->llc_extent = + comp_v1->lcm_entries[i].lcme_extent; + /* We only inherit certain flags from the layout */ + lod_comp->llc_flags = + comp_v1->lcm_entries[i].lcme_flags & + LCME_TEMPLATE_FLAGS; } - if (v1->lmm_pattern != LOV_PATTERN_RAID0 && - v1->lmm_pattern != LOV_PATTERN_MDT && - v1->lmm_pattern != 0) { + if (!lov_pattern_supported(v1->lmm_pattern) && + !(v1->lmm_pattern & LOV_PATTERN_F_RELEASED)) { lod_free_def_comp_entries(lds); RETURN(-EINVAL); } - CDEBUG(D_LAYOUT, DFID" stripe_count=%d stripe_size=%d " - "stripe_offset=%d\n", + CDEBUG(D_LAYOUT, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d append_stripes=%d\n", PFID(lu_object_fid(&lo->ldo_obj.do_lu)), (int)v1->lmm_stripe_count, (int)v1->lmm_stripe_size, - (int)v1->lmm_stripe_offset); + (int)v1->lmm_stripe_offset, + ah ? ah->dah_append_stripes : 0); - lod_comp->llc_stripe_count = v1->lmm_stripe_count; + if (ah && ah->dah_append_stripes) + lod_comp->llc_stripe_count = ah->dah_append_stripes; + else + lod_comp->llc_stripe_count = v1->lmm_stripe_count; lod_comp->llc_stripe_size = v1->lmm_stripe_size; lod_comp->llc_stripe_offset = v1->lmm_stripe_offset; lod_comp->llc_pattern = v1->lmm_pattern; pool = NULL; - if (v1->lmm_magic == LOV_USER_MAGIC_V3) { + if (ah && ah->dah_append_pool && ah->dah_append_pool[0]) { + pool = ah->dah_append_pool; + } else if (v1->lmm_magic == LOV_USER_MAGIC_V3) { /* XXX: sanity check here */ v3 = (struct lov_user_md_v3 *) v1; if (v3->lmm_pool_name[0] != '\0') pool = v3->lmm_pool_name; } lod_set_def_pool(lds, i, pool); + if (v1->lmm_magic == LOV_USER_MAGIC_SPECIFIC) { + v3 = (struct lov_user_md_v3 *)v1; + rc = lod_comp_copy_ost_lists(lod_comp, v3); + if (rc) + RETURN(rc); + } else if (lod_comp->llc_ostlist.op_array && + lod_comp->llc_ostlist.op_count) { + for (j = 0; j < lod_comp->llc_ostlist.op_count; j++) + lod_comp->llc_ostlist.op_array[j] = -1; + lod_comp->llc_ostlist.op_count = 0; + } } lds->lds_def_striping_set = 1; @@ -4142,27 +5070,30 @@ static int lod_get_default_lmv_striping(const struct lu_env *env, struct lod_object *lo, struct lod_default_striping *lds) { - struct lod_thread_info *info = lod_env_info(env); - struct lmv_user_md_v1 *v1 = NULL; - int rc; - ENTRY; + struct lmv_user_md *lmu; + int rc; lds->lds_dir_def_striping_set = 0; + rc = lod_get_default_lmv_ea(env, lo); if (rc < 0) - RETURN(rc); + return rc; - if (rc < (typeof(rc))sizeof(struct lmv_user_md)) - RETURN(0); + if (rc >= (int)sizeof(*lmu)) { + struct lod_thread_info *info = lod_env_info(env); - v1 = info->lti_ea_store; + lmu = info->lti_ea_store; - lds->lds_dir_def_stripe_count = le32_to_cpu(v1->lum_stripe_count); - lds->lds_dir_def_stripe_offset = le32_to_cpu(v1->lum_stripe_offset); - lds->lds_dir_def_hash_type = le32_to_cpu(v1->lum_hash_type); - lds->lds_dir_def_striping_set = 1; + lds->lds_dir_def_stripe_count = + le32_to_cpu(lmu->lum_stripe_count); + lds->lds_dir_def_stripe_offset = + le32_to_cpu(lmu->lum_stripe_offset); + lds->lds_dir_def_hash_type = + le32_to_cpu(lmu->lum_hash_type); + lds->lds_dir_def_striping_set = 1; + } - RETURN(0); + return 0; } /** @@ -4183,7 +5114,7 @@ static int lod_get_default_striping(const struct lu_env *env, { int rc, rc1; - rc = lod_get_default_lov_striping(env, lo, lds); + rc = lod_get_default_lov_striping(env, lo, lds, NULL); rc1 = lod_get_default_lmv_striping(env, lo, lds); if (rc == 0 && rc1 < 0) rc = rc1; @@ -4206,10 +5137,11 @@ static void lod_striping_from_default(struct lod_object *lo, umode_t mode) { struct lod_device *d = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); - struct lov_desc *desc = &d->lod_desc; int i, rc; if (lds->lds_def_striping_set && S_ISREG(mode)) { + struct lov_desc *desc = &d->lod_ost_descs.ltd_lov_desc; + rc = lod_alloc_comp_entries(lo, lds->lds_def_mirror_cnt, lds->lds_def_comp_cnt); if (rc != 0) @@ -4225,8 +5157,9 @@ static void lod_striping_from_default(struct lod_object *lo, struct lod_layout_component *def_comp = &lds->lds_def_comp_entries[i]; - CDEBUG(D_LAYOUT, "Inherite from default: size:%hu " - "nr:%u offset:%u pattern %#x %s\n", + CDEBUG(D_LAYOUT, "Inherit from default: flags=%#x " + "size=%hu nr=%u offset=%u pattern=%#x pool=%s\n", + def_comp->llc_flags, def_comp->llc_stripe_size, def_comp->llc_stripe_count, def_comp->llc_stripe_offset, @@ -4240,6 +5173,20 @@ static void lod_striping_from_default(struct lod_object *lo, lod_obj_set_pool(lo, i, def_comp->llc_pool); } + /* copy ost list */ + if (def_comp->llc_ostlist.op_array && + def_comp->llc_ostlist.op_count) { + OBD_ALLOC(obj_comp->llc_ostlist.op_array, + obj_comp->llc_ostlist.op_size); + if (!obj_comp->llc_ostlist.op_array) + return; + memcpy(obj_comp->llc_ostlist.op_array, + def_comp->llc_ostlist.op_array, + obj_comp->llc_ostlist.op_size); + } else if (def_comp->llc_ostlist.op_array) { + obj_comp->llc_ostlist.op_array = NULL; + } + /* * Don't initialize these fields for plain layout * (v1/v3) here, they are inherited in the order of @@ -4251,7 +5198,7 @@ static void lod_striping_from_default(struct lod_object *lo, if (!lo->ldo_is_composite) continue; - lod_adjust_stripe_info(obj_comp, desc); + lod_adjust_stripe_info(obj_comp, desc, 0); } } else if (lds->lds_dir_def_striping_set && S_ISDIR(mode)) { if (lo->ldo_dir_stripe_count == 0) @@ -4270,7 +5217,8 @@ static void lod_striping_from_default(struct lod_object *lo, } } -static inline bool lod_need_inherit_more(struct lod_object *lo, bool from_root) +static inline bool lod_need_inherit_more(struct lod_object *lo, bool from_root, + char *append_pool) { struct lod_layout_component *lod_comp; @@ -4290,6 +5238,9 @@ static inline bool lod_need_inherit_more(struct lod_object *lo, bool from_root) lod_comp->llc_stripe_offset == LOV_OFFSET_DEFAULT)) return true; + if (append_pool && append_pool[0]) + return true; + return false; } @@ -4312,7 +5263,7 @@ static void lod_ah_init(const struct lu_env *env, { struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev); struct lod_thread_info *info = lod_env_info(env); - struct lod_default_striping *lds = &info->lti_def_striping; + struct lod_default_striping *lds = lod_lds_buf_get(env); struct dt_object *nextp = NULL; struct dt_object *nextc; struct lod_object *lp = NULL; @@ -4324,6 +5275,10 @@ static void lod_ah_init(const struct lu_env *env, LASSERT(child); + if (ah->dah_append_stripes == -1) + ah->dah_append_stripes = + d->lod_ost_descs.ltd_lov_desc.ld_tgt_count; + if (likely(parent)) { nextp = dt_object_child(parent); lp = lod_dt_obj(parent); @@ -4347,13 +5302,30 @@ static void lod_ah_init(const struct lu_env *env, /* other default values are 0 */ lc->ldo_dir_stripe_offset = -1; - /* get default striping from parent object */ - if (likely(lp != NULL)) + /* no default striping configuration is needed for + * foreign dirs + */ + if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0 && + le32_to_cpu(lum1->lum_magic) == LMV_MAGIC_FOREIGN) { + lc->ldo_dir_is_foreign = true; + /* keep stripe_count 0 and stripe_offset -1 */ + CDEBUG(D_INFO, "no default striping for foreign dir\n"); + RETURN_EXIT; + } + + /* + * If parent object is not root directory, + * then get default striping from parent object. + */ + if (likely(lp != NULL)) { lod_get_default_striping(env, lp, lds); - /* set child default striping info, default value is NULL */ - if (lds->lds_def_striping_set || lds->lds_dir_def_striping_set) - lc->ldo_def_striping = lds; + /* inherit default striping except ROOT */ + if ((lds->lds_def_striping_set || + lds->lds_dir_def_striping_set) && + !fid_is_root(lod_object_fid(lp))) + lc->ldo_def_striping = lds; + } /* It should always honour the specified stripes */ /* Note: old client (< 2.7)might also do lfs mkdir, whose EA @@ -4377,19 +5349,19 @@ static void lod_ah_init(const struct lu_env *env, } else { /* transfer defaults LMV to new directory */ lod_striping_from_default(lc, lds, child_mode); + + /* set count 0 to create normal directory */ + if (lc->ldo_dir_stripe_count == 1) + lc->ldo_dir_stripe_count = 0; } /* shrink the stripe_count to the avaible MDT count */ if (lc->ldo_dir_stripe_count > d->lod_remote_mdt_count + 1 && - !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) + !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) { lc->ldo_dir_stripe_count = d->lod_remote_mdt_count + 1; - - /* Directory will be striped only if stripe_count > 1, if - * stripe_count == 1, let's reset stripe_count = 0 to avoid - * create single master stripe and also help to unify the - * stripe handling of directories and files */ - if (lc->ldo_dir_stripe_count == 1) - lc->ldo_dir_stripe_count = 0; + if (lc->ldo_dir_stripe_count == 1) + lc->ldo_dir_stripe_count = 0; + } CDEBUG(D_INFO, "final dir stripe [%hu %d %u]\n", lc->ldo_dir_stripe_count, @@ -4411,7 +5383,7 @@ static void lod_ah_init(const struct lu_env *env, * Try from the parent first. */ if (likely(lp != NULL)) { - rc = lod_get_default_lov_striping(env, lp, lds); + rc = lod_get_default_lov_striping(env, lp, lds, ah); if (rc == 0) lod_striping_from_default(lc, lds, child_mode); } @@ -4439,8 +5411,10 @@ static void lod_ah_init(const struct lu_env *env, * - parent has plain(v1/v3) default layout, and some attributes * are not specified in the default layout; */ - if (d->lod_md_root != NULL && lod_need_inherit_more(lc, true)) { - rc = lod_get_default_lov_striping(env, d->lod_md_root, lds); + if (d->lod_md_root != NULL && + lod_need_inherit_more(lc, true, ah->dah_append_pool)) { + rc = lod_get_default_lov_striping(env, d->lod_md_root, lds, + ah); if (rc) goto out; if (lc->ldo_comp_cnt == 0) { @@ -4458,7 +5432,8 @@ static void lod_ah_init(const struct lu_env *env, if (lod_comp->llc_stripe_size <= 0) lod_comp->llc_stripe_size = def_comp->llc_stripe_size; - if (lod_comp->llc_stripe_offset == LOV_OFFSET_DEFAULT) + if (lod_comp->llc_stripe_offset == LOV_OFFSET_DEFAULT && + (!lod_comp->llc_pool || !lod_comp->llc_pool[0])) lod_comp->llc_stripe_offset = def_comp->llc_stripe_offset; if (lod_comp->llc_pool == NULL) @@ -4470,7 +5445,7 @@ out: * fs default striping may not be explicitly set, or historically set * in config log, use them. */ - if (lod_need_inherit_more(lc, false)) { + if (lod_need_inherit_more(lc, false, ah->dah_append_pool)) { if (lc->ldo_comp_cnt == 0) { rc = lod_alloc_comp_entries(lc, 0, 1); if (rc) @@ -4483,8 +5458,10 @@ out: } LASSERT(!lc->ldo_is_composite); lod_comp = &lc->ldo_comp_entries[0]; - desc = &d->lod_desc; - lod_adjust_stripe_info(lod_comp, desc); + desc = &d->lod_ost_descs.ltd_lov_desc; + lod_adjust_stripe_info(lod_comp, desc, ah->dah_append_stripes); + if (ah->dah_append_pool && ah->dah_append_pool[0]) + lod_obj_set_pool(lc, 0, ah->dah_append_pool); } EXIT; @@ -4648,6 +5625,48 @@ out: RETURN(rc); } +/* + * Whether subdirectories under \a dt should be created on MDTs by space QoS + * + * If LMV_HASH_FLAG_SPACE is set on directory default layout, its subdirectories + * should be created on MDT by space QoS. + * + * \param[in] env execution environment + * \param[in] dev lu device + * \param[in] dt object + * + * \retval 1 if directory should create subdir by space usage + * \retval 0 if not + * \retval -ev if failed + */ +static inline int dt_object_qos_mkdir(const struct lu_env *env, + struct lu_device *dev, + struct dt_object *dt) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lu_object *obj; + struct lod_object *lo; + struct lmv_user_md *lmu; + int rc; + + obj = lu_object_find_slice(env, dev, lu_object_fid(&dt->do_lu), NULL); + if (IS_ERR(obj)) + return PTR_ERR(obj); + + lo = lu2lod_obj(obj); + + rc = lod_get_default_lmv_ea(env, lo); + dt_object_put(env, dt); + if (rc <= 0) + return rc; + + if (rc < (int)sizeof(*lmu)) + return -EINVAL; + + lmu = info->lti_ea_store; + return le32_to_cpu(lmu->lum_stripe_offset) == LMV_OFFSET_DEFAULT; +} + /** * Implementation of dt_object_operations::do_declare_create. * @@ -4711,24 +5730,31 @@ static int lod_declare_create(const struct lu_env *env, struct dt_object *dt, if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STALE_DIR_LAYOUT)) GOTO(out, rc = -EREMOTE); - if (lo->ldo_dir_stripe_offset == -1) { - /* child and parent should be in the same MDT */ - if (hint->dah_parent != NULL && - dt_object_remote(hint->dah_parent)) + if (lo->ldo_dir_stripe_offset == LMV_OFFSET_DEFAULT) { + struct lod_default_striping *lds; + + lds = lo->ldo_def_striping; + /* + * child and parent should be on the same MDT, + * but if parent has default LMV, and the start + * MDT offset is -1, it's allowed. This check + * is not necessary after 2.12.22 because client + * follows this already, but old client may not. + */ + if (hint->dah_parent && + dt_object_remote(hint->dah_parent) && lds && + lds->lds_dir_def_stripe_offset != + LMV_OFFSET_DEFAULT) GOTO(out, rc = -EREMOTE); } else if (lo->ldo_dir_stripe_offset != ss->ss_node_id) { struct lod_device *lod; - struct lod_tgt_descs *ltd; - struct lod_tgt_desc *tgt = NULL; + struct lu_tgt_desc *mdt = NULL; bool found_mdt = false; - int i; lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); - ltd = &lod->lod_mdt_descs; - cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) { - tgt = LTD_TGT(ltd, i); - if (tgt->ltd_index == + lod_foreach_mdt(lod, mdt) { + if (mdt->ltd_index == lo->ldo_dir_stripe_offset) { found_mdt = true; break; @@ -4837,18 +5863,31 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt, int rc = 0, i, j; ENTRY; - LASSERT(lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL); + LASSERT((lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL) || + lo->ldo_is_foreign); - mirror_id = lo->ldo_mirror_count > 1 ? 1 : 0; + mirror_id = 0; /* non-flr file's mirror_id is 0 */ + if (lo->ldo_mirror_count > 1) { + for (i = 0; i < lo->ldo_comp_cnt; i++) { + lod_comp = &lo->ldo_comp_entries[i]; + if (lod_comp->llc_id != LCME_ID_INVAL && + mirror_id_of(lod_comp->llc_id) > mirror_id) + mirror_id = mirror_id_of(lod_comp->llc_id); + } + } /* create all underlying objects */ for (i = 0; i < lo->ldo_comp_cnt; i++) { lod_comp = &lo->ldo_comp_entries[i]; - if (lod_comp->llc_extent.e_start == 0 && i > 0) /* new mirror */ - ++mirror_id; - if (lod_comp->llc_id == LCME_ID_INVAL) { + /* only the component of FLR layout with more than 1 + * mirror has mirror ID in its component ID. + */ + if (lod_comp->llc_extent.e_start == 0 && + lo->ldo_mirror_count > 1) + ++mirror_id; + lod_comp->llc_id = lod_gen_component_id(lo, mirror_id, i); if (lod_comp->llc_id == LCME_ID_INVAL) @@ -4894,6 +5933,23 @@ out: RETURN(rc); } +static inline bool lod_obj_is_dom(struct dt_object *dt) +{ + struct lod_object *lo = lod_dt_obj(dt); + + if (!dt_object_exists(dt_object_child(dt))) + return false; + + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) + return false; + + if (!lo->ldo_comp_cnt) + return false; + + return (lov_pattern(lo->ldo_comp_entries[0].llc_pattern) == + LOV_PATTERN_MDT); +} + /** * Implementation of dt_object_operations::do_create. * @@ -4916,7 +5972,8 @@ static int lod_create(const struct lu_env *env, struct dt_object *dt, RETURN(rc); if (S_ISREG(dt->do_lu.lo_header->loh_attr) && - lod_obj_is_striped(dt) && dof->u.dof_reg.striped != 0) { + (lod_obj_is_striped(dt) || lod_obj_is_dom(dt)) && + dof->u.dof_reg.striped != 0) { LASSERT(lod_dt_obj(dt)->ldo_comp_cached == 0); rc = lod_striped_create(env, dt, attr, dof, th); } @@ -4953,11 +6010,13 @@ lod_obj_stripe_destroy_cb(const struct lu_env *env, struct lod_object *lo, static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - struct dt_object *next = dt_object_child(dt); - struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); struct lod_thread_info *info = lod_env_info(env); - char *stripe_name = info->lti_key; - int rc, i; + struct dt_object *stripe; + char *stripe_name = info->lti_key; + int rc, i; + ENTRY; /* @@ -4977,13 +6036,17 @@ static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt, RETURN(rc); for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + stripe = lo->ldo_stripe[i]; + if (!stripe) + continue; + rc = lod_sub_declare_ref_del(env, next, th); if (rc != 0) RETURN(rc); - snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", - PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)), - i); + snprintf(stripe_name, sizeof(info->lti_key), + DFID":%d", + PFID(lu_object_fid(&stripe->do_lu)), i); rc = lod_sub_declare_delete(env, next, (const struct dt_key *)stripe_name, th); if (rc != 0) @@ -5008,14 +6071,18 @@ static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt, /* declare destroy all striped objects */ if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - if (lo->ldo_stripe[i] == NULL) + stripe = lo->ldo_stripe[i]; + if (!stripe) continue; - rc = lod_sub_declare_ref_del(env, lo->ldo_stripe[i], - th); + if (!dt_object_exists(stripe)) + continue; - rc = lod_sub_declare_destroy(env, lo->ldo_stripe[i], - th); + rc = lod_sub_declare_ref_del(env, stripe, th); + if (rc != 0) + break; + + rc = lod_sub_declare_destroy(env, stripe, th); if (rc != 0) break; } @@ -5045,9 +6112,11 @@ static int lod_destroy(const struct lu_env *env, struct dt_object *dt, struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); struct lod_thread_info *info = lod_env_info(env); - char *stripe_name = info->lti_key; - unsigned int i; - int rc; + char *stripe_name = info->lti_key; + struct dt_object *stripe; + unsigned int i; + int rc; + ENTRY; /* destroy sub-stripe of master object */ @@ -5058,17 +6127,20 @@ static int lod_destroy(const struct lu_env *env, struct dt_object *dt, RETURN(rc); for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + stripe = lo->ldo_stripe[i]; + if (!stripe) + continue; + rc = lod_sub_ref_del(env, next, th); if (rc != 0) RETURN(rc); snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", - PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)), - i); + PFID(lu_object_fid(&stripe->do_lu)), i); CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n", PFID(lu_object_fid(&dt->do_lu)), stripe_name, - PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu))); + PFID(lu_object_fid(&stripe->do_lu))); rc = lod_sub_delete(env, next, (const struct dt_key *)stripe_name, th); @@ -5091,20 +6163,22 @@ static int lod_destroy(const struct lu_env *env, struct dt_object *dt, /* destroy all striped objects */ if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - if (lo->ldo_stripe[i] == NULL) + stripe = lo->ldo_stripe[i]; + if (!stripe) continue; + + if (!dt_object_exists(stripe)) + continue; + if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || i == cfs_fail_val) { - dt_write_lock(env, lo->ldo_stripe[i], - MOR_TGT_CHILD); - rc = lod_sub_ref_del(env, lo->ldo_stripe[i], - th); - dt_write_unlock(env, lo->ldo_stripe[i]); + dt_write_lock(env, stripe, DT_TGT_CHILD); + rc = lod_sub_ref_del(env, stripe, th); + dt_write_unlock(env, stripe); if (rc != 0) break; - rc = lod_sub_destroy(env, lo->ldo_stripe[i], - th); + rc = lod_sub_destroy(env, stripe, th); if (rc != 0) break; } @@ -5200,7 +6274,6 @@ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt, RETURN(0); LASSERT(S_ISDIR(dt->do_lu.lo_header->loh_attr)); - LASSERT(lo->ldo_dir_stripe_count > 1); /* Note: for remote lock for single stripe dir, MDT will cancel * the lock by lockh directly */ LASSERT(!dt_object_remote(dt_object_child(dt))); @@ -5213,8 +6286,10 @@ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt, * NB, ha_count may not equal to ldo_dir_stripe_count, because dir * layout may change, e.g., shrink dir layout after migration. */ - for (i = 0; i < lo->ldo_dir_stripe_count; i++) - dt_invalidate(env, lo->ldo_stripe[i]); + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + if (lo->ldo_stripe[i]) + dt_invalidate(env, lo->ldo_stripe[i]); + } slave_locks_size = offsetof(typeof(*slave_locks), ha_handles[slave_locks->ha_count]); @@ -5255,131 +6330,655 @@ static int lod_object_lock(const struct lu_env *env, if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) RETURN(-ENOTDIR); - rc = lod_striping_load(env, lo); - if (rc != 0) - RETURN(rc); + rc = lod_striping_load(env, lo); + if (rc != 0) + RETURN(rc); + + /* No stripes */ + if (lo->ldo_dir_stripe_count <= 1) + RETURN(0); + + slave_locks_size = offsetof(typeof(*slave_locks), + ha_handles[lo->ldo_dir_stripe_count]); + /* Freed in lod_object_unlock */ + OBD_ALLOC(slave_locks, slave_locks_size); + if (!slave_locks) + RETURN(-ENOMEM); + slave_locks->ha_count = lo->ldo_dir_stripe_count; + + /* striped directory lock */ + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + struct lustre_handle lockh; + struct ldlm_res_id *res_id; + struct dt_object *stripe; + + stripe = lo->ldo_stripe[i]; + if (!stripe) + continue; + + res_id = &lod_env_info(env)->lti_res_id; + fid_build_reg_res_name(lu_object_fid(&stripe->do_lu), res_id); + einfo->ei_res_id = res_id; + + if (dt_object_remote(stripe)) { + set_bit(i, (void *)slave_locks->ha_map); + rc = dt_object_lock(env, stripe, &lockh, einfo, policy); + } else { + struct ldlm_namespace *ns = einfo->ei_namespace; + ldlm_blocking_callback blocking = einfo->ei_cb_local_bl; + ldlm_completion_callback completion = einfo->ei_cb_cp; + __u64 dlmflags = LDLM_FL_ATOMIC_CB; + + if (einfo->ei_mode == LCK_PW || + einfo->ei_mode == LCK_EX) + dlmflags |= LDLM_FL_COS_INCOMPAT; + + LASSERT(ns != NULL); + rc = ldlm_cli_enqueue_local(env, ns, res_id, LDLM_IBITS, + policy, einfo->ei_mode, + &dlmflags, blocking, + completion, NULL, + NULL, 0, LVB_T_NONE, + NULL, &lockh); + } + if (rc) { + while (i--) + ldlm_lock_decref_and_cancel( + &slave_locks->ha_handles[i], + einfo->ei_mode); + OBD_FREE(slave_locks, slave_locks_size); + RETURN(rc); + } + slave_locks->ha_handles[i] = lockh; + } + einfo->ei_cbdata = slave_locks; + + RETURN(0); +} + +/** + * Implementation of dt_object_operations::do_invalidate. + * + * \see dt_object_operations::do_invalidate() in the API description for details + */ +static int lod_invalidate(const struct lu_env *env, struct dt_object *dt) +{ + return dt_invalidate(env, dt_object_child(dt)); +} + +static int lod_declare_instantiate_components(const struct lu_env *env, + struct lod_object *lo, struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + int i; + int rc = 0; + ENTRY; + + LASSERT(info->lti_count < lo->ldo_comp_cnt); + + for (i = 0; i < info->lti_count; i++) { + rc = lod_qos_prep_create(env, lo, NULL, th, + info->lti_comp_idx[i]); + if (rc) + break; + } + + if (!rc) { + info->lti_buf.lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_declare_xattr_set(env, lod_object_child(lo), + &info->lti_buf, XATTR_NAME_LOV, 0, th); + } + + RETURN(rc); +} + +/** + * Check OSTs for an existing component for further extension + * + * Checks if OSTs are still healthy and not out of space. Gets free space + * on OSTs (relative to allocation watermark rmb_low) and compares to + * the proposed new_end for this component. + * + * Decides whether or not to extend a component on its current OSTs. + * + * \param[in] env execution environment for this thread + * \param[in] lo object we're checking + * \param[in] index index of this component + * \param[in] extension_size extension size for this component + * \param[in] extent layout extent for requested operation + * \param[in] comp_extent extension component extent + * \param[in] write if this is write operation + * + * \retval true - OK to extend on current OSTs + * \retval false - do not extend on current OSTs + */ +static bool lod_sel_osts_allowed(const struct lu_env *env, + struct lod_object *lo, + int index, __u64 extension_size, + struct lu_extent *extent, + struct lu_extent *comp_extent, int write) +{ + struct lod_layout_component *lod_comp = &lo->ldo_comp_entries[index]; + struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + struct obd_statfs *sfs = &lod_env_info(env)->lti_osfs; + __u64 available = 0; + __u64 size; + bool ret = true; + int i, rc; + + ENTRY; + + LASSERT(lod_comp->llc_stripe_count != 0); + + if (write == 0 || + (extent->e_start == 0 && extent->e_end == OBD_OBJECT_EOF)) { + /* truncate or append */ + size = extension_size; + } else { + /* In case of write op, check the real write extent, + * it may be larger than the extension_size */ + size = roundup(min(extent->e_end, comp_extent->e_end) - + max(extent->e_start, comp_extent->e_start), + extension_size); + } + /* extension_size is file level, so we must divide by stripe count to + * compare it to available space on a single OST */ + size /= lod_comp->llc_stripe_count; + + lod_getref(&lod->lod_ost_descs); + for (i = 0; i < lod_comp->llc_stripe_count; i++) { + int index = lod_comp->llc_ost_indices[i]; + struct lod_tgt_desc *ost = OST_TGT(lod, index); + struct obd_statfs_info info = { 0 }; + int j, repeated = 0; + + LASSERT(ost); + + /* Get the number of times this OST repeats in this component. + * Note: inter-component repeats are not counted as this is + * considered as a rare case: we try to not repeat OST in other + * components if possible. */ + for (j = 0; j < lod_comp->llc_stripe_count; j++) { + if (index != lod_comp->llc_ost_indices[j]) + continue; + + /* already handled */ + if (j < i) + break; + + repeated++; + } + if (j < lod_comp->llc_stripe_count) + continue; + + if (!cfs_bitmap_check(lod->lod_ost_bitmap, index)) { + CDEBUG(D_LAYOUT, "ost %d no longer present\n", index); + ret = false; + break; + } + + rc = dt_statfs_info(env, ost->ltd_tgt, sfs, &info); + if (rc) { + CDEBUG(D_LAYOUT, "statfs failed for ost %d, error %d\n", + index, rc); + ret = false; + break; + } + + if (sfs->os_state & OS_STATE_ENOSPC || + sfs->os_state & OS_STATE_READONLY || + sfs->os_state & OS_STATE_DEGRADED) { + CDEBUG(D_LAYOUT, "ost %d is not availble for SEL " + "extension, state %u\n", index, sfs->os_state); + ret = false; + break; + } + + /* In bytes */ + available = sfs->os_bavail * sfs->os_bsize; + /* 'available' is relative to the allocation threshold */ + available -= (__u64) info.os_reserved_mb_low << 20; + + CDEBUG(D_LAYOUT, "ost %d lowwm: %d highwm: %d, " + "%llu %% blocks available, %llu %% blocks free\n", + index, info.os_reserved_mb_low, info.os_reserved_mb_high, + (100ull * sfs->os_bavail) / sfs->os_blocks, + (100ull * sfs->os_bfree) / sfs->os_blocks); + + if (size * repeated > available) { + ret = false; + CDEBUG(D_LAYOUT, "low space on ost %d, available %llu " + "< extension size %llu\n", index, available, + extension_size); + break; + } + } + lod_putref(lod, &lod->lod_ost_descs); + + RETURN(ret); +} + +/** + * Adjust extents after component removal + * + * When we remove an extension component, we move the start of the next + * component to match the start of the extension component, so no space is left + * without layout. + * + * \param[in] env execution environment for this thread + * \param[in] lo object + * \param[in] max_comp layout component + * \param[in] index index of this component + * + * \retval 0 on success + * \retval negative errno on error + */ +static void lod_sel_adjust_extents(const struct lu_env *env, + struct lod_object *lo, + int max_comp, int index) +{ + struct lod_layout_component *lod_comp = NULL; + struct lod_layout_component *next = NULL; + struct lod_layout_component *prev = NULL; + __u64 new_start = 0; + __u64 start; + int i; + + /* Extension space component */ + lod_comp = &lo->ldo_comp_entries[index]; + next = &lo->ldo_comp_entries[index + 1]; + prev = &lo->ldo_comp_entries[index - 1]; + + LASSERT(lod_comp != NULL && prev != NULL && next != NULL); + LASSERT(lod_comp->llc_flags & LCME_FL_EXTENSION); + + /* Previous is being removed */ + if (prev && prev->llc_id == LCME_ID_INVAL) + new_start = prev->llc_extent.e_start; + else + new_start = lod_comp->llc_extent.e_start; + + for (i = index + 1; i < max_comp; i++) { + lod_comp = &lo->ldo_comp_entries[i]; - /* No stripes */ - if (lo->ldo_dir_stripe_count <= 1) - RETURN(0); + start = lod_comp->llc_extent.e_start; + lod_comp->llc_extent.e_start = new_start; - slave_locks_size = offsetof(typeof(*slave_locks), - ha_handles[lo->ldo_dir_stripe_count]); - /* Freed in lod_object_unlock */ - OBD_ALLOC(slave_locks, slave_locks_size); - if (!slave_locks) - RETURN(-ENOMEM); - slave_locks->ha_count = lo->ldo_dir_stripe_count; + /* We only move zero length extendable components */ + if (!(start == lod_comp->llc_extent.e_end)) + break; - /* striped directory lock */ - for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - struct lustre_handle lockh; - struct ldlm_res_id *res_id; + LASSERT(!(lod_comp->llc_flags & LCME_FL_INIT)); - res_id = &lod_env_info(env)->lti_res_id; - fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu), - res_id); - einfo->ei_res_id = res_id; + lod_comp->llc_extent.e_end = new_start; + } +} - LASSERT(lo->ldo_stripe[i] != NULL); - if (dt_object_remote(lo->ldo_stripe[i])) { - set_bit(i, (void *)slave_locks->ha_map); - rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, - einfo, policy); - } else { - struct ldlm_namespace *ns = einfo->ei_namespace; - ldlm_blocking_callback blocking = einfo->ei_cb_local_bl; - ldlm_completion_callback completion = einfo->ei_cb_cp; - __u64 dlmflags = LDLM_FL_ATOMIC_CB; +/* Calculate the proposed 'new end' for a component we're extending */ +static __u64 lod_extension_new_end(__u64 extension_size, __u64 extent_end, + __u32 stripe_size, __u64 component_end, + __u64 extension_end) +{ + __u64 new_end; - if (einfo->ei_mode == LCK_PW || - einfo->ei_mode == LCK_EX) - dlmflags |= LDLM_FL_COS_INCOMPAT; + LASSERT(extension_size != 0 && stripe_size != 0); - LASSERT(ns != NULL); - rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, - policy, einfo->ei_mode, - &dlmflags, blocking, - completion, NULL, - NULL, 0, LVB_T_NONE, - NULL, &lockh); - } - if (rc) { - while (i--) - ldlm_lock_decref_and_cancel( - &slave_locks->ha_handles[i], - einfo->ei_mode); - OBD_FREE(slave_locks, slave_locks_size); - RETURN(rc); + /* Round up to extension size */ + if (extent_end == OBD_OBJECT_EOF) { + new_end = OBD_OBJECT_EOF; + } else { + /* Add at least extension_size to the previous component_end, + * covering the req layout extent */ + new_end = max(extent_end - component_end, extension_size); + new_end = roundup(new_end, extension_size); + new_end += component_end; + + /* Component end must be min stripe size aligned */ + if (new_end % stripe_size) { + CDEBUG(D_LAYOUT, "new component end is not aligned " + "by the stripe size %u: [%llu, %llu) ext size " + "%llu new end %llu, aligning\n", + stripe_size, component_end, extent_end, + extension_size, new_end); + new_end = roundup(new_end, stripe_size); } - slave_locks->ha_handles[i] = lockh; + + /* Overflow */ + if (new_end < extent_end) + new_end = OBD_OBJECT_EOF; } - einfo->ei_cbdata = slave_locks; - RETURN(0); + /* Don't extend past the end of the extension component */ + if (new_end > extension_end) + new_end = extension_end; + + return new_end; } +/* As lod_sel_handler() could be re-entered for the same component several + * times, this is the data for the next call. Fields could be changed to + * component indexes when needed, (e.g. if there is no need to instantiate + * all the previous components up to the current position) to tell the caller + * where to start over from. */ +struct sel_data { + int sd_force; + int sd_repeat; +}; + /** - * Implementation of dt_object_operations::do_invalidate. + * Process extent updates for a particular layout component + * + * Handle layout updates for a particular extension space component touched by + * a layout update operation. Core function of self-extending PFL feature. + * + * In general, this function processes exactly *one* stage of an extension + * operation, modifying the layout accordingly, then returns to the caller. + * The caller is responsible for restarting processing with the new layout, + * which may repeatedly return to this function until the extension updates + * are complete. + * + * This function does one of a few things to the layout: + * 1. Extends the component before the current extension space component to + * allow it to accomodate the requested operation (if space/policy permit that + * component to continue on its current OSTs) + * + * 2. If extension of the existing component fails, we do one of two things: + * a. If there is a component after the extension space, we remove the + * extension space component, move the start of the next component down + * accordingly, then notify the caller to restart processing w/the new + * layout. + * b. If there is no following component, we try repeating the current + * component, creating a new component using the current one as a + * template (keeping its stripe properties but not specific striping), + * and try assigning striping for this component. If there is sufficient + * free space on the OSTs chosen for this component, it is instantiated + * and i/o continues there. + * + * If there is not sufficient space on the new OSTs, we remove this new + * component & extend the current component. + * + * Note further that uninited components followed by extension space can be zero + * length meaning that we will try to extend them before initializing them, and + * if that fails, they will be removed without initialization. + * + * 3. If we extend to/beyond the end of an extension space component, that + * component is exhausted (all of its range has been given to real components), + * so we remove it and restart processing. * - * \see dt_object_operations::do_invalidate() in the API description for details + * \param[in] env execution environment for this thread + * \param[in,out] lo object to update the layout of + * \param[in] extent layout extent for requested operation, update + * layout to fit this operation + * \param[in] th transaction handle for this operation + * \param[in,out] max_comp the highest comp for the portion of the layout + * we are operating on (For FLR, the chosen + * replica). Updated because we may remove + * components. + * \param[in] index index of the extension space component we're + * working on + * \param[in] write if this is write op + * \param[in,out] force if the extension is to be forced; set here + to force it on the 2nd call for the same + extension component + * + * \retval 0 on success + * \retval negative errno on error */ -static int lod_invalidate(const struct lu_env *env, struct dt_object *dt) -{ - return dt_invalidate(env, dt_object_child(dt)); -} - -static int lod_layout_data_init(struct lod_thread_info *info, __u32 comp_cnt) +static int lod_sel_handler(const struct lu_env *env, + struct lod_object *lo, + struct lu_extent *extent, + struct thandle *th, int *max_comp, + int index, int write, + struct sel_data *sd) { + struct lod_device *d = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + struct lod_thread_info *info = lod_env_info(env); + struct lod_layout_component *lod_comp; + struct lod_layout_component *prev; + struct lod_layout_component *next = NULL; + __u64 extension_size; + __u64 new_end = 0; + bool repeated; + int change = 0; + int rc = 0; ENTRY; - /* clear memory region that will be used for layout change */ - memset(&info->lti_layout_attr, 0, sizeof(struct lu_attr)); - info->lti_count = 0; + /* First component cannot be extension space */ + if (index == 0) { + CERROR("%s: "DFID" first component cannot be extension space\n", + lod2obd(d)->obd_name, PFID(lod_object_fid(lo))); + RETURN(-EINVAL); + } - if (info->lti_comp_size >= comp_cnt) - RETURN(0); + lod_comp = &lo->ldo_comp_entries[index]; + prev = &lo->ldo_comp_entries[index - 1]; + if ((index + 1) < *max_comp) + next = &lo->ldo_comp_entries[index + 1]; - if (info->lti_comp_size > 0) { - OBD_FREE(info->lti_comp_idx, - info->lti_comp_size * sizeof(__u32)); - info->lti_comp_size = 0; + /* extension size uses the stripe size field as KiB */ + extension_size = lod_comp->llc_stripe_size * SEL_UNIT_SIZE; + + CDEBUG(D_LAYOUT, "prev start %llu, extension start %llu, extension end" + " %llu, extension size %llu\n", prev->llc_extent.e_start, + lod_comp->llc_extent.e_start, lod_comp->llc_extent.e_end, + extension_size); + + /* Two extension space components cannot be adjacent & extension space + * components cannot be init */ + if ((prev->llc_flags & LCME_FL_EXTENSION) || + !(ergo(next, !(next->llc_flags & LCME_FL_EXTENSION))) || + lod_comp_inited(lod_comp)) { + CERROR("%s: "DFID" invalid extension space components\n", + lod2obd(d)->obd_name, PFID(lod_object_fid(lo))); + RETURN(-EINVAL); } - OBD_ALLOC(info->lti_comp_idx, comp_cnt * sizeof(__u32)); - if (!info->lti_comp_idx) - RETURN(-ENOMEM); + if (!prev->llc_stripe) { + CDEBUG(D_LAYOUT, "Previous component not inited\n"); + info->lti_count = 1; + info->lti_comp_idx[0] = index - 1; + rc = lod_declare_instantiate_components(env, lo, th); + /* ENOSPC tells us we can't use this component. If there is + * a next or we are repeating, we either spill over (next) or + * extend the original comp (repeat). Otherwise, return the + * error to the user. */ + if (rc == -ENOSPC && (next || sd->sd_repeat)) + rc = 1; + if (rc < 0) + RETURN(rc); + } - info->lti_comp_size = comp_cnt; + if (sd->sd_force == 0 && rc == 0) + rc = !lod_sel_osts_allowed(env, lo, index - 1, + extension_size, extent, + &lod_comp->llc_extent, write); + + repeated = !!(sd->sd_repeat); + sd->sd_repeat = 0; + sd->sd_force = 0; + + /* Extend previous component */ + if (rc == 0) { + new_end = lod_extension_new_end(extension_size, extent->e_end, + prev->llc_stripe_size, + prev->llc_extent.e_end, + lod_comp->llc_extent.e_end); + + CDEBUG(D_LAYOUT, "new end %llu\n", new_end); + lod_comp->llc_extent.e_start = new_end; + prev->llc_extent.e_end = new_end; + + if (prev->llc_extent.e_end == lod_comp->llc_extent.e_end) { + CDEBUG(D_LAYOUT, "Extension component exhausted\n"); + lod_comp->llc_id = LCME_ID_INVAL; + change--; + } + } else { + /* rc == 1, failed to extend current component */ + LASSERT(rc == 1); + if (next) { + /* Normal 'spillover' case - Remove the extension + * space component & bring down the start of the next + * component. */ + lod_comp->llc_id = LCME_ID_INVAL; + change--; + if (!(prev->llc_flags & LCME_FL_INIT)) { + prev->llc_id = LCME_ID_INVAL; + change--; + } + lod_sel_adjust_extents(env, lo, *max_comp, index); + } else if (lod_comp_inited(prev)) { + /* If there is no next, and the previous component is + * INIT'ed, try repeating the previous component. */ + LASSERT(repeated == 0); + rc = lod_layout_repeat_comp(env, lo, index - 1); + if (rc < 0) + RETURN(rc); + change++; + /* The previous component is a repeated component. + * Record this so we don't keep trying to repeat it. */ + sd->sd_repeat = 1; + } else { + /* If the previous component is not INIT'ed, this may + * be a component we have just instantiated but failed + * to extend. Or even a repeated component we failed + * to prepare a striping for. Do not repeat but instead + * remove the repeated component & force the extention + * of the original one */ + sd->sd_force = 1; + if (repeated) { + prev->llc_id = LCME_ID_INVAL; + change--; + } + } + } + + if (change < 0) { + rc = lod_layout_del_prep_layout(env, lo, NULL); + if (rc < 0) + RETURN(rc); + LASSERTF(-rc == change, + "number deleted %d != requested %d\n", -rc, + change); + } + *max_comp = *max_comp + change; + + /* lod_del_prep_layout reallocates ldo_comp_entries, so we must + * refresh these pointers before using them */ + lod_comp = &lo->ldo_comp_entries[index]; + prev = &lo->ldo_comp_entries[index - 1]; + CDEBUG(D_LAYOUT, "After extent updates: prev start %llu, current start " + "%llu, current end %llu max_comp %d ldo_comp_cnt %d\n", + prev->llc_extent.e_start, lod_comp->llc_extent.e_start, + lod_comp->llc_extent.e_end, *max_comp, lo->ldo_comp_cnt); + + /* Layout changed successfully */ RETURN(0); } -static int lod_declare_instantiate_components(const struct lu_env *env, - struct lod_object *lo, struct thandle *th) +/** + * Declare layout extent updates + * + * Handles extensions. Identifies extension components touched by current + * operation and passes them to processing function. + * + * Restarts with updated layouts from the processing function until the current + * operation no longer touches an extension space component. + * + * \param[in] env execution environment for this thread + * \param[in,out] lo object to update the layout of + * \param[in] extent layout extent for requested operation, update layout to + * fit this operation + * \param[in] th transaction handle for this operation + * \param[in] pick identifies chosen mirror for FLR layouts + * \param[in] write if this is write op + * + * \retval 1 on layout changed, 0 on no change + * \retval negative errno on error + */ +static int lod_declare_update_extents(const struct lu_env *env, + struct lod_object *lo, struct lu_extent *extent, + struct thandle *th, int pick, int write) { struct lod_thread_info *info = lod_env_info(env); - int i; - int rc = 0; + struct lod_layout_component *lod_comp; + bool layout_changed = false; + struct sel_data sd = { 0 }; + int start_index; + int i = 0; + int max_comp = 0; + int rc = 0, rc2; + int change = 0; ENTRY; - LASSERT(info->lti_count < lo->ldo_comp_cnt); + /* This makes us work on the components of the chosen mirror */ + start_index = lo->ldo_mirrors[pick].lme_start; + max_comp = lo->ldo_mirrors[pick].lme_end + 1; + if (lo->ldo_flr_state == LCM_FL_NONE) + LASSERT(start_index == 0 && max_comp == lo->ldo_comp_cnt); - for (i = 0; i < info->lti_count; i++) { - rc = lod_qos_prep_create(env, lo, NULL, th, - info->lti_comp_idx[i]); - if (rc) + CDEBUG(D_LAYOUT, "extent->e_start %llu, extent->e_end %llu\n", + extent->e_start, extent->e_end); + for (i = start_index; i < max_comp; i++) { + lod_comp = &lo->ldo_comp_entries[i]; + + /* We've passed all components of interest */ + if (lod_comp->llc_extent.e_start >= extent->e_end) break; + + if (lod_comp->llc_flags & LCME_FL_EXTENSION) { + layout_changed = true; + rc = lod_sel_handler(env, lo, extent, th, &max_comp, + i, write, &sd); + if (rc < 0) + GOTO(out, rc); + + /* Nothing has changed behind the prev one */ + i -= 2; + continue; + } } - if (!rc) { - info->lti_buf.lb_len = lod_comp_md_size(lo, false); - rc = lod_sub_declare_xattr_set(env, lod_object_child(lo), - &info->lti_buf, XATTR_NAME_LOV, 0, th); + /* We may have added or removed components. If so, we must update the + * start & ends of all the mirrors after the current one, and the end + * of the current mirror. */ + change = max_comp - 1 - lo->ldo_mirrors[pick].lme_end; + if (change) { + lo->ldo_mirrors[pick].lme_end += change; + for (i = pick + 1; i < lo->ldo_mirror_count; i++) { + lo->ldo_mirrors[i].lme_start += change; + lo->ldo_mirrors[i].lme_end += change; + } } - RETURN(rc); + EXIT; +out: + /* The amount of components has changed, adjust the lti_comp_idx */ + rc2 = lod_layout_data_init(info, lo->ldo_comp_cnt); + + return rc < 0 ? rc : rc2 < 0 ? rc2 : layout_changed; +} + +/* If striping is already instantiated or INIT'ed DOM? */ +static bool lod_is_instantiation_needed(struct lod_layout_component *comp) +{ + return !(((lov_pattern(comp->llc_pattern) == LOV_PATTERN_MDT) && + lod_comp_inited(comp)) || comp->llc_stripe); } +/** + * Declare layout update for a non-FLR layout. + * + * \param[in] env execution environment for this thread + * \param[in,out] lo object to update the layout of + * \param[in] layout layout intent for requested operation, "update" is + * a process of reacting to this + * \param[in] buf buffer containing lov ea (see comment on usage inline) + * \param[in] th transaction handle for this operation + * + * \retval 0 on success + * \retval negative errno on error + */ static int lod_declare_update_plain(const struct lu_env *env, struct lod_object *lo, struct layout_intent *layout, const struct lu_buf *buf, struct thandle *th) @@ -5388,6 +6987,7 @@ static int lod_declare_update_plain(const struct lu_env *env, struct lod_device *d = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); struct lod_layout_component *lod_comp; struct lov_comp_md_v1 *comp_v1 = NULL; + bool layout_changed = false; bool replay = false; int i, rc; ENTRY; @@ -5415,6 +7015,7 @@ static int lod_declare_update_plain(const struct lu_env *env, rc = lod_use_defined_striping(env, lo, buf); if (rc) GOTO(out, rc); + lo->ldo_comp_cached = 1; rc = lod_get_lov_ea(env, lo); if (rc <= 0) @@ -5422,6 +7023,11 @@ static int lod_declare_update_plain(const struct lu_env *env, /* old on-disk EA is stored in info->lti_buf */ comp_v1 = (struct lov_comp_md_v1 *)info->lti_buf.lb_buf; replay = true; + layout_changed = true; + + rc = lod_layout_data_init(info, lo->ldo_comp_cnt); + if (rc) + GOTO(out, rc); } else { /* non replay path */ rc = lod_striping_load(env, lo); @@ -5434,18 +7040,27 @@ static int lod_declare_update_plain(const struct lu_env *env, if (lo->ldo_comp_cnt > 1 && lod_comp->llc_extent.e_end != OBD_OBJECT_EOF && lod_comp->llc_extent.e_end < layout->li_extent.e_end) { - CDEBUG(replay ? D_ERROR : D_LAYOUT, - "%s: the defined layout [0, %#llx) does not covers " - "the write range "DEXT"\n", - lod2obd(d)->obd_name, lod_comp->llc_extent.e_end, - PEXT(&layout->li_extent)); + CDEBUG_LIMIT(replay ? D_ERROR : D_LAYOUT, + "%s: the defined layout [0, %#llx) does not " + "covers the write range "DEXT"\n", + lod2obd(d)->obd_name, lod_comp->llc_extent.e_end, + PEXT(&layout->li_extent)); GOTO(out, rc = -EINVAL); } - CDEBUG(D_LAYOUT, "%s: "DFID": instantiate components "DEXT"\n", + CDEBUG(D_LAYOUT, "%s: "DFID": update components "DEXT"\n", lod2obd(d)->obd_name, PFID(lod_object_fid(lo)), PEXT(&layout->li_extent)); + if (!replay) { + rc = lod_declare_update_extents(env, lo, &layout->li_extent, + th, 0, layout->li_opc == LAYOUT_INTENT_WRITE); + if (rc < 0) + GOTO(out, rc); + else if (rc) + layout_changed = true; + } + /* * Iterate ld->ldo_comp_entries, find the component whose extent under * the write range and not instantianted. @@ -5457,7 +7072,8 @@ static int lod_declare_update_plain(const struct lu_env *env, break; if (!replay) { - if (lod_comp_inited(lod_comp)) + /* If striping is instantiated or INIT'ed DOM skip */ + if (!lod_is_instantiation_needed(lod_comp)) continue; } else { /** @@ -5481,17 +7097,19 @@ static int lod_declare_update_plain(const struct lu_env *env, LASSERT(info->lti_comp_idx != NULL); info->lti_comp_idx[info->lti_count++] = i; + layout_changed = true; } - if (info->lti_count == 0) + if (!layout_changed) RETURN(-EALREADY); lod_obj_inc_layout_gen(lo); rc = lod_declare_instantiate_components(env, lo, th); + EXIT; out: if (rc) lod_striping_free(env, lo); - RETURN(rc); + return rc; } static inline int lod_comp_index(struct lod_object *lo, @@ -5506,15 +7124,21 @@ static inline int lod_comp_index(struct lod_object *lo, /** * Stale other mirrors by writing extent. */ -static void lod_stale_components(struct lod_object *lo, int primary, - struct lu_extent *extent) +static int lod_stale_components(const struct lu_env *env, struct lod_object *lo, + int primary, struct lu_extent *extent, + struct thandle *th) { struct lod_layout_component *pri_comp, *lod_comp; + struct lu_extent pri_extent; + int rc = 0; int i; + ENTRY; /* The writing extent decides which components in the primary * are affected... */ CDEBUG(D_LAYOUT, "primary mirror %d, "DEXT"\n", primary, PEXT(extent)); + +restart: lod_foreach_mirror_comp(pri_comp, lo, primary) { if (!lu_extent_is_overlapped(extent, &pri_comp->llc_extent)) continue; @@ -5523,15 +7147,27 @@ static void lod_stale_components(struct lod_object *lo, int primary, lod_comp_index(lo, pri_comp), PEXT(&pri_comp->llc_extent)); + pri_extent.e_start = pri_comp->llc_extent.e_start; + pri_extent.e_end = pri_comp->llc_extent.e_end; + for (i = 0; i < lo->ldo_mirror_count; i++) { if (i == primary) continue; + rc = lod_declare_update_extents(env, lo, &pri_extent, + th, i, 0); + /* if update_extents changed the layout, it may have + * reallocated the component array, so start over to + * avoid using stale pointers */ + if (rc == 1) + goto restart; + if (rc < 0) + RETURN(rc); /* ... and then stale other components that are * overlapping with primary components */ lod_foreach_mirror_comp(lod_comp, lo, i) { if (!lu_extent_is_overlapped( - &pri_comp->llc_extent, + &pri_extent, &lod_comp->llc_extent)) continue; @@ -5543,6 +7179,8 @@ static void lod_stale_components(struct lod_object *lo, int primary, } } } + + RETURN(rc); } /** @@ -5614,7 +7252,7 @@ static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, * This algo can be revised later after knowing the topology of * cluster. */ - lod_qos_statfs_update(env, lod); + lod_qos_statfs_update(env, lod, &lod->lod_ost_descs); for (i = 0; i < lo->ldo_mirror_count; i++) { bool ost_avail = true; int index = (i + seq) % lo->ldo_mirror_count; @@ -5689,6 +7327,34 @@ static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, RETURN(picked); } +static int lod_prepare_resync_mirror(const struct lu_env *env, + struct lod_object *lo, + __u16 mirror_id) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_layout_component *lod_comp; + bool neg = !!(MIRROR_ID_NEG & mirror_id); + int i; + + mirror_id &= ~MIRROR_ID_NEG; + + for (i = 0; i < lo->ldo_mirror_count; i++) { + if ((!neg && lo->ldo_mirrors[i].lme_id != mirror_id) || + (neg && lo->ldo_mirrors[i].lme_id == mirror_id)) + continue; + + lod_foreach_mirror_comp(lod_comp, lo, i) { + if (lod_comp_inited(lod_comp)) + continue; + + info->lti_comp_idx[info->lti_count++] = + lod_comp_index(lo, lod_comp); + } + } + + return 0; +} + /** * figure out the components should be instantiated for resync. */ @@ -5750,6 +7416,7 @@ static int lod_declare_update_rdonly(const struct lu_env *env, if (mlc->mlc_opc == MD_LAYOUT_WRITE) { struct layout_intent *layout = mlc->mlc_intent; + int write = layout->li_opc == LAYOUT_INTENT_WRITE; int picked; extent = layout->li_extent; @@ -5764,6 +7431,12 @@ static int lod_declare_update_rdonly(const struct lu_env *env, PFID(lod_object_fid(lo)), lo->ldo_mirrors[picked].lme_id); + /* Update extents of primary before staling */ + rc = lod_declare_update_extents(env, lo, &extent, th, picked, + write); + if (rc < 0) + GOTO(out, rc); + if (layout->li_opc == LAYOUT_INTENT_TRUNC) { /** * trunc transfers [0, size) in the intent extent, we'd @@ -5774,7 +7447,9 @@ static int lod_declare_update_rdonly(const struct lu_env *env, } /* stale overlapping components from other mirrors */ - lod_stale_components(lo, picked, &extent); + rc = lod_stale_components(env, lo, picked, &extent, th); + if (rc < 0) + GOTO(out, rc); /* restore truncate intent extent */ if (layout->li_opc == LAYOUT_INTENT_TRUNC) @@ -5788,7 +7463,7 @@ static int lod_declare_update_rdonly(const struct lu_env *env, &lod_comp->llc_extent)) break; - if (lod_comp_inited(lod_comp)) + if (!lod_is_instantiation_needed(lod_comp)) continue; info->lti_comp_idx[info->lti_count++] = @@ -5804,23 +7479,33 @@ static int lod_declare_update_rdonly(const struct lu_env *env, * prep uninited all components assuming any non-stale mirror * could be picked as the primary mirror. */ - for (i = 0; i < lo->ldo_mirror_count; i++) { - if (lo->ldo_mirrors[i].lme_stale) - continue; + if (mlc->mlc_mirror_id == 0) { + /* normal resync */ + for (i = 0; i < lo->ldo_mirror_count; i++) { + if (lo->ldo_mirrors[i].lme_stale) + continue; - lod_foreach_mirror_comp(lod_comp, lo, i) { - if (!lod_comp_inited(lod_comp)) - break; + lod_foreach_mirror_comp(lod_comp, lo, i) { + if (!lod_comp_inited(lod_comp)) + break; - if (extent.e_end < lod_comp->llc_extent.e_end) - extent.e_end = - lod_comp->llc_extent.e_end; + if (extent.e_end < + lod_comp->llc_extent.e_end) + extent.e_end = + lod_comp->llc_extent.e_end; + } } + rc = lod_prepare_resync(env, lo, &extent); + if (rc) + GOTO(out, rc); + } else { + /* mirror write, try to init its all components */ + rc = lod_prepare_resync_mirror(env, lo, + mlc->mlc_mirror_id); + if (rc) + GOTO(out, rc); } - rc = lod_prepare_resync(env, lo, &extent); - if (rc) - GOTO(out, rc); /* change the file state to SYNC_PENDING */ lo->ldo_flr_state = LCM_FL_SYNC_PENDING; } @@ -5832,7 +7517,7 @@ static int lod_declare_update_rdonly(const struct lu_env *env, if (lo->ldo_layout_gen > (LCME_ID_MAX >> 1)) { __u32 layout_version; - cfs_get_random_bytes(&layout_version, sizeof(layout_version)); + get_random_bytes(&layout_version, sizeof(layout_version)); lo->ldo_layout_gen = layout_version & 0xffff; } @@ -5904,6 +7589,9 @@ static int lod_declare_update_write_pending(const struct lu_env *env, * 2. transfer layout version to all objects to close write era. */ if (mlc->mlc_opc == MD_LAYOUT_WRITE) { + struct layout_intent *layout = mlc->mlc_intent; + int write = layout->li_opc == LAYOUT_INTENT_WRITE; + LASSERT(mlc->mlc_intent != NULL); extent = mlc->mlc_intent->li_extent; @@ -5911,6 +7599,12 @@ static int lod_declare_update_write_pending(const struct lu_env *env, CDEBUG(D_LAYOUT, DFID": intent to write: "DEXT"\n", PFID(lod_object_fid(lo)), PEXT(&extent)); + /* 1. Update extents of primary before staling */ + rc = lod_declare_update_extents(env, lo, &extent, th, primary, + write); + if (rc < 0) + GOTO(out, rc); + if (mlc->mlc_intent->li_opc == LAYOUT_INTENT_TRUNC) { /** * trunc transfers [0, size) in the intent extent, we'd @@ -5919,10 +7613,13 @@ static int lod_declare_update_write_pending(const struct lu_env *env, extent.e_start = extent.e_end; extent.e_end = OBD_OBJECT_EOF; } - /* 1. stale overlapping components */ - lod_stale_components(lo, primary, &extent); - /* 2. find out the components need instantiating. + /* 2. stale overlapping components */ + rc = lod_stale_components(env, lo, primary, &extent, th); + if (rc < 0) + GOTO(out, rc); + + /* 3. find the components which need instantiating. * instantiate [0, mlc->mlc_intent->e_end) */ /* restore truncate intent extent */ @@ -5935,7 +7632,7 @@ static int lod_declare_update_write_pending(const struct lu_env *env, &lod_comp->llc_extent)) break; - if (lod_comp_inited(lod_comp)) + if (!lod_is_instantiation_needed(lod_comp)) continue; CDEBUG(D_LAYOUT, "write instantiate %d / %d\n", @@ -5944,16 +7641,26 @@ static int lod_declare_update_write_pending(const struct lu_env *env, lod_comp_index(lo, lod_comp); } } else { /* MD_LAYOUT_RESYNC */ - lod_foreach_mirror_comp(lod_comp, lo, primary) { - if (!lod_comp_inited(lod_comp)) - break; + if (mlc->mlc_mirror_id == 0) { + /* normal resync */ + lod_foreach_mirror_comp(lod_comp, lo, primary) { + if (!lod_comp_inited(lod_comp)) + break; + + extent.e_end = lod_comp->llc_extent.e_end; + } - extent.e_end = lod_comp->llc_extent.e_end; + rc = lod_prepare_resync(env, lo, &extent); + if (rc) + GOTO(out, rc); + } else { + /* mirror write, try to init its all components */ + rc = lod_prepare_resync_mirror(env, lo, + mlc->mlc_mirror_id); + if (rc) + GOTO(out, rc); } - rc = lod_prepare_resync(env, lo, &extent); - if (rc) - GOTO(out, rc); /* change the file state to SYNC_PENDING */ lo->ldo_flr_state = LCM_FL_SYNC_PENDING; } @@ -6052,9 +7759,9 @@ static int lod_declare_update_sync_pending(const struct lu_env *env, GOTO(out, rc = -EINVAL); } - CDEBUG(D_LAYOUT, DFID": resynced %u/%zu components\n", + CDEBUG(D_LAYOUT, DFID": synced %u resynced %u/%zu components\n", PFID(lod_object_fid(lo)), - resync_components, mlc->mlc_resync_count); + sync_components, resync_components, mlc->mlc_resync_count); lo->ldo_flr_state = LCM_FL_RDONLY; lod_obj_inc_layout_gen(lo); @@ -6204,11 +7911,11 @@ static ssize_t lod_declare_write(const struct lu_env *env, */ static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, - struct thandle *th, int iq) + struct thandle *th) { LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr) || S_ISLNK(dt->do_lu.lo_header->loh_attr)); - return lod_sub_write(env, dt_object_child(dt), buf, pos, th, iq); + return lod_sub_write(env, dt_object_child(dt), buf, pos, th); } static int lod_declare_punch(const struct lu_env *env, struct dt_object *dt, @@ -6269,16 +7976,8 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *lo, ENTRY; rc = lod_fld_lookup(env, lod, lu_object_fid(lo), &idx, &type); - if (rc != 0) { - /* Note: Sometimes, it will Return EAGAIN here, see - * ptrlpc_import_delay_req(), which might confuse - * lu_object_find_at() and make it wait there incorrectly. - * so we convert it to EIO here.*/ - if (rc == -EAGAIN) - rc = -EIO; - + if (rc != 0) RETURN(rc); - } if (type == LU_SEQ_RANGE_MDT && idx == lu_site2seq(lo->lo_dev->ld_site)->ss_node_id) { @@ -6322,6 +8021,56 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *lo, /** * + * Alloc cached foreign LOV + * + * \param[in] lo object + * \param[in] size size of foreign LOV + * + * \retval 0 on success + * \retval negative if failed + */ +int lod_alloc_foreign_lov(struct lod_object *lo, size_t size) +{ + OBD_ALLOC_LARGE(lo->ldo_foreign_lov, size); + if (lo->ldo_foreign_lov == NULL) + return -ENOMEM; + lo->ldo_foreign_lov_size = size; + lo->ldo_is_foreign = 1; + return 0; +} + +/** + * + * Free cached foreign LOV + * + * \param[in] lo object + */ +void lod_free_foreign_lov(struct lod_object *lo) +{ + if (lo->ldo_foreign_lov != NULL) + OBD_FREE_LARGE(lo->ldo_foreign_lov, lo->ldo_foreign_lov_size); + lo->ldo_foreign_lov = NULL; + lo->ldo_foreign_lov_size = 0; + lo->ldo_is_foreign = 0; +} + +/** + * + * Free cached foreign LMV + * + * \param[in] lo object + */ +void lod_free_foreign_lmv(struct lod_object *lo) +{ + if (lo->ldo_foreign_lmv != NULL) + OBD_FREE_LARGE(lo->ldo_foreign_lmv, lo->ldo_foreign_lmv_size); + lo->ldo_foreign_lmv = NULL; + lo->ldo_foreign_lmv_size = 0; + lo->ldo_dir_is_foreign = 0; +} + +/** + * * Release resources associated with striping. * * If the object is striped (regular or directory), then release @@ -6335,7 +8084,13 @@ void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo) struct lod_layout_component *lod_comp; int i, j; - if (lo->ldo_stripe != NULL) { + if (unlikely(lo->ldo_is_foreign)) { + lod_free_foreign_lov(lo); + lo->ldo_comp_cached = 0; + } else if (unlikely(lo->ldo_dir_is_foreign)) { + lod_free_foreign_lmv(lo); + lo->ldo_dir_stripe_loaded = 0; + } else if (lo->ldo_stripe != NULL) { LASSERT(lo->ldo_comp_entries == NULL); LASSERT(lo->ldo_dir_stripes_allocated > 0);