From: Alex Zhuravlev Date: Fri, 17 Aug 2012 04:47:00 +0000 (+0400) Subject: LU-1304 mdt: get attributes with explicit calls X-Git-Tag: 2.3.51~42 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=88c84c7348b1ea35564354eba1c145599b19c649 LU-1304 mdt: get attributes with explicit calls to mdd, so that mdd doesn't need to care of attribute packs Signed-off-by: Alex Zhuravlev Change-Id: I0c2c636e4751f599c5a580526c87ff068a5409a5 Reviewed-on: http://review.whamcloud.com/3782 Tested-by: Hudson Tested-by: Maloo Reviewed-by: wangdi Reviewed-by: Andreas Dilger Reviewed-by: Jinshan Xiong --- diff --git a/lustre/mdd/mdd_device.c b/lustre/mdd/mdd_device.c index 92d75bf..8e22d04 100644 --- a/lustre/mdd/mdd_device.c +++ b/lustre/mdd/mdd_device.c @@ -736,42 +736,11 @@ static struct md_dir_operations mdd_dot_lustre_dir_ops = { static int obf_attr_get(const struct lu_env *env, struct md_object *obj, struct md_attr *ma) { - int rc = 0; - - if (ma->ma_need & MA_INODE) { - struct mdd_device *mdd = mdo2mdd(obj); - - /* "fid" is a virtual object and hence does not have any "real" - * attributes. So we reuse attributes of .lustre for "fid" dir */ - ma->ma_need |= MA_INODE; - rc = mdd_attr_get(env, &mdd->mdd_dot_lustre->mod_obj, ma); - if (rc) - return rc; - ma->ma_valid |= MA_INODE; - } - - /* "fid" directory does not have any striping information. */ - if (ma->ma_need & MA_LOV) { - struct mdd_object *mdd_obj = md2mdd_obj(obj); - - if (ma->ma_valid & MA_LOV) - return 0; - - if (!(S_ISREG(mdd_object_type(mdd_obj)) || - S_ISDIR(mdd_object_type(mdd_obj)))) - return 0; + struct mdd_device *mdd = mdo2mdd(obj); - if (ma->ma_need & MA_LOV_DEF) { - rc = mdd_get_default_md(mdd_obj, ma->ma_lmm); - if (rc > 0) { - ma->ma_lmm_size = rc; - ma->ma_valid |= MA_LOV; - rc = 0; - } - } - } - - return rc; + /* "fid" is a virtual object and hence does not have any "real" + * attributes. So we reuse attributes of .lustre for "fid" dir */ + return mdd_attr_get(env, &mdd->mdd_dot_lustre->mod_obj, ma); } static int obf_attr_set(const struct lu_env *env, struct md_object *obj, @@ -790,7 +759,20 @@ static int obf_xattr_get(const struct lu_env *env, struct md_object *obj, struct lu_buf *buf, const char *name) { - return 0; + int rc = 0; + + /* XXX: a temp. solution till LOD/OSP is landed */ + if (strcmp(name, XATTR_NAME_LOV) == 0) { + if (buf->lb_buf == NULL) { + rc = sizeof(struct lov_user_md); + } else if (buf->lb_len >= sizeof(struct lov_user_md)) { + rc = mdd_get_default_md(md2mdd_obj(obj), buf->lb_buf); + } else { + rc = -ERANGE; + } + } + + return rc; } static int obf_xattr_set(const struct lu_env *env, diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index 8f65973..f142139 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -1319,9 +1319,6 @@ static int mdd_create_data(const struct lu_env *env, struct md_object *pobj, rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0); - if (rc == 0) - rc = mdd_attr_get_internal_locked(env, son, ma); - /* update lov_objid data, must be before transaction stop! */ if (rc == 0) mdd_lov_objid_update(mdd, lmm); @@ -1823,20 +1820,6 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj, CERROR("error on stripe info copy %d \n", rc); GOTO(cleanup, rc); } - if (lmm && lmm_size > 0) { - /* Set Lov here, do not get lmm again later */ - if (lmm_size > ma->ma_lmm_size) { - /* Reply buffer is smaller, need bigger one */ - mdd_max_lmm_buffer(env, lmm_size); - if (unlikely(info->mti_max_lmm == NULL)) - GOTO(cleanup, rc = -ENOMEM); - ma->ma_lmm = info->mti_max_lmm; - ma->ma_big_lmm_used = 1; - } - memcpy(ma->ma_lmm, lmm, lmm_size); - ma->ma_lmm_size = lmm_size; - ma->ma_valid |= MA_LOV; - } if (S_ISLNK(attr->la_mode)) { struct md_ucred *uc = md_ucred(env); @@ -1864,8 +1847,6 @@ static int mdd_create(const struct lu_env *env, struct md_object *pobj, if (rc) GOTO(cleanup, rc); - /* Return attr back. */ - rc = mdd_attr_get_internal_locked(env, son, ma); EXIT; cleanup: if (rc != 0 && created != 0) { diff --git a/lustre/mdd/mdd_internal.h b/lustre/mdd/mdd_internal.h index c1f2e98..779c5e1 100644 --- a/lustre/mdd/mdd_internal.h +++ b/lustre/mdd/mdd_internal.h @@ -271,11 +271,6 @@ int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj, struct md_attr *ma, struct thandle *handle); int mdd_iattr_get(const struct lu_env *env, struct mdd_object *mdd_obj, struct md_attr *ma); -int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj, - struct md_attr *ma); -int mdd_attr_get_internal_locked(const struct lu_env *env, - struct mdd_object *mdd_obj, - struct md_attr *ma); int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p, struct mdd_object *c, struct lu_attr *attr, struct thandle *handle, @@ -511,8 +506,6 @@ static inline int mdd_capable(struct md_ucred *uc, cfs_cap_t cap) return 0; } -int mdd_def_acl_get(const struct lu_env *env, struct mdd_object *mdd_obj, - struct md_attr *ma); int mdd_acl_chmod(const struct lu_env *env, struct mdd_object *o, __u32 mode, struct thandle *handle); int __mdd_declare_acl_init(const struct lu_env *env, struct mdd_object *obj, diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index 218cbd6..04aea7e 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -687,34 +687,6 @@ static int __mdd_lmm_get(const struct lu_env *env, RETURN(rc); } -/* get the first parent fid from link EA */ -static int mdd_pfid_get(const struct lu_env *env, - struct mdd_object *mdd_obj, struct md_attr *ma) -{ - struct lu_buf *buf; - struct link_ea_header *leh; - struct link_ea_entry *lee; - struct lu_fid *pfid = &ma->ma_pfid; - ENTRY; - - if (ma->ma_valid & MA_PFID) - RETURN(0); - - buf = mdd_links_get(env, mdd_obj); - if (IS_ERR(buf)) - RETURN(PTR_ERR(buf)); - - leh = buf->lb_buf; - lee = (struct link_ea_entry *)(leh + 1); - memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid)); - fid_be_to_cpu(pfid, pfid); - ma->ma_valid |= MA_PFID; - if (buf->lb_len > OBD_ALLOC_BIG) - /* if we vmalloced a large buffer drop it */ - mdd_buf_put(buf); - RETURN(0); -} - int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj, struct md_attr *ma) { @@ -727,128 +699,6 @@ int mdd_lmm_get_locked(const struct lu_env *env, struct mdd_object *mdd_obj, RETURN(rc); } -/* get lmv EA only*/ -static int __mdd_lmv_get(const struct lu_env *env, - struct mdd_object *mdd_obj, struct md_attr *ma) -{ - int rc; - ENTRY; - - if (ma->ma_valid & MA_LMV) - RETURN(0); - - rc = mdd_get_md(env, mdd_obj, ma->ma_lmv, &ma->ma_lmv_size, - XATTR_NAME_LMV); - if (rc > 0) { - ma->ma_valid |= MA_LMV; - rc = 0; - } - RETURN(rc); -} - -static int __mdd_lma_get(const struct lu_env *env, struct mdd_object *mdd_obj, - struct md_attr *ma) -{ - struct mdd_thread_info *info = mdd_env_info(env); - struct lustre_mdt_attrs *lma = - (struct lustre_mdt_attrs *)info->mti_xattr_buf; - int lma_size; - int rc; - ENTRY; - - /* If all needed data are already valid, nothing to do */ - if ((ma->ma_valid & (MA_HSM | MA_SOM)) == - (ma->ma_need & (MA_HSM | MA_SOM))) - RETURN(0); - - /* Read LMA from disk EA */ - lma_size = sizeof(info->mti_xattr_buf); - rc = mdd_get_md(env, mdd_obj, lma, &lma_size, XATTR_NAME_LMA); - if (rc <= 0) - RETURN(rc); - - /* Useless to check LMA incompatibility because this is already done in - * osd_ea_fid_get(), and this will fail long before this code is - * called. - * So, if we are here, LMA is compatible. - */ - - lustre_lma_swab(lma); - - /* Swab and copy LMA */ - if (ma->ma_need & MA_HSM) { - if (lma->lma_compat & LMAC_HSM) - ma->ma_hsm.mh_flags = lma->lma_flags & HSM_FLAGS_MASK; - else - ma->ma_hsm.mh_flags = 0; - ma->ma_valid |= MA_HSM; - } - - /* Copy SOM */ - if (ma->ma_need & MA_SOM && lma->lma_compat & LMAC_SOM) { - LASSERT(ma->ma_som != NULL); - ma->ma_som->msd_ioepoch = lma->lma_ioepoch; - ma->ma_som->msd_size = lma->lma_som_size; - ma->ma_som->msd_blocks = lma->lma_som_blocks; - ma->ma_som->msd_mountid = lma->lma_som_mountid; - ma->ma_valid |= MA_SOM; - } - - RETURN(0); -} - -int mdd_attr_get_internal(const struct lu_env *env, struct mdd_object *mdd_obj, - struct md_attr *ma) -{ - int rc = 0; - ENTRY; - - if (ma->ma_need & MA_INODE) - rc = mdd_iattr_get(env, mdd_obj, ma); - - if (rc == 0 && ma->ma_need & MA_LOV) { - if (S_ISREG(mdd_object_type(mdd_obj)) || - S_ISDIR(mdd_object_type(mdd_obj))) - rc = __mdd_lmm_get(env, mdd_obj, ma); - } - if (rc == 0 && ma->ma_need & MA_PFID && !(ma->ma_valid & MA_LOV)) { - if (S_ISREG(mdd_object_type(mdd_obj))) - rc = mdd_pfid_get(env, mdd_obj, ma); - } - if (rc == 0 && ma->ma_need & MA_LMV) { - if (S_ISDIR(mdd_object_type(mdd_obj))) - rc = __mdd_lmv_get(env, mdd_obj, ma); - } - if (rc == 0 && ma->ma_need & (MA_HSM | MA_SOM)) { - if (S_ISREG(mdd_object_type(mdd_obj))) - rc = __mdd_lma_get(env, mdd_obj, ma); - } -#ifdef CONFIG_FS_POSIX_ACL - if (rc == 0 && ma->ma_need & MA_ACL_DEF) { - if (S_ISDIR(mdd_object_type(mdd_obj))) - rc = mdd_def_acl_get(env, mdd_obj, ma); - } -#endif - CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n", - rc, ma->ma_valid, ma->ma_lmm); - RETURN(rc); -} - -int mdd_attr_get_internal_locked(const struct lu_env *env, - struct mdd_object *mdd_obj, struct md_attr *ma) -{ - int rc; - int needlock = ma->ma_need & - (MA_LOV | MA_LMV | MA_ACL_DEF | MA_HSM | MA_SOM | MA_PFID); - - if (needlock) - mdd_read_lock(env, mdd_obj, MOR_TGT_CHILD); - rc = mdd_attr_get_internal(env, mdd_obj, ma); - if (needlock) - mdd_read_unlock(env, mdd_obj); - return rc; -} - /* * No permission check is needed. */ @@ -859,7 +709,7 @@ int mdd_attr_get(const struct lu_env *env, struct md_object *obj, int rc; ENTRY; - rc = mdd_attr_get_internal_locked(env, mdd_obj, ma); + rc = mdd_iattr_get(env, mdd_obj, ma); RETURN(rc); } @@ -871,6 +721,9 @@ static int mdd_xattr_get(const struct lu_env *env, const char *name) { struct mdd_object *mdd_obj = md2mdd_obj(obj); + struct mdd_device *mdd = mdo2mdd(obj); + struct lu_fid rootfid; + int is_root; int rc; ENTRY; @@ -886,6 +739,20 @@ static int mdd_xattr_get(const struct lu_env *env, mdd_object_capa(env, mdd_obj)); mdd_read_unlock(env, mdd_obj); + dt_root_get(env, mdd->mdd_child, &rootfid); + is_root = lu_fid_eq(mdd_object_fid(mdd_obj), &rootfid); + + /* XXX: a temp. solution till LOD/OSP is landed */ + if (rc == -ENODATA && strcmp(name, XATTR_NAME_LOV) == 0 && is_root) { + if (buf->lb_buf == NULL) { + rc = sizeof(struct lov_user_md); + } else if (buf->lb_len >= sizeof(struct lov_user_md)) { + rc = mdd_get_default_md(mdd_obj, buf->lb_buf); + } else { + rc = -ERANGE; + } + } + RETURN(rc); } diff --git a/lustre/mdd/mdd_permission.c b/lustre/mdd/mdd_permission.c index b74872b..abedd81 100644 --- a/lustre/mdd/mdd_permission.c +++ b/lustre/mdd/mdd_permission.c @@ -51,33 +51,6 @@ #ifdef CONFIG_FS_POSIX_ACL /* - * Get default acl EA only. - * Hold read_lock for mdd_obj. - */ -int mdd_def_acl_get(const struct lu_env *env, struct mdd_object *mdd_obj, - struct md_attr *ma) -{ - struct lu_buf *buf; - int rc; - ENTRY; - - if (ma->ma_valid & MA_ACL_DEF) - RETURN(0); - - buf = mdd_buf_get(env, ma->ma_acl, ma->ma_acl_size); - rc = mdo_xattr_get(env, mdd_obj, buf, XATTR_NAME_ACL_DEFAULT, - BYPASS_CAPA); - if (rc > 0) { - ma->ma_acl_size = rc; - ma->ma_valid |= MA_ACL_DEF; - rc = 0; - } else if ((rc == -EOPNOTSUPP) || (rc == -ENODATA)) { - rc = 0; - } - RETURN(rc); -} - -/* * Hold write_lock for o. */ int mdd_acl_chmod(const struct lu_env *env, struct mdd_object *o, __u32 mode, diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 4fb98ae..71dce5b 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -482,7 +482,7 @@ void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b, if (!S_ISREG(attr->la_mode)) { b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV; - } else if (ma->ma_need & MA_LOV && ma->ma_lmm_size == 0) { + } else if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV)) { /* means no objects are allocated on osts. */ LASSERT(!(ma->ma_valid & MA_LOV)); /* just ignore blocks occupied by extend attributes on MDS */ @@ -544,6 +544,186 @@ void mdt_client_compatibility(struct mdt_thread_info *info) EXIT; } +static int mdt_big_lmm_get(const struct lu_env *env, struct mdt_object *o, + struct md_attr *ma) +{ + struct mdt_thread_info *info; + int rc; + ENTRY; + + info = lu_context_key_get(&env->le_ctx, &mdt_thread_key); + LASSERT(info != NULL); + LASSERT(ma->ma_lmm_size > 0); + LASSERT(info->mti_big_lmm_used == 0); + rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL, + XATTR_NAME_LOV); + if (rc < 0) + RETURN(rc); + + /* big_lmm may need to be grown */ + if (info->mti_big_lmmsize < rc) { + int size = size_roundup_power2(rc); + + if (info->mti_big_lmmsize > 0) { + /* free old buffer */ + LASSERT(info->mti_big_lmm); + OBD_FREE_LARGE(info->mti_big_lmm, + info->mti_big_lmmsize); + info->mti_big_lmm = NULL; + info->mti_big_lmmsize = 0; + } + + OBD_ALLOC_LARGE(info->mti_big_lmm, size); + if (info->mti_big_lmm == NULL) + RETURN(-ENOMEM); + info->mti_big_lmmsize = size; + } + LASSERT(info->mti_big_lmmsize >= rc); + + info->mti_buf.lb_buf = info->mti_big_lmm; + info->mti_buf.lb_len = info->mti_big_lmmsize; + rc = mo_xattr_get(env, mdt_object_child(o), &info->mti_buf, + XATTR_NAME_LOV); + if (rc < 0) + RETURN(rc); + + info->mti_big_lmm_used = 1; + ma->ma_valid |= MA_LOV; + ma->ma_lmm = info->mti_big_lmm; + ma->ma_lmm_size = rc; + + /* update mdt_max_mdsize so all clients will be aware about that */ + if (info->mti_mdt->mdt_max_mdsize < rc) + info->mti_mdt->mdt_max_mdsize = rc; + + RETURN(0); +} + +int mdt_attr_get_lov(struct mdt_thread_info *info, + struct mdt_object *o, struct md_attr *ma) +{ + struct md_object *next = mdt_object_child(o); + struct lu_buf *buf = &info->mti_buf; + int rc; + + buf->lb_buf = ma->ma_lmm; + buf->lb_len = ma->ma_lmm_size; + rc = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_LOV); + if (rc > 0) { + ma->ma_lmm_size = rc; + ma->ma_valid |= MA_LOV; + rc = 0; + } else if (rc == -ENODATA) { + /* no LOV EA */ + rc = 0; + } else if (rc == -ERANGE) { + rc = mdt_big_lmm_get(info->mti_env, o, ma); + } + + return rc; +} + +int mdt_attr_get_complex(struct mdt_thread_info *info, + struct mdt_object *o, struct md_attr *ma) +{ + const struct lu_env *env = info->mti_env; + struct md_object *next = mdt_object_child(o); + struct lu_buf *buf = &info->mti_buf; + u32 mode = lu_object_attr(&next->mo_lu); + int need = ma->ma_need; + int rc = 0, rc2; + ENTRY; + + /* do we really need PFID */ + LASSERT((ma->ma_need & MA_PFID) == 0); + + ma->ma_valid = 0; + + if (need & MA_INODE) { + ma->ma_need = MA_INODE; + rc = mo_attr_get(env, next, ma); + if (rc) + GOTO(out, rc); + ma->ma_valid |= MA_INODE; + } + + if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) { + rc = mdt_attr_get_lov(info, o, ma); + if (rc) + GOTO(out, rc); + } + + if (need & MA_LMV && S_ISDIR(mode)) { + buf->lb_buf = ma->ma_lmv; + buf->lb_len = ma->ma_lmv_size; + rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LMV); + if (rc2 > 0) { + ma->ma_lmv_size = rc2; + ma->ma_valid |= MA_LMV; + } else if (rc2 == -ENODATA) { + /* no LMV EA */ + ma->ma_lmv_size = 0; + } else + GOTO(out, rc = rc2); + } + + + if (rc == 0 && S_ISREG(mode) && (need & (MA_HSM | MA_SOM))) { + struct lustre_mdt_attrs *lma; + + lma = (struct lustre_mdt_attrs *)info->mti_xattr_buf; + CLASSERT(sizeof(*lma) <= sizeof(info->mti_xattr_buf)); + + buf->lb_buf = lma; + buf->lb_len = sizeof(info->mti_xattr_buf); + rc = mo_xattr_get(env, next, buf, XATTR_NAME_LMA); + if (rc > 0) { + lustre_lma_swab(lma); + /* Swab and copy LMA */ + if (need & MA_HSM) { + if (lma->lma_compat & LMAC_HSM) + ma->ma_hsm.mh_flags = + lma->lma_flags & HSM_FLAGS_MASK; + else + ma->ma_hsm.mh_flags = 0; + ma->ma_valid |= MA_HSM; + } + /* Copy SOM */ + if (need & MA_SOM && lma->lma_compat & LMAC_SOM) { + LASSERT(ma->ma_som != NULL); + ma->ma_som->msd_ioepoch = lma->lma_ioepoch; + ma->ma_som->msd_size = lma->lma_som_size; + ma->ma_som->msd_blocks = lma->lma_som_blocks; + ma->ma_som->msd_mountid = lma->lma_som_mountid; + ma->ma_valid |= MA_SOM; + } + rc = 0; + } else if (rc == -ENODATA) { + rc = 0; + } + } + +#ifdef CONFIG_FS_POSIX_ACL + if (need & MA_ACL_DEF && S_ISDIR(mode)) { + buf->lb_buf = ma->ma_acl; + buf->lb_len = ma->ma_acl_size; + rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT); + if (rc2 > 0) { + ma->ma_acl_size = rc2; + ma->ma_valid |= MA_ACL_DEF; + } else if (rc2 == -ENODATA) { + /* no ACLs */ + ma->ma_acl_size = 0; + } else + GOTO(out, rc = rc2); + } +#endif +out: + ma->ma_need = need; + CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n", + rc, ma->ma_valid, ma->ma_lmm); + RETURN(rc); +} static int mdt_getattr_internal(struct mdt_thread_info *info, struct mdt_object *o, int ma_need) @@ -558,6 +738,7 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, struct mdt_body *repbody; struct lu_buf *buffer = &info->mti_buf; int rc; + int is_root; ENTRY; if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) @@ -575,8 +756,11 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, GOTO(out, rc = 0); } - buffer->lb_buf = req_capsule_server_get(pill, &RMF_MDT_MD); - buffer->lb_len = req_capsule_get_size(pill, &RMF_MDT_MD, RCL_SERVER); + buffer->lb_len = reqbody->eadatasize; + if (buffer->lb_len > 0) + buffer->lb_buf = req_capsule_server_get(pill, &RMF_MDT_MD); + else + buffer->lb_buf = NULL; /* If it is dir object and client require MEA, then we got MEA */ if (S_ISDIR(lu_object_attr(&next->mo_lu)) && @@ -601,13 +785,38 @@ static int mdt_getattr_internal(struct mdt_thread_info *info, if (ma->ma_need & MA_SOM) ma->ma_som = &info->mti_u.som.data; - rc = mo_attr_get(env, next, ma); + rc = mdt_attr_get_complex(info, o, ma); if (unlikely(rc)) { CERROR("getattr error for "DFID": %d\n", PFID(mdt_object_fid(o)), rc); RETURN(rc); } + is_root = lu_fid_eq(mdt_object_fid(o), &info->mti_mdt->mdt_md_root_fid); + + /* the Lustre protocol supposes to return default striping + * on the user-visible root if explicitly requested */ + if ((ma->ma_valid & MA_LOV) == 0 && S_ISDIR(la->la_mode) && + (ma->ma_need & MA_LOV_DEF && is_root) && (ma->ma_need & MA_LOV)) { + struct lu_fid rootfid; + struct mdt_object *root; + struct mdt_device *mdt = info->mti_mdt; + + rc = dt_root_get(env, mdt->mdt_bottom, &rootfid); + if (rc) + RETURN(rc); + root = mdt_object_find(env, mdt, &rootfid); + if (IS_ERR(root)) + RETURN(PTR_ERR(root)); + rc = mdt_attr_get_lov(info, root, ma); + mdt_object_put(info->mti_env, root); + if (unlikely(rc)) { + CERROR("getattr error for "DFID": %d\n", + PFID(mdt_object_fid(o)), rc); + RETURN(rc); + } + } + if (likely(ma->ma_valid & MA_INODE)) mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o)); else @@ -1117,8 +1326,7 @@ relock: ma->ma_valid = 0; ma->ma_need = MA_INODE; - rc = mo_attr_get(info->mti_env, - mdt_object_child(child), ma); + rc = mdt_attr_get_complex(info, child, ma); if (unlikely(rc != 0)) GOTO(out_child, rc); @@ -1790,15 +1998,13 @@ static int mdt_sync(struct mdt_thread_info *info) if (rc == 0) { rc = mdt_object_sync(info); if (rc == 0) { - struct md_object *next; const struct lu_fid *fid; struct lu_attr *la = &info->mti_attr.ma_attr; - next = mdt_object_child(info->mti_object); info->mti_attr.ma_need = MA_INODE; info->mti_attr.ma_valid = 0; - rc = mo_attr_get(info->mti_env, next, - &info->mti_attr); + rc = mdt_attr_get_complex(info, info->mti_object, + &info->mti_attr); if (rc == 0) { body = req_capsule_server_get(pill, &RMF_MDT_BODY); @@ -2903,6 +3109,7 @@ static void mdt_thread_info_init(struct ptlrpc_request *req, info->mti_no_need_trans = 0; info->mti_cross_ref = 0; info->mti_opdata = 0; + info->mti_big_lmm_used = 0; /* To not check for split by default. */ info->mti_spec.sp_ck_split = 0; @@ -4616,6 +4823,12 @@ static int mdt_stack_init(struct lu_env *env, rc = child_lu_dev->ld_ops->ldo_prepare(env, &m->mdt_md_dev.md_lu_dev, child_lu_dev); + if (rc) + GOTO(out, rc); + + rc = m->mdt_child->md_ops->mdo_root_get(env, m->mdt_child, + &m->mdt_md_root_fid); + out: /* fini from last known good lu_device */ if (rc) @@ -6059,7 +6272,20 @@ static struct lu_device *mdt_device_alloc(const struct lu_env *env, } /* context key constructor/destructor: mdt_key_init, mdt_key_fini */ -LU_KEY_INIT_FINI(mdt, struct mdt_thread_info); +LU_KEY_INIT(mdt, struct mdt_thread_info); + +static void mdt_key_fini(const struct lu_context *ctx, + struct lu_context_key *key, void* data) +{ + struct mdt_thread_info *info = data; + + if (info->mti_big_lmm) { + OBD_FREE_LARGE(info->mti_big_lmm, info->mti_big_lmmsize); + info->mti_big_lmm = NULL; + info->mti_big_lmmsize = 0; + } + OBD_FREE_PTR(info); +} /* context key: mdt_thread_key */ LU_CONTEXT_KEY_DEFINE(mdt, LCT_MD_THREAD); diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index fbb10c0..f1d60b8 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -178,6 +178,7 @@ struct mdt_device { struct lprocfs_stats *mdt_stats; int mdt_sec_level; struct rename_stats mdt_rename_stats; + struct lu_fid mdt_md_root_fid; }; #define MDT_SERVICE_WATCHDOG_FACTOR (2) @@ -400,6 +401,13 @@ struct mdt_thread_info { /* Ops object filename */ struct lu_name mti_name; + /* per-thread values, can be re-used */ + void *mti_big_lmm; + int mti_big_lmmsize; + /* big_lmm buffer was used and must be used in reply */ + int mti_big_lmm_used; + /* should be enough to fit lustre_mdt_attrs */ + char mti_xattr_buf[128]; }; static inline const struct md_device_operations * @@ -577,6 +585,8 @@ enum { MDT_SOM_ENABLE = 1, }; +int mdt_attr_get_complex(struct mdt_thread_info *info, + struct mdt_object *o, struct md_attr *ma); int mdt_ioepoch_open(struct mdt_thread_info *info, struct mdt_object *o, int created); int mdt_object_is_som_enabled(struct mdt_object *mo); diff --git a/lustre/mdt/mdt_lib.c b/lustre/mdt/mdt_lib.c index ade0825..b734013 100644 --- a/lustre/mdt/mdt_lib.c +++ b/lustre/mdt/mdt_lib.c @@ -586,7 +586,7 @@ int mdt_fix_reply(struct mdt_thread_info *info) /* MDT_MD buffer may be bigger than packed value, let's shrink all * buffers before growing it */ - if (info->mti_attr.ma_big_lmm_used) { + if (info->mti_big_lmm_used) { LASSERT(req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)); md_packed = req_capsule_get_size(pill, &RMF_MDT_MD, RCL_SERVER); @@ -597,7 +597,7 @@ int mdt_fix_reply(struct mdt_thread_info *info) req_capsule_shrink(pill, &RMF_MDT_MD, 0, RCL_SERVER); /* free big lmm if md_size is not needed */ if (md_size == 0) - info->mti_attr.ma_big_lmm_used = 0; + info->mti_big_lmm_used = 0; } else if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) { req_capsule_shrink(pill, &RMF_MDT_MD, md_size, RCL_SERVER); } @@ -622,7 +622,7 @@ int mdt_fix_reply(struct mdt_thread_info *info) */ /* Grow MD buffer if needed finally */ - if (info->mti_attr.ma_big_lmm_used) { + if (info->mti_big_lmm_used) { void *lmm; LASSERT(md_size > md_packed); @@ -649,7 +649,7 @@ int mdt_fix_reply(struct mdt_thread_info *info) if (info->mti_mdt->mdt_max_mdsize < info->mti_attr.ma_lmm_size) info->mti_mdt->mdt_max_mdsize = info->mti_attr.ma_lmm_size; - info->mti_attr.ma_big_lmm_used = 0; + info->mti_big_lmm_used = 0; } RETURN(rc); } @@ -1288,7 +1288,8 @@ static int mdt_open_unpack(struct mdt_thread_info *info) */ if (rr->rr_eadatalen == 0 && !(info->mti_spec.sp_cr_flags & MDS_OPEN_DELAY_CREATE)) - rr->rr_eadatalen = MIN_MD_SIZE; } + rr->rr_eadatalen = MIN_MD_SIZE; + } RETURN(0); } diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index f78f20e..816f570 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -123,6 +123,9 @@ static int mdt_create_data(struct mdt_thread_info *info, rc = mdo_create_data(info->mti_env, p ? mdt_object_child(p) : NULL, mdt_object_child(o), spec, ma); + if (rc == 0) + rc = mdt_attr_get_complex(info, o, ma); + if (rc == 0 && ma->ma_valid & MA_LOV) o->mot_flags |= MOF_LOV_CREATED; } @@ -340,7 +343,7 @@ static inline int mdt_ioepoch_close_reg(struct mdt_thread_info *info, tmp_ma->ma_som = &info->mti_u.som.data; tmp_ma->ma_need = MA_INODE | MA_LOV | MA_SOM; tmp_ma->ma_valid = 0; - rc = mo_attr_get(info->mti_env, mdt_object_child(o), tmp_ma); + rc = mdt_attr_get_complex(info, o, tmp_ma); if (rc) GOTO(error_up, rc); @@ -1000,11 +1003,9 @@ void mdt_reconstruct_open(struct mdt_thread_info *info, } rc = mdt_object_exists(child); if (rc > 0) { - struct md_object *next; mdt_set_capainfo(info, 1, rr->rr_fid2, BYPASS_CAPA); - next = mdt_object_child(child); - rc = mo_attr_get(env, next, ma); + rc = mdt_attr_get_complex(info, child, ma); if (rc == 0) rc = mdt_finish_open(info, parent, child, flags, 1, ldlm_rep); @@ -1038,7 +1039,6 @@ out: int mdt_open_by_fid(struct mdt_thread_info* info, struct ldlm_reply *rep) { - const struct lu_env *env = info->mti_env; __u32 flags = info->mti_spec.sp_cr_flags; struct mdt_reint_record *rr = &info->mti_rr; struct md_attr *ma = &info->mti_attr; @@ -1056,7 +1056,7 @@ int mdt_open_by_fid(struct mdt_thread_info* info, DISP_LOOKUP_EXECD | DISP_LOOKUP_POS)); - rc = mo_attr_get(env, mdt_object_child(o), ma); + rc = mdt_attr_get_complex(info, o, ma); if (rc == 0) rc = mdt_finish_open(info, NULL, o, flags, 0, rep); } else if (rc == 0) { @@ -1135,7 +1135,7 @@ int mdt_open_by_fid_lock(struct mdt_thread_info *info, struct ldlm_reply *rep, if (rc) GOTO(out, rc); - rc = mo_attr_get(env, mdt_object_child(o), ma); + rc = mdt_attr_get_complex(info, o, ma); if (rc) GOTO(out, rc); @@ -1194,7 +1194,7 @@ int mdt_cross_open(struct mdt_thread_info* info, goto out; mdt_set_capainfo(info, 0, fid, BYPASS_CAPA); - rc = mo_attr_get(info->mti_env, mdt_object_child(o), ma); + rc = mdt_attr_get_complex(info, o, ma); if (rc == 0) rc = mdt_finish_open(info, NULL, o, flags, 0, rep); } else if (rc == 0) { @@ -1413,14 +1413,18 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) mdt_clear_disposition(info, ldlm_rep, DISP_OPEN_CREATE); GOTO(out_child, result); } else { + + /* XXX: we should call this once, see few lines below */ + if (result == 0) + result = mdt_attr_get_complex(info, child, ma); + if (result != 0) GOTO(out_child, result); } created = 1; } else { /* We have to get attr & lov ea for this object */ - result = mo_attr_get(info->mti_env, mdt_object_child(child), - ma); + result = mdt_attr_get_complex(info, child, ma); /* * The object is on remote node, return its FID for remote open. */ diff --git a/lustre/mdt/mdt_recovery.c b/lustre/mdt/mdt_recovery.c index 057b840..956dcfb 100644 --- a/lustre/mdt/mdt_recovery.c +++ b/lustre/mdt/mdt_recovery.c @@ -741,7 +741,7 @@ static void mdt_reconstruct_create(struct mdt_thread_info *mti, body = req_capsule_server_get(mti->mti_pill, &RMF_MDT_BODY); mti->mti_attr.ma_need = MA_INODE; mti->mti_attr.ma_valid = 0; - rc = mo_attr_get(mti->mti_env, mdt_object_child(child), &mti->mti_attr); + rc = mdt_attr_get_complex(mti, child, &mti->mti_attr); if (rc == -EREMOTE) { /* object was created on remote server */ req->rq_status = rc; @@ -781,7 +781,7 @@ static void mdt_reconstruct_setattr(struct mdt_thread_info *mti, } mti->mti_attr.ma_need = MA_INODE; mti->mti_attr.ma_valid = 0; - mo_attr_get(mti->mti_env, mdt_object_child(obj), &mti->mti_attr); + mdt_attr_get_complex(mti, obj, &mti->mti_attr); mdt_pack_attr2body(mti, body, &mti->mti_attr.ma_attr, mdt_object_fid(obj)); if (mti->mti_ioepoch && (mti->mti_ioepoch->flags & MF_EPOCH_OPEN)) { diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 682b348..c2227e7 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -347,6 +347,9 @@ static int mdt_md_create(struct mdt_thread_info *info) rc = mdo_create(info->mti_env, next, lname, mdt_object_child(child), &info->mti_spec, ma); + if (rc == 0) + rc = mdt_attr_get_complex(info, child, ma); + if (rc == 0) { /* Return fid & attr to client. */ if (ma->ma_valid & MA_INODE) @@ -391,7 +394,7 @@ static int mdt_md_mkobj(struct mdt_thread_info *info) * recovery, just get attr in that case. */ if (mdt_object_exists(o) == 1) { - rc = mo_attr_get(info->mti_env, next, ma); + rc = mdt_attr_get_complex(info, o, ma); } else { /* * Here, NO permission check for object_create, @@ -476,7 +479,6 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, struct mdt_export_data *med = &req->rq_export->exp_mdt_data; struct mdt_file_data *mfd; struct mdt_object *mo; - struct md_object *next; struct mdt_body *repbody; int som_au, rc, rc2; ENTRY; @@ -571,8 +573,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, ma->ma_need = MA_INODE; ma->ma_valid = 0; - next = mdt_object_child(mo); - rc = mo_attr_get(info->mti_env, next, ma); + rc = mdt_attr_get_complex(info, mo, ma); if (rc != 0) GOTO(out_put, rc); @@ -767,6 +768,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA); rc = mdo_unlink(info->mti_env, mdt_object_child(mp), mdt_object_child(mc), lname, ma); + if (rc == 0 && !lu_object_is_dying(&mc->mot_header)) + rc = mdt_attr_get_complex(info, mc, ma); if (rc == 0) mdt_handle_last_unlink(info, mc, ma);