CHANGELOG_REC = LLOG_OP_MAGIC | 0x60000,
CHANGELOG_USER_REC = LLOG_OP_MAGIC | 0x70000,
HSM_AGENT_REC = LLOG_OP_MAGIC | 0x80000,
+ UPDATE_REC = LLOG_OP_MAGIC | 0xa0000,
LLOG_HDR_MAGIC = LLOG_OP_MAGIC | 0x45539,
LLOG_LOGID_MAGIC = LLOG_OP_MAGIC | 0x4553b,
} llog_op_type;
*/
/**
- * Type of each update
+ * Type of each update, if adding/deleting update, please also update
+ * update_opcode in lustre/target/out_lib.c.
*/
enum update_type {
+ OUT_START = 0,
OUT_CREATE = 1,
OUT_DESTROY = 2,
OUT_REF_ADD = 3,
void lustre_swab_object_update_request(struct object_update_request *our);
static inline size_t
-object_update_size(const struct object_update *update)
+object_update_params_size(const struct object_update *update)
{
- const struct object_update_param *param;
- size_t size;
- unsigned int i;
+ const struct object_update_param *param;
+ size_t total_size = 0;
+ unsigned int i;
- size = offsetof(struct object_update, ou_params[0]);
+ param = &update->ou_params[0];
for (i = 0; i < update->ou_params_count; i++) {
- param = (struct object_update_param *)((char *)update + size);
- size += object_update_param_size(param);
+ size_t size = object_update_param_size(param);
+
+ param = (struct object_update_param *)((char *)param + size);
+ total_size += size;
}
- return size;
+ return total_size;
+}
+
+static inline size_t
+object_update_size(const struct object_update *update)
+{
+ return offsetof(struct object_update, ou_params[0]) +
+ object_update_params_size(update);
}
static inline struct object_update *
struct dt_key;
struct dt_rec;
+struct object_update_param;
struct update_buffer {
struct object_update_request *ub_req;
size_t ub_req_size;
};
-#define TOP_THANDLE_MAGIC 0x20140917
-/* {top,sub}_thandle are used to manage distributed transactions which
- * include updates on several nodes. A top_handle represents the
- * whole operation, and sub_thandle represents updates on each node. */
-struct top_thandle {
- struct thandle tt_super;
- __u32 tt_magic;
- /* The master sub transaction. */
- struct thandle *tt_master_sub_thandle;
-
- /* Other sub thandle will be listed here. */
- struct list_head tt_sub_thandle_list;
-};
-
-struct sub_thandle {
- /* point to the osd/osp_thandle */
- struct thandle *st_sub_th;
- struct list_head st_sub_list;
-};
-
/**
* Tracking the updates being executed on this dt_device.
*/
struct list_head dur_cb_items;
};
+struct update_params {
+ struct object_update_param up_params[0];
+};
+
+static inline size_t update_params_size(const struct update_params *params,
+ unsigned int param_count)
+{
+ struct object_update_param *param;
+ size_t total_size = sizeof(*params);
+ unsigned int i;
+
+ param = (struct object_update_param *)¶ms->up_params[0];
+ for (i = 0; i < param_count; i++) {
+ size_t size = object_update_param_size(param);
+
+ param = (struct object_update_param *)((char *)param + size);
+ total_size += size;
+ }
+
+ return total_size;
+}
+
+static inline struct object_update_param *
+update_params_get_param(const struct update_params *params,
+ unsigned int index, unsigned int param_count)
+{
+ struct object_update_param *param;
+ unsigned int i;
+
+ if (index > param_count)
+ return NULL;
+
+ param = (struct object_update_param *)¶ms->up_params[0];
+ for (i = 0; i < index; i++)
+ param = (struct object_update_param *)((char *)param +
+ object_update_param_size(param));
+
+ return param;
+}
+
+struct update_op {
+ struct lu_fid uop_fid;
+ __u16 uop_type;
+ __u16 uop_param_count;
+ __u16 uop_params_off[0];
+};
+
+static inline size_t
+update_op_size(unsigned int param_count)
+{
+ return offsetof(struct update_op, uop_params_off[param_count]);
+}
+
+static inline struct update_op *
+update_op_next_op(const struct update_op *uop)
+{
+ return (struct update_op *)((char *)uop +
+ update_op_size(uop->uop_param_count));
+}
+
+/* All of updates in the mulitple_update_record */
+struct update_ops {
+ struct update_op uops_op[0];
+};
+
+static inline size_t update_ops_size(const struct update_ops *ops,
+ unsigned int update_count)
+{
+ struct update_op *op;
+ size_t total_size = sizeof(*ops);
+ unsigned int i;
+
+ op = (struct update_op *)&ops->uops_op[0];
+ for (i = 0; i < update_count; i++, op = update_op_next_op(op))
+ total_size += update_op_size(op->uop_param_count);
+
+ return total_size;
+}
+
+/*
+ * This is the update record format used to store the updates in
+ * disk. All updates of the operation will be stored in ur_ops.
+ * All of parameters for updates of the operation will be stored
+ * in ur_params.
+ * To save the space of the record, parameters in ur_ops will only
+ * remember their offset in ur_params, so to avoid storing duplicate
+ * parameters in ur_params, which can help us save a lot space for
+ * operation like creating striped directory.
+ */
+struct update_records {
+ __u64 ur_master_transno;
+ __u64 ur_batchid;
+ __u32 ur_flags;
+ __u32 ur_param_count;
+ __u32 ur_update_count;
+ struct update_ops ur_ops;
+ /* Note ur_ops has a variable size, so comment out
+ * the following ur_params, in case some use it directly
+ * update_records->ur_params
+ *
+ * struct update_params ur_params;
+ */
+};
+
+struct llog_update_record {
+ struct llog_rec_hdr lur_hdr;
+ struct update_records lur_update_rec;
+ /* Note ur_update_rec has a variable size, so comment out
+ * the following ur_tail, in case someone use it directly
+ *
+ * struct llog_rec_tail lur_tail;
+ */
+};
+
+static inline struct update_params *
+update_records_get_params(const struct update_records *record)
+{
+ return (struct update_params *)((char *)record +
+ offsetof(struct update_records, ur_ops) +
+ update_ops_size(&record->ur_ops, record->ur_update_count));
+}
+
+static inline size_t
+update_records_size(const struct update_records *record)
+{
+ struct update_params *params;
+
+ params = update_records_get_params(record);
+
+ return cfs_size_round(offsetof(struct update_records, ur_ops) +
+ update_ops_size(&record->ur_ops, record->ur_update_count) +
+ update_params_size(params, record->ur_param_count));
+}
+
+static inline size_t
+llog_update_record_size(const struct llog_update_record *lur)
+{
+ return cfs_size_round(sizeof(lur->lur_hdr) +
+ update_records_size(&lur->lur_update_rec) +
+ sizeof(struct llog_rec_tail));
+}
+
+static inline struct update_op *
+update_ops_get_op(const struct update_ops *ops, unsigned int index,
+ unsigned int update_count)
+{
+ struct update_op *op;
+ unsigned int i;
+
+ if (index > update_count)
+ return NULL;
+
+ op = (struct update_op *)&ops->uops_op[0];
+ for (i = 0; i < index; i++)
+ op = update_op_next_op(op);
+
+ return op;
+}
+
static inline void
*object_update_param_get(const struct object_update *update, size_t index,
size_t *size)
return 0;
}
-static inline void update_inc_batchid(struct dt_update_request *update)
-{
- update->dur_batchid++;
-}
+/**
+ * Attached in the thandle to record the updates for distribute
+ * distribution.
+ */
+struct thandle_update_records {
+ /* All of updates for the cross-MDT operation. */
+ struct llog_update_record *tur_update_records;
+ size_t tur_update_records_buf_size;
+
+ /* All of parameters for the cross-MDT operation */
+ struct update_params *tur_update_params;
+ unsigned int tur_update_param_count;
+ size_t tur_update_params_buf_size;
+};
+
+#define TOP_THANDLE_MAGIC 0x20140917
+/* {top,sub}_thandle are used to manage distributed transactions which
+ * include updates on several nodes. A top_handle represents the
+ * whole operation, and sub_thandle represents updates on each node. */
+struct top_thandle {
+ struct thandle tt_super;
+ __u32 tt_magic;
+ atomic_t tt_refcount;
+ /* The master sub transaction. */
+ struct thandle *tt_master_sub_thandle;
+
+ /* Other sub thandle will be listed here. */
+ struct list_head tt_sub_thandle_list;
+
+ /* All of update records will packed here */
+ struct thandle_update_records *tt_update_records;
+};
+
+struct sub_thandle {
+ /* point to the osd/osp_thandle */
+ struct thandle *st_sub_th;
+
+ /* linked to top_thandle */
+ struct list_head st_sub_list;
+
+ /* If this sub thandle is committed */
+ bool st_committed:1,
+ st_record_update:1;
+};
+
/* target/out_lib.c */
-int out_update_pack(const struct lu_env *env, struct update_buffer *ubuf,
- enum update_type op, const struct lu_fid *fid,
- int params_count, __u16 *param_sizes, const void **bufs,
- __u64 batchid);
-int out_create_pack(const struct lu_env *env, struct update_buffer *ubuf,
- const struct lu_fid *fid, struct lu_attr *attr,
- struct dt_allocation_hint *hint,
- struct dt_object_format *dof, __u64 batchid);
+int out_update_pack(const struct lu_env *env, struct object_update *update,
+ size_t max_update_size, enum update_type op,
+ const struct lu_fid *fid, unsigned int params_count,
+ __u16 *param_sizes, const void **param_bufs);
+int out_create_pack(const struct lu_env *env, struct object_update *update,
+ size_t max_update_size, const struct lu_fid *fid,
+ const struct lu_attr *attr, struct dt_allocation_hint *hint,
+ struct dt_object_format *dof);
int out_object_destroy_pack(const struct lu_env *env,
- struct update_buffer *ubuf,
- const struct lu_fid *fid, __u64 batchid);
-int out_index_delete_pack(const struct lu_env *env, struct update_buffer *ubuf,
- const struct lu_fid *fid, const struct dt_key *key,
- __u64 batchid);
-int out_index_insert_pack(const struct lu_env *env, struct update_buffer *ubuf,
+ struct object_update *update,
+ size_t max_update_size,
+ const struct lu_fid *fid);
+int out_index_delete_pack(const struct lu_env *env,
+ struct object_update *update, size_t max_update_size,
+ const struct lu_fid *fid, const struct dt_key *key);
+int out_index_insert_pack(const struct lu_env *env,
+ struct object_update *update, size_t max_update_size,
const struct lu_fid *fid, const struct dt_rec *rec,
- const struct dt_key *key, __u64 batchid);
-int out_xattr_set_pack(const struct lu_env *env, struct update_buffer *ubuf,
+ const struct dt_key *key);
+int out_xattr_set_pack(const struct lu_env *env,
+ struct object_update *update, size_t max_update_size,
const struct lu_fid *fid, const struct lu_buf *buf,
- const char *name, int flag, __u64 batchid);
-int out_xattr_del_pack(const struct lu_env *env, struct update_buffer *ubuf,
- const struct lu_fid *fid, const char *name,
- __u64 batchid);
-int out_attr_set_pack(const struct lu_env *env, struct update_buffer *ubuf,
- const struct lu_fid *fid, const struct lu_attr *attr,
- __u64 batchid);
-int out_ref_add_pack(const struct lu_env *env, struct update_buffer *ubuf,
- const struct lu_fid *fid, __u64 batchid);
-int out_ref_del_pack(const struct lu_env *env, struct update_buffer *ubuf,
- const struct lu_fid *fid, __u64 batchid);
-int out_write_pack(const struct lu_env *env, struct update_buffer *ubuf,
+ const char *name, __u32 flag);
+int out_xattr_del_pack(const struct lu_env *env,
+ struct object_update *update, size_t max_update_size,
+ const struct lu_fid *fid, const char *name);
+int out_attr_set_pack(const struct lu_env *env,
+ struct object_update *update, size_t max_update_size,
+ const struct lu_fid *fid, const struct lu_attr *attr);
+int out_ref_add_pack(const struct lu_env *env,
+ struct object_update *update, size_t max_update_size,
+ const struct lu_fid *fid);
+int out_ref_del_pack(const struct lu_env *env,
+ struct object_update *update, size_t max_update_size,
+ const struct lu_fid *fid);
+int out_write_pack(const struct lu_env *env,
+ struct object_update *update, size_t max_update_size,
const struct lu_fid *fid, const struct lu_buf *buf,
- loff_t pos, __u64 batchid);
-int out_attr_get_pack(const struct lu_env *env, struct update_buffer *ubuf,
+ __u64 pos);
+int out_attr_get_pack(const struct lu_env *env,
+ struct object_update *update, size_t max_update_size,
const struct lu_fid *fid);
-int out_index_lookup_pack(const struct lu_env *env, struct update_buffer *ubuf,
+int out_index_lookup_pack(const struct lu_env *env,
+ struct object_update *update, size_t max_update_size,
const struct lu_fid *fid, struct dt_rec *rec,
const struct dt_key *key);
-int out_xattr_get_pack(const struct lu_env *env, struct update_buffer *ubuf,
+int out_xattr_get_pack(const struct lu_env *env,
+ struct object_update *update, size_t max_update_size,
const struct lu_fid *fid, const char *name);
+const char *update_op_str(__u16 opcode);
+
/* target/update_trans.c */
struct thandle *thandle_get_sub_by_dt(const struct lu_env *env,
struct thandle *th,
struct thandle *th);
void top_thandle_destroy(struct top_thandle *top_th);
+
+/* update_records.c */
+int update_records_create_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const struct lu_attr *attr,
+ const struct dt_allocation_hint *hint,
+ struct dt_object_format *dof);
+int update_records_attr_set_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const struct lu_attr *attr);
+int update_records_ref_add_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid);
+int update_records_ref_del_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid);
+int update_records_object_destroy_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid);
+int update_records_index_insert_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const struct dt_rec *rec,
+ const struct dt_key *key);
+int update_records_index_delete_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const struct dt_key *key);
+int update_records_xattr_set_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const struct lu_buf *buf, const char *name,
+ __u32 flag);
+int update_records_xattr_del_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const char *name);
+int update_records_write_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const struct lu_buf *buf,
+ __u64 pos);
+
+int tur_update_records_extend(struct thandle_update_records *tur,
+ size_t new_size);
+int tur_update_params_extend(struct thandle_update_records *tur,
+ size_t new_size);
+int check_and_prepare_update_record(const struct lu_env *env,
+ struct thandle *th);
+int merge_params_updates_buf(const struct lu_env *env,
+ struct thandle_update_records *tur);
+int tur_update_extend(struct thandle_update_records *tur,
+ size_t new_op_size, size_t new_param_size);
+
+#define update_record_pack(name, th, ...) \
+({ \
+ struct top_thandle *top_th; \
+ struct thandle_update_records *tur; \
+ struct llog_update_record *lur; \
+ size_t avail_param_size; \
+ size_t avail_op_size; \
+ int ret; \
+ \
+ while (1) { \
+ top_th = container_of(th, struct top_thandle, tt_super);\
+ tur = top_th->tt_update_records; \
+ lur = tur->tur_update_records; \
+ avail_param_size = tur->tur_update_params_buf_size - \
+ update_params_size(tur->tur_update_params, \
+ tur->tur_update_param_count); \
+ avail_op_size = tur->tur_update_records_buf_size - \
+ llog_update_record_size(lur); \
+ ret = update_records_##name##_pack(env, \
+ &lur->lur_update_rec.ur_ops, \
+ &lur->lur_update_rec.ur_update_count, \
+ &avail_op_size, \
+ tur->tur_update_params, \
+ &tur->tur_update_param_count, \
+ &avail_param_size, __VA_ARGS__); \
+ if (ret == -E2BIG) { \
+ ret = tur_update_extend(tur, avail_op_size, \
+ avail_param_size); \
+ if (ret != 0) \
+ break; \
+ continue; \
+ } else { \
+ break; \
+ } \
+ } \
+ ret; \
+})
#endif
/* lod_sub_object.c */
struct thandle *lod_sub_get_thandle(const struct lu_env *env,
struct thandle *th,
- const struct dt_object *sub_obj);
+ const struct dt_object *sub_obj,
+ bool *record_update);
int lod_sub_object_declare_create(const struct lu_env *env,
struct dt_object *dt,
struct lu_attr *attr,
}
/**
- * Create a striped directory.
+ * Declare create a striped directory.
*
- * Create a striped directory with a given stripe pattern on the specified MDTs.
- * A striped directory is represented as a regular directory - an index listing
- * all the stripes. The stripes point back to the master object with ".." and
- * LinkEA. The master object gets LMV EA which identifies it as a striped
- * directory. The function allocates FIDs for all the stripes.
+ * Declare creating a striped directory with a given stripe pattern on the
+ * specified MDTs. A striped directory is represented as a regular directory
+ * - an index listing all the stripes. The stripes point back to the master
+ * object with ".." and LinkEA. The master object gets LMV EA which
+ * identifies it as a striped directory. The function allocates FIDs
+ * for all stripes.
*
* \param[in] env execution environment
* \param[in] dt object
* \param[in] attr attributes to initialize the objects with
- * \param[in] lum a pattern specifying the number of stripes and
- * MDT to start from
* \param[in] dof type of objects to be created
* \param[in] th transaction handle
*
static int lod_xattr_set_internal(const struct lu_env *env,
struct dt_object *dt,
const struct lu_buf *buf,
- const char *name, int fl, struct thandle *th)
+ const char *name, int fl,
+ struct thandle *th)
{
struct dt_object *next = dt_object_child(dt);
struct lod_object *lo = lod_dt_obj(dt);
struct thandle *lod_sub_get_thandle(const struct lu_env *env,
struct thandle *th,
- const struct dt_object *sub_obj)
+ const struct dt_object *sub_obj,
+ bool *record_update)
{
struct lod_device *lod = dt2lod_dev(th->th_dev);
struct top_thandle *tth;
int rc;
ENTRY;
+ if (record_update != NULL)
+ *record_update = false;
+
if (th->th_top == NULL)
RETURN(th);
* creation, FID is not assigned until osp_object_create(),
* so if the FID of sub_obj is zero, it means OST object. */
if (!dt_object_remote(sub_obj) ||
- fid_is_zero(lu_object_fid(&sub_obj->do_lu)))
+ fid_is_zero(lu_object_fid(&sub_obj->do_lu))) {
+ /* local MDT object */
+ if (fid_is_sane(lu_object_fid(&sub_obj->do_lu)) &&
+ tth->tt_update_records != NULL &&
+ record_update != NULL)
+ *record_update = true;
+
RETURN(tth->tt_master_sub_thandle);
+ }
rc = lod_fld_lookup(env, lod, lu_object_fid(&sub_obj->do_lu),
&mdt_index, &type);
if (type == LU_SEQ_RANGE_OST)
RETURN(tth->tt_master_sub_thandle);
+ if (tth->tt_update_records != NULL && record_update != NULL)
+ *record_update = true;
+
sub_th = thandle_get_sub(env, th, sub_obj);
RETURN(sub_th);
{
struct thandle *sub_th;
- sub_th = lod_sub_get_thandle(env, th, dt);
+ sub_th = lod_sub_get_thandle(env, th, dt, NULL);
if (IS_ERR(sub_th))
return PTR_ERR(sub_th);
struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update) {
+ rc = update_record_pack(create, th,
+ lu_object_fid(&dt->do_lu),
+ attr, hint, dof);
+ if (rc < 0)
+ RETURN(rc);
+ }
+
rc = dt_create(env, dt, attr, hint, dof, sub_th);
RETURN(rc);
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt);
+ sub_th = lod_sub_get_thandle(env, th, dt, NULL);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update) {
+ rc = update_record_pack(ref_add, th,
+ lu_object_fid(&dt->do_lu));
+ if (rc < 0)
+ RETURN(rc);
+ }
+
rc = dt_ref_add(env, dt, sub_th);
RETURN(rc);
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt);
+ sub_th = lod_sub_get_thandle(env, th, dt, NULL);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update) {
+ rc = update_record_pack(ref_del, th,
+ lu_object_fid(&dt->do_lu));
+ if (rc < 0)
+ RETURN(rc);
+ }
+
rc = dt_ref_del(env, dt, sub_th);
RETURN(rc);
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt);
+ sub_th = lod_sub_get_thandle(env, th, dt, NULL);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update) {
+ rc = update_record_pack(object_destroy, th,
+ lu_object_fid(&dt->do_lu));
+ if (rc < 0)
+ RETURN(rc);
+ }
+
rc = dt_destroy(env, dt, sub_th);
RETURN(rc);
{
struct thandle *sub_th;
- sub_th = lod_sub_get_thandle(env, th, dt);
+ sub_th = lod_sub_get_thandle(env, th, dt, NULL);
if (IS_ERR(sub_th))
return PTR_ERR(sub_th);
int ign)
{
struct thandle *sub_th;
+ int rc;
+ bool record_update;
- sub_th = lod_sub_get_thandle(env, th, dt);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
return PTR_ERR(sub_th);
+ if (record_update) {
+ rc = update_record_pack(index_insert, th,
+ lu_object_fid(&dt->do_lu), rec, key);
+ if (rc < 0)
+ return rc;
+ }
+
return dt_insert(env, dt, rec, key, sub_th, ign);
}
{
struct thandle *sub_th;
- sub_th = lod_sub_get_thandle(env, th, dt);
+ sub_th = lod_sub_get_thandle(env, th, dt, NULL);
if (IS_ERR(sub_th))
return PTR_ERR(sub_th);
const struct dt_key *name, struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update) {
+ rc = update_record_pack(index_delete, th,
+ lu_object_fid(&dt->do_lu), name);
+ if (rc < 0)
+ RETURN(rc);
+ }
+
rc = dt_delete(env, dt, name, sub_th);
RETURN(rc);
}
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt);
+ sub_th = lod_sub_get_thandle(env, th, dt, NULL);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update) {
+ rc = update_record_pack(xattr_set, th,
+ lu_object_fid(&dt->do_lu),
+ buf, name, fl);
+ if (rc < 0)
+ RETURN(rc);
+ }
+
rc = dt_xattr_set(env, dt, buf, name, fl, sub_th);
RETURN(rc);
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt);
+ sub_th = lod_sub_get_thandle(env, th, dt, NULL);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
const struct lu_attr *attr,
struct thandle *th)
{
+ bool record_update;
struct thandle *sub_th;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update) {
+ rc = update_record_pack(attr_set, th, lu_object_fid(&dt->do_lu),
+ attr);
+ if (rc < 0)
+ RETURN(rc);
+ }
+
rc = dt_attr_set(env, dt, attr, sub_th);
RETURN(rc);
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt);
+ sub_th = lod_sub_get_thandle(env, th, dt, NULL);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
struct thandle *th)
{
struct thandle *sub_th;
+ bool record_update;
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update) {
+ rc = update_record_pack(xattr_del, th,
+ lu_object_fid(&dt->do_lu), name);
+ if (rc < 0)
+ RETURN(rc);
+ }
+
rc = dt_xattr_del(env, dt, name, sub_th);
RETURN(rc);
int rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt);
+ sub_th = lod_sub_get_thandle(env, th, dt, NULL);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
struct thandle *th, int rq)
{
struct thandle *sub_th;
+ bool record_update;
ssize_t rc;
ENTRY;
- sub_th = lod_sub_get_thandle(env, th, dt);
+ sub_th = lod_sub_get_thandle(env, th, dt, &record_update);
if (IS_ERR(sub_th))
RETURN(PTR_ERR(sub_th));
+ if (record_update) {
+ rc = update_record_pack(write, th, lu_object_fid(&dt->do_lu),
+ buf, *pos);
+ if (rc < 0)
+ RETURN(rc);
+ }
+
rc = dt_write(env, dt, buf, pos, sub_th, rq);
RETURN(rc);
}
return imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_FID;
}
+struct object_update *
+update_buffer_get_update(struct object_update_request *request,
+ unsigned int index);
+
+int osp_extend_update_buffer(const struct lu_env *env,
+ struct update_buffer *ubuf);
+
+#define osp_update_rpc_pack(env, name, update, op, ...) \
+({ \
+ struct object_update *object_update; \
+ size_t max_update_length; \
+ struct object_update_request *ureq; \
+ int ret; \
+ \
+ while (1) { \
+ ureq = update->dur_buf.ub_req; \
+ max_update_length = update->dur_buf.ub_req_size - \
+ object_update_request_size(ureq); \
+ \
+ object_update = update_buffer_get_update(ureq, \
+ ureq->ourq_count); \
+ ret = out_##name##_pack(env, object_update, max_update_length, \
+ __VA_ARGS__); \
+ if (ret == -E2BIG) { \
+ int rc1; \
+ /* extend the buffer and retry */ \
+ rc1 = osp_extend_update_buffer(env, &update->dur_buf); \
+ if (rc1 != 0) { \
+ ret = rc1; \
+ break; \
+ } \
+ } else { \
+ if (ret == 0) { \
+ object_update->ou_flags |= update->dur_flags; \
+ ureq->ourq_count++; \
+ } \
+ break; \
+ } \
+ } \
+ ret; \
+})
+
typedef int (*osp_update_interpreter_t)(const struct lu_env *env,
struct object_update_reply *rep,
struct ptlrpc_request *req,
#include <lustre_log.h>
#include "osp_internal.h"
-static const char dot[] = ".";
-static const char dotdot[] = "..";
+#define OUT_UPDATE_BUFFER_SIZE_ADD 4096
+#define OUT_UPDATE_BUFFER_SIZE_MAX (256 * 4096) /* 1M update size now */
/**
* Interpreter call for object creation
return osp_trans_update_request_create(th);
}
+struct object_update *
+update_buffer_get_update(struct object_update_request *request,
+ unsigned int index)
+{
+ void *ptr;
+ int i;
+
+ if (index > request->ourq_count)
+ return NULL;
+
+ ptr = &request->ourq_updates[0];
+ for (i = 0; i < index; i++)
+ ptr += object_update_size(ptr);
+
+ return ptr;
+}
+
+int osp_extend_update_buffer(const struct lu_env *env,
+ struct update_buffer *ubuf)
+{
+ struct object_update_request *ureq;
+ size_t new_size = ubuf->ub_req_size + OUT_UPDATE_BUFFER_SIZE_ADD;
+
+ /* enlarge object update request size */
+ if (new_size > OUT_UPDATE_BUFFER_SIZE_MAX)
+ return -E2BIG;
+
+ OBD_ALLOC_LARGE(ureq, new_size);
+ if (ureq == NULL)
+ return -ENOMEM;
+
+ memcpy(ureq, ubuf->ub_req, ubuf->ub_req_size);
+
+ OBD_FREE_LARGE(ubuf->ub_req, ubuf->ub_req_size);
+
+ ubuf->ub_req = ureq;
+ ubuf->ub_req_size = new_size;
+
+ return 0;
+}
+
/**
* Implementation of dt_object_operations::do_create
*
update = thandle_to_dt_update_request(th);
LASSERT(update != NULL);
- rc = out_create_pack(env, &update->dur_buf,
- lu_object_fid(&dt->do_lu), attr, hint, dof,
- update->dur_batchid);
+ rc = osp_update_rpc_pack(env, create, update, OUT_CREATE,
+ lu_object_fid(&dt->do_lu), attr, hint, dof);
if (rc != 0)
GOTO(out, rc);
- rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), attr,
+ rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), NULL,
osp_object_create_interpreter);
if (rc < 0)
update = thandle_to_dt_update_request(th);
LASSERT(update != NULL);
- rc = out_ref_del_pack(env, &update->dur_buf,
- lu_object_fid(&dt->do_lu),
- update->dur_batchid);
+ rc = osp_update_rpc_pack(env, ref_del, update, OUT_REF_DEL,
+ lu_object_fid(&dt->do_lu));
return rc;
}
update = thandle_to_dt_update_request(th);
LASSERT(update != NULL);
- rc = out_ref_add_pack(env, &update->dur_buf,
- lu_object_fid(&dt->do_lu),
- update->dur_batchid);
+ rc = osp_update_rpc_pack(env, ref_add, update, OUT_REF_ADD,
+ lu_object_fid(&dt->do_lu));
return rc;
}
update = thandle_to_dt_update_request(th);
LASSERT(update != NULL);
- rc = out_attr_set_pack(env, &update->dur_buf,
- lu_object_fid(&dt->do_lu), attr,
- update->dur_batchid);
-
+ rc = osp_update_rpc_pack(env, attr_set, update, OUT_ATTR_SET,
+ lu_object_fid(&dt->do_lu), attr);
return rc;
}
if (IS_ERR(update))
RETURN(PTR_ERR(update));
- rc = out_index_lookup_pack(env, &update->dur_buf,
- lu_object_fid(&dt->do_lu), rec, key);
+ rc = osp_update_rpc_pack(env, index_lookup, update, OUT_INDEX_LOOKUP,
+ lu_object_fid(&dt->do_lu), rec, key);
if (rc != 0) {
CERROR("%s: Insert update error: rc = %d\n",
dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
struct dt_update_request *update = oth->ot_dur;
int rc;
-
- rc = out_index_insert_pack(env, &update->dur_buf,
- lu_object_fid(&dt->do_lu), rec, key,
- update->dur_batchid);
-
+ rc = osp_update_rpc_pack(env, index_insert, update, OUT_INDEX_INSERT,
+ lu_object_fid(&dt->do_lu), rec, key);
return rc;
}
update = thandle_to_dt_update_request(th);
LASSERT(update != NULL);
- rc = out_index_delete_pack(env, &update->dur_buf,
- lu_object_fid(&dt->do_lu), key,
- update->dur_batchid);
+ rc = osp_update_rpc_pack(env, index_delete, update, OUT_INDEX_DELETE,
+ lu_object_fid(&dt->do_lu), key);
+
return rc;
}
}
/**
+ * Interpreter call for object destroy
+ *
+ * Object destroy interpreter, which will be called after destroying
+ * the remote object to set flags and status.
+ *
+ * \param[in] env execution environment
+ * \param[in] reply update reply
+ * \param[in] req ptlrpc update request for destroying object
+ * \param[in] obj object to be destroyed
+ * \param[in] data data used in this function.
+ * \param[in] index index(position) of destroy update in the whole
+ * updates
+ * \param[in] rc update result on the remote MDT.
+ *
+ * \retval only return 0 for now
+ */
+static int osp_md_object_destroy_interpreter(const struct lu_env *env,
+ struct object_update_reply *reply,
+ struct ptlrpc_request *req,
+ struct osp_object *obj,
+ void *data, int index, int rc)
+{
+ /* not needed in cache any more */
+ set_bit(LU_OBJECT_HEARD_BANSHEE,
+ &obj->opo_obj.do_lu.lo_header->loh_flags);
+ return 0;
+}
+
+/**
* Implement OSP layer dt_object_operations::do_destroy() interface.
*
* Pack the destroy update into the RPC buffer, which will be sent
update = thandle_to_dt_update_request(th);
LASSERT(update != NULL);
- rc = out_object_destroy_pack(env, &update->dur_buf,
- lu_object_fid(&dt->do_lu), update->dur_batchid);
+ rc = osp_update_rpc_pack(env, object_destroy, update, OUT_DESTROY,
+ lu_object_fid(&dt->do_lu));
if (rc != 0)
RETURN(rc);
- /* not needed in cache any more */
- set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
-
+ rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), NULL,
+ osp_md_object_destroy_interpreter);
RETURN(rc);
}
update = thandle_to_dt_update_request(th);
LASSERT(update != NULL);
- rc = out_write_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu),
- buf, *pos, update->dur_batchid);
+ rc = osp_update_rpc_pack(env, write, update, OUT_WRITE,
+ lu_object_fid(&dt->do_lu), buf, *pos);
if (rc < 0)
return rc;
if (IS_ERR(update))
RETURN(PTR_ERR(update));
- rc = out_attr_get_pack(env, &update->dur_buf,
- lu_object_fid(&dt->do_lu));
+ rc = osp_update_rpc_pack(env, attr_get, update, OUT_ATTR_GET,
+ lu_object_fid(&dt->do_lu));
if (rc != 0) {
CERROR("%s: Insert update error "DFID": rc = %d\n",
dev->dd_lu_dev.ld_obd->obd_name,
if (IS_ERR(update))
GOTO(out, rc = PTR_ERR(update));
- rc = out_xattr_get_pack(env, &update->dur_buf,
- lu_object_fid(&dt->do_lu), name);
+ rc = osp_update_rpc_pack(env, xattr_get, update, OUT_XATTR_GET,
+ lu_object_fid(&dt->do_lu), name);
if (rc != 0) {
CERROR("%s: Insert update error "DFID": rc = %d\n",
dname, PFID(lu_object_fid(&dt->do_lu)), rc);
CDEBUG(D_INODE, DFID" set xattr '%s' with size %zd\n",
PFID(lu_object_fid(&dt->do_lu)), name, buf->lb_len);
- rc = out_xattr_set_pack(env, &update->dur_buf,
- lu_object_fid(&dt->do_lu),
- buf, name, fl, update->dur_batchid);
+ rc = osp_update_rpc_pack(env, xattr_set, update, OUT_XATTR_SET,
+ lu_object_fid(&dt->do_lu), buf, name, fl);
if (rc != 0 || o->opo_ooa == NULL)
- return rc;
+ RETURN(rc);
oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
if (oxe == NULL) {
update = thandle_to_dt_update_request(th);
LASSERT(update != NULL);
- rc = out_xattr_del_pack(env, &update->dur_buf, fid, name,
- update->dur_batchid);
+ rc = osp_update_rpc_pack(env, xattr_del, update, OUT_XATTR_DEL,
+ fid, name);
if (rc != 0 || o->opo_ooa == NULL)
return rc;
OBD_FREE_PTR(dt_update);
}
+static void
+object_update_request_dump(const struct object_update_request *ourq,
+ unsigned int mask)
+{
+ unsigned int i;
+ size_t total_size = 0;
+
+ for (i = 0; i < ourq->ourq_count; i++) {
+ struct object_update *update;
+ size_t size = 0;
+
+ update = object_update_request_get(ourq, i, &size);
+ LASSERT(update != NULL);
+ CDEBUG(mask, "i = %u fid = "DFID" op = %s master = %u"
+ "params = %d batchid = "LPU64" size = %zu\n",
+ i, PFID(&update->ou_fid),
+ update_op_str(update->ou_type),
+ update->ou_master_index, update->ou_params_count,
+ update->ou_batchid, size);
+
+ total_size += size;
+ }
+
+ CDEBUG(mask, "updates = %p magic = %x count = %d size = %zu\n", ourq,
+ ourq->ourq_magic, ourq->ourq_count, total_size);
+}
+
/**
* Allocate an osp request and initialize it with the given parameters.
*
osp_update_interpreter_t interpreter)
{
struct osp_device *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
- struct dt_update_request *update;
- int rc = 0;
+ struct dt_update_request *update;
+ struct object_update *object_update;
+ size_t max_update_size;
+ struct object_update_request *ureq;
+ int rc = 0;
ENTRY;
update = osp_find_or_create_async_update_request(osp);
RETURN(PTR_ERR(update));
again:
+ ureq = update->dur_buf.ub_req;
+ max_update_size = update->dur_buf.ub_req_size -
+ object_update_request_size(ureq);
+
+ object_update = update_buffer_get_update(ureq, ureq->ourq_count);
+ rc = out_update_pack(env, object_update, max_update_size, op,
+ lu_object_fid(osp2lu_obj(obj)), count, lens, bufs);
/* The queue is full. */
- rc = out_update_pack(env, &update->dur_buf, op,
- lu_object_fid(osp2lu_obj(obj)), count, lens, bufs,
- 0);
if (rc == -E2BIG) {
osp->opd_async_requests = NULL;
mutex_unlock(&osp->opd_async_requests_mutex);
RETURN(PTR_ERR(update));
goto again;
+ } else {
+ if (rc < 0)
+ RETURN(rc);
+
+ ureq->ourq_count++;
}
rc = osp_insert_update_callback(env, update, obj, data, interpreter);
return PTR_ERR(update);
}
+ if (dt2osp_dev(th->th_dev)->opd_connect_mdt)
+ update->dur_flags = UPDATE_FL_SYNC;
+
oth->ot_dur = update;
return 0;
}
+
/**
* The OSP layer dt_device_operations::dt_trans_create() interface
* to create a transaction.
int rc;
ENTRY;
+ object_update_request_dump(ureq, D_INFO);
req = ptlrpc_request_alloc(imp, &RQF_OUT_UPDATE);
if (req == NULL)
RETURN(-ENOMEM);
target_objs := $(TARGET)tgt_main.o $(TARGET)tgt_lastrcvd.o
target_objs += $(TARGET)tgt_handler.o $(TARGET)out_handler.o
target_objs += $(TARGET)out_lib.o $(TARGET)update_trans.o
+target_objs += $(TARGET)update_records.o
ptlrpc_objs := client.o recover.o connection.o niobuf.o pack_generic.o
ptlrpc_objs += events.o ptlrpc_module.o service.o pinger.o
EXTRA_DIST = tgt_main.c tgt_lastrcvd.c tgt_handler.c tgt_internal.h \
out_handler.c out_lib.c
EXTRA_DIST += update_trans.c
+EXTRA_DIST += update_records.c
rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
if (rc != 0)
GOTO(next, rc);
+
+ if (update->ou_flags & UPDATE_FL_SYNC)
+ ta->ta_handle->th_sync = 1;
}
/* Stop the current update transaction, if the update has
if (rc != 0)
GOTO(next, rc);
+ if (update->ou_flags & UPDATE_FL_SYNC)
+ ta->ta_handle->th_sync = 1;
+
current_batchid = update->ou_batchid;
}
}
#define OUT_UPDATE_BUFFER_SIZE_ADD 4096
#define OUT_UPDATE_BUFFER_SIZE_MAX (256 * 4096) /* 1MB update size now */
-/**
- * resize update buffer
- *
- * Extend the update buffer by new_size.
- *
- * \param[in] ubuf update buffer to be extended
- * \param[in] new_size new size of the update buffer
- *
- * \retval 0 if extending succeeds.
- * \retval negative errno if extending fails.
- */
-static int update_buffer_resize(struct update_buffer *ubuf, size_t new_size)
-{
- struct object_update_request *ureq;
-
- if (new_size > ubuf->ub_req_size)
- return 0;
-
- OBD_ALLOC_LARGE(ureq, new_size);
- if (ureq == NULL)
- return -ENOMEM;
-
- memcpy(ureq, ubuf->ub_req, ubuf->ub_req_size);
- OBD_FREE_LARGE(ubuf->ub_req, ubuf->ub_req_size);
-
- ubuf->ub_req = ureq;
- ubuf->ub_req_size = new_size;
-
- return 0;
+const char *update_op_str(__u16 opc)
+{
+ static const char *opc_str[] = {
+ [OUT_START] = "start",
+ [OUT_CREATE] = "create",
+ [OUT_DESTROY] = "destroy",
+ [OUT_REF_ADD] = "ref_add",
+ [OUT_REF_DEL] = "ref_del" ,
+ [OUT_ATTR_SET] = "attr_set",
+ [OUT_ATTR_GET] = "attr_get",
+ [OUT_XATTR_SET] = "xattr_set",
+ [OUT_XATTR_GET] = "xattr_get",
+ [OUT_INDEX_LOOKUP] = "lookup",
+ [OUT_INDEX_INSERT] = "insert",
+ [OUT_INDEX_DELETE] = "delete",
+ [OUT_WRITE] = "write",
+ [OUT_XATTR_DEL] = "xattr_del",
+ };
+
+ if (opc < ARRAY_SIZE(opc_str) && opc_str[opc] != NULL)
+ return opc_str[opc];
+ else
+ return "unknown";
}
+EXPORT_SYMBOL(update_op_str);
/**
- * Pack the header of object_update_request
+ * Fill object update header
*
- * Packs updates into the update_buffer header, which will either be sent to
- * the remote MDT or stored in the local update log. The maximum update buffer
- * size is 1MB for now.
+ * Only fill the object update header, and parameters will be filled later
+ * in other functions.
*
- * \param[in] env execution environment
- * \param[in] ubuf update bufer which it will pack the update in
- * \param[in] op update operation
- * \param[in] fid object FID for this update
- * \param[in] param_count parameters count for this update
- * \param[in] lens each parameters length of this update
- * \param[in] batchid batchid(transaction no) of this update
+ * \params[in] env execution environment
+ * \params[in] update object update to be filled
+ * \params[in] max_update_size maximum object update size, if the current
+ * update length equals or exceeds the size, it
+ * will return -E2BIG.
+ * \params[in] update_op update type
+ * \params[in] fid object FID of the update
+ * \params[in] params_count the count of the update parameters
+ * \params[in] params_sizes the length of each parameters
*
- * \retval 0 pack update succeed.
- * \retval negative errno pack update failed.
- **/
-static struct object_update *
-out_update_header_pack(const struct lu_env *env, struct update_buffer *ubuf,
- enum update_type op, const struct lu_fid *fid,
- int params_count, __u16 *param_sizes, __u64 batchid)
+ * \retval 0 if packing succeeds.
+ * \retval -E2BIG if packing exceeds the maximum length.
+ */
+int out_update_header_pack(const struct lu_env *env,
+ struct object_update *update, size_t max_update_size,
+ enum update_type update_op, const struct lu_fid *fid,
+ unsigned int param_count, __u16 *params_sizes)
{
- struct object_update_request *ureq = ubuf->ub_req;
- size_t ureq_size = ubuf->ub_req_size;
- struct object_update *obj_update;
struct object_update_param *param;
- size_t update_size;
- int rc = 0;
unsigned int i;
- ENTRY;
-
- /* Check update size to make sure it can fit into the buffer */
- ureq_size = object_update_request_size(ureq);
- update_size = offsetof(struct object_update, ou_params[0]);
- for (i = 0; i < params_count; i++)
- update_size += cfs_size_round(param_sizes[i] + sizeof(*param));
-
- if (unlikely(cfs_size_round(ureq_size + update_size) >
- ubuf->ub_req_size)) {
- size_t new_size = ubuf->ub_req_size;
-
- /* enlarge object update request size */
- while (new_size <
- cfs_size_round(ureq_size + update_size))
- new_size += OUT_UPDATE_BUFFER_SIZE_ADD;
- if (new_size >= OUT_UPDATE_BUFFER_SIZE_MAX)
- RETURN(ERR_PTR(-E2BIG));
+ size_t update_size;
- rc = update_buffer_resize(ubuf, new_size);
- if (rc < 0)
- RETURN(ERR_PTR(rc));
+ /* Check whether the packing exceeding the maxima update length */
+ update_size = sizeof(*update);
+ for (i = 0; i < param_count; i++)
+ update_size += cfs_size_round(sizeof(*param) + params_sizes[i]);
- ureq = ubuf->ub_req;
- }
+ if (unlikely(update_size >= max_update_size))
+ return -E2BIG;
- /* fill the update into the update buffer */
- obj_update = (struct object_update *)((char *)ureq + ureq_size);
- obj_update->ou_fid = *fid;
- obj_update->ou_type = op;
- obj_update->ou_params_count = (__u16)params_count;
- obj_update->ou_batchid = batchid;
- param = &obj_update->ou_params[0];
- for (i = 0; i < params_count; i++) {
- param->oup_len = param_sizes[i];
+ update->ou_fid = *fid;
+ update->ou_type = update_op;
+ update->ou_params_count = param_count;
+ param = &update->ou_params[0];
+ for (i = 0; i < param_count; i++) {
+ param->oup_len = params_sizes[i];
param = (struct object_update_param *)((char *)param +
object_update_param_size(param));
}
- CDEBUG(D_INFO, "%p "DFID" idx %u: op %d params %d:%d\n",
- ureq, PFID(fid), ureq->ourq_count, op, params_count,
- (int)update_size);
- ureq->ourq_count++;
-
- RETURN(obj_update);
+ return 0;
}
/**
* Packs one update into the update_buffer.
*
* \param[in] env execution environment
- * \param[in] ubuf bufer where update will be packed
+ * \param[in] update update to be packed
+ * \param[in] max_update_size *maximum size of \a update
* \param[in] op update operation (enum update_type)
* \param[in] fid object FID for this update
* \param[in] param_count number of parameters for this update
* \param[in] param_sizes array of parameters length of this update
* \param[in] param_bufs parameter buffers
- * \param[in] batchid transaction no of this update, plus mdt_index, which
- * will be globally unique
*
* \retval = 0 if updates packing succeeds
* \retval negative errno if updates packing fails
**/
-int out_update_pack(const struct lu_env *env, struct update_buffer *ubuf,
- enum update_type op, const struct lu_fid *fid,
- int params_count, __u16 *param_sizes,
- const void **param_bufs, __u64 batchid)
+int out_update_pack(const struct lu_env *env, struct object_update *update,
+ size_t max_update_size, enum update_type op,
+ const struct lu_fid *fid, unsigned int param_count,
+ __u16 *param_sizes, const void **param_bufs)
{
- struct object_update *update;
struct object_update_param *param;
unsigned int i;
+ int rc;
ENTRY;
- update = out_update_header_pack(env, ubuf, op, fid, params_count,
- param_sizes, batchid);
- if (IS_ERR(update))
- RETURN(PTR_ERR(update));
+ rc = out_update_header_pack(env, update, max_update_size, op, fid,
+ param_count, param_sizes);
+ if (rc != 0)
+ RETURN(rc);
param = &update->ou_params[0];
- for (i = 0; i < params_count; i++) {
+ for (i = 0; i < param_count; i++) {
memcpy(¶m->oup_buf[0], param_bufs[i], param_sizes[i]);
param = (struct object_update_param *)((char *)param +
object_update_param_size(param));
* \param[in] env execution environment
* \param[in] ubuf update buffer
* \param[in] fid fid of this object for the update
- * \param[in] batchid batch id of this update
*
* \retval 0 if insertion succeeds.
* \retval negative errno if insertion fails.
*/
-int out_create_pack(const struct lu_env *env, struct update_buffer *ubuf,
- const struct lu_fid *fid, struct lu_attr *attr,
- struct dt_allocation_hint *hint,
- struct dt_object_format *dof, __u64 batchid)
+int out_create_pack(const struct lu_env *env, struct object_update *update,
+ size_t max_update_size, const struct lu_fid *fid,
+ const struct lu_attr *attr, struct dt_allocation_hint *hint,
+ struct dt_object_format *dof)
{
struct obdo *obdo;
__u16 sizes[2] = {sizeof(*obdo), 0};
int buf_count = 1;
- const struct lu_fid *fid1 = NULL;
- struct object_update *update;
+ const struct lu_fid *parent_fid = NULL;
+ int rc;
ENTRY;
if (hint != NULL && hint->dah_parent) {
- fid1 = lu_object_fid(&hint->dah_parent->do_lu);
- sizes[1] = sizeof(*fid1);
+ parent_fid = lu_object_fid(&hint->dah_parent->do_lu);
+ sizes[1] = sizeof(*parent_fid);
buf_count++;
}
- update = out_update_header_pack(env, ubuf, OUT_CREATE, fid,
- buf_count, sizes, batchid);
- if (IS_ERR(update))
- RETURN(PTR_ERR(update));
+ rc = out_update_header_pack(env, update, max_update_size, OUT_CREATE,
+ fid, buf_count, sizes);
+ if (rc != 0)
+ RETURN(rc);
obdo = object_update_param_get(update, 0, NULL);
+ LASSERT(obdo != NULL);
obdo->o_valid = 0;
obdo_from_la(obdo, attr, attr->la_valid);
lustre_set_wire_obdo(NULL, obdo, obdo);
- if (fid1 != NULL) {
- struct lu_fid *fid;
- fid = object_update_param_get(update, 1, NULL);
- fid_cpu_to_le(fid, fid1);
+
+ if (parent_fid != NULL) {
+ struct lu_fid *tmp;
+
+ tmp = object_update_param_get(update, 1, NULL);
+ LASSERT(tmp != NULL);
+ fid_cpu_to_le(tmp, parent_fid);
}
RETURN(0);
}
EXPORT_SYMBOL(out_create_pack);
-int out_ref_del_pack(const struct lu_env *env, struct update_buffer *ubuf,
- const struct lu_fid *fid, __u64 batchid)
+int out_ref_del_pack(const struct lu_env *env, struct object_update *update,
+ size_t max_update_size, const struct lu_fid *fid)
{
- return out_update_pack(env, ubuf, OUT_REF_DEL, fid, 0, NULL, NULL,
- batchid);
+ return out_update_pack(env, update, max_update_size, OUT_REF_DEL, fid,
+ 0, NULL, NULL);
}
EXPORT_SYMBOL(out_ref_del_pack);
-int out_ref_add_pack(const struct lu_env *env, struct update_buffer *ubuf,
- const struct lu_fid *fid, __u64 batchid)
+int out_ref_add_pack(const struct lu_env *env, struct object_update *update,
+ size_t max_update_size, const struct lu_fid *fid)
{
- return out_update_pack(env, ubuf, OUT_REF_ADD, fid, 0, NULL, NULL,
- batchid);
+ return out_update_pack(env, update, max_update_size, OUT_REF_ADD, fid,
+ 0, NULL, NULL);
}
EXPORT_SYMBOL(out_ref_add_pack);
-int out_attr_set_pack(const struct lu_env *env, struct update_buffer *ubuf,
- const struct lu_fid *fid, const struct lu_attr *attr,
- __u64 batchid)
+int out_attr_set_pack(const struct lu_env *env, struct object_update *update,
+ size_t max_update_size, const struct lu_fid *fid,
+ const struct lu_attr *attr)
{
- struct object_update *update;
struct obdo *obdo;
__u16 size = sizeof(*obdo);
+ int rc;
ENTRY;
- update = out_update_header_pack(env, ubuf, OUT_ATTR_SET, fid, 1,
- &size, batchid);
- if (IS_ERR(update))
- RETURN(PTR_ERR(update));
+ rc = out_update_header_pack(env, update, max_update_size,
+ OUT_ATTR_SET, fid, 1, &size);
+ if (rc != 0)
+ RETURN(rc);
obdo = object_update_param_get(update, 0, NULL);
+ LASSERT(obdo != NULL);
obdo->o_valid = 0;
obdo_from_la(obdo, attr, attr->la_valid);
lustre_set_wire_obdo(NULL, obdo, obdo);
}
EXPORT_SYMBOL(out_attr_set_pack);
-int out_xattr_set_pack(const struct lu_env *env, struct update_buffer *ubuf,
- const struct lu_fid *fid, const struct lu_buf *buf,
- const char *name, int flag, __u64 batchid)
+int out_xattr_set_pack(const struct lu_env *env, struct object_update *update,
+ size_t max_update_size, const struct lu_fid *fid,
+ const struct lu_buf *buf, const char *name, __u32 flag)
{
__u16 sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)};
const void *bufs[3] = {(char *)name, (char *)buf->lb_buf,
(char *)&flag};
- return out_update_pack(env, ubuf, OUT_XATTR_SET, fid,
- ARRAY_SIZE(sizes), sizes, bufs, batchid);
+ return out_update_pack(env, update, max_update_size, OUT_XATTR_SET,
+ fid, ARRAY_SIZE(sizes), sizes, bufs);
}
EXPORT_SYMBOL(out_xattr_set_pack);
-int out_xattr_del_pack(const struct lu_env *env, struct update_buffer *ubuf,
- const struct lu_fid *fid, const char *name,
- __u64 batchid)
+int out_xattr_del_pack(const struct lu_env *env, struct object_update *update,
+ size_t max_update_size, const struct lu_fid *fid,
+ const char *name)
{
__u16 size = strlen(name) + 1;
- return out_update_pack(env, ubuf, OUT_XATTR_DEL, fid, 1, &size,
- (const void **)&name, batchid);
+ return out_update_pack(env, update, max_update_size, OUT_XATTR_DEL,
+ fid, 1, &size, (const void **)&name);
}
EXPORT_SYMBOL(out_xattr_del_pack);
-int out_index_insert_pack(const struct lu_env *env, struct update_buffer *ubuf,
- const struct lu_fid *fid, const struct dt_rec *rec,
- const struct dt_key *key, __u64 batchid)
+int out_index_insert_pack(const struct lu_env *env,
+ struct object_update *update,
+ size_t max_update_size, const struct lu_fid *fid,
+ const struct dt_rec *rec, const struct dt_key *key)
{
struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec;
struct lu_fid rec_fid;
fid_cpu_to_le(&rec_fid, rec1->rec_fid);
- return out_update_pack(env, ubuf, OUT_INDEX_INSERT, fid,
- ARRAY_SIZE(sizes), sizes, bufs, batchid);
+ return out_update_pack(env, update, max_update_size, OUT_INDEX_INSERT,
+ fid, ARRAY_SIZE(sizes), sizes, bufs);
}
EXPORT_SYMBOL(out_index_insert_pack);
-int out_index_delete_pack(const struct lu_env *env, struct update_buffer *ubuf,
- const struct lu_fid *fid, const struct dt_key *key,
- __u64 batchid)
+int out_index_delete_pack(const struct lu_env *env,
+ struct object_update *update,
+ size_t max_update_size, const struct lu_fid *fid,
+ const struct dt_key *key)
{
__u16 size = strlen((char *)key) + 1;
const void *buf = key;
- return out_update_pack(env, ubuf, OUT_INDEX_DELETE, fid, 1, &size,
- &buf, batchid);
+ return out_update_pack(env, update, max_update_size, OUT_INDEX_DELETE,
+ fid, 1, &size, &buf);
}
EXPORT_SYMBOL(out_index_delete_pack);
int out_object_destroy_pack(const struct lu_env *env,
- struct update_buffer *ubuf,
- const struct lu_fid *fid, __u64 batchid)
+ struct object_update *update,
+ size_t max_update_size, const struct lu_fid *fid)
{
- return out_update_pack(env, ubuf, OUT_DESTROY, fid, 0, NULL, NULL,
- batchid);
+ return out_update_pack(env, update, max_update_size, OUT_DESTROY, fid,
+ 0, NULL, NULL);
}
EXPORT_SYMBOL(out_object_destroy_pack);
-int out_write_pack(const struct lu_env *env, struct update_buffer *ubuf,
- const struct lu_fid *fid, const struct lu_buf *buf,
- loff_t pos, __u64 batchid)
+int out_write_pack(const struct lu_env *env, struct object_update *update,
+ size_t max_update_size, const struct lu_fid *fid,
+ const struct lu_buf *buf, __u64 pos)
{
__u16 sizes[2] = {buf->lb_len, sizeof(pos)};
const void *bufs[2] = {(char *)buf->lb_buf, (char *)&pos};
pos = cpu_to_le64(pos);
- rc = out_update_pack(env, ubuf, OUT_WRITE, fid, ARRAY_SIZE(sizes),
- sizes, bufs, batchid);
+ rc = out_update_pack(env, update, max_update_size, OUT_WRITE, fid,
+ ARRAY_SIZE(sizes), sizes, bufs);
return rc;
}
EXPORT_SYMBOL(out_write_pack);
* \param[in] fid fid of this object for the update
* \param[in] ubuf update buffer
*
- * \retval 0 if packing succeeds.
- * \retval negative errno if packing fails.
- */
-int out_index_lookup_pack(const struct lu_env *env, struct update_buffer *ubuf,
- const struct lu_fid *fid, struct dt_rec *rec,
- const struct dt_key *key)
+ * \retval = 0 pack succeed.
+ * < 0 pack failed.
+ **/
+int out_index_lookup_pack(const struct lu_env *env,
+ struct object_update *update,
+ size_t max_update_size, const struct lu_fid *fid,
+ struct dt_rec *rec, const struct dt_key *key)
{
const void *name = key;
__u16 size = strlen((char *)name) + 1;
- return out_update_pack(env, ubuf, OUT_INDEX_LOOKUP, fid, 1, &size,
- &name, 0);
+ return out_update_pack(env, update, max_update_size, OUT_INDEX_LOOKUP,
+ fid, 1, &size, &name);
}
EXPORT_SYMBOL(out_index_lookup_pack);
-int out_attr_get_pack(const struct lu_env *env, struct update_buffer *ubuf,
- const struct lu_fid *fid)
+int out_attr_get_pack(const struct lu_env *env, struct object_update *update,
+ size_t max_update_size, const struct lu_fid *fid)
{
- return out_update_pack(env, ubuf, OUT_ATTR_GET, fid, 0, NULL, NULL, 0);
+ return out_update_pack(env, update, max_update_size, OUT_ATTR_GET,
+ fid, 0, NULL, NULL);
}
EXPORT_SYMBOL(out_attr_get_pack);
-int out_xattr_get_pack(const struct lu_env *env, struct update_buffer *ubuf,
- const struct lu_fid *fid, const char *name)
+int out_xattr_get_pack(const struct lu_env *env, struct object_update *update,
+ size_t max_update_size, const struct lu_fid *fid,
+ const char *name)
{
__u16 size;
LASSERT(name != NULL);
size = strlen(name) + 1;
- return out_update_pack(env, ubuf, OUT_XATTR_GET, fid, 1, &size,
- (const void **)&name, 0);
+
+ return out_update_pack(env, update, max_update_size, OUT_XATTR_GET,
+ fid, 1, &size, (const void **)&name);
}
EXPORT_SYMBOL(out_xattr_get_pack);
#define out_tx_write(info, obj, buf, pos, th, reply, idx) \
__out_tx_write(info, obj, buf, pos, th, reply, idx, __FILE__, __LINE__)
+const char *update_op_str(__u16 opcode);
+
extern struct page *tgt_page_to_corrupt;
struct tgt_thread_big_cache {
int tgt_txn_stop_cb(const struct lu_env *env, struct thandle *th,
void *cookie);
+void update_records_dump(const struct update_records *records,
+ unsigned int mask, bool dump_updates);
+
+struct update_thread_info {
+ struct lu_attr uti_attr;
+ struct lu_fid uti_fid;
+ struct lu_buf uti_buf;
+ struct thandle_update_records uti_tur;
+ struct obdo uti_obdo;
+};
+
+extern struct lu_context_key update_thread_key;
+
+static inline struct update_thread_info *
+update_env_info(const struct lu_env *env)
+{
+ struct update_thread_info *uti;
+
+ uti = lu_context_key_get(&env->le_ctx, &update_thread_key);
+ LASSERT(uti != NULL);
+ return uti;
+}
+
+void update_info_init(void);
+void update_info_fini(void);
#endif /* _TG_INTERNAL_H */
tgt_ses_key_init_generic(&tgt_session_key, NULL);
lu_context_key_register_many(&tgt_session_key, NULL);
+ update_info_init();
+
RETURN(0);
}
lu_context_key_degister(&tgt_thread_key);
lu_context_key_degister(&tgt_session_key);
+ update_info_fini();
}
--- /dev/null
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2014, Intel Corporation.
+ */
+
+/*
+ * lustre/target/update_records.c
+ *
+ * This file implement the methods to pack updates as update records, which
+ * will be written to the disk as llog record, and might be used during
+ * recovery.
+ *
+ * For cross-MDT operation, all of updates of the operation needs to be
+ * recorded in the disk, then during recovery phase, the recovery thread
+ * will retrieve and redo these updates if it needed.
+ *
+ * See comments above struct update_records for the format of update_records.
+ *
+ * Author: Di Wang <di.wang@intel.com>
+ */
+#define DEBUG_SUBSYSTEM S_CLASS
+
+#include <lu_target.h>
+#include <lustre_update.h>
+#include <obd.h>
+#include <obd_class.h>
+#include "tgt_internal.h"
+
+#define UPDATE_RECORDS_BUFFER_SIZE 8192
+#define UPDATE_PARAMS_BUFFER_SIZE 8192
+/**
+ * Dump update record.
+ *
+ * Dump all of updates in the update_records, mostly for debugging purpose.
+ *
+ * \param[in] records update records to be dumpped
+ * \param[in] mask debug level mask
+ * \param[in] dump_params if dump all of updates the updates.
+ *
+ */
+void update_records_dump(const struct update_records *records,
+ unsigned int mask, bool dump_updates)
+{
+ const struct update_ops *ops;
+ const struct update_op *op = NULL;
+ struct update_params *params;
+ unsigned int i;
+
+ ops = &records->ur_ops;
+ params = update_records_get_params(records);
+
+ CDEBUG(mask, "ops = %d params_count = %d\n", records->ur_update_count,
+ records->ur_param_count);
+
+ if (records->ur_update_count == 0)
+ return;
+
+ if (!dump_updates)
+ return;
+
+ op = &ops->uops_op[0];
+ for (i = 0; i < records->ur_update_count; i++) {
+ unsigned int j;
+
+ CDEBUG(mask, "update %dth "DFID" %s params_count = %hu\n", i,
+ PFID(&op->uop_fid), update_op_str(op->uop_type),
+ op->uop_param_count);
+
+ for (j = 0; j < op->uop_param_count; j++) {
+ struct object_update_param *param;
+
+ param = update_params_get_param(params,
+ (unsigned int)op->uop_params_off[j],
+ records->ur_param_count);
+
+ LASSERT(param != NULL);
+ CDEBUG(mask, "param = %p %dth off = %hu size = %hu\n",
+ param, j, op->uop_params_off[j], param->oup_len);
+ }
+
+ op = update_op_next_op(op);
+ }
+}
+
+/**
+ * Pack parameters to update records
+ *
+ * Find and insert parameter to update records, if the parameter
+ * already exists in \a params, then just return the offset of this
+ * parameter, otherwise insert the parameter and return its offset
+ *
+ * \param[in] params update params in which to insert parameter
+ * \param[in] new_param parameters to be inserted.
+ * \param[in] new_param_size the size of \a new_param
+ *
+ * \retval index inside \a params if parameter insertion
+ * succeeds.
+ * \retval negative errno if it fails.
+ */
+static unsigned int update_records_param_pack(struct update_params *params,
+ const void *new_param,
+ size_t new_param_size,
+ unsigned int *param_count)
+{
+ struct object_update_param *param;
+ unsigned int i;
+
+ for (i = 0; i < *param_count; i++) {
+ struct object_update_param *param;
+
+ param = update_params_get_param(params, i, *param_count);
+ if ((new_param == NULL && param->oup_len == new_param_size) ||
+ (param->oup_len == new_param_size &&
+ memcmp(param->oup_buf, new_param, new_param_size) == 0))
+ /* Found the parameter and return its index */
+ return i;
+ }
+
+ param = (struct object_update_param *)((char *)params +
+ update_params_size(params, *param_count));
+
+ param->oup_len = new_param_size;
+ if (new_param != NULL)
+ memcpy(param->oup_buf, new_param, new_param_size);
+
+ *param_count = *param_count + 1;
+
+ return *param_count - 1;
+}
+
+/**
+ * Pack update to update records
+ *
+ * Pack the update and its parameters to the update records. First it will
+ * insert parameters, get the offset of these parameter, then fill the
+ * update with these offset. If insertion exceed the maximum size of
+ * current update records, it will return -E2BIG here, and the caller might
+ * extend the update_record size \see lod_updates_pack.
+ *
+ * \param[in] env execution environment
+ * \param[in] fid FID of the update.
+ * \param[in] op_type operation type of the update
+ * \param[in] ops ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params ur_params in update records
+ * \param[in|out] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] param_bufs buffers of parameters
+ * \param[in] params_buf_count the count of the parameter buffers
+ * \param[in] param_size sizes of parameters
+ *
+ * \retval 0 if packing succeeds
+ * \retval negative errno if packing fails
+ */
+static int update_records_update_pack(const struct lu_env *env,
+ const struct lu_fid *fid,
+ enum update_type op_type,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_op_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ unsigned int param_bufs_count,
+ const void **param_bufs,
+ size_t *param_sizes)
+{
+ struct update_op *op;
+ size_t total_param_sizes = 0;
+ int index;
+ unsigned int i;
+
+ /* Check whether the packing exceeding the maximum update size */
+ if (unlikely(*max_op_size < update_op_size(param_bufs_count))) {
+ CDEBUG(D_INFO, "max_op_size = %zu update_op = %zu\n",
+ *max_op_size, update_op_size(param_bufs_count));
+ *max_op_size = update_op_size(param_bufs_count);
+ return -E2BIG;
+ }
+
+ for (i = 0; i < param_bufs_count; i++)
+ total_param_sizes +=
+ cfs_size_round(sizeof(struct object_update_param) +
+ param_sizes[i]);
+
+ /* Check whether the packing exceeding the maximum parameter size */
+ if (unlikely(*max_param_size < total_param_sizes)) {
+ CDEBUG(D_INFO, "max_param_size = %zu params size = %zu\n",
+ *max_param_size, total_param_sizes);
+
+ *max_param_size = total_param_sizes;
+ return -E2BIG;
+ }
+
+ op = update_ops_get_op(ops, *op_count, *op_count);
+ op->uop_fid = *fid;
+ op->uop_type = op_type;
+ op->uop_param_count = param_bufs_count;
+ for (i = 0; i < param_bufs_count; i++) {
+ index = update_records_param_pack(params, param_bufs[i],
+ param_sizes[i], param_count);
+ if (index < 0)
+ return index;
+
+ CDEBUG(D_INFO, "%s %uth param offset = %d size = %zu\n",
+ update_op_str(op_type), i, index, param_sizes[i]);
+
+ op->uop_params_off[i] = index;
+ }
+ CDEBUG(D_INFO, "%huth "DFID" %s param_count = %u\n",
+ *op_count, PFID(fid), update_op_str(op_type), *param_count);
+
+ *op_count = *op_count + 1;
+
+ return 0;
+}
+
+/**
+ * Pack create update
+ *
+ * Pack create update into update records.
+ *
+ * \param[in] env execution environment
+ * \param[in] ops ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params ur_params in update records
+ * \param[in|out] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid FID of the object to be created
+ * \param[in] attr attribute of the object to be created
+ * \param[in] hint creation hint
+ * \param[in] dof creation format information
+ *
+ * \retval 0 if packing succeeds.
+ * \retval negative errno if packing fails.
+ */
+int update_records_create_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const struct lu_attr *attr,
+ const struct dt_allocation_hint *hint,
+ struct dt_object_format *dof)
+{
+ size_t sizes[2];
+ const void *bufs[2];
+ int buf_count = 0;
+ const struct lu_fid *parent_fid = NULL;
+ struct lu_fid tmp_fid;
+ int rc;
+ struct obdo *obdo;
+
+ if (attr != NULL) {
+ obdo = &update_env_info(env)->uti_obdo;
+ obdo->o_valid = 0;
+ obdo_from_la(obdo, attr, attr->la_valid);
+ lustre_set_wire_obdo(NULL, obdo, obdo);
+ bufs[buf_count] = obdo;
+ sizes[buf_count] = sizeof(*obdo);
+ buf_count++;
+ }
+
+ if (hint != NULL && hint->dah_parent != NULL) {
+ parent_fid = lu_object_fid(&hint->dah_parent->do_lu);
+ fid_cpu_to_le(&tmp_fid, parent_fid);
+ bufs[buf_count] = &tmp_fid;
+ sizes[buf_count] = sizeof(tmp_fid);
+ buf_count++;
+ }
+
+ rc = update_records_update_pack(env, fid, OUT_CREATE, ops, op_count,
+ max_ops_size, params, param_count,
+ max_param_size, buf_count, bufs, sizes);
+ return rc;
+}
+EXPORT_SYMBOL(update_records_create_pack);
+
+/**
+ * Pack attr set update
+ *
+ * Pack attr_set update into update records.
+ *
+ * \param[in] env execution environment
+ * \param[in] ops ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params ur_params in update records
+ * \param[in|out] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid FID of the object to set attr
+ * \param[in] attr attribute of attr set
+ *
+ * \retval 0 if packing succeeds.
+ * \retval negative errno if packing fails.
+ */
+int update_records_attr_set_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const struct lu_attr *attr)
+{
+ struct obdo *obdo = &update_env_info(env)->uti_obdo;
+ size_t size = sizeof(*obdo);
+
+ obdo->o_valid = 0;
+ obdo_from_la(obdo, attr, attr->la_valid);
+ lustre_set_wire_obdo(NULL, obdo, obdo);
+ return update_records_update_pack(env, fid, OUT_ATTR_SET, ops, op_count,
+ max_ops_size, params, param_count,
+ max_param_size, 1,
+ (const void **)&obdo, &size);
+}
+EXPORT_SYMBOL(update_records_attr_set_pack);
+
+/**
+ * Pack ref add update
+ *
+ * Pack ref add update into update records.
+ *
+ * \param[in] env execution environment
+ * \param[in] ops ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params ur_params in update records
+ * \param[in|out] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid FID of the object to add reference
+ *
+ * \retval 0 if packing succeeds.
+ * \retval negative errno if packing fails.
+ */
+int update_records_ref_add_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid)
+{
+ return update_records_update_pack(env, fid, OUT_REF_ADD, ops, op_count,
+ max_ops_size, params, param_count,
+ max_param_size, 0, NULL, NULL);
+}
+EXPORT_SYMBOL(update_records_ref_add_pack);
+
+/**
+ * Pack ref del update
+ *
+ * Pack ref del update into update records.
+ *
+ * \param[in] env execution environment
+ * \param[in] ops ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params ur_params in update records
+ * \param[in] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid FID of the object to delete reference
+ *
+ * \retval 0 if packing succeeds.
+ * \retval negative errno if packing fails.
+ */
+int update_records_ref_del_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid)
+{
+ return update_records_update_pack(env, fid, OUT_REF_DEL, ops, op_count,
+ max_ops_size, params, param_count,
+ max_param_size, 0, NULL, NULL);
+}
+EXPORT_SYMBOL(update_records_ref_del_pack);
+
+/**
+ * Pack object destroy update
+ *
+ * Pack object destroy update into update records.
+ *
+ * \param[in] env execution environment
+ * \param[in] ops ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params ur_params in update records
+ * \param[in|out] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid FID of the object to delete reference
+ *
+ * \retval 0 if packing succeeds.
+ * \retval negative errno if packing fails.
+ */
+int update_records_object_destroy_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid)
+{
+ return update_records_update_pack(env, fid, OUT_DESTROY, ops, op_count,
+ max_ops_size, params, param_count,
+ max_param_size, 0, NULL, NULL);
+}
+EXPORT_SYMBOL(update_records_object_destroy_pack);
+
+/**
+ * Pack index insert update
+ *
+ * Pack index insert update into update records.
+ *
+ * \param[in] env execution environment
+ * \param[in] ops ur_ops in update records
+ * \param[in] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params ur_params in update records
+ * \param[in] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid FID of the object to insert index
+ * \param[in] rec record of insertion
+ * \param[in] key key of insertion
+ *
+ * \retval 0 if packing succeeds.
+ * \retval negative errno if packing fails.
+ */
+int update_records_index_insert_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const struct dt_rec *rec,
+ const struct dt_key *key)
+{
+ struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec;
+ struct lu_fid rec_fid;
+ __u32 type = cpu_to_le32(rec1->rec_type);
+ size_t sizes[3] = { strlen((const char *)key) + 1,
+ sizeof(rec_fid),
+ sizeof(type) };
+ const void *bufs[3] = { key,
+ &rec_fid,
+ &type };
+
+ fid_cpu_to_le(&rec_fid, rec1->rec_fid);
+
+ return update_records_update_pack(env, fid, OUT_INDEX_INSERT, ops,
+ op_count, max_ops_size, params,
+ param_count, max_param_size,
+ 3, bufs, sizes);
+}
+EXPORT_SYMBOL(update_records_index_insert_pack);
+
+/**
+ * Pack index delete update
+ *
+ * Pack index delete update into update records.
+ *
+ * \param[in] env execution environment
+ * \param[in] ops ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params ur_params in update records
+ * \param[in|ount] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid FID of the object to delete index
+ * \param[in] key key of deletion
+ *
+ * \retval 0 if packing succeeds.
+ * \retval negative errno if packing fails.
+ */
+int update_records_index_delete_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const struct dt_key *key)
+{
+ size_t size = strlen((const char *)key) + 1;
+
+ return update_records_update_pack(env, fid, OUT_INDEX_DELETE, ops,
+ op_count, max_ops_size, params,
+ param_count, max_param_size,
+ 1, (const void **)&key, &size);
+}
+EXPORT_SYMBOL(update_records_index_delete_pack);
+
+/**
+ * Pack xattr set update
+ *
+ * Pack xattr set update into update records.
+ *
+ * \param[in] env execution environment
+ * \param[in] ops ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params ur_params in update records
+ * \param[in|out] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid FID of the object to set xattr
+ * \param[in] buf xattr to be set
+ * \param[in] name name of the xattr
+ * \param[in] flag flag for setting xattr
+ *
+ * \retval 0 if packing succeeds.
+ * \retval negative errno if packing fails.
+ */
+int update_records_xattr_set_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const struct lu_buf *buf, const char *name,
+ __u32 flag)
+{
+ size_t sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)};
+ const void *bufs[3] = {name, buf->lb_buf, &flag};
+
+ flag = cpu_to_le32(flag);
+
+ return update_records_update_pack(env, fid, OUT_XATTR_SET, ops,
+ op_count, max_ops_size, params,
+ param_count, max_param_size,
+ 3, bufs, sizes);
+}
+EXPORT_SYMBOL(update_records_xattr_set_pack);
+
+/**
+ * Pack xattr delete update
+ *
+ * Pack xattr delete update into update records.
+ *
+ * \param[in] env execution environment
+ * \param[in] ops ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params ur_params in update records
+ * \param[in|out] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid FID of the object to delete xattr
+ * \param[in] name name of the xattr
+ *
+ * \retval 0 if packing succeeds.
+ * \retval negative errno if packing fails.
+ */
+int update_records_xattr_del_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const char *name)
+{
+ size_t size = strlen(name) + 1;
+
+ return update_records_update_pack(env, fid, OUT_XATTR_DEL, ops,
+ op_count, max_ops_size, params,
+ param_count, max_param_size,
+ 1, (const void **)&name, &size);
+}
+EXPORT_SYMBOL(update_records_xattr_del_pack);
+
+/**
+ * Pack write update
+ *
+ * Pack write update into update records.
+ *
+ * \param[in] env execution environment
+ * \param[in] ops ur_ops in update records
+ * \param[in|out] op_count pointer to the count of ops
+ * \param[in|out] max_op_size maximum size of the update
+ * \param[in] params ur_params in update records
+ * \param[in|out] param_count pointer to the count of params
+ * \param[in|out] max_param_size maximum size of the parameter
+ * \param[in] fid FID of the object to write into
+ * \param[in] buf buffer to write which includes an embedded size field
+ * \param[in] pos offet in the object to start writing at
+ *
+ * \retval 0 if packing succeeds.
+ * \retval negative errno if packing fails.
+ */
+int update_records_write_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const struct lu_buf *buf,
+ __u64 pos)
+{
+ size_t sizes[2] = {buf->lb_len, sizeof(pos)};
+ const void *bufs[2] = {buf->lb_buf, &pos};
+
+ pos = cpu_to_le64(pos);
+
+ return update_records_update_pack(env, fid, OUT_XATTR_DEL, ops,
+ op_count, max_ops_size, params,
+ param_count, max_param_size,
+ 2, bufs, sizes);
+}
+EXPORT_SYMBOL(update_records_write_pack);
+
+/**
+ * Create update records in thandle_update_records
+ *
+ * Allocate update_records for thandle_update_records, the initial size
+ * will be 4KB.
+ *
+ * \param[in] tur thandle_update_records where update_records will be
+ * allocated
+ * \retval 0 if allocation succeeds.
+ * \retval negative errno if allocation fails.
+ */
+static int tur_update_records_create(struct thandle_update_records *tur)
+{
+ if (tur->tur_update_records != NULL)
+ return 0;
+
+ OBD_ALLOC_LARGE(tur->tur_update_records,
+ UPDATE_RECORDS_BUFFER_SIZE);
+
+ if (tur->tur_update_records == NULL)
+ return -ENOMEM;
+
+ tur->tur_update_records_buf_size = UPDATE_RECORDS_BUFFER_SIZE;
+
+ return 0;
+}
+
+/**
+ * Extend update records
+ *
+ * Extend update_records to the new size in thandle_update_records.
+ *
+ * \param[in] tur thandle_update_records where update_records will be
+ * extended.
+ * \retval 0 if extension succeeds.
+ * \retval negative errno if extension fails.
+ */
+int tur_update_records_extend(struct thandle_update_records *tur,
+ size_t new_size)
+{
+ struct llog_update_record *record;
+
+ OBD_ALLOC_LARGE(record, new_size);
+ if (record == NULL)
+ return -ENOMEM;
+
+ if (tur->tur_update_records != NULL) {
+ memcpy(record, tur->tur_update_records,
+ tur->tur_update_records_buf_size);
+ OBD_FREE_LARGE(tur->tur_update_records,
+ tur->tur_update_records_buf_size);
+ }
+
+ tur->tur_update_records = record;
+ tur->tur_update_records_buf_size = new_size;
+
+ return 0;
+}
+EXPORT_SYMBOL(tur_update_records_extend);
+
+/**
+ * Extend update records
+ *
+ * Extend update records in thandle to make sure it is able to hold
+ * the update with certain update_op and params size.
+ *
+ * \param [in] tur thandle_update_records to be extend
+ * \param [in] new_op_size update_op size of the update record
+ * \param [in] new_param_size params size of the update record
+ *
+ * \retval 0 if the update_records is being extended.
+ * \retval negative errno if the update_records is not being
+ * extended.
+ */
+int tur_update_extend(struct thandle_update_records *tur,
+ size_t new_op_size, size_t new_param_size)
+{
+ size_t record_size;
+ size_t params_size;
+ size_t extend_size;
+ int rc;
+ ENTRY;
+
+ record_size = llog_update_record_size(tur->tur_update_records);
+ /* extend update records buffer */
+ if (new_op_size > (tur->tur_update_records_buf_size - record_size -
+ sizeof(*tur->tur_update_records))) {
+ extend_size = round_up(new_op_size, UPDATE_RECORDS_BUFFER_SIZE);
+ rc = tur_update_records_extend(tur,
+ tur->tur_update_records_buf_size +
+ extend_size);
+ if (rc != 0)
+ RETURN(rc);
+ }
+
+ /* extend parameters buffer */
+ params_size = update_params_size(tur->tur_update_params,
+ tur->tur_update_param_count);
+ if (new_param_size > (tur->tur_update_params_buf_size -
+ params_size)) {
+ extend_size = round_up(new_param_size,
+ UPDATE_PARAMS_BUFFER_SIZE);
+ rc = tur_update_params_extend(tur,
+ tur->tur_update_params_buf_size +
+ extend_size);
+ if (rc != 0)
+ RETURN(rc);
+ }
+
+ RETURN(0);
+}
+EXPORT_SYMBOL(tur_update_extend);
+
+/**
+ * Create update params in thandle_update_records
+ *
+ * Allocate update_params for thandle_update_records, the initial size
+ * will be 4KB.
+ *
+ * \param[in] tur thandle_update_records where update_params will be
+ * allocated
+ * \retval 0 if allocation succeeds.
+ * \retval negative errno if allocation fails.
+ */
+static int tur_update_params_create(struct thandle_update_records *tur)
+{
+ if (tur->tur_update_params != NULL)
+ return 0;
+
+ OBD_ALLOC_LARGE(tur->tur_update_params, UPDATE_PARAMS_BUFFER_SIZE);
+ if (tur->tur_update_params == NULL)
+ return -ENOMEM;
+
+ tur->tur_update_params_buf_size = UPDATE_PARAMS_BUFFER_SIZE;
+ return 0;
+}
+
+/**
+ * Extend update params
+ *
+ * Extend update_params to the new size in thandle_update_records.
+ *
+ * \param[in] tur thandle_update_records where update_params will be
+ * extended.
+ * \retval 0 if extension succeeds.
+ * \retval negative errno if extension fails.
+ */
+int tur_update_params_extend(struct thandle_update_records *tur,
+ size_t new_size)
+{
+ struct update_params *params;
+
+ OBD_ALLOC_LARGE(params, new_size);
+ if (params == NULL)
+ return -ENOMEM;
+
+ if (tur->tur_update_params != NULL) {
+ memcpy(params, tur->tur_update_params,
+ tur->tur_update_params_buf_size);
+ OBD_FREE_LARGE(tur->tur_update_params,
+ tur->tur_update_params_buf_size);
+ }
+
+ tur->tur_update_params = params;
+ tur->tur_update_params_buf_size = new_size;
+
+ return 0;
+}
+EXPORT_SYMBOL(tur_update_params_extend);
+
+/**
+ * Check and prepare whether it needs to record update.
+ *
+ * Checks if the transaction needs to record updates, and if it
+ * does, then initialize the update record buffer in the transaction.
+ *
+ * \param[in] env execution environment
+ * \param[in] th transaction handle
+ *
+ * \retval 0 if updates recording succeeds.
+ * \retval negative errno if updates recording fails.
+ */
+int check_and_prepare_update_record(const struct lu_env *env,
+ struct thandle *th)
+{
+ struct thandle_update_records *tur;
+ struct llog_update_record *lur;
+ struct top_thandle *top_th;
+ struct sub_thandle *lst;
+ int rc;
+ bool record_update = false;
+ ENTRY;
+
+ top_th = container_of(th, struct top_thandle, tt_super);
+ /* Check if it needs to record updates for this transaction */
+ list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) {
+ if (lst->st_record_update) {
+ record_update = true;
+ break;
+ }
+ }
+ if (!record_update)
+ RETURN(0);
+
+ if (top_th->tt_update_records != NULL)
+ RETURN(0);
+
+ tur = &update_env_info(env)->uti_tur;
+ if (tur->tur_update_records == NULL) {
+ rc = tur_update_records_create(tur);
+ if (rc < 0)
+ RETURN(rc);
+ }
+
+ if (tur->tur_update_params == NULL) {
+ rc = tur_update_params_create(tur);
+ if (rc < 0)
+ RETURN(rc);
+ }
+
+ lur = tur->tur_update_records;
+ lur->lur_update_rec.ur_update_count = 0;
+ lur->lur_update_rec.ur_param_count = 0;
+ lur->lur_update_rec.ur_master_transno = 0;
+ lur->lur_update_rec.ur_batchid = 0;
+ lur->lur_update_rec.ur_flags = 0;
+
+ tur->tur_update_param_count = 0;
+
+ top_th->tt_update_records = tur;
+
+ RETURN(0);
+}
+EXPORT_SYMBOL(check_and_prepare_update_record);
+
+/**
+ * Merge params into the update records
+ *
+ * Merge params and ops into the update records attached to top th.
+ * During transaction execution phase, parameters and update ops
+ * are collected in two different buffers (see lod_updates_pack()),
+ * then in transaction stop, it needs to be merged them in one
+ * buffer, then being written into the update log.
+ *
+ * \param[in] env execution environment
+ * \param[in] tur thandle update records whose updates and
+ * parameters are merged
+ *
+ * \retval 0 if merging succeeds.
+ * \retval negaitive errno if merging fails.
+ */
+int merge_params_updates_buf(const struct lu_env *env,
+ struct thandle_update_records *tur)
+{
+ struct llog_update_record *lur;
+ struct update_params *params;
+ size_t params_size;
+ size_t record_size;
+
+ if (tur->tur_update_records == NULL ||
+ tur->tur_update_params == NULL)
+ return 0;
+
+ lur = tur->tur_update_records;
+ /* Extends the update records buffer if needed */
+ params_size = update_params_size(tur->tur_update_params,
+ tur->tur_update_param_count);
+ record_size = llog_update_record_size(lur);
+ if (record_size + params_size > tur->tur_update_records_buf_size) {
+ int rc;
+
+ rc = tur_update_records_extend(tur, record_size + params_size);
+ if (rc < 0)
+ return rc;
+ lur = tur->tur_update_records;
+ }
+
+ params = update_records_get_params(&lur->lur_update_rec);
+ memcpy(params, tur->tur_update_params, params_size);
+ lur->lur_update_rec.ur_param_count = tur->tur_update_param_count;
+ return 0;
+}
+EXPORT_SYMBOL(merge_params_updates_buf);
+
+static void update_key_fini(const struct lu_context *ctx,
+ struct lu_context_key *key, void *data)
+{
+ struct update_thread_info *info = data;
+
+ if (info->uti_tur.tur_update_records != NULL)
+ OBD_FREE_LARGE(info->uti_tur.tur_update_records,
+ info->uti_tur.tur_update_records_buf_size);
+ if (info->uti_tur.tur_update_params != NULL)
+ OBD_FREE_LARGE(info->uti_tur.tur_update_params,
+ info->uti_tur.tur_update_params_buf_size);
+
+ OBD_FREE_PTR(info);
+}
+
+/* context key constructor/destructor: update_key_init, update_key_fini */
+LU_KEY_INIT(update, struct update_thread_info);
+/* context key: update_thread_key */
+LU_CONTEXT_KEY_DEFINE(update, LCT_MD_THREAD | LCT_MG_THREAD |
+ LCT_DT_THREAD | LCT_TX_HANDLE | LCT_LOCAL);
+EXPORT_SYMBOL(update_thread_key);
+LU_KEY_INIT_GENERIC(update);
+
+void update_info_init(void)
+{
+ update_key_init_generic(&update_thread_key, NULL);
+ lu_context_key_register(&update_thread_key);
+}
+
+void update_info_fini(void)
+{
+ lu_context_key_degister(&update_thread_key);
+}
#include <lustre_update.h>
#include <obd.h>
#include <obd_class.h>
+#include <tgt_internal.h>
/**
* Create the top transaction.
top_th->tt_magic = TOP_THANDLE_MAGIC;
top_th->tt_master_sub_thandle = child_th;
child_th->th_top = &top_th->tt_super;
- INIT_LIST_HEAD(&top_th->tt_sub_thandle_list);
+
+ top_th->tt_update_records = NULL;
top_th->tt_super.th_top = &top_th->tt_super;
+ INIT_LIST_HEAD(&top_th->tt_sub_thandle_list);
return &top_th->tt_super;
}
struct top_thandle *top_th = container_of(th, struct top_thandle,
tt_super);
struct sub_thandle *lst;
- int rc = 0;
+ int rc;
+
+ rc = check_and_prepare_update_record(env, th);
+ if (rc < 0)
+ return rc;
LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC);
list_for_each_entry(lst, &top_th->tt_sub_thandle_list, st_sub_list) {
struct sub_thandle *lst;
struct top_thandle *top_th = container_of(th, struct top_thandle,
tt_super);
+ struct thandle_update_records *tur = top_th->tt_update_records;
int rc;
ENTRY;
/* Note: we always need walk through all of sub_transaction to do
* transaction stop to release the resource here */
+ if (tur != NULL) {
+ rc = merge_params_updates_buf(env, tur);
+ if (rc == 0) {
+ struct update_records *record;
+
+ record = &tur->tur_update_records->lur_update_rec;
+ update_records_dump(record, D_INFO, false);
+ }
+ }
+
LASSERT(top_th->tt_magic == TOP_THANDLE_MAGIC);
top_th->tt_master_sub_thandle->th_local = th->th_local;
INIT_LIST_HEAD(&lst->st_sub_list);
lst->st_sub_th = sub_th;
list_add(&lst->st_sub_list, &top_th->tt_sub_thandle_list);
+ lst->st_record_update = 1;
RETURN(sub_th);
}