Whamcloud - gitweb
b=1141
authorpschwan <pschwan>
Mon, 7 Jul 2003 05:55:29 +0000 (05:55 +0000)
committerpschwan <pschwan>
Mon, 7 Jul 2003 05:55:29 +0000 (05:55 +0000)
This patch breaks the previous single brw disk transaction into two:
one opened and closed in each of preprw and commitrw.  It also avoids
holding the i_sem across the brw.  No stunning results yet, but I expect
that our behaviour in the face of client bulk timeout is substantially
improved.  Some more analysis will be required before we turn this
into a performance win, I think.

lustre/obdfilter/filter_io.c

index 8c3ed83..4ac5ac1 100644 (file)
@@ -97,42 +97,6 @@ err_page:
         return lnb->rc;
 }
 
-static struct page *lustre_get_page_write(struct inode *inode,
-                                          unsigned long index)
-{
-        struct address_space *mapping = inode->i_mapping;
-        struct page *page;
-        int rc;
-
-        page = grab_cache_page(mapping, index); /* locked page */
-
-        if (!IS_ERR(page)) {
-                /* Note: Called with "O" and "PAGE_SIZE" this is essentially
-                 * a no-op for most filesystems, because we write the whole
-                 * page.  For partial-page I/O this will read in the page.
-                 */
-                rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE);
-                if (rc) {
-                        CERROR("page index %lu, rc = %d\n", index, rc);
-                        if (rc != -ENOSPC)
-                                LBUG();
-                        GOTO(err_unlock, rc);
-                }
-                /* XXX not sure if we need this if we are overwriting page */
-                if (PageError(page)) {
-                        CERROR("error on page index %lu, rc = %d\n", index, rc);
-                        LBUG();
-                        GOTO(err_unlock, rc = -EIO);
-                }
-        }
-        return page;
-
-err_unlock:
-        unlock_page(page);
-        page_cache_release(page);
-        return ERR_PTR(rc);
-}
-
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
 int wait_on_page_locked(struct page *page)
 {
@@ -154,81 +118,6 @@ static inline void inode_update_time(struct inode *inode, int ctime_too)
 }
 #endif
 
-static int lustre_commit_write(struct niobuf_local *lnb)
-{
-        struct page *page = lnb->page;
-        unsigned from = lnb->offset & ~PAGE_MASK;
-        unsigned to = from + lnb->len;
-        struct inode *inode = page->mapping->host;
-        int err;
-
-        LASSERT(to <= PAGE_SIZE);
-        err = page->mapping->a_ops->commit_write(NULL, page, from, to);
-        if (!err && IS_SYNC(inode))
-                err = wait_on_page_locked(page);
-        //SetPageUptodate(page); // the client commit_write will do this
-
-        SetPageReferenced(page);
-        unlock_page(page);
-        page_cache_release(page);
-        return err;
-}
-
-int filter_get_page_write(struct inode *inode, struct niobuf_local *lnb,
-                          int *pglocked)
-{
-        unsigned long index = lnb->offset >> PAGE_SHIFT;
-        struct address_space *mapping = inode->i_mapping;
-        struct page *page;
-        int rc;
-
-        //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
-        if (*pglocked)
-                page = grab_cache_page_nowait(mapping, index); /* locked page */
-        else
-                page = grab_cache_page(mapping, index); /* locked page */
-
-
-        /* This page is currently locked, so get a temporary page instead. */
-        if (page == NULL) {
-                CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
-                page = alloc_pages(GFP_KERNEL, 0); /* locked page */
-                if (page == NULL) {
-                        CERROR("no memory for a temp page\n");
-                        GOTO(err, rc = -ENOMEM);
-                }
-                page->index = index;
-                lnb->page = page;
-                lnb->flags |= N_LOCAL_TEMP_PAGE;
-        } else if (!IS_ERR(page)) {
-                (*pglocked)++;
-
-                rc = mapping->a_ops->prepare_write(NULL, page,
-                                                   lnb->offset & ~PAGE_MASK,
-                                                   lnb->len);
-                if (rc) {
-                        if (rc != -ENOSPC)
-                                CERROR("page index %lu, rc = %d\n", index, rc);
-                        GOTO(err_unlock, rc);
-                }
-                /* XXX not sure if we need this if we are overwriting page */
-                if (PageError(page)) {
-                        CERROR("error on page index %lu, rc = %d\n", index, rc);
-                        LBUG();
-                        GOTO(err_unlock, rc = -EIO);
-                }
-                lnb->page = page;
-        }
-
-        return 0;
-
-err_unlock:
-        unlock_page(page);
-        page_cache_release(page);
-err:
-        return lnb->rc = rc;
-}
-
 static int filter_preprw_read(struct obd_export *exp, struct obdo *obdo,
                               int objcount, struct obd_ioobj *obj,
                               int niocount, struct niobuf_remote *nb,
@@ -239,7 +128,7 @@ static int filter_preprw_read(struct obd_export *exp, struct obdo *obdo,
         struct obd_device *obd;
         struct obd_ioobj *o;
         struct niobuf_remote *rnb;
-        struct niobuf_local *lnb;
+        struct niobuf_local *lnb = NULL;
         struct fsfilt_objinfo *fso;
         struct dentry *dentry;
         struct inode *inode;
@@ -264,13 +153,13 @@ static int filter_preprw_read(struct obd_export *exp, struct obdo *obdo,
 
                 dentry = filter_fid2dentry(obd, NULL, o->ioo_type, o->ioo_id);
                 if (IS_ERR(dentry))
-                        GOTO(out_objinfo, rc = PTR_ERR(dentry));
+                        GOTO(cleanup, rc = PTR_ERR(dentry));
 
                 if (dentry->d_inode == NULL) {
                         CERROR("trying to BRW to non-existent file "LPU64"\n",
                                o->ioo_id);
                         f_dput(dentry);
-                        GOTO(out_objinfo, rc = -ENOENT);
+                        GOTO(cleanup, rc = -ENOENT);
                 }
 
                 fso[i].fso_dentry = dentry;
@@ -283,18 +172,17 @@ static int filter_preprw_read(struct obd_export *exp, struct obdo *obdo,
         }
 
         if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
+                CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n",
+                       (jiffies - now));
 
         for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
                 dentry = fso[i].fso_dentry;
                 inode = dentry->d_inode;
 
                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
-                        if (j == 0)
-                                lnb->dentry = dentry;
-                        else
-                                lnb->dentry = dget(dentry);
-
+                        lnb->dentry = dget(dentry);
                         lnb->offset = rnb->offset;
                         lnb->len    = rnb->len;
                         lnb->flags  = rnb->flags;
@@ -317,7 +205,8 @@ static int filter_preprw_read(struct obd_export *exp, struct obdo *obdo,
                                        lnb->len, lnb->offset, j, o->ioo_bufcnt,
                                        dentry, rc);
                                 f_dput(dentry);
-                                GOTO(out_pages, rc);
+                                cleanup_phase = 1;
+                                GOTO(cleanup, rc);
                         }
 
                         tot_bytes += lnb->rc;
@@ -327,7 +216,10 @@ static int filter_preprw_read(struct obd_export *exp, struct obdo *obdo,
         }
 
         if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
+                CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "start_page_read: %lu jiffies\n",
+                       (jiffies - now));
 
         lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
         while (lnb-- > res) {
@@ -336,32 +228,106 @@ static int filter_preprw_read(struct obd_export *exp, struct obdo *obdo,
                         CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len,
                                lnb->offset, (int)(lnb - res), lnb->dentry, rc);
                         f_dput(lnb->dentry);
-                        GOTO(out_pages, rc);
+                        cleanup_phase = 1;
+                        GOTO(cleanup, rc);
                 }
         }
 
         if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ);
+                CERROR("slow finish_page_read %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "finish_page_read: %lu jiffies\n",
+                       (jiffies - now));
 
         EXIT;
-out:
-        OBD_FREE(fso, objcount * sizeof(*fso));
-        /* we saved the journal handle into oti->oti_handle instead */
-        current->journal_info = NULL;
-        pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+ cleanup:
+        switch (cleanup_phase) {
+        case 1:
+                while (lnb-- > res) {
+                        page_cache_release(lnb->page);
+                        f_dput(lnb->dentry);
+                }
+        case 0:
+                for (i = 0; i < objcount; i++) {
+                        if (fso[i].fso_dentry == NULL)
+                                break;
+                        f_dput(fso[i].fso_dentry);
+                }
+                OBD_FREE(fso, objcount * sizeof(*fso));
+                pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+        }
         return rc;
+}
 
-out_pages:
-        while (lnb-- > res) {
-                page_cache_release(lnb->page);
-                f_dput(lnb->dentry);
+static int internal_start_write(struct page *page, struct niobuf_local *lnb)
+{
+        unsigned long index = lnb->offset >> PAGE_SHIFT;
+        struct address_space *mapping = page->mapping;
+        int rc;
+
+        /* Note: Called with "O" and "PAGE_SIZE" this is essentially a no-op for
+         * most filesystems, because we write the whole page.  For partial-page
+         * I/O this will read in the page. */
+        rc = mapping->a_ops->prepare_write(NULL, page, lnb->offset & ~PAGE_MASK,
+                                           lnb->len);
+        if (rc) {
+                CERROR("page index %lu, rc = %d\n", index, rc);
+                if (rc != -ENOSPC)
+                        LBUG();
+                GOTO(err_unlock, rc);
+        }
+        /* XXX not sure if we need this if we are overwriting page */
+        if (PageError(page)) {
+                CERROR("error on page index %lu, rc = %d\n", index, rc);
+                LBUG();
+                GOTO(err_unlock, rc = -EIO);
         }
-        goto out; /* dropped the dentry refs already (one per page) */
+        return rc;
+
+err_unlock:
+        unlock_page(page);
+        page_cache_release(page);
+        return rc;
+}
+
+static int filter_start_page_write(struct inode *inode,
+                                   struct niobuf_local *lnb, int *pglocked)
+{
+        unsigned long index = lnb->offset >> PAGE_SHIFT;
+        struct address_space *mapping = inode->i_mapping;
+        struct page *page;
+        int rc;
+        ENTRY;
+
+        //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
+        if (*pglocked)
+                page = grab_cache_page_nowait(mapping, index); /* locked page */
+        else
+                page = grab_cache_page(mapping, index); /* locked page */
+
+        /* This page is currently locked, so get a temporary page instead. */
+        if (page == NULL) {
+                CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
+                page = alloc_pages(GFP_KERNEL, 0); /* locked page */
+                if (page == NULL) {
+                        CERROR("no memory for a temp page\n");
+                        RETURN(lnb->rc = -ENOMEM);
+                }
+                page->index = index;
+                lnb->page = page;
+                lnb->flags |= N_LOCAL_TEMP_PAGE;
+        } else if (!IS_ERR(page)) {
+                (*pglocked)++;
 
-out_objinfo:
-        for (i = 0; i < objcount && fso[i].fso_dentry; i++)
-                f_dput(fso[i].fso_dentry);
-        goto out;
+                rc = internal_start_write(page, lnb);
+                if (rc) {
+                        if (rc != -ENOSPC)
+                                CERROR("page index %lu, rc = %d\n", index, rc);
+                        RETURN(lnb->rc = rc);
+                }
+                lnb->page = page;
+        }
+        RETURN(0);
 }
 
 /* We need to balance prepare_write() calls with commit_write() calls.
@@ -375,6 +341,12 @@ out_objinfo:
  *     were newly allocated from disk. We use the BH_New flag similarly. */
 static int filter_commit_write(struct niobuf_local *lnb, int err)
 {
+        struct page *page = lnb->page;
+        unsigned from = lnb->offset & ~PAGE_MASK;
+        unsigned to = from + lnb->len;
+        struct inode *inode = page->mapping->host;
+        int rc;
+
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
         if (err) {
                 unsigned block_start, block_end;
@@ -398,7 +370,87 @@ static int filter_commit_write(struct niobuf_local *lnb, int err)
                 }
         }
 #endif
-        return lustre_commit_write(lnb);
+
+        LASSERT(to <= PAGE_SIZE);
+        rc = page->mapping->a_ops->commit_write(NULL, page, from, to);
+        if (!rc && IS_SYNC(inode))
+                rc = wait_on_page_locked(page);
+        //SetPageUptodate(page); // the client commit_write will do this
+
+        SetPageReferenced(page);
+        return rc;
+}
+
+/* It is highly unlikely that we would ever get an error here.  The page we want
+ * to get was previously locked, so it had to have already allocated the space,
+ * and we were just writing over the same data, so there would be no hole in the
+ * file.
+ *
+ * XXX: possibility of a race with truncate could exist, need to check that.
+ *      There are no guarantees w.r.t. write order even on a local filesystem,
+ *      although the normal response would be to return the number of bytes
+ *      successfully written and leave the rest to the app. */
+static int filter_write_locked_page(struct niobuf_local *lnb)
+{
+        struct address_space *mapping = lnb->dentry->d_inode->i_mapping;
+        struct page *lpage;
+        void *lpage_addr, *lnb_addr;
+        int rc;
+        ENTRY;
+
+        lpage = grab_cache_page(mapping, lnb->page->index); /* locked page */
+        if (IS_ERR(lpage)) {
+                rc = PTR_ERR(lpage);
+                CERROR("error getting locked page index %ld: rc = %d\n",
+                       lnb->page->index, rc);
+                LBUG();
+                //filter_commit_write(lnb, 0);
+                RETURN(rc);
+        }
+
+        rc = internal_start_write(lpage, lnb);
+        if (rc) {
+                CERROR("error getting locked page index %ld: rc = %d\n",
+                       lnb->page->index, rc);
+                LBUG();
+                //filter_commit_write(lnb, 0);
+                RETURN(rc);
+        }
+
+        /* 2 kmaps == vanishingly small deadlock opportunity */
+        lpage_addr = kmap(lpage);
+        lnb_addr = kmap(lnb->page);
+
+        memcpy(lpage_addr, lnb_addr, PAGE_SIZE);
+
+        kunmap(lnb->page);
+        kunmap(lpage);
+
+        page_cache_release(lnb->page);
+
+        lnb->page = lpage;
+        rc = filter_commit_write(lnb, 0);
+        if (rc)
+                CERROR("error committing locked page %ld: rc = %d\n",
+                       lnb->page->index, rc);
+        RETURN(rc);
+}
+
+static int filter_commit_page(int cmd, struct niobuf_local *lnb)
+{
+        int rc;
+        if (cmd & OBD_BRW_READ) {
+                page_cache_release(lnb->page);
+                return 0;
+        }
+
+        if (lnb->flags & N_LOCAL_TEMP_PAGE)
+                rc = filter_write_locked_page(lnb);
+        else
+                rc = filter_commit_write(lnb, 0);
+        unlock_page(lnb->page);
+        page_cache_release(lnb->page);
+        return rc;
 }
 
 /* If we ever start to support multi-object BRW RPCs, we will need to get locks
@@ -421,10 +473,11 @@ static int filter_preprw_write(struct obd_export *exp, struct obdo *obdo,
         struct obd_device *obd;
         struct obd_ioobj *o;
         struct niobuf_remote *rnb;
-        struct niobuf_local *lnb;
+        struct niobuf_local *lnb = NULL;
         struct fsfilt_objinfo *fso;
         struct dentry *dentry;
         int pglocked = 0, rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0;
+        int nested_trans = current->journal_info != NULL;
         unsigned long now = jiffies;
         ENTRY;
         LASSERT(objcount == 1);
@@ -445,13 +498,13 @@ static int filter_preprw_write(struct obd_export *exp, struct obdo *obdo,
 
                 dentry = filter_fid2dentry(obd, NULL, o->ioo_type, o->ioo_id);
                 if (IS_ERR(dentry))
-                        GOTO(out_objinfo, rc = PTR_ERR(dentry));
+                        GOTO(cleanup, rc = PTR_ERR(dentry));
 
                 if (dentry->d_inode == NULL) {
                         CERROR("trying to BRW to non-existent file "LPU64"\n",
                                o->ioo_id);
                         f_dput(dentry);
-                        GOTO(out_objinfo, rc = -ENOENT);
+                        GOTO(cleanup, rc = -ENOENT);
                 }
 
                 fso[i].fso_dentry = dentry;
@@ -465,7 +518,10 @@ static int filter_preprw_write(struct obd_export *exp, struct obdo *obdo,
         }
 
         if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
+                CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
+                       (jiffies - now));
 
         LASSERT(oti != NULL);
         oti->oti_handle = fsfilt_brw_start(obd, objcount, fso, niocount, oti);
@@ -474,69 +530,77 @@ static int filter_preprw_write(struct obd_export *exp, struct obdo *obdo,
                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
                        "error starting transaction: rc = %d\n", rc);
                 oti->oti_handle = NULL;
-                GOTO(out_objinfo, rc);
+                GOTO(cleanup, rc);
         }
 
         for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
                 dentry = fso[i].fso_dentry;
                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
-                        if (j == 0)
-                                lnb->dentry = dentry;
-                        else
-                                lnb->dentry = dget(dentry);
-
+                        lnb->dentry = dget(dentry);
                         lnb->offset = rnb->offset;
                         lnb->len    = rnb->len;
                         lnb->flags  = rnb->flags;
                         lnb->start  = jiffies;
 
-                        rc = filter_get_page_write(dentry->d_inode, lnb,
-                                                   &pglocked);
-                        if (rc)
-                                up(&dentry->d_inode->i_sem);
-
+                        rc = filter_start_page_write(dentry->d_inode, lnb,
+                                                     &pglocked);
                         if (rc) {
                                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
                                        "page err %u@"LPU64" %u/%u %p: rc %d\n",
                                        lnb->len, lnb->offset, j, o->ioo_bufcnt,
                                        dentry, rc);
-                                f_dput(dentry);
-                                GOTO(out_pages, rc);
+                                f_dput(fso[i].fso_dentry);
+                                cleanup_phase = 2;
+                                GOTO(cleanup, rc);
                         }
+                }
+        }
+
+        if (time_after(jiffies, now + 15 * HZ))
+                CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
+                       (jiffies - now));
+
+        for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
+                for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
+                        fsfilt_commit(obd, NULL, oti->oti_handle, 0);
                         tot_bytes += lnb->len;
                 }
         }
 
         if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
+                CERROR("slow prep write commit %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "fsfilt_commit setup: %lu jiffies\n",
+                       (jiffies - now));
 
         lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,tot_bytes);
 
+        cleanup_phase = 1;
         EXIT;
-out:
-        OBD_FREE(fso, objcount * sizeof(*fso));
-        /* we saved the journal handle into oti->oti_handle instead */
-        current->journal_info = NULL;
-        pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
-        return rc;
-
-out_pages:
-        while (lnb-- > res) {
-                filter_commit_write(lnb, rc);
-                up(&lnb->dentry->d_inode->i_sem);
-                f_dput(lnb->dentry);
-        }
-        filter_finish_transno(exp, oti, rc);
-        fsfilt_commit(obd, filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode,
-                      oti->oti_handle, 0);
-        goto out; /* dropped the dentry refs already (one per page) */
-
-out_objinfo:
-        for (i = 0; i < objcount && fso[i].fso_dentry; i++) {
-                up(&fso[i].fso_dentry->d_inode->i_sem);
-                f_dput(fso[i].fso_dentry);
+ cleanup:
+        switch (cleanup_phase) {
+        case 2:
+                while (lnb-- > res) {
+                        filter_commit_page(OBD_BRW_WRITE, lnb);
+                        f_dput(lnb->dentry);
+                }
+        case 1:
+                fsfilt_commit(obd, NULL, oti->oti_handle, 0);
+                oti->oti_handle = NULL;
+        case 0:
+                for (i = 0; i < objcount; i++) {
+                        if (fso[i].fso_dentry == NULL)
+                                break;
+                        up(&fso[i].fso_dentry->d_inode->i_sem);
+                        f_dput(fso[i].fso_dentry);
+                }
+                OBD_FREE(fso, objcount * sizeof(*fso));
+                pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+                LASSERT(nested_trans || current->journal_info == NULL);
         }
-        goto out;
+        return rc;
 }
 
 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *obdo,
@@ -552,51 +616,7 @@ int filter_preprw(int cmd, struct obd_export *exp, struct obdo *obdo,
                                           nb, res, oti);
         else
                 LBUG();
-}
-
-/* It is highly unlikely that we would ever get an error here.  The page we want
- * to get was previously locked, so it had to have already allocated the space,
- * and we were just writing over the same data, so there would be no hole in the
- * file.
- *
- * XXX: possibility of a race with truncate could exist, need to check that.
- *      There are no guarantees w.r.t. write order even on a local filesystem,
- *      although the normal response would be to return the number of bytes
- *      successfully written and leave the rest to the app. */
-static int filter_write_locked_page(struct niobuf_local *lnb)
-{
-        struct page *lpage;
-        void *lpage_addr, *lnb_addr;
-        int rc;
-        ENTRY;
-
-        lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
-        if (IS_ERR(lpage)) {
-                rc = PTR_ERR(lpage);
-                CERROR("error getting locked page index %ld: rc = %d\n",
-                       lnb->page->index, rc);
-                LBUG();
-                lustre_commit_write(lnb);
-                RETURN(rc);
-        }
-
-        /* 2 kmaps == vanishingly small deadlock opportunity */
-        lpage_addr = kmap(lpage);
-        lnb_addr = kmap(lnb->page);
-
-        memcpy(lpage_addr, lnb_addr, PAGE_SIZE);
-
-        kunmap(lnb->page);
-        kunmap(lpage);
-
-        page_cache_release(lnb->page);
-
-        lnb->page = lpage;
-        rc = lustre_commit_write(lnb);
-        if (rc)
-                CERROR("error committing locked page %ld: rc = %d\n",
-                       lnb->page->index, rc);
-        RETURN(rc);
+        return 0;
 }
 
 int filter_commitrw(int cmd, struct obd_export *exp, int objcount,
@@ -607,81 +627,67 @@ int filter_commitrw(int cmd, struct obd_export *exp, int objcount,
         struct obd_ioobj *o;
         struct niobuf_local *lnb;
         struct obd_device *obd = exp->exp_obd;
-        int found_locked = 0, rc = 0, i;
-        int nested_trans = current->journal_info != NULL;
+        struct fsfilt_objinfo *fso = NULL;
+        int rc = 0, i, j, nested_trans = current->journal_info != NULL;
+        int cleanup_phase = 0;
         unsigned long now = jiffies;  /* DEBUGGING OST TIMEOUTS */
         ENTRY;
 
         push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+        if (cmd & OBD_BRW_READ)
+                goto shared;
 
-        if (cmd & OBD_BRW_WRITE) {
-                LASSERT(oti);
-                LASSERT(current->journal_info == NULL ||
-                        current->journal_info == oti->oti_handle);
-                current->journal_info = oti->oti_handle;
-        }
+        LASSERT(oti != NULL);
+        LASSERT(current->journal_info == NULL ||
+                current->journal_info == oti->oti_handle);
+        current->journal_info = oti->oti_handle;
+        OBD_ALLOC(fso, objcount * sizeof(*fso));
+        if (fso == NULL)
+                GOTO(cleanup, rc = -ENOMEM);
 
+        cleanup_phase = 1;
         for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
-                int j;
+                fso[i].fso_dentry = lnb->dentry;
+                fso[i].fso_bufcnt = o->ioo_bufcnt;
+                down(&lnb->dentry->d_inode->i_sem);
+        }
+
+        oti->oti_handle = fsfilt_brw_start(obd, objcount, fso, niocount, oti);
+        if (IS_ERR(oti->oti_handle)) {
+                rc = PTR_ERR(oti->oti_handle);
+                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
+                       "error starting transaction: rc = %d\n", rc);
+                oti->oti_handle = NULL;
+                cleanup_phase = 2;
+                GOTO(cleanup, rc);
+        }
+
+        for (i = 0, o = obj, lnb = res; i < objcount; i++, o++)
+                for (j = 0; j < o->ioo_bufcnt; j++, lnb++)
+                        fsfilt_brw_start(obd, objcount, fso, niocount, oti);
 
+ shared:
+        for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
                 if (cmd & OBD_BRW_WRITE) {
                         inode_update_time(lnb->dentry->d_inode, 1);
                         up(&lnb->dentry->d_inode->i_sem);
                 }
                 for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
-                        if (lnb->page == NULL) {
+                        if (lnb->page == NULL)
                                 continue;
-                        }
-
-                        if (lnb->flags & N_LOCAL_TEMP_PAGE) {
-                                found_locked++;
-                                continue;
-                        }
 
                         if (time_after(jiffies, lnb->start + 15 * HZ))
                                 CERROR("slow commitrw %lus\n",
                                        (jiffies - lnb->start) / HZ);
 
-                        if (cmd & OBD_BRW_WRITE) {
-                                int err = filter_commit_write(lnb, 0);
+                        filter_commit_page(cmd, lnb);
 
-                                if (!rc)
-                                        rc = err;
-                        } else {
-                                page_cache_release(lnb->page);
-                        }
-
-                        f_dput(lnb->dentry);
                         if (time_after(jiffies, lnb->start + 15 * HZ))
                                 CERROR("slow commit_write %lus\n",
                                        (jiffies - lnb->start) / HZ);
                 }
         }
 
-        for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
-             i++, o++) {
-                int j;
-                for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
-                        int err;
-                        if (!(lnb->flags & N_LOCAL_TEMP_PAGE))
-                                continue;
-
-                        if (time_after(jiffies, lnb->start + 15 * HZ))
-                                CERROR("slow commitrw locked %lus\n",
-                                       (jiffies - lnb->start) / HZ);
-
-                        err = filter_write_locked_page(lnb);
-                        if (!rc)
-                                rc = err;
-                        f_dput(lnb->dentry);
-                        found_locked--;
-
-                        if (time_after(jiffies, lnb->start + 15 * HZ))
-                                CERROR("slow commit_write locked %lus\n",
-                                       (jiffies - lnb->start) / HZ);
-                }
-        }
-
         if (cmd & OBD_BRW_WRITE) {
                 /* We just want any dentry for the commit, for now */
                 struct dentry *dparent = filter_parent(obd, S_IFREG, 0);
@@ -698,6 +704,19 @@ int filter_commitrw(int cmd, struct obd_export *exp, int objcount,
                         CERROR("slow commitrw commit %lus\n", (jiffies-now)/HZ);
         }
 
+ cleanup:
+        switch (cleanup_phase) {
+        case 2:
+                for (i = 0, lnb = res; i < objcount; i++)
+                        up(&lnb->dentry->d_inode->i_sem);
+        case 1:
+                if (fso != NULL)
+                        OBD_FREE(fso, objcount * sizeof(*fso));
+        case 0:
+                for (i = 0, o = obj, lnb = res; i < objcount; i++, o++)
+                        for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++)
+                                f_dput(lnb->dentry);
+        }
         LASSERT(nested_trans || current->journal_info == NULL);
         pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
         RETURN(rc);