From: wang di Date: Tue, 8 Sep 2015 15:05:49 +0000 (-0700) Subject: LU-6741 osp: bulk transfer for osp_md_read X-Git-Tag: 2.7.62~47 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=28638e84cde481d0216ffc1d170e272bc2cccc76 LU-6741 osp: bulk transfer for osp_md_read Do buffer bulk read for osp_md_read(), so it would be more efficient to retrieve update logs from remote target. Signed-off-by: wang di Change-Id: Idcb7ef402d02ad46a33cb4d913763235a6215b5b Reviewed-on: http://review.whamcloud.com/15899 Tested-by: Jenkins Reviewed-by: James Simmons Reviewed-by: Lai Siyao Tested-by: Maloo Reviewed-by: Oleg Drokin --- diff --git a/lustre/include/lu_object.h b/lustre/include/lu_object.h index 488308e..c0c1e68 100644 --- a/lustre/include/lu_object.h +++ b/lustre/include/lu_object.h @@ -905,6 +905,16 @@ struct lu_rdpg { struct page **rp_pages; }; +/* read buffer params, should be filled out by out */ +struct lu_rdbuf { + /** count in bytes */ + unsigned int rb_bytes; + /** number of pages */ + unsigned int rb_nbufs; + /** pointers to pages */ + struct lu_buf **rb_bufs; +}; + enum lu_xattr_flags { LU_XATTR_REPLACE = (1 << 0), LU_XATTR_CREATE = (1 << 1) diff --git a/lustre/include/lu_target.h b/lustre/include/lu_target.h index 9b62156..358f41e 100644 --- a/lustre/include/lu_target.h +++ b/lustre/include/lu_target.h @@ -372,6 +372,7 @@ int tgt_sec_ctx_init(struct tgt_session_info *tsi); int tgt_sec_ctx_init_cont(struct tgt_session_info *tsi); int tgt_sec_ctx_fini(struct tgt_session_info *tsi); int tgt_sendpage(struct tgt_session_info *tsi, struct lu_rdpg *rdpg, int nob); +int tgt_send_buffer(struct tgt_session_info *tsi, struct lu_rdbuf *rdbuf); int tgt_validate_obdo(struct tgt_session_info *tsi, struct obdo *oa); int tgt_sync(const struct lu_env *env, struct lu_target *tgt, struct dt_object *obj, __u64 start, __u64 end); diff --git a/lustre/include/lustre_update.h b/lustre/include/lustre_update.h index ab81ee6..027ab12 100644 --- a/lustre/include/lustre_update.h +++ b/lustre/include/lustre_update.h @@ -34,8 +34,8 @@ #include #define OUT_UPDATE_INIT_BUFFER_SIZE 4096 -/* 16KB, the current biggest size is llog header(8KB) */ -#define OUT_UPDATE_REPLY_SIZE 16384 +#define OUT_UPDATE_REPLY_SIZE 4096 +#define OUT_BULK_BUFFER_SIZE 4096 struct dt_key; struct dt_rec; diff --git a/lustre/osp/osp_md_object.c b/lustre/osp/osp_md_object.c index 4aada8f..8021c03 100644 --- a/lustre/osp/osp_md_object.c +++ b/lustre/osp/osp_md_object.c @@ -1115,98 +1115,100 @@ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt, static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt, struct lu_buf *rbuf, loff_t *pos) { - struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); - struct dt_device *dt_dev = &osp->opd_dt_dev; - struct lu_buf *lbuf = &osp_env_info(env)->osi_lb2; - struct osp_update_request *update = NULL; + struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); + struct dt_device *dt_dev = &osp->opd_dt_dev; + struct lu_buf *lbuf = &osp_env_info(env)->osi_lb2; + char *ptr = rbuf->lb_buf; + struct osp_update_request *update = NULL; + struct ptlrpc_request *req = NULL; + struct out_read_reply *orr; + struct ptlrpc_bulk_desc *desc; struct object_update_reply *reply; - struct out_read_reply *orr; - char *ptr = rbuf->lb_buf; - struct ptlrpc_request *req = NULL; - size_t total_length = rbuf->lb_len; - size_t max_buf_size; - loff_t offset = *pos; - int rc; + __u32 left_size; + int nbufs; + int i; + int rc; ENTRY; - /* Calculate the maxium buffer length for each read request */ - max_buf_size = OUT_UPDATE_REPLY_SIZE - cfs_size_round(sizeof(*orr)) - - cfs_size_round(sizeof(struct object_update_result)) - - cfs_size_round(offsetof(struct object_update_reply, - ourp_lens[1])); - while (total_length > 0) { - size_t read_length; - - /* Because it needs send the update buffer right away, - * just create an update buffer, instead of attaching the - * update_remote list of the thandle. */ - update = osp_update_request_create(dt_dev); - if (IS_ERR(update)) - GOTO(out, rc = PTR_ERR(update)); - - read_length = total_length > max_buf_size ? - max_buf_size : total_length; - - rc = osp_update_rpc_pack(env, read, update, OUT_READ, - lu_object_fid(&dt->do_lu), - read_length, offset); - if (rc != 0) { - CERROR("%s: cannot insert update: rc = %d\n", - dt_dev->dd_lu_dev.ld_obd->obd_name, rc); - GOTO(out, rc); - } - - rc = osp_remote_sync(env, osp, update, &req); - if (rc < 0) - GOTO(out, rc); + /* Because it needs send the update buffer right away, + * just create an update buffer, instead of attaching the + * update_remote list of the thandle. */ + update = osp_update_request_create(dt_dev); + if (IS_ERR(update)) + GOTO(out, rc = PTR_ERR(update)); - reply = req_capsule_server_sized_get(&req->rq_pill, - &RMF_OUT_UPDATE_REPLY, - OUT_UPDATE_REPLY_SIZE); + rc = osp_update_rpc_pack(env, read, update, OUT_READ, + lu_object_fid(&dt->do_lu), + rbuf->lb_len, *pos); + if (rc != 0) { + CERROR("%s: cannot insert update: rc = %d\n", + dt_dev->dd_lu_dev.ld_obd->obd_name, rc); + GOTO(out, rc); + } - if (reply->ourp_magic != UPDATE_REPLY_MAGIC) { - CERROR("%s: invalid update reply magic %x expected %x:" - " rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name, - reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO); - GOTO(out, rc = -EPROTO); - } + rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import, update, + &req); + if (rc != 0) + GOTO(out, rc); - rc = object_update_result_data_get(reply, lbuf, 0); - if (rc < 0) - GOTO(out, rc); + nbufs = (rbuf->lb_len + OUT_BULK_BUFFER_SIZE - 1) / + OUT_BULK_BUFFER_SIZE; + /* allocate bulk descriptor */ + desc = ptlrpc_prep_bulk_imp(req, nbufs, 1, + PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KVEC, + MDS_BULK_PORTAL, &ptlrpc_bulk_kvec_ops); + if (desc == NULL) + GOTO(out, rc = -ENOMEM); + + /* split the buffer into small chunk size */ + left_size = rbuf->lb_len; + for (i = 0; i < nbufs; i++) { + int read_size; + + read_size = left_size > OUT_BULK_BUFFER_SIZE ? + OUT_BULK_BUFFER_SIZE : left_size; + desc->bd_frag_ops->add_iov_frag(desc, ptr, read_size); + + ptr += read_size; + } - if (lbuf->lb_len < sizeof(*orr)) - GOTO(out, rc = -EPROTO); + /* This will only be called with read-only update, and these updates + * might be used to retrieve update log during recovery process, so + * it will be allowed to send during recovery process */ + req->rq_allow_replay = 1; + req->rq_bulk_read = 1; + /* send request to master and wait for RPC to complete */ + rc = ptlrpc_queue_wait(req); + if (rc != 0) + GOTO(out, rc); - orr = lbuf->lb_buf; - orr_le_to_cpu(orr, orr); - offset = orr->orr_offset; - if (orr->orr_size > max_buf_size) - GOTO(out, rc = -EPROTO); + rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, + req->rq_bulk->bd_nob_transferred); + if (rc < 0) + GOTO(out, rc); - memcpy(ptr, orr->orr_data, orr->orr_size); - ptr += orr->orr_size; - total_length -= orr->orr_size; + reply = req_capsule_server_sized_get(&req->rq_pill, + &RMF_OUT_UPDATE_REPLY, + OUT_UPDATE_REPLY_SIZE); - CDEBUG(D_INFO, "%s: read "DFID" pos "LPU64" len %u left %zu\n", - osp->opd_obd->obd_name, PFID(lu_object_fid(&dt->do_lu)), - offset, orr->orr_size, total_length); + if (reply->ourp_magic != UPDATE_REPLY_MAGIC) { + CERROR("%s: invalid update reply magic %x expected %x:" + " rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name, + reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO); + GOTO(out, rc = -EPROTO); + } - if (orr->orr_size < read_length) - break; + rc = object_update_result_data_get(reply, lbuf, 0); + if (rc < 0) + GOTO(out, rc); - ptlrpc_req_finished(req); - osp_update_request_destroy(update); - req = NULL; - update = NULL; - } + if (lbuf->lb_len < sizeof(*orr)) + GOTO(out, rc = -EPROTO); - total_length = rbuf->lb_len - total_length; - *pos = offset; - CDEBUG(D_INFO, "%s: total read "DFID" pos "LPU64" len %zu\n", - osp->opd_obd->obd_name, PFID(lu_object_fid(&dt->do_lu)), - *pos, total_length); - GOTO(out, rc = (int)total_length); + orr = lbuf->lb_buf; + orr_le_to_cpu(orr, orr); + rc = orr->orr_size; + *pos = orr->orr_offset; out: if (req != NULL) ptlrpc_req_finished(req); @@ -1214,7 +1216,7 @@ out: if (update != NULL) osp_update_request_destroy(update); - return rc; + RETURN(rc); } /* These body operation will be used to write symlinks during migration etc */ diff --git a/lustre/target/out_handler.c b/lustre/target/out_handler.c index 002cda2..f4b0368 100644 --- a/lustre/target/out_handler.c +++ b/lustre/target/out_handler.c @@ -598,14 +598,17 @@ static int out_read(struct tgt_session_info *tsi) struct object_update *update = tti->tti_u.update.tti_update; struct dt_object *obj = tti->tti_u.update.tti_dt_object; struct object_update_reply *reply = tti->tti_u.update.tti_update_reply; - int index = tti->tti_u.update.tti_update_reply_index; + int index = tti->tti_u.update.tti_update_reply_index; + struct lu_rdbuf *rdbuf = &tti->tti_u.rdbuf.tti_rdbuf; struct object_update_result *update_result; - struct lu_buf *lbuf = &tti->tti_buf; struct out_read_reply *orr; - void *tmp; - size_t size; - __u64 pos; - int rc; + void *tmp; + size_t size; + size_t total_size = 0; + __u64 pos; + unsigned int i; + unsigned int nbufs; + int rc = 0; ENTRY; update_result = object_update_result_get(reply, index, NULL); @@ -631,32 +634,62 @@ static int out_read(struct tgt_session_info *tsi) } pos = le64_to_cpu(*(__u64 *)(tmp)); - /* Check if the read buffer can hold the read_size */ - if (size > OUT_UPDATE_REPLY_SIZE - - cfs_size_round(offsetof(struct object_update_reply, - ourp_lens[1])) - - cfs_size_round(sizeof(*update_result)) - - cfs_size_round(sizeof(*orr))) { - CERROR("%s: get %zu the biggest read size is %d: rc = %d\n", - tgt_name(tsi->tsi_tgt), size, OUT_UPDATE_REPLY_SIZE, - -EPROTO); - GOTO(out, rc = err_serious(-EPROTO)); - } - /* Put the offset into the begining of the buffer in reply */ orr = (struct out_read_reply *)update_result->our_data; - lbuf->lb_buf = orr->orr_data; - lbuf->lb_len = size; + nbufs = (size + OUT_BULK_BUFFER_SIZE - 1) / OUT_BULK_BUFFER_SIZE; + OBD_ALLOC(rdbuf->rb_bufs, nbufs * sizeof(rdbuf->rb_bufs[0])); + if (rdbuf->rb_bufs == NULL) + GOTO(out, rc = -ENOMEM); - dt_read_lock(env, obj, MOR_TGT_CHILD); - rc = dt_read(env, obj, lbuf, &pos); - dt_read_unlock(env, obj); - orr->orr_size = rc < 0 ? 0 : rc; + rdbuf->rb_nbufs = 0; + total_size = 0; + for (i = 0; i < nbufs; i++) { + __u32 read_size; + + OBD_ALLOC_PTR(rdbuf->rb_bufs[i]); + if (rdbuf->rb_bufs[i] == NULL) + GOTO(out_free, rc = -ENOMEM); + + read_size = size > OUT_BULK_BUFFER_SIZE ? + OUT_BULK_BUFFER_SIZE : size; + OBD_ALLOC(rdbuf->rb_bufs[i]->lb_buf, read_size); + if (rdbuf->rb_bufs[i] == NULL) + GOTO(out_free, rc = -ENOMEM); + + rdbuf->rb_bufs[i]->lb_len = read_size; + dt_read_lock(env, obj, MOR_TGT_CHILD); + rc = dt_read(env, obj, rdbuf->rb_bufs[i], &pos); + dt_read_unlock(env, obj); + + total_size += rc < 0 ? 0 : rc; + if (rc <= 0) + break; + + rdbuf->rb_nbufs++; + size -= read_size; + } + + rdbuf->rb_bytes = total_size; + /* send pages to client */ + rc = tgt_send_buffer(tsi, rdbuf); + if (rc < 0) + GOTO(out_free, rc); + + orr->orr_size = total_size; orr->orr_offset = pos; orr_cpu_to_le(orr, orr); update_result->our_datalen += orr->orr_size; +out_free: + for (i = 0; i < nbufs; i++) { + if (rdbuf->rb_bufs[i] != NULL) { + OBD_FREE(rdbuf->rb_bufs[i]->lb_buf, + rdbuf->rb_bufs[i]->lb_len); + OBD_FREE_PTR(rdbuf->rb_bufs[i]); + } + } + OBD_FREE(rdbuf->rb_bufs, nbufs * sizeof(rdbuf->rb_bufs[0])); out: /* Insert read buffer */ update_result->our_rc = ptlrpc_status_hton(rc); diff --git a/lustre/target/tgt_handler.c b/lustre/target/tgt_handler.c index 52fc489..83741c6 100644 --- a/lustre/target/tgt_handler.c +++ b/lustre/target/tgt_handler.c @@ -1107,6 +1107,35 @@ int tgt_obd_log_cancel(struct tgt_session_info *tsi) return err_serious(-EOPNOTSUPP); } +int tgt_send_buffer(struct tgt_session_info *tsi, struct lu_rdbuf *rdbuf) +{ + struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env); + struct ptlrpc_request *req = tgt_ses_req(tsi); + struct obd_export *exp = req->rq_export; + struct ptlrpc_bulk_desc *desc; + struct l_wait_info *lwi = &tti->tti_u.rdbuf.tti_wait_info; + int i; + int rc; + + ENTRY; + + desc = ptlrpc_prep_bulk_exp(req, rdbuf->rb_nbufs, 1, + PTLRPC_BULK_PUT_SOURCE | PTLRPC_BULK_BUF_KVEC, + MDS_BULK_PORTAL, &ptlrpc_bulk_kvec_ops); + if (desc == NULL) + RETURN(-ENOMEM); + + for (i = 0; i < rdbuf->rb_nbufs; i++) + desc->bd_frag_ops->add_iov_frag(desc, + rdbuf->rb_bufs[i]->lb_buf, + rdbuf->rb_bufs[i]->lb_len); + + rc = target_bulk_io(exp, desc, lwi); + ptlrpc_free_bulk(desc); + RETURN(rc); +} +EXPORT_SYMBOL(tgt_send_buffer); + int tgt_sendpage(struct tgt_session_info *tsi, struct lu_rdpg *rdpg, int nob) { struct tgt_thread_info *tti = tgt_th_info(tsi->tsi_env); diff --git a/lustre/target/tgt_internal.h b/lustre/target/tgt_internal.h index ace6af9..4644404 100644 --- a/lustre/target/tgt_internal.h +++ b/lustre/target/tgt_internal.h @@ -77,6 +77,12 @@ struct tgt_thread_info { struct l_wait_info tti_wait_info; } rdpg; struct { + /* for out_read() */ + struct lu_rdbuf tti_rdbuf; + /* for tgt_sendpage() */ + struct l_wait_info tti_wait_info; + } rdbuf; + struct { struct dt_object_format tti_update_dof; struct object_update_reply *tti_update_reply; struct object_update *tti_update;