X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdclass%2Fllog_osd.c;h=92d71869e5a914d7defacd485fe20ac641a1fb8f;hp=519fd270993817bef26a003363f11c55a22687d8;hb=82c6e42d6137f39a1f2394b7bc6e8d600eb36181;hpb=f17796cf08df5004fa68b9b4a9b0e221559389dc diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index 519fd27..92d7186 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -23,7 +23,7 @@ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, 2014 Intel Corporation. + * Copyright (c) 2012, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -44,10 +44,13 @@ #define DEBUG_SUBSYSTEM S_LOG +#include + +#include +#include +#include #include #include -#include -#include #include "llog_internal.h" #include "local_storage.h" @@ -111,6 +114,28 @@ static int llog_osd_create_new_object(const struct lu_env *env, } /** + * Implementation of the llog_operations::lop_exist + * + * This function checks that llog exists on storage. + * + * \param[in] handle llog handle of the current llog + * + * \retval true if llog object exists and is not just destroyed + * \retval false if llog doesn't exist or just destroyed + */ +static int llog_osd_exist(struct llog_handle *handle) +{ + LASSERT(handle->lgh_obj); + return dt_object_exists(handle->lgh_obj) && !handle->lgh_destroyed; +} + +static void *rec_tail(struct llog_rec_hdr *rec) +{ + return (void *)((char *)rec + rec->lrh_len - + sizeof(struct llog_rec_tail)); +} + +/** * Write a padding record to the llog * * This function writes a padding record to the end of llog. That may @@ -338,7 +363,7 @@ static int llog_osd_declare_write_rec(const struct lu_env *env, * the full llog record to write. This is * the beginning of buffer to write, the length * of buffer is stored in \a rec::lrh_len - * \param[out] reccookie pointer to the cookie to return back if needed. + * \param[in,out] reccookie pointer to the cookie to return back if needed. * It is used for further cancel of this llog * record. * \param[in] idx index of the llog record. If \a idx == -1 then @@ -364,24 +389,32 @@ static int llog_osd_write_rec(const struct lu_env *env, struct dt_object *o; __u32 chunk_size; size_t left; - + __u32 orig_last_idx; ENTRY; - LASSERT(env); llh = loghandle->lgh_hdr; - LASSERT(llh); o = loghandle->lgh_obj; - LASSERT(o); - LASSERT(th); chunk_size = llh->llh_hdr.lrh_len; CDEBUG(D_OTHER, "new record %x to "DFID"\n", rec->lrh_type, PFID(lu_object_fid(&o->do_lu))); + if (!llog_osd_exist(loghandle)) + RETURN(-ENOENT); + /* record length should not bigger than */ if (reclen > loghandle->lgh_hdr->llh_hdr.lrh_len) RETURN(-E2BIG); + /* sanity check for fixed-records llog */ + if (idx != LLOG_HEADER_IDX && (llh->llh_flags & LLOG_F_IS_FIXSIZE)) { + LASSERT(llh->llh_size != 0); + LASSERT(llh->llh_size == reclen); + } + + /* return error if osp object is stale */ + if (idx != LLOG_HEADER_IDX && dt_object_stale(o)) + RETURN(-ESTALE); rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) RETURN(rc); @@ -407,7 +440,7 @@ static int llog_osd_write_rec(const struct lu_env *env, /* llog can be empty only when first record is being written */ LASSERT(ergo(idx > 0, lgi->lgi_attr.la_size > 0)); - if (!ext2_test_bit(idx, LLOG_HDR_BITMAP(llh))) { + if (!test_bit_le(idx, LLOG_HDR_BITMAP(llh))) { CERROR("%s: modify unset record %u\n", o->do_lu.lo_dev->ld_obd->obd_name, idx); RETURN(-ENOENT); @@ -422,41 +455,65 @@ static int llog_osd_write_rec(const struct lu_env *env, if (idx == LLOG_HEADER_IDX) { /* llog header update */ - LASSERT(reclen >= sizeof(struct llog_log_hdr)); - LASSERT(rec == &llh->llh_hdr); + __u32 *bitmap = LLOG_HDR_BITMAP(llh); lgi->lgi_off = 0; - lgi->lgi_buf.lb_len = reclen; - lgi->lgi_buf.lb_buf = rec; + + /* If it does not indicate the bitmap index + * (reccookie == NULL), then it means update + * the whole update header. Otherwise only + * update header and bits needs to be updated, + * and in DNE cases, it will signaficantly + * shrink the RPC size. + * see distribute_txn_cancel_records()*/ + if (reccookie == NULL) { + lgi->lgi_buf.lb_len = reclen; + lgi->lgi_buf.lb_buf = rec; + rc = dt_record_write(env, o, &lgi->lgi_buf, + &lgi->lgi_off, th); + RETURN(rc); + } + + /* update the header */ + lgi->lgi_buf.lb_len = llh->llh_bitmap_offset; + lgi->lgi_buf.lb_buf = llh; rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); + if (rc != 0) + RETURN(rc); + + /* update the bitmap */ + index = reccookie->lgc_index; + lgi->lgi_off = llh->llh_bitmap_offset + + (index / (sizeof(*bitmap) * 8)) * + sizeof(*bitmap); + lgi->lgi_buf.lb_len = sizeof(*bitmap); + lgi->lgi_buf.lb_buf = + &bitmap[index/(sizeof(*bitmap)*8)]; + rc = dt_record_write(env, o, &lgi->lgi_buf, + &lgi->lgi_off, th); + RETURN(rc); - } else if (loghandle->lgh_cur_idx > 0) { + } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { + lgi->lgi_off = llh->llh_hdr.lrh_len + + (idx - 1) * reclen; + } else if (reccookie != NULL && reccookie->lgc_index > 0) { /** - * The lgh_cur_offset can be used only if index is + * The lgc_offset can be used only if index is * the same. */ - if (idx != loghandle->lgh_cur_idx) { + if (idx != reccookie->lgc_index) { CERROR("%s: modify index mismatch %d %d\n", o->do_lu.lo_dev->ld_obd->obd_name, idx, - loghandle->lgh_cur_idx); + reccookie->lgc_index); RETURN(-EFAULT); } - lgi->lgi_off = loghandle->lgh_cur_offset; - CDEBUG(D_OTHER, "modify record "DOSTID": idx:%d, " + lgi->lgi_off = reccookie->lgc_offset; + CDEBUG(D_OTHER, "modify record "DFID": idx:%u, " "len:%u offset %llu\n", - POSTID(&loghandle->lgh_id.lgl_oi), idx, + PFID(&loghandle->lgh_id.lgl_oi.oi_fid), idx, rec->lrh_len, (long long)lgi->lgi_off); - } else if (llh->llh_size > 0) { - if (llh->llh_size != rec->lrh_len) { - CERROR("%s: wrong record size, llh_size is %u" - " but record size is %u\n", - o->do_lu.lo_dev->ld_obd->obd_name, - llh->llh_size, rec->lrh_len); - RETURN(-EINVAL); - } - lgi->lgi_off = sizeof(*llh) + (idx - 1) * reclen; } else { /* This can be result of lgh_cur_idx is not set during * llog processing or llh_size is not set to proper @@ -491,8 +548,29 @@ static int llog_osd_write_rec(const struct lu_env *env, * process them page-at-a-time if needed. If it will cross a chunk * boundary, write in a fake (but referenced) entry to pad the chunk. */ + + + /* simulate ENOSPC when new plain llog is being added to the + * catalog */ + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED2) && + llh->llh_flags & LLOG_F_IS_CAT) + RETURN(-ENOSPC); + LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); + orig_last_idx = loghandle->lgh_last_idx; lgi->lgi_off = lgi->lgi_attr.la_size; + + if (loghandle->lgh_max_size > 0 && + lgi->lgi_off >= loghandle->lgh_max_size) { + CDEBUG(D_OTHER, "llog is getting too large (%u > %u) at %u " + DFID"\n", (unsigned)lgi->lgi_off, + loghandle->lgh_max_size, (int)loghandle->lgh_last_idx, + PFID(&loghandle->lgh_id.lgl_oi.oi_fid)); + /* this is to signal that this llog is full */ + loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1; + RETURN(-ENOSPC); + } + left = chunk_size - (lgi->lgi_off & (chunk_size - 1)); /* NOTE: padding is a record, but no bit is set */ if (left != 0 && left != reclen && @@ -501,12 +579,22 @@ static int llog_osd_write_rec(const struct lu_env *env, rc = llog_osd_pad(env, o, &lgi->lgi_off, left, index, th); if (rc) RETURN(rc); + loghandle->lgh_last_idx++; /* for pad rec */ } - /* if it's the last idx in log file, then return -ENOSPC */ - if (loghandle->lgh_last_idx >= LLOG_HDR_BITMAP_SIZE(llh) - 1) - RETURN(-ENOSPC); + /* if it's the last idx in log file, then return -ENOSPC + * or wrap around if a catalog */ + if (llog_is_full(loghandle) || + unlikely(llh->llh_flags & LLOG_F_IS_CAT && + OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) && + loghandle->lgh_last_idx >= cfs_fail_val)) { + if (llh->llh_flags & LLOG_F_IS_CAT) + loghandle->lgh_last_idx = 0; + else + RETURN(-ENOSPC); + } + down_write(&loghandle->lgh_last_sem); /* increment the last_idx along with llh_tail index, they should * be equal for a llog lifetime */ loghandle->lgh_last_idx++; @@ -526,14 +614,23 @@ static int llog_osd_write_rec(const struct lu_env *env, /* the lgh_hdr_mutex protects llog header data from concurrent * update/cancel, the llh_count and llh_bitmap are protected */ mutex_lock(&loghandle->lgh_hdr_mutex); - if (ext2_set_bit(index, LLOG_HDR_BITMAP(llh))) { - CERROR("%s: index %u already set in log bitmap\n", - o->do_lu.lo_dev->ld_obd->obd_name, index); + if (__test_and_set_bit_le(index, LLOG_HDR_BITMAP(llh))) { + CERROR("%s: index %u already set in llog bitmap "DFID"\n", + o->do_lu.lo_dev->ld_obd->obd_name, index, + PFID(lu_object_fid(&o->do_lu))); mutex_unlock(&loghandle->lgh_hdr_mutex); LBUG(); /* should never happen */ } llh->llh_count++; + if (!(llh->llh_flags & LLOG_F_IS_FIXSIZE)) { + /* Update the minimum size of the llog record */ + if (llh->llh_size == 0) + llh->llh_size = reclen; + else if (reclen < llh->llh_size) + llh->llh_size = reclen; + } + if (lgi->lgi_attr.la_size == 0) { lgi->lgi_off = 0; lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len; @@ -552,9 +649,9 @@ static int llog_osd_write_rec(const struct lu_env *env, * the RPC (1MB limit), if we write 8K for each operation, which * will cost a lot space, and keep us adding more updates to one * update log.*/ - lgi->lgi_off = offsetof(typeof(*llh), llh_count); - lgi->lgi_buf.lb_len = sizeof(llh->llh_count); - lgi->lgi_buf.lb_buf = &llh->llh_count; + lgi->lgi_off = 0; + lgi->lgi_buf.lb_len = llh->llh_bitmap_offset; + lgi->lgi_buf.lb_buf = &llh->llh_hdr; rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc != 0) GOTO(out_unlock, rc); @@ -582,20 +679,36 @@ out_unlock: if (rc) GOTO(out, rc); - rc = dt_attr_get(env, o, &lgi->lgi_attr); - if (rc) - GOTO(out, rc); + if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PROCESS_TIMEOUT) && + cfs_fail_val == (unsigned int)(loghandle->lgh_id.lgl_oi.oi.oi_id & + 0xFFFFFFFF)) { + OBD_RACE(OBD_FAIL_LLOG_PROCESS_TIMEOUT); + msleep(1 * MSEC_PER_SEC); + } + /* computed index can be used to determine offset for fixed-size + * records. This also allows to handle Catalog wrap around case */ + if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { + lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen; + } else { + rc = dt_attr_get(env, o, &lgi->lgi_attr); + if (rc) + GOTO(out, rc); + + LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); + lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size, + lgi->lgi_off); + } - LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); - lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size, lgi->lgi_off); lgi->lgi_buf.lb_len = reclen; lgi->lgi_buf.lb_buf = rec; rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc < 0) GOTO(out, rc); - CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u off"LPU64"\n", - POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len, + up_write(&loghandle->lgh_last_sem); + + CDEBUG(D_HA, "added record "DFID".%u, %u off%llu\n", + PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len, lgi->lgi_off); if (reccookie != NULL) { reccookie->lgc_lgl = loghandle->lgh_id; @@ -613,13 +726,21 @@ out_unlock: out: /* cleanup llog for error case */ mutex_lock(&loghandle->lgh_hdr_mutex); - ext2_clear_bit(index, LLOG_HDR_BITMAP(llh)); + clear_bit_le(index, LLOG_HDR_BITMAP(llh)); llh->llh_count--; mutex_unlock(&loghandle->lgh_hdr_mutex); /* restore llog last_idx */ - loghandle->lgh_last_idx--; + if (dt_object_remote(o)) { + loghandle->lgh_last_idx = orig_last_idx; + } else if (--loghandle->lgh_last_idx == 0 && + (llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) { + /* catalog had just wrap-around case */ + loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1; + } + LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx; + up_write(&loghandle->lgh_last_sem); RETURN(rc); } @@ -630,16 +751,38 @@ out: * that we are not far enough along the log (because the * actual records are larger than minimum size) we just skip * some more records. + * + * Note: in llog_process_thread, it will use bitmap offset as + * the index to locate the record, which also includs some pad + * records, whose record size is very small, and it also does not + * consider pad record when recording minimum record size (otherwise + * min_record size might be too small), so in some rare cases, + * it might skip too much record for @goal, see llog_osd_next_block(). + * + * When force_mini_rec is true, it means we have to use LLOG_MIN_REC_SIZE + * as the min record size to skip over, usually because in the previous + * try, it skip too much record, see loog_osd_next(prev)_block(). */ -static inline void llog_skip_over(struct llog_log_hdr *llh, __u64 *off, - int curr, int goal, __u32 chunk_size) +static inline void llog_skip_over(struct llog_handle *lgh, __u64 *off, + int curr, int goal, __u32 chunk_size, + bool force_mini_rec) { + struct llog_log_hdr *llh = lgh->lgh_hdr; + + /* Goal should not bigger than the record count */ + if (goal > lgh->lgh_last_idx) + goal = lgh->lgh_last_idx; + if (goal > curr) { - if (llh->llh_size == 0) { - /* variable size records */ - *off = *off + (goal - curr - 1) * LLOG_MIN_REC_SIZE; - } else { + if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { *off = chunk_size + (goal - 1) * llh->llh_size; + } else { + __u64 min_rec_size = LLOG_MIN_REC_SIZE; + + if (llh->llh_size > 0 && !force_mini_rec) + min_rec_size = llh->llh_size; + + *off = *off + (goal - curr - 1) * min_rec_size; } } /* always align with lower chunk boundary*/ @@ -653,22 +796,67 @@ static inline void llog_skip_over(struct llog_log_hdr *llh, __u64 *off, * big enough to handle the remapped records. It is also assumed that records * of a block have the same format (i.e.: the same features enabled). * - * \param[in,out] hdr Header of the block of records to remap. - * \param[in,out] last_hdr Last header, don't read past this point. - * \param[in] flags Flags describing the fields to keep. + * \param[in,out] hdr Header of the block of records to remap. + * \param[in,out] last_hdr Last header, don't read past this point. + * \param[in] flags Flags describing the fields to keep. + * \param[in] extra_flags Flags describing the extra fields to keep. */ static void changelog_block_trim_ext(struct llog_rec_hdr *hdr, struct llog_rec_hdr *last_hdr, - enum changelog_rec_flags flags) + struct llog_handle *loghandle) { + enum changelog_rec_flags flags = CLF_SUPPORTED; + enum changelog_rec_extra_flags extra_flags = CLFE_SUPPORTED; + + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_XATTR)) + extra_flags &= ~CLFE_XATTR; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_OMODE)) + extra_flags &= ~CLFE_OPEN; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_NID)) + extra_flags &= ~CLFE_NID; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_UIDGID)) + extra_flags &= ~CLFE_UIDGID; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_EXTRA_FLAGS)) + flags &= ~CLF_EXTRA_FLAGS; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID)) + flags &= ~CLF_JOBID; + + if (flags == CLF_SUPPORTED && extra_flags == CLFE_SUPPORTED) + return; + if (hdr->lrh_type != CHANGELOG_REC) return; do { struct changelog_rec *rec = (struct changelog_rec *)(hdr + 1); + enum changelog_rec_extra_flags xflag = CLFE_INVALID; - changelog_remap_rec(rec, rec->cr_flags & flags); + if (flags & CLF_EXTRA_FLAGS && + rec->cr_flags & CLF_EXTRA_FLAGS) { + xflag = changelog_rec_extra_flags(rec)->cr_extra_flags & + extra_flags; + } + + if (unlikely(hdr->lrh_len == 0)) { + /* It is corruption case, we cannot know the next rec, + * jump to the last one directly to avoid dead loop. */ + LCONSOLE(D_WARNING, "Hit invalid llog record: " + "idx %u, type %u, id %u\n", + hdr->lrh_index, hdr->lrh_type, hdr->lrh_id); + hdr = llog_rec_hdr_next(last_hdr); + if (unlikely(hdr == last_hdr)) + LCONSOLE(D_WARNING, "The last record crashed: " + "idx %u, type %u, id %u\n", + hdr->lrh_index, hdr->lrh_type, + hdr->lrh_id); + break; + } + + changelog_remap_rec(rec, rec->cr_flags & flags, xflag); hdr = llog_rec_hdr_next(hdr); + /* Yield CPU to avoid soft-lockup if there are too many records + * to be handled. */ + cond_resched(); } while ((char *)hdr <= (char *)last_hdr); } @@ -700,6 +888,9 @@ static int llog_osd_next_block(const struct lu_env *env, struct dt_device *dt; int rc; __u32 chunk_size; + int last_idx = *cur_idx; + __u64 last_offset = *cur_offset; + bool force_mini_rec = false; ENTRY; @@ -710,15 +901,12 @@ static int llog_osd_next_block(const struct lu_env *env, if (len == 0 || len & (chunk_size - 1)) RETURN(-EINVAL); - CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n", - next_idx, *cur_idx, *cur_offset); - LASSERT(loghandle); LASSERT(loghandle->lgh_ctxt); o = loghandle->lgh_obj; LASSERT(o); - LASSERT(dt_object_exists(o)); + LASSERT(llog_osd_exist(loghandle)); dt = lu2dt_dev(o->do_lu.lo_dev); LASSERT(dt); @@ -726,12 +914,17 @@ static int llog_osd_next_block(const struct lu_env *env, if (rc) GOTO(out, rc); + CDEBUG(D_OTHER, + "looking for log index %u (cur idx %u off %llu), size %llu\n", + next_idx, *cur_idx, + *cur_offset, lgi->lgi_attr.la_size); + while (*cur_offset < lgi->lgi_attr.la_size) { struct llog_rec_hdr *rec, *last_rec; struct llog_rec_tail *tail; - llog_skip_over(loghandle->lgh_hdr, cur_offset, *cur_idx, - next_idx, chunk_size); + llog_skip_over(loghandle, cur_offset, *cur_idx, + next_idx, chunk_size, force_mini_rec); /* read up to next llog chunk_size block */ lgi->lgi_buf.lb_len = chunk_size - @@ -740,8 +933,11 @@ static int llog_osd_next_block(const struct lu_env *env, rc = dt_read(env, o, &lgi->lgi_buf, cur_offset); if (rc < 0) { + if (rc == -EBADR && !force_mini_rec) + goto retry; + CERROR("%s: can't read llog block from log "DFID - " offset "LPU64": rc = %d\n", + " offset %llu: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, PFID(lu_object_fid(&o->do_lu)), *cur_offset, rc); @@ -754,14 +950,20 @@ static int llog_osd_next_block(const struct lu_env *env, memset(buf + rc, 0, len - rc); } - if (rc == 0) /* end of file, nothing to do */ + if (rc == 0) { /* end of file, nothing to do */ + if (!force_mini_rec) + goto retry; GOTO(out, rc); + } if (rc < sizeof(*tail)) { - CERROR("%s: invalid llog block at log id "DOSTID"/%u " - "offset "LPU64"\n", + if (!force_mini_rec) + goto retry; + + CERROR("%s: invalid llog block at log id "DFID":%x " + "offset %llu\n", o->do_lu.lo_dev->ld_obd->obd_name, - POSTID(&loghandle->lgh_id.lgl_oi), + PFID(&loghandle->lgh_id.lgl_oi.oi_fid), loghandle->lgh_id.lgl_ogen, *cur_offset); GOTO(out, rc = -EINVAL); } @@ -769,34 +971,65 @@ static int llog_osd_next_block(const struct lu_env *env, rec = buf; if (LLOG_REC_HDR_NEEDS_SWABBING(rec)) lustre_swab_llog_rec(rec); - tail = (struct llog_rec_tail *)((char *)buf + rc - sizeof(struct llog_rec_tail)); + + if (llog_verify_record(loghandle, rec)) { + /* + * the block seems corrupted. make a pad record so the + * caller can skip the block and try with the next one + */ + rec->lrh_len = rc; + rec->lrh_index = next_idx; + rec->lrh_type = LLOG_PAD_MAGIC; + + tail = rec_tail(rec); + tail->lrt_len = rc; + tail->lrt_index = next_idx; + + GOTO(out, rc = 0); + } + /* get the last record in block */ last_rec = (struct llog_rec_hdr *)((char *)buf + rc - tail->lrt_len); if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec)) lustre_swab_llog_rec(last_rec); - LASSERT(last_rec->lrh_index == tail->lrt_index); + + if (last_rec->lrh_index != tail->lrt_index) { + CERROR("%s: invalid llog tail at log id "DFID":%x offset %llu last_rec idx %u tail idx %u lrt len %u read_size %d\n", + o->do_lu.lo_dev->ld_obd->obd_name, + PFID(&loghandle->lgh_id.lgl_oi.oi_fid), + loghandle->lgh_id.lgl_ogen, *cur_offset, + last_rec->lrh_index, tail->lrt_index, + tail->lrt_len, rc); + GOTO(out, rc = -EINVAL); + } *cur_idx = tail->lrt_index; /* this shouldn't happen */ if (tail->lrt_index == 0) { - CERROR("%s: invalid llog tail at log id "DOSTID"/%u " - "offset "LPU64"\n", + CERROR("%s: invalid llog tail at log id "DFID":%x " + "offset %llu bytes %d\n", o->do_lu.lo_dev->ld_obd->obd_name, - POSTID(&loghandle->lgh_id.lgl_oi), - loghandle->lgh_id.lgl_ogen, *cur_offset); + PFID(&loghandle->lgh_id.lgl_oi.oi_fid), + loghandle->lgh_id.lgl_ogen, *cur_offset, rc); GOTO(out, rc = -EINVAL); } - if (tail->lrt_index < next_idx) + if (tail->lrt_index < next_idx) { + last_idx = *cur_idx; + last_offset = *cur_offset; continue; + } /* sanity check that the start of the new buffer is no farther * than the record that we wanted. This shouldn't happen. */ - if (rec->lrh_index > next_idx) { + if (next_idx && rec->lrh_index > next_idx) { + if (!force_mini_rec && next_idx > last_idx) + goto retry; + CERROR("%s: missed desired record? %u > %u\n", o->do_lu.lo_dev->ld_obd->obd_name, rec->lrh_index, next_idx); @@ -804,11 +1037,17 @@ static int llog_osd_next_block(const struct lu_env *env, } /* Trim unsupported extensions for compat w/ older clients */ - if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID)) - changelog_block_trim_ext(rec, last_rec, - CLF_VERSION | CLF_RENAME); + changelog_block_trim_ext(rec, last_rec, loghandle); GOTO(out, rc = 0); + +retry: + /* Note: because there are some pad records in the + * llog, so llog_skip_over() might skip too much + * records, let's try skip again with minimum record */ + force_mini_rec = true; + *cur_offset = last_offset; + *cur_idx = last_idx; } GOTO(out, rc = -EIO); out: @@ -856,13 +1095,15 @@ static int llog_osd_prev_block(const struct lu_env *env, o = loghandle->lgh_obj; LASSERT(o); - LASSERT(dt_object_exists(o)); + LASSERT(llog_osd_exist(loghandle)); dt = lu2dt_dev(o->do_lu.lo_dev); LASSERT(dt); + /* Let's only use mini record size for previous block read + * for now XXX */ cur_offset = chunk_size; - llog_skip_over(loghandle->lgh_hdr, &cur_offset, 0, prev_idx, - chunk_size); + llog_skip_over(loghandle, &cur_offset, 0, prev_idx, + chunk_size, true); rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) @@ -877,7 +1118,7 @@ static int llog_osd_prev_block(const struct lu_env *env, rc = dt_read(env, o, &lgi->lgi_buf, &cur_offset); if (rc < 0) { CERROR("%s: can't read llog block from log "DFID - " offset "LPU64": rc = %d\n", + " offset %llu: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, PFID(lu_object_fid(&o->do_lu)), cur_offset, rc); GOTO(out, rc); @@ -887,10 +1128,10 @@ static int llog_osd_prev_block(const struct lu_env *env, GOTO(out, rc); if (rc < sizeof(*tail)) { - CERROR("%s: invalid llog block at log id "DOSTID"/%u " - "offset "LPU64"\n", + CERROR("%s: invalid llog block at log id "DFID":%x " + "offset %llu\n", o->do_lu.lo_dev->ld_obd->obd_name, - POSTID(&loghandle->lgh_id.lgl_oi), + PFID(&loghandle->lgh_id.lgl_oi.oi_fid), loghandle->lgh_id.lgl_ogen, cur_offset); GOTO(out, rc = -EINVAL); } @@ -911,10 +1152,10 @@ static int llog_osd_prev_block(const struct lu_env *env, /* this shouldn't happen */ if (tail->lrt_index == 0) { - CERROR("%s: invalid llog tail at log id "DOSTID"/%u " - "offset "LPU64"\n", + CERROR("%s: invalid llog tail at log id "DFID":%x " + "offset %llu\n", o->do_lu.lo_dev->ld_obd->obd_name, - POSTID(&loghandle->lgh_id.lgl_oi), + PFID(&loghandle->lgh_id.lgl_oi.oi_fid), loghandle->lgh_id.lgl_ogen, cur_offset); GOTO(out, rc = -EINVAL); } @@ -931,9 +1172,7 @@ static int llog_osd_prev_block(const struct lu_env *env, } /* Trim unsupported extensions for compat w/ older clients */ - if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID)) - changelog_block_trim_ext(rec, last_rec, - CLF_VERSION | CLF_RENAME); + changelog_block_trim_ext(rec, last_rec, loghandle); GOTO(out, rc = 0); } @@ -968,7 +1207,7 @@ static struct dt_object *llog_osd_dir_get(const struct lu_env *env, dir = dt_locate(env, dt, &dti->dti_fid); if (!IS_ERR(dir) && !dt_try_as_dir(env, dir)) { - lu_object_put(env, &dir->do_lu); + dt_object_put(env, dir); return ERR_PTR(-ENOTDIR); } } else { @@ -1011,6 +1250,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle, struct ls_device *ls; struct local_oid_storage *los = NULL; int rc = 0; + bool new_id = false; ENTRY; @@ -1021,19 +1261,21 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle, dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt; LASSERT(dt); if (ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) { + struct lu_object_conf conf = { 0 }; if (logid != NULL) { logid_to_fid(logid, &lgi->lgi_fid); } else { /* If logid == NULL, then it means the caller needs * to allocate new FID (llog_cat_declare_add_rec()). */ - rc = obd_fid_alloc(env, ctxt->loc_exp, - &lgi->lgi_fid, NULL); + rc = dt_fid_alloc(env, dt, &lgi->lgi_fid, NULL, NULL); if (rc < 0) RETURN(rc); rc = 0; + conf.loc_flags = LOC_F_NEW; } - o = dt_locate(env, dt, &lgi->lgi_fid); + o = dt_locate_at(env, dt, &lgi->lgi_fid, + dt->dd_lu_dev.ld_site->ls_top_dev, &conf); if (IS_ERR(o)) RETURN(PTR_ERR(o)); @@ -1063,11 +1305,12 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle, dt_read_lock(env, llog_dir, 0); rc = dt_lookup_dir(env, llog_dir, name, &lgi->lgi_fid); dt_read_unlock(env, llog_dir); - lu_object_put(env, &llog_dir->do_lu); + dt_object_put(env, llog_dir); if (rc == -ENOENT && open_param == LLOG_OPEN_NEW) { /* generate fid for new llog */ rc = local_object_fid_generate(env, los, &lgi->lgi_fid); + new_id = true; } if (rc < 0) GOTO(out, rc); @@ -1079,20 +1322,43 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle, } else { LASSERTF(open_param & LLOG_OPEN_NEW, "%#x\n", open_param); /* generate fid for new llog */ +generate: rc = local_object_fid_generate(env, los, &lgi->lgi_fid); if (rc < 0) GOTO(out, rc); + new_id = true; + } + if (OBD_FAIL_PRECHECK(OBD_FAIL_MDS_LLOG_UMOUNT_RACE) && + cfs_fail_val == 1) { + cfs_fail_val = 2; + OBD_RACE(OBD_FAIL_MDS_LLOG_UMOUNT_RACE); + msleep(MSEC_PER_SEC); } - o = ls_locate(env, ls, &lgi->lgi_fid, NULL); if (IS_ERR(o)) GOTO(out_name, rc = PTR_ERR(o)); + if (dt_object_exists(o) && new_id) { + /* llog exists with just generated ID, e.g. some old llog file + * still is in use or is orphan, drop a warn and skip it. */ + CDEBUG(D_INFO, "%s: llog exists with the same FID: "DFID + ", skipping\n", + o->do_lu.lo_dev->ld_obd->obd_name, + PFID(lu_object_fid(&o->do_lu))); + dt_object_put(env, o); + /* just skip this llog ID, we shouldn't delete it because we + * don't know exactly what is its purpose and state. */ + goto generate; + } + after_open: /* No new llog is expected but doesn't exist */ - if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o)) + if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o)) { + CDEBUG(D_INFO, "%s: llog FID: "DFID" obj %p doesn`t exist\n", + o->do_lu.lo_dev->ld_obd->obd_name, + PFID(lu_object_fid(&o->do_lu)), o); GOTO(out_put, rc = -ENOENT); - + } fid_to_logid(&lgi->lgi_fid, &handle->lgh_id); handle->lgh_obj = o; handle->private_data = los; @@ -1101,7 +1367,7 @@ after_open: RETURN(rc); out_put: - lu_object_put(env, &o->do_lu); + dt_object_put(env, o); out_name: if (handle->lgh_name != NULL) OBD_FREE(handle->lgh_name, strlen(name) + 1); @@ -1112,23 +1378,6 @@ out: } /** - * Implementation of the llog_operations::lop_exist - * - * This function checks that llog exists on storage. - * - * \param[in] handle llog handle of the current llog - * - * \retval true if llog object exists and is not just destroyed - * \retval false if llog doesn't exist or just destroyed - */ -static int llog_osd_exist(struct llog_handle *handle) -{ - LASSERT(handle->lgh_obj); - return (dt_object_exists(handle->lgh_obj) && - !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header)); -} - -/** * Get dir for regular fid log object * * Get directory for regular fid log object, and these regular fid log @@ -1165,7 +1414,7 @@ struct dt_object *llog_osd_get_regular_fid_dir(const struct lu_env *env, RETURN(dir); if (!dt_try_as_dir(env, dir)) { - lu_object_put(env, &dir->do_lu); + dt_object_put(env, dir); RETURN(ERR_PTR(-ENOTDIR)); } @@ -1216,11 +1465,11 @@ llog_osd_regular_fid_add_name_entry(const struct lu_env *env, (struct dt_key *)name, th); } else { rc = dt_insert(env, dir, (struct dt_rec *)rec, - (struct dt_key *)name, th, 1); + (struct dt_key *)name, th); } dt_write_unlock(env, dir); - lu_object_put(env, &dir->do_lu); + dt_object_put(env, dir); RETURN(rc); } @@ -1297,7 +1546,7 @@ static int llog_osd_declare_create(const struct lu_env *env, rc = dt_declare_insert(env, llog_dir, (struct dt_rec *)rec, (struct dt_key *)res->lgh_name, th); - lu_object_put(env, &llog_dir->do_lu); + dt_object_put(env, llog_dir); if (rc) CERROR("%s: can't declare named llog %s: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, @@ -1383,10 +1632,9 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res, rec->rec_type = S_IFREG; dt_read_lock(env, llog_dir, 0); rc = dt_insert(env, llog_dir, (struct dt_rec *)rec, - (struct dt_key *)res->lgh_name, - th, 1); + (struct dt_key *)res->lgh_name, th); dt_read_unlock(env, llog_dir); - lu_object_put(env, &llog_dir->do_lu); + dt_object_put(env, llog_dir); if (rc) CERROR("%s: can't create named llog %s: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, @@ -1419,11 +1667,11 @@ static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle) if (handle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) { /* Remove the object from the cache, otherwise it may * hold LOD being released during cleanup process */ - lu_object_put_nocache(env, &handle->lgh_obj->do_lu); + dt_object_put_nocache(env, handle->lgh_obj); LASSERT(handle->private_data == NULL); RETURN(rc); } else { - lu_object_put(env, &handle->lgh_obj->do_lu); + dt_object_put(env, handle->lgh_obj); } los = handle->private_data; LASSERT(los); @@ -1478,18 +1726,15 @@ llog_osd_regular_fid_del_name_entry(const struct lu_env *env, } dt_write_unlock(env, dir); - lu_object_put(env, &dir->do_lu); + dt_object_put(env, dir); RETURN(rc); } - /** - * Implementation of the llog_operations::lop_destroy + * Implementation of the llog_operations::lop_declare_destroy * - * This function destroys the llog and deletes also entry in the + * This function declare destroys the llog and deletes also entry in the * llog directory in case of named llog. Llog should be opened prior that. - * Destroy method is not part of external transaction and does everything - * inside. * * \param[in] env execution environment * \param[in] loghandle llog handle of the current llog @@ -1497,14 +1742,12 @@ llog_osd_regular_fid_del_name_entry(const struct lu_env *env, * \retval 0 on successful destroy * \retval negative value on error */ -static int llog_osd_destroy(const struct lu_env *env, - struct llog_handle *loghandle) +static int llog_osd_declare_destroy(const struct lu_env *env, + struct llog_handle *loghandle, + struct thandle *th) { struct llog_ctxt *ctxt; struct dt_object *o, *llog_dir = NULL; - struct dt_device *d; - struct thandle *th; - char *name = NULL; int rc; ENTRY; @@ -1515,80 +1758,107 @@ static int llog_osd_destroy(const struct lu_env *env, o = loghandle->lgh_obj; LASSERT(o); - d = lu2dt_dev(o->do_lu.lo_dev); - LASSERT(d); - LASSERT(d == ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt); - - th = dt_trans_create(env, d); - if (IS_ERR(th)) - RETURN(PTR_ERR(th)); - if (loghandle->lgh_name) { llog_dir = llog_osd_dir_get(env, ctxt); if (IS_ERR(llog_dir)) - GOTO(out_trans, rc = PTR_ERR(llog_dir)); + RETURN(PTR_ERR(llog_dir)); - name = loghandle->lgh_name; rc = dt_declare_delete(env, llog_dir, - (struct dt_key *)name, th); - if (rc) - GOTO(out_trans, rc); + (struct dt_key *)loghandle->lgh_name, + th); + if (rc < 0) + GOTO(out_put, rc); } rc = dt_declare_ref_del(env, o, th); if (rc < 0) - GOTO(out_trans, rc); + GOTO(out_put, rc); rc = dt_declare_destroy(env, o, th); - if (rc) - GOTO(out_trans, rc); + if (rc < 0) + GOTO(out_put, rc); if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) { rc = llog_osd_regular_fid_del_name_entry(env, o, th, true); if (rc < 0) - GOTO(out_trans, rc); + GOTO(out_put, rc); } - rc = dt_trans_start_local(env, d, th); - if (rc) - GOTO(out_trans, rc); +out_put: + if (!(IS_ERR_OR_NULL(llog_dir))) + dt_object_put(env, llog_dir); - th->th_wait_submit = 1; + RETURN(rc); +} + + +/** + * Implementation of the llog_operations::lop_destroy + * + * This function destroys the llog and deletes also entry in the + * llog directory in case of named llog. Llog should be opened prior that. + * Destroy method is not part of external transaction and does everything + * inside. + * + * \param[in] env execution environment + * \param[in] loghandle llog handle of the current llog + * + * \retval 0 on successful destroy + * \retval negative value on error + */ +static int llog_osd_destroy(const struct lu_env *env, + struct llog_handle *loghandle, struct thandle *th) +{ + struct llog_ctxt *ctxt; + struct dt_object *o, *llog_dir = NULL; + int rc; + + ENTRY; + + ctxt = loghandle->lgh_ctxt; + LASSERT(ctxt != NULL); + + o = loghandle->lgh_obj; + LASSERT(o != NULL); dt_write_lock(env, o, 0); - if (dt_object_exists(o)) { - if (name) { - dt_read_lock(env, llog_dir, 0); - rc = dt_delete(env, llog_dir, - (struct dt_key *) name, - th); - dt_read_unlock(env, llog_dir); - if (rc) { - CERROR("%s: can't remove llog %s: rc = %d\n", - o->do_lu.lo_dev->ld_obd->obd_name, - name, rc); - GOTO(out_unlock, rc); - } - } - dt_ref_del(env, o, th); - rc = dt_destroy(env, o, th); - if (rc) - GOTO(out_unlock, rc); + if (!llog_osd_exist(loghandle)) + GOTO(out_unlock, rc = 0); - if (loghandle->lgh_ctxt->loc_flags & - LLOG_CTXT_FLAG_NORMAL_FID) { - rc = llog_osd_regular_fid_del_name_entry(env, o, th, - false); - if (rc < 0) - GOTO(out_unlock, rc); + if (loghandle->lgh_name) { + llog_dir = llog_osd_dir_get(env, ctxt); + if (IS_ERR(llog_dir)) + GOTO(out_unlock, rc = PTR_ERR(llog_dir)); + + dt_read_lock(env, llog_dir, 0); + rc = dt_delete(env, llog_dir, + (struct dt_key *)loghandle->lgh_name, + th); + dt_read_unlock(env, llog_dir); + if (rc) { + CERROR("%s: can't remove llog %s: rc = %d\n", + o->do_lu.lo_dev->ld_obd->obd_name, + loghandle->lgh_name, rc); + GOTO(out_unlock, rc); } } + + dt_ref_del(env, o, th); + rc = dt_destroy(env, o, th); + if (rc < 0) + GOTO(out_unlock, rc); + + loghandle->lgh_destroyed = true; + if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) { + rc = llog_osd_regular_fid_del_name_entry(env, o, th, false); + if (rc < 0) + GOTO(out_unlock, rc); + } + out_unlock: dt_write_unlock(env, o); -out_trans: - dt_trans_stop(env, d, th); if (!(IS_ERR_OR_NULL(llog_dir))) - lu_object_put(env, &llog_dir->do_lu); + dt_object_put(env, llog_dir); RETURN(rc); } @@ -1683,6 +1953,7 @@ struct llog_operations llog_osd_ops = { .lop_next_block = llog_osd_next_block, .lop_prev_block = llog_osd_prev_block, .lop_read_header = llog_osd_read_header, + .lop_declare_destroy = llog_osd_declare_destroy, .lop_destroy = llog_osd_destroy, .lop_setup = llog_osd_setup, .lop_cleanup = llog_osd_cleanup, @@ -1700,6 +1971,7 @@ struct llog_operations llog_common_cat_ops = { .lop_next_block = llog_osd_next_block, .lop_prev_block = llog_osd_prev_block, .lop_read_header = llog_osd_read_header, + .lop_declare_destroy = llog_osd_declare_destroy, .lop_destroy = llog_osd_destroy, .lop_setup = llog_osd_setup, .lop_cleanup = llog_osd_cleanup, @@ -1762,7 +2034,7 @@ int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d, if (IS_ERR(th)) GOTO(out, rc = PTR_ERR(th)); - lgi->lgi_attr.la_valid = LA_MODE; + lgi->lgi_attr.la_valid = LA_MODE | LA_TYPE; lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR; lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG); @@ -1832,7 +2104,7 @@ out_trans: EXIT; out: - lu_object_put(env, &o->do_lu); + dt_object_put(env, o); RETURN(rc); } EXPORT_SYMBOL(llog_osd_get_cat_list); @@ -1898,7 +2170,7 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d, lgi->lgi_buf.lb_buf = idarray; rc = dt_declare_record_write(env, o, &lgi->lgi_buf, lgi->lgi_off, th); if (rc) - GOTO(out, rc); + GOTO(out_trans, rc); /* For update log, this happens during initialization, * see lod_sub_prep_llog(), and we need make sure catlog @@ -1922,7 +2194,7 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d, out_trans: dt_trans_stop(env, d, th); out: - lu_object_put(env, &o->do_lu); + dt_object_put(env, o); RETURN(rc); } EXPORT_SYMBOL(llog_osd_put_cat_list);