Whamcloud - gitweb
LU-9165 pfl: MDS handling of write intent IT_LAYOUT RPC
author: Bobi Jam <bobijam.xu@intel.com>
Wed, 1 Mar 2017 09:25:49 +0000 (17:25 +0800)
committer: Jinshan Xiong <jinshan.xiong@intel.com>
Thu, 6 Apr 2017 04:53:01 +0000 (21:53 -0700)
* Instantiate the OST objects of the 1st component of the PFL when its
  data are being created or its layout is being set.

* Hence component_add won't instantiate OST objects; the instantiation
  is deferred to a later write/truncate access.

* When the client tries to write beyond the extent range for which OST
  objects have been instantiated, it will issue an IT_LAYOUT intent lock
  RPC indicating the write range. The MDS needs to handle this RPC by
  allocating the OST objects of all components covering the range.

* The MDS also needs to handle the replay of a write intent IT_LAYOUT
  RPC, which contains the layout objects currently known to the client,
  so that the MDS won't initialize those objects again.

Reviewed-on: https://review.whamcloud.com/25717

Signed-off-by: Bobi Jam <bobijam.xu@intel.com>
Change-Id: Idcb242af55788dd8d7f01a6ecec4fbe508375512
Reviewed-by: Niu Yawei <yawei.niu@intel.com>
Reviewed-by: Jinshan Xiong <jinshan.xiong@intel.com>
16 files changed:
lustre/include/dt_object.h
lustre/include/md_object.h
lustre/lod/lod_dev.c
lustre/lod/lod_internal.h
lustre/lod/lod_lov.c
lustre/lod/lod_object.c
lustre/lod/lod_qos.c
lustre/mdc/mdc_request.c
lustre/mdd/mdd_internal.h
lustre/mdd/mdd_object.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_lib.c
lustre/mdt/mdt_xattr.c
lustre/ptlrpc/layout.c
lustre/tests/sanity-pfl.sh

index 038095c..f8e63c6 100644 (file)
@@ -1026,6 +1026,40 @@ struct dt_object_operations {
         * \retval negative     negated errno on error
         */
        int   (*do_invalidate)(const struct lu_env *env, struct dt_object *dt);
+
+       /**
+        * Declare intention to instantiate extended layout component.
+        *
+        * \param[in] env       execution environment
+        * \param[in] dt        DT object
+        * \param[in] layout    data structure to describe the changes to
+        *                      the DT object's layout
+        * \param[in] buf       buffer containing client's lovea or empty
+        *
+        * \retval 0            success
+        * \retval -ne          error code
+        */
+       int (*do_declare_layout_change)(const struct lu_env *env,
+                                       struct dt_object *dt,
+                                       struct layout_intent *layout,
+                                       const struct lu_buf *buf,
+                                       struct thandle *th);
+
+       /**
+        * Client is trying to write to un-instantiated layout component.
+        *
+        * \param[in] env       execution environment
+        * \param[in] dt        DT object
+        * \param[in] layout    data structure to describe the changes to
+        *                      the DT object's layout
+        * \param[in] buf       buffer containing client's lovea or empty
+        *
+        * \retval 0            success
+        * \retval -ne          error code
+        */
+       int (*do_layout_change)(const struct lu_env *env, struct dt_object *dt,
+                               struct layout_intent *layout,
+                               const struct lu_buf *buf, struct thandle *th);
 };
 
 /**
@@ -2712,6 +2746,30 @@ static inline int dt_lookup(const struct lu_env *env,
         return ret;
 }
 
+static inline int dt_declare_layout_change(const struct lu_env *env,
+                                          struct dt_object *o,
+                                          struct layout_intent *layout,
+                                          const struct lu_buf *buf,
+                                          struct thandle *th)
+{
+       LASSERT(o);
+       LASSERT(o->do_ops);
+       LASSERT(o->do_ops->do_declare_layout_change);
+       return o->do_ops->do_declare_layout_change(env, o, layout, buf, th);
+}
+
+static inline int dt_layout_change(const struct lu_env *env,
+                                  struct dt_object *o,
+                                  struct layout_intent *layout,
+                                  const struct lu_buf *buf,
+                                  struct thandle *th)
+{
+       LASSERT(o);
+       LASSERT(o->do_ops);
+       LASSERT(o->do_ops->do_layout_change);
+       return o->do_ops->do_layout_change(env, o, layout, buf, th);
+}
+
 struct dt_find_hint {
        struct lu_fid        *dfh_fid;
        struct dt_device     *dfh_dt;
index cec9c93..41970de 100644 (file)
@@ -217,6 +217,24 @@ struct md_object_operations {
                                 union ldlm_policy_data *policy);
 
        int (*moo_invalidate)(const struct lu_env *env, struct md_object *obj);
+       /**
+        * Trying to write to un-instantiated layout component.
+        *
+        * The caller should have held layout lock.
+        *
+        * \param[in] env       execution environment
+        * \param[in] obj       MD object
+        * \param[in] layout    data structure to describe the changes to
+        *                      the MD object's layout
+        * \param[in] buf       buffer containing the client's lovea
+        *
+        * \retval 0            success
+        * \retval -ne          error code
+        */
+       int (*moo_layout_change)(const struct lu_env *env,
+                                struct md_object *obj,
+                                struct layout_intent *layout,
+                                const struct lu_buf *buf);
 };
 
 /**
@@ -428,6 +446,20 @@ static inline int mo_invalidate(const struct lu_env *env, struct md_object *m)
        return m->mo_ops->moo_invalidate(env, m);
 }
 
+static inline int mo_layout_change(const struct lu_env *env,
+                                  struct md_object *m,
+                                  struct layout_intent *layout,
+                                  const struct lu_buf *buf)
+{
+       CDEBUG(D_INFO, "got layout change request from client: "
+              "opc:%u flags:%#x extent[%#llx,%#llx)\n",
+              layout->li_opc, layout->li_flags,
+              layout->li_start, layout->li_end);
+       /* need to instantiate objects which are in the access range */
+       LASSERT(m->mo_ops->moo_layout_change);
+       return m->mo_ops->moo_layout_change(env, m, layout, buf);
+}
+
 static inline int mo_swap_layouts(const struct lu_env *env,
                                  struct md_object *o1,
                                  struct md_object *o2, __u64 flags)
index 68065a5..b75038c 100644 (file)
@@ -1831,6 +1831,7 @@ static void lod_key_fini(const struct lu_context *ctx,
        struct lod_thread_info *info = data;
        struct lod_layout_component *lds =
                                info->lti_def_striping.lds_def_comp_entries;
+       struct ost_pool *inuse = &info->lti_inuse_osts;
 
        /* allocated in lod_get_lov_ea
         * XXX: this is overload, a tread may have such store but used only
@@ -1846,6 +1847,9 @@ static void lod_key_fini(const struct lu_context *ctx,
        if (lds != NULL)
                lod_free_def_comp_entries(&info->lti_def_striping);
 
+       if (inuse->op_size)
+               OBD_FREE(inuse->op_array, inuse->op_size);
+
        OBD_FREE_PTR(info);
 }
 
index 7b2c9a6..7bd32ff 100644 (file)
@@ -240,6 +240,8 @@ struct lod_layout_component {
        __u16                     llc_stripenr;
        __u16                     llc_stripes_allocated;
        char                     *llc_pool;
+       /* ost list specified with LOV_USER_MAGIC_SPECIFIC lum */
+       struct ost_pool           llc_ostlist;
        struct dt_object        **llc_stripe;
 };
 
@@ -369,6 +371,7 @@ struct lod_thread_info {
        struct lu_attr                  lti_attr;
        struct lod_it                   lti_it;
        struct ldlm_res_id              lti_res_id;
+       struct ost_pool                 lti_inuse_osts;
        /* used to hold lu_dirent, sizeof(struct lu_dirent) + NAME_MAX */
        char                            lti_key[sizeof(struct lu_dirent) +
                                                NAME_MAX];
@@ -534,6 +537,24 @@ lod_get_default_lmv_ea(const struct lu_env *env, struct lod_object *lo)
        return lod_get_ea(env, lo, XATTR_NAME_DEFAULT_LMV);
 }
 
+static inline void
+lod_comp_set_init(struct lod_layout_component *entry)
+{
+       entry->llc_flags |= LCME_FL_INIT;
+}
+
+static inline void
+lod_comp_unset_init(struct lod_layout_component *entry)
+{
+       entry->llc_flags &= ~LCME_FL_INIT;
+}
+
+static inline bool
+lod_comp_inited(const struct lod_layout_component *entry)
+{
+       return entry->llc_flags & LCME_FL_INIT;
+}
+
 void lod_fix_desc(struct lov_desc *desc);
 void lod_fix_desc_qos_maxage(__u32 *val);
 void lod_fix_desc_pattern(__u32 *val);
@@ -572,6 +593,18 @@ int lod_pool_new(struct obd_device *obd, char *poolname);
 int lod_pool_add(struct obd_device *obd, char *poolname, char *ostname);
 int lod_pool_remove(struct obd_device *obd, char *poolname, char *ostname);
 
+struct lod_obj_stripe_cb_data {
+       union {
+               const struct lu_attr    *locd_attr;
+               struct ost_pool         *locd_inuse;
+       };
+       bool    locd_declare;
+};
+
+typedef int (*lod_obj_stripe_cb_t)(const struct lu_env *env,
+                                  struct lod_object *lo, struct dt_object *dt,
+                                  struct thandle *th, int stripe_idx,
+                                  struct lod_obj_stripe_cb_data *data);
 /* lod_qos.c */
 int lod_prepare_create(const struct lu_env *env, struct lod_object *lo,
                       struct lu_attr *attr, const struct lu_buf *buf,
@@ -581,6 +614,20 @@ int qos_del_tgt(struct lod_device *, struct lod_tgt_desc *);
 void lod_qos_rr_init(struct lod_qos_rr *lqr);
 int lod_use_defined_striping(const struct lu_env *, struct lod_object *,
                             const struct lu_buf *);
+int lod_obj_stripe_set_inuse_cb(const struct lu_env *env, struct lod_object *lo,
+                               struct dt_object *dt, struct thandle *th,
+                               int stripe_idx,
+                               struct lod_obj_stripe_cb_data *data);
+int lod_qos_parse_config(const struct lu_env *env, struct lod_object *lo,
+                        const struct lu_buf *buf);
+int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo,
+                       struct lu_attr *attr, struct thandle *th,
+                       int comp_idx, struct ost_pool *inuse);
+__u16 lod_comp_entry_stripecnt(struct lod_object *lo,
+                              struct lod_layout_component *entry,
+                              bool is_dir);
+__u16 lod_get_stripecnt(struct lod_device *lod, struct lod_object *lo,
+                       __u16 stripe_count);
 
 /* lproc_lod.c */
 int lod_procfs_init(struct lod_device *lod);
@@ -599,19 +646,6 @@ int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
                        struct thandle *th);
 void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo);
 
-struct lod_obj_stripe_cb_data {
-       union {
-               const struct lu_attr    *locd_attr;
-               struct ost_pool         *locd_inuse;
-       };
-       bool    locd_declare;
-};
-
-typedef int (*lod_obj_stripe_cb_t)(const struct lu_env *env,
-                                  struct lod_object *lo, struct dt_object *dt,
-                                  struct thandle *th, int stripe_idx,
-                                  struct lod_obj_stripe_cb_data *data);
-
 int lod_obj_for_each_stripe(const struct lu_env *env, struct lod_object *lo,
                            struct thandle *th, lod_obj_stripe_cb_t cb,
                            struct lod_obj_stripe_cb_data *data);
index dceba48..bc1b70d 100644 (file)
@@ -627,6 +627,9 @@ static void lod_free_comp_buffer(struct lod_layout_component *entries,
                entry = &entries[i];
                if (entry->llc_pool != NULL)
                        lod_set_pool(&entry->llc_pool, NULL);
+               if (entry->llc_ostlist.op_array)
+                       OBD_FREE(entry->llc_ostlist.op_array,
+                                entry->llc_ostlist.op_size);
                LASSERT(entry->llc_stripe == NULL);
                LASSERT(entry->llc_stripes_allocated == 0);
        }
@@ -741,6 +744,7 @@ static int lod_gen_component_ea(const struct lu_env *env,
        struct lov_ost_data_v1  *objs;
        struct lod_layout_component *lod_comp;
        __u32   magic;
+       __u16 stripecnt;
        int     i, rc = 0;
        ENTRY;
 
@@ -764,8 +768,12 @@ static int lod_gen_component_ea(const struct lu_env *env,
 
        lmm->lmm_stripe_size = cpu_to_le32(lod_comp->llc_stripe_size);
        lmm->lmm_stripe_count = cpu_to_le16(lod_comp->llc_stripenr);
-       /* for dir, lmm_layout_gen stores default stripe offset. */
-       lmm->lmm_layout_gen = is_dir ?
+       /**
+        * for dir and uninstantiated component, lmm_layout_gen stores
+        * default stripe offset.
+        */
+       lmm->lmm_layout_gen =
+               (is_dir || !lod_comp_inited(lod_comp)) ?
                        cpu_to_le16(lod_comp->llc_stripe_offset) :
                        cpu_to_le16(lod_comp->llc_layout_gen);
 
@@ -780,50 +788,60 @@ static int lod_gen_component_ea(const struct lu_env *env,
                        RETURN(-E2BIG);
                objs = &v3->lmm_objects[0];
        }
+       stripecnt = lod_comp_entry_stripecnt(lo, lod_comp, is_dir);
 
        if (is_dir || lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)
                GOTO(done, rc = 0);
 
+       /* generate ost_idx of this component stripe */
        lod = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
-       for (i = 0; i < lod_comp->llc_stripenr; i++) {
-               struct dt_object        *object;
-               __u32   ost_idx;
-               int     type = LU_SEQ_RANGE_OST;
-
-               object = lod_comp->llc_stripe[i];
-               LASSERT(object != NULL);
-               info->lti_fid = *lu_object_fid(&object->do_lu);
-
-               if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MULTIPLE_REF) &&
-                   comp_idx == 0) {
-                       if (cfs_fail_val == 0)
-                               cfs_fail_val = info->lti_fid.f_oid;
-                       else if (i == 0)
-                               info->lti_fid.f_oid = cfs_fail_val;
-               }
+       for (i = 0; i < stripecnt; i++) {
+               struct dt_object *object;
+               __u32 ost_idx = (__u32)-1UL;
+               int type = LU_SEQ_RANGE_OST;
+
+               if (lod_comp->llc_stripe && lod_comp->llc_stripe[i]) {
+                       object = lod_comp->llc_stripe[i];
+                       /* instantiated component */
+                       info->lti_fid = *lu_object_fid(&object->do_lu);
+
+                       if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_MULTIPLE_REF) &&
+                           comp_idx == 0) {
+                               if (cfs_fail_val == 0)
+                                       cfs_fail_val = info->lti_fid.f_oid;
+                               else if (i == 0)
+                                       info->lti_fid.f_oid = cfs_fail_val;
+                       }
 
-               rc = fid_to_ostid(&info->lti_fid, &info->lti_ostid);
-               LASSERT(rc == 0);
-
-               ostid_cpu_to_le(&info->lti_ostid, &objs[i].l_ost_oi);
-               objs[i].l_ost_gen = cpu_to_le32(0);
-               if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FLD_LOOKUP))
-                       rc = -ENOENT;
-               else
-                       rc = lod_fld_lookup(env, lod, &info->lti_fid,
-                                           &ost_idx, &type);
-               if (rc < 0) {
-                       CERROR("%s: Can not locate "DFID": rc = %d\n",
-                              lod2obd(lod)->obd_name, PFID(&info->lti_fid),
-                              rc);
-                       RETURN(rc);
+                       rc = fid_to_ostid(&info->lti_fid, &info->lti_ostid);
+                       LASSERT(rc == 0);
+
+                       ostid_cpu_to_le(&info->lti_ostid, &objs[i].l_ost_oi);
+                       objs[i].l_ost_gen = cpu_to_le32(0);
+                       if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FLD_LOOKUP))
+                               rc = -ENOENT;
+                       else
+                               rc = lod_fld_lookup(env, lod, &info->lti_fid,
+                                                   &ost_idx, &type);
+                       if (rc < 0) {
+                               CERROR("%s: Can not locate "DFID": rc = %d\n",
+                                      lod2obd(lod)->obd_name,
+                                      PFID(&info->lti_fid), rc);
+                               RETURN(rc);
+                       }
+               } else if (lod_comp->llc_ostlist.op_array) {
+                       /* user specified ost list */
+                       ost_idx = lod_comp->llc_ostlist.op_array[i];
                }
+               /*
+                * with un-instantiated or with no specified ost list
+                * component, its l_ost_idx does not matter.
+                */
                objs[i].l_ost_idx = cpu_to_le32(ost_idx);
        }
 done:
        if (lmm_size != NULL)
-               *lmm_size = lov_mds_md_size(is_dir ?
-                               0 : lod_comp->llc_stripenr, magic);
+               *lmm_size = lov_mds_md_size(stripecnt, magic);
        RETURN(rc);
 }
 
@@ -940,8 +958,8 @@ int lod_generate_lovea(const struct lu_env *env, struct lod_object *lo,
                                GOTO(out, rc = -ERANGE);
                }
                lcme->lcme_id = cpu_to_le32(lod_comp->llc_id);
-               /* component must has been inistantiated */
-               LASSERT(ergo(!is_dir, lod_comp->llc_flags & LCME_FL_INIT));
+
+               /* component could be un-instantiated */
                lcme->lcme_flags = cpu_to_le32(lod_comp->llc_flags);
                lcme->lcme_extent.e_start =
                        cpu_to_le64(lod_comp->llc_extent.e_start);
@@ -980,7 +998,8 @@ out:
  * \param[in] lo               LOD object
  * \param[in] name             name of the EA
  *
- * \retval                     0 if EA is fetched successfully
+ * \retval                     > 0 if EA is fetched successfully
+ * \retval                     0 if EA is empty
  * \retval                     negative error number on failure
  */
 int lod_get_ea(const struct lu_env *env, struct lod_object *lo,
@@ -1175,7 +1194,7 @@ int lod_parse_striping(const struct lu_env *env, struct lod_object *lo,
        struct lov_comp_md_v1   *comp_v1 = NULL;
        struct lov_ost_data_v1  *objs;
        __u32   magic, pattern;
-       int     i, rc = 0;
+       int     i, j, rc = 0;
        __u16   comp_cnt;
        ENTRY;
 
@@ -1231,7 +1250,7 @@ int lod_parse_striping(const struct lu_env *env, struct lod_object *lo,
                        if (lod_comp->llc_id == LCME_ID_INVAL)
                                GOTO(out, rc = -EINVAL);
                } else {
-                       lod_comp->llc_flags = LCME_FL_INIT;
+                       lod_comp_set_init(lod_comp);
                }
 
                pattern = le32_to_cpu(lmm->lmm_pattern);
@@ -1251,6 +1270,34 @@ int lod_parse_striping(const struct lu_env *env, struct lod_object *lo,
                        objs = &lmm->lmm_objects[0];
                }
 
+               /**
+                * If uninstantiated template component has valid l_ost_idx,
+                * then user has specified ost list for this component.
+                */
+               if (!lod_comp_inited(lod_comp) &&
+                   objs[0].l_ost_idx != (__u32)-1UL) {
+                       /**
+                        * load the user specified ost list, when this
+                        * component is instantiated later, it will be used
+                        * in lod_alloc_ost_list().
+                        */
+                       lod_comp->llc_ostlist.op_count = lod_comp->llc_stripenr;
+                       lod_comp->llc_ostlist.op_size =
+                                       lod_comp->llc_stripenr * sizeof(__u32);
+                       OBD_ALLOC(lod_comp->llc_ostlist.op_array,
+                                 lod_comp->llc_ostlist.op_size);
+                       if (!lod_comp->llc_ostlist.op_array)
+                               GOTO(out, rc = -ENOMEM);
+
+                       for (j = 0; j < lod_comp->llc_stripenr; j++)
+                               lod_comp->llc_ostlist.op_array[j] =
+                                               le32_to_cpu(objs[j].l_ost_idx);
+               }
+
+               /* skip un-instantiated component object initialization */
+               if (!lod_comp_inited(lod_comp))
+                       continue;
+
                if (!(lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)) {
                        rc = lod_initialize_objects(env, lo, objs, i);
                        if (rc)
index 93e9e1f..b955a44 100644 (file)
@@ -2162,6 +2162,57 @@ static int lod_object_replace_parent_fid(const struct lu_env *env,
        RETURN(rc);
 }
 
+inline __u16 lod_comp_entry_stripecnt(struct lod_object *lo,
+                                     struct lod_layout_component *entry,
+                                     bool is_dir)
+{
+       if (is_dir)
+               return  0;
+       else if (lod_comp_inited(entry))
+               return entry->llc_stripenr;
+       else
+               return lod_get_stripecnt(lu2lod_dev(lod2lu_obj(lo)->lo_dev), lo,
+                                        entry->llc_stripenr);
+}
+
+static int lod_comp_md_size(struct lod_object *lo, bool is_dir)
+{
+       int magic, size = 0, i;
+       struct lod_layout_component *comp_entries;
+       __u16 comp_cnt;
+       bool is_composite;
+
+       if (is_dir) {
+               comp_cnt = lo->ldo_def_striping->lds_def_comp_cnt;
+               comp_entries = lo->ldo_def_striping->lds_def_comp_entries;
+               is_composite =
+                       lo->ldo_def_striping->lds_def_striping_is_composite;
+       } else {
+               comp_cnt = lo->ldo_comp_cnt;
+               comp_entries = lo->ldo_comp_entries;
+               is_composite = lo->ldo_is_composite;
+       }
+
+
+       LASSERT(comp_cnt != 0 && comp_entries != NULL);
+       if (is_composite) {
+               size = sizeof(struct lov_comp_md_v1) +
+                      sizeof(struct lov_comp_md_entry_v1) * comp_cnt;
+               LASSERT(size % sizeof(__u64) == 0);
+       }
+
+       for (i = 0; i < comp_cnt; i++) {
+               __u16 stripenr;
+
+               magic = comp_entries[i].llc_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1;
+               stripenr = lod_comp_entry_stripecnt(lo, &comp_entries[i],
+                                                   is_dir);
+               size += lov_user_md_size(stripenr, magic);
+               LASSERT(size % sizeof(__u64) == 0);
+       }
+       return size;
+}
+
 /**
  * Declare component add. The xattr name is XATTR_LUSTRE_LOV.add, and
  * the xattr value is binary lov_comp_md_v1 which contains component(s)
@@ -2180,14 +2231,14 @@ static int lod_declare_layout_add(const struct lu_env *env,
                                  const struct lu_buf *buf,
                                  struct thandle *th)
 {
+       struct lod_thread_info  *info = lod_env_info(env);
        struct lod_layout_component *comp_array, *lod_comp;
        struct lod_device       *d = lu2lod_dev(dt->do_lu.lo_dev);
+       struct dt_object *next = dt_object_child(dt);
        struct lov_desc         *desc = &d->lod_desc;
        struct lod_object       *lo = lod_dt_obj(dt);
-       struct lov_user_md_v1   *v1;
        struct lov_user_md_v3   *v3;
        struct lov_comp_md_v1   *comp_v1 = buf->lb_buf;
-       struct lu_extent        *ext;
        __u32   magic;
        __u64   prev_end;
        int     i, rc, array_cnt;
@@ -2195,27 +2246,12 @@ static int lod_declare_layout_add(const struct lu_env *env,
 
        LASSERT(lo->ldo_is_composite);
 
-       magic = comp_v1->lcm_magic;
-       /* Replay request, see comment for LOV_MAGIC_DEF */
-       if (unlikely(le32_to_cpu(magic) == LOV_MAGIC_COMP_V1_DEF)) {
-               struct dt_object *next = dt_object_child(dt);
-
-               lod_object_free_striping(env, lo);
-               rc = lod_use_defined_striping(env, lo, buf);
-               if (rc == 0) {
-                       lo->ldo_comp_cached = 1;
-                       rc = lod_sub_object_declare_xattr_set(env, next, buf,
-                                                             XATTR_NAME_LOV,
-                                                             0, th);
-               }
-               RETURN(rc);
-       }
-
        prev_end = lo->ldo_comp_entries[lo->ldo_comp_cnt - 1].llc_extent.e_end;
        rc = lod_verify_striping(d, buf, false, prev_end);
        if (rc != 0)
                RETURN(rc);
 
+       magic = comp_v1->lcm_magic;
        if (magic == __swab32(LOV_USER_MAGIC_COMP_V1)) {
                lustre_swab_lov_comp_md_v1(comp_v1);
                magic = comp_v1->lcm_magic;
@@ -2233,6 +2269,9 @@ static int lod_declare_layout_add(const struct lu_env *env,
               sizeof(*comp_array) * lo->ldo_comp_cnt);
 
        for (i = 0; i < comp_v1->lcm_entry_count; i++) {
+               struct lov_user_md_v1 *v1;
+               struct lu_extent *ext;
+
                v1 = (struct lov_user_md *)((char *)comp_v1 +
                                comp_v1->lcm_entries[i].lcme_offset);
                ext = &comp_v1->lcm_entries[i].lcme_extent;
@@ -2243,23 +2282,21 @@ static int lod_declare_layout_add(const struct lu_env *env,
                lod_comp->llc_stripe_offset = v1->lmm_stripe_offset;
 
                lod_comp->llc_stripenr = v1->lmm_stripe_count;
-               if (lod_comp->llc_stripenr <= 0)
+               if (!lod_comp->llc_stripenr ||
+                   lod_comp->llc_stripenr == (__u16)-1)
                        lod_comp->llc_stripenr = desc->ld_default_stripe_count;
                lod_comp->llc_stripe_size = v1->lmm_stripe_size;
-               if (lod_comp->llc_stripe_size <= 0)
+               if (!lod_comp->llc_stripe_size)
                        lod_comp->llc_stripe_size =
                                desc->ld_default_stripe_size;
 
                if (v1->lmm_magic == LOV_USER_MAGIC_V3) {
-                       int len;
                        v3 = (struct lov_user_md_v3 *) v1;
                        if (v3->lmm_pool_name[0] != '\0') {
-                               len = strlen(v3->lmm_pool_name);
-                               OBD_ALLOC(lod_comp->llc_pool, len + 1);
-                               if (lod_comp->llc_pool == NULL)
-                                       GOTO(error, rc = -ENOMEM);
-                               strncpy(lod_comp->llc_pool, v3->lmm_pool_name,
-                                       len + 1);
+                               rc = lod_set_pool(&lod_comp->llc_pool,
+                                                 v3->lmm_pool_name);
+                               if (rc)
+                                       GOTO(error, rc);
                        }
                }
        }
@@ -2270,8 +2307,13 @@ static int lod_declare_layout_add(const struct lu_env *env,
        /* No need to increase layout generation here, it will be increased
         * later when generating component ID for the new components */
 
-       rc = lod_declare_striped_object(env, dt, NULL, NULL, th);
-       RETURN(rc);
+       info->lti_buf.lb_len = lod_comp_md_size(lo, false);
+       rc = lod_sub_object_declare_xattr_set(env, next, &info->lti_buf,
+                                             XATTR_NAME_LOV, 0, th);
+       if (rc)
+               GOTO(error, rc);
+
+       RETURN(0);
 
 error:
        for (i = lo->ldo_comp_cnt; i < array_cnt; i++) {
@@ -2286,43 +2328,6 @@ error:
        RETURN(rc);
 }
 
-static int lod_comp_md_size(struct lod_object *lo, bool is_dir)
-{
-       int magic, size = 0, i;
-       struct lod_layout_component *comp_entries;
-       __u16 comp_cnt;
-       bool is_composite;
-
-       if (is_dir) {
-               comp_cnt = lo->ldo_def_striping->lds_def_comp_cnt;
-               comp_entries = lo->ldo_def_striping->lds_def_comp_entries;
-               is_composite =
-                       lo->ldo_def_striping->lds_def_striping_is_composite;
-       } else {
-               comp_cnt = lo->ldo_comp_cnt;
-               comp_entries = lo->ldo_comp_entries;
-               is_composite = lo->ldo_is_composite;
-       }
-
-
-       LASSERT(comp_cnt != 0 && comp_entries != NULL);
-       if (is_composite) {
-               size = sizeof(struct lov_comp_md_v1) +
-                      sizeof(struct lov_comp_md_entry_v1) * comp_cnt;
-               LASSERT(size % sizeof(__u64) == 0);
-       }
-
-       for (i = 0; i < comp_cnt; i++) {
-               magic = comp_entries[i].llc_pool ? LOV_MAGIC_V3 : LOV_MAGIC_V1;
-
-               size += lov_user_md_size(
-                       is_dir ? 0 : comp_entries[i].llc_stripenr,
-                       magic);
-               LASSERT(size % sizeof(__u64) == 0);
-       }
-       return size;
-}
-
 /**
  * Declare component set. The xattr is name XATTR_LUSTRE_LOV.set.$field,
  * the '$field' can only be 'flags' now. The xattr value is binary
@@ -3280,6 +3285,12 @@ static int lod_layout_del(const struct lu_env *env, struct dt_object *dt,
                lod_comp->llc_stripe = NULL;
                lod_comp->llc_stripes_allocated = 0;
                lod_obj_set_pool(lo, i, NULL);
+               if (lod_comp->llc_ostlist.op_array) {
+                       OBD_FREE(lod_comp->llc_ostlist.op_array,
+                                lod_comp->llc_ostlist.op_size);
+                       lod_comp->llc_ostlist.op_array = NULL;
+                       lod_comp->llc_ostlist.op_size = 0;
+               }
        }
 
        LASSERTF(left >= 0 && left < lo->ldo_comp_cnt, "left = %d\n", left);
@@ -4307,15 +4318,16 @@ int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
        for (i = 0; i < lo->ldo_comp_cnt; i++) {
                lod_comp = &lo->ldo_comp_entries[i];
 
-               if (lod_comp->llc_flags & LCME_FL_INIT)
+               if (lod_comp_inited(lod_comp))
                        continue;
 
-               lod_comp->llc_flags |= LCME_FL_INIT;
+               if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)
+                       lod_comp_set_init(lod_comp);
 
                if (lod_comp->llc_stripe == NULL)
                        continue;
 
-               LASSERT(lod_comp->llc_stripenr > 0);
+               LASSERT(lod_comp->llc_stripenr);
                for (j = 0; j < lod_comp->llc_stripenr; j++) {
                        struct dt_object *object = lod_comp->llc_stripe[j];
                        LASSERT(object != NULL);
@@ -4324,6 +4336,7 @@ int lod_striping_create(const struct lu_env *env, struct dt_object *dt,
                        if (rc)
                                break;
                }
+               lod_comp_set_init(lod_comp);
        }
 
        if (rc == 0)
@@ -4817,6 +4830,209 @@ static int lod_invalidate(const struct lu_env *env, struct dt_object *dt)
        return dt_invalidate(env, dt_object_child(dt));
 }
 
+/**
+ * Resize per-thread ost list to hold OST target index list already used.
+ *
+ * The list is always emptied (op_count is reset to 0). The backing array is
+ * reallocated only when the existing one is smaller than the requested size;
+ * its previous content is not preserved across a reallocation.
+ *
+ * \param[in,out] inuse                structure contains ost list array
+ * \param[in] cnt              total stripe count of all components
+ * \param[in] max              upper bound, in bytes, of the array size
+ *                             if @max > 0
+ *
+ * \retval 0           on success
+ * \retval -ENOMEM     reallocation failed
+ */
+int lod_inuse_resize(struct ost_pool *inuse, __u16 cnt, __u16 max)
+{
+       __u32 *array;
+       __u32 new = cnt * sizeof(__u32);
+
+       inuse->op_count = 0;
+
+       /*
+        * Apply the cap before comparing against the current size. Otherwise
+        * a capped request would reallocate (and possibly shrink) an array
+        * that is already as large as the cap allows, on every call.
+        *
+        * NOTE(review): @max is 16-bit; a caller passing a wider value will
+        * have it truncated -- confirm against the callers.
+        */
+       if (max)
+               new = min_t(__u32, new, max);
+
+       if (new <= inuse->op_size)
+               return 0;
+
+       OBD_ALLOC(array, new);
+       if (!array)
+               return -ENOMEM;
+
+       if (inuse->op_array)
+               OBD_FREE(inuse->op_array, inuse->op_size);
+
+       inuse->op_array = array;
+       inuse->op_size = new;
+
+       return 0;
+}
+
+/**
+ * Declare the instantiation of layout components covering a write intent.
+ *
+ * Under the write lock of the child object, load the striping (from the
+ * client-supplied lovea in \a buf on replay, from the object otherwise),
+ * check that the defined layout covers the requested write range, and
+ * prepare OST object creation for every component which starts before
+ * layout->li_end and is not instantiated yet.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       DT object, must be an existing local regular file
+ * \param[in] layout   layout intent carrying the write range
+ * \param[in] buf      client's lovea; non-empty only on replay
+ * \param[in] th       transaction handle
+ *
+ * \retval 0           on success
+ * \retval -EALREADY   no component needs to be instantiated
+ * \retval -EINVAL     invalid object, replay buffer or write range, or a
+ *                     released component would be extended
+ * \retval negative    negated errno on error
+ */
+static int lod_declare_layout_change(const struct lu_env *env,
+                                    struct dt_object *dt,
+                                    struct layout_intent *layout,
+                                    const struct lu_buf *buf,
+                                    struct thandle *th)
+{
+       struct lod_thread_info  *info = lod_env_info(env);
+       struct lod_object *lo = lod_dt_obj(dt);
+       struct lod_device *d = lu2lod_dev(dt->do_lu.lo_dev);
+       struct dt_object *next = dt_object_child(dt);
+       struct lod_obj_stripe_cb_data data;
+       struct ost_pool *inuse = &info->lti_inuse_osts;
+       struct lod_layout_component *lod_comp;
+       struct lov_comp_md_v1 *comp_v1 = NULL;
+       bool replay = false;
+       bool need_create = false;
+       int i, rc;
+       __u32 stripe_cnt = 0;
+       ENTRY;
+
+       if (!S_ISREG(dt->do_lu.lo_header->loh_attr) || !dt_object_exists(dt) ||
+           dt_object_remote(next))
+               RETURN(-EINVAL);
+
+       dt_write_lock(env, next, 0);
+       /*
+        * In case the client is passing lovea, which only happens during
+        * the replay of layout intent write RPC for now, we may need to
+        * parse the lovea and apply new layout configuration.
+        */
+       if (buf && buf->lb_len)  {
+               struct lov_user_md_v1 *v1 = buf->lb_buf;
+
+               if (v1->lmm_magic != (LOV_MAGIC_DEF | LOV_MAGIC_COMP_V1) &&
+                   v1->lmm_magic !=
+                               __swab32(LOV_MAGIC_DEF | LOV_MAGIC_COMP_V1)) {
+                       CERROR("%s: the replay buffer of layout extend "
+                              "(magic %#x) does not contain expected "
+                              "composite layout.\n",
+                              lod2obd(d)->obd_name, v1->lmm_magic);
+                       GOTO(out, rc = -EINVAL);
+               }
+
+               lod_object_free_striping(env, lo);
+               rc = lod_use_defined_striping(env, lo, buf);
+               if (rc)
+                       GOTO(out, rc);
+
+               rc = lod_get_lov_ea(env, lo);
+               if (rc <= 0)
+                       GOTO(out, rc);
+               /*
+                * old on-disk EA is stored in info->lti_buf; use the buffer
+                * the lu_buf points to, not the address of the pointer field
+                */
+               comp_v1 = (struct lov_comp_md_v1 *)info->lti_buf.lb_buf;
+               replay = true;
+               /*
+                * NOTE(review): the inuse array is only prepared in the non
+                * replay path below -- confirm this is intended for replay.
+                */
+       } else {
+               /* non replay path */
+               rc = lod_load_striping_locked(env, lo);
+               if (rc)
+                       GOTO(out, rc);
+
+               /* Prepare inuse array for composite file */
+               for (i = 0; i < lo->ldo_comp_cnt; i++)
+                       stripe_cnt += lod_comp_entry_stripecnt(lo,
+                                               &lo->ldo_comp_entries[i],
+                                               false);
+               rc = lod_inuse_resize(inuse, stripe_cnt, d->lod_osd_max_easize);
+               if (rc)
+                       GOTO(out, rc);
+
+               data.locd_inuse = inuse;
+               rc = lod_obj_for_each_stripe(env, lo, NULL,
+                                            lod_obj_stripe_set_inuse_cb,
+                                            &data);
+               if (rc)
+                       GOTO(out, rc);
+       }
+
+       /* Make sure defined layout covers the requested write range. */
+       lod_comp = &lo->ldo_comp_entries[lo->ldo_comp_cnt - 1];
+       if ((lod_comp->llc_extent.e_end != OBD_OBJECT_EOF &&
+            lod_comp->llc_extent.e_end < layout->li_end)) {
+               CDEBUG(replay ? D_ERROR : D_LAYOUT,
+                      "%s: the defined layout [0, %#llx) does not covers "
+                      "the write range [%#llx, %#llx).\n",
+                      lod2obd(d)->obd_name, lod_comp->llc_extent.e_end,
+                      layout->li_start, layout->li_end);
+               GOTO(out, rc = -EINVAL);
+       }
+
+       /*
+        * Iterate lo->ldo_comp_entries, find the components whose extent
+        * starts below the end of the write range and which are not
+        * instantiated.
+        */
+       for (i = 0; i < lo->ldo_comp_cnt; i++) {
+               lod_comp = &lo->ldo_comp_entries[i];
+
+               if (lod_comp->llc_extent.e_start >= layout->li_end)
+                       break;
+
+               if (!replay) {
+                       if (lod_comp_inited(lod_comp))
+                               continue;
+               } else {
+                       /**
+                        * In replay path, lod_comp is the EA passed by
+                        * client replay buffer,  comp_v1 is the pre-recovery
+                        * on-disk EA, we'd sift out those components which
+                        * were init-ed in the on-disk EA.
+                        */
+                       if (le32_to_cpu(comp_v1->lcm_entries[i].lcme_flags) &
+                           LCME_FL_INIT)
+                               continue;
+               }
+               /*
+                * this component hasn't been instantiated in normal path, or
+                * during replay it needs to replay the instantiation.
+                */
+
+               /* A released component is being extended */
+               if (lod_comp->llc_pattern & LOV_PATTERN_F_RELEASED)
+                       GOTO(out, rc = -EINVAL);
+
+               need_create = true;
+               /*
+                * In replay, the component EA is passed by client,
+                * Clear LCME_FL_INIT so that lod_striping_create() can create
+                * the striping objects.
+                */
+               if (replay)
+                       lod_comp_unset_init(lod_comp);
+
+               rc = lod_qos_prep_create(env, lo, NULL, th, i, inuse);
+               if (rc)
+                       break;
+       }
+
+       if (need_create)
+               lod_obj_inc_layout_gen(lo);
+       else
+               GOTO(unlock, rc = -EALREADY);
+
+       if (!rc) {
+               info->lti_buf.lb_len = lod_comp_md_size(lo, false);
+               rc = lod_sub_object_declare_xattr_set(env, next, &info->lti_buf,
+                                                     XATTR_NAME_LOV, 0, th);
+       }
+out:
+       /* on any failure drop the in-memory striping built above */
+       if (rc)
+               lod_object_free_striping(env, lo);
+
+unlock:
+       dt_write_unlock(env, next);
+
+       RETURN(rc);
+}
+
+/**
+ * Instantiate layout component objects which cover the intent write range.
+ *
+ * The components to instantiate were selected at declare time, so this only
+ * hands the object over to lod_striping_create() together with the
+ * per-thread attribute buffer. \a layout and \a buf are not used here.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       DT object
+ * \param[in] layout   layout intent (unused)
+ * \param[in] buf      client's lovea (unused)
+ * \param[in] th       transaction handle
+ *
+ * \retval             return value of lod_striping_create()
+ */
+static int lod_layout_change(const struct lu_env *env, struct dt_object *dt,
+                            struct layout_intent *layout,
+                            const struct lu_buf *buf, struct thandle *th)
+{
+       struct lu_attr *attr = &lod_env_info(env)->lti_attr;
+       ENTRY;
+
+       RETURN(lod_striping_create(env, dt, attr, NULL, th));
+}
+
 struct dt_object_operations lod_obj_ops = {
        .do_read_lock           = lod_object_read_lock,
        .do_write_lock          = lod_object_write_lock,
@@ -4846,6 +5062,8 @@ struct dt_object_operations lod_obj_ops = {
        .do_object_lock         = lod_object_lock,
        .do_object_unlock       = lod_object_unlock,
        .do_invalidate          = lod_invalidate,
+       .do_declare_layout_change = lod_declare_layout_change,
+       .do_layout_change       = lod_layout_change,
 };
 
 /**
index 4930dfa..89850ab 100644 (file)
@@ -1104,7 +1104,6 @@ out:
  * \param[in] env      execution environment for this thread
  * \param[in] lo       LOD object
  * \param[out] stripe  striping created
- * \param[in] lum      stripe md to specify list of OSTs
  * \param[in] th       transaction handle
  * \param[in] comp_idx index of ldo_comp_entries
  * \param[in|out] inuse        array of inuse ost index
@@ -1114,16 +1113,14 @@ out:
  * \retval -EINVAL     requested OST index is invalid
  * \retval negative    negated errno on error
  */
-static int lod_alloc_ost_list(const struct lu_env *env,
-                             struct lod_object *lo, struct dt_object **stripe,
-                             struct lov_user_md *lum, struct thandle *th,
+static int lod_alloc_ost_list(const struct lu_env *env, struct lod_object *lo,
+                             struct dt_object **stripe, struct thandle *th,
                              int comp_idx, struct ost_pool *inuse)
 {
        struct lod_layout_component *lod_comp;
        struct lod_device       *m = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
        struct obd_statfs       *sfs = &lod_env_info(env)->lti_osfs;
        struct dt_object        *o;
-       struct lov_user_md_v3   *v3;
        unsigned int            array_idx = 0;
        int                     stripe_count = 0;
        int                     i;
@@ -1131,20 +1128,17 @@ static int lod_alloc_ost_list(const struct lu_env *env,
        ENTRY;
 
        /* for specific OSTs layout */
-       LASSERT(lum != NULL && lum->lmm_magic == LOV_USER_MAGIC_SPECIFIC);
-       lustre_print_user_md(D_OTHER, lum, __func__);
-
        LASSERT(lo->ldo_comp_cnt > comp_idx && lo->ldo_comp_entries != NULL);
        lod_comp = &lo->ldo_comp_entries[comp_idx];
+       LASSERT(lod_comp->llc_ostlist.op_array);
 
        rc = lod_qos_ost_in_use_clear(env, lod_comp->llc_stripenr);
        if (rc < 0)
                RETURN(rc);
 
-       v3 = (struct lov_user_md_v3 *)lum;
        for (i = 0; i < lod_comp->llc_stripenr; i++) {
-               if (v3->lmm_objects[i].l_ost_idx ==
-                               lod_comp->llc_stripe_offset) {
+               if (lod_comp->llc_ostlist.op_array[i] ==
+                   lod_comp->llc_stripe_offset) {
                        array_idx = i;
                        break;
                }
@@ -1158,7 +1152,7 @@ static int lod_alloc_ost_list(const struct lu_env *env,
 
        for (i = 0; i < lod_comp->llc_stripenr;
             i++, array_idx = (array_idx + 1) % lod_comp->llc_stripenr) {
-               __u32 ost_idx = v3->lmm_objects[array_idx].l_ost_idx;
+               __u32 ost_idx = lod_comp->llc_ostlist.op_array[array_idx];
 
                if (!cfs_bitmap_check(m->lod_ost_bitmap, ost_idx)) {
                        rc = -ENODEV;
@@ -1663,8 +1657,8 @@ out_nolock:
  *
  * \retval             the maximum usable stripe count
  */
-static __u16 lod_get_stripecnt(struct lod_device *lod, struct lod_object *lo,
-                              __u16 stripe_count)
+__u16 lod_get_stripecnt(struct lod_device *lod, struct lod_object *lo,
+                       __u16 stripe_count)
 {
        __u32 max_stripes = LOV_MAX_STRIPE_COUNT_OLD;
 
@@ -1826,9 +1820,8 @@ out:
  * \retval 0           on success
  * \retval negative    negated errno on error
  */
-static int lod_qos_parse_config(const struct lu_env *env,
-                               struct lod_object *lo,
-                               const struct lu_buf *buf)
+int lod_qos_parse_config(const struct lu_env *env, struct lod_object *lo,
+                        const struct lu_buf *buf)
 {
        struct lod_layout_component *lod_comp;
        struct lod_device       *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
@@ -1927,15 +1920,31 @@ static int lod_qos_parse_config(const struct lu_env *env,
                pool_name = NULL;
                if (v1->lmm_magic == LOV_USER_MAGIC_V3 ||
                    v1->lmm_magic == LOV_USER_MAGIC_SPECIFIC) {
-                       v3 = (struct lov_user_md_v3 *)v1;
+                       int j;
 
+                       v3 = (struct lov_user_md_v3 *)v1;
                        if (v3->lmm_pool_name[0] != '\0')
                                pool_name = v3->lmm_pool_name;
 
-                       if (v3->lmm_magic == LOV_USER_MAGIC_SPECIFIC &&
-                           v3->lmm_stripe_offset == LOV_OFFSET_DEFAULT)
-                               v3->lmm_stripe_offset =
-                                       v3->lmm_objects[0].l_ost_idx;
+                       if (v3->lmm_magic == LOV_USER_MAGIC_SPECIFIC) {
+                               if (v3->lmm_stripe_offset == LOV_OFFSET_DEFAULT)
+                                       v3->lmm_stripe_offset =
+                                               v3->lmm_objects[0].l_ost_idx;
+
+                               /* copy ost list from lmm */
+                               lod_comp->llc_ostlist.op_count =
+                                       v3->lmm_stripe_count;
+                               lod_comp->llc_ostlist.op_size =
+                                       v3->lmm_stripe_count * sizeof(__u32);
+                               OBD_ALLOC(lod_comp->llc_ostlist.op_array,
+                                         lod_comp->llc_ostlist.op_size);
+                               if (!lod_comp->llc_ostlist.op_array)
+                                       GOTO(free_comp, rc = -ENOMEM);
+
+                               for (j = 0; j < v3->lmm_stripe_count; j++)
+                                       lod_comp->llc_ostlist.op_array[j] =
+                                               v3->lmm_objects[j].l_ost_idx;
+                       }
                }
 
                if (v1->lmm_pattern == 0)
@@ -1943,18 +1952,17 @@ static int lod_qos_parse_config(const struct lu_env *env,
                if (lov_pattern(v1->lmm_pattern) != LOV_PATTERN_RAID0) {
                        CDEBUG(D_LAYOUT, "%s: invalid pattern: %x\n",
                               lod2obd(d)->obd_name, v1->lmm_pattern);
-                       lod_free_comp_entries(lo);
-                       RETURN(-EINVAL);
+                       GOTO(free_comp, rc = -EINVAL);
                }
 
                lod_comp->llc_pattern = v1->lmm_pattern;
 
                lod_comp->llc_stripe_size = desc->ld_default_stripe_size;
-               if (v1->lmm_stripe_size > 0)
+               if (v1->lmm_stripe_size)
                        lod_comp->llc_stripe_size = v1->lmm_stripe_size;
 
                lod_comp->llc_stripenr = desc->ld_default_stripe_count;
-               if (v1->lmm_stripe_count > 0)
+               if (v1->lmm_stripe_count)
                        lod_comp->llc_stripenr = v1->lmm_stripe_count;
 
                lod_comp->llc_stripe_offset = v1->lmm_stripe_offset;
@@ -1978,8 +1986,7 @@ static int lod_qos_parse_config(const struct lu_env *env,
                                CDEBUG(D_LAYOUT, "%s: invalid offset, %u\n",
                                       lod2obd(d)->obd_name,
                                       lod_comp->llc_stripe_offset);
-                               lod_free_comp_entries(lo);
-                               RETURN(-EINVAL);
+                               GOTO(free_comp, rc = -EINVAL);
                        }
                }
 
@@ -1990,13 +1997,16 @@ static int lod_qos_parse_config(const struct lu_env *env,
        }
 
        RETURN(0);
+
+free_comp:
+       lod_free_comp_entries(lo);
+       RETURN(rc);
 }
 
 /**
  * Create a striping for an object.
  *
- * The function creates a new striping for the object. A buffer containing
- * configuration hints can be provided optionally. The function tries QoS
+ * The function creates a new striping for the object. The function tries QoS
  * algorithm first unless free space is distributed evenly among OSTs, but
  * by default RR algorithm is preferred due to internal concurrency (QoS is
  * serialized). The caller must ensure no concurrent calls to the function
@@ -2005,18 +2015,16 @@ static int lod_qos_parse_config(const struct lu_env *env,
  * \param[in] env      execution environment for this thread
  * \param[in] lo       LOD object
  * \param[in] attr     attributes OST objects will be declared with
- * \param[in] buf      suggested striping configuration or NULL
  * \param[in] th       transaction handle
  * \param[in] comp_idx index of ldo_comp_entries
- * \param[in|out]inuse array of inuse ost index
+ * \param[in|out] inuse        array of inuse ost index
  *
  * \retval 0           on success
  * \retval negative    negated errno on error
  */
-static int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo,
-                              struct lu_attr *attr, const struct lu_buf *buf,
-                              struct thandle *th, int comp_idx,
-                              struct ost_pool *inuse)
+int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo,
+                       struct lu_attr *attr, struct thandle *th,
+                       int comp_idx, struct ost_pool *inuse)
 {
        struct lod_layout_component *lod_comp;
        struct lod_device      *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
@@ -2035,12 +2043,10 @@ static int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo,
                RETURN(0);
 
        if (likely(lod_comp->llc_stripe == NULL)) {
-               struct lov_user_md *lum = NULL;
-
                /*
                 * no striping has been created so far
                 */
-               LASSERT(lod_comp->llc_stripenr > 0);
+               LASSERT(lod_comp->llc_stripenr);
                /*
                 * statfs and check OST targets now, since ld_active_tgt_count
                 * could be changed if some OSTs are [de]activated manually.
@@ -2059,20 +2065,9 @@ static int lod_qos_prep_create(const struct lu_env *env, struct lod_object *lo,
                CDEBUG(D_OTHER, "tgt_count %d stripenr %d\n",
                                d->lod_desc.ld_tgt_count, stripe_len);
 
-               if (buf != NULL && buf->lb_buf != NULL) {
-                       lum = buf->lb_buf;
-                       if (lum->lmm_magic == LOV_USER_MAGIC_COMP_V1) {
-                               struct lov_comp_md_v1 *comp_v1;
-
-                               comp_v1 = (struct lov_comp_md_v1 *)lum;
-                               lum = (struct lov_user_md *)((char *)comp_v1 +
-                               comp_v1->lcm_entries[comp_idx].lcme_offset);
-                       }
-               }
-
-               if (lum != NULL && lum->lmm_magic == LOV_USER_MAGIC_SPECIFIC) {
-                       rc = lod_alloc_ost_list(env, lo, stripe, lum, th,
-                                               comp_idx, inuse);
+               if (lod_comp->llc_ostlist.op_array) {
+                       rc = lod_alloc_ost_list(env, lo, stripe, th, comp_idx,
+                                               inuse);
                } else if (lod_comp->llc_stripe_offset == LOV_OFFSET_DEFAULT) {
                        rc = lod_alloc_qos(env, lo, stripe, flag, th,
                                           comp_idx, inuse);
@@ -2121,10 +2116,11 @@ out:
        RETURN(rc);
 }
 
-static int
-lod_obj_stripe_set_inuse_cb(const struct lu_env *env, struct lod_object *lo,
-                           struct dt_object *dt, struct thandle *th,
-                           int stripe_idx, struct lod_obj_stripe_cb_data *data)
+int lod_obj_stripe_set_inuse_cb(const struct lu_env *env,
+                               struct lod_object *lo,
+                               struct dt_object *dt, struct thandle *th,
+                               int stripe_idx,
+                               struct lod_obj_stripe_cb_data *data)
 {
        struct lod_thread_info  *info = lod_env_info(env);
        struct lod_device       *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
@@ -2148,9 +2144,9 @@ int lod_prepare_create(const struct lu_env *env, struct lod_object *lo,
                       struct thandle *th)
 
 {
-       struct lod_device       *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
-       struct ost_pool inuse;
-       int     i, rc, comp_cnt;
+       struct lod_device *d = lu2lod_dev(lod2lu_obj(lo)->lo_dev);
+       struct ost_pool inuse = { 0 };
+       int rc;
        ENTRY;
 
        LASSERT(lo);
@@ -2173,40 +2169,8 @@ int lod_prepare_create(const struct lu_env *env, struct lod_object *lo,
        if (rc)
                RETURN(rc);
 
-       memset(&inuse, 0, sizeof(inuse));
-       init_rwsem(&inuse.op_rw_sem);
-       comp_cnt = lo->ldo_comp_cnt;
-
-       /* Prepare inuse array for composite file */
-       if (lo->ldo_is_composite) {
-               struct lod_obj_stripe_cb_data   data;
-
-               inuse.op_size = comp_cnt * LOV_MAX_STRIPE_COUNT_OLD *
-                               sizeof(__u32);
-               if (d->lod_osd_max_easize > 0 &&
-                   inuse.op_size > d->lod_osd_max_easize)
-                       inuse.op_size = d->lod_osd_max_easize;
-               OBD_ALLOC(inuse.op_array, inuse.op_size);
-               if (inuse.op_array == NULL)
-                       RETURN(-ENOMEM);
-
-               data.locd_inuse = &inuse;
-               rc = lod_obj_for_each_stripe(env, lo, NULL,
-                               lod_obj_stripe_set_inuse_cb, &data);
-               if (rc) {
-                       OBD_FREE(inuse.op_array, inuse.op_size);
-                       RETURN(rc);
-               }
-       }
-
-       /* prepare OST object creation */
-       for (i = 0; i < comp_cnt; i++) {
-               rc = lod_qos_prep_create(env, lo, attr, buf, th, i, &inuse);
-               if (rc)
-                       break;
-       }
+       /* prepare OST object creation for the 1st comp. */
+       rc = lod_qos_prep_create(env, lo, attr, th, 0, &inuse);
 
-       if (inuse.op_size)
-               OBD_FREE(inuse.op_array, inuse.op_size);
        RETURN(rc);
 }
index 97404f6..b6a55b7 100644 (file)
@@ -374,30 +374,6 @@ static int mdc_xattr_common(struct obd_export *exp,const struct req_format *fmt,
        if (opcode == MDS_REINT)
                mdc_put_mod_rpc_slot(req, NULL);
 
-       /* For XATTR_LUSTRE_LOV.add, we'd save the LOVEA for replay. */
-       if (opcode == MDS_REINT && rc == 0) {
-               struct mdt_body *body;
-               struct req_capsule *pill = &req->rq_pill;
-
-               body = req_capsule_server_get(pill, &RMF_MDT_BODY);
-               if (body == NULL)
-                       GOTO(out, rc = -EPROTO);
-
-               if (body->mbo_valid & OBD_MD_FLEASIZE) {
-                       void *eadata;
-
-                       eadata = req_capsule_server_sized_get(pill, &RMF_MDT_MD,
-                                                       body->mbo_eadatasize);
-                       if (eadata == NULL)
-                               GOTO(out, rc = -EPROTO);
-
-                       rc = mdc_save_lovea(req, &RMF_EADATA, eadata,
-                                           body->mbo_eadatasize);
-                       if (rc)
-                               GOTO(out, rc);
-               }
-       }
-out:
         if (rc)
                 ptlrpc_req_finished(req);
         else
index a9044a3..e844c64 100644 (file)
@@ -566,6 +566,24 @@ mdo_invalidate(const struct lu_env *env, struct mdd_object *obj)
        return dt_invalidate(env, mdd_object_child(obj));
 }
 
+/* Forward the layout change declaration to the child DT object of \a obj. */
+static inline int
+mdo_declare_layout_change(const struct lu_env *env, struct mdd_object *obj,
+                         struct layout_intent *layout,
+                         const struct lu_buf *buf, struct thandle *handle)
+{
+       struct dt_object *child = mdd_object_child(obj);
+
+       return dt_declare_layout_change(env, child, layout, buf, handle);
+}
+
+/* Forward the layout change itself to the child DT object of \a obj. */
+static inline int
+mdo_layout_change(const struct lu_env *env, struct mdd_object *obj,
+                 struct layout_intent *layout, const struct lu_buf *buf,
+                 struct thandle *handle)
+{
+       struct dt_object *child = mdd_object_child(obj);
+
+       return dt_layout_change(env, child, layout, buf, handle);
+}
+
 static inline
 int mdo_declare_index_insert(const struct lu_env *env, struct mdd_object *obj,
                             const struct lu_fid *fid, __u32 type,
index 18f5ce3..0062be6 100644 (file)
@@ -1715,6 +1715,60 @@ stop:
        return rc;
 }
 
+/*
+ * Declare everything a layout change touches: the layout change on the
+ * object first, then the changelog record store. The first failure is
+ * returned and the changelog declaration is skipped in that case.
+ */
+static int mdd_declare_layout_change(const struct lu_env *env,
+                                    struct mdd_device *mdd,
+                                    struct mdd_object *obj,
+                                    struct layout_intent *layout,
+                                    const struct lu_buf *buf,
+                                    struct thandle *handle)
+{
+       int rc = mdo_declare_layout_change(env, obj, layout, buf, handle);
+
+       if (rc == 0)
+               rc = mdd_declare_changelog_store(env, mdd, NULL, NULL, handle);
+
+       return rc;
+}
+
+/*
+ * For PFL, instantiate the component objects required by a layout intent.
+ *
+ * Runs the declare / start / execute sequence within one transaction and
+ * stores a CL_LAYOUT changelog record once the layout has been changed.
+ * The transaction is always stopped before returning.
+ */
+int mdd_layout_change(const struct lu_env *env, struct md_object *obj,
+                     struct layout_intent *layout, const struct lu_buf *buf)
+{
+       struct mdd_device *mdd = mdo2mdd(obj);
+       struct mdd_object *mobj = md2mdd_obj(obj);
+       struct thandle *th;
+       int rc;
+       ENTRY;
+
+       th = mdd_trans_create(env, mdd);
+       if (IS_ERR(th))
+               RETURN(PTR_ERR(th));
+
+       rc = mdd_declare_layout_change(env, mdd, mobj, layout, buf, th);
+       if (rc) {
+               /*
+                * -EALREADY means another layout write intent has already
+                * instantiated our objects: nothing to do, report success.
+                */
+               if (rc == -EALREADY)
+                       rc = 0;
+               GOTO(stop, rc);
+       }
+
+       rc = mdd_trans_start(env, mdd, th);
+       if (rc)
+               GOTO(stop, rc);
+
+       /* the layout is changed under the object's write lock */
+       mdd_write_lock(env, mobj, MOR_TGT_CHILD);
+       rc = mdo_layout_change(env, mobj, layout, buf, th);
+       mdd_write_unlock(env, mobj);
+       if (rc)
+               GOTO(stop, rc);
+
+       rc = mdd_changelog_data_store(env, mdd, CL_LAYOUT, 0, mobj, th);
+stop:
+       rc = mdd_trans_stop(env, mdd, rc, th);
+       RETURN(rc);
+}
+
 void mdd_object_make_hint(const struct lu_env *env, struct mdd_object *parent,
                          struct mdd_object *child, const struct lu_attr *attr,
                          const struct md_op_spec *spec,
@@ -2230,4 +2284,5 @@ const struct md_object_operations mdd_obj_ops = {
        .moo_object_sync        = mdd_object_sync,
        .moo_object_lock        = mdd_object_lock,
        .moo_object_unlock      = mdd_object_unlock,
+       .moo_layout_change      = mdd_layout_change,
 };
index 57b1234..657966d 100644 (file)
@@ -1229,19 +1229,51 @@ out:
 /**
  * Handler of layout intent RPC requiring the layout modification
+ *
+ * Rejects an empty or inverted range and non-regular files, checks write
+ * permission on the object, then calls mo_layout_change() while holding an
+ * LCK_EX lock on the LAYOUT and XATTR inode bits.
  *
- * \param info [in]    thread environment
- * \param obj [in]     object
- * \param layout [in]  layout intent
+ * \param[in] info     thread environment
+ * \param[in] obj      object
+ * \param[in] layout   layout intent
+ * \param[in] buf      buffer containing client's lovea, could be empty
  *
  * \retval 0   on success
  * \retval < 0 error code
  */
 static int mdt_layout_change(struct mdt_thread_info *info,
                             struct mdt_object *obj,
-                            struct layout_intent *layout)
+                            struct layout_intent *layout,
+                            const struct lu_buf *buf)
 {
-       /* XXX: to do */
-       return 0;
+       struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
+       int rc;
+       ENTRY;
+
+       if (layout->li_start >= layout->li_end) {
+               CERROR("Recieved an invalid layout change range [%llu, %llu) "
+                      "for "DFID"\n", layout->li_start, layout->li_end,
+                      PFID(mdt_object_fid(obj)));
+               RETURN(-EINVAL);
+       }
+
+       if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
+               GOTO(out, rc = -EINVAL);
+
+       rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL,
+                          MAY_WRITE);
+       if (rc)
+               GOTO(out, rc);
+
+       /*
+        * take an exclusive lock on the layout and xattr bits to prepare
+        * layout change; it is dropped right after mo_layout_change()
+        */
+       mdt_lock_reg_init(lh, LCK_EX);
+       rc = mdt_object_lock(info, obj, lh,
+                            MDS_INODELOCK_LAYOUT | MDS_INODELOCK_XATTR);
+       if (rc)
+               GOTO(out, rc);
+
+       rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout,
+                             buf);
+
+       mdt_object_unlock(info, obj, lh, 1);
+out:
+       RETURN(rc);
 }
 
 /**
@@ -3513,6 +3545,10 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
                        info->mti_mdt->mdt_max_mdsize = layout_size;
        }
 
+       /*
+        * set reply buffer size, so that ldlm_handle_enqueue0()->
+        * ldlm_lvbo_fill() will fill the reply buffer with lovea.
+        */
        (*lockp)->l_lvb_type = LVB_T_LAYOUT;
        req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER,
                             layout_size);
@@ -3520,8 +3556,32 @@ static int mdt_intent_layout(enum mdt_it_code opcode,
        if (rc)
                GOTO(out_obj, rc);
 
+
        if (layout_change) {
-               rc = mdt_layout_change(info, obj, layout);
+               struct lu_buf *buf = &info->mti_buf;
+
+               buf->lb_buf = NULL;
+               buf->lb_len = 0;
+               if (unlikely(req_is_replay(mdt_info_req(info)))) {
+                       buf->lb_buf = req_capsule_client_get(info->mti_pill,
+                                       &RMF_EADATA);
+                       buf->lb_len = req_capsule_get_size(info->mti_pill,
+                                       &RMF_EADATA, RCL_CLIENT);
+                       /*
+                        * If it's a replay of a layout write intent RPC, the
+                        * client saved the extended lovea when it received
+                        * the original reply.
+                        */
+                       if (buf->lb_len > 0)
+                               mdt_fix_lov_magic(info, buf->lb_buf);
+               }
+
+               /*
+                * Instantiate some layout components, if @buf contains
+                * lovea, then it's a replay of the layout intent write
+                * RPC.
+                */
+               rc = mdt_layout_change(info, obj, layout, buf);
                if (rc)
                        GOTO(out_obj, rc);
        }
index f2fe618..f3f5e0c 100644 (file)
@@ -638,7 +638,7 @@ int mdt_name_unpack(struct req_capsule *pill,
                    enum mdt_name_flags flags);
 int mdt_close_unpack(struct mdt_thread_info *info);
 int mdt_reint_unpack(struct mdt_thread_info *info, __u32 op);
-void mdt_fix_lov_magic(struct mdt_thread_info *info);
+void mdt_fix_lov_magic(struct mdt_thread_info *info, void *eadata);
 int mdt_reint_rec(struct mdt_thread_info *, struct mdt_lock_handle *);
 #ifdef CONFIG_FS_POSIX_ACL
 int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
index a070d3b..0590fd4 100644 (file)
@@ -1292,12 +1292,10 @@ static int mdt_rename_unpack(struct mdt_thread_info *info)
 /*
  * please see comment above LOV_MAGIC_V1_DEF
  */
-void mdt_fix_lov_magic(struct mdt_thread_info *info)
+void mdt_fix_lov_magic(struct mdt_thread_info *info, void *eadata)
 {
-       struct mdt_reint_record *rr = &info->mti_rr;
-       struct lov_user_md_v1   *v1;
+       struct lov_user_md_v1   *v1 = eadata;
 
-       v1 = (void *)rr->rr_eadata;
        LASSERT(v1);
 
        if (unlikely(req_is_replay(mdt_info_req(info)))) {
@@ -1365,7 +1363,7 @@ static int mdt_open_unpack(struct mdt_thread_info *info)
                         sp->u.sp_ea.eadatalen = rr->rr_eadatalen;
                         sp->u.sp_ea.eadata = rr->rr_eadata;
                         sp->no_create = !!req_is_replay(req);
-                       mdt_fix_lov_magic(info);
+                       mdt_fix_lov_magic(info, rr->rr_eadata);
                 }
 
                 /*
index ec2cb6a..2904f9c 100644 (file)
@@ -263,7 +263,6 @@ int mdt_reint_setxattr(struct mdt_thread_info *info,
        int                      xattr_len = rr->rr_eadatalen;
        __u64                    lockpart = MDS_INODELOCK_UPDATE;
        int                      rc;
-       bool    reply_ea = false;
        ENTRY;
 
        CDEBUG(D_INODE, "setxattr for "DFID": %s %s\n", PFID(rr->rr_fid1),
@@ -337,23 +336,6 @@ int mdt_reint_setxattr(struct mdt_thread_info *info,
                }
 
                lockpart |= MDS_INODELOCK_LAYOUT;
-
-               /*
-                * For XATTR_LUSTRE_LOV.add, we'd reply LOVEA to client,
-                * client will save it for replay.
-                */
-               if (strncmp(xattr_name, XATTR_LUSTRE_LOV".add",
-                           strlen(XATTR_LUSTRE_LOV".add")) == 0 &&
-                   req_capsule_has_field(&req->rq_pill, &RMF_MDT_MD,
-                                         RCL_SERVER)) {
-                       /*
-                        * Don't need to reply LOVEA for replay request,
-                        * it's already stored in client request.
-                        */
-                       if (!req_is_replay(req))
-                               reply_ea = true;
-                       mdt_fix_lov_magic(info);
-               }
        }
 
         /* Revoke all clients' lookup lock, since the access
@@ -426,26 +408,6 @@ int mdt_reint_setxattr(struct mdt_thread_info *info,
                rc = -EINVAL;
        }
 
-       if (reply_ea && rc == 0) {
-               ma->ma_lmm = req_capsule_server_get(&req->rq_pill, &RMF_MDT_MD);
-               ma->ma_lmm_size = req_capsule_get_size(&req->rq_pill,
-                                                      &RMF_MDT_MD, RCL_SERVER);
-               ma->ma_need = MA_LOV;
-               ma->ma_valid = 0;
-               if (ma->ma_lmm_size > 0)
-                       rc = mdt_attr_get_complex(info, obj, ma);
-
-               if (ma->ma_valid & MA_LOV) {
-                       struct mdt_body *repbody;
-
-                       repbody = req_capsule_server_get(&req->rq_pill,
-                                                        &RMF_MDT_BODY);
-                       LASSERT(ma->ma_lmm_size != 0);
-                       repbody->mbo_eadatasize = ma->ma_lmm_size;
-                       repbody->mbo_valid |= OBD_MD_FLEASIZE;
-               }
-       }
-
        if (rc == 0)
                mdt_counter_incr(req, LPROC_MDT_SETXATTR);
 
index 5928306..5a64ceb 100644 (file)
@@ -319,12 +319,6 @@ static const struct req_msg_field *mds_reint_setxattr_client[] = {
        &RMF_DLM_REQ
 };
 
-static const struct req_msg_field *mds_reint_setxattr_server[] = {
-       &RMF_PTLRPC_BODY,
-       &RMF_MDT_BODY,
-       &RMF_MDT_MD
-};
-
 static const struct req_msg_field *mdt_swap_layouts[] = {
        &RMF_PTLRPC_BODY,
        &RMF_MDT_BODY,
@@ -1456,7 +1450,7 @@ EXPORT_SYMBOL(RQF_MDS_REINT_SETATTR);
 
 struct req_format RQF_MDS_REINT_SETXATTR =
         DEFINE_REQ_FMT0("MDS_REINT_SETXATTR",
-                       mds_reint_setxattr_client, mds_reint_setxattr_server);
+                       mds_reint_setxattr_client, mdt_body_only);
 EXPORT_SYMBOL(RQF_MDS_REINT_SETXATTR);
 
 struct req_format RQF_MDS_CONNECT =
index 805344a..d8f56be 100644 (file)
@@ -337,27 +337,28 @@ test_9() {
 
        rm -f $comp_file
 
-       $LFS setstripe -E 1m -S 1m $comp_file ||
+       $LFS setstripe -E 1m -S 1m -E 2M -c 1 $comp_file ||
                error "Create $comp_file failed"
 
        local comp_cnt=$($LFS getstripe --component-count $comp_file)
-       [ $comp_cnt -ne 1 ] && error "component count $comp_cnt != 1"
+       [ $comp_cnt -ne 2 ] && error "component count $comp_cnt != 2"
 
        replay_barrier $SINGLEMDS
 
-       $LFS setstripe --component-add -E 2M -c 1 $comp_file ||
-               error "Add component to $comp_file failed"
+       # instantiate the 2nd component
+       dd if=/dev/zero of=$comp_file bs=1k count=1 seek=1k
 
        local f1=$($LFS getstripe -I 2 $comp_file |
                        awk '/l_fid:/ {print $7}')
-
+       echo "before MDS recovery, the ost fid of 2nd component is $f1"
        fail $SINGLEMDS
 
        local f2=$($LFS getstripe -I 2 $comp_file |
                        awk '/l_fid:/ {print $7}')
+       echo "after MDS recovery, the ost fid of 2nd component is $f2"
        [ $f1 == $f2 ] || error "$f1 != $f2"
 }
-run_test 9 "Replay component add"
+run_test 9 "Replay layout extend object instantiation"
 
 component_dump() {
        echo $($LFS getstripe $1 |
@@ -409,6 +410,88 @@ test_10() {
 }
 run_test 10 "Inherit composite template from root"
 
+test_11() {
+       local comp_file=$DIR/$tfile
+       rm -f $comp_file
+
+       # only 1st component instantiated
+       $LFS setstripe -E 1m -E 2m -E 3m -E -1 $comp_file ||
+               error "Create $comp_file failed"
+
+       local f1=$($LFS getstripe -I 1 $comp_file | grep "l_fid")
+       [[ -z $f1 ]] && error "1: 1st component uninstantiated"
+       local f2=$($LFS getstripe -I 2 $comp_file | grep "l_fid")
+       [[ -n $f2 ]] && error "1: 2nd component instantiated"
+       local f3=$($LFS getstripe -I 3 $comp_file | grep "l_fid")
+       [[ -n $f3 ]] && error "1: 3rd component instantiated"
+       local f4=$($LFS getstripe -I 4 $comp_file | grep "l_fid")
+       [[ -n $f4 ]] && error "1: 4th component instantiated"
+
+       # the first 2 components instantiated
+       $TRUNCATE $comp_file $((1024*1024*1+1))
+
+       f2=$($LFS getstripe -I 2 $comp_file | grep "l_fid")
+       [[ -z $f2 ]] && error "2: 2nd component uninstantiated"
+       f3=$($LFS getstripe -I 3 $comp_file | grep "l_fid")
+       [[ -n $f3 ]] && error "2: 3rd component instantiated"
+       f4=$($LFS getstripe -I 4 $comp_file | grep "l_fid")
+       [[ -n $f4 ]] && error "2: 4th component instantiated"
+
+       # the first 3 components instantiated
+       $TRUNCATE $comp_file $((1024*1024*3))
+       $TRUNCATE $comp_file $((1024*1024*1+1))
+
+       f2=$($LFS getstripe -I 2 $comp_file | grep "l_fid")
+       [[ -z $f2 ]] && error "2: 2nd component uninstantiated"
+       f3=$($LFS getstripe -I 3 $comp_file | grep "l_fid")
+       [[ -z $f3 ]] && error "3: 3rd component uninstantiated"
+       f4=$($LFS getstripe -I 4 $comp_file | grep "l_fid")
+       [[ -n $f4 ]] && error "3: 4th component instantiated"
+
+       # all 4 components instantiated
+       dd if=/dev/zero of=$comp_file bs=1k count=1 seek=3k
+
+       f4=$($LFS getstripe -I 4 $comp_file | grep "l_fid")
+       [[ -z $f4 ]] && error "4: 4th component uninstantiated"
+
+       return 0
+}
+run_test 11 "Verify component instantiation with write/truncate"
+
+test_12() {
+       [ $OSTCOUNT -lt 3 ] && skip "needs >= 3 OSTs" && return
+
+       local file=$DIR/$tfile
+       rm -f $file
+
+       # specify ost list for component
+       $LFS setstripe -E1m -c2 -o0,1 -E2m -c2 -o1,2 -E3m -c2 -o2,1 \
+               -E4m -c2 -o2,0 -E-1 $file ||
+               error "Create $file failed"
+       # instantiate all components
+       $TRUNCATE $file $((1024*1024*4+1))
+
+       # verify object allocation order
+       local o1=$($LFS getstripe -I1 $file |
+                       awk '/l_ost_idx:/ {printf("%d",$5)}')
+       [[ $o1 != "01" ]] && error "$o1 is not 01"
+
+       local o2=$($LFS getstripe -I2 $file |
+                       awk '/l_ost_idx:/ {printf("%d",$5)}')
+       [[ $o2 != "12" ]] && error "$o2 is not 12"
+
+       local o3=$($LFS getstripe -I3 $file |
+                       awk '/l_ost_idx:/ {printf("%d",$5)}')
+       [[ $o3 != "21" ]] && error "$o3 is not 21"
+
+       local o4=$($LFS getstripe -I4 $file |
+                       awk '/l_ost_idx:/ {printf("%d",$5)}')
+       [[ $o4 != "20" ]] && error "$o4 is not 20"
+
+       return 0
+}
+run_test 12 "Verify ost list specification"
+
 complete $SECONDS
 check_and_cleanup_lustre
 exit_status