Whamcloud - gitweb
LU-10913 llog: add startcat for wrapped catalog 06/34506/2
authorAlexander Boyko <c17825@cray.com>
Thu, 29 Nov 2018 08:20:00 +0000 (03:20 -0500)
committerOleg Drokin <green@whamcloud.com>
Mon, 8 Apr 2019 06:27:19 +0000 (06:27 +0000)
The osp_sync_thread loop for a llog_cat_process has a mistake.
When llog_cat_process has reached a bottom of catalog, the processing
restarts with 0. Which means a default processing. In this case a
catalog is wrapped and processing starts from a llh_cat_idx. But
records at the bottom were processed already, and were not cancelled
yet. The next message appears at log.
osp_sync_interpret()) reply req ffff8800123e3600/1, rc -2, transno 0

llog_cat_process support startcat index for processing catalog.
In this case the processing starts from startcat index. But if
catalog is wrapped startcat index is ignored.

The patch adds supporting of startcat index for wrapped catalog.

Lustre-change: https://review.whamcloud.com/33749
Lustre-commit: 8109c9e1718d7f425a7186c31399ac035157acd6

Signed-off-by: Alexander Boyko <c17825@cray.com>
Change-Id: Ie4e3ecf2532878578ae0463969115664e3589188
Cray-bug-id: LUS-6765
Reviewed-by: Sergey Cheremencev <c17829@cray.com>
Reviewed-by: Alexander Zarochentsev <c17826@cray.com>
Signed-off-by: Minh Diep <mdiep@whamcloud.com>
Reviewed-on: https://review.whamcloud.com/34506
Tested-by: Jenkins
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/uapi/linux/lustre/lustre_idl.h
lustre/obdclass/llog_cat.c
lustre/osp/osp_sync.c

index 069fe56..62fa261 100644 (file)
@@ -2915,6 +2915,11 @@ enum llog_flag {
                          LLOG_F_EXT_X_OMODE | LLOG_F_EXT_X_XATTR,
 };
 
+/* means first record of catalog */
+enum {
+       LLOG_CAT_FIRST = -1,
+};
+
 /* On-disk header structure of each log object, stored in little endian order */
 #define LLOG_MIN_CHUNK_SIZE    8192
 #define LLOG_HEADER_SIZE        (96) /* sizeof (llog_log_hdr) + sizeof(llh_tail)
index 55ffdf0..f194983 100644 (file)
@@ -878,16 +878,17 @@ int llog_cat_process_or_fork(const struct lu_env *env,
                             llog_cb_t cb, void *data, int startcat,
                             int startidx, bool fork)
 {
-        struct llog_process_data d;
-        struct llog_log_hdr *llh = cat_llh->lgh_hdr;
-        int rc;
-        ENTRY;
+       struct llog_process_data d;
+       struct llog_log_hdr *llh = cat_llh->lgh_hdr;
+       int rc;
 
-        LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
-        d.lpd_data = data;
-        d.lpd_cb = cb;
-        d.lpd_startcat = startcat;
-        d.lpd_startidx = startidx;
+       ENTRY;
+
+       LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
+       d.lpd_data = data;
+       d.lpd_cb = cb;
+       d.lpd_startcat = (startcat == LLOG_CAT_FIRST ? 0 : startcat);
+       d.lpd_startidx = startidx;
 
        if (llh->llh_cat_idx >= cat_llh->lgh_last_idx &&
            llh->llh_count > 1) {
@@ -896,24 +897,38 @@ int llog_cat_process_or_fork(const struct lu_env *env,
                CWARN("%s: catlog "DFID" crosses index zero\n",
                      cat_llh->lgh_ctxt->loc_obd->obd_name,
                      PFID(&cat_llh->lgh_id.lgl_oi.oi_fid));
-
-               cd.lpcd_first_idx = llh->llh_cat_idx;
-               cd.lpcd_last_idx = 0;
-               rc = llog_process_or_fork(env, cat_llh, cat_cb,
-                                         &d, &cd, fork);
-               if (rc != 0)
-                       RETURN(rc);
-
-               cd.lpcd_first_idx = 0;
+               /*startcat = 0 is default value for general processing */
+               if ((startcat != LLOG_CAT_FIRST &&
+                   startcat >= llh->llh_cat_idx) || !startcat) {
+                       /* processing the catalog part at the end */
+                       cd.lpcd_first_idx = (startcat ? startcat :
+                                            llh->llh_cat_idx);
+                       cd.lpcd_last_idx = 0;
+                       rc = llog_process_or_fork(env, cat_llh, cat_cb,
+                                                 &d, &cd, fork);
+                       /* Reset the startcat becasue it has already reached
+                        * catalog bottom.
+                        */
+                       startcat = 0;
+                       if (rc != 0)
+                               RETURN(rc);
+               }
+               /* processing the catalog part at the begining */
+               cd.lpcd_first_idx = (startcat == LLOG_CAT_FIRST) ? 0 : startcat;
+               /* Note, the processing will stop at the lgh_last_idx value,
+                * and it could be increased during processing. So records
+                * between current lgh_last_idx and lgh_last_idx in future
+                * would left unprocessed.
+                */
                cd.lpcd_last_idx = cat_llh->lgh_last_idx;
                rc = llog_process_or_fork(env, cat_llh, cat_cb,
                                          &d, &cd, fork);
-        } else {
+       } else {
                rc = llog_process_or_fork(env, cat_llh, cat_cb,
                                          &d, NULL, fork);
-        }
+       }
 
-        RETURN(rc);
+       RETURN(rc);
 }
 
 int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh,
index b21d5af..0e828c7 100644 (file)
@@ -1233,14 +1233,15 @@ static int osp_sync_thread(void *_arg)
                       cfs_fail_val : (LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1);
                /* processing reaches catalog bottom */
                if (d->opd_sync_last_catalog_idx == size)
-                       d->opd_sync_last_catalog_idx = 0;
+                       d->opd_sync_last_catalog_idx = LLOG_CAT_FIRST;
                else if (wrapped)
                        /* If catalog is wrapped we can`t predict last index of
                         * processing because lgh_last_idx could be changed.
                         * Starting form the next one */
                        d->opd_sync_last_catalog_idx++;
 
-       } while (rc == 0 && (wrapped || d->opd_sync_last_catalog_idx == 0));
+       } while (rc == 0 && (wrapped ||
+                            d->opd_sync_last_catalog_idx == LLOG_CAT_FIRST));
 
        if (rc < 0) {
                CERROR("%s: llog process with osp_sync_process_queues "