Whamcloud - gitweb
LU-16405 osd: lookup cache
authorAlex Zhuravlev <bzzz@whamcloud.com>
Mon, 3 Apr 2023 10:32:59 +0000 (13:32 +0300)
committerAndreas Dilger <adilger@whamcloud.com>
Mon, 28 Aug 2023 16:17:02 +0000 (16:17 +0000)
MDT may need to re-lookup just checked names (after locking).
introduce a trivial tiny per-thread cache in OSD in order to
make such a repeating lookup cheap.

the original issue is that ext4_add_entry() doesn't really
check for possible duplicate (that would be expensive as
a whole 4K block must be scanned).

important: the cache is reset upon request processing completion as
we don't update iversion on a disk (due to conflict with VBR).

Lustre-change: https://review.whamcloud.com/50521
Lustre-commit: 29f8eb2a67ba2806d91d93de1e82e05a63f76382

Fixes: 79acb9a9e7 ("LU-10235 mdt: mdt_create: check EEXIST without lock")
Signed-off-by: Alex Zhuravlev <bzzz@whamcloud.com>
Change-Id: I40c3ee702f7895c3bda00b380f904cd587e0a1c4
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Lai Siyao <lai.siyao@whamcloud.com>
Reviewed-on: https://review.whamcloud.com/c/ex/lustre-release/+/51809
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
config/lustre-build-ldiskfs.m4
lustre/include/obd_support.h
lustre/mdt/mdt_reint.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_internal.h
lustre/tests/sanityn.sh

index 2269c3b..a96d207 100644 (file)
@@ -417,6 +417,22 @@ handle_t_h_revoke_credits, [
 EXTRA_KCFLAGS="$tmp_flags"
 ]) # LB_JBD2_H_TOTAL_CREDITS
 
+# LB_HAVE_INODE_IVERSION
+#
+AC_DEFUN([LB_HAVE_INODE_IVERSION], [
+LB_CHECK_COMPILE([if iversion primitives defined],
+inode_set_iversion, [
+       #include <linux/iversion.h>
+],[
+       struct inode i;
+
+       inode_set_iversion(&i, 0);
+],[
+       AC_DEFINE(HAVE_INODE_IVERSION, 1,
+               [iversion primitives defined])
+])
+]) # LB_HAVE_INODE_IVERSION
+
 #
 # LB_CONFIG_LDISKFS
 #
@@ -470,6 +486,7 @@ AS_IF([test x$enable_ldiskfs != xno],[
        LB_LDISKFS_FIND_ENTRY_LOCKED_EXISTS
        LB_LDISKFSFS_DIRHASH_WANTS_DIR
        LB_JBD2_H_TOTAL_CREDITS
+       LB_HAVE_INODE_IVERSION
        AC_DEFINE(CONFIG_LDISKFS_FS_POSIX_ACL, 1, [posix acls for ldiskfs])
        AC_DEFINE(CONFIG_LDISKFS_FS_SECURITY, 1, [fs security for ldiskfs])
        AC_DEFINE(CONFIG_LDISKFS_FS_XATTR, 1, [extened attributes for ldiskfs])
index c3bc3f7..77da035 100644 (file)
@@ -250,6 +250,7 @@ extern char obd_jobid_var[];
 #define OBD_FAIL_MDS_CHANGELOG_IDX_PUMP         0x16d
 #define OBD_FAIL_MDS_DELAY_DELORPHAN    0x16e
 #define OBD_FAIL_MDS_DIR_PAGE_WALK      0x16f
+/* continue at 0x2400, see below */
 
 /* layout lock */
 #define OBD_FAIL_MDS_NO_LL_GETATTR      0x170
@@ -752,6 +753,11 @@ extern char obd_jobid_var[];
 
 #define OBD_FAIL_OSD_FAIL_AT_TRUNCATE          0x2301
 
+/* continuation of MDS related constants */
+#define OBD_FAIL_MDS_PAUSE_CREATE_AFTER_LOOKUP 0x2401
+
+/* PLEASE, KEEP NUMBERS UP TO 0x3000 RESERVED FOR OBD_FAIL_MDS_* */
+
 /* LNet is allocated failure locations 0xe000 to 0xffff */
 /* Assign references to moved code to reduce code changes */
 #define OBD_FAIL_PRECHECK(id)                   (unlikely(CFS_FAIL_PRECHECK(id)))
index 455efc6..df0cc5f 100644 (file)
@@ -602,6 +602,8 @@ static int mdt_create(struct mdt_thread_info *info)
        if (rc != -ENOENT)
                GOTO(put_parent, rc);
 
+       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PAUSE_CREATE_AFTER_LOOKUP, cfs_fail_val);
+
        /* save version of file name for replay, it must be ENOENT here */
        mdt_enoent_version_save(info, 1);
 
@@ -619,6 +621,17 @@ static int mdt_create(struct mdt_thread_info *info)
                        GOTO(unlock_parent, rc);
        }
 
+       /*
+        * now repeat the lookup having a LDLM lock on the parent dir,
+        * as another thread could create the same name. notice this
+        * lookup is supposed to hit cache in OSD and be cheap if the
+        * directory is not being modified concurrently.
+        */
+       rc = mdo_lookup(info->mti_env, mdt_object_child(parent), &rr->rr_name,
+                       &info->mti_tmp_fid1, &info->mti_spec);
+       if (unlikely(rc == 0))
+               GOTO(unlock_parent, rc = -EEXIST);
+
        child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
        if (unlikely(IS_ERR(child)))
                GOTO(unlock_parent, rc = PTR_ERR(child));
index a4a1e8d..2c783bf 100644 (file)
 #include <linux/module.h>
 #include <linux/user_namespace.h>
 #include <linux/uidgid.h>
+#ifdef HAVE_INODE_IVERSION
+#include <linux/iversion.h>
+#else
+#define inode_peek_iversion(__inode)   ((__inode)->i_version)
+#endif
 
 /* prerequisite for linux/xattr.h */
 #include <linux/types.h>
@@ -111,6 +116,8 @@ static struct lu_kmem_descr ldiskfs_caches[] = {
        }
 };
 
+static atomic_t osd_mount_seq;
+
 static const char dot[] = ".";
 static const char dotdot[] = "..";
 
@@ -6278,7 +6285,7 @@ again:
  * \retval -ve, on error
  */
 static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
-                            struct dt_rec *rec, const struct dt_key *key)
+                            struct dt_rec *rec, const struct lu_name *ln)
 {
        struct inode *dir = obj->oo_inode;
        struct dentry *dentry;
@@ -6286,7 +6293,6 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
        struct buffer_head *bh;
        struct lu_fid *fid = (struct lu_fid *)rec;
        struct htree_lock *hlock = NULL;
-       struct lu_name ln;
        int ino;
        int rc;
 
@@ -6295,11 +6301,7 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
        LASSERT(dir->i_op != NULL);
        LASSERT(dir->i_op->lookup != NULL);
 
-       rc = obj_name2lu_name(obj, (char *)key, strlen((char *)key), &ln);
-       if (rc)
-               RETURN(rc);
-
-       dentry = osd_child_dentry_get(env, obj, ln.ln_name, ln.ln_namelen);
+       dentry = osd_child_dentry_get(env, obj, ln->ln_name, ln->ln_namelen);
 
        if (obj->oo_hl_head != NULL) {
                hlock = osd_oti_get(env)->oti_hlock;
@@ -6328,15 +6330,13 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
                brelse(bh);
                if (rc != 0) {
                        if (unlikely(is_remote_parent_ino(dev, ino))) {
-                               const char *name = (const char *)key;
-
                                /*
                                 * If the parent is on remote MDT, and there
                                 * is no FID-in-dirent, then we have to get
                                 * the parent FID from the linkEA.
                                 */
-                               if (likely(strlen(name) == 2 &&
-                                          name[0] == '.' && name[1] == '.'))
+                               if (likely(ln->ln_namelen == 2 &&
+                                          ln->ln_name[0] == '.' && ln->ln_name[1] == '.'))
                                        rc = osd_get_pfid_from_linkea(env, obj,
                                                                      fid);
                        } else {
@@ -6375,8 +6375,6 @@ out:
                ldiskfs_htree_unlock(hlock);
        else
                up_read(&obj->oo_ext_idx_sem);
-       if (ln.ln_name != (char *)key)
-               kfree(ln.ln_name);
        RETURN(rc);
 }
 
@@ -7779,6 +7777,96 @@ static int osd_it_ea_load(const struct lu_env *env,
        RETURN(rc);
 }
 
+int osd_olc_lookup(const struct lu_env *env, struct osd_object *obj,
+                         u64 iversion, struct dt_rec *rec,
+                         const struct lu_name *ln, int *result)
+{
+       struct osd_thread_info *oti = osd_oti_get(env);
+       struct osd_lookup_cache *olc = oti->oti_lookup_cache;
+       struct osd_device *osd = osd_obj2dev(obj);
+       struct osd_lookup_cache_object *cobj = &oti->oti_cobj;
+       int i;
+
+       if (unlikely(olc == NULL))
+               return 0;
+
+       if (unlikely(atomic_read(&osd_mount_seq) != olc->olc_mount_seq)) {
+               /*
+                * umount has happened, a new OSD could land to the previous
+                * address so we can't use it any more, invalidate our cache
+                */
+               memset(olc, 0, sizeof(*olc));
+               olc->olc_mount_seq = atomic_read(&osd_mount_seq);
+               return 0;
+       }
+
+       memset(cobj, 0, sizeof(*cobj));
+       cobj->lco_osd = osd;
+       cobj->lco_ino = obj->oo_inode->i_ino;
+       cobj->lco_gen = obj->oo_inode->i_generation;
+       cobj->lco_version = iversion;
+
+       for (i = 0; i < OSD_LOOKUP_CACHE_MAX; i++) {
+               struct osd_lookup_cache_entry *entry;
+
+               entry = &olc->olc_entry[i];
+               /* compare if osd/ino/generation/version match */
+               if (memcmp(&entry->lce_obj, cobj, sizeof(*cobj)) != 0)
+                       continue;
+               if (entry->lce_namelen != ln->ln_namelen)
+                       continue;
+               if (memcmp(entry->lce_name, ln->ln_name, ln->ln_namelen) != 0)
+                       continue;
+               /* match */
+               memcpy(rec, &entry->lce_fid, sizeof(entry->lce_fid));
+               *result = entry->lce_rc;
+               return 1;
+       }
+       return 0;
+}
+
+void osd_olc_save(const struct lu_env *env, struct osd_object *obj,
+                         struct dt_rec *rec, const struct lu_name *ln,
+                         const int result, u64 iversion)
+{
+       struct osd_thread_info *oti = osd_oti_get(env);
+       struct osd_lookup_cache_entry *entry;
+       struct osd_lookup_cache *olc;
+
+       if (unlikely(oti->oti_lookup_cache == NULL)) {
+               OBD_ALLOC_PTR(oti->oti_lookup_cache);
+               if (oti->oti_lookup_cache == NULL)
+                       return;
+       }
+
+       olc = oti->oti_lookup_cache;
+       if (unlikely(atomic_read(&osd_mount_seq) != olc->olc_mount_seq)) {
+               memset(olc, 0, sizeof(*olc));
+               olc->olc_mount_seq = atomic_read(&osd_mount_seq);
+       }
+
+       entry = &olc->olc_entry[olc->olc_cur];
+
+       /* invaliate cache slot if needed */
+       if (entry->lce_obj.lco_osd)
+               memset(&entry->lce_obj, 0, sizeof(entry->lce_obj));
+
+       /* XXX: some kind of LRU */
+       entry->lce_obj.lco_osd = osd_obj2dev(obj);
+       entry->lce_obj.lco_ino = obj->oo_inode->i_ino;
+       entry->lce_obj.lco_gen = obj->oo_inode->i_generation;
+       entry->lce_obj.lco_version = iversion;
+
+       LASSERT(ln->ln_namelen <= LDISKFS_NAME_LEN);
+       entry->lce_namelen = ln->ln_namelen;
+       memcpy(entry->lce_name, ln->ln_name, ln->ln_namelen);
+       memcpy(&entry->lce_fid, rec, sizeof(entry->lce_fid));
+       entry->lce_rc = result;
+
+       if (++olc->olc_cur == OSD_LOOKUP_CACHE_MAX)
+               olc->olc_cur = 0;
+}
+
 /**
  * Index lookup function for interoperability mode (b11826).
  *
@@ -7791,16 +7879,37 @@ static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt,
                               struct dt_rec *rec, const struct dt_key *key)
 {
        struct osd_object *obj = osd_dt_obj(dt);
-       int rc = 0;
+       struct lu_name ln;
+       int rc, result;
+       u64 iversion;
 
        ENTRY;
 
        LASSERT(S_ISDIR(obj->oo_inode->i_mode));
        LINVRNT(osd_invariant(obj));
 
-       rc = osd_ea_lookup_rec(env, obj, rec, key);
+       rc = obj_name2lu_name(obj, (char *)key, strlen((char *)key), &ln);
+       if (rc)
+               RETURN(rc);
+
+       /*
+        * grab version before actual lookup, so that we recognize potential
+        * insert between osd_ea_lookup_rec() and osd_olc_save()
+        */
+       iversion = inode_peek_iversion(obj->oo_inode);
+
+       if (osd_olc_lookup(env, obj, iversion, rec, &ln, &result))
+               GOTO(out, rc = result);
+
+       rc = osd_ea_lookup_rec(env, obj, rec, &ln);
        if (rc == 0)
                rc = 1;
+
+       osd_olc_save(env, obj, rec, &ln, rc, iversion);
+
+out:
+       if (ln.ln_name != (char *)key)
+               kfree(ln.ln_name);
        RETURN(rc);
 }
 
@@ -7895,6 +8004,8 @@ static void osd_key_fini(const struct lu_context *ctx,
                info->oti_ins_cache = NULL;
                info->oti_ins_cache_size = 0;
        }
+       if (info->oti_lookup_cache)
+               OBD_FREE_PTR(info->oti_lookup_cache);
        OBD_FREE_PTR(info);
 }
 
@@ -7902,7 +8013,10 @@ static void osd_key_exit(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
 {
        struct osd_thread_info *info = data;
+       struct osd_lookup_cache *olc = info->oti_lookup_cache;
 
+       if (olc)
+               memset(olc, 0, sizeof(*olc));
        LASSERT(info->oti_r_locks == 0);
        LASSERT(info->oti_w_locks == 0);
        LASSERT(info->oti_txns    == 0);
@@ -8004,6 +8118,8 @@ static void osd_umount(const struct lu_env *env, struct osd_device *o)
 {
        ENTRY;
 
+       atomic_inc(&osd_mount_seq);
+
        if (o->od_mnt != NULL) {
                shrink_dcache_sb(osd_sb(o));
                osd_sync(env, &o->od_dt_dev);
index 056740e..09ad631 100644 (file)
@@ -645,6 +645,36 @@ static inline struct osd_timespec osd_inode_time(struct inode *inode,
        return ts;
 }
 
+/*
+ * any object can be identified unique way:
+ *  - OSD (filesystem)
+ *  - inode + generation (unique object within a given filesystem)
+ *
+ * version increments each insert/delete in a directory
+ */
+struct osd_lookup_cache_object {
+       struct osd_device *lco_osd;
+       u32             lco_ino;
+       u32             lco_gen;
+       u32             lco_version;
+};
+
+struct osd_lookup_cache_entry {
+       struct osd_lookup_cache_object lce_obj;
+       struct lu_fid   lce_fid;
+       short           lce_rc;
+       short           lce_namelen;
+       char            lce_name[LDISKFS_NAME_LEN];
+};
+
+#define OSD_LOOKUP_CACHE_MAX   3
+
+struct osd_lookup_cache {
+       struct osd_lookup_cache_entry   olc_entry[OSD_LOOKUP_CACHE_MAX];
+       int                             olc_cur;
+       int                             olc_mount_seq;
+};
+
 #define OSD_INS_CACHE_SIZE     8
 
 struct osd_thread_info {
@@ -754,6 +784,9 @@ struct osd_thread_info {
 
        struct osd_it_ea_dirent *oti_seq_dirent;
        struct osd_it_ea_dirent *oti_dir_dirent;
+
+       struct osd_lookup_cache_object oti_cobj; /* cache object id */
+       struct osd_lookup_cache *oti_lookup_cache;
 };
 
 extern int ldiskfs_pdo;
index dea335c..d509f78 100755 (executable)
@@ -5932,6 +5932,56 @@ test_113 () {
 }
 run_test 113 "check servers of specified fs"
 
+test_115() {
+       (( MDS1_VERSION >= $(version_code 2.14.0.94) )) ||
+               skip "Need server version at least 2.14.0.94"
+
+       local td=$DIR/$tdir
+
+       [ "$mds1_FSTYPE" == "ldiskfs" ] || skip_env "ldiskfs only test"
+
+       mkdir_on_mdt0 $td || error "can't mkdir"
+       # turn it htree (don't really needed)
+       createmany -m $td/f 3000 || error "can't createmany"
+
+       # here is an example of debugfs output for htree command:
+       # Entry #0: Hash 0x00000000, block 27
+       # Reading directory block 27, phys 16760
+       # 938 0x0016fb58-7f3d21f5 (32) f775   834 0x001db8c8-d31a4e0e (32) f671
+       # 1085 0x0040cb70-4498abd4 (32) f922   1850 0x0066a1e6-f6f0dc69 (32) f1687
+       # 2005 0x006c1a46-ef466058 (32) f1842   2025 0x007e64d4-8b28b734 (32) f1862
+       # 642 0x008b53a0-77adc601 (32) f479   447 0x009ec152-af54eea3 (32) f284
+       # 1740 0x00c38f56-ed310e61 (32) f1577   2165 0x00cdfd66-f429a93f (32) f2002
+       # 930 0x00d7ada4-b80421c9 (32) f767   1946 0x00da6a7a-e8080600 (32) f1783
+       # 273 0x00f8ea00-760bf97c (32) f110   1589 0x0103c4ee-94fad5dd (32) f1426
+       # 1383 0x01193516-83120b48 (32) f1220   2379 0x01431e3c-e85b5bd9 (32) f2216
+       #
+       # find couple names in a same htree block of the same size
+       mdt_dev=$(facet_device $SINGLEMDS)
+       de=( $(do_facet $SINGLEMDS "debugfs -c -R 'htree /ROOT/$tdir' $mdt_dev" |
+               awk '/Reading directory block/ { getline; print $4,$8; exit; }' ))
+       local de1=${de[0]}
+       local de2=${de[1]}
+       [[ $de1 == "" || $de2 == "" ]] && error "de1=$de1 de2=$de2"
+       echo "USE: $de1 $de2"
+       # release one mkdir will lookup
+       rm $DIR/$tdir/$de2
+#define OBD_FAIL_MDS_PAUSE_CREATE_AFTER_LOOKUP 0x2401
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0x80002401 fail_val=5
+       mkdir $DIR/$tdir/$de2 &
+       sleep 0.3
+       local PID1=$!
+       # recreate $de2
+       mkdir $DIR2/$tdir/$de2
+       # release space $de1 (should be enough to save $de2)
+       rm $DIR2/$tdir/$de1
+       # ready to create a dup of $de2
+       wait $PID1
+       local found=$(ls $DIR/$tdir/|grep "^$de2\$"|wc -l)
+       (( $found == 1 )) || error "found $found"
+}
+run_test 115 "ldiskfs doesn't check direntry for uniqueness"
+
 log "cleanup: ======================================================"
 
 # kill and wait in each test only guarentee script finish, but command in script