Whamcloud - gitweb
LU-1303 lod: destroy and setattr to use striping
authorAlex Zhuravlev <alexey.zhuravlev@intel.com>
Wed, 19 Sep 2012 20:13:14 +0000 (00:13 +0400)
committerOleg Drokin <green@whamcloud.com>
Fri, 28 Sep 2012 18:27:31 +0000 (14:27 -0400)
destroy and setattr methods fetch striping, instantiate
involved objects and then call corresponding methods.

Signed-off-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Change-Id: I33eea2717d10eb5c34b5a324ef35a83f1430242b
Reviewed-on: http://review.whamcloud.com/4050
Tested-by: Hudson
Reviewed-by: wangdi <di.wang@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
lustre/lod/lod_internal.h
lustre/lod/lod_lov.c
lustre/lod/lod_object.c

index 8782d72..6bb5020 100644 (file)
@@ -246,6 +246,8 @@ int lod_add_device(const struct lu_env *env, struct lod_device *m,
                   char *osp, unsigned index, unsigned gen, int active);
 int lod_del_device(const struct lu_env *env, struct lod_device *m,
                   char *osp, unsigned index, unsigned gen);
                   char *osp, unsigned index, unsigned gen, int active);
 int lod_del_device(const struct lu_env *env, struct lod_device *m,
                   char *osp, unsigned index, unsigned gen);
+int lod_load_striping(const struct lu_env *env, struct lod_object *mo);
+int lod_get_lov_ea(const struct lu_env *env, struct lod_object *mo);
 void lod_fix_desc(struct lov_desc *desc);
 void lod_fix_desc_qos_maxage(__u32 *val);
 void lod_fix_desc_pattern(__u32 *val);
 void lod_fix_desc(struct lov_desc *desc);
 void lod_fix_desc_qos_maxage(__u32 *val);
 void lod_fix_desc_pattern(__u32 *val);
@@ -253,6 +255,10 @@ void lod_fix_desc_stripe_count(__u32 *val);
 void lod_fix_desc_stripe_size(__u64 *val);
 int lod_pools_init(struct lod_device *m, struct lustre_cfg *cfg);
 int lod_pools_fini(struct lod_device *m);
 void lod_fix_desc_stripe_size(__u64 *val);
 int lod_pools_init(struct lod_device *m, struct lustre_cfg *cfg);
 int lod_pools_fini(struct lod_device *m);
+int lod_parse_striping(const struct lu_env *env, struct lod_object *mo,
+                      const struct lu_buf *buf);
+int lod_initialize_objects(const struct lu_env *env, struct lod_object *mo,
+                          struct lov_ost_data_v1 *objs);
 
 /* lod_pool.c */
 int lod_ost_pool_add(struct ost_pool *op, __u32 idx, unsigned int min_count);
 
 /* lod_pool.c */
 int lod_ost_pool_add(struct ost_pool *op, __u32 idx, unsigned int min_count);
index 5763ef4..d3be857 100644 (file)
@@ -369,6 +369,224 @@ out:
        return(rc);
 }
 
        return(rc);
 }
 
+int lod_ea_store_resize(struct lod_thread_info *info, int size)
+{
+       int round = size_roundup_power2(size);
+
+       LASSERT(round <= lov_mds_md_size(LOV_MAX_STRIPE_COUNT, LOV_MAGIC_V3));
+       if (info->lti_ea_store) {
+               LASSERT(info->lti_ea_store_size);
+               LASSERT(info->lti_ea_store_size < round);
+               CDEBUG(D_INFO, "EA store size %d is not enough, need %d\n",
+                      info->lti_ea_store_size, round);
+               OBD_FREE_LARGE(info->lti_ea_store, info->lti_ea_store_size);
+               info->lti_ea_store = NULL;
+               info->lti_ea_store_size = 0;
+       }
+
+       OBD_ALLOC_LARGE(info->lti_ea_store, round);
+       if (info->lti_ea_store == NULL)
+               RETURN(-ENOMEM);
+       info->lti_ea_store_size = round;
+       RETURN(0);
+}
+
+int lod_get_lov_ea(const struct lu_env *env, struct lod_object *lo)
+{
+       struct lod_thread_info *info = lod_env_info(env);
+       struct dt_object       *next = dt_object_child(&lo->ldo_obj);
+       int                     rc;
+       ENTRY;
+
+       LASSERT(info);
+
+       if (unlikely(info->lti_ea_store_size == 0)) {
+               /* just to enter in allocation block below */
+               rc = -ERANGE;
+       } else {
+repeat:
+               info->lti_buf.lb_buf = info->lti_ea_store;
+               info->lti_buf.lb_len = info->lti_ea_store_size;
+               rc = dt_xattr_get(env, next, &info->lti_buf, XATTR_NAME_LOV,
+                                 BYPASS_CAPA);
+       }
+       /* if object is not striped or inaccessible */
+       if (rc == -ENODATA)
+               RETURN(0);
+
+       if (rc == -ERANGE) {
+               /* EA doesn't fit, reallocate new buffer */
+               rc = dt_xattr_get(env, next, &LU_BUF_NULL, XATTR_NAME_LOV,
+                                 BYPASS_CAPA);
+               if (rc == -ENODATA)
+                       RETURN(0);
+               else if (rc < 0)
+                       RETURN(rc);
+
+               LASSERT(rc > 0);
+               rc = lod_ea_store_resize(info, rc);
+               if (rc)
+                       RETURN(rc);
+               goto repeat;
+       }
+
+       RETURN(rc);
+}
+
+/*
+ * allocate array of objects pointers, find/create objects
+ * stripenr and other fields should be initialized by this moment
+ */
+int lod_initialize_objects(const struct lu_env *env, struct lod_object *lo,
+                          struct lov_ost_data_v1 *objs)
+{
+       struct lod_thread_info  *info = lod_env_info(env);
+       struct lod_device       *md = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev);
+       struct lu_object        *o, *n;
+       struct lu_device        *nd;
+       int                     i, idx, rc = 0;
+       ENTRY;
+
+       LASSERT(lo);
+       LASSERT(lo->ldo_stripe == NULL);
+       LASSERT(lo->ldo_stripenr > 0);
+       LASSERT(lo->ldo_stripe_size > 0);
+
+       i = sizeof(struct dt_object *) * lo->ldo_stripenr;
+       OBD_ALLOC(lo->ldo_stripe, i);
+       if (lo->ldo_stripe == NULL)
+               GOTO(out, rc = -ENOMEM);
+       lo->ldo_stripes_allocated = lo->ldo_stripenr;
+
+       for (i = 0; i < lo->ldo_stripenr; i++) {
+
+               info->lti_ostid.oi_id = le64_to_cpu(objs[i].l_object_id);
+               /* XXX: support for DNE? */
+               info->lti_ostid.oi_seq = le64_to_cpu(objs[i].l_object_seq);
+               idx = le64_to_cpu(objs[i].l_ost_idx);
+               fid_ostid_unpack(&info->lti_fid, &info->lti_ostid, idx);
+
+               /*
+                * XXX: assertion is left for testing, to make
+                * sure we never process requests till configuration
+                * is completed. to be changed to -EINVAL
+                */
+
+               lod_getref(md);
+               LASSERT(cfs_bitmap_check(md->lod_ost_bitmap, idx));
+               LASSERT(OST_TGT(md,idx));
+               LASSERTF(OST_TGT(md,idx)->ltd_ost, "idx %d\n", idx);
+               nd = &OST_TGT(md,idx)->ltd_ost->dd_lu_dev;
+               lod_putref(md);
+
+               o = lu_object_find_at(env, nd, &info->lti_fid, NULL);
+               if (IS_ERR(o))
+                       GOTO(out, rc = PTR_ERR(o));
+
+               n = lu_object_locate(o->lo_header, nd->ld_type);
+               LASSERT(n);
+
+               lo->ldo_stripe[i] = container_of(n, struct dt_object, do_lu);
+       }
+
+out:
+       RETURN(rc);
+}
+
+/*
+ * Parse striping information stored in lti_ea_store
+ */
+int lod_parse_striping(const struct lu_env *env, struct lod_object *lo,
+                      const struct lu_buf *buf)
+{
+       struct lov_mds_md_v1    *lmm;
+       struct lov_ost_data_v1  *objs;
+       __u32                    magic;
+       int                      rc = 0;
+       ENTRY;
+
+       LASSERT(buf);
+       LASSERT(buf->lb_buf);
+       LASSERT(buf->lb_len);
+
+       lmm = (struct lov_mds_md_v1 *) buf->lb_buf;
+       magic = le32_to_cpu(lmm->lmm_magic);
+
+       if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3)
+               GOTO(out, rc = -EINVAL);
+       if (le32_to_cpu(lmm->lmm_pattern) != LOV_PATTERN_RAID0)
+               GOTO(out, rc = -EINVAL);
+
+       lo->ldo_stripe_size = le32_to_cpu(lmm->lmm_stripe_size);
+       lo->ldo_stripenr = le16_to_cpu(lmm->lmm_stripe_count);
+       lo->ldo_layout_gen = le16_to_cpu(lmm->lmm_layout_gen);
+
+       LASSERT(buf->lb_len >= lov_mds_md_size(lo->ldo_stripenr, magic));
+
+       if (magic == LOV_MAGIC_V3) {
+               struct lov_mds_md_v3 *v3 = (struct lov_mds_md_v3 *) lmm;
+               objs = &v3->lmm_objects[0];
+               lod_object_set_pool(lo, v3->lmm_pool_name);
+       } else {
+               objs = &lmm->lmm_objects[0];
+       }
+
+       rc = lod_initialize_objects(env, lo, objs);
+
+out:
+       RETURN(rc);
+}
+
+/*
+ * Load and parse striping information, create in-core representation for the
+ * stripes
+ */
+int lod_load_striping(const struct lu_env *env, struct lod_object *lo)
+{
+       struct lod_thread_info  *info = lod_env_info(env);
+       struct dt_object        *next = dt_object_child(&lo->ldo_obj);
+       int                      rc;
+       ENTRY;
+
+       /*
+        * currently this code is supposed to be called from declaration
+        * phase only, thus the object is not expected to be locked by caller
+        */
+       dt_write_lock(env, next, 0);
+       /* already initialized? */
+       if (lo->ldo_stripe) {
+               int i;
+               /* check validity */
+               for (i = 0; i < lo->ldo_stripenr; i++)
+                       LASSERTF(lo->ldo_stripe[i], "stripe %d is NULL\n", i);
+               GOTO(out, rc = 0);
+       }
+
+       if (!dt_object_exists(next))
+               GOTO(out, rc = 0);
+
+       /* only regular files can be striped */
+       if (!(lu_object_attr(lod2lu_obj(lo)) & S_IFREG))
+               GOTO(out, rc = 0);
+
+       LASSERT(lo->ldo_stripenr == 0);
+
+       rc = lod_get_lov_ea(env, lo);
+       if (rc <= 0)
+               GOTO(out, rc);
+
+       /*
+        * there is LOV EA (striping information) in this object
+        * let's parse it and create in-core objects for the stripes
+        */
+       info->lti_buf.lb_buf = info->lti_ea_store;
+       info->lti_buf.lb_len = info->lti_ea_store_size;
+       rc = lod_parse_striping(env, lo, &info->lti_buf);
+out:
+       dt_write_unlock(env, next);
+       RETURN(rc);
+}
+
 void lod_fix_desc_stripe_size(__u64 *val)
 {
        if (*val < PTLRPC_MAX_BRW_SIZE) {
 void lod_fix_desc_stripe_size(__u64 *val)
 {
        if (*val < PTLRPC_MAX_BRW_SIZE) {
index ecdaefb..e45e195 100644 (file)
@@ -161,13 +161,38 @@ static int lod_declare_attr_set(const struct lu_env *env,
                                struct thandle *handle)
 {
        struct dt_object  *next = dt_object_child(dt);
                                struct thandle *handle)
 {
        struct dt_object  *next = dt_object_child(dt);
-       int                rc;
+       struct lod_object *lo = lod_dt_obj(dt);
+       int                rc, i;
        ENTRY;
 
        /*
         * declare setattr on the local object
         */
        rc = dt_declare_attr_set(env, next, attr, handle);
        ENTRY;
 
        /*
         * declare setattr on the local object
         */
        rc = dt_declare_attr_set(env, next, attr, handle);
+       if (rc)
+               RETURN(rc);
+
+       /*
+        * load striping information, notice we don't do this when object
+        * is being initialized as we don't need this information till
+        * few specific cases like destroy, chown
+        */
+       rc = lod_load_striping(env, lo);
+       if (rc)
+               RETURN(rc);
+
+       /*
+        * if object is striped declare changes on the stripes
+        */
+       LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
+       for (i = 0; i < lo->ldo_stripenr; i++) {
+               LASSERT(lo->ldo_stripe[i]);
+               rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr, handle);
+               if (rc) {
+                       CERROR("failed declaration: %d\n", rc);
+                       break;
+               }
+       }
 
        RETURN(rc);
 }
 
        RETURN(rc);
 }
@@ -179,7 +204,8 @@ static int lod_attr_set(const struct lu_env *env,
                        struct lustre_capa *capa)
 {
        struct dt_object  *next = dt_object_child(dt);
                        struct lustre_capa *capa)
 {
        struct dt_object  *next = dt_object_child(dt);
-       int                rc;
+       struct lod_object *lo = lod_dt_obj(dt);
+       int                rc, i;
        ENTRY;
 
        /*
        ENTRY;
 
        /*
@@ -189,6 +215,19 @@ static int lod_attr_set(const struct lu_env *env,
        if (rc)
                RETURN(rc);
 
        if (rc)
                RETURN(rc);
 
+       /*
+        * if object is striped, apply changes to all the stripes
+        */
+       LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0);
+       for (i = 0; i < lo->ldo_stripenr; i++) {
+               LASSERT(lo->ldo_stripe[i]);
+               rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa);
+               if (rc) {
+                       CERROR("failed declaration: %d\n", rc);
+                       break;
+               }
+       }
+
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
@@ -367,7 +406,8 @@ static int lod_declare_object_destroy(const struct lu_env *env,
                                      struct thandle *th)
 {
        struct dt_object   *next = dt_object_child(dt);
                                      struct thandle *th)
 {
        struct dt_object   *next = dt_object_child(dt);
-       int                 rc;
+       struct lod_object  *lo = lod_dt_obj(dt);
+       int                 rc, i;
        ENTRY;
 
        /*
        ENTRY;
 
        /*
@@ -377,6 +417,24 @@ static int lod_declare_object_destroy(const struct lu_env *env,
        if (rc)
                RETURN(rc);
 
        if (rc)
                RETURN(rc);
 
+       /*
+        * load striping information, notice we don't do this when object
+        * is being initialized as we don't need this information till
+        * few specific cases like destroy, chown
+        */
+       rc = lod_load_striping(env, lo);
+       if (rc)
+               RETURN(rc);
+
+       /* declare destroy for all underlying objects */
+       for (i = 0; i < lo->ldo_stripenr; i++) {
+               LASSERT(lo->ldo_stripe[i]);
+               rc = dt_declare_destroy(env, lo->ldo_stripe[i], th);
+
+               if (rc)
+                       break;
+       }
+
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
@@ -384,7 +442,8 @@ static int lod_object_destroy(const struct lu_env *env,
                struct dt_object *dt, struct thandle *th)
 {
        struct dt_object  *next = dt_object_child(dt);
                struct dt_object *dt, struct thandle *th)
 {
        struct dt_object  *next = dt_object_child(dt);
-       int                rc;
+       struct lod_object *lo = lod_dt_obj(dt);
+       int                rc, i;
        ENTRY;
 
        /* destroy local object */
        ENTRY;
 
        /* destroy local object */
@@ -392,6 +451,14 @@ static int lod_object_destroy(const struct lu_env *env,
        if (rc)
                RETURN(rc);
 
        if (rc)
                RETURN(rc);
 
+       /* destroy all underlying objects */
+       for (i = 0; i < lo->ldo_stripenr; i++) {
+               LASSERT(lo->ldo_stripe[i]);
+               rc = dt_destroy(env, lo->ldo_stripe[i], th);
+               if (rc)
+                       break;
+       }
+
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
@@ -537,8 +604,24 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *o,
        RETURN(0);
 }
 
        RETURN(0);
 }
 
-void lod_object_free_striping(const struct lu_env *env, struct lod_object *o)
+void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo)
 {
 {
+       int i;
+
+       if (lo->ldo_stripe) {
+               LASSERT(lo->ldo_stripes_allocated > 0);
+
+               for (i = 0; i < lo->ldo_stripenr; i++) {
+                       if (lo->ldo_stripe[i])
+                               lu_object_put(env, &lo->ldo_stripe[i]->do_lu);
+               }
+
+               i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated;
+               OBD_FREE(lo->ldo_stripe, i);
+               lo->ldo_stripe = NULL;
+               lo->ldo_stripes_allocated = 0;
+       }
+       lo->ldo_stripenr = 0;
 }
 
 /*
 }
 
 /*