Whamcloud - gitweb
LU-6158 mdt: always shrink_capsule in getxattr_all
[fs/lustre-release.git] / lustre / osp / osp_internal.h
index 1d6db3c..aa574a3 100644 (file)
@@ -94,6 +94,42 @@ struct osp_precreate {
        int                              osp_pre_recovering;
 };
 
+struct osp_update_request_sub {
+       struct object_update_request    *ours_req;
+       size_t                          ours_req_size;
+       /* Linked to osp_update_request->our_req_list */
+       struct list_head                ours_list;
+};
+
+/**
+ * Tracking the updates being executed on this dt_device.
+ */
+struct osp_update_request {
+       int                             our_flags;
+       /* update request result */
+       int                             our_rc;
+
+       /* List of osp_update_request_sub */
+       struct list_head                our_req_list;
+
+       struct list_head                our_cb_items;
+
+       /* points to thandle if this update request belongs to one */
+       struct osp_thandle              *our_th;
+       /* linked to the list(ou_list) in osp_updates */
+       struct list_head                our_list;
+       __u32                           our_req_sent:1;
+};
+
+struct osp_updates {
+       struct list_head        ou_list;
+       spinlock_t              ou_lock;
+       wait_queue_head_t       ou_waitq;
+       /* wait for next updates */
+       __u64                   ou_rpc_version;
+       __u64                   ou_version;
+};
+
 struct osp_device {
        struct dt_device                 opd_dt_dev;
        /* corresponded OST index */
@@ -141,6 +177,11 @@ struct osp_device {
        /* thread waits for signals about pool going empty */
        wait_queue_head_t                opd_pre_waitq;
 
+       /* send update thread */
+       struct osp_updates              *opd_update;
+       /* dedicate update thread */
+       struct ptlrpc_thread             opd_update_thread;
+
        /*
         * OST synchronization
         */
@@ -196,7 +237,7 @@ struct osp_device {
         * osp_device::opd_async_requests via declare() functions, these
         * requests can be packed together and sent to the remote server
         * via single OUT RPC later. */
-       struct dt_update_request        *opd_async_requests;
+       struct osp_update_request       *opd_async_requests;
        /* Protect current operations on opd_async_requests. */
        struct mutex                     opd_async_requests_mutex;
        struct list_head                 opd_async_updates;
@@ -296,12 +337,18 @@ struct osp_it {
        struct page              **ooi_pages;
 };
 
+#define OSP_THANDLE_MAGIC      0x20141214
 struct osp_thandle {
        struct thandle           ot_super;
-       struct dt_update_request *ot_dur;
 
        /* OSP will use this thandle to update last oid*/
        struct thandle          *ot_storage_th;
+       __u32                    ot_magic;
+       struct list_head         ot_commit_dcb_list;
+       struct list_head         ot_stop_dcb_list;
+       struct osp_update_request *ot_our;
+       atomic_t                 ot_refcount;
+       __u64                    ot_version;
 };
 
 static inline struct osp_thandle *
@@ -310,13 +357,13 @@ thandle_to_osp_thandle(struct thandle *th)
        return container_of(th, struct osp_thandle, ot_super);
 }
 
-static inline struct dt_update_request *
-thandle_to_dt_update_request(struct thandle *th)
+static inline struct osp_update_request *
+thandle_to_osp_update_request(struct thandle *th)
 {
        struct osp_thandle *oth;
 
        oth = thandle_to_osp_thandle(th);
-       return oth->ot_dur;
+       return oth->ot_our;
 }
 
 /* The transaction only include the updates on the remote node, and
@@ -540,36 +587,49 @@ update_buffer_get_update(struct object_update_request *request,
                         unsigned int index);
 
 int osp_extend_update_buffer(const struct lu_env *env,
-                            struct update_buffer *ubuf);
-
-#define osp_update_rpc_pack(env, name, update, op, ...)                \
-({                                                             \
-       struct object_update    *object_update;                 \
-       size_t                  max_update_length;              \
-       struct object_update_request *ureq;                     \
-       int                     ret;                            \
-                                                               \
+                            struct osp_update_request *our);
+
+struct osp_update_request_sub *
+osp_current_object_update_request(struct osp_update_request *our);
+
+int osp_object_update_request_create(struct osp_update_request *our,
+                                    size_t size);
+
+#define osp_update_rpc_pack(env, name, our, op, ...)                   \
+({                                                                     \
+       struct object_update    *object_update;                         \
+       size_t                  max_update_length;                      \
+       struct osp_update_request_sub *ours;                            \
+       int ret;                                                        \
+                                                                       \
        while (1) {                                                     \
-               ureq = update->dur_buf.ub_req;                          \
-               max_update_length = update->dur_buf.ub_req_size -       \
-                                   object_update_request_size(ureq);   \
+               ours = osp_current_object_update_request(our);          \
+               LASSERT(ours != NULL);                                  \
+               max_update_length = ours->ours_req_size -               \
+                           object_update_request_size(ours->ours_req); \
                                                                        \
-               object_update = update_buffer_get_update(ureq,          \
-                                                        ureq->ourq_count);    \
-               ret = out_##name##_pack(env, object_update, max_update_length, \
+               object_update = update_buffer_get_update(ours->ours_req,\
+                                        ours->ours_req->ourq_count);   \
+               ret = out_##name##_pack(env, object_update,             \
+                                       &max_update_length,             \
                                       __VA_ARGS__);                    \
                if (ret == -E2BIG) {                                    \
                        int rc1;                                        \
-                       /* extend the buffer and retry */               \
-                       rc1 = osp_extend_update_buffer(env, &update->dur_buf); \
+                       /* Create new object update request */          \
+                       rc1 = osp_object_update_request_create(our,     \
+                               max_update_length  +                    \
+                               offsetof(struct object_update_request,  \
+                                        ourq_updates[0]) + 1);         \
                        if (rc1 != 0) {                                 \
                                ret = rc1;                              \
                                break;                                  \
                        }                                               \
+                       continue;                                       \
                } else {                                                \
                        if (ret == 0) {                                 \
-                               object_update->ou_flags |= update->dur_flags; \
-                               ureq->ourq_count++;                     \
+                               ours->ours_req->ourq_count++;           \
+                               object_update->ou_flags |=              \
+                                                    update->our_flags; \
                        }                                               \
                        break;                                          \
                }                                                       \
@@ -577,6 +637,16 @@ int osp_extend_update_buffer(const struct lu_env *env,
        ret;                                                            \
 })
 
+static inline bool osp_send_update_thread_running(struct osp_device *osp)
+{
+       return osp->opd_update_thread.t_flags & SVC_RUNNING;
+}
+
+static inline bool osp_send_update_thread_stopped(struct osp_device *osp)
+{
+       return osp->opd_update_thread.t_flags & SVC_STOPPED;
+}
+
 typedef int (*osp_update_interpreter_t)(const struct lu_env *env,
                                        struct object_update_reply *rep,
                                        struct ptlrpc_request *req,
@@ -595,32 +665,47 @@ int osp_insert_async_request(const struct lu_env *env, enum update_type op,
 
 int osp_unplug_async_request(const struct lu_env *env,
                             struct osp_device *osp,
-                            struct dt_update_request *update);
+                            struct osp_update_request *update);
 int osp_trans_update_request_create(struct thandle *th);
 struct thandle *osp_trans_create(const struct lu_env *env,
                                 struct dt_device *d);
 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
                    struct thandle *th);
 int osp_insert_update_callback(const struct lu_env *env,
-                              struct dt_update_request *update,
+                              struct osp_update_request *update,
                               struct osp_object *obj, void *data,
                               osp_update_interpreter_t interpreter);
-int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
-                       const struct object_update_request *ureq,
-                       struct ptlrpc_request **reqp);
-struct dt_update_request *dt_update_request_create(struct dt_device *dt);
-void dt_update_request_destroy(struct dt_update_request *dt_update);
+
+struct osp_update_request *osp_update_request_create(struct dt_device *dt);
+void osp_update_request_destroy(struct osp_update_request *update);
+
+int osp_send_update_thread(void *arg);
+int osp_check_and_set_rpc_version(struct osp_thandle *oth);
+
+void osp_thandle_destroy(struct osp_thandle *oth);
+static inline void osp_thandle_get(struct osp_thandle *oth)
+{
+       atomic_inc(&oth->ot_refcount);
+}
+
+static inline void osp_thandle_put(struct osp_thandle *oth)
+{
+       if (atomic_dec_and_test(&oth->ot_refcount))
+               osp_thandle_destroy(oth);
+}
 
 int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
-                       const struct object_update_request *ureq,
+                       struct osp_update_request *our,
                        struct ptlrpc_request **reqp);
 int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
-                   struct dt_update_request *update,
+                   struct osp_update_request *update,
                    struct ptlrpc_request **reqp);
 
 struct thandle *osp_get_storage_thandle(const struct lu_env *env,
                                        struct thandle *th,
                                        struct osp_device *osp);
+void osp_trans_callback(const struct lu_env *env,
+                       struct osp_thandle *oth, int rc);
 /* osp_object.c */
 int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
                 struct lu_attr *attr);
@@ -639,6 +724,7 @@ int osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
 
 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
                   struct thandle *th);
+int osp_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb);
 
 struct dt_it *osp_it_init(const struct lu_env *env, struct dt_object *dt,
                          __u32 attr);
@@ -650,6 +736,7 @@ __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di);
 int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di,
                   void *key_rec);
 int osp_it_next_page(const struct lu_env *env, struct dt_it *di);
+int osp_oac_init(struct osp_object *obj);
 /* osp_md_object.c */
 int osp_md_declare_object_create(const struct lu_env *env,
                                 struct dt_object *dt,