X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Finclude%2Flustre_update.h;h=78cd3d4bfdd5190667774d291a0b3d751ebfca84;hb=6d5fe29066af5f8e40055fd89b285853c363e947;hp=4cb87e02f1b675c73eaf4d9ea184e0b41716cc89;hpb=4dcd475716418d7b2093d917a8bffa9ad14e4465;p=fs%2Flustre-release.git diff --git a/lustre/include/lustre_update.h b/lustre/include/lustre_update.h index 4cb87e0..78cd3d4 100644 --- a/lustre/include/lustre_update.h +++ b/lustre/include/lustre_update.h @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2013, 2014, Intel Corporation. + * Copyright (c) 2013, 2017, Intel Corporation. */ /* * lustre/include/lustre_update.h @@ -30,41 +30,17 @@ #ifndef _LUSTRE_UPDATE_H #define _LUSTRE_UPDATE_H -#include #include +#include +#include -#define OUT_UPDATE_INIT_BUFFER_SIZE 4096 -/* 16KB, the current biggest size is llog header(8KB) */ -#define OUT_UPDATE_REPLY_SIZE 16384 +#define OUT_UPDATE_REPLY_SIZE 4096 +#define OUT_BULK_BUFFER_SIZE 4096 struct dt_key; struct dt_rec; struct object_update_param; - -struct update_buffer { - struct object_update_request *ub_req; - size_t ub_req_size; -}; - -/** - * Tracking the updates being executed on this dt_device. - */ -struct dt_update_request { - struct dt_device *dur_dt; - /* attached itself to thandle */ - int dur_flags; - /* update request result */ - int dur_rc; - /* Current batch(transaction) id */ - __u64 dur_batchid; - /* Holding object updates */ - struct update_buffer dur_buf; - struct list_head dur_cb_items; -}; - -struct update_params { - struct object_update_param up_params[0]; -}; +struct llog_update_record; static inline size_t update_params_size(const struct update_params *params, unsigned int param_count) @@ -102,12 +78,22 @@ update_params_get_param(const struct update_params *params, return param; } -struct update_op { - struct lu_fid uop_fid; - __u16 uop_type; - __u16 uop_param_count; - __u16 uop_params_off[0]; -}; +static inline void* +update_params_get_param_buf(const struct update_params *params, __u16 index, + unsigned int param_count, __u16 *size) +{ + struct object_update_param *param; + + param = update_params_get_param(params, (unsigned int)index, + param_count); + if (param == NULL) + return NULL; + + if (size != NULL) + *size = param->oup_len; + + return param->oup_buf; +} static inline size_t update_op_size(unsigned int param_count) @@ -122,11 +108,6 @@ update_op_next_op(const struct update_op *uop) update_op_size(uop->uop_param_count)); } -/* All of updates in the mulitple_update_record */ -struct update_ops { - struct update_op uops_op[0]; -}; - static inline size_t update_ops_size(const struct update_ops *ops, unsigned int update_count) { @@ -141,41 +122,6 @@ static inline size_t update_ops_size(const struct update_ops *ops, return total_size; } -/* - * This is the update record format used to store the updates in - * disk. All updates of the operation will be stored in ur_ops. - * All of parameters for updates of the operation will be stored - * in ur_params. - * To save the space of the record, parameters in ur_ops will only - * remember their offset in ur_params, so to avoid storing duplicate - * parameters in ur_params, which can help us save a lot space for - * operation like creating striped directory. - */ -struct update_records { - __u64 ur_master_transno; - __u64 ur_batchid; - __u32 ur_flags; - __u32 ur_param_count; - __u32 ur_update_count; - struct update_ops ur_ops; - /* Note ur_ops has a variable size, so comment out - * the following ur_params, in case some use it directly - * update_records->ur_params - * - * struct update_params ur_params; - */ -}; - -struct llog_update_record { - struct llog_rec_hdr lur_hdr; - struct update_records lur_update_rec; - /* Note ur_update_rec has a variable size, so comment out - * the following ur_tail, in case someone use it directly - * - * struct llog_rec_tail lur_tail; - */ -}; - static inline struct update_params * update_records_get_params(const struct update_records *record) { @@ -184,26 +130,54 @@ update_records_get_params(const struct update_records *record) update_ops_size(&record->ur_ops, record->ur_update_count)); } +static inline struct update_param * +update_param_next_param(const struct update_param *param) +{ + return (struct update_param *)((char *)param + + object_update_param_size( + (struct object_update_param *)param)); +} + +static inline size_t +__update_records_size(size_t raw_size) +{ + return cfs_size_round(offsetof(struct update_records, ur_ops) + + raw_size); +} + static inline size_t update_records_size(const struct update_records *record) { - struct update_params *params; + size_t op_size = 0; + size_t param_size = 0; - params = update_records_get_params(record); + if (record->ur_update_count > 0) + op_size = update_ops_size(&record->ur_ops, + record->ur_update_count); + if (record->ur_param_count > 0) { + struct update_params *params; - return cfs_size_round(offsetof(struct update_records, ur_ops) + - update_ops_size(&record->ur_ops, record->ur_update_count) + - update_params_size(params, record->ur_param_count)); + params = update_records_get_params(record); + param_size = update_params_size(params, record->ur_param_count); + } + + return __update_records_size(op_size + param_size); } static inline size_t -llog_update_record_size(const struct llog_update_record *lur) +__llog_update_record_size(size_t records_size) { - return cfs_size_round(sizeof(lur->lur_hdr) + - update_records_size(&lur->lur_update_rec) + + return cfs_size_round(sizeof(struct llog_rec_hdr) + records_size + sizeof(struct llog_rec_tail)); } +static inline size_t +llog_update_record_size(const struct llog_update_record *lur) +{ + return __llog_update_record_size( + update_records_size(&lur->lur_update_rec)); +} + static inline struct update_op * update_ops_get_op(const struct update_ops *ops, unsigned int index, unsigned int update_count) @@ -240,7 +214,7 @@ static inline void *size = param->oup_len; if (param->oup_len == 0) - return NULL; + return ERR_PTR(-ENODATA); return (void *)¶m->oup_buf[0]; } @@ -262,30 +236,20 @@ object_update_request_size(const struct object_update_request *our) } static inline void -object_update_reply_init(struct object_update_reply *reply, size_t count) -{ - reply->ourp_magic = UPDATE_REPLY_MAGIC; - reply->ourp_count = count; -} - -static inline void object_update_result_insert(struct object_update_reply *reply, void *data, size_t data_len, size_t index, int rc) { struct object_update_result *update_result; - char *ptr; update_result = object_update_result_get(reply, index, NULL); - LASSERT(update_result != NULL); + LASSERT(update_result); update_result->our_rc = ptlrpc_status_hton(rc); - if (data_len > 0) { - LASSERT(data != NULL); - ptr = (char *)update_result + - cfs_size_round(sizeof(struct object_update_reply)); + if (rc >= 0) { + if (data_len > 0 && data) + memcpy(update_result->our_data, data, data_len); update_result->our_datalen = data_len; - memcpy(ptr, data, data_len); } reply->ourp_lens[index] = cfs_size_round(data_len + @@ -314,7 +278,7 @@ object_update_result_data_get(const struct object_update_reply *reply, lbuf->lb_buf = update_result->our_data; lbuf->lb_len = update_result->our_datalen; - return 0; + return result; } /** @@ -322,99 +286,179 @@ object_update_result_data_get(const struct object_update_reply *reply, * distribution. */ struct thandle_update_records { - /* All of updates for the cross-MDT operation. */ + /* All of updates for the cross-MDT operation, vmalloc'd. */ struct llog_update_record *tur_update_records; size_t tur_update_records_buf_size; - /* All of parameters for the cross-MDT operation */ + /* All of parameters for the cross-MDT operation, vmalloc'd */ struct update_params *tur_update_params; unsigned int tur_update_param_count; size_t tur_update_params_buf_size; }; #define TOP_THANDLE_MAGIC 0x20140917 +struct top_multiple_thandle { + struct dt_device *tmt_master_sub_dt; + atomic_t tmt_refcount; + /* Other sub transactions will be listed here. */ + struct list_head tmt_sub_thandle_list; + spinlock_t tmt_sub_lock; + + struct list_head tmt_commit_list; + /* All of update records will packed here */ + struct thandle_update_records *tmt_update_records; + + wait_queue_head_t tmt_stop_waitq; + __u64 tmt_batchid; + int tmt_result; + __u32 tmt_magic; + size_t tmt_record_size; + __u32 tmt_committed:1; +}; + /* {top,sub}_thandle are used to manage distributed transactions which * include updates on several nodes. A top_handle represents the * whole operation, and sub_thandle represents updates on each node. */ struct top_thandle { struct thandle tt_super; - __u32 tt_magic; - atomic_t tt_refcount; /* The master sub transaction. */ struct thandle *tt_master_sub_thandle; - /* Other sub thandle will be listed here. */ - struct list_head tt_sub_thandle_list; + struct top_multiple_thandle *tt_multiple_thandle; +}; - /* All of update records will packed here */ - struct thandle_update_records *tt_update_records; +struct sub_thandle_cookie { + struct llog_cookie stc_cookie; + struct list_head stc_list; }; +/* Sub thandle is used to track multiple sub thandles under one parent + * thandle */ struct sub_thandle { - /* point to the osd/osp_thandle */ struct thandle *st_sub_th; + struct dt_device *st_dt; + struct list_head st_cookie_list; + struct dt_txn_commit_cb st_commit_dcb; + struct dt_txn_commit_cb st_stop_dcb; + int st_result; /* linked to top_thandle */ struct list_head st_sub_list; /* If this sub thandle is committed */ bool st_committed:1, - st_record_update:1; + st_stopped:1, + st_started:1; }; +struct tx_arg; +typedef int (*tx_exec_func_t)(const struct lu_env *env, struct thandle *th, + struct tx_arg *ta); + +/* Structure for holding one update execution */ +struct tx_arg { + tx_exec_func_t exec_fn; + tx_exec_func_t undo_fn; + struct dt_object *object; + const char *file; + struct object_update_reply *reply; + int line; + int index; + union { + struct { + struct dt_insert_rec rec; + const struct dt_key *key; + } insert; + struct { + } ref; + struct { + struct lu_attr attr; + } attr_set; + struct { + struct lu_buf buf; + const char *name; + int flags; + __u32 csum; + } xattr_set; + struct { + struct lu_attr attr; + struct dt_allocation_hint hint; + struct dt_object_format dof; + struct lu_fid fid; + } create; + struct { + struct lu_buf buf; + loff_t pos; + } write; + struct { + struct ost_body *body; + } destroy; + } u; +}; + +/* Structure for holding all update executations of one transaction */ +struct thandle_exec_args { + struct thandle *ta_handle; + int ta_argno; /* used args */ + int ta_alloc_args; /* allocated args count */ + struct tx_arg **ta_args; +}; /* target/out_lib.c */ int out_update_pack(const struct lu_env *env, struct object_update *update, - size_t max_update_size, enum update_type op, + size_t *max_update_size, enum update_type op, const struct lu_fid *fid, unsigned int params_count, - __u16 *param_sizes, const void **param_bufs); + __u16 *param_sizes, const void **param_bufs, + __u32 reply_size); int out_create_pack(const struct lu_env *env, struct object_update *update, - size_t max_update_size, const struct lu_fid *fid, + size_t *max_update_size, const struct lu_fid *fid, const struct lu_attr *attr, struct dt_allocation_hint *hint, struct dt_object_format *dof); -int out_object_destroy_pack(const struct lu_env *env, - struct object_update *update, - size_t max_update_size, - const struct lu_fid *fid); +int out_destroy_pack(const struct lu_env *env, struct object_update *update, + size_t *max_update_size, const struct lu_fid *fid); int out_index_delete_pack(const struct lu_env *env, - struct object_update *update, size_t max_update_size, + struct object_update *update, size_t *max_update_size, const struct lu_fid *fid, const struct dt_key *key); int out_index_insert_pack(const struct lu_env *env, - struct object_update *update, size_t max_update_size, + struct object_update *update, size_t *max_update_size, const struct lu_fid *fid, const struct dt_rec *rec, const struct dt_key *key); int out_xattr_set_pack(const struct lu_env *env, - struct object_update *update, size_t max_update_size, + struct object_update *update, size_t *max_update_size, const struct lu_fid *fid, const struct lu_buf *buf, const char *name, __u32 flag); int out_xattr_del_pack(const struct lu_env *env, - struct object_update *update, size_t max_update_size, + struct object_update *update, size_t *max_update_size, const struct lu_fid *fid, const char *name); int out_attr_set_pack(const struct lu_env *env, - struct object_update *update, size_t max_update_size, + struct object_update *update, size_t *max_update_size, const struct lu_fid *fid, const struct lu_attr *attr); int out_ref_add_pack(const struct lu_env *env, - struct object_update *update, size_t max_update_size, + struct object_update *update, size_t *max_update_size, const struct lu_fid *fid); int out_ref_del_pack(const struct lu_env *env, - struct object_update *update, size_t max_update_size, + struct object_update *update, size_t *max_update_size, const struct lu_fid *fid); int out_write_pack(const struct lu_env *env, - struct object_update *update, size_t max_update_size, + struct object_update *update, size_t *max_update_size, const struct lu_fid *fid, const struct lu_buf *buf, __u64 pos); int out_attr_get_pack(const struct lu_env *env, - struct object_update *update, size_t max_update_size, + struct object_update *update, size_t *max_update_size, const struct lu_fid *fid); int out_index_lookup_pack(const struct lu_env *env, - struct object_update *update, size_t max_update_size, + struct object_update *update, size_t *max_update_size, const struct lu_fid *fid, struct dt_rec *rec, const struct dt_key *key); int out_xattr_get_pack(const struct lu_env *env, - struct object_update *update, size_t max_update_size, - const struct lu_fid *fid, const char *name); + struct object_update *update, size_t *max_update_size, + const struct lu_fid *fid, const char *name, + const int bufsize); +int out_xattr_list_pack(const struct lu_env *env, struct object_update *update, + size_t *max_update_size, const struct lu_fid *fid, + const int bufsize); int out_read_pack(const struct lu_env *env, struct object_update *update, - size_t max_update_length, const struct lu_fid *fid, + size_t *max_update_length, const struct lu_fid *fid, size_t size, loff_t pos); const char *update_op_str(__u16 opcode); @@ -433,16 +477,67 @@ thandle_get_sub(const struct lu_env *env, struct thandle *th, struct thandle * top_trans_create(const struct lu_env *env, struct dt_device *master_dev); - int top_trans_start(const struct lu_env *env, struct dt_device *master_dev, struct thandle *th); - int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev, struct thandle *th); +void top_multiple_thandle_destroy(struct top_multiple_thandle *tmt); + +static inline void top_multiple_thandle_get(struct top_multiple_thandle *tmt) +{ + atomic_inc(&tmt->tmt_refcount); +} + +static inline void top_multiple_thandle_put(struct top_multiple_thandle *tmt) +{ + if (atomic_dec_and_test(&tmt->tmt_refcount)) + top_multiple_thandle_destroy(tmt); +} -void top_thandle_destroy(struct top_thandle *top_th); +struct sub_thandle *lookup_sub_thandle(struct top_multiple_thandle *tmt, + struct dt_device *dt_dev); +int sub_thandle_trans_create(const struct lu_env *env, + struct top_thandle *top_th, + struct sub_thandle *st); /* update_records.c */ +size_t update_records_create_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_attr *attr, + const struct dt_allocation_hint *hint, + struct dt_object_format *dof); +size_t update_records_attr_set_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_attr *attr); +size_t update_records_ref_add_size(const struct lu_env *env, + const struct lu_fid *fid); +size_t update_records_ref_del_size(const struct lu_env *env, + const struct lu_fid *fid); +size_t update_records_destroy_size(const struct lu_env *env, + const struct lu_fid *fid); +size_t update_records_index_insert_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct dt_rec *rec, + const struct dt_key *key); +size_t update_records_index_delete_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct dt_key *key); +size_t update_records_xattr_set_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_buf *buf, + const char *name, + __u32 flag); +size_t update_records_xattr_del_size(const struct lu_env *env, + const struct lu_fid *fid, + const char *name); +size_t update_records_write_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_buf *buf, + __u64 pos); +size_t update_records_punch_size(const struct lu_env *env, + const struct lu_fid *fid, + __u64 start, __u64 end); + int update_records_create_pack(const struct lu_env *env, struct update_ops *ops, unsigned int *op_count, @@ -479,14 +574,13 @@ int update_records_ref_del_pack(const struct lu_env *env, unsigned int *param_count, size_t *max_param_size, const struct lu_fid *fid); -int update_records_object_destroy_pack(const struct lu_env *env, - struct update_ops *ops, - unsigned int *op_count, - size_t *max_ops_size, - struct update_params *params, - unsigned int *param_count, - size_t *max_param_size, - const struct lu_fid *fid); +int update_records_destroy_pack(const struct lu_env *env, + struct update_ops *ops, unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid); int update_records_index_insert_pack(const struct lu_env *env, struct update_ops *ops, unsigned int *op_count, @@ -544,21 +638,26 @@ int update_records_punch_pack(const struct lu_env *env, size_t *max_param_size, const struct lu_fid *fid, __u64 start, __u64 end); +int update_records_noop_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid); int tur_update_records_extend(struct thandle_update_records *tur, size_t new_size); int tur_update_params_extend(struct thandle_update_records *tur, size_t new_size); -int check_and_prepare_update_record(const struct lu_env *env, - struct thandle *th); -int merge_params_updates_buf(const struct lu_env *env, - struct thandle_update_records *tur); int tur_update_extend(struct thandle_update_records *tur, size_t new_op_size, size_t new_param_size); #define update_record_pack(name, th, ...) \ ({ \ struct top_thandle *top_th; \ + struct top_multiple_thandle *tmt; \ struct thandle_update_records *tur; \ struct llog_update_record *lur; \ size_t avail_param_size; \ @@ -567,7 +666,8 @@ int tur_update_extend(struct thandle_update_records *tur, \ while (1) { \ top_th = container_of(th, struct top_thandle, tt_super);\ - tur = top_th->tt_update_records; \ + tmt = top_th->tt_multiple_thandle; \ + tur = tmt->tmt_update_records; \ lur = tur->tur_update_records; \ avail_param_size = tur->tur_update_params_buf_size - \ update_params_size(tur->tur_update_params, \ @@ -593,4 +693,17 @@ int tur_update_extend(struct thandle_update_records *tur, } \ ret; \ }) + +#define update_record_size(env, name, th, ...) \ +({ \ + struct top_thandle *top_th; \ + struct top_multiple_thandle *tmt; \ + \ + top_th = container_of(th, struct top_thandle, tt_super); \ + \ + LASSERT(top_th->tt_multiple_thandle != NULL); \ + tmt = top_th->tt_multiple_thandle; \ + tmt->tmt_record_size += \ + update_records_##name##_size(env, __VA_ARGS__); \ +}) #endif