Whamcloud - gitweb
LU-9165 pfl: MDS handling of write intent IT_LAYOUT RPC
[fs/lustre-release.git] / lustre / lod / lod_object.c
index 93e9e1f..b955a44 100644 (file)
@@ -2162,6 +2162,57 @@ static int lod_object_replace_parent_fid(const struct lu_env *env,
        RETURN(rc);
 }
 
+inline __u16 lod_comp_entry_stripecnt(struct lod_object *lo,
+                                     struct lod_layout_component *entry,
+                                     bool is_dir)
+{
+       if (is_dir)
+               return  0;
+       else if (lod_comp_inited(entry))
+               return entry->llc_stripenr;
+       else
+               return lod_get_stripecnt(lu2lod_dev(lod2lu_obj(lo)->lo_dev), lo,
+                                        entry->llc_stripenr);
+}
+
+static int lod_comp_md_size(struct lod_object *lo, bool is_dir)
+{
+       int magic, size = 0, i;
+       struct lod_layout_component *comp_entries;
+       __u16 comp_cnt;
+       bool is_composite;
+
+       if (is_dir) {
+               comp_cnt = lo->ldo_def_striping->lds_def_comp_cnt;
+               comp_entries = lo->ldo_def_striping->lds_def_comp_entries;
+               is_composite =
+                       lo->ldo_def_striping->lds_def_striping_is_composite;
+       } else {
+               comp_cnt = lo->ldo_comp_cnt;
+               comp_entries = lo->ldo_comp_entries;
+               is_composite = lo->ldo_is_composite;
+       }
+
+
+       LASSERT(comp_cnt != 0 && comp_entries != NULL);
+       if (is_composite) {
+               size = sizeof(struct lov_comp_md_v1) +
+                      sizeof(struct lov_comp_md_entry_v1) * comp_cnt;
+               LASSERT(size % sizeof(__u64) == 0);
+       }
+
+       for (i = 0; i < comp_cnt; i++) {
+               __u16 stripenr;
+
+               magic = comp_entries[i].llc_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1;
+               stripenr = lod_comp_entry_stripecnt(lo, &comp_entries[i],
+                                                   is_dir);
+               size += lov_user_md_size(stripenr, magic);
+               LASSERT(size % sizeof(__u64) == 0);
+       }
+       return size;
+}
+
 /**
  * Declare component add. The xattr name is XATTR_LUSTRE_LOV.add, and
  * the xattr value is binary lov_comp_md_v1 which contains component(s)
@@ -2180,14 +2231,14 @@ static int lod_declare_layout_add(const struct lu_env *env,
                                  const struct lu_buf *buf,
                                  struct thandle *th)
 {
+       struct lod_thread_info  *info = lod_env_info(env);
        struct lod_layout_component *comp_array, *lod_comp;
        struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
+       struct dt_object *next = dt_object_child(dt);
        struct lov_desc         *desc = &d->lod_desc;
        struct lod_object       *lo = lod_dt_obj(dt);
-       struct lov_user_md_v1   *v1;
        struct lov_user_md_v3   *v3;
        struct lov_comp_md_v1   *comp_v1 = buf->lb_buf;
-       struct lu_extent        *ext;
        __u32   magic;
        __u64   prev_end;
        int     i, rc, array_cnt;
@@ -2195,27 +2246,12 @@ static int lod_declare_layout_add(const struct lu_env *env,
 
        LASSERT(lo->ldo_is_composite);
 
-       magic = comp_v1->lcm_magic;
-       /* Replay request, see comment for LOV_MAGIC_DEF */
-       if (unlikely(le32_to_cpu(magic) == LOV_MAGIC_COMP_V1_DEF)) {
-               struct dt_object *next = dt_object_child(dt);
-
-               lod_object_free_striping(env, lo);
-               rc = lod_use_defined_striping(env, lo, buf);
-               if (rc == 0) {
-                       lo->ldo_comp_cached = 1;
-                       rc = lod_sub_object_declare_xattr_set(env, next, buf,
-                                                             XATTR_NAME_LOV,
-                                                             0, th);
-               }
-               RETURN(rc);
-       }
-
        prev_end = lo->ldo_comp_entries[lo->ldo_comp_cnt - 1].llc_extent.e_end;
        rc = lod_verify_striping(d, buf, false, prev_end);
        if (rc != 0)
                RETURN(rc);
 
+       magic = comp_v1->lcm_magic;
        if (magic == __swab32(LOV_USER_MAGIC_COMP_V1)) {
                lustre_swab_lov_comp_md_v1(comp_v1);
                magic = comp_v1->lcm_magic;
@@ -2233,6 +2269,9 @@ static int lod_declare_layout_add(const struct lu_env *env,
               sizeof(*comp_array) * lo->ldo_comp_cnt);
 
        for (i = 0; i < comp_v1->lcm_entry_count; i++) {
+               struct lov_user_md_v1 *v1;
+               struct lu_extent *ext;
+
                v1 = (struct lov_user_md *)((char *)comp_v1 +
                                comp_v1->lcm_entries[i].lcme_offset);
                ext = &comp_v1->lcm_entries[i].lcme_extent;
@@ -2243,23 +2282,21 @@ static int lod_declare_layout_add(const struct lu_env *env,
                lod_comp->llc_stripe_offset = v1->lmm_stripe_offset;
 
                lod_comp->llc_stripenr = v1->lmm_stripe_count;
-               if (lod_comp->llc_stripenr <= 0)
+               if (!lod_comp->llc_stripenr ||
+                   lod_comp->llc_stripenr == (__u16)-1)
                        lod_comp->llc_stripenr = desc->ld_default_stripe_count;
                lod_comp->llc_stripe_size = v1->lmm_stripe_size;
-               if (lod_comp->llc_stripe_size <= 0)
+               if (!lod_comp->llc_stripe_size)
                        lod_comp->llc_stripe_size =
                                desc->ld_default_stripe_size;
 
                if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
-                       int len;
                        v3 = (struct lov_user_md_v3 *) v1;
                        if (v3->lmm_pool_name[0] != '\0') {
-                               len = strlen(v3->lmm_pool_name);
-                               OBD_ALLOC(lod_comp->llc_pool, len + 1);
-                               if (lod_comp->llc_pool == NULL)
-                                       GOTO(error, rc = -ENOMEM);
-                               strncpy(lod_comp->llc_pool, v3->lmm_pool_name,
-                                       len + 1);
+                               rc = lod_set_pool(&lod_comp->llc_pool,
+                                                 v3->lmm_pool_name);
+                               if (rc)
+                                       GOTO(error, rc);
                        }
                }
        }
@@ -2270,8 +2307,13 @@ static int lod_declare_layout_add(const struct lu_env *env,
        /* No need to increase layout generation here, it will be increased
         * later when generating component ID for the new components */
 
-       rc = lod_declare_striped_object(env, dt, NULL, NULL, th);
-       RETURN(rc);
+       info->lti_buf.lb_len = lod_comp_md_size(lo, false);
+       rc = lod_sub_object_declare_xattr_set(env, next, &info->lti_buf,
+                                             XATTR_NAME_LOV, 0, th);
+       if (rc)
+               GOTO(error, rc);
+
+       RETURN(0);
 
 error:
        for (i = lo->ldo_comp_cnt; i < array_cnt; i++) {
@@ -2286,43 +2328,6 @@ error:
        RETURN(rc);
 }
 
-static int lod_comp_md_size(struct lod_object *lo, bool is_dir)
-{
-       int magic, size = 0, i;
-       struct lod_layout_component *comp_entries;
-       __u16 comp_cnt;
-       bool is_composite;
-
-       if (is_dir) {
-               comp_cnt = lo->ldo_def_striping->lds_def_comp_cnt;
-               comp_entries = lo->ldo_def_striping->lds_def_comp_entries;
-               is_composite =
-                       lo->ldo_def_striping->lds_def_striping_is_composite;
-       } else {
-               comp_cnt = lo->ldo_comp_cnt;
-               comp_entries = lo->ldo_comp_entries;
-               is_composite = lo->ldo_is_composite;
-       }
-
-
-       LASSERT(comp_cnt != 0 && comp_entries != NULL);
-       if (is_composite) {
-               size = sizeof(struct lov_comp_md_v1) +
-                      sizeof(struct lov_comp_md_entry_v1) * comp_cnt;
-               LASSERT(size % sizeof(__u64) == 0);
-       }
-
-       for (i = 0; i < comp_cnt; i++) {
-               magic = comp_entries[i].llc_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1;
-
-               size += lov_user_md_size(
-                       is_dir ? 0 : comp_entries[i].llc_stripenr,
-                       magic);
-               LASSERT(size % sizeof(__u64) == 0);
-       }
-       return size;
-}
-
 /**
  * Declare component set. The xattr is name XATTR_LUSTRE_LOV.set.$field,
  * the '$field' can only be 'flags' now. The xattr value is binary
@@ -3280,6 +3285,12 @@ static int lod_layout_del(const struct lu_env *env, struct dt_object *dt,
                lod_comp->llc_stripe = NULL;
                lod_comp->llc_stripes_allocated = 0;
                lod_obj_set_pool(lo, i, NULL);
+               if (lod_comp->llc_ostlist.op_array) {
+                       OBD_FREE(lod_comp->llc_ostlist.op_array,
+                                lod_comp->llc_ostlist.op_size);
+                       lod_comp->llc_ostlist.op_array = NULL;
+                       lod_comp->llc_ostlist.op_size = 0;
+               }
        }
 
        LASSERTF(left >= 0 && left < lo->ldo_comp_cnt, "left = %d\n", left);
@@ -4307,15 +4318,16 @@ int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
        for (i = 0; i < lo->ldo_comp_cnt; i++) {
                lod_comp = &lo->ldo_comp_entries[i];
 
-               if (lod_comp->llc_flags & LCME_FL_INIT)
+               if (lod_comp_inited(lod_comp))
                        continue;
 
-               lod_comp->llc_flags |= LCME_FL_INIT;
+               if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)
+                       lod_comp_set_init(lod_comp);
 
                if (lod_comp->llc_stripe == NULL)
                        continue;
 
-               LASSERT(lod_comp->llc_stripenr > 0);
+               LASSERT(lod_comp->llc_stripenr);
                for (j = 0; j < lod_comp->llc_stripenr; j++) {
                        struct dt_object *object = lod_comp->llc_stripe[j];
                        LASSERT(object != NULL);
@@ -4324,6 +4336,7 @@ int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
                        if (rc)
                                break;
                }
+               lod_comp_set_init(lod_comp);
        }
 
        if (rc == 0)
@@ -4817,6 +4830,209 @@ static int lod_invalidate(const struct lu_env *env, struct dt_object *dt)
        return dt_invalidate(env, dt_object_child(dt));
 }
 
+/**
+ * Resize per-thread ost list to hold OST target index list already used.
+ *
+ * \param[in,out] inuse                structure contains ost list array
+ * \param[in] cnt              total stripe count of all components
+ * \param[in] max              array's max size if @max > 0
+ *
+ * \retval 0           on success
+ * \retval -ENOMEM     reallocation failed
+ */
+int lod_inuse_resize(struct ost_pool *inuse, __u16 cnt, __u16 max)
+{
+       __u32 *array;
+       __u32 new = cnt * sizeof(__u32);
+
+       inuse->op_count = 0;
+
+       if (new <= inuse->op_size)
+               return 0;
+
+       if (max)
+               new = min_t(__u32, new, max);
+       OBD_ALLOC(array, new);
+       if (!array)
+               return -ENOMEM;
+
+       if (inuse->op_array)
+               OBD_FREE(inuse->op_array, inuse->op_size);
+
+       inuse->op_array = array;
+       inuse->op_size = new;
+
+       return 0;
+}
+
+static int lod_declare_layout_change(const struct lu_env *env,
+                                    struct dt_object *dt,
+                                    struct layout_intent *layout,
+                                    const struct lu_buf *buf,
+                                    struct thandle *th)
+{
+       struct lod_thread_info  *info = lod_env_info(env);
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
+       struct dt_object *next = dt_object_child(dt);
+       struct lod_obj_stripe_cb_data data;
+       struct ost_pool *inuse = &info->lti_inuse_osts;
+       struct lod_layout_component *lod_comp;
+       struct lov_comp_md_v1 *comp_v1 = NULL;
+       bool replay = false;
+       bool need_create = false;
+       int i, rc;
+       __u32 stripe_cnt = 0;
+       ENTRY;
+
+       if (!S_ISREG(dt->do_lu.lo_header->loh_attr) || !dt_object_exists(dt) ||
+           dt_object_remote(next))
+               RETURN(-EINVAL);
+
+       dt_write_lock(env, next, 0);
+       /*
+        * In case the client is passing lovea, which only happens during
+        * the replay of layout intent write RPC for now, we may need to
+        * parse the lovea and apply new layout configuration.
+        */
+       if (buf && buf->lb_len)  {
+               struct lov_user_md_v1 *v1 = buf->lb_buf;
+
+               if (v1->lmm_magic != (LOV_MAGIC_DEF | LOV_MAGIC_COMP_V1) &&
+                   v1->lmm_magic !=
+                               __swab32(LOV_MAGIC_DEF | LOV_MAGIC_COMP_V1)) {
+                       CERROR("%s: the replay buffer of layout extend "
+                              "(magic %#x) does not contain expected "
+                              "composite layout.\n",
+                              lod2obd(d)->obd_name, v1->lmm_magic);
+                       GOTO(out, rc = -EINVAL);
+               }
+
+               lod_object_free_striping(env, lo);
+               rc = lod_use_defined_striping(env, lo, buf);
+               if (rc)
+                       GOTO(out, rc);
+
+               rc = lod_get_lov_ea(env, lo);
+               if (rc <= 0)
+                       GOTO(out, rc);
+               /* old on-disk EA is stored in info->lti_buf */
+               comp_v1 = (struct lov_comp_md_v1 *)&info->lti_buf.lb_buf;
+               replay = true;
+       } else {
+               /* non replay path */
+               rc = lod_load_striping_locked(env, lo);
+               if (rc)
+                       GOTO(out, rc);
+
+               /* Prepare inuse array for composite file */
+               for (i = 0; i < lo->ldo_comp_cnt; i++)
+                       stripe_cnt += lod_comp_entry_stripecnt(lo,
+                                               &lo->ldo_comp_entries[i],
+                                               false);
+               rc = lod_inuse_resize(inuse, stripe_cnt, d->lod_osd_max_easize);
+               if (rc)
+                       GOTO(out, rc);
+
+               data.locd_inuse = inuse;
+               rc = lod_obj_for_each_stripe(env, lo, NULL,
+                                            lod_obj_stripe_set_inuse_cb,
+                                            &data);
+               if (rc)
+                       GOTO(out, rc);
+       }
+
+       /* Make sure defined layout covers the requested write range. */
+       lod_comp = &lo->ldo_comp_entries[lo->ldo_comp_cnt - 1];
+       if ((lod_comp->llc_extent.e_end != OBD_OBJECT_EOF &&
+            lod_comp->llc_extent.e_end < layout->li_end)) {
+               CDEBUG(replay ? D_ERROR : D_LAYOUT,
+                      "%s: the defined layout [0, %#llx) does not covers "
+                      "the write range [%#llx, %#llx).\n",
+                      lod2obd(d)->obd_name, lod_comp->llc_extent.e_end,
+                      layout->li_start, layout->li_end);
+               GOTO(out, rc = -EINVAL);
+       }
+
+       /*
+        * Iterate ld->ldo_comp_entries, find the component whose extent under
+        * the write range and not instantianted.
+        */
+       for (i = 0; i < lo->ldo_comp_cnt; i++) {
+               lod_comp = &lo->ldo_comp_entries[i];
+
+               if (lod_comp->llc_extent.e_start >= layout->li_end)
+                       break;
+
+               if (!replay) {
+                       if (lod_comp_inited(lod_comp))
+                               continue;
+               } else {
+                       /**
+                        * In replay path, lod_comp is the EA passed by
+                        * client replay buffer,  comp_v1 is the pre-recovery
+                        * on-disk EA, we'd sift out those components which
+                        * were init-ed in the on-disk EA.
+                        */
+                       if (le32_to_cpu(comp_v1->lcm_entries[i].lcme_flags) &
+                           LCME_FL_INIT)
+                               continue;
+               }
+               /*
+                * this component hasn't instantiated in normal path, or during
+                * replay it needs replay the instantiation.
+                */
+
+               /* A released component is being extended */
+               if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)
+                       GOTO(out, rc = -EINVAL);
+
+               need_create = true;
+               /*
+                * In replay, the component EA is passed by client,
+                * Clear LCME_FL_INIT so that lod_striping_create() can create
+                * the striping objects.
+                */
+               if (replay)
+                       lod_comp_unset_init(lod_comp);
+
+               rc = lod_qos_prep_create(env, lo, NULL, th, i, inuse);
+               if (rc)
+                       break;
+       }
+
+       if (need_create)
+               lod_obj_inc_layout_gen(lo);
+       else
+               GOTO(unlock, rc = -EALREADY);
+
+       if (!rc) {
+               info->lti_buf.lb_len = lod_comp_md_size(lo, false);
+               rc = lod_sub_object_declare_xattr_set(env, next, &info->lti_buf,
+                                                     XATTR_NAME_LOV, 0, th);
+       }
+out:
+       if (rc)
+               lod_object_free_striping(env, lo);
+
+unlock:
+       dt_write_unlock(env, next);
+
+       RETURN(rc);
+}
+
+/**
+ * Instantiate layout component objects which covers the intent write offset.
+ */
+static int lod_layout_change(const struct lu_env *env, struct dt_object *dt,
+                            struct layout_intent *layout,
+                            const struct lu_buf *buf, struct thandle *th)
+{
+       struct lu_attr *attr = &lod_env_info(env)->lti_attr;
+
+       RETURN(lod_striping_create(env, dt, attr, NULL, th));
+}
+
 struct dt_object_operations lod_obj_ops = {
        .do_read_lock           = lod_object_read_lock,
        .do_write_lock          = lod_object_write_lock,
@@ -4846,6 +5062,8 @@ struct dt_object_operations lod_obj_ops = {
        .do_object_lock         = lod_object_lock,
        .do_object_unlock       = lod_object_unlock,
        .do_invalidate          = lod_invalidate,
+       .do_declare_layout_change = lod_declare_layout_change,
+       .do_layout_change       = lod_layout_change,
 };
 
 /**