X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdclass%2Fllog_osd.c;h=92d71869e5a914d7defacd485fe20ac641a1fb8f;hp=ffa19bd282d0558a685f89d3a85d3515ee407423;hb=82c6e42d6137f39a1f2394b7bc6e8d600eb36181;hpb=52b693c588555c55dd44fe3a27a1bf8c8cccac31 diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index ffa19bd..92d7186 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -23,7 +23,7 @@ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, 2016, Intel Corporation. + * Copyright (c) 2012, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -44,6 +44,8 @@ #define DEBUG_SUBSYSTEM S_LOG +#include + #include #include #include @@ -124,8 +126,7 @@ static int llog_osd_create_new_object(const struct lu_env *env, static int llog_osd_exist(struct llog_handle *handle) { LASSERT(handle->lgh_obj); - return dt_object_exists(handle->lgh_obj) && - !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header); + return dt_object_exists(handle->lgh_obj) && !handle->lgh_destroyed; } static void *rec_tail(struct llog_rec_hdr *rec) @@ -411,6 +412,9 @@ static int llog_osd_write_rec(const struct lu_env *env, LASSERT(llh->llh_size == reclen); } + /* return error if osp object is stale */ + if (idx != LLOG_HEADER_IDX && dt_object_stale(o)) + RETURN(-ESTALE); rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) RETURN(rc); @@ -436,7 +440,7 @@ static int llog_osd_write_rec(const struct lu_env *env, /* llog can be empty only when first record is being written */ LASSERT(ergo(idx > 0, lgi->lgi_attr.la_size > 0)); - if (!ext2_test_bit(idx, LLOG_HDR_BITMAP(llh))) { + if (!test_bit_le(idx, LLOG_HDR_BITMAP(llh))) { CERROR("%s: modify unset record %u\n", o->do_lu.lo_dev->ld_obd->obd_name, idx); RETURN(-ENOENT); @@ -590,6 +594,7 @@ static int llog_osd_write_rec(const struct lu_env *env, RETURN(-ENOSPC); } + down_write(&loghandle->lgh_last_sem); /* increment the last_idx along with llh_tail index, they should * be equal for a llog lifetime */ loghandle->lgh_last_idx++; @@ -609,9 +614,10 @@ static int llog_osd_write_rec(const struct lu_env *env, /* the lgh_hdr_mutex protects llog header data from concurrent * update/cancel, the llh_count and llh_bitmap are protected */ mutex_lock(&loghandle->lgh_hdr_mutex); - if (ext2_set_bit(index, LLOG_HDR_BITMAP(llh))) { - CERROR("%s: index %u already set in log bitmap\n", - o->do_lu.lo_dev->ld_obd->obd_name, index); + if (__test_and_set_bit_le(index, LLOG_HDR_BITMAP(llh))) { + CERROR("%s: index %u already set in llog bitmap "DFID"\n", + o->do_lu.lo_dev->ld_obd->obd_name, index, + PFID(lu_object_fid(&o->do_lu))); mutex_unlock(&loghandle->lgh_hdr_mutex); LBUG(); /* should never happen */ } @@ -673,6 +679,12 @@ out_unlock: if (rc) GOTO(out, rc); + if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PROCESS_TIMEOUT) && + cfs_fail_val == (unsigned int)(loghandle->lgh_id.lgl_oi.oi.oi_id & + 0xFFFFFFFF)) { + OBD_RACE(OBD_FAIL_LLOG_PROCESS_TIMEOUT); + msleep(1 * MSEC_PER_SEC); + } /* computed index can be used to determine offset for fixed-size * records. This also allows to handle Catalog wrap around case */ if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { @@ -693,6 +705,8 @@ out_unlock: if (rc < 0) GOTO(out, rc); + up_write(&loghandle->lgh_last_sem); + CDEBUG(D_HA, "added record "DFID".%u, %u off%llu\n", PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len, lgi->lgi_off); @@ -712,7 +726,7 @@ out_unlock: out: /* cleanup llog for error case */ mutex_lock(&loghandle->lgh_hdr_mutex); - ext2_clear_bit(index, LLOG_HDR_BITMAP(llh)); + clear_bit_le(index, LLOG_HDR_BITMAP(llh)); llh->llh_count--; mutex_unlock(&loghandle->lgh_hdr_mutex); @@ -726,6 +740,7 @@ out: } LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx; + up_write(&loghandle->lgh_last_sem); RETURN(rc); } @@ -788,9 +803,27 @@ static inline void llog_skip_over(struct llog_handle *lgh, __u64 *off, */ static void changelog_block_trim_ext(struct llog_rec_hdr *hdr, struct llog_rec_hdr *last_hdr, - enum changelog_rec_flags flags, - enum changelog_rec_extra_flags extra_flags) + struct llog_handle *loghandle) { + enum changelog_rec_flags flags = CLF_SUPPORTED; + enum changelog_rec_extra_flags extra_flags = CLFE_SUPPORTED; + + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_XATTR)) + extra_flags &= ~CLFE_XATTR; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_OMODE)) + extra_flags &= ~CLFE_OPEN; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_NID)) + extra_flags &= ~CLFE_NID; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_UIDGID)) + extra_flags &= ~CLFE_UIDGID; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_EXTRA_FLAGS)) + flags &= ~CLF_EXTRA_FLAGS; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID)) + flags &= ~CLF_JOBID; + + if (flags == CLF_SUPPORTED && extra_flags == CLFE_SUPPORTED) + return; + if (hdr->lrh_type != CHANGELOG_REC) return; @@ -804,8 +837,26 @@ static void changelog_block_trim_ext(struct llog_rec_hdr *hdr, extra_flags; } + if (unlikely(hdr->lrh_len == 0)) { + /* It is corruption case, we cannot know the next rec, + * jump to the last one directly to avoid dead loop. */ + LCONSOLE(D_WARNING, "Hit invalid llog record: " + "idx %u, type %u, id %u\n", + hdr->lrh_index, hdr->lrh_type, hdr->lrh_id); + hdr = llog_rec_hdr_next(last_hdr); + if (unlikely(hdr == last_hdr)) + LCONSOLE(D_WARNING, "The last record crashed: " + "idx %u, type %u, id %u\n", + hdr->lrh_index, hdr->lrh_type, + hdr->lrh_id); + break; + } + changelog_remap_rec(rec, rec->cr_flags & flags, xflag); hdr = llog_rec_hdr_next(hdr); + /* Yield CPU to avoid soft-lockup if there are too many records + * to be handled. */ + cond_resched(); } while ((char *)hdr <= (char *)last_hdr); } @@ -840,8 +891,6 @@ static int llog_osd_next_block(const struct lu_env *env, int last_idx = *cur_idx; __u64 last_offset = *cur_offset; bool force_mini_rec = false; - enum changelog_rec_flags flags; - enum changelog_rec_extra_flags xflags; ENTRY; @@ -857,7 +906,7 @@ static int llog_osd_next_block(const struct lu_env *env, o = loghandle->lgh_obj; LASSERT(o); - LASSERT(dt_object_exists(o)); + LASSERT(llog_osd_exist(loghandle)); dt = lu2dt_dev(o->do_lu.lo_dev); LASSERT(dt); @@ -865,8 +914,9 @@ static int llog_osd_next_block(const struct lu_env *env, if (rc) GOTO(out, rc); - CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off" - "%llu), size %llu\n", next_idx, *cur_idx, + CDEBUG(D_OTHER, + "looking for log index %u (cur idx %u off %llu), size %llu\n", + next_idx, *cur_idx, *cur_offset, lgi->lgi_attr.la_size); while (*cur_offset < lgi->lgi_attr.la_size) { @@ -921,9 +971,25 @@ static int llog_osd_next_block(const struct lu_env *env, rec = buf; if (LLOG_REC_HDR_NEEDS_SWABBING(rec)) lustre_swab_llog_rec(rec); - tail = (struct llog_rec_tail *)((char *)buf + rc - sizeof(struct llog_rec_tail)); + + if (llog_verify_record(loghandle, rec)) { + /* + * the block seems corrupted. make a pad record so the + * caller can skip the block and try with the next one + */ + rec->lrh_len = rc; + rec->lrh_index = next_idx; + rec->lrh_type = LLOG_PAD_MAGIC; + + tail = rec_tail(rec); + tail->lrt_len = rc; + tail->lrt_index = next_idx; + + GOTO(out, rc = 0); + } + /* get the last record in block */ last_rec = (struct llog_rec_hdr *)((char *)buf + rc - tail->lrt_len); @@ -932,9 +998,7 @@ static int llog_osd_next_block(const struct lu_env *env, lustre_swab_llog_rec(last_rec); if (last_rec->lrh_index != tail->lrt_index) { - CERROR("%s: invalid llog tail at log id "DFID":%x " - "offset %llu last_rec idx %u tail idx %u" - "lrt len %u read_size %d\n", + CERROR("%s: invalid llog tail at log id "DFID":%x offset %llu last_rec idx %u tail idx %u lrt len %u read_size %d\n", o->do_lu.lo_dev->ld_obd->obd_name, PFID(&loghandle->lgh_id.lgl_oi.oi_fid), loghandle->lgh_id.lgl_ogen, *cur_offset, @@ -962,7 +1026,7 @@ static int llog_osd_next_block(const struct lu_env *env, /* sanity check that the start of the new buffer is no farther * than the record that we wanted. This shouldn't happen. */ - if (rec->lrh_index > next_idx) { + if (next_idx && rec->lrh_index > next_idx) { if (!force_mini_rec && next_idx > last_idx) goto retry; @@ -973,16 +1037,7 @@ static int llog_osd_next_block(const struct lu_env *env, } /* Trim unsupported extensions for compat w/ older clients */ - flags = CLF_SUPPORTED; - xflags = CLFE_SUPPORTED; - if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_UIDGID)) - xflags &= ~CLFE_UIDGID; - if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_EXTRA_FLAGS)) - flags &= ~CLF_EXTRA_FLAGS; - if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID)) - flags &= ~CLF_JOBID; - if (flags != CLF_SUPPORTED || xflags != CLFE_SUPPORTED) - changelog_block_trim_ext(rec, last_rec, flags, xflags); + changelog_block_trim_ext(rec, last_rec, loghandle); GOTO(out, rc = 0); @@ -1025,8 +1080,6 @@ static int llog_osd_prev_block(const struct lu_env *env, struct dt_device *dt; loff_t cur_offset; __u32 chunk_size; - enum changelog_rec_flags flags; - enum changelog_rec_extra_flags xflags; int rc; ENTRY; @@ -1042,7 +1095,7 @@ static int llog_osd_prev_block(const struct lu_env *env, o = loghandle->lgh_obj; LASSERT(o); - LASSERT(dt_object_exists(o)); + LASSERT(llog_osd_exist(loghandle)); dt = lu2dt_dev(o->do_lu.lo_dev); LASSERT(dt); @@ -1119,16 +1172,7 @@ static int llog_osd_prev_block(const struct lu_env *env, } /* Trim unsupported extensions for compat w/ older clients */ - flags = CLF_SUPPORTED; - xflags = CLFE_SUPPORTED; - if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_UIDGID)) - xflags &= ~CLFE_UIDGID; - if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_EXTRA_FLAGS)) - flags &= ~CLF_EXTRA_FLAGS; - if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID)) - flags &= ~CLF_JOBID; - if (flags != CLF_SUPPORTED || xflags != CLFE_SUPPORTED) - changelog_block_trim_ext(rec, last_rec, flags, xflags); + changelog_block_trim_ext(rec, last_rec, loghandle); GOTO(out, rc = 0); } @@ -1223,8 +1267,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle, } else { /* If logid == NULL, then it means the caller needs * to allocate new FID (llog_cat_declare_add_rec()). */ - rc = obd_fid_alloc(env, ctxt->loc_exp, - &lgi->lgi_fid, NULL); + rc = dt_fid_alloc(env, dt, &lgi->lgi_fid, NULL, NULL); if (rc < 0) RETURN(rc); rc = 0; @@ -1285,7 +1328,12 @@ generate: GOTO(out, rc); new_id = true; } - + if (OBD_FAIL_PRECHECK(OBD_FAIL_MDS_LLOG_UMOUNT_RACE) && + cfs_fail_val == 1) { + cfs_fail_val = 2; + OBD_RACE(OBD_FAIL_MDS_LLOG_UMOUNT_RACE); + msleep(MSEC_PER_SEC); + } o = ls_locate(env, ls, &lgi->lgi_fid, NULL); if (IS_ERR(o)) GOTO(out_name, rc = PTR_ERR(o)); @@ -1417,7 +1465,7 @@ llog_osd_regular_fid_add_name_entry(const struct lu_env *env, (struct dt_key *)name, th); } else { rc = dt_insert(env, dir, (struct dt_rec *)rec, - (struct dt_key *)name, th, 1); + (struct dt_key *)name, th); } dt_write_unlock(env, dir); @@ -1584,8 +1632,7 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res, rec->rec_type = S_IFREG; dt_read_lock(env, llog_dir, 0); rc = dt_insert(env, llog_dir, (struct dt_rec *)rec, - (struct dt_key *)res->lgh_name, - th, 1); + (struct dt_key *)res->lgh_name, th); dt_read_unlock(env, llog_dir); dt_object_put(env, llog_dir); if (rc) @@ -1775,7 +1822,7 @@ static int llog_osd_destroy(const struct lu_env *env, LASSERT(o != NULL); dt_write_lock(env, o, 0); - if (!dt_object_exists(o)) + if (!llog_osd_exist(loghandle)) GOTO(out_unlock, rc = 0); if (loghandle->lgh_name) { @@ -1801,6 +1848,7 @@ static int llog_osd_destroy(const struct lu_env *env, if (rc < 0) GOTO(out_unlock, rc); + loghandle->lgh_destroyed = true; if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) { rc = llog_osd_regular_fid_del_name_entry(env, o, th, false); if (rc < 0) @@ -1986,7 +2034,7 @@ int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d, if (IS_ERR(th)) GOTO(out, rc = PTR_ERR(th)); - lgi->lgi_attr.la_valid = LA_MODE; + lgi->lgi_attr.la_valid = LA_MODE | LA_TYPE; lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR; lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);