From 22d02329e7fb5b836c9f05c721e23a68645aed48 Mon Sep 17 00:00:00 2001 From: qian Date: Sat, 26 Jan 2008 07:46:20 +0000 Subject: [PATCH] b=13099 i=adilger,deen,shadow fix the directIO path: read hitting the hole or beyond the end of file. Submit I/O to various oscs in one RPC set. --- lustre/include/lustre_net.h | 11 ++++- lustre/include/obd_class.h | 5 --- lustre/llite/rw26.c | 96 ++++++++++++++++++++++++++++--------------- lustre/lov/lov_obd.c | 6 ++- lustre/osc/osc_request.c | 3 -- lustre/ptlrpc/client.c | 31 +++++++++++++- lustre/ptlrpc/ptlrpc_module.c | 1 + lustre/tests/directio.c | 10 +++-- lustre/tests/sanity.sh | 10 +++++ 9 files changed, 124 insertions(+), 49 deletions(-) diff --git a/lustre/include/lustre_net.h b/lustre/include/lustre_net.h index 9e1a929..4f5bc8a 100644 --- a/lustre/include/lustre_net.h +++ b/lustre/include/lustre_net.h @@ -205,16 +205,21 @@ struct ptlrpc_request_set { cfs_waitq_t set_waitq; cfs_waitq_t *set_wakeup_ptr; struct list_head set_requests; + struct list_head set_cblist; /* list of completion callbacks */ set_interpreter_func set_interpret; /* completion callback */ void *set_arg; /* completion context */ - void *set_countp; /* pointer to NOB counter in case - * of directIO (bug11737) */ /* locked so that any old caller can communicate requests to * the set holder who can then fold them into the lock-free set */ spinlock_t set_new_req_lock; struct list_head set_new_requests; }; +struct ptlrpc_set_cbdata { + struct list_head psc_item; + set_interpreter_func psc_interpret; + void *psc_data; +}; + struct ptlrpc_bulk_desc; /* @@ -743,6 +748,8 @@ void ptlrpc_restart_req(struct ptlrpc_request *req); void ptlrpc_abort_inflight(struct obd_import *imp); struct ptlrpc_request_set *ptlrpc_prep_set(void); +int ptlrpc_set_add_cb(struct ptlrpc_request_set *set, + set_interpreter_func fn, void *data); int ptlrpc_set_next_timeout(struct ptlrpc_request_set *); int ptlrpc_check_set(struct ptlrpc_request_set *set); int ptlrpc_set_wait(struct 
ptlrpc_request_set *); diff --git a/lustre/include/obd_class.h b/lustre/include/obd_class.h index 46b1196..3f111f9 100644 --- a/lustre/include/obd_class.h +++ b/lustre/include/obd_class.h @@ -1173,15 +1173,12 @@ static inline int obd_brw_rqset(int cmd, struct obd_export *exp, { struct ptlrpc_request_set *set = NULL; struct obd_info oinfo = { { { 0 } } }; - atomic_t nob; int rc = 0; ENTRY; set = ptlrpc_prep_set(); if (set == NULL) RETURN(-ENOMEM); - atomic_set(&nob, 0); - set->set_countp = &nob; oinfo.oi_oa = oa; oinfo.oi_md = lsm; @@ -1191,8 +1188,6 @@ static inline int obd_brw_rqset(int cmd, struct obd_export *exp, rc = ptlrpc_set_wait(set); if (rc) CERROR("error from callback: rc = %d\n", rc); - else - rc = atomic_read(&nob); } else { CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "error from obd_brw_async: rc = %d\n", rc); diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c index 85450c1..d2e625c 100644 --- a/lustre/llite/rw26.c +++ b/lustre/llite/rw26.c @@ -145,16 +145,14 @@ static void ll_free_user_pages(struct page **pages, int npages, int do_dirty) static ssize_t ll_direct_IO_26_seg(int rw, struct inode *inode, struct address_space *mapping, - struct lov_stripe_md *lsm, + struct obd_info *oinfo, + struct ptlrpc_request_set *set, size_t size, loff_t file_offset, struct page **pages, int page_count) { struct brw_page *pga; - struct obdo oa; - int opc, i, rc = 0; + int i, rc = 0; size_t length; - struct obd_capa *ocapa; - loff_t file_offset_orig = file_offset; ENTRY; OBD_ALLOC(pga, sizeof(*pga) * page_count); @@ -177,28 +175,11 @@ static ssize_t ll_direct_IO_26_seg(int rw, struct inode *inode, POISON_PAGE(pages[i], 0x0d); } - ll_inode_fill_obdo(inode, rw == WRITE ? 
OBD_BRW_WRITE : OBD_BRW_READ, &oa); - - if (rw == WRITE) { - lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, - LPROC_LL_DIRECT_WRITE, size); - opc = CAPA_OPC_OSS_WRITE; - llap_write_pending(inode, NULL); - } else { - lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, - LPROC_LL_DIRECT_READ, size); - opc = CAPA_OPC_OSS_RW; - } - ocapa = ll_osscapa_get(inode, opc); - rc = obd_brw_rqset(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ, - ll_i2dtexp(inode), &oa, lsm, page_count, pga, NULL, - ocapa); - capa_put(ocapa); - if ((rc > 0) && (rw == WRITE)) { - lov_stripe_lock(lsm); - obd_adjust_kms(ll_i2dtexp(inode), lsm, file_offset_orig + rc, 0); - lov_stripe_unlock(lsm); - } + rc = obd_brw_async(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ, + ll_i2dtexp(inode), oinfo, page_count, + pga, NULL, set); + if (rc == 0) + rc = size; OBD_FREE(pga, sizeof(*pga) * page_count); RETURN(rc); @@ -218,8 +199,13 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb, struct inode *inode = file->f_mapping->host; ssize_t count = iov_length(iov, nr_segs), tot_bytes = 0; struct ll_inode_info *lli = ll_i2info(inode); + struct lov_stripe_md *lsm = lli->lli_smd; + struct ptlrpc_request_set *set; + struct obd_info oinfo; + struct obdo oa; unsigned long seg = 0; size_t size = MAX_DIO_SIZE; + int opc; ENTRY; if (!lli->lli_smd || !lli->lli_smd->lsm_object_id) @@ -235,10 +221,14 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb, file_offset, file_offset, count >> CFS_PAGE_SHIFT, MAX_DIO_SIZE >> CFS_PAGE_SHIFT); - if (rw == WRITE) + if (rw == WRITE) { ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_DIRECT_WRITE, count); - else + opc = CAPA_OPC_OSS_WRITE; + llap_write_pending(inode, NULL); + } else { ll_stats_ops_tally(ll_i2sbi(inode), LPROC_LL_DIRECT_READ, count); + opc = CAPA_OPC_OSS_RW; + } /* Check that all user buffers are aligned as well */ for (seg = 0; seg < nr_segs; seg++) { @@ -247,10 +237,31 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb, RETURN(-EINVAL); } + set = 
ptlrpc_prep_set(); + if (set == NULL) + RETURN(-ENOMEM); + + ll_inode_fill_obdo(inode, rw, &oa); + oinfo.oi_oa = &oa; + oinfo.oi_md = lsm; + oinfo.oi_capa = ll_osscapa_get(inode, opc); + + /* need locking between buffered and direct access. and race with + *size changing by concurrent truncates and writes. */ + if (rw == READ) + LOCK_INODE_MUTEX(inode); + for (seg = 0; seg < nr_segs; seg++) { size_t iov_left = iov[seg].iov_len; unsigned long user_addr = (unsigned long)iov[seg].iov_base; + if (rw == READ) { + if (file_offset >= inode->i_size) + break; + if (file_offset + iov_left > inode->i_size) + iov_left = inode->i_size - file_offset; + } + while (iov_left > 0) { struct page **pages; int page_count; @@ -263,7 +274,7 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb, if (page_count > 0) { result = ll_direct_IO_26_seg(rw, inode, file->f_mapping, - lli->lli_smd, + &oinfo, set, min(size,iov_left), file_offset, pages, page_count); @@ -288,9 +299,9 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb, continue; } - if (tot_bytes > 0) - RETURN(tot_bytes); - RETURN(page_count < 0 ? page_count : result); + if (tot_bytes <= 0) + tot_bytes = page_count < 0 ? 
page_count : result; + GOTO(out, tot_bytes); } tot_bytes += result; @@ -299,6 +310,25 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb, user_addr += result; } } +out: + if (rw == READ) + UNLOCK_INODE_MUTEX(inode); + + if (tot_bytes > 0) { + int rc; + + rc = ptlrpc_set_wait(set); + if (rc) { + tot_bytes = rc; + } else if (rw == WRITE) { + lov_stripe_lock(lsm); + obd_adjust_kms(ll_i2dtexp(inode), lsm, file_offset, 0); + lov_stripe_unlock(lsm); + } + } + + capa_put(oinfo.oi_capa); + ptlrpc_set_destroy(set); RETURN(tot_bytes); } diff --git a/lustre/lov/lov_obd.c b/lustre/lov/lov_obd.c index 7383ae6..73292c8 100644 --- a/lustre/lov/lov_obd.c +++ b/lustre/lov/lov_obd.c @@ -1593,8 +1593,10 @@ static int lov_brw_async(int cmd, struct obd_export *exp, } LASSERT(rc == 0); LASSERT(set->set_interpret == NULL); - set->set_interpret = (set_interpreter_func)lov_brw_interpret; - set->set_arg = (void *)lovset; + LASSERT(set->set_arg == NULL); + rc = ptlrpc_set_add_cb(set, lov_brw_interpret, lovset); + if (rc) + GOTO(out, rc); RETURN(rc); out: diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index fbbc634..ba98d4e 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -1404,7 +1404,6 @@ static int brw_interpret(struct ptlrpc_request *req, void *data, int rc) { struct osc_brw_async_args *aa = data; int i; - int nob = rc; ENTRY; rc = osc_brw_fini_request(req, rc); @@ -1413,8 +1412,6 @@ static int brw_interpret(struct ptlrpc_request *req, void *data, int rc) if (rc == 0) RETURN(0); } - if ((rc >= 0) && req->rq_set && req->rq_set->set_countp) - atomic_add(nob, (atomic_t *)req->rq_set->set_countp); client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock); if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) diff --git a/lustre/ptlrpc/client.c b/lustre/ptlrpc/client.c index 55a5ee2..f46f9e3 100644 --- a/lustre/ptlrpc/client.c +++ b/lustre/ptlrpc/client.c @@ -436,7 +436,8 @@ struct ptlrpc_request_set *ptlrpc_prep_set(void) set->set_remaining = 0; 
spin_lock_init(&set->set_new_req_lock); CFS_INIT_LIST_HEAD(&set->set_new_requests); - + CFS_INIT_LIST_HEAD(&set->set_cblist); + RETURN(set); } @@ -495,6 +496,22 @@ void ptlrpc_set_destroy(struct ptlrpc_request_set *set) EXIT; } +int ptlrpc_set_add_cb(struct ptlrpc_request_set *set, + set_interpreter_func fn, void *data) +{ + struct ptlrpc_set_cbdata *cbdata; + + OBD_ALLOC_PTR(cbdata); + if (cbdata == NULL) + RETURN(-ENOMEM); + + cbdata->psc_interpret = fn; + cbdata->psc_data = data; + list_add_tail(&cbdata->psc_item, &set->set_cblist); + + RETURN(0); +} + void ptlrpc_set_add_req(struct ptlrpc_request_set *set, struct ptlrpc_request *req) { @@ -1274,6 +1291,18 @@ int ptlrpc_set_wait(struct ptlrpc_request_set *set) int (*interpreter)(struct ptlrpc_request_set *set,void *,int) = set->set_interpret; rc = interpreter (set, set->set_arg, rc); + } else { + struct ptlrpc_set_cbdata *cbdata, *n; + int err; + + list_for_each_entry_safe(cbdata, n, + &set->set_cblist, psc_item) { + list_del_init(&cbdata->psc_item); + err = cbdata->psc_interpret(set, cbdata->psc_data, rc); + if (err && !rc) + rc = err; + OBD_FREE_PTR(cbdata); + } } RETURN(rc); diff --git a/lustre/ptlrpc/ptlrpc_module.c b/lustre/ptlrpc/ptlrpc_module.c index aa8db8c..768e892 100644 --- a/lustre/ptlrpc/ptlrpc_module.c +++ b/lustre/ptlrpc/ptlrpc_module.c @@ -174,6 +174,7 @@ EXPORT_SYMBOL(ptlrpc_retain_replayable_request); EXPORT_SYMBOL(ptlrpc_next_xid); EXPORT_SYMBOL(ptlrpc_prep_set); +EXPORT_SYMBOL(ptlrpc_set_add_cb); EXPORT_SYMBOL(ptlrpc_set_add_req); EXPORT_SYMBOL(ptlrpc_set_add_new_req); EXPORT_SYMBOL(ptlrpc_set_destroy); diff --git a/lustre/tests/directio.c b/lustre/tests/directio.c index ebcedb2..1108cba 100644 --- a/lustre/tests/directio.c +++ b/lustre/tests/directio.c @@ -23,11 +23,12 @@ int main(int argc, char **argv) long len; off64_t seek; struct stat64 st; + char pad = 0xba; int action; int rc; if (argc < 5 || argc > 6) { - printf("Usage: %s file seek nr_blocks [blocksize]\n", argv[0]); + printf("Usage: 
%s file seek nr_blocks [blocksize]\n", argv[0]); return 1; } @@ -37,7 +38,10 @@ int main(int argc, char **argv) action = O_WRONLY; else if (!strcmp(argv[1], "rdwr")) action = O_RDWR; - else { + else if (!strcmp(argv[1], "readhole")) { + action = O_RDONLY; + pad = 0; + } else { printf("Usage: %s file seek nr_blocks [blocksize]\n", argv[0]); return 1; } @@ -74,7 +78,7 @@ int main(int argc, char **argv) printf("No memory %s\n", strerror(errno)); return 1; } - memset(wbuf, 0xba, len); + memset(wbuf, pad, len); if (action == O_WRONLY || action == O_RDWR) { if (lseek64(fd, seek, SEEK_SET) < 0) { diff --git a/lustre/tests/sanity.sh b/lustre/tests/sanity.sh index 3ddf3bd..a29a855 100644 --- a/lustre/tests/sanity.sh +++ b/lustre/tests/sanity.sh @@ -4482,9 +4482,19 @@ test_119b() # bug 11737 sync multiop $DIR/$tfile oO_RDONLY:O_DIRECT:r$((2048 * 1024)) || \ error "direct read failed" + rm -f $DIR/$tfile } run_test 119b "Sparse directIO read must return actual read amount" +test_119c() # bug 13099 +{ + BSIZE=1048576 + directio write $DIR/$tfile 3 1 $BSIZE || error "direct write failed" + directio readhole $DIR/$tfile 0 2 $BSIZE || error "reading hole failed" + rm -f $DIR/$tfile +} +run_test 119c "Testing for direct read hitting hole" + LDLM_POOL_CTL_RECALC=1 LDLM_POOL_CTL_SHRINK=2 -- 1.8.3.1