Whamcloud - gitweb
LU-15280 llog: fix processing of a wrapped catalog 08/45708/24
authorEtienne AUJAMES <etienne.aujames@cea.fr>
Wed, 1 Dec 2021 23:22:33 +0000 (00:22 +0100)
committerOleg Drokin <green@whamcloud.com>
Fri, 17 Feb 2023 02:23:42 +0000 (02:23 +0000)
Several issues were found with "lfs changelog --follow" for a wrapped
catalog (llog_cat_process() with startidx):

1/ incorrect lpcd_first_idx value for a wrapped catalog (startcat>0)

The first llog index to process is "lpcd_first_idx + 1". The startidx
represents the last record index processed for a llog plain. The
catalog index of this llog is startcat.
lpcd_first_idx of a catalog should be set to "startcat - 1"
e.g:
llog_cat_process(... startcat=10, startidx=101) means that the
processing will start with the llog plain at the index 10 of the
catalog. And the first record to process will be at index 102.

2/ startidx is not reset for an incorrect startcat index

startidx is relevant only for a startcat. So if the corresponding llog
plain is removed or if startcat is out of range, we need to reset
startidx.

This patch remove LLOG_CAT_FIRST, that was really confusing
(LU-14158). And update osp_sync_thread() with the
llog_cat_process() corrected behavior.

It modifies also llog_cat_retain_cb() to zap empty plain llog directly
in it (like for llog_cat_size_cb()), the current implementation is not
compatible with this patch.

The test "conf-sanity 135" verify "lfs changelog --follow" for a
wrapped changelog_catalog.

Test-Parameters: testlist=conf-sanity env=ONLY=135,ONLY_REPEAT=10
Test-Parameters: testlist=sanity env=ONLY=60a,ONLY_REPEAT=20
Test-Parameters: testlist=conf-sanity env=SLOW=yes,ONLY=106,ONLY_REPEAT=10
Fixes: a4f049b9 ("LU-13102 llog: fix processing of a wrapped catalog")
Signed-off-by: Etienne AUJAMES <eaujames@ddn.com>
Change-Id: Iaf46ddd4a6ec1e06cec0d17aa9bde766bd793abc
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/45708
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Reviewed-by: Alexander Boyko <alexander.boyko@hpe.com>
Reviewed-by: Mikhail Pershin <mpershin@whamcloud.com>
lustre/include/lustre_log.h
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/lod/lod_dev.c
lustre/obdclass/llog.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_osd.c
lustre/osp/osp_sync.c
lustre/tests/conf-sanity.sh
lustre/tests/test-framework.sh

index 1ee2d24..52ad713 100644 (file)
@@ -526,9 +526,19 @@ static inline int llog_connect(struct llog_ctxt *ctxt,
        RETURN(rc);
 }
 
+
+static inline int llog_max_idx(struct llog_log_hdr *lh)
+{
+       if (OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) &&
+           unlikely(lh->llh_flags & LLOG_F_IS_CAT))
+               return cfs_fail_val;
+       else
+               return LLOG_HDR_BITMAP_SIZE(lh) - 1;
+}
+
 static inline int llog_is_full(struct llog_handle *llh)
 {
-       return llh->lgh_last_idx >= LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1;
+       return llh->lgh_last_idx >= llog_max_idx(llh->lgh_hdr);
 }
 
 /* Determine if a llog plain of a catalog could be skiped based on record
@@ -547,6 +557,13 @@ static inline int llog_is_plain_skipable(struct llog_log_hdr *lh,
        return (LLOG_HDR_BITMAP_SIZE(lh) - rec->lrh_index) < (start - curr);
 }
 
+static inline bool llog_cat_is_wrapped(struct llog_handle *cat)
+{
+       struct llog_log_hdr *llh = cat->lgh_hdr;
+
+       return llh->llh_cat_idx >= cat->lgh_last_idx && llh->llh_count > 1;
+}
+
 struct llog_cfg_rec {
        struct llog_rec_hdr     lcr_hdr;
        struct lustre_cfg       lcr_cfg;
index 4d8b326..c8a9b56 100644 (file)
@@ -2981,11 +2981,6 @@ enum llog_flag {
                          LLOG_F_EXT_X_OMODE | LLOG_F_EXT_X_XATTR,
 };
 
-/* means first record of catalog */
-enum {
-       LLOG_CAT_FIRST = -1,
-};
-
 /* On-disk header structure of each log object, stored in little endian order */
 #define LLOG_MIN_CHUNK_SIZE    8192
 #define LLOG_HEADER_SIZE        (96) /* sizeof (llog_log_hdr) + sizeof(llh_tail)
index 522cf18..f6756f0 100644 (file)
@@ -421,9 +421,6 @@ static int lod_sub_cancel_llog(const struct lu_env *env,
                LCONSOLE(D_INFO, "%s: cancel update llog "DFID"\n",
                         dt->dd_lu_dev.ld_obd->obd_name,
                         PLOGID(&ctxt->loc_handle->lgh_id));
-               /* set startcat to "lgh_last_idx + 1" to zap empty llogs */
-               llog_cat_process(env, ctxt->loc_handle, NULL, NULL,
-                                ctxt->loc_handle->lgh_last_idx + 1, 0);
                /* set retention on logs to simplify reclamation */
                llog_process_or_fork(env, ctxt->loc_handle, llog_cat_retain_cb,
                                     NULL, NULL, false);
index adac984..c464b96 100644 (file)
@@ -293,7 +293,7 @@ int llog_cancel_arr_rec(const struct lu_env *env, struct llog_handle *loghandle,
 
        if ((llh->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
            (llh->llh_count == 1) &&
-           ((loghandle->lgh_last_idx == LLOG_HDR_BITMAP_SIZE(llh) - 1) ||
+           ((loghandle->lgh_last_idx == llog_max_idx(llh)) ||
             (loghandle->u.phd.phd_cat_handle != NULL &&
              loghandle->u.phd.phd_cat_handle->u.chd.chd_current_log !=
                loghandle))) {
@@ -479,7 +479,7 @@ int llog_verify_record(const struct llog_handle *llh, struct llog_rec_hdr *rec)
        else if (rec->lrh_len == 0 || rec->lrh_len > chunk_size)
                LLOG_ERROR_REC(llh, rec, "bad record len, chunk size is %d",
                               chunk_size);
-       else if (rec->lrh_index >= LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr))
+       else if (rec->lrh_index > llog_max_idx(llh->lgh_hdr))
                LLOG_ERROR_REC(llh, rec, "index is too high");
        else
                return 0;
@@ -529,16 +529,20 @@ static int llog_process_thread(void *arg)
                RETURN(0);
        }
 
-       if (cd != NULL) {
-               last_called_index = cd->lpcd_first_idx;
+       last_index = llog_max_idx(llh);
+       if (cd) {
+               if (cd->lpcd_first_idx >= llog_max_idx(llh))
+                       /* End of the indexes -> Nothing to do */
+                       GOTO(out, rc = 0);
+
                index = cd->lpcd_first_idx + 1;
+               last_called_index = cd->lpcd_first_idx;
+               if (cd->lpcd_last_idx > 0 &&
+                   cd->lpcd_last_idx <= llog_max_idx(llh))
+                       last_index = cd->lpcd_last_idx;
+               else if (cd->lpcd_read_mode & LLOG_READ_MODE_RAW)
+                       last_index = loghandle->lgh_last_idx;
        }
-       if (cd && cd->lpcd_last_idx)
-               last_index = cd->lpcd_last_idx;
-       else if (cd && (cd->lpcd_read_mode & LLOG_READ_MODE_RAW))
-               last_index = loghandle->lgh_last_idx;
-       else
-               last_index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
 
        while (rc == 0) {
                struct llog_rec_hdr *rec;
@@ -974,7 +978,7 @@ int llog_reverse_process(const struct lu_env *env,
        if (cd != NULL && cd->lpcd_last_idx)
                index = cd->lpcd_last_idx;
        else
-               index = LLOG_HDR_BITMAP_SIZE(llh) - 1;
+               index = llog_max_idx(llh);
 
        while (rc == 0) {
                struct llog_rec_hdr *rec;
index a0078d2..164055e 100644 (file)
@@ -76,9 +76,7 @@ static int llog_cat_new_log(const struct lu_env *env,
 
        ENTRY;
 
-       index = (cathandle->lgh_last_idx + 1) %
-               (OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) ? (cfs_fail_val + 1) :
-                                               LLOG_HDR_BITMAP_SIZE(llh));
+       index = (cathandle->lgh_last_idx + 1) % (llog_max_idx(llh) + 1);
 
        /* check that new llog index will not overlap with the first one.
         * - llh_cat_idx is the index just before the first/oldest still in-use
@@ -224,7 +222,7 @@ out_destroy:
        loghandle->lgh_hdr->llh_flags &= ~LLOG_F_ZAP_WHEN_EMPTY;
        /* this is to mimic full log, so another llog_cat_current_log()
         * can skip it and ask for another onet */
-       loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(loghandle->lgh_hdr) + 1;
+       loghandle->lgh_last_idx = llog_max_idx(loghandle->lgh_hdr) + 1;
        llog_trans_destroy(env, loghandle, th);
        if (handle != NULL)
                dt_trans_stop(env, dt, handle);
@@ -840,19 +838,26 @@ static int llog_cat_process_cb(const struct lu_env *env,
        int rc;
 
        ENTRY;
+
+       /* Skip processing of the logs until startcat */
+       if (rec->lrh_index < d->lpd_startcat)
+               RETURN(0);
+
        rc = llog_cat_process_common(env, cat_llh, rec, &llh);
        if (rc)
                GOTO(out, rc);
 
-       if (rec->lrh_index < d->lpd_startcat) {
-               /* Skip processing of the logs until startcat */
-               rc = 0;
-       } else if (d->lpd_startidx > 0) {
-                struct llog_process_cat_data cd;
+       if (d->lpd_startidx > 0) {
+               struct llog_process_cat_data cd = {
+                       .lpcd_first_idx = 0,
+                       .lpcd_last_idx = 0,
+                       .lpcd_read_mode = LLOG_READ_MODE_NORMAL,
+               };
+
+               /* startidx is always associated with a catalog index */
+               if (d->lpd_startcat == rec->lrh_index)
+                       cd.lpcd_first_idx = d->lpd_startidx;
 
-                cd.lpcd_read_mode = LLOG_READ_MODE_NORMAL;
-                cd.lpcd_first_idx = d->lpd_startidx;
-                cd.lpcd_last_idx = 0;
                rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
                                          &cd, false);
                /* Continue processing the next log from idx 0 */
@@ -891,8 +896,9 @@ int llog_cat_process_or_fork(const struct lu_env *env,
                             llog_cb_t cb, void *data, int startcat,
                             int startidx, bool fork)
 {
-       struct llog_process_data d;
        struct llog_log_hdr *llh = cat_llh->lgh_hdr;
+       struct llog_process_data d;
+       struct llog_process_cat_data cd;
        int rc;
 
        ENTRY;
@@ -900,56 +906,86 @@ int llog_cat_process_or_fork(const struct lu_env *env,
        LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
        d.lpd_data = data;
        d.lpd_cb = cb;
-       d.lpd_startcat = (startcat == LLOG_CAT_FIRST ? 0 : startcat);
-       d.lpd_startidx = startidx;
 
-       if (llh->llh_cat_idx >= cat_llh->lgh_last_idx &&
-           llh->llh_count > 1) {
-               struct llog_process_cat_data cd = {
-                       .lpcd_read_mode = LLOG_READ_MODE_NORMAL
-               };
+       /* default: start from the oldest record */
+       d.lpd_startidx = 0;
+       d.lpd_startcat = llh->llh_cat_idx + 1;
+       cd.lpcd_first_idx = llh->llh_cat_idx;
+       cd.lpcd_last_idx = 0;
+       cd.lpcd_read_mode = LLOG_READ_MODE_NORMAL;
+
+       if (startcat > 0 && startcat <= llog_max_idx(llh)) {
+               /* start from a custom catalog/llog plain indexes*/
+               d.lpd_startidx = startidx;
+               d.lpd_startcat = startcat;
+               cd.lpcd_first_idx = startcat - 1;
+       } else if (startcat != 0) {
+               CWARN("%s: startcat %d out of range for catlog "DFID"\n",
+                     loghandle2name(cat_llh), startcat,
+                     PLOGID(&cat_llh->lgh_id));
+               RETURN(-EINVAL);
+       }
+
+       startcat = d.lpd_startcat;
+
+       /* if startcat <= lgh_last_idx, we only need to process the first part
+        * of the catalog (from startcat).
+        */
+       if (llog_cat_is_wrapped(cat_llh) && startcat > cat_llh->lgh_last_idx) {
+               int cat_idx_origin = llh->llh_cat_idx;
 
                CWARN("%s: catlog "DFID" crosses index zero\n",
-                     loghandle2name(cat_llh), PLOGID(&cat_llh->lgh_id));
-               /*startcat = 0 is default value for general processing */
-               if ((startcat != LLOG_CAT_FIRST &&
-                   startcat >= llh->llh_cat_idx) || !startcat) {
-                       /* processing the catalog part at the end */
-                       cd.lpcd_first_idx = (startcat ? startcat :
-                                            llh->llh_cat_idx);
-                       if (OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS))
-                               cd.lpcd_last_idx = cfs_fail_val;
-                       else
-                               cd.lpcd_last_idx = 0;
-                       rc = llog_process_or_fork(env, cat_llh, cat_cb,
-                                                 &d, &cd, fork);
-                       /* Reset the startcat becasue it has already reached
-                        * catalog bottom.
-                        */
-                       startcat = 0;
-                       d.lpd_startcat = 0;
-                       if (rc != 0)
-                               RETURN(rc);
-               }
-               /* processing the catalog part at the begining */
-               cd.lpcd_first_idx = (startcat == LLOG_CAT_FIRST) ? 0 : startcat;
-               /* Note, the processing will stop at the lgh_last_idx value,
-                * and it could be increased during processing. So records
-                * between current lgh_last_idx and lgh_last_idx in future
-                * would left unprocessed.
+                     loghandle2name(cat_llh),
+                     PLOGID(&cat_llh->lgh_id));
+
+               /* processing the catalog part at the end */
+               rc = llog_process_or_fork(env, cat_llh, cat_cb, &d, &cd, fork);
+               if (rc)
+                       RETURN(rc);
+
+               /* Reset the startcat because it has already reached catalog
+                * bottom.
+                * lgh_last_idx value could be increased during processing. So
+                * we process the remaining of catalog entries to be sure.
                 */
-               cd.lpcd_last_idx = cat_llh->lgh_last_idx;
-               rc = llog_process_or_fork(env, cat_llh, cat_cb,
-                                         &d, &cd, fork);
-       } else {
-               rc = llog_process_or_fork(env, cat_llh, cat_cb,
-                                         &d, NULL, fork);
+               d.lpd_startcat = 1;
+               d.lpd_startidx = 0;
+               cd.lpcd_first_idx = 0;
+               cd.lpcd_last_idx = max(cat_idx_origin, cat_llh->lgh_last_idx);
+       } else if (llog_cat_is_wrapped(cat_llh)) {
+               /* only process 1st part -> stop before reaching 2sd part */
+               cd.lpcd_last_idx = llh->llh_cat_idx;
        }
 
+       /* processing the catalog part at the begining */
+       rc = llog_process_or_fork(env, cat_llh, cat_cb, &d, &cd, fork);
+
        RETURN(rc);
 }
 EXPORT_SYMBOL(llog_cat_process_or_fork);
 
+/**
+ * Process catalog records with a callback
+ *
+ * \note
+ * If "starcat = 0", this is the default processing. "startidx" argument is
+ * ignored and processing begin from the oldest record.
+ * If "startcat > 0", this is a custom starting point. Processing begin with
+ * the llog plain defined in the catalog record at index "startcat". The first
+ * llog plain record to process is at index "startidx + 1".
+ *
+ * \param env          Lustre environnement
+ * \param cat_llh      Catalog llog handler
+ * \param cb           Callback executed for each records (in llog plain files)
+ * \param data         Callback data argument
+ * \param startcat     Catalog index of the llog plain to start with.
+ * \param startidx     Index of the llog plain to start processing. The first
+ *                     record to process is at startidx + 1.
+ *
+ * \retval 0 processing successfully completed
+ * \retval LLOG_PROC_BREAK processing was stopped by the callback.
+ * \retval -errno on error.
+ */
 int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh,
                     llog_cb_t cb, void *data, int startcat, int startidx)
 {
@@ -1014,10 +1050,10 @@ __u32 llog_cat_free_space(struct llog_handle *cat_llh)
                return cfs_fail_val;
 
        if (cat_llh->lgh_hdr->llh_count == 1)
-               return LLOG_HDR_BITMAP_SIZE(cat_llh->lgh_hdr) - 1;
+               return llog_max_idx(cat_llh->lgh_hdr);
 
        if (cat_llh->lgh_last_idx > cat_llh->lgh_hdr->llh_cat_idx)
-               return LLOG_HDR_BITMAP_SIZE(cat_llh->lgh_hdr) - 1 +
+               return llog_max_idx(cat_llh->lgh_hdr) +
                       cat_llh->lgh_hdr->llh_cat_idx - cat_llh->lgh_last_idx;
 
        /* catalog is presently wrapped */
@@ -1108,11 +1144,11 @@ EXPORT_SYMBOL(llog_cat_reverse_process);
 static int llog_cat_set_first_idx(struct llog_handle *cathandle, int idx)
 {
        struct llog_log_hdr *llh = cathandle->lgh_hdr;
-       int bitmap_size;
+       int idx_nbr;
 
        ENTRY;
 
-       bitmap_size = LLOG_HDR_BITMAP_SIZE(llh);
+       idx_nbr = llog_max_idx(llh) + 1;
        /*
         * The llh_cat_idx equals to the first used index minus 1
         * so if we canceled the first index then llh_cat_idx
@@ -1122,7 +1158,7 @@ static int llog_cat_set_first_idx(struct llog_handle *cathandle, int idx)
                llh->llh_cat_idx = idx;
 
                while (idx != cathandle->lgh_last_idx) {
-                       idx = (idx + 1) % bitmap_size;
+                       idx = (idx + 1) % idx_nbr;
                        if (!test_bit_le(idx, LLOG_HDR_BITMAP(llh))) {
                                /* update llh_cat_idx for each unset bit,
                                 * expecting the next one is set */
@@ -1185,22 +1221,20 @@ int llog_cat_cleanup(const struct lu_env *env, struct llog_handle *cathandle,
 int llog_cat_retain_cb(const struct lu_env *env, struct llog_handle *cat,
                       struct llog_rec_hdr *rec, void *data)
 {
-       struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
-       struct llog_handle *log;
+       struct llog_handle *log = NULL;
        int rc;
 
-       if (rec->lrh_type != LLOG_LOGID_MAGIC)
-               return -EINVAL;
+       rc = llog_cat_process_common(env, cat, rec, &log);
 
-       rc = llog_cat_id2handle(env, cat, &log, &lir->lid_id);
-       if (rc) {
-               CDEBUG(D_IOCTL, "cannot find log "DFID"\n",
-                      PLOGID(&lir->lid_id));
-               return -ENOENT;
-       }
+       /* The empty plain log was destroyed while processing */
+       if (rc == LLOG_DEL_PLAIN || rc == LLOG_DEL_RECORD)
+               /* clear wrong catalog entry */
+               rc = llog_cat_cleanup(env, cat, log, rec->lrh_index);
+       else if (!rc)
+               llog_retain(env, log);
 
-       llog_retain(env, log);
-       llog_handle_put(env, log);
+       if (log)
+               llog_handle_put(env, log);
 
        return rc;
 }
index a246731..90aa6e9 100644 (file)
@@ -571,7 +571,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
                       loghandle->lgh_max_size, (int)loghandle->lgh_last_idx,
                       PLOGID(&loghandle->lgh_id));
                /* this is to signal that this llog is full */
-               loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1;
+               loghandle->lgh_last_idx = llog_max_idx(llh);
                RETURN(-ENOSPC);
        }
 
@@ -589,10 +589,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
        }
        /* if it's the last idx in log file, then return -ENOSPC
         * or wrap around if a catalog */
-       if (llog_is_full(loghandle) ||
-           unlikely(llh->llh_flags & LLOG_F_IS_CAT &&
-                    OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) &&
-                    loghandle->lgh_last_idx >= cfs_fail_val)) {
+       if (llog_is_full(loghandle)) {
                if (llh->llh_flags & LLOG_F_IS_CAT)
                        loghandle->lgh_last_idx = 0;
                else
@@ -762,7 +759,7 @@ out:
        } else if (--loghandle->lgh_last_idx == 0 &&
            (llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) {
                /* catalog had just wrap-around case */
-               loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1;
+               loghandle->lgh_last_idx = llog_max_idx(llh);
        }
 
        LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx;
index 5eee526..759ff2f 100644 (file)
@@ -1245,9 +1245,7 @@ again:
        do {
                int     size;
 
-               wrapped = (llh->lgh_hdr->llh_cat_idx >= llh->lgh_last_idx &&
-                          llh->lgh_hdr->llh_count > 1);
-
+               wrapped = llog_cat_is_wrapped(llh);
                if (OBD_FAIL_CHECK(OBD_FAIL_OSP_CANT_PROCESS_LLOG)) {
                        rc = -EINPROGRESS;
                        goto next;
@@ -1256,18 +1254,20 @@ again:
                                      d->opd_sync_last_catalog_idx, 0);
 
 next:
-               size = OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) ?
-                      cfs_fail_val : (LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1);
+               size = llog_max_idx(llh->lgh_hdr);
+
                /* processing reaches catalog bottom */
                if (d->opd_sync_last_catalog_idx == size)
-                       d->opd_sync_last_catalog_idx = LLOG_CAT_FIRST;
+                       d->opd_sync_last_catalog_idx = 1;
+               else if (wrapped)
+                       d->opd_sync_last_catalog_idx++;
                /* If catalog is wrapped we can`t predict last index of
                 * processing because lgh_last_idx could be changed.
                 * Starting form the next one. Index would be increased
                 * at llog_process_thread
                 */
        } while (rc == 0 && (wrapped ||
-                            d->opd_sync_last_catalog_idx == LLOG_CAT_FIRST));
+                            d->opd_sync_last_catalog_idx == 1));
 
        if (rc < 0) {
                if (rc == -EINPROGRESS) {
index ffa9929..c3e823b 100644 (file)
@@ -10461,6 +10461,116 @@ test_134() {
 }
 run_test 134 "check_iam works without faults"
 
+cleanup_test_135(){
+       local oldgc=$1
+
+       printf "\nCleanup test_135\n" >&2
+       do_facet mds1 "$LCTL set_param -n $oldgc"
+       rm -rf $DIR/$tdir &> /dev/null
+       cleanup
+}
+
+__test_135_file_thread() {
+       local service="$1"
+       local init_time=$(awk '{print $1}' /proc/uptime)
+       local awkcmd="/crosses index zero/ {if (\$1 > $init_time) exit(1);}"
+
+       #Generate a full plain llogs
+       while dmesg | sed -r 's/(\[|\])//g' | awk "$awkcmd" ; do
+               createmany -o $DIR/$tdir/f 4500 >&2
+               createmany -u $DIR/$tdir/f 4500 >&2
+       done
+}
+
+__test_135_reader() {
+       local fd=$1
+       local cl_user=$2
+       local firstidx=$(changelog_user_rec mds1 $cl_user)
+       local oldidx=$firstidx
+       local newidx=0
+       local pid=0
+       local other
+
+       while read -t10 -u$fd newidx other; do
+               (( (newidx - oldidx) == 1 )) ||
+                       error  "changelog jump detected (last: $oldidx, current: $newidx)"
+
+               if (( (newidx - firstidx + 1) % 13000 == 0 )); then
+                       [[ $pid -eq 0 ]] ||
+                               wait $pid || error "changelog_clear failed"
+                       changelog_clear $((newidx - 1)) mds1 >&2 & pid=$!
+               fi
+               oldidx=$newidx
+       done
+
+       [[ $pid -eq 0 ]] ||
+               wait $pid || error "changelog_clear failed"
+
+       echo "$oldidx"
+}
+
+test_135() {
+       (( MDS1_VERSION >= $(version_code 2.15.52) )) ||
+               skip "need MDS version at least 2.15.52"
+
+       local service=$(facet_svc mds1)
+       local rc=0
+       local lastread lastidx
+       local files_pid reader_pid
+       local fd
+       local init_time
+       local cl_user
+
+       # Need to reformat because we are changing llog catalog sizes to 5.
+       # Otherwise, processing could fail with existing catalogs (last_idx>5).
+       reformat
+       setup_noconfig
+
+       # Disable changelog garbage colector
+       local oldgc=$(do_facet mds1 "$LCTL get_param mdd.${service}.changelog_gc")
+       do_facet mds1 "$LCTL set_param -n mdd.${service}.changelog_gc=0"
+       stack_trap "cleanup_test_135 $oldgc" EXIT INT
+
+       # change the changelog_catalog size to 5 entries for everybody
+#define OBD_FAIL_CAT_RECORDS                        0x1312
+       do_node $(comma_list $(all_nodes)) $LCTL set_param fail_loc=0x1312 fail_val=5
+
+       # disable console ratelimit
+       local rl=$(cat /sys/module/libcfs/parameters/libcfs_console_ratelimit)
+       echo 0 > /sys/module/libcfs/parameters/libcfs_console_ratelimit
+       stack_trap "echo $rl > /sys/module/libcfs/parameters/libcfs_console_ratelimit" EXIT
+
+       test_mkdir -c 1 -i 0 $DIR/$tdir || error "Failed to create directory"
+       changelog_chmask "ALL" || error "changelog_chmask failed"
+       changelog_register || error "changelog_register failed"
+
+       cl_user="${CL_USERS[mds1]%% *}"
+       changelog_users mds1 | grep -q $cl_user ||
+               error "User $cl_user not found in changelog_users"
+
+       # Start reader thread
+       coproc $LFS changelog --follow $service
+       reader_pid=$!
+       fd=${COPROC[0]}
+       stack_trap "echo kill changelog reader; kill $reader_pid" EXIT
+
+       echo -e "\nWrap arround changelog catalog"
+
+       # Start file writer thread
+       __test_135_file_thread "$service" & files_pid=$!
+       stack_trap "(pkill -P$files_pid; kill $files_pid) &> /dev/null || true" EXIT
+
+       # Check changelog entries
+       lastread=$(__test_135_reader $fd $cl_user) || exit $?
+       ! kill -0 $files_pid 2>/dev/null ||
+               error "creation thread is running. Is changelog reader stuck?"
+
+       lastidx=$(changelog_users mds1 | awk '/current_index/ {print $NF}' )
+       [[ "$lastread" -eq "$lastidx" ]] ||
+               error "invalid changelog lastidx (read: $lastread, mds: $lastidx)"
+}
+run_test 135 "check the behavior when changelog is wrapped around"
+
 #
 # (This was sanity/802a)
 #
index 39b9c7c..f25baf8 100755 (executable)
@@ -9973,22 +9973,26 @@ __changelog_clear()
        $LFS changelog_clear $mdt $cl_user $rec
 }
 
-# usage: changelog_clear [+]INDEX
+# usage: changelog_clear [+]INDEX [facet]...
 #
 # If INDEX is prefixed with '+', increment every changelog user's record index
 # by INDEX. Otherwise, clear the changelog up to INDEX for every changelog
 # users.
 changelog_clear() {
        local rc
+       local idx=$1
+       shift
+       local cl_facets="$@"
        # bash assoc arrays do not guarantee to list keys in created order
        # so reorder to get same order than in changelog_register()
-       local cl_facets=$(echo "${!CL_USERS[@]}" | tr " " "\n" | sort |
-                         tr "\n" " ")
+       [[ -n "$cl_facets" ]] ||
+               cl_facets=$(echo "${!CL_USERS[@]}" | tr " " "\n" | sort |
+                       tr "\n" " ")
        local cl_user
 
        for facet in $cl_facets; do
                for cl_user in ${CL_USERS[$facet]}; do
-                       __changelog_clear $facet $cl_user $1 || rc=${rc:-$?}
+                       __changelog_clear $facet $cl_user $idx || rc=${rc:-$?}
                done
        done