Whamcloud - gitweb
LU-18218 llog: catalog lgh_lock refactoring 41/56341/12
authorAlexander Boyko <alexander.boyko@hpe.com>
Thu, 26 Sep 2024 22:40:50 +0000 (18:40 -0400)
committerOleg Drokin <green@whamcloud.com>
Thu, 2 Jan 2025 20:41:41 +0000 (20:41 +0000)
- Call down_read() for list processing, and don't take the lock for
  a single pointer read.
- Pass the catalog flags to the plain log.

llog_osd_prev_block() does not read from the end of the file towards
the beginning and merely duplicates llog_osd_next_block(), so it now
calls llog_osd_next_block() instead.
tail->lrt_len was used without swabbing, which is an error.

Fix llog_client error processing to match the EIO -> EBADR change in
llog_osd_next_block().

Fix a test_135 race where the llog holds only one record: a cancel
happens before ENOSPC, so the llog stays empty in the catalog and
cannot be deleted.

Fixes: 1a24dcdce121 ("LU-15938 lod: prevent endless retry in recovery thread")
HPE-bug-id: LUS-11970
Signed-off-by: Alexander Boyko <alexander.boyko@hpe.com>
Change-Id: I353b3f291f2de65924f90650a400333a747a4f74
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/56341
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Alexey Lyashkov <alexey.lyashkov@hpe.com>
Reviewed-by: Andrew Perepechko <andrew.perepechko@hpe.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/obdclass/llog.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_osd.c
lustre/ptlrpc/llog_client.c

index e59e723..3182d31 100644 (file)
@@ -1263,7 +1263,7 @@ int llog_open_create(const struct lu_env *env, struct llog_ctxt *ctxt,
        d = lu2dt_dev((*res)->lgh_obj->do_lu.lo_dev);
 
        if (unlikely(unlikely(d->dd_rdonly)))
-               RETURN(-EROFS);
+               GOTO(out, rc = -EROFS);
 
        th = dt_trans_create(env, d);
        if (IS_ERR(th))
@@ -1422,6 +1422,8 @@ int llog_open(const struct lu_env *env, struct llog_ctxt *ctxt,
        rc = ctxt->loc_logops->lop_open(env, *lgh, logid, name, open_param);
        llog_restore_resource(old_cred);
        if (rc) {
+               CDEBUG(D_OTHER, "%s: Failed to open llog %s: rc %d\n",
+                      ctxt->loc_obd->obd_name, name ? name : "", rc);
                llog_free_handle(*lgh);
                *lgh = NULL;
        }
index a0da211..e2954c6 100644 (file)
@@ -160,7 +160,8 @@ static int llog_cat_new_log(const struct lu_env *env,
                GOTO(out, rc);
        }
 
-       rc = llog_init_handle(env, loghandle,
+       rc = llog_init_handle(env, loghandle, (cathandle->lgh_hdr->llh_flags &
+                             LLOG_F_EXT_MASK) |
                              LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
                              &cathandle->lgh_hdr->llh_tgtuuid);
        if (rc < 0)
@@ -240,7 +241,8 @@ static int llog_cat_refresh(const struct lu_env *env,
        struct llog_handle *loghandle;
        int rc;
 
-       down_write(&cathandle->lgh_lock);
+       LASSERT(rwsem_is_locked(&cathandle->lgh_lock));
+
        list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
                            u.phd.phd_entry) {
                if (!llog_exist(loghandle))
@@ -250,16 +252,36 @@ static int llog_cat_refresh(const struct lu_env *env,
                rc = llog_read_header(env, loghandle, NULL);
                up_write(&loghandle->lgh_lock);
                if (rc)
-                       goto unlock;
+                       goto out;
        }
 
        rc = llog_read_header(env, cathandle, NULL);
-unlock:
-       up_write(&cathandle->lgh_lock);
-
+out:
        return rc;
 }
 
+static inline int llog_cat_declare_create(const struct lu_env *env,
+                                         struct llog_handle *cathandle,
+                                         struct llog_handle *loghandle,
+                                         struct thandle *th)
+{
+
+       struct llog_thread_info *lgi = llog_info(env);
+       struct llog_logid_rec *lirec = &lgi->lgi_logid;
+       int rc;
+
+       rc = llog_declare_create(env, loghandle, th);
+       if (rc)
+               return rc;
+
+       lirec->lid_hdr.lrh_len = sizeof(*lirec);
+       rc = llog_declare_write_rec(env, cathandle, &lirec->lid_hdr, -1,
+                                   th);
+       if (!rc)
+               dt_declare_attr_set(env, cathandle->lgh_obj, NULL, th);
+
+       return rc;
+}
 /*
  * prepare current/next log for catalog.
  *
@@ -275,89 +297,79 @@ static int llog_cat_prep_log(const struct lu_env *env,
                             struct llog_handle **ploghandle,
                             struct thandle *th)
 {
+       struct llog_handle *loghandle;
        int rc;
-       int sem_upgraded;
 
 start:
        rc = 0;
-       sem_upgraded = 0;
-       if (IS_ERR_OR_NULL(*ploghandle)) {
-               up_read(&cathandle->lgh_lock);
-               down_write(&cathandle->lgh_lock);
-               sem_upgraded = 1;
-               if (IS_ERR_OR_NULL(*ploghandle)) {
-                       struct llog_handle *loghandle;
-
-                       rc = llog_open(env, cathandle->lgh_ctxt, &loghandle,
-                                      NULL, NULL, LLOG_OPEN_NEW);
-                       if (!rc) {
-                               *ploghandle = loghandle;
-                               list_add_tail(&loghandle->u.phd.phd_entry,
-                                             &cathandle->u.chd.chd_head);
-                       }
-               }
-               if (rc)
-                       GOTO(out, rc);
+       if (!IS_ERR_OR_NULL(*ploghandle)) {
+               if (llog_exist(*ploghandle) == 0)
+                       return llog_cat_declare_create(env, cathandle,
+                                                      *ploghandle, th);
+               return rc;
        }
 
-       rc = llog_exist(*ploghandle);
-       if (rc < 0)
-               GOTO(out, rc);
-       if (rc)
-               GOTO(out, rc = 0);
+       down_write(&cathandle->lgh_lock);
+       if (!IS_ERR_OR_NULL(*ploghandle)) {
+               up_write(&cathandle->lgh_lock);
+               if (llog_exist(*ploghandle) == 0)
+                       return llog_cat_declare_create(env, cathandle,
+                                                      *ploghandle, th);
+               return rc;
+       }
+
+       /* Slow path with open/create declare, only one thread do all stuff
+        * and share loghandle at the end
+        */
+       rc = llog_open(env, cathandle->lgh_ctxt, &loghandle, NULL, NULL,
+                      LLOG_OPEN_NEW);
+       if (rc) {
+               up_write(&cathandle->lgh_lock);
+               CDEBUG(D_OTHER, "%s: failed to open log, catalog "DFID" %d\n",
+                      loghandle2name(cathandle), PLOGID(&cathandle->lgh_id),
+                      rc);
+               return rc;
+       }
 
        if (dt_object_remote(cathandle->lgh_obj)) {
-               down_write_nested(&(*ploghandle)->lgh_lock, LLOGH_LOG);
-               if (!llog_exist(*ploghandle)) {
-                       /* For remote operation, if we put the llog object
-                        * creation in the current transaction, then the
-                        * llog object will not be created on the remote
-                        * target until the transaction stop, if other
-                        * operations start before the transaction stop,
-                        * and use the same llog object, will be dependent
-                        * on the success of this transaction. So let's
-                        * create the llog object synchronously here to
-                        * remove the dependency. */
-                       rc = llog_cat_new_log(env, cathandle, *ploghandle,
-                                             NULL);
-                       if (rc == -ESTALE) {
-                               up_write(&(*ploghandle)->lgh_lock);
-                               if (sem_upgraded)
-                                       up_write(&cathandle->lgh_lock);
-                               else
-                                       up_read(&cathandle->lgh_lock);
-
-                               rc = llog_cat_refresh(env, cathandle);
-                               down_read_nested(&cathandle->lgh_lock,
-                                                LLOGH_CAT);
-                               if (rc)
-                                       return rc;
-                               /* *ploghandle might become NULL, restart */
-                               goto start;
-                       }
-               }
-               up_write(&(*ploghandle)->lgh_lock);
+               /* For remote operation, if we put the llog object
+                * creation in the current transaction, then the
+                * llog object will not be created on the remote
+                * target until the transaction stop, if other
+                * operations start before the transaction stop,
+                * and use the same llog object, will be dependent
+                * on the success of this transaction. So let's
+                * create the llog object synchronously here to
+                * remove the dependency.
+                */
+               rc = llog_cat_new_log(env, cathandle, loghandle, NULL);
+               if (rc == -ESTALE) {
+                       rc = llog_cat_refresh(env, cathandle);
+                       if (rc)
+                               GOTO(out, rc);
+                       up_write(&cathandle->lgh_lock);
+                       llog_close(env, loghandle);
+                       goto start;
+               } else if (rc)
+                       GOTO(out, rc);
        } else {
-               struct llog_thread_info *lgi = llog_info(env);
-               struct llog_logid_rec *lirec = &lgi->lgi_logid;
-
-               rc = llog_declare_create(env, *ploghandle, th);
+               rc = llog_cat_declare_create(env, cathandle, loghandle, th);
                if (rc)
                        GOTO(out, rc);
-
-               lirec->lid_hdr.lrh_len = sizeof(*lirec);
-               rc = llog_declare_write_rec(env, cathandle, &lirec->lid_hdr, -1,
-                                           th);
-               dt_declare_attr_set(env, cathandle->lgh_obj, NULL, th);
        }
 
+       list_add(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
+       *ploghandle = loghandle;
+
 out:
-       if (sem_upgraded) {
-               up_write(&cathandle->lgh_lock);
-               down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
-               if (rc == 0)
-                       goto start;
-       }
+       up_write(&cathandle->lgh_lock);
+       CDEBUG(D_OTHER, "%s: open log "DFID" for catalog "DFID" rc=%d\n",
+              loghandle2name(cathandle), PLOGID(&loghandle->lgh_id),
+              PLOGID(&cathandle->lgh_id), rc);
+
+       if (rc)
+               llog_close(env, loghandle);
+
        return rc;
 }
 
@@ -383,7 +395,7 @@ int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
                RETURN(-EBADF);
 
        fmt = cathandle->lgh_hdr->llh_flags & LLOG_F_EXT_MASK;
-       down_write(&cathandle->lgh_lock);
+       down_read(&cathandle->lgh_lock);
        list_for_each_entry(loghandle, &cathandle->u.chd.chd_head,
                            u.phd.phd_entry) {
                struct llog_logid *cgl = &loghandle->lgh_id;
@@ -398,11 +410,11 @@ int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
                                continue;
                        }
                        loghandle->u.phd.phd_cat_handle = cathandle;
-                       up_write(&cathandle->lgh_lock);
+                       up_read(&cathandle->lgh_lock);
                        RETURN(rc);
                }
        }
-       up_write(&cathandle->lgh_lock);
+       up_read(&cathandle->lgh_lock);
 
        rc = llog_open(env, cathandle->lgh_ctxt, &loghandle, logid, NULL,
                       LLOG_OPEN_EXISTS);
@@ -423,7 +435,7 @@ int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
        *res = llog_handle_get(loghandle);
        LASSERT(*res);
        down_write(&cathandle->lgh_lock);
-       list_add_tail(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
+       list_add(&loghandle->u.phd.phd_entry, &cathandle->u.chd.chd_head);
        up_write(&cathandle->lgh_lock);
 
        loghandle->u.phd.phd_cat_handle = cathandle;
@@ -483,73 +495,70 @@ EXPORT_SYMBOL(llog_cat_close);
 static struct llog_handle *llog_cat_current_log(struct llog_handle *cathandle,
                                                struct thandle *th)
 {
-        struct llog_handle *loghandle = NULL;
-        ENTRY;
+       struct llog_handle *loghandle = NULL;
 
+       ENTRY;
 
        if (CFS_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED2)) {
-               down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
+               loghandle = cathandle->u.chd.chd_current_log;
                GOTO(next, loghandle);
        }
 
-       down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
-        loghandle = cathandle->u.chd.chd_current_log;
-        if (loghandle) {
-               struct llog_log_hdr *llh;
-
-               down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
-               llh = loghandle->lgh_hdr;
-               if (llh == NULL || !llog_is_full(loghandle)) {
-                       up_read(&cathandle->lgh_lock);
-                        RETURN(loghandle);
-                } else {
-                       up_write(&loghandle->lgh_lock);
-                }
-        }
-       up_read(&cathandle->lgh_lock);
-
-       /* time to use next log */
-
-       /* first, we have to make sure the state hasn't changed */
-       down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
+retry:
        loghandle = cathandle->u.chd.chd_current_log;
-       if (loghandle) {
+       if (likely(loghandle)) {
                struct llog_log_hdr *llh;
 
                down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
                llh = loghandle->lgh_hdr;
                if (llh == NULL || !llog_is_full(loghandle))
-                       GOTO(out_unlock, loghandle);
+                       RETURN(loghandle);
                else
                        up_write(&loghandle->lgh_lock);
        }
 
+       /* time to use next log */
 next:
-       /* Sigh, the chd_next_log and chd_current_log is initialized
-        * in declare phase, and we do not serialize the catlog
-        * accessing, so it might be possible the llog creation
-        * thread (see llog_cat_declare_add_rec()) did not create
-        * llog successfully, then the following thread might
-        * meet this situation. */
-       if (IS_ERR_OR_NULL(cathandle->u.chd.chd_next_log)) {
-               CERROR("%s: next log does not exist!\n",
-                      loghandle2name(cathandle));
-               loghandle = ERR_PTR(-EIO);
-               if (cathandle->u.chd.chd_next_log == NULL) {
-                       /* Store the error in chd_next_log, so
-                        * the following process can get correct
-                        * failure value */
-                       cathandle->u.chd.chd_next_log = loghandle;
+       /* first, we have to make sure the state hasn't changed */
+       down_write_nested(&cathandle->lgh_lock, LLOGH_CAT);
+       if (unlikely(loghandle == cathandle->u.chd.chd_current_log)) {
+               struct llog_logid lid = {.lgl_oi.oi.oi_id = 0,
+                                        .lgl_oi.oi.oi_seq = 0,
+                                        .lgl_ogen = 0};
+               /* Sigh, the chd_next_log and chd_current_log is initialized
+                * in declare phase, and we do not serialize the catlog
+                * accessing, so it might be possible the llog creation
+                * thread (see llog_cat_declare_add_rec()) did not create
+                * llog successfully, then the following thread might
+                * meet this situation.
+                */
+               if (IS_ERR_OR_NULL(cathandle->u.chd.chd_next_log)) {
+                       CERROR("%s: next log does not exist, catalog "DFID" rc=%d\n",
+                              loghandle2name(cathandle),
+                              PLOGID(&cathandle->lgh_id), -EIO);
+                       loghandle = ERR_PTR(-EIO);
+                       if (cathandle->u.chd.chd_next_log == NULL) {
+                               /* Store the error in chd_next_log, so
+                                * the following process can get correct
+                                * failure value
+                                */
+                               cathandle->u.chd.chd_next_log = loghandle;
+                       }
+                       GOTO(out_unlock, loghandle);
                }
-               GOTO(out_unlock, loghandle);
-       }
-
-       CDEBUG(D_INODE, "use next log\n");
+               if (!IS_ERR_OR_NULL(loghandle))
+                       lid = loghandle->lgh_id;
 
-       loghandle = cathandle->u.chd.chd_next_log;
-       cathandle->u.chd.chd_current_log = loghandle;
-       cathandle->u.chd.chd_next_log = NULL;
-       down_write_nested(&loghandle->lgh_lock, LLOGH_LOG);
+               CDEBUG(D_OTHER, "%s: use next log "DFID"->"DFID" catalog "DFID"\n",
+                      loghandle2name(cathandle), PLOGID(&lid),
+                      PLOGID(&cathandle->u.chd.chd_next_log->lgh_id),
+                      PLOGID(&cathandle->lgh_id));
+               loghandle = cathandle->u.chd.chd_next_log;
+               cathandle->u.chd.chd_current_log = loghandle;
+               cathandle->u.chd.chd_next_log = NULL;
+       }
+       up_write(&cathandle->lgh_lock);
+       GOTO(retry, loghandle);
 
 out_unlock:
        up_write(&cathandle->lgh_lock);
@@ -583,14 +592,24 @@ retry:
                rc = llog_cat_new_log(env, cathandle, loghandle, th);
                if (rc < 0) {
                        up_write(&loghandle->lgh_lock);
-                       /* nobody should be trying to use this llog */
-                       down_write(&cathandle->lgh_lock);
-                       if (cathandle->u.chd.chd_current_log == loghandle)
-                               cathandle->u.chd.chd_current_log = NULL;
-                       up_write(&cathandle->lgh_lock);
+                       /* When ENOSPC happened no need to drop loghandle
+                        * a new one would be allocated anyway for next llog_add
+                        * so better to stay with the old.
+                        */
+                       if (rc != -ENOSPC) {
+                               /* nobody should be trying to use this llog */
+                               down_write(&cathandle->lgh_lock);
+                               if (cathandle->u.chd.chd_current_log ==
+                                   loghandle)
+                                       cathandle->u.chd.chd_current_log = NULL;
+                               list_del_init(&loghandle->u.phd.phd_entry);
+                               up_write(&cathandle->lgh_lock);
+                               llog_close(env, loghandle);
+                       }
                        RETURN(rc);
                }
        }
+
        /* now let's try to add the record */
        rc = llog_write_rec(env, loghandle, rec, reccookie, LLOG_NEXT_IDX, th);
        if (rc < 0) {
@@ -633,22 +652,28 @@ int llog_cat_declare_add_rec(const struct lu_env *env,
        ENTRY;
 
 start:
-       down_read_nested(&cathandle->lgh_lock, LLOGH_CAT);
+       CDEBUG(D_INFO, "Declare adding to "DOSTID" flags %x count %d\n",
+              POSTID(&cathandle->lgh_id.lgl_oi),
+              cathandle->lgh_hdr->llh_flags, cathandle->lgh_hdr->llh_count);
+
+
        rc = llog_cat_prep_log(env, cathandle,
                               &cathandle->u.chd.chd_current_log, th);
        if (rc)
-               GOTO(unlock, rc);
+               RETURN(rc);
 
+       /* For local llog this would always reserves credits for creation */
        rc = llog_cat_prep_log(env, cathandle, &cathandle->u.chd.chd_next_log,
                               th);
        if (rc)
-               GOTO(unlock, rc);
+               RETURN(rc);
 
        rc = llog_declare_write_rec(env, cathandle->u.chd.chd_current_log,
                                    rec, -1, th);
        if (rc == -ESTALE && dt_object_remote(cathandle->lgh_obj)) {
-               up_read(&cathandle->lgh_lock);
+               down_write(&cathandle->lgh_lock);
                rc = llog_cat_refresh(env, cathandle);
+               up_write(&cathandle->lgh_lock);
                if (rc)
                        RETURN(rc);
                goto start;
@@ -665,8 +690,6 @@ start:
        rc = llog_declare_write_rec(env, cathandle->u.chd.chd_next_log, rec, -1,
                                    th);
 #endif
-unlock:
-       up_read(&cathandle->lgh_lock);
        RETURN(rc);
 }
 EXPORT_SYMBOL(llog_cat_declare_add_rec);
index 9151cb4..158a469 100644 (file)
@@ -572,7 +572,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
 
        if (loghandle->lgh_max_size > 0 &&
            lgi->lgi_off >= loghandle->lgh_max_size) {
-               CDEBUG(D_OTHER, "llog is getting too large (%u > %u) at %u "
+               CDEBUG(D_OTHER, "llog is getting too large (%u >= %u) at %u "
                       DFID"\n", (unsigned)lgi->lgi_off,
                       loghandle->lgh_max_size, (int)loghandle->lgh_last_idx,
                       PLOGID(&loghandle->lgh_id));
@@ -734,6 +734,16 @@ out_unlock:
        if (rc < 0)
                GOTO(out, rc);
 
+       if (loghandle->lgh_max_size > 0 &&
+           lgi->lgi_off >= loghandle->lgh_max_size) {
+               CDEBUG(D_OTHER, "llog is getting too large (%u >= %u) at %u "
+                      DFID"\n", (unsigned int)lgi->lgi_off,
+                      loghandle->lgh_max_size, (int)loghandle->lgh_last_idx,
+                      PLOGID(&loghandle->lgh_id));
+               /* this is to signal that this llog is full */
+               loghandle->lgh_last_idx = llog_max_idx(llh);
+       }
+
        up_write(&loghandle->lgh_last_sem);
 
        CDEBUG(D_HA, "added record "DFID".%u, %u off%llu\n",
@@ -1116,11 +1126,12 @@ static int llog_osd_next_block(const struct lu_env *env,
                               int next_idx, __u64 *cur_offset, void *buf,
                               int len)
 {
-       struct llog_thread_info *lgi = llog_info(env);
-       struct dt_object        *o;
-       struct dt_device        *dt;
-       int                      rc;
-       __u32                   chunk_size;
+       struct llog_thread_info *lgi = llog_info(env);
+       struct dt_object *o;
+       struct dt_device *dt;
+       int rc;
+       __u32 chunk_size;
+       __u32 tail_len;
        int last_idx = *cur_idx;
        __u64 last_offset = *cur_offset;
        bool force_mini_rec = !next_idx;
@@ -1217,15 +1228,28 @@ static int llog_osd_next_block(const struct lu_env *env,
                        lustre_swab_llog_rec(rec);
                tail = (struct llog_rec_tail *)((char *)buf + rc -
                                                sizeof(struct llog_rec_tail));
+               tail_len = tail->lrt_len;
+               /* base on tail_len do swab */
+               if (tail_len > chunk_size) {
+                       __swab32s(&tail_len);
+                       if (tail_len > chunk_size) {
+                               CERROR("%s: invalid llog tail at log id "DFID":%x offset %llu tail idx %u lrt len %u read_size %d\n",
+                                       o->do_lu.lo_dev->ld_obd->obd_name,
+                                       PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
+                                       loghandle->lgh_id.lgl_ogen, *cur_offset,
+                                       tail->lrt_index, tail->lrt_len, rc);
+                               /* tail is broken */
+                               GOTO(out, rc = -EINVAL);
+                       }
+               }
+               /* get the last record in block */
+               last_rec = (struct llog_rec_hdr *)((char *)tail - tail_len +
+                               sizeof(struct llog_rec_tail));
 
                /* caller handles bad records if any */
                if (llog_verify_record(loghandle, rec))
                        GOTO(out, rc = 0);
 
-               /* get the last record in block */
-               last_rec = (struct llog_rec_hdr *)((char *)buf + rc -
-                                                  tail->lrt_len);
-
                if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
                        lustre_swab_llog_rec(last_rec);
 
@@ -1310,110 +1334,14 @@ static int llog_osd_prev_block(const struct lu_env *env,
                               struct llog_handle *loghandle,
                               int prev_idx, void *buf, int len)
 {
-       struct llog_thread_info *lgi = llog_info(env);
-       struct dt_object        *o;
-       struct dt_device        *dt;
-       loff_t                   cur_offset;
-       __u32                   chunk_size;
-       int                      rc;
-
-       ENTRY;
-
-       chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len;
-       if (len == 0 || len & (chunk_size - 1))
-               RETURN(-EINVAL);
-
-       CDEBUG(D_OTHER, "looking for log index %u\n", prev_idx);
-
-       LASSERT(loghandle);
-       LASSERT(loghandle->lgh_ctxt);
-
-       o = loghandle->lgh_obj;
-       LASSERT(o);
-       dt_read_lock(env, o, 0);
-       if (!llog_osd_exist(loghandle))
-               GOTO(out, rc = -ESTALE);
-
-       dt = lu2dt_dev(o->do_lu.lo_dev);
-       LASSERT(dt);
-
-       /* Let's only use mini record size for previous block read
-        * for now XXX */
-       cur_offset = chunk_size;
-       llog_skip_over(loghandle, &cur_offset, 0, prev_idx,
-                      chunk_size, true);
-
-       rc = dt_attr_get(env, o, &lgi->lgi_attr);
-       if (rc)
-               GOTO(out, rc);
-
-       while (cur_offset < lgi->lgi_attr.la_size) {
-               struct llog_rec_hdr     *rec, *last_rec;
-               struct llog_rec_tail    *tail;
-
-               lgi->lgi_buf.lb_len = len;
-               lgi->lgi_buf.lb_buf = buf;
-               rc = dt_read(env, o, &lgi->lgi_buf, &cur_offset);
-               if (rc < 0) {
-                       CERROR("%s: can't read llog block from log "DFID
-                              " offset %llu: rc = %d\n",
-                              o->do_lu.lo_dev->ld_obd->obd_name,
-                              PFID(lu_object_fid(&o->do_lu)), cur_offset, rc);
-                       GOTO(out, rc);
-               }
-
-               if (rc == 0) /* end of file, nothing to do */
-                       GOTO(out, rc);
-
-               if (rc < sizeof(*tail)) {
-                       CERROR("%s: invalid llog block at log id "DFID" offset %llu\n",
-                              o->do_lu.lo_dev->ld_obd->obd_name,
-                              PLOGID(&loghandle->lgh_id), cur_offset);
-                       GOTO(out, rc = -EINVAL);
-               }
-
-               rec = buf;
-               if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
-                       lustre_swab_llog_rec(rec);
-
-               tail = (struct llog_rec_tail *)((char *)buf + rc -
-                                               sizeof(struct llog_rec_tail));
-               /* get the last record in block */
-               last_rec = (struct llog_rec_hdr *)((char *)buf + rc -
-                                                  le32_to_cpu(tail->lrt_len));
-
-               if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
-                       lustre_swab_llog_rec(last_rec);
-               LASSERT(last_rec->lrh_index == tail->lrt_index);
-
-               /* this shouldn't happen */
-               if (tail->lrt_index == 0) {
-                       CERROR("%s: invalid llog tail at log id "DFID" offset %llu\n",
-                              o->do_lu.lo_dev->ld_obd->obd_name,
-                              PLOGID(&loghandle->lgh_id), cur_offset);
-                       GOTO(out, rc = -EINVAL);
-               }
-               if (tail->lrt_index < prev_idx)
-                       continue;
-
-               /* sanity check that the start of the new buffer is no farther
-                * than the record that we wanted.  This shouldn't happen. */
-               if (rec->lrh_index > prev_idx) {
-                       CERROR("%s: missed desired record? %u > %u\n",
-                              o->do_lu.lo_dev->ld_obd->obd_name,
-                              rec->lrh_index, prev_idx);
-                       GOTO(out, rc = -ENOENT);
-               }
+       loff_t cur_offset;
+       int cur_idx;
 
-               /* Trim unsupported extensions for compat w/ older clients */
-               changelog_block_trim_ext(rec, last_rec, loghandle);
+       cur_offset = loghandle->lgh_hdr->llh_hdr.lrh_len;
+       cur_idx = 1;
 
-               GOTO(out, rc = 0);
-       }
-       GOTO(out, rc = -EIO);
-out:
-       dt_read_unlock(env, o);
-       return rc;
+       return llog_osd_next_block(env, loghandle, &cur_idx, prev_idx,
+                                  &cur_offset, buf, len);
 }
 
 /**
@@ -1602,7 +1530,11 @@ after_open:
        RETURN(rc);
 
 out_put:
-       dt_object_put(env, o);
+       if (ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID)
+               /* according to llog_osd_close() */
+               dt_object_put_nocache(env, o);
+       else
+               dt_object_put(env, o);
 out_name:
        OBD_FREE(handle->lgh_name, strlen(name) + 1);
 out:
index 474729f..46f89fa 100644 (file)
@@ -183,17 +183,17 @@ static int llog_client_next_block(const struct lu_env *env,
        ptlrpc_request_set_replen(req);
        rc = ptlrpc_queue_wait(req);
        /*
-        * -EIO has a special meaning here. If llog_osd_next_block()
+        * -EBADR has a special meaning here. If llog_osd_next_block()
         * reaches the end of the log without finding the desired
         * record then it updates *cur_offset and *cur_idx and returns
-        * -EIO. In llog_process_thread() we use this to detect
-        * EOF. But we must be careful to distinguish between -EIO
-        * coming from llog_osd_next_block() and -EIO coming from
+        * -EBADR. In llog_process_thread() we use this to detect
+        * EOF. But we must be careful to distinguish between -EBADR
+        * coming from llog_osd_next_block() and -EBADR coming from
         * ptlrpc or below.
         */
-       if (rc == -EIO) {
+       if (rc == -EBADR) {
                if (!req->rq_repmsg ||
-                   lustre_msg_get_status(req->rq_repmsg) != -EIO)
+                   lustre_msg_get_status(req->rq_repmsg) != -EBADR)
                        GOTO(out, rc);
        } else if (rc < 0) {
                GOTO(out, rc);