X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Flod%2Flod_object.c;h=ea57141597a1ece3d1d6b5f0c6f375e8647d472d;hp=660dac0e7dccc3267c1e6982188d148a1f82108b;hb=HEAD;hpb=f0736a6a52ed95814d2cac875caf34f7fc233bf3 diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index 660dac0..3878d3b 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -344,7 +344,7 @@ static int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di, key_rec); } -static struct dt_index_operations lod_index_ops = { +static const struct dt_index_operations lod_index_ops = { .dio_lookup = lod_lookup, .dio_declare_insert = lod_declare_insert, .dio_insert = lod_insert, @@ -787,7 +787,7 @@ static int lod_striped_it_load(const struct lu_env *env, return next->do_index_ops->dio_it.load(env, it->lit_it, hash); } -static struct dt_index_operations lod_striped_index_ops = { +static const struct dt_index_operations lod_striped_index_ops = { .dio_lookup = lod_striped_lookup, .dio_declare_insert = lod_declare_insert, .dio_insert = lod_insert, @@ -878,7 +878,7 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, memcpy(buf->lb_buf, tbuf.lb_buf, tbuf.lb_len); } - if (unlikely(!dt_try_as_dir(env, obj))) + if (unlikely(!dt_try_as_dir(env, obj, true))) RETURN(-ENOTDIR); memset(&lmv1->lmv_stripe_fids[0], 0, stripes * sizeof(struct lu_fid)); @@ -920,13 +920,12 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, /* The ent->lde_name is composed of ${FID}:${index} */ if (ent->lde_namelen < len + 1 || memcmp(ent->lde_name, name, len) != 0) { - CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO, - "%s: invalid shard name %.*s with the FID "DFID - " for the striped directory "DFID", %s\n", - lod2obd(lod)->obd_name, ent->lde_namelen, - ent->lde_name, PFID(&fid), - PFID(lu_object_fid(&obj->do_lu)), - lod->lod_lmv_failout ? "failout" : "skip"); + CDEBUG_LIMIT(lod->lod_lmv_failout ? 
D_ERROR : D_INFO, + "%s: invalid shard name %.*s with the FID "DFID" for the striped directory "DFID", %s\n", + lod2obd(lod)->obd_name, ent->lde_namelen, + ent->lde_name, PFID(&fid), + PFID(lu_object_fid(&obj->do_lu)), + lod->lod_lmv_failout ? "failout" : "skip"); if (lod->lod_lmv_failout) break; @@ -938,15 +937,15 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, do { if (ent->lde_name[len] < '0' || ent->lde_name[len] > '9') { - CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO, - "%s: invalid shard name %.*s with the " - "FID "DFID" for the striped directory " - DFID", %s\n", - lod2obd(lod)->obd_name, ent->lde_namelen, - ent->lde_name, PFID(&fid), - PFID(lu_object_fid(&obj->do_lu)), - lod->lod_lmv_failout ? - "failout" : "skip"); + CDEBUG_LIMIT(lod->lod_lmv_failout ? + D_ERROR : D_INFO, + "%s: invalid shard name %.*s with the FID "DFID" for the striped directory "DFID", %s\n", + lod2obd(lod)->obd_name, + ent->lde_namelen, + ent->lde_name, PFID(&fid), + PFID(lu_object_fid(&obj->do_lu)), + lod->lod_lmv_failout ? + "failout" : "skip"); if (lod->lod_lmv_failout) break; @@ -972,7 +971,8 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, } /* The slot has been occupied. 
*/ - if (!fid_is_zero(&lmv1->lmv_stripe_fids[index])) { + if (!fid_is_zero(&lmv1->lmv_stripe_fids[index]) && + !CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME)) { struct lu_fid fid0; fid_le_to_cpu(&fid0, @@ -1144,6 +1144,7 @@ void lod_adjust_stripe_size(struct lod_layout_component *comp, } /* check stripe size is multiplier of comp_end */ if (comp_end != LUSTRE_EOF && + comp_end != comp->llc_extent.e_start && comp_end % comp->llc_stripe_size) { /* fix that even for defined stripe size but warn * about the problem, that must not happen @@ -1159,7 +1160,7 @@ static inline void lod_adjust_stripe_info(struct lod_layout_component *comp, struct lov_desc *desc, int append_stripes) { - if (comp->llc_pattern != LOV_PATTERN_MDT) { + if (!(comp->llc_pattern & LOV_PATTERN_MDT)) { if (append_stripes) { comp->llc_stripe_count = append_stripes; } else if (!comp->llc_stripe_count) { @@ -1176,20 +1177,23 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, struct lod_obj_stripe_cb_data *data) { struct lod_layout_component *lod_comp; - int i, j, rc; + int i, j, rc = 0; ENTRY; - LASSERT(lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL); + mutex_lock(&lo->ldo_layout_mutex); for (i = 0; i < lo->ldo_comp_cnt; i++) { lod_comp = &lo->ldo_comp_entries[i]; + if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN) + continue; + if (lod_comp->llc_stripe == NULL) continue; /* has stripe but not inited yet, this component has been * declared to be created, but hasn't created yet. 
*/ - if (!lod_comp_inited(lod_comp)) + if (!lod_comp_inited(lod_comp) && !data->locd_declare) continue; if (data->locd_comp_skip_cb && @@ -1199,7 +1203,7 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, if (data->locd_comp_cb) { rc = data->locd_comp_cb(env, lo, i, data); if (rc) - RETURN(rc); + GOTO(unlock, rc); } /* could used just to do sth about component, not each @@ -1216,62 +1220,12 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, continue; rc = data->locd_stripe_cb(env, lo, dt, th, i, j, data); if (rc != 0) - RETURN(rc); - } - } - RETURN(0); -} - -static bool lod_obj_attr_set_comp_skip_cb(const struct lu_env *env, - struct lod_object *lo, int comp_idx, - struct lod_obj_stripe_cb_data *data) -{ - struct lod_layout_component *lod_comp = &lo->ldo_comp_entries[comp_idx]; - bool skipped = false; - - if (!(data->locd_attr->la_valid & LA_LAYOUT_VERSION)) - return skipped; - - switch (lo->ldo_flr_state) { - case LCM_FL_WRITE_PENDING: { - int i; - - /* skip stale components */ - if (lod_comp->llc_flags & LCME_FL_STALE) { - skipped = true; - break; - } - - /* skip valid and overlapping components, therefore any - * attempts to write overlapped components will never succeed - * because client will get EINPROGRESS. */ - for (i = 0; i < lo->ldo_comp_cnt; i++) { - if (i == comp_idx) - continue; - - if (lo->ldo_comp_entries[i].llc_flags & LCME_FL_STALE) - continue; - - if (lu_extent_is_overlapped(&lod_comp->llc_extent, - &lo->ldo_comp_entries[i].llc_extent)) { - skipped = true; - break; - } + GOTO(unlock, rc); } - break; - } - default: - LASSERTF(0, "impossible: %d\n", lo->ldo_flr_state); - case LCM_FL_SYNC_PENDING: - break; } - - CDEBUG(D_LAYOUT, DFID": %s to set component %x to version: %u\n", - PFID(lu_object_fid(&lo->ldo_obj.do_lu)), - skipped ? 
"skipped" : "chose", lod_comp->llc_id, - data->locd_attr->la_layout_version); - - return skipped; +unlock: + mutex_unlock(&lo->ldo_layout_mutex); + RETURN(rc); } static inline int @@ -1329,7 +1283,7 @@ static int lod_declare_attr_set(const struct lu_env *env, if (!(attr->la_valid & LA_REMOTE_ATTR_SET)) RETURN(rc); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER)) + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER)) RETURN(0); } else { if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID | LA_MODE | @@ -1380,13 +1334,13 @@ static int lod_declare_attr_set(const struct lu_env *env, !S_ISREG(attr->la_mode)) RETURN(0); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE)) { + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE)) { rc = lod_sub_declare_xattr_del(env, next, XATTR_NAME_LOV, th); RETURN(rc); } - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) || - OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PFL_RANGE)) { + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) || + CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PFL_RANGE)) { struct lod_thread_info *info = lod_env_info(env); struct lu_buf *buf = &info->lti_buf; @@ -1428,7 +1382,7 @@ static int lod_attr_set(const struct lu_env *env, if (!(attr->la_valid & LA_REMOTE_ATTR_SET)) RETURN(rc); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER)) + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER)) RETURN(0); } else { if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE | LA_PROJID | @@ -1469,7 +1423,6 @@ static int lod_attr_set(const struct lu_env *env, data.locd_attr = attr; data.locd_declare = false; - data.locd_comp_skip_cb = lod_obj_attr_set_comp_skip_cb; data.locd_stripe_cb = lod_obj_stripe_attr_set_cb; rc = lod_obj_for_each_stripe(env, lo, th, &data); } @@ -1481,12 +1434,12 @@ static int lod_attr_set(const struct lu_env *env, !S_ISREG(attr->la_mode)) RETURN(0); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE)) { + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE)) { rc = lod_sub_xattr_del(env, next, XATTR_NAME_LOV, th); RETURN(rc); } - if 
(OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE)) { + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE)) { struct lod_thread_info *info = lod_env_info(env); struct lu_buf *buf = &info->lti_buf; struct ost_id *oi = &info->lti_ostid; @@ -1524,7 +1477,7 @@ static int lod_attr_set(const struct lu_env *env, rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LOV, LU_XATTR_REPLACE, th); - } else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PFL_RANGE)) { + } else if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PFL_RANGE)) { struct lod_thread_info *info = lod_env_info(env); struct lu_buf *buf = &info->lti_buf; struct lov_comp_md_v1 *lcm; @@ -1864,6 +1817,7 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo, } out: lo->ldo_stripe = stripe; + lo->ldo_is_foreign = 0; lo->ldo_dir_stripe_count = le32_to_cpu(lmv1->lmv_stripe_count); lo->ldo_dir_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count); lo->ldo_dir_layout_version = le32_to_cpu(lmv1->lmv_layout_version); @@ -1925,7 +1879,7 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, slave_lmv_buf.lb_buf = slave_lmm; slave_lmv_buf.lb_len = sizeof(*slave_lmm); - if (!dt_try_as_dir(env, dt_object_child(dt))) + if (!dt_try_as_dir(env, dt_object_child(dt), false)) GOTO(out, rc = -EINVAL); rec->rec_type = S_IFDIR; @@ -1947,7 +1901,7 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, if (rc != 0) GOTO(out, rc); - if (!dt_try_as_dir(env, dto)) + if (!dt_try_as_dir(env, dto, false)) GOTO(out, rc = -EINVAL); rc = lod_sub_declare_ref_add(env, dto, th); @@ -1971,7 +1925,7 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, if (rc != 0) GOTO(out, rc); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && cfs_fail_val == i) snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", @@ -2009,9 +1963,9 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, GOTO(out, rc); } - if 
(!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || + if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || cfs_fail_val != i) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && cfs_fail_val == i) slave_lmm->lmv_master_mdt_index = cpu_to_le32(i + 1); @@ -2093,11 +2047,17 @@ static int lod_mdt_alloc_specific(const struct lu_env *env, bool already_allocated = false; __u32 k; - CDEBUG(D_INFO, "try idx %d, mdt cnt %u, allocated %u\n", - idx, lod->lod_remote_mdt_count + 1, stripe_idx); + CDEBUG(D_INFO, + "try idx %d, mdt cnt %u, allocated %u, specific %d count %hu offset %d hash %#X\n", + idx, lod->lod_remote_mdt_count + 1, stripe_idx, + is_specific, lo->ldo_dir_stripe_count, + (int)lo->ldo_dir_stripe_offset, + lo->ldo_dir_hash_type); if (likely(!is_specific && - !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) { + !CFS_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE) && + !(lo->ldo_dir_hash_type & + LMV_HASH_FLAG_OVERSTRIPED))) { /* check whether the idx already exists * in current allocated array */ for (k = 0; k < stripe_idx; k++) { @@ -2137,6 +2097,9 @@ static int lod_mdt_alloc_specific(const struct lu_env *env, /* this OSP doesn't feel well */ continue; + if (tgt->ltd_statfs.os_state & OS_STATFS_NOCREATE) + continue; + rc = dt_fid_alloc(env, tgt_dt, &fid, NULL, NULL); if (rc < 0) continue; @@ -2162,7 +2125,7 @@ static int lod_mdt_alloc_specific(const struct lu_env *env, * remote MDT, otherwise we may save too many local * slave locks which will exceed RS_MAX_LOCKS. 
*/ - if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) + if (unlikely(CFS_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) idx = master_index; mdt_indices[stripe_idx + 1] = (idx + 1) % (lod->lod_remote_mdt_count + 1); @@ -2173,7 +2136,7 @@ static int lod_mdt_alloc_specific(const struct lu_env *env, LASSERT(fid_is_sane(&fid)); /* fail a remote stripe FID allocation */ - if (stripe_idx && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_FID)) + if (stripe_idx && CFS_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_FID)) continue; dto = dt_locate_at(env, tgt_dt, &fid, @@ -2210,6 +2173,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env, struct dt_object **stripes; struct lu_object_conf conf = { .loc_flags = LOC_F_NEW }; struct lu_fid fid = { 0 }; + int mdt_count = lod->lod_remote_mdt_count + 1; __u32 stripe_count; int i; int rc = 0; @@ -2221,6 +2185,17 @@ static int lod_prep_md_striped_create(const struct lu_env *env, le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC); stripe_count = lo->ldo_dir_stripe_count; + if (!(lo->ldo_dir_hash_type & LMV_HASH_FLAG_OVERSTRIPED) && + stripe_count > mdt_count) + RETURN(-E2BIG); + + if ((lo->ldo_dir_hash_type & LMV_HASH_FLAG_OVERSTRIPED) && + (stripe_count > mdt_count * LMV_MAX_STRIPES_PER_MDT || + /* a single MDT doesn't initialize the infrastructure for striped + * directories, so we just don't support overstriping in that case + */ + mdt_count == 1)) + RETURN(-E2BIG); OBD_ALLOC_PTR_ARRAY(stripes, stripe_count); if (!stripes) @@ -2251,7 +2226,23 @@ static int lod_prep_md_striped_create(const struct lu_env *env, GOTO(out, rc = -ENOMEM); if (le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC) { + int stripes_per_mdt; + int mdt; + is_specific = true; + + /* Verify we do not exceed the stripes per MDT limit */ + for (mdt = 0; mdt < mdt_count + 1; mdt++) { + stripes_per_mdt = 0; + for (i = 0; i < stripe_count; i++) { + if (mdt == le32_to_cpu( + lum->lum_objects[i].lum_mds)) + stripes_per_mdt++; + } + if (stripes_per_mdt > LMV_MAX_STRIPES_PER_MDT) + 
GOTO(out_free, rc = -EINVAL); + } + for (i = 0; i < stripe_count; i++) idx_array[i] = le32_to_cpu(lum->lum_objects[i].lum_mds); @@ -2262,6 +2253,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env, lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id; rc = lod_mdt_alloc_specific(env, lo, stripes, idx_array, is_specific); +out_free: OBD_FREE_PTR_ARRAY(idx_array, stripe_count); } @@ -2296,6 +2288,41 @@ out: /** * + * Alloc cached foreign LOV + * + * \param[in] lo object + * \param[in] size size of foreign LOV + * + * \retval 0 on success + * \retval negative if failed + */ +int lod_alloc_foreign_lov(struct lod_object *lo, size_t size) +{ + OBD_ALLOC_LARGE(lo->ldo_foreign_lov, size); + if (lo->ldo_foreign_lov == NULL) + return -ENOMEM; + lo->ldo_foreign_lov_size = size; + lo->ldo_is_foreign = 1; + return 0; +} + +/** + * + * Free cached foreign LOV + * + * \param[in] lo object + */ +void lod_free_foreign_lov(struct lod_object *lo) +{ + if (lo->ldo_foreign_lov != NULL) + OBD_FREE_LARGE(lo->ldo_foreign_lov, lo->ldo_foreign_lov_size); + lo->ldo_foreign_lov = NULL; + lo->ldo_foreign_lov_size = 0; + lo->ldo_is_foreign = 0; +} + +/** + * * Alloc cached foreign LMV * * \param[in] lo object @@ -2304,17 +2331,56 @@ out: * \retval 0 on success * \retval negative if failed */ -int lod_alloc_foreign_lmv(struct lod_object *lo, size_t size) +static int lod_alloc_foreign_lmv(struct lod_object *lo, size_t size) { OBD_ALLOC_LARGE(lo->ldo_foreign_lmv, size); if (lo->ldo_foreign_lmv == NULL) return -ENOMEM; lo->ldo_foreign_lmv_size = size; - lo->ldo_dir_is_foreign = 1; + lo->ldo_is_foreign = 1; return 0; } +static int lod_prep_md_replayed_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + const struct lu_buf *lmv_buf, + struct dt_object_format *dof, + struct thandle *th) +{ + struct lod_object *lo = lod_dt_obj(dt); + int rc; + + ENTRY; + + mutex_lock(&lo->ldo_layout_mutex); + rc = lod_parse_dir_striping(env, lo, lmv_buf); + if (rc == 0) { + 
lo->ldo_dir_stripe_loaded = 1; + lo->ldo_dir_striped = 1; + rc = lod_dir_declare_create_stripes(env, dt, attr, dof, th); + } + mutex_unlock(&lo->ldo_layout_mutex); + + RETURN(rc); +} + +/** + * + * Free cached foreign LMV + * + * \param[in] lo object + */ +static void lod_free_foreign_lmv(struct lod_object *lo) +{ + if (lo->ldo_foreign_lmv != NULL) + OBD_FREE_LARGE(lo->ldo_foreign_lmv, lo->ldo_foreign_lmv_size); + lo->ldo_foreign_lmv = NULL; + lo->ldo_foreign_lmv_size = 0; + lo->ldo_is_foreign = 0; +} + /** * Declare create striped md object. * @@ -2341,37 +2407,44 @@ static int lod_declare_xattr_set_lmv(const struct lu_env *env, struct dt_object_format *dof, struct thandle *th) { - struct lod_object *lo = lod_dt_obj(dt); - struct lmv_user_md_v1 *lum = lum_buf->lb_buf; - int rc; - ENTRY; + struct lod_object *lo = lod_dt_obj(dt); + struct lmv_user_md_v1 *lum = lum_buf->lb_buf; + int rc; + ENTRY; LASSERT(lum != NULL); - CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n", - le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count), - (int)le32_to_cpu(lum->lum_stripe_offset)); + CDEBUG(D_INFO, + "lum magic=%x hash=%x count=%u offset=%d inherit=%u rr=%u\n", + le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_hash_type), + le32_to_cpu(lum->lum_stripe_count), + (int)le32_to_cpu(lum->lum_stripe_offset), + lum->lum_max_inherit, lum->lum_max_inherit_rr); if (lo->ldo_dir_stripe_count == 0) { - if (lo->ldo_dir_is_foreign) { + if (lo->ldo_is_foreign) { rc = lod_alloc_foreign_lmv(lo, lum_buf->lb_len); if (rc != 0) - GOTO(out, rc); + RETURN(rc); memcpy(lo->ldo_foreign_lmv, lum, lum_buf->lb_len); lo->ldo_dir_stripe_loaded = 1; } - GOTO(out, rc = 0); + RETURN(0); } - /* prepare dir striped objects */ - rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th); - if (rc != 0) { + /* client replay striped directory creation with LMV, this happens when + * all involved MDTs were rebooted, or MDT recovery was aborted. 
+ */ + if (le32_to_cpu(lum->lum_magic) == LMV_MAGIC_V1) + rc = lod_prep_md_replayed_create(env, dt, attr, lum_buf, dof, + th); + else + rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th); + if (rc != 0) /* failed to create striping, let's reset * config so that others don't get confused */ lod_striping_free(env, lo); - GOTO(out, rc); - } -out: + RETURN(rc); } @@ -2381,7 +2454,7 @@ out: * * \param[in] env execution environment * \param[in] dt target object - * \param[in] buf LMV buf which contains source stripe fids + * \param[in] lmv_buf LMV buf which contains source stripe FIDs * \param[in] fl set or replace * \param[in] th transaction handle * @@ -2390,14 +2463,14 @@ out: */ static int lod_dir_layout_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_buf *buf, + const struct lu_buf *lmv_buf, int fl, struct thandle *th) { struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); struct lod_device *lod = lu2lod_dev(lod2lu_obj(lo)->lo_dev); - struct lmv_mds_md_v1 *lmv = buf->lb_buf; + struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf; struct lmv_mds_md_v1 *slave_lmv; struct lu_buf slave_buf; int i; @@ -2409,13 +2482,15 @@ static int lod_dir_layout_set(const struct lu_env *env, RETURN(-EINVAL); /* adjust hash for dir merge, which may not be set in user command */ - if (lmv_is_merging(lmv) && !lmv->lmv_migrate_hash) - lmv->lmv_merge_hash = - lod->lod_mdt_descs.ltd_lmv_desc.ld_pattern; + if (lmv_is_merging(lmv) && + !(lmv->lmv_migrate_hash & LMV_HASH_TYPE_MASK)) + lmv->lmv_merge_hash |= + lod->lod_mdt_descs.ltd_lmv_desc.ld_pattern & + LMV_HASH_TYPE_MASK; LMV_DEBUG(D_INFO, lmv, "set"); - rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LMV, fl, th); + rc = lod_sub_xattr_set(env, next, lmv_buf, XATTR_NAME_LMV, fl, th); if (rc) RETURN(rc); @@ -2480,7 +2555,10 @@ static int lod_dir_declare_xattr_set(const struct lu_env *env, if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { struct lmv_user_md_v1 *lum; - LASSERT(buf != NULL 
&& buf->lb_buf != NULL); + LASSERT(buf != NULL); + if (!buf->lb_buf || buf->lb_len < sizeof(*lum)) + RETURN(-EFAULT); + lum = buf->lb_buf; rc = lod_verify_md_striping(d, lum); if (rc != 0) @@ -2636,23 +2714,31 @@ static int lod_replace_parent_fid(const struct lu_env *env, RETURN(rc); } -__u16 lod_comp_entry_stripe_count(struct lod_object *lo, - int comp_idx, bool is_dir) +__u16 lod_comp_entry_stripe_count(struct lod_object *lo, int comp_idx, + bool is_dir) { struct lod_device *lod = lu2lod_dev(lod2lu_obj(lo)->lo_dev); struct lod_layout_component *entry; + enum lod_uses_hint flags = LOD_USES_ASSIGNED_STRIPE; - if (is_dir) - return 0; + if (is_dir) { + entry = &lo->ldo_def_striping->lds_def_comp_entries[comp_idx]; + return entry->llc_ostlist.op_count; + } entry = &lo->ldo_comp_entries[comp_idx]; if (lod_comp_inited(entry)) return entry->llc_stripe_count; - else if ((__u16)-1 == entry->llc_stripe_count) - return lod->lod_ost_count; - else - return lod_get_stripe_count(lod, lo, comp_idx, - entry->llc_stripe_count, false); + if (entry->llc_stripe_count == LOV_ALL_STRIPES) + return lod_get_stripe_count_plain(lod, lo, + entry->llc_stripe_count, + entry->llc_pattern & + LOV_PATTERN_OVERSTRIPING, + &flags); + + return lod_get_stripe_count(lod, lo, comp_idx, entry->llc_stripe_count, + entry->llc_pattern & LOV_PATTERN_OVERSTRIPING, + &flags); } static int lod_comp_md_size(struct lod_object *lo, bool is_dir) @@ -2687,13 +2773,21 @@ static int lod_comp_md_size(struct lod_object *lo, bool is_dir) for (i = 0; i < comp_cnt; i++) { __u16 stripe_count; - magic = comp_entries[i].llc_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1; - stripe_count = lod_comp_entry_stripe_count(lo, i, is_dir); - if (!is_dir && is_composite) - lod_comp_shrink_stripe_count(&comp_entries[i], - &stripe_count); - - size += lov_user_md_size(stripe_count, magic); + if (comp_entries[i].llc_magic == LOV_MAGIC_FOREIGN) { + size += lov_foreign_md_size(comp_entries[i].llc_length); + } else { + magic = comp_entries[i].llc_pool ? 
LOV_MAGIC_V3 : + LOV_MAGIC_V1; + stripe_count = lod_comp_entry_stripe_count(lo, i, + is_dir); + if (!is_dir && is_composite) + lod_comp_shrink_stripe_count(&comp_entries[i], + &stripe_count); + if (is_dir && comp_entries[i].llc_ostlist.op_count) + magic = LOV_MAGIC_SPECIFIC; + + size += lov_user_md_size(stripe_count, magic); + } LASSERT(size % sizeof(__u64) == 0); } return size; @@ -2723,7 +2817,6 @@ static int lod_declare_layout_add(const struct lu_env *env, struct dt_object *next = dt_object_child(dt); struct lov_desc *desc = &d->lod_ost_descs.ltd_lov_desc; struct lod_object *lo = lod_dt_obj(dt); - struct lov_user_md_v3 *v3; struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; __u32 magic; int i, rc, array_cnt, old_array_cnt; @@ -2747,10 +2840,15 @@ static int lod_declare_layout_add(const struct lu_env *env, if (magic != LOV_USER_MAGIC_COMP_V1) RETURN(-EINVAL); + mutex_lock(&lo->ldo_layout_mutex); + array_cnt = lo->ldo_comp_cnt + comp_v1->lcm_entry_count; OBD_ALLOC_PTR_ARRAY(comp_array, array_cnt); - if (comp_array == NULL) + if (comp_array == NULL) { + mutex_unlock(&lo->ldo_layout_mutex); RETURN(-ENOMEM); + } + memcpy(comp_array, lo->ldo_comp_entries, sizeof(*comp_array) * lo->ldo_comp_cnt); @@ -2769,13 +2867,37 @@ static int lod_declare_layout_add(const struct lu_env *env, lod_comp->llc_stripe_offset = v1->lmm_stripe_offset; lod_comp->llc_flags = comp_v1->lcm_entries[i].lcme_flags; - lod_comp->llc_stripe_count = v1->lmm_stripe_count; lod_comp->llc_stripe_size = v1->lmm_stripe_size; + lod_comp->llc_stripe_count = v1->lmm_stripe_count; + lod_comp->llc_pattern = v1->lmm_pattern; + /** + * limit stripe count so that it's less than/equal to + * extent_size / stripe_size. + * + * Note: extension size reused llc_stripe_size field and + * uninstantiated component could be defined with + * extent_start == extent_end as extension component will + * expand it later. 
+ */ + if (!(lod_comp->llc_flags & LCME_FL_EXTENSION) && + (lod_comp_inited(lod_comp) || + lod_comp->llc_extent.e_start < + lod_comp->llc_extent.e_end) && + lod_comp->llc_stripe_count != LOV_ALL_STRIPES && + ext->e_end != OBD_OBJECT_EOF && + (__u64)(lod_comp->llc_stripe_count * + lod_comp->llc_stripe_size) > + (ext->e_end - ext->e_start)) + lod_comp->llc_stripe_count = + DIV_ROUND_UP(ext->e_end - ext->e_start, + lod_comp->llc_stripe_size); lod_adjust_stripe_info(lod_comp, desc, 0); if (v1->lmm_magic == LOV_USER_MAGIC_V3) { - v3 = (struct lov_user_md_v3 *) v1; - if (v3->lmm_pool_name[0] != '\0') { + struct lov_user_md_v3 *v3 = (typeof(*v3) *) v1; + + if (v3->lmm_pool_name[0] != '\0' && + !lov_pool_is_ignored(v3->lmm_pool_name)) { rc = lod_set_pool(&lod_comp->llc_pool, v3->lmm_pool_name); if (rc) @@ -2807,6 +2929,8 @@ static int lod_declare_layout_add(const struct lu_env *env, LASSERT(lo->ldo_mirror_count == 1); lo->ldo_mirrors[0].lme_end = array_cnt - 1; + mutex_unlock(&lo->ldo_layout_mutex); + RETURN(0); error: @@ -2819,6 +2943,8 @@ error: } } OBD_FREE_PTR_ARRAY(comp_array, array_cnt); + mutex_unlock(&lo->ldo_layout_mutex); + RETURN(rc); } @@ -2914,6 +3040,7 @@ static int lod_declare_layout_set(const struct lu_env *env, RETURN(-EINVAL); } + mutex_lock(&lo->ldo_layout_mutex); for (i = 0; i < comp_v1->lcm_entry_count; i++) { __u32 id = comp_v1->lcm_entries[i].lcme_id; __u32 flags = comp_v1->lcm_entries[i].lcme_flags; @@ -2923,7 +3050,8 @@ static int lod_declare_layout_set(const struct lu_env *env, if (flags & LCME_FL_INIT) { if (changed) - lod_striping_free(env, lo); + lod_striping_free_nolock(env, lo); + mutex_unlock(&lo->ldo_layout_mutex); RETURN(-EINVAL); } @@ -2946,8 +3074,11 @@ static int lod_declare_layout_set(const struct lu_env *env, if (flags) { if ((flags & LCME_FL_STALE) && lod_last_non_stale_mirror(mirror_id, - lo)) + lo)) { + mutex_unlock( + &lo->ldo_layout_mutex); RETURN(-EUCLEAN); + } lod_comp->llc_flags |= flags; } if (mirror_flag) { @@ -2960,6 
+3091,7 @@ static int lod_declare_layout_set(const struct lu_env *env, changed = true; } } + mutex_unlock(&lo->ldo_layout_mutex); if (!changed) { CDEBUG(D_LAYOUT, "%s: requested component(s) not found.\n", @@ -3042,9 +3174,13 @@ static int lod_declare_layout_del(const struct lu_env *env, flags = 0; } + mutex_lock(&lo->ldo_layout_mutex); + left = lo->ldo_comp_cnt; - if (left <= 0) + if (left <= 0) { + mutex_unlock(&lo->ldo_layout_mutex); RETURN(-EINVAL); + } for (i = (lo->ldo_comp_cnt - 1); i >= 0; i--) { struct lod_layout_component *lod_comp; @@ -3061,6 +3197,7 @@ static int lod_declare_layout_del(const struct lu_env *env, if (left != (i + 1)) { CDEBUG(D_LAYOUT, "%s: this deletion will create " "a hole.\n", lod2obd(d)->obd_name); + mutex_unlock(&lo->ldo_layout_mutex); RETURN(-EINVAL); } left--; @@ -3079,8 +3216,10 @@ static int lod_declare_layout_del(const struct lu_env *env, if (obj == NULL) continue; rc = lod_sub_declare_destroy(env, obj, th); - if (rc) + if (rc) { + mutex_unlock(&lo->ldo_layout_mutex); RETURN(rc); + } } } @@ -3088,9 +3227,12 @@ static int lod_declare_layout_del(const struct lu_env *env, if (left == lo->ldo_comp_cnt) { CDEBUG(D_LAYOUT, "%s: requested component id:%#x not found\n", lod2obd(d)->obd_name, id); + mutex_unlock(&lo->ldo_layout_mutex); RETURN(-EINVAL); } + mutex_unlock(&lo->ldo_layout_mutex); + memset(attr, 0, sizeof(*attr)); attr->la_valid = LA_SIZE; rc = lod_sub_declare_attr_set(env, next, attr, th); @@ -3240,16 +3382,18 @@ out: * Merge layouts to form a mirrored file. 
*/ static int lod_declare_layout_merge(const struct lu_env *env, - struct dt_object *dt, const struct lu_buf *mbuf, - struct thandle *th) + struct dt_object *dt, + const struct lu_buf *mbuf, + struct thandle *th) { - struct lod_thread_info *info = lod_env_info(env); - struct lu_buf *buf = &info->lti_buf; - struct lod_object *lo = lod_dt_obj(dt); - struct lov_comp_md_v1 *lcm; - struct lov_comp_md_v1 *cur_lcm; - struct lov_comp_md_v1 *merge_lcm; - struct lov_comp_md_entry_v1 *lcme; + struct lod_thread_info *info = lod_env_info(env); + struct lu_attr *layout_attr = &info->lti_layout_attr; + struct lu_buf *buf = &info->lti_buf; + struct lod_object *lo = lod_dt_obj(dt); + struct lov_comp_md_v1 *lcm; + struct lov_comp_md_v1 *cur_lcm; + struct lov_comp_md_v1 *merge_lcm; + struct lov_comp_md_entry_v1 *lcme; struct lov_mds_md_v1 *lmm; size_t size = 0; size_t offset; @@ -3258,7 +3402,7 @@ static int lod_declare_layout_merge(const struct lu_env *env, __u32 id = 0; __u16 mirror_id = 0; __u32 mirror_count; - int rc, i; + int rc, i; bool merge_has_dom; ENTRY; @@ -3352,7 +3496,7 @@ static int lod_declare_layout_merge(const struct lu_env *env, /* check if first entry in new layout is DOM */ lmm = (struct lov_mds_md_v1 *)((char *)merge_lcm + merge_lcm->lcm_entries[0].lcme_offset); - merge_has_dom = lov_pattern(le32_to_cpu(lmm->lmm_pattern)) == + merge_has_dom = lov_pattern(le32_to_cpu(lmm->lmm_pattern)) & LOV_PATTERN_MDT; for (i = 0; i < merge_entry_count; i++) { @@ -3377,20 +3521,35 @@ static int lod_declare_layout_merge(const struct lu_env *env, } /* fixup layout information */ - lod_obj_inc_layout_gen(lo); - lcm->lcm_layout_gen = cpu_to_le32(lo->ldo_layout_gen); lcm->lcm_size = cpu_to_le32(size); lcm->lcm_entry_count = cpu_to_le16(cur_entry_count + merge_entry_count); lcm->lcm_mirror_count = cpu_to_le16(mirror_count); if ((le16_to_cpu(lcm->lcm_flags) & LCM_FL_FLR_MASK) == LCM_FL_NONE) lcm->lcm_flags = cpu_to_le32(LCM_FL_RDONLY); - rc = lod_striping_reload(env, lo, buf); + rc = 
lod_striping_reload(env, lo, buf, 0); if (rc) GOTO(out, rc); + lod_obj_inc_layout_gen(lo); + lcm->lcm_layout_gen = cpu_to_le32(lo->ldo_layout_gen); + + /* transfer layout version to OST objects. */ + if (lo->ldo_mirror_count > 1) { + struct lod_obj_stripe_cb_data data = { {0} }; + + layout_attr->la_valid = LA_LAYOUT_VERSION; + layout_attr->la_layout_version = 0; + data.locd_attr = layout_attr; + data.locd_declare = true; + data.locd_stripe_cb = lod_obj_stripe_attr_set_cb; + rc = lod_obj_for_each_stripe(env, lo, th, &data); + if (rc) + GOTO(out, rc); + } + rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), buf, - XATTR_NAME_LOV, LU_XATTR_REPLACE, th); + XATTR_NAME_LOV, LU_XATTR_REPLACE, th); out: lu_buf_free(buf); @@ -3404,12 +3563,14 @@ static int lod_declare_layout_split(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *mbuf, struct thandle *th) { + struct lod_thread_info *info = lod_env_info(env); + struct lu_attr *layout_attr = &info->lti_layout_attr; struct lod_object *lo = lod_dt_obj(dt); struct lov_comp_md_v1 *lcm = mbuf->lb_buf; int rc; ENTRY; - rc = lod_striping_reload(env, lo, mbuf); + rc = lod_striping_reload(env, lo, mbuf, LVF_ALL_STALE); if (rc) RETURN(rc); @@ -3417,55 +3578,262 @@ static int lod_declare_layout_split(const struct lu_env *env, /* fix on-disk layout gen */ lcm->lcm_layout_gen = cpu_to_le32(lo->ldo_layout_gen); + /* transfer layout version to OST objects. */ + if (lo->ldo_mirror_count > 1) { + struct lod_obj_stripe_cb_data data = { {0} }; + + layout_attr->la_valid = LA_LAYOUT_VERSION; + layout_attr->la_layout_version = 0; + data.locd_attr = layout_attr; + data.locd_declare = true; + data.locd_stripe_cb = lod_obj_stripe_attr_set_cb; + rc = lod_obj_for_each_stripe(env, lo, th, &data); + if (rc) + RETURN(rc); + } + rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), mbuf, XATTR_NAME_LOV, LU_XATTR_REPLACE, th); RETURN(rc); } -/** - * Implementation of dt_object_operations::do_declare_xattr_set. 
- * - * \see dt_object_operations::do_declare_xattr_set() in the API description - * for details. - * - * the extension to the API: - * - declaring LOVEA requests striping creation - * - LU_XATTR_REPLACE means layout swap - */ -static int lod_declare_xattr_set(const struct lu_env *env, - struct dt_object *dt, - const struct lu_buf *buf, - const char *name, int fl, - struct thandle *th) +static int lod_layout_declare_or_purge_mirror(const struct lu_env *env, + struct dt_object *dt, const struct lu_buf *buf, + struct thandle *th, bool declare) { - struct dt_object *next = dt_object_child(dt); - struct lu_attr *attr = &lod_env_info(env)->lti_attr; - __u32 mode; - int rc; + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_object *lo = lod_dt_obj(dt); + struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; + struct lov_comp_md_entry_v1 *entry; + struct lov_mds_md_v1 *lmm; + struct dt_object **sub_objs = NULL; + int rc = 0, i, k, array_count = 0; + ENTRY; - mode = dt->do_lu.lo_header->loh_attr & S_IFMT; - if ((S_ISREG(mode) || mode == 0) && - !(fl & (LU_XATTR_REPLACE | LU_XATTR_MERGE | LU_XATTR_SPLIT)) && - (strcmp(name, XATTR_NAME_LOV) == 0 || - strcmp(name, XATTR_LUSTRE_LOV) == 0)) { - /* - * this is a request to create object's striping. - * - * allow to declare predefined striping on a new (!mode) object - * which is supposed to be replay of regular file creation - * (when LOV setting is declared) - * - * LU_XATTR_REPLACE is set to indicate a layout swap - */ - if (dt_object_exists(dt)) { - rc = dt_attr_get(env, next, attr); - if (rc) - RETURN(rc); - } else { - memset(attr, 0, sizeof(*attr)); - attr->la_valid = LA_TYPE | LA_MODE; - attr->la_mode = S_IFREG; + /** + * other ops (like lod_declare_destroy) could be destroying sub objects + * as well.
+ */ + mutex_lock(&lo->ldo_layout_mutex); + + if (!declare) { + /* prepare sub-objects array */ + for (i = 0; i < comp_v1->lcm_entry_count; i++) { + entry = &comp_v1->lcm_entries[i]; + + if (!(entry->lcme_flags & LCME_FL_INIT)) + continue; + + lmm = (struct lov_mds_md_v1 *) + ((char *)comp_v1 + entry->lcme_offset); + array_count += lmm->lmm_stripe_count; + } + OBD_ALLOC_PTR_ARRAY(sub_objs, array_count); + if (sub_objs == NULL) { + mutex_unlock(&lo->ldo_layout_mutex); + RETURN(-ENOMEM); + } + } + + k = 0; /* sub_objs index */ + for (i = 0; i < comp_v1->lcm_entry_count; i++) { + struct lov_ost_data_v1 *objs; + struct lu_object *o, *n; + struct dt_object *dto; + struct lu_device *nd; + struct lov_mds_md_v3 *v3; + __u32 idx; + int j; + + entry = &comp_v1->lcm_entries[i]; + + if (!(entry->lcme_flags & LCME_FL_INIT)) + continue; + + lmm = (struct lov_mds_md_v1 *) + ((char *)comp_v1 + entry->lcme_offset); + v3 = (struct lov_mds_md_v3 *)lmm; + if (lmm->lmm_magic == LOV_MAGIC_V3) + objs = &v3->lmm_objects[0]; + else + objs = &lmm->lmm_objects[0]; + + for (j = 0; j < lmm->lmm_stripe_count; j++) { + idx = objs[j].l_ost_idx; + rc = ostid_to_fid(&info->lti_fid, &objs[j].l_ost_oi, + idx); + if (rc) + GOTO(out, rc); + + if (!fid_is_sane(&info->lti_fid)) { + CERROR("%s: sub-object insane fid "DFID"\n", + lod2obd(d)->obd_name, + PFID(&info->lti_fid)); + GOTO(out, rc = -EINVAL); + } + + lod_getref(&d->lod_ost_descs); + + rc = validate_lod_and_idx(d, idx); + if (unlikely(rc)) { + lod_putref(d, &d->lod_ost_descs); + GOTO(out, rc); + } + + nd = &OST_TGT(d, idx)->ltd_tgt->dd_lu_dev; + lod_putref(d, &d->lod_ost_descs); + + o = lu_object_find_at(env, nd, &info->lti_fid, NULL); + if (IS_ERR(o)) + GOTO(out, rc = PTR_ERR(o)); + + n = lu_object_locate(o->lo_header, nd->ld_type); + if (unlikely(!n)) { + lu_object_put(env, o); + GOTO(out, rc = -ENOENT); + } + + dto = container_of(n, struct dt_object, do_lu); + + if (declare) { + rc = lod_sub_declare_destroy(env, dto, th); + dt_object_put(env,
dto); + if (rc) + GOTO(out, rc); + } else { + /** + * collect to-be-destroyed sub objects, the + * reference would be released after actual + * deletion. + */ + sub_objs[k] = dto; + k++; + } + } /* for each stripe */ + } /* for each component in the mirror */ +out: + if (!declare) { + i = 0; + if (!rc) { + /* destroy the sub objects */ + for (; i < k; i++) { + rc = lod_sub_destroy(env, sub_objs[i], th); + if (rc) + break; + dt_object_put(env, sub_objs[i]); + } + } + /** + * if a sub object destroy failed, we'd release sub objects + * reference get from above sub_objs collection. + */ + for (; i < k; i++) + dt_object_put(env, sub_objs[i]); + + OBD_FREE_PTR_ARRAY(sub_objs, array_count); + } + mutex_unlock(&lo->ldo_layout_mutex); + + RETURN(rc); +} + +/** + * Purge layouts, delete sub objects in the mirror stored in the vic_buf, + * and set the LOVEA with the layout from mbuf. + */ +static int lod_declare_layout_purge(const struct lu_env *env, + struct dt_object *dt, const struct lu_buf *buf, + struct thandle *th) +{ + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; + int rc; + + ENTRY; + + if (le32_to_cpu(comp_v1->lcm_magic) != LOV_MAGIC_COMP_V1) { + CERROR("%s: invalid layout magic %#x != %#x\n", + lod2obd(d)->obd_name, le32_to_cpu(comp_v1->lcm_magic), + LOV_MAGIC_COMP_V1); + RETURN(-EINVAL); + } + + if (cpu_to_le32(LOV_MAGIC_COMP_V1) != LOV_MAGIC_COMP_V1) + lustre_swab_lov_comp_md_v1(comp_v1); + + /* from now on, @buf contains cpu endian data */ + + if (comp_v1->lcm_mirror_count != 0) { + CERROR("%s: can only purge one mirror from "DFID"\n", + lod2obd(d)->obd_name, PFID(lu_object_fid(&dt->do_lu))); + RETURN(-EINVAL); + } + + /* declare sub objects deletion in the mirror stored in @buf */ + rc = lod_layout_declare_or_purge_mirror(env, dt, buf, th, true); + RETURN(rc); +} + +/* delete sub objects from the mirror stored in @buf */ +static int lod_layout_purge(const struct lu_env *env, struct dt_object *dt, + const
struct lu_buf *buf, struct thandle *th) +{ + int rc; + + ENTRY; + rc = lod_layout_declare_or_purge_mirror(env, dt, buf, th, false); + RETURN(rc); +} + +/** + * Implementation of dt_object_operations::do_declare_xattr_set. + * + * \see dt_object_operations::do_declare_xattr_set() in the API description + * for details. + * + * the extension to the API: + * - declaring LOVEA requests striping creation + * - LU_XATTR_REPLACE means layout swap + */ +static int lod_declare_xattr_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct dt_object *next = dt_object_child(dt); + struct lu_attr *attr = &info->lti_attr; + struct lod_object *lo = lod_dt_obj(dt); + __u32 mode; + int rc; + ENTRY; + + mode = dt->do_lu.lo_header->loh_attr & S_IFMT; + if ((S_ISREG(mode) || mode == 0) && + !(fl & (LU_XATTR_REPLACE | LU_XATTR_MERGE | LU_XATTR_SPLIT | + LU_XATTR_PURGE)) && + (strcmp(name, XATTR_NAME_LOV) == 0 || + strcmp(name, XATTR_LUSTRE_LOV) == 0)) { + /* + * this is a request to create object's striping. 
+ * + * allow to declare predefined striping on a new (!mode) object + * which is supposed to be replay of regular file creation + * (when LOV setting is declared) + * + * LU_XATTR_REPLACE is set to indicate a layout swap + */ + if (dt_object_exists(dt)) { + rc = dt_attr_get(env, next, attr); + if (rc) + RETURN(rc); + } else { + memset(attr, 0, sizeof(*attr)); + attr->la_valid = LA_TYPE | LA_MODE; + attr->la_mode = S_IFREG; } rc = lod_declare_striped_create(env, dt, attr, buf, th); } else if (fl & LU_XATTR_MERGE) { @@ -3476,6 +3844,10 @@ static int lod_declare_xattr_set(const struct lu_env *env, LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 || strcmp(name, XATTR_LUSTRE_LOV) == 0); rc = lod_declare_layout_split(env, dt, buf, th); + } else if (fl & LU_XATTR_PURGE) { + LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 || + strcmp(name, XATTR_LUSTRE_LOV) == 0); + rc = lod_declare_layout_purge(env, dt, buf, th); } else if (S_ISREG(mode) && strlen(name) >= sizeof(XATTR_LUSTRE_LOV) + 3 && allowed_lustre_lov(name)) { @@ -3495,6 +3867,11 @@ static int lod_declare_xattr_set(const struct lu_env *env, rc = lod_sub_declare_xattr_set(env, next, buf, name, fl, th); } + if (rc == 0 && + (strcmp(name, XATTR_NAME_LOV) == 0 || + strcmp(name, XATTR_LUSTRE_LOV) == 0 || allowed_lustre_lov(name))) + rc = lod_save_layout_gen_intrans(info, lo); + RETURN(rc); } @@ -3569,10 +3946,11 @@ static int lod_xattr_del_internal(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th) { - struct dt_object *next = dt_object_child(dt); - struct lod_object *lo = lod_dt_obj(dt); - int rc; - int i; + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + int i; + int rc; + ENTRY; rc = lod_sub_xattr_del(env, next, name, th); @@ -3583,7 +3961,11 @@ static int lod_xattr_del_internal(const struct lu_env *env, RETURN(rc); for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - LASSERT(lo->ldo_stripe[i]); + if (!lo->ldo_stripe[i]) + continue; + + if 
(!dt_object_exists(lo->ldo_stripe[i])) + continue; rc = lod_sub_xattr_del(env, lo->ldo_stripe[i], name, th); if (rc != 0) @@ -3631,9 +4013,11 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, case LOV_USER_MAGIC_SPECIFIC: case LOV_USER_MAGIC_V3: v3 = buf->lb_buf; - if (v3->lmm_pool_name[0] != '\0') + if (lov_pool_is_reserved(v3->lmm_pool_name)) + memset(v3->lmm_pool_name, 0, sizeof(v3->lmm_pool_name)); + else if (v3->lmm_pool_name[0] != '\0') pool_name = v3->lmm_pool_name; - /* fall through */ + fallthrough; case LOV_USER_MAGIC_V1: /* if { size, offset, count } = { 0, -1, 0 } and no pool * (i.e. all default values specified) then delete default @@ -3684,6 +4068,192 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, RETURN(rc); } +static int lod_get_default_lov_striping(const struct lu_env *env, + struct lod_object *lo, + struct lod_default_striping *lds, + struct dt_allocation_hint *ah); + +/** + * Helper function to convert compound layout to compound layout with + * pool + * + * Copy lcm_entries array of \a src to \a tgt. Replace lov_user_md_v1 + * components of \a src with lov_user_md_v3 using \a pool. 
+ * + * \param[in] src source layout + * \param[in] pool pool to use in \a tgt + * \param[out] tgt target layout + */ +static void embed_pool_to_comp_v1(const struct lov_comp_md_v1 *src, + const char *pool, + struct lov_comp_md_v1 *tgt) +{ + size_t shift; + struct lov_user_md_v1 *lum; + struct lov_user_md_v3 *lum3; + struct lov_comp_md_entry_v1 *entry; + int i; + __u32 offset; + + entry = tgt->lcm_entries; + shift = 0; + for (i = 0; i < le16_to_cpu(src->lcm_entry_count); i++, entry++) { + *entry = src->lcm_entries[i]; + offset = le32_to_cpu(src->lcm_entries[i].lcme_offset); + entry->lcme_offset = cpu_to_le32(offset + shift); + + lum = (struct lov_user_md_v1 *)((char *)src + offset); + lum3 = (struct lov_user_md_v3 *)((char *)tgt + offset + shift); + *(struct lov_user_md_v1 *)lum3 = *lum; + if (lum->lmm_pattern & cpu_to_le32(LOV_PATTERN_MDT)) { + lum3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1); + } else { + lum3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); + entry->lcme_size = cpu_to_le32(sizeof(*lum3)); + strscpy(lum3->lmm_pool_name, pool, + sizeof(lum3->lmm_pool_name)); + shift += sizeof(*lum3) - sizeof(*lum); + } + } +} + +/** + * Set default striping on a directory. + * + * Sets specified striping on a directory object unless it matches the default + * striping (LOVEA_DELETE_VALUES() macro). In the latter case remove existing + * EA. This striping will be used when regular file is being created in this + * directory. + * If current default striping includes a pool but specifed striping + * does not - retain the pool if it exists. 
+ * + * \param[in] env execution environment + * \param[in] dt the striped object + * \param[in] buf buffer with the striping + * \param[in] name name of EA + * \param[in] fl xattr flag (see OSD API description) + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_xattr_set_default_lov_on_dir(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, + struct thandle *th) +{ + struct lod_default_striping *lds = lod_lds_buf_get(env); + struct lov_user_md_v1 *v1 = buf->lb_buf; + char pool[LOV_MAXPOOLNAME + 1]; + bool is_del; + int rc; + + ENTRY; + + /* get existing striping config */ + rc = lod_get_default_lov_striping(env, lod_dt_obj(dt), lds, NULL); + if (rc) + RETURN(rc); + + memset(pool, 0, sizeof(pool)); + if (lds->lds_def_striping_set == 1) + lod_layout_get_pool(lds->lds_def_comp_entries, + lds->lds_def_comp_cnt, pool, + sizeof(pool)); + + is_del = LOVEA_DELETE_VALUES(v1->lmm_stripe_size, + v1->lmm_stripe_count, + v1->lmm_stripe_offset, + NULL); + + /* Retain the pool name if it is not given */ + if (v1->lmm_magic == LOV_USER_MAGIC_V1 && pool[0] != '\0' && + !is_del) { + struct lod_thread_info *info = lod_env_info(env); + struct lov_user_md_v3 *v3 = info->lti_ea_store; + + memset(v3, 0, sizeof(*v3)); + v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); + v3->lmm_pattern = cpu_to_le32(v1->lmm_pattern); + v3->lmm_stripe_count = cpu_to_le32(v1->lmm_stripe_count); + v3->lmm_stripe_offset = cpu_to_le32(v1->lmm_stripe_offset); + v3->lmm_stripe_size = cpu_to_le32(v1->lmm_stripe_size); + + strscpy(v3->lmm_pool_name, pool, sizeof(v3->lmm_pool_name)); + + info->lti_buf.lb_buf = v3; + info->lti_buf.lb_len = sizeof(*v3); + rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf, + name, fl, th); + } else if (v1->lmm_magic == LOV_USER_MAGIC_COMP_V1 && + pool[0] != '\0' && !is_del) { + /* + * try to retain the pool from default layout if the + * specified component 
layout does not provide pool + * info explicitly + */ + struct lod_thread_info *info = lod_env_info(env); + struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; + struct lov_comp_md_v1 *comp_v1p; + struct lov_user_md_v1 *lum; + int entry_count; + int i; + __u32 offset; + struct lov_comp_md_entry_v1 *entry; + int size; + + entry_count = le16_to_cpu(comp_v1->lcm_entry_count); + size = sizeof(*comp_v1) + + entry_count * sizeof(comp_v1->lcm_entries[0]); + entry = comp_v1->lcm_entries; + for (i = 0; i < entry_count; i++, entry++) { + offset = le32_to_cpu(entry->lcme_offset); + lum = (struct lov_user_md_v1 *)((char *)comp_v1 + + offset); + if (le32_to_cpu(lum->lmm_magic) != LOV_USER_MAGIC_V1) + /* the i-th component includes pool info */ + break; + if (lum->lmm_pattern & cpu_to_le32(LOV_PATTERN_MDT)) + size += sizeof(struct lov_user_md_v1); + else + size += sizeof(struct lov_user_md_v3); + } + + if (i == entry_count) { + /* + * re-compose the layout to include the pool for + * each component + */ + if (info->lti_ea_store_size < size) + rc = lod_ea_store_resize(info, size); + + if (rc == 0) { + comp_v1p = info->lti_ea_store; + *comp_v1p = *comp_v1; + comp_v1p->lcm_size = cpu_to_le32(size); + embed_pool_to_comp_v1(comp_v1, pool, comp_v1p); + + info->lti_buf.lb_buf = comp_v1p; + info->lti_buf.lb_len = size; + rc = lod_xattr_set_lov_on_dir(env, dt, + &info->lti_buf, + name, fl, th); + } + } else { + rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, + th); + } + } else { + rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th); + } + + if (lds->lds_def_striping_set == 1 && lds->lds_def_comp_entries != NULL) + lod_free_def_comp_entries(lds); + + RETURN(rc); +} + /** * Set default striping on a directory object. 
* @@ -3748,7 +4318,7 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, * * \param[in] env execution environment * \param[in] dt the striped object - * \param[in] buf not used currently + * \param[in] buf buf lmv_user_md for create, or lmv_mds_md for replay * \param[in] name not used currently * \param[in] fl xattr flag (see OSD API description) * \param[in] th transaction handle @@ -3760,26 +4330,29 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, struct thandle *th) { - struct lod_object *lo = lod_dt_obj(dt); - struct lod_thread_info *info = lod_env_info(env); - struct lu_attr *attr = &info->lti_attr; + struct lod_object *lo = lod_dt_obj(dt); + struct lod_thread_info *info = lod_env_info(env); + struct lu_attr *attr = &info->lti_attr; struct dt_object_format *dof = &info->lti_format; - struct lu_buf lmv_buf; - struct lu_buf slave_lmv_buf; - struct lmv_mds_md_v1 *lmm; - struct lmv_mds_md_v1 *slave_lmm = NULL; - struct dt_insert_rec *rec = &info->lti_dt_rec; - int i; - int rc; - ENTRY; + struct lu_buf lmv_buf; + struct lu_buf slave_lmv_buf; + struct lmv_user_md *lum = buf->lb_buf; + struct lmv_mds_md_v1 *lmm; + struct lmv_mds_md_v1 *slave_lmm = NULL; + struct dt_insert_rec *rec = &info->lti_dt_rec; + int i; + int rc; + ENTRY; + /* lum is used to know whether it's replay */ + LASSERT(lum); if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) RETURN(-ENOTDIR); /* The stripes are supposed to be allocated in declare phase, * if there are no stripes being allocated, it will skip */ if (lo->ldo_dir_stripe_count == 0) { - if (lo->ldo_dir_is_foreign) { + if (lo->ldo_is_foreign) { rc = lod_sub_xattr_set(env, dt_object_child(dt), buf, XATTR_NAME_LMV, fl, th); if (rc != 0) @@ -3792,8 +4365,8 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, if (rc != 0) RETURN(rc); - attr->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | LA_FLAGS | - LA_MODE | LA_UID | 
LA_GID | LA_TYPE | LA_PROJID; + attr->la_valid &= LA_ATIME | LA_MTIME | LA_CTIME | LA_FLAGS | + LA_MODE | LA_UID | LA_GID | LA_TYPE | LA_PROJID; dof->dof_type = DFT_DIR; rc = lod_prep_lmv_md(env, dt, &lmv_buf); @@ -3816,15 +4389,25 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, struct lu_name *sname; struct linkea_data ldata = { NULL }; struct lu_buf linkea_buf; + bool stripe_created = false; /* OBD_FAIL_MDS_STRIPE_FID may leave stripe uninitialized */ if (!dto) continue; /* fail a remote stripe creation */ - if (i && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_CREATE)) + if (i && CFS_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_CREATE)) continue; + /* if it's replay by client request, and stripe exists on remote + * MDT, it means mkdir was partially executed: stripe was + * created on remote MDT successfully, but target not in last + * run. + */ + if (unlikely((le32_to_cpu(lum->lum_magic) == LMV_MAGIC_V1) && + dt_object_exists(dto) && dt_object_remote(dto))) + stripe_created = true; + /* don't create stripe if: * 1. it's source stripe of migrating directory * 2. 
it's existed stripe of splitting directory @@ -3833,7 +4416,7 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, (lod_is_splitting(lo) && i < lo->ldo_dir_split_offset)) { if (!dt_object_exists(dto)) GOTO(out, rc = -EINVAL); - } else { + } else if (!stripe_created) { dt_write_lock(env, dto, DT_TGT_CHILD); rc = lod_sub_create(env, dto, attr, NULL, dof, th); if (rc != 0) { @@ -3854,9 +4437,9 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, GOTO(out, rc); } - if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || + if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || cfs_fail_val != i) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && cfs_fail_val == i) slave_lmm->lmv_master_mdt_index = cpu_to_le32(i + 1); @@ -3879,13 +4462,7 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, lo->ldo_dir_split_offset > i) continue; - rec->rec_fid = lu_object_fid(&dt->do_lu); - rc = lod_sub_insert(env, dto, (struct dt_rec *)rec, - (const struct dt_key *)dotdot, th); - if (rc != 0) - GOTO(out, rc); - - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && cfs_fail_val == i) snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", PFID(lu_object_fid(&dto->do_lu)), i + 1); @@ -3893,18 +4470,27 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", PFID(lu_object_fid(&dto->do_lu)), i); - sname = lod_name_get(env, stripe_name, strlen(stripe_name)); - rc = linkea_links_new(&ldata, &info->lti_linkea_buf, - sname, lu_object_fid(&dt->do_lu)); - if (rc != 0) - GOTO(out, rc); + if (!stripe_created) { + rec->rec_fid = lu_object_fid(&dt->do_lu); + rc = lod_sub_insert(env, dto, (struct dt_rec *)rec, + (const struct dt_key *)dotdot, th); + if (rc != 0) + GOTO(out, rc); - linkea_buf.lb_buf = ldata.ld_buf->lb_buf; - 
linkea_buf.lb_len = ldata.ld_leh->leh_len; - rc = lod_sub_xattr_set(env, dto, &linkea_buf, - XATTR_NAME_LINK, 0, th); - if (rc != 0) - GOTO(out, rc); + sname = lod_name_get(env, stripe_name, + strlen(stripe_name)); + rc = linkea_links_new(&ldata, &info->lti_linkea_buf, + sname, lu_object_fid(&dt->do_lu)); + if (rc != 0) + GOTO(out, rc); + + linkea_buf.lb_buf = ldata.ld_buf->lb_buf; + linkea_buf.lb_len = ldata.ld_leh->leh_len; + rc = lod_sub_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); + if (rc != 0) + GOTO(out, rc); + } rec->rec_fid = lu_object_fid(&dto->do_lu); rc = lod_sub_insert(env, dt_object_child(dt), @@ -3918,7 +4504,7 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, GOTO(out, rc); } - if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MASTER_LMV)) + if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MASTER_LMV)) rc = lod_sub_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV, fl, th); out: @@ -3968,10 +4554,12 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, LASSERT(ergo(lds != NULL, lds->lds_def_striping_set || lds->lds_dir_def_striping_set)); + LASSERT(lmu); if (!LMVEA_DELETE_VALUES(lo->ldo_dir_stripe_count, lo->ldo_dir_stripe_offset)) { - if (!lmu) { + if (!lmu->lb_buf) { + /* mkdir by default LMV */ struct lmv_user_md_v1 *v1 = info->lti_ea_store; int stripe_count = lo->ldo_dir_stripe_count; @@ -4001,30 +4589,29 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, th); if (rc != 0) RETURN(rc); - } else { + } else if (lmu->lb_buf) { /* foreign LMV EA case */ - if (lmu) { + if (declare) { struct lmv_foreign_md *lfm = lmu->lb_buf; - if (lfm->lfm_magic == LMV_MAGIC_FOREIGN) { + if (lfm->lfm_magic == LMV_MAGIC_FOREIGN) rc = lod_declare_xattr_set_lmv(env, dt, attr, lmu, dof, th); - } - } else { - if (lo->ldo_dir_is_foreign) { - LASSERT(lo->ldo_foreign_lmv != NULL && - lo->ldo_foreign_lmv_size > 0); - info->lti_buf.lb_buf = lo->ldo_foreign_lmv; - info->lti_buf.lb_len = 
lo->ldo_foreign_lmv_size; - lmu = &info->lti_buf; - rc = lod_xattr_set_lmv(env, dt, lmu, - XATTR_NAME_LMV, 0, th); - } + } else if (lo->ldo_is_foreign) { + LASSERT(lo->ldo_foreign_lmv != NULL && + lo->ldo_foreign_lmv_size > 0); + info->lti_buf.lb_buf = lo->ldo_foreign_lmv; + info->lti_buf.lb_len = lo->ldo_foreign_lmv_size; + lmu = &info->lti_buf; + rc = lod_xattr_set_lmv(env, dt, lmu, XATTR_NAME_LMV, 0, + th); } } /* Transfer default LMV striping from the parent */ if (lds != NULL && lds->lds_dir_def_striping_set && + lds->lds_dir_def_max_inherit != LMV_INHERIT_END && + lds->lds_dir_def_max_inherit != LMV_INHERIT_NONE && !(LMVEA_DELETE_VALUES(lds->lds_dir_def_stripe_count, lds->lds_dir_def_stripe_offset) && le32_to_cpu(lds->lds_dir_def_hash_type) != @@ -4046,6 +4633,10 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, cpu_to_le32(lds->lds_dir_def_stripe_offset); v1->lum_hash_type = cpu_to_le32(lds->lds_dir_def_hash_type); + v1->lum_max_inherit = + lmv_inherit_next(lds->lds_dir_def_max_inherit); + v1->lum_max_inherit_rr = + lmv_inherit_rr_next(lds->lds_dir_def_max_inherit_rr); info->lti_buf.lb_buf = v1; info->lti_buf.lb_len = sizeof(*v1); @@ -4092,6 +4683,12 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, RETURN(rc); } + /* ldo_def_striping is not allocated, clear after use, in case directory + * layout is changed later. 
+ */ + if (!declare) + lo->ldo_def_striping = NULL; + RETURN(0); } @@ -4109,10 +4706,11 @@ static int lod_declare_dir_striping_create(const struct lu_env *env, static int lod_dir_striping_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, + const struct lu_buf *lmu, struct dt_object_format *dof, struct thandle *th) { - return lod_dir_striping_create_internal(env, dt, attr, NULL, dof, th, + return lod_dir_striping_create_internal(env, dt, attr, lmu, dof, th, false); } @@ -4145,7 +4743,7 @@ static int lod_generate_and_set_lovea(const struct lu_env *env, LASSERT(lo); if (lo->ldo_comp_cnt == 0 && !lo->ldo_is_foreign) { - lod_striping_free(env, lo); + lod_striping_free_nolock(env, lo); rc = lod_sub_xattr_del(env, next, XATTR_NAME_LOV, th); RETURN(rc); } @@ -4344,6 +4942,9 @@ static int lod_layout_del_prep_layout(const struct lu_env *env, continue; } + if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN) + continue; + lod_obj_set_pool(lo, i, NULL); if (lod_comp->llc_ostlist.op_array) { OBD_FREE(lod_comp->llc_ostlist.op_array, @@ -4441,6 +5042,8 @@ static int lod_layout_del(const struct lu_env *env, struct dt_object *dt, LASSERT(lo->ldo_mirror_count == 1); + mutex_lock(&lo->ldo_layout_mutex); + rc = lod_layout_del_prep_layout(env, lo, th); if (rc < 0) GOTO(out, rc); @@ -4468,15 +5071,14 @@ static int lod_layout_del(const struct lu_env *env, struct dt_object *dt, EXIT; out: if (rc) - lod_striping_free(env, lo); + lod_striping_free_nolock(env, lo); + + mutex_unlock(&lo->ldo_layout_mutex); + return rc; } -static int lod_get_default_lov_striping(const struct lu_env *env, - struct lod_object *lo, - struct lod_default_striping *lds, - struct dt_allocation_hint *ah); /** * Implementation of dt_object_operations::do_xattr_set. 
* @@ -4499,8 +5101,12 @@ static int lod_xattr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, struct thandle *th) { + struct lod_thread_info *info = lod_env_info(env); struct dt_object *next = dt_object_child(dt); - int rc; + struct lu_attr *layout_attr = &info->lti_layout_attr; + struct lod_object *lo = lod_dt_obj(dt); + struct lod_obj_stripe_cb_data data = { {0} }; + int rc = 0; ENTRY; @@ -4508,7 +5114,8 @@ static int lod_xattr_set(const struct lu_env *env, !strcmp(name, XATTR_NAME_LMV)) { switch (fl) { case LU_XATTR_CREATE: - rc = lod_dir_striping_create(env, dt, NULL, NULL, th); + rc = lod_dir_striping_create(env, dt, NULL, buf, NULL, + th); break; case 0: case LU_XATTR_REPLACE: @@ -4521,59 +5128,8 @@ static int lod_xattr_set(const struct lu_env *env, RETURN(rc); } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && strcmp(name, XATTR_NAME_LOV) == 0) { - struct lod_default_striping *lds = lod_lds_buf_get(env); - struct lov_user_md_v1 *v1 = buf->lb_buf; - char pool[LOV_MAXPOOLNAME + 1]; - bool is_del; - - /* get existing striping config */ - rc = lod_get_default_lov_striping(env, lod_dt_obj(dt), lds, - NULL); - if (rc) - RETURN(rc); - - memset(pool, 0, sizeof(pool)); - if (lds->lds_def_striping_set == 1) - lod_layout_get_pool(lds->lds_def_comp_entries, - lds->lds_def_comp_cnt, pool, - sizeof(pool)); - - is_del = LOVEA_DELETE_VALUES(v1->lmm_stripe_size, - v1->lmm_stripe_count, - v1->lmm_stripe_offset, - NULL); - - /* Retain the pool name if it is not given */ - if (v1->lmm_magic == LOV_USER_MAGIC_V1 && pool[0] != '\0' && - !is_del) { - struct lod_thread_info *info = lod_env_info(env); - struct lov_user_md_v3 *v3 = info->lti_ea_store; - - memset(v3, 0, sizeof(*v3)); - v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); - v3->lmm_pattern = cpu_to_le32(v1->lmm_pattern); - v3->lmm_stripe_count = - cpu_to_le32(v1->lmm_stripe_count); - v3->lmm_stripe_offset = - cpu_to_le32(v1->lmm_stripe_offset); - v3->lmm_stripe_size 
= cpu_to_le32(v1->lmm_stripe_size); - - strlcpy(v3->lmm_pool_name, pool, - sizeof(v3->lmm_pool_name)); - - info->lti_buf.lb_buf = v3; - info->lti_buf.lb_len = sizeof(*v3); - rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf, - name, fl, th); - } else { - rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, - fl, th); - } - - if (lds->lds_def_striping_set == 1 && - lds->lds_def_comp_entries != NULL) - lod_free_def_comp_entries(lds); - + rc = lod_xattr_set_default_lov_on_dir(env, dt, buf, name, fl, + th); RETURN(rc); } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { @@ -4585,6 +5141,17 @@ static int lod_xattr_set(const struct lu_env *env, (strcmp(name, XATTR_NAME_LOV) == 0 || strcmp(name, XATTR_LUSTRE_LOV) == 0 || allowed_lustre_lov(name))) { + /* layout has been changed by others in the transaction */ + rc = lod_check_layout_gen_intrans(info, lo); + if (rc > 0) { + CDEBUG(D_LAYOUT, + "%s: obj "DFID" gen changed from %d to %d in transaction, retry the transaction\n", + dt->do_lu.lo_dev->ld_obd->obd_name, + PFID(lu_object_fid(&dt->do_lu)), + info->lti_gen[rc - 1], lo->ldo_layout_gen); + RETURN(-EAGAIN); + } + /* in case of lov EA swap, just set it * if not, it is a replay so check striping match what we * already have during req replay, declare_xattr_set() @@ -4594,6 +5161,31 @@ static int lod_xattr_set(const struct lu_env *env, lod_striping_free(env, lod_dt_obj(dt)); rc = lod_sub_xattr_set(env, next, buf, name, fl, th); + } else if (fl & LU_XATTR_SPLIT) { + rc = lod_sub_xattr_set(env, next, buf, name, fl, th); + if (rc) + RETURN(rc); + + rc = lod_striping_reload(env, lo, buf, LVF_ALL_STALE); + if (rc) + RETURN(rc); + + if (lo->ldo_mirror_count > 1 && + layout_attr->la_valid & LA_LAYOUT_VERSION) { + /* mirror split */ + layout_attr->la_layout_version = + lo->ldo_layout_gen; + data.locd_attr = layout_attr; + data.locd_declare = false; + data.locd_stripe_cb = + lod_obj_stripe_attr_set_cb; + rc = 
lod_obj_for_each_stripe(env, lo, th, + &data); + if (rc) + RETURN(rc); + } + } else if (fl & LU_XATTR_PURGE) { + rc = lod_layout_purge(env, dt, buf, th); } else if (dt_object_remote(dt)) { /* This only happens during migration, see * mdd_migrate_create(), in which Master MDT will @@ -4609,7 +5201,7 @@ static int lod_xattr_set(const struct lu_env *env, } else { /* * When 'name' is XATTR_LUSTRE_LOV or XATTR_NAME_LOV, - * it's going to create create file with specified + * it's going to create file with specified * component(s), the striping must have not being * cached in this case; * @@ -4617,11 +5209,29 @@ static int lod_xattr_set(const struct lu_env *env, * an existing file, the striping must have been cached * in this case. */ - LASSERT(equi(!strcmp(name, XATTR_LUSTRE_LOV) || - !strcmp(name, XATTR_NAME_LOV), - !lod_dt_obj(dt)->ldo_comp_cached)); + if (!(fl & LU_XATTR_MERGE)) + LASSERT(equi(!strcmp(name, XATTR_LUSTRE_LOV) || + !strcmp(name, XATTR_NAME_LOV), + !lod_dt_obj(dt)->ldo_comp_cached)); rc = lod_striped_create(env, dt, NULL, NULL, th); + if (rc) + RETURN(rc); + + if (fl & LU_XATTR_MERGE && lo->ldo_mirror_count > 1 && + layout_attr->la_valid & LA_LAYOUT_VERSION) { + /* mirror merge exec phase */ + layout_attr->la_layout_version = + lo->ldo_layout_gen; + data.locd_attr = layout_attr; + data.locd_declare = false; + data.locd_stripe_cb = + lod_obj_stripe_attr_set_cb; + rc = lod_obj_for_each_stripe(env, lo, th, + &data); + if (rc) + RETURN(rc); + } } RETURN(rc); } else if (strcmp(name, XATTR_NAME_FID) == 0) { @@ -4679,6 +5289,9 @@ static int lod_declare_xattr_del(const struct lu_env *env, if (!dto) continue; + if (!dt_object_exists(dto)) + continue; + rc = lod_sub_declare_xattr_del(env, dto, name, th); if (rc != 0) break; @@ -4698,35 +5311,14 @@ static int lod_declare_xattr_del(const struct lu_env *env, static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th) { - struct dt_object *next = dt_object_child(dt); 
- struct lod_object *lo = lod_dt_obj(dt); - int rc; - int i; + int rc; + ENTRY; if (!strcmp(name, XATTR_NAME_LOV) || !strcmp(name, XATTR_NAME_LMV)) lod_striping_free(env, lod_dt_obj(dt)); - rc = lod_sub_xattr_del(env, next, name, th); - if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) - RETURN(rc); - - if (!strcmp(name, XATTR_NAME_LMV)) - RETURN(0); - - if (lo->ldo_dir_stripe_count == 0) - RETURN(0); - - for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - struct dt_object *dto = lo->ldo_stripe[i]; - - if (!dto) - continue; - - rc = lod_sub_xattr_del(env, dto, name, th); - if (rc != 0) - break; - } + rc = lod_xattr_del_internal(env, dt, name, th); RETURN(rc); } @@ -4808,19 +5400,27 @@ skip: static int lod_get_default_lov_striping(const struct lu_env *env, struct lod_object *lo, struct lod_default_striping *lds, - struct dt_allocation_hint *ah) + struct dt_allocation_hint *dah) { struct lod_thread_info *info = lod_env_info(env); struct lov_user_md_v1 *v1 = NULL; struct lov_user_md_v3 *v3 = NULL; - struct lov_comp_md_v1 *comp_v1 = NULL; - __u16 comp_cnt; - __u16 mirror_cnt; - bool composite; + struct lov_comp_md_v1 *lcm = NULL; + __u32 magic; + int append_stripe_count = dah != NULL ? dah->dah_append_stripe_count : 0; + const char *append_pool = (dah != NULL && + dah->dah_append_pool != NULL && + dah->dah_append_pool[0] != '\0') ? 
+ dah->dah_append_pool : NULL; + __u16 entry_count = 1; + __u16 mirror_count = 0; + bool want_composite = false; int rc, i, j; ENTRY; + lds->lds_def_striping_set = 0; + rc = lod_get_lov_ea(env, lo); if (rc < 0) RETURN(rc); @@ -4828,116 +5428,133 @@ static int lod_get_default_lov_striping(const struct lu_env *env, if (rc < (typeof(rc))sizeof(struct lov_user_md)) RETURN(0); - v1 = info->lti_ea_store; - if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) { - lustre_swab_lov_user_md_v1(v1); - } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) { - v3 = (struct lov_user_md_v3 *)v1; - lustre_swab_lov_user_md_v3(v3); - } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_SPECIFIC)) { - v3 = (struct lov_user_md_v3 *)v1; + magic = *(__u32 *)info->lti_ea_store; + if (magic == __swab32(LOV_USER_MAGIC_V1)) { + lustre_swab_lov_user_md_v1(info->lti_ea_store); + } else if (magic == __swab32(LOV_USER_MAGIC_V3)) { + lustre_swab_lov_user_md_v3(info->lti_ea_store); + } else if (magic == __swab32(LOV_USER_MAGIC_SPECIFIC)) { + v3 = (struct lov_user_md_v3 *)info->lti_ea_store; lustre_swab_lov_user_md_v3(v3); lustre_swab_lov_user_md_objects(v3->lmm_objects, v3->lmm_stripe_count); - } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_COMP_V1) || - v1->lmm_magic == __swab32(LOV_USER_MAGIC_SEL)) { - comp_v1 = (struct lov_comp_md_v1 *)v1; - lustre_swab_lov_comp_md_v1(comp_v1); + } else if (magic == __swab32(LOV_USER_MAGIC_COMP_V1) || + magic == __swab32(LOV_USER_MAGIC_SEL)) { + lustre_swab_lov_comp_md_v1(info->lti_ea_store); } - if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1 && - v1->lmm_magic != LOV_MAGIC_COMP_V1 && - v1->lmm_magic != LOV_MAGIC_SEL && - v1->lmm_magic != LOV_USER_MAGIC_SPECIFIC) + switch (magic) { + case LOV_MAGIC_V1: + case LOV_MAGIC_V3: + case LOV_USER_MAGIC_SPECIFIC: + v1 = info->lti_ea_store; + break; + case LOV_MAGIC_COMP_V1: + case LOV_MAGIC_SEL: + lcm = info->lti_ea_store; + entry_count = lcm->lcm_entry_count; + if (entry_count == 0) + 
RETURN(-EINVAL); + + mirror_count = lcm->lcm_mirror_count + 1; + want_composite = true; + break; + default: RETURN(-ENOTSUPP); + } - if ((v1->lmm_magic == LOV_MAGIC_COMP_V1 || - v1->lmm_magic == LOV_MAGIC_SEL) && - !(ah && ah->dah_append_stripes)) { - comp_v1 = (struct lov_comp_md_v1 *)v1; - comp_cnt = comp_v1->lcm_entry_count; - if (comp_cnt == 0) - RETURN(-EINVAL); - mirror_cnt = comp_v1->lcm_mirror_count + 1; - composite = true; - } else { - comp_cnt = 1; - mirror_cnt = 0; - composite = false; + if (append_stripe_count != 0 || append_pool != NULL) { + entry_count = 1; + mirror_count = 0; + want_composite = false; } /* realloc default comp entries if necessary */ - rc = lod_def_striping_comp_resize(lds, comp_cnt); + rc = lod_def_striping_comp_resize(lds, entry_count); if (rc < 0) RETURN(rc); - lds->lds_def_comp_cnt = comp_cnt; - lds->lds_def_striping_is_composite = composite; - lds->lds_def_mirror_cnt = mirror_cnt; + lds->lds_def_comp_cnt = entry_count; + lds->lds_def_striping_is_composite = want_composite; + lds->lds_def_mirror_cnt = mirror_count; - for (i = 0; i < comp_cnt; i++) { - struct lod_layout_component *lod_comp; - char *pool; + for (i = 0; i < entry_count; i++) { + struct lod_layout_component *llc = &lds->lds_def_comp_entries[i]; + const char *pool; - lod_comp = &lds->lds_def_comp_entries[i]; /* - * reset lod_comp values, llc_stripes is always NULL in - * the default striping template, llc_pool will be reset - * later below. + * reset llc values, llc_stripes is always NULL in the + * default striping template, llc_pool will be reset + * later below using lod_set_pool(). + * + * XXX At this point llc_pool may point to valid (!) + * kmalloced strings from previous RPCs. 
*/ - memset(lod_comp, 0, offsetof(typeof(*lod_comp), llc_pool)); - - if (composite) { - v1 = (struct lov_user_md *)((char *)comp_v1 + - comp_v1->lcm_entries[i].lcme_offset); - lod_comp->llc_extent = - comp_v1->lcm_entries[i].lcme_extent; - /* We only inherit certain flags from the layout */ - lod_comp->llc_flags = - comp_v1->lcm_entries[i].lcme_flags & + memset(llc, 0, offsetof(typeof(*llc), llc_pool)); + + if (lcm != NULL) { + v1 = (struct lov_user_md *)((char *)lcm + + lcm->lcm_entries[i].lcme_offset); + + if (want_composite) { + llc->llc_extent = lcm->lcm_entries[i].lcme_extent; + /* We only inherit certain flags from the layout */ + llc->llc_flags = lcm->lcm_entries[i].lcme_flags & LCME_TEMPLATE_FLAGS; + } } + CDEBUG(D_LAYOUT, DFID" magic = %#08x, pattern = %#x, stripe_count = %hu, stripe_size = %u, stripe_offset = %hu, append_pool = '%s', append_stripe_count = %d\n", + PFID(lu_object_fid(&lo->ldo_obj.do_lu)), + v1->lmm_magic, + v1->lmm_pattern, + v1->lmm_stripe_count, + v1->lmm_stripe_size, + v1->lmm_stripe_offset, + append_pool ?: "", + append_stripe_count); + if (!lov_pattern_supported(v1->lmm_pattern) && !(v1->lmm_pattern & LOV_PATTERN_F_RELEASED)) { lod_free_def_comp_entries(lds); RETURN(-EINVAL); } - CDEBUG(D_LAYOUT, DFID" stripe_count=%d stripe_size=%d stripe_offset=%d append_stripes=%d\n", - PFID(lu_object_fid(&lo->ldo_obj.do_lu)), - (int)v1->lmm_stripe_count, (int)v1->lmm_stripe_size, - (int)v1->lmm_stripe_offset, - ah ? 
ah->dah_append_stripes : 0); + llc->llc_stripe_count = v1->lmm_stripe_count; + llc->llc_stripe_size = v1->lmm_stripe_size; + llc->llc_stripe_offset = v1->lmm_stripe_offset; + llc->llc_pattern = v1->lmm_pattern; - if (ah && ah->dah_append_stripes) - lod_comp->llc_stripe_count = ah->dah_append_stripes; - else - lod_comp->llc_stripe_count = v1->lmm_stripe_count; - lod_comp->llc_stripe_size = v1->lmm_stripe_size; - lod_comp->llc_stripe_offset = v1->lmm_stripe_offset; - lod_comp->llc_pattern = v1->lmm_pattern; + if (append_stripe_count != 0 || append_pool != NULL) + llc->llc_pattern = LOV_PATTERN_RAID0; + + if (append_stripe_count != 0) + llc->llc_stripe_count = append_stripe_count; pool = NULL; - if (ah && ah->dah_append_pool && ah->dah_append_pool[0]) { - pool = ah->dah_append_pool; + if (append_pool != NULL) { + pool = append_pool; } else if (v1->lmm_magic == LOV_USER_MAGIC_V3) { /* XXX: sanity check here */ - v3 = (struct lov_user_md_v3 *) v1; + v3 = (struct lov_user_md_v3 *)v1; if (v3->lmm_pool_name[0] != '\0') pool = v3->lmm_pool_name; } - lod_set_def_pool(lds, i, pool); - if (v1->lmm_magic == LOV_USER_MAGIC_SPECIFIC) { + + lod_set_pool(&llc->llc_pool, pool); + + if (v1->lmm_magic == LOV_USER_MAGIC_SPECIFIC && + append_stripe_count == 0 && + append_pool == NULL) { v3 = (struct lov_user_md_v3 *)v1; - rc = lod_comp_copy_ost_lists(lod_comp, v3); + rc = lod_comp_copy_ost_lists(llc, v3); if (rc) RETURN(rc); - } else if (lod_comp->llc_ostlist.op_array && - lod_comp->llc_ostlist.op_count) { - for (j = 0; j < lod_comp->llc_ostlist.op_count; j++) - lod_comp->llc_ostlist.op_array[j] = -1; - lod_comp->llc_ostlist.op_count = 0; + } else if (llc->llc_ostlist.op_array && + llc->llc_ostlist.op_count) { + for (j = 0; j < llc->llc_ostlist.op_count; j++) + llc->llc_ostlist.op_array[j] = -1; + llc->llc_ostlist.op_count = 0; } } @@ -4945,6 +5562,17 @@ static int lod_get_default_lov_striping(const struct lu_env *env, RETURN(rc); } +static inline void lod_lum2lds(struct 
lod_default_striping *lds, + const struct lmv_user_md *lum) +{ + lds->lds_dir_def_stripe_count = le32_to_cpu(lum->lum_stripe_count); + lds->lds_dir_def_stripe_offset = le32_to_cpu(lum->lum_stripe_offset); + lds->lds_dir_def_hash_type = le32_to_cpu(lum->lum_hash_type); + lds->lds_dir_def_max_inherit = lum->lum_max_inherit; + lds->lds_dir_def_max_inherit_rr = lum->lum_max_inherit_rr; + lds->lds_dir_def_striping_set = 1; +} + /** * Get default directory striping. * @@ -4972,14 +5600,7 @@ static int lod_get_default_lmv_striping(const struct lu_env *env, struct lod_thread_info *info = lod_env_info(env); lmu = info->lti_ea_store; - - lds->lds_dir_def_stripe_count = - le32_to_cpu(lmu->lum_stripe_count); - lds->lds_dir_def_stripe_offset = - le32_to_cpu(lmu->lum_stripe_offset); - lds->lds_dir_def_hash_type = - le32_to_cpu(lmu->lum_hash_type); - lds->lds_dir_def_striping_set = 1; + lod_lum2lds(lds, lmu); } return 0; @@ -4999,14 +5620,30 @@ static int lod_get_default_lmv_striping(const struct lu_env *env, */ static int lod_get_default_striping(const struct lu_env *env, struct lod_object *lo, + struct dt_allocation_hint *ah, struct lod_default_striping *lds) { int rc, rc1; rc = lod_get_default_lov_striping(env, lo, lds, NULL); - rc1 = lod_get_default_lmv_striping(env, lo, lds); - if (rc == 0 && rc1 < 0) - rc = rc1; + if (lds->lds_def_striping_set) { + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *d = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + + rc = lod_verify_striping(env, d, lo, &info->lti_buf, false); + if (rc) + lds->lds_def_striping_set = 0; + } + + if (ah->dah_eadata_is_dmv) { + lod_lum2lds(lds, ah->dah_eadata); + } else if (ah->dah_dmv_imp_inherit) { + lds->lds_dir_def_striping_set = 0; + } else { + rc1 = lod_get_default_lmv_striping(env, lo, lds); + if (rc == 0 && rc1 < 0) + rc = rc1; + } return rc; } @@ -5046,8 +5683,9 @@ static void lod_striping_from_default(struct lod_object *lo, struct lod_layout_component *def_comp = 
&lds->lds_def_comp_entries[i]; - CDEBUG(D_LAYOUT, "Inherit from default: flags=%#x " - "size=%hu nr=%u offset=%u pattern=%#x pool=%s\n", + CDEBUG(D_LAYOUT, + "inherit "DFID" file layout from default: flags=%#x size=%u nr=%u offset=%u pattern=%#x pool=%s\n", + PFID(lu_object_fid(&lo->ldo_obj.do_lu)), def_comp->llc_flags, def_comp->llc_stripe_size, def_comp->llc_stripe_count, @@ -5096,18 +5734,19 @@ static void lod_striping_from_default(struct lod_object *lo, if (lo->ldo_dir_stripe_offset == -1) lo->ldo_dir_stripe_offset = lds->lds_dir_def_stripe_offset; - if (lo->ldo_dir_hash_type == 0) + if (lo->ldo_dir_hash_type == LMV_HASH_TYPE_UNKNOWN) lo->ldo_dir_hash_type = lds->lds_dir_def_hash_type; - CDEBUG(D_LAYOUT, "striping from default dir: count:%hu, " - "offset:%u, hash_type:%u\n", + CDEBUG(D_LAYOUT, + "inherit "DFID" dir layout from default: count=%hu offset=%u hash_type=%x\n", + PFID(lu_object_fid(&lo->ldo_obj.do_lu)), lo->ldo_dir_stripe_count, lo->ldo_dir_stripe_offset, lo->ldo_dir_hash_type); } } static inline bool lod_need_inherit_more(struct lod_object *lo, bool from_root, - char *append_pool) + const char *append_pool) { struct lod_layout_component *lod_comp; @@ -5139,8 +5778,8 @@ static inline bool lod_need_inherit_more(struct lod_object *lo, bool from_root, * This method is used to make a decision on the striping configuration for the * object being created. It can be taken from the \a parent object if it exists, * or filesystem's default. The resulting configuration (number of stripes, - * stripe size/offset, pool name, etc) is stored in the object itself and will - * be used by the methods like ->doo_declare_create(). + * stripe size/offset, pool name, hash_type, etc.) is stored in the object + * itself and will be used by the methods like ->doo_declare_create(). * * \see dt_object_operations::do_ah_init() in the API description for details. 
*/ @@ -5164,8 +5803,8 @@ static void lod_ah_init(const struct lu_env *env, LASSERT(child); - if (ah->dah_append_stripes == -1) - ah->dah_append_stripes = + if (ah->dah_append_stripe_count == -1) + ah->dah_append_stripe_count = d->lod_ost_descs.ltd_lov_desc.ld_tgt_count; if (likely(parent)) { @@ -5187,43 +5826,31 @@ static void lod_ah_init(const struct lu_env *env, if (S_ISDIR(child_mode)) { const struct lmv_user_md_v1 *lum1 = ah->dah_eadata; + int max_stripe_count; /* other default values are 0 */ - lc->ldo_dir_stripe_offset = -1; + lc->ldo_dir_stripe_offset = LMV_OFFSET_DEFAULT; /* no default striping configuration is needed for * foreign dirs */ if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0 && le32_to_cpu(lum1->lum_magic) == LMV_MAGIC_FOREIGN) { - lc->ldo_dir_is_foreign = true; + lc->ldo_is_foreign = true; /* keep stripe_count 0 and stripe_offset -1 */ CDEBUG(D_INFO, "no default striping for foreign dir\n"); RETURN_EXIT; } - /* - * If parent object is not root directory, - * then get default striping from parent object. - */ - if (likely(lp != NULL)) { - lod_get_default_striping(env, lp, lds); - - /* inherit default striping except ROOT */ - if ((lds->lds_def_striping_set || - lds->lds_dir_def_striping_set) && - !fid_is_root(lod_object_fid(lp))) - lc->ldo_def_striping = lds; - } + if (likely(lp != NULL)) + lod_get_default_striping(env, lp, ah, lds); /* It should always honour the specified stripes */ - /* Note: old client (< 2.7)might also do lfs mkdir, whose EA - * will have old magic. In this case, we should ignore the - * stripe count and try to create dir by default stripe. 
- */ - if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0 && + if (ah->dah_eadata && ah->dah_eadata_len && + !ah->dah_eadata_is_dmv && (le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC || - le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC_SPECIFIC)) { + le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC_SPECIFIC || + le32_to_cpu(lum1->lum_magic) == LMV_MAGIC_V1)) { lc->ldo_dir_stripe_count = le32_to_cpu(lum1->lum_stripe_count); lc->ldo_dir_stripe_offset = @@ -5231,32 +5858,111 @@ static void lod_ah_init(const struct lu_env *env, lc->ldo_dir_hash_type = le32_to_cpu(lum1->lum_hash_type); CDEBUG(D_INFO, - "set dirstripe: count %hu, offset %d, hash %u\n", + "set dirstripe: count %hu, offset %d, hash %x\n", lc->ldo_dir_stripe_count, (int)lc->ldo_dir_stripe_offset, lc->ldo_dir_hash_type); + + if (d->lod_mdt_descs.ltd_lmv_desc.ld_active_tgt_count && + lc->ldo_dir_stripe_count < 2 && + lum1->lum_max_inherit != LMV_INHERIT_NONE) { + /* when filesystem-wide default LMV is set, dirs + * will be created on MDT by space usage, but if + * dir is created with "lfs mkdir -c 1 ...", its + * subdirs should be kept on the same MDT. To + * guarantee this, set default LMV for such dir. + */ + lds->lds_dir_def_stripe_count = + le32_to_cpu(lum1->lum_stripe_count); + /* if "-1" stripe offset is set, save current + * MDT index in default LMV. 
+ */ + if (le32_to_cpu(lum1->lum_stripe_offset) == + LMV_OFFSET_DEFAULT) + lds->lds_dir_def_stripe_offset = + lod2lu_dev(d)->ld_site->ld_seq_site->ss_node_id; + else + lds->lds_dir_def_stripe_offset = + le32_to_cpu(lum1->lum_stripe_offset); + lds->lds_dir_def_hash_type = + le32_to_cpu(lum1->lum_hash_type); + lds->lds_dir_def_max_inherit = + lum1->lum_max_inherit; + /* it will be decreased by 1 later in setting */ + if (lum1->lum_max_inherit >= LMV_INHERIT_END && + lum1->lum_max_inherit < LMV_INHERIT_MAX) + lds->lds_dir_def_max_inherit++; + lds->lds_dir_def_max_inherit_rr = + lum1->lum_max_inherit_rr; + lds->lds_dir_def_striping_set = 1; + /* don't inherit LOV from ROOT */ + if (lds->lds_def_striping_set && + fid_is_root(lod_object_fid(lp))) + lds->lds_def_striping_set = 0; + lc->ldo_def_striping = lds; + } else if (lds->lds_def_striping_set && + !fid_is_root(lod_object_fid(lp))) { + /* don't inherit default LMV for "lfs mkdir" */ + lds->lds_dir_def_striping_set = 0; + lc->ldo_def_striping = lds; + } } else { + /* inherit default striping except ROOT */ + if ((lds->lds_def_striping_set || + lds->lds_dir_def_striping_set) && + !fid_is_root(lod_object_fid(lp))) + lc->ldo_def_striping = lds; + /* transfer defaults LMV to new directory */ lod_striping_from_default(lc, lds, child_mode); /* set count 0 to create normal directory */ if (lc->ldo_dir_stripe_count == 1) lc->ldo_dir_stripe_count = 0; + + /* do not save default LMV on server */ + if (ah->dah_dmv_imp_inherit) { + lds->lds_dir_def_striping_set = 0; + if (!lds->lds_def_striping_set) + lc->ldo_def_striping = NULL; + } } - /* shrink the stripe_count to the avaible MDT count */ - if (lc->ldo_dir_stripe_count > d->lod_remote_mdt_count + 1 && - !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) { - lc->ldo_dir_stripe_count = d->lod_remote_mdt_count + 1; + /* shrink the stripe count to max_mdt_stripecount if it is -1 + * and max_mdt_stripecount is not 0 + */ + if (lc->ldo_dir_stripe_count == (__u16)(-1) && + 
d->lod_max_mdt_stripecount) + lc->ldo_dir_stripe_count = d->lod_max_mdt_stripecount; + + max_stripe_count = d->lod_remote_mdt_count + 1; + if (lc->ldo_dir_hash_type & LMV_HASH_FLAG_OVERSTRIPED) + max_stripe_count = + max_stripe_count * LMV_MAX_STRIPES_PER_MDT; + + /* shrink the stripe_count to max stripe count */ + if (lc->ldo_dir_stripe_count > max_stripe_count && + !CFS_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) { + lc->ldo_dir_stripe_count = max_stripe_count; if (lc->ldo_dir_stripe_count == 1) lc->ldo_dir_stripe_count = 0; } - if (!(lc->ldo_dir_hash_type & LMV_HASH_TYPE_MASK)) - lc->ldo_dir_hash_type |= + if (!lmv_is_known_hash_type(lc->ldo_dir_hash_type)) + lc->ldo_dir_hash_type = + (lc->ldo_dir_hash_type & LMV_HASH_FLAG_KNOWN) | d->lod_mdt_descs.ltd_lmv_desc.ld_pattern; - CDEBUG(D_INFO, "final dir stripe [%hu %d %u]\n", + /* make sure all fscrypt metadata stays on same mdt */ + if (child->do_lu.lo_header->loh_attr & LOHA_FSCRYPT_MD) { + lc->ldo_dir_stripe_count = 0; + lds->lds_dir_def_stripe_offset = + lod2lu_dev(d)->ld_site->ld_seq_site->ss_node_id; + lds->lds_dir_def_striping_set = 1; + lc->ldo_def_striping = lds; + } + + CDEBUG(D_INFO, "final dir stripe_count=%hu offset=%d hash=%x\n", lc->ldo_dir_stripe_count, (int)lc->ldo_dir_stripe_offset, lc->ldo_dir_hash_type); @@ -5277,8 +5983,12 @@ static void lod_ah_init(const struct lu_env *env, */ if (likely(lp != NULL)) { rc = lod_get_default_lov_striping(env, lp, lds, ah); - if (rc == 0) - lod_striping_from_default(lc, lds, child_mode); + if (rc == 0 && lds->lds_def_striping_set) { + rc = lod_verify_striping(env, d, lp, &info->lti_buf, + false); + if (rc == 0) + lod_striping_from_default(lc, lds, child_mode); + } } /* Initialize lod_device::lod_md_root object reference */ @@ -5308,8 +6018,14 @@ static void lod_ah_init(const struct lu_env *env, lod_need_inherit_more(lc, true, ah->dah_append_pool)) { rc = lod_get_default_lov_striping(env, d->lod_md_root, lds, ah); + if (rc || !lds->lds_def_striping_set) + goto out; + + rc 
= lod_verify_striping(env, d, d->lod_md_root, &info->lti_buf, + false); if (rc) goto out; + if (lc->ldo_comp_cnt == 0) { lod_striping_from_default(lc, lds, child_mode); } else if (!lds->lds_def_striping_is_composite) { @@ -5330,7 +6046,7 @@ static void lod_ah_init(const struct lu_env *env, lod_comp->llc_stripe_offset = def_comp->llc_stripe_offset; if (lod_comp->llc_pool == NULL) - lod_obj_set_pool(lc, 0, def_comp->llc_pool); + lod_qos_set_pool(lc, 0, def_comp->llc_pool); } } out: @@ -5352,9 +6068,10 @@ out: LASSERT(!lc->ldo_is_composite); lod_comp = &lc->ldo_comp_entries[0]; desc = &d->lod_ost_descs.ltd_lov_desc; - lod_adjust_stripe_info(lod_comp, desc, ah->dah_append_stripes); + lod_adjust_stripe_info(lod_comp, desc, + ah->dah_append_stripe_count); if (ah->dah_append_pool && ah->dah_append_pool[0]) - lod_obj_set_pool(lc, 0, ah->dah_append_pool); + lod_qos_set_pool(lc, 0, ah->dah_append_pool); } EXIT; @@ -5471,7 +6188,7 @@ int lod_declare_striped_create(const struct lu_env *env, struct dt_object *dt, int rc; ENTRY; - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) GOTO(out, rc = -ENOMEM); if (!dt_object_remote(next)) { @@ -5604,7 +6321,6 @@ static int lod_declare_create(const struct lu_env *env, struct dt_object *dt, } else if (dof->dof_type == DFT_DIR) { struct seq_server_site *ss; struct lu_buf buf = { NULL }; - struct lu_buf *lmu = NULL; ss = lu_site2seq(dt->do_lu.lo_dev->ld_site); @@ -5618,27 +6334,11 @@ static int lod_declare_create(const struct lu_env *env, struct dt_object *dt, * striped directory with specified stripeEA, then it * should ignore the default stripeEA */ if (hint != NULL && hint->dah_eadata == NULL) { - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STALE_DIR_LAYOUT)) + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_STALE_DIR_LAYOUT)) GOTO(out, rc = -EREMOTE); - if (lo->ldo_dir_stripe_offset == LMV_OFFSET_DEFAULT) { - struct lod_default_striping *lds; - - lds = lo->ldo_def_striping; - /* - * child and parent should be on the same 
MDT, - * but if parent has default LMV, and the start - * MDT offset is -1, it's allowed. This check - * is not necessary after 2.12.22 because client - * follows this already, but old client may not. - */ - if (hint->dah_parent && - dt_object_remote(hint->dah_parent) && lds && - lds->lds_dir_def_stripe_offset != - LMV_OFFSET_DEFAULT) - GOTO(out, rc = -EREMOTE); - } else if (lo->ldo_dir_stripe_offset != - ss->ss_node_id) { + if (lo->ldo_dir_stripe_offset != LMV_OFFSET_DEFAULT && + lo->ldo_dir_stripe_offset != ss->ss_node_id) { struct lod_device *lod; struct lu_tgt_desc *mdt = NULL; bool found_mdt = false; @@ -5662,12 +6362,11 @@ static int lod_declare_create(const struct lu_env *env, struct dt_object *dt, GOTO(out, rc = -EINVAL); } } else if (hint && hint->dah_eadata) { - lmu = &buf; - lmu->lb_buf = (void *)hint->dah_eadata; - lmu->lb_len = hint->dah_eadata_len; + buf.lb_buf = (void *)hint->dah_eadata; + buf.lb_len = hint->dah_eadata_len; } - rc = lod_declare_dir_striping_create(env, dt, attr, lmu, dof, + rc = lod_declare_dir_striping_create(env, dt, attr, &buf, dof, th); } out: @@ -5715,10 +6414,10 @@ again: if (i == lo->ldo_comp_cnt) RETURN(pflr_id(mirror_id, id)); } - if (end == LCME_ID_MAX) { + + if (end == SEQ_ID_MAX) { + end = min_t(__u32, start, SEQ_ID_MAX) - 1; start = 1; - end = min(lo->ldo_layout_gen & LCME_ID_MASK, - (__u32)(LCME_ID_MAX - 1)); goto again; } @@ -5754,6 +6453,8 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt, int rc = 0, i, j; ENTRY; + mutex_lock(&lo->ldo_layout_mutex); + LASSERT((lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL) || lo->ldo_is_foreign); @@ -5788,10 +6489,15 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt, if (lod_comp_inited(lod_comp)) continue; + if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN) { + lod_comp_set_init(lod_comp); + continue; + } + if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED) lod_comp_set_init(lod_comp); - if (lov_pattern(lod_comp->llc_pattern) == 
LOV_PATTERN_MDT) + if (lov_pattern(lod_comp->llc_pattern) & LOV_PATTERN_MDT) lod_comp_set_init(lod_comp); if (lod_comp->llc_stripe == NULL) @@ -5812,15 +6518,20 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt, if (rc) GOTO(out, rc); + lo->ldo_comp_cached = 1; + rc = lod_generate_and_set_lovea(env, lo, th); if (rc) GOTO(out, rc); - lo->ldo_comp_cached = 1; + mutex_unlock(&lo->ldo_layout_mutex); + RETURN(0); out: - lod_striping_free(env, lo); + lod_striping_free_nolock(env, lo); + mutex_unlock(&lo->ldo_layout_mutex); + RETURN(rc); } @@ -5837,7 +6548,7 @@ static inline bool lod_obj_is_dom(struct dt_object *dt) if (!lo->ldo_comp_cnt) return false; - return (lov_pattern(lo->ldo_comp_entries[0].llc_pattern) == + return (lov_pattern(lo->ldo_comp_entries[0].llc_pattern) & LOV_PATTERN_MDT); } @@ -5880,11 +6591,12 @@ lod_obj_stripe_destroy_cb(const struct lu_env *env, struct lod_object *lo, { if (data->locd_declare) return lod_sub_declare_destroy(env, dt, th); - else if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || - stripe_idx == cfs_fail_val) + + if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || + stripe_idx == cfs_fail_val) return lod_sub_destroy(env, dt, th); - else - return 0; + + return 0; } /** @@ -5952,8 +6664,8 @@ static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt, if (rc) RETURN(rc); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ) || - OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ2)) + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ) || + CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ2)) RETURN(0); if (!lod_obj_is_striped(dt)) @@ -6044,8 +6756,8 @@ static int lod_destroy(const struct lu_env *env, struct dt_object *dt, if (rc != 0) RETURN(rc); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ) || - OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ2)) + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ) || + CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ2)) RETURN(0); if (!lod_obj_is_striped(dt)) @@ -6061,7 +6773,7 @@ static int 
lod_destroy(const struct lu_env *env, struct dt_object *dt, if (!dt_object_exists(stripe)) continue; - if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || + if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || i == cfs_fail_val) { dt_write_lock(env, stripe, DT_TGT_CHILD); rc = lod_sub_ref_del(env, stripe, th); @@ -6260,10 +6972,6 @@ static int lod_object_lock(const struct lu_env *env, ldlm_completion_callback completion = einfo->ei_cb_cp; __u64 dlmflags = LDLM_FL_ATOMIC_CB; - if (einfo->ei_mode == LCK_PW || - einfo->ei_mode == LCK_EX) - dlmflags |= LDLM_FL_COS_INCOMPAT; - LASSERT(ns != NULL); rc = ldlm_cli_enqueue_local(env, ns, res_id, LDLM_IBITS, policy, einfo->ei_mode, @@ -6405,9 +7113,11 @@ static bool lod_sel_osts_allowed(const struct lu_env *env, if (sfs->os_state & OS_STATFS_ENOSPC || sfs->os_state & OS_STATFS_READONLY || + sfs->os_state & OS_STATFS_NOCREATE || sfs->os_state & OS_STATFS_DEGRADED) { - CDEBUG(D_LAYOUT, "ost %d is not availble for SEL " - "extension, state %u\n", index, sfs->os_state); + CDEBUG(D_LAYOUT, + "OST%04x unusable for SEL extension, state %x\n", + index, sfs->os_state); ret = false; break; } @@ -6811,8 +7521,13 @@ static int lod_declare_update_extents(const struct lu_env *env, ENTRY; /* This makes us work on the components of the chosen mirror */ - start_index = lo->ldo_mirrors[pick].lme_start; - max_comp = lo->ldo_mirrors[pick].lme_end + 1; + if (lo->ldo_mirrors) { + start_index = lo->ldo_mirrors[pick].lme_start; + max_comp = lo->ldo_mirrors[pick].lme_end + 1; + } else { + start_index = 0; + max_comp = lo->ldo_comp_cnt; + } if (lo->ldo_flr_state == LCM_FL_NONE) LASSERT(start_index == 0 && max_comp == lo->ldo_comp_cnt); @@ -6841,12 +7556,14 @@ static int lod_declare_update_extents(const struct lu_env *env, /* We may have added or removed components. If so, we must update the * start & ends of all the mirrors after the current one, and the end * of the current mirror. 
*/ - change = max_comp - 1 - lo->ldo_mirrors[pick].lme_end; - if (change) { - lo->ldo_mirrors[pick].lme_end += change; - for (i = pick + 1; i < lo->ldo_mirror_count; i++) { - lo->ldo_mirrors[i].lme_start += change; - lo->ldo_mirrors[i].lme_end += change; + if (lo->ldo_mirrors) { + change = max_comp - 1 - lo->ldo_mirrors[pick].lme_end; + if (change) { + lo->ldo_mirrors[pick].lme_end += change; + for (i = pick + 1; i < lo->ldo_mirror_count; i++) { + lo->ldo_mirrors[i].lme_start += change; + lo->ldo_mirrors[i].lme_end += change; + } } } @@ -6861,7 +7578,10 @@ out: /* If striping is already instantiated or INIT'ed DOM? */ static bool lod_is_instantiation_needed(struct lod_layout_component *comp) { - return !(((lov_pattern(comp->llc_pattern) == LOV_PATTERN_MDT) && + if (comp->llc_magic == LOV_MAGIC_FOREIGN) + return false; + + return !(((lov_pattern(comp->llc_pattern) & LOV_PATTERN_MDT) && lod_comp_inited(comp)) || comp->llc_stripe); } @@ -6938,22 +7658,22 @@ static int lod_declare_update_plain(const struct lu_env *env, lod_comp = &lo->ldo_comp_entries[lo->ldo_comp_cnt - 1]; if (lo->ldo_comp_cnt > 1 && lod_comp->llc_extent.e_end != OBD_OBJECT_EOF && - lod_comp->llc_extent.e_end < layout->li_extent.e_end) { + lod_comp->llc_extent.e_end < layout->lai_extent.e_end) { CDEBUG_LIMIT(replay ? 
D_ERROR : D_LAYOUT, "%s: the defined layout [0, %#llx) does not " "covers the write range "DEXT"\n", lod2obd(d)->obd_name, lod_comp->llc_extent.e_end, - PEXT(&layout->li_extent)); + PEXT(&layout->lai_extent)); GOTO(out, rc = -EINVAL); } CDEBUG(D_LAYOUT, "%s: "DFID": update components "DEXT"\n", lod2obd(d)->obd_name, PFID(lod_object_fid(lo)), - PEXT(&layout->li_extent)); + PEXT(&layout->lai_extent)); if (!replay) { - rc = lod_declare_update_extents(env, lo, &layout->li_extent, - th, 0, layout->li_opc == LAYOUT_INTENT_WRITE); + rc = lod_declare_update_extents(env, lo, &layout->lai_extent, + th, 0, layout->lai_opc == LAYOUT_INTENT_WRITE); if (rc < 0) GOTO(out, rc); else if (rc) @@ -6967,7 +7687,7 @@ static int lod_declare_update_plain(const struct lu_env *env, for (i = 0; i < lo->ldo_comp_cnt; i++) { lod_comp = &lo->ldo_comp_entries[i]; - if (lod_comp->llc_extent.e_start >= layout->li_extent.e_end) + if (lod_comp->llc_extent.e_start >= layout->lai_extent.e_end) break; if (!replay) { @@ -7052,6 +7772,7 @@ restart: for (i = 0; i < lo->ldo_mirror_count; i++) { if (i == primary) continue; + rc = lod_declare_update_extents(env, lo, &pri_extent, th, i, 0); /* if update_extents changed the layout, it may have @@ -7075,6 +7796,8 @@ restart: lod_comp->llc_flags |= LCME_FL_STALE; lo->ldo_mirrors[i].lme_stale = 1; + if (lod_is_hsm(lod_comp)) + lod_comp->llc_foreign_flags |= HS_DIRTY; } } } @@ -7112,12 +7835,9 @@ static inline int lod_check_ost_avail(const struct lu_env *env, } ost = OST_TGT(lod, idx); - if (ost->ltd_statfs.os_state & - (OS_STATFS_READONLY | OS_STATFS_ENOSPC | OS_STATFS_ENOINO | - OS_STATFS_NOPRECREATE) || - ost->ltd_active == 0) { - CDEBUG(D_LAYOUT, DFID ": mirror %d OST%d unavail, rc = %d\n", - PFID(lod_object_fid(lo)), index, idx, rc); + if (ost->ltd_active == 0) { + CDEBUG(D_LAYOUT, DFID ": mirror %d OST%d unavail\n", + PFID(lod_object_fid(lo)), index, idx); return 0; } @@ -7140,7 +7860,7 @@ static int lod_primary_pick(const struct lu_env *env, struct 
lod_object *lo, int picked = -1, second_pick = -1, third_pick = -1; ENTRY; - if (OBD_FAIL_CHECK(OBD_FAIL_FLR_RANDOM_PICK_MIRROR)) { + if (CFS_FAIL_CHECK(OBD_FAIL_FLR_RANDOM_PICK_MIRROR)) { get_random_bytes(&seq, sizeof(seq)); seq %= lo->ldo_mirror_count; } @@ -7152,6 +7872,11 @@ static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, * cluster. */ lod_qos_statfs_update(env, lod, &lod->lod_ost_descs); + + rc = lod_fill_mirrors(lo); + if (rc) + RETURN(rc); + for (i = 0; i < lo->ldo_mirror_count; i++) { bool ost_avail = true; int index = (i + seq) % lo->ldo_mirror_count; @@ -7163,7 +7888,7 @@ static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, } /* 2nd pick is for the primary mirror containing unavail OST */ - if (lo->ldo_mirrors[index].lme_primary && second_pick < 0) + if (lo->ldo_mirrors[index].lme_prefer && second_pick < 0) second_pick = index; /* 3rd pick is for non-primary mirror containing unavail OST */ @@ -7174,7 +7899,7 @@ static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, * we found a non-primary 1st pick, we'd like to find a * potential pirmary mirror. */ - if (picked >= 0 && !lo->ldo_mirrors[index].lme_primary) + if (picked >= 0 && !lo->ldo_mirrors[index].lme_prefer) continue; /* check the availability of OSTs */ @@ -7211,7 +7936,7 @@ static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, * primary with all OSTs are available, this is the perfect * 1st pick. */ - if (lo->ldo_mirrors[index].lme_primary) + if (lo->ldo_mirrors[index].lme_prefer) break; } /* for all mirrors */ @@ -7297,6 +8022,254 @@ static int lod_prepare_resync(const struct lu_env *env, struct lod_object *lo, return need_sync ? 
0 : -EALREADY; } +static struct lod_layout_component * +lod_locate_comp_hsm(struct lod_object *lo, int *hsm_mirror_id) +{ + struct lod_layout_component *lod_comp = NULL; + int i; + + if (!lo->ldo_is_composite) + return NULL; + + for (i = 0; i < lo->ldo_mirror_count; i++) { + /* + * FIXME: In the current design, there is only one HSM + * mirror component in range [0, EOF] for a FLR file. This + * should be fixed to support multiple HSM mirror components + * with different HSM backend types and partial file ranges + * in the future. + */ + if (lo->ldo_mirrors[i].lme_hsm) { + __u16 start_idx; + __u16 end_idx; + + if (hsm_mirror_id) + *hsm_mirror_id = i; + start_idx = lo->ldo_mirrors[i].lme_start; + end_idx = lo->ldo_mirrors[i].lme_end; + LASSERT(start_idx == end_idx); + lod_comp = &lo->ldo_comp_entries[start_idx]; + LASSERT(lo->ldo_is_composite && lod_is_hsm(lod_comp) && + lod_comp->llc_extent.e_start == 0 && + lod_comp->llc_extent.e_end == LUSTRE_EOF); + break; + } + } + + return lod_comp; +} + +static int lod_declare_pccro_set(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lu_buf *buf = &info->lti_buf; + struct lod_object *lo = lod_dt_obj(dt); + struct lod_layout_component *lod_comp; + struct lod_layout_component *comp_array; + struct lod_mirror_entry *mirror_array; + __u16 mirror_id; + int hsm_mirror_id; + int mirror_cnt; + int new_cnt; + int rc; + int i; + + ENTRY; + + rc = lod_striping_load(env, lo); + if (rc) + RETURN(rc); + + if (lo->ldo_flr_state & LCM_FL_PCC_RDONLY) + RETURN(-EALREADY); + + rc = lod_layout_data_init(info, lo->ldo_comp_cnt); + if (rc) + RETURN(rc); + + lod_comp = lod_locate_comp_hsm(lo, &hsm_mirror_id); + if (lod_comp) { + if (lod_comp->llc_foreign_flags & HS_PCCRO) { + CDEBUG(D_LAYOUT, "bad HSM flags: %#x\n", + lod_comp->llc_foreign_flags); + RETURN(-EINVAL); + } + + lod_obj_inc_layout_gen(lo); + lod_comp->llc_foreign_flags |= HS_PCCRO; + 
lod_comp->llc_foreign_flags &= ~HS_DIRTY; + lod_comp->llc_flags &= ~LCME_FL_STALE; + lo->ldo_mirrors[hsm_mirror_id].lme_stale = 0; + lo->ldo_flr_state |= LCM_FL_PCC_RDONLY; + buf->lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_declare_xattr_set(env, lod_object_child(lo), + buf, XATTR_NAME_LOV, 0, th); + RETURN(rc); + } + + /* + * Create an new composite layout with only one HSM component. + * Field @lhm_archive_uuid is used to be the identifier within HSM + * backend for the archive copy. In the PCC case with a POSIX archive, + * This can just be the original inode FID. This is important because + * the inode FID may change due to layout swaps or migration to a new + * MDT, and we do not want that to cause problems with finding the copy + * in HSM/PCC. + */ + mirror_cnt = lo->ldo_mirror_count + 1; + if (!lo->ldo_is_composite) { + LASSERT(lo->ldo_mirror_count == 0); + mirror_cnt++; + } + + OBD_ALLOC_PTR_ARRAY(mirror_array, mirror_cnt); + if (mirror_array == NULL) + RETURN(-ENOMEM); + + new_cnt = lo->ldo_comp_cnt + 1; + OBD_ALLOC_PTR_ARRAY(comp_array, new_cnt); + if (comp_array == NULL) { + OBD_FREE_PTR_ARRAY(mirror_array, mirror_cnt); + RETURN(-ENOMEM); + } + + mirror_id = 0; + for (i = 0; i < lo->ldo_comp_cnt; i++) { + lod_comp = &lo->ldo_comp_entries[i]; + + /* + * Add mirror from a non-flr file, create new mirror ID. + * Otherwise, keep existing mirror's component ID, used + * for mirror extension. 
+ */ + if (lo->ldo_mirror_count == 0 && + mirror_id_of(lod_comp->llc_id) == 0) + lod_comp->llc_id = pflr_id(1, i + 1); + + if (lod_comp->llc_id != LCME_ID_INVAL && + mirror_id_of(lod_comp->llc_id) > mirror_id) + mirror_id = mirror_id_of(lod_comp->llc_id); + + if (!lo->ldo_is_composite) { + lod_comp->llc_extent.e_start = 0; + lod_comp->llc_extent.e_end = LUSTRE_EOF; + lod_comp_set_init(lod_comp); + } + } + + memcpy(comp_array, lo->ldo_comp_entries, + sizeof(*comp_array) * lo->ldo_comp_cnt); + + lod_comp = &comp_array[new_cnt - 1]; + lod_comp->llc_magic = LOV_MAGIC_FOREIGN; + lod_comp->llc_extent.e_start = 0; + lod_comp->llc_extent.e_end = LUSTRE_EOF; + lod_comp->llc_length = sizeof(struct lov_hsm_base); + lod_comp->llc_type = LU_FOREIGN_TYPE_PCCRO; + lod_comp->llc_foreign_flags = HS_EXISTS | HS_ARCHIVED | HS_PCCRO; + memset(&lod_comp->llc_hsm, 0, sizeof(lod_comp->llc_hsm)); + + if (lo->ldo_mirrors) + OBD_FREE_PTR_ARRAY(lo->ldo_mirrors, lo->ldo_mirror_count); + OBD_FREE_PTR_ARRAY(lo->ldo_comp_entries, lo->ldo_comp_cnt); + + /* + * The @ldo_mirrors will be refilled by lod_fill_mirrors() when + * calling lod_striped_create() for layout change. + */ + lo->ldo_mirrors = mirror_array; + lo->ldo_mirror_count = mirror_cnt; + lo->ldo_comp_entries = comp_array; + lo->ldo_comp_cnt = new_cnt; + lo->ldo_is_composite = 1; + + ++mirror_id; + lod_comp->llc_id = LCME_ID_INVAL; + lod_comp->llc_id = lod_gen_component_id(lo, mirror_id, new_cnt - 1); + + if (lo->ldo_flr_state == LCM_FL_NONE) + lo->ldo_flr_state = LCM_FL_RDONLY; + lo->ldo_flr_state |= LCM_FL_PCC_RDONLY; + buf->lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_declare_xattr_set(env, lod_object_child(lo), + buf, XATTR_NAME_LOV, 0, th); + if (rc) + lod_striping_free(env, lo); + + RETURN(rc); +} + +/* + * TODO: Clearing the LCM_FL_PCC_RDONLY flag from the layouts means the file + * is going to be modified.
Currently it needs two RPCs: first one is to clear + * LCM_FL_PCC_RDONLY flag; the second one is to pick primary mirror and mark + * the file as LCM_FL_WRITE_PENDING. + * These two RPCs can be combined in one RPC call. + */ +static int lod_declare_pccro_clear(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + struct lod_layout_component *lod_comp; + int rc; + + ENTRY; + + rc = lod_striping_load(env, lo); + if (rc) + RETURN(rc); + + if (!(lo->ldo_flr_state & LCM_FL_PCC_RDONLY)) + RETURN(-EALREADY); + + rc = lod_layout_data_init(info, lo->ldo_comp_cnt); + if (rc) + RETURN(rc); + + lod_comp = lod_locate_comp_hsm(lo, NULL); + if (lod_comp == NULL) { + CDEBUG(D_LAYOUT, "Not found any HSM component\n"); + GOTO(out, rc = -EINVAL); + } + + lod_comp->llc_foreign_flags &= ~HS_PCCRO; + lo->ldo_flr_state &= ~LCM_FL_PCC_RDONLY; + lod_obj_inc_layout_gen(lo); + info->lti_buf.lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_declare_xattr_set(env, lod_object_child(lo), + &info->lti_buf, XATTR_NAME_LOV, 0, th); +out: + if (rc) + lod_striping_free(env, lo); + + RETURN(rc); +} + +static int lod_declare_update_pccro(const struct lu_env *env, + struct dt_object *dt, + struct md_layout_change *mlc, + struct thandle *th) +{ + struct layout_intent *intent = mlc->mlc_intent; + int rc; + + switch (intent->lai_opc) { + case LAYOUT_INTENT_PCCRO_SET: + rc = lod_declare_pccro_set(env, dt, th); + break; + case LAYOUT_INTENT_PCCRO_CLEAR: + rc = lod_declare_pccro_clear(env, dt, th); + break; + default: + rc = -EOPNOTSUPP; + break; + } + + return rc; +} + static int lod_declare_update_rdonly(const struct lu_env *env, struct lod_object *lo, struct md_layout_change *mlc, struct thandle *th) @@ -7315,10 +8288,10 @@ static int lod_declare_update_rdonly(const struct lu_env *env, if (mlc->mlc_opc == MD_LAYOUT_WRITE) { struct layout_intent *layout = mlc->mlc_intent; - int write = 
layout->li_opc == LAYOUT_INTENT_WRITE; + int write = layout->lai_opc == LAYOUT_INTENT_WRITE; int picked; - extent = layout->li_extent; + extent = layout->lai_extent; CDEBUG(D_LAYOUT, DFID": trying to write :"DEXT"\n", PFID(lod_object_fid(lo)), PEXT(&extent)); @@ -7336,7 +8309,7 @@ static int lod_declare_update_rdonly(const struct lu_env *env, if (rc < 0) GOTO(out, rc); - if (layout->li_opc == LAYOUT_INTENT_TRUNC) { + if (layout->lai_opc == LAYOUT_INTENT_TRUNC) { /** * trunc transfers [0, size) in the intent extent, we'd * stale components overlapping [size, eof). @@ -7351,7 +8324,7 @@ static int lod_declare_update_rdonly(const struct lu_env *env, GOTO(out, rc); /* restore truncate intent extent */ - if (layout->li_opc == LAYOUT_INTENT_TRUNC) + if (layout->lai_opc == LAYOUT_INTENT_TRUNC) extent.e_end = extent.e_start; /* instantiate components for the picked mirror, start from 0 */ @@ -7413,19 +8386,13 @@ static int lod_declare_update_rdonly(const struct lu_env *env, * This way it can make sure that the layout version is * monotonously increased in this writing era. 
*/ lod_obj_inc_layout_gen(lo); - if (lo->ldo_layout_gen > (LCME_ID_MAX >> 1)) { - __u32 layout_version; - - get_random_bytes(&layout_version, sizeof(layout_version)); - lo->ldo_layout_gen = layout_version & 0xffff; - } rc = lod_declare_instantiate_components(env, lo, th, 0); if (rc) GOTO(out, rc); layout_attr->la_valid = LA_LAYOUT_VERSION; - layout_attr->la_layout_version = 0; /* set current version */ + layout_attr->la_layout_version = 0; if (mlc->mlc_opc == MD_LAYOUT_RESYNC) layout_attr->la_layout_version = LU_LAYOUT_RESYNC; rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th); @@ -7455,22 +8422,31 @@ static int lod_declare_update_write_pending(const struct lu_env *env, LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE || mlc->mlc_opc == MD_LAYOUT_RESYNC); - /* look for the primary mirror */ + /* look for the first preferred mirror */ for (i = 0; i < lo->ldo_mirror_count; i++) { if (lo->ldo_mirrors[i].lme_stale) continue; - - LASSERTF(primary < 0, DFID " has multiple primary: %u / %u\n", - PFID(lod_object_fid(lo)), - lo->ldo_mirrors[i].lme_id, - lo->ldo_mirrors[primary].lme_id); + if (lo->ldo_mirrors[i].lme_prefer == 0) + continue; + if (lo->ldo_mirrors[i].lme_hsm) + continue; primary = i; + break; } if (primary < 0) { - CERROR(DFID ": doesn't have a primary mirror\n", - PFID(lod_object_fid(lo))); - GOTO(out, rc = -ENODATA); + /* no primary, use any in-sync */ + for (i = 0; i < lo->ldo_mirror_count; i++) { + if (lo->ldo_mirrors[i].lme_stale) + continue; + primary = i; + break; + } + if (primary < 0) { + CERROR(DFID ": doesn't have a primary mirror\n", + PFID(lod_object_fid(lo))); + GOTO(out, rc = -ENODATA); + } } CDEBUG(D_LAYOUT, DFID": found primary %u\n", @@ -7489,11 +8465,11 @@ static int lod_declare_update_write_pending(const struct lu_env *env, if (mlc->mlc_opc == MD_LAYOUT_WRITE) { struct layout_intent *layout = mlc->mlc_intent; - int write = layout->li_opc == LAYOUT_INTENT_WRITE; + int write = layout->lai_opc == LAYOUT_INTENT_WRITE; - 
LASSERT(mlc->mlc_intent != NULL); + LASSERT(layout != NULL); - extent = mlc->mlc_intent->li_extent; + extent = layout->lai_extent; CDEBUG(D_LAYOUT, DFID": intent to write: "DEXT"\n", PFID(lod_object_fid(lo)), PEXT(&extent)); @@ -7504,7 +8480,7 @@ static int lod_declare_update_write_pending(const struct lu_env *env, if (rc < 0) GOTO(out, rc); - if (mlc->mlc_intent->li_opc == LAYOUT_INTENT_TRUNC) { + if (layout->lai_opc == LAYOUT_INTENT_TRUNC) { /** * trunc transfers [0, size) in the intent extent, we'd * stale components overlapping [size, eof). @@ -7522,7 +8498,7 @@ static int lod_declare_update_write_pending(const struct lu_env *env, * instantiate [0, mlc->mlc_intent->e_end) */ /* restore truncate intent extent */ - if (mlc->mlc_intent->li_opc == LAYOUT_INTENT_TRUNC) + if (layout->lai_opc == LAYOUT_INTENT_TRUNC) extent.e_end = extent.e_start; extent.e_start = 0; @@ -7568,20 +8544,20 @@ static int lod_declare_update_write_pending(const struct lu_env *env, if (rc) GOTO(out, rc); + lod_obj_inc_layout_gen(lo); + /* 3. transfer layout version to OST objects. * transfer new layout version to OST objects so that stale writes * can be denied. It also ends an era of writing by setting * LU_LAYOUT_RESYNC. Normal client can never use this bit to * send write RPC; only resync RPCs could do it. 
*/ layout_attr->la_valid = LA_LAYOUT_VERSION; - layout_attr->la_layout_version = 0; /* set current version */ + layout_attr->la_layout_version = 0; if (mlc->mlc_opc == MD_LAYOUT_RESYNC) layout_attr->la_layout_version = LU_LAYOUT_RESYNC; rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th); if (rc) GOTO(out, rc); - - lod_obj_inc_layout_gen(lo); out: if (rc) lod_striping_free(env, lo); @@ -7593,6 +8569,7 @@ static int lod_declare_update_sync_pending(const struct lu_env *env, struct thandle *th) { struct lod_thread_info *info = lod_env_info(env); + struct lu_attr *layout_attr = &info->lti_layout_attr; unsigned sync_components = 0; unsigned resync_components = 0; int i; @@ -7665,6 +8642,12 @@ static int lod_declare_update_sync_pending(const struct lu_env *env, lo->ldo_flr_state = LCM_FL_RDONLY; lod_obj_inc_layout_gen(lo); + layout_attr->la_valid = LA_LAYOUT_VERSION; + layout_attr->la_layout_version = 0; + rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th); + if (rc) + GOTO(out, rc); + info->lti_buf.lb_len = lod_comp_md_size(lo, false); rc = lod_sub_declare_xattr_set(env, lod_object_child(lo), &info->lti_buf, XATTR_NAME_LOV, 0, th); @@ -7726,8 +8709,8 @@ static int lod_dir_declare_layout_attach(const struct lu_env *env, if (!lmv_is_sane(lmv)) RETURN(-EINVAL); - if (!dt_try_as_dir(env, dt)) - return -ENOTDIR; + if (!dt_try_as_dir(env, dt, false)) + RETURN(-ENOTDIR); dof->dof_type = DFT_DIR; @@ -7767,7 +8750,7 @@ static int lod_dir_declare_layout_attach(const struct lu_env *env, stripes[i + lo->ldo_dir_stripe_count] = dto; - if (!dt_try_as_dir(env, dto)) + if (!dt_try_as_dir(env, dto, true)) GOTO(out, rc = -ENOTDIR); rc = lod_sub_declare_ref_add(env, dto, th); @@ -7825,9 +8808,11 @@ static int lod_dir_declare_layout_attach(const struct lu_env *env, OBD_FREE_PTR_ARRAY(lo->ldo_stripe, lo->ldo_dir_stripes_allocated); lo->ldo_stripe = stripes; + lo->ldo_is_foreign = 0; lo->ldo_dir_migrate_offset = lo->ldo_dir_stripe_count; lo->ldo_dir_migrate_hash = 
le32_to_cpu(lmv->lmv_hash_type); lo->ldo_dir_stripe_count += stripe_count; + lo->ldo_dir_layout_version++; lo->ldo_dir_stripes_allocated += stripe_count; /* plain directory split creates target as a plain directory, while @@ -7859,7 +8844,7 @@ static int lod_dir_declare_layout_detach(const struct lu_env *env, int i; int rc = 0; - if (!dt_try_as_dir(env, dt)) + if (!dt_try_as_dir(env, dt, true)) return -ENOTDIR; if (!lo->ldo_dir_stripe_count) @@ -7871,7 +8856,7 @@ static int lod_dir_declare_layout_detach(const struct lu_env *env, if (!dto) continue; - if (!dt_try_as_dir(env, dto)) + if (!dt_try_as_dir(env, dto, true)) return -ENOTDIR; rc = lod_sub_declare_delete(env, dto, @@ -7904,7 +8889,7 @@ static int dt_dir_is_empty(const struct lu_env *env, ENTRY; - if (!dt_try_as_dir(env, obj)) + if (!dt_try_as_dir(env, obj, true)) RETURN(-ENOTDIR); iops = &obj->do_index_ops->dio_it; @@ -7942,16 +8927,16 @@ static int lod_dir_declare_layout_shrink(const struct lu_env *env, struct lod_object *lo = lod_dt_obj(dt); struct dt_object *next = dt_object_child(dt); struct lmv_user_md *lmu = mlc->mlc_buf.lb_buf; - __u32 final_stripe_count; char *stripe_name = info->lti_key; struct lu_buf *lmv_buf = &info->lti_buf; + __u32 final_stripe_count; struct dt_object *dto; int i; int rc; LASSERT(lmu); - if (!dt_try_as_dir(env, dt)) + if (!dt_try_as_dir(env, dt, true)) return -ENOTDIR; /* shouldn't be called on plain directory */ @@ -7970,9 +8955,6 @@ static int lod_dir_declare_layout_shrink(const struct lu_env *env, continue; if (i < final_stripe_count) { - if (final_stripe_count == 1) - continue; - rc = lod_sub_declare_xattr_set(env, dto, lmv_buf, XATTR_NAME_LMV, LU_XATTR_REPLACE, th); @@ -8034,6 +9016,7 @@ static int lod_dir_declare_layout_split(const struct lu_env *env, struct dt_object_format *dof = &info->lti_format; struct lmv_user_md_v1 *lum = mlc->mlc_spec->u.sp_ea.eadata; struct dt_object **stripes; + int mdt_count = lod->lod_remote_mdt_count + 1; u32 stripe_count; u32 saved_count; int 
i; @@ -8049,6 +9032,29 @@ static int lod_dir_declare_layout_split(const struct lu_env *env, if (stripe_count <= saved_count) RETURN(-EINVAL); + /* if the split target is overstriped, we need to put that flag in the + * current layout so it can allocate the larger number of stripes + * + * Note we need to pick up any hash *flags* which affect allocation + * *before* allocation, so they're used in allocating the directory, + * rather than after when we finalize directory setup (at the end of + * this function). + */ + if (le32_to_cpu(lum->lum_hash_type) & LMV_HASH_FLAG_OVERSTRIPED) + lo->ldo_dir_hash_type |= LMV_HASH_FLAG_OVERSTRIPED; + + if (!(lo->ldo_dir_hash_type & LMV_HASH_FLAG_OVERSTRIPED) && + stripe_count > mdt_count) { + RETURN(-E2BIG); + } else if ((lo->ldo_dir_hash_type & LMV_HASH_FLAG_OVERSTRIPED) && + (stripe_count > mdt_count * LMV_MAX_STRIPES_PER_MDT || + /* a single MDT doesn't initialize the infrastructure for striped + * directories, so we just don't support overstriping in that case + */ + mdt_count == 1)) { + RETURN(-E2BIG); + } + dof->dof_type = DFT_DIR; OBD_ALLOC(stripes, sizeof(*stripes) * stripe_count); @@ -8059,6 +9065,7 @@ static int lod_dir_declare_layout_split(const struct lu_env *env, stripes[i] = lo->ldo_stripe[i]; lod_qos_statfs_update(env, lod, &lod->lod_mdt_descs); + rc = lod_mdt_alloc_qos(env, lo, stripes, saved_count, stripe_count); if (rc == -EAGAIN) rc = lod_mdt_alloc_rr(env, lo, stripes, saved_count, @@ -8072,6 +9079,7 @@ static int lod_dir_declare_layout_split(const struct lu_env *env, OBD_FREE(lo->ldo_stripe, sizeof(*stripes) * lo->ldo_dir_stripes_allocated); lo->ldo_stripe = stripes; + lo->ldo_is_foreign = 0; lo->ldo_dir_striped = 1; lo->ldo_dir_stripe_count = rc; lo->ldo_dir_stripes_allocated = stripe_count; @@ -8196,9 +9204,12 @@ static int lod_dir_layout_shrink(const struct lu_env *env, lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE); lmv->lmv_stripe_count = cpu_to_le32(final_stripe_count); lmv->lmv_hash_type = 
cpu_to_le32(lo->ldo_dir_hash_type) & - cpu_to_le32(LMV_HASH_TYPE_MASK); + cpu_to_le32(LMV_HASH_TYPE_MASK | + LMV_HASH_FLAG_FIXED); lmv->lmv_layout_version = cpu_to_le32(lo->ldo_dir_layout_version + 1); + lmv->lmv_migrate_offset = 0; + lmv->lmv_migrate_hash = 0; for (i = 0; i < lo->ldo_dir_stripe_count; i++) { dto = lo->ldo_stripe[i]; @@ -8206,14 +9217,6 @@ static int lod_dir_layout_shrink(const struct lu_env *env, continue; if (i < final_stripe_count) { - /* if only one stripe left, no need to update - * LMV because this stripe will replace master - * object and act as a plain directory. - */ - if (final_stripe_count == 1) - continue; - - rc = lod_fld_lookup(env, lod, lu_object_fid(&dto->do_lu), &mdtidx, &type); @@ -8307,6 +9310,19 @@ static int lod_declare_layout_change(const struct lu_env *env, dt_object_remote(dt_object_child(dt))) RETURN(-EINVAL); + if (mlc->mlc_opc == MD_LAYOUT_WRITE) { + struct layout_intent *intent = mlc->mlc_intent; + + if (intent->lai_opc == LAYOUT_INTENT_PCCRO_SET || + intent->lai_opc == LAYOUT_INTENT_PCCRO_CLEAR) { + if (!S_ISREG(dt->do_lu.lo_header->loh_attr)) + RETURN(-EINVAL); + + rc = lod_declare_update_pccro(env, dt, mlc, th); + RETURN(rc); + } + } + rc = lod_striping_load(env, lo); if (rc) GOTO(out, rc); @@ -8335,6 +9351,9 @@ static int lod_declare_layout_change(const struct lu_env *env, rc = -ENOTSUPP; break; } + if (rc == 0) + rc = lod_save_layout_gen_intrans(info, lo); + out: RETURN(rc); } @@ -8345,8 +9364,9 @@ out: static int lod_layout_change(const struct lu_env *env, struct dt_object *dt, struct md_layout_change *mlc, struct thandle *th) { + struct lod_thread_info *info = lod_env_info(env); struct lu_attr *attr = &lod_env_info(env)->lti_attr; - struct lu_attr *layout_attr = &lod_env_info(env)->lti_layout_attr; + struct lu_attr *layout_attr = &info->lti_layout_attr; struct lod_object *lo = lod_dt_obj(dt); int rc; @@ -8358,6 +9378,16 @@ static int lod_layout_change(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } + 
rc = lod_check_layout_gen_intrans(info, lo); + if (rc > 0) { + CDEBUG(D_LAYOUT, + "%s: obj "DFID" gen changed from %d to %d in transaction, retry the transaction \n", + dt->do_lu.lo_dev->ld_obd->obd_name, + PFID(lu_object_fid(&dt->do_lu)), + info->lti_gen[rc - 1], lo->ldo_layout_gen); + RETURN(-EAGAIN); + } + rc = lod_striped_create(env, dt, attr, NULL, th); if (!rc && layout_attr->la_valid & LA_LAYOUT_VERSION) { layout_attr->la_layout_version |= lo->ldo_layout_gen; @@ -8367,7 +9397,7 @@ static int lod_layout_change(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } -struct dt_object_operations lod_obj_ops = { +const struct dt_object_operations lod_obj_ops = { .do_read_lock = lod_read_lock, .do_write_lock = lod_write_lock, .do_read_unlock = lod_read_unlock, @@ -8468,7 +9498,7 @@ static int lod_punch(const struct lu_env *env, struct dt_object *dt, * body_ops themselves will check file type inside, see lod_read/write/punch for * details. */ -const struct dt_body_operations lod_body_ops = { +static const struct dt_body_operations lod_body_ops = { .dbo_read = lod_read, .dbo_declare_write = lod_declare_write, .dbo_write = lod_write, @@ -8546,56 +9576,6 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *lo, /** * - * Alloc cached foreign LOV - * - * \param[in] lo object - * \param[in] size size of foreign LOV - * - * \retval 0 on success - * \retval negative if failed - */ -int lod_alloc_foreign_lov(struct lod_object *lo, size_t size) -{ - OBD_ALLOC_LARGE(lo->ldo_foreign_lov, size); - if (lo->ldo_foreign_lov == NULL) - return -ENOMEM; - lo->ldo_foreign_lov_size = size; - lo->ldo_is_foreign = 1; - return 0; -} - -/** - * - * Free cached foreign LOV - * - * \param[in] lo object - */ -void lod_free_foreign_lov(struct lod_object *lo) -{ - if (lo->ldo_foreign_lov != NULL) - OBD_FREE_LARGE(lo->ldo_foreign_lov, lo->ldo_foreign_lov_size); - lo->ldo_foreign_lov = NULL; - lo->ldo_foreign_lov_size = 0; - lo->ldo_is_foreign = 0; -} - -/** - * - * 
Free cached foreign LMV - * - * \param[in] lo object - */ -void lod_free_foreign_lmv(struct lod_object *lo) -{ - if (lo->ldo_foreign_lmv != NULL) - OBD_FREE_LARGE(lo->ldo_foreign_lmv, lo->ldo_foreign_lmv_size); - lo->ldo_foreign_lmv = NULL; - lo->ldo_foreign_lmv_size = 0; - lo->ldo_dir_is_foreign = 0; -} - -/** - * * Release resources associated with striping. * * If the object is striped (regular or directory), then release @@ -8607,14 +9587,17 @@ void lod_free_foreign_lmv(struct lod_object *lo) void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo) { struct lod_layout_component *lod_comp; + __u32 obj_attr = lo->ldo_obj.do_lu.lo_header->loh_attr; int i, j; if (unlikely(lo->ldo_is_foreign)) { - lod_free_foreign_lov(lo); - lo->ldo_comp_cached = 0; - } else if (unlikely(lo->ldo_dir_is_foreign)) { - lod_free_foreign_lmv(lo); - lo->ldo_dir_stripe_loaded = 0; + if (S_ISREG(obj_attr)) { + lod_free_foreign_lov(lo); + lo->ldo_comp_cached = 0; + } else if (S_ISDIR(obj_attr)) { + lod_free_foreign_lmv(lo); + lo->ldo_dir_stripe_loaded = 0; + } } else if (lo->ldo_stripe != NULL) { LASSERT(lo->ldo_comp_entries == NULL); LASSERT(lo->ldo_dir_stripes_allocated > 0); @@ -8630,11 +9613,15 @@ void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo) lo->ldo_dir_stripes_allocated = 0; lo->ldo_dir_stripe_loaded = 0; lo->ldo_dir_stripe_count = 0; + lo->ldo_obj.do_index_ops = NULL; } else if (lo->ldo_comp_entries != NULL) { for (i = 0; i < lo->ldo_comp_cnt; i++) { /* free lod_layout_component::llc_stripe array */ lod_comp = &lo->ldo_comp_entries[i]; + /* HSM layout component */ + if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN) + continue; if (lod_comp->llc_stripe == NULL) continue; LASSERT(lod_comp->llc_stripes_allocated != 0); @@ -8706,7 +9693,7 @@ static int lod_object_print(const struct lu_env *env, void *cookie, return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o); } -struct lu_object_operations lod_lu_obj_ops = { +const struct 
lu_object_operations lod_lu_obj_ops = { .loo_object_init = lod_object_init, .loo_object_free = lod_object_free, .loo_object_release = lod_object_release,