From 386818f0c56e438779e17d0ca12b481f17c53682 Mon Sep 17 00:00:00 2001 From: Wang Di Date: Sat, 7 Jun 2014 12:57:47 -0700 Subject: [PATCH] LU-3536 osp: move update packing into out_lib.c Move osp_update_insert to out_lib.c, so OSP/LOD/OUT can all pack update into OUT RPC. Signed-off-by: wang di Change-Id: If087a0bc6a858e9e6c128311ed0d80e4392ffaff Reviewed-on: http://review.whamcloud.com/9321 Tested-by: Jenkins Tested-by: Maloo Reviewed-by: John L. Hammond Reviewed-by: Alex Zhuravlev Reviewed-by: Oleg Drokin --- lustre/include/lu_target.h | 18 -- lustre/include/lustre/lustre_idl.h | 70 ++++++ lustre/include/lustre_update.h | 148 ++++++------ lustre/osp/osp_dev.c | 2 +- lustre/osp/osp_internal.h | 8 +- lustre/osp/osp_md_object.c | 156 ++++-------- lustre/osp/osp_object.c | 58 ++--- lustre/osp/osp_sync.c | 25 +- lustre/osp/osp_trans.c | 34 +-- lustre/ptlrpc/Makefile.in | 2 +- lustre/ptlrpc/pack_generic.c | 1 - lustre/target/out_lib.c | 482 +++++++++++++++++++++++++++++++------ 12 files changed, 670 insertions(+), 334 deletions(-) diff --git a/lustre/include/lu_target.h b/lustre/include/lu_target.h index d01a49e..929a4d4 100644 --- a/lustre/include/lu_target.h +++ b/lustre/include/lu_target.h @@ -334,24 +334,6 @@ int tgt_server_data_update(const struct lu_env *env, struct lu_target *tg, int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg, loff_t off); -/* target/out_lib.c */ -struct dt_update_request * -out_find_update(struct thandle_update *tu, struct dt_device *dt_dev); -void out_destroy_update_req(struct dt_update_request *update); -struct dt_update_request *out_create_update_req(struct dt_device *dt); -struct dt_update_request *out_find_create_update_loc(struct thandle *th, - struct dt_object *dt); -int out_prep_update_req(const struct lu_env *env, struct obd_import *imp, - const struct object_update_request *ureq, - struct ptlrpc_request **reqp); -int out_remote_sync(const struct lu_env *env, struct obd_import *imp, - struct dt_update_request *update, - struct ptlrpc_request **reqp); -int out_insert_update(const struct lu_env *env, - struct dt_update_request *update, - int op, const struct lu_fid *fid, int count, - int *lens, const char **bufs); - enum { ESERIOUS = 0x0001000 }; diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 61216c3..8372ace 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -4008,6 +4008,12 @@ struct object_update_param { char oup_buf[0]; }; +static inline size_t +object_update_param_size(const struct object_update_param *param) +{ + return cfs_size_round(sizeof(*param) + param->oup_len); +} + /* object update */ struct object_update { __u16 ou_type; /* enum update_type */ @@ -4034,6 +4040,43 @@ struct object_update_request { void lustre_swab_object_update(struct object_update *ou); void lustre_swab_object_update_request(struct object_update_request *our); +static inline size_t +object_update_size(const struct object_update *update) +{ + const struct object_update_param *param; + size_t size; + unsigned int i; + + size = offsetof(struct object_update, ou_params[0]); + for (i = 0; i < update->ou_params_count; i++) { + param = (struct object_update_param *)((char *)update + size); + size += object_update_param_size(param); + } + + return size; +} + +static inline struct object_update * +object_update_request_get(const struct object_update_request *our, + unsigned int index, size_t *size) +{ + void *ptr; + unsigned int i; + + if (index >= our->ourq_count) + return NULL; + + ptr = (void *)&our->ourq_updates[0]; + for (i = 0; i < index; i++) + ptr += object_update_size(ptr); + + if (size != NULL) + *size = object_update_size(ptr); + + return ptr; +} + + /* the result of object update */ struct object_update_result { __u32 our_rc; @@ -4056,6 +4099,33 @@ struct object_update_reply { void lustre_swab_object_update_result(struct object_update_result *our); void lustre_swab_object_update_reply(struct object_update_reply *our); +static inline struct object_update_result * +object_update_result_get(const struct object_update_reply *reply, + unsigned int index, size_t *size) +{ + __u16 count = reply->ourp_count; + unsigned int i; + void *ptr; + + if (index >= count) + return NULL; + + ptr = (char *)reply + + cfs_size_round(offsetof(struct object_update_reply, + ourp_lens[count])); + for (i = 0; i < index; i++) { + if (reply->ourp_lens[i] == 0) + return NULL; + + ptr += cfs_size_round(reply->ourp_lens[i]); + } + + if (size != NULL) + *size = reply->ourp_lens[index]; + + return ptr; +} + /** layout swap request structure * fid1 and fid2 are in mdt_body */ diff --git a/lustre/include/lustre_update.h b/lustre/include/lustre_update.h index e09fe3f..d330df1 100644 --- a/lustre/include/lustre_update.h +++ b/lustre/include/lustre_update.h @@ -32,8 +32,25 @@ #define _LUSTRE_UPDATE_H #include -#define OUT_UPDATE_INIT_BUFFER_SIZE 8192 -#define OUT_UPDATE_REPLY_SIZE 8192 +#define OUT_UPDATE_INIT_BUFFER_SIZE 4096 +#define OUT_UPDATE_REPLY_SIZE 8192 + +struct dt_object; +struct dt_object_hint; +struct dt_object_format; +struct dt_allocation_hint; +struct dt_key; +struct dt_rec; +struct thandle; + +struct update_buffer { + struct object_update_request *ub_req; + size_t ub_req_size; +}; + +/** + * Tracking the updates being executed on this dt_device. + */ struct dt_update_request { struct dt_device *dur_dt; /* attached itself to thandle */ @@ -43,34 +60,11 @@ struct dt_update_request { int dur_rc; /* Current batch(transaction) id */ __u64 dur_batchid; - /* Holding the update req */ - struct object_update_request *dur_req; - int dur_req_len; + /* Holding object updates */ + struct update_buffer dur_buf; struct list_head dur_cb_items; }; -static inline unsigned long -object_update_param_size(const struct object_update_param *param) -{ - return cfs_size_round(sizeof(*param) + param->oup_len); -} - -static inline unsigned long -object_update_size(const struct object_update *update) -{ - const struct object_update_param *param; - unsigned long size; - size_t i; - - size = offsetof(struct object_update, ou_params[0]); - for (i = 0; i < update->ou_params_count; i++) { - param = (struct object_update_param *)((char *)update + size); - size += object_update_param_size(param); - } - - return size; -} - static inline void *object_update_param_get(const struct object_update *update, size_t index, size_t *size) @@ -111,27 +105,6 @@ object_update_request_size(const struct object_update_request *our) return size; } -static inline struct object_update -*object_update_request_get(const struct object_update_request *our, - size_t index, size_t *size) -{ - void *ptr; - size_t i; - - if (index >= our->ourq_count) - return NULL; - - ptr = (char *)our + offsetof(struct object_update_request, - ourq_updates[0]); - for (i = 0; i < index; i++) - ptr += object_update_size((struct object_update *)ptr); - - if (size != NULL) - *size = object_update_size((struct object_update *)ptr); - - return ptr; -} - static inline void object_update_reply_init(struct object_update_reply *reply, size_t count) { @@ -139,31 +112,6 @@ object_update_reply_init(struct object_update_reply *reply, size_t count) reply->ourp_count = count; } -static inline struct object_update_result -*object_update_result_get(const struct object_update_reply *reply, - size_t index, size_t *size) -{ - char *ptr; - size_t count = reply->ourp_count; - size_t i; - - if (index >= count) - return NULL; - - ptr = (char *)reply + - cfs_size_round(offsetof(struct object_update_reply, - ourp_lens[count])); - for (i = 0; i < index; i++) { - LASSERT(reply->ourp_lens[i] > 0); - ptr += cfs_size_round(reply->ourp_lens[i]); - } - - if (size != NULL) - *size = reply->ourp_lens[index]; - - return (struct object_update_result *)ptr; -} - static inline void object_update_result_insert(struct object_update_reply *reply, void *data, size_t data_len, size_t index, @@ -218,4 +166,58 @@ static inline void update_inc_batchid(struct dt_update_request *update) update->dur_batchid++; } +/* target/out_lib.c */ +struct thandle_update; +struct dt_update_request *out_find_update(struct thandle_update *tu, + struct dt_device *dt_dev); +void dt_update_request_destroy(struct dt_update_request *update); +struct dt_update_request *dt_update_request_create(struct dt_device *dt); +struct dt_update_request *dt_update_request_find_or_create(struct thandle *th, + struct dt_object *dt); +int out_prep_update_req(const struct lu_env *env, struct obd_import *imp, + const struct object_update_request *ureq, + struct ptlrpc_request **reqp); +int out_remote_sync(const struct lu_env *env, struct obd_import *imp, + struct dt_update_request *update, + struct ptlrpc_request **reqp); +int out_update_pack(const struct lu_env *env, struct update_buffer *ubuf, + enum update_type op, const struct lu_fid *fid, + int params_count, __u16 *param_sizes, const void **bufs, + __u64 batchid); +int out_create_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, __u64 batchid); +int out_object_destroy_pack(const struct lu_env *env, + struct update_buffer *ubuf, + const struct lu_fid *fid, __u64 batchid); +int out_index_delete_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, const struct dt_key *key, + __u64 batchid); +int out_index_insert_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, const struct dt_rec *rec, + const struct dt_key *key, __u64 batchid); +int out_xattr_set_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, const struct lu_buf *buf, + const char *name, int flag, __u64 batchid); +int out_xattr_del_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, const char *name, + __u64 batchid); +int out_attr_set_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, const struct lu_attr *attr, + __u64 batchid); +int out_ref_add_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, __u64 batchid); +int out_ref_del_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, __u64 batchid); +int out_write_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, const struct lu_buf *buf, + loff_t pos, __u64 batchid); +int out_attr_get_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid); +int out_index_lookup_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, struct dt_rec *rec, + const struct dt_key *key); +int out_xattr_get_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, const char *name); #endif diff --git a/lustre/osp/osp_dev.c b/lustre/osp/osp_dev.c index 2e7206c..5208b9f 100644 --- a/lustre/osp/osp_dev.c +++ b/lustre/osp/osp_dev.c @@ -1167,7 +1167,7 @@ static struct lu_device *osp_device_fini(const struct lu_env *env, ENTRY; if (osp->opd_async_requests != NULL) { - out_destroy_update_req(osp->opd_async_requests); + dt_update_request_destroy(osp->opd_async_requests); osp->opd_async_requests = NULL; } diff --git a/lustre/osp/osp_internal.h b/lustre/osp/osp_internal.h index a0cf435..7c0c497 100644 --- a/lustre/osp/osp_internal.h +++ b/lustre/osp/osp_internal.h @@ -505,10 +505,10 @@ void osp_update_last_id(struct osp_device *d, obd_id objid); extern struct llog_operations osp_mds_ost_orig_logops; /* osp_trans.c */ -int osp_insert_async_request(const struct lu_env *env, - int op, struct osp_object *obj, int count, - int *lens, const char **bufs, void *data, - osp_async_request_interpreter_t interpterer); +int osp_insert_async_request(const struct lu_env *env, enum update_type op, + struct osp_object *obj, int count, __u16 *lens, + const void **bufs, void *data, + osp_async_request_interpreter_t interpreter); int osp_unplug_async_request(const struct lu_env *env, struct osp_device *osp, struct dt_update_request *update); diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index 237e8ae..c10a9dc 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -84,15 +84,10 @@ int osp_md_declare_object_create(const struct lu_env *env, struct dt_object_format *dof, struct thandle *th) { - struct osp_thread_info *osi = osp_env_info(env); struct dt_update_request *update; - struct lu_fid *fid1; - int sizes[2] = {sizeof(struct obdo), 0}; - char *bufs[2] = {NULL, NULL}; - int buf_count; int rc; - update = out_find_create_update_loc(th, dt); + update = dt_update_request_find_or_create(th, dt); if (IS_ERR(update)) { CERROR("%s: Get OSP update buf failed: rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name, @@ -100,22 +95,6 @@ int osp_md_declare_object_create(const struct lu_env *env, return PTR_ERR(update); } - osi->osi_obdo.o_valid = 0; - obdo_from_la(&osi->osi_obdo, attr, attr->la_valid); - lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo); - - bufs[0] = (char *)&osi->osi_obdo; - buf_count = 1; - fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu); - if (hint != NULL && hint->dah_parent) { - struct lu_fid *fid2; - - fid2 = (struct lu_fid *)lu_object_fid(&hint->dah_parent->do_lu); - sizes[1] = sizeof(*fid2); - bufs[1] = (char *)fid2; - buf_count++; - } - if (lu_object_exists(&dt->do_lu)) { /* If the object already exists, we needs to destroy * this orphan object first. @@ -131,23 +110,27 @@ int osp_md_declare_object_create(const struct lu_env *env, * but find the object already exists */ CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n", - dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1)); + dt->do_lu.lo_dev->ld_obd->obd_name, + PFID(lu_object_fid(&dt->do_lu))); - rc = out_insert_update(env, update, OUT_REF_DEL, fid1, 0, - NULL, NULL); + rc = out_ref_del_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), + update->dur_batchid); if (rc != 0) GOTO(out, rc); if (S_ISDIR(lu_object_attr(&dt->do_lu))) { /* decrease for ".." */ - rc = out_insert_update(env, update, OUT_REF_DEL, fid1, - 0, NULL, NULL); + rc = out_ref_del_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), + update->dur_batchid); if (rc != 0) GOTO(out, rc); } - rc = out_insert_update(env, update, OUT_DESTROY, fid1, 0, NULL, - NULL); + rc = out_object_destroy_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), + update->dur_batchid); if (rc != 0) GOTO(out, rc); @@ -157,8 +140,11 @@ int osp_md_declare_object_create(const struct lu_env *env, update_inc_batchid(update); } - rc = out_insert_update(env, update, OUT_CREATE, fid1, buf_count, sizes, - (const char **)bufs); + rc = out_create_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), attr, hint, dof, + update->dur_batchid); + if (rc != 0) + GOTO(out, rc); out: if (rc) CERROR("%s: Insert update error: rc = %d\n", @@ -218,10 +204,9 @@ static int osp_md_declare_object_ref_del(const struct lu_env *env, struct thandle *th) { struct dt_update_request *update; - struct lu_fid *fid; int rc; - update = out_find_create_update_loc(th, dt); + update = dt_update_request_find_or_create(th, dt); if (IS_ERR(update)) { CERROR("%s: Get OSP update buf failed: rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name, @@ -229,10 +214,9 @@ static int osp_md_declare_object_ref_del(const struct lu_env *env, return PTR_ERR(update); } - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - - rc = out_insert_update(env, update, OUT_REF_DEL, fid, 0, NULL, NULL); - + rc = out_ref_del_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), + update->dur_batchid); return rc; } @@ -276,10 +260,9 @@ static int osp_md_declare_ref_add(const struct lu_env *env, struct dt_object *dt, struct thandle *th) { struct dt_update_request *update; - struct lu_fid *fid; int rc; - update = out_find_create_update_loc(th, dt); + update = dt_update_request_find_or_create(th, dt); if (IS_ERR(update)) { CERROR("%s: Get OSP update buf failed: rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name, @@ -287,9 +270,9 @@ static int osp_md_declare_ref_add(const struct lu_env *env, return PTR_ERR(update); } - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - - rc = out_insert_update(env, update, OUT_REF_ADD, fid, 0, NULL, NULL); + rc = out_ref_add_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), + update->dur_batchid); return rc; } @@ -359,14 +342,10 @@ static void osp_md_ah_init(const struct lu_env *env, int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_attr *attr, struct thandle *th) { - struct osp_thread_info *osi = osp_env_info(env); struct dt_update_request *update; - struct lu_fid *fid; - int size = sizeof(struct obdo); - char *buf; int rc; - update = out_find_create_update_loc(th, dt); + update = dt_update_request_find_or_create(th, dt); if (IS_ERR(update)) { CERROR("%s: Get OSP update buf failed: %d\n", dt->do_lu.lo_dev->ld_obd->obd_name, @@ -374,16 +353,9 @@ int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt, return PTR_ERR(update); } - osi->osi_obdo.o_valid = 0; - obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr, - attr->la_valid); - lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo); - - buf = (char *)&osi->osi_obdo; - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - - rc = out_insert_update(env, update, OUT_ATTR_SET, fid, 1, &size, - (const char **)&buf); + rc = out_attr_set_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), attr, + update->dur_batchid); return rc; } @@ -531,7 +503,6 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, struct dt_update_request *update; struct object_update_reply *reply; struct ptlrpc_request *req = NULL; - int size = strlen((char *)key) + 1; struct lu_fid *fid; int rc; ENTRY; @@ -540,14 +511,13 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt, * just create an update buffer, instead of attaching the * update_remote list of the thandle. */ - update = out_create_update_req(dt_dev); + update = dt_update_request_create(dt_dev); if (IS_ERR(update)) RETURN(PTR_ERR(update)); - rc = out_insert_update(env, update, OUT_INDEX_LOOKUP, - lu_object_fid(&dt->do_lu), - 1, &size, (const char **)&key); - if (rc) { + rc = out_index_lookup_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), rec, key); + if (rc != 0) { CERROR("%s: Insert update error: rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name, rc); GOTO(out, rc); @@ -597,7 +567,7 @@ out: if (req != NULL) ptlrpc_req_finished(req); - out_destroy_update_req(update); + dt_update_request_destroy(update); return rc; } @@ -623,22 +593,10 @@ static int osp_md_declare_insert(const struct lu_env *env, const struct dt_key *key, struct thandle *th) { - struct osp_thread_info *info = osp_env_info(env); - struct dt_update_request *update; - struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec; - struct lu_fid *fid = - (struct lu_fid *)lu_object_fid(&dt->do_lu); - struct lu_fid *rec_fid = &info->osi_fid; - __u32 type = cpu_to_le32(rec1->rec_type); - int size[3] = { strlen((char *)key) + 1, - sizeof(*rec_fid), - sizeof(type) }; - const char *bufs[3] = { (char *)key, - (char *)rec_fid, - (char *)&type }; - int rc; - - update = out_find_create_update_loc(th, dt); + struct dt_update_request *update; + int rc; + + update = dt_update_request_find_or_create(th, dt); if (IS_ERR(update)) { CERROR("%s: Get OSP update buf failed: rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name, @@ -646,13 +604,9 @@ static int osp_md_declare_insert(const struct lu_env *env, return PTR_ERR(update); } - CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID", %u\n", - dt->do_lu.lo_dev->ld_obd->obd_name, - PFID(fid), (char *)key, PFID(rec1->rec_fid), rec1->rec_type); - - fid_cpu_to_le(rec_fid, rec1->rec_fid); - rc = out_insert_update(env, update, OUT_INDEX_INSERT, fid, - ARRAY_SIZE(size), size, bufs); + rc = out_index_insert_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), rec, key, + update->dur_batchid); return rc; } @@ -704,11 +658,9 @@ static int osp_md_declare_delete(const struct lu_env *env, struct thandle *th) { struct dt_update_request *update; - struct lu_fid *fid; - int size = strlen((char *)key) + 1; - int rc; + int rc; - update = out_find_create_update_loc(th, dt); + update = dt_update_request_find_or_create(th, dt); if (IS_ERR(update)) { CERROR("%s: Get OSP update buf failed: rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name, @@ -716,11 +668,9 @@ static int osp_md_declare_delete(const struct lu_env *env, return PTR_ERR(update); } - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - - rc = out_insert_update(env, update, OUT_INDEX_DELETE, fid, 1, &size, - (const char **)&key); - + rc = out_index_delete_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), key, + update->dur_batchid); return rc; } @@ -1071,13 +1021,9 @@ static ssize_t osp_md_declare_write(const struct lu_env *env, loff_t pos, struct thandle *th) { struct dt_update_request *update; - struct lu_fid *fid; - int sizes[2] = {buf->lb_len, sizeof(pos)}; - const char *bufs[2] = {(char *)buf->lb_buf, - (char *)&pos}; ssize_t rc; - update = out_find_create_update_loc(th, dt); + update = dt_update_request_find_or_create(th, dt); if (IS_ERR(update)) { CERROR("%s: Get OSP update buf failed: rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name, @@ -1085,11 +1031,8 @@ static ssize_t osp_md_declare_write(const struct lu_env *env, return PTR_ERR(update); } - pos = cpu_to_le64(pos); - bufs[1] = (char *)&pos; - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - rc = out_insert_update(env, update, OUT_WRITE, fid, - ARRAY_SIZE(sizes), sizes, bufs); + rc = out_write_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu), + buf, pos, update->dur_batchid); return rc; @@ -1117,6 +1060,7 @@ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt, struct thandle *handle, struct lustre_capa *capa, int ignore_quota) { + *pos += buf->lb_len; return buf->lb_len; } diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c index b2b550a..ce034ed 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -330,12 +330,12 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt, spin_unlock(&obj->opo_lock); } - update = out_create_update_req(dev); + update = dt_update_request_create(dev); if (IS_ERR(update)) RETURN(PTR_ERR(update)); - rc = out_insert_update(env, update, OUT_ATTR_GET, - lu_object_fid(&dt->do_lu), 0, NULL, NULL); + rc = out_attr_get_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu)); if (rc != 0) { CERROR("%s: Insert update error "DFID": rc = %d\n", dev->dd_lu_dev.ld_obd->obd_name, @@ -376,7 +376,7 @@ out: if (req != NULL) ptlrpc_req_finished(req); - out_destroy_update_req(update); + dt_update_request_destroy(update); return rc; } @@ -597,7 +597,7 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt, struct osp_object *obj = dt2osp_obj(dt); struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); struct osp_xattr_entry *oxe; - int namelen = strlen(name); + __u16 namelen = strlen(name); int rc = 0; LASSERT(buf != NULL); @@ -619,7 +619,7 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt, mutex_lock(&osp->opd_async_requests_mutex); rc = osp_insert_async_request(env, OUT_XATTR_GET, obj, 1, - &namelen, &name, oxe, + &namelen, (const void **)&name, oxe, osp_xattr_get_interpterer); if (rc != 0) { mutex_unlock(&osp->opd_async_requests_mutex); @@ -633,8 +633,8 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt, * * We will improve it in the future. */ update = osp->opd_async_requests; - if (update != NULL && update->dur_req != NULL && - update->dur_req->ourq_count > 0) { + if (update != NULL && update->dur_buf.ub_req != NULL && + update->dur_buf.ub_req->ourq_count > 0) { osp->opd_async_requests = NULL; mutex_unlock(&osp->opd_async_requests_mutex); rc = osp_unplug_async_request(env, osp, update); @@ -659,7 +659,6 @@ int osp_xattr_get(const struct lu_env *env, struct dt_object *dt, struct object_update_reply *reply; struct osp_xattr_entry *oxe = NULL; const char *dname = dt->do_lu.lo_dev->ld_obd->obd_name; - int namelen; int rc = 0; ENTRY; @@ -700,17 +699,15 @@ unlock: spin_unlock(&obj->opo_lock); } - update = out_create_update_req(dev); + update = dt_update_request_create(dev); if (IS_ERR(update)) GOTO(out, rc = PTR_ERR(update)); - namelen = strlen(name) + 1; - rc = out_insert_update(env, update, OUT_XATTR_GET, - lu_object_fid(&dt->do_lu), 1, &namelen, &name); + rc = out_xattr_get_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), name); if (rc != 0) { CERROR("%s: Insert update error "DFID": rc = %d\n", dname, PFID(lu_object_fid(&dt->do_lu)), rc); - GOTO(out, rc); } @@ -815,7 +812,7 @@ out: ptlrpc_req_finished(req); if (update != NULL && !IS_ERR(update)) - out_destroy_update_req(update); + dt_update_request_destroy(update); if (oxe != NULL) osp_oac_xattr_put(oxe); @@ -827,21 +824,15 @@ static int __osp_xattr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int flag, struct thandle *th) { + struct osp_object *o = dt2osp_obj(dt); struct dt_update_request *update; - struct lu_fid *fid; - int sizes[3] = { strlen(name), - buf->lb_len, - sizeof(int) }; - char *bufs[3] = { (char *)name, - (char *)buf->lb_buf }; - struct osp_xattr_entry *oxe; - struct osp_object *o = dt2osp_obj(dt); - int rc; + struct osp_xattr_entry *oxe; + int rc; ENTRY; LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL); - update = out_find_create_update_loc(th, dt); + update = dt_update_request_find_or_create(th, dt); if (IS_ERR(update)) { CERROR("%s: Get OSP update buf failed "DFID": rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name, @@ -851,12 +842,9 @@ static int __osp_xattr_set(const struct lu_env *env, struct dt_object *dt, RETURN(PTR_ERR(update)); } - flag = cpu_to_le32(flag); - bufs[2] = (char *)&flag; - - fid = (struct lu_fid *)lu_object_fid(&dt->do_lu); - rc = out_insert_update(env, update, OUT_XATTR_SET, fid, - ARRAY_SIZE(sizes), sizes, (const char **)bufs); + rc = out_xattr_set_pack(env, &update->dur_buf, + lu_object_fid(&dt->do_lu), + buf, name, flag, update->dur_batchid); if (rc != 0 || o->opo_ooa == NULL) RETURN(rc); @@ -940,17 +928,17 @@ static int __osp_xattr_del(const struct lu_env *env, struct dt_object *dt, const struct lu_fid *fid; struct osp_object *o = dt2osp_obj(dt); struct osp_xattr_entry *oxe; - int size = strlen(name); int rc; - update = out_find_create_update_loc(th, dt); + update = dt_update_request_find_or_create(th, dt); if (IS_ERR(update)) return PTR_ERR(update); fid = lu_object_fid(&dt->do_lu); - rc = out_insert_update(env, update, OUT_XATTR_DEL, fid, 1, &size, - (const char **)&name); + rc = out_xattr_del_pack(env, &update->dur_buf, fid, name, + update->dur_batchid); + if (rc != 0 || o->opo_ooa == NULL) return rc; diff --git a/lustre/osp/osp_sync.c b/lustre/osp/osp_sync.c index 8bbd938..2d03418 100644 --- a/lustre/osp/osp_sync.c +++ b/lustre/osp/osp_sync.c @@ -44,6 +44,7 @@ #define DEBUG_SUBSYSTEM S_MDS #include +#include #include "osp_internal.h" static int osp_sync_id_traction_init(struct osp_device *d); @@ -553,25 +554,25 @@ static int osp_prep_unlink_update_req(const struct lu_env *env, struct llog_unlink64_rec *rec = (struct llog_unlink64_rec *)h; struct dt_update_request *update = NULL; struct ptlrpc_request *req; - const char *buf; struct llog_cookie lcookie; - int size; + const void *buf; + __u16 size; int rc; ENTRY; - update = out_create_update_req(&osp->opd_dt_dev); + update = dt_update_request_create(&osp->opd_dt_dev); if (IS_ERR(update)) RETURN(PTR_ERR(update)); /* This can only happens for unlink slave directory, so decrease * ref for ".." and "." */ - rc = out_insert_update(env, update, OUT_REF_DEL, &rec->lur_fid, 0, - NULL, NULL); + rc = out_update_pack(env, &update->dur_buf, OUT_REF_DEL, &rec->lur_fid, + 0, NULL, NULL, 0); if (rc != 0) GOTO(out, rc); - rc = out_insert_update(env, update, OUT_REF_DEL, &rec->lur_fid, 0, - NULL, NULL); + rc = out_update_pack(env, &update->dur_buf, OUT_REF_DEL, &rec->lur_fid, + 0, NULL, NULL, 0); if (rc != 0) GOTO(out, rc); @@ -579,15 +580,15 @@ static int osp_prep_unlink_update_req(const struct lu_env *env, lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT; lcookie.lgc_index = h->lrh_index; size = sizeof(lcookie); - buf = (const char *)&lcookie; + buf = &lcookie; - rc = out_insert_update(env, update, OUT_DESTROY, &rec->lur_fid, 1, - &size, &buf); + rc = out_update_pack(env, &update->dur_buf, OUT_DESTROY, &rec->lur_fid, + 1, &size, &buf, 0); if (rc != 0) GOTO(out, rc); rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import, - update->dur_req, &req); + update->dur_buf.ub_req, &req); if (rc != 0) GOTO(out, rc); @@ -602,7 +603,7 @@ static int osp_prep_unlink_update_req(const struct lu_env *env, *reqp = req; out: if (update != NULL) - out_destroy_update_req(update); + dt_update_request_destroy(update); RETURN(rc); } diff --git a/lustre/osp/osp_trans.c b/lustre/osp/osp_trans.c index 39a42bc..7d4ace8 100644 --- a/lustre/osp/osp_trans.c +++ b/lustre/osp/osp_trans.c @@ -195,7 +195,7 @@ static int osp_async_update_interpret(const struct lu_env *env, index++; } - out_destroy_update_req(dt_update); + dt_update_request_destroy(dt_update); return 0; } @@ -220,7 +220,7 @@ int osp_unplug_async_request(const struct lu_env *env, int rc; rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import, - update->dur_req, &req); + update->dur_buf.ub_req, &req); if (rc != 0) { struct osp_async_request *oar; struct osp_async_request *next; @@ -232,7 +232,7 @@ int osp_unplug_async_request(const struct lu_env *env, oar->oar_data, 0, rc); osp_async_request_fini(env, oar); } - out_destroy_update_req(update); + dt_update_request_destroy(update); } else { LASSERT(list_empty(&update->dur_list)); @@ -265,7 +265,7 @@ osp_find_or_create_async_update_request(struct osp_device *osp) if (update != NULL) return update; - update = out_create_update_req(&osp->opd_dt_dev); + update = dt_update_request_create(&osp->opd_dt_dev); if (!IS_ERR(update)) osp->opd_async_requests = update; @@ -296,9 +296,9 @@ osp_find_or_create_async_update_request(struct osp_device *osp) * \retval 0 for success * \retval negative error number on failure */ -int osp_insert_async_request(const struct lu_env *env, - int op, struct osp_object *obj, int count, - int *lens, const char **bufs, void *data, +int osp_insert_async_request(const struct lu_env *env, enum update_type op, + struct osp_object *obj, int count, + __u16 *lens, const void **bufs, void *data, osp_async_request_interpreter_t interpreter) { struct osp_async_request *oar; @@ -316,9 +316,10 @@ int osp_insert_async_request(const struct lu_env *env, GOTO(out, rc = PTR_ERR(update)); again: - rc = out_insert_update(env, update, op, lu_object_fid(osp2lu_obj(obj)), - count, lens, bufs); /* The queue is full. */ + rc = out_update_pack(env, &update->dur_buf, op, + lu_object_fid(osp2lu_obj(obj)), count, lens, bufs, + 0); if (rc == -E2BIG) { osp->opd_async_requests = NULL; mutex_unlock(&osp->opd_async_requests_mutex); @@ -442,7 +443,7 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp, list_del_init(&dt_update->dur_list); rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import, - dt_update->dur_req, &req); + dt_update->dur_buf.ub_req, &req); if (rc == 0) { args = ptlrpc_req_async_args(req); args->oaua_update = dt_update; @@ -451,7 +452,7 @@ static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp, osp_async_update_interpret; ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1); } else { - out_destroy_update_req(dt_update); + dt_update_request_destroy(dt_update); } } else { th->th_sync = 1; @@ -549,8 +550,9 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, goto put; } - if (dt_update->dur_req->ourq_count == 0) { - out_destroy_update_req(dt_update); + if (dt_update->dur_buf.ub_req == NULL || + dt_update->dur_buf.ub_req->ourq_count == 0) { + dt_update_request_destroy(dt_update); goto put; } @@ -568,7 +570,7 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, } if (rc != 0) { - out_destroy_update_req(dt_update); + dt_update_request_destroy(dt_update); goto put; } @@ -578,14 +580,14 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt, obd_put_request_slot(cli); } else { rc = th->th_result; - out_destroy_update_req(dt_update); + dt_update_request_destroy(dt_update); } } else { if (tu->tu_sent_after_local_trans) rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update, th, false); rc = dt_update->dur_rc; - out_destroy_update_req(dt_update); + dt_update_request_destroy(dt_update); } put: diff --git a/lustre/ptlrpc/Makefile.in b/lustre/ptlrpc/Makefile.in index 66f81a7..d591555 100644 --- a/lustre/ptlrpc/Makefile.in +++ b/lustre/ptlrpc/Makefile.in @@ -19,7 +19,7 @@ ptlrpc_objs += nrs_tbf.o errno.o target_objs := $(TARGET)tgt_main.o $(TARGET)tgt_lastrcvd.o target_objs += $(TARGET)tgt_handler.o $(TARGET)out_handler.o -target_objs += $(TARGET)out_lib.o $(TARGET)out_lib.o +target_objs += $(TARGET)out_lib.o ptlrpc-objs := $(ldlm_objs) $(ptlrpc_objs) @SERVER_TRUE@ptlrpc-objs += $(target_objs) diff --git a/lustre/ptlrpc/pack_generic.c b/lustre/ptlrpc/pack_generic.c index 543dd65..48984bf 100644 --- a/lustre/ptlrpc/pack_generic.c +++ b/lustre/ptlrpc/pack_generic.c @@ -51,7 +51,6 @@ #include #include #include -#include static inline int lustre_msg_hdr_size_v2(int count) { diff --git a/lustre/target/out_lib.c b/lustre/target/out_lib.c index 061dac5..81c645b 100644 --- a/lustre/target/out_lib.c +++ b/lustre/target/out_lib.c @@ -34,6 +34,10 @@ #include #include #include +#include + +#define OUT_UPDATE_BUFFER_SIZE_ADD 4096 +#define OUT_UPDATE_BUFFER_SIZE_MAX (256 * 4096) /* 1MB update size now */ struct dt_update_request* out_find_update(struct thandle_update *tu, struct dt_device *dt_dev) @@ -49,53 +53,95 @@ out_find_update(struct thandle_update *tu, struct dt_device *dt_dev) } EXPORT_SYMBOL(out_find_update); -void out_destroy_update_req(struct dt_update_request *dt_update) +static struct object_update_request *object_update_request_alloc(size_t size) +{ + struct object_update_request *ourq; + + OBD_ALLOC_LARGE(ourq, size); + if (ourq == NULL) + RETURN(ERR_PTR(-ENOMEM)); + + ourq->ourq_magic = UPDATE_REQUEST_MAGIC; + ourq->ourq_count = 0; + + RETURN(ourq); +} + +static void object_update_request_free(struct object_update_request *ourq, + size_t ourq_size) +{ + if (ourq != NULL) + OBD_FREE_LARGE(ourq, ourq_size); +} + +void dt_update_request_destroy(struct dt_update_request *dt_update) { if (dt_update == NULL) return; list_del(&dt_update->dur_list); - if (dt_update->dur_req != NULL) - OBD_FREE_LARGE(dt_update->dur_req, dt_update->dur_req_len); + object_update_request_free(dt_update->dur_buf.ub_req, + dt_update->dur_buf.ub_req_size); OBD_FREE_PTR(dt_update); - return; } -EXPORT_SYMBOL(out_destroy_update_req); +EXPORT_SYMBOL(dt_update_request_destroy); -struct dt_update_request *out_create_update_req(struct dt_device *dt) +/** + * Allocate and initialize dt_update_request + * + * dt_update_request is being used to track updates being executed on + * this dt_device(OSD or OSP). The update buffer will be 8k initially, + * and increased if needed. + * + * \param [in] dt dt device + * + * \retval dt_update_request being allocated if succeed + * \retval ERR_PTR(errno) if failed + */ +struct dt_update_request *dt_update_request_create(struct dt_device *dt) { struct dt_update_request *dt_update; + struct object_update_request *ourq; OBD_ALLOC_PTR(dt_update); if (!dt_update) return ERR_PTR(-ENOMEM); - OBD_ALLOC_LARGE(dt_update->dur_req, OUT_UPDATE_INIT_BUFFER_SIZE); - if (dt_update->dur_req == NULL) { + ourq = object_update_request_alloc(OUT_UPDATE_INIT_BUFFER_SIZE); + if (IS_ERR(ourq)) { OBD_FREE_PTR(dt_update); - return ERR_PTR(-ENOMEM); + return ERR_CAST(ourq); } - dt_update->dur_req_len = OUT_UPDATE_INIT_BUFFER_SIZE; + dt_update->dur_buf.ub_req = ourq; + dt_update->dur_buf.ub_req_size = OUT_UPDATE_INIT_BUFFER_SIZE; + INIT_LIST_HEAD(&dt_update->dur_list); dt_update->dur_dt = dt; dt_update->dur_batchid = 0; - dt_update->dur_req->ourq_magic = UPDATE_REQUEST_MAGIC; - dt_update->dur_req->ourq_count = 0; INIT_LIST_HEAD(&dt_update->dur_cb_items); return dt_update; } -EXPORT_SYMBOL(out_create_update_req); +EXPORT_SYMBOL(dt_update_request_create); /** + * Find or create dt_update_request. + * * Find or create one loc in th_dev/dev_obj_update for the update, * Because only one thread can access this thandle, no need * lock now. + * + * \param[in] th transaction handle + * \param[in] dt lookup update request by dt_object + * + * \retval pointer of dt_update_request if it can be created + * or found. + * \retval ERR_PTR(errno) if it can not be created or found. */ -struct dt_update_request *out_find_create_update_loc(struct thandle *th, - struct dt_object *dt) +struct dt_update_request * +dt_update_request_find_or_create(struct thandle *th, struct dt_object *dt) { struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev); struct thandle_update *tu = th->th_update; @@ -116,7 +162,7 @@ struct dt_update_request *out_find_create_update_loc(struct thandle *th, if (update != NULL) RETURN(update); - update = out_create_update_req(dt_dev); + update = dt_update_request_create(dt_dev); if (IS_ERR(update)) RETURN(update); @@ -127,8 +173,22 @@ struct dt_update_request *out_find_create_update_loc(struct thandle *th, RETURN(update); } -EXPORT_SYMBOL(out_find_create_update_loc); +EXPORT_SYMBOL(dt_update_request_find_or_create); +/** + * Prepare update request. + * + * Prepare OUT update ptlrpc request, and the request usually includes + * all of updates (stored in \param ureq) from one operation. + * + * \param[in] env execution environment + * \param[in] imp import on which ptlrpc request will be sent + * \param[in] ureq hold all of updates which will be packed into the req + * \param[in] reqp request to be created + * + * \retval 0 if preparation succeeds. + * \retval negative errno if preparation fails. + */ int out_prep_update_req(const struct lu_env *env, struct obd_import *imp, const struct object_update_request *ureq, struct ptlrpc_request **reqp) @@ -168,6 +228,19 @@ int out_prep_update_req(const struct lu_env *env, struct obd_import *imp, } EXPORT_SYMBOL(out_prep_update_req); +/** + * Send update RPC. + * + * Send update request to the remote MDT synchronously. + * + * \param[in] env execution environment + * \param[in] imp import on which ptlrpc request will be sent + * \param[in] dt_update hold all of updates which will be packed into the req + * \param[in] reqp request to be created + * + * \retval 0 if RPC succeeds. + * \retval negative errno if RPC fails. + */ int out_remote_sync(const struct lu_env *env, struct obd_import *imp, struct dt_update_request *dt_update, struct ptlrpc_request **reqp) @@ -176,7 +249,7 @@ int out_remote_sync(const struct lu_env *env, struct obd_import *imp, int rc; ENTRY; - rc = out_prep_update_req(env, imp, dt_update->dur_req, &req); + rc = out_prep_update_req(env, imp, dt_update->dur_buf.ub_req, &req); if (rc != 0) RETURN(rc); @@ -202,98 +275,373 @@ int out_remote_sync(const struct lu_env *env, struct obd_import *imp, } EXPORT_SYMBOL(out_remote_sync); -static int out_resize_update_req(struct dt_update_request *dt_update, - int new_size) +/** + * resize update buffer + * + * Extend the update buffer by new_size. + * + * \param[in] ubuf update buffer to be extended + * \param[in] new_size new size of the update buffer + * + * \retval 0 if extending succeeds. + * \retval negative errno if extending fails. + */ +static int update_buffer_resize(struct update_buffer *ubuf, size_t new_size) { struct object_update_request *ureq; - LASSERT(new_size > dt_update->dur_req_len); - - CDEBUG(D_INFO, "%s: resize update_size from %d to %d\n", - dt_update->dur_dt->dd_lu_dev.ld_obd->obd_name, - dt_update->dur_req_len, new_size); + if (new_size > ubuf->ub_req_size) + return 0; OBD_ALLOC_LARGE(ureq, new_size); if (ureq == NULL) return -ENOMEM; - memcpy(ureq, dt_update->dur_req, - object_update_request_size(dt_update->dur_req)); + memcpy(ureq, ubuf->ub_req, ubuf->ub_req_size); - OBD_FREE_LARGE(dt_update->dur_req, dt_update->dur_req_len); + OBD_FREE_LARGE(ubuf->ub_req, ubuf->ub_req_size); - dt_update->dur_req = ureq; - dt_update->dur_req_len = new_size; + ubuf->ub_req = ureq; + ubuf->ub_req_size = new_size; return 0; } -#define OUT_UPDATE_BUFFER_SIZE_ADD 4096 -#define OUT_UPDATE_BUFFER_SIZE_MAX (64 * 4096) /* 64KB update size now */ /** - * Insert the update into the th_bufs for the device. - */ - -int out_insert_update(const struct lu_env *env, - struct dt_update_request *update, int op, - const struct lu_fid *fid, int params_count, int *lens, - const char **bufs) + * Pack the header of object_update_request + * + * Packs updates into the update_buffer header, which will either be sent to + * the remote MDT or stored in the local update log. The maximum update buffer + * size is 1MB for now. + * + * \param[in] env execution environment + * \param[in] ubuf update bufer which it will pack the update in + * \param[in] op update operation + * \param[in] fid object FID for this update + * \param[in] param_count parameters count for this update + * \param[in] lens each parameters length of this update + * \param[in] batchid batchid(transaction no) of this update + * + * \retval 0 pack update succeed. + * negative errno pack update failed. + **/ +static struct object_update* +out_update_header_pack(const struct lu_env *env, struct update_buffer *ubuf, + enum update_type op, const struct lu_fid *fid, + int params_count, __u16 *param_sizes, __u64 batchid) { - struct object_update_request *ureq = update->dur_req; - int ureq_len; + struct object_update_request *ureq = ubuf->ub_req; + size_t ureq_size = ubuf->ub_req_size; struct object_update *obj_update; struct object_update_param *param; - int update_length; + size_t update_size; int rc = 0; - char *ptr; - int i; + unsigned int i; ENTRY; /* Check update size to make sure it can fit into the buffer */ - ureq_len = object_update_request_size(ureq); - update_length = offsetof(struct object_update, ou_params[0]); + ureq_size = object_update_request_size(ureq); + update_size = offsetof(struct object_update, ou_params[0]); for (i = 0; i < params_count; i++) - update_length += cfs_size_round(lens[i] + sizeof(*param)); + update_size += cfs_size_round(param_sizes[i] + sizeof(*param)); - if (unlikely(cfs_size_round(ureq_len + update_length) > - update->dur_req_len)) { - int new_size = update->dur_req_len; + if (unlikely(cfs_size_round(ureq_size + update_size) > + ubuf->ub_req_size)) { + size_t new_size = ubuf->ub_req_size; /* enlarge object update request size */ while (new_size < - cfs_size_round(ureq_len + update_length)) + cfs_size_round(ureq_size + update_size)) new_size += OUT_UPDATE_BUFFER_SIZE_ADD; if (new_size >= OUT_UPDATE_BUFFER_SIZE_MAX) - RETURN(-E2BIG); + RETURN(ERR_PTR(-E2BIG)); - rc = out_resize_update_req(update, new_size); - if (rc != 0) - RETURN(rc); + rc = update_buffer_resize(ubuf, new_size); + if (rc < 0) + RETURN(ERR_PTR(rc)); - ureq = update->dur_req; + ureq = ubuf->ub_req; } /* fill the update into the update buffer */ - obj_update = (struct object_update *)((char *)ureq + ureq_len); + obj_update = (struct object_update *)((char *)ureq + ureq_size); obj_update->ou_fid = *fid; obj_update->ou_type = op; obj_update->ou_params_count = (__u16)params_count; - obj_update->ou_batchid = update->dur_batchid; + obj_update->ou_batchid = batchid; param = &obj_update->ou_params[0]; for (i = 0; i < params_count; i++) { - param->oup_len = lens[i]; - ptr = ¶m->oup_buf[0]; - memcpy(¶m->oup_buf[0], bufs[i], lens[i]); + param->oup_len = param_sizes[i]; param = (struct object_update_param *)((char *)param + object_update_param_size(param)); } - ureq->ourq_count++; - CDEBUG(D_INFO, "%s: %p "DFID" idx %d: op %d params %d:%d\n", - update->dur_dt->dd_lu_dev.ld_obd->obd_name, ureq, PFID(fid), - ureq->ourq_count, op, params_count, ureq_len + update_length); + CDEBUG(D_INFO, "%p "DFID" idx %u: op %d params %d:%d\n", + ureq, PFID(fid), ureq->ourq_count, op, params_count, + (int)update_size); - RETURN(rc); + RETURN(obj_update); +} + +/** + * Packs one update into the update_buffer. + * + * \param[in] env execution environment + * \param[in] ubuf bufer where update will be packed + * \param[in] op update operation (enum update_type) + * \param[in] fid object FID for this update + * \param[in] param_count number of parameters for this update + * \param[in] param_sizes array of parameters length of this update + * \param[in] param_bufs parameter buffers + * \param[in] batchid transaction no of this update, plus mdt_index, which + * will be globally unique + * + * \retval = 0 if updates packing succeeds + * \retval negative errno if updates packing fails + **/ +int out_update_pack(const struct lu_env *env, struct update_buffer *ubuf, + enum update_type op, const struct lu_fid *fid, + int params_count, __u16 *param_sizes, + const void **param_bufs, __u64 batchid) +{ + struct object_update *update; + struct object_update_param *param; + unsigned int i; + ENTRY; + + update = out_update_header_pack(env, ubuf, op, fid, params_count, + param_sizes, batchid); + if (IS_ERR(update)) + RETURN(PTR_ERR(update)); + + param = &update->ou_params[0]; + for (i = 0; i < params_count; i++) { + memcpy(¶m->oup_buf[0], param_bufs[i], param_sizes[i]); + param = (struct object_update_param *)((char *)param + + object_update_param_size(param)); + } + + RETURN(0); +} +EXPORT_SYMBOL(out_update_pack); + +/** + * Pack various updates into the update_buffer. + * + * The following functions pack different updates into the update_buffer + * So parameters of these API is basically same as its correspondent OSD/OSP + * API, for detail description of these parameters see osd_handler.c or + * osp_md_object.c. + * + * \param[in] env execution environment + * \param[in] ubuf update buffer + * \param[in] fid fid of this object for the update + * \param[in] batchid batch id of this update + * + * \retval 0 if insertion succeeds. + * \retval negative errno if insertion fails. + */ +int out_create_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, __u64 batchid) +{ + struct obdo *obdo; + __u16 sizes[2] = {sizeof(*obdo), 0}; + int buf_count = 1; + const struct lu_fid *fid1 = NULL; + struct object_update *update; + ENTRY; + + if (hint != NULL && hint->dah_parent) { + fid1 = lu_object_fid(&hint->dah_parent->do_lu); + sizes[1] = sizeof(*fid1); + buf_count++; + } + + update = out_update_header_pack(env, ubuf, OUT_CREATE, fid, + buf_count, sizes, batchid); + if (IS_ERR(update)) + RETURN(PTR_ERR(update)); + + obdo = object_update_param_get(update, 0, NULL); + obdo->o_valid = 0; + obdo_from_la(obdo, attr, attr->la_valid); + lustre_set_wire_obdo(NULL, obdo, obdo); + if (fid1 != NULL) { + struct lu_fid *fid; + fid = object_update_param_get(update, 1, NULL); + fid_cpu_to_le(fid, fid1); + } + + RETURN(0); +} +EXPORT_SYMBOL(out_create_pack); + +int out_ref_del_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, __u64 batchid) +{ + return out_update_pack(env, ubuf, OUT_REF_DEL, fid, 0, NULL, NULL, + batchid); +} +EXPORT_SYMBOL(out_ref_del_pack); + +int out_ref_add_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, __u64 batchid) +{ + return out_update_pack(env, ubuf, OUT_REF_ADD, fid, 0, NULL, NULL, + batchid); +} +EXPORT_SYMBOL(out_ref_add_pack); + +int out_attr_set_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, const struct lu_attr *attr, + __u64 batchid) +{ + struct object_update *update; + struct obdo *obdo; + __u16 size = sizeof(*obdo); + ENTRY; + + update = out_update_header_pack(env, ubuf, OUT_ATTR_SET, fid, 1, + &size, batchid); + if (IS_ERR(update)) + RETURN(PTR_ERR(update)); + + obdo = object_update_param_get(update, 0, NULL); + obdo->o_valid = 0; + obdo_from_la(obdo, attr, attr->la_valid); + lustre_set_wire_obdo(NULL, obdo, obdo); + + RETURN(0); +} +EXPORT_SYMBOL(out_attr_set_pack); + +int out_xattr_set_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, const struct lu_buf *buf, + const char *name, int flag, __u64 batchid) +{ + __u16 sizes[3] = {strlen(name) + 1, buf->lb_len, sizeof(flag)}; + const void *bufs[3] = {(char *)name, (char *)buf->lb_buf, + (char *)&flag}; + + return out_update_pack(env, ubuf, OUT_XATTR_SET, fid, + ARRAY_SIZE(sizes), sizes, bufs, batchid); +} +EXPORT_SYMBOL(out_xattr_set_pack); + +int out_xattr_del_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, const char *name, + __u64 batchid) +{ + __u16 size = strlen(name) + 1; + + return out_update_pack(env, ubuf, OUT_XATTR_DEL, fid, 1, &size, + (const void **)&name, batchid); +} +EXPORT_SYMBOL(out_xattr_del_pack); + + +int out_index_insert_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, const struct dt_rec *rec, + const struct dt_key *key, __u64 batchid) +{ + struct dt_insert_rec *rec1 = (struct dt_insert_rec *)rec; + struct lu_fid rec_fid; + __u32 type = cpu_to_le32(rec1->rec_type); + __u16 sizes[3] = { strlen((char *)key) + 1, + sizeof(rec_fid), + sizeof(type) }; + const void *bufs[3] = { (char *)key, + (char *)&rec_fid, + (char *)&type }; + + fid_cpu_to_le(&rec_fid, rec1->rec_fid); + + return out_update_pack(env, ubuf, OUT_INDEX_INSERT, fid, + ARRAY_SIZE(sizes), sizes, bufs, batchid); +} +EXPORT_SYMBOL(out_index_insert_pack); + +int out_index_delete_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, const struct dt_key *key, + __u64 batchid) +{ + __u16 size = strlen((char *)key) + 1; + const void *buf = key; + + return out_update_pack(env, ubuf, OUT_INDEX_DELETE, fid, 1, &size, + &buf, batchid); +} +EXPORT_SYMBOL(out_index_delete_pack); + +int out_object_destroy_pack(const struct lu_env *env, + struct update_buffer *ubuf, + const struct lu_fid *fid, __u64 batchid) +{ + return out_update_pack(env, ubuf, OUT_DESTROY, fid, 0, NULL, NULL, + batchid); +} +EXPORT_SYMBOL(out_object_destroy_pack); + +int out_write_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, const struct lu_buf *buf, + loff_t pos, __u64 batchid) +{ + __u16 sizes[2] = {buf->lb_len, sizeof(pos)}; + const void *bufs[2] = {(char *)buf->lb_buf, (char *)&pos}; + int rc; + + pos = cpu_to_le64(pos); + + rc = out_update_pack(env, ubuf, OUT_WRITE, fid, ARRAY_SIZE(sizes), + sizes, bufs, batchid); + return rc; +} +EXPORT_SYMBOL(out_write_pack); + +/** + * Pack various readonly updates into the update_buffer. + * + * The following update funcs are only used by read-only ops, lookup, + * getattr etc, so it does not need transaction here. Currently they + * are only used by OSP. + * + * \param[in] env execution environment + * \param[in] fid fid of this object for the update + * \param[in] ubuf update buffer + * + * \retval 0 if packing succeeds. + * \retval negative errno if packing fails. + */ +int out_index_lookup_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, struct dt_rec *rec, + const struct dt_key *key) +{ + const void *name = key; + __u16 size = strlen((char *)name) + 1; + + return out_update_pack(env, ubuf, OUT_INDEX_LOOKUP, fid, 1, &size, + &name, 0); +} +EXPORT_SYMBOL(out_index_lookup_pack); + +int out_attr_get_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid) +{ + return out_update_pack(env, ubuf, OUT_ATTR_GET, fid, 0, NULL, NULL, 0); +} +EXPORT_SYMBOL(out_attr_get_pack); + +int out_xattr_get_pack(const struct lu_env *env, struct update_buffer *ubuf, + const struct lu_fid *fid, const char *name) +{ + __u16 size; + + LASSERT(name != NULL); + size = strlen(name) + 1; + return out_update_pack(env, ubuf, OUT_XATTR_GET, fid, 1, &size, + (const void **)&name, 0); } -EXPORT_SYMBOL(out_insert_update); +EXPORT_SYMBOL(out_xattr_get_pack); -- 1.8.3.1