From ca5d15f3d58305cf6c5d05d8ec615eaf99e7b1d7 Mon Sep 17 00:00:00 2001 From: Wang Di Date: Mon, 11 Aug 2014 07:37:56 -0700 Subject: [PATCH] LU-3536 lod: record update for cross-MDT operation Packing updates for cross-MDT operation in the buffer, and the maximum update record size is about 1M, which is enough to create 2k stripes for now. To save the save, these update records will use different packing format with OSP RPC, see lustre/target/update_records.c These updates will be stored in all of MDTs(in the later patch). During the recovery, master MDT will redo the operation according to these updates records. Change-Id: Ic1919ab1c3d2eeca9ef027e2309c42201e3f7a74 Signed-off-by: Wang Di Reviewed-on: http://review.whamcloud.com/10939 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: Lai Siyao Reviewed-by: Alex Zhuravlev Reviewed-by: Oleg Drokin --- lustre/include/lustre/lustre_idl.h | 30 +- lustre/include/lustre_update.h | 447 ++++++++++++++--- lustre/lod/lod_internal.h | 3 +- lustre/lod/lod_object.c | 18 +- lustre/lod/lod_sub_object.c | 141 +++++- lustre/osp/osp_internal.h | 42 ++ lustre/osp/osp_md_object.c | 127 +++-- lustre/osp/osp_object.c | 19 +- lustre/osp/osp_trans.c | 54 ++- lustre/ptlrpc/Makefile.in | 1 + lustre/target/Makefile.am | 1 + lustre/target/out_handler.c | 6 + lustre/target/out_lib.c | 336 ++++++------- lustre/target/tgt_internal.h | 27 ++ lustre/target/tgt_main.c | 3 + lustre/target/update_records.c | 965 +++++++++++++++++++++++++++++++++++++ lustre/target/update_trans.c | 23 +- 17 files changed, 1916 insertions(+), 327 deletions(-) create mode 100644 lustre/target/update_records.c diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 314dbd7..bf59646 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -3192,6 +3192,7 @@ typedef enum { CHANGELOG_REC = LLOG_OP_MAGIC | 0x60000, CHANGELOG_USER_REC = LLOG_OP_MAGIC | 0x70000, HSM_AGENT_REC = LLOG_OP_MAGIC | 0x80000, + UPDATE_REC = LLOG_OP_MAGIC | 0xa0000, LLOG_HDR_MAGIC = LLOG_OP_MAGIC | 0x45539, LLOG_LOGID_MAGIC = LLOG_OP_MAGIC | 0x4553b, } llog_op_type; @@ -3947,9 +3948,11 @@ extern void lustre_swab_hsm_request(struct hsm_request *hr); */ /** - * Type of each update + * Type of each update, if adding/deleting update, please also update + * update_opcode in lustre/target/out_lib.c. */ enum update_type { + OUT_START = 0, OUT_CREATE = 1, OUT_DESTROY = 2, OUT_REF_ADD = 3, @@ -4013,19 +4016,28 @@ void lustre_swab_object_update(struct object_update *ou); void lustre_swab_object_update_request(struct object_update_request *our); static inline size_t -object_update_size(const struct object_update *update) +object_update_params_size(const struct object_update *update) { - const struct object_update_param *param; - size_t size; - unsigned int i; + const struct object_update_param *param; + size_t total_size = 0; + unsigned int i; - size = offsetof(struct object_update, ou_params[0]); + param = &update->ou_params[0]; for (i = 0; i < update->ou_params_count; i++) { - param = (struct object_update_param *)((char *)update + size); - size += object_update_param_size(param); + size_t size = object_update_param_size(param); + + param = (struct object_update_param *)((char *)param + size); + total_size += size; } - return size; + return total_size; +} + +static inline size_t +object_update_size(const struct object_update *update) +{ + return offsetof(struct object_update, ou_params[0]) + + object_update_params_size(update); } static inline struct object_update * diff --git a/lustre/include/lustre_update.h b/lustre/include/lustre_update.h index 1ba3e34..996a9b3 100644 --- a/lustre/include/lustre_update.h +++ b/lustre/include/lustre_update.h @@ -38,32 +38,13 @@ struct dt_key; struct dt_rec; +struct object_update_param; struct update_buffer { struct object_update_request *ub_req; size_t ub_req_size; }; -#define TOP_THANDLE_MAGIC 0x20140917 -/* {top,sub}_thandle are used to manage distributed transactions which - * include updates on several nodes. A top_handle represents the - * whole operation, and sub_thandle represents updates on each node. */ -struct top_thandle { - struct thandle tt_super; - __u32 tt_magic; - /* The master sub transaction. */ - struct thandle *tt_master_sub_thandle; - - /* Other sub thandle will be listed here. */ - struct list_head tt_sub_thandle_list; -}; - -struct sub_thandle { - /* point to the osd/osp_thandle */ - struct thandle *st_sub_th; - struct list_head st_sub_list; -}; - /** * Tracking the updates being executed on this dt_device. */ @@ -80,6 +61,165 @@ struct dt_update_request { struct list_head dur_cb_items; }; +struct update_params { + struct object_update_param up_params[0]; +}; + +static inline size_t update_params_size(const struct update_params *params, + unsigned int param_count) +{ + struct object_update_param *param; + size_t total_size = sizeof(*params); + unsigned int i; + + param = (struct object_update_param *)¶ms->up_params[0]; + for (i = 0; i < param_count; i++) { + size_t size = object_update_param_size(param); + + param = (struct object_update_param *)((char *)param + size); + total_size += size; + } + + return total_size; +} + +static inline struct object_update_param * +update_params_get_param(const struct update_params *params, + unsigned int index, unsigned int param_count) +{ + struct object_update_param *param; + unsigned int i; + + if (index > param_count) + return NULL; + + param = (struct object_update_param *)¶ms->up_params[0]; + for (i = 0; i < index; i++) + param = (struct object_update_param *)((char *)param + + object_update_param_size(param)); + + return param; +} + +struct update_op { + struct lu_fid uop_fid; + __u16 uop_type; + __u16 uop_param_count; + __u16 uop_params_off[0]; +}; + +static inline size_t +update_op_size(unsigned int param_count) +{ + return offsetof(struct update_op, uop_params_off[param_count]); +} + +static inline struct update_op * +update_op_next_op(const struct update_op *uop) +{ + return (struct update_op *)((char *)uop + + update_op_size(uop->uop_param_count)); +} + +/* All of updates in the mulitple_update_record */ +struct update_ops { + struct update_op uops_op[0]; +}; + +static inline size_t update_ops_size(const struct update_ops *ops, + unsigned int update_count) +{ + struct update_op *op; + size_t total_size = sizeof(*ops); + unsigned int i; + + op = (struct update_op *)&ops->uops_op[0]; + for (i = 0; i < update_count; i++, op = update_op_next_op(op)) + total_size += update_op_size(op->uop_param_count); + + return total_size; +} + +/* + * This is the update record format used to store the updates in + * disk. All updates of the operation will be stored in ur_ops. + * All of parameters for updates of the operation will be stored + * in ur_params. + * To save the space of the record, parameters in ur_ops will only + * remember their offset in ur_params, so to avoid storing duplicate + * parameters in ur_params, which can help us save a lot space for + * operation like creating striped directory. + */ +struct update_records { + __u64 ur_master_transno; + __u64 ur_batchid; + __u32 ur_flags; + __u32 ur_param_count; + __u32 ur_update_count; + struct update_ops ur_ops; + /* Note ur_ops has a variable size, so comment out + * the following ur_params, in case some use it directly + * update_records->ur_params + * + * struct update_params ur_params; + */ +}; + +struct llog_update_record { + struct llog_rec_hdr lur_hdr; + struct update_records lur_update_rec; + /* Note ur_update_rec has a variable size, so comment out + * the following ur_tail, in case someone use it directly + * + * struct llog_rec_tail lur_tail; + */ +}; + +static inline struct update_params * +update_records_get_params(const struct update_records *record) +{ + return (struct update_params *)((char *)record + + offsetof(struct update_records, ur_ops) + + update_ops_size(&record->ur_ops, record->ur_update_count)); +} + +static inline size_t +update_records_size(const struct update_records *record) +{ + struct update_params *params; + + params = update_records_get_params(record); + + return cfs_size_round(offsetof(struct update_records, ur_ops) + + update_ops_size(&record->ur_ops, record->ur_update_count) + + update_params_size(params, record->ur_param_count)); +} + +static inline size_t +llog_update_record_size(const struct llog_update_record *lur) +{ + return cfs_size_round(sizeof(lur->lur_hdr) + + update_records_size(&lur->lur_update_rec) + + sizeof(struct llog_rec_tail)); +} + +static inline struct update_op * +update_ops_get_op(const struct update_ops *ops, unsigned int index, + unsigned int update_count) +{ + struct update_op *op; + unsigned int i; + + if (index > update_count) + return NULL; + + op = (struct update_op *)&ops->uops_op[0]; + for (i = 0; i < index; i++) + op = update_op_next_op(op); + + return op; +} + static inline void *object_update_param_get(const struct object_update *update, size_t index, size_t *size) @@ -176,53 +316,105 @@ object_update_result_data_get(const struct object_update_reply *reply, return 0; } -static inline void update_inc_batchid(struct dt_update_request *update) -{ - update->dur_batchid++; -} +/** + * Attached in the thandle to record the updates for distribute + * distribution. + */ +struct thandle_update_records { + /* All of updates for the cross-MDT operation. */ + struct llog_update_record *tur_update_records; + size_t tur_update_records_buf_size; + + /* All of parameters for the cross-MDT operation */ + struct update_params *tur_update_params; + unsigned int tur_update_param_count; + size_t tur_update_params_buf_size; +}; + +#define TOP_THANDLE_MAGIC 0x20140917 +/* {top,sub}_thandle are used to manage distributed transactions which + * include updates on several nodes. A top_handle represents the + * whole operation, and sub_thandle represents updates on each node. */ +struct top_thandle { + struct thandle tt_super; + __u32 tt_magic; + atomic_t tt_refcount; + /* The master sub transaction. */ + struct thandle *tt_master_sub_thandle; + + /* Other sub thandle will be listed here. */ + struct list_head tt_sub_thandle_list; + + /* All of update records will packed here */ + struct thandle_update_records *tt_update_records; +}; + +struct sub_thandle { + /* point to the osd/osp_thandle */ + struct thandle *st_sub_th; + + /* linked to top_thandle */ + struct list_head st_sub_list; + + /* If this sub thandle is committed */ + bool st_committed:1, + st_record_update:1; +}; + /* target/out_lib.c */ -int out_update_pack(const struct lu_env *env, struct update_buffer *ubuf, - enum update_type op, const struct lu_fid *fid, - int params_count, __u16 *param_sizes, const void **bufs, - __u64 batchid); -int out_create_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, struct lu_attr *attr, - struct dt_allocation_hint *hint, - struct dt_object_format *dof, __u64 batchid); +int out_update_pack(const struct lu_env *env, struct object_update *update, + size_t max_update_size, enum update_type op, + const struct lu_fid *fid, unsigned int params_count, + __u16 *param_sizes, const void **param_bufs); +int out_create_pack(const struct lu_env *env, struct object_update *update, + size_t max_update_size, const struct lu_fid *fid, + const struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof); int out_object_destroy_pack(const struct lu_env *env, - struct update_buffer *ubuf, - const struct lu_fid *fid, __u64 batchid); -int out_index_delete_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, const struct dt_key *key, - __u64 batchid); -int out_index_insert_pack(const struct lu_env *env, struct update_buffer *ubuf, + struct object_update *update, + size_t max_update_size, + const struct lu_fid *fid); +int out_index_delete_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, + const struct lu_fid *fid, const struct dt_key *key); +int out_index_insert_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, const struct lu_fid *fid, const struct dt_rec *rec, - const struct dt_key *key, __u64 batchid); -int out_xattr_set_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct dt_key *key); +int out_xattr_set_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, const struct lu_fid *fid, const struct lu_buf *buf, - const char *name, int flag, __u64 batchid); -int out_xattr_del_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, const char *name, - __u64 batchid); -int out_attr_set_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, const struct lu_attr *attr, - __u64 batchid); -int out_ref_add_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, __u64 batchid); -int out_ref_del_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, __u64 batchid); -int out_write_pack(const struct lu_env *env, struct update_buffer *ubuf, + const char *name, __u32 flag); +int out_xattr_del_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, + const struct lu_fid *fid, const char *name); +int out_attr_set_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, + const struct lu_fid *fid, const struct lu_attr *attr); +int out_ref_add_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, + const struct lu_fid *fid); +int out_ref_del_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, + const struct lu_fid *fid); +int out_write_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, const struct lu_fid *fid, const struct lu_buf *buf, - loff_t pos, __u64 batchid); -int out_attr_get_pack(const struct lu_env *env, struct update_buffer *ubuf, + __u64 pos); +int out_attr_get_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, const struct lu_fid *fid); -int out_index_lookup_pack(const struct lu_env *env, struct update_buffer *ubuf, +int out_index_lookup_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, const struct lu_fid *fid, struct dt_rec *rec, const struct dt_key *key); -int out_xattr_get_pack(const struct lu_env *env, struct update_buffer *ubuf, +int out_xattr_get_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, const struct lu_fid *fid, const char *name); +const char *update_op_str(__u16 opcode); + /* target/update_trans.c */ struct thandle *thandle_get_sub_by_dt(const struct lu_env *env, struct thandle *th, @@ -245,4 +437,147 @@ int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev, struct thandle *th); void top_thandle_destroy(struct top_thandle *top_th); + +/* update_records.c */ +int update_records_create_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct lu_attr *attr, + const struct dt_allocation_hint *hint, + struct dt_object_format *dof); +int update_records_attr_set_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct lu_attr *attr); +int update_records_ref_add_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid); +int update_records_ref_del_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid); +int update_records_object_destroy_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid); +int update_records_index_insert_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct dt_rec *rec, + const struct dt_key *key); +int update_records_index_delete_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct dt_key *key); +int update_records_xattr_set_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct lu_buf *buf, const char *name, + __u32 flag); +int update_records_xattr_del_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const char *name); +int update_records_write_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct lu_buf *buf, + __u64 pos); + +int tur_update_records_extend(struct thandle_update_records *tur, + size_t new_size); +int tur_update_params_extend(struct thandle_update_records *tur, + size_t new_size); +int check_and_prepare_update_record(const struct lu_env *env, + struct thandle *th); +int merge_params_updates_buf(const struct lu_env *env, + struct thandle_update_records *tur); +int tur_update_extend(struct thandle_update_records *tur, + size_t new_op_size, size_t new_param_size); + +#define update_record_pack(name, th, ...) \ +({ \ + struct top_thandle *top_th; \ + struct thandle_update_records *tur; \ + struct llog_update_record *lur; \ + size_t avail_param_size; \ + size_t avail_op_size; \ + int ret; \ + \ + while (1) { \ + top_th = container_of(th, struct top_thandle, tt_super);\ + tur = top_th->tt_update_records; \ + lur = tur->tur_update_records; \ + avail_param_size = tur->tur_update_params_buf_size - \ + update_params_size(tur->tur_update_params, \ + tur->tur_update_param_count); \ + avail_op_size = tur->tur_update_records_buf_size - \ + llog_update_record_size(lur); \ + ret = update_records_##name##_pack(env, \ + &lur->lur_update_rec.ur_ops, \ + &lur->lur_update_rec.ur_update_count, \ + &avail_op_size, \ + tur->tur_update_params, \ + &tur->tur_update_param_count, \ + &avail_param_size, __VA_ARGS__); \ + if (ret == -E2BIG) { \ + ret = tur_update_extend(tur, avail_op_size, \ + avail_param_size); \ + if (ret != 0) \ + break; \ + continue; \ + } else { \ + break; \ + } \ + } \ + ret; \ +}) #endif diff --git a/lustre/lod/lod_internal.h b/lustre/lod/lod_internal.h index 2830b83..b74f714 100644 --- a/lustre/lod/lod_internal.h +++ b/lustre/lod/lod_internal.h @@ -485,7 +485,8 @@ void lod_object_free_striping(const struct lu_env *env, struct lod_object *lo); /* lod_sub_object.c */ struct thandle *lod_sub_get_thandle(const struct lu_env *env, struct thandle *th, - const struct dt_object *sub_obj); + const struct dt_object *sub_obj, + bool *record_update); int lod_sub_object_declare_create(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr, diff --git a/lustre/lod/lod_object.c b/lustre/lod/lod_object.c index 642cda8..61dbede 100644 --- a/lustre/lod/lod_object.c +++ b/lustre/lod/lod_object.c @@ -1622,19 +1622,18 @@ out: } /** - * Create a striped directory. + * Declare create a striped directory. * - * Create a striped directory with a given stripe pattern on the specified MDTs. - * A striped directory is represented as a regular directory - an index listing - * all the stripes. The stripes point back to the master object with ".." and - * LinkEA. The master object gets LMV EA which identifies it as a striped - * directory. The function allocates FIDs for all the stripes. + * Declare creating a striped directory with a given stripe pattern on the + * specified MDTs. A striped directory is represented as a regular directory + * - an index listing all the stripes. The stripes point back to the master + * object with ".." and LinkEA. The master object gets LMV EA which + * identifies it as a striped directory. The function allocates FIDs + * for all stripes. * * \param[in] env execution environment * \param[in] dt object * \param[in] attr attributes to initialize the objects with - * \param[in] lum a pattern specifying the number of stripes and - * MDT to start from * \param[in] dof type of objects to be created * \param[in] th transaction handle * @@ -2143,7 +2142,8 @@ static void lod_lov_stripe_cache_clear(struct lod_object *lo) static int lod_xattr_set_internal(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, - const char *name, int fl, struct thandle *th) + const char *name, int fl, + struct thandle *th) { struct dt_object *next = dt_object_child(dt); struct lod_object *lo = lod_dt_obj(dt); diff --git a/lustre/lod/lod_sub_object.c b/lustre/lod/lod_sub_object.c index cce804f..17b9f07 100644 --- a/lustre/lod/lod_sub_object.c +++ b/lustre/lod/lod_sub_object.c @@ -55,7 +55,8 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env, struct thandle *th, - const struct dt_object *sub_obj) + const struct dt_object *sub_obj, + bool *record_update) { struct lod_device *lod = dt2lod_dev(th->th_dev); struct top_thandle *tth; @@ -65,6 +66,9 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env, int rc; ENTRY; + if (record_update != NULL) + *record_update = false; + if (th->th_top == NULL) RETURN(th); @@ -74,8 +78,15 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env, * creation, FID is not assigned until osp_object_create(), * so if the FID of sub_obj is zero, it means OST object. */ if (!dt_object_remote(sub_obj) || - fid_is_zero(lu_object_fid(&sub_obj->do_lu))) + fid_is_zero(lu_object_fid(&sub_obj->do_lu))) { + /* local MDT object */ + if (fid_is_sane(lu_object_fid(&sub_obj->do_lu)) && + tth->tt_update_records != NULL && + record_update != NULL) + *record_update = true; + RETURN(tth->tt_master_sub_thandle); + } rc = lod_fld_lookup(env, lod, lu_object_fid(&sub_obj->do_lu), &mdt_index, &type); @@ -85,6 +96,9 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env, if (type == LU_SEQ_RANGE_OST) RETURN(tth->tt_master_sub_thandle); + if (tth->tt_update_records != NULL && record_update != NULL) + *record_update = true; + sub_th = thandle_get_sub(env, th, sub_obj); RETURN(sub_th); @@ -114,7 +128,7 @@ int lod_sub_object_declare_create(const struct lu_env *env, { struct thandle *sub_th; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, NULL); if (IS_ERR(sub_th)) return PTR_ERR(sub_th); @@ -144,13 +158,22 @@ int lod_sub_object_create(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) { + rc = update_record_pack(create, th, + lu_object_fid(&dt->do_lu), + attr, hint, dof); + if (rc < 0) + RETURN(rc); + } + rc = dt_create(env, dt, attr, hint, dof, sub_th); RETURN(rc); @@ -176,7 +199,7 @@ int lod_sub_object_declare_ref_add(const struct lu_env *env, int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, NULL); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); @@ -202,13 +225,21 @@ int lod_sub_object_ref_add(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) { + rc = update_record_pack(ref_add, th, + lu_object_fid(&dt->do_lu)); + if (rc < 0) + RETURN(rc); + } + rc = dt_ref_add(env, dt, sub_th); RETURN(rc); @@ -234,7 +265,7 @@ int lod_sub_object_declare_ref_del(const struct lu_env *env, int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, NULL); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); @@ -260,13 +291,21 @@ int lod_sub_object_ref_del(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) { + rc = update_record_pack(ref_del, th, + lu_object_fid(&dt->do_lu)); + if (rc < 0) + RETURN(rc); + } + rc = dt_ref_del(env, dt, sub_th); RETURN(rc); @@ -292,7 +331,7 @@ int lod_sub_object_declare_destroy(const struct lu_env *env, int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, NULL); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); @@ -318,13 +357,21 @@ int lod_sub_object_destroy(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) { + rc = update_record_pack(object_destroy, th, + lu_object_fid(&dt->do_lu)); + if (rc < 0) + RETURN(rc); + } + rc = dt_destroy(env, dt, sub_th); RETURN(rc); @@ -352,7 +399,7 @@ int lod_sub_object_declare_insert(const struct lu_env *env, { struct thandle *sub_th; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, NULL); if (IS_ERR(sub_th)) return PTR_ERR(sub_th); @@ -381,11 +428,20 @@ int lod_sub_object_index_insert(const struct lu_env *env, struct dt_object *dt, int ign) { struct thandle *sub_th; + int rc; + bool record_update; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) return PTR_ERR(sub_th); + if (record_update) { + rc = update_record_pack(index_insert, th, + lu_object_fid(&dt->do_lu), rec, key); + if (rc < 0) + return rc; + } + return dt_insert(env, dt, rec, key, sub_th, ign); } @@ -409,7 +465,7 @@ int lod_sub_object_declare_delete(const struct lu_env *env, { struct thandle *sub_th; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, NULL); if (IS_ERR(sub_th)) return PTR_ERR(sub_th); @@ -434,13 +490,21 @@ int lod_sub_object_delete(const struct lu_env *env, struct dt_object *dt, const struct dt_key *name, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) { + rc = update_record_pack(index_delete, th, + lu_object_fid(&dt->do_lu), name); + if (rc < 0) + RETURN(rc); + } + rc = dt_delete(env, dt, name, sub_th); RETURN(rc); } @@ -469,7 +533,7 @@ int lod_sub_object_declare_xattr_set(const struct lu_env *env, int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, NULL); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); @@ -499,13 +563,22 @@ int lod_sub_object_xattr_set(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) { + rc = update_record_pack(xattr_set, th, + lu_object_fid(&dt->do_lu), + buf, name, fl); + if (rc < 0) + RETURN(rc); + } + rc = dt_xattr_set(env, dt, buf, name, fl, sub_th); RETURN(rc); @@ -533,7 +606,7 @@ int lod_sub_object_declare_attr_set(const struct lu_env *env, int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, NULL); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); @@ -561,14 +634,22 @@ int lod_sub_object_attr_set(const struct lu_env *env, const struct lu_attr *attr, struct thandle *th) { + bool record_update; struct thandle *sub_th; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) { + rc = update_record_pack(attr_set, th, lu_object_fid(&dt->do_lu), + attr); + if (rc < 0) + RETURN(rc); + } + rc = dt_attr_set(env, dt, attr, sub_th); RETURN(rc); @@ -596,7 +677,7 @@ int lod_sub_object_declare_xattr_del(const struct lu_env *env, int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, NULL); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); @@ -625,13 +706,21 @@ int lod_sub_object_xattr_del(const struct lu_env *env, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) { + rc = update_record_pack(xattr_del, th, + lu_object_fid(&dt->do_lu), name); + if (rc < 0) + RETURN(rc); + } + rc = dt_xattr_del(env, dt, name, sub_th); RETURN(rc); @@ -660,7 +749,7 @@ int lod_sub_object_declare_write(const struct lu_env *env, int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, NULL); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); @@ -690,13 +779,21 @@ ssize_t lod_sub_object_write(const struct lu_env *env, struct dt_object *dt, struct thandle *th, int rq) { struct thandle *sub_th; + bool record_update; ssize_t rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) { + rc = update_record_pack(write, th, lu_object_fid(&dt->do_lu), + buf, *pos); + if (rc < 0) + RETURN(rc); + } + rc = dt_write(env, dt, buf, pos, sub_th, rq); RETURN(rc); } diff --git a/lustre/osp/osp_internal.h b/lustre/osp/osp_internal.h index fd2d41d..1d6db3c 100644 --- a/lustre/osp/osp_internal.h +++ b/lustre/osp/osp_internal.h @@ -535,6 +535,48 @@ static inline int osp_is_fid_client(struct osp_device *osp) return imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_FID; } +struct object_update * +update_buffer_get_update(struct object_update_request *request, + unsigned int index); + +int osp_extend_update_buffer(const struct lu_env *env, + struct update_buffer *ubuf); + +#define osp_update_rpc_pack(env, name, update, op, ...) \ +({ \ + struct object_update *object_update; \ + size_t max_update_length; \ + struct object_update_request *ureq; \ + int ret; \ + \ + while (1) { \ + ureq = update->dur_buf.ub_req; \ + max_update_length = update->dur_buf.ub_req_size - \ + object_update_request_size(ureq); \ + \ + object_update = update_buffer_get_update(ureq, \ + ureq->ourq_count); \ + ret = out_##name##_pack(env, object_update, max_update_length, \ + __VA_ARGS__); \ + if (ret == -E2BIG) { \ + int rc1; \ + /* extend the buffer and retry */ \ + rc1 = osp_extend_update_buffer(env, &update->dur_buf); \ + if (rc1 != 0) { \ + ret = rc1; \ + break; \ + } \ + } else { \ + if (ret == 0) { \ + object_update->ou_flags |= update->dur_flags; \ + ureq->ourq_count++; \ + } \ + break; \ + } \ + } \ + ret; \ +}) + typedef int (*osp_update_interpreter_t)(const struct lu_env *env, struct object_update_reply *rep, struct ptlrpc_request *req, diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index fa86222..47ec90a 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -56,8 +56,8 @@ #include #include "osp_internal.h" -static const char dot[] = "."; -static const char dotdot[] = ".."; +#define OUT_UPDATE_BUFFER_SIZE_ADD 4096 +#define OUT_UPDATE_BUFFER_SIZE_MAX (256 * 4096) /* 1M update size now */ /** * Interpreter call for object creation @@ -115,6 +115,47 @@ int osp_md_declare_object_create(const struct lu_env *env, return osp_trans_update_request_create(th); } +struct object_update * +update_buffer_get_update(struct object_update_request *request, + unsigned int index) +{ + void *ptr; + int i; + + if (index > request->ourq_count) + return NULL; + + ptr = &request->ourq_updates[0]; + for (i = 0; i < index; i++) + ptr += object_update_size(ptr); + + return ptr; +} + +int osp_extend_update_buffer(const struct lu_env *env, + struct update_buffer *ubuf) +{ + struct object_update_request *ureq; + size_t new_size = ubuf->ub_req_size + OUT_UPDATE_BUFFER_SIZE_ADD; + + /* enlarge object update request size */ + if (new_size > OUT_UPDATE_BUFFER_SIZE_MAX) + return -E2BIG; + + OBD_ALLOC_LARGE(ureq, new_size); + if (ureq == NULL) + return -ENOMEM; + + memcpy(ureq, ubuf->ub_req, ubuf->ub_req_size); + + OBD_FREE_LARGE(ubuf->ub_req, ubuf->ub_req_size); + + ubuf->ub_req = ureq; + ubuf->ub_req_size = new_size; + + return 0; +} + /** * Implementation of dt_object_operations::do_create * @@ -141,13 +182,12 @@ int osp_md_object_create(const struct lu_env *env, struct dt_object *dt, update = thandle_to_dt_update_request(th); LASSERT(update != NULL); - rc = out_create_pack(env, &update->dur_buf, - lu_object_fid(&dt->do_lu), attr, hint, dof, - update->dur_batchid); + rc = osp_update_rpc_pack(env, create, update, OUT_CREATE, + lu_object_fid(&dt->do_lu), attr, hint, dof); if (rc != 0) GOTO(out, rc); - rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), attr, + rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), NULL, osp_object_create_interpreter); if (rc < 0) @@ -200,9 +240,8 @@ static int osp_md_ref_del(const struct lu_env *env, struct dt_object *dt, update = thandle_to_dt_update_request(th); LASSERT(update != NULL); - rc = out_ref_del_pack(env, &update->dur_buf, - lu_object_fid(&dt->do_lu), - update->dur_batchid); + rc = osp_update_rpc_pack(env, ref_del, update, OUT_REF_DEL, + lu_object_fid(&dt->do_lu)); return rc; } @@ -247,9 +286,8 @@ static int osp_md_ref_add(const struct lu_env *env, struct dt_object *dt, update = thandle_to_dt_update_request(th); LASSERT(update != NULL); - rc = out_ref_add_pack(env, &update->dur_buf, - lu_object_fid(&dt->do_lu), - update->dur_batchid); + rc = osp_update_rpc_pack(env, ref_add, update, OUT_REF_ADD, + lu_object_fid(&dt->do_lu)); return rc; } @@ -324,10 +362,8 @@ int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, update = thandle_to_dt_update_request(th); LASSERT(update != NULL); - rc = out_attr_set_pack(env, &update->dur_buf, - lu_object_fid(&dt->do_lu), attr, - update->dur_batchid); - + rc = osp_update_rpc_pack(env, attr_set, update, OUT_ATTR_SET, + lu_object_fid(&dt->do_lu), attr); return rc; } @@ -459,8 +495,8 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, if (IS_ERR(update)) RETURN(PTR_ERR(update)); - rc = out_index_lookup_pack(env, &update->dur_buf, - lu_object_fid(&dt->do_lu), rec, key); + rc = osp_update_rpc_pack(env, index_lookup, update, OUT_INDEX_LOOKUP, + lu_object_fid(&dt->do_lu), rec, key); if (rc != 0) { CERROR("%s: Insert update error: rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name, rc); @@ -567,11 +603,8 @@ static int osp_md_index_insert(const struct lu_env *env, struct dt_update_request *update = oth->ot_dur; int rc; - - rc = out_index_insert_pack(env, &update->dur_buf, - lu_object_fid(&dt->do_lu), rec, key, - update->dur_batchid); - + rc = osp_update_rpc_pack(env, index_insert, update, OUT_INDEX_INSERT, + lu_object_fid(&dt->do_lu), rec, key); return rc; } @@ -622,9 +655,9 @@ static int osp_md_index_delete(const struct lu_env *env, update = thandle_to_dt_update_request(th); LASSERT(update != NULL); - rc = out_index_delete_pack(env, &update->dur_buf, - lu_object_fid(&dt->do_lu), key, - update->dur_batchid); + rc = osp_update_rpc_pack(env, index_delete, update, OUT_INDEX_DELETE, + lu_object_fid(&dt->do_lu), key); + return rc; } @@ -919,6 +952,35 @@ int osp_md_declare_object_destroy(const struct lu_env *env, } /** + * Interpreter call for object destroy + * + * Object destroy interpreter, which will be called after destroying + * the remote object to set flags and status. + * + * \param[in] env execution environment + * \param[in] reply update reply + * \param[in] req ptlrpc update request for destroying object + * \param[in] obj object to be destroyed + * \param[in] data data used in this function. + * \param[in] index index(position) of destroy update in the whole + * updates + * \param[in] rc update result on the remote MDT. + * + * \retval only return 0 for now + */ +static int osp_md_object_destroy_interpreter(const struct lu_env *env, + struct object_update_reply *reply, + struct ptlrpc_request *req, + struct osp_object *obj, + void *data, int index, int rc) +{ + /* not needed in cache any more */ + set_bit(LU_OBJECT_HEARD_BANSHEE, + &obj->opo_obj.do_lu.lo_header->loh_flags); + return 0; +} + +/** * Implement OSP layer dt_object_operations::do_destroy() interface. * * Pack the destroy update into the RPC buffer, which will be sent @@ -948,14 +1010,13 @@ int osp_md_object_destroy(const struct lu_env *env, struct dt_object *dt, update = thandle_to_dt_update_request(th); LASSERT(update != NULL); - rc = out_object_destroy_pack(env, &update->dur_buf, - lu_object_fid(&dt->do_lu), update->dur_batchid); + rc = osp_update_rpc_pack(env, object_destroy, update, OUT_DESTROY, + lu_object_fid(&dt->do_lu)); if (rc != 0) RETURN(rc); - /* not needed in cache any more */ - set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags); - + rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), NULL, + osp_md_object_destroy_interpreter); RETURN(rc); } @@ -1036,8 +1097,8 @@ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt, update = thandle_to_dt_update_request(th); LASSERT(update != NULL); - rc = out_write_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu), - buf, *pos, update->dur_batchid); + rc = osp_update_rpc_pack(env, write, update, OUT_WRITE, + lu_object_fid(&dt->do_lu), buf, *pos); if (rc < 0) return rc; diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c index 824564f..1ea3461 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -565,8 +565,8 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt, if (IS_ERR(update)) RETURN(PTR_ERR(update)); - rc = out_attr_get_pack(env, &update->dur_buf, - lu_object_fid(&dt->do_lu)); + rc = osp_update_rpc_pack(env, attr_get, update, OUT_ATTR_GET, + lu_object_fid(&dt->do_lu)); if (rc != 0) { CERROR("%s: Insert update error "DFID": rc = %d\n", dev->dd_lu_dev.ld_obd->obd_name, @@ -963,8 +963,8 @@ unlock: if (IS_ERR(update)) GOTO(out, rc = PTR_ERR(update)); - rc = out_xattr_get_pack(env, &update->dur_buf, - lu_object_fid(&dt->do_lu), name); + rc = osp_update_rpc_pack(env, xattr_get, update, OUT_XATTR_GET, + lu_object_fid(&dt->do_lu), name); if (rc != 0) { CERROR("%s: Insert update error "DFID": rc = %d\n", dname, PFID(lu_object_fid(&dt->do_lu)), rc); @@ -1152,11 +1152,10 @@ int osp_xattr_set(const struct lu_env *env, struct dt_object *dt, CDEBUG(D_INODE, DFID" set xattr '%s' with size %zd\n", PFID(lu_object_fid(&dt->do_lu)), name, buf->lb_len); - rc = out_xattr_set_pack(env, &update->dur_buf, - lu_object_fid(&dt->do_lu), - buf, name, fl, update->dur_batchid); + rc = osp_update_rpc_pack(env, xattr_set, update, OUT_XATTR_SET, + lu_object_fid(&dt->do_lu), buf, name, fl); if (rc != 0 || o->opo_ooa == NULL) - return rc; + RETURN(rc); oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len); if (oxe == NULL) { @@ -1255,8 +1254,8 @@ int osp_xattr_del(const struct lu_env *env, struct dt_object *dt, update = thandle_to_dt_update_request(th); LASSERT(update != NULL); - rc = out_xattr_del_pack(env, &update->dur_buf, fid, name, - update->dur_batchid); + rc = osp_update_rpc_pack(env, xattr_del, update, OUT_XATTR_DEL, + fid, name); if (rc != 0 || o->opo_ooa == NULL) return rc; diff --git a/lustre/osp/osp_trans.c b/lustre/osp/osp_trans.c index 9d643cb..66af5c3 100644 --- a/lustre/osp/osp_trans.c +++ b/lustre/osp/osp_trans.c @@ -168,6 +168,33 @@ void dt_update_request_destroy(struct dt_update_request *dt_update) OBD_FREE_PTR(dt_update); } +static void +object_update_request_dump(const struct object_update_request *ourq, + unsigned int mask) +{ + unsigned int i; + size_t total_size = 0; + + for (i = 0; i < ourq->ourq_count; i++) { + struct object_update *update; + size_t size = 0; + + update = object_update_request_get(ourq, i, &size); + LASSERT(update != NULL); + CDEBUG(mask, "i = %u fid = "DFID" op = %s master = %u" + "params = %d batchid = "LPU64" size = %zu\n", + i, PFID(&update->ou_fid), + update_op_str(update->ou_type), + update->ou_master_index, update->ou_params_count, + update->ou_batchid, size); + + total_size += size; + } + + CDEBUG(mask, "updates = %p magic = %x count = %d size = %zu\n", ourq, + ourq->ourq_magic, ourq->ourq_count, total_size); +} + /** * Allocate an osp request and initialize it with the given parameters. * @@ -428,8 +455,11 @@ int osp_insert_async_request(const struct lu_env *env, enum update_type op, osp_update_interpreter_t interpreter) { struct osp_device *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev); - struct dt_update_request *update; - int rc = 0; + struct dt_update_request *update; + struct object_update *object_update; + size_t max_update_size; + struct object_update_request *ureq; + int rc = 0; ENTRY; update = osp_find_or_create_async_update_request(osp); @@ -437,10 +467,14 @@ int osp_insert_async_request(const struct lu_env *env, enum update_type op, RETURN(PTR_ERR(update)); again: + ureq = update->dur_buf.ub_req; + max_update_size = update->dur_buf.ub_req_size - + object_update_request_size(ureq); + + object_update = update_buffer_get_update(ureq, ureq->ourq_count); + rc = out_update_pack(env, object_update, max_update_size, op, + lu_object_fid(osp2lu_obj(obj)), count, lens, bufs); /* The queue is full. */ - rc = out_update_pack(env, &update->dur_buf, op, - lu_object_fid(osp2lu_obj(obj)), count, lens, bufs, - 0); if (rc == -E2BIG) { osp->opd_async_requests = NULL; mutex_unlock(&osp->opd_async_requests_mutex); @@ -455,6 +489,11 @@ again: RETURN(PTR_ERR(update)); goto again; + } else { + if (rc < 0) + RETURN(rc); + + ureq->ourq_count++; } rc = osp_insert_update_callback(env, update, obj, data, interpreter); @@ -476,9 +515,13 @@ int osp_trans_update_request_create(struct thandle *th) return PTR_ERR(update); } + if (dt2osp_dev(th->th_dev)->opd_connect_mdt) + update->dur_flags = UPDATE_FL_SYNC; + oth->ot_dur = update; return 0; } + /** * The OSP layer dt_device_operations::dt_trans_create() interface * to create a transaction. @@ -547,6 +590,7 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, int rc; ENTRY; + object_update_request_dump(ureq, D_INFO); req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE); if (req == NULL) RETURN(-ENOMEM); diff --git a/lustre/ptlrpc/Makefile.in b/lustre/ptlrpc/Makefile.in index cb47a53..40f4914 100644 --- a/lustre/ptlrpc/Makefile.in +++ b/lustre/ptlrpc/Makefile.in @@ -12,6 +12,7 @@ ldlm_objs += $(LDLM)ldlm_pool.o $(LDLM)interval_tree.o target_objs := $(TARGET)tgt_main.o $(TARGET)tgt_lastrcvd.o target_objs += $(TARGET)tgt_handler.o $(TARGET)out_handler.o target_objs += $(TARGET)out_lib.o $(TARGET)update_trans.o +target_objs += $(TARGET)update_records.o ptlrpc_objs := client.o recover.o connection.o niobuf.o pack_generic.o ptlrpc_objs += events.o ptlrpc_module.o service.o pinger.o diff --git a/lustre/target/Makefile.am b/lustre/target/Makefile.am index eaf3957..924e3c0 100644 --- a/lustre/target/Makefile.am +++ b/lustre/target/Makefile.am @@ -34,3 +34,4 @@ MOSTLYCLEANFILES := @MOSTLYCLEANFILES@ EXTRA_DIST = tgt_main.c tgt_lastrcvd.c tgt_handler.c tgt_internal.h \ out_handler.c out_lib.c EXTRA_DIST += update_trans.c +EXTRA_DIST += update_records.c diff --git a/lustre/target/out_handler.c b/lustre/target/out_handler.c index 5a00dfb..68439f1 100644 --- a/lustre/target/out_handler.c +++ b/lustre/target/out_handler.c @@ -1625,6 +1625,9 @@ int out_handle(struct tgt_session_info *tsi) rc = out_tx_start(env, dt, ta, tsi->tsi_exp); if (rc != 0) GOTO(next, rc); + + if (update->ou_flags & UPDATE_FL_SYNC) + ta->ta_handle->th_sync = 1; } /* Stop the current update transaction, if the update has @@ -1642,6 +1645,9 @@ int out_handle(struct tgt_session_info *tsi) if (rc != 0) GOTO(next, rc); + if (update->ou_flags & UPDATE_FL_SYNC) + ta->ta_handle->th_sync = 1; + current_batchid = update->ou_batchid; } } diff --git a/lustre/target/out_lib.c b/lustre/target/out_lib.c index a6d6b36..8db03c1 100644 --- a/lustre/target/out_lib.c +++ b/lustre/target/out_lib.c @@ -38,148 +38,114 @@ #define OUT_UPDATE_BUFFER_SIZE_ADD 4096 #define OUT_UPDATE_BUFFER_SIZE_MAX (256 * 4096) /* 1MB update size now */ -/** - * resize update buffer - * - * Extend the update buffer by new_size. - * - * \param[in] ubuf update buffer to be extended - * \param[in] new_size new size of the update buffer - * - * \retval 0 if extending succeeds. - * \retval negative errno if extending fails. - */ -static int update_buffer_resize(struct update_buffer *ubuf, size_t new_size) -{ - struct object_update_request *ureq; - - if (new_size > ubuf->ub_req_size) - return 0; - - OBD_ALLOC_LARGE(ureq, new_size); - if (ureq == NULL) - return -ENOMEM; - - memcpy(ureq, ubuf->ub_req, ubuf->ub_req_size); - OBD_FREE_LARGE(ubuf->ub_req, ubuf->ub_req_size); - - ubuf->ub_req = ureq; - ubuf->ub_req_size = new_size; - - return 0; +const char *update_op_str(__u16 opc) +{ + static const char *opc_str[] = { + [OUT_START] = "start", + [OUT_CREATE] = "create", + [OUT_DESTROY] = "destroy", + [OUT_REF_ADD] = "ref_add", + [OUT_REF_DEL] = "ref_del" , + [OUT_ATTR_SET] = "attr_set", + [OUT_ATTR_GET] = "attr_get", + [OUT_XATTR_SET] = "xattr_set", + [OUT_XATTR_GET] = "xattr_get", + [OUT_INDEX_LOOKUP] = "lookup", + [OUT_INDEX_INSERT] = "insert", + [OUT_INDEX_DELETE] = "delete", + [OUT_WRITE] = "write", + [OUT_XATTR_DEL] = "xattr_del", + }; + + if (opc < ARRAY_SIZE(opc_str) && opc_str[opc] != NULL) + return opc_str[opc]; + else + return "unknown"; } +EXPORT_SYMBOL(update_op_str); /** - * Pack the header of object_update_request + * Fill object update header * - * Packs updates into the update_buffer header, which will either be sent to - * the remote MDT or stored in the local update log. The maximum update buffer - * size is 1MB for now. + * Only fill the object update header, and parameters will be filled later + * in other functions. * - * \param[in] env execution environment - * \param[in] ubuf update bufer which it will pack the update in - * \param[in] op update operation - * \param[in] fid object FID for this update - * \param[in] param_count parameters count for this update - * \param[in] lens each parameters length of this update - * \param[in] batchid batchid(transaction no) of this update + * \params[in] env execution environment + * \params[in] update object update to be filled + * \params[in] max_update_size maximum object update size, if the current + * update length equals or exceeds the size, it + * will return -E2BIG. + * \params[in] update_op update type + * \params[in] fid object FID of the update + * \params[in] params_count the count of the update parameters + * \params[in] params_sizes the length of each parameters * - * \retval 0 pack update succeed. - * \retval negative errno pack update failed. - **/ -static struct object_update * -out_update_header_pack(const struct lu_env *env, struct update_buffer *ubuf, - enum update_type op, const struct lu_fid *fid, - int params_count, __u16 *param_sizes, __u64 batchid) + * \retval 0 if packing succeeds. + * \retval -E2BIG if packing exceeds the maximum length. + */ +int out_update_header_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, + enum update_type update_op, const struct lu_fid *fid, + unsigned int param_count, __u16 *params_sizes) { - struct object_update_request *ureq = ubuf->ub_req; - size_t ureq_size = ubuf->ub_req_size; - struct object_update *obj_update; struct object_update_param *param; - size_t update_size; - int rc = 0; unsigned int i; - ENTRY; - - /* Check update size to make sure it can fit into the buffer */ - ureq_size = object_update_request_size(ureq); - update_size = offsetof(struct object_update, ou_params[0]); - for (i = 0; i < params_count; i++) - update_size += cfs_size_round(param_sizes[i] + sizeof(*param)); - - if (unlikely(cfs_size_round(ureq_size + update_size) > - ubuf->ub_req_size)) { - size_t new_size = ubuf->ub_req_size; - - /* enlarge object update request size */ - while (new_size < - cfs_size_round(ureq_size + update_size)) - new_size += OUT_UPDATE_BUFFER_SIZE_ADD; - if (new_size >= OUT_UPDATE_BUFFER_SIZE_MAX) - RETURN(ERR_PTR(-E2BIG)); + size_t update_size; - rc = update_buffer_resize(ubuf, new_size); - if (rc < 0) - RETURN(ERR_PTR(rc)); + /* Check whether the packing exceeding the maxima update length */ + update_size = sizeof(*update); + for (i = 0; i < param_count; i++) + update_size += cfs_size_round(sizeof(*param) + params_sizes[i]); - ureq = ubuf->ub_req; - } + if (unlikely(update_size >= max_update_size)) + return -E2BIG; - /* fill the update into the update buffer */ - obj_update = (struct object_update *)((char *)ureq + ureq_size); - obj_update->ou_fid = *fid; - obj_update->ou_type = op; - obj_update->ou_params_count = (__u16)params_count; - obj_update->ou_batchid = batchid; - param = &obj_update->ou_params[0]; - for (i = 0; i < params_count; i++) { - param->oup_len = param_sizes[i]; + update->ou_fid = *fid; + update->ou_type = update_op; + update->ou_params_count = param_count; + param = &update->ou_params[0]; + for (i = 0; i < param_count; i++) { + param->oup_len = params_sizes[i]; param = (struct object_update_param *)((char *)param + object_update_param_size(param)); } - CDEBUG(D_INFO, "%p "DFID" idx %u: op %d params %d:%d\n", - ureq, PFID(fid), ureq->ourq_count, op, params_count, - (int)update_size); - ureq->ourq_count++; - - RETURN(obj_update); + return 0; } /** * Packs one update into the update_buffer. * * \param[in] env execution environment - * \param[in] ubuf bufer where update will be packed + * \param[in] update update to be packed + * \param[in] max_update_size *maximum size of \a update * \param[in] op update operation (enum update_type) * \param[in] fid object FID for this update * \param[in] param_count number of parameters for this update * \param[in] param_sizes array of parameters length of this update * \param[in] param_bufs parameter buffers - * \param[in] batchid transaction no of this update, plus mdt_index, which - * will be globally unique * * \retval = 0 if updates packing succeeds * \retval negative errno if updates packing fails **/ -int out_update_pack(const struct lu_env *env, struct update_buffer *ubuf, - enum update_type op, const struct lu_fid *fid, - int params_count, __u16 *param_sizes, - const void **param_bufs, __u64 batchid) +int out_update_pack(const struct lu_env *env, struct object_update *update, + size_t max_update_size, enum update_type op, + const struct lu_fid *fid, unsigned int param_count, + __u16 *param_sizes, const void **param_bufs) { - struct object_update *update; struct object_update_param *param; unsigned int i; + int rc; ENTRY; - update = out_update_header_pack(env, ubuf, op, fid, params_count, - param_sizes, batchid); - if (IS_ERR(update)) - RETURN(PTR_ERR(update)); + rc = out_update_header_pack(env, update, max_update_size, op, fid, + param_count, param_sizes); + if (rc != 0) + RETURN(rc); param = &update->ou_params[0]; - for (i = 0; i < params_count; i++) { + for (i = 0; i < param_count; i++) { memcpy(¶m->oup_buf[0], param_bufs[i], param_sizes[i]); param = (struct object_update_param *)((char *)param + object_update_param_size(param)); @@ -200,79 +166,83 @@ EXPORT_SYMBOL(out_update_pack); * \param[in] env execution environment * \param[in] ubuf update buffer * \param[in] fid fid of this object for the update - * \param[in] batchid batch id of this update * * \retval 0 if insertion succeeds. * \retval negative errno if insertion fails. */ -int out_create_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, struct lu_attr *attr, - struct dt_allocation_hint *hint, - struct dt_object_format *dof, __u64 batchid) +int out_create_pack(const struct lu_env *env, struct object_update *update, + size_t max_update_size, const struct lu_fid *fid, + const struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof) { struct obdo *obdo; __u16 sizes[2] = {sizeof(*obdo), 0}; int buf_count = 1; - const struct lu_fid *fid1 = NULL; - struct object_update *update; + const struct lu_fid *parent_fid = NULL; + int rc; ENTRY; if (hint != NULL && hint->dah_parent) { - fid1 = lu_object_fid(&hint->dah_parent->do_lu); - sizes[1] = sizeof(*fid1); + parent_fid = lu_object_fid(&hint->dah_parent->do_lu); + sizes[1] = sizeof(*parent_fid); buf_count++; } - update = out_update_header_pack(env, ubuf, OUT_CREATE, fid, - buf_count, sizes, batchid); - if (IS_ERR(update)) - RETURN(PTR_ERR(update)); + rc = out_update_header_pack(env, update, max_update_size, OUT_CREATE, + fid, buf_count, sizes); + if (rc != 0) + RETURN(rc); obdo = object_update_param_get(update, 0, NULL); + LASSERT(obdo != NULL); obdo->o_valid = 0; obdo_from_la(obdo, attr, attr->la_valid); lustre_set_wire_obdo(NULL, obdo, obdo); - if (fid1 != NULL) { - struct lu_fid *fid; - fid = object_update_param_get(update, 1, NULL); - fid_cpu_to_le(fid, fid1); + + if (parent_fid != NULL) { + struct lu_fid *tmp; + + tmp = object_update_param_get(update, 1, NULL); + LASSERT(tmp != NULL); + fid_cpu_to_le(tmp, parent_fid); } RETURN(0); } EXPORT_SYMBOL(out_create_pack); -int out_ref_del_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, __u64 batchid) +int out_ref_del_pack(const struct lu_env *env, struct object_update *update, + size_t max_update_size, const struct lu_fid *fid) { - return out_update_pack(env, ubuf, OUT_REF_DEL, fid, 0, NULL, NULL, - batchid); + return out_update_pack(env, update, max_update_size, OUT_REF_DEL, fid, + 0, NULL, NULL); } EXPORT_SYMBOL(out_ref_del_pack); -int out_ref_add_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, __u64 batchid) +int out_ref_add_pack(const struct lu_env *env, struct object_update *update, + size_t max_update_size, const struct lu_fid *fid) { - return out_update_pack(env, ubuf, OUT_REF_ADD, fid, 0, NULL, NULL, - batchid); + return out_update_pack(env, update, max_update_size, OUT_REF_ADD, fid, + 0, NULL, NULL); } EXPORT_SYMBOL(out_ref_add_pack); -int out_attr_set_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, const struct lu_attr *attr, - __u64 batchid) +int out_attr_set_pack(const struct lu_env *env, struct object_update *update, + size_t max_update_size, const struct lu_fid *fid, + const struct lu_attr *attr) { - struct object_update *update; struct obdo *obdo; __u16 size = sizeof(*obdo); + int rc; ENTRY; - update = out_update_header_pack(env, ubuf, OUT_ATTR_SET, fid, 1, - &size, batchid); - if (IS_ERR(update)) - RETURN(PTR_ERR(update)); + rc = out_update_header_pack(env, update, max_update_size, + OUT_ATTR_SET, fid, 1, &size); + if (rc != 0) + RETURN(rc); obdo = object_update_param_get(update, 0, NULL); + LASSERT(obdo != NULL); obdo->o_valid = 0; obdo_from_la(obdo, attr, attr->la_valid); lustre_set_wire_obdo(NULL, obdo, obdo); @@ -281,34 +251,35 @@ int out_attr_set_pack(const struct lu_env *env, struct update_buffer *ubuf, } EXPORT_SYMBOL(out_attr_set_pack); -int out_xattr_set_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, const struct lu_buf *buf, - const char *name, int flag, __u64 batchid) +int out_xattr_set_pack(const struct lu_env *env, struct object_update *update, + size_t max_update_size, const struct lu_fid *fid, + const struct lu_buf *buf, const char *name, __u32 flag) { __u16 sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)}; const void *bufs[3] = {(char *)name, (char *)buf->lb_buf, (char *)&flag}; - return out_update_pack(env, ubuf, OUT_XATTR_SET, fid, - ARRAY_SIZE(sizes), sizes, bufs, batchid); + return out_update_pack(env, update, max_update_size, OUT_XATTR_SET, + fid, ARRAY_SIZE(sizes), sizes, bufs); } EXPORT_SYMBOL(out_xattr_set_pack); -int out_xattr_del_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, const char *name, - __u64 batchid) +int out_xattr_del_pack(const struct lu_env *env, struct object_update *update, + size_t max_update_size, const struct lu_fid *fid, + const char *name) { __u16 size = strlen(name) + 1; - return out_update_pack(env, ubuf, OUT_XATTR_DEL, fid, 1, &size, - (const void **)&name, batchid); + return out_update_pack(env, update, max_update_size, OUT_XATTR_DEL, + fid, 1, &size, (const void **)&name); } EXPORT_SYMBOL(out_xattr_del_pack); -int out_index_insert_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, const struct dt_rec *rec, - const struct dt_key *key, __u64 batchid) +int out_index_insert_pack(const struct lu_env *env, + struct object_update *update, + size_t max_update_size, const struct lu_fid *fid, + const struct dt_rec *rec, const struct dt_key *key) { struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec; struct lu_fid rec_fid; @@ -322,35 +293,36 @@ int out_index_insert_pack(const struct lu_env *env, struct update_buffer *ubuf, fid_cpu_to_le(&rec_fid, rec1->rec_fid); - return out_update_pack(env, ubuf, OUT_INDEX_INSERT, fid, - ARRAY_SIZE(sizes), sizes, bufs, batchid); + return out_update_pack(env, update, max_update_size, OUT_INDEX_INSERT, + fid, ARRAY_SIZE(sizes), sizes, bufs); } EXPORT_SYMBOL(out_index_insert_pack); -int out_index_delete_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, const struct dt_key *key, - __u64 batchid) +int out_index_delete_pack(const struct lu_env *env, + struct object_update *update, + size_t max_update_size, const struct lu_fid *fid, + const struct dt_key *key) { __u16 size = strlen((char *)key) + 1; const void *buf = key; - return out_update_pack(env, ubuf, OUT_INDEX_DELETE, fid, 1, &size, - &buf, batchid); + return out_update_pack(env, update, max_update_size, OUT_INDEX_DELETE, + fid, 1, &size, &buf); } EXPORT_SYMBOL(out_index_delete_pack); int out_object_destroy_pack(const struct lu_env *env, - struct update_buffer *ubuf, - const struct lu_fid *fid, __u64 batchid) + struct object_update *update, + size_t max_update_size, const struct lu_fid *fid) { - return out_update_pack(env, ubuf, OUT_DESTROY, fid, 0, NULL, NULL, - batchid); + return out_update_pack(env, update, max_update_size, OUT_DESTROY, fid, + 0, NULL, NULL); } EXPORT_SYMBOL(out_object_destroy_pack); -int out_write_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, const struct lu_buf *buf, - loff_t pos, __u64 batchid) +int out_write_pack(const struct lu_env *env, struct object_update *update, + size_t max_update_size, const struct lu_fid *fid, + const struct lu_buf *buf, __u64 pos) { __u16 sizes[2] = {buf->lb_len, sizeof(pos)}; const void *bufs[2] = {(char *)buf->lb_buf, (char *)&pos}; @@ -358,8 +330,8 @@ int out_write_pack(const struct lu_env *env, struct update_buffer *ubuf, pos = cpu_to_le64(pos); - rc = out_update_pack(env, ubuf, OUT_WRITE, fid, ARRAY_SIZE(sizes), - sizes, bufs, batchid); + rc = out_update_pack(env, update, max_update_size, OUT_WRITE, fid, + ARRAY_SIZE(sizes), sizes, bufs); return rc; } EXPORT_SYMBOL(out_write_pack); @@ -375,36 +347,40 @@ EXPORT_SYMBOL(out_write_pack); * \param[in] fid fid of this object for the update * \param[in] ubuf update buffer * - * \retval 0 if packing succeeds. - * \retval negative errno if packing fails. - */ -int out_index_lookup_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, struct dt_rec *rec, - const struct dt_key *key) + * \retval = 0 pack succeed. + * < 0 pack failed. + **/ +int out_index_lookup_pack(const struct lu_env *env, + struct object_update *update, + size_t max_update_size, const struct lu_fid *fid, + struct dt_rec *rec, const struct dt_key *key) { const void *name = key; __u16 size = strlen((char *)name) + 1; - return out_update_pack(env, ubuf, OUT_INDEX_LOOKUP, fid, 1, &size, - &name, 0); + return out_update_pack(env, update, max_update_size, OUT_INDEX_LOOKUP, + fid, 1, &size, &name); } EXPORT_SYMBOL(out_index_lookup_pack); -int out_attr_get_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid) +int out_attr_get_pack(const struct lu_env *env, struct object_update *update, + size_t max_update_size, const struct lu_fid *fid) { - return out_update_pack(env, ubuf, OUT_ATTR_GET, fid, 0, NULL, NULL, 0); + return out_update_pack(env, update, max_update_size, OUT_ATTR_GET, + fid, 0, NULL, NULL); } EXPORT_SYMBOL(out_attr_get_pack); -int out_xattr_get_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, const char *name) +int out_xattr_get_pack(const struct lu_env *env, struct object_update *update, + size_t max_update_size, const struct lu_fid *fid, + const char *name) { __u16 size; LASSERT(name != NULL); size = strlen(name) + 1; - return out_update_pack(env, ubuf, OUT_XATTR_GET, fid, 1, &size, - (const void **)&name, 0); + + return out_update_pack(env, update, max_update_size, OUT_XATTR_GET, + fid, 1, &size, (const void **)&name); } EXPORT_SYMBOL(out_xattr_get_pack); diff --git a/lustre/target/tgt_internal.h b/lustre/target/tgt_internal.h index 3cb395f..919fa1e 100644 --- a/lustre/target/tgt_internal.h +++ b/lustre/target/tgt_internal.h @@ -209,6 +209,8 @@ int out_handle(struct tgt_session_info *tsi); #define out_tx_write(info, obj, buf, pos, th, reply, idx) \ __out_tx_write(info, obj, buf, pos, th, reply, idx, __FILE__, __LINE__) +const char *update_op_str(__u16 opcode); + extern struct page *tgt_page_to_corrupt; struct tgt_thread_big_cache { @@ -221,4 +223,29 @@ int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th, int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th, void *cookie); +void update_records_dump(const struct update_records *records, + unsigned int mask, bool dump_updates); + +struct update_thread_info { + struct lu_attr uti_attr; + struct lu_fid uti_fid; + struct lu_buf uti_buf; + struct thandle_update_records uti_tur; + struct obdo uti_obdo; +}; + +extern struct lu_context_key update_thread_key; + +static inline struct update_thread_info * +update_env_info(const struct lu_env *env) +{ + struct update_thread_info *uti; + + uti = lu_context_key_get(&env->le_ctx, &update_thread_key); + LASSERT(uti != NULL); + return uti; +} + +void update_info_init(void); +void update_info_fini(void); #endif /* _TG_INTERNAL_H */ diff --git a/lustre/target/tgt_main.c b/lustre/target/tgt_main.c index d7791ca..269d896 100644 --- a/lustre/target/tgt_main.c +++ b/lustre/target/tgt_main.c @@ -216,6 +216,8 @@ int tgt_mod_init(void) tgt_ses_key_init_generic(&tgt_session_key, NULL); lu_context_key_register_many(&tgt_session_key, NULL); + update_info_init(); + RETURN(0); } @@ -226,5 +228,6 @@ void tgt_mod_exit(void) lu_context_key_degister(&tgt_thread_key); lu_context_key_degister(&tgt_session_key); + update_info_fini(); } diff --git a/lustre/target/update_records.c b/lustre/target/update_records.c new file mode 100644 index 0000000..836f150 --- /dev/null +++ b/lustre/target/update_records.c @@ -0,0 +1,965 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright (c) 2014, Intel Corporation. + */ + +/* + * lustre/target/update_records.c + * + * This file implement the methods to pack updates as update records, which + * will be written to the disk as llog record, and might be used during + * recovery. + * + * For cross-MDT operation, all of updates of the operation needs to be + * recorded in the disk, then during recovery phase, the recovery thread + * will retrieve and redo these updates if it needed. + * + * See comments above struct update_records for the format of update_records. + * + * Author: Di Wang + */ +#define DEBUG_SUBSYSTEM S_CLASS + +#include +#include +#include +#include +#include "tgt_internal.h" + +#define UPDATE_RECORDS_BUFFER_SIZE 8192 +#define UPDATE_PARAMS_BUFFER_SIZE 8192 +/** + * Dump update record. + * + * Dump all of updates in the update_records, mostly for debugging purpose. + * + * \param[in] records update records to be dumpped + * \param[in] mask debug level mask + * \param[in] dump_params if dump all of updates the updates. + * + */ +void update_records_dump(const struct update_records *records, + unsigned int mask, bool dump_updates) +{ + const struct update_ops *ops; + const struct update_op *op = NULL; + struct update_params *params; + unsigned int i; + + ops = &records->ur_ops; + params = update_records_get_params(records); + + CDEBUG(mask, "ops = %d params_count = %d\n", records->ur_update_count, + records->ur_param_count); + + if (records->ur_update_count == 0) + return; + + if (!dump_updates) + return; + + op = &ops->uops_op[0]; + for (i = 0; i < records->ur_update_count; i++) { + unsigned int j; + + CDEBUG(mask, "update %dth "DFID" %s params_count = %hu\n", i, + PFID(&op->uop_fid), update_op_str(op->uop_type), + op->uop_param_count); + + for (j = 0; j < op->uop_param_count; j++) { + struct object_update_param *param; + + param = update_params_get_param(params, + (unsigned int)op->uop_params_off[j], + records->ur_param_count); + + LASSERT(param != NULL); + CDEBUG(mask, "param = %p %dth off = %hu size = %hu\n", + param, j, op->uop_params_off[j], param->oup_len); + } + + op = update_op_next_op(op); + } +} + +/** + * Pack parameters to update records + * + * Find and insert parameter to update records, if the parameter + * already exists in \a params, then just return the offset of this + * parameter, otherwise insert the parameter and return its offset + * + * \param[in] params update params in which to insert parameter + * \param[in] new_param parameters to be inserted. + * \param[in] new_param_size the size of \a new_param + * + * \retval index inside \a params if parameter insertion + * succeeds. + * \retval negative errno if it fails. + */ +static unsigned int update_records_param_pack(struct update_params *params, + const void *new_param, + size_t new_param_size, + unsigned int *param_count) +{ + struct object_update_param *param; + unsigned int i; + + for (i = 0; i < *param_count; i++) { + struct object_update_param *param; + + param = update_params_get_param(params, i, *param_count); + if ((new_param == NULL && param->oup_len == new_param_size) || + (param->oup_len == new_param_size && + memcmp(param->oup_buf, new_param, new_param_size) == 0)) + /* Found the parameter and return its index */ + return i; + } + + param = (struct object_update_param *)((char *)params + + update_params_size(params, *param_count)); + + param->oup_len = new_param_size; + if (new_param != NULL) + memcpy(param->oup_buf, new_param, new_param_size); + + *param_count = *param_count + 1; + + return *param_count - 1; +} + +/** + * Pack update to update records + * + * Pack the update and its parameters to the update records. First it will + * insert parameters, get the offset of these parameter, then fill the + * update with these offset. If insertion exceed the maximum size of + * current update records, it will return -E2BIG here, and the caller might + * extend the update_record size \see lod_updates_pack. + * + * \param[in] env execution environment + * \param[in] fid FID of the update. + * \param[in] op_type operation type of the update + * \param[in] ops ur_ops in update records + * \param[in|out] op_count pointer to the count of ops + * \param[in|out] max_op_size maximum size of the update + * \param[in] params ur_params in update records + * \param[in|out] param_count pointer to the count of params + * \param[in|out] max_param_size maximum size of the parameter + * \param[in] param_bufs buffers of parameters + * \param[in] params_buf_count the count of the parameter buffers + * \param[in] param_size sizes of parameters + * + * \retval 0 if packing succeeds + * \retval negative errno if packing fails + */ +static int update_records_update_pack(const struct lu_env *env, + const struct lu_fid *fid, + enum update_type op_type, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_op_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + unsigned int param_bufs_count, + const void **param_bufs, + size_t *param_sizes) +{ + struct update_op *op; + size_t total_param_sizes = 0; + int index; + unsigned int i; + + /* Check whether the packing exceeding the maximum update size */ + if (unlikely(*max_op_size < update_op_size(param_bufs_count))) { + CDEBUG(D_INFO, "max_op_size = %zu update_op = %zu\n", + *max_op_size, update_op_size(param_bufs_count)); + *max_op_size = update_op_size(param_bufs_count); + return -E2BIG; + } + + for (i = 0; i < param_bufs_count; i++) + total_param_sizes += + cfs_size_round(sizeof(struct object_update_param) + + param_sizes[i]); + + /* Check whether the packing exceeding the maximum parameter size */ + if (unlikely(*max_param_size < total_param_sizes)) { + CDEBUG(D_INFO, "max_param_size = %zu params size = %zu\n", + *max_param_size, total_param_sizes); + + *max_param_size = total_param_sizes; + return -E2BIG; + } + + op = update_ops_get_op(ops, *op_count, *op_count); + op->uop_fid = *fid; + op->uop_type = op_type; + op->uop_param_count = param_bufs_count; + for (i = 0; i < param_bufs_count; i++) { + index = update_records_param_pack(params, param_bufs[i], + param_sizes[i], param_count); + if (index < 0) + return index; + + CDEBUG(D_INFO, "%s %uth param offset = %d size = %zu\n", + update_op_str(op_type), i, index, param_sizes[i]); + + op->uop_params_off[i] = index; + } + CDEBUG(D_INFO, "%huth "DFID" %s param_count = %u\n", + *op_count, PFID(fid), update_op_str(op_type), *param_count); + + *op_count = *op_count + 1; + + return 0; +} + +/** + * Pack create update + * + * Pack create update into update records. + * + * \param[in] env execution environment + * \param[in] ops ur_ops in update records + * \param[in|out] op_count pointer to the count of ops + * \param[in|out] max_op_size maximum size of the update + * \param[in] params ur_params in update records + * \param[in|out] param_count pointer to the count of params + * \param[in|out] max_param_size maximum size of the parameter + * \param[in] fid FID of the object to be created + * \param[in] attr attribute of the object to be created + * \param[in] hint creation hint + * \param[in] dof creation format information + * + * \retval 0 if packing succeeds. + * \retval negative errno if packing fails. + */ +int update_records_create_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct lu_attr *attr, + const struct dt_allocation_hint *hint, + struct dt_object_format *dof) +{ + size_t sizes[2]; + const void *bufs[2]; + int buf_count = 0; + const struct lu_fid *parent_fid = NULL; + struct lu_fid tmp_fid; + int rc; + struct obdo *obdo; + + if (attr != NULL) { + obdo = &update_env_info(env)->uti_obdo; + obdo->o_valid = 0; + obdo_from_la(obdo, attr, attr->la_valid); + lustre_set_wire_obdo(NULL, obdo, obdo); + bufs[buf_count] = obdo; + sizes[buf_count] = sizeof(*obdo); + buf_count++; + } + + if (hint != NULL && hint->dah_parent != NULL) { + parent_fid = lu_object_fid(&hint->dah_parent->do_lu); + fid_cpu_to_le(&tmp_fid, parent_fid); + bufs[buf_count] = &tmp_fid; + sizes[buf_count] = sizeof(tmp_fid); + buf_count++; + } + + rc = update_records_update_pack(env, fid, OUT_CREATE, ops, op_count, + max_ops_size, params, param_count, + max_param_size, buf_count, bufs, sizes); + return rc; +} +EXPORT_SYMBOL(update_records_create_pack); + +/** + * Pack attr set update + * + * Pack attr_set update into update records. + * + * \param[in] env execution environment + * \param[in] ops ur_ops in update records + * \param[in|out] op_count pointer to the count of ops + * \param[in|out] max_op_size maximum size of the update + * \param[in] params ur_params in update records + * \param[in|out] param_count pointer to the count of params + * \param[in|out] max_param_size maximum size of the parameter + * \param[in] fid FID of the object to set attr + * \param[in] attr attribute of attr set + * + * \retval 0 if packing succeeds. + * \retval negative errno if packing fails. + */ +int update_records_attr_set_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct lu_attr *attr) +{ + struct obdo *obdo = &update_env_info(env)->uti_obdo; + size_t size = sizeof(*obdo); + + obdo->o_valid = 0; + obdo_from_la(obdo, attr, attr->la_valid); + lustre_set_wire_obdo(NULL, obdo, obdo); + return update_records_update_pack(env, fid, OUT_ATTR_SET, ops, op_count, + max_ops_size, params, param_count, + max_param_size, 1, + (const void **)&obdo, &size); +} +EXPORT_SYMBOL(update_records_attr_set_pack); + +/** + * Pack ref add update + * + * Pack ref add update into update records. + * + * \param[in] env execution environment + * \param[in] ops ur_ops in update records + * \param[in|out] op_count pointer to the count of ops + * \param[in|out] max_op_size maximum size of the update + * \param[in] params ur_params in update records + * \param[in|out] param_count pointer to the count of params + * \param[in|out] max_param_size maximum size of the parameter + * \param[in] fid FID of the object to add reference + * + * \retval 0 if packing succeeds. + * \retval negative errno if packing fails. + */ +int update_records_ref_add_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid) +{ + return update_records_update_pack(env, fid, OUT_REF_ADD, ops, op_count, + max_ops_size, params, param_count, + max_param_size, 0, NULL, NULL); +} +EXPORT_SYMBOL(update_records_ref_add_pack); + +/** + * Pack ref del update + * + * Pack ref del update into update records. + * + * \param[in] env execution environment + * \param[in] ops ur_ops in update records + * \param[in|out] op_count pointer to the count of ops + * \param[in|out] max_op_size maximum size of the update + * \param[in] params ur_params in update records + * \param[in] param_count pointer to the count of params + * \param[in|out] max_param_size maximum size of the parameter + * \param[in] fid FID of the object to delete reference + * + * \retval 0 if packing succeeds. + * \retval negative errno if packing fails. + */ +int update_records_ref_del_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid) +{ + return update_records_update_pack(env, fid, OUT_REF_DEL, ops, op_count, + max_ops_size, params, param_count, + max_param_size, 0, NULL, NULL); +} +EXPORT_SYMBOL(update_records_ref_del_pack); + +/** + * Pack object destroy update + * + * Pack object destroy update into update records. + * + * \param[in] env execution environment + * \param[in] ops ur_ops in update records + * \param[in|out] op_count pointer to the count of ops + * \param[in|out] max_op_size maximum size of the update + * \param[in] params ur_params in update records + * \param[in|out] param_count pointer to the count of params + * \param[in|out] max_param_size maximum size of the parameter + * \param[in] fid FID of the object to delete reference + * + * \retval 0 if packing succeeds. + * \retval negative errno if packing fails. + */ +int update_records_object_destroy_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid) +{ + return update_records_update_pack(env, fid, OUT_DESTROY, ops, op_count, + max_ops_size, params, param_count, + max_param_size, 0, NULL, NULL); +} +EXPORT_SYMBOL(update_records_object_destroy_pack); + +/** + * Pack index insert update + * + * Pack index insert update into update records. + * + * \param[in] env execution environment + * \param[in] ops ur_ops in update records + * \param[in] op_count pointer to the count of ops + * \param[in|out] max_op_size maximum size of the update + * \param[in] params ur_params in update records + * \param[in] param_count pointer to the count of params + * \param[in|out] max_param_size maximum size of the parameter + * \param[in] fid FID of the object to insert index + * \param[in] rec record of insertion + * \param[in] key key of insertion + * + * \retval 0 if packing succeeds. + * \retval negative errno if packing fails. + */ +int update_records_index_insert_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct dt_rec *rec, + const struct dt_key *key) +{ + struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec; + struct lu_fid rec_fid; + __u32 type = cpu_to_le32(rec1->rec_type); + size_t sizes[3] = { strlen((const char *)key) + 1, + sizeof(rec_fid), + sizeof(type) }; + const void *bufs[3] = { key, + &rec_fid, + &type }; + + fid_cpu_to_le(&rec_fid, rec1->rec_fid); + + return update_records_update_pack(env, fid, OUT_INDEX_INSERT, ops, + op_count, max_ops_size, params, + param_count, max_param_size, + 3, bufs, sizes); +} +EXPORT_SYMBOL(update_records_index_insert_pack); + +/** + * Pack index delete update + * + * Pack index delete update into update records. + * + * \param[in] env execution environment + * \param[in] ops ur_ops in update records + * \param[in|out] op_count pointer to the count of ops + * \param[in|out] max_op_size maximum size of the update + * \param[in] params ur_params in update records + * \param[in|ount] param_count pointer to the count of params + * \param[in|out] max_param_size maximum size of the parameter + * \param[in] fid FID of the object to delete index + * \param[in] key key of deletion + * + * \retval 0 if packing succeeds. + * \retval negative errno if packing fails. + */ +int update_records_index_delete_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct dt_key *key) +{ + size_t size = strlen((const char *)key) + 1; + + return update_records_update_pack(env, fid, OUT_INDEX_DELETE, ops, + op_count, max_ops_size, params, + param_count, max_param_size, + 1, (const void **)&key, &size); +} +EXPORT_SYMBOL(update_records_index_delete_pack); + +/** + * Pack xattr set update + * + * Pack xattr set update into update records. + * + * \param[in] env execution environment + * \param[in] ops ur_ops in update records + * \param[in|out] op_count pointer to the count of ops + * \param[in|out] max_op_size maximum size of the update + * \param[in] params ur_params in update records + * \param[in|out] param_count pointer to the count of params + * \param[in|out] max_param_size maximum size of the parameter + * \param[in] fid FID of the object to set xattr + * \param[in] buf xattr to be set + * \param[in] name name of the xattr + * \param[in] flag flag for setting xattr + * + * \retval 0 if packing succeeds. + * \retval negative errno if packing fails. + */ +int update_records_xattr_set_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct lu_buf *buf, const char *name, + __u32 flag) +{ + size_t sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)}; + const void *bufs[3] = {name, buf->lb_buf, &flag}; + + flag = cpu_to_le32(flag); + + return update_records_update_pack(env, fid, OUT_XATTR_SET, ops, + op_count, max_ops_size, params, + param_count, max_param_size, + 3, bufs, sizes); +} +EXPORT_SYMBOL(update_records_xattr_set_pack); + +/** + * Pack xattr delete update + * + * Pack xattr delete update into update records. + * + * \param[in] env execution environment + * \param[in] ops ur_ops in update records + * \param[in|out] op_count pointer to the count of ops + * \param[in|out] max_op_size maximum size of the update + * \param[in] params ur_params in update records + * \param[in|out] param_count pointer to the count of params + * \param[in|out] max_param_size maximum size of the parameter + * \param[in] fid FID of the object to delete xattr + * \param[in] name name of the xattr + * + * \retval 0 if packing succeeds. + * \retval negative errno if packing fails. + */ +int update_records_xattr_del_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const char *name) +{ + size_t size = strlen(name) + 1; + + return update_records_update_pack(env, fid, OUT_XATTR_DEL, ops, + op_count, max_ops_size, params, + param_count, max_param_size, + 1, (const void **)&name, &size); +} +EXPORT_SYMBOL(update_records_xattr_del_pack); + +/** + * Pack write update + * + * Pack write update into update records. + * + * \param[in] env execution environment + * \param[in] ops ur_ops in update records + * \param[in|out] op_count pointer to the count of ops + * \param[in|out] max_op_size maximum size of the update + * \param[in] params ur_params in update records + * \param[in|out] param_count pointer to the count of params + * \param[in|out] max_param_size maximum size of the parameter + * \param[in] fid FID of the object to write into + * \param[in] buf buffer to write which includes an embedded size field + * \param[in] pos offet in the object to start writing at + * + * \retval 0 if packing succeeds. + * \retval negative errno if packing fails. + */ +int update_records_write_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct lu_buf *buf, + __u64 pos) +{ + size_t sizes[2] = {buf->lb_len, sizeof(pos)}; + const void *bufs[2] = {buf->lb_buf, &pos}; + + pos = cpu_to_le64(pos); + + return update_records_update_pack(env, fid, OUT_XATTR_DEL, ops, + op_count, max_ops_size, params, + param_count, max_param_size, + 2, bufs, sizes); +} +EXPORT_SYMBOL(update_records_write_pack); + +/** + * Create update records in thandle_update_records + * + * Allocate update_records for thandle_update_records, the initial size + * will be 4KB. + * + * \param[in] tur thandle_update_records where update_records will be + * allocated + * \retval 0 if allocation succeeds. + * \retval negative errno if allocation fails. + */ +static int tur_update_records_create(struct thandle_update_records *tur) +{ + if (tur->tur_update_records != NULL) + return 0; + + OBD_ALLOC_LARGE(tur->tur_update_records, + UPDATE_RECORDS_BUFFER_SIZE); + + if (tur->tur_update_records == NULL) + return -ENOMEM; + + tur->tur_update_records_buf_size = UPDATE_RECORDS_BUFFER_SIZE; + + return 0; +} + +/** + * Extend update records + * + * Extend update_records to the new size in thandle_update_records. + * + * \param[in] tur thandle_update_records where update_records will be + * extended. + * \retval 0 if extension succeeds. + * \retval negative errno if extension fails. + */ +int tur_update_records_extend(struct thandle_update_records *tur, + size_t new_size) +{ + struct llog_update_record *record; + + OBD_ALLOC_LARGE(record, new_size); + if (record == NULL) + return -ENOMEM; + + if (tur->tur_update_records != NULL) { + memcpy(record, tur->tur_update_records, + tur->tur_update_records_buf_size); + OBD_FREE_LARGE(tur->tur_update_records, + tur->tur_update_records_buf_size); + } + + tur->tur_update_records = record; + tur->tur_update_records_buf_size = new_size; + + return 0; +} +EXPORT_SYMBOL(tur_update_records_extend); + +/** + * Extend update records + * + * Extend update records in thandle to make sure it is able to hold + * the update with certain update_op and params size. + * + * \param [in] tur thandle_update_records to be extend + * \param [in] new_op_size update_op size of the update record + * \param [in] new_param_size params size of the update record + * + * \retval 0 if the update_records is being extended. + * \retval negative errno if the update_records is not being + * extended. + */ +int tur_update_extend(struct thandle_update_records *tur, + size_t new_op_size, size_t new_param_size) +{ + size_t record_size; + size_t params_size; + size_t extend_size; + int rc; + ENTRY; + + record_size = llog_update_record_size(tur->tur_update_records); + /* extend update records buffer */ + if (new_op_size > (tur->tur_update_records_buf_size - record_size - + sizeof(*tur->tur_update_records))) { + extend_size = round_up(new_op_size, UPDATE_RECORDS_BUFFER_SIZE); + rc = tur_update_records_extend(tur, + tur->tur_update_records_buf_size + + extend_size); + if (rc != 0) + RETURN(rc); + } + + /* extend parameters buffer */ + params_size = update_params_size(tur->tur_update_params, + tur->tur_update_param_count); + if (new_param_size > (tur->tur_update_params_buf_size - + params_size)) { + extend_size = round_up(new_param_size, + UPDATE_PARAMS_BUFFER_SIZE); + rc = tur_update_params_extend(tur, + tur->tur_update_params_buf_size + + extend_size); + if (rc != 0) + RETURN(rc); + } + + RETURN(0); +} +EXPORT_SYMBOL(tur_update_extend); + +/** + * Create update params in thandle_update_records + * + * Allocate update_params for thandle_update_records, the initial size + * will be 4KB. + * + * \param[in] tur thandle_update_records where update_params will be + * allocated + * \retval 0 if allocation succeeds. + * \retval negative errno if allocation fails. + */ +static int tur_update_params_create(struct thandle_update_records *tur) +{ + if (tur->tur_update_params != NULL) + return 0; + + OBD_ALLOC_LARGE(tur->tur_update_params, UPDATE_PARAMS_BUFFER_SIZE); + if (tur->tur_update_params == NULL) + return -ENOMEM; + + tur->tur_update_params_buf_size = UPDATE_PARAMS_BUFFER_SIZE; + return 0; +} + +/** + * Extend update params + * + * Extend update_params to the new size in thandle_update_records. + * + * \param[in] tur thandle_update_records where update_params will be + * extended. + * \retval 0 if extension succeeds. + * \retval negative errno if extension fails. + */ +int tur_update_params_extend(struct thandle_update_records *tur, + size_t new_size) +{ + struct update_params *params; + + OBD_ALLOC_LARGE(params, new_size); + if (params == NULL) + return -ENOMEM; + + if (tur->tur_update_params != NULL) { + memcpy(params, tur->tur_update_params, + tur->tur_update_params_buf_size); + OBD_FREE_LARGE(tur->tur_update_params, + tur->tur_update_params_buf_size); + } + + tur->tur_update_params = params; + tur->tur_update_params_buf_size = new_size; + + return 0; +} +EXPORT_SYMBOL(tur_update_params_extend); + +/** + * Check and prepare whether it needs to record update. + * + * Checks if the transaction needs to record updates, and if it + * does, then initialize the update record buffer in the transaction. + * + * \param[in] env execution environment + * \param[in] th transaction handle + * + * \retval 0 if updates recording succeeds. + * \retval negative errno if updates recording fails. + */ +int check_and_prepare_update_record(const struct lu_env *env, + struct thandle *th) +{ + struct thandle_update_records *tur; + struct llog_update_record *lur; + struct top_thandle *top_th; + struct sub_thandle *lst; + int rc; + bool record_update = false; + ENTRY; + + top_th = container_of(th, struct top_thandle, tt_super); + /* Check if it needs to record updates for this transaction */ + list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) { + if (lst->st_record_update) { + record_update = true; + break; + } + } + if (!record_update) + RETURN(0); + + if (top_th->tt_update_records != NULL) + RETURN(0); + + tur = &update_env_info(env)->uti_tur; + if (tur->tur_update_records == NULL) { + rc = tur_update_records_create(tur); + if (rc < 0) + RETURN(rc); + } + + if (tur->tur_update_params == NULL) { + rc = tur_update_params_create(tur); + if (rc < 0) + RETURN(rc); + } + + lur = tur->tur_update_records; + lur->lur_update_rec.ur_update_count = 0; + lur->lur_update_rec.ur_param_count = 0; + lur->lur_update_rec.ur_master_transno = 0; + lur->lur_update_rec.ur_batchid = 0; + lur->lur_update_rec.ur_flags = 0; + + tur->tur_update_param_count = 0; + + top_th->tt_update_records = tur; + + RETURN(0); +} +EXPORT_SYMBOL(check_and_prepare_update_record); + +/** + * Merge params into the update records + * + * Merge params and ops into the update records attached to top th. + * During transaction execution phase, parameters and update ops + * are collected in two different buffers (see lod_updates_pack()), + * then in transaction stop, it needs to be merged them in one + * buffer, then being written into the update log. + * + * \param[in] env execution environment + * \param[in] tur thandle update records whose updates and + * parameters are merged + * + * \retval 0 if merging succeeds. + * \retval negaitive errno if merging fails. + */ +int merge_params_updates_buf(const struct lu_env *env, + struct thandle_update_records *tur) +{ + struct llog_update_record *lur; + struct update_params *params; + size_t params_size; + size_t record_size; + + if (tur->tur_update_records == NULL || + tur->tur_update_params == NULL) + return 0; + + lur = tur->tur_update_records; + /* Extends the update records buffer if needed */ + params_size = update_params_size(tur->tur_update_params, + tur->tur_update_param_count); + record_size = llog_update_record_size(lur); + if (record_size + params_size > tur->tur_update_records_buf_size) { + int rc; + + rc = tur_update_records_extend(tur, record_size + params_size); + if (rc < 0) + return rc; + lur = tur->tur_update_records; + } + + params = update_records_get_params(&lur->lur_update_rec); + memcpy(params, tur->tur_update_params, params_size); + lur->lur_update_rec.ur_param_count = tur->tur_update_param_count; + return 0; +} +EXPORT_SYMBOL(merge_params_updates_buf); + +static void update_key_fini(const struct lu_context *ctx, + struct lu_context_key *key, void *data) +{ + struct update_thread_info *info = data; + + if (info->uti_tur.tur_update_records != NULL) + OBD_FREE_LARGE(info->uti_tur.tur_update_records, + info->uti_tur.tur_update_records_buf_size); + if (info->uti_tur.tur_update_params != NULL) + OBD_FREE_LARGE(info->uti_tur.tur_update_params, + info->uti_tur.tur_update_params_buf_size); + + OBD_FREE_PTR(info); +} + +/* context key constructor/destructor: update_key_init, update_key_fini */ +LU_KEY_INIT(update, struct update_thread_info); +/* context key: update_thread_key */ +LU_CONTEXT_KEY_DEFINE(update, LCT_MD_THREAD | LCT_MG_THREAD | + LCT_DT_THREAD | LCT_TX_HANDLE | LCT_LOCAL); +EXPORT_SYMBOL(update_thread_key); +LU_KEY_INIT_GENERIC(update); + +void update_info_init(void) +{ + update_key_init_generic(&update_thread_key, NULL); + lu_context_key_register(&update_thread_key); +} + +void update_info_fini(void) +{ + lu_context_key_degister(&update_thread_key); +} diff --git a/lustre/target/update_trans.c b/lustre/target/update_trans.c index b57fbe8..8ecf3a6 100644 --- a/lustre/target/update_trans.c +++ b/lustre/target/update_trans.c @@ -53,6 +53,7 @@ #include #include #include +#include /** * Create the top transaction. @@ -85,8 +86,10 @@ top_trans_create(const struct lu_env *env, struct dt_device *master_dev) top_th->tt_magic = TOP_THANDLE_MAGIC; top_th->tt_master_sub_thandle = child_th; child_th->th_top = &top_th->tt_super; - INIT_LIST_HEAD(&top_th->tt_sub_thandle_list); + + top_th->tt_update_records = NULL; top_th->tt_super.th_top = &top_th->tt_super; + INIT_LIST_HEAD(&top_th->tt_sub_thandle_list); return &top_th->tt_super; } @@ -110,7 +113,11 @@ int top_trans_start(const struct lu_env *env, struct dt_device *master_dev, struct top_thandle *top_th = container_of(th, struct top_thandle, tt_super); struct sub_thandle *lst; - int rc = 0; + int rc; + + rc = check_and_prepare_update_record(env, th); + if (rc < 0) + return rc; LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC); list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) { @@ -148,11 +155,22 @@ int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev, struct sub_thandle *lst; struct top_thandle *top_th = container_of(th, struct top_thandle, tt_super); + struct thandle_update_records *tur = top_th->tt_update_records; int rc; ENTRY; /* Note: we always need walk through all of sub_transaction to do * transaction stop to release the resource here */ + if (tur != NULL) { + rc = merge_params_updates_buf(env, tur); + if (rc == 0) { + struct update_records *record; + + record = &tur->tur_update_records->lur_update_rec; + update_records_dump(record, D_INFO, false); + } + } + LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC); top_th->tt_master_sub_thandle->th_local = th->th_local; @@ -233,6 +251,7 @@ struct thandle *thandle_get_sub_by_dt(const struct lu_env *env, INIT_LIST_HEAD(&lst->st_sub_list); lst->st_sub_th = sub_th; list_add(&lst->st_sub_list, &top_th->tt_sub_thandle_list); + lst->st_record_update = 1; RETURN(sub_th); } -- 1.8.3.1