Whamcloud - gitweb
LU-7318 out: dynamic reply size 89/16889/16
authorAlex Zhuravlev <alexey.zhuravlev@intel.com>
Tue, 20 Oct 2015 13:53:18 +0000 (16:53 +0300)
committerOleg Drokin <oleg.drokin@intel.com>
Tue, 24 Nov 2015 14:24:41 +0000 (14:24 +0000)
every update on the initiator side can declare how many bytes
it expects back. OUT packing library put these numbers on the
wire and prepary an appropriate buffer for the reply. then OUT
target do few checks to ensure individual replies fit their
buffers.

Change-Id: I443b5c879bc321c33efb70af665ecd2b2f7baa18
Signed-off-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-on: http://review.whamcloud.com/16889
Tested-by: Jenkins
Reviewed-by: Fan Yong <fan.yong@intel.com>
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_update.h
lustre/osp/osp_internal.h
lustre/osp/osp_object.c
lustre/osp/osp_trans.c
lustre/ptlrpc/pack_generic.c
lustre/ptlrpc/wiretest.c
lustre/target/out_handler.c
lustre/target/out_lib.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 896ddb4..5106b37 100644 (file)
@@ -3952,7 +3952,7 @@ object_update_param_size(const struct object_update_param *param)
 struct object_update {
        __u16           ou_type;                /* enum update_type */
        __u16           ou_params_count;        /* update parameters count */
-       __u32           ou_master_index;        /* master MDT/OST index */
+       __u32           ou_result_size;         /* how many bytes can return */
        __u32           ou_flags;               /* enum update_flag */
        __u32           ou_padding1;            /* padding 1 */
        __u64           ou_batchid;             /* op transno on master */
@@ -3978,7 +3978,7 @@ struct out_update_header {
        __u32           ouh_magic;
        __u32           ouh_count;
        __u32           ouh_inline_length;
-       __u32           ouh_padding;
+       __u32           ouh_reply_size;
        __u32           ouh_inline_data[0];
 };
 
index 0c69a70..1ec8b4d 100644 (file)
@@ -227,7 +227,7 @@ object_update_result_insert(struct object_update_reply *reply,
        LASSERT(update_result != NULL);
 
        update_result->our_rc = ptlrpc_status_hton(rc);
-       if (data_len > 0) {
+       if (data != NULL && data_len > 0) {
                LASSERT(data != NULL);
                ptr = (char *)update_result +
                        cfs_size_round(sizeof(struct object_update_reply));
@@ -261,7 +261,7 @@ object_update_result_data_get(const struct object_update_reply *reply,
        lbuf->lb_buf = update_result->our_data;
        lbuf->lb_len = update_result->our_datalen;
 
-       return 0;
+       return result;
 }
 
 /**
@@ -390,7 +390,8 @@ struct thandle_exec_args {
 int out_update_pack(const struct lu_env *env, struct object_update *update,
                    size_t *max_update_size, enum update_type op,
                    const struct lu_fid *fid, unsigned int params_count,
-                   __u16 *param_sizes, const void **param_bufs);
+                   __u16 *param_sizes, const void **param_bufs,
+                   __u32 reply_size);
 int out_create_pack(const struct lu_env *env, struct object_update *update,
                    size_t *max_update_size, const struct lu_fid *fid,
                    const struct lu_attr *attr, struct dt_allocation_hint *hint,
@@ -435,7 +436,8 @@ int out_index_lookup_pack(const struct lu_env *env,
                          const struct dt_key *key);
 int out_xattr_get_pack(const struct lu_env *env,
                       struct object_update *update, size_t *max_update_size,
-                      const struct lu_fid *fid, const char *name);
+                      const struct lu_fid *fid, const char *name,
+                      const int bufsize);
 int out_read_pack(const struct lu_env *env, struct object_update *update,
                  size_t *max_update_length, const struct lu_fid *fid,
                  size_t size, loff_t pos);
index 26ecc45..1352d64 100644 (file)
@@ -111,6 +111,8 @@ struct osp_update_request {
 
        /* List of osp_update_request_sub */
        struct list_head                our_req_list;
+       int                             our_req_nr;
+       int                             our_update_nr;
 
        struct list_head                our_cb_items;
 
@@ -118,6 +120,7 @@ struct osp_update_request {
        struct osp_thandle              *our_th;
        /* linked to the list(ou_list) in osp_updates */
        struct list_head                our_list;
+       __u32                           our_batchid;
        __u32                           our_req_sent:1;
 };
 
@@ -630,8 +633,11 @@ int osp_object_update_request_create(struct osp_update_request *our,
                } else {                                                \
                        if (ret == 0) {                                 \
                                ours->ours_req->ourq_count++;           \
+                               (our)->our_update_nr++;                 \
+                               object_update->ou_batchid =             \
+                                                    (our)->our_batchid;\
                                object_update->ou_flags |=              \
-                                                    update->our_flags; \
+                                                    (our)->our_flags;  \
                        }                                               \
                        break;                                          \
                }                                                       \
@@ -662,7 +668,7 @@ extern struct llog_operations osp_mds_ost_orig_logops;
 /* osp_trans.c */
 int osp_insert_async_request(const struct lu_env *env, enum update_type op,
                             struct osp_object *obj, int count, __u16 *lens,
-                            const void **bufs, void *data,
+                            const void **bufs, void *data, __u32 repsize,
                             osp_update_interpreter_t interpreter);
 
 int osp_unplug_async_request(const struct lu_env *env,
index 8571ce3..033044d 100644 (file)
@@ -512,6 +512,7 @@ static int osp_declare_attr_get(const struct lu_env *env, struct dt_object *dt)
        mutex_lock(&osp->opd_async_requests_mutex);
        rc = osp_insert_async_request(env, OUT_ATTR_GET, obj, 0, NULL, NULL,
                                      &obj->opo_ooa->ooa_attr,
+                                     sizeof(struct obdo),
                                      osp_attr_get_interpterer);
        mutex_unlock(&osp->opd_async_requests_mutex);
 
@@ -851,7 +852,8 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt,
 
        mutex_lock(&osp->opd_async_requests_mutex);
        rc = osp_insert_async_request(env, OUT_XATTR_GET, obj, 1,
-                                     &namelen, (const void **)&name, oxe,
+                                     &namelen, (const void **)&name,
+                                     oxe, buf->lb_len,
                                      osp_xattr_get_interpterer);
        if (rc != 0) {
                mutex_unlock(&osp->opd_async_requests_mutex);
@@ -969,7 +971,7 @@ unlock:
                GOTO(out, rc = PTR_ERR(update));
 
        rc = osp_update_rpc_pack(env, xattr_get, update, OUT_XATTR_GET,
-                                lu_object_fid(&dt->do_lu), name);
+                                lu_object_fid(&dt->do_lu), name, buf->lb_len);
        if (rc != 0) {
                CERROR("%s: Insert update error "DFID": rc = %d\n",
                       dname, PFID(lu_object_fid(&dt->do_lu)), rc);
@@ -977,7 +979,7 @@ unlock:
        }
 
        rc = osp_remote_sync(env, osp, update, &req);
-       if (rc != 0) {
+       if (rc < 0) {
                if (rc == -ENOENT) {
                        dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
                        obj->opo_non_exist = 1;
@@ -1025,7 +1027,7 @@ unlock:
                GOTO(out, rc);
 
        if (buf->lb_buf == NULL)
-               GOTO(out, rc = rbuf->lb_len);
+               GOTO(out, rc);
 
        if (unlikely(buf->lb_len < rbuf->lb_len))
                GOTO(out, rc = -ERANGE);
index d49bf7e..b055d33 100644 (file)
@@ -143,6 +143,7 @@ int osp_object_update_request_create(struct osp_update_request *our,
        ours->ours_req_size = size;
        INIT_LIST_HEAD(&ours->ours_list);
        list_add_tail(&ours->ours_list, &our->our_req_list);
+       our->our_req_nr++;
 
        return 0;
 }
@@ -228,12 +229,13 @@ object_update_request_dump(const struct object_update_request *ourq,
 
                update = object_update_request_get(ourq, i, &size);
                LASSERT(update != NULL);
-               CDEBUG(mask, "i = %u fid = "DFID" op = %s master = %u"
-                      "params = %d batchid = "LPU64" size = %zu\n",
+               CDEBUG(mask, "i = %u fid = "DFID" op = %s "
+                      "params = %d batchid = "LPU64" size = %zu repsize %u\n",
                       i, PFID(&update->ou_fid),
                       update_op_str(update->ou_type),
-                      update->ou_master_index, update->ou_params_count,
-                      update->ou_batchid, size);
+                      update->ou_params_count,
+                      update->ou_batchid, size,
+                      (unsigned)update->ou_result_size);
 
                total_size += size;
        }
@@ -257,12 +259,17 @@ object_update_request_dump(const struct object_update_request *ourq,
  */
 int osp_prep_inline_update_req(const struct lu_env *env,
                               struct ptlrpc_request *req,
-                              struct osp_update_request_sub *ours)
+                              struct osp_update_request *our,
+                              int repsize)
 {
+       struct osp_update_request_sub *ours;
        struct out_update_header *ouh;
-       __u32 update_req_size = object_update_request_size(ours->ours_req);
+       __u32 update_req_size;
        int rc;
 
+       ours = list_entry(our->our_req_list.next,
+                         struct osp_update_request_sub, ours_list);
+       update_req_size = object_update_request_size(ours->ours_req);
        req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_HEADER, RCL_CLIENT,
                             update_req_size + sizeof(*ouh));
 
@@ -274,11 +281,12 @@ int osp_prep_inline_update_req(const struct lu_env *env,
        ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC;
        ouh->ouh_count = 1;
        ouh->ouh_inline_length = update_req_size;
+       ouh->ouh_reply_size = repsize;
 
        memcpy(ouh->ouh_inline_data, ours->ours_req, update_req_size);
 
        req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
-                            RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
+                            RCL_SERVER, repsize);
 
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OUT_PORTAL;
@@ -308,16 +316,41 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
        struct ptlrpc_request           *req;
        struct ptlrpc_bulk_desc         *desc;
        struct osp_update_request_sub   *ours;
+       const struct object_update_request *ourq;
        struct out_update_header        *ouh;
        struct out_update_buffer        *oub;
        __u32                           buf_count = 0;
-       int                             rc;
+       int                             repsize = 0;
+       struct object_update_reply      *reply;
+       int                             rc, i;
+       int                             total = 0;
        ENTRY;
 
        list_for_each_entry(ours, &our->our_req_list, ours_list) {
                object_update_request_dump(ours->ours_req, D_INFO);
+
+               ourq = ours->ours_req;
+               for (i = 0; i < ourq->ourq_count; i++) {
+                       struct object_update    *update;
+                       size_t                  size = 0;
+
+
+                       /* XXX: it's very inefficient to lookup update
+                        *      this way, iterating from the beginning
+                        *      each time */
+                       update = object_update_request_get(ourq, i, &size);
+                       LASSERT(update != NULL);
+
+                       repsize += sizeof(reply->ourp_lens[0]);
+                       repsize += sizeof(struct object_update_result);
+                       repsize += update->ou_result_size;
+               }
+
                buf_count++;
        }
+       repsize += sizeof(*reply);
+       repsize = (repsize + OUT_UPDATE_REPLY_SIZE - 1) &
+                       ~(OUT_UPDATE_REPLY_SIZE - 1);
        LASSERT(buf_count > 0);
 
        req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
@@ -332,7 +365,7 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
                if (object_update_request_size(ours->ours_req) +
                    sizeof(struct out_update_header) <
                                OUT_UPDATE_MAX_INLINE_SIZE) {
-                       rc = osp_prep_inline_update_req(env, req, ours);
+                       rc = osp_prep_inline_update_req(env, req, our, repsize);
                        if (rc == 0)
                                *reqp = req;
                        GOTO(out_req, rc);
@@ -353,6 +386,7 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
        ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC;
        ouh->ouh_count = buf_count;
        ouh->ouh_inline_length = 0;
+       ouh->ouh_reply_size = repsize;
        oub = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_BUF);
        list_for_each_entry(ours, &our->our_req_list, ours_list) {
                oub->oub_size = ours->ours_req_size;
@@ -368,12 +402,15 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
                GOTO(out_req, rc = -ENOMEM);
 
        /* NB req now owns desc and will free it when it gets freed */
-       list_for_each_entry(ours, &our->our_req_list, ours_list)
+       list_for_each_entry(ours, &our->our_req_list, ours_list) {
                desc->bd_frag_ops->add_iov_frag(desc, ours->ours_req,
                                                ours->ours_req_size);
+               total += ours->ours_req_size;
+       }
+       CDEBUG(D_OTHER, "total %d in %u\n", total, our->our_update_nr);
 
        req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
-                            RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
+                            RCL_SERVER, repsize);
 
        ptlrpc_request_set_replen(req);
        req->rq_request_portal = OUT_PORTAL;
@@ -715,6 +752,7 @@ int osp_insert_update_callback(const struct lu_env *env,
  * \param[in] lens             buffer length array for the subsequent \a bufs
  * \param[in] bufs             the buffers to compose the request
  * \param[in] data             pointer to the data used by the interpreter
+ * \param[in] repsize          how many bytes the caller allocated for \a data
  * \param[in] interpreter      pointer to the interpreter function
  *
  * \retval                     0 for success
@@ -722,7 +760,8 @@ int osp_insert_update_callback(const struct lu_env *env,
  */
 int osp_insert_async_request(const struct lu_env *env, enum update_type op,
                             struct osp_object *obj, int count,
-                            __u16 *lens, const void **bufs, void *data,
+                            __u16 *lens, const void **bufs,
+                            void *data, __u32 repsize,
                             osp_update_interpreter_t interpreter)
 {
        struct osp_device               *osp;
@@ -748,7 +787,8 @@ again:
 
        object_update = update_buffer_get_update(ureq, ureq->ourq_count);
        rc = out_update_pack(env, object_update, &max_update_size, op,
-                            lu_object_fid(osp2lu_obj(obj)), count, lens, bufs);
+                            lu_object_fid(osp2lu_obj(obj)), count, lens, bufs,
+                            repsize);
        /* The queue is full. */
        if (rc == -E2BIG) {
                osp->opd_async_requests = NULL;
@@ -769,6 +809,7 @@ again:
                        RETURN(rc);
 
                ureq->ourq_count++;
+               our->our_update_nr++;
        }
 
        rc = osp_insert_update_callback(env, our, obj, data, interpreter);
index 362b3dc..02aafad 100644 (file)
@@ -2518,7 +2518,7 @@ void lustre_swab_object_update(struct object_update *ou)
 
        __swab16s(&ou->ou_type);
        __swab16s(&ou->ou_params_count);
-       __swab32s(&ou->ou_master_index);
+       __swab32s(&ou->ou_result_size);
        __swab32s(&ou->ou_flags);
        __swab32s(&ou->ou_padding1);
        __swab64s(&ou->ou_batchid);
@@ -2579,7 +2579,7 @@ void lustre_swab_out_update_header(struct out_update_header *ouh)
        __swab32s(&ouh->ouh_magic);
        __swab32s(&ouh->ouh_count);
        __swab32s(&ouh->ouh_inline_length);
-       __swab32s(&ouh->ouh_padding);
+       __swab32s(&ouh->ouh_reply_size);
 }
 EXPORT_SYMBOL(lustre_swab_out_update_header);
 
index aa28a3f..4a5a4c3 100644 (file)
@@ -4563,10 +4563,10 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct object_update, ou_params_count));
        LASSERTF((int)sizeof(((struct object_update *)0)->ou_params_count) == 2, "found %lld\n",
                 (long long)(int)sizeof(((struct object_update *)0)->ou_params_count));
-       LASSERTF((int)offsetof(struct object_update, ou_master_index) == 4, "found %lld\n",
-                (long long)(int)offsetof(struct object_update, ou_master_index));
-       LASSERTF((int)sizeof(((struct object_update *)0)->ou_master_index) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct object_update *)0)->ou_master_index));
+       LASSERTF((int)offsetof(struct object_update, ou_result_size) == 4, "found %lld\n",
+                (long long)(int)offsetof(struct object_update, ou_result_size));
+       LASSERTF((int)sizeof(((struct object_update *)0)->ou_result_size) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct object_update *)0)->ou_result_size));
        LASSERTF((int)offsetof(struct object_update, ou_flags) == 8, "found %lld\n",
                 (long long)(int)offsetof(struct object_update, ou_flags));
        LASSERTF((int)sizeof(((struct object_update *)0)->ou_flags) == 4, "found %lld\n",
@@ -4663,10 +4663,10 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct out_update_header, ouh_inline_length));
        LASSERTF((int)sizeof(((struct out_update_header *)0)->ouh_inline_length) == 4, "found %lld\n",
                 (long long)(int)sizeof(((struct out_update_header *)0)->ouh_inline_length));
-       LASSERTF((int)offsetof(struct out_update_header, ouh_padding) == 12, "found %lld\n",
-                (long long)(int)offsetof(struct out_update_header, ouh_padding));
-       LASSERTF((int)sizeof(((struct out_update_header *)0)->ouh_padding) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct out_update_header *)0)->ouh_padding));
+       LASSERTF((int)offsetof(struct out_update_header, ouh_reply_size) == 12, "found %lld\n",
+                (long long)(int)offsetof(struct out_update_header, ouh_reply_size));
+       LASSERTF((int)sizeof(((struct out_update_header *)0)->ouh_reply_size) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct out_update_header *)0)->ouh_reply_size));
        LASSERTF((int)offsetof(struct out_update_header, ouh_inline_data) == 16, "found %lld\n",
                 (long long)(int)offsetof(struct out_update_header, ouh_inline_data));
        LASSERTF((int)sizeof(((struct out_update_header *)0)->ouh_inline_data) == 0, "found %lld\n",
index b38e1eb..e725424 100644 (file)
@@ -185,6 +185,7 @@ static int out_attr_get(struct tgt_session_info *tsi)
 {
        const struct lu_env     *env = tsi->tsi_env;
        struct tgt_thread_info  *tti = tgt_th_info(env);
+       struct object_update    *update = tti->tti_u.update.tti_update;
        struct obdo             *obdo = &tti->tti_u.update.tti_obdo;
        struct lu_attr          *la = &tti->tti_attr;
        struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
@@ -193,6 +194,9 @@ static int out_attr_get(struct tgt_session_info *tsi)
 
        ENTRY;
 
+       if (unlikely(update->ou_result_size < sizeof(*obdo)))
+               return -EPROTO;
+
        if (!lu_object_exists(&obj->do_lu)) {
                /* Usually, this will be called when the master MDT try
                 * to init a remote object(see osp_object_init), so if
@@ -260,19 +264,15 @@ static int out_xattr_get(struct tgt_session_info *tsi)
                RETURN(err_serious(-EPROTO));
        }
 
+       lbuf->lb_len = (int)tti->tti_u.update.tti_update->ou_result_size;
        lbuf->lb_buf = update_result->our_data;
-       lbuf->lb_len = OUT_UPDATE_REPLY_SIZE -
-                      cfs_size_round((unsigned long)update_result->our_data -
-                                     (unsigned long)update_result);
+       if (lbuf->lb_len == 0)
+               lbuf->lb_buf = 0;
        dt_read_lock(env, obj, MOR_TGT_CHILD);
        rc = dt_xattr_get(env, obj, lbuf, name);
        dt_read_unlock(env, obj);
-       if (rc < 0) {
+       if (rc < 0)
                lbuf->lb_len = 0;
-               GOTO(out, rc);
-       }
-       lbuf->lb_len = rc;
-       rc = 0;
        CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
               tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
               name, (int)lbuf->lb_len);
@@ -281,7 +281,7 @@ static int out_xattr_get(struct tgt_session_info *tsi)
 
 out:
        object_update_result_insert(reply, lbuf->lb_buf, lbuf->lb_len, idx, rc);
-       RETURN(rc);
+       RETURN(0);
 }
 
 static int out_index_lookup(struct tgt_session_info *tsi)
@@ -295,6 +295,9 @@ static int out_index_lookup(struct tgt_session_info *tsi)
 
        ENTRY;
 
+       if (unlikely(update->ou_result_size < sizeof(tti->tti_fid1)))
+               return -EPROTO;
+
        if (!lu_object_exists(&obj->do_lu))
                RETURN(-ENOENT);
 
@@ -904,7 +907,8 @@ int out_handle(struct tgt_session_info *tsi)
        unsigned int                    reply_index = 0;
        int                             rc = 0;
        int                             rc1 = 0;
-       int                             ouh_size;
+       int                             ouh_size, reply_size;
+       int                             updates;
        ENTRY;
 
        req_capsule_set(pill, &RQF_OUT_UPDATE);
@@ -928,15 +932,6 @@ int out_handle(struct tgt_session_info *tsi)
        if (update_buf_count == 0)
                RETURN(err_serious(-EPROTO));
 
-       req_capsule_set_size(pill, &RMF_OUT_UPDATE_REPLY, RCL_SERVER,
-                            OUT_UPDATE_REPLY_SIZE);
-       rc = req_capsule_server_pack(pill);
-       if (rc != 0) {
-               CERROR("%s: Can't pack response: rc = %d\n",
-                      tgt_name(tsi->tsi_tgt), rc);
-               RETURN(rc);
-       }
-
        OBD_ALLOC(update_bufs, sizeof(*update_bufs) * update_buf_count);
        if (update_bufs == NULL)
                RETURN(-ENOMEM);
@@ -981,19 +976,13 @@ int out_handle(struct tgt_session_info *tsi)
                if (rc < 0)
                        GOTO(out_free, rc);
        }
-       /* Prepare the update reply buffer */
-       reply = req_capsule_server_get(pill, &RMF_OUT_UPDATE_REPLY);
-       if (reply == NULL)
-               GOTO(out_free, rc = err_serious(-EPROTO));
-       reply->ourp_magic = UPDATE_REPLY_MAGIC;
-       tti->tti_u.update.tti_update_reply = reply;
-       tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
-
        /* validate the request and calculate the total update count and
         * set it to reply */
+       reply_size = 0;
+       updates = 0;
        for (i = 0; i < update_buf_count; i++) {
-               struct object_update_request *our;
-               int                     update_count;
+               struct object_update_request    *our;
+               int                              j;
 
                our = update_bufs[i];
                if (ptlrpc_req_need_swab(pill->rc_req))
@@ -1006,9 +995,56 @@ int out_handle(struct tgt_session_info *tsi)
                               UPDATE_REQUEST_MAGIC, -EPROTO);
                        GOTO(out_free, rc = -EPROTO);
                }
-               update_count = our->ourq_count;
-               reply->ourp_count += update_count;
+               updates += our->ourq_count;
+
+               /* need to calculate reply size */
+               for (j = 0; j < our->ourq_count; j++) {
+                       update = object_update_request_get(our, j, NULL);
+                       if (update == NULL)
+                               GOTO(out, rc = -EPROTO);
+                       if (ptlrpc_req_need_swab(pill->rc_req))
+                               lustre_swab_object_update(update);
+
+                       if (!fid_is_sane(&update->ou_fid)) {
+                               CERROR("%s: invalid FID "DFID": rc = %d\n",
+                                      tgt_name(tsi->tsi_tgt),
+                                      PFID(&update->ou_fid), -EPROTO);
+                               GOTO(out, rc = err_serious(-EPROTO));
+                       }
+
+                       /* XXX: what ou_result_size can be considered safe? */
+
+                       reply_size += sizeof(reply->ourp_lens[0]);
+                       reply_size += sizeof(struct object_update_result);
+                       reply_size += update->ou_result_size;
+               }
        }
+       reply_size += sizeof(*reply);
+
+       if (unlikely(reply_size > ouh->ouh_reply_size)) {
+               CERROR("%s: too small reply buf %u for %u, need %u at least\n",
+                      tgt_name(tsi->tsi_tgt), ouh->ouh_reply_size,
+                      updates, reply_size);
+               GOTO(out_free, rc = -EPROTO);
+       }
+
+       req_capsule_set_size(pill, &RMF_OUT_UPDATE_REPLY, RCL_SERVER,
+                            ouh->ouh_reply_size);
+       rc = req_capsule_server_pack(pill);
+       if (rc != 0) {
+               CERROR("%s: Can't pack response: rc = %d\n",
+                      tgt_name(tsi->tsi_tgt), rc);
+               GOTO(out_free, rc = -EPROTO);
+       }
+
+       /* Prepare the update reply buffer */
+       reply = req_capsule_server_get(pill, &RMF_OUT_UPDATE_REPLY);
+       if (reply == NULL)
+               GOTO(out_free, rc = err_serious(-EPROTO));
+       reply->ourp_magic = UPDATE_REPLY_MAGIC;
+       reply->ourp_count = updates;
+       tti->tti_u.update.tti_update_reply = reply;
+       tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
  
        /* Walk through updates in the request to execute them */
        for (i = 0; i < update_buf_count; i++) {
@@ -1022,18 +1058,6 @@ int out_handle(struct tgt_session_info *tsi)
                update_count = our->ourq_count;
                for (j = 0; j < update_count; j++) {
                        update = object_update_request_get(our, j, NULL);
-                       if (update == NULL)
-                               GOTO(out, rc = -EPROTO);
-
-                       if (ptlrpc_req_need_swab(pill->rc_req))
-                               lustre_swab_object_update(update);
-
-                       if (!fid_is_sane(&update->ou_fid)) {
-                               CERROR("%s: invalid FID "DFID": rc = %d\n",
-                                      tgt_name(tsi->tsi_tgt),
-                                      PFID(&update->ou_fid), -EPROTO);
-                               GOTO(out, rc = err_serious(-EPROTO));
-                       }
 
                        dt_obj = dt_locate(env, dt, &update->ou_fid);
                        if (IS_ERR(dt_obj))
index 0f3e037..abd7963 100644 (file)
@@ -91,12 +91,17 @@ int out_update_header_pack(const struct lu_env *env,
                           size_t *max_update_size,
                           enum update_type update_op,
                           const struct lu_fid *fid,
-                          unsigned int param_count, __u16 *param_sizes)
+                          unsigned int param_count,
+                          __u16 *param_sizes,
+                          __u32 reply_size)
 {
        struct object_update_param      *param;
        unsigned int                    i;
        size_t                          update_size;
 
+       if (((reply_size + 7) >> 3) >= 1ULL << 16)
+               return -EINVAL;
+
        /* Check whether the packing exceeding the maxima update length */
        update_size = sizeof(*update);
        for (i = 0; i < param_count; i++)
@@ -110,6 +115,7 @@ int out_update_header_pack(const struct lu_env *env,
        update->ou_fid = *fid;
        update->ou_type = update_op;
        update->ou_params_count = param_count;
+       update->ou_result_size = reply_size;
        param = &update->ou_params[0];
        for (i = 0; i < param_count; i++) {
                param->oup_len = param_sizes[i];
@@ -138,7 +144,8 @@ int out_update_header_pack(const struct lu_env *env,
 int out_update_pack(const struct lu_env *env, struct object_update *update,
                    size_t *max_update_size, enum update_type op,
                    const struct lu_fid *fid, unsigned int param_count,
-                   __u16 *param_sizes, const void **param_bufs)
+                   __u16 *param_sizes, const void **param_bufs,
+                   __u32 reply_size)
 {
        struct object_update_param      *param;
        unsigned int                    i;
@@ -146,7 +153,7 @@ int out_update_pack(const struct lu_env *env, struct object_update *update,
        ENTRY;
 
        rc = out_update_header_pack(env, update, max_update_size, op, fid,
-                                   param_count, param_sizes);
+                                   param_count, param_sizes, reply_size);
        if (rc != 0)
                RETURN(rc);
 
@@ -195,7 +202,7 @@ int out_create_pack(const struct lu_env *env, struct object_update *update,
        }
 
        rc = out_update_header_pack(env, update, max_update_size, OUT_CREATE,
-                                   fid, buf_count, sizes);
+                                   fid, buf_count, sizes, 0);
        if (rc != 0)
                RETURN(rc);
 
@@ -221,7 +228,7 @@ int out_ref_del_pack(const struct lu_env *env, struct object_update *update,
                     size_t *max_update_size, const struct lu_fid *fid)
 {
        return out_update_pack(env, update, max_update_size, OUT_REF_DEL, fid,
-                              0, NULL, NULL);
+                              0, NULL, NULL, 0);
 }
 EXPORT_SYMBOL(out_ref_del_pack);
 
@@ -229,7 +236,7 @@ int out_ref_add_pack(const struct lu_env *env, struct object_update *update,
                     size_t *max_update_size, const struct lu_fid *fid)
 {
        return out_update_pack(env, update, max_update_size, OUT_REF_ADD, fid,
-                              0, NULL, NULL);
+                              0, NULL, NULL, 0);
 }
 EXPORT_SYMBOL(out_ref_add_pack);
 
@@ -243,7 +250,7 @@ int out_attr_set_pack(const struct lu_env *env, struct object_update *update,
        ENTRY;
 
        rc = out_update_header_pack(env, update, max_update_size,
-                                   OUT_ATTR_SET, fid, 1, &size);
+                                   OUT_ATTR_SET, fid, 1, &size, 0);
        if (rc != 0)
                RETURN(rc);
 
@@ -266,7 +273,7 @@ int out_xattr_set_pack(const struct lu_env *env, struct object_update *update,
                               (char *)&flag};
 
        return out_update_pack(env, update, max_update_size, OUT_XATTR_SET,
-                              fid, ARRAY_SIZE(sizes), sizes, bufs);
+                              fid, ARRAY_SIZE(sizes), sizes, bufs, 0);
 }
 EXPORT_SYMBOL(out_xattr_set_pack);
 
@@ -277,7 +284,7 @@ int out_xattr_del_pack(const struct lu_env *env, struct object_update *update,
        __u16   size = strlen(name) + 1;
 
        return out_update_pack(env, update, max_update_size, OUT_XATTR_DEL,
-                              fid, 1, &size, (const void **)&name);
+                              fid, 1, &size, (const void **)&name, 0);
 }
 EXPORT_SYMBOL(out_xattr_del_pack);
 
@@ -299,7 +306,7 @@ int out_index_insert_pack(const struct lu_env *env,
        fid_cpu_to_le(&rec_fid, rec1->rec_fid);
 
        return out_update_pack(env, update, max_update_size, OUT_INDEX_INSERT,
-                              fid, ARRAY_SIZE(sizes), sizes, bufs);
+                              fid, ARRAY_SIZE(sizes), sizes, bufs, 0);
 }
 EXPORT_SYMBOL(out_index_insert_pack);
 
@@ -312,7 +319,7 @@ int out_index_delete_pack(const struct lu_env *env,
        const void *buf = key;
 
        return out_update_pack(env, update, max_update_size, OUT_INDEX_DELETE,
-                              fid, 1, &size, &buf);
+                              fid, 1, &size, &buf, 0);
 }
 EXPORT_SYMBOL(out_index_delete_pack);
 
@@ -321,7 +328,7 @@ int out_object_destroy_pack(const struct lu_env *env,
                            size_t *max_update_size, const struct lu_fid *fid)
 {
        return out_update_pack(env, update, max_update_size, OUT_DESTROY, fid,
-                              0, NULL, NULL);
+                              0, NULL, NULL, 0);
 }
 EXPORT_SYMBOL(out_object_destroy_pack);
 
@@ -336,7 +343,7 @@ int out_write_pack(const struct lu_env *env, struct object_update *update,
        pos = cpu_to_le64(pos);
 
        rc = out_update_pack(env, update, max_update_size, OUT_WRITE, fid,
-                            ARRAY_SIZE(sizes), sizes, bufs);
+                            ARRAY_SIZE(sizes), sizes, bufs, 0);
        return rc;
 }
 EXPORT_SYMBOL(out_write_pack);
@@ -363,8 +370,9 @@ int out_index_lookup_pack(const struct lu_env *env,
        const void      *name = key;
        __u16           size = strlen((char *)name) + 1;
 
+       /* XXX: this shouldn't be hardcoded */
        return out_update_pack(env, update, max_update_size, OUT_INDEX_LOOKUP,
-                              fid, 1, &size, &name);
+                              fid, 1, &size, &name, 256);
 }
 EXPORT_SYMBOL(out_index_lookup_pack);
 
@@ -372,13 +380,13 @@ int out_attr_get_pack(const struct lu_env *env, struct object_update *update,
                      size_t *max_update_size, const struct lu_fid *fid)
 {
        return out_update_pack(env, update, max_update_size, OUT_ATTR_GET,
-                              fid, 0, NULL, NULL);
+                              fid, 0, NULL, NULL, sizeof(struct obdo));
 }
 EXPORT_SYMBOL(out_attr_get_pack);
 
 int out_xattr_get_pack(const struct lu_env *env, struct object_update *update,
                       size_t *max_update_size, const struct lu_fid *fid,
-                      const char *name)
+                      const char *name, const int bufsize)
 {
        __u16 size;
 
@@ -386,7 +394,7 @@ int out_xattr_get_pack(const struct lu_env *env, struct object_update *update,
        size = strlen(name) + 1;
 
        return out_update_pack(env, update, max_update_size, OUT_XATTR_GET,
-                              fid, 1, &size, (const void **)&name);
+                              fid, 1, &size, (const void **)&name, bufsize);
 }
 EXPORT_SYMBOL(out_xattr_get_pack);
 
@@ -397,11 +405,12 @@ int out_read_pack(const struct lu_env *env, struct object_update *update,
        __u16           sizes[2] = {sizeof(size), sizeof(pos)};
        const void      *bufs[2] = {&size, &pos};
 
+       LASSERT(size > 0);
        size = cpu_to_le64(size);
        pos = cpu_to_le64(pos);
 
        return out_update_pack(env, update, max_update_size, OUT_READ, fid,
-                              ARRAY_SIZE(sizes), sizes, bufs);
+                              ARRAY_SIZE(sizes), sizes, bufs, size);
 }
 EXPORT_SYMBOL(out_read_pack);
 
index 6f519f0..e72a556 100644 (file)
@@ -2087,7 +2087,7 @@ static void check_object_update(void)
        CHECK_STRUCT(object_update);
        CHECK_MEMBER(object_update, ou_type);
        CHECK_MEMBER(object_update, ou_params_count);
-       CHECK_MEMBER(object_update, ou_master_index);
+       CHECK_MEMBER(object_update, ou_result_size);
        CHECK_MEMBER(object_update, ou_flags);
        CHECK_MEMBER(object_update, ou_padding1);
        CHECK_MEMBER(object_update, ou_batchid);
@@ -2132,7 +2132,7 @@ static void check_out_update_header(void)
        CHECK_MEMBER(out_update_header, ouh_magic);
        CHECK_MEMBER(out_update_header, ouh_count);
        CHECK_MEMBER(out_update_header, ouh_inline_length);
-       CHECK_MEMBER(out_update_header, ouh_padding);
+       CHECK_MEMBER(out_update_header, ouh_reply_size);
        CHECK_MEMBER(out_update_header, ouh_inline_data);
 }
 
index ab0487b..bec44ea 100644 (file)
@@ -4574,10 +4574,10 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct object_update, ou_params_count));
        LASSERTF((int)sizeof(((struct object_update *)0)->ou_params_count) == 2, "found %lld\n",
                 (long long)(int)sizeof(((struct object_update *)0)->ou_params_count));
-       LASSERTF((int)offsetof(struct object_update, ou_master_index) == 4, "found %lld\n",
-                (long long)(int)offsetof(struct object_update, ou_master_index));
-       LASSERTF((int)sizeof(((struct object_update *)0)->ou_master_index) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct object_update *)0)->ou_master_index));
+       LASSERTF((int)offsetof(struct object_update, ou_result_size) == 4, "found %lld\n",
+                (long long)(int)offsetof(struct object_update, ou_result_size));
+       LASSERTF((int)sizeof(((struct object_update *)0)->ou_result_size) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct object_update *)0)->ou_result_size));
        LASSERTF((int)offsetof(struct object_update, ou_flags) == 8, "found %lld\n",
                 (long long)(int)offsetof(struct object_update, ou_flags));
        LASSERTF((int)sizeof(((struct object_update *)0)->ou_flags) == 4, "found %lld\n",
@@ -4674,10 +4674,10 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct out_update_header, ouh_inline_length));
        LASSERTF((int)sizeof(((struct out_update_header *)0)->ouh_inline_length) == 4, "found %lld\n",
                 (long long)(int)sizeof(((struct out_update_header *)0)->ouh_inline_length));
-       LASSERTF((int)offsetof(struct out_update_header, ouh_padding) == 12, "found %lld\n",
-                (long long)(int)offsetof(struct out_update_header, ouh_padding));
-       LASSERTF((int)sizeof(((struct out_update_header *)0)->ouh_padding) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct out_update_header *)0)->ouh_padding));
+       LASSERTF((int)offsetof(struct out_update_header, ouh_reply_size) == 12, "found %lld\n",
+                (long long)(int)offsetof(struct out_update_header, ouh_reply_size));
+       LASSERTF((int)sizeof(((struct out_update_header *)0)->ouh_reply_size) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct out_update_header *)0)->ouh_reply_size));
        LASSERTF((int)offsetof(struct out_update_header, ouh_inline_data) == 16, "found %lld\n",
                 (long long)(int)offsetof(struct out_update_header, ouh_inline_data));
        LASSERTF((int)sizeof(((struct out_update_header *)0)->ouh_inline_data) == 0, "found %lld\n",