X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdclass%2Fllog_osd.c;h=5aed12e9b9d6d431a95bf50730cbdfd686fb67d7;hp=09246e50a6f3a38a9b6f3c9fb2916a20717fadff;hb=ae1404feefc1572fdafed938a3fc18131d675678;hpb=7aef9051f38bcb5757bc132ad7627742b74c654c diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index 09246e50..5aed12e 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -27,7 +27,6 @@ */ /* * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. */ /* * lustre/obdclass/llog_osd.c @@ -222,15 +221,17 @@ static int llog_osd_read_header(const struct lu_env *env, lgi = llog_info(env); + dt_read_lock(env, o, 0); + rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) - RETURN(rc); + GOTO(unlock, rc); LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); if (lgi->lgi_attr.la_size == 0) { CDEBUG(D_HA, "not reading header from 0-byte log\n"); - RETURN(LLOG_EEMPTY); + GOTO(unlock, rc = LLOG_EEMPTY); } flags = handle->lgh_hdr->llh_flags; @@ -249,7 +250,7 @@ static int llog_osd_read_header(const struct lu_env *env, if (rc >= 0) rc = -EFAULT; - RETURN(rc); + GOTO(unlock, rc); } if (LLOG_REC_HDR_NEEDS_SWABBING(llh_hdr)) @@ -261,7 +262,7 @@ static int llog_osd_read_header(const struct lu_env *env, handle->lgh_name ? handle->lgh_name : "", PFID(lu_object_fid(&o->do_lu)), llh_hdr->lrh_type, LLOG_HDR_MAGIC); - RETURN(-EIO); + GOTO(unlock, rc = -EIO); } else if (llh_hdr->lrh_len < LLOG_MIN_CHUNK_SIZE || llh_hdr->lrh_len > handle->lgh_hdr_size) { CERROR("%s: incorrectly sized log %s "DFID" header: " @@ -271,7 +272,7 @@ static int llog_osd_read_header(const struct lu_env *env, handle->lgh_name ? handle->lgh_name : "", PFID(lu_object_fid(&o->do_lu)), llh_hdr->lrh_len, LLOG_MIN_CHUNK_SIZE); - RETURN(-EIO); + GOTO(unlock, rc = -EIO); } else if (LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index > LLOG_HDR_BITMAP_SIZE(handle->lgh_hdr) || LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len != @@ -282,13 +283,16 @@ static int llog_osd_read_header(const struct lu_env *env, handle->lgh_name ? handle->lgh_name : "", PFID(lu_object_fid(&o->do_lu)), LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len, -EIO); - RETURN(-EIO); + GOTO(unlock, rc = -EIO); } handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK); handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index; + rc = 0; - RETURN(0); +unlock: + dt_read_unlock(env, o); + RETURN(rc); } /** @@ -381,15 +385,16 @@ static int llog_osd_write_rec(const struct lu_env *env, struct llog_cookie *reccookie, int idx, struct thandle *th) { - struct llog_thread_info *lgi = llog_info(env); - struct llog_log_hdr *llh; - int reclen = rec->lrh_len; - int index, rc; - struct llog_rec_tail *lrt; - struct dt_object *o; - __u32 chunk_size; - size_t left; - __u32 orig_last_idx; + struct llog_thread_info *lgi = llog_info(env); + struct llog_log_hdr *llh; + int reclen = rec->lrh_len; + int index, rc; + struct llog_rec_tail *lrt; + struct dt_object *o; + __u32 chunk_size; + size_t left; + __u32 orig_last_idx; + bool pad = false; ENTRY; llh = loghandle->lgh_hdr; @@ -412,6 +417,9 @@ static int llog_osd_write_rec(const struct lu_env *env, LASSERT(llh->llh_size == reclen); } + /* return error if osp object is stale */ + if (idx != LLOG_HEADER_IDX && dt_object_stale(o)) + RETURN(-ESTALE); rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) RETURN(rc); @@ -578,6 +586,7 @@ static int llog_osd_write_rec(const struct lu_env *env, RETURN(rc); loghandle->lgh_last_idx++; /* for pad rec */ + pad = true; } /* if it's the last idx in log file, then return -ENOSPC * or wrap around if a catalog */ @@ -612,8 +621,9 @@ static int llog_osd_write_rec(const struct lu_env *env, * update/cancel, the llh_count and llh_bitmap are protected */ mutex_lock(&loghandle->lgh_hdr_mutex); if (__test_and_set_bit_le(index, LLOG_HDR_BITMAP(llh))) { - CERROR("%s: index %u already set in log bitmap\n", - o->do_lu.lo_dev->ld_obd->obd_name, index); + CERROR("%s: index %u already set in llog bitmap "DFID"\n", + o->do_lu.lo_dev->ld_obd->obd_name, index, + PFID(lu_object_fid(&o->do_lu))); mutex_unlock(&loghandle->lgh_hdr_mutex); LBUG(); /* should never happen */ } @@ -627,6 +637,14 @@ static int llog_osd_write_rec(const struct lu_env *env, llh->llh_size = reclen; } + /* + * readers (e.g. llog_osd_read_header()) must not find + * llog updated partially (bitmap/counter claims record, + * but a record hasn't been added yet) as this results + * in EIO. + */ + dt_write_lock(env, o, 0); + if (lgi->lgi_attr.la_size == 0) { lgi->lgi_off = 0; lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len; @@ -668,12 +686,19 @@ static int llog_osd_write_rec(const struct lu_env *env, if (rc != 0) GOTO(out_unlock, rc); } + if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PAUSE_AFTER_PAD) && pad) { + /* a window for concurrent llog reader, see LU-12577 */ + OBD_FAIL_TIMEOUT(OBD_FAIL_LLOG_PAUSE_AFTER_PAD, + cfs_fail_val ?: 1); + } out_unlock: /* unlock here for remote object */ mutex_unlock(&loghandle->lgh_hdr_mutex); - if (rc) + if (rc) { + dt_write_unlock(env, o); GOTO(out, rc); + } if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PROCESS_TIMEOUT) && cfs_fail_val == (unsigned int)(loghandle->lgh_id.lgl_oi.oi.oi_id & @@ -687,8 +712,10 @@ out_unlock: lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen; } else { rc = dt_attr_get(env, o, &lgi->lgi_attr); - if (rc) + if (rc) { + dt_write_unlock(env, o); GOTO(out, rc); + } LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size, @@ -698,6 +725,8 @@ out_unlock: lgi->lgi_buf.lb_len = reclen; lgi->lgi_buf.lb_buf = rec; rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); + + dt_write_unlock(env, o); if (rc < 0) GOTO(out, rc); @@ -900,9 +929,18 @@ static int llog_osd_next_block(const struct lu_env *env, LASSERT(loghandle); LASSERT(loghandle->lgh_ctxt); + if (OBD_FAIL_PRECHECK(OBD_FAIL_MDS_CHANGELOG_DEL) && + cfs_fail_val == ((unsigned long)loghandle & 0xFFFFFFFF)) { + OBD_RACE(OBD_FAIL_MDS_CHANGELOG_DEL); + msleep(MSEC_PER_SEC >> 2); + } + o = loghandle->lgh_obj; LASSERT(o); - LASSERT(llog_osd_exist(loghandle)); + dt_read_lock(env, o, 0); + if (!llog_osd_exist(loghandle)) + GOTO(out, rc = -ESTALE); //object was destroyed + dt = lu2dt_dev(o->do_lu.lo_dev); LASSERT(dt); @@ -967,9 +1005,25 @@ static int llog_osd_next_block(const struct lu_env *env, rec = buf; if (LLOG_REC_HDR_NEEDS_SWABBING(rec)) lustre_swab_llog_rec(rec); - tail = (struct llog_rec_tail *)((char *)buf + rc - sizeof(struct llog_rec_tail)); + + if (llog_verify_record(loghandle, rec)) { + /* + * the block seems corrupted. make a pad record so the + * caller can skip the block and try with the next one + */ + rec->lrh_len = rc; + rec->lrh_index = next_idx; + rec->lrh_type = LLOG_PAD_MAGIC; + + tail = rec_tail(rec); + tail->lrt_len = rc; + tail->lrt_index = next_idx; + + GOTO(out, rc = 0); + } + /* get the last record in block */ last_rec = (struct llog_rec_hdr *)((char *)buf + rc - tail->lrt_len); @@ -1006,7 +1060,7 @@ static int llog_osd_next_block(const struct lu_env *env, /* sanity check that the start of the new buffer is no farther * than the record that we wanted. This shouldn't happen. */ - if (rec->lrh_index > next_idx) { + if (next_idx && rec->lrh_index > next_idx) { if (!force_mini_rec && next_idx > last_idx) goto retry; @@ -1031,6 +1085,7 @@ retry: } GOTO(out, rc = -EIO); out: + dt_read_unlock(env, o); return rc; } @@ -1075,7 +1130,10 @@ static int llog_osd_prev_block(const struct lu_env *env, o = loghandle->lgh_obj; LASSERT(o); - LASSERT(llog_osd_exist(loghandle)); + dt_read_lock(env, o, 0); + if (!llog_osd_exist(loghandle)) + GOTO(out, rc = -ESTALE); + dt = lu2dt_dev(o->do_lu.lo_dev); LASSERT(dt); @@ -1158,6 +1216,7 @@ static int llog_osd_prev_block(const struct lu_env *env, } GOTO(out, rc = -EIO); out: + dt_read_unlock(env, o); return rc; } @@ -1929,7 +1988,7 @@ static int llog_osd_cleanup(const struct lu_env *env, struct llog_ctxt *ctxt) return 0; } -struct llog_operations llog_osd_ops = { +const struct llog_operations llog_osd_ops = { .lop_next_block = llog_osd_next_block, .lop_prev_block = llog_osd_prev_block, .lop_read_header = llog_osd_read_header, @@ -1947,7 +2006,7 @@ struct llog_operations llog_osd_ops = { }; EXPORT_SYMBOL(llog_osd_ops); -struct llog_operations llog_common_cat_ops = { +const struct llog_operations llog_common_cat_ops = { .lop_next_block = llog_osd_next_block, .lop_prev_block = llog_osd_prev_block, .lop_read_header = llog_osd_read_header,