X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fobdclass%2Fllog_osd.c;h=5aed12e9b9d6d431a95bf50730cbdfd686fb67d7;hb=ae1404feefc1572fdafed938a3fc18131d675678;hp=32d74d81919a5dc17a3a876ed69be4f9d2c53310;hpb=d10200a80770f0029d1d665af954187b9ad883df;p=fs%2Flustre-release.git diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index 32d74d8..5aed12e 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -23,11 +23,10 @@ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, 2016, Intel Corporation. + * Copyright (c) 2012, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ - * Lustre is a trademark of Sun Microsystems, Inc. */ /* * lustre/obdclass/llog_osd.c @@ -44,6 +43,8 @@ #define DEBUG_SUBSYSTEM S_LOG +#include + #include #include #include @@ -124,8 +125,7 @@ static int llog_osd_create_new_object(const struct lu_env *env, static int llog_osd_exist(struct llog_handle *handle) { LASSERT(handle->lgh_obj); - return dt_object_exists(handle->lgh_obj) && - !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header); + return dt_object_exists(handle->lgh_obj) && !handle->lgh_destroyed; } static void *rec_tail(struct llog_rec_hdr *rec) @@ -221,15 +221,17 @@ static int llog_osd_read_header(const struct lu_env *env, lgi = llog_info(env); + dt_read_lock(env, o, 0); + rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) - RETURN(rc); + GOTO(unlock, rc); LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); if (lgi->lgi_attr.la_size == 0) { CDEBUG(D_HA, "not reading header from 0-byte log\n"); - RETURN(LLOG_EEMPTY); + GOTO(unlock, rc = LLOG_EEMPTY); } flags = handle->lgh_hdr->llh_flags; @@ -248,7 +250,7 @@ static int llog_osd_read_header(const struct lu_env *env, if (rc >= 0) rc = -EFAULT; - RETURN(rc); + GOTO(unlock, rc); } if (LLOG_REC_HDR_NEEDS_SWABBING(llh_hdr)) @@ -260,7 +262,7 @@ static int llog_osd_read_header(const struct lu_env *env, handle->lgh_name ? handle->lgh_name : "", PFID(lu_object_fid(&o->do_lu)), llh_hdr->lrh_type, LLOG_HDR_MAGIC); - RETURN(-EIO); + GOTO(unlock, rc = -EIO); } else if (llh_hdr->lrh_len < LLOG_MIN_CHUNK_SIZE || llh_hdr->lrh_len > handle->lgh_hdr_size) { CERROR("%s: incorrectly sized log %s "DFID" header: " @@ -270,7 +272,7 @@ static int llog_osd_read_header(const struct lu_env *env, handle->lgh_name ? handle->lgh_name : "", PFID(lu_object_fid(&o->do_lu)), llh_hdr->lrh_len, LLOG_MIN_CHUNK_SIZE); - RETURN(-EIO); + GOTO(unlock, rc = -EIO); } else if (LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index > LLOG_HDR_BITMAP_SIZE(handle->lgh_hdr) || LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len != @@ -281,14 +283,16 @@ static int llog_osd_read_header(const struct lu_env *env, handle->lgh_name ? handle->lgh_name : "", PFID(lu_object_fid(&o->do_lu)), LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len, -EIO); - RETURN(-EIO); + GOTO(unlock, rc = -EIO); } handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK); handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index; - handle->lgh_write_offset = lgi->lgi_attr.la_size; + rc = 0; - RETURN(0); +unlock: + dt_read_unlock(env, o); + RETURN(rc); } /** @@ -363,7 +367,7 @@ static int llog_osd_declare_write_rec(const struct lu_env *env, * the full llog record to write. This is * the beginning of buffer to write, the length * of buffer is stored in \a rec::lrh_len - * \param[out] reccookie pointer to the cookie to return back if needed. + * \param[in,out] reccookie pointer to the cookie to return back if needed. * It is used for further cancel of this llog * record. * \param[in] idx index of the llog record. If \a idx == -1 then @@ -381,16 +385,16 @@ static int llog_osd_write_rec(const struct lu_env *env, struct llog_cookie *reccookie, int idx, struct thandle *th) { - struct llog_thread_info *lgi = llog_info(env); - struct llog_log_hdr *llh; - int reclen = rec->lrh_len; - int index, rc; - struct llog_rec_tail *lrt; - struct dt_object *o; - __u32 chunk_size; - size_t left; - __u32 orig_last_idx; - __u64 orig_write_offset; + struct llog_thread_info *lgi = llog_info(env); + struct llog_log_hdr *llh; + int reclen = rec->lrh_len; + int index, rc; + struct llog_rec_tail *lrt; + struct dt_object *o; + __u32 chunk_size; + size_t left; + __u32 orig_last_idx; + bool pad = false; ENTRY; llh = loghandle->lgh_hdr; @@ -413,6 +417,9 @@ static int llog_osd_write_rec(const struct lu_env *env, LASSERT(llh->llh_size == reclen); } + /* return error if osp object is stale */ + if (idx != LLOG_HEADER_IDX && dt_object_stale(o)) + RETURN(-ESTALE); rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) RETURN(rc); @@ -438,7 +445,7 @@ static int llog_osd_write_rec(const struct lu_env *env, /* llog can be empty only when first record is being written */ LASSERT(ergo(idx > 0, lgi->lgi_attr.la_size > 0)); - if (!ext2_test_bit(idx, LLOG_HDR_BITMAP(llh))) { + if (!test_bit_le(idx, LLOG_HDR_BITMAP(llh))) { CERROR("%s: modify unset record %u\n", o->do_lu.lo_dev->ld_obd->obd_name, idx); RETURN(-ENOENT); @@ -492,26 +499,26 @@ static int llog_osd_write_rec(const struct lu_env *env, &lgi->lgi_off, th); RETURN(rc); - } else if (loghandle->lgh_cur_idx > 0) { + } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { + lgi->lgi_off = llh->llh_hdr.lrh_len + + (idx - 1) * reclen; + } else if (reccookie != NULL && reccookie->lgc_index > 0) { /** - * The lgh_cur_offset can be used only if index is + * The lgc_offset can be used only if index is * the same. */ - if (idx != loghandle->lgh_cur_idx) { + if (idx != reccookie->lgc_index) { CERROR("%s: modify index mismatch %d %d\n", o->do_lu.lo_dev->ld_obd->obd_name, idx, - loghandle->lgh_cur_idx); + reccookie->lgc_index); RETURN(-EFAULT); } - lgi->lgi_off = loghandle->lgh_cur_offset; - CDEBUG(D_OTHER, "modify record "DOSTID": idx:%d, " + lgi->lgi_off = reccookie->lgc_offset; + CDEBUG(D_OTHER, "modify record "DFID": idx:%u, " "len:%u offset %llu\n", - POSTID(&loghandle->lgh_id.lgl_oi), idx, + PFID(&loghandle->lgh_id.lgl_oi.oi_fid), idx, rec->lrh_len, (long long)lgi->lgi_off); - } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { - lgi->lgi_off = llh->llh_hdr.lrh_len + - (idx - 1) * reclen; } else { /* This can be result of lgh_cur_idx is not set during * llog processing or llh_size is not set to proper @@ -556,16 +563,14 @@ static int llog_osd_write_rec(const struct lu_env *env, LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); orig_last_idx = loghandle->lgh_last_idx; - orig_write_offset = loghandle->lgh_write_offset; lgi->lgi_off = lgi->lgi_attr.la_size; if (loghandle->lgh_max_size > 0 && lgi->lgi_off >= loghandle->lgh_max_size) { CDEBUG(D_OTHER, "llog is getting too large (%u > %u) at %u " - DOSTID"\n", (unsigned)lgi->lgi_off, - loghandle->lgh_max_size, - (int)loghandle->lgh_last_idx, - POSTID(&loghandle->lgh_id.lgl_oi)); + DFID"\n", (unsigned)lgi->lgi_off, + loghandle->lgh_max_size, (int)loghandle->lgh_last_idx, + PFID(&loghandle->lgh_id.lgl_oi.oi_fid)); /* this is to signal that this llog is full */ loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1; RETURN(-ENOSPC); @@ -580,10 +585,8 @@ static int llog_osd_write_rec(const struct lu_env *env, if (rc) RETURN(rc); - if (dt_object_remote(o)) - loghandle->lgh_write_offset = lgi->lgi_off; - loghandle->lgh_last_idx++; /* for pad rec */ + pad = true; } /* if it's the last idx in log file, then return -ENOSPC * or wrap around if a catalog */ @@ -597,6 +600,7 @@ static int llog_osd_write_rec(const struct lu_env *env, RETURN(-ENOSPC); } + down_write(&loghandle->lgh_last_sem); /* increment the last_idx along with llh_tail index, they should * be equal for a llog lifetime */ loghandle->lgh_last_idx++; @@ -616,9 +620,10 @@ static int llog_osd_write_rec(const struct lu_env *env, /* the lgh_hdr_mutex protects llog header data from concurrent * update/cancel, the llh_count and llh_bitmap are protected */ mutex_lock(&loghandle->lgh_hdr_mutex); - if (ext2_set_bit(index, LLOG_HDR_BITMAP(llh))) { - CERROR("%s: index %u already set in log bitmap\n", - o->do_lu.lo_dev->ld_obd->obd_name, index); + if (__test_and_set_bit_le(index, LLOG_HDR_BITMAP(llh))) { + CERROR("%s: index %u already set in llog bitmap "DFID"\n", + o->do_lu.lo_dev->ld_obd->obd_name, index, + PFID(lu_object_fid(&o->do_lu))); mutex_unlock(&loghandle->lgh_hdr_mutex); LBUG(); /* should never happen */ } @@ -632,6 +637,14 @@ static int llog_osd_write_rec(const struct lu_env *env, llh->llh_size = reclen; } + /* + * readers (e.g. llog_osd_read_header()) must not find + * llog updated partially (bitmap/counter claims record, + * but a record hasn't been added yet) as this results + * in EIO. + */ + dt_write_lock(env, o, 0); + if (lgi->lgi_attr.la_size == 0) { lgi->lgi_off = 0; lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len; @@ -673,24 +686,36 @@ static int llog_osd_write_rec(const struct lu_env *env, if (rc != 0) GOTO(out_unlock, rc); } + if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PAUSE_AFTER_PAD) && pad) { + /* a window for concurrent llog reader, see LU-12577 */ + OBD_FAIL_TIMEOUT(OBD_FAIL_LLOG_PAUSE_AFTER_PAD, + cfs_fail_val ?: 1); + } out_unlock: /* unlock here for remote object */ mutex_unlock(&loghandle->lgh_hdr_mutex); - if (rc) + if (rc) { + dt_write_unlock(env, o); GOTO(out, rc); + } + if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PROCESS_TIMEOUT) && + cfs_fail_val == (unsigned int)(loghandle->lgh_id.lgl_oi.oi.oi_id & + 0xFFFFFFFF)) { + OBD_RACE(OBD_FAIL_LLOG_PROCESS_TIMEOUT); + msleep(1 * MSEC_PER_SEC); + } /* computed index can be used to determine offset for fixed-size * records. This also allows to handle Catalog wrap around case */ if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen; - } else if (dt_object_remote(o)) { - lgi->lgi_off = max_t(__u64, loghandle->lgh_write_offset, - lgi->lgi_off); } else { rc = dt_attr_get(env, o, &lgi->lgi_attr); - if (rc) + if (rc) { + dt_write_unlock(env, o); GOTO(out, rc); + } LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size, @@ -700,13 +725,14 @@ out_unlock: lgi->lgi_buf.lb_len = reclen; lgi->lgi_buf.lb_buf = rec; rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); + + dt_write_unlock(env, o); if (rc < 0) GOTO(out, rc); - if (dt_object_remote(o)) - loghandle->lgh_write_offset = lgi->lgi_off; + up_write(&loghandle->lgh_last_sem); - CDEBUG(D_HA, "added record "DFID": idx: %u, %u off%llu\n", + CDEBUG(D_HA, "added record "DFID".%u, %u off%llu\n", PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len, lgi->lgi_off); if (reccookie != NULL) { @@ -725,14 +751,13 @@ out_unlock: out: /* cleanup llog for error case */ mutex_lock(&loghandle->lgh_hdr_mutex); - ext2_clear_bit(index, LLOG_HDR_BITMAP(llh)); + clear_bit_le(index, LLOG_HDR_BITMAP(llh)); llh->llh_count--; mutex_unlock(&loghandle->lgh_hdr_mutex); /* restore llog last_idx */ if (dt_object_remote(o)) { loghandle->lgh_last_idx = orig_last_idx; - loghandle->lgh_write_offset = orig_write_offset; } else if (--loghandle->lgh_last_idx == 0 && (llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) { /* catalog had just wrap-around case */ @@ -740,6 +765,7 @@ out: } LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx; + up_write(&loghandle->lgh_last_sem); RETURN(rc); } @@ -795,22 +821,67 @@ static inline void llog_skip_over(struct llog_handle *lgh, __u64 *off, * big enough to handle the remapped records. It is also assumed that records * of a block have the same format (i.e.: the same features enabled). * - * \param[in,out] hdr Header of the block of records to remap. - * \param[in,out] last_hdr Last header, don't read past this point. - * \param[in] flags Flags describing the fields to keep. + * \param[in,out] hdr Header of the block of records to remap. + * \param[in,out] last_hdr Last header, don't read past this point. + * \param[in] flags Flags describing the fields to keep. + * \param[in] extra_flags Flags describing the extra fields to keep. */ static void changelog_block_trim_ext(struct llog_rec_hdr *hdr, struct llog_rec_hdr *last_hdr, - enum changelog_rec_flags flags) + struct llog_handle *loghandle) { + enum changelog_rec_flags flags = CLF_SUPPORTED; + enum changelog_rec_extra_flags extra_flags = CLFE_SUPPORTED; + + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_XATTR)) + extra_flags &= ~CLFE_XATTR; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_OMODE)) + extra_flags &= ~CLFE_OPEN; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_NID)) + extra_flags &= ~CLFE_NID; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_UIDGID)) + extra_flags &= ~CLFE_UIDGID; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_EXTRA_FLAGS)) + flags &= ~CLF_EXTRA_FLAGS; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID)) + flags &= ~CLF_JOBID; + + if (flags == CLF_SUPPORTED && extra_flags == CLFE_SUPPORTED) + return; + if (hdr->lrh_type != CHANGELOG_REC) return; do { struct changelog_rec *rec = (struct changelog_rec *)(hdr + 1); + enum changelog_rec_extra_flags xflag = CLFE_INVALID; + + if (flags & CLF_EXTRA_FLAGS && + rec->cr_flags & CLF_EXTRA_FLAGS) { + xflag = changelog_rec_extra_flags(rec)->cr_extra_flags & + extra_flags; + } + + if (unlikely(hdr->lrh_len == 0)) { + /* It is corruption case, we cannot know the next rec, + * jump to the last one directly to avoid dead loop. */ + LCONSOLE(D_WARNING, "Hit invalid llog record: " + "idx %u, type %u, id %u\n", + hdr->lrh_index, hdr->lrh_type, hdr->lrh_id); + hdr = llog_rec_hdr_next(last_hdr); + if (unlikely(hdr == last_hdr)) + LCONSOLE(D_WARNING, "The last record crashed: " + "idx %u, type %u, id %u\n", + hdr->lrh_index, hdr->lrh_type, + hdr->lrh_id); + break; + } - changelog_remap_rec(rec, rec->cr_flags & flags); + changelog_remap_rec(rec, rec->cr_flags & flags, xflag); hdr = llog_rec_hdr_next(hdr); + /* Yield CPU to avoid soft-lockup if there are too many records + * to be handled. */ + cond_resched(); } while ((char *)hdr <= (char *)last_hdr); } @@ -858,9 +929,18 @@ static int llog_osd_next_block(const struct lu_env *env, LASSERT(loghandle); LASSERT(loghandle->lgh_ctxt); + if (OBD_FAIL_PRECHECK(OBD_FAIL_MDS_CHANGELOG_DEL) && + cfs_fail_val == ((unsigned long)loghandle & 0xFFFFFFFF)) { + OBD_RACE(OBD_FAIL_MDS_CHANGELOG_DEL); + msleep(MSEC_PER_SEC >> 2); + } + o = loghandle->lgh_obj; LASSERT(o); - LASSERT(dt_object_exists(o)); + dt_read_lock(env, o, 0); + if (!llog_osd_exist(loghandle)) + GOTO(out, rc = -ESTALE); //object was destroyed + dt = lu2dt_dev(o->do_lu.lo_dev); LASSERT(dt); @@ -868,8 +948,9 @@ static int llog_osd_next_block(const struct lu_env *env, if (rc) GOTO(out, rc); - CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off" - "%llu), size %llu\n", next_idx, *cur_idx, + CDEBUG(D_OTHER, + "looking for log index %u (cur idx %u off %llu), size %llu\n", + next_idx, *cur_idx, *cur_offset, lgi->lgi_attr.la_size); while (*cur_offset < lgi->lgi_attr.la_size) { @@ -913,10 +994,10 @@ static int llog_osd_next_block(const struct lu_env *env, if (!force_mini_rec) goto retry; - CERROR("%s: invalid llog block at log id "DOSTID"/%u " + CERROR("%s: invalid llog block at log id "DFID":%x " "offset %llu\n", o->do_lu.lo_dev->ld_obd->obd_name, - POSTID(&loghandle->lgh_id.lgl_oi), + PFID(&loghandle->lgh_id.lgl_oi.oi_fid), loghandle->lgh_id.lgl_ogen, *cur_offset); GOTO(out, rc = -EINVAL); } @@ -924,9 +1005,25 @@ static int llog_osd_next_block(const struct lu_env *env, rec = buf; if (LLOG_REC_HDR_NEEDS_SWABBING(rec)) lustre_swab_llog_rec(rec); - tail = (struct llog_rec_tail *)((char *)buf + rc - sizeof(struct llog_rec_tail)); + + if (llog_verify_record(loghandle, rec)) { + /* + * the block seems corrupted. make a pad record so the + * caller can skip the block and try with the next one + */ + rec->lrh_len = rc; + rec->lrh_index = next_idx; + rec->lrh_type = LLOG_PAD_MAGIC; + + tail = rec_tail(rec); + tail->lrt_len = rc; + tail->lrt_index = next_idx; + + GOTO(out, rc = 0); + } + /* get the last record in block */ last_rec = (struct llog_rec_hdr *)((char *)buf + rc - tail->lrt_len); @@ -935,12 +1032,12 @@ static int llog_osd_next_block(const struct lu_env *env, lustre_swab_llog_rec(last_rec); if (last_rec->lrh_index != tail->lrt_index) { - CERROR("%s: invalid llog tail at log id "DOSTID"/%u " - "offset %llu last_rec idx %u tail idx %u\n", + CERROR("%s: invalid llog tail at log id "DFID":%x offset %llu last_rec idx %u tail idx %u lrt len %u read_size %d\n", o->do_lu.lo_dev->ld_obd->obd_name, - POSTID(&loghandle->lgh_id.lgl_oi), + PFID(&loghandle->lgh_id.lgl_oi.oi_fid), loghandle->lgh_id.lgl_ogen, *cur_offset, - last_rec->lrh_index, tail->lrt_index); + last_rec->lrh_index, tail->lrt_index, + tail->lrt_len, rc); GOTO(out, rc = -EINVAL); } @@ -948,10 +1045,10 @@ static int llog_osd_next_block(const struct lu_env *env, /* this shouldn't happen */ if (tail->lrt_index == 0) { - CERROR("%s: invalid llog tail at log id "DOSTID"/%u " + CERROR("%s: invalid llog tail at log id "DFID":%x " "offset %llu bytes %d\n", o->do_lu.lo_dev->ld_obd->obd_name, - POSTID(&loghandle->lgh_id.lgl_oi), + PFID(&loghandle->lgh_id.lgl_oi.oi_fid), loghandle->lgh_id.lgl_ogen, *cur_offset, rc); GOTO(out, rc = -EINVAL); } @@ -963,7 +1060,7 @@ static int llog_osd_next_block(const struct lu_env *env, /* sanity check that the start of the new buffer is no farther * than the record that we wanted. This shouldn't happen. */ - if (rec->lrh_index > next_idx) { + if (next_idx && rec->lrh_index > next_idx) { if (!force_mini_rec && next_idx > last_idx) goto retry; @@ -974,9 +1071,7 @@ static int llog_osd_next_block(const struct lu_env *env, } /* Trim unsupported extensions for compat w/ older clients */ - if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID)) - changelog_block_trim_ext(rec, last_rec, - CLF_VERSION | CLF_RENAME); + changelog_block_trim_ext(rec, last_rec, loghandle); GOTO(out, rc = 0); @@ -990,6 +1085,7 @@ retry: } GOTO(out, rc = -EIO); out: + dt_read_unlock(env, o); return rc; } @@ -1034,7 +1130,10 @@ static int llog_osd_prev_block(const struct lu_env *env, o = loghandle->lgh_obj; LASSERT(o); - LASSERT(dt_object_exists(o)); + dt_read_lock(env, o, 0); + if (!llog_osd_exist(loghandle)) + GOTO(out, rc = -ESTALE); + dt = lu2dt_dev(o->do_lu.lo_dev); LASSERT(dt); @@ -1067,10 +1166,10 @@ static int llog_osd_prev_block(const struct lu_env *env, GOTO(out, rc); if (rc < sizeof(*tail)) { - CERROR("%s: invalid llog block at log id "DOSTID"/%u " + CERROR("%s: invalid llog block at log id "DFID":%x " "offset %llu\n", o->do_lu.lo_dev->ld_obd->obd_name, - POSTID(&loghandle->lgh_id.lgl_oi), + PFID(&loghandle->lgh_id.lgl_oi.oi_fid), loghandle->lgh_id.lgl_ogen, cur_offset); GOTO(out, rc = -EINVAL); } @@ -1091,10 +1190,10 @@ static int llog_osd_prev_block(const struct lu_env *env, /* this shouldn't happen */ if (tail->lrt_index == 0) { - CERROR("%s: invalid llog tail at log id "DOSTID"/%u " + CERROR("%s: invalid llog tail at log id "DFID":%x " "offset %llu\n", o->do_lu.lo_dev->ld_obd->obd_name, - POSTID(&loghandle->lgh_id.lgl_oi), + PFID(&loghandle->lgh_id.lgl_oi.oi_fid), loghandle->lgh_id.lgl_ogen, cur_offset); GOTO(out, rc = -EINVAL); } @@ -1111,14 +1210,13 @@ static int llog_osd_prev_block(const struct lu_env *env, } /* Trim unsupported extensions for compat w/ older clients */ - if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID)) - changelog_block_trim_ext(rec, last_rec, - CLF_VERSION | CLF_RENAME); + changelog_block_trim_ext(rec, last_rec, loghandle); GOTO(out, rc = 0); } GOTO(out, rc = -EIO); out: + dt_read_unlock(env, o); return rc; } @@ -1148,7 +1246,7 @@ static struct dt_object *llog_osd_dir_get(const struct lu_env *env, dir = dt_locate(env, dt, &dti->dti_fid); if (!IS_ERR(dir) && !dt_try_as_dir(env, dir)) { - lu_object_put(env, &dir->do_lu); + dt_object_put(env, dir); return ERR_PTR(-ENOTDIR); } } else { @@ -1208,8 +1306,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle, } else { /* If logid == NULL, then it means the caller needs * to allocate new FID (llog_cat_declare_add_rec()). */ - rc = obd_fid_alloc(env, ctxt->loc_exp, - &lgi->lgi_fid, NULL); + rc = dt_fid_alloc(env, dt, &lgi->lgi_fid, NULL, NULL); if (rc < 0) RETURN(rc); rc = 0; @@ -1247,7 +1344,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle, dt_read_lock(env, llog_dir, 0); rc = dt_lookup_dir(env, llog_dir, name, &lgi->lgi_fid); dt_read_unlock(env, llog_dir); - lu_object_put(env, &llog_dir->do_lu); + dt_object_put(env, llog_dir); if (rc == -ENOENT && open_param == LLOG_OPEN_NEW) { /* generate fid for new llog */ rc = local_object_fid_generate(env, los, @@ -1270,7 +1367,12 @@ generate: GOTO(out, rc); new_id = true; } - + if (OBD_FAIL_PRECHECK(OBD_FAIL_MDS_LLOG_UMOUNT_RACE) && + cfs_fail_val == 1) { + cfs_fail_val = 2; + OBD_RACE(OBD_FAIL_MDS_LLOG_UMOUNT_RACE); + msleep(MSEC_PER_SEC); + } o = ls_locate(env, ls, &lgi->lgi_fid, NULL); if (IS_ERR(o)) GOTO(out_name, rc = PTR_ERR(o)); @@ -1282,7 +1384,7 @@ generate: ", skipping\n", o->do_lu.lo_dev->ld_obd->obd_name, PFID(lu_object_fid(&o->do_lu))); - lu_object_put(env, &o->do_lu); + dt_object_put(env, o); /* just skip this llog ID, we shouldn't delete it because we * don't know exactly what is its purpose and state. */ goto generate; @@ -1290,9 +1392,12 @@ generate: after_open: /* No new llog is expected but doesn't exist */ - if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o)) + if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o)) { + CDEBUG(D_INFO, "%s: llog FID: "DFID" obj %p doesn`t exist\n", + o->do_lu.lo_dev->ld_obd->obd_name, + PFID(lu_object_fid(&o->do_lu)), o); GOTO(out_put, rc = -ENOENT); - + } fid_to_logid(&lgi->lgi_fid, &handle->lgh_id); handle->lgh_obj = o; handle->private_data = los; @@ -1301,7 +1406,7 @@ after_open: RETURN(rc); out_put: - lu_object_put(env, &o->do_lu); + dt_object_put(env, o); out_name: if (handle->lgh_name != NULL) OBD_FREE(handle->lgh_name, strlen(name) + 1); @@ -1348,7 +1453,7 @@ struct dt_object *llog_osd_get_regular_fid_dir(const struct lu_env *env, RETURN(dir); if (!dt_try_as_dir(env, dir)) { - lu_object_put(env, &dir->do_lu); + dt_object_put(env, dir); RETURN(ERR_PTR(-ENOTDIR)); } @@ -1399,11 +1504,11 @@ llog_osd_regular_fid_add_name_entry(const struct lu_env *env, (struct dt_key *)name, th); } else { rc = dt_insert(env, dir, (struct dt_rec *)rec, - (struct dt_key *)name, th, 1); + (struct dt_key *)name, th); } dt_write_unlock(env, dir); - lu_object_put(env, &dir->do_lu); + dt_object_put(env, dir); RETURN(rc); } @@ -1480,7 +1585,7 @@ static int llog_osd_declare_create(const struct lu_env *env, rc = dt_declare_insert(env, llog_dir, (struct dt_rec *)rec, (struct dt_key *)res->lgh_name, th); - lu_object_put(env, &llog_dir->do_lu); + dt_object_put(env, llog_dir); if (rc) CERROR("%s: can't declare named llog %s: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, @@ -1566,10 +1671,9 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res, rec->rec_type = S_IFREG; dt_read_lock(env, llog_dir, 0); rc = dt_insert(env, llog_dir, (struct dt_rec *)rec, - (struct dt_key *)res->lgh_name, - th, 1); + (struct dt_key *)res->lgh_name, th); dt_read_unlock(env, llog_dir); - lu_object_put(env, &llog_dir->do_lu); + dt_object_put(env, llog_dir); if (rc) CERROR("%s: can't create named llog %s: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, @@ -1602,11 +1706,11 @@ static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle) if (handle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) { /* Remove the object from the cache, otherwise it may * hold LOD being released during cleanup process */ - lu_object_put_nocache(env, &handle->lgh_obj->do_lu); + dt_object_put_nocache(env, handle->lgh_obj); LASSERT(handle->private_data == NULL); RETURN(rc); } else { - lu_object_put(env, &handle->lgh_obj->do_lu); + dt_object_put(env, handle->lgh_obj); } los = handle->private_data; LASSERT(los); @@ -1661,7 +1765,7 @@ llog_osd_regular_fid_del_name_entry(const struct lu_env *env, } dt_write_unlock(env, dir); - lu_object_put(env, &dir->do_lu); + dt_object_put(env, dir); RETURN(rc); } @@ -1721,7 +1825,7 @@ static int llog_osd_declare_destroy(const struct lu_env *env, out_put: if (!(IS_ERR_OR_NULL(llog_dir))) - lu_object_put(env, &llog_dir->do_lu); + dt_object_put(env, llog_dir); RETURN(rc); } @@ -1757,7 +1861,7 @@ static int llog_osd_destroy(const struct lu_env *env, LASSERT(o != NULL); dt_write_lock(env, o, 0); - if (!dt_object_exists(o)) + if (!llog_osd_exist(loghandle)) GOTO(out_unlock, rc = 0); if (loghandle->lgh_name) { @@ -1783,6 +1887,7 @@ static int llog_osd_destroy(const struct lu_env *env, if (rc < 0) GOTO(out_unlock, rc); + loghandle->lgh_destroyed = true; if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) { rc = llog_osd_regular_fid_del_name_entry(env, o, th, false); if (rc < 0) @@ -1792,7 +1897,7 @@ static int llog_osd_destroy(const struct lu_env *env, out_unlock: dt_write_unlock(env, o); if (!(IS_ERR_OR_NULL(llog_dir))) - lu_object_put(env, &llog_dir->do_lu); + dt_object_put(env, llog_dir); RETURN(rc); } @@ -1883,7 +1988,7 @@ static int llog_osd_cleanup(const struct lu_env *env, struct llog_ctxt *ctxt) return 0; } -struct llog_operations llog_osd_ops = { +const struct llog_operations llog_osd_ops = { .lop_next_block = llog_osd_next_block, .lop_prev_block = llog_osd_prev_block, .lop_read_header = llog_osd_read_header, @@ -1901,7 +2006,7 @@ struct llog_operations llog_osd_ops = { }; EXPORT_SYMBOL(llog_osd_ops); -struct llog_operations llog_common_cat_ops = { +const struct llog_operations llog_common_cat_ops = { .lop_next_block = llog_osd_next_block, .lop_prev_block = llog_osd_prev_block, .lop_read_header = llog_osd_read_header, @@ -1968,7 +2073,7 @@ int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d, if (IS_ERR(th)) GOTO(out, rc = PTR_ERR(th)); - lgi->lgi_attr.la_valid = LA_MODE; + lgi->lgi_attr.la_valid = LA_MODE | LA_TYPE; lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR; lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG); @@ -2038,7 +2143,7 @@ out_trans: EXIT; out: - lu_object_put(env, &o->do_lu); + dt_object_put(env, o); RETURN(rc); } EXPORT_SYMBOL(llog_osd_get_cat_list); @@ -2128,7 +2233,7 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d, out_trans: dt_trans_stop(env, d, th); out: - lu_object_put(env, &o->do_lu); + dt_object_put(env, o); RETURN(rc); } EXPORT_SYMBOL(llog_osd_put_cat_list);