Whamcloud - gitweb
- show correct id in debug line
[fs/lustre-release.git] / lustre / obdfilter / filter_io_24.c
index 32adb9f..0010961 100644 (file)
@@ -29,8 +29,6 @@
 #include <linux/pagemap.h> // XXX kill me soon
 #include <linux/version.h>
 
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-
 #define DEBUG_SUBSYSTEM S_FILTER
 
 #include <linux/iobuf.h>
 #include <linux/lustre_fsfilt.h>
 #include "filter_internal.h"
 
-
-/* We should only change the file mtime (and not the ctime, like
- * update_inode_times() in generic_file_write()) when we only change data. */
-void inode_update_time(struct inode *inode, int ctime_too)
-{
-        time_t now = CURRENT_TIME;
-        if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
-                return;
-        inode->i_mtime = now;
-        if (ctime_too)
-                inode->i_ctime = now;
-        mark_inode_dirty_sync(inode);
-}
-
 /* Bug 2254 -- this is better done in ext3_map_inode_page, but this
  * workaround will suffice until everyone has upgraded their kernels */
 static void check_pending_bhs(unsigned long *blocks, int nr_pages, dev_t dev,
@@ -79,27 +63,113 @@ static void check_pending_bhs(unsigned long *blocks, int nr_pages, dev_t dev,
 #endif
 }
 
-/* Must be called with i_sem taken; this will drop it */
-static int filter_direct_io(int rw, struct dentry *dchild, struct kiobuf *iobuf,
-                            struct obd_export *exp, struct iattr *attr,
-                            struct obd_trans_info *oti, void **wait_handle)
+/* Post-process the block numbers filled into iobuf->blocks[].
+ *
+ * When brw_kiovec() is asked to read from block -1UL it just zeros the
+ * page, so on reads every entry that is still 0 is rewritten to -1UL.
+ * On writes a 0 entry is treated as a bad mapping and rejected, which
+ * gives us a chance to verify the write mappings as well.
+ *
+ * Returns 0 on success, -EINVAL if a write has a 0 block entry. */
+static int filter_cleanup_mappings(int rw, struct kiobuf *iobuf,
+                                   struct inode *inode)
+{
+        int shift = PAGE_SHIFT - inode->i_blkbits;
+        int nr_blocks = iobuf->nr_pages << shift;
+        int idx;
+        ENTRY;
+
+        for (idx = 0; idx < nr_blocks; idx++) {
+                if (iobuf->blocks[idx] == 0) {
+                        /* a write must never see a 0 block entry */
+                        if (rw == OBD_BRW_WRITE)
+                                RETURN(-EINVAL);
+
+                        /* read: make brw_kiovec() zero-fill this block */
+                        iobuf->blocks[idx] = -1UL;
+                }
+        }
+        RETURN(0);
+}
+
+#if 0
+/* Debugging aid (compiled out): log the I/O direction, the block number
+ * and the first four bytes of the page at D_PAGE level. */
+static void dump_page(int rw, unsigned long block, struct page *page)
+{
+        /* unsigned so that bytes >= 0x80 are not sign-extended when they
+         * are promoted to int for the %02x conversions */
+        unsigned char *data = kmap(page);
+
+        CDEBUG(D_PAGE, "rw %d block %lu: %02x %02x %02x %02x\n", rw, block,
+                       data[0], data[1], data[2], data[3]);
+        kunmap(page);
+}
+#endif
+
+/* These are our hacks to keep our directio/bh IO coherent with ext3's
+ * page cache use.  Most notably ext3 reads file data into the page
+ * cache when it is zeroing the tail of partial-block truncates and
+ * leaves it there, sometimes generating io from it at later truncates.
+ * This removes the partial page and its buffers from the page cache,
+ * so it should only ever cause a wait in rare cases, as otherwise we
+ * always do full-page IO to the OST.
+ *
+ * The call to truncate_complete_page() will call journal_flushpage() to
+ * free the buffers and drop the page from cache.  The buffers should not
+ * be dirty, because we already called fdatasync/fdatawait on them.
+ *
+ * Returns 0 on success.  If any of the flush/wait calls fails, the first
+ * non-zero result is returned and no page is evicted.
+ */
+static int filter_clear_page_cache(struct inode *inode, struct kiobuf *iobuf)
+{
+        struct page *page;
+        int i, rc, rc2;
+        ENTRY;
+
+        check_pending_bhs(KIOBUF_GET_BLOCKS(iobuf), iobuf->nr_pages,
+                          inode->i_dev, 1 << inode->i_blkbits);
+
+        /* This is nearly generic_osync_inode, without the waiting on the inode
+        rc = generic_osync_inode(inode, inode->i_mapping,
+                                 OSYNC_DATA|OSYNC_METADATA);
+         */
+        /* all three steps always run; rc keeps the first failure seen */
+        rc = filemap_fdatasync(inode->i_mapping);
+        rc2 = fsync_inode_data_buffers(inode);
+        if (rc == 0)
+                rc = rc2;
+        rc2 = filemap_fdatawait(inode->i_mapping);
+        if (rc == 0)
+                rc = rc2;
+        if (rc != 0)
+                RETURN(rc);
+
+        /* be careful to call this after fsync_inode_data_buffers has waited
+         * for IO to complete before we evict it from the cache */
+        for (i = 0; i < iobuf->nr_pages ; i++) {
+                page = find_lock_page(inode->i_mapping,
+                                      iobuf->maplist[i]->index);
+                if (page == NULL)
+                        continue;
+                if (page->mapping != NULL)
+                        ll_truncate_complete_page(page);
+
+                unlock_page(page);
+                page_cache_release(page);
+        }
+
+        /* pairs with ENTRY above, like the other functions in this file */
+        RETURN(0);
+}
+
+/* Must be called with i_sem taken for writes; this will drop it */
+int filter_direct_io(int rw, struct dentry *dchild, void *buf,
+                     struct obd_export *exp, struct iattr *attr,
+                     struct obd_trans_info *oti, void **wait_handle)
 {
         struct obd_device *obd = exp->exp_obd;
         struct inode *inode = dchild->d_inode;
-        struct page *page;
-        unsigned long *b = iobuf->blocks;
-        int rc, i, create = (rw == OBD_BRW_WRITE), blocks_per_page;
-        int *cr, cleanup_phase = 0, *created = NULL;
-        int committed = 0;
+        struct kiobuf *iobuf = buf;
+        int rc, create = (rw == OBD_BRW_WRITE), *created = NULL, committed = 0;
+        int blocks_per_page = PAGE_SIZE >> inode->i_blkbits, cleanup_phase = 0;
+        struct semaphore *sem = NULL;
         ENTRY;
 
-        blocks_per_page = PAGE_SIZE >> inode->i_blkbits;
+        LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
+
+        if (iobuf->nr_pages == 0)
+                GOTO(cleanup, rc = 0);
+
         if (iobuf->nr_pages * blocks_per_page > KIO_MAX_SECTORS)
                 GOTO(cleanup, rc = -EINVAL);
 
-        OBD_ALLOC(created, sizeof(*created) * iobuf->nr_pages*blocks_per_page);
-        if (created == NULL)
-                GOTO(cleanup, rc = -ENOMEM);
+        if (iobuf->nr_pages * blocks_per_page > 
+            OBDFILTER_CREATED_SCRATCHPAD_ENTRIES)
+                GOTO(cleanup, rc = -EINVAL);
+
         cleanup_phase = 1;
 
         rc = lock_kiovec(1, &iobuf, 1);
@@ -107,70 +177,58 @@ static int filter_direct_io(int rw, struct dentry *dchild, struct kiobuf *iobuf,
                 GOTO(cleanup, rc);
         cleanup_phase = 2;
 
-        down(&exp->exp_obd->u.filter.fo_alloc_lock);
-        for (i = 0, cr = created, b = iobuf->blocks; i < iobuf->nr_pages; i++){
-                page = iobuf->maplist[i];
-
-                rc = fsfilt_map_inode_page(obd, inode, page, b, cr, create);
-                if (rc) {
-                        CERROR("ino %lu, blk %lu cr %u create %d: rc %d\n",
-                               inode->i_ino, *b, *cr, create, rc);
-                        up(&exp->exp_obd->u.filter.fo_alloc_lock);
-                        GOTO(cleanup, rc);
-                }
-
-                b += blocks_per_page;
-                cr += blocks_per_page;
+        if (rw == OBD_BRW_WRITE) {
+                create = 1;
+                sem = &obd->u.filter.fo_alloc_lock;
         }
-        up(&exp->exp_obd->u.filter.fo_alloc_lock);
-
-        filter_tally_write(&obd->u.filter, iobuf->maplist, iobuf->nr_pages,
-                           iobuf->blocks, blocks_per_page);
-
-        if (attr->ia_size > inode->i_size)
-                attr->ia_valid |= ATTR_SIZE;
-        rc = fsfilt_setattr(obd, dchild, oti->oti_handle, attr, 0);
+        
+        rc = fsfilt_map_inode_pages(obd, inode, iobuf->maplist,
+                                    iobuf->nr_pages, iobuf->blocks, 
+                                    obdfilter_created_scratchpad, create, sem);
         if (rc)
                 GOTO(cleanup, rc);
 
-        up(&inode->i_sem);
-        cleanup_phase = 3;
-
-        rc = filter_finish_transno(exp, oti, 0);
+        rc = filter_cleanup_mappings(rw, iobuf, inode);
         if (rc)
                 GOTO(cleanup, rc);
 
-        rc = fsfilt_commit_async(obd, inode, oti->oti_handle, wait_handle);
-        oti->oti_handle = NULL;
-        committed = 1;
-        if (rc)
-                GOTO(cleanup, rc);
+        if (rw == OBD_BRW_WRITE) {
+                filter_tally_write(&obd->u.filter, iobuf->maplist,
+                                   iobuf->nr_pages, iobuf->blocks,
+                                   blocks_per_page);
 
-        check_pending_bhs(iobuf->blocks, iobuf->nr_pages, inode->i_dev,
-                          1 << inode->i_blkbits);
+                if (attr->ia_size > inode->i_size)
+                        attr->ia_valid |= ATTR_SIZE;
+                rc = fsfilt_setattr(obd, dchild, oti->oti_handle, attr, 0);
+                if (rc)
+                        GOTO(cleanup, rc);
+                up(&inode->i_sem);
+                cleanup_phase = 3;
+                rc = filter_finish_transno(exp, oti, 0);
+                if (rc)
+                        GOTO(cleanup, rc);
 
-        rc = filemap_fdatasync(inode->i_mapping);
-        if (rc == 0)
-                rc = fsync_inode_data_buffers(inode);
-        if (rc == 0)
-                rc = filemap_fdatawait(inode->i_mapping);
+                rc = fsfilt_commit_async(obd,inode,oti->oti_handle,wait_handle);
+                committed = 1;
+                if (rc)
+                        GOTO(cleanup, rc);
+        }
+
+        rc = filter_clear_page_cache(inode, iobuf);
         if (rc < 0)
                 GOTO(cleanup, rc);
 
-        rc = brw_kiovec(WRITE, 1, &iobuf, inode->i_dev, iobuf->blocks,
-                        1 << inode->i_blkbits);
+        rc = fsfilt_send_bio(rw, obd, inode, iobuf);
+
         CDEBUG(D_INFO, "tried to write %d pages, rc = %d\n",
                iobuf->nr_pages, rc);
-        if (rc != (1 << inode->i_blkbits) * iobuf->nr_pages * blocks_per_page)
-                CERROR("short write?  expected %d, wrote %d\n",
-                       (1 << inode->i_blkbits) * iobuf->nr_pages *
-                       blocks_per_page, rc);
+
         if (rc > 0)
                 rc = 0;
 
         EXIT;
 cleanup:
-        if (!committed) {
+        if (!committed && (rw == OBD_BRW_WRITE)) {                
                 int err = fsfilt_commit_async(obd, inode,
                                               oti->oti_handle, wait_handle);
                 oti->oti_handle = NULL;
@@ -187,12 +245,9 @@ cleanup:
         case 2:
                 unlock_kiovec(1, &iobuf);
         case 1:
-                OBD_FREE(created, sizeof(*created) *
-                         iobuf->nr_pages*blocks_per_page);
         case 0:
-                if (cleanup_phase == 3)
-                        break;
-                up(&inode->i_sem);
+                if (cleanup_phase != 3 && rw == OBD_BRW_WRITE)            
+                        up(&inode->i_sem);
                 break;
         default:
                 CERROR("corrupt cleanup_phase (%d)?\n", cleanup_phase);
@@ -202,18 +257,100 @@ cleanup:
         return rc;
 }
 
+/* See if there are unallocated parts in given file region.
+ *
+ * Walks every filesystem block touched by the byte range starting at
+ * \a offset and spanning \a len bytes, including a leading or trailing
+ * partially covered block, and asks the filesystem's bmap method for
+ * each of them.
+ *
+ * Returns 1 if bmap reports a non-zero block for every block in the
+ * range, 0 if any block maps to 0 or if the filesystem has no bmap
+ * method (in which case we cannot tell).  A range with len <= 0 still
+ * checks the block containing \a offset. */
+int filter_range_is_mapped(struct inode *inode, obd_size offset, int len)
+{
+        int (*fs_bmap)(struct address_space *, long) =
+                inode->i_mapping->a_ops->bmap;
+        obd_size blk, last_blk;
+
+        /* We can't know if the range is mapped already or not */
+        if (fs_bmap == NULL)
+                return 0;
+
+        /* Use the first and last block indices rather than a shifted
+         * length: shifting len drops any partial block, so a sub-block
+         * range would be reported as mapped without looking at anything */
+        blk = offset >> inode->i_blkbits;
+        last_blk = (offset + (len > 0 ? len - 1 : 0)) >> inode->i_blkbits;
+
+        for (; blk <= last_blk; blk++)
+                if (fs_bmap(inode->i_mapping, blk) == 0)
+                        return 0;
+
+        return 1;
+}
+
+/* Some kernels require alloc_kiovec callers to zero members through the
+ * use of map_user_kiobuf and unmap_..  We don't use those, so this little
+ * helper makes sure we don't break the rules: it NULLs all array_len
+ * maplist slots and resets nr_pages, offset and length to 0. */
+static void clear_kiobuf(struct kiobuf *iobuf)
+{
+        int slot = iobuf->array_len;
+
+        while (slot-- > 0)
+                iobuf->maplist[slot] = NULL;
+
+        iobuf->length = 0;
+        iobuf->offset = 0;
+        iobuf->nr_pages = 0;
+}
+
+int filter_alloc_iobuf(int rw, int num_pages, void **ret)
+{
+        int rc;
+        struct kiobuf *iobuf;
+        ENTRY;
+
+        LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
+
+        rc = alloc_kiovec(1, &iobuf);
+        if (rc)
+                RETURN(rc);
+
+        rc = expand_kiobuf(iobuf, num_pages);
+        if (rc) {
+                free_kiovec(1, &iobuf);
+                RETURN(rc);
+        }
+
+#ifdef HAVE_KIOBUF_DOVARY
+        iobuf->dovary = 0; /* this prevents corruption, not present in 2.4.20 */
+#endif
+        clear_kiobuf(iobuf);
+        *ret = iobuf;
+        RETURN(0);
+}
+
+/* Release a kiobuf obtained from filter_alloc_iobuf().  The page list
+ * and counters are reset with clear_kiobuf() before the kiovec is
+ * freed. */
+void filter_free_iobuf(void *buf)
+{
+        struct kiobuf *kiobuf = buf;
+
+        clear_kiobuf(kiobuf);
+        free_kiovec(1, &kiobuf);
+}
+
+/* Append one page to the kiobuf behind buf: the page goes into the next
+ * free maplist slot, nr_pages is bumped and length grows by PAGE_SIZE.
+ * obd and inode are not used here.  Always returns 0. */
+int filter_iobuf_add_page(struct obd_device *obd, void *buf,
+                           struct inode *inode, struct page *page)
+{
+        struct kiobuf *kiobuf = buf;
+        int slot = kiobuf->nr_pages;
+
+        kiobuf->maplist[slot] = page;
+        kiobuf->nr_pages = slot + 1;
+        kiobuf->length += PAGE_SIZE;
+
+        return 0;
+}
+
 int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
                           struct obd_ioobj *obj, int niocount,
-                          struct niobuf_local *res, struct obd_trans_info *oti)
+                          struct niobuf_local *res, struct obd_trans_info *oti,
+                          int rc)
 {
         struct obd_device *obd = exp->exp_obd;
-        struct obd_run_ctxt saved;
+        struct lvfs_run_ctxt saved;
         struct niobuf_local *lnb;
         struct fsfilt_objinfo fso;
         struct iattr iattr = { 0 };
-        struct kiobuf *iobuf;
+        void *iobuf = NULL;
         struct inode *inode = NULL;
-        int rc = 0, i, cleanup_phase = 0, err;
+        int i, n, cleanup_phase = 0, err;
         unsigned long now = jiffies; /* DEBUGGING OST TIMEOUTS */
         void *wait_handle;
         ENTRY;
@@ -221,31 +358,31 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
         LASSERT(objcount == 1);
         LASSERT(current->journal_info == NULL);
 
-        rc = alloc_kiovec(1, &iobuf);
-        if (rc)
+        if (rc != 0)
                 GOTO(cleanup, rc);
-        cleanup_phase = 1;
 
-#if (LINUX_VERSION_CODE == KERNEL_VERSION(2,4,18))
-        iobuf->dovary = 0; /* this prevents corruption, not present in 2.4.20 */
-#endif
-        rc = expand_kiobuf(iobuf, obj->ioo_bufcnt);
+        rc = filter_alloc_iobuf(OBD_BRW_WRITE, obj->ioo_bufcnt, &iobuf);
         if (rc)
                 GOTO(cleanup, rc);
-
-        iobuf->offset = 0;
-        iobuf->length = PAGE_SIZE * obj->ioo_bufcnt;
-        iobuf->nr_pages = obj->ioo_bufcnt;
-
         cleanup_phase = 1;
+
         fso.fso_dentry = res->dentry;
         fso.fso_bufcnt = obj->ioo_bufcnt;
         inode = res->dentry->d_inode;
 
-        iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME);
-        for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
+        for (i = 0, lnb = res, n = 0; i < obj->ioo_bufcnt; i++, lnb++) {
                 loff_t this_size;
-                iobuf->maplist[i] = lnb->page;
+
+                /* If overwriting an existing block, we don't need a grant */
+                if (!(lnb->flags & OBD_BRW_GRANTED) && lnb->rc == -ENOSPC &&
+                    filter_range_is_mapped(inode, lnb->offset, lnb->len))
+                        lnb->rc = 0;
+
+                if (lnb->rc) /* ENOSPC, network RPC error */
+                        continue;
+
+                filter_iobuf_add_page(obd, iobuf, inode, lnb->page);
+                
                 /* We expect these pages to be in offset order, but we'll
                  * be forgiving */
                 this_size = lnb->offset + lnb->len;
@@ -253,13 +390,14 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
                         iattr.ia_size = this_size;
         }
 
-        push_ctxt(&saved, &obd->obd_ctxt, NULL);
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
         cleanup_phase = 2;
 
         down(&inode->i_sem);
         oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso, niocount, res,
                                            oti);
         if (IS_ERR(oti->oti_handle)) {
+                up(&inode->i_sem);
                 rc = PTR_ERR(oti->oti_handle);
                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
                        "error starting transaction: rc = %d\n", rc);
@@ -267,45 +405,38 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
                 GOTO(cleanup, rc);
         }
 
-        if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow brw_start %lus\n", (jiffies - now) / HZ);
+        fsfilt_check_slow(now, obd_timeout, "brw_start");
 
+        iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME);
+        /* filter_direct_io drops i_sem */
         rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, iobuf, exp, &iattr,
                               oti, &wait_handle);
         if (rc == 0)
                 obdo_from_inode(oa, inode, FILTER_VALID_FLAGS);
 
-        if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow direct_io %lus\n", (jiffies - now) / HZ);
+        fsfilt_check_slow(now, obd_timeout, "direct_io");
 
         err = fsfilt_commit_wait(obd, inode, wait_handle);
         if (err)
                 rc = err;
-        if (obd_sync_filter)
-                LASSERT(oti->oti_transno <= obd->obd_last_committed);
-        if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow commitrw commit %lus\n", (jiffies - now) / HZ);
-
+        if (obd_sync_filter && !err)
+                LASSERTF(oti->oti_transno <= obd->obd_last_committed,
+                         "oti_transno "LPU64" last_committed "LPU64"\n",
+                         oti->oti_transno, obd->obd_last_committed);
+        fsfilt_check_slow(now, obd_timeout, "commitrw commit");
 cleanup:
+        filter_grant_commit(exp, niocount, res);
+
         switch (cleanup_phase) {
         case 2:
-                pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 LASSERT(current->journal_info == NULL);
         case 1:
-                free_kiovec(1, &iobuf);
+                filter_free_iobuf(iobuf);
         case 0:
-                for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
-                        /* flip_.. gets a ref, while free_page only frees
-                         * when it decrefs to 0 */
-                        if (rc == 0)
-                                flip_into_page_cache(inode, lnb->page);
-                        __free_page(lnb->page);
-                }
+                filter_free_dio_pages(objcount, obj, niocount, res);
                 f_dput(res->dentry);
         }
 
         RETURN(rc);
 }
-
-#endif
-