Whamcloud - gitweb
LU-3536 lod: record update for cross-MDT operation 39/10939/58
authorWang Di <di.wang@intel.com>
Mon, 11 Aug 2014 14:37:56 +0000 (07:37 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 1 Jun 2015 20:27:01 +0000 (20:27 +0000)
Packing updates for cross-MDT operation in the buffer, and
the maximum update record size is about 1M, which is enough
to create 2k stripes for now. To save the save, these update
records will use different packing format with OSP RPC,
see lustre/target/update_records.c

These updates will be stored in all of MDTs(in the later patch).
During the recovery, master MDT will redo the operation according
to these updates records.

Change-Id: Ic1919ab1c3d2eeca9ef027e2309c42201e3f7a74
Signed-off-by: Wang Di <di.wang@intel.com>
Reviewed-on: http://review.whamcloud.com/10939
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
17 files changed:
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_update.h
lustre/lod/lod_internal.h
lustre/lod/lod_object.c
lustre/lod/lod_sub_object.c
lustre/osp/osp_internal.h
lustre/osp/osp_md_object.c
lustre/osp/osp_object.c
lustre/osp/osp_trans.c
lustre/ptlrpc/Makefile.in
lustre/target/Makefile.am
lustre/target/out_handler.c
lustre/target/out_lib.c
lustre/target/tgt_internal.h
lustre/target/tgt_main.c
lustre/target/update_records.c [new file with mode: 0644]
lustre/target/update_trans.c

index 314dbd7..bf59646 100644 (file)
@@ -3192,6 +3192,7 @@ typedef enum {
        CHANGELOG_REC           = LLOG_OP_MAGIC | 0x60000,
        CHANGELOG_USER_REC      = LLOG_OP_MAGIC | 0x70000,
        HSM_AGENT_REC           = LLOG_OP_MAGIC | 0x80000,
+       UPDATE_REC              = LLOG_OP_MAGIC | 0xa0000,
        LLOG_HDR_MAGIC          = LLOG_OP_MAGIC | 0x45539,
        LLOG_LOGID_MAGIC        = LLOG_OP_MAGIC | 0x4553b,
 } llog_op_type;
@@ -3947,9 +3948,11 @@ extern void lustre_swab_hsm_request(struct hsm_request *hr);
  */
 
 /**
- * Type of each update
+ * Type of each update, if adding/deleting update, please also update
+ * update_opcode in lustre/target/out_lib.c.
  */
 enum update_type {
+       OUT_START               = 0,
        OUT_CREATE              = 1,
        OUT_DESTROY             = 2,
        OUT_REF_ADD             = 3,
@@ -4013,19 +4016,28 @@ void lustre_swab_object_update(struct object_update *ou);
 void lustre_swab_object_update_request(struct object_update_request *our);
 
 static inline size_t
-object_update_size(const struct object_update *update)
+object_update_params_size(const struct object_update *update)
 {
-       const struct    object_update_param *param;
-       size_t          size;
-       unsigned int    i;
+       const struct object_update_param *param;
+       size_t                           total_size = 0;
+       unsigned int                     i;
 
-       size = offsetof(struct object_update, ou_params[0]);
+       param = &update->ou_params[0];
        for (i = 0; i < update->ou_params_count; i++) {
-               param = (struct object_update_param *)((char *)update + size);
-               size += object_update_param_size(param);
+               size_t size = object_update_param_size(param);
+
+               param = (struct object_update_param *)((char *)param + size);
+               total_size += size;
        }
 
-       return size;
+       return total_size;
+}
+
+static inline size_t
+object_update_size(const struct object_update *update)
+{
+       return offsetof(struct object_update, ou_params[0]) +
+              object_update_params_size(update);
 }
 
 static inline struct object_update *
index 1ba3e34..996a9b3 100644 (file)
 
 struct dt_key;
 struct dt_rec;
+struct object_update_param;
 
 struct update_buffer {
        struct object_update_request    *ub_req;
        size_t                          ub_req_size;
 };
 
-#define TOP_THANDLE_MAGIC      0x20140917
-/* {top,sub}_thandle are used to manage distributed transactions which
- * include updates on several nodes. A top_handle represents the
- * whole operation, and sub_thandle represents updates on each node. */
-struct top_thandle {
-       struct thandle          tt_super;
-       __u32                   tt_magic;
-       /* The master sub transaction. */
-       struct thandle          *tt_master_sub_thandle;
-
-       /* Other sub thandle will be listed here. */
-       struct list_head        tt_sub_thandle_list;
-};
-
-struct sub_thandle {
-       /* point to the osd/osp_thandle */
-       struct thandle          *st_sub_th;
-       struct list_head        st_sub_list;
-};
-
 /**
  * Tracking the updates being executed on this dt_device.
  */
@@ -80,6 +61,165 @@ struct dt_update_request {
        struct list_head                dur_cb_items;
 };
 
+struct update_params {
+       struct object_update_param      up_params[0];
+};
+
+static inline size_t update_params_size(const struct update_params *params,
+                                       unsigned int param_count)
+{
+       struct object_update_param      *param;
+       size_t total_size = sizeof(*params);
+       unsigned int i;
+
+       param = (struct object_update_param *)&params->up_params[0];
+       for (i = 0; i < param_count; i++) {
+               size_t size = object_update_param_size(param);
+
+               param = (struct object_update_param *)((char *)param + size);
+               total_size += size;
+       }
+
+       return total_size;
+}
+
+static inline struct object_update_param *
+update_params_get_param(const struct update_params *params,
+                       unsigned int index, unsigned int param_count)
+{
+       struct object_update_param *param;
+       unsigned int            i;
+
+       if (index > param_count)
+               return NULL;
+
+       param = (struct object_update_param *)&params->up_params[0];
+       for (i = 0; i < index; i++)
+               param = (struct object_update_param *)((char *)param +
+                       object_update_param_size(param));
+
+       return param;
+}
+
+struct update_op {
+       struct lu_fid uop_fid;
+       __u16   uop_type;
+       __u16   uop_param_count;
+       __u16   uop_params_off[0];
+};
+
+static inline size_t
+update_op_size(unsigned int param_count)
+{
+       return offsetof(struct update_op, uop_params_off[param_count]);
+}
+
+static inline struct update_op *
+update_op_next_op(const struct update_op *uop)
+{
+       return (struct update_op *)((char *)uop +
+                               update_op_size(uop->uop_param_count));
+}
+
+/* All of updates in the mulitple_update_record */
+struct update_ops {
+       struct update_op        uops_op[0];
+};
+
+static inline size_t update_ops_size(const struct update_ops *ops,
+                                    unsigned int update_count)
+{
+       struct update_op *op;
+       size_t total_size = sizeof(*ops);
+       unsigned int i;
+
+       op = (struct update_op *)&ops->uops_op[0];
+       for (i = 0; i < update_count; i++, op = update_op_next_op(op))
+               total_size += update_op_size(op->uop_param_count);
+
+       return total_size;
+}
+
+/*
+ * This is the update record format used to store the updates in
+ * disk. All updates of the operation will be stored in ur_ops.
+ * All of parameters for updates of the operation will be stored
+ * in ur_params.
+ * To save the space of the record, parameters in ur_ops will only
+ * remember their offset in ur_params, so to avoid storing duplicate
+ * parameters in ur_params, which can help us save a lot space for
+ * operation like creating striped directory.
+ */
+struct update_records {
+       __u64                   ur_master_transno;
+       __u64                   ur_batchid;
+       __u32                   ur_flags;
+       __u32                   ur_param_count;
+       __u32                   ur_update_count;
+       struct update_ops       ur_ops;
+        /* Note ur_ops has a variable size, so comment out
+         * the following ur_params, in case some use it directly
+         * update_records->ur_params
+         *
+         * struct update_params        ur_params;
+         */
+};
+
+struct llog_update_record {
+       struct llog_rec_hdr     lur_hdr;
+       struct update_records   lur_update_rec;
+       /* Note ur_update_rec has a variable size, so comment out
+        * the following ur_tail, in case someone use it directly
+        *
+        * struct llog_rec_tail lur_tail;
+        */
+};
+
+static inline struct update_params *
+update_records_get_params(const struct update_records *record)
+{
+       return (struct update_params *)((char *)record +
+               offsetof(struct update_records, ur_ops) +
+               update_ops_size(&record->ur_ops, record->ur_update_count));
+}
+
+static inline size_t
+update_records_size(const struct update_records *record)
+{
+       struct update_params *params;
+
+       params = update_records_get_params(record);
+
+       return cfs_size_round(offsetof(struct update_records, ur_ops) +
+              update_ops_size(&record->ur_ops, record->ur_update_count) +
+              update_params_size(params, record->ur_param_count));
+}
+
+static inline size_t
+llog_update_record_size(const struct llog_update_record *lur)
+{
+       return cfs_size_round(sizeof(lur->lur_hdr) +
+                             update_records_size(&lur->lur_update_rec) +
+                             sizeof(struct llog_rec_tail));
+}
+
+static inline struct update_op *
+update_ops_get_op(const struct update_ops *ops, unsigned int index,
+                 unsigned int update_count)
+{
+       struct update_op *op;
+       unsigned int i;
+
+       if (index > update_count)
+               return NULL;
+
+       op = (struct update_op *)&ops->uops_op[0];
+       for (i = 0; i < index; i++)
+               op = update_op_next_op(op);
+
+       return op;
+}
+
 static inline void
 *object_update_param_get(const struct object_update *update, size_t index,
                         size_t *size)
@@ -176,53 +316,105 @@ object_update_result_data_get(const struct object_update_reply *reply,
        return 0;
 }
 
-static inline void update_inc_batchid(struct dt_update_request *update)
-{
-       update->dur_batchid++;
-}
+/**
+ * Attached in the thandle to record the updates for distribute
+ * distribution.
+ */
+struct thandle_update_records {
+       /* All of updates for the cross-MDT operation. */
+       struct llog_update_record       *tur_update_records;
+       size_t                          tur_update_records_buf_size;
+
+       /* All of parameters for the cross-MDT operation */
+       struct update_params    *tur_update_params;
+       unsigned int            tur_update_param_count;
+       size_t                  tur_update_params_buf_size;
+};
+
+#define TOP_THANDLE_MAGIC      0x20140917
+/* {top,sub}_thandle are used to manage distributed transactions which
+ * include updates on several nodes. A top_handle represents the
+ * whole operation, and sub_thandle represents updates on each node. */
+struct top_thandle {
+       struct thandle          tt_super;
+       __u32                   tt_magic;
+       atomic_t                tt_refcount;
+       /* The master sub transaction. */
+       struct thandle          *tt_master_sub_thandle;
+
+       /* Other sub thandle will be listed here. */
+       struct list_head        tt_sub_thandle_list;
+
+       /* All of update records will packed here */
+       struct thandle_update_records *tt_update_records;
+};
+
+struct sub_thandle {
+       /* point to the osd/osp_thandle */
+       struct thandle          *st_sub_th;
+
+       /* linked to top_thandle */
+       struct list_head        st_sub_list;
+
+       /* If this sub thandle is committed */
+       bool                    st_committed:1,
+                               st_record_update:1;
+};
+
 
 /* target/out_lib.c */
-int out_update_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                   enum update_type op, const struct lu_fid *fid,
-                   int params_count, __u16 *param_sizes, const void **bufs,
-                   __u64 batchid);
-int out_create_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                   const struct lu_fid *fid, struct lu_attr *attr,
-                   struct dt_allocation_hint *hint,
-                   struct dt_object_format *dof, __u64 batchid);
+int out_update_pack(const struct lu_env *env, struct object_update *update,
+                   size_t max_update_size, enum update_type op,
+                   const struct lu_fid *fid, unsigned int params_count,
+                   __u16 *param_sizes, const void **param_bufs);
+int out_create_pack(const struct lu_env *env, struct object_update *update,
+                   size_t max_update_size, const struct lu_fid *fid,
+                   const struct lu_attr *attr, struct dt_allocation_hint *hint,
+                   struct dt_object_format *dof);
 int out_object_destroy_pack(const struct lu_env *env,
-                           struct update_buffer *ubuf,
-                           const struct lu_fid *fid, __u64 batchid);
-int out_index_delete_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                         const struct lu_fid *fid, const struct dt_key *key,
-                         __u64 batchid);
-int out_index_insert_pack(const struct lu_env *env, struct update_buffer *ubuf,
+                           struct object_update *update,
+                           size_t max_update_size,
+                           const struct lu_fid *fid);
+int out_index_delete_pack(const struct lu_env *env,
+                         struct object_update *update, size_t max_update_size,
+                         const struct lu_fid *fid, const struct dt_key *key);
+int out_index_insert_pack(const struct lu_env *env,
+                         struct object_update *update, size_t max_update_size,
                          const struct lu_fid *fid, const struct dt_rec *rec,
-                         const struct dt_key *key, __u64 batchid);
-int out_xattr_set_pack(const struct lu_env *env, struct update_buffer *ubuf,
+                         const struct dt_key *key);
+int out_xattr_set_pack(const struct lu_env *env,
+                      struct object_update *update, size_t max_update_size,
                       const struct lu_fid *fid, const struct lu_buf *buf,
-                      const char *name, int flag, __u64 batchid);
-int out_xattr_del_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                      const struct lu_fid *fid, const char *name,
-                      __u64 batchid);
-int out_attr_set_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                     const struct lu_fid *fid, const struct lu_attr *attr,
-                     __u64 batchid);
-int out_ref_add_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                    const struct lu_fid *fid, __u64 batchid);
-int out_ref_del_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                    const struct lu_fid *fid, __u64 batchid);
-int out_write_pack(const struct lu_env *env, struct update_buffer *ubuf,
+                      const char *name, __u32 flag);
+int out_xattr_del_pack(const struct lu_env *env,
+                      struct object_update *update, size_t max_update_size,
+                      const struct lu_fid *fid, const char *name);
+int out_attr_set_pack(const struct lu_env *env,
+                     struct object_update *update, size_t max_update_size,
+                     const struct lu_fid *fid, const struct lu_attr *attr);
+int out_ref_add_pack(const struct lu_env *env,
+                    struct object_update *update, size_t max_update_size,
+                    const struct lu_fid *fid);
+int out_ref_del_pack(const struct lu_env *env,
+                    struct object_update *update, size_t max_update_size,
+                    const struct lu_fid *fid);
+int out_write_pack(const struct lu_env *env,
+                  struct object_update *update, size_t max_update_size,
                   const struct lu_fid *fid, const struct lu_buf *buf,
-                  loff_t pos, __u64 batchid);
-int out_attr_get_pack(const struct lu_env *env, struct update_buffer *ubuf,
+                  __u64 pos);
+int out_attr_get_pack(const struct lu_env *env,
+                     struct object_update *update, size_t max_update_size,
                      const struct lu_fid *fid);
-int out_index_lookup_pack(const struct lu_env *env, struct update_buffer *ubuf,
+int out_index_lookup_pack(const struct lu_env *env,
+                         struct object_update *update, size_t max_update_size,
                          const struct lu_fid *fid, struct dt_rec *rec,
                          const struct dt_key *key);
-int out_xattr_get_pack(const struct lu_env *env, struct update_buffer *ubuf,
+int out_xattr_get_pack(const struct lu_env *env,
+                      struct object_update *update, size_t max_update_size,
                       const struct lu_fid *fid, const char *name);
 
+const char *update_op_str(__u16 opcode);
+
 /* target/update_trans.c */
 struct thandle *thandle_get_sub_by_dt(const struct lu_env *env,
                                      struct thandle *th,
@@ -245,4 +437,147 @@ int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
                   struct thandle *th);
 
 void top_thandle_destroy(struct top_thandle *top_th);
+
+/* update_records.c */
+int update_records_create_pack(const struct lu_env *env,
+                              struct update_ops *ops,
+                              unsigned int *op_count,
+                              size_t *max_ops_size,
+                              struct update_params *params,
+                              unsigned int *param_count,
+                              size_t *max_param_size,
+                              const struct lu_fid *fid,
+                              const struct lu_attr *attr,
+                              const struct dt_allocation_hint *hint,
+                              struct dt_object_format *dof);
+int update_records_attr_set_pack(const struct lu_env *env,
+                                struct update_ops *ops,
+                                unsigned int *op_count,
+                                size_t *max_ops_size,
+                                struct update_params *params,
+                                unsigned int *param_count,
+                                size_t *max_param_size,
+                                const struct lu_fid *fid,
+                                const struct lu_attr *attr);
+int update_records_ref_add_pack(const struct lu_env *env,
+                               struct update_ops *ops,
+                               unsigned int *op_count,
+                               size_t *max_ops_size,
+                               struct update_params *params,
+                               unsigned int *param_count,
+                               size_t *max_param_size,
+                               const struct lu_fid *fid);
+int update_records_ref_del_pack(const struct lu_env *env,
+                               struct update_ops *ops,
+                               unsigned int *op_count,
+                               size_t *max_ops_size,
+                               struct update_params *params,
+                               unsigned int *param_count,
+                               size_t *max_param_size,
+                               const struct lu_fid *fid);
+int update_records_object_destroy_pack(const struct lu_env *env,
+                                      struct update_ops *ops,
+                                      unsigned int *op_count,
+                                      size_t *max_ops_size,
+                                      struct update_params *params,
+                                      unsigned int *param_count,
+                                      size_t *max_param_size,
+                                      const struct lu_fid *fid);
+int update_records_index_insert_pack(const struct lu_env *env,
+                                    struct update_ops *ops,
+                                    unsigned int *op_count,
+                                    size_t *max_ops_size,
+                                    struct update_params *params,
+                                    unsigned int *param_count,
+                                    size_t *max_param_size,
+                                    const struct lu_fid *fid,
+                                    const struct dt_rec *rec,
+                                    const struct dt_key *key);
+int update_records_index_delete_pack(const struct lu_env *env,
+                                    struct update_ops *ops,
+                                    unsigned int *op_count,
+                                    size_t *max_ops_size,
+                                    struct update_params *params,
+                                    unsigned int *param_count,
+                                    size_t *max_param_size,
+                                    const struct lu_fid *fid,
+                                    const struct dt_key *key);
+int update_records_xattr_set_pack(const struct lu_env *env,
+                                 struct update_ops *ops,
+                                 unsigned int *op_count,
+                                 size_t *max_ops_size,
+                                 struct update_params *params,
+                                 unsigned int *param_count,
+                                 size_t *max_param_size,
+                                 const struct lu_fid *fid,
+                                 const struct lu_buf *buf, const char *name,
+                                 __u32 flag);
+int update_records_xattr_del_pack(const struct lu_env *env,
+                                 struct update_ops *ops,
+                                 unsigned int *op_count,
+                                 size_t *max_ops_size,
+                                 struct update_params *params,
+                                 unsigned int *param_count,
+                                 size_t *max_param_size,
+                                 const struct lu_fid *fid,
+                                 const char *name);
+int update_records_write_pack(const struct lu_env *env,
+                             struct update_ops *ops,
+                             unsigned int *op_count,
+                             size_t *max_ops_size,
+                             struct update_params *params,
+                             unsigned int *param_count,
+                             size_t *max_param_size,
+                             const struct lu_fid *fid,
+                             const struct lu_buf *buf,
+                             __u64 pos);
+
+int tur_update_records_extend(struct thandle_update_records *tur,
+                             size_t new_size);
+int tur_update_params_extend(struct thandle_update_records *tur,
+                            size_t new_size);
+int check_and_prepare_update_record(const struct lu_env *env,
+                                   struct thandle *th);
+int merge_params_updates_buf(const struct lu_env *env,
+                            struct thandle_update_records *tur);
+int tur_update_extend(struct thandle_update_records *tur,
+                     size_t new_op_size, size_t new_param_size);
+
+#define update_record_pack(name, th, ...)                              \
+({                                                                     \
+       struct top_thandle *top_th;                                     \
+       struct thandle_update_records *tur;                             \
+       struct llog_update_record     *lur;                             \
+       size_t          avail_param_size;                               \
+       size_t          avail_op_size;                                  \
+       int             ret;                                            \
+                                                                       \
+       while (1) {                                                     \
+               top_th = container_of(th, struct top_thandle, tt_super);\
+               tur = top_th->tt_update_records;                        \
+               lur = tur->tur_update_records;                          \
+               avail_param_size = tur->tur_update_params_buf_size -    \
+                            update_params_size(tur->tur_update_params, \
+                                       tur->tur_update_param_count);   \
+               avail_op_size = tur->tur_update_records_buf_size -      \
+                               llog_update_record_size(lur);           \
+               ret = update_records_##name##_pack(env,                 \
+                                         &lur->lur_update_rec.ur_ops,  \
+                                 &lur->lur_update_rec.ur_update_count, \
+                                 &avail_op_size,                       \
+                                 tur->tur_update_params,               \
+                                 &tur->tur_update_param_count,         \
+                                 &avail_param_size, __VA_ARGS__);      \
+               if (ret == -E2BIG) {                                    \
+                       ret = tur_update_extend(tur, avail_op_size,     \
+                                                  avail_param_size);   \
+                       if (ret != 0)                                   \
+                               break;                                  \
+                       continue;                                       \
+               } else {                                                \
+                       break;                                          \
+               }                                                       \
+       }                                                               \
+       ret;                                                            \
+})
 #endif
index 2830b83..b74f714 100644 (file)
@@ -485,7 +485,8 @@ void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo);
 /* lod_sub_object.c */
 struct thandle *lod_sub_get_thandle(const struct lu_env *env,
                                    struct thandle *th,
-                                   const struct dt_object *sub_obj);
+                                   const struct dt_object *sub_obj,
+                                   bool *record_update);
 int lod_sub_object_declare_create(const struct lu_env *env,
                                  struct dt_object *dt,
                                  struct lu_attr *attr,
index 642cda8..61dbede 100644 (file)
@@ -1622,19 +1622,18 @@ out:
 }
 
 /**
- * Create a striped directory.
+ * Declare create a striped directory.
  *
- * Create a striped directory with a given stripe pattern on the specified MDTs.
- * A striped directory is represented as a regular directory - an index listing
- * all the stripes. The stripes point back to the master object with ".." and
- * LinkEA. The master object gets LMV EA which identifies it as a striped
- * directory. The function allocates FIDs for all the stripes.
+ * Declare creating a striped directory with a given stripe pattern on the
+ * specified MDTs. A striped directory is represented as a regular directory
+ * - an index listing all the stripes. The stripes point back to the master
+ * object with ".." and LinkEA. The master object gets LMV EA which
+ * identifies it as a striped directory. The function allocates FIDs
+ * for all stripes.
  *
  * \param[in] env      execution environment
  * \param[in] dt       object
  * \param[in] attr     attributes to initialize the objects with
- * \param[in] lum      a pattern specifying the number of stripes and
- *                     MDT to start from
  * \param[in] dof      type of objects to be created
  * \param[in] th       transaction handle
  *
@@ -2143,7 +2142,8 @@ static void lod_lov_stripe_cache_clear(struct lod_object *lo)
 static int lod_xattr_set_internal(const struct lu_env *env,
                                  struct dt_object *dt,
                                  const struct lu_buf *buf,
-                                 const char *name, int fl, struct thandle *th)
+                                 const char *name, int fl,
+                                 struct thandle *th)
 {
        struct dt_object        *next = dt_object_child(dt);
        struct lod_object       *lo = lod_dt_obj(dt);
index cce804f..17b9f07 100644 (file)
@@ -55,7 +55,8 @@
 
 struct thandle *lod_sub_get_thandle(const struct lu_env *env,
                                    struct thandle *th,
-                                   const struct dt_object *sub_obj)
+                                   const struct dt_object *sub_obj,
+                                   bool *record_update)
 {
        struct lod_device       *lod = dt2lod_dev(th->th_dev);
        struct top_thandle      *tth;
@@ -65,6 +66,9 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env,
        int                     rc;
        ENTRY;
 
+       if (record_update != NULL)
+               *record_update = false;
+
        if (th->th_top == NULL)
                RETURN(th);
 
@@ -74,8 +78,15 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env,
         * creation, FID is not assigned until osp_object_create(),
         * so if the FID of sub_obj is zero, it means OST object. */
        if (!dt_object_remote(sub_obj) ||
-           fid_is_zero(lu_object_fid(&sub_obj->do_lu)))
+           fid_is_zero(lu_object_fid(&sub_obj->do_lu))) {
+               /* local MDT object */
+               if (fid_is_sane(lu_object_fid(&sub_obj->do_lu)) &&
+                   tth->tt_update_records != NULL &&
+                   record_update != NULL)
+                       *record_update = true;
+
                RETURN(tth->tt_master_sub_thandle);
+       }
 
        rc = lod_fld_lookup(env, lod, lu_object_fid(&sub_obj->do_lu),
                            &mdt_index, &type);
@@ -85,6 +96,9 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env,
        if (type == LU_SEQ_RANGE_OST)
                RETURN(tth->tt_master_sub_thandle);
 
+       if (tth->tt_update_records != NULL && record_update != NULL)
+               *record_update = true;
+
        sub_th = thandle_get_sub(env, th, sub_obj);
 
        RETURN(sub_th);
@@ -114,7 +128,7 @@ int lod_sub_object_declare_create(const struct lu_env *env,
 {
        struct thandle *sub_th;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
        if (IS_ERR(sub_th))
                return PTR_ERR(sub_th);
 
@@ -144,13 +158,22 @@ int lod_sub_object_create(const struct lu_env *env, struct dt_object *dt,
                          struct thandle *th)
 {
        struct thandle     *sub_th;
+       bool               record_update;
        int                 rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update) {
+               rc = update_record_pack(create, th,
+                                       lu_object_fid(&dt->do_lu),
+                                       attr, hint, dof);
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
        rc = dt_create(env, dt, attr, hint, dof, sub_th);
 
        RETURN(rc);
@@ -176,7 +199,7 @@ int lod_sub_object_declare_ref_add(const struct lu_env *env,
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
@@ -202,13 +225,21 @@ int lod_sub_object_ref_add(const struct lu_env *env, struct dt_object *dt,
                           struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update) {
+               rc = update_record_pack(ref_add, th,
+                                       lu_object_fid(&dt->do_lu));
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
        rc = dt_ref_add(env, dt, sub_th);
 
        RETURN(rc);
@@ -234,7 +265,7 @@ int lod_sub_object_declare_ref_del(const struct lu_env *env,
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
@@ -260,13 +291,21 @@ int lod_sub_object_ref_del(const struct lu_env *env, struct dt_object *dt,
                           struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update) {
+               rc = update_record_pack(ref_del, th,
+                                       lu_object_fid(&dt->do_lu));
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
        rc = dt_ref_del(env, dt, sub_th);
 
        RETURN(rc);
@@ -292,7 +331,7 @@ int lod_sub_object_declare_destroy(const struct lu_env *env,
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
@@ -318,13 +357,21 @@ int lod_sub_object_destroy(const struct lu_env *env, struct dt_object *dt,
                           struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update) {
+               rc = update_record_pack(object_destroy, th,
+                                       lu_object_fid(&dt->do_lu));
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
        rc = dt_destroy(env, dt, sub_th);
 
        RETURN(rc);
@@ -352,7 +399,7 @@ int lod_sub_object_declare_insert(const struct lu_env *env,
 {
        struct thandle *sub_th;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
        if (IS_ERR(sub_th))
                return PTR_ERR(sub_th);
 
@@ -381,11 +428,20 @@ int lod_sub_object_index_insert(const struct lu_env *env, struct dt_object *dt,
                                int ign)
 {
        struct thandle *sub_th;
+       int             rc;
+       bool            record_update;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                return PTR_ERR(sub_th);
 
+       if (record_update) {
+               rc = update_record_pack(index_insert, th,
+                                       lu_object_fid(&dt->do_lu), rec, key);
+               if (rc < 0)
+                       return rc;
+       }
+
        return dt_insert(env, dt, rec, key, sub_th, ign);
 }
 
@@ -409,7 +465,7 @@ int lod_sub_object_declare_delete(const struct lu_env *env,
 {
        struct thandle *sub_th;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
        if (IS_ERR(sub_th))
                return PTR_ERR(sub_th);
 
@@ -434,13 +490,21 @@ int lod_sub_object_delete(const struct lu_env *env, struct dt_object *dt,
                          const struct dt_key *name, struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update) {
+               rc = update_record_pack(index_delete, th,
+                                       lu_object_fid(&dt->do_lu), name);
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
        rc = dt_delete(env, dt, name, sub_th);
        RETURN(rc);
 }
@@ -469,7 +533,7 @@ int lod_sub_object_declare_xattr_set(const struct lu_env *env,
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
@@ -499,13 +563,22 @@ int lod_sub_object_xattr_set(const struct lu_env *env, struct dt_object *dt,
                             struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update) {
+               rc = update_record_pack(xattr_set, th,
+                                       lu_object_fid(&dt->do_lu),
+                                       buf, name, fl);
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
        rc = dt_xattr_set(env, dt, buf, name, fl, sub_th);
 
        RETURN(rc);
@@ -533,7 +606,7 @@ int lod_sub_object_declare_attr_set(const struct lu_env *env,
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
@@ -561,14 +634,22 @@ int lod_sub_object_attr_set(const struct lu_env *env,
                            const struct lu_attr *attr,
                            struct thandle *th)
 {
+       bool               record_update;
        struct thandle     *sub_th;
        int                 rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update) {
+               rc = update_record_pack(attr_set, th, lu_object_fid(&dt->do_lu),
+                                       attr);
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
        rc = dt_attr_set(env, dt, attr, sub_th);
 
        RETURN(rc);
@@ -596,7 +677,7 @@ int lod_sub_object_declare_xattr_del(const struct lu_env *env,
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
@@ -625,13 +706,21 @@ int lod_sub_object_xattr_del(const struct lu_env *env,
                             struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update) {
+               rc = update_record_pack(xattr_del, th,
+                                       lu_object_fid(&dt->do_lu), name);
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
        rc = dt_xattr_del(env, dt, name, sub_th);
 
        RETURN(rc);
@@ -660,7 +749,7 @@ int lod_sub_object_declare_write(const struct lu_env *env,
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
@@ -690,13 +779,21 @@ ssize_t lod_sub_object_write(const struct lu_env *env, struct dt_object *dt,
                             struct thandle *th, int rq)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        ssize_t         rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update) {
+               rc = update_record_pack(write, th, lu_object_fid(&dt->do_lu),
+                                       buf, *pos);
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
        rc = dt_write(env, dt, buf, pos, sub_th, rq);
        RETURN(rc);
 }
index fd2d41d..1d6db3c 100644 (file)
@@ -535,6 +535,48 @@ static inline int osp_is_fid_client(struct osp_device *osp)
        return imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_FID;
 }
 
+struct object_update *
+update_buffer_get_update(struct object_update_request *request,
+                        unsigned int index);
+
+int osp_extend_update_buffer(const struct lu_env *env,
+                            struct update_buffer *ubuf);
+
+#define osp_update_rpc_pack(env, name, update, op, ...)                \
+({                                                             \
+       struct object_update    *object_update;                 \
+       size_t                  max_update_length;              \
+       struct object_update_request *ureq;                     \
+       int                     ret;                            \
+                                                               \
+       while (1) {                                                     \
+               ureq = update->dur_buf.ub_req;                          \
+               max_update_length = update->dur_buf.ub_req_size -       \
+                                   object_update_request_size(ureq);   \
+                                                                       \
+               object_update = update_buffer_get_update(ureq,          \
+                                                        ureq->ourq_count);    \
+               ret = out_##name##_pack(env, object_update, max_update_length, \
+                                      __VA_ARGS__);                    \
+               if (ret == -E2BIG) {                                    \
+                       int rc1;                                        \
+                       /* extend the buffer and retry */               \
+                       rc1 = osp_extend_update_buffer(env, &update->dur_buf); \
+                       if (rc1 != 0) {                                 \
+                               ret = rc1;                              \
+                               break;                                  \
+                       }                                               \
+               } else {                                                \
+                       if (ret == 0) {                                 \
+                               object_update->ou_flags |= update->dur_flags; \
+                               ureq->ourq_count++;                     \
+                       }                                               \
+                       break;                                          \
+               }                                                       \
+       }                                                               \
+       ret;                                                            \
+})
+
 typedef int (*osp_update_interpreter_t)(const struct lu_env *env,
                                        struct object_update_reply *rep,
                                        struct ptlrpc_request *req,
index fa86222..47ec90a 100644 (file)
@@ -56,8 +56,8 @@
 #include <lustre_log.h>
 #include "osp_internal.h"
 
-static const char dot[] = ".";
-static const char dotdot[] = "..";
+#define OUT_UPDATE_BUFFER_SIZE_ADD     4096
+#define OUT_UPDATE_BUFFER_SIZE_MAX     (256 * 4096)  /*  1M update size now */
 
 /**
  * Interpreter call for object creation
@@ -115,6 +115,47 @@ int osp_md_declare_object_create(const struct lu_env *env,
        return osp_trans_update_request_create(th);
 }
 
+struct object_update *
+update_buffer_get_update(struct object_update_request *request,
+                        unsigned int index)
+{
+       void    *ptr;
+       int     i;
+
+       if (index > request->ourq_count)
+               return NULL;
+
+       ptr = &request->ourq_updates[0];
+       for (i = 0; i < index; i++)
+               ptr += object_update_size(ptr);
+
+       return ptr;
+}
+
+int osp_extend_update_buffer(const struct lu_env *env,
+                            struct update_buffer *ubuf)
+{
+       struct object_update_request *ureq;
+       size_t  new_size = ubuf->ub_req_size + OUT_UPDATE_BUFFER_SIZE_ADD;
+
+       /* enlarge object update request size */
+       if (new_size > OUT_UPDATE_BUFFER_SIZE_MAX)
+               return -E2BIG;
+
+       OBD_ALLOC_LARGE(ureq, new_size);
+       if (ureq == NULL)
+               return -ENOMEM;
+
+       memcpy(ureq, ubuf->ub_req, ubuf->ub_req_size);
+
+       OBD_FREE_LARGE(ubuf->ub_req, ubuf->ub_req_size);
+
+       ubuf->ub_req = ureq;
+       ubuf->ub_req_size = new_size;
+
+       return 0;
+}
+
 /**
  * Implementation of dt_object_operations::do_create
  *
@@ -141,13 +182,12 @@ int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
        update = thandle_to_dt_update_request(th);
        LASSERT(update != NULL);
 
-       rc = out_create_pack(env, &update->dur_buf,
-                            lu_object_fid(&dt->do_lu), attr, hint, dof,
-                            update->dur_batchid);
+       rc = osp_update_rpc_pack(env, create, update, OUT_CREATE,
+                                lu_object_fid(&dt->do_lu), attr, hint, dof);
        if (rc != 0)
                GOTO(out, rc);
 
-       rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), attr,
+       rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), NULL,
                                        osp_object_create_interpreter);
 
        if (rc < 0)
@@ -200,9 +240,8 @@ static int osp_md_ref_del(const struct lu_env *env, struct dt_object *dt,
        update = thandle_to_dt_update_request(th);
        LASSERT(update != NULL);
 
-       rc = out_ref_del_pack(env, &update->dur_buf,
-                             lu_object_fid(&dt->do_lu),
-                             update->dur_batchid);
+       rc = osp_update_rpc_pack(env, ref_del, update, OUT_REF_DEL,
+                                lu_object_fid(&dt->do_lu));
        return rc;
 }
 
@@ -247,9 +286,8 @@ static int osp_md_ref_add(const struct lu_env *env, struct dt_object *dt,
        update = thandle_to_dt_update_request(th);
        LASSERT(update != NULL);
 
-       rc = out_ref_add_pack(env, &update->dur_buf,
-                             lu_object_fid(&dt->do_lu),
-                             update->dur_batchid);
+       rc = osp_update_rpc_pack(env, ref_add, update, OUT_REF_ADD,
+                                lu_object_fid(&dt->do_lu));
        return rc;
 }
 
@@ -324,10 +362,8 @@ int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
        update = thandle_to_dt_update_request(th);
        LASSERT(update != NULL);
 
-       rc = out_attr_set_pack(env, &update->dur_buf,
-                              lu_object_fid(&dt->do_lu), attr,
-                              update->dur_batchid);
-
+       rc = osp_update_rpc_pack(env, attr_set, update, OUT_ATTR_SET,
+                                lu_object_fid(&dt->do_lu), attr);
        return rc;
 }
 
@@ -459,8 +495,8 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
        if (IS_ERR(update))
                RETURN(PTR_ERR(update));
 
-       rc = out_index_lookup_pack(env, &update->dur_buf,
-                                  lu_object_fid(&dt->do_lu), rec, key);
+       rc = osp_update_rpc_pack(env, index_lookup, update, OUT_INDEX_LOOKUP,
+                                lu_object_fid(&dt->do_lu), rec, key);
        if (rc != 0) {
                CERROR("%s: Insert update error: rc = %d\n",
                       dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
@@ -567,11 +603,8 @@ static int osp_md_index_insert(const struct lu_env *env,
        struct dt_update_request *update = oth->ot_dur;
        int                      rc;
 
-
-       rc = out_index_insert_pack(env, &update->dur_buf,
-                                  lu_object_fid(&dt->do_lu), rec, key,
-                                  update->dur_batchid);
-
+       rc = osp_update_rpc_pack(env, index_insert, update, OUT_INDEX_INSERT,
+                                lu_object_fid(&dt->do_lu), rec, key);
        return rc;
 }
 
@@ -622,9 +655,9 @@ static int osp_md_index_delete(const struct lu_env *env,
        update = thandle_to_dt_update_request(th);
        LASSERT(update != NULL);
 
-       rc = out_index_delete_pack(env, &update->dur_buf,
-                                  lu_object_fid(&dt->do_lu), key,
-                                  update->dur_batchid);
+       rc = osp_update_rpc_pack(env, index_delete, update, OUT_INDEX_DELETE,
+                                lu_object_fid(&dt->do_lu), key);
+
        return rc;
 }
 
@@ -919,6 +952,35 @@ int osp_md_declare_object_destroy(const struct lu_env *env,
 }
 
 /**
+ * Interpreter call for object destroy
+ *
+ * Object destroy interpreter, which will be called after destroying
+ * the remote object to set flags and status.
+ *
+ * \param[in] env      execution environment
+ * \param[in] reply    update reply
+ * \param[in] req      ptlrpc update request for destroying object
+ * \param[in] obj      object to be destroyed
+ * \param[in] data     data used in this function.
+ * \param[in] index    index(position) of destroy update in the whole
+ *                      updates
+ * \param[in] rc       update result on the remote MDT.
+ *
+ * \retval             only return 0 for now
+ */
+static int osp_md_object_destroy_interpreter(const struct lu_env *env,
+                                            struct object_update_reply *reply,
+                                            struct ptlrpc_request *req,
+                                            struct osp_object *obj,
+                                            void *data, int index, int rc)
+{
+       /* not needed in cache any more */
+       set_bit(LU_OBJECT_HEARD_BANSHEE,
+               &obj->opo_obj.do_lu.lo_header->loh_flags);
+       return 0;
+}
+
+/**
  * Implement OSP layer dt_object_operations::do_destroy() interface.
  *
  * Pack the destroy update into the RPC buffer, which will be sent
@@ -948,14 +1010,13 @@ int osp_md_object_destroy(const struct lu_env *env, struct dt_object *dt,
        update = thandle_to_dt_update_request(th);
        LASSERT(update != NULL);
 
-       rc = out_object_destroy_pack(env, &update->dur_buf,
-                      lu_object_fid(&dt->do_lu), update->dur_batchid);
+       rc = osp_update_rpc_pack(env, object_destroy, update, OUT_DESTROY,
+                                lu_object_fid(&dt->do_lu));
        if (rc != 0)
                RETURN(rc);
 
-       /* not needed in cache any more */
-       set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
-
+       rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), NULL,
+                                       osp_md_object_destroy_interpreter);
        RETURN(rc);
 }
 
@@ -1036,8 +1097,8 @@ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
        update = thandle_to_dt_update_request(th);
        LASSERT(update != NULL);
 
-       rc = out_write_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu),
-                           buf, *pos, update->dur_batchid);
+       rc = osp_update_rpc_pack(env, write, update, OUT_WRITE,
+                                lu_object_fid(&dt->do_lu), buf, *pos);
        if (rc < 0)
                return rc;
 
index 824564f..1ea3461 100644 (file)
@@ -565,8 +565,8 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
        if (IS_ERR(update))
                RETURN(PTR_ERR(update));
 
-       rc = out_attr_get_pack(env, &update->dur_buf,
-                              lu_object_fid(&dt->do_lu));
+       rc = osp_update_rpc_pack(env, attr_get, update, OUT_ATTR_GET,
+                                lu_object_fid(&dt->do_lu));
        if (rc != 0) {
                CERROR("%s: Insert update error "DFID": rc = %d\n",
                       dev->dd_lu_dev.ld_obd->obd_name,
@@ -963,8 +963,8 @@ unlock:
        if (IS_ERR(update))
                GOTO(out, rc = PTR_ERR(update));
 
-       rc = out_xattr_get_pack(env, &update->dur_buf,
-                               lu_object_fid(&dt->do_lu), name);
+       rc = osp_update_rpc_pack(env, xattr_get, update, OUT_XATTR_GET,
+                                lu_object_fid(&dt->do_lu), name);
        if (rc != 0) {
                CERROR("%s: Insert update error "DFID": rc = %d\n",
                       dname, PFID(lu_object_fid(&dt->do_lu)), rc);
@@ -1152,11 +1152,10 @@ int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
        CDEBUG(D_INODE, DFID" set xattr '%s' with size %zd\n",
               PFID(lu_object_fid(&dt->do_lu)), name, buf->lb_len);
 
-       rc = out_xattr_set_pack(env, &update->dur_buf,
-                               lu_object_fid(&dt->do_lu),
-                               buf, name, fl, update->dur_batchid);
+       rc = osp_update_rpc_pack(env, xattr_set, update, OUT_XATTR_SET,
+                                lu_object_fid(&dt->do_lu), buf, name, fl);
        if (rc != 0 || o->opo_ooa == NULL)
-               return rc;
+               RETURN(rc);
 
        oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
        if (oxe == NULL) {
@@ -1255,8 +1254,8 @@ int osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
        update = thandle_to_dt_update_request(th);
        LASSERT(update != NULL);
 
-       rc = out_xattr_del_pack(env, &update->dur_buf, fid, name,
-                               update->dur_batchid);
+       rc = osp_update_rpc_pack(env, xattr_del, update, OUT_XATTR_DEL,
+                                fid, name);
        if (rc != 0 || o->opo_ooa == NULL)
                return rc;
 
index 9d643cb..66af5c3 100644 (file)
@@ -168,6 +168,33 @@ void dt_update_request_destroy(struct dt_update_request *dt_update)
        OBD_FREE_PTR(dt_update);
 }
 
+static void
+object_update_request_dump(const struct object_update_request *ourq,
+                          unsigned int mask)
+{
+       unsigned int i;
+       size_t total_size = 0;
+
+       for (i = 0; i < ourq->ourq_count; i++) {
+               struct object_update    *update;
+               size_t                  size = 0;
+
+               update = object_update_request_get(ourq, i, &size);
+               LASSERT(update != NULL);
+               CDEBUG(mask, "i = %u fid = "DFID" op = %s master = %u"
+                      "params = %d batchid = "LPU64" size = %zu\n",
+                      i, PFID(&update->ou_fid),
+                      update_op_str(update->ou_type),
+                      update->ou_master_index, update->ou_params_count,
+                      update->ou_batchid, size);
+
+               total_size += size;
+       }
+
+       CDEBUG(mask, "updates = %p magic = %x count = %d size = %zu\n", ourq,
+              ourq->ourq_magic, ourq->ourq_count, total_size);
+}
+
 /**
  * Allocate an osp request and initialize it with the given parameters.
  *
@@ -428,8 +455,11 @@ int osp_insert_async_request(const struct lu_env *env, enum update_type op,
                             osp_update_interpreter_t interpreter)
 {
        struct osp_device            *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
-       struct dt_update_request     *update;
-       int                           rc  = 0;
+       struct dt_update_request        *update;
+       struct object_update            *object_update;
+       size_t                          max_update_size;
+       struct object_update_request    *ureq;
+       int                             rc = 0;
        ENTRY;
 
        update = osp_find_or_create_async_update_request(osp);
@@ -437,10 +467,14 @@ int osp_insert_async_request(const struct lu_env *env, enum update_type op,
                RETURN(PTR_ERR(update));
 
 again:
+       ureq = update->dur_buf.ub_req;
+       max_update_size = update->dur_buf.ub_req_size -
+                           object_update_request_size(ureq);
+
+       object_update = update_buffer_get_update(ureq, ureq->ourq_count);
+       rc = out_update_pack(env, object_update, max_update_size, op,
+                            lu_object_fid(osp2lu_obj(obj)), count, lens, bufs);
        /* The queue is full. */
-       rc = out_update_pack(env, &update->dur_buf, op,
-                            lu_object_fid(osp2lu_obj(obj)), count, lens, bufs,
-                            0);
        if (rc == -E2BIG) {
                osp->opd_async_requests = NULL;
                mutex_unlock(&osp->opd_async_requests_mutex);
@@ -455,6 +489,11 @@ again:
                        RETURN(PTR_ERR(update));
 
                goto again;
+       } else {
+               if (rc < 0)
+                       RETURN(rc);
+
+               ureq->ourq_count++;
        }
 
        rc = osp_insert_update_callback(env, update, obj, data, interpreter);
@@ -476,9 +515,13 @@ int osp_trans_update_request_create(struct thandle *th)
                return PTR_ERR(update);
        }
 
+       if (dt2osp_dev(th->th_dev)->opd_connect_mdt)
+               update->dur_flags = UPDATE_FL_SYNC;
+
        oth->ot_dur = update;
        return 0;
 }
+
 /**
  * The OSP layer dt_device_operations::dt_trans_create() interface
  * to create a transaction.
@@ -547,6 +590,7 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
        int                             rc;
        ENTRY;
 
+       object_update_request_dump(ureq, D_INFO);
        req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
        if (req == NULL)
                RETURN(-ENOMEM);
index cb47a53..40f4914 100644 (file)
@@ -12,6 +12,7 @@ ldlm_objs += $(LDLM)ldlm_pool.o $(LDLM)interval_tree.o
 target_objs := $(TARGET)tgt_main.o $(TARGET)tgt_lastrcvd.o
 target_objs += $(TARGET)tgt_handler.o $(TARGET)out_handler.o
 target_objs += $(TARGET)out_lib.o $(TARGET)update_trans.o
+target_objs += $(TARGET)update_records.o
 
 ptlrpc_objs := client.o recover.o connection.o niobuf.o pack_generic.o
 ptlrpc_objs += events.o ptlrpc_module.o service.o pinger.o
index eaf3957..924e3c0 100644 (file)
@@ -34,3 +34,4 @@ MOSTLYCLEANFILES := @MOSTLYCLEANFILES@
 EXTRA_DIST = tgt_main.c tgt_lastrcvd.c tgt_handler.c tgt_internal.h \
             out_handler.c out_lib.c
 EXTRA_DIST += update_trans.c
+EXTRA_DIST += update_records.c
index 5a00dfb..68439f1 100644 (file)
@@ -1625,6 +1625,9 @@ int out_handle(struct tgt_session_info *tsi)
                        rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
                        if (rc != 0)
                                GOTO(next, rc);
+
+                       if (update->ou_flags & UPDATE_FL_SYNC)
+                               ta->ta_handle->th_sync = 1;
                }
 
                /* Stop the current update transaction, if the update has
@@ -1642,6 +1645,9 @@ int out_handle(struct tgt_session_info *tsi)
                                if (rc != 0)
                                        GOTO(next, rc);
 
+                               if (update->ou_flags & UPDATE_FL_SYNC)
+                                       ta->ta_handle->th_sync = 1;
+
                                current_batchid = update->ou_batchid;
                        }
                }
index a6d6b36..8db03c1 100644 (file)
 
 #define OUT_UPDATE_BUFFER_SIZE_ADD     4096
 #define OUT_UPDATE_BUFFER_SIZE_MAX     (256 * 4096)  /* 1MB update size now */
-/**
- * resize update buffer
- *
- * Extend the update buffer by new_size.
- *
- * \param[in] ubuf     update buffer to be extended
- * \param[in] new_size  new size of the update buffer
- *
- * \retval             0 if extending succeeds.
- * \retval             negative errno if extending fails.
- */
-static int update_buffer_resize(struct update_buffer *ubuf, size_t new_size)
-{
-       struct object_update_request *ureq;
-
-       if (new_size > ubuf->ub_req_size)
-               return 0;
-
-       OBD_ALLOC_LARGE(ureq, new_size);
-       if (ureq == NULL)
-               return -ENOMEM;
-
-       memcpy(ureq, ubuf->ub_req, ubuf->ub_req_size);
 
-       OBD_FREE_LARGE(ubuf->ub_req, ubuf->ub_req_size);
-
-       ubuf->ub_req = ureq;
-       ubuf->ub_req_size = new_size;
-
-       return 0;
+const char *update_op_str(__u16 opc)
+{
+       static const char *opc_str[] = {
+               [OUT_START] = "start",
+               [OUT_CREATE] = "create",
+               [OUT_DESTROY] = "destroy",
+               [OUT_REF_ADD] = "ref_add",
+               [OUT_REF_DEL] = "ref_del" ,
+               [OUT_ATTR_SET] = "attr_set",
+               [OUT_ATTR_GET] = "attr_get",
+               [OUT_XATTR_SET] = "xattr_set",
+               [OUT_XATTR_GET] = "xattr_get",
+               [OUT_INDEX_LOOKUP] = "lookup",
+               [OUT_INDEX_INSERT] = "insert",
+               [OUT_INDEX_DELETE] = "delete",
+               [OUT_WRITE] = "write",
+               [OUT_XATTR_DEL] = "xattr_del",
+       };
+
+       if (opc < ARRAY_SIZE(opc_str) && opc_str[opc] != NULL)
+               return opc_str[opc];
+       else
+               return "unknown";
 }
+EXPORT_SYMBOL(update_op_str);
 
 /**
- * Pack the header of object_update_request
+ * Fill object update header
  *
- * Packs updates into the update_buffer header, which will either be sent to
- * the remote MDT or stored in the local update log. The maximum update buffer
- * size is 1MB for now.
+ * Only fill the object update header, and parameters will be filled later
+ * in other functions.
  *
- * \param[in] env      execution environment
- * \param[in] ubuf     update bufer which it will pack the update in
- * \param[in] op       update operation
- * \param[in] fid      object FID for this update
- * \param[in] param_count      parameters count for this update
- * \param[in] lens     each parameters length of this update
- * \param[in] batchid  batchid(transaction no) of this update
+ * \params[in] env             execution environment
+ * \params[in] update          object update to be filled
+ * \params[in] max_update_size maximum object update size, if the current
+ *                              update length equals or exceeds the size, it
+ *                              will return -E2BIG.
+ * \params[in] update_op       update type
+ * \params[in] fid             object FID of the update
+ * \params[in] params_count    the count of the update parameters
+ * \params[in] params_sizes    the length of each parameters
  *
- * \retval             0 pack update succeed.
- * \retval              negative errno pack update failed.
- **/
-static struct object_update *
-out_update_header_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                      enum update_type op, const struct lu_fid *fid,
-                      int params_count, __u16 *param_sizes, __u64 batchid)
+ * \retval                     0 if packing succeeds.
+ * \retval                     -E2BIG if packing exceeds the maximum length.
+ */
+int out_update_header_pack(const struct lu_env *env,
+                          struct object_update *update, size_t max_update_size,
+                          enum update_type update_op, const struct lu_fid *fid,
+                          unsigned int param_count, __u16 *params_sizes)
 {
-       struct object_update_request    *ureq = ubuf->ub_req;
-       size_t                          ureq_size = ubuf->ub_req_size;
-       struct object_update            *obj_update;
        struct object_update_param      *param;
-       size_t                          update_size;
-       int                             rc = 0;
        unsigned int                    i;
-       ENTRY;
-
-       /* Check update size to make sure it can fit into the buffer */
-       ureq_size = object_update_request_size(ureq);
-       update_size = offsetof(struct object_update, ou_params[0]);
-       for (i = 0; i < params_count; i++)
-               update_size += cfs_size_round(param_sizes[i] + sizeof(*param));
-
-       if (unlikely(cfs_size_round(ureq_size + update_size) >
-                    ubuf->ub_req_size)) {
-               size_t new_size = ubuf->ub_req_size;
-
-               /* enlarge object update request size */
-               while (new_size <
-                      cfs_size_round(ureq_size + update_size))
-                       new_size += OUT_UPDATE_BUFFER_SIZE_ADD;
-               if (new_size >= OUT_UPDATE_BUFFER_SIZE_MAX)
-                       RETURN(ERR_PTR(-E2BIG));
+       size_t                          update_size;
 
-               rc = update_buffer_resize(ubuf, new_size);
-               if (rc < 0)
-                       RETURN(ERR_PTR(rc));
+       /* Check whether the packing exceeding the maxima update length */
+       update_size = sizeof(*update);
+       for (i = 0; i < param_count; i++)
+               update_size += cfs_size_round(sizeof(*param) + params_sizes[i]);
 
-               ureq = ubuf->ub_req;
-       }
+       if (unlikely(update_size >= max_update_size))
+               return -E2BIG;
 
-       /* fill the update into the update buffer */
-       obj_update = (struct object_update *)((char *)ureq + ureq_size);
-       obj_update->ou_fid = *fid;
-       obj_update->ou_type = op;
-       obj_update->ou_params_count = (__u16)params_count;
-       obj_update->ou_batchid = batchid;
-       param = &obj_update->ou_params[0];
-       for (i = 0; i < params_count; i++) {
-               param->oup_len = param_sizes[i];
+       update->ou_fid = *fid;
+       update->ou_type = update_op;
+       update->ou_params_count = param_count;
+       param = &update->ou_params[0];
+       for (i = 0; i < param_count; i++) {
+               param->oup_len = params_sizes[i];
                param = (struct object_update_param *)((char *)param +
                         object_update_param_size(param));
        }
 
-       CDEBUG(D_INFO, "%p "DFID" idx %u: op %d params %d:%d\n",
-              ureq, PFID(fid), ureq->ourq_count, op, params_count,
-              (int)update_size);
-       ureq->ourq_count++;
-
-       RETURN(obj_update);
+       return 0;
 }
 
 /**
  * Packs one update into the update_buffer.
  *
  * \param[in] env      execution environment
- * \param[in] ubuf     bufer where update will be packed
+ * \param[in] update   update to be packed
+ * \param[in] max_update_size  *maximum size of \a update
  * \param[in] op       update operation (enum update_type)
  * \param[in] fid      object FID for this update
  * \param[in] param_count      number of parameters for this update
  * \param[in] param_sizes      array of parameters length of this update
  * \param[in] param_bufs       parameter buffers
- * \param[in] batchid  transaction no of this update, plus mdt_index, which
- *                      will be globally unique
  *
  * \retval             = 0 if updates packing succeeds
  * \retval             negative errno if updates packing fails
  **/
-int out_update_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                   enum update_type op, const struct lu_fid *fid,
-                   int params_count, __u16 *param_sizes,
-                   const void **param_bufs, __u64 batchid)
+int out_update_pack(const struct lu_env *env, struct object_update *update,
+                   size_t max_update_size, enum update_type op,
+                   const struct lu_fid *fid, unsigned int param_count,
+                   __u16 *param_sizes, const void **param_bufs)
 {
-       struct object_update            *update;
        struct object_update_param      *param;
        unsigned int                    i;
+       int                             rc;
        ENTRY;
 
-       update = out_update_header_pack(env, ubuf, op, fid, params_count,
-                                       param_sizes, batchid);
-       if (IS_ERR(update))
-               RETURN(PTR_ERR(update));
+       rc = out_update_header_pack(env, update, max_update_size, op, fid,
+                                   param_count, param_sizes);
+       if (rc != 0)
+               RETURN(rc);
 
        param = &update->ou_params[0];
-       for (i = 0; i < params_count; i++) {
+       for (i = 0; i < param_count; i++) {
                memcpy(&param->oup_buf[0], param_bufs[i], param_sizes[i]);
                param = (struct object_update_param *)((char *)param +
                         object_update_param_size(param));
@@ -200,79 +166,83 @@ EXPORT_SYMBOL(out_update_pack);
  * \param[in] env      execution environment
  * \param[in] ubuf     update buffer
  * \param[in] fid      fid of this object for the update
- * \param[in] batchid  batch id of this update
  *
  * \retval             0 if insertion succeeds.
  * \retval             negative errno if insertion fails.
  */
-int out_create_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                   const struct lu_fid *fid, struct lu_attr *attr,
-                   struct dt_allocation_hint *hint,
-                   struct dt_object_format *dof, __u64 batchid)
+int out_create_pack(const struct lu_env *env, struct object_update *update,
+                   size_t max_update_size, const struct lu_fid *fid,
+                   const struct lu_attr *attr, struct dt_allocation_hint *hint,
+                   struct dt_object_format *dof)
 {
        struct obdo             *obdo;
        __u16                   sizes[2] = {sizeof(*obdo), 0};
        int                     buf_count = 1;
-       const struct lu_fid     *fid1 = NULL;
-       struct object_update    *update;
+       const struct lu_fid     *parent_fid = NULL;
+       int                     rc;
        ENTRY;
 
        if (hint != NULL && hint->dah_parent) {
-               fid1 = lu_object_fid(&hint->dah_parent->do_lu);
-               sizes[1] = sizeof(*fid1);
+               parent_fid = lu_object_fid(&hint->dah_parent->do_lu);
+               sizes[1] = sizeof(*parent_fid);
                buf_count++;
        }
 
-       update = out_update_header_pack(env, ubuf, OUT_CREATE, fid,
-                                       buf_count, sizes, batchid);
-       if (IS_ERR(update))
-               RETURN(PTR_ERR(update));
+       rc = out_update_header_pack(env, update, max_update_size, OUT_CREATE,
+                                   fid, buf_count, sizes);
+       if (rc != 0)
+               RETURN(rc);
 
        obdo = object_update_param_get(update, 0, NULL);
+       LASSERT(obdo != NULL);
        obdo->o_valid = 0;
        obdo_from_la(obdo, attr, attr->la_valid);
        lustre_set_wire_obdo(NULL, obdo, obdo);
-       if (fid1 != NULL) {
-               struct lu_fid *fid;
-               fid = object_update_param_get(update, 1, NULL);
-               fid_cpu_to_le(fid, fid1);
+
+       if (parent_fid != NULL) {
+               struct lu_fid *tmp;
+
+               tmp = object_update_param_get(update, 1, NULL);
+               LASSERT(tmp != NULL);
+               fid_cpu_to_le(tmp, parent_fid);
        }
 
        RETURN(0);
 }
 EXPORT_SYMBOL(out_create_pack);
 
-int out_ref_del_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                    const struct lu_fid *fid, __u64 batchid)
+int out_ref_del_pack(const struct lu_env *env, struct object_update *update,
+                    size_t max_update_size, const struct lu_fid *fid)
 {
-       return out_update_pack(env, ubuf, OUT_REF_DEL, fid, 0, NULL, NULL,
-                              batchid);
+       return out_update_pack(env, update, max_update_size, OUT_REF_DEL, fid,
+                              0, NULL, NULL);
 }
 EXPORT_SYMBOL(out_ref_del_pack);
 
-int out_ref_add_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                    const struct lu_fid *fid, __u64 batchid)
+int out_ref_add_pack(const struct lu_env *env, struct object_update *update,
+                    size_t max_update_size, const struct lu_fid *fid)
 {
-       return out_update_pack(env, ubuf, OUT_REF_ADD, fid, 0, NULL, NULL,
-                              batchid);
+       return out_update_pack(env, update, max_update_size, OUT_REF_ADD, fid,
+                              0, NULL, NULL);
 }
 EXPORT_SYMBOL(out_ref_add_pack);
 
-int out_attr_set_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                     const struct lu_fid *fid, const struct lu_attr *attr,
-                     __u64 batchid)
+int out_attr_set_pack(const struct lu_env *env, struct object_update *update,
+                     size_t max_update_size, const struct lu_fid *fid,
+                     const struct lu_attr *attr)
 {
-       struct object_update    *update;
        struct obdo             *obdo;
        __u16                   size = sizeof(*obdo);
+       int                     rc;
        ENTRY;
 
-       update = out_update_header_pack(env, ubuf, OUT_ATTR_SET, fid, 1,
-                                       &size, batchid);
-       if (IS_ERR(update))
-               RETURN(PTR_ERR(update));
+       rc = out_update_header_pack(env, update, max_update_size,
+                                   OUT_ATTR_SET, fid, 1, &size);
+       if (rc != 0)
+               RETURN(rc);
 
        obdo = object_update_param_get(update, 0, NULL);
+       LASSERT(obdo != NULL);
        obdo->o_valid = 0;
        obdo_from_la(obdo, attr, attr->la_valid);
        lustre_set_wire_obdo(NULL, obdo, obdo);
@@ -281,34 +251,35 @@ int out_attr_set_pack(const struct lu_env *env, struct update_buffer *ubuf,
 }
 EXPORT_SYMBOL(out_attr_set_pack);
 
-int out_xattr_set_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                      const struct lu_fid *fid, const struct lu_buf *buf,
-                      const char *name, int flag, __u64 batchid)
+int out_xattr_set_pack(const struct lu_env *env, struct object_update *update,
+                      size_t max_update_size, const struct lu_fid *fid,
+                      const struct lu_buf *buf, const char *name, __u32 flag)
 {
        __u16   sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)};
        const void *bufs[3] = {(char *)name, (char *)buf->lb_buf,
                               (char *)&flag};
 
-       return out_update_pack(env, ubuf, OUT_XATTR_SET, fid,
-                              ARRAY_SIZE(sizes), sizes, bufs, batchid);
+       return out_update_pack(env, update, max_update_size, OUT_XATTR_SET,
+                              fid, ARRAY_SIZE(sizes), sizes, bufs);
 }
 EXPORT_SYMBOL(out_xattr_set_pack);
 
-int out_xattr_del_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                      const struct lu_fid *fid, const char *name,
-                      __u64 batchid)
+int out_xattr_del_pack(const struct lu_env *env, struct object_update *update,
+                      size_t max_update_size, const struct lu_fid *fid,
+                      const char *name)
 {
        __u16   size = strlen(name) + 1;
 
-       return out_update_pack(env, ubuf, OUT_XATTR_DEL, fid, 1, &size,
-                              (const void **)&name, batchid);
+       return out_update_pack(env, update, max_update_size, OUT_XATTR_DEL,
+                              fid, 1, &size, (const void **)&name);
 }
 EXPORT_SYMBOL(out_xattr_del_pack);
 
 
-int out_index_insert_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                         const struct lu_fid *fid, const struct dt_rec *rec,
-                         const struct dt_key *key, __u64 batchid)
+int out_index_insert_pack(const struct lu_env *env,
+                         struct object_update *update,
+                         size_t max_update_size, const struct lu_fid *fid,
+                         const struct dt_rec *rec, const struct dt_key *key)
 {
        struct dt_insert_rec       *rec1 = (struct dt_insert_rec *)rec;
        struct lu_fid              rec_fid;
@@ -322,35 +293,36 @@ int out_index_insert_pack(const struct lu_env *env, struct update_buffer *ubuf,
 
        fid_cpu_to_le(&rec_fid, rec1->rec_fid);
 
-       return out_update_pack(env, ubuf, OUT_INDEX_INSERT, fid,
-                              ARRAY_SIZE(sizes), sizes, bufs, batchid);
+       return out_update_pack(env, update, max_update_size, OUT_INDEX_INSERT,
+                              fid, ARRAY_SIZE(sizes), sizes, bufs);
 }
 EXPORT_SYMBOL(out_index_insert_pack);
 
-int out_index_delete_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                         const struct lu_fid *fid, const struct dt_key *key,
-                         __u64 batchid)
+int out_index_delete_pack(const struct lu_env *env,
+                         struct object_update *update,
+                         size_t max_update_size, const struct lu_fid *fid,
+                         const struct dt_key *key)
 {
        __u16   size = strlen((char *)key) + 1;
        const void *buf = key;
 
-       return out_update_pack(env, ubuf, OUT_INDEX_DELETE, fid, 1, &size,
-                              &buf, batchid);
+       return out_update_pack(env, update, max_update_size, OUT_INDEX_DELETE,
+                              fid, 1, &size, &buf);
 }
 EXPORT_SYMBOL(out_index_delete_pack);
 
 int out_object_destroy_pack(const struct lu_env *env,
-                           struct update_buffer *ubuf,
-                           const struct lu_fid *fid, __u64 batchid)
+                           struct object_update *update,
+                           size_t max_update_size, const struct lu_fid *fid)
 {
-       return out_update_pack(env, ubuf, OUT_DESTROY, fid, 0, NULL, NULL,
-                              batchid);
+       return out_update_pack(env, update, max_update_size, OUT_DESTROY, fid,
+                              0, NULL, NULL);
 }
 EXPORT_SYMBOL(out_object_destroy_pack);
 
-int out_write_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                  const struct lu_fid *fid, const struct lu_buf *buf,
-                  loff_t pos, __u64 batchid)
+int out_write_pack(const struct lu_env *env, struct object_update *update,
+                  size_t max_update_size, const struct lu_fid *fid,
+                  const struct lu_buf *buf, __u64 pos)
 {
        __u16           sizes[2] = {buf->lb_len, sizeof(pos)};
        const void      *bufs[2] = {(char *)buf->lb_buf, (char *)&pos};
@@ -358,8 +330,8 @@ int out_write_pack(const struct lu_env *env, struct update_buffer *ubuf,
 
        pos = cpu_to_le64(pos);
 
-       rc = out_update_pack(env, ubuf, OUT_WRITE, fid, ARRAY_SIZE(sizes),
-                            sizes, bufs, batchid);
+       rc = out_update_pack(env, update, max_update_size, OUT_WRITE, fid,
+                            ARRAY_SIZE(sizes), sizes, bufs);
        return rc;
 }
 EXPORT_SYMBOL(out_write_pack);
@@ -375,36 +347,40 @@ EXPORT_SYMBOL(out_write_pack);
  * \param[in] fid      fid of this object for the update
  * \param[in] ubuf     update buffer
  *
- * \retval             0 if packing succeeds.
- * \retval             negative errno if packing fails.
- */
-int out_index_lookup_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                         const struct lu_fid *fid, struct dt_rec *rec,
-                         const struct dt_key *key)
+ * \retval             = 0 pack succeed.
+ *                      < 0 pack failed.
+ **/
+int out_index_lookup_pack(const struct lu_env *env,
+                         struct object_update *update,
+                         size_t max_update_size, const struct lu_fid *fid,
+                         struct dt_rec *rec, const struct dt_key *key)
 {
        const void      *name = key;
        __u16           size = strlen((char *)name) + 1;
 
-       return out_update_pack(env, ubuf, OUT_INDEX_LOOKUP, fid, 1, &size,
-                              &name, 0);
+       return out_update_pack(env, update, max_update_size, OUT_INDEX_LOOKUP,
+                              fid, 1, &size, &name);
 }
 EXPORT_SYMBOL(out_index_lookup_pack);
 
-int out_attr_get_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                     const struct lu_fid *fid)
+int out_attr_get_pack(const struct lu_env *env, struct object_update *update,
+                     size_t max_update_size, const struct lu_fid *fid)
 {
-       return out_update_pack(env, ubuf, OUT_ATTR_GET, fid, 0, NULL, NULL, 0);
+       return out_update_pack(env, update, max_update_size, OUT_ATTR_GET,
+                              fid, 0, NULL, NULL);
 }
 EXPORT_SYMBOL(out_attr_get_pack);
 
-int out_xattr_get_pack(const struct lu_env *env, struct update_buffer *ubuf,
-                      const struct lu_fid *fid, const char *name)
+int out_xattr_get_pack(const struct lu_env *env, struct object_update *update,
+                      size_t max_update_size, const struct lu_fid *fid,
+                      const char *name)
 {
        __u16 size;
 
        LASSERT(name != NULL);
        size = strlen(name) + 1;
-       return out_update_pack(env, ubuf, OUT_XATTR_GET, fid, 1, &size,
-                              (const void **)&name, 0);
+
+       return out_update_pack(env, update, max_update_size, OUT_XATTR_GET,
+                              fid, 1, &size, (const void **)&name);
 }
 EXPORT_SYMBOL(out_xattr_get_pack);
index 3cb395f..919fa1e 100644 (file)
@@ -209,6 +209,8 @@ int out_handle(struct tgt_session_info *tsi);
 #define out_tx_write(info, obj, buf, pos, th, reply, idx) \
        __out_tx_write(info, obj, buf, pos, th, reply, idx, __FILE__, __LINE__)
 
+const char *update_op_str(__u16 opcode);
+
 extern struct page *tgt_page_to_corrupt;
 
 struct tgt_thread_big_cache {
@@ -221,4 +223,29 @@ int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
 int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
                    void *cookie);
 
+void update_records_dump(const struct update_records *records,
+                        unsigned int mask, bool dump_updates);
+
+struct update_thread_info {
+       struct lu_attr                  uti_attr;
+       struct lu_fid                   uti_fid;
+       struct lu_buf                   uti_buf;
+       struct thandle_update_records   uti_tur;
+       struct obdo                     uti_obdo;
+};
+
+extern struct lu_context_key update_thread_key;
+
+static inline struct update_thread_info *
+update_env_info(const struct lu_env *env)
+{
+       struct update_thread_info *uti;
+
+       uti = lu_context_key_get(&env->le_ctx, &update_thread_key);
+       LASSERT(uti != NULL);
+       return uti;
+}
+
+void update_info_init(void);
+void update_info_fini(void);
 #endif /* _TG_INTERNAL_H */
index d7791ca..269d896 100644 (file)
@@ -216,6 +216,8 @@ int tgt_mod_init(void)
        tgt_ses_key_init_generic(&tgt_session_key, NULL);
        lu_context_key_register_many(&tgt_session_key, NULL);
 
+       update_info_init();
+
        RETURN(0);
 }
 
@@ -226,5 +228,6 @@ void tgt_mod_exit(void)
 
        lu_context_key_degister(&tgt_thread_key);
        lu_context_key_degister(&tgt_session_key);
+       update_info_fini();
 }
 
diff --git a/lustre/target/update_records.c b/lustre/target/update_records.c
new file mode 100644 (file)
index 0000000..836f150
--- /dev/null
@@ -0,0 +1,965 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2014, Intel Corporation.
+ */
+
+/*
+ * lustre/target/update_records.c
+ *
+ * This file implement the methods to pack updates as update records, which
+ * will be written to the disk as llog record, and might be used during
+ * recovery.
+ *
+ * For cross-MDT operation, all of updates of the operation needs to be
+ * recorded in the disk, then during recovery phase, the recovery thread
+ * will retrieve and redo these updates if it needed.
+ *
+ * See comments above struct update_records for the format of update_records.
+ *
+ * Author: Di Wang <di.wang@intel.com>
+ */
+#define DEBUG_SUBSYSTEM S_CLASS
+
+#include <lu_target.h>
+#include <lustre_update.h>
+#include <obd.h>
+#include <obd_class.h>
+#include "tgt_internal.h"
+
+#define UPDATE_RECORDS_BUFFER_SIZE     8192
+#define UPDATE_PARAMS_BUFFER_SIZE      8192
+/**
+ * Dump update record.
+ *
+ * Dump all of updates in the update_records, mostly for debugging purpose.
+ *
+ * \param[in] records  update records to be dumpped
+ * \param[in] mask     debug level mask
+ * \param[in] dump_params if dump all of updates the updates.
+ *
+ */
+void update_records_dump(const struct update_records *records,
+                        unsigned int mask, bool dump_updates)
+{
+       const struct update_ops *ops;
+       const struct update_op  *op = NULL;
+       struct update_params    *params;
+       unsigned int            i;
+
+       ops = &records->ur_ops;
+       params = update_records_get_params(records);
+
+       CDEBUG(mask, "ops = %d params_count = %d\n", records->ur_update_count,
+              records->ur_param_count);
+
+       if (records->ur_update_count == 0)
+               return;
+
+       if (!dump_updates)
+               return;
+
+       op = &ops->uops_op[0];
+       for (i = 0; i < records->ur_update_count; i++) {
+               unsigned int j;
+
+               CDEBUG(mask, "update %dth "DFID" %s params_count = %hu\n", i,
+                      PFID(&op->uop_fid), update_op_str(op->uop_type),
+                      op->uop_param_count);
+
+               for (j = 0;  j < op->uop_param_count; j++) {
+                       struct object_update_param *param;
+
+                       param = update_params_get_param(params,
+                                       (unsigned int)op->uop_params_off[j],
+                                       records->ur_param_count);
+
+                       LASSERT(param != NULL);
+                       CDEBUG(mask, "param = %p %dth off = %hu size = %hu\n",
+                              param, j, op->uop_params_off[j], param->oup_len);
+               }
+
+               op = update_op_next_op(op);
+       }
+}
+
+/**
+ * Pack parameters to update records
+ *
+ * Find and insert parameter to update records, if the parameter
+ * already exists in \a params, then just return the offset of this
+ * parameter, otherwise insert the parameter and return its offset
+ *
+ * \param[in] params   update params in which to insert parameter
+ * \param[in] new_param        parameters to be inserted.
+ * \param[in] new_param_size   the size of \a new_param
+ *
+ * \retval             index inside \a params if parameter insertion
+ *                      succeeds.
+ * \retval             negative errno if it fails.
+ */
+static unsigned int update_records_param_pack(struct update_params *params,
+                                             const void *new_param,
+                                             size_t new_param_size,
+                                             unsigned int *param_count)
+{
+       struct object_update_param      *param;
+       unsigned int                    i;
+
+       for (i = 0; i < *param_count; i++) {
+               struct object_update_param *param;
+
+               param = update_params_get_param(params, i, *param_count);
+               if ((new_param == NULL && param->oup_len == new_param_size) ||
+                   (param->oup_len == new_param_size &&
+                    memcmp(param->oup_buf, new_param, new_param_size) == 0))
+                       /* Found the parameter and return its index */
+                       return i;
+       }
+
+       param = (struct object_update_param *)((char *)params +
+                               update_params_size(params, *param_count));
+
+       param->oup_len = new_param_size;
+       if (new_param != NULL)
+               memcpy(param->oup_buf, new_param, new_param_size);
+
+       *param_count = *param_count + 1;
+
+       return *param_count - 1;
+}
+
+/**
+ * Pack update to update records
+ *
+ * Pack the update and its parameters to the update records. First it will
+ * insert parameters, get the offset of these parameter, then fill the
+ * update with these offset. If insertion exceed the maximum size of
+ * current update records, it will return -E2BIG here, and the caller might
+ * extend the update_record size \see lod_updates_pack.
+ *
+ * \param[in] env      execution environment
+ * \param[in] fid      FID of the update.
+ * \param[in] op_type  operation type of the update
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count     pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in|out] param_count  pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] param_bufs       buffers of parameters
+ * \param[in] params_buf_count the count of the parameter buffers
+ * \param[in] param_size       sizes of parameters
+ *
+ * \retval             0 if packing succeeds
+ * \retval             negative errno if packing fails
+ */
+static int update_records_update_pack(const struct lu_env *env,
+                                     const struct lu_fid *fid,
+                                     enum update_type op_type,
+                                     struct update_ops *ops,
+                                     unsigned int *op_count,
+                                     size_t *max_op_size,
+                                     struct update_params *params,
+                                     unsigned int *param_count,
+                                     size_t *max_param_size,
+                                     unsigned int param_bufs_count,
+                                     const void **param_bufs,
+                                     size_t *param_sizes)
+{
+       struct update_op        *op;
+       size_t                  total_param_sizes = 0;
+       int                     index;
+       unsigned int            i;
+
+       /* Check whether the packing exceeding the maximum update size */
+       if (unlikely(*max_op_size < update_op_size(param_bufs_count))) {
+               CDEBUG(D_INFO, "max_op_size = %zu update_op = %zu\n",
+                      *max_op_size, update_op_size(param_bufs_count));
+               *max_op_size = update_op_size(param_bufs_count);
+               return -E2BIG;
+       }
+
+       for (i = 0; i < param_bufs_count; i++)
+               total_param_sizes +=
+                       cfs_size_round(sizeof(struct object_update_param) +
+                                      param_sizes[i]);
+
+       /* Check whether the packing exceeding the maximum parameter size */
+       if (unlikely(*max_param_size < total_param_sizes)) {
+               CDEBUG(D_INFO, "max_param_size = %zu params size = %zu\n",
+                      *max_param_size, total_param_sizes);
+
+               *max_param_size = total_param_sizes;
+               return -E2BIG;
+       }
+
+       op = update_ops_get_op(ops, *op_count, *op_count);
+       op->uop_fid = *fid;
+       op->uop_type = op_type;
+       op->uop_param_count = param_bufs_count;
+       for (i = 0; i < param_bufs_count; i++) {
+               index = update_records_param_pack(params, param_bufs[i],
+                                                 param_sizes[i], param_count);
+               if (index < 0)
+                       return index;
+
+               CDEBUG(D_INFO, "%s %uth param offset = %d size = %zu\n",
+                      update_op_str(op_type), i, index, param_sizes[i]);
+
+               op->uop_params_off[i] = index;
+       }
+       CDEBUG(D_INFO, "%huth "DFID" %s param_count = %u\n",
+              *op_count, PFID(fid), update_op_str(op_type), *param_count);
+
+       *op_count = *op_count + 1;
+
+       return 0;
+}
+
+/**
+ * Pack create update
+ *
+ * Pack create update into update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count     pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in|out] param_count  pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to be created
+ * \param[in] attr     attribute of the object to be created
+ * \param[in] hint     creation hint
+ * \param[in] dof      creation format information
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_create_pack(const struct lu_env *env,
+                              struct update_ops *ops,
+                              unsigned int *op_count,
+                              size_t *max_ops_size,
+                              struct update_params *params,
+                              unsigned int *param_count,
+                              size_t *max_param_size,
+                              const struct lu_fid *fid,
+                              const struct lu_attr *attr,
+                              const struct dt_allocation_hint *hint,
+                              struct dt_object_format *dof)
+{
+       size_t                  sizes[2];
+       const void              *bufs[2];
+       int                     buf_count = 0;
+       const struct lu_fid     *parent_fid = NULL;
+       struct lu_fid           tmp_fid;
+       int                     rc;
+       struct obdo             *obdo;
+
+       if (attr != NULL) {
+               obdo = &update_env_info(env)->uti_obdo;
+               obdo->o_valid = 0;
+               obdo_from_la(obdo, attr, attr->la_valid);
+               lustre_set_wire_obdo(NULL, obdo, obdo);
+               bufs[buf_count] = obdo;
+               sizes[buf_count] = sizeof(*obdo);
+               buf_count++;
+       }
+
+       if (hint != NULL && hint->dah_parent != NULL) {
+               parent_fid = lu_object_fid(&hint->dah_parent->do_lu);
+               fid_cpu_to_le(&tmp_fid, parent_fid);
+               bufs[buf_count] = &tmp_fid;
+               sizes[buf_count] = sizeof(tmp_fid);
+               buf_count++;
+       }
+
+       rc = update_records_update_pack(env, fid, OUT_CREATE, ops, op_count,
+                                       max_ops_size, params, param_count,
+                                       max_param_size, buf_count, bufs, sizes);
+       return rc;
+}
+EXPORT_SYMBOL(update_records_create_pack);
+
+/**
+ * Pack attr set update
+ *
+ * Pack attr_set update into update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in|out] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to set attr
+ * \param[in] attr     attribute of attr set
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_attr_set_pack(const struct lu_env *env,
+                                struct update_ops *ops,
+                                unsigned int *op_count,
+                                size_t *max_ops_size,
+                                struct update_params *params,
+                                unsigned int *param_count,
+                                size_t *max_param_size,
+                                const struct lu_fid *fid,
+                                const struct lu_attr *attr)
+{
+       struct obdo *obdo = &update_env_info(env)->uti_obdo;
+       size_t size = sizeof(*obdo);
+
+       obdo->o_valid = 0;
+       obdo_from_la(obdo, attr, attr->la_valid);
+       lustre_set_wire_obdo(NULL, obdo, obdo);
+       return update_records_update_pack(env, fid, OUT_ATTR_SET, ops, op_count,
+                                         max_ops_size, params, param_count,
+                                         max_param_size, 1,
+                                         (const void **)&obdo, &size);
+}
+EXPORT_SYMBOL(update_records_attr_set_pack);
+
+/**
+ * Pack ref add update
+ *
+ * Pack ref add update into update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in|out] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to add reference
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_ref_add_pack(const struct lu_env *env,
+                               struct update_ops *ops,
+                               unsigned int *op_count,
+                               size_t *max_ops_size,
+                               struct update_params *params,
+                               unsigned int *param_count,
+                               size_t *max_param_size,
+                               const struct lu_fid *fid)
+{
+       return update_records_update_pack(env, fid, OUT_REF_ADD, ops, op_count,
+                                         max_ops_size, params, param_count,
+                                         max_param_size, 0, NULL, NULL);
+}
+EXPORT_SYMBOL(update_records_ref_add_pack);
+
+/**
+ * Pack ref del update
+ *
+ * Pack ref del update into update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count     pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in] param_count      pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to delete reference
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_ref_del_pack(const struct lu_env *env,
+                               struct update_ops *ops,
+                               unsigned int *op_count,
+                               size_t *max_ops_size,
+                               struct update_params *params,
+                               unsigned int *param_count,
+                               size_t *max_param_size,
+                               const struct lu_fid *fid)
+{
+       return update_records_update_pack(env, fid, OUT_REF_DEL, ops, op_count,
+                                         max_ops_size, params, param_count,
+                                         max_param_size, 0, NULL, NULL);
+}
+EXPORT_SYMBOL(update_records_ref_del_pack);
+
+/**
+ * Pack object destroy update
+ *
+ * Pack object destroy update into update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in|out] param_count  pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to delete reference
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_object_destroy_pack(const struct lu_env *env,
+                                      struct update_ops *ops,
+                                      unsigned int *op_count,
+                                      size_t *max_ops_size,
+                                      struct update_params *params,
+                                      unsigned int *param_count,
+                                      size_t *max_param_size,
+                                      const struct lu_fid *fid)
+{
+       return update_records_update_pack(env, fid, OUT_DESTROY, ops, op_count,
+                                         max_ops_size, params, param_count,
+                                         max_param_size, 0, NULL, NULL);
+}
+EXPORT_SYMBOL(update_records_object_destroy_pack);
+
+/**
+ * Pack index insert update
+ *
+ * Pack index insert update into update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in] param_count      pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to insert index
+ * \param[in] rec      record of insertion
+ * \param[in] key      key of insertion
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_index_insert_pack(const struct lu_env *env,
+                                    struct update_ops *ops,
+                                    unsigned int *op_count,
+                                    size_t *max_ops_size,
+                                    struct update_params *params,
+                                    unsigned int *param_count,
+                                    size_t *max_param_size,
+                                    const struct lu_fid *fid,
+                                    const struct dt_rec *rec,
+                                    const struct dt_key *key)
+{
+       struct dt_insert_rec       *rec1 = (struct dt_insert_rec *)rec;
+       struct lu_fid              rec_fid;
+       __u32                      type = cpu_to_le32(rec1->rec_type);
+       size_t                     sizes[3] = { strlen((const char *)key) + 1,
+                                               sizeof(rec_fid),
+                                               sizeof(type) };
+       const void                 *bufs[3] = { key,
+                                               &rec_fid,
+                                               &type };
+
+       fid_cpu_to_le(&rec_fid, rec1->rec_fid);
+
+       return update_records_update_pack(env, fid, OUT_INDEX_INSERT, ops,
+                                         op_count, max_ops_size, params,
+                                         param_count, max_param_size,
+                                         3, bufs, sizes);
+}
+EXPORT_SYMBOL(update_records_index_insert_pack);
+
+/**
+ * Pack index delete update
+ *
+ * Pack index delete update into update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count     pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in|ount] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to delete index
+ * \param[in] key      key of deletion
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_index_delete_pack(const struct lu_env *env,
+                                    struct update_ops *ops,
+                                    unsigned int *op_count,
+                                    size_t *max_ops_size,
+                                    struct update_params *params,
+                                    unsigned int *param_count,
+                                    size_t *max_param_size,
+                                    const struct lu_fid *fid,
+                                    const struct dt_key *key)
+{
+       size_t size = strlen((const char *)key) + 1;
+
+       return update_records_update_pack(env, fid, OUT_INDEX_DELETE, ops,
+                                         op_count, max_ops_size, params,
+                                         param_count, max_param_size,
+                                         1, (const void **)&key, &size);
+}
+EXPORT_SYMBOL(update_records_index_delete_pack);
+
+/**
+ * Pack xattr set update
+ *
+ * Pack xattr set update into update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count     pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in|out] param_count  pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to set xattr
+ * \param[in] buf      xattr to be set
+ * \param[in] name     name of the xattr
+ * \param[in] flag     flag for setting xattr
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_xattr_set_pack(const struct lu_env *env,
+                                 struct update_ops *ops,
+                                 unsigned int *op_count,
+                                 size_t *max_ops_size,
+                                 struct update_params *params,
+                                 unsigned int *param_count,
+                                 size_t *max_param_size,
+                                 const struct lu_fid *fid,
+                                 const struct lu_buf *buf, const char *name,
+                                 __u32 flag)
+{
+       size_t  sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)};
+       const void *bufs[3] = {name, buf->lb_buf, &flag};
+
+       flag = cpu_to_le32(flag);
+
+       return update_records_update_pack(env, fid, OUT_XATTR_SET, ops,
+                                         op_count, max_ops_size, params,
+                                         param_count, max_param_size,
+                                         3, bufs, sizes);
+}
+EXPORT_SYMBOL(update_records_xattr_set_pack);
+
+/**
+ * Pack xattr delete update
+ *
+ * Pack xattr delete update into update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count     pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in|out] param_count  pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to delete xattr
+ * \param[in] name     name of the xattr
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_xattr_del_pack(const struct lu_env *env,
+                                 struct update_ops *ops,
+                                 unsigned int *op_count,
+                                 size_t *max_ops_size,
+                                 struct update_params *params,
+                                 unsigned int *param_count,
+                                 size_t *max_param_size,
+                                 const struct lu_fid *fid,
+                                 const char *name)
+{
+       size_t  size = strlen(name) + 1;
+
+       return update_records_update_pack(env, fid, OUT_XATTR_DEL, ops,
+                                         op_count, max_ops_size, params,
+                                         param_count, max_param_size,
+                                         1, (const void **)&name, &size);
+}
+EXPORT_SYMBOL(update_records_xattr_del_pack);
+
+/**
+ * Pack write update
+ *
+ * Pack write update into update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count     pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in|out] param_count  pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to write into
+ * \param[in] buf      buffer to write which includes an embedded size field
+ * \param[in] pos      offet in the object to start writing at
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_write_pack(const struct lu_env *env,
+                             struct update_ops *ops,
+                             unsigned int *op_count,
+                             size_t *max_ops_size,
+                             struct update_params *params,
+                             unsigned int *param_count,
+                             size_t *max_param_size,
+                             const struct lu_fid *fid,
+                             const struct lu_buf *buf,
+                             __u64 pos)
+{
+       size_t          sizes[2] = {buf->lb_len, sizeof(pos)};
+       const void      *bufs[2] = {buf->lb_buf, &pos};
+
+       pos = cpu_to_le64(pos);
+
+       return update_records_update_pack(env, fid, OUT_XATTR_DEL, ops,
+                                         op_count, max_ops_size, params,
+                                         param_count, max_param_size,
+                                         2, bufs, sizes);
+}
+EXPORT_SYMBOL(update_records_write_pack);
+
+/**
+ * Create update records in thandle_update_records
+ *
+ * Allocate update_records for thandle_update_records, the initial size
+ * will be 4KB.
+ *
+ * \param[in] tur      thandle_update_records where update_records will be
+ *                      allocated
+ * \retval             0 if allocation succeeds.
+ * \retval             negative errno if allocation fails.
+ */
+static int tur_update_records_create(struct thandle_update_records *tur)
+{
+       if (tur->tur_update_records != NULL)
+               return 0;
+
+       OBD_ALLOC_LARGE(tur->tur_update_records,
+                       UPDATE_RECORDS_BUFFER_SIZE);
+
+       if (tur->tur_update_records == NULL)
+               return -ENOMEM;
+
+       tur->tur_update_records_buf_size = UPDATE_RECORDS_BUFFER_SIZE;
+
+       return 0;
+}
+
+/**
+ * Extend update records
+ *
+ * Extend update_records to the new size in thandle_update_records.
+ *
+ * \param[in] tur      thandle_update_records where update_records will be
+ *                      extended.
+ * \retval             0 if extension succeeds.
+ * \retval             negative errno if extension fails.
+ */
+int tur_update_records_extend(struct thandle_update_records *tur,
+                             size_t new_size)
+{
+       struct llog_update_record       *record;
+
+       OBD_ALLOC_LARGE(record, new_size);
+       if (record == NULL)
+               return -ENOMEM;
+
+       if (tur->tur_update_records != NULL) {
+               memcpy(record, tur->tur_update_records,
+                      tur->tur_update_records_buf_size);
+               OBD_FREE_LARGE(tur->tur_update_records,
+                              tur->tur_update_records_buf_size);
+       }
+
+       tur->tur_update_records = record;
+       tur->tur_update_records_buf_size = new_size;
+
+       return 0;
+}
+EXPORT_SYMBOL(tur_update_records_extend);
+
+/**
+ * Extend update records
+ *
+ * Extend update records in thandle to make sure it is able to hold
+ * the update with certain update_op and params size.
+ *
+ * \param [in] tur     thandle_update_records to be extend
+ * \param [in] new_op_size update_op size of the update record
+ * \param [in] new_param_size params size of the update record
+ *
+ * \retval             0 if the update_records is being extended.
+ * \retval             negative errno if the update_records is not being
+ *                      extended.
+ */
+int tur_update_extend(struct thandle_update_records *tur,
+                     size_t new_op_size, size_t new_param_size)
+{
+       size_t record_size;
+       size_t params_size;
+       size_t extend_size;
+       int rc;
+       ENTRY;
+
+       record_size = llog_update_record_size(tur->tur_update_records);
+       /* extend update records buffer */
+       if (new_op_size > (tur->tur_update_records_buf_size - record_size -
+                          sizeof(*tur->tur_update_records))) {
+               extend_size = round_up(new_op_size, UPDATE_RECORDS_BUFFER_SIZE);
+               rc = tur_update_records_extend(tur,
+                               tur->tur_update_records_buf_size +
+                               extend_size);
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
+       /* extend parameters buffer */
+       params_size = update_params_size(tur->tur_update_params,
+                                        tur->tur_update_param_count);
+       if (new_param_size > (tur->tur_update_params_buf_size -
+                             params_size)) {
+               extend_size = round_up(new_param_size,
+                                      UPDATE_PARAMS_BUFFER_SIZE);
+               rc = tur_update_params_extend(tur,
+                               tur->tur_update_params_buf_size +
+                               extend_size);
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
+       RETURN(0);
+}
+EXPORT_SYMBOL(tur_update_extend);
+
+/**
+ * Create update params in thandle_update_records
+ *
+ * Allocate update_params for thandle_update_records, the initial size
+ * will be 4KB.
+ *
+ * \param[in] tur      thandle_update_records where update_params will be
+ *                      allocated
+ * \retval             0 if allocation succeeds.
+ * \retval             negative errno if allocation fails.
+ */
+static int tur_update_params_create(struct thandle_update_records *tur)
+{
+       if (tur->tur_update_params != NULL)
+               return 0;
+
+       OBD_ALLOC_LARGE(tur->tur_update_params, UPDATE_PARAMS_BUFFER_SIZE);
+       if (tur->tur_update_params == NULL)
+               return -ENOMEM;
+
+       tur->tur_update_params_buf_size = UPDATE_PARAMS_BUFFER_SIZE;
+       return 0;
+}
+
+/**
+ * Extend update params
+ *
+ * Extend update_params to the new size in thandle_update_records.
+ *
+ * \param[in] tur      thandle_update_records where update_params will be
+ *                      extended.
+ * \retval             0 if extension succeeds.
+ * \retval             negative errno if extension fails.
+ */
+int tur_update_params_extend(struct thandle_update_records *tur,
+                            size_t new_size)
+{
+       struct update_params    *params;
+
+       OBD_ALLOC_LARGE(params, new_size);
+       if (params == NULL)
+               return -ENOMEM;
+
+       if (tur->tur_update_params != NULL) {
+               memcpy(params, tur->tur_update_params,
+                      tur->tur_update_params_buf_size);
+               OBD_FREE_LARGE(tur->tur_update_params,
+                              tur->tur_update_params_buf_size);
+       }
+
+       tur->tur_update_params = params;
+       tur->tur_update_params_buf_size = new_size;
+
+       return 0;
+}
+EXPORT_SYMBOL(tur_update_params_extend);
+
+/**
+ * Check and prepare whether it needs to record update.
+ *
+ * Checks if the transaction needs to record updates, and if it
+ * does, then initialize the update record buffer in the transaction.
+ *
+ * \param[in] env      execution environment
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 if updates recording succeeds.
+ * \retval             negative errno if updates recording fails.
+ */
+int check_and_prepare_update_record(const struct lu_env *env,
+                                   struct thandle *th)
+{
+       struct thandle_update_records   *tur;
+       struct llog_update_record       *lur;
+       struct top_thandle              *top_th;
+       struct sub_thandle              *lst;
+       int                             rc;
+       bool                            record_update = false;
+       ENTRY;
+
+       top_th = container_of(th, struct top_thandle, tt_super);
+       /* Check if it needs to record updates for this transaction */
+       list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) {
+               if (lst->st_record_update) {
+                       record_update = true;
+                       break;
+               }
+       }
+       if (!record_update)
+               RETURN(0);
+
+       if (top_th->tt_update_records != NULL)
+               RETURN(0);
+
+       tur = &update_env_info(env)->uti_tur;
+       if (tur->tur_update_records == NULL) {
+               rc = tur_update_records_create(tur);
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
+       if (tur->tur_update_params == NULL) {
+               rc = tur_update_params_create(tur);
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
+       lur = tur->tur_update_records;
+       lur->lur_update_rec.ur_update_count = 0;
+       lur->lur_update_rec.ur_param_count = 0;
+       lur->lur_update_rec.ur_master_transno = 0;
+       lur->lur_update_rec.ur_batchid = 0;
+       lur->lur_update_rec.ur_flags = 0;
+
+       tur->tur_update_param_count = 0;
+
+       top_th->tt_update_records = tur;
+
+       RETURN(0);
+}
+EXPORT_SYMBOL(check_and_prepare_update_record);
+
+/**
+ * Merge params into the update records
+ *
+ * Merge params and ops into the update records attached to top th.
+ * During transaction execution phase, parameters and update ops
+ * are collected in two different buffers (see lod_updates_pack()),
+ * then in transaction stop, it needs to be merged them in one
+ * buffer, then being written into the update log.
+ *
+ * \param[in] env      execution environment
+ * \param[in] tur      thandle update records whose updates and
+ *                      parameters are merged
+ *
+ * \retval             0 if merging succeeds.
+ * \retval             negaitive errno if merging fails.
+ */
+int merge_params_updates_buf(const struct lu_env *env,
+                            struct thandle_update_records *tur)
+{
+       struct llog_update_record *lur;
+       struct update_params *params;
+       size_t params_size;
+       size_t record_size;
+
+       if (tur->tur_update_records == NULL ||
+           tur->tur_update_params == NULL)
+               return 0;
+
+       lur = tur->tur_update_records;
+       /* Extends the update records buffer if needed */
+       params_size = update_params_size(tur->tur_update_params,
+                                        tur->tur_update_param_count);
+       record_size = llog_update_record_size(lur);
+       if (record_size + params_size > tur->tur_update_records_buf_size) {
+               int rc;
+
+               rc = tur_update_records_extend(tur, record_size + params_size);
+               if (rc < 0)
+                       return rc;
+               lur = tur->tur_update_records;
+       }
+
+       params = update_records_get_params(&lur->lur_update_rec);
+       memcpy(params, tur->tur_update_params, params_size);
+       lur->lur_update_rec.ur_param_count = tur->tur_update_param_count;
+       return 0;
+}
+EXPORT_SYMBOL(merge_params_updates_buf);
+
+static void update_key_fini(const struct lu_context *ctx,
+                           struct lu_context_key *key, void *data)
+{
+       struct update_thread_info *info = data;
+
+       if (info->uti_tur.tur_update_records != NULL)
+               OBD_FREE_LARGE(info->uti_tur.tur_update_records,
+                              info->uti_tur.tur_update_records_buf_size);
+       if (info->uti_tur.tur_update_params != NULL)
+               OBD_FREE_LARGE(info->uti_tur.tur_update_params,
+                              info->uti_tur.tur_update_params_buf_size);
+
+       OBD_FREE_PTR(info);
+}
+
+/* context key constructor/destructor: update_key_init, update_key_fini */
+LU_KEY_INIT(update, struct update_thread_info);
+/* context key: update_thread_key */
+LU_CONTEXT_KEY_DEFINE(update, LCT_MD_THREAD | LCT_MG_THREAD |
+                             LCT_DT_THREAD | LCT_TX_HANDLE | LCT_LOCAL);
+EXPORT_SYMBOL(update_thread_key);
+LU_KEY_INIT_GENERIC(update);
+
+void update_info_init(void)
+{
+       update_key_init_generic(&update_thread_key, NULL);
+       lu_context_key_register(&update_thread_key);
+}
+
+void update_info_fini(void)
+{
+       lu_context_key_degister(&update_thread_key);
+}
index b57fbe8..8ecf3a6 100644 (file)
@@ -53,6 +53,7 @@
 #include <lustre_update.h>
 #include <obd.h>
 #include <obd_class.h>
+#include <tgt_internal.h>
 
 /**
  * Create the top transaction.
@@ -85,8 +86,10 @@ top_trans_create(const struct lu_env *env, struct dt_device *master_dev)
        top_th->tt_magic = TOP_THANDLE_MAGIC;
        top_th->tt_master_sub_thandle = child_th;
        child_th->th_top = &top_th->tt_super;
-       INIT_LIST_HEAD(&top_th->tt_sub_thandle_list);
+
+       top_th->tt_update_records = NULL;
        top_th->tt_super.th_top = &top_th->tt_super;
+       INIT_LIST_HEAD(&top_th->tt_sub_thandle_list);
 
        return &top_th->tt_super;
 }
@@ -110,7 +113,11 @@ int top_trans_start(const struct lu_env *env, struct dt_device *master_dev,
        struct top_thandle      *top_th = container_of(th, struct top_thandle,
                                                       tt_super);
        struct sub_thandle      *lst;
-       int                     rc = 0;
+       int                     rc;
+
+       rc = check_and_prepare_update_record(env, th);
+       if (rc < 0)
+               return rc;
 
        LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC);
        list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) {
@@ -148,11 +155,22 @@ int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
        struct sub_thandle      *lst;
        struct top_thandle      *top_th = container_of(th, struct top_thandle,
                                                       tt_super);
+       struct thandle_update_records *tur = top_th->tt_update_records;
        int                     rc;
        ENTRY;
 
        /* Note: we always need walk through all of sub_transaction to do
         * transaction stop to release the resource here */
+       if (tur != NULL) {
+               rc = merge_params_updates_buf(env, tur);
+               if (rc == 0) {
+                       struct update_records *record;
+
+                       record = &tur->tur_update_records->lur_update_rec;
+                       update_records_dump(record, D_INFO, false);
+               }
+       }
+
        LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC);
 
        top_th->tt_master_sub_thandle->th_local = th->th_local;
@@ -233,6 +251,7 @@ struct thandle *thandle_get_sub_by_dt(const struct lu_env *env,
        INIT_LIST_HEAD(&lst->st_sub_list);
        lst->st_sub_th = sub_th;
        list_add(&lst->st_sub_list, &top_th->tt_sub_thandle_list);
+       lst->st_record_update = 1;
 
        RETURN(sub_th);
 }