Whamcloud - gitweb
LU-8370 dne: error in spliting update records 34/21334/6
authorLai Siyao <lai.siyao@intel.com>
Fri, 15 Jul 2016 08:53:18 +0000 (16:53 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 11 Aug 2016 05:49:34 +0000 (05:49 +0000)
sub_updates_write() needs to split update records if it exceeds
llog_chunk_size, current code is not accurate in offset
calculation, now it's changed to use the same set of functions to
estimate and set update llog size.

Besides, the padding size of each split llog should >=
LLOG_MIN_REC_SIZE, or exactly matches llog chunk size, otherwise
llog_osd_pad() will LASSERT.

Signed-off-by: Lai Siyao <lai.siyao@intel.com>
Change-Id: I061c7808b26ce98967858bbed88b9e5180d88289
Reviewed-on: http://review.whamcloud.com/21334
Tested-by: Jenkins
Reviewed-by: Fan Yong <fan.yong@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: wangdi <di.wang@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre_update.h
lustre/target/update_trans.c

index bfb8317..df1786f 100644 (file)
@@ -131,6 +131,21 @@ update_records_get_params(const struct update_records *record)
                update_ops_size(&record->ur_ops, record->ur_update_count));
 }
 
+static inline struct update_param *
+update_param_next_param(const struct update_param *param)
+{
+       return (struct update_param *)((char *)param +
+                                      object_update_param_size(
+                                         (struct object_update_param *)param));
+}
+
+static inline size_t
+__update_records_size(size_t raw_size)
+{
+       return cfs_size_round(offsetof(struct update_records, ur_ops) +
+                             raw_size);
+}
+
 static inline size_t
 update_records_size(const struct update_records *record)
 {
@@ -147,18 +162,23 @@ update_records_size(const struct update_records *record)
                param_size = update_params_size(params, record->ur_param_count);
        }
 
-       return cfs_size_round(offsetof(struct update_records, ur_ops) +
-                             op_size + param_size);
+       return __update_records_size(op_size + param_size);
 }
 
 static inline size_t
-llog_update_record_size(const struct llog_update_record *lur)
+__llog_update_record_size(size_t records_size)
 {
-       return cfs_size_round(sizeof(lur->lur_hdr) +
-                             update_records_size(&lur->lur_update_rec) +
+       return cfs_size_round(sizeof(struct llog_rec_hdr) + records_size +
                              sizeof(struct llog_rec_tail));
 }
 
+static inline size_t
+llog_update_record_size(const struct llog_update_record *lur)
+{
+       return __llog_update_record_size(
+                       update_records_size(&lur->lur_update_rec));
+}
+
 static inline struct update_op *
 update_ops_get_op(const struct update_ops *ops, unsigned int index,
                  unsigned int update_count)
index 7928bae..3252de4 100644 (file)
@@ -168,19 +168,20 @@ static int sub_updates_write(const struct lu_env *env,
                             struct llog_update_record *record,
                             struct sub_thandle *sub_th)
 {
-       struct dt_device        *dt = sub_th->st_dt;
-       struct llog_ctxt        *ctxt;
-       int                     rc;
+       struct dt_device *dt = sub_th->st_dt;
+       struct llog_ctxt *ctxt;
        struct llog_update_record *lur = NULL;
-       struct update_params    *params = NULL;
-       __u32                   update_count = 0;
-       __u32                   param_count = 0;
-       __u32                   last_update_count = 0;
-       __u32                   last_param_count = 0;
-       void                    *src;
-       void                    *start;
-       void                    *next;
+       __u32 update_count = 0;
+       __u32 param_count = 0;
+       __u32 last_update_count = 0;
+       __u32 last_param_count = 0;
+       char *start;
+       char *cur;
+       char *next;
        struct sub_thandle_cookie *stc;
+       size_t reclen;
+       bool eof = false;
+       int rc;
        ENTRY;
 
        ctxt = llog_get_context(dt->dd_lu_dev.ld_obd,
@@ -233,87 +234,83 @@ static int sub_updates_write(const struct lu_env *env,
        memcpy(lur, &record->lur_hdr, sizeof(record->lur_hdr));
        lur->lur_update_rec.ur_update_count = 0;
        lur->lur_update_rec.ur_param_count = 0;
-       src = &record->lur_update_rec.ur_ops;
-       start = next = src;
-       lur->lur_hdr.lrh_len = llog_update_record_size(lur);
-       params = update_records_get_params(&record->lur_update_rec);
+       start = (char *)&record->lur_update_rec.ur_ops;
+       cur = next = start;
        do {
-               size_t rec_len;
-
-               if (update_count < record->lur_update_rec.ur_update_count) {
-                       next = update_op_next_op((struct update_op *)src);
-               } else {
-                       if (param_count == 0)
-                               next = update_records_get_params(
-                                               &record->lur_update_rec);
-                       else
-                               next = (char *)src +
-                                       object_update_param_size(
-                                       (struct object_update_param *)src);
+               if (update_count < record->lur_update_rec.ur_update_count)
+                       next = (char *)update_op_next_op(
+                                               (struct update_op *)cur);
+               else if (param_count < record->lur_update_rec.ur_param_count)
+                       next = (char *)update_param_next_param(
+                                               (struct update_param *)cur);
+               else
+                       eof = true;
+
+               /*
+                * If its size > llog chunk_size, then write current chunk to
+                * the update llog, NB the padding should >= LLOG_MIN_REC_SIZE.
+                *
+                * So check padding length is either >= LLOG_MIN_REC_SIZE or is
+                * 0 (record length just matches the chunk size).
+                */
+               reclen = __llog_update_record_size(
+                               __update_records_size(next - start));
+               if ((reclen + LLOG_MIN_REC_SIZE <= ctxt->loc_chunk_size ||
+                    reclen == ctxt->loc_chunk_size) &&
+                   !eof) {
+                       cur = next;
+
+                       if (update_count <
+                           record->lur_update_rec.ur_update_count)
+                               update_count++;
+                       else if (param_count <
+                                record->lur_update_rec.ur_param_count)
+                               param_count++;
+                       continue;
                }
 
-               rec_len = cfs_size_round((unsigned long)(next - src));
-               /* If its size > llog chunk_size, then write current chunk to
-                * the update llog. */
-               if (lur->lur_hdr.lrh_len + rec_len + LLOG_MIN_REC_SIZE >
-                   ctxt->loc_chunk_size ||
-                   param_count == record->lur_update_rec.ur_param_count) {
-                       lur->lur_update_rec.ur_update_count =
-                               update_count > last_update_count ?
-                               update_count - last_update_count : 0;
-                       lur->lur_update_rec.ur_param_count = param_count -
-                                                            last_param_count;
-
-                       memcpy(&lur->lur_update_rec.ur_ops, start,
-                              (unsigned long)(src - start));
-                       if (last_update_count != 0)
-                               lur->lur_update_rec.ur_flags |=
-                                               UPDATE_RECORD_CONTINUE;
-
-                       update_records_dump(&lur->lur_update_rec, D_INFO, true);
-                       lur->lur_hdr.lrh_len = llog_update_record_size(lur);
-                       LASSERT(lur->lur_hdr.lrh_len <= ctxt->loc_chunk_size);
-
-                       OBD_ALLOC_PTR(stc);
-                       if (stc == NULL)
-                               GOTO(llog_put, rc = -ENOMEM);
-                       INIT_LIST_HEAD(&stc->stc_list);
-
-                       rc = llog_add(env, ctxt->loc_handle,
-                                     &lur->lur_hdr,
-                                     &stc->stc_cookie, sub_th->st_sub_th);
-
-                       CDEBUG(D_INFO, "%s: Add update log "DOSTID":%u"
-                              " rc = %d\n", dt->dd_lu_dev.ld_obd->obd_name,
-                              POSTID(&stc->stc_cookie.lgc_lgl.lgl_oi),
-                              stc->stc_cookie.lgc_index, rc);
-
-                       if (rc > 0) {
-                               list_add(&stc->stc_list,
-                                        &sub_th->st_cookie_list);
-                               rc = 0;
-                       } else {
-                               OBD_FREE_PTR(stc);
-                               GOTO(llog_put, rc);
-                       }
+               lur->lur_update_rec.ur_update_count = update_count -
+                                                     last_update_count;
+               lur->lur_update_rec.ur_param_count = param_count -
+                                                    last_param_count;
+               memcpy(&lur->lur_update_rec.ur_ops, start, cur - start);
+               lur->lur_hdr.lrh_len = llog_update_record_size(lur);
 
-                       last_update_count = update_count;
-                       last_param_count = param_count;
-                       start = src;
-                       lur->lur_update_rec.ur_update_count = 0;
-                       lur->lur_update_rec.ur_param_count = 0;
-                       lur->lur_hdr.lrh_len = llog_update_record_size(lur);
+               LASSERT(lur->lur_hdr.lrh_len ==
+                        __llog_update_record_size(
+                               __update_records_size(cur - start)));
+               LASSERT(lur->lur_hdr.lrh_len <= ctxt->loc_chunk_size);
+
+               update_records_dump(&lur->lur_update_rec, D_INFO, true);
+
+               OBD_ALLOC_PTR(stc);
+               if (stc == NULL)
+                       GOTO(llog_put, rc = -ENOMEM);
+               INIT_LIST_HEAD(&stc->stc_list);
+
+               rc = llog_add(env, ctxt->loc_handle, &lur->lur_hdr,
+                             &stc->stc_cookie, sub_th->st_sub_th);
+
+               CDEBUG(D_INFO, "%s: Add update log "DOSTID":%u rc = %d\n",
+                       dt->dd_lu_dev.ld_obd->obd_name,
+                       POSTID(&stc->stc_cookie.lgc_lgl.lgl_oi),
+                       stc->stc_cookie.lgc_index, rc);
+
+               if (rc > 0) {
+                       list_add(&stc->stc_list, &sub_th->st_cookie_list);
+                       rc = 0;
+               } else {
+                       OBD_FREE_PTR(stc);
+                       GOTO(llog_put, rc);
                }
 
-               src = next;
-               lur->lur_hdr.lrh_len += cfs_size_round(rec_len);
-               if (update_count < record->lur_update_rec.ur_update_count)
-                       update_count++;
-               else if (param_count < record->lur_update_rec.ur_param_count)
-                       param_count++;
-               else
-                       break;
-       } while (1);
+               last_update_count = update_count;
+               last_param_count = param_count;
+               start = cur;
+               lur->lur_update_rec.ur_update_count = 0;
+               lur->lur_update_rec.ur_param_count = 0;
+               lur->lur_update_rec.ur_flags |= UPDATE_RECORD_CONTINUE;
+       } while (!eof);
 
 llog_put:
        if (lur != NULL)