#define O_LOV_DELAY_CREATE 0100000000 /* hopefully this does not conflict */
-#define LL_FILE_IGNORE_LOCK 0x00000001
-#define LL_FILE_GROUP_LOCKED 0x00000002
-#define LL_FILE_READAHEAD 0x00000004
+#define LL_FILE_IGNORE_LOCK 0x00000001
+#define LL_FILE_GROUP_LOCKED 0x00000002
+#define LL_FILE_READAHEAD 0x00000004
+#define LL_FILE_LOCKED_DIRECTIO 0x00000008 /* client-side locks with dio */
+#define LL_FILE_LOCKLESS_IO 0x00000010 /* server-side locks with cio */
#define LOV_USER_MAGIC_V1 0x0BD10BD0
#define LOV_USER_MAGIC LOV_USER_MAGIC_V1
sbi->ll_lco.lco_flags);
RETURN(0);
}
+
if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
RETURN(0);
+
+ /* server-side locking for dio unless LL_FILE_LOCKED_DIRECTIO */
+ if ((file->f_flags & O_DIRECT) &&
+ !(fd && (fd->fd_flags & LL_FILE_LOCKED_DIRECTIO)))
+ RETURN(1);
+
+ /* server-side locking for cached I/O with LL_FILE_LOCKLESS_IO */
+ if (!(file->f_flags & O_DIRECT) &&
+ fd && fd->fd_flags & LL_FILE_LOCKLESS_IO)
+ RETURN(1);
+
if (test_bit(LLI_F_CONTENDED, &lli->lli_flags)) {
cfs_time_t cur_time = cfs_time_current();
cfs_time_t retry_time;
&tree, OBD_BRW_READ);
up_read(&lli->lli_truncate_rwsem);
} else {
- /* lockless read
- *
- * current time will get into request as atime
- * (lustre/osc/osc_request.c:osc_build_request())
- */
- retval = ll_file_lockless_io(file, iov_copy, nrsegs_copy, ppos,
- READ, chunk);
+ retval = ll_direct_IO(READ, file, iov_copy, *ppos, nr_segs, 0);
+ if (retval > 0) {
+ lprocfs_counter_add(sbi->ll_stats,
+ LPROC_LL_LOCKLESS_READ,
+ (long)retval);
+ *ppos += retval;
+ }
}
ll_rw_stats_tally(sbi, current->pid, file, count, 0);
if (retval > 0) {
*ppos);
#endif
} else {
- /* lockless write
- *
- * current time will get into request as mtime and
- * ctime (lustre/osc/osc_request.c:osc_build_request())
- */
- retval = ll_file_lockless_io(file, iov_copy, nrsegs_copy,
- ppos, WRITE, chunk);
+ retval = ll_direct_IO(WRITE, file, iov_copy, *ppos, nr_segs, 0);
+ if (retval > 0) {
+ lprocfs_counter_add(sbi->ll_stats,
+ LPROC_LL_LOCKLESS_WRITE,
+ (long)retval);
+ *ppos += retval;
+ }
}
ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
int ll_file_punch(struct inode *, loff_t, int);
ssize_t ll_file_lockless_io(struct file *, const struct iovec *,
unsigned long, loff_t *, int, ssize_t);
+ssize_t ll_direct_IO(int rw, struct file *file,const struct iovec *iov,
+ loff_t file_offset, unsigned long nr_segs, int locked);
void ll_clear_file_contended(struct inode*);
int ll_sync_page_range(struct inode *, struct address_space *, loff_t, size_t);
struct ptlrpc_request_set *set,
size_t size, loff_t file_offset,
struct page **pages, int page_count,
- unsigned long user_addr)
+ unsigned long user_addr, int locked)
{
struct brw_page *pga;
int i, rc = 0, pshift;
/* To the end of the page, or the length, whatever is less */
pga[i].count = min_t(int, CFS_PAGE_SIZE -(user_addr & ~CFS_PAGE_MASK),
length);
+
pga[i].flag = OBD_BRW_SYNC;
+ if (!locked)
+ pga[i].flag |= OBD_BRW_SRVLOCK;
+
if (rw == READ)
POISON_PAGE(pages[i], 0x0d);
* then truncate this to be a full-sized RPC. This is 22MB for 4kB pages. */
#define MAX_DIO_SIZE ((128 * 1024 / sizeof(struct brw_page) * CFS_PAGE_SIZE) & \
~(PTLRPC_MAX_BRW_SIZE - 1))
-static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb,
- const struct iovec *iov, loff_t file_offset,
- unsigned long nr_segs)
+
+ssize_t ll_direct_IO(int rw, struct file *file,
+ const struct iovec *iov, loff_t file_offset,
+ unsigned long nr_segs, int locked)
{
- struct file *file = iocb->ki_filp;
struct inode *inode = file->f_mapping->host;
ssize_t count = iov_length(iov, nr_segs);
ssize_t tot_bytes = 0, result = 0;
bytes,
file_offset, pages,
page_count,
- user_addr);
+ user_addr, locked);
ll_free_user_pages(pages, max_pages, rw==READ);
} else if (page_count == 0) {
GOTO(out, result = -EFAULT);
RETURN(tot_bytes);
}
+static ssize_t ll_direct_IO_26(int rw, struct kiocb *kiocb,
+ const struct iovec *iov, loff_t file_offset,
+ unsigned long nr_segs)
+{
+ return ll_direct_IO(rw, kiocb->ki_filp, iov, file_offset, nr_segs, 1);
+}
+
struct address_space_operations ll_aops = {
.readpage = ll_readpage,
// .readpages = ll_readpages,
struct obdo *oa;
obd_count pages_per_brw;
+ /* one page less under unaligned direct i/o */
pages_per_brw = min_t(obd_count, page_count,
- class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);
+ class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc -
+ !!pshift);
pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw,
pshift);