*/
ci_ignore_layout:1,
/**
+ * Need MDS intervention to complete a write. This usually means the
+ * corresponding component is not initialized for the writing extent.
+ */
+ ci_need_write_intent:1,
+ /**
* Check if layout changed after the IO finishes. Mainly for HSM
* requirement. If IO occurs to openning files, it doesn't need to
* verify layout because HSM won't release openning files.
} __attribute__((packed));
enum {
- LAYOUT_INTENT_ACCESS = 0,
- LAYOUT_INTENT_READ = 1,
- LAYOUT_INTENT_WRITE = 2,
- LAYOUT_INTENT_GLIMPSE = 3,
- LAYOUT_INTENT_TRUNC = 4,
- LAYOUT_INTENT_RELEASE = 5,
- LAYOUT_INTENT_RESTORE = 6
+ LAYOUT_INTENT_ACCESS = 0, /**< generic access */
+ LAYOUT_INTENT_READ = 1, /**< not used */
+ LAYOUT_INTENT_WRITE = 2, /**< write file, for comp layout */
+ LAYOUT_INTENT_GLIMPSE = 3, /**< not used */
+ LAYOUT_INTENT_TRUNC = 4, /**< truncate file, for comp layout */
+ LAYOUT_INTENT_RELEASE = 5, /**< reserved for HSM release */
+ LAYOUT_INTENT_RESTORE = 6, /**< reserved for HSM restore */
};
/* enqueue layout lock with intent */
struct layout_intent {
- __u32 li_opc; /* intent operation for enqueue, read, write etc */
+ __u32 li_opc; /* intent operation for enqueue, one of LAYOUT_INTENT_* */
__u32 li_flags;
__u64 li_start;
__u64 li_end;
-};
+} __attribute__((packed));
/**
* On the wire version of hsm_progress structure.
struct ptlrpc_svc_ctx;
struct ptlrpc_cli_ctx;
struct ptlrpc_ctx_ops;
+struct req_msg_field;
/**
* \addtogroup flavor flavor
int sptlrpc_cli_alloc_repbuf(struct ptlrpc_request *req, int msgsize);
void sptlrpc_cli_free_repbuf(struct ptlrpc_request *req);
int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req,
- int segment, int newsize);
+ const struct req_msg_field *field,
+ int newsize);
int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req,
struct ptlrpc_request **req_ret);
void sptlrpc_cli_finish_early_reply(struct ptlrpc_request *early_req);
if (count > 0 && args->via_io_subtype == IO_NORMAL)
args->u.normal.via_iter = vio->vui_iter;
}
- GOTO(out, rc);
out:
cl_io_fini(env, io);
CDEBUG(D_VFSTRACE, "iot: %d, result: %zd\n", iot, result);
- return result > 0 ? result : rc;
+ RETURN(result > 0 ? result : rc);
}
/**
lock_res_and_lock(lock);
lvb_ready = ldlm_is_lvb_ready(lock);
unlock_res_and_lock(lock);
+
/* checking lvb_ready is racy but this is okay. The worst case is
* that multi processes may configure the file on the same time. */
-
if (lvb_ready)
GOTO(out, rc = 0);
/* refresh layout failed, need to wait */
wait_layout = rc == -EBUSY;
EXIT;
-
out:
LDLM_LOCK_PUT(lock);
ldlm_lock_decref(lockh, mode);
RETURN(rc);
}
-static int ll_layout_refresh_locked(struct inode *inode)
+/**
+ * Issue layout intent RPC to MDS.
+ * \param inode [in] file inode
+ * \param intent [in] layout intent
+ *
+ * \retval 0 on success
+ * \retval < 0 error code
+ */
+static int ll_layout_intent(struct inode *inode, struct layout_intent *intent)
{
struct ll_inode_info *lli = ll_i2info(inode);
struct ll_sb_info *sbi = ll_i2sbi(inode);
struct md_op_data *op_data;
- struct lookup_intent it;
- struct lustre_handle lockh;
- enum ldlm_mode mode;
+ struct lookup_intent it;
struct ptlrpc_request *req;
int rc;
ENTRY;
-again:
- /* mostly layout lock is caching on the local side, so try to match
- * it before grabbing layout lock mutex. */
- mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0,
- LCK_CR | LCK_CW | LCK_PR | LCK_PW);
- if (mode != 0) { /* hit cached lock */
- rc = ll_layout_lock_set(&lockh, mode, inode);
- if (rc == -EAGAIN)
- goto again;
-
- RETURN(rc);
- }
-
op_data = ll_prep_md_op_data(NULL, inode, inode, NULL,
0, 0, LUSTRE_OPC_ANY, NULL);
if (IS_ERR(op_data))
RETURN(PTR_ERR(op_data));
- /* have to enqueue one */
+ op_data->op_data = intent;
+ op_data->op_data_size = sizeof(*intent);
+
memset(&it, 0, sizeof(it));
it.it_op = IT_LAYOUT;
+ if (intent->li_opc == LAYOUT_INTENT_WRITE ||
+ intent->li_opc == LAYOUT_INTENT_TRUNC)
+ it.it_flags = FMODE_WRITE;
LDLM_DEBUG_NOLOCK("%s: requeue layout lock for file "DFID"(%p)",
ll_get_fsname(inode->i_sb, NULL, 0),
ll_finish_md_op_data(op_data);
- mode = it.it_lock_mode;
- it.it_lock_mode = 0;
- ll_intent_drop_lock(&it);
-
- if (rc == 0) {
- /* set lock data in case this is a new lock */
+ /* set lock data in case this is a new lock */
+ if (!rc)
ll_set_lock_data(sbi->ll_md_exp, inode, &it, NULL);
- lockh.cookie = it.it_lock_handle;
- rc = ll_layout_lock_set(&lockh, mode, inode);
- if (rc == -EAGAIN)
- goto again;
- }
+
+ ll_intent_drop_lock(&it);
RETURN(rc);
}
{
struct ll_inode_info *lli = ll_i2info(inode);
struct ll_sb_info *sbi = ll_i2sbi(inode);
+ struct lustre_handle lockh;
+ struct layout_intent intent = {
+ .li_opc = LAYOUT_INTENT_ACCESS,
+ };
+ enum ldlm_mode mode;
int rc;
ENTRY;
/* take layout lock mutex to enqueue layout lock exclusively. */
mutex_lock(&lli->lli_layout_mutex);
- rc = ll_layout_refresh_locked(inode);
- if (rc < 0)
- GOTO(out, rc);
+ while (1) {
+ /* mostly layout lock is caching on the local side, so try to
+ * match it before issuing a layout intent RPC; the layout lock
+ * mutex is already held at this point. */
+ mode = ll_take_md_lock(inode, MDS_INODELOCK_LAYOUT, &lockh, 0,
+ LCK_CR | LCK_CW | LCK_PR | LCK_PW);
+ if (mode != 0) { /* hit cached lock */
+ rc = ll_layout_lock_set(&lockh, mode, inode);
+ if (rc == -EAGAIN)
+ continue;
+ break;
+ }
- *gen = ll_layout_version_get(lli);
-out:
+ rc = ll_layout_intent(inode, &intent);
+ if (rc != 0)
+ break;
+ }
+
+ if (rc == 0)
+ *gen = ll_layout_version_get(lli);
mutex_unlock(&lli->lli_layout_mutex);
RETURN(rc);
}
/**
+ * Issue layout intent RPC indicating where in a file an IO is about to write.
+ *
+ * Builds a LAYOUT_INTENT_WRITE intent describing the range [start, end) and
+ * passes it to ll_layout_intent(); li_flags is left zero-initialized.
+ *
+ * \param[in] inode file inode.
+ * \param[in] start start offset of file in bytes where an IO is about to
+ * write.
+ * \param[in] end exclusive end offset in bytes of the write range.
+ *
+ * \retval 0 on success
+ * \retval < 0 error code
+ */
+int ll_layout_write_intent(struct inode *inode, __u64 start, __u64 end)
+{
+ struct layout_intent intent = {
+ .li_opc = LAYOUT_INTENT_WRITE,
+ .li_start = start,
+ .li_end = end,
+ };
+ int rc;
+ ENTRY;
+
+ rc = ll_layout_intent(inode, &intent);
+
+ RETURN(rc);
+}
+
+/**
* This function send a restore request to the MDT
*/
int ll_layout_restore(struct inode *inode, loff_t offset, __u64 length)
int ll_layout_conf(struct inode *inode, const struct cl_object_conf *conf);
int ll_layout_refresh(struct inode *inode, __u32 *gen);
int ll_layout_restore(struct inode *inode, loff_t start, __u64 length);
+int ll_layout_write_intent(struct inode *inode, __u64 start, __u64 end);
int ll_xattr_init(void);
void ll_xattr_fini(void);
struct cl_object *obj = io->ci_obj;
struct vvp_io *vio = cl2vvp_io(env, ios);
struct inode *inode = vvp_object_inode(obj);
+ int rc;
CLOBINVRNT(env, obj, vvp_object_invariant(obj));
CDEBUG(D_VFSTRACE, DFID" ignore/verify layout %d/%d, layout version %d "
- "restore needed %d\n",
+ "need write layout %d, restore needed %d\n",
PFID(lu_object_fid(&obj->co_lu)),
io->ci_ignore_layout, io->ci_verify_layout,
- vio->vui_layout_gen, io->ci_restore_needed);
+ vio->vui_layout_gen, io->ci_need_write_intent,
+ io->ci_restore_needed);
if (io->ci_restore_needed) {
- int rc;
-
/* file was detected release, we need to restore it
* before finishing the io
*/
}
}
+ /**
+ * dynamic layout change needed, send layout intent
+ * RPC.
+ */
+ if (io->ci_need_write_intent) {
+ loff_t start = 0;
+ loff_t end = 0;
+
+ LASSERT(io->ci_type == CIT_WRITE || cl_io_is_trunc(io));
+
+ io->ci_need_write_intent = 0;
+
+ if (io->ci_type == CIT_WRITE) {
+ start = io->u.ci_rw.crw_pos;
+ end = io->u.ci_rw.crw_pos + io->u.ci_rw.crw_count;
+ } else {
+ end = io->u.ci_setattr.sa_attr.lvb_size;
+ }
+
+ rc = ll_layout_write_intent(inode, start, end);
+ io->ci_result = rc;
+ if (!rc)
+ io->ci_need_restart = 1;
+ }
+
if (!io->ci_ignore_layout && io->ci_verify_layout) {
__u32 gen = 0;
unsigned int i;
size_t lsme_size;
+ if (!lsme_inited(lsme) ||
+ lsme->lsme_pattern & LOV_PATTERN_F_RELEASED)
+ stripe_count = 0;
for (i = 0; i < stripe_count; i++)
OBD_SLAB_FREE_PTR(lsme->lsme_oinfo[i], lov_oinfo_slab);
*/
static struct lov_stripe_md_entry *
lsme_unpack(struct lov_obd *lov, struct lov_mds_md *lmm, size_t buf_size,
- const char *pool_name, struct lov_ost_data_v1 *objects,
+ const char *pool_name, bool inited, struct lov_ost_data_v1 *objects,
loff_t *maxbytes)
{
struct lov_stripe_md_entry *lsme;
RETURN(ERR_PTR(-EINVAL));
pattern = le32_to_cpu(lmm->lmm_pattern);
- if (pattern & LOV_PATTERN_F_RELEASED)
+ if (pattern & LOV_PATTERN_F_RELEASED || !inited)
stripe_count = 0;
else
stripe_count = le16_to_cpu(lmm->lmm_stripe_count);
lsme->lsme_magic = magic;
lsme->lsme_pattern = pattern;
+ lsme->lsme_flags = 0;
lsme->lsme_stripe_size = le32_to_cpu(lmm->lmm_stripe_size);
- lsme->lsme_stripe_count = stripe_count;
+ /* preserve the possible -1 stripe count for uninstantiated component */
+ lsme->lsme_stripe_count = le16_to_cpu(lmm->lmm_stripe_count);
lsme->lsme_layout_gen = le16_to_cpu(lmm->lmm_layout_gen);
if (pool_name != NULL) {
pattern = le32_to_cpu(lmm->lmm_pattern);
- lsme = lsme_unpack(lov, lmm, buf_size, pool_name, objects, &maxbytes);
+ lsme = lsme_unpack(lov, lmm, buf_size, pool_name, true, objects,
+ &maxbytes);
if (IS_ERR(lsme))
RETURN(ERR_CAST(lsme));
+ lsme->lsme_flags = LCME_FL_INIT;
lsme->lsme_extent.e_start = 0;
lsme->lsme_extent.e_end = LUSTRE_EOF;
static struct lov_stripe_md_entry *
lsme_unpack_comp(struct lov_obd *lov, struct lov_mds_md *lmm,
- size_t lmm_buf_size, loff_t *maxbytes)
+ size_t lmm_buf_size, bool inited, loff_t *maxbytes)
{
unsigned int magic;
unsigned int stripe_count;
stripe_count = le16_to_cpu(lmm->lmm_stripe_count);
if (stripe_count == 0)
RETURN(ERR_PTR(-EINVAL));
+ /* un-instantiated lmm contains no ost id info, i.e. lov_ost_data_v1 */
+ if (!inited)
+ stripe_count = 0;
magic = le32_to_cpu(lmm->lmm_magic);
if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3)
if (magic == LOV_MAGIC_V1) {
return lsme_unpack(lov, lmm, lmm_buf_size, NULL,
- lmm->lmm_objects, maxbytes);
+ inited, lmm->lmm_objects, maxbytes);
} else {
struct lov_mds_md_v3 *lmm3 = (struct lov_mds_md_v3 *)lmm;
return lsme_unpack(lov, lmm, lmm_buf_size, lmm3->lmm_pool_name,
- lmm3->lmm_objects, maxbytes);
+ inited, lmm3->lmm_objects, maxbytes);
}
}
blob = (char *)lcm + blob_offset;
lsme = lsme_unpack_comp(lov, blob, blob_size,
+ le32_to_cpu(lcme->lcme_flags) &
+ LCME_FL_INIT,
(i == entry_count - 1) ? &maxbytes :
NULL);
if (IS_ERR(lsme))
lsm->lsm_entries[i] = lsme;
lsme->lsme_id = le32_to_cpu(lcme->lcme_id);
+ lsme->lsme_flags = le32_to_cpu(lcme->lcme_flags);
lu_extent_le_to_cpu(&lsme->lsme_extent, &lcme->lcme_extent);
if (i == entry_count - 1) {
void dump_lsm(unsigned int level, const struct lov_stripe_md *lsm)
{
- int i;
+ int i, j;
CDEBUG(level, "lsm %p, objid "DOSTID", maxbytes %#llx, magic 0x%08X, "
"refc: %d, entry: %u, layout_gen %u\n",
for (i = 0; i < lsm->lsm_entry_count; i++) {
struct lov_stripe_md_entry *lse = lsm->lsm_entries[i];
- CDEBUG(level,
- DEXT ": id: %u, magic 0x%08X, stripe count %u, "
- "size %u, layout_gen %u, pool: ["LOV_POOLNAMEF"]\n",
- PEXT(&lse->lsme_extent), lse->lsme_id, lse->lsme_magic,
+ CDEBUG(level, DEXT ": id: %u, flags: %x, "
+ "magic 0x%08X, layout_gen %u, "
+ "stripe count %u, sstripe size %u, "
+ "pool: ["LOV_POOLNAMEF"]\n",
+ PEXT(&lse->lsme_extent), lse->lsme_id, lse->lsme_flags,
+ lse->lsme_magic, lse->lsme_layout_gen,
lse->lsme_stripe_count, lse->lsme_stripe_size,
- lse->lsme_layout_gen, lse->lsme_pool_name);
+ lse->lsme_pool_name);
+ if (!lsme_inited(lse) ||
+ lse->lsme_pattern & LOV_PATTERN_F_RELEASED)
+ break;
+ for (j = 0; j < lse->lsme_stripe_count; j++) {
+ CDEBUG(level, " oinfo:%p: ostid: "DOSTID
+ " ost idx: %d gen: %d\n",
+ lse->lsme_oinfo[j],
+ POSTID(&lse->lsme_oinfo[j]->loi_oi),
+ lse->lsme_oinfo[j]->loi_ost_idx,
+ lse->lsme_oinfo[j]->loi_ost_gen);
+ }
}
}
struct lu_extent lsme_extent;
u32 lsme_id;
u32 lsme_magic;
+ u32 lsme_flags;
u32 lsme_pattern;
u32 lsme_stripe_size;
u16 lsme_stripe_count;
struct lov_oinfo *lsme_oinfo[];
};
+/**
+ * Copy one layout component entry from \a src to \a dst.
+ *
+ * The per-stripe object info is copied by value into the objects that
+ * dst->lsme_oinfo[] already points to, then the fixed-size header (every
+ * field before lsme_oinfo) is copied over \a dst.
+ *
+ * lsme_stripe_count holds the on-disk stripe count, which may be non-zero
+ * (even -1) for a component that is not instantiated or is released; such
+ * a component carries no lsme_oinfo[] objects, so none are copied for it.
+ *
+ * NOTE(review): assumes dst->lsme_oinfo[] was allocated by the caller with
+ * at least as many objects as \a src has -- confirm against callers.
+ *
+ * \param[out] dst	destination entry
+ * \param[in]  src	source entry
+ */
+static inline void copy_lsm_entry(struct lov_stripe_md_entry *dst,
+				  struct lov_stripe_md_entry *src)
+{
+	unsigned int nr_objs = 0;
+	unsigned int i;
+
+	/* only instantiated, non-released components own stripe objects */
+	if ((src->lsme_flags & LCME_FL_INIT) &&
+	    !(src->lsme_pattern & LOV_PATTERN_F_RELEASED))
+		nr_objs = src->lsme_stripe_count;
+
+	for (i = 0; i < nr_objs; i++)
+		*dst->lsme_oinfo[i] = *src->lsme_oinfo[i];
+	memcpy(dst, src, offsetof(typeof(*src), lsme_oinfo));
+}
+
struct lov_stripe_md {
atomic_t lsm_refc;
spinlock_t lsm_lock;
ol->ol_comp_id = 0;
}
}
+
+/**
+ * Tell whether a layout component entry has the LCME_FL_INIT flag set.
+ *
+ * \param[in] lsme	layout component entry to test
+ *
+ * \retval true		LCME_FL_INIT is set in lsme_flags
+ * \retval false	otherwise
+ */
+static inline bool lsme_inited(const struct lov_stripe_md_entry *lsme)
+{
+	return (lsme->lsme_flags & LCME_FL_INIT) != 0;
+}
+
+/**
+ * Tell whether component \a index of \a lsm has the LCME_FL_INIT flag set.
+ *
+ * \param[in] lsm	striping metadata holding the component array
+ * \param[in] index	position in lsm->lsm_entries[]; not range-checked here
+ */
+static inline bool lsm_entry_inited(const struct lov_stripe_md *lsm, int index)
+{
+	const struct lov_stripe_md_entry *entry = lsm->lsm_entries[index];
+
+	return lsme_inited(entry);
+}
#endif
u64 end;
int stripe;
+ CDEBUG(D_VFSTRACE, "component[%d] flags %#x\n",
+ index, lsm->lsm_entries[index]->lsme_flags);
+ if (!lsm_entry_inited(lsm, index))
+ break;
+
index++;
if (!lu_extent_is_overlapped(&ext, &le->lle_extent))
continue;
{
struct lov_io *lio = cl2lov_io(env, ios);
struct cl_io *io = ios->cis_io;
+ struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
struct lov_stripe_md_entry *lse;
loff_t start = io->u.ci_rw.crw_pos;
loff_t next;
if (cl_io_is_append(io))
RETURN(lov_io_iter_init(env, ios));
- index = lov_lsm_entry(lio->lis_object->lo_lsm, io->u.ci_rw.crw_pos);
+ index = lov_lsm_entry(lsm, io->u.ci_rw.crw_pos);
if (index < 0) { /* non-existing layout component */
if (io->ci_type == CIT_READ) {
/* TODO: it needs to detect the next component and
if (next <= start * ssize)
next = ~0ull;
- LASSERT(io->u.ci_rw.crw_pos >= lse->lsme_extent.e_start);
+ LASSERTF(io->u.ci_rw.crw_pos >= lse->lsme_extent.e_start,
+ "pos %lld, [%lld, %lld)\n", io->u.ci_rw.crw_pos,
+ lse->lsme_extent.e_start, lse->lsme_extent.e_end);
next = min_t(__u64, next, lse->lsme_extent.e_end);
next = min_t(loff_t, next, lio->lis_io_endpos);
(__u64)start, lio->lis_pos, lio->lis_endpos,
(__u64)lio->lis_io_endpos, io->u.ci_rw.crw_count);
+ index = lov_lsm_entry(lsm, lio->lis_endpos - 1);
+ if (index > 0 && !lsm_entry_inited(lsm, index)) {
+ io->ci_need_write_intent = 1;
+ RETURN(io->ci_result = -ENODATA);
+ }
+
/*
* XXX The following call should be optimized: we know, that
* [lio->lis_pos, lio->lis_endpos) intersects with exactly one stripe.
RETURN(lov_io_iter_init(env, ios));
}
+/**
+ * Iteration init for CIT_SETATTR IOs.
+ *
+ * For a truncate with a non-zero lis_pos, look up the layout component
+ * covering offset lis_pos - 1. If that is a component other than the first
+ * and it is not inited, flag the IO as needing a write intent and fail the
+ * iteration with -ENODATA. In every other case fall through to the generic
+ * lov_io_iter_init().
+ *
+ * \retval -ENODATA	target component is not inited; ci_need_write_intent
+ *			and ci_result have been set on the IO
+ * \retval other	result of lov_io_iter_init()
+ */
+static int lov_io_setattr_iter_init(const struct lu_env *env,
+				    const struct cl_io_slice *ios)
+{
+	struct cl_io *io = ios->cis_io;
+	struct lov_io *lio = cl2lov_io(env, ios);
+	struct lov_stripe_md *lsm = lio->lis_object->lo_lsm;
+	int comp;
+	ENTRY;
+
+	/* nothing special unless truncating to a non-zero position */
+	if (!cl_io_is_trunc(io) || lio->lis_pos == 0)
+		RETURN(lov_io_iter_init(env, ios));
+
+	comp = lov_lsm_entry(lsm, lio->lis_pos - 1);
+	if (comp <= 0 || lsm_entry_inited(lsm, comp))
+		RETURN(lov_io_iter_init(env, ios));
+
+	io->ci_need_write_intent = 1;
+	io->ci_result = -ENODATA;
+	RETURN(io->ci_result);
+}
+
static int lov_io_call(const struct lu_env *env, struct lov_io *lio,
int (*iofunc)(const struct lu_env *, struct cl_io *))
{
offset = cl_offset(obj, start);
index = lov_lsm_entry(loo->lo_lsm, offset);
- if (index < 0)
+ if (index < 0 || !lsm_entry_inited(loo->lo_lsm, index))
RETURN(-ENODATA);
stripe = lov_stripe_number(loo->lo_lsm, index, offset);
.cio_start = lov_io_start,
.cio_end = lov_io_end
},
- [CIT_SETATTR] = {
- .cio_fini = lov_io_fini,
- .cio_iter_init = lov_io_iter_init,
- .cio_iter_fini = lov_io_iter_fini,
- .cio_lock = lov_io_lock,
- .cio_unlock = lov_io_unlock,
- .cio_start = lov_io_start,
- .cio_end = lov_io_end
- },
+ [CIT_SETATTR] = {
+ .cio_fini = lov_io_fini,
+ .cio_iter_init = lov_io_setattr_iter_init,
+ .cio_iter_fini = lov_io_iter_fini,
+ .cio_lock = lov_io_lock,
+ .cio_unlock = lov_io_unlock,
+ .cio_start = lov_io_start,
+ .cio_end = lov_io_end
+ },
[CIT_DATA_VERSION] = {
.cio_fini = lov_io_fini,
.cio_iter_init = lov_io_iter_init,
nr = 0;
for (index = lov_lsm_entry(lov->lo_lsm, ext.e_start);
- index != -1 && index < lov->lo_lsm->lsm_entry_count; index++) {
+ index >= 0 && index < lov->lo_lsm->lsm_entry_count; index++) {
struct lov_layout_raid0 *r0 = lov_r0(lov, index);
/* assume lsm entries are sorted. */
nr++;
}
}
- if (nr == 0)
- RETURN(ERR_PTR(-EINVAL));
+ /**
+ * Aggressive lock request (from cl_setattr_ost) which asks for
+ * [eof, -1) lock, could come across uninstantiated layout extent,
+ * hence a 0 nr is possible.
+ */
OBD_ALLOC_LARGE(lovlck, offsetof(struct lov_lock, lls_sub[nr]));
if (lovlck == NULL)
lovlck->lls_nr = nr;
nr = 0;
for (index = lov_lsm_entry(lov->lo_lsm, ext.e_start);
- index < lov->lo_lsm->lsm_entry_count; index++) {
+ index >= 0 && index < lov->lo_lsm->lsm_entry_count; index++) {
struct lov_layout_raid0 *r0 = lov_r0(lov, index);
/* assume lsm entries are sorted. */
union lov_layout_state *state);
void (*llo_fini)(const struct lu_env *env, struct lov_object *lov,
union lov_layout_state *state);
- void (*llo_install)(const struct lu_env *env, struct lov_object *lov,
- union lov_layout_state *state);
int (*llo_print)(const struct lu_env *env, void *cookie,
lu_printer_t p, const struct lu_object *o);
int (*llo_page_init)(const struct lu_env *env, struct cl_object *obj,
* Lov object layout operations.
*
*/
-
-static void lov_install_empty(const struct lu_env *env,
- struct lov_object *lov,
- union lov_layout_state *state)
-{
- /*
- * File without objects.
- */
-}
-
static int lov_init_empty(const struct lu_env *env, struct lov_device *dev,
struct lov_object *lov, struct lov_stripe_md *lsm,
const struct cl_object_conf *conf,
return 0;
}
-static void lov_install_composite(const struct lu_env *env,
- struct lov_object *lov,
- union lov_layout_state *state)
-{
-}
-
static struct cl_object *lov_sub_find(const struct lu_env *env,
struct cl_device *dev,
const struct lu_fid *fid,
struct lov_layout_entry *le = &comp->lo_entries[i];
le->lle_extent = lsm->lsm_entries[i]->lsme_extent;
+ /**
+ * If the component has not been init-ed on MDS side, for
+ * PFL layout, we'd know that the components beyond this one
+ * will be dynamically init-ed later on file write/trunc ops.
+ */
+ if (!lsm_entry_inited(lsm, i))
+ break;
+
result = lov_init_raid0(env, dev, lov, i, &le->lle_raid0);
if (result < 0)
break;
for (i = 0; i < lsm->lsm_entry_count; i++) {
struct lov_stripe_md_entry *lse = lsm->lsm_entries[i];
- (*p)(env, cookie, DEXT ": { 0x%08X, %u, %u, %u, %u }\n",
+ (*p)(env, cookie, DEXT ": { 0x%08X, %u, %u, %#x, %u, %u }\n",
PEXT(&lse->lsme_extent), lse->lsme_magic,
- lse->lsme_id, lse->lsme_layout_gen,
+ lse->lsme_id, lse->lsme_layout_gen, lse->lsme_flags,
lse->lsme_stripe_count, lse->lsme_stripe_size);
lov_print_raid0(env, cookie, p, lov_r0(lov, i));
}
struct lov_layout_raid0 *r0 = &entry->lle_raid0;
struct cl_attr *lov_attr = &r0->lo_attr;
+ /* PFL: This component has not been init-ed. */
+ if (!lsm_entry_inited(lov->lo_lsm, index))
+ break;
+
result = lov_attr_get_raid0(env, lov, index, r0);
if (result != 0)
break;
.llo_init = lov_init_empty,
.llo_delete = lov_delete_empty,
.llo_fini = lov_fini_empty,
- .llo_install = lov_install_empty,
.llo_print = lov_print_empty,
.llo_page_init = lov_page_init_empty,
.llo_lock_init = lov_lock_init_empty,
.llo_init = lov_init_released,
.llo_delete = lov_delete_empty,
.llo_fini = lov_fini_released,
- .llo_install = lov_install_empty,
.llo_print = lov_print_released,
.llo_page_init = lov_page_init_empty,
.llo_lock_init = lov_lock_init_empty,
.llo_init = lov_init_composite,
.llo_delete = lov_delete_composite,
.llo_fini = lov_fini_composite,
- .llo_install = lov_install_composite,
.llo_print = lov_print_composite,
.llo_page_init = lov_page_init_composite,
.llo_lock_init = lov_lock_init_composite,
GOTO(out, rc);
}
- new_ops->llo_install(env, lov, state);
lov->lo_type = llt;
out:
if (rc != 0)
GOTO(out_lsm, rc);
- ops->llo_install(env, lov, set);
-
out_lsm:
lov_lsm_put(lsm);
conf->u.coc_layout.lb_len);
if (IS_ERR(lsm))
RETURN(PTR_ERR(lsm));
+ dump_lsm(D_INODE, lsm);
}
lov_conf_lock(lov);
for (entry = start_entry; entry <= end_entry; entry++) {
lsme = lsm->lsm_entries[entry];
+ if (!lsme_inited(lsme))
+ break;
+
if (entry == start_entry)
fs.fs_ext.e_start = whole_start;
else
lsm->lsm_entries[i];
int j;
+ if (!lsme_inited(lse))
+ break;
+
for (j = 0; j < lse->lsme_stripe_count; j++) {
struct lov_oinfo *loi =
lse->lsme_oinfo[j];
lmm_objects = lmmv1->lmm_objects;
}
+ if (lsm->lsm_is_released)
+ RETURN(lmm_size);
+
for (i = 0; i < lsm->lsm_entries[0]->lsme_stripe_count; i++) {
struct lov_oinfo *loi = lsm->lsm_entries[0]->lsme_oinfo[i];
for (entry = 0; entry < lsm->lsm_entry_count; entry++) {
struct lov_stripe_md_entry *lsme;
struct lov_mds_md *lmm;
+ __u16 stripecnt;
lsme = lsm->lsm_entries[entry];
lcme = &lcmv1->lcm_entries[entry];
lcme->lcme_id = cpu_to_le32(lsme->lsme_id);
+ lcme->lcme_flags = cpu_to_le32(lsme->lsme_flags);
lcme->lcme_extent.e_start =
cpu_to_le64(lsme->lsme_extent.e_start);
lcme->lcme_extent.e_end =
((struct lov_mds_md_v1 *)lmm)->lmm_objects;
}
- for (i = 0; i < lsme->lsme_stripe_count; i++) {
+ if (lsme_inited(lsme) &&
+ !(lsme->lsme_pattern & LOV_PATTERN_F_RELEASED))
+ stripecnt = lsme->lsme_stripe_count;
+ else
+ stripecnt = 0;
+
+ for (i = 0; i < stripecnt; i++) {
struct lov_oinfo *loi = lsme->lsme_oinfo[i];
ostid_cpu_to_le(&loi->loi_oi, &lmm_objects[i].l_ost_oi);
cpu_to_le32(loi->loi_ost_idx);
}
- size = lov_mds_md_size(lsme->lsme_stripe_count,
- lsme->lsme_magic);
+ size = lov_mds_md_size(stripecnt, lsme->lsme_magic);
lcme->lcme_size = cpu_to_le32(size);
offset += size;
} /* for each layout component */
offset = cl_offset(obj, index);
entry = lov_lsm_entry(loo->lo_lsm, offset);
- if (entry < 0) {
+ if (entry < 0 || !lsm_entry_inited(loo->lo_lsm, entry)) {
/* non-existing layout component */
lov_page_init_empty(env, obj, page, index);
RETURN(0);
int mdc_resource_get_unused(struct obd_export *exp, const struct lu_fid *fid,
struct list_head *cancels, enum ldlm_mode mode,
__u64 bits);
+int mdc_save_lovea(struct ptlrpc_request *req,
+ const struct req_msg_field *field,
+ void *data, u32 size);
/* mdc/mdc_request.c */
int mdc_fid_alloc(const struct lu_env *env, struct obd_export *exp,
struct lu_fid *fid, struct md_op_data *op_data);
* original open if the MDS crashed just when this client also OOM'd)
* but this is incredibly unlikely, and questionable whether the client
* could do MDS recovery under OOM anyways... */
-static void mdc_realloc_openmsg(struct ptlrpc_request *req,
- struct mdt_body *body)
+/**
+ * Copy \a size bytes of \a data into the client-side request field \a field.
+ *
+ * The field is enlarged through sptlrpc_cli_enlarge_reqbuf() when it is
+ * currently smaller than \a size, and shrunk otherwise, before the data is
+ * copied in. If the field buffer cannot be obtained afterwards, nothing is
+ * copied and 0 is still returned.
+ *
+ * NOTE(review): enlarging presumably may move the request buffer, so callers
+ * holding pointers into it should re-fetch them -- confirm.
+ *
+ * \param[in] req	request whose buffer receives the data
+ * \param[in] field	request message field to fill
+ * \param[in] data	source buffer
+ * \param[in] size	number of bytes to save
+ *
+ * \retval 0		on success
+ * \retval < 0		error from sptlrpc_cli_enlarge_reqbuf()
+ */
+int mdc_save_lovea(struct ptlrpc_request *req,
+		   const struct req_msg_field *field,
+		   void *data, u32 size)
{
- int rc;
+	struct req_capsule *pill = &req->rq_pill;
+	void *lmm;
+	int rc = 0;
- /* FIXME: remove this explicit offset. */
- rc = sptlrpc_cli_enlarge_reqbuf(req, DLM_INTENT_REC_OFF + 4,
- body->mbo_eadatasize);
- if (rc) {
- CERROR("Can't enlarge segment %d size to %d\n",
- DLM_INTENT_REC_OFF + 4, body->mbo_eadatasize);
- body->mbo_valid &= ~OBD_MD_FLEASIZE;
- body->mbo_eadatasize = 0;
+	if (req_capsule_get_size(pill, field, RCL_CLIENT) < size) {
+		rc = sptlrpc_cli_enlarge_reqbuf(req, field, size);
+		if (rc) {
+			/* size is unsigned (u32): print it with %u */
+			CERROR("%s: Can't enlarge ea size to %u: rc = %d\n",
+			       req->rq_export->exp_obd->obd_name,
+			       size, rc);
+			return rc;
+		}
+	} else {
+		req_capsule_shrink(pill, field, size, RCL_CLIENT);
}
+
+	req_capsule_set_size(pill, field, RCL_CLIENT, size);
+	lmm = req_capsule_client_get(pill, field);
+	if (lmm)
+		memcpy(lmm, data, size);
+
+	return rc;
}
static struct ptlrpc_request *
static struct ptlrpc_request *mdc_intent_layout_pack(struct obd_export *exp,
struct lookup_intent *it,
- struct md_op_data *unused)
+ struct md_op_data *op_data)
{
struct obd_device *obd = class_exp2obd(exp);
struct ptlrpc_request *req;
/* pack the layout intent request */
layout = req_capsule_client_get(&req->rq_pill, &RMF_LAYOUT_INTENT);
- /* LAYOUT_INTENT_ACCESS is generic, specific operation will be
- * set for replication */
- layout->li_opc = LAYOUT_INTENT_ACCESS;
+ LASSERT(op_data->op_data != NULL);
+ LASSERT(op_data->op_data_size == sizeof(*layout));
+ memcpy(layout, op_data->op_data, sizeof(*layout));
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
obd->u.cli.cl_default_mds_easize);
* (for example error one).
*/
if ((it->it_op & IT_OPEN) && req->rq_replay) {
- void *lmm;
- if (req_capsule_get_size(pill, &RMF_EADATA,
- RCL_CLIENT) <
- body->mbo_eadatasize)
- mdc_realloc_openmsg(req, body);
- else
- req_capsule_shrink(pill, &RMF_EADATA,
- body->mbo_eadatasize,
- RCL_CLIENT);
-
- req_capsule_set_size(pill, &RMF_EADATA,
- RCL_CLIENT,
- body->mbo_eadatasize);
-
- lmm = req_capsule_client_get(pill, &RMF_EADATA);
- if (lmm)
- memcpy(lmm, eadata,
- body->mbo_eadatasize);
+ rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
+ body->mbo_eadatasize);
+ if (rc) {
+ body->mbo_valid &= ~OBD_MD_FLEASIZE;
+ body->mbo_eadatasize = 0;
+ rc = 0;
+ }
}
}
- } else if (it->it_op & IT_LAYOUT) {
+ } else if (it->it_op & IT_LAYOUT) {
/* maybe the lock was granted right away and layout
* is packed into RMF_DLM_LVB of req */
lvb_len = req_capsule_get_size(pill, &RMF_DLM_LVB, RCL_SERVER);
&RMF_DLM_LVB, lvb_len);
if (lvb_data == NULL)
RETURN(-EPROTO);
+
+ /**
+ * save replied layout data to the request buffer for
+ * recovery consideration (lest MDS reinitialize
+ * another set of OST objects).
+ */
+ if (req->rq_transno)
+ (void)mdc_save_lovea(req, &RMF_EADATA, lvb_data,
+ lvb_len);
}
}
case IT_READDIR:
policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
break;
- case IT_LAYOUT:
- policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
- break;
- default:
- policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
- break;
- }
+ case IT_LAYOUT:
+ policy.l_inodebits.bits = MDS_INODELOCK_LAYOUT;
+ break;
+ default:
+ policy.l_inodebits.bits = MDS_INODELOCK_LOOKUP;
+ break;
+ }
mode = mdc_lock_match(exp, LDLM_FL_BLOCK_GRANTED, fid,
LDLM_IBITS, &policy,
GOTO(out, rc = -EPROTO);
if (body->mbo_valid & OBD_MD_FLEASIZE) {
- void *eadata, *lmm;
+ void *eadata;
eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
body->mbo_eadatasize);
if (eadata == NULL)
GOTO(out, rc = -EPROTO);
- if (req_capsule_get_size(pill, &RMF_EADATA,
- RCL_CLIENT) <
- body->mbo_eadatasize) {
- rc = sptlrpc_cli_enlarge_reqbuf(req, 4,
- body->mbo_eadatasize);
- if (rc)
- GOTO(out, rc = -ENOMEM);
- } else {
- req_capsule_shrink(pill, &RMF_EADATA,
- body->mbo_eadatasize,
- RCL_CLIENT);
- }
-
- req_capsule_set_size(pill, &RMF_EADATA, RCL_CLIENT,
- body->mbo_eadatasize);
-
- lmm = req_capsule_client_get(pill, &RMF_EADATA);
- if (lmm)
- memcpy(lmm, eadata, body->mbo_eadatasize);
+ rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
+ body->mbo_eadatasize);
+ if (rc)
+ GOTO(out, rc);
}
}
out:
}
/**
+ * Handler of layout intent RPC requiring the layout modification
+ *
+ * Currently a placeholder: no layout change is performed and 0 is always
+ * returned.
+ *
+ * \param[in] info thread environment
+ * \param[in] obj object
+ * \param[in] layout layout intent
+ *
+ * \retval 0 on success
+ * \retval < 0 error code
+ */
+static int mdt_layout_change(struct mdt_thread_info *info,
+ struct mdt_object *obj,
+ struct layout_intent *layout)
+{
+ /* XXX: to do */
+ return 0;
+}
+
+/**
* Exchange MOF_LOV_CREATED flags between two objects after a
* layout swap. No assumption is made on whether o1 or o2 have
* created objects or not.
struct layout_intent *layout;
struct lu_fid *fid;
struct mdt_object *obj = NULL;
+ bool layout_change = false;
int layout_size = 0;
int rc = 0;
ENTRY;
if (layout == NULL)
RETURN(-EPROTO);
- if (layout->li_opc != LAYOUT_INTENT_ACCESS) {
+ switch (layout->li_opc) {
+ case LAYOUT_INTENT_TRUNC:
+ case LAYOUT_INTENT_WRITE:
+ layout_change = true;
+ break;
+ case LAYOUT_INTENT_ACCESS:
+ break;
+ case LAYOUT_INTENT_READ:
+ case LAYOUT_INTENT_GLIMPSE:
+ case LAYOUT_INTENT_RELEASE:
+ case LAYOUT_INTENT_RESTORE:
CERROR("%s: Unsupported layout intent opc %d\n",
mdt_obd_name(info->mti_mdt), layout->li_opc);
- RETURN(-EINVAL);
+ rc = -ENOTSUPP;
+ break;
+ default:
+ CERROR("%s: Unknown layout intent opc %d\n",
+ mdt_obd_name(info->mti_mdt), layout->li_opc);
+ rc = -EINVAL;
+ break;
}
+ if (rc < 0)
+ RETURN(rc);
fid = &info->mti_tmp_fid2;
fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name);
req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER,
layout_size);
rc = req_capsule_server_pack(info->mti_pill);
- GOTO(out_obj, rc);
+ if (rc)
+ GOTO(out_obj, rc);
+ if (layout_change) {
+ rc = mdt_layout_change(info, obj, layout);
+ if (rc)
+ GOTO(out_obj, rc);
+ }
out_obj:
mdt_object_put(info->mti_env, obj);
return avail;
}
+/**
+ * Decide whether an intent RPC can bypass the modify RPC slot accounting.
+ *
+ * \param[in] it	lookup intent of the operation, may be NULL
+ *
+ * \retval true		\a it is a getattr, lookup or readdir intent, or a
+ *			layout intent without FMODE_WRITE in it_flags
+ * \retval false	\a it is NULL or any other kind of intent
+ */
+static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
+{
+	if (it == NULL)
+		return false;
+
+	switch (it->it_op) {
+	case IT_GETATTR:
+	case IT_LOOKUP:
+	case IT_READDIR:
+		return true;
+	case IT_LAYOUT:
+		/* a layout intent flagged FMODE_WRITE still takes a slot */
+		return !(it->it_flags & FMODE_WRITE);
+	default:
+		return false;
+	}
+}
+
/* Get a modify RPC slot from the obd client @cli according
* to the kind of operation @opc that is going to be sent
* and the intent @it of the operation if it applies.
/* read-only metadata RPCs don't consume a slot on MDT
* for reply reconstruction
*/
- if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
- it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
+ if (obd_skip_mod_rpc_slot(it))
return 0;
if (opc == MDS_CLOSE)
{
bool close_req = false;
- if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
- it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
+ if (obd_skip_mod_rpc_slot(it))
return;
if (opc == MDS_CLOSE)
* Returns the PTLRPC request or reply (\a loc) buffer offset of a \a pill
* corresponding to the given RMF (\a field).
*/
-static __u32 __req_capsule_offset(const struct req_capsule *pill,
- const struct req_msg_field *field,
- enum req_location loc)
+__u32 __req_capsule_offset(const struct req_capsule *pill,
+ const struct req_msg_field *field,
+ enum req_location loc)
{
unsigned int offset;
- offset = field->rmf_offset[pill->rc_fmt->rf_idx][loc];
- LASSERTF(offset > 0, "%s:%s, off=%d, loc=%d\n",
- pill->rc_fmt->rf_name,
- field->rmf_name, offset, loc);
- offset --;
+ offset = field->rmf_offset[pill->rc_fmt->rf_idx][loc];
+ LASSERTF(offset > 0, "%s:%s, off=%d, loc=%d\n",
+ pill->rc_fmt->rf_name,
+ field->rmf_name, offset, loc);
+ offset--;
LASSERT(offset < REQ_MAX_FIELD_NR);
return offset;
int sptlrpc_init(void);
void sptlrpc_fini(void);
+/* layout.c */
+__u32 __req_capsule_offset(const struct req_capsule *pill,
+ const struct req_msg_field *field,
+ enum req_location loc);
+
static inline bool ptlrpc_recoverable_error(int rc)
{
return (rc == -ENOTCONN || rc == -ENODEV);
* so caller should refresh its local pointers if needed.
*/
int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req,
- int segment, int newsize)
-{
- struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
- struct ptlrpc_sec_cops *cops;
- struct lustre_msg *msg = req->rq_reqmsg;
+ const struct req_msg_field *field,
+ int newsize)
+{
+ struct req_capsule *pill = &req->rq_pill;
+ struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
+ struct ptlrpc_sec_cops *cops;
+ struct lustre_msg *msg = req->rq_reqmsg;
+ int segment = __req_capsule_offset(pill, field, RCL_CLIENT);
LASSERT(ctx);
LASSERT(msg);
$LFS setstripe -E 1m -S 1M -c 1 -E -1 -c 1 $comp_file ||
error "Create $comp_file failed"
+ #instantiate all components, so that objs are allocated
+ dd if=/dev/zero of=$comp_file bs=1k count=1 seek=1k
+
local ost_idx1=$($LFS getstripe -I 1 -i $comp_file)
local ost_idx2=$($LFS getstripe -I 2 -i $comp_file)
$LFS setstripe -E 1m -S 1m -o 0 -E -1 -o 0 $comp_file ||
error "Create $comp_file failed"
+ #instantiate all components, so that objs are allocated
+ dd if=/dev/zero of=$comp_file bs=1k count=1 seek=1k
+
local ost_idx1=$($LFS getstripe -I 1 -i $comp_file)
local ost_idx2=$($LFS getstripe -I 2 -i $comp_file)
dd if=/dev/zero of=$comp_file bs=1M count=2 > /dev/null 2>&1 &&
error "Write beyond component should fail"
- dd if=$comp_file of=/dev/null bs=1M count=2 > /dev/null 2>&1 &&
- error "Read beyond component should fail"
+ dd if=$comp_file of=/dev/null bs=1M count=2 > /dev/null 2>&1 ||
+ error "Read beyond component should short read, not fail"
$LFS setstripe --component-add -E 2M -c 1 $comp_file ||
error "Add component to $comp_file failed"
$LFS setstripe -E 1M -E 16M -E -1 $comp_file ||
error "Create second $comp_file failed"
+ #instantiate all components, so that objs are allocated
+ dd if=/dev/zero of=$comp_file bs=1k count=1 seek=16k
+
del_comp_and_verify $comp_file "init" 0 0
rm -f $comp_file || error "Delete second $comp_file failed"
}
local comp_cnt=$($LFS getstripe --component-count $comp_file)
[ $comp_cnt -ne 2 ] && error "file $comp_cnt != 2"
+ #instantiate all components, so that objs are allocated
+ dd if=/dev/zero of=$comp_file bs=1k count=1 seek=64k
+
local ost_idx=$($LFS getstripe -I 1 -i $comp_file)
[ $ost_idx -ne 0 ] &&
error "component 1 ost_idx $ost_idx != 0"
LDF_IS_DIR = 0x0001,
LDF_IS_RAW = 0x0002,
LDF_INDENT = 0x0004,
+ LDF_SKIP_OBJS = 0x0008,
};
static void lov_dump_user_lmm_header(struct lov_user_md *lum, char *path,
bool is_dir = flags & LDF_IS_DIR;
bool is_raw = flags & LDF_IS_RAW;
bool indent = flags & LDF_INDENT;
+ bool skip_objs = flags & LDF_SKIP_OBJS;
char *prefix = is_dir ? "" : "lmm_";
char *separator = "";
char *space = indent ? " " : "";
if (verbose & ~VERBOSE_OFFSET)
llapi_printf(LLAPI_MSG_NORMAL, "%s%sstripe_offset: ",
space, prefix);
- if (is_dir)
+ if (is_dir || skip_objs)
llapi_printf(LLAPI_MSG_NORMAL, "%d",
lum->lmm_stripe_offset ==
(typeof(lum->lmm_stripe_offset))(-1) ? -1 :
{
bool is_dir = flags & LDF_IS_DIR;
bool indent = flags & LDF_INDENT;
+ bool skip_objs = flags & LDF_SKIP_OBJS;
int i, obdstripe = (obdindex != OBD_NOT_FOUND) ? 0 : 1;
if (!obdstripe) {
lov_dump_user_lmm_header(lum, path, objects, header, depth, pool_name,
flags);
- if (!is_dir && (header & VERBOSE_OBJID) &&
+ if (!is_dir && !skip_objs && (header & VERBOSE_OBJID) &&
!(lum->lmm_pattern & LOV_PATTERN_F_RELEASED)) {
char *space = " - ";
!(param->fp_comp_flags & entry->lcme_flags))
continue;
+ if (entry->lcme_flags & LCME_FL_INIT)
+ flags &= ~LDF_SKIP_OBJS;
+ else
+ flags |= LDF_SKIP_OBJS;
+
if (param->fp_check_comp_id &&
param->fp_comp_id != entry->lcme_id)
continue;
}
obj_count = llapi_layout_objects_in_lum(lum, lum_size);
- if (obj_count != lum->lmm_stripe_count)
+ if (comp_v1) {
+ if (!(comp_v1->lcm_entries[i].lcme_flags &
+ LCME_FL_INIT) && obj_count != 0)
+ return false;
+ } else if (obj_count != lum->lmm_stripe_count) {
return false;
+ }
}
return true;
}
llc_list);
/* Inherit some attributes from existing component */
- new->llc_pattern = comp->llc_pattern;
new->llc_stripe_size = comp->llc_stripe_size;
new->llc_stripe_count = comp->llc_stripe_count;
if (new->llc_extent.e_end <= last->llc_extent.e_end) {