X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosp%2Fosp_internal.h;h=aa574a385ed6c9a5526d7a081dfe51bf103c985a;hp=fd2d41d93c4edff60874ae6d775db1386fe55961;hb=b046468f58a1f40e85cb59ed9abf75fd2fd5ea5a;hpb=de8572645d287d17c409b99dabdf176822d91486 diff --git a/lustre/osp/osp_internal.h b/lustre/osp/osp_internal.h index fd2d41d..aa574a38 100644 --- a/lustre/osp/osp_internal.h +++ b/lustre/osp/osp_internal.h @@ -94,6 +94,42 @@ struct osp_precreate { int osp_pre_recovering; }; +struct osp_update_request_sub { + struct object_update_request *ours_req; + size_t ours_req_size; + /* Linked to osp_update_request->our_req_list */ + struct list_head ours_list; +}; + +/** + * Tracking the updates being executed on this dt_device. + */ +struct osp_update_request { + int our_flags; + /* update request result */ + int our_rc; + + /* List of osp_update_request_sub */ + struct list_head our_req_list; + + struct list_head our_cb_items; + + /* points to thandle if this update request belongs to one */ + struct osp_thandle *our_th; + /* linked to the list(ou_list) in osp_updates */ + struct list_head our_list; + __u32 our_req_sent:1; +}; + +struct osp_updates { + struct list_head ou_list; + spinlock_t ou_lock; + wait_queue_head_t ou_waitq; + /* wait for next updates */ + __u64 ou_rpc_version; + __u64 ou_version; +}; + struct osp_device { struct dt_device opd_dt_dev; /* corresponded OST index */ @@ -141,6 +177,11 @@ struct osp_device { /* thread waits for signals about pool going empty */ wait_queue_head_t opd_pre_waitq; + /* send update thread */ + struct osp_updates *opd_update; + /* dedicate update thread */ + struct ptlrpc_thread opd_update_thread; + /* * OST synchronization */ @@ -196,7 +237,7 @@ struct osp_device { * osp_device::opd_async_requests via declare() functions, these * requests can be packed together and sent to the remote server * via single OUT RPC later. */ - struct dt_update_request *opd_async_requests; + struct osp_update_request *opd_async_requests; /* Protect current operations on opd_async_requests. */ struct mutex opd_async_requests_mutex; struct list_head opd_async_updates; @@ -296,12 +337,18 @@ struct osp_it { struct page **ooi_pages; }; +#define OSP_THANDLE_MAGIC 0x20141214 struct osp_thandle { struct thandle ot_super; - struct dt_update_request *ot_dur; /* OSP will use this thandle to update last oid*/ struct thandle *ot_storage_th; + __u32 ot_magic; + struct list_head ot_commit_dcb_list; + struct list_head ot_stop_dcb_list; + struct osp_update_request *ot_our; + atomic_t ot_refcount; + __u64 ot_version; }; static inline struct osp_thandle * @@ -310,13 +357,13 @@ thandle_to_osp_thandle(struct thandle *th) return container_of(th, struct osp_thandle, ot_super); } -static inline struct dt_update_request * -thandle_to_dt_update_request(struct thandle *th) +static inline struct osp_update_request * +thandle_to_osp_update_request(struct thandle *th) { struct osp_thandle *oth; oth = thandle_to_osp_thandle(th); - return oth->ot_dur; + return oth->ot_our; } /* The transaction only include the updates on the remote node, and @@ -535,6 +582,71 @@ static inline int osp_is_fid_client(struct osp_device *osp) return imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_FID; } +struct object_update * +update_buffer_get_update(struct object_update_request *request, + unsigned int index); + +int osp_extend_update_buffer(const struct lu_env *env, + struct osp_update_request *our); + +struct osp_update_request_sub * +osp_current_object_update_request(struct osp_update_request *our); + +int osp_object_update_request_create(struct osp_update_request *our, + size_t size); + +#define osp_update_rpc_pack(env, name, our, op, ...) \ +({ \ + struct object_update *object_update; \ + size_t max_update_length; \ + struct osp_update_request_sub *ours; \ + int ret; \ + \ + while (1) { \ + ours = osp_current_object_update_request(our); \ + LASSERT(ours != NULL); \ + max_update_length = ours->ours_req_size - \ + object_update_request_size(ours->ours_req); \ + \ + object_update = update_buffer_get_update(ours->ours_req,\ + ours->ours_req->ourq_count); \ + ret = out_##name##_pack(env, object_update, \ + &max_update_length, \ + __VA_ARGS__); \ + if (ret == -E2BIG) { \ + int rc1; \ + /* Create new object update request */ \ + rc1 = osp_object_update_request_create(our, \ + max_update_length + \ + offsetof(struct object_update_request, \ + ourq_updates[0]) + 1); \ + if (rc1 != 0) { \ + ret = rc1; \ + break; \ + } \ + continue; \ + } else { \ + if (ret == 0) { \ + ours->ours_req->ourq_count++; \ + object_update->ou_flags |= \ + update->our_flags; \ + } \ + break; \ + } \ + } \ + ret; \ +}) + +static inline bool osp_send_update_thread_running(struct osp_device *osp) +{ + return osp->opd_update_thread.t_flags & SVC_RUNNING; +} + +static inline bool osp_send_update_thread_stopped(struct osp_device *osp) +{ + return osp->opd_update_thread.t_flags & SVC_STOPPED; +} + typedef int (*osp_update_interpreter_t)(const struct lu_env *env, struct object_update_reply *rep, struct ptlrpc_request *req, @@ -553,32 +665,47 @@ int osp_insert_async_request(const struct lu_env *env, enum update_type op, int osp_unplug_async_request(const struct lu_env *env, struct osp_device *osp, - struct dt_update_request *update); + struct osp_update_request *update); int osp_trans_update_request_create(struct thandle *th); struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d); int osp_trans_start(const struct lu_env *env, struct dt_device *dt, struct thandle *th); int osp_insert_update_callback(const struct lu_env *env, - struct dt_update_request *update, + struct osp_update_request *update, struct osp_object *obj, void *data, osp_update_interpreter_t interpreter); -int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, - const struct object_update_request *ureq, - struct ptlrpc_request **reqp); -struct dt_update_request *dt_update_request_create(struct dt_device *dt); -void dt_update_request_destroy(struct dt_update_request *dt_update); + +struct osp_update_request *osp_update_request_create(struct dt_device *dt); +void osp_update_request_destroy(struct osp_update_request *update); + +int osp_send_update_thread(void *arg); +int osp_check_and_set_rpc_version(struct osp_thandle *oth); + +void osp_thandle_destroy(struct osp_thandle *oth); +static inline void osp_thandle_get(struct osp_thandle *oth) +{ + atomic_inc(&oth->ot_refcount); +} + +static inline void osp_thandle_put(struct osp_thandle *oth) +{ + if (atomic_dec_and_test(&oth->ot_refcount)) + osp_thandle_destroy(oth); +} int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, - const struct object_update_request *ureq, + struct osp_update_request *our, struct ptlrpc_request **reqp); int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, - struct dt_update_request *update, + struct osp_update_request *update, struct ptlrpc_request **reqp); struct thandle *osp_get_storage_thandle(const struct lu_env *env, struct thandle *th, struct osp_device *osp); +void osp_trans_callback(const struct lu_env *env, + struct osp_thandle *oth, int rc); /* osp_object.c */ int osp_attr_get(const struct lu_env *env, struct dt_object *dt, struct lu_attr *attr); @@ -597,6 +724,7 @@ int osp_xattr_del(const struct lu_env *env, struct dt_object *dt, int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, struct thandle *th); +int osp_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb); struct dt_it *osp_it_init(const struct lu_env *env, struct dt_object *dt, __u32 attr); @@ -608,6 +736,7 @@ __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di); int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di, void *key_rec); int osp_it_next_page(const struct lu_env *env, struct dt_it *di); +int osp_oac_init(struct osp_object *obj); /* osp_md_object.c */ int osp_md_declare_object_create(const struct lu_env *env, struct dt_object *dt,