X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Finclude%2Flustre_update.h;h=78cd3d4bfdd5190667774d291a0b3d751ebfca84;hp=3d941b927c3332d8766ef3e08304985284bbd3f8;hb=HEAD;hpb=de8572645d287d17c409b99dabdf176822d91486 diff --git a/lustre/include/lustre_update.h b/lustre/include/lustre_update.h index 3d941b9..da90e7a 100644 --- a/lustre/include/lustre_update.h +++ b/lustre/include/lustre_update.h @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2013, 2014, Intel Corporation. + * Copyright (c) 2013, 2017, Intel Corporation. */ /* * lustre/include/lustre_update.h @@ -30,55 +30,169 @@ #ifndef _LUSTRE_UPDATE_H #define _LUSTRE_UPDATE_H -#include #include +#include +#include -#define OUT_UPDATE_INIT_BUFFER_SIZE 4096 -#define OUT_UPDATE_REPLY_SIZE 8192 +#define OUT_UPDATE_REPLY_SIZE 4096 +#define OUT_BULK_BUFFER_SIZE 4096 struct dt_key; struct dt_rec; +struct object_update_param; +struct llog_update_record; -struct update_buffer { - struct object_update_request *ub_req; - size_t ub_req_size; -}; +static inline size_t update_params_size(const struct update_params *params, + unsigned int param_count) +{ + struct object_update_param *param; + size_t total_size = sizeof(*params); + unsigned int i; -#define TOP_THANDLE_MAGIC 0x20140917 -/* {top,sub}_thandle are used to manage distributed transactions which - * include updates on several nodes. A top_handle represents the - * whole operation, and sub_thandle represents updates on each node. */ -struct top_thandle { - struct thandle tt_super; - __u32 tt_magic; - /* The master sub transaction. */ - struct thandle *tt_master_sub_thandle; + param = (struct object_update_param *)¶ms->up_params[0]; + for (i = 0; i < param_count; i++) { + size_t size = object_update_param_size(param); - /* Other sub thandle will be listed here. */ - struct list_head tt_sub_thandle_list; -}; + param = (struct object_update_param *)((char *)param + size); + total_size += size; + } -struct sub_thandle { - /* point to the osd/osp_thandle */ - struct thandle *st_sub_th; - struct list_head st_sub_list; -}; + return total_size; +} -/** - * Tracking the updates being executed on this dt_device. - */ -struct dt_update_request { - struct dt_device *dur_dt; - /* attached itself to thandle */ - int dur_flags; - /* update request result */ - int dur_rc; - /* Current batch(transaction) id */ - __u64 dur_batchid; - /* Holding object updates */ - struct update_buffer dur_buf; - struct list_head dur_cb_items; -}; +static inline struct object_update_param * +update_params_get_param(const struct update_params *params, + unsigned int index, unsigned int param_count) +{ + struct object_update_param *param; + unsigned int i; + + if (index > param_count) + return NULL; + + param = (struct object_update_param *)¶ms->up_params[0]; + for (i = 0; i < index; i++) + param = (struct object_update_param *)((char *)param + + object_update_param_size(param)); + + return param; +} + +static inline void* +update_params_get_param_buf(const struct update_params *params, __u16 index, + unsigned int param_count, __u16 *size) +{ + struct object_update_param *param; + + param = update_params_get_param(params, (unsigned int)index, + param_count); + if (param == NULL) + return NULL; + + if (size != NULL) + *size = param->oup_len; + + return param->oup_buf; +} + +static inline size_t +update_op_size(unsigned int param_count) +{ + return offsetof(struct update_op, uop_params_off[param_count]); +} + +static inline struct update_op * +update_op_next_op(const struct update_op *uop) +{ + return (struct update_op *)((char *)uop + + update_op_size(uop->uop_param_count)); +} + +static inline size_t update_ops_size(const struct update_ops *ops, + unsigned int update_count) +{ + struct update_op *op; + size_t total_size = sizeof(*ops); + unsigned int i; + + op = (struct update_op *)&ops->uops_op[0]; + for (i = 0; i < update_count; i++, op = update_op_next_op(op)) + total_size += update_op_size(op->uop_param_count); + + return total_size; +} + +static inline struct update_params * +update_records_get_params(const struct update_records *record) +{ + return (struct update_params *)((char *)record + + offsetof(struct update_records, ur_ops) + + update_ops_size(&record->ur_ops, record->ur_update_count)); +} + +static inline struct update_param * +update_param_next_param(const struct update_param *param) +{ + return (struct update_param *)((char *)param + + object_update_param_size( + (struct object_update_param *)param)); +} + +static inline size_t +__update_records_size(size_t raw_size) +{ + return round_up(offsetof(struct update_records, ur_ops) + raw_size, 8); +} + +static inline size_t +update_records_size(const struct update_records *record) +{ + size_t op_size = 0; + size_t param_size = 0; + + if (record->ur_update_count > 0) + op_size = update_ops_size(&record->ur_ops, + record->ur_update_count); + if (record->ur_param_count > 0) { + struct update_params *params; + + params = update_records_get_params(record); + param_size = update_params_size(params, record->ur_param_count); + } + + return __update_records_size(op_size + param_size); +} + +static inline size_t +__llog_update_record_size(size_t records_size) +{ + return round_up(sizeof(struct llog_rec_hdr) + records_size + + sizeof(struct llog_rec_tail), 8); +} + +static inline size_t +llog_update_record_size(const struct llog_update_record *lur) +{ + return __llog_update_record_size( + update_records_size(&lur->lur_update_rec)); +} + +static inline struct update_op * +update_ops_get_op(const struct update_ops *ops, unsigned int index, + unsigned int update_count) +{ + struct update_op *op; + unsigned int i; + + if (index > update_count) + return NULL; + + op = (struct update_op *)&ops->uops_op[0]; + for (i = 0; i < index; i++) + op = update_op_next_op(op); + + return op; +} static inline void *object_update_param_get(const struct object_update *update, size_t index, @@ -88,7 +202,7 @@ static inline void size_t i; if (index >= update->ou_params_count) - return NULL; + return ERR_PTR(-EINVAL); param = &update->ou_params[0]; for (i = 0; i < index; i++) @@ -99,7 +213,7 @@ static inline void *size = param->oup_len; if (param->oup_len == 0) - return NULL; + return ERR_PTR(-ENODATA); return (void *)¶m->oup_buf[0]; } @@ -121,34 +235,25 @@ object_update_request_size(const struct object_update_request *our) } static inline void -object_update_reply_init(struct object_update_reply *reply, size_t count) -{ - reply->ourp_magic = UPDATE_REPLY_MAGIC; - reply->ourp_count = count; -} - -static inline void object_update_result_insert(struct object_update_reply *reply, void *data, size_t data_len, size_t index, int rc) { struct object_update_result *update_result; - char *ptr; update_result = object_update_result_get(reply, index, NULL); - LASSERT(update_result != NULL); + LASSERT(update_result); update_result->our_rc = ptlrpc_status_hton(rc); - if (data_len > 0) { - LASSERT(data != NULL); - ptr = (char *)update_result + - cfs_size_round(sizeof(struct object_update_reply)); + if (rc >= 0) { + if (data_len > 0 && data) + memcpy(update_result->our_data, data, data_len); update_result->our_datalen = data_len; - memcpy(ptr, data, data_len); } - reply->ourp_lens[index] = cfs_size_round(data_len + - sizeof(struct object_update_result)); + reply->ourp_lens[index] = round_up(data_len + + sizeof(struct object_update_result), + 8); } static inline int @@ -162,7 +267,7 @@ object_update_result_data_get(const struct object_update_reply *reply, LASSERT(lbuf != NULL); update_result = object_update_result_get(reply, index, &size); if (update_result == NULL || - size < cfs_size_round(sizeof(struct object_update_reply)) || + size < round_up(sizeof(struct object_update_reply), 8) || update_result->our_datalen > size) RETURN(-EFAULT); @@ -173,55 +278,190 @@ object_update_result_data_get(const struct object_update_reply *reply, lbuf->lb_buf = update_result->our_data; lbuf->lb_len = update_result->our_datalen; - return 0; + return result; } -static inline void update_inc_batchid(struct dt_update_request *update) -{ - update->dur_batchid++; -} +/** + * Attached in the thandle to record the updates for distribute + * distribution. + */ +struct thandle_update_records { + /* All of updates for the cross-MDT operation, vmalloc'd. */ + struct llog_update_record *tur_update_records; + size_t tur_update_records_buf_size; + + /* All of parameters for the cross-MDT operation, vmalloc'd */ + struct update_params *tur_update_params; + unsigned int tur_update_param_count; + size_t tur_update_params_buf_size; +}; + +#define TOP_THANDLE_MAGIC 0x20140917 +struct top_multiple_thandle { + struct dt_device *tmt_master_sub_dt; + struct kref tmt_refcount; + /* Other sub transactions will be listed here. */ + struct list_head tmt_sub_thandle_list; + spinlock_t tmt_sub_lock; + + struct list_head tmt_commit_list; + /* All of update records will packed here */ + struct thandle_update_records *tmt_update_records; + + wait_queue_head_t tmt_stop_waitq; + __u64 tmt_batchid; + int tmt_result; + __u32 tmt_magic; + size_t tmt_record_size; + __u32 tmt_committed:1; +}; + +/* {top,sub}_thandle are used to manage distributed transactions which + * include updates on several nodes. A top_handle represents the + * whole operation, and sub_thandle represents updates on each node. + */ +struct top_thandle { + struct thandle tt_super; + /* The master sub transaction. */ + struct thandle *tt_master_sub_thandle; + + struct top_multiple_thandle *tt_multiple_thandle; +}; + +struct sub_thandle_cookie { + struct llog_cookie stc_cookie; + struct list_head stc_list; +}; + +/* Sub thandle used to track multiple sub thandles under one parent thandle */ +struct sub_thandle { + struct thandle *st_sub_th; + struct dt_device *st_dt; + struct list_head st_cookie_list; + struct dt_txn_commit_cb st_commit_dcb; + struct dt_txn_commit_cb st_stop_dcb; + int st_result; + + /* linked to top_thandle */ + struct list_head st_sub_list; + + /* If this sub thandle is committed */ + bool st_committed:1, + st_stopped:1, + st_started:1; +}; + +struct tx_arg; +typedef int (*tx_exec_func_t)(const struct lu_env *env, struct thandle *th, + struct tx_arg *ta); + +/* Structure for holding one update execution */ +struct tx_arg { + tx_exec_func_t exec_fn; + tx_exec_func_t undo_fn; + struct dt_object *object; + const char *file; + struct object_update_reply *reply; + int line; + int index; + union { + struct { + struct dt_insert_rec rec; + const struct dt_key *key; + } insert; + struct { + } ref; + struct { + struct lu_attr attr; + } attr_set; + struct { + struct lu_buf buf; + const char *name; + int flags; + __u32 csum; + } xattr_set; + struct { + struct lu_attr attr; + struct dt_allocation_hint hint; + struct dt_object_format dof; + struct lu_fid fid; + } create; + struct { + struct lu_buf buf; + loff_t pos; + } write; + struct { + struct ost_body *body; + } destroy; + } u; +}; + +/* Structure for holding all update executations of one transaction */ +struct thandle_exec_args { + struct thandle *ta_handle; + int ta_argno; /* used args */ + int ta_alloc_args; /* allocated args count */ + struct tx_arg **ta_args; +}; /* target/out_lib.c */ -int out_update_pack(const struct lu_env *env, struct update_buffer *ubuf, - enum update_type op, const struct lu_fid *fid, - int params_count, __u16 *param_sizes, const void **bufs, - __u64 batchid); -int out_create_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, struct lu_attr *attr, - struct dt_allocation_hint *hint, - struct dt_object_format *dof, __u64 batchid); -int out_object_destroy_pack(const struct lu_env *env, - struct update_buffer *ubuf, - const struct lu_fid *fid, __u64 batchid); -int out_index_delete_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, const struct dt_key *key, - __u64 batchid); -int out_index_insert_pack(const struct lu_env *env, struct update_buffer *ubuf, +int out_update_pack(const struct lu_env *env, struct object_update *update, + size_t *max_update_size, enum update_type op, + const struct lu_fid *fid, unsigned int params_count, + __u16 *param_sizes, const void **param_bufs, + __u32 reply_size); +int out_create_pack(const struct lu_env *env, struct object_update *update, + size_t *max_update_size, const struct lu_fid *fid, + const struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof); +int out_destroy_pack(const struct lu_env *env, struct object_update *update, + size_t *max_update_size, const struct lu_fid *fid); +int out_index_delete_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, + const struct lu_fid *fid, const struct dt_key *key); +int out_index_insert_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, const struct lu_fid *fid, const struct dt_rec *rec, - const struct dt_key *key, __u64 batchid); -int out_xattr_set_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct dt_key *key); +int out_xattr_set_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, const struct lu_fid *fid, const struct lu_buf *buf, - const char *name, int flag, __u64 batchid); -int out_xattr_del_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, const char *name, - __u64 batchid); -int out_attr_set_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, const struct lu_attr *attr, - __u64 batchid); -int out_ref_add_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, __u64 batchid); -int out_ref_del_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, __u64 batchid); -int out_write_pack(const struct lu_env *env, struct update_buffer *ubuf, + const char *name, __u32 flag); +int out_xattr_del_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, + const struct lu_fid *fid, const char *name); +int out_attr_set_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, + const struct lu_fid *fid, const struct lu_attr *attr); +int out_ref_add_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, + const struct lu_fid *fid); +int out_ref_del_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, + const struct lu_fid *fid); +int out_write_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, const struct lu_fid *fid, const struct lu_buf *buf, - loff_t pos, __u64 batchid); -int out_attr_get_pack(const struct lu_env *env, struct update_buffer *ubuf, + __u64 pos); +int out_attr_get_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, const struct lu_fid *fid); -int out_index_lookup_pack(const struct lu_env *env, struct update_buffer *ubuf, +int out_index_lookup_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, const struct lu_fid *fid, struct dt_rec *rec, const struct dt_key *key); -int out_xattr_get_pack(const struct lu_env *env, struct update_buffer *ubuf, - const struct lu_fid *fid, const char *name); +int out_xattr_get_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, + const struct lu_fid *fid, const char *name, + const int bufsize); +int out_xattr_list_pack(const struct lu_env *env, struct object_update *update, + size_t *max_update_size, const struct lu_fid *fid, + const int bufsize); +int out_read_pack(const struct lu_env *env, struct object_update *update, + size_t *max_update_length, const struct lu_fid *fid, + size_t size, loff_t pos); + +const char *update_op_str(__u16 opcode); /* target/update_trans.c */ struct thandle *thandle_get_sub_by_dt(const struct lu_env *env, @@ -237,12 +477,232 @@ thandle_get_sub(const struct lu_env *env, struct thandle *th, struct thandle * top_trans_create(const struct lu_env *env, struct dt_device *master_dev); - int top_trans_start(const struct lu_env *env, struct dt_device *master_dev, struct thandle *th); - int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev, struct thandle *th); +void top_multiple_thandle_destroy(struct kref *kref); + +static inline void top_multiple_thandle_get(struct top_multiple_thandle *tmt) +{ + kref_get(&tmt->tmt_refcount); +} + +static inline void top_multiple_thandle_put(struct top_multiple_thandle *tmt) +{ + kref_put(&tmt->tmt_refcount, top_multiple_thandle_destroy); +} -void top_thandle_destroy(struct top_thandle *top_th); +struct sub_thandle *lookup_sub_thandle(struct top_multiple_thandle *tmt, + struct dt_device *dt_dev); +int sub_thandle_trans_create(const struct lu_env *env, + struct top_thandle *top_th, + struct sub_thandle *st); + +/* update_records.c */ +size_t update_records_create_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_attr *attr, + const struct dt_allocation_hint *hint, + struct dt_object_format *dof); +size_t update_records_attr_set_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_attr *attr); +size_t update_records_ref_add_size(const struct lu_env *env, + const struct lu_fid *fid); +size_t update_records_ref_del_size(const struct lu_env *env, + const struct lu_fid *fid); +size_t update_records_destroy_size(const struct lu_env *env, + const struct lu_fid *fid); +size_t update_records_index_insert_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct dt_rec *rec, + const struct dt_key *key); +size_t update_records_index_delete_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct dt_key *key); +size_t update_records_xattr_set_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_buf *buf, + const char *name, + __u32 flag); +size_t update_records_xattr_del_size(const struct lu_env *env, + const struct lu_fid *fid, + const char *name); +size_t update_records_write_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_buf *buf, + __u64 pos); +size_t update_records_punch_size(const struct lu_env *env, + const struct lu_fid *fid, + __u64 start, __u64 end); + +int update_records_create_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct lu_attr *attr, + const struct dt_allocation_hint *hint, + struct dt_object_format *dof); +int update_records_attr_set_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct lu_attr *attr); +int update_records_ref_add_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid); +int update_records_ref_del_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid); +int update_records_destroy_pack(const struct lu_env *env, + struct update_ops *ops, unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid); +int update_records_index_insert_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct dt_rec *rec, + const struct dt_key *key); +int update_records_index_delete_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct dt_key *key); +int update_records_xattr_set_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct lu_buf *buf, const char *name, + __u32 flag); +int update_records_xattr_del_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const char *name); +int update_records_write_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct lu_buf *buf, + __u64 pos); +int update_records_punch_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + __u64 start, __u64 end); +int update_records_noop_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid); + +int tur_update_records_extend(struct thandle_update_records *tur, + size_t new_size); +int tur_update_params_extend(struct thandle_update_records *tur, + size_t new_size); +int tur_update_extend(struct thandle_update_records *tur, + size_t new_op_size, size_t new_param_size); + +#define update_record_pack(name, th, ...) \ +({ \ + struct top_thandle *top_th; \ + struct top_multiple_thandle *tmt; \ + struct thandle_update_records *tur; \ + struct llog_update_record *lur; \ + size_t avail_param_size; \ + size_t avail_op_size; \ + int ret; \ + \ + while (1) { \ + top_th = container_of(th, struct top_thandle, tt_super);\ + tmt = top_th->tt_multiple_thandle; \ + tur = tmt->tmt_update_records; \ + lur = tur->tur_update_records; \ + avail_param_size = tur->tur_update_params_buf_size - \ + update_params_size(tur->tur_update_params, \ + tur->tur_update_param_count); \ + avail_op_size = tur->tur_update_records_buf_size - \ + llog_update_record_size(lur); \ + ret = update_records_##name##_pack(env, \ + &lur->lur_update_rec.ur_ops, \ + &lur->lur_update_rec.ur_update_count, \ + &avail_op_size, \ + tur->tur_update_params, \ + &tur->tur_update_param_count, \ + &avail_param_size, __VA_ARGS__); \ + if (ret == -E2BIG) { \ + ret = tur_update_extend(tur, avail_op_size, \ + avail_param_size); \ + if (ret != 0) \ + break; \ + continue; \ + } else { \ + break; \ + } \ + } \ + ret; \ +}) + +#define update_record_size(env, name, th, ...) \ +({ \ + struct top_thandle *top_th; \ + struct top_multiple_thandle *tmt; \ + \ + top_th = container_of(th, struct top_thandle, tt_super); \ + \ + LASSERT(top_th->tt_multiple_thandle != NULL); \ + tmt = top_th->tt_multiple_thandle; \ + tmt->tmt_record_size += \ + update_records_##name##_size(env, __VA_ARGS__); \ +}) #endif