X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosp%2Fosp_internal.h;h=aa574a385ed6c9a5526d7a081dfe51bf103c985a;hp=6f12c202ba4f88fda680a77209182f0e1610fa81;hb=b046468f58a1f40e85cb59ed9abf75fd2fd5ea5a;hpb=a0da0ad1438f694dcff9ee9826d388927ea19d4e diff --git a/lustre/osp/osp_internal.h b/lustre/osp/osp_internal.h index 6f12c20..aa574a38 100644 --- a/lustre/osp/osp_internal.h +++ b/lustre/osp/osp_internal.h @@ -27,7 +27,7 @@ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, 2013, Intel Corporation. + * Copyright (c) 2012, 2014, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -48,6 +48,7 @@ #include #include #include +#include /* * Infrastructure to support tracking of last committed llog record @@ -93,6 +94,42 @@ struct osp_precreate { int osp_pre_recovering; }; +struct osp_update_request_sub { + struct object_update_request *ours_req; + size_t ours_req_size; + /* Linked to osp_update_request->our_req_list */ + struct list_head ours_list; +}; + +/** + * Tracking the updates being executed on this dt_device. + */ +struct osp_update_request { + int our_flags; + /* update request result */ + int our_rc; + + /* List of osp_update_request_sub */ + struct list_head our_req_list; + + struct list_head our_cb_items; + + /* points to thandle if this update request belongs to one */ + struct osp_thandle *our_th; + /* linked to the list(ou_list) in osp_updates */ + struct list_head our_list; + __u32 our_req_sent:1; +}; + +struct osp_updates { + struct list_head ou_list; + spinlock_t ou_lock; + wait_queue_head_t ou_waitq; + /* wait for next updates */ + __u64 ou_rpc_version; + __u64 ou_version; +}; + struct osp_device { struct dt_device opd_dt_dev; /* corresponded OST index */ @@ -119,7 +156,7 @@ struct osp_device { struct obd_uuid opd_cluuid; struct obd_connect_data *opd_connect_data; int opd_connects; - cfs_proc_dir_entry_t *opd_proc_entry; + struct proc_dir_entry *opd_proc_entry; struct lprocfs_stats *opd_stats; /* connection status. */ unsigned int opd_new_connection:1, @@ -140,6 +177,11 @@ struct osp_device { /* thread waits for signals about pool going empty */ wait_queue_head_t opd_pre_waitq; + /* send update thread */ + struct osp_updates *opd_update; + /* dedicate update thread */ + struct ptlrpc_thread opd_update_thread; + /* * OST synchronization */ @@ -188,16 +230,19 @@ struct osp_device { /* how often to update statfs data */ int opd_statfs_maxage; - cfs_proc_dir_entry_t *opd_symlink; + struct proc_dir_entry *opd_symlink; /* If the caller wants to do some idempotent async operations on * remote server, it can append the async remote requests on the * osp_device::opd_async_requests via declare() functions, these * requests can be packed together and sent to the remote server * via single OUT RPC later. */ - struct dt_update_request *opd_async_requests; + struct osp_update_request *opd_async_requests; /* Protect current operations on opd_async_requests. */ struct mutex opd_async_requests_mutex; + struct list_head opd_async_updates; + struct rw_semaphore opd_async_updates_rwsem; + atomic_t opd_async_updates_count; }; #define opd_pre_lock opd_pre->osp_pre_lock @@ -220,9 +265,9 @@ struct osp_xattr_entry { struct list_head oxe_list; atomic_t oxe_ref; void *oxe_value; - int oxe_buflen; - int oxe_namelen; - int oxe_vallen; + size_t oxe_buflen; + size_t oxe_namelen; + size_t oxe_vallen; unsigned int oxe_exist:1, oxe_ready:1; char oxe_buf[0]; @@ -238,7 +283,6 @@ struct osp_object { struct lu_object_header opo_header; struct dt_object opo_obj; unsigned int opo_reserved:1, - opo_new:1, opo_non_exist:1; /* read/write lock for md osp object */ @@ -261,7 +305,7 @@ struct osp_thread_info { struct lu_attr osi_attr; struct ost_id osi_oi; struct ost_id osi_oi2; - obd_id osi_id; + u64 osi_id; loff_t osi_off; union { struct llog_rec_hdr osi_hdr; @@ -280,6 +324,7 @@ struct osp_thread_info { struct osp_it { __u32 ooi_pos_page; __u32 ooi_pos_lu_page; + __u32 ooi_attr; int ooi_pos_ent; int ooi_total_npages; int ooi_valid_npages; @@ -292,27 +337,58 @@ struct osp_it { struct page **ooi_pages; }; +#define OSP_THANDLE_MAGIC 0x20141214 +struct osp_thandle { + struct thandle ot_super; + + /* OSP will use this thandle to update last oid*/ + struct thandle *ot_storage_th; + __u32 ot_magic; + struct list_head ot_commit_dcb_list; + struct list_head ot_stop_dcb_list; + struct osp_update_request *ot_our; + atomic_t ot_refcount; + __u64 ot_version; +}; + +static inline struct osp_thandle * +thandle_to_osp_thandle(struct thandle *th) +{ + return container_of(th, struct osp_thandle, ot_super); +} + +static inline struct osp_update_request * +thandle_to_osp_update_request(struct thandle *th) +{ + struct osp_thandle *oth; + + oth = thandle_to_osp_thandle(th); + return oth->ot_our; +} + /* The transaction only include the updates on the remote node, and * no local updates at all */ static inline bool is_only_remote_trans(struct thandle *th) { - return th->th_dev != NULL && th->th_dev->dd_ops == &osp_dt_ops; + return th->th_top == NULL; } static inline void osp_objid_buf_prep(struct lu_buf *buf, loff_t *off, __u32 *id, int index) { + /* Note: through id is only 32 bits, it will also write 64 bits + * for oid to keep compatibility with the previous version. */ buf->lb_buf = (void *)id; - buf->lb_len = sizeof(obd_id); - *off = sizeof(obd_id) * index; + buf->lb_len = sizeof(u64); + *off = sizeof(u64) * index; } static inline void osp_objseq_buf_prep(struct lu_buf *buf, loff_t *off, __u64 *seq, int index) { buf->lb_buf = (void *)seq; - buf->lb_len = sizeof(obd_id); - *off = sizeof(obd_id) * index; + buf->lb_len = sizeof(u64); + *off = sizeof(u64) * index; } static inline void osp_buf_prep(struct lu_buf *lb, void *buf, int buf_len) @@ -408,8 +484,20 @@ static inline struct seq_server_site *osp_seq_site(struct osp_device *osp) } #define osp_init_rpc_lock(lck) mdc_init_rpc_lock(lck) -#define osp_get_rpc_lock(lck, it) mdc_get_rpc_lock(lck, it) -#define osp_put_rpc_lock(lck, it) mdc_put_rpc_lock(lck, it) + +static inline void osp_get_rpc_lock(struct osp_device *osp) +{ + struct mdc_rpc_lock *rpc_lock = osp->opd_obd->u.cli.cl_rpc_lock; + + mdc_get_rpc_lock(rpc_lock, NULL); +} + +static inline void osp_put_rpc_lock(struct osp_device *osp) +{ + struct mdc_rpc_lock *rpc_lock = osp->opd_obd->u.cli.cl_rpc_lock; + + mdc_put_rpc_lock(rpc_lock, NULL); +} static inline int osp_fid_diff(const struct lu_fid *fid1, const struct lu_fid *fid2) @@ -494,57 +582,152 @@ static inline int osp_is_fid_client(struct osp_device *osp) return imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_FID; } -typedef int (*osp_async_request_interpreter_t)(const struct lu_env *env, - struct object_update_reply *rep, - struct ptlrpc_request *req, - struct osp_object *obj, - void *data, int index, int rc); +struct object_update * +update_buffer_get_update(struct object_update_request *request, + unsigned int index); + +int osp_extend_update_buffer(const struct lu_env *env, + struct osp_update_request *our); + +struct osp_update_request_sub * +osp_current_object_update_request(struct osp_update_request *our); + +int osp_object_update_request_create(struct osp_update_request *our, + size_t size); + +#define osp_update_rpc_pack(env, name, our, op, ...) \ +({ \ + struct object_update *object_update; \ + size_t max_update_length; \ + struct osp_update_request_sub *ours; \ + int ret; \ + \ + while (1) { \ + ours = osp_current_object_update_request(our); \ + LASSERT(ours != NULL); \ + max_update_length = ours->ours_req_size - \ + object_update_request_size(ours->ours_req); \ + \ + object_update = update_buffer_get_update(ours->ours_req,\ + ours->ours_req->ourq_count); \ + ret = out_##name##_pack(env, object_update, \ + &max_update_length, \ + __VA_ARGS__); \ + if (ret == -E2BIG) { \ + int rc1; \ + /* Create new object update request */ \ + rc1 = osp_object_update_request_create(our, \ + max_update_length + \ + offsetof(struct object_update_request, \ + ourq_updates[0]) + 1); \ + if (rc1 != 0) { \ + ret = rc1; \ + break; \ + } \ + continue; \ + } else { \ + if (ret == 0) { \ + ours->ours_req->ourq_count++; \ + object_update->ou_flags |= \ + update->our_flags; \ + } \ + break; \ + } \ + } \ + ret; \ +}) + +static inline bool osp_send_update_thread_running(struct osp_device *osp) +{ + return osp->opd_update_thread.t_flags & SVC_RUNNING; +} + +static inline bool osp_send_update_thread_stopped(struct osp_device *osp) +{ + return osp->opd_update_thread.t_flags & SVC_STOPPED; +} + +typedef int (*osp_update_interpreter_t)(const struct lu_env *env, + struct object_update_reply *rep, + struct ptlrpc_request *req, + struct osp_object *obj, + void *data, int index, int rc); /* osp_dev.c */ -void osp_update_last_id(struct osp_device *d, obd_id objid); +void osp_update_last_id(struct osp_device *d, u64 objid); extern struct llog_operations osp_mds_ost_orig_logops; /* osp_trans.c */ -int osp_insert_async_request(const struct lu_env *env, - int op, struct osp_object *obj, int count, - int *lens, const char **bufs, void *data, - osp_async_request_interpreter_t interpterer); +int osp_insert_async_request(const struct lu_env *env, enum update_type op, + struct osp_object *obj, int count, __u16 *lens, + const void **bufs, void *data, + osp_update_interpreter_t interpreter); + int osp_unplug_async_request(const struct lu_env *env, struct osp_device *osp, - struct dt_update_request *update); + struct osp_update_request *update); +int osp_trans_update_request_create(struct thandle *th); struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d); int osp_trans_start(const struct lu_env *env, struct dt_device *dt, struct thandle *th); +int osp_insert_update_callback(const struct lu_env *env, + struct osp_update_request *update, + struct osp_object *obj, void *data, + osp_update_interpreter_t interpreter); + +struct osp_update_request *osp_update_request_create(struct dt_device *dt); +void osp_update_request_destroy(struct osp_update_request *update); +int osp_send_update_thread(void *arg); +int osp_check_and_set_rpc_version(struct osp_thandle *oth); + +void osp_thandle_destroy(struct osp_thandle *oth); +static inline void osp_thandle_get(struct osp_thandle *oth) +{ + atomic_inc(&oth->ot_refcount); +} + +static inline void osp_thandle_put(struct osp_thandle *oth) +{ + if (atomic_dec_and_test(&oth->ot_refcount)) + osp_thandle_destroy(oth); +} + +int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp, + struct osp_update_request *our, + struct ptlrpc_request **reqp); +int osp_remote_sync(const struct lu_env *env, struct osp_device *osp, + struct osp_update_request *update, + struct ptlrpc_request **reqp); + +struct thandle *osp_get_storage_thandle(const struct lu_env *env, + struct thandle *th, + struct osp_device *osp); +void osp_trans_callback(const struct lu_env *env, + struct osp_thandle *oth, int rc); /* osp_object.c */ int osp_attr_get(const struct lu_env *env, struct dt_object *dt, - struct lu_attr *attr, struct lustre_capa *capa); + struct lu_attr *attr); int osp_xattr_get(const struct lu_env *env, struct dt_object *dt, - struct lu_buf *buf, const char *name, - struct lustre_capa *capa); + struct lu_buf *buf, const char *name); int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int flag, struct thandle *th); int osp_xattr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, - struct thandle *th, struct lustre_capa *capa); + struct thandle *th); int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th); int osp_xattr_del(const struct lu_env *env, struct dt_object *dt, - const char *name, struct thandle *th, - struct lustre_capa *capa); - -int osp_declare_object_destroy(const struct lu_env *env, - struct dt_object *dt, struct thandle *th); -int osp_object_destroy(const struct lu_env *env, struct dt_object *dt, - struct thandle *th); + const char *name, struct thandle *th); int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, struct thandle *th); +int osp_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb); struct dt_it *osp_it_init(const struct lu_env *env, struct dt_object *dt, - __u32 attr, struct lustre_capa *capa); + __u32 attr); void osp_it_fini(const struct lu_env *env, struct dt_it *di); int osp_it_get(const struct lu_env *env, struct dt_it *di, const struct dt_key *key); @@ -553,6 +736,7 @@ __u64 osp_it_store(const struct lu_env *env, const struct dt_it *di); int osp_it_key_rec(const struct lu_env *env, const struct dt_it *di, void *key_rec); int osp_it_next_page(const struct lu_env *env, struct dt_it *di); +int osp_oac_init(struct osp_object *obj); /* osp_md_object.c */ int osp_md_declare_object_create(const struct lu_env *env, struct dt_object *dt, @@ -566,8 +750,7 @@ int osp_md_object_create(const struct lu_env *env, struct dt_object *dt, int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, struct thandle *th); int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt, - const struct lu_attr *attr, struct thandle *th, - struct lustre_capa *capa); + const struct lu_attr *attr, struct thandle *th); extern const struct dt_index_operations osp_md_index_ops; /* osp_precreate.c */