Whamcloud - gitweb
b=5153
[fs/lustre-release.git] / lustre / obdfilter / filter_io_26.c
index cc0007f..24e6229 100644 (file)
@@ -28,6 +28,7 @@
 #include <linux/module.h>
 #include <linux/pagemap.h> // XXX kill me soon
 #include <linux/version.h>
+#include <linux/buffer_head.h>
 
 #define DEBUG_SUBSYSTEM S_FILTER
 
 
 #warning "implement writeback mode -bzzz"
 
-int ext3_map_inode_page(struct inode *inode, struct page *page,
-                        unsigned long *blocks, int *created, int create);
-
 /* 512byte block min */
 #define MAX_BLOCKS_PER_PAGE (PAGE_SIZE / 512)
 struct dio_request {
         atomic_t numreqs;       /* number of reqs being processed */
+        struct bio *bio_current;/* bio currently being constructed */
         struct bio *bio_list;   /* list of completed bios */
-        wait_queue_head_t wait;
-       int created[MAX_BLOCKS_PER_PAGE];
-       unsigned long blocks[MAX_BLOCKS_PER_PAGE];
-        spinlock_t lock;
+        wait_queue_head_t dr_wait;
+        int dr_num_pages;
+        int dr_rw;
+        int dr_error;
+        int dr_created[MAX_BLOCKS_PER_PAGE];
+        unsigned long dr_blocks[MAX_BLOCKS_PER_PAGE];
+        spinlock_t dr_lock;
+
 };
 
 static int dio_complete_routine(struct bio *bio, unsigned int done, int error)
@@ -56,67 +59,266 @@ static int dio_complete_routine(struct bio *bio, unsigned int done, int error)
         struct dio_request *dreq = bio->bi_private;
         unsigned long flags;
 
-        spin_lock_irqsave(&dreq->lock, flags);
+        spin_lock_irqsave(&dreq->dr_lock, flags);
         bio->bi_private = dreq->bio_list;
         dreq->bio_list = bio;
-        spin_unlock_irqrestore(&dreq->lock, flags);
+        spin_unlock_irqrestore(&dreq->dr_lock, flags);
         if (atomic_dec_and_test(&dreq->numreqs))
-                wake_up(&dreq->wait);
-
+                wake_up(&dreq->dr_wait);
+        if (dreq->dr_error == 0)
+                dreq->dr_error = error;
         return 0;
 }
 
 static int can_be_merged(struct bio *bio, sector_t sector)
 {
-       int size;
-       
-       if (!bio)
-               return 0;
-       
-       size = bio->bi_size >> 9;
-       return bio->bi_sector + size == sector ? 1 : 0;
+        unsigned int size;
+        if (!bio)
+                return 0;
+
+        size = bio->bi_size >> 9;
+        return bio->bi_sector + size == sector ? 1 : 0;
+}
+int filter_alloc_iobuf(int rw, int num_pages, void **ret)
+{
+        struct dio_request *dreq;
+
+        LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
+
+        OBD_ALLOC(dreq, sizeof(*dreq));
+        if (dreq == NULL)
+                RETURN(-ENOMEM);
+
+        dreq->bio_list = NULL;
+        init_waitqueue_head(&dreq->dr_wait);
+        atomic_set(&dreq->numreqs, 0);
+        spin_lock_init(&dreq->dr_lock);
+        dreq->dr_num_pages = num_pages;
+        dreq->dr_rw = rw;
+
+        *ret = dreq;
+        RETURN(0);
+}
+
+void filter_free_iobuf(void *iobuf)
+{
+        struct dio_request *dreq = iobuf;
+
+        /* free all bios */
+        while (dreq->bio_list) {
+                struct bio *bio = dreq->bio_list;
+                dreq->bio_list = bio->bi_private;
+                bio_put(bio);
+        }
+
+        OBD_FREE(dreq, sizeof(*dreq));
 }
 
-int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
-                          struct obd_ioobj *obj, int niocount,
-                          struct niobuf_local *res, struct obd_trans_info *oti)
+int filter_iobuf_add_page(struct obd_device *obd, void *iobuf,
+                          struct inode *inode, struct page *page)
+{
+        struct dio_request *dreq = iobuf;
+        int blocks_per_page = PAGE_SIZE >> inode->i_blkbits;
+        unsigned int len = inode->i_sb->s_blocksize, offs;
+        struct bio *bio = dreq->bio_current;
+        sector_t sector;
+        int k, rc;
+        ENTRY;
+
+        /* get block number for next page */
+        rc = fsfilt_map_inode_pages(obd, inode, &page, 1, dreq->dr_blocks,
+                                    dreq->dr_created,
+                                    dreq->dr_rw == OBD_BRW_WRITE, NULL);
+        if (rc)
+                RETURN(rc);
+
+        for (k = 0, offs = 0; k < blocks_per_page; k++, offs += len) {
+                if (dreq->dr_created[k] == -1) {
+                        memset(kmap(page) + offs, 0, len);
+                        kunmap(page);
+                        continue;
+                }
+
+                sector = dreq->dr_blocks[k] <<(inode->i_sb->s_blocksize_bits-9);
+
+                if (!bio || !can_be_merged(bio, sector) ||
+                    !bio_add_page(bio, page, len, offs)) {
+                        if (bio) {
+                                atomic_inc(&dreq->numreqs);
+                                /* FIXME
+                                filter_tally_write(&obd->u.filter,dreq->maplist,
+                                                   dreq->nr_pages,dreq->blocks,
+                                                   blocks_per_page);
+                                */
+                                fsfilt_send_bio(dreq->dr_rw, obd, inode, bio);
+                                dreq->bio_current = bio = NULL;
+                        }
+                        /* allocate new bio */
+                        dreq->bio_current = bio =
+                                bio_alloc(GFP_NOIO, dreq->dr_num_pages *
+                                                    blocks_per_page);
+                        bio->bi_bdev = inode->i_sb->s_bdev;
+                        bio->bi_sector = sector;
+                        bio->bi_end_io = dio_complete_routine;
+                        bio->bi_private = dreq;
+
+                        if (!bio_add_page(bio, page, len, offs))
+                                LBUG();
+                }
+        }
+        dreq->dr_num_pages--;
+
+        RETURN(0);
+}
+
+static void filter_clear_page_cache(struct inode *inode, struct kiobuf *iobuf)
+{
+#if 0
+        struct page *page;
+        int i;
+
+        for (i = 0; i < iobuf->nr_pages ; i++) {
+                page = find_lock_page(inode->i_mapping,
+                                      iobuf->maplist[i]->index);
+                if (page == NULL)
+                        continue;
+                if (page->mapping != NULL) {
+                        block_invalidatepage(page, 0);
+                        truncate_complete_page(page);
+                }
+                unlock_page(page);
+                page_cache_release(page);
+        }
+#endif
+}
+
+/* Must be called with i_sem taken for writes; this will drop it */
+int filter_direct_io(int rw, struct dentry *dchild, void *iobuf,
+                     struct obd_export *exp, struct iattr *attr,
+                     struct obd_trans_info *oti, void **wait_handle)
+{
+        struct dio_request *dreq = iobuf;
+        struct inode *inode = dchild->d_inode;
+        int rc;
+        ENTRY;
+
+        LASSERTF(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ, "%x\n", rw);
+
+        /* This is nearly osync_inode, without the waiting
+        rc = generic_osync_inode(inode, inode->i_mapping,
+                                 OSYNC_DATA|OSYNC_METADATA); */
+        rc = filemap_fdatawrite(inode->i_mapping);
+        if (rc == 0)
+                rc = sync_mapping_buffers(inode->i_mapping);
+        if (rc == 0)
+                rc = filemap_fdatawait(inode->i_mapping);
+        if (rc < 0)
+                GOTO(cleanup, rc);
+
+        if (rw == OBD_BRW_WRITE)
+                up(&inode->i_sem);
+
+        /* be careful to call this after fsync_inode_data_buffers has waited
+         * for IO to complete before we evict it from the cache */
+        filter_clear_page_cache(inode, iobuf);
+
+        if (dreq->bio_current != NULL) {
+                atomic_inc(&dreq->numreqs);
+                fsfilt_send_bio(rw, exp->exp_obd, inode, dreq->bio_current);
+                dreq->bio_current = NULL;
+        }
+
+        /* time to wait for I/O completion */
+        wait_event(dreq->dr_wait, atomic_read(&dreq->numreqs) == 0);
+
+        rc = dreq->dr_error;
+        if (rw == OBD_BRW_WRITE && rc == 0) {
+                /* FIXME:
+                filter_tally_write(&obd->u.filter, dreq->maplist,
+                                   dreq->nr_pages, dreq->blocks,
+                                   blocks_per_page);
+                */
+
+                if (attr->ia_size > inode->i_size) {
+                        CDEBUG(D_INFO, "setting i_size to "LPU64"\n",
+                               attr->ia_size);
+
+                        attr->ia_valid |= ATTR_SIZE;
+                        down(&inode->i_sem);
+                        fsfilt_setattr(exp->exp_obd, dchild, oti->oti_handle,
+                                       attr, 0);
+                        up(&inode->i_sem);
+                }
+        }
+
+cleanup:
+        RETURN(rc);
+}
+
+
+
+/* See if there are unallocated parts in given file region */
+static int filter_range_is_mapped(struct inode *inode, obd_size offset, int len)
+{
+        sector_t (*fs_bmap)(struct address_space *, sector_t) =
+                inode->i_mapping->a_ops->bmap;
+        int j;
+
+        /* We can't know if we are overwriting or not */
+        if (fs_bmap == NULL)
+                return 0;
+
+        offset >>= inode->i_blkbits;
+        len >>= inode->i_blkbits;
+
+        for (j = 0; j <= len; j++)
+                if (fs_bmap(inode->i_mapping, offset + j) == 0)
+                        return 0;
+
+        return 1;
+}
+
+int filter_commitrw_write(struct obd_export *exp, struct obdo *oa,
+                          int objcount, struct obd_ioobj *obj, int niocount,
+                          struct niobuf_local *res, struct obd_trans_info *oti,
+                          int rc)
 {
-        struct obd_device *obd = exp->exp_obd;
-        struct obd_run_ctxt saved;
         struct niobuf_local *lnb;
+        struct dio_request *dreq = NULL;
+        struct lvfs_run_ctxt saved;
         struct fsfilt_objinfo fso;
-        struct iattr iattr = { .ia_valid = ATTR_SIZE, .ia_size = 0, };
+        struct iattr iattr = { 0 };
         struct inode *inode = NULL;
-        int rc = 0, i, k, cleanup_phase = 0, err;
-        unsigned long now = jiffies; /* DEBUGGING OST TIMEOUTS */
-       int blocks_per_page;
-        struct dio_request *dreq;
-        struct bio *bio = NULL;
+        unsigned long now = jiffies;
+        int i, err, cleanup_phase = 0;
+        struct obd_device *obd = exp->exp_obd;
+
         ENTRY;
+
         LASSERT(oti != NULL);
         LASSERT(objcount == 1);
         LASSERT(current->journal_info == NULL);
 
+        if (rc != 0)
+                GOTO(cleanup, rc);
+
         inode = res->dentry->d_inode;
-        blocks_per_page = PAGE_SIZE >> inode->i_blkbits;
-       LASSERT(blocks_per_page <= MAX_BLOCKS_PER_PAGE);
 
-        OBD_ALLOC(dreq, sizeof(*dreq));
-        if (dreq == NULL)
-                RETURN(-ENOMEM);
-        dreq->bio_list = NULL;
-        init_waitqueue_head(&dreq->wait);
-        atomic_set(&dreq->numreqs, 0);
-        spin_lock_init(&dreq->lock);
+        rc = filter_alloc_iobuf(OBD_BRW_WRITE, obj->ioo_bufcnt, (void **)&dreq);
+        if (rc)
+                GOTO(cleanup, rc);
 
         cleanup_phase = 1;
         fso.fso_dentry = res->dentry;
         fso.fso_bufcnt = obj->ioo_bufcnt;
 
-        push_ctxt(&saved, &obd->obd_ctxt, NULL);
-        cleanup_phase = 2; 
+        push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+        cleanup_phase = 2;
 
-        oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso, niocount, oti);
+        generic_osync_inode(inode, inode->i_mapping, OSYNC_DATA|OSYNC_METADATA);
+
+        oti->oti_handle = fsfilt_brw_start(obd, objcount, &fso, niocount, res,
+                                           oti);
         if (IS_ERR(oti->oti_handle)) {
                 rc = PTR_ERR(oti->oti_handle);
                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
@@ -125,103 +327,67 @@ int filter_commitrw_write(struct obd_export *exp, struct obdo *oa, int objcount,
                 GOTO(cleanup, rc);
         }
 
+        /* have to call fsfilt_commit() from this point on */
+
         if (time_after(jiffies, now + 15 * HZ))
                 CERROR("slow brw_start %lus\n", (jiffies - now) / HZ);
 
+        down(&inode->i_sem);
         for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
                 loff_t this_size;
-               sector_t sector;
-               int offs;
-
-               /* get block number for next page */
-                rc = ext3_map_inode_page(inode, lnb->page, dreq->blocks,
-                                                dreq->created, 1);
-                if (rc)
-                        GOTO(cleanup, rc);
-
-               for (k = 0; k < blocks_per_page; k++) {
-                       sector = dreq->blocks[k] * (inode->i_sb->s_blocksize >> 9);
-                       offs = k * inode->i_sb->s_blocksize;
-
-                       if (!bio || !can_be_merged(bio, sector) ||
-                               !bio_add_page(bio, lnb->page, lnb->len, offs)) {
-                               if (bio) {
-                                        atomic_inc(&dreq->numreqs);
-                                       submit_bio(WRITE, bio);
-                                       bio = NULL;
-                               }
-                               /* allocate new bio */
-                               bio = bio_alloc(GFP_NOIO, obj->ioo_bufcnt);
-                               bio->bi_bdev = inode->i_sb->s_bdev;
-                               bio->bi_sector = sector;
-                               bio->bi_end_io = dio_complete_routine; 
-                                bio->bi_private = dreq;
-
-                               if (!bio_add_page(bio, lnb->page, lnb->len, 0))
-                                       LBUG();
-                       }
-               }
-
-                /* We expect these pages to be in offset order, but we'll
+
+                /* If overwriting an existing block, we don't need a grant */
+                if (!(lnb->flags & OBD_BRW_GRANTED) && lnb->rc == -ENOSPC &&
+                    filter_range_is_mapped(inode, lnb->offset, lnb->len))
+                        lnb->rc = 0;
+
+                if (lnb->rc) /* ENOSPC, network RPC error, etc. */ 
+                        continue;
+
+                err = filter_iobuf_add_page(obd, dreq, inode, lnb->page);
+                if (err != 0) {
+                        lnb->rc = err;
+                        continue;
+                }
+
+                /* we expect these pages to be in offset order, but we'll
                  * be forgiving */
                 this_size = lnb->offset + lnb->len;
                 if (this_size > iattr.ia_size)
                         iattr.ia_size = this_size;
         }
-       if (bio) {
-                atomic_inc(&dreq->numreqs);
-                submit_bio(WRITE, bio);
-        }
-
-       /* time to wait for I/O completion */
-        wait_event(dreq->wait, atomic_read(&dreq->numreqs) == 0);
 
-        /* free all bios */
-        while (dreq->bio_list) {
-                bio = dreq->bio_list;
-                dreq->bio_list = bio->bi_private;
-                bio_put(bio);
-        }
-
-        if (rc == 0) {
-                down(&inode->i_sem);
-                inode_update_time(inode, 1);
-                if (iattr.ia_size > inode->i_size) {
-                        CDEBUG(D_INFO, "setting i_size to "LPU64"\n",
-                               iattr.ia_size);
-                        fsfilt_setattr(obd, res->dentry, oti->oti_handle,
-                                       &iattr, 0);
-                }
-                up(&inode->i_sem);
-        }
+        iattr_from_obdo(&iattr,oa,OBD_MD_FLATIME|OBD_MD_FLMTIME|OBD_MD_FLCTIME);
+        rc = filter_direct_io(OBD_BRW_WRITE, res->dentry, dreq, exp, &iattr,
+                              oti, NULL);
+        rc = filter_finish_transno(exp, oti, rc);
 
         if (time_after(jiffies, now + 15 * HZ))
                 CERROR("slow direct_io %lus\n", (jiffies - now) / HZ);
 
-        rc = filter_finish_transno(exp, oti, rc);
-        err = fsfilt_commit(obd, inode, oti->oti_handle, obd_sync_filter);
+
+        err = fsfilt_commit(obd, obd->u.filter.fo_sb, inode, oti->oti_handle,
+                            obd_sync_filter);
         if (err)
                 rc = err;
+
         if (obd_sync_filter)
                 LASSERT(oti->oti_transno <= obd->obd_last_committed);
+
         if (time_after(jiffies, now + 15 * HZ))
                 CERROR("slow commitrw commit %lus\n", (jiffies - now) / HZ);
 
 cleanup:
+        filter_grant_commit(exp, niocount, res);
+
         switch (cleanup_phase) {
         case 2:
-                pop_ctxt(&saved, &obd->obd_ctxt, NULL);
+                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                 LASSERT(current->journal_info == NULL);
         case 1:
-                OBD_FREE(dreq, sizeof(*dreq));
+                filter_free_iobuf(dreq);
         case 0:
-                for (i = 0, lnb = res; i < obj->ioo_bufcnt; i++, lnb++) {
-                        /* flip_.. gets a ref, while free_page only frees
-                         * when it decrefs to 0 */
-                        if (rc == 0)
-                                flip_into_page_cache(inode, lnb->page);
-                        __free_page(lnb->page);
-                }
+                filter_free_dio_pages(objcount, obj, niocount, res);
                 f_dput(res->dentry);
         }