From e584b8fa4150662876de8c195e07253e30ed2cdd Mon Sep 17 00:00:00 2001 From: Bobi Jam Date: Thu, 6 Apr 2017 07:58:41 +0800 Subject: [PATCH] LU-8998 clio: Client side implementation for PFL Make client layer support composite layout. Plain layout will be stored in LOV layer as a composite layout containing a single component. Reviewed-on: https://review.whamcloud.com/24850 Signed-off-by: Jinshan Xiong Signed-off-by: Bobi Jam Signed-off-by: Niu Yawei Change-Id: Ic3b85a4b10c66745e5c72ff02ea313baa0b12bb5 Reviewed-by: Lai Siyao --- lustre/include/lustre/lustre_user.h | 10 +- lustre/lov/lov_cl_internal.h | 174 ++++---- lustre/lov/lov_ea.c | 67 +-- lustre/lov/lov_internal.h | 43 +- lustre/lov/lov_io.c | 588 ++++++++++++++------------ lustre/lov/lov_lock.c | 91 +++-- lustre/lov/lov_merge.c | 11 +- lustre/lov/lov_object.c | 792 +++++++++++++++++++++--------------- lustre/lov/lov_offset.c | 71 ++-- lustre/lov/lov_page.c | 38 +- lustre/lov/lovsub_object.c | 23 +- lustre/osc/osc_lock.c | 7 +- 12 files changed, 1092 insertions(+), 823 deletions(-) diff --git a/lustre/include/lustre/lustre_user.h b/lustre/include/lustre/lustre_user.h index 4e30017..31ea745 100644 --- a/lustre/include/lustre/lustre_user.h +++ b/lustre/include/lustre/lustre_user.h @@ -419,6 +419,15 @@ struct lu_extent { __u64 e_end; }; +#define DEXT "[ %#llx , %#llx )" +#define PEXT(ext) (ext)->e_start, (ext)->e_end + +static inline bool lu_extent_is_overlapped(struct lu_extent *e1, + struct lu_extent *e2) +{ + return e1->e_start < e2->e_end && e2->e_start < e1->e_end; +} + enum lov_comp_md_entry_flags { LCME_FL_PRIMARY = 0x00000001, /* Not used */ LCME_FL_STALE = 0x00000002, /* Not used */ @@ -465,7 +474,6 @@ struct lov_comp_md_v1 { struct lov_comp_md_entry_v1 lcm_entries[0]; } __attribute__((packed)); - static inline __u32 lov_user_md_size(__u16 stripes, __u32 lmm_magic) { if (lmm_magic == LOV_USER_MAGIC_V1) diff --git a/lustre/lov/lov_cl_internal.h b/lustre/lov/lov_cl_internal.h index 4edb452..395b5f9 100644 --- a/lustre/lov/lov_cl_internal.h +++ b/lustre/lov/lov_cl_internal.h @@ -108,8 +108,8 @@ struct lov_device { */ enum lov_layout_type { LLT_EMPTY, /** empty file without body (mknod + truncate) */ - LLT_RAID0, /** striped file */ LLT_RELEASED, /** file with no objects (data in HSM) */ + LLT_COMP, /** support composite layout */ LLT_NR }; @@ -118,10 +118,10 @@ static inline char *llt2str(enum lov_layout_type llt) switch (llt) { case LLT_EMPTY: return "EMPTY"; - case LLT_RAID0: - return "RAID0"; case LLT_RELEASED: return "RELEASED"; + case LLT_COMP: + return "COMPOSITE"; case LLT_NR: LBUG(); } @@ -129,6 +129,42 @@ static inline char *llt2str(enum lov_layout_type llt) return ""; } +struct lov_layout_raid0 { + unsigned lo_nr; + /** + * When this is true, lov_object::lo_attr contains + * valid up to date attributes for a top-level + * object. This field is reset to 0 when attributes of + * any sub-object change. + */ + int lo_attr_valid; + /** + * Array of sub-objects. Allocated when top-object is + * created (lov_init_raid0()). + * + * Top-object is a strict master of its sub-objects: + * it is created before them, and outlives its + * children (this later is necessary so that basic + * functions like cl_object_top() always + * work). Top-object keeps a reference on every + * sub-object. + * + * When top-object is destroyed (lov_delete_raid0()) + * it releases its reference to a sub-object and waits + * until the latter is finally destroyed. + */ + struct lovsub_object **lo_sub; + /** + * protect lo_sub + */ + spinlock_t lo_sub_lock; + /** + * Cached object attribute, built from sub-object + * attributes. + */ + struct cl_attr lo_attr; +}; + /** * lov-specific file state. * @@ -178,47 +214,20 @@ struct lov_object { struct lov_stripe_md *lo_lsm; union lov_layout_state { - struct lov_layout_raid0 { - unsigned lo_nr; - /** - * When this is true, lov_object::lo_attr contains - * valid up to date attributes for a top-level - * object. This field is reset to 0 when attributes of - * any sub-object change. - */ - int lo_attr_valid; - /** - * Array of sub-objects. Allocated when top-object is - * created (lov_init_raid0()). - * - * Top-object is a strict master of its sub-objects: - * it is created before them, and outlives its - * children (this later is necessary so that basic - * functions like cl_object_top() always - * work). Top-object keeps a reference on every - * sub-object. - * - * When top-object is destroyed (lov_delete_raid0()) - * it releases its reference to a sub-object and waits - * until the latter is finally destroyed. - * - * May be vmalloc'd, must be freed with OBD_FREE_LARGE. - */ - struct lovsub_object **lo_sub; - /** - * protect lo_sub - */ - spinlock_t lo_sub_lock; - /** - * Cached object attribute, built from sub-object - * attributes. - */ - struct cl_attr lo_attr; - } raid0; struct lov_layout_state_empty { } empty; struct lov_layout_state_released { } released; + struct lov_layout_composite { + /** + * Current valid entry count of lo_entries. + */ + unsigned int lo_entry_count; + struct lov_layout_entry { + struct lu_extent lle_extent; + struct lov_layout_raid0 lle_raid0; + } *lo_entries; + } composite; } u; /** * Thread that acquired lov_object::lo_type_guard in an exclusive @@ -227,6 +236,12 @@ struct lov_object { struct task_struct *lo_owner; }; +#define lov_foreach_layout_entry(lov, entry) \ + for (entry = &lov->u.composite.lo_entries[0]; \ + entry < &lov->u.composite.lo_entries \ + [lov->u.composite.lo_entry_count]; \ + entry++) + /** * State lov_lock keeps for each sub-lock. */ @@ -237,7 +252,7 @@ struct lov_lock_sub { * hold resources of underlying layers */ unsigned int sub_is_enqueued:1, sub_initialized:1; - int sub_stripe; + int sub_index; }; /** @@ -253,7 +268,8 @@ struct lov_lock { struct lov_page { struct cl_page_slice lps_cl; - unsigned int lps_stripe; /* stripe index */ + /** layout_entry + stripe index, composed using lov_comp_index() */ + unsigned int lps_index; }; /* @@ -305,38 +321,33 @@ struct lov_thread_info { * State that lov_io maintains for every sub-io. */ struct lov_io_sub { - __u16 sub_stripe; /** - * environment's refcheck. - * - * \see cl_env_get() - */ - __u16 sub_refcheck; - /** - * true, iff cl_io_init() was successfully executed against - * lov_io_sub::sub_io. - */ - __u16 sub_io_initialized:1, - /** - * True, iff lov_io_sub::sub_io and lov_io_sub::sub_env weren't - * allocated, but borrowed from a per-device emergency pool. + * Linkage into a list (hanging off lov_io::lis_subios) */ - sub_borrowed:1; + struct list_head sub_list; /** * Linkage into a list (hanging off lov_io::lis_active) of all * sub-io's active for the current IO iteration. */ struct list_head sub_linkage; + unsigned int sub_subio_index; /** * sub-io for a stripe. Ideally sub-io's can be stopped and resumed * independently, with lov acting as a scheduler to maximize overall * throughput. */ - struct cl_io *sub_io; + struct cl_io sub_io; /** * environment, in which sub-io executes. */ struct lu_env *sub_env; + /** + * environment's refcheck. + * + * \see cl_env_get() + */ + __u16 sub_refcheck; + __u16 sub_reenter; }; /** @@ -364,32 +375,29 @@ struct lov_io { * starting position within a file, for the current io loop iteration * (stripe), used by ci_io_loop(). */ - loff_t lis_pos; + loff_t lis_pos; /** * end position with in a file, for the current stripe io. This is * exclusive (i.e., next offset after last byte affected by io). */ - loff_t lis_endpos; - - int lis_stripe_count; - int lis_active_subios; + loff_t lis_endpos; + int lis_nr_subios; /** * the index of ls_single_subio in ls_subios array */ int lis_single_subio_index; - struct cl_io lis_single_subio; + struct lov_io_sub lis_single_subio; /** - * size of ls_subios array, actually the highest stripe # - * May be vmalloc'd, must be freed with OBD_FREE_LARGE(). + * List of active sub-io's. Active sub-io's are under the range + * of [lis_pos, lis_endpos). */ - int lis_nr_subios; - struct lov_io_sub *lis_subs; + struct list_head lis_active; /** - * List of active sub-io's. + * All sub-io's created in this lov_io. */ - struct list_head lis_active; + struct list_head lis_subios; }; struct lov_session { @@ -422,11 +430,11 @@ int lov_io_init (const struct lu_env *env, struct cl_object *obj, int lovsub_lock_init (const struct lu_env *env, struct cl_object *obj, struct cl_lock *lock, const struct cl_io *io); -int lov_lock_init_raid0 (const struct lu_env *env, struct cl_object *obj, +int lov_lock_init_composite(const struct lu_env *env, struct cl_object *obj, struct cl_lock *lock, const struct cl_io *io); int lov_lock_init_empty (const struct lu_env *env, struct cl_object *obj, struct cl_lock *lock, const struct cl_io *io); -int lov_io_init_raid0 (const struct lu_env *env, struct cl_object *obj, +int lov_io_init_composite(const struct lu_env *env, struct cl_object *obj, struct cl_io *io); int lov_io_init_empty (const struct lu_env *env, struct cl_object *obj, struct cl_io *io); @@ -442,7 +450,7 @@ int lovsub_page_init (const struct lu_env *env, struct cl_object *ob, struct cl_page *page, pgoff_t index); int lov_page_init_empty (const struct lu_env *env, struct cl_object *obj, struct cl_page *page, pgoff_t index); -int lov_page_init_raid0 (const struct lu_env *env, struct cl_object *obj, +int lov_page_init_composite(const struct lu_env *env, struct cl_object *obj, struct cl_page *page, pgoff_t index); struct lu_object *lov_object_alloc (const struct lu_env *env, const struct lu_object_header *hdr, @@ -453,6 +461,7 @@ struct lu_object *lovsub_object_alloc(const struct lu_env *env, struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov); int lov_page_stripe(const struct cl_page *page); +int lov_lsm_entry(const struct lov_stripe_md *lsm, __u64 offset); #define lov_foreach_target(lov, var) \ for (var = 0; var < lov_targets_nr(lov); ++var) @@ -625,12 +634,21 @@ static inline struct lov_thread_info *lov_env_info(const struct lu_env *env) return info; } -static inline struct lov_layout_raid0 *lov_r0(struct lov_object *lov) +static inline struct lov_layout_raid0 *lov_r0(struct lov_object *lov, int i) { - LASSERT(lov->lo_type == LLT_RAID0); - LASSERT(lov->lo_lsm->lsm_magic == LOV_MAGIC || - lov->lo_lsm->lsm_magic == LOV_MAGIC_V3); - return &lov->u.raid0; + LASSERT(lov->lo_type == LLT_COMP); + LASSERTF(i < lov->u.composite.lo_entry_count, + "entry %d entry_count %d", i, lov->u.composite.lo_entry_count); + + return &lov->u.composite.lo_entries[i].lle_raid0; +} + +static inline struct lov_stripe_md_entry *lov_lse(struct lov_object *lov, int i) +{ + LASSERT(lov->lo_lsm != NULL); + LASSERT(i < lov->lo_lsm->lsm_entry_count); + + return lov->lo_lsm->lsm_entries[i]; } /* lov_pack.c */ diff --git a/lustre/lov/lov_ea.c b/lustre/lov/lov_ea.c index 896b403..4f8271e 100644 --- a/lustre/lov/lov_ea.c +++ b/lustre/lov/lov_ea.c @@ -308,24 +308,6 @@ out_lsme: return ERR_PTR(rc); } -static void -lsm_stripe_by_index_plain(struct lov_stripe_md *lsm, int *stripeno, - loff_t *lov_off, loff_t *swidth) -{ - if (swidth != NULL) - *swidth = (loff_t)lsm->lsm_entries[0]->lsme_stripe_size * - lsm->lsm_entries[0]->lsme_stripe_count; -} - -static void -lsm_stripe_by_offset_plain(struct lov_stripe_md *lsm, int *stripeno, - loff_t *lov_off, loff_t *swidth) -{ - if (swidth != NULL) - *swidth = (loff_t)lsm->lsm_entries[0]->lsme_stripe_size * - lsm->lsm_entries[0]->lsme_stripe_count; -} - static inline struct lov_stripe_md * lsm_unpackmd_v1(struct lov_obd *lov, void *buf, size_t buf_size) { @@ -335,8 +317,6 @@ lsm_unpackmd_v1(struct lov_obd *lov, void *buf, size_t buf_size) } const struct lsm_operations lsm_v1_ops = { - .lsm_stripe_by_index = lsm_stripe_by_index_plain, - .lsm_stripe_by_offset = lsm_stripe_by_offset_plain, .lsm_unpackmd = lsm_unpackmd_v1, }; @@ -350,8 +330,6 @@ lsm_unpackmd_v3(struct lov_obd *lov, void *buf, size_t buf_size) } const struct lsm_operations lsm_v3_ops = { - .lsm_stripe_by_index = lsm_stripe_by_index_plain, - .lsm_stripe_by_offset = lsm_stripe_by_offset_plain, .lsm_unpackmd = lsm_unpackmd_v3, }; @@ -499,19 +477,44 @@ out_lsm: } const struct lsm_operations lsm_comp_md_v1_ops = { - .lsm_stripe_by_index = lsm_stripe_by_index_plain, - .lsm_stripe_by_offset = lsm_stripe_by_offset_plain, .lsm_unpackmd = lsm_unpackmd_comp_md_v1, }; void dump_lsm(unsigned int level, const struct lov_stripe_md *lsm) { - CDEBUG(level, "lsm %p, objid "DOSTID", maxbytes %#llx, magic 0x%08X," - " stripe_size %u, stripe_count %u, refc: %d," - " layout_gen %u, pool ["LOV_POOLNAMEF"]\n", lsm, - POSTID(&lsm->lsm_oi), lsm->lsm_maxbytes, lsm->lsm_magic, - lsm->lsm_entries[0]->lsme_stripe_size, - lsm->lsm_entries[0]->lsme_stripe_count, - atomic_read(&lsm->lsm_refc), lsm->lsm_layout_gen, - lsm->lsm_entries[0]->lsme_pool_name); + int i; + + CDEBUG(level, "lsm %p, objid "DOSTID", maxbytes %#llx, magic 0x%08X, " + "refc: %d, entry: %u, layout_gen %u\n", + lsm, POSTID(&lsm->lsm_oi), lsm->lsm_maxbytes, lsm->lsm_magic, + atomic_read(&lsm->lsm_refc), lsm->lsm_entry_count, + lsm->lsm_layout_gen); + + for (i = 0; i < lsm->lsm_entry_count; i++) { + struct lov_stripe_md_entry *lse = lsm->lsm_entries[i]; + + CDEBUG(level, + DEXT ": id: %u, magic 0x%08X, stripe count %u, " + "size %u, layout_gen %u, pool: ["LOV_POOLNAMEF"]\n", + PEXT(&lse->lsme_extent), lse->lsme_id, lse->lsme_magic, + lse->lsme_stripe_count, lse->lsme_stripe_size, + lse->lsme_layout_gen, lse->lsme_pool_name); + } +} + +int lov_lsm_entry(const struct lov_stripe_md *lsm, __u64 offset) +{ + int i; + + for (i = 0; i < lsm->lsm_entry_count; i++) { + struct lov_stripe_md_entry *lse = lsm->lsm_entries[i]; + + if ((offset >= lse->lsme_extent.e_start && + offset < lse->lsme_extent.e_end) || + (offset == OBD_OBJECT_EOF && + lse->lsme_extent.e_end == OBD_OBJECT_EOF)) + return i; + } + + return -1; } diff --git a/lustre/lov/lov_internal.h b/lustre/lov/lov_internal.h index 376c7eb..c4db325 100644 --- a/lustre/lov/lov_internal.h +++ b/lustre/lov/lov_internal.h @@ -74,11 +74,25 @@ static inline bool lsm_has_objects(struct lov_stripe_md *lsm) return lsm != NULL && !lsm->lsm_is_released; } +static inline unsigned int lov_comp_index(int entry, int stripe) +{ + LASSERT(entry >= 0 && entry <= SHRT_MAX); + LASSERT(stripe >= 0 && stripe < USHRT_MAX); + + return entry << 16 | stripe; +} + +static inline int lov_comp_stripe(int index) +{ + return index & 0xffff; +} + +static inline int lov_comp_entry(int index) +{ + return index >> 16; +} + struct lsm_operations { - void (*lsm_stripe_by_index)(struct lov_stripe_md *, int *, loff_t *, - loff_t *); - void (*lsm_stripe_by_offset)(struct lov_stripe_md *, int *, loff_t *, - loff_t *); struct lov_stripe_md *(*lsm_unpackmd)(struct lov_obd *, void *, size_t); }; @@ -172,20 +186,21 @@ extern struct lu_kmem_descr lov_caches[]; (char *)((lv)->lov_tgts[index]->ltd_uuid.uuid) /* lov_merge.c */ -int lov_merge_lvb_kms(struct lov_stripe_md *lsm, +int lov_merge_lvb_kms(struct lov_stripe_md *lsm, int index, struct ost_lvb *lvb, __u64 *kms_place); /* lov_offset.c */ -u64 lov_stripe_size(struct lov_stripe_md *lsm, u64 ost_size, int stripeno); -int lov_stripe_offset(struct lov_stripe_md *lsm, loff_t lov_off, int stripeno, - loff_t *obd_off); -loff_t lov_size_to_stripe(struct lov_stripe_md *lsm, u64 file_size, +u64 lov_stripe_size(struct lov_stripe_md *lsm, int index, + u64 ost_size, int stripeno); +int lov_stripe_offset(struct lov_stripe_md *lsm, int index, loff_t lov_off, + int stripeno, loff_t *obd_off); +loff_t lov_size_to_stripe(struct lov_stripe_md *lsm, int index, u64 file_size, int stripeno); -int lov_stripe_intersects(struct lov_stripe_md *lsm, int stripeno, - u64 start, u64 end, u64 *obd_start, u64 *obd_end); -int lov_stripe_number(struct lov_stripe_md *lsm, loff_t lov_off); -pgoff_t lov_stripe_pgoff(struct lov_stripe_md *lsm, pgoff_t stripe_index, - int stripe); +int lov_stripe_intersects(struct lov_stripe_md *lsm, int index, int stripeno, + struct lu_extent *ext, u64 *obd_start, u64 *obd_end); +int lov_stripe_number(struct lov_stripe_md *lsm, int index, loff_t lov_off); +pgoff_t lov_stripe_pgoff(struct lov_stripe_md *lsm, int index, + pgoff_t stripe_index, int stripe); /* lov_request.c */ int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo, diff --git a/lustre/lov/lov_io.c b/lustre/lov/lov_io.c index 00f6ae0..b8c529f 100644 --- a/lustre/lov/lov_io.c +++ b/lustre/lov/lov_io.c @@ -43,179 +43,125 @@ * @{ */ -static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio, - struct lov_io_sub *sub) +static inline struct lov_io_sub *lov_sub_alloc(struct lov_io *lio, int index) { - ENTRY; - if (sub->sub_io != NULL) { - if (sub->sub_io_initialized) { - cl_io_fini(sub->sub_env, sub->sub_io); - sub->sub_io_initialized = 0; - lio->lis_active_subios--; - } - if (sub->sub_stripe == lio->lis_single_subio_index) - lio->lis_single_subio_index = -1; - else if (!sub->sub_borrowed) - OBD_FREE_PTR(sub->sub_io); - sub->sub_io = NULL; - } - if (sub->sub_env != NULL && !IS_ERR(sub->sub_env)) { - if (!sub->sub_borrowed) - cl_env_put(sub->sub_env, &sub->sub_refcheck); - sub->sub_env = NULL; - } - EXIT; + struct lov_io_sub *sub; + + if (lio->lis_nr_subios == 0) { + LASSERT(lio->lis_single_subio_index == -1); + sub = &lio->lis_single_subio; + lio->lis_single_subio_index = index; + memset(sub, 0, sizeof(*sub)); + } else { + OBD_ALLOC_PTR(sub); + } + + if (sub != NULL) { + INIT_LIST_HEAD(&sub->sub_list); + INIT_LIST_HEAD(&sub->sub_linkage); + sub->sub_subio_index = index; + } + + return sub; } -static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio, - int stripe, loff_t start, loff_t end) +static inline void lov_sub_free(struct lov_io *lio, struct lov_io_sub *sub) { - struct lov_stripe_md *lsm = lio->lis_object->lo_lsm; - struct cl_io *parent = lio->lis_cl.cis_io; + if (sub->sub_subio_index == lio->lis_single_subio_index) { + LASSERT(sub == &lio->lis_single_subio); + lio->lis_single_subio_index = -1; + } else { + OBD_FREE_PTR(sub); + } +} - switch (io->ci_type) { - case CIT_SETATTR: { - io->u.ci_setattr.sa_attr = parent->u.ci_setattr.sa_attr; - io->u.ci_setattr.sa_attr_flags = - parent->u.ci_setattr.sa_attr_flags; - io->u.ci_setattr.sa_valid = parent->u.ci_setattr.sa_valid; - io->u.ci_setattr.sa_stripe_index = stripe; - io->u.ci_setattr.sa_parent_fid = - parent->u.ci_setattr.sa_parent_fid; - if (cl_io_is_trunc(io)) { - loff_t new_size = parent->u.ci_setattr.sa_attr.lvb_size; +static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio, + struct lov_io_sub *sub) +{ + ENTRY; - new_size = lov_size_to_stripe(lsm, new_size, stripe); - io->u.ci_setattr.sa_attr.lvb_size = new_size; - } - break; - } - case CIT_DATA_VERSION: { - io->u.ci_data_version.dv_data_version = 0; - io->u.ci_data_version.dv_flags = - parent->u.ci_data_version.dv_flags; - break; - } - case CIT_FAULT: { - struct cl_object *obj = parent->ci_obj; - loff_t off = cl_offset(obj, parent->u.ci_fault.ft_index); + cl_io_fini(sub->sub_env, &sub->sub_io); - io->u.ci_fault = parent->u.ci_fault; - off = lov_size_to_stripe(lsm, off, stripe); - io->u.ci_fault.ft_index = cl_index(obj, off); - break; - } - case CIT_FSYNC: { - io->u.ci_fsync.fi_start = start; - io->u.ci_fsync.fi_end = end; - io->u.ci_fsync.fi_fid = parent->u.ci_fsync.fi_fid; - io->u.ci_fsync.fi_mode = parent->u.ci_fsync.fi_mode; - break; - } - case CIT_READ: - case CIT_WRITE: { - io->u.ci_wr.wr_sync = cl_io_is_sync_write(parent); - if (cl_io_is_append(parent)) { - io->u.ci_wr.wr_append = 1; - } else { - io->u.ci_rw.crw_pos = start; - io->u.ci_rw.crw_count = end - start; - } - break; - } - case CIT_LADVISE: { - io->u.ci_ladvise.li_start = start; - io->u.ci_ladvise.li_end = end; - io->u.ci_ladvise.li_fid = parent->u.ci_ladvise.li_fid; - io->u.ci_ladvise.li_advice = parent->u.ci_ladvise.li_advice; - io->u.ci_ladvise.li_flags = parent->u.ci_ladvise.li_flags; - break; - } - default: - break; + if (sub->sub_env != NULL && !IS_ERR(sub->sub_env)) { + cl_env_put(sub->sub_env, &sub->sub_refcheck); + sub->sub_env = NULL; } + EXIT; } static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio, - struct lov_io_sub *sub) + struct lov_io_sub *sub) { struct lov_object *lov = lio->lis_object; struct cl_io *sub_io; struct cl_object *sub_obj; struct cl_io *io = lio->lis_cl.cis_io; - int stripe = sub->sub_stripe; - int rc; - - LASSERT(sub->sub_io == NULL); - LASSERT(sub->sub_env == NULL); - LASSERT(sub->sub_stripe < lio->lis_stripe_count); - ENTRY; + int index = lov_comp_entry(sub->sub_subio_index); + int stripe = lov_comp_stripe(sub->sub_subio_index); + int result = 0; + LASSERT(sub->sub_env == NULL); + ENTRY; - if (unlikely(lov_r0(lov)->lo_sub[stripe] == NULL)) + if (unlikely(lov_r0(lov, index)->lo_sub[stripe] == NULL)) RETURN(-EIO); - sub->sub_io_initialized = 0; - sub->sub_borrowed = 0; - /* obtain new environment */ sub->sub_env = cl_env_get(&sub->sub_refcheck); if (IS_ERR(sub->sub_env)) - GOTO(fini_lov_io, rc = PTR_ERR(sub->sub_env)); - - /* - * First sub-io. Use ->lis_single_subio to - * avoid dynamic allocation. - */ - if (lio->lis_active_subios == 0) { - sub->sub_io = &lio->lis_single_subio; - lio->lis_single_subio_index = stripe; - } else { - OBD_ALLOC_PTR(sub->sub_io); - if (sub->sub_io == NULL) - GOTO(fini_lov_io, rc = -ENOMEM); - } + result = PTR_ERR(sub->sub_env); - sub_obj = lovsub2cl(lov_r0(lov)->lo_sub[stripe]); - sub_io = sub->sub_io; + sub_obj = lovsub2cl(lov_r0(lov, index)->lo_sub[stripe]); + sub_io = &sub->sub_io; - sub_io->ci_obj = sub_obj; + sub_io->ci_obj = sub_obj; sub_io->ci_result = 0; - sub_io->ci_parent = io; + + sub_io->ci_parent = io; sub_io->ci_lockreq = io->ci_lockreq; - sub_io->ci_type = io->ci_type; + sub_io->ci_type = io->ci_type; sub_io->ci_no_srvlock = io->ci_no_srvlock; sub_io->ci_noatime = io->ci_noatime; - rc = cl_io_sub_init(sub->sub_env, sub_io, io->ci_type, sub_obj); - if (rc >= 0) { - lio->lis_active_subios++; - sub->sub_io_initialized = 1; - rc = 0; - } -fini_lov_io: - if (rc != 0) + result = cl_io_sub_init(sub->sub_env, sub_io, io->ci_type, sub_obj); + + if (result < 0) lov_io_sub_fini(env, lio, sub); - RETURN(rc); + + RETURN(result); } struct lov_io_sub *lov_sub_get(const struct lu_env *env, - struct lov_io *lio, int stripe) + struct lov_io *lio, int index) { - int rc; - struct lov_io_sub *sub = &lio->lis_subs[stripe]; + struct lov_io_sub *sub; + int rc = 0; - LASSERT(stripe < lio->lis_stripe_count); - ENTRY; + ENTRY; - if (!sub->sub_io_initialized) { - sub->sub_stripe = stripe; - rc = lov_io_sub_init(env, lio, sub); - } else - rc = 0; + list_for_each_entry(sub, &lio->lis_subios, sub_list) { + if (sub->sub_subio_index == index) { + rc = 1; + break; + } + } + + if (rc == 0) { + sub = lov_sub_alloc(lio, index); + if (sub == NULL) + GOTO(out, rc = -ENOMEM); + rc = lov_io_sub_init(env, lio, sub); + if (rc < 0) { + lov_sub_free(lio, sub); + GOTO(out, rc); + } + + list_add_tail(&sub->sub_list, &lio->lis_subios); + lio->lis_nr_subios++; + } +out: if (rc < 0) sub = ERR_PTR(rc); - RETURN(sub); } @@ -225,7 +171,7 @@ struct lov_io_sub *lov_sub_get(const struct lu_env *env, * */ -int lov_page_stripe(const struct cl_page *page) +int lov_page_index(const struct cl_page *page) { const struct cl_page_slice *slice; ENTRY; @@ -234,35 +180,21 @@ int lov_page_stripe(const struct cl_page *page) LASSERT(slice != NULL); LASSERT(slice->cpl_obj != NULL); - RETURN(cl2lov_page(slice)->lps_stripe); + RETURN(cl2lov_page(slice)->lps_index); } static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio, struct cl_io *io) { - struct lov_stripe_md *lsm; - int result; ENTRY; LASSERT(lio->lis_object != NULL); - lsm = lio->lis_object->lo_lsm; - /* - * Need to be optimized, we can't afford to allocate a piece of memory - * when writing a page. -jay - */ - OBD_ALLOC_LARGE(lio->lis_subs, - lsm->lsm_entries[0]->lsme_stripe_count * - sizeof lio->lis_subs[0]); - if (lio->lis_subs != NULL) { - lio->lis_nr_subios = lio->lis_stripe_count; - lio->lis_single_subio_index = -1; - lio->lis_active_subios = 0; - result = 0; - } else - result = -ENOMEM; + INIT_LIST_HEAD(&lio->lis_subios); + lio->lis_single_subio_index = -1; + lio->lis_nr_subios = 0; - RETURN(result); + RETURN(0); } static int lov_io_slice_init(struct lov_io *lio, @@ -274,7 +206,6 @@ static int lov_io_slice_init(struct lov_io *lio, lio->lis_object = obj; LASSERT(obj->lo_lsm != NULL); - lio->lis_stripe_count = obj->lo_lsm->lsm_entries[0]->lsme_stripe_count; switch (io->ci_type) { case CIT_READ: @@ -344,16 +275,23 @@ static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios) { struct lov_io *lio = cl2lov_io(env, ios); struct lov_object *lov = cl2lov(ios->cis_obj); - int i; ENTRY; - if (lio->lis_subs != NULL) { - for (i = 0; i < lio->lis_nr_subios; i++) - lov_io_sub_fini(env, lio, &lio->lis_subs[i]); - OBD_FREE_LARGE(lio->lis_subs, - lio->lis_nr_subios * sizeof lio->lis_subs[0]); - lio->lis_nr_subios = 0; + + LASSERT(list_empty(&lio->lis_active)); + + while (!list_empty(&lio->lis_subios)) { + struct lov_io_sub *sub = list_entry(lio->lis_subios.next, + struct lov_io_sub, + sub_list); + + list_del_init(&sub->sub_list); + lio->lis_nr_subios--; + + lov_io_sub_fini(env, lio, sub); + lov_sub_free(lio, sub); } + LASSERT(lio->lis_nr_subios == 0); LASSERT(atomic_read(&lov->lo_active_ios) > 0); if (atomic_dec_and_test(&lov->lo_active_ios)) @@ -361,6 +299,79 @@ static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios) EXIT; } +static void lov_io_sub_inherit(struct lov_io_sub *sub, struct lov_io *lio, + loff_t start, loff_t end) +{ + struct cl_io *io = &sub->sub_io; + struct lov_stripe_md *lsm = lio->lis_object->lo_lsm; + struct cl_io *parent = lio->lis_cl.cis_io; + int index = lov_comp_entry(sub->sub_subio_index); + int stripe = lov_comp_stripe(sub->sub_subio_index); + + switch (io->ci_type) { + case CIT_SETATTR: { + io->u.ci_setattr.sa_attr = parent->u.ci_setattr.sa_attr; + io->u.ci_setattr.sa_attr_flags = + parent->u.ci_setattr.sa_attr_flags; + io->u.ci_setattr.sa_valid = parent->u.ci_setattr.sa_valid; + io->u.ci_setattr.sa_stripe_index = stripe; + io->u.ci_setattr.sa_parent_fid = + parent->u.ci_setattr.sa_parent_fid; + if (cl_io_is_trunc(io)) { + loff_t new_size = parent->u.ci_setattr.sa_attr.lvb_size; + + new_size = lov_size_to_stripe(lsm, index, new_size, + stripe); + io->u.ci_setattr.sa_attr.lvb_size = new_size; + } + break; + } + case CIT_DATA_VERSION: { + io->u.ci_data_version.dv_data_version = 0; + io->u.ci_data_version.dv_flags = + parent->u.ci_data_version.dv_flags; + break; + } + case CIT_FAULT: { + struct cl_object *obj = parent->ci_obj; + loff_t off = cl_offset(obj, parent->u.ci_fault.ft_index); + + io->u.ci_fault = parent->u.ci_fault; + off = lov_size_to_stripe(lsm, index, off, stripe); + io->u.ci_fault.ft_index = cl_index(obj, off); + break; + } + case CIT_FSYNC: { + io->u.ci_fsync.fi_start = start; + io->u.ci_fsync.fi_end = end; + io->u.ci_fsync.fi_fid = parent->u.ci_fsync.fi_fid; + io->u.ci_fsync.fi_mode = parent->u.ci_fsync.fi_mode; + break; + } + case CIT_READ: + case CIT_WRITE: { + io->u.ci_wr.wr_sync = cl_io_is_sync_write(parent); + if (cl_io_is_append(parent)) { + io->u.ci_wr.wr_append = 1; + } else { + io->u.ci_rw.crw_pos = start; + io->u.ci_rw.crw_count = end - start; + } + break; + } + case CIT_LADVISE: { + io->u.ci_ladvise.li_start = start; + io->u.ci_ladvise.li_end = end; + io->u.ci_ladvise.li_fid = parent->u.ci_ladvise.li_fid; + io->u.ci_ladvise.li_advice = parent->u.ci_ladvise.li_advice; + io->u.ci_ladvise.li_flags = parent->u.ci_ladvise.li_flags; + break; + } + default: + break; + } +} + static loff_t lov_offset_mod(loff_t val, int delta) { if (val != OBD_OBJECT_EOF) @@ -369,85 +380,123 @@ static loff_t lov_offset_mod(loff_t val, int delta) } static int lov_io_iter_init(const struct lu_env *env, - const struct cl_io_slice *ios) + const struct cl_io_slice *ios) { struct lov_io *lio = cl2lov_io(env, ios); struct lov_stripe_md *lsm = lio->lis_object->lo_lsm; struct lov_io_sub *sub; - loff_t endpos; - loff_t start; - loff_t end; - int stripe; - int rc = 0; + struct lov_layout_entry *le; + struct lu_extent ext; + int index; + int rc = 0; ENTRY; - endpos = lov_offset_mod(lio->lis_endpos, -1); - for (stripe = 0; stripe < lio->lis_stripe_count; stripe++) { - if (!lov_stripe_intersects(lsm, stripe, lio->lis_pos, - endpos, &start, &end)) - continue; - - if (unlikely(lov_r0(lio->lis_object)->lo_sub[stripe] == NULL)) { - if (ios->cis_io->ci_type == CIT_READ || - ios->cis_io->ci_type == CIT_WRITE || - ios->cis_io->ci_type == CIT_FAULT) - RETURN(-EIO); + ext.e_start = lio->lis_pos; + ext.e_end = lio->lis_endpos; + + index = 0; + lov_foreach_layout_entry(lio->lis_object, le) { + struct lov_layout_raid0 *r0 = &le->lle_raid0; + u64 start; + u64 end; + int stripe; + + index++; + if (!lu_extent_is_overlapped(&ext, &le->lle_extent)) continue; - } - end = lov_offset_mod(end, +1); - sub = lov_sub_get(env, lio, stripe); - if (IS_ERR(sub)) { - rc = PTR_ERR(sub); - break; - } + for (stripe = 0; stripe < r0->lo_nr; stripe++) { + if (!lov_stripe_intersects(lsm, index - 1, stripe, + &ext, &start, &end)) + continue; + + if (unlikely(r0->lo_sub[stripe] == NULL)) { + if (ios->cis_io->ci_type == CIT_READ || + ios->cis_io->ci_type == CIT_WRITE || + ios->cis_io->ci_type == CIT_FAULT) + RETURN(-EIO); + + continue; + } + + end = lov_offset_mod(end, 1); + sub = lov_sub_get(env, lio, + lov_comp_index(index - 1, stripe)); + if (IS_ERR(sub)) { + rc = PTR_ERR(sub); + break; + } - lov_io_sub_inherit(sub->sub_io, lio, stripe, start, end); - rc = cl_io_iter_init(sub->sub_env, sub->sub_io); - if (rc != 0) - cl_io_iter_fini(sub->sub_env, sub->sub_io); - if (rc != 0) - break; + lov_io_sub_inherit(sub, lio, start, end); + rc = cl_io_iter_init(sub->sub_env, &sub->sub_io); + if (rc != 0) + cl_io_iter_fini(sub->sub_env, &sub->sub_io); + if (rc != 0) + break; - CDEBUG(D_VFSTRACE, "shrink: %d [%llu, %llu)\n", - stripe, start, end); + CDEBUG(D_VFSTRACE, "shrink: %d [%llu, %llu)\n", + stripe, start, end); - list_add_tail(&sub->sub_linkage, &lio->lis_active); + list_add_tail(&sub->sub_linkage, &lio->lis_active); + } + if (rc != 0) + break; } RETURN(rc); } static int lov_io_rw_iter_init(const struct lu_env *env, - const struct cl_io_slice *ios) + const struct cl_io_slice *ios) { struct lov_io *lio = cl2lov_io(env, ios); struct cl_io *io = ios->cis_io; - struct lov_stripe_md *lsm = lio->lis_object->lo_lsm; + struct lov_stripe_md_entry *lse; loff_t start = io->u.ci_rw.crw_pos; loff_t next; - unsigned long ssize = lsm->lsm_entries[0]->lsme_stripe_size; + unsigned long ssize; + int index; - LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE); - ENTRY; + LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE); + ENTRY; - /* fast path for common case. */ - if (lio->lis_nr_subios != 1 && !cl_io_is_append(io)) { + if (cl_io_is_append(io)) + RETURN(lov_io_iter_init(env, ios)); - lov_do_div64(start, ssize); - next = (start + 1) * ssize; - if (next <= start * ssize) - next = ~0ull; + index = lov_lsm_entry(lio->lis_object->lo_lsm, io->u.ci_rw.crw_pos); + if (index < 0) { /* non-existing layout component */ + if (io->ci_type == CIT_READ) { + /* TODO: it needs to detect the next component and + * then set the next pos */ + io->ci_continue = 0; - io->ci_continue = next < lio->lis_io_endpos; - io->u.ci_rw.crw_count = min_t(loff_t, lio->lis_io_endpos, - next) - io->u.ci_rw.crw_pos; - lio->lis_pos = io->u.ci_rw.crw_pos; - lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count; - CDEBUG(D_VFSTRACE, "stripe: %llu chunk: [%llu, %llu) " - "%llu\n", (__u64)start, lio->lis_pos, lio->lis_endpos, - (__u64)lio->lis_io_endpos); + RETURN(lov_io_iter_init(env, ios)); + } + + RETURN(-ENODATA); } + + lse = lov_lse(lio->lis_object, index); + + ssize = lse->lsme_stripe_size; + lov_do_div64(start, ssize); + next = (start + 1) * ssize; + if (next <= start * ssize) + next = ~0ull; + + LASSERT(io->u.ci_rw.crw_pos >= lse->lsme_extent.e_start); + next = min_t(__u64, next, lse->lsme_extent.e_end); + next = min_t(loff_t, next, lio->lis_io_endpos); + + io->ci_continue = next < lio->lis_io_endpos; + io->u.ci_rw.crw_count = next - io->u.ci_rw.crw_pos; + lio->lis_pos = io->u.ci_rw.crw_pos; + lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count; + CDEBUG(D_VFSTRACE, + "stripe: %llu chunk: [%llu, %llu) %llu, %zd\n", + (__u64)start, lio->lis_pos, lio->lis_endpos, + (__u64)lio->lis_io_endpos, io->u.ci_rw.crw_count); + /* * XXX The following call should be optimized: we know, that * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe. @@ -456,20 +505,20 @@ static int lov_io_rw_iter_init(const struct lu_env *env, } static int lov_io_call(const struct lu_env *env, struct lov_io *lio, - int (*iofunc)(const struct lu_env *, struct cl_io *)) + int (*iofunc)(const struct lu_env *, struct cl_io *)) { struct cl_io *parent = lio->lis_cl.cis_io; - struct lov_io_sub *sub; - int rc = 0; + struct lov_io_sub *sub; + int rc = 0; ENTRY; list_for_each_entry(sub, &lio->lis_active, sub_linkage) { - rc = iofunc(sub->sub_env, sub->sub_io); + rc = iofunc(sub->sub_env, &sub->sub_io); if (rc) break; if (parent->ci_result == 0) - parent->ci_result = sub->sub_io->ci_result; + parent->ci_result = sub->sub_io.ci_result; } RETURN(rc); } @@ -530,13 +579,13 @@ lov_io_data_version_end(const struct lu_env *env, const struct cl_io_slice *ios) ENTRY; list_for_each_entry(sub, &lio->lis_active, sub_linkage) { - lov_io_end_wrapper(env, sub->sub_io); + lov_io_end_wrapper(env, &sub->sub_io); parent->u.ci_data_version.dv_data_version += - sub->sub_io->u.ci_data_version.dv_data_version; + sub->sub_io.u.ci_data_version.dv_data_version; if (parent->ci_result == 0) - parent->ci_result = sub->sub_io->ci_result; + parent->ci_result = sub->sub_io.ci_result; } EXIT; @@ -574,25 +623,34 @@ static int lov_io_read_ahead(const struct lu_env *env, struct lov_io *lio = cl2lov_io(env, ios); struct lov_object *loo = lio->lis_object; struct cl_object *obj = lov2cl(loo); - struct lov_layout_raid0 *r0 = lov_r0(loo); + struct lov_layout_raid0 *r0; struct lov_io_sub *sub; + loff_t offset; loff_t suboff; pgoff_t ra_end; unsigned int pps; /* pages per stripe */ int stripe; + int index; int rc; ENTRY; - stripe = lov_stripe_number(loo->lo_lsm, cl_offset(obj, start)); + offset = cl_offset(obj, start); + index = lov_lsm_entry(loo->lo_lsm, offset); + if (index < 0) + RETURN(-ENODATA); + + stripe = lov_stripe_number(loo->lo_lsm, index, offset); + + r0 = lov_r0(loo, index); if (unlikely(r0->lo_sub[stripe] == NULL)) RETURN(-EIO); - sub = lov_sub_get(env, lio, stripe); + sub = lov_sub_get(env, lio, lov_comp_index(index, stripe)); if (IS_ERR(sub)) - return PTR_ERR(sub); + RETURN(PTR_ERR(sub)); - lov_stripe_offset(loo->lo_lsm, cl_offset(obj, start), stripe, &suboff); - rc = cl_io_read_ahead(sub->sub_env, sub->sub_io, + lov_stripe_offset(loo->lo_lsm, index, offset, stripe, &suboff); + rc = cl_io_read_ahead(sub->sub_env, &sub->sub_io, cl_index(lovsub2cl(r0->lo_sub[stripe]), suboff), ra); @@ -602,8 +660,8 @@ static int lov_io_read_ahead(const struct lu_env *env, RETURN(rc); /** - * Adjust the stripe index by layout of raid0. ra->cra_end is the maximum - * page index covered by an underlying DLM lock. + * Adjust the stripe index by layout of raid0. ra->cra_end is the + * maximum page index covered by an underlying DLM lock. * This function converts cra_end from stripe level to file level, and * make sure it's not beyond stripe boundary. */ @@ -613,14 +671,14 @@ static int lov_io_read_ahead(const struct lu_env *env, /* cra_end is stripe level, convert it into file level */ ra_end = ra->cra_end; if (ra_end != CL_PAGE_EOF) - ra_end = lov_stripe_pgoff(loo->lo_lsm, ra_end, stripe); + ra_end = lov_stripe_pgoff(loo->lo_lsm, index, ra_end, stripe); - pps = loo->lo_lsm->lsm_entries[0]->lsme_stripe_size >> PAGE_SHIFT; + pps = lov_lse(loo, index)->lsme_stripe_size >> PAGE_SHIFT; - CDEBUG(D_READA, DFID " max_index = %lu, pps = %u, " + CDEBUG(D_READA, DFID " max_index = %lu, pps = %u, index = %u, " "stripe_size = %u, stripe no = %u, start index = %lu\n", - PFID(lu_object_fid(lov2lu(loo))), ra_end, pps, - loo->lo_lsm->lsm_entries[0]->lsme_stripe_size, stripe, start); + PFID(lu_object_fid(lov2lu(loo))), ra_end, pps, index, + lov_lse(loo, index)->lsme_stripe_size, stripe, start); /* never exceed the end of the stripe */ ra->cra_end = min_t(pgoff_t, ra_end, start + pps - start % pps - 1); @@ -651,24 +709,21 @@ static int lov_io_submit(const struct lu_env *env, struct lov_io_sub *sub; struct cl_page_list *plist = &lov_env_info(env)->lti_plist; struct cl_page *page; - int stripe; + int index; int rc = 0; ENTRY; - if (lio->lis_active_subios == 1) { - int idx = lio->lis_single_subio_index; + if (lio->lis_nr_subios == 1) { + int idx = lio->lis_single_subio_index; - LASSERT(idx < lio->lis_nr_subios); sub = lov_sub_get(env, lio, idx); LASSERT(!IS_ERR(sub)); - LASSERT(sub->sub_io == &lio->lis_single_subio); - rc = cl_io_submit_rw(sub->sub_env, sub->sub_io, + LASSERT(sub == &lio->lis_single_subio); + rc = cl_io_submit_rw(sub->sub_env, &sub->sub_io, crt, queue); RETURN(rc); } - LASSERT(lio->lis_subs != NULL); - cl_page_list_init(plist); while (qin->pl_nr > 0) { struct cl_2queue *cl2q = &lov_env_info(env)->lti_cl2q; @@ -678,18 +733,18 @@ static int lov_io_submit(const struct lu_env *env, page = cl_page_list_first(qin); cl_page_list_move(&cl2q->c2_qin, qin, page); - stripe = lov_page_stripe(page); + index = lov_page_index(page); while (qin->pl_nr > 0) { page = cl_page_list_first(qin); - if (stripe != lov_page_stripe(page)) + if (index != lov_page_index(page)) break; cl_page_list_move(&cl2q->c2_qin, qin, page); } - sub = lov_sub_get(env, lio, stripe); + sub = lov_sub_get(env, lio, index); if (!IS_ERR(sub)) { - rc = cl_io_submit_rw(sub->sub_env, sub->sub_io, + rc = cl_io_submit_rw(sub->sub_env, &sub->sub_io, crt, cl2q); } else { rc = PTR_ERR(sub); @@ -721,33 +776,30 @@ static int lov_io_commit_async(const struct lu_env *env, int rc = 0; ENTRY; - if (lio->lis_active_subios == 1) { + if (lio->lis_nr_subios == 1) { int idx = lio->lis_single_subio_index; - LASSERT(idx < lio->lis_nr_subios); sub = lov_sub_get(env, lio, idx); LASSERT(!IS_ERR(sub)); - LASSERT(sub->sub_io == &lio->lis_single_subio); - rc = cl_io_commit_async(sub->sub_env, sub->sub_io, queue, + LASSERT(sub == &lio->lis_single_subio); + rc = cl_io_commit_async(sub->sub_env, &sub->sub_io, queue, from, to, cb); RETURN(rc); } - LASSERT(lio->lis_subs != NULL); - cl_page_list_init(plist); while (queue->pl_nr > 0) { int stripe_to = to; - int stripe; + int index; LASSERT(plist->pl_nr == 0); page = cl_page_list_first(queue); cl_page_list_move(plist, queue, page); - stripe = lov_page_stripe(page); + index = lov_page_index(page); while (queue->pl_nr > 0) { page = cl_page_list_first(queue); - if (stripe != lov_page_stripe(page)) + if (index != lov_page_index(page)) break; cl_page_list_move(plist, queue, page); @@ -756,9 +808,9 @@ static int lov_io_commit_async(const struct lu_env *env, if (queue->pl_nr > 0) /* still has more pages */ stripe_to = PAGE_SIZE; - sub = lov_sub_get(env, lio, stripe); + sub = lov_sub_get(env, lio, index); if (!IS_ERR(sub)) { - rc = cl_io_commit_async(sub->sub_env, sub->sub_io, + rc = cl_io_commit_async(sub->sub_env, &sub->sub_io, plist, from, stripe_to, cb); } else { rc = PTR_ERR(sub); @@ -783,17 +835,19 @@ static int lov_io_commit_async(const struct lu_env *env, } static int lov_io_fault_start(const struct lu_env *env, - const struct cl_io_slice *ios) + const struct cl_io_slice *ios) { - struct cl_fault_io *fio; - struct lov_io *lio; - struct lov_io_sub *sub; + struct cl_fault_io *fio; + struct lov_io *lio; + struct lov_io_sub *sub; ENTRY; + fio = &ios->cis_io->u.ci_fault; lio = cl2lov_io(env, ios); - sub = lov_sub_get(env, lio, lov_page_stripe(fio->ft_page)); - sub->sub_io->u.ci_fault.ft_nob = fio->ft_nob; + sub = lov_sub_get(env, lio, lov_page_index(fio->ft_page)); + sub->sub_io.u.ci_fault.ft_nob = fio->ft_nob; + RETURN(lov_io_start(env, ios)); } @@ -807,7 +861,7 @@ static void lov_io_fsync_end(const struct lu_env *env, *written = 0; list_for_each_entry(sub, &lio->lis_active, sub_linkage) { - struct cl_io *subio = sub->sub_io; + struct cl_io *subio = &sub->sub_io; lov_io_end_wrapper(sub->sub_env, subio); @@ -972,8 +1026,8 @@ static const struct cl_io_operations lov_empty_io_ops = { .cio_commit_async = LOV_EMPTY_IMPOSSIBLE }; -int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj, - struct cl_io *io) +int lov_io_init_composite(const struct lu_env *env, struct cl_object *obj, + struct cl_io *io) { struct lov_io *lio = lov_env_io(env); struct lov_object *lov = cl2lov(obj); diff --git a/lustre/lov/lov_lock.c b/lustre/lov/lov_lock.c index 1194dc8..9c4855c 100644 --- a/lustre/lov/lov_lock.c +++ b/lustre/lov/lov_lock.c @@ -50,7 +50,7 @@ static struct lov_sublock_env *lov_sublock_env_get(const struct lu_env *env, const struct cl_lock *parent, - struct lov_lock_sub *lls) + struct lov_lock_sub *lls) { struct lov_sublock_env *subenv; struct lov_io *lio = lov_env_io(env); @@ -72,12 +72,12 @@ static struct lov_sublock_env *lov_sublock_env_get(const struct lu_env *env, subenv->lse_env = env; subenv->lse_io = io; } else { - sub = lov_sub_get(env, lio, lls->sub_stripe); + sub = lov_sub_get(env, lio, lls->sub_index); if (!IS_ERR(sub)) { subenv->lse_env = sub->sub_env; - subenv->lse_io = sub->sub_io; + subenv->lse_io = &sub->sub_io; } else { - subenv = (void*)sub; + subenv = (void *)sub; } } return subenv; @@ -114,53 +114,66 @@ static struct lov_lock *lov_lock_sub_init(const struct lu_env *env, const struct cl_object *obj, struct cl_lock *lock) { + struct lov_object *lov = cl2lov(obj); + struct lov_lock *lovlck; + struct lu_extent ext; + loff_t start; + loff_t end; int result = 0; int i; + int index; int nr; - loff_t start; - loff_t end; - loff_t file_start; - loff_t file_end; - - struct lov_object *loo = cl2lov(obj); - struct lov_layout_raid0 *r0 = lov_r0(loo); - struct lov_lock *lovlck; ENTRY; - CDEBUG(D_INODE, "%p: lock/io FID "DFID"/"DFID", lock/io clobj %p/%p\n", - loo, PFID(lu_object_fid(lov2lu(loo))), - PFID(lu_object_fid(&obj->co_lu)), - lov2cl(loo), obj); - - file_start = cl_offset(lov2cl(loo), lock->cll_descr.cld_start); - file_end = cl_offset(lov2cl(loo), lock->cll_descr.cld_end + 1) - 1; - - for (i = 0, nr = 0; i < r0->lo_nr; i++) { - /* - * XXX for wide striping smarter algorithm is desirable, - * breaking out of the loop, early. - */ - if (likely(r0->lo_sub[i] != NULL) && /* spare layout */ - lov_stripe_intersects(loo->lo_lsm, i, - file_start, file_end, &start, &end)) - nr++; + ext.e_start = cl_offset(obj, lock->cll_descr.cld_start); + if (lock->cll_descr.cld_end == CL_PAGE_EOF) + ext.e_end = OBD_OBJECT_EOF; + else + ext.e_end = cl_offset(obj, lock->cll_descr.cld_end + 1); + + nr = 0; + for (index = lov_lsm_entry(lov->lo_lsm, ext.e_start); + index != -1 && index < lov->lo_lsm->lsm_entry_count; index++) { + struct lov_layout_raid0 *r0 = lov_r0(lov, index); + + /* assume lsm entries are sorted. */ + if (!lu_extent_is_overlapped(&ext, + &lov_lse(lov, index)->lsme_extent)) + break; + + for (i = 0; i < r0->lo_nr; i++) { + if (likely(r0->lo_sub[i] != NULL) && /* spare layout */ + lov_stripe_intersects(lov->lo_lsm, index, i, + &ext, &start, &end)) + nr++; + } } - LASSERT(nr > 0); + if (nr == 0) + RETURN(ERR_PTR(-EINVAL)); OBD_ALLOC_LARGE(lovlck, offsetof(struct lov_lock, lls_sub[nr])); if (lovlck == NULL) RETURN(ERR_PTR(-ENOMEM)); lovlck->lls_nr = nr; - for (i = 0, nr = 0; i < r0->lo_nr; ++i) { - if (likely(r0->lo_sub[i] != NULL) && - lov_stripe_intersects(loo->lo_lsm, i, - file_start, file_end, &start, &end)) { + nr = 0; + for (index = lov_lsm_entry(lov->lo_lsm, ext.e_start); + index < lov->lo_lsm->lsm_entry_count; index++) { + struct lov_layout_raid0 *r0 = lov_r0(lov, index); + + /* assume lsm entries are sorted. */ + if (!lu_extent_is_overlapped(&ext, + &lov_lse(lov, index)->lsme_extent)) + break; + for (i = 0; i < r0->lo_nr; ++i) { struct lov_lock_sub *lls = &lovlck->lls_sub[nr]; - struct cl_lock_descr *descr; + struct cl_lock_descr *descr = &lls->sub_lock.cll_descr; - descr = &lls->sub_lock.cll_descr; + if (unlikely(r0->lo_sub[i] == NULL) || + !lov_stripe_intersects(lov->lo_lsm, index, i, + &ext, &start, &end)) + continue; LASSERT(descr->cld_obj == NULL); descr->cld_obj = lovsub2cl(r0->lo_sub[i]); @@ -170,7 +183,7 @@ static struct lov_lock *lov_lock_sub_init(const struct lu_env *env, descr->cld_gid = lock->cll_descr.cld_gid; descr->cld_enq_flags = lock->cll_descr.cld_enq_flags; - lls->sub_stripe = i; + lls->sub_index = lov_comp_index(index, i); /* initialize sub lock */ result = lov_sublock_init(env, lock, lls); @@ -308,8 +321,8 @@ static const struct cl_lock_operations lov_lock_ops = { .clo_print = lov_lock_print }; -int lov_lock_init_raid0(const struct lu_env *env, struct cl_object *obj, - struct cl_lock *lock, const struct cl_io *io) +int lov_lock_init_composite(const struct lu_env *env, struct cl_object *obj, + struct cl_lock *lock, const struct cl_io *io) { struct lov_lock *lck; int result = 0; diff --git a/lustre/lov/lov_merge.c b/lustre/lov/lov_merge.c index f13ec67..de9e429 100644 --- a/lustre/lov/lov_merge.c +++ b/lustre/lov/lov_merge.c @@ -42,9 +42,10 @@ * initializes the current atime, mtime, ctime to avoid regressing a more * uptodate time on the local client. */ -int lov_merge_lvb_kms(struct lov_stripe_md *lsm, +int lov_merge_lvb_kms(struct lov_stripe_md *lsm, int index, struct ost_lvb *lvb, __u64 *kms_place) { + struct lov_stripe_md_entry *lse = lsm->lsm_entries[index]; u64 size = 0; u64 kms = 0; u64 blocks = 0; @@ -61,8 +62,8 @@ int lov_merge_lvb_kms(struct lov_stripe_md *lsm, " a=%llu c=%llu b=%llu\n", POSTID(&lsm->lsm_oi), lvb->lvb_size, lvb->lvb_mtime, lvb->lvb_atime, lvb->lvb_ctime, lvb->lvb_blocks); - for (i = 0; i < lsm->lsm_entries[0]->lsme_stripe_count; i++) { - struct lov_oinfo *loi = lsm->lsm_entries[0]->lsme_oinfo[i]; + for (i = 0; i < lse->lsme_stripe_count; i++) { + struct lov_oinfo *loi = lse->lsme_oinfo[i]; u64 lov_size; u64 tmpsize; @@ -72,14 +73,14 @@ int lov_merge_lvb_kms(struct lov_stripe_md *lsm, } tmpsize = loi->loi_kms; - lov_size = lov_stripe_size(lsm, tmpsize, i); + lov_size = lov_stripe_size(lsm, index, tmpsize, i); if (lov_size > kms) kms = lov_size; if (loi->loi_lvb.lvb_size > tmpsize) tmpsize = loi->loi_lvb.lvb_size; - lov_size = lov_stripe_size(lsm, tmpsize, i); + lov_size = lov_stripe_size(lsm, index, tmpsize, i); if (lov_size > size) size = lov_size; /* merge blocks, mtime, atime */ diff --git a/lustre/lov/lov_object.c b/lustre/lov/lov_object.c index 9edd3e4..eb0fe68 100644 --- a/lustre/lov/lov_object.c +++ b/lustre/lov/lov_object.c @@ -109,9 +109,9 @@ static int lov_init_empty(const struct lu_env *env, struct lov_device *dev, return 0; } -static void lov_install_raid0(const struct lu_env *env, - struct lov_object *lov, - union lov_layout_state *state) +static void lov_install_composite(const struct lu_env *env, + struct lov_object *lov, + union lov_layout_state *state) { } @@ -129,13 +129,14 @@ static struct cl_object *lov_sub_find(const struct lu_env *env, } static int lov_init_sub(const struct lu_env *env, struct lov_object *lov, - struct cl_object *stripe, struct lov_layout_raid0 *r0, - int idx) + struct cl_object *subobj, struct lov_layout_raid0 *r0, + struct lov_oinfo *oinfo, int idx) { struct cl_object_header *hdr; struct cl_object_header *subhdr; struct cl_object_header *parent; - struct lov_oinfo *oinfo; + int entry = lov_comp_entry(idx); + int stripe = lov_comp_stripe(idx); int result; if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) { @@ -144,19 +145,18 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov, * freed memory. This is because osc_object is referring to * lov_oinfo of lsm_stripe_data which will be freed due to * this failure. */ - cl_object_kill(env, stripe); - cl_object_put(env, stripe); + cl_object_kill(env, subobj); + cl_object_put(env, subobj); return -EIO; } hdr = cl_object_header(lov2cl(lov)); - subhdr = cl_object_header(stripe); + subhdr = cl_object_header(subobj); - oinfo = lov->lo_lsm->lsm_entries[0]->lsme_oinfo[idx]; - CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: ostid: "DOSTID - " idx: %d gen: %d\n", - PFID(&subhdr->coh_lu.loh_fid), subhdr, idx, - PFID(&hdr->coh_lu.loh_fid), hdr, POSTID(&oinfo->loi_oi), + CDEBUG(D_INODE, DFID"@%p[%d:%d] -> "DFID"@%p: ostid: "DOSTID + " ost idx: %d gen: %d\n", + PFID(lu_object_fid(&subobj->co_lu)), subhdr, entry, stripe, + PFID(lu_object_fid(lov2lu(lov))), hdr, POSTID(&oinfo->loi_oi), oinfo->loi_ost_idx, oinfo->loi_ost_gen); /* reuse ->coh_attr_guard to protect coh_parent change */ @@ -166,10 +166,10 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov, subhdr->coh_parent = hdr; spin_unlock(&subhdr->coh_attr_guard); subhdr->coh_nesting = hdr->coh_nesting + 1; - lu_object_ref_add(&stripe->co_lu, "lov-parent", lov); - r0->lo_sub[idx] = cl2lovsub(stripe); - r0->lo_sub[idx]->lso_super = lov; - r0->lo_sub[idx]->lso_index = idx; + lu_object_ref_add(&subobj->co_lu, "lov-parent", lov); + r0->lo_sub[stripe] = cl2lovsub(subobj); + r0->lo_sub[stripe]->lso_super = lov; + r0->lo_sub[stripe]->lso_index = idx; result = 0; } else { struct lu_object *old_obj; @@ -183,18 +183,18 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov, if (old_lov->lo_layout_invalid) { /* the object's layout has already changed but isn't * refreshed */ - lu_object_unhash(env, &stripe->co_lu); + lu_object_unhash(env, &subobj->co_lu); result = -EAGAIN; } else { mask = D_ERROR; result = -EIO; } - LU_OBJECT_DEBUG(mask, env, &stripe->co_lu, + LU_OBJECT_DEBUG(mask, env, &subobj->co_lu, "stripe %d is already owned.", idx); LU_OBJECT_DEBUG(mask, env, old_obj, "owned."); LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n"); - cl_object_put(env, stripe); + cl_object_put(env, subobj); } return result; } @@ -216,94 +216,123 @@ static int lov_page_slice_fixup(struct lov_object *lov, } static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev, - struct lov_object *lov, struct lov_stripe_md *lsm, - const struct cl_object_conf *conf, - union lov_layout_state *state) + struct lov_object *lov, int index, + struct lov_layout_raid0 *r0) { - int result; - int i; + struct lov_thread_info *lti = lov_env_info(env); + struct cl_object_conf *subconf = <i->lti_stripe_conf; + struct lu_fid *ofid = <i->lti_fid; + struct cl_object *stripe; + struct lov_stripe_md_entry *lse = lov_lse(lov, index); + int result; + int psz; + int i; - struct cl_object *stripe; - struct lov_thread_info *lti = lov_env_info(env); - struct cl_object_conf *subconf = <i->lti_stripe_conf; - struct lu_fid *ofid = <i->lti_fid; - struct lov_layout_raid0 *r0 = &state->raid0; + ENTRY; - ENTRY; + spin_lock_init(&r0->lo_sub_lock); + r0->lo_nr = lse->lsme_stripe_count; + LASSERT(r0->lo_nr <= lov_targets_nr(dev)); + + OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]); + if (r0->lo_sub == NULL) + GOTO(out, result = -ENOMEM); + + psz = 0; + result = 0; + memset(subconf, 0, sizeof(*subconf)); + + /* + * Create stripe cl_objects. + */ + for (i = 0; i < r0->lo_nr; ++i) { + struct cl_device *subdev; + struct lov_oinfo *oinfo = lse->lsme_oinfo[i]; + int ost_idx = oinfo->loi_ost_idx; - if (lsm->lsm_magic != LOV_MAGIC_V1 && lsm->lsm_magic != LOV_MAGIC_V3) { - dump_lsm(D_ERROR, lsm); - LASSERTF(0, "magic mismatch, expected %d/%d, actual %d.\n", - LOV_MAGIC_V1, LOV_MAGIC_V3, lsm->lsm_magic); + if (lov_oinfo_is_dummy(oinfo)) + continue; + + result = ostid_to_fid(ofid, &oinfo->loi_oi, oinfo->loi_ost_idx); + if (result != 0) + GOTO(out, result); + + if (dev->ld_target[ost_idx] == NULL) { + CERROR("%s: OST %04x is not initialized\n", + lov2obd(dev->ld_lov)->obd_name, ost_idx); + GOTO(out, result = -EIO); + } + + subdev = lovsub2cl_dev(dev->ld_target[ost_idx]); + subconf->u.coc_oinfo = oinfo; + LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx); + /* In the function below, .hs_keycmp resolves to + * lu_obj_hop_keycmp() */ + /* coverity[overrun-buffer-val] */ + stripe = lov_sub_find(env, subdev, ofid, subconf); + if (IS_ERR(stripe)) + GOTO(out, result = PTR_ERR(stripe)); + + result = lov_init_sub(env, lov, stripe, r0, oinfo, + lov_comp_index(index, i)); + if (result == -EAGAIN) { /* try again */ + --i; + result = 0; + continue; + } + + if (result == 0) { + int sz = lov_page_slice_fixup(lov, stripe); + LASSERT(ergo(psz > 0, psz == sz)); + psz = sz; + } } + if (result == 0) + result = psz; +out: + RETURN(result); +} +static int lov_init_composite(const struct lu_env *env, struct lov_device *dev, + struct lov_object *lov, struct lov_stripe_md *lsm, + const struct cl_object_conf *conf, + union lov_layout_state *state) +{ + struct lov_layout_composite *comp = &state->composite; + unsigned int entry_count; + unsigned int psz = 0; + int result = 0; + int i; + + ENTRY; + + LASSERT(lsm->lsm_entry_count > 0); LASSERT(lov->lo_lsm == NULL); lov->lo_lsm = lsm_addref(lsm); - r0->lo_nr = lsm->lsm_entries[0]->lsme_stripe_count; - LASSERT(r0->lo_nr <= lov_targets_nr(dev)); - lov->lo_layout_invalid = true; - OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]); - if (r0->lo_sub != NULL) { - int psz = 0; + entry_count = lsm->lsm_entry_count; + comp->lo_entry_count = entry_count; - result = 0; - subconf->coc_inode = conf->coc_inode; - spin_lock_init(&r0->lo_sub_lock); - /* - * Create stripe cl_objects. - */ - for (i = 0; i < r0->lo_nr && result == 0; ++i) { - struct cl_device *subdev; - struct lov_oinfo *oinfo = - lsm->lsm_entries[0]->lsme_oinfo[i]; - int ost_idx = oinfo->loi_ost_idx; - - if (lov_oinfo_is_dummy(oinfo)) - continue; - - result = ostid_to_fid(ofid, &oinfo->loi_oi, - oinfo->loi_ost_idx); - if (result != 0) - GOTO(out, result); - - if (dev->ld_target[ost_idx] == NULL) { - CERROR("%s: OST %04x is not initialized\n", - lov2obd(dev->ld_lov)->obd_name, ost_idx); - GOTO(out, result = -EIO); - } + OBD_ALLOC(comp->lo_entries, entry_count * sizeof(*comp->lo_entries)); + if (comp->lo_entries == NULL) + RETURN(-ENOMEM); - subdev = lovsub2cl_dev(dev->ld_target[ost_idx]); - subconf->u.coc_oinfo = oinfo; - LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx); - /* In the function below, .hs_keycmp resolves to - * lu_obj_hop_keycmp() */ - /* coverity[overrun-buffer-val] */ - stripe = lov_sub_find(env, subdev, ofid, subconf); - if (!IS_ERR(stripe)) { - result = lov_init_sub(env, lov, stripe, r0, i); - if (result == -EAGAIN) { /* try again */ - --i; - result = 0; - continue; - } - } else { - result = PTR_ERR(stripe); - } + for (i = 0; i < entry_count; i++) { + struct lov_layout_entry *le = &comp->lo_entries[i]; - if (result == 0) { - int sz = lov_page_slice_fixup(lov, stripe); - LASSERT(ergo(psz > 0, psz == sz)); - psz = sz; - } - } - if (result == 0) - cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz; - } else - result = -ENOMEM; -out: - RETURN(result); + le->lle_extent = lsm->lsm_entries[i]->lsme_extent; + result = lov_init_raid0(env, dev, lov, i, &le->lle_raid0); + if (result < 0) + break; + + LASSERT(ergo(psz > 0, psz == result)); + psz = result; + } + if (psz > 0) + cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz; + + return result > 0 ? 0 : result; } static int lov_init_released(const struct lu_env *env, @@ -323,20 +352,27 @@ static int lov_init_released(const struct lu_env *env, static struct cl_object *lov_find_subobj(const struct lu_env *env, struct lov_object *lov, struct lov_stripe_md *lsm, - int stripe_idx) + int index) { struct lov_device *dev = lu2lov_dev(lov2lu(lov)->lo_dev); - struct lov_oinfo *oinfo = lsm->lsm_entries[0]->lsme_oinfo[stripe_idx]; struct lov_thread_info *lti = lov_env_info(env); struct lu_fid *ofid = <i->lti_fid; + struct lov_oinfo *oinfo; struct cl_device *subdev; + int entry = lov_comp_entry(index); + int stripe = lov_comp_stripe(index); int ost_idx; int rc; struct cl_object *result; - if (lov->lo_type != LLT_RAID0) + if (lov->lo_type != LLT_COMP) GOTO(out, result = NULL); + if (entry >= lsm->lsm_entry_count || + stripe >= lsm->lsm_entries[entry]->lsme_stripe_count) + GOTO(out, result = NULL); + + oinfo = lsm->lsm_entries[entry]->lsme_oinfo[stripe]; ost_idx = oinfo->loi_ost_idx; rc = ostid_to_fid(ofid, &oinfo->loi_oi, ost_idx); if (rc != 0) @@ -360,15 +396,14 @@ static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov, } static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov, + struct lov_layout_raid0 *r0, struct lovsub_object *los, int idx) { struct cl_object *sub; - struct lov_layout_raid0 *r0; struct lu_site *site; struct lu_site_bkt_data *bkt; wait_queue_t *waiter; - r0 = &lov->u.raid0; LASSERT(r0->lo_sub[idx] == los); sub = lovsub2cl(los); @@ -406,32 +441,45 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov, LASSERT(r0->lo_sub[idx] == NULL); } -static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov, - union lov_layout_state *state) +static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov, + struct lov_layout_raid0 *r0) { - struct lov_layout_raid0 *r0 = &state->raid0; - struct lov_stripe_md *lsm = lov->lo_lsm; - int i; - ENTRY; - dump_lsm(D_INODE, lsm); - - lov_layout_wait(env, lov); if (r0->lo_sub != NULL) { - for (i = 0; i < r0->lo_nr; ++i) { - struct lovsub_object *los = r0->lo_sub[i]; + int i; + + for (i = 0; i < r0->lo_nr; ++i) { + struct lovsub_object *los = r0->lo_sub[i]; - if (los != NULL) { + if (los != NULL) { cl_object_prune(env, &los->lso_cl); - /* - * If top-level object is to be evicted from - * the cache, so are its sub-objects. - */ - lov_subobject_kill(env, lov, los, i); + /* + * If top-level object is to be evicted from + * the cache, so are its sub-objects. + */ + lov_subobject_kill(env, lov, r0, los, i); } } } + + EXIT; +} + +static int lov_delete_composite(const struct lu_env *env, + struct lov_object *lov, + union lov_layout_state *state) +{ + struct lov_layout_entry *entry; + + ENTRY; + + dump_lsm(D_INODE, lov->lo_lsm); + + lov_layout_wait(env, lov); + lov_foreach_layout_entry(lov, entry) + lov_delete_raid0(env, lov, &entry->lle_raid0); + RETURN(0); } @@ -441,16 +489,32 @@ static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov, LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED); } -static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov, - union lov_layout_state *state) +static void lov_fini_raid0(const struct lu_env *env, + struct lov_layout_raid0 *r0) { - struct lov_layout_raid0 *r0 = &state->raid0; - ENTRY; - if (r0->lo_sub != NULL) { OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]); r0->lo_sub = NULL; } +} + +static void lov_fini_composite(const struct lu_env *env, + struct lov_object *lov, + union lov_layout_state *state) +{ + struct lov_layout_composite *comp = &state->composite; + ENTRY; + + if (comp->lo_entries != NULL) { + struct lov_layout_entry *entry; + + lov_foreach_layout_entry(lov, entry) + lov_fini_raid0(env, &entry->lle_raid0); + + OBD_FREE(comp->lo_entries, + comp->lo_entry_count * sizeof(*comp->lo_entries)); + comp->lo_entries = NULL; + } dump_lsm(D_INODE, lov->lo_lsm); lov_free_memmd(&lov->lo_lsm); @@ -475,17 +539,10 @@ static int lov_print_empty(const struct lu_env *env, void *cookie, } static int lov_print_raid0(const struct lu_env *env, void *cookie, - lu_printer_t p, const struct lu_object *o) + lu_printer_t p, struct lov_layout_raid0 *r0) { - struct lov_object *lov = lu2lov(o); - struct lov_layout_raid0 *r0 = lov_r0(lov); - struct lov_stripe_md *lsm = lov->lo_lsm; - int i; + int i; - (*p)(env, cookie, "stripes: %d, %s, lsm{%p 0x%08X %d %u %u}:\n", - r0->lo_nr, lov->lo_layout_invalid ? "invalid" : "valid", lsm, - lsm->lsm_magic, atomic_read(&lsm->lsm_refc), - lsm->lsm_entries[0]->lsme_stripe_count, lsm->lsm_layout_gen); for (i = 0; i < r0->lo_nr; ++i) { struct lu_object *sub; @@ -499,6 +556,32 @@ static int lov_print_raid0(const struct lu_env *env, void *cookie, return 0; } +static int lov_print_composite(const struct lu_env *env, void *cookie, + lu_printer_t p, const struct lu_object *o) +{ + struct lov_object *lov = lu2lov(o); + struct lov_stripe_md *lsm = lov->lo_lsm; + int i; + + (*p)(env, cookie, "entries: %d, %s, lsm{%p 0x%08X %d %u}:\n", + lsm->lsm_entry_count, + lov->lo_layout_invalid ? "invalid" : "valid", lsm, + lsm->lsm_magic, atomic_read(&lsm->lsm_refc), + lsm->lsm_layout_gen); + + for (i = 0; i < lsm->lsm_entry_count; i++) { + struct lov_stripe_md_entry *lse = lsm->lsm_entries[i]; + + (*p)(env, cookie, DEXT ": { 0x%08X, %u, %u, %u, %u }\n", + PEXT(&lse->lsme_extent), lse->lsme_magic, + lse->lsme_id, lse->lsme_layout_gen, + lse->lsme_stripe_count, lse->lsme_stripe_size); + lov_print_raid0(env, cookie, p, lov_r0(lov, i)); + } + + return 0; +} + static int lov_print_released(const struct lu_env *env, void *cookie, lu_printer_t p, const struct lu_object *o) { @@ -506,10 +589,10 @@ static int lov_print_released(const struct lu_env *env, void *cookie, struct lov_stripe_md *lsm = lov->lo_lsm; (*p)(env, cookie, - "released: %s, lsm{%p 0x%08X %d %u %u}:\n", + "released: %s, lsm{%p 0x%08X %d %u}:\n", lov->lo_layout_invalid ? "invalid" : "valid", lsm, lsm->lsm_magic, atomic_read(&lsm->lsm_refc), - lsm->lsm_entries[0]->lsme_stripe_count, lsm->lsm_layout_gen); + lsm->lsm_layout_gen); return 0; } @@ -527,63 +610,80 @@ static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj, return 0; } -static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj, - struct cl_attr *attr) +static int lov_attr_get_raid0(const struct lu_env *env, struct lov_object *lov, + unsigned int index, struct lov_layout_raid0 *r0) + +{ + struct lov_stripe_md *lsm = lov->lo_lsm; + struct ost_lvb *lvb = &lov_env_info(env)->lti_lvb; + struct cl_attr *attr = &r0->lo_attr; + __u64 kms = 0; + int result = 0; + + if (r0->lo_attr_valid) + return 0; + + memset(lvb, 0, sizeof(*lvb)); + + /* XXX: timestamps can be negative by sanity:test_39m, + * how can it be? */ + lvb->lvb_atime = LLONG_MIN; + lvb->lvb_ctime = LLONG_MIN; + lvb->lvb_mtime = LLONG_MIN; + + /* + * XXX that should be replaced with a loop over sub-objects, + * doing cl_object_attr_get() on them. But for now, let's + * reuse old lov code. + */ + + /* + * XXX take lsm spin-lock to keep lov_merge_lvb_kms() + * happy. It's not needed, because new code uses + * ->coh_attr_guard spin-lock to protect consistency of + * sub-object attributes. + */ + lov_stripe_lock(lsm); + result = lov_merge_lvb_kms(lsm, index, lvb, &kms); + lov_stripe_unlock(lsm); + if (result == 0) { + cl_lvb2attr(attr, lvb); + attr->cat_kms = kms; + r0->lo_attr_valid = 1; + } + + return result; +} + +static int lov_attr_get_composite(const struct lu_env *env, + struct cl_object *obj, + struct cl_attr *attr) { struct lov_object *lov = cl2lov(obj); - struct lov_layout_raid0 *r0 = lov_r0(lov); - struct cl_attr *lov_attr = &r0->lo_attr; + struct lov_layout_entry *entry; int result = 0; + int index = 0; - ENTRY; + ENTRY; - /* this is called w/o holding type guard mutex, so it must be inside - * an on going IO otherwise lsm may be replaced. - * LU-2117: it turns out there exists one exception. For mmaped files, - * the lock of those files may be requested in the other file's IO - * context, and this function is called in ccc_lock_state(), it will - * hit this assertion. - * Anyway, it's still okay to call attr_get w/o type guard as layout - * can't go if locks exist. */ - /* LASSERT(atomic_read(&lsm->lsm_refc) > 1); */ - - if (!r0->lo_attr_valid) { - struct lov_stripe_md *lsm = lov->lo_lsm; - struct ost_lvb *lvb = &lov_env_info(env)->lti_lvb; - __u64 kms = 0; - - memset(lvb, 0, sizeof(*lvb)); - /* XXX: timestamps can be negative by sanity:test_39m, - * how can it be? */ - lvb->lvb_atime = LLONG_MIN; - lvb->lvb_ctime = LLONG_MIN; - lvb->lvb_mtime = LLONG_MIN; + attr->cat_size = 0; + attr->cat_blocks = 0; + lov_foreach_layout_entry(lov, entry) { + struct lov_layout_raid0 *r0 = &entry->lle_raid0; + struct cl_attr *lov_attr = &r0->lo_attr; - /* - * XXX that should be replaced with a loop over sub-objects, - * doing cl_object_attr_get() on them. But for now, let's - * reuse old lov code. - */ + result = lov_attr_get_raid0(env, lov, index, r0); + if (result != 0) + break; - /* - * XXX take lsm spin-lock to keep lov_merge_lvb_kms() - * happy. It's not needed, because new code uses - * ->coh_attr_guard spin-lock to protect consistency of - * sub-object attributes. - */ - lov_stripe_lock(lsm); - result = lov_merge_lvb_kms(lsm, lvb, &kms); - lov_stripe_unlock(lsm); - if (result == 0) { - cl_lvb2attr(lov_attr, lvb); - lov_attr->cat_kms = kms; - r0->lo_attr_valid = 1; - } - } - if (result == 0) { /* merge results */ - attr->cat_blocks = lov_attr->cat_blocks; - attr->cat_size = lov_attr->cat_size; - attr->cat_kms = lov_attr->cat_kms; + index++; + + /* merge results */ + attr->cat_blocks += lov_attr->cat_blocks; + if (attr->cat_size < lov_attr->cat_size) + attr->cat_size = lov_attr->cat_size; + if (attr->cat_kms < lov_attr->cat_kms) + attr->cat_kms = lov_attr->cat_kms; if (attr->cat_atime < lov_attr->cat_atime) attr->cat_atime = lov_attr->cat_atime; if (attr->cat_ctime < lov_attr->cat_ctime) @@ -606,17 +706,6 @@ const static struct lov_layout_operations lov_dispatch[] = { .llo_io_init = lov_io_init_empty, .llo_getattr = lov_attr_get_empty, }, - [LLT_RAID0] = { - .llo_init = lov_init_raid0, - .llo_delete = lov_delete_raid0, - .llo_fini = lov_fini_raid0, - .llo_install = lov_install_raid0, - .llo_print = lov_print_raid0, - .llo_page_init = lov_page_init_raid0, - .llo_lock_init = lov_lock_init_raid0, - .llo_io_init = lov_io_init_raid0, - .llo_getattr = lov_attr_get_raid0, - }, [LLT_RELEASED] = { .llo_init = lov_init_released, .llo_delete = lov_delete_empty, @@ -627,7 +716,18 @@ const static struct lov_layout_operations lov_dispatch[] = { .llo_lock_init = lov_lock_init_empty, .llo_io_init = lov_io_init_released, .llo_getattr = lov_attr_get_empty, - } + }, + [LLT_COMP] = { + .llo_init = lov_init_composite, + .llo_delete = lov_delete_composite, + .llo_fini = lov_fini_composite, + .llo_install = lov_install_composite, + .llo_print = lov_print_composite, + .llo_page_init = lov_page_init_composite, + .llo_lock_init = lov_lock_init_composite, + .llo_io_init = lov_io_init_composite, + .llo_getattr = lov_attr_get_composite, + }, }; /** @@ -651,13 +751,15 @@ static enum lov_layout_type lov_type(struct lov_stripe_md *lsm) if (lsm == NULL) return LLT_EMPTY; - if (lsm->lsm_magic == LOV_MAGIC_COMP_V1) - return LLT_EMPTY; - if (lsm->lsm_is_released) return LLT_RELEASED; - return LLT_RAID0; + if (lsm->lsm_magic == LOV_MAGIC_V1 || + lsm->lsm_magic == LOV_MAGIC_V3 || + lsm->lsm_magic == LOV_MAGIC_COMP_V1) + return LLT_COMP; + + return LLT_EMPTY; } static inline void lov_conf_freeze(struct lov_object *lov) @@ -841,6 +943,8 @@ int lov_object_init(const struct lu_env *env, struct lu_object *obj, cconf->u.coc_layout.lb_len); if (IS_ERR(lsm)) RETURN(PTR_ERR(lsm)); + + dump_lsm(D_INODE, lsm); } /* no locking is necessary, as object is being created */ @@ -1010,41 +1114,38 @@ int lov_lock_init(const struct lu_env *env, struct cl_object *obj, * over which the mapping is spread * * \param lsm [in] striping information for the file - * \param fm_start [in] logical start of mapping - * \param fm_end [in] logical end of mapping + * \param index [in] stripe component index + * \param ext [in] logical extent of mapping * \param start_stripe [in] starting stripe of the mapping * \param stripe_count [out] the number of stripes across which to map is * returned * * \retval last_stripe return the last stripe of the mapping */ -static int fiemap_calc_last_stripe(struct lov_stripe_md *lsm, - u64 fm_start, u64 fm_end, +static int fiemap_calc_last_stripe(struct lov_stripe_md *lsm, int index, + struct lu_extent *ext, int start_stripe, int *stripe_count) { + struct lov_stripe_md_entry *lsme = lsm->lsm_entries[index]; int last_stripe; u64 obd_start; u64 obd_end; int i, j; - if (fm_end - fm_start > lsm->lsm_entries[0]->lsme_stripe_size * - lsm->lsm_entries[0]->lsme_stripe_count) { - last_stripe = (start_stripe < 1 ? - lsm->lsm_entries[0]->lsme_stripe_count - 1 : - start_stripe - 1); - *stripe_count = lsm->lsm_entries[0]->lsme_stripe_count; + if (ext->e_end - ext->e_start > + lsme->lsme_stripe_size * lsme->lsme_stripe_count) { + last_stripe = (start_stripe < 1 ? lsme->lsme_stripe_count - 1 : + start_stripe - 1); + *stripe_count = lsme->lsme_stripe_count; } else { - for (j = 0, i = start_stripe; - j < lsm->lsm_entries[0]->lsme_stripe_count; - i = (i + 1) % lsm->lsm_entries[0]->lsme_stripe_count, - j++) { - if ((lov_stripe_intersects(lsm, i, fm_start, fm_end, + for (j = 0, i = start_stripe; j < lsme->lsme_stripe_count; + i = (i + 1) % lsme->lsme_stripe_count, j++) { + if ((lov_stripe_intersects(lsm, index, i, ext, &obd_start, &obd_end)) == 0) break; } *stripe_count = j; - last_stripe = (start_stripe + j - 1) % - lsm->lsm_entries[0]->lsme_stripe_count; + last_stripe = (start_stripe + j - 1) % lsme->lsme_stripe_count; } return last_stripe; @@ -1093,15 +1194,16 @@ static void fiemap_prepare_and_copy_exts(struct fiemap *fiemap, * * \param fiemap [in] fiemap request header * \param lsm [in] striping information for the file - * \param fm_start [in] logical start of mapping - * \param fm_end [in] logical end of mapping + * \param index [in] stripe component index + * \param ext [in] logical extent of mapping * \param start_stripe [out] starting stripe will be returned in this */ static u64 fiemap_calc_fm_end_offset(struct fiemap *fiemap, struct lov_stripe_md *lsm, - u64 fm_start, u64 fm_end, + int index, struct lu_extent *ext, int *start_stripe) { + struct lov_stripe_md_entry *lsme = lsm->lsm_entries[index]; u64 local_end = fiemap->fm_extents[0].fe_logical; u64 lun_start; u64 lun_end; @@ -1114,8 +1216,8 @@ static u64 fiemap_calc_fm_end_offset(struct fiemap *fiemap, return 0; /* Find out stripe_no from ost_index saved in the fe_device */ - for (i = 0; i < lsm->lsm_entries[0]->lsme_stripe_count; i++) { - struct lov_oinfo *oinfo = lsm->lsm_entries[0]->lsme_oinfo[i]; + for (i = 0; i < lsme->lsme_stripe_count; i++) { + struct lov_oinfo *oinfo = lsme->lsme_oinfo[i]; if (lov_oinfo_is_dummy(oinfo)) continue; @@ -1131,8 +1233,8 @@ static u64 fiemap_calc_fm_end_offset(struct fiemap *fiemap, /* If we have finished mapping on previous device, shift logical * offset to start of next device */ - if (lov_stripe_intersects(lsm, stripe_no, fm_start, fm_end, - &lun_start, &lun_end) != 0 && + if (lov_stripe_intersects(lsm, index, stripe_no, ext, + &lun_start, &lun_end) != 0 && local_end < lun_end) { fm_end_offset = local_end; *start_stripe = stripe_no; @@ -1140,34 +1242,32 @@ static u64 fiemap_calc_fm_end_offset(struct fiemap *fiemap, /* This is a special value to indicate that caller should * calculate offset in next stripe. */ fm_end_offset = 0; - *start_stripe = (stripe_no + 1) % - lsm->lsm_entries[0]->lsme_stripe_count; + *start_stripe = (stripe_no + 1) % lsme->lsme_stripe_count; } return fm_end_offset; } struct fiemap_state { - struct fiemap *fs_fm; - u64 fs_start; - u64 fs_length; - u64 fs_end; - u64 fs_end_offset; - int fs_cur_extent; - int fs_cnt_need; - int fs_start_stripe; - int fs_last_stripe; - bool fs_device_done; - bool fs_finish; - bool fs_enough; + struct fiemap *fs_fm; + struct lu_extent fs_ext; + u64 fs_length; + u64 fs_end_offset; + int fs_cur_extent; + int fs_cnt_need; + int fs_start_stripe; + int fs_last_stripe; + bool fs_device_done; + bool fs_finish_stripe; + bool fs_enough; }; int fiemap_for_stripe(const struct lu_env *env, struct cl_object *obj, - struct lov_stripe_md *lsm, - struct fiemap *fiemap, size_t *buflen, - struct ll_fiemap_info_key *fmkey, int stripeno, - struct fiemap_state *fs) + struct lov_stripe_md *lsm, struct fiemap *fiemap, + size_t *buflen, struct ll_fiemap_info_key *fmkey, + int index, int stripeno, struct fiemap_state *fs) { + struct lov_stripe_md_entry *lsme = lsm->lsm_entries[index]; struct cl_object *subobj; struct lov_obd *lov = lu2lov_dev(obj->co_lu.lo_dev)->ld_lov; struct fiemap_extent *fm_ext = &fs->fs_fm->fm_extents[0]; @@ -1186,11 +1286,11 @@ int fiemap_for_stripe(const struct lu_env *env, struct cl_object *obj, fs->fs_device_done = false; /* Find out range of mapping on this stripe */ - if ((lov_stripe_intersects(lsm, stripeno, fs->fs_start, fs->fs_end, + if ((lov_stripe_intersects(lsm, index, stripeno, &fs->fs_ext, &lun_start, &obd_object_end)) == 0) return 0; - if (lov_oinfo_is_dummy(lsm->lsm_entries[0]->lsme_oinfo[stripeno])) + if (lov_oinfo_is_dummy(lsme->lsme_oinfo[stripeno])) return -EIO; /* If this is a continuation FIEMAP call and we are on @@ -1198,16 +1298,7 @@ int fiemap_for_stripe(const struct lu_env *env, struct cl_object *obj, * end_offset */ if (fs->fs_end_offset != 0 && stripeno == fs->fs_start_stripe) lun_start = fs->fs_end_offset; - - lun_end = fs->fs_length; - if (lun_end != ~0ULL) { - /* Handle fs->fs_start + fs->fs_length overflow */ - if (fs->fs_start + fs->fs_length < fs->fs_start) - fs->fs_length = ~0ULL - fs->fs_start; - lun_end = lov_size_to_stripe(lsm, fs->fs_start + fs->fs_length, - stripeno); - } - + lun_end = lov_size_to_stripe(lsm, index, fs->fs_ext.e_end, stripeno); if (lun_start == lun_end) return 0; @@ -1216,7 +1307,8 @@ int fiemap_for_stripe(const struct lu_env *env, struct cl_object *obj, len_mapped_single_call = 0; /* find lobsub object */ - subobj = lov_find_subobj(env, cl2lov(obj), lsm, stripeno); + subobj = lov_find_subobj(env, cl2lov(obj), lsm, + lov_comp_index(index, stripeno)); if (IS_ERR(subobj)) return PTR_ERR(subobj); /* If the output buffer is very large and the objects have many @@ -1233,13 +1325,17 @@ int fiemap_for_stripe(const struct lu_env *env, struct cl_object *obj, lun_start += len_mapped_single_call; fs->fs_fm->fm_length = req_fm_len - len_mapped_single_call; req_fm_len = fs->fs_fm->fm_length; + /** + * If we've collected enough extent map, we'd request 1 more, + * to see whether we coincidentally finished all available + * extent map, so that FIEMAP_EXTENT_LAST would be set. + */ fs->fs_fm->fm_extent_count = fs->fs_enough ? 1 : fs->fs_cnt_need; fs->fs_fm->fm_mapped_extents = 0; fs->fs_fm->fm_flags = fiemap->fm_flags; - ost_index = lsm->lsm_entries[0]->lsme_oinfo[stripeno]-> - loi_ost_idx; + ost_index = lsme->lsme_oinfo[stripeno]->loi_ost_idx; if (ost_index < 0 || ost_index >= lov->desc.ld_tgt_count) GOTO(obj_put, rc = -EINVAL); @@ -1272,7 +1368,7 @@ inactive_tgt: * we need to return */ if (stripeno == fs->fs_last_stripe) { fiemap->fm_mapped_extents = 0; - fs->fs_finish = true; + fs->fs_finish_stripe = true; GOTO(obj_put, rc); } break; @@ -1281,7 +1377,6 @@ inactive_tgt: * We've collected enough extents and there are * more extents after it. */ - fs->fs_finish = true; GOTO(obj_put, rc); } @@ -1306,8 +1401,9 @@ inactive_tgt: * the last extent */ if (fm_ext[ext_count - 1].fe_flags & FIEMAP_EXTENT_LAST) fm_ext[ext_count - 1].fe_flags &= ~FIEMAP_EXTENT_LAST; - if (lov_stripe_size(lsm, fm_ext[ext_count - 1].fe_logical + - fm_ext[ext_count - 1].fe_length, + if (lov_stripe_size(lsm, index, + fm_ext[ext_count - 1].fe_logical + + fm_ext[ext_count - 1].fe_length, stripeno) >= fmkey->lfik_oa.o_size) { ost_eof = true; fs->fs_device_done = true; @@ -1323,7 +1419,7 @@ inactive_tgt: } while (!ost_done && !ost_eof); if (stripeno == fs->fs_last_stripe) - fs->fs_finish = true; + fs->fs_finish_stripe = true; obj_put: cl_object_put(env, subobj); @@ -1348,12 +1444,18 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj, struct ll_fiemap_info_key *fmkey, struct fiemap *fiemap, size_t *buflen) { - struct lov_stripe_md *lsm; - struct fiemap *fm_local = NULL; - int cur_stripe; - int stripe_count; - unsigned int buffer_size = FIEMAP_BUFFER_SIZE; - int rc = 0; + struct lov_stripe_md_entry *lsme; + struct lov_stripe_md *lsm; + struct fiemap *fm_local = NULL; + loff_t whole_start; + loff_t whole_end; + int entry; + int start_entry; + int end_entry; + int cur_stripe = 0; + int stripe_count; + unsigned int buffer_size = FIEMAP_BUFFER_SIZE; + int rc = 0; struct fiemap_state fs = { 0 }; ENTRY; @@ -1361,13 +1463,17 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj, if (lsm == NULL) RETURN(-ENODATA); - /** - * If the stripe_count > 1 and the application does not understand - * DEVICE_ORDER flag, it cannot interpret the extents correctly. - */ - if (lsm->lsm_entries[0]->lsme_stripe_count > 1 && - !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER)) - GOTO(out_lsm, rc = -ENOTSUPP); + if (!(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER)) { + /** + * If the entry count > 1 or stripe_count > 1 and the + * application does not understand DEVICE_ORDER flag, + * it cannot interpret the extents correctly. + */ + if (lsm->lsm_entry_count > 1 || + (lsm->lsm_entry_count == 1 && + lsm->lsm_entries[0]->lsme_stripe_count > 1)) + GOTO(out_lsm, rc = -ENOTSUPP); + } if (lsm->lsm_is_released) { if (fiemap->fm_start < fmkey->lfik_oa.o_size) { @@ -1391,6 +1497,7 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj, GOTO(out_lsm, rc = 0); } + /* buffer_size is small to hold fm_extent_count of extents. */ if (fiemap_count_to_size(fiemap->fm_extent_count) < buffer_size) buffer_size = fiemap_count_to_size(fiemap->fm_extent_count); @@ -1398,34 +1505,6 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj, if (fm_local == NULL) GOTO(out_lsm, rc = -ENOMEM); - fs.fs_fm = fm_local; - fs.fs_cnt_need = fiemap_size_to_count(buffer_size); - - fs.fs_start = fiemap->fm_start; - /* fs.fs_start is beyond the end of the file */ - if (fs.fs_start > fmkey->lfik_oa.o_size) - GOTO(out_fm_local, rc = -EINVAL); - - fs.fs_length = fiemap->fm_length; - /* Calculate start stripe, last stripe and length of mapping */ - fs.fs_start_stripe = lov_stripe_number(lsm, fs.fs_start); - fs.fs_end = (fs.fs_length == ~0ULL) ? fmkey->lfik_oa.o_size : - fs.fs_start + fs.fs_length - 1; - /* If fs_length != ~0ULL but fs_start+fs_length-1 exceeds file size */ - if (fs.fs_end > fmkey->lfik_oa.o_size) { - fs.fs_end = fmkey->lfik_oa.o_size; - fs.fs_length = fs.fs_end - fs.fs_start; - } - - fs.fs_last_stripe = fiemap_calc_last_stripe(lsm, fs.fs_start, fs.fs_end, - fs.fs_start_stripe, - &stripe_count); - fs.fs_end_offset = fiemap_calc_fm_end_offset(fiemap, lsm, fs.fs_start, - fs.fs_end, - &fs.fs_start_stripe); - if (fs.fs_end_offset == -EINVAL) - GOTO(out_fm_local, rc = -EINVAL); - /** * Requested extent count exceeds the fiemap buffer size, shrink our * ambition. @@ -1435,26 +1514,79 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj, if (fiemap->fm_extent_count == 0) fs.fs_cnt_need = 0; - fs.fs_finish = false; fs.fs_enough = false; fs.fs_cur_extent = 0; + fs.fs_fm = fm_local; + fs.fs_cnt_need = fiemap_size_to_count(buffer_size); - /* Check each stripe */ - for (cur_stripe = fs.fs_start_stripe; stripe_count > 0; - --stripe_count, - cur_stripe = (cur_stripe + 1) % - lsm->lsm_entries[0]->lsme_stripe_count) { - rc = fiemap_for_stripe(env, obj, lsm, fiemap, buflen, fmkey, - cur_stripe, &fs); - if (rc < 0) - GOTO(out_fm_local, rc); - if (fs.fs_finish) - break; - } /* for each stripe */ + whole_start = fiemap->fm_start; + /* whole_start is beyond the end of the file */ + if (whole_start > fmkey->lfik_oa.o_size) + GOTO(out_fm_local, rc = -EINVAL); + whole_end = (fiemap->fm_length == OBD_OBJECT_EOF) ? + fmkey->lfik_oa.o_size : + whole_start + fiemap->fm_length - 1; + /** + * If fiemap->fm_length != OBD_OBJECT_EOF but whole_end exceeds file + * size + */ + if (whole_end > fmkey->lfik_oa.o_size) + whole_end = fmkey->lfik_oa.o_size; + + start_entry = lov_lsm_entry(lsm, whole_start); + end_entry = lov_lsm_entry(lsm, whole_end); + if (end_entry == -1) + end_entry = lsm->lsm_entry_count - 1; + if (start_entry == -1 || end_entry == -1) + GOTO(out_fm_local, rc = -EINVAL); + + for (entry = start_entry; entry <= end_entry; entry++) { + lsme = lsm->lsm_entries[entry]; + + if (entry == start_entry) + fs.fs_ext.e_start = whole_start; + else + fs.fs_ext.e_start = lsme->lsme_extent.e_start; + if (entry == end_entry) + fs.fs_ext.e_end = whole_end; + else + fs.fs_ext.e_end = lsme->lsme_extent.e_end - 1; + fs.fs_length = fs.fs_ext.e_end - fs.fs_ext.e_start + 1; + + /* Calculate start stripe, last stripe and length of mapping */ + fs.fs_start_stripe = lov_stripe_number(lsm, entry, + fs.fs_ext.e_start); + fs.fs_last_stripe = fiemap_calc_last_stripe(lsm, entry, + &fs.fs_ext, fs.fs_start_stripe, + &stripe_count); + fs.fs_end_offset = fiemap_calc_fm_end_offset(fiemap, lsm, entry, + &fs.fs_ext, &fs.fs_start_stripe); + /* Check each stripe */ + for (cur_stripe = fs.fs_start_stripe; stripe_count > 0; + --stripe_count, + cur_stripe = (cur_stripe + 1) % lsme->lsme_stripe_count) { + rc = fiemap_for_stripe(env, obj, lsm, fiemap, buflen, + fmkey, entry, cur_stripe, &fs); + if (rc < 0) + GOTO(out_fm_local, rc); + if (fs.fs_enough) + GOTO(finish, rc); + if (fs.fs_finish_stripe) + break; + } /* for each stripe */ + } /* for covering layout component */ + /* + * We've traversed all components, set @entry to the last component + * entry, it's for the last stripe check. + */ + entry--; +finish: /* Indicate that we are returning device offsets unless file just has * single stripe */ - if (lsm->lsm_entries[0]->lsme_stripe_count > 1) + if (lsm->lsm_entry_count > 1 || + (lsm->lsm_entry_count == 1 && + lsm->lsm_entries[0]->lsme_stripe_count > 1)) fiemap->fm_flags |= FIEMAP_FLAG_DEVICE_ORDER; if (fiemap->fm_extent_count == 0) @@ -1472,7 +1604,6 @@ out_fm_local: out_lsm: lov_lsm_put(lsm); - return rc; } @@ -1611,23 +1742,28 @@ int lov_read_and_clear_async_rc(struct cl_object *clob) lov_conf_freeze(lov); switch (lov->lo_type) { - case LLT_RAID0: { + case LLT_COMP: { struct lov_stripe_md *lsm; int i; lsm = lov->lo_lsm; LASSERT(lsm != NULL); - for (i = 0; i < lsm->lsm_entries[0]->lsme_stripe_count; - i++) { - struct lov_oinfo *loi = - lsm->lsm_entries[0]->lsme_oinfo[i]; + for (i = 0; i < lsm->lsm_entry_count; i++) { + struct lov_stripe_md_entry *lse = + lsm->lsm_entries[i]; + int j; - if (lov_oinfo_is_dummy(loi)) - continue; + for (j = 0; j < lse->lsme_stripe_count; j++) { + struct lov_oinfo *loi = + lse->lsme_oinfo[j]; - if (loi->loi_ar.ar_rc && !rc) - rc = loi->loi_ar.ar_rc; - loi->loi_ar.ar_rc = 0; + if (lov_oinfo_is_dummy(loi)) + continue; + + if (loi->loi_ar.ar_rc && !rc) + rc = loi->loi_ar.ar_rc; + loi->loi_ar.ar_rc = 0; + } } } case LLT_RELEASED: diff --git a/lustre/lov/lov_offset.c b/lustre/lov/lov_offset.c index ba93d76..3ff0a38 100644 --- a/lustre/lov/lov_offset.c +++ b/lustre/lov/lov_offset.c @@ -38,21 +38,29 @@ #include "lov_internal.h" +static loff_t stripe_width(struct lov_stripe_md *lsm, unsigned int index) +{ + struct lov_stripe_md_entry *entry = lsm->lsm_entries[index]; + + LASSERT(index < lsm->lsm_entry_count); + + return (loff_t)entry->lsme_stripe_size * entry->lsme_stripe_count; +} + /* compute object size given "stripeno" and the ost size */ -u64 lov_stripe_size(struct lov_stripe_md *lsm, u64 ost_size, int stripeno) +u64 lov_stripe_size(struct lov_stripe_md *lsm, int index, u64 ost_size, + int stripeno) { - unsigned long ssize = lsm->lsm_entries[0]->lsme_stripe_size; + unsigned long ssize = lsm->lsm_entries[index]->lsme_stripe_size; unsigned long stripe_size; loff_t swidth; loff_t lov_size; - u32 magic = lsm->lsm_magic; ENTRY; if (ost_size == 0) RETURN(0); - LASSERT(lsm_op_find(magic) != NULL); - lsm_op_find(magic)->lsm_stripe_by_index(lsm, &stripeno, NULL, &swidth); + swidth = stripe_width(lsm, index); /* lov_do_div64(a, b) returns a % b, and a = a / b */ stripe_size = lov_do_div64(ost_size, ssize); @@ -67,12 +75,13 @@ u64 lov_stripe_size(struct lov_stripe_md *lsm, u64 ost_size, int stripeno) /** * Compute file level page index by stripe level page offset */ -pgoff_t lov_stripe_pgoff(struct lov_stripe_md *lsm, pgoff_t stripe_index, - int stripe) +pgoff_t lov_stripe_pgoff(struct lov_stripe_md *lsm, int index, + pgoff_t stripe_index, int stripe) { loff_t offset; - offset = lov_stripe_size(lsm, (stripe_index << PAGE_SHIFT) + 1, + offset = lov_stripe_size(lsm, index, + (stripe_index << PAGE_SHIFT) + 1, stripe); return offset >> PAGE_SHIFT; } @@ -125,14 +134,13 @@ pgoff_t lov_stripe_pgoff(struct lov_stripe_md *lsm, pgoff_t stripe_index, * was moved forward to the start of the stripe in question; 0 when it * falls in the stripe and no shifting was done; > 0 when the offset * was outside the stripe and was pulled back to its final byte. */ -int lov_stripe_offset(struct lov_stripe_md *lsm, loff_t lov_off, int stripeno, - loff_t *obdoff) +int lov_stripe_offset(struct lov_stripe_md *lsm, int index, loff_t lov_off, + int stripeno, loff_t *obdoff) { - unsigned long ssize = lsm->lsm_entries[0]->lsme_stripe_size; + unsigned long ssize = lsm->lsm_entries[index]->lsme_stripe_size; loff_t stripe_off; loff_t this_stripe; loff_t swidth; - u32 magic = lsm->lsm_magic; int ret = 0; if (lov_off == OBD_OBJECT_EOF) { @@ -140,9 +148,7 @@ int lov_stripe_offset(struct lov_stripe_md *lsm, loff_t lov_off, int stripeno, return 0; } - LASSERT(lsm_op_find(magic) != NULL); - lsm_op_find(magic)->lsm_stripe_by_index(lsm, &stripeno, &lov_off, - &swidth); + swidth = stripe_width(lsm, index); /* lov_do_div64(a, b) returns a % b, and a = a / b */ stripe_off = lov_do_div64(lov_off, swidth); @@ -183,21 +189,18 @@ int lov_stripe_offset(struct lov_stripe_md *lsm, loff_t lov_off, int stripeno, * | 0 | 1 | 2 | 0 | 1 | 2 | * --------------------------------------------------------------------- */ -loff_t lov_size_to_stripe(struct lov_stripe_md *lsm, u64 file_size, +loff_t lov_size_to_stripe(struct lov_stripe_md *lsm, int index, u64 file_size, int stripeno) { - unsigned long ssize = lsm->lsm_entries[0]->lsme_stripe_size; + unsigned long ssize = lsm->lsm_entries[index]->lsme_stripe_size; loff_t stripe_off; loff_t this_stripe; loff_t swidth; - u32 magic = lsm->lsm_magic; if (file_size == OBD_OBJECT_EOF) return OBD_OBJECT_EOF; - LASSERT(lsm_op_find(magic) != NULL); - lsm_op_find(magic)->lsm_stripe_by_index(lsm, &stripeno, &file_size, - &swidth); + swidth = stripe_width(lsm, index); /* lov_do_div64(a, b) returns a % b, and a = a / b */ stripe_off = lov_do_div64(file_size, swidth); @@ -226,13 +229,23 @@ loff_t lov_size_to_stripe(struct lov_stripe_md *lsm, u64 file_size, /* given an extent in an lov and a stripe, calculate the extent of the stripe * that is contained within the lov extent. this returns true if the given * stripe does intersect with the lov extent. */ -int lov_stripe_intersects(struct lov_stripe_md *lsm, int stripeno, - u64 start, u64 end, u64 *obd_start, u64 *obd_end) +int lov_stripe_intersects(struct lov_stripe_md *lsm, int index, int stripeno, + struct lu_extent *ext, u64 *obd_start, u64 *obd_end) { + struct lov_stripe_md_entry *entry = lsm->lsm_entries[index]; + u64 start, end; int start_side, end_side; - start_side = lov_stripe_offset(lsm, start, stripeno, obd_start); - end_side = lov_stripe_offset(lsm, end, stripeno, obd_end); + if (!lu_extent_is_overlapped(ext, &entry->lsme_extent)) + return 0; + + start = max_t(__u64, ext->e_start, entry->lsme_extent.e_start); + end = min_t(__u64, ext->e_end, entry->lsme_extent.e_end); + if (end != OBD_OBJECT_EOF) + end--; + + start_side = lov_stripe_offset(lsm, index, start, stripeno, obd_start); + end_side = lov_stripe_offset(lsm, index, end, stripeno, obd_end); CDEBUG(D_INODE, "[%lld->%lld] -> [(%d) %lld->%lld (%d)]\n", start, end, start_side, *obd_start, *obd_end, end_side); @@ -258,15 +271,13 @@ int lov_stripe_intersects(struct lov_stripe_md *lsm, int stripeno, } /* compute which stripe number "lov_off" will be written into */ -int lov_stripe_number(struct lov_stripe_md *lsm, loff_t lov_off) +int lov_stripe_number(struct lov_stripe_md *lsm, int index, loff_t lov_off) { - unsigned long ssize = lsm->lsm_entries[0]->lsme_stripe_size; + unsigned long ssize = lsm->lsm_entries[index]->lsme_stripe_size; loff_t stripe_off; loff_t swidth; - u32 magic = lsm->lsm_magic; - LASSERT(lsm_op_find(magic) != NULL); - lsm_op_find(magic)->lsm_stripe_by_offset(lsm, NULL, &lov_off, &swidth); + swidth = stripe_width(lsm, index); stripe_off = lov_do_div64(lov_off, swidth); diff --git a/lustre/lov/lov_page.c b/lustre/lov/lov_page.c index 5af3d91..ae74d25 100644 --- a/lustre/lov/lov_page.c +++ b/lustre/lov/lov_page.c @@ -49,46 +49,54 @@ * */ -static int lov_raid0_page_print(const struct lu_env *env, - const struct cl_page_slice *slice, - void *cookie, lu_printer_t printer) +static int lov_comp_page_print(const struct lu_env *env, + const struct cl_page_slice *slice, + void *cookie, lu_printer_t printer) { struct lov_page *lp = cl2lov_page(slice); return (*printer)(env, cookie, LUSTRE_LOV_NAME"-page@%p, raid0\n", lp); } -static const struct cl_page_operations lov_raid0_page_ops = { - .cpo_print = lov_raid0_page_print +static const struct cl_page_operations lov_comp_page_ops = { + .cpo_print = lov_comp_page_print }; -int lov_page_init_raid0(const struct lu_env *env, struct cl_object *obj, - struct cl_page *page, pgoff_t index) +int lov_page_init_composite(const struct lu_env *env, struct cl_object *obj, + struct cl_page *page, pgoff_t index) { struct lov_object *loo = cl2lov(obj); - struct lov_layout_raid0 *r0 = lov_r0(loo); struct lov_io *lio = lov_env_io(env); struct cl_object *subobj; struct cl_object *o; struct lov_io_sub *sub; struct lov_page *lpg = cl_object_page_slice(obj, page); + struct lov_layout_raid0 *r0; loff_t offset; - loff_t suboff; + loff_t suboff; + int entry; int stripe; int rc; ENTRY; offset = cl_offset(obj, index); - stripe = lov_stripe_number(loo->lo_lsm, offset); + entry = lov_lsm_entry(loo->lo_lsm, offset); + if (entry < 0) { + /* non-existing layout component */ + lov_page_init_empty(env, obj, page, index); + RETURN(0); + } + + r0 = lov_r0(loo, entry); + stripe = lov_stripe_number(loo->lo_lsm, entry, offset); LASSERT(stripe < r0->lo_nr); - rc = lov_stripe_offset(loo->lo_lsm, offset, stripe, - &suboff); + rc = lov_stripe_offset(loo->lo_lsm, entry, offset, stripe, &suboff); LASSERT(rc == 0); - lpg->lps_stripe = stripe; - cl_page_slice_add(page, &lpg->lps_cl, obj, index, &lov_raid0_page_ops); + lpg->lps_index = lov_comp_index(entry, stripe); + cl_page_slice_add(page, &lpg->lps_cl, obj, index, &lov_comp_page_ops); - sub = lov_sub_get(env, lio, stripe); + sub = lov_sub_get(env, lio, lpg->lps_index); if (IS_ERR(sub)) RETURN(PTR_ERR(sub)); diff --git a/lustre/lov/lovsub_object.c b/lustre/lov/lovsub_object.c index 752a422..976bf3d 100644 --- a/lustre/lov/lovsub_object.c +++ b/lustre/lov/lovsub_object.c @@ -79,13 +79,17 @@ static void lovsub_object_free(const struct lu_env *env, struct lu_object *obj) /* We can't assume lov was assigned here, because of the shadow * object handling in lu_object_find. */ - if (lov) { - LASSERT(lov->lo_type == LLT_RAID0); - LASSERT(lov->u.raid0.lo_sub[los->lso_index] == los); - spin_lock(&lov->u.raid0.lo_sub_lock); - lov->u.raid0.lo_sub[los->lso_index] = NULL; - spin_unlock(&lov->u.raid0.lo_sub_lock); - } + if (lov != NULL) { + int index = lov_comp_entry(los->lso_index); + int stripe = lov_comp_stripe(los->lso_index); + struct lov_layout_raid0 *r0 = lov_r0(lov, index); + + LASSERT(lov->lo_type == LLT_COMP); + LASSERT(r0->lo_sub[stripe] == los); + spin_lock(&r0->lo_sub_lock); + r0->lo_sub[stripe] = NULL; + spin_unlock(&r0->lo_sub_lock); + } lu_object_fini(obj); lu_object_header_fini(&los->lso_header.coh_lu); @@ -104,10 +108,11 @@ static int lovsub_object_print(const struct lu_env *env, void *cookie, static int lovsub_attr_update(const struct lu_env *env, struct cl_object *obj, const struct cl_attr *attr, unsigned valid) { + struct lovsub_object *los = cl2lovsub(obj); struct lov_object *lov = cl2lovsub(obj)->lso_super; ENTRY; - lov_r0(lov)->lo_attr_valid = 0; + lov_r0(lov, lov_comp_entry(los->lso_index))->lo_attr_valid = 0; RETURN(0); } @@ -138,7 +143,7 @@ static void lovsub_req_attr_set(const struct lu_env *env, struct cl_object *obj, * There is no OBD_MD_* flag for obdo::o_stripe_idx, so set it * unconditionally. It never changes anyway. */ - attr->cra_oa->o_stripe_idx = subobj->lso_index; + attr->cra_oa->o_stripe_idx = lov_comp_stripe(subobj->lso_index); EXIT; } diff --git a/lustre/osc/osc_lock.c b/lustre/osc/osc_lock.c index 2151568..6069c21 100644 --- a/lustre/osc/osc_lock.c +++ b/lustre/osc/osc_lock.c @@ -1137,17 +1137,14 @@ static void osc_lock_set_writer(const struct lu_env *env, io_start = cl_index(obj, io->u.ci_rw.crw_pos); io_end = cl_index(obj, io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count - 1); - if (cl_io_is_append(io)) { - io_start = 0; - io_end = CL_PAGE_EOF; - } } else { LASSERT(cl_io_is_mkwrite(io)); io_start = io_end = io->u.ci_fault.ft_index; } if (descr->cld_mode >= CLM_WRITE && - descr->cld_start <= io_start && descr->cld_end >= io_end) { + (cl_io_is_append(io) || + (descr->cld_start <= io_start && descr->cld_end >= io_end))) { struct osc_io *oio = osc_env_io(env); /* There must be only one lock to match the write region */ -- 1.8.3.1