Whamcloud - gitweb
LU-14098 obdclass: try to skip corrupted llog records 96/44396/2
authorAlex Zhuravlev <bzzz@whamcloud.com>
Mon, 26 Jul 2021 06:18:06 +0000 (09:18 +0300)
committerOleg Drokin <green@whamcloud.com>
Mon, 13 Sep 2021 19:06:28 +0000 (19:06 +0000)
If an llog's header or record is found to be corrupted, ignore the
remaining records in that block and try again with the next block.

Lustre-commit: 910eb97c1b43a44a9da2ae14c3b83e28ca6342fc
Lustre-change: https://review.whamcloud.com/40754

Signed-off-by: Alex Zhuravlev <bzzz@whamcloud.com>
Change-Id: I86a682a8874a2184e8891ff0ee8a68414d232a79
Reviewed-on: https://review.whamcloud.com/44396
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Mike Pershin <mpershin@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/obdclass/llog.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_internal.h
lustre/obdclass/llog_osd.c
lustre/obdclass/llog_test.c
lustre/osp/osp_sync.c

index 3bcc641..91bb142 100644 (file)
@@ -2933,6 +2933,7 @@ enum llog_flag {
        LLOG_F_EXT_X_NID        = 0x80,
        LLOG_F_EXT_X_OMODE      = 0x100,
        LLOG_F_EXT_X_XATTR      = 0x200,
+       LLOG_F_RM_ON_ERR        = 0x400,
 
        /* Note: Flags covered by LLOG_F_EXT_MASK will be inherited from
         * catlog to plain log, so do not add LLOG_F_IS_FIXSIZE here,
index 486c3a1..e9228b3 100644 (file)
@@ -50,6 +50,7 @@
 #include <obd_support.h>
 #include <obd_class.h>
 #include "llog_internal.h"
+
 /*
  * Allocate a new log or catalog handle
  * Used inside llog_open().
@@ -303,7 +304,7 @@ int llog_cancel_rec(const struct lu_env *env, struct llog_handle *loghandle,
                         * be accessed anymore, let's return 0 for now, and
                         * the orphan will be handled by LFSCK. */
                        CERROR("%s: can't destroy empty llog "DFID": rc = %d\n",
-                              loghandle->lgh_ctxt->loc_obd->obd_name,
+                              loghandle2name(loghandle),
                               PFID(&loghandle->lgh_id.lgl_oi.oi_fid), rc);
                        GOTO(out_unlock, rc = 0);
                }
@@ -398,7 +399,7 @@ int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
                             (llh->llh_flags & LLOG_F_IS_CAT &&
                              flags & LLOG_F_IS_PLAIN))) {
                        CERROR("%s: llog type is %s but initializing %s\n",
-                              handle->lgh_ctxt->loc_obd->obd_name,
+                              loghandle2name(handle),
                               llh->llh_flags & LLOG_F_IS_CAT ?
                               "catalog" : "plain",
                               flags & LLOG_F_IS_CAT ? "catalog" : "plain");
@@ -418,7 +419,7 @@ int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
                if (unlikely(uuid &&
                             !obd_uuid_equals(uuid, &llh->llh_tgtuuid))) {
                        CERROR("%s: llog uuid mismatch: %s/%s\n",
-                              handle->lgh_ctxt->loc_obd->obd_name,
+                              loghandle2name(handle),
                               (char *)uuid->uuid,
                               (char *)llh->llh_tgtuuid.uuid);
                        GOTO(out, rc = -EEXIST);
@@ -431,8 +432,8 @@ int llog_init_handle(const struct lu_env *env, struct llog_handle *handle,
                llh->llh_flags |= LLOG_F_IS_FIXSIZE;
        } else if (!(flags & LLOG_F_IS_PLAIN)) {
                CERROR("%s: unknown flags: %#x (expected %#x or %#x)\n",
-                      handle->lgh_ctxt->loc_obd->obd_name,
-                      flags, LLOG_F_IS_CAT, LLOG_F_IS_PLAIN);
+                      loghandle2name(handle), flags, LLOG_F_IS_CAT,
+                      LLOG_F_IS_PLAIN);
                rc = -EINVAL;
        }
        llh->llh_flags |= fmt;
@@ -445,6 +446,30 @@ out:
 }
 EXPORT_SYMBOL(llog_init_handle);
 
+int llog_verify_record(const struct llog_handle *llh, struct llog_rec_hdr *rec)
+{
+       int chunk_size = llh->lgh_hdr->llh_hdr.lrh_len;
+
+       if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) {
+               CERROR("%s: record is too large: %d > %d\n",
+                      loghandle2name(llh), rec->lrh_len, chunk_size);
+               return -EINVAL;
+       }
+       if (rec->lrh_index >= LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr)) {
+               CERROR("%s: index is too high: %d\n",
+                      loghandle2name(llh), rec->lrh_index);
+               return -EINVAL;
+       }
+       if ((rec->lrh_type & LLOG_OP_MASK) != LLOG_OP_MAGIC) {
+               CERROR("%s: magic %x is bad\n",
+                      loghandle2name(llh), rec->lrh_type);
+               return -EINVAL;
+       }
+
+       return 0;
+}
+EXPORT_SYMBOL(llog_verify_record);
+
 static int llog_process_thread(void *arg)
 {
        struct llog_process_info        *lpi = arg;
@@ -459,6 +484,7 @@ static int llog_process_thread(void *arg)
        int                              saved_index = 0;
        int                              last_called_index = 0;
        bool                             repeated = false;
+       bool                            refresh_idx = false;
 
        ENTRY;
 
@@ -606,15 +632,21 @@ repeat:
 
                        repeated = false;
 
-                       if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) {
-                               CWARN("%s: invalid length %d in llog "DFID
-                                     "record for index %d/%d\n",
-                                      loghandle->lgh_ctxt->loc_obd->obd_name,
-                                      rec->lrh_len,
+                       rc = llog_verify_record(loghandle, rec);
+                       if (rc) {
+                               CERROR("%s: invalid record in llog "DFID
+                                      " record for index %d/%d: rc = %d\n",
+                                      loghandle2name(loghandle),
                                       PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
-                                      rec->lrh_index, index);
-
-                               GOTO(out, rc = -EINVAL);
+                                      rec->lrh_index, index, rc);
+                               /*
+                                * the block seem to be corrupted, let's try
+                                * with the next one. reset rc to go to the
+                                * next chunk.
+                                */
+                               refresh_idx = true;
+                               index = 0;
+                               GOTO(repeat, rc = 0);
                        }
 
                        if (rec->lrh_index < index) {
@@ -624,12 +656,22 @@ repeat:
                        }
 
                        if (rec->lrh_index != index) {
-                               CERROR("%s: "DFID" Invalid record: index %u"
-                                      " but expected %u\n",
-                                      loghandle->lgh_ctxt->loc_obd->obd_name,
-                                      PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
-                                      rec->lrh_index, index);
-                               GOTO(out, rc = -ERANGE);
+                               /*
+                                * the last time we couldn't parse the block due
+                                * to corruption, thus has no idea about the
+                                * next index, take it from the block, once.
+                                */
+                               if (refresh_idx) {
+                                       refresh_idx = false;
+                                       index = rec->lrh_index;
+                               } else {
+                                       CERROR("%s: "DFID" Invalid record: index"
+                                              " %u but expected %u\n",
+                                              loghandle2name(loghandle),
+                                              PFID(&loghandle->lgh_id.lgl_oi.oi_fid),
+                                              rec->lrh_index, index);
+                                       GOTO(out, rc = -ERANGE);
+                               }
                        }
 
                        CDEBUG(D_OTHER,
@@ -715,7 +757,7 @@ out:
                         * retry until the umount or abort recovery, see
                         * lod_sub_recovery_thread() */
                        CERROR("%s retry remote llog process\n",
-                              loghandle->lgh_ctxt->loc_obd->obd_name);
+                              loghandle2name(loghandle));
                        rc = -EAGAIN;
                } else {
                        /* something bad happened to the processing of a local
@@ -724,7 +766,7 @@ out:
                         * discard any remaining bits in the header */
                        CERROR("%s: Local llog found corrupted #"DOSTID":%x"
                               " %s index %d count %d\n",
-                              loghandle->lgh_ctxt->loc_obd->obd_name,
+                              loghandle2name(loghandle),
                               POSTID(&loghandle->lgh_id.lgl_oi),
                               loghandle->lgh_id.lgl_ogen,
                               ((llh->llh_flags & LLOG_F_IS_CAT) ? "catalog" :
@@ -820,7 +862,7 @@ int llog_process_or_fork(const struct lu_env *env,
                if (IS_ERR(task)) {
                        rc = PTR_ERR(task);
                        CERROR("%s: cannot start thread: rc = %d\n",
-                              loghandle->lgh_ctxt->loc_obd->obd_name, rc);
+                              loghandle2name(loghandle), rc);
                        GOTO(out_lpi, rc);
                }
                wait_for_completion(&lpi->lpi_completion);
@@ -1057,12 +1099,11 @@ int llog_write_rec(const struct lu_env *env, struct llog_handle *handle,
                RETURN(-EPROTO);
        } else if (th == NULL) {
                CERROR("%s: missed transaction handle\n",
-                       handle->lgh_obj->do_lu.lo_dev->ld_obd->obd_name);
+                      loghandle2name(handle));
                RETURN(-EPROTO);
        } else if (handle->lgh_hdr == NULL) {
                CERROR("%s: loghandle %p with no header\n",
-                       handle->lgh_obj->do_lu.lo_dev->ld_obd->obd_name,
-                       handle);
+                      loghandle2name(handle), handle);
                RETURN(-EPROTO);
        }
 
@@ -1434,8 +1475,8 @@ __u64 llog_size(const struct lu_env *env, struct llog_handle *llh)
        rc = llh->lgh_obj->do_ops->do_attr_get(env, llh->lgh_obj, &la);
        if (rc) {
                CERROR("%s: attr_get failed for "DFID": rc = %d\n",
-                      llh->lgh_ctxt->loc_obd->obd_name,
-                      PFID(&llh->lgh_id.lgl_oi.oi_fid), rc);
+                      loghandle2name(llh), PFID(&llh->lgh_id.lgl_oi.oi_fid),
+                      rc);
                return 0;
        }
 
index 1a9ae15..91f0290 100644 (file)
@@ -88,13 +88,12 @@ static int llog_cat_new_log(const struct lu_env *env,
                if (cathandle->lgh_name == NULL) {
                        CWARN("%s: there are no more free slots in catalog "
                              DFID":%x\n",
-                             loghandle->lgh_ctxt->loc_obd->obd_name,
+                             loghandle2name(loghandle),
                              PFID(&cathandle->lgh_id.lgl_oi.oi_fid),
                              cathandle->lgh_id.lgl_ogen);
                } else {
                        CWARN("%s: there are no more free slots in "
-                             "catalog %s\n",
-                             loghandle->lgh_ctxt->loc_obd->obd_name,
+                             "catalog %s\n", loghandle2name(loghandle),
                              cathandle->lgh_name);
                }
                RETURN(-ENOSPC);
@@ -153,7 +152,7 @@ static int llog_cat_new_log(const struct lu_env *env,
                GOTO(out, rc = 0);
        } else if (rc != 0) {
                CERROR("%s: can't create new plain llog in catalog: rc = %d\n",
-                      loghandle->lgh_ctxt->loc_obd->obd_name, rc);
+                      loghandle2name(loghandle), rc);
                GOTO(out, rc);
        }
 
@@ -375,7 +374,7 @@ int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
                    ostid_seq(&cgl->lgl_oi) == ostid_seq(&logid->lgl_oi)) {
                        if (cgl->lgl_ogen != logid->lgl_ogen) {
                                CWARN("%s: log "DFID" generation %x != %x\n",
-                                     loghandle->lgh_ctxt->loc_obd->obd_name,
+                                     loghandle2name(loghandle),
                                      PFID(&logid->lgl_oi.oi_fid),
                                      cgl->lgl_ogen, logid->lgl_ogen);
                                continue;
@@ -383,7 +382,7 @@ int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
                        *res = llog_handle_get(loghandle);
                        if (!*res) {
                                CERROR("%s: log "DFID" refcount is zero!\n",
-                                      loghandle->lgh_ctxt->loc_obd->obd_name,
+                                      loghandle2name(loghandle),
                                       PFID(&logid->lgl_oi.oi_fid));
                                continue;
                        }
@@ -398,8 +397,8 @@ int llog_cat_id2handle(const struct lu_env *env, struct llog_handle *cathandle,
                       LLOG_OPEN_EXISTS);
        if (rc < 0) {
                CERROR("%s: error opening log id "DFID":%x: rc = %d\n",
-                      cathandle->lgh_ctxt->loc_obd->obd_name,
-                      PFID(&logid->lgl_oi.oi_fid), logid->lgl_ogen, rc);
+                      loghandle2name(cathandle), PFID(&logid->lgl_oi.oi_fid),
+                      logid->lgl_ogen, rc);
                RETURN(rc);
        }
 
@@ -445,8 +444,7 @@ int llog_cat_close(const struct lu_env *env, struct llog_handle *cathandle)
                        if (rc)
                                CERROR("%s: failure destroying log during "
                                       "cleanup: rc = %d\n",
-                                      loghandle->lgh_ctxt->loc_obd->obd_name,
-                                      rc);
+                                      loghandle2name(loghandle), rc);
 
                        index = loghandle->u.phd.phd_cookie.lgc_index;
                        llog_cat_cleanup(env, cathandle, NULL, index);
@@ -532,7 +530,7 @@ next:
         * meet this situation. */
        if (IS_ERR_OR_NULL(cathandle->u.chd.chd_next_log)) {
                CERROR("%s: next log does not exist!\n",
-                      cathandle->lgh_ctxt->loc_obd->obd_name);
+                      loghandle2name(cathandle));
                loghandle = ERR_PTR(-EIO);
                if (cathandle->u.chd.chd_next_log == NULL) {
                        /* Store the error in chd_next_log, so
@@ -609,7 +607,7 @@ retry:
                if (retried++ == 0)
                        GOTO(retry, rc);
                CERROR("%s: error on 2nd llog: rc = %d\n",
-                      cathandle->lgh_ctxt->loc_obd->obd_name, rc);
+                      loghandle2name(cathandle), rc);
        }
 
        RETURN(rc);
@@ -719,8 +717,7 @@ int llog_cat_cancel_records(const struct lu_env *env,
                rc = llog_cat_id2handle(env, cathandle, &loghandle, lgl);
                if (rc) {
                        CDEBUG(D_HA, "%s: cannot find llog for handle "DFID":%x"
-                              ": rc = %d\n",
-                              cathandle->lgh_ctxt->loc_obd->obd_name,
+                              ": rc = %d\n", loghandle2name(cathandle),
                               PFID(&lgl->lgl_oi.oi_fid), lgl->lgl_ogen, rc);
                        failed++;
                        continue;
@@ -735,8 +732,7 @@ int llog_cat_cancel_records(const struct lu_env *env,
                         */
                        lrc = -ENOENT;
                        CDEBUG(D_HA, "%s: llog "DFID":%x does not exist"
-                              ": rc = %d\n",
-                              cathandle->lgh_ctxt->loc_obd->obd_name,
+                              ": rc = %d\n", loghandle2name(cathandle),
                               PFID(&lgl->lgl_oi.oi_fid), lgl->lgl_ogen, lrc);
                        failed++;
                        if (rc == 0)
@@ -763,8 +759,7 @@ int llog_cat_cancel_records(const struct lu_env *env,
        }
        if (rc)
                CERROR("%s: fail to cancel %d of %d llog-records: rc = %d\n",
-                      cathandle->lgh_ctxt->loc_obd->obd_name, failed, count,
-                      rc);
+                      loghandle2name(cathandle), failed, count, rc);
 
        RETURN(rc);
 }
@@ -783,7 +778,7 @@ static int llog_cat_process_common(const struct lu_env *env,
        if (rec->lrh_type != le32_to_cpu(LLOG_LOGID_MAGIC)) {
                rc = -EINVAL;
                CWARN("%s: invalid record in catalog "DFID":%x: rc = %d\n",
-                     cat_llh->lgh_ctxt->loc_obd->obd_name,
+                     loghandle2name(cat_llh),
                      PFID(&cat_llh->lgh_id.lgl_oi.oi_fid),
                      cat_llh->lgh_id.lgl_ogen, rc);
                RETURN(rc);
@@ -803,7 +798,7 @@ static int llog_cat_process_common(const struct lu_env *env,
                        rc = LLOG_DEL_RECORD;
                else if (rc)
                        CWARN("%s: can't find llog handle "DFID":%x: rc = %d\n",
-                             cat_llh->lgh_ctxt->loc_obd->obd_name,
+                             loghandle2name(cat_llh),
                              PFID(&lir->lid_id.lgl_oi.oi_fid),
                              lir->lid_id.lgl_ogen, rc);
 
@@ -819,7 +814,7 @@ static int llog_cat_process_common(const struct lu_env *env,
                rc = llog_destroy(env, *llhp);
                if (rc)
                        CWARN("%s: can't destroy empty log "DFID": rc = %d\n",
-                             (*llhp)->lgh_ctxt->loc_obd->obd_name,
+                             loghandle2name((*llhp)),
                              PFID(&lir->lid_id.lgl_oi.oi_fid), rc);
                rc = LLOG_DEL_PLAIN;
        }
@@ -856,6 +851,16 @@ static int llog_cat_process_cb(const struct lu_env *env,
                rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
                                          NULL, false);
        }
+       if (rc == -ENOENT && (cat_llh->lgh_hdr->llh_flags & LLOG_F_RM_ON_ERR)) {
+               /*
+                * plain llog is reported corrupted, so better to just remove
+                * it if the caller is fine with that.
+                */
+               CERROR("%s: remove corrupted/missing llog "DFID"\n",
+                      loghandle2name(cat_llh),
+                      PFID(&llh->lgh_id.lgl_oi.oi_fid));
+               rc = LLOG_DEL_PLAIN;
+       }
 
 out:
        /* The empty plain log was destroyed while processing */
@@ -895,7 +900,7 @@ int llog_cat_process_or_fork(const struct lu_env *env,
                struct llog_process_cat_data cd;
 
                CWARN("%s: catlog "DFID" crosses index zero\n",
-                     cat_llh->lgh_ctxt->loc_obd->obd_name,
+                     loghandle2name(cat_llh),
                      PFID(&cat_llh->lgh_id.lgl_oi.oi_fid));
                /*startcat = 0 is default value for general processing */
                if ((startcat != LLOG_CAT_FIRST &&
@@ -1060,7 +1065,7 @@ int llog_cat_reverse_process(const struct lu_env *env,
        if (llh->llh_cat_idx >= cat_llh->lgh_last_idx &&
            llh->llh_count > 1) {
                CWARN("%s: catalog "DFID" crosses index zero\n",
-                     cat_llh->lgh_ctxt->loc_obd->obd_name,
+                     loghandle2name(cat_llh),
                      PFID(&cat_llh->lgh_id.lgl_oi.oi_fid));
 
                cd.lpcd_first_idx = 0;
index 2642529..c42f13e 100644 (file)
@@ -92,4 +92,9 @@ static inline struct llog_rec_hdr *llog_rec_hdr_next(struct llog_rec_hdr *rec)
 {
        return (struct llog_rec_hdr *)((char *)rec + rec->lrh_len);
 }
+int llog_verify_record(const struct llog_handle *llh, struct llog_rec_hdr *rec);
+static inline char *loghandle2name(const struct llog_handle *lgh)
+{
+       return lgh->lgh_ctxt->loc_obd->obd_name;
+}
 #endif
index 54f3dd4..55088d4 100644 (file)
@@ -966,9 +966,25 @@ static int llog_osd_next_block(const struct lu_env *env,
                rec = buf;
                if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
                        lustre_swab_llog_rec(rec);
-
                tail = (struct llog_rec_tail *)((char *)buf + rc -
                                                sizeof(struct llog_rec_tail));
+
+               if (llog_verify_record(loghandle, rec)) {
+                       /*
+                        * the block seems corrupted. make a pad record so the
+                        * caller can skip the block and try with the next one
+                        */
+                       rec->lrh_len = rc;
+                       rec->lrh_index = next_idx;
+                       rec->lrh_type = LLOG_PAD_MAGIC;
+
+                       tail = rec_tail(rec);
+                       tail->lrt_len = rc;
+                       tail->lrt_index = next_idx;
+
+                       GOTO(out, rc = 0);
+               }
+
                /* get the last record in block */
                last_rec = (struct llog_rec_hdr *)((char *)buf + rc -
                                                   tail->lrt_len);
@@ -1007,7 +1023,7 @@ static int llog_osd_next_block(const struct lu_env *env,
 
                /* sanity check that the start of the new buffer is no farther
                 * than the record that we wanted.  This shouldn't happen. */
-               if (rec->lrh_index > next_idx) {
+               if (next_idx && rec->lrh_index > next_idx) {
                        if (!force_mini_rec && next_idx > last_idx)
                                goto retry;
 
index 856e8d0..f1517ce 100644 (file)
@@ -192,7 +192,7 @@ static int llog_test_2(const struct lu_env *env, struct obd_device *obd,
        logid = lgh->lgh_id;
 
        lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
-       lmr.lmr_hdr.lrh_type = 0xf02f02;
+       lmr.lmr_hdr.lrh_type = LLOG_OP_MAGIC;
 
        /* Check llog header values are correct after record add/cancel */
        CWARN("2b: write 1 llog records, check llh_count\n");
@@ -591,7 +591,7 @@ static int llog_test_4(const struct lu_env *env, struct obd_device *obd)
        LASSERT(ctxt);
 
        lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
-       lmr.lmr_hdr.lrh_type = 0xf00f00;
+       lmr.lmr_hdr.lrh_type = LLOG_OP_MAGIC;
 
        sprintf(name, "%x", llog_test_rand + 1);
        CWARN("4a: create a catalog log with name: %s\n", name);
@@ -780,7 +780,7 @@ static int llog_test_5(const struct lu_env *env, struct obd_device *obd)
        LASSERT(ctxt);
 
        lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
-       lmr.lmr_hdr.lrh_type = 0xf00f00;
+       lmr.lmr_hdr.lrh_type = LLOG_OP_MAGIC;
 
        CWARN("5a: re-open catalog by id\n");
        rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
@@ -1202,7 +1202,7 @@ static int llog_test_8(const struct lu_env *env, struct obd_device *obd)
        LASSERT(ctxt);
 
        lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
-       lmr.lmr_hdr.lrh_type = 0xf00f00;
+       lmr.lmr_hdr.lrh_type = LLOG_OP_MAGIC;
 
        CWARN("8a: fill the first plain llog\n");
        rc = llog_open(env, ctxt, &llh, &cat_logid, NULL, LLOG_OPEN_EXISTS);
@@ -1503,7 +1503,7 @@ static int llog_test_10(const struct lu_env *env, struct obd_device *obd)
        LASSERT(ctxt);
 
        lmr.lmr_hdr.lrh_len = lmr.lmr_tail.lrt_len = LLOG_MIN_REC_SIZE;
-       lmr.lmr_hdr.lrh_type = 0xf00f00;
+       lmr.lmr_hdr.lrh_type = LLOG_OP_MAGIC;
 
        snprintf(name, sizeof(name), "%x", llog_test_rand + 2);
        CWARN("10a: create a catalog log with name: %s\n", name);
index b18317a..83bb58d 100644 (file)
@@ -1270,7 +1270,7 @@ next:
 
                CERROR("%s: llog process with osp_sync_process_queues "
                       "failed: %d\n", d->opd_obd->obd_name, rc);
-               GOTO(close, rc);
+               GOTO(wait, rc);
        }
        LASSERTF(rc == 0 || rc == LLOG_PROC_BREAK,
                 "%u changes, %u in progress, %u in flight: %d\n",
@@ -1285,6 +1285,7 @@ next:
                 atomic_read(&d->opd_sync_rpcs_in_progress),
                 atomic_read(&d->opd_sync_rpcs_in_flight));
 
+wait:
        /* wait till all the requests are completed */
        count = 0;
        while (atomic_read(&d->opd_sync_rpcs_in_progress) > 0) {
@@ -1304,7 +1305,6 @@ next:
 
        }
 
-close:
        llog_cat_close(&env, llh);
        rc = llog_cleanup(&env, ctxt);
        if (rc)
@@ -1428,7 +1428,7 @@ static int osp_sync_llog_init(const struct lu_env *env, struct osp_device *d)
        LASSERT(lgh != NULL);
        ctxt->loc_handle = lgh;
 
-       rc = llog_init_handle(env, lgh, LLOG_F_IS_CAT, NULL);
+       rc = llog_init_handle(env, lgh, LLOG_F_IS_CAT | LLOG_F_RM_ON_ERR, NULL);
        if (rc)
                GOTO(out_close, rc);