Whamcloud - gitweb
LU-8998 clio: Client side implementation for PFL
author: Bobi Jam <bobijam.xu@intel.com>
Wed, 5 Apr 2017 23:58:41 +0000 (07:58 +0800)
committer: Jinshan Xiong <jinshan.xiong@intel.com>
Thu, 6 Apr 2017 04:31:52 +0000 (21:31 -0700)
Make client layer support composite layout.

Plain layout will be stored in LOV layer as a composite layout
containing a single component.

Reviewed-on: https://review.whamcloud.com/24850

Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Signed-off-by: Bobi Jam <bobijam.xu@intel.com>
Signed-off-by: Niu Yawei <yawei.niu@intel.com>
Change-Id: Ic3b85a4b10c66745e5c72ff02ea313baa0b12bb5
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
12 files changed:
lustre/include/lustre/lustre_user.h
lustre/lov/lov_cl_internal.h
lustre/lov/lov_ea.c
lustre/lov/lov_internal.h
lustre/lov/lov_io.c
lustre/lov/lov_lock.c
lustre/lov/lov_merge.c
lustre/lov/lov_object.c
lustre/lov/lov_offset.c
lustre/lov/lov_page.c
lustre/lov/lovsub_object.c
lustre/osc/osc_lock.c

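diff --git a/lustre/include/lustre/lustre_user.h b/lustre/include/lustre/lustre_user.h
index 4e30017..31ea745 100644
--- a/lustre/include/lustre/lustre_user.h
+++ b/lustre/include/lustre/lustre_user.h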
@@ -419,6 +419,15 @@ struct lu_extent {
        __u64   e_end;
 };
 
+#define DEXT "[ %#llx , %#llx )"
+#define PEXT(ext) (ext)->e_start, (ext)->e_end
+
+static inline bool lu_extent_is_overlapped(struct lu_extent *e1,
+                                          struct lu_extent *e2)
+{
+       return e1->e_start < e2->e_end && e2->e_start < e1->e_end;
+}
+
 enum lov_comp_md_entry_flags {
        LCME_FL_PRIMARY = 0x00000001,   /* Not used */
        LCME_FL_STALE   = 0x00000002,   /* Not used */
@@ -465,7 +474,6 @@ struct lov_comp_md_v1 {
        struct lov_comp_md_entry_v1 lcm_entries[0];
 } __attribute__((packed));
 
-
 static inline __u32 lov_user_md_size(__u16 stripes, __u32 lmm_magic)
 {
        if (lmm_magic == LOV_USER_MAGIC_V1)
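diff --git a/lustre/lov/lov_cl_internal.h b/lustre/lov/lov_cl_internal.h
index 4edb452..395b5f9 100644
--- a/lustre/lov/lov_cl_internal.h
+++ b/lustre/lov/lov_cl_internal.h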
@@ -108,8 +108,8 @@ struct lov_device {
  */
 enum lov_layout_type {
        LLT_EMPTY,      /** empty file without body (mknod + truncate) */
-       LLT_RAID0,      /** striped file */
        LLT_RELEASED,   /** file with no objects (data in HSM) */
+       LLT_COMP,       /** support composite layout */
        LLT_NR
 };
 
@@ -118,10 +118,10 @@ static inline char *llt2str(enum lov_layout_type llt)
        switch (llt) {
        case LLT_EMPTY:
                return "EMPTY";
-       case LLT_RAID0:
-               return "RAID0";
        case LLT_RELEASED:
                return "RELEASED";
+       case LLT_COMP:
+               return "COMPOSITE";
        case LLT_NR:
                LBUG();
        }
@@ -129,6 +129,42 @@ static inline char *llt2str(enum lov_layout_type llt)
        return "";
 }
 
+struct lov_layout_raid0 {
+       unsigned               lo_nr;
+       /**
+        * When this is true, lov_object::lo_attr contains
+        * valid up to date attributes for a top-level
+        * object. This field is reset to 0 when attributes of
+        * any sub-object change.
+        */
+       int                    lo_attr_valid;
+       /**
+        * Array of sub-objects. Allocated when top-object is
+        * created (lov_init_raid0()).
+        *
+        * Top-object is a strict master of its sub-objects:
+        * it is created before them, and outlives its
+        * children (this later is necessary so that basic
+        * functions like cl_object_top() always
+        * work). Top-object keeps a reference on every
+        * sub-object.
+        *
+        * When top-object is destroyed (lov_delete_raid0())
+        * it releases its reference to a sub-object and waits
+        * until the latter is finally destroyed.
+        */
+       struct lovsub_object **lo_sub;
+       /**
+        * protect lo_sub
+        */
+       spinlock_t              lo_sub_lock;
+       /**
+        * Cached object attribute, built from sub-object
+        * attributes.
+        */
+       struct cl_attr         lo_attr;
+};
+
 /**
  * lov-specific file state.
  *
@@ -178,47 +214,20 @@ struct lov_object {
        struct lov_stripe_md  *lo_lsm;
 
        union lov_layout_state {
-               struct lov_layout_raid0 {
-                       unsigned               lo_nr;
-                       /**
-                        * When this is true, lov_object::lo_attr contains
-                        * valid up to date attributes for a top-level
-                        * object. This field is reset to 0 when attributes of
-                        * any sub-object change.
-                        */
-                       int                    lo_attr_valid;
-                       /**
-                        * Array of sub-objects. Allocated when top-object is
-                        * created (lov_init_raid0()).
-                        *
-                        * Top-object is a strict master of its sub-objects:
-                        * it is created before them, and outlives its
-                        * children (this later is necessary so that basic
-                        * functions like cl_object_top() always
-                        * work). Top-object keeps a reference on every
-                        * sub-object.
-                        *
-                        * When top-object is destroyed (lov_delete_raid0())
-                        * it releases its reference to a sub-object and waits
-                        * until the latter is finally destroyed.
-                        *
-                        * May be vmalloc'd, must be freed with OBD_FREE_LARGE.
-                        */
-                       struct lovsub_object **lo_sub;
-                       /**
-                        * protect lo_sub
-                        */
-                       spinlock_t              lo_sub_lock;
-                       /**
-                        * Cached object attribute, built from sub-object
-                        * attributes.
-                        */
-                       struct cl_attr         lo_attr;
-               } raid0;
                struct lov_layout_state_empty {
                } empty;
                struct lov_layout_state_released {
                } released;
+               struct lov_layout_composite {
+                       /**
+                        * Current valid entry count of lo_entries.
+                        */
+                       unsigned int lo_entry_count;
+                       struct lov_layout_entry {
+                               struct lu_extent lle_extent;
+                               struct lov_layout_raid0 lle_raid0;
+                       } *lo_entries;
+               } composite;
        } u;
        /**
         * Thread that acquired lov_object::lo_type_guard in an exclusive
@@ -227,6 +236,12 @@ struct lov_object {
        struct task_struct            *lo_owner;
 };
 
+#define lov_foreach_layout_entry(lov, entry)                   \
+       for (entry = &lov->u.composite.lo_entries[0];           \
+            entry < &lov->u.composite.lo_entries               \
+                       [lov->u.composite.lo_entry_count];      \
+            entry++)
+
 /**
  * State lov_lock keeps for each sub-lock.
  */
@@ -237,7 +252,7 @@ struct lov_lock_sub {
         * hold resources of underlying layers */
        unsigned int            sub_is_enqueued:1,
                                sub_initialized:1;
-       int                     sub_stripe;
+       int                     sub_index;
 };
 
 /**
@@ -253,7 +268,8 @@ struct lov_lock {
 
 struct lov_page {
        struct cl_page_slice    lps_cl;
-       unsigned int            lps_stripe; /* stripe index */
+       /** layout_entry + stripe index, composed using lov_comp_index() */
+       unsigned int            lps_index;
 };
 
 /*
@@ -305,38 +321,33 @@ struct lov_thread_info {
  * State that lov_io maintains for every sub-io.
  */
 struct lov_io_sub {
-       __u16                   sub_stripe;
        /**
-        * environment's refcheck.
-        *
-        * \see cl_env_get()
-        */
-       __u16                   sub_refcheck;
-       /**
-        * true, iff cl_io_init() was successfully executed against
-        * lov_io_sub::sub_io.
-        */
-       __u16                   sub_io_initialized:1,
-       /**
-        * True, iff lov_io_sub::sub_io and lov_io_sub::sub_env weren't
-        * allocated, but borrowed from a per-device emergency pool.
+        * Linkage into a list (hanging off lov_io::lis_subios)
         */
-                               sub_borrowed:1;
+       struct list_head        sub_list;
        /**
         * Linkage into a list (hanging off lov_io::lis_active) of all
         * sub-io's active for the current IO iteration.
         */
        struct list_head        sub_linkage;
+       unsigned int            sub_subio_index;
        /**
         * sub-io for a stripe. Ideally sub-io's can be stopped and resumed
         * independently, with lov acting as a scheduler to maximize overall
         * throughput.
         */
-       struct cl_io            *sub_io;
+       struct cl_io            sub_io;
        /**
         * environment, in which sub-io executes.
         */
        struct lu_env           *sub_env;
+       /**
+        * environment's refcheck.
+        *
+        * \see cl_env_get()
+        */
+       __u16                   sub_refcheck;
+       __u16                   sub_reenter;
 };
 
 /**
@@ -364,32 +375,29 @@ struct lov_io {
          * starting position within a file, for the current io loop iteration
          * (stripe), used by ci_io_loop().
          */
-       loff_t                   lis_pos;
+       loff_t                  lis_pos;
        /**
         * end position with in a file, for the current stripe io. This is
         * exclusive (i.e., next offset after last byte affected by io).
         */
-       loff_t                   lis_endpos;
-
-       int                     lis_stripe_count;
-       int                     lis_active_subios;
+       loff_t                  lis_endpos;
+       int                     lis_nr_subios;
 
        /**
         * the index of ls_single_subio in ls_subios array
         */
        int                     lis_single_subio_index;
-       struct cl_io            lis_single_subio;
+       struct lov_io_sub       lis_single_subio;
 
        /**
-        * size of ls_subios array, actually the highest stripe #
-        * May be vmalloc'd, must be freed with OBD_FREE_LARGE().
+        * List of active sub-io's. Active sub-io's are under the range
+        * of [lis_pos, lis_endpos).
         */
-       int                     lis_nr_subios;
-       struct lov_io_sub       *lis_subs;
+       struct list_head        lis_active;
        /**
-        * List of active sub-io's.
+        * All sub-io's created in this lov_io.
         */
-       struct list_head        lis_active;
+       struct list_head        lis_subios;
 };
 
 struct lov_session {
@@ -422,11 +430,11 @@ int   lov_io_init         (const struct lu_env *env, struct cl_object *obj,
 int   lovsub_lock_init    (const struct lu_env *env, struct cl_object *obj,
                            struct cl_lock *lock, const struct cl_io *io);
 
-int   lov_lock_init_raid0 (const struct lu_env *env, struct cl_object *obj,
+int   lov_lock_init_composite(const struct lu_env *env, struct cl_object *obj,
                            struct cl_lock *lock, const struct cl_io *io);
 int   lov_lock_init_empty (const struct lu_env *env, struct cl_object *obj,
                            struct cl_lock *lock, const struct cl_io *io);
-int   lov_io_init_raid0   (const struct lu_env *env, struct cl_object *obj,
+int   lov_io_init_composite(const struct lu_env *env, struct cl_object *obj,
                            struct cl_io *io);
 int   lov_io_init_empty   (const struct lu_env *env, struct cl_object *obj,
                            struct cl_io *io);
@@ -442,7 +450,7 @@ int   lovsub_page_init    (const struct lu_env *env, struct cl_object *ob,
                           struct cl_page *page, pgoff_t index);
 int   lov_page_init_empty (const struct lu_env *env, struct cl_object *obj,
                           struct cl_page *page, pgoff_t index);
-int   lov_page_init_raid0 (const struct lu_env *env, struct cl_object *obj,
+int   lov_page_init_composite(const struct lu_env *env, struct cl_object *obj,
                           struct cl_page *page, pgoff_t index);
 struct lu_object *lov_object_alloc   (const struct lu_env *env,
                                       const struct lu_object_header *hdr,
@@ -453,6 +461,7 @@ struct lu_object *lovsub_object_alloc(const struct lu_env *env,
 
 struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov);
 int lov_page_stripe(const struct cl_page *page);
+int lov_lsm_entry(const struct lov_stripe_md *lsm, __u64 offset);
 
 #define lov_foreach_target(lov, var)                    \
         for (var = 0; var < lov_targets_nr(lov); ++var)
@@ -625,12 +634,21 @@ static inline struct lov_thread_info *lov_env_info(const struct lu_env *env)
         return info;
 }
 
-static inline struct lov_layout_raid0 *lov_r0(struct lov_object *lov)
+static inline struct lov_layout_raid0 *lov_r0(struct lov_object *lov, int i)
 {
-       LASSERT(lov->lo_type == LLT_RAID0);
-       LASSERT(lov->lo_lsm->lsm_magic == LOV_MAGIC ||
-               lov->lo_lsm->lsm_magic == LOV_MAGIC_V3);
-       return &lov->u.raid0;
+       LASSERT(lov->lo_type == LLT_COMP);
+       LASSERTF(i < lov->u.composite.lo_entry_count,
+                "entry %d entry_count %d", i, lov->u.composite.lo_entry_count);
+
+       return &lov->u.composite.lo_entries[i].lle_raid0;
+}
+
+static inline struct lov_stripe_md_entry *lov_lse(struct lov_object *lov, int i)
+{
+       LASSERT(lov->lo_lsm != NULL);
+       LASSERT(i < lov->lo_lsm->lsm_entry_count);
+
+       return lov->lo_lsm->lsm_entries[i];
 }
 
 /* lov_pack.c */
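diff --git a/lustre/lov/lov_ea.c b/lustre/lov/lov_ea.c
index 896b403..4f8271e 100644
--- a/lustre/lov/lov_ea.c
+++ b/lustre/lov/lov_ea.c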
@@ -308,24 +308,6 @@ out_lsme:
        return ERR_PTR(rc);
 }
 
-static void
-lsm_stripe_by_index_plain(struct lov_stripe_md *lsm, int *stripeno,
-                         loff_t *lov_off, loff_t *swidth)
-{
-       if (swidth != NULL)
-               *swidth = (loff_t)lsm->lsm_entries[0]->lsme_stripe_size *
-                         lsm->lsm_entries[0]->lsme_stripe_count;
-}
-
-static void
-lsm_stripe_by_offset_plain(struct lov_stripe_md *lsm, int *stripeno,
-                          loff_t *lov_off, loff_t *swidth)
-{
-       if (swidth != NULL)
-               *swidth = (loff_t)lsm->lsm_entries[0]->lsme_stripe_size *
-                         lsm->lsm_entries[0]->lsme_stripe_count;
-}
-
 static inline struct lov_stripe_md *
 lsm_unpackmd_v1(struct lov_obd *lov, void *buf, size_t buf_size)
 {
@@ -335,8 +317,6 @@ lsm_unpackmd_v1(struct lov_obd *lov, void *buf, size_t buf_size)
 }
 
 const struct lsm_operations lsm_v1_ops = {
-        .lsm_stripe_by_index    = lsm_stripe_by_index_plain,
-        .lsm_stripe_by_offset   = lsm_stripe_by_offset_plain,
         .lsm_unpackmd           = lsm_unpackmd_v1,
 };
 
@@ -350,8 +330,6 @@ lsm_unpackmd_v3(struct lov_obd *lov, void *buf, size_t buf_size)
 }
 
 const struct lsm_operations lsm_v3_ops = {
-       .lsm_stripe_by_index    = lsm_stripe_by_index_plain,
-       .lsm_stripe_by_offset   = lsm_stripe_by_offset_plain,
        .lsm_unpackmd           = lsm_unpackmd_v3,
 };
 
@@ -499,19 +477,44 @@ out_lsm:
 }
 
 const struct lsm_operations lsm_comp_md_v1_ops = {
-       .lsm_stripe_by_index  = lsm_stripe_by_index_plain,
-       .lsm_stripe_by_offset = lsm_stripe_by_offset_plain,
        .lsm_unpackmd         = lsm_unpackmd_comp_md_v1,
 };
 
 void dump_lsm(unsigned int level, const struct lov_stripe_md *lsm)
 {
-       CDEBUG(level, "lsm %p, objid "DOSTID", maxbytes %#llx, magic 0x%08X,"
-              " stripe_size %u, stripe_count %u, refc: %d,"
-              " layout_gen %u, pool ["LOV_POOLNAMEF"]\n", lsm,
-              POSTID(&lsm->lsm_oi), lsm->lsm_maxbytes, lsm->lsm_magic,
-              lsm->lsm_entries[0]->lsme_stripe_size,
-              lsm->lsm_entries[0]->lsme_stripe_count,
-              atomic_read(&lsm->lsm_refc), lsm->lsm_layout_gen,
-              lsm->lsm_entries[0]->lsme_pool_name);
+       int i;
+
+       CDEBUG(level, "lsm %p, objid "DOSTID", maxbytes %#llx, magic 0x%08X, "
+              "refc: %d, entry: %u, layout_gen %u\n",
+              lsm, POSTID(&lsm->lsm_oi), lsm->lsm_maxbytes, lsm->lsm_magic,
+              atomic_read(&lsm->lsm_refc), lsm->lsm_entry_count,
+              lsm->lsm_layout_gen);
+
+       for (i = 0; i < lsm->lsm_entry_count; i++) {
+               struct lov_stripe_md_entry *lse = lsm->lsm_entries[i];
+
+               CDEBUG(level,
+                      DEXT ": id: %u, magic 0x%08X, stripe count %u, "
+                      "size %u, layout_gen %u, pool: ["LOV_POOLNAMEF"]\n",
+                      PEXT(&lse->lsme_extent), lse->lsme_id, lse->lsme_magic,
+                      lse->lsme_stripe_count, lse->lsme_stripe_size,
+                      lse->lsme_layout_gen, lse->lsme_pool_name);
+       }
+}
+
+int lov_lsm_entry(const struct lov_stripe_md *lsm, __u64 offset)
+{
+       int i;
+
+       for (i = 0; i < lsm->lsm_entry_count; i++) {
+               struct lov_stripe_md_entry *lse = lsm->lsm_entries[i];
+
+               if ((offset >= lse->lsme_extent.e_start &&
+                    offset < lse->lsme_extent.e_end) ||
+                   (offset == OBD_OBJECT_EOF &&
+                    lse->lsme_extent.e_end == OBD_OBJECT_EOF))
+                       return i;
+       }
+
+       return -1;
 }
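diff --git a/lustre/lov/lov_internal.h b/lustre/lov/lov_internal.h
index 376c7eb..c4db325 100644
--- a/lustre/lov/lov_internal.h
+++ b/lustre/lov/lov_internal.h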
@@ -74,11 +74,25 @@ static inline bool lsm_has_objects(struct lov_stripe_md *lsm)
        return lsm != NULL && !lsm->lsm_is_released;
 }
 
+static inline unsigned int lov_comp_index(int entry, int stripe)
+{
+       LASSERT(entry >= 0 && entry <= SHRT_MAX);
+       LASSERT(stripe >= 0 && stripe < USHRT_MAX);
+
+       return entry << 16 | stripe;
+}
+
+static inline int lov_comp_stripe(int index)
+{
+       return index & 0xffff;
+}
+
+static inline int lov_comp_entry(int index)
+{
+       return index >> 16;
+}
+
 struct lsm_operations {
-       void (*lsm_stripe_by_index)(struct lov_stripe_md *, int *, loff_t *,
-                                   loff_t *);
-       void (*lsm_stripe_by_offset)(struct lov_stripe_md *, int *, loff_t *,
-                                    loff_t *);
        struct lov_stripe_md *(*lsm_unpackmd)(struct lov_obd *, void *, size_t);
 };
 
@@ -172,20 +186,21 @@ extern struct lu_kmem_descr lov_caches[];
         (char *)((lv)->lov_tgts[index]->ltd_uuid.uuid)
 
 /* lov_merge.c */
-int lov_merge_lvb_kms(struct lov_stripe_md *lsm,
+int lov_merge_lvb_kms(struct lov_stripe_md *lsm, int index,
                       struct ost_lvb *lvb, __u64 *kms_place);
 
 /* lov_offset.c */
-u64 lov_stripe_size(struct lov_stripe_md *lsm, u64 ost_size, int stripeno);
-int lov_stripe_offset(struct lov_stripe_md *lsm, loff_t lov_off, int stripeno,
-                     loff_t *obd_off);
-loff_t lov_size_to_stripe(struct lov_stripe_md *lsm, u64 file_size,
+u64 lov_stripe_size(struct lov_stripe_md *lsm, int index,
+                   u64 ost_size, int stripeno);
+int lov_stripe_offset(struct lov_stripe_md *lsm, int index, loff_t lov_off,
+                     int stripeno, loff_t *obd_off);
+loff_t lov_size_to_stripe(struct lov_stripe_md *lsm, int index, u64 file_size,
                          int stripeno);
-int lov_stripe_intersects(struct lov_stripe_md *lsm, int stripeno,
-                         u64 start, u64 end, u64 *obd_start, u64 *obd_end);
-int lov_stripe_number(struct lov_stripe_md *lsm, loff_t lov_off);
-pgoff_t lov_stripe_pgoff(struct lov_stripe_md *lsm, pgoff_t stripe_index,
-                        int stripe);
+int lov_stripe_intersects(struct lov_stripe_md *lsm, int index, int stripeno,
+                         struct lu_extent *ext, u64 *obd_start, u64 *obd_end);
+int lov_stripe_number(struct lov_stripe_md *lsm, int index, loff_t lov_off);
+pgoff_t lov_stripe_pgoff(struct lov_stripe_md *lsm, int index,
+                        pgoff_t stripe_index, int stripe);
 
 /* lov_request.c */
 int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
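diff --git a/lustre/lov/lov_io.c b/lustre/lov/lov_io.c
index 00f6ae0..b8c529f 100644
--- a/lustre/lov/lov_io.c
+++ b/lustre/lov/lov_io.c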
  *  @{
  */
 
-static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
-                            struct lov_io_sub *sub)
+static inline struct lov_io_sub *lov_sub_alloc(struct lov_io *lio, int index)
 {
-        ENTRY;
-        if (sub->sub_io != NULL) {
-               if (sub->sub_io_initialized) {
-                       cl_io_fini(sub->sub_env, sub->sub_io);
-                       sub->sub_io_initialized = 0;
-                       lio->lis_active_subios--;
-               }
-                if (sub->sub_stripe == lio->lis_single_subio_index)
-                        lio->lis_single_subio_index = -1;
-                else if (!sub->sub_borrowed)
-                        OBD_FREE_PTR(sub->sub_io);
-                sub->sub_io = NULL;
-        }
-        if (sub->sub_env != NULL && !IS_ERR(sub->sub_env)) {
-                if (!sub->sub_borrowed)
-                        cl_env_put(sub->sub_env, &sub->sub_refcheck);
-                sub->sub_env = NULL;
-        }
-        EXIT;
+       struct lov_io_sub *sub;
+
+       if (lio->lis_nr_subios == 0) {
+               LASSERT(lio->lis_single_subio_index == -1);
+               sub = &lio->lis_single_subio;
+               lio->lis_single_subio_index = index;
+               memset(sub, 0, sizeof(*sub));
+       } else {
+               OBD_ALLOC_PTR(sub);
+       }
+
+       if (sub != NULL) {
+               INIT_LIST_HEAD(&sub->sub_list);
+               INIT_LIST_HEAD(&sub->sub_linkage);
+               sub->sub_subio_index = index;
+       }
+
+       return sub;
 }
 
-static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
-                               int stripe, loff_t start, loff_t end)
+static inline void lov_sub_free(struct lov_io *lio, struct lov_io_sub *sub)
 {
-       struct lov_stripe_md *lsm    = lio->lis_object->lo_lsm;
-       struct cl_io         *parent = lio->lis_cl.cis_io;
+       if (sub->sub_subio_index == lio->lis_single_subio_index) {
+               LASSERT(sub == &lio->lis_single_subio);
+               lio->lis_single_subio_index = -1;
+       } else {
+               OBD_FREE_PTR(sub);
+       }
+}
 
-       switch (io->ci_type) {
-       case CIT_SETATTR: {
-               io->u.ci_setattr.sa_attr = parent->u.ci_setattr.sa_attr;
-               io->u.ci_setattr.sa_attr_flags =
-                       parent->u.ci_setattr.sa_attr_flags;
-               io->u.ci_setattr.sa_valid = parent->u.ci_setattr.sa_valid;
-               io->u.ci_setattr.sa_stripe_index = stripe;
-               io->u.ci_setattr.sa_parent_fid =
-                                       parent->u.ci_setattr.sa_parent_fid;
-                if (cl_io_is_trunc(io)) {
-                        loff_t new_size = parent->u.ci_setattr.sa_attr.lvb_size;
+static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
+                           struct lov_io_sub *sub)
+{
+       ENTRY;
 
-                        new_size = lov_size_to_stripe(lsm, new_size, stripe);
-                        io->u.ci_setattr.sa_attr.lvb_size = new_size;
-                }
-                break;
-        }
-       case CIT_DATA_VERSION: {
-               io->u.ci_data_version.dv_data_version = 0;
-               io->u.ci_data_version.dv_flags =
-                       parent->u.ci_data_version.dv_flags;
-               break;
-       }
-        case CIT_FAULT: {
-                struct cl_object *obj = parent->ci_obj;
-                loff_t off = cl_offset(obj, parent->u.ci_fault.ft_index);
+       cl_io_fini(sub->sub_env, &sub->sub_io);
 
-                io->u.ci_fault = parent->u.ci_fault;
-                off = lov_size_to_stripe(lsm, off, stripe);
-                io->u.ci_fault.ft_index = cl_index(obj, off);
-                break;
-        }
-       case CIT_FSYNC: {
-               io->u.ci_fsync.fi_start = start;
-               io->u.ci_fsync.fi_end = end;
-               io->u.ci_fsync.fi_fid = parent->u.ci_fsync.fi_fid;
-               io->u.ci_fsync.fi_mode = parent->u.ci_fsync.fi_mode;
-               break;
-       }
-       case CIT_READ:
-       case CIT_WRITE: {
-               io->u.ci_wr.wr_sync = cl_io_is_sync_write(parent);
-               if (cl_io_is_append(parent)) {
-                       io->u.ci_wr.wr_append = 1;
-               } else {
-                       io->u.ci_rw.crw_pos = start;
-                       io->u.ci_rw.crw_count = end - start;
-               }
-               break;
-       }
-       case CIT_LADVISE: {
-               io->u.ci_ladvise.li_start = start;
-               io->u.ci_ladvise.li_end = end;
-               io->u.ci_ladvise.li_fid = parent->u.ci_ladvise.li_fid;
-               io->u.ci_ladvise.li_advice = parent->u.ci_ladvise.li_advice;
-               io->u.ci_ladvise.li_flags = parent->u.ci_ladvise.li_flags;
-               break;
-       }
-       default:
-               break;
+       if (sub->sub_env != NULL && !IS_ERR(sub->sub_env)) {
+               cl_env_put(sub->sub_env, &sub->sub_refcheck);
+               sub->sub_env = NULL;
        }
+       EXIT;
 }
 
 static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
-                           struct lov_io_sub *sub)
+                          struct lov_io_sub *sub)
 {
        struct lov_object *lov = lio->lis_object;
        struct cl_io *sub_io;
        struct cl_object *sub_obj;
        struct cl_io *io = lio->lis_cl.cis_io;
-       int stripe = sub->sub_stripe;
-       int rc;
-
-        LASSERT(sub->sub_io == NULL);
-        LASSERT(sub->sub_env == NULL);
-        LASSERT(sub->sub_stripe < lio->lis_stripe_count);
-        ENTRY;
+       int index = lov_comp_entry(sub->sub_subio_index);
+       int stripe = lov_comp_stripe(sub->sub_subio_index);
+       int result = 0;
+       LASSERT(sub->sub_env == NULL);
+       ENTRY;
 
-       if (unlikely(lov_r0(lov)->lo_sub[stripe] == NULL))
+       if (unlikely(lov_r0(lov, index)->lo_sub[stripe] == NULL))
                RETURN(-EIO);
 
-        sub->sub_io_initialized = 0;
-        sub->sub_borrowed = 0;
-
        /* obtain new environment */
        sub->sub_env = cl_env_get(&sub->sub_refcheck);
        if (IS_ERR(sub->sub_env))
-               GOTO(fini_lov_io, rc = PTR_ERR(sub->sub_env));
-
-       /*
-        * First sub-io. Use ->lis_single_subio to
-        * avoid dynamic allocation.
-        */
-       if (lio->lis_active_subios == 0) {
-               sub->sub_io = &lio->lis_single_subio;
-               lio->lis_single_subio_index = stripe;
-       } else {
-               OBD_ALLOC_PTR(sub->sub_io);
-               if (sub->sub_io == NULL)
-                       GOTO(fini_lov_io, rc = -ENOMEM);
-       }
+               result = PTR_ERR(sub->sub_env);
 
-       sub_obj = lovsub2cl(lov_r0(lov)->lo_sub[stripe]);
-       sub_io = sub->sub_io;
+       sub_obj = lovsub2cl(lov_r0(lov, index)->lo_sub[stripe]);
+       sub_io  = &sub->sub_io;
 
-       sub_io->ci_obj = sub_obj;
+       sub_io->ci_obj    = sub_obj;
        sub_io->ci_result = 0;
-       sub_io->ci_parent = io;
+
+       sub_io->ci_parent  = io;
        sub_io->ci_lockreq = io->ci_lockreq;
-       sub_io->ci_type = io->ci_type;
+       sub_io->ci_type    = io->ci_type;
        sub_io->ci_no_srvlock = io->ci_no_srvlock;
        sub_io->ci_noatime = io->ci_noatime;
 
-       rc = cl_io_sub_init(sub->sub_env, sub_io, io->ci_type, sub_obj);
-       if (rc >= 0) {
-               lio->lis_active_subios++;
-               sub->sub_io_initialized = 1;
-               rc = 0;
-       }
-fini_lov_io:
-       if (rc != 0)
+       result = cl_io_sub_init(sub->sub_env, sub_io, io->ci_type, sub_obj);
+
+       if (result < 0)
                lov_io_sub_fini(env, lio, sub);
-       RETURN(rc);
+
+       RETURN(result);
 }
 
 struct lov_io_sub *lov_sub_get(const struct lu_env *env,
-                               struct lov_io *lio, int stripe)
+                              struct lov_io *lio, int index)
 {
-        int rc;
-        struct lov_io_sub *sub = &lio->lis_subs[stripe];
+       struct lov_io_sub *sub;
+       int rc = 0;
 
-        LASSERT(stripe < lio->lis_stripe_count);
-        ENTRY;
+       ENTRY;
 
-        if (!sub->sub_io_initialized) {
-                sub->sub_stripe = stripe;
-                rc = lov_io_sub_init(env, lio, sub);
-        } else
-                rc = 0;
+       list_for_each_entry(sub, &lio->lis_subios, sub_list) {
+               if (sub->sub_subio_index == index) {
+                       rc = 1;
+                       break;
+               }
+       }
+
+       if (rc == 0) {
+               sub = lov_sub_alloc(lio, index);
+               if (sub == NULL)
+                       GOTO(out, rc = -ENOMEM);
 
+               rc = lov_io_sub_init(env, lio, sub);
+               if (rc < 0) {
+                       lov_sub_free(lio, sub);
+                       GOTO(out, rc);
+               }
+
+               list_add_tail(&sub->sub_list, &lio->lis_subios);
+               lio->lis_nr_subios++;
+       }
+out:
        if (rc < 0)
                sub = ERR_PTR(rc);
-
        RETURN(sub);
 }
 
@@ -225,7 +171,7 @@ struct lov_io_sub *lov_sub_get(const struct lu_env *env,
  *
  */
 
-int lov_page_stripe(const struct cl_page *page)
+int lov_page_index(const struct cl_page *page)
 {
        const struct cl_page_slice *slice;
        ENTRY;
@@ -234,35 +180,21 @@ int lov_page_stripe(const struct cl_page *page)
        LASSERT(slice != NULL);
        LASSERT(slice->cpl_obj != NULL);
 
-       RETURN(cl2lov_page(slice)->lps_stripe);
+       RETURN(cl2lov_page(slice)->lps_index);
 }
 
 static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
                              struct cl_io *io)
 {
-       struct lov_stripe_md *lsm;
-       int result;
        ENTRY;
 
        LASSERT(lio->lis_object != NULL);
-       lsm = lio->lis_object->lo_lsm;
 
-       /*
-        * Need to be optimized, we can't afford to allocate a piece of memory
-        * when writing a page. -jay
-        */
-       OBD_ALLOC_LARGE(lio->lis_subs,
-                       lsm->lsm_entries[0]->lsme_stripe_count *
-                       sizeof lio->lis_subs[0]);
-       if (lio->lis_subs != NULL) {
-               lio->lis_nr_subios = lio->lis_stripe_count;
-               lio->lis_single_subio_index = -1;
-               lio->lis_active_subios = 0;
-               result = 0;
-       } else
-               result = -ENOMEM;
+       INIT_LIST_HEAD(&lio->lis_subios);
+       lio->lis_single_subio_index = -1;
+       lio->lis_nr_subios = 0;
 
-       RETURN(result);
+       RETURN(0);
 }
 
 static int lov_io_slice_init(struct lov_io *lio,
@@ -274,7 +206,6 @@ static int lov_io_slice_init(struct lov_io *lio,
        lio->lis_object = obj;
 
        LASSERT(obj->lo_lsm != NULL);
-       lio->lis_stripe_count = obj->lo_lsm->lsm_entries[0]->lsme_stripe_count;
 
         switch (io->ci_type) {
         case CIT_READ:
@@ -344,16 +275,23 @@ static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
 {
        struct lov_io *lio = cl2lov_io(env, ios);
        struct lov_object *lov = cl2lov(ios->cis_obj);
-       int i;
 
        ENTRY;
-       if (lio->lis_subs != NULL) {
-               for (i = 0; i < lio->lis_nr_subios; i++)
-                       lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
-               OBD_FREE_LARGE(lio->lis_subs,
-                        lio->lis_nr_subios * sizeof lio->lis_subs[0]);
-               lio->lis_nr_subios = 0;
+
+       LASSERT(list_empty(&lio->lis_active));
+
+       while (!list_empty(&lio->lis_subios)) {
+               struct lov_io_sub *sub = list_entry(lio->lis_subios.next,
+                                                   struct lov_io_sub,
+                                                   sub_list);
+
+               list_del_init(&sub->sub_list);
+               lio->lis_nr_subios--;
+
+               lov_io_sub_fini(env, lio, sub);
+               lov_sub_free(lio, sub);
        }
+       LASSERT(lio->lis_nr_subios == 0);
 
        LASSERT(atomic_read(&lov->lo_active_ios) > 0);
        if (atomic_dec_and_test(&lov->lo_active_ios))
@@ -361,6 +299,79 @@ static void lov_io_fini(const struct lu_env *env, const struct cl_io_slice *ios)
        EXIT;
 }
 
+static void lov_io_sub_inherit(struct lov_io_sub *sub, struct lov_io *lio,
+                              loff_t start, loff_t end)
+{ /* fill sub->sub_io from the parent cl_io, per io type */
+       struct cl_io *io = &sub->sub_io; /* the sub-IO being set up */
+       struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
+       struct cl_io *parent = lio->lis_cl.cis_io; /* source of inherited fields */
+       int index = lov_comp_entry(sub->sub_subio_index);
+       int stripe = lov_comp_stripe(sub->sub_subio_index);
+
+       switch (io->ci_type) { /* keyed on the sub-IO's own type */
+       case CIT_SETATTR: {
+               io->u.ci_setattr.sa_attr = parent->u.ci_setattr.sa_attr;
+               io->u.ci_setattr.sa_attr_flags =
+                       parent->u.ci_setattr.sa_attr_flags;
+               io->u.ci_setattr.sa_valid = parent->u.ci_setattr.sa_valid;
+               io->u.ci_setattr.sa_stripe_index = stripe;
+               io->u.ci_setattr.sa_parent_fid =
+                                       parent->u.ci_setattr.sa_parent_fid;
+               if (cl_io_is_trunc(io)) { /* truncate: lvb_size is replaced below */
+                       loff_t new_size = parent->u.ci_setattr.sa_attr.lvb_size;
+
+                       new_size = lov_size_to_stripe(lsm, index, new_size,
+                                                     stripe);
+                       io->u.ci_setattr.sa_attr.lvb_size = new_size; /* overrides the copy made above */
+               }
+               break;
+       }
+       case CIT_DATA_VERSION: {
+               io->u.ci_data_version.dv_data_version = 0; /* lov_io_data_version_end() adds sub values to the parent */
+               io->u.ci_data_version.dv_flags =
+                       parent->u.ci_data_version.dv_flags;
+               break;
+       }
+       case CIT_FAULT: {
+               struct cl_object *obj = parent->ci_obj;
+               loff_t off = cl_offset(obj, parent->u.ci_fault.ft_index);
+
+               io->u.ci_fault = parent->u.ci_fault; /* whole-struct copy first */
+               off = lov_size_to_stripe(lsm, index, off, stripe);
+               io->u.ci_fault.ft_index = cl_index(obj, off); /* then recompute ft_index from the converted offset */
+               break;
+       }
+       case CIT_FSYNC: {
+               io->u.ci_fsync.fi_start = start; /* range comes from the caller */
+               io->u.ci_fsync.fi_end = end;
+               io->u.ci_fsync.fi_fid = parent->u.ci_fsync.fi_fid;
+               io->u.ci_fsync.fi_mode = parent->u.ci_fsync.fi_mode;
+               break;
+       }
+       case CIT_READ:
+       case CIT_WRITE: {
+               io->u.ci_wr.wr_sync = cl_io_is_sync_write(parent);
+               if (cl_io_is_append(parent)) { /* append: pos/count are not set here */
+                       io->u.ci_wr.wr_append = 1;
+               } else {
+                       io->u.ci_rw.crw_pos = start;
+                       io->u.ci_rw.crw_count = end - start; /* length of [start, end) */
+               }
+               break;
+       }
+       case CIT_LADVISE: {
+               io->u.ci_ladvise.li_start = start; /* range comes from the caller */
+               io->u.ci_ladvise.li_end = end;
+               io->u.ci_ladvise.li_fid = parent->u.ci_ladvise.li_fid;
+               io->u.ci_ladvise.li_advice = parent->u.ci_ladvise.li_advice;
+               io->u.ci_ladvise.li_flags = parent->u.ci_ladvise.li_flags;
+               break;
+       }
+       default: /* nothing is inherited for other io types */
+               break;
+       }
+}
+
 static loff_t lov_offset_mod(loff_t val, int delta)
 {
         if (val != OBD_OBJECT_EOF)
@@ -369,85 +380,123 @@ static loff_t lov_offset_mod(loff_t val, int delta)
 }
 
 static int lov_io_iter_init(const struct lu_env *env,
-                            const struct cl_io_slice *ios)
+                           const struct cl_io_slice *ios)
 {
        struct lov_io        *lio = cl2lov_io(env, ios);
        struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
        struct lov_io_sub    *sub;
-       loff_t endpos;
-       loff_t start;
-       loff_t end;
-        int stripe;
-        int rc = 0;
+       struct lov_layout_entry *le;
+       struct lu_extent ext; /* IO range: lis_pos .. lis_endpos */
+       int index; /* incremented at loop top; index - 1 is the current entry */
+       int rc = 0;
 
         ENTRY;
-        endpos = lov_offset_mod(lio->lis_endpos, -1);
-        for (stripe = 0; stripe < lio->lis_stripe_count; stripe++) {
-                if (!lov_stripe_intersects(lsm, stripe, lio->lis_pos,
-                                           endpos, &start, &end))
-                        continue;
-
-               if (unlikely(lov_r0(lio->lis_object)->lo_sub[stripe] == NULL)) {
-                       if (ios->cis_io->ci_type == CIT_READ ||
-                           ios->cis_io->ci_type == CIT_WRITE ||
-                           ios->cis_io->ci_type == CIT_FAULT)
-                               RETURN(-EIO);
 
+       ext.e_start = lio->lis_pos;
+       ext.e_end = lio->lis_endpos;
+
+       index = 0;
+       lov_foreach_layout_entry(lio->lis_object, le) {
+               struct lov_layout_raid0 *r0 = &le->lle_raid0;
+               u64 start;
+               u64 end;
+               int stripe;
+
+               index++; /* bump before the 'continue' below so it is never skipped */
+               if (!lu_extent_is_overlapped(&ext, &le->lle_extent))
                        continue;
-               }
 
-               end = lov_offset_mod(end, +1);
-               sub = lov_sub_get(env, lio, stripe);
-               if (IS_ERR(sub)) {
-                       rc = PTR_ERR(sub);
-                       break;
-               }
+               for (stripe = 0; stripe < r0->lo_nr; stripe++) {
+                       if (!lov_stripe_intersects(lsm, index - 1, stripe,
+                                                  &ext, &start, &end))
+                               continue;
+
+                       if (unlikely(r0->lo_sub[stripe] == NULL)) {
+                               if (ios->cis_io->ci_type == CIT_READ ||
+                                   ios->cis_io->ci_type == CIT_WRITE ||
+                                   ios->cis_io->ci_type == CIT_FAULT)
+                                       RETURN(-EIO); /* read/write/fault fail on a missing sub-object */
+
+                               continue; /* other io types skip this stripe */
+                       }
+
+                       end = lov_offset_mod(end, 1);
+                       sub = lov_sub_get(env, lio,
+                                         lov_comp_index(index - 1, stripe));
+                       if (IS_ERR(sub)) {
+                               rc = PTR_ERR(sub);
+                               break; /* rc is re-checked after the inner loop */
+                       }
 
-               lov_io_sub_inherit(sub->sub_io, lio, stripe, start, end);
-               rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
-               if (rc != 0)
-                       cl_io_iter_fini(sub->sub_env, sub->sub_io);
-               if (rc != 0)
-                       break;
+                       lov_io_sub_inherit(sub, lio, start, end);
+                       rc = cl_io_iter_init(sub->sub_env, &sub->sub_io);
+                       if (rc != 0)
+                               cl_io_iter_fini(sub->sub_env, &sub->sub_io);
+                       if (rc != 0)
+                               break;
 
-               CDEBUG(D_VFSTRACE, "shrink: %d [%llu, %llu)\n",
-                      stripe, start, end);
+                       CDEBUG(D_VFSTRACE, "shrink: %d [%llu, %llu)\n",
+                              stripe, start, end);
 
-               list_add_tail(&sub->sub_linkage, &lio->lis_active);
+                       list_add_tail(&sub->sub_linkage, &lio->lis_active);
+               }
+               if (rc != 0) /* leave the entry loop too when a stripe failed */
+                       break;
        }
        RETURN(rc);
 }
 
 static int lov_io_rw_iter_init(const struct lu_env *env,
-                               const struct cl_io_slice *ios)
+                              const struct cl_io_slice *ios)
 {
        struct lov_io        *lio = cl2lov_io(env, ios);
        struct cl_io         *io  = ios->cis_io;
-       struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
+       struct lov_stripe_md_entry *lse;
        loff_t start = io->u.ci_rw.crw_pos;
        loff_t next;
-       unsigned long ssize = lsm->lsm_entries[0]->lsme_stripe_size;
+       unsigned long ssize;
+       int index;
 
-        LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
-        ENTRY;
+       LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
+       ENTRY;
 
-        /* fast path for common case. */
-        if (lio->lis_nr_subios != 1 && !cl_io_is_append(io)) {
+       if (cl_io_is_append(io))
+               RETURN(lov_io_iter_init(env, ios));
 
-               lov_do_div64(start, ssize);
-               next = (start + 1) * ssize;
-               if (next <= start * ssize)
-                       next = ~0ull;
+       index = lov_lsm_entry(lio->lis_object->lo_lsm, io->u.ci_rw.crw_pos);
+       if (index < 0) { /* non-existing layout component */
+               if (io->ci_type == CIT_READ) {
+                       /* TODO: it needs to detect the next component and
+                        * then set the next pos */
+                       io->ci_continue = 0;
 
-                io->ci_continue = next < lio->lis_io_endpos;
-                io->u.ci_rw.crw_count = min_t(loff_t, lio->lis_io_endpos,
-                                              next) - io->u.ci_rw.crw_pos;
-                lio->lis_pos    = io->u.ci_rw.crw_pos;
-                lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
-               CDEBUG(D_VFSTRACE, "stripe: %llu chunk: [%llu, %llu) "
-                      "%llu\n", (__u64)start, lio->lis_pos, lio->lis_endpos,
-                      (__u64)lio->lis_io_endpos);
+                       RETURN(lov_io_iter_init(env, ios));
+               }
+
+               RETURN(-ENODATA);
        }
+
+       lse = lov_lse(lio->lis_object, index);
+
+       ssize = lse->lsme_stripe_size;
+       lov_do_div64(start, ssize);
+       next = (start + 1) * ssize;
+       if (next <= start * ssize)
+               next = ~0ull;
+
+       LASSERT(io->u.ci_rw.crw_pos >= lse->lsme_extent.e_start);
+       next = min_t(__u64, next, lse->lsme_extent.e_end);
+       next = min_t(loff_t, next, lio->lis_io_endpos);
+
+       io->ci_continue = next < lio->lis_io_endpos;
+       io->u.ci_rw.crw_count = next - io->u.ci_rw.crw_pos;
+       lio->lis_pos    = io->u.ci_rw.crw_pos;
+       lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
+       CDEBUG(D_VFSTRACE,
+              "stripe: %llu chunk: [%llu, %llu) %llu, %zd\n",
+              (__u64)start, lio->lis_pos, lio->lis_endpos,
+              (__u64)lio->lis_io_endpos, io->u.ci_rw.crw_count);
+
        /*
         * XXX The following call should be optimized: we know, that
         * [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
@@ -456,20 +505,20 @@ static int lov_io_rw_iter_init(const struct lu_env *env,
 }
 
 static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
-                       int (*iofunc)(const struct lu_env *, struct cl_io *))
+                      int (*iofunc)(const struct lu_env *, struct cl_io *))
 {
        struct cl_io *parent = lio->lis_cl.cis_io;
-        struct lov_io_sub *sub;
-        int rc = 0;
+       struct lov_io_sub *sub; /* cursor over lio->lis_active */
+       int rc = 0; /* stays 0 when the active list is empty */
 
        ENTRY;
        list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
-               rc = iofunc(sub->sub_env, sub->sub_io);
+               rc = iofunc(sub->sub_env, &sub->sub_io); /* a non-zero rc ends the walk */
                if (rc)
                        break;
 
                if (parent->ci_result == 0)
-                       parent->ci_result = sub->sub_io->ci_result;
+                       parent->ci_result = sub->sub_io.ci_result; /* first non-zero sub result is kept */
        }
        RETURN(rc);
 }
@@ -530,13 +579,13 @@ lov_io_data_version_end(const struct lu_env *env, const struct cl_io_slice *ios)
 
        ENTRY;
        list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
-               lov_io_end_wrapper(env, sub->sub_io);
+               lov_io_end_wrapper(env, &sub->sub_io);
 
                parent->u.ci_data_version.dv_data_version +=
-                       sub->sub_io->u.ci_data_version.dv_data_version;
+                       sub->sub_io.u.ci_data_version.dv_data_version;
 
                if (parent->ci_result == 0)
-                       parent->ci_result = sub->sub_io->ci_result;
+                       parent->ci_result = sub->sub_io.ci_result;
        }
 
        EXIT;
@@ -574,25 +623,34 @@ static int lov_io_read_ahead(const struct lu_env *env,
        struct lov_io           *lio = cl2lov_io(env, ios);
        struct lov_object       *loo = lio->lis_object;
        struct cl_object        *obj = lov2cl(loo);
-       struct lov_layout_raid0 *r0 = lov_r0(loo);
+       struct lov_layout_raid0 *r0;
        struct lov_io_sub       *sub;
+       loff_t                   offset;
        loff_t                   suboff;
        pgoff_t                  ra_end;
        unsigned int             pps; /* pages per stripe */
        int                      stripe;
+       int                      index;
        int                      rc;
        ENTRY;
 
-       stripe = lov_stripe_number(loo->lo_lsm, cl_offset(obj, start));
+       offset = cl_offset(obj, start);
+       index = lov_lsm_entry(loo->lo_lsm, offset);
+       if (index < 0)
+               RETURN(-ENODATA);
+
+       stripe = lov_stripe_number(loo->lo_lsm, index, offset);
+
+       r0 = lov_r0(loo, index);
        if (unlikely(r0->lo_sub[stripe] == NULL))
                RETURN(-EIO);
 
-       sub = lov_sub_get(env, lio, stripe);
+       sub = lov_sub_get(env, lio, lov_comp_index(index, stripe));
        if (IS_ERR(sub))
-               return PTR_ERR(sub);
+               RETURN(PTR_ERR(sub));
 
-       lov_stripe_offset(loo->lo_lsm, cl_offset(obj, start), stripe, &suboff);
-       rc = cl_io_read_ahead(sub->sub_env, sub->sub_io,
+       lov_stripe_offset(loo->lo_lsm, index, offset, stripe, &suboff);
+       rc = cl_io_read_ahead(sub->sub_env, &sub->sub_io,
                              cl_index(lovsub2cl(r0->lo_sub[stripe]), suboff),
                              ra);
 
@@ -602,8 +660,8 @@ static int lov_io_read_ahead(const struct lu_env *env,
                RETURN(rc);
 
        /**
-        * Adjust the stripe index by layout of raid0. ra->cra_end is the maximum
-        * page index covered by an underlying DLM lock.
+        * Adjust the stripe index by layout of raid0. ra->cra_end is the
+        * maximum page index covered by an underlying DLM lock.
         * This function converts cra_end from stripe level to file level, and
         * make sure it's not beyond stripe boundary.
         */
@@ -613,14 +671,14 @@ static int lov_io_read_ahead(const struct lu_env *env,
        /* cra_end is stripe level, convert it into file level */
        ra_end = ra->cra_end;
        if (ra_end != CL_PAGE_EOF)
-               ra_end = lov_stripe_pgoff(loo->lo_lsm, ra_end, stripe);
+               ra_end = lov_stripe_pgoff(loo->lo_lsm, index, ra_end, stripe);
 
-       pps = loo->lo_lsm->lsm_entries[0]->lsme_stripe_size >> PAGE_SHIFT;
+       pps = lov_lse(loo, index)->lsme_stripe_size >> PAGE_SHIFT;
 
-       CDEBUG(D_READA, DFID " max_index = %lu, pps = %u, "
+       CDEBUG(D_READA, DFID " max_index = %lu, pps = %u, index = %u, "
               "stripe_size = %u, stripe no = %u, start index = %lu\n",
-              PFID(lu_object_fid(lov2lu(loo))), ra_end, pps,
-              loo->lo_lsm->lsm_entries[0]->lsme_stripe_size, stripe, start);
+              PFID(lu_object_fid(lov2lu(loo))), ra_end, pps, index,
+              lov_lse(loo, index)->lsme_stripe_size, stripe, start);
 
        /* never exceed the end of the stripe */
        ra->cra_end = min_t(pgoff_t, ra_end, start + pps - start % pps - 1);
@@ -651,24 +709,21 @@ static int lov_io_submit(const struct lu_env *env,
        struct lov_io_sub       *sub;
        struct cl_page_list     *plist = &lov_env_info(env)->lti_plist;
        struct cl_page          *page;
-       int stripe;
+       int index;
        int rc = 0;
        ENTRY;
 
-        if (lio->lis_active_subios == 1) {
-                int idx = lio->lis_single_subio_index;
+       if (lio->lis_nr_subios == 1) {
+               int idx = lio->lis_single_subio_index;
 
-               LASSERT(idx < lio->lis_nr_subios);
                sub = lov_sub_get(env, lio, idx);
                LASSERT(!IS_ERR(sub));
-               LASSERT(sub->sub_io == &lio->lis_single_subio);
-               rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
+               LASSERT(sub == &lio->lis_single_subio);
+               rc = cl_io_submit_rw(sub->sub_env, &sub->sub_io,
                                     crt, queue);
                RETURN(rc);
        }
 
-        LASSERT(lio->lis_subs != NULL);
-
        cl_page_list_init(plist);
        while (qin->pl_nr > 0) {
                struct cl_2queue  *cl2q = &lov_env_info(env)->lti_cl2q;
@@ -678,18 +733,18 @@ static int lov_io_submit(const struct lu_env *env,
                page = cl_page_list_first(qin);
                cl_page_list_move(&cl2q->c2_qin, qin, page);
 
-               stripe = lov_page_stripe(page);
+               index = lov_page_index(page);
                while (qin->pl_nr > 0) {
                        page = cl_page_list_first(qin);
-                       if (stripe != lov_page_stripe(page))
+                       if (index != lov_page_index(page))
                                break;
 
                        cl_page_list_move(&cl2q->c2_qin, qin, page);
                }
 
-               sub = lov_sub_get(env, lio, stripe);
+               sub = lov_sub_get(env, lio, index);
                if (!IS_ERR(sub)) {
-                        rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
+                       rc = cl_io_submit_rw(sub->sub_env, &sub->sub_io,
                                             crt, cl2q);
                } else {
                        rc = PTR_ERR(sub);
@@ -721,33 +776,30 @@ static int lov_io_commit_async(const struct lu_env *env,
        int rc = 0;
        ENTRY;
 
-       if (lio->lis_active_subios == 1) {
+       if (lio->lis_nr_subios == 1) {
                int idx = lio->lis_single_subio_index;
 
-               LASSERT(idx < lio->lis_nr_subios);
                sub = lov_sub_get(env, lio, idx);
                LASSERT(!IS_ERR(sub));
-               LASSERT(sub->sub_io == &lio->lis_single_subio);
-               rc = cl_io_commit_async(sub->sub_env, sub->sub_io, queue,
+               LASSERT(sub == &lio->lis_single_subio);
+               rc = cl_io_commit_async(sub->sub_env, &sub->sub_io, queue,
                                        from, to, cb);
                RETURN(rc);
        }
 
-       LASSERT(lio->lis_subs != NULL);
-
        cl_page_list_init(plist);
        while (queue->pl_nr > 0) {
                int stripe_to = to;
-               int stripe;
+               int index;
 
                LASSERT(plist->pl_nr == 0);
                page = cl_page_list_first(queue);
                cl_page_list_move(plist, queue, page);
 
-               stripe = lov_page_stripe(page);
+               index = lov_page_index(page);
                while (queue->pl_nr > 0) {
                        page = cl_page_list_first(queue);
-                       if (stripe != lov_page_stripe(page))
+                       if (index != lov_page_index(page))
                                break;
 
                        cl_page_list_move(plist, queue, page);
@@ -756,9 +808,9 @@ static int lov_io_commit_async(const struct lu_env *env,
                if (queue->pl_nr > 0) /* still has more pages */
                        stripe_to = PAGE_SIZE;
 
-               sub = lov_sub_get(env, lio, stripe);
+               sub = lov_sub_get(env, lio, index);
                if (!IS_ERR(sub)) {
-                       rc = cl_io_commit_async(sub->sub_env, sub->sub_io,
+                       rc = cl_io_commit_async(sub->sub_env, &sub->sub_io,
                                                plist, from, stripe_to, cb);
                } else {
                        rc = PTR_ERR(sub);
@@ -783,17 +835,19 @@ static int lov_io_commit_async(const struct lu_env *env,
 }
 
 static int lov_io_fault_start(const struct lu_env *env,
-                              const struct cl_io_slice *ios)
+                             const struct cl_io_slice *ios)
 {
-        struct cl_fault_io *fio;
-        struct lov_io      *lio;
-        struct lov_io_sub  *sub;
+       struct cl_fault_io *fio;
+       struct lov_io      *lio;
+       struct lov_io_sub  *sub;
 
        ENTRY;
+
        fio = &ios->cis_io->u.ci_fault;
        lio = cl2lov_io(env, ios);
-       sub = lov_sub_get(env, lio, lov_page_stripe(fio->ft_page));
-       sub->sub_io->u.ci_fault.ft_nob = fio->ft_nob;
+       sub = lov_sub_get(env, lio, lov_page_index(fio->ft_page));
+       sub->sub_io.u.ci_fault.ft_nob = fio->ft_nob;
+
        RETURN(lov_io_start(env, ios));
 }
 
@@ -807,7 +861,7 @@ static void lov_io_fsync_end(const struct lu_env *env,
 
        *written = 0;
        list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
-               struct cl_io *subio = sub->sub_io;
+               struct cl_io *subio = &sub->sub_io;
 
                lov_io_end_wrapper(sub->sub_env, subio);
 
@@ -972,8 +1026,8 @@ static const struct cl_io_operations lov_empty_io_ops = {
        .cio_commit_async              = LOV_EMPTY_IMPOSSIBLE
 };
 
-int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,
-                     struct cl_io *io)
+int lov_io_init_composite(const struct lu_env *env, struct cl_object *obj,
+                         struct cl_io *io)
 {
        struct lov_io       *lio = lov_env_io(env);
        struct lov_object   *lov = cl2lov(obj);
index 1194dc8..9c4855c 100644 (file)
@@ -50,7 +50,7 @@
 
 static struct lov_sublock_env *lov_sublock_env_get(const struct lu_env *env,
                                                   const struct cl_lock *parent,
-                                                   struct lov_lock_sub *lls)
+                                                  struct lov_lock_sub *lls)
 {
         struct lov_sublock_env *subenv;
         struct lov_io          *lio    = lov_env_io(env);
@@ -72,12 +72,12 @@ static struct lov_sublock_env *lov_sublock_env_get(const struct lu_env *env,
                subenv->lse_env = env;
                subenv->lse_io = io;
        } else {
-               sub = lov_sub_get(env, lio, lls->sub_stripe);
+               sub = lov_sub_get(env, lio, lls->sub_index);
                if (!IS_ERR(sub)) {
                        subenv->lse_env = sub->sub_env;
-                       subenv->lse_io  = sub->sub_io;
+                       subenv->lse_io  = &sub->sub_io;
                } else {
-                       subenv = (void*)sub;
+                       subenv = (void *)sub;
                }
        }
        return subenv;
@@ -114,53 +114,66 @@ static struct lov_lock *lov_lock_sub_init(const struct lu_env *env,
                                          const struct cl_object *obj,
                                          struct cl_lock *lock)
 {
+       struct lov_object *lov = cl2lov(obj);
+       struct lov_lock *lovlck;
+       struct lu_extent ext;
+       loff_t start;
+       loff_t end;
        int result = 0;
        int i;
+       int index;
        int nr;
-       loff_t start;
-       loff_t end;
-       loff_t file_start;
-       loff_t file_end;
-
-       struct lov_object       *loo    = cl2lov(obj);
-       struct lov_layout_raid0 *r0     = lov_r0(loo);
-       struct lov_lock         *lovlck;
 
        ENTRY;
 
-       CDEBUG(D_INODE, "%p: lock/io FID "DFID"/"DFID", lock/io clobj %p/%p\n",
-              loo, PFID(lu_object_fid(lov2lu(loo))),
-              PFID(lu_object_fid(&obj->co_lu)),
-              lov2cl(loo), obj);
-
-       file_start = cl_offset(lov2cl(loo), lock->cll_descr.cld_start);
-       file_end   = cl_offset(lov2cl(loo), lock->cll_descr.cld_end + 1) - 1;
-
-        for (i = 0, nr = 0; i < r0->lo_nr; i++) {
-                /*
-                 * XXX for wide striping smarter algorithm is desirable,
-                 * breaking out of the loop, early.
-                 */
-               if (likely(r0->lo_sub[i] != NULL) && /* spare layout */
-                   lov_stripe_intersects(loo->lo_lsm, i,
-                                         file_start, file_end, &start, &end))
-                       nr++;
+       ext.e_start = cl_offset(obj, lock->cll_descr.cld_start);
+       if (lock->cll_descr.cld_end == CL_PAGE_EOF)
+               ext.e_end = OBD_OBJECT_EOF;
+       else
+               ext.e_end  = cl_offset(obj, lock->cll_descr.cld_end + 1);
+
+       nr = 0;
+       for (index = lov_lsm_entry(lov->lo_lsm, ext.e_start);
+            index != -1 && index < lov->lo_lsm->lsm_entry_count; index++) {
+               struct lov_layout_raid0 *r0 = lov_r0(lov, index);
+
+               /* assume lsm entries are sorted. */
+               if (!lu_extent_is_overlapped(&ext,
+                                            &lov_lse(lov, index)->lsme_extent))
+                       break;
+
+               for (i = 0; i < r0->lo_nr; i++) {
+                       if (likely(r0->lo_sub[i] != NULL) && /* spare layout */
+                           lov_stripe_intersects(lov->lo_lsm, index, i,
+                                                 &ext, &start, &end))
+                               nr++;
+               }
        }
-       LASSERT(nr > 0);
+       if (nr == 0)
+               RETURN(ERR_PTR(-EINVAL));
 
        OBD_ALLOC_LARGE(lovlck, offsetof(struct lov_lock, lls_sub[nr]));
        if (lovlck == NULL)
                RETURN(ERR_PTR(-ENOMEM));
 
        lovlck->lls_nr = nr;
-       for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
-               if (likely(r0->lo_sub[i] != NULL) &&
-                   lov_stripe_intersects(loo->lo_lsm, i,
-                                         file_start, file_end, &start, &end)) {
+       nr = 0;
+       for (index = lov_lsm_entry(lov->lo_lsm, ext.e_start);
+            index < lov->lo_lsm->lsm_entry_count; index++) {
+               struct lov_layout_raid0 *r0 = lov_r0(lov, index);
+
+               /* assume lsm entries are sorted. */
+               if (!lu_extent_is_overlapped(&ext,
+                                            &lov_lse(lov, index)->lsme_extent))
+                       break;
+               for (i = 0; i < r0->lo_nr; ++i) {
                        struct lov_lock_sub *lls = &lovlck->lls_sub[nr];
-                       struct cl_lock_descr *descr;
+                       struct cl_lock_descr *descr = &lls->sub_lock.cll_descr;
 
-                       descr = &lls->sub_lock.cll_descr;
+                       if (unlikely(r0->lo_sub[i] == NULL) ||
+                           !lov_stripe_intersects(lov->lo_lsm, index, i,
+                                                  &ext, &start, &end))
+                               continue;
 
                        LASSERT(descr->cld_obj == NULL);
                        descr->cld_obj   = lovsub2cl(r0->lo_sub[i]);
@@ -170,7 +183,7 @@ static struct lov_lock *lov_lock_sub_init(const struct lu_env *env,
                        descr->cld_gid   = lock->cll_descr.cld_gid;
                        descr->cld_enq_flags = lock->cll_descr.cld_enq_flags;
 
-                       lls->sub_stripe = i;
+                       lls->sub_index = lov_comp_index(index, i);
 
                        /* initialize sub lock */
                        result = lov_sublock_init(env, lock, lls);
@@ -308,8 +321,8 @@ static const struct cl_lock_operations lov_lock_ops = {
         .clo_print     = lov_lock_print
 };
 
-int lov_lock_init_raid0(const struct lu_env *env, struct cl_object *obj,
-                       struct cl_lock *lock, const struct cl_io *io)
+int lov_lock_init_composite(const struct lu_env *env, struct cl_object *obj,
+                           struct cl_lock *lock, const struct cl_io *io)
 {
        struct lov_lock *lck;
        int result = 0;
index f13ec67..de9e429 100644 (file)
  * initializes the current atime, mtime, ctime to avoid regressing a more
  * uptodate time on the local client.
  */
-int lov_merge_lvb_kms(struct lov_stripe_md *lsm,
+int lov_merge_lvb_kms(struct lov_stripe_md *lsm, int index,
                       struct ost_lvb *lvb, __u64 *kms_place)
 {
+       struct lov_stripe_md_entry *lse = lsm->lsm_entries[index];
        u64 size = 0;
        u64 kms = 0;
        u64 blocks = 0;
@@ -61,8 +62,8 @@ int lov_merge_lvb_kms(struct lov_stripe_md *lsm,
               " a=%llu c=%llu b=%llu\n", POSTID(&lsm->lsm_oi),
               lvb->lvb_size, lvb->lvb_mtime, lvb->lvb_atime, lvb->lvb_ctime,
               lvb->lvb_blocks);
-       for (i = 0; i < lsm->lsm_entries[0]->lsme_stripe_count; i++) {
-               struct lov_oinfo *loi = lsm->lsm_entries[0]->lsme_oinfo[i];
+       for (i = 0; i < lse->lsme_stripe_count; i++) {
+               struct lov_oinfo *loi = lse->lsme_oinfo[i];
                u64 lov_size;
                u64 tmpsize;
 
@@ -72,14 +73,14 @@ int lov_merge_lvb_kms(struct lov_stripe_md *lsm,
                 }
 
                 tmpsize = loi->loi_kms;
-                lov_size = lov_stripe_size(lsm, tmpsize, i);
+               lov_size = lov_stripe_size(lsm, index, tmpsize, i);
                 if (lov_size > kms)
                         kms = lov_size;
 
                 if (loi->loi_lvb.lvb_size > tmpsize)
                         tmpsize = loi->loi_lvb.lvb_size;
 
-                lov_size = lov_stripe_size(lsm, tmpsize, i);
+               lov_size = lov_stripe_size(lsm, index, tmpsize, i);
                 if (lov_size > size)
                         size = lov_size;
                 /* merge blocks, mtime, atime */
index 9edd3e4..eb0fe68 100644 (file)
@@ -109,9 +109,9 @@ static int lov_init_empty(const struct lu_env *env, struct lov_device *dev,
        return 0;
 }
 
-static void lov_install_raid0(const struct lu_env *env,
-                              struct lov_object *lov,
-                              union  lov_layout_state *state)
+static void lov_install_composite(const struct lu_env *env,
+                                 struct lov_object *lov,
+                                 union  lov_layout_state *state) /* empty body: does nothing */
 {
 }
 
@@ -129,13 +129,14 @@ static struct cl_object *lov_sub_find(const struct lu_env *env,
 }
 
 static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
-                       struct cl_object *stripe, struct lov_layout_raid0 *r0,
-                       int idx)
+                       struct cl_object *subobj, struct lov_layout_raid0 *r0,
+                       struct lov_oinfo *oinfo, int idx)
 {
        struct cl_object_header *hdr;
        struct cl_object_header *subhdr;
        struct cl_object_header *parent;
-       struct lov_oinfo        *oinfo;
+       int entry = lov_comp_entry(idx);
+       int stripe = lov_comp_stripe(idx);
        int result;
 
        if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) {
@@ -144,19 +145,18 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
                 * freed memory. This is because osc_object is referring to
                 * lov_oinfo of lsm_stripe_data which will be freed due to
                 * this failure. */
-               cl_object_kill(env, stripe);
-               cl_object_put(env, stripe);
+               cl_object_kill(env, subobj);
+               cl_object_put(env, subobj);
                return -EIO;
        }
 
        hdr    = cl_object_header(lov2cl(lov));
-       subhdr = cl_object_header(stripe);
+       subhdr = cl_object_header(subobj);
 
-       oinfo = lov->lo_lsm->lsm_entries[0]->lsme_oinfo[idx];
-       CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: ostid: "DOSTID
-              " idx: %d gen: %d\n",
-              PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
-              PFID(&hdr->coh_lu.loh_fid), hdr, POSTID(&oinfo->loi_oi),
+       CDEBUG(D_INODE, DFID"@%p[%d:%d] -> "DFID"@%p: ostid: "DOSTID
+              " ost idx: %d gen: %d\n",
+              PFID(lu_object_fid(&subobj->co_lu)), subhdr, entry, stripe,
+              PFID(lu_object_fid(lov2lu(lov))), hdr, POSTID(&oinfo->loi_oi),
               oinfo->loi_ost_idx, oinfo->loi_ost_gen);
 
        /* reuse ->coh_attr_guard to protect coh_parent change */
@@ -166,10 +166,10 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
                subhdr->coh_parent = hdr;
                spin_unlock(&subhdr->coh_attr_guard);
                subhdr->coh_nesting = hdr->coh_nesting + 1;
-               lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
-               r0->lo_sub[idx] = cl2lovsub(stripe);
-               r0->lo_sub[idx]->lso_super = lov;
-               r0->lo_sub[idx]->lso_index = idx;
+               lu_object_ref_add(&subobj->co_lu, "lov-parent", lov);
+               r0->lo_sub[stripe] = cl2lovsub(subobj);
+               r0->lo_sub[stripe]->lso_super = lov;
+               r0->lo_sub[stripe]->lso_index = idx;
                result = 0;
        } else {
                struct lu_object  *old_obj;
@@ -183,18 +183,18 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
                if (old_lov->lo_layout_invalid) {
                        /* the object's layout has already changed but isn't
                         * refreshed */
-                       lu_object_unhash(env, &stripe->co_lu);
+                       lu_object_unhash(env, &subobj->co_lu);
                        result = -EAGAIN;
                } else {
                        mask = D_ERROR;
                        result = -EIO;
                }
 
-               LU_OBJECT_DEBUG(mask, env, &stripe->co_lu,
+               LU_OBJECT_DEBUG(mask, env, &subobj->co_lu,
                                "stripe %d is already owned.", idx);
                LU_OBJECT_DEBUG(mask, env, old_obj, "owned.");
                LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n");
-               cl_object_put(env, stripe);
+               cl_object_put(env, subobj);
        }
        return result;
 }
@@ -216,94 +216,123 @@ static int lov_page_slice_fixup(struct lov_object *lov,
 }
 
 static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev,
-                         struct lov_object *lov, struct lov_stripe_md *lsm,
-                         const struct cl_object_conf *conf,
-                         union lov_layout_state *state)
+                         struct lov_object *lov, int index,
+                         struct lov_layout_raid0 *r0)
 {
-        int result;
-        int i;
+       /*
+        * Set up the raid0 state \a r0 for layout entry \a index of \a lov.
+        *
+        * r0->lo_nr is taken from the entry's lsme_stripe_count and the
+        * r0->lo_sub array is allocated to that size. For every stripe
+        * whose lov_oinfo is not a dummy, the sub-object is looked up with
+        * lov_sub_find() and attached with lov_init_sub().
+        *
+        * Returns a negative errno on failure. On success the return value
+        * is the last size reported by lov_page_slice_fixup() (0 when no
+        * stripe was attached).
+        *
+        * NOTE(review): the error paths leave r0->lo_sub allocated;
+        * presumably it is released later through lov_fini_raid0() --
+        * confirm against the caller's failure handling.
+        */
+       struct lov_thread_info  *lti     = lov_env_info(env);
+       struct cl_object_conf   *subconf = &lti->lti_stripe_conf;
+       struct lu_fid           *ofid    = &lti->lti_fid;
+       struct cl_object        *stripe;
+       struct lov_stripe_md_entry *lse  = lov_lse(lov, index);
+       int result;
+       int psz;
+       int i;
 
-        struct cl_object        *stripe;
-        struct lov_thread_info  *lti     = lov_env_info(env);
-        struct cl_object_conf   *subconf = &lti->lti_stripe_conf;
-        struct lu_fid           *ofid    = &lti->lti_fid;
-        struct lov_layout_raid0 *r0      = &state->raid0;
+       ENTRY;
 
-        ENTRY;
+       spin_lock_init(&r0->lo_sub_lock);
+       r0->lo_nr = lse->lsme_stripe_count;
+       LASSERT(r0->lo_nr <= lov_targets_nr(dev));
+
+       OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
+       if (r0->lo_sub == NULL)
+               GOTO(out, result = -ENOMEM);
+
+       psz = 0;
+       result = 0;
+       memset(subconf, 0, sizeof(*subconf));
+
+       /*
+        * Create stripe cl_objects.
+        */
+       for (i = 0; i < r0->lo_nr; ++i) {
+               struct cl_device *subdev;
+               struct lov_oinfo *oinfo = lse->lsme_oinfo[i];
+               int ost_idx = oinfo->loi_ost_idx;
 
-       if (lsm->lsm_magic != LOV_MAGIC_V1 && lsm->lsm_magic != LOV_MAGIC_V3) {
-               dump_lsm(D_ERROR, lsm);
-               LASSERTF(0, "magic mismatch, expected %d/%d, actual %d.\n",
-                        LOV_MAGIC_V1, LOV_MAGIC_V3, lsm->lsm_magic);
+               /* dummy stripes get no sub-object; lo_sub[i] is not set */
+               if (lov_oinfo_is_dummy(oinfo))
+                       continue;
+
+               result = ostid_to_fid(ofid, &oinfo->loi_oi, oinfo->loi_ost_idx);
+               if (result != 0)
+                       GOTO(out, result);
+
+               if (dev->ld_target[ost_idx] == NULL) {
+                       CERROR("%s: OST %04x is not initialized\n",
+                              lov2obd(dev->ld_lov)->obd_name, ost_idx);
+                       GOTO(out, result = -EIO);
+               }
+
+               subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
+               subconf->u.coc_oinfo = oinfo;
+               LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
+               /* In the function below, .hs_keycmp resolves to
+                * lu_obj_hop_keycmp() */
+               /* coverity[overrun-buffer-val] */
+               stripe = lov_sub_find(env, subdev, ofid, subconf);
+               if (IS_ERR(stripe))
+                       GOTO(out, result = PTR_ERR(stripe));
+
+               /* the sub-object is registered under the combined
+                * (entry, stripe) index built by lov_comp_index() */
+               result = lov_init_sub(env, lov, stripe, r0, oinfo,
+                                     lov_comp_index(index, i));
+               /* stepping i back makes the loop redo this same stripe */
+               if (result == -EAGAIN) { /* try again */
+                       --i;
+                       result = 0;
+                       continue;
+               }
+
+               /* NOTE(review): any other non-zero result is not checked
+                * here and can be overwritten by the next iteration --
+                * confirm this is intended */
+               if (result == 0) {
+                       int sz = lov_page_slice_fixup(lov, stripe);
+                       LASSERT(ergo(psz > 0, psz == sz));
+                       psz = sz;
+               }
        }
+       /* success: hand the page slice size back to the caller */
+       if (result == 0)
+               result = psz;
+out:
+       RETURN(result);
+}
 
+/**
+ * Initialize the composite layout state of \a lov from \a lsm.
+ *
+ * Takes a reference on \a lsm (lsm_addref()) and stores it in lov->lo_lsm,
+ * marks the layout invalid, allocates one lov_layout_entry per lsm entry
+ * and initializes each entry's raid0 state with lov_init_raid0(). The page
+ * slice size reported by the entries is added to the object header's
+ * coh_page_bufsize.
+ *
+ * \param env [in]     execution environment
+ * \param dev [in]     lov device, passed through to lov_init_raid0()
+ * \param lov [in]     object being initialized; lo_lsm must be NULL
+ * \param lsm [in]     striping information, must have at least one entry
+ * \param conf [in]    not referenced by this function
+ * \param state [in]   layout state union; the composite member is filled
+ *
+ * \retval 0           success
+ * \retval negative    -ENOMEM, or the error from lov_init_raid0()
+ */
+static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
+                             struct lov_object *lov, struct lov_stripe_md *lsm,
+                             const struct cl_object_conf *conf,
+                             union lov_layout_state *state)
+{
+       struct lov_layout_composite *comp = &state->composite;
+       unsigned int entry_count;
+       unsigned int psz = 0;
+       int result = 0;
+       int i;
+
+       ENTRY;
+
+       LASSERT(lsm->lsm_entry_count > 0);
        LASSERT(lov->lo_lsm == NULL);
        lov->lo_lsm = lsm_addref(lsm);
-       r0->lo_nr = lsm->lsm_entries[0]->lsme_stripe_count;
-       LASSERT(r0->lo_nr <= lov_targets_nr(dev));
-
        lov->lo_layout_invalid = true;
 
-       OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
-       if (r0->lo_sub != NULL) {
-               int psz = 0;
+       entry_count = lsm->lsm_entry_count;
+       comp->lo_entry_count = entry_count;
 
-               result = 0;
-               subconf->coc_inode = conf->coc_inode;
-               spin_lock_init(&r0->lo_sub_lock);
-                /*
-                 * Create stripe cl_objects.
-                 */
-                for (i = 0; i < r0->lo_nr && result == 0; ++i) {
-                       struct cl_device *subdev;
-                       struct lov_oinfo *oinfo =
-                                       lsm->lsm_entries[0]->lsme_oinfo[i];
-                        int ost_idx = oinfo->loi_ost_idx;
-
-                       if (lov_oinfo_is_dummy(oinfo))
-                               continue;
-
-                       result = ostid_to_fid(ofid, &oinfo->loi_oi,
-                                             oinfo->loi_ost_idx);
-                       if (result != 0)
-                               GOTO(out, result);
-
-                       if (dev->ld_target[ost_idx] == NULL) {
-                               CERROR("%s: OST %04x is not initialized\n",
-                                      lov2obd(dev->ld_lov)->obd_name, ost_idx);
-                               GOTO(out, result = -EIO);
-                       }
+       /* NOTE(review): on allocation failure the lsm reference taken above
+        * stays in lov->lo_lsm; presumably the fini path drops it -- confirm */
+       OBD_ALLOC(comp->lo_entries, entry_count * sizeof(*comp->lo_entries));
+       if (comp->lo_entries == NULL)
+               RETURN(-ENOMEM);
 
-                       subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
-                       subconf->u.coc_oinfo = oinfo;
-                       LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
-                       /* In the function below, .hs_keycmp resolves to
-                        * lu_obj_hop_keycmp() */
-                       /* coverity[overrun-buffer-val] */
-                       stripe = lov_sub_find(env, subdev, ofid, subconf);
-                       if (!IS_ERR(stripe)) {
-                               result = lov_init_sub(env, lov, stripe, r0, i);
-                               if (result == -EAGAIN) { /* try again */
-                                       --i;
-                                       result = 0;
-                                       continue;
-                               }
-                       } else {
-                               result = PTR_ERR(stripe);
-                       }
+       for (i = 0; i < entry_count; i++) {
+               struct lov_layout_entry *le = &comp->lo_entries[i];
 
-                       if (result == 0) {
-                               int sz = lov_page_slice_fixup(lov, stripe);
-                               LASSERT(ergo(psz > 0, psz == sz));
-                               psz = sz;
-                       }
-                }
-               if (result == 0)
-                       cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
-       } else
-               result = -ENOMEM;
-out:
-       RETURN(result);
+               le->lle_extent = lsm->lsm_entries[i]->lsme_extent;
+               /* a non-negative result is this entry's page slice size */
+               result = lov_init_raid0(env, dev, lov, i, &le->lle_raid0);
+               if (result < 0)
+                       break;
+
+               LASSERT(ergo(psz > 0, psz == result));
+               psz = result;
+       }
+       /* NOTE(review): unlike the pre-patch code, the buffer size is also
+        * bumped when a later entry failed -- confirm this is harmless */
+       if (psz > 0)
+               cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
+
+       /* leave through RETURN() so the ENTRY trace above is balanced; a
+        * positive result is a page slice size, not a status */
+       RETURN(result > 0 ? 0 : result);
 }
 
 static int lov_init_released(const struct lu_env *env,
@@ -323,20 +352,27 @@ static int lov_init_released(const struct lu_env *env,
 static struct cl_object *lov_find_subobj(const struct lu_env *env,
                                         struct lov_object *lov,
                                         struct lov_stripe_md *lsm,
-                                        int stripe_idx)
+                                        int index)
 {
        struct lov_device       *dev = lu2lov_dev(lov2lu(lov)->lo_dev);
-       struct lov_oinfo *oinfo = lsm->lsm_entries[0]->lsme_oinfo[stripe_idx];
        struct lov_thread_info  *lti = lov_env_info(env);
        struct lu_fid           *ofid = &lti->lti_fid;
+       struct lov_oinfo        *oinfo;
        struct cl_device        *subdev;
+       int                     entry = lov_comp_entry(index);
+       int                     stripe = lov_comp_stripe(index);
        int                     ost_idx;
        int                     rc;
        struct cl_object        *result;
 
-       if (lov->lo_type != LLT_RAID0)
+       if (lov->lo_type != LLT_COMP)
                GOTO(out, result = NULL);
 
+       if (entry >= lsm->lsm_entry_count ||
+           stripe >= lsm->lsm_entries[entry]->lsme_stripe_count)
+               GOTO(out, result = NULL);
+
+       oinfo = lsm->lsm_entries[entry]->lsme_oinfo[stripe];
        ost_idx = oinfo->loi_ost_idx;
        rc = ostid_to_fid(ofid, &oinfo->loi_oi, ost_idx);
        if (rc != 0)
@@ -360,15 +396,14 @@ static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
 }
 
 static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
+                              struct lov_layout_raid0 *r0,
                               struct lovsub_object *los, int idx)
 {
        struct cl_object        *sub;
-       struct lov_layout_raid0 *r0;
        struct lu_site          *site;
        struct lu_site_bkt_data *bkt;
        wait_queue_t          *waiter;
 
-        r0  = &lov->u.raid0;
         LASSERT(r0->lo_sub[idx] == los);
 
         sub  = lovsub2cl(los);
@@ -406,32 +441,45 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
        LASSERT(r0->lo_sub[idx] == NULL);
 }
 
-static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
-                           union lov_layout_state *state)
+/**
+ * Prune and kill every sub-object recorded in \a r0.
+ *
+ * Does nothing when r0->lo_sub is NULL; NULL slots in the array are skipped.
+ *
+ * \param env [in]     execution environment
+ * \param lov [in]     top-level object owning the sub-objects
+ * \param r0 [in]      raid0 state whose lo_sub[0..lo_nr) entries are walked
+ */
+static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
+                            struct lov_layout_raid0 *r0)
 {
-       struct lov_layout_raid0 *r0 = &state->raid0;
-       struct lov_stripe_md    *lsm = lov->lo_lsm;
-       int i;
-
        ENTRY;
 
-       dump_lsm(D_INODE, lsm);
-
-       lov_layout_wait(env, lov);
         if (r0->lo_sub != NULL) {
-                for (i = 0; i < r0->lo_nr; ++i) {
-                        struct lovsub_object *los = r0->lo_sub[i];
+               int i;
+
+               for (i = 0; i < r0->lo_nr; ++i) {
+                       struct lovsub_object *los = r0->lo_sub[i];
 
-                        if (los != NULL) {
+                       if (los != NULL) {
                                cl_object_prune(env, &los->lso_cl);
-                                /*
-                                 * If top-level object is to be evicted from
-                                 * the cache, so are its sub-objects.
-                                 */
-                                lov_subobject_kill(env, lov, los, i);
+                               /*
+                                * If top-level object is to be evicted from
+                                * the cache, so are its sub-objects.
+                                */
+                               lov_subobject_kill(env, lov, r0, los, i);
                        }
                }
        }
+
+       EXIT;
+}
+
+/**
+ * Delete handler for a composite layout.
+ *
+ * Dumps the lsm at D_INODE level, waits via lov_layout_wait(), then calls
+ * lov_delete_raid0() on every layout entry of \a lov.
+ *
+ * \param env [in]     execution environment
+ * \param lov [in]     object whose layout entries are torn down
+ * \param state [in]   not referenced; entries are reached through \a lov
+ *
+ * \retval 0           always
+ */
+static int lov_delete_composite(const struct lu_env *env,
+                               struct lov_object *lov,
+                               union lov_layout_state *state)
+{
+       struct lov_layout_entry *entry;
+
+       ENTRY;
+
+       dump_lsm(D_INODE, lov->lo_lsm);
+
+       lov_layout_wait(env, lov);
+       lov_foreach_layout_entry(lov, entry)
+               lov_delete_raid0(env, lov, &entry->lle_raid0);
+
        RETURN(0);
 }
 
@@ -441,16 +489,32 @@ static void lov_fini_empty(const struct lu_env *env, struct lov_object *lov,
        LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
 }
 
-static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
-                           union lov_layout_state *state)
+/**
+ * Free the sub-object pointer array of \a r0.
+ *
+ * Releases r0->lo_sub (sized by r0->lo_nr) and resets the pointer to NULL,
+ * so calling this again on the same \a r0 does nothing.
+ *
+ * \param env [in]     not referenced by this function
+ * \param r0 [in]      raid0 state to clean up
+ */
+static void lov_fini_raid0(const struct lu_env *env,
+                          struct lov_layout_raid0 *r0)
 {
-       struct lov_layout_raid0 *r0 = &state->raid0;
-       ENTRY;
-
        if (r0->lo_sub != NULL) {
                OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
                r0->lo_sub = NULL;
        }
+}
+
+static void lov_fini_composite(const struct lu_env *env,
+                              struct lov_object *lov,
+                              union lov_layout_state *state)
+{
+       struct lov_layout_composite *comp = &state->composite;
+       ENTRY;
+
+       if (comp->lo_entries != NULL) {
+               struct lov_layout_entry *entry;
+
+               lov_foreach_layout_entry(lov, entry)
+                       lov_fini_raid0(env, &entry->lle_raid0);
+
+               OBD_FREE(comp->lo_entries,
+                        comp->lo_entry_count * sizeof(*comp->lo_entries));
+               comp->lo_entries = NULL;
+       }
 
        dump_lsm(D_INODE, lov->lo_lsm);
        lov_free_memmd(&lov->lo_lsm);
@@ -475,17 +539,10 @@ static int lov_print_empty(const struct lu_env *env, void *cookie,
 }
 
 static int lov_print_raid0(const struct lu_env *env, void *cookie,
-                          lu_printer_t p, const struct lu_object *o)
+                          lu_printer_t p, struct lov_layout_raid0 *r0)
 {
-       struct lov_object       *lov = lu2lov(o);
-       struct lov_layout_raid0 *r0  = lov_r0(lov);
-       struct lov_stripe_md    *lsm = lov->lo_lsm;
-       int                      i;
+       int i;
 
-       (*p)(env, cookie, "stripes: %d, %s, lsm{%p 0x%08X %d %u %u}:\n",
-               r0->lo_nr, lov->lo_layout_invalid ? "invalid" : "valid", lsm,
-               lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
-               lsm->lsm_entries[0]->lsme_stripe_count, lsm->lsm_layout_gen);
        for (i = 0; i < r0->lo_nr; ++i) {
                struct lu_object *sub;
 
@@ -499,6 +556,32 @@ static int lov_print_raid0(const struct lu_env *env, void *cookie,
        return 0;
 }
 
+/**
+ * Print a composite-layout object through the printer callback \a p.
+ *
+ * Emits one summary line (entry count, layout validity, lsm pointer, magic,
+ * reference count and layout generation), then for each layout entry one
+ * line with its extent, magic, id, layout generation, stripe count and
+ * stripe size, followed by that entry's lov_print_raid0() output.
+ *
+ * \param env [in]     execution environment, passed to \a p
+ * \param cookie [in]  opaque printer argument, passed to \a p
+ * \param p [in]       printer callback
+ * \param o [in]       lu_object of the lov object to print
+ *
+ * \retval 0           always
+ */
+static int lov_print_composite(const struct lu_env *env, void *cookie,
+                              lu_printer_t p, const struct lu_object *o)
+{
+       struct lov_object *lov = lu2lov(o);
+       struct lov_stripe_md *lsm = lov->lo_lsm;
+       int i;
+
+       (*p)(env, cookie, "entries: %d, %s, lsm{%p 0x%08X %d %u}:\n",
+            lsm->lsm_entry_count,
+            lov->lo_layout_invalid ? "invalid" : "valid", lsm,
+            lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
+            lsm->lsm_layout_gen);
+
+       for (i = 0; i < lsm->lsm_entry_count; i++) {
+               struct lov_stripe_md_entry *lse = lsm->lsm_entries[i];
+
+               (*p)(env, cookie, DEXT ": { 0x%08X, %u, %u, %u, %u }\n",
+                    PEXT(&lse->lsme_extent), lse->lsme_magic,
+                    lse->lsme_id, lse->lsme_layout_gen,
+                    lse->lsme_stripe_count, lse->lsme_stripe_size);
+               /* the sub-objects of this entry are printed right below it */
+               lov_print_raid0(env, cookie, p, lov_r0(lov, i));
+       }
+
+       return 0;
+}
+
 static int lov_print_released(const struct lu_env *env, void *cookie,
                                lu_printer_t p, const struct lu_object *o)
 {
@@ -506,10 +589,10 @@ static int lov_print_released(const struct lu_env *env, void *cookie,
        struct lov_stripe_md    *lsm = lov->lo_lsm;
 
        (*p)(env, cookie,
-               "released: %s, lsm{%p 0x%08X %d %u %u}:\n",
+               "released: %s, lsm{%p 0x%08X %d %u}:\n",
                lov->lo_layout_invalid ? "invalid" : "valid", lsm,
                lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
-               lsm->lsm_entries[0]->lsme_stripe_count, lsm->lsm_layout_gen);
+               lsm->lsm_layout_gen);
        return 0;
 }
 
@@ -527,63 +610,80 @@ static int lov_attr_get_empty(const struct lu_env *env, struct cl_object *obj,
         return 0;
 }
 
-static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj,
-                              struct cl_attr *attr)
+/**
+ * Fill the cached attributes (r0->lo_attr) of one layout entry.
+ *
+ * Returns immediately when r0->lo_attr_valid is already set. Otherwise the
+ * per-thread lvb is reset, merged by lov_merge_lvb_kms() under the lsm
+ * stripe lock and, on success, converted into r0->lo_attr together with
+ * the merged kms; lo_attr_valid is then set.
+ *
+ * \param env [in]     execution environment providing the scratch lvb
+ * \param lov [in]     object whose lo_lsm is merged
+ * \param index [in]   layout entry index handed to lov_merge_lvb_kms()
+ * \param r0 [in]      raid0 state holding the attribute cache
+ *
+ * \retval 0           cache already valid, or merge succeeded
+ * \retval other       value returned by lov_merge_lvb_kms()
+ */
+static int lov_attr_get_raid0(const struct lu_env *env, struct lov_object *lov,
+                             unsigned int index, struct lov_layout_raid0 *r0)
+
+{
+       struct lov_stripe_md *lsm = lov->lo_lsm;
+       struct ost_lvb *lvb = &lov_env_info(env)->lti_lvb;
+       struct cl_attr *attr = &r0->lo_attr;
+       __u64 kms = 0;
+       int result = 0;
+
+       /* cached attributes are still valid, nothing to recompute */
+       if (r0->lo_attr_valid)
+               return 0;
+
+       memset(lvb, 0, sizeof(*lvb));
+
+       /* XXX: timestamps can be negative by sanity:test_39m,
+        * how can it be? */
+       lvb->lvb_atime = LLONG_MIN;
+       lvb->lvb_ctime = LLONG_MIN;
+       lvb->lvb_mtime = LLONG_MIN;
+
+       /*
+        * XXX that should be replaced with a loop over sub-objects,
+        * doing cl_object_attr_get() on them. But for now, let's
+        * reuse old lov code.
+        */
+
+       /*
+        * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
+        * happy. It's not needed, because new code uses
+        * ->coh_attr_guard spin-lock to protect consistency of
+        * sub-object attributes.
+        */
+       lov_stripe_lock(lsm);
+       result = lov_merge_lvb_kms(lsm, index, lvb, &kms);
+       lov_stripe_unlock(lsm);
+       /* only a successful merge is cached */
+       if (result == 0) {
+               cl_lvb2attr(attr, lvb);
+               attr->cat_kms = kms;
+               r0->lo_attr_valid = 1;
+       }
+
+       return result;
+}
+
+static int lov_attr_get_composite(const struct lu_env *env,
+                                 struct cl_object *obj,
+                                 struct cl_attr *attr)
 {
        struct lov_object       *lov = cl2lov(obj);
-       struct lov_layout_raid0 *r0 = lov_r0(lov);
-       struct cl_attr          *lov_attr = &r0->lo_attr;
+       struct lov_layout_entry *entry;
        int                      result = 0;
+       int                      index = 0;
 
-        ENTRY;
+       ENTRY;
 
-       /* this is called w/o holding type guard mutex, so it must be inside
-        * an on going IO otherwise lsm may be replaced.
-        * LU-2117: it turns out there exists one exception. For mmaped files,
-        * the lock of those files may be requested in the other file's IO
-        * context, and this function is called in ccc_lock_state(), it will
-        * hit this assertion.
-        * Anyway, it's still okay to call attr_get w/o type guard as layout
-        * can't go if locks exist. */
-       /* LASSERT(atomic_read(&lsm->lsm_refc) > 1); */
-
-       if (!r0->lo_attr_valid) {
-               struct lov_stripe_md    *lsm = lov->lo_lsm;
-               struct ost_lvb          *lvb = &lov_env_info(env)->lti_lvb;
-               __u64                    kms = 0;
-
-               memset(lvb, 0, sizeof(*lvb));
-               /* XXX: timestamps can be negative by sanity:test_39m,
-                * how can it be? */
-               lvb->lvb_atime = LLONG_MIN;
-               lvb->lvb_ctime = LLONG_MIN;
-               lvb->lvb_mtime = LLONG_MIN;
+       attr->cat_size = 0;
+       attr->cat_blocks = 0;
+       lov_foreach_layout_entry(lov, entry) {
+               struct lov_layout_raid0 *r0 = &entry->lle_raid0;
+               struct cl_attr *lov_attr = &r0->lo_attr;
 
-               /*
-                * XXX that should be replaced with a loop over sub-objects,
-                * doing cl_object_attr_get() on them. But for now, let's
-                * reuse old lov code.
-                */
+               result = lov_attr_get_raid0(env, lov, index, r0);
+               if (result != 0)
+                       break;
 
-               /*
-                * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
-                * happy. It's not needed, because new code uses
-                * ->coh_attr_guard spin-lock to protect consistency of
-                * sub-object attributes.
-                */
-               lov_stripe_lock(lsm);
-               result = lov_merge_lvb_kms(lsm, lvb, &kms);
-               lov_stripe_unlock(lsm);
-               if (result == 0) {
-                       cl_lvb2attr(lov_attr, lvb);
-                       lov_attr->cat_kms = kms;
-                       r0->lo_attr_valid = 1;
-               }
-       }
-       if (result == 0) { /* merge results */
-               attr->cat_blocks = lov_attr->cat_blocks;
-               attr->cat_size = lov_attr->cat_size;
-               attr->cat_kms = lov_attr->cat_kms;
+               index++;
+
+               /* merge results */
+               attr->cat_blocks += lov_attr->cat_blocks;
+               if (attr->cat_size < lov_attr->cat_size)
+                       attr->cat_size = lov_attr->cat_size;
+               if (attr->cat_kms < lov_attr->cat_kms)
+                       attr->cat_kms = lov_attr->cat_kms;
                if (attr->cat_atime < lov_attr->cat_atime)
                        attr->cat_atime = lov_attr->cat_atime;
                if (attr->cat_ctime < lov_attr->cat_ctime)
@@ -606,17 +706,6 @@ const static struct lov_layout_operations lov_dispatch[] = {
                 .llo_io_init   = lov_io_init_empty,
                .llo_getattr   = lov_attr_get_empty,
         },
-        [LLT_RAID0] = {
-                .llo_init      = lov_init_raid0,
-                .llo_delete    = lov_delete_raid0,
-                .llo_fini      = lov_fini_raid0,
-                .llo_install   = lov_install_raid0,
-                .llo_print     = lov_print_raid0,
-                .llo_page_init = lov_page_init_raid0,
-                .llo_lock_init = lov_lock_init_raid0,
-                .llo_io_init   = lov_io_init_raid0,
-               .llo_getattr   = lov_attr_get_raid0,
-       },
         [LLT_RELEASED] = {
                 .llo_init      = lov_init_released,
                 .llo_delete    = lov_delete_empty,
@@ -627,7 +716,18 @@ const static struct lov_layout_operations lov_dispatch[] = {
                 .llo_lock_init = lov_lock_init_empty,
                 .llo_io_init   = lov_io_init_released,
                .llo_getattr   = lov_attr_get_empty,
-        }
+       },
+       [LLT_COMP] = {
+               .llo_init      = lov_init_composite,
+               .llo_delete    = lov_delete_composite,
+               .llo_fini      = lov_fini_composite,
+               .llo_install   = lov_install_composite,
+               .llo_print     = lov_print_composite,
+               .llo_page_init = lov_page_init_composite,
+               .llo_lock_init = lov_lock_init_composite,
+               .llo_io_init   = lov_io_init_composite,
+               .llo_getattr   = lov_attr_get_composite,
+       },
 };
 
 /**
@@ -651,13 +751,15 @@ static enum lov_layout_type lov_type(struct lov_stripe_md *lsm)
        if (lsm == NULL)
                return LLT_EMPTY;
 
-       if (lsm->lsm_magic == LOV_MAGIC_COMP_V1)
-               return LLT_EMPTY;
-
        if (lsm->lsm_is_released)
                return LLT_RELEASED;
 
-       return LLT_RAID0;
+       if (lsm->lsm_magic == LOV_MAGIC_V1 ||
+           lsm->lsm_magic == LOV_MAGIC_V3 ||
+           lsm->lsm_magic == LOV_MAGIC_COMP_V1)
+               return LLT_COMP;
+
+       return LLT_EMPTY;
 }
 
 static inline void lov_conf_freeze(struct lov_object *lov)
@@ -841,6 +943,8 @@ int lov_object_init(const struct lu_env *env, struct lu_object *obj,
                                   cconf->u.coc_layout.lb_len);
                if (IS_ERR(lsm))
                        RETURN(PTR_ERR(lsm));
+
+               dump_lsm(D_INODE, lsm);
        }
 
        /* no locking is necessary, as object is being created */
@@ -1010,41 +1114,38 @@ int lov_lock_init(const struct lu_env *env, struct cl_object *obj,
  * over which the mapping is spread
  *
  * \param lsm [in]             striping information for the file
- * \param fm_start [in]                logical start of mapping
- * \param fm_end [in]          logical end of mapping
+ * \param index [in]           stripe component index
+ * \param ext [in]             logical extent of mapping
  * \param start_stripe [in]    starting stripe of the mapping
  * \param stripe_count [out]   the number of stripes across which to map is
  *                             returned
  *
  * \retval last_stripe         return the last stripe of the mapping
  */
-static int fiemap_calc_last_stripe(struct lov_stripe_md *lsm,
-                                  u64 fm_start, u64 fm_end,
+static int fiemap_calc_last_stripe(struct lov_stripe_md *lsm, int index,
+                                  struct lu_extent *ext,
                                   int start_stripe, int *stripe_count)
 {
+       struct lov_stripe_md_entry *lsme = lsm->lsm_entries[index];
        int last_stripe;
        u64 obd_start;
        u64 obd_end;
        int i, j;
 
-       if (fm_end - fm_start > lsm->lsm_entries[0]->lsme_stripe_size *
-                               lsm->lsm_entries[0]->lsme_stripe_count) {
-               last_stripe = (start_stripe < 1 ?
-                              lsm->lsm_entries[0]->lsme_stripe_count - 1 :
-                              start_stripe - 1);
-               *stripe_count = lsm->lsm_entries[0]->lsme_stripe_count;
+       if (ext->e_end - ext->e_start >
+           lsme->lsme_stripe_size * lsme->lsme_stripe_count) {
+               last_stripe = (start_stripe < 1 ? lsme->lsme_stripe_count - 1 :
+                                                 start_stripe - 1);
+               *stripe_count = lsme->lsme_stripe_count;
        } else {
-               for (j = 0, i = start_stripe;
-                    j < lsm->lsm_entries[0]->lsme_stripe_count;
-                    i = (i + 1) % lsm->lsm_entries[0]->lsme_stripe_count,
-                    j++) {
-                       if ((lov_stripe_intersects(lsm, i, fm_start, fm_end,
+               for (j = 0, i = start_stripe; j < lsme->lsme_stripe_count;
+                    i = (i + 1) % lsme->lsme_stripe_count, j++) {
+                       if ((lov_stripe_intersects(lsm, index,  i, ext,
                                                   &obd_start, &obd_end)) == 0)
                                break;
                }
                *stripe_count = j;
-               last_stripe = (start_stripe + j - 1) %
-                             lsm->lsm_entries[0]->lsme_stripe_count;
+               last_stripe = (start_stripe + j - 1) % lsme->lsme_stripe_count;
        }
 
        return last_stripe;
@@ -1093,15 +1194,16 @@ static void fiemap_prepare_and_copy_exts(struct fiemap *fiemap,
  *
  * \param fiemap [in]          fiemap request header
  * \param lsm [in]             striping information for the file
- * \param fm_start [in]                logical start of mapping
- * \param fm_end [in]          logical end of mapping
+ * \param index [in]           stripe component index
+ * \param ext [in]             logical extent of mapping
  * \param start_stripe [out]   starting stripe will be returned in this
  */
 static u64 fiemap_calc_fm_end_offset(struct fiemap *fiemap,
                                     struct lov_stripe_md *lsm,
-                                    u64 fm_start, u64 fm_end,
+                                    int index, struct lu_extent *ext,
                                     int *start_stripe)
 {
+       struct lov_stripe_md_entry *lsme = lsm->lsm_entries[index];
        u64 local_end = fiemap->fm_extents[0].fe_logical;
        u64 lun_start;
        u64 lun_end;
@@ -1114,8 +1216,8 @@ static u64 fiemap_calc_fm_end_offset(struct fiemap *fiemap,
                return 0;
 
        /* Find out stripe_no from ost_index saved in the fe_device */
-       for (i = 0; i < lsm->lsm_entries[0]->lsme_stripe_count; i++) {
-               struct lov_oinfo *oinfo = lsm->lsm_entries[0]->lsme_oinfo[i];
+       for (i = 0; i < lsme->lsme_stripe_count; i++) {
+               struct lov_oinfo *oinfo = lsme->lsme_oinfo[i];
 
                if (lov_oinfo_is_dummy(oinfo))
                        continue;
@@ -1131,8 +1233,8 @@ static u64 fiemap_calc_fm_end_offset(struct fiemap *fiemap,
 
        /* If we have finished mapping on previous device, shift logical
         * offset to start of next device */
-       if (lov_stripe_intersects(lsm, stripe_no, fm_start, fm_end,
-                                 &lun_start, &lun_end) != 0 &&
+       if (lov_stripe_intersects(lsm, index, stripe_no, ext,
+                                  &lun_start, &lun_end) != 0 &&
            local_end < lun_end) {
                fm_end_offset = local_end;
                *start_stripe = stripe_no;
@@ -1140,34 +1242,32 @@ static u64 fiemap_calc_fm_end_offset(struct fiemap *fiemap,
                /* This is a special value to indicate that caller should
                 * calculate offset in next stripe. */
                fm_end_offset = 0;
-               *start_stripe = (stripe_no + 1) %
-                               lsm->lsm_entries[0]->lsme_stripe_count;
+               *start_stripe = (stripe_no + 1) % lsme->lsme_stripe_count;
        }
 
        return fm_end_offset;
 }
 
 struct fiemap_state {
-       struct fiemap   *fs_fm;
-       u64             fs_start;
-       u64             fs_length;
-       u64             fs_end;
-       u64             fs_end_offset;
-       int             fs_cur_extent;
-       int             fs_cnt_need;
-       int             fs_start_stripe;
-       int             fs_last_stripe;
-       bool            fs_device_done;
-       bool            fs_finish;
-       bool            fs_enough;
+       /* State carried across fiemap_for_stripe() calls. */
+       /* fiemap buffer; fiemap_for_stripe() works on its fm_extents[0] */
+       struct fiemap           *fs_fm;
+       /* extent being mapped; replaces the former fs_start/fs_end pair */
+       struct lu_extent        fs_ext;
+       u64                     fs_length;
+       /* when non-zero, start offset used on fs_start_stripe */
+       u64                     fs_end_offset;
+       int                     fs_cur_extent;
+       int                     fs_cnt_need;
+       int                     fs_start_stripe;
+       int                     fs_last_stripe;
+       /* reset to false at the start of fiemap_for_stripe() */
+       bool                    fs_device_done;
+       bool                    fs_finish_stripe;
+       bool                    fs_enough;
 };
 
 int fiemap_for_stripe(const struct lu_env *env, struct cl_object *obj,
-                     struct lov_stripe_md *lsm,
-                     struct fiemap *fiemap, size_t *buflen,
-                     struct ll_fiemap_info_key *fmkey, int stripeno,
-                     struct fiemap_state *fs)
+                     struct lov_stripe_md *lsm, struct fiemap *fiemap,
+                     size_t *buflen, struct ll_fiemap_info_key *fmkey,
+                     int index, int stripeno, struct fiemap_state *fs)
 {
+       struct lov_stripe_md_entry *lsme = lsm->lsm_entries[index];
        struct cl_object *subobj;
        struct lov_obd *lov = lu2lov_dev(obj->co_lu.lo_dev)->ld_lov;
        struct fiemap_extent *fm_ext = &fs->fs_fm->fm_extents[0];
@@ -1186,11 +1286,11 @@ int fiemap_for_stripe(const struct lu_env *env, struct cl_object *obj,
 
        fs->fs_device_done = false;
        /* Find out range of mapping on this stripe */
-       if ((lov_stripe_intersects(lsm, stripeno, fs->fs_start, fs->fs_end,
+       if ((lov_stripe_intersects(lsm, index, stripeno, &fs->fs_ext,
                                   &lun_start, &obd_object_end)) == 0)
                return 0;
 
-       if (lov_oinfo_is_dummy(lsm->lsm_entries[0]->lsme_oinfo[stripeno]))
+       if (lov_oinfo_is_dummy(lsme->lsme_oinfo[stripeno]))
                return -EIO;
 
        /* If this is a continuation FIEMAP call and we are on
@@ -1198,16 +1298,7 @@ int fiemap_for_stripe(const struct lu_env *env, struct cl_object *obj,
         * end_offset */
        if (fs->fs_end_offset != 0 && stripeno == fs->fs_start_stripe)
                lun_start = fs->fs_end_offset;
-
-       lun_end = fs->fs_length;
-       if (lun_end != ~0ULL) {
-               /* Handle fs->fs_start + fs->fs_length overflow */
-               if (fs->fs_start + fs->fs_length < fs->fs_start)
-                       fs->fs_length = ~0ULL - fs->fs_start;
-               lun_end = lov_size_to_stripe(lsm, fs->fs_start + fs->fs_length,
-                                            stripeno);
-       }
-
+       lun_end = lov_size_to_stripe(lsm, index, fs->fs_ext.e_end, stripeno);
        if (lun_start == lun_end)
                return 0;
 
@@ -1216,7 +1307,8 @@ int fiemap_for_stripe(const struct lu_env *env, struct cl_object *obj,
        len_mapped_single_call = 0;
 
        /* find lobsub object */
-       subobj = lov_find_subobj(env, cl2lov(obj), lsm, stripeno);
+       subobj = lov_find_subobj(env, cl2lov(obj), lsm,
+                                lov_comp_index(index, stripeno));
        if (IS_ERR(subobj))
                return PTR_ERR(subobj);
        /* If the output buffer is very large and the objects have many
@@ -1233,13 +1325,17 @@ int fiemap_for_stripe(const struct lu_env *env, struct cl_object *obj,
                lun_start += len_mapped_single_call;
                fs->fs_fm->fm_length = req_fm_len - len_mapped_single_call;
                req_fm_len = fs->fs_fm->fm_length;
+               /**
+                * If we have already collected enough extents, request just
+                * one more to see whether we happened to consume all the
+                * available extents, so that FIEMAP_EXTENT_LAST can be set.
+                */
                fs->fs_fm->fm_extent_count = fs->fs_enough ?
                                             1 : fs->fs_cnt_need;
                fs->fs_fm->fm_mapped_extents = 0;
                fs->fs_fm->fm_flags = fiemap->fm_flags;
 
-               ost_index = lsm->lsm_entries[0]->lsme_oinfo[stripeno]->
-                                                               loi_ost_idx;
+               ost_index = lsme->lsme_oinfo[stripeno]->loi_ost_idx;
 
                if (ost_index < 0 || ost_index >= lov->desc.ld_tgt_count)
                        GOTO(obj_put, rc = -EINVAL);
@@ -1272,7 +1368,7 @@ inactive_tgt:
                         * we need to return */
                        if (stripeno == fs->fs_last_stripe) {
                                fiemap->fm_mapped_extents = 0;
-                               fs->fs_finish = true;
+                               fs->fs_finish_stripe = true;
                                GOTO(obj_put, rc);
                        }
                        break;
@@ -1281,7 +1377,6 @@ inactive_tgt:
                         * We've collected enough extents and there are
                         * more extents after it.
                         */
-                       fs->fs_finish = true;
                        GOTO(obj_put, rc);
                }
 
@@ -1306,8 +1401,9 @@ inactive_tgt:
                 * the last extent */
                if (fm_ext[ext_count - 1].fe_flags & FIEMAP_EXTENT_LAST)
                        fm_ext[ext_count - 1].fe_flags &= ~FIEMAP_EXTENT_LAST;
-               if (lov_stripe_size(lsm, fm_ext[ext_count - 1].fe_logical +
-                                        fm_ext[ext_count - 1].fe_length,
+               if (lov_stripe_size(lsm, index,
+                                   fm_ext[ext_count - 1].fe_logical +
+                                   fm_ext[ext_count - 1].fe_length,
                                    stripeno) >= fmkey->lfik_oa.o_size) {
                        ost_eof = true;
                        fs->fs_device_done = true;
@@ -1323,7 +1419,7 @@ inactive_tgt:
        } while (!ost_done && !ost_eof);
 
        if (stripeno == fs->fs_last_stripe)
-               fs->fs_finish = true;
+               fs->fs_finish_stripe = true;
 obj_put:
        cl_object_put(env, subobj);
 
@@ -1348,12 +1444,18 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
                             struct ll_fiemap_info_key *fmkey,
                             struct fiemap *fiemap, size_t *buflen)
 {
-       struct lov_stripe_md    *lsm;
-       struct fiemap           *fm_local = NULL;
-       int                     cur_stripe;
-       int                     stripe_count;
-       unsigned int            buffer_size = FIEMAP_BUFFER_SIZE;
-       int                     rc = 0;
+       struct lov_stripe_md_entry *lsme;
+       struct lov_stripe_md *lsm;
+       struct fiemap *fm_local = NULL;
+       loff_t whole_start;
+       loff_t whole_end;
+       int entry;
+       int start_entry;
+       int end_entry;
+       int cur_stripe = 0;
+       int stripe_count;
+       unsigned int buffer_size = FIEMAP_BUFFER_SIZE;
+       int rc = 0;
        struct fiemap_state fs = { 0 };
        ENTRY;
 
@@ -1361,13 +1463,17 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
        if (lsm == NULL)
                RETURN(-ENODATA);
 
-       /**
-        * If the stripe_count > 1 and the application does not understand
-        * DEVICE_ORDER flag, it cannot interpret the extents correctly.
-        */
-       if (lsm->lsm_entries[0]->lsme_stripe_count > 1 &&
-           !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
-               GOTO(out_lsm, rc = -ENOTSUPP);
+       if (!(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER)) {
+               /**
+                * If the entry count > 1 or stripe_count > 1 and the
+                * application does not understand DEVICE_ORDER flag,
+                * it cannot interpret the extents correctly.
+                */
+               if (lsm->lsm_entry_count > 1 ||
+                   (lsm->lsm_entry_count == 1 &&
+                    lsm->lsm_entries[0]->lsme_stripe_count > 1))
+                       GOTO(out_lsm, rc = -ENOTSUPP);
+       }
 
        if (lsm->lsm_is_released) {
                if (fiemap->fm_start < fmkey->lfik_oa.o_size) {
@@ -1391,6 +1497,7 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
                GOTO(out_lsm, rc = 0);
        }
 
+       /* Shrink buffer_size if fm_extent_count extents need less space. */
        if (fiemap_count_to_size(fiemap->fm_extent_count) < buffer_size)
                buffer_size = fiemap_count_to_size(fiemap->fm_extent_count);
 
@@ -1398,34 +1505,6 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
        if (fm_local == NULL)
                GOTO(out_lsm, rc = -ENOMEM);
 
-       fs.fs_fm = fm_local;
-       fs.fs_cnt_need = fiemap_size_to_count(buffer_size);
-
-       fs.fs_start = fiemap->fm_start;
-       /* fs.fs_start is beyond the end of the file */
-       if (fs.fs_start > fmkey->lfik_oa.o_size)
-               GOTO(out_fm_local, rc = -EINVAL);
-
-       fs.fs_length = fiemap->fm_length;
-       /* Calculate start stripe, last stripe and length of mapping */
-       fs.fs_start_stripe = lov_stripe_number(lsm, fs.fs_start);
-       fs.fs_end = (fs.fs_length == ~0ULL) ? fmkey->lfik_oa.o_size :
-                                             fs.fs_start + fs.fs_length - 1;
-       /* If fs_length != ~0ULL but fs_start+fs_length-1 exceeds file size */
-       if (fs.fs_end > fmkey->lfik_oa.o_size) {
-               fs.fs_end = fmkey->lfik_oa.o_size;
-               fs.fs_length = fs.fs_end - fs.fs_start;
-       }
-
-       fs.fs_last_stripe = fiemap_calc_last_stripe(lsm, fs.fs_start, fs.fs_end,
-                                                   fs.fs_start_stripe,
-                                                   &stripe_count);
-       fs.fs_end_offset = fiemap_calc_fm_end_offset(fiemap, lsm, fs.fs_start,
-                                                    fs.fs_end,
-                                                    &fs.fs_start_stripe);
-       if (fs.fs_end_offset == -EINVAL)
-               GOTO(out_fm_local, rc = -EINVAL);
-
        /**
         * Requested extent count exceeds the fiemap buffer size, shrink our
         * ambition.
@@ -1435,26 +1514,79 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
        if (fiemap->fm_extent_count == 0)
                fs.fs_cnt_need = 0;
 
-       fs.fs_finish = false;
        fs.fs_enough = false;
        fs.fs_cur_extent = 0;
+       fs.fs_fm = fm_local;
+       fs.fs_cnt_need = fiemap_size_to_count(buffer_size);
 
-       /* Check each stripe */
-       for (cur_stripe = fs.fs_start_stripe; stripe_count > 0;
-            --stripe_count,
-            cur_stripe = (cur_stripe + 1) %
-                          lsm->lsm_entries[0]->lsme_stripe_count) {
-               rc = fiemap_for_stripe(env, obj, lsm, fiemap, buflen, fmkey,
-                                      cur_stripe, &fs);
-               if (rc < 0)
-                       GOTO(out_fm_local, rc);
-               if (fs.fs_finish)
-                       break;
-       } /* for each stripe */
+       whole_start = fiemap->fm_start;
+       /* whole_start is beyond the end of the file */
+       if (whole_start > fmkey->lfik_oa.o_size)
+               GOTO(out_fm_local, rc = -EINVAL);
+       whole_end = (fiemap->fm_length == OBD_OBJECT_EOF) ?
+                                       fmkey->lfik_oa.o_size :
+                                       whole_start + fiemap->fm_length - 1;
+       /**
+        * If fiemap->fm_length != OBD_OBJECT_EOF but whole_end exceeds the
+        * file size, clamp whole_end to the file size.
+        */
+       if (whole_end > fmkey->lfik_oa.o_size)
+               whole_end = fmkey->lfik_oa.o_size;
+
+       start_entry = lov_lsm_entry(lsm, whole_start);
+       end_entry = lov_lsm_entry(lsm, whole_end);
+       if (end_entry == -1)
+               end_entry = lsm->lsm_entry_count - 1;
 
+       if (start_entry == -1 || end_entry == -1)
+               GOTO(out_fm_local, rc = -EINVAL);
+
+       for (entry = start_entry; entry <= end_entry; entry++) {
+               lsme = lsm->lsm_entries[entry];
+
+               if (entry == start_entry)
+                       fs.fs_ext.e_start = whole_start;
+               else
+                       fs.fs_ext.e_start = lsme->lsme_extent.e_start;
+               if (entry == end_entry)
+                       fs.fs_ext.e_end = whole_end;
+               else
+                       fs.fs_ext.e_end = lsme->lsme_extent.e_end - 1;
+               fs.fs_length = fs.fs_ext.e_end - fs.fs_ext.e_start + 1;
+
+               /* Calculate start stripe, last stripe and length of mapping */
+               fs.fs_start_stripe = lov_stripe_number(lsm, entry,
+                                                      fs.fs_ext.e_start);
+               fs.fs_last_stripe = fiemap_calc_last_stripe(lsm, entry,
+                                       &fs.fs_ext, fs.fs_start_stripe,
+                                       &stripe_count);
+               fs.fs_end_offset = fiemap_calc_fm_end_offset(fiemap, lsm, entry,
+                                       &fs.fs_ext, &fs.fs_start_stripe);
+               /* Check each stripe */
+               for (cur_stripe = fs.fs_start_stripe; stripe_count > 0;
+                    --stripe_count,
+                    cur_stripe = (cur_stripe + 1) % lsme->lsme_stripe_count) {
+                       rc = fiemap_for_stripe(env, obj, lsm, fiemap, buflen,
+                                              fmkey, entry, cur_stripe, &fs);
+                       if (rc < 0)
+                               GOTO(out_fm_local, rc);
+                       if (fs.fs_enough)
+                               GOTO(finish, rc);
+                       if (fs.fs_finish_stripe)
+                               break;
+               } /* for each stripe */
+       } /* for covering layout component */
+       /*
+        * We've traversed all components; step @entry back to the last
+        * component entry, which the last-stripe check needs.
+        */
+       entry--;
+finish:
        /* Indicate that we are returning device offsets unless file just has
         * single stripe */
-       if (lsm->lsm_entries[0]->lsme_stripe_count > 1)
+       if (lsm->lsm_entry_count > 1 ||
+           (lsm->lsm_entry_count == 1 &&
+            lsm->lsm_entries[0]->lsme_stripe_count > 1))
                fiemap->fm_flags |= FIEMAP_FLAG_DEVICE_ORDER;
 
        if (fiemap->fm_extent_count == 0)
@@ -1472,7 +1604,6 @@ out_fm_local:
 
 out_lsm:
        lov_lsm_put(lsm);
-
        return rc;
 }
 
@@ -1611,23 +1742,28 @@ int lov_read_and_clear_async_rc(struct cl_object *clob)
 
                lov_conf_freeze(lov);
                switch (lov->lo_type) {
-               case LLT_RAID0: {
+               case LLT_COMP: {
                        struct lov_stripe_md *lsm;
                        int i;
 
                        lsm = lov->lo_lsm;
                        LASSERT(lsm != NULL);
-                       for (i = 0; i < lsm->lsm_entries[0]->lsme_stripe_count;
-                            i++) {
-                               struct lov_oinfo *loi =
-                                       lsm->lsm_entries[0]->lsme_oinfo[i];
+                       for (i = 0; i < lsm->lsm_entry_count; i++) {
+                               struct lov_stripe_md_entry *lse =
+                                               lsm->lsm_entries[i];
+                               int j;
 
-                               if (lov_oinfo_is_dummy(loi))
-                                       continue;
+                               for (j = 0; j < lse->lsme_stripe_count; j++) {
+                                       struct lov_oinfo *loi =
+                                                       lse->lsme_oinfo[j];
 
-                               if (loi->loi_ar.ar_rc && !rc)
-                                       rc = loi->loi_ar.ar_rc;
-                               loi->loi_ar.ar_rc = 0;
+                                       if (lov_oinfo_is_dummy(loi))
+                                               continue;
+
+                                       if (loi->loi_ar.ar_rc && !rc)
+                                               rc = loi->loi_ar.ar_rc;
+                                       loi->loi_ar.ar_rc = 0;
+                               }
                        }
                }
                case LLT_RELEASED:
index ba93d76..3ff0a38 100644 (file)
 
 #include "lov_internal.h"
 
+/**
+ * Return the stripe width of layout component @index: the component's
+ * stripe size multiplied by its stripe count.
+ *
+ * @index must be less than lsm->lsm_entry_count; this is asserted before
+ * the entry array is read.
+ */
+static loff_t stripe_width(struct lov_stripe_md *lsm, unsigned int index)
+{
+       struct lov_stripe_md_entry *entry;
+
+       /* validate @index before it is used to index lsm_entries[] */
+       LASSERT(index < lsm->lsm_entry_count);
+       entry = lsm->lsm_entries[index];
+
+       return (loff_t)entry->lsme_stripe_size * entry->lsme_stripe_count;
+}
+
 /* compute object size given "stripeno" and the ost size */
-u64 lov_stripe_size(struct lov_stripe_md *lsm, u64 ost_size, int stripeno)
+u64 lov_stripe_size(struct lov_stripe_md *lsm, int index, u64 ost_size,
+                   int stripeno)
 {
-       unsigned long ssize = lsm->lsm_entries[0]->lsme_stripe_size;
+       unsigned long ssize = lsm->lsm_entries[index]->lsme_stripe_size;
        unsigned long stripe_size;
        loff_t swidth;
        loff_t lov_size;
-       u32 magic = lsm->lsm_magic;
         ENTRY;
 
         if (ost_size == 0)
                 RETURN(0);
 
-        LASSERT(lsm_op_find(magic) != NULL);
-        lsm_op_find(magic)->lsm_stripe_by_index(lsm, &stripeno, NULL, &swidth);
+       swidth = stripe_width(lsm, index);
 
        /* lov_do_div64(a, b) returns a % b, and a = a / b */
        stripe_size = lov_do_div64(ost_size, ssize);
@@ -67,12 +75,13 @@ u64 lov_stripe_size(struct lov_stripe_md *lsm, u64 ost_size, int stripeno)
 /**
  * Compute file level page index by stripe level page offset
  */
-pgoff_t lov_stripe_pgoff(struct lov_stripe_md *lsm, pgoff_t stripe_index,
-                        int stripe)
+pgoff_t lov_stripe_pgoff(struct lov_stripe_md *lsm, int index,
+                        pgoff_t stripe_index, int stripe)
 {
        loff_t offset;
 
-       offset = lov_stripe_size(lsm, (stripe_index << PAGE_SHIFT) + 1,
+       offset = lov_stripe_size(lsm, index,
+                                (stripe_index << PAGE_SHIFT) + 1,
                                 stripe);
        return offset >> PAGE_SHIFT;
 }
@@ -125,14 +134,13 @@ pgoff_t lov_stripe_pgoff(struct lov_stripe_md *lsm, pgoff_t stripe_index,
  * was moved forward to the start of the stripe in question;  0 when it
  * falls in the stripe and no shifting was done; > 0 when the offset
  * was outside the stripe and was pulled back to its final byte. */
-int lov_stripe_offset(struct lov_stripe_md *lsm, loff_t lov_off, int stripeno,
-                     loff_t *obdoff)
+int lov_stripe_offset(struct lov_stripe_md *lsm, int index, loff_t lov_off,
+                     int stripeno, loff_t *obdoff)
 {
-       unsigned long ssize  = lsm->lsm_entries[0]->lsme_stripe_size;
+       unsigned long ssize  = lsm->lsm_entries[index]->lsme_stripe_size;
        loff_t stripe_off;
        loff_t this_stripe;
        loff_t swidth;
-       u32 magic = lsm->lsm_magic;
         int ret = 0;
 
         if (lov_off == OBD_OBJECT_EOF) {
@@ -140,9 +148,7 @@ int lov_stripe_offset(struct lov_stripe_md *lsm, loff_t lov_off, int stripeno,
                 return 0;
         }
 
-        LASSERT(lsm_op_find(magic) != NULL);
-        lsm_op_find(magic)->lsm_stripe_by_index(lsm, &stripeno, &lov_off,
-                                                &swidth);
+       swidth = stripe_width(lsm, index);
 
        /* lov_do_div64(a, b) returns a % b, and a = a / b */
        stripe_off = lov_do_div64(lov_off, swidth);
@@ -183,21 +189,18 @@ int lov_stripe_offset(struct lov_stripe_md *lsm, loff_t lov_off, int stripeno,
  * |    0    |     1     |     2     |    0    |     1     |     2     |
  * ---------------------------------------------------------------------
  */
-loff_t lov_size_to_stripe(struct lov_stripe_md *lsm, u64 file_size,
+loff_t lov_size_to_stripe(struct lov_stripe_md *lsm, int index, u64 file_size,
                          int stripeno)
 {
-       unsigned long ssize  = lsm->lsm_entries[0]->lsme_stripe_size;
+       unsigned long ssize = lsm->lsm_entries[index]->lsme_stripe_size;
        loff_t stripe_off;
        loff_t this_stripe;
        loff_t swidth;
-       u32 magic = lsm->lsm_magic;
 
         if (file_size == OBD_OBJECT_EOF)
                 return OBD_OBJECT_EOF;
 
-        LASSERT(lsm_op_find(magic) != NULL);
-        lsm_op_find(magic)->lsm_stripe_by_index(lsm, &stripeno, &file_size,
-                                                &swidth);
+       swidth = stripe_width(lsm, index);
 
        /* lov_do_div64(a, b) returns a % b, and a = a / b */
        stripe_off = lov_do_div64(file_size, swidth);
@@ -226,13 +229,23 @@ loff_t lov_size_to_stripe(struct lov_stripe_md *lsm, u64 file_size,
 /* given an extent in an lov and a stripe, calculate the extent of the stripe
  * that is contained within the lov extent.  this returns true if the given
  * stripe does intersect with the lov extent. */
-int lov_stripe_intersects(struct lov_stripe_md *lsm, int stripeno,
-                         u64 start, u64 end, u64 *obd_start, u64 *obd_end)
+int lov_stripe_intersects(struct lov_stripe_md *lsm, int index, int stripeno,
+                         struct lu_extent *ext, u64 *obd_start, u64 *obd_end)
 {
+       struct lov_stripe_md_entry *entry = lsm->lsm_entries[index];
+       u64 start, end;
         int start_side, end_side;
 
-        start_side = lov_stripe_offset(lsm, start, stripeno, obd_start);
-        end_side = lov_stripe_offset(lsm, end, stripeno, obd_end);
+       if (!lu_extent_is_overlapped(ext, &entry->lsme_extent))
+                       return 0;
+
+       start = max_t(__u64, ext->e_start, entry->lsme_extent.e_start);
+       end = min_t(__u64, ext->e_end, entry->lsme_extent.e_end);
+       if (end != OBD_OBJECT_EOF)
+               end--;
+
+       start_side = lov_stripe_offset(lsm, index, start, stripeno, obd_start);
+       end_side = lov_stripe_offset(lsm, index, end, stripeno, obd_end);
 
        CDEBUG(D_INODE, "[%lld->%lld] -> [(%d) %lld->%lld (%d)]\n",
                start, end, start_side, *obd_start, *obd_end, end_side);
@@ -258,15 +271,13 @@ int lov_stripe_intersects(struct lov_stripe_md *lsm, int stripeno,
 }
 
 /* compute which stripe number "lov_off" will be written into */
-int lov_stripe_number(struct lov_stripe_md *lsm, loff_t lov_off)
+int lov_stripe_number(struct lov_stripe_md *lsm, int index, loff_t lov_off)
 {
-       unsigned long ssize  = lsm->lsm_entries[0]->lsme_stripe_size;
+       unsigned long ssize = lsm->lsm_entries[index]->lsme_stripe_size;
        loff_t stripe_off;
        loff_t swidth;
-       u32 magic = lsm->lsm_magic;
 
-       LASSERT(lsm_op_find(magic) != NULL);
-       lsm_op_find(magic)->lsm_stripe_by_offset(lsm, NULL, &lov_off, &swidth);
+       swidth = stripe_width(lsm, index);
 
        stripe_off = lov_do_div64(lov_off, swidth);
 
index 5af3d91..ae74d25 100644 (file)
  *
  */
 
-static int lov_raid0_page_print(const struct lu_env *env,
-                               const struct cl_page_slice *slice,
-                               void *cookie, lu_printer_t printer)
+static int lov_comp_page_print(const struct lu_env *env,
+                              const struct cl_page_slice *slice,
+                              void *cookie, lu_printer_t printer)
 {
        struct lov_page *lp = cl2lov_page(slice);
 
        return (*printer)(env, cookie, LUSTRE_LOV_NAME"-page@%p, raid0\n", lp);
 }
 
-static const struct cl_page_operations lov_raid0_page_ops = {
-       .cpo_print = lov_raid0_page_print
+static const struct cl_page_operations lov_comp_page_ops = {
+       .cpo_print = lov_comp_page_print
 };
 
-int lov_page_init_raid0(const struct lu_env *env, struct cl_object *obj,
-                       struct cl_page *page, pgoff_t index)
+int lov_page_init_composite(const struct lu_env *env, struct cl_object *obj,
+                           struct cl_page *page, pgoff_t index)
 {
        struct lov_object *loo = cl2lov(obj);
-       struct lov_layout_raid0 *r0 = lov_r0(loo);
        struct lov_io     *lio = lov_env_io(env);
        struct cl_object  *subobj;
        struct cl_object  *o;
        struct lov_io_sub *sub;
        struct lov_page   *lpg = cl_object_page_slice(obj, page);
+       struct lov_layout_raid0 *r0;
        loff_t             offset;
-       loff_t                   suboff;
+       loff_t             suboff;
+       int                entry;
        int                stripe;
        int                rc;
        ENTRY;
 
        offset = cl_offset(obj, index);
-       stripe = lov_stripe_number(loo->lo_lsm, offset);
+       entry = lov_lsm_entry(loo->lo_lsm, offset);
+       if (entry < 0) {
+               /* non-existing layout component */
+               lov_page_init_empty(env, obj, page, index);
+               RETURN(0);
+       }
+
+       r0 = lov_r0(loo, entry);
+       stripe = lov_stripe_number(loo->lo_lsm, entry, offset);
        LASSERT(stripe < r0->lo_nr);
-       rc = lov_stripe_offset(loo->lo_lsm, offset, stripe,
-                              &suboff);
+       rc = lov_stripe_offset(loo->lo_lsm, entry, offset, stripe, &suboff);
        LASSERT(rc == 0);
 
-       lpg->lps_stripe = stripe;
-       cl_page_slice_add(page, &lpg->lps_cl, obj, index, &lov_raid0_page_ops);
+       lpg->lps_index = lov_comp_index(entry, stripe);
+       cl_page_slice_add(page, &lpg->lps_cl, obj, index, &lov_comp_page_ops);
 
-       sub = lov_sub_get(env, lio, stripe);
+       sub = lov_sub_get(env, lio, lpg->lps_index);
        if (IS_ERR(sub))
                RETURN(PTR_ERR(sub));
 
index 752a422..976bf3d 100644 (file)
@@ -79,13 +79,17 @@ static void lovsub_object_free(const struct lu_env *env, struct lu_object *obj)
         /* We can't assume lov was assigned here, because of the shadow
          * object handling in lu_object_find.
          */
-        if (lov) {
-                LASSERT(lov->lo_type == LLT_RAID0);
-                LASSERT(lov->u.raid0.lo_sub[los->lso_index] == los);
-               spin_lock(&lov->u.raid0.lo_sub_lock);
-               lov->u.raid0.lo_sub[los->lso_index] = NULL;
-               spin_unlock(&lov->u.raid0.lo_sub_lock);
-        }
+       if (lov != NULL) {
+               int index = lov_comp_entry(los->lso_index);
+               int stripe = lov_comp_stripe(los->lso_index);
+               struct lov_layout_raid0 *r0 = lov_r0(lov, index);
+
+               LASSERT(lov->lo_type == LLT_COMP);
+               LASSERT(r0->lo_sub[stripe] == los);
+               spin_lock(&r0->lo_sub_lock);
+               r0->lo_sub[stripe] = NULL;
+               spin_unlock(&r0->lo_sub_lock);
+       }
 
         lu_object_fini(obj);
         lu_object_header_fini(&los->lso_header.coh_lu);
@@ -104,10 +108,11 @@ static int lovsub_object_print(const struct lu_env *env, void *cookie,
 static int lovsub_attr_update(const struct lu_env *env, struct cl_object *obj,
                              const struct cl_attr *attr, unsigned valid)
 {
+       struct lovsub_object *los = cl2lovsub(obj);
        struct lov_object *lov = cl2lovsub(obj)->lso_super;
 
        ENTRY;
-       lov_r0(lov)->lo_attr_valid = 0;
+       lov_r0(lov, lov_comp_entry(los->lso_index))->lo_attr_valid = 0;
        RETURN(0);
 }
 
@@ -138,7 +143,7 @@ static void lovsub_req_attr_set(const struct lu_env *env, struct cl_object *obj,
         * There is no OBD_MD_* flag for obdo::o_stripe_idx, so set it
         * unconditionally. It never changes anyway.
         */
-       attr->cra_oa->o_stripe_idx = subobj->lso_index;
+       attr->cra_oa->o_stripe_idx = lov_comp_stripe(subobj->lso_index);
        EXIT;
 }
 
index 2151568..6069c21 100644 (file)
@@ -1137,17 +1137,14 @@ static void osc_lock_set_writer(const struct lu_env *env,
                io_start = cl_index(obj, io->u.ci_rw.crw_pos);
                io_end = cl_index(obj, io->u.ci_rw.crw_pos +
                                                io->u.ci_rw.crw_count - 1);
-               if (cl_io_is_append(io)) {
-                       io_start = 0;
-                       io_end = CL_PAGE_EOF;
-               }
        } else {
                LASSERT(cl_io_is_mkwrite(io));
                io_start = io_end = io->u.ci_fault.ft_index;
        }
 
        if (descr->cld_mode >= CLM_WRITE &&
-           descr->cld_start <= io_start && descr->cld_end >= io_end) {
+           (cl_io_is_append(io) ||
+            (descr->cld_start <= io_start && descr->cld_end >= io_end))) {
                struct osc_io *oio = osc_env_io(env);
 
                /* There must be only one lock to match the write region */