if (IS_ERR(page))
return lnb->rc = PTR_ERR(page);
+ LASSERT(page->mapping == mapping);
+
lnb->page = page;
if (inode->i_size < lnb->offset + lnb->len - 1)
struct fsfilt_objinfo *fso;
struct dentry *dentry;
struct inode *inode;
- int rc = 0, i, j, tot_bytes = 0;
+ int rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0;
unsigned long now = jiffies;
ENTRY;
- /* We are currently not supporting multi-obj BRW_READ RPCS at all */
+ /* We are currently not supporting multi-obj BRW_READ RPCS at all.
+ * When we do this function's dentry cleanup will need to be fixed */
LASSERT(objcount == 1);
OBD_ALLOC(fso, objcount * sizeof(*fso));
dentry = filter_oa2dentry(exp->exp_obd, oa);
if (IS_ERR(dentry))
- GOTO(out_objinfo, rc = PTR_ERR(dentry));
+ GOTO(cleanup, rc = PTR_ERR(dentry));
if (dentry->d_inode == NULL) {
CERROR("trying to BRW to non-existent file "LPU64"\n",
o->ioo_id);
f_dput(dentry);
- GOTO(out_objinfo, rc = -ENOENT);
+ GOTO(cleanup, rc = -ENOENT);
}
fso[i].fso_dentry = dentry;
inode = dentry->d_inode;
for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
- if (j == 0)
- lnb->dentry = dentry;
- else
- lnb->dentry = dget(dentry);
-
+ lnb->dentry = dentry;
lnb->offset = rnb->offset;
lnb->len = rnb->len;
lnb->flags = rnb->flags;
/* If there's no more data, abort early.
* lnb->page == NULL and lnb->rc == 0, so it's
* easy to detect later. */
- f_dput(dentry);
lnb->dentry = NULL;
break;
} else {
"page err %u@"LPU64" %u/%u %p: rc %d\n",
lnb->len, lnb->offset, j, o->ioo_bufcnt,
dentry, rc);
- f_dput(dentry);
- GOTO(out_pages, rc);
+ cleanup_phase = 1;
+ GOTO(cleanup, rc);
}
tot_bytes += lnb->rc;
- if (lnb->rc < lnb->len)
- break; /* short read */
+ if (lnb->rc < lnb->len) {
+ /* short read, be sure to wait on it */
+ lnb++;
+ break;
+ }
}
}
if (rc) {
CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len,
lnb->offset, (int)(lnb - res), lnb->dentry, rc);
- f_dput(lnb->dentry);
- GOTO(out_pages, rc);
+ cleanup_phase = 1;
+ GOTO(cleanup, rc);
}
}
CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ);
EXIT;
-out:
- OBD_FREE(fso, objcount * sizeof(*fso));
- /* we saved the journal handle into oti->oti_handle instead */
- current->journal_info = NULL;
- pop_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
- return rc;
-out_pages:
- while (lnb-- > res) {
- page_cache_release(lnb->page);
- f_dput(lnb->dentry);
+ cleanup:
+ switch (cleanup_phase) {
+ case 1:
+ for (lnb = res; lnb < (res + niocount); lnb++) {
+ if (lnb->page)
+ page_cache_release(lnb->page);
+ }
+ f_dput(res->dentry);
+ case 0:
+ OBD_FREE(fso, objcount * sizeof(*fso));
+ pop_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
}
- goto out; /* dropped the dentry refs already (one per page) */
-
-out_objinfo:
- for (i = 0; i < objcount && fso[i].fso_dentry; i++)
- f_dput(fso[i].fso_dentry);
- goto out;
+ return rc;
}
/* We need to balance prepare_write() calls with commit_write() calls.
RETURN(rc);
}
-int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
- int objcount, struct obd_ioobj *obj, int niocount,
- struct niobuf_local *res, struct obd_trans_info *oti)
+static int filter_commitrw_read(struct obd_export *exp, int objcount,
+ struct obd_ioobj *obj, int niocount,
+ struct niobuf_local *res,
+ struct obd_trans_info *oti)
+{
+ struct obd_ioobj *o;
+ struct niobuf_local *lnb;
+ int i, j;
+ ENTRY;
+
+ for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
+ for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
+ if (lnb->page != NULL)
+ page_cache_release(lnb->page);
+ }
+ }
+ f_dput(res->dentry);
+ RETURN(0);
+}
+
+static int
+filter_commitrw_write(int cmd, struct obd_export *exp, struct obdo *oa,
+ int objcount, struct obd_ioobj *obj, int niocount,
+ struct niobuf_local *res, struct obd_trans_info *oti)
{
struct obd_run_ctxt saved;
struct obd_ioobj *o;
RETURN(rc);
}
+/* XXX needs to trickle its oa down */
+int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
+ int objcount, struct obd_ioobj *obj, int niocount,
+ struct niobuf_local *res, struct obd_trans_info *oti)
+{
+ if (cmd == OBD_BRW_WRITE)
+ return filter_commitrw_write(cmd, exp, oa, objcount, obj,
+ niocount, res, oti);
+ if (cmd == OBD_BRW_READ)
+ return filter_commitrw_read(exp, objcount, obj, niocount,
+ res, oti);
+ LBUG();
+ return -EPROTO;
+}
+
int filter_brw(int cmd, struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *lsm, obd_count oa_bufs,
struct brw_page *pga, struct obd_trans_info *oti)