From 8047d75a5b66fbad12255d810ae0a573c52908a7 Mon Sep 17 00:00:00 2001 From: Alexander Boyko Date: Thu, 26 Sep 2024 18:40:50 -0400 Subject: [PATCH] LU-18218 llog: catalog lgh_lock refactoring -call down_read for list processing, and don't use it for single pointer reading. -pass catalog flags to plain log. llog_osd_prev_block() does not read from the end of file to the beggining, and duplicates llog_osd_next_block. tail->lrh_len was used without swabbing, it is an error. Fixes llog_client error processing, according to llog_osd_next_block EIO -> EBADR change. It fixes test_135 race where only one record at llog. A cancel happens before ENOSPC, a llog stays empty at catalog and could not be deleted. Fixes: 1a24dcdce121 ("LU-15938 lod: prevent endless retry in recovery thread") HPE-bug-id: LUS-11970 Signed-off-by: Alexander Boyko Change-Id: I353b3f291f2de65924f90650a400333a747a4f74 Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/56341 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Alexey Lyashkov Reviewed-by: Andrew Perepechko Reviewed-by: Oleg Drokin --- lustre/obdclass/llog.c | 4 +- lustre/obdclass/llog_cat.c | 299 ++++++++++++++++++++++++-------------------- lustre/obdclass/llog_osd.c | 158 +++++++---------------- lustre/ptlrpc/llog_client.c | 12 +- 4 files changed, 215 insertions(+), 258 deletions(-) diff --git a/lustre/obdclass/llog.c b/lustre/obdclass/llog.c index e59e723..3182d31 100644 --- a/lustre/obdclass/llog.c +++ b/lustre/obdclass/llog.c @@ -1263,7 +1263,7 @@ int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt, d = lu2dt_dev((*res)->lgh_obj->do_lu.lo_dev); if (unlikely(unlikely(d->dd_rdonly))) - RETURN(-EROFS); + GOTO(out, rc = -EROFS); th = dt_trans_create(env, d); if (IS_ERR(th)) @@ -1422,6 +1422,8 @@ int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt, rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param); llog_restore_resource(old_cred); if (rc) { + CDEBUG(D_OTHER, "%s: Failed to open llog %s: rc %d\n", + ctxt->loc_obd->obd_name, name ? name : "", rc); llog_free_handle(*lgh); *lgh = NULL; } diff --git a/lustre/obdclass/llog_cat.c b/lustre/obdclass/llog_cat.c index a0da211..e2954c6 100644 --- a/lustre/obdclass/llog_cat.c +++ b/lustre/obdclass/llog_cat.c @@ -160,7 +160,8 @@ static int llog_cat_new_log(const struct lu_env *env, GOTO(out, rc); } - rc = llog_init_handle(env, loghandle, + rc = llog_init_handle(env, loghandle, (cathandle->lgh_hdr->llh_flags & + LLOG_F_EXT_MASK) | LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY, &cathandle->lgh_hdr->llh_tgtuuid); if (rc < 0) @@ -240,7 +241,8 @@ static int llog_cat_refresh(const struct lu_env *env, struct llog_handle *loghandle; int rc; - down_write(&cathandle->lgh_lock); + LASSERT(rwsem_is_locked(&cathandle->lgh_lock)); + list_for_each_entry(loghandle, &cathandle->u.chd.chd_head, u.phd.phd_entry) { if (!llog_exist(loghandle)) @@ -250,16 +252,36 @@ static int llog_cat_refresh(const struct lu_env *env, rc = llog_read_header(env, loghandle, NULL); up_write(&loghandle->lgh_lock); if (rc) - goto unlock; + goto out; } rc = llog_read_header(env, cathandle, NULL); -unlock: - up_write(&cathandle->lgh_lock); - +out: return rc; } +static inline int llog_cat_declare_create(const struct lu_env *env, + struct llog_handle *cathandle, + struct llog_handle *loghandle, + struct thandle *th) +{ + + struct llog_thread_info *lgi = llog_info(env); + struct llog_logid_rec *lirec = &lgi->lgi_logid; + int rc; + + rc = llog_declare_create(env, loghandle, th); + if (rc) + return rc; + + lirec->lid_hdr.lrh_len = sizeof(*lirec); + rc = llog_declare_write_rec(env, cathandle, &lirec->lid_hdr, -1, + th); + if (!rc) + dt_declare_attr_set(env, cathandle->lgh_obj, NULL, th); + + return rc; +} /* * prepare current/next log for catalog. * @@ -275,89 +297,79 @@ static int llog_cat_prep_log(const struct lu_env *env, struct llog_handle **ploghandle, struct thandle *th) { + struct llog_handle *loghandle; int rc; - int sem_upgraded; start: rc = 0; - sem_upgraded = 0; - if (IS_ERR_OR_NULL(*ploghandle)) { - up_read(&cathandle->lgh_lock); - down_write(&cathandle->lgh_lock); - sem_upgraded = 1; - if (IS_ERR_OR_NULL(*ploghandle)) { - struct llog_handle *loghandle; - - rc = llog_open(env, cathandle->lgh_ctxt, &loghandle, - NULL, NULL, LLOG_OPEN_NEW); - if (!rc) { - *ploghandle = loghandle; - list_add_tail(&loghandle->u.phd.phd_entry, - &cathandle->u.chd.chd_head); - } - } - if (rc) - GOTO(out, rc); + if (!IS_ERR_OR_NULL(*ploghandle)) { + if (llog_exist(*ploghandle) == 0) + return llog_cat_declare_create(env, cathandle, + *ploghandle, th); + return rc; } - rc = llog_exist(*ploghandle); - if (rc < 0) - GOTO(out, rc); - if (rc) - GOTO(out, rc = 0); + down_write(&cathandle->lgh_lock); + if (!IS_ERR_OR_NULL(*ploghandle)) { + up_write(&cathandle->lgh_lock); + if (llog_exist(*ploghandle) == 0) + return llog_cat_declare_create(env, cathandle, + *ploghandle, th); + return rc; + } + + /* Slow path with open/create declare, only one thread do all stuff + * and share loghandle at the end + */ + rc = llog_open(env, cathandle->lgh_ctxt, &loghandle, NULL, NULL, + LLOG_OPEN_NEW); + if (rc) { + up_write(&cathandle->lgh_lock); + CDEBUG(D_OTHER, "%s: failed to open log, catalog "DFID" %d\n", + loghandle2name(cathandle), PLOGID(&cathandle->lgh_id), + rc); + return rc; + } if (dt_object_remote(cathandle->lgh_obj)) { - down_write_nested(&(*ploghandle)->lgh_lock, LLOGH_LOG); - if (!llog_exist(*ploghandle)) { - /* For remote operation, if we put the llog object - * creation in the current transaction, then the - * llog object will not be created on the remote - * target until the transaction stop, if other - * operations start before the transaction stop, - * and use the same llog object, will be dependent - * on the success of this transaction. So let's - * create the llog object synchronously here to - * remove the dependency. */ - rc = llog_cat_new_log(env, cathandle, *ploghandle, - NULL); - if (rc == -ESTALE) { - up_write(&(*ploghandle)->lgh_lock); - if (sem_upgraded) - up_write(&cathandle->lgh_lock); - else - up_read(&cathandle->lgh_lock); - - rc = llog_cat_refresh(env, cathandle); - down_read_nested(&cathandle->lgh_lock, - LLOGH_CAT); - if (rc) - return rc; - /* *ploghandle might become NULL, restart */ - goto start; - } - } - up_write(&(*ploghandle)->lgh_lock); + /* For remote operation, if we put the llog object + * creation in the current transaction, then the + * llog object will not be created on the remote + * target until the transaction stop, if other + * operations start before the transaction stop, + * and use the same llog object, will be dependent + * on the success of this transaction. So let's + * create the llog object synchronously here to + * remove the dependency. + */ + rc = llog_cat_new_log(env, cathandle, loghandle, NULL); + if (rc == -ESTALE) { + rc = llog_cat_refresh(env, cathandle); + if (rc) + GOTO(out, rc); + up_write(&cathandle->lgh_lock); + llog_close(env, loghandle); + goto start; + } else if (rc) + GOTO(out, rc); } else { - struct llog_thread_info *lgi = llog_info(env); - struct llog_logid_rec *lirec = &lgi->lgi_logid; - - rc = llog_declare_create(env, *ploghandle, th); + rc = llog_cat_declare_create(env, cathandle, loghandle, th); if (rc) GOTO(out, rc); - - lirec->lid_hdr.lrh_len = sizeof(*lirec); - rc = llog_declare_write_rec(env, cathandle, &lirec->lid_hdr, -1, - th); - dt_declare_attr_set(env, cathandle->lgh_obj, NULL, th); } + list_add(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head); + *ploghandle = loghandle; + out: - if (sem_upgraded) { - up_write(&cathandle->lgh_lock); - down_read_nested(&cathandle->lgh_lock, LLOGH_CAT); - if (rc == 0) - goto start; - } + up_write(&cathandle->lgh_lock); + CDEBUG(D_OTHER, "%s: open log "DFID" for catalog "DFID" rc=%d\n", + loghandle2name(cathandle), PLOGID(&loghandle->lgh_id), + PLOGID(&cathandle->lgh_id), rc); + + if (rc) + llog_close(env, loghandle); + return rc; } @@ -383,7 +395,7 @@ int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle, RETURN(-EBADF); fmt = cathandle->lgh_hdr->llh_flags & LLOG_F_EXT_MASK; - down_write(&cathandle->lgh_lock); + down_read(&cathandle->lgh_lock); list_for_each_entry(loghandle, &cathandle->u.chd.chd_head, u.phd.phd_entry) { struct llog_logid *cgl = &loghandle->lgh_id; @@ -398,11 +410,11 @@ int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle, continue; } loghandle->u.phd.phd_cat_handle = cathandle; - up_write(&cathandle->lgh_lock); + up_read(&cathandle->lgh_lock); RETURN(rc); } } - up_write(&cathandle->lgh_lock); + up_read(&cathandle->lgh_lock); rc = llog_open(env, cathandle->lgh_ctxt, &loghandle, logid, NULL, LLOG_OPEN_EXISTS); @@ -423,7 +435,7 @@ int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle, *res = llog_handle_get(loghandle); LASSERT(*res); down_write(&cathandle->lgh_lock); - list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head); + list_add(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head); up_write(&cathandle->lgh_lock); loghandle->u.phd.phd_cat_handle = cathandle; @@ -483,73 +495,70 @@ EXPORT_SYMBOL(llog_cat_close); static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle, struct thandle *th) { - struct llog_handle *loghandle = NULL; - ENTRY; + struct llog_handle *loghandle = NULL; + ENTRY; if (CFS_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED2)) { - down_write_nested(&cathandle->lgh_lock, LLOGH_CAT); + loghandle = cathandle->u.chd.chd_current_log; GOTO(next, loghandle); } - down_read_nested(&cathandle->lgh_lock, LLOGH_CAT); - loghandle = cathandle->u.chd.chd_current_log; - if (loghandle) { - struct llog_log_hdr *llh; - - down_write_nested(&loghandle->lgh_lock, LLOGH_LOG); - llh = loghandle->lgh_hdr; - if (llh == NULL || !llog_is_full(loghandle)) { - up_read(&cathandle->lgh_lock); - RETURN(loghandle); - } else { - up_write(&loghandle->lgh_lock); - } - } - up_read(&cathandle->lgh_lock); - - /* time to use next log */ - - /* first, we have to make sure the state hasn't changed */ - down_write_nested(&cathandle->lgh_lock, LLOGH_CAT); +retry: loghandle = cathandle->u.chd.chd_current_log; - if (loghandle) { + if (likely(loghandle)) { struct llog_log_hdr *llh; down_write_nested(&loghandle->lgh_lock, LLOGH_LOG); llh = loghandle->lgh_hdr; if (llh == NULL || !llog_is_full(loghandle)) - GOTO(out_unlock, loghandle); + RETURN(loghandle); else up_write(&loghandle->lgh_lock); } + /* time to use next log */ next: - /* Sigh, the chd_next_log and chd_current_log is initialized - * in declare phase, and we do not serialize the catlog - * accessing, so it might be possible the llog creation - * thread (see llog_cat_declare_add_rec()) did not create - * llog successfully, then the following thread might - * meet this situation. */ - if (IS_ERR_OR_NULL(cathandle->u.chd.chd_next_log)) { - CERROR("%s: next log does not exist!\n", - loghandle2name(cathandle)); - loghandle = ERR_PTR(-EIO); - if (cathandle->u.chd.chd_next_log == NULL) { - /* Store the error in chd_next_log, so - * the following process can get correct - * failure value */ - cathandle->u.chd.chd_next_log = loghandle; + /* first, we have to make sure the state hasn't changed */ + down_write_nested(&cathandle->lgh_lock, LLOGH_CAT); + if (unlikely(loghandle == cathandle->u.chd.chd_current_log)) { + struct llog_logid lid = {.lgl_oi.oi.oi_id = 0, + .lgl_oi.oi.oi_seq = 0, + .lgl_ogen = 0}; + /* Sigh, the chd_next_log and chd_current_log is initialized + * in declare phase, and we do not serialize the catlog + * accessing, so it might be possible the llog creation + * thread (see llog_cat_declare_add_rec()) did not create + * llog successfully, then the following thread might + * meet this situation. + */ + if (IS_ERR_OR_NULL(cathandle->u.chd.chd_next_log)) { + CERROR("%s: next log does not exist, catalog "DFID" rc=%d\n", + loghandle2name(cathandle), + PLOGID(&cathandle->lgh_id), -EIO); + loghandle = ERR_PTR(-EIO); + if (cathandle->u.chd.chd_next_log == NULL) { + /* Store the error in chd_next_log, so + * the following process can get correct + * failure value + */ + cathandle->u.chd.chd_next_log = loghandle; + } + GOTO(out_unlock, loghandle); } - GOTO(out_unlock, loghandle); - } - - CDEBUG(D_INODE, "use next log\n"); + if (!IS_ERR_OR_NULL(loghandle)) + lid = loghandle->lgh_id; - loghandle = cathandle->u.chd.chd_next_log; - cathandle->u.chd.chd_current_log = loghandle; - cathandle->u.chd.chd_next_log = NULL; - down_write_nested(&loghandle->lgh_lock, LLOGH_LOG); + CDEBUG(D_OTHER, "%s: use next log "DFID"->"DFID" catalog "DFID"\n", + loghandle2name(cathandle), PLOGID(&lid), + PLOGID(&cathandle->u.chd.chd_next_log->lgh_id), + PLOGID(&cathandle->lgh_id)); + loghandle = cathandle->u.chd.chd_next_log; + cathandle->u.chd.chd_current_log = loghandle; + cathandle->u.chd.chd_next_log = NULL; + } + up_write(&cathandle->lgh_lock); + GOTO(retry, loghandle); out_unlock: up_write(&cathandle->lgh_lock); @@ -583,14 +592,24 @@ retry: rc = llog_cat_new_log(env, cathandle, loghandle, th); if (rc < 0) { up_write(&loghandle->lgh_lock); - /* nobody should be trying to use this llog */ - down_write(&cathandle->lgh_lock); - if (cathandle->u.chd.chd_current_log == loghandle) - cathandle->u.chd.chd_current_log = NULL; - up_write(&cathandle->lgh_lock); + /* When ENOSPC happened no need to drop loghandle + * a new one would be allocated anyway for next llog_add + * so better to stay with the old. + */ + if (rc != -ENOSPC) { + /* nobody should be trying to use this llog */ + down_write(&cathandle->lgh_lock); + if (cathandle->u.chd.chd_current_log == + loghandle) + cathandle->u.chd.chd_current_log = NULL; + list_del_init(&loghandle->u.phd.phd_entry); + up_write(&cathandle->lgh_lock); + llog_close(env, loghandle); + } RETURN(rc); } } + /* now let's try to add the record */ rc = llog_write_rec(env, loghandle, rec, reccookie, LLOG_NEXT_IDX, th); if (rc < 0) { @@ -633,22 +652,28 @@ int llog_cat_declare_add_rec(const struct lu_env *env, ENTRY; start: - down_read_nested(&cathandle->lgh_lock, LLOGH_CAT); + CDEBUG(D_INFO, "Declare adding to "DOSTID" flags %x count %d\n", + POSTID(&cathandle->lgh_id.lgl_oi), + cathandle->lgh_hdr->llh_flags, cathandle->lgh_hdr->llh_count); + + rc = llog_cat_prep_log(env, cathandle, &cathandle->u.chd.chd_current_log, th); if (rc) - GOTO(unlock, rc); + RETURN(rc); + /* For local llog this would always reserves credits for creation */ rc = llog_cat_prep_log(env, cathandle, &cathandle->u.chd.chd_next_log, th); if (rc) - GOTO(unlock, rc); + RETURN(rc); rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log, rec, -1, th); if (rc == -ESTALE && dt_object_remote(cathandle->lgh_obj)) { - up_read(&cathandle->lgh_lock); + down_write(&cathandle->lgh_lock); rc = llog_cat_refresh(env, cathandle); + up_write(&cathandle->lgh_lock); if (rc) RETURN(rc); goto start; @@ -665,8 +690,6 @@ start: rc = llog_declare_write_rec(env, cathandle->u.chd.chd_next_log, rec, -1, th); #endif -unlock: - up_read(&cathandle->lgh_lock); RETURN(rc); } EXPORT_SYMBOL(llog_cat_declare_add_rec); diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index 9151cb4..158a469 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -572,7 +572,7 @@ static int llog_osd_write_rec(const struct lu_env *env, if (loghandle->lgh_max_size > 0 && lgi->lgi_off >= loghandle->lgh_max_size) { - CDEBUG(D_OTHER, "llog is getting too large (%u > %u) at %u " + CDEBUG(D_OTHER, "llog is getting too large (%u >= %u) at %u " DFID"\n", (unsigned)lgi->lgi_off, loghandle->lgh_max_size, (int)loghandle->lgh_last_idx, PLOGID(&loghandle->lgh_id)); @@ -734,6 +734,16 @@ out_unlock: if (rc < 0) GOTO(out, rc); + if (loghandle->lgh_max_size > 0 && + lgi->lgi_off >= loghandle->lgh_max_size) { + CDEBUG(D_OTHER, "llog is getting too large (%u >= %u) at %u " + DFID"\n", (unsigned int)lgi->lgi_off, + loghandle->lgh_max_size, (int)loghandle->lgh_last_idx, + PLOGID(&loghandle->lgh_id)); + /* this is to signal that this llog is full */ + loghandle->lgh_last_idx = llog_max_idx(llh); + } + up_write(&loghandle->lgh_last_sem); CDEBUG(D_HA, "added record "DFID".%u, %u off%llu\n", @@ -1116,11 +1126,12 @@ static int llog_osd_next_block(const struct lu_env *env, int next_idx, __u64 *cur_offset, void *buf, int len) { - struct llog_thread_info *lgi = llog_info(env); - struct dt_object *o; - struct dt_device *dt; - int rc; - __u32 chunk_size; + struct llog_thread_info *lgi = llog_info(env); + struct dt_object *o; + struct dt_device *dt; + int rc; + __u32 chunk_size; + __u32 tail_len; int last_idx = *cur_idx; __u64 last_offset = *cur_offset; bool force_mini_rec = !next_idx; @@ -1217,15 +1228,28 @@ static int llog_osd_next_block(const struct lu_env *env, lustre_swab_llog_rec(rec); tail = (struct llog_rec_tail *)((char *)buf + rc - sizeof(struct llog_rec_tail)); + tail_len = tail->lrt_len; + /* base on tail_len do swab */ + if (tail_len > chunk_size) { + __swab32s(&tail_len); + if (tail_len > chunk_size) { + CERROR("%s: invalid llog tail at log id "DFID":%x offset %llu tail idx %u lrt len %u read_size %d\n", + o->do_lu.lo_dev->ld_obd->obd_name, + PFID(&loghandle->lgh_id.lgl_oi.oi_fid), + loghandle->lgh_id.lgl_ogen, *cur_offset, + tail->lrt_index, tail->lrt_len, rc); + /* tail is broken */ + GOTO(out, rc = -EINVAL); + } + } + /* get the last record in block */ + last_rec = (struct llog_rec_hdr *)((char *)tail - tail_len + + sizeof(struct llog_rec_tail)); /* caller handles bad records if any */ if (llog_verify_record(loghandle, rec)) GOTO(out, rc = 0); - /* get the last record in block */ - last_rec = (struct llog_rec_hdr *)((char *)buf + rc - - tail->lrt_len); - if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec)) lustre_swab_llog_rec(last_rec); @@ -1310,110 +1334,14 @@ static int llog_osd_prev_block(const struct lu_env *env, struct llog_handle *loghandle, int prev_idx, void *buf, int len) { - struct llog_thread_info *lgi = llog_info(env); - struct dt_object *o; - struct dt_device *dt; - loff_t cur_offset; - __u32 chunk_size; - int rc; - - ENTRY; - - chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len; - if (len == 0 || len & (chunk_size - 1)) - RETURN(-EINVAL); - - CDEBUG(D_OTHER, "looking for log index %u\n", prev_idx); - - LASSERT(loghandle); - LASSERT(loghandle->lgh_ctxt); - - o = loghandle->lgh_obj; - LASSERT(o); - dt_read_lock(env, o, 0); - if (!llog_osd_exist(loghandle)) - GOTO(out, rc = -ESTALE); - - dt = lu2dt_dev(o->do_lu.lo_dev); - LASSERT(dt); - - /* Let's only use mini record size for previous block read - * for now XXX */ - cur_offset = chunk_size; - llog_skip_over(loghandle, &cur_offset, 0, prev_idx, - chunk_size, true); - - rc = dt_attr_get(env, o, &lgi->lgi_attr); - if (rc) - GOTO(out, rc); - - while (cur_offset < lgi->lgi_attr.la_size) { - struct llog_rec_hdr *rec, *last_rec; - struct llog_rec_tail *tail; - - lgi->lgi_buf.lb_len = len; - lgi->lgi_buf.lb_buf = buf; - rc = dt_read(env, o, &lgi->lgi_buf, &cur_offset); - if (rc < 0) { - CERROR("%s: can't read llog block from log "DFID - " offset %llu: rc = %d\n", - o->do_lu.lo_dev->ld_obd->obd_name, - PFID(lu_object_fid(&o->do_lu)), cur_offset, rc); - GOTO(out, rc); - } - - if (rc == 0) /* end of file, nothing to do */ - GOTO(out, rc); - - if (rc < sizeof(*tail)) { - CERROR("%s: invalid llog block at log id "DFID" offset %llu\n", - o->do_lu.lo_dev->ld_obd->obd_name, - PLOGID(&loghandle->lgh_id), cur_offset); - GOTO(out, rc = -EINVAL); - } - - rec = buf; - if (LLOG_REC_HDR_NEEDS_SWABBING(rec)) - lustre_swab_llog_rec(rec); - - tail = (struct llog_rec_tail *)((char *)buf + rc - - sizeof(struct llog_rec_tail)); - /* get the last record in block */ - last_rec = (struct llog_rec_hdr *)((char *)buf + rc - - le32_to_cpu(tail->lrt_len)); - - if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec)) - lustre_swab_llog_rec(last_rec); - LASSERT(last_rec->lrh_index == tail->lrt_index); - - /* this shouldn't happen */ - if (tail->lrt_index == 0) { - CERROR("%s: invalid llog tail at log id "DFID" offset %llu\n", - o->do_lu.lo_dev->ld_obd->obd_name, - PLOGID(&loghandle->lgh_id), cur_offset); - GOTO(out, rc = -EINVAL); - } - if (tail->lrt_index < prev_idx) - continue; - - /* sanity check that the start of the new buffer is no farther - * than the record that we wanted. This shouldn't happen. */ - if (rec->lrh_index > prev_idx) { - CERROR("%s: missed desired record? %u > %u\n", - o->do_lu.lo_dev->ld_obd->obd_name, - rec->lrh_index, prev_idx); - GOTO(out, rc = -ENOENT); - } + loff_t cur_offset; + int cur_idx; - /* Trim unsupported extensions for compat w/ older clients */ - changelog_block_trim_ext(rec, last_rec, loghandle); + cur_offset = loghandle->lgh_hdr->llh_hdr.lrh_len; + cur_idx = 1; - GOTO(out, rc = 0); - } - GOTO(out, rc = -EIO); -out: - dt_read_unlock(env, o); - return rc; + return llog_osd_next_block(env, loghandle, &cur_idx, prev_idx, + &cur_offset, buf, len); } /** @@ -1602,7 +1530,11 @@ after_open: RETURN(rc); out_put: - dt_object_put(env, o); + if (ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) + /* according to llog_osd_close() */ + dt_object_put_nocache(env, o); + else + dt_object_put(env, o); out_name: OBD_FREE(handle->lgh_name, strlen(name) + 1); out: diff --git a/lustre/ptlrpc/llog_client.c b/lustre/ptlrpc/llog_client.c index 474729f..46f89fa 100644 --- a/lustre/ptlrpc/llog_client.c +++ b/lustre/ptlrpc/llog_client.c @@ -183,17 +183,17 @@ static int llog_client_next_block(const struct lu_env *env, ptlrpc_request_set_replen(req); rc = ptlrpc_queue_wait(req); /* - * -EIO has a special meaning here. If llog_osd_next_block() + * -EBADR has a special meaning here. If llog_osd_next_block() * reaches the end of the log without finding the desired * record then it updates *cur_offset and *cur_idx and returns - * -EIO. In llog_process_thread() we use this to detect - * EOF. But we must be careful to distinguish between -EIO - * coming from llog_osd_next_block() and -EIO coming from + * -EBADR. In llog_process_thread() we use this to detect + * EOF. But we must be careful to distinguish between -EBADR + * coming from llog_osd_next_block() and -EBADR coming from * ptlrpc or below. */ - if (rc == -EIO) { + if (rc == -EBADR) { if (!req->rq_repmsg || - lustre_msg_get_status(req->rq_repmsg) != -EIO) + lustre_msg_get_status(req->rq_repmsg) != -EBADR) GOTO(out, rc); } else if (rc < 0) { GOTO(out, rc); -- 1.8.3.1