Whamcloud - gitweb
LU-9714 llog: fix llog_process_thread race
[fs/lustre-release.git] / lustre / obdclass / llog.c
index d3b1e04..816f46f 100644 (file)
@@ -419,6 +419,7 @@ static int llog_process_thread(void *arg)
        struct llog_handle              *loghandle = lpi->lpi_loghandle;
        struct llog_log_hdr             *llh = loghandle->lgh_hdr;
        struct llog_process_cat_data    *cd  = lpi->lpi_catdata;
+       struct llog_thread_info         *lti;
        char                            *buf;
        size_t                           chunk_size;
        __u64                            cur_offset;
@@ -432,6 +433,8 @@ static int llog_process_thread(void *arg)
        if (llh == NULL)
                RETURN(-EINVAL);
 
+       lti = lpi->lpi_env == NULL ? NULL : llog_info(lpi->lpi_env);
+
        cur_offset = chunk_size = llh->llh_hdr.lrh_len;
        /* expect chunk_size to be power of two */
        LASSERT(is_power_of_2(chunk_size));
@@ -594,15 +597,37 @@ repeat:
                               rec->lrh_index, rec->lrh_len,
                               (int)(buf + chunk_size - (char *)rec));
 
-                       loghandle->lgh_cur_idx = rec->lrh_index;
+                       /* lgh_cur_offset is used only at llog_test_3 */
                        loghandle->lgh_cur_offset = (char *)rec - (char *)buf +
                                                    chunk_offset;
 
                        /* if set, process the callback on this record */
                        if (ext2_test_bit(index, LLOG_HDR_BITMAP(llh))) {
+                               struct llog_cookie *lgc;
+                               __u64   tmp_off;
+                               int     tmp_idx;
+
+                               if (lti != NULL) {
+                                       lgc = &lti->lgi_cookie;
+                                       /* store lu_env for recursive calls */
+                                       tmp_off = lgc->lgc_offset;
+                                       tmp_idx = lgc->lgc_index;
+
+                                       lgc->lgc_offset = (char *)rec -
+                                               (char *)buf + chunk_offset;
+                                       lgc->lgc_index = rec->lrh_index;
+                               }
+                               /* using lu_env for passing record offset to
+                                * llog_write through various callbacks */
                                rc = lpi->lpi_cb(lpi->lpi_env, loghandle, rec,
                                                 lpi->lpi_cbdata);
                                last_called_index = index;
+
+                               if (lti != NULL) {
+                                       lgc->lgc_offset = tmp_off;
+                                       lgc->lgc_index = tmp_idx;
+                               }
+
                                if (rc == LLOG_PROC_BREAK) {
                                        GOTO(out, rc);
                                } else if (rc == LLOG_DEL_RECORD) {
@@ -1140,7 +1165,8 @@ int llog_write(const struct lu_env *env, struct llog_handle *loghandle,
 {
        struct dt_device        *dt;
        struct thandle          *th;
-       int                      rc;
+       bool                    need_cookie;
+       int                     rc;
 
        ENTRY;
 
@@ -1163,8 +1189,21 @@ int llog_write(const struct lu_env *env, struct llog_handle *loghandle,
        if (rc)
                GOTO(out_trans, rc);
 
+       need_cookie = !(idx == LLOG_HEADER_IDX || idx == LLOG_NEXT_IDX);
+
        down_write(&loghandle->lgh_lock);
-       rc = llog_write_rec(env, loghandle, rec, NULL, idx, th);
+       if (need_cookie) {
+               struct llog_thread_info *lti = llog_info(env);
+
+               /* cookie comes from llog_process_thread */
+               rc = llog_write_rec(env, loghandle, rec, &lti->lgi_cookie,
+                                   rec->lrh_index, th);
+               /* upper layer didn`t pass cookie so change rc */
+               rc = (rc == 1 ? 0 : rc);
+       } else {
+               rc = llog_write_rec(env, loghandle, rec, NULL, idx, th);
+       }
+
        up_write(&loghandle->lgh_lock);
 out_trans:
        dt_trans_stop(env, dt, th);