/** requested attributes */
__u32 rp_attrs;
/** pointers to pages */
- struct page **rp_pages;
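+ /* rp_npages != 0 selects rp_pages[]; otherwise rp_data points
+ * at an inline reply buffer, see rdpg_page_get()
+ */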
+ union {
+ struct page **rp_pages;
+ void *rp_data;
+ };
};
+/* for dt_index_walk / mdd_readpage */
+void *rdpg_page_get(const struct lu_rdpg *rdpg, unsigned int index);
+void rdpg_page_put(const struct lu_rdpg *rdpg, unsigned int index);
+
enum lu_xattr_flags {
LU_XATTR_REPLACE = BIT(0),
LU_XATTR_CREATE = BIT(1),
!!(lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_DOM);
}
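+/* true if @lock is an inodebits lock covering MDS_INODELOCK_UPDATE */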
+static inline bool ldlm_has_update(struct ldlm_lock *lock)
+{
+ return lock->l_resource->lr_type == LDLM_IBITS &&
+ lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_UPDATE;
+}
+
static inline char *
ldlm_ns_name(struct ldlm_namespace *ns)
{
return (exp_connect_flags2(exp) & OBD_CONNECT2_BATCH_RPC);
}
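+/* true if the server can pack the first directory page into the open reply */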
+static inline int exp_connect_open_readdir(struct obd_export *exp)
+{
+ return !!(exp_connect_flags2(exp) & OBD_CONNECT2_READDIR_OPEN);
+}
+
enum {
/* archive_ids in array format */
KKUC_CT_DATA_ARRAY_MAGIC = 0x092013cea,
*flags &= ~O_LOV_DELAY_CREATE;
}
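+/* map a directory hash to a page cache index; on 32-bit clients a 64-bit hash keeps only its top 32 bits */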
+static inline unsigned long hash_x_index(__u64 hash, int hash64)
+{
+ if (BITS_PER_LONG == 32 && hash64)
+ hash >>= 32;
+ /* save hash 0 with hash 1 */
+ return ~0UL - (hash + !hash);
+}
+
/** @} mdc */
#endif
*
* - When reply is received from the server (osc_enqueue_interpret())
* - ldlm_cli_enqueue_fini()
- * - LDLM_LOCK_PUT(): releases caller reference acquired by
+ * - ldlm_lock_put(): releases caller reference acquired by
* ldlm_lock_new().
* - if (rc != 0)
* ldlm_lock_decref(): error case: matches ldlm_cli_enqueue().
*
* - When lock is being cancelled (ldlm_lock_cancel())
* - ldlm_lock_destroy()
- * - LDLM_LOCK_PUT(): releases hash-table reference acquired by
+ * - ldlm_lock_put(): releases hash-table reference acquired by
* ldlm_lock_new().
*
* osc_lock is detached from ldlm_lock by osc_lock_detach() that is called
CLI_MIGRATE = BIT(4),
CLI_DIRTY_DATA = BIT(5),
CLI_NO_SLOT = BIT(6),
+ /** read on open (currently used only for directories) */
+ CLI_READ_ON_OPEN = BIT(7),
};
enum md_op_code {
#define OBD_CONNECT2_SPARSE 0x1000000000ULL /* sparse LNet read */
#define OBD_CONNECT2_MIRROR_ID_FIX 0x2000000000ULL /* rr_mirror_id move */
#define OBD_CONNECT2_UPDATE_LAYOUT 0x4000000000ULL /* update compressibility */
+#define OBD_CONNECT2_READDIR_OPEN 0x8000000000ULL /* read first dir page on open */
+
/* XXX README XXX README XXX README XXX README XXX README XXX README XXX
* Please DO NOT add OBD_CONNECT flags before first ensuring that this value
* is not in use by some other branch/patch. Email adilger@whamcloud.com
* * %Failure - Error pointer (pointed by rc)
*/
struct page *ll_get_dir_page(struct inode *dir, struct md_op_data *op_data,
- __u64 offset, int *partial_readdir_rc)
+ __u64 offset, bool hash64, int *partial_readdir_rc)
{
struct md_readdir_info mrinfo = {
.mr_blocking_ast = ll_md_blocking_ast };
struct page *page;
+ unsigned long idx = hash_x_index(offset, hash64);
int rc;
+ /* the page may already be cached (e.g. populated at open), use it if uptodate */
+ page = find_get_page(dir->i_mapping, idx);
+ if (page) {
+ wait_on_page_locked(page);
+ if (PageUptodate(page))
+ RETURN(page);
+ put_page(page);
+ }
+
rc = md_read_page(ll_i2mdexp(dir), op_data, &mrinfo, offset, &page);
if (rc != 0)
return ERR_PTR(rc);
RETURN(page);
}
- page = ll_get_dir_page(inode, op_data, pos, partial_readdir_rc);
+ page = ll_get_dir_page(inode, op_data, pos, is_hash64,
+ partial_readdir_rc);
while (rc == 0 && !done) {
struct lu_dirpage *dp;
LDF_COLLIDE);
next = pos;
page = ll_get_dir_page(inode, op_data, pos,
- partial_readdir_rc);
+ is_hash64, partial_readdir_rc);
}
}
#ifdef HAVE_DIR_CONTEXT
EXIT;
}
+void ll_dir_finish_open(struct inode *inode, struct ptlrpc_request *req)
+{
+ struct obd_export *exp = ll_i2mdexp(inode);
+ void *data;
+ struct page *page;
+ struct lu_dirpage *dp;
+ int is_hash64;
+ int rc;
+ unsigned long offset;
+ __u64 hash;
+ unsigned int i;
+ unsigned int npages;
+
+ ENTRY;
+
+ if (!exp_connect_open_readdir(exp))
+ RETURN_EXIT;
+
+ if (!req_capsule_field_present(&req->rq_pill, &RMF_NIOBUF_INLINE,
+ RCL_SERVER))
+ RETURN_EXIT;
+
+ data = req_capsule_server_get(&req->rq_pill, &RMF_NIOBUF_INLINE);
+ if (data == NULL)
+ RETURN_EXIT;
+
+ npages = req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
+ RCL_SERVER);
+ if (npages < sizeof(*dp))
+ RETURN_EXIT;
+
+ /* convert the reply size in bytes into a number of pages */
+ npages = DIV_ROUND_UP(npages, PAGE_SIZE);
+ is_hash64 = test_bit(LL_SBI_64BIT_HASH, ll_i2sbi(inode)->ll_flags);
+
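+ /* insert the returned pages into the page cache so the first
+ * readdir after open is served without an extra RPC
+ */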
+ for (i = 0; i < npages; i++) {
+ page = page_cache_alloc(inode->i_mapping);
+ if (!page)
+ continue;
+
+ lock_page(page);
+ SetPageUptodate(page);
+
+ dp = kmap_atomic(page);
+ memcpy(dp, data + i * PAGE_SIZE, PAGE_SIZE);
+ hash = le64_to_cpu(dp->ldp_hash_start);
+ kunmap_atomic(dp);
+
+ offset = hash_x_index(hash, is_hash64);
+
+ prefetchw(&page->flags);
+ rc = add_to_page_cache_lru(page, inode->i_mapping, offset,
+ GFP_KERNEL);
+ if (rc == 0)
+ unlock_page(page);
+
+ put_page(page);
+ }
+ EXIT;
+}
+
static int ll_intent_file_open(struct dentry *de, void *lmm, ssize_t lmmsize,
struct lookup_intent *itp)
op_data->op_data = lmm;
op_data->op_data_size = lmmsize;
+ if (!sbi->ll_dir_open_read && S_ISDIR(de->d_inode->i_mode))
+ op_data->op_cli_flags &= ~CLI_READ_ON_OPEN;
+
CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_OPEN_DELAY, cfs_fail_val);
rc = ll_intent_lock(sbi->ll_md_exp, op_data, itp, &req,
ll_set_lock_data(sbi->ll_md_exp, de->d_inode, itp, &bits);
if (bits & MDS_INODELOCK_DOM && bits & MDS_INODELOCK_LAYOUT)
ll_dom_finish_open(de->d_inode, req);
+ if (bits & MDS_INODELOCK_UPDATE && S_ISDIR(de->d_inode->i_mode))
+ ll_dir_finish_open(de->d_inode, req);
}
/* open may not fetch LOOKUP lock, update dir depth and default LMV
* anyway.
ll_checksum_set:1,
ll_inode_cache_enabled:1,
ll_enable_statahead_fname:1,
- ll_intent_mkdir_enabled:1;
+ ll_intent_mkdir_enabled:1,
+ ll_dir_open_read:1;
+
struct lustre_client_ocd ll_lco;
int ll_get_mdt_idx(struct inode *inode);
int ll_get_mdt_idx_by_fid(struct ll_sb_info *sbi, const struct lu_fid *fid);
struct page *ll_get_dir_page(struct inode *dir, struct md_op_data *op_data,
- __u64 offset, int *partial_readdir_rc);
+ __u64 offset, bool hash64,
+ int *partial_readdir_rc);
void ll_release_page(struct inode *inode, struct page *page, bool remove);
int quotactl_ioctl(struct super_block *sb, struct if_quotactl *qctl);
void ll_quota_iter_check_and_cleanup(struct ll_sb_info *sbi, bool check);
void ll_open_cleanup(struct super_block *sb, struct req_capsule *pill);
void ll_dom_finish_open(struct inode *inode, struct ptlrpc_request *req);
+void ll_dir_finish_open(struct inode *inode, struct ptlrpc_request *req);
+
/* Compute expected user md size when passing in a md from user space */
static inline ssize_t ll_lov_user_md_size(const struct lov_user_md *lum)
OBD_CONNECT2_DMV_IMP_INHERIT |
OBD_CONNECT2_UNALIGNED_DIO |
OBD_CONNECT2_PCCRO |
- OBD_CONNECT2_MIRROR_ID_FIX;
+ OBD_CONNECT2_MIRROR_ID_FIX |
+ OBD_CONNECT2_READDIR_OPEN;
#ifdef HAVE_LRU_RESIZE_SUPPORT
if (test_bit(LL_SBI_LRU_RESIZE, sbi->ll_flags))
op_data->op_bias |= MDS_CREATE_VOLATILE;
}
op_data->op_data = data;
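+ /* request read-on-open by default; cleared later if disabled or unsupported */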
+ op_data->op_cli_flags |= CLI_READ_ON_OPEN;
return op_data;
}
}
LUSTRE_RW_ATTR(hybrid_io_read_threshold_bytes);
+static ssize_t dir_read_on_open_show(struct kobject *kobj,
+ struct attribute *attr,
+ char *buf)
+{
+ struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info,
+ ll_kset.kobj);
+
+ return snprintf(buf, PAGE_SIZE, "%u\n", sbi->ll_dir_open_read);
+}
+
+static ssize_t dir_read_on_open_store(struct kobject *kobj,
+ struct attribute *attr,
+ const char *buffer,
+ size_t count)
+{
+ struct ll_sb_info *sbi = container_of(kobj, struct ll_sb_info,
+ ll_kset.kobj);
+ bool val;
+ int rc;
+
+ rc = kstrtobool(buffer, &val);
+ if (rc)
+ return rc;
+
+ sbi->ll_dir_open_read = val;
+
+ return count;
+}
+
+LUSTRE_RW_ATTR(dir_read_on_open);
+
static int ll_unstable_stats_seq_show(struct seq_file *m, void *v)
{
struct super_block *sb = m->private;
&lustre_attr_pcc_async_threshold.attr,
&lustre_attr_pcc_mode.attr,
&lustre_attr_pcc_async_affinity.attr,
+ &lustre_attr_dir_read_on_open.attr,
NULL,
};
ll_set_lock_data(ll_i2sbi(parent)->ll_md_exp, inode, it, &bits);
/* OPEN can return data if lock has DoM+LAYOUT bits set */
- if (it->it_op & IT_OPEN &&
- bits & MDS_INODELOCK_DOM && bits & MDS_INODELOCK_LAYOUT)
- ll_dom_finish_open(inode, request);
+ if (it->it_op & IT_OPEN) {
+ if (bits & MDS_INODELOCK_DOM &&
+ bits & MDS_INODELOCK_LAYOUT)
+ ll_dom_finish_open(inode, request);
+ if (bits & MDS_INODELOCK_UPDATE &&
+ S_ISDIR(inode->i_mode))
+ ll_dir_finish_open(inode, request);
+ }
/* We used to query real size from OSTs here, but actually
* this is not needed. For stat() calls size would be updated
it->it_open_flags |= MDS_OPEN_BY_FID;
}
+ if (!sbi->ll_dir_open_read && it->it_op & IT_OPEN &&
+ it->it_open_flags & O_DIRECTORY)
+ op_data->op_cli_flags &= ~CLI_READ_ON_OPEN;
+
/* enforce umask if acl disabled or MDS doesn't support umask */
if (!IS_POSIXACL(parent) || !exp_connect_umask(ll_i2mdexp(parent)))
it->it_create_mode &= ~current_umask();
struct ll_sb_info *sbi = ll_i2sbi(dir);
struct md_op_data *op_data;
struct page *page = NULL;
+ bool is_hash64 = test_bit(LL_SBI_64BIT_HASH, sbi->ll_flags);
__u64 pos = 0;
int first = 0;
int rc = 0;
break;
}
- page = ll_get_dir_page(dir, op_data, pos, NULL);
+ page = ll_get_dir_page(dir, op_data, pos, is_hash64, NULL);
ll_unlock_md_op_lsm(op_data);
if (IS_ERR(page)) {
rc = PTR_ERR(page);
struct page *page = NULL;
int rc = LS_NOT_FIRST_DE;
__u64 pos = 0;
+ struct ll_sb_info *sbi = ll_i2sbi(dir);
+ bool is_hash64 = test_bit(LL_SBI_64BIT_HASH, sbi->ll_flags);
struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
ENTRY;
*FIXME choose the start offset of the readdir
*/
- page = ll_get_dir_page(dir, op_data, 0, NULL);
+ page = ll_get_dir_page(dir, op_data, 0, is_hash64, NULL);
while (1) {
struct lu_dirpage *dp;
*/
ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
LDF_COLLIDE);
- page = ll_get_dir_page(dir, op_data, pos, NULL);
+ page = ll_get_dir_page(dir, op_data, pos, is_hash64,
+ NULL);
}
}
EXIT;
lvb->lvb_size = body->mbo_dom_size;
}
-static inline unsigned long hash_x_index(__u64 hash, int hash64)
-{
- if (BITS_PER_LONG == 32 && hash64)
- hash >>= 32;
- /* save hash 0 with hash 1 */
- return ~0UL - (hash + !hash);
-}
-
/* mdc_dev.c */
extern struct lu_device_type mdc_device_type;
int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
* Such estimation is safe. Though the final allocated buffer might
* be even larger, it is not possible to know that at this point.
*/
- req->rq_reqmsg->lm_repsize = repsize;
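+ /* lm_repsize == 0 tells the MDS that the client wants no inline reply data */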
+ if ((op_data->op_cli_flags & CLI_READ_ON_OPEN) != 0)
+ req->rq_reqmsg->lm_repsize = repsize;
+ else
+ req->rq_reqmsg->lm_repsize = 0;
RETURN(req);
err_put_sepol:
GOTO(out_unlock, rc);
if (mdd_is_dead_obj(mdd_obj)) {
- struct page *pg;
struct lu_dirpage *dp;
/*
GOTO(out_unlock, rc = -EFAULT);
LASSERT(rdpg->rp_pages != NULL);
- pg = rdpg->rp_pages[0];
- dp = (struct lu_dirpage *)kmap(pg);
+ dp = (struct lu_dirpage *)rdpg_page_get(rdpg, 0);
memset(dp, 0, sizeof(struct lu_dirpage));
dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
dp->ldp_hash_end = cpu_to_le64(MDS_DIR_END_OFF);
dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
- kunmap(pg);
+ rdpg_page_put(rdpg, 0);
GOTO(out_unlock, rc = LU_PAGE_SIZE);
}
if (rc >= 0) {
struct lu_dirpage *dp;
- dp = kmap(rdpg->rp_pages[0]);
+ dp = (struct lu_dirpage *)rdpg_page_get(rdpg, 0);
dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
if (rc == 0) {
/*
dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
}
- kunmap(rdpg->rp_pages[0]);
+ rdpg_page_put(rdpg, 0);
}
GOTO(out_unlock, rc);
}
}
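+/* return 1 if @obj is a striped directory, 0 if plain, negative on error */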
+int mdt_object_striped(struct mdt_thread_info *mti, struct mdt_object *obj)
+{
+ struct lu_device *bottom_dev;
+ struct lu_object *bottom_obj;
+ int rc;
+
+ if (!S_ISDIR(obj->mot_header.loh_attr))
+ return 0;
+
+ /* getxattr from bottom obj to avoid reading in shard FIDs */
+ bottom_dev = dt2lu_dev(mti->mti_mdt->mdt_bottom);
+ bottom_obj = lu_object_find_slice(mti->mti_env, bottom_dev,
+ mdt_object_fid(obj), NULL);
+ if (IS_ERR(bottom_obj))
+ return PTR_ERR(bottom_obj);
+
+ rc = dt_xattr_get(mti->mti_env, lu2dt(bottom_obj), &LU_BUF_NULL,
+ XATTR_NAME_LMV);
+ lu_object_put(mti->mti_env, bottom_obj);
+
+ return (rc > 0) ? 1 : (rc == -ENODATA) ? 0 : rc;
+}
+
+#define DIR_READ_ON_OPEN_PAGES 1
+
+static int mdt_dir_read_on_open(struct mdt_thread_info *info,
+ struct lustre_handle *lhc)
+{
+ const struct lu_env *env = info->mti_env;
+ struct lu_rdpg *rdpg = &info->mti_u.rdpg.mti_rdpg;
+ struct req_capsule *pill = info->mti_pill;
+ int rc;
+ struct mdt_body *mbo;
+ struct mdt_device *mdt = info->mti_mdt;
+ struct mdt_object *o;
+ struct ptlrpc_request *req = pill->rc_req;
+ bool have_lock = false;
+ struct lu_fid *fid; /* directory FID */
+
+ ENTRY;
+
+ if (CFS_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
+ GOTO(out_err, rc = -ENOMEM);
+
+ /* client does not want reply data */
+ if (!req->rq_reqmsg->lm_repsize)
+ RETURN(0);
+
+ if (lustre_handle_is_used(lhc)) {
+ struct ldlm_lock *lock;
+
+ lock = ldlm_handle2lock(lhc);
+ if (lock) {
+ have_lock = ldlm_has_update(lock);
+ ldlm_lock_put(lock);
+ }
+ }
+ if (!have_lock)
+ GOTO(out_err, rc = 0);
+
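+ /* read the first directory page(s) into the inline reply buffer */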
+ rdpg->rp_hash = 0;
+ rdpg->rp_attrs = LUDA_FID | LUDA_TYPE;
+ if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_64BITHASH)
+ rdpg->rp_attrs |= LUDA_64BITHASH;
+ rdpg->rp_count = min_t(unsigned int, req->rq_reqmsg->lm_repsize,
+ DIR_READ_ON_OPEN_PAGES << PAGE_SHIFT);
+ rdpg->rp_npages = 0;
+
+ rc = req_capsule_server_grow(pill, &RMF_NIOBUF_INLINE, rdpg->rp_count);
+ if (rc != 0) {
+ /* failed to grow data buffer, just exit */
+ GOTO(out_err, rc = -E2BIG);
+ }
+
+ /* re-take MDT_BODY and NIOBUF_INLINE buffers after the buffer grow */
+ mbo = req_capsule_server_get(pill, &RMF_MDT_BODY);
+ fid = &mbo->mbo_fid1;
+ if (!fid_is_sane(fid))
+ GOTO(out_rnb, rc = -EINVAL);
+
+ rdpg->rp_data = req_capsule_server_get(pill, &RMF_NIOBUF_INLINE);
+ if (rdpg->rp_data == NULL)
+ GOTO(out_rnb, rc = -EPROTO);
+
+ o = mdt_object_find(info->mti_env, mdt, fid);
+ if (IS_ERR(o))
+ GOTO(out_rnb, rc = PTR_ERR(o));
+
+ if (!mdt_object_exists(o) ||
+ mdt_object_remote(o) ||
+ mdt_object_striped(info, o))
+ GOTO(out_put, rc = -ENOENT);
+
+ /* call lower layers to fill allocated pages with directory data */
+ rc = mo_readpage(env, mdt_object_child(o), rdpg);
+out_put:
+ mdt_object_put(env, o);
+
+out_rnb:
+ if (rc < 0)
+ req_capsule_shrink(pill, &RMF_NIOBUF_INLINE, 0, RCL_SERVER);
+out_err:
+ if (rc)
+ CDEBUG(D_INFO, "read dir on open failed with rc = %d\n", rc);
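+ /* read-on-open is best-effort: errors are logged, never returned to client */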
+ RETURN(0);
+}
+
+static int mdt_read_inline(struct mdt_thread_info *info,
+ struct mdt_lock_handle *lhc)
+{
+ struct req_capsule *pill = info->mti_pill;
+ struct md_attr *ma = &info->mti_attr;
+ struct lu_attr *la = &ma->ma_attr;
+ struct ptlrpc_request *req = pill->rc_req;
+ int rc = 0;
+
+ ENTRY;
+ if (!req_capsule_field_present(pill, &RMF_NIOBUF_INLINE, RCL_SERVER)) {
+ /* There are no reply buffers for this field, which means the
+ * client does not support data in the reply.
+ */
+ RETURN(0);
+ }
+ /* client does not want reply data */
+ if (!req->rq_reqmsg->lm_repsize)
+ RETURN(0);
+
+ if (S_ISREG(la->la_mode))
+ rc = mdt_dom_read_on_open(info, info->mti_mdt,
+ &lhc->mlh_reg_lh);
+ else if (S_ISDIR(la->la_mode))
+ rc = mdt_dir_read_on_open(info, &lhc->mlh_reg_lh);
+
+ return rc;
+}
+
static int mdt_reint_internal(struct mdt_thread_info *info,
struct mdt_lock_handle *lhc,
__u32 op)
* in reply when possible.
*/
if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req))
- rc = mdt_dom_read_on_open(info, info->mti_mdt,
- &lhc->mlh_reg_lh);
+ rc = mdt_read_inline(info, lhc);
return rc;
}
return lu_object_remote(&o->mot_obj);
}
+int mdt_object_striped(struct mdt_thread_info *mti, struct mdt_object *obj);
+
static inline const struct lu_fid *mdt_object_fid(const struct mdt_object *o)
{
return lu_object_fid(&o->mot_obj);
ENTRY;
- if (!req_capsule_field_present(pill, &RMF_NIOBUF_INLINE, RCL_SERVER)) {
- /* There is no reply buffers for this field, this means that
- * client has no support for data in reply.
- */
- RETURN(0);
- }
-
mbo = req_capsule_server_get(pill, &RMF_MDT_BODY);
if (!(mbo->mbo_valid & OBD_MD_DOM_SIZE))
RETURN(0);
__u32 dom_stripe = 0;
unsigned int dom_only = 0;
unsigned int dom_lock = 0;
+ struct ptlrpc_request *req = mdt_info_req(info);
ENTRY;
*ibits = 0;
trybits |= MDS_INODELOCK_DOM | MDS_INODELOCK_LAYOUT;
}
+ /*
+ * Dir read on open: take an UPDATE lock (PR mode) to protect the
+ * directory page cache contents on the client.
+ */
+ if (S_ISDIR(lu_object_attr(&obj->mot_obj)) &&
+ likely(req->rq_reqmsg->lm_repsize) &&
+ exp_connect_open_readdir(info->mti_exp) &&
+ likely(!(mdt_object_remote(obj) ||
+ mdt_object_striped(info, obj)))) {
+ *ibits |= MDS_INODELOCK_UPDATE;
+ lm = LCK_PR;
+ }
+
CDEBUG(D_INODE, "normal open:"DFID" lease count: %d, lm: %d\n",
PFID(mdt_object_fid(obj)),
atomic_read(&obj->mot_lease_count), lm);
if (ibits == 0 || rc == -MDT_EREMOTE_OPEN)
RETURN_EXIT;
- if (!(open_flags & MDS_OPEN_LOCK) && !(ibits & MDS_INODELOCK_LAYOUT) &&
- !(ibits & MDS_INODELOCK_DOM)) {
+ if (!(open_flags & MDS_OPEN_LOCK) &&
+ !(ibits & (MDS_INODELOCK_LAYOUT | MDS_INODELOCK_DOM)) &&
+ !S_ISDIR(lu_object_attr(&obj->mot_obj))) {
/* for the open request, the lock will only return to client
* if open or layout lock is granted. */
rc = 1;
}
+/* for dt_index_walk()/mdd_readpage(): map either rp_pages[index] or the matching slice of the inline rp_data buffer */
+void *rdpg_page_get(const struct lu_rdpg *rdpg, unsigned int index)
+{
+ if (rdpg->rp_npages) {
+ LASSERT(index < rdpg->rp_npages);
+ return kmap(rdpg->rp_pages[index]);
+ }
+ LASSERT(index * PAGE_SIZE < rdpg->rp_count);
+
+ return rdpg->rp_data + index * PAGE_SIZE;
+}
+EXPORT_SYMBOL(rdpg_page_get);
+
+void rdpg_page_put(const struct lu_rdpg *rdpg, unsigned int index)
+{
+ if (rdpg->rp_npages)
+ kunmap(rdpg->rp_pages[index]);
+}
+EXPORT_SYMBOL(rdpg_page_put);
+
/*
* Walk index and fill lu_page containers with key/record pairs
*
union lu_page *lp;
int i;
- LASSERT(pageidx < rdpg->rp_npages);
- lp = kmap(rdpg->rp_pages[pageidx]);
-
+ lp = rdpg_page_get(rdpg, pageidx);
/* fill lu pages */
for (i = 0; i < LU_PAGE_COUNT; i++, lp++, bytes -= LU_PAGE_SIZE) {
rc = filler(env, obj, lp,
/* end of index */
break;
}
- kunmap(rdpg->rp_pages[pageidx]);
+ rdpg_page_put(rdpg, pageidx);
}
out:
"sparse_read", /* 0x1000000000 */
"mirror_id_fix", /* 0x2000000000 */
"update_layout", /* 0x4000000000 */
+ "readdir_open", /* 0x8000000000 */
NULL
};
# asynchronous object destroy at MDT could cause bl ast to client
cancel_lru_locks osc
+ local old=$($LCTL get_param -n llite.*.dir_read_on_open)
+
+ # dir read-on-open takes an extra UPDATE lock which would skew the lock cancel counts below
+ $LCTL set_param -n llite.*.dir_read_on_open=0
+ stack_trap "$LCTL set_param -n llite.*.dir_read_on_open=$old" EXIT
+
stat $DIR/$tdir > /dev/null
can1=$(do_facet mds1 \
"$LCTL get_param -n ldlm.services.ldlm_canceld.stats" |
fi
lru_resize_disable mdc
+ stack_trap "lru_resize_enable mdc" EXIT
test_mkdir -p $DIR/$tdir/disable_lru_resize
createmany -o $DIR/$tdir/disable_lru_resize/f $NR