X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Ftarget%2Fupdate_records.c;h=87bd7e7879bb70146ae875c5e5b06fd78bb2e7c4;hp=fd1fce7d4f6ce81f525efccc2178367a6db00bcc;hb=713174908cb8e5e3ceadd3ca1cb42a88b200e576;hpb=4dcd475716418d7b2093d917a8bffa9ad14e4465 diff --git a/lustre/target/update_records.c b/lustre/target/update_records.c index fd1fce7..87bd7e7 100644 --- a/lustre/target/update_records.c +++ b/lustre/target/update_records.c @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2014, Intel Corporation. + * Copyright (c) 2014, 2015, Intel Corporation. */ /* @@ -41,9 +41,11 @@ #define DEBUG_SUBSYSTEM S_CLASS #include +#include #include #include #include + #include "tgt_internal.h" #define UPDATE_RECORDS_BUFFER_SIZE 8192 @@ -63,13 +65,12 @@ void update_records_dump(const struct update_records *records, { const struct update_ops *ops; const struct update_op *op = NULL; - struct update_params *params; + struct update_params *params = NULL; unsigned int i; - ops = &records->ur_ops; - params = update_records_get_params(records); - - CDEBUG(mask, "ops = %d params_count = %d\n", records->ur_update_count, + CDEBUG(mask, "master transno = "LPU64" batchid = "LPU64" flags = %x" + " ops = %d params = %d\n", records->ur_master_transno, + records->ur_batchid, records->ur_flags, records->ur_update_count, records->ur_param_count); if (records->ur_update_count == 0) @@ -78,27 +79,34 @@ void update_records_dump(const struct update_records *records, if (!dump_updates) return; + ops = &records->ur_ops; + if (records->ur_param_count > 0) + params = update_records_get_params(records); + op = &ops->uops_op[0]; - for (i = 0; i < records->ur_update_count; i++) { + for (i = 0; i < records->ur_update_count; i++, + op = update_op_next_op(op)) { unsigned int j; CDEBUG(mask, "update %dth "DFID" %s params_count = %hu\n", i, PFID(&op->uop_fid), update_op_str(op->uop_type), op->uop_param_count); + if (params == NULL) + continue; + for (j = 0; j < op->uop_param_count; j++) { struct object_update_param *param; param = update_params_get_param(params, - (unsigned int)op->uop_params_off[j], + (unsigned int)op->uop_params_off[j], records->ur_param_count); - LASSERT(param != NULL); + if (param == NULL) + continue; CDEBUG(mask, "param = %p %dth off = %hu size = %hu\n", param, j, op->uop_params_off[j], param->oup_len); } - - op = update_op_next_op(op); } } @@ -237,6 +245,66 @@ static int update_records_update_pack(const struct lu_env *env, } /** + * Calculate update_records size + * + * Calculate update_records size by param_count and param_sizes array. + * + * \param[in] param_count the count of parameters + * \param[in] sizes the size array of these parameters + * + * \retval the size of this update + */ +static size_t update_records_update_size(__u32 param_count, size_t *sizes) +{ + int i; + size_t size; + + /* Check whether the packing exceeding the maximum update size */ + size = update_op_size(param_count); + + for (i = 0; i < param_count; i++) + size += cfs_size_round(sizeof(struct object_update_param) + + sizes[i]); + + return size; +} + +/** + * Calculate create update size + * + * \param[in] env execution environment + * \param[in] ops ur_ops in update records + * \param[in] fid FID of the object to be created + * \param[in] attr attribute of the object to be created + * \param[in] hint creation hint + * \param[in] dof creation format information + * + * \retval size of create update. + */ +size_t update_records_create_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_attr *attr, + const struct dt_allocation_hint *hint, + struct dt_object_format *dof) +{ + size_t sizes[2]; + int param_count = 0; + + if (attr != NULL) { + sizes[param_count] = sizeof(struct obdo); + param_count++; + } + + if (hint != NULL && hint->dah_parent != NULL) { + sizes[param_count] = sizeof(*fid); + param_count++; + } + + return update_records_update_size(param_count, sizes); +} +EXPORT_SYMBOL(update_records_create_size); + +/** * Pack create update * * Pack create update into update records. @@ -302,6 +370,26 @@ int update_records_create_pack(const struct lu_env *env, EXPORT_SYMBOL(update_records_create_pack); /** + * Calculate attr set update size + * + * \param[in] env execution environment + * \param[in] ops ur_ops in update records + * \param[in] fid FID of the object to set attr + * \param[in] attr attribute of attr set + * + * \retval size of attr set update. + */ +size_t update_records_attr_set_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_attr *attr) +{ + size_t size = sizeof(struct obdo); + + return update_records_update_size(1, &size); +} +EXPORT_SYMBOL(update_records_attr_set_size); + +/** * Pack attr set update * * Pack attr_set update into update records. @@ -343,6 +431,21 @@ int update_records_attr_set_pack(const struct lu_env *env, EXPORT_SYMBOL(update_records_attr_set_pack); /** + * Calculate ref add update size + * + * \param[in] env execution environment + * \param[in] fid FID of the object to add reference + * + * \retval size of ref_add udpate. + */ +size_t update_records_ref_add_size(const struct lu_env *env, + const struct lu_fid *fid) +{ + return update_records_update_size(0, NULL); +} +EXPORT_SYMBOL(update_records_ref_add_size); + +/** * Pack ref add update * * Pack ref add update into update records. @@ -375,6 +478,55 @@ int update_records_ref_add_pack(const struct lu_env *env, EXPORT_SYMBOL(update_records_ref_add_pack); /** + * Pack noop update + * + * Pack no op update into update records. Note: no op means + * the update does not need do anything, which is only used + * in test case to verify large size record. + * + * \param[in] env execution environment + * \param[in] ops ur_ops in update records + * \param[in|out] op_count pointer to the count of ops + * \param[in|out] max_op_size maximum size of the update + * \param[in] params ur_params in update records + * \param[in|out] param_count pointer to the count of params + * \param[in|out] max_param_size maximum size of the parameter + * \param[in] fid FID of the object to add reference + * + * \retval 0 if packing succeeds. + * \retval negative errno if packing fails. + */ +int update_records_noop_pack(const struct lu_env *env, + struct update_ops *ops, + unsigned int *op_count, + size_t *max_ops_size, + struct update_params *params, + unsigned int *param_count, + size_t *max_param_size, + const struct lu_fid *fid) +{ + return update_records_update_pack(env, fid, OUT_NOOP, ops, op_count, + max_ops_size, params, param_count, + max_param_size, 0, NULL, NULL); +} +EXPORT_SYMBOL(update_records_noop_pack); + +/** + * Calculate ref del update size + * + * \param[in] env execution environment + * \param[in] fid FID of the object to delete reference + * + * \retval size of ref_del update. + */ +size_t update_records_ref_del_size(const struct lu_env *env, + const struct lu_fid *fid) +{ + return update_records_update_size(0, NULL); +} +EXPORT_SYMBOL(update_records_ref_del_size); + +/** * Pack ref del update * * Pack ref del update into update records. @@ -407,6 +559,21 @@ int update_records_ref_del_pack(const struct lu_env *env, EXPORT_SYMBOL(update_records_ref_del_pack); /** + * Calculate object destroy update size + * + * \param[in] env execution environment + * \param[in] fid FID of the object to delete reference + * + * \retval size of object destroy update. + */ +size_t update_records_object_destroy_size(const struct lu_env *env, + const struct lu_fid *fid) +{ + return update_records_update_size(0, NULL); +} +EXPORT_SYMBOL(update_records_object_destroy_size); + +/** * Pack object destroy update * * Pack object destroy update into update records. @@ -439,6 +606,28 @@ int update_records_object_destroy_pack(const struct lu_env *env, EXPORT_SYMBOL(update_records_object_destroy_pack); /** + * Calculate index insert update size + * + * \param[in] env execution environment + * \param[in] fid FID of the object to insert index + * \param[in] rec record of insertion + * \param[in] key key of insertion + * + * \retval the size of index insert update. + */ +size_t update_records_index_insert_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct dt_rec *rec, + const struct dt_key *key) +{ + size_t sizes[3] = { strlen((const char *)key) + 1, + sizeof(struct lu_fid), + sizeof(__u32) }; + return update_records_update_size(3, sizes); +} +EXPORT_SYMBOL(update_records_index_insert_size); + +/** * Pack index insert update * * Pack index insert update into update records. @@ -488,6 +677,25 @@ int update_records_index_insert_pack(const struct lu_env *env, EXPORT_SYMBOL(update_records_index_insert_pack); /** + * Calculate index delete update size + * + * \param[in] env execution environment + * \param[in] fid FID of the object to delete index + * \param[in] key key of deletion + * + * \retval the size of index delete update + */ +size_t update_records_index_delete_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct dt_key *key) +{ + size_t size = strlen((const char *)key) + 1; + + return update_records_update_size(1, &size); +} +EXPORT_SYMBOL(update_records_index_delete_size); + +/** * Pack index delete update * * Pack index delete update into update records. @@ -525,6 +733,28 @@ int update_records_index_delete_pack(const struct lu_env *env, EXPORT_SYMBOL(update_records_index_delete_pack); /** + * Calculate xattr set size + * + * \param[in] env execution environment + * \param[in] fid FID of the object to set xattr + * \param[in] buf xattr to be set + * \param[in] name name of the xattr + * \param[in] flag flag for setting xattr + * + * \retval size of xattr set update. + */ +size_t update_records_xattr_set_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_buf *buf, + const char *name, __u32 flag) +{ + size_t sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)}; + + return update_records_update_size(3, sizes); +} +EXPORT_SYMBOL(update_records_xattr_set_size); + +/** * Pack xattr set update * * Pack xattr set update into update records. @@ -568,6 +798,25 @@ int update_records_xattr_set_pack(const struct lu_env *env, EXPORT_SYMBOL(update_records_xattr_set_pack); /** + * Calculate xattr delete update size. + * + * \param[in] env execution environment + * \param[in] fid FID of the object to delete xattr + * \param[in] name name of the xattr + * + * \retval size of xattr delet updatee. + */ +size_t update_records_xattr_del_size(const struct lu_env *env, + const struct lu_fid *fid, + const char *name) +{ + size_t size = strlen(name) + 1; + + return update_records_update_size(1, &size); +} +EXPORT_SYMBOL(update_records_xattr_del_size); + +/** * Pack xattr delete update * * Pack xattr delete update into update records. @@ -605,6 +854,27 @@ int update_records_xattr_del_pack(const struct lu_env *env, EXPORT_SYMBOL(update_records_xattr_del_pack); /** + * Calculate write update size + * + * \param[in] env execution environment + * \param[in] fid FID of the object to write into + * \param[in] buf buffer to write which includes an embedded size field + * \param[in] pos offet in the object to start writing at + * + * \retval size of write udpate. + */ +size_t update_records_write_size(const struct lu_env *env, + const struct lu_fid *fid, + const struct lu_buf *buf, + __u64 pos) +{ + size_t sizes[2] = {buf->lb_len, sizeof(pos)}; + + return update_records_update_size(2, sizes); +} +EXPORT_SYMBOL(update_records_write_size); + +/** * Pack write update * * Pack write update into update records. @@ -647,6 +917,26 @@ int update_records_write_pack(const struct lu_env *env, EXPORT_SYMBOL(update_records_write_pack); /** + * Calculate size of punch update. + * + * \param[in] env execution environment + * \param[in] fid FID of the object to write into + * \param[in] start start offset of punch + * \param[in] end end offet of punch + * + * \retval size of update punch. + */ +size_t update_records_punch_size(const struct lu_env *env, + const struct lu_fid *fid, + __u64 start, __u64 end) +{ + size_t sizes[2] = {sizeof(start), sizeof(end)}; + + return update_records_update_size(2, sizes); +} +EXPORT_SYMBOL(update_records_punch_size); + +/** * Pack punch * * Pack punch update into update records. @@ -772,8 +1062,7 @@ int tur_update_extend(struct thandle_update_records *tur, record_size = llog_update_record_size(tur->tur_update_records); /* extend update records buffer */ - if (new_op_size > (tur->tur_update_records_buf_size - record_size - - sizeof(*tur->tur_update_records))) { + if (new_op_size >= (tur->tur_update_records_buf_size - record_size)) { extend_size = round_up(new_op_size, UPDATE_RECORDS_BUFFER_SIZE); rc = tur_update_records_extend(tur, tur->tur_update_records_buf_size + @@ -785,7 +1074,7 @@ int tur_update_extend(struct thandle_update_records *tur, /* extend parameters buffer */ params_size = update_params_size(tur->tur_update_params, tur->tur_update_param_count); - if (new_param_size > (tur->tur_update_params_buf_size - + if (new_param_size >= (tur->tur_update_params_buf_size - params_size)) { extend_size = round_up(new_param_size, UPDATE_PARAMS_BUFFER_SIZE); @@ -870,31 +1159,11 @@ EXPORT_SYMBOL(tur_update_params_extend); * \retval negative errno if updates recording fails. */ int check_and_prepare_update_record(const struct lu_env *env, - struct thandle *th) + struct thandle_update_records *tur) { - struct thandle_update_records *tur; struct llog_update_record *lur; - struct top_thandle *top_th; - struct sub_thandle *lst; - int rc; - bool record_update = false; - ENTRY; - - top_th = container_of(th, struct top_thandle, tt_super); - /* Check if it needs to record updates for this transaction */ - list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) { - if (lst->st_record_update) { - record_update = true; - break; - } - } - if (!record_update) - RETURN(0); - - if (top_th->tt_update_records != NULL) - RETURN(0); + int rc; - tur = &update_env_info(env)->uti_tur; if (tur->tur_update_records == NULL) { rc = tur_update_records_create(tur); if (rc < 0) @@ -913,70 +1182,28 @@ int check_and_prepare_update_record(const struct lu_env *env, lur->lur_update_rec.ur_master_transno = 0; lur->lur_update_rec.ur_batchid = 0; lur->lur_update_rec.ur_flags = 0; - lur->lur_hdr.lrh_len = LLOG_CHUNK_SIZE; + lur->lur_hdr.lrh_len = LLOG_MIN_CHUNK_SIZE; tur->tur_update_param_count = 0; - top_th->tt_update_records = tur; - RETURN(0); } -EXPORT_SYMBOL(check_and_prepare_update_record); - -/** - * Merge params into the update records - * - * Merge params and ops into the update records attached to top th. - * During transaction execution phase, parameters and update ops - * are collected in two different buffers (see lod_updates_pack()), - * then in transaction stop, it needs to be merged them in one - * buffer, then being written into the update log. - * - * \param[in] env execution environment - * \param[in] tur thandle update records whose updates and - * parameters are merged - * - * \retval 0 if merging succeeds. - * \retval negaitive errno if merging fails. - */ -int merge_params_updates_buf(const struct lu_env *env, - struct thandle_update_records *tur) -{ - struct llog_update_record *lur; - struct update_params *params; - size_t params_size; - size_t record_size; - - if (tur->tur_update_records == NULL || - tur->tur_update_params == NULL) - return 0; - - lur = tur->tur_update_records; - /* Extends the update records buffer if needed */ - params_size = update_params_size(tur->tur_update_params, - tur->tur_update_param_count); - record_size = llog_update_record_size(lur); - if (cfs_size_round(record_size + params_size) > - tur->tur_update_records_buf_size) { - int rc; - - rc = tur_update_records_extend(tur, record_size + params_size); - if (rc < 0) - return rc; - lur = tur->tur_update_records; - } - - params = update_records_get_params(&lur->lur_update_rec); - memcpy(params, tur->tur_update_params, params_size); - lur->lur_update_rec.ur_param_count = tur->tur_update_param_count; - return 0; -} -EXPORT_SYMBOL(merge_params_updates_buf); static void update_key_fini(const struct lu_context *ctx, struct lu_context_key *key, void *data) { struct update_thread_info *info = data; + struct thandle_exec_args *args = &info->uti_tea; + int i; + + for (i = 0; i < args->ta_alloc_args; i++) { + if (args->ta_args[i] != NULL) + OBD_FREE_PTR(args->ta_args[i]); + } + + if (args->ta_args != NULL) + OBD_FREE(args->ta_args, sizeof(args->ta_args[0]) * + args->ta_alloc_args); if (info->uti_tur.tur_update_records != NULL) OBD_FREE_LARGE(info->uti_tur.tur_update_records,