Whamcloud - gitweb
LU-3536 lod: record update for cross-MDT operation
[fs/lustre-release.git] / lustre / target / update_records.c
diff --git a/lustre/target/update_records.c b/lustre/target/update_records.c
new file mode 100644 (file)
index 0000000..836f150
--- /dev/null
@@ -0,0 +1,965 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2014, Intel Corporation.
+ */
+
+/*
+ * lustre/target/update_records.c
+ *
+ * This file implement the methods to pack updates as update records, which
+ * will be written to the disk as llog record, and might be used during
+ * recovery.
+ *
+ * For cross-MDT operation, all of updates of the operation needs to be
+ * recorded in the disk, then during recovery phase, the recovery thread
+ * will retrieve and redo these updates if it needed.
+ *
+ * See comments above struct update_records for the format of update_records.
+ *
+ * Author: Di Wang <di.wang@intel.com>
+ */
+#define DEBUG_SUBSYSTEM S_CLASS
+
+#include <lu_target.h>
+#include <lustre_update.h>
+#include <obd.h>
+#include <obd_class.h>
+#include "tgt_internal.h"
+
+#define UPDATE_RECORDS_BUFFER_SIZE     8192
+#define UPDATE_PARAMS_BUFFER_SIZE      8192
+/**
+ * Dump update record.
+ *
+ * Dump all of updates in the update_records, mostly for debugging purpose.
+ *
+ * \param[in] records  update records to be dumpped
+ * \param[in] mask     debug level mask
+ * \param[in] dump_params if dump all of updates the updates.
+ *
+ */
+void update_records_dump(const struct update_records *records,
+                        unsigned int mask, bool dump_updates)
+{
+       const struct update_ops *ops;
+       const struct update_op  *op = NULL;
+       struct update_params    *params;
+       unsigned int            i;
+
+       ops = &records->ur_ops;
+       params = update_records_get_params(records);
+
+       CDEBUG(mask, "ops = %d params_count = %d\n", records->ur_update_count,
+              records->ur_param_count);
+
+       if (records->ur_update_count == 0)
+               return;
+
+       if (!dump_updates)
+               return;
+
+       op = &ops->uops_op[0];
+       for (i = 0; i < records->ur_update_count; i++) {
+               unsigned int j;
+
+               CDEBUG(mask, "update %dth "DFID" %s params_count = %hu\n", i,
+                      PFID(&op->uop_fid), update_op_str(op->uop_type),
+                      op->uop_param_count);
+
+               for (j = 0;  j < op->uop_param_count; j++) {
+                       struct object_update_param *param;
+
+                       param = update_params_get_param(params,
+                                       (unsigned int)op->uop_params_off[j],
+                                       records->ur_param_count);
+
+                       LASSERT(param != NULL);
+                       CDEBUG(mask, "param = %p %dth off = %hu size = %hu\n",
+                              param, j, op->uop_params_off[j], param->oup_len);
+               }
+
+               op = update_op_next_op(op);
+       }
+}
+
+/**
+ * Pack parameters to update records
+ *
+ * Find and insert parameter to update records, if the parameter
+ * already exists in \a params, then just return the offset of this
+ * parameter, otherwise insert the parameter and return its offset
+ *
+ * \param[in] params   update params in which to insert parameter
+ * \param[in] new_param        parameters to be inserted.
+ * \param[in] new_param_size   the size of \a new_param
+ *
+ * \retval             index inside \a params if parameter insertion
+ *                      succeeds.
+ * \retval             negative errno if it fails.
+ */
+static unsigned int update_records_param_pack(struct update_params *params,
+                                             const void *new_param,
+                                             size_t new_param_size,
+                                             unsigned int *param_count)
+{
+       struct object_update_param      *param;
+       unsigned int                    i;
+
+       for (i = 0; i < *param_count; i++) {
+               struct object_update_param *param;
+
+               param = update_params_get_param(params, i, *param_count);
+               if ((new_param == NULL && param->oup_len == new_param_size) ||
+                   (param->oup_len == new_param_size &&
+                    memcmp(param->oup_buf, new_param, new_param_size) == 0))
+                       /* Found the parameter and return its index */
+                       return i;
+       }
+
+       param = (struct object_update_param *)((char *)params +
+                               update_params_size(params, *param_count));
+
+       param->oup_len = new_param_size;
+       if (new_param != NULL)
+               memcpy(param->oup_buf, new_param, new_param_size);
+
+       *param_count = *param_count + 1;
+
+       return *param_count - 1;
+}
+
+/**
+ * Pack update to update records
+ *
+ * Pack the update and its parameters to the update records. First it will
+ * insert parameters, get the offset of these parameter, then fill the
+ * update with these offset. If insertion exceed the maximum size of
+ * current update records, it will return -E2BIG here, and the caller might
+ * extend the update_record size \see lod_updates_pack.
+ *
+ * \param[in] env      execution environment
+ * \param[in] fid      FID of the update.
+ * \param[in] op_type  operation type of the update
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count     pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in|out] param_count  pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] param_bufs       buffers of parameters
+ * \param[in] params_buf_count the count of the parameter buffers
+ * \param[in] param_size       sizes of parameters
+ *
+ * \retval             0 if packing succeeds
+ * \retval             negative errno if packing fails
+ */
+static int update_records_update_pack(const struct lu_env *env,
+                                     const struct lu_fid *fid,
+                                     enum update_type op_type,
+                                     struct update_ops *ops,
+                                     unsigned int *op_count,
+                                     size_t *max_op_size,
+                                     struct update_params *params,
+                                     unsigned int *param_count,
+                                     size_t *max_param_size,
+                                     unsigned int param_bufs_count,
+                                     const void **param_bufs,
+                                     size_t *param_sizes)
+{
+       struct update_op        *op;
+       size_t                  total_param_sizes = 0;
+       int                     index;
+       unsigned int            i;
+
+       /* Check whether the packing exceeding the maximum update size */
+       if (unlikely(*max_op_size < update_op_size(param_bufs_count))) {
+               CDEBUG(D_INFO, "max_op_size = %zu update_op = %zu\n",
+                      *max_op_size, update_op_size(param_bufs_count));
+               *max_op_size = update_op_size(param_bufs_count);
+               return -E2BIG;
+       }
+
+       for (i = 0; i < param_bufs_count; i++)
+               total_param_sizes +=
+                       cfs_size_round(sizeof(struct object_update_param) +
+                                      param_sizes[i]);
+
+       /* Check whether the packing exceeding the maximum parameter size */
+       if (unlikely(*max_param_size < total_param_sizes)) {
+               CDEBUG(D_INFO, "max_param_size = %zu params size = %zu\n",
+                      *max_param_size, total_param_sizes);
+
+               *max_param_size = total_param_sizes;
+               return -E2BIG;
+       }
+
+       op = update_ops_get_op(ops, *op_count, *op_count);
+       op->uop_fid = *fid;
+       op->uop_type = op_type;
+       op->uop_param_count = param_bufs_count;
+       for (i = 0; i < param_bufs_count; i++) {
+               index = update_records_param_pack(params, param_bufs[i],
+                                                 param_sizes[i], param_count);
+               if (index < 0)
+                       return index;
+
+               CDEBUG(D_INFO, "%s %uth param offset = %d size = %zu\n",
+                      update_op_str(op_type), i, index, param_sizes[i]);
+
+               op->uop_params_off[i] = index;
+       }
+       CDEBUG(D_INFO, "%huth "DFID" %s param_count = %u\n",
+              *op_count, PFID(fid), update_op_str(op_type), *param_count);
+
+       *op_count = *op_count + 1;
+
+       return 0;
+}
+
+/**
+ * Pack create update
+ *
+ * Pack create update into update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count     pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in|out] param_count  pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to be created
+ * \param[in] attr     attribute of the object to be created
+ * \param[in] hint     creation hint
+ * \param[in] dof      creation format information
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_create_pack(const struct lu_env *env,
+                              struct update_ops *ops,
+                              unsigned int *op_count,
+                              size_t *max_ops_size,
+                              struct update_params *params,
+                              unsigned int *param_count,
+                              size_t *max_param_size,
+                              const struct lu_fid *fid,
+                              const struct lu_attr *attr,
+                              const struct dt_allocation_hint *hint,
+                              struct dt_object_format *dof)
+{
+       size_t                  sizes[2];
+       const void              *bufs[2];
+       int                     buf_count = 0;
+       const struct lu_fid     *parent_fid = NULL;
+       struct lu_fid           tmp_fid;
+       int                     rc;
+       struct obdo             *obdo;
+
+       if (attr != NULL) {
+               obdo = &update_env_info(env)->uti_obdo;
+               obdo->o_valid = 0;
+               obdo_from_la(obdo, attr, attr->la_valid);
+               lustre_set_wire_obdo(NULL, obdo, obdo);
+               bufs[buf_count] = obdo;
+               sizes[buf_count] = sizeof(*obdo);
+               buf_count++;
+       }
+
+       if (hint != NULL && hint->dah_parent != NULL) {
+               parent_fid = lu_object_fid(&hint->dah_parent->do_lu);
+               fid_cpu_to_le(&tmp_fid, parent_fid);
+               bufs[buf_count] = &tmp_fid;
+               sizes[buf_count] = sizeof(tmp_fid);
+               buf_count++;
+       }
+
+       rc = update_records_update_pack(env, fid, OUT_CREATE, ops, op_count,
+                                       max_ops_size, params, param_count,
+                                       max_param_size, buf_count, bufs, sizes);
+       return rc;
+}
+EXPORT_SYMBOL(update_records_create_pack);
+
+/**
+ * Pack attr set update
+ *
+ * Pack attr_set update into update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in|out] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to set attr
+ * \param[in] attr     attribute of attr set
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_attr_set_pack(const struct lu_env *env,
+                                struct update_ops *ops,
+                                unsigned int *op_count,
+                                size_t *max_ops_size,
+                                struct update_params *params,
+                                unsigned int *param_count,
+                                size_t *max_param_size,
+                                const struct lu_fid *fid,
+                                const struct lu_attr *attr)
+{
+       struct obdo *obdo = &update_env_info(env)->uti_obdo;
+       size_t size = sizeof(*obdo);
+
+       obdo->o_valid = 0;
+       obdo_from_la(obdo, attr, attr->la_valid);
+       lustre_set_wire_obdo(NULL, obdo, obdo);
+       return update_records_update_pack(env, fid, OUT_ATTR_SET, ops, op_count,
+                                         max_ops_size, params, param_count,
+                                         max_param_size, 1,
+                                         (const void **)&obdo, &size);
+}
+EXPORT_SYMBOL(update_records_attr_set_pack);
+
+/**
+ * Pack ref add update
+ *
+ * Pack ref add update into update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in|out] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to add reference
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_ref_add_pack(const struct lu_env *env,
+                               struct update_ops *ops,
+                               unsigned int *op_count,
+                               size_t *max_ops_size,
+                               struct update_params *params,
+                               unsigned int *param_count,
+                               size_t *max_param_size,
+                               const struct lu_fid *fid)
+{
+       return update_records_update_pack(env, fid, OUT_REF_ADD, ops, op_count,
+                                         max_ops_size, params, param_count,
+                                         max_param_size, 0, NULL, NULL);
+}
+EXPORT_SYMBOL(update_records_ref_add_pack);
+
+/**
+ * Pack ref del update
+ *
+ * Pack ref del update into update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count     pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in] param_count      pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to delete reference
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_ref_del_pack(const struct lu_env *env,
+                               struct update_ops *ops,
+                               unsigned int *op_count,
+                               size_t *max_ops_size,
+                               struct update_params *params,
+                               unsigned int *param_count,
+                               size_t *max_param_size,
+                               const struct lu_fid *fid)
+{
+       return update_records_update_pack(env, fid, OUT_REF_DEL, ops, op_count,
+                                         max_ops_size, params, param_count,
+                                         max_param_size, 0, NULL, NULL);
+}
+EXPORT_SYMBOL(update_records_ref_del_pack);
+
+/**
+ * Pack object destroy update
+ *
+ * Pack object destroy update into update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in|out] param_count  pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to delete reference
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_object_destroy_pack(const struct lu_env *env,
+                                      struct update_ops *ops,
+                                      unsigned int *op_count,
+                                      size_t *max_ops_size,
+                                      struct update_params *params,
+                                      unsigned int *param_count,
+                                      size_t *max_param_size,
+                                      const struct lu_fid *fid)
+{
+       return update_records_update_pack(env, fid, OUT_DESTROY, ops, op_count,
+                                         max_ops_size, params, param_count,
+                                         max_param_size, 0, NULL, NULL);
+}
+EXPORT_SYMBOL(update_records_object_destroy_pack);
+
+/**
+ * Pack index insert update
+ *
+ * Pack index insert update into update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in] param_count      pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to insert index
+ * \param[in] rec      record of insertion
+ * \param[in] key      key of insertion
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_index_insert_pack(const struct lu_env *env,
+                                    struct update_ops *ops,
+                                    unsigned int *op_count,
+                                    size_t *max_ops_size,
+                                    struct update_params *params,
+                                    unsigned int *param_count,
+                                    size_t *max_param_size,
+                                    const struct lu_fid *fid,
+                                    const struct dt_rec *rec,
+                                    const struct dt_key *key)
+{
+       struct dt_insert_rec       *rec1 = (struct dt_insert_rec *)rec;
+       struct lu_fid              rec_fid;
+       __u32                      type = cpu_to_le32(rec1->rec_type);
+       size_t                     sizes[3] = { strlen((const char *)key) + 1,
+                                               sizeof(rec_fid),
+                                               sizeof(type) };
+       const void                 *bufs[3] = { key,
+                                               &rec_fid,
+                                               &type };
+
+       fid_cpu_to_le(&rec_fid, rec1->rec_fid);
+
+       return update_records_update_pack(env, fid, OUT_INDEX_INSERT, ops,
+                                         op_count, max_ops_size, params,
+                                         param_count, max_param_size,
+                                         3, bufs, sizes);
+}
+EXPORT_SYMBOL(update_records_index_insert_pack);
+
+/**
+ * Pack index delete update
+ *
+ * Pack index delete update into update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count     pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in|ount] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to delete index
+ * \param[in] key      key of deletion
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_index_delete_pack(const struct lu_env *env,
+                                    struct update_ops *ops,
+                                    unsigned int *op_count,
+                                    size_t *max_ops_size,
+                                    struct update_params *params,
+                                    unsigned int *param_count,
+                                    size_t *max_param_size,
+                                    const struct lu_fid *fid,
+                                    const struct dt_key *key)
+{
+       size_t size = strlen((const char *)key) + 1;
+
+       return update_records_update_pack(env, fid, OUT_INDEX_DELETE, ops,
+                                         op_count, max_ops_size, params,
+                                         param_count, max_param_size,
+                                         1, (const void **)&key, &size);
+}
+EXPORT_SYMBOL(update_records_index_delete_pack);
+
+/**
+ * Pack xattr set update
+ *
+ * Pack xattr set update into update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count     pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in|out] param_count  pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to set xattr
+ * \param[in] buf      xattr to be set
+ * \param[in] name     name of the xattr
+ * \param[in] flag     flag for setting xattr
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_xattr_set_pack(const struct lu_env *env,
+                                 struct update_ops *ops,
+                                 unsigned int *op_count,
+                                 size_t *max_ops_size,
+                                 struct update_params *params,
+                                 unsigned int *param_count,
+                                 size_t *max_param_size,
+                                 const struct lu_fid *fid,
+                                 const struct lu_buf *buf, const char *name,
+                                 __u32 flag)
+{
+       size_t  sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)};
+       const void *bufs[3] = {name, buf->lb_buf, &flag};
+
+       flag = cpu_to_le32(flag);
+
+       return update_records_update_pack(env, fid, OUT_XATTR_SET, ops,
+                                         op_count, max_ops_size, params,
+                                         param_count, max_param_size,
+                                         3, bufs, sizes);
+}
+EXPORT_SYMBOL(update_records_xattr_set_pack);
+
+/**
+ * Pack xattr delete update
+ *
+ * Pack xattr delete update into update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count     pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in|out] param_count  pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to delete xattr
+ * \param[in] name     name of the xattr
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_xattr_del_pack(const struct lu_env *env,
+                                 struct update_ops *ops,
+                                 unsigned int *op_count,
+                                 size_t *max_ops_size,
+                                 struct update_params *params,
+                                 unsigned int *param_count,
+                                 size_t *max_param_size,
+                                 const struct lu_fid *fid,
+                                 const char *name)
+{
+       size_t  size = strlen(name) + 1;
+
+       return update_records_update_pack(env, fid, OUT_XATTR_DEL, ops,
+                                         op_count, max_ops_size, params,
+                                         param_count, max_param_size,
+                                         1, (const void **)&name, &size);
+}
+EXPORT_SYMBOL(update_records_xattr_del_pack);
+
+/**
+ * Pack write update
+ *
+ * Pack write update into update records.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count     pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in|out] param_count  pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to write into
+ * \param[in] buf      buffer to write which includes an embedded size field
+ * \param[in] pos      offet in the object to start writing at
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_write_pack(const struct lu_env *env,
+                             struct update_ops *ops,
+                             unsigned int *op_count,
+                             size_t *max_ops_size,
+                             struct update_params *params,
+                             unsigned int *param_count,
+                             size_t *max_param_size,
+                             const struct lu_fid *fid,
+                             const struct lu_buf *buf,
+                             __u64 pos)
+{
+       size_t          sizes[2] = {buf->lb_len, sizeof(pos)};
+       const void      *bufs[2] = {buf->lb_buf, &pos};
+
+       pos = cpu_to_le64(pos);
+
+       return update_records_update_pack(env, fid, OUT_XATTR_DEL, ops,
+                                         op_count, max_ops_size, params,
+                                         param_count, max_param_size,
+                                         2, bufs, sizes);
+}
+EXPORT_SYMBOL(update_records_write_pack);
+
+/**
+ * Create update records in thandle_update_records
+ *
+ * Allocate update_records for thandle_update_records, the initial size
+ * will be 4KB.
+ *
+ * \param[in] tur      thandle_update_records where update_records will be
+ *                      allocated
+ * \retval             0 if allocation succeeds.
+ * \retval             negative errno if allocation fails.
+ */
+static int tur_update_records_create(struct thandle_update_records *tur)
+{
+       if (tur->tur_update_records != NULL)
+               return 0;
+
+       OBD_ALLOC_LARGE(tur->tur_update_records,
+                       UPDATE_RECORDS_BUFFER_SIZE);
+
+       if (tur->tur_update_records == NULL)
+               return -ENOMEM;
+
+       tur->tur_update_records_buf_size = UPDATE_RECORDS_BUFFER_SIZE;
+
+       return 0;
+}
+
+/**
+ * Extend update records
+ *
+ * Extend update_records to the new size in thandle_update_records.
+ *
+ * \param[in] tur      thandle_update_records where update_records will be
+ *                      extended.
+ * \retval             0 if extension succeeds.
+ * \retval             negative errno if extension fails.
+ */
+int tur_update_records_extend(struct thandle_update_records *tur,
+                             size_t new_size)
+{
+       struct llog_update_record       *record;
+
+       OBD_ALLOC_LARGE(record, new_size);
+       if (record == NULL)
+               return -ENOMEM;
+
+       if (tur->tur_update_records != NULL) {
+               memcpy(record, tur->tur_update_records,
+                      tur->tur_update_records_buf_size);
+               OBD_FREE_LARGE(tur->tur_update_records,
+                              tur->tur_update_records_buf_size);
+       }
+
+       tur->tur_update_records = record;
+       tur->tur_update_records_buf_size = new_size;
+
+       return 0;
+}
+EXPORT_SYMBOL(tur_update_records_extend);
+
+/**
+ * Extend update records
+ *
+ * Extend update records in thandle to make sure it is able to hold
+ * the update with certain update_op and params size.
+ *
+ * \param [in] tur     thandle_update_records to be extend
+ * \param [in] new_op_size update_op size of the update record
+ * \param [in] new_param_size params size of the update record
+ *
+ * \retval             0 if the update_records is being extended.
+ * \retval             negative errno if the update_records is not being
+ *                      extended.
+ */
+int tur_update_extend(struct thandle_update_records *tur,
+                     size_t new_op_size, size_t new_param_size)
+{
+       size_t record_size;
+       size_t params_size;
+       size_t extend_size;
+       int rc;
+       ENTRY;
+
+       record_size = llog_update_record_size(tur->tur_update_records);
+       /* extend update records buffer */
+       if (new_op_size > (tur->tur_update_records_buf_size - record_size -
+                          sizeof(*tur->tur_update_records))) {
+               extend_size = round_up(new_op_size, UPDATE_RECORDS_BUFFER_SIZE);
+               rc = tur_update_records_extend(tur,
+                               tur->tur_update_records_buf_size +
+                               extend_size);
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
+       /* extend parameters buffer */
+       params_size = update_params_size(tur->tur_update_params,
+                                        tur->tur_update_param_count);
+       if (new_param_size > (tur->tur_update_params_buf_size -
+                             params_size)) {
+               extend_size = round_up(new_param_size,
+                                      UPDATE_PARAMS_BUFFER_SIZE);
+               rc = tur_update_params_extend(tur,
+                               tur->tur_update_params_buf_size +
+                               extend_size);
+               if (rc != 0)
+                       RETURN(rc);
+       }
+
+       RETURN(0);
+}
+EXPORT_SYMBOL(tur_update_extend);
+
+/**
+ * Create update params in thandle_update_records
+ *
+ * Allocate update_params for thandle_update_records, the initial size
+ * will be 4KB.
+ *
+ * \param[in] tur      thandle_update_records where update_params will be
+ *                      allocated
+ * \retval             0 if allocation succeeds.
+ * \retval             negative errno if allocation fails.
+ */
+static int tur_update_params_create(struct thandle_update_records *tur)
+{
+       if (tur->tur_update_params != NULL)
+               return 0;
+
+       OBD_ALLOC_LARGE(tur->tur_update_params, UPDATE_PARAMS_BUFFER_SIZE);
+       if (tur->tur_update_params == NULL)
+               return -ENOMEM;
+
+       tur->tur_update_params_buf_size = UPDATE_PARAMS_BUFFER_SIZE;
+       return 0;
+}
+
+/**
+ * Extend update params
+ *
+ * Extend update_params to the new size in thandle_update_records.
+ *
+ * \param[in] tur      thandle_update_records where update_params will be
+ *                      extended.
+ * \retval             0 if extension succeeds.
+ * \retval             negative errno if extension fails.
+ */
+int tur_update_params_extend(struct thandle_update_records *tur,
+                            size_t new_size)
+{
+       struct update_params    *params;
+
+       OBD_ALLOC_LARGE(params, new_size);
+       if (params == NULL)
+               return -ENOMEM;
+
+       if (tur->tur_update_params != NULL) {
+               memcpy(params, tur->tur_update_params,
+                      tur->tur_update_params_buf_size);
+               OBD_FREE_LARGE(tur->tur_update_params,
+                              tur->tur_update_params_buf_size);
+       }
+
+       tur->tur_update_params = params;
+       tur->tur_update_params_buf_size = new_size;
+
+       return 0;
+}
+EXPORT_SYMBOL(tur_update_params_extend);
+
+/**
+ * Check and prepare whether it needs to record update.
+ *
+ * Checks if the transaction needs to record updates, and if it
+ * does, then initialize the update record buffer in the transaction.
+ *
+ * \param[in] env      execution environment
+ * \param[in] th       transaction handle
+ *
+ * \retval             0 if updates recording succeeds.
+ * \retval             negative errno if updates recording fails.
+ */
+int check_and_prepare_update_record(const struct lu_env *env,
+                                   struct thandle *th)
+{
+       struct thandle_update_records   *tur;
+       struct llog_update_record       *lur;
+       struct top_thandle              *top_th;
+       struct sub_thandle              *lst;
+       int                             rc;
+       bool                            record_update = false;
+       ENTRY;
+
+       top_th = container_of(th, struct top_thandle, tt_super);
+       /* Check if it needs to record updates for this transaction */
+       list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) {
+               if (lst->st_record_update) {
+                       record_update = true;
+                       break;
+               }
+       }
+       if (!record_update)
+               RETURN(0);
+
+       if (top_th->tt_update_records != NULL)
+               RETURN(0);
+
+       tur = &update_env_info(env)->uti_tur;
+       if (tur->tur_update_records == NULL) {
+               rc = tur_update_records_create(tur);
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
+       if (tur->tur_update_params == NULL) {
+               rc = tur_update_params_create(tur);
+               if (rc < 0)
+                       RETURN(rc);
+       }
+
+       lur = tur->tur_update_records;
+       lur->lur_update_rec.ur_update_count = 0;
+       lur->lur_update_rec.ur_param_count = 0;
+       lur->lur_update_rec.ur_master_transno = 0;
+       lur->lur_update_rec.ur_batchid = 0;
+       lur->lur_update_rec.ur_flags = 0;
+
+       tur->tur_update_param_count = 0;
+
+       top_th->tt_update_records = tur;
+
+       RETURN(0);
+}
+EXPORT_SYMBOL(check_and_prepare_update_record);
+
+/**
+ * Merge params into the update records
+ *
+ * Merge params and ops into the update records attached to top th.
+ * During transaction execution phase, parameters and update ops
+ * are collected in two different buffers (see lod_updates_pack()),
+ * then in transaction stop, it needs to be merged them in one
+ * buffer, then being written into the update log.
+ *
+ * \param[in] env      execution environment
+ * \param[in] tur      thandle update records whose updates and
+ *                      parameters are merged
+ *
+ * \retval             0 if merging succeeds.
+ * \retval             negaitive errno if merging fails.
+ */
+int merge_params_updates_buf(const struct lu_env *env,
+                            struct thandle_update_records *tur)
+{
+       struct llog_update_record *lur;
+       struct update_params *params;
+       size_t params_size;
+       size_t record_size;
+
+       if (tur->tur_update_records == NULL ||
+           tur->tur_update_params == NULL)
+               return 0;
+
+       lur = tur->tur_update_records;
+       /* Extends the update records buffer if needed */
+       params_size = update_params_size(tur->tur_update_params,
+                                        tur->tur_update_param_count);
+       record_size = llog_update_record_size(lur);
+       if (record_size + params_size > tur->tur_update_records_buf_size) {
+               int rc;
+
+               rc = tur_update_records_extend(tur, record_size + params_size);
+               if (rc < 0)
+                       return rc;
+               lur = tur->tur_update_records;
+       }
+
+       params = update_records_get_params(&lur->lur_update_rec);
+       memcpy(params, tur->tur_update_params, params_size);
+       lur->lur_update_rec.ur_param_count = tur->tur_update_param_count;
+       return 0;
+}
+EXPORT_SYMBOL(merge_params_updates_buf);
+
+static void update_key_fini(const struct lu_context *ctx,
+                           struct lu_context_key *key, void *data)
+{
+       struct update_thread_info *info = data;
+
+       if (info->uti_tur.tur_update_records != NULL)
+               OBD_FREE_LARGE(info->uti_tur.tur_update_records,
+                              info->uti_tur.tur_update_records_buf_size);
+       if (info->uti_tur.tur_update_params != NULL)
+               OBD_FREE_LARGE(info->uti_tur.tur_update_params,
+                              info->uti_tur.tur_update_params_buf_size);
+
+       OBD_FREE_PTR(info);
+}
+
+/* context key constructor/destructor: update_key_init, update_key_fini */
+LU_KEY_INIT(update, struct update_thread_info);
+/* context key: update_thread_key */
+LU_CONTEXT_KEY_DEFINE(update, LCT_MD_THREAD | LCT_MG_THREAD |
+                             LCT_DT_THREAD | LCT_TX_HANDLE | LCT_LOCAL);
+EXPORT_SYMBOL(update_thread_key);
+LU_KEY_INIT_GENERIC(update);
+
+void update_info_init(void)
+{
+       update_key_init_generic(&update_thread_key, NULL);
+       lu_context_key_register(&update_thread_key);
+}
+
+void update_info_fini(void)
+{
+       lu_context_key_degister(&update_thread_key);
+}