From 10da8afb278634a40be72f48dae42ce9755c62a0 Mon Sep 17 00:00:00 2001 From: Jinshan Xiong Date: Mon, 13 Nov 2017 23:34:14 +0000 Subject: [PATCH] LU-9771 flr: Send write intent RPC to mdt When a mirrored file is going to be written, the client needs to send a write intent RPC to the MDT. The MDT will pick a mirror as primary and mark the others as stale. The new md operation moo_layout_change() is introduced for this purpose. The MDT also transfers the latest layout version to the OST objects via do_attr_set(). Once an OST receives the setattr RPC for a layout version change, it stores the updated layout version in the extended attribute XATTR_NAME_FID. Test-Parameters: testlist=sanity-flr Signed-off-by: Jinshan Xiong Change-Id: Ib0049a78b95895141b0032e8eff526a73a160dcb Reviewed-on: https://review.whamcloud.com/29091 Tested-by: Jenkins Reviewed-by: Lai Siyao Tested-by: Maloo Reviewed-by: Bobi Jam --- lustre/include/cl_object.h | 8 +- lustre/include/lu_object.h | 7 + lustre/include/lustre_fid.h | 14 +- lustre/include/lustre_osc.h | 7 +- lustre/include/obd.h | 5 +- lustre/include/obd_support.h | 2 + lustre/include/uapi/linux/lustre/lustre_idl.h | 7 +- lustre/include/uapi/linux/lustre/lustre_user.h | 12 +- lustre/llite/vvp_object.c | 7 + lustre/lod/lod_dev.c | 4 + lustre/lod/lod_internal.h | 30 +- lustre/lod/lod_object.c | 502 ++++++++++++++++++++++--- lustre/lod/lod_qos.c | 8 +- lustre/lov/lov_cl_internal.h | 3 + lustre/lov/lov_io.c | 114 +++++- lustre/lov/lov_object.c | 4 +- lustre/lov/lov_page.c | 5 +- lustre/mdd/mdd_object.c | 187 ++++++++- lustre/obdclass/llog_swab.c | 1 + lustre/ofd/ofd_dev.c | 16 +- lustre/ofd/ofd_internal.h | 25 +- lustre/ofd/ofd_io.c | 105 ++++-- lustre/ofd/ofd_obd.c | 8 +- lustre/ofd/ofd_objects.c | 148 ++++++-- lustre/osc/osc_cache.c | 9 +- lustre/osc/osc_io.c | 10 +- lustre/osc/osc_request.c | 12 +- lustre/osp/osp_object.c | 8 +- lustre/osp/osp_sync.c | 19 +- lustre/ptlrpc/pack_generic.c | 2 +- lustre/ptlrpc/wiretest.c | 8 +- 
lustre/utils/wirecheck.c | 2 +- lustre/utils/wiretest.c | 8 +- 33 files changed, 1075 insertions(+), 232 deletions(-) diff --git a/lustre/include/cl_object.h b/lustre/include/cl_object.h index d8ed065..61338cd 100644 --- a/lustre/include/cl_object.h +++ b/lustre/include/cl_object.h @@ -1796,6 +1796,8 @@ struct cl_io { struct cl_lockset ci_lockset; /** lock requirements, this is just a help info for sublayers. */ enum cl_io_lock_dmd ci_lockreq; + /** layout version when this IO occurs */ + __u32 ci_layout_version; union { struct cl_rw_io { struct iov_iter rw_iter; @@ -1871,8 +1873,10 @@ struct cl_io { */ ci_ignore_layout:1, /** - * Need MDS intervention to complete a write. This usually means the - * corresponding component is not initialized for the writing extent. + * Need MDS intervention to complete a write. + * Write intent is required for the following cases: + * 1. component being written is not initialized, or + * 2. the mirrored files are NOT in WRITE_PENDING state. */ ci_need_write_intent:1, /** diff --git a/lustre/include/lu_object.h b/lustre/include/lu_object.h index 50bad11..e101c7f 100644 --- a/lustre/include/lu_object.h +++ b/lustre/include/lu_object.h @@ -426,6 +426,8 @@ struct lu_attr { __u32 la_rdev; /** project id */ __u32 la_projid; + /** set layout version to OST objects. 
*/ + __u32 la_layout_version; }; /** Bit-mask of valid attributes */ @@ -446,6 +448,11 @@ enum la_valid { LA_KILL_SUID = 1 << 13, LA_KILL_SGID = 1 << 14, LA_PROJID = 1 << 15, + LA_LAYOUT_VERSION = 1 << 16, + /** + * Attributes must be transmitted to OST objects + */ + LA_REMOTE_ATTR_SET = (LA_UID | LA_GID | LA_PROJID | LA_LAYOUT_VERSION) }; /** diff --git a/lustre/include/lustre_fid.h b/lustre/include/lustre_fid.h index 402cbef..e34ac94 100644 --- a/lustre/include/lustre_fid.h +++ b/lustre/include/lustre_fid.h @@ -351,10 +351,13 @@ static inline void filter_fid_cpu_to_le(struct filter_fid *dst, { fid_cpu_to_le(&dst->ff_parent, &src->ff_parent); - if (size < sizeof(struct filter_fid)) + if (size < sizeof(struct filter_fid)) { memset(&dst->ff_layout, 0, sizeof(dst->ff_layout)); - else + } else { ost_layout_cpu_to_le(&dst->ff_layout, &src->ff_layout); + dst->ff_layout_version = cpu_to_le32(src->ff_layout_version); + dst->ff_range = cpu_to_le32(src->ff_range); + } /* XXX: Add more if filter_fid is enlarged in the future. */ } @@ -364,10 +367,13 @@ static inline void filter_fid_le_to_cpu(struct filter_fid *dst, { fid_le_to_cpu(&dst->ff_parent, &src->ff_parent); - if (size < sizeof(struct filter_fid)) + if (size < sizeof(struct filter_fid)) { memset(&dst->ff_layout, 0, sizeof(dst->ff_layout)); - else + } else { ost_layout_le_to_cpu(&dst->ff_layout, &src->ff_layout); + dst->ff_layout_version = le32_to_cpu(src->ff_layout_version); + dst->ff_range = le32_to_cpu(src->ff_range); + } /* XXX: Add more if filter_fid is enlarged in the future. 
*/ } diff --git a/lustre/include/lustre_osc.h b/lustre/include/lustre_osc.h index 0bb766c..734566d 100644 --- a/lustre/include/lustre_osc.h +++ b/lustre/include/lustre_osc.h @@ -594,8 +594,9 @@ int osc_teardown_async_page(const struct lu_env *env, struct osc_object *obj, struct osc_page *ops); int osc_flush_async_page(const struct lu_env *env, struct cl_io *io, struct osc_page *ops); -int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj, - struct list_head *list, int brw_flags); +int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io, + struct osc_object *obj, struct list_head *list, + int brw_flags); int osc_cache_truncate_start(const struct lu_env *env, struct osc_object *obj, __u64 size, struct osc_extent **extp); void osc_cache_truncate_end(const struct lu_env *env, struct osc_extent *ext); @@ -965,6 +966,8 @@ struct osc_extent { int oe_rc; /** max pages per rpc when this extent was created */ unsigned int oe_mppr; + /** FLR: layout version when this osc_extent is published */ + __u32 oe_layout_version; }; /** @} osc */ diff --git a/lustre/include/obd.h b/lustre/include/obd.h index 9497830..528b7d0 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -771,9 +771,10 @@ static inline int it_to_lock_mode(struct lookup_intent *it) /* CREAT needs to be tested before open (both could be set) */ if (it->it_op & IT_CREAT) return LCK_CW; - else if (it->it_op & (IT_GETATTR | IT_OPEN | IT_LOOKUP | - IT_LAYOUT)) + else if (it->it_op & (IT_GETATTR | IT_OPEN | IT_LOOKUP)) return LCK_CR; + else if (it->it_op & IT_LAYOUT) + return (it->it_flags & FMODE_WRITE) ? 
LCK_EX : LCK_CR; else if (it->it_op & IT_READDIR) return LCK_PR; else if (it->it_op & IT_GETXATTR) diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index ee48781..1158ec5 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -610,6 +610,8 @@ extern char obd_jobid_var[]; /* FLR */ #define OBD_FAIL_FLR_GLIMPSE_IMMUTABLE 0x1A00 +#define OBD_FAIL_FLR_LV_DELAY 0x1A01 +#define OBD_FAIL_FLR_LV_INC 0x1A02 /* DT */ #define OBD_FAIL_DT_DECLARE_ATTR_GET 0x2000 diff --git a/lustre/include/uapi/linux/lustre/lustre_idl.h b/lustre/include/uapi/linux/lustre/lustre_idl.h index d8aa186..23b13bd 100644 --- a/lustre/include/uapi/linux/lustre/lustre_idl.h +++ b/lustre/include/uapi/linux/lustre/lustre_idl.h @@ -1174,7 +1174,8 @@ lov_mds_md_max_stripe_count(size_t buf_size, __u32 lmm_magic) #define OBD_MD_DOM_SIZE (0X00001000ULL) /* Data-on-MDT component size */ #define OBD_MD_FLNLINK (0x00002000ULL) /* link count */ #define OBD_MD_FLGENER (0x00004000ULL) /* generation number */ -/*#define OBD_MD_FLINLINE (0x00008000ULL) inline data. used until 1.6.5 */ +#define OBD_MD_LAYOUT_VERSION (0x00008000ULL) /* layout version for + * OST objects */ #define OBD_MD_FLRDEV (0x00010000ULL) /* device number */ #define OBD_MD_FLEASIZE (0x00020000ULL) /* extended attribute data */ #define OBD_MD_LINKNAME (0x00040000ULL) /* symbolic link target */ @@ -2683,7 +2684,7 @@ struct llog_setattr64_rec_v2 { __u32 lsr_gid_h; __u64 lsr_valid; __u32 lsr_projid; - __u32 lsr_padding1; + __u32 lsr_layout_version; __u64 lsr_padding2; __u64 lsr_padding3; struct llog_rec_tail lsr_tail; @@ -2922,7 +2923,7 @@ struct obdo { * * sizeof(ost_layout) + sieof(__u32) == sizeof(llog_cookie). 
*/ struct ost_layout o_layout; - __u32 o_padding_3; + __u32 o_layout_version; __u32 o_uid_h; __u32 o_gid_h; diff --git a/lustre/include/uapi/linux/lustre/lustre_user.h b/lustre/include/uapi/linux/lustre/lustre_user.h index c400d20..c538657 100644 --- a/lustre/include/uapi/linux/lustre/lustre_user.h +++ b/lustre/include/uapi/linux/lustre/lustre_user.h @@ -193,6 +193,9 @@ struct filter_fid_old { struct filter_fid { struct lu_fid ff_parent; struct ost_layout ff_layout; + __u32 ff_layout_version; + __u32 ff_range; /* range of layout versions in + * which writes are allowed */ } __attribute__((packed)); /* Userspace should treat lu_fid as opaque, and only use the following methods @@ -550,6 +553,10 @@ enum lov_comp_md_entry_flags { #define LCME_KNOWN_FLAGS (LCME_FL_NEG | LCME_FL_INIT) +/* the highest bit in obdo::o_layout_version is used to mark if the file is + * being resynced. */ +#define LU_LAYOUT_RESYNC LCME_FL_NEG + /* lcme_id can be specified as certain flags, and the the first * bit of lcme_id is used to indicate that the ID is representing * certain LCME_FL_* but not a real ID. 
Which implies we can have @@ -945,6 +952,8 @@ enum changelog_rec_type { CL_CTIME = 18, CL_ATIME = 19, CL_MIGRATE = 20, + CL_FLRW = 21, /* FLR: file was first written */ + CL_RESYNC = 22, /* FLR: file was resync-ed */ CL_LAST }; @@ -952,7 +961,8 @@ static inline const char *changelog_type2str(int type) { static const char *changelog_str[] = { "MARK", "CREAT", "MKDIR", "HLINK", "SLINK", "MKNOD", "UNLNK", "RMDIR", "RENME", "RNMTO", "OPEN", "CLOSE", "LYOUT", "TRUNC", - "SATTR", "XATTR", "HSM", "MTIME", "CTIME", "ATIME", "MIGRT" + "SATTR", "XATTR", "HSM", "MTIME", "CTIME", "ATIME", "MIGRT", + "FLRW", "RESYNC", }; if (type >= 0 && type < CL_LAST) diff --git a/lustre/llite/vvp_object.c b/lustre/llite/vvp_object.c index 8904e45..6ca4212 100644 --- a/lustre/llite/vvp_object.c +++ b/lustre/llite/vvp_object.c @@ -169,6 +169,13 @@ static int vvp_prune(const struct lu_env *env, struct cl_object *obj) } truncate_inode_pages(inode->i_mapping, 0); + if (inode->i_mapping->nrpages) { + CDEBUG(D_VFSTRACE, DFID ": still has %lu pages remaining\n", + PFID(lu_object_fid(&obj->co_lu)), + inode->i_mapping->nrpages); + RETURN(-EIO); + } + RETURN(0); } diff --git a/lustre/lod/lod_dev.c b/lustre/lod/lod_dev.c index 5f80430..94d4a33 100644 --- a/lustre/lod/lod_dev.c +++ b/lustre/lod/lod_dev.c @@ -1865,6 +1865,10 @@ static void lod_key_fini(const struct lu_context *ctx, if (inuse->op_size) OBD_FREE(inuse->op_array, inuse->op_size); + if (info->lti_comp_size > 0) + OBD_FREE(info->lti_comp_idx, + info->lti_comp_size * sizeof(__u32)); + OBD_FREE_PTR(info); } diff --git a/lustre/lod/lod_internal.h b/lustre/lod/lod_internal.h index b475827..af6b736 100644 --- a/lustre/lod/lod_internal.h +++ b/lustre/lod/lod_internal.h @@ -408,6 +408,10 @@ struct lod_thread_info { /* used to store parent default striping in create */ struct lod_default_striping lti_def_striping; struct filter_fid lti_ff; + __u32 *lti_comp_idx; + size_t lti_comp_size; + size_t lti_count; + struct lu_attr lti_layout_attr; }; extern 
const struct lu_device_operations lod_lu_ops; @@ -450,6 +454,11 @@ static inline struct lu_object *lod2lu_obj(struct lod_object *obj) return &obj->ldo_obj.do_lu; } +static inline const struct lu_fid *lod_object_fid(struct lod_object *obj) +{ + return lu_object_fid(lod2lu_obj(obj)); +} + static inline struct lod_object *lod_obj(const struct lu_object *o) { LASSERT(lu_device_is_lod(o->lo_dev)); @@ -637,18 +646,25 @@ int lod_pool_new(struct obd_device *obd, char *poolname); int lod_pool_add(struct obd_device *obd, char *poolname, char *ostname); int lod_pool_remove(struct obd_device *obd, char *poolname, char *ostname); +struct lod_obj_stripe_cb_data; +typedef int (*lod_obj_stripe_cb_t)(const struct lu_env *env, + struct lod_object *lo, struct dt_object *dt, + struct thandle *th, + int comp_idx, int stripe_idx, + struct lod_obj_stripe_cb_data *data); +typedef bool (*lod_obj_comp_skip_cb_t)(const struct lu_env *env, + struct lod_object *lo, int comp_idx, + struct lod_obj_stripe_cb_data *data); struct lod_obj_stripe_cb_data { union { const struct lu_attr *locd_attr; struct ost_pool *locd_inuse; }; - bool locd_declare; + lod_obj_stripe_cb_t locd_stripe_cb; + lod_obj_comp_skip_cb_t locd_comp_skip_cb; + bool locd_declare; }; -typedef int (*lod_obj_stripe_cb_t)(const struct lu_env *env, - struct lod_object *lo, struct dt_object *dt, - struct thandle *th, int stripe_idx, - struct lod_obj_stripe_cb_data *data); /* lod_qos.c */ int lod_prepare_inuse(const struct lu_env *env, struct lod_object *lo); int lod_prepare_create(const struct lu_env *env, struct lod_object *lo, @@ -661,7 +677,7 @@ int lod_use_defined_striping(const struct lu_env *, struct lod_object *, const struct lu_buf *); int lod_obj_stripe_set_inuse_cb(const struct lu_env *env, struct lod_object *lo, struct dt_object *dt, struct thandle *th, - int stripe_idx, + int comp_idx, int stripe_idx, struct lod_obj_stripe_cb_data *data); int lod_qos_parse_config(const struct lu_env *env, struct lod_object *lo, const struct 
lu_buf *buf); @@ -693,7 +709,7 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt, void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo); int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, - struct thandle *th, lod_obj_stripe_cb_t cb, + struct thandle *th, struct lod_obj_stripe_cb_data *data); /* lod_sub_object.c */ diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index 5cde03d..b0e940f 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -1047,7 +1047,7 @@ static int lod_attr_get(const struct lu_env *env, } int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, - struct thandle *th, lod_obj_stripe_cb_t cb, + struct thandle *th, struct lod_obj_stripe_cb_data *data) { struct lod_layout_component *lod_comp; @@ -1061,13 +1061,23 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, if (lod_comp->llc_stripe == NULL) continue; + /* has stripe but not inited yet, this component has been + * declared to be created, but hasn't created yet. 
+ */ + if (!lod_comp_inited(lod_comp)) + continue; + + if (data->locd_comp_skip_cb && + data->locd_comp_skip_cb(env, lo, i, data)) + continue; + LASSERT(lod_comp->llc_stripe_count > 0); for (j = 0; j < lod_comp->llc_stripe_count; j++) { struct dt_object *dt = lod_comp->llc_stripe[j]; if (dt == NULL) continue; - rc = cb(env, lo, dt, th, j, data); + rc = data->locd_stripe_cb(env, lo, dt, th, i, j, data); if (rc != 0) RETURN(rc); } @@ -1075,10 +1085,63 @@ int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo, RETURN(0); } +static bool lod_obj_attr_set_comp_skip_cb(const struct lu_env *env, + struct lod_object *lo, int comp_idx, + struct lod_obj_stripe_cb_data *data) +{ + struct lod_layout_component *lod_comp = &lo->ldo_comp_entries[comp_idx]; + bool skipped = false; + + if (!(data->locd_attr->la_valid & LA_LAYOUT_VERSION)) + return skipped; + + switch (lo->ldo_flr_state) { + case LCM_FL_WRITE_PENDING: { + int i; + + /* skip stale components */ + if (lod_comp->llc_flags & LCME_FL_STALE) { + skipped = true; + break; + } + + /* skip valid and overlapping components, therefore any + * attempts to write overlapped components will never succeed + * because client will get EINPROGRESS. */ + for (i = 0; i < lo->ldo_comp_cnt; i++) { + if (i == comp_idx) + continue; + + if (lo->ldo_comp_entries[i].llc_flags & LCME_FL_STALE) + continue; + + if (lu_extent_is_overlapped(&lod_comp->llc_extent, + &lo->ldo_comp_entries[i].llc_extent)) { + skipped = true; + break; + } + } + break; + } + default: + LASSERTF(0, "impossible: %d\n", lo->ldo_flr_state); + case LCM_FL_SYNC_PENDING: + break; + } + + CDEBUG(D_LAYOUT, DFID": %s to set component %x to version: %u\n", + PFID(lu_object_fid(&lo->ldo_obj.do_lu)), + skipped ? 
"skipped" : "chose", lod_comp->llc_id, + data->locd_attr->la_layout_version); + + return skipped; +} + static inline int lod_obj_stripe_attr_set_cb(const struct lu_env *env, struct lod_object *lo, struct dt_object *dt, struct thandle *th, - int stripe_idx, struct lod_obj_stripe_cb_data *data) + int comp_idx, int stripe_idx, + struct lod_obj_stripe_cb_data *data) { if (data->locd_declare) return lod_sub_declare_attr_set(env, dt, data->locd_attr, th); @@ -1120,7 +1183,7 @@ static int lod_declare_attr_set(const struct lu_env *env, * speed up rename(). */ if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) { - if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID))) + if (!(attr->la_valid & LA_REMOTE_ATTR_SET)) RETURN(rc); if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER)) @@ -1157,12 +1220,12 @@ static int lod_declare_attr_set(const struct lu_env *env, RETURN(rc); } } else { - struct lod_obj_stripe_cb_data data; + struct lod_obj_stripe_cb_data data = { { 0 } }; data.locd_attr = attr; data.locd_declare = true; - rc = lod_obj_for_each_stripe(env, lo, th, - lod_obj_stripe_attr_set_cb, &data); + data.locd_stripe_cb = lod_obj_stripe_attr_set_cb; + rc = lod_obj_for_each_stripe(env, lo, th, &data); } if (rc) @@ -1217,7 +1280,7 @@ static int lod_attr_set(const struct lu_env *env, RETURN(rc); if (!S_ISDIR(dt->do_lu.lo_header->loh_attr)) { - if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID))) + if (!(attr->la_valid & LA_REMOTE_ATTR_SET)) RETURN(rc); if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_OWNER)) @@ -1229,6 +1292,14 @@ static int lod_attr_set(const struct lu_env *env, RETURN(rc); } + /* FIXME: a tricky case in the code path of mdd_layout_change(): + * the in-memory striping information has been freed in lod_xattr_set() + * due to layout change. It has to load stripe here again. 
It only + * changes flags of layout so declare_attr_set() is still accurate */ + rc = lod_load_striping_locked(env, lo); + if (rc) + RETURN(rc); + if (!lod_obj_is_striped(dt)) RETURN(0); @@ -1249,12 +1320,13 @@ static int lod_attr_set(const struct lu_env *env, break; } } else { - struct lod_obj_stripe_cb_data data; + struct lod_obj_stripe_cb_data data = { { 0 } }; data.locd_attr = attr; data.locd_declare = false; - rc = lod_obj_for_each_stripe(env, lo, th, - lod_obj_stripe_attr_set_cb, &data); + data.locd_comp_skip_cb = lod_obj_attr_set_comp_skip_cb; + data.locd_stripe_cb = lod_obj_stripe_attr_set_cb; + rc = lod_obj_for_each_stripe(env, lo, th, &data); } if (rc) @@ -2057,7 +2129,7 @@ static int lod_obj_stripe_replace_parent_fid_cb(const struct lu_env *env, struct lod_object *lo, struct dt_object *dt, struct thandle *th, - int stripe_idx, + int comp_idx, int stripe_idx, struct lod_obj_stripe_cb_data *data) { struct lod_thread_info *info = lod_env_info(env); @@ -2110,7 +2182,7 @@ static int lod_replace_parent_fid(const struct lu_env *env, struct lod_thread_info *info = lod_env_info(env); struct lu_buf *buf = &info->lti_buf; struct filter_fid *ff; - struct lod_obj_stripe_cb_data data; + struct lod_obj_stripe_cb_data data = { { 0 } }; int rc; ENTRY; @@ -2134,9 +2206,8 @@ static int lod_replace_parent_fid(const struct lu_env *env, buf->lb_len = info->lti_ea_store_size; data.locd_declare = declare; - rc = lod_obj_for_each_stripe(env, lo, th, - lod_obj_stripe_replace_parent_fid_cb, - &data); + data.locd_stripe_cb = lod_obj_stripe_replace_parent_fid_cb; + rc = lod_obj_for_each_stripe(env, lo, th, &data); RETURN(rc); } @@ -4611,7 +4682,8 @@ static int lod_create(const struct lu_env *env, struct dt_object *dt, static inline int lod_obj_stripe_destroy_cb(const struct lu_env *env, struct lod_object *lo, struct dt_object *dt, struct thandle *th, - int stripe_idx, struct lod_obj_stripe_cb_data *data) + int comp_idx, int stripe_idx, + struct lod_obj_stripe_cb_data *data) { if 
(data->locd_declare) return lod_sub_declare_destroy(env, dt, th); @@ -4703,11 +4775,11 @@ static int lod_declare_destroy(const struct lu_env *env, struct dt_object *dt, break; } } else { - struct lod_obj_stripe_cb_data data; + struct lod_obj_stripe_cb_data data = { { 0 } }; data.locd_declare = true; - rc = lod_obj_for_each_stripe(env, lo, th, - lod_obj_stripe_destroy_cb, &data); + data.locd_stripe_cb = lod_obj_stripe_destroy_cb; + rc = lod_obj_for_each_stripe(env, lo, th, &data); } RETURN(rc); @@ -4793,11 +4865,11 @@ static int lod_destroy(const struct lu_env *env, struct dt_object *dt, } } } else { - struct lod_obj_stripe_cb_data data; + struct lod_obj_stripe_cb_data data = { { 0 } }; data.locd_declare = false; - rc = lod_obj_for_each_stripe(env, lo, th, - lod_obj_stripe_destroy_cb, &data); + data.locd_stripe_cb = lod_obj_stripe_destroy_cb; + rc = lod_obj_for_each_stripe(env, lo, th, &data); } RETURN(rc); @@ -5055,30 +5127,78 @@ static int lod_invalidate(const struct lu_env *env, struct dt_object *dt) return dt_invalidate(env, dt_object_child(dt)); } -static int lod_declare_layout_change(const struct lu_env *env, - struct dt_object *dt, - struct md_layout_change *mlc, - struct thandle *th) +static int lod_layout_data_init(struct lod_thread_info *info, __u32 comp_cnt) { - struct lod_thread_info *info = lod_env_info(env); - struct lod_object *lo = lod_dt_obj(dt); - struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev); - struct dt_object *next = dt_object_child(dt); + ENTRY; + + /* clear memory region that will be used for layout change */ + memset(&info->lti_layout_attr, 0, sizeof(struct lu_attr)); + info->lti_count = 0; + + if (info->lti_comp_size >= comp_cnt) + RETURN(0); + + if (info->lti_comp_size > 0) { + OBD_FREE(info->lti_comp_idx, + info->lti_comp_size * sizeof(__u32)); + info->lti_comp_size = 0; + } + + OBD_ALLOC(info->lti_comp_idx, comp_cnt * sizeof(__u32)); + if (!info->lti_comp_idx) + RETURN(-ENOMEM); + + info->lti_comp_size = comp_cnt; + RETURN(0); +} + 
+static int lod_declare_instantiate_components(const struct lu_env *env, + struct lod_object *lo, struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); struct ost_pool *inuse = &info->lti_inuse_osts; - struct layout_intent *layout = mlc->mlc_intent; - struct lu_buf *buf = &mlc->mlc_buf; + int i; + int rc = 0; + ENTRY; + + LASSERT(info->lti_count < lo->ldo_comp_cnt); + if (info->lti_count > 0) { + /* Prepare inuse array for composite file */ + rc = lod_prepare_inuse(env, lo); + if (rc) + RETURN(rc); + } + + for (i = 0; i < info->lti_count; i++) { + rc = lod_qos_prep_create(env, lo, NULL, th, + info->lti_comp_idx[i], inuse); + if (rc) + break; + } + + if (!rc) { + info->lti_buf.lb_len = lod_comp_md_size(lo, false); + rc = lod_sub_declare_xattr_set(env, lod_object_child(lo), + &info->lti_buf, XATTR_NAME_LOV, 0, th); + } + + RETURN(rc); +} + +static int lod_declare_update_plain(const struct lu_env *env, + struct lod_object *lo, struct layout_intent *layout, + const struct lu_buf *buf, struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *d = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); struct lod_layout_component *lod_comp; struct lov_comp_md_v1 *comp_v1 = NULL; bool replay = false; - bool need_create = false; int i, rc; ENTRY; - if (!S_ISREG(dt->do_lu.lo_header->loh_attr) || !dt_object_exists(dt) || - dt_object_remote(next)) - RETURN(-EINVAL); + LASSERT(lo->ldo_flr_state == LCM_FL_NOT_FLR); - dt_write_lock(env, next, 0); /* * In case the client is passing lovea, which only happens during * the replay of layout intent write RPC for now, we may need to @@ -5113,11 +5233,6 @@ static int lod_declare_layout_change(const struct lu_env *env, rc = lod_load_striping_locked(env, lo); if (rc) GOTO(out, rc); - - /* Prepare inuse array for composite file */ - rc = lod_prepare_inuse(env, lo); - if (rc) - GOTO(out, rc); } /* Make sure defined layout covers the requested write range. 
*/ @@ -5134,7 +5249,7 @@ static int lod_declare_layout_change(const struct lu_env *env, } CDEBUG(D_LAYOUT, "%s: "DFID": instantiate components "DEXT"\n", - lod2obd(d)->obd_name, PFID(lu_object_fid(&dt->do_lu)), + lod2obd(d)->obd_name, PFID(lod_object_fid(lo)), PEXT(&layout->li_extent)); /* @@ -5170,30 +5285,292 @@ static int lod_declare_layout_change(const struct lu_env *env, if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED) GOTO(out, rc = -EINVAL); - need_create = true; + LASSERT(info->lti_comp_idx != NULL); + info->lti_comp_idx[info->lti_count++] = i; + } + + if (info->lti_count == 0) + RETURN(-EALREADY); - rc = lod_qos_prep_create(env, lo, NULL, th, i, inuse); - if (rc) + lod_obj_inc_layout_gen(lo); + rc = lod_declare_instantiate_components(env, lo, th); +out: + if (rc) + lod_object_free_striping(env, lo); + RETURN(rc); +} + +#define lod_foreach_mirror_comp(comp, lo, mirror_idx) \ +for (comp = &lo->ldo_comp_entries[lo->ldo_mirrors[mirror_idx].lme_start]; \ + comp <= &lo->ldo_comp_entries[lo->ldo_mirrors[mirror_idx].lme_end]; \ + comp++) + +static inline int lod_comp_index(struct lod_object *lo, + struct lod_layout_component *lod_comp) +{ + LASSERT(lod_comp >= lo->ldo_comp_entries && + lod_comp <= &lo->ldo_comp_entries[lo->ldo_comp_cnt - 1]); + + return lod_comp - lo->ldo_comp_entries; +} + +/** + * Stale other mirrors by writing extent. + */ +static void lod_stale_components(struct lod_object *lo, int primary, + struct lu_extent *extent) +{ + struct lod_layout_component *pri_comp, *lod_comp; + int i; + + /* The writing extent decides which components in the primary + * are affected... */ + lod_foreach_mirror_comp(pri_comp, lo, primary) { + if (!lu_extent_is_overlapped(extent, &pri_comp->llc_extent)) + continue; + + for (i = 0; i < lo->ldo_mirror_count; i++) { + if (i == primary) + continue; + + /* ... 
and then stale other components that are + * overlapping with primary components */ + lod_foreach_mirror_comp(lod_comp, lo, i) { + if (!lu_extent_is_overlapped( + &pri_comp->llc_extent, + &lod_comp->llc_extent)) + continue; + + CDEBUG(D_LAYOUT, "stale: %u / %u\n", + i, lod_comp_index(lo, lod_comp)); + + lod_comp->llc_flags |= LCME_FL_STALE; + lo->ldo_mirrors[i].lme_stale = 1; + } + } + } +} + +static int lod_declare_update_rdonly(const struct lu_env *env, + struct lod_object *lo, struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lu_attr *layout_attr = &info->lti_layout_attr; + struct lod_layout_component *lod_comp; + struct layout_intent *layout = mlc->mlc_intent; + struct lu_extent extent = layout->li_extent; + int picked; + int i; + int rc; + ENTRY; + + LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE); + LASSERT(lo->ldo_flr_state == LCM_FL_RDONLY); + LASSERT(lo->ldo_mirror_count > 0); + + CDEBUG(D_LAYOUT, DFID": trying to write :"DEXT"\n", + PFID(lod_object_fid(lo)), PEXT(&extent)); + + /** + * Pick a mirror as the primary. + * Now it only picks the first mirror, this algo can be + * revised later after knowing the topology of cluster or + * the availability of OSTs. 
+ */ + for (picked = -1, i = 0; i < lo->ldo_mirror_count; i++) { + if (!lo->ldo_mirrors[i].lme_stale) { + picked = i; break; + } } + if (picked < 0) /* failed to pick a primary */ + RETURN(-ENODATA); - if (need_create) - lod_obj_inc_layout_gen(lo); - else - GOTO(unlock, rc = -EALREADY); + CDEBUG(D_LAYOUT, DFID": picked mirror %u as primary\n", + PFID(lod_object_fid(lo)), lo->ldo_mirrors[picked].lme_id); - if (!rc) { - info->lti_buf.lb_len = lod_comp_md_size(lo, false); - rc = lod_sub_declare_xattr_set(env, next, &info->lti_buf, - XATTR_NAME_LOV, 0, th); + /* stale overlapping components from other mirrors */ + lod_stale_components(lo, picked, &extent); + + /* instantiate components for the picked mirror, start from 0 */ + extent = (struct lu_extent) { 0, layout->li_extent.e_end }; + lod_foreach_mirror_comp(lod_comp, lo, picked) { + if (!lu_extent_is_overlapped(&extent, + &lod_comp->llc_extent)) + break; + + if (lod_comp_inited(lod_comp)) + continue; + + CDEBUG(D_LAYOUT, "instantiate: %u / %u\n", + i, lod_comp_index(lo, lod_comp)); + + info->lti_comp_idx[info->lti_count++] = + lod_comp_index(lo, lod_comp); } + + lo->ldo_flr_state = LCM_FL_WRITE_PENDING; + + /* Reset the layout version once it's becoming too large. + * This way it can make sure that the layout version is + * monotonously increased in this writing era. 
*/ + lod_obj_inc_layout_gen(lo); + if (lo->ldo_layout_gen > (LCME_ID_MAX >> 1)) { + __u32 layout_version; + + cfs_get_random_bytes(&layout_version, sizeof(layout_version)); + lo->ldo_layout_gen = layout_version & 0xffff; + } + + rc = lod_declare_instantiate_components(env, lo, th); + if (rc) + GOTO(out, rc); + + layout_attr->la_valid = LA_LAYOUT_VERSION; + layout_attr->la_layout_version = 0; /* set current version */ + rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th); + if (rc) + GOTO(out, rc); + out: if (rc) lod_object_free_striping(env, lo); + RETURN(rc); +} -unlock: - dt_write_unlock(env, next); +static int lod_declare_update_write_pending(const struct lu_env *env, + struct lod_object *lo, struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lu_attr *layout_attr = &info->lti_layout_attr; + struct lod_layout_component *lod_comp; + struct lu_extent extent = { 0 }; + int primary = -1; + int i; + int rc; + ENTRY; + + LASSERT(lo->ldo_flr_state == LCM_FL_WRITE_PENDING); + LASSERT(mlc->mlc_opc == MD_LAYOUT_WRITE); + + /* look for the primary mirror */ + for (i = 0; i < lo->ldo_mirror_count; i++) { + if (lo->ldo_mirrors[i].lme_stale) + continue; + + LASSERTF(primary < 0, DFID " has multiple primary: %u / %u", + PFID(lod_object_fid(lo)), + lo->ldo_mirrors[i].lme_id, + lo->ldo_mirrors[primary].lme_id); + + primary = i; + } + if (primary < 0) { + CERROR(DFID ": doesn't have a primary mirror\n", + PFID(lod_object_fid(lo))); + GOTO(out, rc = -ENODATA); + } + + CDEBUG(D_LAYOUT, DFID": found primary %u\n", + PFID(lod_object_fid(lo)), lo->ldo_mirrors[primary].lme_id); + LASSERT(!lo->ldo_mirrors[primary].lme_stale); + + /* for LAYOUT_WRITE opc, it has to do the following operations: + * 1. stale overlapping componets from stale mirrors; + * 2. instantiate components of the primary mirror; + * 3. 
transfter layout version to all objects of the primary; */ + + if (mlc->mlc_opc == MD_LAYOUT_WRITE) { + LASSERT(mlc->mlc_intent != NULL); + + extent = mlc->mlc_intent->li_extent; + + CDEBUG(D_LAYOUT, DFID": intent to write: "DEXT"\n", + PFID(lod_object_fid(lo)), PEXT(&extent)); + + /* 1. stale overlapping components */ + lod_stale_components(lo, primary, &extent); + + /* 2. find out the components need instantiating. + * instantiate [0, mlc->mlc_intent->e_end) */ + extent.e_start = 0; + lod_foreach_mirror_comp(lod_comp, lo, primary) { + if (!lu_extent_is_overlapped(&extent, + &lod_comp->llc_extent)) + break; + + if (lod_comp_inited(lod_comp)) + continue; + + CDEBUG(D_LAYOUT, "write instantiate %d / %d\n", + primary, lod_comp_index(lo, lod_comp)); + info->lti_comp_idx[info->lti_count++] = + lod_comp_index(lo, lod_comp); + } + } + + rc = lod_declare_instantiate_components(env, lo, th); + if (rc) + GOTO(out, rc); + + layout_attr->la_valid = LA_LAYOUT_VERSION; + layout_attr->la_layout_version = 0; /* set current version */ + rc = lod_declare_attr_set(env, &lo->ldo_obj, layout_attr, th); + if (rc) + GOTO(out, rc); + + lod_obj_inc_layout_gen(lo); +out: + if (rc) + lod_object_free_striping(env, lo); + RETURN(rc); +} + +static int lod_declare_layout_change(const struct lu_env *env, + struct dt_object *dt, struct md_layout_change *mlc, + struct thandle *th) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_object *lo = lod_dt_obj(dt); + int rc; + ENTRY; + + if (!S_ISREG(dt->do_lu.lo_header->loh_attr) || !dt_object_exists(dt) || + dt_object_remote(dt_object_child(dt))) + RETURN(-EINVAL); + + lod_write_lock(env, dt, 0); + rc = lod_load_striping_locked(env, lo); + if (rc) + GOTO(out, rc); + + LASSERT(lo->ldo_comp_cnt > 0); + + rc = lod_layout_data_init(info, lo->ldo_comp_cnt); + if (rc) + GOTO(out, rc); + + switch (lo->ldo_flr_state) { + case LCM_FL_NOT_FLR: + rc = lod_declare_update_plain(env, lo, mlc->mlc_intent, + &mlc->mlc_buf, th); + break; + case 
LCM_FL_RDONLY: + rc = lod_declare_update_rdonly(env, lo, mlc, th); + break; + case LCM_FL_WRITE_PENDING: + rc = lod_declare_update_write_pending(env, lo, mlc, th); + break; + case LCM_FL_SYNC_PENDING: + default: + rc = -ENOTSUPP; + break; + } +out: + dt_write_unlock(env, dt); RETURN(rc); } @@ -5204,8 +5581,17 @@ static int lod_layout_change(const struct lu_env *env, struct dt_object *dt, struct md_layout_change *mlc, struct thandle *th) { struct lu_attr *attr = &lod_env_info(env)->lti_attr; + struct lu_attr *layout_attr = &lod_env_info(env)->lti_layout_attr; + struct lod_object *lo = lod_dt_obj(dt); + int rc; - RETURN(lod_striped_create(env, dt, attr, NULL, th)); + rc = lod_striped_create(env, dt, attr, NULL, th); + if (!rc && layout_attr->la_valid & LA_LAYOUT_VERSION) { + layout_attr->la_layout_version |= lo->ldo_layout_gen; + rc = lod_attr_set(env, dt, layout_attr, th); + } + + return rc; } struct dt_object_operations lod_obj_ops = { diff --git a/lustre/lod/lod_qos.c b/lustre/lod/lod_qos.c index 4943bc2..12aee55 100644 --- a/lustre/lod/lod_qos.c +++ b/lustre/lod/lod_qos.c @@ -2155,7 +2155,7 @@ out: int lod_obj_stripe_set_inuse_cb(const struct lu_env *env, struct lod_object *lo, struct dt_object *dt, struct thandle *th, - int stripe_idx, + int comp_idx, int stripe_idx, struct lod_obj_stripe_cb_data *data) { struct lod_thread_info *info = lod_env_info(env); @@ -2216,7 +2216,7 @@ int lod_prepare_inuse(const struct lu_env *env, struct lod_object *lo) struct lod_thread_info *info = lod_env_info(env); struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev); struct ost_pool *inuse = &info->lti_inuse_osts; - struct lod_obj_stripe_cb_data data; + struct lod_obj_stripe_cb_data data = { { 0 } }; __u32 stripe_count = 0; int i; int rc; @@ -2229,8 +2229,8 @@ int lod_prepare_inuse(const struct lu_env *env, struct lod_object *lo) return rc; data.locd_inuse = inuse; - return lod_obj_for_each_stripe(env, lo, NULL, - lod_obj_stripe_set_inuse_cb, &data); + data.locd_stripe_cb = 
lod_obj_stripe_set_inuse_cb; + return lod_obj_for_each_stripe(env, lo, NULL, &data); } int lod_prepare_create(const struct lu_env *env, struct lod_object *lo, diff --git a/lustre/lov/lov_cl_internal.h b/lustre/lov/lov_cl_internal.h index 375675c..9382830 100644 --- a/lustre/lov/lov_cl_internal.h +++ b/lustre/lov/lov_cl_internal.h @@ -230,6 +230,7 @@ struct lov_layout_entry { struct lov_mirror_entry { unsigned short lre_mirror_id; unsigned short lre_preferred:1, + lre_stale:1, /* set if any components is stale */ lre_valid:1; /* set if at least one of components * in this mirror is valid */ unsigned short lre_start; /* index to lo_entries, start index of @@ -435,6 +436,8 @@ struct lov_page { struct cl_page_slice lps_cl; /** layout_entry + stripe index, composed using lov_comp_index() */ unsigned int lps_index; + /* the layout gen when this page was created */ + __u32 lps_layout_gen; }; /* diff --git a/lustre/lov/lov_io.c b/lustre/lov/lov_io.c index fa84386..9440d4f 100644 --- a/lustre/lov/lov_io.c +++ b/lustre/lov/lov_io.c @@ -139,6 +139,7 @@ static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio, sub_io->ci_pio = io->ci_pio; sub_io->ci_lock_no_expand = io->ci_lock_no_expand; sub_io->ci_ndelay = io->ci_ndelay; + sub_io->ci_layout_version = io->ci_layout_version; result = cl_io_sub_init(sub->sub_env, sub_io, io->ci_type, sub_obj); @@ -215,12 +216,89 @@ static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio, RETURN(0); } +/** + * Decide if it will need write intent RPC + */ +static int lov_io_mirror_write_intent(struct lov_io *lio, + struct lov_object *obj, struct cl_io *io) +{ + struct lov_layout_composite *comp = &obj->u.composite; + struct lu_extent *ext = &io->ci_write_intent; + struct lov_mirror_entry *lre; + struct lov_mirror_entry *primary; + struct lov_layout_entry *lle; + size_t count = 0; + ENTRY; + + *ext = (typeof(*ext)) { lio->lis_pos, lio->lis_endpos }; + io->ci_need_write_intent = 0; + + if (!(io->ci_type == CIT_WRITE 
|| cl_io_is_trunc(io) || + cl_io_is_mkwrite(io))) + RETURN(0); + + if (lov_flr_state(obj) == LCM_FL_RDONLY || + lov_flr_state(obj) == LCM_FL_SYNC_PENDING) { + io->ci_need_write_intent = 1; + RETURN(0); + } + + LASSERT((lov_flr_state(obj) == LCM_FL_WRITE_PENDING)); + LASSERT(comp->lo_preferred_mirror >= 0); + + /* need to iterate all components to see if there are + * multiple components covering the writing component */ + primary = &comp->lo_mirrors[comp->lo_preferred_mirror]; + LASSERT(!primary->lre_stale); + lov_foreach_mirror_layout_entry(obj, lle, primary) { + LASSERT(lle->lle_valid); + if (!lu_extent_is_overlapped(ext, lle->lle_extent)) + continue; + + ext->e_start = MIN(ext->e_start, lle->lle_extent->e_start); + ext->e_end = MAX(ext->e_end, lle->lle_extent->e_end); + ++count; + } + if (count == 0) { + CERROR(DFID ": cannot find any valid components covering " + "file extent "DEXT", mirror: %d\n", + PFID(lu_object_fid(lov2lu(obj))), PEXT(ext), + primary->lre_mirror_id); + RETURN(-EIO); + } + + count = 0; + lov_foreach_mirror_entry(obj, lre) { + if (lre == primary) + continue; + + lov_foreach_mirror_layout_entry(obj, lle, lre) { + if (!lle->lle_valid) + continue; + + if (lu_extent_is_overlapped(ext, lle->lle_extent)) { + ++count; + break; + } + } + } + + CDEBUG(D_VFSTRACE, DFID "there are %zd components to be staled to " + "modify file extent "DEXT", iot: %d\n", + PFID(lu_object_fid(lov2lu(obj))), count, PEXT(ext), io->ci_type); + + io->ci_need_write_intent = count > 0; + + RETURN(0); +} + static int lov_io_mirror_init(struct lov_io *lio, struct lov_object *obj, struct cl_io *io) { struct lov_layout_composite *comp = &obj->u.composite; int index; int i; + int result; ENTRY; if (!lov_is_flr(obj)) { @@ -230,6 +308,22 @@ static int lov_io_mirror_init(struct lov_io *lio, struct lov_object *obj, RETURN(0); } + result = lov_io_mirror_write_intent(lio, obj, io); + if (result) + RETURN(result); + + if (io->ci_need_write_intent) { + CDEBUG(D_VFSTRACE, DFID " need write 
intent for [%llu, %llu)\n", + PFID(lu_object_fid(lov2lu(obj))), + lio->lis_pos, lio->lis_endpos); + + /* stop cl_io_init() loop */ + RETURN(1); + } + + /* transfer the layout version for verification */ + io->ci_layout_version = obj->lo_lsm->lsm_layout_gen; + if (io->ci_ndelay_tried == 0 || /* first time to try */ /* reset the mirror index if layout has changed */ lio->lis_mirror_layout_gen != obj->lo_lsm->lsm_layout_gen) { @@ -333,7 +427,7 @@ static int lov_io_slice_init(struct lov_io *lio, * the current file-tail exactly. */ if (unlikely(obj->lo_lsm->lsm_entries[0]->lsme_pattern & LOV_PATTERN_F_HOLE)) - RETURN(-EIO); + GOTO(out, result = -EIO); lio->lis_pos = 0; lio->lis_endpos = OBD_OBJECT_EOF; @@ -378,7 +472,8 @@ static int lov_io_slice_init(struct lov_io *lio, if (lov_flr_state(obj) == LCM_FL_RDONLY && !OBD_FAIL_CHECK(OBD_FAIL_FLR_GLIMPSE_IMMUTABLE)) - RETURN(1); /* SoM is accurate, no need glimpse */ + /* SoM is accurate, no need glimpse */ + GOTO(out, result = 1); break; case CIT_MISC: @@ -392,12 +487,12 @@ static int lov_io_slice_init(struct lov_io *lio, result = lov_io_mirror_init(lio, obj, io); if (result) - RETURN(result); + GOTO(out, result); /* check if it needs to instantiate layout */ if (!(io->ci_type == CIT_WRITE || cl_io_is_mkwrite(io) || (cl_io_is_trunc(io) && io->u.ci_setattr.sa_attr.lvb_size > 0))) - RETURN(0); + GOTO(out, result = 0); ext.e_start = lio->lis_pos; ext.e_end = lio->lis_endpos; @@ -414,12 +509,13 @@ static int lov_io_slice_init(struct lov_io *lio, if (!lsm_entry_inited(obj->lo_lsm, index)) { io->ci_need_write_intent = 1; io->ci_write_intent = ext; - result = 1; - break; + GOTO(out, result = 1); } } + EXIT; - RETURN(result); +out: + return result; } static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios) @@ -836,6 +932,10 @@ static int lov_io_read_ahead(const struct lu_env *env, if (index < 0 || !lsm_entry_inited(loo->lo_lsm, index)) RETURN(-ENODATA); + /* avoid readahead to expand to stale components */ + 
if (!lov_entry(loo, index)->lle_valid) + RETURN(-EIO); + stripe = lov_stripe_number(loo->lo_lsm, index, offset); r0 = lov_r0(loo, index); diff --git a/lustre/lov/lov_object.c b/lustre/lov/lov_object.c index d8d479b..15d5c3c 100644 --- a/lustre/lov/lov_object.c +++ b/lustre/lov/lov_object.c @@ -699,6 +699,7 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev, if (i > 0) { if (mirror_id == lre->lre_mirror_id) { lre->lre_valid |= lle->lle_valid; + lre->lre_stale |= !lle->lle_valid; lre->lre_end = i; continue; } @@ -719,6 +720,7 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev, lre->lre_preferred = (lle->lle_lsme->lsme_flags & LCME_FL_PREFERRED); lre->lre_valid = lle->lle_valid; + lre->lre_stale = !lle->lle_valid; } /* sanity check for FLR */ @@ -758,7 +760,7 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev, mirror_count = 0, i = 0; lov_foreach_mirror_entry(lov, lre) { i++; - if (!lre->lre_valid) + if (lre->lre_stale) continue; mirror_count++; /* valid mirror */ diff --git a/lustre/lov/lov_page.c b/lustre/lov/lov_page.c index 5ab3da5..fa7aea7 100644 --- a/lustre/lov/lov_page.c +++ b/lustre/lov/lov_page.c @@ -56,8 +56,8 @@ static int lov_comp_page_print(const struct lu_env *env, struct lov_page *lp = cl2lov_page(slice); return (*printer)(env, cookie, - LUSTRE_LOV_NAME"-page@%p, comp index: %x\n", - lp, lp->lps_index); + LUSTRE_LOV_NAME"-page@%p, comp index: %x, gen: %u\n", + lp, lp->lps_index, lp->lps_layout_gen); } static const struct cl_page_operations lov_comp_page_ops = { @@ -96,6 +96,7 @@ int lov_page_init_composite(const struct lu_env *env, struct cl_object *obj, LASSERT(rc == 0); lpg->lps_index = lov_comp_index(entry, stripe); + lpg->lps_layout_gen = loo->lo_lsm->lsm_layout_gen; cl_page_slice_add(page, &lpg->lps_cl, obj, index, &lov_comp_page_ops); sub = lov_sub_get(env, lio, lpg->lps_index); diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index 
c45c86b..6eec728 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -1865,41 +1865,190 @@ static int mdd_declare_layout_change(const struct lu_env *env, } /* For PFL, this is used to instantiate necessary component objects. */ -int mdd_layout_change(const struct lu_env *env, struct md_object *obj, - struct md_layout_change *mlc) +static int +mdd_layout_instantiate_component(const struct lu_env *env, + struct mdd_object *obj, struct md_layout_change *mlc, + struct thandle *handle) { - struct mdd_object *mdd_obj = md2mdd_obj(obj); - struct mdd_device *mdd = mdo2mdd(obj); - struct thandle *handle; + struct mdd_device *mdd = mdd_obj2mdd_dev(obj); int rc; ENTRY; - handle = mdd_trans_create(env, mdd); - if (IS_ERR(handle)) - RETURN(PTR_ERR(handle)); - - rc = mdd_declare_layout_change(env, mdd, mdd_obj, mlc, handle); + rc = mdd_declare_layout_change(env, mdd, obj, mlc, handle); /** * It's possible that another layout write intent has already * instantiated our objects, so a -EALREADY returned, and we need to * do nothing. */ if (rc) - GOTO(stop, rc = (rc == -EALREADY) ? 0 : rc); + RETURN(rc == -EALREADY ? 0 : rc); rc = mdd_trans_start(env, mdd, handle); if (rc) - GOTO(stop, rc); + RETURN(rc); - mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD); - rc = mdo_layout_change(env, mdd_obj, mlc, handle); - mdd_write_unlock(env, mdd_obj); + mdd_write_lock(env, obj, MOR_TGT_CHILD); + rc = mdo_layout_change(env, obj, mlc, handle); + mdd_write_unlock(env, obj); if (rc) - GOTO(stop, rc); + RETURN(rc); - rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, mdd_obj, handle); -stop: - RETURN(mdd_trans_stop(env, mdd, rc, handle)); + rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, obj, handle); + RETURN(rc); +} + +/** + * Change the FLR layout from RDONLY to WRITE_PENDING. + * + * It picks the primary mirror, and bumps the layout version, and set + * layout version xattr to OST objects in a sync tx. 
In order to facilitate + * the handling of phantom writers from evicted clients, the clients carry + * layout version of the file with write RPC, so that the OSTs can verify + * if the write RPCs are legitimate, meaning not from evicted clients. + */ +static int +mdd_layout_update_rdonly(const struct lu_env *env, struct mdd_object *obj, + struct md_layout_change *mlc, struct thandle *handle) +{ + struct mdd_device *mdd = mdd_obj2mdd_dev(obj); + int rc; + ENTRY; + + rc = mdd_declare_layout_change(env, mdd, obj, mlc, handle); + if (rc) + GOTO(out, rc); + + rc = mdd_declare_xattr_del(env, mdd, obj, XATTR_NAME_SOM, handle); + if (rc) + GOTO(out, rc); + + /* record a changelog for data mover to consume */ + rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle); + if (rc) + GOTO(out, rc); + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(out, rc); + + /* it needs a sync tx to make FLR to work properly */ + handle->th_sync = 1; + + mdd_write_lock(env, obj, MOR_TGT_CHILD); + rc = mdo_layout_change(env, obj, mlc, handle); + if (!rc) { + rc = mdo_xattr_del(env, obj, XATTR_NAME_SOM, handle); + if (rc == -ENODATA) + rc = 0; + } + mdd_write_unlock(env, obj); + if (rc) + GOTO(out, rc); + + rc = mdd_changelog_data_store(env, mdd, CL_FLRW, 0, obj, handle); + if (rc) + GOTO(out, rc); + + EXIT; + +out: + return rc; +} + +static int +mdd_layout_update_write_pending(const struct lu_env *env, + struct mdd_object *obj, struct md_layout_change *mlc, + struct thandle *handle) +{ + struct mdd_device *mdd = mdd_obj2mdd_dev(obj); + int rc; + ENTRY; + + rc = mdd_declare_layout_change(env, mdd, obj, mlc, handle); + if (rc) + GOTO(out, rc); + + rc = mdd_trans_start(env, mdd, handle); + if (rc) + GOTO(out, rc); + + /* it needs a sync tx to make FLR to work properly */ + handle->th_sync = 1; + + mdd_write_lock(env, obj, MOR_TGT_CHILD); + rc = mdo_layout_change(env, obj, mlc, handle); + mdd_write_unlock(env, obj); + if (rc) + GOTO(out, rc); + + EXIT; + +out: + return rc; +} + 
+/** + * Layout change callback for object. + * + * This is only used by FLR for now. In the future, it can be extended to + * handle all layout changes. + */ +static int +mdd_layout_change(const struct lu_env *env, struct md_object *o, + struct md_layout_change *mlc) +{ + struct mdd_object *obj = md2mdd_obj(o); + struct mdd_device *mdd = mdd_obj2mdd_dev(obj); + struct lu_buf *buf = mdd_buf_get(env, NULL, 0); + struct lov_comp_md_v1 *lcm; + struct thandle *handle; + int flr_state; + int rc; + ENTRY; + + if (mlc->mlc_opc != MD_LAYOUT_WRITE) + RETURN(-ENOTSUPP); + + handle = mdd_trans_create(env, mdd); + if (IS_ERR(handle)) + RETURN(PTR_ERR(handle)); + + rc = mdd_get_lov_ea(env, obj, buf); + if (rc < 0) { + if (rc == -ENODATA) + rc = -EINVAL; + GOTO(out, rc); + } + + /* analyze the layout to make sure it's a FLR file */ + lcm = buf->lb_buf; + if (le32_to_cpu(lcm->lcm_magic) != LOV_MAGIC_COMP_V1) + GOTO(out, rc = -EINVAL); + + flr_state = le16_to_cpu(lcm->lcm_flags) & LCM_FL_FLR_MASK; + + /* please refer to HLD of FLR for state transition */ + switch (flr_state) { + case LCM_FL_NOT_FLR: + rc = mdd_layout_instantiate_component(env, obj, mlc, handle); + break; + case LCM_FL_WRITE_PENDING: + rc = mdd_layout_update_write_pending(env, obj, mlc, handle); + break; + case LCM_FL_RDONLY: + rc = mdd_layout_update_rdonly(env, obj, mlc, handle); + break; + case LCM_FL_SYNC_PENDING: + default: + rc = 0; + break; + } + EXIT; + +out: + mdd_trans_stop(env, mdd, rc, handle); + lu_buf_free(buf); + return rc; } void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent, diff --git a/lustre/obdclass/llog_swab.c b/lustre/obdclass/llog_swab.c index 480a9f5..3f9d45b 100644 --- a/lustre/obdclass/llog_swab.c +++ b/lustre/obdclass/llog_swab.c @@ -252,6 +252,7 @@ void lustre_swab_llog_rec(struct llog_rec_hdr *rec) (struct llog_setattr64_rec_v2 *)rec; __swab32s(&lsr2->lsr_projid); + __swab32s(&lsr2->lsr_layout_version); tail = &lsr2->lsr_tail; } else { tail = &lsr->lsr_tail;
diff --git a/lustre/ofd/ofd_dev.c b/lustre/ofd/ofd_dev.c index 6c0abd0..8b498c0 100644 --- a/lustre/ofd/ofd_dev.c +++ b/lustre/ofd/ofd_dev.c @@ -1367,7 +1367,6 @@ static int ofd_setattr_hdl(struct tgt_session_info *tsi) struct ost_body *repbody; struct ldlm_resource *res; struct ofd_object *fo; - struct filter_fid *ff = NULL; int rc = 0; ENTRY; @@ -1407,13 +1406,8 @@ static int ofd_setattr_hdl(struct tgt_session_info *tsi) la_from_obdo(&fti->fti_attr, &body->oa, body->oa.o_valid); fti->fti_attr.la_valid &= ~LA_TYPE; - if (body->oa.o_valid & OBD_MD_FLFID) { - ff = &fti->fti_mds_fid; - ofd_prepare_fidea(ff, &body->oa); - } - /* setting objects attributes (including owner/group) */ - rc = ofd_attr_set(tsi->tsi_env, fo, &fti->fti_attr, ff); + rc = ofd_attr_set(tsi->tsi_env, fo, &fti->fti_attr, &body->oa); if (rc != 0) GOTO(out_put, rc); @@ -2017,7 +2011,6 @@ static int ofd_punch_hdl(struct tgt_session_info *tsi) struct ldlm_namespace *ns = tsi->tsi_tgt->lut_obd->obd_namespace; struct ldlm_resource *res; struct ofd_object *fo; - struct filter_fid *ff = NULL; __u64 flags = 0; struct lustre_handle lh = { 0, }; int rc; @@ -2078,13 +2071,8 @@ static int ofd_punch_hdl(struct tgt_session_info *tsi) info->fti_attr.la_size = start; info->fti_attr.la_valid |= LA_SIZE; - if (oa->o_valid & OBD_MD_FLFID) { - ff = &info->fti_mds_fid; - ofd_prepare_fidea(ff, oa); - } - rc = ofd_object_punch(tsi->tsi_env, fo, start, end, &info->fti_attr, - ff, (struct obdo *)oa); + (struct obdo *)oa); if (rc) GOTO(out_put, rc); diff --git a/lustre/ofd/ofd_internal.h b/lustre/ofd/ofd_internal.h index 9c7a582..d303646 100644 --- a/lustre/ofd/ofd_internal.h +++ b/lustre/ofd/ofd_internal.h @@ -325,6 +325,8 @@ int ofd_start_inconsistency_verification_thread(struct ofd_device *ofd); int ofd_stop_inconsistency_verification_thread(struct ofd_device *ofd); int ofd_verify_ff(const struct lu_env *env, struct ofd_object *fo, struct obdo *oa); +int ofd_verify_layout_version(const struct lu_env *env, + struct 
ofd_object *fo, const struct obdo *oa); int ofd_preprw(const struct lu_env *env,int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, struct niobuf_remote *rnb, int *nr_local, @@ -358,6 +360,8 @@ struct ofd_object *ofd_object_find(const struct lu_env *env, struct ofd_device *ofd, const struct lu_fid *fid); int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo); +int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo, + const struct obdo *oa, struct filter_fid *ff); int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd, u64 id, struct ofd_seq *oseq, int nr, int sync); @@ -367,10 +371,10 @@ static inline void ofd_object_put(const struct lu_env *env, dt_object_put(env, &fo->ofo_obj); } int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, - struct lu_attr *la, struct filter_fid *ff); + struct lu_attr *la, struct obdo *oa); int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, __u64 start, __u64 end, struct lu_attr *la, - struct filter_fid *ff, struct obdo *oa); + struct obdo *oa); int ofd_destroy(const struct lu_env *, struct ofd_object *, int); int ofd_attr_get(const struct lu_env *env, struct ofd_object *fo, struct lu_attr *la); @@ -485,23 +489,6 @@ static inline void ofd_slc_set(struct ofd_device *ofd) ofd->ofd_lut.lut_sync_lock_cancel = ALWAYS_SYNC_ON_CANCEL; } -static inline void ofd_prepare_fidea(struct filter_fid *ff, - const struct obdo *oa) -{ - /* packing fid and converting it to LE for storing into EA. - * Here ->o_stripe_idx should be filled by LOV and rest of - * fields - by client. */ - ff->ff_parent.f_seq = cpu_to_le64(oa->o_parent_seq); - ff->ff_parent.f_oid = cpu_to_le32(oa->o_parent_oid); - /* XXX: we are ignoring o_parent_ver here, since this should - * be the same for all objects in this fileset. 
*/ - ff->ff_parent.f_ver = cpu_to_le32(oa->o_stripe_idx); - if (oa->o_valid & OBD_MD_FLOSTLAYOUT) - ost_layout_cpu_to_le(&ff->ff_layout, &oa->o_layout); - else - memset(&ff->ff_layout, 0, sizeof(ff->ff_layout)); -} - static inline int ofd_validate_seq(struct obd_export *exp, __u64 seq) { struct filter_export_data *fed = &exp->exp_filter_data; diff --git a/lustre/ofd/ofd_io.c b/lustre/ofd/ofd_io.c index 924bffa..2c51f01 100644 --- a/lustre/ofd/ofd_io.c +++ b/lustre/ofd/ofd_io.c @@ -427,6 +427,49 @@ int ofd_verify_ff(const struct lu_env *env, struct ofd_object *fo, } /** + * FLR: verify the layout version of object. + * + * \param[in] env execution environment + * \param[in] fo OFD object + * \param[in] oa OBDO structure with layout version + * + * \retval 0 on successful verification + * \retval -EINPROGRESS layout version is in transfer + * \retval -ESTALE the layout version on client is stale + */ +int ofd_verify_layout_version(const struct lu_env *env, + struct ofd_object *fo, const struct obdo *oa) +{ + int rc; + ENTRY; + + rc = ofd_object_ff_load(env, fo); + if (rc < 0) { + if (rc == -ENODATA) + rc = -EINPROGRESS; + GOTO(out, rc); + } + + /* this update is not legitimate */ + if (oa->o_layout_version < fo->ofo_ff.ff_layout_version) + GOTO(out, rc = -ESTALE); + + /* layout version is not transmitted yet */ + if (oa->o_layout_version > + fo->ofo_ff.ff_layout_version + fo->ofo_ff.ff_range) + GOTO(out, rc = -EINPROGRESS); + + EXIT; + +out: + CDEBUG(D_INODE, DFID " verify layout version: %u vs. %u, rc: %d\n", + PFID(lu_object_fid(&fo->ofo_obj.do_lu)), + fo->ofo_ff.ff_layout_version, oa->o_layout_version, rc); + return rc; + +} + +/** * Prepare buffers for read request processing. 
* * This function converts remote buffers from client to local buffers @@ -628,6 +671,18 @@ static int ofd_preprw_write(const struct lu_env *env, struct obd_export *exp, } } + /* need to verify layout version */ + if (oa->o_valid & OBD_MD_LAYOUT_VERSION) { + rc = ofd_verify_layout_version(env, fo, oa); + if (rc) { + ofd_read_unlock(env, fo); + ofd_object_put(env, fo); + GOTO(out, rc); + } + + oa->o_valid &= ~OBD_MD_LAYOUT_VERSION; + } + /* Process incoming grant info, set OBD_BRW_GRANTED flag and grant some * space back if possible */ tgt_grant_prepare_write(env, exp, oa, rnb, obj->ioo_bufcnt); @@ -817,7 +872,7 @@ ofd_commitrw_read(const struct lu_env *env, struct ofd_device *ofd, * \param[in] ofd OFD device * \param[in] ofd_obj OFD object * \param[in] la object attributes - * \param[in] ff parent FID + * \param[in] oa obdo * * \retval 0 on successful attributes update * \retval negative value on error @@ -825,14 +880,15 @@ ofd_commitrw_read(const struct lu_env *env, struct ofd_device *ofd, static int ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd, struct ofd_object *ofd_obj, struct lu_attr *la, - struct filter_fid *ff) + struct obdo *oa) { struct ofd_thread_info *info = ofd_info(env); + struct filter_fid *ff = &info->fti_mds_fid; __u64 valid = la->la_valid; - int rc; struct thandle *th; struct dt_object *dt_obj; - int ff_needed = 0; + int fl = 0; + int rc; ENTRY; @@ -847,15 +903,11 @@ ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd, if (rc != 0) GOTO(out, rc); - if (ff != NULL) { - rc = ofd_object_ff_load(env, ofd_obj); - if (rc == -ENODATA) - ff_needed = 1; - else if (rc < 0) - GOTO(out, rc); - } + fl = ofd_object_ff_update(env, ofd_obj, oa, ff); + if (fl < 0) + GOTO(out, rc = fl); - if (!la->la_valid && !ff_needed) + if (!la->la_valid && !fl) /* no attributes to set */ GOTO(out, rc = 0); @@ -869,14 +921,12 @@ ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd, GOTO(out_tx, rc); } - if (ff_needed) { + if 
(fl) { if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1)) ff->ff_parent.f_oid = cpu_to_le32(1UL << 31); else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2)) le32_add_cpu(&ff->ff_parent.f_oid, -1); - info->fti_buf.lb_buf = ff; - info->fti_buf.lb_len = sizeof(*ff); rc = dt_declare_xattr_set(env, dt_obj, &info->fti_buf, XATTR_NAME_FID, 0, th); if (rc) @@ -896,14 +946,21 @@ ofd_write_attr_set(const struct lu_env *env, struct ofd_device *ofd, GOTO(out_tx, rc); } - /* set filter fid EA */ - if (ff_needed) { + /* set filter fid EA. + * FIXME: it holds read lock of ofd object to modify the XATTR_NAME_FID + * while the write lock should be held. However, it should work because + * write RPCs only modify ff_{parent,layout} and those information will + * be the same from all the write RPCs. The reason that fl is not used + * in dt_xattr_set() is to allow this race. */ + if (fl) { if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID)) GOTO(out_tx, rc); + info->fti_buf.lb_buf = ff; + info->fti_buf.lb_len = sizeof(*ff); rc = dt_xattr_set(env, dt_obj, &info->fti_buf, XATTR_NAME_FID, 0, th); - if (!rc) + if (rc == 0) filter_fid_le_to_cpu(&ofd_obj->ofo_ff, ff, sizeof(*ff)); } @@ -1012,7 +1069,7 @@ static int ofd_soft_sync_cb_add(struct thandle *th, struct obd_export *exp) static int ofd_commitrw_write(const struct lu_env *env, struct obd_export *exp, struct ofd_device *ofd, const struct lu_fid *fid, - struct lu_attr *la, struct filter_fid *ff, int objcount, + struct lu_attr *la, struct obdo *oa, int objcount, int niocount, struct niobuf_local *lnb, unsigned long granted, int old_rc) { @@ -1048,7 +1105,7 @@ ofd_commitrw_write(const struct lu_env *env, struct obd_export *exp, * dt_declare_write_commit() since quota enforcement is now handled in * declare phases. 
*/ - rc = ofd_write_attr_set(env, ofd, fo, la, ff); + rc = ofd_write_attr_set(env, ofd, fo, la, oa); if (rc) GOTO(out, rc); @@ -1203,7 +1260,6 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp, struct ofd_mod_data *fmd; __u64 valid; struct ofd_device *ofd = ofd_exp(exp); - struct filter_fid *ff = NULL; const struct lu_fid *fid = &oa->o_oi.oi_fid; int rc = 0; @@ -1227,13 +1283,8 @@ int ofd_commitrw(const struct lu_env *env, int cmd, struct obd_export *exp, ofd_fmd_put(exp, fmd); la_from_obdo(&info->fti_attr, oa, valid); - if (oa->o_valid & OBD_MD_FLFID) { - ff = &info->fti_mds_fid; - ofd_prepare_fidea(ff, oa); - } - rc = ofd_commitrw_write(env, exp, ofd, fid, &info->fti_attr, - ff, objcount, npages, lnb, + oa, objcount, npages, lnb, oa->o_grant_used, old_rc); if (rc == 0) obdo_from_la(oa, &info->fti_attr, diff --git a/lustre/ofd/ofd_obd.c b/lustre/ofd/ofd_obd.c index bb4e1be..0a251f3 100644 --- a/lustre/ofd/ofd_obd.c +++ b/lustre/ofd/ofd_obd.c @@ -817,7 +817,6 @@ static int ofd_echo_setattr(const struct lu_env *env, struct obd_export *exp, struct ldlm_resource *res; struct ofd_object *fo; struct lu_fid *fid = &oa->o_oi.oi_fid; - struct filter_fid *ff = NULL; int rc = 0; ENTRY; @@ -854,13 +853,8 @@ static int ofd_echo_setattr(const struct lu_env *env, struct obd_export *exp, la_from_obdo(&info->fti_attr, oa, oa->o_valid); info->fti_attr.la_valid &= ~LA_TYPE; - if (oa->o_valid & OBD_MD_FLFID) { - ff = &info->fti_mds_fid; - ofd_prepare_fidea(ff, oa); - } - /* setting objects attributes (including owner/group) */ - rc = ofd_attr_set(env, fo, &info->fti_attr, ff); + rc = ofd_attr_set(env, fo, &info->fti_attr, oa); if (rc) GOTO(out_unlock, rc); diff --git a/lustre/ofd/ofd_objects.c b/lustre/ofd/ofd_objects.c index 9f76081..ec5fb4f 100644 --- a/lustre/ofd/ofd_objects.c +++ b/lustre/ofd/ofd_objects.c @@ -152,8 +152,7 @@ int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo) if (unlikely(rc < sizeof(struct lu_fid))) { 
fid_zero(&ff->ff_parent); - - return -ENODATA; + return -EINVAL; } filter_fid_le_to_cpu(ff, ff, rc); @@ -474,6 +473,91 @@ int ofd_attr_handle_id(const struct lu_env *env, struct ofd_object *fo, } /** + * Check if it needs to update filter_fid by the value of @oa. + * + * \param[in] env env + * \param[in] fo ofd object + * \param[in] oa obdo from client or MDT + * \param[out] ff if filter_fid needs updating, this field is used to + * return the new buffer + * + * \retval < 0 error occurred + * \retval 0 doesn't need to update filter_fid + * \retval FL_XATTR_{CREATE,REPLACE} flag for xattr update + */ +int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo, + const struct obdo *oa, struct filter_fid *ff) +{ + int rc = 0; + ENTRY; + + if (!(oa->o_valid & + (OBD_MD_FLFID | OBD_MD_FLOSTLAYOUT | OBD_MD_LAYOUT_VERSION))) + RETURN(0); + + rc = ofd_object_ff_load(env, fo); + if (rc < 0 && rc != -ENODATA) + RETURN(rc); + + LASSERT(ff != &fo->ofo_ff); + if (rc == -ENODATA) { + rc = LU_XATTR_CREATE; + memset(ff, 0, sizeof(*ff)); + } else { + rc = LU_XATTR_REPLACE; + memcpy(ff, &fo->ofo_ff, sizeof(*ff)); + } + + if (oa->o_valid & OBD_MD_FLFID) { + /* packing fid and converting it to LE for storing into EA. + * Here ->o_stripe_idx should be filled by LOV and rest of + * fields - by client. */ + ff->ff_parent.f_seq = oa->o_parent_seq; + ff->ff_parent.f_oid = oa->o_parent_oid; + /* XXX: we are ignoring o_parent_ver here, since this should + * be the same for all objects in this fileset. 
*/ + ff->ff_parent.f_ver = oa->o_stripe_idx; + } + if (oa->o_valid & OBD_MD_FLOSTLAYOUT) + ff->ff_layout = oa->o_layout; + + if (oa->o_valid & OBD_MD_LAYOUT_VERSION) { + CDEBUG(D_INODE, DFID": OST("DFID") layout version %u -> %u\n", + PFID(&fo->ofo_ff.ff_parent), + PFID(lu_object_fid(&fo->ofo_obj.do_lu)), + ff->ff_layout_version, oa->o_layout_version); + + /* only the MDS has the authority to update layout version */ + if (!(exp_connect_flags(ofd_info(env)->fti_exp) & + OBD_CONNECT_MDS)) { + CERROR(DFID": update layout version from client\n", + PFID(&fo->ofo_ff.ff_parent)); + + RETURN(-EPERM); + } + + /* it's not allowed to change it to a smaller value */ + if (oa->o_layout_version < ff->ff_layout_version) + RETURN(-EINVAL); + + if (ff->ff_layout_version == 0) { + ff->ff_layout_version = oa->o_layout_version; + ff->ff_range = 0; + } else if (oa->o_layout_version > ff->ff_layout_version) { + ff->ff_range = MAX(ff->ff_range, + oa->o_layout_version - ff->ff_layout_version); + } + } + + if (memcmp(ff, &fo->ofo_ff, sizeof(*ff))) + filter_fid_cpu_to_le(ff, ff, sizeof(*ff)); + else /* no change */ + rc = 0; + + RETURN(rc); +} + +/** * Set OFD object attributes. * * This function sets OFD object attributes taken from incoming request. 
@@ -484,19 +568,20 @@ int ofd_attr_handle_id(const struct lu_env *env, struct ofd_object *fo, * \param[in] env execution environment * \param[in] fo OFD object * \param[in] la object attributes - * \param[in] ff filter_fid structure, contains additional attributes + * \param[in] oa obdo carries fid, ost_layout, layout version * * \retval 0 if successful * \retval negative value on error */ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, - struct lu_attr *la, struct filter_fid *ff) + struct lu_attr *la, struct obdo *oa) { struct ofd_thread_info *info = ofd_info(env); struct ofd_device *ofd = ofd_obj2dev(fo); + struct filter_fid *ff = &info->fti_mds_fid; struct thandle *th; struct ofd_mod_data *fmd; - int ff_needed = 0; + int fl; int rc; int rc2; ENTRY; @@ -521,13 +606,9 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, if (rc != 0) GOTO(unlock, rc); - if (ff != NULL) { - rc = ofd_object_ff_load(env, fo); - if (rc == -ENODATA) - ff_needed = 1; - else if (rc < 0) - GOTO(unlock, rc); - } + fl = ofd_object_ff_update(env, fo, oa, ff); + if (fl < 0) + GOTO(unlock, rc = fl); th = ofd_trans_create(env, ofd); if (IS_ERR(th)) @@ -537,7 +618,7 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); - if (ff_needed) { + if (fl) { if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1)) ff->ff_parent.f_oid = cpu_to_le32(1UL << 31); else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2)) @@ -546,7 +627,7 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, info->fti_buf.lb_buf = ff; info->fti_buf.lb_len = sizeof(*ff); rc = dt_declare_xattr_set(env, ofd_object_child(fo), - &info->fti_buf, XATTR_NAME_FID, 0, + &info->fti_buf, XATTR_NAME_FID, fl, th); if (rc) GOTO(stop, rc); @@ -560,12 +641,14 @@ int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); - if (ff_needed) { + if (fl) { if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID)) GOTO(stop, rc); + info->fti_buf.lb_buf = 
ff; + info->fti_buf.lb_len = sizeof(*ff); rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf, - XATTR_NAME_FID, 0, th); + XATTR_NAME_FID, fl, th); if (!rc) filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff)); } @@ -599,7 +682,6 @@ unlock: * \param[in] start start offset to punch from * \param[in] end end of punch * \param[in] la object attributes - * \param[in] ff filter_fid structure * \param[in] oa obdo struct from incoming request * * \retval 0 if successful @@ -607,14 +689,15 @@ unlock: */ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, __u64 start, __u64 end, struct lu_attr *la, - struct filter_fid *ff, struct obdo *oa) + struct obdo *oa) { struct ofd_thread_info *info = ofd_info(env); struct ofd_device *ofd = ofd_obj2dev(fo); struct ofd_mod_data *fmd; struct dt_object *dob = ofd_object_child(fo); + struct filter_fid *ff = &info->fti_mds_fid; struct thandle *th; - int ff_needed = 0; + int fl; int rc; int rc2; @@ -638,6 +721,15 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, GOTO(unlock, rc); } + /* need to verify layout version */ + if (oa->o_valid & OBD_MD_LAYOUT_VERSION) { + rc = ofd_verify_layout_version(env, fo, oa); + if (rc) + GOTO(unlock, rc); + + oa->o_valid &= ~OBD_MD_LAYOUT_VERSION; + } + /* VBR: version recovery check */ rc = ofd_version_get_check(info, fo); if (rc) @@ -647,13 +739,9 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, if (rc != 0) GOTO(unlock, rc); - if (ff != NULL) { - rc = ofd_object_ff_load(env, fo); - if (rc == -ENODATA) - ff_needed = 1; - else if (rc < 0) - GOTO(unlock, rc); - } + fl = ofd_object_ff_update(env, fo, oa, ff); + if (fl < 0) + GOTO(unlock, rc = fl); th = ofd_trans_create(env, ofd); if (IS_ERR(th)) @@ -667,7 +755,7 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); - if (ff_needed) { + if (fl) { if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1)) ff->ff_parent.f_oid = cpu_to_le32(1UL << 31); else 
if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2)) @@ -676,7 +764,7 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, info->fti_buf.lb_buf = ff; info->fti_buf.lb_len = sizeof(*ff); rc = dt_declare_xattr_set(env, ofd_object_child(fo), - &info->fti_buf, XATTR_NAME_FID, 0, + &info->fti_buf, XATTR_NAME_FID, fl, th); if (rc) GOTO(stop, rc); @@ -694,12 +782,12 @@ int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo, if (rc) GOTO(stop, rc); - if (ff_needed) { + if (fl) { if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID)) GOTO(stop, rc); rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf, - XATTR_NAME_FID, 0, th); + XATTR_NAME_FID, fl, th); if (!rc) filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff)); } diff --git a/lustre/osc/osc_cache.c b/lustre/osc/osc_cache.c index 101e666..f94f053 100644 --- a/lustre/osc/osc_cache.c +++ b/lustre/osc/osc_cache.c @@ -2533,6 +2533,9 @@ int osc_queue_async_io(const struct lu_env *env, struct cl_io *io, ++ext->oe_nr_pages; list_add_tail(&oap->oap_pending_item, &ext->oe_pages); osc_object_unlock(osc); + + if (!ext->oe_layout_version) + ext->oe_layout_version = io->ci_layout_version; } RETURN(rc); @@ -2720,8 +2723,9 @@ int osc_cancel_async_page(const struct lu_env *env, struct osc_page *ops) RETURN(rc); } -int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj, - struct list_head *list, int brw_flags) +int osc_queue_sync_pages(const struct lu_env *env, const struct cl_io *io, + struct osc_object *obj, struct list_head *list, + int brw_flags) { struct client_obd *cli = osc_cli(obj); struct osc_extent *ext; @@ -2771,6 +2775,7 @@ int osc_queue_sync_pages(const struct lu_env *env, struct osc_object *obj, ext->oe_nr_pages = page_count; ext->oe_mppr = mppr; list_splice_init(list, &ext->oe_pages); + ext->oe_layout_version = io->ci_layout_version; osc_object_lock(obj); /* Reuse the initial refcount for RPC, don't drop it */ diff --git a/lustre/osc/osc_io.c b/lustre/osc/osc_io.c index 
6650f0a..5ccff72 100644 --- a/lustre/osc/osc_io.c +++ b/lustre/osc/osc_io.c @@ -187,7 +187,7 @@ int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios, if (++queued == max_pages) { queued = 0; - result = osc_queue_sync_pages(env, osc, &list, + result = osc_queue_sync_pages(env, io, osc, &list, brw_flags); if (result < 0) break; @@ -195,7 +195,7 @@ int osc_io_submit(const struct lu_env *env, const struct cl_io_slice *ios, } if (queued > 0) - result = osc_queue_sync_pages(env, osc, &list, brw_flags); + result = osc_queue_sync_pages(env, io, osc, &list, brw_flags); /* Update c/mtime for sync write. LU-7310 */ if (crt == CRT_WRITE && qout->pl_nr > 0 && result == 0) { @@ -556,6 +556,12 @@ static int osc_io_setattr_start(const struct lu_env *env, oa->o_flags = OBD_FL_SRVLOCK; oa->o_valid |= OBD_MD_FLFLAGS; } + + if (io->ci_layout_version > 0) { + /* verify layout version */ + oa->o_valid |= OBD_MD_LAYOUT_VERSION; + oa->o_layout_version = io->ci_layout_version; + } } else { LASSERT(oio->oi_lockless == 0); } diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 9cb2c6d..6d7929d 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -1932,6 +1932,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, int i; int grant = 0; int rc; + __u32 layout_version = 0; struct list_head rpc_list = LIST_HEAD_INIT(rpc_list); struct ost_body *body; ENTRY; @@ -1943,6 +1944,7 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, mem_tight |= ext->oe_memalloc; grant += ext->oe_grants; page_count += ext->oe_nr_pages; + layout_version = MAX(layout_version, ext->oe_layout_version); if (obj == NULL) obj = ext->oe_obj; } @@ -2000,8 +2002,16 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli, crattr->cra_oa = oa; cl_req_attr_set(env, osc2cl(obj), crattr); - if (cmd == OBD_BRW_WRITE) + if (cmd == OBD_BRW_WRITE) { oa->o_grant_used = grant; + if (layout_version > 0) { + CDEBUG(D_LAYOUT, DFID": 
write with layout version %u\n", + PFID(&oa->o_oi.oi_fid), layout_version); + + oa->o_layout_version = layout_version; + oa->o_valid |= OBD_MD_LAYOUT_VERSION; + } + } sort_brw_pages(pga, page_count); rc = osc_brw_prep_request(cmd, cli, oa, page_count, pga, &req, 0); diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c index 446f55a..d67de46 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -667,10 +667,10 @@ static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } - if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID))) + if (!(attr->la_valid & LA_REMOTE_ATTR_SET)) RETURN(0); - /* track all UID/GID changes via llog */ + /* track all UID/GID, projid, and layout version changes via llog */ rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th); return 0; @@ -704,8 +704,8 @@ static int osp_attr_set(const struct lu_env *env, struct dt_object *dt, int rc = 0; ENTRY; - /* we're interested in uid/gid/projid changes only */ - if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID))) + /* we're interested in uid/gid/projid/layout version changes only */ + if (!(attr->la_valid & LA_REMOTE_ATTR_SET)) RETURN(0); if (!is_only_remote_trans(th)) { diff --git a/lustre/osp/osp_sync.c b/lustre/osp/osp_sync.c index 80c11c2..47ec49c 100644 --- a/lustre/osp/osp_sync.c +++ b/lustre/osp/osp_sync.c @@ -427,11 +427,14 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d, LASSERT(attr); osi->osi_setattr.lsr_uid = attr->la_uid; osi->osi_setattr.lsr_gid = attr->la_gid; + osi->osi_setattr.lsr_layout_version = attr->la_layout_version; osi->osi_setattr.lsr_projid = attr->la_projid; osi->osi_setattr.lsr_valid = ((attr->la_valid & LA_UID) ? OBD_MD_FLUID : 0) | ((attr->la_valid & LA_GID) ? OBD_MD_FLGID : 0) | ((attr->la_valid & LA_PROJID) ? 
OBD_MD_FLPROJID : 0); + if (attr->la_valid & LA_LAYOUT_VERSION) + osi->osi_setattr.lsr_valid |= OBD_MD_LAYOUT_VERSION; break; default: LBUG(); @@ -745,7 +748,7 @@ static int osp_sync_new_setattr_job(struct osp_device *d, /* lsr_valid can only be 0 or HAVE OBD_MD_{FLUID, FLGID, FLPROJID} set, * so no bits other than these should be set. */ if ((rec->lsr_valid & ~(OBD_MD_FLUID | OBD_MD_FLGID | - OBD_MD_FLPROJID)) != 0) { + OBD_MD_FLPROJID | OBD_MD_LAYOUT_VERSION)) != 0) { CERROR("%s: invalid setattr record, lsr_valid:%llu\n", d->opd_obd->obd_name, rec->lsr_valid); /* return 1 on invalid record */ @@ -762,9 +765,11 @@ static int osp_sync_new_setattr_job(struct osp_device *d, body->oa.o_uid = rec->lsr_uid; body->oa.o_gid = rec->lsr_gid; body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID; - if (h->lrh_len > sizeof(struct llog_setattr64_rec)) - body->oa.o_projid = ((struct llog_setattr64_rec_v2 *) - rec)->lsr_projid; + if (h->lrh_len > sizeof(struct llog_setattr64_rec)) { + struct llog_setattr64_rec_v2 *rec_v2 = (typeof(rec_v2))rec; + body->oa.o_projid = rec_v2->lsr_projid; + body->oa.o_layout_version = rec_v2->lsr_layout_version; + } /* old setattr record (prior 2.6.0) doesn't have 'valid' stored, * we assume that both UID and GID are valid in that case. 
*/ @@ -773,6 +778,12 @@ static int osp_sync_new_setattr_job(struct osp_device *d, else body->oa.o_valid |= rec->lsr_valid; + if (body->oa.o_valid & OBD_MD_LAYOUT_VERSION) { + OBD_FAIL_TIMEOUT(OBD_FAIL_FLR_LV_DELAY, cfs_fail_val); + if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_FLR_LV_INC))) + ++body->oa.o_layout_version; + } + osp_sync_send_new_rpc(d, llh, h, req); RETURN(0); } diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 2f07e36..50b2d6b 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -1728,7 +1728,7 @@ void lustre_swab_obdo (struct obdo *o) __swab32s(&o->o_stripe_idx); __swab32s(&o->o_parent_ver); lustre_swab_ost_layout(&o->o_layout); - CLASSERT(offsetof(typeof(*o), o_padding_3) != 0); + __swab32s(&o->o_layout_version); __swab32s(&o->o_uid_h); __swab32s(&o->o_gid_h); __swab64s(&o->o_data_version); diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index ec8b4d8..22c0912 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -1434,10 +1434,10 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct obdo, o_layout)); LASSERTF((int)sizeof(((struct obdo *)0)->o_layout) == 28, "found %lld\n", (long long)(int)sizeof(((struct obdo *)0)->o_layout)); - LASSERTF((int)offsetof(struct obdo, o_padding_3) == 164, "found %lld\n", - (long long)(int)offsetof(struct obdo, o_padding_3)); - LASSERTF((int)sizeof(((struct obdo *)0)->o_padding_3) == 4, "found %lld\n", - (long long)(int)sizeof(((struct obdo *)0)->o_padding_3)); + LASSERTF((int)offsetof(struct obdo, o_layout_version) == 164, "found %lld\n", + (long long)(int)offsetof(struct obdo, o_layout_version)); + LASSERTF((int)sizeof(((struct obdo *)0)->o_layout_version) == 4, "found %lld\n", + (long long)(int)sizeof(((struct obdo *)0)->o_layout_version)); LASSERTF((int)offsetof(struct obdo, o_uid_h) == 168, "found %lld\n", (long long)(int)offsetof(struct obdo, o_uid_h)); LASSERTF((int)sizeof(((struct obdo *)0)->o_uid_h) == 4, 
"found %lld\n", diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index a2e0c10..b5fbf88 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -640,7 +640,7 @@ check_obdo(void) CHECK_MEMBER(obdo, o_parent_ver); CHECK_MEMBER(obdo, o_handle); CHECK_MEMBER(obdo, o_layout); - CHECK_MEMBER(obdo, o_padding_3); + CHECK_MEMBER(obdo, o_layout_version); CHECK_MEMBER(obdo, o_uid_h); CHECK_MEMBER(obdo, o_gid_h); CHECK_MEMBER(obdo, o_data_version); diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index 0cbe43c..c0bb3fd 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -1453,10 +1453,10 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct obdo, o_layout)); LASSERTF((int)sizeof(((struct obdo *)0)->o_layout) == 28, "found %lld\n", (long long)(int)sizeof(((struct obdo *)0)->o_layout)); - LASSERTF((int)offsetof(struct obdo, o_padding_3) == 164, "found %lld\n", - (long long)(int)offsetof(struct obdo, o_padding_3)); - LASSERTF((int)sizeof(((struct obdo *)0)->o_padding_3) == 4, "found %lld\n", - (long long)(int)sizeof(((struct obdo *)0)->o_padding_3)); + LASSERTF((int)offsetof(struct obdo, o_layout_version) == 164, "found %lld\n", + (long long)(int)offsetof(struct obdo, o_layout_version)); + LASSERTF((int)sizeof(((struct obdo *)0)->o_layout_version) == 4, "found %lld\n", + (long long)(int)sizeof(((struct obdo *)0)->o_layout_version)); LASSERTF((int)offsetof(struct obdo, o_uid_h) == 168, "found %lld\n", (long long)(int)offsetof(struct obdo, o_uid_h)); LASSERTF((int)sizeof(((struct obdo *)0)->o_uid_h) == 4, "found %lld\n", -- 1.8.3.1