X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flod%2Flod_object.c;h=ea57141597a1ece3d1d6b5f0c6f375e8647d472d;hb=HEAD;hp=1fa0ffb2ba1913c123d7c073dbea19d24980cb31;hpb=4becec906d3a997a8ff4a8c10e39633ff69bd738;p=fs%2Flustre-release.git diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index 1fa0ffb..3878d3b 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -344,7 +344,7 @@ static int lod_it_key_rec(const struct lu_env *env, const struct dt_it *di, key_rec); } -static struct dt_index_operations lod_index_ops = { +static const struct dt_index_operations lod_index_ops = { .dio_lookup = lod_lookup, .dio_declare_insert = lod_declare_insert, .dio_insert = lod_insert, @@ -367,6 +367,50 @@ static struct dt_index_operations lod_index_ops = { }; /** + * Implementation of dt_index_operations::dio_lookup + * + * Used with striped directories. + * + * \see dt_index_operations::dio_lookup() in the API description for details. + */ +static int lod_striped_lookup(const struct lu_env *env, struct dt_object *dt, + struct dt_rec *rec, const struct dt_key *key) +{ + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next; + const char *name = (const char *)key; + + LASSERT(lo->ldo_dir_stripe_count > 0); + + if (strcmp(name, dot) == 0) { + struct lu_fid *fid = (struct lu_fid *)rec; + + *fid = *lod_object_fid(lo); + return 1; + } + + if (strcmp(name, dotdot) == 0) { + next = dt_object_child(dt); + } else { + int index; + + index = __lmv_name_to_stripe_index(lo->ldo_dir_hash_type, + lo->ldo_dir_stripe_count, + lo->ldo_dir_migrate_hash, + lo->ldo_dir_migrate_offset, + name, strlen(name), true); + if (index < 0) + return index; + + next = lo->ldo_stripe[index]; + if (!next || !dt_object_exists(next)) + return -ENODEV; + } + + return next->do_index_ops->dio_lookup(env, next, rec, key); +} + +/** * Implementation of dt_it_ops::init. * * Used with striped objects. Internally just initializes the iterator @@ -743,8 +787,8 @@ static int lod_striped_it_load(const struct lu_env *env, return next->do_index_ops->dio_it.load(env, it->lit_it, hash); } -static struct dt_index_operations lod_striped_index_ops = { - .dio_lookup = lod_lookup, +static const struct dt_index_operations lod_striped_index_ops = { + .dio_lookup = lod_striped_lookup, .dio_declare_insert = lod_declare_insert, .dio_insert = lod_insert, .dio_declare_delete = lod_declare_delete, @@ -834,7 +878,7 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, memcpy(buf->lb_buf, tbuf.lb_buf, tbuf.lb_len); } - if (unlikely(!dt_try_as_dir(env, obj))) + if (unlikely(!dt_try_as_dir(env, obj, true))) RETURN(-ENOTDIR); memset(&lmv1->lmv_stripe_fids[0], 0, stripes * sizeof(struct lu_fid)); @@ -871,18 +915,17 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, goto next; } - len = snprintf(name, sizeof(name), - DFID":", PFID(&ent->lde_fid)); + len = scnprintf(name, sizeof(name), + DFID":", PFID(&ent->lde_fid)); /* The ent->lde_name is composed of ${FID}:${index} */ if (ent->lde_namelen < len + 1 || memcmp(ent->lde_name, name, len) != 0) { - CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO, - "%s: invalid shard name %.*s with the FID "DFID - " for the striped directory "DFID", %s\n", - lod2obd(lod)->obd_name, ent->lde_namelen, - ent->lde_name, PFID(&fid), - PFID(lu_object_fid(&obj->do_lu)), - lod->lod_lmv_failout ? "failout" : "skip"); + CDEBUG_LIMIT(lod->lod_lmv_failout ? D_ERROR : D_INFO, + "%s: invalid shard name %.*s with the FID "DFID" for the striped directory "DFID", %s\n", + lod2obd(lod)->obd_name, ent->lde_namelen, + ent->lde_name, PFID(&fid), + PFID(lu_object_fid(&obj->do_lu)), + lod->lod_lmv_failout ? "failout" : "skip"); if (lod->lod_lmv_failout) break; @@ -894,15 +937,15 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, do { if (ent->lde_name[len] < '0' || ent->lde_name[len] > '9') { - CDEBUG(lod->lod_lmv_failout ? D_ERROR : D_INFO, - "%s: invalid shard name %.*s with the " - "FID "DFID" for the striped directory " - DFID", %s\n", - lod2obd(lod)->obd_name, ent->lde_namelen, - ent->lde_name, PFID(&fid), - PFID(lu_object_fid(&obj->do_lu)), - lod->lod_lmv_failout ? - "failout" : "skip"); + CDEBUG_LIMIT(lod->lod_lmv_failout ? + D_ERROR : D_INFO, + "%s: invalid shard name %.*s with the FID "DFID" for the striped directory "DFID", %s\n", + lod2obd(lod)->obd_name, + ent->lde_namelen, + ent->lde_name, PFID(&fid), + PFID(lu_object_fid(&obj->do_lu)), + lod->lod_lmv_failout ? + "failout" : "skip"); if (lod->lod_lmv_failout) break; @@ -928,7 +971,8 @@ int lod_load_lmv_shards(const struct lu_env *env, struct lod_object *lo, } /* The slot has been occupied. */ - if (!fid_is_zero(&lmv1->lmv_stripe_fids[index])) { + if (!fid_is_zero(&lmv1->lmv_stripe_fids[index]) && + !CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME)) { struct lu_fid fid0; fid_le_to_cpu(&fid0, @@ -1075,16 +1119,57 @@ static int lod_attr_get(const struct lu_env *env, return dt_attr_get(env, dt_object_child(dt), attr); } +void lod_adjust_stripe_size(struct lod_layout_component *comp, + __u32 def_stripe_size) +{ + __u64 comp_end = comp->llc_extent.e_end; + + /* Choose stripe size if not set. Note that default stripe size can't + * be used as is, because it must be multiplier of given component end. + * - first check if default stripe size can be used + * - if not than select the lowest set bit from component end and use + * that value as stripe size + */ + if (!comp->llc_stripe_size) { + if (comp_end == LUSTRE_EOF || !(comp_end % def_stripe_size)) + comp->llc_stripe_size = def_stripe_size; + else + comp->llc_stripe_size = comp_end & ~(comp_end - 1); + } else { + if (comp_end != LUSTRE_EOF && + comp_end & (LOV_MIN_STRIPE_SIZE - 1)) { + CWARN("Component end %llu is not a multiple of min size %u\n", + comp_end, LOV_MIN_STRIPE_SIZE); + comp_end = round_up(comp_end, LOV_MIN_STRIPE_SIZE); + } + /* check stripe size is multiplier of comp_end */ + if (comp_end != LUSTRE_EOF && + comp_end != comp->llc_extent.e_start && + comp_end % comp->llc_stripe_size) { + /* fix that even for defined stripe size but warn + * about the problem, that must not happen + */ + CWARN("Component end %llu is not aligned by the stripe size %u\n", + comp_end, comp->llc_stripe_size); + comp->llc_stripe_size = comp_end & ~(comp_end - 1); + } + } +} + static inline void lod_adjust_stripe_info(struct lod_layout_component *comp, - struct lov_desc *desc) + struct lov_desc *desc, + int append_stripes) { - if (comp->llc_pattern != LOV_PATTERN_MDT) { - if (!comp->llc_stripe_count) + if (!(comp->llc_pattern & LOV_PATTERN_MDT)) { + if (append_stripes) { + comp->llc_stripe_count = append_stripes; + } else if (!comp->llc_stripe_count) { comp->llc_stripe_count = desc->ld_default_stripe_count; + } } - if (comp->llc_stripe_size <= 0) - comp->llc_stripe_size = desc->ld_default_stripe_size; + + lod_adjust_stripe_size(comp, desc->ld_default_stripe_size); } int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, @@ -1092,20 +1177,23 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, struct lod_obj_stripe_cb_data *data) { struct lod_layout_component *lod_comp; - int i, j, rc; + int i, j, rc = 0; ENTRY; - LASSERT(lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL); + mutex_lock(&lo->ldo_layout_mutex); for (i = 0; i < lo->ldo_comp_cnt; i++) { lod_comp = &lo->ldo_comp_entries[i]; + if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN) + continue; + if (lod_comp->llc_stripe == NULL) continue; /* has stripe but not inited yet, this component has been * declared to be created, but hasn't created yet. */ - if (!lod_comp_inited(lod_comp)) + if (!lod_comp_inited(lod_comp) && !data->locd_declare) continue; if (data->locd_comp_skip_cb && @@ -1115,7 +1203,7 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, if (data->locd_comp_cb) { rc = data->locd_comp_cb(env, lo, i, data); if (rc) - RETURN(rc); + GOTO(unlock, rc); } /* could used just to do sth about component, not each @@ -1132,62 +1220,12 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, continue; rc = data->locd_stripe_cb(env, lo, dt, th, i, j, data); if (rc != 0) - RETURN(rc); - } - } - RETURN(0); -} - -static bool lod_obj_attr_set_comp_skip_cb(const struct lu_env *env, - struct lod_object *lo, int comp_idx, - struct lod_obj_stripe_cb_data *data) -{ - struct lod_layout_component *lod_comp = &lo->ldo_comp_entries[comp_idx]; - bool skipped = false; - - if (!(data->locd_attr->la_valid & LA_LAYOUT_VERSION)) - return skipped; - - switch (lo->ldo_flr_state) { - case LCM_FL_WRITE_PENDING: { - int i; - - /* skip stale components */ - if (lod_comp->llc_flags & LCME_FL_STALE) { - skipped = true; - break; - } - - /* skip valid and overlapping components, therefore any - * attempts to write overlapped components will never succeed - * because client will get EINPROGRESS. */ - for (i = 0; i < lo->ldo_comp_cnt; i++) { - if (i == comp_idx) - continue; - - if (lo->ldo_comp_entries[i].llc_flags & LCME_FL_STALE) - continue; - - if (lu_extent_is_overlapped(&lod_comp->llc_extent, - &lo->ldo_comp_entries[i].llc_extent)) { - skipped = true; - break; - } + GOTO(unlock, rc); } - break; - } - default: - LASSERTF(0, "impossible: %d\n", lo->ldo_flr_state); - case LCM_FL_SYNC_PENDING: - break; } - - CDEBUG(D_LAYOUT, DFID": %s to set component %x to version: %u\n", - PFID(lu_object_fid(&lo->ldo_obj.do_lu)), - skipped ? "skipped" : "chose", lod_comp->llc_id, - data->locd_attr->la_layout_version); - - return skipped; +unlock: + mutex_unlock(&lo->ldo_layout_mutex); + RETURN(rc); } static inline int @@ -1245,7 +1283,7 @@ static int lod_declare_attr_set(const struct lu_env *env, if (!(attr->la_valid & LA_REMOTE_ATTR_SET)) RETURN(rc); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER)) + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER)) RETURN(0); } else { if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID | LA_MODE | @@ -1296,13 +1334,13 @@ static int lod_declare_attr_set(const struct lu_env *env, !S_ISREG(attr->la_mode)) RETURN(0); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE)) { + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE)) { rc = lod_sub_declare_xattr_del(env, next, XATTR_NAME_LOV, th); RETURN(rc); } - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) || - OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PFL_RANGE)) { + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE) || + CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PFL_RANGE)) { struct lod_thread_info *info = lod_env_info(env); struct lu_buf *buf = &info->lti_buf; @@ -1344,7 +1382,7 @@ static int lod_attr_set(const struct lu_env *env, if (!(attr->la_valid & LA_REMOTE_ATTR_SET)) RETURN(rc); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER)) + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER)) RETURN(0); } else { if (!(attr->la_valid & (LA_UID | LA_GID | LA_MODE | LA_PROJID | @@ -1385,7 +1423,6 @@ static int lod_attr_set(const struct lu_env *env, data.locd_attr = attr; data.locd_declare = false; - data.locd_comp_skip_cb = lod_obj_attr_set_comp_skip_cb; data.locd_stripe_cb = lod_obj_stripe_attr_set_cb; rc = lod_obj_for_each_stripe(env, lo, th, &data); } @@ -1397,12 +1434,12 @@ static int lod_attr_set(const struct lu_env *env, !S_ISREG(attr->la_mode)) RETURN(0); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE)) { + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_STRIPE)) { rc = lod_sub_xattr_del(env, next, XATTR_NAME_LOV, th); RETURN(rc); } - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE)) { + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_CHANGE_STRIPE)) { struct lod_thread_info *info = lod_env_info(env); struct lu_buf *buf = &info->lti_buf; struct ost_id *oi = &info->lti_ostid; @@ -1440,7 +1477,7 @@ static int lod_attr_set(const struct lu_env *env, rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LOV, LU_XATTR_REPLACE, th); - } else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PFL_RANGE)) { + } else if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PFL_RANGE)) { struct lod_thread_info *info = lod_env_info(env); struct lu_buf *buf = &info->lti_buf; struct lov_comp_md_v1 *lcm; @@ -1501,7 +1538,7 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, RETURN(rc = rc > 0 ? -EINVAL : rc); if (buf->lb_buf == NULL || buf->lb_len == 0) { - CLASSERT(sizeof(*lmv1) <= sizeof(info->lti_key)); + BUILD_BUG_ON(sizeof(*lmv1) > sizeof(info->lti_key)); /* lti_buf is large enough for *lmv1 or a short * (<= sizeof(struct lmv_mds_md_v1)) foreign LMV @@ -1528,10 +1565,10 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, if (le32_to_cpu(lmv1->lmv_magic) == LMV_MAGIC_V1) rc = lmv_mds_md_size( le32_to_cpu(lmv1->lmv_stripe_count), - LMV_MAGIC_V1); + le32_to_cpu(lmv1->lmv_magic)); } else { - lfm = buf->lb_buf; - if (le32_to_cpu(lfm->lfm_magic) == LMV_MAGIC_FOREIGN) + lmv1 = buf->lb_buf; + if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1) RETURN(rc); if (rc != sizeof(*lmv1)) @@ -1567,7 +1604,7 @@ static int lod_xattr_get(const struct lu_env *env, struct dt_object *dt, if (is_root && strcmp(XATTR_NAME_LOV, name) == 0) { struct lov_user_md *lum = buf->lb_buf; - struct lov_desc *desc = &dev->lod_desc; + struct lov_desc *desc = &dev->lod_ost_descs.ltd_lov_desc; if (buf->lb_buf == NULL) { rc = sizeof(*lum); @@ -1677,7 +1714,8 @@ static int lod_prep_lmv_md(const struct lu_env *env, struct dt_object *dt, lmm1->lmv_magic = cpu_to_le32(LMV_MAGIC); lmm1->lmv_stripe_count = cpu_to_le32(stripe_count); lmm1->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type); - if (lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) { + lmm1->lmv_layout_version = cpu_to_le32(lo->ldo_dir_layout_version); + if (lod_is_layout_changing(lo)) { lmm1->lmv_migrate_hash = cpu_to_le32(lo->ldo_dir_migrate_hash); lmm1->lmv_migrate_offset = cpu_to_le32(lo->ldo_dir_migrate_offset); @@ -1734,15 +1772,11 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo, RETURN(0); } - if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1) + if (!lmv_is_sane(lmv1)) RETURN(-EINVAL); - if (le32_to_cpu(lmv1->lmv_stripe_count) < 1) - RETURN(0); - LASSERT(lo->ldo_stripe == NULL); - OBD_ALLOC(stripe, sizeof(stripe[0]) * - (le32_to_cpu(lmv1->lmv_stripe_count))); + OBD_ALLOC_PTR_ARRAY(stripe, le32_to_cpu(lmv1->lmv_stripe_count)); if (stripe == NULL) RETURN(-ENOMEM); @@ -1783,8 +1817,13 @@ int lod_parse_dir_striping(const struct lu_env *env, struct lod_object *lo, } out: lo->ldo_stripe = stripe; + lo->ldo_is_foreign = 0; lo->ldo_dir_stripe_count = le32_to_cpu(lmv1->lmv_stripe_count); lo->ldo_dir_stripes_allocated = le32_to_cpu(lmv1->lmv_stripe_count); + lo->ldo_dir_layout_version = le32_to_cpu(lmv1->lmv_layout_version); + lo->ldo_dir_migrate_offset = le32_to_cpu(lmv1->lmv_migrate_offset); + lo->ldo_dir_migrate_hash = le32_to_cpu(lmv1->lmv_migrate_hash); + lo->ldo_dir_hash_type = le32_to_cpu(lmv1->lmv_hash_type); if (rc != 0) lod_striping_free_nolock(env, lo); @@ -1840,7 +1879,7 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, slave_lmv_buf.lb_buf = slave_lmm; slave_lmv_buf.lb_len = sizeof(*slave_lmm); - if (!dt_try_as_dir(env, dt_object_child(dt))) + if (!dt_try_as_dir(env, dt_object_child(dt), false)) GOTO(out, rc = -EINVAL); rec->rec_type = S_IFDIR; @@ -1855,35 +1894,78 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, if (!dto) continue; - rc = lod_sub_declare_create(env, dto, attr, NULL, dof, th); - if (rc != 0) - GOTO(out, rc); + /* directory split skip create for existing stripes */ + if (!(lod_is_splitting(lo) && i < lo->ldo_dir_split_offset)) { + rc = lod_sub_declare_create(env, dto, attr, NULL, dof, + th); + if (rc != 0) + GOTO(out, rc); - if (!dt_try_as_dir(env, dto)) - GOTO(out, rc = -EINVAL); + if (!dt_try_as_dir(env, dto, false)) + GOTO(out, rc = -EINVAL); - rc = lod_sub_declare_ref_add(env, dto, th); - if (rc != 0) - GOTO(out, rc); + rc = lod_sub_declare_ref_add(env, dto, th); + if (rc != 0) + GOTO(out, rc); - rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = lod_sub_declare_insert(env, dto, - (const struct dt_rec *)rec, - (const struct dt_key *)dot, th); - if (rc != 0) - GOTO(out, rc); + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dot, + th); + if (rc != 0) + GOTO(out, rc); - /* master stripe FID will be put to .. */ - rec->rec_fid = lu_object_fid(&dt->do_lu); - rc = lod_sub_declare_insert(env, dto, - (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, th); - if (rc != 0) - GOTO(out, rc); + /* master stripe FID will be put to .. */ + rec->rec_fid = lu_object_fid(&dt->do_lu); + rc = lod_sub_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dotdot, + th); + if (rc != 0) + GOTO(out, rc); + + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && + cfs_fail_val == i) + snprintf(stripe_name, sizeof(info->lti_key), + DFID":%u", + PFID(lu_object_fid(&dto->do_lu)), + i + 1); + else + snprintf(stripe_name, sizeof(info->lti_key), + DFID":%u", + PFID(lu_object_fid(&dto->do_lu)), i); + + sname = lod_name_get(env, stripe_name, + strlen(stripe_name)); + rc = linkea_links_new(&ldata, &info->lti_linkea_buf, + sname, lu_object_fid(&dt->do_lu)); + if (rc != 0) + GOTO(out, rc); + + linkea_buf.lb_buf = ldata.ld_buf->lb_buf; + linkea_buf.lb_len = ldata.ld_leh->leh_len; + rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); + if (rc != 0) + GOTO(out, rc); + + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_declare_insert(env, dt_object_child(dt), + (const struct dt_rec *)rec, + (const struct dt_key *)stripe_name, th); + if (rc != 0) + GOTO(out, rc); + + rc = lod_sub_declare_ref_add(env, dt_object_child(dt), + th); + if (rc != 0) + GOTO(out, rc); + } - if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || + if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || cfs_fail_val != i) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && cfs_fail_val == i) slave_lmm->lmv_master_mdt_index = cpu_to_le32(i + 1); @@ -1895,39 +1977,6 @@ static int lod_dir_declare_create_stripes(const struct lu_env *env, if (rc != 0) GOTO(out, rc); } - - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && - cfs_fail_val == i) - snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", - PFID(lu_object_fid(&dto->do_lu)), i + 1); - else - snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", - PFID(lu_object_fid(&dto->do_lu)), i); - - sname = lod_name_get(env, stripe_name, strlen(stripe_name)); - rc = linkea_links_new(&ldata, &info->lti_linkea_buf, - sname, lu_object_fid(&dt->do_lu)); - if (rc != 0) - GOTO(out, rc); - - linkea_buf.lb_buf = ldata.ld_buf->lb_buf; - linkea_buf.lb_len = ldata.ld_leh->leh_len; - rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf, - XATTR_NAME_LINK, 0, th); - if (rc != 0) - GOTO(out, rc); - - rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = lod_sub_declare_insert(env, dt_object_child(dt), - (const struct dt_rec *)rec, - (const struct dt_key *)stripe_name, - th); - if (rc != 0) - GOTO(out, rc); - - rc = lod_sub_declare_ref_add(env, dt_object_child(dt), th); - if (rc != 0) - GOTO(out, rc); } rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), @@ -1941,74 +1990,78 @@ out: RETURN(rc); } -static int lod_prep_md_striped_create(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - const struct lmv_user_md_v1 *lum, - struct dt_object_format *dof, - struct thandle *th) +/** + * Allocate a striping on a predefined set of MDTs. + * + * Allocates new striping using the MDT index range provided by the data from + * the lum_obejcts contained in the lmv_user_md passed to this method if + * \a is_specific is true; or allocates new layout starting from MDT index in + * lo->ldo_dir_stripe_offset. The exact order of MDTs is not important and + * varies depending on MDT status. The number of stripes needed and stripe + * offset are taken from the object. If that number cannot be met, then the + * function returns an error and then it's the caller's responsibility to + * release the stripes allocated. All the internal structures are protected, + * but no concurrent allocation is allowed on the same objects. + * + * \param[in] env execution environment for this thread + * \param[in] lo LOD object + * \param[out] stripes striping created + * \param[out] mdt_indices MDT indices of striping created + * \param[in] is_specific true if the MDTs are provided by lum; false if + * only the starting MDT index is provided + * + * \retval positive stripes allocated, including the first stripe allocated + * outside + * \retval negative errno on failure + */ +static int lod_mdt_alloc_specific(const struct lu_env *env, + struct lod_object *lo, + struct dt_object **stripes, + __u32 *mdt_indices, bool is_specific) { - struct lod_thread_info *info = lod_env_info(env); - struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); - struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; - struct lod_object *lo = lod_dt_obj(dt); - struct dt_object **stripe; - __u32 stripe_count; - int *idx_array; - __u32 master_index; - int rc = 0; - __u32 i; - __u32 j; - bool is_specific = false; - ENTRY; - - /* The lum has been verifed in lod_verify_md_striping */ - LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC || - le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC); - - stripe_count = lo->ldo_dir_stripe_count; - - OBD_ALLOC(idx_array, sizeof(idx_array[0]) * stripe_count); - if (idx_array == NULL) - RETURN(-ENOMEM); - - OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count); - if (stripe == NULL) - GOTO(out_free, rc = -ENOMEM); + struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + struct lu_tgt_descs *ltd = &lod->lod_mdt_descs; + struct lu_tgt_desc *tgt = NULL; + struct lu_object_conf conf = { .loc_flags = LOC_F_NEW }; + struct dt_device *tgt_dt = NULL; + struct lu_fid fid = { 0 }; + struct dt_object *dto; + u32 master_index; + u32 stripe_count = lo->ldo_dir_stripe_count; + int stripe_idx = 1; + int j; + int idx; + int rc; - /* Start index must be the master MDT */ master_index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id; - idx_array[0] = master_index; - if (le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC) { - is_specific = true; - for (i = 1; i < stripe_count; i++) - idx_array[i] = le32_to_cpu(lum->lum_objects[i].lum_mds); - } - - for (i = 0; i < stripe_count; i++) { - struct lod_tgt_desc *tgt = NULL; - struct dt_object *dto; - struct lu_fid fid = { 0 }; - int idx; - struct lu_object_conf conf = { 0 }; - struct dt_device *tgt_dt = NULL; + if (!is_specific && stripe_count > 1) + /* Set the start index for the 2nd stripe allocation */ + mdt_indices[1] = (mdt_indices[0] + 1) % + (lod->lod_remote_mdt_count + 1); + for (; stripe_idx < stripe_count; stripe_idx++) { /* Try to find next avaible target */ - idx = idx_array[i]; + idx = mdt_indices[stripe_idx]; for (j = 0; j < lod->lod_remote_mdt_count; j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) { bool already_allocated = false; __u32 k; - CDEBUG(D_INFO, "try idx %d, mdt cnt %u, allocated %u\n", - idx, lod->lod_remote_mdt_count + 1, i); + CDEBUG(D_INFO, + "try idx %d, mdt cnt %u, allocated %u, specific %d count %hu offset %d hash %#X\n", + idx, lod->lod_remote_mdt_count + 1, stripe_idx, + is_specific, lo->ldo_dir_stripe_count, + (int)lo->ldo_dir_stripe_offset, + lo->ldo_dir_hash_type); if (likely(!is_specific && - !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) { + !CFS_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE) && + !(lo->ldo_dir_hash_type & + LMV_HASH_FLAG_OVERSTRIPED))) { /* check whether the idx already exists * in current allocated array */ - for (k = 0; k < i; k++) { - if (idx_array[k] == idx) { + for (k = 0; k < stripe_idx; k++) { + if (mdt_indices[k] == idx) { already_allocated = true; break; } @@ -2020,38 +2073,36 @@ static int lod_prep_md_striped_create(const struct lu_env *env, /* Sigh, this index is not in the bitmap, let's check * next available target */ - if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx) && + if (!test_bit(idx, ltd->ltd_tgt_bitmap) && idx != master_index) continue; if (idx == master_index) { /* Allocate the FID locally */ - rc = obd_fid_alloc(env, lod->lod_child_exp, - &fid, NULL); - if (rc < 0) - GOTO(out_put, rc); tgt_dt = lod->lod_child; + rc = dt_fid_alloc(env, tgt_dt, &fid, NULL, + NULL); + if (rc < 0) + continue; break; } /* check the status of the OSP */ tgt = LTD_TGT(ltd, idx); - if (tgt == NULL) + if (!tgt) continue; tgt_dt = tgt->ltd_tgt; - rc = dt_statfs(env, tgt_dt, &info->lti_osfs); - if (rc) { + if (!tgt->ltd_active) /* this OSP doesn't feel well */ - rc = 0; continue; - } - rc = obd_fid_alloc(env, tgt->ltd_exp, &fid, NULL); - if (rc < 0) { - rc = 0; + if (tgt->ltd_statfs.os_state & OS_STATFS_NOCREATE) + continue; + + rc = dt_fid_alloc(env, tgt_dt, &fid, NULL, NULL); + if (rc < 0) continue; - } break; } @@ -2059,23 +2110,24 @@ static int lod_prep_md_striped_create(const struct lu_env *env, /* Can not allocate more stripes */ if (j == lod->lod_remote_mdt_count) { CDEBUG(D_INFO, "%s: require stripes %u only get %d\n", - lod2obd(lod)->obd_name, stripe_count, i); + lod2obd(lod)->obd_name, stripe_count, + stripe_idx); break; } CDEBUG(D_INFO, "Get idx %d, for stripe %d "DFID"\n", - idx, i, PFID(&fid)); - idx_array[i] = idx; + idx, stripe_idx, PFID(&fid)); + mdt_indices[stripe_idx] = idx; /* Set the start index for next stripe allocation */ - if (!is_specific && i < stripe_count - 1) { + if (!is_specific && stripe_idx < stripe_count - 1) { /* * for large dir test, put all other slaves on one * remote MDT, otherwise we may save too many local * slave locks which will exceed RS_MAX_LOCKS. */ - if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) + if (unlikely(CFS_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) idx = master_index; - idx_array[i + 1] = (idx + 1) % + mdt_indices[stripe_idx + 1] = (idx + 1) % (lod->lod_remote_mdt_count + 1); } /* tgt_dt and fid must be ready after search avaible OSP @@ -2084,73 +2136,254 @@ static int lod_prep_md_striped_create(const struct lu_env *env, LASSERT(fid_is_sane(&fid)); /* fail a remote stripe FID allocation */ - if (i && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_FID)) + if (stripe_idx && CFS_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_FID)) continue; - conf.loc_flags = LOC_F_NEW; dto = dt_locate_at(env, tgt_dt, &fid, - dt->do_lu.lo_dev->ld_site->ls_top_dev, - &conf); - if (IS_ERR(dto)) - GOTO(out_put, rc = PTR_ERR(dto)); - stripe[i] = dto; - } - - lo->ldo_dir_striped = 1; - lo->ldo_stripe = stripe; - lo->ldo_dir_stripe_count = i; - lo->ldo_dir_stripes_allocated = stripe_count; - smp_mb(); - lo->ldo_dir_stripe_loaded = 1; - - if (lo->ldo_dir_stripe_count == 0) - GOTO(out_put, rc = -ENOSPC); - - rc = lod_dir_declare_create_stripes(env, dt, attr, dof, th); - if (rc != 0) - GOTO(out_put, rc); + lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev, + &conf); + if (IS_ERR(dto)) { + rc = PTR_ERR(dto); + goto error; + } -out_put: - if (rc < 0) { - for (i = 0; i < stripe_count; i++) - if (stripe[i] != NULL) - dt_object_put(env, stripe[i]); - OBD_FREE(stripe, sizeof(stripe[0]) * stripe_count); - lo->ldo_dir_stripe_count = 0; - lo->ldo_dir_stripes_allocated = 0; - lo->ldo_stripe = NULL; + stripes[stripe_idx] = dto; } -out_free: - OBD_FREE(idx_array, sizeof(idx_array[0]) * stripe_count); + return stripe_idx; - RETURN(rc); +error: + for (j = 1; j < stripe_idx; j++) { + LASSERT(stripes[j] != NULL); + dt_object_put(env, stripes[j]); + stripes[j] = NULL; + } + return rc; } -/** - * - * Alloc cached foreign LMV - * - * \param[in] lo object - * \param[in] size size of foreign LMV - * - * \retval 0 on success - * \retval negative if failed - */ -int lod_alloc_foreign_lmv(struct lod_object *lo, size_t size) +static int lod_prep_md_striped_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + const struct lmv_user_md_v1 *lum, + struct dt_object_format *dof, + struct thandle *th) { - OBD_ALLOC_LARGE(lo->ldo_foreign_lmv, size); - if (lo->ldo_foreign_lmv == NULL) - return -ENOMEM; - lo->ldo_foreign_lmv_size = size; - lo->ldo_dir_is_foreign = 1; + struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object **stripes; + struct lu_object_conf conf = { .loc_flags = LOC_F_NEW }; + struct lu_fid fid = { 0 }; + int mdt_count = lod->lod_remote_mdt_count + 1; + __u32 stripe_count; + int i; + int rc = 0; - return 0; -} + ENTRY; -/** - * Declare create striped md object. - * + /* The lum has been verifed in lod_verify_md_striping */ + LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC || + le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC); + + stripe_count = lo->ldo_dir_stripe_count; + if (!(lo->ldo_dir_hash_type & LMV_HASH_FLAG_OVERSTRIPED) && + stripe_count > mdt_count) + RETURN(-E2BIG); + + if ((lo->ldo_dir_hash_type & LMV_HASH_FLAG_OVERSTRIPED) && + (stripe_count > mdt_count * LMV_MAX_STRIPES_PER_MDT || + /* a single MDT doesn't initialize the infrastructure for striped + * directories, so we just don't support overstriping in that case + */ + mdt_count == 1)) + RETURN(-E2BIG); + + OBD_ALLOC_PTR_ARRAY(stripes, stripe_count); + if (!stripes) + RETURN(-ENOMEM); + + /* Allocate the first stripe locally */ + rc = dt_fid_alloc(env, lod->lod_child, &fid, NULL, NULL); + if (rc < 0) + GOTO(out, rc); + + stripes[0] = dt_locate_at(env, lod->lod_child, &fid, + dt->do_lu.lo_dev->ld_site->ls_top_dev, &conf); + if (IS_ERR(stripes[0])) + GOTO(out, rc = PTR_ERR(stripes[0])); + + if (lo->ldo_dir_stripe_offset == LMV_OFFSET_DEFAULT) { + lod_qos_statfs_update(env, lod, &lod->lod_mdt_descs); + rc = lod_mdt_alloc_qos(env, lo, stripes, 1, stripe_count); + if (rc == -EAGAIN) + rc = lod_mdt_alloc_rr(env, lo, stripes, 1, + stripe_count); + } else { + int *idx_array; + bool is_specific = false; + + OBD_ALLOC_PTR_ARRAY(idx_array, stripe_count); + if (!idx_array) + GOTO(out, rc = -ENOMEM); + + if (le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC_SPECIFIC) { + int stripes_per_mdt; + int mdt; + + is_specific = true; + + /* Verify we do not exceed the stripes per MDT limit */ + for (mdt = 0; mdt < mdt_count + 1; mdt++) { + stripes_per_mdt = 0; + for (i = 0; i < stripe_count; i++) { + if (mdt == le32_to_cpu( + lum->lum_objects[i].lum_mds)) + stripes_per_mdt++; + } + if (stripes_per_mdt > LMV_MAX_STRIPES_PER_MDT) + GOTO(out_free, rc = -EINVAL); + } + + for (i = 0; i < stripe_count; i++) + idx_array[i] = + le32_to_cpu(lum->lum_objects[i].lum_mds); + } + + /* stripe 0 is local */ + idx_array[0] = + lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id; + rc = lod_mdt_alloc_specific(env, lo, stripes, idx_array, + is_specific); +out_free: + OBD_FREE_PTR_ARRAY(idx_array, stripe_count); + } + + if (rc < 0) + GOTO(out, rc); + + LASSERT(rc > 0); + + lo->ldo_dir_striped = 1; + lo->ldo_stripe = stripes; + lo->ldo_dir_stripe_count = rc; + lo->ldo_dir_stripes_allocated = stripe_count; + smp_mb(); + lo->ldo_dir_stripe_loaded = 1; + + rc = lod_dir_declare_create_stripes(env, dt, attr, dof, th); + if (rc < 0) + lod_striping_free(env, lo); + + RETURN(rc); + +out: + LASSERT(rc < 0); + if (!IS_ERR_OR_NULL(stripes[0])) + dt_object_put(env, stripes[0]); + for (i = 1; i < stripe_count; i++) + LASSERT(!stripes[i]); + OBD_FREE_PTR_ARRAY(stripes, stripe_count); + + return rc; +} + +/** + * + * Alloc cached foreign LOV + * + * \param[in] lo object + * \param[in] size size of foreign LOV + * + * \retval 0 on success + * \retval negative if failed + */ +int lod_alloc_foreign_lov(struct lod_object *lo, size_t size) +{ + OBD_ALLOC_LARGE(lo->ldo_foreign_lov, size); + if (lo->ldo_foreign_lov == NULL) + return -ENOMEM; + lo->ldo_foreign_lov_size = size; + lo->ldo_is_foreign = 1; + return 0; +} + +/** + * + * Free cached foreign LOV + * + * \param[in] lo object + */ +void lod_free_foreign_lov(struct lod_object *lo) +{ + if (lo->ldo_foreign_lov != NULL) + OBD_FREE_LARGE(lo->ldo_foreign_lov, lo->ldo_foreign_lov_size); + lo->ldo_foreign_lov = NULL; + lo->ldo_foreign_lov_size = 0; + lo->ldo_is_foreign = 0; +} + +/** + * + * Alloc cached foreign LMV + * + * \param[in] lo object + * \param[in] size size of foreign LMV + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_alloc_foreign_lmv(struct lod_object *lo, size_t size) +{ + OBD_ALLOC_LARGE(lo->ldo_foreign_lmv, size); + if (lo->ldo_foreign_lmv == NULL) + return -ENOMEM; + lo->ldo_foreign_lmv_size = size; + lo->ldo_is_foreign = 1; + + return 0; +} + +static int lod_prep_md_replayed_create(const struct lu_env *env, + struct dt_object *dt, + struct lu_attr *attr, + const struct lu_buf *lmv_buf, + struct dt_object_format *dof, + struct thandle *th) +{ + struct lod_object *lo = lod_dt_obj(dt); + int rc; + + ENTRY; + + mutex_lock(&lo->ldo_layout_mutex); + rc = lod_parse_dir_striping(env, lo, lmv_buf); + if (rc == 0) { + lo->ldo_dir_stripe_loaded = 1; + lo->ldo_dir_striped = 1; + rc = lod_dir_declare_create_stripes(env, dt, attr, dof, th); + } + mutex_unlock(&lo->ldo_layout_mutex); + + RETURN(rc); +} + +/** + * + * Free cached foreign LMV + * + * \param[in] lo object + */ +static void lod_free_foreign_lmv(struct lod_object *lo) +{ + if (lo->ldo_foreign_lmv != NULL) + OBD_FREE_LARGE(lo->ldo_foreign_lmv, lo->ldo_foreign_lmv_size); + lo->ldo_foreign_lmv = NULL; + lo->ldo_foreign_lmv_size = 0; + lo->ldo_is_foreign = 0; +} + +/** + * Declare create striped md object. + * * The function declares intention to create a striped directory. This is a * wrapper for lod_prep_md_striped_create(). The only additional functionality * is to verify pattern \a lum_buf is good. Check that function for the details. @@ -2174,353 +2407,164 @@ static int lod_declare_xattr_set_lmv(const struct lu_env *env, struct dt_object_format *dof, struct thandle *th) { - struct lod_object *lo = lod_dt_obj(dt); - struct lmv_user_md_v1 *lum = lum_buf->lb_buf; - int rc; - ENTRY; + struct lod_object *lo = lod_dt_obj(dt); + struct lmv_user_md_v1 *lum = lum_buf->lb_buf; + int rc; + ENTRY; LASSERT(lum != NULL); - CDEBUG(D_INFO, "lum magic = %x count = %u offset = %d\n", - le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_stripe_count), - (int)le32_to_cpu(lum->lum_stripe_offset)); + CDEBUG(D_INFO, + "lum magic=%x hash=%x count=%u offset=%d inherit=%u rr=%u\n", + le32_to_cpu(lum->lum_magic), le32_to_cpu(lum->lum_hash_type), + le32_to_cpu(lum->lum_stripe_count), + (int)le32_to_cpu(lum->lum_stripe_offset), + lum->lum_max_inherit, lum->lum_max_inherit_rr); if (lo->ldo_dir_stripe_count == 0) { - if (lo->ldo_dir_is_foreign) { + if (lo->ldo_is_foreign) { rc = lod_alloc_foreign_lmv(lo, lum_buf->lb_len); if (rc != 0) - GOTO(out, rc); + RETURN(rc); memcpy(lo->ldo_foreign_lmv, lum, lum_buf->lb_len); lo->ldo_dir_stripe_loaded = 1; } - GOTO(out, rc = 0); + RETURN(0); } - /* prepare dir striped objects */ - rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th); - if (rc != 0) { + /* client replay striped directory creation with LMV, this happens when + * all involved MDTs were rebooted, or MDT recovery was aborted. + */ + if (le32_to_cpu(lum->lum_magic) == LMV_MAGIC_V1) + rc = lod_prep_md_replayed_create(env, dt, attr, lum_buf, dof, + th); + else + rc = lod_prep_md_striped_create(env, dt, attr, lum, dof, th); + if (rc != 0) /* failed to create striping, let's reset * config so that others don't get confused */ lod_striping_free(env, lo); - GOTO(out, rc); - } -out: + RETURN(rc); } /** - * Append source stripes after target stripes for migrating directory. NB, we - * only need to declare this, the append is done inside lod_xattr_set_lmv(). + * Set or replace striped directory layout, and LFSCK may set layout on a plain + * directory, so don't check stripe count. * * \param[in] env execution environment * \param[in] dt target object - * \param[in] buf LMV buf which contains source stripe fids + * \param[in] lmv_buf LMV buf which contains source stripe FIDs + * \param[in] fl set or replace * \param[in] th transaction handle * * \retval 0 on success * \retval negative if failed */ -static int lod_dir_declare_layout_add(const struct lu_env *env, - struct dt_object *dt, - const struct lu_buf *buf, - struct thandle *th) +static int lod_dir_layout_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *lmv_buf, + int fl, + struct thandle *th) { - struct lod_thread_info *info = lod_env_info(env); - struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); - struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; - struct lod_object *lo = lod_dt_obj(dt); struct dt_object *next = dt_object_child(dt); - struct dt_object_format *dof = &info->lti_format; - struct lmv_mds_md_v1 *lmv = buf->lb_buf; - struct dt_object **stripe; - __u32 stripe_count = le32_to_cpu(lmv->lmv_stripe_count); - struct lu_fid *fid = &info->lti_fid; - struct lod_tgt_desc *tgt; - struct dt_object *dto; - struct dt_device *tgt_dt; - int type = LU_SEQ_RANGE_ANY; - struct dt_insert_rec *rec = &info->lti_dt_rec; - char *stripe_name = info->lti_key; - struct lu_name *sname; - struct linkea_data ldata = { NULL }; - struct lu_buf linkea_buf; - __u32 idx; + struct lod_object *lo = lod_dt_obj(dt); + struct lod_device *lod = lu2lod_dev(lod2lu_obj(lo)->lo_dev); + struct lmv_mds_md_v1 *lmv = lmv_buf->lb_buf; + struct lmv_mds_md_v1 *slave_lmv; + struct lu_buf slave_buf; int i; int rc; ENTRY; - if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1) - RETURN(-EINVAL); - - if (stripe_count == 0) + if (!lmv_is_sane2(lmv)) RETURN(-EINVAL); - dof->dof_type = DFT_DIR; + /* adjust hash for dir merge, which may not be set in user command */ + if (lmv_is_merging(lmv) && + !(lmv->lmv_migrate_hash & LMV_HASH_TYPE_MASK)) + lmv->lmv_merge_hash |= + lod->lod_mdt_descs.ltd_lmv_desc.ld_pattern & + LMV_HASH_TYPE_MASK; - OBD_ALLOC(stripe, - sizeof(*stripe) * (lo->ldo_dir_stripe_count + stripe_count)); - if (stripe == NULL) - RETURN(-ENOMEM); + LMV_DEBUG(D_INFO, lmv, "set"); - for (i = 0; i < lo->ldo_dir_stripe_count; i++) - stripe[i] = lo->ldo_stripe[i]; + rc = lod_sub_xattr_set(env, next, lmv_buf, XATTR_NAME_LMV, fl, th); + if (rc) + RETURN(rc); - for (i = 0; i < stripe_count; i++) { - fid_le_to_cpu(fid, - &lmv->lmv_stripe_fids[i]); - if (!fid_is_sane(fid)) - continue; + /* directory restripe may update stripe LMV directly */ + if (!lo->ldo_dir_stripe_count) + RETURN(0); - rc = lod_fld_lookup(env, lod, fid, &idx, &type); - if (rc) - GOTO(out, rc); + lo->ldo_dir_hash_type = le32_to_cpu(lmv->lmv_hash_type); + lo->ldo_dir_migrate_offset = le32_to_cpu(lmv->lmv_migrate_offset); + lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_migrate_hash); + lo->ldo_dir_layout_version = le32_to_cpu(lmv->lmv_layout_version); - if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) { - tgt_dt = lod->lod_child; - } else { - tgt = LTD_TGT(ltd, idx); - if (tgt == NULL) - GOTO(out, rc = -ESTALE); - tgt_dt = tgt->ltd_tgt; - } + OBD_ALLOC_PTR(slave_lmv); + if (!slave_lmv) + RETURN(-ENOMEM); - dto = dt_locate_at(env, tgt_dt, fid, - lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev, - NULL); - if (IS_ERR(dto)) - GOTO(out, rc = PTR_ERR(dto)); + lod_prep_slave_lmv_md(slave_lmv, lmv); + slave_buf.lb_buf = slave_lmv; + slave_buf.lb_len = sizeof(*slave_lmv); - stripe[i + lo->ldo_dir_stripe_count] = dto; + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + if (!lo->ldo_stripe[i]) + continue; - if (!dt_try_as_dir(env, dto)) - GOTO(out, rc = -ENOTDIR); + if (!dt_object_exists(lo->ldo_stripe[i])) + continue; - rc = lod_sub_declare_ref_add(env, dto, th); + rc = lod_sub_xattr_set(env, lo->ldo_stripe[i], &slave_buf, + XATTR_NAME_LMV, fl, th); if (rc) - GOTO(out, rc); + break; + } - rc = lod_sub_declare_insert(env, dto, - (const struct dt_rec *)rec, - (const struct dt_key *)dot, th); - if (rc) - GOTO(out, rc); + OBD_FREE_PTR(slave_lmv); - rc = lod_sub_declare_insert(env, dto, - (const struct dt_rec *)rec, - (const struct dt_key *)dotdot, th); - if (rc) - GOTO(out, rc); + RETURN(rc); +} - rc = lod_sub_declare_xattr_set(env, dto, buf, - XATTR_NAME_LMV, 0, th); - if (rc) - GOTO(out, rc); +/** + * Implementation of dt_object_operations::do_declare_xattr_set. + * + * Used with regular (non-striped) objects. Basically it + * initializes the striping information and applies the + * change to all the stripes. + * + * \see dt_object_operations::do_declare_xattr_set() in the API description + * for details. + */ +static int lod_dir_declare_xattr_set(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, + struct thandle *th) +{ + struct dt_object *next = dt_object_child(dt); + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_object *lo = lod_dt_obj(dt); + int i; + int rc; + ENTRY; - snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", - PFID(lu_object_fid(&dto->do_lu)), - i + lo->ldo_dir_stripe_count); + if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { + struct lmv_user_md_v1 *lum; - sname = lod_name_get(env, stripe_name, strlen(stripe_name)); - rc = linkea_links_new(&ldata, &info->lti_linkea_buf, - sname, lu_object_fid(&dt->do_lu)); - if (rc) - GOTO(out, rc); - - linkea_buf.lb_buf = ldata.ld_buf->lb_buf; - linkea_buf.lb_len = ldata.ld_leh->leh_len; - rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf, - XATTR_NAME_LINK, 0, th); - if (rc) - GOTO(out, rc); - - rc = lod_sub_declare_insert(env, next, - (const struct dt_rec *)rec, - (const struct dt_key *)stripe_name, - th); - if (rc) - GOTO(out, rc); - - rc = lod_sub_declare_ref_add(env, next, th); - if (rc) - GOTO(out, rc); - } - - if (lo->ldo_stripe) - OBD_FREE(lo->ldo_stripe, - sizeof(*stripe) * lo->ldo_dir_stripes_allocated); - lo->ldo_stripe = stripe; - lo->ldo_dir_migrate_offset = lo->ldo_dir_stripe_count; - lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_hash_type); - lo->ldo_dir_stripe_count += stripe_count; - lo->ldo_dir_stripes_allocated += stripe_count; - lo->ldo_dir_hash_type |= LMV_HASH_FLAG_MIGRATION; - - RETURN(0); -out: - i = lo->ldo_dir_stripe_count; - while (i < lo->ldo_dir_stripe_count + stripe_count && stripe[i]) - dt_object_put(env, stripe[i++]); - - OBD_FREE(stripe, - sizeof(*stripe) * (stripe_count + lo->ldo_dir_stripe_count)); - RETURN(rc); -} - -static int lod_dir_declare_layout_delete(const struct lu_env *env, - struct dt_object *dt, - const struct lu_buf *buf, - struct thandle *th) -{ - struct lod_thread_info *info = lod_env_info(env); - struct lod_object *lo = lod_dt_obj(dt); - struct dt_object *next = dt_object_child(dt); - struct lmv_user_md *lmu = buf->lb_buf; - __u32 final_stripe_count; - char *stripe_name = info->lti_key; - struct dt_object *dto; - int i; - int rc = 0; - - if (!lmu) - return -EINVAL; - - final_stripe_count = le32_to_cpu(lmu->lum_stripe_count); - if (final_stripe_count >= lo->ldo_dir_stripe_count) - return -EINVAL; - - for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) { - dto = lo->ldo_stripe[i]; - if (!dto) - continue; - - if (!dt_try_as_dir(env, dto)) - return -ENOTDIR; - - rc = lod_sub_declare_delete(env, dto, - (const struct dt_key *)dot, th); - if (rc) - return rc; - - rc = lod_sub_declare_ref_del(env, dto, th); - if (rc) - return rc; - - rc = lod_sub_declare_delete(env, dto, - (const struct dt_key *)dotdot, th); - if (rc) - return rc; - - snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", - PFID(lu_object_fid(&dto->do_lu)), i); - - rc = lod_sub_declare_delete(env, next, - (const struct dt_key *)stripe_name, th); - if (rc) - return rc; - - rc = lod_sub_declare_ref_del(env, next, th); - if (rc) - return rc; - } - - return 0; -} - -/* - * delete stripes from dir master object, the lum_stripe_count in argument is - * the final stripe count, the stripes after that will be deleted, NB, they - * are not destroyed, but deleted from it's parent namespace, this function - * will be called in two places: - * 1. mdd_migrate_create() delete stripes from source, and append them to - * target. - * 2. mdd_dir_layout_shrink() delete stripes from source, and destroy them. - */ -static int lod_dir_layout_delete(const struct lu_env *env, - struct dt_object *dt, - const struct lu_buf *buf, - struct thandle *th) -{ - struct lod_thread_info *info = lod_env_info(env); - struct lod_object *lo = lod_dt_obj(dt); - struct dt_object *next = dt_object_child(dt); - struct lmv_user_md *lmu = buf->lb_buf; - __u32 final_stripe_count; - char *stripe_name = info->lti_key; - struct dt_object *dto; - int i; - int rc = 0; - - ENTRY; - - if (!lmu) - RETURN(-EINVAL); - - final_stripe_count = le32_to_cpu(lmu->lum_stripe_count); - if (final_stripe_count >= lo->ldo_dir_stripe_count) - RETURN(-EINVAL); - - for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) { - dto = lo->ldo_stripe[i]; - if (!dto) - continue; - - rc = lod_sub_delete(env, dto, - (const struct dt_key *)dotdot, th); - if (rc) - break; - - snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", - PFID(lu_object_fid(&dto->do_lu)), i); - - rc = lod_sub_delete(env, next, - (const struct dt_key *)stripe_name, th); - if (rc) - break; - - rc = lod_sub_ref_del(env, next, th); - if (rc) - break; - } - - lod_striping_free(env, lod_dt_obj(dt)); - - RETURN(rc); -} - -/** - * Implementation of dt_object_operations::do_declare_xattr_set. - * - * Used with regular (non-striped) objects. Basically it - * initializes the striping information and applies the - * change to all the stripes. - * - * \see dt_object_operations::do_declare_xattr_set() in the API description - * for details. - */ -static int lod_dir_declare_xattr_set(const struct lu_env *env, - struct dt_object *dt, - const struct lu_buf *buf, - const char *name, int fl, - struct thandle *th) -{ - struct dt_object *next = dt_object_child(dt); - struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); - struct lod_object *lo = lod_dt_obj(dt); - int i; - int rc; - ENTRY; - - if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { - struct lmv_user_md_v1 *lum; + LASSERT(buf != NULL); + if (!buf->lb_buf || buf->lb_len < sizeof(*lum)) + RETURN(-EFAULT); - LASSERT(buf != NULL && buf->lb_buf != NULL); lum = buf->lb_buf; rc = lod_verify_md_striping(d, lum); if (rc != 0) RETURN(rc); } else if (strcmp(name, XATTR_NAME_LOV) == 0) { - rc = lod_verify_striping(d, lo, buf, false); + rc = lod_verify_striping(env, d, lo, buf, false); if (rc != 0) RETURN(rc); } @@ -2670,21 +2714,31 @@ static int lod_replace_parent_fid(const struct lu_env *env, RETURN(rc); } -inline __u16 lod_comp_entry_stripe_count(struct lod_object *lo, - struct lod_layout_component *entry, - bool is_dir) +__u16 lod_comp_entry_stripe_count(struct lod_object *lo, int comp_idx, + bool is_dir) { struct lod_device *lod = lu2lod_dev(lod2lu_obj(lo)->lo_dev); + struct lod_layout_component *entry; + enum lod_uses_hint flags = LOD_USES_ASSIGNED_STRIPE; + + if (is_dir) { + entry = &lo->ldo_def_striping->lds_def_comp_entries[comp_idx]; + return entry->llc_ostlist.op_count; + } - if (is_dir) - return 0; - else if (lod_comp_inited(entry)) + entry = &lo->ldo_comp_entries[comp_idx]; + if (lod_comp_inited(entry)) return entry->llc_stripe_count; - else if ((__u16)-1 == entry->llc_stripe_count) - return lod->lod_desc.ld_tgt_count; - else - return lod_get_stripe_count(lod, lo, - entry->llc_stripe_count, false); + if (entry->llc_stripe_count == LOV_ALL_STRIPES) + return lod_get_stripe_count_plain(lod, lo, + entry->llc_stripe_count, + entry->llc_pattern & + LOV_PATTERN_OVERSTRIPING, + &flags); + + return lod_get_stripe_count(lod, lo, comp_idx, entry->llc_stripe_count, + entry->llc_pattern & LOV_PATTERN_OVERSTRIPING, + &flags); } static int lod_comp_md_size(struct lod_object *lo, bool is_dir) @@ -2719,14 +2773,21 @@ static int lod_comp_md_size(struct lod_object *lo, bool is_dir) for (i = 0; i < comp_cnt; i++) { __u16 stripe_count; - magic = comp_entries[i].llc_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1; - stripe_count = lod_comp_entry_stripe_count(lo, &comp_entries[i], - is_dir); - if (!is_dir && is_composite) - lod_comp_shrink_stripe_count(&comp_entries[i], - &stripe_count); - - size += lov_user_md_size(stripe_count, magic); + if (comp_entries[i].llc_magic == LOV_MAGIC_FOREIGN) { + size += lov_foreign_md_size(comp_entries[i].llc_length); + } else { + magic = comp_entries[i].llc_pool ? LOV_MAGIC_V3 : + LOV_MAGIC_V1; + stripe_count = lod_comp_entry_stripe_count(lo, i, + is_dir); + if (!is_dir && is_composite) + lod_comp_shrink_stripe_count(&comp_entries[i], + &stripe_count); + if (is_dir && comp_entries[i].llc_ostlist.op_count) + magic = LOV_MAGIC_SPECIFIC; + + size += lov_user_md_size(stripe_count, magic); + } LASSERT(size % sizeof(__u64) == 0); } return size; @@ -2752,14 +2813,13 @@ static int lod_declare_layout_add(const struct lu_env *env, { struct lod_thread_info *info = lod_env_info(env); struct lod_layout_component *comp_array, *lod_comp, *old_array; - struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); struct dt_object *next = dt_object_child(dt); - struct lov_desc *desc = &d->lod_desc; - struct lod_object *lo = lod_dt_obj(dt); - struct lov_user_md_v3 *v3; - struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; - __u32 magic; - int i, rc, array_cnt, old_array_cnt; + struct lov_desc *desc = &d->lod_ost_descs.ltd_lov_desc; + struct lod_object *lo = lod_dt_obj(dt); + struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; + __u32 magic; + int i, rc, array_cnt, old_array_cnt; ENTRY; LASSERT(lo->ldo_is_composite); @@ -2767,7 +2827,7 @@ static int lod_declare_layout_add(const struct lu_env *env, if (lo->ldo_flr_state != LCM_FL_NONE) RETURN(-EBUSY); - rc = lod_verify_striping(d, lo, buf, false); + rc = lod_verify_striping(env, d, lo, buf, false); if (rc != 0) RETURN(rc); @@ -2780,10 +2840,15 @@ static int lod_declare_layout_add(const struct lu_env *env, if (magic != LOV_USER_MAGIC_COMP_V1) RETURN(-EINVAL); + mutex_lock(&lo->ldo_layout_mutex); + array_cnt = lo->ldo_comp_cnt + comp_v1->lcm_entry_count; - OBD_ALLOC(comp_array, sizeof(*comp_array) * array_cnt); - if (comp_array == NULL) + OBD_ALLOC_PTR_ARRAY(comp_array, array_cnt); + if (comp_array == NULL) { + mutex_unlock(&lo->ldo_layout_mutex); RETURN(-ENOMEM); + } + memcpy(comp_array, lo->ldo_comp_entries, sizeof(*comp_array) * lo->ldo_comp_cnt); @@ -2802,13 +2867,37 @@ static int lod_declare_layout_add(const struct lu_env *env, lod_comp->llc_stripe_offset = v1->lmm_stripe_offset; lod_comp->llc_flags = comp_v1->lcm_entries[i].lcme_flags; - lod_comp->llc_stripe_count = v1->lmm_stripe_count; lod_comp->llc_stripe_size = v1->lmm_stripe_size; - lod_adjust_stripe_info(lod_comp, desc); + lod_comp->llc_stripe_count = v1->lmm_stripe_count; + lod_comp->llc_pattern = v1->lmm_pattern; + /** + * limit stripe count so that it's less than/equal to + * extent_size / stripe_size. + * + * Note: extension size reused llc_stripe_size field and + * uninstantiated component could be defined with + * extent_start == extent_end as extension component will + * expand it later. + */ + if (!(lod_comp->llc_flags & LCME_FL_EXTENSION) && + (lod_comp_inited(lod_comp) || + lod_comp->llc_extent.e_start < + lod_comp->llc_extent.e_end) && + lod_comp->llc_stripe_count != LOV_ALL_STRIPES && + ext->e_end != OBD_OBJECT_EOF && + (__u64)(lod_comp->llc_stripe_count * + lod_comp->llc_stripe_size) > + (ext->e_end - ext->e_start)) + lod_comp->llc_stripe_count = + DIV_ROUND_UP(ext->e_end - ext->e_start, + lod_comp->llc_stripe_size); + lod_adjust_stripe_info(lod_comp, desc, 0); if (v1->lmm_magic == LOV_USER_MAGIC_V3) { - v3 = (struct lov_user_md_v3 *) v1; - if (v3->lmm_pool_name[0] != '\0') { + struct lov_user_md_v3 *v3 = (typeof(*v3) *) v1; + + if (v3->lmm_pool_name[0] != '\0' && + !lov_pool_is_ignored(v3->lmm_pool_name)) { rc = lod_set_pool(&lod_comp->llc_pool, v3->lmm_pool_name); if (rc) @@ -2835,11 +2924,13 @@ static int lod_declare_layout_add(const struct lu_env *env, GOTO(error, rc); } - OBD_FREE(old_array, sizeof(*lod_comp) * old_array_cnt); + OBD_FREE_PTR_ARRAY(old_array, old_array_cnt); LASSERT(lo->ldo_mirror_count == 1); lo->ldo_mirrors[0].lme_end = array_cnt - 1; + mutex_unlock(&lo->ldo_layout_mutex); + RETURN(0); error: @@ -2851,15 +2942,55 @@ error: lod_comp->llc_pool = NULL; } } - OBD_FREE(comp_array, sizeof(*comp_array) * array_cnt); + OBD_FREE_PTR_ARRAY(comp_array, array_cnt); + mutex_unlock(&lo->ldo_layout_mutex); + RETURN(rc); } /** + * lod_last_non_stale_mirror() - Check if a mirror is the last non-stale mirror. + * @mirror_id: Mirror id to be checked. + * @lo: LOD object. + * + * This function checks if a mirror with specified @mirror_id is the last + * non-stale mirror of a LOD object @lo. + * + * Return: true or false. + */ +static inline +bool lod_last_non_stale_mirror(__u16 mirror_id, struct lod_object *lo) +{ + struct lod_layout_component *lod_comp; + bool has_stale_flag; + int i; + + for (i = 0; i < lo->ldo_mirror_count; i++) { + if (lo->ldo_mirrors[i].lme_id == mirror_id || + lo->ldo_mirrors[i].lme_stale) + continue; + + has_stale_flag = false; + lod_foreach_mirror_comp(lod_comp, lo, i) { + if (lod_comp->llc_flags & LCME_FL_STALE) { + has_stale_flag = true; + break; + } + } + if (!has_stale_flag) + return false; + } + + return true; +} + +/** * Declare component set. The xattr is name XATTR_LUSTRE_LOV.set.$field, * the '$field' can only be 'flags' now. The xattr value is binary * lov_comp_md_v1 which contains the component ID(s) and the value of * the field to be modified. + * Please update allowed_lustre_lov macro if $field groks more values + * in the future. * * \param[in] env execution environment * \param[in] dt dt_object to be modified @@ -2885,6 +3016,9 @@ static int lod_declare_layout_set(const struct lu_env *env, bool changed = false; ENTRY; + /* Please update allowed_lustre_lov macro if op + * groks more values in the future + */ if (strcmp(op, "set.flags") != 0) { CDEBUG(D_LAYOUT, "%s: operation (%s) not supported.\n", lod2obd(d)->obd_name, op); @@ -2906,15 +3040,18 @@ static int lod_declare_layout_set(const struct lu_env *env, RETURN(-EINVAL); } + mutex_lock(&lo->ldo_layout_mutex); for (i = 0; i < comp_v1->lcm_entry_count; i++) { __u32 id = comp_v1->lcm_entries[i].lcme_id; __u32 flags = comp_v1->lcm_entries[i].lcme_flags; __u32 mirror_flag = flags & LCME_MIRROR_FLAGS; + __u16 mirror_id = mirror_id_of(id); bool neg = flags & LCME_FL_NEG; if (flags & LCME_FL_INIT) { if (changed) - lod_striping_free(env, lo); + lod_striping_free_nolock(env, lo); + mutex_unlock(&lo->ldo_layout_mutex); RETURN(-EINVAL); } @@ -2924,7 +3061,7 @@ static int lod_declare_layout_set(const struct lu_env *env, /* lfs only put one flag in each entry */ if ((flags && id != lod_comp->llc_id) || - (mirror_flag && mirror_id_of(id) != + (mirror_flag && mirror_id != mirror_id_of(lod_comp->llc_id))) continue; @@ -2934,8 +3071,16 @@ static int lod_declare_layout_set(const struct lu_env *env, if (mirror_flag) lod_comp->llc_flags &= ~mirror_flag; } else { - if (flags) + if (flags) { + if ((flags & LCME_FL_STALE) && + lod_last_non_stale_mirror(mirror_id, + lo)) { + mutex_unlock( + &lo->ldo_layout_mutex); + RETURN(-EUCLEAN); + } lod_comp->llc_flags |= flags; + } if (mirror_flag) { lod_comp->llc_flags |= mirror_flag; if (mirror_flag & LCME_FL_NOSYNC) @@ -2946,6 +3091,7 @@ static int lod_declare_layout_set(const struct lu_env *env, changed = true; } } + mutex_unlock(&lo->ldo_layout_mutex); if (!changed) { CDEBUG(D_LAYOUT, "%s: requested component(s) not found.\n", @@ -3028,9 +3174,13 @@ static int lod_declare_layout_del(const struct lu_env *env, flags = 0; } + mutex_lock(&lo->ldo_layout_mutex); + left = lo->ldo_comp_cnt; - if (left <= 0) + if (left <= 0) { + mutex_unlock(&lo->ldo_layout_mutex); RETURN(-EINVAL); + } for (i = (lo->ldo_comp_cnt - 1); i >= 0; i--) { struct lod_layout_component *lod_comp; @@ -3047,6 +3197,7 @@ static int lod_declare_layout_del(const struct lu_env *env, if (left != (i + 1)) { CDEBUG(D_LAYOUT, "%s: this deletion will create " "a hole.\n", lod2obd(d)->obd_name); + mutex_unlock(&lo->ldo_layout_mutex); RETURN(-EINVAL); } left--; @@ -3065,8 +3216,10 @@ static int lod_declare_layout_del(const struct lu_env *env, if (obj == NULL) continue; rc = lod_sub_declare_destroy(env, obj, th); - if (rc) + if (rc) { + mutex_unlock(&lo->ldo_layout_mutex); RETURN(rc); + } } } @@ -3074,9 +3227,12 @@ static int lod_declare_layout_del(const struct lu_env *env, if (left == lo->ldo_comp_cnt) { CDEBUG(D_LAYOUT, "%s: requested component id:%#x not found\n", lod2obd(d)->obd_name, id); + mutex_unlock(&lo->ldo_layout_mutex); RETURN(-EINVAL); } + mutex_unlock(&lo->ldo_layout_mutex); + memset(attr, 0, sizeof(*attr)); attr->la_valid = LA_SIZE; rc = lod_sub_declare_attr_set(env, next, attr, th); @@ -3198,13 +3354,13 @@ static int lod_layout_convert(struct lod_thread_info *info) } lcm = info->lti_ea_store; + memset(lcm, 0, sizeof(*lcm) + sizeof(*lcme)); lcm->lcm_magic = cpu_to_le32(LOV_MAGIC_COMP_V1); lcm->lcm_size = cpu_to_le32(size); lcm->lcm_layout_gen = cpu_to_le32(le16_to_cpu( lmm_save->lmm_layout_gen)); lcm->lcm_flags = cpu_to_le16(LCM_FL_NONE); lcm->lcm_entry_count = cpu_to_le16(1); - lcm->lcm_mirror_count = 0; lcme = &lcm->lcm_entries[0]; lcme->lcme_flags = cpu_to_le32(LCME_FL_INIT); @@ -3226,16 +3382,19 @@ out: * Merge layouts to form a mirrored file. */ static int lod_declare_layout_merge(const struct lu_env *env, - struct dt_object *dt, const struct lu_buf *mbuf, - struct thandle *th) + struct dt_object *dt, + const struct lu_buf *mbuf, + struct thandle *th) { - struct lod_thread_info *info = lod_env_info(env); - struct lu_buf *buf = &info->lti_buf; - struct lod_object *lo = lod_dt_obj(dt); - struct lov_comp_md_v1 *lcm; - struct lov_comp_md_v1 *cur_lcm; - struct lov_comp_md_v1 *merge_lcm; - struct lov_comp_md_entry_v1 *lcme; + struct lod_thread_info *info = lod_env_info(env); + struct lu_attr *layout_attr = &info->lti_layout_attr; + struct lu_buf *buf = &info->lti_buf; + struct lod_object *lo = lod_dt_obj(dt); + struct lov_comp_md_v1 *lcm; + struct lov_comp_md_v1 *cur_lcm; + struct lov_comp_md_v1 *merge_lcm; + struct lov_comp_md_entry_v1 *lcme; + struct lov_mds_md_v1 *lmm; size_t size = 0; size_t offset; __u16 cur_entry_count; @@ -3243,7 +3402,9 @@ static int lod_declare_layout_merge(const struct lu_env *env, __u32 id = 0; __u16 mirror_id = 0; __u32 mirror_count; - int rc, i; + int rc, i; + bool merge_has_dom; + ENTRY; merge_lcm = mbuf->lb_buf; @@ -3327,10 +3488,17 @@ static int lod_declare_layout_merge(const struct lu_env *env, lcme->lcme_id = cpu_to_le32(id); } - id = MAX(le32_to_cpu(lcme->lcme_id), id); + id = max(le32_to_cpu(lcme->lcme_id), id); } mirror_id = mirror_id_of(id) + 1; + + /* check if first entry in new layout is DOM */ + lmm = (struct lov_mds_md_v1 *)((char *)merge_lcm + + merge_lcm->lcm_entries[0].lcme_offset); + merge_has_dom = lov_pattern(le32_to_cpu(lmm->lmm_pattern)) & + LOV_PATTERN_MDT; + for (i = 0; i < merge_entry_count; i++) { struct lov_comp_md_entry_v1 *merge_lcme; @@ -3339,6 +3507,8 @@ static int lod_declare_layout_merge(const struct lu_env *env, *lcme = *merge_lcme; lcme->lcme_offset = cpu_to_le32(offset); + if (merge_has_dom && i == 0) + lcme->lcme_flags |= cpu_to_le32(LCME_FL_STALE); id = pflr_id(mirror_id, i + 1); lcme->lcme_id = cpu_to_le32(id); @@ -3351,50 +3521,272 @@ static int lod_declare_layout_merge(const struct lu_env *env, } /* fixup layout information */ - lod_obj_inc_layout_gen(lo); - lcm->lcm_layout_gen = cpu_to_le32(lo->ldo_layout_gen); lcm->lcm_size = cpu_to_le32(size); lcm->lcm_entry_count = cpu_to_le16(cur_entry_count + merge_entry_count); lcm->lcm_mirror_count = cpu_to_le16(mirror_count); if ((le16_to_cpu(lcm->lcm_flags) & LCM_FL_FLR_MASK) == LCM_FL_NONE) lcm->lcm_flags = cpu_to_le32(LCM_FL_RDONLY); - rc = lod_striping_reload(env, lo, buf); + rc = lod_striping_reload(env, lo, buf, 0); if (rc) GOTO(out, rc); - rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), buf, - XATTR_NAME_LOV, LU_XATTR_REPLACE, th); + lod_obj_inc_layout_gen(lo); + lcm->lcm_layout_gen = cpu_to_le32(lo->ldo_layout_gen); -out: - lu_buf_free(buf); - RETURN(rc); -} + /* transfer layout version to OST objects. */ + if (lo->ldo_mirror_count > 1) { + struct lod_obj_stripe_cb_data data = { {0} }; -/** + layout_attr->la_valid = LA_LAYOUT_VERSION; + layout_attr->la_layout_version = 0; + data.locd_attr = layout_attr; + data.locd_declare = true; + data.locd_stripe_cb = lod_obj_stripe_attr_set_cb; + rc = lod_obj_for_each_stripe(env, lo, th, &data); + if (rc) + GOTO(out, rc); + } + + rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), buf, + XATTR_NAME_LOV, LU_XATTR_REPLACE, th); + +out: + lu_buf_free(buf); + RETURN(rc); +} + +/** * Split layouts, just set the LOVEA with the layout from mbuf. */ static int lod_declare_layout_split(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *mbuf, struct thandle *th) { + struct lod_thread_info *info = lod_env_info(env); + struct lu_attr *layout_attr = &info->lti_layout_attr; struct lod_object *lo = lod_dt_obj(dt); struct lov_comp_md_v1 *lcm = mbuf->lb_buf; int rc; ENTRY; + rc = lod_striping_reload(env, lo, mbuf, LVF_ALL_STALE); + if (rc) + RETURN(rc); + lod_obj_inc_layout_gen(lo); + /* fix on-disk layout gen */ lcm->lcm_layout_gen = cpu_to_le32(lo->ldo_layout_gen); - rc = lod_striping_reload(env, lo, mbuf); - if (rc) - RETURN(rc); + /* transfer layout version to OST objects. */ + if (lo->ldo_mirror_count > 1) { + struct lod_obj_stripe_cb_data data = { {0} }; + + layout_attr->la_valid = LA_LAYOUT_VERSION; + layout_attr->la_layout_version = 0; + data.locd_attr = layout_attr; + data.locd_declare = true; + data.locd_stripe_cb = lod_obj_stripe_attr_set_cb; + rc = lod_obj_for_each_stripe(env, lo, th, &data); + if (rc) + RETURN(rc); + } rc = lod_sub_declare_xattr_set(env, dt_object_child(dt), mbuf, XATTR_NAME_LOV, LU_XATTR_REPLACE, th); RETURN(rc); } +static int lod_layout_declare_or_purge_mirror(const struct lu_env *env, + struct dt_object *dt, const struct lu_buf *buf, + struct thandle *th, bool declare) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_object *lo = lod_dt_obj(dt); + struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; + struct lov_comp_md_entry_v1 *entry; + struct lov_mds_md_v1 *lmm; + struct dt_object **sub_objs = NULL; + int rc = 0, i, k, array_count = 0; + + ENTRY; + + /** + * other ops (like lod_declare_destroy) could destroying sub objects + * as well. + */ + mutex_lock(&lo->ldo_layout_mutex); + + if (!declare) { + /* prepare sub-objects array */ + for (i = 0; i < comp_v1->lcm_entry_count; i++) { + entry = &comp_v1->lcm_entries[i]; + + if (!(entry->lcme_flags & LCME_FL_INIT)) + continue; + + lmm = (struct lov_mds_md_v1 *) + ((char *)comp_v1 + entry->lcme_offset); + array_count += lmm->lmm_stripe_count; + } + OBD_ALLOC_PTR_ARRAY(sub_objs, array_count); + if (sub_objs == NULL) { + mutex_unlock(&lo->ldo_layout_mutex); + RETURN(-ENOMEM); + } + } + + k = 0; /* sub_objs index */ + for (i = 0; i < comp_v1->lcm_entry_count; i++) { + struct lov_ost_data_v1 *objs; + struct lu_object *o, *n; + struct dt_object *dto; + struct lu_device *nd; + struct lov_mds_md_v3 *v3; + __u32 idx; + int j; + + entry = &comp_v1->lcm_entries[i]; + + if (!(entry->lcme_flags & LCME_FL_INIT)) + continue; + + lmm = (struct lov_mds_md_v1 *) + ((char *)comp_v1 + entry->lcme_offset); + v3 = (struct lov_mds_md_v3 *)lmm; + if (lmm->lmm_magic == LOV_MAGIC_V3) + objs = &v3->lmm_objects[0]; + else + objs = &lmm->lmm_objects[0]; + + for (j = 0; j < lmm->lmm_stripe_count; j++) { + idx = objs[j].l_ost_idx; + rc = ostid_to_fid(&info->lti_fid, &objs[j].l_ost_oi, + idx); + if (rc) + GOTO(out, rc); + + if (!fid_is_sane(&info->lti_fid)) { + CERROR("%s: sub-object insane fid "DFID"\n", + lod2obd(d)->obd_name, + PFID(&info->lti_fid)); + GOTO(out, rc = -EINVAL); + } + + lod_getref(&d->lod_ost_descs); + + rc = validate_lod_and_idx(d, idx); + if (unlikely(rc)) { + lod_putref(d, &d->lod_ost_descs); + GOTO(out, rc); + } + + nd = &OST_TGT(d, idx)->ltd_tgt->dd_lu_dev; + lod_putref(d, &d->lod_ost_descs); + + o = lu_object_find_at(env, nd, &info->lti_fid, NULL); + if (IS_ERR(o)) + GOTO(out, rc = PTR_ERR(o)); + + n = lu_object_locate(o->lo_header, nd->ld_type); + if (unlikely(!n)) { + lu_object_put(env, n); + GOTO(out, rc = -ENOENT); + } + + dto = container_of(n, struct dt_object, do_lu); + + if (declare) { + rc = lod_sub_declare_destroy(env, dto, th); + dt_object_put(env, dto); + if (rc) + GOTO(out, rc); + } else { + /** + * collect to-be-destroyed sub objects, the + * reference would be released after actual + * deletion. + */ + sub_objs[k] = dto; + k++; + } + } /* for each stripe */ + } /* for each component in the mirror */ +out: + if (!declare) { + i = 0; + if (!rc) { + /* destroy the sub objects */ + for (; i < k; i++) { + rc = lod_sub_destroy(env, sub_objs[i], th); + if (rc) + break; + dt_object_put(env, sub_objs[i]); + } + } + /** + * if a sub object destroy failed, we'd release sub objects + * reference get from above sub_objs collection. + */ + for (; i < k; i++) + dt_object_put(env, sub_objs[i]); + + OBD_FREE_PTR_ARRAY(sub_objs, array_count); + } + mutex_unlock(&lo->ldo_layout_mutex); + + RETURN(rc); +} + +/** + * Purge layouts, delete sub objects in the mirror stored in the vic_buf, + * and set the LOVEA with the layout from mbuf. + */ +static int lod_declare_layout_purge(const struct lu_env *env, + struct dt_object *dt, const struct lu_buf *buf, + struct thandle *th) +{ + struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); + struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; + int rc; + + ENTRY; + + if (le32_to_cpu(comp_v1->lcm_magic) != LOV_MAGIC_COMP_V1) { + CERROR("%s: invalid layout magic %#x != %#x\n", + lod2obd(d)->obd_name, le32_to_cpu(comp_v1->lcm_magic), + LOV_MAGIC_COMP_V1); + RETURN(-EINVAL); + } + + if (cpu_to_le32(LOV_MAGIC_COMP_V1) != LOV_MAGIC_COMP_V1) + lustre_swab_lov_comp_md_v1(comp_v1); + + /* from now on, @buf contains cpu endian data */ + + if (comp_v1->lcm_mirror_count != 0) { + CERROR("%s: can only purge one mirror from "DFID"\n", + lod2obd(d)->obd_name, PFID(lu_object_fid(&dt->do_lu))); + RETURN(-EINVAL); + } + + /* delcare sub objects deletion in the mirror stored in @buf */ + rc = lod_layout_declare_or_purge_mirror(env, dt, buf, th, true); + RETURN(rc); +} + +/* delete sub objects from the mirror stored in @buf */ +static int lod_layout_purge(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, struct thandle *th) +{ + int rc; + + ENTRY; + rc = lod_layout_declare_or_purge_mirror(env, dt, buf, th, false); + RETURN(rc); +} + /** * Implementation of dt_object_operations::do_declare_xattr_set. * @@ -3411,15 +3803,18 @@ static int lod_declare_xattr_set(const struct lu_env *env, const char *name, int fl, struct thandle *th) { + struct lod_thread_info *info = lod_env_info(env); struct dt_object *next = dt_object_child(dt); - struct lu_attr *attr = &lod_env_info(env)->lti_attr; + struct lu_attr *attr = &info->lti_attr; + struct lod_object *lo = lod_dt_obj(dt); __u32 mode; int rc; ENTRY; mode = dt->do_lu.lo_header->loh_attr & S_IFMT; if ((S_ISREG(mode) || mode == 0) && - !(fl & (LU_XATTR_REPLACE | LU_XATTR_MERGE | LU_XATTR_SPLIT)) && + !(fl & (LU_XATTR_REPLACE | LU_XATTR_MERGE | LU_XATTR_SPLIT | + LU_XATTR_PURGE)) && (strcmp(name, XATTR_NAME_LOV) == 0 || strcmp(name, XATTR_LUSTRE_LOV) == 0)) { /* @@ -3449,10 +3844,13 @@ static int lod_declare_xattr_set(const struct lu_env *env, LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 || strcmp(name, XATTR_LUSTRE_LOV) == 0); rc = lod_declare_layout_split(env, dt, buf, th); + } else if (fl & LU_XATTR_PURGE) { + LASSERT(strcmp(name, XATTR_NAME_LOV) == 0 || + strcmp(name, XATTR_LUSTRE_LOV) == 0); + rc = lod_declare_layout_purge(env, dt, buf, th); } else if (S_ISREG(mode) && - strlen(name) > strlen(XATTR_LUSTRE_LOV) + 1 && - strncmp(name, XATTR_LUSTRE_LOV, - strlen(XATTR_LUSTRE_LOV)) == 0) { + strlen(name) >= sizeof(XATTR_LUSTRE_LOV) + 3 && + allowed_lustre_lov(name)) { /* * this is a request to modify object's striping. * add/set/del component(s). @@ -3461,20 +3859,6 @@ static int lod_declare_xattr_set(const struct lu_env *env, RETURN(-ENOENT); rc = lod_declare_modify_layout(env, dt, name, buf, th); - } else if (strncmp(name, XATTR_NAME_LMV, strlen(XATTR_NAME_LMV)) == 0 && - strlen(name) > strlen(XATTR_NAME_LMV) + 1) { - const char *op = name + strlen(XATTR_NAME_LMV) + 1; - - rc = -ENOTSUPP; - if (strcmp(op, "add") == 0) - rc = lod_dir_declare_layout_add(env, dt, buf, th); - else if (strcmp(op, "del") == 0) - rc = lod_dir_declare_layout_delete(env, dt, buf, th); - else if (strcmp(op, "set") == 0) - rc = lod_sub_declare_xattr_set(env, next, buf, - XATTR_NAME_LMV, fl, th); - - RETURN(rc); } else if (S_ISDIR(mode)) { rc = lod_dir_declare_xattr_set(env, dt, buf, name, fl, th); } else if (strcmp(name, XATTR_NAME_FID) == 0) { @@ -3483,6 +3867,11 @@ static int lod_declare_xattr_set(const struct lu_env *env, rc = lod_sub_declare_xattr_set(env, next, buf, name, fl, th); } + if (rc == 0 && + (strcmp(name, XATTR_NAME_LOV) == 0 || + strcmp(name, XATTR_LUSTRE_LOV) == 0 || allowed_lustre_lov(name))) + rc = lod_save_layout_gen_intrans(info, lo); + RETURN(rc); } @@ -3557,10 +3946,11 @@ static int lod_xattr_del_internal(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th) { - struct dt_object *next = dt_object_child(dt); - struct lod_object *lo = lod_dt_obj(dt); - int rc; - int i; + struct dt_object *next = dt_object_child(dt); + struct lod_object *lo = lod_dt_obj(dt); + int i; + int rc; + ENTRY; rc = lod_sub_xattr_del(env, next, name, th); @@ -3571,7 +3961,11 @@ static int lod_xattr_del_internal(const struct lu_env *env, RETURN(rc); for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - LASSERT(lo->ldo_stripe[i]); + if (!lo->ldo_stripe[i]) + continue; + + if (!dt_object_exists(lo->ldo_stripe[i])) + continue; rc = lod_sub_xattr_del(env, lo->ldo_stripe[i], name, th); if (rc != 0) @@ -3619,9 +4013,11 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, case LOV_USER_MAGIC_SPECIFIC: case LOV_USER_MAGIC_V3: v3 = buf->lb_buf; - if (v3->lmm_pool_name[0] != '\0') + if (lov_pool_is_reserved(v3->lmm_pool_name)) + memset(v3->lmm_pool_name, 0, sizeof(v3->lmm_pool_name)); + else if (v3->lmm_pool_name[0] != '\0') pool_name = v3->lmm_pool_name; - /* fall through */ + fallthrough; case LOV_USER_MAGIC_V1: /* if { size, offset, count } = { 0, -1, 0 } and no pool * (i.e. all default values specified) then delete default @@ -3672,6 +4068,192 @@ static int lod_xattr_set_lov_on_dir(const struct lu_env *env, RETURN(rc); } +static int lod_get_default_lov_striping(const struct lu_env *env, + struct lod_object *lo, + struct lod_default_striping *lds, + struct dt_allocation_hint *ah); + +/** + * Helper function to convert compound layout to compound layout with + * pool + * + * Copy lcm_entries array of \a src to \a tgt. Replace lov_user_md_v1 + * components of \a src with lov_user_md_v3 using \a pool. + * + * \param[in] src source layout + * \param[in] pool pool to use in \a tgt + * \param[out] tgt target layout + */ +static void embed_pool_to_comp_v1(const struct lov_comp_md_v1 *src, + const char *pool, + struct lov_comp_md_v1 *tgt) +{ + size_t shift; + struct lov_user_md_v1 *lum; + struct lov_user_md_v3 *lum3; + struct lov_comp_md_entry_v1 *entry; + int i; + __u32 offset; + + entry = tgt->lcm_entries; + shift = 0; + for (i = 0; i < le16_to_cpu(src->lcm_entry_count); i++, entry++) { + *entry = src->lcm_entries[i]; + offset = le32_to_cpu(src->lcm_entries[i].lcme_offset); + entry->lcme_offset = cpu_to_le32(offset + shift); + + lum = (struct lov_user_md_v1 *)((char *)src + offset); + lum3 = (struct lov_user_md_v3 *)((char *)tgt + offset + shift); + *(struct lov_user_md_v1 *)lum3 = *lum; + if (lum->lmm_pattern & cpu_to_le32(LOV_PATTERN_MDT)) { + lum3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V1); + } else { + lum3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); + entry->lcme_size = cpu_to_le32(sizeof(*lum3)); + strscpy(lum3->lmm_pool_name, pool, + sizeof(lum3->lmm_pool_name)); + shift += sizeof(*lum3) - sizeof(*lum); + } + } +} + +/** + * Set default striping on a directory. + * + * Sets specified striping on a directory object unless it matches the default + * striping (LOVEA_DELETE_VALUES() macro). In the latter case remove existing + * EA. This striping will be used when regular file is being created in this + * directory. + * If current default striping includes a pool but specifed striping + * does not - retain the pool if it exists. + * + * \param[in] env execution environment + * \param[in] dt the striped object + * \param[in] buf buffer with the striping + * \param[in] name name of EA + * \param[in] fl xattr flag (see OSD API description) + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_xattr_set_default_lov_on_dir(const struct lu_env *env, + struct dt_object *dt, + const struct lu_buf *buf, + const char *name, int fl, + struct thandle *th) +{ + struct lod_default_striping *lds = lod_lds_buf_get(env); + struct lov_user_md_v1 *v1 = buf->lb_buf; + char pool[LOV_MAXPOOLNAME + 1]; + bool is_del; + int rc; + + ENTRY; + + /* get existing striping config */ + rc = lod_get_default_lov_striping(env, lod_dt_obj(dt), lds, NULL); + if (rc) + RETURN(rc); + + memset(pool, 0, sizeof(pool)); + if (lds->lds_def_striping_set == 1) + lod_layout_get_pool(lds->lds_def_comp_entries, + lds->lds_def_comp_cnt, pool, + sizeof(pool)); + + is_del = LOVEA_DELETE_VALUES(v1->lmm_stripe_size, + v1->lmm_stripe_count, + v1->lmm_stripe_offset, + NULL); + + /* Retain the pool name if it is not given */ + if (v1->lmm_magic == LOV_USER_MAGIC_V1 && pool[0] != '\0' && + !is_del) { + struct lod_thread_info *info = lod_env_info(env); + struct lov_user_md_v3 *v3 = info->lti_ea_store; + + memset(v3, 0, sizeof(*v3)); + v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); + v3->lmm_pattern = cpu_to_le32(v1->lmm_pattern); + v3->lmm_stripe_count = cpu_to_le32(v1->lmm_stripe_count); + v3->lmm_stripe_offset = cpu_to_le32(v1->lmm_stripe_offset); + v3->lmm_stripe_size = cpu_to_le32(v1->lmm_stripe_size); + + strscpy(v3->lmm_pool_name, pool, sizeof(v3->lmm_pool_name)); + + info->lti_buf.lb_buf = v3; + info->lti_buf.lb_len = sizeof(*v3); + rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf, + name, fl, th); + } else if (v1->lmm_magic == LOV_USER_MAGIC_COMP_V1 && + pool[0] != '\0' && !is_del) { + /* + * try to retain the pool from default layout if the + * specified component layout does not provide pool + * info explicitly + */ + struct lod_thread_info *info = lod_env_info(env); + struct lov_comp_md_v1 *comp_v1 = buf->lb_buf; + struct lov_comp_md_v1 *comp_v1p; + struct lov_user_md_v1 *lum; + int entry_count; + int i; + __u32 offset; + struct lov_comp_md_entry_v1 *entry; + int size; + + entry_count = le16_to_cpu(comp_v1->lcm_entry_count); + size = sizeof(*comp_v1) + + entry_count * sizeof(comp_v1->lcm_entries[0]); + entry = comp_v1->lcm_entries; + for (i = 0; i < entry_count; i++, entry++) { + offset = le32_to_cpu(entry->lcme_offset); + lum = (struct lov_user_md_v1 *)((char *)comp_v1 + + offset); + if (le32_to_cpu(lum->lmm_magic) != LOV_USER_MAGIC_V1) + /* the i-th component includes pool info */ + break; + if (lum->lmm_pattern & cpu_to_le32(LOV_PATTERN_MDT)) + size += sizeof(struct lov_user_md_v1); + else + size += sizeof(struct lov_user_md_v3); + } + + if (i == entry_count) { + /* + * re-compose the layout to include the pool for + * each component + */ + if (info->lti_ea_store_size < size) + rc = lod_ea_store_resize(info, size); + + if (rc == 0) { + comp_v1p = info->lti_ea_store; + *comp_v1p = *comp_v1; + comp_v1p->lcm_size = cpu_to_le32(size); + embed_pool_to_comp_v1(comp_v1, pool, comp_v1p); + + info->lti_buf.lb_buf = comp_v1p; + info->lti_buf.lb_len = size; + rc = lod_xattr_set_lov_on_dir(env, dt, + &info->lti_buf, + name, fl, th); + } + } else { + rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, + th); + } + } else { + rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, fl, th); + } + + if (lds->lds_def_striping_set == 1 && lds->lds_def_comp_entries != NULL) + lod_free_def_comp_entries(lds); + + RETURN(rc); +} + /** * Set default striping on a directory object. * @@ -3712,8 +4294,7 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, if (LMVEA_DELETE_VALUES((le32_to_cpu(lum->lum_stripe_count)), le32_to_cpu(lum->lum_stripe_offset)) && - le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC && - !(le32_to_cpu(lum->lum_hash_type) & LMV_HASH_FLAG_SPACE)) { + le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC) { rc = lod_xattr_del_internal(env, dt, name, th); if (rc == -ENODATA) rc = 0; @@ -3737,7 +4318,7 @@ static int lod_xattr_set_default_lmv_on_dir(const struct lu_env *env, * * \param[in] env execution environment * \param[in] dt the striped object - * \param[in] buf not used currently + * \param[in] buf buf lmv_user_md for create, or lmv_mds_md for replay * \param[in] name not used currently * \param[in] fl xattr flag (see OSD API description) * \param[in] th transaction handle @@ -3749,26 +4330,29 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, struct thandle *th) { - struct lod_object *lo = lod_dt_obj(dt); - struct lod_thread_info *info = lod_env_info(env); - struct lu_attr *attr = &info->lti_attr; + struct lod_object *lo = lod_dt_obj(dt); + struct lod_thread_info *info = lod_env_info(env); + struct lu_attr *attr = &info->lti_attr; struct dt_object_format *dof = &info->lti_format; - struct lu_buf lmv_buf; - struct lu_buf slave_lmv_buf; - struct lmv_mds_md_v1 *lmm; - struct lmv_mds_md_v1 *slave_lmm = NULL; - struct dt_insert_rec *rec = &info->lti_dt_rec; - int i; - int rc; - ENTRY; + struct lu_buf lmv_buf; + struct lu_buf slave_lmv_buf; + struct lmv_user_md *lum = buf->lb_buf; + struct lmv_mds_md_v1 *lmm; + struct lmv_mds_md_v1 *slave_lmm = NULL; + struct dt_insert_rec *rec = &info->lti_dt_rec; + int i; + int rc; + ENTRY; + /* lum is used to know whether it's replay */ + LASSERT(lum); if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) RETURN(-ENOTDIR); /* The stripes are supposed to be allocated in declare phase, * if there are no stripes being allocated, it will skip */ if (lo->ldo_dir_stripe_count == 0) { - if (lo->ldo_dir_is_foreign) { + if (lo->ldo_is_foreign) { rc = lod_sub_xattr_set(env, dt_object_child(dt), buf, XATTR_NAME_LMV, fl, th); if (rc != 0) @@ -3781,8 +4365,8 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, if (rc != 0) RETURN(rc); - attr->la_valid = LA_ATIME | LA_MTIME | LA_CTIME | - LA_MODE | LA_UID | LA_GID | LA_TYPE | LA_PROJID; + attr->la_valid &= LA_ATIME | LA_MTIME | LA_CTIME | LA_FLAGS | + LA_MODE | LA_UID | LA_GID | LA_TYPE | LA_PROJID; dof->dof_type = DFT_DIR; rc = lod_prep_lmv_md(env, dt, &lmv_buf); @@ -3805,18 +4389,34 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, struct lu_name *sname; struct linkea_data ldata = { NULL }; struct lu_buf linkea_buf; + bool stripe_created = false; /* OBD_FAIL_MDS_STRIPE_FID may leave stripe uninitialized */ if (!dto) continue; /* fail a remote stripe creation */ - if (i && OBD_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_CREATE)) + if (i && CFS_FAIL_CHECK(OBD_FAIL_MDS_STRIPE_CREATE)) continue; - /* if it's source stripe of migrating directory, don't create */ - if (!((lo->ldo_dir_hash_type & LMV_HASH_FLAG_MIGRATION) && - i >= lo->ldo_dir_migrate_offset)) { + /* if it's replay by client request, and stripe exists on remote + * MDT, it means mkdir was partially executed: stripe was + * created on remote MDT successfully, but target not in last + * run. + */ + if (unlikely((le32_to_cpu(lum->lum_magic) == LMV_MAGIC_V1) && + dt_object_exists(dto) && dt_object_remote(dto))) + stripe_created = true; + + /* don't create stripe if: + * 1. it's source stripe of migrating directory + * 2. it's existed stripe of splitting directory + */ + if ((lod_is_migrating(lo) && i >= lo->ldo_dir_migrate_offset) || + (lod_is_splitting(lo) && i < lo->ldo_dir_split_offset)) { + if (!dt_object_exists(dto)) + GOTO(out, rc = -EINVAL); + } else if (!stripe_created) { dt_write_lock(env, dto, DT_TGT_CHILD); rc = lod_sub_create(env, dto, attr, NULL, dof, th); if (rc != 0) { @@ -3837,15 +4437,9 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, GOTO(out, rc); } - rec->rec_fid = lu_object_fid(&dt->do_lu); - rc = lod_sub_insert(env, dto, (struct dt_rec *)rec, - (const struct dt_key *)dotdot, th); - if (rc != 0) - GOTO(out, rc); - - if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || + if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SLAVE_LMV) || cfs_fail_val != i) { - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_LMV) && cfs_fail_val == i) slave_lmm->lmv_master_mdt_index = cpu_to_le32(i + 1); @@ -3854,12 +4448,21 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, cpu_to_le32(i); rc = lod_sub_xattr_set(env, dto, &slave_lmv_buf, - XATTR_NAME_LMV, fl, th); + XATTR_NAME_LMV, 0, th); if (rc != 0) GOTO(out, rc); } - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && + /* don't insert stripe if it's existed stripe of splitting + * directory (this directory is striped). + * NB, plain directory will insert itself as the first + * stripe in target. + */ + if (lod_is_splitting(lo) && lo->ldo_dir_split_offset > 1 && + lo->ldo_dir_split_offset > i) + continue; + + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) && cfs_fail_val == i) snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", PFID(lu_object_fid(&dto->do_lu)), i + 1); @@ -3867,22 +4470,31 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", PFID(lu_object_fid(&dto->do_lu)), i); - sname = lod_name_get(env, stripe_name, strlen(stripe_name)); - rc = linkea_links_new(&ldata, &info->lti_linkea_buf, - sname, lu_object_fid(&dt->do_lu)); - if (rc != 0) - GOTO(out, rc); + if (!stripe_created) { + rec->rec_fid = lu_object_fid(&dt->do_lu); + rc = lod_sub_insert(env, dto, (struct dt_rec *)rec, + (const struct dt_key *)dotdot, th); + if (rc != 0) + GOTO(out, rc); - linkea_buf.lb_buf = ldata.ld_buf->lb_buf; - linkea_buf.lb_len = ldata.ld_leh->leh_len; - rc = lod_sub_xattr_set(env, dto, &linkea_buf, - XATTR_NAME_LINK, 0, th); - if (rc != 0) - GOTO(out, rc); + sname = lod_name_get(env, stripe_name, + strlen(stripe_name)); + rc = linkea_links_new(&ldata, &info->lti_linkea_buf, + sname, lu_object_fid(&dt->do_lu)); + if (rc != 0) + GOTO(out, rc); - rec->rec_fid = lu_object_fid(&dto->do_lu); - rc = lod_sub_insert(env, dt_object_child(dt), - (const struct dt_rec *)rec, + linkea_buf.lb_buf = ldata.ld_buf->lb_buf; + linkea_buf.lb_len = ldata.ld_leh->leh_len; + rc = lod_sub_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); + if (rc != 0) + GOTO(out, rc); + } + + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_insert(env, dt_object_child(dt), + (const struct dt_rec *)rec, (const struct dt_key *)stripe_name, th); if (rc != 0) GOTO(out, rc); @@ -3892,7 +4504,7 @@ static int lod_xattr_set_lmv(const struct lu_env *env, struct dt_object *dt, GOTO(out, rc); } - if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MASTER_LMV)) + if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MASTER_LMV)) rc = lod_sub_xattr_set(env, dt_object_child(dt), &lmv_buf, XATTR_NAME_LMV, fl, th); out: @@ -3942,10 +4554,12 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, LASSERT(ergo(lds != NULL, lds->lds_def_striping_set || lds->lds_dir_def_striping_set)); + LASSERT(lmu); if (!LMVEA_DELETE_VALUES(lo->ldo_dir_stripe_count, lo->ldo_dir_stripe_offset)) { - if (!lmu) { + if (!lmu->lb_buf) { + /* mkdir by default LMV */ struct lmv_user_md_v1 *v1 = info->lti_ea_store; int stripe_count = lo->ldo_dir_stripe_count; @@ -3975,30 +4589,29 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, th); if (rc != 0) RETURN(rc); - } else { + } else if (lmu->lb_buf) { /* foreign LMV EA case */ - if (lmu) { + if (declare) { struct lmv_foreign_md *lfm = lmu->lb_buf; - if (lfm->lfm_magic == LMV_MAGIC_FOREIGN) { + if (lfm->lfm_magic == LMV_MAGIC_FOREIGN) rc = lod_declare_xattr_set_lmv(env, dt, attr, lmu, dof, th); - } - } else { - if (lo->ldo_dir_is_foreign) { - LASSERT(lo->ldo_foreign_lmv != NULL && - lo->ldo_foreign_lmv_size > 0); - info->lti_buf.lb_buf = lo->ldo_foreign_lmv; - info->lti_buf.lb_len = lo->ldo_foreign_lmv_size; - lmu = &info->lti_buf; - rc = lod_xattr_set_lmv(env, dt, lmu, - XATTR_NAME_LMV, 0, th); - } + } else if (lo->ldo_is_foreign) { + LASSERT(lo->ldo_foreign_lmv != NULL && + lo->ldo_foreign_lmv_size > 0); + info->lti_buf.lb_buf = lo->ldo_foreign_lmv; + info->lti_buf.lb_len = lo->ldo_foreign_lmv_size; + lmu = &info->lti_buf; + rc = lod_xattr_set_lmv(env, dt, lmu, XATTR_NAME_LMV, 0, + th); } } /* Transfer default LMV striping from the parent */ if (lds != NULL && lds->lds_dir_def_striping_set && + lds->lds_dir_def_max_inherit != LMV_INHERIT_END && + lds->lds_dir_def_max_inherit != LMV_INHERIT_NONE && !(LMVEA_DELETE_VALUES(lds->lds_dir_def_stripe_count, lds->lds_dir_def_stripe_offset) && le32_to_cpu(lds->lds_dir_def_hash_type) != @@ -4020,6 +4633,10 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, cpu_to_le32(lds->lds_dir_def_stripe_offset); v1->lum_hash_type = cpu_to_le32(lds->lds_dir_def_hash_type); + v1->lum_max_inherit = + lmv_inherit_next(lds->lds_dir_def_max_inherit); + v1->lum_max_inherit_rr = + lmv_inherit_rr_next(lds->lds_dir_def_max_inherit_rr); info->lti_buf.lb_buf = v1; info->lti_buf.lb_len = sizeof(*v1); @@ -4066,6 +4683,12 @@ static int lod_dir_striping_create_internal(const struct lu_env *env, RETURN(rc); } + /* ldo_def_striping is not allocated, clear after use, in case directory + * layout is changed later. + */ + if (!declare) + lo->ldo_def_striping = NULL; + RETURN(0); } @@ -4083,10 +4706,11 @@ static int lod_declare_dir_striping_create(const struct lu_env *env, static int lod_dir_striping_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, + const struct lu_buf *lmu, struct dt_object_format *dof, struct thandle *th) { - return lod_dir_striping_create_internal(env, dt, attr, NULL, dof, th, + return lod_dir_striping_create_internal(env, dt, attr, lmu, dof, th, false); } @@ -4119,7 +4743,7 @@ static int lod_generate_and_set_lovea(const struct lu_env *env, LASSERT(lo); if (lo->ldo_comp_cnt == 0 && !lo->ldo_is_foreign) { - lod_striping_free(env, lo); + lod_striping_free_nolock(env, lo); rc = lod_sub_xattr_del(env, next, XATTR_NAME_LOV, th); RETURN(rc); } @@ -4182,7 +4806,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env, CDEBUG(D_LAYOUT, "repeating component %d\n", index); - OBD_ALLOC(comp_array, sizeof(*comp_array) * new_cnt); + OBD_ALLOC_PTR_ARRAY(comp_array, new_cnt); if (comp_array == NULL) GOTO(out, rc = -ENOMEM); @@ -4204,6 +4828,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env, new_comp->llc_flags &= ~LCME_FL_INIT; new_comp->llc_stripe = NULL; new_comp->llc_stripes_allocated = 0; + new_comp->llc_ost_indices = NULL; new_comp->llc_stripe_offset = LOV_OFFSET_DEFAULT; /* for uninstantiated components, layout gen stores default stripe * offset */ @@ -4229,8 +4854,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env, new_comp->llc_ostlist.op_array = op_array; } - OBD_FREE(lo->ldo_comp_entries, - sizeof(*comp_array) * lo->ldo_comp_cnt); + OBD_FREE_PTR_ARRAY(lo->ldo_comp_entries, lo->ldo_comp_cnt); lo->ldo_comp_entries = comp_array; lo->ldo_comp_cnt = new_cnt; @@ -4244,7 +4868,7 @@ static int lod_layout_repeat_comp(const struct lu_env *env, EXIT; out: if (rc) - OBD_FREE(comp_array, sizeof(*comp_array) * new_cnt); + OBD_FREE_PTR_ARRAY(comp_array, new_cnt); return rc; } @@ -4261,12 +4885,11 @@ static int lod_layout_data_init(struct lod_thread_info *info, __u32 comp_cnt) RETURN(0); if (info->lti_comp_size > 0) { - OBD_FREE(info->lti_comp_idx, - info->lti_comp_size * sizeof(__u32)); + OBD_FREE_PTR_ARRAY(info->lti_comp_idx, info->lti_comp_size); info->lti_comp_size = 0; } - OBD_ALLOC(info->lti_comp_idx, comp_cnt * sizeof(__u32)); + OBD_ALLOC_PTR_ARRAY(info->lti_comp_idx, comp_cnt); if (!info->lti_comp_idx) RETURN(-ENOMEM); @@ -4319,6 +4942,9 @@ static int lod_layout_del_prep_layout(const struct lu_env *env, continue; } + if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN) + continue; + lod_obj_set_pool(lo, i, NULL); if (lod_comp->llc_ostlist.op_array) { OBD_FREE(lod_comp->llc_ostlist.op_array, @@ -4353,11 +4979,11 @@ static int lod_layout_del_prep_layout(const struct lu_env *env, lu_object_put(env, &obj->do_lu); lod_comp->llc_stripe[j] = NULL; } - OBD_FREE(lod_comp->llc_stripe, sizeof(struct dt_object *) * - lod_comp->llc_stripes_allocated); + OBD_FREE_PTR_ARRAY(lod_comp->llc_stripe, + lod_comp->llc_stripes_allocated); lod_comp->llc_stripe = NULL; - OBD_FREE(lod_comp->llc_ost_indices, - sizeof(__u32) * lod_comp->llc_stripes_allocated); + OBD_FREE_PTR_ARRAY(lod_comp->llc_ost_indices, + lod_comp->llc_stripes_allocated); lod_comp->llc_ost_indices = NULL; lod_comp->llc_stripes_allocated = 0; } @@ -4370,7 +4996,7 @@ static int lod_layout_del_prep_layout(const struct lu_env *env, if (info->lti_count > 0) { struct lod_layout_component *comp_array; - OBD_ALLOC(comp_array, sizeof(*comp_array) * info->lti_count); + OBD_ALLOC_PTR_ARRAY(comp_array, info->lti_count); if (comp_array == NULL) GOTO(out, rc = -ENOMEM); @@ -4380,8 +5006,7 @@ static int lod_layout_del_prep_layout(const struct lu_env *env, sizeof(*comp_array)); } - OBD_FREE(lo->ldo_comp_entries, - sizeof(*comp_array) * lo->ldo_comp_cnt); + OBD_FREE_PTR_ARRAY(lo->ldo_comp_entries, lo->ldo_comp_cnt); lo->ldo_comp_entries = comp_array; lo->ldo_comp_cnt = info->lti_count; } else { @@ -4417,6 +5042,8 @@ static int lod_layout_del(const struct lu_env *env, struct dt_object *dt, LASSERT(lo->ldo_mirror_count == 1); + mutex_lock(&lo->ldo_layout_mutex); + rc = lod_layout_del_prep_layout(env, lo, th); if (rc < 0) GOTO(out, rc); @@ -4444,14 +5071,14 @@ static int lod_layout_del(const struct lu_env *env, struct dt_object *dt, EXIT; out: if (rc) - lod_striping_free(env, lo); + lod_striping_free_nolock(env, lo); + + mutex_unlock(&lo->ldo_layout_mutex); + return rc; } -static int lod_get_default_lov_striping(const struct lu_env *env, - struct lod_object *lo, - struct lod_default_striping *lds); /** * Implementation of dt_object_operations::do_xattr_set. * @@ -4474,85 +5101,35 @@ static int lod_xattr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, struct thandle *th) { - struct dt_object *next = dt_object_child(dt); - int rc; + struct lod_thread_info *info = lod_env_info(env); + struct dt_object *next = dt_object_child(dt); + struct lu_attr *layout_attr = &info->lti_layout_attr; + struct lod_object *lo = lod_dt_obj(dt); + struct lod_obj_stripe_cb_data data = { {0} }; + int rc = 0; + ENTRY; if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && - strcmp(name, XATTR_NAME_LMV) == 0) { - rc = lod_dir_striping_create(env, dt, NULL, NULL, th); - RETURN(rc); - } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && - strncmp(name, XATTR_NAME_LMV, strlen(XATTR_NAME_LMV)) == 0 && - strlen(name) > strlen(XATTR_NAME_LMV) + 1) { - const char *op = name + strlen(XATTR_NAME_LMV) + 1; - - rc = -ENOTSUPP; - /* - * XATTR_NAME_LMV".add" is never called, but only declared, - * because lod_xattr_set_lmv() will do the addition. - */ - if (strcmp(op, "del") == 0) - rc = lod_dir_layout_delete(env, dt, buf, th); - else if (strcmp(op, "set") == 0) - rc = lod_sub_xattr_set(env, next, buf, XATTR_NAME_LMV, - fl, th); + !strcmp(name, XATTR_NAME_LMV)) { + switch (fl) { + case LU_XATTR_CREATE: + rc = lod_dir_striping_create(env, dt, NULL, buf, NULL, + th); + break; + case 0: + case LU_XATTR_REPLACE: + rc = lod_dir_layout_set(env, dt, buf, fl, th); + break; + default: + LBUG(); + } RETURN(rc); } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && - strcmp(name, XATTR_NAME_LOV) == 0) { - struct lod_default_striping *lds = lod_lds_buf_get(env); - struct lov_user_md_v1 *v1 = buf->lb_buf; - char pool[LOV_MAXPOOLNAME + 1]; - bool is_del; - - /* get existing striping config */ - rc = lod_get_default_lov_striping(env, lod_dt_obj(dt), lds); - if (rc) - RETURN(rc); - - memset(pool, 0, sizeof(pool)); - if (lds->lds_def_striping_set == 1) - lod_layout_get_pool(lds->lds_def_comp_entries, - lds->lds_def_comp_cnt, pool, - sizeof(pool)); - - is_del = LOVEA_DELETE_VALUES(v1->lmm_stripe_size, - v1->lmm_stripe_count, - v1->lmm_stripe_offset, - NULL); - - /* Retain the pool name if it is not given */ - if (v1->lmm_magic == LOV_USER_MAGIC_V1 && pool[0] != '\0' && - !is_del) { - struct lod_thread_info *info = lod_env_info(env); - struct lov_user_md_v3 *v3 = info->lti_ea_store; - - memset(v3, 0, sizeof(*v3)); - v3->lmm_magic = cpu_to_le32(LOV_USER_MAGIC_V3); - v3->lmm_pattern = cpu_to_le32(v1->lmm_pattern); - v3->lmm_stripe_count = - cpu_to_le32(v1->lmm_stripe_count); - v3->lmm_stripe_offset = - cpu_to_le32(v1->lmm_stripe_offset); - v3->lmm_stripe_size = cpu_to_le32(v1->lmm_stripe_size); - - strlcpy(v3->lmm_pool_name, pool, - sizeof(v3->lmm_pool_name)); - - info->lti_buf.lb_buf = v3; - info->lti_buf.lb_len = sizeof(*v3); - rc = lod_xattr_set_lov_on_dir(env, dt, &info->lti_buf, - name, fl, th); - } else { - rc = lod_xattr_set_lov_on_dir(env, dt, buf, name, - fl, th); - } - - if (lds->lds_def_striping_set == 1 && - lds->lds_def_comp_entries != NULL) - lod_free_def_comp_entries(lds); - + strcmp(name, XATTR_NAME_LOV) == 0) { + rc = lod_xattr_set_default_lov_on_dir(env, dt, buf, name, fl, + th); RETURN(rc); } else if (S_ISDIR(dt->do_lu.lo_header->loh_attr) && strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) { @@ -4561,9 +5138,20 @@ static int lod_xattr_set(const struct lu_env *env, th); RETURN(rc); } else if (S_ISREG(dt->do_lu.lo_header->loh_attr) && - (!strcmp(name, XATTR_NAME_LOV) || - !strncmp(name, XATTR_LUSTRE_LOV, - strlen(XATTR_LUSTRE_LOV)))) { + (strcmp(name, XATTR_NAME_LOV) == 0 || + strcmp(name, XATTR_LUSTRE_LOV) == 0 || + allowed_lustre_lov(name))) { + /* layout has been changed by others in the transaction */ + rc = lod_check_layout_gen_intrans(info, lo); + if (rc > 0) { + CDEBUG(D_LAYOUT, + "%s: obj "DFID" gen changed from %d to %d in transaction, retry the transaction\n", + dt->do_lu.lo_dev->ld_obd->obd_name, + PFID(lu_object_fid(&dt->do_lu)), + info->lti_gen[rc - 1], lo->ldo_layout_gen); + RETURN(-EAGAIN); + } + /* in case of lov EA swap, just set it * if not, it is a replay so check striping match what we * already have during req replay, declare_xattr_set() @@ -4573,6 +5161,31 @@ static int lod_xattr_set(const struct lu_env *env, lod_striping_free(env, lod_dt_obj(dt)); rc = lod_sub_xattr_set(env, next, buf, name, fl, th); + } else if (fl & LU_XATTR_SPLIT) { + rc = lod_sub_xattr_set(env, next, buf, name, fl, th); + if (rc) + RETURN(rc); + + rc = lod_striping_reload(env, lo, buf, LVF_ALL_STALE); + if (rc) + RETURN(rc); + + if (lo->ldo_mirror_count > 1 && + layout_attr->la_valid & LA_LAYOUT_VERSION) { + /* mirror split */ + layout_attr->la_layout_version = + lo->ldo_layout_gen; + data.locd_attr = layout_attr; + data.locd_declare = false; + data.locd_stripe_cb = + lod_obj_stripe_attr_set_cb; + rc = lod_obj_for_each_stripe(env, lo, th, + &data); + if (rc) + RETURN(rc); + } + } else if (fl & LU_XATTR_PURGE) { + rc = lod_layout_purge(env, dt, buf, th); } else if (dt_object_remote(dt)) { /* This only happens during migration, see * mdd_migrate_create(), in which Master MDT will @@ -4588,7 +5201,7 @@ static int lod_xattr_set(const struct lu_env *env, } else { /* * When 'name' is XATTR_LUSTRE_LOV or XATTR_NAME_LOV, - * it's going to create create file with specified + * it's going to create file with specified * component(s), the striping must have not being * cached in this case; * @@ -4596,11 +5209,29 @@ static int lod_xattr_set(const struct lu_env *env, * an existing file, the striping must have been cached * in this case. */ - LASSERT(equi(!strcmp(name, XATTR_LUSTRE_LOV) || - !strcmp(name, XATTR_NAME_LOV), - !lod_dt_obj(dt)->ldo_comp_cached)); + if (!(fl & LU_XATTR_MERGE)) + LASSERT(equi(!strcmp(name, XATTR_LUSTRE_LOV) || + !strcmp(name, XATTR_NAME_LOV), + !lod_dt_obj(dt)->ldo_comp_cached)); rc = lod_striped_create(env, dt, NULL, NULL, th); + if (rc) + RETURN(rc); + + if (fl & LU_XATTR_MERGE && lo->ldo_mirror_count > 1 && + layout_attr->la_valid & LA_LAYOUT_VERSION) { + /* mirror merge exec phase */ + layout_attr->la_layout_version = + lo->ldo_layout_gen; + data.locd_attr = layout_attr; + data.locd_declare = false; + data.locd_stripe_cb = + lod_obj_stripe_attr_set_cb; + rc = lod_obj_for_each_stripe(env, lo, th, + &data); + if (rc) + RETURN(rc); + } } RETURN(rc); } else if (strcmp(name, XATTR_NAME_FID) == 0) { @@ -4658,6 +5289,9 @@ static int lod_declare_xattr_del(const struct lu_env *env, if (!dto) continue; + if (!dt_object_exists(dto)) + continue; + rc = lod_sub_declare_xattr_del(env, dto, name, th); if (rc != 0) break; @@ -4677,35 +5311,14 @@ static int lod_declare_xattr_del(const struct lu_env *env, static int lod_xattr_del(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th) { - struct dt_object *next = dt_object_child(dt); - struct lod_object *lo = lod_dt_obj(dt); - int rc; - int i; + int rc; + ENTRY; if (!strcmp(name, XATTR_NAME_LOV) || !strcmp(name, XATTR_NAME_LMV)) lod_striping_free(env, lod_dt_obj(dt)); - rc = lod_sub_xattr_del(env, next, name, th); - if (rc != 0 || !S_ISDIR(dt->do_lu.lo_header->loh_attr)) - RETURN(rc); - - if (!strcmp(name, XATTR_NAME_LMV)) - RETURN(0); - - if (lo->ldo_dir_stripe_count == 0) - RETURN(0); - - for (i = 0; i < lo->ldo_dir_stripe_count; i++) { - struct dt_object *dto = lo->ldo_stripe[i]; - - if (!dto) - continue; - - rc = lod_sub_xattr_del(env, dto, name, th); - if (rc != 0) - break; - } + rc = lod_xattr_del_internal(env, dt, name, th); RETURN(rc); } @@ -4786,19 +5399,28 @@ skip: */ static int lod_get_default_lov_striping(const struct lu_env *env, struct lod_object *lo, - struct lod_default_striping *lds) + struct lod_default_striping *lds, + struct dt_allocation_hint *dah) { struct lod_thread_info *info = lod_env_info(env); struct lov_user_md_v1 *v1 = NULL; struct lov_user_md_v3 *v3 = NULL; - struct lov_comp_md_v1 *comp_v1 = NULL; - __u16 comp_cnt; - __u16 mirror_cnt; - bool composite; + struct lov_comp_md_v1 *lcm = NULL; + __u32 magic; + int append_stripe_count = dah != NULL ? dah->dah_append_stripe_count : 0; + const char *append_pool = (dah != NULL && + dah->dah_append_pool != NULL && + dah->dah_append_pool[0] != '\0') ? + dah->dah_append_pool : NULL; + __u16 entry_count = 1; + __u16 mirror_count = 0; + bool want_composite = false; int rc, i, j; ENTRY; + lds->lds_def_striping_set = 0; + rc = lod_get_lov_ea(env, lo); if (rc < 0) RETURN(rc); @@ -4806,110 +5428,133 @@ static int lod_get_default_lov_striping(const struct lu_env *env, if (rc < (typeof(rc))sizeof(struct lov_user_md)) RETURN(0); - v1 = info->lti_ea_store; - if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V1)) { - lustre_swab_lov_user_md_v1(v1); - } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_V3)) { - v3 = (struct lov_user_md_v3 *)v1; - lustre_swab_lov_user_md_v3(v3); - } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_SPECIFIC)) { - v3 = (struct lov_user_md_v3 *)v1; + magic = *(__u32 *)info->lti_ea_store; + if (magic == __swab32(LOV_USER_MAGIC_V1)) { + lustre_swab_lov_user_md_v1(info->lti_ea_store); + } else if (magic == __swab32(LOV_USER_MAGIC_V3)) { + lustre_swab_lov_user_md_v3(info->lti_ea_store); + } else if (magic == __swab32(LOV_USER_MAGIC_SPECIFIC)) { + v3 = (struct lov_user_md_v3 *)info->lti_ea_store; lustre_swab_lov_user_md_v3(v3); lustre_swab_lov_user_md_objects(v3->lmm_objects, v3->lmm_stripe_count); - } else if (v1->lmm_magic == __swab32(LOV_USER_MAGIC_COMP_V1) || - v1->lmm_magic == __swab32(LOV_USER_MAGIC_SEL)) { - comp_v1 = (struct lov_comp_md_v1 *)v1; - lustre_swab_lov_comp_md_v1(comp_v1); + } else if (magic == __swab32(LOV_USER_MAGIC_COMP_V1) || + magic == __swab32(LOV_USER_MAGIC_SEL)) { + lustre_swab_lov_comp_md_v1(info->lti_ea_store); } - if (v1->lmm_magic != LOV_MAGIC_V3 && v1->lmm_magic != LOV_MAGIC_V1 && - v1->lmm_magic != LOV_MAGIC_COMP_V1 && - v1->lmm_magic != LOV_MAGIC_SEL && - v1->lmm_magic != LOV_USER_MAGIC_SPECIFIC) + switch (magic) { + case LOV_MAGIC_V1: + case LOV_MAGIC_V3: + case LOV_USER_MAGIC_SPECIFIC: + v1 = info->lti_ea_store; + break; + case LOV_MAGIC_COMP_V1: + case LOV_MAGIC_SEL: + lcm = info->lti_ea_store; + entry_count = lcm->lcm_entry_count; + if (entry_count == 0) + RETURN(-EINVAL); + + mirror_count = lcm->lcm_mirror_count + 1; + want_composite = true; + break; + default: RETURN(-ENOTSUPP); + } - if (v1->lmm_magic == LOV_MAGIC_COMP_V1 || - v1->lmm_magic == LOV_MAGIC_SEL) { - comp_v1 = (struct lov_comp_md_v1 *)v1; - comp_cnt = comp_v1->lcm_entry_count; - if (comp_cnt == 0) - RETURN(-EINVAL); - mirror_cnt = comp_v1->lcm_mirror_count + 1; - composite = true; - } else { - comp_cnt = 1; - mirror_cnt = 0; - composite = false; + if (append_stripe_count != 0 || append_pool != NULL) { + entry_count = 1; + mirror_count = 0; + want_composite = false; } /* realloc default comp entries if necessary */ - rc = lod_def_striping_comp_resize(lds, comp_cnt); + rc = lod_def_striping_comp_resize(lds, entry_count); if (rc < 0) RETURN(rc); - lds->lds_def_comp_cnt = comp_cnt; - lds->lds_def_striping_is_composite = composite; - lds->lds_def_mirror_cnt = mirror_cnt; + lds->lds_def_comp_cnt = entry_count; + lds->lds_def_striping_is_composite = want_composite; + lds->lds_def_mirror_cnt = mirror_count; - for (i = 0; i < comp_cnt; i++) { - struct lod_layout_component *lod_comp; - char *pool; + for (i = 0; i < entry_count; i++) { + struct lod_layout_component *llc = &lds->lds_def_comp_entries[i]; + const char *pool; - lod_comp = &lds->lds_def_comp_entries[i]; /* - * reset lod_comp values, llc_stripes is always NULL in - * the default striping template, llc_pool will be reset - * later below. + * reset llc values, llc_stripes is always NULL in the + * default striping template, llc_pool will be reset + * later below using lod_set_pool(). + * + * XXX At this point llc_pool may point to valid (!) + * kmalloced strings from previous RPCs. */ - memset(lod_comp, 0, offsetof(typeof(*lod_comp), llc_pool)); - - if (composite) { - v1 = (struct lov_user_md *)((char *)comp_v1 + - comp_v1->lcm_entries[i].lcme_offset); - lod_comp->llc_extent = - comp_v1->lcm_entries[i].lcme_extent; - /* We only inherit certain flags from the layout */ - lod_comp->llc_flags = - comp_v1->lcm_entries[i].lcme_flags & + memset(llc, 0, offsetof(typeof(*llc), llc_pool)); + + if (lcm != NULL) { + v1 = (struct lov_user_md *)((char *)lcm + + lcm->lcm_entries[i].lcme_offset); + + if (want_composite) { + llc->llc_extent = lcm->lcm_entries[i].lcme_extent; + /* We only inherit certain flags from the layout */ + llc->llc_flags = lcm->lcm_entries[i].lcme_flags & LCME_TEMPLATE_FLAGS; + } } + CDEBUG(D_LAYOUT, DFID" magic = %#08x, pattern = %#x, stripe_count = %hu, stripe_size = %u, stripe_offset = %hu, append_pool = '%s', append_stripe_count = %d\n", + PFID(lu_object_fid(&lo->ldo_obj.do_lu)), + v1->lmm_magic, + v1->lmm_pattern, + v1->lmm_stripe_count, + v1->lmm_stripe_size, + v1->lmm_stripe_offset, + append_pool ?: "", + append_stripe_count); + if (!lov_pattern_supported(v1->lmm_pattern) && !(v1->lmm_pattern & LOV_PATTERN_F_RELEASED)) { lod_free_def_comp_entries(lds); RETURN(-EINVAL); } - CDEBUG(D_LAYOUT, DFID" stripe_count=%d stripe_size=%d " - "stripe_offset=%d\n", - PFID(lu_object_fid(&lo->ldo_obj.do_lu)), - (int)v1->lmm_stripe_count, (int)v1->lmm_stripe_size, - (int)v1->lmm_stripe_offset); + llc->llc_stripe_count = v1->lmm_stripe_count; + llc->llc_stripe_size = v1->lmm_stripe_size; + llc->llc_stripe_offset = v1->lmm_stripe_offset; + llc->llc_pattern = v1->lmm_pattern; - lod_comp->llc_stripe_count = v1->lmm_stripe_count; - lod_comp->llc_stripe_size = v1->lmm_stripe_size; - lod_comp->llc_stripe_offset = v1->lmm_stripe_offset; - lod_comp->llc_pattern = v1->lmm_pattern; + if (append_stripe_count != 0 || append_pool != NULL) + llc->llc_pattern = LOV_PATTERN_RAID0; + + if (append_stripe_count != 0) + llc->llc_stripe_count = append_stripe_count; pool = NULL; - if (v1->lmm_magic == LOV_USER_MAGIC_V3) { + if (append_pool != NULL) { + pool = append_pool; + } else if (v1->lmm_magic == LOV_USER_MAGIC_V3) { /* XXX: sanity check here */ - v3 = (struct lov_user_md_v3 *) v1; + v3 = (struct lov_user_md_v3 *)v1; if (v3->lmm_pool_name[0] != '\0') pool = v3->lmm_pool_name; } - lod_set_def_pool(lds, i, pool); - if (v1->lmm_magic == LOV_USER_MAGIC_SPECIFIC) { + + lod_set_pool(&llc->llc_pool, pool); + + if (v1->lmm_magic == LOV_USER_MAGIC_SPECIFIC && + append_stripe_count == 0 && + append_pool == NULL) { v3 = (struct lov_user_md_v3 *)v1; - rc = lod_comp_copy_ost_lists(lod_comp, v3); + rc = lod_comp_copy_ost_lists(llc, v3); if (rc) RETURN(rc); - } else if (lod_comp->llc_ostlist.op_array && - lod_comp->llc_ostlist.op_count) { - for (j = 0; j < lod_comp->llc_ostlist.op_count; j++) - lod_comp->llc_ostlist.op_array[j] = -1; - lod_comp->llc_ostlist.op_count = 0; + } else if (llc->llc_ostlist.op_array && + llc->llc_ostlist.op_count) { + for (j = 0; j < llc->llc_ostlist.op_count; j++) + llc->llc_ostlist.op_array[j] = -1; + llc->llc_ostlist.op_count = 0; } } @@ -4917,6 +5562,17 @@ static int lod_get_default_lov_striping(const struct lu_env *env, RETURN(rc); } +static inline void lod_lum2lds(struct lod_default_striping *lds, + const struct lmv_user_md *lum) +{ + lds->lds_dir_def_stripe_count = le32_to_cpu(lum->lum_stripe_count); + lds->lds_dir_def_stripe_offset = le32_to_cpu(lum->lum_stripe_offset); + lds->lds_dir_def_hash_type = le32_to_cpu(lum->lum_hash_type); + lds->lds_dir_def_max_inherit = lum->lum_max_inherit; + lds->lds_dir_def_max_inherit_rr = lum->lum_max_inherit_rr; + lds->lds_dir_def_striping_set = 1; +} + /** * Get default directory striping. * @@ -4944,14 +5600,7 @@ static int lod_get_default_lmv_striping(const struct lu_env *env, struct lod_thread_info *info = lod_env_info(env); lmu = info->lti_ea_store; - - lds->lds_dir_def_stripe_count = - le32_to_cpu(lmu->lum_stripe_count); - lds->lds_dir_def_stripe_offset = - le32_to_cpu(lmu->lum_stripe_offset); - lds->lds_dir_def_hash_type = - le32_to_cpu(lmu->lum_hash_type); - lds->lds_dir_def_striping_set = 1; + lod_lum2lds(lds, lmu); } return 0; @@ -4971,14 +5620,30 @@ static int lod_get_default_lmv_striping(const struct lu_env *env, */ static int lod_get_default_striping(const struct lu_env *env, struct lod_object *lo, + struct dt_allocation_hint *ah, struct lod_default_striping *lds) { int rc, rc1; - rc = lod_get_default_lov_striping(env, lo, lds); - rc1 = lod_get_default_lmv_striping(env, lo, lds); - if (rc == 0 && rc1 < 0) - rc = rc1; + rc = lod_get_default_lov_striping(env, lo, lds, NULL); + if (lds->lds_def_striping_set) { + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *d = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + + rc = lod_verify_striping(env, d, lo, &info->lti_buf, false); + if (rc) + lds->lds_def_striping_set = 0; + } + + if (ah->dah_eadata_is_dmv) { + lod_lum2lds(lds, ah->dah_eadata); + } else if (ah->dah_dmv_imp_inherit) { + lds->lds_dir_def_striping_set = 0; + } else { + rc1 = lod_get_default_lmv_striping(env, lo, lds); + if (rc == 0 && rc1 < 0) + rc = rc1; + } return rc; } @@ -4998,10 +5663,11 @@ static void lod_striping_from_default(struct lod_object *lo, umode_t mode) { struct lod_device *d = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); - struct lov_desc *desc = &d->lod_desc; int i, rc; if (lds->lds_def_striping_set && S_ISREG(mode)) { + struct lov_desc *desc = &d->lod_ost_descs.ltd_lov_desc; + rc = lod_alloc_comp_entries(lo, lds->lds_def_mirror_cnt, lds->lds_def_comp_cnt); if (rc != 0) @@ -5017,8 +5683,9 @@ static void lod_striping_from_default(struct lod_object *lo, struct lod_layout_component *def_comp = &lds->lds_def_comp_entries[i]; - CDEBUG(D_LAYOUT, "Inherit from default: flags=%#x " - "size=%hu nr=%u offset=%u pattern=%#x pool=%s\n", + CDEBUG(D_LAYOUT, + "inherit "DFID" file layout from default: flags=%#x size=%u nr=%u offset=%u pattern=%#x pool=%s\n", + PFID(lu_object_fid(&lo->ldo_obj.do_lu)), def_comp->llc_flags, def_comp->llc_stripe_size, def_comp->llc_stripe_count, @@ -5058,7 +5725,7 @@ static void lod_striping_from_default(struct lod_object *lo, if (!lo->ldo_is_composite) continue; - lod_adjust_stripe_info(obj_comp, desc); + lod_adjust_stripe_info(obj_comp, desc, 0); } } else if (lds->lds_dir_def_striping_set && S_ISDIR(mode)) { if (lo->ldo_dir_stripe_count == 0) @@ -5067,18 +5734,19 @@ static void lod_striping_from_default(struct lod_object *lo, if (lo->ldo_dir_stripe_offset == -1) lo->ldo_dir_stripe_offset = lds->lds_dir_def_stripe_offset; - if (lo->ldo_dir_hash_type == 0) - lo->ldo_dir_hash_type = lds->lds_dir_def_hash_type & - ~LMV_HASH_FLAG_SPACE; + if (lo->ldo_dir_hash_type == LMV_HASH_TYPE_UNKNOWN) + lo->ldo_dir_hash_type = lds->lds_dir_def_hash_type; - CDEBUG(D_LAYOUT, "striping from default dir: count:%hu, " - "offset:%u, hash_type:%u\n", + CDEBUG(D_LAYOUT, + "inherit "DFID" dir layout from default: count=%hu offset=%u hash_type=%x\n", + PFID(lu_object_fid(&lo->ldo_obj.do_lu)), lo->ldo_dir_stripe_count, lo->ldo_dir_stripe_offset, lo->ldo_dir_hash_type); } } -static inline bool lod_need_inherit_more(struct lod_object *lo, bool from_root) +static inline bool lod_need_inherit_more(struct lod_object *lo, bool from_root, + const char *append_pool) { struct lod_layout_component *lod_comp; @@ -5098,6 +5766,9 @@ static inline bool lod_need_inherit_more(struct lod_object *lo, bool from_root) lod_comp->llc_stripe_offset == LOV_OFFSET_DEFAULT)) return true; + if (append_pool && append_pool[0]) + return true; + return false; } @@ -5107,8 +5778,8 @@ static inline bool lod_need_inherit_more(struct lod_object *lo, bool from_root) * This method is used to make a decision on the striping configuration for the * object being created. It can be taken from the \a parent object if it exists, * or filesystem's default. The resulting configuration (number of stripes, - * stripe size/offset, pool name, etc) is stored in the object itself and will - * be used by the methods like ->doo_declare_create(). + * stripe size/offset, pool name, hash_type, etc.) is stored in the object + * itself and will be used by the methods like ->doo_declare_create(). * * \see dt_object_operations::do_ah_init() in the API description for details. */ @@ -5132,6 +5803,10 @@ static void lod_ah_init(const struct lu_env *env, LASSERT(child); + if (ah->dah_append_stripe_count == -1) + ah->dah_append_stripe_count = + d->lod_ost_descs.ltd_lov_desc.ld_tgt_count; + if (likely(parent)) { nextp = dt_object_child(parent); lp = lod_dt_obj(parent); @@ -5151,40 +5826,31 @@ static void lod_ah_init(const struct lu_env *env, if (S_ISDIR(child_mode)) { const struct lmv_user_md_v1 *lum1 = ah->dah_eadata; + int max_stripe_count; /* other default values are 0 */ - lc->ldo_dir_stripe_offset = -1; + lc->ldo_dir_stripe_offset = LMV_OFFSET_DEFAULT; /* no default striping configuration is needed for * foreign dirs */ if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0 && le32_to_cpu(lum1->lum_magic) == LMV_MAGIC_FOREIGN) { - lc->ldo_dir_is_foreign = true; + lc->ldo_is_foreign = true; /* keep stripe_count 0 and stripe_offset -1 */ CDEBUG(D_INFO, "no default striping for foreign dir\n"); RETURN_EXIT; } - /* - * If parent object is not root directory, - * then get default striping from parent object. - */ - if (likely(lp != NULL) && !fid_is_root(lod_object_fid(lp))) - lod_get_default_striping(env, lp, lds); - - /* set child default striping info, default value is NULL */ - if (lds->lds_def_striping_set || lds->lds_dir_def_striping_set) - lc->ldo_def_striping = lds; + if (likely(lp != NULL)) + lod_get_default_striping(env, lp, ah, lds); /* It should always honour the specified stripes */ - /* Note: old client (< 2.7)might also do lfs mkdir, whose EA - * will have old magic. In this case, we should ignore the - * stripe count and try to create dir by default stripe. - */ - if (ah->dah_eadata != NULL && ah->dah_eadata_len != 0 && + if (ah->dah_eadata && ah->dah_eadata_len && + !ah->dah_eadata_is_dmv && (le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC || - le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC_SPECIFIC)) { + le32_to_cpu(lum1->lum_magic) == LMV_USER_MAGIC_SPECIFIC || + le32_to_cpu(lum1->lum_magic) == LMV_MAGIC_V1)) { lc->ldo_dir_stripe_count = le32_to_cpu(lum1->lum_stripe_count); lc->ldo_dir_stripe_offset = @@ -5192,28 +5858,111 @@ static void lod_ah_init(const struct lu_env *env, lc->ldo_dir_hash_type = le32_to_cpu(lum1->lum_hash_type); CDEBUG(D_INFO, - "set dirstripe: count %hu, offset %d, hash %u\n", + "set dirstripe: count %hu, offset %d, hash %x\n", lc->ldo_dir_stripe_count, (int)lc->ldo_dir_stripe_offset, lc->ldo_dir_hash_type); + + if (d->lod_mdt_descs.ltd_lmv_desc.ld_active_tgt_count && + lc->ldo_dir_stripe_count < 2 && + lum1->lum_max_inherit != LMV_INHERIT_NONE) { + /* when filesystem-wide default LMV is set, dirs + * will be created on MDT by space usage, but if + * dir is created with "lfs mkdir -c 1 ...", its + * subdirs should be kept on the same MDT. To + * guarantee this, set default LMV for such dir. + */ + lds->lds_dir_def_stripe_count = + le32_to_cpu(lum1->lum_stripe_count); + /* if "-1" stripe offset is set, save current + * MDT index in default LMV. + */ + if (le32_to_cpu(lum1->lum_stripe_offset) == + LMV_OFFSET_DEFAULT) + lds->lds_dir_def_stripe_offset = + lod2lu_dev(d)->ld_site->ld_seq_site->ss_node_id; + else + lds->lds_dir_def_stripe_offset = + le32_to_cpu(lum1->lum_stripe_offset); + lds->lds_dir_def_hash_type = + le32_to_cpu(lum1->lum_hash_type); + lds->lds_dir_def_max_inherit = + lum1->lum_max_inherit; + /* it will be decreased by 1 later in setting */ + if (lum1->lum_max_inherit >= LMV_INHERIT_END && + lum1->lum_max_inherit < LMV_INHERIT_MAX) + lds->lds_dir_def_max_inherit++; + lds->lds_dir_def_max_inherit_rr = + lum1->lum_max_inherit_rr; + lds->lds_dir_def_striping_set = 1; + /* don't inherit LOV from ROOT */ + if (lds->lds_def_striping_set && + fid_is_root(lod_object_fid(lp))) + lds->lds_def_striping_set = 0; + lc->ldo_def_striping = lds; + } else if (lds->lds_def_striping_set && + !fid_is_root(lod_object_fid(lp))) { + /* don't inherit default LMV for "lfs mkdir" */ + lds->lds_dir_def_striping_set = 0; + lc->ldo_def_striping = lds; + } } else { + /* inherit default striping except ROOT */ + if ((lds->lds_def_striping_set || + lds->lds_dir_def_striping_set) && + !fid_is_root(lod_object_fid(lp))) + lc->ldo_def_striping = lds; + /* transfer defaults LMV to new directory */ lod_striping_from_default(lc, lds, child_mode); /* set count 0 to create normal directory */ if (lc->ldo_dir_stripe_count == 1) lc->ldo_dir_stripe_count = 0; + + /* do not save default LMV on server */ + if (ah->dah_dmv_imp_inherit) { + lds->lds_dir_def_striping_set = 0; + if (!lds->lds_def_striping_set) + lc->ldo_def_striping = NULL; + } } - /* shrink the stripe_count to the avaible MDT count */ - if (lc->ldo_dir_stripe_count > d->lod_remote_mdt_count + 1 && - !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) { - lc->ldo_dir_stripe_count = d->lod_remote_mdt_count + 1; + /* shrink the stripe count to max_mdt_stripecount if it is -1 + * and max_mdt_stripecount is not 0 + */ + if (lc->ldo_dir_stripe_count == (__u16)(-1) && + d->lod_max_mdt_stripecount) + lc->ldo_dir_stripe_count = d->lod_max_mdt_stripecount; + + max_stripe_count = d->lod_remote_mdt_count + 1; + if (lc->ldo_dir_hash_type & LMV_HASH_FLAG_OVERSTRIPED) + max_stripe_count = + max_stripe_count * LMV_MAX_STRIPES_PER_MDT; + + /* shrink the stripe_count to max stripe count */ + if (lc->ldo_dir_stripe_count > max_stripe_count && + !CFS_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE)) { + lc->ldo_dir_stripe_count = max_stripe_count; if (lc->ldo_dir_stripe_count == 1) lc->ldo_dir_stripe_count = 0; } - CDEBUG(D_INFO, "final dir stripe [%hu %d %u]\n", + if (!lmv_is_known_hash_type(lc->ldo_dir_hash_type)) + lc->ldo_dir_hash_type = + (lc->ldo_dir_hash_type & LMV_HASH_FLAG_KNOWN) | + d->lod_mdt_descs.ltd_lmv_desc.ld_pattern; + + /* make sure all fscrypt metadata stays on same mdt */ + if (child->do_lu.lo_header->loh_attr & LOHA_FSCRYPT_MD) { + lc->ldo_dir_stripe_count = 0; + lds->lds_dir_def_stripe_offset = + lod2lu_dev(d)->ld_site->ld_seq_site->ss_node_id; + lds->lds_dir_def_striping_set = 1; + lc->ldo_def_striping = lds; + } + + CDEBUG(D_INFO, "final dir stripe_count=%hu offset=%d hash=%x\n", lc->ldo_dir_stripe_count, (int)lc->ldo_dir_stripe_offset, lc->ldo_dir_hash_type); @@ -5233,9 +5982,13 @@ static void lod_ah_init(const struct lu_env *env, * Try from the parent first. */ if (likely(lp != NULL)) { - rc = lod_get_default_lov_striping(env, lp, lds); - if (rc == 0) - lod_striping_from_default(lc, lds, child_mode); + rc = lod_get_default_lov_striping(env, lp, lds, ah); + if (rc == 0 && lds->lds_def_striping_set) { + rc = lod_verify_striping(env, d, lp, &info->lti_buf, + false); + if (rc == 0) + lod_striping_from_default(lc, lds, child_mode); + } } /* Initialize lod_device::lod_md_root object reference */ @@ -5261,10 +6014,18 @@ static void lod_ah_init(const struct lu_env *env, * - parent has plain(v1/v3) default layout, and some attributes * are not specified in the default layout; */ - if (d->lod_md_root != NULL && lod_need_inherit_more(lc, true)) { - rc = lod_get_default_lov_striping(env, d->lod_md_root, lds); + if (d->lod_md_root != NULL && + lod_need_inherit_more(lc, true, ah->dah_append_pool)) { + rc = lod_get_default_lov_striping(env, d->lod_md_root, lds, + ah); + if (rc || !lds->lds_def_striping_set) + goto out; + + rc = lod_verify_striping(env, d, d->lod_md_root, &info->lti_buf, + false); if (rc) goto out; + if (lc->ldo_comp_cnt == 0) { lod_striping_from_default(lc, lds, child_mode); } else if (!lds->lds_def_striping_is_composite) { @@ -5285,7 +6046,7 @@ static void lod_ah_init(const struct lu_env *env, lod_comp->llc_stripe_offset = def_comp->llc_stripe_offset; if (lod_comp->llc_pool == NULL) - lod_obj_set_pool(lc, 0, def_comp->llc_pool); + lod_qos_set_pool(lc, 0, def_comp->llc_pool); } } out: @@ -5293,7 +6054,7 @@ out: * fs default striping may not be explicitly set, or historically set * in config log, use them. */ - if (lod_need_inherit_more(lc, false)) { + if (lod_need_inherit_more(lc, false, ah->dah_append_pool)) { if (lc->ldo_comp_cnt == 0) { rc = lod_alloc_comp_entries(lc, 0, 1); if (rc) @@ -5306,14 +6067,16 @@ out: } LASSERT(!lc->ldo_is_composite); lod_comp = &lc->ldo_comp_entries[0]; - desc = &d->lod_desc; - lod_adjust_stripe_info(lod_comp, desc); + desc = &d->lod_ost_descs.ltd_lov_desc; + lod_adjust_stripe_info(lod_comp, desc, + ah->dah_append_stripe_count); + if (ah->dah_append_pool && ah->dah_append_pool[0]) + lod_qos_set_pool(lc, 0, ah->dah_append_pool); } EXIT; } -#define ll_do_div64(aaa,bbb) do_div((aaa), (bbb)) /** * Size initialization on late striping. * @@ -5377,14 +6140,13 @@ static int lod_declare_init_size(const struct lu_env *env, continue; LASSERT(objects != NULL && stripe_size != 0); - /* ll_do_div64(a, b) returns a % b, and a = a / b */ - ll_do_div64(size, (__u64)stripe_size); - stripe = ll_do_div64(size, (__u64)stripe_count); + do_div(size, stripe_size); + stripe = do_div(size, stripe_count); LASSERT(objects[stripe] != NULL); size = size * stripe_size; offs = attr->la_size; - size += ll_do_div64(offs, stripe_size); + size += do_div(offs, stripe_size); attr->la_valid = LA_SIZE; attr->la_size = size; @@ -5426,7 +6188,7 @@ int lod_declare_striped_create(const struct lu_env *env, struct dt_object *dt, int rc; ENTRY; - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO)) GOTO(out, rc = -ENOMEM); if (!dt_object_remote(next)) { @@ -5510,7 +6272,7 @@ static inline int dt_object_qos_mkdir(const struct lu_env *env, return -EINVAL; lmu = info->lti_ea_store; - return !!(le32_to_cpu(lmu->lum_hash_type) & LMV_HASH_FLAG_SPACE); + return le32_to_cpu(lmu->lum_stripe_offset) == LMV_OFFSET_DEFAULT; } /** @@ -5559,7 +6321,6 @@ static int lod_declare_create(const struct lu_env *env, struct dt_object *dt, } else if (dof->dof_type == DFT_DIR) { struct seq_server_site *ss; struct lu_buf buf = { NULL }; - struct lu_buf *lmu = NULL; ss = lu_site2seq(dt->do_lu.lo_dev->ld_site); @@ -5573,35 +6334,18 @@ static int lod_declare_create(const struct lu_env *env, struct dt_object *dt, * striped directory with specified stripeEA, then it * should ignore the default stripeEA */ if (hint != NULL && hint->dah_eadata == NULL) { - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STALE_DIR_LAYOUT)) + if (CFS_FAIL_CHECK(OBD_FAIL_MDS_STALE_DIR_LAYOUT)) GOTO(out, rc = -EREMOTE); - if (lo->ldo_dir_stripe_offset == -1) { - /* - * child and parent should be in the same MDT, - * but if parent has plain layout, it's allowed. - */ - if (hint->dah_parent && - dt_object_remote(hint->dah_parent)) { - rc = dt_object_qos_mkdir(env, - lo->ldo_obj.do_lu.lo_dev, - hint->dah_parent); - if (rc <= 0) - GOTO(out, rc ? rc : -EREMOTE); - } - } else if (lo->ldo_dir_stripe_offset != - ss->ss_node_id) { + if (lo->ldo_dir_stripe_offset != LMV_OFFSET_DEFAULT && + lo->ldo_dir_stripe_offset != ss->ss_node_id) { struct lod_device *lod; - struct lod_tgt_descs *ltd; - struct lod_tgt_desc *tgt = NULL; + struct lu_tgt_desc *mdt = NULL; bool found_mdt = false; - int i; lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); - ltd = &lod->lod_mdt_descs; - cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) { - tgt = LTD_TGT(ltd, i); - if (tgt->ltd_index == + lod_foreach_mdt(lod, mdt) { + if (mdt->ltd_index == lo->ldo_dir_stripe_offset) { found_mdt = true; break; @@ -5618,12 +6362,11 @@ static int lod_declare_create(const struct lu_env *env, struct dt_object *dt, GOTO(out, rc = -EINVAL); } } else if (hint && hint->dah_eadata) { - lmu = &buf; - lmu->lb_buf = (void *)hint->dah_eadata; - lmu->lb_len = hint->dah_eadata_len; + buf.lb_buf = (void *)hint->dah_eadata; + buf.lb_len = hint->dah_eadata_len; } - rc = lod_declare_dir_striping_create(env, dt, attr, lmu, dof, + rc = lod_declare_dir_striping_create(env, dt, attr, &buf, dof, th); } out: @@ -5671,10 +6414,10 @@ again: if (i == lo->ldo_comp_cnt) RETURN(pflr_id(mirror_id, id)); } - if (end == LCME_ID_MAX) { + + if (end == SEQ_ID_MAX) { + end = min_t(__u32, start, SEQ_ID_MAX) - 1; start = 1; - end = min(lo->ldo_layout_gen & LCME_ID_MASK, - (__u32)(LCME_ID_MAX - 1)); goto again; } @@ -5710,6 +6453,8 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt, int rc = 0, i, j; ENTRY; + mutex_lock(&lo->ldo_layout_mutex); + LASSERT((lo->ldo_comp_cnt != 0 && lo->ldo_comp_entries != NULL) || lo->ldo_is_foreign); @@ -5744,10 +6489,15 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt, if (lod_comp_inited(lod_comp)) continue; + if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN) { + lod_comp_set_init(lod_comp); + continue; + } + if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED) lod_comp_set_init(lod_comp); - if (lov_pattern(lod_comp->llc_pattern) == LOV_PATTERN_MDT) + if (lov_pattern(lod_comp->llc_pattern) & LOV_PATTERN_MDT) lod_comp_set_init(lod_comp); if (lod_comp->llc_stripe == NULL) @@ -5768,15 +6518,20 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt, if (rc) GOTO(out, rc); + lo->ldo_comp_cached = 1; + rc = lod_generate_and_set_lovea(env, lo, th); if (rc) GOTO(out, rc); - lo->ldo_comp_cached = 1; + mutex_unlock(&lo->ldo_layout_mutex); + RETURN(0); out: - lod_striping_free(env, lo); + lod_striping_free_nolock(env, lo); + mutex_unlock(&lo->ldo_layout_mutex); + RETURN(rc); } @@ -5793,7 +6548,7 @@ static inline bool lod_obj_is_dom(struct dt_object *dt) if (!lo->ldo_comp_cnt) return false; - return (lov_pattern(lo->ldo_comp_entries[0].llc_pattern) == + return (lov_pattern(lo->ldo_comp_entries[0].llc_pattern) & LOV_PATTERN_MDT); } @@ -5836,11 +6591,12 @@ lod_obj_stripe_destroy_cb(const struct lu_env *env, struct lod_object *lo, { if (data->locd_declare) return lod_sub_declare_destroy(env, dt, th); - else if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || - stripe_idx == cfs_fail_val) + + if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || + stripe_idx == cfs_fail_val) return lod_sub_destroy(env, dt, th); - else - return 0; + + return 0; } /** @@ -5908,8 +6664,8 @@ static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt, if (rc) RETURN(rc); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ) || - OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ2)) + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ) || + CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ2)) RETURN(0); if (!lod_obj_is_striped(dt)) @@ -6000,8 +6756,8 @@ static int lod_destroy(const struct lu_env *env, struct dt_object *dt, if (rc != 0) RETURN(rc); - if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ) || - OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ2)) + if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ) || + CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ2)) RETURN(0); if (!lod_obj_is_striped(dt)) @@ -6017,7 +6773,7 @@ static int lod_destroy(const struct lu_env *env, struct dt_object *dt, if (!dt_object_exists(stripe)) continue; - if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || + if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_SPEOBJ) || i == cfs_fail_val) { dt_write_lock(env, stripe, DT_TGT_CHILD); rc = lod_sub_ref_del(env, stripe, th); @@ -6216,10 +6972,6 @@ static int lod_object_lock(const struct lu_env *env, ldlm_completion_callback completion = einfo->ei_cb_cp; __u64 dlmflags = LDLM_FL_ATOMIC_CB; - if (einfo->ei_mode == LCK_PW || - einfo->ei_mode == LCK_EX) - dlmflags |= LDLM_FL_COS_INCOMPAT; - LASSERT(ns != NULL); rc = ldlm_cli_enqueue_local(env, ns, res_id, LDLM_IBITS, policy, einfo->ei_mode, @@ -6254,7 +7006,9 @@ static int lod_invalidate(const struct lu_env *env, struct dt_object *dt) } static int lod_declare_instantiate_components(const struct lu_env *env, - struct lod_object *lo, struct thandle *th) + struct lod_object *lo, + struct thandle *th, + __u64 reserve) { struct lod_thread_info *info = lod_env_info(env); int i; @@ -6265,7 +7019,7 @@ static int lod_declare_instantiate_components(const struct lu_env *env, for (i = 0; i < info->lti_count; i++) { rc = lod_qos_prep_create(env, lo, NULL, th, - info->lti_comp_idx[i]); + info->lti_comp_idx[i], reserve); if (rc) break; } @@ -6301,15 +7055,15 @@ static int lod_declare_instantiate_components(const struct lu_env *env, */ static bool lod_sel_osts_allowed(const struct lu_env *env, struct lod_object *lo, - int index, __u64 extension_size, + int index, __u64 reserve, struct lu_extent *extent, struct lu_extent *comp_extent, int write) { struct lod_layout_component *lod_comp = &lo->ldo_comp_entries[index]; struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); - struct obd_statfs *sfs = &lod_env_info(env)->lti_osfs; + struct lod_thread_info *tinfo = lod_env_info(env); + struct obd_statfs *sfs = &tinfo->lti_osfs; __u64 available = 0; - __u64 size; bool ret = true; int i, rc; @@ -6317,21 +7071,6 @@ static bool lod_sel_osts_allowed(const struct lu_env *env, LASSERT(lod_comp->llc_stripe_count != 0); - if (write == 0 || - (extent->e_start == 0 && extent->e_end == OBD_OBJECT_EOF)) { - /* truncate or append */ - size = extension_size; - } else { - /* In case of write op, check the real write extent, - * it may be larger than the extension_size */ - size = roundup(min(extent->e_end, comp_extent->e_end) - - max(extent->e_start, comp_extent->e_start), - extension_size); - } - /* extension_size is file level, so we must divide by stripe count to - * compare it to available space on a single OST */ - size /= lod_comp->llc_stripe_count; - lod_getref(&lod->lod_ost_descs); for (i = 0; i < lod_comp->llc_stripe_count; i++) { int index = lod_comp->llc_ost_indices[i]; @@ -6358,13 +7097,13 @@ static bool lod_sel_osts_allowed(const struct lu_env *env, if (j < lod_comp->llc_stripe_count) continue; - if (!cfs_bitmap_check(lod->lod_ost_bitmap, index)) { + if (!test_bit(index, lod->lod_ost_bitmap)) { CDEBUG(D_LAYOUT, "ost %d no longer present\n", index); ret = false; break; } - rc = dt_statfs_info(env, ost->ltd_ost, sfs, &info); + rc = dt_statfs_info(env, ost->ltd_tgt, sfs, &info); if (rc) { CDEBUG(D_LAYOUT, "statfs failed for ost %d, error %d\n", index, rc); @@ -6372,11 +7111,13 @@ static bool lod_sel_osts_allowed(const struct lu_env *env, break; } - if (sfs->os_state & OS_STATE_ENOSPC || - sfs->os_state & OS_STATE_READONLY || - sfs->os_state & OS_STATE_DEGRADED) { - CDEBUG(D_LAYOUT, "ost %d is not availble for SEL " - "extension, state %u\n", index, sfs->os_state); + if (sfs->os_state & OS_STATFS_ENOSPC || + sfs->os_state & OS_STATFS_READONLY || + sfs->os_state & OS_STATFS_NOCREATE || + sfs->os_state & OS_STATFS_DEGRADED) { + CDEBUG(D_LAYOUT, + "OST%04x unusable for SEL extension, state %x\n", + index, sfs->os_state); ret = false; break; } @@ -6392,11 +7133,11 @@ static bool lod_sel_osts_allowed(const struct lu_env *env, (100ull * sfs->os_bavail) / sfs->os_blocks, (100ull * sfs->os_bfree) / sfs->os_blocks); - if (size * repeated > available) { + if (reserve * repeated > available) { ret = false; CDEBUG(D_LAYOUT, "low space on ost %d, available %llu " - "< extension size %llu\n", index, available, - extension_size); + "< extension size %llu repeated %d\n", index, + available, reserve, repeated); break; } } @@ -6502,6 +7243,26 @@ static __u64 lod_extension_new_end(__u64 extension_size, __u64 extent_end, return new_end; } +/** + * Calculate the exact reservation (per-OST extension_size) on the OSTs being + * instantiated. It needs to be calculated in advance and taken into account at + * the instantiation time, because otherwise lod_statfs_and_check() may consider + * an OST as OK, but SEL needs its extension_size to fit the free space and the + * OST may turn out to be low-on-space, thus inappropriate OST may be used and + * ENOSPC occurs. + * + * \param[in] lod_comp lod component we are checking + * + * \retval size to reserved on each OST of lod_comp's stripe. + */ +static __u64 lod_sel_stripe_reserved(struct lod_layout_component *lod_comp) +{ + /* extension_size is file level, so we must divide by stripe count to + * compare it to available space on a single OST */ + return lod_comp->llc_stripe_size * SEL_UNIT_SIZE / + lod_comp->llc_stripe_count; +} + /* As lod_sel_handler() could be re-entered for the same component several * times, this is the data for the next call. Fields could be changed to * component indexes when needed, (e.g. if there is no need to instantiate @@ -6583,7 +7344,7 @@ static int lod_sel_handler(const struct lu_env *env, struct lod_layout_component *lod_comp; struct lod_layout_component *prev; struct lod_layout_component *next = NULL; - __u64 extension_size; + __u64 extension_size, reserve; __u64 new_end = 0; bool repeated; int change = 0; @@ -6620,11 +7381,13 @@ static int lod_sel_handler(const struct lu_env *env, RETURN(-EINVAL); } + reserve = lod_sel_stripe_reserved(lod_comp); + if (!prev->llc_stripe) { CDEBUG(D_LAYOUT, "Previous component not inited\n"); info->lti_count = 1; info->lti_comp_idx[0] = index - 1; - rc = lod_declare_instantiate_components(env, lo, th); + rc = lod_declare_instantiate_components(env, lo, th, reserve); /* ENOSPC tells us we can't use this component. If there is * a next or we are repeating, we either spill over (next) or * extend the original comp (repeat). Otherwise, return the @@ -6636,8 +7399,7 @@ static int lod_sel_handler(const struct lu_env *env, } if (sd->sd_force == 0 && rc == 0) - rc = !lod_sel_osts_allowed(env, lo, index - 1, - extension_size, extent, + rc = !lod_sel_osts_allowed(env, lo, index - 1, reserve, extent, &lod_comp->llc_extent, write); repeated = !!(sd->sd_repeat); @@ -6759,8 +7521,13 @@ static int lod_declare_update_extents(const struct lu_env *env, ENTRY; /* This makes us work on the components of the chosen mirror */ - start_index = lo->ldo_mirrors[pick].lme_start; - max_comp = lo->ldo_mirrors[pick].lme_end + 1; + if (lo->ldo_mirrors) { + start_index = lo->ldo_mirrors[pick].lme_start; + max_comp = lo->ldo_mirrors[pick].lme_end + 1; + } else { + start_index = 0; + max_comp = lo->ldo_comp_cnt; + } if (lo->ldo_flr_state == LCM_FL_NONE) LASSERT(start_index == 0 && max_comp == lo->ldo_comp_cnt); @@ -6789,12 +7556,14 @@ static int lod_declare_update_extents(const struct lu_env *env, /* We may have added or removed components. If so, we must update the * start & ends of all the mirrors after the current one, and the end * of the current mirror. */ - change = max_comp - 1 - lo->ldo_mirrors[pick].lme_end; - if (change) { - lo->ldo_mirrors[pick].lme_end += change; - for (i = pick + 1; i < lo->ldo_mirror_count; i++) { - lo->ldo_mirrors[i].lme_start += change; - lo->ldo_mirrors[i].lme_end += change; + if (lo->ldo_mirrors) { + change = max_comp - 1 - lo->ldo_mirrors[pick].lme_end; + if (change) { + lo->ldo_mirrors[pick].lme_end += change; + for (i = pick + 1; i < lo->ldo_mirror_count; i++) { + lo->ldo_mirrors[i].lme_start += change; + lo->ldo_mirrors[i].lme_end += change; + } } } @@ -6809,7 +7578,10 @@ out: /* If striping is already instantiated or INIT'ed DOM? */ static bool lod_is_instantiation_needed(struct lod_layout_component *comp) { - return !(((lov_pattern(comp->llc_pattern) == LOV_PATTERN_MDT) && + if (comp->llc_magic == LOV_MAGIC_FOREIGN) + return false; + + return !(((lov_pattern(comp->llc_pattern) & LOV_PATTERN_MDT) && lod_comp_inited(comp)) || comp->llc_stripe); } @@ -6886,22 +7658,22 @@ static int lod_declare_update_plain(const struct lu_env *env, lod_comp = &lo->ldo_comp_entries[lo->ldo_comp_cnt - 1]; if (lo->ldo_comp_cnt > 1 && lod_comp->llc_extent.e_end != OBD_OBJECT_EOF && - lod_comp->llc_extent.e_end < layout->li_extent.e_end) { + lod_comp->llc_extent.e_end < layout->lai_extent.e_end) { CDEBUG_LIMIT(replay ? D_ERROR : D_LAYOUT, "%s: the defined layout [0, %#llx) does not " "covers the write range "DEXT"\n", lod2obd(d)->obd_name, lod_comp->llc_extent.e_end, - PEXT(&layout->li_extent)); + PEXT(&layout->lai_extent)); GOTO(out, rc = -EINVAL); } CDEBUG(D_LAYOUT, "%s: "DFID": update components "DEXT"\n", lod2obd(d)->obd_name, PFID(lod_object_fid(lo)), - PEXT(&layout->li_extent)); + PEXT(&layout->lai_extent)); if (!replay) { - rc = lod_declare_update_extents(env, lo, &layout->li_extent, - th, 0, layout->li_opc == LAYOUT_INTENT_WRITE); + rc = lod_declare_update_extents(env, lo, &layout->lai_extent, + th, 0, layout->lai_opc == LAYOUT_INTENT_WRITE); if (rc < 0) GOTO(out, rc); else if (rc) @@ -6915,7 +7687,7 @@ static int lod_declare_update_plain(const struct lu_env *env, for (i = 0; i < lo->ldo_comp_cnt; i++) { lod_comp = &lo->ldo_comp_entries[i]; - if (lod_comp->llc_extent.e_start >= layout->li_extent.e_end) + if (lod_comp->llc_extent.e_start >= layout->lai_extent.e_end) break; if (!replay) { @@ -6951,7 +7723,7 @@ static int lod_declare_update_plain(const struct lu_env *env, RETURN(-EALREADY); lod_obj_inc_layout_gen(lo); - rc = lod_declare_instantiate_components(env, lo, th); + rc = lod_declare_instantiate_components(env, lo, th, 0); EXIT; out: if (rc) @@ -7000,6 +7772,7 @@ restart: for (i = 0; i < lo->ldo_mirror_count; i++) { if (i == primary) continue; + rc = lod_declare_update_extents(env, lo, &pri_extent, th, i, 0); /* if update_extents changed the layout, it may have @@ -7023,6 +7796,8 @@ restart: lod_comp->llc_flags |= LCME_FL_STALE; lo->ldo_mirrors[i].lme_stale = 1; + if (lod_is_hsm(lod_comp)) + lod_comp->llc_foreign_flags |= HS_DIRTY; } } } @@ -7060,12 +7835,9 @@ static inline int lod_check_ost_avail(const struct lu_env *env, } ost = OST_TGT(lod, idx); - if (ost->ltd_statfs.os_state & - (OS_STATE_READONLY | OS_STATE_ENOSPC | OS_STATE_ENOINO | - OS_STATE_NOPRECREATE) || - ost->ltd_active == 0) { - CDEBUG(D_LAYOUT, DFID ": mirror %d OST%d unavail, rc = %d\n", - PFID(lod_object_fid(lo)), index, idx, rc); + if (ost->ltd_active == 0) { + CDEBUG(D_LAYOUT, DFID ": mirror %d OST%d unavail\n", + PFID(lod_object_fid(lo)), index, idx); return 0; } @@ -7088,7 +7860,7 @@ static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, int picked = -1, second_pick = -1, third_pick = -1; ENTRY; - if (OBD_FAIL_CHECK(OBD_FAIL_FLR_RANDOM_PICK_MIRROR)) { + if (CFS_FAIL_CHECK(OBD_FAIL_FLR_RANDOM_PICK_MIRROR)) { get_random_bytes(&seq, sizeof(seq)); seq %= lo->ldo_mirror_count; } @@ -7099,7 +7871,12 @@ static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, * This algo can be revised later after knowing the topology of * cluster. */ - lod_qos_statfs_update(env, lod); + lod_qos_statfs_update(env, lod, &lod->lod_ost_descs); + + rc = lod_fill_mirrors(lo); + if (rc) + RETURN(rc); + for (i = 0; i < lo->ldo_mirror_count; i++) { bool ost_avail = true; int index = (i + seq) % lo->ldo_mirror_count; @@ -7111,7 +7888,7 @@ static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, } /* 2nd pick is for the primary mirror containing unavail OST */ - if (lo->ldo_mirrors[index].lme_primary && second_pick < 0) + if (lo->ldo_mirrors[index].lme_prefer && second_pick < 0) second_pick = index; /* 3rd pick is for non-primary mirror containing unavail OST */ @@ -7122,7 +7899,7 @@ static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, * we found a non-primary 1st pick, we'd like to find a * potential pirmary mirror. */ - if (picked >= 0 && !lo->ldo_mirrors[index].lme_primary) + if (picked >= 0 && !lo->ldo_mirrors[index].lme_prefer) continue; /* check the availability of OSTs */ @@ -7159,7 +7936,7 @@ static int lod_primary_pick(const struct lu_env *env, struct lod_object *lo, * primary with all OSTs are available, this is the perfect * 1st pick. */ - if (lo->ldo_mirrors[index].lme_primary) + if (lo->ldo_mirrors[index].lme_prefer) break; } /* for all mirrors */ @@ -7245,29 +8022,277 @@ static int lod_prepare_resync(const struct lu_env *env, struct lod_object *lo, return need_sync ? 0 : -EALREADY; } -static int lod_declare_update_rdonly(const struct lu_env *env, - struct lod_object *lo, struct md_layout_change *mlc, - struct thandle *th) +static struct lod_layout_component * +lod_locate_comp_hsm(struct lod_object *lo, int *hsm_mirror_id) { - struct lod_thread_info *info = lod_env_info(env); - struct lu_attr *layout_attr = &info->lti_layout_attr; - struct lod_layout_component *lod_comp; - struct lu_extent extent = { 0 }; - int rc; - ENTRY; - - LASSERT(lo->ldo_flr_state == LCM_FL_RDONLY); - LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE || - mlc->mlc_opc == MD_LAYOUT_RESYNC); - LASSERT(lo->ldo_mirror_count > 0); + struct lod_layout_component *lod_comp = NULL; + int i; - if (mlc->mlc_opc == MD_LAYOUT_WRITE) { - struct layout_intent *layout = mlc->mlc_intent; - int write = layout->li_opc == LAYOUT_INTENT_WRITE; - int picked; + if (!lo->ldo_is_composite) + return NULL; - extent = layout->li_extent; - CDEBUG(D_LAYOUT, DFID": trying to write :"DEXT"\n", + for (i = 0; i < lo->ldo_mirror_count; i++) { + /* + * FIXME: In the current design, there is only one HSM + * mirror component in range [0, EOF] for a FLR file. This + * should be fixed to support multiple HSM mirror components + * with different HSM backend types and partial file ranges + * in the future. + */ + if (lo->ldo_mirrors[i].lme_hsm) { + __u16 start_idx; + __u16 end_idx; + + if (hsm_mirror_id) + *hsm_mirror_id = i; + start_idx = lo->ldo_mirrors[i].lme_start; + end_idx = lo->ldo_mirrors[i].lme_end; + LASSERT(start_idx == end_idx); + lod_comp = &lo->ldo_comp_entries[start_idx]; + LASSERT(lo->ldo_is_composite && lod_is_hsm(lod_comp) && + lod_comp->llc_extent.e_start == 0 && + lod_comp->llc_extent.e_end == LUSTRE_EOF); + break; + } + } + + return lod_comp; +} + +static int lod_declare_pccro_set(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lu_buf *buf = &info->lti_buf; + struct lod_object *lo = lod_dt_obj(dt); + struct lod_layout_component *lod_comp; + struct lod_layout_component *comp_array; + struct lod_mirror_entry *mirror_array; + __u16 mirror_id; + int hsm_mirror_id; + int mirror_cnt; + int new_cnt; + int rc; + int i; + + ENTRY; + + rc = lod_striping_load(env, lo); + if (rc) + RETURN(rc); + + if (lo->ldo_flr_state & LCM_FL_PCC_RDONLY) + RETURN(-EALREADY); + + rc = lod_layout_data_init(info, lo->ldo_comp_cnt); + if (rc) + RETURN(rc); + + lod_comp = lod_locate_comp_hsm(lo, &hsm_mirror_id); + if (lod_comp) { + if (lod_comp->llc_foreign_flags & HS_PCCRO) { + CDEBUG(D_LAYOUT, "bad HSM flags: %#x\n", + lod_comp->llc_foreign_flags); + RETURN(-EINVAL); + } + + lod_obj_inc_layout_gen(lo); + lod_comp->llc_foreign_flags |= HS_PCCRO; + lod_comp->llc_foreign_flags &= ~HS_DIRTY; + lod_comp->llc_flags &= ~LCME_FL_STALE; + lo->ldo_mirrors[hsm_mirror_id].lme_stale = 0; + lo->ldo_flr_state |= LCM_FL_PCC_RDONLY; + buf->lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_declare_xattr_set(env, lod_object_child(lo), + buf, XATTR_NAME_LOV, 0, th); + RETURN(rc); + } + + /* + * Create an new composite layout with only one HSM component. + * Field @lhm_archive_uuid is used to be the identifier within HSM + * backend for the archive copy. In the PCC case with a POSIX archive, + * This can just be the original inode FID. This is important because + * the inode FID may change due to layout swaps or migration to a new + * MDT, and we do not want that to cause problems with finding the copy + * in HSM/PCC. + */ + mirror_cnt = lo->ldo_mirror_count + 1; + if (!lo->ldo_is_composite) { + LASSERT(lo->ldo_mirror_count == 0); + mirror_cnt++; + } + + OBD_ALLOC_PTR_ARRAY(mirror_array, mirror_cnt); + if (mirror_array == NULL) + RETURN(-ENOMEM); + + new_cnt = lo->ldo_comp_cnt + 1; + OBD_ALLOC_PTR_ARRAY(comp_array, new_cnt); + if (comp_array == NULL) { + OBD_FREE_PTR_ARRAY(mirror_array, mirror_cnt); + RETURN(-ENOMEM); + } + + mirror_id = 0; + for (i = 0; i < lo->ldo_comp_cnt; i++) { + lod_comp = &lo->ldo_comp_entries[i]; + + /* + * Add mirror from a non-flr file, create new mirror ID. + * Otherwise, keep existing mirror's component ID, used + * for mirror extension. + */ + if (lo->ldo_mirror_count == 0 && + mirror_id_of(lod_comp->llc_id) == 0) + lod_comp->llc_id = pflr_id(1, i + 1); + + if (lod_comp->llc_id != LCME_ID_INVAL && + mirror_id_of(lod_comp->llc_id) > mirror_id) + mirror_id = mirror_id_of(lod_comp->llc_id); + + if (!lo->ldo_is_composite) { + lod_comp->llc_extent.e_start = 0; + lod_comp->llc_extent.e_end = LUSTRE_EOF; + lod_comp_set_init(lod_comp); + } + } + + memcpy(comp_array, lo->ldo_comp_entries, + sizeof(*comp_array) * lo->ldo_comp_cnt); + + lod_comp = &comp_array[new_cnt - 1]; + lod_comp->llc_magic = LOV_MAGIC_FOREIGN; + lod_comp->llc_extent.e_start = 0; + lod_comp->llc_extent.e_end = LUSTRE_EOF; + lod_comp->llc_length = sizeof(struct lov_hsm_base); + lod_comp->llc_type = LU_FOREIGN_TYPE_PCCRO; + lod_comp->llc_foreign_flags = HS_EXISTS | HS_ARCHIVED | HS_PCCRO; + memset(&lod_comp->llc_hsm, 0, sizeof(lod_comp->llc_hsm)); + + if (lo->ldo_mirrors) + OBD_FREE_PTR_ARRAY(lo->ldo_mirrors, lo->ldo_mirror_count); + OBD_FREE_PTR_ARRAY(lo->ldo_comp_entries, lo->ldo_comp_cnt); + + /* + * The @ldo_mirror will be refilled by lod_fill_mirrors() when + * call lod_striped_create() for layout change. + */ + lo->ldo_mirrors = mirror_array; + lo->ldo_mirror_count = mirror_cnt; + lo->ldo_comp_entries = comp_array; + lo->ldo_comp_cnt = new_cnt; + lo->ldo_is_composite = 1; + + ++mirror_id; + lod_comp->llc_id = LCME_ID_INVAL; + lod_comp->llc_id = lod_gen_component_id(lo, mirror_id, new_cnt - 1); + + if (lo->ldo_flr_state == LCM_FL_NONE) + lo->ldo_flr_state = LCM_FL_RDONLY; + lo->ldo_flr_state |= LCM_FL_PCC_RDONLY; + buf->lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_declare_xattr_set(env, lod_object_child(lo), + buf, XATTR_NAME_LOV, 0, th); + if (rc) + lod_striping_free(env, lo); + + RETURN(rc); +} + +/* + * TODO: When clear LCM_FL_PCC_RDONLY flag from the layouts, it means the file + * is going to be modified. Currently it needs two RPCs: first one is to clear + * LCM_FL_PCC_RDONLY flag; the second one is to pick primary mirror and mark + * the file as LCM_FL_WRITE_PENDING. + * These two RPCs can be combined in one RPC call. + */ +static int lod_declare_pccro_clear(const struct lu_env *env, + struct dt_object *dt, struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + struct lod_layout_component *lod_comp; + int rc; + + ENTRY; + + rc = lod_striping_load(env, lo); + if (rc) + RETURN(rc); + + if (!(lo->ldo_flr_state & LCM_FL_PCC_RDONLY)) + RETURN(-EALREADY); + + rc = lod_layout_data_init(info, lo->ldo_comp_cnt); + if (rc) + RETURN(rc); + + lod_comp = lod_locate_comp_hsm(lo, NULL); + if (lod_comp == NULL) { + CDEBUG(D_LAYOUT, "Not found any HSM component\n"); + GOTO(out, rc = -EINVAL); + } + + lod_comp->llc_foreign_flags &= ~HS_PCCRO; + lo->ldo_flr_state &= ~LCM_FL_PCC_RDONLY; + lod_obj_inc_layout_gen(lo); + info->lti_buf.lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_declare_xattr_set(env, lod_object_child(lo), + &info->lti_buf, XATTR_NAME_LOV, 0, th); +out: + if (rc) + lod_striping_free(env, lo); + + RETURN(rc); +} + +static int lod_declare_update_pccro(const struct lu_env *env, + struct dt_object *dt, + struct md_layout_change *mlc, + struct thandle *th) +{ + struct layout_intent *intent = mlc->mlc_intent; + int rc; + + switch (intent->lai_opc) { + case LAYOUT_INTENT_PCCRO_SET: + rc = lod_declare_pccro_set(env, dt, th); + break; + case LAYOUT_INTENT_PCCRO_CLEAR: + rc = lod_declare_pccro_clear(env, dt, th); + break; + default: + rc = -EOPNOTSUPP; + break; + } + + return rc; +} + +static int lod_declare_update_rdonly(const struct lu_env *env, + struct lod_object *lo, struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lu_attr *layout_attr = &info->lti_layout_attr; + struct lod_layout_component *lod_comp; + struct lu_extent extent = { 0 }; + int rc; + ENTRY; + + LASSERT(lo->ldo_flr_state == LCM_FL_RDONLY); + LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE || + mlc->mlc_opc == MD_LAYOUT_RESYNC); + LASSERT(lo->ldo_mirror_count > 0); + + if (mlc->mlc_opc == MD_LAYOUT_WRITE) { + struct layout_intent *layout = mlc->mlc_intent; + int write = layout->lai_opc == LAYOUT_INTENT_WRITE; + int picked; + + extent = layout->lai_extent; + CDEBUG(D_LAYOUT, DFID": trying to write :"DEXT"\n", PFID(lod_object_fid(lo)), PEXT(&extent)); picked = lod_primary_pick(env, lo, &extent); @@ -7284,7 +8309,7 @@ static int lod_declare_update_rdonly(const struct lu_env *env, if (rc < 0) GOTO(out, rc); - if (layout->li_opc == LAYOUT_INTENT_TRUNC) { + if (layout->lai_opc == LAYOUT_INTENT_TRUNC) { /** * trunc transfers [0, size) in the intent extent, we'd * stale components overlapping [size, eof). @@ -7299,7 +8324,7 @@ static int lod_declare_update_rdonly(const struct lu_env *env, GOTO(out, rc); /* restore truncate intent extent */ - if (layout->li_opc == LAYOUT_INTENT_TRUNC) + if (layout->lai_opc == LAYOUT_INTENT_TRUNC) extent.e_end = extent.e_start; /* instantiate components for the picked mirror, start from 0 */ @@ -7361,19 +8386,13 @@ static int lod_declare_update_rdonly(const struct lu_env *env, * This way it can make sure that the layout version is * monotonously increased in this writing era. */ lod_obj_inc_layout_gen(lo); - if (lo->ldo_layout_gen > (LCME_ID_MAX >> 1)) { - __u32 layout_version; - - get_random_bytes(&layout_version, sizeof(layout_version)); - lo->ldo_layout_gen = layout_version & 0xffff; - } - rc = lod_declare_instantiate_components(env, lo, th); + rc = lod_declare_instantiate_components(env, lo, th, 0); if (rc) GOTO(out, rc); layout_attr->la_valid = LA_LAYOUT_VERSION; - layout_attr->la_layout_version = 0; /* set current version */ + layout_attr->la_layout_version = 0; if (mlc->mlc_opc == MD_LAYOUT_RESYNC) layout_attr->la_layout_version = LU_LAYOUT_RESYNC; rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th); @@ -7403,22 +8422,31 @@ static int lod_declare_update_write_pending(const struct lu_env *env, LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE || mlc->mlc_opc == MD_LAYOUT_RESYNC); - /* look for the primary mirror */ + /* look for the first preferred mirror */ for (i = 0; i < lo->ldo_mirror_count; i++) { if (lo->ldo_mirrors[i].lme_stale) continue; - - LASSERTF(primary < 0, DFID " has multiple primary: %u / %u", - PFID(lod_object_fid(lo)), - lo->ldo_mirrors[i].lme_id, - lo->ldo_mirrors[primary].lme_id); + if (lo->ldo_mirrors[i].lme_prefer == 0) + continue; + if (lo->ldo_mirrors[i].lme_hsm) + continue; primary = i; + break; } if (primary < 0) { - CERROR(DFID ": doesn't have a primary mirror\n", - PFID(lod_object_fid(lo))); - GOTO(out, rc = -ENODATA); + /* no primary, use any in-sync */ + for (i = 0; i < lo->ldo_mirror_count; i++) { + if (lo->ldo_mirrors[i].lme_stale) + continue; + primary = i; + break; + } + if (primary < 0) { + CERROR(DFID ": doesn't have a primary mirror\n", + PFID(lod_object_fid(lo))); + GOTO(out, rc = -ENODATA); + } } CDEBUG(D_LAYOUT, DFID": found primary %u\n", @@ -7437,11 +8465,11 @@ static int lod_declare_update_write_pending(const struct lu_env *env, if (mlc->mlc_opc == MD_LAYOUT_WRITE) { struct layout_intent *layout = mlc->mlc_intent; - int write = layout->li_opc == LAYOUT_INTENT_WRITE; + int write = layout->lai_opc == LAYOUT_INTENT_WRITE; - LASSERT(mlc->mlc_intent != NULL); + LASSERT(layout != NULL); - extent = mlc->mlc_intent->li_extent; + extent = layout->lai_extent; CDEBUG(D_LAYOUT, DFID": intent to write: "DEXT"\n", PFID(lod_object_fid(lo)), PEXT(&extent)); @@ -7452,7 +8480,7 @@ static int lod_declare_update_write_pending(const struct lu_env *env, if (rc < 0) GOTO(out, rc); - if (mlc->mlc_intent->li_opc == LAYOUT_INTENT_TRUNC) { + if (layout->lai_opc == LAYOUT_INTENT_TRUNC) { /** * trunc transfers [0, size) in the intent extent, we'd * stale components overlapping [size, eof). @@ -7470,7 +8498,7 @@ static int lod_declare_update_write_pending(const struct lu_env *env, * instantiate [0, mlc->mlc_intent->e_end) */ /* restore truncate intent extent */ - if (mlc->mlc_intent->li_opc == LAYOUT_INTENT_TRUNC) + if (layout->lai_opc == LAYOUT_INTENT_TRUNC) extent.e_end = extent.e_start; extent.e_start = 0; @@ -7479,151 +8507,789 @@ static int lod_declare_update_write_pending(const struct lu_env *env, &lod_comp->llc_extent)) break; - if (!lod_is_instantiation_needed(lod_comp)) - continue; + if (!lod_is_instantiation_needed(lod_comp)) + continue; + + CDEBUG(D_LAYOUT, "write instantiate %d / %d\n", + primary, lod_comp_index(lo, lod_comp)); + info->lti_comp_idx[info->lti_count++] = + lod_comp_index(lo, lod_comp); + } + } else { /* MD_LAYOUT_RESYNC */ + if (mlc->mlc_mirror_id == 0) { + /* normal resync */ + lod_foreach_mirror_comp(lod_comp, lo, primary) { + if (!lod_comp_inited(lod_comp)) + break; + + extent.e_end = lod_comp->llc_extent.e_end; + } + + rc = lod_prepare_resync(env, lo, &extent); + if (rc) + GOTO(out, rc); + } else { + /* mirror write, try to init its all components */ + rc = lod_prepare_resync_mirror(env, lo, + mlc->mlc_mirror_id); + if (rc) + GOTO(out, rc); + } + + /* change the file state to SYNC_PENDING */ + lo->ldo_flr_state = LCM_FL_SYNC_PENDING; + } + + rc = lod_declare_instantiate_components(env, lo, th, 0); + if (rc) + GOTO(out, rc); + + lod_obj_inc_layout_gen(lo); + + /* 3. transfer layout version to OST objects. + * transfer new layout version to OST objects so that stale writes + * can be denied. It also ends an era of writing by setting + * LU_LAYOUT_RESYNC. Normal client can never use this bit to + * send write RPC; only resync RPCs could do it. */ + layout_attr->la_valid = LA_LAYOUT_VERSION; + layout_attr->la_layout_version = 0; + if (mlc->mlc_opc == MD_LAYOUT_RESYNC) + layout_attr->la_layout_version = LU_LAYOUT_RESYNC; + rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th); + if (rc) + GOTO(out, rc); +out: + if (rc) + lod_striping_free(env, lo); + RETURN(rc); +} + +static int lod_declare_update_sync_pending(const struct lu_env *env, + struct lod_object *lo, struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lu_attr *layout_attr = &info->lti_layout_attr; + unsigned sync_components = 0; + unsigned resync_components = 0; + int i; + int rc; + ENTRY; + + LASSERT(lo->ldo_flr_state == LCM_FL_SYNC_PENDING); + LASSERT(mlc->mlc_opc == MD_LAYOUT_RESYNC_DONE || + mlc->mlc_opc == MD_LAYOUT_WRITE); + + CDEBUG(D_LAYOUT, DFID ": received op %d in sync pending\n", + PFID(lod_object_fid(lo)), mlc->mlc_opc); + + if (mlc->mlc_opc == MD_LAYOUT_WRITE) { + CDEBUG(D_LAYOUT, DFID": cocurrent write to sync pending\n", + PFID(lod_object_fid(lo))); + + lo->ldo_flr_state = LCM_FL_WRITE_PENDING; + return lod_declare_update_write_pending(env, lo, mlc, th); + } + + /* MD_LAYOUT_RESYNC_DONE */ + + for (i = 0; i < lo->ldo_comp_cnt; i++) { + struct lod_layout_component *lod_comp; + int j; + + lod_comp = &lo->ldo_comp_entries[i]; + + if (!(lod_comp->llc_flags & LCME_FL_STALE)) { + sync_components++; + continue; + } + + for (j = 0; j < mlc->mlc_resync_count; j++) { + if (lod_comp->llc_id != mlc->mlc_resync_ids[j]) + continue; + + mlc->mlc_resync_ids[j] = LCME_ID_INVAL; + lod_comp->llc_flags &= ~LCME_FL_STALE; + resync_components++; + break; + } + } + + /* valid check */ + for (i = 0; i < mlc->mlc_resync_count; i++) { + if (mlc->mlc_resync_ids[i] == LCME_ID_INVAL) + continue; + + CDEBUG(D_LAYOUT, DFID": lcme id %u (%d / %zd) not exist " + "or already synced\n", PFID(lod_object_fid(lo)), + mlc->mlc_resync_ids[i], i, mlc->mlc_resync_count); + GOTO(out, rc = -EINVAL); + } + + if (!sync_components || (mlc->mlc_resync_count && !resync_components)) { + CDEBUG(D_LAYOUT, DFID": no mirror in sync\n", + PFID(lod_object_fid(lo))); + + /* tend to return an error code here to prevent + * the MDT from setting SoM attribute */ + GOTO(out, rc = -EINVAL); + } + + CDEBUG(D_LAYOUT, DFID": synced %u resynced %u/%zu components\n", + PFID(lod_object_fid(lo)), + sync_components, resync_components, mlc->mlc_resync_count); + + lo->ldo_flr_state = LCM_FL_RDONLY; + lod_obj_inc_layout_gen(lo); + + layout_attr->la_valid = LA_LAYOUT_VERSION; + layout_attr->la_layout_version = 0; + rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th); + if (rc) + GOTO(out, rc); + + info->lti_buf.lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_declare_xattr_set(env, lod_object_child(lo), + &info->lti_buf, XATTR_NAME_LOV, 0, th); + EXIT; + +out: + if (rc) + lod_striping_free(env, lo); + RETURN(rc); +} + +typedef int (*mlc_handler)(const struct lu_env *env, struct dt_object *dt, + const struct md_layout_change *mlc, + struct thandle *th); + +/** + * Attach stripes after target's for migrating directory. NB, we + * only need to declare this, the actual work is done inside + * lod_xattr_set_lmv(). + * + * \param[in] env execution environment + * \param[in] dt target object + * \param[in] mlc layout change data + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_dir_declare_layout_attach(const struct lu_env *env, + struct dt_object *dt, + const struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_tgt_descs *ltd = &lod->lod_mdt_descs; + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + struct dt_object_format *dof = &info->lti_format; + struct lmv_mds_md_v1 *lmv = mlc->mlc_buf.lb_buf; + struct dt_object **stripes; + __u32 stripe_count = le32_to_cpu(lmv->lmv_stripe_count); + struct lu_fid *fid = &info->lti_fid; + struct lod_tgt_desc *tgt; + struct dt_object *dto; + struct dt_device *tgt_dt; + int type = LU_SEQ_RANGE_ANY; + struct dt_insert_rec *rec = &info->lti_dt_rec; + char *stripe_name = info->lti_key; + struct lu_name *sname; + struct linkea_data ldata = { NULL }; + struct lu_buf linkea_buf; + __u32 idx; + int i; + int rc; + + ENTRY; + + if (!lmv_is_sane(lmv)) + RETURN(-EINVAL); + + if (!dt_try_as_dir(env, dt, false)) + RETURN(-ENOTDIR); + + dof->dof_type = DFT_DIR; + + OBD_ALLOC_PTR_ARRAY(stripes, (lo->ldo_dir_stripe_count + stripe_count)); + if (!stripes) + RETURN(-ENOMEM); + + for (i = 0; i < lo->ldo_dir_stripe_count; i++) + stripes[i] = lo->ldo_stripe[i]; + + rec->rec_type = S_IFDIR; + + for (i = 0; i < stripe_count; i++) { + fid_le_to_cpu(fid, + &lmv->lmv_stripe_fids[i]); + if (!fid_is_sane(fid)) + continue; + + rc = lod_fld_lookup(env, lod, fid, &idx, &type); + if (rc) + GOTO(out, rc); + + if (idx == lod2lu_dev(lod)->ld_site->ld_seq_site->ss_node_id) { + tgt_dt = lod->lod_child; + } else { + tgt = LTD_TGT(ltd, idx); + if (tgt == NULL) + GOTO(out, rc = -ESTALE); + tgt_dt = tgt->ltd_tgt; + } + + dto = dt_locate_at(env, tgt_dt, fid, + lo->ldo_obj.do_lu.lo_dev->ld_site->ls_top_dev, + NULL); + if (IS_ERR(dto)) + GOTO(out, rc = PTR_ERR(dto)); + + stripes[i + lo->ldo_dir_stripe_count] = dto; + + if (!dt_try_as_dir(env, dto, true)) + GOTO(out, rc = -ENOTDIR); + + rc = lod_sub_declare_ref_add(env, dto, th); + if (rc) + GOTO(out, rc); + + rec->rec_fid = lu_object_fid(&dto->do_lu); + rc = lod_sub_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dot, th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_insert(env, dto, + (const struct dt_rec *)rec, + (const struct dt_key *)dotdot, th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_xattr_set(env, dto, &mlc->mlc_buf, + XATTR_NAME_LMV, 0, th); + if (rc) + GOTO(out, rc); + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%u", + PFID(lu_object_fid(&dto->do_lu)), + i + lo->ldo_dir_stripe_count); + + sname = lod_name_get(env, stripe_name, strlen(stripe_name)); + rc = linkea_links_new(&ldata, &info->lti_linkea_buf, + sname, lu_object_fid(&dt->do_lu)); + if (rc) + GOTO(out, rc); + + linkea_buf.lb_buf = ldata.ld_buf->lb_buf; + linkea_buf.lb_len = ldata.ld_leh->leh_len; + rc = lod_sub_declare_xattr_set(env, dto, &linkea_buf, + XATTR_NAME_LINK, 0, th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_insert(env, next, + (const struct dt_rec *)rec, + (const struct dt_key *)stripe_name, + th); + if (rc) + GOTO(out, rc); + + rc = lod_sub_declare_ref_add(env, next, th); + if (rc) + GOTO(out, rc); + } + + if (lo->ldo_stripe) + OBD_FREE_PTR_ARRAY(lo->ldo_stripe, + lo->ldo_dir_stripes_allocated); + lo->ldo_stripe = stripes; + lo->ldo_is_foreign = 0; + lo->ldo_dir_migrate_offset = lo->ldo_dir_stripe_count; + lo->ldo_dir_migrate_hash = le32_to_cpu(lmv->lmv_hash_type); + lo->ldo_dir_stripe_count += stripe_count; + lo->ldo_dir_layout_version++; + lo->ldo_dir_stripes_allocated += stripe_count; + + /* plain directory split creates target as a plain directory, while + * after source attached as the first stripe, it becomes a striped + * directory, set correct do_index_ops, otherwise it can't be unlinked. + */ + dt->do_index_ops = &lod_striped_index_ops; + + RETURN(0); +out: + i = lo->ldo_dir_stripe_count; + while (i < lo->ldo_dir_stripe_count + stripe_count && stripes[i]) + dt_object_put(env, stripes[i++]); + + OBD_FREE_PTR_ARRAY(stripes, stripe_count + lo->ldo_dir_stripe_count); + return rc; +} + +static int lod_dir_declare_layout_detach(const struct lu_env *env, + struct dt_object *dt, + const struct md_layout_change *unused, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + char *stripe_name = info->lti_key; + struct dt_object *dto; + int i; + int rc = 0; + + if (!dt_try_as_dir(env, dt, true)) + return -ENOTDIR; + + if (!lo->ldo_dir_stripe_count) + return lod_sub_declare_delete(env, next, + (const struct dt_key *)dotdot, th); + + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + dto = lo->ldo_stripe[i]; + if (!dto) + continue; + + if (!dt_try_as_dir(env, dto, true)) + return -ENOTDIR; + + rc = lod_sub_declare_delete(env, dto, + (const struct dt_key *)dotdot, th); + if (rc) + return rc; + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&dto->do_lu)), i); + + rc = lod_sub_declare_delete(env, next, + (const struct dt_key *)stripe_name, th); + if (rc) + return rc; + + rc = lod_sub_declare_ref_del(env, next, th); + if (rc) + return rc; + } + + return 0; +} + +static int dt_dir_is_empty(const struct lu_env *env, + struct dt_object *obj) +{ + struct dt_it *it; + const struct dt_it_ops *iops; + int rc; + + ENTRY; + + if (!dt_try_as_dir(env, obj, true)) + RETURN(-ENOTDIR); + + iops = &obj->do_index_ops->dio_it; + it = iops->init(env, obj, LUDA_64BITHASH); + if (IS_ERR(it)) + RETURN(PTR_ERR(it)); + + rc = iops->get(env, it, (const struct dt_key *)""); + if (rc > 0) { + int i; + + for (rc = 0, i = 0; rc == 0 && i < 3; ++i) + rc = iops->next(env, it); + if (!rc) + rc = -ENOTEMPTY; + else if (rc == 1) + rc = 0; + } else if (!rc) { + /* Huh? Index contains no zero key? */ + rc = -EIO; + } + + iops->put(env, it); + iops->fini(env, it); + + RETURN(rc); +} + +static int lod_dir_declare_layout_shrink(const struct lu_env *env, + struct dt_object *dt, + const struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + struct lmv_user_md *lmu = mlc->mlc_buf.lb_buf; + char *stripe_name = info->lti_key; + struct lu_buf *lmv_buf = &info->lti_buf; + __u32 final_stripe_count; + struct dt_object *dto; + int i; + int rc; + + LASSERT(lmu); + + if (!dt_try_as_dir(env, dt, true)) + return -ENOTDIR; + + /* shouldn't be called on plain directory */ + LASSERT(lo->ldo_dir_stripe_count); + + lmv_buf->lb_buf = &info->lti_lmv.lmv_md_v1; + lmv_buf->lb_len = sizeof(info->lti_lmv.lmv_md_v1); + + final_stripe_count = le32_to_cpu(lmu->lum_stripe_count); + LASSERT(final_stripe_count && + final_stripe_count < lo->ldo_dir_stripe_count); + + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + dto = lo->ldo_stripe[i]; + if (!dto) + continue; + + if (i < final_stripe_count) { + rc = lod_sub_declare_xattr_set(env, dto, lmv_buf, + XATTR_NAME_LMV, + LU_XATTR_REPLACE, th); + if (rc) + return rc; + + continue; + } + + rc = dt_dir_is_empty(env, dto); + if (rc < 0) + return rc; + + rc = lod_sub_declare_ref_del(env, dto, th); + if (rc) + return rc; + + rc = lod_sub_declare_destroy(env, dto, th); + if (rc) + return rc; + + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&dto->do_lu)), i); + + rc = lod_sub_declare_delete(env, next, + (const struct dt_key *)stripe_name, th); + if (rc) + return rc; + + rc = lod_sub_declare_ref_del(env, next, th); + if (rc) + return rc; + } + + rc = lod_sub_declare_xattr_set(env, next, lmv_buf, XATTR_NAME_LMV, + LU_XATTR_REPLACE, th); + return rc; +} + +/** + * Allocate stripes for split directory. + * + * \param[in] env execution environment + * \param[in] dt target object + * \param[in] mlc layout change data + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_dir_declare_layout_split(const struct lu_env *env, + struct dt_object *dt, + const struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *lod = lu2lod_dev(dt->do_lu.lo_dev); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object_format *dof = &info->lti_format; + struct lmv_user_md_v1 *lum = mlc->mlc_spec->u.sp_ea.eadata; + struct dt_object **stripes; + int mdt_count = lod->lod_remote_mdt_count + 1; + u32 stripe_count; + u32 saved_count; + int i; + int rc; + + ENTRY; + + LASSERT(le32_to_cpu(lum->lum_magic) == LMV_USER_MAGIC); + LASSERT(le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT); + + saved_count = lo->ldo_dir_stripes_allocated; + stripe_count = le32_to_cpu(lum->lum_stripe_count); + if (stripe_count <= saved_count) + RETURN(-EINVAL); + + /* if the split target is overstriped, we need to put that flag in the + * current layout so it can allocate the larger number of stripes + * + * Note we need to pick up any hash *flags* which affect allocation + * *before* allocation, so they're used in allocating the directory, + * rather than after when we finalize directory setup (at the end of + * this function). + */ + if (le32_to_cpu(lum->lum_hash_type) & LMV_HASH_FLAG_OVERSTRIPED) + lo->ldo_dir_hash_type |= LMV_HASH_FLAG_OVERSTRIPED; + + if (!(lo->ldo_dir_hash_type & LMV_HASH_FLAG_OVERSTRIPED) && + stripe_count > mdt_count) { + RETURN(-E2BIG); + } else if ((lo->ldo_dir_hash_type & LMV_HASH_FLAG_OVERSTRIPED) && + (stripe_count > mdt_count * LMV_MAX_STRIPES_PER_MDT || + /* a single MDT doesn't initialize the infrastructure for striped + * directories, so we just don't support overstriping in that case + */ + mdt_count == 1)) { + RETURN(-E2BIG); + } + + dof->dof_type = DFT_DIR; + + OBD_ALLOC(stripes, sizeof(*stripes) * stripe_count); + if (!stripes) + RETURN(-ENOMEM); + + for (i = 0; i < lo->ldo_dir_stripes_allocated; i++) + stripes[i] = lo->ldo_stripe[i]; + + lod_qos_statfs_update(env, lod, &lod->lod_mdt_descs); + + rc = lod_mdt_alloc_qos(env, lo, stripes, saved_count, stripe_count); + if (rc == -EAGAIN) + rc = lod_mdt_alloc_rr(env, lo, stripes, saved_count, + stripe_count); + if (rc < 0) { + OBD_FREE(stripes, sizeof(*stripes) * stripe_count); + RETURN(rc); + } + + LASSERT(rc > saved_count); + OBD_FREE(lo->ldo_stripe, + sizeof(*stripes) * lo->ldo_dir_stripes_allocated); + lo->ldo_stripe = stripes; + lo->ldo_is_foreign = 0; + lo->ldo_dir_striped = 1; + lo->ldo_dir_stripe_count = rc; + lo->ldo_dir_stripes_allocated = stripe_count; + lo->ldo_dir_split_hash = lo->ldo_dir_hash_type; + lo->ldo_dir_hash_type = le32_to_cpu(lum->lum_hash_type); + if (!lmv_is_known_hash_type(lo->ldo_dir_hash_type)) + lo->ldo_dir_hash_type = + lod->lod_mdt_descs.ltd_lmv_desc.ld_pattern; + lo->ldo_dir_hash_type |= LMV_HASH_FLAG_SPLIT | LMV_HASH_FLAG_MIGRATION; + lo->ldo_dir_split_offset = saved_count; + lo->ldo_dir_layout_version++; + lo->ldo_dir_stripe_loaded = 1; + + rc = lod_dir_declare_create_stripes(env, dt, mlc->mlc_attr, dof, th); + if (rc) + lod_striping_free(env, lo); + + RETURN(rc); +} + +/* + * detach all stripes from dir master object, NB, stripes are not destroyed, but + * deleted from it's parent namespace, this function is called in two places: + * 1. mdd_migrate_mdt() detach stripes from source, and attach them to + * target. + * 2. mdd_dir_layout_update() detach stripe before turning 1-stripe directory to + * a plain directory. + * + * \param[in] env execution environment + * \param[in] dt target object + * \param[in] mlc layout change data + * \param[in] th transaction handle + * + * \retval 0 on success + * \retval negative if failed + */ +static int lod_dir_layout_detach(const struct lu_env *env, + struct dt_object *dt, + const struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + struct dt_object *next = dt_object_child(dt); + char *stripe_name = info->lti_key; + struct dt_object *dto; + int i; + int rc = 0; + + ENTRY; + + if (!lo->ldo_dir_stripe_count) { + /* plain directory delete .. */ + rc = lod_sub_delete(env, next, + (const struct dt_key *)dotdot, th); + RETURN(rc); + } + + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + dto = lo->ldo_stripe[i]; + if (!dto) + continue; - CDEBUG(D_LAYOUT, "write instantiate %d / %d\n", - primary, lod_comp_index(lo, lod_comp)); - info->lti_comp_idx[info->lti_count++] = - lod_comp_index(lo, lod_comp); - } - } else { /* MD_LAYOUT_RESYNC */ - if (mlc->mlc_mirror_id == 0) { - /* normal resync */ - lod_foreach_mirror_comp(lod_comp, lo, primary) { - if (!lod_comp_inited(lod_comp)) - break; + rc = lod_sub_delete(env, dto, + (const struct dt_key *)dotdot, th); + if (rc) + break; - extent.e_end = lod_comp->llc_extent.e_end; - } + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&dto->do_lu)), i); - rc = lod_prepare_resync(env, lo, &extent); - if (rc) - GOTO(out, rc); - } else { - /* mirror write, try to init its all components */ - rc = lod_prepare_resync_mirror(env, lo, - mlc->mlc_mirror_id); - if (rc) - GOTO(out, rc); - } + rc = lod_sub_delete(env, next, + (const struct dt_key *)stripe_name, th); + if (rc) + break; - /* change the file state to SYNC_PENDING */ - lo->ldo_flr_state = LCM_FL_SYNC_PENDING; + rc = lod_sub_ref_del(env, next, th); + if (rc) + break; } - rc = lod_declare_instantiate_components(env, lo, th); - if (rc) - GOTO(out, rc); - - /* 3. transfer layout version to OST objects. - * transfer new layout version to OST objects so that stale writes - * can be denied. It also ends an era of writing by setting - * LU_LAYOUT_RESYNC. Normal client can never use this bit to - * send write RPC; only resync RPCs could do it. */ - layout_attr->la_valid = LA_LAYOUT_VERSION; - layout_attr->la_layout_version = 0; /* set current version */ - if (mlc->mlc_opc == MD_LAYOUT_RESYNC) - layout_attr->la_layout_version = LU_LAYOUT_RESYNC; - rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th); - if (rc) - GOTO(out, rc); + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + dto = lo->ldo_stripe[i]; + if (dto) + dt_object_put(env, dto); + } + OBD_FREE_PTR_ARRAY(lo->ldo_stripe, lo->ldo_dir_stripes_allocated); + lo->ldo_stripe = NULL; + lo->ldo_dir_stripes_allocated = 0; + lo->ldo_dir_stripe_count = 0; + dt->do_index_ops = &lod_index_ops; - lod_obj_inc_layout_gen(lo); -out: - if (rc) - lod_striping_free(env, lo); RETURN(rc); } -static int lod_declare_update_sync_pending(const struct lu_env *env, - struct lod_object *lo, struct md_layout_change *mlc, - struct thandle *th) +static int lod_dir_layout_shrink(const struct lu_env *env, + struct dt_object *dt, + const struct md_layout_change *mlc, + struct thandle *th) { - struct lod_thread_info *info = lod_env_info(env); - unsigned sync_components = 0; - unsigned resync_components = 0; + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + struct lod_device *lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + struct dt_object *next = dt_object_child(dt); + struct lmv_user_md *lmu = mlc->mlc_buf.lb_buf; + __u32 final_stripe_count; + char *stripe_name = info->lti_key; + struct dt_object *dto; + struct lu_buf *lmv_buf = &info->lti_buf; + struct lmv_mds_md_v1 *lmv = &info->lti_lmv.lmv_md_v1; + u32 mdtidx; + int type = LU_SEQ_RANGE_ANY; int i; int rc; - ENTRY; - - LASSERT(lo->ldo_flr_state == LCM_FL_SYNC_PENDING); - LASSERT(mlc->mlc_opc == MD_LAYOUT_RESYNC_DONE || - mlc->mlc_opc == MD_LAYOUT_WRITE); - CDEBUG(D_LAYOUT, DFID ": received op %d in sync pending\n", - PFID(lod_object_fid(lo)), mlc->mlc_opc); + ENTRY; - if (mlc->mlc_opc == MD_LAYOUT_WRITE) { - CDEBUG(D_LAYOUT, DFID": cocurrent write to sync pending\n", - PFID(lod_object_fid(lo))); + final_stripe_count = le32_to_cpu(lmu->lum_stripe_count); - lo->ldo_flr_state = LCM_FL_WRITE_PENDING; - return lod_declare_update_write_pending(env, lo, mlc, th); - } + lmv_buf->lb_buf = lmv; + lmv_buf->lb_len = sizeof(*lmv); + lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_STRIPE); + lmv->lmv_stripe_count = cpu_to_le32(final_stripe_count); + lmv->lmv_hash_type = cpu_to_le32(lo->ldo_dir_hash_type) & + cpu_to_le32(LMV_HASH_TYPE_MASK | + LMV_HASH_FLAG_FIXED); + lmv->lmv_layout_version = + cpu_to_le32(lo->ldo_dir_layout_version + 1); + lmv->lmv_migrate_offset = 0; + lmv->lmv_migrate_hash = 0; - /* MD_LAYOUT_RESYNC_DONE */ + for (i = 0; i < lo->ldo_dir_stripe_count; i++) { + dto = lo->ldo_stripe[i]; + if (!dto) + continue; - for (i = 0; i < lo->ldo_comp_cnt; i++) { - struct lod_layout_component *lod_comp; - int j; + if (i < final_stripe_count) { + rc = lod_fld_lookup(env, lod, + lu_object_fid(&dto->do_lu), + &mdtidx, &type); + if (rc) + RETURN(rc); - lod_comp = &lo->ldo_comp_entries[i]; + lmv->lmv_master_mdt_index = cpu_to_le32(mdtidx); + rc = lod_sub_xattr_set(env, dto, lmv_buf, + XATTR_NAME_LMV, + LU_XATTR_REPLACE, th); + if (rc) + RETURN(rc); - if (!(lod_comp->llc_flags & LCME_FL_STALE)) { - sync_components++; continue; } - for (j = 0; j < mlc->mlc_resync_count; j++) { - if (lod_comp->llc_id != mlc->mlc_resync_ids[j]) - continue; - - mlc->mlc_resync_ids[j] = LCME_ID_INVAL; - lod_comp->llc_flags &= ~LCME_FL_STALE; - resync_components++; - break; - } - } + dt_write_lock(env, dto, DT_TGT_CHILD); + rc = lod_sub_ref_del(env, dto, th); + dt_write_unlock(env, dto); + if (rc) + RETURN(rc); - /* valid check */ - for (i = 0; i < mlc->mlc_resync_count; i++) { - if (mlc->mlc_resync_ids[i] == LCME_ID_INVAL) - continue; + rc = lod_sub_destroy(env, dto, th); + if (rc) + RETURN(rc); - CDEBUG(D_LAYOUT, DFID": lcme id %u (%d / %zd) not exist " - "or already synced\n", PFID(lod_object_fid(lo)), - mlc->mlc_resync_ids[i], i, mlc->mlc_resync_count); - GOTO(out, rc = -EINVAL); - } + snprintf(stripe_name, sizeof(info->lti_key), DFID":%d", + PFID(lu_object_fid(&dto->do_lu)), i); - if (!sync_components || (mlc->mlc_resync_count && !resync_components)) { - CDEBUG(D_LAYOUT, DFID": no mirror in sync\n", - PFID(lod_object_fid(lo))); + rc = lod_sub_delete(env, next, + (const struct dt_key *)stripe_name, th); + if (rc) + RETURN(rc); - /* tend to return an error code here to prevent - * the MDT from setting SoM attribute */ - GOTO(out, rc = -EINVAL); + rc = lod_sub_ref_del(env, next, th); + if (rc) + RETURN(rc); } - CDEBUG(D_LAYOUT, DFID": synced %u resynced %u/%zu components\n", - PFID(lod_object_fid(lo)), - sync_components, resync_components, mlc->mlc_resync_count); + rc = lod_fld_lookup(env, lod, lu_object_fid(&dt->do_lu), &mdtidx, + &type); + if (rc) + RETURN(rc); - lo->ldo_flr_state = LCM_FL_RDONLY; - lod_obj_inc_layout_gen(lo); + lmv->lmv_magic = cpu_to_le32(LMV_MAGIC_V1); + lmv->lmv_master_mdt_index = cpu_to_le32(mdtidx); + rc = lod_sub_xattr_set(env, next, lmv_buf, XATTR_NAME_LMV, + LU_XATTR_REPLACE, th); + if (rc) + RETURN(rc); - info->lti_buf.lb_len = lod_comp_md_size(lo, false); - rc = lod_sub_declare_xattr_set(env, lod_object_child(lo), - &info->lti_buf, XATTR_NAME_LOV, 0, th); - EXIT; + for (i = final_stripe_count; i < lo->ldo_dir_stripe_count; i++) { + dto = lo->ldo_stripe[i]; + if (dto) + dt_object_put(env, dto); + } + lo->ldo_dir_stripe_count = final_stripe_count; -out: - if (rc) - lod_striping_free(env, lo); RETURN(rc); } +static mlc_handler dir_mlc_declare_ops[MD_LAYOUT_MAX] = { + [MD_LAYOUT_ATTACH] = lod_dir_declare_layout_attach, + [MD_LAYOUT_DETACH] = lod_dir_declare_layout_detach, + [MD_LAYOUT_SHRINK] = lod_dir_declare_layout_shrink, + [MD_LAYOUT_SPLIT] = lod_dir_declare_layout_split, +}; + +static mlc_handler dir_mlc_ops[MD_LAYOUT_MAX] = { + [MD_LAYOUT_DETACH] = lod_dir_layout_detach, + [MD_LAYOUT_SHRINK] = lod_dir_layout_shrink, +}; + static int lod_declare_layout_change(const struct lu_env *env, struct dt_object *dt, struct md_layout_change *mlc, struct thandle *th) @@ -7631,12 +9297,32 @@ static int lod_declare_layout_change(const struct lu_env *env, struct lod_thread_info *info = lod_env_info(env); struct lod_object *lo = lod_dt_obj(dt); int rc; + ENTRY; + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { + LASSERT(dir_mlc_declare_ops[mlc->mlc_opc]); + rc = dir_mlc_declare_ops[mlc->mlc_opc](env, dt, mlc, th); + RETURN(rc); + } + if (!S_ISREG(dt->do_lu.lo_header->loh_attr) || !dt_object_exists(dt) || dt_object_remote(dt_object_child(dt))) RETURN(-EINVAL); + if (mlc->mlc_opc == MD_LAYOUT_WRITE) { + struct layout_intent *intent = mlc->mlc_intent; + + if (intent->lai_opc == LAYOUT_INTENT_PCCRO_SET || + intent->lai_opc == LAYOUT_INTENT_PCCRO_CLEAR) { + if (!S_ISREG(dt->do_lu.lo_header->loh_attr)) + RETURN(-EINVAL); + + rc = lod_declare_update_pccro(env, dt, mlc, th); + RETURN(rc); + } + } + rc = lod_striping_load(env, lo); if (rc) GOTO(out, rc); @@ -7665,6 +9351,9 @@ static int lod_declare_layout_change(const struct lu_env *env, rc = -ENOTSUPP; break; } + if (rc == 0) + rc = lod_save_layout_gen_intrans(info, lo); + out: RETURN(rc); } @@ -7675,21 +9364,40 @@ out: static int lod_layout_change(const struct lu_env *env, struct dt_object *dt, struct md_layout_change *mlc, struct thandle *th) { + struct lod_thread_info *info = lod_env_info(env); struct lu_attr *attr = &lod_env_info(env)->lti_attr; - struct lu_attr *layout_attr = &lod_env_info(env)->lti_layout_attr; + struct lu_attr *layout_attr = &info->lti_layout_attr; struct lod_object *lo = lod_dt_obj(dt); int rc; + ENTRY; + + if (S_ISDIR(dt->do_lu.lo_header->loh_attr)) { + LASSERT(dir_mlc_ops[mlc->mlc_opc]); + rc = dir_mlc_ops[mlc->mlc_opc](env, dt, mlc, th); + RETURN(rc); + } + + rc = lod_check_layout_gen_intrans(info, lo); + if (rc > 0) { + CDEBUG(D_LAYOUT, + "%s: obj "DFID" gen changed from %d to %d in transaction, retry the transaction \n", + dt->do_lu.lo_dev->ld_obd->obd_name, + PFID(lu_object_fid(&dt->do_lu)), + info->lti_gen[rc - 1], lo->ldo_layout_gen); + RETURN(-EAGAIN); + } + rc = lod_striped_create(env, dt, attr, NULL, th); if (!rc && layout_attr->la_valid & LA_LAYOUT_VERSION) { layout_attr->la_layout_version |= lo->ldo_layout_gen; rc = lod_attr_set(env, dt, layout_attr, th); } - return rc; + RETURN(rc); } -struct dt_object_operations lod_obj_ops = { +const struct dt_object_operations lod_obj_ops = { .do_read_lock = lod_read_lock, .do_write_lock = lod_write_lock, .do_read_unlock = lod_read_unlock, @@ -7790,7 +9498,7 @@ static int lod_punch(const struct lu_env *env, struct dt_object *dt, * body_ops themselves will check file type inside, see lod_read/write/punch for * details. */ -const struct dt_body_operations lod_body_ops = { +static const struct dt_body_operations lod_body_ops = { .dbo_read = lod_read, .dbo_declare_write = lod_declare_write, .dbo_write = lod_write, @@ -7841,7 +9549,7 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *lo, if (ltd != NULL) { if (ltd->ltd_tgts_size > idx && - cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx)) { + test_bit(idx, ltd->ltd_tgt_bitmap)) { tgt = LTD_TGT(ltd, idx); LASSERT(tgt != NULL); @@ -7868,56 +9576,6 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *lo, /** * - * Alloc cached foreign LOV - * - * \param[in] lo object - * \param[in] size size of foreign LOV - * - * \retval 0 on success - * \retval negative if failed - */ -int lod_alloc_foreign_lov(struct lod_object *lo, size_t size) -{ - OBD_ALLOC_LARGE(lo->ldo_foreign_lov, size); - if (lo->ldo_foreign_lov == NULL) - return -ENOMEM; - lo->ldo_foreign_lov_size = size; - lo->ldo_is_foreign = 1; - return 0; -} - -/** - * - * Free cached foreign LOV - * - * \param[in] lo object - */ -void lod_free_foreign_lov(struct lod_object *lo) -{ - if (lo->ldo_foreign_lov != NULL) - OBD_FREE_LARGE(lo->ldo_foreign_lov, lo->ldo_foreign_lov_size); - lo->ldo_foreign_lov = NULL; - lo->ldo_foreign_lov_size = 0; - lo->ldo_is_foreign = 0; -} - -/** - * - * Free cached foreign LMV - * - * \param[in] lo object - */ -void lod_free_foreign_lmv(struct lod_object *lo) -{ - if (lo->ldo_foreign_lmv != NULL) - OBD_FREE_LARGE(lo->ldo_foreign_lmv, lo->ldo_foreign_lmv_size); - lo->ldo_foreign_lmv = NULL; - lo->ldo_foreign_lmv_size = 0; - lo->ldo_dir_is_foreign = 0; -} - -/** - * * Release resources associated with striping. * * If the object is striped (regular or directory), then release @@ -7929,14 +9587,17 @@ void lod_free_foreign_lmv(struct lod_object *lo) void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo) { struct lod_layout_component *lod_comp; + __u32 obj_attr = lo->ldo_obj.do_lu.lo_header->loh_attr; int i, j; if (unlikely(lo->ldo_is_foreign)) { - lod_free_foreign_lov(lo); - lo->ldo_comp_cached = 0; - } else if (unlikely(lo->ldo_dir_is_foreign)) { - lod_free_foreign_lmv(lo); - lo->ldo_dir_stripe_loaded = 0; + if (S_ISREG(obj_attr)) { + lod_free_foreign_lov(lo); + lo->ldo_comp_cached = 0; + } else if (S_ISDIR(obj_attr)) { + lod_free_foreign_lmv(lo); + lo->ldo_dir_stripe_loaded = 0; + } } else if (lo->ldo_stripe != NULL) { LASSERT(lo->ldo_comp_entries == NULL); LASSERT(lo->ldo_dir_stripes_allocated > 0); @@ -7952,11 +9613,15 @@ void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo) lo->ldo_dir_stripes_allocated = 0; lo->ldo_dir_stripe_loaded = 0; lo->ldo_dir_stripe_count = 0; + lo->ldo_obj.do_index_ops = NULL; } else if (lo->ldo_comp_entries != NULL) { for (i = 0; i < lo->ldo_comp_cnt; i++) { /* free lod_layout_component::llc_stripe array */ lod_comp = &lo->ldo_comp_entries[i]; + /* HSM layout component */ + if (lod_comp->llc_magic == LOV_MAGIC_FOREIGN) + continue; if (lod_comp->llc_stripe == NULL) continue; LASSERT(lod_comp->llc_stripes_allocated != 0); @@ -7965,13 +9630,11 @@ void lod_striping_free_nolock(const struct lu_env *env, struct lod_object *lo) lu_object_put(env, &lod_comp->llc_stripe[j]->do_lu); } - OBD_FREE(lod_comp->llc_stripe, - sizeof(struct dt_object *) * - lod_comp->llc_stripes_allocated); + OBD_FREE_PTR_ARRAY(lod_comp->llc_stripe, + lod_comp->llc_stripes_allocated); lod_comp->llc_stripe = NULL; - OBD_FREE(lod_comp->llc_ost_indices, - sizeof(__u32) * - lod_comp->llc_stripes_allocated); + OBD_FREE_PTR_ARRAY(lod_comp->llc_ost_indices, + lod_comp->llc_stripes_allocated); lod_comp->llc_ost_indices = NULL; lod_comp->llc_stripes_allocated = 0; } @@ -8000,6 +9663,7 @@ static void lod_object_free(const struct lu_env *env, struct lu_object *o) /* release all underlying object pinned */ lod_striping_free(env, lo); lu_object_fini(o); + /* lo doesn't contain a lu_object_header, so we don't need call_rcu */ OBD_SLAB_FREE_PTR(lo, lod_object_kmem); } @@ -8029,7 +9693,7 @@ static int lod_object_print(const struct lu_env *env, void *cookie, return (*p)(env, cookie, LUSTRE_LOD_NAME"-object@%p", o); } -struct lu_object_operations lod_lu_obj_ops = { +const struct lu_object_operations lod_lu_obj_ops = { .loo_object_init = lod_object_init, .loo_object_free = lod_object_free, .loo_object_release = lod_object_release,