From aa5d6bc2aa9abc745e6f590048e270c59265f699 Mon Sep 17 00:00:00 2001 From: Lai Siyao Date: Fri, 15 Jul 2016 16:53:18 +0800 Subject: [PATCH] LU-8370 dne: error in spliting update records sub_updates_write() needs to split update records if it exceeds llog_chunk_size, current code is not accurate in offset calculation, now it's changed to use the same set of functions to estimate and set update llog size. Besides, the padding size of each split llog should >= LLOG_MIN_REC_SIZE, or exactly matches llog chunk size, otherwise llog_osd_pad() will LASSERT. Signed-off-by: Lai Siyao Change-Id: I061c7808b26ce98967858bbed88b9e5180d88289 Reviewed-on: http://review.whamcloud.com/21334 Tested-by: Jenkins Reviewed-by: Fan Yong Tested-by: Maloo Reviewed-by: wangdi Reviewed-by: Oleg Drokin --- lustre/include/lustre_update.h | 30 ++++++-- lustre/target/update_trans.c | 169 ++++++++++++++++++++--------------------- 2 files changed, 108 insertions(+), 91 deletions(-) diff --git a/lustre/include/lustre_update.h b/lustre/include/lustre_update.h index bfb8317..df1786f 100644 --- a/lustre/include/lustre_update.h +++ b/lustre/include/lustre_update.h @@ -131,6 +131,21 @@ update_records_get_params(const struct update_records *record) update_ops_size(&record->ur_ops, record->ur_update_count)); } +static inline struct update_param * +update_param_next_param(const struct update_param *param) +{ + return (struct update_param *)((char *)param + + object_update_param_size( + (struct object_update_param *)param)); +} + +static inline size_t +__update_records_size(size_t raw_size) +{ + return cfs_size_round(offsetof(struct update_records, ur_ops) + + raw_size); +} + static inline size_t update_records_size(const struct update_records *record) { @@ -147,18 +162,23 @@ update_records_size(const struct update_records *record) param_size = update_params_size(params, record->ur_param_count); } - return cfs_size_round(offsetof(struct update_records, ur_ops) + - op_size + param_size); + return __update_records_size(op_size + param_size); } static inline size_t -llog_update_record_size(const struct llog_update_record *lur) +__llog_update_record_size(size_t records_size) { - return cfs_size_round(sizeof(lur->lur_hdr) + - update_records_size(&lur->lur_update_rec) + + return cfs_size_round(sizeof(struct llog_rec_hdr) + records_size + sizeof(struct llog_rec_tail)); } +static inline size_t +llog_update_record_size(const struct llog_update_record *lur) +{ + return __llog_update_record_size( + update_records_size(&lur->lur_update_rec)); +} + static inline struct update_op * update_ops_get_op(const struct update_ops *ops, unsigned int index, unsigned int update_count) diff --git a/lustre/target/update_trans.c b/lustre/target/update_trans.c index 7928bae..3252de4 100644 --- a/lustre/target/update_trans.c +++ b/lustre/target/update_trans.c @@ -168,19 +168,20 @@ static int sub_updates_write(const struct lu_env *env, struct llog_update_record *record, struct sub_thandle *sub_th) { - struct dt_device *dt = sub_th->st_dt; - struct llog_ctxt *ctxt; - int rc; + struct dt_device *dt = sub_th->st_dt; + struct llog_ctxt *ctxt; struct llog_update_record *lur = NULL; - struct update_params *params = NULL; - __u32 update_count = 0; - __u32 param_count = 0; - __u32 last_update_count = 0; - __u32 last_param_count = 0; - void *src; - void *start; - void *next; + __u32 update_count = 0; + __u32 param_count = 0; + __u32 last_update_count = 0; + __u32 last_param_count = 0; + char *start; + char *cur; + char *next; struct sub_thandle_cookie *stc; + size_t reclen; + bool eof = false; + int rc; ENTRY; ctxt = llog_get_context(dt->dd_lu_dev.ld_obd, @@ -233,87 +234,83 @@ static int sub_updates_write(const struct lu_env *env, memcpy(lur, &record->lur_hdr, sizeof(record->lur_hdr)); lur->lur_update_rec.ur_update_count = 0; lur->lur_update_rec.ur_param_count = 0; - src = &record->lur_update_rec.ur_ops; - start = next = src; - lur->lur_hdr.lrh_len = llog_update_record_size(lur); - params = update_records_get_params(&record->lur_update_rec); + start = (char *)&record->lur_update_rec.ur_ops; + cur = next = start; do { - size_t rec_len; - - if (update_count < record->lur_update_rec.ur_update_count) { - next = update_op_next_op((struct update_op *)src); - } else { - if (param_count == 0) - next = update_records_get_params( - &record->lur_update_rec); - else - next = (char *)src + - object_update_param_size( - (struct object_update_param *)src); + if (update_count < record->lur_update_rec.ur_update_count) + next = (char *)update_op_next_op( + (struct update_op *)cur); + else if (param_count < record->lur_update_rec.ur_param_count) + next = (char *)update_param_next_param( + (struct update_param *)cur); + else + eof = true; + + /* + * If its size > llog chunk_size, then write current chunk to + * the update llog, NB the padding should >= LLOG_MIN_REC_SIZE. + * + * So check padding length is either >= LLOG_MIN_REC_SIZE or is + * 0 (record length just matches the chunk size). + */ + reclen = __llog_update_record_size( + __update_records_size(next - start)); + if ((reclen + LLOG_MIN_REC_SIZE <= ctxt->loc_chunk_size || + reclen == ctxt->loc_chunk_size) && + !eof) { + cur = next; + + if (update_count < + record->lur_update_rec.ur_update_count) + update_count++; + else if (param_count < + record->lur_update_rec.ur_param_count) + param_count++; + continue; } - rec_len = cfs_size_round((unsigned long)(next - src)); - /* If its size > llog chunk_size, then write current chunk to - * the update llog. */ - if (lur->lur_hdr.lrh_len + rec_len + LLOG_MIN_REC_SIZE > - ctxt->loc_chunk_size || - param_count == record->lur_update_rec.ur_param_count) { - lur->lur_update_rec.ur_update_count = - update_count > last_update_count ? - update_count - last_update_count : 0; - lur->lur_update_rec.ur_param_count = param_count - - last_param_count; - - memcpy(&lur->lur_update_rec.ur_ops, start, - (unsigned long)(src - start)); - if (last_update_count != 0) - lur->lur_update_rec.ur_flags |= - UPDATE_RECORD_CONTINUE; - - update_records_dump(&lur->lur_update_rec, D_INFO, true); - lur->lur_hdr.lrh_len = llog_update_record_size(lur); - LASSERT(lur->lur_hdr.lrh_len <= ctxt->loc_chunk_size); - - OBD_ALLOC_PTR(stc); - if (stc == NULL) - GOTO(llog_put, rc = -ENOMEM); - INIT_LIST_HEAD(&stc->stc_list); - - rc = llog_add(env, ctxt->loc_handle, - &lur->lur_hdr, - &stc->stc_cookie, sub_th->st_sub_th); - - CDEBUG(D_INFO, "%s: Add update log "DOSTID":%u" - " rc = %d\n", dt->dd_lu_dev.ld_obd->obd_name, - POSTID(&stc->stc_cookie.lgc_lgl.lgl_oi), - stc->stc_cookie.lgc_index, rc); - - if (rc > 0) { - list_add(&stc->stc_list, - &sub_th->st_cookie_list); - rc = 0; - } else { - OBD_FREE_PTR(stc); - GOTO(llog_put, rc); - } + lur->lur_update_rec.ur_update_count = update_count - + last_update_count; + lur->lur_update_rec.ur_param_count = param_count - + last_param_count; + memcpy(&lur->lur_update_rec.ur_ops, start, cur - start); + lur->lur_hdr.lrh_len = llog_update_record_size(lur); - last_update_count = update_count; - last_param_count = param_count; - start = src; - lur->lur_update_rec.ur_update_count = 0; - lur->lur_update_rec.ur_param_count = 0; - lur->lur_hdr.lrh_len = llog_update_record_size(lur); + LASSERT(lur->lur_hdr.lrh_len == + __llog_update_record_size( + __update_records_size(cur - start))); + LASSERT(lur->lur_hdr.lrh_len <= ctxt->loc_chunk_size); + + update_records_dump(&lur->lur_update_rec, D_INFO, true); + + OBD_ALLOC_PTR(stc); + if (stc == NULL) + GOTO(llog_put, rc = -ENOMEM); + INIT_LIST_HEAD(&stc->stc_list); + + rc = llog_add(env, ctxt->loc_handle, &lur->lur_hdr, + &stc->stc_cookie, sub_th->st_sub_th); + + CDEBUG(D_INFO, "%s: Add update log "DOSTID":%u rc = %d\n", + dt->dd_lu_dev.ld_obd->obd_name, + POSTID(&stc->stc_cookie.lgc_lgl.lgl_oi), + stc->stc_cookie.lgc_index, rc); + + if (rc > 0) { + list_add(&stc->stc_list, &sub_th->st_cookie_list); + rc = 0; + } else { + OBD_FREE_PTR(stc); + GOTO(llog_put, rc); } - src = next; - lur->lur_hdr.lrh_len += cfs_size_round(rec_len); - if (update_count < record->lur_update_rec.ur_update_count) - update_count++; - else if (param_count < record->lur_update_rec.ur_param_count) - param_count++; - else - break; - } while (1); + last_update_count = update_count; + last_param_count = param_count; + start = cur; + lur->lur_update_rec.ur_update_count = 0; + lur->lur_update_rec.ur_param_count = 0; + lur->lur_update_rec.ur_flags |= UPDATE_RECORD_CONTINUE; + } while (!eof); llog_put: if (lur != NULL) -- 1.8.3.1