Whamcloud - gitweb
LU-7001 osp: fix llog processing 32/26132/27
authorAlexander Boyko <alexander.boyko@seagate.com>
Wed, 22 Mar 2017 11:39:48 +0000 (14:39 +0300)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 13 Sep 2017 03:36:56 +0000 (03:36 +0000)
The osp_sync_thread base on fact that llog_cat_process
will not end until umount. This is worng when processing reaches
bottom of catalog, or if catalog is wrapped.
The patch fixes this issue.

For wrapped catalog llog_process_thread could process old
record.
1 thread llog_process_thread read chunk and proccing first record
2 thread add rec to this catalog at this chunk and
  update bitmap
1 check bitmap for next idx and process old record

Test conf-sanity 106 was added.

Signed-off-by: Alexander Boyko <alexander.boyko@seagate.com>
Seagate-bug-id: MRP-4235
Change-Id: Ifc983018e3a325622ef3215bec4b69f5c9ac2ba2
Reviewed-on: https://review.whamcloud.com/26132
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andriy Skulysh
Reviewed-by: Mike Pershin <mike.pershin@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/obdclass/llog.c
lustre/obdclass/llog_osd.c
lustre/osp/osp_internal.h
lustre/osp/osp_sync.c
lustre/tests/conf-sanity.sh

index b9fee59..61c9a1d 100644 (file)
@@ -421,10 +421,11 @@ static int llog_process_thread(void *arg)
        struct llog_process_cat_data    *cd  = lpi->lpi_catdata;
        char                            *buf;
        size_t                           chunk_size;
-       __u64                            cur_offset, tmp_offset;
+       __u64                            cur_offset;
        int                              rc = 0, index = 1, last_index;
        int                              saved_index = 0;
        int                              last_called_index = 0;
+       bool                             repeated = false;
 
        ENTRY;
 
@@ -452,9 +453,10 @@ static int llog_process_thread(void *arg)
 
        while (rc == 0) {
                struct llog_rec_hdr *rec;
-               off_t chunk_offset;
+               off_t chunk_offset = 0;
                unsigned int buf_offset = 0;
                bool partial_chunk;
+               int     lh_last_idx;
 
                /* skip records not set in bitmap */
                while (index <= last_index &&
@@ -471,8 +473,19 @@ static int llog_process_thread(void *arg)
 repeat:
                /* get the buf with our target record; avoid old garbage */
                memset(buf, 0, chunk_size);
+               /* the record index for outdated chunk data */
+               lh_last_idx = loghandle->lgh_last_idx + 1;
                rc = llog_next_block(lpi->lpi_env, loghandle, &saved_index,
                                     index, &cur_offset, buf, chunk_size);
+               if (repeated && rc)
+                       CDEBUG(D_OTHER, "cur_offset %llu, chunk_offset %llu,"
+                              " buf_offset %u, rc = %d\n", cur_offset,
+                              (__u64)chunk_offset, buf_offset, rc);
+               /* we`ve tried to reread the chunk, but there is no
+                * new records */
+               if (rc == -EIO && repeated && (chunk_offset + buf_offset) ==
+                   cur_offset)
+                       GOTO(out, rc = 0);
                if (rc != 0)
                        GOTO(out, rc);
 
@@ -481,8 +494,7 @@ repeat:
                 * The absolute offset of the current chunk is calculated
                 * from cur_offset value and stored in chunk_offset variable.
                 */
-               tmp_offset = cur_offset;
-               if (do_div(tmp_offset, chunk_size) != 0) {
+               if ((cur_offset & (chunk_size - 1)) != 0) {
                        partial_chunk = true;
                        chunk_offset = cur_offset & ~(chunk_size - 1);
                } else {
@@ -506,15 +518,28 @@ repeat:
                        CDEBUG(D_OTHER, "after swabbing, type=%#x idx=%d\n",
                               rec->lrh_type, rec->lrh_index);
 
+                       /* the bitmap could be changed during processing
+                        * records from the chunk. For wrapped catalog
+                        * it means we can read deleted record and try to
+                        * process it. Check this case and reread the chunk. */
+
                        /* for partial chunk the end of it is zeroed, check
                         * for index 0 to distinguish it. */
-                       if (partial_chunk && rec->lrh_index == 0) {
+                       if ((partial_chunk && rec->lrh_index == 0) ||
+                            (index == lh_last_idx &&
+                             lh_last_idx != (loghandle->lgh_last_idx + 1))) {
                                /* concurrent llog_add() might add new records
                                 * while llog_processing, check this is not
                                 * the case and re-read the current chunk
                                 * otherwise. */
                                int records;
-                               if (index > loghandle->lgh_last_idx)
+                               /* lgh_last_idx could be less then index
+                                * for catalog, if catalog is wrapped */
+                               if ((index > loghandle->lgh_last_idx &&
+                                   !(loghandle->lgh_hdr->llh_flags &
+                                     LLOG_F_IS_CAT)) || repeated ||
+                                   (loghandle->lgh_obj != NULL &&
+                                    dt_object_remote(loghandle->lgh_obj)))
                                        GOTO(out, rc = 0);
                                /* <2 records means no more records
                                 * if the last record we processed was
@@ -532,9 +557,12 @@ repeat:
                                /* save offset inside buffer for the re-read */
                                buf_offset = (char *)rec - (char *)buf;
                                cur_offset = chunk_offset;
+                               repeated = true;
                                goto repeat;
                        }
 
+                       repeated = false;
+
                        if (rec->lrh_len == 0 || rec->lrh_len > chunk_size) {
                                CWARN("%s: invalid length %d in llog "DFID
                                      "record for index %d/%d\n",
@@ -584,6 +612,12 @@ repeat:
                                }
                                if (rc)
                                        GOTO(out, rc);
+                               /* some stupid callbacks directly cancel records
+                                * and delete llog. Check it and stop
+                                * processing. */
+                               if (loghandle->lgh_hdr == NULL ||
+                                   loghandle->lgh_hdr->llh_count == 1)
+                                       GOTO(out, rc = 0);
                        }
                        /* exit if the last index is reached */
                        if (index >= last_index)
@@ -611,7 +645,14 @@ out:
                         * llog file, probably I/O error or the log got
                         * corrupted to be able to finally release the log we
                         * discard any remaining bits in the header */
-                       CERROR("Local llog found corrupted\n");
+                       CERROR("%s: Local llog found corrupted #"DOSTID":%x"
+                              " %s index %d count %d\n",
+                              loghandle->lgh_ctxt->loc_obd->obd_name,
+                              POSTID(&loghandle->lgh_id.lgl_oi),
+                              loghandle->lgh_id.lgl_ogen,
+                              ((llh->llh_flags & LLOG_F_IS_CAT) ? "catalog" :
+                               "plain"), index, llh->llh_count);
+
                        while (index <= last_index) {
                                if (ext2_test_bit(index,
                                                  LLOG_HDR_BITMAP(llh)) != 0)
@@ -624,8 +665,8 @@ out:
        }
 
        OBD_FREE_LARGE(buf, chunk_size);
-        lpi->lpi_rc = rc;
-        return 0;
+       lpi->lpi_rc = rc;
+       return 0;
 }
 
 static int llog_process_thread_daemonize(void *arg)
index 9d458f4..cba9a68 100644 (file)
@@ -1278,9 +1278,12 @@ generate:
 
 after_open:
        /* No new llog is expected but doesn't exist */
-       if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o))
+       if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o)) {
+               CDEBUG(D_INFO, "%s: llog FID: "DFID" obj %p doesn`t exist\n",
+                      o->do_lu.lo_dev->ld_obd->obd_name,
+                      PFID(lu_object_fid(&o->do_lu)), o);
                GOTO(out_put, rc = -ENOENT);
-
+       }
        fid_to_logid(&lgi->lgi_fid, &handle->lgh_id);
        handle->lgh_obj = o;
        handle->private_data = los;
index 6a95d9a..16c6423 100644 (file)
@@ -230,6 +230,8 @@ struct osp_device {
        __u64                            opd_sync_last_committed_id;
        /* last processed (taken from llog) id */
        volatile __u64                   opd_sync_last_processed_id;
+       /* last processed catalog index */
+       int                              opd_sync_last_catalog_idx;
        struct osp_id_tracker           *opd_sync_tracker;
        struct list_head                 opd_sync_ontrack;
        /* stop processing new requests until barrier=0 */
index b04e526..fed6202 100644 (file)
@@ -880,6 +880,8 @@ static void osp_sync_process_record(const struct lu_env *env,
        cookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
        cookie.lgc_index = rec->lrh_index;
 
+       d->opd_sync_last_catalog_idx = llh->lgh_hdr->llh_cat_idx;
+
        if (unlikely(rec->lrh_type == LLOG_GEN_REC)) {
                struct llog_gen_rec *gen = (struct llog_gen_rec *)rec;
 
@@ -1167,6 +1169,7 @@ static int osp_sync_thread(void *_arg)
        struct llog_handle      *llh;
        struct lu_env            env;
        int                      rc, count;
+       bool                     wrapped;
 
        ENTRY;
 
@@ -1201,7 +1204,35 @@ static int osp_sync_thread(void *_arg)
                GOTO(out, rc = -EINVAL);
        }
 
-       rc = llog_cat_process(&env, llh, osp_sync_process_queues, d, 0, 0);
+       /*
+        * Catalog processing stops when it processed last catalog record
+        * with index equal to the end of catalog bitmap. Or if it is wrapped,
+        * processing stops with index equal to the lgh_last_idx. We need to
+        * continue processing.
+        */
+       d->opd_sync_last_catalog_idx = 0;
+       do {
+               int     size;
+
+               wrapped = (llh->lgh_hdr->llh_cat_idx >= llh->lgh_last_idx &&
+                          llh->lgh_hdr->llh_count > 1);
+
+               rc = llog_cat_process(&env, llh, osp_sync_process_queues, d,
+                                     d->opd_sync_last_catalog_idx, 0);
+
+               size = OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) ?
+                      cfs_fail_val : (LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1);
+               /* processing reaches catalog bottom */
+               if (d->opd_sync_last_catalog_idx == size)
+                       d->opd_sync_last_catalog_idx = 0;
+               else if (wrapped)
+                       /* If catalog is wrapped we can`t predict last index of
+                        * processing because lgh_last_idx could be changed.
+                        * Starting form the next one */
+                       d->opd_sync_last_catalog_idx++;
+
+       } while (rc == 0 && (wrapped || d->opd_sync_last_catalog_idx == 0));
+
        if (rc < 0) {
                CERROR("%s: llog process with osp_sync_process_queues "
                       "failed: %d\n", d->opd_obd->obd_name, rc);
index 5f47b57..d10a166 100644 (file)
@@ -7462,6 +7462,32 @@ test_105() {
 }
 run_test 105 "check file creation for ro and rw bind mnt pt"
 
+test_106() {
+       local repeat=5
+
+       reformat
+       setupall
+       mkdir -p $DIR/$tdir || error "create $tdir failed"
+       lfs setstripe -c 1 -i 0 $DIR/$tdir
+#define OBD_FAIL_CAT_RECORDS                        0x1312
+       do_facet mds1 $LCTL set_param fail_loc=0x1312 fail_val=$repeat
+
+       for ((i = 1; i <= $repeat; i++)); do
+
+               #one full plain llog
+               createmany -o $DIR/$tdir/f- 64768
+
+               createmany -u $DIR/$tdir/f- 64768
+       done
+       wait_delete_completed $((TIMEOUT * 7))
+#ASSERTION osp_sync_thread() ( thread->t_flags != SVC_RUNNING ) failed
+#shows that osp code is buggy
+       do_facet mds1 $LCTL set_param fail_loc=0 fail_val=0
+
+       cleanupall
+}
+run_test 106 "check osp llog processing when catalog is wrapped"
+
 test_107() {
        [[ $(lustre_version_code $SINGLEMDS) -ge $(version_code 2.10.50) ]] ||
                { skip "Need MDS version > 2.10.50"; return; }