From: Alex Zhuravlev Date: Sun, 9 May 2021 06:32:55 +0000 (+0300) Subject: LU-12577 llog: protect partial updates from readers X-Git-Tag: 2.14.53~105 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=ae1404feefc1572fdafed938a3fc18131d675678 LU-12577 llog: protect partial updates from readers llog_osd_write_rec() adds a record in few steps: the header is updated first, then the record itself is appended. per-loghandle semaphore is used, but remote readers allocate a new separate loghandle for every access (header reading, blocks), the the readers can't use loghandle's semaphore to avoid accessing partial updates. use object-based locking [censored] to serialize the writer vs the readers. Signed-off-by: Alex Zhuravlev Change-Id: Ie4e4d4a1e9a6fcdea9fcca7d80b0da920e786424 Reviewed-on: https://review.whamcloud.com/43589 Reviewed-by: John L. Hammond Reviewed-by: Mike Pershin Tested-by: jenkins Tested-by: Maloo Reviewed-by: Oleg Drokin --- diff --git a/lustre/include/obd_support.h b/lustre/include/obd_support.h index 29dafee..78fe61b 100644 --- a/lustre/include/obd_support.h +++ b/lustre/include/obd_support.h @@ -572,6 +572,7 @@ extern char obd_jobid_var[]; #define OBD_FAIL_PLAIN_RECORDS 0x1319 #define OBD_FAIL_CATALOG_FULL_CHECK 0x131a #define OBD_FAIL_CATLIST 0x131b +#define OBD_FAIL_LLOG_PAUSE_AFTER_PAD 0x131c #define OBD_FAIL_LLITE 0x1400 #define OBD_FAIL_LLITE_FAULT_TRUNC_RACE 0x1401 diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index 683b622..5aed12e 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -221,15 +221,17 @@ static int llog_osd_read_header(const struct lu_env *env, lgi = llog_info(env); + dt_read_lock(env, o, 0); + rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) - RETURN(rc); + GOTO(unlock, rc); LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); if (lgi->lgi_attr.la_size == 0) { CDEBUG(D_HA, "not reading header from 0-byte log\n"); - RETURN(LLOG_EEMPTY); + GOTO(unlock, rc = LLOG_EEMPTY); } flags = handle->lgh_hdr->llh_flags; @@ -248,7 +250,7 @@ static int llog_osd_read_header(const struct lu_env *env, if (rc >= 0) rc = -EFAULT; - RETURN(rc); + GOTO(unlock, rc); } if (LLOG_REC_HDR_NEEDS_SWABBING(llh_hdr)) @@ -260,7 +262,7 @@ static int llog_osd_read_header(const struct lu_env *env, handle->lgh_name ? handle->lgh_name : "", PFID(lu_object_fid(&o->do_lu)), llh_hdr->lrh_type, LLOG_HDR_MAGIC); - RETURN(-EIO); + GOTO(unlock, rc = -EIO); } else if (llh_hdr->lrh_len < LLOG_MIN_CHUNK_SIZE || llh_hdr->lrh_len > handle->lgh_hdr_size) { CERROR("%s: incorrectly sized log %s "DFID" header: " @@ -270,7 +272,7 @@ static int llog_osd_read_header(const struct lu_env *env, handle->lgh_name ? handle->lgh_name : "", PFID(lu_object_fid(&o->do_lu)), llh_hdr->lrh_len, LLOG_MIN_CHUNK_SIZE); - RETURN(-EIO); + GOTO(unlock, rc = -EIO); } else if (LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index > LLOG_HDR_BITMAP_SIZE(handle->lgh_hdr) || LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len != @@ -281,13 +283,16 @@ static int llog_osd_read_header(const struct lu_env *env, handle->lgh_name ? handle->lgh_name : "", PFID(lu_object_fid(&o->do_lu)), LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len, -EIO); - RETURN(-EIO); + GOTO(unlock, rc = -EIO); } handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK); handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index; + rc = 0; - RETURN(0); +unlock: + dt_read_unlock(env, o); + RETURN(rc); } /** @@ -380,15 +385,16 @@ static int llog_osd_write_rec(const struct lu_env *env, struct llog_cookie *reccookie, int idx, struct thandle *th) { - struct llog_thread_info *lgi = llog_info(env); - struct llog_log_hdr *llh; - int reclen = rec->lrh_len; - int index, rc; - struct llog_rec_tail *lrt; - struct dt_object *o; - __u32 chunk_size; - size_t left; - __u32 orig_last_idx; + struct llog_thread_info *lgi = llog_info(env); + struct llog_log_hdr *llh; + int reclen = rec->lrh_len; + int index, rc; + struct llog_rec_tail *lrt; + struct dt_object *o; + __u32 chunk_size; + size_t left; + __u32 orig_last_idx; + bool pad = false; ENTRY; llh = loghandle->lgh_hdr; @@ -580,6 +586,7 @@ static int llog_osd_write_rec(const struct lu_env *env, RETURN(rc); loghandle->lgh_last_idx++; /* for pad rec */ + pad = true; } /* if it's the last idx in log file, then return -ENOSPC * or wrap around if a catalog */ @@ -630,6 +637,14 @@ static int llog_osd_write_rec(const struct lu_env *env, llh->llh_size = reclen; } + /* + * readers (e.g. llog_osd_read_header()) must not find + * llog updated partially (bitmap/counter claims record, + * but a record hasn't been added yet) as this results + * in EIO. + */ + dt_write_lock(env, o, 0); + if (lgi->lgi_attr.la_size == 0) { lgi->lgi_off = 0; lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len; @@ -671,12 +686,19 @@ static int llog_osd_write_rec(const struct lu_env *env, if (rc != 0) GOTO(out_unlock, rc); } + if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PAUSE_AFTER_PAD) && pad) { + /* a window for concurrent llog reader, see LU-12577 */ + OBD_FAIL_TIMEOUT(OBD_FAIL_LLOG_PAUSE_AFTER_PAD, + cfs_fail_val ?: 1); + } out_unlock: /* unlock here for remote object */ mutex_unlock(&loghandle->lgh_hdr_mutex); - if (rc) + if (rc) { + dt_write_unlock(env, o); GOTO(out, rc); + } if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PROCESS_TIMEOUT) && cfs_fail_val == (unsigned int)(loghandle->lgh_id.lgl_oi.oi.oi_id & @@ -690,8 +712,10 @@ out_unlock: lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen; } else { rc = dt_attr_get(env, o, &lgi->lgi_attr); - if (rc) + if (rc) { + dt_write_unlock(env, o); GOTO(out, rc); + } LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size, @@ -701,6 +725,8 @@ out_unlock: lgi->lgi_buf.lb_len = reclen; lgi->lgi_buf.lb_buf = rec; rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); + + dt_write_unlock(env, o); if (rc < 0) GOTO(out, rc); diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index eebfe73..58f7462 100755 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -77,8 +77,8 @@ if (( $LINUX_VERSION_CODE >= $(version_code 4.18.0) && ALWAYS_EXCEPT+=" 411" fi -# 5 12 8 12 (min)" -[ "$SLOW" = "no" ] && EXCEPT_SLOW="27m 64b 68 71 115 135 136 300o" +# 5 12 8 12 (min)" +[ "$SLOW" = "no" ] && EXCEPT_SLOW="27m 60i 64b 68 71 115 135 136 300o" if [ "$mds1_FSTYPE" = "zfs" ]; then # bug number for skipped test: @@ -8268,6 +8268,34 @@ test_60h() { } run_test 60h "striped directory with missing stripes can be accessed" +function t60i_load() { + mkdir $DIR/$tdir + #define OBD_FAIL_LLOG_PAUSE_AFTER_PAD 0x131c + $LCTL set_param fail_loc=0x131c fail_val=1 + for ((i=0; i<5000; i++)); do + touch $DIR/$tdir/f$i + done +} + +test_60i() { + changelog_register || error "changelog_register failed" + local cl_user="${CL_USERS[$SINGLEMDS]%% *}" + changelog_users $SINGLEMDS | grep -q $cl_user || + error "User $cl_user not found in changelog_users" + changelog_chmask "ALL" + t60i_load & + local PID=$! + for((i=0; i<100; i++)); do + changelog_dump >/dev/null || + error "can't read changelog" + done + kill $PID + wait $PID + changelog_deregister || error "changelog_deregister failed" + $LCTL set_param fail_loc=0 +} +run_test 60i "llog: new record vs reader race" + test_61a() { [ $PARALLEL == "yes" ] && skip "skip parallel run"