From 12d6356a48de70922975e38451059211c753252e Mon Sep 17 00:00:00 2001 From: Alex Zhuravlev Date: Tue, 20 Oct 2015 16:53:18 +0300 Subject: [PATCH] LU-7318 out: dynamic reply size every update on the initiator side can declare how many bytes it expects back. OUT packing library put these numbers on the wire and prepary an appropriate buffer for the reply. then OUT target do few checks to ensure individual replies fit their buffers. Change-Id: I443b5c879bc321c33efb70af665ecd2b2f7baa18 Signed-off-by: Alex Zhuravlev Reviewed-on: http://review.whamcloud.com/16889 Tested-by: Jenkins Reviewed-by: Fan Yong Reviewed-by: James Simmons Tested-by: Maloo Reviewed-by: Oleg Drokin --- lustre/include/lustre/lustre_idl.h | 4 +- lustre/include/lustre_update.h | 10 ++-- lustre/osp/osp_internal.h | 10 +++- lustre/osp/osp_object.c | 10 ++-- lustre/osp/osp_trans.c | 67 +++++++++++++++++----- lustre/ptlrpc/pack_generic.c | 4 +- lustre/ptlrpc/wiretest.c | 16 +++--- lustre/target/out_handler.c | 110 ++++++++++++++++++++++--------------- lustre/target/out_lib.c | 45 +++++++++------ lustre/utils/wirecheck.c | 4 +- lustre/utils/wiretest.c | 16 +++--- 11 files changed, 190 insertions(+), 106 deletions(-) diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 896ddb4..5106b37 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -3952,7 +3952,7 @@ object_update_param_size(const struct object_update_param *param) struct object_update { __u16 ou_type; /* enum update_type */ __u16 ou_params_count; /* update parameters count */ - __u32 ou_master_index; /* master MDT/OST index */ + __u32 ou_result_size; /* how many bytes can return */ __u32 ou_flags; /* enum update_flag */ __u32 ou_padding1; /* padding 1 */ __u64 ou_batchid; /* op transno on master */ @@ -3978,7 +3978,7 @@ struct out_update_header { __u32 ouh_magic; __u32 ouh_count; __u32 ouh_inline_length; - __u32 ouh_padding; + __u32 ouh_reply_size; __u32 ouh_inline_data[0]; }; diff --git a/lustre/include/lustre_update.h b/lustre/include/lustre_update.h index 0c69a70..1ec8b4d 100644 --- a/lustre/include/lustre_update.h +++ b/lustre/include/lustre_update.h @@ -227,7 +227,7 @@ object_update_result_insert(struct object_update_reply *reply, LASSERT(update_result != NULL); update_result->our_rc = ptlrpc_status_hton(rc); - if (data_len > 0) { + if (data != NULL && data_len > 0) { LASSERT(data != NULL); ptr = (char *)update_result + cfs_size_round(sizeof(struct object_update_reply)); @@ -261,7 +261,7 @@ object_update_result_data_get(const struct object_update_reply *reply, lbuf->lb_buf = update_result->our_data; lbuf->lb_len = update_result->our_datalen; - return 0; + return result; } /** @@ -390,7 +390,8 @@ struct thandle_exec_args { int out_update_pack(const struct lu_env *env, struct object_update *update, size_t *max_update_size, enum update_type op, const struct lu_fid *fid, unsigned int params_count, - __u16 *param_sizes, const void **param_bufs); + __u16 *param_sizes, const void **param_bufs, + __u32 reply_size); int out_create_pack(const struct lu_env *env, struct object_update *update, size_t *max_update_size, const struct lu_fid *fid, const struct lu_attr *attr, struct dt_allocation_hint *hint, @@ -435,7 +436,8 @@ int out_index_lookup_pack(const struct lu_env *env, const struct dt_key *key); int out_xattr_get_pack(const struct lu_env *env, struct object_update *update, size_t *max_update_size, - const struct lu_fid *fid, const char *name); + const struct lu_fid *fid, const char *name, + const int bufsize); int out_read_pack(const struct lu_env *env, struct object_update *update, size_t *max_update_length, const struct lu_fid *fid, size_t size, loff_t pos); diff --git a/lustre/osp/osp_internal.h b/lustre/osp/osp_internal.h index 26ecc45..1352d64 100644 --- a/lustre/osp/osp_internal.h +++ b/lustre/osp/osp_internal.h @@ -111,6 +111,8 @@ struct osp_update_request { /* List of osp_update_request_sub */ struct list_head our_req_list; + int our_req_nr; + int our_update_nr; struct list_head our_cb_items; @@ -118,6 +120,7 @@ struct osp_update_request { struct osp_thandle *our_th; /* linked to the list(ou_list) in osp_updates */ struct list_head our_list; + __u32 our_batchid; __u32 our_req_sent:1; }; @@ -630,8 +633,11 @@ int osp_object_update_request_create(struct osp_update_request *our, } else { \ if (ret == 0) { \ ours->ours_req->ourq_count++; \ + (our)->our_update_nr++; \ + object_update->ou_batchid = \ + (our)->our_batchid;\ object_update->ou_flags |= \ - update->our_flags; \ + (our)->our_flags; \ } \ break; \ } \ @@ -662,7 +668,7 @@ extern struct llog_operations osp_mds_ost_orig_logops; /* osp_trans.c */ int osp_insert_async_request(const struct lu_env *env, enum update_type op, struct osp_object *obj, int count, __u16 *lens, - const void **bufs, void *data, + const void **bufs, void *data, __u32 repsize, osp_update_interpreter_t interpreter); int osp_unplug_async_request(const struct lu_env *env, diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c index 8571ce3..033044d 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -512,6 +512,7 @@ static int osp_declare_attr_get(const struct lu_env *env, struct dt_object *dt) mutex_lock(&osp->opd_async_requests_mutex); rc = osp_insert_async_request(env, OUT_ATTR_GET, obj, 0, NULL, NULL, &obj->opo_ooa->ooa_attr, + sizeof(struct obdo), osp_attr_get_interpterer); mutex_unlock(&osp->opd_async_requests_mutex); @@ -851,7 +852,8 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt, mutex_lock(&osp->opd_async_requests_mutex); rc = osp_insert_async_request(env, OUT_XATTR_GET, obj, 1, - &namelen, (const void **)&name, oxe, + &namelen, (const void **)&name, + oxe, buf->lb_len, osp_xattr_get_interpterer); if (rc != 0) { mutex_unlock(&osp->opd_async_requests_mutex); @@ -969,7 +971,7 @@ unlock: GOTO(out, rc = PTR_ERR(update)); rc = osp_update_rpc_pack(env, xattr_get, update, OUT_XATTR_GET, - lu_object_fid(&dt->do_lu), name); + lu_object_fid(&dt->do_lu), name, buf->lb_len); if (rc != 0) { CERROR("%s: Insert update error "DFID": rc = %d\n", dname, PFID(lu_object_fid(&dt->do_lu)), rc); @@ -977,7 +979,7 @@ unlock: } rc = osp_remote_sync(env, osp, update, &req); - if (rc != 0) { + if (rc < 0) { if (rc == -ENOENT) { dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS; obj->opo_non_exist = 1; @@ -1025,7 +1027,7 @@ unlock: GOTO(out, rc); if (buf->lb_buf == NULL) - GOTO(out, rc = rbuf->lb_len); + GOTO(out, rc); if (unlikely(buf->lb_len < rbuf->lb_len)) GOTO(out, rc = -ERANGE); diff --git a/lustre/osp/osp_trans.c b/lustre/osp/osp_trans.c index d49bf7e..b055d33 100644 --- a/lustre/osp/osp_trans.c +++ b/lustre/osp/osp_trans.c @@ -143,6 +143,7 @@ int osp_object_update_request_create(struct osp_update_request *our, ours->ours_req_size = size; INIT_LIST_HEAD(&ours->ours_list); list_add_tail(&ours->ours_list, &our->our_req_list); + our->our_req_nr++; return 0; } @@ -228,12 +229,13 @@ object_update_request_dump(const struct object_update_request *ourq, update = object_update_request_get(ourq, i, &size); LASSERT(update != NULL); - CDEBUG(mask, "i = %u fid = "DFID" op = %s master = %u" - "params = %d batchid = "LPU64" size = %zu\n", + CDEBUG(mask, "i = %u fid = "DFID" op = %s " + "params = %d batchid = "LPU64" size = %zu repsize %u\n", i, PFID(&update->ou_fid), update_op_str(update->ou_type), - update->ou_master_index, update->ou_params_count, - update->ou_batchid, size); + update->ou_params_count, + update->ou_batchid, size, + (unsigned)update->ou_result_size); total_size += size; } @@ -257,12 +259,17 @@ object_update_request_dump(const struct object_update_request *ourq, */ int osp_prep_inline_update_req(const struct lu_env *env, struct ptlrpc_request *req, - struct osp_update_request_sub *ours) + struct osp_update_request *our, + int repsize) { + struct osp_update_request_sub *ours; struct out_update_header *ouh; - __u32 update_req_size = object_update_request_size(ours->ours_req); + __u32 update_req_size; int rc; + ours = list_entry(our->our_req_list.next, + struct osp_update_request_sub, ours_list); + update_req_size = object_update_request_size(ours->ours_req); req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_HEADER, RCL_CLIENT, update_req_size + sizeof(*ouh)); @@ -274,11 +281,12 @@ int osp_prep_inline_update_req(const struct lu_env *env, ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC; ouh->ouh_count = 1; ouh->ouh_inline_length = update_req_size; + ouh->ouh_reply_size = repsize; memcpy(ouh->ouh_inline_data, ours->ours_req, update_req_size); req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY, - RCL_SERVER, OUT_UPDATE_REPLY_SIZE); + RCL_SERVER, repsize); ptlrpc_request_set_replen(req); req->rq_request_portal = OUT_PORTAL; @@ -308,16 +316,41 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, struct ptlrpc_request *req; struct ptlrpc_bulk_desc *desc; struct osp_update_request_sub *ours; + const struct object_update_request *ourq; struct out_update_header *ouh; struct out_update_buffer *oub; __u32 buf_count = 0; - int rc; + int repsize = 0; + struct object_update_reply *reply; + int rc, i; + int total = 0; ENTRY; list_for_each_entry(ours, &our->our_req_list, ours_list) { object_update_request_dump(ours->ours_req, D_INFO); + + ourq = ours->ours_req; + for (i = 0; i < ourq->ourq_count; i++) { + struct object_update *update; + size_t size = 0; + + + /* XXX: it's very inefficient to lookup update + * this way, iterating from the beginning + * each time */ + update = object_update_request_get(ourq, i, &size); + LASSERT(update != NULL); + + repsize += sizeof(reply->ourp_lens[0]); + repsize += sizeof(struct object_update_result); + repsize += update->ou_result_size; + } + buf_count++; } + repsize += sizeof(*reply); + repsize = (repsize + OUT_UPDATE_REPLY_SIZE - 1) & + ~(OUT_UPDATE_REPLY_SIZE - 1); LASSERT(buf_count > 0); req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE); @@ -332,7 +365,7 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, if (object_update_request_size(ours->ours_req) + sizeof(struct out_update_header) < OUT_UPDATE_MAX_INLINE_SIZE) { - rc = osp_prep_inline_update_req(env, req, ours); + rc = osp_prep_inline_update_req(env, req, our, repsize); if (rc == 0) *reqp = req; GOTO(out_req, rc); @@ -353,6 +386,7 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC; ouh->ouh_count = buf_count; ouh->ouh_inline_length = 0; + ouh->ouh_reply_size = repsize; oub = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_BUF); list_for_each_entry(ours, &our->our_req_list, ours_list) { oub->oub_size = ours->ours_req_size; @@ -368,12 +402,15 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, GOTO(out_req, rc = -ENOMEM); /* NB req now owns desc and will free it when it gets freed */ - list_for_each_entry(ours, &our->our_req_list, ours_list) + list_for_each_entry(ours, &our->our_req_list, ours_list) { desc->bd_frag_ops->add_iov_frag(desc, ours->ours_req, ours->ours_req_size); + total += ours->ours_req_size; + } + CDEBUG(D_OTHER, "total %d in %u\n", total, our->our_update_nr); req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY, - RCL_SERVER, OUT_UPDATE_REPLY_SIZE); + RCL_SERVER, repsize); ptlrpc_request_set_replen(req); req->rq_request_portal = OUT_PORTAL; @@ -715,6 +752,7 @@ int osp_insert_update_callback(const struct lu_env *env, * \param[in] lens buffer length array for the subsequent \a bufs * \param[in] bufs the buffers to compose the request * \param[in] data pointer to the data used by the interpreter + * \param[in] repsize how many bytes the caller allocated for \a data * \param[in] interpreter pointer to the interpreter function * * \retval 0 for success @@ -722,7 +760,8 @@ int osp_insert_update_callback(const struct lu_env *env, */ int osp_insert_async_request(const struct lu_env *env, enum update_type op, struct osp_object *obj, int count, - __u16 *lens, const void **bufs, void *data, + __u16 *lens, const void **bufs, + void *data, __u32 repsize, osp_update_interpreter_t interpreter) { struct osp_device *osp; @@ -748,7 +787,8 @@ again: object_update = update_buffer_get_update(ureq, ureq->ourq_count); rc = out_update_pack(env, object_update, &max_update_size, op, - lu_object_fid(osp2lu_obj(obj)), count, lens, bufs); + lu_object_fid(osp2lu_obj(obj)), count, lens, bufs, + repsize); /* The queue is full. */ if (rc == -E2BIG) { osp->opd_async_requests = NULL; @@ -769,6 +809,7 @@ again: RETURN(rc); ureq->ourq_count++; + our->our_update_nr++; } rc = osp_insert_update_callback(env, our, obj, data, interpreter); diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 362b3dc..02aafad 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -2518,7 +2518,7 @@ void lustre_swab_object_update(struct object_update *ou) __swab16s(&ou->ou_type); __swab16s(&ou->ou_params_count); - __swab32s(&ou->ou_master_index); + __swab32s(&ou->ou_result_size); __swab32s(&ou->ou_flags); __swab32s(&ou->ou_padding1); __swab64s(&ou->ou_batchid); @@ -2579,7 +2579,7 @@ void lustre_swab_out_update_header(struct out_update_header *ouh) __swab32s(&ouh->ouh_magic); __swab32s(&ouh->ouh_count); __swab32s(&ouh->ouh_inline_length); - __swab32s(&ouh->ouh_padding); + __swab32s(&ouh->ouh_reply_size); } EXPORT_SYMBOL(lustre_swab_out_update_header); diff --git a/lustre/ptlrpc/wiretest.c b/lustre/ptlrpc/wiretest.c index aa28a3f..4a5a4c3 100644 --- a/lustre/ptlrpc/wiretest.c +++ b/lustre/ptlrpc/wiretest.c @@ -4563,10 +4563,10 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct object_update, ou_params_count)); LASSERTF((int)sizeof(((struct object_update *)0)->ou_params_count) == 2, "found %lld\n", (long long)(int)sizeof(((struct object_update *)0)->ou_params_count)); - LASSERTF((int)offsetof(struct object_update, ou_master_index) == 4, "found %lld\n", - (long long)(int)offsetof(struct object_update, ou_master_index)); - LASSERTF((int)sizeof(((struct object_update *)0)->ou_master_index) == 4, "found %lld\n", - (long long)(int)sizeof(((struct object_update *)0)->ou_master_index)); + LASSERTF((int)offsetof(struct object_update, ou_result_size) == 4, "found %lld\n", + (long long)(int)offsetof(struct object_update, ou_result_size)); + LASSERTF((int)sizeof(((struct object_update *)0)->ou_result_size) == 4, "found %lld\n", + (long long)(int)sizeof(((struct object_update *)0)->ou_result_size)); LASSERTF((int)offsetof(struct object_update, ou_flags) == 8, "found %lld\n", (long long)(int)offsetof(struct object_update, ou_flags)); LASSERTF((int)sizeof(((struct object_update *)0)->ou_flags) == 4, "found %lld\n", @@ -4663,10 +4663,10 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct out_update_header, ouh_inline_length)); LASSERTF((int)sizeof(((struct out_update_header *)0)->ouh_inline_length) == 4, "found %lld\n", (long long)(int)sizeof(((struct out_update_header *)0)->ouh_inline_length)); - LASSERTF((int)offsetof(struct out_update_header, ouh_padding) == 12, "found %lld\n", - (long long)(int)offsetof(struct out_update_header, ouh_padding)); - LASSERTF((int)sizeof(((struct out_update_header *)0)->ouh_padding) == 4, "found %lld\n", - (long long)(int)sizeof(((struct out_update_header *)0)->ouh_padding)); + LASSERTF((int)offsetof(struct out_update_header, ouh_reply_size) == 12, "found %lld\n", + (long long)(int)offsetof(struct out_update_header, ouh_reply_size)); + LASSERTF((int)sizeof(((struct out_update_header *)0)->ouh_reply_size) == 4, "found %lld\n", + (long long)(int)sizeof(((struct out_update_header *)0)->ouh_reply_size)); LASSERTF((int)offsetof(struct out_update_header, ouh_inline_data) == 16, "found %lld\n", (long long)(int)offsetof(struct out_update_header, ouh_inline_data)); LASSERTF((int)sizeof(((struct out_update_header *)0)->ouh_inline_data) == 0, "found %lld\n", diff --git a/lustre/target/out_handler.c b/lustre/target/out_handler.c index b38e1eb..e725424 100644 --- a/lustre/target/out_handler.c +++ b/lustre/target/out_handler.c @@ -185,6 +185,7 @@ static int out_attr_get(struct tgt_session_info *tsi) { const struct lu_env *env = tsi->tsi_env; struct tgt_thread_info *tti = tgt_th_info(env); + struct object_update *update = tti->tti_u.update.tti_update; struct obdo *obdo = &tti->tti_u.update.tti_obdo; struct lu_attr *la = &tti->tti_attr; struct dt_object *obj = tti->tti_u.update.tti_dt_object; @@ -193,6 +194,9 @@ static int out_attr_get(struct tgt_session_info *tsi) ENTRY; + if (unlikely(update->ou_result_size < sizeof(*obdo))) + return -EPROTO; + if (!lu_object_exists(&obj->do_lu)) { /* Usually, this will be called when the master MDT try * to init a remote object(see osp_object_init), so if @@ -260,19 +264,15 @@ static int out_xattr_get(struct tgt_session_info *tsi) RETURN(err_serious(-EPROTO)); } + lbuf->lb_len = (int)tti->tti_u.update.tti_update->ou_result_size; lbuf->lb_buf = update_result->our_data; - lbuf->lb_len = OUT_UPDATE_REPLY_SIZE - - cfs_size_round((unsigned long)update_result->our_data - - (unsigned long)update_result); + if (lbuf->lb_len == 0) + lbuf->lb_buf = 0; dt_read_lock(env, obj, MOR_TGT_CHILD); rc = dt_xattr_get(env, obj, lbuf, name); dt_read_unlock(env, obj); - if (rc < 0) { + if (rc < 0) lbuf->lb_len = 0; - GOTO(out, rc); - } - lbuf->lb_len = rc; - rc = 0; CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n", tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)), name, (int)lbuf->lb_len); @@ -281,7 +281,7 @@ static int out_xattr_get(struct tgt_session_info *tsi) out: object_update_result_insert(reply, lbuf->lb_buf, lbuf->lb_len, idx, rc); - RETURN(rc); + RETURN(0); } static int out_index_lookup(struct tgt_session_info *tsi) @@ -295,6 +295,9 @@ static int out_index_lookup(struct tgt_session_info *tsi) ENTRY; + if (unlikely(update->ou_result_size < sizeof(tti->tti_fid1))) + return -EPROTO; + if (!lu_object_exists(&obj->do_lu)) RETURN(-ENOENT); @@ -904,7 +907,8 @@ int out_handle(struct tgt_session_info *tsi) unsigned int reply_index = 0; int rc = 0; int rc1 = 0; - int ouh_size; + int ouh_size, reply_size; + int updates; ENTRY; req_capsule_set(pill, &RQF_OUT_UPDATE); @@ -928,15 +932,6 @@ int out_handle(struct tgt_session_info *tsi) if (update_buf_count == 0) RETURN(err_serious(-EPROTO)); - req_capsule_set_size(pill, &RMF_OUT_UPDATE_REPLY, RCL_SERVER, - OUT_UPDATE_REPLY_SIZE); - rc = req_capsule_server_pack(pill); - if (rc != 0) { - CERROR("%s: Can't pack response: rc = %d\n", - tgt_name(tsi->tsi_tgt), rc); - RETURN(rc); - } - OBD_ALLOC(update_bufs, sizeof(*update_bufs) * update_buf_count); if (update_bufs == NULL) RETURN(-ENOMEM); @@ -981,19 +976,13 @@ int out_handle(struct tgt_session_info *tsi) if (rc < 0) GOTO(out_free, rc); } - /* Prepare the update reply buffer */ - reply = req_capsule_server_get(pill, &RMF_OUT_UPDATE_REPLY); - if (reply == NULL) - GOTO(out_free, rc = err_serious(-EPROTO)); - reply->ourp_magic = UPDATE_REPLY_MAGIC; - tti->tti_u.update.tti_update_reply = reply; - tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi)); - /* validate the request and calculate the total update count and * set it to reply */ + reply_size = 0; + updates = 0; for (i = 0; i < update_buf_count; i++) { - struct object_update_request *our; - int update_count; + struct object_update_request *our; + int j; our = update_bufs[i]; if (ptlrpc_req_need_swab(pill->rc_req)) @@ -1006,9 +995,56 @@ int out_handle(struct tgt_session_info *tsi) UPDATE_REQUEST_MAGIC, -EPROTO); GOTO(out_free, rc = -EPROTO); } - update_count = our->ourq_count; - reply->ourp_count += update_count; + updates += our->ourq_count; + + /* need to calculate reply size */ + for (j = 0; j < our->ourq_count; j++) { + update = object_update_request_get(our, j, NULL); + if (update == NULL) + GOTO(out, rc = -EPROTO); + if (ptlrpc_req_need_swab(pill->rc_req)) + lustre_swab_object_update(update); + + if (!fid_is_sane(&update->ou_fid)) { + CERROR("%s: invalid FID "DFID": rc = %d\n", + tgt_name(tsi->tsi_tgt), + PFID(&update->ou_fid), -EPROTO); + GOTO(out, rc = err_serious(-EPROTO)); + } + + /* XXX: what ou_result_size can be considered safe? */ + + reply_size += sizeof(reply->ourp_lens[0]); + reply_size += sizeof(struct object_update_result); + reply_size += update->ou_result_size; + } } + reply_size += sizeof(*reply); + + if (unlikely(reply_size > ouh->ouh_reply_size)) { + CERROR("%s: too small reply buf %u for %u, need %u at least\n", + tgt_name(tsi->tsi_tgt), ouh->ouh_reply_size, + updates, reply_size); + GOTO(out_free, rc = -EPROTO); + } + + req_capsule_set_size(pill, &RMF_OUT_UPDATE_REPLY, RCL_SERVER, + ouh->ouh_reply_size); + rc = req_capsule_server_pack(pill); + if (rc != 0) { + CERROR("%s: Can't pack response: rc = %d\n", + tgt_name(tsi->tsi_tgt), rc); + GOTO(out_free, rc = -EPROTO); + } + + /* Prepare the update reply buffer */ + reply = req_capsule_server_get(pill, &RMF_OUT_UPDATE_REPLY); + if (reply == NULL) + GOTO(out_free, rc = err_serious(-EPROTO)); + reply->ourp_magic = UPDATE_REPLY_MAGIC; + reply->ourp_count = updates; + tti->tti_u.update.tti_update_reply = reply; + tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi)); /* Walk through updates in the request to execute them */ for (i = 0; i < update_buf_count; i++) { @@ -1022,18 +1058,6 @@ int out_handle(struct tgt_session_info *tsi) update_count = our->ourq_count; for (j = 0; j < update_count; j++) { update = object_update_request_get(our, j, NULL); - if (update == NULL) - GOTO(out, rc = -EPROTO); - - if (ptlrpc_req_need_swab(pill->rc_req)) - lustre_swab_object_update(update); - - if (!fid_is_sane(&update->ou_fid)) { - CERROR("%s: invalid FID "DFID": rc = %d\n", - tgt_name(tsi->tsi_tgt), - PFID(&update->ou_fid), -EPROTO); - GOTO(out, rc = err_serious(-EPROTO)); - } dt_obj = dt_locate(env, dt, &update->ou_fid); if (IS_ERR(dt_obj)) diff --git a/lustre/target/out_lib.c b/lustre/target/out_lib.c index 0f3e037..abd7963 100644 --- a/lustre/target/out_lib.c +++ b/lustre/target/out_lib.c @@ -91,12 +91,17 @@ int out_update_header_pack(const struct lu_env *env, size_t *max_update_size, enum update_type update_op, const struct lu_fid *fid, - unsigned int param_count, __u16 *param_sizes) + unsigned int param_count, + __u16 *param_sizes, + __u32 reply_size) { struct object_update_param *param; unsigned int i; size_t update_size; + if (((reply_size + 7) >> 3) >= 1ULL << 16) + return -EINVAL; + /* Check whether the packing exceeding the maxima update length */ update_size = sizeof(*update); for (i = 0; i < param_count; i++) @@ -110,6 +115,7 @@ int out_update_header_pack(const struct lu_env *env, update->ou_fid = *fid; update->ou_type = update_op; update->ou_params_count = param_count; + update->ou_result_size = reply_size; param = &update->ou_params[0]; for (i = 0; i < param_count; i++) { param->oup_len = param_sizes[i]; @@ -138,7 +144,8 @@ int out_update_header_pack(const struct lu_env *env, int out_update_pack(const struct lu_env *env, struct object_update *update, size_t *max_update_size, enum update_type op, const struct lu_fid *fid, unsigned int param_count, - __u16 *param_sizes, const void **param_bufs) + __u16 *param_sizes, const void **param_bufs, + __u32 reply_size) { struct object_update_param *param; unsigned int i; @@ -146,7 +153,7 @@ int out_update_pack(const struct lu_env *env, struct object_update *update, ENTRY; rc = out_update_header_pack(env, update, max_update_size, op, fid, - param_count, param_sizes); + param_count, param_sizes, reply_size); if (rc != 0) RETURN(rc); @@ -195,7 +202,7 @@ int out_create_pack(const struct lu_env *env, struct object_update *update, } rc = out_update_header_pack(env, update, max_update_size, OUT_CREATE, - fid, buf_count, sizes); + fid, buf_count, sizes, 0); if (rc != 0) RETURN(rc); @@ -221,7 +228,7 @@ int out_ref_del_pack(const struct lu_env *env, struct object_update *update, size_t *max_update_size, const struct lu_fid *fid) { return out_update_pack(env, update, max_update_size, OUT_REF_DEL, fid, - 0, NULL, NULL); + 0, NULL, NULL, 0); } EXPORT_SYMBOL(out_ref_del_pack); @@ -229,7 +236,7 @@ int out_ref_add_pack(const struct lu_env *env, struct object_update *update, size_t *max_update_size, const struct lu_fid *fid) { return out_update_pack(env, update, max_update_size, OUT_REF_ADD, fid, - 0, NULL, NULL); + 0, NULL, NULL, 0); } EXPORT_SYMBOL(out_ref_add_pack); @@ -243,7 +250,7 @@ int out_attr_set_pack(const struct lu_env *env, struct object_update *update, ENTRY; rc = out_update_header_pack(env, update, max_update_size, - OUT_ATTR_SET, fid, 1, &size); + OUT_ATTR_SET, fid, 1, &size, 0); if (rc != 0) RETURN(rc); @@ -266,7 +273,7 @@ int out_xattr_set_pack(const struct lu_env *env, struct object_update *update, (char *)&flag}; return out_update_pack(env, update, max_update_size, OUT_XATTR_SET, - fid, ARRAY_SIZE(sizes), sizes, bufs); + fid, ARRAY_SIZE(sizes), sizes, bufs, 0); } EXPORT_SYMBOL(out_xattr_set_pack); @@ -277,7 +284,7 @@ int out_xattr_del_pack(const struct lu_env *env, struct object_update *update, __u16 size = strlen(name) + 1; return out_update_pack(env, update, max_update_size, OUT_XATTR_DEL, - fid, 1, &size, (const void **)&name); + fid, 1, &size, (const void **)&name, 0); } EXPORT_SYMBOL(out_xattr_del_pack); @@ -299,7 +306,7 @@ int out_index_insert_pack(const struct lu_env *env, fid_cpu_to_le(&rec_fid, rec1->rec_fid); return out_update_pack(env, update, max_update_size, OUT_INDEX_INSERT, - fid, ARRAY_SIZE(sizes), sizes, bufs); + fid, ARRAY_SIZE(sizes), sizes, bufs, 0); } EXPORT_SYMBOL(out_index_insert_pack); @@ -312,7 +319,7 @@ int out_index_delete_pack(const struct lu_env *env, const void *buf = key; return out_update_pack(env, update, max_update_size, OUT_INDEX_DELETE, - fid, 1, &size, &buf); + fid, 1, &size, &buf, 0); } EXPORT_SYMBOL(out_index_delete_pack); @@ -321,7 +328,7 @@ int out_object_destroy_pack(const struct lu_env *env, size_t *max_update_size, const struct lu_fid *fid) { return out_update_pack(env, update, max_update_size, OUT_DESTROY, fid, - 0, NULL, NULL); + 0, NULL, NULL, 0); } EXPORT_SYMBOL(out_object_destroy_pack); @@ -336,7 +343,7 @@ int out_write_pack(const struct lu_env *env, struct object_update *update, pos = cpu_to_le64(pos); rc = out_update_pack(env, update, max_update_size, OUT_WRITE, fid, - ARRAY_SIZE(sizes), sizes, bufs); + ARRAY_SIZE(sizes), sizes, bufs, 0); return rc; } EXPORT_SYMBOL(out_write_pack); @@ -363,8 +370,9 @@ int out_index_lookup_pack(const struct lu_env *env, const void *name = key; __u16 size = strlen((char *)name) + 1; + /* XXX: this shouldn't be hardcoded */ return out_update_pack(env, update, max_update_size, OUT_INDEX_LOOKUP, - fid, 1, &size, &name); + fid, 1, &size, &name, 256); } EXPORT_SYMBOL(out_index_lookup_pack); @@ -372,13 +380,13 @@ int out_attr_get_pack(const struct lu_env *env, struct object_update *update, size_t *max_update_size, const struct lu_fid *fid) { return out_update_pack(env, update, max_update_size, OUT_ATTR_GET, - fid, 0, NULL, NULL); + fid, 0, NULL, NULL, sizeof(struct obdo)); } EXPORT_SYMBOL(out_attr_get_pack); int out_xattr_get_pack(const struct lu_env *env, struct object_update *update, size_t *max_update_size, const struct lu_fid *fid, - const char *name) + const char *name, const int bufsize) { __u16 size; @@ -386,7 +394,7 @@ int out_xattr_get_pack(const struct lu_env *env, struct object_update *update, size = strlen(name) + 1; return out_update_pack(env, update, max_update_size, OUT_XATTR_GET, - fid, 1, &size, (const void **)&name); + fid, 1, &size, (const void **)&name, bufsize); } EXPORT_SYMBOL(out_xattr_get_pack); @@ -397,11 +405,12 @@ int out_read_pack(const struct lu_env *env, struct object_update *update, __u16 sizes[2] = {sizeof(size), sizeof(pos)}; const void *bufs[2] = {&size, &pos}; + LASSERT(size > 0); size = cpu_to_le64(size); pos = cpu_to_le64(pos); return out_update_pack(env, update, max_update_size, OUT_READ, fid, - ARRAY_SIZE(sizes), sizes, bufs); + ARRAY_SIZE(sizes), sizes, bufs, size); } EXPORT_SYMBOL(out_read_pack); diff --git a/lustre/utils/wirecheck.c b/lustre/utils/wirecheck.c index 6f519f0..e72a556 100644 --- a/lustre/utils/wirecheck.c +++ b/lustre/utils/wirecheck.c @@ -2087,7 +2087,7 @@ static void check_object_update(void) CHECK_STRUCT(object_update); CHECK_MEMBER(object_update, ou_type); CHECK_MEMBER(object_update, ou_params_count); - CHECK_MEMBER(object_update, ou_master_index); + CHECK_MEMBER(object_update, ou_result_size); CHECK_MEMBER(object_update, ou_flags); CHECK_MEMBER(object_update, ou_padding1); CHECK_MEMBER(object_update, ou_batchid); @@ -2132,7 +2132,7 @@ static void check_out_update_header(void) CHECK_MEMBER(out_update_header, ouh_magic); CHECK_MEMBER(out_update_header, ouh_count); CHECK_MEMBER(out_update_header, ouh_inline_length); - CHECK_MEMBER(out_update_header, ouh_padding); + CHECK_MEMBER(out_update_header, ouh_reply_size); CHECK_MEMBER(out_update_header, ouh_inline_data); } diff --git a/lustre/utils/wiretest.c b/lustre/utils/wiretest.c index ab0487b..bec44ea 100644 --- a/lustre/utils/wiretest.c +++ b/lustre/utils/wiretest.c @@ -4574,10 +4574,10 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct object_update, ou_params_count)); LASSERTF((int)sizeof(((struct object_update *)0)->ou_params_count) == 2, "found %lld\n", (long long)(int)sizeof(((struct object_update *)0)->ou_params_count)); - LASSERTF((int)offsetof(struct object_update, ou_master_index) == 4, "found %lld\n", - (long long)(int)offsetof(struct object_update, ou_master_index)); - LASSERTF((int)sizeof(((struct object_update *)0)->ou_master_index) == 4, "found %lld\n", - (long long)(int)sizeof(((struct object_update *)0)->ou_master_index)); + LASSERTF((int)offsetof(struct object_update, ou_result_size) == 4, "found %lld\n", + (long long)(int)offsetof(struct object_update, ou_result_size)); + LASSERTF((int)sizeof(((struct object_update *)0)->ou_result_size) == 4, "found %lld\n", + (long long)(int)sizeof(((struct object_update *)0)->ou_result_size)); LASSERTF((int)offsetof(struct object_update, ou_flags) == 8, "found %lld\n", (long long)(int)offsetof(struct object_update, ou_flags)); LASSERTF((int)sizeof(((struct object_update *)0)->ou_flags) == 4, "found %lld\n", @@ -4674,10 +4674,10 @@ void lustre_assert_wire_constants(void) (long long)(int)offsetof(struct out_update_header, ouh_inline_length)); LASSERTF((int)sizeof(((struct out_update_header *)0)->ouh_inline_length) == 4, "found %lld\n", (long long)(int)sizeof(((struct out_update_header *)0)->ouh_inline_length)); - LASSERTF((int)offsetof(struct out_update_header, ouh_padding) == 12, "found %lld\n", - (long long)(int)offsetof(struct out_update_header, ouh_padding)); - LASSERTF((int)sizeof(((struct out_update_header *)0)->ouh_padding) == 4, "found %lld\n", - (long long)(int)sizeof(((struct out_update_header *)0)->ouh_padding)); + LASSERTF((int)offsetof(struct out_update_header, ouh_reply_size) == 12, "found %lld\n", + (long long)(int)offsetof(struct out_update_header, ouh_reply_size)); + LASSERTF((int)sizeof(((struct out_update_header *)0)->ouh_reply_size) == 4, "found %lld\n", + (long long)(int)sizeof(((struct out_update_header *)0)->ouh_reply_size)); LASSERTF((int)offsetof(struct out_update_header, ouh_inline_data) == 16, "found %lld\n", (long long)(int)offsetof(struct out_update_header, ouh_inline_data)); LASSERTF((int)sizeof(((struct out_update_header *)0)->ouh_inline_data) == 0, "found %lld\n", -- 1.8.3.1