fld->lsf_name, seq, -EIO);
RETURN(-EIO);
} else {
- LASSERT(fld->lsf_control_exp);
+ if (fld->lsf_control_exp == NULL) {
+ CERROR("%s: lookup "LPX64", but not connects to MDT0"
+ "yet: rc = %d.\n", fld->lsf_name, seq, -EIO);
+ RETURN(-EIO);
+ }
/* send request to mdt0 i.e. super seq. controller.
* This is temporary solution, long term solution is fld
* replication on all mdt servers.
return ((__u64)ver << 48) | ((seq & 0xffff) << 32) | oid;
}
+static inline __u32 idif_ost_idx(obd_seq seq)
+{
+ return (seq >> 16) & 0xffff;
+}
+
/* extract ost index from IDIF FID */
static inline __u32 fid_idif_ost_idx(const struct lu_fid *fid)
{
- return (fid_seq(fid) >> 16) & 0xffff;
+ return idif_ost_idx(fid_seq(fid));
}
/* extract OST sequence (group) from a wire ost_id (id/seq) pair */
int server_name2index(const char *svname, __u32 *idx, const char **endptr);
int server_name2svname(const char *label, char *svname, const char **endptr,
size_t svsize);
+int server_name_is_ost(const char *svname);
int lustre_put_lsi(struct super_block *sb);
int lustre_start_simple(char *obdname, char *type, char *uuid,
}
EXPORT_SYMBOL(server_name2svname);
+/**
+ * check server name is OST.
+ **/
+int server_name_is_ost(const char *svname)
+{
+ const char *dash;
+ int rc;
+
+ /* We use server_name2fsname() just for parsing */
+ rc = server_name2fsname(svname, NULL, &dash);
+ if (rc != 0)
+ return rc;
+
+ dash++;
+
+ if (strncmp(dash, "OST", 3) == 0)
+ return 1;
+ return 0;
+}
+EXPORT_SYMBOL(server_name_is_ost);
/* Get the index from the obd name.
rc = server type, or
RETURN(-ENOENT);
/* Search order: 1. per-thread cache. */
- if (lu_fid_eq(fid, &oic->oic_fid)) {
+ if (lu_fid_eq(fid, &oic->oic_fid) &&
+ likely(oic->oic_dev == dev)) {
id = &oic->oic_lid;
goto iget;
}
}
int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
- const struct lu_fid *fid, struct lu_seq_range *range)
+ obd_seq seq, struct lu_seq_range *range)
{
struct seq_server_site *ss = osd_seq_site(osd);
int rc;
- if (fid_is_idif(fid)) {
+ if (fid_seq_is_idif(seq)) {
fld_range_set_ost(range);
- range->lsr_index = fid_idif_ost_idx(fid);
+ range->lsr_index = idif_ost_idx(seq);
return 0;
}
- if (!fid_seq_in_fldb(fid_seq(fid))) {
+ if (!fid_seq_in_fldb(seq)) {
fld_range_set_mdt(range);
if (ss != NULL)
/* FIXME: If ss is NULL, it suppose not get lsr_index
LASSERT(ss != NULL);
fld_range_set_any(range);
- rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
+ rc = fld_server_lookup(env, ss->ss_server_fld, seq, range);
if (rc != 0) {
- CERROR("%s: cannot find FLD range for "DFID": rc = %d\n",
- osd_name(osd), PFID(fid), rc);
+ CERROR("%s: cannot find FLD range for "LPX64": rc = %d\n",
+ osd_name(osd), seq, rc);
}
return rc;
}
if (fid_is_norm(lu_object_fid(&dt->do_lu)) &&
!fid_is_last_id(lu_object_fid(&dt->do_lu)))
osd_fld_lookup(env, osd_dt_dev(handle->th_dev),
- lu_object_fid(&dt->do_lu), range);
+ fid_seq(lu_object_fid(&dt->do_lu)), range);
RETURN(rc);
return rc;
}
-static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
- struct lu_fid *fid)
+static int osd_mdt_seq_exists(const struct lu_env *env,
+ struct osd_device *osd, obd_seq seq)
{
struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
struct seq_server_site *ss = osd_seq_site(osd);
int rc;
ENTRY;
- /* Those FID seqs, which are not in FLDB, must be local seq */
- if (unlikely(!fid_seq_in_fldb(fid_seq(fid)) || ss == NULL))
- RETURN(0);
+ if (ss == NULL)
+ RETURN(1);
- rc = osd_fld_lookup(env, osd, fid, range);
+ /* XXX: currently, each MDT only store avaible sequence on disk, and no
+ * allocated sequences information on disk, so we have to lookup FLDB,
+ * but it probably makes more sense also store allocated sequence
+ * locally, so we do not need do remote FLDB lookup in OSD */
+ rc = osd_fld_lookup(env, osd, seq, range);
if (rc != 0) {
- CERROR("%s: Can not lookup fld for "DFID"\n",
- osd_name(osd), PFID(fid));
- RETURN(rc);
+ CERROR("%s: Can not lookup fld for "LPX64"\n",
+ osd_name(osd), seq);
+ RETURN(0);
}
- RETURN(ss->ss_node_id != range->lsr_index);
+ RETURN(ss->ss_node_id == range->lsr_index);
+}
+
+static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
+ struct lu_fid *fid)
+{
+ ENTRY;
+
+ /* FID seqs not in FLDB, must be local seq */
+ if (unlikely(!fid_seq_in_fldb(fid_seq(fid))))
+ RETURN(0);
+
+ /* Currently only check this for FID on MDT */
+ if (osd_mdt_seq_exists(env, osd, fid_seq(fid)))
+ RETURN(0);
+
+ RETURN(1);
}
/**
id->oii_ino, id->oii_gen, info);
info->oti_cache.oic_lid = *id;
info->oti_cache.oic_fid = *fid;
+ info->oti_cache.oic_dev = osd;
return 0;
}
rc = osd_ea_fid_get(env, obj, ino, fid, id);
else
osd_id_gen(id, ino, OSD_OII_NOGEN);
- if (rc != 0 || osd_remote_fid(env, dev, fid)) {
+ if (rc != 0) {
fid_zero(&oic->oic_fid);
GOTO(out, rc);
}
GOTO(out_mnt, rc);
}
+ if (server_name_is_ost(o->od_svname))
+ o->od_is_ost = 1;
+
rc = osd_obj_map_init(env, o);
if (rc != 0)
GOTO(out_mnt, rc);
od_noscrub:1,
od_dirent_journal:1,
od_igif_inoi:1,
- od_check_ff:1;
+ od_check_ff:1,
+ od_is_ost:1;
unsigned long od_capa_timeout;
__u32 od_capa_alg;
int osd_scrub_dump(struct osd_device *dev, char *buf, int len);
int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
- const struct lu_fid *fid, struct lu_seq_range *range);
+ obd_seq seq, struct lu_seq_range *range);
int osd_delete_from_remote_parent(const struct lu_env *env,
struct osd_device *osd,
const struct lu_fid *fid,
struct osd_inode_id *id);
+int osd_ost_seq_exists(struct osd_thread_info *info, struct osd_device *osd,
+ __u64 seq);
/* osd_quota_fmt.c */
int walk_tree_dqentry(const struct lu_env *env, struct osd_object *obj,
int type, uint blk, int depth, uint index,
return osd->od_dt_dev.dd_lu_dev.ld_obd->obd_name;
}
-
extern const struct dt_body_operations osd_body_ops;
extern struct lu_context_key osd_key;
int fid_is_on_ost(struct osd_thread_info *info, struct osd_device *osd,
const struct lu_fid *fid, enum oi_check_flags flags)
{
- struct lu_seq_range *range = &info->oti_seq_range;
- int rc;
ENTRY;
if (flags & OI_KNOWN_ON_OST)
if (!(flags & OI_CHECK_FLD))
RETURN(0);
- rc = osd_fld_lookup(info->oti_env, osd, fid, range);
- if (rc != 0) {
- CERROR("%s: Can not lookup fld for "DFID"\n",
- osd_name(osd), PFID(fid));
- RETURN(rc);
- }
-
- CDEBUG(D_INFO, "fid "DFID" range "DRANGE"\n", PFID(fid),
- PRANGE(range));
-
- if (fld_range_is_ost(range))
+ if (osd->od_is_ost)
RETURN(1);
RETURN(0);
struct osd_idmap_cache {
struct lu_fid oic_fid;
struct osd_inode_id oic_lid;
+ struct osd_device *oic_dev;
};
static inline void osd_id_pack(struct osd_inode_id *tgt,
strncpy(o->od_svname, lustre_cfg_string(cfg, 4),
sizeof(o->od_svname) - 1);
+ if (server_name_is_ost(o->od_svname))
+ o->od_is_ost = 1;
+
rc = -udmu_objset_open(o->od_mntdev, &o->od_objset);
if (rc) {
CERROR("can't open objset %s: %d\n", o->od_mntdev, rc);
lu_object_put(env, &obj->oo_dt.do_lu);
}
-static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
- struct lu_fid *fid)
+static int osd_mdt_seq_exists(const struct lu_env *env, struct osd_device *osd,
+ obd_seq seq)
{
struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
struct seq_server_site *ss = osd_seq_site(osd);
int rc;
ENTRY;
- if (!fid_is_norm(fid) && !fid_is_root(fid))
- RETURN(0);
+ if (ss == NULL)
+ RETURN(1);
- rc = osd_fld_lookup(env, osd, fid, range);
+ /* XXX: currently, each MDT only store avaible sequence on disk,
+ * and no allocated sequences information on disk, so it has to
+ * lookup FLDB. It probably makes more sense also store allocated
+ * sequence locally, so we do not need do remote FLDB lookup in OSD */
+ rc = osd_fld_lookup(env, osd, seq, range);
if (rc != 0) {
- CERROR("%s: Can not lookup fld for "DFID"\n",
- osd_name(osd), PFID(fid));
- RETURN(rc);
+ CERROR("%s: Can not lookup fld for "LPX64"\n",
+ osd_name(osd), seq);
+ RETURN(0);
}
- RETURN(ss->ss_node_id != range->lsr_index);
+ RETURN(ss->ss_node_id == range->lsr_index);
+}
+
+static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
+ struct lu_fid *fid)
+{
+ ENTRY;
+
+ if (!fid_is_norm(fid) && !fid_is_root(fid))
+ RETURN(0);
+
+ /* Currently, it only used to check FID on MDT */
+ if (osd_mdt_seq_exists(env, osd, fid_seq(fid)))
+ RETURN(0);
+
+ RETURN(1);
}
/**
struct lprocfs_stats *od_stats;
uint64_t od_root;
+ uint64_t od_O_id;
struct osd_oi **od_oi_table;
unsigned int od_oi_count;
struct osd_seq_list od_seq_list;
unsigned int od_rdonly:1,
od_xattr_in_sa:1,
- od_quota_iused_est:1;
+ od_quota_iused_est:1,
+ od_is_ost:1;
+
char od_mntdev[128];
char od_svname[128];
int osd_options_init(void);
int osd_convert_root_to_new_seq(const struct lu_env *env,
struct osd_device *o);
-
+int osd_ost_seq_exists(const struct lu_env *env, struct osd_device *osd,
+ __u64 seq);
/* osd_index.c */
int osd_index_try(const struct lu_env *env, struct dt_object *dt,
const struct dt_index_features *feat);
int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
- const struct lu_fid *fid, struct lu_seq_range *range);
+ obd_seq seq, struct lu_seq_range *range);
/* osd_xattr.c */
int __osd_xattr_load(udmu_objset_t *uos, uint64_t dnode, nvlist_t **sa_xattr);
* the object is located (tgt index) and it is MDT or OST object.
*/
int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
- const struct lu_fid *fid, struct lu_seq_range *range)
+ obd_seq seq, struct lu_seq_range *range)
{
struct seq_server_site *ss = osd_seq_site(osd);
int rc;
- if (fid_is_idif(fid)) {
+ if (fid_seq_is_idif(seq)) {
fld_range_set_ost(range);
- range->lsr_index = fid_idif_ost_idx(fid);
+ range->lsr_index = idif_ost_idx(seq);
return 0;
}
- if (!fid_seq_in_fldb(fid_seq(fid))) {
+ if (!fid_seq_in_fldb(seq)) {
fld_range_set_mdt(range);
if (ss != NULL)
/* FIXME: If ss is NULL, it suppose not get lsr_index
LASSERT(ss != NULL);
fld_range_set_any(range);
- rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
+ rc = fld_server_lookup(env, ss->ss_server_fld, seq, range);
if (rc != 0)
- CERROR("%s: cannot find FLD range for "DFID": rc = %d\n",
- osd_name(osd), PFID(fid), rc);
+ CERROR("%s: cannot find FLD range for "LPX64": rc = %d\n",
+ osd_name(osd), seq, rc);
return rc;
}
int fid_is_on_ost(const struct lu_env *env, struct osd_device *osd,
const struct lu_fid *fid)
{
- struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
- int rc;
ENTRY;
if (fid_is_idif(fid))
RETURN(1);
- rc = osd_fld_lookup(env, osd, fid, range);
- if (rc != 0) {
- CERROR("%s: Can not lookup fld for "DFID"\n",
- osd_name(osd), PFID(fid));
- RETURN(rc);
- }
-
- CDEBUG(D_INFO, "fid "DFID" range "DRANGE"\n", PFID(fid),
- PRANGE(range));
-
- if (fld_range_is_ost(range))
+ if (osd->od_is_ost)
RETURN(1);
RETURN(0);
if (osd_seq->os_compat_dirs == NULL)
GOTO(out, rc = -ENOMEM);
- rc = osd_oi_lookup(env, osd, osd->od_root, "O", &oi);
- if (rc != 0) {
- CERROR("%s: Can not find O: rc = %d\n", osd_name(osd), rc);
- GOTO(out, rc);
- }
-
+ oi.oi_zapid = osd->od_O_id;
sprintf(seq_name, (fid_seq_is_rsvd(seq) ||
fid_seq_is_mdt0(seq)) ? LPU64 : LPX64i,
fid_seq_is_idif(seq) ? 0 : seq);
if (rc)
RETURN(rc);
+ o->od_O_id = sdb;
+
osd_ost_seq_init(env, o);
/* Create on-disk indexes to maintain per-UID/GID inode usage.
* Those new indexes are created in the top-level ZAP outside the