instead cache FID->dnode mapping in @env at declarations.
Change-Id: I2c2ab17cd6e158e9462715f12c21da2c2b8402db
Signed-off-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-on: https://review.whamcloud.com/21333
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Tested-by: Jenkins
Reviewed-by: Fan Yong <fan.yong@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
oh = container_of0(th, struct osd_thandle, ot_super);
INIT_LIST_HEAD(&unlinked);
list_splice_init(&oh->ot_unlinked_list, &unlinked);
+ /* reset OI cache for safety */
+ osd_oti_get(env)->oti_ins_cache_used = 0;
if (oh->ot_assigned == 0) {
LASSERT(oh->ot_tx);
struct lu_context_key *key, void *data)
{
struct osd_thread_info *info = data;
+ struct osd_idmap_cache *idc = info->oti_ins_cache;
+ if (idc != NULL) {
+ LASSERT(info->oti_ins_cache_size > 0);
+ OBD_FREE(idc, sizeof(*idc) * info->oti_ins_cache_size);
+ info->oti_ins_cache = NULL;
+ info->oti_ins_cache_size = 0;
+ }
OBD_FREE_PTR(info);
}
static void osd_key_exit(const struct lu_context *ctx,
struct lu_context_key *key, void *data)
{
- struct osd_thread_info *info = data;
-
- memset(info, 0, sizeof(*info));
+ /* intentionally empty: the per-thread info is no longer wiped when
+  * the context is exited. NOTE(review): presumably this keeps the
+  * oti_ins_cache pointer and its size intact so the array can still
+  * be freed at key teardown - confirm. */
}
struct lu_context_key osd_key = {
const struct dt_key *key,
struct thandle *th)
{
- struct osd_object *obj = osd_dt_obj(dt);
- struct osd_thandle *oh;
- uint64_t object;
+ struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_device *osd = osd_obj2dev(obj);
+ const struct dt_insert_rec *rec1;
+ const struct lu_fid *fid;
+ struct osd_thandle *oh;
+ uint64_t object;
ENTRY;
+ rec1 = (struct dt_insert_rec *)rec;
+ fid = rec1->rec_fid;
+ LASSERT(fid != NULL);
+ LASSERT(rec1->rec_type != 0);
+
LASSERT(th != NULL);
oh = container_of0(th, struct osd_thandle, ot_super);
* before insertion */
dmu_tx_hold_zap(oh->ot_tx, object, TRUE, NULL);
- RETURN(0);
-}
-
-/**
- * Find the osd object for given fid.
- *
- * \param fid need to find the osd object having this fid
- *
- * \retval osd_object on success
- * \retval -ve on error
- */
-struct osd_object *osd_object_find(const struct lu_env *env,
- struct dt_object *dt,
- const struct lu_fid *fid)
-{
- struct lu_device *ludev = dt->do_lu.lo_dev;
- struct osd_object *child = NULL;
- struct lu_object *luch;
- struct lu_object *lo;
+ osd_idc_find_or_init(env, osd, fid);
- /*
- * at this point topdev might not exist yet
- * (i.e. MGS is preparing profiles). so we can
- * not rely on topdev and instead lookup with
- * our device passed as topdev. this can't work
- * if the object isn't cached yet (as osd doesn't
- * allocate lu_header). IOW, the object must be
- * in the cache, otherwise lu_object_alloc() crashes
- * -bzzz
- */
- luch = lu_object_find_at(env, ludev, fid, NULL);
- if (IS_ERR(luch))
- return (void *)luch;
-
- if (lu_object_exists(luch)) {
- lo = lu_object_locate(luch->lo_header, ludev->ld_type);
- if (lo != NULL)
- child = osd_obj(lo);
- else
- LU_OBJECT_DEBUG(D_ERROR, env, luch,
- "%s: object can't be located "DFID,
- osd_dev(ludev)->od_svname, PFID(fid));
-
- if (child == NULL) {
- lu_object_put(env, luch);
- CERROR("%s: Unable to get osd_object "DFID"\n",
- osd_dev(ludev)->od_svname, PFID(fid));
- child = ERR_PTR(-ENOENT);
- }
- } else {
- LU_OBJECT_DEBUG(D_ERROR, env, luch,
- "%s: lu_object does not exists "DFID,
- osd_dev(ludev)->od_svname, PFID(fid));
- lu_object_put(env, luch);
- child = ERR_PTR(-ENOENT);
- }
-
- return child;
+ RETURN(0);
}
/**
RETURN(ss->ss_node_id == range->lsr_index);
}
-static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
- const struct lu_fid *fid)
+int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
+ const struct lu_fid *fid)
{
struct seq_server_site *ss = osd_seq_site(osd);
ENTRY;
struct osd_device *osd = osd_obj2dev(parent);
struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec;
const struct lu_fid *fid = rec1->rec_fid;
- struct osd_thandle *oh;
- struct osd_object *child = NULL;
- __u32 attr;
+ struct osd_thandle *oh;
+ struct osd_idmap_cache *idc;
char *name = (char *)key;
int rc;
ENTRY;
LASSERT(th != NULL);
oh = container_of0(th, struct osd_thandle, ot_super);
- rc = osd_remote_fid(env, osd, fid);
- if (rc < 0) {
- CERROR("%s: Can not find object "DFID": rc = %d\n",
- osd->od_svname, PFID(fid), rc);
- RETURN(rc);
+ idc = osd_idc_find(env, osd, fid);
+ if (unlikely(idc == NULL)) {
+ /* this dt_insert() wasn't declared properly, so the
+ * FID is missing from the OI cache. ideally we would not
+ * look the FID up in FLDB/OI here, to avoid the risk of a
+ * deadlock, but in some special cases (lfsck testing, etc)
+ * that is much simpler than fixing the caller */
+ CERROR("%s: "DFID" wasn't declared for insert\n",
+ osd_name(osd), PFID(fid));
+ idc = osd_idc_find_or_init(env, osd, fid);
+ if (IS_ERR(idc))
+ RETURN(PTR_ERR(idc));
}
- if (unlikely(rc == 1)) {
+ if (idc->oic_remote) {
/* Insert remote entry */
memset(&oti->oti_zde.lzd_reg, 0, sizeof(oti->oti_zde.lzd_reg));
oti->oti_zde.lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
} else {
- /*
- * To simulate old Orion setups with ./.. stored in the
- * directories
- */
- /* Insert local entry */
- child = osd_object_find(env, dt, fid);
- if (IS_ERR(child))
- RETURN(PTR_ERR(child));
-
- LASSERT(child->oo_db);
+ if (unlikely(idc->oic_dnode == 0)) {
+ /* for some reason the OI cache wasn't filled properly */
+ CERROR("%s: OIC for "DFID" isn't filled\n",
+ osd_name(osd), PFID(fid));
+ RETURN(-EINVAL);
+ }
if (name[0] == '.') {
if (name[1] == 0) {
/* do not store ".", instead generate it
* during iteration */
GOTO(out, rc = 0);
} else if (name[1] == '.' && name[2] == 0) {
- if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT)) {
- struct lu_fid tfid = *fid;
-
- osd_object_put(env, child);
- tfid.f_oid--;
- child = osd_object_find(env, dt, &tfid);
- if (IS_ERR(child))
- RETURN(PTR_ERR(child));
-
- LASSERT(child->oo_db);
- }
+ uint64_t dnode = idc->oic_dnode;
+ if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_PARENT))
+ dnode--;
/* update parent dnode in the child.
* later it will be used to generate ".." */
rc = osd_object_sa_update(parent,
SA_ZPL_PARENT(osd),
- &child->oo_db->db_object,
- 8, oh);
+ &dnode, 8, oh);
GOTO(out, rc);
}
}
CLASSERT(sizeof(oti->oti_zde.lzd_reg) == 8);
CLASSERT(sizeof(oti->oti_zde) % 8 == 0);
- attr = child->oo_dt.do_lu.lo_header ->loh_attr;
- oti->oti_zde.lzd_reg.zde_type = IFTODT(attr & S_IFMT);
- oti->oti_zde.lzd_reg.zde_dnode = child->oo_db->db_object;
+ oti->oti_zde.lzd_reg.zde_type = IFTODT(rec1->rec_type & S_IFMT);
+ oti->oti_zde.lzd_reg.zde_dnode = idc->oic_dnode;
}
oti->oti_zde.lzd_fid = *fid;
(void *)&oti->oti_zde, oh->ot_tx);
out:
- if (child != NULL)
- osd_object_put(env, child);
RETURN(rc);
}
uint64_t ctime[2];
};
+
+#define OSD_INS_CACHE_SIZE 8
+
+ /* OI cache entry: one cached FID -> dnode mapping.
+  * oic_dev and oic_fid together form the lookup key (osd_idc_find()
+  * compares both). oic_dnode == 0 marks an entry whose dnode number
+  * has not been filled in yet. */
+ struct osd_idmap_cache {
+ struct osd_device *oic_dev; /* device the mapping belongs to */
+ struct lu_fid oic_fid; /* FID being mapped */
+ /** max 2^48 dnodes per dataset, avoid spilling into another word */
+ uint64_t oic_dnode:DN_MAX_OBJECT_SHIFT,
+ oic_remote:1; /* FID isn't local */
+ };
+
/* max. number of regular attributes the callers may ask for */
#define OSD_MAX_IN_BULK 13
struct lquota_id_info oti_qi;
struct lu_seq_range oti_seq_range;
+
+ /* dedicated OI cache for insert (which needs inum) */
+ struct osd_idmap_cache *oti_ins_cache;
+ int oti_ins_cache_size;
+ int oti_ins_cache_used;
};
extern struct lu_context_key osd_key;
int osd_options_init(void);
int osd_ost_seq_exists(const struct lu_env *env, struct osd_device *osd,
__u64 seq);
+int osd_idc_find_and_init(const struct lu_env *env, struct osd_device *osd,
+ struct osd_object *obj);
+struct osd_idmap_cache *osd_idc_find_or_init(const struct lu_env *env,
+ struct osd_device *osd,
+ const struct lu_fid *fid);
+struct osd_idmap_cache *osd_idc_find(const struct lu_env *env,
+ struct osd_device *osd,
+ const struct lu_fid *fid);
+
/* osd_index.c */
int osd_index_try(const struct lu_env *env, struct dt_object *dt,
const struct dt_index_features *feat);
uint64_t id, uint64_t dirhash);
void osd_zap_cursor_fini(zap_cursor_t *zc);
uint64_t osd_zap_cursor_serialize(zap_cursor_t *zc);
+int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
+ const struct lu_fid *fid);
/* osd_xattr.c */
int __osd_xattr_load(struct osd_device *osd, uint64_t dnode,
else
dmu_tx_hold_zap(oh->ot_tx, osd->od_unlinkedid, TRUE, NULL);
+ /* will help to find FID->ino when this object is being
+ * added to PENDING/ */
+ osd_idc_find_and_init(env, osd, obj);
+
RETURN(0);
}
ah->dah_parent = parent;
ah->dah_mode = child_mode;
+
+ if (parent != NULL && !dt_object_remote(parent)) {
+ /* will help to find FID->ino at dt_insert("..") */
+ struct osd_object *pobj = osd_dt_obj(parent);
+
+ osd_idc_find_and_init(env, osd_obj2dev(pobj), pobj);
+ }
}
static int osd_declare_object_create(const struct lu_env *env,
dmu_tx_hold_zap(oh->ot_tx, osd->od_iusr_oid, FALSE, NULL);
dmu_tx_hold_zap(oh->ot_tx, osd->od_igrp_oid, FALSE, NULL);
+ /* will help to find FID->ino mapping at dt_insert() */
+ osd_idc_find_and_init(env, osd, obj);
+
rc = osd_declare_quota(env, osd, attr->la_uid, attr->la_gid, 1, oh,
false, NULL, false);
+
RETURN(rc);
}
rc = __osd_sa_xattr_update(env, obj, oh);
if (rc)
GOTO(out, rc);
+ osd_idc_find_and_init(env, osd, obj);
/* Add new object to inode accounting.
* Errors are not considered as fatal */
return 0;
}
+
+/*
+ * the following set of functions are used to maintain per-thread
+ * cache of FID->ino mapping. this mechanism is used to avoid
+ * expensive LU/OI lookups.
+ */
+ /**
+  * Search the per-thread insert cache for a FID.
+  *
+  * Only the first oti_ins_cache_used slots are examined. A slot matches
+  * when both its FID and its owning device equal the arguments.
+  *
+  * \param env	environment the per-thread info is taken from
+  * \param osd	device the mapping must belong to
+  * \param fid	FID to look for
+  *
+  * \retval pointer to the matching cache entry
+  * \retval NULL if no slot in use matches
+  */
+ struct osd_idmap_cache *osd_idc_find(const struct lu_env *env,
+ 				     struct osd_device *osd,
+ 				     const struct lu_fid *fid)
+ {
+ 	struct osd_thread_info *oti = osd_oti_get(env);
+ 	int used = oti->oti_ins_cache_used;
+ 	int slot = 0;
+
+ 	while (slot < used) {
+ 		struct osd_idmap_cache *entry = &oti->oti_ins_cache[slot];
+
+ 		/* both halves of the key must agree */
+ 		if (entry->oic_dev == osd && lu_fid_eq(&entry->oic_fid, fid))
+ 			return entry;
+ 		slot++;
+ 	}
+
+ 	return NULL;
+ }
+
+ /**
+  * Append a new, empty mapping for \a fid to the per-thread insert cache.
+  *
+  * When every slot is in use the array is reallocated: the first
+  * allocation holds OSD_INS_CACHE_SIZE entries and each later growth
+  * doubles the capacity, copying the slots in use into the new array.
+  * The new entry is returned with oic_dnode == 0 and oic_remote == 0 and
+  * is expected to be filled in by the caller. No check for an already
+  * cached \a fid is made here.
+  *
+  * \param env	environment the per-thread info is taken from
+  * \param osd	device the mapping belongs to
+  * \param fid	FID to record in the new entry
+  *
+  * \retval pointer to the new cache entry
+  * \retval ERR_PTR(-ENOMEM) if the array could not be grown
+  */
+ struct osd_idmap_cache *osd_idc_add(const struct lu_env *env,
+ 				    struct osd_device *osd,
+ 				    const struct lu_fid *fid)
+ {
+ 	struct osd_thread_info *oti = osd_oti_get(env);
+ 	struct osd_idmap_cache *idc;
+ 	int capacity;
+
+ 	if (unlikely(oti->oti_ins_cache_used >= oti->oti_ins_cache_size)) {
+ 		capacity = oti->oti_ins_cache_size * 2;
+ 		LASSERT(capacity < 1000);
+ 		if (capacity == 0)
+ 			capacity = OSD_INS_CACHE_SIZE;
+ 		OBD_ALLOC(idc, sizeof(*idc) * capacity);
+ 		if (idc == NULL)
+ 			return ERR_PTR(-ENOMEM);
+ 		if (oti->oti_ins_cache != NULL) {
+ 			/* only the slots in use carry data worth copying */
+ 			memcpy(idc, oti->oti_ins_cache,
+ 			       oti->oti_ins_cache_used * sizeof(*idc));
+ 			/* release exactly what was allocated: the array's
+ 			 * capacity, not the number of slots in use */
+ 			OBD_FREE(oti->oti_ins_cache,
+ 				 oti->oti_ins_cache_size * sizeof(*idc));
+ 		}
+ 		oti->oti_ins_cache = idc;
+ 		oti->oti_ins_cache_size = capacity;
+ 	}
+
+ 	idc = &oti->oti_ins_cache[oti->oti_ins_cache_used++];
+ 	idc->oic_fid = *fid;
+ 	idc->oic_dev = osd;
+ 	idc->oic_dnode = 0;
+ 	idc->oic_remote = 0;
+
+ 	return idc;
+ }
+
+/**
+ * Lookup mapping for the given fid in the cache
+ *
+ * Initialize a new one if not found. the initialization checks whether
+ * the object is local or remote. for the local objects, OI is used to
+ * learn dnode#. the function is used when the caller has no information
+ * about the object, e.g. at dt_insert().
+ */
+struct osd_idmap_cache *osd_idc_find_or_init(const struct lu_env *env,
+ struct osd_device *osd,
+ const struct lu_fid *fid)
+{
+ struct osd_idmap_cache *idc;
+ int rc;
+
+ idc = osd_idc_find(env, osd, fid);
+ if (idc != NULL)
+ return idc;
+
+ /* new mapping is needed */
+ idc = osd_idc_add(env, osd, fid);
+ if (IS_ERR(idc))
+ return idc;
+
+ /* initialize it */
+ rc = osd_remote_fid(env, osd, fid);
+ if (unlikely(rc < 0))
+ return ERR_PTR(rc);
+
+ if (rc == 0) {
+ /* the object is local, lookup in OI */
+ uint64_t dnode;
+
+ rc = osd_fid_lookup(env, osd, fid, &dnode);
+ if (unlikely(rc < 0)) {
+ CERROR("%s: can't lookup: rc = %d\n",
+ osd->od_svname, rc);
+ return ERR_PTR(rc);
+ }
+ LASSERT(dnode < (1ULL << DN_MAX_OBJECT_SHIFT));
+ idc->oic_dnode = dnode;
+ } else {
+ /* the object is remote */
+ idc->oic_remote = 1;
+ }
+
+ return idc;
+}
+
+/*
+ * lookup mapping for given FID and fill it from the given object.
+ * the object is local by definition.
+ */
+int osd_idc_find_and_init(const struct lu_env *env, struct osd_device *osd,
+ struct osd_object *obj)
+{
+ const struct lu_fid *fid = lu_object_fid(&obj->oo_dt.do_lu);
+ struct osd_idmap_cache *idc;
+
+ idc = osd_idc_find(env, osd, fid);
+ if (idc != NULL) {
+ if (obj->oo_db == NULL)
+ return 0;
+ idc->oic_dnode = obj->oo_db->db_object;
+ return 0;
+ }
+
+ /* new mapping is needed */
+ idc = osd_idc_add(env, osd, fid);
+ if (IS_ERR(idc))
+ return PTR_ERR(idc);
+
+ if (obj->oo_db)
+ idc->oic_dnode = obj->oo_db->db_object;
+
+ return 0;
+}