From 054bddbe6d12e1b955764c46aec85f6f2e25bd24 Mon Sep 17 00:00:00 2001 From: Mikhail Pershin Date: Wed, 3 Oct 2012 11:18:41 +0400 Subject: [PATCH] LU-1943 echo: echo_client to get attributes like MDT does with changes in MDT/MDD the echo_client start working not-correctly, it uses always 0-stiped files because gets mdsize == 0 from MDD. - All attributes should be taken explicitely from MDD. Introduce echo_attr_get_complex() and use it. - don't use md_getsize_max(), it is not supported anymore. - fix error handling issues - remove obsoleted code Signed-off-by: Mikhail Pershin Change-Id: I6f1277018d4b8716aa2e799e99aea6402c04b605 Reviewed-on: http://review.whamcloud.com/4164 Tested-by: Hudson Reviewed-by: wangdi Reviewed-by: Alex Zhuravlev Tested-by: Maloo Reviewed-by: Oleg Drokin --- lustre/include/lustre/lustre_idl.h | 1 - lustre/obdecho/echo_client.c | 286 ++++++++++++++++++++++++------------- 2 files changed, 185 insertions(+), 102 deletions(-) diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 67a414e..4705c1d 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -2106,7 +2106,6 @@ enum { MDS_CLOSE_CLEANUP = 1 << 6, MDS_KEEP_ORPHAN = 1 << 7, MDS_RECOV_OPEN = 1 << 8, - MDS_UNLINK_DESTROY = 1 << 9, /* Destory ost object in mdd_unlink */ }; /* instance of mdt_reint_rec */ diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index b51d57e..f1901d8 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -209,6 +209,9 @@ struct echo_thread_info { struct lov_user_md_v3 eti_lum; struct md_attr eti_ma; struct lu_name eti_lname; + /* per-thread values, can be re-used */ + void *eti_big_lmm; + int eti_big_lmmsize; char eti_name[20]; struct lu_buf eti_buf; char eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE]; @@ -1473,6 +1476,120 @@ static inline void echo_md_build_name(struct lu_name *lname, char *name, lname->ln_namelen = strlen(name); } +/* similar to mdt_attr_get_complex */ +static int echo_big_lmm_get(const struct lu_env *env, struct md_object *o, + struct md_attr *ma) +{ + struct echo_thread_info *info = echo_env_info(env); + int rc; + + ENTRY; + + LASSERT(ma->ma_lmm_size > 0); + + rc = mo_xattr_get(env, o, &LU_BUF_NULL, XATTR_NAME_LOV); + if (rc < 0) + RETURN(rc); + + /* big_lmm may need to be grown */ + if (info->eti_big_lmmsize < rc) { + int size = size_roundup_power2(rc); + + if (info->eti_big_lmmsize > 0) { + /* free old buffer */ + LASSERT(info->eti_big_lmm); + OBD_FREE_LARGE(info->eti_big_lmm, + info->eti_big_lmmsize); + info->eti_big_lmm = NULL; + info->eti_big_lmmsize = 0; + } + + OBD_ALLOC_LARGE(info->eti_big_lmm, size); + if (info->eti_big_lmm == NULL) + RETURN(-ENOMEM); + info->eti_big_lmmsize = size; + } + LASSERT(info->eti_big_lmmsize >= rc); + + info->eti_buf.lb_buf = info->eti_big_lmm; + info->eti_buf.lb_len = info->eti_big_lmmsize; + rc = mo_xattr_get(env, o, &info->eti_buf, XATTR_NAME_LOV); + if (rc < 0) + RETURN(rc); + + ma->ma_valid |= MA_LOV; + ma->ma_lmm = info->eti_big_lmm; + ma->ma_lmm_size = rc; + + RETURN(0); +} + +int echo_attr_get_complex(const struct lu_env *env, struct md_object *next, + struct md_attr *ma) +{ + struct echo_thread_info *info = echo_env_info(env); + struct lu_buf *buf = &info->eti_buf; + cfs_umode_t mode = lu_object_attr(&next->mo_lu); + int need = ma->ma_need; + int rc = 0, rc2; + + ENTRY; + + ma->ma_valid = 0; + + if (need & MA_INODE) { + ma->ma_need = MA_INODE; + rc = mo_attr_get(env, next, ma); + if (rc) + GOTO(out, rc); + ma->ma_valid |= MA_INODE; + } + + if (need & MA_LOV) { + if (S_ISREG(mode) || S_ISDIR(mode)) { + LASSERT(ma->ma_lmm_size > 0); + buf->lb_buf = ma->ma_lmm; + buf->lb_len = ma->ma_lmm_size; + rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LOV); + if (rc2 > 0) { + ma->ma_lmm_size = rc2; + ma->ma_valid |= MA_LOV; + } else if (rc2 == -ENODATA) { + /* no LOV EA */ + ma->ma_lmm_size = 0; + } else if (rc2 == -ERANGE) { + rc2 = echo_big_lmm_get(env, next, ma); + if (rc2 < 0) + GOTO(out, rc = rc2); + } else { + GOTO(out, rc = rc2); + } + } + } + +#ifdef CONFIG_FS_POSIX_ACL + if (need & MA_ACL_DEF && S_ISDIR(mode)) { + buf->lb_buf = ma->ma_acl; + buf->lb_len = ma->ma_acl_size; + rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT); + if (rc2 > 0) { + ma->ma_acl_size = rc2; + ma->ma_valid |= MA_ACL_DEF; + } else if (rc2 == -ENODATA) { + /* no ACLs */ + ma->ma_acl_size = 0; + } else { + GOTO(out, rc = rc2); + } + } +#endif +out: + ma->ma_need = need; + CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n", + rc, ma->ma_valid, ma->ma_lmm); + RETURN(rc); +} + static int echo_md_create_internal(const struct lu_env *env, struct echo_device *ed, struct md_object *parent, struct lu_fid *fid, @@ -1486,6 +1603,8 @@ echo_md_create_internal(const struct lu_env *env, struct echo_device *ed, struct lu_object_conf conf = { .loc_flags = LOC_F_NEW }; int rc; + ENTRY; + rc = mdo_lookup(env, parent, lname, fid2, spec); if (rc == 0) return -EEXIST; @@ -1497,7 +1616,7 @@ echo_md_create_internal(const struct lu_env *env, struct echo_device *ed, if (IS_ERR(ec_child)) { CERROR("Can not find the child "DFID": rc = %ld\n", PFID(fid), PTR_ERR(ec_child)); - return PTR_ERR(ec_child); + RETURN(PTR_ERR(ec_child)); } child = lu_object_locate(ec_child->lo_header, ld->ld_type); @@ -1520,45 +1639,27 @@ echo_md_create_internal(const struct lu_env *env, struct echo_device *ed, } CDEBUG(D_RPCTRACE, "End creating object "DFID" %s %p rc = %d\n", PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent, rc); + EXIT; out_put: lu_object_put(env, ec_child); return rc; } -static int echo_set_lmm_size(const struct lu_env *env, - struct lu_device *ld, - struct md_attr *ma, int *lmm_size, - int *cookie_size) +static int echo_set_lmm_size(const struct lu_env *env, struct lu_device *ld, + struct md_attr *ma) { - struct md_device *md = lu2md_dev(ld); - int rc; - ENTRY; - - md = lu2md_dev(ld); - rc = md->md_ops->mdo_maxsize_get(env, md, - lmm_size, cookie_size); - if (rc) - RETURN(rc); - - ma->ma_lmm_size = *lmm_size; - if (*lmm_size > 0) { - OBD_ALLOC(ma->ma_lmm, *lmm_size); - if (ma->ma_lmm == NULL) { - ma->ma_lmm_size = 0; - RETURN(-ENOMEM); - } - } + struct echo_thread_info *info = echo_env_info(env); - ma->ma_cookie_size = *cookie_size; - if (*cookie_size > 0) { - OBD_ALLOC(ma->ma_cookie, *cookie_size); - if (ma->ma_cookie == NULL) { - ma->ma_cookie_size = 0; - RETURN(-ENOMEM); - } + if (strcmp(ld->ld_type->ldt_name, LUSTRE_MDD_NAME)) { + ma->ma_lmm = (void *)&info->eti_lmm; + ma->ma_lmm_size = sizeof(info->eti_lmm); + } else { + LASSERT(info->eti_big_lmmsize); + ma->ma_lmm = info->eti_big_lmm; + ma->ma_lmm_size = info->eti_big_lmmsize; } - RETURN(0); + return 0; } static int echo_create_md_object(const struct lu_env *env, @@ -1576,25 +1677,22 @@ static int echo_create_md_object(const struct lu_env *env, struct md_attr *ma = &info->eti_ma; struct lu_device *ld = ed->ed_next; int rc = 0; - int lmm_size = 0; - int cookie_size = 0; int i; + ENTRY; + parent = lu_object_locate(ec_parent->lo_header, ld->ld_type); - if (ec_parent == NULL) { - lu_object_put(env, ec_parent); - RETURN(PTR_ERR(parent)); - } + if (parent == NULL) + RETURN(-ENXIO); memset(ma, 0, sizeof(*ma)); memset(spec, 0, sizeof(*spec)); if (stripe_count != 0) { spec->sp_cr_flags |= FMODE_WRITE; - rc = echo_set_lmm_size(env, ld, ma, &lmm_size, &cookie_size); - if (rc) - GOTO(out_free, rc); + echo_set_lmm_size(env, ld, ma); if (stripe_count != -1) { struct lov_user_md_v3 *lum = &info->eti_lum; + lum->lmm_magic = LOV_USER_MAGIC_V3; lum->lmm_stripe_count = stripe_count; lum->lmm_stripe_offset = stripe_offset; @@ -1615,7 +1713,7 @@ static int echo_create_md_object(const struct lu_env *env, /* If name is specified, only create one object by name */ rc = echo_md_create_internal(env, ed, lu2md(parent), fid, lname, spec, ma); - GOTO(out_free, rc); + RETURN(rc); } /* Create multiple object sequenced by id */ @@ -1635,13 +1733,7 @@ static int echo_create_md_object(const struct lu_env *env, fid->f_oid++; } -out_free: - if (lmm_size > 0 && ma->ma_lmm != NULL) - OBD_FREE(ma->ma_lmm, lmm_size); - if (cookie_size > 0 && ma->ma_cookie != NULL) - OBD_FREE(ma->ma_cookie, cookie_size); - - return rc; + RETURN(rc); } static struct lu_object *echo_md_lookup(const struct lu_env *env, @@ -1682,14 +1774,12 @@ static int echo_setattr_object(const struct lu_env *env, int rc = 0; int i; + ENTRY; + parent = lu_object_locate(ec_parent->lo_header, ld->ld_type); - if (ec_parent == NULL) { - lu_object_put(env, ec_parent); - return PTR_ERR(parent); - } + if (parent == NULL) + RETURN(-ENXIO); - buf->lb_buf = info->eti_xattr_buf; - buf->lb_len = sizeof(info->eti_xattr_buf); for (i = 0; i < count; i++) { struct lu_object *ec_child, *child; @@ -1713,10 +1803,13 @@ static int echo_setattr_object(const struct lu_env *env, CDEBUG(D_RPCTRACE, "Start setattr object "DFID"\n", PFID(lu_object_fid(child))); + buf->lb_buf = info->eti_xattr_buf; + buf->lb_len = sizeof(info->eti_xattr_buf); + sprintf(name, "%s.test1", XATTR_USER_PREFIX); rc = mo_xattr_set(env, lu2md(child), buf, name, LU_XATTR_CREATE); - if (rc) { + if (rc < 0) { CERROR("Can not setattr child "DFID": rc = %d\n", PFID(lu_object_fid(child)), rc); lu_object_put(env, ec_child); @@ -1727,7 +1820,7 @@ static int echo_setattr_object(const struct lu_env *env, id++; lu_object_put(env, ec_child); } - return rc; + RETURN(rc); } static int echo_getattr_object(const struct lu_env *env, @@ -1742,21 +1835,15 @@ static int echo_getattr_object(const struct lu_env *env, struct md_attr *ma = &info->eti_ma; struct lu_device *ld = ed->ed_next; int rc = 0; - int lmm_size = 0; - int cookie_size = 0; int i; + ENTRY; + parent = lu_object_locate(ec_parent->lo_header, ld->ld_type); - if (ec_parent == NULL) { - lu_object_put(env, ec_parent); - return PTR_ERR(parent); - } + if (parent == NULL) + RETURN(-ENXIO); memset(ma, 0, sizeof(*ma)); - rc = echo_set_lmm_size(env, ld, ma, &lmm_size, &cookie_size); - if (rc) - GOTO(out_free, rc); - ma->ma_need |= MA_INODE | MA_LOV | MA_PFID | MA_HSM | MA_ACL_DEF; ma->ma_acl = info->eti_xattr_buf; ma->ma_acl_size = sizeof(info->eti_xattr_buf); @@ -1766,6 +1853,7 @@ static int echo_getattr_object(const struct lu_env *env, ma->ma_valid = 0; echo_md_build_name(lname, name, id); + echo_set_lmm_size(env, ld, ma); ec_child = echo_md_lookup(env, ed, lu2md(parent), lname); if (IS_ERR(ec_child)) { @@ -1778,12 +1866,12 @@ static int echo_getattr_object(const struct lu_env *env, if (child == NULL) { CERROR("Can not locate the child %s\n", lname->ln_name); lu_object_put(env, ec_child); - GOTO(out_free, rc = -EINVAL); + RETURN(-EINVAL); } CDEBUG(D_RPCTRACE, "Start getattr object "DFID"\n", PFID(lu_object_fid(child))); - rc = mo_attr_get(env, lu2md(child), ma); + rc = echo_attr_get_complex(env, lu2md(child), ma); if (rc) { CERROR("Can not getattr child "DFID": rc = %d\n", PFID(lu_object_fid(child)), rc); @@ -1796,12 +1884,7 @@ static int echo_getattr_object(const struct lu_env *env, lu_object_put(env, ec_child); } -out_free: - if (lmm_size > 0 && ma->ma_lmm != NULL) - OBD_FREE(ma->ma_lmm, lmm_size); - if (cookie_size > 0 && ma->ma_cookie != NULL) - OBD_FREE(ma->ma_cookie, cookie_size); - return rc; + RETURN(rc); } static int echo_lookup_object(const struct lu_env *env, @@ -1819,10 +1902,8 @@ static int echo_lookup_object(const struct lu_env *env, int i; parent = lu_object_locate(ec_parent->lo_header, ld->ld_type); - if (ec_parent == NULL) { - lu_object_put(env, ec_parent); - return PTR_ERR(parent); - } + if (parent == NULL) + return -ENXIO; /*prepare the requests*/ for (i = 0; i < count; i++) { @@ -1855,6 +1936,8 @@ static int echo_md_destroy_internal(const struct lu_env *env, struct lu_object *child; int rc; + ENTRY; + ec_child = echo_md_lookup(env, ed, parent, lname); if (IS_ERR(ec_child)) { CERROR("Can't find child %s: rc = %ld\n", lname->ln_name, @@ -1897,8 +1980,6 @@ static int echo_destroy_object(const struct lu_env *env, struct lu_device *ld = ed->ed_next; struct lu_object *parent; int rc = 0; - int lmm_size = 0; - int cookie_size = 0; int i; ENTRY; @@ -1913,22 +1994,18 @@ static int echo_destroy_object(const struct lu_env *env, ma->ma_need = MA_INODE; ma->ma_valid = 0; - rc = echo_set_lmm_size(env, ld, ma, &lmm_size, &cookie_size); - if (rc) - GOTO(out_free, rc); if (name != NULL) { lname->ln_name = name; lname->ln_namelen = namelen; rc = echo_md_destroy_internal(env, ed, lu2md(parent), lname, ma); - GOTO(out_free, rc); + RETURN(rc); } /*prepare the requests*/ for (i = 0; i < count; i++) { char *tmp_name = info->eti_name; - ma->ma_need |= MA_LOV; ma->ma_valid = 0; echo_md_build_name(lname, tmp_name, id); @@ -1941,11 +2018,6 @@ static int echo_destroy_object(const struct lu_env *env, id++; } -out_free: - if (lmm_size > 0 && ma->ma_lmm != NULL) - OBD_FREE(ma->ma_lmm, lmm_size); - if (cookie_size > 0 && ma->ma_cookie != NULL) - OBD_FREE(ma->ma_cookie, cookie_size); RETURN(rc); } @@ -2024,6 +2096,7 @@ static int echo_md_handler(struct echo_device *ed, int command, char *path, int path_len, int id, int count, struct obd_ioctl_data *data) { + struct echo_thread_info *info; struct lu_device *ld = ed->ed_next; struct lu_env *env; int refcheck; @@ -2048,27 +2121,30 @@ static int echo_md_handler(struct echo_device *ed, int command, RETURN(PTR_ERR(env)); rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_MD_SES_TAG); - if (rc != 0) { - cl_env_put(env, &refcheck); - RETURN(rc); - } + if (rc != 0) + GOTO(out_env, rc); + + /* init big_lmm buffer */ + info = echo_env_info(env); + LASSERT(info->eti_big_lmm == NULL); + OBD_ALLOC_LARGE(info->eti_big_lmm, MIN_MD_SIZE); + if (info->eti_big_lmm == NULL) + GOTO(out_env, rc = -ENOMEM); + info->eti_big_lmmsize = MIN_MD_SIZE; parent = echo_resolve_path(env, ed, path, path_len); if (IS_ERR(parent)) { CERROR("Can not resolve the path %s: rc = %ld\n", path, PTR_ERR(parent)); - cl_env_put(env, &refcheck); - RETURN(PTR_ERR(parent)); + GOTO(out_free, PTR_ERR(parent)); } if (namelen > 0) { OBD_ALLOC(name, namelen + 1); if (name == NULL) - RETURN(-ENOMEM); - if (cfs_copy_from_user(name, data->ioc_pbuf2, namelen)) { - OBD_FREE(name, namelen + 1); - RETURN(-EFAULT); - } + GOTO(out_put, rc = -ENOMEM); + if (cfs_copy_from_user(name, data->ioc_pbuf2, namelen)) + GOTO(out_name, rc = -EFAULT); } switch (command) { @@ -2113,9 +2189,17 @@ static int echo_md_handler(struct echo_device *ed, int command, rc = -EINVAL; break; } +out_name: if (name != NULL) OBD_FREE(name, namelen + 1); +out_put: lu_object_put(env, parent); +out_free: + LASSERT(info->eti_big_lmm); + OBD_FREE_LARGE(info->eti_big_lmm, info->eti_big_lmmsize); + info->eti_big_lmm = NULL; + info->eti_big_lmmsize = 0; +out_env: cl_env_put(env, &refcheck); return rc; } -- 1.8.3.1