X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Finclude%2Flustre_update.h;h=78cd3d4bfdd5190667774d291a0b3d751ebfca84;hb=0c1ae1cb9c19f8a4f6c5a7ff6a1fd54807430795;hp=d79cd70d629e2f1cea882fb5c1235cc634f27939;hpb=4112a290df2763d53760ef6a96ee2453a41f1856;p=fs%2Flustre-release.git diff --git a/lustre/include/lustre_update.h b/lustre/include/lustre_update.h index d79cd70..78cd3d4 100644 --- a/lustre/include/lustre_update.h +++ b/lustre/include/lustre_update.h @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2013, Intel Corporation. + * Copyright (c) 2013, 2017, Intel Corporation. */ /* * lustre/include/lustre_update.h @@ -30,161 +30,680 @@ #ifndef _LUSTRE_UPDATE_H #define _LUSTRE_UPDATE_H +#include +#include +#include -#define UPDATE_BUFFER_SIZE 8192 -struct update_request { - struct dt_device *ur_dt; - cfs_list_t ur_list; /* attached itself to thandle */ - int ur_flags; - int ur_rc; - struct update_buf *ur_buf; /* Holding the update req */ -}; +#define OUT_UPDATE_REPLY_SIZE 4096 +#define OUT_BULK_BUFFER_SIZE 4096 + +struct dt_key; +struct dt_rec; +struct object_update_param; +struct llog_update_record; -static inline unsigned long update_size(struct update *update) +static inline size_t update_params_size(const struct update_params *params, + unsigned int param_count) { - unsigned long size; - int i; + struct object_update_param *param; + size_t total_size = sizeof(*params); + unsigned int i; - size = cfs_size_round(offsetof(struct update, u_bufs[0])); - for (i = 0; i < UPDATE_BUF_COUNT; i++) - size += cfs_size_round(update->u_lens[i]); + param = (struct object_update_param *)¶ms->up_params[0]; + for (i = 0; i < param_count; i++) { + size_t size = object_update_param_size(param); - return size; + param = (struct object_update_param *)((char *)param + size); + total_size += size; + } + + return total_size; } -static inline void *update_param_buf(struct update *update, int index, - int *size) +static inline struct object_update_param * +update_params_get_param(const struct update_params *params, + unsigned int index, unsigned int param_count) { - int i; - void *ptr; + struct object_update_param *param; + unsigned int i; - if (index >= UPDATE_BUF_COUNT) + if (index > param_count) return NULL; - ptr = (char *)update + cfs_size_round(offsetof(struct update, - u_bufs[0])); - for (i = 0; i < index; i++) { - LASSERT(update->u_lens[i] > 0); - ptr += cfs_size_round(update->u_lens[i]); - } + param = (struct object_update_param *)¶ms->up_params[0]; + for (i = 0; i < index; i++) + param = (struct object_update_param *)((char *)param + + object_update_param_size(param)); + + return param; +} + +static inline void* +update_params_get_param_buf(const struct update_params *params, __u16 index, + unsigned int param_count, __u16 *size) +{ + struct object_update_param *param; + + param = update_params_get_param(params, (unsigned int)index, + param_count); + if (param == NULL) + return NULL; if (size != NULL) - *size = update->u_lens[index]; + *size = param->oup_len; + + return param->oup_buf; +} - return ptr; +static inline size_t +update_op_size(unsigned int param_count) +{ + return offsetof(struct update_op, uop_params_off[param_count]); } -static inline unsigned long update_buf_size(struct update_buf *buf) +static inline struct update_op * +update_op_next_op(const struct update_op *uop) { - unsigned long size; - int i = 0; + return (struct update_op *)((char *)uop + + update_op_size(uop->uop_param_count)); +} - size = cfs_size_round(offsetof(struct update_buf, ub_bufs[0])); - for (i = 0; i < buf->ub_count; i++) { - struct update *update; +static inline size_t update_ops_size(const struct update_ops *ops, + unsigned int update_count) +{ + struct update_op *op; + size_t total_size = sizeof(*ops); + unsigned int i; - update = (struct update *)((char *)buf + size); - size += update_size(update); - } - LASSERT(size <= UPDATE_BUFFER_SIZE); - return size; + op = (struct update_op *)&ops->uops_op[0]; + for (i = 0; i < update_count; i++, op = update_op_next_op(op)) + total_size += update_op_size(op->uop_param_count); + + return total_size; } -static inline void *update_buf_get(struct update_buf *buf, int index, int *size) +static inline struct update_params * +update_records_get_params(const struct update_records *record) { - int count = buf->ub_count; - void *ptr; - int i = 0; + return (struct update_params *)((char *)record + + offsetof(struct update_records, ur_ops) + + update_ops_size(&record->ur_ops, record->ur_update_count)); +} - if (index >= count) - return NULL; +static inline struct update_param * +update_param_next_param(const struct update_param *param) +{ + return (struct update_param *)((char *)param + + object_update_param_size( + (struct object_update_param *)param)); +} - ptr = (char *)buf + cfs_size_round(offsetof(struct update_buf, - ub_bufs[0])); - for (i = 0; i < index; i++) - ptr += update_size((struct update *)ptr); +static inline size_t +__update_records_size(size_t raw_size) +{ + return cfs_size_round(offsetof(struct update_records, ur_ops) + + raw_size); +} - if (size != NULL) - *size = update_size((struct update *)ptr); +static inline size_t +update_records_size(const struct update_records *record) +{ + size_t op_size = 0; + size_t param_size = 0; - return ptr; + if (record->ur_update_count > 0) + op_size = update_ops_size(&record->ur_ops, + record->ur_update_count); + if (record->ur_param_count > 0) { + struct update_params *params; + + params = update_records_get_params(record); + param_size = update_params_size(params, record->ur_param_count); + } + + return __update_records_size(op_size + param_size); } -static inline void update_init_reply_buf(struct update_reply *reply, int count) +static inline size_t +__llog_update_record_size(size_t records_size) { - reply->ur_version = UPDATE_REPLY_V1; - reply->ur_count = count; + return cfs_size_round(sizeof(struct llog_rec_hdr) + records_size + + sizeof(struct llog_rec_tail)); } -static inline void *update_get_buf_internal(struct update_reply *reply, - int index, int *size) +static inline size_t +llog_update_record_size(const struct llog_update_record *lur) { - char *ptr; - int count = reply->ur_count; - int i; + return __llog_update_record_size( + update_records_size(&lur->lur_update_rec)); +} - if (index >= count) +static inline struct update_op * +update_ops_get_op(const struct update_ops *ops, unsigned int index, + unsigned int update_count) +{ + struct update_op *op; + unsigned int i; + + if (index > update_count) return NULL; - ptr = (char *)reply + cfs_size_round(offsetof(struct update_reply, - ur_lens[count])); - for (i = 0; i < index; i++) { - LASSERT(reply->ur_lens[i] > 0); - ptr += cfs_size_round(reply->ur_lens[i]); - } + op = (struct update_op *)&ops->uops_op[0]; + for (i = 0; i < index; i++) + op = update_op_next_op(op); + + return op; +} + +static inline void +*object_update_param_get(const struct object_update *update, size_t index, + size_t *size) +{ + const struct object_update_param *param; + size_t i; + + if (index >= update->ou_params_count) + return ERR_PTR(-EINVAL); + + param = &update->ou_params[0]; + for (i = 0; i < index; i++) + param = (struct object_update_param *)((char *)param + + object_update_param_size(param)); if (size != NULL) - *size = reply->ur_lens[index]; + *size = param->oup_len; + + if (param->oup_len == 0) + return ERR_PTR(-ENODATA); - return ptr; + return (void *)¶m->oup_buf[0]; } -static inline void update_insert_reply(struct update_reply *reply, void *data, - int data_len, int index, int rc) +static inline unsigned long +object_update_request_size(const struct object_update_request *our) { - char *ptr; + unsigned long size; + size_t i = 0; - ptr = update_get_buf_internal(reply, index, NULL); - LASSERT(ptr != NULL); + size = offsetof(struct object_update_request, ourq_updates[0]); + for (i = 0; i < our->ourq_count; i++) { + struct object_update *update; - *(int *)ptr = cpu_to_le32(rc); - ptr += sizeof(int); - if (data_len > 0) { - LASSERT(data != NULL); - memcpy(ptr, data, data_len); + update = (struct object_update *)((char *)our + size); + size += object_update_size(update); } - reply->ur_lens[index] = data_len + sizeof(int); + return size; } -static inline int update_get_reply_buf(struct update_reply *reply, void **buf, - int index) +static inline void +object_update_result_insert(struct object_update_reply *reply, + void *data, size_t data_len, size_t index, + int rc) { - char *ptr; - int size = 0; - int result; + struct object_update_result *update_result; - ptr = update_get_buf_internal(reply, index, &size); - result = *(int *)ptr; + update_result = object_update_result_get(reply, index, NULL); + LASSERT(update_result); + + update_result->our_rc = ptlrpc_status_hton(rc); + if (rc >= 0) { + if (data_len > 0 && data) + memcpy(update_result->our_data, data, data_len); + update_result->our_datalen = data_len; + } + reply->ourp_lens[index] = cfs_size_round(data_len + + sizeof(struct object_update_result)); +} + +static inline int +object_update_result_data_get(const struct object_update_reply *reply, + struct lu_buf *lbuf, size_t index) +{ + struct object_update_result *update_result; + size_t size = 0; + int result; + + LASSERT(lbuf != NULL); + update_result = object_update_result_get(reply, index, &size); + if (update_result == NULL || + size < cfs_size_round(sizeof(struct object_update_reply)) || + update_result->our_datalen > size) + RETURN(-EFAULT); + + result = ptlrpc_status_ntoh(update_result->our_rc); if (result < 0) return result; - LASSERT((ptr != NULL && size >= sizeof(int))); - *buf = ptr + sizeof(int); - return size - sizeof(int); + lbuf->lb_buf = update_result->our_data; + lbuf->lb_len = update_result->our_datalen; + + return result; } -static inline int update_get_reply_result(struct update_reply *reply, - void **buf, int index) -{ - void *ptr; - int size; +/** + * Attached in the thandle to record the updates for distribute + * distribution. + */ +struct thandle_update_records { + /* All of updates for the cross-MDT operation, vmalloc'd. */ + struct llog_update_record *tur_update_records; + size_t tur_update_records_buf_size; + + /* All of parameters for the cross-MDT operation, vmalloc'd */ + struct update_params *tur_update_params; + unsigned int tur_update_param_count; + size_t tur_update_params_buf_size; +}; + +#define TOP_THANDLE_MAGIC 0x20140917 +struct top_multiple_thandle { + struct dt_device *tmt_master_sub_dt; + atomic_t tmt_refcount; + /* Other sub transactions will be listed here. */ + struct list_head tmt_sub_thandle_list; + spinlock_t tmt_sub_lock; + + struct list_head tmt_commit_list; + /* All of update records will packed here */ + struct thandle_update_records *tmt_update_records; + + wait_queue_head_t tmt_stop_waitq; + __u64 tmt_batchid; + int tmt_result; + __u32 tmt_magic; + size_t tmt_record_size; + __u32 tmt_committed:1; +}; - ptr = update_get_buf_internal(reply, index, &size); - LASSERT(ptr != NULL && size > sizeof(int)); - return *(int *)ptr; +/* {top,sub}_thandle are used to manage distributed transactions which + * include updates on several nodes. A top_handle represents the + * whole operation, and sub_thandle represents updates on each node. */ +struct top_thandle { + struct thandle tt_super; + /* The master sub transaction. */ + struct thandle *tt_master_sub_thandle; + + struct top_multiple_thandle *tt_multiple_thandle; +}; + +struct sub_thandle_cookie { + struct llog_cookie stc_cookie; + struct list_head stc_list; +}; + +/* Sub thandle is used to track multiple sub thandles under one parent + * thandle */ +struct sub_thandle { + struct thandle *st_sub_th; + struct dt_device *st_dt; + struct list_head st_cookie_list; + struct dt_txn_commit_cb st_commit_dcb; + struct dt_txn_commit_cb st_stop_dcb; + int st_result; + + /* linked to top_thandle */ + struct list_head st_sub_list; + + /* If this sub thandle is committed */ + bool st_committed:1, + st_stopped:1, + st_started:1; +}; + +struct tx_arg; +typedef int (*tx_exec_func_t)(const struct lu_env *env, struct thandle *th, + struct tx_arg *ta); + +/* Structure for holding one update execution */ +struct tx_arg { + tx_exec_func_t exec_fn; + tx_exec_func_t undo_fn; + struct dt_object *object; + const char *file; + struct object_update_reply *reply; + int line; + int index; + union { + struct { + struct dt_insert_rec rec; + const struct dt_key *key; + } insert; + struct { + } ref; + struct { + struct lu_attr attr; + } attr_set; + struct { + struct lu_buf buf; + const char *name; + int flags; + __u32 csum; + } xattr_set; + struct { + struct lu_attr attr; + struct dt_allocation_hint hint; + struct dt_object_format dof; + struct lu_fid fid; + } create; + struct { + struct lu_buf buf; + loff_t pos; + } write; + struct { + struct ost_body *body; + } destroy; + } u; +}; + +/* Structure for holding all update executations of one transaction */ +struct thandle_exec_args { + struct thandle *ta_handle; + int ta_argno; /* used args */ + int ta_alloc_args; /* allocated args count */ + struct tx_arg **ta_args; +}; + +/* target/out_lib.c */ +int out_update_pack(const struct lu_env *env, struct object_update *update, + size_t *max_update_size, enum update_type op, + const struct lu_fid *fid, unsigned int params_count, + __u16 *param_sizes, const void **param_bufs, + __u32 reply_size); +int out_create_pack(const struct lu_env *env, struct object_update *update, + size_t *max_update_size, const struct lu_fid *fid, + const struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof); +int out_destroy_pack(const struct lu_env *env, struct object_update *update, + size_t *max_update_size, const struct lu_fid *fid); +int out_index_delete_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, + const struct lu_fid *fid, const struct dt_key *key); +int out_index_insert_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, + const struct lu_fid *fid, const struct dt_rec *rec, + const struct dt_key *key); +int out_xattr_set_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, + const struct lu_fid *fid, const struct lu_buf *buf, + const char *name, __u32 flag); +int out_xattr_del_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, + const struct lu_fid *fid, const char *name); +int out_attr_set_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, + const struct lu_fid *fid, const struct lu_attr *attr); +int out_ref_add_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, + const struct lu_fid *fid); +int out_ref_del_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, + const struct lu_fid *fid); +int out_write_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, + const struct lu_fid *fid, const struct lu_buf *buf, + __u64 pos); +int out_attr_get_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, + const struct lu_fid *fid); +int out_index_lookup_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, + const struct lu_fid *fid, struct dt_rec *rec, + const struct dt_key *key); +int out_xattr_get_pack(const struct lu_env *env, + struct object_update *update, size_t *max_update_size, + const struct lu_fid *fid, const char *name, + const int bufsize); +int out_xattr_list_pack(const struct lu_env *env, struct object_update *update, + size_t *max_update_size, const struct lu_fid *fid, + const int bufsize); +int out_read_pack(const struct lu_env *env, struct object_update *update, + size_t *max_update_length, const struct lu_fid *fid, + size_t size, loff_t pos); + +const char *update_op_str(__u16 opcode); + +/* target/update_trans.c */ +struct thandle *thandle_get_sub_by_dt(const struct lu_env *env, + struct thandle *th, + struct dt_device *sub_dt); + +static inline struct thandle * +thandle_get_sub(const struct lu_env *env, struct thandle *th, + const struct dt_object *sub_obj) +{ + return thandle_get_sub_by_dt(env, th, lu2dt_dev(sub_obj->do_lu.lo_dev)); } -#endif +struct thandle * +top_trans_create(const struct lu_env *env, struct dt_device *master_dev); +int top_trans_start(const struct lu_env *env, struct dt_device *master_dev, + struct thandle *th); +int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev, + struct thandle *th); +void top_multiple_thandle_destroy(struct top_multiple_thandle *tmt); +static inline void top_multiple_thandle_get(struct top_multiple_thandle *tmt) +{ + atomic_inc(&tmt->tmt_refcount); +} +static inline void top_multiple_thandle_put(struct top_multiple_thandle *tmt) +{ + if (atomic_dec_and_test(&tmt->tmt_refcount)) + top_multiple_thandle_destroy(tmt); +} + +struct sub_thandle *lookup_sub_thandle(struct top_multiple_thandle *tmt, + struct dt_device *dt_dev); +int sub_thandle_trans_create(const struct lu_env *env, + struct top_thandle *top_th, + struct sub_thandle *st); + +/* update_records.c */ +size_t update_records_create_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_attr *attr, + const struct dt_allocation_hint *hint, + struct dt_object_format *dof); +size_t update_records_attr_set_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_attr *attr); +size_t update_records_ref_add_size(const struct lu_env *env, + const struct lu_fid *fid); +size_t update_records_ref_del_size(const struct lu_env *env, + const struct lu_fid *fid); +size_t update_records_destroy_size(const struct lu_env *env, + const struct lu_fid *fid); +size_t update_records_index_insert_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct dt_rec *rec, + const struct dt_key *key); +size_t update_records_index_delete_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct dt_key *key); +size_t update_records_xattr_set_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_buf *buf, + const char *name, + __u32 flag); +size_t update_records_xattr_del_size(const struct lu_env *env, + const struct lu_fid *fid, + const char *name); +size_t update_records_write_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_buf *buf, + __u64 pos); +size_t update_records_punch_size(const struct lu_env *env, + const struct lu_fid *fid, + __u64 start, __u64 end); + +int update_records_create_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct lu_attr *attr, + const struct dt_allocation_hint *hint, + struct dt_object_format *dof); +int update_records_attr_set_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct lu_attr *attr); +int update_records_ref_add_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid); +int update_records_ref_del_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid); +int update_records_destroy_pack(const struct lu_env *env, + struct update_ops *ops, unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid); +int update_records_index_insert_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct dt_rec *rec, + const struct dt_key *key); +int update_records_index_delete_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct dt_key *key); +int update_records_xattr_set_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct lu_buf *buf, const char *name, + __u32 flag); +int update_records_xattr_del_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const char *name); +int update_records_write_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + const struct lu_buf *buf, + __u64 pos); +int update_records_punch_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid, + __u64 start, __u64 end); +int update_records_noop_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid); + +int tur_update_records_extend(struct thandle_update_records *tur, + size_t new_size); +int tur_update_params_extend(struct thandle_update_records *tur, + size_t new_size); +int tur_update_extend(struct thandle_update_records *tur, + size_t new_op_size, size_t new_param_size); + +#define update_record_pack(name, th, ...) \ +({ \ + struct top_thandle *top_th; \ + struct top_multiple_thandle *tmt; \ + struct thandle_update_records *tur; \ + struct llog_update_record *lur; \ + size_t avail_param_size; \ + size_t avail_op_size; \ + int ret; \ + \ + while (1) { \ + top_th = container_of(th, struct top_thandle, tt_super);\ + tmt = top_th->tt_multiple_thandle; \ + tur = tmt->tmt_update_records; \ + lur = tur->tur_update_records; \ + avail_param_size = tur->tur_update_params_buf_size - \ + update_params_size(tur->tur_update_params, \ + tur->tur_update_param_count); \ + avail_op_size = tur->tur_update_records_buf_size - \ + llog_update_record_size(lur); \ + ret = update_records_##name##_pack(env, \ + &lur->lur_update_rec.ur_ops, \ + &lur->lur_update_rec.ur_update_count, \ + &avail_op_size, \ + tur->tur_update_params, \ + &tur->tur_update_param_count, \ + &avail_param_size, __VA_ARGS__); \ + if (ret == -E2BIG) { \ + ret = tur_update_extend(tur, avail_op_size, \ + avail_param_size); \ + if (ret != 0) \ + break; \ + continue; \ + } else { \ + break; \ + } \ + } \ + ret; \ +}) + +#define update_record_size(env, name, th, ...) \ +({ \ + struct top_thandle *top_th; \ + struct top_multiple_thandle *tmt; \ + \ + top_th = container_of(th, struct top_thandle, tt_super); \ + \ + LASSERT(top_th->tt_multiple_thandle != NULL); \ + tmt = top_th->tt_multiple_thandle; \ + tmt->tmt_record_size += \ + update_records_##name##_size(env, __VA_ARGS__); \ +}) +#endif