From 70f092a0587866662735e1a6eaf27701a576370d Mon Sep 17 00:00:00 2001
From: Patrick Farrell
Date: Mon, 16 Oct 2017 05:22:22 -0500
Subject: [PATCH 1/1] LU-1757 brw: add short io osc/ost transfer.

There's no need to do target bulk io for a small amount of data,
and it requires extra network operations. For this case we
add short i/o. When the i/o size is less than or equal to some
number of pages (default 3), we encapsulate the data in the
ptlrpc request.

With this patch, 4k direct i/o read latency on a Cray Aries
network (data is on flash on another node on the Aries) drops
from ~280 microseconds to ~200 microseconds. Write latency
drops from ~370 microseconds to ~350 microseconds (much more
of write latency is waiting for write commit).

This translates to about a 25-30% performance improvement
on 4k direct i/o reads and 4k random reads. (Write
performance improvement was small to non-existent.)
Improvement was similar with 8k i/o. Buffered sequential i/o
sees no improvement, because it does not perform small i/os.

Performance data:

access             = file-per-process
pattern            = segmented (1 segment)
ordering in a file = random offsets
ordering inter file= no tasks offsets
xfersize           = 4096 bytes
blocksize          = 100 MiB

nprocs  xfsize  shortio  dio  random  Read (MB/s)
1       4k      no       yes  no       15.0
8       4k      no       yes  no       73.4
16      4k      no       yes  no       81.1
1       4k      yes      yes  no       16.5
8       4k      yes      yes  no       95.2
16      4k      yes      yes  no      107.3
1       4k      no       no   yes      15.5
8       4k      no       no   yes      73.4
16      4k      no       no   yes      81.2
1       4k      yes      no   yes      16.8
8       4k      yes      no   yes      95.0
16      4k      yes      no   yes     106.5

Note even when individual i/o performance is not improved,
this change reduces the # of network operations required
for small i/o, which can help on large systems.

Signed-off-by: Patrick Farrell Change-Id: I70050935eaa0a5e98ca437e18e730be4aa0e4700 Reviewed-on: https://review.whamcloud.com/27767 Tested-by: Jenkins Reviewed-by: Alexey Lyashkov Reviewed-by: Alexandr Boyko Tested-by: Maloo Reviewed-by: Oleg Drokin --- lustre/include/lprocfs_status.h | 4 + lustre/include/lustre_export.h | 7 + lustre/include/lustre_net.h | 24 ++- lustre/include/lustre_osc.h | 12 ++ lustre/include/lustre_req_layout.h | 1 + lustre/include/obd.h | 1 + lustre/ldlm/ldlm_lib.c | 2 + lustre/llite/llite_lib.c | 2 +- lustre/obdclass/lprocfs_status.c | 50 ++++++ lustre/osc/lproc_osc.c | 4 + lustre/osc/osc_page.c | 47 ++++-- lustre/osc/osc_request.c | 134 ++++++++++++---- lustre/ptlrpc/layout.c | 19 ++- lustre/target/tgt_handler.c | 306 +++++++++++++++++++++++++------------ 14 files changed, 465 insertions(+), 148 deletions(-) diff --git a/lustre/include/lprocfs_status.h b/lustre/include/lprocfs_status.h index 8656a08..2afd68a 100644 --- a/lustre/include/lprocfs_status.h +++ b/lustre/include/lprocfs_status.h @@ -776,6 +776,10 @@ int lprocfs_obd_max_pages_per_rpc_seq_show(struct seq_file *m, void *data); ssize_t lprocfs_obd_max_pages_per_rpc_seq_write(struct file *file, const char __user *buffer, size_t count, loff_t *off); +int lprocfs_obd_short_io_bytes_seq_show(struct seq_file *m, void *data); +ssize_t lprocfs_obd_short_io_bytes_seq_write(struct file *file, + const char __user *buffer, + size_t count, loff_t *off); struct root_squash_info; int lprocfs_wr_root_squash(const char __user *buffer, unsigned long count, diff --git a/lustre/include/lustre_export.h b/lustre/include/lustre_export.h index c9ae7ea..ed511c8 100644 --- a/lustre/include/lustre_export.h +++ b/lustre/include/lustre_export.h @@ -409,6 +409,13 @@ static inline bool imp_connect_disp_stripe(struct obd_import *imp) return ocd->ocd_connect_flags & OBD_CONNECT_DISP_STRIPE; } +static inline bool imp_connect_shortio(struct obd_import *imp) +{ + struct obd_connect_data *ocd = &imp->imp_connect_data; 
+ + return ocd->ocd_connect_flags & OBD_CONNECT_SHORTIO; +} + static inline __u64 exp_connect_ibits(struct obd_export *exp) { struct obd_connect_data *ocd; diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index b10ac91..08207fb 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -472,19 +472,30 @@ * - single object with 16 pages is 512 bytes * - OST_IO_MAXREQSIZE must be at least 1 page of cookies plus some spillover * - Must be a multiple of 1024 - * - actual size is about 18K */ -#define _OST_MAXREQSIZE_SUM (sizeof(struct lustre_msg) + \ - sizeof(struct ptlrpc_body) + \ - sizeof(struct obdo) + \ - sizeof(struct obd_ioobj) + \ - sizeof(struct niobuf_remote) * DT_MAX_BRW_PAGES) +#define _OST_MAXREQSIZE_BASE (sizeof(struct lustre_msg) + \ + sizeof(struct ptlrpc_body) + \ + sizeof(struct obdo) + \ + sizeof(struct obd_ioobj) + \ + sizeof(struct niobuf_remote)) +#define _OST_MAXREQSIZE_SUM (_OST_MAXREQSIZE_BASE + \ + sizeof(struct niobuf_remote) * \ + (DT_MAX_BRW_PAGES - 1)) /** * FIEMAP request can be 4K+ for now */ #define OST_MAXREQSIZE (16 * 1024) #define OST_IO_MAXREQSIZE max_t(int, OST_MAXREQSIZE, \ (((_OST_MAXREQSIZE_SUM - 1) | (1024 - 1)) + 1)) +/* Safe estimate of free space in standard RPC, provides upper limit for # of + * bytes of i/o to pack in RPC (skipping bulk transfer). */ +#define OST_SHORT_IO_SPACE (OST_IO_MAXREQSIZE - _OST_MAXREQSIZE_BASE) + +/* Actual size used for short i/o buffer. Calculation means this: + * At least one page (for large PAGE_SIZE), or 16 KiB, but not more + * than the available space aligned to a page boundary. */ +#define OBD_MAX_SHORT_IO_BYTES (min(max(PAGE_SIZE, 16UL * 1024UL), \ + OST_SHORT_IO_SPACE & PAGE_MASK)) #define OST_MAXREPSIZE (9 * 1024) #define OST_IO_MAXREPSIZE OST_MAXREPSIZE @@ -498,6 +509,7 @@ */ #define OST_IO_BUFSIZE max_t(int, OST_IO_MAXREQSIZE + 1024, 64 * 1024) + /* Macro to hide a typecast. 
*/ #define ptlrpc_req_async_args(req) ((void *)&req->rq_async_args) diff --git a/lustre/include/lustre_osc.h b/lustre/include/lustre_osc.h index 124300e..df36985 100644 --- a/lustre/include/lustre_osc.h +++ b/lustre/include/lustre_osc.h @@ -445,6 +445,18 @@ struct osc_page { cfs_time_t ops_submit_time; }; +struct osc_brw_async_args { + struct obdo *aa_oa; + int aa_requested_nob; + int aa_nio_count; + u32 aa_page_count; + int aa_resends; + struct brw_page **aa_ppga; + struct client_obd *aa_cli; + struct list_head aa_oaps; + struct list_head aa_exts; +}; + extern struct kmem_cache *osc_lock_kmem; extern struct kmem_cache *osc_object_kmem; extern struct kmem_cache *osc_thread_kmem; diff --git a/lustre/include/lustre_req_layout.h b/lustre/include/lustre_req_layout.h index 46e6fa8..d2f3c52 100644 --- a/lustre/include/lustre_req_layout.h +++ b/lustre/include/lustre_req_layout.h @@ -317,6 +317,7 @@ extern struct req_msg_field RMF_RCS; extern struct req_msg_field RMF_FIEMAP_KEY; extern struct req_msg_field RMF_FIEMAP_VAL; extern struct req_msg_field RMF_OST_ID; +extern struct req_msg_field RMF_SHORT_IO; /* MGS config read message format */ extern struct req_msg_field RMF_MGS_CONFIG_BODY; diff --git a/lustre/include/obd.h b/lustre/include/obd.h index be2bec4..a19500a 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -246,6 +246,7 @@ struct client_obd { atomic_t cl_pending_r_pages; __u32 cl_max_pages_per_rpc; __u32 cl_max_rpcs_in_flight; + __u32 cl_short_io_bytes; struct obd_histogram cl_read_rpc_hist; struct obd_histogram cl_write_rpc_hist; struct obd_histogram cl_read_page_hist; diff --git a/lustre/ldlm/ldlm_lib.c b/lustre/ldlm/ldlm_lib.c index 0b0d778..c4865f9 100644 --- a/lustre/ldlm/ldlm_lib.c +++ b/lustre/ldlm/ldlm_lib.c @@ -409,6 +409,8 @@ int client_obd_setup(struct obd_device *obddev, struct lustre_cfg *lcfg) * from OFD after connecting. 
*/ cli->cl_max_pages_per_rpc = PTLRPC_MAX_BRW_PAGES; + cli->cl_short_io_bytes = OBD_MAX_SHORT_IO_BYTES; + /* set cl_chunkbits default value to PAGE_SHIFT, * it will be updated at OSC connection time. */ cli->cl_chunkbits = PAGE_SHIFT; diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index da145a4..6071d49 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -399,7 +399,7 @@ static int client_common_fill_super(struct super_block *sb, char *md, char *dt, OBD_CONNECT_JOBSTATS | OBD_CONNECT_LVB_TYPE | OBD_CONNECT_LAYOUTLOCK | OBD_CONNECT_PINGLESS | OBD_CONNECT_LFSCK | - OBD_CONNECT_BULK_MBITS | + OBD_CONNECT_BULK_MBITS | OBD_CONNECT_SHORTIO | OBD_CONNECT_FLAGS2; /* The client currently advertises support for OBD_CONNECT_LOCKAHEAD_OLD so it diff --git a/lustre/obdclass/lprocfs_status.c b/lustre/obdclass/lprocfs_status.c index 37cb29d..d2505ef 100644 --- a/lustre/obdclass/lprocfs_status.c +++ b/lustre/obdclass/lprocfs_status.c @@ -2432,6 +2432,56 @@ ssize_t lprocfs_obd_max_pages_per_rpc_seq_write(struct file *file, } EXPORT_SYMBOL(lprocfs_obd_max_pages_per_rpc_seq_write); +int lprocfs_obd_short_io_bytes_seq_show(struct seq_file *m, void *data) +{ + struct obd_device *dev = data; + struct client_obd *cli = &dev->u.cli; + + spin_lock(&cli->cl_loi_list_lock); + seq_printf(m, "%d\n", cli->cl_short_io_bytes); + spin_unlock(&cli->cl_loi_list_lock); + return 0; +} +EXPORT_SYMBOL(lprocfs_obd_short_io_bytes_seq_show); + + +/* Used to catch people who think they're specifying pages. 
*/ +#define MIN_SHORT_IO_BYTES 64 + +ssize_t lprocfs_obd_short_io_bytes_seq_write(struct file *file, + const char __user *buffer, + size_t count, loff_t *off) +{ + struct obd_device *dev = ((struct seq_file *) + file->private_data)->private; + struct client_obd *cli = &dev->u.cli; + int rc; + __u64 val; + + LPROCFS_CLIMP_CHECK(dev); + + rc = lprocfs_str_to_s64(buffer, count, &val); + if (rc) + GOTO(out, rc); + + if (val > OBD_MAX_SHORT_IO_BYTES || val < MIN_SHORT_IO_BYTES) + GOTO(out, rc = -ERANGE); + + rc = count; + + spin_lock(&cli->cl_loi_list_lock); + if (val > (cli->cl_max_pages_per_rpc << PAGE_SHIFT)) + rc = -ERANGE; + else + cli->cl_short_io_bytes = val; + spin_unlock(&cli->cl_loi_list_lock); + +out: + LPROCFS_CLIMP_EXIT(dev); + return rc; +} +EXPORT_SYMBOL(lprocfs_obd_short_io_bytes_seq_write); + int lprocfs_wr_root_squash(const char __user *buffer, unsigned long count, struct root_squash_info *squash, char *name) { diff --git a/lustre/osc/lproc_osc.c b/lustre/osc/lproc_osc.c index 84ba911..40b2e5f 100644 --- a/lustre/osc/lproc_osc.c +++ b/lustre/osc/lproc_osc.c @@ -574,6 +574,8 @@ LPROC_SEQ_FOPS_RO(osc_destroys_in_flight); LPROC_SEQ_FOPS_RW_TYPE(osc, obd_max_pages_per_rpc); +LPROC_SEQ_FOPS_RW_TYPE(osc, obd_short_io_bytes); + static int osc_unstable_stats_seq_show(struct seq_file *m, void *v) { struct obd_device *dev = m->private; @@ -616,6 +618,8 @@ struct lprocfs_vars lprocfs_osc_obd_vars[] = { .fops = &osc_active_fops }, { .name = "max_pages_per_rpc", .fops = &osc_obd_max_pages_per_rpc_fops }, + { .name = "short_io_bytes", + .fops = &osc_obd_short_io_bytes_fops }, { .name = "max_rpcs_in_flight", .fops = &osc_max_rpcs_in_flight_fops }, { .name = "destroys_in_flight", diff --git a/lustre/osc/osc_page.c b/lustre/osc/osc_page.c index c0aa4be..e929211 100644 --- a/lustre/osc/osc_page.c +++ b/lustre/osc/osc_page.c @@ -874,17 +874,27 @@ void osc_lru_unreserve(struct client_obd *cli, unsigned long npages) * are likely from the same page zone. 
*/ static inline void unstable_page_accounting(struct ptlrpc_bulk_desc *desc, + struct osc_brw_async_args *aa, int factor) { - int page_count = desc->bd_iov_count; + int page_count; void *zone = NULL; int count = 0; int i; - LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type)); + if (desc != NULL) { + LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type)); + page_count = desc->bd_iov_count; + } else { + page_count = aa->aa_page_count; + } for (i = 0; i < page_count; i++) { - void *pz = page_zone(BD_GET_KIOV(desc, i).kiov_page); + void *pz; + if (desc) + pz = page_zone(BD_GET_KIOV(desc, i).kiov_page); + else + pz = page_zone(aa->aa_ppga[i]->pg); if (likely(pz == zone)) { ++count; @@ -903,14 +913,16 @@ static inline void unstable_page_accounting(struct ptlrpc_bulk_desc *desc, mod_zone_page_state(zone, NR_UNSTABLE_NFS, factor * count); } -static inline void add_unstable_page_accounting(struct ptlrpc_bulk_desc *desc) +static inline void add_unstable_page_accounting(struct ptlrpc_bulk_desc *desc, + struct osc_brw_async_args *aa) { - unstable_page_accounting(desc, 1); + unstable_page_accounting(desc, aa, 1); } -static inline void dec_unstable_page_accounting(struct ptlrpc_bulk_desc *desc) +static inline void dec_unstable_page_accounting(struct ptlrpc_bulk_desc *desc, + struct osc_brw_async_args *aa) { - unstable_page_accounting(desc, -1); + unstable_page_accounting(desc, aa, -1); } /** @@ -927,12 +939,19 @@ static inline void dec_unstable_page_accounting(struct ptlrpc_bulk_desc *desc) void osc_dec_unstable_pages(struct ptlrpc_request *req) { struct ptlrpc_bulk_desc *desc = req->rq_bulk; + struct osc_brw_async_args *aa = (void *)&req->rq_async_args; struct client_obd *cli = &req->rq_import->imp_obd->u.cli; - int page_count = desc->bd_iov_count; + int page_count; long unstable_count; + if (desc) + page_count = desc->bd_iov_count; + else + page_count = aa->aa_page_count; + LASSERT(page_count >= 0); - dec_unstable_page_accounting(desc); + + dec_unstable_page_accounting(desc, aa); 
unstable_count = atomic_long_sub_return(page_count, &cli->cl_unstable_count); @@ -954,14 +973,20 @@ void osc_dec_unstable_pages(struct ptlrpc_request *req) void osc_inc_unstable_pages(struct ptlrpc_request *req) { struct ptlrpc_bulk_desc *desc = req->rq_bulk; + struct osc_brw_async_args *aa = (void *)&req->rq_async_args; struct client_obd *cli = &req->rq_import->imp_obd->u.cli; - long page_count = desc->bd_iov_count; + long page_count; /* No unstable page tracking */ if (cli->cl_cache == NULL || !cli->cl_cache->ccc_unstable_check) return; - add_unstable_page_accounting(desc); + if (desc) + page_count = desc->bd_iov_count; + else + page_count = aa->aa_page_count; + + add_unstable_page_accounting(desc, aa); atomic_long_add(page_count, &cli->cl_unstable_count); atomic_long_add(page_count, &cli->cl_cache->ccc_unstable_nr); diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index 4052b54..63ae9b0 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -58,18 +58,6 @@ struct ptlrpc_request_pool *osc_rq_pool; static unsigned int osc_reqpool_mem_max = 5; module_param(osc_reqpool_mem_max, uint, 0444); -struct osc_brw_async_args { - struct obdo *aa_oa; - int aa_requested_nob; - int aa_nio_count; - u32 aa_page_count; - int aa_resends; - struct brw_page **aa_ppga; - struct client_obd *aa_cli; - struct list_head aa_oaps; - struct list_head aa_exts; -}; - #define osc_grant_args osc_brw_async_args struct osc_setattr_args { @@ -1025,8 +1013,8 @@ static int check_write_rcs(struct ptlrpc_request *req, return(-EPROTO); } } - - if (req->rq_bulk->bd_nob_transferred != requested_nob) { + if (req->rq_bulk != NULL && + req->rq_bulk->bd_nob_transferred != requested_nob) { CERROR("Unexpected # bytes transferred: %d (requested %d)\n", req->rq_bulk->bd_nob_transferred, requested_nob); return(-EPROTO); @@ -1119,10 +1107,11 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, struct ost_body *body; struct obd_ioobj *ioobj; struct niobuf_remote 
*niobuf; - int niocount, i, requested_nob, opc, rc; + int niocount, i, requested_nob, opc, rc, short_io_size; struct osc_brw_async_args *aa; struct req_capsule *pill; struct brw_page *pg_prev; + void *short_io_buf; ENTRY; if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ)) @@ -1153,6 +1142,20 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT, niocount * sizeof(*niobuf)); + for (i = 0; i < page_count; i++) + short_io_size += pga[i]->count; + + /* Check if we can do a short io. */ + if (!(short_io_size <= cli->cl_short_io_bytes && niocount == 1 && + imp_connect_shortio(cli->cl_import))) + short_io_size = 0; + + req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT, + opc == OST_READ ? 0 : short_io_size); + if (opc == OST_READ) + req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER, + short_io_size); + rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc); if (rc) { ptlrpc_request_free(req); @@ -1160,10 +1163,17 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, } req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ ptlrpc_at_set_req_timeout(req); + /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own * retry logic */ req->rq_no_retry_einprogress = 1; + if (short_io_size != 0) { + desc = NULL; + short_io_buf = NULL; + goto no_bulk; + } + desc = ptlrpc_prep_bulk_imp(req, page_count, cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS, (opc == OST_WRITE ? 
PTLRPC_BULK_GET_SOURCE : @@ -1175,7 +1185,7 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, if (desc == NULL) GOTO(out, rc = -ENOMEM); /* NB request now owns desc and will free it when it gets freed */ - +no_bulk: body = req_capsule_client_get(pill, &RMF_OST_BODY); ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ); niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE); @@ -1190,7 +1200,26 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, * when the RPC is finally sent in ptlrpc_register_bulk(). It sends * "max - 1" for old client compatibility sending "0", and also so the * the actual maximum is a power-of-two number, not one less. LU-1431 */ - ioobj_max_brw_set(ioobj, desc->bd_md_max_brw); + if (desc != NULL) + ioobj_max_brw_set(ioobj, desc->bd_md_max_brw); + else /* short io */ + ioobj_max_brw_set(ioobj, 0); + + if (short_io_size != 0) { + if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) { + body->oa.o_valid |= OBD_MD_FLFLAGS; + body->oa.o_flags = 0; + } + body->oa.o_flags |= OBD_FL_SHORT_IO; + CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n", + short_io_size); + if (opc == OST_WRITE) { + short_io_buf = req_capsule_client_get(pill, + &RMF_SHORT_IO); + LASSERT(short_io_buf != NULL); + } + } + LASSERT(page_count > 0); pg_prev = pga[0]; for (requested_nob = i = 0; i < page_count; i++, niobuf++) { @@ -1215,9 +1244,19 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa, pg_prev->pg->index, pg_prev->off); LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) == (pg->flag & OBD_BRW_SRVLOCK)); - - desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count); - requested_nob += pg->count; + if (short_io_size != 0 && opc == OST_WRITE) { + unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0); + + LASSERT(short_io_size >= requested_nob + pg->count); + memcpy(short_io_buf + requested_nob, + ptr + poff, + pg->count); + ll_kunmap_atomic(ptr, KM_USER0); + } else if (short_io_size == 0) { + 
desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, + pg->count); + } + requested_nob += pg->count; if (i > 0 && can_merge_pages(pg_prev, pg)) { niobuf--; @@ -1486,9 +1525,9 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) CERROR("Unexpected +ve rc %d\n", rc); RETURN(-EPROTO); } - LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob); - if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk)) + if (req->rq_bulk != NULL && + sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk)) RETURN(-EAGAIN); if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum && @@ -1503,8 +1542,14 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) /* The rest of this function executes only for OST_READs */ - /* if unwrap_bulk failed, return -EAGAIN to retry */ - rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc); + if (req->rq_bulk == NULL) { + rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO, + RCL_SERVER); + LASSERT(rc == req->rq_status); + } else { + /* if unwrap_bulk failed, return -EAGAIN to retry */ + rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc); + } if (rc < 0) GOTO(out, rc = -EAGAIN); @@ -1514,12 +1559,41 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) RETURN(-EPROTO); } - if (rc != req->rq_bulk->bd_nob_transferred) { + if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) { CERROR ("Unexpected rc %d (%d transferred)\n", rc, req->rq_bulk->bd_nob_transferred); return (-EPROTO); } + if (req->rq_bulk == NULL) { + /* short io */ + int nob, pg_count, i = 0; + unsigned char *buf; + + CDEBUG(D_CACHE, "Using short io read, size %d\n", rc); + pg_count = aa->aa_page_count; + buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO, + rc); + nob = rc; + while (nob > 0 && pg_count > 0) { + unsigned char *ptr; + int count = aa->aa_ppga[i]->count > nob ? 
+ nob : aa->aa_ppga[i]->count; + + CDEBUG(D_CACHE, "page %p count %d\n", + aa->aa_ppga[i]->pg, count); + ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0); + memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf, + count); + ll_kunmap_atomic((void *) ptr, KM_USER0); + + buf += count; + nob -= count; + i++; + pg_count--; + } + } + if (rc < aa->aa_requested_nob) handle_short_read(rc, aa->aa_page_count, aa->aa_ppga); @@ -1536,7 +1610,8 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) aa->aa_ppga, OST_READ, cksum_type); - if (peer->nid != req->rq_bulk->bd_sender) { + if (req->rq_bulk != NULL && + peer->nid != req->rq_bulk->bd_sender) { via = " via "; router = libcfs_nid2str(req->rq_bulk->bd_sender); } @@ -1710,6 +1785,7 @@ static int brw_interpret(const struct lu_env *env, struct osc_extent *ext; struct osc_extent *tmp; struct client_obd *cli = aa->aa_cli; + unsigned long transferred = 0; ENTRY; rc = osc_brw_fini_request(req, rc); @@ -1802,8 +1878,12 @@ static int brw_interpret(const struct lu_env *env, LASSERT(list_empty(&aa->aa_exts)); LASSERT(list_empty(&aa->aa_oaps)); + transferred = (req->rq_bulk == NULL ? 
/* short io */ + aa->aa_requested_nob : + req->rq_bulk->bd_nob_transferred); + osc_release_ppga(aa->aa_ppga, aa->aa_page_count); - ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred); + ptlrpc_lprocfs_brw(req, transferred); spin_lock(&cli->cl_loi_list_lock); /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters diff --git a/lustre/ptlrpc/layout.c b/lustre/ptlrpc/layout.c index 9ffb860..1b645c4 100644 --- a/lustre/ptlrpc/layout.c +++ b/lustre/ptlrpc/layout.c @@ -609,16 +609,18 @@ static const struct req_msg_field *ost_destroy_client[] = { static const struct req_msg_field *ost_brw_client[] = { - &RMF_PTLRPC_BODY, - &RMF_OST_BODY, - &RMF_OBD_IOOBJ, - &RMF_NIOBUF_REMOTE, - &RMF_CAPA1 + &RMF_PTLRPC_BODY, + &RMF_OST_BODY, + &RMF_OBD_IOOBJ, + &RMF_NIOBUF_REMOTE, + &RMF_CAPA1, + &RMF_SHORT_IO }; static const struct req_msg_field *ost_brw_read_server[] = { - &RMF_PTLRPC_BODY, - &RMF_OST_BODY + &RMF_PTLRPC_BODY, + &RMF_OST_BODY, + &RMF_SHORT_IO }; static const struct req_msg_field *ost_brw_write_server[] = { @@ -1168,6 +1170,9 @@ struct req_msg_field RMF_IDX_INFO = DEFINE_MSGF("idx_info", 0, sizeof(struct idx_info), lustre_swab_idx_info, NULL); EXPORT_SYMBOL(RMF_IDX_INFO); +struct req_msg_field RMF_SHORT_IO = + DEFINE_MSGF("short_io", 0, -1, NULL, NULL); +EXPORT_SYMBOL(RMF_SHORT_IO); struct req_msg_field RMF_HSM_USER_STATE = DEFINE_MSGF("hsm_user_state", 0, sizeof(struct hsm_user_state), lustre_swab_hsm_user_state, NULL); diff --git a/lustre/target/tgt_handler.c b/lustre/target/tgt_handler.c index 8460774..e359462 100644 --- a/lustre/target/tgt_handler.c +++ b/lustre/target/tgt_handler.c @@ -434,6 +434,19 @@ static int tgt_handle_request0(struct tgt_session_info *tsi, &RMF_ACL, RCL_SERVER, LUSTRE_POSIX_ACL_MAX_SIZE_OLD); + if (req_capsule_has_field(tsi->tsi_pill, &RMF_SHORT_IO, + RCL_SERVER)) { + struct niobuf_remote *remote_nb = + req_capsule_client_get(tsi->tsi_pill, + &RMF_NIOBUF_REMOTE); + struct ost_body *body = tsi->tsi_ost_body; + + 
req_capsule_set_size(tsi->tsi_pill, &RMF_SHORT_IO, + RCL_SERVER, + (body->oa.o_flags & OBD_FL_SHORT_IO) ? + remote_nb[0].rnb_len : 0); + } + rc = req_capsule_server_pack(tsi->tsi_pill); } @@ -1657,10 +1670,9 @@ void tgt_brw_unlock(struct obd_ioobj *obj, struct niobuf_remote *niob, tgt_extent_unlock(lh, mode); EXIT; } - -static __u32 tgt_checksum_bulk(struct lu_target *tgt, - struct ptlrpc_bulk_desc *desc, int opc, - enum cksum_types cksum_type) +static __u32 tgt_checksum_niobuf(struct lu_target *tgt, + struct niobuf_local *local_nb, int npages, + int opc, enum cksum_types cksum_type) { struct cfs_crypto_hash_desc *hdesc; unsigned int bufsize; @@ -1668,8 +1680,6 @@ static __u32 tgt_checksum_bulk(struct lu_target *tgt, unsigned char cfs_alg = cksum_obd2cfs(cksum_type); __u32 cksum; - LASSERT(ptlrpc_is_bulk_desc_kiov(desc->bd_type)); - hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0); if (IS_ERR(hdesc)) { CERROR("%s: unable to initialize checksum hash %s\n", @@ -1678,65 +1688,64 @@ static __u32 tgt_checksum_bulk(struct lu_target *tgt, } CDEBUG(D_INFO, "Checksum for algo %s\n", cfs_crypto_hash_name(cfs_alg)); - for (i = 0; i < desc->bd_iov_count; i++) { + for (i = 0; i < npages; i++) { /* corrupt the data before we compute the checksum, to * simulate a client->OST data error */ if (i == 0 && opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE)) { - int off = BD_GET_KIOV(desc, i).kiov_offset & - ~PAGE_MASK; - int len = BD_GET_KIOV(desc, i).kiov_len; + int off = local_nb[i].lnb_page_offset & ~PAGE_MASK; + int len = local_nb[i].lnb_len; struct page *np = tgt_page_to_corrupt; - char *ptr = kmap(BD_GET_KIOV(desc, i).kiov_page) + off; if (np) { - char *ptr2 = kmap(np) + off; + char *ptr = ll_kmap_atomic(local_nb[i].lnb_page, + KM_USER0); + char *ptr2 = page_address(np); - memcpy(ptr2, ptr, len); - memcpy(ptr2, "bad3", min(4, len)); - kunmap(np); + memcpy(ptr2 + off, ptr + off, len); + memcpy(ptr2 + off, "bad3", min(4, len)); + ll_kunmap_atomic(ptr, KM_USER0); /* 
LU-8376 to preserve original index for * display in dump_all_bulk_pages() */ - np->index = BD_GET_KIOV(desc, - i).kiov_page->index; + np->index = i; - BD_GET_KIOV(desc, i).kiov_page = np; + cfs_crypto_hash_update_page(hdesc, np, off, + len); + continue; } else { CERROR("%s: can't alloc page for corruption\n", tgt_name(tgt)); } } - cfs_crypto_hash_update_page(hdesc, - BD_GET_KIOV(desc, i).kiov_page, - BD_GET_KIOV(desc, i).kiov_offset & - ~PAGE_MASK, - BD_GET_KIOV(desc, i).kiov_len); + cfs_crypto_hash_update_page(hdesc, local_nb[i].lnb_page, + local_nb[i].lnb_page_offset & ~PAGE_MASK, + local_nb[i].lnb_len); /* corrupt the data after we compute the checksum, to * simulate an OST->client data error */ if (i == 0 && opc == OST_READ && OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) { - int off = BD_GET_KIOV(desc, i).kiov_offset - & ~PAGE_MASK; - int len = BD_GET_KIOV(desc, i).kiov_len; + int off = local_nb[i].lnb_page_offset & ~PAGE_MASK; + int len = local_nb[i].lnb_len; struct page *np = tgt_page_to_corrupt; - char *ptr = - kmap(BD_GET_KIOV(desc, i).kiov_page) + off; if (np) { - char *ptr2 = kmap(np) + off; + char *ptr = ll_kmap_atomic(local_nb[i].lnb_page, + KM_USER0); + char *ptr2 = page_address(np); - memcpy(ptr2, ptr, len); - memcpy(ptr2, "bad4", min(4, len)); - kunmap(np); + memcpy(ptr2 + off, ptr + off, len); + memcpy(ptr2 + off, "bad4", min(4, len)); + ll_kunmap_atomic(ptr, KM_USER0); /* LU-8376 to preserve original index for * display in dump_all_bulk_pages() */ - np->index = BD_GET_KIOV(desc, - i).kiov_page->index; + np->index = i; - BD_GET_KIOV(desc, i).kiov_page = np; + cfs_crypto_hash_update_page(hdesc, np, off, + len); + continue; } else { CERROR("%s: can't alloc page for corruption\n", tgt_name(tgt)); @@ -1753,8 +1762,8 @@ static __u32 tgt_checksum_bulk(struct lu_target *tgt, char dbgcksum_file_name[PATH_MAX]; static void dump_all_bulk_pages(struct obdo *oa, int count, - lnet_kiov_t *iov, __u32 server_cksum, - __u32 client_cksum) + struct niobuf_local 
*local_nb, + __u32 server_cksum, __u32 client_cksum) { struct file *filp; int rc, i; @@ -1772,9 +1781,9 @@ static void dump_all_bulk_pages(struct obdo *oa, int count, oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0, oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0, oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0, - (__u64)iov[0].kiov_page->index << PAGE_SHIFT, - ((__u64)iov[count - 1].kiov_page->index << PAGE_SHIFT) + - iov[count - 1].kiov_len - 1, client_cksum, server_cksum); + local_nb[0].lnb_file_offset, + local_nb[count-1].lnb_file_offset + + local_nb[count-1].lnb_len - 1, client_cksum, server_cksum); filp = filp_open(dbgcksum_file_name, O_CREAT | O_EXCL | O_WRONLY | O_LARGEFILE, 0600); if (IS_ERR(filp)) { @@ -1792,8 +1801,8 @@ static void dump_all_bulk_pages(struct obdo *oa, int count, oldfs = get_fs(); set_fs(KERNEL_DS); for (i = 0; i < count; i++) { - len = iov[i].kiov_len; - buf = kmap(iov[i].kiov_page); + len = local_nb[i].lnb_len; + buf = kmap(local_nb[i].lnb_page); while (len != 0) { rc = vfs_write(filp, (__force const char __user *)buf, len, &filp->f_pos); @@ -1807,7 +1816,7 @@ static void dump_all_bulk_pages(struct obdo *oa, int count, CDEBUG(D_INFO, "%s: wrote %d bytes\n", dbgcksum_file_name, rc); } - kunmap(iov[i].kiov_page); + kunmap(local_nb[i].lnb_page); } set_fs(oldfs); @@ -1818,13 +1827,15 @@ static void dump_all_bulk_pages(struct obdo *oa, int count, return; } -static int check_read_checksum(struct ptlrpc_bulk_desc *desc, struct obdo *oa, +static int check_read_checksum(struct niobuf_local *local_nb, int npages, + struct obd_export *exp, struct obdo *oa, const lnet_process_id_t *peer, __u32 client_cksum, __u32 server_cksum, enum cksum_types server_cksum_type) { char *msg; enum cksum_types cksum_type; + loff_t start, end; /* unlikely to happen and only if resend does not occur due to cksum * control failure on Client */ @@ -1834,9 +1845,8 @@ static int check_read_checksum(struct ptlrpc_bulk_desc *desc, struct obdo *oa, return 0; } - 
if (desc->bd_export->exp_obd->obd_checksum_dump) - dump_all_bulk_pages(oa, desc->bd_iov_count, - &BD_GET_KIOV(desc, 0), server_cksum, + if (exp->exp_obd->obd_checksum_dump) + dump_all_bulk_pages(oa, npages, local_nb, server_cksum, client_cksum); cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ? @@ -1848,24 +1858,49 @@ static int check_read_checksum(struct ptlrpc_bulk_desc *desc, struct obdo *oa, else msg = "should have changed on the client or in transit"; + start = local_nb[0].lnb_file_offset; + end = local_nb[npages-1].lnb_file_offset + + local_nb[npages-1].lnb_len - 1; + LCONSOLE_ERROR_MSG(0x132, "%s: BAD READ CHECKSUM: %s: from %s inode " DFID " object "DOSTID" extent [%llu-%llu], client returned csum" " %x (type %x), server csum %x (type %x)\n", - desc->bd_export->exp_obd->obd_name, + exp->exp_obd->obd_name, msg, libcfs_nid2str(peer->nid), oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : 0ULL, oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0, oa->o_valid & OBD_MD_FLFID ? 
oa->o_parent_ver : 0, POSTID(&oa->o_oi), - (__u64)BD_GET_KIOV(desc, 0).kiov_page->index << PAGE_SHIFT, - ((__u64)BD_GET_KIOV(desc, - desc->bd_iov_count - 1).kiov_page->index - << PAGE_SHIFT) + - BD_GET_KIOV(desc, desc->bd_iov_count - 1).kiov_len - 1, - client_cksum, cksum_type, server_cksum, server_cksum_type); + start, end, client_cksum, cksum_type, server_cksum, + server_cksum_type); + return 1; } +static int tgt_pages2shortio(struct niobuf_local *local, int npages, + unsigned char *buf, int size) +{ + int i, off, len, copied = size; + char *ptr; + + for (i = 0; i < npages; i++) { + off = local[i].lnb_page_offset & ~PAGE_MASK; + len = local[i].lnb_len; + + CDEBUG(D_PAGE, "index %d offset = %d len = %d left = %d\n", + i, off, len, size); + if (len > size) + return -EINVAL; + + ptr = ll_kmap_atomic(local[i].lnb_page, KM_USER0); + memcpy(buf + off, ptr, len); + ll_kunmap_atomic(ptr, KM_USER0); + buf += len; + size -= len; + } + return copied - size; +} + int tgt_brw_read(struct tgt_session_info *tsi) { struct ptlrpc_request *req = tgt_ses_req(tsi); @@ -1877,7 +1912,8 @@ int tgt_brw_read(struct tgt_session_info *tsi) struct ost_body *body, *repbody; struct l_wait_info lwi; struct lustre_handle lockh = { 0 }; - int npages, nob = 0, rc, i, no_reply = 0; + int npages, nob = 0, rc, i, no_reply = 0, + npages_read; struct tgt_thread_big_cache *tbc = req->rq_svc_thread->t_data; ENTRY; @@ -1953,33 +1989,41 @@ int tgt_brw_read(struct tgt_session_info *tsi) if (rc != 0) GOTO(out_lock, rc); - desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo), - PTLRPC_BULK_PUT_SOURCE | - PTLRPC_BULK_BUF_KIOV, - OST_BULK_PORTAL, - &ptlrpc_bulk_kiov_nopin_ops); - if (desc == NULL) - GOTO(out_commitrw, rc = -ENOMEM); + if (body->oa.o_flags & OBD_FL_SHORT_IO) { + desc = NULL; + } else { + desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo), + PTLRPC_BULK_PUT_SOURCE | + PTLRPC_BULK_BUF_KIOV, + OST_BULK_PORTAL, + &ptlrpc_bulk_kiov_nopin_ops); + if (desc == NULL) + 
GOTO(out_commitrw, rc = -ENOMEM); + } nob = 0; + npages_read = npages; for (i = 0; i < npages; i++) { int page_rc = local_nb[i].lnb_rc; if (page_rc < 0) { rc = page_rc; + npages_read = i; break; } nob += page_rc; - if (page_rc != 0) { /* some data! */ + if (page_rc != 0 && desc != NULL) { /* some data! */ LASSERT(local_nb[i].lnb_page != NULL); desc->bd_frag_ops->add_kiov_frag (desc, local_nb[i].lnb_page, - local_nb[i].lnb_page_offset, + local_nb[i].lnb_page_offset & ~PAGE_MASK, page_rc); } if (page_rc != local_nb[i].lnb_len) { /* short read */ + local_nb[i].lnb_len = page_rc; + npages_read = i + (page_rc != 0 ? 1 : 0); /* All subsequent pages should be 0 */ while (++i < npages) LASSERT(local_nb[i].lnb_rc == 0); @@ -1997,8 +2041,9 @@ int tgt_brw_read(struct tgt_session_info *tsi) repbody->oa.o_flags = cksum_type_pack(cksum_type); repbody->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; - repbody->oa.o_cksum = tgt_checksum_bulk(tsi->tsi_tgt, desc, - OST_READ, cksum_type); + repbody->oa.o_cksum = tgt_checksum_niobuf(tsi->tsi_tgt, + local_nb, npages_read, + OST_READ, cksum_type); CDEBUG(D_PAGE, "checksum at read origin: %x\n", repbody->oa.o_cksum); @@ -2007,7 +2052,8 @@ int tgt_brw_read(struct tgt_session_info *tsi) * zero-cksum case) */ if ((body->oa.o_valid & OBD_MD_FLFLAGS) && (body->oa.o_flags & OBD_FL_RECOV_RESEND)) - check_read_checksum(desc, &body->oa, &req->rq_peer, + check_read_checksum(local_nb, npages_read, exp, + &body->oa, &req->rq_peer, body->oa.o_cksum, repbody->oa.o_cksum, cksum_type); } else { @@ -2017,11 +2063,31 @@ int tgt_brw_read(struct tgt_session_info *tsi) /* Check if client was evicted while we were doing i/o before touching * network */ - if (likely(rc == 0 && - !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2) && - !CFS_FAIL_CHECK(OBD_FAIL_PTLRPC_DROP_BULK))) { - rc = target_bulk_io(exp, desc, &lwi); + if (rc == 0) { + if (body->oa.o_flags & OBD_FL_SHORT_IO) { + unsigned char *short_io_buf; + int short_io_size; + + short_io_buf = 
req_capsule_server_get(&req->rq_pill, + &RMF_SHORT_IO); + short_io_size = req_capsule_get_size(&req->rq_pill, + &RMF_SHORT_IO, + RCL_SERVER); + rc = tgt_pages2shortio(local_nb, npages_read, + short_io_buf, short_io_size); + if (rc >= 0) + req_capsule_shrink(&req->rq_pill, + &RMF_SHORT_IO, rc, + RCL_SERVER); + rc = rc > 0 ? 0 : rc; + } else if (!CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2)) { + rc = target_bulk_io(exp, desc, &lwi); + } no_reply = rc != 0; + } else { + if (body->oa.o_flags & OBD_FL_SHORT_IO) + req_capsule_shrink(&req->rq_pill, &RMF_SHORT_IO, 0, + RCL_SERVER); } out_commitrw: @@ -2049,8 +2115,10 @@ out_lock: obd_export_nid2str(exp), rc); } /* send a bulk after reply to simulate a network delay or reordering - * by a router */ - if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))) { + * by a router - Note that !desc implies short io, so there is no bulk + * to reorder. */ + if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2)) && + desc) { wait_queue_head_t waitq; struct l_wait_info lwi1; @@ -2067,6 +2135,32 @@ out_lock: } EXPORT_SYMBOL(tgt_brw_read); +static int tgt_shortio2pages(struct niobuf_local *local, int npages, + unsigned char *buf, int size) +{ + int i, off, len; + char *ptr; + + for (i = 0; i < npages; i++) { + off = local[i].lnb_page_offset & ~PAGE_MASK; + len = local[i].lnb_len; + + if (len == 0) + continue; + + CDEBUG(D_PAGE, "index %d offset = %d len = %d left = %d\n", + i, off, len, size); + ptr = ll_kmap_atomic(local[i].lnb_page, KM_USER0); + if (ptr == NULL) + return -EINVAL; + memcpy(ptr + off, buf, len < size ? 
len : size); + ll_kunmap_atomic(ptr, KM_USER0); + buf += len; + size -= len < size ? len : size; /* clamp at 0 */ + } + return 0; +} + static void tgt_warn_on_cksum(struct ptlrpc_request *req, struct ptlrpc_bulk_desc *desc, struct niobuf_local *local_nb, int npages, @@ -2081,14 +2175,13 @@ static void tgt_warn_on_cksum(struct ptlrpc_request *req, body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY); LASSERT(body != NULL); - if (req->rq_peer.nid != desc->bd_sender) { + if (desc && req->rq_peer.nid != desc->bd_sender) { via = " via "; router = libcfs_nid2str(desc->bd_sender); } if (exp->exp_obd->obd_checksum_dump) - dump_all_bulk_pages(&body->oa, desc->bd_iov_count, - &BD_GET_KIOV(desc, 0), server_cksum, + dump_all_bulk_pages(&body->oa, npages, local_nb, server_cksum, client_cksum); if (mmap) { @@ -2238,26 +2331,45 @@ int tgt_brw_write(struct tgt_session_info *tsi) objcount, ioo, remote_nb, &npages, local_nb); if (rc < 0) GOTO(out_lock, rc); + if (body->oa.o_flags & OBD_FL_SHORT_IO) { + int short_io_size; + unsigned char *short_io_buf; + + short_io_size = req_capsule_get_size(&req->rq_pill, + &RMF_SHORT_IO, + RCL_CLIENT); + short_io_buf = req_capsule_client_get(&req->rq_pill, + &RMF_SHORT_IO); + CDEBUG(D_INFO, "Client use short io for data transfer," + " size = %d\n", short_io_size); + + /* Copy short io buf to pages */ + rc = tgt_shortio2pages(local_nb, npages, short_io_buf, + short_io_size); + desc = NULL; + } else { + desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo), + PTLRPC_BULK_GET_SINK | + PTLRPC_BULK_BUF_KIOV, + OST_BULK_PORTAL, + &ptlrpc_bulk_kiov_nopin_ops); + if (desc == NULL) + GOTO(skip_transfer, rc = -ENOMEM); + + /* NB Having prepped, we must commit... 
*/ + for (i = 0; i < npages; i++) + desc->bd_frag_ops->add_kiov_frag(desc, + local_nb[i].lnb_page, + local_nb[i].lnb_page_offset & ~PAGE_MASK, + local_nb[i].lnb_len); + + rc = sptlrpc_svc_prep_bulk(req, desc); + if (rc != 0) + GOTO(skip_transfer, rc); - desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo), - PTLRPC_BULK_GET_SINK | PTLRPC_BULK_BUF_KIOV, - OST_BULK_PORTAL, - &ptlrpc_bulk_kiov_nopin_ops); - if (desc == NULL) - GOTO(skip_transfer, rc = -ENOMEM); - - /* NB Having prepped, we must commit... */ - for (i = 0; i < npages; i++) - desc->bd_frag_ops->add_kiov_frag(desc, - local_nb[i].lnb_page, - local_nb[i].lnb_page_offset, - local_nb[i].lnb_len); - - rc = sptlrpc_svc_prep_bulk(req, desc); - if (rc != 0) - GOTO(skip_transfer, rc); + rc = target_bulk_io(exp, desc, &lwi); + } - rc = target_bulk_io(exp, desc, &lwi); no_reply = rc != 0; skip_transfer: @@ -2270,8 +2382,10 @@ skip_transfer: repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL; repbody->oa.o_flags |= cksum_type_pack(cksum_type); - repbody->oa.o_cksum = tgt_checksum_bulk(tsi->tsi_tgt, desc, - OST_WRITE, cksum_type); + repbody->oa.o_cksum = tgt_checksum_niobuf(tsi->tsi_tgt, + local_nb, npages, + OST_WRITE, + cksum_type); cksum_counter++; if (unlikely(body->oa.o_cksum != repbody->oa.o_cksum)) { -- 1.8.3.1