X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flod%2Flod_object.c;h=c781ed5e5aa7d6ff6dc4a5bf2e8b4f0468444bb1;hp=330dd31fab6f0a859d81c3fa041afc21a69e1dda;hb=a24f6153292753bf6e40f5638930d6cffa78e1ac;hpb=5b663ca0cb31517e307913873ca5df4676e5654d diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index 330dd31..c781ed5 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -23,7 +23,7 @@ * Copyright 2009 Sun Microsystems, Inc. All rights reserved * Use is subject to license terms. * - * Copyright (c) 2012, 2016, Intel Corporation. + * Copyright (c) 2012, 2017, Intel Corporation. */ /* * lustre/lod/lod_object.c @@ -40,6 +40,8 @@ #define DEBUG_SUBSYSTEM S_MDS +#include + #include #include #include @@ -96,9 +98,9 @@ static int lod_declare_insert(const struct lu_env *env, struct dt_object *dt, */ static int lod_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, const struct dt_key *key, - struct thandle *th, int ign) + struct thandle *th) { - return lod_sub_insert(env, dt_object_child(dt), rec, key, th, ign); + return lod_sub_insert(env, dt_object_child(dt), rec, key, th); } /** @@ -375,15 +377,24 @@ static struct dt_index_operations lod_index_ops = { static struct dt_it *lod_striped_it_init(const struct lu_env *env, struct dt_object *dt, __u32 attr) { - struct lod_object *lo = lod_dt_obj(dt); - struct dt_object *next; - struct lod_it *it = &lod_env_info(env)->lti_it; - struct dt_it *it_next; - ENTRY; + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next; + struct lod_it *it = &lod_env_info(env)->lti_it; + struct dt_it *it_next; + __u16 index = 0; LASSERT(lo->ldo_dir_stripe_count > 0); - next = lo->ldo_stripe[0]; - LASSERT(next != NULL); + + do { + next = lo->ldo_stripe[index]; + if (next && dt_object_exists(next)) + break; + } while (++index < lo->ldo_dir_stripe_count); + + /* no valid stripe */ + if (!next || !dt_object_exists(next)) + return ERR_PTR(-ENODEV); + LASSERT(next->do_index_ops != NULL); it_next = next->do_index_ops->dio_it.init(env, next, attr); @@ -396,7 +407,7 @@ static struct dt_it *lod_striped_it_init(const struct lu_env *env, * additional ones */ LASSERT(it->lit_obj == NULL); - it->lit_stripe_index = 0; + it->lit_stripe_index = index; it->lit_attr = attr; it->lit_it = it_next; it->lit_obj = dt; @@ -431,10 +442,10 @@ static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di) LOD_CHECK_STRIPED_IT(env, it, lo); next = lo->ldo_stripe[it->lit_stripe_index]; - LASSERT(next != NULL); - LASSERT(next->do_index_ops != NULL); - - next->do_index_ops->dio_it.fini(env, it->lit_it); + if (next) { + LASSERT(next->do_index_ops != NULL); + next->do_index_ops->dio_it.fini(env, it->lit_it); + } } /* the iterator not in use any more */ @@ -455,15 +466,15 @@ static void lod_striped_it_fini(const struct lu_env *env, struct dt_it *di) static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di, const struct dt_key *key) { - const struct lod_it *it = (const struct lod_it *)di; - struct lod_object *lo = lod_dt_obj(it->lit_obj); - struct dt_object *next; - ENTRY; + const struct lod_it *it = (const struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; LOD_CHECK_STRIPED_IT(env, it, lo); next = lo->ldo_stripe[it->lit_stripe_index]; LASSERT(next != NULL); + LASSERT(dt_object_exists(next)); LASSERT(next->do_index_ops != NULL); return next->do_index_ops->dio_it.get(env, it->lit_it, key); @@ -478,9 +489,16 @@ static int lod_striped_it_get(const struct lu_env *env, struct dt_it *di, */ static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di) { - struct lod_it *it = (struct lod_it *)di; - struct lod_object *lo = lod_dt_obj(it->lit_obj); - struct dt_object *next; + struct lod_it *it = (struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + + /* + * If lit_it == NULL, then it means the sub_it has been finished, + * which only happens in failure cases, see lod_striped_it_next() + */ + if (!it->lit_it) + return; LOD_CHECK_STRIPED_IT(env, it, lo); @@ -501,17 +519,20 @@ static void lod_striped_it_put(const struct lu_env *env, struct dt_it *di) */ static int lod_striped_it_next(const struct lu_env *env, struct dt_it *di) { - struct lod_it *it = (struct lod_it *)di; - struct lod_object *lo = lod_dt_obj(it->lit_obj); - struct dt_object *next; - struct dt_it *it_next; - int rc; + struct lod_it *it = (struct lod_it *)di; + struct lod_object *lo = lod_dt_obj(it->lit_obj); + struct dt_object *next; + struct dt_it *it_next; + __u32 index; + int rc; + ENTRY; LOD_CHECK_STRIPED_IT(env, it, lo); next = lo->ldo_stripe[it->lit_stripe_index]; LASSERT(next != NULL); + LASSERT(dt_object_exists(next)); LASSERT(next->do_index_ops != NULL); again: rc = next->do_index_ops->dio_it.next(env, it->lit_it); @@ -544,33 +565,44 @@ again: RETURN(rc); } - /* go to next stripe */ - if (it->lit_stripe_index + 1 >= lo->ldo_dir_stripe_count) - RETURN(1); - - it->lit_stripe_index++; - next->do_index_ops->dio_it.put(env, it->lit_it); next->do_index_ops->dio_it.fini(env, it->lit_it); it->lit_it = NULL; - next = lo->ldo_stripe[it->lit_stripe_index]; - LASSERT(next != NULL); - rc = next->do_ops->do_index_try(env, next, &dt_directory_features); - if (rc != 0) - RETURN(rc); + /* go to next stripe */ + index = it->lit_stripe_index; + while (++index < lo->ldo_dir_stripe_count) { + next = lo->ldo_stripe[index]; + if (!next) + continue; - LASSERT(next->do_index_ops != NULL); + if (!dt_object_exists(next)) + continue; + + rc = next->do_ops->do_index_try(env, next, + &dt_directory_features); + if (rc != 0) + RETURN(rc); + + LASSERT(next->do_index_ops != NULL); + + it_next = next->do_index_ops->dio_it.init(env, next, + it->lit_attr); + if (IS_ERR(it_next)) + RETURN(PTR_ERR(it_next)); + + rc = next->do_index_ops->dio_it.get(env, it_next, + (const struct dt_key *)""); + if (rc <= 0) + RETURN(rc == 0 ? -EIO : rc); - it_next = next->do_index_ops->dio_it.init(env, next, it->lit_attr); - if (!IS_ERR(it_next)) { it->lit_it = it_next; + it->lit_stripe_index = index; goto again; - } else { - rc = PTR_ERR(it_next); + } - RETURN(rc); + RETURN(1); } /** @@ -774,14 +806,9 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, int rc; ENTRY; - /* If it is not a striped directory, then load nothing. */ if (magic != LMV_MAGIC_V1) RETURN(0); - /* If it is in migration (or failure), then load nothing. */ - if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION) - RETURN(0); - stripes = le32_to_cpu(lmv1->lmv_stripe_count); if (stripes < 1) RETURN(0); @@ -947,7 +974,7 @@ static int lod_index_try(const struct lu_env *env, struct dt_object *dt, LASSERT(next->do_ops); LASSERT(next->do_ops->do_index_try); - rc = lod_load_striping_locked(env, lo); + rc = lod_striping_load(env, lo); if (rc != 0) RETURN(rc); @@ -959,7 +986,9 @@ static int lod_index_try(const struct lu_env *env, struct dt_object *dt, int i; for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - if (dt_object_exists(lo->ldo_stripe[i]) == 0) + if (!lo->ldo_stripe[i]) + continue; + if (!dt_object_exists(lo->ldo_stripe[i])) continue; rc = lo->ldo_stripe[i]->do_ops->do_index_try(env, lo->ldo_stripe[i], feat); @@ -1046,8 +1075,20 @@ static int lod_attr_get(const struct lu_env *env, return dt_attr_get(env, dt_object_child(dt), attr); } +static inline void lod_adjust_stripe_info(struct lod_layout_component *comp, + struct lov_desc *desc) +{ + if (comp->llc_pattern != LOV_PATTERN_MDT) { + if (!comp->llc_stripe_count) + comp->llc_stripe_count = + desc->ld_default_stripe_count; + } + if (comp->llc_stripe_size <= 0) + comp->llc_stripe_size = desc->ld_default_stripe_size; +} + int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, - struct thandle *th, lod_obj_stripe_cb_t cb, + struct thandle *th, struct lod_obj_stripe_cb_data *data) { struct lod_layout_component *lod_comp; @@ -1061,13 +1102,35 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, if (lod_comp->llc_stripe == NULL) continue; + /* has stripe but not inited yet, this component has been + * declared to be created, but hasn't created yet. + */ + if (!lod_comp_inited(lod_comp)) + continue; + + if (data->locd_comp_skip_cb && + data->locd_comp_skip_cb(env, lo, i, data)) + continue; + + if (data->locd_comp_cb) { + rc = data->locd_comp_cb(env, lo, i, data); + if (rc) + RETURN(rc); + } + + /* could used just to do sth about component, not each + * stripes + */ + if (!data->locd_stripe_cb) + continue; + LASSERT(lod_comp->llc_stripe_count > 0); for (j = 0; j < lod_comp->llc_stripe_count; j++) { struct dt_object *dt = lod_comp->llc_stripe[j]; if (dt == NULL) continue; - rc = cb(env, lo, dt, th, j, data); + rc = data->locd_stripe_cb(env, lo, dt, th, i, j, data); if (rc != 0) RETURN(rc); } @@ -1075,14 +1138,73 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, RETURN(0); } +static bool lod_obj_attr_set_comp_skip_cb(const struct lu_env *env, + struct lod_object *lo, int comp_idx, + struct lod_obj_stripe_cb_data *data) +{ + struct lod_layout_component *lod_comp = &lo->ldo_comp_entries[comp_idx]; + bool skipped = false; + + if (!(data->locd_attr->la_valid & LA_LAYOUT_VERSION)) + return skipped; + + switch (lo->ldo_flr_state) { + case LCM_FL_WRITE_PENDING: { + int i; + + /* skip stale components */ + if (lod_comp->llc_flags & LCME_FL_STALE) { + skipped = true; + break; + } + + /* skip valid and overlapping components, therefore any + * attempts to write overlapped components will never succeed + * because client will get EINPROGRESS. */ + for (i = 0; i < lo->ldo_comp_cnt; i++) { + if (i == comp_idx) + continue; + + if (lo->ldo_comp_entries[i].llc_flags & LCME_FL_STALE) + continue; + + if (lu_extent_is_overlapped(&lod_comp->llc_extent, + &lo->ldo_comp_entries[i].llc_extent)) { + skipped = true; + break; + } + } + break; + } + default: + LASSERTF(0, "impossible: %d\n", lo->ldo_flr_state); + case LCM_FL_SYNC_PENDING: + break; + } + + CDEBUG(D_LAYOUT, DFID": %s to set component %x to version: %u\n", + PFID(lu_object_fid(&lo->ldo_obj.do_lu)), + skipped ? "skipped" : "chose", lod_comp->llc_id, + data->locd_attr->la_layout_version); + + return skipped; +} + static inline int lod_obj_stripe_attr_set_cb(const struct lu_env *env, struct lod_object *lo, struct dt_object *dt, struct thandle *th, - int stripe_idx, struct lod_obj_stripe_cb_data *data) + int comp_idx, int stripe_idx, + struct lod_obj_stripe_cb_data *data) { if (data->locd_declare) return lod_sub_declare_attr_set(env, dt, data->locd_attr, th); + if (data->locd_attr->la_valid & LA_LAYOUT_VERSION) { + CDEBUG(D_LAYOUT, DFID": set layout version: %u, comp_idx: %d\n", + PFID(lu_object_fid(&dt->do_lu)), + data->locd_attr->la_layout_version, comp_idx); + } + return lod_sub_attr_set(env, dt, data->locd_attr, th); } @@ -1120,7 +1242,7 @@ static int lod_declare_attr_set(const struct lu_env *env, * speed up rename(). */ if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) { - if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID))) + if (!(attr->la_valid & LA_REMOTE_ATTR_SET)) RETURN(rc); if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER)) @@ -1136,7 +1258,7 @@ static int lod_declare_attr_set(const struct lu_env *env, * is being initialized as we don't need this information till * few specific cases like destroy, chown */ - rc = lod_load_striping(env, lo); + rc = lod_striping_load(env, lo); if (rc) RETURN(rc); @@ -1151,18 +1273,20 @@ static int lod_declare_attr_set(const struct lu_env *env, for (i = 0; i < lo->ldo_dir_stripe_count; i++) { if (lo->ldo_stripe[i] == NULL) continue; + if (!dt_object_exists(lo->ldo_stripe[i])) + continue; rc = lod_sub_declare_attr_set(env, lo->ldo_stripe[i], attr, th); if (rc != 0) RETURN(rc); } } else { - struct lod_obj_stripe_cb_data data; + struct lod_obj_stripe_cb_data data = { { 0 } }; data.locd_attr = attr; data.locd_declare = true; - rc = lod_obj_for_each_stripe(env, lo, th, - lod_obj_stripe_attr_set_cb, &data); + data.locd_stripe_cb = lod_obj_stripe_attr_set_cb; + rc = lod_obj_for_each_stripe(env, lo, th, &data); } if (rc) @@ -1217,7 +1341,7 @@ static int lod_attr_set(const struct lu_env *env, RETURN(rc); if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) { - if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID))) + if (!(attr->la_valid & LA_REMOTE_ATTR_SET)) RETURN(rc); if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER)) @@ -1229,6 +1353,14 @@ static int lod_attr_set(const struct lu_env *env, RETURN(rc); } + /* FIXME: a tricky case in the code path of mdd_layout_change(): + * the in-memory striping information has been freed in lod_xattr_set() + * due to layout change. It has to load stripe here again. It only + * changes flags of layout so declare_attr_set() is still accurate */ + rc = lod_striping_load(env, lo); + if (rc) + RETURN(rc); + if (!lod_obj_is_striped(dt)) RETURN(0); @@ -1249,12 +1381,13 @@ static int lod_attr_set(const struct lu_env *env, break; } } else { - struct lod_obj_stripe_cb_data data; + struct lod_obj_stripe_cb_data data = { { 0 } }; data.locd_attr = attr; data.locd_declare = false; - rc = lod_obj_for_each_stripe(env, lo, th, - lod_obj_stripe_attr_set_cb, &data); + data.locd_comp_skip_cb = lod_obj_attr_set_comp_skip_cb; + data.locd_stripe_cb = lod_obj_stripe_attr_set_cb; + rc = lod_obj_for_each_stripe(env, lo, th, &data); } if (rc) @@ -1355,21 +1488,35 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, rc = dt_xattr_get(env, dt_object_child(dt), buf, name); if (strcmp(name, XATTR_NAME_LMV) == 0) { struct lmv_mds_md_v1 *lmv1; + struct lmv_foreign_md *lfm; int rc1 = 0; if (rc > (typeof(rc))sizeof(*lmv1)) RETURN(rc); - if (rc < (typeof(rc))sizeof(*lmv1)) + /* short (<= sizeof(struct lmv_mds_md_v1)) foreign LMV case */ + /* XXX empty foreign LMV is not allowed */ + if (rc <= offsetof(typeof(*lfm), lfm_value)) RETURN(rc = rc > 0 ? -EINVAL : rc); if (buf->lb_buf == NULL || buf->lb_len == 0) { CLASSERT(sizeof(*lmv1) <= sizeof(info->lti_key)); + /* lti_buf is large enough for *lmv1 or a short + * (<= sizeof(struct lmv_mds_md_v1)) foreign LMV + */ info->lti_buf.lb_buf = info->lti_key; info->lti_buf.lb_len = sizeof(*lmv1); rc = dt_xattr_get(env, dt_object_child(dt), &info->lti_buf, name); + if (unlikely(rc <= offsetof(typeof(*lfm), + lfm_value))) + RETURN(rc = rc > 0 ? -EINVAL : rc); + + lfm = info->lti_buf.lb_buf; + if (le32_to_cpu(lfm->lfm_magic) == LMV_MAGIC_FOREIGN) + RETURN(rc); + if (unlikely(rc != sizeof(*lmv1))) RETURN(rc = rc > 0 ? -EINVAL : rc); @@ -1382,6 +1529,13 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, le32_to_cpu(lmv1->lmv_stripe_count), LMV_MAGIC_V1); } else { + lfm = buf->lb_buf; + if (le32_to_cpu(lfm->lfm_magic) == LMV_MAGIC_FOREIGN) + RETURN(rc); + + if (rc != sizeof(*lmv1)) + RETURN(rc = rc > 0 ? -EINVAL : rc); + rc1 = lod_load_lmv_shards(env, lod_dt_obj(dt), buf, false); } @@ -1511,9 +1665,15 @@ static int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, } lmm1 = (struct lmv_mds_md_v1 *)info->lti_ea_store; + memset(lmm1, 0, sizeof(*lmm1)); lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC); lmm1->lmv_stripe_count = cpu_to_le32(stripe_count); lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type); + if (lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) { + lmm1->lmv_migrate_hash = cpu_to_le32(lo->ldo_dir_migrate_hash); + lmm1->lmv_migrate_offset = + cpu_to_le32(lo->ldo_dir_migrate_offset); + } rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu), &mdtidx, &type); if (rc != 0) @@ -1555,7 +1715,10 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo, int rc = 0; ENTRY; - if (le32_to_cpu(lmv1->lmv_hash_type) & LMV_HASH_FLAG_MIGRATION) + LASSERT(mutex_is_locked(&lo->ldo_layout_mutex)); + + /* XXX may be useless as not called for foreign LMV ?? */ + if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_FOREIGN) RETURN(0); if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_STRIPE) { @@ -1582,8 +1745,10 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo, __u32 idx; fid_le_to_cpu(fid, &lmv1->lmv_stripe_fids[i]); - if (!fid_is_sane(fid)) - GOTO(out, rc = -ESTALE); + if (!fid_is_sane(fid)) { + stripe[i] = NULL; + continue; + } rc = lod_fld_lookup(env, lod, fid, &idx, &type); if (rc != 0) @@ -1613,7 +1778,7 @@ out: lo->ldo_dir_stripe_count = le32_to_cpu(lmv1->lmv_stripe_count); lo->ldo_dir_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count); if (rc != 0) - lod_object_free_striping(env, lo); + lod_striping_free_nolock(env, lo); RETURN(rc); } @@ -1678,6 +1843,10 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, struct linkea_data ldata = { NULL }; struct lu_buf linkea_buf; + /* OBD_FAIL_MDS_STRIPE_FID may leave stripe uninitialized */ + if (!dto) + continue; + rc = lod_sub_declare_create(env, dto, attr, NULL, dof, th); if (rc != 0) GOTO(out, rc); @@ -1771,6 +1940,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env, struct dt_object_format *dof, struct thandle *th) { + struct lod_thread_info *info = lod_env_info(env); struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; struct lod_object *lo = lod_dt_obj(dt); @@ -1781,13 +1951,14 @@ static int lod_prep_md_striped_create(const struct lu_env *env, int rc = 0; __u32 i; __u32 j; + bool is_specific = false; ENTRY; /* The lum has been verifed in lod_verify_md_striping */ - LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC); - LASSERT(le32_to_cpu(lum->lum_stripe_count) > 0); + LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC || + le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC); - stripe_count = le32_to_cpu(lum->lum_stripe_count); + stripe_count = lo->ldo_dir_stripe_count; OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count); if (idx_array == NULL) @@ -1800,6 +1971,12 @@ static int lod_prep_md_striped_create(const struct lu_env *env, /* Start index must be the master MDT */ master_index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id; idx_array[0] = master_index; + if (le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC) { + is_specific = true; + for (i = 1; i < stripe_count; i++) + idx_array[i] = le32_to_cpu(lum->lum_objects[i].lum_mds); + } + for (i = 0; i < stripe_count; i++) { struct lod_tgt_desc *tgt = NULL; struct dt_object *dto; @@ -1818,7 +1995,8 @@ static int lod_prep_md_striped_create(const struct lu_env *env, CDEBUG(D_INFO, "try idx %d, mdt cnt %u, allocated %u\n", idx, lod->lod_remote_mdt_count + 1, i); - if (likely(!OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) { + if (likely(!is_specific && + !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) { /* check whether the idx already exists * in current allocated array */ for (k = 0; k < i; k++) { @@ -1854,7 +2032,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env, continue; tgt_dt = tgt->ltd_tgt; - rc = dt_statfs(env, tgt_dt, NULL); + rc = dt_statfs(env, tgt_dt, &info->lti_osfs); if (rc) { /* this OSP doesn't feel well */ rc = 0; @@ -1881,13 +2059,26 @@ static int lod_prep_md_striped_create(const struct lu_env *env, idx, i, PFID(&fid)); idx_array[i] = idx; /* Set the start index for next stripe allocation */ - if (i < stripe_count - 1) + if (!is_specific && i < stripe_count - 1) { + /* + * for large dir test, put all other slaves on one + * remote MDT, otherwise we may save too many local + * slave locks which will exceed RS_MAX_LOCKS. + */ + if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) + idx = master_index; idx_array[i + 1] = (idx + 1) % (lod->lod_remote_mdt_count + 1); + } /* tgt_dt and fid must be ready after search avaible OSP * in the above loop */ LASSERT(tgt_dt != NULL); LASSERT(fid_is_sane(&fid)); + + /* fail a remote stripe FID allocation */ + if (i && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_FID)) + continue; + conf.loc_flags = LOC_F_NEW; dto = dt_locate_at(env, tgt_dt, &fid, dt->do_lu.lo_dev->ld_site->ls_top_dev, @@ -1897,11 +2088,12 @@ static int lod_prep_md_striped_create(const struct lu_env *env, stripe[i] = dto; } - lo->ldo_dir_stripe_loaded = 1; lo->ldo_dir_striped = 1; lo->ldo_stripe = stripe; lo->ldo_dir_stripe_count = i; lo->ldo_dir_stripes_allocated = stripe_count; + smp_mb(); + lo->ldo_dir_stripe_loaded = 1; if (lo->ldo_dir_stripe_count == 0) GOTO(out_put, rc = -ENOSPC); @@ -1928,6 +2120,27 @@ out_free: } /** + * + * Alloc cached foreign LMV + * + * \param[in] lo object + * \param[in] size size of foreign LMV + * + * \retval 0 on success + * \retval negative if failed + */ +int lod_alloc_foreign_lmv(struct lod_object *lo, size_t size) +{ + OBD_ALLOC_LARGE(lo->ldo_foreign_lmv, size); + if (lo->ldo_foreign_lmv == NULL) + return -ENOMEM; + lo->ldo_foreign_lmv_size = size; + lo->ldo_dir_is_foreign = 1; + + return 0; +} + +/** * Declare create striped md object. * * The function declares intention to create a striped directory. This is a @@ -1954,31 +2167,33 @@ static int lod_declare_xattr_set_lmv(const struct lu_env *env, struct thandle *th) { struct lod_object *lo = lod_dt_obj(dt); - struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); - struct lmv_user_md_v1 *lum; + struct lmv_user_md_v1 *lum = lum_buf->lb_buf; int rc; ENTRY; - lum = lum_buf->lb_buf; LASSERT(lum != NULL); CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n", le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count), (int)le32_to_cpu(lum->lum_stripe_offset)); - if (le32_to_cpu(lum->lum_stripe_count) == 0) + if (lo->ldo_dir_stripe_count == 0) { + if (lo->ldo_dir_is_foreign) { + rc = lod_alloc_foreign_lmv(lo, lum_buf->lb_len); + if (rc != 0) + GOTO(out, rc); + memcpy(lo->ldo_foreign_lmv, lum, lum_buf->lb_len); + lo->ldo_dir_stripe_loaded = 1; + } GOTO(out, rc = 0); - - rc = lod_verify_md_striping(lod, lum); - if (rc != 0) - GOTO(out, rc); + } /* prepare dir striped objects */ rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th); if (rc != 0) { /* failed to create striping, let's reset * config so that others don't get confused */ - lod_object_free_striping(env, lo); + lod_striping_free(env, lo); GOTO(out, rc); } out: @@ -1986,97 +2201,406 @@ out: } /** - * Implementation of dt_object_operations::do_declare_xattr_set. + * Append source stripes after target stripes for migrating directory. NB, we + * only need to declare this, the append is done inside lod_xattr_set_lmv(). * - * Used with regular (non-striped) objects. Basically it - * initializes the striping information and applies the - * change to all the stripes. + * \param[in] env execution environment + * \param[in] dt target object + * \param[in] buf LMV buf which contains source stripe fids + * \param[in] th transaction handle * - * \see dt_object_operations::do_declare_xattr_set() in the API description - * for details. + * \retval 0 on success + * \retval negative if failed */ -static int lod_dir_declare_xattr_set(const struct lu_env *env, - struct dt_object *dt, - const struct lu_buf *buf, - const char *name, int fl, - struct thandle *th) +static int lod_dir_declare_layout_add(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + struct thandle *th) { - struct dt_object *next = dt_object_child(dt); - struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); - struct lod_object *lo = lod_dt_obj(dt); - int i; - int rc; + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + struct dt_object_format *dof = &info->lti_format; + struct lmv_mds_md_v1 *lmv = buf->lb_buf; + struct dt_object **stripe; + __u32 stripe_count = le32_to_cpu(lmv->lmv_stripe_count); + struct lu_fid *fid = &info->lti_fid; + struct lod_tgt_desc *tgt; + struct dt_object *dto; + struct dt_device *tgt_dt; + int type = LU_SEQ_RANGE_ANY; + struct dt_insert_rec *rec = &info->lti_dt_rec; + char *stripe_name = info->lti_key; + struct lu_name *sname; + struct linkea_data ldata = { NULL }; + struct lu_buf linkea_buf; + __u32 idx; + int i; + int rc; + ENTRY; - if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { - struct lmv_user_md_v1 *lum; + if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1) + RETURN(-EINVAL); - LASSERT(buf != NULL && buf->lb_buf != NULL); - lum = buf->lb_buf; - rc = lod_verify_md_striping(d, lum); - if (rc != 0) - RETURN(rc); - } else if (strcmp(name, XATTR_NAME_LOV) == 0) { - rc = lod_verify_striping(d, buf, false, 0); - if (rc != 0) - RETURN(rc); - } + if (stripe_count == 0) + RETURN(-EINVAL); - rc = lod_sub_declare_xattr_set(env, next, buf, name, fl, th); - if (rc != 0) - RETURN(rc); + dof->dof_type = DFT_DIR; - /* Note: Do not set LinkEA on sub-stripes, otherwise - * it will confuse the fid2path process(see mdt_path_current()). - * The linkEA between master and sub-stripes is set in - * lod_xattr_set_lmv(). */ - if (strcmp(name, XATTR_NAME_LINK) == 0) - RETURN(0); + OBD_ALLOC(stripe, + sizeof(*stripe) * (lo->ldo_dir_stripe_count + stripe_count)); + if (stripe == NULL) + RETURN(-ENOMEM); - /* set xattr to each stripes, if needed */ - rc = lod_load_striping(env, lo); - if (rc != 0) - RETURN(rc); + for (i = 0; i < lo->ldo_dir_stripe_count; i++) + stripe[i] = lo->ldo_stripe[i]; - if (lo->ldo_dir_stripe_count == 0) - RETURN(0); + for (i = 0; i < stripe_count; i++) { + fid_le_to_cpu(fid, + &lmv->lmv_stripe_fids[i]); + if (!fid_is_sane(fid)) + continue; - for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - LASSERT(lo->ldo_stripe[i]); + rc = lod_fld_lookup(env, lod, fid, &idx, &type); + if (rc) + GOTO(out, rc); - rc = lod_sub_declare_xattr_set(env, lo->ldo_stripe[i], - buf, name, fl, th); - if (rc != 0) - break; - } + if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) { + tgt_dt = lod->lod_child; + } else { + tgt = LTD_TGT(ltd, idx); + if (tgt == NULL) + GOTO(out, rc = -ESTALE); + tgt_dt = tgt->ltd_tgt; + } - RETURN(rc); -} + dto = dt_locate_at(env, tgt_dt, fid, + lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev, + NULL); + if (IS_ERR(dto)) + GOTO(out, rc = PTR_ERR(dto)); -static int -lod_obj_stripe_replace_parent_fid_cb(const struct lu_env *env, - struct lod_object *lo, - struct dt_object *dt, struct thandle *th, - int stripe_idx, - struct lod_obj_stripe_cb_data *data) -{ - struct lod_thread_info *info = lod_env_info(env); - struct filter_fid *ff = &info->lti_ff; - struct lu_buf *buf = &info->lti_buf; - int rc; + stripe[i + lo->ldo_dir_stripe_count] = dto; - buf->lb_buf = ff; + if (!dt_try_as_dir(env, dto)) + GOTO(out, rc = -ENOTDIR); + + rc = lod_sub_declare_ref_add(env, dto, th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dot, th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dotdot, th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_xattr_set(env, dto, buf, + XATTR_NAME_LMV, 0, th); + if (rc) + GOTO(out, rc); + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", + PFID(lu_object_fid(&dto->do_lu)), + i + lo->ldo_dir_stripe_count); + + sname = lod_name_get(env, stripe_name, strlen(stripe_name)); + rc = linkea_links_new(&ldata, &info->lti_linkea_buf, + sname, lu_object_fid(&dt->do_lu)); + if (rc) + GOTO(out, rc); + + linkea_buf.lb_buf = ldata.ld_buf->lb_buf; + linkea_buf.lb_len = ldata.ld_leh->leh_len; + rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_insert(env, next, + (const struct dt_rec *)rec, + (const struct dt_key *)stripe_name, + th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_ref_add(env, next, th); + if (rc) + GOTO(out, rc); + } + + if (lo->ldo_stripe) + OBD_FREE(lo->ldo_stripe, + sizeof(*stripe) * lo->ldo_dir_stripes_allocated); + lo->ldo_stripe = stripe; + lo->ldo_dir_migrate_offset = lo->ldo_dir_stripe_count; + lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_hash_type); + lo->ldo_dir_stripe_count += stripe_count; + lo->ldo_dir_stripes_allocated += stripe_count; + lo->ldo_dir_hash_type |= LMV_HASH_FLAG_MIGRATION; + + RETURN(0); +out: + i = lo->ldo_dir_stripe_count; + while (i < lo->ldo_dir_stripe_count + stripe_count && stripe[i]) + dt_object_put(env, stripe[i++]); + + OBD_FREE(stripe, + sizeof(*stripe) * (stripe_count + lo->ldo_dir_stripe_count)); + RETURN(rc); +} + +static int lod_dir_declare_layout_delete(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + struct lmv_user_md *lmu = buf->lb_buf; + __u32 final_stripe_count; + char *stripe_name = info->lti_key; + struct dt_object *dto; + int i; + int rc = 0; + + if (!lmu) + return -EINVAL; + + final_stripe_count = le32_to_cpu(lmu->lum_stripe_count); + if (final_stripe_count >= lo->ldo_dir_stripe_count) + return -EINVAL; + + for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) { + dto = lo->ldo_stripe[i]; + if (!dto) + continue; + + if (!dt_try_as_dir(env, dto)) + return -ENOTDIR; + + rc = lod_sub_declare_delete(env, dto, + (const struct dt_key *)dot, th); + if (rc) + return rc; + + rc = lod_sub_declare_ref_del(env, dto, th); + if (rc) + return rc; + + rc = lod_sub_declare_delete(env, dto, + (const struct dt_key *)dotdot, th); + if (rc) + return rc; + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&dto->do_lu)), i); + + rc = lod_sub_declare_delete(env, next, + (const struct dt_key *)stripe_name, th); + if (rc) + return rc; + + rc = lod_sub_declare_ref_del(env, next, th); + if (rc) + return rc; + } + + return 0; +} + +/* + * delete stripes from dir master object, the lum_stripe_count in argument is + * the final stripe count, the stripes after that will be deleted, NB, they + * are not destroyed, but deleted from it's parent namespace, this function + * will be called in two places: + * 1. mdd_migrate_create() delete stripes from source, and append them to + * target. + * 2. mdd_dir_layout_shrink() delete stripes from source, and destroy them. + */ +static int lod_dir_layout_delete(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + struct lmv_user_md *lmu = buf->lb_buf; + __u32 final_stripe_count; + char *stripe_name = info->lti_key; + struct dt_object *dto; + int i; + int rc = 0; + + ENTRY; + + if (!lmu) + RETURN(-EINVAL); + + final_stripe_count = le32_to_cpu(lmu->lum_stripe_count); + if (final_stripe_count >= lo->ldo_dir_stripe_count) + RETURN(-EINVAL); + + for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) { + dto = lo->ldo_stripe[i]; + if (!dto) + continue; + + rc = lod_sub_delete(env, dto, + (const struct dt_key *)dotdot, th); + if (rc) + break; + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&dto->do_lu)), i); + + rc = lod_sub_delete(env, next, + (const struct dt_key *)stripe_name, th); + if (rc) + break; + + rc = lod_sub_ref_del(env, next, th); + if (rc) + break; + } + + lod_striping_free(env, lod_dt_obj(dt)); + + RETURN(rc); +} + +/** + * Implementation of dt_object_operations::do_declare_xattr_set. + * + * Used with regular (non-striped) objects. Basically it + * initializes the striping information and applies the + * change to all the stripes. + * + * \see dt_object_operations::do_declare_xattr_set() in the API description + * for details. + */ +static int lod_dir_declare_xattr_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, + struct thandle *th) +{ + struct dt_object *next = dt_object_child(dt); + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_object *lo = lod_dt_obj(dt); + int i; + int rc; + ENTRY; + + if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { + struct lmv_user_md_v1 *lum; + + LASSERT(buf != NULL && buf->lb_buf != NULL); + lum = buf->lb_buf; + rc = lod_verify_md_striping(d, lum); + if (rc != 0) + RETURN(rc); + } else if (strcmp(name, XATTR_NAME_LOV) == 0) { + rc = lod_verify_striping(d, lo, buf, false); + if (rc != 0) + RETURN(rc); + } + + rc = lod_sub_declare_xattr_set(env, next, buf, name, fl, th); + if (rc != 0) + RETURN(rc); + + /* Note: Do not set LinkEA on sub-stripes, otherwise + * it will confuse the fid2path process(see mdt_path_current()). + * The linkEA between master and sub-stripes is set in + * lod_xattr_set_lmv(). */ + if (strcmp(name, XATTR_NAME_LINK) == 0) + RETURN(0); + + /* set xattr to each stripes, if needed */ + rc = lod_striping_load(env, lo); + if (rc != 0) + RETURN(rc); + + if (lo->ldo_dir_stripe_count == 0) + RETURN(0); + + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + if (!lo->ldo_stripe[i]) + continue; + + if (!dt_object_exists(lo->ldo_stripe[i])) + continue; + + rc = lod_sub_declare_xattr_set(env, lo->ldo_stripe[i], + buf, name, fl, th); + if (rc != 0) + break; + } + + RETURN(rc); +} + +static int +lod_obj_stripe_replace_parent_fid_cb(const struct lu_env *env, + struct lod_object *lo, + struct dt_object *dt, struct thandle *th, + int comp_idx, int stripe_idx, + struct lod_obj_stripe_cb_data *data) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_layout_component *comp = &lo->ldo_comp_entries[comp_idx]; + struct filter_fid *ff = &info->lti_ff; + struct lu_buf *buf = &info->lti_buf; + int rc; + + buf->lb_buf = ff; buf->lb_len = sizeof(*ff); rc = dt_xattr_get(env, dt, buf, XATTR_NAME_FID); - if (rc == -ENODATA) - return 0; - - if (rc < 0) + if (rc < 0) { + if (rc == -ENODATA) + return 0; return rc; + } + + /* + * locd_buf is set if it's called by dir migration, which doesn't check + * pfid and comp id. + */ + if (data->locd_buf) { + memset(ff, 0, sizeof(*ff)); + ff->ff_parent = *(struct lu_fid *)data->locd_buf->lb_buf; + } else { + filter_fid_le_to_cpu(ff, ff, sizeof(*ff)); + + if (lu_fid_eq(lod_object_fid(lo), &ff->ff_parent) && + ff->ff_layout.ol_comp_id == comp->llc_id) + return 0; + + memset(ff, 0, sizeof(*ff)); + ff->ff_parent = *lu_object_fid(&lo->ldo_obj.do_lu); + } - ff->ff_parent = *lu_object_fid(&lo->ldo_obj.do_lu); + /* rewrite filter_fid */ ff->ff_parent.f_ver = stripe_idx; - fid_cpu_to_le(&ff->ff_parent, &ff->ff_parent); + ff->ff_layout.ol_stripe_size = comp->llc_stripe_size; + ff->ff_layout.ol_stripe_count = comp->llc_stripe_count; + ff->ff_layout.ol_comp_id = comp->llc_id; + ff->ff_layout.ol_comp_start = comp->llc_extent.e_start; + ff->ff_layout.ol_comp_end = comp->llc_extent.e_end; + filter_fid_cpu_to_le(ff, ff, sizeof(*ff)); + if (data->locd_declare) rc = lod_sub_declare_xattr_set(env, dt, buf, XATTR_NAME_FID, LU_XATTR_REPLACE, th); @@ -2104,20 +2628,20 @@ lod_obj_stripe_replace_parent_fid_cb(const struct lu_env *env, */ static int lod_replace_parent_fid(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, struct thandle *th, bool declare) { struct lod_object *lo = lod_dt_obj(dt); struct lod_thread_info *info = lod_env_info(env); - struct lu_buf *buf = &info->lti_buf; struct filter_fid *ff; - struct lod_obj_stripe_cb_data data; + struct lod_obj_stripe_cb_data data = { { 0 } }; int rc; ENTRY; LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr)); /* set xattr to each stripes, if needed */ - rc = lod_load_striping(env, lo); + rc = lod_striping_load(env, lo); if (rc != 0) RETURN(rc); @@ -2130,13 +2654,10 @@ static int lod_replace_parent_fid(const struct lu_env *env, RETURN(rc); } - buf->lb_buf = info->lti_ea_store; - buf->lb_len = info->lti_ea_store_size; - data.locd_declare = declare; - rc = lod_obj_for_each_stripe(env, lo, th, - lod_obj_stripe_replace_parent_fid_cb, - &data); + data.locd_stripe_cb = lod_obj_stripe_replace_parent_fid_cb; + data.locd_buf = buf; + rc = lod_obj_for_each_stripe(env, lo, th, &data); RETURN(rc); } @@ -2154,7 +2675,8 @@ inline __u16 lod_comp_entry_stripe_count(struct lod_object *lo, else if ((__u16)-1 == entry->llc_stripe_count) return lod->lod_desc.ld_tgt_count; else - return lod_get_stripe_count(lod, lo, entry->llc_stripe_count); + return lod_get_stripe_count(lod, lo, + entry->llc_stripe_count, false); } static int lod_comp_md_size(struct lod_object *lo, bool is_dir) @@ -2162,7 +2684,7 @@ static int lod_comp_md_size(struct lod_object *lo, bool is_dir) int magic, size = 0, i; struct lod_layout_component *comp_entries; __u16 comp_cnt; - bool is_composite; + bool is_composite, is_foreign = false; if (is_dir) { comp_cnt = lo->ldo_def_striping->lds_def_comp_cnt; @@ -2173,8 +2695,11 @@ static int lod_comp_md_size(struct lod_object *lo, bool is_dir) comp_cnt = lo->ldo_comp_cnt; comp_entries = lo->ldo_comp_entries; is_composite = lo->ldo_is_composite; + is_foreign = lo->ldo_is_foreign; } + if (is_foreign) + return lo->ldo_foreign_lov_size; LASSERT(comp_cnt != 0 && comp_entries != NULL); if (is_composite) { @@ -2218,7 +2743,7 @@ static int lod_declare_layout_add(const struct lu_env *env, struct thandle *th) { struct lod_thread_info *info = lod_env_info(env); - struct lod_layout_component *comp_array, *lod_comp; + struct lod_layout_component *comp_array, *lod_comp, *old_array; struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); struct dt_object *next = dt_object_child(dt); struct lov_desc *desc = &d->lod_desc; @@ -2226,14 +2751,15 @@ static int lod_declare_layout_add(const struct lu_env *env, struct lov_user_md_v3 *v3; struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; __u32 magic; - __u64 prev_end; - int i, rc, array_cnt; + int i, rc, array_cnt, old_array_cnt; ENTRY; LASSERT(lo->ldo_is_composite); - prev_end = lo->ldo_comp_entries[lo->ldo_comp_cnt - 1].llc_extent.e_end; - rc = lod_verify_striping(d, buf, false, prev_end); + if (lo->ldo_flr_state != LCM_FL_NONE) + RETURN(-EBUSY); + + rc = lod_verify_striping(d, lo, buf, false); if (rc != 0) RETURN(rc); @@ -2266,16 +2792,11 @@ static int lod_declare_layout_add(const struct lu_env *env, lod_comp->llc_extent.e_start = ext->e_start; lod_comp->llc_extent.e_end = ext->e_end; lod_comp->llc_stripe_offset = v1->lmm_stripe_offset; + lod_comp->llc_flags = comp_v1->lcm_entries[i].lcme_flags; lod_comp->llc_stripe_count = v1->lmm_stripe_count; - if (!lod_comp->llc_stripe_count || - lod_comp->llc_stripe_count == (__u16)-1) - lod_comp->llc_stripe_count = - desc->ld_default_stripe_count; lod_comp->llc_stripe_size = v1->lmm_stripe_size; - if (!lod_comp->llc_stripe_size) - lod_comp->llc_stripe_size = - desc->ld_default_stripe_size; + lod_adjust_stripe_info(lod_comp, desc); if (v1->lmm_magic == LOV_USER_MAGIC_V3) { v3 = (struct lov_user_md_v3 *) v1; @@ -2288,17 +2809,28 @@ static int lod_declare_layout_add(const struct lu_env *env, } } - OBD_FREE(lo->ldo_comp_entries, sizeof(*lod_comp) * lo->ldo_comp_cnt); + old_array = lo->ldo_comp_entries; + old_array_cnt = lo->ldo_comp_cnt; + lo->ldo_comp_entries = comp_array; lo->ldo_comp_cnt = array_cnt; + /* No need to increase layout generation here, it will be increased * later when generating component ID for the new components */ info->lti_buf.lb_len = lod_comp_md_size(lo, false); rc = lod_sub_declare_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV, 0, th); - if (rc) + if (rc) { + lo->ldo_comp_entries = old_array; + lo->ldo_comp_cnt = old_array_cnt; GOTO(error, rc); + } + + OBD_FREE(old_array, sizeof(*lod_comp) * old_array_cnt); + + LASSERT(lo->ldo_mirror_count == 1); + lo->ldo_mirrors[0].lme_end = array_cnt - 1; RETURN(0); @@ -2340,7 +2872,7 @@ static int lod_declare_layout_set(const struct lu_env *env, struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); struct lod_object *lo = lod_dt_obj(dt); struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; - __u32 magic, id; + __u32 magic; int i, j, rc; bool changed = false; ENTRY; @@ -2367,15 +2899,43 @@ static int lod_declare_layout_set(const struct lu_env *env, } for (i = 0; i < comp_v1->lcm_entry_count; i++) { - id = comp_v1->lcm_entries[i].lcme_id; + __u32 id = comp_v1->lcm_entries[i].lcme_id; + __u32 flags = comp_v1->lcm_entries[i].lcme_flags; + __u32 mirror_flag = flags & LCME_MIRROR_FLAGS; + bool neg = flags & LCME_FL_NEG; + + if (flags & LCME_FL_INIT) { + if (changed) + lod_striping_free(env, lo); + RETURN(-EINVAL); + } + flags &= ~(LCME_MIRROR_FLAGS | LCME_FL_NEG); for (j = 0; j < lo->ldo_comp_cnt; j++) { lod_comp = &lo->ldo_comp_entries[j]; - if (id == lod_comp->llc_id || id == LCME_ID_ALL) { - lod_comp->llc_flags = - comp_v1->lcm_entries[i].lcme_flags; - changed = true; + + /* lfs only put one flag in each entry */ + if ((flags && id != lod_comp->llc_id) || + (mirror_flag && mirror_id_of(id) != + mirror_id_of(lod_comp->llc_id))) + continue; + + if (neg) { + if (flags) + lod_comp->llc_flags &= ~flags; + if (mirror_flag) + lod_comp->llc_flags &= ~mirror_flag; + } else { + if (flags) + lod_comp->llc_flags |= flags; + if (mirror_flag) { + lod_comp->llc_flags |= mirror_flag; + if (mirror_flag & LCME_FL_NOSYNC) + lod_comp->llc_timestamp = + ktime_get_real_seconds(); + } } + changed = true; } } @@ -2388,8 +2948,8 @@ static int lod_declare_layout_set(const struct lu_env *env, lod_obj_inc_layout_gen(lo); info->lti_buf.lb_len = lod_comp_md_size(lo, false); - rc = lod_sub_declare_xattr_set(env, dt, &info->lti_buf, - XATTR_NAME_LOV, 0, th); + rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), &info->lti_buf, + XATTR_NAME_LOV, LU_XATTR_REPLACE, th); RETURN(rc); } @@ -2422,9 +2982,8 @@ static int lod_declare_layout_del(const struct lu_env *env, LASSERT(lo->ldo_is_composite); - rc = lod_verify_striping(d, buf, false, 0); - if (rc != 0) - RETURN(rc); + if (lo->ldo_flr_state != LCM_FL_NONE) + RETURN(-EBUSY); magic = comp_v1->lcm_magic; if (magic == __swab32(LOV_USER_MAGIC_COMP_V1)) { @@ -2450,6 +3009,12 @@ static int lod_declare_layout_del(const struct lu_env *env, RETURN(-EINVAL); } + if (id == LCME_ID_INVAL && !flags) { + CDEBUG(D_LAYOUT, "%s: no id or flags specified.\n", + lod2obd(d)->obd_name); + RETURN(-EINVAL); + } + if (flags & LCME_FL_NEG) { neg_flags = flags & ~LCME_FL_NEG; flags = 0; @@ -2545,7 +3110,6 @@ static int lod_declare_modify_layout(const struct lu_env *env, { struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); struct lod_object *lo = lod_dt_obj(dt); - struct dt_object *next = dt_object_child(&lo->ldo_obj); char *op; int rc, len = strlen(XATTR_LUSTRE_LOV); ENTRY; @@ -2559,8 +3123,7 @@ static int lod_declare_modify_layout(const struct lu_env *env, } len++; - dt_write_lock(env, next, 0); - rc = lod_load_striping_locked(env, lo); + rc = lod_striping_load(env, lo); if (rc) GOTO(unlock, rc); @@ -2585,37 +3148,270 @@ static int lod_declare_modify_layout(const struct lu_env *env, } unlock: if (rc) - lod_object_free_striping(env, lo); - dt_write_unlock(env, next); + lod_striping_free(env, lo); RETURN(rc); } /** - * Implementation of dt_object_operations::do_declare_xattr_set. + * Convert a plain file lov_mds_md to a composite layout. * - * \see dt_object_operations::do_declare_xattr_set() in the API description - * for details. + * \param[in,out] info the thread info::lti_ea_store buffer contains little + * endian plain file layout * - * the extension to the API: - * - declaring LOVEA requests striping creation - * - LU_XATTR_REPLACE means layout swap + * \retval 0 on success, <0 on failure */ -static int lod_declare_xattr_set(const struct lu_env *env, - struct dt_object *dt, - const struct lu_buf *buf, - const char *name, int fl, - struct thandle *th) +static int lod_layout_convert(struct lod_thread_info *info) { - struct dt_object *next = dt_object_child(dt); - struct lu_attr *attr = &lod_env_info(env)->lti_attr; - __u32 mode; - int rc; + struct lov_mds_md *lmm = info->lti_ea_store; + struct lov_mds_md *lmm_save; + struct lov_comp_md_v1 *lcm; + struct lov_comp_md_entry_v1 *lcme; + size_t size; + __u32 blob_size; + int rc = 0; ENTRY; - mode = dt->do_lu.lo_header->loh_attr & S_IFMT; - if ((S_ISREG(mode) || mode == 0) && !(fl & LU_XATTR_REPLACE) && - (strcmp(name, XATTR_NAME_LOV) == 0 || + /* realloc buffer to a composite layout which contains one component */ + blob_size = lov_mds_md_size(le16_to_cpu(lmm->lmm_stripe_count), + le32_to_cpu(lmm->lmm_magic)); + size = sizeof(*lcm) + sizeof(*lcme) + blob_size; + + OBD_ALLOC_LARGE(lmm_save, blob_size); + if (!lmm_save) + GOTO(out, rc = -ENOMEM); + + memcpy(lmm_save, lmm, blob_size); + + if (info->lti_ea_store_size < size) { + rc = lod_ea_store_resize(info, size); + if (rc) + GOTO(out, rc); + } + + lcm = info->lti_ea_store; + lcm->lcm_magic = cpu_to_le32(LOV_MAGIC_COMP_V1); + lcm->lcm_size = cpu_to_le32(size); + lcm->lcm_layout_gen = cpu_to_le32(le16_to_cpu( + lmm_save->lmm_layout_gen)); + lcm->lcm_flags = cpu_to_le16(LCM_FL_NONE); + lcm->lcm_entry_count = cpu_to_le16(1); + lcm->lcm_mirror_count = 0; + + lcme = &lcm->lcm_entries[0]; + lcme->lcme_flags = cpu_to_le32(LCME_FL_INIT); + lcme->lcme_extent.e_start = 0; + lcme->lcme_extent.e_end = cpu_to_le64(OBD_OBJECT_EOF); + lcme->lcme_offset = cpu_to_le32(sizeof(*lcm) + sizeof(*lcme)); + lcme->lcme_size = cpu_to_le32(blob_size); + + memcpy((char *)lcm + lcme->lcme_offset, (char *)lmm_save, blob_size); + + EXIT; +out: + if (lmm_save) + OBD_FREE_LARGE(lmm_save, blob_size); + return rc; +} + +/** + * Merge layouts to form a mirrored file. + */ +static int lod_declare_layout_merge(const struct lu_env *env, + struct dt_object *dt, const struct lu_buf *mbuf, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lu_buf *buf = &info->lti_buf; + struct lod_object *lo = lod_dt_obj(dt); + struct lov_comp_md_v1 *lcm; + struct lov_comp_md_v1 *cur_lcm; + struct lov_comp_md_v1 *merge_lcm; + struct lov_comp_md_entry_v1 *lcme; + size_t size = 0; + size_t offset; + __u16 cur_entry_count; + __u16 merge_entry_count; + __u32 id = 0; + __u16 mirror_id = 0; + __u32 mirror_count; + int rc, i; + ENTRY; + + merge_lcm = mbuf->lb_buf; + if (mbuf->lb_len < sizeof(*merge_lcm)) + RETURN(-EINVAL); + + /* must be an existing layout from disk */ + if (le32_to_cpu(merge_lcm->lcm_magic) != LOV_MAGIC_COMP_V1) + RETURN(-EINVAL); + + merge_entry_count = le16_to_cpu(merge_lcm->lcm_entry_count); + + /* do not allow to merge two mirrored files */ + if (le16_to_cpu(merge_lcm->lcm_mirror_count)) + RETURN(-EBUSY); + + /* verify the target buffer */ + rc = lod_get_lov_ea(env, lo); + if (rc <= 0) + RETURN(rc ? : -ENODATA); + + cur_lcm = info->lti_ea_store; + switch (le32_to_cpu(cur_lcm->lcm_magic)) { + case LOV_MAGIC_V1: + case LOV_MAGIC_V3: + rc = lod_layout_convert(info); + break; + case LOV_MAGIC_COMP_V1: + rc = 0; + break; + default: + rc = -EINVAL; + } + if (rc) + RETURN(rc); + + /* info->lti_ea_store could be reallocated in lod_layout_convert() */ + cur_lcm = info->lti_ea_store; + cur_entry_count = le16_to_cpu(cur_lcm->lcm_entry_count); + + /* 'lcm_mirror_count + 1' is the current # of mirrors the file has */ + mirror_count = le16_to_cpu(cur_lcm->lcm_mirror_count) + 1; + if (mirror_count + 1 > LUSTRE_MIRROR_COUNT_MAX) + RETURN(-ERANGE); + + /* size of new layout */ + size = le32_to_cpu(cur_lcm->lcm_size) + + le32_to_cpu(merge_lcm->lcm_size) - sizeof(*cur_lcm); + + memset(buf, 0, sizeof(*buf)); + lu_buf_alloc(buf, size); + if (buf->lb_buf == NULL) + RETURN(-ENOMEM); + + lcm = buf->lb_buf; + memcpy(lcm, cur_lcm, sizeof(*lcm) + cur_entry_count * sizeof(*lcme)); + + offset = sizeof(*lcm) + + sizeof(*lcme) * (cur_entry_count + merge_entry_count); + for (i = 0; i < cur_entry_count; i++) { + struct lov_comp_md_entry_v1 *cur_lcme; + + lcme = &lcm->lcm_entries[i]; + cur_lcme = &cur_lcm->lcm_entries[i]; + + lcme->lcme_offset = cpu_to_le32(offset); + memcpy((char *)lcm + offset, + (char *)cur_lcm + le32_to_cpu(cur_lcme->lcme_offset), + le32_to_cpu(lcme->lcme_size)); + + offset += le32_to_cpu(lcme->lcme_size); + + if (mirror_count == 1 && + mirror_id_of(le32_to_cpu(lcme->lcme_id)) == 0) { + /* Add mirror from a non-flr file, create new mirror ID. + * Otherwise, keep existing mirror's component ID, used + * for mirror extension. + */ + id = pflr_id(1, i + 1); + lcme->lcme_id = cpu_to_le32(id); + } + + id = MAX(le32_to_cpu(lcme->lcme_id), id); + } + + mirror_id = mirror_id_of(id) + 1; + for (i = 0; i < merge_entry_count; i++) { + struct lov_comp_md_entry_v1 *merge_lcme; + + merge_lcme = &merge_lcm->lcm_entries[i]; + lcme = &lcm->lcm_entries[cur_entry_count + i]; + + *lcme = *merge_lcme; + lcme->lcme_offset = cpu_to_le32(offset); + + id = pflr_id(mirror_id, i + 1); + lcme->lcme_id = cpu_to_le32(id); + + memcpy((char *)lcm + offset, + (char *)merge_lcm + le32_to_cpu(merge_lcme->lcme_offset), + le32_to_cpu(lcme->lcme_size)); + + offset += le32_to_cpu(lcme->lcme_size); + } + + /* fixup layout information */ + lod_obj_inc_layout_gen(lo); + lcm->lcm_layout_gen = cpu_to_le32(lo->ldo_layout_gen); + lcm->lcm_size = cpu_to_le32(size); + lcm->lcm_entry_count = cpu_to_le16(cur_entry_count + merge_entry_count); + lcm->lcm_mirror_count = cpu_to_le16(mirror_count); + if ((le16_to_cpu(lcm->lcm_flags) & LCM_FL_FLR_MASK) == LCM_FL_NONE) + lcm->lcm_flags = cpu_to_le32(LCM_FL_RDONLY); + + rc = lod_striping_reload(env, lo, buf); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), buf, + XATTR_NAME_LOV, LU_XATTR_REPLACE, th); + +out: + lu_buf_free(buf); + RETURN(rc); +} + +/** + * Split layouts, just set the LOVEA with the layout from mbuf. + */ +static int lod_declare_layout_split(const struct lu_env *env, + struct dt_object *dt, const struct lu_buf *mbuf, + struct thandle *th) +{ + struct lod_object *lo = lod_dt_obj(dt); + struct lov_comp_md_v1 *lcm = mbuf->lb_buf; + int rc; + ENTRY; + + lod_obj_inc_layout_gen(lo); + lcm->lcm_layout_gen = cpu_to_le32(lo->ldo_layout_gen); + + rc = lod_striping_reload(env, lo, mbuf); + if (rc) + RETURN(rc); + + rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), mbuf, + XATTR_NAME_LOV, LU_XATTR_REPLACE, th); + RETURN(rc); +} + +/** + * Implementation of dt_object_operations::do_declare_xattr_set. + * + * \see dt_object_operations::do_declare_xattr_set() in the API description + * for details. + * + * the extension to the API: + * - declaring LOVEA requests striping creation + * - LU_XATTR_REPLACE means layout swap + */ +static int lod_declare_xattr_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, + struct thandle *th) +{ + struct dt_object *next = dt_object_child(dt); + struct lu_attr *attr = &lod_env_info(env)->lti_attr; + __u32 mode; + int rc; + ENTRY; + + mode = dt->do_lu.lo_header->loh_attr & S_IFMT; + if ((S_ISREG(mode) || mode == 0) && + !(fl & (LU_XATTR_REPLACE | LU_XATTR_MERGE | LU_XATTR_SPLIT)) && + (strcmp(name, XATTR_NAME_LOV) == 0 || strcmp(name, XATTR_LUSTRE_LOV) == 0)) { /* * this is a request to create object's striping. @@ -2636,6 +3432,14 @@ static int lod_declare_xattr_set(const struct lu_env *env, attr->la_mode = S_IFREG; } rc = lod_declare_striped_create(env, dt, attr, buf, th); + } else if (fl & LU_XATTR_MERGE) { + LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 || + strcmp(name, XATTR_LUSTRE_LOV) == 0); + rc = lod_declare_layout_merge(env, dt, buf, th); + } else if (fl & LU_XATTR_SPLIT) { + LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 || + strcmp(name, XATTR_LUSTRE_LOV) == 0); + rc = lod_declare_layout_split(env, dt, buf, th); } else if (S_ISREG(mode) && strlen(name) > strlen(XATTR_LUSTRE_LOV) + 1 && strncmp(name, XATTR_LUSTRE_LOV, @@ -2648,10 +3452,24 @@ static int lod_declare_xattr_set(const struct lu_env *env, RETURN(-ENOENT); rc = lod_declare_modify_layout(env, dt, name, buf, th); + } else if (strncmp(name, XATTR_NAME_LMV, strlen(XATTR_NAME_LMV)) == 0 && + strlen(name) > strlen(XATTR_NAME_LMV) + 1) { + const char *op = name + strlen(XATTR_NAME_LMV) + 1; + + rc = -ENOTSUPP; + if (strcmp(op, "add") == 0) + rc = lod_dir_declare_layout_add(env, dt, buf, th); + else if (strcmp(op, "del") == 0) + rc = lod_dir_declare_layout_delete(env, dt, buf, th); + else if (strcmp(op, "set") == 0) + rc = lod_sub_declare_xattr_set(env, next, buf, + XATTR_NAME_LMV, fl, th); + + RETURN(rc); } else if (S_ISDIR(mode)) { rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th); } else if (strcmp(name, XATTR_NAME_FID) == 0) { - rc = lod_replace_parent_fid(env, dt, th, true); + rc = lod_replace_parent_fid(env, dt, buf, th, true); } else { rc = lod_sub_declare_xattr_set(env, next, buf, name, fl, th); } @@ -2698,7 +3516,11 @@ static int lod_xattr_set_internal(const struct lu_env *env, RETURN(0); for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - LASSERT(lo->ldo_stripe[i]); + if (!lo->ldo_stripe[i]) + continue; + + if (!dt_object_exists(lo->ldo_stripe[i])) + continue; rc = lod_sub_xattr_set(env, lo->ldo_stripe[i], buf, name, fl, th); @@ -2785,6 +3607,7 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, lum = buf->lb_buf; switch (lum->lmm_magic) { + case LOV_USER_MAGIC_SPECIFIC: case LOV_USER_MAGIC_V3: v3 = buf->lb_buf; if (v3->lmm_pool_name[0] != '\0') @@ -2862,7 +3685,8 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)), le32_to_cpu(lum->lum_stripe_offset)) && - le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) { + le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC && + le32_to_cpu(lum->lum_hash_type) == LMV_HASH_TYPE_UNKNOWN) { rc = lod_xattr_del_internal(env, dt, name, th); if (rc == -ENODATA) rc = 0; @@ -2916,8 +3740,15 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, /* The stripes are supposed to be allocated in declare phase, * if there are no stripes being allocated, it will skip */ - if (lo->ldo_dir_stripe_count == 0) + if (lo->ldo_dir_stripe_count == 0) { + if (lo->ldo_dir_is_foreign) { + rc = lod_sub_xattr_set(env, dt_object_child(dt), buf, + XATTR_NAME_LMV, fl, th); + if (rc != 0) + RETURN(rc); + } RETURN(0); + } rc = dt_attr_get(env, dt_object_child(dt), attr); if (rc != 0) @@ -2942,35 +3773,46 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, rec->rec_type = S_IFDIR; for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - struct dt_object *dto; - char *stripe_name = info->lti_key; - struct lu_name *sname; - struct linkea_data ldata = { NULL }; - struct lu_buf linkea_buf; + struct dt_object *dto = lo->ldo_stripe[i]; + char *stripe_name = info->lti_key; + struct lu_name *sname; + struct linkea_data ldata = { NULL }; + struct lu_buf linkea_buf; + + /* OBD_FAIL_MDS_STRIPE_FID may leave stripe uninitialized */ + if (!dto) + continue; - dto = lo->ldo_stripe[i]; + /* fail a remote stripe creation */ + if (i && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_CREATE)) + continue; - dt_write_lock(env, dto, MOR_TGT_CHILD); - rc = lod_sub_create(env, dto, attr, NULL, dof, th); - if (rc != 0) { - dt_write_unlock(env, dto); - GOTO(out, rc); - } + /* if it's source stripe of migrating directory, don't create */ + if (!((lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) && + i >= lo->ldo_dir_migrate_offset)) { + dt_write_lock(env, dto, MOR_TGT_CHILD); + rc = lod_sub_create(env, dto, attr, NULL, dof, th); + if (rc != 0) { + dt_write_unlock(env, dto); + GOTO(out, rc); + } - rc = lod_sub_ref_add(env, dto, th); - dt_write_unlock(env, dto); - if (rc != 0) - GOTO(out, rc); + rc = lod_sub_ref_add(env, dto, th); + dt_write_unlock(env, dto); + if (rc != 0) + GOTO(out, rc); - rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = lod_sub_insert(env, dto, (const struct dt_rec *)rec, - (const struct dt_key *)dot, th, 0); - if (rc != 0) - GOTO(out, rc); + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dot, th); + if (rc != 0) + GOTO(out, rc); + } rec->rec_fid = lu_object_fid(&dt->do_lu); rc = lod_sub_insert(env, dto, (struct dt_rec *)rec, - (const struct dt_key *)dotdot, th, 0); + (const struct dt_key *)dotdot, th); if (rc != 0) GOTO(out, rc); @@ -3014,7 +3856,7 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, rec->rec_fid = lu_object_fid(&dto->do_lu); rc = lod_sub_insert(env, dt_object_child(dt), (const struct dt_rec *)rec, - (const struct dt_key *)stripe_name, th, 0); + (const struct dt_key *)stripe_name, th); if (rc != 0) GOTO(out, rc); @@ -3048,6 +3890,7 @@ out: * \param[in] env execution environment * \param[in] dt object * \param[in] attr attributes the stripes will be created with + * \param[in] lmu lmv_user_md if MDT indices are specified * \param[in] dof format of stripes (see OSD API description) * \param[in] th transaction handle * \param[in] declare where to call "declare" or "execute" methods @@ -3058,6 +3901,7 @@ out: static int lod_dir_striping_create_internal(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, + const struct lu_buf *lmu, struct dt_object_format *dof, struct thandle *th, bool declare) @@ -3074,39 +3918,64 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, if (!LMVEA_DELETE_VALUES(lo->ldo_dir_stripe_count, lo->ldo_dir_stripe_offset)) { - struct lmv_user_md_v1 *v1 = info->lti_ea_store; - int stripe_count = lo->ldo_dir_stripe_count; + if (!lmu) { + struct lmv_user_md_v1 *v1 = info->lti_ea_store; + int stripe_count = lo->ldo_dir_stripe_count; - if (info->lti_ea_store_size < sizeof(*v1)) { - rc = lod_ea_store_resize(info, sizeof(*v1)); - if (rc != 0) - RETURN(rc); - v1 = info->lti_ea_store; - } + if (info->lti_ea_store_size < sizeof(*v1)) { + rc = lod_ea_store_resize(info, sizeof(*v1)); + if (rc != 0) + RETURN(rc); + v1 = info->lti_ea_store; + } - memset(v1, 0, sizeof(*v1)); - v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC); - v1->lum_stripe_count = cpu_to_le32(stripe_count); - v1->lum_stripe_offset = - cpu_to_le32(lo->ldo_dir_stripe_offset); + memset(v1, 0, sizeof(*v1)); + v1->lum_magic = cpu_to_le32(LMV_USER_MAGIC); + v1->lum_stripe_count = cpu_to_le32(stripe_count); + v1->lum_stripe_offset = + cpu_to_le32(lo->ldo_dir_stripe_offset); - info->lti_buf.lb_buf = v1; - info->lti_buf.lb_len = sizeof(*v1); + info->lti_buf.lb_buf = v1; + info->lti_buf.lb_len = sizeof(*v1); + lmu = &info->lti_buf; + } if (declare) - rc = lod_declare_xattr_set_lmv(env, dt, attr, - &info->lti_buf, dof, th); + rc = lod_declare_xattr_set_lmv(env, dt, attr, lmu, dof, + th); else - rc = lod_xattr_set_lmv(env, dt, &info->lti_buf, - XATTR_NAME_LMV, 0, th); + rc = lod_xattr_set_lmv(env, dt, lmu, XATTR_NAME_LMV, 0, + th); if (rc != 0) RETURN(rc); + } else { + /* foreign LMV EA case */ + if (lmu) { + struct lmv_foreign_md *lfm = lmu->lb_buf; + + if (lfm->lfm_magic == LMV_MAGIC_FOREIGN) { + rc = lod_declare_xattr_set_lmv(env, dt, attr, + lmu, dof, th); + } + } else { + if (lo->ldo_dir_is_foreign) { + LASSERT(lo->ldo_foreign_lmv != NULL && + lo->ldo_foreign_lmv_size > 0); + info->lti_buf.lb_buf = lo->ldo_foreign_lmv; + info->lti_buf.lb_len = lo->ldo_foreign_lmv_size; + lmu = &info->lti_buf; + rc = lod_xattr_set_lmv(env, dt, lmu, + XATTR_NAME_LMV, 0, th); + } + } } /* Transfer default LMV striping from the parent */ if (lds != NULL && lds->lds_dir_def_striping_set && - !LMVEA_DELETE_VALUES(lds->lds_dir_def_stripe_count, - lds->lds_dir_def_stripe_offset)) { + !(LMVEA_DELETE_VALUES(lds->lds_dir_def_stripe_count, + lds->lds_dir_def_stripe_offset) && + le32_to_cpu(lds->lds_dir_def_hash_type) != + LMV_HASH_TYPE_UNKNOWN)) { struct lmv_user_md_v1 *v1 = info->lti_ea_store; if (info->lti_ea_store_size < sizeof(*v1)) { @@ -3176,10 +4045,12 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, static int lod_declare_dir_striping_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, + struct lu_buf *lmu, struct dt_object_format *dof, struct thandle *th) { - return lod_dir_striping_create_internal(env, dt, attr, dof, th, true); + return lod_dir_striping_create_internal(env, dt, attr, lmu, dof, th, + true); } static int lod_dir_striping_create(const struct lu_env *env, @@ -3188,7 +4059,8 @@ static int lod_dir_striping_create(const struct lu_env *env, struct dt_object_format *dof, struct thandle *th) { - return lod_dir_striping_create_internal(env, dt, attr, dof, th, false); + return lod_dir_striping_create_internal(env, dt, attr, NULL, dof, th, + false); } /** @@ -3219,8 +4091,8 @@ static int lod_generate_and_set_lovea(const struct lu_env *env, LASSERT(lo); - if (lo->ldo_comp_cnt == 0) { - lod_object_free_striping(env, lo); + if (lo->ldo_comp_cnt == 0 && !lo->ldo_is_foreign) { + lod_striping_free(env, lo); rc = lod_sub_xattr_del(env, next, XATTR_NAME_LOV, th); RETURN(rc); } @@ -3294,6 +4166,9 @@ static int lod_layout_del(const struct lu_env *env, struct dt_object *dt, OBD_FREE(lod_comp->llc_stripe, sizeof(struct dt_object *) * lod_comp->llc_stripes_allocated); lod_comp->llc_stripe = NULL; + OBD_FREE(lod_comp->llc_ost_indices, + sizeof(__u32) * lod_comp->llc_stripes_allocated); + lod_comp->llc_ost_indices = NULL; lod_comp->llc_stripes_allocated = 0; lod_obj_set_pool(lo, i, NULL); if (lod_comp->llc_ostlist.op_array) { @@ -3319,6 +4194,9 @@ static int lod_layout_del(const struct lu_env *env, struct dt_object *dt, sizeof(*comp_array) * lo->ldo_comp_cnt); lo->ldo_comp_entries = comp_array; lo->ldo_comp_cnt = left; + + LASSERT(lo->ldo_mirror_count == 1); + lo->ldo_mirrors[0].lme_end = left - 1; lod_obj_inc_layout_gen(lo); } else { lod_free_comp_entries(lo); @@ -3341,10 +4219,14 @@ static int lod_layout_del(const struct lu_env *env, struct dt_object *dt, EXIT; out: if (rc) - lod_object_free_striping(env, lo); + lod_striping_free(env, lo); return rc; } + +static int lod_get_default_lov_striping(const struct lu_env *env, + struct lod_object *lo, + struct lod_default_striping *lds); /** * Implementation of dt_object_operations::do_xattr_set. * @@ -3373,21 +4255,79 @@ static int lod_xattr_set(const struct lu_env *env, if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && strcmp(name, XATTR_NAME_LMV) == 0) { - struct lmv_mds_md_v1 *lmm = buf->lb_buf; + rc = lod_dir_striping_create(env, dt, NULL, NULL, th); + RETURN(rc); + } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && + strncmp(name, XATTR_NAME_LMV, strlen(XATTR_NAME_LMV)) == 0 && + strlen(name) > strlen(XATTR_NAME_LMV) + 1) { + const char *op = name + strlen(XATTR_NAME_LMV) + 1; - if (lmm != NULL && le32_to_cpu(lmm->lmv_hash_type) & - LMV_HASH_FLAG_MIGRATION) - rc = lod_sub_xattr_set(env, next, buf, name, fl, th); - else - rc = lod_dir_striping_create(env, dt, NULL, NULL, th); + rc = -ENOTSUPP; + /* + * XATTR_NAME_LMV".add" is never called, but only declared, + * because lod_xattr_set_lmv() will do the addition. + */ + if (strcmp(op, "del") == 0) + rc = lod_dir_layout_delete(env, dt, buf, th); + else if (strcmp(op, "set") == 0) + rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LMV, + fl, th); RETURN(rc); - } - - if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && + } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && strcmp(name, XATTR_NAME_LOV) == 0) { - /* default LOVEA */ - rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th); + struct lod_default_striping *lds = lod_lds_buf_get(env); + struct lov_user_md_v1 *v1 = buf->lb_buf; + char pool[LOV_MAXPOOLNAME + 1]; + bool is_del; + + /* get existing striping config */ + rc = lod_get_default_lov_striping(env, lod_dt_obj(dt), lds); + if (rc) + RETURN(rc); + + memset(pool, 0, sizeof(pool)); + if (lds->lds_def_striping_set == 1) + lod_layout_get_pool(lds->lds_def_comp_entries, + lds->lds_def_comp_cnt, pool, + sizeof(pool)); + + is_del = LOVEA_DELETE_VALUES(v1->lmm_stripe_size, + v1->lmm_stripe_count, + v1->lmm_stripe_offset, + NULL); + + /* Retain the pool name if it is not given */ + if (v1->lmm_magic == LOV_USER_MAGIC_V1 && pool[0] != '\0' && + !is_del) { + struct lod_thread_info *info = lod_env_info(env); + struct lov_user_md_v3 *v3 = info->lti_ea_store; + + memset(v3, 0, sizeof(*v3)); + v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); + v3->lmm_pattern = cpu_to_le32(v1->lmm_pattern); + v3->lmm_stripe_count = + cpu_to_le32(v1->lmm_stripe_count); + v3->lmm_stripe_offset = + cpu_to_le32(v1->lmm_stripe_offset); + v3->lmm_stripe_size = cpu_to_le32(v1->lmm_stripe_size); + + strlcpy(v3->lmm_pool_name, pool, + sizeof(v3->lmm_pool_name)); + + info->lti_buf.lb_buf = v3; + info->lti_buf.lb_len = sizeof(*v3); + rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf, + name, fl, th); + } else { + rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, + fl, th); + } + + if (lds->lds_def_striping_set == 1 && + lds->lds_def_comp_entries != NULL) + lod_free_def_comp_entries(lds); + RETURN(rc); } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { @@ -3405,7 +4345,7 @@ static int lod_xattr_set(const struct lu_env *env, * defines striping, then create() does the work */ if (fl & LU_XATTR_REPLACE) { /* free stripes, then update disk */ - lod_object_free_striping(env, lod_dt_obj(dt)); + lod_striping_free(env, lod_dt_obj(dt)); rc = lod_sub_xattr_set(env, next, buf, name, fl, th); } else if (dt_object_remote(dt)) { @@ -3439,7 +4379,7 @@ static int lod_xattr_set(const struct lu_env *env, } RETURN(rc); } else if (strcmp(name, XATTR_NAME_FID) == 0) { - rc = lod_replace_parent_fid(env, dt, th, false); + rc = lod_replace_parent_fid(env, dt, buf, th, false); RETURN(rc); } @@ -3460,12 +4400,13 @@ static int lod_declare_xattr_del(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th) { - struct lod_object *lo = lod_dt_obj(dt); - int rc; - int i; + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + int i; + int rc; ENTRY; - rc = lod_sub_declare_xattr_del(env, dt_object_child(dt), name, th); + rc = lod_sub_declare_xattr_del(env, next, name, th); if (rc != 0) RETURN(rc); @@ -3473,7 +4414,7 @@ static int lod_declare_xattr_del(const struct lu_env *env, RETURN(0); /* set xattr to each stripes, if needed */ - rc = lod_load_striping(env, lo); + rc = lod_striping_load(env, lo); if (rc != 0) RETURN(rc); @@ -3481,9 +4422,12 @@ static int lod_declare_xattr_del(const struct lu_env *env, RETURN(0); for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - LASSERT(lo->ldo_stripe[i]); - rc = lod_sub_declare_xattr_del(env, lo->ldo_stripe[i], - name, th); + struct dt_object *dto = lo->ldo_stripe[i]; + + if (!dto) + continue; + + rc = lod_sub_declare_xattr_del(env, dto, name, th); if (rc != 0) break; } @@ -3508,8 +4452,8 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, int i; ENTRY; - if (!strcmp(name, XATTR_NAME_LOV)) - lod_object_free_striping(env, lod_dt_obj(dt)); + if (!strcmp(name, XATTR_NAME_LOV) || !strcmp(name, XATTR_NAME_LMV)) + lod_striping_free(env, lod_dt_obj(dt)); rc = lod_sub_xattr_del(env, next, name, th); if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) @@ -3519,9 +4463,12 @@ static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, RETURN(0); for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - LASSERT(lo->ldo_stripe[i]); + struct dt_object *dto = lo->ldo_stripe[i]; - rc = lod_sub_xattr_del(env, lo->ldo_stripe[i], name, th); + if (!dto) + continue; + + rc = lod_sub_xattr_del(env, dto, name, th); if (rc != 0) break; } @@ -3546,6 +4493,52 @@ static inline int lod_object_will_be_striped(int is_reg, const struct lu_fid *fi return (is_reg && fid_seq(fid) != FID_SEQ_LOCAL_FILE); } +/** + * Copy OST list from layout provided by user. + * + * \param[in] lod_comp layout_component to be filled + * \param[in] v3 LOV EA V3 user data + * + * \retval 0 on success + * \retval negative if failed + */ +int lod_comp_copy_ost_lists(struct lod_layout_component *lod_comp, + struct lov_user_md_v3 *v3) +{ + int j; + + ENTRY; + + if (v3->lmm_stripe_offset == LOV_OFFSET_DEFAULT) + v3->lmm_stripe_offset = v3->lmm_objects[0].l_ost_idx; + + if (lod_comp->llc_ostlist.op_array) { + if (lod_comp->llc_ostlist.op_size >= + v3->lmm_stripe_count * sizeof(__u32)) { + lod_comp->llc_ostlist.op_count = + v3->lmm_stripe_count; + goto skip; + } + OBD_FREE(lod_comp->llc_ostlist.op_array, + lod_comp->llc_ostlist.op_size); + } + + /* copy ost list from lmm */ + lod_comp->llc_ostlist.op_count = v3->lmm_stripe_count; + lod_comp->llc_ostlist.op_size = v3->lmm_stripe_count * sizeof(__u32); + OBD_ALLOC(lod_comp->llc_ostlist.op_array, + lod_comp->llc_ostlist.op_size); + if (!lod_comp->llc_ostlist.op_array) + RETURN(-ENOMEM); +skip: + for (j = 0; j < v3->lmm_stripe_count; j++) { + lod_comp->llc_ostlist.op_array[j] = + v3->lmm_objects[j].l_ost_idx; + } + + RETURN(0); +} + /** * Get default striping. @@ -3566,8 +4559,9 @@ static int lod_get_default_lov_striping(const struct lu_env *env, struct lov_user_md_v3 *v3 = NULL; struct lov_comp_md_v1 *comp_v1 = NULL; __u16 comp_cnt; + __u16 mirror_cnt; bool composite; - int rc, i; + int rc, i, j; ENTRY; lds->lds_def_striping_set = 0; @@ -3585,13 +4579,19 @@ static int lod_get_default_lov_striping(const struct lu_env *env, } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) { v3 = (struct lov_user_md_v3 *)v1; lustre_swab_lov_user_md_v3(v3); + } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_SPECIFIC)) { + v3 = (struct lov_user_md_v3 *)v1; + lustre_swab_lov_user_md_v3(v3); + lustre_swab_lov_user_md_objects(v3->lmm_objects, + v3->lmm_stripe_count); } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_COMP_V1)) { comp_v1 = (struct lov_comp_md_v1 *)v1; lustre_swab_lov_comp_md_v1(comp_v1); } if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1 && - v1->lmm_magic != LOV_MAGIC_COMP_V1) + v1->lmm_magic != LOV_MAGIC_COMP_V1 && + v1->lmm_magic != LOV_USER_MAGIC_SPECIFIC) RETURN(-ENOTSUPP); if (v1->lmm_magic == LOV_MAGIC_COMP_V1) { @@ -3599,9 +4599,11 @@ static int lod_get_default_lov_striping(const struct lu_env *env, comp_cnt = comp_v1->lcm_entry_count; if (comp_cnt == 0) RETURN(-EINVAL); + mirror_cnt = comp_v1->lcm_mirror_count + 1; composite = true; } else { comp_cnt = 1; + mirror_cnt = 0; composite = false; } @@ -3611,11 +4613,11 @@ static int lod_get_default_lov_striping(const struct lu_env *env, RETURN(rc); lds->lds_def_comp_cnt = comp_cnt; - lds->lds_def_striping_is_composite = composite ? 1 : 0; + lds->lds_def_striping_is_composite = composite; + lds->lds_def_mirror_cnt = mirror_cnt; for (i = 0; i < comp_cnt; i++) { struct lod_layout_component *lod_comp; - struct lu_extent *ext; char *pool; lod_comp = &lds->lds_def_comp_entries[i]; @@ -3629,12 +4631,16 @@ static int lod_get_default_lov_striping(const struct lu_env *env, if (composite) { v1 = (struct lov_user_md *)((char *)comp_v1 + comp_v1->lcm_entries[i].lcme_offset); - ext = &comp_v1->lcm_entries[i].lcme_extent; - lod_comp->llc_extent = *ext; + lod_comp->llc_extent = + comp_v1->lcm_entries[i].lcme_extent; + /* We only inherit certain flags from the layout */ + lod_comp->llc_flags = + comp_v1->lcm_entries[i].lcme_flags & + LCME_TEMPLATE_FLAGS; } - if (v1->lmm_pattern != LOV_PATTERN_RAID0 && - v1->lmm_pattern != 0) { + if (!lov_pattern_supported(v1->lmm_pattern) && + !(v1->lmm_pattern & LOV_PATTERN_F_RELEASED)) { lod_free_def_comp_entries(lds); RETURN(-EINVAL); } @@ -3648,6 +4654,7 @@ static int lod_get_default_lov_striping(const struct lu_env *env, lod_comp->llc_stripe_count = v1->lmm_stripe_count; lod_comp->llc_stripe_size = v1->lmm_stripe_size; lod_comp->llc_stripe_offset = v1->lmm_stripe_offset; + lod_comp->llc_pattern = v1->lmm_pattern; pool = NULL; if (v1->lmm_magic == LOV_USER_MAGIC_V3) { @@ -3657,6 +4664,17 @@ static int lod_get_default_lov_striping(const struct lu_env *env, pool = v3->lmm_pool_name; } lod_set_def_pool(lds, i, pool); + if (v1->lmm_magic == LOV_USER_MAGIC_SPECIFIC) { + v3 = (struct lov_user_md_v3 *)v1; + rc = lod_comp_copy_ost_lists(lod_comp, v3); + if (rc) + RETURN(rc); + } else if (lod_comp->llc_ostlist.op_array && + lod_comp->llc_ostlist.op_count) { + for (j = 0; j < lod_comp->llc_ostlist.op_count; j++) + lod_comp->llc_ostlist.op_array[j] = -1; + lod_comp->llc_ostlist.op_count = 0; + } } lds->lds_def_striping_set = 1; @@ -3745,11 +4763,14 @@ static void lod_striping_from_default(struct lod_object *lo, int i, rc; if (lds->lds_def_striping_set && S_ISREG(mode)) { - rc = lod_alloc_comp_entries(lo, lds->lds_def_comp_cnt); + rc = lod_alloc_comp_entries(lo, lds->lds_def_mirror_cnt, + lds->lds_def_comp_cnt); if (rc != 0) return; lo->ldo_is_composite = lds->lds_def_striping_is_composite; + if (lds->lds_def_mirror_cnt > 1) + lo->ldo_flr_state = LCM_FL_RDONLY; for (i = 0; i < lo->ldo_comp_cnt; i++) { struct lod_layout_component *obj_comp = @@ -3757,11 +4778,13 @@ static void lod_striping_from_default(struct lod_object *lo, struct lod_layout_component *def_comp = &lds->lds_def_comp_entries[i]; - CDEBUG(D_LAYOUT, "Inherite from default: size:%hu " - "nr:%u offset:%u %s\n", + CDEBUG(D_LAYOUT, "Inherit from default: flags=%#x " + "size=%hu nr=%u offset=%u pattern=%#x pool=%s\n", + def_comp->llc_flags, def_comp->llc_stripe_size, def_comp->llc_stripe_count, def_comp->llc_stripe_offset, + def_comp->llc_pattern, def_comp->llc_pool ?: ""); *obj_comp = *def_comp; @@ -3771,6 +4794,20 @@ static void lod_striping_from_default(struct lod_object *lo, lod_obj_set_pool(lo, i, def_comp->llc_pool); } + /* copy ost list */ + if (def_comp->llc_ostlist.op_array && + def_comp->llc_ostlist.op_count) { + OBD_ALLOC(obj_comp->llc_ostlist.op_array, + obj_comp->llc_ostlist.op_size); + if (!obj_comp->llc_ostlist.op_array) + return; + memcpy(obj_comp->llc_ostlist.op_array, + def_comp->llc_ostlist.op_array, + obj_comp->llc_ostlist.op_size); + } else if (def_comp->llc_ostlist.op_array) { + obj_comp->llc_ostlist.op_array = NULL; + } + /* * Don't initialize these fields for plain layout * (v1/v3) here, they are inherited in the order of @@ -3782,12 +4819,7 @@ static void lod_striping_from_default(struct lod_object *lo, if (!lo->ldo_is_composite) continue; - if (obj_comp->llc_stripe_count <= 0) - obj_comp->llc_stripe_count = - desc->ld_default_stripe_count; - if (obj_comp->llc_stripe_size <= 0) - obj_comp->llc_stripe_size = - desc->ld_default_stripe_size; + lod_adjust_stripe_info(obj_comp, desc); } } else if (lds->lds_dir_def_striping_set && S_ISDIR(mode)) { if (lo->ldo_dir_stripe_count == 0) @@ -3848,7 +4880,7 @@ static void lod_ah_init(const struct lu_env *env, { struct lod_device *d = lu2lod_dev(child->do_lu.lo_dev); struct lod_thread_info *info = lod_env_info(env); - struct lod_default_striping *lds = &info->lti_def_striping; + struct lod_default_striping *lds = lod_lds_buf_get(env); struct dt_object *nextp = NULL; struct dt_object *nextc; struct lod_object *lp = NULL; @@ -3878,11 +4910,27 @@ static void lod_ah_init(const struct lu_env *env, nextc->do_ops->do_ah_init(env, ah, nextp, nextc, child_mode); if (S_ISDIR(child_mode)) { + const struct lmv_user_md_v1 *lum1 = ah->dah_eadata; + /* other default values are 0 */ lc->ldo_dir_stripe_offset = -1; - /* get default striping from parent object */ - if (likely(lp != NULL)) + /* no default striping configuration is needed for + * foreign dirs + */ + if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0 && + le32_to_cpu(lum1->lum_magic) == LMV_MAGIC_FOREIGN) { + lc->ldo_dir_is_foreign = true; + /* keep stripe_count 0 and stripe_offset -1 */ + CDEBUG(D_INFO, "no default striping for foreign dir\n"); + RETURN_EXIT; + } + + /* + * If parent object is not root directory, + * then get default striping from parent object. + */ + if (likely(lp != NULL) && !fid_is_root(lod_object_fid(lp))) lod_get_default_striping(env, lp, lds); /* set child default striping info, default value is NULL */ @@ -3890,10 +4938,13 @@ static void lod_ah_init(const struct lu_env *env, lc->ldo_def_striping = lds; /* It should always honour the specified stripes */ + /* Note: old client (< 2.7)might also do lfs mkdir, whose EA + * will have old magic. In this case, we should ignore the + * stripe count and try to create dir by default stripe. + */ if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0 && - lod_verify_md_striping(d, ah->dah_eadata) == 0) { - const struct lmv_user_md_v1 *lum1 = ah->dah_eadata; - + (le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC || + le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC_SPECIFIC)) { lc->ldo_dir_stripe_count = le32_to_cpu(lum1->lum_stripe_count); lc->ldo_dir_stripe_offset = @@ -3908,19 +4959,19 @@ static void lod_ah_init(const struct lu_env *env, } else { /* transfer defaults LMV to new directory */ lod_striping_from_default(lc, lds, child_mode); + + /* set count 0 to create normal directory */ + if (lc->ldo_dir_stripe_count == 1) + lc->ldo_dir_stripe_count = 0; } /* shrink the stripe_count to the avaible MDT count */ if (lc->ldo_dir_stripe_count > d->lod_remote_mdt_count + 1 && - !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) + !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) { lc->ldo_dir_stripe_count = d->lod_remote_mdt_count + 1; - - /* Directory will be striped only if stripe_count > 1, if - * stripe_count == 1, let's reset stripe_count = 0 to avoid - * create single master stripe and also help to unify the - * stripe handling of directories and files */ - if (lc->ldo_dir_stripe_count == 1) - lc->ldo_dir_stripe_count = 0; + if (lc->ldo_dir_stripe_count == 1) + lc->ldo_dir_stripe_count = 0; + } CDEBUG(D_INFO, "final dir stripe [%hu %d %u]\n", lc->ldo_dir_stripe_count, @@ -3989,7 +5040,8 @@ static void lod_ah_init(const struct lu_env *env, if (lod_comp->llc_stripe_size <= 0) lod_comp->llc_stripe_size = def_comp->llc_stripe_size; - if (lod_comp->llc_stripe_offset == LOV_OFFSET_DEFAULT) + if (lod_comp->llc_stripe_offset == LOV_OFFSET_DEFAULT && + (!lod_comp->llc_pool || !lod_comp->llc_pool[0])) lod_comp->llc_stripe_offset = def_comp->llc_stripe_offset; if (lod_comp->llc_pool == NULL) @@ -4002,9 +5054,8 @@ out: * in config log, use them. */ if (lod_need_inherit_more(lc, false)) { - if (lc->ldo_comp_cnt == 0) { - rc = lod_alloc_comp_entries(lc, 1); + rc = lod_alloc_comp_entries(lc, 0, 1); if (rc) /* fail to allocate memory, will create a * non-striped file. */ @@ -4016,12 +5067,7 @@ out: LASSERT(!lc->ldo_is_composite); lod_comp = &lc->ldo_comp_entries[0]; desc = &d->lod_desc; - if (lod_comp->llc_stripe_count <= 0) - lod_comp->llc_stripe_count = - desc->ld_default_stripe_count; - if (lod_comp->llc_stripe_size <= 0) - lod_comp->llc_stripe_size = - desc->ld_default_stripe_size; + lod_adjust_stripe_info(lod_comp, desc); } EXIT; @@ -4053,6 +5099,7 @@ static int lod_declare_init_size(const struct lu_env *env, struct lu_attr *attr = &lod_env_info(env)->lti_attr; uint64_t size, offs; int i, rc, stripe, stripe_count = 0, stripe_size = 0; + struct lu_extent size_ext; ENTRY; if (!lod_obj_is_striped(dt)) @@ -4067,6 +5114,7 @@ static int lod_declare_init_size(const struct lu_env *env, if (size == 0) RETURN(0); + size_ext = (typeof(size_ext)){ .e_start = size - 1, .e_end = size }; for (i = 0; i < lo->ldo_comp_cnt; i++) { struct lod_layout_component *lod_comp; struct lu_extent *extent; @@ -4077,35 +5125,34 @@ static int lod_declare_init_size(const struct lu_env *env, continue; extent = &lod_comp->llc_extent; - CDEBUG(D_INFO, "%lld [%lld, %lld)\n", - size, extent->e_start, extent->e_end); + CDEBUG(D_INFO, "%lld "DEXT"\n", size, PEXT(extent)); if (!lo->ldo_is_composite || - (size >= extent->e_start && size < extent->e_end)) { + lu_extent_is_overlapped(extent, &size_ext)) { objects = lod_comp->llc_stripe; stripe_count = lod_comp->llc_stripe_count; stripe_size = lod_comp->llc_stripe_size; - break; - } - } - if (stripe_count == 0) - RETURN(0); + /* next mirror */ + if (stripe_count == 0) + continue; - LASSERT(objects != NULL && stripe_size != 0); + LASSERT(objects != NULL && stripe_size != 0); + /* ll_do_div64(a, b) returns a % b, and a = a / b */ + ll_do_div64(size, (__u64)stripe_size); + stripe = ll_do_div64(size, (__u64)stripe_count); + LASSERT(objects[stripe] != NULL); - /* ll_do_div64(a, b) returns a % b, and a = a / b */ - ll_do_div64(size, (__u64)stripe_size); - stripe = ll_do_div64(size, (__u64)stripe_count); - LASSERT(objects[stripe] != NULL); + size = size * stripe_size; + offs = attr->la_size; + size += ll_do_div64(offs, stripe_size); - size = size * stripe_size; - offs = attr->la_size; - size += ll_do_div64(offs, stripe_size); + attr->la_valid = LA_SIZE; + attr->la_size = size; - attr->la_valid = LA_SIZE; - attr->la_size = size; - - rc = lod_sub_declare_attr_set(env, objects[stripe], attr, th); + rc = lod_sub_declare_attr_set(env, objects[stripe], + attr, th); + } + } RETURN(rc); } @@ -4179,7 +5226,7 @@ out: /* failed to create striping or to set initial size, let's reset * config so that others don't get confused */ if (rc) - lod_object_free_striping(env, lo); + lod_striping_free(env, lo); RETURN(rc); } @@ -4229,6 +5276,8 @@ static int lod_declare_create(const struct lu_env *env, struct dt_object *dt, NULL, th); } else if (dof->dof_type == DFT_DIR) { struct seq_server_site *ss; + struct lu_buf buf = { NULL }; + struct lu_buf *lmu = NULL; ss = lu_site2seq(dt->do_lu.lo_dev->ld_site); @@ -4278,19 +5327,71 @@ static int lod_declare_create(const struct lu_env *env, struct dt_object *dt, else GOTO(out, rc = -EINVAL); } + } else if (hint && hint->dah_eadata) { + lmu = &buf; + lmu->lb_buf = (void *)hint->dah_eadata; + lmu->lb_len = hint->dah_eadata_len; } - rc = lod_declare_dir_striping_create(env, dt, attr, dof, th); + rc = lod_declare_dir_striping_create(env, dt, attr, lmu, dof, + th); } out: /* failed to create striping or to set initial size, let's reset * config so that others don't get confused */ if (rc) - lod_object_free_striping(env, lo); + lod_striping_free(env, lo); RETURN(rc); } /** + * Generate component ID for new created component. + * + * \param[in] lo LOD object + * \param[in] comp_idx index of ldo_comp_entries + * + * \retval component ID on success + * \retval LCME_ID_INVAL on failure + */ +static __u32 lod_gen_component_id(struct lod_object *lo, + int mirror_id, int comp_idx) +{ + struct lod_layout_component *lod_comp; + __u32 id, start, end; + int i; + + LASSERT(lo->ldo_comp_entries[comp_idx].llc_id == LCME_ID_INVAL); + + lod_obj_inc_layout_gen(lo); + id = lo->ldo_layout_gen; + if (likely(id <= SEQ_ID_MAX)) + RETURN(pflr_id(mirror_id, id & SEQ_ID_MASK)); + + /* Layout generation wraps, need to check collisions. */ + start = id & SEQ_ID_MASK; + end = SEQ_ID_MAX; +again: + for (id = start; id <= end; id++) { + for (i = 0; i < lo->ldo_comp_cnt; i++) { + lod_comp = &lo->ldo_comp_entries[i]; + if (pflr_id(mirror_id, id) == lod_comp->llc_id) + break; + } + /* Found the ununsed ID */ + if (i == lo->ldo_comp_cnt) + RETURN(pflr_id(mirror_id, id)); + } + if (end == LCME_ID_MAX) { + start = 1; + end = min(lo->ldo_layout_gen & LCME_ID_MASK, + (__u32)(LCME_ID_MAX - 1)); + goto again; + } + + RETURN(LCME_ID_INVAL); +} + +/** * Creation of a striped regular object. * * The function is called to create the stripe objects for a regular @@ -4315,21 +5416,50 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt, { struct lod_layout_component *lod_comp; struct lod_object *lo = lod_dt_obj(dt); + __u16 mirror_id; int rc = 0, i, j; ENTRY; - LASSERT(lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL); + LASSERT((lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL) || + lo->ldo_is_foreign); + + mirror_id = 0; /* non-flr file's mirror_id is 0 */ + if (lo->ldo_mirror_count > 1) { + for (i = 0; i < lo->ldo_comp_cnt; i++) { + lod_comp = &lo->ldo_comp_entries[i]; + if (lod_comp->llc_id != LCME_ID_INVAL && + mirror_id_of(lod_comp->llc_id) > mirror_id) + mirror_id = mirror_id_of(lod_comp->llc_id); + } + } /* create all underlying objects */ for (i = 0; i < lo->ldo_comp_cnt; i++) { lod_comp = &lo->ldo_comp_entries[i]; + if (lod_comp->llc_id == LCME_ID_INVAL) { + /* only the component of FLR layout with more than 1 + * mirror has mirror ID in its component ID. + */ + if (lod_comp->llc_extent.e_start == 0 && + lo->ldo_mirror_count > 1) + ++mirror_id; + + lod_comp->llc_id = lod_gen_component_id(lo, + mirror_id, i); + if (lod_comp->llc_id == LCME_ID_INVAL) + GOTO(out, rc = -ERANGE); + } + if (lod_comp_inited(lod_comp)) continue; if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED) lod_comp_set_init(lod_comp); + if (lov_pattern(lod_comp->llc_pattern) == LOV_PATTERN_MDT) + lod_comp_set_init(lod_comp); + if (lod_comp->llc_stripe == NULL) continue; @@ -4339,22 +5469,44 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt, LASSERT(object != NULL); rc = lod_sub_create(env, object, attr, NULL, dof, th); if (rc) - break; + GOTO(out, rc); } lod_comp_set_init(lod_comp); } - if (rc == 0) - rc = lod_generate_and_set_lovea(env, lo, th); + rc = lod_fill_mirrors(lo); + if (rc) + GOTO(out, rc); - if (rc == 0) - lo->ldo_comp_cached = 1; - else - lod_object_free_striping(env, lo); + rc = lod_generate_and_set_lovea(env, lo, th); + if (rc) + GOTO(out, rc); + + lo->ldo_comp_cached = 1; + RETURN(0); +out: + lod_striping_free(env, lo); RETURN(rc); } +static inline bool lod_obj_is_dom(struct dt_object *dt) +{ + struct lod_object *lo = lod_dt_obj(dt); + + if (!dt_object_exists(dt_object_child(dt))) + return false; + + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) + return false; + + if (!lo->ldo_comp_cnt) + return false; + + return (lov_pattern(lo->ldo_comp_entries[0].llc_pattern) == + LOV_PATTERN_MDT); +} + /** * Implementation of dt_object_operations::do_create. * @@ -4377,7 +5529,8 @@ static int lod_create(const struct lu_env *env, struct dt_object *dt, RETURN(rc); if (S_ISREG(dt->do_lu.lo_header->loh_attr) && - lod_obj_is_striped(dt) && dof->u.dof_reg.striped != 0) { + (lod_obj_is_striped(dt) || lod_obj_is_dom(dt)) && + dof->u.dof_reg.striped != 0) { LASSERT(lod_dt_obj(dt)->ldo_comp_cached == 0); rc = lod_striped_create(env, dt, attr, dof, th); } @@ -4388,7 +5541,8 @@ static int lod_create(const struct lu_env *env, struct dt_object *dt, static inline int lod_obj_stripe_destroy_cb(const struct lu_env *env, struct lod_object *lo, struct dt_object *dt, struct thandle *th, - int stripe_idx, struct lod_obj_stripe_cb_data *data) + int comp_idx, int stripe_idx, + struct lod_obj_stripe_cb_data *data) { if (data->locd_declare) return lod_sub_declare_destroy(env, dt, th); @@ -4413,11 +5567,13 @@ lod_obj_stripe_destroy_cb(const struct lu_env *env, struct lod_object *lo, static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { - struct dt_object *next = dt_object_child(dt); - struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); struct lod_thread_info *info = lod_env_info(env); - char *stripe_name = info->lti_key; - int rc, i; + struct dt_object *stripe; + char *stripe_name = info->lti_key; + int rc, i; + ENTRY; /* @@ -4425,7 +5581,7 @@ static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt, * is being initialized as we don't need this information till * few specific cases like destroy, chown */ - rc = lod_load_striping(env, lo); + rc = lod_striping_load(env, lo); if (rc) RETURN(rc); @@ -4437,13 +5593,17 @@ static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt, RETURN(rc); for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + stripe = lo->ldo_stripe[i]; + if (!stripe) + continue; + rc = lod_sub_declare_ref_del(env, next, th); if (rc != 0) RETURN(rc); - snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", - PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)), - i); + snprintf(stripe_name, sizeof(info->lti_key), + DFID":%d", + PFID(lu_object_fid(&stripe->do_lu)), i); rc = lod_sub_declare_delete(env, next, (const struct dt_key *)stripe_name, th); if (rc != 0) @@ -4468,23 +5628,27 @@ static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt, /* declare destroy all striped objects */ if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - if (lo->ldo_stripe[i] == NULL) + stripe = lo->ldo_stripe[i]; + if (!stripe) continue; - rc = lod_sub_declare_ref_del(env, lo->ldo_stripe[i], - th); + if (!dt_object_exists(stripe)) + continue; - rc = lod_sub_declare_destroy(env, lo->ldo_stripe[i], - th); + rc = lod_sub_declare_ref_del(env, stripe, th); + if (rc != 0) + break; + + rc = lod_sub_declare_destroy(env, stripe, th); if (rc != 0) break; } } else { - struct lod_obj_stripe_cb_data data; + struct lod_obj_stripe_cb_data data = { { 0 } }; data.locd_declare = true; - rc = lod_obj_for_each_stripe(env, lo, th, - lod_obj_stripe_destroy_cb, &data); + data.locd_stripe_cb = lod_obj_stripe_destroy_cb; + rc = lod_obj_for_each_stripe(env, lo, th, &data); } RETURN(rc); @@ -4505,9 +5669,11 @@ static int lod_destroy(const struct lu_env *env, struct dt_object *dt, struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); struct lod_thread_info *info = lod_env_info(env); - char *stripe_name = info->lti_key; - unsigned int i; - int rc; + char *stripe_name = info->lti_key; + struct dt_object *stripe; + unsigned int i; + int rc; + ENTRY; /* destroy sub-stripe of master object */ @@ -4518,17 +5684,20 @@ static int lod_destroy(const struct lu_env *env, struct dt_object *dt, RETURN(rc); for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + stripe = lo->ldo_stripe[i]; + if (!stripe) + continue; + rc = lod_sub_ref_del(env, next, th); if (rc != 0) RETURN(rc); snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", - PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu)), - i); + PFID(lu_object_fid(&stripe->do_lu)), i); CDEBUG(D_INFO, DFID" delete stripe %s "DFID"\n", PFID(lu_object_fid(&dt->do_lu)), stripe_name, - PFID(lu_object_fid(&lo->ldo_stripe[i]->do_lu))); + PFID(lu_object_fid(&stripe->do_lu))); rc = lod_sub_delete(env, next, (const struct dt_key *)stripe_name, th); @@ -4551,30 +5720,32 @@ static int lod_destroy(const struct lu_env *env, struct dt_object *dt, /* destroy all striped objects */ if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - if (lo->ldo_stripe[i] == NULL) + stripe = lo->ldo_stripe[i]; + if (!stripe) continue; + + if (!dt_object_exists(stripe)) + continue; + if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || i == cfs_fail_val) { - dt_write_lock(env, lo->ldo_stripe[i], - MOR_TGT_CHILD); - rc = lod_sub_ref_del(env, lo->ldo_stripe[i], - th); - dt_write_unlock(env, lo->ldo_stripe[i]); + dt_write_lock(env, stripe, MOR_TGT_CHILD); + rc = lod_sub_ref_del(env, stripe, th); + dt_write_unlock(env, stripe); if (rc != 0) break; - rc = lod_sub_destroy(env, lo->ldo_stripe[i], - th); + rc = lod_sub_destroy(env, stripe, th); if (rc != 0) break; } } } else { - struct lod_obj_stripe_cb_data data; + struct lod_obj_stripe_cb_data data = { { 0 } }; data.locd_declare = false; - rc = lod_obj_for_each_stripe(env, lo, th, - lod_obj_stripe_destroy_cb, &data); + data.locd_stripe_cb = lod_obj_stripe_destroy_cb; + rc = lod_obj_for_each_stripe(env, lo, th, &data); } RETURN(rc); @@ -4639,42 +5810,6 @@ static int lod_object_sync(const struct lu_env *env, struct dt_object *dt, } /** - * Release LDLM locks on the stripes of a striped directory. - * - * Iterates over all the locks taken on the stripe objects and - * cancel them. - * - * \param[in] env execution environment - * \param[in] dt striped object - * \param[in] einfo lock description - * \param[in] policy data describing requested lock - * - * \retval 0 on success - * \retval negative if failed - */ -static int lod_object_unlock_internal(const struct lu_env *env, - struct dt_object *dt, - struct ldlm_enqueue_info *einfo, - union ldlm_policy_data *policy) -{ - struct lustre_handle_array *slave_locks = einfo->ei_cbdata; - int rc = 0; - int i; - ENTRY; - - if (slave_locks == NULL) - RETURN(0); - - for (i = 1; i < slave_locks->count; i++) { - if (lustre_handle_is_used(&slave_locks->handles[i])) - ldlm_lock_decref_and_cancel(&slave_locks->handles[i], - einfo->ei_mode); - } - - RETURN(rc); -} - -/** * Implementation of dt_object_operations::do_object_unlock. * * Used to release LDLM lock(s). @@ -4696,19 +5831,25 @@ static int lod_object_unlock(const struct lu_env *env, struct dt_object *dt, RETURN(0); LASSERT(S_ISDIR(dt->do_lu.lo_header->loh_attr)); - LASSERT(lo->ldo_dir_stripe_count > 1); /* Note: for remote lock for single stripe dir, MDT will cancel * the lock by lockh directly */ LASSERT(!dt_object_remote(dt_object_child(dt))); /* locks were unlocked in MDT layer */ - for (i = 1; i < slave_locks->count; i++) { - LASSERT(!lustre_handle_is_used(&slave_locks->handles[i])); - dt_invalidate(env, lo->ldo_stripe[i]); + for (i = 0; i < slave_locks->ha_count; i++) + LASSERT(!lustre_handle_is_used(&slave_locks->ha_handles[i])); + + /* + * NB, ha_count may not equal to ldo_dir_stripe_count, because dir + * layout may change, e.g., shrink dir layout after migration. + */ + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + if (lo->ldo_stripe[i]) + dt_invalidate(env, lo->ldo_stripe[i]); } - slave_locks_size = sizeof(*slave_locks) + slave_locks->count * - sizeof(slave_locks->handles[0]); + slave_locks_size = offsetof(typeof(*slave_locks), + ha_handles[slave_locks->ha_count]); OBD_FREE(slave_locks, slave_locks_size); einfo->ei_cbdata = NULL; @@ -4729,11 +5870,11 @@ static int lod_object_lock(const struct lu_env *env, struct ldlm_enqueue_info *einfo, union ldlm_policy_data *policy) { - struct lod_object *lo = lod_dt_obj(dt); - int rc = 0; - int i; - int slave_locks_size; + struct lod_object *lo = lod_dt_obj(dt); + int slave_locks_size; struct lustre_handle_array *slave_locks = NULL; + int i; + int rc; ENTRY; /* remote object lock */ @@ -4744,82 +5885,72 @@ static int lod_object_lock(const struct lu_env *env, } if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) - GOTO(out, rc = -ENOTDIR); + RETURN(-ENOTDIR); - rc = lod_load_striping(env, lo); + rc = lod_striping_load(env, lo); if (rc != 0) - GOTO(out, rc); + RETURN(rc); /* No stripes */ - if (lo->ldo_dir_stripe_count <= 1) { - /* - * NB, ei_cbdata stores pointer to slave locks, if no locks - * taken, make sure it's set to NULL, otherwise MDT will try to - * unlock them. - */ - einfo->ei_cbdata = NULL; - GOTO(out, rc = 0); - } + if (lo->ldo_dir_stripe_count <= 1) + RETURN(0); - slave_locks_size = sizeof(*slave_locks) + lo->ldo_dir_stripe_count * - sizeof(slave_locks->handles[0]); + slave_locks_size = offsetof(typeof(*slave_locks), + ha_handles[lo->ldo_dir_stripe_count]); /* Freed in lod_object_unlock */ OBD_ALLOC(slave_locks, slave_locks_size); - if (slave_locks == NULL) - GOTO(out, rc = -ENOMEM); - slave_locks->count = lo->ldo_dir_stripe_count; + if (!slave_locks) + RETURN(-ENOMEM); + slave_locks->ha_count = lo->ldo_dir_stripe_count; /* striped directory lock */ - for (i = 1; i < lo->ldo_dir_stripe_count; i++) { - struct lustre_handle lockh; - struct ldlm_res_id *res_id; + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + struct lustre_handle lockh; + struct ldlm_res_id *res_id; + struct dt_object *stripe; + + stripe = lo->ldo_stripe[i]; + if (!stripe) + continue; res_id = &lod_env_info(env)->lti_res_id; - fid_build_reg_res_name(lu_object_fid(&lo->ldo_stripe[i]->do_lu), - res_id); + fid_build_reg_res_name(lu_object_fid(&stripe->do_lu), res_id); einfo->ei_res_id = res_id; - LASSERT(lo->ldo_stripe[i] != NULL); - if (likely(dt_object_remote(lo->ldo_stripe[i]))) { - rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, - einfo, policy); + if (dt_object_remote(stripe)) { + set_bit(i, (void *)slave_locks->ha_map); + rc = dt_object_lock(env, stripe, &lockh, einfo, policy); } else { struct ldlm_namespace *ns = einfo->ei_namespace; ldlm_blocking_callback blocking = einfo->ei_cb_local_bl; ldlm_completion_callback completion = einfo->ei_cb_cp; - __u64 dlmflags = LDLM_FL_ATOMIC_CB; + __u64 dlmflags = LDLM_FL_ATOMIC_CB; if (einfo->ei_mode == LCK_PW || einfo->ei_mode == LCK_EX) dlmflags |= LDLM_FL_COS_INCOMPAT; - /* This only happens if there are mulitple stripes - * on the master MDT, i.e. except stripe0, there are - * other stripes on the Master MDT as well, Only - * happens in the test case right now. */ LASSERT(ns != NULL); - rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, + rc = ldlm_cli_enqueue_local(env, ns, res_id, LDLM_IBITS, policy, einfo->ei_mode, &dlmflags, blocking, completion, NULL, NULL, 0, LVB_T_NONE, NULL, &lockh); } - if (rc != 0) - break; - slave_locks->handles[i] = lockh; + if (rc) { + while (i--) + ldlm_lock_decref_and_cancel( + &slave_locks->ha_handles[i], + einfo->ei_mode); + OBD_FREE(slave_locks, slave_locks_size); + RETURN(rc); + } + slave_locks->ha_handles[i] = lockh; } einfo->ei_cbdata = slave_locks; - if (rc != 0 && slave_locks != NULL) { - lod_object_unlock_internal(env, dt, einfo, policy); - OBD_FREE(slave_locks, slave_locks_size); - } - EXIT; -out: - if (rc != 0) - einfo->ei_cbdata = NULL; - RETURN(rc); + RETURN(0); } /** @@ -4832,29 +5963,71 @@ static int lod_invalidate(const struct lu_env *env, struct dt_object *dt) return dt_invalidate(env, dt_object_child(dt)); } -static int lod_declare_layout_change(const struct lu_env *env, - struct dt_object *dt, - struct layout_intent *layout, - const struct lu_buf *buf, - struct thandle *th) +static int lod_layout_data_init(struct lod_thread_info *info, __u32 comp_cnt) { - struct lod_thread_info *info = lod_env_info(env); - struct lod_object *lo = lod_dt_obj(dt); - struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); - struct dt_object *next = dt_object_child(dt); - struct ost_pool *inuse = &info->lti_inuse_osts; + ENTRY; + + /* clear memory region that will be used for layout change */ + memset(&info->lti_layout_attr, 0, sizeof(struct lu_attr)); + info->lti_count = 0; + + if (info->lti_comp_size >= comp_cnt) + RETURN(0); + + if (info->lti_comp_size > 0) { + OBD_FREE(info->lti_comp_idx, + info->lti_comp_size * sizeof(__u32)); + info->lti_comp_size = 0; + } + + OBD_ALLOC(info->lti_comp_idx, comp_cnt * sizeof(__u32)); + if (!info->lti_comp_idx) + RETURN(-ENOMEM); + + info->lti_comp_size = comp_cnt; + RETURN(0); +} + +static int lod_declare_instantiate_components(const struct lu_env *env, + struct lod_object *lo, struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + int i; + int rc = 0; + ENTRY; + + LASSERT(info->lti_count < lo->ldo_comp_cnt); + + for (i = 0; i < info->lti_count; i++) { + rc = lod_qos_prep_create(env, lo, NULL, th, + info->lti_comp_idx[i]); + if (rc) + break; + } + + if (!rc) { + info->lti_buf.lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_declare_xattr_set(env, lod_object_child(lo), + &info->lti_buf, XATTR_NAME_LOV, 0, th); + } + + RETURN(rc); +} + +static int lod_declare_update_plain(const struct lu_env *env, + struct lod_object *lo, struct layout_intent *layout, + const struct lu_buf *buf, struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *d = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); struct lod_layout_component *lod_comp; struct lov_comp_md_v1 *comp_v1 = NULL; bool replay = false; - bool need_create = false; int i, rc; ENTRY; - if (!S_ISREG(dt->do_lu.lo_header->loh_attr) || !dt_object_exists(dt) || - dt_object_remote(next)) - RETURN(-EINVAL); + LASSERT(lo->ldo_flr_state == LCM_FL_NONE); - dt_write_lock(env, next, 0); /* * In case the client is passing lovea, which only happens during * the replay of layout intent write RPC for now, we may need to @@ -4873,25 +6046,20 @@ static int lod_declare_layout_change(const struct lu_env *env, GOTO(out, rc = -EINVAL); } - lod_object_free_striping(env, lo); rc = lod_use_defined_striping(env, lo, buf); if (rc) GOTO(out, rc); + lo->ldo_comp_cached = 1; rc = lod_get_lov_ea(env, lo); if (rc <= 0) GOTO(out, rc); /* old on-disk EA is stored in info->lti_buf */ - comp_v1 = (struct lov_comp_md_v1 *)&info->lti_buf.lb_buf; + comp_v1 = (struct lov_comp_md_v1 *)info->lti_buf.lb_buf; replay = true; } else { /* non replay path */ - rc = lod_load_striping_locked(env, lo); - if (rc) - GOTO(out, rc); - - /* Prepare inuse array for composite file */ - rc = lod_prepare_inuse(env, lo); + rc = lod_striping_load(env, lo); if (rc) GOTO(out, rc); } @@ -4900,15 +6068,19 @@ static int lod_declare_layout_change(const struct lu_env *env, lod_comp = &lo->ldo_comp_entries[lo->ldo_comp_cnt - 1]; if (lo->ldo_comp_cnt > 1 && lod_comp->llc_extent.e_end != OBD_OBJECT_EOF && - lod_comp->llc_extent.e_end < layout->li_end) { + lod_comp->llc_extent.e_end < layout->li_extent.e_end) { CDEBUG(replay ? D_ERROR : D_LAYOUT, "%s: the defined layout [0, %#llx) does not covers " - "the write range [%#llx, %#llx).\n", + "the write range "DEXT"\n", lod2obd(d)->obd_name, lod_comp->llc_extent.e_end, - layout->li_start, layout->li_end); + PEXT(&layout->li_extent)); GOTO(out, rc = -EINVAL); } + CDEBUG(D_LAYOUT, "%s: "DFID": instantiate components "DEXT"\n", + lod2obd(d)->obd_name, PFID(lod_object_fid(lo)), + PEXT(&layout->li_extent)); + /* * Iterate ld->ldo_comp_entries, find the component whose extent under * the write range and not instantianted. @@ -4916,7 +6088,7 @@ static int lod_declare_layout_change(const struct lu_env *env, for (i = 0; i < lo->ldo_comp_cnt; i++) { lod_comp = &lo->ldo_comp_entries[i]; - if (lod_comp->llc_extent.e_start >= layout->li_end) + if (lod_comp->llc_extent.e_start >= layout->li_extent.e_end) break; if (!replay) { @@ -4942,91 +6114,756 @@ static int lod_declare_layout_change(const struct lu_env *env, if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED) GOTO(out, rc = -EINVAL); - need_create = true; - - rc = lod_qos_prep_create(env, lo, NULL, th, i, inuse); - if (rc) - break; + LASSERT(info->lti_comp_idx != NULL); + info->lti_comp_idx[info->lti_count++] = i; } - if (need_create) - lod_obj_inc_layout_gen(lo); - else - GOTO(unlock, rc = -EALREADY); + if (info->lti_count == 0) + RETURN(-EALREADY); - if (!rc) { - info->lti_buf.lb_len = lod_comp_md_size(lo, false); - rc = lod_sub_declare_xattr_set(env, next, &info->lti_buf, - XATTR_NAME_LOV, 0, th); - } + lod_obj_inc_layout_gen(lo); + rc = lod_declare_instantiate_components(env, lo, th); out: if (rc) - lod_object_free_striping(env, lo); - -unlock: - dt_write_unlock(env, next); - + lod_striping_free(env, lo); RETURN(rc); } -/** - * Instantiate layout component objects which covers the intent write offset. - */ -static int lod_layout_change(const struct lu_env *env, struct dt_object *dt, - struct layout_intent *layout, - const struct lu_buf *buf, struct thandle *th) +static inline int lod_comp_index(struct lod_object *lo, + struct lod_layout_component *lod_comp) { - struct lu_attr *attr = &lod_env_info(env)->lti_attr; + LASSERT(lod_comp >= lo->ldo_comp_entries && + lod_comp <= &lo->ldo_comp_entries[lo->ldo_comp_cnt - 1]); - RETURN(lod_striped_create(env, dt, attr, NULL, th)); + return lod_comp - lo->ldo_comp_entries; } -struct dt_object_operations lod_obj_ops = { - .do_read_lock = lod_read_lock, - .do_write_lock = lod_write_lock, - .do_read_unlock = lod_read_unlock, - .do_write_unlock = lod_write_unlock, - .do_write_locked = lod_write_locked, - .do_attr_get = lod_attr_get, - .do_declare_attr_set = lod_declare_attr_set, - .do_attr_set = lod_attr_set, - .do_xattr_get = lod_xattr_get, - .do_declare_xattr_set = lod_declare_xattr_set, - .do_xattr_set = lod_xattr_set, - .do_declare_xattr_del = lod_declare_xattr_del, - .do_xattr_del = lod_xattr_del, - .do_xattr_list = lod_xattr_list, - .do_ah_init = lod_ah_init, - .do_declare_create = lod_declare_create, - .do_create = lod_create, - .do_declare_destroy = lod_declare_destroy, - .do_destroy = lod_destroy, - .do_index_try = lod_index_try, - .do_declare_ref_add = lod_declare_ref_add, - .do_ref_add = lod_ref_add, - .do_declare_ref_del = lod_declare_ref_del, - .do_ref_del = lod_ref_del, - .do_object_sync = lod_object_sync, - .do_object_lock = lod_object_lock, - .do_object_unlock = lod_object_unlock, - .do_invalidate = lod_invalidate, - .do_declare_layout_change = lod_declare_layout_change, - .do_layout_change = lod_layout_change, -}; - /** - * Implementation of dt_body_operations::dbo_read. - * - * \see dt_body_operations::dbo_read() in the API description for details. + * Stale other mirrors by writing extent. */ -static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt, - struct lu_buf *buf, loff_t *pos) +static void lod_stale_components(struct lod_object *lo, int primary, + struct lu_extent *extent) { - struct dt_object *next = dt_object_child(dt); + struct lod_layout_component *pri_comp, *lod_comp; + int i; - LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr) || - S_ISLNK(dt->do_lu.lo_header->loh_attr)); - return next->do_body_ops->dbo_read(env, next, buf, pos); + /* The writing extent decides which components in the primary + * are affected... */ + CDEBUG(D_LAYOUT, "primary mirror %d, "DEXT"\n", primary, PEXT(extent)); + lod_foreach_mirror_comp(pri_comp, lo, primary) { + if (!lu_extent_is_overlapped(extent, &pri_comp->llc_extent)) + continue; + + CDEBUG(D_LAYOUT, "primary comp %u "DEXT"\n", + lod_comp_index(lo, pri_comp), + PEXT(&pri_comp->llc_extent)); + + for (i = 0; i < lo->ldo_mirror_count; i++) { + if (i == primary) + continue; + + /* ... and then stale other components that are + * overlapping with primary components */ + lod_foreach_mirror_comp(lod_comp, lo, i) { + if (!lu_extent_is_overlapped( + &pri_comp->llc_extent, + &lod_comp->llc_extent)) + continue; + + CDEBUG(D_LAYOUT, "stale: %u / %u\n", + i, lod_comp_index(lo, lod_comp)); + + lod_comp->llc_flags |= LCME_FL_STALE; + lo->ldo_mirrors[i].lme_stale = 1; + } + } + } +} + +/** + * check an OST's availability + * \param[in] env execution environment + * \param[in] lo lod object + * \param[in] dt dt object + * \param[in] index mirror index + * + * \retval negative if failed + * \retval 1 if \a dt is available + * \retval 0 if \a dt is not available + */ +static inline int lod_check_ost_avail(const struct lu_env *env, + struct lod_object *lo, + struct dt_object *dt, int index) +{ + struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + struct lod_tgt_desc *ost; + __u32 idx; + int type = LU_SEQ_RANGE_OST; + int rc; + + rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu), &idx, &type); + if (rc < 0) { + CERROR("%s: can't locate "DFID":rc = %d\n", + lod2obd(lod)->obd_name, PFID(lu_object_fid(&dt->do_lu)), + rc); + return rc; + } + + ost = OST_TGT(lod, idx); + if (ost->ltd_statfs.os_state & + (OS_STATE_READONLY | OS_STATE_ENOSPC | OS_STATE_ENOINO | + OS_STATE_NOPRECREATE) || + ost->ltd_active == 0) { + CDEBUG(D_LAYOUT, DFID ": mirror %d OST%d unavail, rc = %d\n", + PFID(lod_object_fid(lo)), index, idx, rc); + return 0; + } + + return 1; +} + +/** + * Pick primary mirror for write + * \param[in] env execution environment + * \param[in] lo object + * \param[in] extent write range + */ +static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, + struct lu_extent *extent) +{ + struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + unsigned int seq = 0; + struct lod_layout_component *lod_comp; + int i, j, rc; + int picked = -1, second_pick = -1, third_pick = -1; + ENTRY; + + if (OBD_FAIL_CHECK(OBD_FAIL_FLR_RANDOM_PICK_MIRROR)) { + get_random_bytes(&seq, sizeof(seq)); + seq %= lo->ldo_mirror_count; + } + + /** + * Pick a mirror as the primary, and check the availability of OSTs. + * + * This algo can be revised later after knowing the topology of + * cluster. + */ + lod_qos_statfs_update(env, lod); + for (i = 0; i < lo->ldo_mirror_count; i++) { + bool ost_avail = true; + int index = (i + seq) % lo->ldo_mirror_count; + + if (lo->ldo_mirrors[index].lme_stale) { + CDEBUG(D_LAYOUT, DFID": mirror %d stale\n", + PFID(lod_object_fid(lo)), index); + continue; + } + + /* 2nd pick is for the primary mirror containing unavail OST */ + if (lo->ldo_mirrors[index].lme_primary && second_pick < 0) + second_pick = index; + + /* 3rd pick is for non-primary mirror containing unavail OST */ + if (second_pick < 0 && third_pick < 0) + third_pick = index; + + /** + * we found a non-primary 1st pick, we'd like to find a + * potential pirmary mirror. + */ + if (picked >= 0 && !lo->ldo_mirrors[index].lme_primary) + continue; + + /* check the availability of OSTs */ + lod_foreach_mirror_comp(lod_comp, lo, index) { + if (!lod_comp_inited(lod_comp) || !lod_comp->llc_stripe) + continue; + + for (j = 0; j < lod_comp->llc_stripe_count; j++) { + struct dt_object *dt = lod_comp->llc_stripe[j]; + + rc = lod_check_ost_avail(env, lo, dt, index); + if (rc < 0) + RETURN(rc); + + ost_avail = !!rc; + if (!ost_avail) + break; + } /* for all dt object in one component */ + if (!ost_avail) + break; + } /* for all components in a mirror */ + + /** + * the OSTs where allocated objects locates in the components + * of the mirror are available. + */ + if (!ost_avail) + continue; + + /* this mirror has all OSTs available */ + picked = index; + + /** + * primary with all OSTs are available, this is the perfect + * 1st pick. + */ + if (lo->ldo_mirrors[index].lme_primary) + break; + } /* for all mirrors */ + + /* failed to pick a sound mirror, lower our expectation */ + if (picked < 0) + picked = second_pick; + if (picked < 0) + picked = third_pick; + if (picked < 0) + RETURN(-ENODATA); + + RETURN(picked); +} + +static int lod_prepare_resync_mirror(const struct lu_env *env, + struct lod_object *lo, + __u16 mirror_id) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_layout_component *lod_comp; + bool neg = !!(MIRROR_ID_NEG & mirror_id); + int i; + + mirror_id &= ~MIRROR_ID_NEG; + + for (i = 0; i < lo->ldo_mirror_count; i++) { + if ((!neg && lo->ldo_mirrors[i].lme_id != mirror_id) || + (neg && lo->ldo_mirrors[i].lme_id == mirror_id)) + continue; + + lod_foreach_mirror_comp(lod_comp, lo, i) { + if (lod_comp_inited(lod_comp)) + continue; + + info->lti_comp_idx[info->lti_count++] = + lod_comp_index(lo, lod_comp); + } + } + + return 0; +} + +/** + * figure out the components should be instantiated for resync. + */ +static int lod_prepare_resync(const struct lu_env *env, struct lod_object *lo, + struct lu_extent *extent) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_layout_component *lod_comp; + unsigned int need_sync = 0; + int i; + + CDEBUG(D_LAYOUT, + DFID": instantiate all stale components in "DEXT"\n", + PFID(lod_object_fid(lo)), PEXT(extent)); + + /** + * instantiate all components within this extent, even non-stale + * components. + */ + for (i = 0; i < lo->ldo_mirror_count; i++) { + if (!lo->ldo_mirrors[i].lme_stale) + continue; + + lod_foreach_mirror_comp(lod_comp, lo, i) { + if (!lu_extent_is_overlapped(extent, + &lod_comp->llc_extent)) + break; + + need_sync++; + + if (lod_comp_inited(lod_comp)) + continue; + + CDEBUG(D_LAYOUT, "resync instantiate %d / %d\n", + i, lod_comp_index(lo, lod_comp)); + info->lti_comp_idx[info->lti_count++] = + lod_comp_index(lo, lod_comp); + } + } + + return need_sync ? 0 : -EALREADY; +} + +static int lod_declare_update_rdonly(const struct lu_env *env, + struct lod_object *lo, struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lu_attr *layout_attr = &info->lti_layout_attr; + struct lod_layout_component *lod_comp; + struct lu_extent extent = { 0 }; + int rc; + ENTRY; + + LASSERT(lo->ldo_flr_state == LCM_FL_RDONLY); + LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE || + mlc->mlc_opc == MD_LAYOUT_RESYNC); + LASSERT(lo->ldo_mirror_count > 0); + + if (mlc->mlc_opc == MD_LAYOUT_WRITE) { + struct layout_intent *layout = mlc->mlc_intent; + int picked; + + extent = layout->li_extent; + CDEBUG(D_LAYOUT, DFID": trying to write :"DEXT"\n", + PFID(lod_object_fid(lo)), PEXT(&extent)); + + picked = lod_primary_pick(env, lo, &extent); + if (picked < 0) + RETURN(picked); + + CDEBUG(D_LAYOUT, DFID": picked mirror id %u as primary\n", + PFID(lod_object_fid(lo)), + lo->ldo_mirrors[picked].lme_id); + + if (layout->li_opc == LAYOUT_INTENT_TRUNC) { + /** + * trunc transfers [0, size) in the intent extent, we'd + * stale components overlapping [size, eof). + */ + extent.e_start = extent.e_end; + extent.e_end = OBD_OBJECT_EOF; + } + + /* stale overlapping components from other mirrors */ + lod_stale_components(lo, picked, &extent); + + /* restore truncate intent extent */ + if (layout->li_opc == LAYOUT_INTENT_TRUNC) + extent.e_end = extent.e_start; + + /* instantiate components for the picked mirror, start from 0 */ + extent.e_start = 0; + + lod_foreach_mirror_comp(lod_comp, lo, picked) { + if (!lu_extent_is_overlapped(&extent, + &lod_comp->llc_extent)) + break; + + if (lod_comp_inited(lod_comp)) + continue; + + info->lti_comp_idx[info->lti_count++] = + lod_comp_index(lo, lod_comp); + } + + lo->ldo_flr_state = LCM_FL_WRITE_PENDING; + } else { /* MD_LAYOUT_RESYNC */ + int i; + + /** + * could contain multiple non-stale mirrors, so we need to + * prep uninited all components assuming any non-stale mirror + * could be picked as the primary mirror. + */ + if (mlc->mlc_mirror_id == 0) { + /* normal resync */ + for (i = 0; i < lo->ldo_mirror_count; i++) { + if (lo->ldo_mirrors[i].lme_stale) + continue; + + lod_foreach_mirror_comp(lod_comp, lo, i) { + if (!lod_comp_inited(lod_comp)) + break; + + if (extent.e_end < + lod_comp->llc_extent.e_end) + extent.e_end = + lod_comp->llc_extent.e_end; + } + } + rc = lod_prepare_resync(env, lo, &extent); + if (rc) + GOTO(out, rc); + } else { + /* mirror write, try to init its all components */ + rc = lod_prepare_resync_mirror(env, lo, + mlc->mlc_mirror_id); + if (rc) + GOTO(out, rc); + } + + /* change the file state to SYNC_PENDING */ + lo->ldo_flr_state = LCM_FL_SYNC_PENDING; + } + + /* Reset the layout version once it's becoming too large. + * This way it can make sure that the layout version is + * monotonously increased in this writing era. */ + lod_obj_inc_layout_gen(lo); + if (lo->ldo_layout_gen > (LCME_ID_MAX >> 1)) { + __u32 layout_version; + + cfs_get_random_bytes(&layout_version, sizeof(layout_version)); + lo->ldo_layout_gen = layout_version & 0xffff; + } + + rc = lod_declare_instantiate_components(env, lo, th); + if (rc) + GOTO(out, rc); + + layout_attr->la_valid = LA_LAYOUT_VERSION; + layout_attr->la_layout_version = 0; /* set current version */ + if (mlc->mlc_opc == MD_LAYOUT_RESYNC) + layout_attr->la_layout_version = LU_LAYOUT_RESYNC; + rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th); + if (rc) + GOTO(out, rc); + +out: + if (rc) + lod_striping_free(env, lo); + RETURN(rc); +} + +static int lod_declare_update_write_pending(const struct lu_env *env, + struct lod_object *lo, struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lu_attr *layout_attr = &info->lti_layout_attr; + struct lod_layout_component *lod_comp; + struct lu_extent extent = { 0 }; + int primary = -1; + int i; + int rc; + ENTRY; + + LASSERT(lo->ldo_flr_state == LCM_FL_WRITE_PENDING); + LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE || + mlc->mlc_opc == MD_LAYOUT_RESYNC); + + /* look for the primary mirror */ + for (i = 0; i < lo->ldo_mirror_count; i++) { + if (lo->ldo_mirrors[i].lme_stale) + continue; + + LASSERTF(primary < 0, DFID " has multiple primary: %u / %u", + PFID(lod_object_fid(lo)), + lo->ldo_mirrors[i].lme_id, + lo->ldo_mirrors[primary].lme_id); + + primary = i; + } + if (primary < 0) { + CERROR(DFID ": doesn't have a primary mirror\n", + PFID(lod_object_fid(lo))); + GOTO(out, rc = -ENODATA); + } + + CDEBUG(D_LAYOUT, DFID": found primary %u\n", + PFID(lod_object_fid(lo)), lo->ldo_mirrors[primary].lme_id); + + LASSERT(!lo->ldo_mirrors[primary].lme_stale); + + /* for LAYOUT_WRITE opc, it has to do the following operations: + * 1. stale overlapping componets from stale mirrors; + * 2. instantiate components of the primary mirror; + * 3. transfter layout version to all objects of the primary; + * + * for LAYOUT_RESYNC opc, it will do: + * 1. instantiate components of all stale mirrors; + * 2. transfer layout version to all objects to close write era. */ + + if (mlc->mlc_opc == MD_LAYOUT_WRITE) { + LASSERT(mlc->mlc_intent != NULL); + + extent = mlc->mlc_intent->li_extent; + + CDEBUG(D_LAYOUT, DFID": intent to write: "DEXT"\n", + PFID(lod_object_fid(lo)), PEXT(&extent)); + + if (mlc->mlc_intent->li_opc == LAYOUT_INTENT_TRUNC) { + /** + * trunc transfers [0, size) in the intent extent, we'd + * stale components overlapping [size, eof). + */ + extent.e_start = extent.e_end; + extent.e_end = OBD_OBJECT_EOF; + } + /* 1. stale overlapping components */ + lod_stale_components(lo, primary, &extent); + + /* 2. find out the components need instantiating. + * instantiate [0, mlc->mlc_intent->e_end) */ + + /* restore truncate intent extent */ + if (mlc->mlc_intent->li_opc == LAYOUT_INTENT_TRUNC) + extent.e_end = extent.e_start; + extent.e_start = 0; + + lod_foreach_mirror_comp(lod_comp, lo, primary) { + if (!lu_extent_is_overlapped(&extent, + &lod_comp->llc_extent)) + break; + + if (lod_comp_inited(lod_comp)) + continue; + + CDEBUG(D_LAYOUT, "write instantiate %d / %d\n", + primary, lod_comp_index(lo, lod_comp)); + info->lti_comp_idx[info->lti_count++] = + lod_comp_index(lo, lod_comp); + } + } else { /* MD_LAYOUT_RESYNC */ + if (mlc->mlc_mirror_id == 0) { + /* normal resync */ + lod_foreach_mirror_comp(lod_comp, lo, primary) { + if (!lod_comp_inited(lod_comp)) + break; + + extent.e_end = lod_comp->llc_extent.e_end; + } + + rc = lod_prepare_resync(env, lo, &extent); + if (rc) + GOTO(out, rc); + } else { + /* mirror write, try to init its all components */ + rc = lod_prepare_resync_mirror(env, lo, + mlc->mlc_mirror_id); + if (rc) + GOTO(out, rc); + } + + /* change the file state to SYNC_PENDING */ + lo->ldo_flr_state = LCM_FL_SYNC_PENDING; + } + + rc = lod_declare_instantiate_components(env, lo, th); + if (rc) + GOTO(out, rc); + + /* 3. transfer layout version to OST objects. + * transfer new layout version to OST objects so that stale writes + * can be denied. It also ends an era of writing by setting + * LU_LAYOUT_RESYNC. Normal client can never use this bit to + * send write RPC; only resync RPCs could do it. */ + layout_attr->la_valid = LA_LAYOUT_VERSION; + layout_attr->la_layout_version = 0; /* set current version */ + if (mlc->mlc_opc == MD_LAYOUT_RESYNC) + layout_attr->la_layout_version = LU_LAYOUT_RESYNC; + rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th); + if (rc) + GOTO(out, rc); + + lod_obj_inc_layout_gen(lo); +out: + if (rc) + lod_striping_free(env, lo); + RETURN(rc); +} + +static int lod_declare_update_sync_pending(const struct lu_env *env, + struct lod_object *lo, struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + unsigned sync_components = 0; + unsigned resync_components = 0; + int i; + int rc; + ENTRY; + + LASSERT(lo->ldo_flr_state == LCM_FL_SYNC_PENDING); + LASSERT(mlc->mlc_opc == MD_LAYOUT_RESYNC_DONE || + mlc->mlc_opc == MD_LAYOUT_WRITE); + + CDEBUG(D_LAYOUT, DFID ": received op %d in sync pending\n", + PFID(lod_object_fid(lo)), mlc->mlc_opc); + + if (mlc->mlc_opc == MD_LAYOUT_WRITE) { + CDEBUG(D_LAYOUT, DFID": cocurrent write to sync pending\n", + PFID(lod_object_fid(lo))); + + lo->ldo_flr_state = LCM_FL_WRITE_PENDING; + return lod_declare_update_write_pending(env, lo, mlc, th); + } + + /* MD_LAYOUT_RESYNC_DONE */ + + for (i = 0; i < lo->ldo_comp_cnt; i++) { + struct lod_layout_component *lod_comp; + int j; + + lod_comp = &lo->ldo_comp_entries[i]; + + if (!(lod_comp->llc_flags & LCME_FL_STALE)) { + sync_components++; + continue; + } + + for (j = 0; j < mlc->mlc_resync_count; j++) { + if (lod_comp->llc_id != mlc->mlc_resync_ids[j]) + continue; + + mlc->mlc_resync_ids[j] = LCME_ID_INVAL; + lod_comp->llc_flags &= ~LCME_FL_STALE; + resync_components++; + break; + } + } + + /* valid check */ + for (i = 0; i < mlc->mlc_resync_count; i++) { + if (mlc->mlc_resync_ids[i] == LCME_ID_INVAL) + continue; + + CDEBUG(D_LAYOUT, DFID": lcme id %u (%d / %zd) not exist " + "or already synced\n", PFID(lod_object_fid(lo)), + mlc->mlc_resync_ids[i], i, mlc->mlc_resync_count); + GOTO(out, rc = -EINVAL); + } + + if (!sync_components || (mlc->mlc_resync_count && !resync_components)) { + CDEBUG(D_LAYOUT, DFID": no mirror in sync\n", + PFID(lod_object_fid(lo))); + + /* tend to return an error code here to prevent + * the MDT from setting SoM attribute */ + GOTO(out, rc = -EINVAL); + } + + CDEBUG(D_LAYOUT, DFID": resynced %u/%zu components\n", + PFID(lod_object_fid(lo)), + resync_components, mlc->mlc_resync_count); + + lo->ldo_flr_state = LCM_FL_RDONLY; + lod_obj_inc_layout_gen(lo); + + info->lti_buf.lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_declare_xattr_set(env, lod_object_child(lo), + &info->lti_buf, XATTR_NAME_LOV, 0, th); + EXIT; + +out: + if (rc) + lod_striping_free(env, lo); + RETURN(rc); +} + +static int lod_declare_layout_change(const struct lu_env *env, + struct dt_object *dt, struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + ENTRY; + + if (!S_ISREG(dt->do_lu.lo_header->loh_attr) || !dt_object_exists(dt) || + dt_object_remote(dt_object_child(dt))) + RETURN(-EINVAL); + + rc = lod_striping_load(env, lo); + if (rc) + GOTO(out, rc); + + LASSERT(lo->ldo_comp_cnt > 0); + + rc = lod_layout_data_init(info, lo->ldo_comp_cnt); + if (rc) + GOTO(out, rc); + + switch (lo->ldo_flr_state) { + case LCM_FL_NONE: + rc = lod_declare_update_plain(env, lo, mlc->mlc_intent, + &mlc->mlc_buf, th); + break; + case LCM_FL_RDONLY: + rc = lod_declare_update_rdonly(env, lo, mlc, th); + break; + case LCM_FL_WRITE_PENDING: + rc = lod_declare_update_write_pending(env, lo, mlc, th); + break; + case LCM_FL_SYNC_PENDING: + rc = lod_declare_update_sync_pending(env, lo, mlc, th); + break; + default: + rc = -ENOTSUPP; + break; + } +out: + RETURN(rc); +} + +/** + * Instantiate layout component objects which covers the intent write offset. + */ +static int lod_layout_change(const struct lu_env *env, struct dt_object *dt, + struct md_layout_change *mlc, struct thandle *th) +{ + struct lu_attr *attr = &lod_env_info(env)->lti_attr; + struct lu_attr *layout_attr = &lod_env_info(env)->lti_layout_attr; + struct lod_object *lo = lod_dt_obj(dt); + int rc; + + rc = lod_striped_create(env, dt, attr, NULL, th); + if (!rc && layout_attr->la_valid & LA_LAYOUT_VERSION) { + layout_attr->la_layout_version |= lo->ldo_layout_gen; + rc = lod_attr_set(env, dt, layout_attr, th); + } + + return rc; +} + +struct dt_object_operations lod_obj_ops = { + .do_read_lock = lod_read_lock, + .do_write_lock = lod_write_lock, + .do_read_unlock = lod_read_unlock, + .do_write_unlock = lod_write_unlock, + .do_write_locked = lod_write_locked, + .do_attr_get = lod_attr_get, + .do_declare_attr_set = lod_declare_attr_set, + .do_attr_set = lod_attr_set, + .do_xattr_get = lod_xattr_get, + .do_declare_xattr_set = lod_declare_xattr_set, + .do_xattr_set = lod_xattr_set, + .do_declare_xattr_del = lod_declare_xattr_del, + .do_xattr_del = lod_xattr_del, + .do_xattr_list = lod_xattr_list, + .do_ah_init = lod_ah_init, + .do_declare_create = lod_declare_create, + .do_create = lod_create, + .do_declare_destroy = lod_declare_destroy, + .do_destroy = lod_destroy, + .do_index_try = lod_index_try, + .do_declare_ref_add = lod_declare_ref_add, + .do_ref_add = lod_ref_add, + .do_declare_ref_del = lod_declare_ref_del, + .do_ref_del = lod_ref_del, + .do_object_sync = lod_object_sync, + .do_object_lock = lod_object_lock, + .do_object_unlock = lod_object_unlock, + .do_invalidate = lod_invalidate, + .do_declare_layout_change = lod_declare_layout_change, + .do_layout_change = lod_layout_change, +}; + +/** + * Implementation of dt_body_operations::dbo_read. + * + * \see dt_body_operations::dbo_read() in the API description for details. + */ +static ssize_t lod_read(const struct lu_env *env, struct dt_object *dt, + struct lu_buf *buf, loff_t *pos) +{ + struct dt_object *next = dt_object_child(dt); + + LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr) || + S_ISLNK(dt->do_lu.lo_header->loh_attr)); + return next->do_body_ops->dbo_read(env, next, buf, pos); } /** @@ -5050,11 +6887,11 @@ static ssize_t lod_declare_write(const struct lu_env *env, */ static ssize_t lod_write(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, loff_t *pos, - struct thandle *th, int iq) + struct thandle *th) { LASSERT(S_ISREG(dt->do_lu.lo_header->loh_attr) || S_ISLNK(dt->do_lu.lo_header->loh_attr)); - return lod_sub_write(env, dt_object_child(dt), buf, pos, th, iq); + return lod_sub_write(env, dt_object_child(dt), buf, pos, th); } static int lod_declare_punch(const struct lu_env *env, struct dt_object *dt, @@ -5168,6 +7005,56 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *lo, /** * + * Alloc cached foreign LOV + * + * \param[in] lo object + * \param[in] size size of foreign LOV + * + * \retval 0 on success + * \retval negative if failed + */ +int lod_alloc_foreign_lov(struct lod_object *lo, size_t size) +{ + OBD_ALLOC_LARGE(lo->ldo_foreign_lov, size); + if (lo->ldo_foreign_lov == NULL) + return -ENOMEM; + lo->ldo_foreign_lov_size = size; + lo->ldo_is_foreign = 1; + return 0; +} + +/** + * + * Free cached foreign LOV + * + * \param[in] lo object + */ +void lod_free_foreign_lov(struct lod_object *lo) +{ + if (lo->ldo_foreign_lov != NULL) + OBD_FREE_LARGE(lo->ldo_foreign_lov, lo->ldo_foreign_lov_size); + lo->ldo_foreign_lov = NULL; + lo->ldo_foreign_lov_size = 0; + lo->ldo_is_foreign = 0; +} + +/** + * + * Free cached foreign LMV + * + * \param[in] lo object + */ +void lod_free_foreign_lmv(struct lod_object *lo) +{ + if (lo->ldo_foreign_lmv != NULL) + OBD_FREE_LARGE(lo->ldo_foreign_lmv, lo->ldo_foreign_lmv_size); + lo->ldo_foreign_lmv = NULL; + lo->ldo_foreign_lmv_size = 0; + lo->ldo_dir_is_foreign = 0; +} + +/** + * * Release resources associated with striping. * * If the object is striped (regular or directory), then release @@ -5176,12 +7063,18 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *lo, * \param[in] env execution environment * \param[in] lo object */ -void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo) +void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo) { struct lod_layout_component *lod_comp; int i, j; - if (lo->ldo_stripe != NULL) { + if (unlikely(lo->ldo_is_foreign)) { + lod_free_foreign_lov(lo); + lo->ldo_comp_cached = 0; + } else if (unlikely(lo->ldo_dir_is_foreign)) { + lod_free_foreign_lmv(lo); + lo->ldo_dir_stripe_loaded = 0; + } else if (lo->ldo_stripe != NULL) { LASSERT(lo->ldo_comp_entries == NULL); LASSERT(lo->ldo_dir_stripes_allocated > 0); @@ -5213,6 +7106,10 @@ void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo) sizeof(struct dt_object *) * lod_comp->llc_stripes_allocated); lod_comp->llc_stripe = NULL; + OBD_FREE(lod_comp->llc_ost_indices, + sizeof(__u32) * + lod_comp->llc_stripes_allocated); + lod_comp->llc_ost_indices = NULL; lod_comp->llc_stripes_allocated = 0; } lod_free_comp_entries(lo); @@ -5220,6 +7117,13 @@ void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo) } } +void lod_striping_free(const struct lu_env *env, struct lod_object *lo) +{ + mutex_lock(&lo->ldo_layout_mutex); + lod_striping_free_nolock(env, lo); + mutex_unlock(&lo->ldo_layout_mutex); +} + /** * Implementation of lu_object_operations::loo_object_free. * @@ -5231,7 +7135,7 @@ static void lod_object_free(const struct lu_env *env, struct lu_object *o) struct lod_object *lo = lu2lod_obj(o); /* release all underlying object pinned */ - lod_object_free_striping(env, lo); + lod_striping_free(env, lo); lu_object_fini(o); OBD_SLAB_FREE_PTR(lo, lod_object_kmem); }