From 57c88a7f7819fc326a81413a7ea12c168813415d Mon Sep 17 00:00:00 2001 From: anserper Date: Tue, 9 Jun 2009 19:11:36 +0000 Subject: [PATCH] b=18801 i=Alexander Zarochentsev i=Oleg Drokin replace lockless I/O with direct I/O --- lustre/include/lustre/lustre_user.h | 8 +++++--- lustre/llite/file.c | 40 ++++++++++++++++++++++++------------- lustre/llite/llite_internal.h | 2 ++ lustre/llite/rw26.c | 23 +++++++++++++++------ lustre/osc/osc_request.c | 4 +++- 5 files changed, 53 insertions(+), 24 deletions(-) diff --git a/lustre/include/lustre/lustre_user.h b/lustre/include/lustre/lustre_user.h index dbeaa1e..c88602d 100644 --- a/lustre/include/lustre/lustre_user.h +++ b/lustre/include/lustre/lustre_user.h @@ -114,9 +114,11 @@ struct obd_statfs; #define O_LOV_DELAY_CREATE 0100000000 /* hopefully this does not conflict */ -#define LL_FILE_IGNORE_LOCK 0x00000001 -#define LL_FILE_GROUP_LOCKED 0x00000002 -#define LL_FILE_READAHEAD 0x00000004 +#define LL_FILE_IGNORE_LOCK 0x00000001 +#define LL_FILE_GROUP_LOCKED 0x00000002 +#define LL_FILE_READAHEAD 0x00000004 +#define LL_FILE_LOCKED_DIRECTIO 0x00000008 /* client-side locks with direct I/O */ +#define LL_FILE_LOCKLESS_IO 0x00000010 /* server-side locks with cached I/O */ #define LOV_USER_MAGIC_V1 0x0BD10BD0 #define LOV_USER_MAGIC LOV_USER_MAGIC_V1 diff --git a/lustre/llite/file.c b/lustre/llite/file.c index b16def4..e3a5c94 100644 --- a/lustre/llite/file.c +++ b/lustre/llite/file.c @@ -1145,8 +1145,20 @@ static int ll_is_file_contended(struct file *file) sbi->ll_lco.lco_flags); RETURN(0); } + if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) RETURN(0); + + /* server-side locking for direct I/O unless LL_FILE_LOCKED_DIRECTIO is set */ + if ((file->f_flags & O_DIRECT) && + !(fd && (fd->fd_flags & LL_FILE_LOCKED_DIRECTIO))) + RETURN(1); + + /* server-side locking for cached I/O with LL_FILE_LOCKLESS_IO */ + if (!(file->f_flags & O_DIRECT) && + fd && fd->fd_flags & LL_FILE_LOCKLESS_IO) + RETURN(1); + if (test_bit(LLI_F_CONTENDED, &lli->lli_flags)) { cfs_time_t
cur_time = cfs_time_current(); cfs_time_t retry_time; @@ -1631,13 +1643,13 @@ repeat: &tree, OBD_BRW_READ); up_read(&lli->lli_truncate_rwsem); } else { - /* lockless read - * - * current time will get into request as atime - * (lustre/osc/osc_request.c:osc_build_request()) - */ - retval = ll_file_lockless_io(file, iov_copy, nrsegs_copy, ppos, - READ, chunk); + retval = ll_direct_IO(READ, file, iov_copy, *ppos, nr_segs, 0); + if (retval > 0) { + lprocfs_counter_add(sbi->ll_stats, + LPROC_LL_LOCKLESS_READ, + (long)retval); + *ppos += retval; + } } ll_rw_stats_tally(sbi, current->pid, file, count, 0); if (retval > 0) { @@ -1836,13 +1848,13 @@ repeat: *ppos); #endif } else { - /* lockless write - * - * current time will get into request as mtime and - * ctime (lustre/osc/osc_request.c:osc_build_request()) - */ - retval = ll_file_lockless_io(file, iov_copy, nrsegs_copy, - ppos, WRITE, chunk); + retval = ll_direct_IO(WRITE, file, iov_copy, *ppos, nr_segs, 0); + if (retval > 0) { + lprocfs_counter_add(sbi->ll_stats, + LPROC_LL_LOCKLESS_WRITE, + (long)retval); + *ppos += retval; + } } ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1); diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 933b081..d89e9ef 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -750,6 +750,8 @@ void ll_truncate(struct inode *inode); int ll_file_punch(struct inode *, loff_t, int); ssize_t ll_file_lockless_io(struct file *, const struct iovec *, unsigned long, loff_t *, int, ssize_t); +ssize_t ll_direct_IO(int rw, struct file *file,const struct iovec *iov, + loff_t file_offset, unsigned long nr_segs, int locked); void ll_clear_file_contended(struct inode*); int ll_sync_page_range(struct inode *, struct address_space *, loff_t, size_t); diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c index 9bfbf6a..2be2987 100644 --- a/lustre/llite/rw26.c +++ b/lustre/llite/rw26.c @@ -162,7 +162,7 @@ static ssize_t 
ll_direct_IO_26_seg(int rw, struct inode *inode, struct ptlrpc_request_set *set, size_t size, loff_t file_offset, struct page **pages, int page_count, - unsigned long user_addr) + unsigned long user_addr, int locked) { struct brw_page *pga; int i, rc = 0, pshift; @@ -190,7 +190,11 @@ static ssize_t ll_direct_IO_26_seg(int rw, struct inode *inode, /* To the end of the page, or the length, whatever is less */ pga[i].count = min_t(int, CFS_PAGE_SIZE -(user_addr & ~CFS_PAGE_MASK), length); + pga[i].flag = OBD_BRW_SYNC; + if (!locked) + pga[i].flag |= OBD_BRW_SRVLOCK; + if (rw == READ) POISON_PAGE(pages[i], 0x0d); @@ -215,11 +219,11 @@ static ssize_t ll_direct_IO_26_seg(int rw, struct inode *inode, * then truncate this to be a full-sized RPC. This is 22MB for 4kB pages. */ #define MAX_DIO_SIZE ((128 * 1024 / sizeof(struct brw_page) * CFS_PAGE_SIZE) & \ ~(PTLRPC_MAX_BRW_SIZE - 1)) -static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb, - const struct iovec *iov, loff_t file_offset, - unsigned long nr_segs) + +ssize_t ll_direct_IO(int rw, struct file *file, + const struct iovec *iov, loff_t file_offset, + unsigned long nr_segs, int locked) { - struct file *file = iocb->ki_filp; struct inode *inode = file->f_mapping->host; ssize_t count = iov_length(iov, nr_segs); ssize_t tot_bytes = 0, result = 0; @@ -287,7 +291,7 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb, bytes, file_offset, pages, page_count, - user_addr); + user_addr, locked); ll_free_user_pages(pages, max_pages, rw==READ); } else if (page_count == 0) { GOTO(out, result = -EFAULT); @@ -342,6 +346,13 @@ unlock_mutex: RETURN(tot_bytes); } +static ssize_t ll_direct_IO_26(int rw, struct kiocb *kiocb, + const struct iovec *iov, loff_t file_offset, + unsigned long nr_segs) +{ + return ll_direct_IO(rw, kiocb->ki_filp, iov, file_offset, nr_segs, 1); +} + struct address_space_operations ll_aops = { .readpage = ll_readpage, // .readpages = ll_readpages, diff --git a/lustre/osc/osc_request.c 
b/lustre/osc/osc_request.c index 4f64d97..38d0ad0 100644 --- a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -1871,8 +1871,10 @@ static int osc_brw_async(int cmd, struct obd_export *exp, struct obdo *oa; obd_count pages_per_brw; + /* allow one page fewer per RPC for unaligned direct I/O (pshift != 0) */ pages_per_brw = min_t(obd_count, page_count, - class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc); + class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc - + !!pshift); pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw, pshift); -- 1.8.3.1