Whamcloud - gitweb
LU-18218 mdd: changelog specific write function 42/56342/23
authorAlexander Boyko <alexander.boyko@hpe.com>
Fri, 30 Aug 2024 17:12:09 +0000 (13:12 -0400)
committerOleg Drokin <green@whamcloud.com>
Wed, 7 May 2025 21:10:54 +0000 (21:10 +0000)
Don't use the general llog_osd_write_rec() for a changelog:
it has too much synchronisation for consistency, and additional
complexity for remote writes. As a result, enabling the changelog
slows down MDT IO performance by 3x. The main idea of this patch is
to allow parallel writers to a local file; any resulting inconsistency
is handled by the reader.

The lgh_hdr_mutex protects only in-memory modifications, so there is no
need to hold it across dt_record_write(), since write/cancel are
protected by the lgh_lock semaphore. Let's change lgh_hdr_mutex to a
spinlock.

Performance results for mdtest (mean values on a cluster):
Changelog:               no   basic   patch  diff %
Directory creation    90152   32800   63827      94
Directory removal     96900   32928   61220      85
File creation        116109   52318   82698      58
File removal         119260   42845   80557      88
(diff % is the gain of "patch" over "basic")

HPE-bug-id: LUS-11970
Signed-off-by: Alexander Boyko <alexander.boyko@hpe.com>
Change-Id: Icbe26e1198630dc72f4dfd9fca59718076579245
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/56342
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Alexey Lyashkov <alexey.lyashkov@hpe.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lustre_log.h
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/mdd/mdd_device.c
lustre/mdd/mdd_dir.c
lustre/obdclass/llog.c
lustre/obdclass/llog_cat.c
lustre/obdclass/llog_osd.c
lustre/osd-zfs/osd_io.c
lustre/ptlrpc/llog_server.c
lustre/tests/conf-sanity.sh
lustre/tests/test-framework.sh

index 392564b..e692249 100644 (file)
@@ -269,7 +269,7 @@ struct llog_operations {
 /* In-memory descriptor for a log object or log catalog */
 struct llog_handle {
        struct rw_semaphore      lgh_lock;
-       struct mutex             lgh_hdr_mutex; /* protect lgh_hdr data */
+       spinlock_t               lgh_hdr_lock; /* protect lgh_hdr data */
        struct llog_logid        lgh_id; /* id of this log */
        struct llog_log_hdr     *lgh_hdr; /* may be vmalloc'd */
        size_t                  lgh_hdr_size;
index c1f98e9..1636271 100644 (file)
@@ -2893,6 +2893,10 @@ struct llog_rec_tail {
        (rec->lrh_len - sizeof(struct llog_rec_hdr) -           \
         sizeof(struct llog_rec_tail))
 
+#define REC_TAIL(rec)                                          \
+       ((struct llog_rec_tail *)((char *)rec + rec->lrh_len -  \
+                       sizeof(struct llog_rec_tail)))
+
 struct llog_logid_rec {
        struct llog_rec_hdr     lid_hdr;
        struct llog_logid       lid_id;
@@ -3086,6 +3090,7 @@ enum llog_flag {
        LLOG_F_RM_ON_ERR        = 0x400,
        LLOG_F_MAX_AGE          = 0x800,
        LLOG_F_EXT_X_NID_BE     = 0x1000,
+       LLOG_F_UNLCK_SEM        = 0x2000,
 
        /* Note: Flags covered by LLOG_F_EXT_MASK will be inherited from
         * catlog to plain log, so do not add LLOG_F_IS_FIXSIZE here,
@@ -3095,7 +3100,7 @@ enum llog_flag {
        LLOG_F_EXT_MASK = LLOG_F_EXT_JOBID | LLOG_F_EXT_EXTRA_FLAGS |
                          LLOG_F_EXT_X_UIDGID | LLOG_F_EXT_X_NID |
                          LLOG_F_EXT_X_OMODE | LLOG_F_EXT_X_XATTR |
-                         LLOG_F_EXT_X_NID_BE,
+                         LLOG_F_EXT_X_NID_BE | LLOG_F_UNLCK_SEM,
 };
 
 /* On-disk header structure of each log object, stored in little endian order */
index 1498371..cb4ce24 100644 (file)
@@ -531,7 +531,8 @@ static int mdd_changelog_llog_init(const struct lu_env *env,
        if (rc)
                GOTO(out_cleanup, rc);
 
-       rc = llog_init_handle(env, ctxt->loc_handle, LLOG_F_IS_CAT, NULL);
+       rc = llog_init_handle(env, ctxt->loc_handle, LLOG_F_IS_CAT |
+                             LLOG_F_UNLCK_SEM, NULL);
        if (rc)
                GOTO(out_close, rc);
 
index dc4fc61..694731b 100644 (file)
@@ -775,6 +775,17 @@ out_put:
        return rc;
 }
 
+/* The locking here is a bit tricky. For a CHANGELOG_REC the function
+ * drops loghandle->lgh_lock for performance reasons. Each dt_write()
+ * uses its own offset, so it is safe.
+ * For other records the general function is called and it does not drop
+ * the semaphore. Callers are changelog catalog records and initialisation
+ * records: llog_cat_new_log->llog_write_rec->mdd_changelog_write_rec()
+ *
+ * Since dt_record_write() calls could be reordered, rec1|rec2|0x0|rec4
+ * could be in memory, and a reader must handle it. When the th is committed
+ * it is impossible to have a hole, since reordered records share the th.
+ */
 int mdd_changelog_write_rec(const struct lu_env *env,
                            struct llog_handle *loghandle,
                            struct llog_rec_hdr *r,
@@ -782,36 +793,106 @@ int mdd_changelog_write_rec(const struct lu_env *env,
                            int idx, struct thandle *th)
 {
        int rc;
+       static struct thandle *saved_th;
+
+       CDEBUG(D_TRACE, "Adding rec %u type %u to "DFID" flags %x count %d\n",
+              idx, r->lrh_type, PLOGID(&loghandle->lgh_id),
+              loghandle->lgh_hdr->llh_flags, loghandle->lgh_hdr->llh_count);
 
        if (r->lrh_type == CHANGELOG_REC) {
                struct mdd_device *mdd;
                struct llog_changelog_rec *rec;
+               size_t left;
+               __u32 chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len;
+               struct dt_object *o = loghandle->lgh_obj;
+               loff_t offset;
+               struct lu_buf lgi_buf;
+
+               left = chunk_size - (loghandle->lgh_cur_offset &
+                                    (chunk_size - 1));
 
                mdd = lu2mdd_dev(loghandle->lgh_ctxt->loc_obd->obd_lu_dev);
                rec = container_of(r, struct llog_changelog_rec, cr_hdr);
 
+               /* Don't use padding records because they require a slot in
+                * the header, so the previous result of checking
+                * llog_is_full(loghandle) would become invalid; leave zeroes
+                * at the end of the block instead.
+                * A reader must handle them.
+                */
+               if (left != 0 && left < r->lrh_len)
+                       loghandle->lgh_cur_offset += left;
+
+               offset = loghandle->lgh_cur_offset;
+               loghandle->lgh_cur_offset += r->lrh_len;
+               r->lrh_index = ++loghandle->lgh_last_idx;
+
                spin_lock(&mdd->mdd_cl.mc_lock);
-               rec->cr.cr_index = mdd->mdd_cl.mc_index + 1;
+               rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
                spin_unlock(&mdd->mdd_cl.mc_lock);
 
-               rc = llog_osd_ops.lop_write_rec(env, loghandle, r,
-                                               cookie, idx, th);
+               /* drop the loghandle semaphore for parallel writes */
+               up_write(&loghandle->lgh_lock);
 
-               /*
-                * if current llog is full, we will generate a new
-                * llog, and since it's actually not an error, let's
-                * avoid increasing index so that userspace apps
-                * should not see a gap in the changelog sequence
+               REC_TAIL(r)->lrt_len = r->lrh_len;
+               REC_TAIL(r)->lrt_index = r->lrh_index;
+
+               lgi_buf.lb_len = rec->cr_hdr.lrh_len;
+               lgi_buf.lb_buf = rec;
+
+               rc = dt_record_write(env, o, &lgi_buf, &offset, th);
+
+               if (rc) {
+                       CERROR("%s: failed to write changelog record file "DFID" rec idx %u off %llu chnlg idx %llu: rc = %d\n",
+                              loghandle->lgh_ctxt->loc_obd->obd_name,
+                              PFID(lu_object_fid(&o->do_lu)), r->lrh_index,
+                              offset, rec->cr.cr_index, rc);
+                       return rc;
+               }
+
+               /* mark index at bitmap after successful write, increment count,
+                * and lrt_index with a last index. Use a lgh_hdr_lock for
+                * a synchronization with llog_cancel.
                 */
-               if (!(rc == -ENOSPC && llog_is_full(loghandle))) {
-                       spin_lock(&mdd->mdd_cl.mc_lock);
-                       ++mdd->mdd_cl.mc_index;
-                       spin_unlock(&mdd->mdd_cl.mc_lock);
+               spin_lock(&loghandle->lgh_hdr_lock);
+               rc = __test_and_set_bit_le(r->lrh_index,
+                                          LLOG_HDR_BITMAP(loghandle->lgh_hdr));
+               LASSERTF(!rc,
+                        "%s: index %u already set in llog bitmap "DFID"\n",
+                        loghandle->lgh_ctxt->loc_obd->obd_name,
+                        r->lrh_index, PLOGID(&loghandle->lgh_id));
+               loghandle->lgh_hdr->llh_count++;
+               if (LLOG_HDR_TAIL(loghandle->lgh_hdr)->lrt_index < r->lrh_index)
+                       LLOG_HDR_TAIL(loghandle->lgh_hdr)->lrt_index =
+                               r->lrh_index;
+               spin_unlock(&loghandle->lgh_hdr_lock);
+
+               if (unlikely(th != saved_th)) {
+                       CDEBUG(D_OTHER, "%s: wrote rec %u "DFID" count %d\n",
+                              loghandle->lgh_ctxt->loc_obd->obd_name,
+                              r->lrh_index, PLOGID(&loghandle->lgh_id),
+                              loghandle->lgh_hdr->llh_count);
+                       saved_th = th;
                }
+               lgi_buf.lb_len = loghandle->lgh_hdr_size;
+               lgi_buf.lb_buf = loghandle->lgh_hdr;
+               offset = 0;
+               CDEBUG(D_TRACE, "%s: writing header "DFID"\n",
+                      loghandle->lgh_ctxt->loc_obd->obd_name,
+                      PLOGID(&loghandle->lgh_id));
+               /* full header write, it is a local. For a mapped bh
+                * it is memcpy() only. Probably it could be delayed as work.
+                */
+               rc = dt_record_write(env, o, &lgi_buf, &offset, th);
        } else {
                rc = llog_osd_ops.lop_write_rec(env, loghandle, r,
                                                cookie, idx, th);
        }
+       if (rc < 0)
+               CERROR("%s: failed to write changelog record file "DFID" count %d offset %llu: rc = %d\n",
+                      loghandle->lgh_ctxt->loc_obd->obd_name,
+                      PLOGID(&loghandle->lgh_id),
+                      loghandle->lgh_hdr->llh_count, loghandle->lgh_cur_offset,
+                      rc);
 
        return rc;
 }
index 556b4c2..b839e91 100644 (file)
@@ -44,7 +44,7 @@ static struct llog_handle *llog_alloc_handle(void)
                return NULL;
 
        init_rwsem(&loghandle->lgh_lock);
-       mutex_init(&loghandle->lgh_hdr_mutex);
+       spin_lock_init(&loghandle->lgh_hdr_lock);
        init_rwsem(&loghandle->lgh_last_sem);
        INIT_LIST_HEAD(&loghandle->u.phd.phd_entry);
        refcount_set(&loghandle->lgh_refcount, 1);
@@ -243,13 +243,15 @@ int llog_cancel_arr_rec(const struct lu_env *env, struct llog_handle *loghandle,
 
        down_write(&loghandle->lgh_lock);
        /* clear bitmap */
-       mutex_lock(&loghandle->lgh_hdr_mutex);
+       spin_lock(&loghandle->lgh_hdr_lock);
        for (i = 0; i < num; ++i) {
                if (index[i] == 0) {
+                       spin_unlock(&loghandle->lgh_hdr_lock);
                        CERROR("Can't cancel index 0 which is header\n");
                        GOTO(out_unlock, rc = -EINVAL);
                }
                if (!__test_and_clear_bit_le(index[i], LLOG_HDR_BITMAP(llh))) {
+                       spin_unlock(&loghandle->lgh_hdr_lock);
                        CDEBUG(D_OTHER, "Catalog index %u already clear?\n",
                               index[i]);
                        GOTO(out_unlock, rc = -ENOENT);
@@ -257,6 +259,7 @@ int llog_cancel_arr_rec(const struct lu_env *env, struct llog_handle *loghandle,
        }
        loghandle->lgh_hdr->llh_count -= num;
        subtract_count = true;
+       spin_unlock(&loghandle->lgh_hdr_lock);
 
        /* Since llog_process_thread use lgi_cookie, it`s better to save them
         * and restore after using
@@ -305,26 +308,27 @@ int llog_cancel_arr_rec(const struct lu_env *env, struct llog_handle *loghandle,
 out_unlock:
        if (rc < 0) {
                /* restore bitmap while holding a mutex */
+               spin_lock(&loghandle->lgh_hdr_lock);
                if (subtract_count) {
                        loghandle->lgh_hdr->llh_count += num;
                        subtract_count = false;
                }
                for (i = i - 1; i >= 0; i--)
                        set_bit_le(index[i], LLOG_HDR_BITMAP(llh));
+               spin_unlock(&loghandle->lgh_hdr_lock);
        }
-       mutex_unlock(&loghandle->lgh_hdr_mutex);
        up_write(&loghandle->lgh_lock);
 out_trans:
        rc1 = dt_trans_stop(env, dt, th);
        if (rc == 0)
                rc = rc1;
        if (rc1 < 0) {
-               mutex_lock(&loghandle->lgh_hdr_mutex);
+               spin_lock(&loghandle->lgh_hdr_lock);
                if (subtract_count)
                        loghandle->lgh_hdr->llh_count += num;
                for (i = i - 1; i >= 0; i--)
                        set_bit_le(index[i], LLOG_HDR_BITMAP(llh));
-               mutex_unlock(&loghandle->lgh_hdr_mutex);
+               spin_unlock(&loghandle->lgh_hdr_lock);
        }
        RETURN(rc);
 }
@@ -374,6 +378,7 @@ int llog_read_header(const struct lu_env *env, struct llog_handle *handle,
                set_bit_le(0, LLOG_HDR_BITMAP(llh));
                LLOG_HDR_TAIL(llh)->lrt_len = llh->llh_hdr.lrh_len;
                LLOG_HDR_TAIL(llh)->lrt_index = llh->llh_hdr.lrh_index;
+               handle->lgh_cur_offset = llh->llh_hdr.lrh_len;
                rc = 0;
        }
        RETURN(rc);
@@ -456,7 +461,7 @@ out:
 EXPORT_SYMBOL(llog_init_handle);
 
 #define LLOG_ERROR_REC(lgh, rec, format, a...) \
-       CERROR("%s: "DFID" rec type=%x idx=%u len=%u, " format "\n" , \
+       CDEBUG(D_OTHER, "%s: "DFID" rec type=%x idx=%u len=%u, " format "\n", \
               loghandle2name(lgh), PLOGID(&lgh->lgh_id), (rec)->lrh_type, \
               (rec)->lrh_index, (rec)->lrh_len, ##a)
 
@@ -466,7 +471,8 @@ int llog_verify_record(const struct llog_handle *llh, struct llog_rec_hdr *rec)
 
        if ((rec->lrh_type & LLOG_OP_MASK) != LLOG_OP_MAGIC)
                LLOG_ERROR_REC(llh, rec, "magic is bad");
-       else if (rec->lrh_len == 0 || rec->lrh_len > chunk_size)
+       else if (rec->lrh_len == 0 || rec->lrh_len > chunk_size ||
+                rec->lrh_len < LLOG_MIN_REC_SIZE)
                LLOG_ERROR_REC(llh, rec, "bad record len, chunk size is %d",
                               chunk_size);
        else if (rec->lrh_index > llog_max_idx(llh->lgh_hdr))
@@ -551,6 +557,7 @@ static int llog_process_thread(void *arg)
        while (rc == 0) {
                struct llog_rec_hdr *rec;
                off_t chunk_offset = 0;
+               off_t last_chunk_offset = 0;
                unsigned int buf_offset = 0;
                int lh_last_idx;
                int synced_idx = 0;
@@ -597,26 +604,31 @@ repeat:
                 * The absolute offset of the current chunk is calculated
                 * from cur_offset value and stored in chunk_offset variable.
                 */
+               last_chunk_offset = chunk_offset;
                if ((cur_offset & (chunk_size - 1)) != 0)
                        chunk_offset = cur_offset & ~(chunk_size - 1);
                else
                        chunk_offset = cur_offset - chunk_size;
 
+               /* When rereading a chunk with zeroes at the end, it could
+                * happen that the index was found in the next chunk. Start
+                * processing from the beginning.
+                */
+               if (last_chunk_offset != chunk_offset)
+                       buf_offset = 0;
+
                /* NB: when rec->lrh_len is accessed it is already swabbed
                 * since it is used at the "end" of the loop and the rec
                 * swabbing is done at the beginning of the loop. */
                for (rec = (struct llog_rec_hdr *)(buf + buf_offset);
-                    (char *)rec < buf + chunk_size;
+                    (char *)rec <= buf + chunk_size - LLOG_MIN_REC_SIZE;
                     rec = llog_rec_hdr_next(rec)) {
 
-                       CDEBUG(D_OTHER, "processing rec 0x%p type %#x\n",
-                              rec, rec->lrh_type);
-
                        if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
                                lustre_swab_llog_rec(rec);
 
-                       CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
-                              rec->lrh_type, rec->lrh_index);
+                       CDEBUG(D_OTHER, "processing rec 0x%px type=%#x idx=%d\n",
+                              rec, rec->lrh_type, rec->lrh_index);
 
                        /* start with first rec if block was skipped */
                        if (!index) {
@@ -720,9 +732,13 @@ repeat:
                               rec->lrh_index, rec->lrh_len,
                               (int)(buf + chunk_size - (char *)rec));
 
-                       /* lgh_cur_offset is used only at llog_test_3 */
-                       loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
-                                                   chunk_offset;
+                       /* lgh_cur_offset is used only at llog_test_3 and
+                        * changelog
+                        */
+                       if (unlikely(loghandle->lgh_ctxt->loc_idx ==
+                                    LLOG_TEST_ORIG_CTXT))
+                               loghandle->lgh_cur_offset = (char *)rec -
+                                               (char *)buf + chunk_offset;
 
                        /* if needed, process the callback on this record */
                        if (!llog_is_index_skipable(index, llh, cd)) {
index 806911e..4252abd 100644 (file)
@@ -579,6 +579,8 @@ retry:
                                up_write(&cathandle->lgh_lock);
                                llog_close(env, loghandle);
                        }
+                       CERROR("%s: initialization error: rc = %d\n",
+                              loghandle2name(cathandle), rc);
                        RETURN(rc);
                }
        }
@@ -605,8 +607,9 @@ retry:
                        dt_attr_set(env, loghandle->lgh_obj, &lgi->lgi_attr, th);
                }
        }
-
-       up_write(&loghandle->lgh_lock);
+       /* llog_write_rec could unlock a semaphore */
+       if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_UNLCK_SEM))
+               up_write(&loghandle->lgh_lock);
 
        if (rc == -ENOBUFS) {
                if (retried++ == 0)
index 62f331f..af20930 100644 (file)
@@ -604,16 +604,14 @@ static int llog_osd_write_rec(const struct lu_env *env,
        lrt->lrt_len = rec->lrh_len;
        lrt->lrt_index = rec->lrh_index;
 
-       /* the lgh_hdr_mutex protects llog header data from concurrent
+       /* the lgh_hdr_lock protects llog header data from concurrent
         * update/cancel, the llh_count and llh_bitmap are protected */
-       mutex_lock(&loghandle->lgh_hdr_mutex);
-       if (__test_and_set_bit_le(index, LLOG_HDR_BITMAP(llh))) {
-               CERROR("%s: index %u already set in llog bitmap "DFID"\n",
-                      o->do_lu.lo_dev->ld_obd->obd_name, index,
-                      PFID(lu_object_fid(&o->do_lu)));
-               mutex_unlock(&loghandle->lgh_hdr_mutex);
-               LBUG(); /* should never happen */
-       }
+       spin_lock(&loghandle->lgh_hdr_lock);
+       rc = __test_and_set_bit_le(index, LLOG_HDR_BITMAP(llh));
+       LASSERTF(!rc,
+                "%s: index %u already set in llog bitmap "DFID"\n",
+                o->do_lu.lo_dev->ld_obd->obd_name, index,
+                PFID(lu_object_fid(&o->do_lu)));
        llh->llh_count++;
 
        if (!(llh->llh_flags & LLOG_F_IS_FIXSIZE)) {
@@ -623,6 +621,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
                else if (reclen < llh->llh_size)
                        llh->llh_size = reclen;
        }
+       spin_unlock(&loghandle->lgh_hdr_lock);
 
        /*
         * readers (e.g. llog_osd_read_header()) must not find
@@ -681,7 +680,6 @@ static int llog_osd_write_rec(const struct lu_env *env,
 
 out_unlock:
        /* unlock here for remote object */
-       mutex_unlock(&loghandle->lgh_hdr_mutex);
        if (rc) {
                dt_write_unlock(env, o);
                GOTO(out, rc);
@@ -747,10 +745,10 @@ out_unlock:
        RETURN(rc);
 out:
        /* cleanup llog for error case */
-       mutex_lock(&loghandle->lgh_hdr_mutex);
+       spin_lock(&loghandle->lgh_hdr_lock);
        clear_bit_le(index, LLOG_HDR_BITMAP(llh));
        llh->llh_count--;
-       mutex_unlock(&loghandle->lgh_hdr_mutex);
+       spin_unlock(&loghandle->lgh_hdr_lock);
 
        /* restore llog last_idx */
        if (dt_object_remote(o)) {
@@ -1054,12 +1052,6 @@ static void changelog_block_trim_ext(struct llog_rec_hdr *hdr,
                struct changelog_rec *rec = (struct changelog_rec *)(hdr + 1);
                enum changelog_rec_extra_flags xflag = CLFE_INVALID;
 
-               if (flags & CLF_EXTRA_FLAGS &&
-                   rec->cr_flags & CLF_EXTRA_FLAGS) {
-                       xflag = changelog_rec_extra_flags(rec)->cr_extra_flags &
-                               extra_flags;
-               }
-
                if (unlikely(hdr->lrh_len == 0)) {
                        /* It is corruption case, we cannot know the next rec,
                         * jump to the last one directly to avoid dead loop. */
@@ -1075,6 +1067,13 @@ static void changelog_block_trim_ext(struct llog_rec_hdr *hdr,
                        break;
                }
 
+
+               if (flags & CLF_EXTRA_FLAGS &&
+                   rec->cr_flags & CLF_EXTRA_FLAGS) {
+                       xflag = changelog_rec_extra_flags(rec)->cr_extra_flags &
+                               extra_flags;
+               }
+
                /* Fill up the changelog record with everything the kernel
                 * version supports.
                 */
@@ -1209,8 +1208,21 @@ static int llog_osd_next_block(const struct lu_env *env,
                rec = buf;
                if (LLOG_REC_HDR_NEEDS_SWABBING(rec))
                        lustre_swab_llog_rec(rec);
+
+               /* caller handles bad records if any */
+               if (llog_verify_record(loghandle, rec))
+                       GOTO(out, rc = 0);
+
                tail = (struct llog_rec_tail *)((char *)buf + rc -
                                                sizeof(struct llog_rec_tail));
+
+               while ((tail->lrt_index == 0 || tail->lrt_len == 0) &&
+                      (void *) tail > buf) {
+                       /* looks like zeroes at the end of block */
+                       /* searching real record, assume 4bytes align */
+                       tail = (struct llog_rec_tail *)(((char *)tail) - 4);
+               };
+
                tail_len = tail->lrt_len;
                /* base on tail_len do swab */
                if (tail_len > chunk_size) {
@@ -1229,10 +1241,6 @@ static int llog_osd_next_block(const struct lu_env *env,
                last_rec = (struct llog_rec_hdr *)((char *)tail - tail_len +
                                sizeof(struct llog_rec_tail));
 
-               /* caller handles bad records if any */
-               if (llog_verify_record(loghandle, rec))
-                       GOTO(out, rc = 0);
-
                if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
                        lustre_swab_llog_rec(last_rec);
 
index d35b868..5e4cffe 100644 (file)
@@ -325,7 +325,10 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
        if (obj->oo_destroyed)
                GOTO(out, rc = -ENOENT);
 
-       if (fid_is_llog(lu_object_fid(&dt->do_lu))) {
+       /* XXX: disable the optimization as it's not compatible
+        * with indexed llog and multiple writes to a block from
+        * several threads */
+       if (fid_is_llog(lu_object_fid(&dt->do_lu)) && 0) {
                osd_write_llog_header(obj, buf, pos, oh);
        } else {
                osd_dmu_write(osd, obj->oo_dn, offset, (uint64_t)buf->lb_len,
index 11fa409..465e98e 100644 (file)
@@ -259,6 +259,9 @@ int llog_origin_handle_read_header(struct ptlrpc_request *req)
 
        hdr = req_capsule_server_get(&req->rq_pill, &RMF_LLOG_LOG_HDR);
        *hdr = *loghandle->lgh_hdr;
+       CDEBUG(D_OTHER, "%s: red llog header "DFID" count %d lgh_last_idx %d llh_cat_idx %d\n",
+              ctxt->loc_obd->obd_name, PLOGID(&loghandle->lgh_id),
+              hdr->llh_count, LLOG_HDR_TAIL(hdr)->lrt_index, hdr->llh_cat_idx);
        EXIT;
 out_close:
        llog_origin_close(req->rq_svc_thread->t_env, loghandle);
index aeab954..8932996 100755 (executable)
@@ -11531,6 +11531,9 @@ test_135() {
        stack_trap "echo $rl > /sys/module/libcfs/parameters/libcfs_console_ratelimit" EXIT
 
        test_mkdir -c 1 -i 0 $DIR/$tdir || error "Failed to create directory"
+       do_nodes $(comma_list $(osts_nodes)) $LCTL set_param \
+               seq.*OST*-super.width=$DATA_SEQ_MAX_WIDTH
+
        changelog_chmask "ALL" || error "changelog_chmask failed"
        changelog_register || error "changelog_register failed"
 
@@ -11552,6 +11555,7 @@ test_135() {
 
        # Check changelog entries
        lastread=$(__test_135_reader $fd $cl_user) || exit $?
+
        ! kill -0 $files_pid 2>/dev/null ||
                error "creation thread is running. Is changelog reader stuck?"
 
index 32b4249..f9f215e 100755 (executable)
@@ -12174,7 +12174,7 @@ function createmany() {
 
        if (( count > 100 )); then
                debugsave
-               do_nodes $(comma_list $(all_nodes)) $LCTL set_param -n debug=0
+               do_nodes $(comma_list $(all_nodes)) $LCTL set_param -n debug=ha
        fi
        $LUSTRE/tests/createmany $*
        rc=$?