Whamcloud - gitweb
LU-3534 osp: transfer updates with bulk RPC 86/13786/34
authorwang di <di.wang@intel.com>
Fri, 19 Jun 2015 23:40:26 +0000 (16:40 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Fri, 3 Jul 2015 15:25:18 +0000 (15:25 +0000)
Send update request with bulk RPC(iovec), and update request
will be split into multiple buffers, so OSP does not need to
allocate big contingous buffer to store the update request.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I70995a552ee6fdd77f4c0a84d2435e5963e4fd6d
Reviewed-on: http://review.whamcloud.com/13786
Tested-by: Jenkins
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
16 files changed:
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_net.h
lustre/include/lustre_req_layout.h
lustre/include/lustre_update.h
lustre/ldlm/ldlm_lib.c
lustre/osp/osp_internal.h
lustre/osp/osp_md_object.c
lustre/osp/osp_object.c
lustre/osp/osp_trans.c
lustre/ptlrpc/client.c
lustre/ptlrpc/layout.c
lustre/ptlrpc/pack_generic.c
lustre/target/out_handler.c
lustre/target/out_lib.c
lustre/tests/replay-single.sh
lustre/utils/req-layout.c

index 91ae410..281d88d 100644 (file)
@@ -4069,8 +4069,22 @@ struct object_update_request {
        struct object_update    ourq_updates[0];
 };
 
+#define OUT_UPDATE_HEADER_MAGIC        0xBDDF0001
+/* Header for updates request between MDTs */
+struct out_update_header {
+       __u32           ouh_magic;
+       __u32           ouh_count;
+};
+
+struct out_update_buffer {
+       __u32   oub_size;
+       __u32   oub_padding;
+};
+
 void lustre_swab_object_update(struct object_update *ou);
 void lustre_swab_object_update_request(struct object_update_request *our);
+void lustre_swab_out_update_header(struct out_update_header *ouh);
+void lustre_swab_out_update_buffer(struct out_update_buffer *oub);
 
 static inline size_t
 object_update_params_size(const struct object_update *update)
index 8c6be2d..9a0b67e 100644 (file)
@@ -1409,6 +1409,7 @@ struct ptlrpc_bulk_frag_ops {
 
 extern const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_pin_ops;
 extern const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops;
+extern const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kvec_ops;
 
 /*
  * Definition of bulk descriptor.
index 9295c19..9d859a8 100644 (file)
@@ -334,6 +334,8 @@ extern struct req_msg_field RMF_U32;
 /* OBJ update format */
 extern struct req_msg_field RMF_OUT_UPDATE;
 extern struct req_msg_field RMF_OUT_UPDATE_REPLY;
+extern struct req_msg_field RMF_OUT_UPDATE_HEADER;
+extern struct req_msg_field RMF_OUT_UPDATE_BUF;
 
 /* LFSCK format */
 extern struct req_msg_field RMF_LFSCK_REQUEST;
index 27d56ee..c2994b1 100644 (file)
@@ -258,13 +258,6 @@ object_update_request_size(const struct object_update_request *our)
 }
 
 static inline void
-object_update_reply_init(struct object_update_reply *reply, size_t count)
-{
-       reply->ourp_magic = UPDATE_REPLY_MAGIC;
-       reply->ourp_count = count;
-}
-
-static inline void
 object_update_result_insert(struct object_update_reply *reply,
                            void *data, size_t data_len, size_t index,
                            int rc)
@@ -430,56 +423,56 @@ struct thandle_exec_args {
 
 /* target/out_lib.c */
 int out_update_pack(const struct lu_env *env, struct object_update *update,
-                   size_t max_update_size, enum update_type op,
+                   size_t *max_update_size, enum update_type op,
                    const struct lu_fid *fid, unsigned int params_count,
                    __u16 *param_sizes, const void **param_bufs);
 int out_create_pack(const struct lu_env *env, struct object_update *update,
-                   size_t max_update_size, const struct lu_fid *fid,
+                   size_t *max_update_size, const struct lu_fid *fid,
                    const struct lu_attr *attr, struct dt_allocation_hint *hint,
                    struct dt_object_format *dof);
 int out_object_destroy_pack(const struct lu_env *env,
                            struct object_update *update,
-                           size_t max_update_size,
+                           size_t *max_update_size,
                            const struct lu_fid *fid);
 int out_index_delete_pack(const struct lu_env *env,
-                         struct object_update *update, size_t max_update_size,
+                         struct object_update *update, size_t *max_update_size,
                          const struct lu_fid *fid, const struct dt_key *key);
 int out_index_insert_pack(const struct lu_env *env,
-                         struct object_update *update, size_t max_update_size,
+                         struct object_update *update, size_t *max_update_size,
                          const struct lu_fid *fid, const struct dt_rec *rec,
                          const struct dt_key *key);
 int out_xattr_set_pack(const struct lu_env *env,
-                      struct object_update *update, size_t max_update_size,
+                      struct object_update *update, size_t *max_update_size,
                       const struct lu_fid *fid, const struct lu_buf *buf,
                       const char *name, __u32 flag);
 int out_xattr_del_pack(const struct lu_env *env,
-                      struct object_update *update, size_t max_update_size,
+                      struct object_update *update, size_t *max_update_size,
                       const struct lu_fid *fid, const char *name);
 int out_attr_set_pack(const struct lu_env *env,
-                     struct object_update *update, size_t max_update_size,
+                     struct object_update *update, size_t *max_update_size,
                      const struct lu_fid *fid, const struct lu_attr *attr);
 int out_ref_add_pack(const struct lu_env *env,
-                    struct object_update *update, size_t max_update_size,
+                    struct object_update *update, size_t *max_update_size,
                     const struct lu_fid *fid);
 int out_ref_del_pack(const struct lu_env *env,
-                    struct object_update *update, size_t max_update_size,
+                    struct object_update *update, size_t *max_update_size,
                     const struct lu_fid *fid);
 int out_write_pack(const struct lu_env *env,
-                  struct object_update *update, size_t max_update_size,
+                  struct object_update *update, size_t *max_update_size,
                   const struct lu_fid *fid, const struct lu_buf *buf,
                   __u64 pos);
 int out_attr_get_pack(const struct lu_env *env,
-                     struct object_update *update, size_t max_update_size,
+                     struct object_update *update, size_t *max_update_size,
                      const struct lu_fid *fid);
 int out_index_lookup_pack(const struct lu_env *env,
-                         struct object_update *update, size_t max_update_size,
+                         struct object_update *update, size_t *max_update_size,
                          const struct lu_fid *fid, struct dt_rec *rec,
                          const struct dt_key *key);
 int out_xattr_get_pack(const struct lu_env *env,
-                      struct object_update *update, size_t max_update_size,
+                      struct object_update *update, size_t *max_update_size,
                       const struct lu_fid *fid, const char *name);
 int out_read_pack(const struct lu_env *env, struct object_update *update,
-                 size_t max_update_length, const struct lu_fid *fid,
+                 size_t *max_update_length, const struct lu_fid *fid,
                  size_t size, loff_t pos);
 
 const char *update_op_str(__u16 opcode);
index 603ab41..2203acb 100644 (file)
@@ -1417,6 +1417,7 @@ static int target_exp_enqueue_req_replay(struct ptlrpc_request *req)
         __u64                  transno = lustre_msg_get_transno(req->rq_reqmsg);
         struct obd_export     *exp = req->rq_export;
         struct ptlrpc_request *reqiter;
+       struct ptlrpc_request *dup_req = NULL;
         int                    dup = 0;
 
         LASSERT(exp);
@@ -1425,6 +1426,7 @@ static int target_exp_enqueue_req_replay(struct ptlrpc_request *req)
        list_for_each_entry(reqiter, &exp->exp_req_replay_queue,
                                 rq_replay_list) {
                 if (lustre_msg_get_transno(reqiter->rq_reqmsg) == transno) {
+                       dup_req = reqiter;
                         dup = 1;
                         break;
                 }
@@ -1436,6 +1438,16 @@ static int target_exp_enqueue_req_replay(struct ptlrpc_request *req)
                      (MSG_RESENT | MSG_REPLAY)) != (MSG_RESENT | MSG_REPLAY))
                         CERROR("invalid flags %x of resent replay\n",
                                lustre_msg_get_flags(req->rq_reqmsg));
+
+               if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+                       __u32 new_conn;
+
+                       new_conn = lustre_msg_get_conn_cnt(req->rq_reqmsg);
+                       if (new_conn >
+                           lustre_msg_get_conn_cnt(dup_req->rq_reqmsg))
+                               lustre_msg_set_conn_cnt(dup_req->rq_reqmsg,
+                                                       new_conn);
+               }
         } else {
                list_add_tail(&req->rq_replay_list,
                                   &exp->exp_req_replay_queue);
index d6cf452..aa574a3 100644 (file)
@@ -94,6 +94,13 @@ struct osp_precreate {
        int                              osp_pre_recovering;
 };
 
+struct osp_update_request_sub {
+       struct object_update_request    *ours_req;
+       size_t                          ours_req_size;
+       /* Linked to osp_update_request->our_req_list */
+       struct list_head                ours_list;
+};
+
 /**
  * Tracking the updates being executed on this dt_device.
  */
@@ -102,9 +109,8 @@ struct osp_update_request {
        /* update request result */
        int                             our_rc;
 
-       /* Holding object updates sent to the remote target */
-       struct object_update_request    *our_req;
-       size_t                          our_req_size;
+       /* List of osp_update_request_sub */
+       struct list_head                our_req_list;
 
        struct list_head                our_cb_items;
 
@@ -583,36 +589,47 @@ update_buffer_get_update(struct object_update_request *request,
 int osp_extend_update_buffer(const struct lu_env *env,
                             struct osp_update_request *our);
 
-#define osp_update_rpc_pack(env, name, update, op, ...)                \
-({                                                             \
-       struct object_update    *object_update;                 \
-       size_t                  max_update_length;              \
-       struct object_update_request *ureq;                     \
-       int                     ret;                            \
-                                                               \
+struct osp_update_request_sub *
+osp_current_object_update_request(struct osp_update_request *our);
+
+int osp_object_update_request_create(struct osp_update_request *our,
+                                    size_t size);
+
+#define osp_update_rpc_pack(env, name, our, op, ...)                   \
+({                                                                     \
+       struct object_update    *object_update;                         \
+       size_t                  max_update_length;                      \
+       struct osp_update_request_sub *ours;                            \
+       int ret;                                                        \
+                                                                       \
        while (1) {                                                     \
-               ureq = update->our_req;                                 \
-               max_update_length = update->our_req_size -              \
-                                   object_update_request_size(ureq);   \
+               ours = osp_current_object_update_request(our);          \
+               LASSERT(ours != NULL);                                  \
+               max_update_length = ours->ours_req_size -               \
+                           object_update_request_size(ours->ours_req); \
                                                                        \
-               object_update = update_buffer_get_update(ureq,          \
-                                                ureq->ourq_count);     \
+               object_update = update_buffer_get_update(ours->ours_req,\
+                                        ours->ours_req->ourq_count);   \
                ret = out_##name##_pack(env, object_update,             \
-                                       max_update_length,              \
+                                       &max_update_length,             \
                                       __VA_ARGS__);                    \
                if (ret == -E2BIG) {                                    \
                        int rc1;                                        \
-                       /* extend the buffer and retry */               \
-                       rc1 = osp_extend_update_buffer(env, update);    \
+                       /* Create new object update request */          \
+                       rc1 = osp_object_update_request_create(our,     \
+                               max_update_length  +                    \
+                               offsetof(struct object_update_request,  \
+                                        ourq_updates[0]) + 1);         \
                        if (rc1 != 0) {                                 \
                                ret = rc1;                              \
                                break;                                  \
                        }                                               \
+                       continue;                                       \
                } else {                                                \
                        if (ret == 0) {                                 \
+                               ours->ours_req->ourq_count++;           \
                                object_update->ou_flags |=              \
                                                     update->our_flags; \
-                               ureq->ourq_count++;                     \
                        }                                               \
                        break;                                          \
                }                                                       \
@@ -678,7 +695,7 @@ static inline void osp_thandle_put(struct osp_thandle *oth)
 }
 
 int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
-                       const struct object_update_request *ureq,
+                       struct osp_update_request *our,
                        struct ptlrpc_request **reqp);
 int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
                    struct osp_update_request *update,
index 46de68b..f84ffba 100644 (file)
@@ -153,30 +153,6 @@ update_buffer_get_update(struct object_update_request *request,
        return ptr;
 }
 
-int osp_extend_update_buffer(const struct lu_env *env,
-                            struct osp_update_request *our)
-{
-       struct object_update_request *obj_update_req;
-       size_t new_size = our->our_req_size + OUT_UPDATE_BUFFER_SIZE_ADD;
-
-       /* enlarge object update request size */
-       if (new_size > OUT_UPDATE_BUFFER_SIZE_MAX)
-               return -E2BIG;
-
-       OBD_ALLOC_LARGE(obj_update_req, new_size);
-       if (obj_update_req == NULL)
-               return -ENOMEM;
-
-       memcpy(obj_update_req, our->our_req, our->our_req_size);
-
-       OBD_FREE_LARGE(our->our_req, our->our_req_size);
-
-       our->our_req = obj_update_req;
-       our->our_req_size = new_size;
-
-       return 0;
-}
-
 /**
  * Implementation of dt_object_operations::do_create
  *
index 0c96f03..1ff4a88 100644 (file)
@@ -857,19 +857,21 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt,
                mutex_unlock(&osp->opd_async_requests_mutex);
                osp_oac_xattr_put(oxe);
        } else {
-               struct osp_update_request *update;
+               struct osp_update_request *our;
+               struct osp_update_request_sub *ours;
 
                /* XXX: Currently, we trigger the batched async OUT
                 *      RPC via dt_declare_xattr_get(). It is not
                 *      perfect solution, but works well now.
                 *
                 *      We will improve it in the future. */
-               update = osp->opd_async_requests;
-               if (update != NULL && update->our_req != NULL &&
-                   update->our_req->ourq_count > 0) {
+               our = osp->opd_async_requests;
+               ours = osp_current_object_update_request(our);
+               if (ours != NULL && ours->ours_req != NULL &&
+                   ours->ours_req->ourq_count > 0) {
                        osp->opd_async_requests = NULL;
                        mutex_unlock(&osp->opd_async_requests_mutex);
-                       rc = osp_unplug_async_request(env, osp, update);
+                       rc = osp_unplug_async_request(env, osp, our);
                } else {
                        mutex_unlock(&osp->opd_async_requests_mutex);
                }
index c1d1950..be7db77 100644 (file)
@@ -66,6 +66,7 @@
 
 #define DEBUG_SUBSYSTEM S_MDS
 
+#include <lustre_net.h>
 #include "osp_internal.h"
 
 /**
@@ -109,11 +110,61 @@ static struct object_update_request *object_update_request_alloc(size_t size)
        return ourq;
 }
 
-static void object_update_request_free(struct object_update_request *ourq,
-                                      size_t ourq_size)
+/**
+ * Allocate new update request
+ *
+ * Allocate new update request and insert it to the req_update_list.
+ *
+ * \param [in] our     osp_udate_request where to create a new
+ *                      update request
+ *
+ * \retval     0 if creation succeeds.
+ * \retval     negative errno if creation fails.
+ */
+int osp_object_update_request_create(struct osp_update_request *our,
+                                    size_t size)
 {
-       if (ourq != NULL)
-               OBD_FREE_LARGE(ourq, ourq_size);
+       struct osp_update_request_sub *ours;
+
+       OBD_ALLOC_PTR(ours);
+       if (ours == NULL)
+               return -ENOMEM;
+
+       ours->ours_req = object_update_request_alloc(size);
+
+       if (IS_ERR(ours->ours_req)) {
+               OBD_FREE_PTR(ours);
+               return -ENOMEM;
+       }
+
+       ours->ours_req_size = size;
+       INIT_LIST_HEAD(&ours->ours_list);
+       list_add_tail(&ours->ours_list, &our->our_req_list);
+
+       return 0;
+}
+
+/**
+ * Get current update request
+ *
+ * Get current object update request from our_req_list in
+ * osp_update_request, because we always insert the new update
+ * request in the last position, so the last update request
+ * in the list will be the current update req.
+ *
+ * \param[in] our      osp update request where to get the
+ *                      current object update.
+ *
+ * \retval             the current object update.
+ **/
+struct osp_update_request_sub *
+osp_current_object_update_request(struct osp_update_request *our)
+{
+       if (list_empty(&our->our_req_list))
+               return NULL;
+
+       return list_entry(our->our_req_list.prev, struct osp_update_request_sub,
+                         ours_list);
 }
 
 /**
@@ -130,35 +181,34 @@ static void object_update_request_free(struct object_update_request *ourq,
  */
 struct osp_update_request *osp_update_request_create(struct dt_device *dt)
 {
-       struct osp_update_request *osp_update_req;
-       struct object_update_request *ourq;
+       struct osp_update_request *our;
 
-       OBD_ALLOC_PTR(osp_update_req);
-       if (osp_update_req == NULL)
+       OBD_ALLOC_PTR(our);
+       if (our == NULL)
                return ERR_PTR(-ENOMEM);
 
-       ourq = object_update_request_alloc(OUT_UPDATE_INIT_BUFFER_SIZE);
-       if (IS_ERR(ourq)) {
-               OBD_FREE_PTR(osp_update_req);
-               return ERR_CAST(ourq);
-       }
-
-       osp_update_req->our_req = ourq;
-       osp_update_req->our_req_size = OUT_UPDATE_INIT_BUFFER_SIZE;
+       INIT_LIST_HEAD(&our->our_req_list);
+       INIT_LIST_HEAD(&our->our_cb_items);
+       INIT_LIST_HEAD(&our->our_list);
 
-       INIT_LIST_HEAD(&osp_update_req->our_cb_items);
-       INIT_LIST_HEAD(&osp_update_req->our_list);
-
-       return osp_update_req;
+       osp_object_update_request_create(our, OUT_UPDATE_INIT_BUFFER_SIZE);
+       return our;
 }
 
 void osp_update_request_destroy(struct osp_update_request *our)
 {
+       struct osp_update_request_sub *ours;
+       struct osp_update_request_sub *tmp;
+
        if (our == NULL)
                return;
 
-       object_update_request_free(our->our_req,
-                                  our->our_req_size);
+       list_for_each_entry_safe(ours, tmp, &our->our_req_list, ours_list) {
+               list_del(&ours->ours_list);
+               if (ours->ours_req != NULL)
+                       OBD_FREE(ours->ours_req, ours->ours_req_size);
+               OBD_FREE_PTR(ours);
+       }
        OBD_FREE_PTR(our);
 }
 
@@ -189,6 +239,131 @@ object_update_request_dump(const struct object_update_request *ourq,
               ourq->ourq_magic, ourq->ourq_count, total_size);
 }
 
+/**
+ * Prepare update request.
+ *
+ * Prepare OUT update ptlrpc request, and the request usually includes
+ * all of updates (stored in \param ureq) from one operation.
+ *
+ * \param[in] env      execution environment
+ * \param[in] imp      import on which ptlrpc request will be sent
+ * \param[in] ureq     hold all of updates which will be packed into the req
+ * \param[in] reqp     request to be created
+ *
+ * \retval             0 if preparation succeeds.
+ * \retval             negative errno if preparation fails.
+ */
+int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
+                       struct osp_update_request *our,
+                       struct ptlrpc_request **reqp)
+{
+       struct ptlrpc_request           *req;
+       struct ptlrpc_bulk_desc         *desc;
+       struct osp_update_request_sub   *ours;
+       struct out_update_header        *ouh;
+       struct out_update_buffer        *oub;
+       __u32                           buf_count = 0;
+       int                             rc;
+       ENTRY;
+
+       list_for_each_entry(ours, &our->our_req_list, ours_list) {
+               object_update_request_dump(ours->ours_req, D_INFO);
+               buf_count++;
+       }
+
+       req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_BUF, RCL_CLIENT,
+                            buf_count * sizeof(*oub));
+
+       rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
+       if (rc != 0)
+               GOTO(out_req, rc);
+
+       ouh = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_HEADER);
+       ouh->ouh_magic = OUT_UPDATE_HEADER_MAGIC;
+       ouh->ouh_count = buf_count;
+
+       oub = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE_BUF);
+       list_for_each_entry(ours, &our->our_req_list, ours_list) {
+               oub->oub_size = ours->ours_req_size;
+               oub++;
+       }
+
+       req->rq_bulk_write = 1;
+       desc = ptlrpc_prep_bulk_imp(req, buf_count,
+               MD_MAX_BRW_SIZE >> LNET_MTU_BITS,
+               PTLRPC_BULK_GET_SOURCE | PTLRPC_BULK_BUF_KVEC,
+               MDS_BULK_PORTAL, &ptlrpc_bulk_kvec_ops);
+       if (desc == NULL)
+               GOTO(out_req, rc = -ENOMEM);
+
+       /* NB req now owns desc and will free it when it gets freed */
+       list_for_each_entry(ours, &our->our_req_list, ours_list)
+               desc->bd_frag_ops->add_iov_frag(desc, ours->ours_req,
+                                               ours->ours_req_size);
+
+       req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
+                            RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
+
+       ptlrpc_request_set_replen(req);
+       req->rq_request_portal = OUT_PORTAL;
+       req->rq_reply_portal = OSC_REPLY_PORTAL;
+       *reqp = req;
+
+out_req:
+       if (rc < 0)
+               ptlrpc_req_finished(req);
+
+       RETURN(rc);
+}
+
+/**
+ * Send update RPC.
+ *
+ * Send update request to the remote MDT synchronously.
+ *
+ * \param[in] env      execution environment
+ * \param[in] imp      import on which ptlrpc request will be sent
+ * \param[in] our      hold all of updates which will be packed into the req
+ * \param[in] reqp     request to be created
+ *
+ * \retval             0 if RPC succeeds.
+ * \retval             negative errno if RPC fails.
+ */
+
+int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
+                   struct osp_update_request *our,
+                   struct ptlrpc_request **reqp)
+{
+       struct obd_import       *imp = osp->opd_obd->u.cli.cl_import;
+       struct ptlrpc_request   *req = NULL;
+       int                     rc;
+       ENTRY;
+
+       rc = osp_prep_update_req(env, imp, our, &req);
+       if (rc != 0)
+               RETURN(rc);
+
+       /* This will only be called with read-only update, and these updates
+        * might be used to retrieve update log during recovery process, so
+        * it will be allowed to send during recovery process */
+       req->rq_allow_replay = 1;
+
+       /* Note: some dt index api might return non-zero result here, like
+        * osd_index_ea_lookup, so we should only check rc < 0 here */
+       rc = ptlrpc_queue_wait(req);
+       our->our_rc = rc;
+       if (rc < 0 || reqp == NULL)
+               ptlrpc_req_finished(req);
+       else
+               *reqp = req;
+
+       RETURN(rc);
+}
+
 static void osp_trans_stop_cb(struct osp_thandle *oth, int result)
 {
        struct dt_txn_commit_cb *dcb;
@@ -367,7 +542,7 @@ int osp_unplug_async_request(const struct lu_env *env,
        int                      rc;
 
        rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
-                                our->our_req, &req);
+                                our, &req);
        if (rc != 0) {
                struct osp_update_callback *ouc;
                struct osp_update_callback *next;
@@ -489,6 +664,7 @@ int osp_insert_async_request(const struct lu_env *env, enum update_type op,
        struct object_update            *object_update;
        size_t                          max_update_size;
        struct object_update_request    *ureq;
+       struct osp_update_request_sub   *ours;
        int                             rc = 0;
        ENTRY;
 
@@ -498,11 +674,14 @@ int osp_insert_async_request(const struct lu_env *env, enum update_type op,
                RETURN(PTR_ERR(our));
 
 again:
-       ureq = our->our_req;
-       max_update_size = our->our_req_size - object_update_request_size(ureq);
+       ours = osp_current_object_update_request(our);
+
+       ureq = ours->ours_req;
+       max_update_size = ours->ours_req_size -
+                         object_update_request_size(ureq);
 
        object_update = update_buffer_get_update(ureq, ureq->ourq_count);
-       rc = out_update_pack(env, object_update, max_update_size, op,
+       rc = out_update_pack(env, object_update, &max_update_size, op,
                             lu_object_fid(osp2lu_obj(obj)), count, lens, bufs);
        /* The queue is full. */
        if (rc == -E2BIG) {
@@ -614,112 +793,6 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
 }
 
 /**
- * Prepare update request.
- *
- * Prepare OUT update ptlrpc request, and the request usually includes
- * all of updates (stored in \param ureq) from one operation.
- *
- * \param[in] env      execution environment
- * \param[in] imp      import on which ptlrpc request will be sent
- * \param[in] ureq     hold all of updates which will be packed into the req
- * \param[in] reqp     request to be created
- *
- * \retval             0 if preparation succeeds.
- * \retval             negative errno if preparation fails.
- */
-int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
-                       const struct object_update_request *ureq,
-                       struct ptlrpc_request **reqp)
-{
-       struct ptlrpc_request           *req;
-       struct object_update_request    *tmp;
-       size_t                          ureq_len;
-       int                             rc;
-       ENTRY;
-
-       object_update_request_dump(ureq, D_INFO);
-       req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
-       if (req == NULL)
-               RETURN(-ENOMEM);
-
-       ureq_len = object_update_request_size(ureq);
-       req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE, RCL_CLIENT,
-                            ureq_len);
-
-       rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, OUT_UPDATE);
-       if (rc != 0) {
-               ptlrpc_req_finished(req);
-               RETURN(rc);
-       }
-
-       req_capsule_set_size(&req->rq_pill, &RMF_OUT_UPDATE_REPLY,
-                            RCL_SERVER, OUT_UPDATE_REPLY_SIZE);
-
-       tmp = req_capsule_client_get(&req->rq_pill, &RMF_OUT_UPDATE);
-       memcpy(tmp, ureq, ureq_len);
-
-       ptlrpc_request_set_replen(req);
-       req->rq_request_portal = OUT_PORTAL;
-       req->rq_reply_portal = OSC_REPLY_PORTAL;
-       *reqp = req;
-
-       RETURN(rc);
-}
-
-/**
- * Send update RPC.
- *
- * Send update request to the remote MDT synchronously.
- *
- * \param[in] env      execution environment
- * \param[in] imp      import on which ptlrpc request will be sent
- * \param[in] our      hold all of updates which will be packed into the req
- * \param[in] reqp     request to be created
- *
- * \retval             0 if RPC succeeds.
- * \retval             negative errno if RPC fails.
- */
-
-int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
-                   struct osp_update_request *our,
-                   struct ptlrpc_request **reqp)
-{
-       struct obd_import       *imp = osp->opd_obd->u.cli.cl_import;
-       struct ptlrpc_request   *req = NULL;
-       int                     rc;
-       ENTRY;
-
-       rc = osp_prep_update_req(env, imp, our->our_req, &req);
-       if (rc != 0)
-               RETURN(rc);
-
-       /* This will only be called with read-only update, and these updates
-        * might be used to retrieve update log during recovery process, so
-        * it will be allowed to send during recovery process */
-       req->rq_allow_replay = 1;
-
-       /* Note: some dt index api might return non-zero result here, like
-        * osd_index_ea_lookup, so we should only check rc < 0 here */
-       rc = ptlrpc_queue_wait(req);
-       if (rc < 0) {
-               ptlrpc_req_finished(req);
-               our->our_rc = rc;
-               RETURN(rc);
-       }
-
-       if (reqp != NULL) {
-               *reqp = req;
-               RETURN(rc);
-       }
-
-       our->our_rc = rc;
-
-       ptlrpc_req_finished(req);
-
-       RETURN(rc);
-}
-
-/**
  * Add commit callback to transaction.
  *
  * Add commit callback to the osp thandle, which will be called
@@ -858,7 +931,7 @@ static int osp_send_update_req(const struct lu_env *env,
        LASSERT(oth != NULL);
        LASSERT(our->our_req_sent == 0);
        rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
-                                our->our_req, &req);
+                                our, &req);
        if (rc != 0) {
                osp_trans_callback(env, oth, rc);
                RETURN(rc);
@@ -1240,8 +1313,7 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
                oth->ot_storage_th = NULL;
        }
 
-       if (our == NULL || our->our_req == NULL ||
-           our->our_req->ourq_count == 0) {
+       if (our == NULL || list_empty(&our->our_req_list)) {
                osp_trans_callback(env, oth, th->th_result);
                GOTO(out, rc = th->th_result);
        }
index a72e53d..d874d70 100644 (file)
@@ -59,6 +59,11 @@ const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kiov_nopin_ops = {
 };
 EXPORT_SYMBOL(ptlrpc_bulk_kiov_nopin_ops);
 
+const struct ptlrpc_bulk_frag_ops ptlrpc_bulk_kvec_ops = {
+       .add_iov_frag = ptlrpc_prep_bulk_frag,
+};
+EXPORT_SYMBOL(ptlrpc_bulk_kvec_ops);
+
 static int ptlrpc_send_new_req(struct ptlrpc_request *req);
 static int ptlrpcd_check_work(struct ptlrpc_request *req);
 static int ptlrpc_unregister_reply(struct ptlrpc_request *request, int async);
@@ -224,6 +229,7 @@ int ptlrpc_prep_bulk_frag(struct ptlrpc_bulk_desc *desc,
                          void *frag, int len)
 {
        struct kvec *iovec;
+       ENTRY;
 
        LASSERT(desc->bd_iov_count < desc->bd_max_iov);
        LASSERT(frag != NULL);
@@ -239,7 +245,7 @@ int ptlrpc_prep_bulk_frag(struct ptlrpc_bulk_desc *desc,
 
        desc->bd_iov_count++;
 
-       return desc->bd_nob;
+       RETURN(desc->bd_nob);
 }
 EXPORT_SYMBOL(ptlrpc_prep_bulk_frag);
 
index f8f38aa..575f4c8 100644 (file)
@@ -524,7 +524,8 @@ static const struct req_msg_field *mds_setattr_server[] = {
 
 static const struct req_msg_field *mds_update_client[] = {
        &RMF_PTLRPC_BODY,
-       &RMF_OUT_UPDATE,
+       &RMF_OUT_UPDATE_HEADER,
+       &RMF_OUT_UPDATE_BUF,
 };
 
 static const struct req_msg_field *mds_update_server[] = {
@@ -1189,6 +1190,16 @@ struct req_msg_field RMF_LFSCK_REPLY =
                    lustre_swab_lfsck_reply, NULL);
 EXPORT_SYMBOL(RMF_LFSCK_REPLY);
 
+struct req_msg_field RMF_OUT_UPDATE_HEADER = DEFINE_MSGF("out_update", 0,
+                               sizeof(struct out_update_header),
+                               lustre_swab_out_update_header, NULL);
+EXPORT_SYMBOL(RMF_OUT_UPDATE_HEADER);
+
+struct req_msg_field RMF_OUT_UPDATE_BUF = DEFINE_MSGF("update_buf",
+                       RMF_F_STRUCT_ARRAY, sizeof(struct out_update_buffer),
+                       lustre_swab_out_update_buffer, NULL);
+EXPORT_SYMBOL(RMF_OUT_UPDATE_BUF);
+
 /*
  * Request formats.
  */
@@ -1381,7 +1392,7 @@ struct req_format RQF_MDS_GET_INFO =
 EXPORT_SYMBOL(RQF_MDS_GET_INFO);
 
 struct req_format RQF_OUT_UPDATE =
-       DEFINE_REQ_FMT0("OUT_UPDATE_OBJ", mds_update_client,
+       DEFINE_REQ_FMT0("OUT_UPDATE", mds_update_client,
                        mds_update_server);
 EXPORT_SYMBOL(RQF_OUT_UPDATE);
 
index ccc15d5..f1f60c7 100644 (file)
@@ -2590,6 +2590,20 @@ void lustre_swab_object_update_reply(struct object_update_reply *our)
        }
 }
 
+void lustre_swab_out_update_header(struct out_update_header *ouh)
+{
+       __swab32s(&ouh->ouh_magic);
+       __swab32s(&ouh->ouh_count);
+}
+EXPORT_SYMBOL(lustre_swab_out_update_header);
+
+void lustre_swab_out_update_buffer(struct out_update_buffer *oub)
+{
+       __swab32s(&oub->oub_size);
+       __swab32s(&oub->oub_padding);
+}
+EXPORT_SYMBOL(lustre_swab_out_update_buffer);
+
 void lustre_swab_swap_layouts(struct mdc_swap_layouts *msl)
 {
        __swab64s(&msl->msl_flags);
index 9e63594..f2e65f5 100644 (file)
@@ -854,42 +854,39 @@ int out_handle(struct tgt_session_info *tsi)
        struct thandle_exec_args        *ta = &tti->tti_tea;
        struct req_capsule              *pill = tsi->tsi_pill;
        struct dt_device                *dt = tsi->tsi_tgt->lut_bottom;
-       struct object_update_request    *ureq;
+       struct out_update_header        *ouh;
+       struct out_update_buffer        *oub;
        struct object_update            *update;
        struct object_update_reply      *reply;
-       int                              bufsize;
-       int                              count;
-       int                              current_batchid = -1;
-       int                              i;
-       int                              rc = 0;
-       int                              rc1 = 0;
+       struct ptlrpc_bulk_desc         *desc;
+       struct l_wait_info              lwi;
+       void                            **update_bufs;
+       int                             current_batchid = -1;
+       __u32                           update_buf_count;
+       unsigned int                    i;
+       unsigned int                    reply_index = 0;
+       int                             rc = 0;
+       int                             rc1 = 0;
 
        ENTRY;
 
        req_capsule_set(pill, &RQF_OUT_UPDATE);
-       ureq = req_capsule_client_get(pill, &RMF_OUT_UPDATE);
-       if (ureq == NULL) {
+       ouh = req_capsule_client_get(pill, &RMF_OUT_UPDATE_HEADER);
+       if (ouh == NULL) {
                CERROR("%s: No buf!: rc = %d\n", tgt_name(tsi->tsi_tgt),
                       -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
-       bufsize = req_capsule_get_size(pill, &RMF_OUT_UPDATE, RCL_CLIENT);
-       if (bufsize != object_update_request_size(ureq)) {
-               CERROR("%s: invalid bufsize %d: rc = %d\n",
-                      tgt_name(tsi->tsi_tgt), bufsize, -EPROTO);
-               RETURN(err_serious(-EPROTO));
-       }
-
-       if (ureq->ourq_magic != UPDATE_REQUEST_MAGIC) {
+       if (ouh->ouh_magic != OUT_UPDATE_HEADER_MAGIC) {
                CERROR("%s: invalid update buffer magic %x expect %x: "
-                      "rc = %d\n", tgt_name(tsi->tsi_tgt), ureq->ourq_magic,
+                      "rc = %d\n", tgt_name(tsi->tsi_tgt), ouh->ouh_magic,
                       UPDATE_REQUEST_MAGIC, -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
-       count = ureq->ourq_count;
-       if (count <= 0) {
+       update_buf_count = ouh->ouh_count;
+       if (update_buf_count == 0) {
                CERROR("%s: empty update: rc = %d\n", tgt_name(tsi->tsi_tgt),
                       -EPROTO);
                RETURN(err_serious(-EPROTO));
@@ -904,103 +901,157 @@ int out_handle(struct tgt_session_info *tsi)
                RETURN(rc);
        }
 
+       OBD_ALLOC(update_bufs, sizeof(*update_bufs) * update_buf_count);
+       if (update_bufs == NULL)
+               RETURN(-ENOMEM);
+
+       oub = req_capsule_client_get(pill, &RMF_OUT_UPDATE_BUF);
+       desc = ptlrpc_prep_bulk_exp(pill->rc_req, update_buf_count,
+                                   PTLRPC_BULK_OPS_COUNT,
+                                   PTLRPC_BULK_GET_SINK |
+                                   PTLRPC_BULK_BUF_KVEC,
+                                   MDS_BULK_PORTAL, &ptlrpc_bulk_kvec_ops);
+       if (desc == NULL)
+               GOTO(out_free, rc = -ENOMEM);
+
+       /* NB Having prepped, we must commit... */
+       for (i = 0; i < update_buf_count; i++, oub++) {
+               OBD_ALLOC(update_bufs[i], oub->oub_size);
+               if (update_bufs[i] == NULL)
+                       GOTO(out_free, rc = -ENOMEM);
+
+               desc->bd_frag_ops->add_iov_frag(desc, update_bufs[i],
+                                               oub->oub_size);
+       }
+
+       pill->rc_req->rq_bulk_write = 1;
+       rc = sptlrpc_svc_prep_bulk(pill->rc_req, desc);
+       if (rc != 0)
+               GOTO(out_free, rc);
+
+       rc = target_bulk_io(pill->rc_req->rq_export, desc, &lwi);
+       if (rc < 0)
+               GOTO(out_free, rc);
+
        /* Prepare the update reply buffer */
        reply = req_capsule_server_get(pill, &RMF_OUT_UPDATE_REPLY);
        if (reply == NULL)
-               RETURN(err_serious(-EPROTO));
-       object_update_reply_init(reply, count);
+               GOTO(out_free, rc = err_serious(-EPROTO));
+       reply->ourp_magic = UPDATE_REPLY_MAGIC;
        tti->tti_u.update.tti_update_reply = reply;
        tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
 
        /* Walk through updates in the request to execute them synchronously */
-       for (i = 0; i < count; i++) {
+       for (i = 0; i < update_buf_count; i++) {
                struct tgt_handler      *h;
                struct dt_object        *dt_obj;
+               int                     update_count;
+               struct object_update_request *our;
+               int                     j;
 
-               update = object_update_request_get(ureq, i, NULL);
-               if (update == NULL)
-                       GOTO(out, rc = -EPROTO);
-
+               our = update_bufs[i];
                if (ptlrpc_req_need_swab(pill->rc_req))
-                       lustre_swab_object_update(update);
-
-               if (!fid_is_sane(&update->ou_fid)) {
-                       CERROR("%s: invalid FID "DFID": rc = %d\n",
-                              tgt_name(tsi->tsi_tgt), PFID(&update->ou_fid),
-                              -EPROTO);
-                       GOTO(out, rc = err_serious(-EPROTO));
-               }
+                       lustre_swab_object_update_request(our);
 
-               dt_obj = dt_locate(env, dt, &update->ou_fid);
-               if (IS_ERR(dt_obj))
-                       GOTO(out, rc = PTR_ERR(dt_obj));
-
-               if (dt->dd_record_fid_accessed) {
-                       lfsck_pack_rfa(&tti->tti_lr,
-                                      lu_object_fid(&dt_obj->do_lu),
-                                      LE_FID_ACCESSED,
-                                      LFSCK_TYPE_LAYOUT);
-                       tgt_lfsck_in_notify(env, dt, &tti->tti_lr, NULL);
+               if (our->ourq_magic != UPDATE_REQUEST_MAGIC) {
+                       CERROR("%s: invalid update buffer magic %x"
+                              " expect %x: rc = %d\n",
+                              tgt_name(tsi->tsi_tgt), our->ourq_magic,
+                              UPDATE_REQUEST_MAGIC, -EPROTO);
+                       GOTO(out, rc = -EPROTO);
                }
 
-               tti->tti_u.update.tti_dt_object = dt_obj;
-               tti->tti_u.update.tti_update = update;
-               tti->tti_u.update.tti_update_reply_index = i;
+               update_count = our->ourq_count;
+               reply->ourp_count += update_count;
+               for (j = 0; j < update_count; j++) {
+                       update = object_update_request_get(our, j, NULL);
+                       if (update == NULL)
+                               GOTO(out, rc = -EPROTO);
+
+                       if (ptlrpc_req_need_swab(pill->rc_req))
+                               lustre_swab_object_update(update);
+
+                       if (!fid_is_sane(&update->ou_fid)) {
+                               CERROR("%s: invalid FID "DFID": rc = %d\n",
+                                      tgt_name(tsi->tsi_tgt),
+                                      PFID(&update->ou_fid), -EPROTO);
+                               GOTO(out, rc = err_serious(-EPROTO));
+                       }
 
-               h = out_handler_find(update->ou_type);
-               if (unlikely(h == NULL)) {
-                       CERROR("%s: unsupported opc: 0x%x\n",
-                              tgt_name(tsi->tsi_tgt), update->ou_type);
-                       GOTO(next, rc = -ENOTSUPP);
-               }
+                       dt_obj = dt_locate(env, dt, &update->ou_fid);
+                       if (IS_ERR(dt_obj))
+                               GOTO(out, rc = PTR_ERR(dt_obj));
+
+                       if (dt->dd_record_fid_accessed) {
+                               lfsck_pack_rfa(&tti->tti_lr,
+                                              lu_object_fid(&dt_obj->do_lu),
+                                              LE_FID_ACCESSED,
+                                              LFSCK_TYPE_LAYOUT);
+                               tgt_lfsck_in_notify(env, dt, &tti->tti_lr,
+                                                   NULL);
+                       }
 
-               /* Check resend case only for modifying RPC */
-               if (h->th_flags & MUTABOR) {
-                       struct ptlrpc_request *req = tgt_ses_req(tsi);
+                       tti->tti_u.update.tti_dt_object = dt_obj;
+                       tti->tti_u.update.tti_update = update;
+                       tti->tti_u.update.tti_update_reply_index = reply_index;
 
-                       if (out_check_resent(env, dt, dt_obj, req,
-                                            out_reconstruct, reply, i))
-                               GOTO(next, rc = 0);
-               }
-
-               /* start transaction for modification RPC only */
-               if (h->th_flags & MUTABOR && current_batchid == -1) {
-                       current_batchid = update->ou_batchid;
-                       rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
-                       if (rc != 0)
-                               GOTO(next, rc);
+                       h = out_handler_find(update->ou_type);
+                       if (unlikely(h == NULL)) {
+                               CERROR("%s: unsupported opc: 0x%x\n",
+                                      tgt_name(tsi->tsi_tgt), update->ou_type);
+                               GOTO(next, rc = -ENOTSUPP);
+                       }
 
-                       if (update->ou_flags & UPDATE_FL_SYNC)
-                               ta->ta_handle->th_sync = 1;
-               }
+                       /* Check resend case only for modifying RPC */
+                       if (h->th_flags & MUTABOR) {
+                               struct ptlrpc_request *req = tgt_ses_req(tsi);
 
-               /* Stop the current update transaction, if the update has
-                * different batchid, or read-only update */
-               if (((current_batchid != update->ou_batchid) ||
-                    !(h->th_flags & MUTABOR)) && ta->ta_handle != NULL) {
-                       rc = out_tx_end(env, ta, rc);
-                       current_batchid = -1;
-                       if (rc != 0)
-                               GOTO(next, rc);
+                               if (out_check_resent(env, dt, dt_obj, req,
+                                                    out_reconstruct, reply,
+                                                    reply_index))
+                                       GOTO(next, rc = 0);
+                       }
 
-                       /* start a new transaction if needed */
-                       if (h->th_flags & MUTABOR) {
+                       /* start transaction for modification RPC only */
+                       if (h->th_flags & MUTABOR && current_batchid == -1) {
+                               current_batchid = update->ou_batchid;
                                rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
                                if (rc != 0)
                                        GOTO(next, rc);
 
                                if (update->ou_flags & UPDATE_FL_SYNC)
                                        ta->ta_handle->th_sync = 1;
+                       }
 
-                               current_batchid = update->ou_batchid;
+                       /* Stop the current update transaction, if the update
+                        * has different batchid, or read-only update */
+                       if (((current_batchid != update->ou_batchid) ||
+                            !(h->th_flags & MUTABOR)) &&
+                            ta->ta_handle != NULL) {
+                               rc = out_tx_end(env, ta, rc);
+                               current_batchid = -1;
+                               if (rc != 0)
+                                       GOTO(next, rc);
+
+                               /* start a new transaction if needed */
+                               if (h->th_flags & MUTABOR) {
+                                       rc = out_tx_start(env, dt, ta,
+                                                         tsi->tsi_exp);
+                                       if (rc != 0)
+                                               GOTO(next, rc);
+                                       if (update->ou_flags & UPDATE_FL_SYNC)
+                                               ta->ta_handle->th_sync = 1;
+                                       current_batchid = update->ou_batchid;
+                               }
                        }
-               }
 
-               rc = h->th_act(tsi);
+                       rc = h->th_act(tsi);
 next:
-               lu_object_put(env, &dt_obj->do_lu);
-               if (rc < 0)
-                       GOTO(out, rc);
+                       reply_index++;
+                       lu_object_put(env, &dt_obj->do_lu);
+                       if (rc < 0)
+                               GOTO(out, rc);
+               }
        }
 out:
        if (current_batchid != -1) {
@@ -1009,6 +1060,20 @@ out:
                        rc = rc1;
        }
 
+out_free:
+       oub = req_capsule_client_get(pill, &RMF_OUT_UPDATE_BUF);
+       if (update_bufs != NULL) {
+               for (i = 0; i < update_buf_count; i++, oub++) {
+                       if (update_bufs[i] != NULL)
+                               OBD_FREE(update_bufs[i], oub->oub_size);
+               }
+               OBD_FREE(update_bufs, sizeof(update_bufs[0]) *
+                                       update_buf_count);
+       }
+
+       if (desc != NULL)
+               ptlrpc_free_bulk(desc);
+
        RETURN(rc);
 }
 
index 0c2c95b..62e54d7 100644 (file)
@@ -38,9 +38,6 @@
 #include <obd_class.h>
 #include "tgt_internal.h"
 
-#define OUT_UPDATE_BUFFER_SIZE_ADD     4096
-#define OUT_UPDATE_BUFFER_SIZE_MAX     (256 * 4096)  /* 1MB update size now */
-
 const char *update_op_str(__u16 opc)
 {
        static const char *opc_str[] = {
@@ -77,21 +74,23 @@ EXPORT_SYMBOL(update_op_str);
  *
  * \params[in] env             execution environment
  * \params[in] update          object update to be filled
- * \params[in] max_update_size maximum object update size, if the current
- *                              update length equals or exceeds the size, it
- *                              will return -E2BIG.
+ * \params[in,out] max_update_size     maximum object update size, if the
+ *                                      current update length equals or
+ *                                      exceeds the size, it will return -E2BIG.
  * \params[in] update_op       update type
  * \params[in] fid             object FID of the update
- * \params[in] params_count    the count of the update parameters
- * \params[in] params_sizes    the length of each parameters
+ * \params[in] param_count     the count of the update parameters
+ * \params[in] param_sizes     the length of each parameters
  *
  * \retval                     0 if packing succeeds.
  * \retval                     -E2BIG if packing exceeds the maximum length.
  */
 int out_update_header_pack(const struct lu_env *env,
-                          struct object_update *update, size_t max_update_size,
-                          enum update_type update_op, const struct lu_fid *fid,
-                          unsigned int param_count, __u16 *params_sizes)
+                          struct object_update *update,
+                          size_t *max_update_size,
+                          enum update_type update_op,
+                          const struct lu_fid *fid,
+                          unsigned int param_count, __u16 *param_sizes)
 {
        struct object_update_param      *param;
        unsigned int                    i;
@@ -100,17 +99,19 @@ int out_update_header_pack(const struct lu_env *env,
        /* Check whether the packing exceeding the maxima update length */
        update_size = sizeof(*update);
        for (i = 0; i < param_count; i++)
-               update_size += cfs_size_round(sizeof(*param) + params_sizes[i]);
+               update_size += cfs_size_round(sizeof(*param) + param_sizes[i]);
 
-       if (unlikely(update_size >= max_update_size))
+       if (unlikely(update_size >= *max_update_size)) {
+               *max_update_size = update_size;
                return -E2BIG;
+       }
 
        update->ou_fid = *fid;
        update->ou_type = update_op;
        update->ou_params_count = param_count;
        param = &update->ou_params[0];
        for (i = 0; i < param_count; i++) {
-               param->oup_len = params_sizes[i];
+               param->oup_len = param_sizes[i];
                param = (struct object_update_param *)((char *)param +
                         object_update_param_size(param));
        }
@@ -134,7 +135,7 @@ int out_update_header_pack(const struct lu_env *env,
  * \retval             negative errno if updates packing fails
  **/
 int out_update_pack(const struct lu_env *env, struct object_update *update,
-                   size_t max_update_size, enum update_type op,
+                   size_t *max_update_size, enum update_type op,
                    const struct lu_fid *fid, unsigned int param_count,
                    __u16 *param_sizes, const void **param_bufs)
 {
@@ -175,7 +176,7 @@ EXPORT_SYMBOL(out_update_pack);
  * \retval             negative errno if insertion fails.
  */
 int out_create_pack(const struct lu_env *env, struct object_update *update,
-                   size_t max_update_size, const struct lu_fid *fid,
+                   size_t *max_update_size, const struct lu_fid *fid,
                    const struct lu_attr *attr, struct dt_allocation_hint *hint,
                    struct dt_object_format *dof)
 {
@@ -216,7 +217,7 @@ int out_create_pack(const struct lu_env *env, struct object_update *update,
 EXPORT_SYMBOL(out_create_pack);
 
 int out_ref_del_pack(const struct lu_env *env, struct object_update *update,
-                    size_t max_update_size, const struct lu_fid *fid)
+                    size_t *max_update_size, const struct lu_fid *fid)
 {
        return out_update_pack(env, update, max_update_size, OUT_REF_DEL, fid,
                               0, NULL, NULL);
@@ -224,7 +225,7 @@ int out_ref_del_pack(const struct lu_env *env, struct object_update *update,
 EXPORT_SYMBOL(out_ref_del_pack);
 
 int out_ref_add_pack(const struct lu_env *env, struct object_update *update,
-                    size_t max_update_size, const struct lu_fid *fid)
+                    size_t *max_update_size, const struct lu_fid *fid)
 {
        return out_update_pack(env, update, max_update_size, OUT_REF_ADD, fid,
                               0, NULL, NULL);
@@ -232,7 +233,7 @@ int out_ref_add_pack(const struct lu_env *env, struct object_update *update,
 EXPORT_SYMBOL(out_ref_add_pack);
 
 int out_attr_set_pack(const struct lu_env *env, struct object_update *update,
-                     size_t max_update_size, const struct lu_fid *fid,
+                     size_t *max_update_size, const struct lu_fid *fid,
                      const struct lu_attr *attr)
 {
        struct obdo             *obdo;
@@ -256,7 +257,7 @@ int out_attr_set_pack(const struct lu_env *env, struct object_update *update,
 EXPORT_SYMBOL(out_attr_set_pack);
 
 int out_xattr_set_pack(const struct lu_env *env, struct object_update *update,
-                      size_t max_update_size, const struct lu_fid *fid,
+                      size_t *max_update_size, const struct lu_fid *fid,
                       const struct lu_buf *buf, const char *name, __u32 flag)
 {
        __u16   sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)};
@@ -269,7 +270,7 @@ int out_xattr_set_pack(const struct lu_env *env, struct object_update *update,
 EXPORT_SYMBOL(out_xattr_set_pack);
 
 int out_xattr_del_pack(const struct lu_env *env, struct object_update *update,
-                      size_t max_update_size, const struct lu_fid *fid,
+                      size_t *max_update_size, const struct lu_fid *fid,
                       const char *name)
 {
        __u16   size = strlen(name) + 1;
@@ -279,10 +280,9 @@ int out_xattr_del_pack(const struct lu_env *env, struct object_update *update,
 }
 EXPORT_SYMBOL(out_xattr_del_pack);
 
-
 int out_index_insert_pack(const struct lu_env *env,
                          struct object_update *update,
-                         size_t max_update_size, const struct lu_fid *fid,
+                         size_t *max_update_size, const struct lu_fid *fid,
                          const struct dt_rec *rec, const struct dt_key *key)
 {
        struct dt_insert_rec       *rec1 = (struct dt_insert_rec *)rec;
@@ -304,7 +304,7 @@ EXPORT_SYMBOL(out_index_insert_pack);
 
 int out_index_delete_pack(const struct lu_env *env,
                          struct object_update *update,
-                         size_t max_update_size, const struct lu_fid *fid,
+                         size_t *max_update_size, const struct lu_fid *fid,
                          const struct dt_key *key)
 {
        __u16   size = strlen((char *)key) + 1;
@@ -317,7 +317,7 @@ EXPORT_SYMBOL(out_index_delete_pack);
 
 int out_object_destroy_pack(const struct lu_env *env,
                            struct object_update *update,
-                           size_t max_update_size, const struct lu_fid *fid)
+                           size_t *max_update_size, const struct lu_fid *fid)
 {
        return out_update_pack(env, update, max_update_size, OUT_DESTROY, fid,
                               0, NULL, NULL);
@@ -325,7 +325,7 @@ int out_object_destroy_pack(const struct lu_env *env,
 EXPORT_SYMBOL(out_object_destroy_pack);
 
 int out_write_pack(const struct lu_env *env, struct object_update *update,
-                  size_t max_update_size, const struct lu_fid *fid,
+                  size_t *max_update_size, const struct lu_fid *fid,
                   const struct lu_buf *buf, __u64 pos)
 {
        __u16           sizes[2] = {buf->lb_len, sizeof(pos)};
@@ -356,7 +356,7 @@ EXPORT_SYMBOL(out_write_pack);
  **/
 int out_index_lookup_pack(const struct lu_env *env,
                          struct object_update *update,
-                         size_t max_update_size, const struct lu_fid *fid,
+                         size_t *max_update_size, const struct lu_fid *fid,
                          struct dt_rec *rec, const struct dt_key *key)
 {
        const void      *name = key;
@@ -368,7 +368,7 @@ int out_index_lookup_pack(const struct lu_env *env,
 EXPORT_SYMBOL(out_index_lookup_pack);
 
 int out_attr_get_pack(const struct lu_env *env, struct object_update *update,
-                     size_t max_update_size, const struct lu_fid *fid)
+                     size_t *max_update_size, const struct lu_fid *fid)
 {
        return out_update_pack(env, update, max_update_size, OUT_ATTR_GET,
                               fid, 0, NULL, NULL);
@@ -376,7 +376,7 @@ int out_attr_get_pack(const struct lu_env *env, struct object_update *update,
 EXPORT_SYMBOL(out_attr_get_pack);
 
 int out_xattr_get_pack(const struct lu_env *env, struct object_update *update,
-                      size_t max_update_size, const struct lu_fid *fid,
+                      size_t *max_update_size, const struct lu_fid *fid,
                       const char *name)
 {
        __u16 size;
@@ -390,7 +390,7 @@ int out_xattr_get_pack(const struct lu_env *env, struct object_update *update,
 EXPORT_SYMBOL(out_xattr_get_pack);
 
 int out_read_pack(const struct lu_env *env, struct object_update *update,
-                 size_t max_update_length, const struct lu_fid *fid,
+                 size_t *max_update_size, const struct lu_fid *fid,
                  size_t size, loff_t pos)
 {
        __u16           sizes[2] = {sizeof(size), sizeof(pos)};
@@ -399,7 +399,7 @@ int out_read_pack(const struct lu_env *env, struct object_update *update,
        size = cpu_to_le64(size);
        pos = cpu_to_le64(pos);
 
-       return out_update_pack(env, update, max_update_length, OUT_READ, fid,
+       return out_update_pack(env, update, max_update_size, OUT_READ, fid,
                               ARRAY_SIZE(sizes), sizes, bufs);
 }
 EXPORT_SYMBOL(out_read_pack);
index d73c12b..df9c3e1 100755 (executable)
@@ -24,7 +24,8 @@ require_dsh_mds || exit 0
 # bug number for skipped tests:
 # b=17466/LU-472 : 61d
 # LU-5319 : 53a 53d
-ALWAYS_EXCEPT="61d 53a 53d $REPLAY_SINGLE_EXCEPT"
+# LU-6780 : 80d 80h 81d 81h 110e 110f 110g 111c 111d 111e 111f 111g 112
+ALWAYS_EXCEPT="61d 53a 53d  80d 80h 81d 81h 110e 110f 110g 111c 111d 111e 111f 111g 112 $REPLAY_SINGLE_EXCEPT"
 # UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT!
 
 case "$(lsb_release -sr)" in   # only disable tests for el7
index 16a6dce..02f41b9 100644 (file)
 #define lustre_swab_object_update_result NULL
 #define lustre_swab_object_update_reply NULL
 #define lustre_swab_object_update_request NULL
+#define lustre_swab_out_update_header NULL
+#define lustre_swab_out_update_buffer NULL
 
 #define dump_rniobuf NULL
 #define dump_ioo NULL