Whamcloud - gitweb
LU-13124 scrub: check for multiple linked file
[fs/lustre-release.git] / lustre / include / lustre_update.h
index 4cb87e0..78cd3d4 100644 (file)
@@ -20,7 +20,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2013, 2014, Intel Corporation.
+ * Copyright (c) 2013, 2017, Intel Corporation.
  */
 /*
  * lustre/include/lustre_update.h
 
 #ifndef _LUSTRE_UPDATE_H
 #define _LUSTRE_UPDATE_H
-#include <lustre_net.h>
 #include <dt_object.h>
+#include <lustre_net.h>
+#include <obj_update.h>
 
-#define OUT_UPDATE_INIT_BUFFER_SIZE    4096
-/* 16KB, the current biggest size is llog header(8KB) */
-#define OUT_UPDATE_REPLY_SIZE          16384
+#define OUT_UPDATE_REPLY_SIZE          4096
+#define OUT_BULK_BUFFER_SIZE           4096
 
 struct dt_key;
 struct dt_rec;
 struct object_update_param;
-
-struct update_buffer {
-       struct object_update_request    *ub_req;
-       size_t                          ub_req_size;
-};
-
-/**
- * Tracking the updates being executed on this dt_device.
- */
-struct dt_update_request {
-       struct dt_device                *dur_dt;
-       /* attached itself to thandle */
-       int                             dur_flags;
-       /* update request result */
-       int                             dur_rc;
-       /* Current batch(transaction) id */
-       __u64                           dur_batchid;
-       /* Holding object updates */
-       struct update_buffer            dur_buf;
-       struct list_head                dur_cb_items;
-};
-
-struct update_params {
-       struct object_update_param      up_params[0];
-};
+struct llog_update_record;
 
 static inline size_t update_params_size(const struct update_params *params,
                                        unsigned int param_count)
@@ -102,12 +78,22 @@ update_params_get_param(const struct update_params *params,
        return param;
 }
 
-struct update_op {
-       struct lu_fid uop_fid;
-       __u16   uop_type;
-       __u16   uop_param_count;
-       __u16   uop_params_off[0];
-};
+static inline void*
+update_params_get_param_buf(const struct update_params *params, __u16 index,
+                           unsigned int param_count, __u16 *size)
+{
+       struct object_update_param *param;
+
+       param = update_params_get_param(params, (unsigned int)index,
+                                       param_count);
+       if (param == NULL)
+               return NULL;
+
+       if (size != NULL)
+               *size = param->oup_len;
+
+       return param->oup_buf;
+}
 
 static inline size_t
 update_op_size(unsigned int param_count)
@@ -122,11 +108,6 @@ update_op_next_op(const struct update_op *uop)
                                update_op_size(uop->uop_param_count));
 }
 
-/* All of updates in the mulitple_update_record */
-struct update_ops {
-       struct update_op        uops_op[0];
-};
-
 static inline size_t update_ops_size(const struct update_ops *ops,
                                     unsigned int update_count)
 {
@@ -141,41 +122,6 @@ static inline size_t update_ops_size(const struct update_ops *ops,
        return total_size;
 }
 
-/*
- * This is the update record format used to store the updates in
- * disk. All updates of the operation will be stored in ur_ops.
- * All of parameters for updates of the operation will be stored
- * in ur_params.
- * To save the space of the record, parameters in ur_ops will only
- * remember their offset in ur_params, so to avoid storing duplicate
- * parameters in ur_params, which can help us save a lot space for
- * operation like creating striped directory.
- */
-struct update_records {
-       __u64                   ur_master_transno;
-       __u64                   ur_batchid;
-       __u32                   ur_flags;
-       __u32                   ur_param_count;
-       __u32                   ur_update_count;
-       struct update_ops       ur_ops;
-        /* Note ur_ops has a variable size, so comment out
-         * the following ur_params, in case some use it directly
-         * update_records->ur_params
-         *
-         * struct update_params        ur_params;
-         */
-};
-
-struct llog_update_record {
-       struct llog_rec_hdr     lur_hdr;
-       struct update_records   lur_update_rec;
-       /* Note ur_update_rec has a variable size, so comment out
-        * the following ur_tail, in case someone use it directly
-        *
-        * struct llog_rec_tail lur_tail;
-        */
-};
-
 static inline struct update_params *
 update_records_get_params(const struct update_records *record)
 {
@@ -184,26 +130,54 @@ update_records_get_params(const struct update_records *record)
                update_ops_size(&record->ur_ops, record->ur_update_count));
 }
 
+static inline struct update_param *
+update_param_next_param(const struct update_param *param)
+{
+       return (struct update_param *)((char *)param +
+                                      object_update_param_size(
+                                         (struct object_update_param *)param));
+}
+
+static inline size_t
+__update_records_size(size_t raw_size)
+{
+       return cfs_size_round(offsetof(struct update_records, ur_ops) +
+                             raw_size);
+}
+
 static inline size_t
 update_records_size(const struct update_records *record)
 {
-       struct update_params *params;
+       size_t op_size = 0;
+       size_t param_size = 0;
 
-       params = update_records_get_params(record);
+       if (record->ur_update_count > 0)
+               op_size = update_ops_size(&record->ur_ops,
+                                         record->ur_update_count);
+       if (record->ur_param_count > 0) {
+               struct update_params *params;
 
-       return cfs_size_round(offsetof(struct update_records, ur_ops) +
-              update_ops_size(&record->ur_ops, record->ur_update_count) +
-              update_params_size(params, record->ur_param_count));
+               params = update_records_get_params(record);
+               param_size = update_params_size(params, record->ur_param_count);
+       }
+
+       return __update_records_size(op_size + param_size);
 }
 
 static inline size_t
-llog_update_record_size(const struct llog_update_record *lur)
+__llog_update_record_size(size_t records_size)
 {
-       return cfs_size_round(sizeof(lur->lur_hdr) +
-                             update_records_size(&lur->lur_update_rec) +
+       return cfs_size_round(sizeof(struct llog_rec_hdr) + records_size +
                              sizeof(struct llog_rec_tail));
 }
 
+static inline size_t
+llog_update_record_size(const struct llog_update_record *lur)
+{
+       return __llog_update_record_size(
+                       update_records_size(&lur->lur_update_rec));
+}
+
 static inline struct update_op *
 update_ops_get_op(const struct update_ops *ops, unsigned int index,
                  unsigned int update_count)
@@ -240,7 +214,7 @@ static inline void
                *size = param->oup_len;
 
        if (param->oup_len == 0)
-               return NULL;
+               return ERR_PTR(-ENODATA);
 
        return (void *)&param->oup_buf[0];
 }
@@ -262,30 +236,20 @@ object_update_request_size(const struct object_update_request *our)
 }
 
 static inline void
-object_update_reply_init(struct object_update_reply *reply, size_t count)
-{
-       reply->ourp_magic = UPDATE_REPLY_MAGIC;
-       reply->ourp_count = count;
-}
-
-static inline void
 object_update_result_insert(struct object_update_reply *reply,
                            void *data, size_t data_len, size_t index,
                            int rc)
 {
        struct object_update_result *update_result;
-       char *ptr;
 
        update_result = object_update_result_get(reply, index, NULL);
-       LASSERT(update_result != NULL);
+       LASSERT(update_result);
 
        update_result->our_rc = ptlrpc_status_hton(rc);
-       if (data_len > 0) {
-               LASSERT(data != NULL);
-               ptr = (char *)update_result +
-                       cfs_size_round(sizeof(struct object_update_reply));
+       if (rc >= 0) {
+               if (data_len > 0 && data)
+                       memcpy(update_result->our_data, data, data_len);
                update_result->our_datalen = data_len;
-               memcpy(ptr, data, data_len);
        }
 
        reply->ourp_lens[index] = cfs_size_round(data_len +
@@ -314,7 +278,7 @@ object_update_result_data_get(const struct object_update_reply *reply,
        lbuf->lb_buf = update_result->our_data;
        lbuf->lb_len = update_result->our_datalen;
 
-       return 0;
+       return result;
 }
 
 /**
@@ -322,99 +286,179 @@ object_update_result_data_get(const struct object_update_reply *reply,
  * distribution.
  */
 struct thandle_update_records {
-       /* All of updates for the cross-MDT operation. */
+       /* All of updates for the cross-MDT operation, vmalloc'd. */
        struct llog_update_record       *tur_update_records;
        size_t                          tur_update_records_buf_size;
 
-       /* All of parameters for the cross-MDT operation */
+       /* All of parameters for the cross-MDT operation, vmalloc'd */
        struct update_params    *tur_update_params;
        unsigned int            tur_update_param_count;
        size_t                  tur_update_params_buf_size;
 };
 
 #define TOP_THANDLE_MAGIC      0x20140917
+struct top_multiple_thandle {
+       struct dt_device        *tmt_master_sub_dt;
+       atomic_t                tmt_refcount;
+       /* Other sub transactions will be listed here. */
+       struct list_head        tmt_sub_thandle_list;
+       spinlock_t              tmt_sub_lock;
+
+       struct list_head        tmt_commit_list;
+       /* All of update records will packed here */
+       struct thandle_update_records *tmt_update_records;
+
+       wait_queue_head_t       tmt_stop_waitq;
+       __u64                   tmt_batchid;
+       int                     tmt_result;
+       __u32                   tmt_magic;
+       size_t                  tmt_record_size;
+       __u32                   tmt_committed:1;
+};
+
 /* {top,sub}_thandle are used to manage distributed transactions which
  * include updates on several nodes. A top_handle represents the
  * whole operation, and sub_thandle represents updates on each node. */
 struct top_thandle {
        struct thandle          tt_super;
-       __u32                   tt_magic;
-       atomic_t                tt_refcount;
        /* The master sub transaction. */
        struct thandle          *tt_master_sub_thandle;
 
-       /* Other sub thandle will be listed here. */
-       struct list_head        tt_sub_thandle_list;
+       struct top_multiple_thandle *tt_multiple_thandle;
+};
 
-       /* All of update records will packed here */
-       struct thandle_update_records *tt_update_records;
+struct sub_thandle_cookie {
+       struct llog_cookie      stc_cookie;
+       struct list_head        stc_list;
 };
 
+/* Sub thandle is used to track multiple sub thandles under one parent
+ * thandle */
 struct sub_thandle {
-       /* point to the osd/osp_thandle */
        struct thandle          *st_sub_th;
+       struct dt_device        *st_dt;
+       struct list_head        st_cookie_list;
+       struct dt_txn_commit_cb st_commit_dcb;
+       struct dt_txn_commit_cb st_stop_dcb;
+       int                     st_result;
 
        /* linked to top_thandle */
        struct list_head        st_sub_list;
 
        /* If this sub thandle is committed */
        bool                    st_committed:1,
-                               st_record_update:1;
+                               st_stopped:1,
+                               st_started:1;
 };
 
+struct tx_arg;
+typedef int (*tx_exec_func_t)(const struct lu_env *env, struct thandle *th,
+                             struct tx_arg *ta);
+
+/* Structure for holding one update execution */
+struct tx_arg {
+       tx_exec_func_t           exec_fn;
+       tx_exec_func_t           undo_fn;
+       struct dt_object        *object;
+       const char              *file;
+       struct object_update_reply *reply;
+       int                      line;
+       int                      index;
+       union {
+               struct {
+                       struct dt_insert_rec     rec;
+                       const struct dt_key     *key;
+               } insert;
+               struct {
+               } ref;
+               struct {
+                       struct lu_attr   attr;
+               } attr_set;
+               struct {
+                       struct lu_buf    buf;
+                       const char      *name;
+                       int              flags;
+                       __u32            csum;
+               } xattr_set;
+               struct {
+                       struct lu_attr                  attr;
+                       struct dt_allocation_hint       hint;
+                       struct dt_object_format         dof;
+                       struct lu_fid                   fid;
+               } create;
+               struct {
+                       struct lu_buf   buf;
+                       loff_t          pos;
+               } write;
+               struct {
+                       struct ost_body     *body;
+               } destroy;
+       } u;
+};
+
+/* Structure for holding all update executations of one transaction */
+struct thandle_exec_args {
+       struct thandle          *ta_handle;
+       int                     ta_argno;   /* used args */
+       int                     ta_alloc_args; /* allocated args count */
+       struct tx_arg           **ta_args;
+};
 
 /* target/out_lib.c */
 int out_update_pack(const struct lu_env *env, struct object_update *update,
-                   size_t max_update_size, enum update_type op,
+                   size_t *max_update_size, enum update_type op,
                    const struct lu_fid *fid, unsigned int params_count,
-                   __u16 *param_sizes, const void **param_bufs);
+                   __u16 *param_sizes, const void **param_bufs,
+                   __u32 reply_size);
 int out_create_pack(const struct lu_env *env, struct object_update *update,
-                   size_t max_update_size, const struct lu_fid *fid,
+                   size_t *max_update_size, const struct lu_fid *fid,
                    const struct lu_attr *attr, struct dt_allocation_hint *hint,
                    struct dt_object_format *dof);
-int out_object_destroy_pack(const struct lu_env *env,
-                           struct object_update *update,
-                           size_t max_update_size,
-                           const struct lu_fid *fid);
+int out_destroy_pack(const struct lu_env *env, struct object_update *update,
+                    size_t *max_update_size, const struct lu_fid *fid);
 int out_index_delete_pack(const struct lu_env *env,
-                         struct object_update *update, size_t max_update_size,
+                         struct object_update *update, size_t *max_update_size,
                          const struct lu_fid *fid, const struct dt_key *key);
 int out_index_insert_pack(const struct lu_env *env,
-                         struct object_update *update, size_t max_update_size,
+                         struct object_update *update, size_t *max_update_size,
                          const struct lu_fid *fid, const struct dt_rec *rec,
                          const struct dt_key *key);
 int out_xattr_set_pack(const struct lu_env *env,
-                      struct object_update *update, size_t max_update_size,
+                      struct object_update *update, size_t *max_update_size,
                       const struct lu_fid *fid, const struct lu_buf *buf,
                       const char *name, __u32 flag);
 int out_xattr_del_pack(const struct lu_env *env,
-                      struct object_update *update, size_t max_update_size,
+                      struct object_update *update, size_t *max_update_size,
                       const struct lu_fid *fid, const char *name);
 int out_attr_set_pack(const struct lu_env *env,
-                     struct object_update *update, size_t max_update_size,
+                     struct object_update *update, size_t *max_update_size,
                      const struct lu_fid *fid, const struct lu_attr *attr);
 int out_ref_add_pack(const struct lu_env *env,
-                    struct object_update *update, size_t max_update_size,
+                    struct object_update *update, size_t *max_update_size,
                     const struct lu_fid *fid);
 int out_ref_del_pack(const struct lu_env *env,
-                    struct object_update *update, size_t max_update_size,
+                    struct object_update *update, size_t *max_update_size,
                     const struct lu_fid *fid);
 int out_write_pack(const struct lu_env *env,
-                  struct object_update *update, size_t max_update_size,
+                  struct object_update *update, size_t *max_update_size,
                   const struct lu_fid *fid, const struct lu_buf *buf,
                   __u64 pos);
 int out_attr_get_pack(const struct lu_env *env,
-                     struct object_update *update, size_t max_update_size,
+                     struct object_update *update, size_t *max_update_size,
                      const struct lu_fid *fid);
 int out_index_lookup_pack(const struct lu_env *env,
-                         struct object_update *update, size_t max_update_size,
+                         struct object_update *update, size_t *max_update_size,
                          const struct lu_fid *fid, struct dt_rec *rec,
                          const struct dt_key *key);
 int out_xattr_get_pack(const struct lu_env *env,
-                      struct object_update *update, size_t max_update_size,
-                      const struct lu_fid *fid, const char *name);
+                      struct object_update *update, size_t *max_update_size,
+                      const struct lu_fid *fid, const char *name,
+                      const int bufsize);
+int out_xattr_list_pack(const struct lu_env *env, struct object_update *update,
+                      size_t *max_update_size, const struct lu_fid *fid,
+                      const int bufsize);
 int out_read_pack(const struct lu_env *env, struct object_update *update,
-                 size_t max_update_length, const struct lu_fid *fid,
+                 size_t *max_update_length, const struct lu_fid *fid,
                  size_t size, loff_t pos);
 
 const char *update_op_str(__u16 opcode);
@@ -433,16 +477,67 @@ thandle_get_sub(const struct lu_env *env, struct thandle *th,
 
 struct thandle *
 top_trans_create(const struct lu_env *env, struct dt_device *master_dev);
-
 int top_trans_start(const struct lu_env *env, struct dt_device *master_dev,
                    struct thandle *th);
-
 int top_trans_stop(const struct lu_env *env, struct dt_device *master_dev,
                   struct thandle *th);
+void top_multiple_thandle_destroy(struct top_multiple_thandle *tmt);
+
+static inline void top_multiple_thandle_get(struct top_multiple_thandle *tmt)
+{
+       atomic_inc(&tmt->tmt_refcount);
+}
+
+static inline void top_multiple_thandle_put(struct top_multiple_thandle *tmt)
+{
+       if (atomic_dec_and_test(&tmt->tmt_refcount))
+               top_multiple_thandle_destroy(tmt);
+}
 
-void top_thandle_destroy(struct top_thandle *top_th);
+struct sub_thandle *lookup_sub_thandle(struct top_multiple_thandle *tmt,
+                                      struct dt_device *dt_dev);
+int sub_thandle_trans_create(const struct lu_env *env,
+                            struct top_thandle *top_th,
+                            struct sub_thandle *st);
 
 /* update_records.c */
+size_t update_records_create_size(const struct lu_env *env,
+                                 const struct lu_fid *fid,
+                                 const struct lu_attr *attr,
+                                 const struct dt_allocation_hint *hint,
+                                 struct dt_object_format *dof);
+size_t update_records_attr_set_size(const struct lu_env *env,
+                                   const struct lu_fid *fid,
+                                   const struct lu_attr *attr);
+size_t update_records_ref_add_size(const struct lu_env *env,
+                                  const struct lu_fid *fid);
+size_t update_records_ref_del_size(const struct lu_env *env,
+                                  const struct lu_fid *fid);
+size_t update_records_destroy_size(const struct lu_env *env,
+                                  const struct lu_fid *fid);
+size_t update_records_index_insert_size(const struct lu_env *env,
+                                       const struct lu_fid *fid,
+                                       const struct dt_rec *rec,
+                                       const struct dt_key *key);
+size_t update_records_index_delete_size(const struct lu_env *env,
+                                       const struct lu_fid *fid,
+                                       const struct dt_key *key);
+size_t update_records_xattr_set_size(const struct lu_env *env,
+                                    const struct lu_fid *fid,
+                                    const struct lu_buf *buf,
+                                    const char *name,
+                                    __u32 flag);
+size_t update_records_xattr_del_size(const struct lu_env *env,
+                                    const struct lu_fid *fid,
+                                    const char *name);
+size_t update_records_write_size(const struct lu_env *env,
+                                const struct lu_fid *fid,
+                                const struct lu_buf *buf,
+                                __u64 pos);
+size_t update_records_punch_size(const struct lu_env *env,
+                                const struct lu_fid *fid,
+                                __u64 start, __u64 end);
+
 int update_records_create_pack(const struct lu_env *env,
                               struct update_ops *ops,
                               unsigned int *op_count,
@@ -479,14 +574,13 @@ int update_records_ref_del_pack(const struct lu_env *env,
                                unsigned int *param_count,
                                size_t *max_param_size,
                                const struct lu_fid *fid);
-int update_records_object_destroy_pack(const struct lu_env *env,
-                                      struct update_ops *ops,
-                                      unsigned int *op_count,
-                                      size_t *max_ops_size,
-                                      struct update_params *params,
-                                      unsigned int *param_count,
-                                      size_t *max_param_size,
-                                      const struct lu_fid *fid);
+int update_records_destroy_pack(const struct lu_env *env,
+                               struct update_ops *ops, unsigned int *op_count,
+                               size_t *max_ops_size,
+                               struct update_params *params,
+                               unsigned int *param_count,
+                               size_t *max_param_size,
+                               const struct lu_fid *fid);
 int update_records_index_insert_pack(const struct lu_env *env,
                                     struct update_ops *ops,
                                     unsigned int *op_count,
@@ -544,21 +638,26 @@ int update_records_punch_pack(const struct lu_env *env,
                              size_t *max_param_size,
                              const struct lu_fid *fid,
                              __u64 start, __u64 end);
+int update_records_noop_pack(const struct lu_env *env,
+                            struct update_ops *ops,
+                            unsigned int *op_count,
+                            size_t *max_ops_size,
+                            struct update_params *params,
+                            unsigned int *param_count,
+                            size_t *max_param_size,
+                            const struct lu_fid *fid);
 
 int tur_update_records_extend(struct thandle_update_records *tur,
                              size_t new_size);
 int tur_update_params_extend(struct thandle_update_records *tur,
                             size_t new_size);
-int check_and_prepare_update_record(const struct lu_env *env,
-                                   struct thandle *th);
-int merge_params_updates_buf(const struct lu_env *env,
-                            struct thandle_update_records *tur);
 int tur_update_extend(struct thandle_update_records *tur,
                      size_t new_op_size, size_t new_param_size);
 
 #define update_record_pack(name, th, ...)                              \
 ({                                                                     \
        struct top_thandle *top_th;                                     \
+       struct top_multiple_thandle *tmt;                               \
        struct thandle_update_records *tur;                             \
        struct llog_update_record     *lur;                             \
        size_t          avail_param_size;                               \
@@ -567,7 +666,8 @@ int tur_update_extend(struct thandle_update_records *tur,
                                                                        \
        while (1) {                                                     \
                top_th = container_of(th, struct top_thandle, tt_super);\
-               tur = top_th->tt_update_records;                        \
+               tmt = top_th->tt_multiple_thandle;                      \
+               tur = tmt->tmt_update_records;                          \
                lur = tur->tur_update_records;                          \
                avail_param_size = tur->tur_update_params_buf_size -    \
                             update_params_size(tur->tur_update_params, \
@@ -593,4 +693,17 @@ int tur_update_extend(struct thandle_update_records *tur,
        }                                                               \
        ret;                                                            \
 })
+
+#define update_record_size(env, name, th, ...)                         \
+({                                                                     \
+       struct top_thandle *top_th;                                     \
+       struct top_multiple_thandle *tmt;                               \
+                                                                       \
+       top_th = container_of(th, struct top_thandle, tt_super);        \
+                                                                       \
+       LASSERT(top_th->tt_multiple_thandle != NULL);                   \
+       tmt = top_th->tt_multiple_thandle;                              \
+       tmt->tmt_record_size +=                                         \
+               update_records_##name##_size(env, __VA_ARGS__);         \
+})
 #endif