Whamcloud - gitweb
LU-9259 tests: set fail_loc on the right MDT
[fs/lustre-release.git] / lustre / lod / lod_sub_object.c
index cce804f..a13bf99 100644 (file)
@@ -20,7 +20,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2014, Intel Corporation.
+ * Copyright (c) 2015, Intel Corporation.
  */
 /*
  * lustre/lod/lod_sub_object.c
 #include <lustre_param.h>
 #include <md_object.h>
 #include <lustre_linkea.h>
+#include <lustre_log.h>
 
 #include "lod_internal.h"
 
 struct thandle *lod_sub_get_thandle(const struct lu_env *env,
                                    struct thandle *th,
-                                   const struct dt_object *sub_obj)
+                                   const struct dt_object *sub_obj,
+                                   bool *record_update)
 {
        struct lod_device       *lod = dt2lod_dev(th->th_dev);
        struct top_thandle      *tth;
@@ -65,27 +67,46 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env,
        int                     rc;
        ENTRY;
 
+       if (record_update != NULL)
+               *record_update = false;
+
        if (th->th_top == NULL)
                RETURN(th);
 
        tth = container_of(th, struct top_thandle, tt_super);
-       LASSERT(tth->tt_magic == TOP_THANDLE_MAGIC);
+
        /* local object must be mdt object, Note: during ost object
         * creation, FID is not assigned until osp_object_create(),
         * so if the FID of sub_obj is zero, it means OST object. */
        if (!dt_object_remote(sub_obj) ||
-           fid_is_zero(lu_object_fid(&sub_obj->do_lu)))
+           fid_is_zero(lu_object_fid(&sub_obj->do_lu))) {
+               /* local MDT object */
+               if (fid_is_sane(lu_object_fid(&sub_obj->do_lu)) &&
+                   tth->tt_multiple_thandle != NULL &&
+                   record_update != NULL &&
+                   th->th_result == 0)
+                       *record_update = true;
+
                RETURN(tth->tt_master_sub_thandle);
+       }
 
        rc = lod_fld_lookup(env, lod, lu_object_fid(&sub_obj->do_lu),
                            &mdt_index, &type);
        if (rc < 0)
                RETURN(ERR_PTR(rc));
 
-       if (type == LU_SEQ_RANGE_OST)
+       /* th_complex means we need track all of updates for this
+        * transaction, include changes on OST */
+       if (type == LU_SEQ_RANGE_OST && !th->th_complex)
                RETURN(tth->tt_master_sub_thandle);
 
        sub_th = thandle_get_sub(env, th, sub_obj);
+       if (IS_ERR(sub_th))
+               RETURN(sub_th);
+
+       if (tth->tt_multiple_thandle != NULL && record_update != NULL &&
+           th->th_result == 0)
+               *record_update = true;
 
        RETURN(sub_th);
 }
@@ -113,11 +134,16 @@ int lod_sub_object_declare_create(const struct lu_env *env,
                                  struct thandle *th)
 {
        struct thandle *sub_th;
+       bool record_update;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                return PTR_ERR(sub_th);
 
+       if (record_update)
+               update_record_size(env, create, th, lu_object_fid(&dt->do_lu),
+                                  attr, hint, dof);
+
        return dt_declare_create(env, dt, attr, hint, dof, sub_th);
 }
 
@@ -144,13 +170,22 @@ int lod_sub_object_create(const struct lu_env *env, struct dt_object *dt,
                          struct thandle *th)
 {
        struct thandle     *sub_th;
+       bool               record_update;
        int                 rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update) {
+               rc = update_record_pack(create, th,
+                                       lu_object_fid(&dt->do_lu),
+                                       attr, hint, dof);
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
        rc = dt_create(env, dt, attr, hint, dof, sub_th);
 
        RETURN(rc);
@@ -173,13 +208,17 @@ int lod_sub_object_declare_ref_add(const struct lu_env *env,
                                   struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, ref_add, th, lu_object_fid(&dt->do_lu));
+
        rc = dt_declare_ref_add(env, dt, sub_th);
 
        RETURN(rc);
@@ -202,13 +241,21 @@ int lod_sub_object_ref_add(const struct lu_env *env, struct dt_object *dt,
                           struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update) {
+               rc = update_record_pack(ref_add, th,
+                                       lu_object_fid(&dt->do_lu));
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
        rc = dt_ref_add(env, dt, sub_th);
 
        RETURN(rc);
@@ -231,13 +278,17 @@ int lod_sub_object_declare_ref_del(const struct lu_env *env,
                                   struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, ref_del, th, lu_object_fid(&dt->do_lu));
+
        rc = dt_declare_ref_del(env, dt, sub_th);
 
        RETURN(rc);
@@ -260,13 +311,21 @@ int lod_sub_object_ref_del(const struct lu_env *env, struct dt_object *dt,
                           struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update) {
+               rc = update_record_pack(ref_del, th,
+                                       lu_object_fid(&dt->do_lu));
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
        rc = dt_ref_del(env, dt, sub_th);
 
        RETURN(rc);
@@ -289,13 +348,18 @@ int lod_sub_object_declare_destroy(const struct lu_env *env,
                                   struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, object_destroy, th,
+                                  lu_object_fid(&dt->do_lu));
+
        rc = dt_declare_destroy(env, dt, sub_th);
 
        RETURN(rc);
@@ -318,13 +382,21 @@ int lod_sub_object_destroy(const struct lu_env *env, struct dt_object *dt,
                           struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update) {
+               rc = update_record_pack(object_destroy, th,
+                                       lu_object_fid(&dt->do_lu));
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
        rc = dt_destroy(env, dt, sub_th);
 
        RETURN(rc);
@@ -351,11 +423,16 @@ int lod_sub_object_declare_insert(const struct lu_env *env,
                                  struct thandle *th)
 {
        struct thandle *sub_th;
+       bool            record_update;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                return PTR_ERR(sub_th);
 
+       if (record_update)
+               update_record_size(env, index_insert, th,
+                                  lu_object_fid(&dt->do_lu), rec, key);
+
        return dt_declare_insert(env, dt, rec, key, sub_th);
 }
 
@@ -381,11 +458,20 @@ int lod_sub_object_index_insert(const struct lu_env *env, struct dt_object *dt,
                                int ign)
 {
        struct thandle *sub_th;
+       int             rc;
+       bool            record_update;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                return PTR_ERR(sub_th);
 
+       if (record_update) {
+               rc = update_record_pack(index_insert, th,
+                                       lu_object_fid(&dt->do_lu), rec, key);
+               if (rc < 0)
+                       return rc;
+       }
+
        return dt_insert(env, dt, rec, key, sub_th, ign);
 }
 
@@ -408,11 +494,16 @@ int lod_sub_object_declare_delete(const struct lu_env *env,
                                  struct thandle *th)
 {
        struct thandle *sub_th;
+       bool            record_update;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                return PTR_ERR(sub_th);
 
+       if (record_update)
+               update_record_size(env, index_delete, th,
+                                  lu_object_fid(&dt->do_lu), key);
+
        return dt_declare_delete(env, dt, key, sub_th);
 }
 
@@ -434,13 +525,21 @@ int lod_sub_object_delete(const struct lu_env *env, struct dt_object *dt,
                          const struct dt_key *name, struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update) {
+               rc = update_record_pack(index_delete, th,
+                                       lu_object_fid(&dt->do_lu), name);
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
        rc = dt_delete(env, dt, name, sub_th);
        RETURN(rc);
 }
@@ -466,13 +565,19 @@ int lod_sub_object_declare_xattr_set(const struct lu_env *env,
                                     struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, xattr_set, th,
+                                  lu_object_fid(&dt->do_lu),
+                                  buf, name, fl);
+
        rc = dt_declare_xattr_set(env, dt, buf, name, fl, sub_th);
 
        RETURN(rc);
@@ -499,13 +604,22 @@ int lod_sub_object_xattr_set(const struct lu_env *env, struct dt_object *dt,
                             struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update) {
+               rc = update_record_pack(xattr_set, th,
+                                       lu_object_fid(&dt->do_lu),
+                                       buf, name, fl);
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
        rc = dt_xattr_set(env, dt, buf, name, fl, sub_th);
 
        RETURN(rc);
@@ -530,13 +644,18 @@ int lod_sub_object_declare_attr_set(const struct lu_env *env,
                                    struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, attr_set, th,
+                                  lu_object_fid(&dt->do_lu), attr);
+
        rc = dt_declare_attr_set(env, dt, attr, sub_th);
 
        RETURN(rc);
@@ -561,14 +680,22 @@ int lod_sub_object_attr_set(const struct lu_env *env,
                            const struct lu_attr *attr,
                            struct thandle *th)
 {
+       bool               record_update;
        struct thandle     *sub_th;
        int                 rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update) {
+               rc = update_record_pack(attr_set, th, lu_object_fid(&dt->do_lu),
+                                       attr);
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
        rc = dt_attr_set(env, dt, attr, sub_th);
 
        RETURN(rc);
@@ -593,13 +720,19 @@ int lod_sub_object_declare_xattr_del(const struct lu_env *env,
                                     struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, xattr_del, th,
+                                  lu_object_fid(&dt->do_lu),
+                                  name);
+
        rc = dt_declare_xattr_del(env, dt, name, sub_th);
 
        RETURN(rc);
@@ -625,13 +758,21 @@ int lod_sub_object_xattr_del(const struct lu_env *env,
                             struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update) {
+               rc = update_record_pack(xattr_del, th,
+                                       lu_object_fid(&dt->do_lu), name);
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
        rc = dt_xattr_del(env, dt, name, sub_th);
 
        RETURN(rc);
@@ -657,13 +798,19 @@ int lod_sub_object_declare_write(const struct lu_env *env,
                                 struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, write, th,
+                                  lu_object_fid(&dt->do_lu),
+                                  buf, pos);
+
        rc = dt_declare_write(env, dt, buf, pos, sub_th);
 
        RETURN(rc);
@@ -690,13 +837,174 @@ ssize_t lod_sub_object_write(const struct lu_env *env, struct dt_object *dt,
                             struct thandle *th, int rq)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        ssize_t         rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update) {
+               rc = update_record_pack(write, th, lu_object_fid(&dt->do_lu),
+                                       buf, *pos);
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
        rc = dt_write(env, dt, buf, pos, sub_th, rq);
        RETURN(rc);
 }
+
+/**
+ * Declare punch
+ *
+ * Get transaction of next layer and declare punch.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       object to be written
+ * \param[in] start    start offset of punch
+ * \param[in] end      end offet of punch
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 if the insertion succeeds.
+ * \retval             negative errno if the insertion fails.
+ */
+int lod_sub_object_declare_punch(const struct lu_env *env,
+                                struct dt_object *dt,
+                                __u64 start, __u64 end,
+                                struct thandle *th)
+{
+       struct thandle  *sub_th;
+       bool            record_update;
+       int             rc;
+       ENTRY;
+
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
+       if (IS_ERR(sub_th))
+               RETURN(PTR_ERR(sub_th));
+
+       if (record_update)
+               update_record_size(env, punch, th,
+                                  lu_object_fid(&dt->do_lu),
+                                  start, end);
+
+       rc = dt_declare_punch(env, dt, start, end, sub_th);
+
+       RETURN(rc);
+}
+
+/**
+ * Punch to sub object
+ *
+ * Get transaction of next layer, records buffer write if it belongs to
+ * Cross-MDT operation, and punch object.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       object to be written
+ * \param[in] start    start offset of punch
+ * \param[in] end      end offset of punch
+ * \param[in] th       transaction handle
+ * \param[in] capa     capability of the write
+ *
+ * \retval             the buffer size in bytes if it succeeds.
+ * \retval             negative errno if it fails.
+ */
+int lod_sub_object_punch(const struct lu_env *env, struct dt_object *dt,
+                        __u64 start, __u64 end, struct thandle *th)
+{
+       struct thandle  *sub_th;
+       bool            record_update;
+       int             rc;
+       ENTRY;
+
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
+       if (IS_ERR(sub_th))
+               RETURN(PTR_ERR(sub_th));
+
+       if (record_update) {
+               rc = update_record_pack(punch, th, lu_object_fid(&dt->do_lu),
+                                       start, end);
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
+       rc = dt_punch(env, dt, start, end, sub_th);
+
+       RETURN(rc);
+}
+
+int lod_sub_prep_llog(const struct lu_env *env, struct lod_device *lod,
+                     struct dt_device *dt, int index)
+{
+       struct lod_thread_info  *lti = lod_env_info(env);
+       struct llog_ctxt        *ctxt;
+       struct llog_handle      *lgh;
+       struct llog_catid       *cid = &lti->lti_cid;
+       struct lu_fid           *fid = &lti->lti_fid;
+       struct obd_device       *obd;
+       int                     rc;
+       bool                    need_put = false;
+       ENTRY;
+
+       lu_update_log_fid(fid, index);
+
+       rc = lodname2mdt_index(lod2obd(lod)->obd_name, (__u32 *)&index);
+       if (rc < 0)
+               RETURN(rc);
+
+       rc = llog_osd_get_cat_list(env, dt, index, 1, cid, fid);
+       if (rc != 0) {
+               CERROR("%s: can't get id from catalogs: rc = %d\n",
+                      lod2obd(lod)->obd_name, rc);
+               RETURN(rc);
+       }
+
+       obd = dt->dd_lu_dev.ld_obd;
+       ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
+       LASSERT(ctxt != NULL);
+       ctxt->loc_flags |= LLOG_CTXT_FLAG_NORMAL_FID;
+       ctxt->loc_chunk_size = LLOG_MIN_CHUNK_SIZE * 4;
+       if (likely(logid_id(&cid->lci_logid) != 0)) {
+               rc = llog_open(env, ctxt, &lgh, &cid->lci_logid, NULL,
+                              LLOG_OPEN_EXISTS);
+
+               /* re-create llog if it is missing */
+               if (rc == -ENOENT)
+                       logid_set_id(&cid->lci_logid, 0);
+               else if (rc < 0)
+                       GOTO(out_put, rc);
+       }
+
+       if (unlikely(logid_id(&cid->lci_logid) == 0)) {
+               rc = llog_open_create(env, ctxt, &lgh, NULL, NULL);
+               if (rc < 0)
+                       GOTO(out_put, rc);
+               cid->lci_logid = lgh->lgh_id;
+               need_put = true;
+       }
+
+       LASSERT(lgh != NULL);
+
+       rc = llog_init_handle(env, lgh, LLOG_F_IS_CAT, NULL);
+       if (rc != 0)
+               GOTO(out_close, rc);
+
+       if (need_put) {
+               rc = llog_osd_put_cat_list(env, dt, index, 1, cid, fid);
+               if (rc != 0)
+                       GOTO(out_close, rc);
+       }
+
+       ctxt->loc_handle = lgh;
+
+       CDEBUG(D_INFO, "%s: Init llog for %d - catid "DOSTID":%x\n",
+              obd->obd_name, index, POSTID(&cid->lci_logid.lgl_oi),
+              cid->lci_logid.lgl_ogen);
+out_close:
+       if (rc != 0)
+               llog_cat_close(env, lgh);
+out_put:
+       llog_ctxt_put(ctxt);
+       RETURN(rc);
+}