From f7e40c602d4e5f449dcdade8f48f090b41a7743b Mon Sep 17 00:00:00 2001 From: Alex Zhuravlev Date: Thu, 20 Sep 2012 00:13:14 +0400 Subject: [PATCH] LU-1303 lod: destroy and setattr to use striping destroy and setattr methods fetch striping, instantiate involved objects and then call corresponding methods. Signed-off-by: Alex Zhuravlev Change-Id: I33eea2717d10eb5c34b5a324ef35a83f1430242b Reviewed-on: http://review.whamcloud.com/4050 Tested-by: Hudson Reviewed-by: wangdi Tested-by: Maloo Reviewed-by: Andreas Dilger --- lustre/lod/lod_internal.h | 6 ++ lustre/lod/lod_lov.c | 218 ++++++++++++++++++++++++++++++++++++++++++++++ lustre/lod/lod_object.c | 93 ++++++++++++++++++-- 3 files changed, 312 insertions(+), 5 deletions(-) diff --git a/lustre/lod/lod_internal.h b/lustre/lod/lod_internal.h index 8782d72..6bb5020 100644 --- a/lustre/lod/lod_internal.h +++ b/lustre/lod/lod_internal.h @@ -246,6 +246,8 @@ int lod_add_device(const struct lu_env *env, struct lod_device *m, char *osp, unsigned index, unsigned gen, int active); int lod_del_device(const struct lu_env *env, struct lod_device *m, char *osp, unsigned index, unsigned gen); +int lod_load_striping(const struct lu_env *env, struct lod_object *mo); +int lod_get_lov_ea(const struct lu_env *env, struct lod_object *mo); void lod_fix_desc(struct lov_desc *desc); void lod_fix_desc_qos_maxage(__u32 *val); void lod_fix_desc_pattern(__u32 *val); @@ -253,6 +255,10 @@ void lod_fix_desc_stripe_count(__u32 *val); void lod_fix_desc_stripe_size(__u64 *val); int lod_pools_init(struct lod_device *m, struct lustre_cfg *cfg); int lod_pools_fini(struct lod_device *m); +int lod_parse_striping(const struct lu_env *env, struct lod_object *mo, + const struct lu_buf *buf); +int lod_initialize_objects(const struct lu_env *env, struct lod_object *mo, + struct lov_ost_data_v1 *objs); /* lod_pool.c */ int lod_ost_pool_add(struct ost_pool *op, __u32 idx, unsigned int min_count); diff --git a/lustre/lod/lod_lov.c b/lustre/lod/lod_lov.c index 5763ef4..d3be857 100644 --- a/lustre/lod/lod_lov.c +++ b/lustre/lod/lod_lov.c @@ -369,6 +369,224 @@ out: return(rc); } +int lod_ea_store_resize(struct lod_thread_info *info, int size) +{ + int round = size_roundup_power2(size); + + LASSERT(round <= lov_mds_md_size(LOV_MAX_STRIPE_COUNT, LOV_MAGIC_V3)); + if (info->lti_ea_store) { + LASSERT(info->lti_ea_store_size); + LASSERT(info->lti_ea_store_size < round); + CDEBUG(D_INFO, "EA store size %d is not enough, need %d\n", + info->lti_ea_store_size, round); + OBD_FREE_LARGE(info->lti_ea_store, info->lti_ea_store_size); + info->lti_ea_store = NULL; + info->lti_ea_store_size = 0; + } + + OBD_ALLOC_LARGE(info->lti_ea_store, round); + if (info->lti_ea_store == NULL) + RETURN(-ENOMEM); + info->lti_ea_store_size = round; + RETURN(0); +} + +int lod_get_lov_ea(const struct lu_env *env, struct lod_object *lo) +{ + struct lod_thread_info *info = lod_env_info(env); + struct dt_object *next = dt_object_child(&lo->ldo_obj); + int rc; + ENTRY; + + LASSERT(info); + + if (unlikely(info->lti_ea_store_size == 0)) { + /* just to enter in allocation block below */ + rc = -ERANGE; + } else { +repeat: + info->lti_buf.lb_buf = info->lti_ea_store; + info->lti_buf.lb_len = info->lti_ea_store_size; + rc = dt_xattr_get(env, next, &info->lti_buf, XATTR_NAME_LOV, + BYPASS_CAPA); + } + /* if object is not striped or inaccessible */ + if (rc == -ENODATA) + RETURN(0); + + if (rc == -ERANGE) { + /* EA doesn't fit, reallocate new buffer */ + rc = dt_xattr_get(env, next, &LU_BUF_NULL, XATTR_NAME_LOV, + BYPASS_CAPA); + if (rc == -ENODATA) + RETURN(0); + else if (rc < 0) + RETURN(rc); + + LASSERT(rc > 0); + rc = lod_ea_store_resize(info, rc); + if (rc) + RETURN(rc); + goto repeat; + } + + RETURN(rc); +} + +/* + * allocate array of objects pointers, find/create objects + * stripenr and other fields should be initialized by this moment + */ +int lod_initialize_objects(const struct lu_env *env, struct lod_object *lo, + struct lov_ost_data_v1 *objs) +{ + struct lod_thread_info *info = lod_env_info(env); + struct lod_device *md = lu2lod_dev(lo->ldo_obj.do_lu.lo_dev); + struct lu_object *o, *n; + struct lu_device *nd; + int i, idx, rc = 0; + ENTRY; + + LASSERT(lo); + LASSERT(lo->ldo_stripe == NULL); + LASSERT(lo->ldo_stripenr > 0); + LASSERT(lo->ldo_stripe_size > 0); + + i = sizeof(struct dt_object *) * lo->ldo_stripenr; + OBD_ALLOC(lo->ldo_stripe, i); + if (lo->ldo_stripe == NULL) + GOTO(out, rc = -ENOMEM); + lo->ldo_stripes_allocated = lo->ldo_stripenr; + + for (i = 0; i < lo->ldo_stripenr; i++) { + + info->lti_ostid.oi_id = le64_to_cpu(objs[i].l_object_id); + /* XXX: support for DNE? */ + info->lti_ostid.oi_seq = le64_to_cpu(objs[i].l_object_seq); + idx = le64_to_cpu(objs[i].l_ost_idx); + fid_ostid_unpack(&info->lti_fid, &info->lti_ostid, idx); + + /* + * XXX: assertion is left for testing, to make + * sure we never process requests till configuration + * is completed. to be changed to -EINVAL + */ + + lod_getref(md); + LASSERT(cfs_bitmap_check(md->lod_ost_bitmap, idx)); + LASSERT(OST_TGT(md,idx)); + LASSERTF(OST_TGT(md,idx)->ltd_ost, "idx %d\n", idx); + nd = &OST_TGT(md,idx)->ltd_ost->dd_lu_dev; + lod_putref(md); + + o = lu_object_find_at(env, nd, &info->lti_fid, NULL); + if (IS_ERR(o)) + GOTO(out, rc = PTR_ERR(o)); + + n = lu_object_locate(o->lo_header, nd->ld_type); + LASSERT(n); + + lo->ldo_stripe[i] = container_of(n, struct dt_object, do_lu); + } + +out: + RETURN(rc); +} + +/* + * Parse striping information stored in lti_ea_store + */ +int lod_parse_striping(const struct lu_env *env, struct lod_object *lo, + const struct lu_buf *buf) +{ + struct lov_mds_md_v1 *lmm; + struct lov_ost_data_v1 *objs; + __u32 magic; + int rc = 0; + ENTRY; + + LASSERT(buf); + LASSERT(buf->lb_buf); + LASSERT(buf->lb_len); + + lmm = (struct lov_mds_md_v1 *) buf->lb_buf; + magic = le32_to_cpu(lmm->lmm_magic); + + if (magic != LOV_MAGIC_V1 && magic != LOV_MAGIC_V3) + GOTO(out, rc = -EINVAL); + if (le32_to_cpu(lmm->lmm_pattern) != LOV_PATTERN_RAID0) + GOTO(out, rc = -EINVAL); + + lo->ldo_stripe_size = le32_to_cpu(lmm->lmm_stripe_size); + lo->ldo_stripenr = le16_to_cpu(lmm->lmm_stripe_count); + lo->ldo_layout_gen = le16_to_cpu(lmm->lmm_layout_gen); + + LASSERT(buf->lb_len >= lov_mds_md_size(lo->ldo_stripenr, magic)); + + if (magic == LOV_MAGIC_V3) { + struct lov_mds_md_v3 *v3 = (struct lov_mds_md_v3 *) lmm; + objs = &v3->lmm_objects[0]; + lod_object_set_pool(lo, v3->lmm_pool_name); + } else { + objs = &lmm->lmm_objects[0]; + } + + rc = lod_initialize_objects(env, lo, objs); + +out: + RETURN(rc); +} + +/* + * Load and parse striping information, create in-core representation for the + * stripes + */ +int lod_load_striping(const struct lu_env *env, struct lod_object *lo) +{ + struct lod_thread_info *info = lod_env_info(env); + struct dt_object *next = dt_object_child(&lo->ldo_obj); + int rc; + ENTRY; + + /* + * currently this code is supposed to be called from declaration + * phase only, thus the object is not expected to be locked by caller + */ + dt_write_lock(env, next, 0); + /* already initialized? */ + if (lo->ldo_stripe) { + int i; + /* check validity */ + for (i = 0; i < lo->ldo_stripenr; i++) + LASSERTF(lo->ldo_stripe[i], "stripe %d is NULL\n", i); + GOTO(out, rc = 0); + } + + if (!dt_object_exists(next)) + GOTO(out, rc = 0); + + /* only regular files can be striped */ + if (!(lu_object_attr(lod2lu_obj(lo)) & S_IFREG)) + GOTO(out, rc = 0); + + LASSERT(lo->ldo_stripenr == 0); + + rc = lod_get_lov_ea(env, lo); + if (rc <= 0) + GOTO(out, rc); + + /* + * there is LOV EA (striping information) in this object + * let's parse it and create in-core objects for the stripes + */ + info->lti_buf.lb_buf = info->lti_ea_store; + info->lti_buf.lb_len = info->lti_ea_store_size; + rc = lod_parse_striping(env, lo, &info->lti_buf); +out: + dt_write_unlock(env, next); + RETURN(rc); +} + void lod_fix_desc_stripe_size(__u64 *val) { if (*val < PTLRPC_MAX_BRW_SIZE) { diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index ecdaefb..e45e195 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -161,13 +161,38 @@ static int lod_declare_attr_set(const struct lu_env *env, struct thandle *handle) { struct dt_object *next = dt_object_child(dt); - int rc; + struct lod_object *lo = lod_dt_obj(dt); + int rc, i; ENTRY; /* * declare setattr on the local object */ rc = dt_declare_attr_set(env, next, attr, handle); + if (rc) + RETURN(rc); + + /* + * load striping information, notice we don't do this when object + * is being initialized as we don't need this information till + * few specific cases like destroy, chown + */ + rc = lod_load_striping(env, lo); + if (rc) + RETURN(rc); + + /* + * if object is striped declare changes on the stripes + */ + LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0); + for (i = 0; i < lo->ldo_stripenr; i++) { + LASSERT(lo->ldo_stripe[i]); + rc = dt_declare_attr_set(env, lo->ldo_stripe[i], attr, handle); + if (rc) { + CERROR("failed declaration: %d\n", rc); + break; + } + } RETURN(rc); } @@ -179,7 +204,8 @@ static int lod_attr_set(const struct lu_env *env, struct lustre_capa *capa) { struct dt_object *next = dt_object_child(dt); - int rc; + struct lod_object *lo = lod_dt_obj(dt); + int rc, i; ENTRY; /* @@ -189,6 +215,19 @@ static int lod_attr_set(const struct lu_env *env, if (rc) RETURN(rc); + /* + * if object is striped, apply changes to all the stripes + */ + LASSERT(lo->ldo_stripe || lo->ldo_stripenr == 0); + for (i = 0; i < lo->ldo_stripenr; i++) { + LASSERT(lo->ldo_stripe[i]); + rc = dt_attr_set(env, lo->ldo_stripe[i], attr, handle, capa); + if (rc) { + CERROR("failed declaration: %d\n", rc); + break; + } + } + RETURN(rc); } @@ -367,7 +406,8 @@ static int lod_declare_object_destroy(const struct lu_env *env, struct thandle *th) { struct dt_object *next = dt_object_child(dt); - int rc; + struct lod_object *lo = lod_dt_obj(dt); + int rc, i; ENTRY; /* @@ -377,6 +417,24 @@ static int lod_declare_object_destroy(const struct lu_env *env, if (rc) RETURN(rc); + /* + * load striping information, notice we don't do this when object + * is being initialized as we don't need this information till + * few specific cases like destroy, chown + */ + rc = lod_load_striping(env, lo); + if (rc) + RETURN(rc); + + /* declare destroy for all underlying objects */ + for (i = 0; i < lo->ldo_stripenr; i++) { + LASSERT(lo->ldo_stripe[i]); + rc = dt_declare_destroy(env, lo->ldo_stripe[i], th); + + if (rc) + break; + } + RETURN(rc); } @@ -384,7 +442,8 @@ static int lod_object_destroy(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { struct dt_object *next = dt_object_child(dt); - int rc; + struct lod_object *lo = lod_dt_obj(dt); + int rc, i; ENTRY; /* destroy local object */ @@ -392,6 +451,14 @@ static int lod_object_destroy(const struct lu_env *env, if (rc) RETURN(rc); + /* destroy all underlying objects */ + for (i = 0; i < lo->ldo_stripenr; i++) { + LASSERT(lo->ldo_stripe[i]); + rc = dt_destroy(env, lo->ldo_stripe[i], th); + if (rc) + break; + } + RETURN(rc); } @@ -537,8 +604,24 @@ static int lod_object_init(const struct lu_env *env, struct lu_object *o, RETURN(0); } -void lod_object_free_striping(const struct lu_env *env, struct lod_object *o) +void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo) { + int i; + + if (lo->ldo_stripe) { + LASSERT(lo->ldo_stripes_allocated > 0); + + for (i = 0; i < lo->ldo_stripenr; i++) { + if (lo->ldo_stripe[i]) + lu_object_put(env, &lo->ldo_stripe[i]->do_lu); + } + + i = sizeof(struct dt_object *) * lo->ldo_stripes_allocated; + OBD_FREE(lo->ldo_stripe, i); + lo->ldo_stripe = NULL; + lo->ldo_stripes_allocated = 0; + } + lo->ldo_stripenr = 0; } /* -- 1.8.3.1