RETURN(rc);
}
+/**
+ * Look for a Data-on-MDT (DoM) component in a layout.
+ *
+ * Only composite layouts (LOV_MAGIC_COMP_V1) are inspected and only
+ * components flagged LCME_FL_INIT are taken into account.
+ *
+ * \param[in]  lmm		layout to inspect
+ * \param[out] is_dom_only	optional, may be NULL; set to 1 when a DoM
+ *				component was found and no other initialized
+ *				component has a non-MDT pattern, 0 otherwise
+ *
+ * \retval	stripe size of the DoM component
+ * \retval	0 if the layout is not composite or no DoM component
+ *		with a non-zero stripe size was found
+ */
+__u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *is_dom_only)
+{
+	struct lov_comp_md_v1 *comp;
+	struct lov_mds_md *sub;
+	__u32 dom_size = 0;
+	bool seen_ost = false;
+	int count;
+	int idx;
+
+	ENTRY;
+
+	if (is_dom_only != NULL)
+		*is_dom_only = 0;
+
+	if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1)
+		RETURN(0);
+
+	comp = (struct lov_comp_md_v1 *)lmm;
+	sub = (struct lov_mds_md *)((char *)comp +
+			le32_to_cpu(comp->lcm_entries[0].lcme_offset));
+
+	/* Shortcut: without mirroring a DoM component should come first,
+	 * so a non-MDT first component means there is nothing to find.
+	 */
+	if (le16_to_cpu(comp->lcm_mirror_count) == 0 &&
+	    lov_pattern(le32_to_cpu(sub->lmm_pattern)) != LOV_PATTERN_MDT)
+		RETURN(0);
+
+	/* otherwise every component has to be examined */
+	count = le16_to_cpu(comp->lcm_entry_count);
+	for (idx = 0; idx < count; idx++) {
+		struct lov_comp_md_entry_v1 *ent = &comp->lcm_entries[idx];
+
+		/* components without LCME_FL_INIT are ignored */
+		if ((le32_to_cpu(ent->lcme_flags) & LCME_FL_INIT) == 0)
+			continue;
+
+		sub = (struct lov_mds_md *)((char *)comp +
+				le32_to_cpu(ent->lcme_offset));
+
+		if (lov_pattern(le32_to_cpu(sub->lmm_pattern)) !=
+		    LOV_PATTERN_MDT)
+			seen_ost = true;
+		else
+			dom_size = le32_to_cpu(sub->lmm_stripe_size);
+
+		/* both kinds seen: the answer cannot change anymore */
+		if (seen_ost && dom_size != 0)
+			RETURN(dom_size);
+	}
+
+	/* reaching this point with a DoM size means DoM-only layout */
+	if (dom_size != 0 && is_dom_only != NULL)
+		*is_dom_only = 1;
+
+	RETURN(dom_size);
+}
+
/**
* Pack size attributes into the reply.
*/
{
struct mdt_body *b;
struct md_attr *ma = &info->mti_attr;
- int dom_stripe;
+ __u32 dom_stripe;
bool dom_lock = false;
ENTRY;
!(ma->ma_valid & MA_LOV && ma->ma_lmm != NULL))
RETURN(-ENODATA);
- dom_stripe = mdt_lmm_dom_entry(ma->ma_lmm);
+ dom_stripe = mdt_lmm_dom_stripesize(ma->ma_lmm);
/* no DoM stripe, no size in reply */
- if (dom_stripe == LMM_NO_DOM)
+ if (!dom_stripe)
RETURN(-ENOENT);
if (lustre_handle_is_used(lh)) {
/*
* Data-on-MDT optimization - read data along with OPEN and return it
- * in reply. Do that only if we have both DOM and LAYOUT locks.
+ * in reply when possible.
*/
- if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req) &&
- info->mti_attr.ma_lmm != NULL &&
- mdt_lmm_dom_entry(info->mti_attr.ma_lmm) == LMM_DOM_ONLY) {
+ if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req))
rc = mdt_dom_read_on_open(info, info->mti_mdt,
&lhc->mlh_reg_lh);
- }
return rc;
}
m->mdt_skip_lfsck = 1;
}
- /* DoM files get IO lock at open by default */
+ /* DoM-only files get IO lock at open by default, others only try it */
m->mdt_opts.mo_dom_lock = ALWAYS_DOM_LOCK_ON_OPEN;
/* DoM files are read at open and data is packed in the reply */
m->mdt_opts.mo_dom_read_open = 1;
return exp_connect_flags(exp) & OBD_CONNECT_DIR_STRIPE;
}
-enum {
- LMM_NO_DOM,
- LMM_DOM_ONLY,
- LMM_DOM_OST
-};
+__u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *dom_only);
-/* XXX Look into layout in MDT layer. This must be done in LOD. */
-static inline int mdt_lmm_dom_entry(struct lov_mds_md *lmm)
+static inline bool mdt_lmm_dom_only(struct lov_mds_md *lmm)
{
- struct lov_comp_md_v1 *comp_v1;
- struct lov_mds_md *v1;
- __u32 off;
- bool has_dom = false, has_ost = false;
- int i;
-
- if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1)
- return LMM_NO_DOM;
-
- comp_v1 = (struct lov_comp_md_v1 *)lmm;
- off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset);
- v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
-
- /* DoM entry is the first entry always */
- if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) != LOV_PATTERN_MDT &&
- le16_to_cpu(comp_v1->lcm_mirror_count) == 0)
- return LMM_NO_DOM;
-
- for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) {
- struct lov_comp_md_entry_v1 *lcme;
-
- lcme = &comp_v1->lcm_entries[i];
- if (!(le32_to_cpu(lcme->lcme_flags) & LCME_FL_INIT))
- continue;
+ int dom_only = 0;
- off = le32_to_cpu(lcme->lcme_offset);
- v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
+ mdt_lmm_dom_entry_check(lmm, &dom_only);
+ return dom_only;
+}
- if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) ==
- LOV_PATTERN_MDT)
- has_dom = true;
- else
- has_ost = true;
- if (has_dom && has_ost)
- return LMM_DOM_OST;
- }
- return has_dom ? LMM_DOM_ONLY : LMM_NO_DOM;
+static inline __u32 mdt_lmm_dom_stripesize(struct lov_mds_md *lmm)
+{
+ return mdt_lmm_dom_entry_check(lmm, NULL);
}
static inline bool mdt_lmm_is_flr(struct lov_mds_md *lmm)
}
mbo = req_capsule_server_get(pill, &RMF_MDT_BODY);
+ if (!(mbo->mbo_valid & OBD_MD_DOM_SIZE))
+ RETURN(0);
+
+ if (!mbo->mbo_dom_size)
+ RETURN(0);
if (lustre_handle_is_used(lh)) {
struct ldlm_lock *lock;
if (!dom_lock || !mdt->mdt_opts.mo_dom_read_open)
RETURN(0);
- if (!(mbo->mbo_valid & OBD_MD_DOM_SIZE))
- RETURN(0);
-
- if (mbo->mbo_dom_size == 0)
- RETURN(0);
-
CDEBUG(D_INFO, "File size %llu, reply sizes %d/%d\n",
mbo->mbo_dom_size, req->rq_reqmsg->lm_repsize, req->rq_replen);
len = req->rq_reqmsg->lm_repsize - req->rq_replen;
/* can fit whole data */
len = mbo->mbo_dom_size;
offset = 0;
- } else {
+ } else if (mbo->mbo_dom_size <
+ mdt_lmm_dom_stripesize(mti->mti_attr.ma_lmm)) {
int tail, pgbits;
/* File tail offset must be aligned with larger page size
len = tail;
offset = mbo->mbo_dom_size - len;
+ } else {
+ /* DOM stripe is fully written, so don't expect its tail
+ * will be used by append.
+ */
+ RETURN(0);
}
+
LASSERT((offset & ~PAGE_MASK) == 0);
rc = req_capsule_server_grow(pill, &RMF_NIOBUF_INLINE,
sizeof(*rnb) + len);
bool try_layout = false;
bool create_layout = false;
int rc = 0;
- int dom_stripes = LMM_NO_DOM;
- bool dom_lock = false;
+ __u32 dom_stripe = 0;
+ unsigned int dom_only = 0;
+ unsigned int dom_lock = 0;
ENTRY;
ma->ma_need & MA_LOV)
try_layout = true;
- /* DoM files can have just MDT stripe or combined MDT + OST
- * stripes.
- * - In the first case the open for read/write will do IO to
- * the MDT stripe and it makes sense to take IO lock in
- * advance along with OPEN even if it is blocking lock.
- * - In the second case it is just size of MDT stripe and it
- * is quite unlikely that client will write into it, though
- * it may read it. So IO lock will be taken optionally if it
- * is non-blocking one.
+ /* DoM files can take IO lock at OPEN when it makes sense:
+ * check if file has DoM stripe and ask for the lock if the
+ * client has no lock on that resource yet.
*/
if (ma->ma_valid & MA_LOV && ma->ma_lmm != NULL)
- dom_stripes = mdt_lmm_dom_entry(ma->ma_lmm);
-
- if (dom_stripes == LMM_DOM_ONLY &&
- info->mti_mdt->mdt_opts.mo_dom_lock > 0 &&
+ dom_stripe = mdt_lmm_dom_entry_check(ma->ma_lmm,
+ &dom_only);
+ /* If only DOM stripe is being used then we can expect IO
+ * to it after OPEN and will return corresponding DOM ibit
+ * using default strategy from mdt_opts.mo_dom_lock.
+ * Otherwise trylock mode is used always and DOM ibit will
+ * be returned optionally.
+ */
+ if (dom_stripe &&
!mdt_dom_client_has_lock(info, mdt_object_fid(obj)))
- dom_lock = true;
+ dom_lock = !dom_only ? TRYLOCK_DOM_ON_OPEN :
+ info->mti_mdt->mdt_opts.mo_dom_lock;
}
if (acq_lease) {
lhc = &info->mti_lh[MDT_LH_LOCAL];
} else if (dom_lock) {
lm = (open_flags & MDS_FMODE_WRITE) ? LCK_PW : LCK_PR;
- if (info->mti_mdt->mdt_opts.mo_dom_lock ==
- TRYLOCK_DOM_ON_OPEN) {
+ if (dom_lock == TRYLOCK_DOM_ON_OPEN) {
trybits |= MDS_INODELOCK_DOM |
MDS_INODELOCK_LAYOUT;
} else {
- /* mo_dom_lock == ALWAYS_DOM_LOCK_ON_OPEN */
+ /* dom_lock == ALWAYS_DOM_LOCK_ON_OPEN */
*ibits = MDS_INODELOCK_DOM;
- if (info->mti_mdt->mdt_opts.mo_dom_read_open) {
+ if (info->mti_mdt->mdt_opts.mo_dom_read_open)
trybits |= MDS_INODELOCK_LAYOUT;
- }
}
}
if (rc)
GOTO(put_source, rc);
- if (ma->ma_valid & MA_LOV &&
- mdt_lmm_dom_entry(ma->ma_lmm) != LMM_NO_DOM)
+ if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
GOTO(put_source, rc = -EOPNOTSUPP);
}
* MDS only updates LSOM of the file if the size or block
* size is being increased or the file is being truncated.
*/
- if (mdt_lmm_dom_entry(info->mti_big_lmm) != LMM_DOM_ONLY &&
+ if (!mdt_lmm_dom_only(info->mti_big_lmm) &&
!(tmp_ma->ma_valid & MA_INODE && tmp_ma->ma_attr.la_nlink == 0)) {
__u64 size;
__u64 blocks;