X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fobdclass%2Fllog_osd.c;h=6c3c288cce9507d5e9a6f65bcb8b57459c915413;hb=4724b52bba54ccdb0f81d0c63010b69e87e7f65c;hp=a099df264d43532d05ea382cea606f00fdb4bc40;hpb=881b288d70318644098c335b92f07388e9e2d3a5;p=fs%2Flustre-release.git diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index a099df2..6c3c288 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -23,7 +23,7 @@ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, 2014 Intel Corporation. + * Copyright (c) 2012, 2015, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -44,10 +44,11 @@ #define DEBUG_SUBSYSTEM S_LOG +#include +#include +#include #include #include -#include -#include #include "llog_internal.h" #include "local_storage.h" @@ -111,6 +112,29 @@ static int llog_osd_create_new_object(const struct lu_env *env, } /** + * Implementation of the llog_operations::lop_exist + * + * This function checks that llog exists on storage. + * + * \param[in] handle llog handle of the current llog + * + * \retval true if llog object exists and is not just destroyed + * \retval false if llog doesn't exist or just destroyed + */ +static int llog_osd_exist(struct llog_handle *handle) +{ + LASSERT(handle->lgh_obj); + return dt_object_exists(handle->lgh_obj) && + !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header); +} + +static void *rec_tail(struct llog_rec_hdr *rec) +{ + return (void *)((char *)rec + rec->lrh_len - + sizeof(struct llog_rec_tail)); +} + +/** * Write a padding record to the llog * * This function writes a padding record to the end of llog. That may @@ -262,6 +286,7 @@ static int llog_osd_read_header(const struct lu_env *env, handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK); handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index; + handle->lgh_write_offset = lgi->lgi_attr.la_size; RETURN(0); } @@ -364,24 +389,30 @@ static int llog_osd_write_rec(const struct lu_env *env, struct dt_object *o; __u32 chunk_size; size_t left; - + __u32 orig_last_idx; + __u64 orig_write_offset; ENTRY; - LASSERT(env); llh = loghandle->lgh_hdr; - LASSERT(llh); o = loghandle->lgh_obj; - LASSERT(o); - LASSERT(th); chunk_size = llh->llh_hdr.lrh_len; CDEBUG(D_OTHER, "new record %x to "DFID"\n", rec->lrh_type, PFID(lu_object_fid(&o->do_lu))); + if (!llog_osd_exist(loghandle)) + RETURN(-ENOENT); + /* record length should not bigger than */ if (reclen > loghandle->lgh_hdr->llh_hdr.lrh_len) RETURN(-E2BIG); + /* sanity check for fixed-records llog */ + if (idx != LLOG_HEADER_IDX && (llh->llh_flags & LLOG_F_IS_FIXSIZE)) { + LASSERT(llh->llh_size != 0); + LASSERT(llh->llh_size == reclen); + } + rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) RETURN(rc); @@ -479,15 +510,8 @@ static int llog_osd_write_rec(const struct lu_env *env, POSTID(&loghandle->lgh_id.lgl_oi), idx, rec->lrh_len, (long long)lgi->lgi_off); } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { - if (llh->llh_size == 0 || - llh->llh_size != rec->lrh_len) { - CERROR("%s: wrong record size, llh_size is %u" - " but record size is %u\n", - o->do_lu.lo_dev->ld_obd->obd_name, - llh->llh_size, rec->lrh_len); - RETURN(-EINVAL); - } - lgi->lgi_off = sizeof(*llh) + (idx - 1) * reclen; + lgi->lgi_off = llh->llh_hdr.lrh_len + + (idx - 1) * reclen; } else { /* This can be result of lgh_cur_idx is not set during * llog processing or llh_size is not set to proper @@ -522,8 +546,31 @@ static int llog_osd_write_rec(const struct lu_env *env, * process them page-at-a-time if needed. If it will cross a chunk * boundary, write in a fake (but referenced) entry to pad the chunk. */ + + + /* simulate ENOSPC when new plain llog is being added to the + * catalog */ + if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED2) && + llh->llh_flags & LLOG_F_IS_CAT) + RETURN(-ENOSPC); + LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); + orig_last_idx = loghandle->lgh_last_idx; + orig_write_offset = loghandle->lgh_write_offset; lgi->lgi_off = lgi->lgi_attr.la_size; + + if (loghandle->lgh_max_size > 0 && + lgi->lgi_off >= loghandle->lgh_max_size) { + CDEBUG(D_OTHER, "llog is getting too large (%u > %u) at %u " + DOSTID"\n", (unsigned)lgi->lgi_off, + loghandle->lgh_max_size, + (int)loghandle->lgh_last_idx, + POSTID(&loghandle->lgh_id.lgl_oi)); + /* this is to signal that this llog is full */ + loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1; + RETURN(-ENOSPC); + } + left = chunk_size - (lgi->lgi_off & (chunk_size - 1)); /* NOTE: padding is a record, but no bit is set */ if (left != 0 && left != reclen && @@ -532,11 +579,23 @@ static int llog_osd_write_rec(const struct lu_env *env, rc = llog_osd_pad(env, o, &lgi->lgi_off, left, index, th); if (rc) RETURN(rc); + + if (dt_object_remote(o)) + loghandle->lgh_write_offset = lgi->lgi_off; + loghandle->lgh_last_idx++; /* for pad rec */ } - /* if it's the last idx in log file, then return -ENOSPC */ - if (loghandle->lgh_last_idx >= LLOG_HDR_BITMAP_SIZE(llh) - 1) - RETURN(-ENOSPC); + /* if it's the last idx in log file, then return -ENOSPC + * or wrap around if a catalog */ + if (llog_is_full(loghandle) || + unlikely(llh->llh_flags & LLOG_F_IS_CAT && + OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) && + loghandle->lgh_last_idx >= cfs_fail_val)) { + if (llh->llh_flags & LLOG_F_IS_CAT) + loghandle->lgh_last_idx = 0; + else + RETURN(-ENOSPC); + } /* increment the last_idx along with llh_tail index, they should * be equal for a llog lifetime */ @@ -565,9 +624,7 @@ static int llog_osd_write_rec(const struct lu_env *env, } llh->llh_count++; - if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { - LASSERT(llh->llh_size == reclen); - } else { + if (!(llh->llh_flags & LLOG_F_IS_FIXSIZE)) { /* Update the minimum size of the llog record */ if (llh->llh_size == 0) llh->llh_size = reclen; @@ -623,20 +680,34 @@ out_unlock: if (rc) GOTO(out, rc); - rc = dt_attr_get(env, o, &lgi->lgi_attr); - if (rc) - GOTO(out, rc); + /* computed index can be used to determine offset for fixed-size + * records. This also allows to handle Catalog wrap around case */ + if (llh->llh_flags & LLOG_F_IS_FIXSIZE) { + lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen; + } else if (dt_object_remote(o)) { + lgi->lgi_off = max_t(__u64, loghandle->lgh_write_offset, + lgi->lgi_off); + } else { + rc = dt_attr_get(env, o, &lgi->lgi_attr); + if (rc) + GOTO(out, rc); + + LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); + lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size, + lgi->lgi_off); + } - LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); - lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size, lgi->lgi_off); lgi->lgi_buf.lb_len = reclen; lgi->lgi_buf.lb_buf = rec; rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc < 0) GOTO(out, rc); - CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u off"LPU64"\n", - POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len, + if (dt_object_remote(o)) + loghandle->lgh_write_offset = lgi->lgi_off; + + CDEBUG(D_HA, "added record "DFID": idx: %u, %u off%llu\n", + PFID(lu_object_fid(&o->do_lu)), index, rec->lrh_len, lgi->lgi_off); if (reccookie != NULL) { reccookie->lgc_lgl = loghandle->lgh_id; @@ -659,7 +730,15 @@ out: mutex_unlock(&loghandle->lgh_hdr_mutex); /* restore llog last_idx */ - loghandle->lgh_last_idx--; + if (dt_object_remote(o)) { + loghandle->lgh_last_idx = orig_last_idx; + loghandle->lgh_write_offset = orig_write_offset; + } else if (--loghandle->lgh_last_idx == 0 && + (llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) { + /* catalog had just wrap-around case */ + loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1; + } + LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx; RETURN(rc); @@ -776,9 +855,6 @@ static int llog_osd_next_block(const struct lu_env *env, if (len == 0 || len & (chunk_size - 1)) RETURN(-EINVAL); - CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n", - next_idx, *cur_idx, *cur_offset); - LASSERT(loghandle); LASSERT(loghandle->lgh_ctxt); @@ -792,6 +868,10 @@ static int llog_osd_next_block(const struct lu_env *env, if (rc) GOTO(out, rc); + CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off" + "%llu), size %llu\n", next_idx, *cur_idx, + *cur_offset, lgi->lgi_attr.la_size); + while (*cur_offset < lgi->lgi_attr.la_size) { struct llog_rec_hdr *rec, *last_rec; struct llog_rec_tail *tail; @@ -810,7 +890,7 @@ static int llog_osd_next_block(const struct lu_env *env, goto retry; CERROR("%s: can't read llog block from log "DFID - " offset "LPU64": rc = %d\n", + " offset %llu: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, PFID(lu_object_fid(&o->do_lu)), *cur_offset, rc); @@ -834,7 +914,7 @@ static int llog_osd_next_block(const struct lu_env *env, goto retry; CERROR("%s: invalid llog block at log id "DOSTID"/%u " - "offset "LPU64"\n", + "offset %llu\n", o->do_lu.lo_dev->ld_obd->obd_name, POSTID(&loghandle->lgh_id.lgl_oi), loghandle->lgh_id.lgl_ogen, *cur_offset); @@ -853,14 +933,23 @@ static int llog_osd_next_block(const struct lu_env *env, if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec)) lustre_swab_llog_rec(last_rec); - LASSERT(last_rec->lrh_index == tail->lrt_index); + + if (last_rec->lrh_index != tail->lrt_index) { + CERROR("%s: invalid llog tail at log id "DOSTID"/%u " + "offset %llu last_rec idx %u tail idx %u\n", + o->do_lu.lo_dev->ld_obd->obd_name, + POSTID(&loghandle->lgh_id.lgl_oi), + loghandle->lgh_id.lgl_ogen, *cur_offset, + last_rec->lrh_index, tail->lrt_index); + GOTO(out, rc = -EINVAL); + } *cur_idx = tail->lrt_index; /* this shouldn't happen */ if (tail->lrt_index == 0) { CERROR("%s: invalid llog tail at log id "DOSTID"/%u " - "offset "LPU64" bytes %d\n", + "offset %llu bytes %d\n", o->do_lu.lo_dev->ld_obd->obd_name, POSTID(&loghandle->lgh_id.lgl_oi), loghandle->lgh_id.lgl_ogen, *cur_offset, rc); @@ -968,7 +1057,7 @@ static int llog_osd_prev_block(const struct lu_env *env, rc = dt_read(env, o, &lgi->lgi_buf, &cur_offset); if (rc < 0) { CERROR("%s: can't read llog block from log "DFID - " offset "LPU64": rc = %d\n", + " offset %llu: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, PFID(lu_object_fid(&o->do_lu)), cur_offset, rc); GOTO(out, rc); @@ -979,7 +1068,7 @@ static int llog_osd_prev_block(const struct lu_env *env, if (rc < sizeof(*tail)) { CERROR("%s: invalid llog block at log id "DOSTID"/%u " - "offset "LPU64"\n", + "offset %llu\n", o->do_lu.lo_dev->ld_obd->obd_name, POSTID(&loghandle->lgh_id.lgl_oi), loghandle->lgh_id.lgl_ogen, cur_offset); @@ -1003,7 +1092,7 @@ static int llog_osd_prev_block(const struct lu_env *env, /* this shouldn't happen */ if (tail->lrt_index == 0) { CERROR("%s: invalid llog tail at log id "DOSTID"/%u " - "offset "LPU64"\n", + "offset %llu\n", o->do_lu.lo_dev->ld_obd->obd_name, POSTID(&loghandle->lgh_id.lgl_oi), loghandle->lgh_id.lgl_ogen, cur_offset); @@ -1102,6 +1191,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle, struct ls_device *ls; struct local_oid_storage *los = NULL; int rc = 0; + bool new_id = false; ENTRY; @@ -1162,6 +1252,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle, /* generate fid for new llog */ rc = local_object_fid_generate(env, los, &lgi->lgi_fid); + new_id = true; } if (rc < 0) GOTO(out, rc); @@ -1173,15 +1264,30 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle, } else { LASSERTF(open_param & LLOG_OPEN_NEW, "%#x\n", open_param); /* generate fid for new llog */ +generate: rc = local_object_fid_generate(env, los, &lgi->lgi_fid); if (rc < 0) GOTO(out, rc); + new_id = true; } o = ls_locate(env, ls, &lgi->lgi_fid, NULL); if (IS_ERR(o)) GOTO(out_name, rc = PTR_ERR(o)); + if (dt_object_exists(o) && new_id) { + /* llog exists with just generated ID, e.g. some old llog file + * still is in use or is orphan, drop a warn and skip it. */ + CDEBUG(D_INFO, "%s: llog exists with the same FID: "DFID + ", skipping\n", + o->do_lu.lo_dev->ld_obd->obd_name, + PFID(lu_object_fid(&o->do_lu))); + lu_object_put(env, &o->do_lu); + /* just skip this llog ID, we shouldn't delete it because we + * don't know exactly what is its purpose and state. */ + goto generate; + } + after_open: /* No new llog is expected but doesn't exist */ if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o)) @@ -1206,23 +1312,6 @@ out: } /** - * Implementation of the llog_operations::lop_exist - * - * This function checks that llog exists on storage. - * - * \param[in] handle llog handle of the current llog - * - * \retval true if llog object exists and is not just destroyed - * \retval false if llog doesn't exist or just destroyed - */ -static int llog_osd_exist(struct llog_handle *handle) -{ - LASSERT(handle->lgh_obj); - return (dt_object_exists(handle->lgh_obj) && - !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header)); -} - -/** * Get dir for regular fid log object * * Get directory for regular fid log object, and these regular fid log