struct cl_lockset ci_lockset;
/** lock requirements, this is just a help info for sublayers. */
enum cl_io_lock_dmd ci_lockreq;
+ /** layout version when this IO occurs */
+ __u32 ci_layout_version;
union {
struct cl_rw_io {
struct iov_iter rw_iter;
*/
ci_ignore_layout:1,
/**
- * Need MDS intervention to complete a write. This usually means the
- * corresponding component is not initialized for the writing extent.
+ * Need MDS intervention to complete a write.
+ * Write intent is required for the following cases:
+ * 1. component being written is not initialized, or
+ * 2. the mirrored files are NOT in WRITE_PENDING state.
*/
ci_need_write_intent:1,
/**
__u32 la_rdev;
/** project id */
__u32 la_projid;
+ /** set layout version to OST objects. */
+ __u32 la_layout_version;
};
/** Bit-mask of valid attributes */
LA_KILL_SUID = 1 << 13,
LA_KILL_SGID = 1 << 14,
LA_PROJID = 1 << 15,
+ LA_LAYOUT_VERSION = 1 << 16,
+ /**
+ * Attributes must be transmitted to OST objects
+ */
+ LA_REMOTE_ATTR_SET = (LA_UID | LA_GID | LA_PROJID | LA_LAYOUT_VERSION)
};
/**
{
fid_cpu_to_le(&dst->ff_parent, &src->ff_parent);
- if (size < sizeof(struct filter_fid))
+ if (size < sizeof(struct filter_fid)) {
memset(&dst->ff_layout, 0, sizeof(dst->ff_layout));
- else
+ } else {
ost_layout_cpu_to_le(&dst->ff_layout, &src->ff_layout);
+ dst->ff_layout_version = cpu_to_le32(src->ff_layout_version);
+ dst->ff_range = cpu_to_le32(src->ff_range);
+ }
/* XXX: Add more if filter_fid is enlarged in the future. */
}
{
fid_le_to_cpu(&dst->ff_parent, &src->ff_parent);
- if (size < sizeof(struct filter_fid))
+ if (size < sizeof(struct filter_fid)) {
memset(&dst->ff_layout, 0, sizeof(dst->ff_layout));
- else
+ } else {
ost_layout_le_to_cpu(&dst->ff_layout, &src->ff_layout);
+ dst->ff_layout_version = le32_to_cpu(src->ff_layout_version);
+ dst->ff_range = le32_to_cpu(src->ff_range);
+ }
/* XXX: Add more if filter_fid is enlarged in the future. */
}
struct osc_page *ops);
int osc_flush_async_page(const struct lu_env *env, struct cl_io *io,
struct osc_page *ops);
-int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
- struct list_head *list, int brw_flags);
+int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
+ struct osc_object *obj, struct list_head *list,
+ int brw_flags);
int osc_cache_truncate_start(const struct lu_env *env, struct osc_object *obj,
__u64 size, struct osc_extent **extp);
void osc_cache_truncate_end(const struct lu_env *env, struct osc_extent *ext);
int oe_rc;
/** max pages per rpc when this extent was created */
unsigned int oe_mppr;
+ /** FLR: layout version when this osc_extent is published */
+ __u32 oe_layout_version;
};
/** @} osc */
/* CREAT needs to be tested before open (both could be set) */
if (it->it_op & IT_CREAT)
return LCK_CW;
- else if (it->it_op & (IT_GETATTR | IT_OPEN | IT_LOOKUP |
- IT_LAYOUT))
+ else if (it->it_op & (IT_GETATTR | IT_OPEN | IT_LOOKUP))
return LCK_CR;
+ else if (it->it_op & IT_LAYOUT)
+ return (it->it_flags & FMODE_WRITE) ? LCK_EX : LCK_CR;
else if (it->it_op & IT_READDIR)
return LCK_PR;
else if (it->it_op & IT_GETXATTR)
/* FLR */
#define OBD_FAIL_FLR_GLIMPSE_IMMUTABLE 0x1A00
+#define OBD_FAIL_FLR_LV_DELAY 0x1A01
+#define OBD_FAIL_FLR_LV_INC 0x1A02
/* DT */
#define OBD_FAIL_DT_DECLARE_ATTR_GET 0x2000
#define OBD_MD_DOM_SIZE (0X00001000ULL) /* Data-on-MDT component size */
#define OBD_MD_FLNLINK (0x00002000ULL) /* link count */
#define OBD_MD_FLGENER (0x00004000ULL) /* generation number */
-/*#define OBD_MD_FLINLINE (0x00008000ULL) inline data. used until 1.6.5 */
+#define OBD_MD_LAYOUT_VERSION (0x00008000ULL) /* layout version for
+ * OST objects */
#define OBD_MD_FLRDEV (0x00010000ULL) /* device number */
#define OBD_MD_FLEASIZE (0x00020000ULL) /* extended attribute data */
#define OBD_MD_LINKNAME (0x00040000ULL) /* symbolic link target */
__u32 lsr_gid_h;
__u64 lsr_valid;
__u32 lsr_projid;
- __u32 lsr_padding1;
+ __u32 lsr_layout_version;
__u64 lsr_padding2;
__u64 lsr_padding3;
struct llog_rec_tail lsr_tail;
*
* sizeof(ost_layout) + sieof(__u32) == sizeof(llog_cookie). */
struct ost_layout o_layout;
- __u32 o_padding_3;
+ __u32 o_layout_version;
__u32 o_uid_h;
__u32 o_gid_h;
struct filter_fid {
struct lu_fid ff_parent;
struct ost_layout ff_layout;
+ __u32 ff_layout_version;
+ __u32 ff_range; /* range of layout version that
+ * write are allowed */
} __attribute__((packed));
/* Userspace should treat lu_fid as opaque, and only use the following methods
#define LCME_KNOWN_FLAGS (LCME_FL_NEG | LCME_FL_INIT)
+/* the highest bit in obdo::o_layout_version is used to mark if the file is
+ * being resynced. */
+#define LU_LAYOUT_RESYNC LCME_FL_NEG
+
/* lcme_id can be specified as certain flags, and the the first
* bit of lcme_id is used to indicate that the ID is representing
* certain LCME_FL_* but not a real ID. Which implies we can have
CL_CTIME = 18,
CL_ATIME = 19,
CL_MIGRATE = 20,
+ CL_FLRW = 21, /* FLR: file was firstly written */
+ CL_RESYNC = 22, /* FLR: file was resync-ed */
CL_LAST
};
static const char *changelog_str[] = {
"MARK", "CREAT", "MKDIR", "HLINK", "SLINK", "MKNOD", "UNLNK",
"RMDIR", "RENME", "RNMTO", "OPEN", "CLOSE", "LYOUT", "TRUNC",
- "SATTR", "XATTR", "HSM", "MTIME", "CTIME", "ATIME", "MIGRT"
+ "SATTR", "XATTR", "HSM", "MTIME", "CTIME", "ATIME", "MIGRT",
+ "FLRW", "RESYNC",
};
if (type >= 0 && type < CL_LAST)
}
truncate_inode_pages(inode->i_mapping, 0);
+ if (inode->i_mapping->nrpages) {
+ CDEBUG(D_VFSTRACE, DFID ": still has %lu pages remaining\n",
+ PFID(lu_object_fid(&obj->co_lu)),
+ inode->i_mapping->nrpages);
+ RETURN(-EIO);
+ }
+
RETURN(0);
}
if (inuse->op_size)
OBD_FREE(inuse->op_array, inuse->op_size);
+ if (info->lti_comp_size > 0)
+ OBD_FREE(info->lti_comp_idx,
+ info->lti_comp_size * sizeof(__u32));
+
OBD_FREE_PTR(info);
}
/* used to store parent default striping in create */
struct lod_default_striping lti_def_striping;
struct filter_fid lti_ff;
+ __u32 *lti_comp_idx;
+ size_t lti_comp_size;
+ size_t lti_count;
+ struct lu_attr lti_layout_attr;
};
extern const struct lu_device_operations lod_lu_ops;
return &obj->ldo_obj.do_lu;
}
+/* Return the FID of \a obj, obtained from its lu_object via lod2lu_obj(). */
+static inline const struct lu_fid *lod_object_fid(struct lod_object *obj)
+{
+ return lu_object_fid(lod2lu_obj(obj));
+}
+
static inline struct lod_object *lod_obj(const struct lu_object *o)
{
LASSERT(lu_device_is_lod(o->lo_dev));
int lod_pool_add(struct obd_device *obd, char *poolname, char *ostname);
int lod_pool_remove(struct obd_device *obd, char *poolname, char *ostname);
+struct lod_obj_stripe_cb_data;
+/* Per-stripe callback invoked by lod_obj_for_each_stripe(); it receives the
+ * component index and the stripe index within that component. A non-zero
+ * return value stops the iteration and is returned to the caller. */
+typedef int (*lod_obj_stripe_cb_t)(const struct lu_env *env,
+ struct lod_object *lo, struct dt_object *dt,
+ struct thandle *th,
+ int comp_idx, int stripe_idx,
+ struct lod_obj_stripe_cb_data *data);
+/* Optional per-component filter: when it returns true,
+ * lod_obj_for_each_stripe() skips every stripe of that component. */
+typedef bool (*lod_obj_comp_skip_cb_t)(const struct lu_env *env,
+ struct lod_object *lo, int comp_idx,
+ struct lod_obj_stripe_cb_data *data);
struct lod_obj_stripe_cb_data {
union {
const struct lu_attr *locd_attr;
struct ost_pool *locd_inuse;
};
- bool locd_declare;
+ /* called for each non-NULL stripe; must be set by the caller */
+ lod_obj_stripe_cb_t locd_stripe_cb;
+ /* may be NULL, in which case no component is filtered out */
+ lod_obj_comp_skip_cb_t locd_comp_skip_cb;
+ bool locd_declare;
};
-typedef int (*lod_obj_stripe_cb_t)(const struct lu_env *env,
- struct lod_object *lo, struct dt_object *dt,
- struct thandle *th, int stripe_idx,
- struct lod_obj_stripe_cb_data *data);
/* lod_qos.c */
int lod_prepare_inuse(const struct lu_env *env, struct lod_object *lo);
int lod_prepare_create(const struct lu_env *env, struct lod_object *lo,
const struct lu_buf *);
int lod_obj_stripe_set_inuse_cb(const struct lu_env *env, struct lod_object *lo,
struct dt_object *dt, struct thandle *th,
- int stripe_idx,
+ int comp_idx, int stripe_idx,
struct lod_obj_stripe_cb_data *data);
int lod_qos_parse_config(const struct lu_env *env, struct lod_object *lo,
const struct lu_buf *buf);
void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo);
int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo,
- struct thandle *th, lod_obj_stripe_cb_t cb,
+ struct thandle *th,
struct lod_obj_stripe_cb_data *data);
/* lod_sub_object.c */
}
int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo,
- struct thandle *th, lod_obj_stripe_cb_t cb,
+ struct thandle *th,
struct lod_obj_stripe_cb_data *data)
{
struct lod_layout_component *lod_comp;
if (lod_comp->llc_stripe == NULL)
continue;
+ /* The component has stripes but is not inited yet: it has been
+ * declared to be created, but has not been created yet.
+ */
+ if (!lod_comp_inited(lod_comp))
+ continue;
+
+ if (data->locd_comp_skip_cb &&
+ data->locd_comp_skip_cb(env, lo, i, data))
+ continue;
+
LASSERT(lod_comp->llc_stripe_count > 0);
for (j = 0; j < lod_comp->llc_stripe_count; j++) {
struct dt_object *dt = lod_comp->llc_stripe[j];
if (dt == NULL)
continue;
- rc = cb(env, lo, dt, th, j, data);
+ rc = data->locd_stripe_cb(env, lo, dt, th, i, j, data);
if (rc != 0)
RETURN(rc);
}
RETURN(0);
}
+/**
+ * Component filter used when setting attributes on stripes.
+ *
+ * Only takes effect when the attributes carry LA_LAYOUT_VERSION; otherwise
+ * no component is skipped. In LCM_FL_WRITE_PENDING state a component is
+ * skipped if it is stale, or if its extent overlaps another non-stale
+ * component. In LCM_FL_SYNC_PENDING state nothing is skipped. Any other
+ * state trips an assertion.
+ *
+ * \retval true skip all stripes of component \a comp_idx
+ * \retval false process the component
+ */
+static bool lod_obj_attr_set_comp_skip_cb(const struct lu_env *env,
+ struct lod_object *lo, int comp_idx,
+ struct lod_obj_stripe_cb_data *data)
+{
+ struct lod_layout_component *lod_comp = &lo->ldo_comp_entries[comp_idx];
+ bool skipped = false;
+
+ if (!(data->locd_attr->la_valid & LA_LAYOUT_VERSION))
+ return skipped;
+
+ switch (lo->ldo_flr_state) {
+ case LCM_FL_WRITE_PENDING: {
+ int i;
+
+ /* skip stale components */
+ if (lod_comp->llc_flags & LCME_FL_STALE) {
+ skipped = true;
+ break;
+ }
+
+ /* skip valid and overlapping components, therefore any
+ * attempts to write overlapped components will never succeed
+ * because client will get EINPROGRESS. */
+ for (i = 0; i < lo->ldo_comp_cnt; i++) {
+ if (i == comp_idx)
+ continue;
+
+ if (lo->ldo_comp_entries[i].llc_flags & LCME_FL_STALE)
+ continue;
+
+ if (lu_extent_is_overlapped(&lod_comp->llc_extent,
+ &lo->ldo_comp_entries[i].llc_extent)) {
+ skipped = true;
+ break;
+ }
+ }
+ break;
+ }
+ default:
+ LASSERTF(0, "impossible: %d\n", lo->ldo_flr_state);
+ /* fallthrough */
+ case LCM_FL_SYNC_PENDING:
+ break;
+ }
+
+ CDEBUG(D_LAYOUT, DFID": %s to set component %x to version: %u\n",
+ PFID(lu_object_fid(&lo->ldo_obj.do_lu)),
+ skipped ? "skipped" : "chose", lod_comp->llc_id,
+ data->locd_attr->la_layout_version);
+
+ return skipped;
+}
+
static inline int
lod_obj_stripe_attr_set_cb(const struct lu_env *env, struct lod_object *lo,
struct dt_object *dt, struct thandle *th,
- int stripe_idx, struct lod_obj_stripe_cb_data *data)
+ int comp_idx, int stripe_idx,
+ struct lod_obj_stripe_cb_data *data)
{
if (data->locd_declare)
return lod_sub_declare_attr_set(env, dt, data->locd_attr, th);
* speed up rename().
*/
if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
- if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID)))
+ if (!(attr->la_valid & LA_REMOTE_ATTR_SET))
RETURN(rc);
if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
RETURN(rc);
}
} else {
- struct lod_obj_stripe_cb_data data;
+ struct lod_obj_stripe_cb_data data = { { 0 } };
data.locd_attr = attr;
data.locd_declare = true;
- rc = lod_obj_for_each_stripe(env, lo, th,
- lod_obj_stripe_attr_set_cb, &data);
+ data.locd_stripe_cb = lod_obj_stripe_attr_set_cb;
+ rc = lod_obj_for_each_stripe(env, lo, th, &data);
}
if (rc)
RETURN(rc);
if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) {
- if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID)))
+ if (!(attr->la_valid & LA_REMOTE_ATTR_SET))
RETURN(rc);
if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER))
RETURN(rc);
}
+ /* FIXME: a tricky case in the code path of mdd_layout_change():
+ * the in-memory striping information has been freed in lod_xattr_set()
+ * due to layout change. It has to load stripe here again. It only
+ * changes flags of layout so declare_attr_set() is still accurate */
+ rc = lod_load_striping_locked(env, lo);
+ if (rc)
+ RETURN(rc);
+
if (!lod_obj_is_striped(dt))
RETURN(0);
break;
}
} else {
- struct lod_obj_stripe_cb_data data;
+ struct lod_obj_stripe_cb_data data = { { 0 } };
data.locd_attr = attr;
data.locd_declare = false;
- rc = lod_obj_for_each_stripe(env, lo, th,
- lod_obj_stripe_attr_set_cb, &data);
+ data.locd_comp_skip_cb = lod_obj_attr_set_comp_skip_cb;
+ data.locd_stripe_cb = lod_obj_stripe_attr_set_cb;
+ rc = lod_obj_for_each_stripe(env, lo, th, &data);
}
if (rc)
lod_obj_stripe_replace_parent_fid_cb(const struct lu_env *env,
struct lod_object *lo,
struct dt_object *dt, struct thandle *th,
- int stripe_idx,
+ int comp_idx, int stripe_idx,
struct lod_obj_stripe_cb_data *data)
{
struct lod_thread_info *info = lod_env_info(env);
struct lod_thread_info *info = lod_env_info(env);
struct lu_buf *buf = &info->lti_buf;
struct filter_fid *ff;
- struct lod_obj_stripe_cb_data data;
+ struct lod_obj_stripe_cb_data data = { { 0 } };
int rc;
ENTRY;
buf->lb_len = info->lti_ea_store_size;
data.locd_declare = declare;
- rc = lod_obj_for_each_stripe(env, lo, th,
- lod_obj_stripe_replace_parent_fid_cb,
- &data);
+ data.locd_stripe_cb = lod_obj_stripe_replace_parent_fid_cb;
+ rc = lod_obj_for_each_stripe(env, lo, th, &data);
RETURN(rc);
}
static inline int
lod_obj_stripe_destroy_cb(const struct lu_env *env, struct lod_object *lo,
struct dt_object *dt, struct thandle *th,
- int stripe_idx, struct lod_obj_stripe_cb_data *data)
+ int comp_idx, int stripe_idx,
+ struct lod_obj_stripe_cb_data *data)
{
if (data->locd_declare)
return lod_sub_declare_destroy(env, dt, th);
break;
}
} else {
- struct lod_obj_stripe_cb_data data;
+ struct lod_obj_stripe_cb_data data = { { 0 } };
data.locd_declare = true;
- rc = lod_obj_for_each_stripe(env, lo, th,
- lod_obj_stripe_destroy_cb, &data);
+ data.locd_stripe_cb = lod_obj_stripe_destroy_cb;
+ rc = lod_obj_for_each_stripe(env, lo, th, &data);
}
RETURN(rc);
}
}
} else {
- struct lod_obj_stripe_cb_data data;
+ struct lod_obj_stripe_cb_data data = { { 0 } };
data.locd_declare = false;
- rc = lod_obj_for_each_stripe(env, lo, th,
- lod_obj_stripe_destroy_cb, &data);
+ data.locd_stripe_cb = lod_obj_stripe_destroy_cb;
+ rc = lod_obj_for_each_stripe(env, lo, th, &data);
}
RETURN(rc);
return dt_invalidate(env, dt_object_child(dt));
}
-static int lod_declare_layout_change(const struct lu_env *env,
- struct dt_object *dt,
- struct md_layout_change *mlc,
- struct thandle *th)
+/**
+ * Prepare the per-thread scratch data used by a layout change.
+ *
+ * Clears lti_layout_attr and lti_count, then makes sure lti_comp_idx can
+ * hold \a comp_cnt component indices. An existing buffer that is already
+ * large enough is reused; otherwise it is freed and a new one allocated.
+ *
+ * \retval 0 on success
+ * \retval -ENOMEM if the index array cannot be allocated
+ */
+static int lod_layout_data_init(struct lod_thread_info *info, __u32 comp_cnt)
{
- struct lod_thread_info *info = lod_env_info(env);
- struct lod_object *lo = lod_dt_obj(dt);
- struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
- struct dt_object *next = dt_object_child(dt);
+ ENTRY;
+
+ /* clear memory region that will be used for layout change */
+ memset(&info->lti_layout_attr, 0, sizeof(struct lu_attr));
+ info->lti_count = 0;
+
+ /* current buffer is big enough, keep it */
+ if (info->lti_comp_size >= comp_cnt)
+ RETURN(0);
+
+ if (info->lti_comp_size > 0) {
+ OBD_FREE(info->lti_comp_idx,
+ info->lti_comp_size * sizeof(__u32));
+ info->lti_comp_size = 0;
+ }
+
+ OBD_ALLOC(info->lti_comp_idx, comp_cnt * sizeof(__u32));
+ if (!info->lti_comp_idx)
+ RETURN(-ENOMEM);
+
+ info->lti_comp_size = comp_cnt;
+ RETURN(0);
+}
+
+/**
+ * Declare creation of the components recorded in lti_comp_idx.
+ *
+ * For each of the lti_count recorded component indices this calls
+ * lod_qos_prep_create(); the inuse array is prepared first when there is at
+ * least one component to instantiate. If everything succeeded, the update
+ * of the XATTR_NAME_LOV xattr on the child object is declared as well.
+ *
+ * \retval 0 on success, otherwise the first error returned by a callee
+ */
+static int lod_declare_instantiate_components(const struct lu_env *env,
+ struct lod_object *lo, struct thandle *th)
+{
+ struct lod_thread_info *info = lod_env_info(env);
struct ost_pool *inuse = &info->lti_inuse_osts;
- struct layout_intent *layout = mlc->mlc_intent;
- struct lu_buf *buf = &mlc->mlc_buf;
+ int i;
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(info->lti_count < lo->ldo_comp_cnt);
+ if (info->lti_count > 0) {
+ /* Prepare inuse array for composite file */
+ rc = lod_prepare_inuse(env, lo);
+ if (rc)
+ RETURN(rc);
+ }
+
+ for (i = 0; i < info->lti_count; i++) {
+ rc = lod_qos_prep_create(env, lo, NULL, th,
+ info->lti_comp_idx[i], inuse);
+ if (rc)
+ break;
+ }
+
+ /* the layout xattr is declared even when lti_count is zero */
+ if (!rc) {
+ info->lti_buf.lb_len = lod_comp_md_size(lo, false);
+ rc = lod_sub_declare_xattr_set(env, lod_object_child(lo),
+ &info->lti_buf, XATTR_NAME_LOV, 0, th);
+ }
+
+ RETURN(rc);
+}
+
+static int lod_declare_update_plain(const struct lu_env *env,
+ struct lod_object *lo, struct layout_intent *layout,
+ const struct lu_buf *buf, struct thandle *th)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lod_device *d = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
struct lod_layout_component *lod_comp;
struct lov_comp_md_v1 *comp_v1 = NULL;
bool replay = false;
- bool need_create = false;
int i, rc;
ENTRY;
- if (!S_ISREG(dt->do_lu.lo_header->loh_attr) || !dt_object_exists(dt) ||
- dt_object_remote(next))
- RETURN(-EINVAL);
+ LASSERT(lo->ldo_flr_state == LCM_FL_NOT_FLR);
- dt_write_lock(env, next, 0);
/*
* In case the client is passing lovea, which only happens during
* the replay of layout intent write RPC for now, we may need to
rc = lod_load_striping_locked(env, lo);
if (rc)
GOTO(out, rc);
-
- /* Prepare inuse array for composite file */
- rc = lod_prepare_inuse(env, lo);
- if (rc)
- GOTO(out, rc);
}
/* Make sure defined layout covers the requested write range. */
}
CDEBUG(D_LAYOUT, "%s: "DFID": instantiate components "DEXT"\n",
- lod2obd(d)->obd_name, PFID(lu_object_fid(&dt->do_lu)),
+ lod2obd(d)->obd_name, PFID(lod_object_fid(lo)),
PEXT(&layout->li_extent));
/*
if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)
GOTO(out, rc = -EINVAL);
- need_create = true;
+ LASSERT(info->lti_comp_idx != NULL);
+ info->lti_comp_idx[info->lti_count++] = i;
+ }
+
+ if (info->lti_count == 0)
+ RETURN(-EALREADY);
- rc = lod_qos_prep_create(env, lo, NULL, th, i, inuse);
- if (rc)
+ lod_obj_inc_layout_gen(lo);
+ rc = lod_declare_instantiate_components(env, lo, th);
+out:
+ if (rc)
+ lod_object_free_striping(env, lo);
+ RETURN(rc);
+}
+
+/* Iterate \a comp over the components of mirror \a mirror_idx of \a lo,
+ * from ldo_mirrors[mirror_idx].lme_start to .lme_end inclusive.
+ * The arguments are evaluated more than once, so they must not have side
+ * effects. */
+#define lod_foreach_mirror_comp(comp, lo, mirror_idx) \
+for ((comp) = &(lo)->ldo_comp_entries[ \
+ (lo)->ldo_mirrors[mirror_idx].lme_start]; \
+ (comp) <= &(lo)->ldo_comp_entries[ \
+ (lo)->ldo_mirrors[mirror_idx].lme_end]; \
+ (comp)++)
+
+/* Return the index of \a lod_comp within lo->ldo_comp_entries.
+ * \a lod_comp must point into that array; this is asserted. */
+static inline int lod_comp_index(struct lod_object *lo,
+ struct lod_layout_component *lod_comp)
+{
+ LASSERT(lod_comp >= lo->ldo_comp_entries &&
+ lod_comp <= &lo->ldo_comp_entries[lo->ldo_comp_cnt - 1]);
+
+ return lod_comp - lo->ldo_comp_entries;
+}
+
+/**
+ * Mark components of the non-primary mirrors stale for a write.
+ *
+ * Every component of mirror \a primary that overlaps \a extent is taken as
+ * affected by the write. Each component of any other mirror that overlaps
+ * an affected primary component gets LCME_FL_STALE set, and its mirror is
+ * flagged with lme_stale. Only in-memory state is changed here.
+ */
+static void lod_stale_components(struct lod_object *lo, int primary,
+ struct lu_extent *extent)
+{
+ struct lod_layout_component *pri_comp, *lod_comp;
+ int i;
+
+ /* The writing extent decides which components in the primary
+ * are affected... */
+ lod_foreach_mirror_comp(pri_comp, lo, primary) {
+ if (!lu_extent_is_overlapped(extent, &pri_comp->llc_extent))
+ continue;
+
+ for (i = 0; i < lo->ldo_mirror_count; i++) {
+ if (i == primary)
+ continue;
+
+ /* ... and then stale other components that are
+ * overlapping with primary components */
+ lod_foreach_mirror_comp(lod_comp, lo, i) {
+ if (!lu_extent_is_overlapped(
+ &pri_comp->llc_extent,
+ &lod_comp->llc_extent))
+ continue;
+
+ CDEBUG(D_LAYOUT, "stale: %u / %u\n",
+ i, lod_comp_index(lo, lod_comp));
+
+ lod_comp->llc_flags |= LCME_FL_STALE;
+ lo->ldo_mirrors[i].lme_stale = 1;
+ }
+ }
+ }
+}
+
+/**
+ * Declare the layout change for a write to a mirrored file in
+ * LCM_FL_RDONLY state.
+ *
+ * Picks the first non-stale mirror as primary, marks the overlapping
+ * components of the other mirrors stale, records the uninstantiated primary
+ * components covering [0, write end) for instantiation, moves the in-memory
+ * state to LCM_FL_WRITE_PENDING and bumps the layout generation. It then
+ * declares the component instantiation and the attr_set that carries
+ * LA_LAYOUT_VERSION.
+ *
+ * \retval 0 on success
+ * \retval -ENODATA if every mirror is stale
+ * \retval other negative errno from the declare calls; the in-memory
+ * striping is freed in that case
+ */
+static int lod_declare_update_rdonly(const struct lu_env *env,
+ struct lod_object *lo, struct md_layout_change *mlc,
+ struct thandle *th)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lu_attr *layout_attr = &info->lti_layout_attr;
+ struct lod_layout_component *lod_comp;
+ struct layout_intent *layout = mlc->mlc_intent;
+ struct lu_extent extent = layout->li_extent;
+ int picked;
+ int i;
+ int rc;
+ ENTRY;
+
+ LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE);
+ LASSERT(lo->ldo_flr_state == LCM_FL_RDONLY);
+ LASSERT(lo->ldo_mirror_count > 0);
+
+ CDEBUG(D_LAYOUT, DFID": trying to write :"DEXT"\n",
+ PFID(lod_object_fid(lo)), PEXT(&extent));
+
+ /**
+ * Pick a mirror as the primary.
+ * For now it simply picks the first mirror that is not stale;
+ * this algorithm can be revised later after knowing the topology
+ * of the cluster or the availability of OSTs.
+ */
+ for (picked = -1, i = 0; i < lo->ldo_mirror_count; i++) {
+ if (!lo->ldo_mirrors[i].lme_stale) {
+ picked = i;
break;
+ }
}
+ if (picked < 0) /* failed to pick a primary */
+ RETURN(-ENODATA);
- if (need_create)
- lod_obj_inc_layout_gen(lo);
- else
- GOTO(unlock, rc = -EALREADY);
+ CDEBUG(D_LAYOUT, DFID": picked mirror %u as primary\n",
+ PFID(lod_object_fid(lo)), lo->ldo_mirrors[picked].lme_id);
- if (!rc) {
- info->lti_buf.lb_len = lod_comp_md_size(lo, false);
- rc = lod_sub_declare_xattr_set(env, next, &info->lti_buf,
- XATTR_NAME_LOV, 0, th);
+ /* stale overlapping components from other mirrors */
+ lod_stale_components(lo, picked, &extent);
+
+ /* instantiate components for the picked mirror, start from 0 */
+ extent = (struct lu_extent) { 0, layout->li_extent.e_end };
+ lod_foreach_mirror_comp(lod_comp, lo, picked) {
+ if (!lu_extent_is_overlapped(&extent,
+ &lod_comp->llc_extent))
+ break;
+
+ if (lod_comp_inited(lod_comp))
+ continue;
+
+ /* 'i' still holds the index of the picked mirror here */
+ CDEBUG(D_LAYOUT, "instantiate: %u / %u\n",
+ i, lod_comp_index(lo, lod_comp));
+
+ info->lti_comp_idx[info->lti_count++] =
+ lod_comp_index(lo, lod_comp);
}
+
+ lo->ldo_flr_state = LCM_FL_WRITE_PENDING;
+
+ /* Reset the layout version once it's becoming too large.
+ * This way it can make sure that the layout version is
+ * monotonically increased in this writing era. */
+ lod_obj_inc_layout_gen(lo);
+ if (lo->ldo_layout_gen > (LCME_ID_MAX >> 1)) {
+ __u32 layout_version;
+
+ cfs_get_random_bytes(&layout_version, sizeof(layout_version));
+ lo->ldo_layout_gen = layout_version & 0xffff;
+ }
+
+ rc = lod_declare_instantiate_components(env, lo, th);
+ if (rc)
+ GOTO(out, rc);
+
+ layout_attr->la_valid = LA_LAYOUT_VERSION;
+ layout_attr->la_layout_version = 0; /* set current version */
+ rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th);
+ if (rc)
+ GOTO(out, rc);
+
out:
if (rc)
lod_object_free_striping(env, lo);
+ RETURN(rc);
+}
-unlock:
- dt_write_unlock(env, next);
+/**
+ * Declare the layout change for a write to a mirrored file that is already
+ * in LCM_FL_WRITE_PENDING state.
+ *
+ * Locates the single non-stale (primary) mirror, marks the overlapping
+ * components of the other mirrors stale, records the uninstantiated primary
+ * components covering [0, write end) for instantiation, and declares the
+ * instantiation plus the attr_set carrying LA_LAYOUT_VERSION. The layout
+ * generation is increased only after both declarations succeeded.
+ *
+ * \retval 0 on success
+ * \retval -ENODATA if no primary mirror exists
+ * \retval other negative errno from the declare calls
+ *
+ * On any error the in-memory striping is freed.
+ */
+static int lod_declare_update_write_pending(const struct lu_env *env,
+ struct lod_object *lo, struct md_layout_change *mlc,
+ struct thandle *th)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lu_attr *layout_attr = &info->lti_layout_attr;
+ struct lod_layout_component *lod_comp;
+ struct lu_extent extent = { 0 };
+ int primary = -1;
+ int i;
+ int rc;
+ ENTRY;
+
+ LASSERT(lo->ldo_flr_state == LCM_FL_WRITE_PENDING);
+ LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE);
+
+ /* look for the primary mirror */
+ for (i = 0; i < lo->ldo_mirror_count; i++) {
+ if (lo->ldo_mirrors[i].lme_stale)
+ continue;
+
+ /* a second non-stale mirror is a fatal inconsistency */
+ LASSERTF(primary < 0, DFID " has multiple primary: %u / %u",
+ PFID(lod_object_fid(lo)),
+ lo->ldo_mirrors[i].lme_id,
+ lo->ldo_mirrors[primary].lme_id);
+
+ primary = i;
+ }
+ if (primary < 0) {
+ CERROR(DFID ": doesn't have a primary mirror\n",
+ PFID(lod_object_fid(lo)));
+ GOTO(out, rc = -ENODATA);
+ }
+
+ CDEBUG(D_LAYOUT, DFID": found primary %u\n",
+ PFID(lod_object_fid(lo)), lo->ldo_mirrors[primary].lme_id);
+ LASSERT(!lo->ldo_mirrors[primary].lme_stale);
+
+ /* for LAYOUT_WRITE opc, it has to do the following operations:
+ * 1. stale overlapping components from stale mirrors;
+ * 2. instantiate components of the primary mirror;
+ * 3. transfer layout version to all objects of the primary; */
+
+ /* always true here: the opc was asserted on entry */
+ if (mlc->mlc_opc == MD_LAYOUT_WRITE) {
+ LASSERT(mlc->mlc_intent != NULL);
+
+ extent = mlc->mlc_intent->li_extent;
+
+ CDEBUG(D_LAYOUT, DFID": intent to write: "DEXT"\n",
+ PFID(lod_object_fid(lo)), PEXT(&extent));
+
+ /* 1. stale overlapping components */
+ lod_stale_components(lo, primary, &extent);
+
+ /* 2. find out the components need instantiating.
+ * instantiate [0, mlc->mlc_intent->e_end) */
+ extent.e_start = 0;
+ lod_foreach_mirror_comp(lod_comp, lo, primary) {
+ if (!lu_extent_is_overlapped(&extent,
+ &lod_comp->llc_extent))
+ break;
+
+ if (lod_comp_inited(lod_comp))
+ continue;
+
+ CDEBUG(D_LAYOUT, "write instantiate %d / %d\n",
+ primary, lod_comp_index(lo, lod_comp));
+ info->lti_comp_idx[info->lti_count++] =
+ lod_comp_index(lo, lod_comp);
+ }
+ }
+
+ rc = lod_declare_instantiate_components(env, lo, th);
+ if (rc)
+ GOTO(out, rc);
+
+ layout_attr->la_valid = LA_LAYOUT_VERSION;
+ layout_attr->la_layout_version = 0; /* set current version */
+ rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th);
+ if (rc)
+ GOTO(out, rc);
+
+ lod_obj_inc_layout_gen(lo);
+out:
+ if (rc)
+ lod_object_free_striping(env, lo);
+ RETURN(rc);
+}
+
+/**
+ * Declare a layout change, dispatching on the FLR state of the file.
+ *
+ * Under the object write lock this loads the striping, sizes the per-thread
+ * layout scratch data for the component count, and hands over to the
+ * handler for the current state: plain (LCM_FL_NOT_FLR), LCM_FL_RDONLY or
+ * LCM_FL_WRITE_PENDING.
+ *
+ * \retval -EINVAL if \a dt is not an existing regular file or its child
+ * object is remote
+ * \retval -ENOTSUPP for LCM_FL_SYNC_PENDING or an unknown state
+ * \retval otherwise the result of the state handler or of the setup calls
+ */
+static int lod_declare_layout_change(const struct lu_env *env,
+ struct dt_object *dt, struct md_layout_change *mlc,
+ struct thandle *th)
+{
+ struct lod_thread_info *info = lod_env_info(env);
+ struct lod_object *lo = lod_dt_obj(dt);
+ int rc;
+ ENTRY;
+
+ if (!S_ISREG(dt->do_lu.lo_header->loh_attr) || !dt_object_exists(dt) ||
+ dt_object_remote(dt_object_child(dt)))
+ RETURN(-EINVAL);
+
+ /* NOTE(review): taken with lod_write_lock() but released with
+ * dt_write_unlock() below - confirm the two are a matching pair */
+ lod_write_lock(env, dt, 0);
+ rc = lod_load_striping_locked(env, lo);
+ if (rc)
+ GOTO(out, rc);
+
+ LASSERT(lo->ldo_comp_cnt > 0);
+
+ rc = lod_layout_data_init(info, lo->ldo_comp_cnt);
+ if (rc)
+ GOTO(out, rc);
+
+ switch (lo->ldo_flr_state) {
+ case LCM_FL_NOT_FLR:
+ rc = lod_declare_update_plain(env, lo, mlc->mlc_intent,
+ &mlc->mlc_buf, th);
+ break;
+ case LCM_FL_RDONLY:
+ rc = lod_declare_update_rdonly(env, lo, mlc, th);
+ break;
+ case LCM_FL_WRITE_PENDING:
+ rc = lod_declare_update_write_pending(env, lo, mlc, th);
+ break;
+ case LCM_FL_SYNC_PENDING:
+ default:
+ rc = -ENOTSUPP;
+ break;
+ }
+out:
+ dt_write_unlock(env, dt);
RETURN(rc);
}
struct md_layout_change *mlc, struct thandle *th)
{
struct lu_attr *attr = &lod_env_info(env)->lti_attr;
+ struct lu_attr *layout_attr = &lod_env_info(env)->lti_layout_attr;
+ struct lod_object *lo = lod_dt_obj(dt);
+ int rc;
- RETURN(lod_striped_create(env, dt, attr, NULL, th));
+ rc = lod_striped_create(env, dt, attr, NULL, th);
+ if (!rc && layout_attr->la_valid & LA_LAYOUT_VERSION) {
+ layout_attr->la_layout_version |= lo->ldo_layout_gen;
+ rc = lod_attr_set(env, dt, layout_attr, th);
+ }
+
+ return rc;
}
struct dt_object_operations lod_obj_ops = {
int lod_obj_stripe_set_inuse_cb(const struct lu_env *env,
struct lod_object *lo,
struct dt_object *dt, struct thandle *th,
- int stripe_idx,
+ int comp_idx, int stripe_idx,
struct lod_obj_stripe_cb_data *data)
{
struct lod_thread_info *info = lod_env_info(env);
struct lod_thread_info *info = lod_env_info(env);
struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
struct ost_pool *inuse = &info->lti_inuse_osts;
- struct lod_obj_stripe_cb_data data;
+ struct lod_obj_stripe_cb_data data = { { 0 } };
__u32 stripe_count = 0;
int i;
int rc;
return rc;
data.locd_inuse = inuse;
- return lod_obj_for_each_stripe(env, lo, NULL,
- lod_obj_stripe_set_inuse_cb, &data);
+ data.locd_stripe_cb = lod_obj_stripe_set_inuse_cb;
+ return lod_obj_for_each_stripe(env, lo, NULL, &data);
}
int lod_prepare_create(const struct lu_env *env, struct lod_object *lo,
struct lov_mirror_entry {
unsigned short lre_mirror_id;
unsigned short lre_preferred:1,
+ lre_stale:1, /* set if any components is stale */
lre_valid:1; /* set if at least one of components
* in this mirror is valid */
unsigned short lre_start; /* index to lo_entries, start index of
struct cl_page_slice lps_cl;
/** layout_entry + stripe index, composed using lov_comp_index() */
unsigned int lps_index;
+ /* the layout gen when this page was created */
+ __u32 lps_layout_gen;
};
/*
sub_io->ci_pio = io->ci_pio;
sub_io->ci_lock_no_expand = io->ci_lock_no_expand;
sub_io->ci_ndelay = io->ci_ndelay;
+ sub_io->ci_layout_version = io->ci_layout_version;
result = cl_io_sub_init(sub->sub_env, sub_io, io->ci_type, sub_obj);
RETURN(0);
}
+/**
+ * Decide whether this IO needs a write intent RPC.
+ *
+ * io->ci_write_intent is first set to [lis_pos, lis_endpos) and
+ * io->ci_need_write_intent is cleared. IO that is not a write, truncate or
+ * mkwrite never needs an intent. For a modifying IO:
+ * - in LCM_FL_RDONLY or LCM_FL_SYNC_PENDING state an intent is always
+ * needed;
+ * - otherwise the file must be in LCM_FL_WRITE_PENDING state. The intent
+ * extent is widened to cover every primary-mirror component it overlaps,
+ * and an intent is needed only if some other mirror still has a valid
+ * component overlapping that widened extent.
+ *
+ * \retval 0 decision stored in io->ci_need_write_intent
+ * \retval -EIO no component of the primary mirror covers the extent
+ */
+static int lov_io_mirror_write_intent(struct lov_io *lio,
+ struct lov_object *obj, struct cl_io *io)
+{
+ struct lov_layout_composite *comp = &obj->u.composite;
+ struct lu_extent *ext = &io->ci_write_intent;
+ struct lov_mirror_entry *lre;
+ struct lov_mirror_entry *primary;
+ struct lov_layout_entry *lle;
+ size_t count = 0;
+ ENTRY;
+
+ *ext = (typeof(*ext)) { lio->lis_pos, lio->lis_endpos };
+ io->ci_need_write_intent = 0;
+
+ if (!(io->ci_type == CIT_WRITE || cl_io_is_trunc(io) ||
+ cl_io_is_mkwrite(io)))
+ RETURN(0);
+
+ if (lov_flr_state(obj) == LCM_FL_RDONLY ||
+ lov_flr_state(obj) == LCM_FL_SYNC_PENDING) {
+ io->ci_need_write_intent = 1;
+ RETURN(0);
+ }
+
+ LASSERT(lov_flr_state(obj) == LCM_FL_WRITE_PENDING);
+ LASSERT(comp->lo_preferred_mirror >= 0);
+
+ /* need to iterate all components to see if there are
+ * multiple components covering the writing component */
+ primary = &comp->lo_mirrors[comp->lo_preferred_mirror];
+ LASSERT(!primary->lre_stale);
+ lov_foreach_mirror_layout_entry(obj, lle, primary) {
+ LASSERT(lle->lle_valid);
+ if (!lu_extent_is_overlapped(ext, lle->lle_extent))
+ continue;
+
+ /* widen the intent to whole components of the primary */
+ ext->e_start = MIN(ext->e_start, lle->lle_extent->e_start);
+ ext->e_end = MAX(ext->e_end, lle->lle_extent->e_end);
+ ++count;
+ }
+ if (count == 0) {
+ CERROR(DFID ": cannot find any valid components covering "
+ "file extent "DEXT", mirror: %d\n",
+ PFID(lu_object_fid(lov2lu(obj))), PEXT(ext),
+ primary->lre_mirror_id);
+ RETURN(-EIO);
+ }
+
+ /* count the other mirrors that still hold a valid component
+ * overlapping the widened extent; at most one hit per mirror */
+ count = 0;
+ lov_foreach_mirror_entry(obj, lre) {
+ if (lre == primary)
+ continue;
+
+ lov_foreach_mirror_layout_entry(obj, lle, lre) {
+ if (!lle->lle_valid)
+ continue;
+
+ if (lu_extent_is_overlapped(ext, lle->lle_extent)) {
+ ++count;
+ break;
+ }
+ }
+ }
+
+ /* count is a size_t, hence %zu */
+ CDEBUG(D_VFSTRACE, DFID ": there are %zu components to be staled to "
+ "modify file extent "DEXT", iot: %d\n",
+ PFID(lu_object_fid(lov2lu(obj))), count, PEXT(ext), io->ci_type);
+
+ io->ci_need_write_intent = count > 0;
+
+ RETURN(0);
+}
+
static int lov_io_mirror_init(struct lov_io *lio, struct lov_object *obj,
struct cl_io *io)
{
struct lov_layout_composite *comp = &obj->u.composite;
int index;
int i;
+ int result;
ENTRY;
if (!lov_is_flr(obj)) {
RETURN(0);
}
+ result = lov_io_mirror_write_intent(lio, obj, io);
+ if (result)
+ RETURN(result);
+
+ if (io->ci_need_write_intent) {
+ CDEBUG(D_VFSTRACE, DFID " need write intent for [%llu, %llu)\n",
+ PFID(lu_object_fid(lov2lu(obj))),
+ lio->lis_pos, lio->lis_endpos);
+
+ /* stop cl_io_init() loop */
+ RETURN(1);
+ }
+
+ /* transfer the layout version for verification */
+ io->ci_layout_version = obj->lo_lsm->lsm_layout_gen;
+
if (io->ci_ndelay_tried == 0 || /* first time to try */
/* reset the mirror index if layout has changed */
lio->lis_mirror_layout_gen != obj->lo_lsm->lsm_layout_gen) {
* the current file-tail exactly. */
if (unlikely(obj->lo_lsm->lsm_entries[0]->lsme_pattern &
LOV_PATTERN_F_HOLE))
- RETURN(-EIO);
+ GOTO(out, result = -EIO);
lio->lis_pos = 0;
lio->lis_endpos = OBD_OBJECT_EOF;
if (lov_flr_state(obj) == LCM_FL_RDONLY &&
!OBD_FAIL_CHECK(OBD_FAIL_FLR_GLIMPSE_IMMUTABLE))
- RETURN(1); /* SoM is accurate, no need glimpse */
+ /* SoM is accurate, no need glimpse */
+ GOTO(out, result = 1);
break;
case CIT_MISC:
result = lov_io_mirror_init(lio, obj, io);
if (result)
- RETURN(result);
+ GOTO(out, result);
/* check if it needs to instantiate layout */
if (!(io->ci_type == CIT_WRITE || cl_io_is_mkwrite(io) ||
(cl_io_is_trunc(io) && io->u.ci_setattr.sa_attr.lvb_size > 0)))
- RETURN(0);
+ GOTO(out, result = 0);
ext.e_start = lio->lis_pos;
ext.e_end = lio->lis_endpos;
if (!lsm_entry_inited(obj->lo_lsm, index)) {
io->ci_need_write_intent = 1;
io->ci_write_intent = ext;
- result = 1;
- break;
+ GOTO(out, result = 1);
}
}
+ EXIT;
- RETURN(result);
+out:
+ return result;
}
static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
if (index < 0 || !lsm_entry_inited(loo->lo_lsm, index))
RETURN(-ENODATA);
+ /* avoid readahead to expand to stale components */
+ if (!lov_entry(loo, index)->lle_valid)
+ RETURN(-EIO);
+
stripe = lov_stripe_number(loo->lo_lsm, index, offset);
r0 = lov_r0(loo, index);
if (i > 0) {
if (mirror_id == lre->lre_mirror_id) {
lre->lre_valid |= lle->lle_valid;
+ lre->lre_stale |= !lle->lle_valid;
lre->lre_end = i;
continue;
}
lre->lre_preferred = (lle->lle_lsme->lsme_flags &
LCME_FL_PREFERRED);
lre->lre_valid = lle->lle_valid;
+ lre->lre_stale = !lle->lle_valid;
}
/* sanity check for FLR */
mirror_count = 0, i = 0;
lov_foreach_mirror_entry(lov, lre) {
i++;
- if (!lre->lre_valid)
+ if (lre->lre_stale)
continue;
mirror_count++; /* valid mirror */
struct lov_page *lp = cl2lov_page(slice);
return (*printer)(env, cookie,
- LUSTRE_LOV_NAME"-page@%p, comp index: %x\n",
- lp, lp->lps_index);
+ LUSTRE_LOV_NAME"-page@%p, comp index: %x, gen: %u\n",
+ lp, lp->lps_index, lp->lps_layout_gen);
}
static const struct cl_page_operations lov_comp_page_ops = {
LASSERT(rc == 0);
lpg->lps_index = lov_comp_index(entry, stripe);
+ lpg->lps_layout_gen = loo->lo_lsm->lsm_layout_gen;
cl_page_slice_add(page, &lpg->lps_cl, obj, index, &lov_comp_page_ops);
sub = lov_sub_get(env, lio, lpg->lps_index);
}
/* For PFL, this is used to instantiate necessary component objects. */
-int mdd_layout_change(const struct lu_env *env, struct md_object *obj,
- struct md_layout_change *mlc)
+static int
+mdd_layout_instantiate_component(const struct lu_env *env,
+ struct mdd_object *obj, struct md_layout_change *mlc,
+ struct thandle *handle)
{
- struct mdd_object *mdd_obj = md2mdd_obj(obj);
- struct mdd_device *mdd = mdo2mdd(obj);
- struct thandle *handle;
+ struct mdd_device *mdd = mdd_obj2mdd_dev(obj);
int rc;
ENTRY;
- handle = mdd_trans_create(env, mdd);
- if (IS_ERR(handle))
- RETURN(PTR_ERR(handle));
-
- rc = mdd_declare_layout_change(env, mdd, mdd_obj, mlc, handle);
+ rc = mdd_declare_layout_change(env, mdd, obj, mlc, handle);
/**
* It's possible that another layout write intent has already
* instantiated our objects, so a -EALREADY returned, and we need to
* do nothing.
*/
if (rc)
- GOTO(stop, rc = (rc == -EALREADY) ? 0 : rc);
+ RETURN(rc == -EALREADY ? 0 : rc);
rc = mdd_trans_start(env, mdd, handle);
if (rc)
- GOTO(stop, rc);
+ RETURN(rc);
- mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
- rc = mdo_layout_change(env, mdd_obj, mlc, handle);
- mdd_write_unlock(env, mdd_obj);
+ mdd_write_lock(env, obj, MOR_TGT_CHILD);
+ rc = mdo_layout_change(env, obj, mlc, handle);
+ mdd_write_unlock(env, obj);
if (rc)
- GOTO(stop, rc);
+ RETURN(rc);
- rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, mdd_obj, handle);
-stop:
- RETURN(mdd_trans_stop(env, mdd, rc, handle));
+ rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, obj, handle);
+ RETURN(rc);
+}
+
+/**
+ * Change the FLR layout from RDONLY to WRITE_PENDING.
+ *
+ * It picks the primary mirror, bumps the layout version, and sets the
+ * layout version xattr on the OST objects in a sync tx. In order to
+ * facilitate the handling of phantom writers from evicted clients, the
+ * clients carry the layout version of the file with each write RPC, so that
+ * the OSTs can verify whether the write RPCs are legitimate, meaning not
+ * from evicted clients.
+ *
+ * What this function does itself: it declares the layout change, the
+ * removal of the XATTR_NAME_SOM xattr and a changelog record on \a handle,
+ * starts the transaction and marks it synchronous, and then executes the
+ * layout change followed by the xattr removal while holding the object
+ * write lock. A missing XATTR_NAME_SOM xattr (-ENODATA) is not treated as
+ * an error. Finally a CL_FLRW changelog record is stored. The mirror
+ * selection and version bump described above are presumably performed
+ * underneath mdo_layout_change(); that part is not visible here.
+ *
+ * The transaction is neither created nor stopped here; both are left to
+ * the caller.
+ *
+ * \param[in] env execution environment
+ * \param[in] obj MDD object whose layout is changed
+ * \param[in] mlc layout change request, passed down unchanged
+ * \param[in] handle transaction handle supplied by the caller
+ *
+ * \retval 0 on success
+ * \retval error code of the first step that failed
+ */
+static int
+mdd_layout_update_rdonly(const struct lu_env *env, struct mdd_object *obj,
+ struct md_layout_change *mlc, struct thandle *handle)
+{
+ struct mdd_device *mdd = mdd_obj2mdd_dev(obj);
+ int rc;
+ ENTRY;
+
+ rc = mdd_declare_layout_change(env, mdd, obj, mlc, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mdd_declare_xattr_del(env, mdd, obj, XATTR_NAME_SOM, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ /* record a changelog for data mover to consume; this only declares
+ * the record, it is stored at the end of this function */
+ rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ /* it needs a sync tx to make FLR to work properly; the flag is set
+ * once the transaction has been started */
+ handle->th_sync = 1;
+
+ mdd_write_lock(env, obj, MOR_TGT_CHILD);
+ rc = mdo_layout_change(env, obj, mlc, handle);
+ if (!rc) {
+ rc = mdo_xattr_del(env, obj, XATTR_NAME_SOM, handle);
+ if (rc == -ENODATA)
+ rc = 0;
+ }
+ mdd_write_unlock(env, obj);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mdd_changelog_data_store(env, mdd, CL_FLRW, 0, obj, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ EXIT;
+
+out:
+ return rc;
+}
+/**
+ * Apply a layout change to a file that is already in WRITE_PENDING state.
+ *
+ * This is the handler mdd_layout_change() selects for the
+ * LCM_FL_WRITE_PENDING state. It declares the layout change on \a handle,
+ * starts the transaction, marks it synchronous, and executes the layout
+ * change while holding the object write lock. Unlike the RDONLY handler,
+ * no xattr is removed and no changelog record is stored here.
+ *
+ * The transaction is neither created nor stopped here; both are left to
+ * the caller.
+ *
+ * \param[in] env execution environment
+ * \param[in] obj MDD object whose layout is changed
+ * \param[in] mlc layout change request, passed down unchanged
+ * \param[in] handle transaction handle supplied by the caller
+ *
+ * \retval 0 on success
+ * \retval error code of the first step that failed
+ */
+static int
+mdd_layout_update_write_pending(const struct lu_env *env,
+ struct mdd_object *obj, struct md_layout_change *mlc,
+ struct thandle *handle)
+{
+ struct mdd_device *mdd = mdd_obj2mdd_dev(obj);
+ int rc;
+ ENTRY;
+
+ rc = mdd_declare_layout_change(env, mdd, obj, mlc, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ rc = mdd_trans_start(env, mdd, handle);
+ if (rc)
+ GOTO(out, rc);
+
+ /* it needs a sync tx to make FLR to work properly; the flag is set
+ * once the transaction has been started */
+ handle->th_sync = 1;
+
+ mdd_write_lock(env, obj, MOR_TGT_CHILD);
+ rc = mdo_layout_change(env, obj, mlc, handle);
+ mdd_write_unlock(env, obj);
+ if (rc)
+ GOTO(out, rc);
+
+ EXIT;
+
+out:
+ return rc;
+}
+
+/**
+ * Layout change callback for object.
+ *
+ * This is only used by FLR for now. In the future, it can be extended to
+ * handle all layout changes.
+ *
+ * Only MD_LAYOUT_WRITE requests are accepted. The LOV EA of the object is
+ * read to find the current FLR state, and the request is dispatched on it:
+ *  - LCM_FL_NOT_FLR:       mdd_layout_instantiate_component()
+ *  - LCM_FL_WRITE_PENDING: mdd_layout_update_write_pending()
+ *  - LCM_FL_RDONLY:        mdd_layout_update_rdonly()
+ *  - LCM_FL_SYNC_PENDING and any other state: nothing to do, returns 0
+ *
+ * The transaction is created here and handed to the state handler, which
+ * declares and starts it; it is always stopped here, whatever the outcome.
+ *
+ * \param[in] env	execution environment
+ * \param[in] o		MD object whose layout is to be changed
+ * \param[in] mlc	layout change request
+ *
+ * \retval 0		on success, or when the FLR state needs no action
+ * \retval -ENOTSUPP	\a mlc is not an MD_LAYOUT_WRITE request
+ * \retval -EINVAL	the object has no LOV EA, or the EA is not a
+ *			composite (LOV_MAGIC_COMP_V1) layout
+ * \retval negative	any other error from creating the transaction,
+ *			reading the EA, the state handler, or stopping the
+ *			transaction
+ */
+static int
+mdd_layout_change(const struct lu_env *env, struct md_object *o,
+		  struct md_layout_change *mlc)
+{
+	struct mdd_object *obj = md2mdd_obj(o);
+	struct mdd_device *mdd = mdd_obj2mdd_dev(obj);
+	struct lu_buf *buf = mdd_buf_get(env, NULL, 0);
+	struct lov_comp_md_v1 *lcm;
+	struct thandle *handle;
+	int flr_state;
+	int rc;
+	int rc2;
+	ENTRY;
+
+	if (mlc->mlc_opc != MD_LAYOUT_WRITE)
+		RETURN(-ENOTSUPP);
+
+	handle = mdd_trans_create(env, mdd);
+	if (IS_ERR(handle))
+		RETURN(PTR_ERR(handle));
+
+	rc = mdd_get_lov_ea(env, obj, buf);
+	if (rc < 0) {
+		/* a file without a layout cannot take a layout write intent */
+		if (rc == -ENODATA)
+			rc = -EINVAL;
+		GOTO(out, rc);
+	}
+
+	/* analyze the layout to make sure it's a FLR file */
+	lcm = buf->lb_buf;
+	if (le32_to_cpu(lcm->lcm_magic) != LOV_MAGIC_COMP_V1)
+		GOTO(out, rc = -EINVAL);
+
+	flr_state = le16_to_cpu(lcm->lcm_flags) & LCM_FL_FLR_MASK;
+
+	/* please refer to HLD of FLR for state transition */
+	switch (flr_state) {
+	case LCM_FL_NOT_FLR:
+		rc = mdd_layout_instantiate_component(env, obj, mlc, handle);
+		break;
+	case LCM_FL_WRITE_PENDING:
+		rc = mdd_layout_update_write_pending(env, obj, mlc, handle);
+		break;
+	case LCM_FL_RDONLY:
+		rc = mdd_layout_update_rdonly(env, obj, mlc, handle);
+		break;
+	case LCM_FL_SYNC_PENDING:
+	default:
+		/* no layout update is performed in these states */
+		rc = 0;
+		break;
+	}
+	EXIT;
+
+out:
+	/*
+	 * The handlers request a synchronous transaction, so a failure to
+	 * stop (commit) it must reach the caller. Keep the first error if
+	 * there already is one; otherwise report the result of the stop.
+	 */
+	rc2 = mdd_trans_stop(env, mdd, rc, handle);
+	if (rc == 0)
+		rc = rc2;
+	lu_buf_free(buf);
+	return rc;
}
void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
(struct llog_setattr64_rec_v2 *)rec;
__swab32s(&lsr2->lsr_projid);
+ __swab32s(&lsr2->lsr_layout_version);
tail = &lsr2->lsr_tail;
} else {
tail = &lsr->lsr_tail;
struct ost_body *repbody;
struct ldlm_resource *res;
struct ofd_object *fo;
- struct filter_fid *ff = NULL;
int rc = 0;
ENTRY;
la_from_obdo(&fti->fti_attr, &body->oa, body->oa.o_valid);
fti->fti_attr.la_valid &= ~LA_TYPE;
- if (body->oa.o_valid & OBD_MD_FLFID) {
- ff = &fti->fti_mds_fid;
- ofd_prepare_fidea(ff, &body->oa);
- }
-
/* setting objects attributes (including owner/group) */
- rc = ofd_attr_set(tsi->tsi_env, fo, &fti->fti_attr, ff);
+ rc = ofd_attr_set(tsi->tsi_env, fo, &fti->fti_attr, &body->oa);
if (rc != 0)
GOTO(out_put, rc);
struct ldlm_namespace *ns = tsi->tsi_tgt->lut_obd->obd_namespace;
struct ldlm_resource *res;
struct ofd_object *fo;
- struct filter_fid *ff = NULL;
__u64 flags = 0;
struct lustre_handle lh = { 0, };
int rc;
info->fti_attr.la_size = start;
info->fti_attr.la_valid |= LA_SIZE;
- if (oa->o_valid & OBD_MD_FLFID) {
- ff = &info->fti_mds_fid;
- ofd_prepare_fidea(ff, oa);
- }
-
rc = ofd_object_punch(tsi->tsi_env, fo, start, end, &info->fti_attr,
- ff, (struct obdo *)oa);
+ (struct obdo *)oa);
if (rc)
GOTO(out_put, rc);
int ofd_stop_inconsistency_verification_thread(struct ofd_device *ofd);
int ofd_verify_ff(const struct lu_env *env, struct ofd_object *fo,
struct obdo *oa);
+int ofd_verify_layout_version(const struct lu_env *env,
+ struct ofd_object *fo, const struct obdo *oa);
int ofd_preprw(const struct lu_env *env,int cmd, struct obd_export *exp,
struct obdo *oa, int objcount, struct obd_ioobj *obj,
struct niobuf_remote *rnb, int *nr_local,
struct ofd_device *ofd,
const struct lu_fid *fid);
int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo);
+int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo,
+ const struct obdo *oa, struct filter_fid *ff);
int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
u64 id, struct ofd_seq *oseq, int nr, int sync);
dt_object_put(env, &fo->ofo_obj);
}
int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
- struct lu_attr *la, struct filter_fid *ff);
+ struct lu_attr *la, struct obdo *oa);
int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
__u64 start, __u64 end, struct lu_attr *la,
- struct filter_fid *ff, struct obdo *oa);
+ struct obdo *oa);
int ofd_destroy(const struct lu_env *, struct ofd_object *, int);
int ofd_attr_get(const struct lu_env *env, struct ofd_object *fo,
struct lu_attr *la);
ofd->ofd_lut.lut_sync_lock_cancel = ALWAYS_SYNC_ON_CANCEL;
}
-static inline void ofd_prepare_fidea(struct filter_fid *ff,
- const struct obdo *oa)
-{
- /* packing fid and converting it to LE for storing into EA.
- * Here ->o_stripe_idx should be filled by LOV and rest of
- * fields - by client. */
- ff->ff_parent.f_seq = cpu_to_le64(oa->o_parent_seq);
- ff->ff_parent.f_oid = cpu_to_le32(oa->o_parent_oid);
- /* XXX: we are ignoring o_parent_ver here, since this should
- * be the same for all objects in this fileset. */
- ff->ff_parent.f_ver = cpu_to_le32(oa->o_stripe_idx);
- if (oa->o_valid & OBD_MD_FLOSTLAYOUT)
- ost_layout_cpu_to_le(&ff->ff_layout, &oa->o_layout);
- else
- memset(&ff->ff_layout, 0, sizeof(ff->ff_layout));
-}
-
static inline int ofd_validate_seq(struct obd_export *exp, __u64 seq)
{
struct filter_export_data *fed = &exp->exp_filter_data;
}
/**
+ * FLR: verify the layout version of object.
+ *
+ * The filter_fid of \a fo is loaded with ofd_object_ff_load() and the
+ * layout version carried in \a oa is compared against it. The version is
+ * accepted when it lies within
+ * [ff_layout_version, ff_layout_version + ff_range] of the cached
+ * filter_fid. A debug message with both versions and the result is logged
+ * on every exit path, including the failure paths.
+ *
+ * \param[in] env execution environment
+ * \param[in] fo OFD object
+ * \param[in] oa OBDO structure with layout version
+ *
+ * \retval 0 on successful verification (more precisely, the non-negative
+ * result of ofd_object_ff_load(), expected to be 0 - to be confirmed)
+ * \retval -EINPROGRESS layout version is in transfer: either the object
+ * has no filter_fid yet (-ENODATA from the load), or the version in \a oa
+ * is above the accepted window
+ * \retval -ESTALE the layout version on client is stale
+ * \retval other negative errors returned by ofd_object_ff_load()
+ */
+int ofd_verify_layout_version(const struct lu_env *env,
+ struct ofd_object *fo, const struct obdo *oa)
+{
+ int rc;
+ ENTRY;
+
+ rc = ofd_object_ff_load(env, fo);
+ if (rc < 0) {
+ if (rc == -ENODATA)
+ rc = -EINPROGRESS;
+ GOTO(out, rc);
+ }
+
+ /* this update is not legitimate: the caller's version is older than
+ * the one recorded on the object */
+ if (oa->o_layout_version < fo->ofo_ff.ff_layout_version)
+ GOTO(out, rc = -ESTALE);
+
+ /* layout version is not transmitted yet: the caller's version is
+ * newer than anything recorded on the object so far.
+ * NOTE(review): the upper bound is a sum of two 32-bit fields and
+ * looks like it could wrap for extreme values - confirm that this
+ * cannot happen in practice. */
+ if (oa->o_layout_version >
+ fo->ofo_ff.ff_layout_version + fo->ofo_ff.ff_range)
+ GOTO(out, rc = -EINPROGRESS);
+
+ EXIT;
+
+out:
+ CDEBUG(D_INODE, DFID " verify layout version: %u vs. %u, rc: %d\n",
+ PFID(lu_object_fid(&fo->ofo_obj.do_lu)),
+ fo->ofo_ff.ff_layout_version, oa->o_layout_version, rc);
+ return rc;
+
+}
+
+/**
* Prepare buffers for read request processing.
*
* This function converts remote buffers from client to local buffers
}
}
+ /* need to verify layout version */
+ if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
+ rc = ofd_verify_layout_version(env, fo, oa);
+ if (rc) {
+ ofd_read_unlock(env, fo);
+ ofd_object_put(env, fo);
+ GOTO(out, rc);
+ }
+
+ oa->o_valid &= ~OBD_MD_LAYOUT_VERSION;
+ }
+
/* Process incoming grant info, set OBD_BRW_GRANTED flag and grant some
* space back if possible */
tgt_grant_prepare_write(env, exp, oa, rnb, obj->ioo_bufcnt);
* \param[in] ofd OFD device
* \param[in] ofd_obj OFD object
* \param[in] la object attributes
- * \param[in] ff parent FID
+ * \param[in] oa obdo
*
* \retval 0 on successful attributes update
* \retval negative value on error
static int
ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd,
struct ofd_object *ofd_obj, struct lu_attr *la,
- struct filter_fid *ff)
+ struct obdo *oa)
{
struct ofd_thread_info *info = ofd_info(env);
+ struct filter_fid *ff = &info->fti_mds_fid;
__u64 valid = la->la_valid;
- int rc;
struct thandle *th;
struct dt_object *dt_obj;
- int ff_needed = 0;
+ int fl = 0;
+ int rc;
ENTRY;
if (rc != 0)
GOTO(out, rc);
- if (ff != NULL) {
- rc = ofd_object_ff_load(env, ofd_obj);
- if (rc == -ENODATA)
- ff_needed = 1;
- else if (rc < 0)
- GOTO(out, rc);
- }
+ fl = ofd_object_ff_update(env, ofd_obj, oa, ff);
+ if (fl < 0)
+ GOTO(out, rc = fl);
- if (!la->la_valid && !ff_needed)
+ if (!la->la_valid && !fl)
/* no attributes to set */
GOTO(out, rc = 0);
GOTO(out_tx, rc);
}
- if (ff_needed) {
+ if (fl) {
if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
le32_add_cpu(&ff->ff_parent.f_oid, -1);
- info->fti_buf.lb_buf = ff;
- info->fti_buf.lb_len = sizeof(*ff);
rc = dt_declare_xattr_set(env, dt_obj, &info->fti_buf,
XATTR_NAME_FID, 0, th);
if (rc)
GOTO(out_tx, rc);
}
- /* set filter fid EA */
- if (ff_needed) {
+ /* set filter fid EA.
+ * FIXME: it holds read lock of ofd object to modify the XATTR_NAME_FID
+ * while the write lock should be held. However, it should work because
+ * write RPCs only modify ff_{parent,layout} and those information will
+ * be the same from all the write RPCs. The reason that fl is not used
+ * in dt_xattr_set() is to allow this race. */
+ if (fl) {
if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
GOTO(out_tx, rc);
+ info->fti_buf.lb_buf = ff;
+ info->fti_buf.lb_len = sizeof(*ff);
rc = dt_xattr_set(env, dt_obj, &info->fti_buf, XATTR_NAME_FID,
0, th);
- if (!rc)
+ if (rc == 0)
filter_fid_le_to_cpu(&ofd_obj->ofo_ff, ff, sizeof(*ff));
}
static int
ofd_commitrw_write(const struct lu_env *env, struct obd_export *exp,
struct ofd_device *ofd, const struct lu_fid *fid,
- struct lu_attr *la, struct filter_fid *ff, int objcount,
+ struct lu_attr *la, struct obdo *oa, int objcount,
int niocount, struct niobuf_local *lnb,
unsigned long granted, int old_rc)
{
* dt_declare_write_commit() since quota enforcement is now handled in
* declare phases.
*/
- rc = ofd_write_attr_set(env, ofd, fo, la, ff);
+ rc = ofd_write_attr_set(env, ofd, fo, la, oa);
if (rc)
GOTO(out, rc);
struct ofd_mod_data *fmd;
__u64 valid;
struct ofd_device *ofd = ofd_exp(exp);
- struct filter_fid *ff = NULL;
const struct lu_fid *fid = &oa->o_oi.oi_fid;
int rc = 0;
ofd_fmd_put(exp, fmd);
la_from_obdo(&info->fti_attr, oa, valid);
- if (oa->o_valid & OBD_MD_FLFID) {
- ff = &info->fti_mds_fid;
- ofd_prepare_fidea(ff, oa);
- }
-
rc = ofd_commitrw_write(env, exp, ofd, fid, &info->fti_attr,
- ff, objcount, npages, lnb,
+ oa, objcount, npages, lnb,
oa->o_grant_used, old_rc);
if (rc == 0)
obdo_from_la(oa, &info->fti_attr,
struct ldlm_resource *res;
struct ofd_object *fo;
struct lu_fid *fid = &oa->o_oi.oi_fid;
- struct filter_fid *ff = NULL;
int rc = 0;
ENTRY;
la_from_obdo(&info->fti_attr, oa, oa->o_valid);
info->fti_attr.la_valid &= ~LA_TYPE;
- if (oa->o_valid & OBD_MD_FLFID) {
- ff = &info->fti_mds_fid;
- ofd_prepare_fidea(ff, oa);
- }
-
/* setting objects attributes (including owner/group) */
- rc = ofd_attr_set(env, fo, &info->fti_attr, ff);
+ rc = ofd_attr_set(env, fo, &info->fti_attr, oa);
if (rc)
GOTO(out_unlock, rc);
if (unlikely(rc < sizeof(struct lu_fid))) {
fid_zero(&ff->ff_parent);
-
- return -ENODATA;
+ return -EINVAL;
}
filter_fid_le_to_cpu(ff, ff, rc);
}
/**
+ * Check if it needs to update filter_fid by the value of @oa.
+ *
+ * Nothing is done unless \a oa carries at least one of OBD_MD_FLFID,
+ * OBD_MD_FLOSTLAYOUT or OBD_MD_LAYOUT_VERSION. Otherwise the filter_fid of
+ * \a fo is loaded and used as the starting point for \a ff (or \a ff is
+ * zeroed when the object has none yet), the fields selected by
+ * \a oa->o_valid are applied on top of it, and the result is compared with
+ * the cached copy in \a fo.
+ *
+ * For OBD_MD_LAYOUT_VERSION: a zero stored version is replaced by the new
+ * one and the range is reset to 0; a larger new version leaves the stored
+ * version alone and widens ff_range to cover it.
+ *
+ * This function does not write anything to disk and does not modify the
+ * cached filter_fid itself; it only prepares \a ff for the caller.
+ *
+ * \param[in] env env
+ * \param[in] fo ofd object
+ * \param[in] oa obdo from client or MDT
+ * \param[out] ff if filter_fid needs updating, this field is used to
+ * return the new buffer, already converted to little-endian;
+ * it must not be the cached &fo->ofo_ff (asserted). When 0
+ * or an error is returned its content is left in CPU byte
+ * order and should not be used.
+ *
+ * \retval < 0 error occurred: a failure to load the filter_fid other than
+ * -ENODATA, -EPERM when the layout version is sent over an
+ * export without OBD_CONNECT_MDS, or -EINVAL when the new
+ * layout version is smaller than the stored one
+ * \retval 0 doesn't need to update filter_fid
+ * \retval LU_XATTR_{CREATE,REPLACE} flag for xattr update
+ */
+int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo,
+ const struct obdo *oa, struct filter_fid *ff)
+{
+ int rc = 0;
+ ENTRY;
+
+ if (!(oa->o_valid &
+ (OBD_MD_FLFID | OBD_MD_FLOSTLAYOUT | OBD_MD_LAYOUT_VERSION)))
+ RETURN(0);
+
+ rc = ofd_object_ff_load(env, fo);
+ if (rc < 0 && rc != -ENODATA)
+ RETURN(rc);
+
+ LASSERT(ff != &fo->ofo_ff);
+ if (rc == -ENODATA) {
+ rc = LU_XATTR_CREATE;
+ memset(ff, 0, sizeof(*ff));
+ } else {
+ rc = LU_XATTR_REPLACE;
+ memcpy(ff, &fo->ofo_ff, sizeof(*ff));
+ }
+
+ if (oa->o_valid & OBD_MD_FLFID) {
+ /* pack the parent fid in CPU byte order; the whole filter_fid
+ * is converted to LE once, at the end of this function.
+ * Here ->o_stripe_idx should be filled by LOV and rest of
+ * fields - by client. */
+ ff->ff_parent.f_seq = oa->o_parent_seq;
+ ff->ff_parent.f_oid = oa->o_parent_oid;
+ /* XXX: we are ignoring o_parent_ver here, since this should
+ * be the same for all objects in this fileset. */
+ ff->ff_parent.f_ver = oa->o_stripe_idx;
+ }
+ if (oa->o_valid & OBD_MD_FLOSTLAYOUT)
+ ff->ff_layout = oa->o_layout;
+
+ if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
+ CDEBUG(D_INODE, DFID": OST("DFID") layout version %u -> %u\n",
+ PFID(&fo->ofo_ff.ff_parent),
+ PFID(lu_object_fid(&fo->ofo_obj.do_lu)),
+ ff->ff_layout_version, oa->o_layout_version);
+
+ /* only the MDS has the authority to update layout version */
+ if (!(exp_connect_flags(ofd_info(env)->fti_exp) &
+ OBD_CONNECT_MDS)) {
+ CERROR(DFID": update layout version from client\n",
+ PFID(&fo->ofo_ff.ff_parent));
+
+ RETURN(-EPERM);
+ }
+
+ /* it's not allowed to change it to a smaller value */
+ if (oa->o_layout_version < ff->ff_layout_version)
+ RETURN(-EINVAL);
+
+ if (ff->ff_layout_version == 0) {
+ ff->ff_layout_version = oa->o_layout_version;
+ ff->ff_range = 0;
+ } else if (oa->o_layout_version > ff->ff_layout_version) {
+ ff->ff_range = MAX(ff->ff_range,
+ oa->o_layout_version - ff->ff_layout_version);
+ }
+ }
+
+ if (memcmp(ff, &fo->ofo_ff, sizeof(*ff)))
+ filter_fid_cpu_to_le(ff, ff, sizeof(*ff));
+ else /* no change: tell the caller there is nothing to write */
+ rc = 0;
+
+ RETURN(rc);
+}
+
+/**
* Set OFD object attributes.
*
* This function sets OFD object attributes taken from incoming request.
* \param[in] env execution environment
* \param[in] fo OFD object
* \param[in] la object attributes
- * \param[in] ff filter_fid structure, contains additional attributes
+ * \param[in] oa obdo carries fid, ost_layout, layout version
*
* \retval 0 if successful
* \retval negative value on error
*/
int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
- struct lu_attr *la, struct filter_fid *ff)
+ struct lu_attr *la, struct obdo *oa)
{
struct ofd_thread_info *info = ofd_info(env);
struct ofd_device *ofd = ofd_obj2dev(fo);
+ struct filter_fid *ff = &info->fti_mds_fid;
struct thandle *th;
struct ofd_mod_data *fmd;
- int ff_needed = 0;
+ int fl;
int rc;
int rc2;
ENTRY;
if (rc != 0)
GOTO(unlock, rc);
- if (ff != NULL) {
- rc = ofd_object_ff_load(env, fo);
- if (rc == -ENODATA)
- ff_needed = 1;
- else if (rc < 0)
- GOTO(unlock, rc);
- }
+ fl = ofd_object_ff_update(env, fo, oa, ff);
+ if (fl < 0)
+ GOTO(unlock, rc = fl);
th = ofd_trans_create(env, ofd);
if (IS_ERR(th))
if (rc)
GOTO(stop, rc);
- if (ff_needed) {
+ if (fl) {
if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
info->fti_buf.lb_buf = ff;
info->fti_buf.lb_len = sizeof(*ff);
rc = dt_declare_xattr_set(env, ofd_object_child(fo),
- &info->fti_buf, XATTR_NAME_FID, 0,
+ &info->fti_buf, XATTR_NAME_FID, fl,
th);
if (rc)
GOTO(stop, rc);
if (rc)
GOTO(stop, rc);
- if (ff_needed) {
+ if (fl) {
if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
GOTO(stop, rc);
+ info->fti_buf.lb_buf = ff;
+ info->fti_buf.lb_len = sizeof(*ff);
rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
- XATTR_NAME_FID, 0, th);
+ XATTR_NAME_FID, fl, th);
if (!rc)
filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
}
* \param[in] start start offset to punch from
* \param[in] end end of punch
* \param[in] la object attributes
- * \param[in] ff filter_fid structure
* \param[in] oa obdo struct from incoming request
*
* \retval 0 if successful
*/
int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
__u64 start, __u64 end, struct lu_attr *la,
- struct filter_fid *ff, struct obdo *oa)
+ struct obdo *oa)
{
struct ofd_thread_info *info = ofd_info(env);
struct ofd_device *ofd = ofd_obj2dev(fo);
struct ofd_mod_data *fmd;
struct dt_object *dob = ofd_object_child(fo);
+ struct filter_fid *ff = &info->fti_mds_fid;
struct thandle *th;
- int ff_needed = 0;
+ int fl;
int rc;
int rc2;
GOTO(unlock, rc);
}
+ /* need to verify layout version */
+ if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
+ rc = ofd_verify_layout_version(env, fo, oa);
+ if (rc)
+ GOTO(unlock, rc);
+
+ oa->o_valid &= ~OBD_MD_LAYOUT_VERSION;
+ }
+
/* VBR: version recovery check */
rc = ofd_version_get_check(info, fo);
if (rc)
if (rc != 0)
GOTO(unlock, rc);
- if (ff != NULL) {
- rc = ofd_object_ff_load(env, fo);
- if (rc == -ENODATA)
- ff_needed = 1;
- else if (rc < 0)
- GOTO(unlock, rc);
- }
+ fl = ofd_object_ff_update(env, fo, oa, ff);
+ if (fl < 0)
+ GOTO(unlock, rc = fl);
th = ofd_trans_create(env, ofd);
if (IS_ERR(th))
if (rc)
GOTO(stop, rc);
- if (ff_needed) {
+ if (fl) {
if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
info->fti_buf.lb_buf = ff;
info->fti_buf.lb_len = sizeof(*ff);
rc = dt_declare_xattr_set(env, ofd_object_child(fo),
- &info->fti_buf, XATTR_NAME_FID, 0,
+ &info->fti_buf, XATTR_NAME_FID, fl,
th);
if (rc)
GOTO(stop, rc);
if (rc)
GOTO(stop, rc);
- if (ff_needed) {
+ if (fl) {
if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
GOTO(stop, rc);
rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
- XATTR_NAME_FID, 0, th);
+ XATTR_NAME_FID, fl, th);
if (!rc)
filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
}
++ext->oe_nr_pages;
list_add_tail(&oap->oap_pending_item, &ext->oe_pages);
osc_object_unlock(osc);
+
+ if (!ext->oe_layout_version)
+ ext->oe_layout_version = io->ci_layout_version;
}
RETURN(rc);
RETURN(rc);
}
-int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj,
- struct list_head *list, int brw_flags)
+int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io,
+ struct osc_object *obj, struct list_head *list,
+ int brw_flags)
{
struct client_obd *cli = osc_cli(obj);
struct osc_extent *ext;
ext->oe_nr_pages = page_count;
ext->oe_mppr = mppr;
list_splice_init(list, &ext->oe_pages);
+ ext->oe_layout_version = io->ci_layout_version;
osc_object_lock(obj);
/* Reuse the initial refcount for RPC, don't drop it */
if (++queued == max_pages) {
queued = 0;
- result = osc_queue_sync_pages(env, osc, &list,
+ result = osc_queue_sync_pages(env, io, osc, &list,
brw_flags);
if (result < 0)
break;
}
if (queued > 0)
- result = osc_queue_sync_pages(env, osc, &list, brw_flags);
+ result = osc_queue_sync_pages(env, io, osc, &list, brw_flags);
/* Update c/mtime for sync write. LU-7310 */
if (crt == CRT_WRITE && qout->pl_nr > 0 && result == 0) {
oa->o_flags = OBD_FL_SRVLOCK;
oa->o_valid |= OBD_MD_FLFLAGS;
}
+
+ if (io->ci_layout_version > 0) {
+ /* verify layout version */
+ oa->o_valid |= OBD_MD_LAYOUT_VERSION;
+ oa->o_layout_version = io->ci_layout_version;
+ }
} else {
LASSERT(oio->oi_lockless == 0);
}
int i;
int grant = 0;
int rc;
+ __u32 layout_version = 0;
struct list_head rpc_list = LIST_HEAD_INIT(rpc_list);
struct ost_body *body;
ENTRY;
mem_tight |= ext->oe_memalloc;
grant += ext->oe_grants;
page_count += ext->oe_nr_pages;
+ layout_version = MAX(layout_version, ext->oe_layout_version);
if (obj == NULL)
obj = ext->oe_obj;
}
crattr->cra_oa = oa;
cl_req_attr_set(env, osc2cl(obj), crattr);
- if (cmd == OBD_BRW_WRITE)
+ if (cmd == OBD_BRW_WRITE) {
oa->o_grant_used = grant;
+ if (layout_version > 0) {
+ CDEBUG(D_LAYOUT, DFID": write with layout version %u\n",
+ PFID(&oa->o_oi.oi_fid), layout_version);
+
+ oa->o_layout_version = layout_version;
+ oa->o_valid |= OBD_MD_LAYOUT_VERSION;
+ }
+ }
sort_brw_pages(pga, page_count);
rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0);
RETURN(rc);
}
- if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID)))
+ if (!(attr->la_valid & LA_REMOTE_ATTR_SET))
RETURN(0);
- /* track all UID/GID changes via llog */
+ /* track all UID/GID, projid, and layout version changes via llog */
rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th);
return 0;
int rc = 0;
ENTRY;
- /* we're interested in uid/gid/projid changes only */
- if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID)))
+ /* we're interested in uid/gid/projid/layout version changes only */
+ if (!(attr->la_valid & LA_REMOTE_ATTR_SET))
RETURN(0);
if (!is_only_remote_trans(th)) {
LASSERT(attr);
osi->osi_setattr.lsr_uid = attr->la_uid;
osi->osi_setattr.lsr_gid = attr->la_gid;
+ osi->osi_setattr.lsr_layout_version = attr->la_layout_version;
osi->osi_setattr.lsr_projid = attr->la_projid;
osi->osi_setattr.lsr_valid =
((attr->la_valid & LA_UID) ? OBD_MD_FLUID : 0) |
((attr->la_valid & LA_GID) ? OBD_MD_FLGID : 0) |
((attr->la_valid & LA_PROJID) ? OBD_MD_FLPROJID : 0);
+ if (attr->la_valid & LA_LAYOUT_VERSION)
+ osi->osi_setattr.lsr_valid |= OBD_MD_LAYOUT_VERSION;
break;
default:
LBUG();
/* lsr_valid can only be 0 or HAVE OBD_MD_{FLUID, FLGID, FLPROJID} set,
* so no bits other than these should be set. */
if ((rec->lsr_valid & ~(OBD_MD_FLUID | OBD_MD_FLGID |
- OBD_MD_FLPROJID)) != 0) {
+ OBD_MD_FLPROJID | OBD_MD_LAYOUT_VERSION)) != 0) {
CERROR("%s: invalid setattr record, lsr_valid:%llu\n",
d->opd_obd->obd_name, rec->lsr_valid);
/* return 1 on invalid record */
body->oa.o_uid = rec->lsr_uid;
body->oa.o_gid = rec->lsr_gid;
body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
- if (h->lrh_len > sizeof(struct llog_setattr64_rec))
- body->oa.o_projid = ((struct llog_setattr64_rec_v2 *)
- rec)->lsr_projid;
+ if (h->lrh_len > sizeof(struct llog_setattr64_rec)) {
+ struct llog_setattr64_rec_v2 *rec_v2 = (typeof(rec_v2))rec;
+ body->oa.o_projid = rec_v2->lsr_projid;
+ body->oa.o_layout_version = rec_v2->lsr_layout_version;
+ }
/* old setattr record (prior 2.6.0) doesn't have 'valid' stored,
* we assume that both UID and GID are valid in that case. */
else
body->oa.o_valid |= rec->lsr_valid;
+ if (body->oa.o_valid & OBD_MD_LAYOUT_VERSION) {
+ OBD_FAIL_TIMEOUT(OBD_FAIL_FLR_LV_DELAY, cfs_fail_val);
+ if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_FLR_LV_INC)))
+ ++body->oa.o_layout_version;
+ }
+
osp_sync_send_new_rpc(d, llh, h, req);
RETURN(0);
}
__swab32s(&o->o_stripe_idx);
__swab32s(&o->o_parent_ver);
lustre_swab_ost_layout(&o->o_layout);
- CLASSERT(offsetof(typeof(*o), o_padding_3) != 0);
+ __swab32s(&o->o_layout_version);
__swab32s(&o->o_uid_h);
__swab32s(&o->o_gid_h);
__swab64s(&o->o_data_version);
(long long)(int)offsetof(struct obdo, o_layout));
LASSERTF((int)sizeof(((struct obdo *)0)->o_layout) == 28, "found %lld\n",
(long long)(int)sizeof(((struct obdo *)0)->o_layout));
- LASSERTF((int)offsetof(struct obdo, o_padding_3) == 164, "found %lld\n",
- (long long)(int)offsetof(struct obdo, o_padding_3));
- LASSERTF((int)sizeof(((struct obdo *)0)->o_padding_3) == 4, "found %lld\n",
- (long long)(int)sizeof(((struct obdo *)0)->o_padding_3));
+ LASSERTF((int)offsetof(struct obdo, o_layout_version) == 164, "found %lld\n",
+ (long long)(int)offsetof(struct obdo, o_layout_version));
+ LASSERTF((int)sizeof(((struct obdo *)0)->o_layout_version) == 4, "found %lld\n",
+ (long long)(int)sizeof(((struct obdo *)0)->o_layout_version));
LASSERTF((int)offsetof(struct obdo, o_uid_h) == 168, "found %lld\n",
(long long)(int)offsetof(struct obdo, o_uid_h));
LASSERTF((int)sizeof(((struct obdo *)0)->o_uid_h) == 4, "found %lld\n",
CHECK_MEMBER(obdo, o_parent_ver);
CHECK_MEMBER(obdo, o_handle);
CHECK_MEMBER(obdo, o_layout);
- CHECK_MEMBER(obdo, o_padding_3);
+ CHECK_MEMBER(obdo, o_layout_version);
CHECK_MEMBER(obdo, o_uid_h);
CHECK_MEMBER(obdo, o_gid_h);
CHECK_MEMBER(obdo, o_data_version);
(long long)(int)offsetof(struct obdo, o_layout));
LASSERTF((int)sizeof(((struct obdo *)0)->o_layout) == 28, "found %lld\n",
(long long)(int)sizeof(((struct obdo *)0)->o_layout));
- LASSERTF((int)offsetof(struct obdo, o_padding_3) == 164, "found %lld\n",
- (long long)(int)offsetof(struct obdo, o_padding_3));
- LASSERTF((int)sizeof(((struct obdo *)0)->o_padding_3) == 4, "found %lld\n",
- (long long)(int)sizeof(((struct obdo *)0)->o_padding_3));
+ LASSERTF((int)offsetof(struct obdo, o_layout_version) == 164, "found %lld\n",
+ (long long)(int)offsetof(struct obdo, o_layout_version));
+ LASSERTF((int)sizeof(((struct obdo *)0)->o_layout_version) == 4, "found %lld\n",
+ (long long)(int)sizeof(((struct obdo *)0)->o_layout_version));
LASSERTF((int)offsetof(struct obdo, o_uid_h) == 168, "found %lld\n",
(long long)(int)offsetof(struct obdo, o_uid_h));
LASSERTF((int)sizeof(((struct obdo *)0)->o_uid_h) == 4, "found %lld\n",