+ struct llog_thread_info *lgi = llog_info(env);
+ struct llog_logid_rec *rec = &lgi->lgi_logid;
+ struct thandle *handle = NULL;
+ struct dt_device *dt = NULL;
+ struct llog_log_hdr *llh = cathandle->lgh_hdr;
+ int rc, index;
+
+ ENTRY;
+
+ index = (cathandle->lgh_last_idx + 1) %
+ (OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) ? (cfs_fail_val + 1) :
+ LLOG_HDR_BITMAP_SIZE(llh));
+
+ /* check that new llog index will not overlap with the first one.
+ * - llh_cat_idx is the index just before the first/oldest still in-use
+ * index in catalog
+ * - lgh_last_idx is the last/newest used index in catalog
+ *
+ * When catalog is not wrapped yet then lgh_last_idx is always larger
+ * than llh_cat_idx. After the wrap around lgh_last_idx re-starts
+ * from 0 and llh_cat_idx becomes the upper limit for it
+ *
+ * Check if catalog has already wrapped around or not by comparing
+ * last_idx and cat_idx */
+ if ((index == llh->llh_cat_idx + 1 && llh->llh_count > 1) ||
+ (index == 0 && llh->llh_cat_idx == 0)) {
+ if (cathandle->lgh_name == NULL) {
+ CWARN("%s: there are no more free slots in catalog "
+ DFID":%x\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name,
+ PFID(&cathandle->lgh_id.lgl_oi.oi_fid),
+ cathandle->lgh_id.lgl_ogen);
+ } else {
+ CWARN("%s: there are no more free slots in "
+ "catalog %s\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name,
+ cathandle->lgh_name);
+ }
+ RETURN(-ENOSPC);
+ }
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_MDS_LLOG_CREATE_FAILED))
+ RETURN(-ENOSPC);
+
+ if (loghandle->lgh_hdr != NULL) {
+ /* If llog object is remote and creation is failed, lgh_hdr
+ * might be left over here, free it first */
+ LASSERT(!llog_exist(loghandle));
+ OBD_FREE_LARGE(loghandle->lgh_hdr, loghandle->lgh_hdr_size);
+ loghandle->lgh_hdr = NULL;
+ }
+
+ if (th == NULL) {
+ dt = lu2dt_dev(cathandle->lgh_obj->do_lu.lo_dev);
+
+ handle = dt_trans_create(env, dt);
+ if (IS_ERR(handle))
+ RETURN(PTR_ERR(handle));
+
+ /* Create update llog object synchronously, which
+ * happens during inialization process see
+ * lod_sub_prep_llog(), to make sure the update
+ * llog object is created before corss-MDT writing
+ * updates into the llog object */
+ if (cathandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID)
+ handle->th_sync = 1;
+
+ handle->th_wait_submit = 1;
+
+ rc = llog_declare_create(env, loghandle, handle);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ rec->lid_hdr.lrh_len = sizeof(*rec);
+ rec->lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
+ rec->lid_id = loghandle->lgh_id;
+ rc = llog_declare_write_rec(env, cathandle, &rec->lid_hdr, -1,
+ handle);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ rc = dt_trans_start_local(env, dt, handle);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ th = handle;
+ }
+
+ rc = llog_create(env, loghandle, th);
+ /* if llog is already created, no need to initialize it */
+ if (rc == -EEXIST) {
+ GOTO(out, rc = 0);
+ } else if (rc != 0) {
+ CERROR("%s: can't create new plain llog in catalog: rc = %d\n",
+ loghandle->lgh_ctxt->loc_obd->obd_name, rc);
+ GOTO(out, rc);
+ }
+
+ rc = llog_init_handle(env, loghandle,
+ LLOG_F_IS_PLAIN | LLOG_F_ZAP_WHEN_EMPTY,
+ &cathandle->lgh_hdr->llh_tgtuuid);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ /* build the record for this log in the catalog */
+ rec->lid_hdr.lrh_len = sizeof(*rec);
+ rec->lid_hdr.lrh_type = LLOG_LOGID_MAGIC;
+ rec->lid_id = loghandle->lgh_id;
+
+ /* append the new record into catalog. The new index will be
+ * assigned to the record and updated in rec header */
+ rc = llog_write_rec(env, cathandle, &rec->lid_hdr,
+ &loghandle->u.phd.phd_cookie, LLOG_NEXT_IDX, th);
+ if (rc < 0)
+ GOTO(out_destroy, rc);
+
+ CDEBUG(D_OTHER, "new plain log "DFID".%u of catalog "DFID"\n",
+ PFID(&loghandle->lgh_id.lgl_oi.oi_fid), rec->lid_hdr.lrh_index,
+ PFID(&cathandle->lgh_id.lgl_oi.oi_fid));
+
+ loghandle->lgh_hdr->llh_cat_idx = rec->lid_hdr.lrh_index;
+
+ /* limit max size of plain llog so that space can be
+ * released sooner, especially on small filesystems */
+ /* 2MB for the cases when free space hasn't been learned yet */
+ loghandle->lgh_max_size = 2 << 20;
+ dt = lu2dt_dev(cathandle->lgh_obj->do_lu.lo_dev);
+ rc = dt_statfs(env, dt, &lgi->lgi_statfs);
+ if (rc == 0 && lgi->lgi_statfs.os_bfree > 0) {
+ __u64 freespace = (lgi->lgi_statfs.os_bfree *
+ lgi->lgi_statfs.os_bsize) >> 6;
+ if (freespace < loghandle->lgh_max_size)
+ loghandle->lgh_max_size = freespace;
+ /* shouldn't be > 128MB in any case?
+ * it's 256K records of 512 bytes each */
+ if (freespace > (128 << 20))
+ loghandle->lgh_max_size = 128 << 20;
+ }
+ if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_PLAIN_RECORDS) ||
+ OBD_FAIL_PRECHECK(OBD_FAIL_CATALOG_FULL_CHECK))) {
+ // limit the numer of plain records for test
+ loghandle->lgh_max_size = loghandle->lgh_hdr_size +
+ cfs_fail_val * 64;
+ }
+
+ rc = 0;
+
+out:
+ if (handle != NULL) {
+ handle->th_result = rc >= 0 ? 0 : rc;
+ dt_trans_stop(env, dt, handle);
+ }
+ RETURN(rc);
+
+out_destroy:
+ /* to signal llog_cat_close() it shouldn't try to destroy the llog,
+ * we want to destroy it in this transaction, otherwise the object
+ * becomes an orphan */
+ loghandle->lgh_hdr->llh_flags &= ~LLOG_F_ZAP_WHEN_EMPTY;
+ /* this is to mimic full log, so another llog_cat_current_log()
+ * can skip it and ask for another onet */
+ loghandle->lgh_last_idx = LLOG_HDR_BITMAP_SIZE(loghandle->lgh_hdr) + 1;
+ llog_trans_destroy(env, loghandle, th);
+ if (handle != NULL)
+ dt_trans_stop(env, dt, handle);
+ RETURN(rc);
+}