Whamcloud - gitweb
LU-14132 lod: do not initialize sub llogs twice
[fs/lustre-release.git] / lustre / lod / lod_sub_object.c
index f1df917..3e8d822 100644 (file)
@@ -20,7 +20,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2014, Intel Corporation.
+ * Copyright (c) 2015, 2017, Intel Corporation.
  */
 /*
  * lustre/lod/lod_sub_object.c
 
 #include <obd.h>
 #include <obd_class.h>
-#include <lustre_ver.h>
+#include <uapi/linux/lustre/lustre_ver.h>
 #include <obd_support.h>
 #include <lprocfs_status.h>
 
 #include <lustre_fid.h>
-#include <lustre_param.h>
+#include <uapi/linux/lustre/lustre_param.h>
 #include <md_object.h>
 #include <lustre_linkea.h>
 #include <lustre_log.h>
@@ -74,9 +74,10 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env,
                RETURN(th);
 
        tth = container_of(th, struct top_thandle, tt_super);
+       tth->tt_master_sub_thandle->th_ignore_quota = th->th_ignore_quota;
 
        /* local object must be mdt object, Note: during ost object
-        * creation, FID is not assigned until osp_object_create(),
+        * creation, FID is not assigned until osp_create(),
         * so if the FID of sub_obj is zero, it means OST object. */
        if (!dt_object_remote(sub_obj) ||
            fid_is_zero(lu_object_fid(&sub_obj->do_lu))) {
@@ -95,15 +96,20 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env,
        if (rc < 0)
                RETURN(ERR_PTR(rc));
 
-       if (type == LU_SEQ_RANGE_OST)
+       /* th_complex means we need track all of updates for this
+        * transaction, include changes on OST */
+       if (type == LU_SEQ_RANGE_OST && !th->th_complex)
                RETURN(tth->tt_master_sub_thandle);
 
+       sub_th = thandle_get_sub(env, th, sub_obj);
+       if (IS_ERR(sub_th))
+               RETURN(sub_th);
+       sub_th->th_ignore_quota = th->th_ignore_quota;
+
        if (tth->tt_multiple_thandle != NULL && record_update != NULL &&
            th->th_result == 0)
                *record_update = true;
 
-       sub_th = thandle_get_sub(env, th, sub_obj);
-
        RETURN(sub_th);
 }
 
@@ -122,19 +128,22 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env,
  * \retval             0 if the declaration succeeds
  * \retval             negative errno if the declaration fails.
  */
-int lod_sub_object_declare_create(const struct lu_env *env,
-                                 struct dt_object *dt,
-                                 struct lu_attr *attr,
-                                 struct dt_allocation_hint *hint,
-                                 struct dt_object_format *dof,
-                                 struct thandle *th)
+int lod_sub_declare_create(const struct lu_env *env, struct dt_object *dt,
+                          struct lu_attr *attr,
+                          struct dt_allocation_hint *hint,
+                          struct dt_object_format *dof, struct thandle *th)
 {
        struct thandle *sub_th;
+       bool record_update;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                return PTR_ERR(sub_th);
 
+       if (record_update)
+               update_record_size(env, create, th, lu_object_fid(&dt->do_lu),
+                                  attr, hint, dof);
+
        return dt_declare_create(env, dt, attr, hint, dof, sub_th);
 }
 
@@ -154,11 +163,9 @@ int lod_sub_object_declare_create(const struct lu_env *env,
  * \retval             0 if the creation succeeds
  * \retval             negative errno if the creation fails.
  */
-int lod_sub_object_create(const struct lu_env *env, struct dt_object *dt,
-                         struct lu_attr *attr,
-                         struct dt_allocation_hint *hint,
-                         struct dt_object_format *dof,
-                         struct thandle *th)
+int lod_sub_create(const struct lu_env *env, struct dt_object *dt,
+                  struct lu_attr *attr, struct dt_allocation_hint *hint,
+                  struct dt_object_format *dof, struct thandle *th)
 {
        struct thandle     *sub_th;
        bool               record_update;
@@ -194,18 +201,21 @@ int lod_sub_object_create(const struct lu_env *env, struct dt_object *dt,
  * \retval             0 if the declaration succeeds.
  * \retval             negative errno if the declaration fails.
  */
-int lod_sub_object_declare_ref_add(const struct lu_env *env,
-                                  struct dt_object *dt,
-                                  struct thandle *th)
+int lod_sub_declare_ref_add(const struct lu_env *env, struct dt_object *dt,
+                           struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, ref_add, th, lu_object_fid(&dt->do_lu));
+
        rc = dt_declare_ref_add(env, dt, sub_th);
 
        RETURN(rc);
@@ -224,8 +234,8 @@ int lod_sub_object_declare_ref_add(const struct lu_env *env,
  * \retval             0 if it succeeds.
  * \retval             negative errno if it fails.
  */
-int lod_sub_object_ref_add(const struct lu_env *env, struct dt_object *dt,
-                          struct thandle *th)
+int lod_sub_ref_add(const struct lu_env *env, struct dt_object *dt,
+                   struct thandle *th)
 {
        struct thandle  *sub_th;
        bool            record_update;
@@ -260,18 +270,21 @@ int lod_sub_object_ref_add(const struct lu_env *env, struct dt_object *dt,
  * \retval             0 if the declaration succeeds.
  * \retval             negative errno if the declaration fails.
  */
-int lod_sub_object_declare_ref_del(const struct lu_env *env,
-                                  struct dt_object *dt,
-                                  struct thandle *th)
+int lod_sub_declare_ref_del(const struct lu_env *env, struct dt_object *dt,
+                           struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, ref_del, th, lu_object_fid(&dt->do_lu));
+
        rc = dt_declare_ref_del(env, dt, sub_th);
 
        RETURN(rc);
@@ -290,8 +303,8 @@ int lod_sub_object_declare_ref_del(const struct lu_env *env,
  * \retval             0 if it succeeds.
  * \retval             negative errno if it fails.
  */
-int lod_sub_object_ref_del(const struct lu_env *env, struct dt_object *dt,
-                          struct thandle *th)
+int lod_sub_ref_del(const struct lu_env *env, struct dt_object *dt,
+                   struct thandle *th)
 {
        struct thandle  *sub_th;
        bool            record_update;
@@ -326,18 +339,21 @@ int lod_sub_object_ref_del(const struct lu_env *env, struct dt_object *dt,
  * \retval             0 if the declaration succeeds.
  * \retval             negative errno if the declaration fails.
  */
-int lod_sub_object_declare_destroy(const struct lu_env *env,
-                                  struct dt_object *dt,
-                                  struct thandle *th)
+int lod_sub_declare_destroy(const struct lu_env *env, struct dt_object *dt,
+                           struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, destroy, th, lu_object_fid(&dt->do_lu));
+
        rc = dt_declare_destroy(env, dt, sub_th);
 
        RETURN(rc);
@@ -356,8 +372,8 @@ int lod_sub_object_declare_destroy(const struct lu_env *env,
  * \retval             0 if the destroy succeeds.
  * \retval             negative errno if the destroy fails.
  */
-int lod_sub_object_destroy(const struct lu_env *env, struct dt_object *dt,
-                          struct thandle *th)
+int lod_sub_destroy(const struct lu_env *env, struct dt_object *dt,
+                   struct thandle *th)
 {
        struct thandle  *sub_th;
        bool            record_update;
@@ -369,8 +385,7 @@ int lod_sub_object_destroy(const struct lu_env *env, struct dt_object *dt,
                RETURN(PTR_ERR(sub_th));
 
        if (record_update) {
-               rc = update_record_pack(object_destroy, th,
-                                       lu_object_fid(&dt->do_lu));
+               rc = update_record_pack(destroy, th, lu_object_fid(&dt->do_lu));
                if (rc < 0)
                        RETURN(rc);
        }
@@ -394,18 +409,21 @@ int lod_sub_object_destroy(const struct lu_env *env, struct dt_object *dt,
  * \retval             0 if the declaration succeeds.
  * \retval             negative errno if the declaration fails.
  */
-int lod_sub_object_declare_insert(const struct lu_env *env,
-                                 struct dt_object *dt,
-                                 const struct dt_rec *rec,
-                                 const struct dt_key *key,
-                                 struct thandle *th)
+int lod_sub_declare_insert(const struct lu_env *env, struct dt_object *dt,
+                          const struct dt_rec *rec,
+                          const struct dt_key *key, struct thandle *th)
 {
        struct thandle *sub_th;
+       bool            record_update;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                return PTR_ERR(sub_th);
 
+       if (record_update)
+               update_record_size(env, index_insert, th,
+                                  lu_object_fid(&dt->do_lu), rec, key);
+
        return dt_declare_insert(env, dt, rec, key, sub_th);
 }
 
@@ -420,15 +438,13 @@ int lod_sub_object_declare_insert(const struct lu_env *env,
  * \param[in] rec      record of the index to be inserted
  * \param[in] key      key of the index to be inserted
  * \param[in] th       the transaction handle
- * \param[in] ign      whether ignore quota
  *
  * \retval             0 if the insertion succeeds.
  * \retval             negative errno if the insertion fails.
  */
-int lod_sub_object_index_insert(const struct lu_env *env, struct dt_object *dt,
-                               const struct dt_rec *rec,
-                               const struct dt_key *key, struct thandle *th,
-                               int ign)
+int lod_sub_insert(const struct lu_env *env, struct dt_object *dt,
+                  const struct dt_rec *rec, const struct dt_key *key,
+                  struct thandle *th)
 {
        struct thandle *sub_th;
        int             rc;
@@ -445,7 +461,7 @@ int lod_sub_object_index_insert(const struct lu_env *env, struct dt_object *dt,
                        return rc;
        }
 
-       return dt_insert(env, dt, rec, key, sub_th, ign);
+       return dt_insert(env, dt, rec, key, sub_th);
 }
 
 /**
@@ -461,17 +477,20 @@ int lod_sub_object_index_insert(const struct lu_env *env, struct dt_object *dt,
  * \retval             0 if the declaration succeeds.
  * \retval             negative errno if the declaration fails.
  */
-int lod_sub_object_declare_delete(const struct lu_env *env,
-                                 struct dt_object *dt,
-                                 const struct dt_key *key,
-                                 struct thandle *th)
+int lod_sub_declare_delete(const struct lu_env *env, struct dt_object *dt,
+                          const struct dt_key *key, struct thandle *th)
 {
        struct thandle *sub_th;
+       bool            record_update;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                return PTR_ERR(sub_th);
 
+       if (record_update)
+               update_record_size(env, index_delete, th,
+                                  lu_object_fid(&dt->do_lu), key);
+
        return dt_declare_delete(env, dt, key, sub_th);
 }
 
@@ -489,8 +508,8 @@ int lod_sub_object_declare_delete(const struct lu_env *env,
  * \retval             0 if the deletion succeeds.
  * \retval             negative errno if the deletion fails.
  */
-int lod_sub_object_delete(const struct lu_env *env, struct dt_object *dt,
-                         const struct dt_key *name, struct thandle *th)
+int lod_sub_delete(const struct lu_env *env, struct dt_object *dt,
+                  const struct dt_key *name, struct thandle *th)
 {
        struct thandle  *sub_th;
        bool            record_update;
@@ -526,20 +545,24 @@ int lod_sub_object_delete(const struct lu_env *env, struct dt_object *dt,
  * \retval             0 if the declaration succeeds.
  * \retval             negative errno if the declaration fails.
  */
-int lod_sub_object_declare_xattr_set(const struct lu_env *env,
-                                    struct dt_object *dt,
-                                    const struct lu_buf *buf,
-                                    const char *name, int fl,
-                                    struct thandle *th)
+int lod_sub_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
+                             const struct lu_buf *buf, const char *name,
+                             int fl, struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, xattr_set, th,
+                                  lu_object_fid(&dt->do_lu),
+                                  buf, name, fl);
+
        rc = dt_declare_xattr_set(env, dt, buf, name, fl, sub_th);
 
        RETURN(rc);
@@ -561,9 +584,9 @@ int lod_sub_object_declare_xattr_set(const struct lu_env *env,
  * \retval             0 if the xattr setting succeeds.
  * \retval             negative errno if xattr setting fails.
  */
-int lod_sub_object_xattr_set(const struct lu_env *env, struct dt_object *dt,
-                            const struct lu_buf *buf, const char *name, int fl,
-                            struct thandle *th)
+int lod_sub_xattr_set(const struct lu_env *env, struct dt_object *dt,
+                     const struct lu_buf *buf, const char *name, int fl,
+                     struct thandle *th)
 {
        struct thandle  *sub_th;
        bool            record_update;
@@ -600,19 +623,22 @@ int lod_sub_object_xattr_set(const struct lu_env *env, struct dt_object *dt,
  * \retval             0 if the declaration succeeds.
  * \retval             negative errno if the declaration fails.
  */
-int lod_sub_object_declare_attr_set(const struct lu_env *env,
-                                   struct dt_object *dt,
-                                   const struct lu_attr *attr,
-                                   struct thandle *th)
+int lod_sub_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
+                            const struct lu_attr *attr, struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, attr_set, th,
+                                  lu_object_fid(&dt->do_lu), attr);
+
        rc = dt_declare_attr_set(env, dt, attr, sub_th);
 
        RETURN(rc);
@@ -632,10 +658,8 @@ int lod_sub_object_declare_attr_set(const struct lu_env *env,
  * \retval             0 if attributes setting succeeds.
  * \retval             negative errno if the attributes setting fails.
  */
-int lod_sub_object_attr_set(const struct lu_env *env,
-                           struct dt_object *dt,
-                           const struct lu_attr *attr,
-                           struct thandle *th)
+int lod_sub_attr_set(const struct lu_env *env, struct dt_object *dt,
+                    const struct lu_attr *attr, struct thandle *th)
 {
        bool               record_update;
        struct thandle     *sub_th;
@@ -671,19 +695,23 @@ int lod_sub_object_attr_set(const struct lu_env *env,
  * \retval             0 if the declaration succeeds.
  * \retval             negative errno if the declaration fails.
  */
-int lod_sub_object_declare_xattr_del(const struct lu_env *env,
-                                    struct dt_object *dt,
-                                    const char *name,
-                                    struct thandle *th)
+int lod_sub_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
+                             const char *name, struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, xattr_del, th,
+                                  lu_object_fid(&dt->do_lu),
+                                  name);
+
        rc = dt_declare_xattr_del(env, dt, name, sub_th);
 
        RETURN(rc);
@@ -703,10 +731,8 @@ int lod_sub_object_declare_xattr_del(const struct lu_env *env,
  * \retval             0 if the deletion succeeds.
  * \retval             negative errno if the deletion fails.
  */
-int lod_sub_object_xattr_del(const struct lu_env *env,
-                            struct dt_object *dt,
-                            const char *name,
-                            struct thandle *th)
+int lod_sub_xattr_del(const struct lu_env *env, struct dt_object *dt,
+                     const char *name, struct thandle *th)
 {
        struct thandle  *sub_th;
        bool            record_update;
@@ -743,19 +769,24 @@ int lod_sub_object_xattr_del(const struct lu_env *env,
  * \retval             0 if the insertion succeeds.
  * \retval             negative errno if the insertion fails.
  */
-int lod_sub_object_declare_write(const struct lu_env *env,
-                                struct dt_object *dt,
-                                const struct lu_buf *buf, loff_t pos,
-                                struct thandle *th)
+int lod_sub_declare_write(const struct lu_env *env, struct dt_object *dt,
+                         const struct lu_buf *buf, loff_t pos,
+                         struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, write, th,
+                                  lu_object_fid(&dt->do_lu),
+                                  buf, pos);
+
        rc = dt_declare_write(env, dt, buf, pos, sub_th);
 
        RETURN(rc);
@@ -772,14 +803,13 @@ int lod_sub_object_declare_write(const struct lu_env *env,
  * \param[in] buf      buffer to write which includes an embedded size field
  * \param[in] pos      offet in the object to start writing at
  * \param[in] th       transaction handle
- * \param[in] rq       enforcement for this write
  *
  * \retval             the buffer size in bytes if it succeeds.
  * \retval             negative errno if it fails.
  */
-ssize_t lod_sub_object_write(const struct lu_env *env, struct dt_object *dt,
-                            const struct lu_buf *buf, loff_t *pos,
-                            struct thandle *th, int rq)
+ssize_t lod_sub_write(const struct lu_env *env, struct dt_object *dt,
+                     const struct lu_buf *buf, loff_t *pos,
+                     struct thandle *th)
 {
        struct thandle  *sub_th;
        bool            record_update;
@@ -797,7 +827,7 @@ ssize_t lod_sub_object_write(const struct lu_env *env, struct dt_object *dt,
                        RETURN(rc);
        }
 
-       rc = dt_write(env, dt, buf, pos, sub_th, rq);
+       rc = dt_write(env, dt, buf, pos, sub_th);
        RETURN(rc);
 }
 
@@ -815,19 +845,23 @@ ssize_t lod_sub_object_write(const struct lu_env *env, struct dt_object *dt,
  * \retval             0 if the insertion succeeds.
  * \retval             negative errno if the insertion fails.
  */
-int lod_sub_object_declare_punch(const struct lu_env *env,
-                                struct dt_object *dt,
-                                __u64 start, __u64 end,
-                                struct thandle *th)
+int lod_sub_declare_punch(const struct lu_env *env, struct dt_object *dt,
+                         __u64 start, __u64 end, struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, punch, th,
+                                  lu_object_fid(&dt->do_lu),
+                                  start, end);
+
        rc = dt_declare_punch(env, dt, start, end, sub_th);
 
        RETURN(rc);
@@ -849,8 +883,8 @@ int lod_sub_object_declare_punch(const struct lu_env *env,
  * \retval             the buffer size in bytes if it succeeds.
  * \retval             negative errno if it fails.
  */
-int lod_sub_object_punch(const struct lu_env *env, struct dt_object *dt,
-                        __u64 start, __u64 end, struct thandle *th)
+int lod_sub_punch(const struct lu_env *env, struct dt_object *dt,
+                 __u64 start, __u64 end, struct thandle *th)
 {
        struct thandle  *sub_th;
        bool            record_update;
@@ -902,8 +936,14 @@ int lod_sub_prep_llog(const struct lu_env *env, struct lod_device *lod,
        obd = dt->dd_lu_dev.ld_obd;
        ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
        LASSERT(ctxt != NULL);
-       ctxt->loc_flags |= LLOG_CTXT_FLAG_NORMAL_FID;
+       /* concurrent config processing (e.g. setting MDT active)
+        * can try to initialize llog again before causing double
+        * initialization. check for this */
+       if (ctxt->loc_handle)
+               GOTO(out_put, rc = 0);
 
+       ctxt->loc_flags |= LLOG_CTXT_FLAG_NORMAL_FID;
+       ctxt->loc_chunk_size = LLOG_MIN_CHUNK_SIZE * 4;
        if (likely(logid_id(&cid->lci_logid) != 0)) {
                rc = llog_open(env, ctxt, &lgh, &cid->lci_logid, NULL,
                               LLOG_OPEN_EXISTS);
@@ -924,9 +964,8 @@ int lod_sub_prep_llog(const struct lu_env *env, struct lod_device *lod,
        }
 
        LASSERT(lgh != NULL);
-       ctxt->loc_handle = lgh;
 
-       rc = llog_cat_init_and_process(env, lgh);
+       rc = llog_init_handle(env, lgh, LLOG_F_IS_CAT, NULL);
        if (rc != 0)
                GOTO(out_close, rc);
 
@@ -936,15 +975,14 @@ int lod_sub_prep_llog(const struct lu_env *env, struct lod_device *lod,
                        GOTO(out_close, rc);
        }
 
-       CDEBUG(D_INFO, "%s: Init llog for %d - catid "DOSTID":%x\n",
-              obd->obd_name, index, POSTID(&cid->lci_logid.lgl_oi),
+       ctxt->loc_handle = lgh;
+
+       CDEBUG(D_INFO, "%s: init llog for index %d - catid "DFID":%x\n",
+              obd->obd_name, index, PFID(&cid->lci_logid.lgl_oi.oi_fid),
               cid->lci_logid.lgl_ogen);
 out_close:
-       if (rc != 0) {
-               llog_cat_close(env, ctxt->loc_handle);
-               ctxt->loc_handle = NULL;
-       }
-
+       if (rc != 0)
+               llog_cat_close(env, lgh);
 out_put:
        llog_ctxt_put(ctxt);
        RETURN(rc);