Make client layer support composite layout.
Plain layout will be stored in LOV layer as a composite layout
containing a single component.
Reviewed-on: https://review.whamcloud.com/24850
Signed-off-by: Jinshan Xiong <jinshan.xiong@intel.com>
Signed-off-by: Bobi Jam <bobijam.xu@intel.com>
Signed-off-by: Niu Yawei <yawei.niu@intel.com>
Change-Id: Ic3b85a4b10c66745e5c72ff02ea313baa0b12bb5
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
__u64 e_end;
};
+#define DEXT "[ %#llx , %#llx )"
+#define PEXT(ext) (ext)->e_start, (ext)->e_end
+
+static inline bool lu_extent_is_overlapped(struct lu_extent *e1,
+ struct lu_extent *e2)
+{
+ return e1->e_start < e2->e_end && e2->e_start < e1->e_end;
+}
+
enum lov_comp_md_entry_flags {
LCME_FL_PRIMARY = 0x00000001, /* Not used */
LCME_FL_STALE = 0x00000002, /* Not used */
struct lov_comp_md_entry_v1 lcm_entries[0];
} __attribute__((packed));
-
static inline __u32 lov_user_md_size(__u16 stripes, __u32 lmm_magic)
{
if (lmm_magic == LOV_USER_MAGIC_V1)
*/
enum lov_layout_type {
LLT_EMPTY, /** empty file without body (mknod + truncate) */
- LLT_RAID0, /** striped file */
LLT_RELEASED, /** file with no objects (data in HSM) */
+ LLT_COMP, /** support composite layout */
LLT_NR
};
switch (llt) {
case LLT_EMPTY:
return "EMPTY";
- case LLT_RAID0:
- return "RAID0";
case LLT_RELEASED:
return "RELEASED";
+ case LLT_COMP:
+ return "COMPOSITE";
case LLT_NR:
LBUG();
}
return "";
}
+struct lov_layout_raid0 {
+ unsigned lo_nr;
+ /**
+ * When this is true, lov_object::lo_attr contains
+ * valid, up-to-date attributes for a top-level
+ * object. This field is reset to 0 when attributes of
+ * any sub-object change.
+ */
+ int lo_attr_valid;
+ /**
+ * Array of sub-objects. Allocated when top-object is
+ * created (lov_init_raid0()).
+ *
+ * Top-object is a strict master of its sub-objects:
+ * it is created before them, and outlives its
+ * children (this latter is necessary so that basic
+ * functions like cl_object_top() always
+ * work). Top-object keeps a reference on every
+ * sub-object.
+ *
+ * When top-object is destroyed (lov_delete_raid0())
+ * it releases its reference to a sub-object and waits
+ * until the latter is finally destroyed.
+ */
+ struct lovsub_object **lo_sub;
+ /**
+ * protect lo_sub
+ */
+ spinlock_t lo_sub_lock;
+ /**
+ * Cached object attribute, built from sub-object
+ * attributes.
+ */
+ struct cl_attr lo_attr;
+};
+
/**
* lov-specific file state.
*
struct lov_stripe_md *lo_lsm;
union lov_layout_state {
- struct lov_layout_raid0 {
- unsigned lo_nr;
- /**
- * When this is true, lov_object::lo_attr contains
- * valid up to date attributes for a top-level
- * object. This field is reset to 0 when attributes of
- * any sub-object change.
- */
- int lo_attr_valid;
- /**
- * Array of sub-objects. Allocated when top-object is
- * created (lov_init_raid0()).
- *
- * Top-object is a strict master of its sub-objects:
- * it is created before them, and outlives its
- * children (this later is necessary so that basic
- * functions like cl_object_top() always
- * work). Top-object keeps a reference on every
- * sub-object.
- *
- * When top-object is destroyed (lov_delete_raid0())
- * it releases its reference to a sub-object and waits
- * until the latter is finally destroyed.
- *
- * May be vmalloc'd, must be freed with OBD_FREE_LARGE.
- */
- struct lovsub_object **lo_sub;
- /**
- * protect lo_sub
- */
- spinlock_t lo_sub_lock;
- /**
- * Cached object attribute, built from sub-object
- * attributes.
- */
- struct cl_attr lo_attr;
- } raid0;
struct lov_layout_state_empty {
} empty;
struct lov_layout_state_released {
} released;
+ struct lov_layout_composite {
+ /**
+ * Current valid entry count of lo_entries.
+ */
+ unsigned int lo_entry_count;
+ struct lov_layout_entry {
+ struct lu_extent lle_extent;
+ struct lov_layout_raid0 lle_raid0;
+ } *lo_entries;
+ } composite;
} u;
/**
* Thread that acquired lov_object::lo_type_guard in an exclusive
struct task_struct *lo_owner;
};
+#define lov_foreach_layout_entry(lov, entry) \
+ for (entry = &lov->u.composite.lo_entries[0]; \
+ entry < &lov->u.composite.lo_entries \
+ [lov->u.composite.lo_entry_count]; \
+ entry++)
+
/**
* State lov_lock keeps for each sub-lock.
*/
* hold resources of underlying layers */
unsigned int sub_is_enqueued:1,
sub_initialized:1;
- int sub_stripe;
+ int sub_index;
};
/**
struct lov_page {
struct cl_page_slice lps_cl;
- unsigned int lps_stripe; /* stripe index */
+ /** layout_entry + stripe index, composed using lov_comp_index() */
+ unsigned int lps_index;
};
/*
* State that lov_io maintains for every sub-io.
*/
struct lov_io_sub {
- __u16 sub_stripe;
/**
- * environment's refcheck.
- *
- * \see cl_env_get()
- */
- __u16 sub_refcheck;
- /**
- * true, iff cl_io_init() was successfully executed against
- * lov_io_sub::sub_io.
- */
- __u16 sub_io_initialized:1,
- /**
- * True, iff lov_io_sub::sub_io and lov_io_sub::sub_env weren't
- * allocated, but borrowed from a per-device emergency pool.
+ * Linkage into a list (hanging off lov_io::lis_subios)
*/
- sub_borrowed:1;
+ struct list_head sub_list;
/**
* Linkage into a list (hanging off lov_io::lis_active) of all
* sub-io's active for the current IO iteration.
*/
struct list_head sub_linkage;
+ unsigned int sub_subio_index;
/**
* sub-io for a stripe. Ideally sub-io's can be stopped and resumed
* independently, with lov acting as a scheduler to maximize overall
* throughput.
*/
- struct cl_io *sub_io;
+ struct cl_io sub_io;
/**
* environment, in which sub-io executes.
*/
struct lu_env *sub_env;
+ /**
+ * environment's refcheck.
+ *
+ * \see cl_env_get()
+ */
+ __u16 sub_refcheck;
+ __u16 sub_reenter;
};
/**
* starting position within a file, for the current io loop iteration
* (stripe), used by ci_io_loop().
*/
- loff_t lis_pos;
+ loff_t lis_pos;
/**
* end position with in a file, for the current stripe io. This is
* exclusive (i.e., next offset after last byte affected by io).
*/
- loff_t lis_endpos;
-
- int lis_stripe_count;
- int lis_active_subios;
+ loff_t lis_endpos;
+ int lis_nr_subios;
/**
* the index of ls_single_subio in ls_subios array
*/
int lis_single_subio_index;
- struct cl_io lis_single_subio;
+ struct lov_io_sub lis_single_subio;
/**
- * size of ls_subios array, actually the highest stripe #
- * May be vmalloc'd, must be freed with OBD_FREE_LARGE().
+ * List of active sub-io's. Active sub-io's are under the range
+ * of [lis_pos, lis_endpos).
*/
- int lis_nr_subios;
- struct lov_io_sub *lis_subs;
+ struct list_head lis_active;
/**
- * List of active sub-io's.
+ * All sub-io's created in this lov_io.
*/
- struct list_head lis_active;
+ struct list_head lis_subios;
};
struct lov_session {
int lovsub_lock_init (const struct lu_env *env, struct cl_object *obj,
struct cl_lock *lock, const struct cl_io *io);
-int lov_lock_init_raid0 (const struct lu_env *env, struct cl_object *obj,
+int lov_lock_init_composite(const struct lu_env *env, struct cl_object *obj,
struct cl_lock *lock, const struct cl_io *io);
int lov_lock_init_empty (const struct lu_env *env, struct cl_object *obj,
struct cl_lock *lock, const struct cl_io *io);
-int lov_io_init_raid0 (const struct lu_env *env, struct cl_object *obj,
+int lov_io_init_composite(const struct lu_env *env, struct cl_object *obj,
struct cl_io *io);
int lov_io_init_empty (const struct lu_env *env, struct cl_object *obj,
struct cl_io *io);
struct cl_page *page, pgoff_t index);
int lov_page_init_empty (const struct lu_env *env, struct cl_object *obj,
struct cl_page *page, pgoff_t index);
-int lov_page_init_raid0 (const struct lu_env *env, struct cl_object *obj,
+int lov_page_init_composite(const struct lu_env *env, struct cl_object *obj,
struct cl_page *page, pgoff_t index);
struct lu_object *lov_object_alloc (const struct lu_env *env,
const struct lu_object_header *hdr,
struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov);
int lov_page_stripe(const struct cl_page *page);
+int lov_lsm_entry(const struct lov_stripe_md *lsm, __u64 offset);
#define lov_foreach_target(lov, var) \
for (var = 0; var < lov_targets_nr(lov); ++var)
return info;
}
-static inline struct lov_layout_raid0 *lov_r0(struct lov_object *lov)
+static inline struct lov_layout_raid0 *lov_r0(struct lov_object *lov, int i)
{
- LASSERT(lov->lo_type == LLT_RAID0);
- LASSERT(lov->lo_lsm->lsm_magic == LOV_MAGIC ||
- lov->lo_lsm->lsm_magic == LOV_MAGIC_V3);
- return &lov->u.raid0;
+ LASSERT(lov->lo_type == LLT_COMP);
+ LASSERTF(i < lov->u.composite.lo_entry_count,
+ "entry %d entry_count %d", i, lov->u.composite.lo_entry_count);
+
+ return &lov->u.composite.lo_entries[i].lle_raid0;
+}
+
+static inline struct lov_stripe_md_entry *lov_lse(struct lov_object *lov, int i)
+{
+ LASSERT(lov->lo_lsm != NULL);
+ LASSERT(i < lov->lo_lsm->lsm_entry_count);
+
+ return lov->lo_lsm->lsm_entries[i];
}
/* lov_pack.c */
return ERR_PTR(rc);
}
-static void
-lsm_stripe_by_index_plain(struct lov_stripe_md *lsm, int *stripeno,
- loff_t *lov_off, loff_t *swidth)
-{
- if (swidth != NULL)
- *swidth = (loff_t)lsm->lsm_entries[0]->lsme_stripe_size *
- lsm->lsm_entries[0]->lsme_stripe_count;
-}
-
-static void
-lsm_stripe_by_offset_plain(struct lov_stripe_md *lsm, int *stripeno,
- loff_t *lov_off, loff_t *swidth)
-{
- if (swidth != NULL)
- *swidth = (loff_t)lsm->lsm_entries[0]->lsme_stripe_size *
- lsm->lsm_entries[0]->lsme_stripe_count;
-}
-
static inline struct lov_stripe_md *
lsm_unpackmd_v1(struct lov_obd *lov, void *buf, size_t buf_size)
{
}
const struct lsm_operations lsm_v1_ops = {
- .lsm_stripe_by_index = lsm_stripe_by_index_plain,
- .lsm_stripe_by_offset = lsm_stripe_by_offset_plain,
.lsm_unpackmd = lsm_unpackmd_v1,
};
}
const struct lsm_operations lsm_v3_ops = {
- .lsm_stripe_by_index = lsm_stripe_by_index_plain,
- .lsm_stripe_by_offset = lsm_stripe_by_offset_plain,
.lsm_unpackmd = lsm_unpackmd_v3,
};
}
const struct lsm_operations lsm_comp_md_v1_ops = {
- .lsm_stripe_by_index = lsm_stripe_by_index_plain,
- .lsm_stripe_by_offset = lsm_stripe_by_offset_plain,
.lsm_unpackmd = lsm_unpackmd_comp_md_v1,
};
void dump_lsm(unsigned int level, const struct lov_stripe_md *lsm)
{
- CDEBUG(level, "lsm %p, objid "DOSTID", maxbytes %#llx, magic 0x%08X,"
- " stripe_size %u, stripe_count %u, refc: %d,"
- " layout_gen %u, pool ["LOV_POOLNAMEF"]\n", lsm,
- POSTID(&lsm->lsm_oi), lsm->lsm_maxbytes, lsm->lsm_magic,
- lsm->lsm_entries[0]->lsme_stripe_size,
- lsm->lsm_entries[0]->lsme_stripe_count,
- atomic_read(&lsm->lsm_refc), lsm->lsm_layout_gen,
- lsm->lsm_entries[0]->lsme_pool_name);
+ int i;
+
+ CDEBUG(level, "lsm %p, objid "DOSTID", maxbytes %#llx, magic 0x%08X, "
+ "refc: %d, entry: %u, layout_gen %u\n",
+ lsm, POSTID(&lsm->lsm_oi), lsm->lsm_maxbytes, lsm->lsm_magic,
+ atomic_read(&lsm->lsm_refc), lsm->lsm_entry_count,
+ lsm->lsm_layout_gen);
+
+ for (i = 0; i < lsm->lsm_entry_count; i++) {
+ struct lov_stripe_md_entry *lse = lsm->lsm_entries[i];
+
+ CDEBUG(level,
+ DEXT ": id: %u, magic 0x%08X, stripe count %u, "
+ "size %u, layout_gen %u, pool: ["LOV_POOLNAMEF"]\n",
+ PEXT(&lse->lsme_extent), lse->lsme_id, lse->lsme_magic,
+ lse->lsme_stripe_count, lse->lsme_stripe_size,
+ lse->lsme_layout_gen, lse->lsme_pool_name);
+ }
+}
+
+int lov_lsm_entry(const struct lov_stripe_md *lsm, __u64 offset)
+{
+ int i;
+
+ for (i = 0; i < lsm->lsm_entry_count; i++) {
+ struct lov_stripe_md_entry *lse = lsm->lsm_entries[i];
+
+ if ((offset >= lse->lsme_extent.e_start &&
+ offset < lse->lsme_extent.e_end) ||
+ (offset == OBD_OBJECT_EOF &&
+ lse->lsme_extent.e_end == OBD_OBJECT_EOF))
+ return i;
+ }
+
+ return -1;
}
return lsm != NULL && !lsm->lsm_is_released;
}
+static inline unsigned int lov_comp_index(int entry, int stripe)
+{
+ LASSERT(entry >= 0 && entry <= SHRT_MAX);
+ LASSERT(stripe >= 0 && stripe < USHRT_MAX);
+
+ return entry << 16 | stripe;
+}
+
+static inline int lov_comp_stripe(int index)
+{
+ return index & 0xffff;
+}
+
+static inline int lov_comp_entry(int index)
+{
+ return index >> 16;
+}
+
struct lsm_operations {
- void (*lsm_stripe_by_index)(struct lov_stripe_md *, int *, loff_t *,
- loff_t *);
- void (*lsm_stripe_by_offset)(struct lov_stripe_md *, int *, loff_t *,
- loff_t *);
struct lov_stripe_md *(*lsm_unpackmd)(struct lov_obd *, void *, size_t);
};
(char *)((lv)->lov_tgts[index]->ltd_uuid.uuid)
/* lov_merge.c */
-int lov_merge_lvb_kms(struct lov_stripe_md *lsm,
+int lov_merge_lvb_kms(struct lov_stripe_md *lsm, int index,
struct ost_lvb *lvb, __u64 *kms_place);
/* lov_offset.c */
-u64 lov_stripe_size(struct lov_stripe_md *lsm, u64 ost_size, int stripeno);
-int lov_stripe_offset(struct lov_stripe_md *lsm, loff_t lov_off, int stripeno,
- loff_t *obd_off);
-loff_t lov_size_to_stripe(struct lov_stripe_md *lsm, u64 file_size,
+u64 lov_stripe_size(struct lov_stripe_md *lsm, int index,
+ u64 ost_size, int stripeno);
+int lov_stripe_offset(struct lov_stripe_md *lsm, int index, loff_t lov_off,
+ int stripeno, loff_t *obd_off);
+loff_t lov_size_to_stripe(struct lov_stripe_md *lsm, int index, u64 file_size,
int stripeno);
-int lov_stripe_intersects(struct lov_stripe_md *lsm, int stripeno,
- u64 start, u64 end, u64 *obd_start, u64 *obd_end);
-int lov_stripe_number(struct lov_stripe_md *lsm, loff_t lov_off);
-pgoff_t lov_stripe_pgoff(struct lov_stripe_md *lsm, pgoff_t stripe_index,
- int stripe);
+int lov_stripe_intersects(struct lov_stripe_md *lsm, int index, int stripeno,
+ struct lu_extent *ext, u64 *obd_start, u64 *obd_end);
+int lov_stripe_number(struct lov_stripe_md *lsm, int index, loff_t lov_off);
+pgoff_t lov_stripe_pgoff(struct lov_stripe_md *lsm, int index,
+ pgoff_t stripe_index, int stripe);
/* lov_request.c */
int lov_prep_statfs_set(struct obd_device *obd, struct obd_info *oinfo,
* @{
*/
-static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
- struct lov_io_sub *sub)
+static inline struct lov_io_sub *lov_sub_alloc(struct lov_io *lio, int index)
{
- ENTRY;
- if (sub->sub_io != NULL) {
- if (sub->sub_io_initialized) {
- cl_io_fini(sub->sub_env, sub->sub_io);
- sub->sub_io_initialized = 0;
- lio->lis_active_subios--;
- }
- if (sub->sub_stripe == lio->lis_single_subio_index)
- lio->lis_single_subio_index = -1;
- else if (!sub->sub_borrowed)
- OBD_FREE_PTR(sub->sub_io);
- sub->sub_io = NULL;
- }
- if (sub->sub_env != NULL && !IS_ERR(sub->sub_env)) {
- if (!sub->sub_borrowed)
- cl_env_put(sub->sub_env, &sub->sub_refcheck);
- sub->sub_env = NULL;
- }
- EXIT;
+ struct lov_io_sub *sub;
+
+ if (lio->lis_nr_subios == 0) {
+ LASSERT(lio->lis_single_subio_index == -1);
+ sub = &lio->lis_single_subio;
+ lio->lis_single_subio_index = index;
+ memset(sub, 0, sizeof(*sub));
+ } else {
+ OBD_ALLOC_PTR(sub);
+ }
+
+ if (sub != NULL) {
+ INIT_LIST_HEAD(&sub->sub_list);
+ INIT_LIST_HEAD(&sub->sub_linkage);
+ sub->sub_subio_index = index;
+ }
+
+ return sub;
}
-static void lov_io_sub_inherit(struct cl_io *io, struct lov_io *lio,
- int stripe, loff_t start, loff_t end)
+static inline void lov_sub_free(struct lov_io *lio, struct lov_io_sub *sub)
{
- struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
- struct cl_io *parent = lio->lis_cl.cis_io;
+ if (sub->sub_subio_index == lio->lis_single_subio_index) {
+ LASSERT(sub == &lio->lis_single_subio);
+ lio->lis_single_subio_index = -1;
+ } else {
+ OBD_FREE_PTR(sub);
+ }
+}
- switch (io->ci_type) {
- case CIT_SETATTR: {
- io->u.ci_setattr.sa_attr = parent->u.ci_setattr.sa_attr;
- io->u.ci_setattr.sa_attr_flags =
- parent->u.ci_setattr.sa_attr_flags;
- io->u.ci_setattr.sa_valid = parent->u.ci_setattr.sa_valid;
- io->u.ci_setattr.sa_stripe_index = stripe;
- io->u.ci_setattr.sa_parent_fid =
- parent->u.ci_setattr.sa_parent_fid;
- if (cl_io_is_trunc(io)) {
- loff_t new_size = parent->u.ci_setattr.sa_attr.lvb_size;
+static void lov_io_sub_fini(const struct lu_env *env, struct lov_io *lio,
+ struct lov_io_sub *sub)
+{
+ ENTRY;
- new_size = lov_size_to_stripe(lsm, new_size, stripe);
- io->u.ci_setattr.sa_attr.lvb_size = new_size;
- }
- break;
- }
- case CIT_DATA_VERSION: {
- io->u.ci_data_version.dv_data_version = 0;
- io->u.ci_data_version.dv_flags =
- parent->u.ci_data_version.dv_flags;
- break;
- }
- case CIT_FAULT: {
- struct cl_object *obj = parent->ci_obj;
- loff_t off = cl_offset(obj, parent->u.ci_fault.ft_index);
+ cl_io_fini(sub->sub_env, &sub->sub_io);
- io->u.ci_fault = parent->u.ci_fault;
- off = lov_size_to_stripe(lsm, off, stripe);
- io->u.ci_fault.ft_index = cl_index(obj, off);
- break;
- }
- case CIT_FSYNC: {
- io->u.ci_fsync.fi_start = start;
- io->u.ci_fsync.fi_end = end;
- io->u.ci_fsync.fi_fid = parent->u.ci_fsync.fi_fid;
- io->u.ci_fsync.fi_mode = parent->u.ci_fsync.fi_mode;
- break;
- }
- case CIT_READ:
- case CIT_WRITE: {
- io->u.ci_wr.wr_sync = cl_io_is_sync_write(parent);
- if (cl_io_is_append(parent)) {
- io->u.ci_wr.wr_append = 1;
- } else {
- io->u.ci_rw.crw_pos = start;
- io->u.ci_rw.crw_count = end - start;
- }
- break;
- }
- case CIT_LADVISE: {
- io->u.ci_ladvise.li_start = start;
- io->u.ci_ladvise.li_end = end;
- io->u.ci_ladvise.li_fid = parent->u.ci_ladvise.li_fid;
- io->u.ci_ladvise.li_advice = parent->u.ci_ladvise.li_advice;
- io->u.ci_ladvise.li_flags = parent->u.ci_ladvise.li_flags;
- break;
- }
- default:
- break;
+ if (sub->sub_env != NULL && !IS_ERR(sub->sub_env)) {
+ cl_env_put(sub->sub_env, &sub->sub_refcheck);
+ sub->sub_env = NULL;
}
+ EXIT;
}
static int lov_io_sub_init(const struct lu_env *env, struct lov_io *lio,
- struct lov_io_sub *sub)
+ struct lov_io_sub *sub)
{
struct lov_object *lov = lio->lis_object;
struct cl_io *sub_io;
struct cl_object *sub_obj;
struct cl_io *io = lio->lis_cl.cis_io;
- int stripe = sub->sub_stripe;
- int rc;
-
- LASSERT(sub->sub_io == NULL);
- LASSERT(sub->sub_env == NULL);
- LASSERT(sub->sub_stripe < lio->lis_stripe_count);
- ENTRY;
+ int index = lov_comp_entry(sub->sub_subio_index);
+ int stripe = lov_comp_stripe(sub->sub_subio_index);
+ int result = 0;
+ LASSERT(sub->sub_env == NULL);
+ ENTRY;
- if (unlikely(lov_r0(lov)->lo_sub[stripe] == NULL))
+ if (unlikely(lov_r0(lov, index)->lo_sub[stripe] == NULL))
RETURN(-EIO);
- sub->sub_io_initialized = 0;
- sub->sub_borrowed = 0;
-
/* obtain new environment */
sub->sub_env = cl_env_get(&sub->sub_refcheck);
if (IS_ERR(sub->sub_env))
- GOTO(fini_lov_io, rc = PTR_ERR(sub->sub_env));
-
- /*
- * First sub-io. Use ->lis_single_subio to
- * avoid dynamic allocation.
- */
- if (lio->lis_active_subios == 0) {
- sub->sub_io = &lio->lis_single_subio;
- lio->lis_single_subio_index = stripe;
- } else {
- OBD_ALLOC_PTR(sub->sub_io);
- if (sub->sub_io == NULL)
- GOTO(fini_lov_io, rc = -ENOMEM);
- }
+ result = PTR_ERR(sub->sub_env);
- sub_obj = lovsub2cl(lov_r0(lov)->lo_sub[stripe]);
- sub_io = sub->sub_io;
+ sub_obj = lovsub2cl(lov_r0(lov, index)->lo_sub[stripe]);
+ sub_io = &sub->sub_io;
- sub_io->ci_obj = sub_obj;
+ sub_io->ci_obj = sub_obj;
sub_io->ci_result = 0;
- sub_io->ci_parent = io;
+
+ sub_io->ci_parent = io;
sub_io->ci_lockreq = io->ci_lockreq;
- sub_io->ci_type = io->ci_type;
+ sub_io->ci_type = io->ci_type;
sub_io->ci_no_srvlock = io->ci_no_srvlock;
sub_io->ci_noatime = io->ci_noatime;
- rc = cl_io_sub_init(sub->sub_env, sub_io, io->ci_type, sub_obj);
- if (rc >= 0) {
- lio->lis_active_subios++;
- sub->sub_io_initialized = 1;
- rc = 0;
- }
-fini_lov_io:
- if (rc != 0)
+ result = cl_io_sub_init(sub->sub_env, sub_io, io->ci_type, sub_obj);
+
+ if (result < 0)
lov_io_sub_fini(env, lio, sub);
- RETURN(rc);
+
+ RETURN(result);
}
struct lov_io_sub *lov_sub_get(const struct lu_env *env,
- struct lov_io *lio, int stripe)
+ struct lov_io *lio, int index)
{
- int rc;
- struct lov_io_sub *sub = &lio->lis_subs[stripe];
+ struct lov_io_sub *sub;
+ int rc = 0;
- LASSERT(stripe < lio->lis_stripe_count);
- ENTRY;
+ ENTRY;
- if (!sub->sub_io_initialized) {
- sub->sub_stripe = stripe;
- rc = lov_io_sub_init(env, lio, sub);
- } else
- rc = 0;
+ list_for_each_entry(sub, &lio->lis_subios, sub_list) {
+ if (sub->sub_subio_index == index) {
+ rc = 1;
+ break;
+ }
+ }
+
+ if (rc == 0) {
+ sub = lov_sub_alloc(lio, index);
+ if (sub == NULL)
+ GOTO(out, rc = -ENOMEM);
+ rc = lov_io_sub_init(env, lio, sub);
+ if (rc < 0) {
+ lov_sub_free(lio, sub);
+ GOTO(out, rc);
+ }
+
+ list_add_tail(&sub->sub_list, &lio->lis_subios);
+ lio->lis_nr_subios++;
+ }
+out:
if (rc < 0)
sub = ERR_PTR(rc);
-
RETURN(sub);
}
*
*/
-int lov_page_stripe(const struct cl_page *page)
+int lov_page_index(const struct cl_page *page)
{
const struct cl_page_slice *slice;
ENTRY;
LASSERT(slice != NULL);
LASSERT(slice->cpl_obj != NULL);
- RETURN(cl2lov_page(slice)->lps_stripe);
+ RETURN(cl2lov_page(slice)->lps_index);
}
static int lov_io_subio_init(const struct lu_env *env, struct lov_io *lio,
struct cl_io *io)
{
- struct lov_stripe_md *lsm;
- int result;
ENTRY;
LASSERT(lio->lis_object != NULL);
- lsm = lio->lis_object->lo_lsm;
- /*
- * Need to be optimized, we can't afford to allocate a piece of memory
- * when writing a page. -jay
- */
- OBD_ALLOC_LARGE(lio->lis_subs,
- lsm->lsm_entries[0]->lsme_stripe_count *
- sizeof lio->lis_subs[0]);
- if (lio->lis_subs != NULL) {
- lio->lis_nr_subios = lio->lis_stripe_count;
- lio->lis_single_subio_index = -1;
- lio->lis_active_subios = 0;
- result = 0;
- } else
- result = -ENOMEM;
+ INIT_LIST_HEAD(&lio->lis_subios);
+ lio->lis_single_subio_index = -1;
+ lio->lis_nr_subios = 0;
- RETURN(result);
+ RETURN(0);
}
static int lov_io_slice_init(struct lov_io *lio,
lio->lis_object = obj;
LASSERT(obj->lo_lsm != NULL);
- lio->lis_stripe_count = obj->lo_lsm->lsm_entries[0]->lsme_stripe_count;
switch (io->ci_type) {
case CIT_READ:
{
struct lov_io *lio = cl2lov_io(env, ios);
struct lov_object *lov = cl2lov(ios->cis_obj);
- int i;
ENTRY;
- if (lio->lis_subs != NULL) {
- for (i = 0; i < lio->lis_nr_subios; i++)
- lov_io_sub_fini(env, lio, &lio->lis_subs[i]);
- OBD_FREE_LARGE(lio->lis_subs,
- lio->lis_nr_subios * sizeof lio->lis_subs[0]);
- lio->lis_nr_subios = 0;
+
+ LASSERT(list_empty(&lio->lis_active));
+
+ while (!list_empty(&lio->lis_subios)) {
+ struct lov_io_sub *sub = list_entry(lio->lis_subios.next,
+ struct lov_io_sub,
+ sub_list);
+
+ list_del_init(&sub->sub_list);
+ lio->lis_nr_subios--;
+
+ lov_io_sub_fini(env, lio, sub);
+ lov_sub_free(lio, sub);
}
+ LASSERT(lio->lis_nr_subios == 0);
LASSERT(atomic_read(&lov->lo_active_ios) > 0);
if (atomic_dec_and_test(&lov->lo_active_ios))
EXIT;
}
+static void lov_io_sub_inherit(struct lov_io_sub *sub, struct lov_io *lio,
+ loff_t start, loff_t end)
+{
+ struct cl_io *io = &sub->sub_io;
+ struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
+ struct cl_io *parent = lio->lis_cl.cis_io;
+ int index = lov_comp_entry(sub->sub_subio_index);
+ int stripe = lov_comp_stripe(sub->sub_subio_index);
+
+ switch (io->ci_type) {
+ case CIT_SETATTR: {
+ io->u.ci_setattr.sa_attr = parent->u.ci_setattr.sa_attr;
+ io->u.ci_setattr.sa_attr_flags =
+ parent->u.ci_setattr.sa_attr_flags;
+ io->u.ci_setattr.sa_valid = parent->u.ci_setattr.sa_valid;
+ io->u.ci_setattr.sa_stripe_index = stripe;
+ io->u.ci_setattr.sa_parent_fid =
+ parent->u.ci_setattr.sa_parent_fid;
+ if (cl_io_is_trunc(io)) {
+ loff_t new_size = parent->u.ci_setattr.sa_attr.lvb_size;
+
+ new_size = lov_size_to_stripe(lsm, index, new_size,
+ stripe);
+ io->u.ci_setattr.sa_attr.lvb_size = new_size;
+ }
+ break;
+ }
+ case CIT_DATA_VERSION: {
+ io->u.ci_data_version.dv_data_version = 0;
+ io->u.ci_data_version.dv_flags =
+ parent->u.ci_data_version.dv_flags;
+ break;
+ }
+ case CIT_FAULT: {
+ struct cl_object *obj = parent->ci_obj;
+ loff_t off = cl_offset(obj, parent->u.ci_fault.ft_index);
+
+ io->u.ci_fault = parent->u.ci_fault;
+ off = lov_size_to_stripe(lsm, index, off, stripe);
+ io->u.ci_fault.ft_index = cl_index(obj, off);
+ break;
+ }
+ case CIT_FSYNC: {
+ io->u.ci_fsync.fi_start = start;
+ io->u.ci_fsync.fi_end = end;
+ io->u.ci_fsync.fi_fid = parent->u.ci_fsync.fi_fid;
+ io->u.ci_fsync.fi_mode = parent->u.ci_fsync.fi_mode;
+ break;
+ }
+ case CIT_READ:
+ case CIT_WRITE: {
+ io->u.ci_wr.wr_sync = cl_io_is_sync_write(parent);
+ if (cl_io_is_append(parent)) {
+ io->u.ci_wr.wr_append = 1;
+ } else {
+ io->u.ci_rw.crw_pos = start;
+ io->u.ci_rw.crw_count = end - start;
+ }
+ break;
+ }
+ case CIT_LADVISE: {
+ io->u.ci_ladvise.li_start = start;
+ io->u.ci_ladvise.li_end = end;
+ io->u.ci_ladvise.li_fid = parent->u.ci_ladvise.li_fid;
+ io->u.ci_ladvise.li_advice = parent->u.ci_ladvise.li_advice;
+ io->u.ci_ladvise.li_flags = parent->u.ci_ladvise.li_flags;
+ break;
+ }
+ default:
+ break;
+ }
+}
+
static loff_t lov_offset_mod(loff_t val, int delta)
{
if (val != OBD_OBJECT_EOF)
}
static int lov_io_iter_init(const struct lu_env *env,
- const struct cl_io_slice *ios)
+ const struct cl_io_slice *ios)
{
struct lov_io *lio = cl2lov_io(env, ios);
struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
struct lov_io_sub *sub;
- loff_t endpos;
- loff_t start;
- loff_t end;
- int stripe;
- int rc = 0;
+ struct lov_layout_entry *le;
+ struct lu_extent ext;
+ int index;
+ int rc = 0;
ENTRY;
- endpos = lov_offset_mod(lio->lis_endpos, -1);
- for (stripe = 0; stripe < lio->lis_stripe_count; stripe++) {
- if (!lov_stripe_intersects(lsm, stripe, lio->lis_pos,
- endpos, &start, &end))
- continue;
-
- if (unlikely(lov_r0(lio->lis_object)->lo_sub[stripe] == NULL)) {
- if (ios->cis_io->ci_type == CIT_READ ||
- ios->cis_io->ci_type == CIT_WRITE ||
- ios->cis_io->ci_type == CIT_FAULT)
- RETURN(-EIO);
+ ext.e_start = lio->lis_pos;
+ ext.e_end = lio->lis_endpos;
+
+ index = 0;
+ lov_foreach_layout_entry(lio->lis_object, le) {
+ struct lov_layout_raid0 *r0 = &le->lle_raid0;
+ u64 start;
+ u64 end;
+ int stripe;
+
+ index++;
+ if (!lu_extent_is_overlapped(&ext, &le->lle_extent))
continue;
- }
- end = lov_offset_mod(end, +1);
- sub = lov_sub_get(env, lio, stripe);
- if (IS_ERR(sub)) {
- rc = PTR_ERR(sub);
- break;
- }
+ for (stripe = 0; stripe < r0->lo_nr; stripe++) {
+ if (!lov_stripe_intersects(lsm, index - 1, stripe,
+ &ext, &start, &end))
+ continue;
+
+ if (unlikely(r0->lo_sub[stripe] == NULL)) {
+ if (ios->cis_io->ci_type == CIT_READ ||
+ ios->cis_io->ci_type == CIT_WRITE ||
+ ios->cis_io->ci_type == CIT_FAULT)
+ RETURN(-EIO);
+
+ continue;
+ }
+
+ end = lov_offset_mod(end, 1);
+ sub = lov_sub_get(env, lio,
+ lov_comp_index(index - 1, stripe));
+ if (IS_ERR(sub)) {
+ rc = PTR_ERR(sub);
+ break;
+ }
- lov_io_sub_inherit(sub->sub_io, lio, stripe, start, end);
- rc = cl_io_iter_init(sub->sub_env, sub->sub_io);
- if (rc != 0)
- cl_io_iter_fini(sub->sub_env, sub->sub_io);
- if (rc != 0)
- break;
+ lov_io_sub_inherit(sub, lio, start, end);
+ rc = cl_io_iter_init(sub->sub_env, &sub->sub_io);
+ if (rc != 0)
+ cl_io_iter_fini(sub->sub_env, &sub->sub_io);
+ if (rc != 0)
+ break;
- CDEBUG(D_VFSTRACE, "shrink: %d [%llu, %llu)\n",
- stripe, start, end);
+ CDEBUG(D_VFSTRACE, "shrink: %d [%llu, %llu)\n",
+ stripe, start, end);
- list_add_tail(&sub->sub_linkage, &lio->lis_active);
+ list_add_tail(&sub->sub_linkage, &lio->lis_active);
+ }
+ if (rc != 0)
+ break;
}
RETURN(rc);
}
static int lov_io_rw_iter_init(const struct lu_env *env,
- const struct cl_io_slice *ios)
+ const struct cl_io_slice *ios)
{
struct lov_io *lio = cl2lov_io(env, ios);
struct cl_io *io = ios->cis_io;
- struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
+ struct lov_stripe_md_entry *lse;
loff_t start = io->u.ci_rw.crw_pos;
loff_t next;
- unsigned long ssize = lsm->lsm_entries[0]->lsme_stripe_size;
+ unsigned long ssize;
+ int index;
- LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
- ENTRY;
+ LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
+ ENTRY;
- /* fast path for common case. */
- if (lio->lis_nr_subios != 1 && !cl_io_is_append(io)) {
+ if (cl_io_is_append(io))
+ RETURN(lov_io_iter_init(env, ios));
- lov_do_div64(start, ssize);
- next = (start + 1) * ssize;
- if (next <= start * ssize)
- next = ~0ull;
+ index = lov_lsm_entry(lio->lis_object->lo_lsm, io->u.ci_rw.crw_pos);
+ if (index < 0) { /* non-existing layout component */
+ if (io->ci_type == CIT_READ) {
+ /* TODO: it needs to detect the next component and
+ * then set the next pos */
+ io->ci_continue = 0;
- io->ci_continue = next < lio->lis_io_endpos;
- io->u.ci_rw.crw_count = min_t(loff_t, lio->lis_io_endpos,
- next) - io->u.ci_rw.crw_pos;
- lio->lis_pos = io->u.ci_rw.crw_pos;
- lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
- CDEBUG(D_VFSTRACE, "stripe: %llu chunk: [%llu, %llu) "
- "%llu\n", (__u64)start, lio->lis_pos, lio->lis_endpos,
- (__u64)lio->lis_io_endpos);
+ RETURN(lov_io_iter_init(env, ios));
+ }
+
+ RETURN(-ENODATA);
}
+
+ lse = lov_lse(lio->lis_object, index);
+
+ ssize = lse->lsme_stripe_size;
+ lov_do_div64(start, ssize);
+ next = (start + 1) * ssize;
+ if (next <= start * ssize)
+ next = ~0ull;
+
+ LASSERT(io->u.ci_rw.crw_pos >= lse->lsme_extent.e_start);
+ next = min_t(__u64, next, lse->lsme_extent.e_end);
+ next = min_t(loff_t, next, lio->lis_io_endpos);
+
+ io->ci_continue = next < lio->lis_io_endpos;
+ io->u.ci_rw.crw_count = next - io->u.ci_rw.crw_pos;
+ lio->lis_pos = io->u.ci_rw.crw_pos;
+ lio->lis_endpos = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
+ CDEBUG(D_VFSTRACE,
+ "stripe: %llu chunk: [%llu, %llu) %llu, %zd\n",
+ (__u64)start, lio->lis_pos, lio->lis_endpos,
+ (__u64)lio->lis_io_endpos, io->u.ci_rw.crw_count);
+
/*
* XXX The following call should be optimized: we know, that
* [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
}
static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
- int (*iofunc)(const struct lu_env *, struct cl_io *))
+ int (*iofunc)(const struct lu_env *, struct cl_io *))
{
struct cl_io *parent = lio->lis_cl.cis_io;
- struct lov_io_sub *sub;
- int rc = 0;
+ struct lov_io_sub *sub;
+ int rc = 0;
ENTRY;
list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
- rc = iofunc(sub->sub_env, sub->sub_io);
+ rc = iofunc(sub->sub_env, &sub->sub_io);
if (rc)
break;
if (parent->ci_result == 0)
- parent->ci_result = sub->sub_io->ci_result;
+ parent->ci_result = sub->sub_io.ci_result;
}
RETURN(rc);
}
ENTRY;
list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
- lov_io_end_wrapper(env, sub->sub_io);
+ lov_io_end_wrapper(env, &sub->sub_io);
parent->u.ci_data_version.dv_data_version +=
- sub->sub_io->u.ci_data_version.dv_data_version;
+ sub->sub_io.u.ci_data_version.dv_data_version;
if (parent->ci_result == 0)
- parent->ci_result = sub->sub_io->ci_result;
+ parent->ci_result = sub->sub_io.ci_result;
}
EXIT;
struct lov_io *lio = cl2lov_io(env, ios);
struct lov_object *loo = lio->lis_object;
struct cl_object *obj = lov2cl(loo);
- struct lov_layout_raid0 *r0 = lov_r0(loo);
+ struct lov_layout_raid0 *r0;
struct lov_io_sub *sub;
+ loff_t offset;
loff_t suboff;
pgoff_t ra_end;
unsigned int pps; /* pages per stripe */
int stripe;
+ int index;
int rc;
ENTRY;
- stripe = lov_stripe_number(loo->lo_lsm, cl_offset(obj, start));
+ offset = cl_offset(obj, start);
+ index = lov_lsm_entry(loo->lo_lsm, offset);
+ if (index < 0)
+ RETURN(-ENODATA);
+
+ stripe = lov_stripe_number(loo->lo_lsm, index, offset);
+
+ r0 = lov_r0(loo, index);
if (unlikely(r0->lo_sub[stripe] == NULL))
RETURN(-EIO);
- sub = lov_sub_get(env, lio, stripe);
+ sub = lov_sub_get(env, lio, lov_comp_index(index, stripe));
if (IS_ERR(sub))
- return PTR_ERR(sub);
+ RETURN(PTR_ERR(sub));
- lov_stripe_offset(loo->lo_lsm, cl_offset(obj, start), stripe, &suboff);
- rc = cl_io_read_ahead(sub->sub_env, sub->sub_io,
+ lov_stripe_offset(loo->lo_lsm, index, offset, stripe, &suboff);
+ rc = cl_io_read_ahead(sub->sub_env, &sub->sub_io,
cl_index(lovsub2cl(r0->lo_sub[stripe]), suboff),
ra);
RETURN(rc);
/**
- * Adjust the stripe index by layout of raid0. ra->cra_end is the maximum
- * page index covered by an underlying DLM lock.
+ * Adjust the stripe index by layout of raid0. ra->cra_end is the
+ * maximum page index covered by an underlying DLM lock.
* This function converts cra_end from stripe level to file level, and
* make sure it's not beyond stripe boundary.
*/
/* cra_end is stripe level, convert it into file level */
ra_end = ra->cra_end;
if (ra_end != CL_PAGE_EOF)
- ra_end = lov_stripe_pgoff(loo->lo_lsm, ra_end, stripe);
+ ra_end = lov_stripe_pgoff(loo->lo_lsm, index, ra_end, stripe);
- pps = loo->lo_lsm->lsm_entries[0]->lsme_stripe_size >> PAGE_SHIFT;
+ pps = lov_lse(loo, index)->lsme_stripe_size >> PAGE_SHIFT;
- CDEBUG(D_READA, DFID " max_index = %lu, pps = %u, "
+ CDEBUG(D_READA, DFID " max_index = %lu, pps = %u, index = %u, "
"stripe_size = %u, stripe no = %u, start index = %lu\n",
- PFID(lu_object_fid(lov2lu(loo))), ra_end, pps,
- loo->lo_lsm->lsm_entries[0]->lsme_stripe_size, stripe, start);
+ PFID(lu_object_fid(lov2lu(loo))), ra_end, pps, index,
+ lov_lse(loo, index)->lsme_stripe_size, stripe, start);
/* never exceed the end of the stripe */
ra->cra_end = min_t(pgoff_t, ra_end, start + pps - start % pps - 1);
struct lov_io_sub *sub;
struct cl_page_list *plist = &lov_env_info(env)->lti_plist;
struct cl_page *page;
- int stripe;
+ int index;
int rc = 0;
ENTRY;
- if (lio->lis_active_subios == 1) {
- int idx = lio->lis_single_subio_index;
+ if (lio->lis_nr_subios == 1) {
+ int idx = lio->lis_single_subio_index;
- LASSERT(idx < lio->lis_nr_subios);
sub = lov_sub_get(env, lio, idx);
LASSERT(!IS_ERR(sub));
- LASSERT(sub->sub_io == &lio->lis_single_subio);
- rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
+ LASSERT(sub == &lio->lis_single_subio);
+ rc = cl_io_submit_rw(sub->sub_env, &sub->sub_io,
crt, queue);
RETURN(rc);
}
- LASSERT(lio->lis_subs != NULL);
-
cl_page_list_init(plist);
while (qin->pl_nr > 0) {
struct cl_2queue *cl2q = &lov_env_info(env)->lti_cl2q;
page = cl_page_list_first(qin);
cl_page_list_move(&cl2q->c2_qin, qin, page);
- stripe = lov_page_stripe(page);
+ index = lov_page_index(page);
while (qin->pl_nr > 0) {
page = cl_page_list_first(qin);
- if (stripe != lov_page_stripe(page))
+ if (index != lov_page_index(page))
break;
cl_page_list_move(&cl2q->c2_qin, qin, page);
}
- sub = lov_sub_get(env, lio, stripe);
+ sub = lov_sub_get(env, lio, index);
if (!IS_ERR(sub)) {
- rc = cl_io_submit_rw(sub->sub_env, sub->sub_io,
+ rc = cl_io_submit_rw(sub->sub_env, &sub->sub_io,
crt, cl2q);
} else {
rc = PTR_ERR(sub);
int rc = 0;
ENTRY;
- if (lio->lis_active_subios == 1) {
+ if (lio->lis_nr_subios == 1) {
int idx = lio->lis_single_subio_index;
- LASSERT(idx < lio->lis_nr_subios);
sub = lov_sub_get(env, lio, idx);
LASSERT(!IS_ERR(sub));
- LASSERT(sub->sub_io == &lio->lis_single_subio);
- rc = cl_io_commit_async(sub->sub_env, sub->sub_io, queue,
+ LASSERT(sub == &lio->lis_single_subio);
+ rc = cl_io_commit_async(sub->sub_env, &sub->sub_io, queue,
from, to, cb);
RETURN(rc);
}
- LASSERT(lio->lis_subs != NULL);
-
cl_page_list_init(plist);
while (queue->pl_nr > 0) {
int stripe_to = to;
- int stripe;
+ int index;
LASSERT(plist->pl_nr == 0);
page = cl_page_list_first(queue);
cl_page_list_move(plist, queue, page);
- stripe = lov_page_stripe(page);
+ index = lov_page_index(page);
while (queue->pl_nr > 0) {
page = cl_page_list_first(queue);
- if (stripe != lov_page_stripe(page))
+ if (index != lov_page_index(page))
break;
cl_page_list_move(plist, queue, page);
if (queue->pl_nr > 0) /* still has more pages */
stripe_to = PAGE_SIZE;
- sub = lov_sub_get(env, lio, stripe);
+ sub = lov_sub_get(env, lio, index);
if (!IS_ERR(sub)) {
- rc = cl_io_commit_async(sub->sub_env, sub->sub_io,
+ rc = cl_io_commit_async(sub->sub_env, &sub->sub_io,
plist, from, stripe_to, cb);
} else {
rc = PTR_ERR(sub);
}
+/*
+ * Start a page-fault IO: map the faulting page to its owning sub-io
+ * via lov_page_index() (which now yields a composite entry+stripe
+ * index, not a raw stripe number), copy the requested transfer size
+ * (ft_nob) down into the sub-io, and delegate to lov_io_start().
+ * Note sub_io is now embedded in lov_io_sub ("." instead of "->").
+ */
static int lov_io_fault_start(const struct lu_env *env,
-                             const struct cl_io_slice *ios)
+                             const struct cl_io_slice *ios)
{
-       struct cl_fault_io *fio;
-       struct lov_io *lio;
-       struct lov_io_sub *sub;
+	struct cl_fault_io *fio;
+	struct lov_io *lio;
+	struct lov_io_sub *sub;
	ENTRY;
+
	fio = &ios->cis_io->u.ci_fault;
	lio = cl2lov_io(env, ios);
-	sub = lov_sub_get(env, lio, lov_page_stripe(fio->ft_page));
-	sub->sub_io->u.ci_fault.ft_nob = fio->ft_nob;
+	sub = lov_sub_get(env, lio, lov_page_index(fio->ft_page));
+	sub->sub_io.u.ci_fault.ft_nob = fio->ft_nob;
+
	RETURN(lov_io_start(env, ios));
}
*written = 0;
list_for_each_entry(sub, &lio->lis_active, sub_linkage) {
- struct cl_io *subio = sub->sub_io;
+ struct cl_io *subio = &sub->sub_io;
lov_io_end_wrapper(sub->sub_env, subio);
.cio_commit_async = LOV_EMPTY_IMPOSSIBLE
};
-int lov_io_init_raid0(const struct lu_env *env, struct cl_object *obj,
- struct cl_io *io)
+int lov_io_init_composite(const struct lu_env *env, struct cl_object *obj,
+ struct cl_io *io)
{
struct lov_io *lio = lov_env_io(env);
struct lov_object *lov = cl2lov(obj);
static struct lov_sublock_env *lov_sublock_env_get(const struct lu_env *env,
const struct cl_lock *parent,
- struct lov_lock_sub *lls)
+ struct lov_lock_sub *lls)
{
struct lov_sublock_env *subenv;
struct lov_io *lio = lov_env_io(env);
subenv->lse_env = env;
subenv->lse_io = io;
} else {
- sub = lov_sub_get(env, lio, lls->sub_stripe);
+ sub = lov_sub_get(env, lio, lls->sub_index);
if (!IS_ERR(sub)) {
subenv->lse_env = sub->sub_env;
- subenv->lse_io = sub->sub_io;
+ subenv->lse_io = &sub->sub_io;
} else {
- subenv = (void*)sub;
+ subenv = (void *)sub;
}
}
return subenv;
const struct cl_object *obj,
struct cl_lock *lock)
{
+ struct lov_object *lov = cl2lov(obj);
+ struct lov_lock *lovlck;
+ struct lu_extent ext;
+ loff_t start;
+ loff_t end;
int result = 0;
int i;
+ int index;
int nr;
- loff_t start;
- loff_t end;
- loff_t file_start;
- loff_t file_end;
-
- struct lov_object *loo = cl2lov(obj);
- struct lov_layout_raid0 *r0 = lov_r0(loo);
- struct lov_lock *lovlck;
ENTRY;
- CDEBUG(D_INODE, "%p: lock/io FID "DFID"/"DFID", lock/io clobj %p/%p\n",
- loo, PFID(lu_object_fid(lov2lu(loo))),
- PFID(lu_object_fid(&obj->co_lu)),
- lov2cl(loo), obj);
-
- file_start = cl_offset(lov2cl(loo), lock->cll_descr.cld_start);
- file_end = cl_offset(lov2cl(loo), lock->cll_descr.cld_end + 1) - 1;
-
- for (i = 0, nr = 0; i < r0->lo_nr; i++) {
- /*
- * XXX for wide striping smarter algorithm is desirable,
- * breaking out of the loop, early.
- */
- if (likely(r0->lo_sub[i] != NULL) && /* spare layout */
- lov_stripe_intersects(loo->lo_lsm, i,
- file_start, file_end, &start, &end))
- nr++;
+ ext.e_start = cl_offset(obj, lock->cll_descr.cld_start);
+ if (lock->cll_descr.cld_end == CL_PAGE_EOF)
+ ext.e_end = OBD_OBJECT_EOF;
+ else
+ ext.e_end = cl_offset(obj, lock->cll_descr.cld_end + 1);
+
+ nr = 0;
+ for (index = lov_lsm_entry(lov->lo_lsm, ext.e_start);
+ index != -1 && index < lov->lo_lsm->lsm_entry_count; index++) {
+ struct lov_layout_raid0 *r0 = lov_r0(lov, index);
+
+ /* assume lsm entries are sorted. */
+ if (!lu_extent_is_overlapped(&ext,
+ &lov_lse(lov, index)->lsme_extent))
+ break;
+
+ for (i = 0; i < r0->lo_nr; i++) {
+ if (likely(r0->lo_sub[i] != NULL) && /* spare layout */
+ lov_stripe_intersects(lov->lo_lsm, index, i,
+ &ext, &start, &end))
+ nr++;
+ }
}
- LASSERT(nr > 0);
+ if (nr == 0)
+ RETURN(ERR_PTR(-EINVAL));
OBD_ALLOC_LARGE(lovlck, offsetof(struct lov_lock, lls_sub[nr]));
if (lovlck == NULL)
RETURN(ERR_PTR(-ENOMEM));
lovlck->lls_nr = nr;
- for (i = 0, nr = 0; i < r0->lo_nr; ++i) {
- if (likely(r0->lo_sub[i] != NULL) &&
- lov_stripe_intersects(loo->lo_lsm, i,
- file_start, file_end, &start, &end)) {
+ nr = 0;
+ for (index = lov_lsm_entry(lov->lo_lsm, ext.e_start);
+ index < lov->lo_lsm->lsm_entry_count; index++) {
+ struct lov_layout_raid0 *r0 = lov_r0(lov, index);
+
+ /* assume lsm entries are sorted. */
+ if (!lu_extent_is_overlapped(&ext,
+ &lov_lse(lov, index)->lsme_extent))
+ break;
+ for (i = 0; i < r0->lo_nr; ++i) {
struct lov_lock_sub *lls = &lovlck->lls_sub[nr];
- struct cl_lock_descr *descr;
+ struct cl_lock_descr *descr = &lls->sub_lock.cll_descr;
- descr = &lls->sub_lock.cll_descr;
+ if (unlikely(r0->lo_sub[i] == NULL) ||
+ !lov_stripe_intersects(lov->lo_lsm, index, i,
+ &ext, &start, &end))
+ continue;
LASSERT(descr->cld_obj == NULL);
descr->cld_obj = lovsub2cl(r0->lo_sub[i]);
descr->cld_gid = lock->cll_descr.cld_gid;
descr->cld_enq_flags = lock->cll_descr.cld_enq_flags;
- lls->sub_stripe = i;
+ lls->sub_index = lov_comp_index(index, i);
/* initialize sub lock */
result = lov_sublock_init(env, lock, lls);
.clo_print = lov_lock_print
};
-int lov_lock_init_raid0(const struct lu_env *env, struct cl_object *obj,
- struct cl_lock *lock, const struct cl_io *io)
+int lov_lock_init_composite(const struct lu_env *env, struct cl_object *obj,
+ struct cl_lock *lock, const struct cl_io *io)
{
struct lov_lock *lck;
int result = 0;
* initializes the current atime, mtime, ctime to avoid regressing a more
* uptodate time on the local client.
*/
-int lov_merge_lvb_kms(struct lov_stripe_md *lsm,
+int lov_merge_lvb_kms(struct lov_stripe_md *lsm, int index,
struct ost_lvb *lvb, __u64 *kms_place)
{
+ struct lov_stripe_md_entry *lse = lsm->lsm_entries[index];
u64 size = 0;
u64 kms = 0;
u64 blocks = 0;
" a=%llu c=%llu b=%llu\n", POSTID(&lsm->lsm_oi),
lvb->lvb_size, lvb->lvb_mtime, lvb->lvb_atime, lvb->lvb_ctime,
lvb->lvb_blocks);
- for (i = 0; i < lsm->lsm_entries[0]->lsme_stripe_count; i++) {
- struct lov_oinfo *loi = lsm->lsm_entries[0]->lsme_oinfo[i];
+ for (i = 0; i < lse->lsme_stripe_count; i++) {
+ struct lov_oinfo *loi = lse->lsme_oinfo[i];
u64 lov_size;
u64 tmpsize;
}
tmpsize = loi->loi_kms;
- lov_size = lov_stripe_size(lsm, tmpsize, i);
+ lov_size = lov_stripe_size(lsm, index, tmpsize, i);
if (lov_size > kms)
kms = lov_size;
if (loi->loi_lvb.lvb_size > tmpsize)
tmpsize = loi->loi_lvb.lvb_size;
- lov_size = lov_stripe_size(lsm, tmpsize, i);
+ lov_size = lov_stripe_size(lsm, index, tmpsize, i);
if (lov_size > size)
size = lov_size;
/* merge blocks, mtime, atime */
return 0;
}
-static void lov_install_raid0(const struct lu_env *env,
- struct lov_object *lov,
- union lov_layout_state *state)
+static void lov_install_composite(const struct lu_env *env,
+ struct lov_object *lov,
+ union lov_layout_state *state)
{
}
}
static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
- struct cl_object *stripe, struct lov_layout_raid0 *r0,
- int idx)
+ struct cl_object *subobj, struct lov_layout_raid0 *r0,
+ struct lov_oinfo *oinfo, int idx)
{
struct cl_object_header *hdr;
struct cl_object_header *subhdr;
struct cl_object_header *parent;
- struct lov_oinfo *oinfo;
+ int entry = lov_comp_entry(idx);
+ int stripe = lov_comp_stripe(idx);
int result;
if (OBD_FAIL_CHECK(OBD_FAIL_LOV_INIT)) {
* freed memory. This is because osc_object is referring to
* lov_oinfo of lsm_stripe_data which will be freed due to
* this failure. */
- cl_object_kill(env, stripe);
- cl_object_put(env, stripe);
+ cl_object_kill(env, subobj);
+ cl_object_put(env, subobj);
return -EIO;
}
hdr = cl_object_header(lov2cl(lov));
- subhdr = cl_object_header(stripe);
+ subhdr = cl_object_header(subobj);
- oinfo = lov->lo_lsm->lsm_entries[0]->lsme_oinfo[idx];
- CDEBUG(D_INODE, DFID"@%p[%d] -> "DFID"@%p: ostid: "DOSTID
- " idx: %d gen: %d\n",
- PFID(&subhdr->coh_lu.loh_fid), subhdr, idx,
- PFID(&hdr->coh_lu.loh_fid), hdr, POSTID(&oinfo->loi_oi),
+ CDEBUG(D_INODE, DFID"@%p[%d:%d] -> "DFID"@%p: ostid: "DOSTID
+ " ost idx: %d gen: %d\n",
+ PFID(lu_object_fid(&subobj->co_lu)), subhdr, entry, stripe,
+ PFID(lu_object_fid(lov2lu(lov))), hdr, POSTID(&oinfo->loi_oi),
oinfo->loi_ost_idx, oinfo->loi_ost_gen);
/* reuse ->coh_attr_guard to protect coh_parent change */
subhdr->coh_parent = hdr;
spin_unlock(&subhdr->coh_attr_guard);
subhdr->coh_nesting = hdr->coh_nesting + 1;
- lu_object_ref_add(&stripe->co_lu, "lov-parent", lov);
- r0->lo_sub[idx] = cl2lovsub(stripe);
- r0->lo_sub[idx]->lso_super = lov;
- r0->lo_sub[idx]->lso_index = idx;
+ lu_object_ref_add(&subobj->co_lu, "lov-parent", lov);
+ r0->lo_sub[stripe] = cl2lovsub(subobj);
+ r0->lo_sub[stripe]->lso_super = lov;
+ r0->lo_sub[stripe]->lso_index = idx;
result = 0;
} else {
struct lu_object *old_obj;
if (old_lov->lo_layout_invalid) {
/* the object's layout has already changed but isn't
* refreshed */
- lu_object_unhash(env, &stripe->co_lu);
+ lu_object_unhash(env, &subobj->co_lu);
result = -EAGAIN;
} else {
mask = D_ERROR;
result = -EIO;
}
- LU_OBJECT_DEBUG(mask, env, &stripe->co_lu,
+ LU_OBJECT_DEBUG(mask, env, &subobj->co_lu,
"stripe %d is already owned.", idx);
LU_OBJECT_DEBUG(mask, env, old_obj, "owned.");
LU_OBJECT_HEADER(mask, env, lov2lu(lov), "try to own.\n");
- cl_object_put(env, stripe);
+ cl_object_put(env, subobj);
}
return result;
}
}
+/*
+ * Initialize the sub-objects of one RAID0 layout component.
+ *
+ * For each non-dummy stripe of layout entry @index, resolve the OST
+ * object FID, find/create the stripe cl_object and attach it to @r0
+ * via lov_init_sub().  On success the returned value is the positive
+ * page-slice size needed by the stripes (the caller accumulates it
+ * into coh_page_bufsize); a negative errno is returned on failure.
+ *
+ * NOTE(review): the pre-composite code copied conf->coc_inode into
+ * subconf; this rewritten path zeroes subconf and only sets
+ * u.coc_oinfo -- confirm dropping coc_inode is intentional.
+ */
static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev,
-			  struct lov_object *lov, struct lov_stripe_md *lsm,
-			  const struct cl_object_conf *conf,
-			  union lov_layout_state *state)
+			  struct lov_object *lov, int index,
+			  struct lov_layout_raid0 *r0)
{
-	int result;
-	int i;
+	struct lov_thread_info *lti = lov_env_info(env);
+	struct cl_object_conf *subconf = <i->lti_stripe_conf;
+	struct lu_fid *ofid = <i->lti_fid;
+	struct cl_object *stripe;
+	struct lov_stripe_md_entry *lse = lov_lse(lov, index);
+	int result;
+	int psz;
+	int i;
-	struct cl_object *stripe;
-	struct lov_thread_info *lti = lov_env_info(env);
-	struct cl_object_conf *subconf = <i->lti_stripe_conf;
-	struct lu_fid *ofid = <i->lti_fid;
-	struct lov_layout_raid0 *r0 = &state->raid0;
+	ENTRY;
-	ENTRY;
+	spin_lock_init(&r0->lo_sub_lock);
+	r0->lo_nr = lse->lsme_stripe_count;
+	LASSERT(r0->lo_nr <= lov_targets_nr(dev));
+
+	OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
+	if (r0->lo_sub == NULL)
+		GOTO(out, result = -ENOMEM);
+
+	psz = 0;
+	result = 0;
+	memset(subconf, 0, sizeof(*subconf));
+
+	/*
+	 * Create stripe cl_objects.
+	 */
+	for (i = 0; i < r0->lo_nr; ++i) {
+		struct cl_device *subdev;
+		struct lov_oinfo *oinfo = lse->lsme_oinfo[i];
+		int ost_idx = oinfo->loi_ost_idx;
-	if (lsm->lsm_magic != LOV_MAGIC_V1 && lsm->lsm_magic != LOV_MAGIC_V3) {
-		dump_lsm(D_ERROR, lsm);
-		LASSERTF(0, "magic mismatch, expected %d/%d, actual %d.\n",
-			 LOV_MAGIC_V1, LOV_MAGIC_V3, lsm->lsm_magic);
+		if (lov_oinfo_is_dummy(oinfo))
+			continue;
+
+		result = ostid_to_fid(ofid, &oinfo->loi_oi, oinfo->loi_ost_idx);
+		if (result != 0)
+			GOTO(out, result);
+
+		if (dev->ld_target[ost_idx] == NULL) {
+			CERROR("%s: OST %04x is not initialized\n",
+			       lov2obd(dev->ld_lov)->obd_name, ost_idx);
+			GOTO(out, result = -EIO);
+		}
+
+		subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
+		subconf->u.coc_oinfo = oinfo;
+		LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
+		/* In the function below, .hs_keycmp resolves to
+		 * lu_obj_hop_keycmp() */
+		/* coverity[overrun-buffer-val] */
+		stripe = lov_sub_find(env, subdev, ofid, subconf);
+		if (IS_ERR(stripe))
+			GOTO(out, result = PTR_ERR(stripe));
+
+		result = lov_init_sub(env, lov, stripe, r0, oinfo,
+				      lov_comp_index(index, i));
+		/* -EAGAIN from lov_init_sub() means the sub-object is
+		 * still attached to a stale top-object being torn down
+		 * (see lov_init_sub()); redo the same stripe. */
+		if (result == -EAGAIN) { /* try again */
+			--i;
+			result = 0;
+			continue;
+		}
+
+		if (result == 0) {
+			int sz = lov_page_slice_fixup(lov, stripe);
+			LASSERT(ergo(psz > 0, psz == sz));
+			psz = sz;
+		}
	}
+	if (result == 0)
+		result = psz;
+out:
+	RETURN(result);
+/*
+ * ->llo_init for LLT_COMP objects: take a reference on the LSM,
+ * allocate one lov_layout_entry per LSM entry, record each entry's
+ * file extent and initialize it as a RAID0 component via
+ * lov_init_raid0().
+ *
+ * NOTE(review): if lov_init_raid0() fails mid-loop, the partially
+ * initialized lo_entries array is left in place -- presumably the
+ * caller unwinds through ->llo_fini() (lov_fini_composite()); confirm
+ * every caller takes the fini path on error.
+ */
+static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
+			      struct lov_object *lov, struct lov_stripe_md *lsm,
+			      const struct cl_object_conf *conf,
+			      union lov_layout_state *state)
+{
+	struct lov_layout_composite *comp = &state->composite;
+	unsigned int entry_count;
+	unsigned int psz = 0;
+	int result = 0;
+	int i;
+
+	ENTRY;
+
+	LASSERT(lsm->lsm_entry_count > 0);
	LASSERT(lov->lo_lsm == NULL);
	lov->lo_lsm = lsm_addref(lsm);
-	r0->lo_nr = lsm->lsm_entries[0]->lsme_stripe_count;
-	LASSERT(r0->lo_nr <= lov_targets_nr(dev));
-
	lov->lo_layout_invalid = true;
-	OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
-	if (r0->lo_sub != NULL) {
-		int psz = 0;
+	entry_count = lsm->lsm_entry_count;
+	comp->lo_entry_count = entry_count;
-		result = 0;
-		subconf->coc_inode = conf->coc_inode;
-		spin_lock_init(&r0->lo_sub_lock);
-		/*
-		 * Create stripe cl_objects.
-		 */
-		for (i = 0; i < r0->lo_nr && result == 0; ++i) {
-			struct cl_device *subdev;
-			struct lov_oinfo *oinfo =
-				lsm->lsm_entries[0]->lsme_oinfo[i];
-			int ost_idx = oinfo->loi_ost_idx;
-
-			if (lov_oinfo_is_dummy(oinfo))
-				continue;
-
-			result = ostid_to_fid(ofid, &oinfo->loi_oi,
-					      oinfo->loi_ost_idx);
-			if (result != 0)
-				GOTO(out, result);
-
-			if (dev->ld_target[ost_idx] == NULL) {
-				CERROR("%s: OST %04x is not initialized\n",
-				       lov2obd(dev->ld_lov)->obd_name, ost_idx);
-				GOTO(out, result = -EIO);
-			}
+	OBD_ALLOC(comp->lo_entries, entry_count * sizeof(*comp->lo_entries));
+	if (comp->lo_entries == NULL)
+		RETURN(-ENOMEM);
-			subdev = lovsub2cl_dev(dev->ld_target[ost_idx]);
-			subconf->u.coc_oinfo = oinfo;
-			LASSERTF(subdev != NULL, "not init ost %d\n", ost_idx);
-			/* In the function below, .hs_keycmp resolves to
-			 * lu_obj_hop_keycmp() */
-			/* coverity[overrun-buffer-val] */
-			stripe = lov_sub_find(env, subdev, ofid, subconf);
-			if (!IS_ERR(stripe)) {
-				result = lov_init_sub(env, lov, stripe, r0, i);
-				if (result == -EAGAIN) { /* try again */
-					--i;
-					result = 0;
-					continue;
-				}
-			} else {
-				result = PTR_ERR(stripe);
-			}
+	for (i = 0; i < entry_count; i++) {
+		struct lov_layout_entry *le = &comp->lo_entries[i];
-			if (result == 0) {
-				int sz = lov_page_slice_fixup(lov, stripe);
-				LASSERT(ergo(psz > 0, psz == sz));
-				psz = sz;
-			}
-		}
-		if (result == 0)
-			cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
-	} else
-		result = -ENOMEM;
-out:
-	RETURN(result);
+		le->lle_extent = lsm->lsm_entries[i]->lsme_extent;
+		result = lov_init_raid0(env, dev, lov, i, &le->lle_raid0);
+		if (result < 0)
+			break;
+
+		LASSERT(ergo(psz > 0, psz == result));
+		psz = result;
+	}
+	if (psz > 0)
+		cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
+
+	/* lov_init_raid0() returns the page-slice size on success;
+	 * ->llo_init callers expect 0 or -errno. */
+	return result > 0 ? 0 : result;
}
static int lov_init_released(const struct lu_env *env,
static struct cl_object *lov_find_subobj(const struct lu_env *env,
struct lov_object *lov,
struct lov_stripe_md *lsm,
- int stripe_idx)
+ int index)
{
struct lov_device *dev = lu2lov_dev(lov2lu(lov)->lo_dev);
- struct lov_oinfo *oinfo = lsm->lsm_entries[0]->lsme_oinfo[stripe_idx];
struct lov_thread_info *lti = lov_env_info(env);
struct lu_fid *ofid = <i->lti_fid;
+ struct lov_oinfo *oinfo;
struct cl_device *subdev;
+ int entry = lov_comp_entry(index);
+ int stripe = lov_comp_stripe(index);
int ost_idx;
int rc;
struct cl_object *result;
- if (lov->lo_type != LLT_RAID0)
+ if (lov->lo_type != LLT_COMP)
GOTO(out, result = NULL);
+ if (entry >= lsm->lsm_entry_count ||
+ stripe >= lsm->lsm_entries[entry]->lsme_stripe_count)
+ GOTO(out, result = NULL);
+
+ oinfo = lsm->lsm_entries[entry]->lsme_oinfo[stripe];
ost_idx = oinfo->loi_ost_idx;
rc = ostid_to_fid(ofid, &oinfo->loi_oi, ost_idx);
if (rc != 0)
}
static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
+ struct lov_layout_raid0 *r0,
struct lovsub_object *los, int idx)
{
struct cl_object *sub;
- struct lov_layout_raid0 *r0;
struct lu_site *site;
struct lu_site_bkt_data *bkt;
wait_queue_t *waiter;
- r0 = &lov->u.raid0;
LASSERT(r0->lo_sub[idx] == los);
sub = lovsub2cl(los);
LASSERT(r0->lo_sub[idx] == NULL);
}
+/*
+ * Prune and detach every cached sub-object of one RAID0 component.
+ * Only the sub-objects are killed here; the lo_sub[] array itself is
+ * freed later by lov_fini_raid0().  The lsm dump and layout wait that
+ * used to live here moved up to lov_delete_composite().
+ */
-static int lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
-			    union lov_layout_state *state)
+static void lov_delete_raid0(const struct lu_env *env, struct lov_object *lov,
+			     struct lov_layout_raid0 *r0)
{
-	struct lov_layout_raid0 *r0 = &state->raid0;
-	struct lov_stripe_md *lsm = lov->lo_lsm;
-	int i;
-
	ENTRY;
-	dump_lsm(D_INODE, lsm);
-
-	lov_layout_wait(env, lov);
	if (r0->lo_sub != NULL) {
-		for (i = 0; i < r0->lo_nr; ++i) {
-			struct lovsub_object *los = r0->lo_sub[i];
+		int i;
+
+		for (i = 0; i < r0->lo_nr; ++i) {
+			struct lovsub_object *los = r0->lo_sub[i];
-			if (los != NULL) {
+			if (los != NULL) {
				cl_object_prune(env, &los->lso_cl);
-				/*
-				 * If top-level object is to be evicted from
-				 * the cache, so are its sub-objects.
-				 */
-				lov_subobject_kill(env, lov, los, i);
+				/*
+				 * If top-level object is to be evicted from
+				 * the cache, so are its sub-objects.
+				 */
+				lov_subobject_kill(env, lov, r0, los, i);
			}
		}
	}
+
+	EXIT;
+}
+
+/*
+ * ->llo_delete for composite layouts: dump the lsm for debugging,
+ * wait out concurrent layout users (lov_layout_wait()), then delete
+ * the cached sub-objects of every layout component.
+ */
+static int lov_delete_composite(const struct lu_env *env,
+				struct lov_object *lov,
+				union lov_layout_state *state)
+{
+	struct lov_layout_entry *entry;
+
+	ENTRY;
+
+	dump_lsm(D_INODE, lov->lo_lsm);
+
+	lov_layout_wait(env, lov);
+	lov_foreach_layout_entry(lov, entry)
+		lov_delete_raid0(env, lov, &entry->lle_raid0);
+
+	RETURN(0);
}
LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
}
+/* Free the sub-object pointer array of one RAID0 component; the
+ * sub-objects themselves were already detached by lov_delete_raid0(). */
-static void lov_fini_raid0(const struct lu_env *env, struct lov_object *lov,
-			   union lov_layout_state *state)
+static void lov_fini_raid0(const struct lu_env *env,
+			   struct lov_layout_raid0 *r0)
{
-	struct lov_layout_raid0 *r0 = &state->raid0;
-	ENTRY;
-
	if (r0->lo_sub != NULL) {
		OBD_FREE_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
		r0->lo_sub = NULL;
	}
+}
+
+/*
+ * ->llo_fini for composite layouts: release each component's RAID0
+ * state, free the lo_entries array, then drop the object's reference
+ * on its lov_stripe_md.  Safe on a partially initialized layout
+ * (lo_entries may be NULL).
+ */
+static void lov_fini_composite(const struct lu_env *env,
+			       struct lov_object *lov,
+			       union lov_layout_state *state)
+{
+	struct lov_layout_composite *comp = &state->composite;
+	ENTRY;
+
+	if (comp->lo_entries != NULL) {
+		struct lov_layout_entry *entry;
+
+		lov_foreach_layout_entry(lov, entry)
+			lov_fini_raid0(env, &entry->lle_raid0);
+
+		OBD_FREE(comp->lo_entries,
+			 comp->lo_entry_count * sizeof(*comp->lo_entries));
+		comp->lo_entries = NULL;
+	}
	dump_lsm(D_INODE, lov->lo_lsm);
	lov_free_memmd(&lov->lo_lsm);
}
static int lov_print_raid0(const struct lu_env *env, void *cookie,
- lu_printer_t p, const struct lu_object *o)
+ lu_printer_t p, struct lov_layout_raid0 *r0)
{
- struct lov_object *lov = lu2lov(o);
- struct lov_layout_raid0 *r0 = lov_r0(lov);
- struct lov_stripe_md *lsm = lov->lo_lsm;
- int i;
+ int i;
- (*p)(env, cookie, "stripes: %d, %s, lsm{%p 0x%08X %d %u %u}:\n",
- r0->lo_nr, lov->lo_layout_invalid ? "invalid" : "valid", lsm,
- lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
- lsm->lsm_entries[0]->lsme_stripe_count, lsm->lsm_layout_gen);
for (i = 0; i < r0->lo_nr; ++i) {
struct lu_object *sub;
return 0;
}
+/*
+ * ->llo_print for composite layouts: emit one lsm-wide summary line,
+ * then for each layout entry a line with its extent, magic, id,
+ * generation and stripe count/size, followed by that entry's RAID0
+ * sub-objects via lov_print_raid0().
+ */
+static int lov_print_composite(const struct lu_env *env, void *cookie,
+			       lu_printer_t p, const struct lu_object *o)
+{
+	struct lov_object *lov = lu2lov(o);
+	struct lov_stripe_md *lsm = lov->lo_lsm;
+	int i;
+
+	(*p)(env, cookie, "entries: %d, %s, lsm{%p 0x%08X %d %u}:\n",
+	     lsm->lsm_entry_count,
+	     lov->lo_layout_invalid ? "invalid" : "valid", lsm,
+	     lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
+	     lsm->lsm_layout_gen);
+
+	for (i = 0; i < lsm->lsm_entry_count; i++) {
+		struct lov_stripe_md_entry *lse = lsm->lsm_entries[i];
+
+		(*p)(env, cookie, DEXT ": { 0x%08X, %u, %u, %u, %u }\n",
+		     PEXT(&lse->lsme_extent), lse->lsme_magic,
+		     lse->lsme_id, lse->lsme_layout_gen,
+		     lse->lsme_stripe_count, lse->lsme_stripe_size);
+		lov_print_raid0(env, cookie, p, lov_r0(lov, i));
+	}
+
+	return 0;
+}
+
static int lov_print_released(const struct lu_env *env, void *cookie,
lu_printer_t p, const struct lu_object *o)
{
struct lov_stripe_md *lsm = lov->lo_lsm;
(*p)(env, cookie,
- "released: %s, lsm{%p 0x%08X %d %u %u}:\n",
+ "released: %s, lsm{%p 0x%08X %d %u}:\n",
lov->lo_layout_invalid ? "invalid" : "valid", lsm,
lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
- lsm->lsm_entries[0]->lsme_stripe_count, lsm->lsm_layout_gen);
+ lsm->lsm_layout_gen);
return 0;
}
return 0;
}
+/*
+ * Merge the per-stripe size/kms/blocks/time attributes of layout
+ * entry @index into the cached r0->lo_attr (via lov_merge_lvb_kms()).
+ * A cheap no-op while r0->lo_attr_valid is set; that flag is reset
+ * elsewhere whenever a sub-object attribute changes, forcing a
+ * re-merge on the next call.
+ */
-static int lov_attr_get_raid0(const struct lu_env *env, struct cl_object *obj,
-			      struct cl_attr *attr)
+static int lov_attr_get_raid0(const struct lu_env *env, struct lov_object *lov,
+			      unsigned int index, struct lov_layout_raid0 *r0)
+
+{
+	struct lov_stripe_md *lsm = lov->lo_lsm;
+	struct ost_lvb *lvb = &lov_env_info(env)->lti_lvb;
+	struct cl_attr *attr = &r0->lo_attr;
+	__u64 kms = 0;
+	int result = 0;
+
+	if (r0->lo_attr_valid)
+		return 0;
+
+	memset(lvb, 0, sizeof(*lvb));
+
+	/* XXX: timestamps can be negative by sanity:test_39m,
+	 * how can it be? */
+	lvb->lvb_atime = LLONG_MIN;
+	lvb->lvb_ctime = LLONG_MIN;
+	lvb->lvb_mtime = LLONG_MIN;
+
+	/*
+	 * XXX that should be replaced with a loop over sub-objects,
+	 * doing cl_object_attr_get() on them. But for now, let's
+	 * reuse old lov code.
+	 */
+
+	/*
+	 * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
+	 * happy. It's not needed, because new code uses
+	 * ->coh_attr_guard spin-lock to protect consistency of
+	 * sub-object attributes.
+	 */
+	lov_stripe_lock(lsm);
+	result = lov_merge_lvb_kms(lsm, index, lvb, &kms);
+	lov_stripe_unlock(lsm);
+	if (result == 0) {
+		cl_lvb2attr(attr, lvb);
+		attr->cat_kms = kms;
+		r0->lo_attr_valid = 1;
+	}
+
+	return result;
+}
+
+static int lov_attr_get_composite(const struct lu_env *env,
+ struct cl_object *obj,
+ struct cl_attr *attr)
{
struct lov_object *lov = cl2lov(obj);
- struct lov_layout_raid0 *r0 = lov_r0(lov);
- struct cl_attr *lov_attr = &r0->lo_attr;
+ struct lov_layout_entry *entry;
int result = 0;
+ int index = 0;
- ENTRY;
+ ENTRY;
- /* this is called w/o holding type guard mutex, so it must be inside
- * an on going IO otherwise lsm may be replaced.
- * LU-2117: it turns out there exists one exception. For mmaped files,
- * the lock of those files may be requested in the other file's IO
- * context, and this function is called in ccc_lock_state(), it will
- * hit this assertion.
- * Anyway, it's still okay to call attr_get w/o type guard as layout
- * can't go if locks exist. */
- /* LASSERT(atomic_read(&lsm->lsm_refc) > 1); */
-
- if (!r0->lo_attr_valid) {
- struct lov_stripe_md *lsm = lov->lo_lsm;
- struct ost_lvb *lvb = &lov_env_info(env)->lti_lvb;
- __u64 kms = 0;
-
- memset(lvb, 0, sizeof(*lvb));
- /* XXX: timestamps can be negative by sanity:test_39m,
- * how can it be? */
- lvb->lvb_atime = LLONG_MIN;
- lvb->lvb_ctime = LLONG_MIN;
- lvb->lvb_mtime = LLONG_MIN;
+ attr->cat_size = 0;
+ attr->cat_blocks = 0;
+ lov_foreach_layout_entry(lov, entry) {
+ struct lov_layout_raid0 *r0 = &entry->lle_raid0;
+ struct cl_attr *lov_attr = &r0->lo_attr;
- /*
- * XXX that should be replaced with a loop over sub-objects,
- * doing cl_object_attr_get() on them. But for now, let's
- * reuse old lov code.
- */
+ result = lov_attr_get_raid0(env, lov, index, r0);
+ if (result != 0)
+ break;
- /*
- * XXX take lsm spin-lock to keep lov_merge_lvb_kms()
- * happy. It's not needed, because new code uses
- * ->coh_attr_guard spin-lock to protect consistency of
- * sub-object attributes.
- */
- lov_stripe_lock(lsm);
- result = lov_merge_lvb_kms(lsm, lvb, &kms);
- lov_stripe_unlock(lsm);
- if (result == 0) {
- cl_lvb2attr(lov_attr, lvb);
- lov_attr->cat_kms = kms;
- r0->lo_attr_valid = 1;
- }
- }
- if (result == 0) { /* merge results */
- attr->cat_blocks = lov_attr->cat_blocks;
- attr->cat_size = lov_attr->cat_size;
- attr->cat_kms = lov_attr->cat_kms;
+ index++;
+
+ /* merge results */
+ attr->cat_blocks += lov_attr->cat_blocks;
+ if (attr->cat_size < lov_attr->cat_size)
+ attr->cat_size = lov_attr->cat_size;
+ if (attr->cat_kms < lov_attr->cat_kms)
+ attr->cat_kms = lov_attr->cat_kms;
if (attr->cat_atime < lov_attr->cat_atime)
attr->cat_atime = lov_attr->cat_atime;
if (attr->cat_ctime < lov_attr->cat_ctime)
.llo_io_init = lov_io_init_empty,
.llo_getattr = lov_attr_get_empty,
},
- [LLT_RAID0] = {
- .llo_init = lov_init_raid0,
- .llo_delete = lov_delete_raid0,
- .llo_fini = lov_fini_raid0,
- .llo_install = lov_install_raid0,
- .llo_print = lov_print_raid0,
- .llo_page_init = lov_page_init_raid0,
- .llo_lock_init = lov_lock_init_raid0,
- .llo_io_init = lov_io_init_raid0,
- .llo_getattr = lov_attr_get_raid0,
- },
[LLT_RELEASED] = {
.llo_init = lov_init_released,
.llo_delete = lov_delete_empty,
.llo_lock_init = lov_lock_init_empty,
.llo_io_init = lov_io_init_released,
.llo_getattr = lov_attr_get_empty,
- }
+ },
+ [LLT_COMP] = {
+ .llo_init = lov_init_composite,
+ .llo_delete = lov_delete_composite,
+ .llo_fini = lov_fini_composite,
+ .llo_install = lov_install_composite,
+ .llo_print = lov_print_composite,
+ .llo_page_init = lov_page_init_composite,
+ .llo_lock_init = lov_lock_init_composite,
+ .llo_io_init = lov_io_init_composite,
+ .llo_getattr = lov_attr_get_composite,
+ },
};
/**
if (lsm == NULL)
return LLT_EMPTY;
- if (lsm->lsm_magic == LOV_MAGIC_COMP_V1)
- return LLT_EMPTY;
-
if (lsm->lsm_is_released)
return LLT_RELEASED;
- return LLT_RAID0;
+ if (lsm->lsm_magic == LOV_MAGIC_V1 ||
+ lsm->lsm_magic == LOV_MAGIC_V3 ||
+ lsm->lsm_magic == LOV_MAGIC_COMP_V1)
+ return LLT_COMP;
+
+ return LLT_EMPTY;
}
static inline void lov_conf_freeze(struct lov_object *lov)
cconf->u.coc_layout.lb_len);
if (IS_ERR(lsm))
RETURN(PTR_ERR(lsm));
+
+ dump_lsm(D_INODE, lsm);
}
/* no locking is necessary, as object is being created */
* over which the mapping is spread
*
* \param lsm [in] striping information for the file
- * \param fm_start [in] logical start of mapping
- * \param fm_end [in] logical end of mapping
+ * \param index [in] stripe component index
+ * \param ext [in] logical extent of mapping
* \param start_stripe [in] starting stripe of the mapping
* \param stripe_count [out] the number of stripes across which to map is
* returned
*
* \retval last_stripe return the last stripe of the mapping
*/
-static int fiemap_calc_last_stripe(struct lov_stripe_md *lsm,
- u64 fm_start, u64 fm_end,
+static int fiemap_calc_last_stripe(struct lov_stripe_md *lsm, int index,
+ struct lu_extent *ext,
int start_stripe, int *stripe_count)
{
+ struct lov_stripe_md_entry *lsme = lsm->lsm_entries[index];
int last_stripe;
u64 obd_start;
u64 obd_end;
int i, j;
- if (fm_end - fm_start > lsm->lsm_entries[0]->lsme_stripe_size *
- lsm->lsm_entries[0]->lsme_stripe_count) {
- last_stripe = (start_stripe < 1 ?
- lsm->lsm_entries[0]->lsme_stripe_count - 1 :
- start_stripe - 1);
- *stripe_count = lsm->lsm_entries[0]->lsme_stripe_count;
+ if (ext->e_end - ext->e_start >
+ lsme->lsme_stripe_size * lsme->lsme_stripe_count) {
+ last_stripe = (start_stripe < 1 ? lsme->lsme_stripe_count - 1 :
+ start_stripe - 1);
+ *stripe_count = lsme->lsme_stripe_count;
} else {
- for (j = 0, i = start_stripe;
- j < lsm->lsm_entries[0]->lsme_stripe_count;
- i = (i + 1) % lsm->lsm_entries[0]->lsme_stripe_count,
- j++) {
- if ((lov_stripe_intersects(lsm, i, fm_start, fm_end,
+ for (j = 0, i = start_stripe; j < lsme->lsme_stripe_count;
+ i = (i + 1) % lsme->lsme_stripe_count, j++) {
+ if ((lov_stripe_intersects(lsm, index, i, ext,
&obd_start, &obd_end)) == 0)
break;
}
*stripe_count = j;
- last_stripe = (start_stripe + j - 1) %
- lsm->lsm_entries[0]->lsme_stripe_count;
+ last_stripe = (start_stripe + j - 1) % lsme->lsme_stripe_count;
}
return last_stripe;
*
* \param fiemap [in] fiemap request header
* \param lsm [in] striping information for the file
- * \param fm_start [in] logical start of mapping
- * \param fm_end [in] logical end of mapping
+ * \param index [in] stripe component index
+ * \param ext [in] logical extent of mapping
* \param start_stripe [out] starting stripe will be returned in this
*/
static u64 fiemap_calc_fm_end_offset(struct fiemap *fiemap,
struct lov_stripe_md *lsm,
- u64 fm_start, u64 fm_end,
+ int index, struct lu_extent *ext,
int *start_stripe)
{
+ struct lov_stripe_md_entry *lsme = lsm->lsm_entries[index];
u64 local_end = fiemap->fm_extents[0].fe_logical;
u64 lun_start;
u64 lun_end;
return 0;
/* Find out stripe_no from ost_index saved in the fe_device */
- for (i = 0; i < lsm->lsm_entries[0]->lsme_stripe_count; i++) {
- struct lov_oinfo *oinfo = lsm->lsm_entries[0]->lsme_oinfo[i];
+ for (i = 0; i < lsme->lsme_stripe_count; i++) {
+ struct lov_oinfo *oinfo = lsme->lsme_oinfo[i];
if (lov_oinfo_is_dummy(oinfo))
continue;
/* If we have finished mapping on previous device, shift logical
* offset to start of next device */
- if (lov_stripe_intersects(lsm, stripe_no, fm_start, fm_end,
- &lun_start, &lun_end) != 0 &&
+ if (lov_stripe_intersects(lsm, index, stripe_no, ext,
+ &lun_start, &lun_end) != 0 &&
local_end < lun_end) {
fm_end_offset = local_end;
*start_stripe = stripe_no;
/* This is a special value to indicate that caller should
* calculate offset in next stripe. */
fm_end_offset = 0;
- *start_stripe = (stripe_no + 1) %
- lsm->lsm_entries[0]->lsme_stripe_count;
+ *start_stripe = (stripe_no + 1) % lsme->lsme_stripe_count;
}
return fm_end_offset;
}
struct fiemap_state {
- struct fiemap *fs_fm;
- u64 fs_start;
- u64 fs_length;
- u64 fs_end;
- u64 fs_end_offset;
- int fs_cur_extent;
- int fs_cnt_need;
- int fs_start_stripe;
- int fs_last_stripe;
- bool fs_device_done;
- bool fs_finish;
- bool fs_enough;
+ struct fiemap *fs_fm;
+ struct lu_extent fs_ext;
+ u64 fs_length;
+ u64 fs_end_offset;
+ int fs_cur_extent;
+ int fs_cnt_need;
+ int fs_start_stripe;
+ int fs_last_stripe;
+ bool fs_device_done;
+ bool fs_finish_stripe;
+ bool fs_enough;
};
int fiemap_for_stripe(const struct lu_env *env, struct cl_object *obj,
- struct lov_stripe_md *lsm,
- struct fiemap *fiemap, size_t *buflen,
- struct ll_fiemap_info_key *fmkey, int stripeno,
- struct fiemap_state *fs)
+ struct lov_stripe_md *lsm, struct fiemap *fiemap,
+ size_t *buflen, struct ll_fiemap_info_key *fmkey,
+ int index, int stripeno, struct fiemap_state *fs)
{
+ struct lov_stripe_md_entry *lsme = lsm->lsm_entries[index];
struct cl_object *subobj;
struct lov_obd *lov = lu2lov_dev(obj->co_lu.lo_dev)->ld_lov;
struct fiemap_extent *fm_ext = &fs->fs_fm->fm_extents[0];
fs->fs_device_done = false;
/* Find out range of mapping on this stripe */
- if ((lov_stripe_intersects(lsm, stripeno, fs->fs_start, fs->fs_end,
+ if ((lov_stripe_intersects(lsm, index, stripeno, &fs->fs_ext,
&lun_start, &obd_object_end)) == 0)
return 0;
- if (lov_oinfo_is_dummy(lsm->lsm_entries[0]->lsme_oinfo[stripeno]))
+ if (lov_oinfo_is_dummy(lsme->lsme_oinfo[stripeno]))
return -EIO;
/* If this is a continuation FIEMAP call and we are on
* end_offset */
if (fs->fs_end_offset != 0 && stripeno == fs->fs_start_stripe)
lun_start = fs->fs_end_offset;
-
- lun_end = fs->fs_length;
- if (lun_end != ~0ULL) {
- /* Handle fs->fs_start + fs->fs_length overflow */
- if (fs->fs_start + fs->fs_length < fs->fs_start)
- fs->fs_length = ~0ULL - fs->fs_start;
- lun_end = lov_size_to_stripe(lsm, fs->fs_start + fs->fs_length,
- stripeno);
- }
-
+ lun_end = lov_size_to_stripe(lsm, index, fs->fs_ext.e_end, stripeno);
if (lun_start == lun_end)
return 0;
len_mapped_single_call = 0;
/* find lobsub object */
- subobj = lov_find_subobj(env, cl2lov(obj), lsm, stripeno);
+ subobj = lov_find_subobj(env, cl2lov(obj), lsm,
+ lov_comp_index(index, stripeno));
if (IS_ERR(subobj))
return PTR_ERR(subobj);
/* If the output buffer is very large and the objects have many
lun_start += len_mapped_single_call;
fs->fs_fm->fm_length = req_fm_len - len_mapped_single_call;
req_fm_len = fs->fs_fm->fm_length;
+ /**
+ * If we've collected enough extent map, we'd request 1 more,
+ * to see whether we coincidentally finished all available
+ * extent map, so that FIEMAP_EXTENT_LAST would be set.
+ */
fs->fs_fm->fm_extent_count = fs->fs_enough ?
1 : fs->fs_cnt_need;
fs->fs_fm->fm_mapped_extents = 0;
fs->fs_fm->fm_flags = fiemap->fm_flags;
- ost_index = lsm->lsm_entries[0]->lsme_oinfo[stripeno]->
- loi_ost_idx;
+ ost_index = lsme->lsme_oinfo[stripeno]->loi_ost_idx;
if (ost_index < 0 || ost_index >= lov->desc.ld_tgt_count)
GOTO(obj_put, rc = -EINVAL);
* we need to return */
if (stripeno == fs->fs_last_stripe) {
fiemap->fm_mapped_extents = 0;
- fs->fs_finish = true;
+ fs->fs_finish_stripe = true;
GOTO(obj_put, rc);
}
break;
* We've collected enough extents and there are
* more extents after it.
*/
- fs->fs_finish = true;
GOTO(obj_put, rc);
}
* the last extent */
if (fm_ext[ext_count - 1].fe_flags & FIEMAP_EXTENT_LAST)
fm_ext[ext_count - 1].fe_flags &= ~FIEMAP_EXTENT_LAST;
- if (lov_stripe_size(lsm, fm_ext[ext_count - 1].fe_logical +
- fm_ext[ext_count - 1].fe_length,
+ if (lov_stripe_size(lsm, index,
+ fm_ext[ext_count - 1].fe_logical +
+ fm_ext[ext_count - 1].fe_length,
stripeno) >= fmkey->lfik_oa.o_size) {
ost_eof = true;
fs->fs_device_done = true;
} while (!ost_done && !ost_eof);
if (stripeno == fs->fs_last_stripe)
- fs->fs_finish = true;
+ fs->fs_finish_stripe = true;
obj_put:
cl_object_put(env, subobj);
struct ll_fiemap_info_key *fmkey,
struct fiemap *fiemap, size_t *buflen)
{
- struct lov_stripe_md *lsm;
- struct fiemap *fm_local = NULL;
- int cur_stripe;
- int stripe_count;
- unsigned int buffer_size = FIEMAP_BUFFER_SIZE;
- int rc = 0;
+ struct lov_stripe_md_entry *lsme;
+ struct lov_stripe_md *lsm;
+ struct fiemap *fm_local = NULL;
+ loff_t whole_start;
+ loff_t whole_end;
+ int entry;
+ int start_entry;
+ int end_entry;
+ int cur_stripe = 0;
+ int stripe_count;
+ unsigned int buffer_size = FIEMAP_BUFFER_SIZE;
+ int rc = 0;
struct fiemap_state fs = { 0 };
ENTRY;
if (lsm == NULL)
RETURN(-ENODATA);
- /**
- * If the stripe_count > 1 and the application does not understand
- * DEVICE_ORDER flag, it cannot interpret the extents correctly.
- */
- if (lsm->lsm_entries[0]->lsme_stripe_count > 1 &&
- !(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER))
- GOTO(out_lsm, rc = -ENOTSUPP);
+ if (!(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER)) {
+ /**
+ * If the entry count > 1 or stripe_count > 1 and the
+ * application does not understand DEVICE_ORDER flag,
+ * it cannot interpret the extents correctly.
+ */
+ if (lsm->lsm_entry_count > 1 ||
+ (lsm->lsm_entry_count == 1 &&
+ lsm->lsm_entries[0]->lsme_stripe_count > 1))
+ GOTO(out_lsm, rc = -ENOTSUPP);
+ }
if (lsm->lsm_is_released) {
if (fiemap->fm_start < fmkey->lfik_oa.o_size) {
GOTO(out_lsm, rc = 0);
}
+ /* Shrink buffer_size if fm_extent_count extents fit in less space. */
if (fiemap_count_to_size(fiemap->fm_extent_count) < buffer_size)
buffer_size = fiemap_count_to_size(fiemap->fm_extent_count);
if (fm_local == NULL)
GOTO(out_lsm, rc = -ENOMEM);
- fs.fs_fm = fm_local;
- fs.fs_cnt_need = fiemap_size_to_count(buffer_size);
-
- fs.fs_start = fiemap->fm_start;
- /* fs.fs_start is beyond the end of the file */
- if (fs.fs_start > fmkey->lfik_oa.o_size)
- GOTO(out_fm_local, rc = -EINVAL);
-
- fs.fs_length = fiemap->fm_length;
- /* Calculate start stripe, last stripe and length of mapping */
- fs.fs_start_stripe = lov_stripe_number(lsm, fs.fs_start);
- fs.fs_end = (fs.fs_length == ~0ULL) ? fmkey->lfik_oa.o_size :
- fs.fs_start + fs.fs_length - 1;
- /* If fs_length != ~0ULL but fs_start+fs_length-1 exceeds file size */
- if (fs.fs_end > fmkey->lfik_oa.o_size) {
- fs.fs_end = fmkey->lfik_oa.o_size;
- fs.fs_length = fs.fs_end - fs.fs_start;
- }
-
- fs.fs_last_stripe = fiemap_calc_last_stripe(lsm, fs.fs_start, fs.fs_end,
- fs.fs_start_stripe,
- &stripe_count);
- fs.fs_end_offset = fiemap_calc_fm_end_offset(fiemap, lsm, fs.fs_start,
- fs.fs_end,
- &fs.fs_start_stripe);
- if (fs.fs_end_offset == -EINVAL)
- GOTO(out_fm_local, rc = -EINVAL);
-
/**
* Requested extent count exceeds the fiemap buffer size, shrink our
* ambition.
if (fiemap->fm_extent_count == 0)
fs.fs_cnt_need = 0;
- fs.fs_finish = false;
fs.fs_enough = false;
fs.fs_cur_extent = 0;
+ fs.fs_fm = fm_local;
+ fs.fs_cnt_need = fiemap_size_to_count(buffer_size);
- /* Check each stripe */
- for (cur_stripe = fs.fs_start_stripe; stripe_count > 0;
- --stripe_count,
- cur_stripe = (cur_stripe + 1) %
- lsm->lsm_entries[0]->lsme_stripe_count) {
- rc = fiemap_for_stripe(env, obj, lsm, fiemap, buflen, fmkey,
- cur_stripe, &fs);
- if (rc < 0)
- GOTO(out_fm_local, rc);
- if (fs.fs_finish)
- break;
- } /* for each stripe */
+ whole_start = fiemap->fm_start;
+ /* whole_start is beyond the end of the file */
+ if (whole_start > fmkey->lfik_oa.o_size)
+ GOTO(out_fm_local, rc = -EINVAL);
+ whole_end = (fiemap->fm_length == OBD_OBJECT_EOF) ?
+ fmkey->lfik_oa.o_size :
+ whole_start + fiemap->fm_length - 1;
+ /**
+ * If fiemap->fm_length != OBD_OBJECT_EOF but whole_end exceeds file
+ * size
+ */
+ if (whole_end > fmkey->lfik_oa.o_size)
+ whole_end = fmkey->lfik_oa.o_size;
+
+ start_entry = lov_lsm_entry(lsm, whole_start);
+ end_entry = lov_lsm_entry(lsm, whole_end);
+ if (end_entry == -1)
+ end_entry = lsm->lsm_entry_count - 1;
+ if (start_entry == -1 || end_entry == -1)
+ GOTO(out_fm_local, rc = -EINVAL);
+
+ for (entry = start_entry; entry <= end_entry; entry++) {
+ lsme = lsm->lsm_entries[entry];
+
+ if (entry == start_entry)
+ fs.fs_ext.e_start = whole_start;
+ else
+ fs.fs_ext.e_start = lsme->lsme_extent.e_start;
+ if (entry == end_entry)
+ fs.fs_ext.e_end = whole_end;
+ else
+ fs.fs_ext.e_end = lsme->lsme_extent.e_end - 1;
+ fs.fs_length = fs.fs_ext.e_end - fs.fs_ext.e_start + 1;
+
+ /* Calculate start stripe, last stripe and length of mapping */
+ fs.fs_start_stripe = lov_stripe_number(lsm, entry,
+ fs.fs_ext.e_start);
+ fs.fs_last_stripe = fiemap_calc_last_stripe(lsm, entry,
+ &fs.fs_ext, fs.fs_start_stripe,
+ &stripe_count);
+ fs.fs_end_offset = fiemap_calc_fm_end_offset(fiemap, lsm, entry,
+ &fs.fs_ext, &fs.fs_start_stripe);
+ /* Check each stripe */
+ for (cur_stripe = fs.fs_start_stripe; stripe_count > 0;
+ --stripe_count,
+ cur_stripe = (cur_stripe + 1) % lsme->lsme_stripe_count) {
+ rc = fiemap_for_stripe(env, obj, lsm, fiemap, buflen,
+ fmkey, entry, cur_stripe, &fs);
+ if (rc < 0)
+ GOTO(out_fm_local, rc);
+ if (fs.fs_enough)
+ GOTO(finish, rc);
+ if (fs.fs_finish_stripe)
+ break;
+ } /* for each stripe */
+ } /* for covering layout component */
+ /*
+ * We've traversed all components, set @entry to the last component
+ * entry, it's for the last stripe check.
+ */
+ entry--;
+finish:
/* Indicate that we are returning device offsets unless file just has
* single stripe */
- if (lsm->lsm_entries[0]->lsme_stripe_count > 1)
+ if (lsm->lsm_entry_count > 1 ||
+ (lsm->lsm_entry_count == 1 &&
+ lsm->lsm_entries[0]->lsme_stripe_count > 1))
fiemap->fm_flags |= FIEMAP_FLAG_DEVICE_ORDER;
if (fiemap->fm_extent_count == 0)
out_lsm:
lov_lsm_put(lsm);
-
return rc;
}
lov_conf_freeze(lov);
switch (lov->lo_type) {
- case LLT_RAID0: {
+ case LLT_COMP: {
struct lov_stripe_md *lsm;
int i;
lsm = lov->lo_lsm;
LASSERT(lsm != NULL);
- for (i = 0; i < lsm->lsm_entries[0]->lsme_stripe_count;
- i++) {
- struct lov_oinfo *loi =
- lsm->lsm_entries[0]->lsme_oinfo[i];
+ for (i = 0; i < lsm->lsm_entry_count; i++) {
+ struct lov_stripe_md_entry *lse =
+ lsm->lsm_entries[i];
+ int j;
- if (lov_oinfo_is_dummy(loi))
- continue;
+ for (j = 0; j < lse->lsme_stripe_count; j++) {
+ struct lov_oinfo *loi =
+ lse->lsme_oinfo[j];
- if (loi->loi_ar.ar_rc && !rc)
- rc = loi->loi_ar.ar_rc;
- loi->loi_ar.ar_rc = 0;
+ if (lov_oinfo_is_dummy(loi))
+ continue;
+
+ if (loi->loi_ar.ar_rc && !rc)
+ rc = loi->loi_ar.ar_rc;
+ loi->loi_ar.ar_rc = 0;
+ }
}
}
case LLT_RELEASED:
#include "lov_internal.h"
+/* Byte width of one full stripe cycle (stripe_size * stripe_count) of
+ * layout component @index; used as the divisor when mapping file offsets
+ * to per-stripe offsets. */
+static loff_t stripe_width(struct lov_stripe_md *lsm, unsigned int index)
+{
+	struct lov_stripe_md_entry *entry;
+
+	/* validate @index BEFORE it is used to index lsm_entries[] */
+	LASSERT(index < lsm->lsm_entry_count);
+
+	entry = lsm->lsm_entries[index];
+	return (loff_t)entry->lsme_stripe_size * entry->lsme_stripe_count;
+}
+
/* compute object size given "stripeno" and the ost size */
-u64 lov_stripe_size(struct lov_stripe_md *lsm, u64 ost_size, int stripeno)
+u64 lov_stripe_size(struct lov_stripe_md *lsm, int index, u64 ost_size,
+ int stripeno)
{
- unsigned long ssize = lsm->lsm_entries[0]->lsme_stripe_size;
+ unsigned long ssize = lsm->lsm_entries[index]->lsme_stripe_size;
unsigned long stripe_size;
loff_t swidth;
loff_t lov_size;
- u32 magic = lsm->lsm_magic;
ENTRY;
if (ost_size == 0)
RETURN(0);
- LASSERT(lsm_op_find(magic) != NULL);
- lsm_op_find(magic)->lsm_stripe_by_index(lsm, &stripeno, NULL, &swidth);
+ swidth = stripe_width(lsm, index);
/* lov_do_div64(a, b) returns a % b, and a = a / b */
stripe_size = lov_do_div64(ost_size, ssize);
/**
* Compute file level page index by stripe level page offset
*/
-pgoff_t lov_stripe_pgoff(struct lov_stripe_md *lsm, pgoff_t stripe_index,
- int stripe)
+pgoff_t lov_stripe_pgoff(struct lov_stripe_md *lsm, int index,
+ pgoff_t stripe_index, int stripe)
 {
 loff_t offset;
+ /* Convert the stripe-level page index to a stripe-level byte offset
+ * (+1 byte so the result lands inside this page rather than on the
+ * preceding page boundary), map it to a file-level size via
+ * lov_stripe_size(), then convert back to a file-level page index. */
- offset = lov_stripe_size(lsm, (stripe_index << PAGE_SHIFT) + 1,
+ offset = lov_stripe_size(lsm, index,
+ (stripe_index << PAGE_SHIFT) + 1,
+ stripe);
 return offset >> PAGE_SHIFT;
 }
* was moved forward to the start of the stripe in question; 0 when it
* falls in the stripe and no shifting was done; > 0 when the offset
* was outside the stripe and was pulled back to its final byte. */
-int lov_stripe_offset(struct lov_stripe_md *lsm, loff_t lov_off, int stripeno,
- loff_t *obdoff)
+int lov_stripe_offset(struct lov_stripe_md *lsm, int index, loff_t lov_off,
+ int stripeno, loff_t *obdoff)
{
- unsigned long ssize = lsm->lsm_entries[0]->lsme_stripe_size;
+ unsigned long ssize = lsm->lsm_entries[index]->lsme_stripe_size;
loff_t stripe_off;
loff_t this_stripe;
loff_t swidth;
- u32 magic = lsm->lsm_magic;
int ret = 0;
if (lov_off == OBD_OBJECT_EOF) {
return 0;
}
- LASSERT(lsm_op_find(magic) != NULL);
- lsm_op_find(magic)->lsm_stripe_by_index(lsm, &stripeno, &lov_off,
- &swidth);
+ swidth = stripe_width(lsm, index);
/* lov_do_div64(a, b) returns a % b, and a = a / b */
stripe_off = lov_do_div64(lov_off, swidth);
* | 0 | 1 | 2 | 0 | 1 | 2 |
* ---------------------------------------------------------------------
*/
-loff_t lov_size_to_stripe(struct lov_stripe_md *lsm, u64 file_size,
+loff_t lov_size_to_stripe(struct lov_stripe_md *lsm, int index, u64 file_size,
int stripeno)
{
- unsigned long ssize = lsm->lsm_entries[0]->lsme_stripe_size;
+ unsigned long ssize = lsm->lsm_entries[index]->lsme_stripe_size;
loff_t stripe_off;
loff_t this_stripe;
loff_t swidth;
- u32 magic = lsm->lsm_magic;
if (file_size == OBD_OBJECT_EOF)
return OBD_OBJECT_EOF;
- LASSERT(lsm_op_find(magic) != NULL);
- lsm_op_find(magic)->lsm_stripe_by_index(lsm, &stripeno, &file_size,
- &swidth);
+ swidth = stripe_width(lsm, index);
/* lov_do_div64(a, b) returns a % b, and a = a / b */
stripe_off = lov_do_div64(file_size, swidth);
/* given an extent in an lov and a stripe, calculate the extent of the stripe
* that is contained within the lov extent. this returns true if the given
* stripe does intersect with the lov extent. */
-int lov_stripe_intersects(struct lov_stripe_md *lsm, int stripeno,
- u64 start, u64 end, u64 *obd_start, u64 *obd_end)
+int lov_stripe_intersects(struct lov_stripe_md *lsm, int index, int stripeno,
+ struct lu_extent *ext, u64 *obd_start, u64 *obd_end)
{
+ struct lov_stripe_md_entry *entry = lsm->lsm_entries[index];
+ u64 start, end;
int start_side, end_side;
- start_side = lov_stripe_offset(lsm, start, stripeno, obd_start);
- end_side = lov_stripe_offset(lsm, end, stripeno, obd_end);
+ if (!lu_extent_is_overlapped(ext, &entry->lsme_extent))
+ return 0;
+
+ start = max_t(__u64, ext->e_start, entry->lsme_extent.e_start);
+ end = min_t(__u64, ext->e_end, entry->lsme_extent.e_end);
+ if (end != OBD_OBJECT_EOF)
+ end--;
+
+ start_side = lov_stripe_offset(lsm, index, start, stripeno, obd_start);
+ end_side = lov_stripe_offset(lsm, index, end, stripeno, obd_end);
CDEBUG(D_INODE, "[%lld->%lld] -> [(%d) %lld->%lld (%d)]\n",
start, end, start_side, *obd_start, *obd_end, end_side);
}
/* compute which stripe number "lov_off" will be written into */
-int lov_stripe_number(struct lov_stripe_md *lsm, loff_t lov_off)
+int lov_stripe_number(struct lov_stripe_md *lsm, int index, loff_t lov_off)
{
- unsigned long ssize = lsm->lsm_entries[0]->lsme_stripe_size;
+ unsigned long ssize = lsm->lsm_entries[index]->lsme_stripe_size;
loff_t stripe_off;
loff_t swidth;
- u32 magic = lsm->lsm_magic;
- LASSERT(lsm_op_find(magic) != NULL);
- lsm_op_find(magic)->lsm_stripe_by_offset(lsm, NULL, &lov_off, &swidth);
+ swidth = stripe_width(lsm, index);
stripe_off = lov_do_div64(lov_off, swidth);
*
*/
-static int lov_raid0_page_print(const struct lu_env *env,
-				const struct cl_page_slice *slice,
-				void *cookie, lu_printer_t printer)
+/* Debug printer for a page slice of a composite (LLT_COMP) lov object. */
+static int lov_comp_page_print(const struct lu_env *env,
+			       const struct cl_page_slice *slice,
+			       void *cookie, lu_printer_t printer)
 {
 	struct lov_page *lp = cl2lov_page(slice);
-	return (*printer)(env, cookie, LUSTRE_LOV_NAME"-page@%p, raid0\n", lp);
+	/* print "comp", not the stale "raid0", to match the composite
+	 * layout type this path now serves */
+	return (*printer)(env, cookie, LUSTRE_LOV_NAME"-page@%p, comp\n", lp);
 }
-static const struct cl_page_operations lov_raid0_page_ops = {
-	.cpo_print = lov_raid0_page_print
+static const struct cl_page_operations lov_comp_page_ops = {
+	.cpo_print = lov_comp_page_print
 };
-int lov_page_init_raid0(const struct lu_env *env, struct cl_object *obj,
- struct cl_page *page, pgoff_t index)
+int lov_page_init_composite(const struct lu_env *env, struct cl_object *obj,
+ struct cl_page *page, pgoff_t index)
{
struct lov_object *loo = cl2lov(obj);
- struct lov_layout_raid0 *r0 = lov_r0(loo);
struct lov_io *lio = lov_env_io(env);
struct cl_object *subobj;
struct cl_object *o;
struct lov_io_sub *sub;
struct lov_page *lpg = cl_object_page_slice(obj, page);
+ struct lov_layout_raid0 *r0;
loff_t offset;
- loff_t suboff;
+ loff_t suboff;
+ int entry;
int stripe;
int rc;
ENTRY;
offset = cl_offset(obj, index);
- stripe = lov_stripe_number(loo->lo_lsm, offset);
+ entry = lov_lsm_entry(loo->lo_lsm, offset);
+ if (entry < 0) {
+ /* non-existing layout component */
+ lov_page_init_empty(env, obj, page, index);
+ RETURN(0);
+ }
+
+ r0 = lov_r0(loo, entry);
+ stripe = lov_stripe_number(loo->lo_lsm, entry, offset);
LASSERT(stripe < r0->lo_nr);
- rc = lov_stripe_offset(loo->lo_lsm, offset, stripe,
- &suboff);
+ rc = lov_stripe_offset(loo->lo_lsm, entry, offset, stripe, &suboff);
LASSERT(rc == 0);
- lpg->lps_stripe = stripe;
- cl_page_slice_add(page, &lpg->lps_cl, obj, index, &lov_raid0_page_ops);
+ lpg->lps_index = lov_comp_index(entry, stripe);
+ cl_page_slice_add(page, &lpg->lps_cl, obj, index, &lov_comp_page_ops);
- sub = lov_sub_get(env, lio, stripe);
+ sub = lov_sub_get(env, lio, lpg->lps_index);
if (IS_ERR(sub))
RETURN(PTR_ERR(sub));
/* We can't assume lov was assigned here, because of the shadow
* object handling in lu_object_find.
*/
- if (lov) {
- LASSERT(lov->lo_type == LLT_RAID0);
- LASSERT(lov->u.raid0.lo_sub[los->lso_index] == los);
- spin_lock(&lov->u.raid0.lo_sub_lock);
- lov->u.raid0.lo_sub[los->lso_index] = NULL;
- spin_unlock(&lov->u.raid0.lo_sub_lock);
- }
+ if (lov != NULL) {
+ int index = lov_comp_entry(los->lso_index);
+ int stripe = lov_comp_stripe(los->lso_index);
+ struct lov_layout_raid0 *r0 = lov_r0(lov, index);
+
+ LASSERT(lov->lo_type == LLT_COMP);
+ LASSERT(r0->lo_sub[stripe] == los);
+ spin_lock(&r0->lo_sub_lock);
+ r0->lo_sub[stripe] = NULL;
+ spin_unlock(&r0->lo_sub_lock);
+ }
lu_object_fini(obj);
lu_object_header_fini(&los->lso_header.coh_lu);
static int lovsub_attr_update(const struct lu_env *env, struct cl_object *obj,
const struct cl_attr *attr, unsigned valid)
{
+ struct lovsub_object *los = cl2lovsub(obj);
struct lov_object *lov = cl2lovsub(obj)->lso_super;
ENTRY;
- lov_r0(lov)->lo_attr_valid = 0;
+ lov_r0(lov, lov_comp_entry(los->lso_index))->lo_attr_valid = 0;
RETURN(0);
}
* There is no OBD_MD_* flag for obdo::o_stripe_idx, so set it
* unconditionally. It never changes anyway.
*/
- attr->cra_oa->o_stripe_idx = subobj->lso_index;
+ attr->cra_oa->o_stripe_idx = lov_comp_stripe(subobj->lso_index);
EXIT;
}
io_start = cl_index(obj, io->u.ci_rw.crw_pos);
io_end = cl_index(obj, io->u.ci_rw.crw_pos +
io->u.ci_rw.crw_count - 1);
- if (cl_io_is_append(io)) {
- io_start = 0;
- io_end = CL_PAGE_EOF;
- }
} else {
LASSERT(cl_io_is_mkwrite(io));
io_start = io_end = io->u.ci_fault.ft_index;
}
if (descr->cld_mode >= CLM_WRITE &&
- descr->cld_start <= io_start && descr->cld_end >= io_end) {
+ (cl_io_is_append(io) ||
+ (descr->cld_start <= io_start && descr->cld_end >= io_end))) {
struct osc_io *oio = osc_env_io(env);
/* There must be only one lock to match the write region */