* \param[in] env execution environment for this thread
* \param[in] lo LOD object
* \param[in] buf buffer storing LOV EA to parse
+ * \param[in] lvf verify flags when parsing the layout
*
* \retval 0 if parsing and objects creation succeed
* \retval negative error number on failure
*/
int lod_parse_striping(const struct lu_env *env, struct lod_object *lo,
- const struct lu_buf *buf)
+ const struct lu_buf *buf, enum layout_verify_flags lvf)
{
+ struct lod_device *d = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
struct lov_mds_md_v1 *lmm;
struct lov_comp_md_v1 *comp_v1 = NULL;
struct lov_foreign_md *foreign = NULL;
__u16 mirror_cnt = 0;
__u16 comp_cnt;
int i, rc;
+ __u16 mirror_id = MIRROR_ID_NEG;
+ bool stale = false;
+ int stale_mirrors = 0;
ENTRY;
LASSERT(buf);
lod_comp->llc_flags =
le32_to_cpu(comp_v1->lcm_entries[i].lcme_flags);
+
if (lod_comp->llc_flags & LCME_FL_NOSYNC)
lod_comp->llc_timestamp = le64_to_cpu(
comp_v1->lcm_entries[i].lcme_timestamp);
if (lod_comp->llc_id == LCME_ID_INVAL)
GOTO(out, rc = -EINVAL);
+ if (lvf & LVF_ALL_STALE) {
+ if (mirror_id_of(lod_comp->llc_id) ==
+ mirror_id) {
+ /* remaining comps in the mirror */
+ stale |= lod_comp->llc_flags &
+ LCME_FL_STALE;
+ } else {
+ /*
+ * new mirror, check last mirror's
+ * stale-ness
+ */
+ if (stale)
+ stale_mirrors++;
+
+ mirror_id =
+ mirror_id_of(lod_comp->llc_id);
+
+ /* the first comp of the new mirror */
+ stale = lod_comp->llc_flags &
+ LCME_FL_STALE;
+ }
+ }
+
if ((lod_comp->llc_flags & LCME_FL_EXTENSION) &&
comp_v1->lcm_magic != cpu_to_le32(LOV_MAGIC_SEL)) {
- struct lod_device *d =
- lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
-
CWARN("%s: EXTENSION flags=%x set on component[%u]=%x of non-SEL file "DFID" with magic=%#08x\n",
lod2obd(d)->obd_name,
lod_comp->llc_flags, lod_comp->llc_id, i,
}
}
+ if (lo->ldo_is_composite && (lvf & LVF_ALL_STALE)) {
+ /* check the last mirror stale-ness */
+ if (stale)
+ stale_mirrors++;
+
+ if (mirror_cnt == stale_mirrors) {
+ rc = -EPERM;
+ CERROR("%s: can not set all stale mirrors for "
+ DFID": rc = %d\n",
+ lod2obd(d)->obd_name, PFID(lod_object_fid(lo)),
+ rc);
+ GOTO(out, rc);
+ }
+ }
+
rc = lod_fill_mirrors(lo);
if (rc)
GOTO(out, rc);
*/
buf->lb_buf = info->lti_ea_store;
buf->lb_len = info->lti_ea_store_size;
- rc = lod_parse_striping(env, lo, buf);
+ rc = lod_parse_striping(env, lo, buf, 0);
if (rc == 0)
lo->ldo_comp_cached = 1;
} else if (S_ISDIR(lod2lu_obj(lo)->lo_header->loh_attr)) {
}
int lod_striping_reload(const struct lu_env *env, struct lod_object *lo,
- const struct lu_buf *buf)
+ const struct lu_buf *buf, enum layout_verify_flags lvf)
{
int rc;
ENTRY;
mutex_lock(&lo->ldo_layout_mutex);
- rc = lod_parse_striping(env, lo, buf);
+ rc = lod_parse_striping(env, lo, buf, lvf);
mutex_unlock(&lo->ldo_layout_mutex);
RETURN(rc);