From: Mikhail Pershin Date: Thu, 29 Mar 2012 13:07:42 +0000 (+0800) Subject: LU-80 mds: use md_size supplied by client, repack reply X-Git-Tag: 2.2.52~37 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=4fd92d576124fd7772c9f718b83eb67f500e5cec LU-80 mds: use md_size supplied by client, repack reply - mdt uses only the client's easize to pack the reply buffer, so the reply has the same buffer sizes as the client has. - introduce reply growing: when packing the reply the proper MD size might be unknown, so the client's data is used; but after request processing the proper size may be bigger than the client expects, and the reply buffer must be re-packed. - if the server data doesn't fit in the buffer, then a bigger buffer is allocated instead of using the reply message buffer. - rename mdt_shrink_reply to mdt_fix_reply. It shrinks and/or grows the reply when needed. Upon growing, the bigger MD is copied into the new reply. - the server always unpacks the EA size into mdt_reint_record for all operations, so it is always available in rr_eadatalen. 
Port from: ORI-80 Author: Mikhail Pershin Signed-off-by: Mikhail Pershin Signed-off-by: Yu Jian Change-Id: Ieae6a1d4d07fdf7643ca6900d02d0dd962a07f6c Reviewed-on: http://review.whamcloud.com/1808 Tested-by: Hudson Tested-by: Maloo Reviewed-by: Oleg Drokin --- diff --git a/lustre/include/lustre_req_layout.h b/lustre/include/lustre_req_layout.h index fbde927..cd96f1a 100644 --- a/lustre/include/lustre_req_layout.h +++ b/lustre/include/lustre_req_layout.h @@ -128,7 +128,9 @@ void req_capsule_shrink(struct req_capsule *pill, const struct req_msg_field *field, unsigned int newlen, enum req_location loc); - +int req_capsule_server_grow(struct req_capsule *pill, + const struct req_msg_field *field, + unsigned int newlen); int req_layout_init(void); void req_layout_fini(void); diff --git a/lustre/include/md_object.h b/lustre/include/md_object.h index d4451ae..68fadcf 100644 --- a/lustre/include/md_object.h +++ b/lustre/include/md_object.h @@ -182,6 +182,7 @@ struct md_attr { struct lustre_capa *ma_capa; struct md_som_data *ma_som; int ma_lmm_size; + int ma_big_lmm_used:1; int ma_lmv_size; int ma_acl_size; int ma_cookie_size; diff --git a/lustre/mdc/mdc_lib.c b/lustre/mdc/mdc_lib.c index afbce57..f071439 100644 --- a/lustre/mdc/mdc_lib.c +++ b/lustre/mdc/mdc_lib.c @@ -328,7 +328,7 @@ void mdc_setattr_pack(struct ptlrpc_request *req, struct md_op_data *op_data, struct mdt_rec_setattr *rec; struct mdt_ioepoch *epoch; struct lov_user_md *lum = NULL; - + CLASSERT(sizeof(struct mdt_rec_reint) ==sizeof(struct mdt_rec_setattr)); rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT); mdc_setattr_pack_rec(rec, op_data); diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index 15be75a..099fcc3 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -2096,6 +2096,14 @@ static int mdd_create(const struct lu_env *env, } if (lmm && lmm_size > 0) { /* Set Lov here, do not get lmm again later */ + if (lmm_size > ma->ma_lmm_size) { + /* Reply buffer is smaller, need 
bigger one */ + mdd_max_lmm_buffer(env, lmm_size); + if (unlikely(info->mti_max_lmm == NULL)) + GOTO(cleanup, rc = -ENOMEM); + ma->ma_lmm = info->mti_max_lmm; + ma->ma_big_lmm_used = 1; + } memcpy(ma->ma_lmm, lmm, lmm_size); ma->ma_lmm_size = lmm_size; ma->ma_valid |= MA_LOV; diff --git a/lustre/mdd/mdd_internal.h b/lustre/mdd/mdd_internal.h index 4d312b7..f0294ce 100644 --- a/lustre/mdd/mdd_internal.h +++ b/lustre/mdd/mdd_internal.h @@ -175,6 +175,7 @@ extern const char orph_index_name[]; extern const struct dt_index_features orph_index_features; +struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size); struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env, struct mdd_device *mdd); diff --git a/lustre/mdd/mdd_lov.c b/lustre/mdd/mdd_lov.c index ad6d8da..3fa7531 100644 --- a/lustre/mdd/mdd_lov.c +++ b/lustre/mdd/mdd_lov.c @@ -222,7 +222,8 @@ int mdd_get_md(const struct lu_env *env, struct mdd_object *obj, *md_size = 0; rc = 0; } else if (rc < 0) { - CERROR("Error %d reading eadata - %d\n", rc, *md_size); + CDEBUG(D_OTHER, "Error %d reading eadata - %d\n", + rc, *md_size); } else { /* XXX: Convert lov EA but fixed after verification test. 
*/ *md_size = rc; @@ -463,24 +464,23 @@ int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd, } else { /* get lov ea from parent and set to lov */ struct lov_mds_md *_lmm; - int _lmm_size; + int _lmm_size = mdd_lov_mdsize(env, mdd); LASSERT(parent != NULL); - _lmm_size = mdd_lov_mdsize(env, mdd); _lmm = mdd_max_lmm_get(env, mdd); - if (_lmm == NULL) GOTO(out_oti, rc = -ENOMEM); rc = mdd_get_md_locked(env, parent, _lmm, &_lmm_size, XATTR_NAME_LOV); - if (rc > 0) + if (rc > 0) { + _lmm_size = mdd_lov_mdsize(env, mdd); rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, - lov_exp, *lmm_size, + lov_exp, _lmm_size, &lsm, _lmm); - + } if (rc) GOTO(out_oti, rc); } diff --git a/lustre/mdd/mdd_object.c b/lustre/mdd/mdd_object.c index e4ff0ef..277f896 100644 --- a/lustre/mdd/mdd_object.c +++ b/lustre/mdd/mdd_object.c @@ -208,27 +208,37 @@ struct llog_cookie *mdd_max_cookie_get(const struct lu_env *env, return mti->mti_max_cookie; } -struct lov_mds_md *mdd_max_lmm_get(const struct lu_env *env, - struct mdd_device *mdd) +struct lov_mds_md *mdd_max_lmm_buffer(const struct lu_env *env, int size) { struct mdd_thread_info *mti = mdd_env_info(env); - int max_lmm_size; - max_lmm_size = mdd_lov_mdsize(env, mdd); - if (unlikely(mti->mti_max_lmm_size < max_lmm_size)) { - if (mti->mti_max_lmm) - OBD_FREE_LARGE(mti->mti_max_lmm, mti->mti_max_lmm_size); - mti->mti_max_lmm = NULL; - mti->mti_max_lmm_size = 0; - } - if (unlikely(mti->mti_max_lmm == NULL)) { - OBD_ALLOC_LARGE(mti->mti_max_lmm, max_lmm_size); + if (unlikely(mti->mti_max_lmm_size < size)) { + int rsize = size_roundup_power2(size); + + if (mti->mti_max_lmm_size > 0) { + LASSERT(mti->mti_max_lmm); + OBD_FREE_LARGE(mti->mti_max_lmm, + mti->mti_max_lmm_size); + mti->mti_max_lmm = NULL; + mti->mti_max_lmm_size = 0; + } + + OBD_ALLOC_LARGE(mti->mti_max_lmm, rsize); if (likely(mti->mti_max_lmm != NULL)) - mti->mti_max_lmm_size = max_lmm_size; + mti->mti_max_lmm_size = rsize; } return mti->mti_max_lmm; } +struct lov_mds_md 
*mdd_max_lmm_get(const struct lu_env *env, + struct mdd_device *mdd) +{ + int max_lmm_size; + + max_lmm_size = mdd_lov_mdsize(env, mdd); + return mdd_max_lmm_buffer(env, max_lmm_size); +} + struct lu_object *mdd_object_alloc(const struct lu_env *env, const struct lu_object_header *hdr, struct lu_device *d) @@ -613,6 +623,43 @@ static int is_rootdir(struct mdd_object *mdd_obj) return lu_fid_eq(&mdd_dev->mdd_root_fid, fid); } +int mdd_big_lmm_get(const struct lu_env *env, struct mdd_object *obj, + struct md_attr *ma) +{ + struct mdd_thread_info *info = mdd_env_info(env); + int size; + int rc; + ENTRY; + + LASSERT(info != NULL); + LASSERT(ma->ma_lmm_size > 0); + LASSERT(ma->ma_big_lmm_used == 0); + + rc = mdo_xattr_get(env, obj, &LU_BUF_NULL, XATTR_NAME_LOV, + mdd_object_capa(env, obj)); + if (rc < 0) + RETURN(rc); + + /* big_lmm may need to grow */ + size = rc; + mdd_max_lmm_buffer(env, size); + if (info->mti_max_lmm == NULL) + RETURN(-ENOMEM); + + LASSERT(info->mti_max_lmm_size >= size); + rc = mdd_get_md(env, obj, info->mti_max_lmm, &size, + XATTR_NAME_LOV); + if (rc < 0) + RETURN(rc); + + ma->ma_big_lmm_used = 1; + ma->ma_valid |= MA_LOV; + ma->ma_lmm = info->mti_max_lmm; + ma->ma_lmm_size = size; + LASSERT(size == rc); + RETURN(rc); +} + /* get lov EA only */ static int __mdd_lmm_get(const struct lu_env *env, struct mdd_object *mdd_obj, struct md_attr *ma) @@ -625,8 +672,11 @@ static int __mdd_lmm_get(const struct lu_env *env, rc = mdd_get_md(env, mdd_obj, ma->ma_lmm, &ma->ma_lmm_size, XATTR_NAME_LOV); - if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj)) + if (rc == -ERANGE) + rc = mdd_big_lmm_get(env, mdd_obj, ma); + else if (rc == 0 && (ma->ma_need & MA_LOV_DEF) && is_rootdir(mdd_obj)) rc = mdd_get_default_md(mdd_obj, ma->ma_lmm); + if (rc > 0) { ma->ma_lmm_size = rc; ma->ma_layout_gen = ma->ma_lmm->lmm_layout_gen; @@ -2337,7 +2387,6 @@ int mdd_object_kill(const struct lu_env *env, struct mdd_object *obj, if (S_ISREG(mdd_object_type(obj))) { /* 
Return LOV & COOKIES unconditionally here. We clean evth up. * Caller must be ready for that. */ - rc = __mdd_lmm_get(env, obj, ma); if ((ma->ma_valid & MA_LOV)) rc = mdd_unlink_log(env, mdo2mdd(&obj->mod_obj), diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 55ab803..a1aa523 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -706,8 +706,7 @@ static int mdt_getattr(struct mdt_thread_info *info) struct mdt_body *reqbody; struct mdt_body *repbody; mode_t mode; - int md_size; - int rc; + int rc, rc2; ENTRY; reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY); @@ -725,13 +724,12 @@ static int mdt_getattr(struct mdt_thread_info *info) LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu)); mode = lu_object_attr(&obj->mot_obj.mo_lu); - if (S_ISLNK(mode) && (reqbody->valid & OBD_MD_LINKNAME) && - (reqbody->eadatasize > info->mti_mdt->mdt_max_mdsize)) - md_size = reqbody->eadatasize; - else - md_size = info->mti_mdt->mdt_max_mdsize; - req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, md_size); + /* old clients may not report needed easize, use max value then */ + req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, + reqbody->eadatasize == 0 ? 
+ info->mti_mdt->mdt_max_mdsize : + reqbody->eadatasize); rc = req_capsule_server_pack(pill); if (unlikely(rc != 0)) @@ -766,7 +764,9 @@ out_shrink: mdt_counter_incr(req->rq_export, LPROC_MDT_GETATTR); mdt_client_compatibility(info); - mdt_shrink_reply(info); + rc2 = mdt_fix_reply(info); + if (rc == 0) + rc = rc2; return rc; } @@ -1124,7 +1124,7 @@ static int mdt_getattr_name(struct mdt_thread_info *info) struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD]; struct mdt_body *reqbody; struct mdt_body *repbody; - int rc; + int rc, rc2; ENTRY; reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); @@ -1150,7 +1150,9 @@ static int mdt_getattr_name(struct mdt_thread_info *info) EXIT; out_shrink: mdt_client_compatibility(info); - mdt_shrink_reply(info); + rc2 = mdt_fix_reply(info); + if (rc == 0) + rc = rc2; return rc; } @@ -1538,19 +1540,26 @@ static int mdt_reint_internal(struct mdt_thread_info *info, __u32 op) { struct req_capsule *pill = info->mti_pill; - struct mdt_device *mdt = info->mti_mdt; struct md_quota *mq = md_quota(info->mti_env); struct mdt_body *repbody; - int rc = 0; + int rc = 0, rc2; ENTRY; - /* pack reply */ + + rc = mdt_reint_unpack(info, op); + if (rc != 0) { + CERROR("Can't unpack reint, rc %d\n", rc); + RETURN(err_serious(rc)); + } + + /* for replay (no_create) lmm is not needed, client has it already */ if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, - mdt->mdt_max_mdsize); + info->mti_rr.rr_eadatalen); + if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, - mdt->mdt_max_cookiesize); + info->mti_mdt->mdt_max_cookiesize); rc = req_capsule_server_pack(pill); if (rc != 0) { @@ -1565,27 +1574,13 @@ static int mdt_reint_internal(struct mdt_thread_info *info, repbody->aclsize = 0; } - if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) - GOTO(out_shrink, rc = err_serious(-EFAULT)); - - rc = mdt_reint_unpack(info, 
op); - if (rc != 0) { - CERROR("Can't unpack reint, rc %d\n", rc); - GOTO(out_shrink, rc = err_serious(rc)); - } - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10); /* for replay no cookkie / lmm need, because client have this already */ - if (info->mti_spec.no_create == 1) { + if (info->mti_spec.no_create) if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0); - if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) - req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, - 0); - } - rc = mdt_init_ucred_reint(info); if (rc) GOTO(out_shrink, rc); @@ -1605,7 +1600,9 @@ out_ucred: mdt_exit_ucred(info); out_shrink: mdt_client_compatibility(info); - mdt_shrink_reply(info); + rc2 = mdt_fix_reply(info); + if (rc == 0) + rc = rc2; return rc; } @@ -2509,16 +2506,13 @@ static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags) rc = 0; if (rc == 0 && (flags & HABEO_REFERO)) { - struct mdt_device *mdt = info->mti_mdt; - /* Pack reply. 
*/ - if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, - mdt->mdt_max_mdsize); + info->mti_body->eadatasize); if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, - mdt->mdt_max_cookiesize); + info->mti_mdt->mdt_max_cookiesize); rc = req_capsule_server_pack(pill); } @@ -3321,7 +3315,7 @@ static int mdt_intent_getattr(enum mdt_it_code opcode, struct ptlrpc_request *req; struct mdt_body *reqbody; struct mdt_body *repbody; - int rc; + int rc, rc2; ENTRY; reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY); @@ -3385,7 +3379,9 @@ out_ucred: mdt_exit_ucred(info); out_shrink: mdt_client_compatibility(info); - mdt_shrink_reply(info); + rc2 = mdt_fix_reply(info); + if (rc == 0) + rc = rc2; return rc; } @@ -4504,8 +4500,9 @@ static int mdt_init0(const struct lu_env *env, struct mdt_device *m, obd = class_name2obd(dev); LASSERT(obd != NULL); - m->mdt_max_mdsize = MAX_MD_SIZE; + m->mdt_max_mdsize = MAX_MD_SIZE; /* 4 stripes */ m->mdt_max_cookiesize = sizeof(struct llog_cookie); + m->mdt_som_conf = 0; m->mdt_opts.mo_cos = MDT_COS_DEFAULT; diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index 8913d67..9059459 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -397,7 +397,6 @@ struct mdt_thread_info { /* Ops object filename */ struct lu_name mti_name; - struct md_attr mti_tmp_attr; }; typedef void (*mdt_cb_t)(const struct mdt_device *mdt, __u64 transno, @@ -605,7 +604,7 @@ int mdt_close(struct mdt_thread_info *info); int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, struct md_attr *ma, int flags); int mdt_done_writing(struct mdt_thread_info *info); -void mdt_shrink_reply(struct mdt_thread_info *info); +int mdt_fix_reply(struct mdt_thread_info *info); int mdt_handle_last_unlink(struct mdt_thread_info *, struct mdt_object *, const struct md_attr *); void mdt_reconstruct_open(struct 
mdt_thread_info *, struct mdt_lock_handle *); diff --git a/lustre/mdt/mdt_lib.c b/lustre/mdt/mdt_lib.c index 2319add..bbfc4f1 100644 --- a/lustre/mdt/mdt_lib.c +++ b/lustre/mdt/mdt_lib.c @@ -550,12 +550,14 @@ void mdt_dump_lmm(int level, const struct lov_mds_md *lmm) le64_to_cpu(lod->l_object_id)); } -void mdt_shrink_reply(struct mdt_thread_info *info) +/* Shrink and/or grow reply buffers */ +int mdt_fix_reply(struct mdt_thread_info *info) { struct req_capsule *pill = info->mti_pill; struct mdt_body *body; - int md_size; + int md_size, md_packed = 0; int acl_size; + int rc = 0; ENTRY; body = req_capsule_server_get(pill, &RMF_MDT_BODY); @@ -588,9 +590,24 @@ void mdt_shrink_reply(struct mdt_thread_info *info) (optional) something else */ - if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) - req_capsule_shrink(pill, &RMF_MDT_MD, md_size, - RCL_SERVER); + /* MDT_MD buffer may be bigger than packed value, let's shrink all + * buffers before growing it */ + if (info->mti_attr.ma_big_lmm_used) { + LASSERT(req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)); + md_packed = req_capsule_get_size(pill, &RMF_MDT_MD, + RCL_SERVER); + LASSERT(md_packed > 0); + /* buffer must be allocated separately */ + LASSERT(info->mti_attr.ma_lmm != + req_capsule_server_get(pill, &RMF_MDT_MD)); + req_capsule_shrink(pill, &RMF_MDT_MD, 0, RCL_SERVER); + /* free big lmm if md_size is not needed */ + if (md_size == 0) + info->mti_attr.ma_big_lmm_used = 0; + } else if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) { + req_capsule_shrink(pill, &RMF_MDT_MD, md_size, RCL_SERVER); + } + if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER)) req_capsule_shrink(pill, &RMF_ACL, acl_size, RCL_SERVER); else if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) @@ -609,7 +626,38 @@ void mdt_shrink_reply(struct mdt_thread_info *info) * Some more field should be shrinked if needed. * This should be done by those who added fields to reply message. 
*/ - EXIT; + + /* Grow MD buffer if needed finally */ + if (info->mti_attr.ma_big_lmm_used) { + void *lmm; + + LASSERT(md_size > md_packed); + CDEBUG(D_INFO, "Enlarge reply buffer, need extra %d bytes\n", + md_size - md_packed); + rc = req_capsule_server_grow(pill, &RMF_MDT_MD, md_size); + if (rc) { + /* we can't answer with proper LOV EA, drop flags, + * the rc is also returned so this request is + * considered as failed */ + body->valid &= ~(OBD_MD_FLDIREA | OBD_MD_FLEASIZE); + /* don't return transno along with error */ + lustre_msg_set_transno(pill->rc_req->rq_repmsg, 0); + } else { + /* now we need to pack right LOV EA */ + lmm = req_capsule_server_get(pill, &RMF_MDT_MD); + LASSERT(req_capsule_get_size(pill, &RMF_MDT_MD, + RCL_SERVER) == + info->mti_attr.ma_lmm_size); + memcpy(lmm, info->mti_attr.ma_lmm, + info->mti_attr.ma_lmm_size); + } + /* update mdt_max_mdsize so clients will be aware about that */ + if (info->mti_mdt->mdt_max_mdsize < info->mti_attr.ma_lmm_size) + info->mti_mdt->mdt_max_mdsize = + info->mti_attr.ma_lmm_size; + info->mti_attr.ma_big_lmm_used = 0; + } + RETURN(rc); } @@ -837,6 +885,7 @@ static inline int mdt_dlmreq_unpack(struct mdt_thread_info *info) { static int mdt_setattr_unpack(struct mdt_thread_info *info) { + struct mdt_reint_record *rr = &info->mti_rr; struct md_attr *ma = &info->mti_attr; struct req_capsule *pill = info->mti_pill; int rc; @@ -849,10 +898,15 @@ static int mdt_setattr_unpack(struct mdt_thread_info *info) /* Epoch may be absent */ mdt_ioepoch_unpack(info); - ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_EADATA, RCL_CLIENT); - if (ma->ma_lmm_size) { - ma->ma_lmm = req_capsule_client_get(pill, &RMF_EADATA); - ma->ma_valid |= MA_LOV; + if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) { + rr->rr_eadata = req_capsule_client_get(pill, &RMF_EADATA); + rr->rr_eadatalen = req_capsule_get_size(pill, &RMF_EADATA, + RCL_CLIENT); + ma->ma_lmm_size = rr->rr_eadatalen; + if (ma->ma_lmm_size > 0) { + ma->ma_lmm = 
(void *)rr->rr_eadata; + ma->ma_valid |= MA_LOV; + } } ma->ma_cookie_size = req_capsule_get_size(pill, &RMF_LOGCOOKIES, @@ -938,9 +992,11 @@ static int mdt_create_unpack(struct mdt_thread_info *info) req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_RMT_ACL); LASSERT(req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)); - sp->u.sp_ea.eadata = req_capsule_client_get(pill, &RMF_EADATA); - sp->u.sp_ea.eadatalen = req_capsule_get_size(pill, &RMF_EADATA, - RCL_CLIENT); + rr->rr_eadata = req_capsule_client_get(pill, &RMF_EADATA); + rr->rr_eadatalen = req_capsule_get_size(pill, &RMF_EADATA, + RCL_CLIENT); + sp->u.sp_ea.eadata = rr->rr_eadata; + sp->u.sp_ea.eadatalen = rr->rr_eadatalen; sp->u.sp_ea.fid = rr->rr_fid1; RETURN(0); } @@ -955,11 +1011,13 @@ static int mdt_create_unpack(struct mdt_thread_info *info) req_capsule_extend(pill, &RQF_MDS_REINT_CREATE_SLAVE); LASSERT(req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)); - sp->u.sp_ea.eadata = req_capsule_client_get(pill, - &RMF_EADATA); - sp->u.sp_ea.eadatalen = req_capsule_get_size(pill, - &RMF_EADATA, - RCL_CLIENT); + rr->rr_eadata = req_capsule_client_get(pill, + &RMF_EADATA); + rr->rr_eadatalen = req_capsule_get_size(pill, + &RMF_EADATA, + RCL_CLIENT); + sp->u.sp_ea.eadata = rr->rr_eadata; + sp->u.sp_ea.eadatalen = rr->rr_eadatalen; sp->u.sp_ea.fid = rr->rr_fid1; RETURN(0); } @@ -1082,6 +1140,8 @@ static int mdt_unlink_unpack(struct mdt_thread_info *info) ma->ma_attr_flags &= ~MDS_VTX_BYPASS; info->mti_spec.no_create = !!req_is_replay(mdt_info_req(info)); + /* last unlink need LOV EA sent back */ + rr->rr_eadatalen = info->mti_mdt->mdt_max_mdsize; rc = mdt_dlmreq_unpack(info); RETURN(rc); @@ -1142,6 +1202,8 @@ static int mdt_rename_unpack(struct mdt_thread_info *info) ma->ma_attr_flags &= ~MDS_VTX_BYPASS; info->mti_spec.no_create = !!req_is_replay(mdt_info_req(info)); + /* rename may contain unlink so we might need LOV EA sent back */ + rr->rr_eadatalen = info->mti_mdt->mdt_max_mdsize; rc = 
mdt_dlmreq_unpack(info); RETURN(rc); @@ -1214,12 +1276,25 @@ static int mdt_open_unpack(struct mdt_thread_info *info) RETURN(-EFAULT); rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1; - sp->u.sp_ea.eadatalen = req_capsule_get_size(pill, &RMF_EADATA, - RCL_CLIENT); - if (sp->u.sp_ea.eadatalen) { - sp->u.sp_ea.eadata = req_capsule_client_get(pill, &RMF_EADATA); - sp->no_create = !!req_is_replay(req); - } + if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) { + rr->rr_eadatalen = req_capsule_get_size(pill, &RMF_EADATA, + RCL_CLIENT); + if (rr->rr_eadatalen > 0) { + rr->rr_eadata = req_capsule_client_get(pill, + &RMF_EADATA); + sp->u.sp_ea.eadatalen = rr->rr_eadatalen; + sp->u.sp_ea.eadata = rr->rr_eadata; + sp->no_create = !!req_is_replay(req); + } + + /* + * Client default md_size may be 0 right after client start, + * until all osc are connected, set here just some reasonable + * value to prevent misbehavior. + */ + if (rr->rr_eadatalen == 0 && + !(info->mti_spec.sp_cr_flags & MDS_OPEN_DELAY_CREATE)) + rr->rr_eadatalen = MIN_MD_SIZE; } RETURN(0); } @@ -1266,11 +1341,20 @@ static int mdt_setxattr_unpack(struct mdt_thread_info *info) rr->rr_namelen = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT) - 1; LASSERT(rr->rr_namelen > 0); - rr->rr_eadatalen = req_capsule_get_size(pill, &RMF_EADATA, RCL_CLIENT); - if (rr->rr_eadatalen > 0) { - rr->rr_eadata = req_capsule_client_get(pill, &RMF_EADATA); - if (rr->rr_eadata == NULL) - RETURN(-EFAULT); + if (req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) { + rr->rr_eadatalen = req_capsule_get_size(pill, &RMF_EADATA, + RCL_CLIENT); + if (rr->rr_eadatalen > 0) { + rr->rr_eadata = req_capsule_client_get(pill, + &RMF_EADATA); + if (rr->rr_eadata == NULL) + RETURN(-EFAULT); + } else { + rr->rr_eadata = NULL; + } + } else if (!(attr->la_valid & OBD_MD_FLXATTRRM)) { + CDEBUG(D_INFO, "no xattr data supplied\n"); + RETURN(-EFAULT); } RETURN(0); diff --git a/lustre/mdt/mdt_open.c 
b/lustre/mdt/mdt_open.c index 4301a98..e1f5eb3 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -1280,7 +1280,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) if (result != -ENOENT) { if (req->rq_export->exp_libclient && - create_flags&MDS_OPEN_HAS_EA) + create_flags & MDS_OPEN_HAS_EA) GOTO(out, result = 0); GOTO(out, result); } @@ -1623,7 +1623,7 @@ int mdt_close(struct mdt_thread_info *info) if (mdt_check_resent(info, mdt_reconstruct_generic, NULL)) { mdt_client_compatibility(info); if (rc == 0) - mdt_shrink_reply(info); + mdt_fix_reply(info); RETURN(lustre_msg_get_status(req->rq_repmsg)); } @@ -1674,7 +1674,7 @@ int mdt_close(struct mdt_thread_info *info) } if (repbody != NULL) { mdt_client_compatibility(info); - mdt_shrink_reply(info); + rc = mdt_fix_reply(info); } if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 032c0f9..9375140 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -480,7 +480,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, struct mdt_object *mo; struct md_object *next; struct mdt_body *repbody; - int som_au, rc; + int som_au, rc, rc2; ENTRY; DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1), @@ -600,7 +600,9 @@ out: mdt_counter_incr(req->rq_export, LPROC_MDT_SETATTR); mdt_client_compatibility(info); - mdt_shrink_reply(info); + rc2 = mdt_fix_reply(info); + if (rc == 0) + rc = rc2; return rc; } diff --git a/lustre/mdt/mdt_xattr.c b/lustre/mdt/mdt_xattr.c index df4fb54..0c2a965 100644 --- a/lustre/mdt/mdt_xattr.c +++ b/lustre/mdt/mdt_xattr.c @@ -90,13 +90,11 @@ static int mdt_getxattr_pack_reply(struct mdt_thread_info * info) RETURN(-EINVAL); } - if (size < 0) { - if (size == -ENODATA) - size = 0; - else if (size != -EOPNOTSUPP) { - CDEBUG(D_INFO, "Error geting EA size: %d\n", size); - RETURN(size); - } + if (size == -ENODATA) { + size = 0; + } else if (size < 0) { + CERROR("Error geting EA 
size: %d\n", size); + RETURN(size); } if (info->mti_body->eadatasize != 0 && @@ -104,8 +102,7 @@ static int mdt_getxattr_pack_reply(struct mdt_thread_info * info) RETURN(-ERANGE); req_capsule_set_size(pill, &RMF_EADATA, RCL_SERVER, - min_t(int, size, info->mti_body->eadatasize)); - + info->mti_body->eadatasize == 0 ? 0 : size); rc = req_capsule_server_pack(pill); if (rc) { LASSERT(rc < 0); @@ -170,6 +167,7 @@ int mdt_getxattr(struct mdt_thread_info *info) if (easize == 0 || reqbody->eadatasize == 0) GOTO(out, rc = easize); + buf = &info->mti_buf; buf->lb_buf = req_capsule_server_get(info->mti_pill, &RMF_EADATA); buf->lb_len = easize; @@ -278,7 +276,6 @@ int mdt_reint_setxattr(struct mdt_thread_info *info, struct ptlrpc_request *req = mdt_info_req(info); struct md_ucred *uc = mdt_ucred(info); struct mdt_lock_handle *lh; - struct req_capsule *pill = info->mti_pill; const struct lu_env *env = info->mti_env; struct lu_buf *buf = &info->mti_buf; struct mdt_reint_record *rr = &info->mti_rr; @@ -287,8 +284,8 @@ int mdt_reint_setxattr(struct mdt_thread_info *info, struct mdt_object *obj; struct md_object *child; __u64 valid = attr->la_valid; - const char *xattr_name; - int xattr_len = 0; + const char *xattr_name = rr->rr_name; + int xattr_len = rr->rr_eadatalen; __u64 lockpart; int rc; posix_acl_xattr_header *new_xattr = NULL; @@ -301,8 +298,6 @@ int mdt_reint_setxattr(struct mdt_thread_info *info, if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SETXATTR)) RETURN(err_serious(-ENOMEM)); - xattr_name = rr->rr_name; - CDEBUG(D_INODE, "%s xattr %s\n", valid & OBD_MD_FLXATTR ? 
"set" : "remove", xattr_name); @@ -320,11 +315,6 @@ int mdt_reint_setxattr(struct mdt_thread_info *info, GOTO(out, rc = err_serious(-EPERM)); } - /* various sanity check for xattr name */ - xattr_name = req_capsule_client_get(pill, &RMF_NAME); - if (!xattr_name) - GOTO(out, rc = err_serious(-EFAULT)); - if (strncmp(xattr_name, XATTR_USER_PREFIX, sizeof(XATTR_USER_PREFIX) - 1) == 0) { if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_XATTR)) @@ -341,8 +331,6 @@ int mdt_reint_setxattr(struct mdt_thread_info *info, strncmp(xattr_name, XATTR_NAME_ACL_DEFAULT, sizeof(XATTR_NAME_ACL_DEFAULT) - 1) == 0)) { /* currently lustre limit acl access size */ - xattr_len = req_capsule_get_size(pill, &RMF_EADATA, RCL_CLIENT); - if (xattr_len > LUSTRE_POSIX_ACL_MAX_SIZE) GOTO(out, -ERANGE); } @@ -382,19 +370,11 @@ int mdt_reint_setxattr(struct mdt_thread_info *info, attr->la_valid = LA_CTIME; child = mdt_object_child(obj); if (valid & OBD_MD_FLXATTR) { - char * xattr; - - if (!req_capsule_field_present(pill, &RMF_EADATA, RCL_CLIENT)) { - CDEBUG(D_INFO, "no xattr data supplied\n"); - GOTO(out_unlock, rc = -EFAULT); - } + char *xattr = (void *)rr->rr_eadata; - xattr_len = req_capsule_get_size(pill, &RMF_EADATA, RCL_CLIENT); - if (xattr_len) { + if (xattr_len > 0) { int flags = 0; - xattr = req_capsule_client_get(pill, &RMF_EADATA); - if (valid & OBD_MD_FLRMTLSETFACL) { if (unlikely(!remote)) GOTO(out_unlock, rc = -EINVAL); diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index 393fa7e..a23601f 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -2087,5 +2087,67 @@ void req_capsule_shrink(struct req_capsule *pill, } EXPORT_SYMBOL(req_capsule_shrink); +int req_capsule_server_grow(struct req_capsule *pill, + const struct req_msg_field *field, + unsigned int newlen) +{ + struct ptlrpc_reply_state *rs = pill->rc_req->rq_reply_state, *nrs; + char *from, *to; + int offset, len, rc; + + LASSERT(pill->rc_fmt != NULL); + 
LASSERT(__req_format_is_sane(pill->rc_fmt)); + LASSERT(req_capsule_has_field(pill, field, RCL_SERVER)); + LASSERT(req_capsule_field_present(pill, field, RCL_SERVER)); + + len = req_capsule_get_size(pill, field, RCL_SERVER); + offset = __req_capsule_offset(pill, field, RCL_SERVER); + if (pill->rc_req->rq_repbuf_len >= + lustre_packed_msg_size(pill->rc_req->rq_repmsg) - len + newlen) + CERROR("Inplace repack might be done\n"); + + pill->rc_req->rq_reply_state = NULL; + req_capsule_set_size(pill, field, RCL_SERVER, newlen); + rc = req_capsule_server_pack(pill); + if (rc) { + /* put old rs back, the caller will decide what to do */ + pill->rc_req->rq_reply_state = rs; + return rc; + } + nrs = pill->rc_req->rq_reply_state; + /* Now we need only buffers, copy first chunk */ + to = lustre_msg_buf(nrs->rs_msg, 0, 0); + from = lustre_msg_buf(rs->rs_msg, 0, 0); + len = (char *)lustre_msg_buf(rs->rs_msg, offset, 0) - from; + memcpy(to, from, len); + /* check if we have tail and copy it too */ + if (rs->rs_msg->lm_bufcount > offset + 1) { + to = lustre_msg_buf(nrs->rs_msg, offset + 1, 0); + from = lustre_msg_buf(rs->rs_msg, offset + 1, 0); + offset = rs->rs_msg->lm_bufcount - 1; + len = (char *)lustre_msg_buf(rs->rs_msg, offset, 0) + + cfs_size_round(rs->rs_msg->lm_buflens[offset]) - from; + memcpy(to, from, len); + } + /* drop old reply if everything is fine */ + if (rs->rs_difficult) { + /* copy rs data */ + int i; + + nrs->rs_difficult = 1; + nrs->rs_no_ack = rs->rs_no_ack; + for (i = 0; i < rs->rs_nlocks; i++) { + nrs->rs_locks[i] = rs->rs_locks[i]; + nrs->rs_modes[i] = rs->rs_modes[i]; + nrs->rs_nlocks++; + } + rs->rs_nlocks = 0; + rs->rs_difficult = 0; + rs->rs_no_ack = 0; + } + ptlrpc_rs_decref(rs); + return 0; +} +EXPORT_SYMBOL(req_capsule_server_grow); /* __REQ_LAYOUT_USER__ */ #endif