From e4472c979c809ee4ba8c14b933c2e53713af1845 Mon Sep 17 00:00:00 2001 From: anserper Date: Tue, 5 May 2009 20:57:32 +0000 Subject: [PATCH] i=Johann Lombardi i=Alexander Zarochentsev b=18801 allow unaligned direct I/O --- lustre/include/lustre_net.h | 2 +- lustre/include/obd.h | 2 +- lustre/include/obd_class.h | 7 ++--- lustre/include/obd_ost.h | 1 + lustre/llite/rw26.c | 39 ++++++++++++++-------------- lustre/lov/lov_obd.c | 4 +-- lustre/obdecho/echo_client.c | 2 +- lustre/osc/osc_internal.h | 2 +- lustre/osc/osc_request.c | 61 +++++++++++++++++++++++++------------------- 9 files changed, 66 insertions(+), 54 deletions(-) diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index de90bd1..86dec19 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -187,7 +187,7 @@ union ptlrpc_async_args { * a pointer to it here. The pointer_arg ensures this struct is at * least big enough for that. */ void *pointer_arg[9]; - __u64 space[4]; + __u64 space[5]; }; struct ptlrpc_request_set; diff --git a/lustre/include/obd.h b/lustre/include/obd.h index cfeb383..4bc3435 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -1135,7 +1135,7 @@ struct obd_ops { int (*o_brw_async)(int rw, struct obd_export *exp, struct obd_info *oinfo, obd_count oa_bufs, struct brw_page *pgarr, struct obd_trans_info *oti, - struct ptlrpc_request_set *); + struct ptlrpc_request_set *, int pshift); int (*o_prep_async_page)(struct obd_export *exp, struct lov_stripe_md *lsm, struct lov_oinfo *loi, diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index 8088f0d..0517955 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -1062,7 +1062,7 @@ static inline int obd_brw(int cmd, struct obd_export *exp, static inline int obd_brw_async(int cmd, struct obd_export *exp, struct obd_info *oinfo, obd_count oa_bufs, struct brw_page *pg, struct obd_trans_info *oti, - struct ptlrpc_request_set *set) + struct ptlrpc_request_set *set, int pshift) { int rc; ENTRY; @@ -1075,7 +1075,8 @@ static inline int obd_brw_async(int cmd, struct obd_export *exp, LBUG(); } - rc = OBP(exp->exp_obd, brw_async)(cmd, exp, oinfo, oa_bufs, pg,oti,set); + rc = OBP(exp->exp_obd, brw_async)(cmd, exp, oinfo, oa_bufs, + pg, oti,set, pshift); RETURN(rc); } @@ -1095,7 +1096,7 @@ static inline int obd_brw_rqset(int cmd, struct obd_export *exp, oinfo.oi_oa = oa; oinfo.oi_md = lsm; - rc = obd_brw_async(cmd, exp, &oinfo, oa_bufs, pg, oti, set); + rc = obd_brw_async(cmd, exp, &oinfo, oa_bufs, pg, oti, set, 0); if (rc == 0) { rc = ptlrpc_set_wait(set); if (rc) diff --git a/lustre/include/obd_ost.h b/lustre/include/obd_ost.h index 5ae31e9..3195586 100644 --- a/lustre/include/obd_ost.h +++ b/lustre/include/obd_ost.h @@ -51,6 +51,7 @@ struct osc_brw_async_args { int aa_nio_count; obd_count aa_page_count; int aa_resends; + int aa_pshift; struct brw_page **aa_ppga; struct client_obd *aa_cli; struct list_head aa_oaps; diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c index 32c0162..3b53d54 100644 --- a/lustre/llite/rw26.c +++ b/lustre/llite/rw26.c @@ -160,10 +160,11 @@ static ssize_t ll_direct_IO_26_seg(int rw, struct inode *inode, struct obd_info *oinfo, struct ptlrpc_request_set *set, size_t size, loff_t file_offset, - struct page **pages, int page_count) + struct page **pages, int page_count, + unsigned long user_addr) { struct brw_page *pga; - int i, rc = 0; + int i, rc = 0, pshift; size_t length; ENTRY; @@ -174,21 +175,32 @@ static ssize_t ll_direct_IO_26_seg(int rw, struct inode *inode, RETURN(-ENOMEM); } - for (i = 0, length = size; length > 0; - length -=pga[i].count, file_offset +=pga[i].count,i++) {/*i last!*/ + /* + * pshift is something we'll add to ->off to get the in-memory offset, + * also see the OSC_FILE2MEM_OFF macro + */ + pshift = (user_addr & ~CFS_PAGE_MASK) - (file_offset & ~CFS_PAGE_MASK); + + for (i = 0, length = size; length > 0; i++) {/*i last!*/ + LASSERT(i < page_count); + pga[i].pg = pages[i]; pga[i].off = file_offset; /* To the end of the page, or the length, whatever is less */ - pga[i].count = min_t(int, CFS_PAGE_SIZE -(file_offset & ~CFS_PAGE_MASK), + pga[i].count = min_t(int, CFS_PAGE_SIZE -(user_addr & ~CFS_PAGE_MASK), length); pga[i].flag = OBD_BRW_SYNC; if (rw == READ) POISON_PAGE(pages[i], 0x0d); + + length -= pga[i].count; + file_offset += pga[i].count; + user_addr += pga[i].count; } rc = obd_brw_async(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ, ll_i2obdexp(inode), oinfo, page_count, - pga, NULL, set); + pga, NULL, set, pshift); if (rc == 0) rc = size; @@ -221,10 +233,6 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb, if (!lli->lli_smd || !lli->lli_smd->lsm_object_id) RETURN(-EBADF); - /* FIXME: io smaller than CFS_PAGE_SIZE is broken on ia64 ??? */ - if ((file_offset & (~CFS_PAGE_MASK)) || (count & ~CFS_PAGE_MASK)) - RETURN(-EINVAL); - CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p), size="LPSZ" (max %lu), " "offset=%lld=%llx, pages "LPSZ" (max %lu)\n", inode->i_ino, inode->i_generation, inode, count, MAX_DIO_SIZE, @@ -236,13 +244,6 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb, else ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_DIRECT_READ, count); - /* Check that all user buffers are aligned as well */ - for (seg = 0; seg < nr_segs; seg++) { - if (((unsigned long)iov[seg].iov_base & ~CFS_PAGE_MASK) || - (iov[seg].iov_len & ~CFS_PAGE_MASK)) - RETURN(-EINVAL); - } - set = ptlrpc_prep_set(); if (set == NULL) RETURN(-ENOMEM); @@ -255,7 +256,6 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb, *size changing by concurrent truncates and writes. */ if (rw == READ) LOCK_INODE_MUTEX(inode); - for (seg = 0; seg < nr_segs; seg++) { size_t iov_left = iov[seg].iov_len; unsigned long user_addr = (unsigned long)iov[seg].iov_base; @@ -282,7 +282,8 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb, &oinfo, set, min(size,iov_left), file_offset, pages, - page_count); + page_count, + user_addr); ll_free_user_pages(pages, page_count, rw==READ); } else { result = 0; diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index bfeab55..4ee1021 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -1764,7 +1764,7 @@ static int lov_brw_interpret(struct ptlrpc_request_set *reqset, void *data, static int lov_brw_async(int cmd, struct obd_export *exp, struct obd_info *oinfo, obd_count oa_bufs, struct brw_page *pga, struct obd_trans_info *oti, - struct ptlrpc_request_set *set) + struct ptlrpc_request_set *set, int pshift) { struct lov_request_set *lovset; struct lov_request *req; @@ -1793,7 +1793,7 @@ static int lov_brw_async(int cmd, struct obd_export *exp, sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp; sub_pga = lovset->set_pga + req->rq_pgaidx; rc = obd_brw_async(cmd, sub_exp, &req->rq_oi, req->rq_oabufs, - sub_pga, oti, set); + sub_pga, oti, set, pshift); if (rc) GOTO(out, rc); lov_update_common_set(lovset, req, rc); diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index 8ed0815..c0c822e 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -577,7 +577,7 @@ static int echo_client_kbrw(struct obd_device *obd, int rw, struct obdo *oa, oinfo.oi_oa = oa; oinfo.oi_md = lsm; - rc = obd_brw_async(rw, ec->ec_exp, &oinfo, npages, pga, oti, set); + rc = obd_brw_async(rw, ec->ec_exp, &oinfo, npages, pga, oti, set, 0); if (rc == 0) { rc = ptlrpc_set_wait(set); if (rc) diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h index 393e6d9..a8d6e84 100644 --- a/lustre/osc/osc_internal.h +++ b/lustre/osc/osc_internal.h @@ -130,5 +130,5 @@ static inline int osc_exp_is_2_0_server(struct obd_export *exp) { return !!(exp->exp_connect_flags & OBD_CONNECT_FID); } - +#define OSC_FILE2MEM_OFF(fileoff,pshift) ((fileoff) + (pshift)) #endif /* OSC_INTERNAL_H */ diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index df93f5e..1373bd1 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -1070,7 +1070,7 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2) static obd_count osc_checksum_bulk(int nob, obd_count pg_count, struct brw_page **pga, int opc, - cksum_type_t cksum_type) + cksum_type_t cksum_type, int pshift) { __u32 cksum; int i = 0; @@ -1079,7 +1079,7 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count, cksum = init_checksum(cksum_type); while (nob > 0 && pg_count > 0) { unsigned char *ptr = cfs_kmap(pga[i]->pg); - int off = pga[i]->off & ~CFS_PAGE_MASK; + int off = OSC_FILE2MEM_OFF(pga[i]->off, pshift) & ~CFS_PAGE_MASK; int count = pga[i]->count > nob ? nob : pga[i]->count; /* corrupt the data before we compute the checksum, to @@ -1107,7 +1107,7 @@ static obd_count osc_checksum_bulk(int nob, obd_count pg_count, static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, struct lov_stripe_md *lsm, obd_count page_count, struct brw_page **pga, - struct ptlrpc_request **reqp) + struct ptlrpc_request **reqp, int pshift) { struct ptlrpc_request *req; struct ptlrpc_bulk_desc *desc; @@ -1168,9 +1168,10 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, struct brw_page *pg = pga[i]; LASSERT(pg->count > 0); - LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE, - "i: %d pg: %p off: "LPU64", count: %u\n", i, pg, - pg->off, pg->count); + LASSERTF((OSC_FILE2MEM_OFF(pg->off, pshift) & ~CFS_PAGE_MASK) + + pg->count <= CFS_PAGE_SIZE, + "i: %d pg: %p off: "LPU64", count: %u, shift: %d\n", + i, pg, pg->off, pg->count, pshift); #ifdef __linux__ LASSERTF(i == 0 || pg->off > pg_prev->off, "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64 @@ -1186,7 +1187,8 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) == (pg->flag & OBD_BRW_SRVLOCK)); - ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK, + ptlrpc_prep_bulk_page(desc, pg->pg, + OSC_FILE2MEM_OFF(pg->off,pshift)&~CFS_PAGE_MASK, pg->count); requested_nob += pg->count; @@ -1228,7 +1230,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, body->oa.o_cksum = osc_checksum_bulk(requested_nob, page_count, pga, OST_WRITE, - cksum_type); + cksum_type, pshift); CDEBUG(D_PAGE, "checksum at write origin: %x\n", body->oa.o_cksum); /* save this in 'oa', too, for later checking */ @@ -1263,6 +1265,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, aa->aa_resends = 0; aa->aa_ppga = pga; aa->aa_cli = cli; + aa->aa_pshift = pshift; CFS_INIT_LIST_HEAD(&aa->aa_oaps); *reqp = req; @@ -1276,7 +1279,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa, static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer, __u32 client_cksum, __u32 server_cksum, int nob, obd_count page_count, struct brw_page **pga, - cksum_type_t client_cksum_type) + cksum_type_t client_cksum_type, int pshift) { __u32 new_cksum; char *msg; @@ -1293,7 +1296,7 @@ static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer, cksum_type = OBD_CKSUM_CRC32; new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE, - cksum_type); + cksum_type, pshift); if (cksum_type != client_cksum_type) msg = "the server did not use the checksum type specified in " @@ -1373,7 +1376,8 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) check_write_checksum(&body->oa, peer, client_cksum, body->oa.o_cksum, aa->aa_requested_nob, aa->aa_page_count, aa->aa_ppga, - cksum_type_unpack(aa->aa_oa->o_flags))) + cksum_type_unpack(aa->aa_oa->o_flags), + aa->aa_pshift)) RETURN(-EAGAIN); rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count, @@ -1410,7 +1414,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc) cksum_type = OBD_CKSUM_CRC32; client_cksum = osc_checksum_bulk(rc, aa->aa_page_count, aa->aa_ppga, OST_READ, - cksum_type); + cksum_type, aa->aa_pshift); if (peer->nid == req->rq_bulk->bd_sender) { via = router = ""; @@ -1485,7 +1489,7 @@ static int osc_brw_internal(int cmd, struct obd_export *exp,struct obdo *oa, restart_bulk: rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm, - page_count, pga, &request); + page_count, pga, &request, 0); if (rc != 0) return (rc); @@ -1536,7 +1540,8 @@ int osc_brw_redo_request(struct ptlrpc_request *request, OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ, aa->aa_cli, aa->aa_oa, NULL /* lsm unused by osc currently */, - aa->aa_page_count, aa->aa_ppga, &new_req); + aa->aa_page_count, aa->aa_ppga, &new_req, + aa->aa_pshift); if (rc) RETURN(rc); @@ -1588,7 +1593,8 @@ int osc_brw_redo_request(struct ptlrpc_request *request, static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa, struct lov_stripe_md *lsm, obd_count page_count, - struct brw_page **pga, struct ptlrpc_request_set *set) + struct brw_page **pga, struct ptlrpc_request_set *set, + int pshift) { struct ptlrpc_request *request; struct client_obd *cli = &exp->exp_obd->u.cli; @@ -1598,7 +1604,8 @@ static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa, /* Consume write credits even if doing a sync write - * otherwise we may run out of space on OST due to grant. */ - if (cmd == OBD_BRW_WRITE) { + /* Badly aligned writes are not subject to write granting */ + if (cmd == OBD_BRW_WRITE && pshift == 0) { client_obd_list_lock(&cli->cl_loi_list_lock); for (i = 0; i < page_count; i++) { if (cli->cl_avail_grant >= CFS_PAGE_SIZE) @@ -1608,7 +1615,7 @@ static int async_internal(int cmd, struct obd_export *exp, struct obdo *oa, } rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm, - page_count, pga, &request); + page_count, pga, &request, pshift); CLASSERT(sizeof(*aa) <= sizeof(request->rq_async_args)); aa = ptlrpc_req_async_args(request); @@ -1676,14 +1683,15 @@ static void sort_brw_pages(struct brw_page **array, int num) } while (stride > 1); } -static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages) +static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages, + int pshift) { int count = 1; int offset; int i = 0; LASSERT (pages > 0); - offset = pg[i]->off & (~CFS_PAGE_MASK); + offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK; for (;;) { pages--; @@ -1694,7 +1702,7 @@ static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages) return count; /* doesn't end on page boundary */ i++; - offset = pg[i]->off & (~CFS_PAGE_MASK); + offset = OSC_FILE2MEM_OFF(pg[i]->off, pshift) & ~CFS_PAGE_MASK; if (offset != 0) /* doesn't start on page boundary */ return count; @@ -1764,7 +1772,7 @@ static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo, else pages_per_brw = page_count; - pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw); + pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw, 0); if (saved_oa != NULL) { /* restore previously saved oa */ @@ -1799,7 +1807,7 @@ out: static int osc_brw_async(int cmd, struct obd_export *exp, struct obd_info *oinfo, obd_count page_count, struct brw_page *pga, struct obd_trans_info *oti, - struct ptlrpc_request_set *set) + struct ptlrpc_request_set *set, int pshift) { struct brw_page **ppga, **orig; int page_count_orig; @@ -1830,7 +1838,8 @@ static int osc_brw_async(int cmd, struct obd_export *exp, pages_per_brw = min_t(obd_count, page_count, class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc); - pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw); + pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw, + pshift); /* use ppga only if single RPC is going to fly */ if (pages_per_brw != page_count_orig || ppga != orig) { @@ -1853,7 +1862,7 @@ static int osc_brw_async(int cmd, struct obd_export *exp, } rc = async_internal(cmd, exp, oa, oinfo->oi_md, pages_per_brw, - copy, set); + copy, set, pshift); if (rc != 0) { if (copy != ppga) @@ -2182,7 +2191,7 @@ static int brw_interpret(struct ptlrpc_request *request, void *data, int rc) obd_count i; for (i = 0; i < aa->aa_page_count; i++) osc_release_write_grant(aa->aa_cli, aa->aa_ppga[i], 1); - + if (aa->aa_oa->o_flags & OBD_FL_TEMPORARY) OBDO_FREE(aa->aa_oa); } @@ -2244,7 +2253,7 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli, } sort_brw_pages(pga, page_count); - rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req); + rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count, pga, &req, 0); if (rc != 0) { CERROR("prep_req failed: %d\n", rc); GOTO(out, req = ERR_PTR(rc)); -- 1.8.3.1