X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Finclude%2Flustre_update.h;h=1417804f68f316dc183b7b8b597f46557fee08d5;hp=b65e00d409ba58f68f85321cdc175bb213dd3078;hb=4f53536d002c13886210b672b657795baa067144;hpb=281671b5ee43c2aea5d5b708aadf10fd1df45b16 diff --git a/lustre/include/lustre_update.h b/lustre/include/lustre_update.h index b65e00d4..1417804 100644 --- a/lustre/include/lustre_update.h +++ b/lustre/include/lustre_update.h @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2013, Intel Corporation. + * Copyright (c) 2013, 2014, Intel Corporation. */ /* * lustre/include/lustre_update.h @@ -31,55 +31,222 @@ #ifndef _LUSTRE_UPDATE_H #define _LUSTRE_UPDATE_H #include +#include -#define OUT_UPDATE_INIT_BUFFER_SIZE 8192 -#define OUT_UPDATE_REPLY_SIZE 8192 +#define OUT_UPDATE_INIT_BUFFER_SIZE 4096 +/* 16KB, the current biggest size is llog header(8KB) */ +#define OUT_UPDATE_REPLY_SIZE 16384 + +struct dt_key; +struct dt_rec; +struct object_update_param; + +struct update_buffer { + struct object_update_request *ub_req; + size_t ub_req_size; +}; + +/** + * Tracking the updates being executed on this dt_device. + */ struct dt_update_request { struct dt_device *dur_dt; /* attached itself to thandle */ - struct list_head dur_list; int dur_flags; /* update request result */ int dur_rc; /* Current batch(transaction) id */ __u64 dur_batchid; - /* Holding the update req */ - struct object_update_request *dur_req; - int dur_req_len; + /* Holding object updates */ + struct update_buffer dur_buf; struct list_head dur_cb_items; }; -static inline unsigned long -object_update_param_size(const struct object_update_param *param) +struct update_params { + struct object_update_param up_params[0]; +}; + +static inline size_t update_params_size(const struct update_params *params, + unsigned int param_count) { - return cfs_size_round(sizeof(*param) + param->oup_len); + struct object_update_param *param; + size_t total_size = sizeof(*params); + unsigned int i; + + param = (struct object_update_param *)¶ms->up_params[0]; + for (i = 0; i < param_count; i++) { + size_t size = object_update_param_size(param); + + param = (struct object_update_param *)((char *)param + size); + total_size += size; + } + + return total_size; } -static inline unsigned long -object_update_size(const struct object_update *update) +static inline struct object_update_param * +update_params_get_param(const struct update_params *params, + unsigned int index, unsigned int param_count) { - const struct object_update_param *param; - unsigned long size; - int i; + struct object_update_param *param; + unsigned int i; - size = offsetof(struct object_update, ou_params[0]); - for (i = 0; i < update->ou_params_count; i++) { - param = (struct object_update_param *)((char *)update + size); - size += object_update_param_size(param); - } + if (index > param_count) + return NULL; - return size; + param = (struct object_update_param *)¶ms->up_params[0]; + for (i = 0; i < index; i++) + param = (struct object_update_param *)((char *)param + + object_update_param_size(param)); + + return param; +} + +static inline void* +update_params_get_param_buf(const struct update_params *params, __u16 index, + unsigned int param_count, __u16 *size) +{ + struct object_update_param *param; + + param = update_params_get_param(params, (unsigned int)index, + param_count); + if (param == NULL) + return NULL; + + if (size != NULL) + *size = param->oup_len; + + return ¶m->oup_buf[0]; +} + +struct update_op { + struct lu_fid uop_fid; + __u16 uop_type; + __u16 uop_param_count; + __u16 uop_params_off[0]; +}; + +static inline size_t +update_op_size(unsigned int param_count) +{ + return offsetof(struct update_op, uop_params_off[param_count]); +} + +static inline struct update_op * +update_op_next_op(const struct update_op *uop) +{ + return (struct update_op *)((char *)uop + + update_op_size(uop->uop_param_count)); +} + +/* All of updates in the mulitple_update_record */ +struct update_ops { + struct update_op uops_op[0]; +}; + +static inline size_t update_ops_size(const struct update_ops *ops, + unsigned int update_count) +{ + struct update_op *op; + size_t total_size = sizeof(*ops); + unsigned int i; + + op = (struct update_op *)&ops->uops_op[0]; + for (i = 0; i < update_count; i++, op = update_op_next_op(op)) + total_size += update_op_size(op->uop_param_count); + + return total_size; +} + +/* + * This is the update record format used to store the updates in + * disk. All updates of the operation will be stored in ur_ops. + * All of parameters for updates of the operation will be stored + * in ur_params. + * To save the space of the record, parameters in ur_ops will only + * remember their offset in ur_params, so to avoid storing duplicate + * parameters in ur_params, which can help us save a lot space for + * operation like creating striped directory. + */ +struct update_records { + __u64 ur_master_transno; + __u64 ur_batchid; + __u32 ur_flags; + __u32 ur_param_count; + __u32 ur_update_count; + struct update_ops ur_ops; + /* Note ur_ops has a variable size, so comment out + * the following ur_params, in case some use it directly + * update_records->ur_params + * + * struct update_params ur_params; + */ +}; + +struct llog_update_record { + struct llog_rec_hdr lur_hdr; + struct update_records lur_update_rec; + /* Note ur_update_rec has a variable size, so comment out + * the following ur_tail, in case someone use it directly + * + * struct llog_rec_tail lur_tail; + */ +}; + +static inline struct update_params * +update_records_get_params(const struct update_records *record) +{ + return (struct update_params *)((char *)record + + offsetof(struct update_records, ur_ops) + + update_ops_size(&record->ur_ops, record->ur_update_count)); +} + +static inline size_t +update_records_size(const struct update_records *record) +{ + struct update_params *params; + + params = update_records_get_params(record); + + return cfs_size_round(offsetof(struct update_records, ur_ops) + + update_ops_size(&record->ur_ops, record->ur_update_count) + + update_params_size(params, record->ur_param_count)); +} + +static inline size_t +llog_update_record_size(const struct llog_update_record *lur) +{ + return cfs_size_round(sizeof(lur->lur_hdr) + + update_records_size(&lur->lur_update_rec) + + sizeof(struct llog_rec_tail)); +} + +static inline struct update_op * +update_ops_get_op(const struct update_ops *ops, unsigned int index, + unsigned int update_count) +{ + struct update_op *op; + unsigned int i; + + if (index > update_count) + return NULL; + + op = (struct update_op *)&ops->uops_op[0]; + for (i = 0; i < index; i++) + op = update_op_next_op(op); + + return op; } static inline void -*object_update_param_get(const struct object_update *update, int index, - int *size) +*object_update_param_get(const struct object_update *update, size_t index, + size_t *size) { const struct object_update_param *param; - int i; + size_t i; if (index >= update->ou_params_count) - return NULL; + return ERR_PTR(-EINVAL); param = &update->ou_params[0]; for (i = 0; i < index; i++) @@ -98,8 +265,8 @@ static inline void static inline unsigned long object_update_request_size(const struct object_update_request *our) { - unsigned long size; - int i = 0; + unsigned long size; + size_t i = 0; size = offsetof(struct object_update_request, ourq_updates[0]); for (i = 0; i < our->ourq_count; i++) { @@ -111,62 +278,16 @@ object_update_request_size(const struct object_update_request *our) return size; } -static inline struct object_update -*object_update_request_get(const struct object_update_request *our, - int index, int *size) -{ - void *ptr; - int i; - - if (index >= our->ourq_count) - return NULL; - - ptr = (char *)our + offsetof(struct object_update_request, - ourq_updates[0]); - for (i = 0; i < index; i++) - ptr += object_update_size((struct object_update *)ptr); - - if (size != NULL) - *size = object_update_size((struct object_update *)ptr); - - return ptr; -} - static inline void -object_update_reply_init(struct object_update_reply *reply, int count) +object_update_reply_init(struct object_update_reply *reply, size_t count) { reply->ourp_magic = UPDATE_REPLY_MAGIC; reply->ourp_count = count; } -static inline struct object_update_result -*object_update_result_get(const struct object_update_reply *reply, - int index, int *size) -{ - char *ptr; - int count = reply->ourp_count; - int i; - - if (index >= count) - return NULL; - - ptr = (char *)reply + - cfs_size_round(offsetof(struct object_update_reply, - ourp_lens[count])); - for (i = 0; i < index; i++) { - LASSERT(reply->ourp_lens[i] > 0); - ptr += cfs_size_round(reply->ourp_lens[i]); - } - - if (size != NULL) - *size = reply->ourp_lens[index]; - - return (struct object_update_result *)ptr; -} - static inline void object_update_result_insert(struct object_update_reply *reply, - void *data, int data_len, int index, + void *data, size_t data_len, size_t index, int rc) { struct object_update_result *update_result; @@ -190,11 +311,11 @@ object_update_result_insert(struct object_update_reply *reply, static inline int object_update_result_data_get(const struct object_update_reply *reply, - struct lu_buf *lbuf, int index) + struct lu_buf *lbuf, size_t index) { struct object_update_result *update_result; - int size = 0; - int result; + size_t size = 0; + int result; LASSERT(lbuf != NULL); update_result = object_update_result_get(reply, index, &size); @@ -213,9 +334,359 @@ object_update_result_data_get(const struct object_update_reply *reply, return 0; } -static inline void update_inc_batchid(struct dt_update_request *update) +/** + * Attached in the thandle to record the updates for distribute + * distribution. + */ +struct thandle_update_records { + /* All of updates for the cross-MDT operation. */ + struct llog_update_record *tur_update_records; + size_t tur_update_records_buf_size; + + /* All of parameters for the cross-MDT operation */ + struct update_params *tur_update_params; + unsigned int tur_update_param_count; + size_t tur_update_params_buf_size; +}; + +#define TOP_THANDLE_MAGIC 0x20140917 +struct top_multiple_thandle { + struct dt_device *tmt_master_sub_dt; + atomic_t tmt_refcount; + /* Other sub transactions will be listed here. */ + struct list_head tmt_sub_thandle_list; + + struct list_head tmt_commit_list; + /* All of update records will packed here */ + struct thandle_update_records *tmt_update_records; + + __u64 tmt_batchid; + int tmt_result; + __u32 tmt_magic; + __u32 tmt_committed:1; +}; + +/* {top,sub}_thandle are used to manage distributed transactions which + * include updates on several nodes. A top_handle represents the + * whole operation, and sub_thandle represents updates on each node. */ +struct top_thandle { + struct thandle tt_super; + /* The master sub transaction. */ + struct thandle *tt_master_sub_thandle; + + struct top_multiple_thandle *tt_multiple_thandle; +}; + +/* Sub thandle is used to track multiple sub thandles under one parent + * thandle */ +struct sub_thandle { + struct thandle *st_sub_th; + struct dt_device *st_dt; + struct list_head st_list; + struct llog_cookie st_cookie; + struct dt_txn_commit_cb st_commit_dcb; + int st_result; + + /* linked to top_thandle */ + struct list_head st_sub_list; + + /* If this sub thandle is committed */ + bool st_committed:1; +}; + +struct tx_arg; +typedef int (*tx_exec_func_t)(const struct lu_env *env, struct thandle *th, + struct tx_arg *ta); + +/* Structure for holding one update executation */ +struct tx_arg { + tx_exec_func_t exec_fn; + tx_exec_func_t undo_fn; + struct dt_object *object; + const char *file; + struct object_update_reply *reply; + int line; + int index; + union { + struct { + struct dt_insert_rec rec; + const struct dt_key *key; + } insert; + struct { + } ref; + struct { + struct lu_attr attr; + } attr_set; + struct { + struct lu_buf buf; + const char *name; + int flags; + __u32 csum; + } xattr_set; + struct { + struct lu_attr attr; + struct dt_allocation_hint hint; + struct dt_object_format dof; + struct lu_fid fid; + } create; + struct { + struct lu_buf buf; + loff_t pos; + } write; + struct { + struct ost_body *body; + } destroy; + } u; +}; + +/* Structure for holding all update executations of one transaction */ +struct thandle_exec_args { + struct thandle *ta_handle; + int ta_argno; /* used args */ + int ta_alloc_args; /* allocated args count */ + struct tx_arg **ta_args; +}; + +/* target/out_lib.c */ +int out_update_pack(const struct lu_env *env, struct object_update *update, + size_t max_update_size, enum update_type op, + const struct lu_fid *fid, unsigned int params_count, + __u16 *param_sizes, const void **param_bufs); +int out_create_pack(const struct lu_env *env, struct object_update *update, + size_t max_update_size, const struct lu_fid *fid, + const struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof); +int out_object_destroy_pack(const struct lu_env *env, + struct object_update *update, + size_t max_update_size, + const struct lu_fid *fid); +int out_index_delete_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, + const struct lu_fid *fid, const struct dt_key *key); +int out_index_insert_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, + const struct lu_fid *fid, const struct dt_rec *rec, + const struct dt_key *key); +int out_xattr_set_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, + const struct lu_fid *fid, const struct lu_buf *buf, + const char *name, __u32 flag); +int out_xattr_del_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, + const struct lu_fid *fid, const char *name); +int out_attr_set_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, + const struct lu_fid *fid, const struct lu_attr *attr); +int out_ref_add_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, + const struct lu_fid *fid); +int out_ref_del_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, + const struct lu_fid *fid); +int out_write_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, + const struct lu_fid *fid, const struct lu_buf *buf, + __u64 pos); +int out_attr_get_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, + const struct lu_fid *fid); +int out_index_lookup_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, + const struct lu_fid *fid, struct dt_rec *rec, + const struct dt_key *key); +int out_xattr_get_pack(const struct lu_env *env, + struct object_update *update, size_t max_update_size, + const struct lu_fid *fid, const char *name); +int out_read_pack(const struct lu_env *env, struct object_update *update, + size_t max_update_length, const struct lu_fid *fid, + size_t size, loff_t pos); + +const char *update_op_str(__u16 opcode); + +/* target/update_trans.c */ +struct thandle *thandle_get_sub_by_dt(const struct lu_env *env, + struct thandle *th, + struct dt_device *sub_dt); + +static inline struct thandle * +thandle_get_sub(const struct lu_env *env, struct thandle *th, + const struct dt_object *sub_obj) +{ + return thandle_get_sub_by_dt(env, th, lu2dt_dev(sub_obj->do_lu.lo_dev)); +} + +struct thandle * +top_trans_create(const struct lu_env *env, struct dt_device *master_dev); +int top_trans_start(const struct lu_env *env, struct dt_device *master_dev, + struct thandle *th); +int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev, + struct thandle *th); +void top_multiple_thandle_destroy(struct top_multiple_thandle *tmt); + +static inline void top_multiple_thandle_get(struct top_multiple_thandle *tmt) +{ + atomic_inc(&tmt->tmt_refcount); +} + +static inline void top_multiple_thandle_put(struct top_multiple_thandle *tmt) { - update->dur_batchid++; + if (atomic_dec_and_test(&tmt->tmt_refcount)) + top_multiple_thandle_destroy(tmt); } +struct sub_thandle *lookup_sub_thandle(struct top_multiple_thandle *tmt, + struct dt_device *dt_dev); +int sub_thandle_trans_create(const struct lu_env *env, + struct top_thandle *top_th, + struct sub_thandle *st); + +/* update_records.c */ +int update_records_create_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct lu_attr *attr, + const struct dt_allocation_hint *hint, + struct dt_object_format *dof); +int update_records_attr_set_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct lu_attr *attr); +int update_records_ref_add_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid); +int update_records_ref_del_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid); +int update_records_object_destroy_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid); +int update_records_index_insert_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct dt_rec *rec, + const struct dt_key *key); +int update_records_index_delete_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct dt_key *key); +int update_records_xattr_set_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct lu_buf *buf, const char *name, + __u32 flag); +int update_records_xattr_del_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const char *name); +int update_records_write_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct lu_buf *buf, + __u64 pos); +int update_records_punch_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + __u64 start, __u64 end); + +int tur_update_records_extend(struct thandle_update_records *tur, + size_t new_size); +int tur_update_params_extend(struct thandle_update_records *tur, + size_t new_size); +int tur_update_extend(struct thandle_update_records *tur, + size_t new_op_size, size_t new_param_size); + +#define update_record_pack(name, th, ...) \ +({ \ + struct top_thandle *top_th; \ + struct top_multiple_thandle *tmt; \ + struct thandle_update_records *tur; \ + struct llog_update_record *lur; \ + size_t avail_param_size; \ + size_t avail_op_size; \ + int ret; \ + \ + while (1) { \ + top_th = container_of(th, struct top_thandle, tt_super);\ + tmt = top_th->tt_multiple_thandle; \ + tur = tmt->tmt_update_records; \ + lur = tur->tur_update_records; \ + avail_param_size = tur->tur_update_params_buf_size - \ + update_params_size(tur->tur_update_params, \ + tur->tur_update_param_count); \ + avail_op_size = tur->tur_update_records_buf_size - \ + llog_update_record_size(lur); \ + ret = update_records_##name##_pack(env, \ + &lur->lur_update_rec.ur_ops, \ + &lur->lur_update_rec.ur_update_count, \ + &avail_op_size, \ + tur->tur_update_params, \ + &tur->tur_update_param_count, \ + &avail_param_size, __VA_ARGS__); \ + if (ret == -E2BIG) { \ + ret = tur_update_extend(tur, avail_op_size, \ + avail_param_size); \ + if (ret != 0) \ + break; \ + continue; \ + } else { \ + break; \ + } \ + } \ + ret; \ +}) #endif