+ struct llog_thread_info *lgi = llog_info(env);
+ struct llog_logid_rec *rec = &lgi->lgi_logid;
+ struct thandle *handle = NULL;
+ struct dt_device *dt = NULL;
+ struct llog_log_hdr *llh = cathandle->lgh_hdr;
+ int rc, index;
+
+ ENTRY;
+
+ index = (cathandle->lgh_last_idx + 1) % (llog_max_idx(llh) + 1);
+
+ /* check that new llog index will not overlap with the first one.
+ * - llh_cat_idx is the index just before the first/oldest still in-use
+ * index in catalog
+ * - lgh_last_idx is the last/newest used index in catalog
+ *
+ * When catalog is not wrapped yet then lgh_last_idx is always larger
+ * than llh_cat_idx. After the wrap around lgh_last_idx re-starts
+ * from 0 and llh_cat_idx becomes the upper limit for it
+ *
+ * Check if catalog has already wrapped around or not by comparing
+ * last_idx and cat_idx */
+ if ((index == llh->llh_cat_idx + 1 && llh->llh_count > 1) ||
+ (index == 0 && llh->llh_cat_idx == 0)) {
+ if (cathandle->lgh_name == NULL) {
+ CWARN("%s: there are no more free slots in catalog "DFID"\n",
+ loghandle2name(loghandle),
+ PLOGID(&cathandle->lgh_id));
+ } else {
+ CWARN("%s: there are no more free slots in catalog %s\n",
+ loghandle2name(loghandle), cathandle->lgh_name);
+ }
+ RETURN(-ENOSPC);
+ }
+
+ if (CFS_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
+ RETURN(-ENOSPC);
+
+ if (loghandle->lgh_hdr != NULL) {
+ /* If llog object is remote and creation is failed, lgh_hdr
+ * might be left over here, free it first */
+ LASSERT(!llog_exist(loghandle));
+ OBD_FREE_LARGE(loghandle->lgh_hdr, loghandle->lgh_hdr_size);
+ loghandle->lgh_hdr = NULL;
+ }
+
+ if (th == NULL) {
+ dt = lu2dt_dev(cathandle->lgh_obj->do_lu.lo_dev);
+
+ handle = dt_trans_create(env, dt);
+ if (IS_ERR(handle))
+ RETURN(PTR_ERR(handle));
+
+ /* Create update llog object synchronously, which
+ * happens during inialization process see
+ * lod_sub_prep_llog(), to make sure the update
+ * llog object is created before corss-MDT writing
+ * updates into the llog object */
+ if (cathandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID)
+ handle->th_sync = 1;
+
+ handle->th_wait_submit = 1;
+
+ rc = llog_declare_create(env, loghandle, handle);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ rec->lid_hdr.lrh_len = sizeof(*rec);
+ rec->lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
+ rec->lid_id = loghandle->lgh_id;
+ rc = llog_declare_write_rec(env, cathandle, &rec->lid_hdr, -1,
+ handle);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ rc = dt_trans_start_local(env, dt, handle);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ th = handle;
+ }
+
+ rc = llog_create(env, loghandle, th);
+ /* if llog is already created, no need to initialize it */
+ if (rc == -EEXIST) {
+ GOTO(out, rc = 0);
+ } else if (rc != 0) {
+ CERROR("%s: can't create new plain llog in catalog: rc = %d\n",
+ loghandle2name(loghandle), rc);
+ GOTO(out, rc);
+ }
+
+ rc = llog_init_handle(env, loghandle,
+ LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
+ &cathandle->lgh_hdr->llh_tgtuuid);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ /* build the record for this log in the catalog */
+ rec->lid_hdr.lrh_len = sizeof(*rec);
+ rec->lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
+ rec->lid_id = loghandle->lgh_id;
+
+ /* append the new record into catalog. The new index will be
+ * assigned to the record and updated in rec header */
+ rc = llog_write_rec(env, cathandle, &rec->lid_hdr,
+ &loghandle->u.phd.phd_cookie, LLOG_NEXT_IDX, th);
+ if (rc < 0)
+ GOTO(out_destroy, rc);
+
+ CDEBUG(D_OTHER, "new plain log "DFID".%u of catalog "DFID"\n",
+ PLOGID(&loghandle->lgh_id), rec->lid_hdr.lrh_index,
+ PLOGID(&cathandle->lgh_id));
+
+ loghandle->lgh_hdr->llh_cat_idx = rec->lid_hdr.lrh_index;
+
+ /* limit max size of plain llog so that space can be
+ * released sooner, especially on small filesystems */
+ /* 2MB for the cases when free space hasn't been learned yet */
+ loghandle->lgh_max_size = 2 << 20;
+ dt = lu2dt_dev(cathandle->lgh_obj->do_lu.lo_dev);
+ rc = dt_statfs(env, dt, &lgi->lgi_statfs);
+ if (rc == 0 && lgi->lgi_statfs.os_bfree > 0) {
+ __u64 freespace = (lgi->lgi_statfs.os_bfree *
+ lgi->lgi_statfs.os_bsize) >> 6;
+ if (freespace < loghandle->lgh_max_size)
+ loghandle->lgh_max_size = freespace;
+ /* shouldn't be > 128MB in any case?
+ * it's 256K records of 512 bytes each */
+ if (freespace > (128 << 20))
+ loghandle->lgh_max_size = 128 << 20;
+ }
+ if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_PLAIN_RECORDS) ||
+ CFS_FAIL_PRECHECK(OBD_FAIL_CATALOG_FULL_CHECK))) {
+ // limit the numer of plain records for test
+ loghandle->lgh_max_size = loghandle->lgh_hdr_size +
+ cfs_fail_val * 64;
+ }
+
+ rc = 0;