Whamcloud - gitweb
b=9332
author alex <alex>
Tue, 20 Jun 2006 11:30:18 +0000 (11:30 +0000)
committer alex <alex>
Tue, 20 Jun 2006 11:30:18 +0000 (11:30 +0000)
 - instead of taking a lock covering the whole requested region,
   ll_file_read() and ll_file_write() now grab locks that cover at most a
   single stripe (except in the O_APPEND case). This improves overall
   stability because we no longer hold locks while waiting for a failed
   OST to recover.

lustre/include/obd.h
lustre/include/obd_class.h
lustre/llite/file.c
lustre/llite/llite_internal.h
lustre/llite/llite_lib.c
lustre/llite/lproc_llite.c
lustre/lov/lov_obd.c
lustre/obdclass/lprocfs_status.c

index 73b765b..085fe67 100644 (file)
@@ -951,6 +951,8 @@ struct obd_ops {
                             struct niobuf_remote *remote);
         int (*o_init_export)(struct obd_export *exp);
         int (*o_destroy_export)(struct obd_export *exp);
+        int (*o_extent_calc)(struct obd_export *, struct lov_stripe_md *,
+                             int cmd, obd_off *);
 
         /* llog related obd_methods */
         int (*o_llog_init)(struct obd_device *obd, struct obd_device *disk_obd,
@@ -1019,6 +1021,10 @@ static inline struct lsm_operations *lsm_op_find(int magic)
 
 int lvfs_check_io_health(struct obd_device *obd, struct file *file);
 
+/* Requests for obd_extent_calc(): which stripe boundary to compute */
+#define OBD_CALC_STRIPE_START   1 /* first byte of stripe holding offset */
+#define OBD_CALC_STRIPE_END     2 /* last byte of stripe holding offset */
+
 static inline void obd_transno_commit_cb(struct obd_device *obd, __u64 transno,
                                          int error)
 {
index 59edbce..60fe1fc 100644 (file)
@@ -646,6 +646,17 @@ static inline int obd_destroy_export(struct obd_export *exp)
         RETURN(0);
 }
 
+/*
+ * Invoke the extent_calc method of the obd device behind @exp, passing
+ * @exp, @md, @cmd and @offset through unchanged, and return that method's
+ * result.
+ *
+ * @cmd is one of the OBD_CALC_STRIPE_* requests.  @offset is read and then
+ * overwritten with the answer by the method.
+ *
+ * EXP_CHECK_OP() runs before the dispatch -- presumably it returns early
+ * with an error when the export or the method is missing (TODO confirm
+ * against the macro definition).
+ */
+static inline int obd_extent_calc(struct obd_export *exp,
+                                  struct lov_stripe_md *md,
+                                  int cmd, obd_off *offset)
+{
+        int rc;
+        ENTRY;
+        EXP_CHECK_OP(exp, extent_calc);
+        rc = OBP(exp->exp_obd, extent_calc)(exp, md, cmd, offset);
+        RETURN(rc);
+}
+
 static inline struct dentry *
 obd_lvfs_fid2dentry(struct obd_export *exp, __u64 id_ino, __u32 gen, __u64 gr)
 {
index de1245c..db01d19 100644 (file)
@@ -1076,12 +1076,15 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
         struct inode *inode = file->f_dentry->d_inode;
         struct ll_inode_info *lli = ll_i2info(inode);
         struct lov_stripe_md *lsm = lli->lli_smd;
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
         struct ll_lock_tree tree;
         struct ll_lock_tree_node *node;
         struct ost_lvb lvb;
         struct ll_ra_read bead;
-        int rc;
-        ssize_t retval;
+        int rc, ra = 0;
+        loff_t end;
+        ssize_t retval, chunk, sum = 0;
+
         __u64 kms;
         ENTRY;
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
@@ -1121,12 +1124,29 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
                 RETURN(count);
         }
 
-        node = ll_node_from_inode(inode, *ppos, *ppos + count - 1, LCK_PR);
+repeat:
+        if (sbi->ll_max_rw_chunk != 0) {
+                /* first, let's know the end of the current stripe */
+                end = *ppos;
+                obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END, &end);
+
+                /* correct, the end is beyond the request */
+                if (end > *ppos + count - 1)
+                        end = *ppos + count - 1;
+
+                /* and chunk shouldn't be too large even if striping is wide */
+                if (end - *ppos > sbi->ll_max_rw_chunk)
+                        end = *ppos + sbi->ll_max_rw_chunk - 1;
+        } else {
+                end = *ppos + count - 1;
+        }
+       
+        node = ll_node_from_inode(inode, *ppos, end, LCK_PR);
         tree.lt_fd = LUSTRE_FPRIVATE(file);
         rc = ll_tree_lock(&tree, node, buf, count,
                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
         if (rc != 0)
-                RETURN(rc);
+                GOTO(out, retval = rc);
 
         ll_inode_size_lock(inode, 1);
         /*
@@ -1164,8 +1184,9 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
                 ll_inode_size_unlock(inode, 1);
         }
 
+        chunk = end - *ppos + 1;
         CDEBUG(D_INFO, "Read ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
-               inode->i_ino, count, *ppos, inode->i_size);
+               inode->i_ino, chunk, *ppos, inode->i_size);
 
         /* turn off the kernel's read-ahead */
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
@@ -1173,16 +1194,32 @@ static ssize_t ll_file_read(struct file *file, char *buf, size_t count,
 #else
         file->f_ra.ra_pages = 0;
 #endif
-        bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
-        bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
-        ll_ra_read_in(file, &bead);
+        /* initialize read-ahead window once per syscall */
+        if (ra == 0) {
+                ra = 1;
+                bead.lrr_start = *ppos >> CFS_PAGE_SHIFT;
+                bead.lrr_count = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
+                ll_ra_read_in(file, &bead);
+        }
+
         /* BUG: 5972 */
         file_accessed(file);
-        retval = generic_file_read(file, buf, count, ppos);
-        ll_ra_read_ex(file, &bead);
+        retval = generic_file_read(file, buf, chunk, ppos);
 
- out:
         ll_tree_unlock(&tree);
+
+        if (retval > 0) {
+                buf += retval;
+                count -= retval;
+                sum += retval;
+                if (retval == chunk && count > 0)
+                        goto repeat;
+        }
+
+ out:
+        if (ra != 0)
+                ll_ra_read_ex(file, &bead);
+        retval = (sum > 0) ? sum : retval;
         RETURN(retval);
 }
 
@@ -1193,10 +1230,13 @@ static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
                              loff_t *ppos)
 {
         struct inode *inode = file->f_dentry->d_inode;
+        struct ll_sb_info *sbi = ll_i2sbi(inode);
+        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
         struct ll_lock_tree tree;
         struct ll_lock_tree_node *node;
         loff_t maxbytes = ll_file_maxbytes(inode);
-        ssize_t retval;
+        loff_t lock_start, lock_end, end;
+        ssize_t retval, chunk, sum = 0;
         int rc;
         ENTRY;
         CDEBUG(D_VFSTRACE, "VFS Op:inode=%lu/%u(%p),size="LPSZ",offset=%Ld\n",
@@ -1216,25 +1256,50 @@ static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
 
         LASSERT(ll_i2info(inode)->lli_smd != NULL);
 
-        if (file->f_flags & O_APPEND)
-                node = ll_node_from_inode(inode, 0, OBD_OBJECT_EOF, LCK_PW);
-        else
-                node = ll_node_from_inode(inode, *ppos, *ppos  + count - 1,
-                                          LCK_PW);
+        down(&ll_i2info(inode)->lli_write_sem);
+
+repeat:
+        chunk = 0; /* just to fix gcc's warning */
+        end = *ppos + count - 1;
+
+        if (file->f_flags & O_APPEND) {
+                lock_start = 0;
+                lock_end = OBD_OBJECT_EOF;
+        } else if (sbi->ll_max_rw_chunk != 0) {
+                /* first, let's know the end of the current stripe */
+                end = *ppos;
+                obd_extent_calc(sbi->ll_osc_exp, lsm, OBD_CALC_STRIPE_END, &end);
+
+                /* correct, the end is beyond the request */
+                if (end > *ppos + count - 1)
+                        end = *ppos + count - 1;
+
+                /* and chunk shouldn't be too large even if striping is wide */
+                if (end - *ppos > sbi->ll_max_rw_chunk)
+                        end = *ppos + sbi->ll_max_rw_chunk - 1;
+                lock_start = *ppos;
+                lock_end = end;
+        } else {
+                lock_start = *ppos;
+                lock_end = *ppos + count - 1;
+        }
+        node = ll_node_from_inode(inode, lock_start, lock_end, LCK_PW);
 
         if (IS_ERR(node))
-                RETURN(PTR_ERR(node));
+                GOTO(out, retval = PTR_ERR(node));
 
         tree.lt_fd = LUSTRE_FPRIVATE(file);
         rc = ll_tree_lock(&tree, node, buf, count,
                           file->f_flags & O_NONBLOCK ? LDLM_FL_BLOCK_NOWAIT :0);
         if (rc != 0)
-                RETURN(rc);
+                GOTO(out, retval = rc);
 
         /* this is ok, g_f_w will overwrite this under i_mutex if it races
          * with a local truncate, it just makes our maxbyte checking easier */
-        if (file->f_flags & O_APPEND)
+        if (file->f_flags & O_APPEND) {
                 *ppos = inode->i_size;
+                end = *ppos + count - 1;
+        }
 
         if (*ppos >= maxbytes) {
                 send_sig(SIGXFSZ, current, 0);
@@ -1243,14 +1308,26 @@ static ssize_t ll_file_write(struct file *file, const char *buf, size_t count,
         if (*ppos + count > maxbytes)
                 count = maxbytes - *ppos;
 
-        CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
-               inode->i_ino, count, *ppos);
-
         /* generic_file_write handles O_APPEND after getting i_mutex */
-        retval = generic_file_write(file, buf, count, ppos);
+        chunk = end - *ppos + 1;
+        CDEBUG(D_INFO, "Writing inode %lu, "LPSZ" bytes, offset %Lu\n",
+               inode->i_ino, chunk, *ppos);
+        retval = generic_file_write(file, buf, chunk, ppos);
 
 out:
         ll_tree_unlock(&tree);
+
+        if (retval > 0) {
+                buf += retval;
+                count -= retval;
+                sum += retval;
+                if (retval == chunk && count > 0)
+                        goto repeat;
+        }
+
+        up(&ll_i2info(inode)->lli_write_sem);
+
+        retval = (sum > 0) ? sum : retval;
         lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, LPROC_LL_WRITE_BYTES,
                             retval > 0 ? retval : 0);
         RETURN(retval);
index 443bfc1..9a9ec6c 100644 (file)
@@ -79,6 +79,7 @@ struct ll_inode_info {
         struct semaphore        lli_size_sem;
         void                   *lli_size_sem_owner;
         struct semaphore        lli_open_sem;
+        struct semaphore        lli_write_sem;
         struct lov_stripe_md   *lli_smd;
         char                   *lli_symlink_name;
         __u64                   lli_maxbytes;
@@ -207,8 +208,13 @@ struct ll_sb_info {
 
         struct list_head          ll_deathrow; /* inodes to be destroyed (b1443) */
         spinlock_t                ll_deathrow_lock;
+        /* =0 - hold lock over whole read/write
+         * >0 - max. chunk to be read/written w/o lock re-acquiring */
+        unsigned long             ll_max_rw_chunk;
 };
 
+#define LL_DEFAULT_MAX_RW_CHUNK         (32 * 1024 * 1024) /* bytes: 32 MiB */
+
 struct ll_ra_read {
         pgoff_t             lrr_start;
         pgoff_t             lrr_count;
index 37bcd94..6a7abff 100644 (file)
@@ -185,6 +185,7 @@ int client_common_fill_super(struct super_block *sb, char *mdc, char *osc)
         sb->s_magic = LL_SUPER_MAGIC;
         sb->s_maxbytes = PAGE_CACHE_MAXBYTES;
         sbi->ll_namelen = osfs.os_namelen;
+        sbi->ll_max_rw_chunk = LL_DEFAULT_MAX_RW_CHUNK;
 
         if ((sbi->ll_flags & LL_SBI_USER_XATTR) &&
             !(data->ocd_connect_flags & OBD_CONNECT_XATTR)) {
@@ -643,6 +644,7 @@ void ll_lli_init(struct ll_inode_info *lli)
 {
         sema_init(&lli->lli_open_sem, 1);
         sema_init(&lli->lli_size_sem, 1);
+        sema_init(&lli->lli_write_sem, 1);
         lli->lli_flags = 0;
         lli->lli_maxbytes = PAGE_CACHE_MAXBYTES;
         spin_lock_init(&lli->lli_lock);
index 025d6e8..5c7ee25 100644 (file)
@@ -338,6 +338,27 @@ static int ll_wr_checksum(struct file *file, const char *buffer,
         return count;
 }
 
+/*
+ * Read handler for the "max_rw_chunk" entry: format the ll_max_rw_chunk
+ * value of the superblock passed in @data as an unsigned decimal followed
+ * by a newline, writing at most @count bytes into @page.
+ *
+ * @start, @off and @eof are not used.  Returns whatever snprintf() returns.
+ */
+static int ll_rd_max_rw_chunk(char *page, char **start, off_t off,
+                          int count, int *eof, void *data)
+{
+        struct ll_sb_info *sbi = ll_s2sbi((struct super_block *)data);
+
+        return snprintf(page, count, "%lu\n", sbi->ll_max_rw_chunk);
+}
+
+/*
+ * Write handler for the "max_rw_chunk" entry: parse an integer from the
+ * user-supplied @buffer and store it as the ll_max_rw_chunk of the
+ * superblock passed in @data.
+ *
+ * Returns @count on success, the non-zero result of lprocfs_write_helper()
+ * if parsing fails, or -ERANGE for a negative value.  @file is not used.
+ */
+static int ll_wr_max_rw_chunk(struct file *file, const char *buffer,
+                          unsigned long count, void *data)
+{
+        struct super_block *sb = data;
+        int rc, val;
+
+        rc = lprocfs_write_helper(buffer, count, &val);
+        if (rc)
+                return rc;
+
+        /* ll_max_rw_chunk is unsigned long: storing a negative int would
+         * silently turn it into a huge chunk size, so refuse it instead */
+        if (val < 0)
+                return -ERANGE;
+
+        ll_s2sbi(sb)->ll_max_rw_chunk = val;
+        return count;
+}
+
 static struct lprocfs_vars lprocfs_obd_vars[] = {
         { "uuid",         ll_rd_sb_uuid,          0, 0 },
         //{ "mntpt_path",   ll_rd_path,             0, 0 },
@@ -355,6 +376,7 @@ static struct lprocfs_vars lprocfs_obd_vars[] = {
                                      ll_wr_max_read_ahead_whole_mb, 0 },
         { "max_cached_mb", ll_rd_max_cached_mb, ll_wr_max_cached_mb, 0 },
         { "checksum_pages", ll_rd_checksum, ll_wr_checksum, 0 },
+        { "max_rw_chunk", ll_rd_max_rw_chunk, ll_wr_max_rw_chunk, 0 },
         { 0 }
 };
 
index 329a47e..1fcb361 100644 (file)
@@ -2437,6 +2437,31 @@ int lov_test_and_clear_async_rc(struct lov_stripe_md *lsm)
 }
 EXPORT_SYMBOL(lov_test_and_clear_async_rc);
 
+
+/*
+ * Replace *offset with a boundary of the lsm_stripe_size-aligned chunk
+ * that contains it.
+ *
+ * cmd == OBD_CALC_STRIPE_START: *offset becomes the first byte of the chunk.
+ * cmd == OBD_CALC_STRIPE_END:   *offset becomes the last byte of the chunk.
+ * Any other cmd hits LBUG().
+ *
+ * @exp is not used.  Always returns 0.
+ */
+static int lov_extent_calc(struct obd_export *exp, struct lov_stripe_md *lsm,
+                           int cmd, obd_off *offset)
+{
+        unsigned long ssize  = lsm->lsm_stripe_size;
+        obd_off start;
+        ENTRY;
+
+        /* do_div() divides in place, leaving the chunk index in start;
+         * multiplying back yields the chunk's first byte */
+        start = *offset;
+        do_div(start, ssize);
+        start = start * ssize;
+
+        CDEBUG(D_DLMTRACE, "offset %Lu, stripe %lu, start %Lu, end %Lu\n",
+               *offset, ssize, start, start + ssize - 1);
+        if (cmd == OBD_CALC_STRIPE_END) {
+                *offset = start + ssize - 1;
+        } else if (cmd == OBD_CALC_STRIPE_START) {
+                *offset = start;
+        } else {
+                LBUG();
+        }
+
+        RETURN(0);
+}
+
+
 #if 0
 struct lov_multi_wait {
         struct ldlm_lock *lock;
@@ -2583,6 +2608,7 @@ struct obd_ops lov_obd_ops = {
         .o_iocontrol           = lov_iocontrol,
         .o_get_info            = lov_get_info,
         .o_set_info_async      = lov_set_info_async,
+        .o_extent_calc         = lov_extent_calc,
         .o_llog_init           = lov_llog_init,
         .o_llog_finish         = lov_llog_finish,
         .o_notify              = lov_notify,
index 8686178..40767ee 100644 (file)
@@ -705,6 +705,7 @@ int lprocfs_alloc_obd_stats(struct obd_device *obd, unsigned num_private_stats)
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, san_preprw);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, init_export);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, destroy_export);
+        LPROCFS_OBD_OP_INIT(num_private_stats, stats, extent_calc);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, llog_init);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, llog_finish);
         LPROCFS_OBD_OP_INIT(num_private_stats, stats, pin);