* GPL HEADER END
*/
/*
- * Copyright (c) 2013, 2014, Intel Corporation.
+ * Copyright (c) 2013, 2017, Intel Corporation.
*/
/*
* lustre/include/lustre_update.h
#ifndef _LUSTRE_UPDATE_H
#define _LUSTRE_UPDATE_H
-#include <lustre_net.h>
#include <dt_object.h>
+#include <lustre_net.h>
+#include <obj_update.h>
-#define OUT_UPDATE_INIT_BUFFER_SIZE 4096
-/* 16KB, the current biggest size is llog header(8KB) */
-#define OUT_UPDATE_REPLY_SIZE 16384
+#define OUT_UPDATE_REPLY_SIZE 4096
+#define OUT_BULK_BUFFER_SIZE 4096
struct dt_key;
struct dt_rec;
struct object_update_param;
-
-struct update_params {
- struct object_update_param up_params[0];
-};
+struct llog_update_record;
static inline size_t update_params_size(const struct update_params *params,
unsigned int param_count)
return param->oup_buf;
}
-struct update_op {
- struct lu_fid uop_fid;
- __u16 uop_type;
- __u16 uop_param_count;
- __u16 uop_params_off[0];
-};
-
static inline size_t
update_op_size(unsigned int param_count)
{
update_op_size(uop->uop_param_count));
}
-/* All of updates in the mulitple_update_record */
-struct update_ops {
- struct update_op uops_op[0];
-};
-
static inline size_t update_ops_size(const struct update_ops *ops,
unsigned int update_count)
{
return total_size;
}
-/*
- * This is the update record format used to store the updates in
- * disk. All updates of the operation will be stored in ur_ops.
- * All of parameters for updates of the operation will be stored
- * in ur_params.
- * To save the space of the record, parameters in ur_ops will only
- * remember their offset in ur_params, so to avoid storing duplicate
- * parameters in ur_params, which can help us save a lot space for
- * operation like creating striped directory.
- */
-struct update_records {
- __u64 ur_master_transno;
- __u64 ur_batchid;
- __u32 ur_flags;
- __u32 ur_param_count;
- __u32 ur_update_count;
- struct update_ops ur_ops;
- /* Note ur_ops has a variable size, so comment out
- * the following ur_params, in case some use it directly
- * update_records->ur_params
- *
- * struct update_params ur_params;
- */
-};
-
-struct llog_update_record {
- struct llog_rec_hdr lur_hdr;
- struct update_records lur_update_rec;
- /* Note ur_update_rec has a variable size, so comment out
- * the following ur_tail, in case someone use it directly
- *
- * struct llog_rec_tail lur_tail;
- */
-};
-
static inline struct update_params *
update_records_get_params(const struct update_records *record)
{
update_ops_size(&record->ur_ops, record->ur_update_count));
}
+static inline struct update_param *
+update_param_next_param(const struct update_param *param)
+{
+ return (struct update_param *)((char *)param +
+ object_update_param_size(
+ (struct object_update_param *)param));
+}
+
+static inline size_t
+__update_records_size(size_t raw_size)
+{
+ return round_up(offsetof(struct update_records, ur_ops) + raw_size, 8);
+}
+
static inline size_t
update_records_size(const struct update_records *record)
{
- struct update_params *params;
+ size_t op_size = 0;
+ size_t param_size = 0;
- params = update_records_get_params(record);
+ if (record->ur_update_count > 0)
+ op_size = update_ops_size(&record->ur_ops,
+ record->ur_update_count);
+ if (record->ur_param_count > 0) {
+ struct update_params *params;
- return cfs_size_round(offsetof(struct update_records, ur_ops) +
- update_ops_size(&record->ur_ops, record->ur_update_count) +
- update_params_size(params, record->ur_param_count));
+ params = update_records_get_params(record);
+ param_size = update_params_size(params, record->ur_param_count);
+ }
+
+ return __update_records_size(op_size + param_size);
+}
+
+static inline size_t
+__llog_update_record_size(size_t records_size)
+{
+ return round_up(sizeof(struct llog_rec_hdr) + records_size +
+ sizeof(struct llog_rec_tail), 8);
}
static inline size_t
llog_update_record_size(const struct llog_update_record *lur)
{
- return cfs_size_round(sizeof(lur->lur_hdr) +
- update_records_size(&lur->lur_update_rec) +
- sizeof(struct llog_rec_tail));
+ return __llog_update_record_size(
+ update_records_size(&lur->lur_update_rec));
}
static inline struct update_op *
*size = param->oup_len;
if (param->oup_len == 0)
- return NULL;
+ return ERR_PTR(-ENODATA);
return (void *)¶m->oup_buf[0];
}
}
static inline void
-object_update_reply_init(struct object_update_reply *reply, size_t count)
-{
- reply->ourp_magic = UPDATE_REPLY_MAGIC;
- reply->ourp_count = count;
-}
-
-static inline void
object_update_result_insert(struct object_update_reply *reply,
void *data, size_t data_len, size_t index,
int rc)
{
struct object_update_result *update_result;
- char *ptr;
update_result = object_update_result_get(reply, index, NULL);
- LASSERT(update_result != NULL);
+ LASSERT(update_result);
update_result->our_rc = ptlrpc_status_hton(rc);
- if (data_len > 0) {
- LASSERT(data != NULL);
- ptr = (char *)update_result +
- cfs_size_round(sizeof(struct object_update_reply));
+ if (rc >= 0) {
+ if (data_len > 0 && data)
+ memcpy(update_result->our_data, data, data_len);
update_result->our_datalen = data_len;
- memcpy(ptr, data, data_len);
}
- reply->ourp_lens[index] = cfs_size_round(data_len +
- sizeof(struct object_update_result));
+ reply->ourp_lens[index] = round_up(data_len +
+ sizeof(struct object_update_result),
+ 8);
}
static inline int
LASSERT(lbuf != NULL);
update_result = object_update_result_get(reply, index, &size);
if (update_result == NULL ||
- size < cfs_size_round(sizeof(struct object_update_reply)) ||
+ size < round_up(sizeof(struct object_update_reply), 8) ||
update_result->our_datalen > size)
RETURN(-EFAULT);
lbuf->lb_buf = update_result->our_data;
lbuf->lb_len = update_result->our_datalen;
- return 0;
+ return result;
}
/**
* distribution.
*/
struct thandle_update_records {
- /* All of updates for the cross-MDT operation. */
+ /* All of updates for the cross-MDT operation, vmalloc'd. */
struct llog_update_record *tur_update_records;
size_t tur_update_records_buf_size;
- /* All of parameters for the cross-MDT operation */
+ /* All of parameters for the cross-MDT operation, vmalloc'd */
struct update_params *tur_update_params;
unsigned int tur_update_param_count;
size_t tur_update_params_buf_size;
#define TOP_THANDLE_MAGIC 0x20140917
struct top_multiple_thandle {
struct dt_device *tmt_master_sub_dt;
- atomic_t tmt_refcount;
+ struct kref tmt_refcount;
/* Other sub transactions will be listed here. */
struct list_head tmt_sub_thandle_list;
+ spinlock_t tmt_sub_lock;
struct list_head tmt_commit_list;
/* All of update records will packed here */
__u64 tmt_batchid;
int tmt_result;
__u32 tmt_magic;
+ size_t tmt_record_size;
__u32 tmt_committed:1;
};
/* {top,sub}_thandle are used to manage distributed transactions which
* include updates on several nodes. A top_handle represents the
- * whole operation, and sub_thandle represents updates on each node. */
+ * whole operation, and sub_thandle represents updates on each node.
+ */
struct top_thandle {
struct thandle tt_super;
/* The master sub transaction. */
struct top_multiple_thandle *tt_multiple_thandle;
};
-/* Sub thandle is used to track multiple sub thandles under one parent
- * thandle */
+struct sub_thandle_cookie {
+ struct llog_cookie stc_cookie;
+ struct list_head stc_list;
+};
+
+/* Sub thandle used to track multiple sub thandles under one parent thandle */
struct sub_thandle {
struct thandle *st_sub_th;
struct dt_device *st_dt;
- struct llog_cookie st_cookie;
+ struct list_head st_cookie_list;
struct dt_txn_commit_cb st_commit_dcb;
struct dt_txn_commit_cb st_stop_dcb;
int st_result;
/* If this sub thandle is committed */
bool st_committed:1,
- st_stopped:1;
+ st_stopped:1,
+ st_started:1;
};
struct tx_arg;
/* target/out_lib.c */
int out_update_pack(const struct lu_env *env, struct object_update *update,
- size_t max_update_size, enum update_type op,
+ size_t *max_update_size, enum update_type op,
const struct lu_fid *fid, unsigned int params_count,
- __u16 *param_sizes, const void **param_bufs);
+ __u16 *param_sizes, const void **param_bufs,
+ __u32 reply_size);
int out_create_pack(const struct lu_env *env, struct object_update *update,
- size_t max_update_size, const struct lu_fid *fid,
+ size_t *max_update_size, const struct lu_fid *fid,
const struct lu_attr *attr, struct dt_allocation_hint *hint,
struct dt_object_format *dof);
-int out_object_destroy_pack(const struct lu_env *env,
- struct object_update *update,
- size_t max_update_size,
- const struct lu_fid *fid);
+int out_destroy_pack(const struct lu_env *env, struct object_update *update,
+ size_t *max_update_size, const struct lu_fid *fid);
int out_index_delete_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const struct dt_key *key);
int out_index_insert_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const struct dt_rec *rec,
const struct dt_key *key);
int out_xattr_set_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const struct lu_buf *buf,
const char *name, __u32 flag);
int out_xattr_del_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const char *name);
int out_attr_set_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const struct lu_attr *attr);
int out_ref_add_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid);
int out_ref_del_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid);
int out_write_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, const struct lu_buf *buf,
__u64 pos);
int out_attr_get_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid);
int out_index_lookup_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
+ struct object_update *update, size_t *max_update_size,
const struct lu_fid *fid, struct dt_rec *rec,
const struct dt_key *key);
int out_xattr_get_pack(const struct lu_env *env,
- struct object_update *update, size_t max_update_size,
- const struct lu_fid *fid, const char *name);
+ struct object_update *update, size_t *max_update_size,
+ const struct lu_fid *fid, const char *name,
+ const int bufsize);
+int out_xattr_list_pack(const struct lu_env *env, struct object_update *update,
+ size_t *max_update_size, const struct lu_fid *fid,
+ const int bufsize);
int out_read_pack(const struct lu_env *env, struct object_update *update,
- size_t max_update_length, const struct lu_fid *fid,
+ size_t *max_update_length, const struct lu_fid *fid,
size_t size, loff_t pos);
const char *update_op_str(__u16 opcode);
struct thandle *th);
int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
struct thandle *th);
-void top_multiple_thandle_destroy(struct top_multiple_thandle *tmt);
+void top_multiple_thandle_destroy(struct kref *kref);
static inline void top_multiple_thandle_get(struct top_multiple_thandle *tmt)
{
- atomic_inc(&tmt->tmt_refcount);
+ kref_get(&tmt->tmt_refcount);
}
static inline void top_multiple_thandle_put(struct top_multiple_thandle *tmt)
{
- if (atomic_dec_and_test(&tmt->tmt_refcount))
- top_multiple_thandle_destroy(tmt);
+ kref_put(&tmt->tmt_refcount, top_multiple_thandle_destroy);
}
struct sub_thandle *lookup_sub_thandle(struct top_multiple_thandle *tmt,
struct sub_thandle *st);
/* update_records.c */
+size_t update_records_create_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_attr *attr,
+ const struct dt_allocation_hint *hint,
+ struct dt_object_format *dof);
+size_t update_records_attr_set_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_attr *attr);
+size_t update_records_ref_add_size(const struct lu_env *env,
+ const struct lu_fid *fid);
+size_t update_records_ref_del_size(const struct lu_env *env,
+ const struct lu_fid *fid);
+size_t update_records_destroy_size(const struct lu_env *env,
+ const struct lu_fid *fid);
+size_t update_records_index_insert_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct dt_rec *rec,
+ const struct dt_key *key);
+size_t update_records_index_delete_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct dt_key *key);
+size_t update_records_xattr_set_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_buf *buf,
+ const char *name,
+ __u32 flag);
+size_t update_records_xattr_del_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const char *name);
+size_t update_records_write_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ const struct lu_buf *buf,
+ __u64 pos);
+size_t update_records_punch_size(const struct lu_env *env,
+ const struct lu_fid *fid,
+ __u64 start, __u64 end);
+
int update_records_create_pack(const struct lu_env *env,
struct update_ops *ops,
unsigned int *op_count,
unsigned int *param_count,
size_t *max_param_size,
const struct lu_fid *fid);
-int update_records_object_destroy_pack(const struct lu_env *env,
- struct update_ops *ops,
- unsigned int *op_count,
- size_t *max_ops_size,
- struct update_params *params,
- unsigned int *param_count,
- size_t *max_param_size,
- const struct lu_fid *fid);
+int update_records_destroy_pack(const struct lu_env *env,
+ struct update_ops *ops, unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid);
int update_records_index_insert_pack(const struct lu_env *env,
struct update_ops *ops,
unsigned int *op_count,
size_t *max_param_size,
const struct lu_fid *fid,
__u64 start, __u64 end);
+int update_records_noop_pack(const struct lu_env *env,
+ struct update_ops *ops,
+ unsigned int *op_count,
+ size_t *max_ops_size,
+ struct update_params *params,
+ unsigned int *param_count,
+ size_t *max_param_size,
+ const struct lu_fid *fid);
int tur_update_records_extend(struct thandle_update_records *tur,
size_t new_size);
} \
ret; \
})
+
+#define update_record_size(env, name, th, ...) \
+({ \
+ struct top_thandle *top_th; \
+ struct top_multiple_thandle *tmt; \
+ \
+ top_th = container_of(th, struct top_thandle, tt_super); \
+ \
+ LASSERT(top_th->tt_multiple_thandle != NULL); \
+ tmt = top_th->tt_multiple_thandle; \
+ tmt->tmt_record_size += \
+ update_records_##name##_size(env, __VA_ARGS__); \
+})
#endif