X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fosc%2Fosc_request.c;h=63ae9b041c30bd309c0598d866aa9180b96cee58;hp=4052b540b94cf0264df073c66c8d5991761451ce;hb=70f092a0587866662735e1a6eaf27701a576370d;hpb=a046e879fcadd601c9a19fd906f82ecbd2d4efd5

Summary of this patch to lustre/osc/osc_request.c:

  * The file-local definition of struct osc_brw_async_args is removed
    (presumably it is now provided by a header; that is not visible here).
  * osc_brw_prep_request() gains a "short io" path.  The byte counts of all
    pages are summed into short_io_size, which is initialised to 0 so that
    the running sum starts from a defined value.  If the total does not
    exceed cli->cl_short_io_bytes, niocount is 1 and the import reports
    imp_connect_shortio(), no bulk descriptor is prepared (desc stays NULL),
    OBD_FL_SHORT_IO is set in the request body, and for OST_WRITE the page
    contents are copied into the RMF_SHORT_IO request buffer.
  * osc_brw_fini_request() copies short-io read data out of the RMF_SHORT_IO
    reply buffer into the pages when req->rq_bulk is NULL.
  * check_write_rcs(), osc_brw_fini_request() and brw_interpret() no longer
    dereference req->rq_bulk without first checking it for NULL.

The blob ids in the header and index line identify the originally published
revision of this patch.

diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index 4052b54..63ae9b0 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c
@@ -58,18 +58,6 @@ struct ptlrpc_request_pool *osc_rq_pool;
 static unsigned int osc_reqpool_mem_max = 5;
 module_param(osc_reqpool_mem_max, uint, 0444);
 
-struct osc_brw_async_args {
-	struct obdo *aa_oa;
-	int aa_requested_nob;
-	int aa_nio_count;
-	u32 aa_page_count;
-	int aa_resends;
-	struct brw_page **aa_ppga;
-	struct client_obd *aa_cli;
-	struct list_head aa_oaps;
-	struct list_head aa_exts;
-};
-
 #define osc_grant_args osc_brw_async_args
 
 struct osc_setattr_args {
@@ -1025,8 +1013,8 @@ static int check_write_rcs(struct ptlrpc_request *req,
 			return(-EPROTO);
 		}
 	}
-
-	if (req->rq_bulk->bd_nob_transferred != requested_nob) {
+	if (req->rq_bulk != NULL &&
+	    req->rq_bulk->bd_nob_transferred != requested_nob) {
 		CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
 		       req->rq_bulk->bd_nob_transferred, requested_nob);
 		return(-EPROTO);
@@ -1119,10 +1107,11 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
 	struct ost_body *body;
 	struct obd_ioobj *ioobj;
 	struct niobuf_remote *niobuf;
-	int niocount, i, requested_nob, opc, rc;
+	int niocount, i, requested_nob, opc, rc, short_io_size = 0;
 	struct osc_brw_async_args *aa;
 	struct req_capsule *pill;
 	struct brw_page *pg_prev;
+	void *short_io_buf;
 
 	ENTRY;
 	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
@@ -1153,6 +1142,20 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
 	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
 			     niocount * sizeof(*niobuf));
 
+	for (i = 0; i < page_count; i++)
+		short_io_size += pga[i]->count;
+
+	/* Check if we can do a short io. */
+	if (!(short_io_size <= cli->cl_short_io_bytes && niocount == 1 &&
+	    imp_connect_shortio(cli->cl_import)))
+		short_io_size = 0;
+
+	req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_CLIENT,
+			     opc == OST_READ ? 0 : short_io_size);
+	if (opc == OST_READ)
+		req_capsule_set_size(pill, &RMF_SHORT_IO, RCL_SERVER,
+				     short_io_size);
+
 	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
 	if (rc) {
 		ptlrpc_request_free(req);
@@ -1160,10 +1163,17 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
 	}
 	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
 	ptlrpc_at_set_req_timeout(req);
+
 	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
 	 * retry logic */
 	req->rq_no_retry_einprogress = 1;
 
+	if (short_io_size != 0) {
+		desc = NULL;
+		short_io_buf = NULL;
+		goto no_bulk;
+	}
+
 	desc = ptlrpc_prep_bulk_imp(req, page_count,
 		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
 		(opc == OST_WRITE ? PTLRPC_BULK_GET_SOURCE :
@@ -1175,7 +1185,7 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
 	if (desc == NULL)
 		GOTO(out, rc = -ENOMEM);
 	/* NB request now owns desc and will free it when it gets freed */
-
+no_bulk:
 	body = req_capsule_client_get(pill, &RMF_OST_BODY);
 	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
 	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
@@ -1190,7 +1200,26 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
 	 * when the RPC is finally sent in ptlrpc_register_bulk(). It sends
 	 * "max - 1" for old client compatibility sending "0", and also so the
 	 * the actual maximum is a power-of-two number, not one less. LU-1431 */
-	ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
+	if (desc != NULL)
+		ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
+	else /* short io */
+		ioobj_max_brw_set(ioobj, 0);
+
+	if (short_io_size != 0) {
+		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
+			body->oa.o_valid |= OBD_MD_FLFLAGS;
+			body->oa.o_flags = 0;
+		}
+		body->oa.o_flags |= OBD_FL_SHORT_IO;
+		CDEBUG(D_CACHE, "Using short io for data transfer, size = %d\n",
+		       short_io_size);
+		if (opc == OST_WRITE) {
+			short_io_buf = req_capsule_client_get(pill,
+							      &RMF_SHORT_IO);
+			LASSERT(short_io_buf != NULL);
+		}
+	}
+
 	LASSERT(page_count > 0);
 	pg_prev = pga[0];
 	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
@@ -1215,9 +1244,19 @@ osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
 			 pg_prev->pg->index, pg_prev->off);
 		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
 			(pg->flag & OBD_BRW_SRVLOCK));
-
-		desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff, pg->count);
-		requested_nob += pg->count;
+		if (short_io_size != 0 && opc == OST_WRITE) {
+			unsigned char *ptr = ll_kmap_atomic(pg->pg, KM_USER0);
+
+			LASSERT(short_io_size >= requested_nob + pg->count);
+			memcpy(short_io_buf + requested_nob,
+			       ptr + poff,
+			       pg->count);
+			ll_kunmap_atomic(ptr, KM_USER0);
+		} else if (short_io_size == 0) {
+			desc->bd_frag_ops->add_kiov_frag(desc, pg->pg, poff,
+							 pg->count);
+		}
+		requested_nob += pg->count;
 
 		if (i > 0 && can_merge_pages(pg_prev, pg)) {
 			niobuf--;
@@ -1486,9 +1525,9 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
 			CERROR("Unexpected +ve rc %d\n", rc);
 			RETURN(-EPROTO);
 		}
-		LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
 
-		if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
+		if (req->rq_bulk != NULL &&
+		    sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
 			RETURN(-EAGAIN);
 
 		if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
@@ -1503,8 +1542,14 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
 
 	/* The rest of this function executes only for OST_READs */
 
-	/* if unwrap_bulk failed, return -EAGAIN to retry */
-	rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
+	if (req->rq_bulk == NULL) {
+		rc = req_capsule_get_size(&req->rq_pill, &RMF_SHORT_IO,
+					  RCL_SERVER);
+		LASSERT(rc == req->rq_status);
+	} else {
+		/* if unwrap_bulk failed, return -EAGAIN to retry */
+		rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
+	}
 	if (rc < 0)
 		GOTO(out, rc = -EAGAIN);
 
@@ -1514,12 +1559,41 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
 		RETURN(-EPROTO);
 	}
 
-	if (rc != req->rq_bulk->bd_nob_transferred) {
+	if (req->rq_bulk != NULL && rc != req->rq_bulk->bd_nob_transferred) {
 		CERROR ("Unexpected rc %d (%d transferred)\n",
 			rc, req->rq_bulk->bd_nob_transferred);
 		return (-EPROTO);
 	}
 
+	if (req->rq_bulk == NULL) {
+		/* short io */
+		int nob, pg_count, i = 0;
+		unsigned char *buf;
+
+		CDEBUG(D_CACHE, "Using short io read, size %d\n", rc);
+		pg_count = aa->aa_page_count;
+		buf = req_capsule_server_sized_get(&req->rq_pill, &RMF_SHORT_IO,
+						   rc);
+		nob = rc;
+		while (nob > 0 && pg_count > 0) {
+			unsigned char *ptr;
+			int count = aa->aa_ppga[i]->count > nob ?
+				    nob : aa->aa_ppga[i]->count;
+
+			CDEBUG(D_CACHE, "page %p count %d\n",
+			       aa->aa_ppga[i]->pg, count);
+			ptr = ll_kmap_atomic(aa->aa_ppga[i]->pg, KM_USER0);
+			memcpy(ptr + (aa->aa_ppga[i]->off & ~PAGE_MASK), buf,
+			       count);
+			ll_kunmap_atomic((void *) ptr, KM_USER0);
+
+			buf += count;
+			nob -= count;
+			i++;
+			pg_count--;
+		}
+	}
+
 	if (rc < aa->aa_requested_nob)
 		handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
 
@@ -1536,7 +1610,8 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
 						  aa->aa_ppga, OST_READ,
 						  cksum_type);
 
-		if (peer->nid != req->rq_bulk->bd_sender) {
+		if (req->rq_bulk != NULL &&
+		    peer->nid != req->rq_bulk->bd_sender) {
 			via = " via ";
 			router = libcfs_nid2str(req->rq_bulk->bd_sender);
 		}
@@ -1710,6 +1785,7 @@ static int brw_interpret(const struct lu_env *env,
 	struct osc_extent *ext;
 	struct osc_extent *tmp;
 	struct client_obd *cli = aa->aa_cli;
+	unsigned long transferred = 0;
 	ENTRY;
 
 	rc = osc_brw_fini_request(req, rc);
@@ -1802,8 +1878,12 @@ static int brw_interpret(const struct lu_env *env,
 	LASSERT(list_empty(&aa->aa_exts));
 	LASSERT(list_empty(&aa->aa_oaps));
 
+	transferred = (req->rq_bulk == NULL ? /* short io */
+		       aa->aa_requested_nob :
+		       req->rq_bulk->bd_nob_transferred);
+
 	osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
-	ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
+	ptlrpc_lprocfs_brw(req, transferred);
 
 	spin_lock(&cli->cl_loi_list_lock);
 	/* We need to decrement before osc_ap_completion->osc_wake_cache_waiters