Whamcloud - gitweb
LU-3534 osp: send updates by separate thread
[fs/lustre-release.git] / lustre / obdclass / llog_osd.c
index ef7b0f0..ad843e9 100644 (file)
@@ -199,7 +199,7 @@ static int llog_osd_read_header(const struct lu_env *env,
 
        lgi = llog_info(env);
 
-       rc = dt_attr_get(env, o, &lgi->lgi_attr, NULL);
+       rc = dt_attr_get(env, o, &lgi->lgi_attr);
        if (rc)
                RETURN(rc);
 
@@ -347,7 +347,6 @@ static int llog_osd_write_rec(const struct lu_env *env,
        struct llog_rec_tail    *lrt;
        struct dt_object        *o;
        size_t                   left;
-       bool                     header_is_updated = false;
 
        ENTRY;
 
@@ -365,7 +364,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
        if (reclen > LLOG_CHUNK_SIZE)
                RETURN(-E2BIG);
 
-       rc = dt_attr_get(env, o, &lgi->lgi_attr, NULL);
+       rc = dt_attr_get(env, o, &lgi->lgi_attr);
        if (rc)
                RETURN(rc);
 
@@ -508,30 +507,78 @@ static int llog_osd_write_rec(const struct lu_env *env,
 
        /* the lgh_hdr_lock protects llog header data from concurrent
         * update/cancel, the llh_count and llh_bitmap are protected */
-       spin_lock(&loghandle->lgh_hdr_lock);
+       down_write(&loghandle->lgh_hdr_lock);
        if (ext2_set_bit(index, llh->llh_bitmap)) {
                CERROR("%s: index %u already set in log bitmap\n",
                       o->do_lu.lo_dev->ld_obd->obd_name, index);
-               spin_unlock(&loghandle->lgh_hdr_lock);
+               up_write(&loghandle->lgh_hdr_lock);
                LBUG(); /* should never happen */
        }
        llh->llh_count++;
-       spin_unlock(&loghandle->lgh_hdr_lock);
 
-       lgi->lgi_off = 0;
-       lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len;
-       lgi->lgi_buf.lb_buf = &llh->llh_hdr;
-       rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
+       /* XXX It is a bit tricky here, if the log object is local,
+        * we do not need lock during write here, because if there is
+        * race, the transaction(jbd2, what about ZFS?) will make sure the
+        * conflicts will all committed in the same transaction group.
+        * But for remote object, we need lock the whole process, so to
+        * set the version of the remote transaction to make sure they
+        * are being sent in order. (see osp_md_write()) */
+       if (!dt_object_remote(o))
+               up_write(&loghandle->lgh_hdr_lock);
+
+       if (lgi->lgi_attr.la_size == 0) {
+               lgi->lgi_off = 0;
+               lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len;
+               lgi->lgi_buf.lb_buf = &llh->llh_hdr;
+               rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
+               if (rc != 0)
+                       GOTO(out_remote_unlock, rc);
+       } else {
+               /* Note: If this is not initialization (size == 0), then do not
+                * write the whole header (8k bytes), only update header/tail
+                * and bits needs to be updated. Because this update might be
+                * part of cross-MDT operation, which needs to write these
+                * updates into the update log(32KB limit) and also pack inside
+                * the RPC (1MB limit), if we write 8K for each operation, which
+                * will cost a lot space, and keep us adding more updates to one
+                * update log.*/
+               lgi->lgi_off = offsetof(typeof(*llh), llh_count);
+               lgi->lgi_buf.lb_len = sizeof(llh->llh_count);
+               lgi->lgi_buf.lb_buf = &llh->llh_count;
+               rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
+               if (rc != 0)
+                       GOTO(out_remote_unlock, rc);
+
+               lgi->lgi_off = offsetof(typeof(*llh),
+                       llh_bitmap[index / (sizeof(*llh->llh_bitmap) * 8)]);
+               lgi->lgi_buf.lb_len = sizeof(*llh->llh_bitmap);
+               lgi->lgi_buf.lb_buf =
+                       &llh->llh_bitmap[index/(sizeof(*llh->llh_bitmap)*8)];
+               rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
+               if (rc != 0)
+                       GOTO(out_remote_unlock, rc);
+
+               lgi->lgi_off = offsetof(typeof(*llh), llh_tail);
+               lgi->lgi_buf.lb_len = sizeof(llh->llh_tail);
+               lgi->lgi_buf.lb_buf = &llh->llh_tail;
+               rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
+               if (rc != 0)
+                       GOTO(out_remote_unlock, rc);
+       }
+
+out_remote_unlock:
+       /* unlock here for remote object */
+       if (dt_object_remote(o))
+               up_write(&loghandle->lgh_hdr_lock);
        if (rc)
                GOTO(out, rc);
 
-       header_is_updated = true;
-       rc = dt_attr_get(env, o, &lgi->lgi_attr, NULL);
+       rc = dt_attr_get(env, o, &lgi->lgi_attr);
        if (rc)
                GOTO(out, rc);
 
        LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
-       lgi->lgi_off = lgi->lgi_attr.la_size;
+       lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size, lgi->lgi_off);
        lgi->lgi_buf.lb_len = reclen;
        lgi->lgi_buf.lb_buf = rec;
        rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
@@ -555,23 +602,15 @@ static int llog_osd_write_rec(const struct lu_env *env,
        RETURN(rc);
 out:
        /* cleanup llog for error case */
-       spin_lock(&loghandle->lgh_hdr_lock);
+       down_write(&loghandle->lgh_hdr_lock);
        ext2_clear_bit(index, llh->llh_bitmap);
        llh->llh_count--;
-       spin_unlock(&loghandle->lgh_hdr_lock);
+       up_write(&loghandle->lgh_hdr_lock);
 
        /* restore llog last_idx */
        loghandle->lgh_last_idx--;
        llh->llh_tail.lrt_index = loghandle->lgh_last_idx;
 
-       /* restore the header on disk if it was written */
-       if (header_is_updated) {
-               lgi->lgi_off = 0;
-               lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len;
-               lgi->lgi_buf.lb_buf = &llh->llh_hdr;
-               dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
-       }
-
        RETURN(rc);
 }
 
@@ -664,7 +703,7 @@ static int llog_osd_next_block(const struct lu_env *env,
        dt = lu2dt_dev(o->do_lu.lo_dev);
        LASSERT(dt);
 
-       rc = dt_attr_get(env, o, &lgi->lgi_attr, BYPASS_CAPA);
+       rc = dt_attr_get(env, o, &lgi->lgi_attr);
        if (rc)
                GOTO(out, rc);
 
@@ -802,7 +841,7 @@ static int llog_osd_prev_block(const struct lu_env *env,
        cur_offset = LLOG_CHUNK_SIZE;
        llog_skip_over(&cur_offset, 0, prev_idx);
 
-       rc = dt_attr_get(env, o, &lgi->lgi_attr, BYPASS_CAPA);
+       rc = dt_attr_get(env, o, &lgi->lgi_attr);
        if (rc)
                GOTO(out, rc);
 
@@ -890,8 +929,8 @@ out:
  * \retval             dt_object of llog directory
  * \retval             ERR_PTR of negative value on error
  */
-struct dt_object *llog_osd_dir_get(const struct lu_env *env,
-                                  struct llog_ctxt *ctxt)
+static struct dt_object *llog_osd_dir_get(const struct lu_env *env,
+                                         struct llog_ctxt *ctxt)
 {
        struct dt_device        *dt;
        struct dt_thread_info   *dti = dt_info(env);
@@ -947,7 +986,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
        struct dt_object                *o;
        struct dt_device                *dt;
        struct ls_device                *ls;
-       struct local_oid_storage        *los;
+       struct local_oid_storage        *los = NULL;
        int                              rc = 0;
 
        ENTRY;
@@ -958,6 +997,25 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
        LASSERT(ctxt->loc_exp->exp_obd);
        dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt;
        LASSERT(dt);
+       if (ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
+               if (logid != NULL) {
+                       logid_to_fid(logid, &lgi->lgi_fid);
+               } else {
+                       /* If logid == NULL, then it means the caller needs
+                        * to allocate new FID (llog_cat_declare_add_rec()). */
+                       rc = obd_fid_alloc(env, ctxt->loc_exp,
+                                          &lgi->lgi_fid, NULL);
+                       if (rc < 0)
+                               RETURN(rc);
+                       rc = 0;
+               }
+
+               o = dt_locate(env, dt, &lgi->lgi_fid);
+               if (IS_ERR(o))
+                       RETURN(PTR_ERR(o));
+
+               goto after_open;
+       }
 
        ls = ls_device_get(dt);
        if (IS_ERR(ls))
@@ -1007,6 +1065,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
        if (IS_ERR(o))
                GOTO(out_name, rc = PTR_ERR(o));
 
+after_open:
        /* No new llog is expected but doesn't exist */
        if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o))
                GOTO(out_put, rc = -ENOENT);
@@ -1024,7 +1083,8 @@ out_name:
        if (handle->lgh_name != NULL)
                OBD_FREE(handle->lgh_name, strlen(name) + 1);
 out:
-       dt_los_put(los);
+       if (los != NULL)
+               dt_los_put(los);
        RETURN(rc);
 }
 
@@ -1046,6 +1106,103 @@ static int llog_osd_exist(struct llog_handle *handle)
 }
 
 /**
+ * Get dir for regular fid log object
+ *
+ * Get directory for regular fid log object, and these regular fid log
+ * object will be inserted under this directory, to satisfy the FS
+ * consistency check, e2fsck etc.
+ *
+ * \param [in] env     execution environment
+ * \param [in] dto     llog object
+ *
+ * \retval             pointer to the directory if it is found.
+ * \retval             ERR_PTR(negative errno) if it fails.
+ */
+struct dt_object *llog_osd_get_regular_fid_dir(const struct lu_env *env,
+                                              struct dt_object *dto)
+{
+       struct llog_thread_info *lgi = llog_info(env);
+       struct seq_server_site *ss = dto->do_lu.lo_dev->ld_site->ld_seq_site;
+       struct lu_seq_range     *range = &lgi->lgi_range;
+       struct lu_fid           *dir_fid = &lgi->lgi_fid;
+       struct dt_object        *dir;
+       int                     rc;
+       ENTRY;
+
+       fld_range_set_any(range);
+       LASSERT(ss != NULL);
+       rc = ss->ss_server_fld->lsf_seq_lookup(env, ss->ss_server_fld,
+                                  fid_seq(lu_object_fid(&dto->do_lu)), range);
+       if (rc < 0)
+               RETURN(ERR_PTR(rc));
+
+       lu_update_log_dir_fid(dir_fid, range->lsr_index);
+       dir = dt_locate(env, lu2dt_dev(dto->do_lu.lo_dev), dir_fid);
+       if (IS_ERR(dir))
+               RETURN(dir);
+
+       if (!dt_try_as_dir(env, dir)) {
+               lu_object_put(env, &dir->do_lu);
+               RETURN(ERR_PTR(-ENOTDIR));
+       }
+
+       RETURN(dir);
+}
+
+/**
+ * Add llog object with regular FID to name entry
+ *
+ * Add llog object with regular FID to name space, and each llog
+ * object on each MDT will be /update_log_dir/[seq:oid:ver],
+ * so to satisfy the namespace consistency check, e2fsck etc.
+ *
+ * \param [in] env     execution environment
+ * \param [in] dto     llog object
+ * \param [in] th      thandle
+ * \param [in] declare if it is declare or execution
+ *
+ * \retval             0 if insertion succeeds.
+ * \retval             negative errno if insertion fails.
+ */
+static int
+llog_osd_regular_fid_add_name_entry(const struct lu_env *env,
+                                   struct dt_object *dto,
+                                   struct thandle *th, bool declare)
+{
+       struct llog_thread_info *lgi = llog_info(env);
+       const struct lu_fid     *fid = lu_object_fid(&dto->do_lu);
+       struct dt_insert_rec    *rec = &lgi->lgi_dt_rec;
+       struct dt_object        *dir;
+       char                    *name = lgi->lgi_name;
+       int                     rc;
+       ENTRY;
+
+       if (!fid_is_norm(fid))
+               RETURN(0);
+
+       dir = llog_osd_get_regular_fid_dir(env, dto);
+       if (IS_ERR(dir))
+               RETURN(PTR_ERR(dir));
+
+       rec->rec_fid = fid;
+       rec->rec_type = S_IFREG;
+       snprintf(name, sizeof(lgi->lgi_name), DFID, PFID(fid));
+       dt_write_lock(env, dir, 0);
+       if (declare) {
+               rc = dt_declare_insert(env, dir, (struct dt_rec *)rec,
+                              (struct dt_key *)name, th);
+       } else {
+               rc = dt_insert(env, dir, (struct dt_rec *)rec,
+                              (struct dt_key *)name, th, 1);
+       }
+       dt_write_unlock(env, dir);
+
+       lu_object_put(env, &dir->do_lu);
+       RETURN(rc);
+}
+
+
+/**
  * Implementation of the llog_operations::lop_declare_create
  *
  * This function declares the llog create. It declares also name insert
@@ -1077,6 +1234,24 @@ static int llog_osd_declare_create(const struct lu_env *env,
        if (dt_object_exists(o))
                RETURN(0);
 
+       if (res->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
+               struct llog_thread_info *lgi = llog_info(env);
+
+               lgi->lgi_attr.la_valid = LA_MODE | LA_SIZE;
+               lgi->lgi_attr.la_size = 0;
+               lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
+               lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
+
+               rc = dt_declare_create(env, o, &lgi->lgi_attr, NULL,
+                                      &lgi->lgi_dof, th);
+               if (rc < 0)
+                       RETURN(rc);
+
+
+               rc = llog_osd_regular_fid_add_name_entry(env, o, th, true);
+
+               RETURN(rc);
+       }
        los = res->private_data;
        LASSERT(los);
 
@@ -1140,6 +1315,26 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res,
        if (dt_object_exists(o))
                RETURN(-EEXIST);
 
+       if (res->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
+               struct llog_thread_info *lgi = llog_info(env);
+
+               lgi->lgi_attr.la_valid = LA_MODE | LA_SIZE | LA_TYPE;
+               lgi->lgi_attr.la_size = 0;
+               lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
+               lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
+
+               dt_write_lock(env, o, 0);
+               rc = dt_create(env, o, &lgi->lgi_attr, NULL,
+                              &lgi->lgi_dof, th);
+               dt_write_unlock(env, o);
+               if (rc < 0)
+                       RETURN(rc);
+
+               rc = llog_osd_regular_fid_add_name_entry(env, o, th, false);
+
+               RETURN(rc);
+       }
+
        los = res->private_data;
        LASSERT(los);
 
@@ -1166,7 +1361,7 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res,
                dt_read_lock(env, llog_dir, 0);
                rc = dt_insert(env, llog_dir, (struct dt_rec *)rec,
                               (struct dt_key *)res->lgh_name,
-                              th, BYPASS_CAPA, 1);
+                              th, 1);
                dt_read_unlock(env, llog_dir);
                lu_object_put(env, &llog_dir->do_lu);
                if (rc)
@@ -1198,8 +1393,15 @@ static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle)
 
        LASSERT(handle->lgh_obj);
 
-       lu_object_put(env, &handle->lgh_obj->do_lu);
-
+       if (handle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
+               /* Remove the object from the cache, otherwise it may
+                * hold LOD being released during cleanup process */
+               lu_object_put_nocache(env, &handle->lgh_obj->do_lu);
+               LASSERT(handle->private_data == NULL);
+               RETURN(rc);
+       } else {
+               lu_object_put(env, &handle->lgh_obj->do_lu);
+       }
        los = handle->private_data;
        LASSERT(los);
        dt_los_put(los);
@@ -1211,6 +1413,54 @@ static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle)
 }
 
 /**
+ * delete llog object name entry
+ *
+ * Delete llog object (with regular FID) from name space (under
+ * update_log_dir).
+ *
+ * \param [in] env     execution environment
+ * \param [in] dto     llog object
+ * \param [in] th      thandle
+ * \param [in] declare if it is declare or execution
+ *
+ * \retval             0 if deletion succeeds.
+ * \retval             negative errno if deletion fails.
+ */
+static int
+llog_osd_regular_fid_del_name_entry(const struct lu_env *env,
+                                   struct dt_object *dto,
+                                   struct thandle *th, bool declare)
+{
+       struct llog_thread_info *lgi = llog_info(env);
+       const struct lu_fid     *fid = lu_object_fid(&dto->do_lu);
+       struct dt_object        *dir;
+       char                    *name = lgi->lgi_name;
+       int                     rc;
+       ENTRY;
+
+       if (!fid_is_norm(fid))
+               RETURN(0);
+
+       dir = llog_osd_get_regular_fid_dir(env, dto);
+       if (IS_ERR(dir))
+               RETURN(PTR_ERR(dir));
+
+       snprintf(name, sizeof(lgi->lgi_name), DFID, PFID(fid));
+       dt_write_lock(env, dir, 0);
+       if (declare) {
+               rc = dt_declare_delete(env, dir, (struct dt_key *)name,
+                                      th);
+       } else {
+               rc = dt_delete(env, dir, (struct dt_key *)name, th);
+       }
+       dt_write_unlock(env, dir);
+
+       lu_object_put(env, &dir->do_lu);
+       RETURN(rc);
+}
+
+
+/**
  * Implementation of the llog_operations::lop_destroy
  *
  * This function destroys the llog and deletes also entry in the
@@ -1270,17 +1520,25 @@ static int llog_osd_destroy(const struct lu_env *env,
        if (rc)
                GOTO(out_trans, rc);
 
+       if (loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) {
+               rc = llog_osd_regular_fid_del_name_entry(env, o, th, true);
+               if (rc < 0)
+                       GOTO(out_trans, rc);
+       }
+
        rc = dt_trans_start_local(env, d, th);
        if (rc)
                GOTO(out_trans, rc);
 
+       th->th_wait_submit = 1;
+
        dt_write_lock(env, o, 0);
        if (dt_object_exists(o)) {
                if (name) {
                        dt_read_lock(env, llog_dir, 0);
                        rc = dt_delete(env, llog_dir,
                                       (struct dt_key *) name,
-                                      th, BYPASS_CAPA);
+                                      th);
                        dt_read_unlock(env, llog_dir);
                        if (rc) {
                                CERROR("%s: can't remove llog %s: rc = %d\n",
@@ -1293,12 +1551,20 @@ static int llog_osd_destroy(const struct lu_env *env,
                rc = dt_destroy(env, o, th);
                if (rc)
                        GOTO(out_unlock, rc);
+
+               if (loghandle->lgh_ctxt->loc_flags &
+                                               LLOG_CTXT_FLAG_NORMAL_FID) {
+                       rc = llog_osd_regular_fid_del_name_entry(env, o, th,
+                                                                false);
+                       if (rc < 0)
+                               GOTO(out_unlock, rc);
+               }
        }
 out_unlock:
        dt_write_unlock(env, o);
 out_trans:
        dt_trans_stop(env, d, th);
-       if (llog_dir != NULL)
+       if (!(IS_ERR_OR_NULL(llog_dir)))
                lu_object_put(env, &llog_dir->do_lu);
        RETURN(rc);
 }
@@ -1333,6 +1599,9 @@ static int llog_osd_setup(const struct lu_env *env, struct obd_device *obd,
        ctxt = llog_ctxt_get(olg->olg_ctxts[ctxt_idx]);
        LASSERT(ctxt);
 
+       if (disk_obd == NULL)
+               GOTO(out, rc = 0);
+
        /* initialize data allowing to generate new fids,
         * literally we need a sequece */
        lgi->lgi_fid.f_seq = FID_SEQ_LLOG;
@@ -1404,6 +1673,25 @@ struct llog_operations llog_osd_ops = {
 };
 EXPORT_SYMBOL(llog_osd_ops);
 
+struct llog_operations llog_common_cat_ops = {
+       .lop_next_block         = llog_osd_next_block,
+       .lop_prev_block         = llog_osd_prev_block,
+       .lop_read_header        = llog_osd_read_header,
+       .lop_destroy            = llog_osd_destroy,
+       .lop_setup              = llog_osd_setup,
+       .lop_cleanup            = llog_osd_cleanup,
+       .lop_open               = llog_osd_open,
+       .lop_exist              = llog_osd_exist,
+       .lop_declare_create     = llog_osd_declare_create,
+       .lop_create             = llog_osd_create,
+       .lop_declare_write_rec  = llog_osd_declare_write_rec,
+       .lop_write_rec          = llog_osd_write_rec,
+       .lop_close              = llog_osd_close,
+       .lop_add                = llog_cat_add_rec,
+       .lop_declare_add        = llog_cat_declare_add_rec,
+};
+EXPORT_SYMBOL(llog_common_cat_ops);
+
 /**
  * Read the special file which contains the list of llog catalogs IDs
  *
@@ -1455,6 +1743,12 @@ int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d,
                lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
                lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG);
 
+               th->th_wait_submit = 1;
+               /* Make the llog object creation synchronization, so
+                * it will be reliable to the reference, especially
+                * for remote reference */
+               th->th_sync = 1;
+
                rc = dt_declare_create(env, o, &lgi->lgi_attr, NULL,
                                       &lgi->lgi_dof, th);
                if (rc)
@@ -1475,7 +1769,7 @@ out_trans:
                        GOTO(out, rc);
        }
 
-       rc = dt_attr_get(env, o, &lgi->lgi_attr, BYPASS_CAPA);
+       rc = dt_attr_get(env, o, &lgi->lgi_attr);
        if (rc)
                GOTO(out, rc);
 
@@ -1562,7 +1856,7 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
        if (!dt_object_exists(o))
                GOTO(out, rc = -ENOENT);
 
-       rc = dt_attr_get(env, o, &lgi->lgi_attr, BYPASS_CAPA);
+       rc = dt_attr_get(env, o, &lgi->lgi_attr);
        if (rc)
                GOTO(out, rc);
 
@@ -1587,6 +1881,8 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
        if (rc)
                GOTO(out_trans, rc);
 
+       th->th_wait_submit = 1;
+
        rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
        if (rc)
                CDEBUG(D_INODE, "can't write CATALOGS at index %d: rc = %d\n",