extern struct req_msg_field RMF_OBD_ID;
extern struct req_msg_field RMF_FID;
extern struct req_msg_field RMF_NIOBUF_REMOTE;
+extern struct req_msg_field RMF_NIOBUF_INLINE;
extern struct req_msg_field RMF_RCS;
extern struct req_msg_field RMF_FIEMAP_KEY;
extern struct req_msg_field RMF_FIEMAP_VAL;
* run-time if a larger observed size is advertised by the MDT. */
__u32 cl_max_mds_easize;
+ /* Data-on-MDT specific value to set larger reply buffer for possible
+ * data read along with open/stat requests. By default it tries to use
+ * unused space in reply buffer.
+ * This value is used to ensure that reply buffer has at least as
+ * much free space as value indicates. That free space is gained from
+ * LOV EA buffer which is small for DoM files and on big systems can
+ * provide up to 32KB of extra space in reply buffer.
+ * Default value is 8K now.
+ */
+ __u32 cl_dom_min_inline_repsize;
+
enum lustre_sec_part cl_sp_me;
enum lustre_sec_part cl_sp_to;
struct sptlrpc_flavor cl_flvr_mgc; /* fixed flavor of mgc->mgs */
RETURN(rc);
}
+/*
+ * Page filler passed to read_cache_page(): \a data is the
+ * struct niobuf_local describing the bytes destined for \a page.
+ * Copies lnb_len bytes from lnb_data into the page, zeroes whatever is
+ * left of the page, marks it up to date and unlocks it.
+ *
+ * Always returns 0.
+ */
+static inline int ll_dom_readpage(void *data, struct page *page)
+{
+	struct niobuf_local *src = data;
+	void *dst = ll_kmap_atomic(page, KM_USER0);
+
+	memcpy(dst, src->lnb_data, src->lnb_len);
+	/* a short buffer leaves the tail of the page to be cleared */
+	if (PAGE_SIZE > src->lnb_len) {
+		memset(dst + src->lnb_len, 0, PAGE_SIZE - src->lnb_len);
+	}
+	flush_dcache_page(page);
+	SetPageUptodate(page);
+	ll_kunmap_atomic(dst, KM_USER0);
+	unlock_page(page);
+
+	return 0;
+}
+
+/**
+ * Fill the page cache of \a inode with file data returned inline in the
+ * open reply.
+ *
+ * Nothing is done unless the intent carries a lock that has the DoM bit
+ * and the reply holds a non-empty RMF_NIOBUF_INLINE field. That field is
+ * a struct niobuf_remote header followed directly by rnb_len bytes of
+ * file data which start at the page-aligned file offset rnb_offset.
+ *
+ * The function returns nothing; on any failure it simply stops filling
+ * pages.
+ *
+ * \param[in] inode	inode of the opened file
+ * \param[in] req	open request whose reply may carry inline data
+ * \param[in] it	lookup intent holding the lock handle and mode
+ */
+void ll_dom_finish_open(struct inode *inode, struct ptlrpc_request *req,
+			struct lookup_intent *it)
+{
+	struct ll_inode_info *lli = ll_i2info(inode);
+	struct cl_object *obj = lli->lli_clob;
+	struct address_space *mapping = inode->i_mapping;
+	struct page *vmpage;
+	struct niobuf_remote *rnb;
+	char *data;
+	struct lu_env *env;
+	struct cl_io *io;
+	__u16 refcheck;
+	struct lustre_handle lockh;
+	struct ldlm_lock *lock;
+	unsigned long index, start;
+	struct niobuf_local lnb;
+	int rc;
+	bool dom_lock = false;
+
+	ENTRY;
+
+	if (obj == NULL)
+		RETURN_EXIT;
+
+	if (it->it_lock_mode != 0) {
+		lockh.cookie = it->it_lock_handle;
+		lock = ldlm_handle2lock(&lockh);
+		/* The handle may not resolve to a lock; only a lock that
+		 * was found may be inspected and put back.
+		 */
+		if (lock != NULL) {
+			dom_lock = ldlm_has_dom(lock);
+			LDLM_LOCK_PUT(lock);
+		}
+	}
+
+	if (!dom_lock)
+		RETURN_EXIT;
+
+	env = cl_env_get(&refcheck);
+	if (IS_ERR(env))
+		RETURN_EXIT;
+
+	if (!req_capsule_has_field(&req->rq_pill, &RMF_NIOBUF_INLINE,
+				   RCL_SERVER))
+		GOTO(out_env, rc = -ENODATA);
+
+	rnb = req_capsule_server_get(&req->rq_pill, &RMF_NIOBUF_INLINE);
+	if (rnb == NULL || rnb->rnb_len == 0)
+		GOTO(out_env, rc = 0);
+
+	/* file data follows the niobuf_remote header */
+	data = (char *)rnb + sizeof(*rnb);
+
+	CDEBUG(D_INFO, "Get data buffer along with open, len %i, i_size %llu\n",
+	       rnb->rnb_len, i_size_read(inode));
+
+	io = vvp_env_thread_io(env);
+	io->ci_obj = obj;
+	io->ci_ignore_layout = 1;
+	rc = cl_io_init(env, io, CIT_MISC, obj);
+	if (rc)
+		GOTO(out_io, rc);
+
+	lnb.lnb_file_offset = rnb->rnb_offset;
+	start = lnb.lnb_file_offset / PAGE_SIZE;
+	index = 0;
+	LASSERT(lnb.lnb_file_offset % PAGE_SIZE == 0);
+	lnb.lnb_page_offset = 0;
+	/* one iteration per page of returned data */
+	do {
+		struct cl_page *clp;
+
+		lnb.lnb_data = data + (index << PAGE_SHIFT);
+		lnb.lnb_len = rnb->rnb_len - (index << PAGE_SHIFT);
+		if (lnb.lnb_len > PAGE_SIZE)
+			lnb.lnb_len = PAGE_SIZE;
+
+		/* ll_dom_readpage() copies lnb into the page and unlocks it */
+		vmpage = read_cache_page(mapping, index + start,
+					 ll_dom_readpage, &lnb);
+		if (IS_ERR(vmpage)) {
+			CWARN("%s: cannot fill page %lu for "DFID
+			      " with data: rc = %li\n",
+			      ll_get_fsname(inode->i_sb, NULL, 0),
+			      index + start, PFID(lu_object_fid(&obj->co_lu)),
+			      PTR_ERR(vmpage));
+			break;
+		}
+		lock_page(vmpage);
+		clp = cl_page_find(env, obj, vmpage->index, vmpage,
+				   CPT_CACHEABLE);
+		if (IS_ERR(clp)) {
+			unlock_page(vmpage);
+			put_page(vmpage);
+			GOTO(out_io, rc = PTR_ERR(clp));
+		}
+
+		/* export page */
+		cl_page_export(env, clp, 1);
+		cl_page_put(env, clp);
+		unlock_page(vmpage);
+		put_page(vmpage);
+		index++;
+	} while (rnb->rnb_len > (index << PAGE_SHIFT));
+	rc = 0;
+	EXIT;
+out_io:
+	cl_io_fini(env, io);
+out_env:
+	cl_env_put(env, &refcheck);
+}
+
static int ll_intent_file_open(struct dentry *de, void *lmm, int lmmsize,
struct lookup_intent *itp)
{
}
rc = ll_prep_inode(&de->d_inode, req, NULL, itp);
- if (!rc && itp->it_lock_mode)
+
+ if (!rc && itp->it_lock_mode) {
+ ll_dom_finish_open(de->d_inode, req, itp);
ll_set_lock_data(sbi->ll_md_exp, de->d_inode, itp, NULL);
+ }
out:
ptlrpc_req_finished(req);
struct lov_user_md **kbuf);
void ll_open_cleanup(struct super_block *sb, struct ptlrpc_request *open_req);
+void ll_dom_finish_open(struct inode *inode, struct ptlrpc_request *req,
+ struct lookup_intent *it);
+
/* Compute expected user md size when passing in a md from user space */
static inline ssize_t ll_lov_user_md_size(const struct lov_user_md *lum)
{
CDEBUG(D_DENTRY, "it %p it_disposition %x\n", it,
it->it_disposition);
if (!it_disposition(it, DISP_LOOKUP_NEG)) {
- rc = ll_prep_inode(&inode, request, (*de)->d_sb, it);
- if (rc)
- RETURN(rc);
-
- ll_set_lock_data(ll_i2sbi(parent)->ll_md_exp, inode, it, &bits);
-
- /* We used to query real size from OSTs here, but actually
- this is not needed. For stat() calls size would be updated
- from subsequent do_revalidate()->ll_inode_revalidate_it() in
- 2.4 and
- vfs_getattr_it->ll_getattr()->ll_inode_revalidate_it() in 2.6
- Everybody else who needs correct file size would call
- ll_glimpse_size or some equivalent themselves anyway.
- Also see bug 7198. */
+ rc = ll_prep_inode(&inode, request, (*de)->d_sb, it);
+ if (rc)
+ RETURN(rc);
+
+ if (it->it_op & IT_OPEN)
+ ll_dom_finish_open(inode, request, it);
+
+ ll_set_lock_data(ll_i2sbi(parent)->ll_md_exp, inode, it, &bits);
+
+ /* We used to query real size from OSTs here, but actually
+ * this is not needed. For stat() calls size would be updated
+ * from subsequent do_revalidate()->ll_inode_revalidate_it() in
+ * 2.4 and
+ * vfs_getattr_it->ll_getattr()->ll_inode_revalidate_it() in 2.6
+ * Everybody else who needs correct file size would call
+ * ll_glimpse_size or some equivalent themselves anyway.
+ * Also see bug 7198.
+ */
}
/* Only hash *de if it is unhashed (new dentry).
}
LPROC_SEQ_FOPS(mdc_stats);
+/* Print the cl_dom_min_inline_repsize value of this device, in decimal. */
+static int mdc_dom_min_repsize_seq_show(struct seq_file *m, void *v)
+{
+	struct obd_device *obd = m->private;
+	__u32 min_repsize = obd->u.cli.cl_dom_min_inline_repsize;
+
+	seq_printf(m, "%u\n", min_repsize);
+	return 0;
+}
+
+/*
+ * Set cl_dom_min_inline_repsize from a user-supplied unsigned integer.
+ *
+ * Returns \a count on success, the kstrtouint_from_user() error when the
+ * input does not parse, or -ERANGE when the value exceeds
+ * MDC_DOM_MAX_INLINE_REPSIZE.
+ */
+static ssize_t mdc_dom_min_repsize_seq_write(struct file *file,
+					     const char __user *buffer,
+					     size_t count, loff_t *off)
+{
+	struct seq_file *m = file->private_data;
+	struct obd_device *obd = m->private;
+	unsigned int new_size;
+	int err;
+
+	err = kstrtouint_from_user(buffer, count, 0, &new_size);
+	if (err != 0)
+		return err;
+
+	if (new_size > MDC_DOM_MAX_INLINE_REPSIZE)
+		return -ERANGE;
+
+	obd->u.cli.cl_dom_min_inline_repsize = new_size;
+
+	return count;
+}
+LPROC_SEQ_FOPS(mdc_dom_min_repsize);
+
LPROC_SEQ_FOPS_RO_TYPE(mdc, connect_flags);
LPROC_SEQ_FOPS_RO_TYPE(mdc, server_uuid);
LPROC_SEQ_FOPS_RO_TYPE(mdc, timeouts);
.fops = &mdc_unstable_stats_fops },
{ .name = "mdc_stats",
.fops = &mdc_stats_fops },
+ { .name = "mdc_dom_min_repsize",
+ .fops = &mdc_dom_min_repsize_fops },
{ NULL }
};
struct ldlm_lock_desc *new, void *data, int flag);
int mdc_ldlm_glimpse_ast(struct ldlm_lock *dlmlock, void *data);
int mdc_fill_lvb(struct ptlrpc_request *req, struct ost_lvb *lvb);
+
+#define MDC_DOM_DEF_INLINE_REPSIZE 8192
+#define MDC_DOM_MAX_INLINE_REPSIZE XATTR_SIZE_MAX
+
#endif
int count = 0;
enum ldlm_mode mode;
int rc;
+ int repsize;
+
ENTRY;
it->it_create_mode = (it->it_create_mode & ~S_IFMT) | S_IFREG;
req_capsule_set_size(&req->rq_pill, &RMF_MDT_MD, RCL_SERVER,
obddev->u.cli.cl_max_mds_easize);
req_capsule_set_size(&req->rq_pill, &RMF_ACL, RCL_SERVER, acl_bufsize);
- ptlrpc_request_set_replen(req);
- return req;
+
+ /**
+ * Inline buffer for possible data from Data-on-MDT files.
+ */
+ req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE, RCL_SERVER,
+ sizeof(struct niobuf_remote));
+ ptlrpc_request_set_replen(req);
+
+ /* Get real repbuf allocated size as rounded up power of 2 */
+ repsize = size_roundup_power2(req->rq_replen +
+ lustre_msg_early_size());
+
+ /* Estimate free space for DoM files in repbuf */
+ repsize -= req->rq_replen - obddev->u.cli.cl_max_mds_easize +
+ sizeof(struct lov_comp_md_v1) +
+ sizeof(struct lov_comp_md_entry_v1) +
+ lov_mds_md_size(0, LOV_MAGIC_V3);
+
+ if (repsize < obddev->u.cli.cl_dom_min_inline_repsize) {
+ repsize = obddev->u.cli.cl_dom_min_inline_repsize - repsize;
+ req_capsule_set_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
+ RCL_SERVER,
+ sizeof(struct niobuf_remote) + repsize);
+ ptlrpc_request_set_replen(req);
+ CDEBUG(D_INFO, "Increase repbuf by %d bytes, total: %d\n",
+ repsize, req->rq_replen);
+ }
+ return req;
}
#define GA_DEFAULT_EA_NAME_LEN 20
if (rc)
GOTO(err_osc_cleanup, rc);
+ obd->u.cli.cl_dom_min_inline_repsize = MDC_DOM_DEF_INLINE_REPSIZE;
+
ns_register_cancel(obd->obd_namespace, mdc_cancel_weight);
obd->obd_namespace->ns_lvbo = &inode_lvbo;
* Pack size attributes into the reply.
*/
int mdt_pack_size2body(struct mdt_thread_info *info,
- const struct lu_fid *fid, bool dom_lock)
+ const struct lu_fid *fid, struct lustre_handle *lh)
{
struct mdt_body *b;
struct md_attr *ma = &info->mti_attr;
int dom_stripe;
+ bool dom_lock = false;
ENTRY;
if (dom_stripe == LMM_NO_DOM)
RETURN(-ENOENT);
+ if (lustre_handle_is_used(lh)) {
+ struct ldlm_lock *lock;
+
+ lock = ldlm_handle2lock(lh);
+ if (lock != NULL) {
+ dom_lock = ldlm_has_dom(lock);
+ LDLM_LOCK_PUT(lock);
+ }
+ }
+
/* no DoM lock, no size in reply */
if (!dom_lock)
RETURN(0);
* mdt_object_put(), that is why this speacial
* exit path is used. */
rc = mdt_pack_size2body(info, child_fid,
- child_bits & MDS_INODELOCK_DOM);
+ &lhc->mlh_reg_lh);
if (rc != 0 && child_bits & MDS_INODELOCK_DOM) {
/* DOM lock was taken in advance but this is
* not DoM file. Drop the lock. */
out_ucred:
mdt_exit_ucred(info);
out_shrink:
- mdt_client_compatibility(info);
- rc2 = mdt_fix_reply(info);
- if (rc == 0)
- rc = rc2;
- return rc;
+ mdt_client_compatibility(info);
+
+ rc2 = mdt_fix_reply(info);
+ if (rc == 0)
+ rc = rc2;
+
+ /*
+ * Data-on-MDT optimization - read data along with OPEN and return it
+ * in reply. Do that only if we have both DOM and LAYOUT locks.
+ */
+ if (rc == 0 && op == REINT_OPEN &&
+ info->mti_attr.ma_lmm != NULL &&
+ mdt_lmm_dom_entry(info->mti_attr.ma_lmm) == LMM_DOM_ONLY) {
+ rc = mdt_dom_read_on_open(info, info->mti_mdt,
+ &lhc->mlh_reg_lh);
+ }
+
+ return rc;
}
static long mdt_reint_opcode(struct ptlrpc_request *req,
}
/* DoM files get IO lock at open by default */
- m->mdt_opts.mo_dom_lock = 1;
+ m->mdt_opts.mo_dom_lock = ALWAYS_DOM_LOCK_ON_OPEN;
+ /* DoM files are read at open and data is packed in the reply */
+ m->mdt_opts.mo_dom_read_open = 1;
m->mdt_squash.rsi_uid = 0;
m->mdt_squash.rsi_gid = 0;
#define MDT_FL_CFGLOG 0
#define MDT_FL_SYNCED 1
+/* possible values for mo_dom_lock */
+enum {
+ /* never take the DoM lock on open */
+ NO_DOM_LOCK_ON_OPEN = 0,
+ /* take the DoM lock on open only if it is non-blocked */
+ TRYLOCK_DOM_ON_OPEN = 1,
+ /* always take the DoM lock on open, even if it is a blocking lock */
+ ALWAYS_DOM_LOCK_ON_OPEN = 2,
+ /* number of modes above; not a mode itself */
+ NUM_DOM_LOCK_ON_OPEN_MODES
+};
+
struct mdt_device {
/* super-class */
struct lu_device mdt_lu_dev;
mo_acl:1,
mo_cos:1,
mo_evict_tgt_nids:1,
- mo_dom_lock:1;
+ mo_dom_read_open:1;
+ unsigned int mo_dom_lock;
} mdt_opts;
/* mdt state flags */
unsigned long mdt_state;
void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
const struct lu_attr *attr, const struct lu_fid *fid);
int mdt_pack_size2body(struct mdt_thread_info *info,
- const struct lu_fid *fid, bool dom_lock);
+ const struct lu_fid *fid, struct lustre_handle *lh);
int mdt_getxattr(struct mdt_thread_info *info);
int mdt_reint_setxattr(struct mdt_thread_info *info,
struct mdt_lock_handle *lh);
struct ldlm_lock **lockp, __u64 flags);
int mdt_brw_enqueue(struct mdt_thread_info *info, struct ldlm_namespace *ns,
struct ldlm_lock **lockp, __u64 flags);
+int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt,
+ struct lustre_handle *lh);
void mdt_dom_discard_data(struct mdt_thread_info *info,
const struct lu_fid *fid);
int mdt_dom_disk_lvbo_update(const struct lu_env *env, struct mdt_object *mo,
repbody->mbo_flags = OBD_FL_FLUSH;
RETURN(rc);
}
+
+/**
+ * Read Data-on-MDT file data into the open reply.
+ *
+ * The data goes into the RMF_NIOBUF_INLINE reply field as a
+ * struct niobuf_remote header followed by the bytes read. Nothing is
+ * packed when the client reserved no such field, when \a lh is not a lock
+ * with both DoM and LAYOUT bits, when mo_dom_read_open is off, or when the
+ * reply body carries no DoM size or a zero one.
+ *
+ * This is a best-effort optimization: every path returns 0, and once the
+ * header exists its rnb_len is set to the number of bytes really copied.
+ *
+ * \param[in] mti	thread info of the request being handled
+ * \param[in] mdt	MDT device
+ * \param[in] lh	handle of the lock granted at open
+ *
+ * \retval 0 always
+ */
+int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt,
+			 struct lustre_handle *lh)
+{
+	const struct lu_env *env = mti->mti_env;
+	struct tgt_session_info *tsi = tgt_ses_info(env);
+	struct req_capsule *pill = tsi->tsi_pill;
+	const struct lu_fid *fid;
+	struct ptlrpc_request *req = tgt_ses_req(tsi);
+	struct mdt_body *mbo;
+	struct dt_device *dt = mdt->mdt_bottom;
+	struct dt_object *mo;
+	void *buf;
+	struct niobuf_remote *rnb = NULL;
+	struct niobuf_local *lnb;
+	int rc;
+	int max_reply_len;
+	loff_t offset;
+	unsigned int len, copied = 0;
+	int lnbs, nr_local, i;
+	bool dom_lock = false;
+
+	ENTRY;
+
+	if (!req_capsule_field_present(pill, &RMF_NIOBUF_INLINE, RCL_SERVER)) {
+		/* There is no reply buffer for this field, which means the
+		 * client has no support for data in reply.
+		 */
+		RETURN(0);
+	}
+
+	mbo = req_capsule_server_get(pill, &RMF_MDT_BODY);
+
+	if (lustre_handle_is_used(lh)) {
+		struct ldlm_lock *lock;
+
+		lock = ldlm_handle2lock(lh);
+		if (lock) {
+			dom_lock = ldlm_has_dom(lock) && ldlm_has_layout(lock);
+			LDLM_LOCK_PUT(lock);
+		}
+	}
+
+	/* return data along with open only along with DoM lock */
+	if (!dom_lock || !mdt->mdt_opts.mo_dom_read_open)
+		RETURN(0);
+
+	if (!(mbo->mbo_valid & OBD_MD_DOM_SIZE))
+		RETURN(0);
+
+	if (mbo->mbo_dom_size == 0)
+		RETURN(0);
+
+	/* check the maximum size available in reply */
+	max_reply_len =
+		req->rq_rqbd->rqbd_svcpt->scp_service->srv_max_reply_size;
+
+	CDEBUG(D_INFO, "File size %llu, reply sizes %d/%d/%d\n",
+	       mbo->mbo_dom_size, max_reply_len, req->rq_reqmsg->lm_repsize,
+	       req->rq_replen);
+	/* len is unsigned: do not let it wrap around when the client buffer
+	 * is not larger than what the reply already uses
+	 */
+	if (req->rq_reqmsg->lm_repsize > req->rq_replen)
+		len = req->rq_reqmsg->lm_repsize - req->rq_replen;
+	else
+		len = 0;
+	max_reply_len -= req->rq_replen;
+
+	/* NB: at this moment we have the following sizes:
+	 * - req->rq_replen: used data in reply
+	 * - req->rq_reqmsg->lm_repsize: total allocated reply buffer at client
+	 * - max_reply_len: maximum reply size allowed by protocol
+	 *
+	 * Ideal case when file size fits in allocated reply buffer,
+	 * that means we can return whole data in reply. We can also fit more
+	 * data up to max_reply_size in total reply size, but this will cause
+	 * re-allocation on client and resend with larger buffer. This is still
+	 * faster than separate READ IO.
+	 * Third case if file is too big to fit even in maximum size, in that
+	 * case we return just tail to optimize possible append.
+	 *
+	 * At the moment the following strategy is used:
+	 * 1) try to fit into the buffer we have
+	 * 2) respond with bigger buffer so client will re-allocate it and
+	 *    resend (up to srv_max_reply_size value).
+	 * 3) return just file tail otherwise.
+	 */
+	if (mbo->mbo_dom_size <= len) {
+		/* can fit whole data */
+		len = mbo->mbo_dom_size;
+		offset = 0;
+	} else if (mbo->mbo_dom_size <= max_reply_len) {
+		/* It is worth to make this tunable ON/OFF because this will
+		 * cause buffer re-allocation and resend
+		 */
+		len = mbo->mbo_dom_size;
+		offset = 0;
+	} else {
+		int tail = mbo->mbo_dom_size % PAGE_SIZE;
+
+		/* no tail or tail can't fit in reply */
+		if (tail == 0 || len < tail)
+			RETURN(0);
+
+		len = tail;
+		offset = mbo->mbo_dom_size - len;
+	}
+	LASSERT((offset % PAGE_SIZE) == 0);
+
+	/* Nothing can be read without a sane FID; check it before the reply
+	 * is grown so that the reply is not enlarged for no data.
+	 */
+	if (!fid_is_sane(&mbo->mbo_fid1))
+		RETURN(0);
+
+	rc = req_capsule_server_grow(pill, &RMF_NIOBUF_INLINE,
+				     sizeof(*rnb) + len);
+	if (rc != 0) {
+		/* failed to grow data buffer, just exit */
+		GOTO(out, rc = -E2BIG);
+	}
+
+	/* re-take MDT_BODY buffer after the buffer growing above */
+	mbo = req_capsule_server_get(pill, &RMF_MDT_BODY);
+	fid = &mbo->mbo_fid1;
+
+	rnb = req_capsule_server_get(tsi->tsi_pill, &RMF_NIOBUF_INLINE);
+	if (rnb == NULL)
+		GOTO(out, rc = -EPROTO);
+	/* file data is placed right after the niobuf_remote header */
+	buf = (char *)rnb + sizeof(*rnb);
+	rnb->rnb_len = len;
+	rnb->rnb_offset = offset;
+
+	mo = dt_locate(env, dt, fid);
+	if (IS_ERR(mo))
+		GOTO(out, rc = PTR_ERR(mo));
+	LASSERT(mo != NULL);
+
+	dt_read_lock(env, mo, 0);
+	if (!dt_object_exists(mo))
+		GOTO(unlock, rc = -ENOENT);
+
+	/* parse remote buffers to local buffers and prepare the latter */
+	lnbs = (len >> PAGE_SHIFT) + 1;
+	OBD_ALLOC(lnb, sizeof(*lnb) * lnbs);
+	if (lnb == NULL)
+		GOTO(unlock, rc = -ENOMEM);
+
+	rc = dt_bufs_get(env, mo, rnb, lnb, 0);
+	if (unlikely(rc < 0))
+		GOTO(free, rc);
+	LASSERT(rc <= lnbs);
+	nr_local = rc;
+	rc = dt_read_prep(env, mo, lnb, nr_local);
+	if (unlikely(rc))
+		GOTO(buf_put, rc);
+	/* copy data to the buffer finally */
+	for (i = 0; i < nr_local; i++) {
+		char *p = kmap(lnb[i].lnb_page);
+		long off;
+
+		LASSERT(lnb[i].lnb_page_offset == 0);
+		off = lnb[i].lnb_len & ~PAGE_MASK;
+		if (off > 0)
+			memset(p + off, 0, PAGE_SIZE - off);
+
+		/* the reply buffer was grown for len bytes only */
+		LASSERT(copied + lnb[i].lnb_len <= len);
+		memcpy(buf + (i << PAGE_SHIFT), p, lnb[i].lnb_len);
+		kunmap(lnb[i].lnb_page);
+		copied += lnb[i].lnb_len;
+	}
+	CDEBUG(D_INFO, "Read %i (wanted %u) bytes from %llu\n", copied,
+	       len, offset);
+	if (copied < len)
+		CWARN("%s: read %i bytes for "DFID
+		      " but wanted %u, is size wrong?\n",
+		      tsi->tsi_exp->exp_obd->obd_name, copied,
+		      PFID(&tsi->tsi_fid), len);
+	EXIT;
+buf_put:
+	dt_bufs_put(env, mo, lnb, nr_local);
+free:
+	OBD_FREE(lnb, sizeof(*lnb) * lnbs);
+unlock:
+	dt_read_unlock(env, mo);
+	lu_object_put(env, &mo->do_lu);
+out:
+	/* report what was really copied; rc is deliberately not returned */
+	if (rnb != NULL)
+		rnb->rnb_len = copied;
+	RETURN(0);
+}
+
}
LPROC_SEQ_FOPS(mdt_sync_count);
+/* Names of the mo_dom_lock modes, indexed by mode value. The table is
+ * never modified, hence const at both levels.
+ */
+static const char *const dom_open_lock_modes[NUM_DOM_LOCK_ON_OPEN_MODES] = {
+	[NO_DOM_LOCK_ON_OPEN] = "never",
+	[TRYLOCK_DOM_ON_OPEN] = "trylock",
+	[ALWAYS_DOM_LOCK_ON_OPEN] = "always",
+};
+
+/* This must be longer than the longest string above */
+#define DOM_LOCK_MODES_MAXLEN 16
+
+/**
+ * Show MDT policy for taking the DoM lock on open of DoM files.
+ *
+ * \param[in] m seq_file handle
+ * \param[in] data unused
+ *
+ * \retval 0 on success
+ * \retval negative value on error
+ */
static int mdt_dom_lock_seq_show(struct seq_file *m, void *data)
{
struct obd_device *obd = m->private;
struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
- seq_printf(m, "%u\n", (mdt->mdt_opts.mo_dom_lock != 0));
+ seq_printf(m, "%s\n", dom_open_lock_modes[mdt->mdt_opts.mo_dom_lock]);
return 0;
}
+/**
+ * Change MDT policy for taking the DoM lock on open of DoM files.
+ *
+ * This variable defines how DOM lock is taken at open enqueue.
+ * There are three possible modes:
+ * 1) never - never take DoM lock on open. DoM lock will be taken as separate
+ * IO lock with own enqueue.
+ * 2) trylock - DoM lock will be taken only if non-blocked.
+ * 3) always - DoM lock will be taken always even if it is blocking lock.
+ *
+ * If dom_read_open is enabled too then DoM lock is taken in PR mode and
+ * is paired with LAYOUT lock when possible.
+ *
+ * \param[in] file proc file
+ * \param[in] buffer string which represents policy
+ * \param[in] count \a buffer length
+ * \param[in] off unused for single entry
+ *
+ * \retval \a count on success
+ * \retval negative number on error
+ */
static ssize_t
mdt_dom_lock_seq_write(struct file *file, const char __user *buffer,
size_t count, loff_t *off)
struct seq_file *m = file->private_data;
struct obd_device *obd = m->private;
struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+ char kernbuf[DOM_LOCK_MODES_MAXLEN];
+ int val = -1;
+ int i, rc;
+
+ if (count == 0 || count >= sizeof(kernbuf))
+ return -EINVAL;
+
+ if (copy_from_user(kernbuf, buffer, count))
+ return -EFAULT;
+
+ kernbuf[count] = 0;
+ if (kernbuf[count - 1] == '\n')
+ kernbuf[count - 1] = 0;
+
+ for (i = 0 ; i < NUM_DOM_LOCK_ON_OPEN_MODES; i++) {
+ if (strcmp(kernbuf, dom_open_lock_modes[i]) == 0) {
+ val = i;
+ break;
+ }
+ }
+
+ /* Legacy numeric codes */
+ if (val == -1) {
+ rc = kstrtoint_from_user(buffer, count, 0, &val);
+ if (rc)
+ return rc;
+ }
+
+ if (val < 0 || val >= NUM_DOM_LOCK_ON_OPEN_MODES)
+ return -EINVAL;
+
+ mdt->mdt_opts.mo_dom_lock = val;
+ return count;
+}
+LPROC_SEQ_FOPS(mdt_dom_lock);
+
+/**
+ * Show whether the MDT reads DoM file data on open (mo_dom_read_open).
+ *
+ * Prints 1 when the option is set and 0 otherwise.
+ *
+ * \param[in] m		seq_file handle
+ * \param[in] data	unused
+ *
+ * \retval		0 always
+ */
+static int mdt_dom_read_open_seq_show(struct seq_file *m, void *data)
+{
+	struct obd_device *obd = m->private;
+	struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
+	unsigned int enabled = !!mdt->mdt_opts.mo_dom_read_open;
+
+	seq_printf(m, "%u\n", enabled);
+
+	return 0;
+}
+
+/**
+ * Modify MDT policy for data prefetch on open for DoM files.
+ *
+ * If enabled then Data-on-MDT file data may be read during open and
+ * returned back in reply. It works only with mo_dom_lock enabled.
+ *
+ * \param[in] file proc file
+ * \param[in] buffer string which represents policy
+ * \param[in] count \a buffer length
+ * \param[in] off unused for single entry
+ *
+ * \retval \a count on success
+ * \retval negative number on error
+ */
+static ssize_t
+mdt_dom_read_open_seq_write(struct file *file, const char __user *buffer,
+ size_t count, loff_t *off)
+{
+ struct seq_file *m = file->private_data;
+ struct obd_device *obd = m->private;
+ struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
bool val;
int rc;
if (rc)
return rc;
- mdt->mdt_opts.mo_dom_lock = val;
+ mdt->mdt_opts.mo_dom_read_open = !!val;
return count;
}
-LPROC_SEQ_FOPS(mdt_dom_lock);
+LPROC_SEQ_FOPS(mdt_dom_read_open);
LPROC_SEQ_FOPS_RO_TYPE(mdt, recovery_status);
LPROC_SEQ_FOPS_RO_TYPE(mdt, num_exports);
.fops = &mdt_sync_count_fops },
{ .name = "dom_lock",
.fops = &mdt_dom_lock_fops },
+ { .name = "dom_read_open",
+ .fops = &mdt_dom_read_open_fops },
{ NULL }
};
dom_stripes = mdt_lmm_dom_entry(ma->ma_lmm);
if (dom_stripes == LMM_DOM_ONLY &&
- info->mti_mdt->mdt_opts.mo_dom_lock != 0 &&
+ info->mti_mdt->mdt_opts.mo_dom_lock > 0 &&
!mdt_dom_client_has_lock(info, mdt_object_fid(obj)))
dom_lock = true;
}
lhc = &info->mti_lh[MDT_LH_LOCAL];
} else if (dom_lock) {
lm = (open_flags & MDS_FMODE_WRITE) ? LCK_PW : LCK_PR;
- *ibits = MDS_INODELOCK_DOM;
+ if (info->mti_mdt->mdt_opts.mo_dom_lock ==
+ TRYLOCK_DOM_ON_OPEN) {
+ trybits |= MDS_INODELOCK_DOM |
+ MDS_INODELOCK_LAYOUT;
+ } else {
+ /* mo_dom_lock == ALWAYS_DOM_LOCK_ON_OPEN */
+ *ibits = MDS_INODELOCK_DOM;
+ if (info->mti_mdt->mdt_opts.mo_dom_read_open) {
+ trybits |= MDS_INODELOCK_LAYOUT;
+ }
+ }
}
CDEBUG(D_INODE, "normal open:"DFID" lease count: %d, lm: %d\n",
out:
mdt_object_put(env, o);
if (rc == 0)
- mdt_pack_size2body(info, rr->rr_fid2,
- ibits & MDS_INODELOCK_DOM);
+ mdt_pack_size2body(info, rr->rr_fid2, &lhc->mlh_reg_lh);
out_parent_put:
if (parent != NULL)
mdt_object_put(env, parent);
out_child:
mdt_object_put(info->mti_env, child);
if (result == 0)
- mdt_pack_size2body(info, child_fid, ibits & MDS_INODELOCK_DOM);
+ mdt_pack_size2body(info, child_fid, &lhc->mlh_reg_lh);
out_parent:
mdt_object_unlock_put(info, parent, lh, result || !created);
out:
&RMF_LAYOUT_INTENT,
&RMF_EADATA /* for new layout to be set up */
};
+
static const struct req_msg_field *ldlm_intent_open_server[] = {
- &RMF_PTLRPC_BODY,
- &RMF_DLM_REP,
- &RMF_MDT_BODY,
- &RMF_MDT_MD,
- &RMF_ACL,
- &RMF_CAPA1,
- &RMF_CAPA2
+ &RMF_PTLRPC_BODY,
+ &RMF_DLM_REP,
+ &RMF_MDT_BODY,
+ &RMF_MDT_MD,
+ &RMF_ACL,
+ &RMF_CAPA1,
+ &RMF_CAPA2,
+ &RMF_NIOBUF_INLINE,
};
static const struct req_msg_field *ldlm_intent_getattr_client[] = {
dump_rniobuf);
EXPORT_SYMBOL(RMF_NIOBUF_REMOTE);
+struct req_msg_field RMF_NIOBUF_INLINE =
+ DEFINE_MSGF("niobuf_inline", RMF_F_NO_SIZE_CHECK,
+ sizeof(struct niobuf_remote), lustre_swab_niobuf_remote,
+ dump_rniobuf);
+EXPORT_SYMBOL(RMF_NIOBUF_INLINE);
+
struct req_msg_field RMF_RCS =
- DEFINE_MSGF("niobuf_remote", RMF_F_STRUCT_ARRAY, sizeof(__u32),
- lustre_swab_generic_32s, dump_rcs);
+ DEFINE_MSGF("niobuf_rcs", RMF_F_STRUCT_ARRAY, sizeof(__u32),
+ lustre_swab_generic_32s, dump_rcs);
EXPORT_SYMBOL(RMF_RCS);
struct req_msg_field RMF_EAVALS_LENS =
spin_lock(&request->rq_lock);
request->rq_err = 1;
spin_unlock(&request->rq_lock);
- request->rq_status = rc;
- GOTO(cleanup_bulk, rc);
- }
+ request->rq_status = rc;
+ GOTO(cleanup_bulk, rc);
+ }
+ /* Use real allocated value in lm_repsize,
+ * so the server may use whole reply buffer
+ * without resends where it is needed.
+ */
+ request->rq_reqmsg->lm_repsize = request->rq_repbuf_len;
} else {
request->rq_repdata = NULL;
request->rq_repmsg = NULL;
}
run_test 271c "DoM: IO lock at open saves enqueue RPCs"
+# Reset the EXIT trap and remove the temporary file given as $1.
+cleanup_271def_tests() {
+	trap 0
+	# quoted so that an empty or space-containing path stays one argument
+	rm -f "$1"
+}
+
+# DoM read on open with a 1000-byte file: neither appending to the file
+# nor reading it back after open may show an ost_read entry in the mdc
+# stats, and req_active must equal req_waittime (the test reports any
+# difference as resends).
+test_271d() {
+	[ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.10.57) ] &&
+		skip "Need MDS version at least 2.10.57" && return
+
+	local dom=$DIR/$tdir/dom
+	local tmp=$TMP/$tfile
+	local num
+	local ra
+	local rw
+
+	trap "cleanup_271def_tests $tmp" EXIT
+
+	mkdir -p $DIR/$tdir
+
+	$LFS setstripe -E 1024K -L mdt $DIR/$tdir
+
+	cancel_lru_locks mdc
+	dd if=/dev/urandom of=$tmp bs=1000 count=1
+	dd if=$tmp of=$dom bs=1000 count=1
+	cancel_lru_locks mdc
+
+	cat /etc/hosts >> $tmp
+	lctl set_param -n mdc.*.stats=clear
+
+	# append data to the same file it should update local page
+	echo "Append to the same page"
+	cat /etc/hosts >> $dom
+	num=$(lctl get_param -n mdc.*.stats | awk '/ost_read/ {print $2}')
+	ra=$(lctl get_param -n mdc.*.stats | awk '/req_active/ {print $2}')
+	rw=$(lctl get_param -n mdc.*.stats | awk '/req_waittime/ {print $2}')
+
+	# quoted: the values may be empty or span several lines
+	[ -z "$num" ] || error "$num READ RPC occured"
+	[ "$ra" == "$rw" ] || error "$((ra - rw)) resend occured"
+	echo "... DONE"
+
+	# compare content
+	cmp $tmp $dom || error "file miscompare"
+
+	cancel_lru_locks mdc
+	lctl set_param -n mdc.*.stats=clear
+
+	echo "Open and read file"
+	cat $dom > /dev/null
+	num=$(lctl get_param -n mdc.*.stats | awk '/ost_read/ {print $2}')
+	ra=$(lctl get_param -n mdc.*.stats | awk '/req_active/ {print $2}')
+	rw=$(lctl get_param -n mdc.*.stats | awk '/req_waittime/ {print $2}')
+
+	[ -z "$num" ] || error "$num READ RPC occured"
+	[ "$ra" == "$rw" ] || error "$((ra - rw)) resend occured"
+	echo "... DONE"
+
+	# compare content
+	cmp $tmp $dom || error "file miscompare"
+
+	return 0
+}
+run_test 271d "DoM: read on open (1K file in reply buffer)"
+
+# DoM read on open with a 30K file: neither appending to the file nor
+# reading it back after open may show an ost_read entry in the mdc stats.
+# The difference between req_active and req_waittime is only reported,
+# because the reply buffer can be adjusted by a resend.
+test_271e() {
+	[ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.10.57) ] &&
+		skip "Need MDS version at least 2.10.57" && return
+
+	local dom=$DIR/$tdir/dom
+	local tmp=$TMP/${tfile}.data
+	local num
+	local ra
+	local rw
+
+	trap "cleanup_271def_tests $tmp" EXIT
+
+	mkdir -p $DIR/$tdir
+
+	$LFS setstripe -E 1024K -L mdt $DIR/$tdir
+
+	cancel_lru_locks mdc
+	dd if=/dev/urandom of=$tmp bs=30K count=1
+	dd if=$tmp of=$dom bs=30K count=1
+	cancel_lru_locks mdc
+	cat /etc/hosts >> $tmp
+	lctl set_param -n mdc.*.stats=clear
+
+	echo "Append to the same page"
+	cat /etc/hosts >> $dom
+
+	num=$(lctl get_param -n mdc.*.stats | awk '/ost_read/ {print $2}')
+	ra=$(lctl get_param -n mdc.*.stats | awk '/req_active/ {print $2}')
+	rw=$(lctl get_param -n mdc.*.stats | awk '/req_waittime/ {print $2}')
+
+	# quoted: the value may be empty or span several lines
+	[ -z "$num" ] || error "$num READ RPC occured"
+	# Reply buffer can be adjusted for larger buffer by resend
+	echo "... DONE with $((ra - rw)) resends"
+
+	# compare content
+	cmp $tmp $dom || error "file miscompare"
+
+	cancel_lru_locks mdc
+	lctl set_param -n mdc.*.stats=clear
+
+	echo "Open and read file"
+	cat $dom > /dev/null
+	num=$(lctl get_param -n mdc.*.stats | awk '/ost_read/ {print $2}')
+	ra=$(lctl get_param -n mdc.*.stats | awk '/req_active/ {print $2}')
+	rw=$(lctl get_param -n mdc.*.stats | awk '/req_waittime/ {print $2}')
+
+	[ -z "$num" ] || error "$num READ RPC occured"
+	# Reply buffer can be adjusted for larger buffer by resend
+	echo "... DONE with $((ra - rw)) resends"
+
+	# compare content
+	cmp $tmp $dom || error "file miscompare"
+
+	return 0
+}
+run_test 271e "DoM: read on open (30K file with reply buffer adjusting)"
+
+# DoM read on open with a 200000-byte file: appending must show no
+# ost_read entry in the mdc stats, while reading the file back after open
+# is expected to show exactly one. In both phases req_active must equal
+# req_waittime (the test reports any difference as resends).
+test_271f() {
+	[ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.10.57) ] &&
+		skip "Need MDS version at least 2.10.57" && return
+
+	local dom=$DIR/$tdir/dom
+	local tmp=$TMP/$tfile
+	local num
+	local ra
+	local rw
+
+	trap "cleanup_271def_tests $tmp" EXIT
+
+	mkdir -p $DIR/$tdir
+
+	$LFS setstripe -E 1024K -L mdt $DIR/$tdir
+
+	cancel_lru_locks mdc
+	dd if=/dev/urandom of=$tmp bs=200000 count=1
+	dd if=$tmp of=$dom bs=200000 count=1
+	cancel_lru_locks mdc
+	cat /etc/hosts >> $tmp
+	lctl set_param -n mdc.*.stats=clear
+
+	echo "Append to the same page"
+	cat /etc/hosts >> $dom
+	num=$(lctl get_param -n mdc.*.stats | awk '/ost_read/ {print $2}')
+	ra=$(lctl get_param -n mdc.*.stats | awk '/req_active/ {print $2}')
+	rw=$(lctl get_param -n mdc.*.stats | awk '/req_waittime/ {print $2}')
+
+	# quoted: the values may be empty or span several lines
+	[ -z "$num" ] || error "$num READ RPC occured"
+	[ "$ra" == "$rw" ] || error "$((ra - rw)) resend occured"
+	echo "... DONE"
+
+	# compare content
+	cmp $tmp $dom || error "file miscompare"
+
+	cancel_lru_locks mdc
+	lctl set_param -n mdc.*.stats=clear
+
+	echo "Open and read file"
+	cat $dom > /dev/null
+	num=$(lctl get_param -n mdc.*.stats | awk '/ost_read/ {print $2}')
+	ra=$(lctl get_param -n mdc.*.stats | awk '/req_active/ {print $2}')
+	rw=$(lctl get_param -n mdc.*.stats | awk '/req_waittime/ {print $2}')
+
+	[ "$num" -eq 1 ] || error "expect 1 READ RPC, $num occured"
+	[ "$ra" == "$rw" ] || error "$((ra - rw)) resend occured"
+	echo "... DONE"
+
+	# compare content
+	cmp $tmp $dom || error "file miscompare"
+
+	return 0
+}
+run_test 271f "DoM: read on open (200K file and read tail)"
+
test_275() {
remote_ost_nodsh && skip "remote OST with nodsh"
[ $(lustre_version_code ost1) -lt $(version_code 2.10.57) ] &&