Whamcloud - gitweb
LU-6602 update: split update llog record 62/15162/6
authorwang di <di.wang@intel.com>
Thu, 21 May 2015 09:56:39 +0000 (02:56 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Sat, 4 Jul 2015 01:32:06 +0000 (01:32 +0000)
If the update llog record size exceeds the limit (llog
chunk size), which usually happens when creating a striped
directory with a large stripe count, then it will be split
into multiple records, and these update records will have
the same batchid.

During recovery, these records will be combined and
constructed into the updates of one operation, and then
the update replay is performed.

Allow multiple stripes on a single MDT, so that creating a
directory with many stripes can be verified in a small-scale test environment.

Add sanity 300j/300k and replay-single 116 to verify it.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I86ca2594fe62d5b921e794de4cd88981d91f7677
Reviewed-on: http://review.whamcloud.com/15162
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
22 files changed:
lustre/include/lu_target.h
lustre/include/lustre/lustre_idl.h
lustre/include/lustre_dlm.h
lustre/include/lustre_update.h
lustre/include/obd_support.h
lustre/llite/dir.c
lustre/lod/lod_dev.c
lustre/lod/lod_object.c
lustre/lod/lod_sub_object.c
lustre/mdt/mdt_reint.c
lustre/obdclass/llog_osd.c
lustre/osp/osp_trans.c
lustre/ptlrpc/wiretest.c
lustre/target/out_lib.c
lustre/target/update_records.c
lustre/target/update_recovery.c
lustre/target/update_trans.c
lustre/tests/replay-single.sh
lustre/tests/sanity.sh
lustre/utils/liblustreapi.c
lustre/utils/wirecheck.c
lustre/utils/wiretest.c

index 4428e69..6d20d2f 100644 (file)
@@ -66,7 +66,10 @@ struct distribute_txn_replay_req {
  * by this structure */
 struct distribute_txn_replay_req_sub {
        __u32                   dtrqs_mdt_index;
-       struct llog_cookie      dtrqs_llog_cookie;
+
+       /* All of cookies for the update will be linked here */
+       spinlock_t              dtrqs_cookie_list_lock;
+       struct list_head        dtrqs_cookie_list;
        struct list_head        dtrqs_list;
 };
 
index 281d88d..afef66f 100644 (file)
@@ -983,9 +983,6 @@ struct lu_orphan_ent {
 };
 void lustre_swab_orphan_ent(struct lu_orphan_ent *ent);
 
-struct update_ops;
-void lustre_swab_update_ops(struct update_ops *uops, unsigned int op_count);
-
 /** @} lu_fid */
 
 /** \defgroup lu_dir lu_dir
@@ -4023,6 +4020,7 @@ enum update_type {
        OUT_XATTR_DEL           = 13,
        OUT_PUNCH               = 14,
        OUT_READ                = 15,
+       OUT_NOOP                = 16,
        OUT_LAST
 };
 
@@ -4223,5 +4221,66 @@ struct close_data {
 
 void lustre_swab_close_data(struct close_data *data);
 
+struct update_ops;
+void lustre_swab_update_ops(struct update_ops *uops, unsigned int op_count);
+
+/* Update llog format */
+struct update_op {
+       struct lu_fid   uop_fid;
+       __u16           uop_type;
+       __u16           uop_param_count;
+       __u16           uop_params_off[0];
+};
+
+struct update_ops {
+       struct update_op        uops_op[0];
+};
+
+struct update_params {
+       struct object_update_param      up_params[0];
+};
+
+enum update_records_flag {
+       UPDATE_RECORD_CONTINUE = 1 << 0,        /* bit 0 of the flag mask */
+};
+/*
+ * This is the update record format used to store the updates on
+ * disk. All updates of the operation will be stored in ur_ops.
+ * All of the parameters for updates of the operation will be stored
+ * in ur_params.
+ * To save space in the record, updates in ur_ops only remember the
+ * offsets of their parameters in ur_params, which avoids storing
+ * duplicate parameters in ur_params and can save a lot of space for
+ * operations like creating a striped directory.
+ */
+struct update_records {
+       __u64                   ur_master_transno;
+       __u64                   ur_batchid;
+       __u32                   ur_flags;
+       /* If the operation includes multiple updates, then ur_index
+        * means the index of the update inside the whole updates. */
+       __u32                   ur_index;
+       __u32                   ur_update_count;
+       __u32                   ur_param_count;
+       struct update_ops       ur_ops;
+        /* Note ur_ops has a variable size, so comment out
+         * the following ur_params, in case someone uses it directly
+         * as update_records->ur_params
+         *
+         * struct update_params        ur_params;
+         */
+};
+
+struct llog_update_record {
+       struct llog_rec_hdr     lur_hdr;
+       struct update_records   lur_update_rec;
+       /* Note lur_update_rec has a variable size, so comment out
+        * the following lur_tail, in case someone uses it directly
+        *
+        * struct llog_rec_tail lur_tail;
+        */
+};
+
+
 #endif
 /** @} lustreidl */
index 4d64b19..6a2bc6a 100644 (file)
@@ -1084,9 +1084,11 @@ struct ldlm_enqueue_info {
        __u32 ei_type;   /** Type of the lock being enqueued. */
        __u32 ei_mode;   /** Mode of the lock being enqueued. */
        void *ei_cb_bl;  /** blocking lock callback */
+       void *ei_cb_local_bl;  /** blocking local lock callback */
        void *ei_cb_cp;  /** lock completion callback */
        void *ei_cb_gl;  /** lock glimpse callback */
        void *ei_cbdata; /** Data to be passed into callbacks. */
+       void *ei_namespace; /** lock namespace **/
        unsigned int ei_enq_slave:1; /* whether enqueue slave stripes */
 };
 
index c2994b1..f9fd97e 100644 (file)
 struct dt_key;
 struct dt_rec;
 struct object_update_param;
-
-struct update_params {
-       struct object_update_param      up_params[0];
-};
+struct llog_update_record;
 
 static inline size_t update_params_size(const struct update_params *params,
                                        unsigned int param_count)
@@ -98,13 +95,6 @@ update_params_get_param_buf(const struct update_params *params, __u16 index,
        return param->oup_buf;
 }
 
-struct update_op {
-       struct lu_fid uop_fid;
-       __u16   uop_type;
-       __u16   uop_param_count;
-       __u16   uop_params_off[0];
-};
-
 static inline size_t
 update_op_size(unsigned int param_count)
 {
@@ -118,11 +108,6 @@ update_op_next_op(const struct update_op *uop)
                                update_op_size(uop->uop_param_count));
 }
 
-/* All of updates in the mulitple_update_record */
-struct update_ops {
-       struct update_op        uops_op[0];
-};
-
 static inline size_t update_ops_size(const struct update_ops *ops,
                                     unsigned int update_count)
 {
@@ -137,41 +122,6 @@ static inline size_t update_ops_size(const struct update_ops *ops,
        return total_size;
 }
 
-/*
- * This is the update record format used to store the updates in
- * disk. All updates of the operation will be stored in ur_ops.
- * All of parameters for updates of the operation will be stored
- * in ur_params.
- * To save the space of the record, parameters in ur_ops will only
- * remember their offset in ur_params, so to avoid storing duplicate
- * parameters in ur_params, which can help us save a lot space for
- * operation like creating striped directory.
- */
-struct update_records {
-       __u64                   ur_master_transno;
-       __u64                   ur_batchid;
-       __u32                   ur_flags;
-       __u32                   ur_param_count;
-       __u32                   ur_update_count;
-       struct update_ops       ur_ops;
-        /* Note ur_ops has a variable size, so comment out
-         * the following ur_params, in case some use it directly
-         * update_records->ur_params
-         *
-         * struct update_params        ur_params;
-         */
-};
-
-struct llog_update_record {
-       struct llog_rec_hdr     lur_hdr;
-       struct update_records   lur_update_rec;
-       /* Note ur_update_rec has a variable size, so comment out
-        * the following ur_tail, in case someone use it directly
-        *
-        * struct llog_rec_tail lur_tail;
-        */
-};
-
 static inline struct update_params *
 update_records_get_params(const struct update_records *record)
 {
@@ -183,13 +133,21 @@ update_records_get_params(const struct update_records *record)
 static inline size_t
 update_records_size(const struct update_records *record)
 {
-       struct update_params *params;
+       size_t op_size = 0;
+       size_t param_size = 0;
 
-       params = update_records_get_params(record);
+       if (record->ur_update_count > 0)
+               op_size = update_ops_size(&record->ur_ops,
+                                         record->ur_update_count);
+       if (record->ur_param_count > 0) {
+               struct update_params *params;
+
+               params = update_records_get_params(record);
+               param_size = update_params_size(params, record->ur_param_count);
+       }
 
        return cfs_size_round(offsetof(struct update_records, ur_ops) +
-              update_ops_size(&record->ur_ops, record->ur_update_count) +
-              update_params_size(params, record->ur_param_count));
+                             op_size + param_size);
 }
 
 static inline size_t
@@ -336,6 +294,7 @@ struct top_multiple_thandle {
        __u64                   tmt_batchid;
        int                     tmt_result;
        __u32                   tmt_magic;
+       size_t                  tmt_record_size;
        __u32                   tmt_committed:1;
 };
 
@@ -350,12 +309,17 @@ struct top_thandle {
        struct top_multiple_thandle *tt_multiple_thandle;
 };
 
+struct sub_thandle_cookie {
+       struct llog_cookie      stc_cookie;
+       struct list_head        stc_list;
+};
+
 /* Sub thandle is used to track multiple sub thandles under one parent
  * thandle */
 struct sub_thandle {
        struct thandle          *st_sub_th;
        struct dt_device        *st_dt;
-       struct llog_cookie      st_cookie;
+       struct list_head        st_cookie_list;
        struct dt_txn_commit_cb st_commit_dcb;
        struct dt_txn_commit_cb st_stop_dcb;
        int                     st_result;
@@ -515,6 +479,43 @@ int sub_thandle_trans_create(const struct lu_env *env,
                             struct sub_thandle *st);
 
 /* update_records.c */
+size_t update_records_create_size(const struct lu_env *env,
+                                 const struct lu_fid *fid,
+                                 const struct lu_attr *attr,
+                                 const struct dt_allocation_hint *hint,
+                                 struct dt_object_format *dof);
+size_t update_records_attr_set_size(const struct lu_env *env,
+                                   const struct lu_fid *fid,
+                                   const struct lu_attr *attr);
+size_t update_records_ref_add_size(const struct lu_env *env,
+                                  const struct lu_fid *fid);
+size_t update_records_ref_del_size(const struct lu_env *env,
+                                  const struct lu_fid *fid);
+size_t update_records_object_destroy_size(const struct lu_env *env,
+                                         const struct lu_fid *fid);
+size_t update_records_index_insert_size(const struct lu_env *env,
+                                       const struct lu_fid *fid,
+                                       const struct dt_rec *rec,
+                                       const struct dt_key *key);
+size_t update_records_index_delete_size(const struct lu_env *env,
+                                       const struct lu_fid *fid,
+                                       const struct dt_key *key);
+size_t update_records_xattr_set_size(const struct lu_env *env,
+                                    const struct lu_fid *fid,
+                                    const struct lu_buf *buf,
+                                    const char *name,
+                                    __u32 flag);
+size_t update_records_xattr_del_size(const struct lu_env *env,
+                                    const struct lu_fid *fid,
+                                    const char *name);
+size_t update_records_write_size(const struct lu_env *env,
+                                const struct lu_fid *fid,
+                                const struct lu_buf *buf,
+                                __u64 pos);
+size_t update_records_punch_size(const struct lu_env *env,
+                                const struct lu_fid *fid,
+                                __u64 start, __u64 end);
+
 int update_records_create_pack(const struct lu_env *env,
                               struct update_ops *ops,
                               unsigned int *op_count,
@@ -616,6 +617,14 @@ int update_records_punch_pack(const struct lu_env *env,
                              size_t *max_param_size,
                              const struct lu_fid *fid,
                              __u64 start, __u64 end);
+int update_records_noop_pack(const struct lu_env *env,
+                            struct update_ops *ops,
+                            unsigned int *op_count,
+                            size_t *max_ops_size,
+                            struct update_params *params,
+                            unsigned int *param_count,
+                            size_t *max_param_size,
+                            const struct lu_fid *fid);
 
 int tur_update_records_extend(struct thandle_update_records *tur,
                              size_t new_size);
@@ -663,4 +672,17 @@ int tur_update_extend(struct thandle_update_records *tur,
        }                                                               \
        ret;                                                            \
 })
+
+#define update_record_size(env, name, th, ...)                         \
+({                                                                     \
+       struct top_thandle *top_th;                                     \
+       struct top_multiple_thandle *tmt;                               \
+                                                                       \
+       top_th = container_of(th, struct top_thandle, tt_super);        \
+                                                                       \
+       LASSERT(top_th->tt_multiple_thandle != NULL);                   \
+       tmt = top_th->tt_multiple_thandle;                              \
+       tmt->tmt_record_size +=                                         \
+               update_records_##name##_size(env, __VA_ARGS__);         \
+})
 #endif
index a4d400a..ed20d85 100644 (file)
@@ -561,6 +561,8 @@ int obd_alloc_fail(const void *ptr, const char *name, const char *type,
 /* UPDATE */
 #define OBD_FAIL_OUT_UPDATE_NET                0x1700
 #define OBD_FAIL_OUT_UPDATE_NET_REP    0x1701
+#define OBD_FAIL_SPLIT_UPDATE_REC      0x1702
+#define OBD_FAIL_LARGE_STRIPE          0x1703
 
 /* MIGRATE */
 #define OBD_FAIL_MIGRATE_NET_REP               0x1800
index ef46098..9c90098 100644 (file)
@@ -1256,12 +1256,14 @@ lmv_out_free:
                int                     mdt_index;
                int                     lum_size;
                int                     stripe_count;
+               int                     max_stripe_count;
                int                     i;
                int                     rc;
 
                if (copy_from_user(&lum, ulmv, sizeof(*ulmv)))
                        RETURN(-EFAULT);
 
+               max_stripe_count = lum.lum_stripe_count;
                /* lum_magic will indicate which stripe the ioctl will like
                 * to get, LMV_MAGIC_V1 is for normal LMV stripe, LMV_USER_MAGIC
                 * is for default LMV stripe */
@@ -1292,6 +1294,13 @@ lmv_out_free:
                }
 
                stripe_count = lmv_mds_md_stripe_count_get(lmm);
+               if (max_stripe_count < stripe_count) {
+                       lum.lum_stripe_count = stripe_count;
+                       if (copy_to_user(ulmv, &lum, sizeof(lum)))
+                               GOTO(finish_req, rc = -EFAULT);
+                       GOTO(finish_req, rc = -E2BIG);
+               }
+
                lum_size = lmv_user_md_size(stripe_count, LMV_MAGIC_V1);
                OBD_ALLOC(tmp, lum_size);
                if (tmp == NULL)
index 02b41d6..63dfca9 100644 (file)
@@ -1101,6 +1101,42 @@ static int lod_trans_cb_add(struct thandle *th,
 }
 
 /**
+ * Add noop updates to the update records
+ *
+ * Pack \a count noop updates (with a zeroed FID) into the update records
+ * of \a th, which is only used in tests right now.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       dt device of lod (not referenced by this function)
+ * \param[in] th       thandle
+ * \param[in] count    the number of noop updates to be packed.
+ *
+ * \retval             0 if adding succeeds or th has no multiple thandle.
+ * \retval             negative errno if adding fails.
+ */
+static int lod_add_noop_records(const struct lu_env *env,
+                               struct dt_device *dt, struct thandle *th,
+                               int count)
+{
+       struct top_thandle *top_th;
+       struct lu_fid *fid = &lod_env_info(env)->lti_fid;
+       int i;
+       int rc = 0;
+
+       top_th = container_of(th, struct top_thandle, tt_super);
+       if (top_th->tt_multiple_thandle == NULL)
+               return 0;
+
+       fid_zero(fid);
+       for (i = 0; i < count; i++) {
+               rc = update_record_pack(noop, th, fid);
+               if (rc < 0)
+                       return rc;
+       }
+       return rc;
+}
+
+/**
  * Implementation of dt_device_operations::dt_trans_stop() for LOD
  *
  * Stops the set of local transactions using the targets involved
@@ -1111,6 +1147,13 @@ static int lod_trans_cb_add(struct thandle *th,
 static int lod_trans_stop(const struct lu_env *env, struct dt_device *dt,
                          struct thandle *th)
 {
+       if (OBD_FAIL_CHECK(OBD_FAIL_SPLIT_UPDATE_REC)) {
+               int rc;
+
+               rc = lod_add_noop_records(env, dt, th, 5000);
+               if (rc < 0)
+                       RETURN(rc);
+       }
        return top_trans_stop(env, dt2lod_dev(dt)->lod_child, th);
 }
 
index b1318f6..948a675 100644 (file)
@@ -1787,6 +1787,7 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
        struct dt_object        **stripe;
        __u32                   stripe_count;
        int                     *idx_array;
+       __u32                   master_index;
        int                     rc = 0;
        __u32                   i;
        __u32                   j;
@@ -1799,7 +1800,8 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
        stripe_count = le32_to_cpu(lum->lum_stripe_count);
 
        /* shrink the stripe_count to the avaible MDT count */
-       if (stripe_count > lod->lod_remote_mdt_count + 1)
+       if (stripe_count > lod->lod_remote_mdt_count + 1 &&
+           !OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))
                stripe_count = lod->lod_remote_mdt_count + 1;
 
        OBD_ALLOC(stripe, sizeof(stripe[0]) * stripe_count);
@@ -1810,6 +1812,9 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
        if (idx_array == NULL)
                GOTO(out_free, rc = -ENOMEM);
 
+       /* Start index will be the master MDT */
+       master_index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
+       idx_array[0] = master_index;
        for (i = 0; i < stripe_count; i++) {
                struct lod_tgt_desc     *tgt = NULL;
                struct dt_object        *dto;
@@ -1818,44 +1823,42 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
                struct lu_object_conf   conf = { 0 };
                struct dt_device        *tgt_dt = NULL;
 
-               if (i == 0) {
-                       /* Right now, master stripe and master object are
-                        * on the same MDT */
-                       idx = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
-                       rc = obd_fid_alloc(env, lod->lod_child_exp, &fid,
-                                          NULL);
-                       if (rc < 0)
-                               GOTO(out_put, rc);
-                       tgt_dt = lod->lod_child;
-                       goto next;
-               }
-
-               idx = (idx_array[i - 1] + 1) % (lod->lod_remote_mdt_count + 1);
-
+               /* Try to find next avaible target */
+               idx = idx_array[i];
                for (j = 0; j < lod->lod_remote_mdt_count;
                     j++, idx = (idx + 1) % (lod->lod_remote_mdt_count + 1)) {
                        bool already_allocated = false;
                        __u32 k;
 
-                       CDEBUG(D_INFO, "try idx %d, mdt cnt %u,"
-                              " allocated %u, last allocated %d\n", idx,
-                              lod->lod_remote_mdt_count, i, idx_array[i - 1]);
+                       CDEBUG(D_INFO, "try idx %d, mdt cnt %u, allocated %u\n",
+                              idx, lod->lod_remote_mdt_count + 1, i);
+                       if (idx == master_index) {
+                               /* Allocate the FID locally */
+                               rc = obd_fid_alloc(env, lod->lod_child_exp,
+                                                  &fid, NULL);
+                               if (rc < 0)
+                                       GOTO(out_put, rc);
+                               tgt_dt = lod->lod_child;
+                               break;
+                       }
 
                        /* Find next available target */
                        if (!cfs_bitmap_check(ltd->ltd_tgt_bitmap, idx))
                                continue;
 
-                       /* check whether the idx already exists
-                        * in current allocated array */
-                       for (k = 0; k < i; k++) {
-                               if (idx_array[k] == idx) {
-                                       already_allocated = true;
-                                       break;
+                       if (likely(!OBD_FAIL_CHECK(OBD_FAIL_LARGE_STRIPE))) {
+                               /* check whether the idx already exists
+                                * in current allocated array */
+                               for (k = 0; k < i; k++) {
+                                       if (idx_array[k] == idx) {
+                                               already_allocated = true;
+                                               break;
+                                       }
                                }
-                       }
 
-                       if (already_allocated)
-                               continue;
+                               if (already_allocated)
+                                       continue;
+                       }
 
                        /* check the status of the OSP */
                        tgt = LTD_TGT(ltd, idx);
@@ -1886,11 +1889,13 @@ static int lod_prep_md_striped_create(const struct lu_env *env,
                        break;
                }
 
-               CDEBUG(D_INFO, "idx %d, mdt cnt %u,"
-                      " allocated %u, last allocated %d\n", idx,
-                      lod->lod_remote_mdt_count, i, idx_array[i - 1]);
-
-next:
+               CDEBUG(D_INFO, "Get idx %d, for stripe %d "DFID"\n",
+                      idx, i, PFID(&fid));
+               idx_array[i] = idx;
+               /* Set the start index for the next stripe, if there is one */
+               if (i + 1 < stripe_count)
+                       idx_array[i + 1] = (idx + 1) %
+                                          (lod->lod_remote_mdt_count + 1);
                /* tgt_dt and fid must be ready after search avaible OSP
                 * in the above loop */
                LASSERT(tgt_dt != NULL);
@@ -1902,7 +1907,6 @@ next:
                if (IS_ERR(dto))
                        GOTO(out_put, rc = PTR_ERR(dto));
                stripe[i] = dto;
-               idx_array[i] = idx;
        }
 
        lo->ldo_dir_striped = 1;
@@ -3904,9 +3908,28 @@ static int lod_object_lock(const struct lu_env *env,
                                       res_id);
                einfo->ei_res_id = res_id;
 
-               LASSERT(lo->ldo_stripe[i]);
-               rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh, einfo,
-                                   policy);
+               LASSERT(lo->ldo_stripe[i] != NULL);
+               if (likely(dt_object_remote(lo->ldo_stripe[i]))) {
+                       rc = dt_object_lock(env, lo->ldo_stripe[i], &lockh,
+                                           einfo, policy);
+               } else {
+                       struct ldlm_namespace *ns = einfo->ei_namespace;
+                       ldlm_blocking_callback blocking = einfo->ei_cb_local_bl;
+                       ldlm_completion_callback completion = einfo->ei_cb_cp;
+                       __u64   dlmflags = LDLM_FL_ATOMIC_CB;
+
+                       /* This only happens if there are mulitple stripes
+                        * on the master MDT, i.e. except stripe0, there are
+                        * other stripes on the Master MDT as well, Only
+                        * happens in the test case right now. */
+                       LASSERT(ns != NULL);
+                       rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS,
+                                                   policy, einfo->ei_mode,
+                                                   &dlmflags, blocking,
+                                                   completion, NULL,
+                                                   NULL, 0, LVB_T_NONE,
+                                                   NULL, &lockh);
+               }
                if (rc != 0)
                        GOTO(out, rc);
                slave_locks->lsl_handle[i] = lockh;
index e86175e..6c46520 100644 (file)
@@ -98,12 +98,14 @@ struct thandle *lod_sub_get_thandle(const struct lu_env *env,
        if (type == LU_SEQ_RANGE_OST)
                RETURN(tth->tt_master_sub_thandle);
 
+       sub_th = thandle_get_sub(env, th, sub_obj);
+       if (IS_ERR(sub_th))
+               RETURN(sub_th);
+
        if (tth->tt_multiple_thandle != NULL && record_update != NULL &&
            th->th_result == 0)
                *record_update = true;
 
-       sub_th = thandle_get_sub(env, th, sub_obj);
-
        RETURN(sub_th);
 }
 
@@ -130,11 +132,16 @@ int lod_sub_object_declare_create(const struct lu_env *env,
                                  struct thandle *th)
 {
        struct thandle *sub_th;
+       bool record_update;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                return PTR_ERR(sub_th);
 
+       if (record_update)
+               update_record_size(env, create, th, lu_object_fid(&dt->do_lu),
+                                  attr, hint, dof);
+
        return dt_declare_create(env, dt, attr, hint, dof, sub_th);
 }
 
@@ -199,13 +206,17 @@ int lod_sub_object_declare_ref_add(const struct lu_env *env,
                                   struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, ref_add, th, lu_object_fid(&dt->do_lu));
+
        rc = dt_declare_ref_add(env, dt, sub_th);
 
        RETURN(rc);
@@ -265,13 +276,17 @@ int lod_sub_object_declare_ref_del(const struct lu_env *env,
                                   struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, ref_del, th, lu_object_fid(&dt->do_lu));
+
        rc = dt_declare_ref_del(env, dt, sub_th);
 
        RETURN(rc);
@@ -331,13 +346,18 @@ int lod_sub_object_declare_destroy(const struct lu_env *env,
                                   struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, object_destroy, th,
+                                  lu_object_fid(&dt->do_lu));
+
        rc = dt_declare_destroy(env, dt, sub_th);
 
        RETURN(rc);
@@ -401,11 +421,16 @@ int lod_sub_object_declare_insert(const struct lu_env *env,
                                  struct thandle *th)
 {
        struct thandle *sub_th;
+       bool            record_update;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                return PTR_ERR(sub_th);
 
+       if (record_update)
+               update_record_size(env, index_insert, th,
+                                  lu_object_fid(&dt->do_lu), rec, key);
+
        return dt_declare_insert(env, dt, rec, key, sub_th);
 }
 
@@ -467,11 +492,16 @@ int lod_sub_object_declare_delete(const struct lu_env *env,
                                  struct thandle *th)
 {
        struct thandle *sub_th;
+       bool            record_update;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                return PTR_ERR(sub_th);
 
+       if (record_update)
+               update_record_size(env, index_delete, th,
+                                  lu_object_fid(&dt->do_lu), key);
+
        return dt_declare_delete(env, dt, key, sub_th);
 }
 
@@ -533,13 +563,19 @@ int lod_sub_object_declare_xattr_set(const struct lu_env *env,
                                     struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, xattr_set, th,
+                                  lu_object_fid(&dt->do_lu),
+                                  buf, name, fl);
+
        rc = dt_declare_xattr_set(env, dt, buf, name, fl, sub_th);
 
        RETURN(rc);
@@ -606,13 +642,18 @@ int lod_sub_object_declare_attr_set(const struct lu_env *env,
                                    struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, attr_set, th,
+                                  lu_object_fid(&dt->do_lu), attr);
+
        rc = dt_declare_attr_set(env, dt, attr, sub_th);
 
        RETURN(rc);
@@ -677,13 +718,19 @@ int lod_sub_object_declare_xattr_del(const struct lu_env *env,
                                     struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, xattr_del, th,
+                                  lu_object_fid(&dt->do_lu),
+                                  name);
+
        rc = dt_declare_xattr_del(env, dt, name, sub_th);
 
        RETURN(rc);
@@ -749,13 +796,19 @@ int lod_sub_object_declare_write(const struct lu_env *env,
                                 struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, write, th,
+                                  lu_object_fid(&dt->do_lu),
+                                  buf, pos);
+
        rc = dt_declare_write(env, dt, buf, pos, sub_th);
 
        RETURN(rc);
@@ -821,13 +874,19 @@ int lod_sub_object_declare_punch(const struct lu_env *env,
                                 struct thandle *th)
 {
        struct thandle  *sub_th;
+       bool            record_update;
        int             rc;
        ENTRY;
 
-       sub_th = lod_sub_get_thandle(env, th, dt, NULL);
+       sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
        if (IS_ERR(sub_th))
                RETURN(PTR_ERR(sub_th));
 
+       if (record_update)
+               update_record_size(env, punch, th,
+                                  lu_object_fid(&dt->do_lu),
+                                  start, end);
+
        rc = dt_declare_punch(env, dt, start, end, sub_th);
 
        RETURN(rc);
index f206e6b..71e49f6 100644 (file)
@@ -483,8 +483,10 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
        einfo->ei_type = LDLM_IBITS;
        einfo->ei_mode = mode;
        einfo->ei_cb_bl = mdt_remote_blocking_ast;
+       einfo->ei_cb_local_bl = mdt_blocking_ast;
        einfo->ei_cb_cp = ldlm_completion_ast;
        einfo->ei_enq_slave = 1;
+       einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
        memset(policy, 0, sizeof(*policy));
        policy->l_inodebits.bits = ibits;
 
index fb36839..0a09fb5 100644 (file)
@@ -604,8 +604,9 @@ out_remote_unlock:
        if (rc < 0)
                GOTO(out, rc);
 
-       CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u\n",
-              POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len);
+       CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u off"LPU64"\n",
+              POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len,
+              lgi->lgi_off);
        if (reccookie != NULL) {
                reccookie->lgc_lgl = loghandle->lgh_id;
                reccookie->lgc_index = index;
index be7db77..efe9a85 100644 (file)
@@ -130,6 +130,9 @@ int osp_object_update_request_create(struct osp_update_request *our,
        if (ours == NULL)
                return -ENOMEM;
 
+       if (size < OUT_UPDATE_INIT_BUFFER_SIZE)
+               size = OUT_UPDATE_INIT_BUFFER_SIZE;
+
        ours->ours_req = object_update_request_alloc(size);
 
        if (IS_ERR(ours->ours_req)) {
index 49b97cc..32fe7c9 100644 (file)
@@ -3729,18 +3729,6 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct llog_log_hdr, llh_tgtuuid));
        LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_tgtuuid) == 40, "found %lld\n",
                 (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_tgtuuid));
-       LASSERTF((int)offsetof(struct llog_log_hdr, llh_reserved) == 84, "found %lld\n",
-                (long long)(int)offsetof(struct llog_log_hdr, llh_reserved));
-       LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_reserved) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_reserved));
-       LASSERTF((int)offsetof(struct llog_log_hdr, llh_bitmap) == 88, "found %lld\n",
-                (long long)(int)offsetof(struct llog_log_hdr, llh_bitmap));
-       LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_bitmap) == 8096, "found %lld\n",
-                (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_bitmap));
-       LASSERTF((int)offsetof(struct llog_log_hdr, llh_tail) == 8184, "found %lld\n",
-                (long long)(int)offsetof(struct llog_log_hdr, llh_tail));
-       LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_tail) == 8, "found %lld\n",
-                (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_tail));
 
        /* Checks for struct llog_cookie */
        LASSERTF((int)sizeof(struct llog_cookie) == 32, "found %lld\n",
@@ -4084,10 +4072,10 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct fiemap_extent, fe_flags));
        LASSERTF((int)sizeof(((struct fiemap_extent *)0)->fe_flags) == 4, "found %lld\n",
                 (long long)(int)sizeof(((struct fiemap_extent *)0)->fe_flags));
-       LASSERTF((int)offsetof(struct fiemap_extent, fe_device) == 44, "found %lld\n",
-                (long long)(int)offsetof(struct fiemap_extent, fe_device));
-       LASSERTF((int)sizeof(((struct fiemap_extent *)0)->fe_device) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct fiemap_extent *)0)->fe_device));
+       LASSERTF((int)offsetof(struct fiemap_extent, fe_reserved[0]) == 44, "found %lld\n",
+                (long long)(int)offsetof(struct fiemap_extent, fe_reserved[0]));
+       LASSERTF((int)sizeof(((struct fiemap_extent *)0)->fe_reserved[0]) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct fiemap_extent *)0)->fe_reserved[0]));
        CLASSERT(FIEMAP_EXTENT_LAST == 0x00000001);
        CLASSERT(FIEMAP_EXTENT_UNKNOWN == 0x00000002);
        CLASSERT(FIEMAP_EXTENT_DELALLOC == 0x00000004);
index 62e54d7..e6a0879 100644 (file)
@@ -57,6 +57,7 @@ const char *update_op_str(__u16 opc)
                [OUT_XATTR_DEL] = "xattr_del",
                [OUT_PUNCH] = "punch",
                [OUT_READ] = "read",
+               [OUT_NOOP] = "noop",
        };
 
        if (opc < ARRAY_SIZE(opc_str) && opc_str[opc] != NULL)
index 698dacb..d953f85 100644 (file)
@@ -63,12 +63,9 @@ void update_records_dump(const struct update_records *records,
 {
        const struct update_ops *ops;
        const struct update_op  *op = NULL;
-       struct update_params    *params;
+       struct update_params    *params = NULL;
        unsigned int            i;
 
-       ops = &records->ur_ops;
-       params = update_records_get_params(records);
-
        CDEBUG(mask, "master transno = "LPU64" batchid = "LPU64" flags = %x"
               " ops = %d params = %d\n", records->ur_master_transno,
               records->ur_batchid, records->ur_flags, records->ur_update_count,
@@ -80,27 +77,34 @@ void update_records_dump(const struct update_records *records,
        if (!dump_updates)
                return;
 
+       ops = &records->ur_ops;
+       if (records->ur_param_count > 0)
+               params = update_records_get_params(records);
+
        op = &ops->uops_op[0];
-       for (i = 0; i < records->ur_update_count; i++) {
+       for (i = 0; i < records->ur_update_count; i++,
+                                 op = update_op_next_op(op)) {
                unsigned int j;
 
                CDEBUG(mask, "update %dth "DFID" %s params_count = %hu\n", i,
                       PFID(&op->uop_fid), update_op_str(op->uop_type),
                       op->uop_param_count);
 
+               if (params == NULL)
+                       continue;
+
                for (j = 0;  j < op->uop_param_count; j++) {
                        struct object_update_param *param;
 
                        param = update_params_get_param(params,
-                                       (unsigned int)op->uop_params_off[j],
+                               (unsigned int)op->uop_params_off[j],
                                        records->ur_param_count);
 
-                       LASSERT(param != NULL);
+                       if (param == NULL)
+                               continue;
                        CDEBUG(mask, "param = %p %dth off = %hu size = %hu\n",
                               param, j, op->uop_params_off[j], param->oup_len);
                }
-
-               op = update_op_next_op(op);
        }
 }
 
@@ -239,6 +243,66 @@ static int update_records_update_pack(const struct lu_env *env,
 }
 
 /**
+ * Calculate the size of a single update
+ *
+ * Calculate the size of one update from param_count and the sizes array.
+ *
+ * \param[in] param_count      the count of parameters
+ * \param[in] sizes            the size array of these parameters
+ *
+ * \retval                     the size of this update
+ */
+static size_t update_records_update_size(__u32 param_count, size_t *sizes)
+{
+       int i;
+       size_t size;
+
+       /* Size of the op itself, then add each rounded-up parameter */
+       size = update_op_size(param_count);
+
+       for (i = 0; i < param_count; i++)
+               size += cfs_size_round(sizeof(struct object_update_param) +
+                                      sizes[i]);
+
+       return size;
+}
+
+/**
+ * Calculate create update size
+ *
+ * \param[in] env      execution environment (not used by the
+ *                     size calculation)
+ * \param[in] fid      FID of the object to be created
+ * \param[in] attr     attribute of the object to be created
+ * \param[in] hint     creation hint
+ * \param[in] dof      creation format information
+ *
+ * \retval             size of create update.
+ */
+size_t update_records_create_size(const struct lu_env *env,
+                                 const struct lu_fid *fid,
+                                 const struct lu_attr *attr,
+                                 const struct dt_allocation_hint *hint,
+                                 struct dt_object_format *dof)
+{
+       size_t  sizes[2];
+       int     param_count = 0;
+
+       if (attr != NULL) {
+               sizes[param_count] = sizeof(struct obdo);
+               param_count++;
+       }
+
+       if (hint != NULL && hint->dah_parent != NULL) {
+               sizes[param_count] = sizeof(*fid);
+               param_count++;
+       }
+
+       return update_records_update_size(param_count, sizes);
+}
+EXPORT_SYMBOL(update_records_create_size);
+
+/**
  * Pack create update
  *
  * Pack create update into update records.
@@ -304,6 +368,26 @@ int update_records_create_pack(const struct lu_env *env,
 EXPORT_SYMBOL(update_records_create_pack);
 
 /**
+ * Calculate attr set update size
+ *
+ * \param[in] env      execution environment (not used by the
+ *                     size calculation)
+ * \param[in] fid      FID of the object to set attr
+ * \param[in] attr     attribute of attr set
+ *
+ * \retval             size of attr set update.
+ */
+size_t update_records_attr_set_size(const struct lu_env *env,
+                                   const struct lu_fid *fid,
+                                   const struct lu_attr *attr)
+{
+       size_t size = sizeof(struct obdo);
+
+       return update_records_update_size(1, &size);
+}
+EXPORT_SYMBOL(update_records_attr_set_size);
+
+/**
  * Pack attr set update
  *
  * Pack attr_set update into update records.
@@ -345,6 +429,21 @@ int update_records_attr_set_pack(const struct lu_env *env,
 EXPORT_SYMBOL(update_records_attr_set_pack);
 
 /**
+ * Calculate ref add update size
+ *
+ * \param[in] env      execution environment
+ * \param[in] fid      FID of the object to add reference
+ *
+ * \retval             size of ref_add update.
+ */
+size_t update_records_ref_add_size(const struct lu_env *env,
+                                  const struct lu_fid *fid)
+{
+       return update_records_update_size(0, NULL);
+}
+EXPORT_SYMBOL(update_records_ref_add_size);
+
+/**
  * Pack ref add update
  *
  * Pack ref add update into update records.
@@ -377,6 +476,55 @@ int update_records_ref_add_pack(const struct lu_env *env,
 EXPORT_SYMBOL(update_records_ref_add_pack);
 
 /**
+ * Pack noop update
+ *
+ * Pack a no-op update into update records. Note: no-op means
+ * the update does not need to do anything; it is only used
+ * in test cases to verify large size records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_ops_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in|out] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID to pack into the no-op update
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_noop_pack(const struct lu_env *env,
+                            struct update_ops *ops,
+                            unsigned int *op_count,
+                            size_t *max_ops_size,
+                            struct update_params *params,
+                            unsigned int *param_count,
+                            size_t *max_param_size,
+                            const struct lu_fid *fid)
+{
+       return update_records_update_pack(env, fid, OUT_NOOP, ops, op_count,
+                                         max_ops_size, params, param_count,
+                                         max_param_size, 0, NULL, NULL);
+}
+EXPORT_SYMBOL(update_records_noop_pack);
+
+/**
+ * Calculate ref del update size
+ *
+ * \param[in] env      execution environment
+ * \param[in] fid      FID of the object to delete reference
+ *
+ * \retval             size of ref_del update.
+ */
+size_t update_records_ref_del_size(const struct lu_env *env,
+                                  const struct lu_fid *fid)
+{
+       return update_records_update_size(0, NULL);
+}
+EXPORT_SYMBOL(update_records_ref_del_size);
+
+/**
  * Pack ref del update
  *
  * Pack ref del update into update records.
@@ -409,6 +557,21 @@ int update_records_ref_del_pack(const struct lu_env *env,
 EXPORT_SYMBOL(update_records_ref_del_pack);
 
 /**
+ * Calculate object destroy update size
+ *
+ * \param[in] env      execution environment
+ * \param[in] fid      FID of the object to destroy
+ *
+ * \retval             size of object destroy update.
+ */
+size_t update_records_object_destroy_size(const struct lu_env *env,
+                                         const struct lu_fid *fid)
+{
+       return update_records_update_size(0, NULL);
+}
+EXPORT_SYMBOL(update_records_object_destroy_size);
+
+/**
  * Pack object destroy update
  *
  * Pack object destroy update into update records.
@@ -441,6 +604,28 @@ int update_records_object_destroy_pack(const struct lu_env *env,
 EXPORT_SYMBOL(update_records_object_destroy_pack);
 
 /**
+ * Calculate index insert update size
+ *
+ * \param[in] env      execution environment
+ * \param[in] fid      FID of the object to insert index
+ * \param[in] rec      record of insertion
+ * \param[in] key      key of insertion
+ *
+ * \retval             the size of index insert update.
+ */
+size_t update_records_index_insert_size(const struct lu_env *env,
+                                       const struct lu_fid *fid,
+                                       const struct dt_rec *rec,
+                                       const struct dt_key *key)
+{
+       size_t                     sizes[3] = { strlen((const char *)key) + 1,
+                                               sizeof(struct lu_fid),
+                                               sizeof(__u32) };
+       return update_records_update_size(3, sizes);
+}
+EXPORT_SYMBOL(update_records_index_insert_size);
+
+/**
  * Pack index insert update
  *
  * Pack index insert update into update records.
@@ -490,6 +675,25 @@ int update_records_index_insert_pack(const struct lu_env *env,
 EXPORT_SYMBOL(update_records_index_insert_pack);
 
 /**
+ * Calculate index delete update size
+ *
+ * \param[in] env      execution environment
+ * \param[in] fid      FID of the object to delete index
+ * \param[in] key      key of deletion
+ *
+ * \retval             the size of index delete update
+ */
+size_t update_records_index_delete_size(const struct lu_env *env,
+                                       const struct lu_fid *fid,
+                                       const struct dt_key *key)
+{
+       size_t size = strlen((const char *)key) + 1;
+
+       return update_records_update_size(1, &size);
+}
+EXPORT_SYMBOL(update_records_index_delete_size);
+
+/**
  * Pack index delete update
  *
  * Pack index delete update into update records.
@@ -527,6 +731,28 @@ int update_records_index_delete_pack(const struct lu_env *env,
 EXPORT_SYMBOL(update_records_index_delete_pack);
 
 /**
+ * Calculate xattr set size
+ *
+ * \param[in] env      execution environment
+ * \param[in] fid      FID of the object to set xattr
+ * \param[in] buf      xattr to be set
+ * \param[in] name     name of the xattr
+ * \param[in] flag     flag for setting xattr
+ *
+ * \retval             size of xattr set update.
+ */
+size_t update_records_xattr_set_size(const struct lu_env *env,
+                                    const struct lu_fid *fid,
+                                    const struct lu_buf *buf,
+                                    const char *name, __u32 flag)
+{
+       size_t  sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)};
+
+       return update_records_update_size(3, sizes);
+}
+EXPORT_SYMBOL(update_records_xattr_set_size);
+
+/**
  * Pack xattr set update
  *
  * Pack xattr set update into update records.
@@ -570,6 +796,25 @@ int update_records_xattr_set_pack(const struct lu_env *env,
 EXPORT_SYMBOL(update_records_xattr_set_pack);
 
 /**
+ * Calculate xattr delete update size.
+ *
+ * \param[in] env      execution environment
+ * \param[in] fid      FID of the object to delete xattr
+ * \param[in] name     name of the xattr
+ *
+ * \retval             size of xattr delete update.
+ */
+size_t update_records_xattr_del_size(const struct lu_env *env,
+                                    const struct lu_fid *fid,
+                                    const char *name)
+{
+       size_t  size = strlen(name) + 1;
+
+       return update_records_update_size(1, &size);
+}
+EXPORT_SYMBOL(update_records_xattr_del_size);
+
+/**
  * Pack xattr delete update
  *
  * Pack xattr delete update into update records.
@@ -607,6 +852,27 @@ int update_records_xattr_del_pack(const struct lu_env *env,
 EXPORT_SYMBOL(update_records_xattr_del_pack);
 
 /**
+ * Calculate write update size
+ *
+ * \param[in] env      execution environment
+ * \param[in] fid      FID of the object to write into
+ * \param[in] buf      buffer to write which includes an embedded size field
+ * \param[in] pos      offset in the object to start writing at
+ *
+ * \retval             size of write update.
+ */
+size_t update_records_write_size(const struct lu_env *env,
+                                const struct lu_fid *fid,
+                                const struct lu_buf *buf,
+                                __u64 pos)
+{
+       size_t  sizes[2] = {buf->lb_len, sizeof(pos)};
+
+       return update_records_update_size(2, sizes);
+}
+EXPORT_SYMBOL(update_records_write_size);
+
+/**
  * Pack write update
  *
  * Pack write update into update records.
@@ -649,6 +915,26 @@ int update_records_write_pack(const struct lu_env *env,
 EXPORT_SYMBOL(update_records_write_pack);
 
 /**
+ * Calculate size of punch update.
+ *
+ * \param[in] env      execution environment
+ * \param[in] fid      FID of the object to punch
+ * \param[in] start    start offset of punch
+ * \param[in] end      end offset of punch
+ *
+ * \retval             size of update punch.
+ */
+size_t update_records_punch_size(const struct lu_env *env,
+                                const struct lu_fid *fid,
+                                __u64 start, __u64 end)
+{
+       size_t  sizes[2] = {sizeof(start), sizeof(end)};
+
+       return update_records_update_size(2, sizes);
+}
+EXPORT_SYMBOL(update_records_punch_size);
+
+/**
  * Pack punch
  *
  * Pack punch update into update records.
@@ -774,8 +1060,7 @@ int tur_update_extend(struct thandle_update_records *tur,
 
        record_size = llog_update_record_size(tur->tur_update_records);
        /* extend update records buffer */
-       if (new_op_size > (tur->tur_update_records_buf_size - record_size -
-                          sizeof(*tur->tur_update_records))) {
+       if (new_op_size >= (tur->tur_update_records_buf_size - record_size)) {
                extend_size = round_up(new_op_size, UPDATE_RECORDS_BUFFER_SIZE);
                rc = tur_update_records_extend(tur,
                                tur->tur_update_records_buf_size +
@@ -787,7 +1072,7 @@ int tur_update_extend(struct thandle_update_records *tur,
        /* extend parameters buffer */
        params_size = update_params_size(tur->tur_update_params,
                                         tur->tur_update_param_count);
-       if (new_param_size > (tur->tur_update_params_buf_size -
+       if (new_param_size >= (tur->tur_update_params_buf_size -
                              params_size)) {
                extend_size = round_up(new_param_size,
                                       UPDATE_PARAMS_BUFFER_SIZE);
index 2fd87d7..cad0183 100644 (file)
@@ -193,6 +193,36 @@ dtrq_sub_lookup(struct distribute_txn_replay_req *dtrq, __u32 mdt_index)
 }
 
 /**
+ * Add cookie to sub distribute txn request
+ *
+ * Allocate a sub_thandle_cookie holding a copy of the update log cookie
+ * and add it to the dtrqs_cookie_list.
+ *
+ * \param[in] dtrqs    sub replay req where cookies to be added.
+ * \param[in] cookie   cookie to be added.
+ *
+ * \retval             0 if adding the cookie succeeds.
+ * \retval             negative errno if adding fails.
+ */
+static int dtrq_sub_add_cookie(struct distribute_txn_replay_req_sub *dtrqs,
+                              struct llog_cookie *cookie)
+{
+       struct sub_thandle_cookie *new;
+
+       OBD_ALLOC_PTR(new);
+       if (new == NULL)
+               return -ENOMEM;
+
+       INIT_LIST_HEAD(&new->stc_list);
+       new->stc_cookie = *cookie;
+       /* Note: only a single thread accesses one sub_request at a time,
+        * so no lock is needed here */
+       list_add(&new->stc_list, &dtrqs->dtrqs_cookie_list);
+
+       return 0;
+}
+
+/**
  * Insert distribute txn sub req replay
  *
  * Allocate sub replay req and insert distribute txn replay list.
@@ -209,31 +239,102 @@ dtrq_sub_create_and_insert(struct distribute_txn_replay_req *dtrq,
                           struct llog_cookie *cookie,
                           __u32 mdt_index)
 {
-       struct distribute_txn_replay_req_sub *dtrqs = NULL;
-       struct distribute_txn_replay_req_sub *new;
+       struct distribute_txn_replay_req_sub    *dtrqs = NULL;
+       struct distribute_txn_replay_req_sub    *new;
+       int                                     rc;
        ENTRY;
 
        spin_lock(&dtrq->dtrq_sub_list_lock);
        dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
        spin_unlock(&dtrq->dtrq_sub_list_lock);
-       if (dtrqs != NULL)
+       if (dtrqs != NULL) {
+               rc = dtrq_sub_add_cookie(dtrqs, cookie);
                RETURN(0);
+       }
 
        OBD_ALLOC_PTR(new);
        if (new == NULL)
                RETURN(-ENOMEM);
 
        INIT_LIST_HEAD(&new->dtrqs_list);
+       INIT_LIST_HEAD(&new->dtrqs_cookie_list);
        new->dtrqs_mdt_index = mdt_index;
-       new->dtrqs_llog_cookie = *cookie;
        spin_lock(&dtrq->dtrq_sub_list_lock);
        dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
-       if (dtrqs == NULL)
+       if (dtrqs == NULL) {
                list_add(&new->dtrqs_list, &dtrq->dtrq_sub_list);
-       else
+               dtrqs = new;
+       } else {
                OBD_FREE_PTR(new);
+       }
        spin_unlock(&dtrq->dtrq_sub_list_lock);
 
+       rc = dtrq_sub_add_cookie(dtrqs, cookie);
+
+       RETURN(rc);
+}
+
+/**
+ * Append updates to the current replay updates
+ *
+ * Append more updates to the existing replay update. This is only
+ * used when combining multiple updates into one large update during
+ * replay.
+ *
+ * \param[in] dtrq     the update replay request where the new update
+ *                      records will be added.
+ * \param[in] record   the new update record.
+ *
+ * \retval             0 if appending succeeds.
+ * \retval             negative errno if appending fails.
+ */
+static int dtrq_append_updates(struct distribute_txn_replay_req *dtrq,
+                              struct update_records *record)
+{
+       struct llog_update_record *new_lur;
+       size_t lur_size = dtrq->dtrq_lur_size;
+       void *ptr;
+       ENTRY;
+
+       /* Several threads might retrieve the same records from
+        * different targets, but only one copy of the records is
+        * needed. Append the record only if its ur_index directly
+        * follows the current one; otherwise just skip it */
+       spin_lock(&dtrq->dtrq_sub_list_lock);
+       if (dtrq->dtrq_lur->lur_update_rec.ur_index + 1 != record->ur_index) {
+               spin_unlock(&dtrq->dtrq_sub_list_lock);
+               RETURN(0);
+       }
+       dtrq->dtrq_lur->lur_update_rec.ur_index++;
+       spin_unlock(&dtrq->dtrq_sub_list_lock);
+
+       lur_size += update_records_size(record);
+       OBD_ALLOC_LARGE(new_lur, lur_size);
+       if (new_lur == NULL) {
+               spin_lock(&dtrq->dtrq_sub_list_lock);
+               dtrq->dtrq_lur->lur_update_rec.ur_index--;
+               spin_unlock(&dtrq->dtrq_sub_list_lock);
+               RETURN(-ENOMEM);
+       }
+
+       /* Copy the old and new records to the new allocated buffer */
+       memcpy(new_lur, dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+       ptr = (char *)&new_lur->lur_update_rec +
+               update_records_size(&new_lur->lur_update_rec);
+       memcpy(ptr, &record->ur_ops,
+              update_records_size(record) -
+              offsetof(struct update_records, ur_ops));
+
+       new_lur->lur_update_rec.ur_update_count += record->ur_update_count;
+       new_lur->lur_update_rec.ur_param_count += record->ur_param_count;
+       new_lur->lur_hdr.lrh_len = llog_update_record_size(new_lur);
+
+       /* Replace the records */
+       OBD_FREE_LARGE(dtrq->dtrq_lur, dtrq->dtrq_lur_size);
+       dtrq->dtrq_lur = new_lur;
+       dtrq->dtrq_lur_size = lur_size;
+       dtrq->dtrq_lur->lur_update_rec.ur_flags = record->ur_flags;
+       update_records_dump(&new_lur->lur_update_rec, D_INFO, true);
        RETURN(0);
 }
 
@@ -266,7 +367,8 @@ insert_update_records_to_replay_list(struct target_distribute_txn_data *tdtd,
        CDEBUG(D_HA, "%s: insert record batchid = "LPU64" transno = "LPU64
               " mdt_index %u\n", tdtd->tdtd_lut->lut_obd->obd_name,
               record->ur_batchid, record->ur_master_transno, mdt_index);
-again:
+
+       /* First try to build the replay update request with the records */
        spin_lock(&tdtd->tdtd_replay_list_lock);
        dtrq = dtrq_lookup(tdtd, record->ur_batchid);
        spin_unlock(&tdtd->tdtd_replay_list_lock);
@@ -286,27 +388,38 @@ again:
                spin_lock(&tdtd->tdtd_replay_list_lock);
                rc = dtrq_insert(tdtd, dtrq);
                spin_unlock(&tdtd->tdtd_replay_list_lock);
-       } else if (record->ur_master_transno != 0 &&
-                  dtrq->dtrq_lur->lur_update_rec.ur_master_transno !=
-                  record->ur_master_transno) {
-               /* If the master transno in update header is not matched with
-                * the one in the record, then it means the dtrq is originally
-                * created by master record, and we need update master transno
-                * and reposition the dtrq(by master transno). */
-               dtrq->dtrq_lur->lur_update_rec.ur_master_transno =
-                                               record->ur_master_transno;
-               list_del_init(&dtrq->dtrq_list);
-               spin_lock(&tdtd->tdtd_replay_list_lock);
-               rc = dtrq_insert(tdtd, dtrq);
-               spin_unlock(&tdtd->tdtd_replay_list_lock);
-       }
+               if (rc == -EEXIST) {
+                       /* Someone else already added the record */
+                       dtrq_destroy(dtrq);
+                       rc = 0;
+               }
+       } else {
+               struct update_records *dtrq_rec;
+
+               /* If the master transno in update header is not
+                * matched with the one in the record, then it means
+                * the dtrq is originally created by master record,
+                * and we need update master transno and reposition
+                * the dtrq(by master transno). */
+               dtrq_rec = &dtrq->dtrq_lur->lur_update_rec;
+               if (record->ur_master_transno != 0 &&
+                   dtrq_rec->ur_master_transno != record->ur_master_transno) {
+                       dtrq_rec->ur_master_transno = record->ur_master_transno;
+                       spin_lock(&tdtd->tdtd_replay_list_lock);
+                       list_del_init(&dtrq->dtrq_list);
+                       rc = dtrq_insert(tdtd, dtrq);
+                       spin_unlock(&tdtd->tdtd_replay_list_lock);
+                       if (rc < 0)
+                               return rc;
+               }
 
-       if (rc == -EEXIST) {
-               dtrq_destroy(dtrq);
-               rc = 0;
-               goto again;
+               /* This is a partial update record, so try to append
+                * it to the current replay request */
+               if (record->ur_flags & UPDATE_RECORD_CONTINUE)
+                       rc = dtrq_append_updates(dtrq, record);
        }
 
+       /* Then create and add sub update request */
        rc = dtrq_sub_create_and_insert(dtrq, cookie, mdt_index);
 
        RETURN(rc);
@@ -350,7 +463,15 @@ void dtrq_destroy(struct distribute_txn_replay_req *dtrq)
        LASSERT(list_empty(&dtrq->dtrq_list));
        spin_lock(&dtrq->dtrq_sub_list_lock);
        list_for_each_entry_safe(dtrqs, tmp, &dtrq->dtrq_sub_list, dtrqs_list) {
+               struct sub_thandle_cookie *stc;
+               struct sub_thandle_cookie *tmp;
+
                list_del(&dtrqs->dtrqs_list);
+               list_for_each_entry_safe(stc, tmp, &dtrqs->dtrqs_cookie_list,
+                                        stc_list) {
+                       list_del(&stc->stc_list);
+                       OBD_FREE_PTR(stc);
+               }
                OBD_FREE_PTR(dtrqs);
        }
        spin_unlock(&dtrq->dtrq_sub_list_lock);
@@ -496,8 +617,15 @@ static int update_is_committed(const struct lu_env *env,
        dtrqs = dtrq_sub_lookup(dtrq, mdt_index);
        if (dtrqs != NULL || top_th->tt_multiple_thandle->tmt_committed) {
                st->st_committed = 1;
-               if (dtrqs != NULL)
-                       st->st_cookie = dtrqs->dtrqs_llog_cookie;
+               if (dtrqs != NULL) {
+                       struct sub_thandle_cookie *stc;
+                       struct sub_thandle_cookie *tmp;
+
+                       list_for_each_entry_safe(stc, tmp,
+                                                &dtrqs->dtrqs_cookie_list,
+                                                stc_list)
+                               list_move(&stc->stc_list, &st->st_cookie_list);
+               }
                RETURN(0);
        }
 
@@ -886,6 +1014,9 @@ static int update_recovery_exec(const struct lu_env *env,
                struct dt_device        *sub_dt;
                struct sub_thandle      *st;
 
+               if (op->uop_type == OUT_NOOP)
+                       continue;
+
                dt_obj = dt_locate(env, tdtd->tdtd_dt, fid);
                if (IS_ERR(dt_obj)) {
                        rc = PTR_ERR(dt_obj);
index 28bf6be..001f21d 100644 (file)
@@ -81,12 +81,17 @@ static void top_multiple_thandle_dump(struct top_multiple_thandle *tmt,
               tmt->tmt_result, tmt->tmt_batchid);
 
        list_for_each_entry(st, &tmt->tmt_sub_thandle_list, st_sub_list) {
-               CDEBUG(mask, "st %p obd %s committed %d sub_th %p "
-                      " cookie "DOSTID": %u\n",
+               struct sub_thandle_cookie *stc;
+
+               CDEBUG(mask, "st %p obd %s committed %d sub_th %p\n",
                       st, st->st_dt->dd_lu_dev.ld_obd->obd_name,
-                      st->st_committed, st->st_sub_th,
-                      POSTID(&st->st_cookie.lgc_lgl.lgl_oi),
-                      st->st_cookie.lgc_index);
+                      st->st_committed, st->st_sub_th);
+
+               list_for_each_entry(stc, &st->st_cookie_list, stc_list) {
+                       CDEBUG(mask, " cookie "DOSTID": %u\n",
+                              POSTID(&stc->stc_cookie.lgc_lgl.lgl_oi),
+                              stc->stc_cookie.lgc_index);
+               }
        }
 }
 
@@ -99,16 +104,18 @@ static void top_multiple_thandle_dump(struct top_multiple_thandle *tmt,
  * \param[in] env      execution environment
  * \param[in] record   update records being written
  * \param[in] sub_th   sub transaction handle
+ * \param[in] record_size total update record size
  *
  * \retval             0 if writing succeeds
  * \retval             negative errno if writing fails
  */
 static int sub_declare_updates_write(const struct lu_env *env,
                                     struct llog_update_record *record,
-                                    struct thandle *sub_th)
+                                    struct thandle *sub_th, size_t record_size)
 {
        struct llog_ctxt        *ctxt;
        struct dt_device        *dt = sub_th->th_dev;
+       int                     left = record_size;
        int rc;
 
        /* If ctxt is NULL, it means not need to write update,
@@ -118,14 +125,24 @@ static int sub_declare_updates_write(const struct lu_env *env,
        LASSERT(ctxt != NULL);
 
        /* Not ready to record updates yet. */
-       if (ctxt->loc_handle == NULL) {
-               llog_ctxt_put(ctxt);
-               return 0;
-       }
+       if (ctxt->loc_handle == NULL)
+               GOTO(out_put, rc = 0);
 
-       rc = llog_declare_add(env, ctxt->loc_handle, &record->lur_hdr,
-                             sub_th);
+       rc = llog_declare_add(env, ctxt->loc_handle,
+                             &record->lur_hdr, sub_th);
+       if (rc < 0)
+               GOTO(out_put, rc);
+
+       while (left > ctxt->loc_chunk_size) {
+               rc = llog_declare_add(env, ctxt->loc_handle,
+                                     &record->lur_hdr, sub_th);
+               if (rc < 0)
+                       GOTO(out_put, rc);
 
+               left -= ctxt->loc_chunk_size;
+       }
+
+out_put:
        llog_ctxt_put(ctxt);
 
        return rc;
@@ -148,12 +165,21 @@ static int sub_declare_updates_write(const struct lu_env *env,
  */
 static int sub_updates_write(const struct lu_env *env,
                             struct llog_update_record *record,
-                            struct thandle *sub_th,
-                            struct llog_cookie *cookie)
+                            struct sub_thandle *sub_th)
 {
-       struct dt_device        *dt = sub_th->th_dev;
+       struct dt_device        *dt = sub_th->st_dt;
        struct llog_ctxt        *ctxt;
        int                     rc;
+       struct llog_update_record *lur = NULL;
+       struct update_params    *params = NULL;
+       __u32                   update_count = 0;
+       __u32                   param_count = 0;
+       __u32                   last_update_count = 0;
+       __u32                   last_param_count = 0;
+       void                    *src;
+       void                    *start;
+       void                    *next;
+       struct sub_thandle_cookie *stc;
        ENTRY;
 
        ctxt = llog_get_context(dt->dd_lu_dev.ld_obd,
@@ -162,10 +188,8 @@ static int sub_updates_write(const struct lu_env *env,
 
        /* Not ready to record updates yet, usually happens
         * in error handler path */
-       if (ctxt->loc_handle == NULL) {
-               llog_ctxt_put(ctxt);
-               RETURN(0);
-       }
+       if (ctxt->loc_handle == NULL)
+               GOTO(llog_put, rc = 0);
 
        /* Since the cross-MDT updates will includes both local
         * and remote updates, the update ops count must > 1 */
@@ -174,16 +198,124 @@ static int sub_updates_write(const struct lu_env *env,
                 "lrh_len %u record_size %zu\n", record->lur_hdr.lrh_len,
                 llog_update_record_size(record));
 
-       rc = llog_add(env, ctxt->loc_handle, &record->lur_hdr,
-                     cookie, sub_th);
-       llog_ctxt_put(ctxt);
+       if (likely(record->lur_hdr.lrh_len <= ctxt->loc_chunk_size)) {
+               OBD_ALLOC_PTR(stc);
+               if (stc == NULL)
+                       GOTO(llog_put, rc = -ENOMEM);
+               INIT_LIST_HEAD(&stc->stc_list);
+
+               rc = llog_add(env, ctxt->loc_handle, &record->lur_hdr,
+                             &stc->stc_cookie, sub_th->st_sub_th);
+
+               CDEBUG(D_INFO, "%s: Add update log "DOSTID":%u: rc = %d\n",
+                      dt->dd_lu_dev.ld_obd->obd_name,
+                      POSTID(&stc->stc_cookie.lgc_lgl.lgl_oi),
+                      stc->stc_cookie.lgc_index, rc);
+
+               if (rc > 0) {
+                       list_add(&stc->stc_list, &sub_th->st_cookie_list);
+                       rc = 0;
+               } else {
+                       OBD_FREE_PTR(stc);
+               }
+
+               GOTO(llog_put, rc);
+       }
 
-       CDEBUG(D_INFO, "%s: Add update log "DOSTID":%u.\n",
-              dt->dd_lu_dev.ld_obd->obd_name,
-              POSTID(&cookie->lgc_lgl.lgl_oi), cookie->lgc_index);
+       /* Split the record into update records of at most chunk_size */
+       OBD_ALLOC_LARGE(lur, ctxt->loc_chunk_size);
+       if (lur == NULL)
+               GOTO(llog_put, rc = -ENOMEM);
 
-       if (rc > 0)
-               rc = 0;
+       memcpy(lur, &record->lur_hdr, sizeof(record->lur_hdr));
+       lur->lur_update_rec.ur_update_count = 0;
+       lur->lur_update_rec.ur_param_count = 0;
+       src = &record->lur_update_rec.ur_ops;
+       start = next = src;
+       lur->lur_hdr.lrh_len = llog_update_record_size(lur);
+       params = update_records_get_params(&record->lur_update_rec);
+       do {
+               size_t rec_len;
+
+               if (update_count < record->lur_update_rec.ur_update_count) {
+                       next = update_op_next_op((struct update_op *)src);
+               } else {
+                       if (param_count == 0)
+                               next = update_records_get_params(
+                                               &record->lur_update_rec);
+                       else
+                               next = (char *)src +
+                                       object_update_param_size(
+                                       (struct object_update_param *)src);
+               }
+
+               rec_len = cfs_size_round((unsigned long)(next - src));
+               /* If the next entry would overflow the llog chunk, or every
+                * param has been counted, write this chunk to the update llog. */
+               if (lur->lur_hdr.lrh_len + rec_len + LLOG_MIN_REC_SIZE >
+                   ctxt->loc_chunk_size ||
+                   param_count == record->lur_update_rec.ur_param_count) {
+                       lur->lur_update_rec.ur_update_count =
+                               update_count > last_update_count ?
+                               update_count - last_update_count : 0;
+                       lur->lur_update_rec.ur_param_count = param_count -
+                                                            last_param_count;
+
+                       memcpy(&lur->lur_update_rec.ur_ops, start,
+                              (unsigned long)(src - start));
+                       if (last_update_count != 0)
+                               lur->lur_update_rec.ur_flags |=
+                                               UPDATE_RECORD_CONTINUE;
+
+                       update_records_dump(&lur->lur_update_rec, D_INFO, true);
+                       lur->lur_hdr.lrh_len = llog_update_record_size(lur);
+                       LASSERT(lur->lur_hdr.lrh_len <= ctxt->loc_chunk_size);
+
+                       OBD_ALLOC_PTR(stc);
+                       if (stc == NULL)
+                               GOTO(llog_put, rc = -ENOMEM);
+                       INIT_LIST_HEAD(&stc->stc_list);
+
+                       rc = llog_add(env, ctxt->loc_handle,
+                                     &lur->lur_hdr,
+                                     &stc->stc_cookie, sub_th->st_sub_th);
+
+                       CDEBUG(D_INFO, "%s: Add update log "DOSTID":%u"
+                              " rc = %d\n", dt->dd_lu_dev.ld_obd->obd_name,
+                              POSTID(&stc->stc_cookie.lgc_lgl.lgl_oi),
+                              stc->stc_cookie.lgc_index, rc);
+
+                       if (rc > 0) {
+                               list_add(&stc->stc_list,
+                                        &sub_th->st_cookie_list);
+                               rc = 0;
+                       } else {
+                               OBD_FREE_PTR(stc);
+                               GOTO(llog_put, rc);
+                       }
+
+                       last_update_count = update_count;
+                       last_param_count = param_count;
+                       start = src;
+                       lur->lur_update_rec.ur_update_count = 0;
+                       lur->lur_update_rec.ur_param_count = 0;
+                       lur->lur_hdr.lrh_len = llog_update_record_size(lur);
+               }
+
+               src = next;
+               lur->lur_hdr.lrh_len += cfs_size_round(rec_len);
+               if (update_count < record->lur_update_rec.ur_update_count)
+                       update_count++;
+               else if (param_count < record->lur_update_rec.ur_param_count)
+                       param_count++;
+               else
+                       break;
+       } while (1);
+
+llog_put:
+       if (lur != NULL)
+               OBD_FREE_LARGE(lur, ctxt->loc_chunk_size);
+       llog_ctxt_put(ctxt);
 
        RETURN(rc);
 }
@@ -308,6 +440,7 @@ struct sub_thandle *create_sub_thandle(struct top_multiple_thandle *tmt,
                RETURN(ERR_PTR(-ENOMEM));
 
        INIT_LIST_HEAD(&st->st_sub_list);
+       INIT_LIST_HEAD(&st->st_cookie_list);
        st->st_dt = dt_dev;
 
        list_add(&st->st_sub_list, &tmt->tmt_sub_thandle_list);
@@ -503,7 +636,8 @@ static int declare_updates_write(const struct lu_env *env,
                if (st->st_sub_th == NULL)
                        continue;
 
-               rc = sub_declare_updates_write(env, record, st->st_sub_th);
+               rc = sub_declare_updates_write(env, record, st->st_sub_th,
+                                              tmt->tmt_record_size);
                if (rc < 0)
                        break;
        }
@@ -814,8 +948,7 @@ int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
 
                lur = tur->tur_update_records;
                /* Write updates to the master MDT */
-               rc = sub_updates_write(env, lur, master_st->st_sub_th,
-                                      &master_st->st_cookie);
+               rc = sub_updates_write(env, lur, master_st);
 
                /* Cleanup the common parameters in the update records,
                 * master transno callback might add more parameters.
@@ -877,8 +1010,7 @@ stop_master_trans:
                            st->st_sub_th->th_result < 0)
                                continue;
 
-                       rc = sub_updates_write(env, lur, st->st_sub_th,
-                                              &st->st_cookie);
+                       rc = sub_updates_write(env, lur, st);
                        if (rc < 0) {
                                th->th_result = rc;
                                break;
@@ -1052,7 +1184,15 @@ void top_multiple_thandle_destroy(struct top_multiple_thandle *tmt)
        LASSERT(tmt->tmt_magic == TOP_THANDLE_MAGIC);
        list_for_each_entry_safe(st, tmp, &tmt->tmt_sub_thandle_list,
                                 st_sub_list) {
+               struct sub_thandle_cookie *stc;
+               struct sub_thandle_cookie *tmp;
+
                list_del(&st->st_sub_list);
+               list_for_each_entry_safe(stc, tmp, &st->st_cookie_list,
+                                        stc_list) {
+                       list_del(&stc->stc_list);
+                       OBD_FREE_PTR(stc);
+               }
                OBD_FREE_PTR(st);
        }
        OBD_FREE_PTR(tmt);
@@ -1083,23 +1223,27 @@ static int distribute_txn_cancel_records(const struct lu_env *env,
                struct llog_ctxt        *ctxt;
                struct obd_device       *obd;
                struct llog_cookie      *cookie;
+               struct sub_thandle_cookie *stc;
                int rc;
 
-               cookie = &st->st_cookie;
-               if (fid_is_zero(&cookie->lgc_lgl.lgl_oi.oi_fid))
-                       continue;
-
                obd = st->st_dt->dd_lu_dev.ld_obd;
                ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
                LASSERT(ctxt);
+               list_for_each_entry(stc, &st->st_cookie_list, stc_list) {
+                       cookie = &stc->stc_cookie;
+                       if (fid_is_zero(&cookie->lgc_lgl.lgl_oi.oi_fid))
+                               continue;
 
-               rc = llog_cat_cancel_records(env, ctxt->loc_handle, 1,
-                                            cookie);
+                       rc = llog_cat_cancel_records(env, ctxt->loc_handle, 1,
+                                                    cookie);
+                       CDEBUG(D_HA, "%s: batchid %llu cancel update log "
+                              DOSTID ".%u : rc = %d\n", obd->obd_name,
+                              tmt->tmt_batchid,
+                              POSTID(&cookie->lgc_lgl.lgl_oi),
+                              cookie->lgc_index, rc);
+               }
 
                llog_ctxt_put(ctxt);
-               CDEBUG(D_HA, "%s: batchid %llu cancel update log "DOSTID
-                      ".%u : rc = %d\n", obd->obd_name, tmt->tmt_batchid,
-                      POSTID(&cookie->lgc_lgl.lgl_oi), cookie->lgc_index, rc);
        }
 
        RETURN(0);
index df9c3e1..7d9eb05 100755 (executable)
@@ -3752,6 +3752,56 @@ test_115() {
 }
 run_test 115 "failover for create/unlink striped directory"
 
+test_116a() {
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.55) ] &&
+               skip "Do not support large update log before 2.7.55" &&
+               return 0
+       ([ $FAILURE_MODE == "HARD" ] &&
+               [ "$(facet_host mds1)" == "$(facet_host mds2)" ]) &&
+               skip "MDTs needs to be on diff hosts for HARD fail mode" &&
+               return 0
+       local fail_index=0
+
+       mkdir -p $DIR/$tdir
+       replay_barrier mds1
+
+       # OBD_FAIL_SPLIT_UPDATE_REC       0x1702
+       do_facet mds1 "lctl set_param fail_loc=0x80001702"
+       $LFS setdirstripe -c$MDSCOUNT $DIR/$tdir/striped_dir
+
+       fail mds1
+       $CHECKSTAT -t dir $DIR/$tdir/striped_dir ||
+               error "stried_dir does not exists"
+}
+run_test 116a "large update log master MDT recovery"
+
+test_116b() {
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return 0
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.55) ] &&
+               skip "Do not support large update log before 2.7.55" &&
+               return 0
+
+       ([ $FAILURE_MODE == "HARD" ] &&
+               [ "$(facet_host mds1)" == "$(facet_host mds2)" ]) &&
+               skip "MDTs needs to be on diff hosts for HARD fail mode" &&
+               return 0
+       local fail_index=0
+
+       mkdir -p $DIR/$tdir
+       replay_barrier mds2
+
+       # OBD_FAIL_SPLIT_UPDATE_REC       0x1702
+       do_facet mds2 "lctl set_param fail_loc=0x80001702"
+       $LFS setdirstripe -c$MDSCOUNT $DIR/$tdir/striped_dir
+
+       fail mds2
+       $CHECKSTAT -t dir $DIR/$tdir/striped_dir ||
+               error "stried_dir does not exists"
+}
+run_test 116b "large update log slave MDT recovery"
+
+
 complete $SECONDS
 check_and_cleanup_lustre
 exit_status
index 679a528..0a9e496 100644 (file)
@@ -13399,6 +13399,57 @@ test_300i() {
 }
 run_test 300i "client handle unknown hash type striped directory"
 
+test_300j() {
+       [ $PARALLEL == "yes" ] && skip "skip parallel run" && return
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.55) ] &&
+               skip "Need MDS version at least 2.7.55" && return
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+       local stripe_count
+       local file
+
+       mkdir $DIR/$tdir
+
+       #define OBD_FAIL_SPLIT_UPDATE_REC       0x1702
+       $LCTL set_param fail_loc=0x1702
+       $LFS setdirstripe -i 0 -c$MDSCOUNT -t all_char $DIR/$tdir/striped_dir ||
+               error "set striped dir error"
+
+       createmany -o $DIR/$tdir/striped_dir/f- 10 ||
+               error "create files under striped dir failed"
+
+       $LCTL set_param fail_loc=0
+
+       rm -rf $DIR/$tdir || error "unlink striped dir fails"
+
+       return 0
+}
+run_test 300j "test large update record"
+
+test_300k() {
+       [ $PARALLEL == "yes" ] && skip "skip parallel run" && return
+       [ $(lustre_version_code $SINGLEMDS) -lt $(version_code 2.7.55) ] &&
+               skip "Need MDS version at least 2.7.55" && return
+       [ $MDSCOUNT -lt 2 ] && skip "needs >= 2 MDTs" && return
+       local stripe_count
+       local file
+
+       mkdir $DIR/$tdir
+
+       #define OBD_FAIL_LARGE_STRIPE   0x1703
+       $LCTL set_param fail_loc=0x1703
+       $LFS setdirstripe -i 0 -c512 $DIR/$tdir/striped_dir ||
+               error "set striped dir error"
+       $LCTL set_param fail_loc=0
+
+       $LFS getdirstripe $DIR/$tdir/striped_dir ||
+               error "getstripeddir fails"
+       rm -rf $DIR/$tdir/striped_dir ||
+               error "unlink striped dir fails"
+
+       return 0
+}
+run_test 300k "test large striped directory"
+
 prepare_remote_file() {
        mkdir $DIR/$tdir/src_dir ||
                error "create remote source failed"
index 8dde0ce..c2dcbda 100644 (file)
@@ -1617,13 +1617,38 @@ static DIR *opendir_parent(char *path)
 
 static int cb_get_dirstripe(char *path, DIR *d, struct find_param *param)
 {
+       int ret;
+
+again:
        param->fp_lmv_md->lum_stripe_count = param->fp_lmv_stripe_count;
        if (param->fp_get_default_lmv)
                param->fp_lmv_md->lum_magic = LMV_USER_MAGIC;
        else
                param->fp_lmv_md->lum_magic = LMV_MAGIC_V1;
 
-       return ioctl(dirfd(d), LL_IOC_LMV_GETSTRIPE, param->fp_lmv_md);
+       ret = ioctl(dirfd(d), LL_IOC_LMV_GETSTRIPE, param->fp_lmv_md);
+       if (errno == E2BIG && ret != 0) {
+               int stripe_count;
+               int lmv_size;
+
+               stripe_count = (__u32)param->fp_lmv_md->lum_stripe_count;
+               if (stripe_count <= param->fp_lmv_stripe_count)
+                       return ret;
+
+               free(param->fp_lmv_md);
+               param->fp_lmv_stripe_count = stripe_count;
+               lmv_size = lmv_user_md_size(stripe_count, LMV_MAGIC_V1);
+               param->fp_lmv_md = malloc(lmv_size);
+               if (param->fp_lmv_md == NULL) {
+                       llapi_error(LLAPI_MSG_ERROR, -ENOMEM,
+                                   "error: allocation of %d bytes for ioctl",
+                                   lmv_user_md_size(param->fp_lmv_stripe_count,
+                                                    LMV_MAGIC_V1));
+                       return -ENOMEM;
+               }
+               goto again;
+       }
+       return ret;
 }
 
 static int get_lmd_info(char *path, DIR *parent, DIR *dir,
index 46cbd53..aa458fe 100644 (file)
@@ -2167,6 +2167,52 @@ static void check_lfsck_reply(void)
        CHECK_MEMBER(lfsck_reply, lr_padding_2);
 }
 
+static void check_update_params(void)
+{
+       BLANK_LINE();
+       CHECK_STRUCT(update_params);
+       CHECK_MEMBER(update_params, up_params);
+}
+
+static void check_update_op(void)
+{
+       BLANK_LINE();
+       CHECK_STRUCT(update_op);
+       CHECK_MEMBER(update_op, uop_fid);
+       CHECK_MEMBER(update_op, uop_type);
+       CHECK_MEMBER(update_op, uop_param_count);
+       CHECK_MEMBER(update_op, uop_params_off);
+}
+
+static void check_update_ops(void)
+{
+       BLANK_LINE();
+       CHECK_STRUCT(update_ops);
+       CHECK_MEMBER(update_ops, uops_op);
+}
+
+static void check_update_records(void)
+{
+       BLANK_LINE();
+       CHECK_STRUCT(update_records);
+       CHECK_MEMBER(update_records, ur_master_transno);
+       CHECK_MEMBER(update_records, ur_batchid);
+       CHECK_MEMBER(update_records, ur_flags);
+       CHECK_MEMBER(update_records, ur_index);
+       CHECK_MEMBER(update_records, ur_update_count);
+       CHECK_MEMBER(update_records, ur_param_count);
+
+       CHECK_VALUE_X(UPDATE_RECORD_CONTINUE);
+}
+
+static void check_llog_update_record(void)
+{
+       BLANK_LINE();
+       CHECK_STRUCT(llog_update_record);
+       CHECK_MEMBER(llog_update_record, lur_hdr);
+       CHECK_MEMBER(llog_update_record, lur_update_rec);
+}
+
 static void system_string(char *cmdline, char *str, int len)
 {
        int   fds[2];
index 435dfc6..7497a91 100644 (file)
@@ -3738,18 +3738,6 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct llog_log_hdr, llh_tgtuuid));
        LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_tgtuuid) == 40, "found %lld\n",
                 (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_tgtuuid));
-       LASSERTF((int)offsetof(struct llog_log_hdr, llh_reserved) == 84, "found %lld\n",
-                (long long)(int)offsetof(struct llog_log_hdr, llh_reserved));
-       LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_reserved) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_reserved));
-       LASSERTF((int)offsetof(struct llog_log_hdr, llh_bitmap) == 88, "found %lld\n",
-                (long long)(int)offsetof(struct llog_log_hdr, llh_bitmap));
-       LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_bitmap) == 8096, "found %lld\n",
-                (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_bitmap));
-       LASSERTF((int)offsetof(struct llog_log_hdr, llh_tail) == 8184, "found %lld\n",
-                (long long)(int)offsetof(struct llog_log_hdr, llh_tail));
-       LASSERTF((int)sizeof(((struct llog_log_hdr *)0)->llh_tail) == 8, "found %lld\n",
-                (long long)(int)sizeof(((struct llog_log_hdr *)0)->llh_tail));
 
        /* Checks for struct llog_cookie */
        LASSERTF((int)sizeof(struct llog_cookie) == 32, "found %lld\n",
@@ -4093,10 +4081,10 @@ void lustre_assert_wire_constants(void)
                 (long long)(int)offsetof(struct fiemap_extent, fe_flags));
        LASSERTF((int)sizeof(((struct fiemap_extent *)0)->fe_flags) == 4, "found %lld\n",
                 (long long)(int)sizeof(((struct fiemap_extent *)0)->fe_flags));
-       LASSERTF((int)offsetof(struct fiemap_extent, fe_device) == 44, "found %lld\n",
-                (long long)(int)offsetof(struct fiemap_extent, fe_device));
-       LASSERTF((int)sizeof(((struct fiemap_extent *)0)->fe_device) == 4, "found %lld\n",
-                (long long)(int)sizeof(((struct fiemap_extent *)0)->fe_device));
+       LASSERTF((int)offsetof(struct fiemap_extent, fe_reserved[0]) == 44, "found %lld\n",
+                (long long)(int)offsetof(struct fiemap_extent, fe_reserved[0]));
+       LASSERTF((int)sizeof(((struct fiemap_extent *)0)->fe_reserved[0]) == 4, "found %lld\n",
+                (long long)(int)sizeof(((struct fiemap_extent *)0)->fe_reserved[0]));
        CLASSERT(FIEMAP_EXTENT_LAST == 0x00000001);
        CLASSERT(FIEMAP_EXTENT_UNKNOWN == 0x00000002);
        CLASSERT(FIEMAP_EXTENT_DELALLOC == 0x00000004);