+ rc = llog_cat_id2handle(env, cat_llh, llhp, &lir->lid_id);
+ if (rc) {
+ /* After a server crash, a stub of index record in catlog could
+ * be kept, because plain log destroy + catlog index record
+ * deletion are not atomic. So we end up with an index but no
+ * actual record. Destroy the index and move on. */
+ if (rc == -ENOENT || rc == -ESTALE)
+ rc = LLOG_DEL_RECORD;
+ else if (rc)
+ CWARN("%s: can't find llog handle "DFID":%x: rc = %d\n",
+ cat_llh->lgh_ctxt->loc_obd->obd_name,
+ PFID(&lir->lid_id.lgl_oi.oi_fid),
+ lir->lid_id.lgl_ogen, rc);
+
+ RETURN(rc);
+ }
+
+ /* clean old empty llogs, do not consider current llog in use */
+ /* ignore remote (lgh_obj == NULL) llogs */
+ hdr = (*llhp)->lgh_hdr;
+ if ((hdr->llh_flags & LLOG_F_ZAP_WHEN_EMPTY) &&
+ hdr->llh_count == 1 && cat_llh->lgh_obj != NULL &&
+ *llhp != cat_llh->u.chd.chd_current_log) {
+ rc = llog_destroy(env, *llhp);
+ if (rc)
+ CWARN("%s: can't destroy empty log "DFID": rc = %d\n",
+ (*llhp)->lgh_ctxt->loc_obd->obd_name,
+ PFID(&lir->lid_id.lgl_oi.oi_fid), rc);
+ rc = LLOG_DEL_PLAIN;
+ }
+
+ RETURN(rc);
+}
+
/* Callback invoked for each catalog record: open the plain llog the
 * record points to and run the user callback (d->lpd_cb) over its
 * records, honouring the start offsets carried in @data.
 *
 * \param[in] env	execution environment
 * \param[in] cat_llh	catalog llog handle being walked
 * \param[in] rec	current catalog record
 * \param[in] data	struct llog_process_data: user cb, cookie, offsets
 *
 * \retval 0 on success or skip, negative errno on failure
 */
static int llog_cat_process_cb(const struct lu_env *env,
			       struct llog_handle *cat_llh,
			       struct llog_rec_hdr *rec, void *data)
{
	struct llog_process_data *d = data;
	struct llog_handle *llh = NULL;
	int rc;

	ENTRY;
	/* May return LLOG_DEL_PLAIN/LLOG_DEL_RECORD to request catalog
	 * cleanup; both are handled at the "out" label below. */
	rc = llog_cat_process_common(env, cat_llh, rec, &llh);
	if (rc)
		GOTO(out, rc);

	if (rec->lrh_index < d->lpd_startcat) {
		/* Skip processing of the logs until startcat */
		rc = 0;
	} else if (d->lpd_startidx > 0) {
		struct llog_process_cat_data cd;

		/* First plain llog at/after startcat: begin at the
		 * caller-supplied record index rather than index 0. */
		cd.lpcd_first_idx = d->lpd_startidx;
		cd.lpcd_last_idx = 0;
		rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
					  &cd, false);
		/* Continue processing the next log from idx 0 */
		d->lpd_startidx = 0;
	} else {
		rc = llog_process_or_fork(env, llh, d->lpd_cb, d->lpd_data,
					  NULL, false);
	}

out:
	/* The empty plain log was destroyed while processing */
	if (rc == LLOG_DEL_PLAIN) {
		rc = llog_cat_cleanup(env, cat_llh, llh,
				      llh->u.phd.phd_cookie.lgc_index);
	} else if (rc == LLOG_DEL_RECORD) {
		/* clear wrong catalog entry */
		rc = llog_cat_cleanup(env, cat_llh, NULL, rec->lrh_index);
	}

	/* drop the reference taken by llog_cat_process_common() */
	if (llh)
		llog_handle_put(llh);

	RETURN(rc);
}
+
/**
 * Walk the records of catalog \a cat_llh, calling \a cat_cb for each
 * catalog record; \a cb and \a data are passed through for use on the
 * plain llog records.
 *
 * \param[in] env	execution environment
 * \param[in] cat_llh	catalog llog handle (must have LLOG_F_IS_CAT set)
 * \param[in] cat_cb	callback invoked per catalog record
 * \param[in] cb	user callback for plain llog records
 * \param[in] data	opaque cookie passed through to \a cb
 * \param[in] startcat	catalog index to start at (LLOG_CAT_FIRST = all)
 * \param[in] startidx	record index within the first processed plain llog
 * \param[in] fork	process in a separate thread if true
 *
 * \retval 0 on success, negative errno on error
 */
int llog_cat_process_or_fork(const struct lu_env *env,
			     struct llog_handle *cat_llh, llog_cb_t cat_cb,
			     llog_cb_t cb, void *data, int startcat,
			     int startidx, bool fork)
{
	struct llog_process_data d;
	struct llog_log_hdr *llh = cat_llh->lgh_hdr;
	int rc;

	ENTRY;

	LASSERT(llh->llh_flags & LLOG_F_IS_CAT);
	d.lpd_data = data;
	d.lpd_cb = cb;
	d.lpd_startcat = (startcat == LLOG_CAT_FIRST ? 0 : startcat);
	d.lpd_startidx = startidx;

	/* llh_cat_idx >= lgh_last_idx with more than one live record
	 * means the catalog wrapped past index zero, so the live range
	 * is split in two and needs two processing passes. */
	if (llh->llh_cat_idx >= cat_llh->lgh_last_idx &&
	    llh->llh_count > 1) {
		struct llog_process_cat_data cd;

		CWARN("%s: catlog "DFID" crosses index zero\n",
		      cat_llh->lgh_ctxt->loc_obd->obd_name,
		      PFID(&cat_llh->lgh_id.lgl_oi.oi_fid));
		/*startcat = 0 is default value for general processing */
		if ((startcat != LLOG_CAT_FIRST &&
		    startcat >= llh->llh_cat_idx) || !startcat) {
			/* processing the catalog part at the end */
			cd.lpcd_first_idx = (startcat ? startcat :
					     llh->llh_cat_idx);
			if (OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS))
				cd.lpcd_last_idx = cfs_fail_val;
			else
				cd.lpcd_last_idx = 0;
			rc = llog_process_or_fork(env, cat_llh, cat_cb,
						  &d, &cd, fork);
			/* Reset the startcat because it has already reached
			 * catalog bottom.
			 */
			startcat = 0;
			if (rc != 0)
				RETURN(rc);
		}
		/* processing the catalog part at the beginning */
		cd.lpcd_first_idx = (startcat == LLOG_CAT_FIRST) ? 0 : startcat;
		/* Note, the processing will stop at the lgh_last_idx value,
		 * and it could be increased during processing. So records
		 * between current lgh_last_idx and lgh_last_idx in future
		 * would left unprocessed.
		 */
		cd.lpcd_last_idx = cat_llh->lgh_last_idx;
		rc = llog_process_or_fork(env, cat_llh, cat_cb,
					  &d, &cd, fork);
	} else {
		rc = llog_process_or_fork(env, cat_llh, cat_cb,
					  &d, NULL, fork);
	}

	RETURN(rc);
}
+EXPORT_SYMBOL(llog_cat_process_or_fork);
+
+int llog_cat_process(const struct lu_env *env, struct llog_handle *cat_llh,
+ llog_cb_t cb, void *data, int startcat, int startidx)
+{
+ return llog_cat_process_or_fork(env, cat_llh, llog_cat_process_cb,
+ cb, data, startcat, startidx, false);
+}
+EXPORT_SYMBOL(llog_cat_process);
+
+static int llog_cat_size_cb(const struct lu_env *env,
+ struct llog_handle *cat_llh,
+ struct llog_rec_hdr *rec, void *data)
+{
+ struct llog_process_data *d = data;
+ struct llog_handle *llh = NULL;
+ __u64 *cum_size = d->lpd_data;
+ __u64 size;
+ int rc;
+
+ ENTRY;
+ rc = llog_cat_process_common(env, cat_llh, rec, &llh);
+
+ if (rc == LLOG_DEL_PLAIN) {
+ /* empty log was deleted, don't count it */
+ rc = llog_cat_cleanup(env, cat_llh, llh,
+ llh->u.phd.phd_cookie.lgc_index);
+ } else if (rc == LLOG_DEL_RECORD) {
+ /* clear wrong catalog entry */
+ rc = llog_cat_cleanup(env, cat_llh, NULL, rec->lrh_index);
+ } else {
+ size = llog_size(env, llh);
+ *cum_size += size;
+
+ CDEBUG(D_INFO, "Add llog entry "DFID" size=%llu, tot=%llu\n",
+ PFID(&llh->lgh_id.lgl_oi.oi_fid), size, *cum_size);
+ }
+
+ if (llh != NULL)
+ llog_handle_put(llh);
+
+ RETURN(0);
+}
+
+__u64 llog_cat_size(const struct lu_env *env, struct llog_handle *cat_llh)
+{
+ __u64 size = llog_size(env, cat_llh);
+
+ llog_cat_process_or_fork(env, cat_llh, llog_cat_size_cb,
+ NULL, &size, 0, 0, false);
+
+ return size;
+}
+EXPORT_SYMBOL(llog_cat_size);
+
/* Return the current number of "free" catalog entries, i.e. the slots
 * still available for creating new plain llog files, accounting for a
 * catalog that has wrapped around past index zero.
 */
+__u32 llog_cat_free_space(struct llog_handle *cat_llh)
+{
+ /* simulate almost full Catalog */
+ if (OBD_FAIL_CHECK(OBD_FAIL_CAT_FREE_RECORDS))
+ return cfs_fail_val;
+
+ if (cat_llh->lgh_hdr->llh_count == 1)
+ return LLOG_HDR_BITMAP_SIZE(cat_llh->lgh_hdr) - 1;
+
+ if (cat_llh->lgh_last_idx > cat_llh->lgh_hdr->llh_cat_idx)
+ return LLOG_HDR_BITMAP_SIZE(cat_llh->lgh_hdr) - 1 +
+ cat_llh->lgh_hdr->llh_cat_idx - cat_llh->lgh_last_idx;
+
+ /* catalog is presently wrapped */
+ return cat_llh->lgh_hdr->llh_cat_idx - cat_llh->lgh_last_idx;