From 9ba8a72a8e7ae0859c7700ff52021f83337406b3 Mon Sep 17 00:00:00 2001 From: Alex Zhuravlev Date: Tue, 15 Jun 2021 17:47:39 +0300 Subject: [PATCH] LU-12577 llog: protect partial updates from readers llog_osd_write_rec() adds a record in few steps: the header is updated first, then the record itself is appended. per-loghandle semaphore is used, but remote readers allocate a new separate loghandle for every access (header reading, blocks), the the readers can't use loghandle's semaphore to avoid accessing partial updates. use object-based locking [censored] to serialize the writer vs the readers. Lustre-change: https://review.whamcloud.com/43589 Lustre-commit: 03dd1bb036d426a692584d73f66bcdb221658d79 Signed-off-by: Alex Zhuravlev Change-Id: Ie4e4d4a1e9a6fcdea9fcca7d80b0da920e786424 Reviewed-by: Mike Pershin Reviewed-on: https://review.whamcloud.com/44935 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger --- lustre/include/obd_support.h | 1 + lustre/obdclass/llog_osd.c | 62 +++++++++++++++++++++++++++++++------------- lustre/tests/sanity.sh | 28 ++++++++++++++++++++ 3 files changed, 73 insertions(+), 18 deletions(-) diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index b52ef0c..57892a1 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -568,6 +568,7 @@ extern char obd_jobid_var[]; #define OBD_FAIL_PLAIN_RECORDS 0x1319 #define OBD_FAIL_CATALOG_FULL_CHECK 0x131a #define OBD_FAIL_CATLIST 0x131b +#define OBD_FAIL_LLOG_PAUSE_AFTER_PAD 0x131c #define OBD_FAIL_LLITE 0x1400 #define OBD_FAIL_LLITE_FAULT_TRUNC_RACE 0x1401 diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index 031441e..2336b38 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -222,15 +222,17 @@ static int llog_osd_read_header(const struct lu_env *env, lgi = llog_info(env); + dt_read_lock(env, o, 0); + rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) - RETURN(rc); + GOTO(unlock, rc); LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); if (lgi->lgi_attr.la_size == 0) { CDEBUG(D_HA, "not reading header from 0-byte log\n"); - RETURN(LLOG_EEMPTY); + GOTO(unlock, rc = LLOG_EEMPTY); } flags = handle->lgh_hdr->llh_flags; @@ -249,7 +251,7 @@ static int llog_osd_read_header(const struct lu_env *env, if (rc >= 0) rc = -EFAULT; - RETURN(rc); + GOTO(unlock, rc); } if (LLOG_REC_HDR_NEEDS_SWABBING(llh_hdr)) @@ -261,7 +263,7 @@ static int llog_osd_read_header(const struct lu_env *env, handle->lgh_name ? handle->lgh_name : "", PFID(lu_object_fid(&o->do_lu)), llh_hdr->lrh_type, LLOG_HDR_MAGIC); - RETURN(-EIO); + GOTO(unlock, rc = -EIO); } else if (llh_hdr->lrh_len < LLOG_MIN_CHUNK_SIZE || llh_hdr->lrh_len > handle->lgh_hdr_size) { CERROR("%s: incorrectly sized log %s "DFID" header: " @@ -271,7 +273,7 @@ static int llog_osd_read_header(const struct lu_env *env, handle->lgh_name ? handle->lgh_name : "", PFID(lu_object_fid(&o->do_lu)), llh_hdr->lrh_len, LLOG_MIN_CHUNK_SIZE); - RETURN(-EIO); + GOTO(unlock, rc = -EIO); } else if (LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index > LLOG_HDR_BITMAP_SIZE(handle->lgh_hdr) || LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len != @@ -282,13 +284,16 @@ static int llog_osd_read_header(const struct lu_env *env, handle->lgh_name ? handle->lgh_name : "", PFID(lu_object_fid(&o->do_lu)), LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len, -EIO); - RETURN(-EIO); + GOTO(unlock, rc = -EIO); } handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK); handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index; + rc = 0; - RETURN(0); +unlock: + dt_read_unlock(env, o); + RETURN(rc); } /** @@ -381,15 +386,16 @@ static int llog_osd_write_rec(const struct lu_env *env, struct llog_cookie *reccookie, int idx, struct thandle *th) { - struct llog_thread_info *lgi = llog_info(env); - struct llog_log_hdr *llh; - int reclen = rec->lrh_len; - int index, rc; - struct llog_rec_tail *lrt; - struct dt_object *o; - __u32 chunk_size; - size_t left; - __u32 orig_last_idx; + struct llog_thread_info *lgi = llog_info(env); + struct llog_log_hdr *llh; + int reclen = rec->lrh_len; + int index, rc; + struct llog_rec_tail *lrt; + struct dt_object *o; + __u32 chunk_size; + size_t left; + __u32 orig_last_idx; + bool pad = false; ENTRY; llh = loghandle->lgh_hdr; @@ -578,6 +584,7 @@ static int llog_osd_write_rec(const struct lu_env *env, RETURN(rc); loghandle->lgh_last_idx++; /* for pad rec */ + pad = true; } /* if it's the last idx in log file, then return -ENOSPC * or wrap around if a catalog */ @@ -628,6 +635,14 @@ static int llog_osd_write_rec(const struct lu_env *env, llh->llh_size = reclen; } + /* + * readers (e.g. llog_osd_read_header()) must not find + * llog updated partially (bitmap/counter claims record, + * but a record hasn't been added yet) as this results + * in EIO. + */ + dt_write_lock(env, o, 0); + if (lgi->lgi_attr.la_size == 0) { lgi->lgi_off = 0; lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len; @@ -669,12 +684,19 @@ static int llog_osd_write_rec(const struct lu_env *env, if (rc != 0) GOTO(out_unlock, rc); } + if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PAUSE_AFTER_PAD) && pad) { + /* a window for concurrent llog reader, see LU-12577 */ + OBD_FAIL_TIMEOUT(OBD_FAIL_LLOG_PAUSE_AFTER_PAD, + cfs_fail_val ?: 1); + } out_unlock: /* unlock here for remote object */ mutex_unlock(&loghandle->lgh_hdr_mutex); - if (rc) + if (rc) { + dt_write_unlock(env, o); GOTO(out, rc); + } if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PROCESS_TIMEOUT) && cfs_fail_val == (unsigned int)(loghandle->lgh_id.lgl_oi.oi.oi_id & @@ -688,8 +710,10 @@ out_unlock: lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen; } else { rc = dt_attr_get(env, o, &lgi->lgi_attr); - if (rc) + if (rc) { + dt_write_unlock(env, o); GOTO(out, rc); + } LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size, @@ -699,6 +723,8 @@ out_unlock: lgi->lgi_buf.lb_len = reclen; lgi->lgi_buf.lb_buf = rec; rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); + + dt_write_unlock(env, o); if (rc < 0) GOTO(out, rc); diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 215f9e1..b14ca6a 100755 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -7925,6 +7925,34 @@ test_60h() { } run_test 60h "striped directory with missing stripes can be accessed" +function t60i_load() { + mkdir $DIR/$tdir + #define OBD_FAIL_LLOG_PAUSE_AFTER_PAD 0x131c + $LCTL set_param fail_loc=0x131c fail_val=1 + for ((i=0; i<5000; i++)); do + touch $DIR/$tdir/f$i + done +} + +test_60i() { + changelog_register || error "changelog_register failed" + local cl_user="${CL_USERS[$SINGLEMDS]%% *}" + changelog_users $SINGLEMDS | grep -q $cl_user || + error "User $cl_user not found in changelog_users" + changelog_chmask "ALL" + t60i_load & + local PID=$! + for((i=0; i<100; i++)); do + changelog_dump >/dev/null || + error "can't read changelog" + done + kill $PID + wait $PID + changelog_deregister || error "changelog_deregister failed" + $LCTL set_param fail_loc=0 +} +run_test 60i "llog: new record vs reader race" + test_61a() { [ $PARALLEL == "yes" ] && skip "skip parallel run" -- 1.8.3.1