* GPL HEADER END
*/
/*
- * Copyright (c) 2014, Intel Corporation.
+ * Copyright (c) 2015, 2017, Intel Corporation.
*/
/*
#define DEBUG_SUBSYSTEM S_CLASS
#include <lu_target.h>
+#include <lustre_obdo.h>
#include <lustre_update.h>
#include <obd.h>
#include <obd_class.h>
+
#include "tgt_internal.h"
#define UPDATE_RECORDS_BUFFER_SIZE 8192
{
const struct update_ops *ops;
const struct update_op *op = NULL;
- struct update_params *params;
+ struct update_params *params = NULL;
unsigned int i;
- ops = &records->ur_ops;
- params = update_records_get_params(records);
-
- CDEBUG(mask, "master transno = "LPU64" batchid = "LPU64" flags = %x"
+ CDEBUG(mask, "master transno = %llu batchid = %llu flags = %x"
" ops = %d params = %d\n", records->ur_master_transno,
records->ur_batchid, records->ur_flags, records->ur_update_count,
records->ur_param_count);
if (!dump_updates)
return;
+ ops = &records->ur_ops;
+ if (records->ur_param_count > 0)
+ params = update_records_get_params(records);
+
op = &ops->uops_op[0];
- for (i = 0; i < records->ur_update_count; i++) {
+ for (i = 0; i < records->ur_update_count; i++,
+ op = update_op_next_op(op)) {
unsigned int j;
CDEBUG(mask, "update %dth "DFID" %s params_count = %hu\n", i,
PFID(&op->uop_fid), update_op_str(op->uop_type),
op->uop_param_count);
+ if (params == NULL)
+ continue;
+
for (j = 0; j < op->uop_param_count; j++) {
struct object_update_param *param;
param = update_params_get_param(params,
- (unsigned int)op->uop_params_off[j],
+ (unsigned int)op->uop_params_off[j],
records->ur_param_count);
- LASSERT(param != NULL);
+ if (param == NULL)
+ continue;
CDEBUG(mask, "param = %p %dth off = %hu size = %hu\n",
param, j, op->uop_params_off[j], param->oup_len);
}
-
- op = update_op_next_op(op);
}
}
}
/**
+ * Calculate update_records size
+ *
+ * Calculate update_records size by param_count and param_sizes array.
+ *
+ * \param[in] param_count the count of parameters
+ * \param[in] sizes the size array of these parameters
+ *
+ * \retval the size of this update
+ */
+static size_t update_records_update_size(__u32 param_count, size_t *sizes)
+{
+ int i;
+ size_t size;
+
+ /* Check whether the packing exceeding the maximum update size */
+ size = update_op_size(param_count);
+
+ for (i = 0; i < param_count; i++)
+ size += cfs_size_round(sizeof(struct object_update_param) +
+ sizes[i]);
+
+ return size;
+}
+
+/**
+ * Calculate create update size
+ *
+ * \param[in] env execution environment
+ * \param[in] ops ur_ops in update records
+ * \param[in] fid FID of the object to be created
+ * \param[in] attr attribute of the object to be created
+ * \param[in] hint creation hint
+ * \param[in] dof creation format information
+ *
+ * \retval size of create update.
+ */
+size_t update_records_create_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_attr *attr,
+ const struct dt_allocation_hint *hint,
+ struct dt_object_format *dof)
+{
+ size_t sizes[2];
+ int param_count = 0;
+
+ if (attr != NULL) {
+ sizes[param_count] = sizeof(struct obdo);
+ param_count++;
+ }
+
+ if (hint != NULL && hint->dah_parent != NULL) {
+ sizes[param_count] = sizeof(*fid);
+ param_count++;
+ }
+
+ return update_records_update_size(param_count, sizes);
+}
+EXPORT_SYMBOL(update_records_create_size);
+
+/**
* Pack create update
*
* Pack create update into update records.
obdo = &update_env_info(env)->uti_obdo;
obdo->o_valid = 0;
obdo_from_la(obdo, attr, attr->la_valid);
- lustre_set_wire_obdo(NULL, obdo, obdo);
bufs[buf_count] = obdo;
sizes[buf_count] = sizeof(*obdo);
buf_count++;
EXPORT_SYMBOL(update_records_create_pack);
/**
+ * Calculate attr set update size
+ *
+ * \param[in] env execution environment
+ * \param[in] ops ur_ops in update records
+ * \param[in] fid FID of the object to set attr
+ * \param[in] attr attribute of attr set
+ *
+ * \retval size of attr set update.
+ */
+size_t update_records_attr_set_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_attr *attr)
+{
+ size_t size = sizeof(struct obdo);
+
+ return update_records_update_size(1, &size);
+}
+EXPORT_SYMBOL(update_records_attr_set_size);
+
+/**
* Pack attr set update
*
* Pack attr_set update into update records.
obdo->o_valid = 0;
obdo_from_la(obdo, attr, attr->la_valid);
- lustre_set_wire_obdo(NULL, obdo, obdo);
return update_records_update_pack(env, fid, OUT_ATTR_SET, ops, op_count,
max_ops_size, params, param_count,
max_param_size, 1,
EXPORT_SYMBOL(update_records_attr_set_pack);
/**
+ * Calculate ref add update size
+ *
+ * \param[in] env execution environment
+ * \param[in] fid FID of the object to add reference
+ *
+ * \retval size of ref_add udpate.
+ */
+size_t update_records_ref_add_size(const struct lu_env *env,
+ const struct lu_fid *fid)
+{
+ return update_records_update_size(0, NULL);
+}
+EXPORT_SYMBOL(update_records_ref_add_size);
+
+/**
* Pack ref add update
*
* Pack ref add update into update records.
EXPORT_SYMBOL(update_records_ref_add_pack);
/**
+ * Pack noop update
+ *
+ * Pack no op update into update records. Note: no op means
+ * the update does not need do anything, which is only used
+ * in test case to verify large size record.
+ *
+ * \param[in] env execution environment
+ * \param[in] ops ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params ur_params in update records
+ * \param[in|out] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid FID of the object to add reference
+ *
+ * \retval 0 if packing succeeds.
+ * \retval negative errno if packing fails.
+ */
+int update_records_noop_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid)
+{
+ return update_records_update_pack(env, fid, OUT_NOOP, ops, op_count,
+ max_ops_size, params, param_count,
+ max_param_size, 0, NULL, NULL);
+}
+EXPORT_SYMBOL(update_records_noop_pack);
+
+/**
+ * Calculate ref del update size
+ *
+ * \param[in] env execution environment
+ * \param[in] fid FID of the object to delete reference
+ *
+ * \retval size of ref_del update.
+ */
+size_t update_records_ref_del_size(const struct lu_env *env,
+ const struct lu_fid *fid)
+{
+ return update_records_update_size(0, NULL);
+}
+EXPORT_SYMBOL(update_records_ref_del_size);
+
+/**
* Pack ref del update
*
* Pack ref del update into update records.
EXPORT_SYMBOL(update_records_ref_del_pack);
/**
+ * Calculate object destroy update size
+ *
+ * \param[in] env execution environment
+ * \param[in] fid FID of the object to delete reference
+ *
+ * \retval size of object destroy update.
+ */
+size_t update_records_destroy_size(const struct lu_env *env,
+ const struct lu_fid *fid)
+{
+ return update_records_update_size(0, NULL);
+}
+EXPORT_SYMBOL(update_records_destroy_size);
+
+/**
* Pack object destroy update
*
* Pack object destroy update into update records.
* \retval 0 if packing succeeds.
* \retval negative errno if packing fails.
*/
-int update_records_object_destroy_pack(const struct lu_env *env,
+int update_records_destroy_pack(const struct lu_env *env,
struct update_ops *ops,
unsigned int *op_count,
size_t *max_ops_size,
max_ops_size, params, param_count,
max_param_size, 0, NULL, NULL);
}
-EXPORT_SYMBOL(update_records_object_destroy_pack);
+EXPORT_SYMBOL(update_records_destroy_pack);
+
+/**
+ * Calculate index insert update size
+ *
+ * \param[in] env execution environment
+ * \param[in] fid FID of the object to insert index
+ * \param[in] rec record of insertion
+ * \param[in] key key of insertion
+ *
+ * \retval the size of index insert update.
+ */
+size_t update_records_index_insert_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct dt_rec *rec,
+ const struct dt_key *key)
+{
+ size_t sizes[3] = { strlen((const char *)key) + 1,
+ sizeof(struct lu_fid),
+ sizeof(__u32) };
+ return update_records_update_size(3, sizes);
+}
+EXPORT_SYMBOL(update_records_index_insert_size);
/**
* Pack index insert update
EXPORT_SYMBOL(update_records_index_insert_pack);
/**
+ * Calculate index delete update size
+ *
+ * \param[in] env execution environment
+ * \param[in] fid FID of the object to delete index
+ * \param[in] key key of deletion
+ *
+ * \retval the size of index delete update
+ */
+size_t update_records_index_delete_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct dt_key *key)
+{
+ size_t size = strlen((const char *)key) + 1;
+
+ return update_records_update_size(1, &size);
+}
+EXPORT_SYMBOL(update_records_index_delete_size);
+
+/**
* Pack index delete update
*
* Pack index delete update into update records.
EXPORT_SYMBOL(update_records_index_delete_pack);
/**
+ * Calculate xattr set size
+ *
+ * \param[in] env execution environment
+ * \param[in] fid FID of the object to set xattr
+ * \param[in] buf xattr to be set
+ * \param[in] name name of the xattr
+ * \param[in] flag flag for setting xattr
+ *
+ * \retval size of xattr set update.
+ */
+size_t update_records_xattr_set_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_buf *buf,
+ const char *name, __u32 flag)
+{
+ size_t sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)};
+
+ return update_records_update_size(3, sizes);
+}
+EXPORT_SYMBOL(update_records_xattr_set_size);
+
+/**
* Pack xattr set update
*
* Pack xattr set update into update records.
EXPORT_SYMBOL(update_records_xattr_set_pack);
/**
+ * Calculate xattr delete update size.
+ *
+ * \param[in] env execution environment
+ * \param[in] fid FID of the object to delete xattr
+ * \param[in] name name of the xattr
+ *
+ * \retval size of xattr delet updatee.
+ */
+size_t update_records_xattr_del_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const char *name)
+{
+ size_t size = strlen(name) + 1;
+
+ return update_records_update_size(1, &size);
+}
+EXPORT_SYMBOL(update_records_xattr_del_size);
+
+/**
* Pack xattr delete update
*
* Pack xattr delete update into update records.
EXPORT_SYMBOL(update_records_xattr_del_pack);
/**
+ * Calculate write update size
+ *
+ * \param[in] env execution environment
+ * \param[in] fid FID of the object to write into
+ * \param[in] buf buffer to write which includes an embedded size field
+ * \param[in] pos offet in the object to start writing at
+ *
+ * \retval size of write udpate.
+ */
+size_t update_records_write_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_buf *buf,
+ __u64 pos)
+{
+ size_t sizes[2] = {buf->lb_len, sizeof(pos)};
+
+ return update_records_update_size(2, sizes);
+}
+EXPORT_SYMBOL(update_records_write_size);
+
+/**
* Pack write update
*
* Pack write update into update records.
EXPORT_SYMBOL(update_records_write_pack);
/**
+ * Calculate size of punch update.
+ *
+ * \param[in] env execution environment
+ * \param[in] fid FID of the object to write into
+ * \param[in] start start offset of punch
+ * \param[in] end end offet of punch
+ *
+ * \retval size of update punch.
+ */
+size_t update_records_punch_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ __u64 start, __u64 end)
+{
+ size_t sizes[2] = {sizeof(start), sizeof(end)};
+
+ return update_records_update_size(2, sizes);
+}
+EXPORT_SYMBOL(update_records_punch_size);
+
+/**
* Pack punch
*
* Pack punch update into update records.
record_size = llog_update_record_size(tur->tur_update_records);
/* extend update records buffer */
- if (new_op_size > (tur->tur_update_records_buf_size - record_size -
- sizeof(*tur->tur_update_records))) {
+ if (new_op_size >= (tur->tur_update_records_buf_size - record_size)) {
extend_size = round_up(new_op_size, UPDATE_RECORDS_BUFFER_SIZE);
rc = tur_update_records_extend(tur,
tur->tur_update_records_buf_size +
/* extend parameters buffer */
params_size = update_params_size(tur->tur_update_params,
tur->tur_update_param_count);
- if (new_param_size > (tur->tur_update_params_buf_size -
+ if (new_param_size >= (tur->tur_update_params_buf_size -
params_size)) {
extend_size = round_up(new_param_size,
UPDATE_PARAMS_BUFFER_SIZE);
}
if (args->ta_args != NULL)
- OBD_FREE(args->ta_args, sizeof(args->ta_args[0]) *
- args->ta_alloc_args);
+ OBD_FREE_PTR_ARRAY(args->ta_args, args->ta_alloc_args);
if (info->uti_tur.tur_update_records != NULL)
OBD_FREE_LARGE(info->uti_tur.tur_update_records,
LU_KEY_INIT(update, struct update_thread_info);
/* context key: update_thread_key */
LU_CONTEXT_KEY_DEFINE(update, LCT_MD_THREAD | LCT_MG_THREAD |
- LCT_DT_THREAD | LCT_TX_HANDLE | LCT_LOCAL);
+ LCT_DT_THREAD | LCT_LOCAL);
EXPORT_SYMBOL(update_thread_key);
LU_KEY_INIT_GENERIC(update);