Remove FLD lookup during configuration, so as to avoid accessing
FLDB on MDT0 before MDT0 is set up.
1. Add od_is_ost to check whether the FID is on an OST, instead
of looking it up in FLDB.
2. Add oic_dev to the OI cache, so that an OI cache lookup also
matches the device; this prevents the ino of an agent inode from
being retrieved from the OI cache. Then osd_remote_fid, which
triggers an FLD lookup, can be removed from the osd index lookup.
Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I85649431f47ad8aa8bd1e46a7b074e15b080bb1d
Reviewed-on: http://review.whamcloud.com/7266
Tested-by: Hudson
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Niu Yawei <yawei.niu@intel.com>
Reviewed-by: Mike Pershin <mike.pershin@intel.com>
Reviewed-by: John L. Hammond <john.hammond@intel.com>
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
12 files changed:
fld->lsf_name, seq, -EIO);
RETURN(-EIO);
} else {
fld->lsf_name, seq, -EIO);
RETURN(-EIO);
} else {
- LASSERT(fld->lsf_control_exp);
+ if (fld->lsf_control_exp == NULL) {
+ CERROR("%s: lookup "LPX64", but not connects to MDT0"
+ "yet: rc = %d.\n", fld->lsf_name, seq, -EIO);
+ RETURN(-EIO);
+ }
/* send request to mdt0 i.e. super seq. controller.
* This is temporary solution, long term solution is fld
* replication on all mdt servers.
/* send request to mdt0 i.e. super seq. controller.
* This is temporary solution, long term solution is fld
* replication on all mdt servers.
return ((__u64)ver << 48) | ((seq & 0xffff) << 32) | oid;
}
return ((__u64)ver << 48) | ((seq & 0xffff) << 32) | oid;
}
+/* extract ost index from an IDIF sequence: bits 16..31 of \a seq */
+static inline __u32 idif_ost_idx(obd_seq seq)
+{
+	const obd_seq shifted = seq >> 16;
+
+	/* only the low 16 bits of the shifted value carry the index */
+	return shifted & 0xffff;
+}
+
/* extract ost index from IDIF FID */
static inline __u32 fid_idif_ost_idx(const struct lu_fid *fid)
{
/* extract ost index from IDIF FID */
static inline __u32 fid_idif_ost_idx(const struct lu_fid *fid)
{
- return (fid_seq(fid) >> 16) & 0xffff;
+ return idif_ost_idx(fid_seq(fid));
}
/* extract OST sequence (group) from a wire ost_id (id/seq) pair */
}
/* extract OST sequence (group) from a wire ost_id (id/seq) pair */
int server_name2index(const char *svname, __u32 *idx, const char **endptr);
int server_name2svname(const char *label, char *svname, const char **endptr,
size_t svsize);
int server_name2index(const char *svname, __u32 *idx, const char **endptr);
int server_name2svname(const char *label, char *svname, const char **endptr,
size_t svsize);
+int server_name_is_ost(const char *svname);
int lustre_put_lsi(struct super_block *sb);
int lustre_start_simple(char *obdname, char *type, char *uuid,
int lustre_put_lsi(struct super_block *sb);
int lustre_start_simple(char *obdname, char *type, char *uuid,
}
EXPORT_SYMBOL(server_name2svname);
}
EXPORT_SYMBOL(server_name2svname);
+/**
+ * Check whether a server name denotes an OST.
+ *
+ * \param[in] svname	server name to parse
+ *
+ * \retval 1	the part of \a svname following the position reported by
+ *		server_name2fsname() begins with "OST"
+ * \retval 0	otherwise, including when \a svname cannot be parsed
+ **/
+int server_name_is_ost(const char *svname)
+{
+	const char *dash;
+
+	/* We use server_name2fsname() just for parsing.  A parse failure
+	 * must not be passed through: callers test the result as a boolean,
+	 * so a non-zero error code would be mistaken for "is an OST". */
+	if (server_name2fsname(svname, NULL, &dash) != 0)
+		return 0;
+
+	/* step past the character \a dash was left on (presumably the '-'
+	 * separating fsname from the target name) */
+	dash++;
+
+	return strncmp(dash, "OST", 3) == 0;
+}
+EXPORT_SYMBOL(server_name_is_ost);
/* Get the index from the obd name.
rc = server type, or
/* Get the index from the obd name.
rc = server type, or
RETURN(-ENOENT);
/* Search order: 1. per-thread cache. */
RETURN(-ENOENT);
/* Search order: 1. per-thread cache. */
- if (lu_fid_eq(fid, &oic->oic_fid)) {
+ if (lu_fid_eq(fid, &oic->oic_fid) &&
+ likely(oic->oic_dev == dev)) {
id = &oic->oic_lid;
goto iget;
}
id = &oic->oic_lid;
goto iget;
}
}
int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
}
int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
- const struct lu_fid *fid, struct lu_seq_range *range)
+ obd_seq seq, struct lu_seq_range *range)
{
struct seq_server_site *ss = osd_seq_site(osd);
int rc;
{
struct seq_server_site *ss = osd_seq_site(osd);
int rc;
- if (fid_is_idif(fid)) {
+ if (fid_seq_is_idif(seq)) {
fld_range_set_ost(range);
fld_range_set_ost(range);
- range->lsr_index = fid_idif_ost_idx(fid);
+ range->lsr_index = idif_ost_idx(seq);
- if (!fid_seq_in_fldb(fid_seq(fid))) {
+ if (!fid_seq_in_fldb(seq)) {
fld_range_set_mdt(range);
if (ss != NULL)
/* FIXME: If ss is NULL, it suppose not get lsr_index
fld_range_set_mdt(range);
if (ss != NULL)
/* FIXME: If ss is NULL, it suppose not get lsr_index
LASSERT(ss != NULL);
fld_range_set_any(range);
LASSERT(ss != NULL);
fld_range_set_any(range);
- rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
+ rc = fld_server_lookup(env, ss->ss_server_fld, seq, range);
- CERROR("%s: cannot find FLD range for "DFID": rc = %d\n",
- osd_name(osd), PFID(fid), rc);
+ CERROR("%s: cannot find FLD range for "LPX64": rc = %d\n",
+ osd_name(osd), seq, rc);
if (fid_is_norm(lu_object_fid(&dt->do_lu)) &&
!fid_is_last_id(lu_object_fid(&dt->do_lu)))
osd_fld_lookup(env, osd_dt_dev(handle->th_dev),
if (fid_is_norm(lu_object_fid(&dt->do_lu)) &&
!fid_is_last_id(lu_object_fid(&dt->do_lu)))
osd_fld_lookup(env, osd_dt_dev(handle->th_dev),
- lu_object_fid(&dt->do_lu), range);
+ fid_seq(lu_object_fid(&dt->do_lu)), range);
-static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
- struct lu_fid *fid)
+static int osd_mdt_seq_exists(const struct lu_env *env,
+ struct osd_device *osd, obd_seq seq)
{
struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
struct seq_server_site *ss = osd_seq_site(osd);
int rc;
ENTRY;
{
struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
struct seq_server_site *ss = osd_seq_site(osd);
int rc;
ENTRY;
- /* Those FID seqs, which are not in FLDB, must be local seq */
- if (unlikely(!fid_seq_in_fldb(fid_seq(fid)) || ss == NULL))
- RETURN(0);
+ if (ss == NULL)
+ RETURN(1);
- rc = osd_fld_lookup(env, osd, fid, range);
+ /* XXX: currently, each MDT only stores available sequences on disk, and
+ * no information about allocated sequences, so we have to look up FLDB;
+ * it probably makes more sense to also store allocated sequences
+ * locally, so we do not need to do a remote FLDB lookup in OSD */
+ rc = osd_fld_lookup(env, osd, seq, range);
- CERROR("%s: Can not lookup fld for "DFID"\n",
- osd_name(osd), PFID(fid));
- RETURN(rc);
+ CERROR("%s: Can not lookup fld for "LPX64"\n",
+ osd_name(osd), seq);
+ RETURN(0);
- RETURN(ss->ss_node_id != range->lsr_index);
+ RETURN(ss->ss_node_id == range->lsr_index);
+}
+
+static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
+ struct lu_fid *fid)
+{
+ ENTRY;
+
+ /* FID seqs not in FLDB, must be local seq */
+ if (unlikely(!fid_seq_in_fldb(fid_seq(fid))))
+ RETURN(0);
+
+ /* Currently only check this for FID on MDT */
+ if (osd_mdt_seq_exists(env, osd, fid_seq(fid)))
+ RETURN(0);
+
+ RETURN(1);
id->oii_ino, id->oii_gen, info);
info->oti_cache.oic_lid = *id;
info->oti_cache.oic_fid = *fid;
id->oii_ino, id->oii_gen, info);
info->oti_cache.oic_lid = *id;
info->oti_cache.oic_fid = *fid;
+ info->oti_cache.oic_dev = osd;
rc = osd_ea_fid_get(env, obj, ino, fid, id);
else
osd_id_gen(id, ino, OSD_OII_NOGEN);
rc = osd_ea_fid_get(env, obj, ino, fid, id);
else
osd_id_gen(id, ino, OSD_OII_NOGEN);
- if (rc != 0 || osd_remote_fid(env, dev, fid)) {
fid_zero(&oic->oic_fid);
GOTO(out, rc);
}
fid_zero(&oic->oic_fid);
GOTO(out, rc);
}
+ if (server_name_is_ost(o->od_svname))
+ o->od_is_ost = 1;
+
rc = osd_obj_map_init(env, o);
if (rc != 0)
GOTO(out_mnt, rc);
rc = osd_obj_map_init(env, o);
if (rc != 0)
GOTO(out_mnt, rc);
od_noscrub:1,
od_dirent_journal:1,
od_igif_inoi:1,
od_noscrub:1,
od_dirent_journal:1,
od_igif_inoi:1,
+ od_check_ff:1,
+ od_is_ost:1;
unsigned long od_capa_timeout;
__u32 od_capa_alg;
unsigned long od_capa_timeout;
__u32 od_capa_alg;
int osd_scrub_dump(struct osd_device *dev, char *buf, int len);
int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
int osd_scrub_dump(struct osd_device *dev, char *buf, int len);
int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
- const struct lu_fid *fid, struct lu_seq_range *range);
+ obd_seq seq, struct lu_seq_range *range);
int osd_delete_from_remote_parent(const struct lu_env *env,
struct osd_device *osd,
int osd_delete_from_remote_parent(const struct lu_env *env,
struct osd_device *osd,
const struct lu_fid *fid,
struct osd_inode_id *id);
const struct lu_fid *fid,
struct osd_inode_id *id);
+int osd_ost_seq_exists(struct osd_thread_info *info, struct osd_device *osd,
+ __u64 seq);
/* osd_quota_fmt.c */
int walk_tree_dqentry(const struct lu_env *env, struct osd_object *obj,
int type, uint blk, int depth, uint index,
/* osd_quota_fmt.c */
int walk_tree_dqentry(const struct lu_env *env, struct osd_object *obj,
int type, uint blk, int depth, uint index,
return osd->od_dt_dev.dd_lu_dev.ld_obd->obd_name;
}
return osd->od_dt_dev.dd_lu_dev.ld_obd->obd_name;
}
extern const struct dt_body_operations osd_body_ops;
extern struct lu_context_key osd_key;
extern const struct dt_body_operations osd_body_ops;
extern struct lu_context_key osd_key;
int fid_is_on_ost(struct osd_thread_info *info, struct osd_device *osd,
const struct lu_fid *fid, enum oi_check_flags flags)
{
int fid_is_on_ost(struct osd_thread_info *info, struct osd_device *osd,
const struct lu_fid *fid, enum oi_check_flags flags)
{
- struct lu_seq_range *range = &info->oti_seq_range;
- int rc;
ENTRY;
if (flags & OI_KNOWN_ON_OST)
ENTRY;
if (flags & OI_KNOWN_ON_OST)
if (!(flags & OI_CHECK_FLD))
RETURN(0);
if (!(flags & OI_CHECK_FLD))
RETURN(0);
- rc = osd_fld_lookup(info->oti_env, osd, fid, range);
- if (rc != 0) {
- CERROR("%s: Can not lookup fld for "DFID"\n",
- osd_name(osd), PFID(fid));
- RETURN(rc);
- }
-
- CDEBUG(D_INFO, "fid "DFID" range "DRANGE"\n", PFID(fid),
- PRANGE(range));
-
- if (fld_range_is_ost(range))
struct osd_idmap_cache {
struct lu_fid oic_fid;
struct osd_inode_id oic_lid;
struct osd_idmap_cache {
struct lu_fid oic_fid;
struct osd_inode_id oic_lid;
+ struct osd_device *oic_dev;
};
static inline void osd_id_pack(struct osd_inode_id *tgt,
};
static inline void osd_id_pack(struct osd_inode_id *tgt,
strncpy(o->od_svname, lustre_cfg_string(cfg, 4),
sizeof(o->od_svname) - 1);
strncpy(o->od_svname, lustre_cfg_string(cfg, 4),
sizeof(o->od_svname) - 1);
+ if (server_name_is_ost(o->od_svname))
+ o->od_is_ost = 1;
+
rc = -udmu_objset_open(o->od_mntdev, &o->od_objset);
if (rc) {
CERROR("can't open objset %s: %d\n", o->od_mntdev, rc);
rc = -udmu_objset_open(o->od_mntdev, &o->od_objset);
if (rc) {
CERROR("can't open objset %s: %d\n", o->od_mntdev, rc);
lu_object_put(env, &obj->oo_dt.do_lu);
}
lu_object_put(env, &obj->oo_dt.do_lu);
}
-static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
- struct lu_fid *fid)
+static int osd_mdt_seq_exists(const struct lu_env *env, struct osd_device *osd,
+ obd_seq seq)
{
struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
struct seq_server_site *ss = osd_seq_site(osd);
int rc;
ENTRY;
{
struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
struct seq_server_site *ss = osd_seq_site(osd);
int rc;
ENTRY;
- if (!fid_is_norm(fid) && !fid_is_root(fid))
- RETURN(0);
+ if (ss == NULL)
+ RETURN(1);
- rc = osd_fld_lookup(env, osd, fid, range);
+ /* XXX: currently, each MDT only stores available sequences on disk,
+ * and no information about allocated sequences, so it has to look up
+ * FLDB. It probably makes more sense to also store allocated sequences
+ * locally, so we do not need to do a remote FLDB lookup in OSD */
+ rc = osd_fld_lookup(env, osd, seq, range);
- CERROR("%s: Can not lookup fld for "DFID"\n",
- osd_name(osd), PFID(fid));
- RETURN(rc);
+ CERROR("%s: Can not lookup fld for "LPX64"\n",
+ osd_name(osd), seq);
+ RETURN(0);
- RETURN(ss->ss_node_id != range->lsr_index);
+ RETURN(ss->ss_node_id == range->lsr_index);
+}
+
+static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
+ struct lu_fid *fid)
+{
+ ENTRY;
+
+ if (!fid_is_norm(fid) && !fid_is_root(fid))
+ RETURN(0);
+
+ /* Currently, it only used to check FID on MDT */
+ if (osd_mdt_seq_exists(env, osd, fid_seq(fid)))
+ RETURN(0);
+
+ RETURN(1);
struct lprocfs_stats *od_stats;
uint64_t od_root;
struct lprocfs_stats *od_stats;
uint64_t od_root;
struct osd_oi **od_oi_table;
unsigned int od_oi_count;
struct osd_seq_list od_seq_list;
unsigned int od_rdonly:1,
od_xattr_in_sa:1,
struct osd_oi **od_oi_table;
unsigned int od_oi_count;
struct osd_seq_list od_seq_list;
unsigned int od_rdonly:1,
od_xattr_in_sa:1,
+ od_quota_iused_est:1,
+ od_is_ost:1;
+
char od_mntdev[128];
char od_svname[128];
char od_mntdev[128];
char od_svname[128];
int osd_options_init(void);
int osd_convert_root_to_new_seq(const struct lu_env *env,
struct osd_device *o);
int osd_options_init(void);
int osd_convert_root_to_new_seq(const struct lu_env *env,
struct osd_device *o);
+int osd_ost_seq_exists(const struct lu_env *env, struct osd_device *osd,
+ __u64 seq);
/* osd_index.c */
int osd_index_try(const struct lu_env *env, struct dt_object *dt,
const struct dt_index_features *feat);
int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
/* osd_index.c */
int osd_index_try(const struct lu_env *env, struct dt_object *dt,
const struct dt_index_features *feat);
int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
- const struct lu_fid *fid, struct lu_seq_range *range);
+ obd_seq seq, struct lu_seq_range *range);
/* osd_xattr.c */
int __osd_xattr_load(udmu_objset_t *uos, uint64_t dnode, nvlist_t **sa_xattr);
/* osd_xattr.c */
int __osd_xattr_load(udmu_objset_t *uos, uint64_t dnode, nvlist_t **sa_xattr);
* the object is located (tgt index) and it is MDT or OST object.
*/
int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
* the object is located (tgt index) and it is MDT or OST object.
*/
int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
- const struct lu_fid *fid, struct lu_seq_range *range)
+ obd_seq seq, struct lu_seq_range *range)
{
struct seq_server_site *ss = osd_seq_site(osd);
int rc;
{
struct seq_server_site *ss = osd_seq_site(osd);
int rc;
- if (fid_is_idif(fid)) {
+ if (fid_seq_is_idif(seq)) {
fld_range_set_ost(range);
fld_range_set_ost(range);
- range->lsr_index = fid_idif_ost_idx(fid);
+ range->lsr_index = idif_ost_idx(seq);
- if (!fid_seq_in_fldb(fid_seq(fid))) {
+ if (!fid_seq_in_fldb(seq)) {
fld_range_set_mdt(range);
if (ss != NULL)
/* FIXME: If ss is NULL, it suppose not get lsr_index
fld_range_set_mdt(range);
if (ss != NULL)
/* FIXME: If ss is NULL, it suppose not get lsr_index
LASSERT(ss != NULL);
fld_range_set_any(range);
LASSERT(ss != NULL);
fld_range_set_any(range);
- rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
+ rc = fld_server_lookup(env, ss->ss_server_fld, seq, range);
- CERROR("%s: cannot find FLD range for "DFID": rc = %d\n",
- osd_name(osd), PFID(fid), rc);
+ CERROR("%s: cannot find FLD range for "LPX64": rc = %d\n",
+ osd_name(osd), seq, rc);
return rc;
}
int fid_is_on_ost(const struct lu_env *env, struct osd_device *osd,
const struct lu_fid *fid)
{
return rc;
}
int fid_is_on_ost(const struct lu_env *env, struct osd_device *osd,
const struct lu_fid *fid)
{
- struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
- int rc;
ENTRY;
if (fid_is_idif(fid))
RETURN(1);
ENTRY;
if (fid_is_idif(fid))
RETURN(1);
- rc = osd_fld_lookup(env, osd, fid, range);
- if (rc != 0) {
- CERROR("%s: Can not lookup fld for "DFID"\n",
- osd_name(osd), PFID(fid));
- RETURN(rc);
- }
-
- CDEBUG(D_INFO, "fid "DFID" range "DRANGE"\n", PFID(fid),
- PRANGE(range));
-
- if (fld_range_is_ost(range))
if (osd_seq->os_compat_dirs == NULL)
GOTO(out, rc = -ENOMEM);
if (osd_seq->os_compat_dirs == NULL)
GOTO(out, rc = -ENOMEM);
- rc = osd_oi_lookup(env, osd, osd->od_root, "O", &oi);
- if (rc != 0) {
- CERROR("%s: Can not find O: rc = %d\n", osd_name(osd), rc);
- GOTO(out, rc);
- }
-
+ oi.oi_zapid = osd->od_O_id;
sprintf(seq_name, (fid_seq_is_rsvd(seq) ||
fid_seq_is_mdt0(seq)) ? LPU64 : LPX64i,
fid_seq_is_idif(seq) ? 0 : seq);
sprintf(seq_name, (fid_seq_is_rsvd(seq) ||
fid_seq_is_mdt0(seq)) ? LPU64 : LPX64i,
fid_seq_is_idif(seq) ? 0 : seq);
osd_ost_seq_init(env, o);
/* Create on-disk indexes to maintain per-UID/GID inode usage.
* Those new indexes are created in the top-level ZAP outside the
osd_ost_seq_init(env, o);
/* Create on-disk indexes to maintain per-UID/GID inode usage.
* Those new indexes are created in the top-level ZAP outside the