X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdclass%2Fllog_osd.c;h=54f3dd46f217575ab37e78d54b93bd53a69caaca;hp=9d458f4397f189cf51a0bd203ebb74946d466db1;hb=d6bd5e9cc49b3bb9901ada503107e8b0eca44f7e;hpb=5a8e47d0a1a7548a7f6bbf977a4fdb3935adaed6 diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index 9d458f4..54f3dd4 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -23,7 +23,7 @@ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, 2016, Intel Corporation. + * Copyright (c) 2012, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -44,6 +44,8 @@ #define DEBUG_SUBSYSTEM S_LOG +#include + #include #include #include @@ -124,8 +126,7 @@ static int llog_osd_create_new_object(const struct lu_env *env, static int llog_osd_exist(struct llog_handle *handle) { LASSERT(handle->lgh_obj); - return dt_object_exists(handle->lgh_obj) && - !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header); + return dt_object_exists(handle->lgh_obj) && !handle->lgh_destroyed; } static void *rec_tail(struct llog_rec_hdr *rec) @@ -362,7 +363,7 @@ static int llog_osd_declare_write_rec(const struct lu_env *env, * the full llog record to write. This is * the beginning of buffer to write, the length * of buffer is stored in \a rec::lrh_len - * \param[out] reccookie pointer to the cookie to return back if needed. + * \param[in,out] reccookie pointer to the cookie to return back if needed. * It is used for further cancel of this llog * record. * \param[in] idx index of the llog record. If \a idx == -1 then @@ -490,26 +491,26 @@ static int llog_osd_write_rec(const struct lu_env *env, &lgi->lgi_off, th); RETURN(rc); - } else if (loghandle->lgh_cur_idx > 0) { + } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { + lgi->lgi_off = llh->llh_hdr.lrh_len + + (idx - 1) * reclen; + } else if (reccookie != NULL && reccookie->lgc_index > 0) { /** - * The lgh_cur_offset can be used only if index is + * The lgc_offset can be used only if index is * the same. */ - if (idx != loghandle->lgh_cur_idx) { + if (idx != reccookie->lgc_index) { CERROR("%s: modify index mismatch %d %d\n", o->do_lu.lo_dev->ld_obd->obd_name, idx, - loghandle->lgh_cur_idx); + reccookie->lgc_index); RETURN(-EFAULT); } - lgi->lgi_off = loghandle->lgh_cur_offset; + lgi->lgi_off = reccookie->lgc_offset; CDEBUG(D_OTHER, "modify record "DFID": idx:%u, " "len:%u offset %llu\n", PFID(&loghandle->lgh_id.lgl_oi.oi_fid), idx, rec->lrh_len, (long long)lgi->lgi_off); - } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { - lgi->lgi_off = llh->llh_hdr.lrh_len + - (idx - 1) * reclen; } else { /* This can be result of lgh_cur_idx is not set during * llog processing or llh_size is not set to proper @@ -590,6 +591,7 @@ static int llog_osd_write_rec(const struct lu_env *env, RETURN(-ENOSPC); } + down_write(&loghandle->lgh_last_sem); /* increment the last_idx along with llh_tail index, they should * be equal for a llog lifetime */ loghandle->lgh_last_idx++; @@ -673,6 +675,12 @@ out_unlock: if (rc) GOTO(out, rc); + if (OBD_FAIL_PRECHECK(OBD_FAIL_LLOG_PROCESS_TIMEOUT) && + cfs_fail_val == (unsigned int)(loghandle->lgh_id.lgl_oi.oi.oi_id & + 0xFFFFFFFF)) { + OBD_RACE(OBD_FAIL_LLOG_PROCESS_TIMEOUT); + msleep(1 * MSEC_PER_SEC); + } /* computed index can be used to determine offset for fixed-size * records. This also allows to handle Catalog wrap around case */ if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { @@ -693,6 +701,8 @@ out_unlock: if (rc < 0) GOTO(out, rc); + up_write(&loghandle->lgh_last_sem); + CDEBUG(D_HA, "added record "DFID".%u, %u off%llu\n", PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len, lgi->lgi_off); @@ -726,6 +736,7 @@ out: } LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx; + up_write(&loghandle->lgh_last_sem); RETURN(rc); } @@ -781,22 +792,67 @@ static inline void llog_skip_over(struct llog_handle *lgh, __u64 *off, * big enough to handle the remapped records. It is also assumed that records * of a block have the same format (i.e.: the same features enabled). * - * \param[in,out] hdr Header of the block of records to remap. - * \param[in,out] last_hdr Last header, don't read past this point. - * \param[in] flags Flags describing the fields to keep. + * \param[in,out] hdr Header of the block of records to remap. + * \param[in,out] last_hdr Last header, don't read past this point. + * \param[in] flags Flags describing the fields to keep. + * \param[in] extra_flags Flags describing the extra fields to keep. */ static void changelog_block_trim_ext(struct llog_rec_hdr *hdr, struct llog_rec_hdr *last_hdr, - enum changelog_rec_flags flags) + struct llog_handle *loghandle) { + enum changelog_rec_flags flags = CLF_SUPPORTED; + enum changelog_rec_extra_flags extra_flags = CLFE_SUPPORTED; + + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_XATTR)) + extra_flags &= ~CLFE_XATTR; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_OMODE)) + extra_flags &= ~CLFE_OPEN; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_NID)) + extra_flags &= ~CLFE_NID; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_UIDGID)) + extra_flags &= ~CLFE_UIDGID; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_EXTRA_FLAGS)) + flags &= ~CLF_EXTRA_FLAGS; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID)) + flags &= ~CLF_JOBID; + + if (flags == CLF_SUPPORTED && extra_flags == CLFE_SUPPORTED) + return; + if (hdr->lrh_type != CHANGELOG_REC) return; do { struct changelog_rec *rec = (struct changelog_rec *)(hdr + 1); + enum changelog_rec_extra_flags xflag = CLFE_INVALID; - changelog_remap_rec(rec, rec->cr_flags & flags); + if (flags & CLF_EXTRA_FLAGS && + rec->cr_flags & CLF_EXTRA_FLAGS) { + xflag = changelog_rec_extra_flags(rec)->cr_extra_flags & + extra_flags; + } + + if (unlikely(hdr->lrh_len == 0)) { + /* It is corruption case, we cannot know the next rec, + * jump to the last one directly to avoid dead loop. */ + LCONSOLE(D_WARNING, "Hit invalid llog record: " + "idx %u, type %u, id %u\n", + hdr->lrh_index, hdr->lrh_type, hdr->lrh_id); + hdr = llog_rec_hdr_next(last_hdr); + if (unlikely(hdr == last_hdr)) + LCONSOLE(D_WARNING, "The last record crashed: " + "idx %u, type %u, id %u\n", + hdr->lrh_index, hdr->lrh_type, + hdr->lrh_id); + break; + } + + changelog_remap_rec(rec, rec->cr_flags & flags, xflag); hdr = llog_rec_hdr_next(hdr); + /* Yield CPU to avoid soft-lockup if there are too many records + * to be handled. */ + cond_resched(); } while ((char *)hdr <= (char *)last_hdr); } @@ -846,7 +902,7 @@ static int llog_osd_next_block(const struct lu_env *env, o = loghandle->lgh_obj; LASSERT(o); - LASSERT(dt_object_exists(o)); + LASSERT(llog_osd_exist(loghandle)); dt = lu2dt_dev(o->do_lu.lo_dev); LASSERT(dt); @@ -962,9 +1018,7 @@ static int llog_osd_next_block(const struct lu_env *env, } /* Trim unsupported extensions for compat w/ older clients */ - if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID)) - changelog_block_trim_ext(rec, last_rec, - CLF_VERSION | CLF_RENAME); + changelog_block_trim_ext(rec, last_rec, loghandle); GOTO(out, rc = 0); @@ -1022,7 +1076,7 @@ static int llog_osd_prev_block(const struct lu_env *env, o = loghandle->lgh_obj; LASSERT(o); - LASSERT(dt_object_exists(o)); + LASSERT(llog_osd_exist(loghandle)); dt = lu2dt_dev(o->do_lu.lo_dev); LASSERT(dt); @@ -1099,9 +1153,7 @@ static int llog_osd_prev_block(const struct lu_env *env, } /* Trim unsupported extensions for compat w/ older clients */ - if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID)) - changelog_block_trim_ext(rec, last_rec, - CLF_VERSION | CLF_RENAME); + changelog_block_trim_ext(rec, last_rec, loghandle); GOTO(out, rc = 0); } @@ -1278,9 +1330,12 @@ generate: after_open: /* No new llog is expected but doesn't exist */ - if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o)) + if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o)) { + CDEBUG(D_INFO, "%s: llog FID: "DFID" obj %p doesn`t exist\n", + o->do_lu.lo_dev->ld_obd->obd_name, + PFID(lu_object_fid(&o->do_lu)), o); GOTO(out_put, rc = -ENOENT); - + } fid_to_logid(&lgi->lgi_fid, &handle->lgh_id); handle->lgh_obj = o; handle->private_data = los; @@ -1387,7 +1442,7 @@ llog_osd_regular_fid_add_name_entry(const struct lu_env *env, (struct dt_key *)name, th); } else { rc = dt_insert(env, dir, (struct dt_rec *)rec, - (struct dt_key *)name, th, 1); + (struct dt_key *)name, th); } dt_write_unlock(env, dir); @@ -1554,8 +1609,7 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res, rec->rec_type = S_IFREG; dt_read_lock(env, llog_dir, 0); rc = dt_insert(env, llog_dir, (struct dt_rec *)rec, - (struct dt_key *)res->lgh_name, - th, 1); + (struct dt_key *)res->lgh_name, th); dt_read_unlock(env, llog_dir); dt_object_put(env, llog_dir); if (rc) @@ -1745,7 +1799,7 @@ static int llog_osd_destroy(const struct lu_env *env, LASSERT(o != NULL); dt_write_lock(env, o, 0); - if (!dt_object_exists(o)) + if (!llog_osd_exist(loghandle)) GOTO(out_unlock, rc = 0); if (loghandle->lgh_name) { @@ -1771,6 +1825,7 @@ static int llog_osd_destroy(const struct lu_env *env, if (rc < 0) GOTO(out_unlock, rc); + loghandle->lgh_destroyed = true; if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) { rc = llog_osd_regular_fid_del_name_entry(env, o, th, false); if (rc < 0)