* GPL HEADER END
*/
/*
- * Copyright (c) 2014, Intel Corporation.
+ * Copyright (c) 2015, 2017, Intel Corporation.
*/
/*
* lustre/lod/lod_sub_object.c
#include <obd.h>
#include <obd_class.h>
-#include <lustre_ver.h>
+#include <uapi/linux/lustre/lustre_ver.h>
#include <obd_support.h>
#include <lprocfs_status.h>
#include <lustre_fid.h>
-#include <lustre_param.h>
+#include <uapi/linux/lustre/lustre_param.h>
#include <md_object.h>
#include <lustre_linkea.h>
#include <lustre_log.h>
RETURN(th);
tth = container_of(th, struct top_thandle, tt_super);
- LASSERT(tth->tt_magic == TOP_THANDLE_MAGIC);
+ tth->tt_master_sub_thandle->th_ignore_quota = th->th_ignore_quota;
+
/* local object must be mdt object, Note: during ost object
- * creation, FID is not assigned until osp_object_create(),
+ * creation, FID is not assigned until osp_create(),
* so if the FID of sub_obj is zero, it means OST object. */
if (!dt_object_remote(sub_obj) ||
fid_is_zero(lu_object_fid(&sub_obj->do_lu))) {
/* local MDT object */
if (fid_is_sane(lu_object_fid(&sub_obj->do_lu)) &&
- tth->tt_update_records != NULL &&
- record_update != NULL)
+ tth->tt_multiple_thandle != NULL &&
+ record_update != NULL &&
+ th->th_result == 0)
*record_update = true;
RETURN(tth->tt_master_sub_thandle);
if (rc < 0)
RETURN(ERR_PTR(rc));
- if (type == LU_SEQ_RANGE_OST)
+ /* th_complex means we need track all of updates for this
+ * transaction, include changes on OST */
+ if (type == LU_SEQ_RANGE_OST && !th->th_complex)
RETURN(tth->tt_master_sub_thandle);
- if (tth->tt_update_records != NULL && record_update != NULL)
- *record_update = true;
-
sub_th = thandle_get_sub(env, th, sub_obj);
+ if (IS_ERR(sub_th))
+ RETURN(sub_th);
+ sub_th->th_ignore_quota = th->th_ignore_quota;
+
+ if (tth->tt_multiple_thandle != NULL && record_update != NULL &&
+ th->th_result == 0)
+ *record_update = true;
RETURN(sub_th);
}
* \retval 0 if the declaration succeeds
* \retval negative errno if the declaration fails.
*/
-int lod_sub_object_declare_create(const struct lu_env *env,
- struct dt_object *dt,
- struct lu_attr *attr,
- struct dt_allocation_hint *hint,
- struct dt_object_format *dof,
- struct thandle *th)
+int lod_sub_declare_create(const struct lu_env *env, struct dt_object *dt,
+ struct lu_attr *attr,
+ struct dt_allocation_hint *hint,
+ struct dt_object_format *dof, struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
return PTR_ERR(sub_th);
+ if (record_update)
+ update_record_size(env, create, th, lu_object_fid(&dt->do_lu),
+ attr, hint, dof);
+
return dt_declare_create(env, dt, attr, hint, dof, sub_th);
}
* \retval 0 if the creation succeeds
* \retval negative errno if the creation fails.
*/
-int lod_sub_object_create(const struct lu_env *env, struct dt_object *dt,
- struct lu_attr *attr,
- struct dt_allocation_hint *hint,
- struct dt_object_format *dof,
- struct thandle *th)
+int lod_sub_create(const struct lu_env *env, struct dt_object *dt,
+ struct lu_attr *attr, struct dt_allocation_hint *hint,
+ struct dt_object_format *dof, struct thandle *th)
{
struct thandle *sub_th;
bool record_update;
* \retval 0 if the declaration succeeds.
* \retval negative errno if the declaration fails.
*/
-int lod_sub_object_declare_ref_add(const struct lu_env *env,
- struct dt_object *dt,
- struct thandle *th)
+int lod_sub_declare_ref_add(const struct lu_env *env, struct dt_object *dt,
+ struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update)
+ update_record_size(env, ref_add, th, lu_object_fid(&dt->do_lu));
+
rc = dt_declare_ref_add(env, dt, sub_th);
RETURN(rc);
* \retval 0 if it succeeds.
* \retval negative errno if it fails.
*/
-int lod_sub_object_ref_add(const struct lu_env *env, struct dt_object *dt,
- struct thandle *th)
+int lod_sub_ref_add(const struct lu_env *env, struct dt_object *dt,
+ struct thandle *th)
{
struct thandle *sub_th;
bool record_update;
* \retval 0 if the declaration succeeds.
* \retval negative errno if the declaration fails.
*/
-int lod_sub_object_declare_ref_del(const struct lu_env *env,
- struct dt_object *dt,
- struct thandle *th)
+int lod_sub_declare_ref_del(const struct lu_env *env, struct dt_object *dt,
+ struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update)
+ update_record_size(env, ref_del, th, lu_object_fid(&dt->do_lu));
+
rc = dt_declare_ref_del(env, dt, sub_th);
RETURN(rc);
* \retval 0 if it succeeds.
* \retval negative errno if it fails.
*/
-int lod_sub_object_ref_del(const struct lu_env *env, struct dt_object *dt,
- struct thandle *th)
+int lod_sub_ref_del(const struct lu_env *env, struct dt_object *dt,
+ struct thandle *th)
{
struct thandle *sub_th;
bool record_update;
* \retval 0 if the declaration succeeds.
* \retval negative errno if the declaration fails.
*/
-int lod_sub_object_declare_destroy(const struct lu_env *env,
- struct dt_object *dt,
- struct thandle *th)
+int lod_sub_declare_destroy(const struct lu_env *env, struct dt_object *dt,
+ struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update)
+ update_record_size(env, destroy, th, lu_object_fid(&dt->do_lu));
+
rc = dt_declare_destroy(env, dt, sub_th);
RETURN(rc);
* \retval 0 if the destroy succeeds.
* \retval negative errno if the destroy fails.
*/
-int lod_sub_object_destroy(const struct lu_env *env, struct dt_object *dt,
- struct thandle *th)
+int lod_sub_destroy(const struct lu_env *env, struct dt_object *dt,
+ struct thandle *th)
{
struct thandle *sub_th;
bool record_update;
RETURN(PTR_ERR(sub_th));
if (record_update) {
- rc = update_record_pack(object_destroy, th,
- lu_object_fid(&dt->do_lu));
+ rc = update_record_pack(destroy, th, lu_object_fid(&dt->do_lu));
if (rc < 0)
RETURN(rc);
}
* \retval 0 if the declaration succeeds.
* \retval negative errno if the declaration fails.
*/
-int lod_sub_object_declare_insert(const struct lu_env *env,
- struct dt_object *dt,
- const struct dt_rec *rec,
- const struct dt_key *key,
- struct thandle *th)
+int lod_sub_declare_insert(const struct lu_env *env, struct dt_object *dt,
+ const struct dt_rec *rec,
+ const struct dt_key *key, struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
return PTR_ERR(sub_th);
+ if (record_update)
+ update_record_size(env, index_insert, th,
+ lu_object_fid(&dt->do_lu), rec, key);
+
return dt_declare_insert(env, dt, rec, key, sub_th);
}
* \param[in] rec record of the index to be inserted
* \param[in] key key of the index to be inserted
* \param[in] th the transaction handle
- * \param[in] ign whether ignore quota
*
* \retval 0 if the insertion succeeds.
* \retval negative errno if the insertion fails.
*/
-int lod_sub_object_index_insert(const struct lu_env *env, struct dt_object *dt,
- const struct dt_rec *rec,
- const struct dt_key *key, struct thandle *th,
- int ign)
+int lod_sub_insert(const struct lu_env *env, struct dt_object *dt,
+ const struct dt_rec *rec, const struct dt_key *key,
+ struct thandle *th)
{
struct thandle *sub_th;
int rc;
return rc;
}
- return dt_insert(env, dt, rec, key, sub_th, ign);
+ return dt_insert(env, dt, rec, key, sub_th);
}
/**
* \retval 0 if the declaration succeeds.
* \retval negative errno if the declaration fails.
*/
-int lod_sub_object_declare_delete(const struct lu_env *env,
- struct dt_object *dt,
- const struct dt_key *key,
- struct thandle *th)
+int lod_sub_declare_delete(const struct lu_env *env, struct dt_object *dt,
+ const struct dt_key *key, struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
return PTR_ERR(sub_th);
+ if (record_update)
+ update_record_size(env, index_delete, th,
+ lu_object_fid(&dt->do_lu), key);
+
return dt_declare_delete(env, dt, key, sub_th);
}
* \retval 0 if the deletion succeeds.
* \retval negative errno if the deletion fails.
*/
-int lod_sub_object_delete(const struct lu_env *env, struct dt_object *dt,
- const struct dt_key *name, struct thandle *th)
+int lod_sub_delete(const struct lu_env *env, struct dt_object *dt,
+ const struct dt_key *name, struct thandle *th)
{
struct thandle *sub_th;
bool record_update;
* \retval 0 if the declaration succeeds.
* \retval negative errno if the declaration fails.
*/
-int lod_sub_object_declare_xattr_set(const struct lu_env *env,
- struct dt_object *dt,
- const struct lu_buf *buf,
- const char *name, int fl,
- struct thandle *th)
+int lod_sub_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_buf *buf, const char *name,
+ int fl, struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update)
+ update_record_size(env, xattr_set, th,
+ lu_object_fid(&dt->do_lu),
+ buf, name, fl);
+
rc = dt_declare_xattr_set(env, dt, buf, name, fl, sub_th);
RETURN(rc);
* \retval 0 if the xattr setting succeeds.
* \retval negative errno if xattr setting fails.
*/
-int lod_sub_object_xattr_set(const struct lu_env *env, struct dt_object *dt,
- const struct lu_buf *buf, const char *name, int fl,
- struct thandle *th)
+int lod_sub_xattr_set(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_buf *buf, const char *name, int fl,
+ struct thandle *th)
{
struct thandle *sub_th;
bool record_update;
* \retval 0 if the declaration succeeds.
* \retval negative errno if the declaration fails.
*/
-int lod_sub_object_declare_attr_set(const struct lu_env *env,
- struct dt_object *dt,
- const struct lu_attr *attr,
- struct thandle *th)
+int lod_sub_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_attr *attr, struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update)
+ update_record_size(env, attr_set, th,
+ lu_object_fid(&dt->do_lu), attr);
+
rc = dt_declare_attr_set(env, dt, attr, sub_th);
RETURN(rc);
* \retval 0 if attributes setting succeeds.
* \retval negative errno if the attributes setting fails.
*/
-int lod_sub_object_attr_set(const struct lu_env *env,
- struct dt_object *dt,
- const struct lu_attr *attr,
- struct thandle *th)
+int lod_sub_attr_set(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_attr *attr, struct thandle *th)
{
bool record_update;
struct thandle *sub_th;
* \retval 0 if the declaration succeeds.
* \retval negative errno if the declaration fails.
*/
-int lod_sub_object_declare_xattr_del(const struct lu_env *env,
- struct dt_object *dt,
- const char *name,
- struct thandle *th)
+int lod_sub_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
+ const char *name, struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update)
+ update_record_size(env, xattr_del, th,
+ lu_object_fid(&dt->do_lu),
+ name);
+
rc = dt_declare_xattr_del(env, dt, name, sub_th);
RETURN(rc);
* \retval 0 if the deletion succeeds.
* \retval negative errno if the deletion fails.
*/
-int lod_sub_object_xattr_del(const struct lu_env *env,
- struct dt_object *dt,
- const char *name,
- struct thandle *th)
+int lod_sub_xattr_del(const struct lu_env *env, struct dt_object *dt,
+ const char *name, struct thandle *th)
{
struct thandle *sub_th;
bool record_update;
* \retval 0 if the insertion succeeds.
* \retval negative errno if the insertion fails.
*/
-int lod_sub_object_declare_write(const struct lu_env *env,
- struct dt_object *dt,
- const struct lu_buf *buf, loff_t pos,
- struct thandle *th)
+int lod_sub_declare_write(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_buf *buf, loff_t pos,
+ struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update)
+ update_record_size(env, write, th,
+ lu_object_fid(&dt->do_lu),
+ buf, pos);
+
rc = dt_declare_write(env, dt, buf, pos, sub_th);
RETURN(rc);
* \param[in] buf buffer to write which includes an embedded size field
* \param[in] pos offet in the object to start writing at
* \param[in] th transaction handle
- * \param[in] rq enforcement for this write
*
* \retval the buffer size in bytes if it succeeds.
* \retval negative errno if it fails.
*/
-ssize_t lod_sub_object_write(const struct lu_env *env, struct dt_object *dt,
- const struct lu_buf *buf, loff_t *pos,
- struct thandle *th, int rq)
+ssize_t lod_sub_write(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_buf *buf, loff_t *pos,
+ struct thandle *th)
{
struct thandle *sub_th;
bool record_update;
RETURN(rc);
}
- rc = dt_write(env, dt, buf, pos, sub_th, rq);
+ rc = dt_write(env, dt, buf, pos, sub_th);
RETURN(rc);
}
* \retval 0 if the insertion succeeds.
* \retval negative errno if the insertion fails.
*/
-int lod_sub_object_declare_punch(const struct lu_env *env,
- struct dt_object *dt,
- __u64 start, __u64 end,
- struct thandle *th)
+int lod_sub_declare_punch(const struct lu_env *env, struct dt_object *dt,
+ __u64 start, __u64 end, struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update)
+ update_record_size(env, punch, th,
+ lu_object_fid(&dt->do_lu),
+ start, end);
+
rc = dt_declare_punch(env, dt, start, end, sub_th);
RETURN(rc);
* \retval the buffer size in bytes if it succeeds.
* \retval negative errno if it fails.
*/
-int lod_sub_object_punch(const struct lu_env *env, struct dt_object *dt,
- __u64 start, __u64 end, struct thandle *th)
+int lod_sub_punch(const struct lu_env *env, struct dt_object *dt,
+ __u64 start, __u64 end, struct thandle *th)
{
struct thandle *sub_th;
bool record_update;
struct lu_fid *fid = <i->lti_fid;
struct obd_device *obd;
int rc;
+ bool need_put = false;
ENTRY;
lu_update_log_fid(fid, index);
- fid_to_logid(fid, &cid->lci_logid);
+
+ rc = lodname2mdt_index(lod2obd(lod)->obd_name, (__u32 *)&index);
+ if (rc < 0)
+ RETURN(rc);
+
+ rc = llog_osd_get_cat_list(env, dt, index, 1, cid, fid);
+ if (rc != 0) {
+ CERROR("%s: can't get id from catalogs: rc = %d\n",
+ lod2obd(lod)->obd_name, rc);
+ RETURN(rc);
+ }
obd = dt->dd_lu_dev.ld_obd;
ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
LASSERT(ctxt != NULL);
+ /* concurrent config processing (e.g. setting MDT active)
+ * can try to initialize llog again before causing double
+ * initialization. check for this */
+ if (ctxt->loc_handle)
+ GOTO(out_put, rc = 0);
+
ctxt->loc_flags |= LLOG_CTXT_FLAG_NORMAL_FID;
+ ctxt->loc_chunk_size = LLOG_MIN_CHUNK_SIZE * 4;
+ if (likely(logid_id(&cid->lci_logid) != 0)) {
+ rc = llog_open(env, ctxt, &lgh, &cid->lci_logid, NULL,
+ LLOG_OPEN_EXISTS);
+
+ /* re-create llog if it is missing */
+ if (rc == -ENOENT)
+ logid_set_id(&cid->lci_logid, 0);
+ else if (rc < 0)
+ GOTO(out_put, rc);
+ }
- rc = llog_open(env, ctxt, &lgh, &cid->lci_logid, NULL,
- LLOG_OPEN_EXISTS);
- if (rc < 0) {
- llog_ctxt_put(ctxt);
- RETURN(rc);
+ if (unlikely(logid_id(&cid->lci_logid) == 0)) {
+ rc = llog_open_create(env, ctxt, &lgh, NULL, NULL);
+ if (rc < 0)
+ GOTO(out_put, rc);
+ cid->lci_logid = lgh->lgh_id;
+ need_put = true;
}
LASSERT(lgh != NULL);
- ctxt->loc_handle = lgh;
- rc = llog_cat_init_and_process(env, lgh);
- if (rc != 0) {
- llog_cat_close(env, ctxt->loc_handle);
- ctxt->loc_handle = NULL;
+ rc = llog_init_handle(env, lgh, LLOG_F_IS_CAT, NULL);
+ if (rc != 0)
+ GOTO(out_close, rc);
+
+ if (need_put) {
+ rc = llog_osd_put_cat_list(env, dt, index, 1, cid, fid);
+ if (rc != 0)
+ GOTO(out_close, rc);
}
- llog_ctxt_put(ctxt);
+ ctxt->loc_handle = lgh;
+ CDEBUG(D_INFO, "%s: init llog for index %d - catid "DFID":%x\n",
+ obd->obd_name, index, PFID(&cid->lci_logid.lgl_oi.oi_fid),
+ cid->lci_logid.lgl_ogen);
+out_close:
+ if (rc != 0)
+ llog_cat_close(env, lgh);
+out_put:
+ llog_ctxt_put(ctxt);
RETURN(rc);
}