Whamcloud - gitweb
- landing of b_hd_cleanup_merge to HEAD.
[fs/lustre-release.git] / lustre / obdfilter / filter_io.c
index 10cc687..8b09fc7 100644 (file)
 #include <linux/lustre_snap.h>
 #include "filter_internal.h"
 
-static int filter_start_page_read(struct obd_device *obd, struct inode *inode,
-                                  struct niobuf_local *lnb)
+static int filter_alloc_dio_page(struct obd_device *obd, struct inode *inode,
+                                 struct niobuf_local *lnb)
+
 {
         struct page *page;
-        unsigned long index = lnb->offset >> PAGE_SHIFT;
-
-        page = fsfilt_getpage(obd, inode, index);
-        if (IS_ERR(page)) {
-                CERROR("page index %lu, rc = %ld\n", index, PTR_ERR(page));
+        ENTRY;
+        page = alloc_pages(GFP_HIGHUSER, 0);
+        if (page == NULL) {
+                CERROR("no memory for a temp page\n");
+                lnb->rc = -ENOMEM;
+                RETURN(-ENOMEM);
+        }
 
-                lnb->page = NULL;
-                lnb->rc = PTR_ERR(page);
-                return lnb->rc;
+#if 0
+        POISON_PAGE(page, 0xf1);
+        if (lnb->len != PAGE_SIZE) {
+                memset(kmap(page) + lnb->len, 0, PAGE_SIZE - lnb->len);
+                kunmap(page);
         }
+#endif
+        page->index = lnb->offset >> PAGE_SHIFT;
 
         lnb->page = page;
 
-        return 0;
+        RETURN(0);
 }
 
-static int filter_finish_page_read(struct niobuf_local *lnb)
+void filter_free_dio_pages(int objcount, struct obd_ioobj *obj,
+                           int niocount, struct niobuf_local *res)
 {
-        if (lnb->page == NULL)
-                return 0;
-
-        if (PageUptodate(lnb->page))
-                return 0;
+        int i, j;
 
-        wait_on_page(lnb->page);
-        if (!PageUptodate(lnb->page)) {
-                CERROR("page index %lu/offset "LPX64" not uptodate\n",
-                       lnb->page->index, lnb->offset);
-                GOTO(err_page, lnb->rc = -EIO);
-        }
-        if (PageError(lnb->page)) {
-                CERROR("page index %lu/offset "LPX64" has error\n",
-                       lnb->page->index, lnb->offset);
-                GOTO(err_page, lnb->rc = -EIO);
+        for (i = 0; i < objcount; i++, obj++) {
+                for (j = 0 ; j < obj->ioo_bufcnt ; j++, res++) {
+                        if (res->page != NULL) {
+                                __free_page(res->page);
+                                res->page = NULL;
+                        }
+                }
         }
-
-        return 0;
-
-err_page:
-        page_cache_release(lnb->page);
-        lnb->page = NULL;
-        return lnb->rc;
 }
 
 /* Grab the dirty and seen grant announcements from the incoming obdo.
@@ -263,6 +258,7 @@ long filter_grant(struct obd_export *exp, obd_size current_grant,
         return grant;
 }
 
+
 static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                               int objcount, struct obd_ioobj *obj,
                               int niocount, struct niobuf_remote *nb,
@@ -271,106 +267,84 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
 {
         struct obd_device *obd = exp->exp_obd;
         struct lvfs_run_ctxt saved;
-        struct obd_ioobj *o;
         struct niobuf_remote *rnb;
-        struct niobuf_local *lnb = NULL;
-        struct fsfilt_objinfo *fso;
-        struct dentry *dentry;
+        struct niobuf_local *lnb;
+        struct dentry *dentry = NULL;
         struct inode *inode;
-        int rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0;
+        void *iobuf = NULL; 
+        int rc = 0, i, tot_bytes = 0;
         unsigned long now = jiffies;
         ENTRY;
 
         /* We are currently not supporting multi-obj BRW_READ RPCS at all.
          * When we do this function's dentry cleanup will need to be fixed */
-        LASSERT(objcount == 1);
-        LASSERT(obj->ioo_bufcnt > 0);
+        LASSERTF(objcount == 1, "%d\n", objcount);
+        LASSERTF(obj->ioo_bufcnt > 0, "%d\n", obj->ioo_bufcnt);
 
         if (oa && oa->o_valid & OBD_MD_FLGRANT) {
                 spin_lock(&obd->obd_osfs_lock);
                 filter_grant_incoming(exp, oa);
 
-#if 0
-                /* Reads do not increase grants */
-                oa->o_grant = filter_grant(exp, oa->o_grant, oa->o_undirty,
-                                           filter_grant_space_left(exp));
-#else
                 oa->o_grant = 0;
-#endif
+                
                 spin_unlock(&obd->obd_osfs_lock);
         }
 
-        OBD_ALLOC(fso, objcount * sizeof(*fso));
-        if (fso == NULL)
-                RETURN(-ENOMEM);
-
         memset(res, 0, niocount * sizeof(*res));
 
         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-        for (i = 0, o = obj; i < objcount; i++, o++) {
-                LASSERT(o->ioo_bufcnt);
-
-                dentry = filter_oa2dentry(obd, oa);
-                if (IS_ERR(dentry))
-                        GOTO(cleanup, rc = PTR_ERR(dentry));
+        rc = filter_alloc_iobuf(OBD_BRW_READ, obj->ioo_bufcnt, &iobuf);
+        if (rc)
+                GOTO(cleanup, rc);
 
-                if (dentry->d_inode == NULL) {
-                        CERROR("trying to BRW to non-existent file "LPU64"\n",
-                               o->ioo_id);
-                        f_dput(dentry);
-                        GOTO(cleanup, rc = -ENOENT);
-                }
+        dentry = filter_oa2dentry(obd, oa);
+        if (IS_ERR(dentry))
+                GOTO(cleanup, rc = PTR_ERR(dentry));
 
-                fso[i].fso_dentry = dentry;
-                fso[i].fso_bufcnt = o->ioo_bufcnt;
+        if (dentry->d_inode == NULL) {
+                CERROR("trying to BRW to non-existent file "LPU64"\n",
+                               obj->ioo_id);
+                GOTO(cleanup, rc = -ENOENT);
         }
 
+        inode = dentry->d_inode; 
+
         if (time_after(jiffies, now + 15 * HZ))
                 CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ);
         else
                 CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n",
                        (jiffies - now));
 
-        for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
-                dentry = fso[i].fso_dentry;
-                inode = dentry->d_inode;
-
-                for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
-                        lnb->dentry = dentry;
-                        lnb->offset = rnb->offset;
-                        lnb->len    = rnb->len;
-                        lnb->flags  = rnb->flags;
-
-                        if (inode->i_size <= rnb->offset) {
-                                /* If there's no more data, abort early.
-                                 * lnb->page == NULL and lnb->rc == 0, so it's
-                                 * easy to detect later. */
-                                break;
-                        } else {
-                                rc = filter_start_page_read(obd, inode, lnb);
-                        }
+        for (i = 0, lnb = res, rnb = nb; i < obj->ioo_bufcnt;
+             i++, rnb++, lnb++) {
+                lnb->dentry = dentry;
+                lnb->offset = rnb->offset;
+                lnb->len    = rnb->len;
+                lnb->flags  = rnb->flags;
 
-                        if (rc) {
-                                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
-                                       "page err %u@"LPU64" %u/%u %p: rc %d\n",
-                                       lnb->len, lnb->offset, j, o->ioo_bufcnt,
-                                       dentry, rc);
-                                cleanup_phase = 1;
-                                GOTO(cleanup, rc);
-                        }
+                if (inode->i_size <= rnb->offset)
+                      /* If there's no more data, abort early.
+                      * lnb->page == NULL and lnb->rc == 0, so it's
+                      * easy to detect later. */
+                        break;
+                else
+                        rc = filter_alloc_dio_page(obd, inode, lnb);
+                if (rc) {
+                        CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
+                             "page err %u@"LPU64" %u/%u %p: rc %d\n",
+                              lnb->len, lnb->offset, i, obj->ioo_bufcnt,
+                              dentry, rc);
+                        GOTO(cleanup, rc);
+                }
 
-                        if (inode->i_size < lnb->offset + lnb->len - 1)
-                                lnb->rc = inode->i_size - lnb->offset;
-                        else
-                                lnb->rc = lnb->len;
+                if (inode->i_size < lnb->offset + lnb->len - 1)
+                        lnb->rc = inode->i_size - lnb->offset;
+                else
+                        lnb->rc = lnb->len;
 
-                        tot_bytes += lnb->rc;
-                        if (lnb->rc < lnb->len) {
-                                /* short read, be sure to wait on it */
-                                lnb++;
-                                break;
-                        }
-                }
+                tot_bytes += lnb->rc;
+
+                filter_iobuf_add_page(obd, iobuf, inode, lnb->page);
         }
 
         if (time_after(jiffies, now + 15 * HZ))
@@ -379,42 +353,33 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                 CDEBUG(D_INFO, "start_page_read: %lu jiffies\n",
                        (jiffies - now));
 
-        lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
-        while (lnb-- > res) {
-                rc = filter_finish_page_read(lnb);
-                if (rc) {
-                        CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len,
-                               lnb->offset, (int)(lnb - res), lnb->dentry, rc);
-                        cleanup_phase = 1;
-                        GOTO(cleanup, rc);
-                }
-        }
+        rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp,
+                              NULL, NULL, NULL);
+        if (rc)
+                GOTO(cleanup, rc);
 
-        if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow finish_page_read %lus\n", (jiffies - now) / HZ);
-        else
-                CDEBUG(D_INFO, "finish_page_read: %lu jiffies\n",
-                       (jiffies - now));
+        lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
 
         filter_tally_read(&exp->exp_obd->u.filter, res, niocount);
 
         EXIT;
 
- cleanup:
-        switch (cleanup_phase) {
-        case 1:
-                for (lnb = res; lnb < (res + niocount); lnb++) {
-                        if (lnb->page)
-                                page_cache_release(lnb->page);
-                }
-                if (res->dentry != NULL)
-                        f_dput(res->dentry);
+cleanup:
+        if (rc != 0) {
+                filter_free_dio_pages(objcount, obj, niocount, res);
+
+                if (dentry != NULL)
+                        f_dput(dentry);
                 else
                         CERROR("NULL dentry in cleanup -- tell CFS\n");
-        case 0:
-                OBD_FREE(fso, objcount * sizeof(*fso));
-                pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
         }
+
+        if (iobuf != NULL)
+                filter_free_iobuf(iobuf);
+
+        pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+        if (rc)
+                CERROR("io error %d\n", rc);
         return rc;
 }
 
@@ -525,55 +490,6 @@ static int filter_grant_check(struct obd_export *exp, int objcount,
         return rc;
 }
 
-static int filter_start_page_write(struct obd_device *obd, struct inode *inode,
-                                   struct niobuf_local *lnb)
-{
-        struct page *page;
-
-        if (lnb->len != PAGE_SIZE)
-                return filter_start_page_read(obd, inode, lnb);
-
-        page = alloc_pages(GFP_HIGHUSER, 0);
-        if (page == NULL) {
-                CERROR("no memory for a temp page\n");
-                RETURN(lnb->rc = -ENOMEM);
-        }
-#if 0
-        POISON_PAGE(page, 0xf1);
-        if (lnb->len != PAGE_SIZE) {
-                memset(kmap(page) + lnb->len, 0, PAGE_SIZE - lnb->len);
-                kunmap(page);
-        }
-#endif
-        page->index = lnb->offset >> PAGE_SHIFT;
-        lnb->page = page;
-
-        return 0;
-}
-
-static void filter_abort_page_write(struct niobuf_local *lnb)
-{
-        LASSERT(lnb->page != NULL);
-
-        if (lnb->len != PAGE_SIZE)
-                page_cache_release(lnb->page);
-        else
-                __free_pages(lnb->page, 0);
-}
-
-/* a helper for both the 2.4 and 2.6 commitrw paths which are both built
- * up by our shared filter_preprw_write() */
-void filter_release_write_page(struct filter_obd *filter, struct inode *inode,
-                               struct niobuf_local *lnb, int rc)
-{
-        if (lnb->len != PAGE_SIZE)
-                return filter_release_read_page(filter, inode, lnb->page);
-
-        if (rc == 0)
-                flip_into_page_cache(inode, lnb->page);
-        __free_page(lnb->page);
-}
-
 /* If we ever start to support multi-object BRW RPCs, we will need to get locks
  * on mulitple inodes.  That isn't all, because there still exists the
  * possibility of a truncate starting a new transaction while holding the ext3
@@ -592,24 +508,32 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
 {
         struct lvfs_run_ctxt saved;
         struct niobuf_remote *rnb;
-        struct niobuf_local *lnb;
+        struct niobuf_local *lnb = res;
         struct fsfilt_objinfo fso;
-        struct dentry *dentry;
+        struct dentry *dentry = NULL;
+        void *iobuf; 
         obd_size left;
         unsigned long now = jiffies;
-        int rc = 0, i, tot_bytes = 0, cleanup_phase = 1;
+        int rc = 0, i, tot_bytes = 0, cleanup_phase = 0;
         ENTRY;
         LASSERT(objcount == 1);
         LASSERT(obj->ioo_bufcnt > 0);
 
         memset(res, 0, niocount * sizeof(*res));
 
+        rc = filter_alloc_iobuf(OBD_BRW_READ, obj->ioo_bufcnt, &iobuf);
+        if (rc)
+                GOTO(cleanup, rc);
+        cleanup_phase = 1;
+
         push_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
         dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr,
                                    obj->ioo_id);
         if (IS_ERR(dentry))
                 GOTO(cleanup, rc = PTR_ERR(dentry));
-
+        
+        cleanup_phase = 2;
+        
         if (dentry->d_inode == NULL) {
                 CERROR("trying to BRW to non-existent file "LPU64"\n",
                        obj->ioo_id);
@@ -629,7 +553,8 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         spin_lock(&exp->exp_obd->obd_osfs_lock);
         if (oa)
                 filter_grant_incoming(exp, oa);
-        cleanup_phase = 0;
+        
+        cleanup_phase = 3;
 
         left = filter_grant_space_left(exp);
 
@@ -640,10 +565,8 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
 
         spin_unlock(&exp->exp_obd->obd_osfs_lock);
 
-        if (rc) {
-                f_dput(dentry);
+        if (rc) 
                 GOTO(cleanup, rc);
-        }
 
         for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
              i++, lnb++, rnb++) {
@@ -655,31 +578,39 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                 lnb->len    = rnb->len;
                 lnb->flags  = rnb->flags;
 
-                rc = filter_start_page_write(exp->exp_obd, dentry->d_inode,lnb);
+                rc = filter_alloc_dio_page(exp->exp_obd, dentry->d_inode,lnb);
                 if (rc) {
                         CERROR("page err %u@"LPU64" %u/%u %p: rc %d\n",
                                lnb->len, lnb->offset,
                                i, obj->ioo_bufcnt, dentry, rc);
-                        while (lnb-- > res)
-                                filter_abort_page_write(lnb);
-                        f_dput(dentry);
                         GOTO(cleanup, rc);
                 }
+                cleanup_phase = 4;
+
+                /* If the filter writes a partial page, then has the file
+                 * extended, the client will read in the whole page.  the
+                 * filter has to be careful to zero the rest of the partial
+                 * page on disk.  we do it by hand for partial extending
+                 * writes, send_bio() is responsible for zeroing pages when
+                 * asked to read unmapped blocks -- brw_kiovec() does this. */
+                if (lnb->len != PAGE_SIZE) {
+                        if (lnb->offset + lnb->len < dentry->d_inode->i_size) {
+                                filter_iobuf_add_page(exp->exp_obd, iobuf,
+                                                      dentry->d_inode,
+                                                      lnb->page);
+                        } else {
+                                memset(kmap(lnb->page) + lnb->len, 0,
+                                       PAGE_SIZE - lnb->len);
+                                kunmap(lnb->page);
+                        }
+                }
                 if (lnb->rc == 0)
                         tot_bytes += lnb->len;
         }
 
-        while (lnb-- > res) {
-                if (lnb->len == PAGE_SIZE)
-                        continue;
-                rc = filter_finish_page_read(lnb);
-                if (rc) {
-                        CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len,
-                               lnb->offset, (int)(lnb - res), lnb->dentry, rc);
-                        GOTO(cleanup, rc);
-                }
-        }
-
+        rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp,
+                              NULL, NULL, NULL);
+        
         if (time_after(jiffies, now + 15 * HZ))
                 CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
         else
@@ -691,15 +622,28 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
         EXIT;
 cleanup:
         switch(cleanup_phase) {
+        case 4:
+                if (rc)
+                        filter_free_dio_pages(objcount, obj, niocount, res);
+        case 3:
+                pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+                filter_free_iobuf(iobuf);
+        case 2:
+                if (rc)
+                        f_dput(dentry);
+                break;
         case 1:
                 spin_lock(&exp->exp_obd->obd_osfs_lock);
                 if (oa)
                         filter_grant_incoming(exp, oa);
                 spin_unlock(&exp->exp_obd->obd_osfs_lock);
-        default: ;
+                pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
+                filter_free_iobuf(iobuf);
+                break;
+        default:;
+        
         }
-        pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);
-        return rc;
+        RETURN(rc);
 }
 
 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
@@ -742,24 +686,14 @@ static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
                                 int niocount, struct niobuf_local *res,
                                 struct obd_trans_info *oti, int rc)
 {
-        struct obd_ioobj *o;
-        struct niobuf_local *lnb;
-        int i, j;
         struct inode *inode = NULL;
         ENTRY;
 
         if (res->dentry != NULL)
                 inode = res->dentry->d_inode;
 
-        for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
-                for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
-                        if (lnb->page == NULL)
-                                continue;
-                        filter_release_read_page(&exp->exp_obd->u.filter,
-                                                 inode, lnb->page);
-                }
-        }
-
+        filter_free_dio_pages(objcount, obj, niocount, res);
+        
         if (res->dentry != NULL)
                 f_dput(res->dentry);
         RETURN(rc);
@@ -992,7 +926,7 @@ int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
                 GOTO(out, ret = -ENOMEM);
 
         for (i = 0; i < oa_bufs; i++) {
-                rnb[i].offset = pga[i].off;
+                rnb[i].offset = pga[i].disk_offset;
                 rnb[i].len = pga[i].count;
         }
 
@@ -1004,9 +938,16 @@ int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
                 GOTO(out, ret);
 
         for (i = 0; i < oa_bufs; i++) {
-                void *virt = kmap(pga[i].pg);
-                obd_off off = pga[i].off & ~PAGE_MASK;
-                void *addr = kmap(lnb[i].page);
+                void *virt;
+                obd_off off;
+                void *addr;
+
+                if (lnb[i].page == NULL)
+                        break;
+
+                off = pga[i].disk_offset & ~PAGE_MASK;
+                virt = kmap(pga[i].pg);
+                addr = kmap(lnb[i].page);
 
                 /* 2 kmaps == vanishingly small deadlock opportunity */