#include <linux/lustre_fsfilt.h>
#include "filter_internal.h"
-static inline void lustre_put_page(struct page *page)
-{
- page_cache_release(page);
-}
-
static int filter_start_page_read(struct inode *inode, struct niobuf_local *lnb)
{
struct address_space *mapping = inode->i_mapping;
if (rc < 0) {
CERROR("page index %lu, rc = %d\n", index, rc);
lnb->page = NULL;
- lustre_put_page(page);
+ page_cache_release(page);
return lnb->rc = rc;
}
return 0;
err_page:
- lustre_put_page(lnb->page);
+ page_cache_release(lnb->page);
lnb->page = NULL;
return lnb->rc;
}
err_unlock:
unlock_page(page);
- lustre_put_page(page);
+ page_cache_release(page);
return ERR_PTR(rc);
}
-#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
-int waitfor_one_page(struct page *page)
+#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
+int wait_on_page_locked(struct page *page)
{
- wait_on_page_locked(page);
+ waitfor_one_page(page);
return 0;
}
-#endif
-#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
/* We should only change the file mtime (and not the ctime, like
- * update_inode_times() in generic_file_write()) when we only change data.
- */
+ * update_inode_times() in generic_file_write()) when we only change data. */
static inline void inode_update_time(struct inode *inode, int ctime_too)
{
time_t now = CURRENT_TIME;
LASSERT(to <= PAGE_SIZE);
err = page->mapping->a_ops->commit_write(NULL, page, from, to);
if (!err && IS_SYNC(inode))
- err = waitfor_one_page(page);
+ err = wait_on_page_locked(page);
//SetPageUptodate(page); // the client commit_write will do this
SetPageReferenced(page);
unlock_page(page);
- lustre_put_page(page);
+ page_cache_release(page);
return err;
}
/* This page is currently locked, so get a temporary page instead. */
- if (!page) {
+ if (page == NULL) {
CDEBUG(D_ERROR,"ino %lu page %ld locked\n", inode->i_ino,index);
page = alloc_pages(GFP_KERNEL, 0); /* locked page */
- if (!page) {
+ if (page == NULL) {
CERROR("no memory for a temp page\n");
GOTO(err, rc = -ENOMEM);
}
err_unlock:
unlock_page(page);
- lustre_put_page(page);
+ page_cache_release(page);
err:
return lnb->rc = rc;
}
-/*
- * We need to balance prepare_write() calls with commit_write() calls.
+static int filter_preprw_read(struct obd_export *exp, struct obdo *obdo,
+ int objcount, struct obd_ioobj *obj,
+ int niocount, struct niobuf_remote *nb,
+ struct niobuf_local *res,
+ struct obd_trans_info *oti)
+{
+ struct obd_run_ctxt saved;
+ struct obd_device *obd;
+ struct obd_ioobj *o;
+ struct niobuf_remote *rnb;
+ struct niobuf_local *lnb;
+ struct fsfilt_objinfo *fso;
+ struct dentry *dentry;
+ struct inode *inode;
+ int rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0;
+ unsigned long now = jiffies;
+ ENTRY;
+ LASSERT(objcount == 1);
+
+ obd = exp->exp_obd;
+ if (obd == NULL)
+ RETURN(-EINVAL);
+ OBD_ALLOC(fso, objcount * sizeof(*fso));
+ if (fso == NULL)
+ RETURN(-ENOMEM);
+
+ memset(res, 0, niocount * sizeof(*res));
+
+ push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+ for (i = 0, o = obj; i < objcount; i++, o++) {
+ struct filter_dentry_data *fdd;
+ LASSERT(o->ioo_bufcnt);
+
+ dentry = filter_fid2dentry(obd, NULL, o->ioo_type, o->ioo_id);
+ if (IS_ERR(dentry))
+ GOTO(out_objinfo, rc = PTR_ERR(dentry));
+
+ if (dentry->d_inode == NULL) {
+ CERROR("trying to BRW to non-existent file "LPU64"\n",
+ o->ioo_id);
+ f_dput(dentry);
+ GOTO(out_objinfo, rc = -ENOENT);
+ }
+
+ fso[i].fso_dentry = dentry;
+ fso[i].fso_bufcnt = o->ioo_bufcnt;
+
+ fdd = dentry->d_fsdata;
+ if (fdd == NULL || !atomic_read(&fdd->fdd_open_count))
+ CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
+ o->ioo_id);
+ }
+
+ if (time_after(jiffies, now + 15 * HZ))
+ CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
+
+ for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
+ dentry = fso[i].fso_dentry;
+ inode = dentry->d_inode;
+
+ for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
+ if (j == 0)
+ lnb->dentry = dentry;
+ else
+ lnb->dentry = dget(dentry);
+
+ lnb->offset = rnb->offset;
+ lnb->len = rnb->len;
+ lnb->flags = rnb->flags;
+ lnb->start = jiffies;
+
+ if (inode->i_size <= rnb->offset) {
+ /* If there's no more data, abort early.
+ * lnb->page == NULL and lnb->rc == 0, so it's
+ * easy to detect later. */
+ f_dput(dentry);
+ lnb->dentry = NULL;
+ break;
+ } else {
+ rc = filter_start_page_read(inode, lnb);
+ }
+
+ if (rc) {
+ CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
+ "page err %u@"LPU64" %u/%u %p: rc %d\n",
+ lnb->len, lnb->offset, j, o->ioo_bufcnt,
+ dentry, rc);
+ f_dput(dentry);
+ GOTO(out_pages, rc);
+ }
+
+ tot_bytes += lnb->rc;
+ if (lnb->rc < lnb->len)
+ break; /* short read */
+ }
+ }
+
+ if (time_after(jiffies, now + 15 * HZ))
+ CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
+
+ lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES, tot_bytes);
+ while (lnb-- > res) {
+ rc = filter_finish_page_read(lnb);
+ if (rc) {
+ CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len,
+ lnb->offset, (int)(lnb - res), lnb->dentry, rc);
+ f_dput(lnb->dentry);
+ GOTO(out_pages, rc);
+ }
+ }
+
+ if (time_after(jiffies, now + 15 * HZ))
+ CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ);
+
+ EXIT;
+out:
+ OBD_FREE(fso, objcount * sizeof(*fso));
+ /* we saved the journal handle into oti->oti_handle instead */
+ current->journal_info = NULL;
+ pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+ return rc;
+
+out_pages:
+ while (lnb-- > res) {
+ page_cache_release(lnb->page);
+ f_dput(lnb->dentry);
+ }
+ goto out; /* dropped the dentry refs already (one per page) */
+
+out_objinfo:
+ for (i = 0; i < objcount && fso[i].fso_dentry; i++)
+ f_dput(fso[i].fso_dentry);
+ goto out;
+}
+
+/* We need to balance prepare_write() calls with commit_write() calls.
* If the page has been prepared, but we have no data for it, we don't
* want to overwrite valid data on disk, but we still need to zero out
* data for space which was newly allocated. Like part of what happens
*
* XXX currently __block_prepare_write() creates buffers for all the
* pages, and the filesystems mark these buffers as BH_New if they
- * were newly allocated from disk. We use the BH_New flag similarly.
- */
+ * were newly allocated from disk. We use the BH_New flag similarly. */
static int filter_commit_write(struct niobuf_local *lnb, int err)
{
#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
return lustre_commit_write(lnb);
}
-int filter_preprw(int cmd, struct obd_export *exp, struct obdo *obdo,
- int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf_remote *nb,
- struct niobuf_local *res, struct obd_trans_info *oti)
+/* If we ever start to support multi-object BRW RPCs, we will need to get locks
+ * on mulitple inodes. That isn't all, because there still exists the
+ * possibility of a truncate starting a new transaction while holding the ext3
+ * rwsem = write while some writes (which have started their transactions here)
+ * blocking on the ext3 rwsem = read => lock inversion.
+ *
+ * The handling gets very ugly when dealing with locked pages. It may be easier
+ * to just get rid of the locked page code (which has problems of its own) and
+ * either discover we do not need it anymore (i.e. it was a symptom of another
+ * bug) or ensure we get the page locks in an appropriate order. */
+static int filter_preprw_write(struct obd_export *exp, struct obdo *obdo,
+ int objcount, struct obd_ioobj *obj,
+ int niocount, struct niobuf_remote *nb,
+ struct niobuf_local *res,
+ struct obd_trans_info *oti)
{
struct obd_run_ctxt saved;
struct obd_device *obd;
struct niobuf_local *lnb;
struct fsfilt_objinfo *fso;
struct dentry *dentry;
- struct inode *inode;
- int pglocked = 0, rc = 0, i, j, tot_bytes = 0;
+ int pglocked = 0, rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0;
unsigned long now = jiffies;
ENTRY;
-
- memset(res, 0, niocount * sizeof(*res));
+ LASSERT(objcount == 1);
obd = exp->exp_obd;
if (obd == NULL)
RETURN(-EINVAL);
-
- // theoretically we support multi-obj BRW RPCs, but until then...
- LASSERT(objcount == 1);
-
OBD_ALLOC(fso, objcount * sizeof(*fso));
- if (!fso)
+ if (fso == NULL)
RETURN(-ENOMEM);
- push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+ memset(res, 0, niocount * sizeof(*res));
+ push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
for (i = 0, o = obj; i < objcount; i++, o++) {
struct filter_dentry_data *fdd;
-
LASSERT(o->ioo_bufcnt);
dentry = filter_fid2dentry(obd, NULL, o->ioo_type, o->ioo_id);
-
if (IS_ERR(dentry))
GOTO(out_objinfo, rc = PTR_ERR(dentry));
- fso[i].fso_dentry = dentry;
- fso[i].fso_bufcnt = o->ioo_bufcnt;
-
- if (!dentry->d_inode) {
+ if (dentry->d_inode == NULL) {
CERROR("trying to BRW to non-existent file "LPU64"\n",
o->ioo_id);
f_dput(dentry);
GOTO(out_objinfo, rc = -ENOENT);
}
- /* If we ever start to support mutli-object BRW RPCs, we will
- * need to get locks on mulitple inodes (in order) or use the
- * DLM to do the locking for us (and use the same locking in
- * filter_setattr() for truncate). That isn't all, because
- * there still exists the possibility of a truncate starting
- * a new transaction while holding the ext3 rwsem = write
- * while some writes (which have started their transactions
- * here) blocking on the ext3 rwsem = read => lock inversion.
- *
- * The handling gets very ugly when dealing with locked pages.
- * It may be easier to just get rid of the locked page code
- * (which has problems of its own) and either discover we do
- * not need it anymore (i.e. it was a symptom of another bug)
- * or ensure we get the page locks in an appropriate order.
- */
- if (cmd & OBD_BRW_WRITE)
- down(&dentry->d_inode->i_sem);
+ fso[i].fso_dentry = dentry;
+ fso[i].fso_bufcnt = o->ioo_bufcnt;
+
+ down(&dentry->d_inode->i_sem);
fdd = dentry->d_fsdata;
- if (!fdd || !atomic_read(&fdd->fdd_open_count))
+ if (fdd == NULL || !atomic_read(&fdd->fdd_open_count))
CDEBUG(D_PAGE, "I/O to unopened object "LPU64"\n",
o->ioo_id);
}
if (time_after(jiffies, now + 15 * HZ))
CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
- if (cmd & OBD_BRW_WRITE) {
-#warning "FIXME: we need inode->i_sem for each object to protect vs truncate"
- LASSERT(oti);
- /* Even worse, we need to get locks on mulitple inodes (in
- * order) or use the DLM to do the locking for us (and use
- * the same locking in filter_setattr() for truncate. The
- * handling gets very ugly when dealing with locked pages.
- * It may be easier to just get rid of the locked page code
- * (which has problems of its own) and either discover we do
- * not need it anymore (i.e. it was a symptom of another bug)
- * or ensure we get the page locks in an appropriate order.
- */
- oti->oti_handle = fsfilt_brw_start(obd, objcount, fso, niocount,
- oti->oti_handle);
- if (IS_ERR(oti->oti_handle)) {
- rc = PTR_ERR(oti->oti_handle);
- CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
- "error starting transaction: rc = %d\n", rc);
- oti->oti_handle = NULL;
- GOTO(out_objinfo, rc);
- }
+ LASSERT(oti != NULL);
+ oti->oti_handle = fsfilt_brw_start(obd, objcount, fso, niocount, oti);
+ if (IS_ERR(oti->oti_handle)) {
+ rc = PTR_ERR(oti->oti_handle);
+ CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
+ "error starting transaction: rc = %d\n", rc);
+ oti->oti_handle = NULL;
+ GOTO(out_objinfo, rc);
}
for (i = 0, o = obj, rnb = nb, lnb = res; i < objcount; i++, o++) {
dentry = fso[i].fso_dentry;
- inode = dentry->d_inode;
-
for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
if (j == 0)
lnb->dentry = dentry;
lnb->flags = rnb->flags;
lnb->start = jiffies;
- if (cmd & OBD_BRW_WRITE) {
- rc = filter_get_page_write(inode,lnb,&pglocked);
- if (rc)
- up(&dentry->d_inode->i_sem);
- } else if (inode->i_size <= rnb->offset) {
- /* If there's no more data, abort early.
- * lnb->page == NULL and lnb->rc == 0, so it's
- * easy to detect later. */
- f_dput(dentry);
- lnb->dentry = NULL;
- break;
- } else {
- rc = filter_start_page_read(inode, lnb);
- }
+ rc = filter_get_page_write(dentry->d_inode, lnb,
+ &pglocked);
+ if (rc)
+ up(&dentry->d_inode->i_sem);
if (rc) {
CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
f_dput(dentry);
GOTO(out_pages, rc);
}
-
tot_bytes += lnb->len;
-
- if ((cmd & OBD_BRW_READ) && lnb->rc < lnb->len) {
- /* Likewise with a partial read */
- break;
- }
}
}
if (time_after(jiffies, now + 15 * HZ))
CERROR("slow prep get page %lus\n", (jiffies - now) / HZ);
- if (cmd & OBD_BRW_READ) {
- lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_READ_BYTES,
- tot_bytes);
- while (lnb-- > res) {
- rc = filter_finish_page_read(lnb);
- if (rc) {
- CERROR("error page %u@"LPU64" %u %p: rc %d\n",
- lnb->len, lnb->offset, (int)(lnb - res),
- lnb->dentry, rc);
- f_dput(lnb->dentry);
- GOTO(out_pages, rc);
- }
- }
- } else
- lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,
- tot_bytes);
-
- if (time_after(jiffies, now + 15 * HZ))
- CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ);
+ lprocfs_counter_add(obd->obd_stats, LPROC_FILTER_WRITE_BYTES,tot_bytes);
EXIT;
out:
out_pages:
while (lnb-- > res) {
- if (cmd & OBD_BRW_WRITE) {
- filter_commit_write(lnb, rc);
- up(&lnb->dentry->d_inode->i_sem);
- } else {
- lustre_put_page(lnb->page);
- }
+ filter_commit_write(lnb, rc);
+ up(&lnb->dentry->d_inode->i_sem);
f_dput(lnb->dentry);
}
- if (cmd & OBD_BRW_WRITE) {
- filter_finish_transno(exp, oti, rc);
- fsfilt_commit(obd,
- filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode,
- oti->oti_handle, 0);
- }
+ filter_finish_transno(exp, oti, rc);
+ fsfilt_commit(obd, filter_parent(obd,S_IFREG,obj->ioo_id)->d_inode,
+ oti->oti_handle, 0);
goto out; /* dropped the dentry refs already (one per page) */
out_objinfo:
for (i = 0; i < objcount && fso[i].fso_dentry; i++) {
- if (cmd & OBD_BRW_WRITE)
- up(&fso[i].fso_dentry->d_inode->i_sem);
+ up(&fso[i].fso_dentry->d_inode->i_sem);
f_dput(fso[i].fso_dentry);
}
goto out;
}
+int filter_preprw(int cmd, struct obd_export *exp, struct obdo *obdo,
+ int objcount, struct obd_ioobj *obj, int niocount,
+ struct niobuf_remote *nb, struct niobuf_local *res,
+ struct obd_trans_info *oti)
+{
+ if (cmd == OBD_BRW_WRITE)
+ return filter_preprw_write(exp, obdo, objcount, obj, niocount,
+ nb, res, oti);
+ else if (cmd == OBD_BRW_READ)
+ return filter_preprw_read(exp, obdo, objcount, obj, niocount,
+ nb, res, oti);
+ else
+ LBUG();
+}
+
+/* It is highly unlikely that we would ever get an error here. The page we want
+ * to get was previously locked, so it had to have already allocated the space,
+ * and we were just writing over the same data, so there would be no hole in the
+ * file.
+ *
+ * XXX: possibility of a race with truncate could exist, need to check that.
+ * There are no guarantees w.r.t. write order even on a local filesystem,
+ * although the normal response would be to return the number of bytes
+ * successfully written and leave the rest to the app. */
static int filter_write_locked_page(struct niobuf_local *lnb)
{
struct page *lpage;
- void *lpage_addr;
- void *lnb_addr;
+ void *lpage_addr, *lnb_addr;
int rc;
ENTRY;
lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
if (IS_ERR(lpage)) {
- /* It is highly unlikely that we would ever get an error here.
- * The page we want to get was previously locked, so it had to
- * have already allocated the space, and we were just writing
- * over the same data, so there would be no hole in the file.
- *
- * XXX: possibility of a race with truncate could exist, need
- * to check that. There are no guarantees w.r.t.
- * write order even on a local filesystem, although the
- * normal response would be to return the number of bytes
- * successfully written and leave the rest to the app.
- */
rc = PTR_ERR(lpage);
CERROR("error getting locked page index %ld: rc = %d\n",
lnb->page->index, rc);
kunmap(lnb->page);
kunmap(lpage);
- lustre_put_page(lnb->page);
+ page_cache_release(lnb->page);
lnb->page = lpage;
rc = lustre_commit_write(lnb);
if (rc)
CERROR("error committing locked page %ld: rc = %d\n",
lnb->page->index, rc);
-
RETURN(rc);
}
-int filter_commitrw(int cmd, struct obd_export *exp,
- int objcount, struct obd_ioobj *obj,
- int niocount, struct niobuf_local *res,
- struct obd_trans_info *oti)
+int filter_commitrw(int cmd, struct obd_export *exp, int objcount,
+ struct obd_ioobj *obj, int niocount,
+ struct niobuf_local *res, struct obd_trans_info *oti)
{
struct obd_run_ctxt saved;
struct obd_ioobj *o;
if (!rc)
rc = err;
} else {
- lustre_put_page(lnb->page);
+ page_cache_release(lnb->page);
}
f_dput(lnb->dentry);
}
LASSERT(nested_trans || current->journal_info == NULL);
-
pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
RETURN(rc);
}
-int filter_brw(int cmd, struct lustre_handle *conn,
- struct lov_stripe_md *lsm, obd_count oa_bufs,
- struct brw_page *pga, struct obd_trans_info *oti)
+int filter_brw(int cmd, struct lustre_handle *conn, struct lov_stripe_md *lsm,
+ obd_count oa_bufs, struct brw_page *pga,
+ struct obd_trans_info *oti)
{
- struct obd_export *exp = class_conn2export(conn);
- struct obd_ioobj ioo;
- struct niobuf_local *lnb;
- struct niobuf_remote *rnb;
- obd_count i;
- int ret = 0;
+ struct obd_export *exp = class_conn2export(conn);
+ struct obd_ioobj ioo;
+ struct niobuf_local *lnb;
+ struct niobuf_remote *rnb;
+ obd_count i;
+ int ret = 0;
ENTRY;
if (exp == NULL)