From 29f8eb2a67ba2806d91d93de1e82e05a63f76382 Mon Sep 17 00:00:00 2001 From: Alex Zhuravlev Date: Mon, 3 Apr 2023 13:32:59 +0300 Subject: [PATCH] LU-16405 osd: lookup cache MDT may need to re-lookup just checked names (after locking). introduce a trivial tiny per-thread cache in OSD in order to make such a repeating lookup cheap. the original issue is that ext4_add_entry() doesn't really check for possible duplicate (that would be expensive as a whole 4K block must be scanned). important: the cache is reset upon request processing completion as we don't update iversion on a disk (due to conflict with VBR). Fixes: 79acb9a9e7 ("LU-10235 mdt: mdt_create: check EEXIST without lock") Signed-off-by: Alex Zhuravlev Change-Id: I40c3ee702f7895c3bda00b380f904cd587e0a1c4 Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/50521 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger Reviewed-by: Lai Siyao Reviewed-by: Oleg Drokin --- config/lustre-build-ldiskfs.m4 | 17 +++++ lustre/include/obd_support.h | 6 ++ lustre/mdt/mdt_reint.c | 13 ++++ lustre/osd-ldiskfs/osd_handler.c | 148 +++++++++++++++++++++++++++++++++----- lustre/osd-ldiskfs/osd_internal.h | 33 +++++++++ lustre/tests/sanityn.sh | 47 ++++++++++++ 6 files changed, 248 insertions(+), 16 deletions(-) diff --git a/config/lustre-build-ldiskfs.m4 b/config/lustre-build-ldiskfs.m4 index df5799d..13017be 100644 --- a/config/lustre-build-ldiskfs.m4 +++ b/config/lustre-build-ldiskfs.m4 @@ -572,6 +572,22 @@ inode_lock_shared, [ ]) ]) # LB_HAVE_INODE_LOCK_SHARED +# LB_HAVE_INODE_IVERSION +# +AC_DEFUN([LB_HAVE_INODE_IVERSION], [ +LB_CHECK_COMPILE([if iversion primitives defined], +inode_set_iversion, [ + #include +],[ + struct inode i; + + inode_set_iversion(&i, 0); +],[ + AC_DEFINE(HAVE_INODE_IVERSION, 1, + [iversion primitives defined]) +]) +]) # LB_HAVE_INODE_IVERSION + # # LB_CONFIG_LDISKFS # @@ -618,6 +634,7 @@ AS_IF([test x$enable_ldiskfs != xno],[ LB_EXT4_INC_DEC_COUNT_2ARGS LB_EXT4_JOURNAL_GET_WRITE_ACCESS_4A LB_HAVE_INODE_LOCK_SHARED + LB_HAVE_INODE_IVERSION AC_DEFINE(CONFIG_LDISKFS_FS_POSIX_ACL, 1, [posix acls for ldiskfs]) AC_DEFINE(CONFIG_LDISKFS_FS_SECURITY, 1, [fs security for ldiskfs]) AC_DEFINE(CONFIG_LDISKFS_FS_XATTR, 1, [extened attributes for ldiskfs]) diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index 5ca93d3..26124d4 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -251,6 +251,7 @@ extern char obd_jobid_var[]; #define OBD_FAIL_MDS_CHANGELOG_IDX_PUMP 0x16d #define OBD_FAIL_MDS_DELAY_DELORPHAN 0x16e #define OBD_FAIL_MDS_DIR_PAGE_WALK 0x16f +/* continue at 0x2400, see below */ /* layout lock */ #define OBD_FAIL_MDS_NO_LL_GETATTR 0x170 @@ -755,6 +756,11 @@ extern char obd_jobid_var[]; #define OBD_FAIL_OSD_FAIL_AT_TRUNCATE 0x2301 +/* continuation of MDS related constants */ +#define OBD_FAIL_MDS_PAUSE_CREATE_AFTER_LOOKUP 0x2401 + +/* PLEASE, KEEP NUMBERS UP TO 0x3000 RESERVED FOR OBD_FAIL_MDS_* */ + /* LNet is allocated failure locations 0xe000 to 0xffff */ /* Assign references to moved code to reduce code changes */ #define OBD_FAIL_PRECHECK(id) (unlikely(CFS_FAIL_PRECHECK(id))) diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 01f87f1..97c9e83 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -622,6 +622,8 @@ static int mdt_create(struct mdt_thread_info *info) if (rc != -ENOENT) GOTO(put_parent, rc); + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PAUSE_CREATE_AFTER_LOOKUP, cfs_fail_val); + /* save version of file name for replay, it must be ENOENT here */ mdt_enoent_version_save(info, 1); @@ -638,6 +640,17 @@ static int mdt_create(struct mdt_thread_info *info) GOTO(unlock_parent, rc); } + /* + * now repeat the lookup having a LDLM lock on the parent dir, + * as another thread could create the same name. notice this + * lookup is supposed to hit cache in OSD and be cheap if the + * directory is not being modified concurrently. + */ + rc = mdo_lookup(info->mti_env, mdt_object_child(parent), &rr->rr_name, + &info->mti_tmp_fid1, &info->mti_spec); + if (unlikely(rc == 0)) + GOTO(unlock_parent, rc = -EEXIST); + child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2); if (unlikely(IS_ERR(child))) GOTO(unlock_parent, rc = PTR_ERR(child)); diff --git a/lustre/osd-ldiskfs/osd_handler.c b/lustre/osd-ldiskfs/osd_handler.c index 782d176..84eeed7 100644 --- a/lustre/osd-ldiskfs/osd_handler.c +++ b/lustre/osd-ldiskfs/osd_handler.c @@ -42,6 +42,11 @@ #include #include #include +#ifdef HAVE_INODE_IVERSION +#include +#else +#define inode_peek_iversion(__inode) ((__inode)->i_version) +#endif /* prerequisite for linux/xattr.h */ #include @@ -116,6 +121,8 @@ static struct lu_kmem_descr ldiskfs_caches[] = { } }; +static atomic_t osd_mount_seq; + static const char dot[] = "."; static const char dotdot[] = ".."; @@ -6337,7 +6344,7 @@ again: * \retval -ve, on error */ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj, - struct dt_rec *rec, const struct dt_key *key) + struct dt_rec *rec, const struct lu_name *ln) { struct inode *dir = obj->oo_inode; struct dentry *dentry; @@ -6345,7 +6352,6 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj, struct buffer_head *bh; struct lu_fid *fid = (struct lu_fid *)rec; struct htree_lock *hlock = NULL; - struct lu_name ln; int ino; int rc; @@ -6354,11 +6360,7 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj, LASSERT(dir->i_op != NULL); LASSERT(dir->i_op->lookup != NULL); - rc = obj_name2lu_name(obj, (char *)key, strlen((char *)key), &ln); - if (rc) - RETURN(rc); - - dentry = osd_child_dentry_get(env, obj, ln.ln_name, ln.ln_namelen); + dentry = osd_child_dentry_get(env, obj, ln->ln_name, ln->ln_namelen); if (obj->oo_hl_head != NULL) { hlock = osd_oti_get(env)->oti_hlock; @@ -6387,15 +6389,13 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj, brelse(bh); if (rc != 0) { if (unlikely(is_remote_parent_ino(dev, ino))) { - const char *name = (const char *)key; - /* * If the parent is on remote MDT, and there * is no FID-in-dirent, then we have to get * the parent FID from the linkEA. */ - if (likely(strlen(name) == 2 && - name[0] == '.' && name[1] == '.')) + if (likely(ln->ln_namelen == 2 && + ln->ln_name[0] == '.' && ln->ln_name[1] == '.')) rc = osd_get_pfid_from_linkea(env, obj, fid); } else { @@ -6424,7 +6424,7 @@ static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj, osd_obj2dev(obj), id, fid); } CDEBUG(D_INODE, DFID"/"DNAME" => "DFID"\n", - PFID(lu_object_fid(&obj->oo_dt.do_lu)), PNAME(&ln), + PFID(lu_object_fid(&obj->oo_dt.do_lu)), PNAME(ln), PFID(fid)); } else { rc = PTR_ERR(bh); @@ -6437,8 +6437,6 @@ out: ldiskfs_htree_unlock(hlock); else up_read(&obj->oo_ext_idx_sem); - if (ln.ln_name != (char *)key) - kfree(ln.ln_name); RETURN(rc); } @@ -7852,6 +7850,96 @@ static int osd_it_ea_load(const struct lu_env *env, RETURN(rc); } +int osd_olc_lookup(const struct lu_env *env, struct osd_object *obj, + u64 iversion, struct dt_rec *rec, + const struct lu_name *ln, int *result) +{ + struct osd_thread_info *oti = osd_oti_get(env); + struct osd_lookup_cache *olc = oti->oti_lookup_cache; + struct osd_device *osd = osd_obj2dev(obj); + struct osd_lookup_cache_object *cobj = &oti->oti_cobj; + int i; + + if (unlikely(olc == NULL)) + return 0; + + if (unlikely(atomic_read(&osd_mount_seq) != olc->olc_mount_seq)) { + /* + * umount has happened, a new OSD could land to the previous + * address so we can't use it any more, invalidate our cache + */ + memset(olc, 0, sizeof(*olc)); + olc->olc_mount_seq = atomic_read(&osd_mount_seq); + return 0; + } + + memset(cobj, 0, sizeof(*cobj)); + cobj->lco_osd = osd; + cobj->lco_ino = obj->oo_inode->i_ino; + cobj->lco_gen = obj->oo_inode->i_generation; + cobj->lco_version = iversion; + + for (i = 0; i < OSD_LOOKUP_CACHE_MAX; i++) { + struct osd_lookup_cache_entry *entry; + + entry = &olc->olc_entry[i]; + /* compare if osd/ino/generation/version match */ + if (memcmp(&entry->lce_obj, cobj, sizeof(*cobj)) != 0) + continue; + if (entry->lce_namelen != ln->ln_namelen) + continue; + if (memcmp(entry->lce_name, ln->ln_name, ln->ln_namelen) != 0) + continue; + /* match */ + memcpy(rec, &entry->lce_fid, sizeof(entry->lce_fid)); + *result = entry->lce_rc; + return 1; + } + return 0; +} + +void osd_olc_save(const struct lu_env *env, struct osd_object *obj, + struct dt_rec *rec, const struct lu_name *ln, + const int result, u64 iversion) +{ + struct osd_thread_info *oti = osd_oti_get(env); + struct osd_lookup_cache_entry *entry; + struct osd_lookup_cache *olc; + + if (unlikely(oti->oti_lookup_cache == NULL)) { + OBD_ALLOC_PTR(oti->oti_lookup_cache); + if (oti->oti_lookup_cache == NULL) + return; + } + + olc = oti->oti_lookup_cache; + if (unlikely(atomic_read(&osd_mount_seq) != olc->olc_mount_seq)) { + memset(olc, 0, sizeof(*olc)); + olc->olc_mount_seq = atomic_read(&osd_mount_seq); + } + + entry = &olc->olc_entry[olc->olc_cur]; + + /* invaliate cache slot if needed */ + if (entry->lce_obj.lco_osd) + memset(&entry->lce_obj, 0, sizeof(entry->lce_obj)); + + /* XXX: some kind of LRU */ + entry->lce_obj.lco_osd = osd_obj2dev(obj); + entry->lce_obj.lco_ino = obj->oo_inode->i_ino; + entry->lce_obj.lco_gen = obj->oo_inode->i_generation; + entry->lce_obj.lco_version = iversion; + + LASSERT(ln->ln_namelen <= LDISKFS_NAME_LEN); + entry->lce_namelen = ln->ln_namelen; + memcpy(entry->lce_name, ln->ln_name, ln->ln_namelen); + memcpy(&entry->lce_fid, rec, sizeof(entry->lce_fid)); + entry->lce_rc = result; + + if (++olc->olc_cur == OSD_LOOKUP_CACHE_MAX) + olc->olc_cur = 0; +} + /** * Index lookup function for interoperability mode (b11826). * @@ -7864,16 +7952,37 @@ static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt, struct dt_rec *rec, const struct dt_key *key) { struct osd_object *obj = osd_dt_obj(dt); - int rc = 0; + struct lu_name ln; + int rc, result; + u64 iversion; ENTRY; LASSERT(S_ISDIR(obj->oo_inode->i_mode)); LINVRNT(osd_invariant(obj)); - rc = osd_ea_lookup_rec(env, obj, rec, key); + rc = obj_name2lu_name(obj, (char *)key, strlen((char *)key), &ln); + if (rc) + RETURN(rc); + + /* + * grab version before actual lookup, so that we recognize potential + * insert between osd_ea_lookup_rec() and osd_olc_save() + */ + iversion = inode_peek_iversion(obj->oo_inode); + + if (osd_olc_lookup(env, obj, iversion, rec, &ln, &result)) + GOTO(out, rc = result); + + rc = osd_ea_lookup_rec(env, obj, rec, &ln); if (rc == 0) rc = 1; + + osd_olc_save(env, obj, rec, &ln, rc, iversion); + +out: + if (ln.ln_name != (char *)key) + kfree(ln.ln_name); RETURN(rc); } @@ -7968,6 +8077,8 @@ static void osd_key_fini(const struct lu_context *ctx, info->oti_ins_cache = NULL; info->oti_ins_cache_size = 0; } + if (info->oti_lookup_cache) + OBD_FREE_PTR(info->oti_lookup_cache); OBD_FREE_PTR(info); } @@ -7975,7 +8086,10 @@ static void osd_key_exit(const struct lu_context *ctx, struct lu_context_key *key, void *data) { struct osd_thread_info *info = data; + struct osd_lookup_cache *olc = info->oti_lookup_cache; + if (olc) + memset(olc, 0, sizeof(*olc)); LASSERT(info->oti_r_locks == 0); LASSERT(info->oti_w_locks == 0); LASSERT(info->oti_txns == 0); @@ -8077,6 +8191,8 @@ static void osd_umount(const struct lu_env *env, struct osd_device *o) { ENTRY; + atomic_inc(&osd_mount_seq); + if (o->od_mnt != NULL) { shrink_dcache_sb(osd_sb(o)); osd_sync(env, &o->od_dt_dev); diff --git a/lustre/osd-ldiskfs/osd_internal.h b/lustre/osd-ldiskfs/osd_internal.h index 308fed2..1814072 100644 --- a/lustre/osd-ldiskfs/osd_internal.h +++ b/lustre/osd-ldiskfs/osd_internal.h @@ -628,6 +628,36 @@ static inline struct osd_timespec osd_inode_time(struct inode *inode, return ts; } +/* + * any object can be identified unique way: + * - OSD (filesystem) + * - inode + generation (unique object within a given filesystem) + * + * version increments each insert/delete in a directory + */ +struct osd_lookup_cache_object { + struct osd_device *lco_osd; + u32 lco_ino; + u32 lco_gen; + u32 lco_version; +}; + +struct osd_lookup_cache_entry { + struct osd_lookup_cache_object lce_obj; + struct lu_fid lce_fid; + short lce_rc; + short lce_namelen; + char lce_name[LDISKFS_NAME_LEN]; +}; + +#define OSD_LOOKUP_CACHE_MAX 3 + +struct osd_lookup_cache { + struct osd_lookup_cache_entry olc_entry[OSD_LOOKUP_CACHE_MAX]; + int olc_cur; + int olc_mount_seq; +}; + #define OSD_INS_CACHE_SIZE 8 struct osd_thread_info { @@ -737,6 +767,9 @@ struct osd_thread_info { struct osd_it_ea_dirent *oti_seq_dirent; struct osd_it_ea_dirent *oti_dir_dirent; + + struct osd_lookup_cache_object oti_cobj; /* cache object id */ + struct osd_lookup_cache *oti_lookup_cache; }; extern int ldiskfs_pdo; diff --git a/lustre/tests/sanityn.sh b/lustre/tests/sanityn.sh index 562459d..0dc849f 100755 --- a/lustre/tests/sanityn.sh +++ b/lustre/tests/sanityn.sh @@ -6277,6 +6277,53 @@ test_114() { } run_test 114 "implicit default LMV inherit" +test_115() { + local td=$DIR/$tdir + + [ "$mds1_FSTYPE" == "ldiskfs" ] || skip_env "ldiskfs only test" + + mkdir_on_mdt0 $td || error "can't mkdir" + # turn it htree (don't really needed) + createmany -m $td/f 3000 || error "can't createmany" + + # here is an example of debugfs output for htree command: + # Entry #0: Hash 0x00000000, block 27 + # Reading directory block 27, phys 16760 + # 938 0x0016fb58-7f3d21f5 (32) f775 834 0x001db8c8-d31a4e0e (32) f671 + # 1085 0x0040cb70-4498abd4 (32) f922 1850 0x0066a1e6-f6f0dc69 (32) f1687 + # 2005 0x006c1a46-ef466058 (32) f1842 2025 0x007e64d4-8b28b734 (32) f1862 + # 642 0x008b53a0-77adc601 (32) f479 447 0x009ec152-af54eea3 (32) f284 + # 1740 0x00c38f56-ed310e61 (32) f1577 2165 0x00cdfd66-f429a93f (32) f2002 + # 930 0x00d7ada4-b80421c9 (32) f767 1946 0x00da6a7a-e8080600 (32) f1783 + # 273 0x00f8ea00-760bf97c (32) f110 1589 0x0103c4ee-94fad5dd (32) f1426 + # 1383 0x01193516-83120b48 (32) f1220 2379 0x01431e3c-e85b5bd9 (32) f2216 + # + # find couple names in a same htree block of the same size + mdt_dev=$(facet_device $SINGLEMDS) + de=( $(do_facet $SINGLEMDS "debugfs -c -R 'htree /ROOT/$tdir' $mdt_dev" | + awk '/Reading directory block/ { getline; print $4,$8; exit; }' )) + local de1=${de[0]} + local de2=${de[1]} + [[ $de1 == "" || $de2 == "" ]] && error "de1=$de1 de2=$de2" + echo "USE: $de1 $de2" + # release one mkdir will lookup + rm $DIR/$tdir/$de2 +#define OBD_FAIL_MDS_PAUSE_CREATE_AFTER_LOOKUP 0x2401 + do_facet $SINGLEMDS $LCTL set_param fail_loc=0x80002401 fail_val=5 + mkdir $DIR/$tdir/$de2 & + sleep 0.3 + local PID1=$! + # recreate $de2 + mkdir $DIR2/$tdir/$de2 + # release space $de1 (should be enough to save $de2) + rm $DIR2/$tdir/$de1 + # ready to create a dup of $de2 + wait $PID1 + local found=$(ls $DIR/$tdir/|grep "^$de2\$"|wc -l) + (( $found == 1 )) || error "found $found" +} +run_test 115 "ldiskfs doesn't check direntry for uniqueness" + log "cleanup: ======================================================" # kill and wait in each test only guarentee script finish, but command in script -- 1.8.3.1