* GPL HEADER END
*/
/*
- * Copyright (c) 2013, 2014, Intel Corporation.
+ * Copyright (c) 2013, 2016, Intel Corporation.
*/
/*
* lustre/include/lustre_update.h
#ifndef _LUSTRE_UPDATE_H
#define _LUSTRE_UPDATE_H
-#include <lustre_net.h>
#include <dt_object.h>
+#include <lustre_net.h>
+#include <obj_update.h>
#define OUT_UPDATE_INIT_BUFFER_SIZE 4096
-#define OUT_UPDATE_REPLY_SIZE 8192
+#define OUT_UPDATE_REPLY_SIZE 4096
+#define OUT_BULK_BUFFER_SIZE 4096
struct dt_key;
struct dt_rec;
struct object_update_param;
-
-struct update_buffer {
- struct object_update_request *ub_req;
- size_t ub_req_size;
-};
-
-/**
- * Tracking the updates being executed on this dt_device.
- */
-struct dt_update_request {
- struct dt_device *dur_dt;
- /* attached itself to thandle */
- int dur_flags;
- /* update request result */
- int dur_rc;
- /* Current batch(transaction) id */
- __u64 dur_batchid;
- /* Holding object updates */
- struct update_buffer dur_buf;
- struct list_head dur_cb_items;
-};
-
-struct update_params {
- struct object_update_param up_params[0];
-};
+struct llog_update_record;
static inline size_t update_params_size(const struct update_params *params,
unsigned int param_count)
return param;
}
-struct update_op {
- struct lu_fid uop_fid;
- __u16 uop_type;
- __u16 uop_param_count;
- __u16 uop_params_off[0];
-};
+static inline void*
+update_params_get_param_buf(const struct update_params *params, __u16 index,
+ unsigned int param_count, __u16 *size)
+{
+ struct object_update_param *param;
+
+ param = update_params_get_param(params, (unsigned int)index,
+ param_count);
+ if (param == NULL)
+ return NULL;
+
+ if (size != NULL)
+ *size = param->oup_len;
+
+ return param->oup_buf;
+}
static inline size_t
update_op_size(unsigned int param_count)
update_op_size(uop->uop_param_count));
}
-/* All of updates in the mulitple_update_record */
-struct update_ops {
- struct update_op uops_op[0];
-};
-
static inline size_t update_ops_size(const struct update_ops *ops,
unsigned int update_count)
{
return total_size;
}
-/*
- * This is the update record format used to store the updates in
- * disk. All updates of the operation will be stored in ur_ops.
- * All of parameters for updates of the operation will be stored
- * in ur_params.
- * To save the space of the record, parameters in ur_ops will only
- * remember their offset in ur_params, so to avoid storing duplicate
- * parameters in ur_params, which can help us save a lot space for
- * operation like creating striped directory.
- */
-struct update_records {
- __u64 ur_master_transno;
- __u64 ur_batchid;
- __u32 ur_flags;
- __u32 ur_param_count;
- __u32 ur_update_count;
- struct update_ops ur_ops;
- /* Note ur_ops has a variable size, so comment out
- * the following ur_params, in case some use it directly
- * update_records->ur_params
- *
- * struct update_params ur_params;
- */
-};
-
-struct llog_update_record {
- struct llog_rec_hdr lur_hdr;
- struct update_records lur_update_rec;
- /* Note ur_update_rec has a variable size, so comment out
- * the following ur_tail, in case someone use it directly
- *
- * struct llog_rec_tail lur_tail;
- */
-};
-
static inline struct update_params *
update_records_get_params(const struct update_records *record)
{
update_ops_size(&record->ur_ops, record->ur_update_count));
}
+static inline struct update_param *
+update_param_next_param(const struct update_param *param)
+{
+ return (struct update_param *)((char *)param +
+ object_update_param_size(
+ (struct object_update_param *)param));
+}
+
+static inline size_t
+__update_records_size(size_t raw_size)
+{
+ return cfs_size_round(offsetof(struct update_records, ur_ops) +
+ raw_size);
+}
+
static inline size_t
update_records_size(const struct update_records *record)
{
- struct update_params *params;
+ size_t op_size = 0;
+ size_t param_size = 0;
- params = update_records_get_params(record);
+ if (record->ur_update_count > 0)
+ op_size = update_ops_size(&record->ur_ops,
+ record->ur_update_count);
+ if (record->ur_param_count > 0) {
+ struct update_params *params;
- return cfs_size_round(offsetof(struct update_records, ur_ops) +
- update_ops_size(&record->ur_ops, record->ur_update_count) +
- update_params_size(params, record->ur_param_count));
+ params = update_records_get_params(record);
+ param_size = update_params_size(params, record->ur_param_count);
+ }
+
+ return __update_records_size(op_size + param_size);
}
static inline size_t
-llog_update_record_size(const struct llog_update_record *lur)
+__llog_update_record_size(size_t records_size)
{
- return cfs_size_round(sizeof(lur->lur_hdr) +
- update_records_size(&lur->lur_update_rec) +
+ return cfs_size_round(sizeof(struct llog_rec_hdr) + records_size +
sizeof(struct llog_rec_tail));
}
+static inline size_t
+llog_update_record_size(const struct llog_update_record *lur)
+{
+ return __llog_update_record_size(
+ update_records_size(&lur->lur_update_rec));
+}
+
static inline struct update_op *
update_ops_get_op(const struct update_ops *ops, unsigned int index,
unsigned int update_count)
*size = param->oup_len;
if (param->oup_len == 0)
- return NULL;
+ return ERR_PTR(-ENODATA);
return (void *)¶m->oup_buf[0];
}
}
static inline void
-object_update_reply_init(struct object_update_reply *reply, size_t count)
-{
- reply->ourp_magic = UPDATE_REPLY_MAGIC;
- reply->ourp_count = count;
-}
-
-static inline void
object_update_result_insert(struct object_update_reply *reply,
void *data, size_t data_len, size_t index,
int rc)
char *ptr;
update_result = object_update_result_get(reply, index, NULL);
- LASSERT(update_result != NULL);
+ LASSERT(update_result);
update_result->our_rc = ptlrpc_status_hton(rc);
- if (data_len > 0) {
- LASSERT(data != NULL);
- ptr = (char *)update_result +
+ if (rc >= 0) {
+ if (data_len > 0) {
+ LASSERT(data);
+
+ ptr = (char *)update_result +
cfs_size_round(sizeof(struct object_update_reply));
+ memcpy(ptr, data, data_len);
+ }
update_result->our_datalen = data_len;
- memcpy(ptr, data, data_len);
}
reply->ourp_lens[index] = cfs_size_round(data_len +
lbuf->lb_buf = update_result->our_data;
lbuf->lb_len = update_result->our_datalen;
- return 0;
+ return result;
}
/**
* distribution.
*/
struct thandle_update_records {
- /* All of updates for the cross-MDT operation. */
+ /* All of updates for the cross-MDT operation, vmalloc'd. */
struct llog_update_record *tur_update_records;
size_t tur_update_records_buf_size;
- /* All of parameters for the cross-MDT operation */
+ /* All of parameters for the cross-MDT operation, vmalloc'd */
struct update_params *tur_update_params;
unsigned int tur_update_param_count;
size_t tur_update_params_buf_size;
};
#define TOP_THANDLE_MAGIC 0x20140917
+struct top_multiple_thandle {
+ struct dt_device *tmt_master_sub_dt;
+ atomic_t tmt_refcount;
+ /* Other sub transactions will be listed here. */
+ struct list_head tmt_sub_thandle_list;
+ spinlock_t tmt_sub_lock;
+
+ struct list_head tmt_commit_list;
+ /* All of update records will packed here */
+ struct thandle_update_records *tmt_update_records;
+
+ wait_queue_head_t tmt_stop_waitq;
+ __u64 tmt_batchid;
+ int tmt_result;
+ __u32 tmt_magic;
+ size_t tmt_record_size;
+ __u32 tmt_committed:1;
+};
+
/* {top,sub}_thandle are used to manage distributed transactions which
* include updates on several nodes. A top_handle represents the
* whole operation, and sub_thandle represents updates on each node. */
struct top_thandle {
struct thandle tt_super;
- __u32 tt_magic;
- atomic_t tt_refcount;
/* The master sub transaction. */
struct thandle *tt_master_sub_thandle;
- /* Other sub thandle will be listed here. */
- struct list_head tt_sub_thandle_list;
+ struct top_multiple_thandle *tt_multiple_thandle;
+};
- /* All of update records will packed here */
- struct thandle_update_records *tt_update_records;
+struct sub_thandle_cookie {
+ struct llog_cookie stc_cookie;
+ struct list_head stc_list;
};
+/* Sub thandle is used to track multiple sub thandles under one parent
+ * thandle */
struct sub_thandle {
- /* point to the osd/osp_thandle */
struct thandle *st_sub_th;
+ struct dt_device *st_dt;
+ struct list_head st_cookie_list;
+ struct dt_txn_commit_cb st_commit_dcb;
+ struct dt_txn_commit_cb st_stop_dcb;
+ int st_result;
/* linked to top_thandle */
struct list_head st_sub_list;
/* If this sub thandle is committed */
bool st_committed:1,
- st_record_update:1;
+ st_stopped:1,
+ st_started:1;
};
+struct tx_arg;
+typedef int (*tx_exec_func_t)(const struct lu_env *env, struct thandle *th,
+ struct tx_arg *ta);
+
+/* Structure for holding one update execution */
+struct tx_arg {
+ tx_exec_func_t exec_fn;
+ tx_exec_func_t undo_fn;
+ struct dt_object *object;
+ const char *file;
+ struct object_update_reply *reply;
+ int line;
+ int index;
+ union {
+ struct {
+ struct dt_insert_rec rec;
+ const struct dt_key *key;
+ } insert;
+ struct {
+ } ref;
+ struct {
+ struct lu_attr attr;
+ } attr_set;
+ struct {
+ struct lu_buf buf;
+ const char *name;
+ int flags;
+ __u32 csum;
+ } xattr_set;
+ struct {
+ struct lu_attr attr;
+ struct dt_allocation_hint hint;
+ struct dt_object_format dof;
+ struct lu_fid fid;
+ } create;
+ struct {
+ struct lu_buf buf;
+ loff_t pos;
+ } write;
+ struct {
+ struct ost_body *body;
+ } destroy;
+ } u;
+};
+
+/* Structure for holding all update executations of one transaction */
+struct thandle_exec_args {
+ struct thandle *ta_handle;
+ int ta_argno; /* used args */
+ int ta_alloc_args; /* allocated args count */
+ struct tx_arg **ta_args;
+};
/* target/out_lib.c */
int out_update_pack(const struct lu_env *env, struct object_update *update,
- size_t max_update_size, enum update_type op,
+ size_t *max_update_size, enum update_type op,
const struct lu_fid *fid, unsigned int params_count,
- __u16 *param_sizes, const void **param_bufs);
+ __u16 *param_sizes, const void **param_bufs,
+ __u32 reply_size);
int out_create_pack(const struct lu_env *env, struct object_update *update,
- size_t max_update_size, const struct lu_fid *fid,
+ size_t *max_update_size, const struct lu_fid *fid,
const struct lu_attr *attr, struct dt_allocation_hint *hint,
struct dt_object_format *dof);
-int out_object_destroy_pack(const struct lu_env *env,
- struct object_update *update,
- size_t max_update_size,
- const struct lu_fid *fid);
+int out_destroy_pack(const struct lu_env *env, struct object_update *update,
+ size_t *max_update_size, const struct lu_fid *fid);
int out_index_delete_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const struct dt_key *key);
int out_index_insert_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const struct dt_rec *rec,
const struct dt_key *key);
int out_xattr_set_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const struct lu_buf *buf,
const char *name, __u32 flag);
int out_xattr_del_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const char *name);
int out_attr_set_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const struct lu_attr *attr);
int out_ref_add_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid);
int out_ref_del_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid);
int out_write_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const struct lu_buf *buf,
__u64 pos);
int out_attr_get_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid);
int out_index_lookup_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, struct dt_rec *rec,
const struct dt_key *key);
int out_xattr_get_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
- const struct lu_fid *fid, const char *name);
+ struct object_update *update, size_t *max_update_size,
+ const struct lu_fid *fid, const char *name,
+ const int bufsize);
+int out_read_pack(const struct lu_env *env, struct object_update *update,
+ size_t *max_update_length, const struct lu_fid *fid,
+ size_t size, loff_t pos);
const char *update_op_str(__u16 opcode);
struct thandle *
top_trans_create(const struct lu_env *env, struct dt_device *master_dev);
-
int top_trans_start(const struct lu_env *env, struct dt_device *master_dev,
struct thandle *th);
-
int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
struct thandle *th);
+void top_multiple_thandle_destroy(struct top_multiple_thandle *tmt);
+
+static inline void top_multiple_thandle_get(struct top_multiple_thandle *tmt)
+{
+ atomic_inc(&tmt->tmt_refcount);
+}
+
+static inline void top_multiple_thandle_put(struct top_multiple_thandle *tmt)
+{
+ if (atomic_dec_and_test(&tmt->tmt_refcount))
+ top_multiple_thandle_destroy(tmt);
+}
-void top_thandle_destroy(struct top_thandle *top_th);
+struct sub_thandle *lookup_sub_thandle(struct top_multiple_thandle *tmt,
+ struct dt_device *dt_dev);
+int sub_thandle_trans_create(const struct lu_env *env,
+ struct top_thandle *top_th,
+ struct sub_thandle *st);
/* update_records.c */
+size_t update_records_create_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_attr *attr,
+ const struct dt_allocation_hint *hint,
+ struct dt_object_format *dof);
+size_t update_records_attr_set_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_attr *attr);
+size_t update_records_ref_add_size(const struct lu_env *env,
+ const struct lu_fid *fid);
+size_t update_records_ref_del_size(const struct lu_env *env,
+ const struct lu_fid *fid);
+size_t update_records_destroy_size(const struct lu_env *env,
+ const struct lu_fid *fid);
+size_t update_records_index_insert_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct dt_rec *rec,
+ const struct dt_key *key);
+size_t update_records_index_delete_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct dt_key *key);
+size_t update_records_xattr_set_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_buf *buf,
+ const char *name,
+ __u32 flag);
+size_t update_records_xattr_del_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const char *name);
+size_t update_records_write_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_buf *buf,
+ __u64 pos);
+size_t update_records_punch_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ __u64 start, __u64 end);
+
int update_records_create_pack(const struct lu_env *env,
struct update_ops *ops,
unsigned int *op_count,
unsigned int *param_count,
size_t *max_param_size,
const struct lu_fid *fid);
-int update_records_object_destroy_pack(const struct lu_env *env,
- struct update_ops *ops,
- unsigned int *op_count,
- size_t *max_ops_size,
- struct update_params *params,
- unsigned int *param_count,
- size_t *max_param_size,
- const struct lu_fid *fid);
+int update_records_destroy_pack(const struct lu_env *env,
+ struct update_ops *ops, unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid);
int update_records_index_insert_pack(const struct lu_env *env,
struct update_ops *ops,
unsigned int *op_count,
size_t *max_param_size,
const struct lu_fid *fid,
__u64 start, __u64 end);
+int update_records_noop_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid);
int tur_update_records_extend(struct thandle_update_records *tur,
size_t new_size);
int tur_update_params_extend(struct thandle_update_records *tur,
size_t new_size);
-int check_and_prepare_update_record(const struct lu_env *env,
- struct thandle *th);
-int merge_params_updates_buf(const struct lu_env *env,
- struct thandle_update_records *tur);
int tur_update_extend(struct thandle_update_records *tur,
size_t new_op_size, size_t new_param_size);
#define update_record_pack(name, th, ...) \
({ \
struct top_thandle *top_th; \
+ struct top_multiple_thandle *tmt; \
struct thandle_update_records *tur; \
struct llog_update_record *lur; \
size_t avail_param_size; \
\
while (1) { \
top_th = container_of(th, struct top_thandle, tt_super);\
- tur = top_th->tt_update_records; \
+ tmt = top_th->tt_multiple_thandle; \
+ tur = tmt->tmt_update_records; \
lur = tur->tur_update_records; \
avail_param_size = tur->tur_update_params_buf_size - \
update_params_size(tur->tur_update_params, \
} \
ret; \
})
+
+#define update_record_size(env, name, th, ...) \
+({ \
+ struct top_thandle *top_th; \
+ struct top_multiple_thandle *tmt; \
+ \
+ top_th = container_of(th, struct top_thandle, tt_super); \
+ \
+ LASSERT(top_th->tt_multiple_thandle != NULL); \
+ tmt = top_th->tt_multiple_thandle; \
+ tmt->tmt_record_size += \
+ update_records_##name##_size(env, __VA_ARGS__); \
+})
#endif