Whamcloud - gitweb
merge b_devel into HEAD. Includes:
[fs/lustre-release.git] / lustre / llite / rw.c
index 6818ace..409f308 100644 (file)
@@ -33,6 +33,7 @@
 #include <asm/system.h>
 #include <asm/uaccess.h>
 
+
 #include <linux/fs.h>
 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
 #include <linux/buffer_head.h>
@@ -70,6 +71,7 @@ static void __set_page_clean(struct page *page)
         list_del(&page->list);
         list_add(&page->list, &mapping->clean_pages);
 
+        /* XXX doesn't inode_lock protect i_state ? */
         inode = mapping->host;
         if (list_empty(&mapping->dirty_pages)) {
                 CDEBUG(D_INODE, "inode clean\n");
@@ -81,7 +83,7 @@ static void __set_page_clean(struct page *page)
         EXIT;
 }
 
-inline void set_page_clean(struct page *page)
+void set_page_clean(struct page *page)
 {
         if (PageDirty(page)) {
                 ClearPageDirty(page);
@@ -90,7 +92,7 @@ inline void set_page_clean(struct page *page)
 }
 
 /* SYNCHRONOUS I/O to object storage for an inode */
-static int ll_brw(int cmd, struct inode *inode, struct page *page, int create)
+static int ll_brw(int cmd, struct inode *inode, struct page *page, int flags)
 {
         struct ll_inode_info *lli = ll_i2info(inode);
         struct lov_stripe_md *lsm = lli->lli_smd;
@@ -112,8 +114,8 @@ static int ll_brw(int cmd, struct inode *inode, struct page *page, int create)
                 pg.count = PAGE_SIZE;
 
         CDEBUG(D_PAGE, "%s %d bytes ino %lu at "LPU64"/"LPX64"\n",
-              cmd & OBD_BRW_WRITE ? "write" : "read", pg.count, inode->i_ino,
-              pg.off, pg.off);
+               cmd & OBD_BRW_WRITE ? "write" : "read", pg.count, inode->i_ino,
+               pg.off, pg.off);
         if (pg.count == 0) {
                 CERROR("ZERO COUNT: ino %lu: size %p:%Lu(%p:%Lu) idx %lu off "
                        LPU64"\n",
@@ -121,7 +123,7 @@ static int ll_brw(int cmd, struct inode *inode, struct page *page, int create)
                        page->mapping->host->i_size, page->index, pg.off);
         }
 
-        pg.flag = create ? OBD_BRW_CREATE : 0;
+        pg.flag = flags;
 
         set->brw_callback = ll_brw_sync_wait;
         rc = obd_brw(cmd, ll_i2obdconn(inode), lsm, 1, &pg, set, NULL);
@@ -133,56 +135,174 @@ static int ll_brw(int cmd, struct inode *inode, struct page *page, int create)
                 if (rc)
                         CERROR("error from callback: rc = %d\n", rc);
         }
-        obd_brw_set_free(set);
+        obd_brw_set_decref(set);
 
         RETURN(rc);
 }
 
-/* returns the page unlocked, but with a reference */
-static int ll_readpage(struct file *file, struct page *page)
+/* 
+ * we were asked to read a single page but we're going to try and read a batch
+ * of pages all at once.  this vaguely simulates 2.5's readpages.
+ */
+static int ll_readpage(struct file *file, struct page *first_page)
 {
-        struct inode *inode = page->mapping->host;
-        obd_off offset = ((obd_off)page->index) << PAGE_SHIFT;
-        int rc = 0;
+        struct inode *inode = first_page->mapping->host;
+        struct ll_inode_info *lli = ll_i2info(inode);
+        struct page *page = first_page;
+        struct list_head *pos;
+        struct brw_page *pgs;
+        struct obd_brw_set *set;
+        unsigned long end_index, extent_end = 0;
+        int npgs = 0, rc = 0;
         ENTRY;
 
-        if (!PageLocked(page))
-                LBUG();
+        LASSERT(PageLocked(page));
+        LASSERT(!PageUptodate(page));
+        CDEBUG(D_VFSTRACE, "VFS Op\n");
 
-        if (inode->i_size <= offset) {
+        if (inode->i_size <= ((obd_off)page->index) << PAGE_SHIFT) {
                 CERROR("reading beyond EOF\n");
                 memset(kmap(page), 0, PAGE_SIZE);
                 kunmap(page);
-                GOTO(readpage_out, rc);
+                SetPageUptodate(page);
+                unlock_page(page);
+                RETURN(rc);
         }
 
-        /* XXX Workaround for BA OSTs returning short reads at EOF.  The linux
-         *     OST will return the full page, zero-filled at the end, which
-         *     will just overwrite the data we set here.
-         *     Bug 593 relates to fixing this properly.
+        pgs = kmalloc(PTL_MD_MAX_IOV * sizeof(*pgs), GFP_USER);
+        if ( pgs == NULL )
+                RETURN(-ENOMEM);
+        set = obd_brw_set_new();
+        if ( set == NULL )
+                GOTO(out_pgs, rc = -ENOMEM);
+
+        /* arbitrarily try to read-ahead 8 times what we can pass on 
+         * the wire at once, clamped to file size */
+        end_index = first_page->index + 
+                8 * ((PTL_MD_MAX_IOV * PAGE_SIZE)>>PAGE_CACHE_SHIFT);
+        if ( end_index > inode->i_size >> PAGE_CACHE_SHIFT )
+                end_index = inode->i_size >> PAGE_CACHE_SHIFT;
+
+        /*
+         * find how far we're allowed to read under the extent ll_file_read
+         * is passing us.. 
          */
-        if (inode->i_size < offset + PAGE_SIZE) {
-                int count = inode->i_size - offset;
-                void *addr = kmap(page);
-                //POISON(addr, 0x7c, count);
-                memset(addr + count, 0, PAGE_SIZE - count);
-                kunmap(page);
+        spin_lock(&lli->lli_read_extent_lock);
+        list_for_each(pos, &lli->lli_read_extents) {
+                struct ll_read_extent *rextent;
+                rextent = list_entry(pos, struct ll_read_extent, re_lli_item);
+                if ( rextent->re_task != current )
+                        continue;
+
+                if (rextent->re_extent.end + PAGE_SIZE < rextent->re_extent.end)
+                        /* extent wrapping */
+                        extent_end = ~0;
+                else  {
+                        extent_end = ( rextent->re_extent.end + PAGE_SIZE )
+                                                        << PAGE_CACHE_SHIFT;
+                        /* 32bit indexes, 64bit extents.. */
+                        if ( ((u64)extent_end >> PAGE_CACHE_SHIFT ) < 
+                                        rextent->re_extent.end )
+                                extent_end = ~0;
+                }
+                break;
         }
+        spin_unlock(&lli->lli_read_extent_lock);
+
+        if ( extent_end == 0 ) {
+                CERROR("readpage outside ll_file_read, no lock held?\n");
+                end_index = page->index + 1;
+        } else if ( extent_end < end_index )
+                end_index = extent_end;
+
+        /* to balance the find_get_page ref the other pages get that is
+         * decrefed on teardown.. */
+        page_cache_get(page);
+        do { 
+                unsigned long index ;
+
+                pgs[npgs].pg = page;
+                pgs[npgs].off = ((obd_off)page->index) << PAGE_CACHE_SHIFT;
+                pgs[npgs].flag = 0;
+                pgs[npgs].count = PAGE_SIZE;
+                /* XXX Workaround for BA OSTs returning short reads at EOF.
+                 * The linux OST will return the full page, zero-filled at the
+                 * end, which will just overwrite the data we set here.  Bug
+                 * 593 relates to fixing this properly.
+                 */
+                if (inode->i_size < pgs[npgs].off + PAGE_SIZE) {
+                        int count = inode->i_size - pgs[npgs].off;
+                        void *addr = kmap(page);
+                        pgs[npgs].count = count;
+                        //POISON(addr, 0x7c, count);
+                        memset(addr + count, 0, PAGE_SIZE - count);
+                        kunmap(page);
+                }
+
+                npgs++;
+                if ( npgs == PTL_MD_MAX_IOV )
+                        break;
+
+                /*
+                 * find pages ahead of us that we can read in.  
+                 * grab_cache_page waits on pages that are locked so
+                 * we first try find_get_page, which doesn't.  this stops
+                 * the worst case behaviour of racing threads waiting on 
+                 * each other, but doesn't remove it entirely.
+                 */
+                for ( index = page->index + 1, page = NULL ;
+                        page == NULL && index < end_index ; index++ ) {
+
+                        /* see if the page already exists and needs updating */
+                        page = find_get_page(inode->i_mapping, index);
+                        if ( page ) {
+                                if ( Page_Uptodate(page) || TryLockPage(page) )
+                                        goto out_release;
+                                if ( !page->mapping || Page_Uptodate(page)) 
+                                        goto out_unlock;
+                        } else {
+                                /* ok, we have to create it.. */
+                                page = grab_cache_page(inode->i_mapping, index);
+                                if ( page == NULL ) 
+                                        continue;
+                                if ( Page_Uptodate(page) )
+                                        goto out_unlock;
+                        }
+
+                        break;
+
+                out_unlock:
+                        unlock_page(page);
+                out_release:
+                        page_cache_release(page);
+                        page = NULL;
+                }
 
-        if (PageUptodate(page)) {
-                CERROR("Explain this please?\n");
-                GOTO(readpage_out, rc);
+        } while (page);
+
+        set->brw_callback = ll_brw_sync_wait;
+        rc = obd_brw(OBD_BRW_READ, ll_i2obdconn(inode),
+                     ll_i2info(inode)->lli_smd, npgs, pgs, set, NULL);
+        if (rc) {
+                CERROR("error from obd_brw: rc = %d\n", rc);
+        } else {
+                rc = ll_brw_sync_wait(set, CB_PHASE_START);
+                if (rc)
+                        CERROR("error from callback: rc = %d\n", rc);
         }
+        obd_brw_set_decref(set);
 
-        CDEBUG(D_VFSTRACE, "VFS Op\n");
-        rc = ll_brw(OBD_BRW_READ, inode, page, 0);
-        EXIT;
+        while ( --npgs > -1 ) {
+                page = pgs[npgs].pg;
 
- readpage_out:
-        if (!rc)
-                SetPageUptodate(page);
-        unlock_page(page);
-        return 0;
+                if ( rc == 0 )
+                        SetPageUptodate(page);
+                unlock_page(page);
+                page_cache_release(page);
+        }
+out_pgs:
+        kfree(pgs);
+        RETURN(rc);
 } /* ll_readpage */
 
 void ll_truncate(struct inode *inode)
@@ -190,12 +310,14 @@ void ll_truncate(struct inode *inode)
         struct obdo oa = {0};
         struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
         struct lustre_handle lockh = { 0, 0 };
+        struct ldlm_extent extent = {inode->i_size, OBD_OBJECT_EOF};
         int err;
         ENTRY;
 
         if (!lsm) {
                 /* object not yet allocated */
                 inode->i_mtime = inode->i_ctime = CURRENT_TIME;
+                EXIT;
                 return;
         }
 
@@ -207,9 +329,11 @@ void ll_truncate(struct inode *inode)
         CDEBUG(D_INFO, "calling punch for "LPX64" (all bytes after %Lu)\n",
                oa.o_id, inode->i_size);
 
-        err = ll_size_lock(inode, lsm, inode->i_size, LCK_PW, &lockh);
-        if (err) {
-                CERROR("ll_size_lock failed: %d\n", err);
+         /* i_size has already been set to the new size */
+        err = ll_extent_lock_no_validate(NULL, inode, lsm, LCK_PW, 
+                                        &extent, &lockh);
+        if (err != ELDLM_OK && err != ELDLM_LOCK_MATCHED) {
+                EXIT;
                 return;
         }
 
@@ -221,9 +345,9 @@ void ll_truncate(struct inode *inode)
         else
                 obdo_to_inode(inode, &oa, oa.o_valid);
 
-        err = ll_size_unlock(inode, lsm, LCK_PW, &lockh);
+        err = ll_extent_unlock(NULL, inode, lsm, LCK_PW, &lockh);
         if (err)
-                CERROR("ll_size_unlock failed: %d\n", err);
+                CERROR("ll_extent_unlock failed: %d\n", err);
 
         EXIT;
         return;
@@ -237,11 +361,12 @@ static int ll_prepare_write(struct file *file, struct page *page, unsigned from,
         struct inode *inode = page->mapping->host;
         obd_off offset = ((obd_off)page->index) << PAGE_SHIFT;
         int rc = 0;
-        char *addr;
         ENTRY;
 
-        addr = kmap(page);
-        LASSERT(PageLocked(page));
+        ll_check_dirty(inode->i_sb);
+
+        if (!PageLocked(page))
+                LBUG();
 
         if (PageUptodate(page))
                 RETURN(0);
@@ -254,118 +379,83 @@ static int ll_prepare_write(struct file *file, struct page *page, unsigned from,
                 RETURN(0);
         CDEBUG(D_VFSTRACE, "VFS Op\n");
 
-        /* If are writing to a new page, no need to read old data.  If we
-         * haven't already gotten the file size in ll_file_write() since
-         * we got our extent lock, we need to verify it here before we
-         * overwrite some other node's write (bug 445).
-         */
+        /* If are writing to a new page, no need to read old data.
+         * the extent locking and getattr procedures in ll_file_write have
+         * guaranteed that i_size is stable enough for our zeroing needs */
         if (inode->i_size <= offset) {
-                if (!S_ISBLK(inode->i_mode) && !(file->f_flags & O_APPEND)) {
-                        struct ll_file_data *fd = file->private_data;
-                        struct lov_stripe_md *lsm = ll_i2info(inode)->lli_smd;
-
-                        rc = ll_file_size(inode, lsm, fd->fd_ostdata);
-                        if (rc)
-                                GOTO(prepare_done, rc);
-                }
-                if (inode->i_size <= offset) {
-                        memset(addr, 0, PAGE_SIZE);
-                        GOTO(prepare_done, rc=0);
-                }
+                memset(kmap(page), 0, PAGE_SIZE);
+                kunmap(page);
+                GOTO(prepare_done, rc = 0);
         }
 
         rc = ll_brw(OBD_BRW_READ, inode, page, 0);
 
         EXIT;
  prepare_done:
-        if (!rc)
+        if (rc == 0)
                 SetPageUptodate(page);
-        else
-                kunmap (page);
 
         return rc;
 }
 
-/* Write a page from kupdated or kswapd.
+/*
+ * background file writeback.  This is called regularly from kupdated to write
+ * dirty data, from kswapd when memory is low, and from filemap_fdatasync when
+ * super blocks or inodes are synced..
  *
- * We unlock the page even in the face of an error, otherwise dirty
- * pages could OOM the system if they cannot be written.  Also, there
- * is nobody to return an error code to from here - the application
- * may not even be running anymore.
+ * obd_brw errors down in _batch_writepage are ignored, so pages are always
+ * unlocked.  Also, there is nobody to return an error code to from here - the
+ * application may not even be running anymore.
  *
- * Returns the page unlocked, but with a reference.
+ * this should be async so that things like kswapd can have a chance to
+ * free some more pages that our allocating writeback may need, but it isn't
+ * yet.
  */
-static int ll_writepage(struct page *page) {
+static int ll_writepage(struct page *page)
+{
         struct inode *inode = page->mapping->host;
-        int err;
         ENTRY;
 
-        LASSERT(PageLocked(page));
-
-        /* XXX need to make sure we have LDLM lock on this page */
+        CDEBUG(D_CACHE, "page %p [lau %d] inode %p\n", page,
+                        PageLaunder(page), inode);
         CDEBUG(D_VFSTRACE, "VFS Op\n");
-        err = ll_brw(OBD_BRW_WRITE, inode, page, 1);
-        if (err)
-                CERROR("ll_brw failure %d\n", err);
-        else
-                set_page_clean(page);
+        LASSERT(PageLocked(page));
 
-        unlock_page(page);
-        RETURN(err);
+        /* XXX should obd_brw errors trickle up? */
+        ll_batch_writepage(inode, page);
+        RETURN(0);
 }
 
-
-/* SYNCHRONOUS I/O to object storage for an inode -- object attr will be updated
- * too */
+/*
+ * we really don't want to start writeback here, we want to give callers some
+ * time to further dirty the pages before we write them out.
+ */
 static int ll_commit_write(struct file *file, struct page *page,
                            unsigned from, unsigned to)
 {
         struct inode *inode = page->mapping->host;
-        struct ll_inode_info *lli = ll_i2info(inode);
-        struct lov_stripe_md *md = lli->lli_smd;
-        struct brw_page pg;
-        struct obd_brw_set *set;
-        int rc, create = 1;
         loff_t size;
         ENTRY;
 
-        pg.pg = page;
-        pg.count = to;
-        /* XXX make the starting offset "from" */
-        pg.off = (((obd_off)page->index) << PAGE_SHIFT);
-        pg.flag = create ? OBD_BRW_CREATE : 0;
-
-        set = obd_brw_set_new();
-        if (set == NULL)
-                RETURN(-ENOMEM);
-
-        SetPageUptodate(page);
-
-        if (!PageLocked(page))
-                LBUG();
+        LASSERT(inode == file->f_dentry->d_inode);
+        LASSERT(PageLocked(page));
 
         CDEBUG(D_VFSTRACE, "VFS Op\n");
-        CDEBUG(D_INODE, "commit_page writing (off "LPD64"), count %d\n",
-               pg.off, pg.count);
+        CDEBUG(D_INODE, "inode %p is writing page %p from %d to %d at %lu\n",
+               inode, page, from, to, page->index);
 
-        set->brw_callback = ll_brw_sync_wait;
-        rc = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode), md, 1, &pg, set, NULL);
-        if (rc)
-                CERROR("error from obd_brw: rc = %d\n", rc);
-        else {
-                rc = ll_brw_sync_wait(set, CB_PHASE_START);
-                if (rc)
-                        CERROR("error from callback: rc = %d\n", rc);
-        }
-        obd_brw_set_free(set);
-        kunmap(page);
+        /* to match full page case in prepare_write */
+        SetPageUptodate(page);
+        /* mark the page dirty, put it on mapping->dirty,
+         * mark the inode PAGES_DIRTY, put it on sb->dirty */
+        set_page_dirty(page);
 
-        size = pg.off + pg.count;
-        /* do NOT truncate when writing in the middle of a file */
+        /* this is matched by a hack in obdo_to_inode at the moment */
+        size = (((obd_off)page->index) << PAGE_SHIFT) + to;
         if (size > inode->i_size)
                 inode->i_size = size;
 
-        RETURN(rc);
+        RETURN(0);
 } /* ll_commit_write */
 
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
@@ -384,11 +474,17 @@ static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf,
         if (!lsm || !lsm->lsm_object_id)
                 RETURN(-ENOMEM);
 
+        if ((iobuf->offset & (blocksize - 1)) ||
+            (iobuf->length & (blocksize - 1)))
+                RETURN(-EINVAL);
+
+#if 0
         /* XXX Keep here until we find ia64 problem, it crashes otherwise */
         if (blocksize != PAGE_SIZE) {
                 CERROR("direct_IO blocksize != PAGE_SIZE\n");
                 RETURN(-EINVAL);
         }
+#endif
 
         set = obd_brw_set_new();
         if (set == NULL)
@@ -396,17 +492,12 @@ static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf,
 
         OBD_ALLOC(pga, sizeof(*pga) * iobuf->nr_pages);
         if (!pga) {
-                obd_brw_set_free(set);
+                obd_brw_set_decref(set);
                 RETURN(-ENOMEM);
         }
 
-        CDEBUG(D_PAGE, "blocksize %u, blocknr %lu, iobuf %p: nr_pages %u, "
-                       "array_len %u, offset %u, length %u\n",
-               blocksize, blocknr, iobuf, iobuf->nr_pages,
-               iobuf->array_len, iobuf->offset, iobuf->length);
-
         flags = (rw == WRITE ? OBD_BRW_CREATE : 0) /* | OBD_BRW_DIRECTIO */;
-        offset = (blocknr << inode->i_blkbits) /* + iobuf->offset? */;
+        offset = (blocknr << inode->i_blkbits);
         length = iobuf->length;
 
         for (i = 0, length = iobuf->length; length > 0;
@@ -417,8 +508,6 @@ static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf,
                 pga[i].count = min_t(int, PAGE_SIZE - (offset & ~PAGE_MASK),
                                      length);
                 pga[i].flag = flags;
-                CDEBUG(D_PAGE, "page %d (%p), offset "LPU64", count %u\n",
-                       i, pga[i].pg, pga[i].off, pga[i].count);
                 if (rw == READ) {
                         //POISON(kmap(iobuf->maplist[i]), 0xc5, PAGE_SIZE);
                         //kunmap(iobuf->maplist[i]);
@@ -436,7 +525,7 @@ static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf,
                 if (rc)
                         CERROR("error from callback: rc = %d\n", rc);
         }
-        obd_brw_set_free(set);
+        obd_brw_set_decref(set);
         if (rc == 0)
                 rc = iobuf->length;
 
@@ -445,52 +534,8 @@ static int ll_direct_IO(int rw, struct inode *inode, struct kiobuf *iobuf,
 }
 #endif
 
-int ll_flush_inode_pages(struct inode * inode)
-{
-        obd_count        bufs_per_obdo = 0;
-        obd_size         *count = NULL;
-        obd_off          *offset = NULL;
-        obd_flag         *flags = NULL;
-        int              err = 0;
-
-        ENTRY;
-
-#if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))
-        spin_lock(&pagecache_lock);
-
-        spin_unlock(&pagecache_lock);
-#endif
-
-
-        OBD_ALLOC(count, sizeof(*count) * bufs_per_obdo);
-        OBD_ALLOC(offset, sizeof(*offset) * bufs_per_obdo);
-        OBD_ALLOC(flags, sizeof(*flags) * bufs_per_obdo);
-        if (!count || !offset || !flags)
-                GOTO(out, err=-ENOMEM);
-
-#if 0
-        for (i = 0 ; i < bufs_per_obdo ; i++) {
-                count[i] = PAGE_SIZE;
-                offset[i] = ((obd_off)(iobuf->maplist[i])->index) << PAGE_SHIFT;
-                flags[i] = OBD_BRW_CREATE;
-        }
-
-        err = obd_brw(OBD_BRW_WRITE, ll_i2obdconn(inode),
-                      ll_i2info(inode)->lli_smd, bufs_per_obdo,
-                      iobuf->maplist, count, offset, flags, NULL, NULL);
-        if (err == 0)
-                err = bufs_per_obdo * 4096;
-#endif
- out:
-        OBD_FREE(flags, sizeof(*flags) * bufs_per_obdo);
-        OBD_FREE(count, sizeof(*count) * bufs_per_obdo);
-        OBD_FREE(offset, sizeof(*offset) * bufs_per_obdo);
-        RETURN(err);
-}
-
 //#endif
 
-
 struct address_space_operations ll_aops = {
         readpage: ll_readpage,
 #if (LINUX_VERSION_CODE <= KERNEL_VERSION(2,5,0))