X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fobdclass%2Fllog_osd.c;h=b8231855e82c6b918a2ae8a4aa0faead650e06d0;hb=dd99ddf9f22921535c3f083e91bfc06a566e382b;hp=f0e904b17ca39930f3b039e334d1e3b8e0540720;hpb=cd764a5462697261a9a6b1e6c6858c75d969bae1;p=fs%2Flustre-release.git diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index f0e904b..b823185 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -116,7 +116,7 @@ static int llog_osd_create_new_object(const struct lu_env *env, * This function writes a padding record to the end of llog. That may * be needed if llog contains records of variable size, e.g. config logs * or changelogs. - * The padding record just aligns llog to the LLOG_CHUNK_SIZE boundary if + * The padding record just aligns llog to the llog chunk_size boundary if * the current record doesn't fit in the remaining space. * * It allocates full length to avoid two separate writes for header and tail. @@ -192,8 +192,6 @@ static int llog_osd_read_header(const struct lu_env *env, ENTRY; - LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE); - o = handle->lgh_obj; LASSERT(o); @@ -214,17 +212,21 @@ static int llog_osd_read_header(const struct lu_env *env, lgi->lgi_off = 0; lgi->lgi_buf.lb_buf = handle->lgh_hdr; - lgi->lgi_buf.lb_len = LLOG_CHUNK_SIZE; - - rc = dt_record_read(env, o, &lgi->lgi_buf, &lgi->lgi_off); - if (rc) { - CERROR("%s: error reading log header from "DFID": rc = %d\n", + lgi->lgi_buf.lb_len = handle->lgh_hdr_size; + rc = dt_read(env, o, &lgi->lgi_buf, &lgi->lgi_off); + llh_hdr = &handle->lgh_hdr->llh_hdr; + if (rc < sizeof(*llh_hdr) || rc < llh_hdr->lrh_len) { + CERROR("%s: error reading "DFID" log header size %d: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, - PFID(lu_object_fid(&o->do_lu)), rc); + PFID(lu_object_fid(&o->do_lu)), rc < 0 ? 
0 : rc, + -EFAULT); + + if (rc >= 0) + rc = -EFAULT; + RETURN(rc); } - llh_hdr = &handle->lgh_hdr->llh_hdr; if (LLOG_REC_HDR_NEEDS_SWABBING(llh_hdr)) lustre_swab_llog_hdr(handle->lgh_hdr); @@ -235,19 +237,31 @@ static int llog_osd_read_header(const struct lu_env *env, PFID(lu_object_fid(&o->do_lu)), llh_hdr->lrh_type, LLOG_HDR_MAGIC); RETURN(-EIO); - } else if (llh_hdr->lrh_len != LLOG_CHUNK_SIZE) { + } else if (llh_hdr->lrh_len < LLOG_MIN_CHUNK_SIZE || + llh_hdr->lrh_len > handle->lgh_hdr_size) { CERROR("%s: incorrectly sized log %s "DFID" header: " - "%#x (expected %#x)\n" + "%#x (expected at least %#x)\n" "you may need to re-run lconf --write_conf.\n", o->do_lu.lo_dev->ld_obd->obd_name, handle->lgh_name ? handle->lgh_name : "", PFID(lu_object_fid(&o->do_lu)), - llh_hdr->lrh_len, LLOG_CHUNK_SIZE); + llh_hdr->lrh_len, LLOG_MIN_CHUNK_SIZE); + RETURN(-EIO); + } else if (LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index > + LLOG_HDR_BITMAP_SIZE(handle->lgh_hdr) || + LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len != + llh_hdr->lrh_len) { + CERROR("%s: incorrectly sized log %s "DFID" tailer: " + "%#x : rc = %d\n", + o->do_lu.lo_dev->ld_obd->obd_name, + handle->lgh_name ? 
handle->lgh_name : "", + PFID(lu_object_fid(&o->do_lu)), + LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len, -EIO); RETURN(-EIO); } handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK); - handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index; + handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index; RETURN(0); } @@ -277,6 +291,7 @@ static int llog_osd_declare_write_rec(const struct lu_env *env, int idx, struct thandle *th) { struct llog_thread_info *lgi = llog_info(env); + __u32 chunk_size; struct dt_object *o; int rc; @@ -286,12 +301,13 @@ static int llog_osd_declare_write_rec(const struct lu_env *env, LASSERT(th); LASSERT(loghandle); LASSERT(rec); - LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE); + LASSERT(rec->lrh_len <= loghandle->lgh_ctxt->loc_chunk_size); o = loghandle->lgh_obj; LASSERT(o); - lgi->lgi_buf.lb_len = sizeof(struct llog_log_hdr); + chunk_size = loghandle->lgh_ctxt->loc_chunk_size; + lgi->lgi_buf.lb_len = chunk_size; lgi->lgi_buf.lb_buf = NULL; /* each time we update header */ rc = dt_declare_record_write(env, o, &lgi->lgi_buf, 0, @@ -303,7 +319,7 @@ static int llog_osd_declare_write_rec(const struct lu_env *env, * the pad record can be inserted so take into account double * record size */ - lgi->lgi_buf.lb_len = rec->lrh_len * 2; + lgi->lgi_buf.lb_len = chunk_size * 2; lgi->lgi_buf.lb_buf = NULL; /* XXX: implement declared window or multi-chunks approach */ rc = dt_declare_record_write(env, o, &lgi->lgi_buf, -1, th); @@ -346,6 +362,7 @@ static int llog_osd_write_rec(const struct lu_env *env, int index, rc; struct llog_rec_tail *lrt; struct dt_object *o; + __u32 chunk_size; size_t left; ENTRY; @@ -357,11 +374,12 @@ static int llog_osd_write_rec(const struct lu_env *env, LASSERT(o); LASSERT(th); + chunk_size = llh->llh_hdr.lrh_len; CDEBUG(D_OTHER, "new record %x to "DFID"\n", rec->lrh_type, PFID(lu_object_fid(&o->do_lu))); - /* record length should not bigger than LLOG_CHUNK_SIZE */ - if (reclen > LLOG_CHUNK_SIZE) + /* record length should not 
bigger than */ + if (reclen > loghandle->lgh_hdr->llh_hdr.lrh_len) RETURN(-E2BIG); rc = dt_attr_get(env, o, &lgi->lgi_attr); @@ -389,7 +407,7 @@ static int llog_osd_write_rec(const struct lu_env *env, /* llog can be empty only when first record is being written */ LASSERT(ergo(idx > 0, lgi->lgi_attr.la_size > 0)); - if (!ext2_test_bit(idx, llh->llh_bitmap)) { + if (!ext2_test_bit(idx, LLOG_HDR_BITMAP(llh))) { CERROR("%s: modify unset record %u\n", o->do_lu.lo_dev->ld_obd->obd_name, idx); RETURN(-ENOENT); @@ -404,7 +422,7 @@ static int llog_osd_write_rec(const struct lu_env *env, if (idx == LLOG_HEADER_IDX) { /* llog header update */ - LASSERT(reclen == sizeof(struct llog_log_hdr)); + LASSERT(reclen >= sizeof(struct llog_log_hdr)); LASSERT(rec == &llh->llh_hdr); lgi->lgi_off = 0; @@ -475,7 +493,7 @@ static int llog_osd_write_rec(const struct lu_env *env, */ LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); lgi->lgi_off = lgi->lgi_attr.la_size; - left = LLOG_CHUNK_SIZE - (lgi->lgi_off & (LLOG_CHUNK_SIZE - 1)); + left = chunk_size - (lgi->lgi_off & (chunk_size - 1)); /* NOTE: padding is a record, but no bit is set */ if (left != 0 && left != reclen && left < (reclen + LLOG_MIN_REC_SIZE)) { @@ -486,36 +504,35 @@ static int llog_osd_write_rec(const struct lu_env *env, loghandle->lgh_last_idx++; /* for pad rec */ } /* if it's the last idx in log file, then return -ENOSPC */ - if (loghandle->lgh_last_idx >= LLOG_BITMAP_SIZE(llh) - 1) + if (loghandle->lgh_last_idx >= LLOG_HDR_BITMAP_SIZE(llh) - 1) RETURN(-ENOSPC); /* increment the last_idx along with llh_tail index, they should * be equal for a llog lifetime */ loghandle->lgh_last_idx++; index = loghandle->lgh_last_idx; - llh->llh_tail.lrt_index = index; + LLOG_HDR_TAIL(llh)->lrt_index = index; /** * NB: the caller should make sure only 1 process access * the lgh_last_idx, e.g. append should be exclusive. * Otherwise it might hit the assert. 
*/ - LASSERT(index < LLOG_BITMAP_SIZE(llh)); + LASSERT(index < LLOG_HDR_BITMAP_SIZE(llh)); rec->lrh_index = index; lrt = rec_tail(rec); lrt->lrt_len = rec->lrh_len; lrt->lrt_index = rec->lrh_index; - /* the lgh_hdr_lock protects llog header data from concurrent + /* the lgh_hdr_mutex protects llog header data from concurrent * update/cancel, the llh_count and llh_bitmap are protected */ - spin_lock(&loghandle->lgh_hdr_lock); - if (ext2_set_bit(index, llh->llh_bitmap)) { + mutex_lock(&loghandle->lgh_hdr_mutex); + if (ext2_set_bit(index, LLOG_HDR_BITMAP(llh))) { CERROR("%s: index %u already set in log bitmap\n", o->do_lu.lo_dev->ld_obd->obd_name, index); - spin_unlock(&loghandle->lgh_hdr_lock); + mutex_unlock(&loghandle->lgh_hdr_mutex); LBUG(); /* should never happen */ } llh->llh_count++; - spin_unlock(&loghandle->lgh_hdr_lock); if (lgi->lgi_attr.la_size == 0) { lgi->lgi_off = 0; @@ -523,8 +540,10 @@ static int llog_osd_write_rec(const struct lu_env *env, lgi->lgi_buf.lb_buf = &llh->llh_hdr; rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc != 0) - GOTO(out, rc); + GOTO(out_unlock, rc); } else { + __u32 *bitmap = LLOG_HDR_BITMAP(llh); + /* Note: If this is not initialization (size == 0), then do not * write the whole header (8k bytes), only update header/tail * and bits needs to be updated. 
Because this update might be @@ -538,25 +557,31 @@ static int llog_osd_write_rec(const struct lu_env *env, lgi->lgi_buf.lb_buf = &llh->llh_count; rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc != 0) - GOTO(out, rc); + GOTO(out_unlock, rc); - lgi->lgi_off = offsetof(typeof(*llh), - llh_bitmap[index / (sizeof(*llh->llh_bitmap) * 8)]); - lgi->lgi_buf.lb_len = sizeof(*llh->llh_bitmap); - lgi->lgi_buf.lb_buf = - &llh->llh_bitmap[index/(sizeof(*llh->llh_bitmap)*8)]; + lgi->lgi_off = llh->llh_bitmap_offset + + (index / (sizeof(*bitmap) * 8)) * sizeof(*bitmap); + lgi->lgi_buf.lb_len = sizeof(*bitmap); + lgi->lgi_buf.lb_buf = &bitmap[index/(sizeof(*bitmap)*8)]; rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc != 0) - GOTO(out, rc); + GOTO(out_unlock, rc); - lgi->lgi_off = offsetof(typeof(*llh), llh_tail); + lgi->lgi_off = (unsigned long)LLOG_HDR_TAIL(llh) - + (unsigned long)llh; lgi->lgi_buf.lb_len = sizeof(llh->llh_tail); - lgi->lgi_buf.lb_buf = &llh->llh_tail; + lgi->lgi_buf.lb_buf = LLOG_HDR_TAIL(llh); rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc != 0) - GOTO(out, rc); + GOTO(out_unlock, rc); } +out_unlock: + /* unlock here for remote object */ + mutex_unlock(&loghandle->lgh_hdr_mutex); + if (rc) + GOTO(out, rc); + rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) GOTO(out, rc); @@ -569,8 +594,9 @@ static int llog_osd_write_rec(const struct lu_env *env, if (rc < 0) GOTO(out, rc); - CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u\n", - POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len); + CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u off"LPU64"\n", + POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len, + lgi->lgi_off); if (reccookie != NULL) { reccookie->lgc_lgl = loghandle->lgh_id; reccookie->lgc_index = index; @@ -586,14 +612,14 @@ static int llog_osd_write_rec(const struct lu_env *env, RETURN(rc); out: /* cleanup llog for error case */ - spin_lock(&loghandle->lgh_hdr_lock); - 
ext2_clear_bit(index, llh->llh_bitmap); + mutex_lock(&loghandle->lgh_hdr_mutex); + ext2_clear_bit(index, LLOG_HDR_BITMAP(llh)); llh->llh_count--; - spin_unlock(&loghandle->lgh_hdr_lock); + mutex_unlock(&loghandle->lgh_hdr_mutex); /* restore llog last_idx */ loghandle->lgh_last_idx--; - llh->llh_tail.lrt_index = loghandle->lgh_last_idx; + LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx; RETURN(rc); } @@ -605,12 +631,19 @@ out: * actual records are larger than minimum size) we just skip * some more records. */ -static inline void llog_skip_over(__u64 *off, int curr, int goal) +static inline void llog_skip_over(struct llog_log_hdr *llh, __u64 *off, + int curr, int goal, __u32 chunk_size) { - if (goal <= curr) - return; - *off = (*off + (goal - curr - 1) * LLOG_MIN_REC_SIZE) & - ~(LLOG_CHUNK_SIZE - 1); + if (goal > curr) { + if (llh->llh_size == 0) { + /* variable size records */ + *off = (*off + (goal - curr - 1) * LLOG_MIN_REC_SIZE); + } else { + *off = chunk_size + (__u64)(goal - 1) * llh->llh_size; + } + } + /* align down to chunk boundary; 64-bit mask keeps offset bits above 4GB */ + *off &= ~((__u64)chunk_size - 1); } /** @@ -652,7 +685,7 @@ static void changelog_block_trim_ext(struct llog_rec_hdr *hdr, * \param[in,out] cur_offset furtherst point read in the file * \param[in] buf pointer to data buffer to fill * \param[in] len required len to read, it is - * LLOG_CHUNK_SIZE usually. + * usually llog chunk_size. 
* * \retval 0 on successful buffer read * \retval negative value on error @@ -666,13 +699,15 @@ static int llog_osd_next_block(const struct lu_env *env, struct dt_object *o; struct dt_device *dt; int rc; + __u32 chunk_size; ENTRY; LASSERT(env); LASSERT(lgi); - if (len == 0 || len & (LLOG_CHUNK_SIZE - 1)) + chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len; + if (len == 0 || len & (chunk_size - 1)) RETURN(-EINVAL); CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n", @@ -695,11 +730,12 @@ static int llog_osd_next_block(const struct lu_env *env, struct llog_rec_hdr *rec, *last_rec; struct llog_rec_tail *tail; - llog_skip_over(cur_offset, *cur_idx, next_idx); + llog_skip_over(loghandle->lgh_hdr, cur_offset, *cur_idx, + next_idx, chunk_size); - /* read up to next LLOG_CHUNK_SIZE block */ - lgi->lgi_buf.lb_len = LLOG_CHUNK_SIZE - - (*cur_offset & (LLOG_CHUNK_SIZE - 1)); + /* read up to next llog chunk_size block */ + lgi->lgi_buf.lb_len = chunk_size - + (*cur_offset & (chunk_size - 1)); lgi->lgi_buf.lb_buf = buf; rc = dt_read(env, o, &lgi->lgi_buf, cur_offset); @@ -791,7 +827,7 @@ out: * \param[in] loghandle llog handle of the current llog * \param[in] prev_idx target index to find * \param[in] buf pointer to data buffer to fill - * \param[in] len required len to read, it is LLOG_CHUNK_SIZE usually. + * \param[in] len required len to read, it is llog_chunk_size usually. 
* * \retval 0 on successful buffer read * \retval negative value on error @@ -804,11 +840,13 @@ static int llog_osd_prev_block(const struct lu_env *env, struct dt_object *o; struct dt_device *dt; loff_t cur_offset; + __u32 chunk_size; int rc; ENTRY; - if (len == 0 || len & (LLOG_CHUNK_SIZE - 1)) + chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len; + if (len == 0 || len & (chunk_size - 1)) RETURN(-EINVAL); CDEBUG(D_OTHER, "looking for log index %u\n", prev_idx); @@ -822,8 +860,9 @@ static int llog_osd_prev_block(const struct lu_env *env, dt = lu2dt_dev(o->do_lu.lo_dev); LASSERT(dt); - cur_offset = LLOG_CHUNK_SIZE; - llog_skip_over(&cur_offset, 0, prev_idx); + cur_offset = chunk_size; + llog_skip_over(loghandle->lgh_hdr, &cur_offset, 0, prev_idx, + chunk_size); rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) @@ -1548,7 +1587,7 @@ out_unlock: dt_write_unlock(env, o); out_trans: dt_trans_stop(env, d, th); - if (llog_dir != NULL) + if (!(IS_ERR_OR_NULL(llog_dir))) lu_object_put(env, &llog_dir->do_lu); RETURN(rc); }