For future usage to distinguish between MDT and OST fids.
o=di.wang
r=eric.mei
r=mikhail.pershin
LASSERT(fid_is_sane(fid));
- rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds, env);
+ rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds,
+ LU_SEQ_RANGE_MDT, env);
if (rc) {
CERROR("Can't find mds by seq "LPX64", rc %d\n",
fid_seq(fid), rc);
seq->lss_name, cli->lcs_name);
seq->lss_cli = cli;
- cli->lcs_space.lsr_mdt = seq->lss_site->ms_node_id;
+ cli->lcs_space.lsr_index = seq->lss_site->ms_node_id;
EXIT;
out_up:
cfs_up(&seq->lss_sem);
/* seq client passed mdt id, we need to pass that using out
* range parameter */
- out->lsr_mdt = tmp->lsr_mdt;
+ out->lsr_index = tmp->lsr_index;
+ out->lsr_flags = tmp->lsr_flags;
rc = seq_server_handle(site, env, *opc, out);
} else
rc = err_serious(-EPROTO);
LUSTRE_SEQ_ZERO_RANGE:
LUSTRE_SEQ_SPACE_RANGE;
- seq->lss_space.lsr_mdt = ms->ms_node_id;
+ seq->lss_space.lsr_index = ms->ms_node_id;
CDEBUG(D_INFO, "%s: No data found "
"on store. Initialize space\n",
seq->lss_name);
ptlrpc_request_set_replen(req);
- if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
- req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
- SEQ_CONTROLLER_PORTAL : SEQ_METADATA_PORTAL;
- /* update mdt field of *in, it is required for fld update
- * on super sequence allocator node. */
- if (opc == SEQ_ALLOC_SUPER)
- in->lsr_mdt = seq->lcs_space.lsr_mdt;
+ if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
+ req->rq_request_portal = SEQ_METADATA_PORTAL;
+ in->lsr_flags = LU_SEQ_RANGE_MDT;
} else {
- LASSERT(opc == SEQ_ALLOC_META);
+ LASSERTF(seq->lcs_type == LUSTRE_SEQ_DATA,
+ "unknown lcs_type %u\n", seq->lcs_type);
req->rq_request_portal = SEQ_DATA_PORTAL;
+ in->lsr_flags = LU_SEQ_RANGE_OST;
}
+
+ if (opc == SEQ_ALLOC_SUPER) {
+ /* Update index field of *in, it is required for
+ * FLD update on super sequence allocator node. */
+ in->lsr_index = seq->lcs_space.lsr_index;
+ req->rq_request_portal = SEQ_CONTROLLER_PORTAL;
+ } else {
+ LASSERTF(opc == SEQ_ALLOC_META,
+ "unknown opcode %u\n, opc", opc);
+ }
+
ptlrpc_at_set_req_timeout(req);
mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
* set to -1 for dgb check.
*/
- seq->lcs_space.lsr_mdt = -1;
+ seq->lcs_space.lsr_index = -1;
range_init(&seq->lcs_space);
cfs_up(&seq->lcs_sem);
ENTRY;
*eof = 1;
- rc = snprintf(page, count, "["LPX64" - "LPX64"]:%x\n",
+ rc = snprintf(page, count, "["LPX64" - "LPX64"]:%x:%x\n",
PRANGE(range));
RETURN(rc);
}
/* check merge possibility with next range */
if (c_range->lsr_end == n_range->lsr_start) {
- if (c_range->lsr_mdt != n_range->lsr_mdt)
+ if (c_range->lsr_index != n_range->lsr_index)
continue;
n_range->lsr_start = c_range->lsr_start;
fld_cache_entry_delete(cache, f_curr);
/* check if current range overlaps with next range. */
if (n_range->lsr_start < c_range->lsr_end) {
- if (c_range->lsr_mdt == n_range->lsr_mdt) {
+ if (c_range->lsr_index == n_range->lsr_index) {
n_range->lsr_start = c_range->lsr_start;
n_range->lsr_end = max(c_range->lsr_end,
n_range->lsr_end);
/* fldt */
fldt->fce_range.lsr_start = new_end;
fldt->fce_range.lsr_end = f_curr->fce_range.lsr_end;
- fldt->fce_range.lsr_mdt = f_curr->fce_range.lsr_mdt;
+ fldt->fce_range.lsr_index = f_curr->fce_range.lsr_index;
/* f_curr */
f_curr->fce_range.lsr_end = new_start;
const struct lu_seq_range *range = &f_new->fce_range;
const seqno_t new_start = range->lsr_start;
const seqno_t new_end = range->lsr_end;
- const mdsno_t mdt = range->lsr_mdt;
+ const mdsno_t mdt = range->lsr_index;
/* this is overlap case, these case are checking overlapping with
* prev range only. fixup will handle overlaping with next range. */
- if (f_curr->fce_range.lsr_mdt == mdt) {
+ if (f_curr->fce_range.lsr_index == mdt) {
f_curr->fce_range.lsr_start = min(f_curr->fce_range.lsr_start,
new_start);
/* STEP 1: try to merge with previous range */
rc = fld_index_lookup(fld, env, new->lsr_start, erange);
- if (!rc) {
- /* in case of range overlap, mdt ID must be same for both ranges */
- if (new->lsr_mdt != erange->lsr_mdt) {
- CERROR("mdt[%x] for given range is different from"
- "existing overlapping range mdt[%x]\n",
- new->lsr_mdt, erange->lsr_mdt);
- rc = -EIO;
- GOTO(out, rc);
+ if (rc == 0) {
+ /* in case of range overlap, the location must be same */
+ if (range_compare_loc(new, erange) != 0) {
+ CERROR("the start of given range "DRANGE" conflict to"
+ "an existing range "DRANGE"\n",
+ PRANGE(new), PRANGE(erange));
+ GOTO(out, rc = -EIO);
}
if (new->lsr_end < erange->lsr_end)
GOTO(out, rc);
do_merge = 1;
-
} else if (rc == -ENOENT) {
/* check for merge case: optimizes for single mds lustre.
* As entry does not exist, returned entry must be left side
* So try to merge from left.
*/
if (new->lsr_start == erange->lsr_end &&
- new->lsr_mdt == erange->lsr_mdt)
+ range_compare_loc(new, erange) == 0)
do_merge = 1;
} else {
/* no overlap allowed in fld, so failure in lookup is error */
}
if (do_merge) {
- /* new range can be combined with existing one.
- * So delete existing range.
- */
-
+ /* new range will be merged with the existing one.
+ * delete this range at first. */
rc = fld_index_delete(fld, env, erange, th);
- if (rc == 0) {
- new->lsr_start = min(erange->lsr_start, new->lsr_start);
- new->lsr_end = max(erange->lsr_end, new->lsr_end);
- } else
+ if (rc != 0)
GOTO(out, rc);
+ new->lsr_start = min(erange->lsr_start, new->lsr_start);
+ new->lsr_end = max(erange->lsr_end, new->lsr_end);
do_merge = 0;
}
/* STEP 2: try to merge with next range */
rc = fld_index_lookup(fld, env, new->lsr_end, erange);
- if (!rc) {
- /* case range overlap: with right side entry. */
- if (new->lsr_mdt == erange->lsr_mdt)
- do_merge = 1;
+ if (rc == 0) {
+ /* found a matched range, meaning we're either
+ * overlapping or ajacent, must merge with it. */
+ do_merge = 1;
} else if (rc == -ENOENT) {
/* this range is left of new range end point */
LASSERT(erange->lsr_end <= new->lsr_end);
-
- if (new->lsr_end == erange->lsr_end)
- do_merge = 1;
- if (new->lsr_start <= erange->lsr_start)
+ /*
+ * the found left range must be either:
+ * 1. withing new range.
+ * 2. left of new range (no overlapping).
+ * because if they're partly overlapping, the STEP 1 must have
+ * been removed this range.
+ */
+ LASSERTF(erange->lsr_start > new->lsr_start ||
+ erange->lsr_end < new->lsr_start ||
+ (erange->lsr_end == new->lsr_end &&
+ range_compare_loc(new, erange) != 0),
+ "left "DRANGE", new "DRANGE"\n",
+ PRANGE(erange), PRANGE(new));
+
+ /* if it's within the new range, merge it */
+ if (erange->lsr_start > new->lsr_start)
do_merge = 1;
- } else
+ } else {
GOTO(out, rc);
+ }
if (do_merge) {
- if (new->lsr_mdt != erange->lsr_mdt) {
- CERROR("mdt[%x] for given range is different from"
- "existing overlapping range mdt[%x]\n",
- new->lsr_mdt, erange->lsr_mdt);
- rc = -EIO;
- GOTO(out, rc);
+ if (range_compare_loc(new, erange) != 0) {
+ CERROR("the end of given range "DRANGE" overlaps "
+ "with an existing range "DRANGE"\n",
+ PRANGE(new), PRANGE(erange));
+ GOTO(out, rc = -EIO);
}
-
+
/* merge with next range */
rc = fld_index_delete(fld, env, erange, th);
- if (rc == 0) {
- new->lsr_start = min(erange->lsr_start, new->lsr_start);
- new->lsr_end = max(erange->lsr_end, new->lsr_end);
- } else
+ if (rc != 0)
GOTO(out, rc);
+
+ new->lsr_start = min(erange->lsr_start, new->lsr_start);
+ new->lsr_end = max(erange->lsr_end, new->lsr_end);
}
/* now update fld entry. */
const struct lu_env *env,
seqno_t seq, struct lu_seq_range *range)
{
+ struct lu_seq_range *erange;
+ struct fld_thread_info *info;
int rc;
ENTRY;
+ info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+ erange = &info->fti_lrange;
+
/* Lookup it in the cache. */
- rc = fld_cache_lookup(fld->lsf_cache, seq, range);
- if (rc == 0)
+ rc = fld_cache_lookup(fld->lsf_cache, seq, erange);
+ if (rc == 0) {
+ if (unlikely(erange->lsr_flags != range->lsr_flags)) {
+ CERROR("FLD cache found a range "DRANGE" doesn't "
+ "match the requested flag %x\n",
+ PRANGE(erange), range->lsr_flags);
+ RETURN(-EIO);
+ }
+ *range = *erange;
RETURN(0);
+ }
- if (fld->lsf_obj)
- rc = fld_index_lookup(fld, env, seq, range);
- else {
+ if (fld->lsf_obj) {
+ rc = fld_index_lookup(fld, env, seq, erange);
+ if (rc == 0) {
+ if (unlikely(erange->lsr_flags != range->lsr_flags)) {
+ CERROR("FLD found a range "DRANGE" doesn't "
+ "match the requested flag %x\n",
+ PRANGE(erange), range->lsr_flags);
+ RETURN(-EIO);
+ }
+ *range = *erange;
+ }
+ } else {
LASSERT(fld->lsf_control_exp);
/* send request to mdt0 i.e. super seq. controller.
* This is temporary solution, long term solution is fld
CDEBUG(D_INFO, "%s: FLD req handle: error %d (opc: %d, range: "
DRANGE"\n", fld->lsf_name, rc, opc, PRANGE(range));
-
+
RETURN(rc);
}
rc = fld_cache_lookup(msite->ms_client_fld->lcf_cache,
fid_seq(fid), range);
if (rc == 0)
- result = (range->lsr_mdt == msite->ms_node_id);
+ result = (range->lsr_index == msite->ms_node_id);
}
return result;
}
/* Insert reserved sequence number of ".lustre" into fld cache. */
range.lsr_start = FID_SEQ_DOT_LUSTRE;
range.lsr_end = FID_SEQ_DOT_LUSTRE + 1;
- range.lsr_mdt = 0;
+ range.lsr_index = 0;
+ range.lsr_flags = LU_SEQ_RANGE_MDT;
fld_cache_insert(fld->lsf_cache, &range);
EXIT;
static const struct lu_seq_range IGIF_FLD_RANGE = {
.lsr_start = 1,
.lsr_end = FID_SEQ_IDIF,
- .lsr_mdt = 0
+ .lsr_index = 0,
+ .lsr_flags = LU_SEQ_RANGE_MDT
};
const struct dt_index_features fld_index_features = {
}
/**
- * lookup range for a seq passed
+ * lookup range for a seq passed. note here we only care about the start/end,
+ * caller should handle the attached location data (flags, index).
*
- * \param seq seq for lookup.
- * \param range result of lookup.
+ * \param seq seq for lookup.
+ * \param range result of lookup.
*
- * \retval 0 success
- * \retval -ve error
+ * \retval 0 found, \a range is the matched range;
+ * \retval -ENOENT not found, \a range is the left-side range;
+ * \retval -ve other error;
*/
int fld_index_lookup(struct lu_server_fld *fld,
return rc;
}
-int fld_client_lookup(struct lu_client_fld *fld,
- seqno_t seq, mdsno_t *mds,
- const struct lu_env *env)
+int fld_client_lookup(struct lu_client_fld *fld, seqno_t seq, mdsno_t *mds,
+ __u32 flags, const struct lu_env *env)
{
struct lu_seq_range res;
struct lu_fld_target *target;
rc = fld_cache_lookup(fld->lcf_cache, seq, &res);
if (rc == 0) {
- *mds = res.lsr_mdt;
+ *mds = res.lsr_index;
RETURN(0);
}
fld_target_name(target), target->ft_idx);
res.lsr_start = seq;
+ res.lsr_flags = flags;
#ifdef __KERNEL__
if (target->ft_srv != NULL) {
LASSERT(env != NULL);
#endif
if (rc == 0) {
- *mds = res.lsr_mdt;
+ *mds = res.lsr_index;
fld_cache_insert(fld->lcf_cache, &res);
}
/**
* Describes a range of sequence, lsr_start is included but lsr_end is
* not in the range.
- * Same structure is used in fld module where lsr_mdt field holds mdt id
+ * Same structure is used in fld module where lsr_index field holds mdt id
* of the home mdt.
*/
+#define LU_SEQ_RANGE_MDT 0x0
+#define LU_SEQ_RANGE_OST 0x1
+
struct lu_seq_range {
__u64 lsr_start;
__u64 lsr_end;
- __u32 lsr_mdt;
- __u32 lsr_padding;
+ __u32 lsr_index;
+ __u32 lsr_flags;
};
/**
static inline void range_init(struct lu_seq_range *range)
{
- range->lsr_start = range->lsr_end = range->lsr_mdt = 0;
+ range->lsr_start = range->lsr_end = range->lsr_index = 0;
}
/**
return range_space(range) == 0;
}
-#define DRANGE "[%#16.16"LPF64"x-%#16.16"LPF64"x):%x"
+/* return 0 if two range have the same location */
+static inline int range_compare_loc(const struct lu_seq_range *r1,
+ const struct lu_seq_range *r2)
+{
+ return r1->lsr_index != r2->lsr_index ||
+ r1->lsr_flags != r2->lsr_flags;
+}
+
+#define DRANGE "[%#16.16"LPF64"x-%#16.16"LPF64"x):%x:%x"
#define PRANGE(range) \
(range)->lsr_start, \
(range)->lsr_end, \
- (range)->lsr_mdt
+ (range)->lsr_index, \
+ (range)->lsr_flags
/** \defgroup lu_fid lu_fid
* @{ */
{
dst->lsr_start = cpu_to_le64(src->lsr_start);
dst->lsr_end = cpu_to_le64(src->lsr_end);
- dst->lsr_mdt = cpu_to_le32(src->lsr_mdt);
+ dst->lsr_index = cpu_to_le32(src->lsr_index);
+ dst->lsr_flags = cpu_to_le32(src->lsr_flags);
}
static inline void range_le_to_cpu(struct lu_seq_range *dst, const struct lu_seq_range *src)
{
dst->lsr_start = le64_to_cpu(src->lsr_start);
dst->lsr_end = le64_to_cpu(src->lsr_end);
- dst->lsr_mdt = le32_to_cpu(src->lsr_mdt);
+ dst->lsr_index = le32_to_cpu(src->lsr_index);
+ dst->lsr_flags = le32_to_cpu(src->lsr_flags);
}
static inline void range_cpu_to_be(struct lu_seq_range *dst, const struct lu_seq_range *src)
{
dst->lsr_start = cpu_to_be64(src->lsr_start);
dst->lsr_end = cpu_to_be64(src->lsr_end);
- dst->lsr_mdt = cpu_to_be32(src->lsr_mdt);
+ dst->lsr_index = cpu_to_be32(src->lsr_index);
+ dst->lsr_flags = cpu_to_be32(src->lsr_flags);
}
static inline void range_be_to_cpu(struct lu_seq_range *dst, const struct lu_seq_range *src)
{
dst->lsr_start = be64_to_cpu(src->lsr_start);
dst->lsr_end = be64_to_cpu(src->lsr_end);
- dst->lsr_mdt = be32_to_cpu(src->lsr_mdt);
+ dst->lsr_index = be32_to_cpu(src->lsr_index);
+ dst->lsr_flags = be32_to_cpu(src->lsr_flags);
}
/** @} fid */
void fld_client_flush(struct lu_client_fld *fld);
-int fld_client_lookup(struct lu_client_fld *fld,
- seqno_t seq, mdsno_t *mds,
- const struct lu_env *env);
+int fld_client_lookup(struct lu_client_fld *fld, seqno_t seq, mdsno_t *mds,
+ __u32 flags, const struct lu_env *env);
int fld_client_create(struct lu_client_fld *fld,
struct lu_seq_range *range,
ENTRY;
LASSERT(fid_is_sane(fid));
- rc = fld_client_lookup(&lmv->lmv_fld, fid_seq(fid), mds, NULL);
+ rc = fld_client_lookup(&lmv->lmv_fld, fid_seq(fid), mds,
+ LU_SEQ_RANGE_MDT, NULL);
if (rc) {
CERROR("Error while looking for mds number. Seq "LPX64
", err = %d\n", fid_seq(fid), rc);
RETURN(rc);
}
-
+
CDEBUG(D_INODE, "FLD lookup got mds #%x for fid="DFID"\n",
*mds, PFID(fid));
{
__swab64s (&range->lsr_start);
__swab64s (&range->lsr_end);
- __swab32s (&range->lsr_mdt);
+ __swab32s (&range->lsr_index);
+ __swab32s (&range->lsr_flags);
}
EXPORT_SYMBOL(lustre_swab_lu_seq_range);