Whamcloud - gitweb
b: 2226
[fs/lustre-release.git] / lustre / obdfilter / filter_io.c
index c13b986..1ce8825 100644 (file)
@@ -43,8 +43,8 @@ static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
         int rc;
 
         page = grab_cache_page(mapping, index); /* locked page */
-        if (IS_ERR(page))
-                return lnb->rc = PTR_ERR(page);
+        if (page == NULL)
+                return lnb->rc = -ENOMEM;
 
         LASSERT(page->mapping == mapping);
 
@@ -99,138 +99,6 @@ err_page:
         return lnb->rc;
 }
 
-static struct page *lustre_get_page_write(struct inode *inode,
-                                          unsigned long index)
-{
-        struct address_space *mapping = inode->i_mapping;
-        struct page *page;
-        int rc;
-
-        page = grab_cache_page(mapping, index); /* locked page */
-
-        if (!IS_ERR(page)) {
-                /* Note: Called with "O" and "PAGE_SIZE" this is essentially
-                 * a no-op for most filesystems, because we write the whole
-                 * page.  For partial-page I/O this will read in the page.
-                 */
-                rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE);
-                if (rc) {
-                        CERROR("page index %lu, rc = %d\n", index, rc);
-                        if (rc != -ENOSPC)
-                                LBUG();
-                        GOTO(err_unlock, rc);
-                }
-                /* XXX not sure if we need this if we are overwriting page */
-                if (PageError(page)) {
-                        CERROR("error on page index %lu, rc = %d\n", index, rc);
-                        LBUG();
-                        GOTO(err_unlock, rc = -EIO);
-                }
-        }
-        return page;
-
-err_unlock:
-        unlock_page(page);
-        page_cache_release(page);
-        return ERR_PTR(rc);
-}
-
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-int wait_on_page_locked(struct page *page)
-{
-        waitfor_one_page(page);
-        return 0;
-}
-
-/* We should only change the file mtime (and not the ctime, like
- * update_inode_times() in generic_file_write()) when we only change data. */
-static inline void inode_update_time(struct inode *inode, int ctime_too)
-{
-        time_t now = CURRENT_TIME;
-        if (inode->i_mtime == now && (!ctime_too || inode->i_ctime == now))
-                return;
-        inode->i_mtime = now;
-        if (ctime_too)
-                inode->i_ctime = now;
-        mark_inode_dirty_sync(inode);
-}
-#endif
-
-static int lustre_commit_write(struct niobuf_local *lnb)
-{
-        struct page *page = lnb->page;
-        unsigned from = lnb->offset & ~PAGE_MASK;
-        unsigned to = from + lnb->len;
-        struct inode *inode = page->mapping->host;
-        int err;
-
-        LASSERT(to <= PAGE_SIZE);
-        err = page->mapping->a_ops->commit_write(NULL, page, from, to);
-#warning 2.4 folks: wait_on_page_locked does NOT return its error here.
-        if (!err && IS_SYNC(inode))
-                wait_on_page_locked(page);
-        //SetPageUptodate(page); // the client commit_write will do this
-
-        SetPageReferenced(page);
-        unlock_page(page);
-        page_cache_release(page);
-        return err;
-}
-
-int filter_get_page_write(struct inode *inode, struct niobuf_local *lnb,
-                          int *pglocked)
-{
-        unsigned long index = lnb->offset >> PAGE_SHIFT;
-        struct address_space *mapping = inode->i_mapping;
-        struct page *page;
-        int rc;
-
-        //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
-        if (*pglocked)
-                page = grab_cache_page_nowait(mapping, index); /* locked page */
-        else
-                page = grab_cache_page(mapping, index); /* locked page */
-
-
-        /* This page is currently locked, so get a temporary page instead. */
-        if (page == NULL) {
-                CDEBUG(D_INFO, "ino %lu page %ld locked\n", inode->i_ino,index);
-                page = alloc_pages(GFP_KERNEL, 0); /* locked page */
-                if (page == NULL) {
-                        CERROR("no memory for a temp page\n");
-                        GOTO(err, rc = -ENOMEM);
-                }
-                page->index = index;
-                lnb->page = page;
-                lnb->flags |= N_LOCAL_TEMP_PAGE;
-        } else if (!IS_ERR(page)) {
-                unsigned from = lnb->offset & ~PAGE_MASK, to = from + lnb->len;
-                (*pglocked)++;
-
-                rc = mapping->a_ops->prepare_write(NULL, page, from, to);
-                if (rc) {
-                        if (rc != -ENOSPC)
-                                CERROR("page index %lu, rc = %d\n", index, rc);
-                        GOTO(err_unlock, rc);
-                }
-                /* XXX not sure if we need this if we are overwriting page */
-                if (PageError(page)) {
-                        CERROR("error on page index %lu, rc = %d\n", index, rc);
-                        LBUG();
-                        GOTO(err_unlock, rc = -EIO);
-                }
-                lnb->page = page;
-        }
-
-        return 0;
-
-err_unlock:
-        unlock_page(page);
-        page_cache_release(page);
-err:
-        return lnb->rc = rc;
-}
-
 static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                               int objcount, struct obd_ioobj *obj,
                               int niocount, struct niobuf_remote *nb,
@@ -240,7 +108,7 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         struct obd_run_ctxt saved;
         struct obd_ioobj *o;
         struct niobuf_remote *rnb;
-        struct niobuf_local *lnb;
+        struct niobuf_local *lnb = NULL;
         struct fsfilt_objinfo *fso;
         struct dentry *dentry;
         struct inode *inode;
@@ -258,9 +126,8 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
 
         memset(res, 0, niocount * sizeof(*res));
 
-        push_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
+        push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
         for (i = 0, o = obj; i < objcount; i++, o++) {
-                struct filter_dentry_data *fdd;
                 LASSERT(o->ioo_bufcnt);
 
                 dentry = filter_oa2dentry(exp->exp_obd, oa);
@@ -276,15 +143,13 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
 
                 fso[i].fso_dentry = dentry;
                 fso[i].fso_bufcnt = o->ioo_bufcnt;
-
-                fdd = dentry->d_fsdata;
-                if (fdd == NULL || !atomic_read(&fdd->fdd_open_count))
-                        CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
-                               o->ioo_id);
         }
 
         if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
+                CERROR("slow preprw_read setup %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "preprw_read setup: %lu jiffies\n",
+                       (jiffies - now));
 
         for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
                 dentry = fso[i].fso_dentry;
@@ -325,7 +190,10 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         }
 
         if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
+                CERROR("slow start_page_read %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "start_page_read: %lu jiffies\n",
+                       (jiffies - now));
 
         lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_READ_BYTES,
                             tot_bytes);
@@ -340,7 +208,10 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
         }
 
         if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ);
+                CERROR("slow finish_page_read %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "finish_page_read: %lu jiffies\n",
+                       (jiffies - now));
 
         EXIT;
 
@@ -355,49 +226,26 @@ static int filter_preprw_read(int cmd, struct obd_export *exp, struct obdo *oa,
                         f_dput(res->dentry);
                 else
                         CERROR("NULL dentry in cleanup -- tell CFS\n");
-                res->dentry = NULL;
         case 0:
                 OBD_FREE(fso, objcount * sizeof(*fso));
-                pop_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
+                pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
         }
         return rc;
 }
 
-/* We need to balance prepare_write() calls with commit_write() calls.
- * If the page has been prepared, but we have no data for it, we don't
- * want to overwrite valid data on disk, but we still need to zero out
- * data for space which was newly allocated.  Like part of what happens
- * in __block_prepare_write() for newly allocated blocks.
- *
- * XXX currently __block_prepare_write() creates buffers for all the
- *     pages, and the filesystems mark these buffers as BH_New if they
- *     were newly allocated from disk. We use the BH_New flag similarly. */
-static int filter_commit_write(struct niobuf_local *lnb, int err)
+static int filter_start_page_write(struct inode *inode,
+                                   struct niobuf_local *lnb)
 {
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
-        if (err) {
-                unsigned block_start, block_end;
-                struct buffer_head *bh, *head = lnb->page->buffers;
-                unsigned blocksize = head->b_size;
-
-                /* debugging: just seeing if this ever happens */
-                CDEBUG(err == -ENOSPC ? D_INODE : D_ERROR,
-                       "called for ino %lu:%lu on err %d\n",
-                       lnb->page->mapping->host->i_ino, lnb->page->index, err);
-
-                /* Currently one buffer per page, but in the future... */
-                for (bh = head, block_start = 0; bh != head || !block_start;
-                     block_start = block_end, bh = bh->b_this_page) {
-                        block_end = block_start + blocksize;
-                        if (buffer_new(bh)) {
-                                memset(kmap(lnb->page) + block_start, 0,
-                                       blocksize);
-                                kunmap(lnb->page);
-                        }
-                }
+        struct page *page = alloc_pages(GFP_HIGHUSER, 0);
+        if (page == NULL) {
+                CERROR("no memory for a temp page\n");
+                RETURN(lnb->rc = -ENOMEM);
         }
-#endif
-        return lustre_commit_write(lnb);
+        POISON_PAGE(page, 0xf1);
+        page->index = lnb->offset >> PAGE_SHIFT;
+        lnb->page = page;
+
+        return 0;
 }
 
 /* If we ever start to support multi-object BRW RPCs, we will need to get locks
@@ -417,124 +265,73 @@ static int filter_preprw_write(int cmd, struct obd_export *exp, struct obdo *oa,
                                struct obd_trans_info *oti)
 {
         struct obd_run_ctxt saved;
-        struct obd_ioobj *o;
         struct niobuf_remote *rnb;
-        struct niobuf_local *lnb;
-        struct fsfilt_objinfo *fso;
+        struct niobuf_local *lnb = NULL;
+        struct fsfilt_objinfo fso;
         struct dentry *dentry;
-        int pglocked = 0, rc = 0, i, j, tot_bytes = 0;
+        int rc = 0, i, tot_bytes = 0;
         unsigned long now = jiffies;
         ENTRY;
         LASSERT(objcount == 1);
-
-        OBD_ALLOC(fso, objcount * sizeof(*fso));
-        if (fso == NULL)
-                RETURN(-ENOMEM);
+        LASSERT(obj->ioo_bufcnt > 0);
 
         memset(res, 0, niocount * sizeof(*res));
 
-        push_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
-        for (i = 0, o = obj; i < objcount; i++, o++) {
-                struct filter_dentry_data *fdd;
-                LASSERT(o->ioo_bufcnt);
-
-                dentry = filter_oa2dentry(exp->exp_obd, oa);
-                if (IS_ERR(dentry))
-                        GOTO(out_objinfo, rc = PTR_ERR(dentry));
-
-                if (dentry->d_inode == NULL) {
-                        CERROR("trying to BRW to non-existent file "LPU64"\n",
-                               o->ioo_id);
-                        f_dput(dentry);
-                        GOTO(out_objinfo, rc = -ENOENT);
-                }
-
-                fso[i].fso_dentry = dentry;
-                fso[i].fso_bufcnt = o->ioo_bufcnt;
-
-                down(&dentry->d_inode->i_sem);
-                fdd = dentry->d_fsdata;
-                if (fdd == NULL || !atomic_read(&fdd->fdd_open_count))
-                        CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
-                               o->ioo_id);
-        }
-
-        if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
-
-        LASSERT(oti != NULL);
-        oti->oti_handle = fsfilt_brw_start(exp->exp_obd, objcount, fso,
-                                           niocount, oti);
-        if (IS_ERR(oti->oti_handle)) {
-                rc = PTR_ERR(oti->oti_handle);
-                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
-                       "error starting transaction: rc = %d\n", rc);
-                oti->oti_handle = NULL;
-                GOTO(out_objinfo, rc);
+        push_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
+        dentry = filter_fid2dentry(exp->exp_obd, NULL, obj->ioo_gr, 
+                                   obj->ioo_id);
+        if (IS_ERR(dentry))
+                GOTO(cleanup, rc = PTR_ERR(dentry));
+
+        if (dentry->d_inode == NULL) {
+                CERROR("trying to BRW to non-existent file "LPU64"\n",
+                       obj->ioo_id);
+                f_dput(dentry);
+                GOTO(cleanup, rc = -ENOENT);
         }
 
-        for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
-                dentry = fso[i].fso_dentry;
-                for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
-                        if (j == 0)
-                                lnb->dentry = dentry;
-                        else
-                                lnb->dentry = dget(dentry);
-
-                        lnb->offset = rnb->offset;
-                        lnb->len    = rnb->len;
-                        lnb->flags  = rnb->flags;
-                        lnb->start  = jiffies;
-
-                        rc = filter_get_page_write(dentry->d_inode, lnb,
-                                                   &pglocked);
-                        if (rc)
-                                up(&dentry->d_inode->i_sem);
+        fso.fso_dentry = dentry;
+        fso.fso_bufcnt = obj->ioo_bufcnt;
 
-                        if (rc) {
-                                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
-                                       "page err %u@"LPU64" %u/%u %p: rc %d\n",
-                                       lnb->len, lnb->offset, j, o->ioo_bufcnt,
-                                       dentry, rc);
-                                f_dput(dentry);
-                                GOTO(out_pages, rc);
-                        }
-                        tot_bytes += lnb->len;
+        if (time_after(jiffies, now + 15 * HZ))
+                CERROR("slow preprw_write setup %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "preprw_write setup: %lu jiffies\n",
+                       (jiffies - now));
+
+        for (i = 0, rnb = nb, lnb = res; i < obj->ioo_bufcnt;
+             i++, lnb++, rnb++) {
+                lnb->dentry = dentry;
+                lnb->offset = rnb->offset;
+                lnb->len    = rnb->len;
+                lnb->flags  = rnb->flags;
+                lnb->start  = jiffies;
+
+                rc = filter_start_page_write(dentry->d_inode, lnb);
+                if (rc) {
+                        CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, "page err %u@"
+                               LPU64" %u/%u %p: rc %d\n", lnb->len, lnb->offset,
+                               i, obj->ioo_bufcnt, dentry, rc);
+                        while (lnb-- > res)
+                                __free_pages(lnb->page, 0);
+                        f_dput(dentry);
+                        GOTO(cleanup, rc);
                 }
+                tot_bytes += lnb->len;
         }
 
         if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
+                CERROR("slow start_page_write %lus\n", (jiffies - now) / HZ);
+        else
+                CDEBUG(D_INFO, "start_page_write: %lu jiffies\n",
+                       (jiffies - now));
 
         lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
                             tot_bytes);
-
         EXIT;
-out:
-        OBD_FREE(fso, objcount * sizeof(*fso));
-        /* we saved the journal handle into oti->oti_handle instead */
-        current->journal_info = NULL;
-        pop_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
+cleanup:
+        pop_ctxt(&saved, &exp->exp_obd->obd_ctxt, NULL);
         return rc;
-
-out_pages:
-        while (lnb-- > res) {
-                filter_commit_write(lnb, rc);
-                up(&lnb->dentry->d_inode->i_sem);
-                f_dput(lnb->dentry);
-        }
-        filter_finish_transno(exp, oti, rc);
-        fsfilt_commit(exp->exp_obd,
-                      filter_parent(exp->exp_obd,S_IFREG,obj->ioo_id)->d_inode,
-                      oti->oti_handle, 0);
-        goto out; /* dropped the dentry refs already (one per page) */
-
-out_objinfo:
-        for (i = 0; i < objcount && fso[i].fso_dentry; i++) {
-                up(&fso[i].fso_dentry->d_inode->i_sem);
-                f_dput(fso[i].fso_dentry);
-        }
-        goto out;
 }
 
 int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
@@ -551,58 +348,12 @@ int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
                                           niocount, nb, res, oti);
 
         LBUG();
-
         return -EPROTO;
 }
 
-/* It is highly unlikely that we would ever get an error here.  The page we want
- * to get was previously locked, so it had to have already allocated the space,
- * and we were just writing over the same data, so there would be no hole in the
- * file.
- *
- * XXX: possibility of a race with truncate could exist, need to check that.
- *      There are no guarantees w.r.t. write order even on a local filesystem,
- *      although the normal response would be to return the number of bytes
- *      successfully written and leave the rest to the app. */
-static int filter_write_locked_page(struct niobuf_local *lnb)
-{
-        struct page *lpage;
-        void *lpage_addr, *lnb_addr;
-        int rc;
-        ENTRY;
-
-        lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
-        if (IS_ERR(lpage)) {
-                rc = PTR_ERR(lpage);
-                CERROR("error getting locked page index %ld: rc = %d\n",
-                       lnb->page->index, rc);
-                LBUG();
-                lustre_commit_write(lnb);
-                RETURN(rc);
-        }
-
-        /* 2 kmaps == vanishingly small deadlock opportunity */
-        lpage_addr = kmap(lpage);
-        lnb_addr = kmap(lnb->page);
-
-        memcpy(lpage_addr, lnb_addr, PAGE_SIZE);
-
-        kunmap(lnb->page);
-        kunmap(lpage);
-
-        page_cache_release(lnb->page);
-
-        lnb->page = lpage;
-        rc = lustre_commit_write(lnb);
-        if (rc)
-                CERROR("error committing locked page %ld: rc = %d\n",
-                       lnb->page->index, rc);
-        RETURN(rc);
-}
-
-static int filter_commitrw_read(struct obd_export *exp, int objcount,
-                                struct obd_ioobj *obj, int niocount,
-                                struct niobuf_local *res,
+static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,
+                                int objcount, struct obd_ioobj *obj,
+                                int niocount, struct niobuf_local *res,
                                 struct obd_trans_info *oti)
 {
         struct obd_ioobj *o;
@@ -621,144 +372,47 @@ static int filter_commitrw_read(struct obd_export *exp, int objcount,
         RETURN(0);
 }
 
-static int
-filter_commitrw_write(int cmd, struct obd_export *exp, struct obdo *oa,
-                      int objcount, struct obd_ioobj *obj, int niocount,
-                      struct niobuf_local *res, struct obd_trans_info *oti)
+void flip_into_page_cache(struct inode *inode, struct page *new_page)
 {
-        struct obd_run_ctxt saved;
-        struct obd_ioobj *o;
-        struct niobuf_local *lnb;
-        struct obd_device *obd = exp->exp_obd;
-        int found_locked = 0, rc = 0, i;
-        int nested_trans = current->journal_info != NULL;
-        unsigned long now = jiffies;  /* DEBUGGING OST TIMEOUTS */
-        ENTRY;
-
-        push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
-
-        if (cmd & OBD_BRW_WRITE) {
-                LASSERT(oti);
-                LASSERT(current->journal_info == NULL ||
-                        current->journal_info == oti->oti_handle);
-                current->journal_info = oti->oti_handle;
-        }
-
-        for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
-                struct inode *inode;
-                int j;
-
-                /* If all of the page reads were beyond EOF, let's pretend
-                 * this read didn't really happen at all. */
-                if (lnb->dentry == NULL) {
-                        oa->o_valid = OBD_MD_FLID|(oa->o_valid&OBD_MD_FLCKSUM);
-                        continue;
-                }
-
-                inode = igrab(lnb->dentry->d_inode);
-
-                if (cmd & OBD_BRW_WRITE) {
-                        /* FIXME: MULTI OBJECT BRW */
-                        if (oa && oa->o_valid & (OBD_MD_FLMTIME|OBD_MD_FLCTIME))
-                                obdo_refresh_inode(inode, oa, OBD_MD_FLATIME |
-                                                   OBD_MD_FLMTIME |
-                                                   OBD_MD_FLCTIME);
-                        else
-                                inode_update_time(lnb->dentry->d_inode, 1);
-                } else if (oa && oa->o_valid & OBD_MD_FLATIME) {
-                        /* Note that we don't necessarily write this to disk */
-                        obdo_refresh_inode(inode, oa, OBD_MD_FLATIME);
-                }
-
-                for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
-                        if (lnb->page == NULL) {
-                                continue;
-                        }
-
-                        if (lnb->flags & N_LOCAL_TEMP_PAGE) {
-                                found_locked++;
-                                continue;
-                        }
-
-                        if (time_after(jiffies, lnb->start + 15 * HZ))
-                                CERROR("slow commitrw %lusi (%lus)\n",
-                                       (jiffies - lnb->start) / HZ,
-                                       (jiffies - now) / HZ);
-
-                        if (cmd & OBD_BRW_WRITE) {
-                                int err = filter_commit_write(lnb, 0);
-
-                                if (!rc)
-                                        rc = err;
-                        } else {
-                                page_cache_release(lnb->page);
-                        }
+        struct page *old_page;
+        int rc;
 
-                        f_dput(lnb->dentry);
-                        if (time_after(jiffies, lnb->start + 15 * HZ))
-                                CERROR("slow commit_write %lus (%lus)\n",
-                                       (jiffies - lnb->start) / HZ,
-                                       (jiffies - now) / HZ);
+        do {
+                /* the dlm is protecting us from read/write concurrency, so we
+                 * expect this find_lock_page to return quickly.  even if we
+                 * race with another writer it won't be doing much work with
+                 * the page locked.  we do this 'cause t_c_p expects a 
+                 * locked page, and it wants to grab the pagecache lock
+                 * as well. */
+                old_page = find_lock_page(inode->i_mapping, new_page->index);
+                if (old_page) {
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+                        truncate_complete_page(old_page);
+#else
+                        truncate_complete_page(old_page->mapping, old_page);
+#endif
+                        unlock_page(old_page);
+                        page_cache_release(old_page);
                 }
 
-                /* FIXME: MULTI OBJECT BRW */
-                if (oa) {
-                        oa->o_valid = OBD_MD_FLID|(oa->o_valid&OBD_MD_FLCKSUM);
-                        obdo_from_inode(oa, inode, FILTER_VALID_FLAGS);
+#if 0 /* this should be a /proc tunable someday */
+                /* racing o_directs (no locking ioctl) could race adding
+                 * their pages, so we repeat the page invalidation unless
+                 * we successfully added our new page */
+                rc = add_to_page_cache_unique(new_page, inode->i_mapping, 
+                                              new_page->index,
+                                              page_hash(inode->i_mapping, 
+                                                        new_page->index));
+                if (rc == 0) {
+                        /* add_to_page_cache clears uptodate|dirty and locks
+                         * the page */
+                        SetPageUptodate(new_page);
+                        unlock_page(new_page);
                 }
-
-                if (cmd & OBD_BRW_WRITE)
-                        up(&inode->i_sem);
-
-                iput(inode);
-        }
-
-        for (i = 0, o = obj, lnb = res; found_locked > 0 && i < objcount;
-             i++, o++) {
-                int j;
-
-                for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
-                        int err;
-                        if (!(lnb->flags & N_LOCAL_TEMP_PAGE))
-                                continue;
-
-                        if (time_after(jiffies, lnb->start + 15 * HZ))
-                                CERROR("slow commitrw locked %lus (%lus)\n",
-                                       (jiffies - lnb->start) / HZ,
-                                       (jiffies - now) / HZ);
-
-                        err = filter_write_locked_page(lnb);
-                        if (!rc)
-                                rc = err;
-                        f_dput(lnb->dentry);
-                        found_locked--;
-
-                        if (time_after(jiffies, lnb->start + 15 * HZ))
-                                CERROR("slow commit_write locked %lus (%lus)\n",
-                                       (jiffies - lnb->start) / HZ,
-                                       (jiffies - now) / HZ);
-                }
-        }
-
-        if (cmd & OBD_BRW_WRITE) {
-                /* We just want any dentry for the commit, for now */
-                struct dentry *dparent = filter_parent(obd, S_IFREG, 0);
-                int err;
-
-                rc = filter_finish_transno(exp, oti, rc);
-                err = fsfilt_commit(obd, dparent->d_inode, oti->oti_handle,
-                                    obd_sync_filter);
-                if (err)
-                        rc = err;
-                if (obd_sync_filter)
-                        LASSERT(oti->oti_transno <= obd->obd_last_committed);
-                if (time_after(jiffies, now + 15 * HZ))
-                        CERROR("slow commitrw commit %lus\n", (jiffies-now)/HZ);
-        }
-
-        LASSERT(nested_trans || current->journal_info == NULL);
-        pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
-        RETURN(rc);
+#else   
+                rc = 0;
+#endif
+        } while (rc != 0);
 }
 
 /* XXX needs to trickle its oa down */
@@ -767,20 +421,19 @@ int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
                     struct niobuf_local *res, struct obd_trans_info *oti)
 {
         if (cmd == OBD_BRW_WRITE)
-                return filter_commitrw_write(cmd, exp, oa, objcount, obj,
-                                             niocount, res, oti);
+                return filter_commitrw_write(exp, oa, objcount, obj, niocount,
+                                             res, oti);
         if (cmd == OBD_BRW_READ)
-                return filter_commitrw_read(exp, objcount, obj, niocount,
+                return filter_commitrw_read(exp, oa, objcount, obj, niocount,
                                             res, oti);
         LBUG();
         return -EPROTO;
 }
 
-int filter_brw(int cmd, struct lustre_handle *conn, struct obdo *oa,
+int filter_brw(int cmd, struct obd_export *exp, struct obdo *oa,
                struct lov_stripe_md *lsm, obd_count oa_bufs,
                struct brw_page *pga, struct obd_trans_info *oti)
 {
-        struct obd_export *exp;
         struct obd_ioobj ioo;
         struct niobuf_local *lnb;
         struct niobuf_remote *rnb;
@@ -788,12 +441,6 @@ int filter_brw(int cmd, struct lustre_handle *conn, struct obdo *oa,
         int ret = 0;
         ENTRY;
 
-        exp = class_conn2export(conn);
-        if (exp == NULL) {
-                CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",conn->cookie);
-                RETURN(-EINVAL);
-        }
-
         OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));
         OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));
 
@@ -805,9 +452,7 @@ int filter_brw(int cmd, struct lustre_handle *conn, struct obdo *oa,
                 rnb[i].len = pga[i].count;
         }
 
-        ioo.ioo_id = oa->o_id;
-        ioo.ioo_gr = 0;
-        ioo.ioo_type = oa->o_mode & S_IFMT;
+        obdo_to_ioobj(oa, &ioo);
         ioo.ioo_bufcnt = oa_bufs;
 
         ret = filter_preprw(cmd, exp, oa, 1, &ioo, oa_bufs, rnb, lnb, oti);
@@ -826,8 +471,8 @@ int filter_brw(int cmd, struct lustre_handle *conn, struct obdo *oa,
                 else
                         memcpy(virt + off, addr + off, pga[i].count);
 
-                kunmap(addr);
-                kunmap(virt);
+                kunmap(lnb[i].page);
+                kunmap(pga[i].pg);
         }
 
         ret = filter_commitrw(cmd, exp, oa, 1, &ioo, oa_bufs, lnb, oti);
@@ -837,6 +482,5 @@ out:
                 OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));
         if (rnb)
                 OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));
-        class_export_put(exp);
         RETURN(ret);
 }