CFS_INIT_LIST_HEAD(&cache->fci_lru);
cache->fci_cache_count = 0;
- spin_lock_init(&cache->fci_lock);
+ rwlock_init(&cache->fci_lock);
strncpy(cache->fci_name, name,
sizeof(cache->fci_name));
{
ENTRY;
- spin_lock(&cache->fci_lock);
+ write_lock(&cache->fci_lock);
cache->fci_cache_size = 0;
fld_cache_shrink(cache);
- spin_unlock(&cache->fci_lock);
+ write_unlock(&cache->fci_lock);
EXIT;
}
__u32 new_flags = f_new->fce_range.lsr_flags;
ENTRY;
- LASSERT_SPIN_LOCKED(&cache->fci_lock);
-
/*
* Duplicate entries are eliminated in insert op.
* So we don't need to search new entry before starting
if (IS_ERR(flde))
RETURN(PTR_ERR(flde));
- spin_lock(&cache->fci_lock);
+ write_lock(&cache->fci_lock);
rc = fld_cache_insert_nolock(cache, flde);
- spin_unlock(&cache->fci_lock);
+ write_unlock(&cache->fci_lock);
if (rc)
OBD_FREE_PTR(flde);
struct fld_cache_entry *tmp;
cfs_list_t *head;
- LASSERT_SPIN_LOCKED(&cache->fci_lock);
head = &cache->fci_entries_head;
cfs_list_for_each_entry_safe(flde, tmp, head, fce_list) {
/* add list if next is end of list */
+/**
+ * Delete the cache entry matching \a range from \a cache.
+ *
+ * Takes the cache lock in write mode (this patch converts fci_lock
+ * from a spinlock to an rwlock so that lookups can run concurrently)
+ * and delegates to fld_cache_delete_nolock().
+ */
void fld_cache_delete(struct fld_cache *cache,
const struct lu_seq_range *range)
{
- spin_lock(&cache->fci_lock);
+ write_lock(&cache->fci_lock);
fld_cache_delete_nolock(cache, range);
- spin_unlock(&cache->fci_lock);
+ write_unlock(&cache->fci_lock);
}
struct fld_cache_entry
struct fld_cache_entry *got = NULL;
cfs_list_t *head;
- LASSERT_SPIN_LOCKED(&cache->fci_lock);
head = &cache->fci_entries_head;
cfs_list_for_each_entry(flde, head, fce_list) {
if (range->lsr_start == flde->fce_range.lsr_start ||
struct fld_cache_entry *got = NULL;
ENTRY;
- spin_lock(&cache->fci_lock);
+ read_lock(&cache->fci_lock);
got = fld_cache_entry_lookup_nolock(cache, range);
- spin_unlock(&cache->fci_lock);
+ read_unlock(&cache->fci_lock);
RETURN(got);
}
cfs_list_t *head;
ENTRY;
- spin_lock(&cache->fci_lock);
+ read_lock(&cache->fci_lock);
head = &cache->fci_entries_head;
cache->fci_stat.fst_count++;
if (range_within(&flde->fce_range, seq)) {
*range = flde->fce_range;
- /* update position of this entry in lru list. */
- cfs_list_move(&flde->fce_lru, &cache->fci_lru);
cache->fci_stat.fst_cache++;
- spin_unlock(&cache->fci_lock);
+ read_unlock(&cache->fci_lock);
RETURN(0);
}
}
- spin_unlock(&cache->fci_lock);
+ read_unlock(&cache->fci_lock);
RETURN(-ENOENT);
}
/* Lookup it in the cache. */
rc = fld_cache_lookup(fld->lsf_cache, seq, erange);
if (rc == 0) {
- if (unlikely(erange->lsr_flags != range->lsr_flags)) {
+ if (unlikely(erange->lsr_flags != range->lsr_flags) &&
+ range->lsr_flags != -1) {
CERROR("FLD cache found a range "DRANGE" doesn't "
"match the requested flag %x\n",
PRANGE(erange), range->lsr_flags);
* This is temporary solution, long term solution is fld
* replication on all mdt servers.
*/
+ range->lsr_start = seq;
rc = fld_client_rpc(fld->lsf_control_exp,
range, FLD_LOOKUP);
if (rc == 0)
RETURN(err_serious(-EPROTO));
*out = *in;
- /* For old 2.0 client, the 'lsr_flags' is uninitialized.
- * Set it as 'LU_SEQ_RANGE_MDT' by default.
- * Old 2.0 liblustre client cannot talk with new 2.1 server. */
- if (!(exp->exp_connect_flags & OBD_CONNECT_64BITHASH) &&
- !exp->exp_libclient)
- out->lsr_flags = LU_SEQ_RANGE_MDT;
+ /* For old 2.0 client, the 'lsr_flags' is uninitialized.
+ * Set it as 'LU_SEQ_RANGE_MDT' by default.
+ * Old 2.0 liblustre client cannot talk with new 2.1 server. */
+ if (!(exp->exp_connect_flags & OBD_CONNECT_64BITHASH) &&
+ !((exp->exp_connect_flags & OBD_CONNECT_MDS) &&
+ (exp->exp_connect_flags & OBD_CONNECT_FID)) &&
+ !(exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) &&
+ !exp->exp_libclient)
+ out->lsr_flags = LU_SEQ_RANGE_MDT;
rc = fld_server_handle(lu_site2seq(site)->ss_server_fld,
req->rq_svc_thread->t_env,
#endif
int fld_server_init(const struct lu_env *env, struct lu_server_fld *fld,
- struct dt_device *dt, const char *prefix, int mds_node_id)
+ struct dt_device *dt, const char *prefix, int mds_node_id,
+ __u32 lsr_flags)
{
int cache_size, cache_threshold;
struct lu_seq_range range;
GOTO(out, rc);
}
- if (!mds_node_id) {
+ if (!mds_node_id && lsr_flags == LU_SEQ_RANGE_MDT) {
rc = fld_index_init(env, fld, dt);
if (rc)
GOTO(out, rc);
fld->lsf_control_exp = NULL;
/* Insert reserved sequence number of ".lustre" into fld cache. */
- range.lsr_start = FID_SEQ_DOT_LUSTRE;
- range.lsr_end = FID_SEQ_DOT_LUSTRE + 1;
- range.lsr_index = 0;
- range.lsr_flags = LU_SEQ_RANGE_MDT;
- fld_cache_insert(fld->lsf_cache, &range);
-
+ if (lsr_flags == LU_SEQ_RANGE_MDT) {
+ range.lsr_start = FID_SEQ_DOT_LUSTRE;
+ range.lsr_end = FID_SEQ_DOT_LUSTRE + 1;
+ range.lsr_index = 0;
+ range.lsr_flags = lsr_flags;
+ fld_cache_insert(fld->lsf_cache, &range);
+ }
EXIT;
out:
if (rc)
if (IS_ERR(flde))
GOTO(out, rc = PTR_ERR(flde));
- spin_lock(&fld->lsf_cache->fci_lock);
+ write_lock(&fld->lsf_cache->fci_lock);
if (deleted)
fld_cache_delete_nolock(fld->lsf_cache, new_range);
rc = fld_cache_insert_nolock(fld->lsf_cache, flde);
- spin_unlock(&fld->lsf_cache->fci_lock);
+ write_unlock(&fld->lsf_cache->fci_lock);
if (rc)
OBD_FREE_PTR(flde);
out:
* Cache guard, protects fci_hash mostly because others immutable after
* init is finished.
*/
- spinlock_t fci_lock;
+ rwlock_t fci_lock;
/**
* Cache shrink threshold */
/* Server methods */
int fld_server_init(const struct lu_env *env, struct lu_server_fld *fld,
- struct dt_device *dt, const char *prefix, int mds_node_id);
+ struct dt_device *dt, const char *prefix, int mds_node_id,
+ __u32 lsr_flags);
void fld_server_fini(const struct lu_env *env, struct lu_server_fld *fld);
RETURN(rc = -ENOMEM);
rc = fld_server_init(env, ss->ss_server_fld, m->mdt_bottom, uuid,
- ss->ss_node_id);
+ ss->ss_node_id, LU_SEQ_RANGE_MDT);
if (rc) {
OBD_FREE_PTR(ss->ss_server_fld);
ss->ss_server_fld = NULL;
}
if (mdt->mdt_som_conf &&
- !(data->ocd_connect_flags & (OBD_CONNECT_MDS_MDS|OBD_CONNECT_SOM))){
+ !(data->ocd_connect_flags & (OBD_CONNECT_LIGHTWEIGHT |
+ OBD_CONNECT_MDS_MDS |
+ OBD_CONNECT_SOM))) {
CWARN("%s: MDS has SOM enabled, but client does not support "
"it\n", mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
return -EBADE;
lustre_deregister_osp_item(&ss->ss_client_seq->lcs_exp);
ss->ss_client_seq->lcs_exp = NULL;
}
+
+ if (ss->ss_server_fld != NULL) {
+ lustre_deregister_osp_item(&ss->ss_server_fld->lsf_control_exp);
+ ss->ss_server_fld->lsf_control_exp = NULL;
+ }
+}
+
+/**
+ * Tear down the server-side FLD of an OFD device.
+ *
+ * Finalizes and frees the lu_server_fld hanging off the OFD's
+ * sequence site, if ofd_fld_init() had set one up, and clears the
+ * pointer so the teardown is idempotent.  Always returns 0.
+ */
+static int ofd_fld_fini(const struct lu_env *env,
+ struct ofd_device *ofd)
+{
+ struct seq_server_site *ss = &ofd->ofd_seq_site;
+ ENTRY;
+
+ if (ss && ss->ss_server_fld) {
+ fld_server_fini(env, ss->ss_server_fld);
+ OBD_FREE_PTR(ss->ss_server_fld);
+ ss->ss_server_fld = NULL;
+ }
+
+ RETURN(0);
+}
void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd)
if (rc != 0)
CERROR("%s: fid fini error: rc = %d\n", ofd_name(ofd), rc);
+ rc = ofd_fld_fini(env, ofd);
+ if (rc != 0)
+ CERROR("%s: fld fini error: rc = %d\n", ofd_name(ofd), rc);
+
CFS_INIT_LIST_HEAD(&dispose);
write_lock(&ofd->ofd_seq_list_lock);
cfs_list_for_each_entry_safe(oseq, tmp, &ofd->ofd_seq_list, os_list) {
return ERR_PTR(rc);
}
+/**
+ * Allocate and initialize the server-side FLD for an OFD device.
+ *
+ * The FLD is initialized with LU_SEQ_RANGE_OST, which (per the
+ * lsr_flags checks added to fld_server_init() by this patch) skips
+ * creation of the on-disk FLD index and the ".lustre" cache entry.
+ *
+ * \retval 0 on success; on error the allocation is released and the
+ * error code from fld_server_init() (or -ENOMEM) is returned.
+ */
+static int ofd_fld_init(const struct lu_env *env, const char *uuid,
+ struct ofd_device *ofd)
+{
+ struct seq_server_site *ss = &ofd->ofd_seq_site;
+ int rc;
+ ENTRY;
+
+ OBD_ALLOC_PTR(ss->ss_server_fld);
+ if (ss->ss_server_fld == NULL)
+ RETURN(rc = -ENOMEM);
+
+ rc = fld_server_init(env, ss->ss_server_fld, ofd->ofd_osd, uuid,
+ ss->ss_node_id, LU_SEQ_RANGE_OST);
+ if (rc) {
+ OBD_FREE_PTR(ss->ss_server_fld);
+ ss->ss_server_fld = NULL;
+ RETURN(rc);
+ }
+ RETURN(0);
+}
+
static int ofd_register_seq_exp(struct ofd_device *ofd)
{
struct seq_server_site *ss = &ofd->ofd_seq_site;
rc = lustre_register_osp_item(osp_name, &ss->ss_client_seq->lcs_exp,
NULL, NULL);
+ if (rc != 0)
+ GOTO(out_free, rc);
+
+ rc = lustre_register_osp_item(osp_name,
+ &ss->ss_server_fld->lsf_control_exp,
+ NULL, NULL);
+ if (rc != 0) {
+ lustre_deregister_osp_item(&ss->ss_client_seq->lcs_exp);
+ ss->ss_client_seq->lcs_exp = NULL;
+ GOTO(out_free, rc);
+ }
out_free:
if (osp_name != NULL)
OBD_FREE(osp_name, MAX_OBD_NAME);
return rc;
}
+ rc = ofd_fld_init(env, ofd_name(ofd), ofd);
+ if (rc) {
+ CERROR("%s: Can't init fld, rc %d\n", ofd_name(ofd), rc);
+ return rc;
+ }
+
rc = ofd_register_seq_exp(ofd);
if (rc) {
CERROR("%s: Can't init seq exp, rc %d\n", ofd_name(ofd), rc);
return osd_oi_insert(info, osd, fid, id, th);
}
+/**
+ * Determine which server type owns \a fid and fill \a range with the
+ * owner's flags and index.
+ *
+ * IGIF fids and other non-normal fids are assigned to the MDT; IDIF
+ * fids to the OST index encoded in the fid itself.  Normal fids go
+ * through the server-side FLD; lsr_flags is primed with -1, the
+ * wildcard value this patch teaches the cache-lookup flags check to
+ * accept, so the result may be either MDT or OST.
+ *
+ * \retval 0 on success; negative error from fld_server_lookup().
+ */
+int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
+ const struct lu_fid *fid, struct lu_seq_range *range)
+{
+ struct seq_server_site *ss = osd_seq_site(osd);
+ int rc;
+
+ if (fid_is_igif(fid)) {
+ range->lsr_flags = LU_SEQ_RANGE_MDT;
+ range->lsr_index = 0;
+ return 0;
+ }
+
+ if (fid_is_idif(fid)) {
+ range->lsr_flags = LU_SEQ_RANGE_OST;
+ range->lsr_index = fid_idif_ost_idx(fid);
+ return 0;
+ }
+
+ if (!fid_is_norm(fid)) {
+ range->lsr_flags = LU_SEQ_RANGE_MDT;
+ if (ss != NULL)
+ /* FIXME: when ss is NULL, lsr_index should
+ * presumably be left unset -- confirm callers */
+ range->lsr_index = ss->ss_node_id;
+ return 0;
+ }
+
+ LASSERT(ss != NULL);
+ range->lsr_flags = -1;
+ rc = fld_server_lookup(env, ss->ss_server_fld, fid_seq(fid), range);
+ if (rc != 0) {
+ CERROR("%s can not find "DFID": rc = %d\n",
+ osd2lu_dev(osd)->ld_obd->obd_name, PFID(fid), rc);
+ }
+ return rc;
+}
+
+
static int osd_declare_object_create(const struct lu_env *env,
struct dt_object *dt,
struct lu_attr *attr,
struct dt_object_format *dof,
struct thandle *handle)
{
+ struct lu_seq_range *range = &osd_oti_get(env)->oti_seq_range;
struct osd_thandle *oh;
int rc;
ENTRY;
OSD_DECLARE_OP(oh, create, osd_dto_credits_noquota[DTO_OBJECT_CREATE]);
/* XXX: So far, only normal fid needs be inserted into the oi,
* things could be changed later. Revise following code then. */
- if (fid_is_norm(lu_object_fid(&dt->do_lu))) {
+ if (fid_is_norm(lu_object_fid(&dt->do_lu)) &&
+ !fid_is_on_ost(osd_oti_get(env), osd_dt_dev(handle->th_dev),
+ lu_object_fid(&dt->do_lu))) {
/* Reuse idle OI block may cause additional one OI block
* to be changed. */
OSD_DECLARE_OP(oh, insert,
rc = osd_declare_inode_qid(env, attr->la_uid, attr->la_gid, 1, oh,
false, false, NULL, false);
+ if (rc != 0)
+ RETURN(rc);
+
+ /* The fld lookup is done here inside declare, and its result is
+ * added to the fld cache, so the later fld lookup inside insert
+ * will not need to send an RPC; this avoids sending an RPC while
+ * holding the transaction open. */
+ if (fid_is_norm(lu_object_fid(&dt->do_lu)) &&
+ !fid_is_last_id(lu_object_fid(&dt->do_lu)))
+ osd_fld_lookup(env, osd_dt_dev(handle->th_dev),
+ lu_object_fid(&dt->do_lu), range);
+
+
RETURN(rc);
}
result = __osd_object_create(info, obj, attr, hint, dof, th);
/* objects under osd root shld have igif fid, so dont add fid EA */
- if (result == 0 && fid_seq(fid) >= FID_SEQ_NORMAL)
+ /* For ost object, the fid will be stored during first write */
+ if (result == 0 && fid_seq(fid) >= FID_SEQ_NORMAL &&
+ !fid_is_on_ost(info, osd_dt_dev(th->th_dev), fid))
result = osd_ea_fid_set(env, dt, fid);
if (result == 0)
struct dt_object *dt,
struct thandle *handle)
{
- struct osd_thandle *oh;
+ struct osd_thandle *oh;
/* it's possible that object doesn't exist yet */
LASSERT(handle != NULL);
const struct dt_key *key,
struct thandle *handle)
{
- struct osd_thandle *oh;
- struct inode *inode;
- int rc;
+ struct osd_thandle *oh;
+ struct inode *inode;
+ struct lu_fid *fid = (struct lu_fid *)rec;
+ int rc;
ENTRY;
LASSERT(dt_object_exists(dt));
* insert */
rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh,
true, true, NULL, false);
+ if (fid == NULL)
+ RETURN(0);
+
+ /* The fld lookup is done here inside declare, and its result is
+ * added to the fld cache, so the later fld lookup inside insert
+ * will not need to send an RPC; this avoids sending an RPC while
+ * holding the transaction open. */
+ LASSERTF(fid_is_sane(fid), "fid is insane"DFID"\n", PFID(fid));
+ osd_fld_lookup(env, osd_dt_dev(handle->th_dev), fid,
+ &osd_oti_get(env)->oti_seq_range);
+
RETURN(rc);
}
struct lquota_trans oti_quota_trans;
union lquota_rec oti_quota_rec;
__u64 oti_quota_id;
+ struct lu_seq_range oti_seq_range;
};
extern int ldiskfs_pdo;
struct osd_inode_id *id);
int osd_scrub_dump(struct osd_device *dev, char *buf, int len);
+int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
+ const struct lu_fid *fid, struct lu_seq_range *range);
/* osd_quota_fmt.c */
int walk_tree_dqentry(const struct lu_env *env, struct osd_object *obj,
int type, uint blk, int depth, uint index,
RETURN(rc);
}
+/**
+ * Check whether \a fid names an OST object.
+ *
+ * IDIF and LAST_ID fids are OST objects by construction; all other
+ * fids are resolved through osd_fld_lookup() and classified by the
+ * returned lsr_flags.  Uses the per-thread oti_seq_range scratch
+ * buffer, so the result range is only valid until the next use.
+ *
+ * \retval 1 if the fid is an OST object
+ * \retval 0 if it is not
+ * \retval negative error code if the FLD lookup fails
+ */
+int fid_is_on_ost(struct osd_thread_info *info, struct osd_device *osd,
+ const struct lu_fid *fid)
+{
+ struct lu_seq_range *range = &info->oti_seq_range;
+ int rc;
+ ENTRY;
+
+ if (fid_is_idif(fid) || fid_is_last_id(fid))
+ RETURN(1);
+
+ rc = osd_fld_lookup(info->oti_env, osd, fid, range);
+ if (rc != 0) {
+ CERROR("%s: Can not lookup fld for "DFID"\n",
+ osd2lu_dev(osd)->ld_obd->obd_name, PFID(fid));
+ RETURN(rc);
+ }
+
+ CDEBUG(D_INFO, "fid "DFID" range "DRANGE"\n", PFID(fid),
+ PRANGE(range));
+
+ if (range->lsr_flags == LU_SEQ_RANGE_OST)
+ RETURN(1);
+
+ RETURN(0);
+}
+
int __osd_oi_lookup(struct osd_thread_info *info, struct osd_device *osd,
const struct lu_fid *fid, struct osd_inode_id *id)
{
{
int rc = 0;
- if ((fid_is_idif(fid) && !fid_is_last_id(fid)) ||
+ if ((!fid_is_last_id(fid) && fid_is_on_ost(info, osd, fid)) ||
fid_is_llog(fid)) {
/* old OSD obj id */
/* FIXME: actually for all of the OST object */
if (fid_is_igif(fid) || unlikely(fid_seq(fid) == FID_SEQ_DOT_LUSTRE))
return 0;
- if ((fid_is_idif(fid) && !fid_is_last_id(fid)) ||
+ if ((fid_is_on_ost(info, osd, fid) && !fid_is_last_id(fid)) ||
fid_is_llog(fid))
return osd_obj_map_insert(info, osd, fid, id, th);
LASSERT(fid_seq(fid) != FID_SEQ_LOCAL_FILE);
- if (fid_is_idif(fid) || fid_is_llog(fid))
+ if (fid_is_on_ost(info, osd, fid) || fid_is_llog(fid))
return osd_obj_map_delete(info, osd, fid, th);
fid_cpu_to_be(oi_fid, fid);
struct osd_device *osd, const struct lu_fid *fid,
struct thandle *th);
+int fid_is_on_ost(struct osd_thread_info *info, struct osd_device *osd,
+ const struct lu_fid *fid);
#endif /* __KERNEL__ */
#endif /* _OSD_OI_H */