Whamcloud - gitweb
LU-17307 mdt: get dirent count by request 29/53229/10
author Lai Siyao <lai.siyao@whamcloud.com>
Sat, 4 Nov 2023 13:32:59 +0000 (09:32 -0400)
committer Andreas Dilger <adilger@whamcloud.com>
Tue, 12 Dec 2023 05:41:56 +0000 (05:41 +0000)
Add MA_DIRENT_CNT/LA_DIRENT_CNT to tell the osd to get the dirent count.
Set it in mdt_getattr_name_lock() when auto-split is enabled, so it
won't cause overhead when auto-split is disabled. Also change the
oo_dirent_count type to atomic_t so the count does not become
inaccurate over time from repeated addition/removal (the count may
be used in the future to know whether a directory is empty or to
compare directories).

In osd_dirent_count() set oo_dirent_count to 0 before iteration to
avoid multiple threads iterating at the same time. This means the
result may not be accurate in this case, but it will be eventually.

Fixes: 03a4431dac ("LU-11025 osd: osd_attr_get() returns dirent count")
Signed-off-by: Lai Siyao <lai.siyao@whamcloud.com>
Change-Id: I2be6c0dcfda1c98995a269585c5d8d781a8a3b42
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/53229
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Hongchao Zhang <hongchao@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
lustre/include/lu_object.h
lustre/include/md_object.h
lustre/include/uapi/linux/lustre/lustre_user.h
lustre/mdt/mdt_handler.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_internal.h

index fada7c8..e6814e1 100644 (file)
@@ -458,7 +458,7 @@ struct lu_attr {
        __u64           la_dirent_count;
 };
 
-#define LU_DIRENT_COUNT_UNSET  ~0ULL
+#define LU_DIRENT_COUNT_UNSET  -1
 
 /**
  * Layer in the layered object.
index 7deef4e..80f5a18 100644 (file)
@@ -75,6 +75,7 @@ enum ma_valid {
        MA_LMV_DEF      = BIT(8),
        MA_SOM          = BIT(9),
        MA_FORCE_LOG    = BIT(10), /* forced close logged in mdt_mfd_close */
+       MA_DIRENT_CNT   = BIT(11),
 };
 
 typedef enum {
index 990e301..6b1f909 100644 (file)
@@ -1596,6 +1596,7 @@ enum la_valid {
        LA_LSIZE        = 1 << 17,      /* 0x20000 */
        LA_LBLOCKS      = 1 << 18,      /* 0x40000 */
        LA_BTIME        = 1 << 19,      /* 0x80000 */
+       LA_DIRENT_CNT   = 1 << 20,     /* 0x100000 */
        /**
         * Attributes must be transmitted to OST objects
         */
index 236eaac..9008295 100644 (file)
@@ -1275,6 +1275,10 @@ int mdt_attr_get_complex(struct mdt_thread_info *info,
 
        if (need & MA_INODE) {
                ma->ma_need = MA_INODE;
+               if (need & MA_DIRENT_CNT)
+                       ma->ma_attr.la_valid |= LA_DIRENT_CNT;
+               else
+                       ma->ma_attr.la_valid &= ~LA_DIRENT_CNT;
                rc = mo_attr_get(env, next, ma);
                if (rc)
                        GOTO(out, rc);
@@ -2081,6 +2085,9 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
        if (parent == NULL)
                RETURN(-ENOENT);
 
+       if (info->mti_mdt->mdt_enable_dir_auto_split)
+               ma_need |= MA_DIRENT_CNT;
+
        if (info->mti_cross_ref) {
                /* Only getattr on the child. Parent is on another node. */
                mdt_set_disposition(info, ldlm_rep,
@@ -2119,7 +2126,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info,
                        RETURN(-ENOENT);
                }
 
-               rc = mdt_getattr_internal(info, child, 0);
+               rc = mdt_getattr_internal(info, child, ma_need);
                if (unlikely(rc != 0)) {
                        mdt_object_unlock(info, child, lhc, 1);
                        RETURN(rc);
index 6850059..46f20ba 100644 (file)
@@ -1545,7 +1545,7 @@ static int osd_object_init(const struct lu_env *env, struct lu_object *l,
                        result = 0;
                }
        }
-       obj->oo_dirent_count = LU_DIRENT_COUNT_UNSET;
+       atomic_set(&obj->oo_dirent_count, LU_DIRENT_COUNT_UNSET);
 
        LINVRNT(osd_invariant(obj));
        return result;
@@ -2892,34 +2892,36 @@ static int osd_dirent_count(const struct lu_env *env, struct dt_object *dt,
        LASSERT(S_ISDIR(obj->oo_inode->i_mode));
        LASSERT(fid_is_namespace_visible(lu_object_fid(&obj->oo_dt.do_lu)));
 
-       if (obj->oo_dirent_count != LU_DIRENT_COUNT_UNSET) {
-               *count = obj->oo_dirent_count;
-               RETURN(0);
-       }
-
        /* directory not initialized yet */
        if (!dt->do_index_ops) {
                *count = 0;
                RETURN(0);
        }
 
+       spin_lock(&obj->oo_guard);
+       *count = atomic_read(&obj->oo_dirent_count);
+       if (*count == LU_DIRENT_COUNT_UNSET)
+               atomic_set(&obj->oo_dirent_count, 0);
+       spin_unlock(&obj->oo_guard);
+       if (*count != LU_DIRENT_COUNT_UNSET)
+               RETURN(0);
+
+       *count = 0;
        iops = &dt->do_index_ops->dio_it;
        it = iops->init(env, dt, LUDA_64BITHASH);
        if (IS_ERR(it))
-               RETURN(PTR_ERR(it));
+               GOTO(out, rc = PTR_ERR(it));
 
        rc = iops->load(env, it, 0);
        if (rc < 0) {
-               if (rc == -ENODATA) {
+               if (rc == -ENODATA)
                        rc = 0;
-                       *count = 0;
-               }
-               GOTO(out, rc);
+               GOTO(put, rc);
        }
        if (rc > 0)
                rc = iops->next(env, it);
 
-       for (*count = 0; rc == 0 || rc == -ESTALE; rc = iops->next(env, it)) {
+       for (; rc == 0 || rc == -ESTALE; rc = iops->next(env, it)) {
                if (rc == -ESTALE)
                        continue;
 
@@ -2928,14 +2930,22 @@ static int osd_dirent_count(const struct lu_env *env, struct dt_object *dt,
 
                (*count)++;
        }
-       if (rc == 1) {
-               obj->oo_dirent_count = *count;
+       if (rc == 1 || rc == -ESTALE)
                rc = 0;
-       }
-out:
+put:
        iops->put(env, it);
        iops->fini(env, it);
-
+out:
+       /* If counting dirents failed, use the current count (if any).
+        *
+        * At worst this means the directory will not be split until the
+        * count can be completed successfully (remount or oo_dirent_count
+        * incremented by adding new entries).  This avoids re-walking
+        * the whole directory on each access and hitting the same error.
+        */
+       if (rc && *count == 0)
+               *count = LU_DIRENT_COUNT_UNSET;
+       atomic_set(&obj->oo_dirent_count, *count);
        RETURN(rc);
 }
 
@@ -2966,8 +2976,11 @@ static int osd_attr_get(const struct lu_env *env, struct dt_object *dt,
        spin_unlock(&obj->oo_guard);
 
        if (S_ISDIR(obj->oo_inode->i_mode) &&
+           (attr->la_valid & LA_DIRENT_CNT) &&
            fid_is_namespace_visible(lu_object_fid(&dt->do_lu)))
                rc = osd_dirent_count(env, dt, &attr->la_dirent_count);
+       else
+               attr->la_valid &= ~LA_DIRENT_CNT;
 
        return rc;
 }
@@ -3533,7 +3546,7 @@ static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
        oth = container_of(th, struct osd_thandle, ot_super);
        LASSERT(oth->ot_handle->h_transaction != NULL);
        if (fid_is_namespace_visible(lu_object_fid(&obj->oo_dt.do_lu)))
-               obj->oo_dirent_count = 0;
+               atomic_set(&obj->oo_dirent_count, 0);
        result = osd_mkfile(info, obj, mode, hint, th, attr);
 
        return result;
@@ -5765,16 +5778,8 @@ static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
                rc = PTR_ERR(bh);
        }
 
-       if (!rc && fid_is_namespace_visible(lu_object_fid(&dt->do_lu)) &&
-           obj->oo_dirent_count != LU_DIRENT_COUNT_UNSET) {
-               /* NB, dirent count may not be accurate, because it's counted
-                * without lock.
-                */
-               if (obj->oo_dirent_count)
-                       obj->oo_dirent_count--;
-               else
-                       obj->oo_dirent_count = LU_DIRENT_COUNT_UNSET;
-       }
+       if (!rc && fid_is_namespace_visible(lu_object_fid(&dt->do_lu)))
+               atomic_dec_if_positive(&obj->oo_dirent_count);
        if (hlock != NULL)
                ldiskfs_htree_unlock(hlock);
        else
@@ -6123,9 +6128,14 @@ static int osd_ea_add_rec(const struct lu_env *env, struct osd_object *pobj,
                                              hlock, th);
                }
        }
-       if (!rc && fid_is_namespace_visible(lu_object_fid(&pobj->oo_dt.do_lu))
-           && pobj->oo_dirent_count != LU_DIRENT_COUNT_UNSET)
-               pobj->oo_dirent_count++;
+       if (!rc && fid_is_namespace_visible(lu_object_fid(&pobj->oo_dt.do_lu))){
+               int dirent_count = atomic_read(&pobj->oo_dirent_count);
+
+               /* avoid extremely unlikely 2B-entry directory overflow case */
+               if (dirent_count != LU_DIRENT_COUNT_UNSET &&
+                   likely(dirent_count < INT_MAX - NR_CPUS))
+                       atomic_inc(&pobj->oo_dirent_count);
+       }
 
        if (hlock != NULL)
                ldiskfs_htree_unlock(hlock);
index b3c3488..a762d94 100644 (file)
@@ -158,12 +158,12 @@ struct osd_object {
 
        /* the i_flags in LMA */
        __u32                   oo_lma_flags;
+       atomic_t                oo_dirent_count;
 
         const struct lu_env    *oo_owner;
 
        struct list_head        oo_xattr_list;
        struct lu_object_header *oo_header;
-       __u64                   oo_dirent_count;
 };
 
 struct osd_obj_seq {