Whamcloud - gitweb
LU-12577 llog: protect partial updates from readers 89/43589/8
authorAlex Zhuravlev <bzzz@whamcloud.com>
Sun, 9 May 2021 06:32:55 +0000 (09:32 +0300)
committerOleg Drokin <green@whamcloud.com>
Mon, 21 Jun 2021 22:16:10 +0000 (22:16 +0000)
llog_osd_write_rec() adds a record in few steps: the header is
updated first, then the record itself is appended. per-loghandle
semaphore is used, but remote readers allocate a new separate
loghandle for every access (header reading, blocks), the the
readers can't use loghandle's semaphore to avoid accessing partial
updates. use object-based locking [censored] to serialize the writer
vs the readers.

Signed-off-by: Alex Zhuravlev <bzzz@whamcloud.com>
Change-Id: Ie4e4d4a1e9a6fcdea9fcca7d80b0da920e786424
Reviewed-on: https://review.whamcloud.com/43589
Reviewed-by: John L. Hammond <jhammond@whamcloud.com>
Reviewed-by: Mike Pershin <mpershin@whamcloud.com>
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/obd_support.h
lustre/obdclass/llog_osd.c
lustre/tests/sanity.sh

index 29dafee..78fe61b 100644 (file)
@@ -572,6 +572,7 @@ extern char obd_jobid_var[];
 #define OBD_FAIL_PLAIN_RECORDS                     0x1319
 #define OBD_FAIL_CATALOG_FULL_CHECK                0x131a
 #define OBD_FAIL_CATLIST                           0x131b
 #define OBD_FAIL_PLAIN_RECORDS                     0x1319
 #define OBD_FAIL_CATALOG_FULL_CHECK                0x131a
 #define OBD_FAIL_CATLIST                           0x131b
+#define OBD_FAIL_LLOG_PAUSE_AFTER_PAD               0x131c
 
 #define OBD_FAIL_LLITE                              0x1400
 #define OBD_FAIL_LLITE_FAULT_TRUNC_RACE             0x1401
 
 #define OBD_FAIL_LLITE                              0x1400
 #define OBD_FAIL_LLITE_FAULT_TRUNC_RACE             0x1401
index 683b622..5aed12e 100644 (file)
@@ -221,15 +221,17 @@ static int llog_osd_read_header(const struct lu_env *env,
 
        lgi = llog_info(env);
 
 
        lgi = llog_info(env);
 
+       dt_read_lock(env, o, 0);
+
        rc = dt_attr_get(env, o, &lgi->lgi_attr);
        if (rc)
        rc = dt_attr_get(env, o, &lgi->lgi_attr);
        if (rc)
-               RETURN(rc);
+               GOTO(unlock, rc);
 
        LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
 
        if (lgi->lgi_attr.la_size == 0) {
                CDEBUG(D_HA, "not reading header from 0-byte log\n");
 
        LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
 
        if (lgi->lgi_attr.la_size == 0) {
                CDEBUG(D_HA, "not reading header from 0-byte log\n");
-               RETURN(LLOG_EEMPTY);
+               GOTO(unlock, rc = LLOG_EEMPTY);
        }
 
        flags = handle->lgh_hdr->llh_flags;
        }
 
        flags = handle->lgh_hdr->llh_flags;
@@ -248,7 +250,7 @@ static int llog_osd_read_header(const struct lu_env *env,
                if (rc >= 0)
                        rc = -EFAULT;
 
                if (rc >= 0)
                        rc = -EFAULT;
 
-               RETURN(rc);
+               GOTO(unlock, rc);
        }
 
        if (LLOG_REC_HDR_NEEDS_SWABBING(llh_hdr))
        }
 
        if (LLOG_REC_HDR_NEEDS_SWABBING(llh_hdr))
@@ -260,7 +262,7 @@ static int llog_osd_read_header(const struct lu_env *env,
                       handle->lgh_name ? handle->lgh_name : "",
                       PFID(lu_object_fid(&o->do_lu)),
                       llh_hdr->lrh_type, LLOG_HDR_MAGIC);
                       handle->lgh_name ? handle->lgh_name : "",
                       PFID(lu_object_fid(&o->do_lu)),
                       llh_hdr->lrh_type, LLOG_HDR_MAGIC);
-               RETURN(-EIO);
+               GOTO(unlock, rc = -EIO);
        } else if (llh_hdr->lrh_len < LLOG_MIN_CHUNK_SIZE ||
                   llh_hdr->lrh_len > handle->lgh_hdr_size) {
                CERROR("%s: incorrectly sized log %s "DFID" header: "
        } else if (llh_hdr->lrh_len < LLOG_MIN_CHUNK_SIZE ||
                   llh_hdr->lrh_len > handle->lgh_hdr_size) {
                CERROR("%s: incorrectly sized log %s "DFID" header: "
@@ -270,7 +272,7 @@ static int llog_osd_read_header(const struct lu_env *env,
                       handle->lgh_name ? handle->lgh_name : "",
                       PFID(lu_object_fid(&o->do_lu)),
                       llh_hdr->lrh_len, LLOG_MIN_CHUNK_SIZE);
                       handle->lgh_name ? handle->lgh_name : "",
                       PFID(lu_object_fid(&o->do_lu)),
                       llh_hdr->lrh_len, LLOG_MIN_CHUNK_SIZE);
-               RETURN(-EIO);
+               GOTO(unlock, rc = -EIO);
        } else if (LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index >
                   LLOG_HDR_BITMAP_SIZE(handle->lgh_hdr) ||
                   LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len !=
        } else if (LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index >
                   LLOG_HDR_BITMAP_SIZE(handle->lgh_hdr) ||
                   LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len !=
@@ -281,13 +283,16 @@ static int llog_osd_read_header(const struct lu_env *env,
                       handle->lgh_name ? handle->lgh_name : "",
                       PFID(lu_object_fid(&o->do_lu)),
                       LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len, -EIO);
                       handle->lgh_name ? handle->lgh_name : "",
                       PFID(lu_object_fid(&o->do_lu)),
                       LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len, -EIO);
-               RETURN(-EIO);
+               GOTO(unlock, rc = -EIO);
        }
 
        handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK);
        handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index;
        }
 
        handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK);
        handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index;
+       rc = 0;
 
 
-       RETURN(0);
+unlock:
+       dt_read_unlock(env, o);
+       RETURN(rc);
 }
 
 /**
 }
 
 /**
@@ -380,15 +385,16 @@ static int llog_osd_write_rec(const struct lu_env *env,
                              struct llog_cookie *reccookie,
                              int idx, struct thandle *th)
 {
                              struct llog_cookie *reccookie,
                              int idx, struct thandle *th)
 {
-       struct llog_thread_info *lgi = llog_info(env);
-       struct llog_log_hdr     *llh;
-       int                      reclen = rec->lrh_len;
-       int                      index, rc;
-       struct llog_rec_tail    *lrt;
-       struct dt_object        *o;
-       __u32                   chunk_size;
-       size_t                   left;
-       __u32                   orig_last_idx;
+       struct llog_thread_info *lgi = llog_info(env);
+       struct llog_log_hdr *llh;
+       int reclen = rec->lrh_len;
+       int index, rc;
+       struct llog_rec_tail *lrt;
+       struct dt_object *o;
+       __u32 chunk_size;
+       size_t left;
+       __u32 orig_last_idx;
+       bool pad = false;
        ENTRY;
 
        llh = loghandle->lgh_hdr;
        ENTRY;
 
        llh = loghandle->lgh_hdr;
@@ -580,6 +586,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
                        RETURN(rc);
 
                loghandle->lgh_last_idx++; /* for pad rec */
                        RETURN(rc);
 
                loghandle->lgh_last_idx++; /* for pad rec */
+               pad = true;
        }
        /* if it's the last idx in log file, then return -ENOSPC
         * or wrap around if a catalog */
        }
        /* if it's the last idx in log file, then return -ENOSPC
         * or wrap around if a catalog */
@@ -630,6 +637,14 @@ static int llog_osd_write_rec(const struct lu_env *env,
                        llh->llh_size = reclen;
        }
 
                        llh->llh_size = reclen;
        }
 
+       /*
+        * readers (e.g. llog_osd_read_header()) must not find
+        * llog updated partially (bitmap/counter claims record,
+        * but a record hasn't been added yet) as this results
+        * in EIO.
+        */
+       dt_write_lock(env, o, 0);
+
        if (lgi->lgi_attr.la_size == 0) {
                lgi->lgi_off = 0;
                lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len;
        if (lgi->lgi_attr.la_size == 0) {
                lgi->lgi_off = 0;
                lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len;
@@ -671,12 +686,19 @@ static int llog_osd_write_rec(const struct lu_env *env,
                if (rc != 0)
                        GOTO(out_unlock, rc);
        }
                if (rc != 0)
                        GOTO(out_unlock, rc);
        }
+       if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PAUSE_AFTER_PAD) && pad) {
+               /* a window for concurrent llog reader, see LU-12577 */
+               OBD_FAIL_TIMEOUT(OBD_FAIL_LLOG_PAUSE_AFTER_PAD,
+                                cfs_fail_val ?: 1);
+       }
 
 out_unlock:
        /* unlock here for remote object */
        mutex_unlock(&loghandle->lgh_hdr_mutex);
 
 out_unlock:
        /* unlock here for remote object */
        mutex_unlock(&loghandle->lgh_hdr_mutex);
-       if (rc)
+       if (rc) {
+               dt_write_unlock(env, o);
                GOTO(out, rc);
                GOTO(out, rc);
+       }
 
        if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PROCESS_TIMEOUT) &&
           cfs_fail_val == (unsigned int)(loghandle->lgh_id.lgl_oi.oi.oi_id &
 
        if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PROCESS_TIMEOUT) &&
           cfs_fail_val == (unsigned int)(loghandle->lgh_id.lgl_oi.oi.oi_id &
@@ -690,8 +712,10 @@ out_unlock:
                lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen;
        } else {
                rc = dt_attr_get(env, o, &lgi->lgi_attr);
                lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen;
        } else {
                rc = dt_attr_get(env, o, &lgi->lgi_attr);
-               if (rc)
+               if (rc) {
+                       dt_write_unlock(env, o);
                        GOTO(out, rc);
                        GOTO(out, rc);
+               }
 
                LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
                lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size,
 
                LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
                lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size,
@@ -701,6 +725,8 @@ out_unlock:
        lgi->lgi_buf.lb_len = reclen;
        lgi->lgi_buf.lb_buf = rec;
        rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
        lgi->lgi_buf.lb_len = reclen;
        lgi->lgi_buf.lb_buf = rec;
        rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
+
+       dt_write_unlock(env, o);
        if (rc < 0)
                GOTO(out, rc);
 
        if (rc < 0)
                GOTO(out, rc);
 
index eebfe73..58f7462 100755 (executable)
@@ -77,8 +77,8 @@ if (( $LINUX_VERSION_CODE >= $(version_code 4.18.0) &&
        ALWAYS_EXCEPT+=" 411"
 fi
 
        ALWAYS_EXCEPT+=" 411"
 fi
 
-#                                  5          12     8   12  (min)"
-[ "$SLOW" = "no" ] && EXCEPT_SLOW="27m 64b 68 71 115 135 136 300o"
+#                                  5              12     8   12  (min)"
+[ "$SLOW" = "no" ] && EXCEPT_SLOW="27m 60i 64b 68 71 115 135 136 300o"
 
 if [ "$mds1_FSTYPE" = "zfs" ]; then
        # bug number for skipped test:
 
 if [ "$mds1_FSTYPE" = "zfs" ]; then
        # bug number for skipped test:
@@ -8268,6 +8268,34 @@ test_60h() {
 }
 run_test 60h "striped directory with missing stripes can be accessed"
 
 }
 run_test 60h "striped directory with missing stripes can be accessed"
 
+function t60i_load() {
+       mkdir $DIR/$tdir
+       #define OBD_FAIL_LLOG_PAUSE_AFTER_PAD               0x131c
+       $LCTL set_param fail_loc=0x131c fail_val=1
+       for ((i=0; i<5000; i++)); do
+               touch $DIR/$tdir/f$i
+       done
+}
+
+test_60i() {
+       changelog_register || error "changelog_register failed"
+       local cl_user="${CL_USERS[$SINGLEMDS]%% *}"
+       changelog_users $SINGLEMDS | grep -q $cl_user ||
+               error "User $cl_user not found in changelog_users"
+       changelog_chmask "ALL"
+       t60i_load &
+       local PID=$!
+       for((i=0; i<100; i++)); do
+               changelog_dump >/dev/null ||
+                       error "can't read changelog"
+       done
+       kill $PID
+       wait $PID
+       changelog_deregister || error "changelog_deregister failed"
+       $LCTL set_param fail_loc=0
+}
+run_test 60i "llog: new record vs reader race"
+
 test_61a() {
        [ $PARALLEL == "yes" ] && skip "skip parallel run"
 
 test_61a() {
        [ $PARALLEL == "yes" ] && skip "skip parallel run"