Whamcloud - gitweb
LU-9771 lod: define ldo_mirrors to manage mirrors
[fs/lustre-release.git] / lustre / lod / lod_object.c
index 7b54498..e53a438 100644 (file)
@@ -2218,7 +2218,7 @@ static int lod_declare_layout_add(const struct lu_env *env,
                                  struct thandle *th)
 {
        struct lod_thread_info  *info = lod_env_info(env);
-       struct lod_layout_component *comp_array, *lod_comp;
+       struct lod_layout_component *comp_array, *lod_comp, *old_array;
        struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
        struct dt_object *next = dt_object_child(dt);
        struct lov_desc         *desc = &d->lod_desc;
@@ -2226,11 +2226,14 @@ static int lod_declare_layout_add(const struct lu_env *env,
        struct lov_user_md_v3   *v3;
        struct lov_comp_md_v1   *comp_v1 = buf->lb_buf;
        __u32   magic;
-       int     i, rc, array_cnt;
+       int     i, rc, array_cnt, old_array_cnt;
        ENTRY;
 
        LASSERT(lo->ldo_is_composite);
 
+       if (lo->ldo_flr_state != LCM_FL_NOT_FLR)
+               RETURN(-EBUSY);
+
        rc = lod_verify_striping(d, lo, buf, false);
        if (rc != 0)
                RETURN(rc);
@@ -2287,7 +2290,9 @@ static int lod_declare_layout_add(const struct lu_env *env,
                }
        }
 
-       OBD_FREE(lo->ldo_comp_entries, sizeof(*lod_comp) * lo->ldo_comp_cnt);
+       old_array = lo->ldo_comp_entries;
+       old_array_cnt = lo->ldo_comp_cnt;
+
        lo->ldo_comp_entries = comp_array;
        lo->ldo_comp_cnt = array_cnt;
 
@@ -2297,8 +2302,16 @@ static int lod_declare_layout_add(const struct lu_env *env,
        info->lti_buf.lb_len = lod_comp_md_size(lo, false);
        rc = lod_sub_declare_xattr_set(env, next, &info->lti_buf,
                                              XATTR_NAME_LOV, 0, th);
-       if (rc)
+       if (rc) {
+               lo->ldo_comp_entries = old_array;
+               lo->ldo_comp_cnt = old_array_cnt;
                GOTO(error, rc);
+       }
+
+       OBD_FREE(old_array, sizeof(*lod_comp) * old_array_cnt);
+
+       LASSERT(lo->ldo_mirror_count == 1);
+       lo->ldo_mirrors[0].lme_end = array_cnt - 1;
 
        RETURN(0);
 
@@ -2422,6 +2435,9 @@ static int lod_declare_layout_del(const struct lu_env *env,
 
        LASSERT(lo->ldo_is_composite);
 
+       if (lo->ldo_flr_state != LCM_FL_NOT_FLR)
+               RETURN(-EBUSY);
+
        magic = comp_v1->lcm_magic;
        if (magic == __swab32(LOV_USER_MAGIC_COMP_V1)) {
                lustre_swab_lov_comp_md_v1(comp_v1);
@@ -3453,6 +3469,9 @@ static int lod_layout_del(const struct lu_env *env, struct dt_object *dt,
                         sizeof(*comp_array) * lo->ldo_comp_cnt);
                lo->ldo_comp_entries = comp_array;
                lo->ldo_comp_cnt = left;
+
+               LASSERT(lo->ldo_mirror_count == 1);
+               lo->ldo_mirrors[0].lme_end = left - 1;
                lod_obj_inc_layout_gen(lo);
        } else {
                lod_free_comp_entries(lo);
@@ -3700,6 +3719,7 @@ static int lod_get_default_lov_striping(const struct lu_env *env,
        struct lov_user_md_v3 *v3 = NULL;
        struct lov_comp_md_v1 *comp_v1 = NULL;
        __u16   comp_cnt;
+       __u16   mirror_cnt;
        bool    composite;
        int     rc, i;
        ENTRY;
@@ -3733,9 +3753,11 @@ static int lod_get_default_lov_striping(const struct lu_env *env,
                comp_cnt = comp_v1->lcm_entry_count;
                if (comp_cnt == 0)
                        RETURN(-EINVAL);
+               mirror_cnt = comp_v1->lcm_mirror_count + 1;
                composite = true;
        } else {
                comp_cnt = 1;
+               mirror_cnt = 0;
                composite = false;
        }
 
@@ -3745,7 +3767,8 @@ static int lod_get_default_lov_striping(const struct lu_env *env,
                RETURN(rc);
 
        lds->lds_def_comp_cnt = comp_cnt;
-       lds->lds_def_striping_is_composite = composite ? 1 : 0;
+       lds->lds_def_striping_is_composite = composite;
+       lds->lds_def_mirror_cnt = mirror_cnt;
 
        for (i = 0; i < comp_cnt; i++) {
                struct lod_layout_component *lod_comp;
@@ -3881,7 +3904,8 @@ static void lod_striping_from_default(struct lod_object *lo,
        int i, rc;
 
        if (lds->lds_def_striping_set && S_ISREG(mode)) {
-               rc = lod_alloc_comp_entries(lo, lds->lds_def_comp_cnt);
+               rc = lod_alloc_comp_entries(lo, lds->lds_def_mirror_cnt,
+                                           lds->lds_def_comp_cnt);
                if (rc != 0)
                        return;
 
@@ -4142,7 +4166,7 @@ out:
        if (lod_need_inherit_more(lc, false)) {
 
                if (lc->ldo_comp_cnt == 0) {
-                       rc = lod_alloc_comp_entries(lc, 1);
+                       rc = lod_alloc_comp_entries(lc, 0, 1);
                        if (rc)
                                /* fail to allocate memory, will create a
                                 * non-striped file. */
@@ -4429,6 +4453,53 @@ out:
 }
 
 /**
+ * Generate component ID for new created component.
+ *
+ * \param[in] lo               LOD object
+ * \param[in] comp_idx         index of ldo_comp_entries
+ *
+ * \retval                     component ID on success
+ * \retval                     LCME_ID_INVAL on failure
+ */
+static __u32 lod_gen_component_id(struct lod_object *lo,
+                                 int mirror_id, int comp_idx)
+{
+       struct lod_layout_component *lod_comp;
+       __u32   id, start, end;
+       int     i;
+
+       LASSERT(lo->ldo_comp_entries[comp_idx].llc_id == LCME_ID_INVAL);
+
+       lod_obj_inc_layout_gen(lo);
+       id = lo->ldo_layout_gen;
+       if (likely(id <= SEQ_ID_MAX))
+               RETURN(pflr_id(mirror_id, id & SEQ_ID_MASK));
+
+       /* Layout generation wraps, need to check collisions. */
+       start = id & SEQ_ID_MASK;
+       end = SEQ_ID_MAX;
+again:
+       for (id = start; id <= end; id++) {
+               for (i = 0; i < lo->ldo_comp_cnt; i++) {
+                       lod_comp = &lo->ldo_comp_entries[i];
+                       if (pflr_id(mirror_id, id) == lod_comp->llc_id)
+                               break;
+               }
+               /* Found the ununsed ID */
+               if (i == lo->ldo_comp_cnt)
+                       RETURN(pflr_id(mirror_id, id));
+       }
+       if (end == LCME_ID_MAX) {
+               start = 1;
+               end = min(lo->ldo_layout_gen & LCME_ID_MASK,
+                         (__u32)(LCME_ID_MAX - 1));
+               goto again;
+       }
+
+       RETURN(LCME_ID_INVAL);
+}
+
+/**
  * Creation of a striped regular object.
  *
  * The function is called to create the stripe objects for a regular
@@ -4462,6 +4533,12 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt,
        for (i = 0; i < lo->ldo_comp_cnt; i++) {
                lod_comp = &lo->ldo_comp_entries[i];
 
+               if (lod_comp->llc_id == LCME_ID_INVAL) {
+                       lod_comp->llc_id = lod_gen_component_id(lo, 0, i);
+                       if (lod_comp->llc_id == LCME_ID_INVAL)
+                               GOTO(out, rc = -ERANGE);
+               }
+
                if (lod_comp_inited(lod_comp))
                        continue;
 
@@ -4480,19 +4557,24 @@ int lod_striped_create(const struct lu_env *env, struct dt_object *dt,
                        LASSERT(object != NULL);
                        rc = lod_sub_create(env, object, attr, NULL, dof, th);
                        if (rc)
-                               break;
+                               GOTO(out, rc);
                }
                lod_comp_set_init(lod_comp);
        }
 
-       if (rc == 0)
-               rc = lod_generate_and_set_lovea(env, lo, th);
+       rc = lod_fill_mirrors(lo);
+       if (rc)
+               GOTO(out, rc);
 
-       if (rc == 0)
-               lo->ldo_comp_cached = 1;
-       else
-               lod_object_free_striping(env, lo);
+       rc = lod_generate_and_set_lovea(env, lo, th);
+       if (rc)
+               GOTO(out, rc);
+
+       lo->ldo_comp_cached = 1;
+       RETURN(0);
 
+out:
+       lod_object_free_striping(env, lo);
        RETURN(rc);
 }