Whamcloud - gitweb
LU-7910 osd: do not lookup child objects in osd_dir_insert() 33/21333/11
author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Fri, 15 Jul 2016 13:05:29 +0000 (17:05 +0400)
committer: Oleg Drokin <oleg.drokin@intel.com>
Tue, 31 Jan 2017 04:07:58 +0000 (04:07 +0000)
Instead, cache the FID->dnode mapping in @env at declaration time.

Change-Id: I2c2ab17cd6e158e9462715f12c21da2c2b8402db
Signed-off-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-on: https://review.whamcloud.com/21333
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Jenkins
Reviewed-by: Fan Yong <fan.yong@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/osd-zfs/osd_handler.c
lustre/osd-zfs/osd_index.c
lustre/osd-zfs/osd_internal.h
lustre/osd-zfs/osd_object.c
lustre/osd-zfs/osd_oi.c

index 8755499..0b079ad 100644 (file)
@@ -280,6 +280,8 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
        oh = container_of0(th, struct osd_thandle, ot_super);
        INIT_LIST_HEAD(&unlinked);
        list_splice_init(&oh->ot_unlinked_list, &unlinked);
+       /* reset OI cache for safety */
+       osd_oti_get(env)->oti_ins_cache_used = 0;
 
        if (oh->ot_assigned == 0) {
                LASSERT(oh->ot_tx);
@@ -697,16 +699,20 @@ static void osd_key_fini(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
 {
        struct osd_thread_info *info = data;
+       struct osd_idmap_cache *idc = info->oti_ins_cache;
 
+       if (idc != NULL) {
+               LASSERT(info->oti_ins_cache_size > 0);
+               OBD_FREE(idc, sizeof(*idc) * info->oti_ins_cache_size);
+               info->oti_ins_cache = NULL;
+               info->oti_ins_cache_size = 0;
+       }
        OBD_FREE_PTR(info);
 }
 
 static void osd_key_exit(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
 {
-       struct osd_thread_info *info = data;
-
-       memset(info, 0, sizeof(*info));
 }
 
 struct lu_context_key osd_key = {
index f0b588c..ee97cb3 100644 (file)
@@ -455,11 +455,19 @@ static int osd_declare_dir_insert(const struct lu_env *env,
                                  const struct dt_key *key,
                                  struct thandle *th)
 {
-       struct osd_object  *obj = osd_dt_obj(dt);
-       struct osd_thandle *oh;
-       uint64_t object;
+       struct osd_object       *obj = osd_dt_obj(dt);
+       struct osd_device       *osd = osd_obj2dev(obj);
+       const struct dt_insert_rec *rec1;
+       const struct lu_fid     *fid;
+       struct osd_thandle      *oh;
+       uint64_t                 object;
        ENTRY;
 
+       rec1 = (struct dt_insert_rec *)rec;
+       fid = rec1->rec_fid;
+       LASSERT(fid != NULL);
+       LASSERT(rec1->rec_type != 0);
+
        LASSERT(th != NULL);
        oh = container_of0(th, struct osd_thandle, ot_super);
 
@@ -474,64 +482,9 @@ static int osd_declare_dir_insert(const struct lu_env *env,
         * before insertion */
        dmu_tx_hold_zap(oh->ot_tx, object, TRUE, NULL);
 
-       RETURN(0);
-}
-
-/**
- * Find the osd object for given fid.
- *
- * \param fid need to find the osd object having this fid
- *
- * \retval osd_object on success
- * \retval        -ve on error
- */
-struct osd_object *osd_object_find(const struct lu_env *env,
-                                  struct dt_object *dt,
-                                  const struct lu_fid *fid)
-{
-       struct lu_device         *ludev = dt->do_lu.lo_dev;
-       struct osd_object        *child = NULL;
-       struct lu_object         *luch;
-       struct lu_object         *lo;
+       osd_idc_find_or_init(env, osd, fid);
 
-       /*
-        * at this point topdev might not exist yet
-        * (i.e. MGS is preparing profiles). so we can
-        * not rely on topdev and instead lookup with
-        * our device passed as topdev. this can't work
-        * if the object isn't cached yet (as osd doesn't
-        * allocate lu_header). IOW, the object must be
-        * in the cache, otherwise lu_object_alloc() crashes
-        * -bzzz
-        */
-       luch = lu_object_find_at(env, ludev, fid, NULL);
-       if (IS_ERR(luch))
-               return (void *)luch;
-
-       if (lu_object_exists(luch)) {
-               lo = lu_object_locate(luch->lo_header, ludev->ld_type);
-               if (lo != NULL)
-                       child = osd_obj(lo);
-               else
-                       LU_OBJECT_DEBUG(D_ERROR, env, luch,
-                                       "%s: object can't be located "DFID,
-                                       osd_dev(ludev)->od_svname, PFID(fid));
-
-               if (child == NULL) {
-                       lu_object_put(env, luch);
-                       CERROR("%s: Unable to get osd_object "DFID"\n",
-                              osd_dev(ludev)->od_svname, PFID(fid));
-                       child = ERR_PTR(-ENOENT);
-               }
-       } else {
-               LU_OBJECT_DEBUG(D_ERROR, env, luch,
-                               "%s: lu_object does not exists "DFID,
-                               osd_dev(ludev)->od_svname, PFID(fid));
-               lu_object_put(env, luch);
-               child = ERR_PTR(-ENOENT);
-       }
-
-       return child;
+       RETURN(0);
 }
 
 /**
@@ -567,8 +520,8 @@ static int osd_seq_exists(const struct lu_env *env, struct osd_device *osd,
        RETURN(ss->ss_node_id == range->lsr_index);
 }
 
-static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
-                         const struct lu_fid *fid)
+int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
+                  const struct lu_fid *fid)
 {
        struct seq_server_site  *ss = osd_seq_site(osd);
        ENTRY;
@@ -611,9 +564,8 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
        struct osd_device   *osd = osd_obj2dev(parent);
        struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec;
        const struct lu_fid *fid = rec1->rec_fid;
-       struct osd_thandle  *oh;
-       struct osd_object   *child = NULL;
-       __u32                attr;
+       struct osd_thandle *oh;
+       struct osd_idmap_cache *idc;
        char                *name = (char *)key;
        int                  rc;
        ENTRY;
@@ -626,61 +578,54 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
        LASSERT(th != NULL);
        oh = container_of0(th, struct osd_thandle, ot_super);
 
-       rc = osd_remote_fid(env, osd, fid);
-       if (rc < 0) {
-               CERROR("%s: Can not find object "DFID": rc = %d\n",
-                      osd->od_svname, PFID(fid), rc);
-               RETURN(rc);
+       idc = osd_idc_find(env, osd, fid);
+       if (unlikely(idc == NULL)) {
+               /* this dt_insert() wasn't declared properly, so
+                * FID is missing in OI cache. we better do not
+                * lookup FID in FLDB/OI and don't risk to deadlock,
+                * but in some special cases (lfsck testing, etc)
+                * it's much simpler than fixing a caller */
+               CERROR("%s: "DFID" wasn't declared for insert\n",
+                      osd_name(osd), PFID(fid));
+               idc = osd_idc_find_or_init(env, osd, fid);
+               if (IS_ERR(idc))
+                       RETURN(PTR_ERR(idc));
        }
 
-       if (unlikely(rc == 1)) {
+       if (idc->oic_remote) {
                /* Insert remote entry */
                memset(&oti->oti_zde.lzd_reg, 0, sizeof(oti->oti_zde.lzd_reg));
                oti->oti_zde.lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
        } else {
-               /*
-                * To simulate old Orion setups with ./..  stored in the
-                * directories
-                */
-               /* Insert local entry */
-               child = osd_object_find(env, dt, fid);
-               if (IS_ERR(child))
-                       RETURN(PTR_ERR(child));
-
-               LASSERT(child->oo_db);
+               if (unlikely(idc->oic_dnode == 0)) {
+                       /* for a reason OI cache wasn't filled properly */
+                       CERROR("%s: OIC for "DFID" isn't filled\n",
+                              osd_name(osd), PFID(fid));
+                       RETURN(-EINVAL);
+               }
                if (name[0] == '.') {
                        if (name[1] == 0) {
                                /* do not store ".", instead generate it
                                 * during iteration */
                                GOTO(out, rc = 0);
                        } else if (name[1] == '.' && name[2] == 0) {
-                               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
-                                       struct lu_fid tfid = *fid;
-
-                                       osd_object_put(env, child);
-                                       tfid.f_oid--;
-                                       child = osd_object_find(env, dt, &tfid);
-                                       if (IS_ERR(child))
-                                               RETURN(PTR_ERR(child));
-
-                                       LASSERT(child->oo_db);
-                               }
+                               uint64_t dnode = idc->oic_dnode;
+                               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT))
+                                       dnode--;
 
                                /* update parent dnode in the child.
                                 * later it will be used to generate ".." */
                                rc = osd_object_sa_update(parent,
                                                 SA_ZPL_PARENT(osd),
-                                                &child->oo_db->db_object,
-                                                8, oh);
+                                                &dnode, 8, oh);
 
                                GOTO(out, rc);
                        }
                }
                CLASSERT(sizeof(oti->oti_zde.lzd_reg) == 8);
                CLASSERT(sizeof(oti->oti_zde) % 8 == 0);
-               attr = child->oo_dt.do_lu.lo_header ->loh_attr;
-               oti->oti_zde.lzd_reg.zde_type = IFTODT(attr & S_IFMT);
-               oti->oti_zde.lzd_reg.zde_dnode = child->oo_db->db_object;
+               oti->oti_zde.lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
+               oti->oti_zde.lzd_reg.zde_dnode = idc->oic_dnode;
        }
 
        oti->oti_zde.lzd_fid = *fid;
@@ -696,8 +641,6 @@ static int osd_dir_insert(const struct lu_env *env, struct dt_object *dt,
                                (void *)&oti->oti_zde, oh->ot_tx);
 
 out:
-       if (child != NULL)
-               osd_object_put(env, child);
 
        RETURN(rc);
 }
index 892fbc0..cbaac32 100644 (file)
@@ -160,6 +160,18 @@ struct osa_attr {
        uint64_t        ctime[2];
 };
 
+
+#define OSD_INS_CACHE_SIZE     8
+
+/* OI cache entry: one FID -> dnode mapping kept in the per-thread
+ * insert cache (see oti_ins_cache in struct osd_thread_info) */
+struct osd_idmap_cache {
+       struct osd_device       *oic_dev;       /* device the entry is for */
+       struct lu_fid           oic_fid;        /* FID being mapped */
+       /** max 2^48 dnodes per dataset, avoid spilling into another word.
+        * oic_dnode == 0 is treated as "dnode not filled in" by
+        * osd_dir_insert() */
+       uint64_t                oic_dnode:DN_MAX_OBJECT_SHIFT,
+                               oic_remote:1;      /* FID isn't local */
+};
+
 /* max.number of regular attrubites the callers may ask for */
 #define OSD_MAX_IN_BULK                13
 
@@ -192,6 +204,11 @@ struct osd_thread_info {
 
        struct lquota_id_info    oti_qi;
        struct lu_seq_range      oti_seq_range;
+
+       /* dedicated OI cache for insert (which needs inum) */
+       struct osd_idmap_cache *oti_ins_cache;
+       int                    oti_ins_cache_size;
+       int                    oti_ins_cache_used;
 };
 
 extern struct lu_context_key osd_key;
@@ -485,6 +502,15 @@ uint64_t osd_get_name_n_idx(const struct lu_env *env, struct osd_device *osd,
 int osd_options_init(void);
 int osd_ost_seq_exists(const struct lu_env *env, struct osd_device *osd,
                       __u64 seq);
+int osd_idc_find_and_init(const struct lu_env *env, struct osd_device *osd,
+                         struct osd_object *obj);
+struct osd_idmap_cache *osd_idc_find_or_init(const struct lu_env *env,
+                                            struct osd_device *osd,
+                                            const struct lu_fid *fid);
+struct osd_idmap_cache *osd_idc_find(const struct lu_env *env,
+                                    struct osd_device *osd,
+                                    const struct lu_fid *fid);
+
 /* osd_index.c */
 int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                  const struct dt_index_features *feat);
@@ -496,6 +522,8 @@ int osd_zap_cursor_init(zap_cursor_t **zc, struct objset *os,
                        uint64_t id, uint64_t dirhash);
 void osd_zap_cursor_fini(zap_cursor_t *zc);
 uint64_t osd_zap_cursor_serialize(zap_cursor_t *zc);
+int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
+                  const struct lu_fid *fid);
 
 /* osd_xattr.c */
 int __osd_xattr_load(struct osd_device *osd, uint64_t dnode,
index 4cfd24b..1cdb0e1 100644 (file)
@@ -543,6 +543,10 @@ static int osd_declare_object_destroy(const struct lu_env *env,
        else
                dmu_tx_hold_zap(oh->ot_tx, osd->od_unlinkedid, TRUE, NULL);
 
+       /* will help to find FID->ino when this object is being
+        * added to PENDING/ */
+       osd_idc_find_and_init(env, osd, obj);
+
        RETURN(0);
 }
 
@@ -1124,6 +1128,13 @@ static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
 
        ah->dah_parent = parent;
        ah->dah_mode = child_mode;
+
+       if (parent != NULL && !dt_object_remote(parent)) {
+               /* will help to find FID->ino at dt_insert("..") */
+               struct osd_object *pobj = osd_dt_obj(parent);
+
+               osd_idc_find_and_init(env, osd_obj2dev(pobj), pobj);
+       }
 }
 
 static int osd_declare_object_create(const struct lu_env *env,
@@ -1193,8 +1204,12 @@ static int osd_declare_object_create(const struct lu_env *env,
        dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, FALSE, NULL);
        dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, FALSE, NULL);
 
+       /* will help to find FID->ino mapping at dt_insert() */
+       osd_idc_find_and_init(env, osd, obj);
+
        rc = osd_declare_quota(env, osd, attr->la_uid, attr->la_gid, 1, oh,
                               false, NULL, false);
+
        RETURN(rc);
 }
 
@@ -1555,6 +1570,7 @@ static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
        rc = __osd_sa_xattr_update(env, obj, oh);
        if (rc)
                GOTO(out, rc);
+       osd_idc_find_and_init(env, osd, obj);
 
        /* Add new object to inode accounting.
         * Errors are not considered as fatal */
index 5383858..d17ae6f 100644 (file)
@@ -764,3 +764,141 @@ int osd_options_init(void)
 
        return 0;
 }
+
+/*
+ * the following set of functions is used to maintain a per-thread
+ * cache of FID->ino mapping. this mechanism is used to avoid
+ * expensive LU/OI lookups.
+ *
+ * osd_idc_find() scans the entries currently in use in the calling
+ * thread's cache (oti_ins_cache[0 .. oti_ins_cache_used - 1]) and
+ * returns the first one matching both \a fid and \a osd, or NULL if
+ * there is none. it does not add or modify entries.
+ */
+struct osd_idmap_cache *osd_idc_find(const struct lu_env *env,
+                                    struct osd_device *osd,
+                                    const struct lu_fid *fid)
+{
+       struct osd_thread_info *oti = osd_oti_get(env);
+       struct osd_idmap_cache *idc = oti->oti_ins_cache;
+       int i;
+
+       for (i = 0; i < oti->oti_ins_cache_used; i++) {
+               if (!lu_fid_eq(&idc[i].oic_fid, fid))
+                       continue;
+               if (idc[i].oic_dev != osd) /* same FID, other device */
+                       continue;
+
+               return idc + i;
+       }
+
+       return NULL;
+}
+
+/*
+ * Append a new, not yet resolved entry for \a fid on \a osd to the
+ * calling thread's insert cache, growing the backing array when all
+ * allocated slots are in use.
+ *
+ * The array starts at OSD_INS_CACHE_SIZE entries and doubles on every
+ * growth. The new entry is returned with oic_dnode == 0 and
+ * oic_remote == 0; the caller fills it in. No check for an already
+ * existing entry with the same FID is done here.
+ *
+ * \retval pointer to the new entry on success
+ * \retval ERR_PTR(-ENOMEM) if the array can't be grown
+ */
+struct osd_idmap_cache *osd_idc_add(const struct lu_env *env,
+                                   struct osd_device *osd,
+                                   const struct lu_fid *fid)
+{
+       struct osd_thread_info *oti = osd_oti_get(env);
+       struct osd_idmap_cache *idc;
+       int i;
+
+       if (unlikely(oti->oti_ins_cache_used >= oti->oti_ins_cache_size)) {
+               /* i is the new capacity, in entries */
+               i = oti->oti_ins_cache_size * 2;
+               /* sanity limit on how far the cache may grow */
+               LASSERT(i < 1000);
+               if (i == 0)
+                       i = OSD_INS_CACHE_SIZE;
+               OBD_ALLOC(idc, sizeof(*idc) * i);
+               if (idc == NULL)
+                       return ERR_PTR(-ENOMEM);
+               if (oti->oti_ins_cache != NULL) {
+                       memcpy(idc, oti->oti_ins_cache,
+                              oti->oti_ins_cache_used * sizeof(*idc));
+                       /* free by the allocated size, not by the number
+                        * of used entries, the same way osd_key_fini()
+                        * releases this buffer */
+                       OBD_FREE(oti->oti_ins_cache,
+                                oti->oti_ins_cache_size * sizeof(*idc));
+               }
+               oti->oti_ins_cache = idc;
+               oti->oti_ins_cache_size = i;
+       }
+
+       /* take the next free slot and reset it to "unresolved" */
+       idc = &oti->oti_ins_cache[oti->oti_ins_cache_used++];
+       idc->oic_fid = *fid;
+       idc->oic_dev = osd;
+       idc->oic_dnode = 0;
+       idc->oic_remote = 0;
+
+       return idc;
+}
+
+/**
+ * Lookup mapping for the given fid in the cache
+ *
+ * Initialize a new one if not found. the initialization checks whether
+ * the object is local or remote. for the local objects, OI is used to
+ * learn dnode#. the function is used when the caller has no information
+ * about the object, e.g. at dt_insert().
+ *
+ * \retval the existing or newly initialized entry on success
+ * \retval ERR_PTR(-ve) if a new entry can't be added, or if
+ *         osd_remote_fid() or osd_fid_lookup() fails
+ *
+ * NOTE(review): on the two failure paths after osd_idc_add() the new
+ * entry stays in the cache with oic_dnode == 0 and oic_remote == 0, so
+ * a later osd_idc_find() for the same fid returns it - confirm this is
+ * intended.
+ */
+struct osd_idmap_cache *osd_idc_find_or_init(const struct lu_env *env,
+                                            struct osd_device *osd,
+                                            const struct lu_fid *fid)
+{
+       struct osd_idmap_cache *idc;
+       int rc;
+
+       idc = osd_idc_find(env, osd, fid);
+       if (idc != NULL)
+               return idc;
+
+       /* new mapping is needed */
+       idc = osd_idc_add(env, osd, fid);
+       if (IS_ERR(idc))
+               return idc;
+
+       /* initialize it: rc < 0 is an error, rc == 0 is handled as a
+        * local FID, any other value as a remote one */
+       rc = osd_remote_fid(env, osd, fid);
+       if (unlikely(rc < 0))
+               return ERR_PTR(rc);
+
+       if (rc == 0) {
+               /* the object is local, lookup in OI */
+               uint64_t dnode;
+
+               rc = osd_fid_lookup(env, osd, fid, &dnode);
+               if (unlikely(rc < 0)) {
+                       CERROR("%s: can't lookup: rc = %d\n",
+                              osd->od_svname, rc);
+                       return ERR_PTR(rc);
+               }
+               /* oic_dnode is a DN_MAX_OBJECT_SHIFT-bit bitfield */
+               LASSERT(dnode < (1ULL << DN_MAX_OBJECT_SHIFT));
+               idc->oic_dnode = dnode;
+       } else {
+               /* the object is remote */
+               idc->oic_remote = 1;
+       }
+
+       return idc;
+}
+
+/*
+ * lookup mapping for given FID and fill it from the given object.
+ * the object is local by definition.
+ *
+ * the FID is taken from \a obj. if obj->oo_db is set, its db_object is
+ * stored as the entry's dnode (overwriting the dnode of an existing
+ * entry). if obj->oo_db is NULL, an existing entry is left untouched
+ * and a newly added entry keeps dnode 0.
+ *
+ * \retval 0 on success
+ * \retval -ve if a new entry can't be added
+ */
+int osd_idc_find_and_init(const struct lu_env *env, struct osd_device *osd,
+                         struct osd_object *obj)
+{
+       const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
+       struct osd_idmap_cache *idc;
+
+       idc = osd_idc_find(env, osd, fid);
+       if (idc != NULL) {
+               if (obj->oo_db == NULL) /* nothing to take the dnode from */
+                       return 0;
+               idc->oic_dnode = obj->oo_db->db_object;
+               return 0;
+       }
+
+       /* new mapping is needed */
+       idc = osd_idc_add(env, osd, fid);
+       if (IS_ERR(idc))
+               return PTR_ERR(idc);
+
+       if (obj->oo_db)
+               idc->oic_dnode = obj->oo_db->db_object;
+
+       return 0;
+}