Whamcloud - gitweb
LU-10913 llog: add startcat for wrapped catalog 49/33749/3
authorAlexander Boyko <c17825@cray.com>
Thu, 29 Nov 2018 08:20:00 +0000 (03:20 -0500)
committerOleg Drokin <green@whamcloud.com>
Mon, 11 Feb 2019 03:22:11 +0000 (03:22 +0000)
The osp_sync_thread loop for a llog_cat_process has a mistake.
When llog_cat_process has reached a bottom of catalog, the processing
restarts with 0. Which means a default processing. In this case a
catalog is wrapped and processing starts from a llh_cat_idx. But
records at the bottom were processed already, and were not cancelled
yet. The next message appears at log.
osp_sync_interpret()) reply req ffff8800123e3600/1, rc -2, transno 0

llog_cat_process support startcat index for processing catalog.
In this case the processing starts from startcat index. But if
catalog is wrapped startcat index is ignored.

The patch adds supporting of startcat index for wrapped catalog.

Signed-off-by: Alexander Boyko <c17825@cray.com>
Change-Id: Ie4e3ecf2532878578ae0463969115664e3589188
Cray-bug-id: LUS-6765
Reviewed-on: https://review.whamcloud.com/33749
Tested-by: Jenkins
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Sergey Cheremencev <c17829@cray.com>
Reviewed-by: Alexander Zarochentsev <c17826@cray.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/obdclass/llog_cat.c
lustre/osp/osp_sync.c

index 2284840..71e9af1 100644 (file)
@@ -2916,6 +2916,11 @@ enum llog_flag {
                          LLOG_F_EXT_X_OMODE | LLOG_F_EXT_X_XATTR,
 };
 
+/* means first record of catalog */
+enum {
+       LLOG_CAT_FIRST = -1,
+};
+
 /* On-disk header structure of each log object, stored in little endian order */
 #define LLOG_MIN_CHUNK_SIZE    8192
 #define LLOG_HEADER_SIZE        (96) /* sizeof (llog_log_hdr) + sizeof(llh_tail)
index 18a41f9..f939e8b 100644 (file)
@@ -855,16 +855,17 @@ int llog_cat_process_or_fork(const struct lu_env *env,
                             llog_cb_t cb, void *data, int startcat,
                             int startidx, bool fork)
 {
-        struct llog_process_data d;
-        struct llog_log_hdr *llh = cat_llh->lgh_hdr;
-        int rc;
-        ENTRY;
+       struct llog_process_data d;
+       struct llog_log_hdr *llh = cat_llh->lgh_hdr;
+       int rc;
 
-        LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
-        d.lpd_data = data;
-        d.lpd_cb = cb;
-        d.lpd_startcat = startcat;
-        d.lpd_startidx = startidx;
+       ENTRY;
+
+       LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
+       d.lpd_data = data;
+       d.lpd_cb = cb;
+       d.lpd_startcat = (startcat == LLOG_CAT_FIRST ? 0 : startcat);
+       d.lpd_startidx = startidx;
 
        if (llh->llh_cat_idx >= cat_llh->lgh_last_idx &&
            llh->llh_count > 1) {
@@ -873,24 +874,38 @@ int llog_cat_process_or_fork(const struct lu_env *env,
                CWARN("%s: catlog "DFID" crosses index zero\n",
                      cat_llh->lgh_ctxt->loc_obd->obd_name,
                      PFID(&cat_llh->lgh_id.lgl_oi.oi_fid));
-
-               cd.lpcd_first_idx = llh->llh_cat_idx;
-               cd.lpcd_last_idx = 0;
-               rc = llog_process_or_fork(env, cat_llh, cat_cb,
-                                         &d, &cd, fork);
-               if (rc != 0)
-                       RETURN(rc);
-
-               cd.lpcd_first_idx = 0;
+               /*startcat = 0 is default value for general processing */
+               if ((startcat != LLOG_CAT_FIRST &&
+                   startcat >= llh->llh_cat_idx) || !startcat) {
+                       /* processing the catalog part at the end */
+                       cd.lpcd_first_idx = (startcat ? startcat :
+                                            llh->llh_cat_idx);
+                       cd.lpcd_last_idx = 0;
+                       rc = llog_process_or_fork(env, cat_llh, cat_cb,
+                                                 &d, &cd, fork);
+                       /* Reset the startcat becasue it has already reached
+                        * catalog bottom.
+                        */
+                       startcat = 0;
+                       if (rc != 0)
+                               RETURN(rc);
+               }
+               /* processing the catalog part at the begining */
+               cd.lpcd_first_idx = (startcat == LLOG_CAT_FIRST) ? 0 : startcat;
+               /* Note, the processing will stop at the lgh_last_idx value,
+                * and it could be increased during processing. So records
+                * between current lgh_last_idx and lgh_last_idx in future
+                * would left unprocessed.
+                */
                cd.lpcd_last_idx = cat_llh->lgh_last_idx;
                rc = llog_process_or_fork(env, cat_llh, cat_cb,
                                          &d, &cd, fork);
-        } else {
+       } else {
                rc = llog_process_or_fork(env, cat_llh, cat_cb,
                                          &d, NULL, fork);
-        }
+       }
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh,
index c4dece9..63e0b86 100644 (file)
@@ -1233,14 +1233,15 @@ static int osp_sync_thread(void *_arg)
                       cfs_fail_val : (LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1);
                /* processing reaches catalog bottom */
                if (d->opd_sync_last_catalog_idx == size)
-                       d->opd_sync_last_catalog_idx = 0;
+                       d->opd_sync_last_catalog_idx = LLOG_CAT_FIRST;
                else if (wrapped)
                        /* If catalog is wrapped we can`t predict last index of
                         * processing because lgh_last_idx could be changed.
                         * Starting form the next one */
                        d->opd_sync_last_catalog_idx++;
 
-       } while (rc == 0 && (wrapped || d->opd_sync_last_catalog_idx == 0));
+       } while (rc == 0 && (wrapped ||
+                            d->opd_sync_last_catalog_idx == LLOG_CAT_FIRST));
 
        if (rc < 0) {
                CERROR("%s: llog process with osp_sync_process_queues "