Whamcloud - gitweb
LU-7053 osd: don't lookup object at insert 92/17092/14
authorAlex Zhuravlev <alexey.zhuravlev@intel.com>
Mon, 9 Nov 2015 15:51:59 +0000 (18:51 +0300)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 14 Dec 2015 03:01:27 +0000 (03:01 +0000)
The idea is to cache the FID->ino/type mapping in a per-thread cache
at declaration/object creation. Insert can then find that information
there and does not have to look up the object in LU/OI. This should avoid
a potential deadlock with lu_object_find() and iget(). It should also
improve performance, as in the majority of cases the required data is
filled in locally by create.

stats collected for sanity-benchmark:
lustre-MDT0000: 448306 created, lookups: 8910 in OI, 8910 in FLD
meaning we had to look up the ino 448K times, but only 9K times did we
have to use OI; in the other 439K cases we found the ino in the cache.

Change-Id: Ifa66c2d074f04e47d0d85b735f57dc506aa65f4c
Signed-off-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-on: http://review.whamcloud.com/17092
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: wangdi <di.wang@intel.com>
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_internal.h
lustre/obdclass/local_storage.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_internal.h
lustre/osd-ldiskfs/osd_oi.h

index 7efd9d3..e7b72c0 100644 (file)
@@ -1447,11 +1447,11 @@ static int mdd_declare_finish_unlink(const struct lu_env *env,
        if (rc != 0)
                return rc;
 
        if (rc != 0)
                return rc;
 
-       rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle);
+       rc = mdo_declare_destroy(env, obj, handle);
        if (rc != 0)
                return rc;
 
        if (rc != 0)
                return rc;
 
-       rc = mdo_declare_destroy(env, obj, handle);
+       rc = orph_declare_index_insert(env, obj, mdd_object_type(obj), handle);
        if (rc != 0)
                return rc;
 
        if (rc != 0)
                return rc;
 
index 12929d6..4f99f02 100644 (file)
@@ -577,33 +577,22 @@ int mdo_declare_index_insert(const struct lu_env *env, struct mdd_object *obj,
                             const char *name, struct thandle *handle)
 {
        struct dt_object *next  = mdd_object_child(obj);
                             const char *name, struct thandle *handle)
 {
        struct dt_object *next  = mdd_object_child(obj);
-       int               rc    = 0;
+       int               rc;
 
        /*
         * if the object doesn't exist yet, then it's supposed to be created
         * and declaration of the creation should be enough to insert ./..
         */
 
 
        /*
         * if the object doesn't exist yet, then it's supposed to be created
         * and declaration of the creation should be enough to insert ./..
         */
 
-        /* FIXME: remote object should not be awared by MDD layer, but local
-         * creation does not declare insert ./.. (comments above), which
-         * is required by remote directory creation.
-         * This remote check should be removed when mdd_object_exists check is
-         * removed.
-         */
-       if (mdd_object_exists(obj) || mdd_object_remote(obj)) {
-               rc = -ENOTDIR;
-               if (dt_try_as_dir(env, next)) {
-                       struct dt_insert_rec *rec =
-                                       &mdd_env_info(env)->mti_dt_rec;
-
-                       rec->rec_fid = fid;
-                       rec->rec_type = type;
-                       rc = dt_declare_insert(env, next,
-                                              (const struct dt_rec *)rec,
-                                              (const struct dt_key *)name,
-                                              handle);
-               }
-        }
+       rc = -ENOTDIR;
+       if (dt_try_as_dir(env, next)) {
+               struct dt_insert_rec *rec = &mdd_env_info(env)->mti_dt_rec;
+
+               rec->rec_fid = fid;
+               rec->rec_type = type;
+               rc = dt_declare_insert(env, next, (const struct dt_rec *)rec,
+                                      (const struct dt_key *)name, handle);
+       }
 
         return rc;
 }
 
         return rc;
 }
index 317c776..aeba810 100644 (file)
@@ -339,7 +339,7 @@ static struct dt_object *__local_file_create(const struct lu_env *env,
        }
 
        rec->rec_fid = fid;
        }
 
        rec->rec_fid = fid;
-       rec->rec_type = dto->do_lu.lo_header->loh_attr;
+       rec->rec_type = attr->la_mode & S_IFMT;
        rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
                               (const struct dt_key *)name, th);
        if (rc)
        rc = dt_declare_insert(env, parent, (const struct dt_rec *)rec,
                               (const struct dt_key *)name, th);
        if (rc)
@@ -349,11 +349,14 @@ static struct dt_object *__local_file_create(const struct lu_env *env,
                if (!dt_try_as_dir(env, dto))
                        GOTO(trans_stop, rc = -ENOTDIR);
 
                if (!dt_try_as_dir(env, dto))
                        GOTO(trans_stop, rc = -ENOTDIR);
 
+               rec->rec_type = S_IFDIR;
+               rec->rec_fid = fid;
                rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec,
                                (const struct dt_key *)".", th);
                if (rc != 0)
                        GOTO(trans_stop, rc);
 
                rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec,
                                (const struct dt_key *)".", th);
                if (rc != 0)
                        GOTO(trans_stop, rc);
 
+               rec->rec_fid = lu_object_fid(&parent->do_lu);
                rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec,
                                (const struct dt_key *)"..", th);
                if (rc != 0)
                rc = dt_declare_insert(env, dto, (const struct dt_rec *)rec,
                                (const struct dt_key *)"..", th);
                if (rc != 0)
index bc04181..bf46774 100644 (file)
@@ -175,6 +175,153 @@ static int osd_root_get(const struct lu_env *env,
 }
 
 /*
 }
 
 /*
+ * the following set of functions are used to maintain per-thread
+ * cache of FID->ino mapping. this mechanism is needed to resolve
+ * FID to inode at dt_insert() which in turn stores ino in the
+ * directory entries to keep ldiskfs compatible with ext[34].
+ * due to locking-originated restrictions we can't lookup ino
+ * using LU cache (deadlock is possible). lookup using OI is quite
+ * expensive. so instead we maintain this cache and methods like
+ * dt_create() fill it. so in the majority of cases dt_insert() is
+ * able to find needed mapping in lockless manner.
+ */
+static struct osd_idmap_cache *
+osd_idc_find(const struct lu_env *env, struct osd_device *osd,
+            const struct lu_fid *fid)
+{      /* search this thread's insert cache for the entry matching both
+        * fid and osd; returns the entry, or NULL when there is none */
+       struct osd_thread_info  *oti   = osd_oti_get(env);
+       struct osd_idmap_cache  *idc    = oti->oti_ins_cache;
+       int i;
+       for (i = 0; i < oti->oti_ins_cache_used; i++) { /* used slots only */
+               if (!lu_fid_eq(&idc[i].oic_fid, fid))
+                       continue; /* different FID */
+               if (idc[i].oic_dev != osd)
+                       continue; /* same FID, but cached for another device */
+
+               return idc + i; /* points into oti_ins_cache, which
+                                * osd_idc_add() replaces when it grows */
+       }
+
+       return NULL;
+}
+
+static struct osd_idmap_cache *
+osd_idc_add(const struct lu_env *env, struct osd_device *osd,
+           const struct lu_fid *fid)
+{      /* append a new entry for (osd, fid) to this thread's insert
+        * cache, growing the array first when it is full. returns the
+        * new entry with ino, generation and remote flag zeroed, or
+        * ERR_PTR(-ENOMEM). no check is made here for an already
+        * existing entry with the same FID. */
+       struct osd_thread_info  *oti   = osd_oti_get(env);
+       struct osd_idmap_cache  *idc;
+       int i;
+
+       if (unlikely(oti->oti_ins_cache_used >= oti->oti_ins_cache_size)) {
+               i = oti->oti_ins_cache_size * 2; /* double the capacity */
+               if (i == 0)
+                       i = OSD_INS_CACHE_SIZE; /* very first allocation */
+               OBD_ALLOC(idc, sizeof(*idc) * i);
+               if (idc == NULL)
+                       return ERR_PTR(-ENOMEM); /* old array is left intact */
+               if (oti->oti_ins_cache != NULL) {
+                       memcpy(idc, oti->oti_ins_cache,
+                              oti->oti_ins_cache_used * sizeof(*idc));
+                       OBD_FREE(oti->oti_ins_cache,
+                                oti->oti_ins_cache_used * sizeof(*idc));
+                       /* NOTE(review): the size freed is based on _used,
+                        * while the array was allocated from _size; these
+                        * agree only if used == size here - confirm */
+               }
+               oti->oti_ins_cache = idc;
+               oti->oti_ins_cache_size = i;
+       }
+
+       idc = oti->oti_ins_cache + oti->oti_ins_cache_used++; /* next free slot */
+       idc->oic_fid = *fid;
+       idc->oic_dev = osd;
+       idc->oic_lid.oii_ino = 0; /* 0 = ino not known yet, callers fill it */
+       idc->oic_lid.oii_gen = 0;
+       idc->oic_remote = 0;
+
+       return idc;
+}
+
+/*
+ * lookup mapping for the given fid in the cache, initialize a
+ * new one if not found. the initialization checks whether the
+ * object is local or remote. for local objects, OI is used to
+ * learn ino/generation. the function is used when the caller
+ * has no information about the object, e.g. at dt_insert().
+ *
+ * returns the cache entry, or ERR_PTR() if the entry could not be
+ * added or if osd_remote_fid()/osd_oi_lookup() failed.
+ * NOTE(review): on those last two failures the entry just added
+ * stays in the cache with ino 0 - confirm this is intended.
+ */
+static struct osd_idmap_cache *
+osd_idc_find_or_init(const struct lu_env *env, struct osd_device *osd,
+                    const struct lu_fid *fid)
+{
+       struct osd_idmap_cache *idc;
+       int rc;
+
+       idc = osd_idc_find(env, osd, fid);
+       LASSERT(!IS_ERR(idc)); /* find returns an entry or NULL only */
+       if (idc != NULL)
+               return idc; /* cache hit, nothing to initialize */
+
+       /* new mapping is needed */
+       idc = osd_idc_add(env, osd, fid);
+       if (IS_ERR(idc))
+               return idc;
+
+       /* initialize it */
+       rc = osd_remote_fid(env, osd, fid); /* <0 error, 0 local, else remote */
+       if (unlikely(rc < 0))
+               return ERR_PTR(rc);
+
+       if (rc == 0) {
+               /* the object is local, lookup in OI */
+               /* XXX: probably cheaper to lookup in LU first? */
+               rc = osd_oi_lookup(osd_oti_get(env), osd, fid,
+                                  &idc->oic_lid, 0);
+               if (unlikely(rc < 0)) {
+                       CERROR("can't lookup: rc = %d\n", rc);
+                       return ERR_PTR(rc);
+               }
+       } else {
+               /* the object is remote */
+               idc->oic_remote = 1; /* ino/gen stay 0 for remote objects */
+       }
+
+       return idc;
+}
+
+/*
+ * lookup mapping for given FID and fill it from the given object.
+ * the object is local by definition.
+ *
+ * if the object has no inode yet, the entry is found or added but
+ * its ino/generation are left untouched.
+ * returns 0 on success, or the negative errno from osd_idc_add()
+ * when a new entry could not be added.
+ */
+static int osd_idc_find_and_init(const struct lu_env *env,
+                                struct osd_device *osd,
+                                struct osd_object *obj)
+{
+       const struct lu_fid     *fid = lu_object_fid(&obj->oo_dt.do_lu);
+       struct osd_idmap_cache  *idc;
+
+       idc = osd_idc_find(env, osd, fid);
+       LASSERT(!IS_ERR(idc)); /* find returns an entry or NULL only */
+       if (idc != NULL) {
+               if (obj->oo_inode == NULL)
+                       return 0; /* nothing to fill the entry from yet */
+               if (idc->oic_lid.oii_ino != obj->oo_inode->i_ino) {
+                       /* an unset ino (0) may be filled in, but a cached
+                        * ino must never change to a different one */
+                       LASSERT(idc->oic_lid.oii_ino == 0);
+                       idc->oic_lid.oii_ino = obj->oo_inode->i_ino;
+                       idc->oic_lid.oii_gen = obj->oo_inode->i_generation;
+               }
+               return 0;
+       }
+
+       /* new mapping is needed */
+       idc = osd_idc_add(env, osd, fid);
+       if (IS_ERR(idc))
+               return PTR_ERR(idc);
+
+       if (obj->oo_inode != NULL) { /* otherwise ino/gen remain 0 */
+               idc->oic_lid.oii_ino = obj->oo_inode->i_ino;
+               idc->oic_lid.oii_gen = obj->oo_inode->i_generation;
+       }
+       return 0;
+}
+
+/*
  * OSD object methods.
  */
 
  * OSD object methods.
  */
 
@@ -1216,6 +1363,9 @@ static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
 
        oh = container_of0(th, struct osd_thandle, ot_super);
 
 
        oh = container_of0(th, struct osd_thandle, ot_super);
 
+       /* reset OI cache for safety */
+       oti->oti_ins_cache_used = 0;
+
        remove_agents = oh->ot_remove_agents;
 
        qtrans = oh->ot_quota_trans;
        remove_agents = oh->ot_remove_agents;
 
        qtrans = oh->ot_quota_trans;
@@ -2238,6 +2388,12 @@ static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
 
         ah->dah_parent = parent;
         ah->dah_mode = child_mode;
 
         ah->dah_parent = parent;
         ah->dah_mode = child_mode;
+
+       if (parent != NULL && !dt_object_remote(parent)) {
+               /* will help to find FID->ino at dt_insert("..") */
+               struct osd_object *pobj = osd_dt_obj(parent);
+               osd_idc_find_and_init(env, osd_obj2dev(pobj), pobj);
+       }
 }
 
 static void osd_attr_init(struct osd_thread_info *info, struct osd_object *obj,
 }
 
 static void osd_attr_init(struct osd_thread_info *info, struct osd_object *obj,
@@ -2413,6 +2569,10 @@ static int osd_declare_object_create(const struct lu_env *env,
        if (rc != 0)
                RETURN(rc);
 
        if (rc != 0)
                RETURN(rc);
 
+       /* will help to find FID->ino mapping at dt_insert() */
+       rc = osd_idc_find_and_init(env, osd_obj2dev(osd_dt_obj(dt)),
+                                  osd_dt_obj(dt));
+
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
@@ -2488,6 +2648,13 @@ static int osd_declare_object_destroy(const struct lu_env *env,
        /* data to be truncated */
        rc = osd_declare_inode_qid(env, i_uid_read(inode), i_gid_read(inode),
                                   0, oh, obj, true, NULL, false);
        /* data to be truncated */
        rc = osd_declare_inode_qid(env, i_uid_read(inode), i_gid_read(inode),
                                   0, oh, obj, true, NULL, false);
+       if (rc)
+               RETURN(rc);
+
+       /* will help to find FID->ino when this object is being
+        * added to PENDING/ */
+       rc = osd_idc_find_and_init(env, osd_obj2dev(obj), obj);
+
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
@@ -2835,7 +3002,7 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
        const struct lu_fid     *fid    = lu_object_fid(&dt->do_lu);
        struct osd_object       *obj    = osd_dt_obj(dt);
        struct osd_thread_info  *info   = osd_oti_get(env);
        const struct lu_fid     *fid    = lu_object_fid(&dt->do_lu);
        struct osd_object       *obj    = osd_dt_obj(dt);
        struct osd_thread_info  *info   = osd_oti_get(env);
-       int                      result;
+       int                      result, on_ost = 0;
 
        ENTRY;
 
 
        ENTRY;
 
@@ -2861,13 +3028,15 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
 
                        fid_to_ostid(fid, oi);
                        ostid_to_fid(tfid, oi, 0);
 
                        fid_to_ostid(fid, oi);
                        ostid_to_fid(tfid, oi, 0);
+                       on_ost = 1;
                        result = osd_ea_fid_set(info, obj->oo_inode, tfid,
                                                LMAC_FID_ON_OST, 0);
                } else {
                        result = osd_ea_fid_set(info, obj->oo_inode, tfid,
                                                LMAC_FID_ON_OST, 0);
                } else {
+                       on_ost = fid_is_on_ost(info, osd_obj2dev(obj),
+                                              fid, OI_CHECK_FLD);
                        result = osd_ea_fid_set(info, obj->oo_inode, fid,
                        result = osd_ea_fid_set(info, obj->oo_inode, fid,
-                               fid_is_on_ost(info, osd_obj2dev(obj),
-                                             fid, OI_CHECK_FLD) ?
-                               LMAC_FID_ON_OST : 0, 0);
+                                               on_ost ? LMAC_FID_ON_OST : 0,
+                                               0);
                }
                if (obj->oo_dt.do_body_ops == &osd_body_ops_new)
                        obj->oo_dt.do_body_ops = &osd_body_ops;
                }
                if (obj->oo_dt.do_body_ops == &osd_body_ops_new)
                        obj->oo_dt.do_body_ops = &osd_body_ops;
@@ -2876,6 +3045,15 @@ static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
        if (result == 0)
                result = __osd_oi_insert(env, obj, fid, th);
 
        if (result == 0)
                result = __osd_oi_insert(env, obj, fid, th);
 
+       /* a small optimization - dt_insert() isn't usually applied
+        * to OST objects, so we don't need to cache OI mapping for
+        * OST objects */
+       if (result == 0 && on_ost == 0) {
+               struct osd_device *osd = osd_dev(dt->do_lu.lo_dev);
+               result = osd_idc_find_and_init(env, osd, obj);
+               LASSERT(result == 0);
+       }
+
        LASSERT(ergo(result == 0,
                     dt_object_exists(dt) && !dt_object_remote(dt)));
         LINVRNT(osd_invariant(obj));
        LASSERT(ergo(result == 0,
                     dt_object_exists(dt) && !dt_object_remote(dt)));
         LINVRNT(osd_invariant(obj));
@@ -3397,7 +3575,7 @@ static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
                 result = 0;
        } else if (feat == &dt_directory_features) {
                 dt->do_index_ops = &osd_index_ea_ops;
                 result = 0;
        } else if (feat == &dt_directory_features) {
                 dt->do_index_ops = &osd_index_ea_ops;
-               if (obj->oo_inode != NULL && S_ISDIR(obj->oo_inode->i_mode))
+               if (obj->oo_inode == NULL || S_ISDIR(obj->oo_inode->i_mode))
                         result = 0;
                 else
                         result = -ENOTDIR;
                         result = 0;
                 else
                         result = -ENOTDIR;
@@ -4030,7 +4208,7 @@ static int osd_add_dot_dotdot(struct osd_thread_info *info,
                 if (dir->oo_compat_dot_created) {
                         result = -EEXIST;
                 } else {
                 if (dir->oo_compat_dot_created) {
                         result = -EEXIST;
                 } else {
-                        LASSERT(inode == parent_dir);
+                       LASSERT(inode->i_ino == parent_dir->i_ino);
                         dir->oo_compat_dot_created = 1;
                         result = 0;
                 }
                         dir->oo_compat_dot_created = 1;
                         result = 0;
                 }
@@ -4397,65 +4575,6 @@ out:
 }
 
 /**
 }
 
 /**
- * Find the osd object for given fid.
- *
- * \param fid need to find the osd object having this fid
- *
- * \retval osd_object on success
- * \retval        -ve on error
- */
-static struct osd_object *osd_object_find(const struct lu_env *env,
-                                         struct dt_object *dt,
-                                         const struct lu_fid *fid)
-{
-        struct lu_device  *ludev = dt->do_lu.lo_dev;
-        struct osd_object *child = NULL;
-        struct lu_object  *luch;
-        struct lu_object  *lo;
-
-       /*
-        * at this point topdev might not exist yet
-        * (i.e. MGS is preparing profiles). so we can
-        * not rely on topdev and instead lookup with
-        * our device passed as topdev. this can't work
-        * if the object isn't cached yet (as osd doesn't
-        * allocate lu_header). IOW, the object must be
-        * in the cache, otherwise lu_object_alloc() crashes
-        * -bzzz
-        */
-       luch = lu_object_find_at(env, ludev->ld_site->ls_top_dev == NULL ?
-                                ludev : ludev->ld_site->ls_top_dev,
-                                fid, NULL);
-       if (!IS_ERR(luch)) {
-               if (lu_object_exists(luch)) {
-                       lo = lu_object_locate(luch->lo_header, ludev->ld_type);
-                       if (lo != NULL)
-                               child = osd_obj(lo);
-                       else
-                               LU_OBJECT_DEBUG(D_ERROR, env, luch,
-                                               "lu_object can't be located"
-                                               DFID"\n", PFID(fid));
-
-                        if (child == NULL) {
-                                lu_object_put(env, luch);
-                                CERROR("Unable to get osd_object\n");
-                                child = ERR_PTR(-ENOENT);
-                        }
-                } else {
-                        LU_OBJECT_DEBUG(D_ERROR, env, luch,
-                                        "lu_object does not exists "DFID"\n",
-                                        PFID(fid));
-                       lu_object_put(env, luch);
-                        child = ERR_PTR(-ENOENT);
-                }
-       } else {
-               child = ERR_CAST(luch);
-       }
-
-       return child;
-}
-
-/**
  * Put the osd object once done with it.
  *
  * \param obj osd object that needs to be put
  * Put the osd object once done with it.
  *
  * \param obj osd object that needs to be put
@@ -4474,28 +4593,34 @@ static int osd_index_declare_ea_insert(const struct lu_env *env,
 {
        struct osd_thandle      *oh;
        struct osd_device       *osd   = osd_dev(dt->do_lu.lo_dev);
 {
        struct osd_thandle      *oh;
        struct osd_device       *osd   = osd_dev(dt->do_lu.lo_dev);
-       struct lu_fid           *fid = (struct lu_fid *)rec;
+       struct dt_insert_rec    *rec1   = (struct dt_insert_rec *)rec;
+       const struct lu_fid     *fid    = rec1->rec_fid;
        int                      credits, rc = 0;
        int                      credits, rc = 0;
+       struct osd_idmap_cache  *idc;
        ENTRY;
 
        LASSERT(!dt_object_remote(dt));
        LASSERT(handle != NULL);
        ENTRY;
 
        LASSERT(!dt_object_remote(dt));
        LASSERT(handle != NULL);
+       LASSERT(fid != NULL);
+       LASSERT(rec1->rec_type != 0);
 
        oh = container_of0(handle, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle == NULL);
 
        credits = osd_dto_credits_noquota[DTO_INDEX_INSERT];
 
        oh = container_of0(handle, struct osd_thandle, ot_super);
        LASSERT(oh->ot_handle == NULL);
 
        credits = osd_dto_credits_noquota[DTO_INDEX_INSERT];
-       if (fid != NULL) {
-               rc = osd_remote_fid(env, osd, fid);
-               if (unlikely(rc < 0))
-                       RETURN(rc);
-               if (rc > 0) {
-                       /* a reference to remote inode is represented by an
-                        * agent inode which we have to create */
-                       credits += osd_dto_credits_noquota[DTO_OBJECT_CREATE];
-                       credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
-               }
-               rc = 0;
+
+       /* we can't call iget() while a transactions is running
+        * (this can lead to a deadlock), but we need to know
+        * inum and object type. so we find this information at
+        * declaration and cache in per-thread info */
+       idc = osd_idc_find_or_init(env, osd, fid);
+       if (IS_ERR(idc))
+               RETURN(PTR_ERR(idc));
+       if (idc->oic_remote) {
+               /* a reference to remote inode is represented by an
+                * agent inode which we have to create */
+               credits += osd_dto_credits_noquota[DTO_OBJECT_CREATE];
+               credits += osd_dto_credits_noquota[DTO_INDEX_INSERT];
        }
 
        osd_trans_declare_op(env, oh, OSD_OT_INSERT, credits);
        }
 
        osd_trans_declare_op(env, oh, OSD_OT_INSERT, credits);
@@ -4536,9 +4661,8 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
        const struct lu_fid     *fid    = rec1->rec_fid;
        const char              *name = (const char *)key;
        struct osd_thread_info  *oti   = osd_oti_get(env);
        const struct lu_fid     *fid    = rec1->rec_fid;
        const char              *name = (const char *)key;
        struct osd_thread_info  *oti   = osd_oti_get(env);
-       struct osd_inode_id     *id    = &oti->oti_id;
        struct inode            *child_inode = NULL;
        struct inode            *child_inode = NULL;
-       struct osd_object       *child = NULL;
+       struct osd_idmap_cache  *idc;
        int                     rc;
        ENTRY;
 
        int                     rc;
        ENTRY;
 
@@ -4553,14 +4677,22 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
 
        LASSERTF(fid_is_sane(fid), "fid"DFID" is insane!\n", PFID(fid));
 
 
        LASSERTF(fid_is_sane(fid), "fid"DFID" is insane!\n", PFID(fid));
 
-       rc = osd_remote_fid(env, osd, fid);
-       if (rc < 0) {
-               CERROR("%s: Can not find object "DFID" rc %d\n",
-                      osd_name(osd), PFID(fid), rc);
-               RETURN(rc);
+       idc = osd_idc_find(env, osd, fid);
+       if (unlikely(idc == NULL)) {
+               /* this dt_insert() wasn't declared properly, so
+                * FID is missing in OI cache. we better do not
+                * lookup FID in FLDB/OI and don't risk to deadlock,
+                * but in some special cases (lfsck testing, etc)
+                * it's much simpler than fixing a caller */
+               CERROR("%s: "DFID" wasn't declared for insert\n",
+                      osd_name(osd), PFID(fid));
+               dump_stack();
+               idc = osd_idc_find_or_init(env, osd, fid);
+               if (IS_ERR(idc))
+                       RETURN(PTR_ERR(idc));
        }
 
        }
 
-       if (rc == 1) {
+       if (idc->oic_remote) {
                /* Insert remote entry */
                if (strcmp(name, dotdot) == 0 && strlen(name) == 2) {
                        struct osd_mdobj_map    *omm = osd->od_mdt_map;
                /* Insert remote entry */
                if (strcmp(name, dotdot) == 0 && strlen(name) == 2) {
                        struct osd_mdobj_map    *omm = osd->od_mdt_map;
@@ -4586,15 +4718,23 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
                }
        } else {
                /* Insert local entry */
                }
        } else {
                /* Insert local entry */
-               child = osd_object_find(env, dt, fid);
-               if (IS_ERR(child)) {
-                       CERROR("%s: Can not find object "DFID"%u:%u: rc = %d\n",
-                              osd_name(osd), PFID(fid),
-                              id->oii_ino, id->oii_gen,
-                              (int)PTR_ERR(child));
-                       RETURN(PTR_ERR(child));
+               if (unlikely(idc->oic_lid.oii_ino == 0)) {
+                       /* for a reason OI cache wasn't filled properly */
+                       CERROR("%s: OIC for "DFID" isn't filled\n",
+                              osd_name(osd), PFID(fid));
+                       RETURN(-EINVAL);
                }
                }
-               child_inode = igrab(child->oo_inode);
+               child_inode = oti->oti_inode;
+               if (unlikely(child_inode == NULL)) {
+                       struct ldiskfs_inode_info *lii;
+                       OBD_ALLOC_PTR(lii);
+                       if (lii == NULL)
+                               RETURN(-ENOMEM);
+                       child_inode = oti->oti_inode = &lii->vfs_inode;
+               }
+               child_inode->i_sb = osd_sb(osd);
+               child_inode->i_ino = idc->oic_lid.oii_ino;
+               child_inode->i_mode = rec1->rec_type & S_IFMT;
        }
 
        rc = osd_ea_add_rec(env, obj, child_inode, name, fid, th);
        }
 
        rc = osd_ea_add_rec(env, obj, child_inode, name, fid, th);
@@ -4602,9 +4742,8 @@ static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
        CDEBUG(D_INODE, "parent %lu insert %s:%lu rc = %d\n",
               obj->oo_inode->i_ino, name, child_inode->i_ino, rc);
 
        CDEBUG(D_INODE, "parent %lu insert %s:%lu rc = %d\n",
               obj->oo_inode->i_ino, name, child_inode->i_ino, rc);
 
-       iput(child_inode);
-       if (child != NULL)
-               osd_object_put(env, child);
+       if (child_inode && child_inode != oti->oti_inode)
+               iput(child_inode);
        LASSERT(osd_invariant(obj));
        osd_trans_exec_check(env, th, OSD_OT_INSERT);
        RETURN(rc);
        LASSERT(osd_invariant(obj));
        osd_trans_exec_check(env, th, OSD_OT_INSERT);
        RETURN(rc);
@@ -5828,6 +5967,7 @@ static void osd_key_fini(const struct lu_context *ctx,
 {
        struct osd_thread_info *info = data;
        struct ldiskfs_inode_info *lli = LDISKFS_I(info->oti_inode);
 {
        struct osd_thread_info *info = data;
        struct ldiskfs_inode_info *lli = LDISKFS_I(info->oti_inode);
+       struct osd_idmap_cache  *idc = info->oti_ins_cache;
 
        if (info->oti_inode != NULL)
                OBD_FREE_PTR(lli);
 
        if (info->oti_inode != NULL)
                OBD_FREE_PTR(lli);
@@ -5837,6 +5977,12 @@ static void osd_key_fini(const struct lu_context *ctx,
        lu_buf_free(&info->oti_iobuf.dr_pg_buf);
        lu_buf_free(&info->oti_iobuf.dr_bl_buf);
        lu_buf_free(&info->oti_big_buf);
        lu_buf_free(&info->oti_iobuf.dr_pg_buf);
        lu_buf_free(&info->oti_iobuf.dr_bl_buf);
        lu_buf_free(&info->oti_big_buf);
+       if (idc != NULL) {
+               LASSERT(info->oti_ins_cache_size > 0);
+               OBD_FREE(idc, sizeof(*idc) * info->oti_ins_cache_size);
+               info->oti_ins_cache = NULL;
+               info->oti_ins_cache_size = 0;
+       }
        OBD_FREE_PTR(info);
 }
 
        OBD_FREE_PTR(info);
 }
 
index b47f653..19c7315 100644 (file)
@@ -503,6 +503,8 @@ struct osd_iobuf {
        unsigned int       dr_init_at;  /* the line iobuf was initialized */
 };
 
        unsigned int       dr_init_at;  /* the line iobuf was initialized */
 };
 
+#define OSD_INS_CACHE_SIZE     8
+
 struct osd_thread_info {
        const struct lu_env   *oti_env;
        /**
 struct osd_thread_info {
        const struct lu_env   *oti_env;
        /**
@@ -567,6 +569,11 @@ struct osd_thread_info {
 
        struct osd_idmap_cache oti_cache;
 
 
        struct osd_idmap_cache oti_cache;
 
+       /* dedicated OI cache for insert (which needs inum) */
+       struct osd_idmap_cache *oti_ins_cache;
+       int                    oti_ins_cache_size;
+       int                    oti_ins_cache_used;
+
         int                    oti_r_locks;
         int                    oti_w_locks;
         int                    oti_txns;
         int                    oti_r_locks;
         int                    oti_w_locks;
         int                    oti_txns;
index c9d727d..fbdcffa 100644 (file)
@@ -83,10 +83,12 @@ struct osd_inode_id {
        __u32 oii_gen; /* inode generation */
 };
 
        __u32 oii_gen; /* inode generation */
 };
 
+/* OI cache entry */
 struct osd_idmap_cache {
        struct lu_fid           oic_fid;
        struct osd_inode_id     oic_lid;
        struct osd_device       *oic_dev;
 struct osd_idmap_cache {
        struct lu_fid           oic_fid;
        struct osd_inode_id     oic_lid;
        struct osd_device       *oic_dev;
+       __u16                   oic_remote:1;   /* FID isn't local */
 };
 
 static inline void osd_id_pack(struct osd_inode_id *tgt,
 };
 
 static inline void osd_id_pack(struct osd_inode_id *tgt,