Whamcloud - gitweb
b=1141
authorpschwan <pschwan>
Sun, 6 Jul 2003 20:07:45 +0000 (20:07 +0000)
committerpschwan <pschwan>
Sun, 6 Jul 2003 20:07:45 +0000 (20:07 +0000)
Another mostly-reorganization commit, to reduce the size of my working
patch set.

 - remove gratuitous lustre_put_page() inline
 - split filter_preprw() into filter_preprw_read() and filter_preprw_write()
 - reverse the 2.5 waitfor_one_page macro into a 2.4 wait_on_page_locked macro
 - unimportant changes galore

lustre/obdfilter/filter_io.c

index 47d8544..8c3ed83 100644 (file)
 #include <linux/lustre_fsfilt.h>
 #include "filter_internal.h"
 
-static inline void lustre_put_page(struct page *page)
-{
-        page_cache_release(page);
-}
-
 static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
 {
         struct address_space *mapping = inode->i_mapping;
@@ -67,7 +62,7 @@ static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
         if (rc < 0) {
                 CERROR("page index %lu, rc = %d\n", index, rc);
                 lnb->page = NULL;
-                lustre_put_page(page);
+                page_cache_release(page);
                 return lnb->rc = rc;
         }
 
@@ -97,7 +92,7 @@ static int filter_finish_page_read(struct niobuf_local *lnb)
         return 0;
 
 err_page:
-        lustre_put_page(lnb->page);
+        page_cache_release(lnb->page);
         lnb->page = NULL;
         return lnb->rc;
 }
@@ -134,22 +129,19 @@ static struct page *lustre_get_page_write(struct inode *inode,
 
 err_unlock:
         unlock_page(page);
-        lustre_put_page(page);
+        page_cache_release(page);
         return ERR_PTR(rc);
 }
 
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
-int waitfor_one_page(struct page *page)
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+int wait_on_page_locked(struct page *page)
 {
-        wait_on_page_locked(page);
+        waitfor_one_page(page);
         return 0;
 }
-#endif
 
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
 /* We should only change the file mtime (and not the ctime, like
- * update_inode_times() in generic_file_write()) when we only change data.
- */
+ * update_inode_times() in generic_file_write()) when we only change data. */
 static inline void inode_update_time(struct inode *inode, int ctime_too)
 {
         time_t now = CURRENT_TIME;
@@ -173,12 +165,12 @@ static int lustre_commit_write(struct niobuf_local *lnb)
         LASSERT(to <= PAGE_SIZE);
         err = page->mapping->a_ops->commit_write(NULL, page, from, to);
         if (!err && IS_SYNC(inode))
-                err = waitfor_one_page(page);
+                err = wait_on_page_locked(page);
         //SetPageUptodate(page); // the client commit_write will do this
 
         SetPageReferenced(page);
         unlock_page(page);
-        lustre_put_page(page);
+        page_cache_release(page);
         return err;
 }
 
@@ -198,10 +190,10 @@ int filter_get_page_write(struct inode *inode, struct niobuf_local *lnb,
 
 
         /* This page is currently locked, so get a temporary page instead. */
-        if (!page) {
+        if (page == NULL) {
                 CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
                 page = alloc_pages(GFP_KERNEL, 0); /* locked page */
-                if (!page) {
+                if (page == NULL) {
                         CERROR("no memory for a temp page\n");
                         GOTO(err, rc = -ENOMEM);
                 }
@@ -232,13 +224,147 @@ int filter_get_page_write(struct inode *inode, struct niobuf_local *lnb,
 
 err_unlock:
         unlock_page(page);
-        lustre_put_page(page);
+        page_cache_release(page);
 err:
         return lnb->rc = rc;
 }
 
-/*
- * We need to balance prepare_write() calls with commit_write() calls.
+static int filter_preprw_read(struct obd_export *exp, struct obdo *obdo,
+                              int objcount, struct obd_ioobj *obj,
+                              int niocount, struct niobuf_remote *nb,
+                              struct niobuf_local *res,
+                              struct obd_trans_info *oti)
+{
+        struct obd_run_ctxt saved;
+        struct obd_device *obd;
+        struct obd_ioobj *o;
+        struct niobuf_remote *rnb;
+        struct niobuf_local *lnb;
+        struct fsfilt_objinfo *fso;
+        struct dentry *dentry;
+        struct inode *inode;
+        int rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0;
+        unsigned long now = jiffies;
+        ENTRY;
+        LASSERT(objcount == 1);
+
+        obd = exp->exp_obd;
+        if (obd == NULL)
+                RETURN(-EINVAL);
+        OBD_ALLOC(fso, objcount * sizeof(*fso));
+        if (fso == NULL)
+                RETURN(-ENOMEM);
+
+        memset(res, 0, niocount * sizeof(*res));
+
+        push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+        for (i = 0, o = obj; i < objcount; i++, o++) {
+                struct filter_dentry_data *fdd;
+                LASSERT(o->ioo_bufcnt);
+
+                dentry = filter_fid2dentry(obd, NULL, o->ioo_type, o->ioo_id);
+                if (IS_ERR(dentry))
+                        GOTO(out_objinfo, rc = PTR_ERR(dentry));
+
+                if (dentry->d_inode == NULL) {
+                        CERROR("trying to BRW to non-existent file "LPU64"\n",
+                               o->ioo_id);
+                        f_dput(dentry);
+                        GOTO(out_objinfo, rc = -ENOENT);
+                }
+
+                fso[i].fso_dentry = dentry;
+                fso[i].fso_bufcnt = o->ioo_bufcnt;
+
+                fdd = dentry->d_fsdata;
+                if (fdd == NULL || !atomic_read(&fdd->fdd_open_count))
+                        CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
+                               o->ioo_id);
+        }
+
+        if (time_after(jiffies, now + 15 * HZ))
+                CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
+
+        for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
+                dentry = fso[i].fso_dentry;
+                inode = dentry->d_inode;
+
+                for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
+                        if (j == 0)
+                                lnb->dentry = dentry;
+                        else
+                                lnb->dentry = dget(dentry);
+
+                        lnb->offset = rnb->offset;
+                        lnb->len    = rnb->len;
+                        lnb->flags  = rnb->flags;
+                        lnb->start  = jiffies;
+
+                        if (inode->i_size <= rnb->offset) {
+                                /* If there's no more data, abort early.
+                                 * lnb->page == NULL and lnb->rc == 0, so it's
+                                 * easy to detect later. */
+                                f_dput(dentry);
+                                lnb->dentry = NULL;
+                                break;
+                        } else {
+                                rc = filter_start_page_read(inode, lnb);
+                        }
+
+                        if (rc) {
+                                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
+                                       "page err %u@"LPU64" %u/%u %p: rc %d\n",
+                                       lnb->len, lnb->offset, j, o->ioo_bufcnt,
+                                       dentry, rc);
+                                f_dput(dentry);
+                                GOTO(out_pages, rc);
+                        }
+
+                        tot_bytes += lnb->rc;
+                        if (lnb->rc < lnb->len)
+                                break; /* short read */
+                }
+        }
+
+        if (time_after(jiffies, now + 15 * HZ))
+                CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
+
+        lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
+        while (lnb-- > res) {
+                rc = filter_finish_page_read(lnb);
+                if (rc) {
+                        CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len,
+                               lnb->offset, (int)(lnb - res), lnb->dentry, rc);
+                        f_dput(lnb->dentry);
+                        GOTO(out_pages, rc);
+                }
+        }
+
+        if (time_after(jiffies, now + 15 * HZ))
+                CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ);
+
+        EXIT;
+out:
+        OBD_FREE(fso, objcount * sizeof(*fso));
+        /* we saved the journal handle into oti->oti_handle instead */
+        current->journal_info = NULL;
+        pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+        return rc;
+
+out_pages:
+        while (lnb-- > res) {
+                page_cache_release(lnb->page);
+                f_dput(lnb->dentry);
+        }
+        goto out; /* dropped the dentry refs already (one per page) */
+
+out_objinfo:
+        for (i = 0; i < objcount && fso[i].fso_dentry; i++)
+                f_dput(fso[i].fso_dentry);
+        goto out;
+}
+
+/* We need to balance prepare_write() calls with commit_write() calls.
  * If the page has been prepared, but we have no data for it, we don't
  * want to overwrite valid data on disk, but we still need to zero out
  * data for space which was newly allocated.  Like part of what happens
@@ -246,8 +372,7 @@ err:
  *
  * XXX currently __block_prepare_write() creates buffers for all the
  *     pages, and the filesystems mark these buffers as BH_New if they
- *     were newly allocated from disk. We use the BH_New flag similarly.
- */
+ *     were newly allocated from disk. We use the BH_New flag similarly. */
 static int filter_commit_write(struct niobuf_local *lnb, int err)
 {
 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
@@ -276,10 +401,21 @@ static int filter_commit_write(struct niobuf_local *lnb, int err)
         return lustre_commit_write(lnb);
 }
 
-int filter_preprw(int cmd, struct obd_export *exp, struct obdo *obdo,
-                  int objcount, struct obd_ioobj *obj,
-                  int niocount, struct niobuf_remote *nb,
-                  struct niobuf_local *res, struct obd_trans_info *oti)
+/* If we ever start to support multi-object BRW RPCs, we will need to get locks
+ * on mulitple inodes.  That isn't all, because there still exists the
+ * possibility of a truncate starting a new transaction while holding the ext3
+ * rwsem = write while some writes (which have started their transactions here)
+ * blocking on the ext3 rwsem = read => lock inversion.
+ *
+ * The handling gets very ugly when dealing with locked pages.  It may be easier
+ * to just get rid of the locked page code (which has problems of its own) and
+ * either discover we do not need it anymore (i.e. it was a symptom of another
+ * bug) or ensure we get the page locks in an appropriate order. */
+static int filter_preprw_write(struct obd_export *exp, struct obdo *obdo,
+                               int objcount, struct obd_ioobj *obj,
+                               int niocount, struct niobuf_remote *nb,
+                               struct niobuf_local *res,
+                               struct obd_trans_info *oti)
 {
         struct obd_run_ctxt saved;
         struct obd_device *obd;
@@ -288,65 +424,42 @@ int filter_preprw(int cmd, struct obd_export *exp, struct obdo *obdo,
         struct niobuf_local *lnb;
         struct fsfilt_objinfo *fso;
         struct dentry *dentry;
-        struct inode *inode;
-        int pglocked = 0, rc = 0, i, j, tot_bytes = 0;
+        int pglocked = 0, rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0;
         unsigned long now = jiffies;
         ENTRY;
-
-        memset(res, 0, niocount * sizeof(*res));
+        LASSERT(objcount == 1);
 
         obd = exp->exp_obd;
         if (obd == NULL)
                 RETURN(-EINVAL);
-
-        // theoretically we support multi-obj BRW RPCs, but until then...
-        LASSERT(objcount == 1);
-
         OBD_ALLOC(fso, objcount * sizeof(*fso));
-        if (!fso)
+        if (fso == NULL)
                 RETURN(-ENOMEM);
 
-        push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+        memset(res, 0, niocount * sizeof(*res));
 
+        push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
         for (i = 0, o = obj; i < objcount; i++, o++) {
                 struct filter_dentry_data *fdd;
-
                 LASSERT(o->ioo_bufcnt);
 
                 dentry = filter_fid2dentry(obd, NULL, o->ioo_type, o->ioo_id);
-
                 if (IS_ERR(dentry))
                         GOTO(out_objinfo, rc = PTR_ERR(dentry));
 
-                fso[i].fso_dentry = dentry;
-                fso[i].fso_bufcnt = o->ioo_bufcnt;
-
-                if (!dentry->d_inode) {
+                if (dentry->d_inode == NULL) {
                         CERROR("trying to BRW to non-existent file "LPU64"\n",
                                o->ioo_id);
                         f_dput(dentry);
                         GOTO(out_objinfo, rc = -ENOENT);
                 }
 
-                /* If we ever start to support mutli-object BRW RPCs, we will
-                 * need to get locks on mulitple inodes (in order) or use the
-                 * DLM to do the locking for us (and use the same locking in
-                 * filter_setattr() for truncate).  That isn't all, because
-                 * there still exists the possibility of a truncate starting
-                 * a new transaction while holding the ext3 rwsem = write
-                 * while some writes (which have started their transactions
-                 * here) blocking on the ext3 rwsem = read => lock inversion.
-                 *
-                 * The handling gets very ugly when dealing with locked pages.
-                 * It may be easier to just get rid of the locked page code
-                 * (which has problems of its own) and either discover we do
-                 * not need it anymore (i.e. it was a symptom of another bug)
-                 * or ensure we get the page locks in an appropriate order.
-                 */
-                if (cmd & OBD_BRW_WRITE)
-                        down(&dentry->d_inode->i_sem);
+                fso[i].fso_dentry = dentry;
+                fso[i].fso_bufcnt = o->ioo_bufcnt;
+
+                down(&dentry->d_inode->i_sem);
                 fdd = dentry->d_fsdata;
-                if (!fdd || !atomic_read(&fdd->fdd_open_count))
+                if (fdd == NULL || !atomic_read(&fdd->fdd_open_count))
                         CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
                                o->ioo_id);
         }
@@ -354,33 +467,18 @@ int filter_preprw(int cmd, struct obd_export *exp, struct obdo *obdo,
         if (time_after(jiffies, now + 15 * HZ))
                 CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
 
-        if (cmd & OBD_BRW_WRITE) {
-#warning "FIXME: we need inode->i_sem for each object to protect vs truncate"
-                LASSERT(oti);
-                /* Even worse, we need to get locks on mulitple inodes (in
-                 * order) or use the DLM to do the locking for us (and use
-                 * the same locking in filter_setattr() for truncate.  The
-                 * handling gets very ugly when dealing with locked pages.
-                 * It may be easier to just get rid of the locked page code
-                 * (which has problems of its own) and either discover we do
-                 * not need it anymore (i.e. it was a symptom of another bug)
-                 * or ensure we get the page locks in an appropriate order.
-                 */
-                oti->oti_handle = fsfilt_brw_start(obd, objcount, fso, niocount,
-                                                   oti->oti_handle);
-                if (IS_ERR(oti->oti_handle)) {
-                        rc = PTR_ERR(oti->oti_handle);
-                        CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
-                               "error starting transaction: rc = %d\n", rc);
-                        oti->oti_handle = NULL;
-                        GOTO(out_objinfo, rc);
-                }
+        LASSERT(oti != NULL);
+        oti->oti_handle = fsfilt_brw_start(obd, objcount, fso, niocount, oti);
+        if (IS_ERR(oti->oti_handle)) {
+                rc = PTR_ERR(oti->oti_handle);
+                CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
+                       "error starting transaction: rc = %d\n", rc);
+                oti->oti_handle = NULL;
+                GOTO(out_objinfo, rc);
         }
 
         for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
                 dentry = fso[i].fso_dentry;
-                inode = dentry->d_inode;
-
                 for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
                         if (j == 0)
                                 lnb->dentry = dentry;
@@ -392,20 +490,10 @@ int filter_preprw(int cmd, struct obd_export *exp, struct obdo *obdo,
                         lnb->flags  = rnb->flags;
                         lnb->start  = jiffies;
 
-                        if (cmd & OBD_BRW_WRITE) {
-                                rc = filter_get_page_write(inode,lnb,&pglocked);
-                                if (rc)
-                                        up(&dentry->d_inode->i_sem);
-                        } else if (inode->i_size <= rnb->offset) {
-                                /* If there's no more data, abort early.
-                                 * lnb->page == NULL and lnb->rc == 0, so it's
-                                 * easy to detect later. */
-                                f_dput(dentry);
-                                lnb->dentry = NULL;
-                                break;
-                        } else {
-                                rc = filter_start_page_read(inode, lnb);
-                        }
+                        rc = filter_get_page_write(dentry->d_inode, lnb,
+                                                   &pglocked);
+                        if (rc)
+                                up(&dentry->d_inode->i_sem);
 
                         if (rc) {
                                 CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
@@ -415,38 +503,14 @@ int filter_preprw(int cmd, struct obd_export *exp, struct obdo *obdo,
                                 f_dput(dentry);
                                 GOTO(out_pages, rc);
                         }
-
                         tot_bytes += lnb->len;
-
-                        if ((cmd & OBD_BRW_READ) && lnb->rc < lnb->len) {
-                                /* Likewise with a partial read */
-                                break;
-                        }
                 }
         }
 
         if (time_after(jiffies, now + 15 * HZ))
                 CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
 
-        if (cmd & OBD_BRW_READ) {
-                lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES,
-                                    tot_bytes);
-                while (lnb-- > res) {
-                        rc = filter_finish_page_read(lnb);
-                        if (rc) {
-                                CERROR("error page %u@"LPU64" %u %p: rc %d\n",
-                                       lnb->len, lnb->offset, (int)(lnb - res),
-                                       lnb->dentry, rc);
-                                f_dput(lnb->dentry);
-                                GOTO(out_pages, rc);
-                        }
-                }
-        } else
-                lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
-                                    tot_bytes);
-
-        if (time_after(jiffies, now + 15 * HZ))
-                CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ);
+        lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,tot_bytes);
 
         EXIT;
 out:
@@ -458,52 +522,56 @@ out:
 
 out_pages:
         while (lnb-- > res) {
-                if (cmd & OBD_BRW_WRITE) {
-                        filter_commit_write(lnb, rc);
-                        up(&lnb->dentry->d_inode->i_sem);
-                } else {
-                        lustre_put_page(lnb->page);
-                }
+                filter_commit_write(lnb, rc);
+                up(&lnb->dentry->d_inode->i_sem);
                 f_dput(lnb->dentry);
         }
-        if (cmd & OBD_BRW_WRITE) {
-                filter_finish_transno(exp, oti, rc);
-                fsfilt_commit(obd,
-                              filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode,
-                              oti->oti_handle, 0);
-        }
+        filter_finish_transno(exp, oti, rc);
+        fsfilt_commit(obd, filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode,
+                      oti->oti_handle, 0);
         goto out; /* dropped the dentry refs already (one per page) */
 
 out_objinfo:
         for (i = 0; i < objcount && fso[i].fso_dentry; i++) {
-                if (cmd & OBD_BRW_WRITE)
-                        up(&fso[i].fso_dentry->d_inode->i_sem);
+                up(&fso[i].fso_dentry->d_inode->i_sem);
                 f_dput(fso[i].fso_dentry);
         }
         goto out;
 }
 
+int filter_preprw(int cmd, struct obd_export *exp, struct obdo *obdo,
+                  int objcount, struct obd_ioobj *obj, int niocount,
+                  struct niobuf_remote *nb, struct niobuf_local *res,
+                  struct obd_trans_info *oti)
+{
+        if (cmd == OBD_BRW_WRITE)
+                return filter_preprw_write(exp, obdo, objcount, obj, niocount,
+                                           nb, res, oti);
+        else if (cmd == OBD_BRW_READ)
+                return filter_preprw_read(exp, obdo, objcount, obj, niocount,
+                                          nb, res, oti);
+        else
+                LBUG();
+}
+
+/* It is highly unlikely that we would ever get an error here.  The page we want
+ * to get was previously locked, so it had to have already allocated the space,
+ * and we were just writing over the same data, so there would be no hole in the
+ * file.
+ *
+ * XXX: possibility of a race with truncate could exist, need to check that.
+ *      There are no guarantees w.r.t. write order even on a local filesystem,
+ *      although the normal response would be to return the number of bytes
+ *      successfully written and leave the rest to the app. */
 static int filter_write_locked_page(struct niobuf_local *lnb)
 {
         struct page *lpage;
-        void        *lpage_addr;
-        void        *lnb_addr;
+        void *lpage_addr, *lnb_addr;
         int rc;
         ENTRY;
 
         lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
         if (IS_ERR(lpage)) {
-                /* It is highly unlikely that we would ever get an error here.
-                 * The page we want to get was previously locked, so it had to
-                 * have already allocated the space, and we were just writing
-                 * over the same data, so there would be no hole in the file.
-                 *
-                 * XXX: possibility of a race with truncate could exist, need
-                 *      to check that.  There are no guarantees w.r.t.
-                 *      write order even on a local filesystem, although the
-                 *      normal response would be to return the number of bytes
-                 *      successfully written and leave the rest to the app.
-                 */
                 rc = PTR_ERR(lpage);
                 CERROR("error getting locked page index %ld: rc = %d\n",
                        lnb->page->index, rc);
@@ -521,21 +589,19 @@ static int filter_write_locked_page(struct niobuf_local *lnb)
         kunmap(lnb->page);
         kunmap(lpage);
 
-        lustre_put_page(lnb->page);
+        page_cache_release(lnb->page);
 
         lnb->page = lpage;
         rc = lustre_commit_write(lnb);
         if (rc)
                 CERROR("error committing locked page %ld: rc = %d\n",
                        lnb->page->index, rc);
-
         RETURN(rc);
 }
 
-int filter_commitrw(int cmd, struct obd_export *exp,
-                    int objcount, struct obd_ioobj *obj,
-                    int niocount, struct niobuf_local *res,
-                    struct obd_trans_info *oti)
+int filter_commitrw(int cmd, struct obd_export *exp, int objcount,
+                    struct obd_ioobj *obj, int niocount,
+                    struct niobuf_local *res, struct obd_trans_info *oti)
 {
         struct obd_run_ctxt saved;
         struct obd_ioobj *o;
@@ -582,7 +648,7 @@ int filter_commitrw(int cmd, struct obd_export *exp,
                                 if (!rc)
                                         rc = err;
                         } else {
-                                lustre_put_page(lnb->page);
+                                page_cache_release(lnb->page);
                         }
 
                         f_dput(lnb->dentry);
@@ -633,21 +699,20 @@ int filter_commitrw(int cmd, struct obd_export *exp,
         }
 
         LASSERT(nested_trans || current->journal_info == NULL);
-
         pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
         RETURN(rc);
 }
 
-int filter_brw(int cmd, struct lustre_handle *conn,
-               struct lov_stripe_md *lsm, obd_count oa_bufs,
-               struct brw_page *pga, struct obd_trans_info *oti)
+int filter_brw(int cmd, struct lustre_handle *conn, struct lov_stripe_md *lsm,
+               obd_count oa_bufs, struct brw_page *pga,
+               struct obd_trans_info *oti)
 {
-        struct obd_export      *exp = class_conn2export(conn);
-        struct obd_ioobj        ioo;
-        struct niobuf_local    *lnb;
-        struct niobuf_remote   *rnb;
-        obd_count               i;
-        int                     ret = 0;
+        struct obd_export *exp = class_conn2export(conn);
+        struct obd_ioobj ioo;
+        struct niobuf_local *lnb;
+        struct niobuf_remote *rnb;
+        obd_count i;
+        int ret = 0;
         ENTRY;
 
         if (exp == NULL)