Whamcloud - gitweb
b=18801
author anserper <anserper>
Tue, 9 Jun 2009 19:11:36 +0000 (19:11 +0000)
committer anserper <anserper>
Tue, 9 Jun 2009 19:11:36 +0000 (19:11 +0000)
i=Alexander Zarochentsev
i=Oleg Drokin

replace lockless I/O with direct I/O

lustre/include/lustre/lustre_user.h
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/rw26.c
lustre/osc/osc_request.c

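diff --git a/lustre/include/lustre/lustre_user.h b/lustre/include/lustre/lustre_user.h
index dbeaa1e..c88602d 100644
--- a/lustre/include/lustre/lustre_user.h
+++ b/lustre/include/lustre/lustre_user.h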
@@ -114,9 +114,11 @@ struct obd_statfs;
 
 #define O_LOV_DELAY_CREATE 0100000000  /* hopefully this does not conflict */
 
-#define LL_FILE_IGNORE_LOCK             0x00000001
-#define LL_FILE_GROUP_LOCKED            0x00000002
-#define LL_FILE_READAHEAD               0x00000004
+#define LL_FILE_IGNORE_LOCK         0x00000001
+#define LL_FILE_GROUP_LOCKED        0x00000002
+#define LL_FILE_READAHEAD           0x00000004
+#define LL_FILE_LOCKED_DIRECTIO     0x00000008 /* client-side locks with dio */
+#define LL_FILE_LOCKLESS_IO         0x00000010 /* server-side locks with cio */
 
 #define LOV_USER_MAGIC_V1 0x0BD10BD0
 #define LOV_USER_MAGIC    LOV_USER_MAGIC_V1
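diff --git a/lustre/llite/file.c b/lustre/llite/file.c
index b16def4..e3a5c94 100644
--- a/lustre/llite/file.c
+++ b/lustre/llite/file.c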
@@ -1145,8 +1145,20 @@ static int ll_is_file_contended(struct file *file)
                        sbi->ll_lco.lco_flags);
                 RETURN(0);
         }
+
         if (fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK))
                 RETURN(0);
+
+        /* server-side locking for dio unless LL_FILE_LOCKED_DIRECTIO */
+        if ((file->f_flags & O_DIRECT) &&
+            !(fd && (fd->fd_flags & LL_FILE_LOCKED_DIRECTIO)))
+                RETURN(1);
+
+        /* server-side locking for cached I/O with LL_FILE_LOCKLESS_IO */
+        if (!(file->f_flags & O_DIRECT) &&
+            fd && fd->fd_flags & LL_FILE_LOCKLESS_IO)
+                RETURN(1);
+
         if (test_bit(LLI_F_CONTENDED, &lli->lli_flags)) {
                 cfs_time_t cur_time = cfs_time_current();
                 cfs_time_t retry_time;
@@ -1631,13 +1643,13 @@ repeat:
                                  &tree, OBD_BRW_READ);
                 up_read(&lli->lli_truncate_rwsem);
         } else {
-                /* lockless read
-                 *
-                 * current time will get into request as atime
-                 * (lustre/osc/osc_request.c:osc_build_request())
-                 */
-                retval = ll_file_lockless_io(file, iov_copy, nrsegs_copy, ppos,
-                                             READ, chunk);
+                retval = ll_direct_IO(READ, file, iov_copy, *ppos, nr_segs, 0);
+                if (retval > 0) {
+                       lprocfs_counter_add(sbi->ll_stats,
+                                           LPROC_LL_LOCKLESS_READ,
+                                           (long)retval);
+                        *ppos += retval;
+                }
         }
         ll_rw_stats_tally(sbi, current->pid, file, count, 0);
         if (retval > 0) {
@@ -1836,13 +1848,13 @@ repeat:
                                                 *ppos);
 #endif
         } else {
-                /* lockless write
-                 *
-                 * current time will get into request as mtime and
-                 * ctime (lustre/osc/osc_request.c:osc_build_request())
-                 */
-                retval = ll_file_lockless_io(file, iov_copy, nrsegs_copy,
-                                             ppos, WRITE, chunk);
+                retval = ll_direct_IO(WRITE, file, iov_copy, *ppos, nr_segs, 0);
+                if (retval > 0) {
+                       lprocfs_counter_add(sbi->ll_stats,
+                                           LPROC_LL_LOCKLESS_WRITE,
+                                           (long)retval);
+                        *ppos += retval;
+                }
         }
         ll_rw_stats_tally(ll_i2sbi(inode), current->pid, file, chunk, 1);
 
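diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h
index 933b081..d89e9ef 100644
--- a/lustre/llite/llite_internal.h
+++ b/lustre/llite/llite_internal.h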
@@ -750,6 +750,8 @@ void ll_truncate(struct inode *inode);
 int ll_file_punch(struct inode *, loff_t, int);
 ssize_t ll_file_lockless_io(struct file *, const struct iovec *,
                             unsigned long, loff_t *, int, ssize_t);
+ssize_t ll_direct_IO(int rw, struct file *file,const struct iovec *iov,
+                     loff_t file_offset, unsigned long nr_segs, int locked);
 void ll_clear_file_contended(struct inode*);
 int ll_sync_page_range(struct inode *, struct address_space *, loff_t, size_t);
 
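diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c
index 9bfbf6a..2be2987 100644
--- a/lustre/llite/rw26.c
+++ b/lustre/llite/rw26.c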
@@ -162,7 +162,7 @@ static ssize_t ll_direct_IO_26_seg(int rw, struct inode *inode,
                                    struct ptlrpc_request_set *set,
                                    size_t size, loff_t file_offset,
                                    struct page **pages, int page_count,
-                                   unsigned long user_addr)
+                                   unsigned long user_addr, int locked)
 {
         struct brw_page *pga;
         int i, rc = 0, pshift;
@@ -190,7 +190,11 @@ static ssize_t ll_direct_IO_26_seg(int rw, struct inode *inode,
                 /* To the end of the page, or the length, whatever is less */
                 pga[i].count = min_t(int, CFS_PAGE_SIZE -(user_addr & ~CFS_PAGE_MASK),
                                      length);
+
                 pga[i].flag = OBD_BRW_SYNC;
+                if (!locked)
+                        pga[i].flag |= OBD_BRW_SRVLOCK;
+
                 if (rw == READ)
                         POISON_PAGE(pages[i], 0x0d);
 
@@ -215,11 +219,11 @@ static ssize_t ll_direct_IO_26_seg(int rw, struct inode *inode,
  * then truncate this to be a full-sized RPC.  This is 22MB for 4kB pages. */
 #define MAX_DIO_SIZE ((128 * 1024 / sizeof(struct brw_page) * CFS_PAGE_SIZE) & \
                       ~(PTLRPC_MAX_BRW_SIZE - 1))
-static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb,
-                               const struct iovec *iov, loff_t file_offset,
-                               unsigned long nr_segs)
+
+ssize_t ll_direct_IO(int rw, struct file *file,
+                     const struct iovec *iov, loff_t file_offset,
+                     unsigned long nr_segs, int locked)
 {
-        struct file *file = iocb->ki_filp;
         struct inode *inode = file->f_mapping->host;
         ssize_t count = iov_length(iov, nr_segs);
         ssize_t tot_bytes = 0, result = 0;
@@ -287,7 +291,7 @@ static ssize_t ll_direct_IO_26(int rw, struct kiocb *iocb,
                                                              bytes,
                                                              file_offset, pages,
                                                              page_count,
-                                                             user_addr);
+                                                             user_addr, locked);
                                 ll_free_user_pages(pages, max_pages, rw==READ);
                         } else if (page_count == 0) {
                                 GOTO(out, result = -EFAULT);
@@ -342,6 +346,13 @@ unlock_mutex:
         RETURN(tot_bytes);
 }
 
+static ssize_t ll_direct_IO_26(int rw, struct kiocb *kiocb,
+                               const struct iovec *iov, loff_t file_offset,
+                               unsigned long nr_segs)
+{
+        return ll_direct_IO(rw, kiocb->ki_filp, iov, file_offset, nr_segs, 1);
+}
+
 struct address_space_operations ll_aops = {
         .readpage       = ll_readpage,
 //        .readpages      = ll_readpages,
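diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c
index 4f64d97..38d0ad0 100644
--- a/lustre/osc/osc_request.c
+++ b/lustre/osc/osc_request.c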
@@ -1871,8 +1871,10 @@ static int osc_brw_async(int cmd, struct obd_export *exp,
                 struct obdo *oa;
                 obd_count pages_per_brw;
 
+                /* one page less under unaligned direct i/o */
                 pages_per_brw = min_t(obd_count, page_count,
-                    class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc);
+                    class_exp2cliimp(exp)->imp_obd->u.cli.cl_max_pages_per_rpc -
+                                      !!pshift);
 
                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw,
                                                        pshift);