Whamcloud - gitweb
LU-18448 llite: read dir on open 69/57069/16
authorAlexey Lyashkov <alexey.lyashkov@hpe.com>
Fri, 15 Nov 2024 09:16:04 +0000 (12:16 +0300)
committerOleg Drokin <green@whamcloud.com>
Wed, 19 Mar 2025 23:31:12 +0000 (23:31 +0000)
Read the first page(s) of a directory as part of the open request,
since the client will probably need them right after the open.

walk over ~100k directories with 150 files on last leaf.

readdir on open enabled.

    real    0m39.977s
    user    0m0.121s
    sys     0m7.161s

readdir on open disabled

    real    1m18.106s
    user    0m0.151s
    sys     0m15.666s

HPE-bug-id: LUS-7695
Signed-off-by: Alexey Lyashkov <alexey.lyashkov@hpe.com>
Change-Id: Iaa674ce0d2e5723b380d7ca09407b27a90bc37f5
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/57069
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Reviewed-by: Andrew Perepechko <andrew.perepechko@hpe.com>
Reviewed-by: Alexander Boyko <alexander.boyko@hpe.com>
24 files changed:
lustre/include/lu_object.h
lustre/include/lustre_dlm.h
lustre/include/lustre_export.h
lustre/include/lustre_mdc.h
lustre/include/lustre_osc.h
lustre/include/obd.h
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/llite/dir.c
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/lproc_llite.c
lustre/llite/namei.c
lustre/llite/statahead.c
lustre/mdc/mdc_internal.h
lustre/mdc/mdc_locks.c
lustre/mdd/mdd_object.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_io.c
lustre/mdt/mdt_open.c
lustre/obdclass/dt_object.c
lustre/obdclass/lprocfs_status.c
lustre/tests/sanity.sh

index 1035a33..b4ddc4c 100644 (file)
@@ -814,9 +814,16 @@ struct lu_rdpg {
        /** requested attr */
        __u32                   rp_attrs;
        /** pointers to pages */
-       struct page           **rp_pages;
+       union {
+               struct page     **rp_pages;
+               void            *rp_data;
+       };
 };
 
+/* for dt_index_walk / mdd_readpage */
+void *rdpg_page_get(const struct lu_rdpg *rdpg, unsigned int index);
+void rdpg_page_put(const struct lu_rdpg *rdpg, unsigned int index);
+
 enum lu_xattr_flags {
        LU_XATTR_REPLACE = BIT(0),
        LU_XATTR_CREATE  = BIT(1),
index 5e4f904..8cecf15 100644 (file)
@@ -1172,6 +1172,12 @@ static inline bool ldlm_has_dom(struct ldlm_lock *lock)
                !!(lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_DOM);
 }
 
+/*
+ * Return true when @lock is an LDLM_IBITS lock whose inodebits include
+ * MDS_INODELOCK_UPDATE.
+ */
+static inline bool ldlm_has_update(struct ldlm_lock *lock)
+{
+       if (lock->l_resource->lr_type != LDLM_IBITS)
+               return false;
+
+       return !!(lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_UPDATE);
+}
+
 static inline char *
 ldlm_ns_name(struct ldlm_namespace *ns)
 {
index 8814a21..d9fef16 100644 (file)
@@ -519,6 +519,11 @@ static inline bool exp_connect_batch_rpc(struct obd_export *exp)
        return (exp_connect_flags2(exp) & OBD_CONNECT2_BATCH_RPC);
 }
 
+/*
+ * Return 1 if OBD_CONNECT2_READDIR_OPEN is set in the connect flags2 of
+ * @exp, 0 otherwise.
+ */
+static inline int exp_connect_open_readdir(struct obd_export *exp)
+{
+       if (exp_connect_flags2(exp) & OBD_CONNECT2_READDIR_OPEN)
+               return 1;
+
+       return 0;
+}
+
 enum {
        /* archive_ids in array format */
        KKUC_CT_DATA_ARRAY_MAGIC        = 0x092013cea,
index c5424a1..e6df5c5 100644 (file)
@@ -86,6 +86,15 @@ static inline void cl_lov_delay_create_clear(unsigned int *flags)
                *flags &= ~O_LOV_DELAY_CREATE;
 }
 
+/*
+ * Convert a directory hash into a page cache index.
+ *
+ * On 32-bit kernels only the upper 32 bits of a 64-bit hash are used.
+ * Hash 0 is remapped to hash 1, so both share one index, and the result
+ * is counted down from ~0UL.
+ */
+static inline unsigned long hash_x_index(__u64 hash, int hash64)
+{
+       if (BITS_PER_LONG == 32 && hash64)
+               hash >>= 32;
+
+       /* hash 0 is stored at the same index as hash 1 */
+       if (hash == 0)
+               hash = 1;
+
+       return ~0UL - hash;
+}
+
+
 /** @} mdc */
 
 #endif
index fbe0552..1e32634 100644 (file)
@@ -377,7 +377,7 @@ enum osc_lock_state {
  *
  * - When reply is received from the server (osc_enqueue_interpret())
  *      - ldlm_cli_enqueue_fini()
- *          - LDLM_LOCK_PUT(): releases caller reference acquired by
+ *          - ldlm_lock_put(): releases caller reference acquired by
  *            ldlm_lock_new().
  *          - if (rc != 0)
  *                ldlm_lock_decref(): error case: matches ldlm_cli_enqueue().
@@ -385,7 +385,7 @@ enum osc_lock_state {
  *
  * - When lock is being cancelled (ldlm_lock_cancel())
  *      - ldlm_lock_destroy()
- *          - LDLM_LOCK_PUT(): releases hash-table reference acquired by
+ *          - ldlm_lock_put(): releases hash-table reference acquired by
  *            ldlm_lock_new().
  *
  * osc_lock is detached from ldlm_lock by osc_lock_detach() that is called
index d030273..aebcb36 100644 (file)
@@ -915,6 +915,8 @@ enum md_cli_flags {
        CLI_MIGRATE     = BIT(4),
        CLI_DIRTY_DATA  = BIT(5),
        CLI_NO_SLOT     = BIT(6),
+       /** read on open (used only for directories for now) */
+       CLI_READ_ON_OPEN = BIT(7),
 };
 
 enum md_op_code {
index 1c00ee3..57fe42a 100644 (file)
@@ -848,6 +848,8 @@ struct ptlrpc_body_v2 {
 #define OBD_CONNECT2_SPARSE            0x1000000000ULL /* sparse LNet read */
 #define OBD_CONNECT2_MIRROR_ID_FIX     0x2000000000ULL /* rr_mirror_id move */
 #define OBD_CONNECT2_UPDATE_LAYOUT     0x4000000000ULL /* update compressibility */
+#define OBD_CONNECT2_READDIR_OPEN      0x8000000000ULL /* read first dir page on open */
+
 /* XXX README XXX README XXX README XXX README XXX README XXX README XXX
  * Please DO NOT add OBD_CONNECT flags before first ensuring that this value
  * is not in use by some other branch/patch.  Email adilger@whamcloud.com
index 246e4ae..1479223 100644 (file)
  * * %Failure - Error pointer (pointed by rc)
  */
 struct page *ll_get_dir_page(struct inode *dir, struct md_op_data *op_data,
-                            __u64 offset, int *partial_readdir_rc)
+                            __u64 offset, bool hash64, int *partial_readdir_rc)
 {
        struct md_readdir_info mrinfo = {
                                        .mr_blocking_ast = ll_md_blocking_ast };
        struct page *page;
+       unsigned long idx = hash_x_index(offset, hash64);
        int rc;
 
+       /* check page first */
+       page = find_get_page(dir->i_mapping, idx);
+       if (page) {
+               wait_on_page_locked(page);
+               if (PageUptodate(page))
+                       RETURN(page);
+               put_page(page);
+       }
+
        rc = md_read_page(ll_i2mdexp(dir), op_data, &mrinfo, offset, &page);
        if (rc != 0)
                return ERR_PTR(rc);
@@ -198,7 +208,8 @@ int ll_dir_read(struct inode *inode, __u64 *ppos, struct md_op_data *op_data,
                        RETURN(rc);
        }
 
-       page = ll_get_dir_page(inode, op_data, pos, partial_readdir_rc);
+       page = ll_get_dir_page(inode, op_data, pos, is_hash64,
+                               partial_readdir_rc);
 
        while (rc == 0 && !done) {
                struct lu_dirpage *dp;
@@ -291,7 +302,7 @@ int ll_dir_read(struct inode *inode, __u64 *ppos, struct md_op_data *op_data,
                                        LDF_COLLIDE);
                        next = pos;
                        page = ll_get_dir_page(inode, op_data, pos,
-                                              partial_readdir_rc);
+                                              is_hash64, partial_readdir_rc);
                }
        }
 #ifdef HAVE_DIR_CONTEXT
index 817cf70..00f1842 100644 (file)
@@ -640,6 +640,67 @@ out_io:
 
        EXIT;
 }
+/**
+ * Insert directory pages returned inline with an open reply into the
+ * directory's page cache.
+ *
+ * Does nothing when the export lacks OBD_CONNECT2_READDIR_OPEN, when the
+ * reply carries no RMF_NIOBUF_INLINE buffer, or when that buffer is too
+ * small to hold a struct lu_dirpage header.  Each page is indexed by
+ * hash_x_index() of its ldp_hash_start, which is the index that
+ * ll_get_dir_page() looks up.  Allocation and insertion failures are
+ * skipped silently: this is only a cache pre-fill.
+ *
+ * \param[in] inode    directory inode whose mapping is populated
+ * \param[in] req      open request holding the server reply
+ */
+void ll_dir_finish_open(struct inode *inode, struct ptlrpc_request *req)
+{
+       struct obd_export *exp = ll_i2mdexp(inode);
+       struct lu_dirpage *dp;
+       struct page *page;
+       char *data;
+       unsigned long offset;
+       __u64 hash;
+       unsigned int size;      /* bytes of inline data not yet consumed */
+       unsigned int chunk;     /* bytes copied into the current page */
+       unsigned int npages;
+       unsigned int i;
+       int is_hash64;
+       int rc;
+
+       ENTRY;
+
+       if (!exp_connect_open_readdir(exp))
+               RETURN_EXIT;
+
+       if (!req_capsule_field_present(&req->rq_pill, &RMF_NIOBUF_INLINE,
+                                      RCL_SERVER))
+               RETURN_EXIT;
+
+       data = req_capsule_server_get(&req->rq_pill, &RMF_NIOBUF_INLINE);
+       if (data == NULL)
+               RETURN_EXIT;
+
+       size = req_capsule_get_size(&req->rq_pill, &RMF_NIOBUF_INLINE,
+                                   RCL_SERVER);
+       if (size < sizeof(*dp))
+               RETURN_EXIT;
+
+       /* number of client pages needed to hold the inline data */
+       npages = DIV_ROUND_UP(size, PAGE_SIZE);
+       is_hash64 = test_bit(LL_SBI_64BIT_HASH, ll_i2sbi(inode)->ll_flags);
+
+       /* data/size advance by one chunk per iteration, also on 'continue' */
+       for (i = 0; i < npages; i++, data += chunk, size -= chunk) {
+               /* never read past the end of the reply buffer */
+               chunk = min_t(unsigned int, size, PAGE_SIZE);
+               /* a tail too short for a dirpage header is unusable */
+               if (chunk < sizeof(*dp))
+                       break;
+
+               page = page_cache_alloc(inode->i_mapping);
+               if (!page)
+                       continue;
+
+               lock_page(page);
+               SetPageUptodate(page);
+
+               dp = kmap_atomic(page);
+               memcpy(dp, data, chunk);
+               /* do not expose stale memory behind a short chunk */
+               if (chunk < PAGE_SIZE)
+                       memset((char *)dp + chunk, 0, PAGE_SIZE - chunk);
+               hash = le64_to_cpu(dp->ldp_hash_start);
+               kunmap_atomic(dp);
+
+               offset = hash_x_index(hash, is_hash64);
+
+               prefetchw(&page->flags);
+               rc = add_to_page_cache_lru(page, inode->i_mapping, offset,
+                                  GFP_KERNEL);
+               /*
+                * NOTE(review): the page is only unlocked here on success;
+                * presumably add_to_page_cache_lru() drops the lock itself
+                * on failure - confirm for all supported kernels.
+                */
+               if (rc == 0)
+                       unlock_page(page);
+
+               put_page(page);
+       }
+       EXIT;
+}
+
 
 static int ll_intent_file_open(struct dentry *de, void *lmm, ssize_t lmmsize,
                                struct lookup_intent *itp)
@@ -693,6 +754,9 @@ retry:
        op_data->op_data = lmm;
        op_data->op_data_size = lmmsize;
 
+       if (!sbi->ll_dir_open_read && S_ISDIR(de->d_inode->i_mode))
+               op_data->op_cli_flags &= ~CLI_READ_ON_OPEN;
+
        CFS_FAIL_TIMEOUT(OBD_FAIL_LLITE_OPEN_DELAY, cfs_fail_val);
 
        rc = ll_intent_lock(sbi->ll_md_exp, op_data, itp, &req,
@@ -730,6 +794,8 @@ retry:
                ll_set_lock_data(sbi->ll_md_exp, de->d_inode, itp, &bits);
                if (bits & MDS_INODELOCK_DOM && bits & MDS_INODELOCK_LAYOUT)
                        ll_dom_finish_open(de->d_inode, req);
+               if (bits & MDS_INODELOCK_UPDATE && S_ISDIR(de->d_inode->i_mode))
+                       ll_dir_finish_open(de->d_inode, req);
        }
        /* open may not fetch LOOKUP lock, update dir depth and default LMV
         * anyway.
index cc88465..b15132f 100644 (file)
@@ -911,7 +911,9 @@ struct ll_sb_info {
                                 ll_checksum_set:1,
                                 ll_inode_cache_enabled:1,
                                 ll_enable_statahead_fname:1,
-                                ll_intent_mkdir_enabled:1;
+                                ll_intent_mkdir_enabled:1,
+                                ll_dir_open_read:1;
+
 
        struct lustre_client_ocd ll_lco;
 
@@ -1323,7 +1325,8 @@ int ll_dir_read(struct inode *inode, __u64 *pos, struct md_op_data *op_data,
 int ll_get_mdt_idx(struct inode *inode);
 int ll_get_mdt_idx_by_fid(struct ll_sb_info *sbi, const struct lu_fid *fid);
 struct page *ll_get_dir_page(struct inode *dir, struct md_op_data *op_data,
-                             __u64 offset, int *partial_readdir_rc);
+                             __u64 offset, bool is64bit,
+                             int *partial_readdir_rc);
 void ll_release_page(struct inode *inode, struct page *page, bool remove);
 int quotactl_ioctl(struct super_block *sb, struct if_quotactl *qctl);
 void ll_quota_iter_check_and_cleanup(struct ll_sb_info *sbi, bool check);
@@ -1566,6 +1569,8 @@ ssize_t ll_copy_user_md(const struct lov_user_md __user *md,
 void ll_open_cleanup(struct super_block *sb, struct req_capsule *pill);
 
 void ll_dom_finish_open(struct inode *inode, struct ptlrpc_request *req);
+void ll_dir_finish_open(struct inode *inode, struct ptlrpc_request *req);
+
 
 /* Compute expected user md size when passing in a md from user space */
 static inline ssize_t ll_lov_user_md_size(const struct lov_user_md *lum)
index 29228a7..4b2cc9b 100644 (file)
@@ -366,7 +366,8 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt)
                                   OBD_CONNECT2_DMV_IMP_INHERIT |
                                   OBD_CONNECT2_UNALIGNED_DIO |
                                   OBD_CONNECT2_PCCRO |
-                                  OBD_CONNECT2_MIRROR_ID_FIX;
+                                  OBD_CONNECT2_MIRROR_ID_FIX |
+                                  OBD_CONNECT2_READDIR_OPEN;
 
 #ifdef HAVE_LRU_RESIZE_SUPPORT
        if (test_bit(LL_SBI_LRU_RESIZE, sbi->ll_flags))
@@ -3966,6 +3967,7 @@ struct md_op_data *ll_prep_md_op_data(struct md_op_data *op_data,
                op_data->op_bias |= MDS_CREATE_VOLATILE;
        }
        op_data->op_data = data;
+       op_data->op_cli_flags |= CLI_READ_ON_OPEN;
 
        return op_data;
 }
index 11bc14a..b55fc5c 100644 (file)
@@ -2118,6 +2118,41 @@ static ssize_t hybrid_io_read_threshold_bytes_store(struct kobject *kobj,
 }
 LUSTRE_RW_ATTR(hybrid_io_read_threshold_bytes);
 
+/* sysfs show handler: print the current ll_dir_open_read setting */
+static ssize_t dir_read_on_open_show(struct kobject *kobj,
+                                    struct attribute *attr,
+                                    char *buf)
+{
+       struct ll_sb_info *sbi;
+       unsigned int enabled;
+
+       sbi = container_of(kobj, struct ll_sb_info, ll_kset.kobj);
+       enabled = sbi->ll_dir_open_read;
+
+       return snprintf(buf, PAGE_SIZE, "%u\n", enabled);
+}
+
+
+/*
+ * sysfs store handler: parse a boolean with kstrtobool() and store it in
+ * ll_dir_open_read.  Returns @count on success or the parse error.
+ */
+static ssize_t dir_read_on_open_store(struct kobject *kobj,
+                                     struct attribute *attr,
+                                     const char *buffer,
+                                     size_t count)
+{
+       struct ll_sb_info *sbi;
+       bool enable;
+       int rc;
+
+       rc = kstrtobool(buffer, &enable);
+       if (rc != 0)
+               return rc;
+
+       sbi = container_of(kobj, struct ll_sb_info, ll_kset.kobj);
+       sbi->ll_dir_open_read = enable;
+
+       return count;
+}
+
+LUSTRE_RW_ATTR(dir_read_on_open);
+
 static int ll_unstable_stats_seq_show(struct seq_file *m, void *v)
 {
        struct super_block      *sb    = m->private;
@@ -2467,6 +2502,7 @@ static struct attribute *llite_attrs[] = {
        &lustre_attr_pcc_async_threshold.attr,
        &lustre_attr_pcc_mode.attr,
        &lustre_attr_pcc_async_affinity.attr,
+       &lustre_attr_dir_read_on_open.attr,
        NULL,
 };
 
index 62044f6..3e4b1af 100644 (file)
@@ -706,9 +706,14 @@ static int ll_lookup_it_finish(struct ptlrpc_request *request,
 
                ll_set_lock_data(ll_i2sbi(parent)->ll_md_exp, inode, it, &bits);
                /* OPEN can return data if lock has DoM+LAYOUT bits set */
-               if (it->it_op & IT_OPEN &&
-                   bits & MDS_INODELOCK_DOM && bits & MDS_INODELOCK_LAYOUT)
-                       ll_dom_finish_open(inode, request);
+               if (it->it_op & IT_OPEN) {
+                       if (bits & MDS_INODELOCK_DOM &&
+                           bits & MDS_INODELOCK_LAYOUT)
+                               ll_dom_finish_open(inode, request);
+                       if (bits & MDS_INODELOCK_UPDATE &&
+                           S_ISDIR(inode->i_mode))
+                               ll_dir_finish_open(inode, request);
+               }
 
                /* We used to query real size from OSTs here, but actually
                 * this is not needed. For stat() calls size would be updated
@@ -1037,6 +1042,10 @@ static struct dentry *ll_lookup_it(struct inode *parent, struct dentry *dentry,
                        it->it_open_flags |= MDS_OPEN_BY_FID;
        }
 
+       if (!sbi->ll_dir_open_read && it->it_op & IT_OPEN &&
+           it->it_open_flags & O_DIRECTORY)
+               op_data->op_cli_flags &= ~CLI_READ_ON_OPEN;
+
        /* enforce umask if acl disabled or MDS doesn't support umask */
        if (!IS_POSIXACL(parent) || !exp_connect_umask(ll_i2mdexp(parent)))
                it->it_create_mode &= ~current_umask();
index eaf02b1..828ff8b 100644 (file)
@@ -1192,6 +1192,7 @@ static int ll_statahead_by_list(struct ll_statahead_info *sai,
        struct ll_sb_info *sbi = ll_i2sbi(dir);
        struct md_op_data *op_data;
        struct page *page = NULL;
+       bool is_hash64 = test_bit(LL_SBI_64BIT_HASH, sbi->ll_flags);
        __u64 pos = 0;
        int first = 0;
        int rc = 0;
@@ -1220,7 +1221,7 @@ static int ll_statahead_by_list(struct ll_statahead_info *sai,
                        break;
                }
 
-               page = ll_get_dir_page(dir, op_data, pos, NULL);
+               page = ll_get_dir_page(dir, op_data, pos, is_hash64, NULL);
                ll_unlock_md_op_lsm(op_data);
                if (IS_ERR(page)) {
                        rc = PTR_ERR(page);
@@ -1761,6 +1762,8 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry)
        struct page *page = NULL;
        int rc = LS_NOT_FIRST_DE;
        __u64 pos = 0;
+       struct ll_sb_info *sbi = ll_i2sbi(dir);
+       bool is_hash64 = test_bit(LL_SBI_64BIT_HASH, sbi->ll_flags);
        struct llcrypt_str lltr = LLTR_INIT(NULL, 0);
 
        ENTRY;
@@ -1781,7 +1784,7 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry)
         *FIXME choose the start offset of the readdir
         */
 
-       page = ll_get_dir_page(dir, op_data, 0, NULL);
+       page = ll_get_dir_page(dir, op_data, 0, is_hash64, NULL);
 
        while (1) {
                struct lu_dirpage *dp;
@@ -1883,7 +1886,8 @@ static int is_first_dirent(struct inode *dir, struct dentry *dentry)
                         */
                        ll_release_page(dir, page, le32_to_cpu(dp->ldp_flags) &
                                              LDF_COLLIDE);
-                       page = ll_get_dir_page(dir, op_data, pos, NULL);
+                       page = ll_get_dir_page(dir, op_data, pos, is_hash64,
+                                               NULL);
                }
        }
        EXIT;
index cc91848..a49eb17 100644 (file)
@@ -172,14 +172,6 @@ static inline void mdc_body2lvb(struct mdt_body *body, struct ost_lvb *lvb)
        lvb->lvb_size = body->mbo_dom_size;
 }
 
-static inline unsigned long hash_x_index(__u64 hash, int hash64)
-{
-       if (BITS_PER_LONG == 32 && hash64)
-               hash >>= 32;
-       /* save hash 0 with hash 1 */
-       return ~0UL - (hash + !hash);
-}
-
 /* mdc_dev.c */
 extern struct lu_device_type mdc_device_type;
 int mdc_ldlm_blocking_ast(struct ldlm_lock *dlmlock,
index ea62eee..2f6e0db 100644 (file)
@@ -422,7 +422,10 @@ mdc_intent_open_pack(struct obd_export *exp, struct lookup_intent *it,
         * Such estimation is safe. Though the final allocated buffer might
         * be even larger, it is not possible to know that at this point.
         */
-       req->rq_reqmsg->lm_repsize = repsize;
+       if ((op_data->op_cli_flags & CLI_READ_ON_OPEN) != 0)
+               req->rq_reqmsg->lm_repsize = repsize;
+       else
+               req->rq_reqmsg->lm_repsize = 0;
        RETURN(req);
 
 err_put_sepol:
index cdd1245..80815a8 100644 (file)
@@ -4149,7 +4149,6 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                GOTO(out_unlock, rc);
 
        if (mdd_is_dead_obj(mdd_obj)) {
-               struct page *pg;
                struct lu_dirpage *dp;
 
                /*
@@ -4163,13 +4162,12 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                        GOTO(out_unlock, rc = -EFAULT);
                LASSERT(rdpg->rp_pages != NULL);
 
-               pg = rdpg->rp_pages[0];
-               dp = (struct lu_dirpage *)kmap(pg);
+               dp = (struct lu_dirpage *)rdpg_page_get(rdpg, 0);
                memset(dp, 0, sizeof(struct lu_dirpage));
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                dp->ldp_hash_end   = cpu_to_le64(MDS_DIR_END_OFF);
                dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
-               kunmap(pg);
+               rdpg_page_put(rdpg, 0);
                GOTO(out_unlock, rc = LU_PAGE_SIZE);
        }
 
@@ -4178,7 +4176,7 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj,
        if (rc >= 0) {
                struct lu_dirpage       *dp;
 
-               dp = kmap(rdpg->rp_pages[0]);
+               dp = (struct lu_dirpage *)rdpg_page_get(rdpg, 0);
                dp->ldp_hash_start = cpu_to_le64(rdpg->rp_hash);
                if (rc == 0) {
                        /*
@@ -4189,7 +4187,7 @@ int mdd_readpage(const struct lu_env *env, struct md_object *obj,
                        dp->ldp_flags = cpu_to_le32(LDF_EMPTY);
                        rc = min_t(unsigned int, LU_PAGE_SIZE, rdpg->rp_count);
                }
-               kunmap(rdpg->rp_pages[0]);
+               rdpg_page_put(rdpg, 0);
        }
 
        GOTO(out_unlock, rc);
index 8518d01..624ee9b 100644 (file)
@@ -3051,6 +3051,142 @@ static void mdt_preset_secctx_size(struct mdt_thread_info *info)
        }
 }
 
+/*
+ * Check whether @obj is a directory with an LMV xattr on its bottom object.
+ *
+ * Returns 0 for non-directories and when dt_xattr_get() reports -ENODATA
+ * (or a zero size), 1 when the xattr size is positive, and any other
+ * negative error code unchanged.
+ */
+int mdt_object_striped(struct mdt_thread_info *mti, struct mdt_object *obj)
+{
+       const struct lu_env *env = mti->mti_env;
+       struct lu_object *slice;
+       int rc;
+
+       if (!S_ISDIR(obj->mot_header.loh_attr))
+               return 0;
+
+       /* getxattr from bottom obj to avoid reading in shard FIDs */
+       slice = lu_object_find_slice(env, dt2lu_dev(mti->mti_mdt->mdt_bottom),
+                                    mdt_object_fid(obj), NULL);
+       if (IS_ERR(slice))
+               return PTR_ERR(slice);
+
+       rc = dt_xattr_get(env, lu2dt(slice), &LU_BUF_NULL, XATTR_NAME_LMV);
+       lu_object_put(env, slice);
+
+       if (rc > 0)
+               return 1;
+       if (rc == -ENODATA)
+               return 0;
+
+       return rc;
+}
+
+#define DIR_READ_ON_OPEN_PAGES 1
+
+/*
+ * Pack directory data, starting at hash 0, into the RMF_NIOBUF_INLINE
+ * buffer of an open reply.
+ *
+ * This is best effort: every path returns 0, and a non-zero internal rc is
+ * only reported through CDEBUG().  Data is packed only when @lhc refers to
+ * a lock for which ldlm_has_update() is true.  When rc is negative after
+ * the reply buffer was grown, the inline buffer is shrunk back to zero
+ * bytes.
+ */
+static int mdt_dir_read_on_open(struct mdt_thread_info *info,
+                               struct lustre_handle *lhc)
+{
+       const struct lu_env *env = info->mti_env;
+       struct lu_rdpg          *rdpg = &info->mti_u.rdpg.mti_rdpg;
+       struct req_capsule      *pill = info->mti_pill;
+       int                      rc;
+       struct mdt_body         *mbo;
+       struct mdt_device       *mdt = info->mti_mdt;
+       struct mdt_object       *o;
+       struct ptlrpc_request   *req = pill->rc_req;
+       bool have_lock = false;
+       struct lu_fid *fid; /* dir fid */
+
+       ENTRY;
+
+       /* fault injection point: behave as if packing failed with -ENOMEM */
+       if (CFS_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
+               GOTO(out_err, rc = -ENOMEM);
+
+       /* the client does not want inline data in the reply */
+       if (!req->rq_reqmsg->lm_repsize)
+               RETURN(0);
+
+       /* only pack data if the handle's lock includes the UPDATE bit */
+       if (lustre_handle_is_used(lhc)) {
+               struct ldlm_lock *lock;
+
+               lock = ldlm_handle2lock(lhc);
+               if (lock) {
+                       have_lock = ldlm_has_update(lock);
+                       ldlm_lock_put(lock);
+               }
+       }
+       if (!have_lock)
+               GOTO(out_err, rc = 0);
+
+       /* read from the start of the directory */
+       rdpg->rp_hash = 0;
+       rdpg->rp_attrs = LUDA_FID | LUDA_TYPE;
+       if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_64BITHASH)
+               rdpg->rp_attrs |= LUDA_64BITHASH;
+       /* at most DIR_READ_ON_OPEN_PAGES pages, capped by lm_repsize */
+       rdpg->rp_count  = min_t(unsigned int, req->rq_reqmsg->lm_repsize,
+                           DIR_READ_ON_OPEN_PAGES << PAGE_SHIFT);
+       /* rp_npages == 0 makes rdpg_page_get() use rp_data as a flat buffer */
+       rdpg->rp_npages = 0;
+
+       rc = req_capsule_server_grow(pill, &RMF_NIOBUF_INLINE, rdpg->rp_count);
+       if (rc != 0) {
+               /* failed to grow data buffer, just exit */
+               GOTO(out_err, rc = -E2BIG);
+       }
+
+       /* re-take MDT_BODY and NIOBUF_INLINE buffers after the buffer grow */
+       mbo = req_capsule_server_get(pill, &RMF_MDT_BODY);
+       fid = &mbo->mbo_fid1;
+       if (!fid_is_sane(fid))
+               GOTO(out_rnb, rc = -EINVAL);
+
+       rdpg->rp_data = req_capsule_server_get(pill, &RMF_NIOBUF_INLINE);
+       if (rdpg->rp_data == NULL)
+               GOTO(out_rnb, rc = -EPROTO);
+
+       o = mdt_object_find(info->mti_env, mdt, fid);
+       if (IS_ERR(o))
+               GOTO(out_rnb, rc = PTR_ERR(o));
+
+       /*
+        * Skip missing, remote and striped directories; a negative error
+        * from mdt_object_striped() is non-zero and so is skipped as well.
+        */
+       if (!mdt_object_exists(o) ||
+            mdt_object_remote(o) ||
+            mdt_object_striped(info, o))
+               GOTO(out_put, rc = -ENOENT);
+
+       /* call lower layers to fill the inline buffer with directory data */
+       rc = mo_readpage(env, mdt_object_child(o), rdpg);
+out_put:
+       mdt_object_put(env, o);
+
+out_rnb:
+       /* on failure, drop the inline buffer from the reply again */
+       if (rc < 0)
+               req_capsule_shrink(pill, &RMF_NIOBUF_INLINE, 0, RCL_SERVER);
+out_err:
+       if (rc)
+               CDEBUG(D_INFO, "read dir on open failed with rc = %d\n", rc);
+       /* never fail the open because of this optional read */
+       RETURN(0);
+}
+
+/*
+ * Pack file or directory data into the open reply when possible.
+ *
+ * Dispatches on the mode in info->mti_attr: regular files go to
+ * mdt_dom_read_on_open(), directories to mdt_dir_read_on_open(), and other
+ * types are left alone.  Nothing is attempted if the reply has no
+ * RMF_NIOBUF_INLINE field or the request's lm_repsize is zero.
+ *
+ * Returns 0 if nothing was attempted, otherwise the helper's return code.
+ */
+static int mdt_read_inline(struct mdt_thread_info *info,
+                          struct mdt_lock_handle *lhc)
+{
+       struct req_capsule      *pill = info->mti_pill;
+       struct md_attr          *ma  = &info->mti_attr;
+       struct lu_attr          *la  = &ma->ma_attr;
+       struct ptlrpc_request   *req = pill->rc_req;
+       int rc = 0;
+
+       ENTRY;
+       if (!req_capsule_field_present(pill, &RMF_NIOBUF_INLINE, RCL_SERVER)) {
+               /* There is no reply buffer for this field, which means the
+                * client has no support for data in the reply.
+                */
+               RETURN(0);
+       }
+       /* the client does not want inline data in the reply */
+       if (!req->rq_reqmsg->lm_repsize)
+               RETURN(0);
+
+       if (S_ISREG(la->la_mode))
+               rc = mdt_dom_read_on_open(info, info->mti_mdt,
+                                         &lhc->mlh_reg_lh);
+       else if (S_ISDIR(la->la_mode))
+               rc = mdt_dir_read_on_open(info, &lhc->mlh_reg_lh);
+
+       /* pair ENTRY with RETURN so that the exit is traced as well */
+       RETURN(rc);
+}
+
 static int mdt_reint_internal(struct mdt_thread_info *info,
                              struct mdt_lock_handle *lhc,
                              __u32 op)
@@ -3146,8 +3282,7 @@ out_shrink:
         * in reply when possible.
         */
        if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req))
-               rc = mdt_dom_read_on_open(info, info->mti_mdt,
-                                         &lhc->mlh_reg_lh);
+               rc = mdt_read_inline(info, lhc);
 
        return rc;
 }
index 56d590d..62bba01 100644 (file)
@@ -691,6 +691,8 @@ static inline int mdt_object_remote(const struct mdt_object *o)
        return lu_object_remote(&o->mot_obj);
 }
 
+int mdt_object_striped(struct mdt_thread_info *mti, struct mdt_object *obj);
+
 static inline const struct lu_fid *mdt_object_fid(const struct mdt_object *o)
 {
        return lu_object_fid(&o->mot_obj);
index 9e0aeac..c2594bb 100644 (file)
@@ -1789,13 +1789,6 @@ int mdt_dom_read_on_open(struct mdt_thread_info *mti, struct mdt_device *mdt,
 
        ENTRY;
 
-       if (!req_capsule_field_present(pill, &RMF_NIOBUF_INLINE, RCL_SERVER)) {
-               /* There is no reply buffers for this field, this means that
-                * client has no support for data in reply.
-                */
-               RETURN(0);
-       }
-
        mbo = req_capsule_server_get(pill, &RMF_MDT_BODY);
        if (!(mbo->mbo_valid & OBD_MD_DOM_SIZE))
                RETURN(0);
index 4763926..55904d0 100644 (file)
@@ -805,6 +805,7 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
        __u32 dom_stripe = 0;
        unsigned int dom_only = 0;
        unsigned int dom_lock = 0;
+       struct ptlrpc_request *req = mdt_info_req(info);
 
        ENTRY;
        *ibits = 0;
@@ -893,6 +894,19 @@ static int mdt_object_open_lock(struct mdt_thread_info *info,
                        trybits |= MDS_INODELOCK_DOM | MDS_INODELOCK_LAYOUT;
                }
 
+               /*
+                * dir read on open - an UPDATE lock is needed to protect the
+                * client's page cache contents, so take the UPDATE bit too
+                */
+               if (S_ISDIR(lu_object_attr(&obj->mot_obj)) &&
+                   likely(req->rq_reqmsg->lm_repsize) &&
+                   exp_connect_open_readdir(info->mti_exp) &&
+                   likely(!(mdt_object_remote(obj) ||
+                   mdt_object_striped(info, obj))) ){
+                       *ibits |= MDS_INODELOCK_UPDATE;
+                       lm = LCK_PR;
+               }
+
                CDEBUG(D_INODE, "normal open:"DFID" lease count: %d, lm: %d\n",
                        PFID(mdt_object_fid(obj)),
                        atomic_read(&obj->mot_lease_count), lm);
@@ -1032,8 +1046,9 @@ static void mdt_object_open_unlock(struct mdt_thread_info *info,
        if (ibits == 0 || rc == -MDT_EREMOTE_OPEN)
                RETURN_EXIT;
 
-       if (!(open_flags & MDS_OPEN_LOCK) && !(ibits & MDS_INODELOCK_LAYOUT) &&
-           !(ibits & MDS_INODELOCK_DOM)) {
+       if (!(open_flags & MDS_OPEN_LOCK) &&
+           !(ibits & (MDS_INODELOCK_LAYOUT | MDS_INODELOCK_DOM)) &&
+           !S_ISDIR(lu_object_attr(&obj->mot_obj))) {
                /* for the open request, the lock will only return to client
                 * if open or layout lock is granted. */
                rc = 1;
index c243456..8816cce 100644 (file)
@@ -794,6 +794,26 @@ out:
 }
 
 
+/*
+ * Return the address of page @index of @rdpg, for dt_index_walk() and
+ * mdd_readpage().
+ *
+ * When rp_npages is zero the request uses the flat rp_data buffer and the
+ * address is simply an offset into it; otherwise the page from rp_pages is
+ * mapped with kmap().  Pair each call with rdpg_page_put().
+ */
+void *rdpg_page_get(const struct lu_rdpg *rdpg, unsigned int index)
+{
+       if (rdpg->rp_npages == 0) {
+               LASSERT(index * PAGE_SIZE  < rdpg->rp_count);
+
+               return rdpg->rp_data + index * PAGE_SIZE;
+       }
+
+       LASSERT(index < rdpg->rp_npages);
+
+       return kmap(rdpg->rp_pages[index]);
+}
+EXPORT_SYMBOL(rdpg_page_get);
+
+/*
+ * Release a page obtained with rdpg_page_get(): kunmap() the page when
+ * rp_pages is in use, nothing to do when rp_npages is zero.
+ */
+void rdpg_page_put(const struct lu_rdpg *rdpg, unsigned int index)
+{
+       if (rdpg->rp_npages == 0)
+               return;
+
+       kunmap(rdpg->rp_pages[index]);
+}
+EXPORT_SYMBOL(rdpg_page_put);
+
 /*
  * Walk index and fill lu_page containers with key/record pairs
  *
@@ -869,9 +889,7 @@ int dt_index_walk(const struct lu_env *env, struct dt_object *obj,
                union lu_page   *lp;
                int              i;
 
-               LASSERT(pageidx < rdpg->rp_npages);
-               lp = kmap(rdpg->rp_pages[pageidx]);
-
+               lp = rdpg_page_get(rdpg, pageidx);
                /* fill lu pages */
                for (i = 0; i < LU_PAGE_COUNT; i++, lp++, bytes-=LU_PAGE_SIZE) {
                        rc = filler(env, obj, lp,
@@ -885,7 +903,7 @@ int dt_index_walk(const struct lu_env *env, struct dt_object *obj,
                                /* end of index */
                                break;
                }
-               kunmap(rdpg->rp_pages[pageidx]);
+               rdpg_page_put(rdpg, pageidx);
        }
 
 out:
index 91bb63d..b44d6c6 100644 (file)
@@ -630,6 +630,7 @@ static const char *const obd_connect_names[] = {
        "sparse_read",                 /* 0x1000000000 */
        "mirror_id_fix",               /* 0x2000000000 */
        "update_layout",               /* 0x4000000000 */
+       "readdir_open",                /* 0x8000000000 */
        NULL
 };
 
index ec590af..3091244 100755 (executable)
@@ -15140,6 +15140,12 @@ test_120a() {
        # asynchronous object destroy at MDT could cause bl ast to client
        cancel_lru_locks osc
 
+       local old=$($LCTL get_param -n llite.*.dir_read_on_open)
+
+       # dir read on open makes the MDT take an extra UPDATE lock on open,
+       # which may change the lock cancel counts checked below
+       $LCTL set_param -n llite.*.dir_read_on_open=0
+       stack_trap "$LCTL set_param -n llite.*.dir_read_on_open=$old" EXIT
+
        stat $DIR/$tdir > /dev/null
        can1=$(do_facet mds1 \
               "$LCTL get_param -n ldlm.services.ldlm_canceld.stats" |
@@ -16119,6 +16125,7 @@ test_124b() {
        fi
 
        lru_resize_disable mdc
+       stack_trap "lru_resize_enable mdc" EXIT
        test_mkdir -p $DIR/$tdir/disable_lru_resize
 
         createmany -o $DIR/$tdir/disable_lru_resize/f $NR