ENTRY;
- index = (cathandle->lgh_last_idx + 1) %
- (OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) ? (cfs_fail_val + 1) :
- LLOG_HDR_BITMAP_SIZE(llh));
+ index = (cathandle->lgh_last_idx + 1) % (llog_max_idx(llh) + 1);
/* check that new llog index will not overlap with the first one.
* - llh_cat_idx is the index just before the first/oldest still in-use
loghandle->lgh_hdr->llh_flags &= ~LLOG_F_ZAP_WHEN_EMPTY;
/* this is to mimic full log, so another llog_cat_current_log()
* can skip it and ask for another onet */
- loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(loghandle->lgh_hdr) + 1;
+ loghandle->lgh_last_idx = llog_max_idx(loghandle->lgh_hdr) + 1;
llog_trans_destroy(env, loghandle, th);
if (handle != NULL)
dt_trans_stop(env, dt, handle);
int rc;
ENTRY;
+
+ /* Skip processing of the logs until startcat */
+ if (rec->lrh_index < d->lpd_startcat)
+ RETURN(0);
+
rc = llog_cat_process_common(env, cat_llh, rec, &llh);
if (rc)
GOTO(out, rc);
- if (rec->lrh_index < d->lpd_startcat) {
- /* Skip processing of the logs until startcat */
- rc = 0;
- } else if (d->lpd_startidx > 0) {
- struct llog_process_cat_data cd;
+ if (d->lpd_startidx > 0) {
+ struct llog_process_cat_data cd = {
+ .lpcd_first_idx = 0,
+ .lpcd_last_idx = 0,
+ .lpcd_read_mode = LLOG_READ_MODE_NORMAL,
+ };
+
+ /* startidx is always associated with a catalog index */
+ if (d->lpd_startcat == rec->lrh_index)
+ cd.lpcd_first_idx = d->lpd_startidx;
- cd.lpcd_read_mode = LLOG_READ_MODE_NORMAL;
- cd.lpcd_first_idx = d->lpd_startidx;
- cd.lpcd_last_idx = 0;
rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
&cd, false);
/* Continue processing the next log from idx 0 */
llog_cb_t cb, void *data, int startcat,
int startidx, bool fork)
{
- struct llog_process_data d;
struct llog_log_hdr *llh = cat_llh->lgh_hdr;
+ struct llog_process_data d;
+ struct llog_process_cat_data cd;
int rc;
ENTRY;
LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
d.lpd_data = data;
d.lpd_cb = cb;
- d.lpd_startcat = (startcat == LLOG_CAT_FIRST ? 0 : startcat);
- d.lpd_startidx = startidx;
- if (llh->llh_cat_idx >= cat_llh->lgh_last_idx &&
- llh->llh_count > 1) {
- struct llog_process_cat_data cd = {
- .lpcd_read_mode = LLOG_READ_MODE_NORMAL
- };
+ /* default: start from the oldest record */
+ d.lpd_startidx = 0;
+ d.lpd_startcat = llh->llh_cat_idx + 1;
+ cd.lpcd_first_idx = llh->llh_cat_idx;
+ cd.lpcd_last_idx = 0;
+ cd.lpcd_read_mode = LLOG_READ_MODE_NORMAL;
+
+ if (startcat > 0 && startcat <= llog_max_idx(llh)) {
+ /* start from a custom catalog/llog plain indexes*/
+ d.lpd_startidx = startidx;
+ d.lpd_startcat = startcat;
+ cd.lpcd_first_idx = startcat - 1;
+ } else if (startcat != 0) {
+ CWARN("%s: startcat %d out of range for catlog "DFID"\n",
+ loghandle2name(cat_llh), startcat,
+ PLOGID(&cat_llh->lgh_id));
+ RETURN(-EINVAL);
+ }
+
+ startcat = d.lpd_startcat;
+
+ /* if startcat <= lgh_last_idx, we only need to process the first part
+ * of the catalog (from startcat).
+ */
+ if (llog_cat_is_wrapped(cat_llh) && startcat > cat_llh->lgh_last_idx) {
+ int cat_idx_origin = llh->llh_cat_idx;
CWARN("%s: catlog "DFID" crosses index zero\n",
- loghandle2name(cat_llh), PLOGID(&cat_llh->lgh_id));
- /*startcat = 0 is default value for general processing */
- if ((startcat != LLOG_CAT_FIRST &&
- startcat >= llh->llh_cat_idx) || !startcat) {
- /* processing the catalog part at the end */
- cd.lpcd_first_idx = (startcat ? startcat :
- llh->llh_cat_idx);
- if (OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS))
- cd.lpcd_last_idx = cfs_fail_val;
- else
- cd.lpcd_last_idx = 0;
- rc = llog_process_or_fork(env, cat_llh, cat_cb,
- &d, &cd, fork);
- /* Reset the startcat becasue it has already reached
- * catalog bottom.
- */
- startcat = 0;
- d.lpd_startcat = 0;
- if (rc != 0)
- RETURN(rc);
- }
- /* processing the catalog part at the begining */
- cd.lpcd_first_idx = (startcat == LLOG_CAT_FIRST) ? 0 : startcat;
- /* Note, the processing will stop at the lgh_last_idx value,
- * and it could be increased during processing. So records
- * between current lgh_last_idx and lgh_last_idx in future
- * would left unprocessed.
+ loghandle2name(cat_llh),
+ PLOGID(&cat_llh->lgh_id));
+
+ /* processing the catalog part at the end */
+ rc = llog_process_or_fork(env, cat_llh, cat_cb, &d, &cd, fork);
+ if (rc)
+ RETURN(rc);
+
+ /* Reset the startcat because it has already reached catalog
+ * bottom.
+ * lgh_last_idx value could be increased during processing. So
+ * we process the remaining of catalog entries to be sure.
*/
- cd.lpcd_last_idx = cat_llh->lgh_last_idx;
- rc = llog_process_or_fork(env, cat_llh, cat_cb,
- &d, &cd, fork);
- } else {
- rc = llog_process_or_fork(env, cat_llh, cat_cb,
- &d, NULL, fork);
+ d.lpd_startcat = 1;
+ d.lpd_startidx = 0;
+ cd.lpcd_first_idx = 0;
+ cd.lpcd_last_idx = max(cat_idx_origin, cat_llh->lgh_last_idx);
+ } else if (llog_cat_is_wrapped(cat_llh)) {
+ /* only process 1st part -> stop before reaching 2sd part */
+ cd.lpcd_last_idx = llh->llh_cat_idx;
}
+ /* processing the catalog part at the begining */
+ rc = llog_process_or_fork(env, cat_llh, cat_cb, &d, &cd, fork);
+
RETURN(rc);
}
EXPORT_SYMBOL(llog_cat_process_or_fork);
+/**
+ * Process catalog records with a callback
+ *
+ * \note
+ * If "starcat = 0", this is the default processing. "startidx" argument is
+ * ignored and processing begin from the oldest record.
+ * If "startcat > 0", this is a custom starting point. Processing begin with
+ * the llog plain defined in the catalog record at index "startcat". The first
+ * llog plain record to process is at index "startidx + 1".
+ *
+ * \param env Lustre environnement
+ * \param cat_llh Catalog llog handler
+ * \param cb Callback executed for each records (in llog plain files)
+ * \param data Callback data argument
+ * \param startcat Catalog index of the llog plain to start with.
+ * \param startidx Index of the llog plain to start processing. The first
+ * record to process is at startidx + 1.
+ *
+ * \retval 0 processing successfully completed
+ * \retval LLOG_PROC_BREAK processing was stopped by the callback.
+ * \retval -errno on error.
+ */
int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh,
llog_cb_t cb, void *data, int startcat, int startidx)
{
return cfs_fail_val;
if (cat_llh->lgh_hdr->llh_count == 1)
- return LLOG_HDR_BITMAP_SIZE(cat_llh->lgh_hdr) - 1;
+ return llog_max_idx(cat_llh->lgh_hdr);
if (cat_llh->lgh_last_idx > cat_llh->lgh_hdr->llh_cat_idx)
- return LLOG_HDR_BITMAP_SIZE(cat_llh->lgh_hdr) - 1 +
+ return llog_max_idx(cat_llh->lgh_hdr) +
cat_llh->lgh_hdr->llh_cat_idx - cat_llh->lgh_last_idx;
/* catalog is presently wrapped */
static int llog_cat_set_first_idx(struct llog_handle *cathandle, int idx)
{
struct llog_log_hdr *llh = cathandle->lgh_hdr;
- int bitmap_size;
+ int idx_nbr;
ENTRY;
- bitmap_size = LLOG_HDR_BITMAP_SIZE(llh);
+ idx_nbr = llog_max_idx(llh) + 1;
/*
* The llh_cat_idx equals to the first used index minus 1
* so if we canceled the first index then llh_cat_idx
llh->llh_cat_idx = idx;
while (idx != cathandle->lgh_last_idx) {
- idx = (idx + 1) % bitmap_size;
+ idx = (idx + 1) % idx_nbr;
if (!test_bit_le(idx, LLOG_HDR_BITMAP(llh))) {
/* update llh_cat_idx for each unset bit,
* expecting the next one is set */
int llog_cat_retain_cb(const struct lu_env *env, struct llog_handle *cat,
struct llog_rec_hdr *rec, void *data)
{
- struct llog_logid_rec *lir = (struct llog_logid_rec *)rec;
- struct llog_handle *log;
+ struct llog_handle *log = NULL;
int rc;
- if (rec->lrh_type != LLOG_LOGID_MAGIC)
- return -EINVAL;
+ rc = llog_cat_process_common(env, cat, rec, &log);
- rc = llog_cat_id2handle(env, cat, &log, &lir->lid_id);
- if (rc) {
- CDEBUG(D_IOCTL, "cannot find log "DFID"\n",
- PLOGID(&lir->lid_id));
- return -ENOENT;
- }
+ /* The empty plain log was destroyed while processing */
+ if (rc == LLOG_DEL_PLAIN || rc == LLOG_DEL_RECORD)
+ /* clear wrong catalog entry */
+ rc = llog_cat_cleanup(env, cat, log, rec->lrh_index);
+ else if (!rc)
+ llog_retain(env, log);
- llog_retain(env, log);
- llog_handle_put(env, log);
+ if (log)
+ llog_handle_put(env, log);
return rc;
}
}
run_test 134 "check_iam works without faults"
+cleanup_test_135(){
+ local oldgc=$1
+
+ printf "\nCleanup test_135\n" >&2
+ do_facet mds1 "$LCTL set_param -n $oldgc"
+ rm -rf $DIR/$tdir &> /dev/null
+ cleanup
+}
+
+__test_135_file_thread() {
+ local service="$1"
+ local init_time=$(awk '{print $1}' /proc/uptime)
+ local awkcmd="/crosses index zero/ {if (\$1 > $init_time) exit(1);}"
+
+ #Generate a full plain llogs
+ while dmesg | sed -r 's/(\[|\])//g' | awk "$awkcmd" ; do
+ createmany -o $DIR/$tdir/f 4500 >&2
+ createmany -u $DIR/$tdir/f 4500 >&2
+ done
+}
+
+__test_135_reader() {
+ local fd=$1
+ local cl_user=$2
+ local firstidx=$(changelog_user_rec mds1 $cl_user)
+ local oldidx=$firstidx
+ local newidx=0
+ local pid=0
+ local other
+
+ while read -t10 -u$fd newidx other; do
+ (( (newidx - oldidx) == 1 )) ||
+ error "changelog jump detected (last: $oldidx, current: $newidx)"
+
+ if (( (newidx - firstidx + 1) % 13000 == 0 )); then
+ [[ $pid -eq 0 ]] ||
+ wait $pid || error "changelog_clear failed"
+ changelog_clear $((newidx - 1)) mds1 >&2 & pid=$!
+ fi
+ oldidx=$newidx
+ done
+
+ [[ $pid -eq 0 ]] ||
+ wait $pid || error "changelog_clear failed"
+
+ echo "$oldidx"
+}
+
+test_135() {
+ (( MDS1_VERSION >= $(version_code 2.15.52) )) ||
+ skip "need MDS version at least 2.15.52"
+
+ local service=$(facet_svc mds1)
+ local rc=0
+ local lastread lastidx
+ local files_pid reader_pid
+ local fd
+ local init_time
+ local cl_user
+
+ # Need to reformat because we are changing llog catalog sizes to 5.
+ # Otherwise, processing could fail with existing catalogs (last_idx>5).
+ reformat
+ setup_noconfig
+
+ # Disable changelog garbage colector
+ local oldgc=$(do_facet mds1 "$LCTL get_param mdd.${service}.changelog_gc")
+ do_facet mds1 "$LCTL set_param -n mdd.${service}.changelog_gc=0"
+ stack_trap "cleanup_test_135 $oldgc" EXIT INT
+
+ # change the changelog_catalog size to 5 entries for everybody
+#define OBD_FAIL_CAT_RECORDS 0x1312
+ do_node $(comma_list $(all_nodes)) $LCTL set_param fail_loc=0x1312 fail_val=5
+
+ # disable console ratelimit
+ local rl=$(cat /sys/module/libcfs/parameters/libcfs_console_ratelimit)
+ echo 0 > /sys/module/libcfs/parameters/libcfs_console_ratelimit
+ stack_trap "echo $rl > /sys/module/libcfs/parameters/libcfs_console_ratelimit" EXIT
+
+ test_mkdir -c 1 -i 0 $DIR/$tdir || error "Failed to create directory"
+ changelog_chmask "ALL" || error "changelog_chmask failed"
+ changelog_register || error "changelog_register failed"
+
+ cl_user="${CL_USERS[mds1]%% *}"
+ changelog_users mds1 | grep -q $cl_user ||
+ error "User $cl_user not found in changelog_users"
+
+ # Start reader thread
+ coproc $LFS changelog --follow $service
+ reader_pid=$!
+ fd=${COPROC[0]}
+ stack_trap "echo kill changelog reader; kill $reader_pid" EXIT
+
+ echo -e "\nWrap arround changelog catalog"
+
+ # Start file writer thread
+ __test_135_file_thread "$service" & files_pid=$!
+ stack_trap "(pkill -P$files_pid; kill $files_pid) &> /dev/null || true" EXIT
+
+ # Check changelog entries
+ lastread=$(__test_135_reader $fd $cl_user) || exit $?
+ ! kill -0 $files_pid 2>/dev/null ||
+ error "creation thread is running. Is changelog reader stuck?"
+
+ lastidx=$(changelog_users mds1 | awk '/current_index/ {print $NF}' )
+ [[ "$lastread" -eq "$lastidx" ]] ||
+ error "invalid changelog lastidx (read: $lastread, mds: $lastidx)"
+}
+run_test 135 "check the behavior when changelog is wrapped around"
+
#
# (This was sanity/802a)
#