Whamcloud - gitweb
b=19486 add server identifier into lu_seq_range.
authorEric Mei <eric.mei@oracle.com>
Thu, 14 Oct 2010 14:40:46 +0000 (08:40 -0600)
committerVitaly Fertman <vitaly.fertman@sun.com>
Thu, 14 Oct 2010 23:28:07 +0000 (03:28 +0400)
For future usage to distinguish between MDT and OST fids.

o=di.wang
r=eric.mei
r=mikhail.pershin

13 files changed:
lustre/cmm/cmm_object.c
lustre/fid/fid_handler.c
lustre/fid/fid_request.c
lustre/fid/lproc_fid.c
lustre/fld/fld_cache.c
lustre/fld/fld_handler.c
lustre/fld/fld_index.c
lustre/fld/fld_request.c
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_fid.h
lustre/include/lustre_fld.h
lustre/lmv/lmv_fld.c
lustre/obdclass/llog_swab.c

index feb7b89..86741be 100644 (file)
@@ -64,7 +64,8 @@ int cmm_fld_lookup(struct cmm_device *cm, const struct lu_fid *fid,
 
         LASSERT(fid_is_sane(fid));
 
-        rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds, env);
+        rc = fld_client_lookup(cm->cmm_fld, fid_seq(fid), mds,
+                               LU_SEQ_RANGE_MDT, env);
         if (rc) {
                 CERROR("Can't find mds by seq "LPX64", rc %d\n",
                        fid_seq(fid), rc);
index de14514..2a33c43 100644 (file)
@@ -94,7 +94,7 @@ int seq_server_set_cli(struct lu_server_seq *seq,
                seq->lss_name, cli->lcs_name);
 
         seq->lss_cli = cli;
-        cli->lcs_space.lsr_mdt = seq->lss_site->ms_node_id;
+        cli->lcs_space.lsr_index = seq->lss_site->ms_node_id;
         EXIT;
 out_up:
         cfs_up(&seq->lss_sem);
@@ -363,7 +363,8 @@ static int seq_req_handle(struct ptlrpc_request *req,
                 /* seq client passed mdt id, we need to pass that using out
                  * range parameter */
 
-                out->lsr_mdt = tmp->lsr_mdt;
+                out->lsr_index = tmp->lsr_index;
+                out->lsr_flags = tmp->lsr_flags;
                 rc = seq_server_handle(site, env, *opc, out);
         } else
                 rc = err_serious(-EPROTO);
@@ -518,7 +519,7 @@ int seq_server_init(struct lu_server_seq *seq,
                         LUSTRE_SEQ_ZERO_RANGE:
                         LUSTRE_SEQ_SPACE_RANGE;
 
-                seq->lss_space.lsr_mdt = ms->ms_node_id;
+                seq->lss_space.lsr_index = ms->ms_node_id;
                 CDEBUG(D_INFO, "%s: No data found "
                        "on store. Initialize space\n",
                        seq->lss_name);
index 436cba2..ca60353 100644 (file)
@@ -89,17 +89,26 @@ static int seq_client_rpc(struct lu_client_seq *seq,
 
         ptlrpc_request_set_replen(req);
 
-        if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
-                req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
-                        SEQ_CONTROLLER_PORTAL : SEQ_METADATA_PORTAL;
-                /* update mdt field of *in, it is required for fld update
-                 * on super sequence allocator node. */
-                if (opc == SEQ_ALLOC_SUPER)
-                        in->lsr_mdt = seq->lcs_space.lsr_mdt;
+       if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
+                req->rq_request_portal = SEQ_METADATA_PORTAL;
+                in->lsr_flags = LU_SEQ_RANGE_MDT;
         } else {
-                LASSERT(opc == SEQ_ALLOC_META);
+                LASSERTF(seq->lcs_type == LUSTRE_SEQ_DATA,
+                         "unknown lcs_type %u\n", seq->lcs_type);
                 req->rq_request_portal = SEQ_DATA_PORTAL;
+                in->lsr_flags = LU_SEQ_RANGE_OST;
         }
+
+        if (opc == SEQ_ALLOC_SUPER) {
+                /* Update index field of *in, it is required for
+                 * FLD update on super sequence allocator node. */
+                in->lsr_index = seq->lcs_space.lsr_index;
+                req->rq_request_portal = SEQ_CONTROLLER_PORTAL;
+        } else {
+                LASSERTF(opc == SEQ_ALLOC_META,
+                         "unknown opcode %u\n, opc", opc);
+        }
+
         ptlrpc_at_set_req_timeout(req);
 
         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
@@ -275,7 +284,7 @@ void seq_client_flush(struct lu_client_seq *seq)
          * set to -1 for dgb check.
          */
 
-        seq->lcs_space.lsr_mdt = -1;
+        seq->lcs_space.lsr_index = -1;
 
         range_init(&seq->lcs_space);
         cfs_up(&seq->lcs_sem);
index f696468..70b5c9c 100644 (file)
@@ -94,7 +94,7 @@ seq_proc_read_common(char *page, char **start, off_t off,
        ENTRY;
 
         *eof = 1;
-        rc = snprintf(page, count, "["LPX64" - "LPX64"]:%x\n",
+        rc = snprintf(page, count, "["LPX64" - "LPX64"]:%x:%x\n",
                       PRANGE(range));
        RETURN(rc);
 }
index 9f0c122..4dc1f70 100644 (file)
@@ -171,7 +171,7 @@ restart_fixup:
 
                 /* check merge possibility with next range */
                 if (c_range->lsr_end == n_range->lsr_start) {
-                        if (c_range->lsr_mdt != n_range->lsr_mdt)
+                        if (c_range->lsr_index != n_range->lsr_index)
                                 continue;
                         n_range->lsr_start = c_range->lsr_start;
                         fld_cache_entry_delete(cache, f_curr);
@@ -181,7 +181,7 @@ restart_fixup:
                 /* check if current range overlaps with next range. */
                 if (n_range->lsr_start < c_range->lsr_end) {
 
-                        if (c_range->lsr_mdt == n_range->lsr_mdt) {
+                        if (c_range->lsr_index == n_range->lsr_index) {
                                 n_range->lsr_start = c_range->lsr_start;
                                 n_range->lsr_end = max(c_range->lsr_end,
                                                        n_range->lsr_end);
@@ -302,7 +302,7 @@ void fld_cache_punch_hole(struct fld_cache *cache,
         /* fldt */
         fldt->fce_range.lsr_start = new_end;
         fldt->fce_range.lsr_end = f_curr->fce_range.lsr_end;
-        fldt->fce_range.lsr_mdt = f_curr->fce_range.lsr_mdt;
+        fldt->fce_range.lsr_index = f_curr->fce_range.lsr_index;
 
         /* f_curr */
         f_curr->fce_range.lsr_end = new_start;
@@ -325,12 +325,12 @@ void fld_cache_overlap_handle(struct fld_cache *cache,
         const struct lu_seq_range *range = &f_new->fce_range;
         const seqno_t new_start  = range->lsr_start;
         const seqno_t new_end  = range->lsr_end;
-        const mdsno_t mdt = range->lsr_mdt;
+        const mdsno_t mdt = range->lsr_index;
 
         /* this is overlap case, these case are checking overlapping with
          * prev range only. fixup will handle overlaping with next range. */
 
-        if (f_curr->fce_range.lsr_mdt == mdt) {
+        if (f_curr->fce_range.lsr_index == mdt) {
                 f_curr->fce_range.lsr_start = min(f_curr->fce_range.lsr_start,
                                                   new_start);
 
index e9dff56..489bd2c 100644 (file)
@@ -143,20 +143,18 @@ int fld_server_create(struct lu_server_fld *fld,
 
         /* STEP 1: try to merge with previous range */
         rc = fld_index_lookup(fld, env, new->lsr_start, erange);
-        if (!rc) {
-                /* in case of range overlap, mdt ID must be same for both ranges */
-                if (new->lsr_mdt != erange->lsr_mdt) {
-                        CERROR("mdt[%x] for given range is different from"
-                               "existing overlapping range mdt[%x]\n",
-                                new->lsr_mdt, erange->lsr_mdt);
-                        rc = -EIO;
-                        GOTO(out, rc);
+        if (rc == 0) {
+                /* in case of range overlap, the location must be same */
+                if (range_compare_loc(new, erange) != 0) {
+                        CERROR("the start of given range "DRANGE" conflict to"
+                               "an existing range "DRANGE"\n",
+                               PRANGE(new), PRANGE(erange));
+                        GOTO(out, rc = -EIO);
                 }
 
                 if (new->lsr_end < erange->lsr_end)
                         GOTO(out, rc);
                 do_merge = 1;
-
         } else if (rc == -ENOENT) {
                 /* check for merge case: optimizes for single mds lustre.
                  * As entry does not exist, returned entry must be left side
@@ -164,7 +162,7 @@ int fld_server_create(struct lu_server_fld *fld,
                  * So try to merge from left.
                  */
                 if (new->lsr_start == erange->lsr_end &&
-                    new->lsr_mdt == erange->lsr_mdt)
+                    range_compare_loc(new, erange) == 0)
                         do_merge = 1;
         } else {
                 /* no overlap allowed in fld, so failure in lookup is error */
@@ -172,53 +170,62 @@ int fld_server_create(struct lu_server_fld *fld,
         }
 
         if (do_merge) {
-                /* new range can be combined with existing one.
-                 * So delete existing range.
-                 */
-
+                /* new range will be merged with the existing one.
+                 * delete this range at first. */
                 rc = fld_index_delete(fld, env, erange, th);
-                if (rc == 0) {
-                        new->lsr_start = min(erange->lsr_start, new->lsr_start);
-                        new->lsr_end = max(erange->lsr_end, new->lsr_end);
-                } else
+                if (rc != 0)
                         GOTO(out, rc);
 
+                new->lsr_start = min(erange->lsr_start, new->lsr_start);
+                new->lsr_end = max(erange->lsr_end, new->lsr_end);
                 do_merge = 0;
         }
 
         /* STEP 2: try to merge with next range */
         rc = fld_index_lookup(fld, env, new->lsr_end, erange);
-        if (!rc) {
-                /* case range overlap: with right side entry. */
-                if (new->lsr_mdt == erange->lsr_mdt)
-                        do_merge = 1;
+        if (rc == 0) {
+                /* found a matched range, meaning we're either
+                 * overlapping or ajacent, must merge with it. */
+                do_merge = 1;
         } else if (rc == -ENOENT) {
                 /* this range is left of new range end point */
                 LASSERT(erange->lsr_end <= new->lsr_end);
-
-                if (new->lsr_end == erange->lsr_end)
-                        do_merge = 1;
-                if (new->lsr_start <= erange->lsr_start)
+                /*
+                 * the found left range must be either:
+                 *  1. withing new range.
+                 *  2. left of new range (no overlapping).
+                 * because if they're partly overlapping, the STEP 1 must have
+                 * been removed this range.
+                 */
+                LASSERTF(erange->lsr_start > new->lsr_start ||
+                         erange->lsr_end < new->lsr_start ||
+                         (erange->lsr_end == new->lsr_end &&
+                          range_compare_loc(new, erange) != 0),
+                         "left "DRANGE", new "DRANGE"\n",
+                         PRANGE(erange), PRANGE(new));
+
+                /* if it's within the new range, merge it */
+                if (erange->lsr_start > new->lsr_start)
                         do_merge = 1;
-        } else
+        } else {
                GOTO(out, rc);
+        }
 
         if (do_merge) {
-                if (new->lsr_mdt != erange->lsr_mdt) {
-                        CERROR("mdt[%x] for given range is different from"
-                               "existing overlapping range mdt[%x]\n",
-                                new->lsr_mdt, erange->lsr_mdt);
-                        rc = -EIO;
-                        GOTO(out, rc);
+                if (range_compare_loc(new, erange) != 0) {
+                        CERROR("the end of given range "DRANGE" overlaps "
+                               "with an existing range "DRANGE"\n",
+                               PRANGE(new), PRANGE(erange));
+                        GOTO(out, rc = -EIO);
                 }
-        
+
                 /* merge with next range */
                 rc = fld_index_delete(fld, env, erange, th);
-                if (rc == 0) {
-                        new->lsr_start = min(erange->lsr_start, new->lsr_start);
-                        new->lsr_end = max(erange->lsr_end, new->lsr_end);
-                } else
+                if (rc != 0)
                         GOTO(out, rc);
+
+                new->lsr_start = min(erange->lsr_start, new->lsr_start);
+                new->lsr_end = max(erange->lsr_end, new->lsr_end);
         }
 
         /* now update fld entry. */
@@ -253,17 +260,39 @@ int fld_server_lookup(struct lu_server_fld *fld,
                       const struct lu_env *env,
                       seqno_t seq, struct lu_seq_range *range)
 {
+        struct lu_seq_range *erange;
+        struct fld_thread_info *info;
         int rc;
         ENTRY;
 
+        info = lu_context_key_get(&env->le_ctx, &fld_thread_key);
+        erange = &info->fti_lrange;
+
         /* Lookup it in the cache. */
-        rc = fld_cache_lookup(fld->lsf_cache, seq, range);
-        if (rc == 0)
+        rc = fld_cache_lookup(fld->lsf_cache, seq, erange);
+        if (rc == 0) {
+                if (unlikely(erange->lsr_flags != range->lsr_flags)) {
+                        CERROR("FLD cache found a range "DRANGE" doesn't "
+                               "match the requested flag %x\n",
+                               PRANGE(erange), range->lsr_flags);
+                        RETURN(-EIO);
+                }
+                *range = *erange;
                 RETURN(0);
+        }
 
-        if (fld->lsf_obj)
-                rc = fld_index_lookup(fld, env, seq, range);
-        else {
+        if (fld->lsf_obj) {
+                rc = fld_index_lookup(fld, env, seq, erange);
+                if (rc == 0) {
+                        if (unlikely(erange->lsr_flags != range->lsr_flags)) {
+                                CERROR("FLD found a range "DRANGE" doesn't "
+                                       "match the requested flag %x\n",
+                                       PRANGE(erange), range->lsr_flags);
+                                RETURN(-EIO);
+                        }
+                        *range = *erange;
+                }
+        } else {
                 LASSERT(fld->lsf_control_exp);
                 /* send request to mdt0 i.e. super seq. controller.
                  * This is temporary solution, long term solution is fld
@@ -305,7 +334,7 @@ static int fld_server_handle(struct lu_server_fld *fld,
 
         CDEBUG(D_INFO, "%s: FLD req handle: error %d (opc: %d, range: "
                DRANGE"\n", fld->lsf_name, rc, opc, PRANGE(range));
-        
+
         RETURN(rc);
 
 }
@@ -415,7 +444,7 @@ int fid_is_local(const struct lu_env *env,
                 rc = fld_cache_lookup(msite->ms_client_fld->lcf_cache,
                                       fid_seq(fid), range);
                 if (rc == 0)
-                        result = (range->lsr_mdt == msite->ms_node_id);
+                        result = (range->lsr_index == msite->ms_node_id);
         }
         return result;
 }
@@ -505,7 +534,8 @@ int fld_server_init(struct lu_server_fld *fld, struct dt_device *dt,
         /* Insert reserved sequence number of ".lustre" into fld cache. */
         range.lsr_start = FID_SEQ_DOT_LUSTRE;
         range.lsr_end = FID_SEQ_DOT_LUSTRE + 1;
-        range.lsr_mdt = 0;
+        range.lsr_index = 0;
+        range.lsr_flags = LU_SEQ_RANGE_MDT;
         fld_cache_insert(fld->lsf_cache, &range);
 
         EXIT;
index 66d29b5..dd7b1de 100644 (file)
@@ -69,7 +69,8 @@ const char fld_index_name[] = "fld";
 static const struct lu_seq_range IGIF_FLD_RANGE = {
         .lsr_start = 1,
         .lsr_end   = FID_SEQ_IDIF,
-        .lsr_mdt   = 0
+        .lsr_index   = 0,
+        .lsr_flags  = LU_SEQ_RANGE_MDT
 };
 
 const struct dt_index_features fld_index_features = {
@@ -206,13 +207,15 @@ int fld_index_delete(struct lu_server_fld *fld,
 }
 
 /**
- * lookup range for a seq passed
+ * lookup range for a seq passed. note here we only care about the start/end,
+ * caller should handle the attached location data (flags, index).
  *
- *      \param  seq     seq for lookup.
- *      \param  range   result of lookup.
+ * \param  seq     seq for lookup.
+ * \param  range   result of lookup.
  *
- *      \retval  0  success
- *      \retval  -ve error
+ * \retval  0           found, \a range is the matched range;
+ * \retval -ENOENT      not found, \a range is the left-side range;
+ * \retval  -ve         other error;
  */
 
 int fld_index_lookup(struct lu_server_fld *fld,
index 536b244..2ea527a 100644 (file)
@@ -467,9 +467,8 @@ out_req:
         return rc;
 }
 
-int fld_client_lookup(struct lu_client_fld *fld,
-                      seqno_t seq, mdsno_t *mds,
-                      const struct lu_env *env)
+int fld_client_lookup(struct lu_client_fld *fld, seqno_t seq, mdsno_t *mds,
+                      __u32 flags, const struct lu_env *env)
 {
         struct lu_seq_range res;
         struct lu_fld_target *target;
@@ -480,7 +479,7 @@ int fld_client_lookup(struct lu_client_fld *fld,
 
         rc = fld_cache_lookup(fld->lcf_cache, seq, &res);
         if (rc == 0) {
-                *mds = res.lsr_mdt;
+                *mds = res.lsr_index;
                 RETURN(0);
         }
 
@@ -493,6 +492,7 @@ int fld_client_lookup(struct lu_client_fld *fld,
                fld_target_name(target), target->ft_idx);
 
         res.lsr_start = seq;
+        res.lsr_flags = flags;
 #ifdef __KERNEL__
         if (target->ft_srv != NULL) {
                 LASSERT(env != NULL);
@@ -507,7 +507,7 @@ int fld_client_lookup(struct lu_client_fld *fld,
 #endif
 
         if (rc == 0) {
-                *mds = res.lsr_mdt;
+                *mds = res.lsr_index;
 
                 fld_cache_insert(fld->lcf_cache, &res);
         }
index 6ef74d1..a60e444 100644 (file)
@@ -189,15 +189,18 @@ typedef __u32 obd_count;
 /**
  * Describes a range of sequence, lsr_start is included but lsr_end is
  * not in the range.
- * Same structure is used in fld module where lsr_mdt field holds mdt id
+ * Same structure is used in fld module where lsr_index field holds mdt id
  * of the home mdt.
  */
 
+#define LU_SEQ_RANGE_MDT        0x0
+#define LU_SEQ_RANGE_OST        0x1
+
 struct lu_seq_range {
         __u64 lsr_start;
         __u64 lsr_end;
-        __u32 lsr_mdt;
-        __u32 lsr_padding;
+        __u32 lsr_index;
+        __u32 lsr_flags;
 };
 
 /**
@@ -215,7 +218,7 @@ static inline __u64 range_space(const struct lu_seq_range *range)
 
 static inline void range_init(struct lu_seq_range *range)
 {
-        range->lsr_start = range->lsr_end = range->lsr_mdt = 0;
+        range->lsr_start = range->lsr_end = range->lsr_index = 0;
 }
 
 /**
@@ -244,12 +247,21 @@ static inline int range_is_exhausted(const struct lu_seq_range *range)
         return range_space(range) == 0;
 }
 
-#define DRANGE "[%#16.16"LPF64"x-%#16.16"LPF64"x):%x"
+/* return 0 if two range have the same location */
+static inline int range_compare_loc(const struct lu_seq_range *r1,
+                                    const struct lu_seq_range *r2)
+{
+        return r1->lsr_index != r2->lsr_index ||
+               r1->lsr_flags != r2->lsr_flags;
+}
+
+#define DRANGE "[%#16.16"LPF64"x-%#16.16"LPF64"x):%x:%x"
 
 #define PRANGE(range)      \
         (range)->lsr_start, \
         (range)->lsr_end,    \
-        (range)->lsr_mdt
+        (range)->lsr_index,  \
+        (range)->lsr_flags
 
 /** \defgroup lu_fid lu_fid
  * @{ */
index 1f93d4c..c1f2ec0 100644 (file)
@@ -376,28 +376,32 @@ static inline void range_cpu_to_le(struct lu_seq_range *dst, const struct lu_seq
 {
         dst->lsr_start = cpu_to_le64(src->lsr_start);
         dst->lsr_end = cpu_to_le64(src->lsr_end);
-        dst->lsr_mdt = cpu_to_le32(src->lsr_mdt);
+        dst->lsr_index = cpu_to_le32(src->lsr_index);
+        dst->lsr_flags = cpu_to_le32(src->lsr_flags);
 }
 
 static inline void range_le_to_cpu(struct lu_seq_range *dst, const struct lu_seq_range *src)
 {
         dst->lsr_start = le64_to_cpu(src->lsr_start);
         dst->lsr_end = le64_to_cpu(src->lsr_end);
-        dst->lsr_mdt = le32_to_cpu(src->lsr_mdt);
+        dst->lsr_index = le32_to_cpu(src->lsr_index);
+        dst->lsr_flags = le32_to_cpu(src->lsr_flags);
 }
 
 static inline void range_cpu_to_be(struct lu_seq_range *dst, const struct lu_seq_range *src)
 {
         dst->lsr_start = cpu_to_be64(src->lsr_start);
         dst->lsr_end = cpu_to_be64(src->lsr_end);
-        dst->lsr_mdt = cpu_to_be32(src->lsr_mdt);
+        dst->lsr_index = cpu_to_be32(src->lsr_index);
+        dst->lsr_flags = cpu_to_be32(src->lsr_flags);
 }
 
 static inline void range_be_to_cpu(struct lu_seq_range *dst, const struct lu_seq_range *src)
 {
         dst->lsr_start = be64_to_cpu(src->lsr_start);
         dst->lsr_end = be64_to_cpu(src->lsr_end);
-        dst->lsr_mdt = be32_to_cpu(src->lsr_mdt);
+        dst->lsr_index = be32_to_cpu(src->lsr_index);
+        dst->lsr_flags = be32_to_cpu(src->lsr_flags);
 }
 
 /** @} fid */
index 571ec45..428b352 100644 (file)
@@ -176,9 +176,8 @@ void fld_client_fini(struct lu_client_fld *fld);
 
 void fld_client_flush(struct lu_client_fld *fld);
 
-int fld_client_lookup(struct lu_client_fld *fld,
-                      seqno_t seq, mdsno_t *mds,
-                      const struct lu_env *env);
+int fld_client_lookup(struct lu_client_fld *fld, seqno_t seq, mdsno_t *mds,
+                      __u32 flags, const struct lu_env *env);
 
 int fld_client_create(struct lu_client_fld *fld,
                       struct lu_seq_range *range,
index 5f91a7f..8d111d9 100644 (file)
@@ -68,13 +68,14 @@ int lmv_fld_lookup(struct lmv_obd *lmv,
         ENTRY;
 
         LASSERT(fid_is_sane(fid));
-        rc = fld_client_lookup(&lmv->lmv_fld, fid_seq(fid), mds, NULL);
+        rc = fld_client_lookup(&lmv->lmv_fld, fid_seq(fid), mds,
+                               LU_SEQ_RANGE_MDT, NULL);
         if (rc) {
                 CERROR("Error while looking for mds number. Seq "LPX64
                        ", err = %d\n", fid_seq(fid), rc);
                 RETURN(rc);
         }
-        
+
         CDEBUG(D_INODE, "FLD lookup got mds #%x for fid="DFID"\n",
                *mds, PFID(fid));
 
index e70a453..18ebd29 100644 (file)
@@ -111,7 +111,8 @@ void lustre_swab_lu_seq_range(struct lu_seq_range *range)
 {
         __swab64s (&range->lsr_start);
         __swab64s (&range->lsr_end);
-        __swab32s (&range->lsr_mdt);
+        __swab32s (&range->lsr_index);
+        __swab32s (&range->lsr_flags);
 }
 EXPORT_SYMBOL(lustre_swab_lu_seq_range);