* GPL HEADER END
*/
/*
- * Copyright (c) 2013, Intel Corporation.
+ * Copyright (c) 2013, 2016, Intel Corporation.
*/
/*
* lustre/include/lustre_update.h
#ifndef _LUSTRE_UPDATE_H
#define _LUSTRE_UPDATE_H
+#include <dt_object.h>
#include <lustre_net.h>
+#include <obj_update.h>
-#define OUT_UPDATE_INIT_BUFFER_SIZE 8192
-#define OUT_UPDATE_REPLY_SIZE 8192
-struct dt_update_request {
- struct dt_device *dur_dt;
- /* attached itself to thandle */
- struct list_head dur_list;
- int dur_flags;
- /* update request result */
- int dur_rc;
- /* Current batch(transaction) id */
- __u64 dur_batchid;
- /* Holding the update req */
- struct object_update_request *dur_req;
- int dur_req_len;
- struct list_head dur_cb_items;
-};
+#define OUT_UPDATE_INIT_BUFFER_SIZE 4096
+#define OUT_UPDATE_REPLY_SIZE 4096
+#define OUT_BULK_BUFFER_SIZE 4096
-static inline unsigned long
-object_update_param_size(const struct object_update_param *param)
+struct dt_key;
+struct dt_rec;
+struct object_update_param;
+struct llog_update_record;
+
+static inline size_t update_params_size(const struct update_params *params,
+ unsigned int param_count)
{
- return cfs_size_round(sizeof(*param) + param->oup_len);
+ struct object_update_param *param;
+ size_t total_size = sizeof(*params);
+ unsigned int i;
+
+ param = (struct object_update_param *)¶ms->up_params[0];
+ for (i = 0; i < param_count; i++) {
+ size_t size = object_update_param_size(param);
+
+ param = (struct object_update_param *)((char *)param + size);
+ total_size += size;
+ }
+
+ return total_size;
}
-static inline unsigned long
-object_update_size(const struct object_update *update)
+static inline struct object_update_param *
+update_params_get_param(const struct update_params *params,
+ unsigned int index, unsigned int param_count)
{
- const struct object_update_param *param;
- unsigned long size;
- size_t i;
+ struct object_update_param *param;
+ unsigned int i;
+
+ if (index > param_count)
+ return NULL;
+
+ param = (struct object_update_param *)¶ms->up_params[0];
+ for (i = 0; i < index; i++)
+ param = (struct object_update_param *)((char *)param +
+ object_update_param_size(param));
+
+ return param;
+}
+
+static inline void*
+update_params_get_param_buf(const struct update_params *params, __u16 index,
+ unsigned int param_count, __u16 *size)
+{
+ struct object_update_param *param;
+
+ param = update_params_get_param(params, (unsigned int)index,
+ param_count);
+ if (param == NULL)
+ return NULL;
- size = offsetof(struct object_update, ou_params[0]);
- for (i = 0; i < update->ou_params_count; i++) {
- param = (struct object_update_param *)((char *)update + size);
- size += object_update_param_size(param);
+ if (size != NULL)
+ *size = param->oup_len;
+
+ return param->oup_buf;
+}
+
+static inline size_t
+update_op_size(unsigned int param_count)
+{
+ return offsetof(struct update_op, uop_params_off[param_count]);
+}
+
+static inline struct update_op *
+update_op_next_op(const struct update_op *uop)
+{
+ return (struct update_op *)((char *)uop +
+ update_op_size(uop->uop_param_count));
+}
+
+static inline size_t update_ops_size(const struct update_ops *ops,
+ unsigned int update_count)
+{
+ struct update_op *op;
+ size_t total_size = sizeof(*ops);
+ unsigned int i;
+
+ op = (struct update_op *)&ops->uops_op[0];
+ for (i = 0; i < update_count; i++, op = update_op_next_op(op))
+ total_size += update_op_size(op->uop_param_count);
+
+ return total_size;
+}
+
+static inline struct update_params *
+update_records_get_params(const struct update_records *record)
+{
+ return (struct update_params *)((char *)record +
+ offsetof(struct update_records, ur_ops) +
+ update_ops_size(&record->ur_ops, record->ur_update_count));
+}
+
+static inline struct update_param *
+update_param_next_param(const struct update_param *param)
+{
+ return (struct update_param *)((char *)param +
+ object_update_param_size(
+ (struct object_update_param *)param));
+}
+
+static inline size_t
+__update_records_size(size_t raw_size)
+{
+ return cfs_size_round(offsetof(struct update_records, ur_ops) +
+ raw_size);
+}
+
+static inline size_t
+update_records_size(const struct update_records *record)
+{
+ size_t op_size = 0;
+ size_t param_size = 0;
+
+ if (record->ur_update_count > 0)
+ op_size = update_ops_size(&record->ur_ops,
+ record->ur_update_count);
+ if (record->ur_param_count > 0) {
+ struct update_params *params;
+
+ params = update_records_get_params(record);
+ param_size = update_params_size(params, record->ur_param_count);
}
- return size;
+ return __update_records_size(op_size + param_size);
+}
+
+static inline size_t
+__llog_update_record_size(size_t records_size)
+{
+ return cfs_size_round(sizeof(struct llog_rec_hdr) + records_size +
+ sizeof(struct llog_rec_tail));
+}
+
+static inline size_t
+llog_update_record_size(const struct llog_update_record *lur)
+{
+ return __llog_update_record_size(
+ update_records_size(&lur->lur_update_rec));
+}
+
+static inline struct update_op *
+update_ops_get_op(const struct update_ops *ops, unsigned int index,
+ unsigned int update_count)
+{
+ struct update_op *op;
+ unsigned int i;
+
+ if (index > update_count)
+ return NULL;
+
+ op = (struct update_op *)&ops->uops_op[0];
+ for (i = 0; i < index; i++)
+ op = update_op_next_op(op);
+
+ return op;
}
static inline void
size_t i;
if (index >= update->ou_params_count)
- return NULL;
+ return ERR_PTR(-EINVAL);
param = &update->ou_params[0];
for (i = 0; i < index; i++)
*size = param->oup_len;
if (param->oup_len == 0)
- return NULL;
+ return ERR_PTR(-ENODATA);
return (void *)¶m->oup_buf[0];
}
return size;
}
-static inline struct object_update
-*object_update_request_get(const struct object_update_request *our,
- size_t index, size_t *size)
-{
- void *ptr;
- size_t i;
-
- if (index >= our->ourq_count)
- return NULL;
-
- ptr = (char *)our + offsetof(struct object_update_request,
- ourq_updates[0]);
- for (i = 0; i < index; i++)
- ptr += object_update_size((struct object_update *)ptr);
-
- if (size != NULL)
- *size = object_update_size((struct object_update *)ptr);
-
- return ptr;
-}
-
-static inline void
-object_update_reply_init(struct object_update_reply *reply, size_t count)
-{
- reply->ourp_magic = UPDATE_REPLY_MAGIC;
- reply->ourp_count = count;
-}
-
-static inline struct object_update_result
-*object_update_result_get(const struct object_update_reply *reply,
- size_t index, size_t *size)
-{
- char *ptr;
- size_t count = reply->ourp_count;
- size_t i;
-
- if (index >= count)
- return NULL;
-
- ptr = (char *)reply +
- cfs_size_round(offsetof(struct object_update_reply,
- ourp_lens[count]));
- for (i = 0; i < index; i++) {
- LASSERT(reply->ourp_lens[i] > 0);
- ptr += cfs_size_round(reply->ourp_lens[i]);
- }
-
- if (size != NULL)
- *size = reply->ourp_lens[index];
-
- return (struct object_update_result *)ptr;
-}
-
static inline void
object_update_result_insert(struct object_update_reply *reply,
void *data, size_t data_len, size_t index,
char *ptr;
update_result = object_update_result_get(reply, index, NULL);
- LASSERT(update_result != NULL);
+ LASSERT(update_result);
update_result->our_rc = ptlrpc_status_hton(rc);
- if (data_len > 0) {
- LASSERT(data != NULL);
- ptr = (char *)update_result +
+ if (rc >= 0) {
+ if (data_len > 0) {
+ LASSERT(data);
+
+ ptr = (char *)update_result +
cfs_size_round(sizeof(struct object_update_reply));
+ memcpy(ptr, data, data_len);
+ }
update_result->our_datalen = data_len;
- memcpy(ptr, data, data_len);
}
reply->ourp_lens[index] = cfs_size_round(data_len +
lbuf->lb_buf = update_result->our_data;
lbuf->lb_len = update_result->our_datalen;
- return 0;
+ return result;
+}
+
+/**
+ * Attached in the thandle to record the updates for distribute
+ * distribution.
+ */
+struct thandle_update_records {
+ /* All of updates for the cross-MDT operation, vmalloc'd. */
+ struct llog_update_record *tur_update_records;
+ size_t tur_update_records_buf_size;
+
+ /* All of parameters for the cross-MDT operation, vmalloc'd */
+ struct update_params *tur_update_params;
+ unsigned int tur_update_param_count;
+ size_t tur_update_params_buf_size;
+};
+
+#define TOP_THANDLE_MAGIC 0x20140917
+struct top_multiple_thandle {
+ struct dt_device *tmt_master_sub_dt;
+ atomic_t tmt_refcount;
+ /* Other sub transactions will be listed here. */
+ struct list_head tmt_sub_thandle_list;
+ spinlock_t tmt_sub_lock;
+
+ struct list_head tmt_commit_list;
+ /* All of update records will packed here */
+ struct thandle_update_records *tmt_update_records;
+
+ wait_queue_head_t tmt_stop_waitq;
+ __u64 tmt_batchid;
+ int tmt_result;
+ __u32 tmt_magic;
+ size_t tmt_record_size;
+ __u32 tmt_committed:1;
+};
+
+/* {top,sub}_thandle are used to manage distributed transactions which
+ * include updates on several nodes. A top_handle represents the
+ * whole operation, and sub_thandle represents updates on each node. */
+struct top_thandle {
+ struct thandle tt_super;
+ /* The master sub transaction. */
+ struct thandle *tt_master_sub_thandle;
+
+ struct top_multiple_thandle *tt_multiple_thandle;
+};
+
+struct sub_thandle_cookie {
+ struct llog_cookie stc_cookie;
+ struct list_head stc_list;
+};
+
+/* Sub thandle is used to track multiple sub thandles under one parent
+ * thandle */
+struct sub_thandle {
+ struct thandle *st_sub_th;
+ struct dt_device *st_dt;
+ struct list_head st_cookie_list;
+ struct dt_txn_commit_cb st_commit_dcb;
+ struct dt_txn_commit_cb st_stop_dcb;
+ int st_result;
+
+ /* linked to top_thandle */
+ struct list_head st_sub_list;
+
+ /* If this sub thandle is committed */
+ bool st_committed:1,
+ st_stopped:1,
+ st_started:1;
+};
+
+struct tx_arg;
+typedef int (*tx_exec_func_t)(const struct lu_env *env, struct thandle *th,
+ struct tx_arg *ta);
+
+/* Structure for holding one update execution */
+struct tx_arg {
+ tx_exec_func_t exec_fn;
+ tx_exec_func_t undo_fn;
+ struct dt_object *object;
+ const char *file;
+ struct object_update_reply *reply;
+ int line;
+ int index;
+ union {
+ struct {
+ struct dt_insert_rec rec;
+ const struct dt_key *key;
+ } insert;
+ struct {
+ } ref;
+ struct {
+ struct lu_attr attr;
+ } attr_set;
+ struct {
+ struct lu_buf buf;
+ const char *name;
+ int flags;
+ __u32 csum;
+ } xattr_set;
+ struct {
+ struct lu_attr attr;
+ struct dt_allocation_hint hint;
+ struct dt_object_format dof;
+ struct lu_fid fid;
+ } create;
+ struct {
+ struct lu_buf buf;
+ loff_t pos;
+ } write;
+ struct {
+ struct ost_body *body;
+ } destroy;
+ } u;
+};
+
+/* Structure for holding all update executations of one transaction */
+struct thandle_exec_args {
+ struct thandle *ta_handle;
+ int ta_argno; /* used args */
+ int ta_alloc_args; /* allocated args count */
+ struct tx_arg **ta_args;
+};
+
+/* target/out_lib.c */
+int out_update_pack(const struct lu_env *env, struct object_update *update,
+ size_t *max_update_size, enum update_type op,
+ const struct lu_fid *fid, unsigned int params_count,
+ __u16 *param_sizes, const void **param_bufs,
+ __u32 reply_size);
+int out_create_pack(const struct lu_env *env, struct object_update *update,
+ size_t *max_update_size, const struct lu_fid *fid,
+ const struct lu_attr *attr, struct dt_allocation_hint *hint,
+ struct dt_object_format *dof);
+int out_destroy_pack(const struct lu_env *env, struct object_update *update,
+ size_t *max_update_size, const struct lu_fid *fid);
+int out_index_delete_pack(const struct lu_env *env,
+ struct object_update *update, size_t *max_update_size,
+ const struct lu_fid *fid, const struct dt_key *key);
+int out_index_insert_pack(const struct lu_env *env,
+ struct object_update *update, size_t *max_update_size,
+ const struct lu_fid *fid, const struct dt_rec *rec,
+ const struct dt_key *key);
+int out_xattr_set_pack(const struct lu_env *env,
+ struct object_update *update, size_t *max_update_size,
+ const struct lu_fid *fid, const struct lu_buf *buf,
+ const char *name, __u32 flag);
+int out_xattr_del_pack(const struct lu_env *env,
+ struct object_update *update, size_t *max_update_size,
+ const struct lu_fid *fid, const char *name);
+int out_attr_set_pack(const struct lu_env *env,
+ struct object_update *update, size_t *max_update_size,
+ const struct lu_fid *fid, const struct lu_attr *attr);
+int out_ref_add_pack(const struct lu_env *env,
+ struct object_update *update, size_t *max_update_size,
+ const struct lu_fid *fid);
+int out_ref_del_pack(const struct lu_env *env,
+ struct object_update *update, size_t *max_update_size,
+ const struct lu_fid *fid);
+int out_write_pack(const struct lu_env *env,
+ struct object_update *update, size_t *max_update_size,
+ const struct lu_fid *fid, const struct lu_buf *buf,
+ __u64 pos);
+int out_attr_get_pack(const struct lu_env *env,
+ struct object_update *update, size_t *max_update_size,
+ const struct lu_fid *fid);
+int out_index_lookup_pack(const struct lu_env *env,
+ struct object_update *update, size_t *max_update_size,
+ const struct lu_fid *fid, struct dt_rec *rec,
+ const struct dt_key *key);
+int out_xattr_get_pack(const struct lu_env *env,
+ struct object_update *update, size_t *max_update_size,
+ const struct lu_fid *fid, const char *name,
+ const int bufsize);
+int out_read_pack(const struct lu_env *env, struct object_update *update,
+ size_t *max_update_length, const struct lu_fid *fid,
+ size_t size, loff_t pos);
+
+const char *update_op_str(__u16 opcode);
+
+/* target/update_trans.c */
+struct thandle *thandle_get_sub_by_dt(const struct lu_env *env,
+ struct thandle *th,
+ struct dt_device *sub_dt);
+
+static inline struct thandle *
+thandle_get_sub(const struct lu_env *env, struct thandle *th,
+ const struct dt_object *sub_obj)
+{
+ return thandle_get_sub_by_dt(env, th, lu2dt_dev(sub_obj->do_lu.lo_dev));
+}
+
+struct thandle *
+top_trans_create(const struct lu_env *env, struct dt_device *master_dev);
+int top_trans_start(const struct lu_env *env, struct dt_device *master_dev,
+ struct thandle *th);
+int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
+ struct thandle *th);
+void top_multiple_thandle_destroy(struct top_multiple_thandle *tmt);
+
+static inline void top_multiple_thandle_get(struct top_multiple_thandle *tmt)
+{
+ atomic_inc(&tmt->tmt_refcount);
}
-static inline void update_inc_batchid(struct dt_update_request *update)
+static inline void top_multiple_thandle_put(struct top_multiple_thandle *tmt)
{
- update->dur_batchid++;
+ if (atomic_dec_and_test(&tmt->tmt_refcount))
+ top_multiple_thandle_destroy(tmt);
}
+struct sub_thandle *lookup_sub_thandle(struct top_multiple_thandle *tmt,
+ struct dt_device *dt_dev);
+int sub_thandle_trans_create(const struct lu_env *env,
+ struct top_thandle *top_th,
+ struct sub_thandle *st);
+
+/* update_records.c */
+size_t update_records_create_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_attr *attr,
+ const struct dt_allocation_hint *hint,
+ struct dt_object_format *dof);
+size_t update_records_attr_set_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_attr *attr);
+size_t update_records_ref_add_size(const struct lu_env *env,
+ const struct lu_fid *fid);
+size_t update_records_ref_del_size(const struct lu_env *env,
+ const struct lu_fid *fid);
+size_t update_records_destroy_size(const struct lu_env *env,
+ const struct lu_fid *fid);
+size_t update_records_index_insert_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct dt_rec *rec,
+ const struct dt_key *key);
+size_t update_records_index_delete_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct dt_key *key);
+size_t update_records_xattr_set_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_buf *buf,
+ const char *name,
+ __u32 flag);
+size_t update_records_xattr_del_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const char *name);
+size_t update_records_write_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_buf *buf,
+ __u64 pos);
+size_t update_records_punch_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ __u64 start, __u64 end);
+
+int update_records_create_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const struct lu_attr *attr,
+ const struct dt_allocation_hint *hint,
+ struct dt_object_format *dof);
+int update_records_attr_set_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const struct lu_attr *attr);
+int update_records_ref_add_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid);
+int update_records_ref_del_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid);
+int update_records_destroy_pack(const struct lu_env *env,
+ struct update_ops *ops, unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid);
+int update_records_index_insert_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const struct dt_rec *rec,
+ const struct dt_key *key);
+int update_records_index_delete_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const struct dt_key *key);
+int update_records_xattr_set_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const struct lu_buf *buf, const char *name,
+ __u32 flag);
+int update_records_xattr_del_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const char *name);
+int update_records_write_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ const struct lu_buf *buf,
+ __u64 pos);
+int update_records_punch_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid,
+ __u64 start, __u64 end);
+int update_records_noop_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid);
+
+int tur_update_records_extend(struct thandle_update_records *tur,
+ size_t new_size);
+int tur_update_params_extend(struct thandle_update_records *tur,
+ size_t new_size);
+int tur_update_extend(struct thandle_update_records *tur,
+ size_t new_op_size, size_t new_param_size);
+
+#define update_record_pack(name, th, ...) \
+({ \
+ struct top_thandle *top_th; \
+ struct top_multiple_thandle *tmt; \
+ struct thandle_update_records *tur; \
+ struct llog_update_record *lur; \
+ size_t avail_param_size; \
+ size_t avail_op_size; \
+ int ret; \
+ \
+ while (1) { \
+ top_th = container_of(th, struct top_thandle, tt_super);\
+ tmt = top_th->tt_multiple_thandle; \
+ tur = tmt->tmt_update_records; \
+ lur = tur->tur_update_records; \
+ avail_param_size = tur->tur_update_params_buf_size - \
+ update_params_size(tur->tur_update_params, \
+ tur->tur_update_param_count); \
+ avail_op_size = tur->tur_update_records_buf_size - \
+ llog_update_record_size(lur); \
+ ret = update_records_##name##_pack(env, \
+ &lur->lur_update_rec.ur_ops, \
+ &lur->lur_update_rec.ur_update_count, \
+ &avail_op_size, \
+ tur->tur_update_params, \
+ &tur->tur_update_param_count, \
+ &avail_param_size, __VA_ARGS__); \
+ if (ret == -E2BIG) { \
+ ret = tur_update_extend(tur, avail_op_size, \
+ avail_param_size); \
+ if (ret != 0) \
+ break; \
+ continue; \
+ } else { \
+ break; \
+ } \
+ } \
+ ret; \
+})
+
+#define update_record_size(env, name, th, ...) \
+({ \
+ struct top_thandle *top_th; \
+ struct top_multiple_thandle *tmt; \
+ \
+ top_th = container_of(th, struct top_thandle, tt_super); \
+ \
+ LASSERT(top_th->tt_multiple_thandle != NULL); \
+ tmt = top_th->tt_multiple_thandle; \
+ tmt->tmt_record_size += \
+ update_records_##name##_size(env, __VA_ARGS__); \
+})
#endif