X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Flod%2Flod_sub_object.c;h=e2eb8d7fcd7fae626353af30e66334d2518ed207;hb=1f6cb3534e74f0c9462008c8088b5734b64ed41c;hp=cce804fd30d6742fa683ebb6bbb4cee9c0ea933d;hpb=b4e6b6b280626dbafebe3e35858707f3143de24a;p=fs%2Flustre-release.git diff --git a/lustre/lod/lod_sub_object.c b/lustre/lod/lod_sub_object.c index cce804f..e2eb8d7 100644 --- a/lustre/lod/lod_sub_object.c +++ b/lustre/lod/lod_sub_object.c @@ -20,7 +20,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2014, Intel Corporation. + * Copyright (c) 2015, 2017, Intel Corporation. */ /* * lustre/lod/lod_sub_object.c @@ -42,20 +42,22 @@ #include #include -#include +#include #include #include #include -#include +#include #include #include +#include #include "lod_internal.h" struct thandle *lod_sub_get_thandle(const struct lu_env *env, struct thandle *th, - const struct dt_object *sub_obj) + const struct dt_object *sub_obj, + bool *record_update) { struct lod_device *lod = dt2lod_dev(th->th_dev); struct top_thandle *tth; @@ -65,27 +67,48 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env, int rc; ENTRY; + if (record_update != NULL) + *record_update = false; + if (th->th_top == NULL) RETURN(th); tth = container_of(th, struct top_thandle, tt_super); - LASSERT(tth->tt_magic == TOP_THANDLE_MAGIC); + tth->tt_master_sub_thandle->th_ignore_quota = th->th_ignore_quota; + /* local object must be mdt object, Note: during ost object - * creation, FID is not assigned until osp_object_create(), + * creation, FID is not assigned until osp_create(), * so if the FID of sub_obj is zero, it means OST object. */ if (!dt_object_remote(sub_obj) || - fid_is_zero(lu_object_fid(&sub_obj->do_lu))) + fid_is_zero(lu_object_fid(&sub_obj->do_lu))) { + /* local MDT object */ + if (fid_is_sane(lu_object_fid(&sub_obj->do_lu)) && + tth->tt_multiple_thandle != NULL && + record_update != NULL && + th->th_result == 0) + *record_update = true; + RETURN(tth->tt_master_sub_thandle); + } rc = lod_fld_lookup(env, lod, lu_object_fid(&sub_obj->do_lu), &mdt_index, &type); if (rc < 0) RETURN(ERR_PTR(rc)); - if (type == LU_SEQ_RANGE_OST) + /* th_complex means we need track all of updates for this + * transaction, include changes on OST */ + if (type == LU_SEQ_RANGE_OST && !th->th_complex) RETURN(tth->tt_master_sub_thandle); sub_th = thandle_get_sub(env, th, sub_obj); + if (IS_ERR(sub_th)) + RETURN(sub_th); + sub_th->th_ignore_quota = th->th_ignore_quota; + + if (tth->tt_multiple_thandle != NULL && record_update != NULL && + th->th_result == 0) + *record_update = true; RETURN(sub_th); } @@ -105,19 +128,22 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env, * \retval 0 if the declaration succeeds * \retval negative errno if the declaration fails. */ -int lod_sub_object_declare_create(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - struct dt_allocation_hint *hint, - struct dt_object_format *dof, - struct thandle *th) +int lod_sub_declare_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { struct thandle *sub_th; + bool record_update; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) return PTR_ERR(sub_th); + if (record_update) + update_record_size(env, create, th, lu_object_fid(&dt->do_lu), + attr, hint, dof); + return dt_declare_create(env, dt, attr, hint, dof, sub_th); } @@ -137,20 +163,27 @@ int lod_sub_object_declare_create(const struct lu_env *env, * \retval 0 if the creation succeeds * \retval negative errno if the creation fails. */ -int lod_sub_object_create(const struct lu_env *env, struct dt_object *dt, - struct lu_attr *attr, - struct dt_allocation_hint *hint, - struct dt_object_format *dof, - struct thandle *th) +int lod_sub_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) { + rc = update_record_pack(create, th, + lu_object_fid(&dt->do_lu), + attr, hint, dof); + if (rc < 0) + RETURN(rc); + } + rc = dt_create(env, dt, attr, hint, dof, sub_th); RETURN(rc); @@ -168,18 +201,21 @@ int lod_sub_object_create(const struct lu_env *env, struct dt_object *dt, * \retval 0 if the declaration succeeds. * \retval negative errno if the declaration fails. */ -int lod_sub_object_declare_ref_add(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) +int lod_sub_declare_ref_add(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) + update_record_size(env, ref_add, th, lu_object_fid(&dt->do_lu)); + rc = dt_declare_ref_add(env, dt, sub_th); RETURN(rc); @@ -198,17 +234,25 @@ int lod_sub_object_declare_ref_add(const struct lu_env *env, * \retval 0 if it succeeds. * \retval negative errno if it fails. */ -int lod_sub_object_ref_add(const struct lu_env *env, struct dt_object *dt, - struct thandle *th) +int lod_sub_ref_add(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) { + rc = update_record_pack(ref_add, th, + lu_object_fid(&dt->do_lu)); + if (rc < 0) + RETURN(rc); + } + rc = dt_ref_add(env, dt, sub_th); RETURN(rc); @@ -226,18 +270,21 @@ int lod_sub_object_ref_add(const struct lu_env *env, struct dt_object *dt, * \retval 0 if the declaration succeeds. * \retval negative errno if the declaration fails. */ -int lod_sub_object_declare_ref_del(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) +int lod_sub_declare_ref_del(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) + update_record_size(env, ref_del, th, lu_object_fid(&dt->do_lu)); + rc = dt_declare_ref_del(env, dt, sub_th); RETURN(rc); @@ -256,17 +303,25 @@ int lod_sub_object_declare_ref_del(const struct lu_env *env, * \retval 0 if it succeeds. * \retval negative errno if it fails. */ -int lod_sub_object_ref_del(const struct lu_env *env, struct dt_object *dt, - struct thandle *th) +int lod_sub_ref_del(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) { + rc = update_record_pack(ref_del, th, + lu_object_fid(&dt->do_lu)); + if (rc < 0) + RETURN(rc); + } + rc = dt_ref_del(env, dt, sub_th); RETURN(rc); @@ -284,18 +339,21 @@ int lod_sub_object_ref_del(const struct lu_env *env, struct dt_object *dt, * \retval 0 if the declaration succeeds. * \retval negative errno if the declaration fails. */ -int lod_sub_object_declare_destroy(const struct lu_env *env, - struct dt_object *dt, - struct thandle *th) +int lod_sub_declare_destroy(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) + update_record_size(env, destroy, th, lu_object_fid(&dt->do_lu)); + rc = dt_declare_destroy(env, dt, sub_th); RETURN(rc); @@ -314,17 +372,24 @@ int lod_sub_object_declare_destroy(const struct lu_env *env, * \retval 0 if the destroy succeeds. * \retval negative errno if the destroy fails. */ -int lod_sub_object_destroy(const struct lu_env *env, struct dt_object *dt, - struct thandle *th) +int lod_sub_destroy(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) { + rc = update_record_pack(destroy, th, lu_object_fid(&dt->do_lu)); + if (rc < 0) + RETURN(rc); + } + rc = dt_destroy(env, dt, sub_th); RETURN(rc); @@ -344,18 +409,21 @@ int lod_sub_object_destroy(const struct lu_env *env, struct dt_object *dt, * \retval 0 if the declaration succeeds. * \retval negative errno if the declaration fails. */ -int lod_sub_object_declare_insert(const struct lu_env *env, - struct dt_object *dt, - const struct dt_rec *rec, - const struct dt_key *key, - struct thandle *th) +int lod_sub_declare_insert(const struct lu_env *env, struct dt_object *dt, + const struct dt_rec *rec, + const struct dt_key *key, struct thandle *th) { struct thandle *sub_th; + bool record_update; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) return PTR_ERR(sub_th); + if (record_update) + update_record_size(env, index_insert, th, + lu_object_fid(&dt->do_lu), rec, key); + return dt_declare_insert(env, dt, rec, key, sub_th); } @@ -370,23 +438,30 @@ int lod_sub_object_declare_insert(const struct lu_env *env, * \param[in] rec record of the index to be inserted * \param[in] key key of the index to be inserted * \param[in] th the transaction handle - * \param[in] ign whether ignore quota * * \retval 0 if the insertion succeeds. * \retval negative errno if the insertion fails. */ -int lod_sub_object_index_insert(const struct lu_env *env, struct dt_object *dt, - const struct dt_rec *rec, - const struct dt_key *key, struct thandle *th, - int ign) +int lod_sub_insert(const struct lu_env *env, struct dt_object *dt, + const struct dt_rec *rec, const struct dt_key *key, + struct thandle *th) { struct thandle *sub_th; + int rc; + bool record_update; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) return PTR_ERR(sub_th); - return dt_insert(env, dt, rec, key, sub_th, ign); + if (record_update) { + rc = update_record_pack(index_insert, th, + lu_object_fid(&dt->do_lu), rec, key); + if (rc < 0) + return rc; + } + + return dt_insert(env, dt, rec, key, sub_th); } /** @@ -402,17 +477,20 @@ int lod_sub_object_index_insert(const struct lu_env *env, struct dt_object *dt, * \retval 0 if the declaration succeeds. * \retval negative errno if the declaration fails. */ -int lod_sub_object_declare_delete(const struct lu_env *env, - struct dt_object *dt, - const struct dt_key *key, - struct thandle *th) +int lod_sub_declare_delete(const struct lu_env *env, struct dt_object *dt, + const struct dt_key *key, struct thandle *th) { struct thandle *sub_th; + bool record_update; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) return PTR_ERR(sub_th); + if (record_update) + update_record_size(env, index_delete, th, + lu_object_fid(&dt->do_lu), key); + return dt_declare_delete(env, dt, key, sub_th); } @@ -430,17 +508,25 @@ int lod_sub_object_declare_delete(const struct lu_env *env, * \retval 0 if the deletion succeeds. * \retval negative errno if the deletion fails. */ -int lod_sub_object_delete(const struct lu_env *env, struct dt_object *dt, - const struct dt_key *name, struct thandle *th) +int lod_sub_delete(const struct lu_env *env, struct dt_object *dt, + const struct dt_key *name, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) { + rc = update_record_pack(index_delete, th, + lu_object_fid(&dt->do_lu), name); + if (rc < 0) + RETURN(rc); + } + rc = dt_delete(env, dt, name, sub_th); RETURN(rc); } @@ -459,20 +545,24 @@ int lod_sub_object_delete(const struct lu_env *env, struct dt_object *dt, * \retval 0 if the declaration succeeds. * \retval negative errno if the declaration fails. */ -int lod_sub_object_declare_xattr_set(const struct lu_env *env, - struct dt_object *dt, - const struct lu_buf *buf, - const char *name, int fl, - struct thandle *th) +int lod_sub_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, const char *name, + int fl, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) + update_record_size(env, xattr_set, th, + lu_object_fid(&dt->do_lu), + buf, name, fl); + rc = dt_declare_xattr_set(env, dt, buf, name, fl, sub_th); RETURN(rc); @@ -494,18 +584,27 @@ int lod_sub_object_declare_xattr_set(const struct lu_env *env, * \retval 0 if the xattr setting succeeds. * \retval negative errno if xattr setting fails. */ -int lod_sub_object_xattr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_buf *buf, const char *name, int fl, - struct thandle *th) +int lod_sub_xattr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, const char *name, int fl, + struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) { + rc = update_record_pack(xattr_set, th, + lu_object_fid(&dt->do_lu), + buf, name, fl); + if (rc < 0) + RETURN(rc); + } + rc = dt_xattr_set(env, dt, buf, name, fl, sub_th); RETURN(rc); @@ -524,19 +623,22 @@ int lod_sub_object_xattr_set(const struct lu_env *env, struct dt_object *dt, * \retval 0 if the declaration succeeds. * \retval negative errno if the declaration fails. */ -int lod_sub_object_declare_attr_set(const struct lu_env *env, - struct dt_object *dt, - const struct lu_attr *attr, - struct thandle *th) +int lod_sub_declare_attr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_attr *attr, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) + update_record_size(env, attr_set, th, + lu_object_fid(&dt->do_lu), attr); + rc = dt_declare_attr_set(env, dt, attr, sub_th); RETURN(rc); @@ -556,19 +658,25 @@ int lod_sub_object_declare_attr_set(const struct lu_env *env, * \retval 0 if attributes setting succeeds. * \retval negative errno if the attributes setting fails. */ -int lod_sub_object_attr_set(const struct lu_env *env, - struct dt_object *dt, - const struct lu_attr *attr, - struct thandle *th) +int lod_sub_attr_set(const struct lu_env *env, struct dt_object *dt, + const struct lu_attr *attr, struct thandle *th) { + bool record_update; struct thandle *sub_th; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) { + rc = update_record_pack(attr_set, th, lu_object_fid(&dt->do_lu), + attr); + if (rc < 0) + RETURN(rc); + } + rc = dt_attr_set(env, dt, attr, sub_th); RETURN(rc); @@ -587,19 +695,23 @@ int lod_sub_object_attr_set(const struct lu_env *env, * \retval 0 if the declaration succeeds. * \retval negative errno if the declaration fails. */ -int lod_sub_object_declare_xattr_del(const struct lu_env *env, - struct dt_object *dt, - const char *name, - struct thandle *th) +int lod_sub_declare_xattr_del(const struct lu_env *env, struct dt_object *dt, + const char *name, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) + update_record_size(env, xattr_del, th, + lu_object_fid(&dt->do_lu), + name); + rc = dt_declare_xattr_del(env, dt, name, sub_th); RETURN(rc); @@ -619,19 +731,25 @@ int lod_sub_object_declare_xattr_del(const struct lu_env *env, * \retval 0 if the deletion succeeds. * \retval negative errno if the deletion fails. */ -int lod_sub_object_xattr_del(const struct lu_env *env, - struct dt_object *dt, - const char *name, - struct thandle *th) +int lod_sub_xattr_del(const struct lu_env *env, struct dt_object *dt, + const char *name, struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) { + rc = update_record_pack(xattr_del, th, + lu_object_fid(&dt->do_lu), name); + if (rc < 0) + RETURN(rc); + } + rc = dt_xattr_del(env, dt, name, sub_th); RETURN(rc); @@ -651,19 +769,24 @@ int lod_sub_object_xattr_del(const struct lu_env *env, * \retval 0 if the insertion succeeds. * \retval negative errno if the insertion fails. */ -int lod_sub_object_declare_write(const struct lu_env *env, - struct dt_object *dt, - const struct lu_buf *buf, loff_t pos, - struct thandle *th) +int lod_sub_declare_write(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, loff_t pos, + struct thandle *th) { struct thandle *sub_th; + bool record_update; int rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); + if (record_update) + update_record_size(env, write, th, + lu_object_fid(&dt->do_lu), + buf, pos); + rc = dt_declare_write(env, dt, buf, pos, sub_th); RETURN(rc); @@ -680,23 +803,181 @@ int lod_sub_object_declare_write(const struct lu_env *env, * \param[in] buf buffer to write which includes an embedded size field * \param[in] pos offet in the object to start writing at * \param[in] th transaction handle - * \param[in] rq enforcement for this write * * \retval the buffer size in bytes if it succeeds. * \retval negative errno if it fails. */ -ssize_t lod_sub_object_write(const struct lu_env *env, struct dt_object *dt, - const struct lu_buf *buf, loff_t *pos, - struct thandle *th, int rq) +ssize_t lod_sub_write(const struct lu_env *env, struct dt_object *dt, + const struct lu_buf *buf, loff_t *pos, + struct thandle *th) { struct thandle *sub_th; + bool record_update; ssize_t rc; ENTRY; - sub_th = lod_sub_get_thandle(env, th, dt); + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); + if (IS_ERR(sub_th)) + RETURN(PTR_ERR(sub_th)); + + if (record_update) { + rc = update_record_pack(write, th, lu_object_fid(&dt->do_lu), + buf, *pos); + if (rc < 0) + RETURN(rc); + } + + rc = dt_write(env, dt, buf, pos, sub_th); + RETURN(rc); +} + +/** + * Declare punch + * + * Get transaction of next layer and declare punch. + * + * \param[in] env execution environment + * \param[in] dt object to be written + * \param[in] start start offset of punch + * \param[in] end end offet of punch + * \param[in] th transaction handle + * + * \retval 0 if the insertion succeeds. + * \retval negative errno if the insertion fails. + */ +int lod_sub_declare_punch(const struct lu_env *env, struct dt_object *dt, + __u64 start, __u64 end, struct thandle *th) +{ + struct thandle *sub_th; + bool record_update; + int rc; + ENTRY; + + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); + if (IS_ERR(sub_th)) + RETURN(PTR_ERR(sub_th)); + + if (record_update) + update_record_size(env, punch, th, + lu_object_fid(&dt->do_lu), + start, end); + + rc = dt_declare_punch(env, dt, start, end, sub_th); + + RETURN(rc); +} + +/** + * Punch to sub object + * + * Get transaction of next layer, records buffer write if it belongs to + * Cross-MDT operation, and punch object. + * + * \param[in] env execution environment + * \param[in] dt object to be written + * \param[in] start start offset of punch + * \param[in] end end offset of punch + * \param[in] th transaction handle + * \param[in] capa capability of the write + * + * \retval the buffer size in bytes if it succeeds. + * \retval negative errno if it fails. + */ +int lod_sub_punch(const struct lu_env *env, struct dt_object *dt, + __u64 start, __u64 end, struct thandle *th) +{ + struct thandle *sub_th; + bool record_update; + int rc; + ENTRY; + + sub_th = lod_sub_get_thandle(env, th, dt, &record_update); if (IS_ERR(sub_th)) RETURN(PTR_ERR(sub_th)); - rc = dt_write(env, dt, buf, pos, sub_th, rq); + if (record_update) { + rc = update_record_pack(punch, th, lu_object_fid(&dt->do_lu), + start, end); + if (rc < 0) + RETURN(rc); + } + + rc = dt_punch(env, dt, start, end, sub_th); + + RETURN(rc); +} + +int lod_sub_prep_llog(const struct lu_env *env, struct lod_device *lod, + struct dt_device *dt, int index) +{ + struct lod_thread_info *lti = lod_env_info(env); + struct llog_ctxt *ctxt; + struct llog_handle *lgh; + struct llog_catid *cid = <i->lti_cid; + struct lu_fid *fid = <i->lti_fid; + struct obd_device *obd; + int rc; + bool need_put = false; + ENTRY; + + lu_update_log_fid(fid, index); + + rc = lodname2mdt_index(lod2obd(lod)->obd_name, (__u32 *)&index); + if (rc < 0) + RETURN(rc); + + rc = llog_osd_get_cat_list(env, dt, index, 1, cid, fid); + if (rc != 0) { + CERROR("%s: can't get id from catalogs: rc = %d\n", + lod2obd(lod)->obd_name, rc); + RETURN(rc); + } + + obd = dt->dd_lu_dev.ld_obd; + ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT); + LASSERT(ctxt != NULL); + ctxt->loc_flags |= LLOG_CTXT_FLAG_NORMAL_FID; + ctxt->loc_chunk_size = LLOG_MIN_CHUNK_SIZE * 4; + if (likely(logid_id(&cid->lci_logid) != 0)) { + rc = llog_open(env, ctxt, &lgh, &cid->lci_logid, NULL, + LLOG_OPEN_EXISTS); + + /* re-create llog if it is missing */ + if (rc == -ENOENT) + logid_set_id(&cid->lci_logid, 0); + else if (rc < 0) + GOTO(out_put, rc); + } + + if (unlikely(logid_id(&cid->lci_logid) == 0)) { + rc = llog_open_create(env, ctxt, &lgh, NULL, NULL); + if (rc < 0) + GOTO(out_put, rc); + cid->lci_logid = lgh->lgh_id; + need_put = true; + } + + LASSERT(lgh != NULL); + + rc = llog_init_handle(env, lgh, LLOG_F_IS_CAT, NULL); + if (rc != 0) + GOTO(out_close, rc); + + if (need_put) { + rc = llog_osd_put_cat_list(env, dt, index, 1, cid, fid); + if (rc != 0) + GOTO(out_close, rc); + } + + ctxt->loc_handle = lgh; + + CDEBUG(D_INFO, "%s: init llog for index %d - catid "DFID":%x\n", + obd->obd_name, index, PFID(&cid->lci_logid.lgl_oi.oi_fid), + cid->lci_logid.lgl_ogen); +out_close: + if (rc != 0) + llog_cat_close(env, lgh); +out_put: + llog_ctxt_put(ctxt); RETURN(rc); }