From a4c325bbad4edd9b610be92f8b5ed2b14a818ed0 Mon Sep 17 00:00:00 2001
From: Alex Zhuravlev
Date: Fri, 15 Jul 2016 17:05:29 +0400
Subject: [PATCH] LU-7910 osd: do not lookup child objects in osd_dir_insert()

instead cache FID->dnode mapping in @env at declarations.

Change-Id: I2c2ab17cd6e158e9462715f12c21da2c2b8402db
Signed-off-by: Alex Zhuravlev
Reviewed-on: https://review.whamcloud.com/21333
Reviewed-by: Andreas Dilger
Tested-by: Jenkins
Reviewed-by: Fan Yong
Tested-by: Maloo
Reviewed-by: Oleg Drokin
---
Review notes (text in this section is ignored by git-am):
 - osd_declare_dir_insert() returns the osd_idc_find_or_init() error
   instead of dropping it.
 - osd_idc_find_or_init() resolves the FID before adding it to the cache.
 - osd_idc_add() frees the old array with its allocated size.
 - line breaks and indentation were rebuilt from a whitespace-collapsed
   copy; check context lines against the tree before applying.
 - "index" blob ids are the ones from the original patch.

 lustre/osd-zfs/osd_handler.c  |  12 +++-
 lustre/osd-zfs/osd_index.c    | 146 +++++++++++++-----------------------------
 lustre/osd-zfs/osd_internal.h |  28 +++++++++
 lustre/osd-zfs/osd_object.c   |  16 +++++
 lustre/osd-zfs/osd_oi.c       | 150 +++++++++++++++++++++++++++++++++++++++++
 5 files changed, 250 insertions(+), 102 deletions(-)

diff --git a/lustre/osd-zfs/osd_handler.c b/lustre/osd-zfs/osd_handler.c
index 8755499..0b079ad 100644
--- a/lustre/osd-zfs/osd_handler.c
+++ b/lustre/osd-zfs/osd_handler.c
@@ -280,6 +280,8 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
 	oh = container_of0(th, struct osd_thandle, ot_super);
 	INIT_LIST_HEAD(&unlinked);
 	list_splice_init(&oh->ot_unlinked_list, &unlinked);
+	/* reset OI cache for safety */
+	osd_oti_get(env)->oti_ins_cache_used = 0;
 
 	if (oh->ot_assigned == 0) {
 		LASSERT(oh->ot_tx);
@@ -697,16 +699,20 @@ static void osd_key_fini(const struct lu_context *ctx,
 			 struct lu_context_key *key, void *data)
 {
 	struct osd_thread_info *info = data;
+	struct osd_idmap_cache *idc = info->oti_ins_cache;
 
+	if (idc != NULL) {
+		LASSERT(info->oti_ins_cache_size > 0);
+		OBD_FREE(idc, sizeof(*idc) * info->oti_ins_cache_size);
+		info->oti_ins_cache = NULL;
+		info->oti_ins_cache_size = 0;
+	}
 	OBD_FREE_PTR(info);
 }
 
 static void osd_key_exit(const struct lu_context *ctx,
 			 struct lu_context_key *key, void *data)
 {
-	struct osd_thread_info *info = data;
-
-	memset(info, 0, sizeof(*info));
 }
 
 struct lu_context_key osd_key = {
diff --git a/lustre/osd-zfs/osd_index.c b/lustre/osd-zfs/osd_index.c
index f0b588c..ee97cb3 100644
--- a/lustre/osd-zfs/osd_index.c
+++ b/lustre/osd-zfs/osd_index.c
@@ -455,11 +455,20 @@ static int osd_declare_dir_insert(const struct lu_env *env,
 				  const struct dt_key *key,
 				  struct thandle *th)
 {
-	struct osd_object  *obj = osd_dt_obj(dt);
-	struct osd_thandle *oh;
-	uint64_t	    object;
+	struct osd_object *obj = osd_dt_obj(dt);
+	struct osd_device *osd = osd_obj2dev(obj);
+	const struct dt_insert_rec *rec1;
+	const struct lu_fid *fid;
+	struct osd_idmap_cache *idc;
+	struct osd_thandle *oh;
+	uint64_t object;
 	ENTRY;
 
+	rec1 = (struct dt_insert_rec *)rec;
+	fid = rec1->rec_fid;
+	LASSERT(fid != NULL);
+	LASSERT(rec1->rec_type != 0);
+
 	LASSERT(th != NULL);
 	oh = container_of0(th, struct osd_thandle, ot_super);
 
@@ -474,64 +483,13 @@ static int osd_declare_dir_insert(const struct lu_env *env,
 	 * before insertion */
 	dmu_tx_hold_zap(oh->ot_tx, object, TRUE, NULL);
 
-	RETURN(0);
-}
-
-/**
- * Find the osd object for given fid.
- *
- * \param fid need to find the osd object having this fid
- *
- * \retval osd_object on success
- * \retval -ve on error
- */
-struct osd_object *osd_object_find(const struct lu_env *env,
-				   struct dt_object *dt,
-				   const struct lu_fid *fid)
-{
-	struct lu_device *ludev = dt->do_lu.lo_dev;
-	struct osd_object *child = NULL;
-	struct lu_object *luch;
-	struct lu_object *lo;
+	/* fill the OI cache now; if that fails, fail the declaration rather
+	 * than leave the lookup to osd_dir_insert() */
+	idc = osd_idc_find_or_init(env, osd, fid);
+	if (IS_ERR(idc))
+		RETURN(PTR_ERR(idc));
 
-	/*
-	 * at this point topdev might not exist yet
-	 * (i.e. MGS is preparing profiles). so we can
-	 * not rely on topdev and instead lookup with
-	 * our device passed as topdev. this can't work
-	 * if the object isn't cached yet (as osd doesn't
-	 * allocate lu_header). IOW, the object must be
-	 * in the cache, otherwise lu_object_alloc() crashes
-	 * -bzzz
-	 */
-	luch = lu_object_find_at(env, ludev, fid, NULL);
-	if (IS_ERR(luch))
-		return (void *)luch;
-
-	if (lu_object_exists(luch)) {
-		lo = lu_object_locate(luch->lo_header, ludev->ld_type);
-		if (lo != NULL)
-			child = osd_obj(lo);
-		else
-			LU_OBJECT_DEBUG(D_ERROR, env, luch,
-					"%s: object can't be located "DFID,
-					osd_dev(ludev)->od_svname, PFID(fid));
-
-		if (child == NULL) {
-			lu_object_put(env, luch);
-			CERROR("%s: Unable to get osd_object "DFID"\n",
-			       osd_dev(ludev)->od_svname, PFID(fid));
-			child = ERR_PTR(-ENOENT);
-		}
-	} else {
-		LU_OBJECT_DEBUG(D_ERROR, env, luch,
-				"%s: lu_object does not exists "DFID,
-				osd_dev(ludev)->od_svname, PFID(fid));
-		lu_object_put(env, luch);
-		child = ERR_PTR(-ENOENT);
-	}
-
-	return child;
+	RETURN(0);
 }
 
 /**
@@ -567,8 +525,8 @@ static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd,
 	RETURN(ss->ss_node_id == range->lsr_index);
 }
 
-static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
-			  const struct lu_fid *fid)
+int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
+		   const struct lu_fid *fid)
 {
 	struct seq_server_site *ss = osd_seq_site(osd);
 	ENTRY;
@@ -611,9 +569,8 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
 	struct osd_device   *osd = osd_obj2dev(parent);
 	struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec;
 	const struct lu_fid *fid = rec1->rec_fid;
-	struct osd_thandle *oh;
-	struct osd_object  *child = NULL;
-	__u32               attr;
+	struct osd_thandle *oh;
+	struct osd_idmap_cache *idc;
 	char                *name = (char *)key;
 	int                  rc;
 	ENTRY;
@@ -626,61 +583,54 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
 	LASSERT(th != NULL);
 	oh = container_of0(th, struct osd_thandle, ot_super);
 
-	rc = osd_remote_fid(env, osd, fid);
-	if (rc < 0) {
-		CERROR("%s: Can not find object "DFID": rc = %d\n",
-		       osd->od_svname, PFID(fid), rc);
-		RETURN(rc);
+	idc = osd_idc_find(env, osd, fid);
+	if (unlikely(idc == NULL)) {
+		/* this dt_insert() wasn't declared properly, so
+		 * FID is missing in OI cache. we better do not
+		 * lookup FID in FLDB/OI and don't risk to deadlock,
+		 * but in some special cases (lfsck testing, etc)
+		 * it's much simpler than fixing a caller */
+		CERROR("%s: "DFID" wasn't declared for insert\n",
+		       osd_name(osd), PFID(fid));
+		idc = osd_idc_find_or_init(env, osd, fid);
+		if (IS_ERR(idc))
+			RETURN(PTR_ERR(idc));
 	}
 
-	if (unlikely(rc == 1)) {
+	if (idc->oic_remote) {
 		/* Insert remote entry */
 		memset(&oti->oti_zde.lzd_reg, 0, sizeof(oti->oti_zde.lzd_reg));
 		oti->oti_zde.lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
 	} else {
-		/*
-		 * To simulate old Orion setups with ./.. stored in the
-		 * directories
-		 */
-		/* Insert local entry */
-		child = osd_object_find(env, dt, fid);
-		if (IS_ERR(child))
-			RETURN(PTR_ERR(child));
-
-		LASSERT(child->oo_db);
+		if (unlikely(idc->oic_dnode == 0)) {
+			/* for a reason OI cache wasn't filled properly */
+			CERROR("%s: OIC for "DFID" isn't filled\n",
+			       osd_name(osd), PFID(fid));
+			RETURN(-EINVAL);
+		}
 		if (name[0] == '.') {
 			if (name[1] == 0) {
 				/* do not store ".", instead generate it
 				 * during iteration */
 				GOTO(out, rc = 0);
 			} else if (name[1] == '.' && name[2] == 0) {
-				if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
-					struct lu_fid tfid = *fid;
-
-					osd_object_put(env, child);
-					tfid.f_oid--;
-					child = osd_object_find(env, dt, &tfid);
-					if (IS_ERR(child))
-						RETURN(PTR_ERR(child));
-
-					LASSERT(child->oo_db);
-				}
+				uint64_t dnode = idc->oic_dnode;
+				if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT))
+					dnode--;
 
 				/* update parent dnode in the child.
 				 * later it will be used to generate ".." */
 				rc = osd_object_sa_update(parent,
 						 SA_ZPL_PARENT(osd),
-						 &child->oo_db->db_object,
-						 8, oh);
+						 &dnode, 8, oh);
 
 				GOTO(out, rc);
 			}
 		}
 		CLASSERT(sizeof(oti->oti_zde.lzd_reg) == 8);
 		CLASSERT(sizeof(oti->oti_zde) % 8 == 0);
-		attr = child->oo_dt.do_lu.lo_header ->loh_attr;
-		oti->oti_zde.lzd_reg.zde_type = IFTODT(attr & S_IFMT);
-		oti->oti_zde.lzd_reg.zde_dnode = child->oo_db->db_object;
+		oti->oti_zde.lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
+		oti->oti_zde.lzd_reg.zde_dnode = idc->oic_dnode;
 	}
 
 	oti->oti_zde.lzd_fid = *fid;
@@ -696,8 +646,6 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
 		      (void *)&oti->oti_zde, oh->ot_tx);
 
 out:
-	if (child != NULL)
-		osd_object_put(env, child);
 
 	RETURN(rc);
 }
diff --git a/lustre/osd-zfs/osd_internal.h b/lustre/osd-zfs/osd_internal.h
index 892fbc0..cbaac32 100644
--- a/lustre/osd-zfs/osd_internal.h
+++ b/lustre/osd-zfs/osd_internal.h
@@ -160,6 +160,18 @@ struct osa_attr {
 	uint64_t	ctime[2];
 };
 
+
+#define OSD_INS_CACHE_SIZE	8
+
+/* OI cache entry */
+struct osd_idmap_cache {
+	struct osd_device	*oic_dev;
+	struct lu_fid		oic_fid;
+	/** max 2^48 dnodes per dataset, avoid spilling into another word */
+	uint64_t		oic_dnode:DN_MAX_OBJECT_SHIFT,
+				oic_remote:1;      /* FID isn't local */
+};
+
 /* max.number of regular attrubites the callers may ask for */
 #define OSD_MAX_IN_BULK		13
 
@@ -192,6 +204,11 @@ struct osd_thread_info {
 
 	struct lquota_id_info	 oti_qi;
 	struct lu_seq_range	 oti_seq_range;
+
+	/* dedicated OI cache for insert (which needs inum) */
+	struct osd_idmap_cache *oti_ins_cache;
+	int		       oti_ins_cache_size;
+	int		       oti_ins_cache_used;
 };
 
 extern struct lu_context_key osd_key;
@@ -485,6 +502,15 @@ uint64_t osd_get_name_n_idx(const struct lu_env *env, struct osd_device *osd,
 int osd_options_init(void);
 int osd_ost_seq_exists(const struct lu_env *env, struct osd_device *osd,
 		       __u64 seq);
+int osd_idc_find_and_init(const struct lu_env *env, struct osd_device *osd,
+			  struct osd_object *obj);
+struct osd_idmap_cache *osd_idc_find_or_init(const struct lu_env *env,
+					     struct osd_device *osd,
+					     const struct lu_fid *fid);
+struct osd_idmap_cache *osd_idc_find(const struct lu_env *env,
+				     struct osd_device *osd,
+				     const struct lu_fid *fid);
+
 /* osd_index.c */
 int osd_index_try(const struct lu_env *env, struct dt_object *dt,
 		  const struct dt_index_features *feat);
@@ -496,6 +522,8 @@ int osd_zap_cursor_init(zap_cursor_t **zc, struct objset *os,
 			uint64_t id, uint64_t dirhash);
 void osd_zap_cursor_fini(zap_cursor_t *zc);
 uint64_t osd_zap_cursor_serialize(zap_cursor_t *zc);
+int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
+		   const struct lu_fid *fid);
 
 /* osd_xattr.c */
 int __osd_xattr_load(struct osd_device *osd, uint64_t dnode,
diff --git a/lustre/osd-zfs/osd_object.c b/lustre/osd-zfs/osd_object.c
index 4cfd24b..1cdb0e1 100644
--- a/lustre/osd-zfs/osd_object.c
+++ b/lustre/osd-zfs/osd_object.c
@@ -543,6 +543,10 @@ static int osd_declare_object_destroy(const struct lu_env *env,
 	else
 		dmu_tx_hold_zap(oh->ot_tx, osd->od_unlinkedid, TRUE, NULL);
 
+	/* will help to find FID->ino when this object is being
+	 * added to PENDING/ */
+	osd_idc_find_and_init(env, osd, obj);
+
 	RETURN(0);
 }
 
@@ -1124,6 +1128,13 @@ static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
 
 	ah->dah_parent = parent;
 	ah->dah_mode = child_mode;
+
+	if (parent != NULL && !dt_object_remote(parent)) {
+		/* will help to find FID->ino at dt_insert("..") */
+		struct osd_object *pobj = osd_dt_obj(parent);
+
+		osd_idc_find_and_init(env, osd_obj2dev(pobj), pobj);
+	}
 }
 
 static int osd_declare_object_create(const struct lu_env *env,
@@ -1193,8 +1204,12 @@ static int osd_declare_object_create(const struct lu_env *env,
 	dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, FALSE, NULL);
 	dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, FALSE, NULL);
 
+	/* will help to find FID->ino mapping at dt_insert() */
+	osd_idc_find_and_init(env, osd, obj);
+
 	rc = osd_declare_quota(env, osd, attr->la_uid, attr->la_gid, 1,
 			       oh, false, NULL, false);
+
 	RETURN(rc);
 }
 
@@ -1555,6 +1570,7 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
 	rc = __osd_sa_xattr_update(env, obj, oh);
 	if (rc)
 		GOTO(out, rc);
+	osd_idc_find_and_init(env, osd, obj);
 
 	/* Add new object to inode accounting.
 	 * Errors are not considered as fatal */
diff --git a/lustre/osd-zfs/osd_oi.c b/lustre/osd-zfs/osd_oi.c
index 5383858..d17ae6f 100644
--- a/lustre/osd-zfs/osd_oi.c
+++ b/lustre/osd-zfs/osd_oi.c
@@ -764,3 +764,153 @@ int osd_options_init(void)
 
 	return 0;
 }
+
+/*
+ * the following set of functions are used to maintain per-thread
+ * cache of FID->ino mapping. this mechanism is used to avoid
+ * expensive LU/OI lookups.
+ */
+struct osd_idmap_cache *osd_idc_find(const struct lu_env *env,
+				     struct osd_device *osd,
+				     const struct lu_fid *fid)
+{
+	struct osd_thread_info *oti = osd_oti_get(env);
+	struct osd_idmap_cache *idc = oti->oti_ins_cache;
+	int i;
+
+	for (i = 0; i < oti->oti_ins_cache_used; i++) {
+		if (!lu_fid_eq(&idc[i].oic_fid, fid))
+			continue;
+		if (idc[i].oic_dev != osd)
+			continue;
+
+		return idc + i;
+	}
+
+	return NULL;
+}
+
+/*
+ * append a new mapping for the given FID to the per-thread cache.
+ * the entry starts as local with no dnode known (oic_dnode == 0);
+ * the array is doubled when full. returns ERR_PTR(-ENOMEM) on failure.
+ */
+struct osd_idmap_cache *osd_idc_add(const struct lu_env *env,
+				    struct osd_device *osd,
+				    const struct lu_fid *fid)
+{
+	struct osd_thread_info *oti = osd_oti_get(env);
+	struct osd_idmap_cache *idc;
+	int i;
+
+	if (unlikely(oti->oti_ins_cache_used >= oti->oti_ins_cache_size)) {
+		i = oti->oti_ins_cache_size * 2;
+		LASSERT(i < 1000);
+		if (i == 0)
+			i = OSD_INS_CACHE_SIZE;
+		OBD_ALLOC(idc, sizeof(*idc) * i);
+		if (idc == NULL)
+			return ERR_PTR(-ENOMEM);
+		if (oti->oti_ins_cache != NULL) {
+			memcpy(idc, oti->oti_ins_cache,
+			       oti->oti_ins_cache_used * sizeof(*idc));
+			/* free what was allocated, like osd_key_fini() */
+			OBD_FREE(oti->oti_ins_cache,
+				 oti->oti_ins_cache_size * sizeof(*idc));
+		}
+		oti->oti_ins_cache = idc;
+		oti->oti_ins_cache_size = i;
+	}
+
+	idc = &oti->oti_ins_cache[oti->oti_ins_cache_used++];
+	idc->oic_fid = *fid;
+	idc->oic_dev = osd;
+	idc->oic_dnode = 0;
+	idc->oic_remote = 0;
+
+	return idc;
+}
+
+/**
+ * Lookup mapping for the given fid in the cache
+ *
+ * Initialize a new one if not found. the initialization checks whether
+ * the object is local or remote. for the local objects, OI is used to
+ * learn dnode#. the function is used when the caller has no information
+ * about the object, e.g. at dt_insert().
+ *
+ * the FID is resolved before the entry is added, so a failed lookup
+ * doesn't leave an unfilled entry in the cache.
+ *
+ * \retval cache entry on success
+ * \retval ERR_PTR(-ve) on error
+ */
+struct osd_idmap_cache *osd_idc_find_or_init(const struct lu_env *env,
+					     struct osd_device *osd,
+					     const struct lu_fid *fid)
+{
+	struct osd_idmap_cache *idc;
+	uint64_t dnode = 0;
+	int remote;
+	int rc;
+
+	idc = osd_idc_find(env, osd, fid);
+	if (idc != NULL)
+		return idc;
+
+	remote = osd_remote_fid(env, osd, fid);
+	if (unlikely(remote < 0))
+		return ERR_PTR(remote);
+
+	if (remote == 0) {
+		/* the object is local, lookup in OI */
+		rc = osd_fid_lookup(env, osd, fid, &dnode);
+		if (unlikely(rc < 0)) {
+			CERROR("%s: can't lookup: rc = %d\n",
+			       osd->od_svname, rc);
+			return ERR_PTR(rc);
+		}
+		LASSERT(dnode < (1ULL << DN_MAX_OBJECT_SHIFT));
+	}
+
+	/* new mapping is needed */
+	idc = osd_idc_add(env, osd, fid);
+	if (IS_ERR(idc))
+		return idc;
+
+	if (remote == 0)
+		idc->oic_dnode = dnode;
+	else
+		idc->oic_remote = 1;
+
+	return idc;
+}
+
+/*
+ * lookup mapping for given FID and fill it from the given object.
+ * the object is local by definition.
+ */
+int osd_idc_find_and_init(const struct lu_env *env, struct osd_device *osd,
+			  struct osd_object *obj)
+{
+	const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
+	struct osd_idmap_cache *idc;
+
+	idc = osd_idc_find(env, osd, fid);
+	if (idc != NULL) {
+		if (obj->oo_db == NULL)
+			return 0;
+		idc->oic_dnode = obj->oo_db->db_object;
+		return 0;
+	}
+
+	/* new mapping is needed */
+	idc = osd_idc_add(env, osd, fid);
+	if (IS_ERR(idc))
+		return PTR_ERR(idc);
+
+	if (obj->oo_db)
+		idc->oic_dnode = obj->oo_db->db_object;
+
+	return 0;
+}
-- 
1.8.3.1