Whamcloud - gitweb
LU-6556 obdclass: re-allow catalog to wrap around
[fs/lustre-release.git] / lustre / obdclass / llog_osd.c
index a099df2..fb8fba4 100644 (file)
@@ -382,6 +382,12 @@ static int llog_osd_write_rec(const struct lu_env *env,
        if (reclen > loghandle->lgh_hdr->llh_hdr.lrh_len)
                RETURN(-E2BIG);
 
+       /* sanity check for fixed-records llog */
+       if (idx != LLOG_HEADER_IDX && (llh->llh_flags & LLOG_F_IS_FIXSIZE)) {
+               LASSERT(llh->llh_size != 0);
+               LASSERT(llh->llh_size == reclen);
+       }
+
        rc = dt_attr_get(env, o, &lgi->lgi_attr);
        if (rc)
                RETURN(rc);
@@ -479,15 +485,8 @@ static int llog_osd_write_rec(const struct lu_env *env,
                               POSTID(&loghandle->lgh_id.lgl_oi), idx,
                               rec->lrh_len, (long long)lgi->lgi_off);
                } else if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
-                       if (llh->llh_size == 0 ||
-                           llh->llh_size != rec->lrh_len) {
-                               CERROR("%s: wrong record size, llh_size is %u"
-                                      " but record size is %u\n",
-                                      o->do_lu.lo_dev->ld_obd->obd_name,
-                                      llh->llh_size, rec->lrh_len);
-                               RETURN(-EINVAL);
-                       }
-                       lgi->lgi_off = sizeof(*llh) + (idx - 1) * reclen;
+                       lgi->lgi_off = llh->llh_hdr.lrh_len +
+                                      (idx - 1) * reclen;
                } else {
                        /* This can be result of lgh_cur_idx is not set during
                         * llog processing or llh_size is not set to proper
@@ -534,9 +533,17 @@ static int llog_osd_write_rec(const struct lu_env *env,
                        RETURN(rc);
                loghandle->lgh_last_idx++; /* for pad rec */
        }
-       /* if it's the last idx in log file, then return -ENOSPC */
-       if (loghandle->lgh_last_idx >= LLOG_HDR_BITMAP_SIZE(llh) - 1)
-               RETURN(-ENOSPC);
+       /* if it's the last idx in log file, then return -ENOSPC
+        * or wrap around if a catalog */
+       if ((loghandle->lgh_last_idx >= LLOG_HDR_BITMAP_SIZE(llh) - 1) ||
+           unlikely(llh->llh_flags & LLOG_F_IS_CAT &&
+                    OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) &&
+                    loghandle->lgh_last_idx >= cfs_fail_val)) {
+               if (llh->llh_flags & LLOG_F_IS_CAT)
+                       loghandle->lgh_last_idx = 0;
+               else
+                       RETURN(-ENOSPC);
+       }
 
        /* increment the last_idx along with llh_tail index, they should
         * be equal for a llog lifetime */
@@ -565,9 +572,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
        }
        llh->llh_count++;
 
-       if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
-               LASSERT(llh->llh_size == reclen);
-       } else {
+       if (!(llh->llh_flags & LLOG_F_IS_FIXSIZE)) {
                /* Update the minimum size of the llog record */
                if (llh->llh_size == 0)
                        llh->llh_size = reclen;
@@ -623,12 +628,20 @@ out_unlock:
        if (rc)
                GOTO(out, rc);
 
-       rc = dt_attr_get(env, o, &lgi->lgi_attr);
-       if (rc)
-               GOTO(out, rc);
+       /* computed index can be used to determine offset for fixed-size
+        * records. This also allows to handle Catalog wrap around case */
+       if (llh->llh_flags & LLOG_F_IS_FIXSIZE) {
+               lgi->lgi_off = llh->llh_hdr.lrh_len + (index - 1) * reclen;
+       } else {
+               rc = dt_attr_get(env, o, &lgi->lgi_attr);
+               if (rc)
+                       GOTO(out, rc);
+
+               LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
+               lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size,
+                                    lgi->lgi_off);
+       }
 
-       LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
-       lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size, lgi->lgi_off);
        lgi->lgi_buf.lb_len = reclen;
        lgi->lgi_buf.lb_buf = rec;
        rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
@@ -659,7 +672,11 @@ out:
        mutex_unlock(&loghandle->lgh_hdr_mutex);
 
        /* restore llog last_idx */
-       loghandle->lgh_last_idx--;
+       if (--loghandle->lgh_last_idx == 0 &&
+           (llh->llh_flags & LLOG_F_IS_CAT) && llh->llh_cat_idx != 0) {
+               /* catalog had just wrap-around case */
+               loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(llh) - 1;
+       }
        LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx;
 
        RETURN(rc);