X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fobdclass%2Fllog_osd.c;h=f25126d7d8463af4170ebfcc72371add0285261a;hp=32d74d81919a5dc17a3a876ed69be4f9d2c53310;hb=01fb4edfd7bde7e495e767c5321235565cfadaee;hpb=d10200a80770f0029d1d665af954187b9ad883df diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index 32d74d8..f25126d 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -23,7 +23,7 @@ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, 2016, Intel Corporation. + * Copyright (c) 2012, 2017, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -286,7 +286,6 @@ static int llog_osd_read_header(const struct lu_env *env, handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK); handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index; - handle->lgh_write_offset = lgi->lgi_attr.la_size; RETURN(0); } @@ -363,7 +362,7 @@ static int llog_osd_declare_write_rec(const struct lu_env *env, * the full llog record to write. This is * the beginning of buffer to write, the length * of buffer is stored in \a rec::lrh_len - * \param[out] reccookie pointer to the cookie to return back if needed. + * \param[in,out] reccookie pointer to the cookie to return back if needed. * It is used for further cancel of this llog * record. * \param[in] idx index of the llog record. If \a idx == -1 then @@ -390,7 +389,6 @@ static int llog_osd_write_rec(const struct lu_env *env, __u32 chunk_size; size_t left; __u32 orig_last_idx; - __u64 orig_write_offset; ENTRY; llh = loghandle->lgh_hdr; @@ -492,26 +490,26 @@ static int llog_osd_write_rec(const struct lu_env *env, &lgi->lgi_off, th); RETURN(rc); - } else if (loghandle->lgh_cur_idx > 0) { + } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { + lgi->lgi_off = llh->llh_hdr.lrh_len + + (idx - 1) * reclen; + } else if (reccookie != NULL && reccookie->lgc_index > 0) { /** - * The lgh_cur_offset can be used only if index is + * The lgc_offset can be used only if index is * the same. */ - if (idx != loghandle->lgh_cur_idx) { + if (idx != reccookie->lgc_index) { CERROR("%s: modify index mismatch %d %d\n", o->do_lu.lo_dev->ld_obd->obd_name, idx, - loghandle->lgh_cur_idx); + reccookie->lgc_index); RETURN(-EFAULT); } - lgi->lgi_off = loghandle->lgh_cur_offset; - CDEBUG(D_OTHER, "modify record "DOSTID": idx:%d, " + lgi->lgi_off = reccookie->lgc_offset; + CDEBUG(D_OTHER, "modify record "DFID": idx:%u, " "len:%u offset %llu\n", - POSTID(&loghandle->lgh_id.lgl_oi), idx, + PFID(&loghandle->lgh_id.lgl_oi.oi_fid), idx, rec->lrh_len, (long long)lgi->lgi_off); - } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { - lgi->lgi_off = llh->llh_hdr.lrh_len + - (idx - 1) * reclen; } else { /* This can be result of lgh_cur_idx is not set during * llog processing or llh_size is not set to proper @@ -556,16 +554,14 @@ static int llog_osd_write_rec(const struct lu_env *env, LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); orig_last_idx = loghandle->lgh_last_idx; - orig_write_offset = loghandle->lgh_write_offset; lgi->lgi_off = lgi->lgi_attr.la_size; if (loghandle->lgh_max_size > 0 && lgi->lgi_off >= loghandle->lgh_max_size) { CDEBUG(D_OTHER, "llog is getting too large (%u > %u) at %u " - DOSTID"\n", (unsigned)lgi->lgi_off, - loghandle->lgh_max_size, - (int)loghandle->lgh_last_idx, - POSTID(&loghandle->lgh_id.lgl_oi)); + DFID"\n", (unsigned)lgi->lgi_off, + loghandle->lgh_max_size, (int)loghandle->lgh_last_idx, + PFID(&loghandle->lgh_id.lgl_oi.oi_fid)); /* this is to signal that this llog is full */ loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1; RETURN(-ENOSPC); @@ -580,9 +576,6 @@ static int llog_osd_write_rec(const struct lu_env *env, if (rc) RETURN(rc); - if (dt_object_remote(o)) - loghandle->lgh_write_offset = lgi->lgi_off; - loghandle->lgh_last_idx++; /* for pad rec */ } /* if it's the last idx in log file, then return -ENOSPC @@ -684,9 +677,6 @@ out_unlock: * records. This also allows to handle Catalog wrap around case */ if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen; - } else if (dt_object_remote(o)) { - lgi->lgi_off = max_t(__u64, loghandle->lgh_write_offset, - lgi->lgi_off); } else { rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) @@ -703,10 +693,7 @@ out_unlock: if (rc < 0) GOTO(out, rc); - if (dt_object_remote(o)) - loghandle->lgh_write_offset = lgi->lgi_off; - - CDEBUG(D_HA, "added record "DFID": idx: %u, %u off%llu\n", + CDEBUG(D_HA, "added record "DFID".%u, %u off%llu\n", PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len, lgi->lgi_off); if (reccookie != NULL) { @@ -732,7 +719,6 @@ out: /* restore llog last_idx */ if (dt_object_remote(o)) { loghandle->lgh_last_idx = orig_last_idx; - loghandle->lgh_write_offset = orig_write_offset; } else if (--loghandle->lgh_last_idx == 0 && (llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) { /* catalog had just wrap-around case */ @@ -795,22 +781,67 @@ static inline void llog_skip_over(struct llog_handle *lgh, __u64 *off, * big enough to handle the remapped records. It is also assumed that records * of a block have the same format (i.e.: the same features enabled). * - * \param[in,out] hdr Header of the block of records to remap. - * \param[in,out] last_hdr Last header, don't read past this point. - * \param[in] flags Flags describing the fields to keep. + * \param[in,out] hdr Header of the block of records to remap. + * \param[in,out] last_hdr Last header, don't read past this point. + * \param[in] flags Flags describing the fields to keep. + * \param[in] extra_flags Flags describing the extra fields to keep. */ static void changelog_block_trim_ext(struct llog_rec_hdr *hdr, struct llog_rec_hdr *last_hdr, - enum changelog_rec_flags flags) + struct llog_handle *loghandle) { + enum changelog_rec_flags flags = CLF_SUPPORTED; + enum changelog_rec_extra_flags extra_flags = CLFE_SUPPORTED; + + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_XATTR)) + extra_flags &= ~CLFE_XATTR; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_OMODE)) + extra_flags &= ~CLFE_OPEN; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_NID)) + extra_flags &= ~CLFE_NID; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_X_UIDGID)) + extra_flags &= ~CLFE_UIDGID; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_EXTRA_FLAGS)) + flags &= ~CLF_EXTRA_FLAGS; + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID)) + flags &= ~CLF_JOBID; + + if (flags == CLF_SUPPORTED && extra_flags == CLFE_SUPPORTED) + return; + if (hdr->lrh_type != CHANGELOG_REC) return; do { struct changelog_rec *rec = (struct changelog_rec *)(hdr + 1); + enum changelog_rec_extra_flags xflag = CLFE_INVALID; - changelog_remap_rec(rec, rec->cr_flags & flags); + if (flags & CLF_EXTRA_FLAGS && + rec->cr_flags & CLF_EXTRA_FLAGS) { + xflag = changelog_rec_extra_flags(rec)->cr_extra_flags & + extra_flags; + } + + if (unlikely(hdr->lrh_len == 0)) { + /* It is corruption case, we cannot know the next rec, + * jump to the last one directly to avoid dead loop. */ + LCONSOLE(D_WARNING, "Hit invalid llog record: " + "idx %u, type %u, id %u\n", + hdr->lrh_index, hdr->lrh_type, hdr->lrh_id); + hdr = llog_rec_hdr_next(last_hdr); + if (unlikely(hdr == last_hdr)) + LCONSOLE(D_WARNING, "The last record crashed: " + "idx %u, type %u, id %u\n", + hdr->lrh_index, hdr->lrh_type, + hdr->lrh_id); + break; + } + + changelog_remap_rec(rec, rec->cr_flags & flags, xflag); hdr = llog_rec_hdr_next(hdr); + /* Yield CPU to avoid soft-lockup if there are too many records + * to be handled. */ + cond_resched(); } while ((char *)hdr <= (char *)last_hdr); } @@ -913,10 +944,10 @@ static int llog_osd_next_block(const struct lu_env *env, if (!force_mini_rec) goto retry; - CERROR("%s: invalid llog block at log id "DOSTID"/%u " + CERROR("%s: invalid llog block at log id "DFID":%x " "offset %llu\n", o->do_lu.lo_dev->ld_obd->obd_name, - POSTID(&loghandle->lgh_id.lgl_oi), + PFID(&loghandle->lgh_id.lgl_oi.oi_fid), loghandle->lgh_id.lgl_ogen, *cur_offset); GOTO(out, rc = -EINVAL); } @@ -935,12 +966,14 @@ static int llog_osd_next_block(const struct lu_env *env, lustre_swab_llog_rec(last_rec); if (last_rec->lrh_index != tail->lrt_index) { - CERROR("%s: invalid llog tail at log id "DOSTID"/%u " - "offset %llu last_rec idx %u tail idx %u\n", + CERROR("%s: invalid llog tail at log id "DFID":%x " + "offset %llu last_rec idx %u tail idx %u" + "lrt len %u read_size %d\n", o->do_lu.lo_dev->ld_obd->obd_name, - POSTID(&loghandle->lgh_id.lgl_oi), + PFID(&loghandle->lgh_id.lgl_oi.oi_fid), loghandle->lgh_id.lgl_ogen, *cur_offset, - last_rec->lrh_index, tail->lrt_index); + last_rec->lrh_index, tail->lrt_index, + tail->lrt_len, rc); GOTO(out, rc = -EINVAL); } @@ -948,10 +981,10 @@ static int llog_osd_next_block(const struct lu_env *env, /* this shouldn't happen */ if (tail->lrt_index == 0) { - CERROR("%s: invalid llog tail at log id "DOSTID"/%u " + CERROR("%s: invalid llog tail at log id "DFID":%x " "offset %llu bytes %d\n", o->do_lu.lo_dev->ld_obd->obd_name, - POSTID(&loghandle->lgh_id.lgl_oi), + PFID(&loghandle->lgh_id.lgl_oi.oi_fid), loghandle->lgh_id.lgl_ogen, *cur_offset, rc); GOTO(out, rc = -EINVAL); } @@ -974,9 +1007,7 @@ static int llog_osd_next_block(const struct lu_env *env, } /* Trim unsupported extensions for compat w/ older clients */ - if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID)) - changelog_block_trim_ext(rec, last_rec, - CLF_VERSION | CLF_RENAME); + changelog_block_trim_ext(rec, last_rec, loghandle); GOTO(out, rc = 0); @@ -1067,10 +1098,10 @@ static int llog_osd_prev_block(const struct lu_env *env, GOTO(out, rc); if (rc < sizeof(*tail)) { - CERROR("%s: invalid llog block at log id "DOSTID"/%u " + CERROR("%s: invalid llog block at log id "DFID":%x " "offset %llu\n", o->do_lu.lo_dev->ld_obd->obd_name, - POSTID(&loghandle->lgh_id.lgl_oi), + PFID(&loghandle->lgh_id.lgl_oi.oi_fid), loghandle->lgh_id.lgl_ogen, cur_offset); GOTO(out, rc = -EINVAL); } @@ -1091,10 +1122,10 @@ static int llog_osd_prev_block(const struct lu_env *env, /* this shouldn't happen */ if (tail->lrt_index == 0) { - CERROR("%s: invalid llog tail at log id "DOSTID"/%u " + CERROR("%s: invalid llog tail at log id "DFID":%x " "offset %llu\n", o->do_lu.lo_dev->ld_obd->obd_name, - POSTID(&loghandle->lgh_id.lgl_oi), + PFID(&loghandle->lgh_id.lgl_oi.oi_fid), loghandle->lgh_id.lgl_ogen, cur_offset); GOTO(out, rc = -EINVAL); } @@ -1111,9 +1142,7 @@ static int llog_osd_prev_block(const struct lu_env *env, } /* Trim unsupported extensions for compat w/ older clients */ - if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID)) - changelog_block_trim_ext(rec, last_rec, - CLF_VERSION | CLF_RENAME); + changelog_block_trim_ext(rec, last_rec, loghandle); GOTO(out, rc = 0); } @@ -1148,7 +1177,7 @@ static struct dt_object *llog_osd_dir_get(const struct lu_env *env, dir = dt_locate(env, dt, &dti->dti_fid); if (!IS_ERR(dir) && !dt_try_as_dir(env, dir)) { - lu_object_put(env, &dir->do_lu); + dt_object_put(env, dir); return ERR_PTR(-ENOTDIR); } } else { @@ -1247,7 +1276,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle, dt_read_lock(env, llog_dir, 0); rc = dt_lookup_dir(env, llog_dir, name, &lgi->lgi_fid); dt_read_unlock(env, llog_dir); - lu_object_put(env, &llog_dir->do_lu); + dt_object_put(env, llog_dir); if (rc == -ENOENT && open_param == LLOG_OPEN_NEW) { /* generate fid for new llog */ rc = local_object_fid_generate(env, los, @@ -1282,7 +1311,7 @@ generate: ", skipping\n", o->do_lu.lo_dev->ld_obd->obd_name, PFID(lu_object_fid(&o->do_lu))); - lu_object_put(env, &o->do_lu); + dt_object_put(env, o); /* just skip this llog ID, we shouldn't delete it because we * don't know exactly what is its purpose and state. */ goto generate; @@ -1290,9 +1319,12 @@ generate: after_open: /* No new llog is expected but doesn't exist */ - if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o)) + if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o)) { + CDEBUG(D_INFO, "%s: llog FID: "DFID" obj %p doesn`t exist\n", + o->do_lu.lo_dev->ld_obd->obd_name, + PFID(lu_object_fid(&o->do_lu)), o); GOTO(out_put, rc = -ENOENT); - + } fid_to_logid(&lgi->lgi_fid, &handle->lgh_id); handle->lgh_obj = o; handle->private_data = los; @@ -1301,7 +1333,7 @@ after_open: RETURN(rc); out_put: - lu_object_put(env, &o->do_lu); + dt_object_put(env, o); out_name: if (handle->lgh_name != NULL) OBD_FREE(handle->lgh_name, strlen(name) + 1); @@ -1348,7 +1380,7 @@ struct dt_object *llog_osd_get_regular_fid_dir(const struct lu_env *env, RETURN(dir); if (!dt_try_as_dir(env, dir)) { - lu_object_put(env, &dir->do_lu); + dt_object_put(env, dir); RETURN(ERR_PTR(-ENOTDIR)); } @@ -1403,7 +1435,7 @@ llog_osd_regular_fid_add_name_entry(const struct lu_env *env, } dt_write_unlock(env, dir); - lu_object_put(env, &dir->do_lu); + dt_object_put(env, dir); RETURN(rc); } @@ -1480,7 +1512,7 @@ static int llog_osd_declare_create(const struct lu_env *env, rc = dt_declare_insert(env, llog_dir, (struct dt_rec *)rec, (struct dt_key *)res->lgh_name, th); - lu_object_put(env, &llog_dir->do_lu); + dt_object_put(env, llog_dir); if (rc) CERROR("%s: can't declare named llog %s: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, @@ -1569,7 +1601,7 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res, (struct dt_key *)res->lgh_name, th, 1); dt_read_unlock(env, llog_dir); - lu_object_put(env, &llog_dir->do_lu); + dt_object_put(env, llog_dir); if (rc) CERROR("%s: can't create named llog %s: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, @@ -1602,11 +1634,11 @@ static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle) if (handle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) { /* Remove the object from the cache, otherwise it may * hold LOD being released during cleanup process */ - lu_object_put_nocache(env, &handle->lgh_obj->do_lu); + dt_object_put_nocache(env, handle->lgh_obj); LASSERT(handle->private_data == NULL); RETURN(rc); } else { - lu_object_put(env, &handle->lgh_obj->do_lu); + dt_object_put(env, handle->lgh_obj); } los = handle->private_data; LASSERT(los); @@ -1661,7 +1693,7 @@ llog_osd_regular_fid_del_name_entry(const struct lu_env *env, } dt_write_unlock(env, dir); - lu_object_put(env, &dir->do_lu); + dt_object_put(env, dir); RETURN(rc); } @@ -1721,7 +1753,7 @@ static int llog_osd_declare_destroy(const struct lu_env *env, out_put: if (!(IS_ERR_OR_NULL(llog_dir))) - lu_object_put(env, &llog_dir->do_lu); + dt_object_put(env, llog_dir); RETURN(rc); } @@ -1792,7 +1824,7 @@ static int llog_osd_destroy(const struct lu_env *env, out_unlock: dt_write_unlock(env, o); if (!(IS_ERR_OR_NULL(llog_dir))) - lu_object_put(env, &llog_dir->do_lu); + dt_object_put(env, llog_dir); RETURN(rc); } @@ -2038,7 +2070,7 @@ out_trans: EXIT; out: - lu_object_put(env, &o->do_lu); + dt_object_put(env, o); RETURN(rc); } EXPORT_SYMBOL(llog_osd_get_cat_list); @@ -2128,7 +2160,7 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d, out_trans: dt_trans_stop(env, d, th); out: - lu_object_put(env, &o->do_lu); + dt_object_put(env, o); RETURN(rc); } EXPORT_SYMBOL(llog_osd_put_cat_list);