Whamcloud - gitweb
LU-6401 headers: Create a header for obdo related functions
[fs/lustre-release.git] / lustre / target / update_records.c
index 63d9dc7..87bd7e7 100644 (file)
@@ -20,7 +20,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2014, Intel Corporation.
+ * Copyright (c) 2014, 2015, Intel Corporation.
  */
 
 /*
 #define DEBUG_SUBSYSTEM S_CLASS
 
 #include <lu_target.h>
+#include <lustre_obdo.h>
 #include <lustre_update.h>
 #include <obd.h>
 #include <obd_class.h>
+
 #include "tgt_internal.h"
 
 #define UPDATE_RECORDS_BUFFER_SIZE     8192
@@ -63,13 +65,12 @@ void update_records_dump(const struct update_records *records,
 {
        const struct update_ops *ops;
        const struct update_op  *op = NULL;
-       struct update_params    *params;
+       struct update_params    *params = NULL;
        unsigned int            i;
 
-       ops = &records->ur_ops;
-       params = update_records_get_params(records);
-
-       CDEBUG(mask, "ops = %d params_count = %d\n", records->ur_update_count,
+       CDEBUG(mask, "master transno = "LPU64" batchid = "LPU64" flags = %x"
+              " ops = %d params = %d\n", records->ur_master_transno,
+              records->ur_batchid, records->ur_flags, records->ur_update_count,
               records->ur_param_count);
 
        if (records->ur_update_count == 0)
@@ -78,27 +79,34 @@ void update_records_dump(const struct update_records *records,
        if (!dump_updates)
                return;
 
+       ops = &records->ur_ops;
+       if (records->ur_param_count > 0)
+               params = update_records_get_params(records);
+
        op = &ops->uops_op[0];
-       for (i = 0; i < records->ur_update_count; i++) {
+       for (i = 0; i < records->ur_update_count; i++,
+                                 op = update_op_next_op(op)) {
                unsigned int j;
 
                CDEBUG(mask, "update %dth "DFID" %s params_count = %hu\n", i,
                       PFID(&op->uop_fid), update_op_str(op->uop_type),
                       op->uop_param_count);
 
+               if (params == NULL)
+                       continue;
+
                for (j = 0;  j < op->uop_param_count; j++) {
                        struct object_update_param *param;
 
                        param = update_params_get_param(params,
-                                       (unsigned int)op->uop_params_off[j],
+                               (unsigned int)op->uop_params_off[j],
                                        records->ur_param_count);
 
-                       LASSERT(param != NULL);
+                       if (param == NULL)
+                               continue;
                        CDEBUG(mask, "param = %p %dth off = %hu size = %hu\n",
                               param, j, op->uop_params_off[j], param->oup_len);
                }
-
-               op = update_op_next_op(op);
        }
 }
 
@@ -237,6 +245,66 @@ static int update_records_update_pack(const struct lu_env *env,
 }
 
 /**
+ * Calculate update_records size
+ *
+ * Calculate update_records size by param_count and param_sizes array.
+ *
+ * \param[in] param_count      the count of parameters
+ * \param[in] sizes            the size array of these parameters
+ *
+ * \retval                     the size of this update
+ */
+static size_t update_records_update_size(__u32 param_count, size_t *sizes)
+{
+       int i;
+       size_t size;
+
+       /* Check whether the packing exceeding the maximum update size */
+       size = update_op_size(param_count);
+
+       for (i = 0; i < param_count; i++)
+               size += cfs_size_round(sizeof(struct object_update_param) +
+                                      sizes[i]);
+
+       return size;
+}
+
+/**
+ * Calculate create update size
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in] fid      FID of the object to be created
+ * \param[in] attr     attribute of the object to be created
+ * \param[in] hint     creation hint
+ * \param[in] dof      creation format information
+ *
+ * \retval             size of create update.
+ */
+size_t update_records_create_size(const struct lu_env *env,
+                                 const struct lu_fid *fid,
+                                 const struct lu_attr *attr,
+                                 const struct dt_allocation_hint *hint,
+                                 struct dt_object_format *dof)
+{
+       size_t  sizes[2];
+       int     param_count = 0;
+
+       if (attr != NULL) {
+               sizes[param_count] = sizeof(struct obdo);
+               param_count++;
+       }
+
+       if (hint != NULL && hint->dah_parent != NULL) {
+               sizes[param_count] = sizeof(*fid);
+               param_count++;
+       }
+
+       return update_records_update_size(param_count, sizes);
+}
+EXPORT_SYMBOL(update_records_create_size);
+
+/**
  * Pack create update
  *
  * Pack create update into update records.
@@ -302,6 +370,26 @@ int update_records_create_pack(const struct lu_env *env,
 EXPORT_SYMBOL(update_records_create_pack);
 
 /**
+ * Calculate attr set update size
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in] fid      FID of the object to set attr
+ * \param[in] attr     attribute of attr set
+ *
+ * \retval             size of attr set update.
+ */
+size_t update_records_attr_set_size(const struct lu_env *env,
+                                   const struct lu_fid *fid,
+                                   const struct lu_attr *attr)
+{
+       size_t size = sizeof(struct obdo);
+
+       return update_records_update_size(1, &size);
+}
+EXPORT_SYMBOL(update_records_attr_set_size);
+
+/**
  * Pack attr set update
  *
  * Pack attr_set update into update records.
@@ -343,6 +431,21 @@ int update_records_attr_set_pack(const struct lu_env *env,
 EXPORT_SYMBOL(update_records_attr_set_pack);
 
 /**
+ * Calculate ref add update size
+ *
+ * \param[in] env      execution environment
+ * \param[in] fid      FID of the object to add reference
+ *
+ * \retval             size of ref_add udpate.
+ */
+size_t update_records_ref_add_size(const struct lu_env *env,
+                                  const struct lu_fid *fid)
+{
+       return update_records_update_size(0, NULL);
+}
+EXPORT_SYMBOL(update_records_ref_add_size);
+
+/**
  * Pack ref add update
  *
  * Pack ref add update into update records.
@@ -375,6 +478,55 @@ int update_records_ref_add_pack(const struct lu_env *env,
 EXPORT_SYMBOL(update_records_ref_add_pack);
 
 /**
+ * Pack noop update
+ *
+ * Pack no op update into update records. Note: no op means
+ * the update does not need do anything, which is only used
+ * in test case to verify large size record.
+ *
+ * \param[in] env      execution environment
+ * \param[in] ops      ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params   ur_params in update records
+ * \param[in|out] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid      FID of the object to add reference
+ *
+ * \retval             0 if packing succeeds.
+ * \retval             negative errno if packing fails.
+ */
+int update_records_noop_pack(const struct lu_env *env,
+                            struct update_ops *ops,
+                            unsigned int *op_count,
+                            size_t *max_ops_size,
+                            struct update_params *params,
+                            unsigned int *param_count,
+                            size_t *max_param_size,
+                            const struct lu_fid *fid)
+{
+       return update_records_update_pack(env, fid, OUT_NOOP, ops, op_count,
+                                         max_ops_size, params, param_count,
+                                         max_param_size, 0, NULL, NULL);
+}
+EXPORT_SYMBOL(update_records_noop_pack);
+
+/**
+ * Calculate ref del update size
+ *
+ * \param[in] env      execution environment
+ * \param[in] fid      FID of the object to delete reference
+ *
+ * \retval             size of ref_del update.
+ */
+size_t update_records_ref_del_size(const struct lu_env *env,
+                                  const struct lu_fid *fid)
+{
+       return update_records_update_size(0, NULL);
+}
+EXPORT_SYMBOL(update_records_ref_del_size);
+
+/**
  * Pack ref del update
  *
  * Pack ref del update into update records.
@@ -407,6 +559,21 @@ int update_records_ref_del_pack(const struct lu_env *env,
 EXPORT_SYMBOL(update_records_ref_del_pack);
 
 /**
+ * Calculate object destroy update size
+ *
+ * \param[in] env      execution environment
+ * \param[in] fid      FID of the object to delete reference
+ *
+ * \retval             size of object destroy update.
+ */
+size_t update_records_object_destroy_size(const struct lu_env *env,
+                                         const struct lu_fid *fid)
+{
+       return update_records_update_size(0, NULL);
+}
+EXPORT_SYMBOL(update_records_object_destroy_size);
+
+/**
  * Pack object destroy update
  *
  * Pack object destroy update into update records.
@@ -439,6 +606,28 @@ int update_records_object_destroy_pack(const struct lu_env *env,
 EXPORT_SYMBOL(update_records_object_destroy_pack);
 
 /**
+ * Calculate index insert update size
+ *
+ * \param[in] env      execution environment
+ * \param[in] fid      FID of the object to insert index
+ * \param[in] rec      record of insertion
+ * \param[in] key      key of insertion
+ *
+ * \retval             the size of index insert update.
+ */
+size_t update_records_index_insert_size(const struct lu_env *env,
+                                       const struct lu_fid *fid,
+                                       const struct dt_rec *rec,
+                                       const struct dt_key *key)
+{
+       size_t                     sizes[3] = { strlen((const char *)key) + 1,
+                                               sizeof(struct lu_fid),
+                                               sizeof(__u32) };
+       return update_records_update_size(3, sizes);
+}
+EXPORT_SYMBOL(update_records_index_insert_size);
+
+/**
  * Pack index insert update
  *
  * Pack index insert update into update records.
@@ -488,6 +677,25 @@ int update_records_index_insert_pack(const struct lu_env *env,
 EXPORT_SYMBOL(update_records_index_insert_pack);
 
 /**
+ * Calculate index delete update size
+ *
+ * \param[in] env      execution environment
+ * \param[in] fid      FID of the object to delete index
+ * \param[in] key      key of deletion
+ *
+ * \retval             the size of index delete update
+ */
+size_t update_records_index_delete_size(const struct lu_env *env,
+                                       const struct lu_fid *fid,
+                                       const struct dt_key *key)
+{
+       size_t size = strlen((const char *)key) + 1;
+
+       return update_records_update_size(1, &size);
+}
+EXPORT_SYMBOL(update_records_index_delete_size);
+
+/**
  * Pack index delete update
  *
  * Pack index delete update into update records.
@@ -525,6 +733,28 @@ int update_records_index_delete_pack(const struct lu_env *env,
 EXPORT_SYMBOL(update_records_index_delete_pack);
 
 /**
+ * Calculate xattr set size
+ *
+ * \param[in] env      execution environment
+ * \param[in] fid      FID of the object to set xattr
+ * \param[in] buf      xattr to be set
+ * \param[in] name     name of the xattr
+ * \param[in] flag     flag for setting xattr
+ *
+ * \retval             size of xattr set update.
+ */
+size_t update_records_xattr_set_size(const struct lu_env *env,
+                                    const struct lu_fid *fid,
+                                    const struct lu_buf *buf,
+                                    const char *name, __u32 flag)
+{
+       size_t  sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)};
+
+       return update_records_update_size(3, sizes);
+}
+EXPORT_SYMBOL(update_records_xattr_set_size);
+
+/**
  * Pack xattr set update
  *
  * Pack xattr set update into update records.
@@ -568,6 +798,25 @@ int update_records_xattr_set_pack(const struct lu_env *env,
 EXPORT_SYMBOL(update_records_xattr_set_pack);
 
 /**
+ * Calculate xattr delete update size.
+ *
+ * \param[in] env      execution environment
+ * \param[in] fid      FID of the object to delete xattr
+ * \param[in] name     name of the xattr
+ *
+ * \retval             size of xattr delet updatee.
+ */
+size_t update_records_xattr_del_size(const struct lu_env *env,
+                                    const struct lu_fid *fid,
+                                    const char *name)
+{
+       size_t  size = strlen(name) + 1;
+
+       return update_records_update_size(1, &size);
+}
+EXPORT_SYMBOL(update_records_xattr_del_size);
+
+/**
  * Pack xattr delete update
  *
  * Pack xattr delete update into update records.
@@ -605,6 +854,27 @@ int update_records_xattr_del_pack(const struct lu_env *env,
 EXPORT_SYMBOL(update_records_xattr_del_pack);
 
 /**
+ * Calculate write update size
+ *
+ * \param[in] env      execution environment
+ * \param[in] fid      FID of the object to write into
+ * \param[in] buf      buffer to write which includes an embedded size field
+ * \param[in] pos      offet in the object to start writing at
+ *
+ * \retval             size of write udpate.
+ */
+size_t update_records_write_size(const struct lu_env *env,
+                                const struct lu_fid *fid,
+                                const struct lu_buf *buf,
+                                __u64 pos)
+{
+       size_t  sizes[2] = {buf->lb_len, sizeof(pos)};
+
+       return update_records_update_size(2, sizes);
+}
+EXPORT_SYMBOL(update_records_write_size);
+
+/**
  * Pack write update
  *
  * Pack write update into update records.
@@ -647,6 +917,26 @@ int update_records_write_pack(const struct lu_env *env,
 EXPORT_SYMBOL(update_records_write_pack);
 
 /**
+ * Calculate size of punch update.
+ *
+ * \param[in] env      execution environment
+ * \param[in] fid      FID of the object to write into
+ * \param[in] start    start offset of punch
+ * \param[in] end      end offet of punch
+ *
+ * \retval             size of update punch.
+ */
+size_t update_records_punch_size(const struct lu_env *env,
+                                const struct lu_fid *fid,
+                                __u64 start, __u64 end)
+{
+       size_t  sizes[2] = {sizeof(start), sizeof(end)};
+
+       return update_records_update_size(2, sizes);
+}
+EXPORT_SYMBOL(update_records_punch_size);
+
+/**
  * Pack punch
  *
  * Pack punch update into update records.
@@ -772,8 +1062,7 @@ int tur_update_extend(struct thandle_update_records *tur,
 
        record_size = llog_update_record_size(tur->tur_update_records);
        /* extend update records buffer */
-       if (new_op_size > (tur->tur_update_records_buf_size - record_size -
-                          sizeof(*tur->tur_update_records))) {
+       if (new_op_size >= (tur->tur_update_records_buf_size - record_size)) {
                extend_size = round_up(new_op_size, UPDATE_RECORDS_BUFFER_SIZE);
                rc = tur_update_records_extend(tur,
                                tur->tur_update_records_buf_size +
@@ -785,7 +1074,7 @@ int tur_update_extend(struct thandle_update_records *tur,
        /* extend parameters buffer */
        params_size = update_params_size(tur->tur_update_params,
                                         tur->tur_update_param_count);
-       if (new_param_size > (tur->tur_update_params_buf_size -
+       if (new_param_size >= (tur->tur_update_params_buf_size -
                              params_size)) {
                extend_size = round_up(new_param_size,
                                       UPDATE_PARAMS_BUFFER_SIZE);
@@ -870,31 +1159,11 @@ EXPORT_SYMBOL(tur_update_params_extend);
  * \retval             negative errno if updates recording fails.
  */
 int check_and_prepare_update_record(const struct lu_env *env,
-                                   struct thandle *th)
+                                   struct thandle_update_records *tur)
 {
-       struct thandle_update_records   *tur;
        struct llog_update_record       *lur;
-       struct top_thandle              *top_th;
-       struct sub_thandle              *lst;
-       int                             rc;
-       bool                            record_update = false;
-       ENTRY;
-
-       top_th = container_of(th, struct top_thandle, tt_super);
-       /* Check if it needs to record updates for this transaction */
-       list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) {
-               if (lst->st_record_update) {
-                       record_update = true;
-                       break;
-               }
-       }
-       if (!record_update)
-               RETURN(0);
-
-       if (top_th->tt_update_records != NULL)
-               RETURN(0);
+       int rc;
 
-       tur = &update_env_info(env)->uti_tur;
        if (tur->tur_update_records == NULL) {
                rc = tur_update_records_create(tur);
                if (rc < 0)
@@ -913,68 +1182,28 @@ int check_and_prepare_update_record(const struct lu_env *env,
        lur->lur_update_rec.ur_master_transno = 0;
        lur->lur_update_rec.ur_batchid = 0;
        lur->lur_update_rec.ur_flags = 0;
+       lur->lur_hdr.lrh_len = LLOG_MIN_CHUNK_SIZE;
 
        tur->tur_update_param_count = 0;
 
-       top_th->tt_update_records = tur;
-
        RETURN(0);
 }
-EXPORT_SYMBOL(check_and_prepare_update_record);
-
-/**
- * Merge params into the update records
- *
- * Merge params and ops into the update records attached to top th.
- * During transaction execution phase, parameters and update ops
- * are collected in two different buffers (see lod_updates_pack()),
- * then in transaction stop, it needs to be merged them in one
- * buffer, then being written into the update log.
- *
- * \param[in] env      execution environment
- * \param[in] tur      thandle update records whose updates and
- *                      parameters are merged
- *
- * \retval             0 if merging succeeds.
- * \retval             negaitive errno if merging fails.
- */
-int merge_params_updates_buf(const struct lu_env *env,
-                            struct thandle_update_records *tur)
-{
-       struct llog_update_record *lur;
-       struct update_params *params;
-       size_t params_size;
-       size_t record_size;
-
-       if (tur->tur_update_records == NULL ||
-           tur->tur_update_params == NULL)
-               return 0;
-
-       lur = tur->tur_update_records;
-       /* Extends the update records buffer if needed */
-       params_size = update_params_size(tur->tur_update_params,
-                                        tur->tur_update_param_count);
-       record_size = llog_update_record_size(lur);
-       if (record_size + params_size > tur->tur_update_records_buf_size) {
-               int rc;
-
-               rc = tur_update_records_extend(tur, record_size + params_size);
-               if (rc < 0)
-                       return rc;
-               lur = tur->tur_update_records;
-       }
-
-       params = update_records_get_params(&lur->lur_update_rec);
-       memcpy(params, tur->tur_update_params, params_size);
-       lur->lur_update_rec.ur_param_count = tur->tur_update_param_count;
-       return 0;
-}
-EXPORT_SYMBOL(merge_params_updates_buf);
 
 static void update_key_fini(const struct lu_context *ctx,
                            struct lu_context_key *key, void *data)
 {
        struct update_thread_info *info = data;
+       struct thandle_exec_args  *args = &info->uti_tea;
+       int                       i;
+
+       for (i = 0; i < args->ta_alloc_args; i++) {
+               if (args->ta_args[i] != NULL)
+                       OBD_FREE_PTR(args->ta_args[i]);
+       }
+
+       if (args->ta_args != NULL)
+               OBD_FREE(args->ta_args, sizeof(args->ta_args[0]) *
+                        args->ta_alloc_args);
 
        if (info->uti_tur.tur_update_records != NULL)
                OBD_FREE_LARGE(info->uti_tur.tur_update_records,