Whamcloud - gitweb
LU-16405 osd: lookup cache 21/50521/36
authorAlex Zhuravlev <bzzz@whamcloud.com>
Mon, 3 Apr 2023 10:32:59 +0000 (13:32 +0300)
committerOleg Drokin <green@whamcloud.com>
Thu, 27 Jul 2023 07:17:54 +0000 (07:17 +0000)
MDT may need to re-lookup just checked names (after locking).
introduce a trivial tiny per-thread cache in OSD in order to
make such a repeating lookup cheap.

the original issue is that ext4_add_entry() doesn't really
check for possible duplicate (that would be expensive as
a whole 4K block must be scanned).

important: the cache is reset upon request processing completion as
we don't update iversion on a disk (due to conflict with VBR).

Fixes: 79acb9a9e7 ("LU-10235 mdt: mdt_create: check EEXIST without lock")
Signed-off-by: Alex Zhuravlev <bzzz@whamcloud.com>
Change-Id: I40c3ee702f7895c3bda00b380f904cd587e0a1c4
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/50521
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Lai Siyao <lai.siyao@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
config/lustre-build-ldiskfs.m4
lustre/include/obd_support.h
lustre/mdt/mdt_reint.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_internal.h
lustre/tests/sanityn.sh

index df5799d..13017be 100644 (file)
@@ -572,6 +572,22 @@ inode_lock_shared, [
 ])
 ]) # LB_HAVE_INODE_LOCK_SHARED
 
+# LB_HAVE_INODE_IVERSION
+#
+AC_DEFUN([LB_HAVE_INODE_IVERSION], [
+LB_CHECK_COMPILE([if iversion primitives defined],
+inode_set_iversion, [
+       #include <linux/iversion.h>
+],[
+       struct inode i;
+
+       inode_set_iversion(&i, 0);
+],[
+       AC_DEFINE(HAVE_INODE_IVERSION, 1,
+               [iversion primitives defined])
+])
+]) # LB_HAVE_INODE_IVERSION
+
 #
 # LB_CONFIG_LDISKFS
 #
@@ -618,6 +634,7 @@ AS_IF([test x$enable_ldiskfs != xno],[
        LB_EXT4_INC_DEC_COUNT_2ARGS
        LB_EXT4_JOURNAL_GET_WRITE_ACCESS_4A
        LB_HAVE_INODE_LOCK_SHARED
+       LB_HAVE_INODE_IVERSION
        AC_DEFINE(CONFIG_LDISKFS_FS_POSIX_ACL, 1, [posix acls for ldiskfs])
        AC_DEFINE(CONFIG_LDISKFS_FS_SECURITY, 1, [fs security for ldiskfs])
        AC_DEFINE(CONFIG_LDISKFS_FS_XATTR, 1, [extened attributes for ldiskfs])
index 5ca93d3..26124d4 100644 (file)
@@ -251,6 +251,7 @@ extern char obd_jobid_var[];
 #define OBD_FAIL_MDS_CHANGELOG_IDX_PUMP                0x16d
 #define OBD_FAIL_MDS_DELAY_DELORPHAN           0x16e
 #define OBD_FAIL_MDS_DIR_PAGE_WALK             0x16f
+/* continue at 0x2400, see below */
 
 /* layout lock */
 #define OBD_FAIL_MDS_NO_LL_GETATTR      0x170
@@ -755,6 +756,11 @@ extern char obd_jobid_var[];
 
 #define OBD_FAIL_OSD_FAIL_AT_TRUNCATE          0x2301
 
+/* continuation of MDS related constants */
+#define OBD_FAIL_MDS_PAUSE_CREATE_AFTER_LOOKUP 0x2401
+
+/* PLEASE, KEEP NUMBERS UP TO 0x3000 RESERVED FOR OBD_FAIL_MDS_* */
+
 /* LNet is allocated failure locations 0xe000 to 0xffff */
 /* Assign references to moved code to reduce code changes */
 #define OBD_FAIL_PRECHECK(id)                   (unlikely(CFS_FAIL_PRECHECK(id)))
index 01f87f1..97c9e83 100644 (file)
@@ -622,6 +622,8 @@ static int mdt_create(struct mdt_thread_info *info)
        if (rc != -ENOENT)
                GOTO(put_parent, rc);
 
+       OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PAUSE_CREATE_AFTER_LOOKUP, cfs_fail_val);
+
        /* save version of file name for replay, it must be ENOENT here */
        mdt_enoent_version_save(info, 1);
 
@@ -638,6 +640,17 @@ static int mdt_create(struct mdt_thread_info *info)
                        GOTO(unlock_parent, rc);
        }
 
+       /*
+        * now repeat the lookup having a LDLM lock on the parent dir,
+        * as another thread could create the same name. notice this
+        * lookup is supposed to hit cache in OSD and be cheap if the
+        * directory is not being modified concurrently.
+        */
+       rc = mdo_lookup(info->mti_env, mdt_object_child(parent), &rr->rr_name,
+                       &info->mti_tmp_fid1, &info->mti_spec);
+       if (unlikely(rc == 0))
+               GOTO(unlock_parent, rc = -EEXIST);
+
        child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
        if (unlikely(IS_ERR(child)))
                GOTO(unlock_parent, rc = PTR_ERR(child));
index 782d176..84eeed7 100644 (file)
 #include <linux/module.h>
 #include <linux/user_namespace.h>
 #include <linux/uidgid.h>
+#ifdef HAVE_INODE_IVERSION
+#include <linux/iversion.h>
+#else
+#define inode_peek_iversion(__inode)   ((__inode)->i_version)
+#endif
 
 /* prerequisite for linux/xattr.h */
 #include <linux/types.h>
@@ -116,6 +121,8 @@ static struct lu_kmem_descr ldiskfs_caches[] = {
        }
 };
 
+static atomic_t osd_mount_seq;
+
 static const char dot[] = ".";
 static const char dotdot[] = "..";
 
@@ -6337,7 +6344,7 @@ again:
  * \retval -ve, on error
  */
 static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
-                            struct dt_rec *rec, const struct dt_key *key)
+                            struct dt_rec *rec, const struct lu_name *ln)
 {
        struct inode *dir = obj->oo_inode;
        struct dentry *dentry;
@@ -6345,7 +6352,6 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
        struct buffer_head *bh;
        struct lu_fid *fid = (struct lu_fid *)rec;
        struct htree_lock *hlock = NULL;
-       struct lu_name ln;
        int ino;
        int rc;
 
@@ -6354,11 +6360,7 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
        LASSERT(dir->i_op != NULL);
        LASSERT(dir->i_op->lookup != NULL);
 
-       rc = obj_name2lu_name(obj, (char *)key, strlen((char *)key), &ln);
-       if (rc)
-               RETURN(rc);
-
-       dentry = osd_child_dentry_get(env, obj, ln.ln_name, ln.ln_namelen);
+       dentry = osd_child_dentry_get(env, obj, ln->ln_name, ln->ln_namelen);
 
        if (obj->oo_hl_head != NULL) {
                hlock = osd_oti_get(env)->oti_hlock;
@@ -6387,15 +6389,13 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
                brelse(bh);
                if (rc != 0) {
                        if (unlikely(is_remote_parent_ino(dev, ino))) {
-                               const char *name = (const char *)key;
-
                                /*
                                 * If the parent is on remote MDT, and there
                                 * is no FID-in-dirent, then we have to get
                                 * the parent FID from the linkEA.
                                 */
-                               if (likely(strlen(name) == 2 &&
-                                          name[0] == '.' && name[1] == '.'))
+                               if (likely(ln->ln_namelen == 2 &&
+                                          ln->ln_name[0] == '.' && ln->ln_name[1] == '.'))
                                        rc = osd_get_pfid_from_linkea(env, obj,
                                                                      fid);
                        } else {
@@ -6424,7 +6424,7 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
                                                 osd_obj2dev(obj), id, fid);
                }
                CDEBUG(D_INODE, DFID"/"DNAME" => "DFID"\n",
-                      PFID(lu_object_fid(&obj->oo_dt.do_lu)), PNAME(&ln),
+                      PFID(lu_object_fid(&obj->oo_dt.do_lu)), PNAME(ln),
                       PFID(fid));
        } else {
                rc = PTR_ERR(bh);
@@ -6437,8 +6437,6 @@ out:
                ldiskfs_htree_unlock(hlock);
        else
                up_read(&obj->oo_ext_idx_sem);
-       if (ln.ln_name != (char *)key)
-               kfree(ln.ln_name);
        RETURN(rc);
 }
 
@@ -7852,6 +7850,96 @@ static int osd_it_ea_load(const struct lu_env *env,
        RETURN(rc);
 }
 
+int osd_olc_lookup(const struct lu_env *env, struct osd_object *obj,
+                         u64 iversion, struct dt_rec *rec,
+                         const struct lu_name *ln, int *result)
+{
+       struct osd_thread_info *oti = osd_oti_get(env);
+       struct osd_lookup_cache *olc = oti->oti_lookup_cache;
+       struct osd_device *osd = osd_obj2dev(obj);
+       struct osd_lookup_cache_object *cobj = &oti->oti_cobj;
+       int i;
+
+       if (unlikely(olc == NULL))
+               return 0;
+
+       if (unlikely(atomic_read(&osd_mount_seq) != olc->olc_mount_seq)) {
+               /*
+                * umount has happened, a new OSD could land to the previous
+                * address so we can't use it any more, invalidate our cache
+                */
+               memset(olc, 0, sizeof(*olc));
+               olc->olc_mount_seq = atomic_read(&osd_mount_seq);
+               return 0;
+       }
+
+       memset(cobj, 0, sizeof(*cobj));
+       cobj->lco_osd = osd;
+       cobj->lco_ino = obj->oo_inode->i_ino;
+       cobj->lco_gen = obj->oo_inode->i_generation;
+       cobj->lco_version = iversion;
+
+       for (i = 0; i < OSD_LOOKUP_CACHE_MAX; i++) {
+               struct osd_lookup_cache_entry *entry;
+
+               entry = &olc->olc_entry[i];
+               /* compare if osd/ino/generation/version match */
+               if (memcmp(&entry->lce_obj, cobj, sizeof(*cobj)) != 0)
+                       continue;
+               if (entry->lce_namelen != ln->ln_namelen)
+                       continue;
+               if (memcmp(entry->lce_name, ln->ln_name, ln->ln_namelen) != 0)
+                       continue;
+               /* match */
+               memcpy(rec, &entry->lce_fid, sizeof(entry->lce_fid));
+               *result = entry->lce_rc;
+               return 1;
+       }
+       return 0;
+}
+
+void osd_olc_save(const struct lu_env *env, struct osd_object *obj,
+                         struct dt_rec *rec, const struct lu_name *ln,
+                         const int result, u64 iversion)
+{
+       struct osd_thread_info *oti = osd_oti_get(env);
+       struct osd_lookup_cache_entry *entry;
+       struct osd_lookup_cache *olc;
+
+       if (unlikely(oti->oti_lookup_cache == NULL)) {
+               OBD_ALLOC_PTR(oti->oti_lookup_cache);
+               if (oti->oti_lookup_cache == NULL)
+                       return;
+       }
+
+       olc = oti->oti_lookup_cache;
+       if (unlikely(atomic_read(&osd_mount_seq) != olc->olc_mount_seq)) {
+               memset(olc, 0, sizeof(*olc));
+               olc->olc_mount_seq = atomic_read(&osd_mount_seq);
+       }
+
+       entry = &olc->olc_entry[olc->olc_cur];
+
+       /* invaliate cache slot if needed */
+       if (entry->lce_obj.lco_osd)
+               memset(&entry->lce_obj, 0, sizeof(entry->lce_obj));
+
+       /* XXX: some kind of LRU */
+       entry->lce_obj.lco_osd = osd_obj2dev(obj);
+       entry->lce_obj.lco_ino = obj->oo_inode->i_ino;
+       entry->lce_obj.lco_gen = obj->oo_inode->i_generation;
+       entry->lce_obj.lco_version = iversion;
+
+       LASSERT(ln->ln_namelen <= LDISKFS_NAME_LEN);
+       entry->lce_namelen = ln->ln_namelen;
+       memcpy(entry->lce_name, ln->ln_name, ln->ln_namelen);
+       memcpy(&entry->lce_fid, rec, sizeof(entry->lce_fid));
+       entry->lce_rc = result;
+
+       if (++olc->olc_cur == OSD_LOOKUP_CACHE_MAX)
+               olc->olc_cur = 0;
+}
+
 /**
  * Index lookup function for interoperability mode (b11826).
  *
@@ -7864,16 +7952,37 @@ static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt,
                               struct dt_rec *rec, const struct dt_key *key)
 {
        struct osd_object *obj = osd_dt_obj(dt);
-       int rc = 0;
+       struct lu_name ln;
+       int rc, result;
+       u64 iversion;
 
        ENTRY;
 
        LASSERT(S_ISDIR(obj->oo_inode->i_mode));
        LINVRNT(osd_invariant(obj));
 
-       rc = osd_ea_lookup_rec(env, obj, rec, key);
+       rc = obj_name2lu_name(obj, (char *)key, strlen((char *)key), &ln);
+       if (rc)
+               RETURN(rc);
+
+       /*
+        * grab version before actual lookup, so that we recognize potential
+        * insert between osd_ea_lookup_rec() and osd_olc_save()
+        */
+       iversion = inode_peek_iversion(obj->oo_inode);
+
+       if (osd_olc_lookup(env, obj, iversion, rec, &ln, &result))
+               GOTO(out, rc = result);
+
+       rc = osd_ea_lookup_rec(env, obj, rec, &ln);
        if (rc == 0)
                rc = 1;
+
+       osd_olc_save(env, obj, rec, &ln, rc, iversion);
+
+out:
+       if (ln.ln_name != (char *)key)
+               kfree(ln.ln_name);
        RETURN(rc);
 }
 
@@ -7968,6 +8077,8 @@ static void osd_key_fini(const struct lu_context *ctx,
                info->oti_ins_cache = NULL;
                info->oti_ins_cache_size = 0;
        }
+       if (info->oti_lookup_cache)
+               OBD_FREE_PTR(info->oti_lookup_cache);
        OBD_FREE_PTR(info);
 }
 
@@ -7975,7 +8086,10 @@ static void osd_key_exit(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
 {
        struct osd_thread_info *info = data;
+       struct osd_lookup_cache *olc = info->oti_lookup_cache;
 
+       if (olc)
+               memset(olc, 0, sizeof(*olc));
        LASSERT(info->oti_r_locks == 0);
        LASSERT(info->oti_w_locks == 0);
        LASSERT(info->oti_txns    == 0);
@@ -8077,6 +8191,8 @@ static void osd_umount(const struct lu_env *env, struct osd_device *o)
 {
        ENTRY;
 
+       atomic_inc(&osd_mount_seq);
+
        if (o->od_mnt != NULL) {
                shrink_dcache_sb(osd_sb(o));
                osd_sync(env, &o->od_dt_dev);
index 308fed2..1814072 100644 (file)
@@ -628,6 +628,36 @@ static inline struct osd_timespec osd_inode_time(struct inode *inode,
        return ts;
 }
 
+/*
+ * any object can be identified unique way:
+ *  - OSD (filesystem)
+ *  - inode + generation (unique object within a given filesystem)
+ *
+ * version increments each insert/delete in a directory
+ */
+struct osd_lookup_cache_object {
+       struct osd_device *lco_osd;
+       u32             lco_ino;
+       u32             lco_gen;
+       u32             lco_version;
+};
+
+struct osd_lookup_cache_entry {
+       struct osd_lookup_cache_object lce_obj;
+       struct lu_fid   lce_fid;
+       short           lce_rc;
+       short           lce_namelen;
+       char            lce_name[LDISKFS_NAME_LEN];
+};
+
+#define OSD_LOOKUP_CACHE_MAX   3
+
+struct osd_lookup_cache {
+       struct osd_lookup_cache_entry   olc_entry[OSD_LOOKUP_CACHE_MAX];
+       int                             olc_cur;
+       int                             olc_mount_seq;
+};
+
 #define OSD_INS_CACHE_SIZE     8
 
 struct osd_thread_info {
@@ -737,6 +767,9 @@ struct osd_thread_info {
 
        struct osd_it_ea_dirent *oti_seq_dirent;
        struct osd_it_ea_dirent *oti_dir_dirent;
+
+       struct osd_lookup_cache_object oti_cobj; /* cache object id */
+       struct osd_lookup_cache *oti_lookup_cache;
 };
 
 extern int ldiskfs_pdo;
index 562459d..0dc849f 100755 (executable)
@@ -6277,6 +6277,53 @@ test_114() {
 }
 run_test 114 "implicit default LMV inherit"
 
+test_115() {
+       local td=$DIR/$tdir
+
+       [ "$mds1_FSTYPE" == "ldiskfs" ] || skip_env "ldiskfs only test"
+
+       mkdir_on_mdt0 $td || error "can't mkdir"
+       # turn it htree (don't really needed)
+       createmany -m $td/f 3000 || error "can't createmany"
+
+       # here is an example of debugfs output for htree command:
+       # Entry #0: Hash 0x00000000, block 27
+       # Reading directory block 27, phys 16760
+       # 938 0x0016fb58-7f3d21f5 (32) f775   834 0x001db8c8-d31a4e0e (32) f671
+       # 1085 0x0040cb70-4498abd4 (32) f922   1850 0x0066a1e6-f6f0dc69 (32) f1687
+       # 2005 0x006c1a46-ef466058 (32) f1842   2025 0x007e64d4-8b28b734 (32) f1862
+       # 642 0x008b53a0-77adc601 (32) f479   447 0x009ec152-af54eea3 (32) f284
+       # 1740 0x00c38f56-ed310e61 (32) f1577   2165 0x00cdfd66-f429a93f (32) f2002
+       # 930 0x00d7ada4-b80421c9 (32) f767   1946 0x00da6a7a-e8080600 (32) f1783
+       # 273 0x00f8ea00-760bf97c (32) f110   1589 0x0103c4ee-94fad5dd (32) f1426
+       # 1383 0x01193516-83120b48 (32) f1220   2379 0x01431e3c-e85b5bd9 (32) f2216
+       #
+       # find couple names in a same htree block of the same size
+       mdt_dev=$(facet_device $SINGLEMDS)
+       de=( $(do_facet $SINGLEMDS "debugfs -c -R 'htree /ROOT/$tdir' $mdt_dev" |
+               awk '/Reading directory block/ { getline; print $4,$8; exit; }' ))
+       local de1=${de[0]}
+       local de2=${de[1]}
+       [[ $de1 == "" || $de2 == "" ]] && error "de1=$de1 de2=$de2"
+       echo "USE: $de1 $de2"
+       # release one mkdir will lookup
+       rm $DIR/$tdir/$de2
+#define OBD_FAIL_MDS_PAUSE_CREATE_AFTER_LOOKUP 0x2401
+       do_facet $SINGLEMDS $LCTL set_param fail_loc=0x80002401 fail_val=5
+       mkdir $DIR/$tdir/$de2 &
+       sleep 0.3
+       local PID1=$!
+       # recreate $de2
+       mkdir $DIR2/$tdir/$de2
+       # release space $de1 (should be enough to save $de2)
+       rm $DIR2/$tdir/$de1
+       # ready to create a dup of $de2
+       wait $PID1
+       local found=$(ls $DIR/$tdir/|grep "^$de2\$"|wc -l)
+       (( $found == 1 )) || error "found $found"
+}
+run_test 115 "ldiskfs doesn't check direntry for uniqueness"
+
 log "cleanup: ======================================================"
 
 # kill and wait in each test only guarentee script finish, but command in script