if (IS_ERR(page))
return lnb->rc = PTR_ERR(page);
+ LASSERT(page->mapping == mapping);
+
lnb->page = page;
if (inode->i_size < lnb->offset + lnb->len - 1)
struct fsfilt_objinfo *fso;
struct dentry *dentry;
struct inode *inode;
- int rc = 0, i, j, tot_bytes = 0;
+ int rc = 0, i, j, tot_bytes = 0, cleanup_phase = 0;
unsigned long now = jiffies;
ENTRY;
- /* We are currently not supporting multi-obj BRW_READ RPCS at all */
+ /* We are currently not supporting multi-obj BRW_READ RPCS at all.
+ * When we do this function's dentry cleanup will need to be fixed */
LASSERT(objcount == 1);
OBD_ALLOC(fso, objcount * sizeof(*fso));
dentry = filter_oa2dentry(exp->exp_obd, oa);
if (IS_ERR(dentry))
- GOTO(out_objinfo, rc = PTR_ERR(dentry));
+ GOTO(cleanup, rc = PTR_ERR(dentry));
if (dentry->d_inode == NULL) {
CERROR("trying to BRW to non-existent file "LPU64"\n",
o->ioo_id);
f_dput(dentry);
- GOTO(out_objinfo, rc = -ENOENT);
+ GOTO(cleanup, rc = -ENOENT);
}
fso[i].fso_dentry = dentry;
inode = dentry->d_inode;
for (j = 0; j < o->ioo_bufcnt; j++, rnb++, lnb++) {
- if (j == 0)
- lnb->dentry = dentry;
- else
- lnb->dentry = dget(dentry);
-
+ lnb->dentry = dentry;
lnb->offset = rnb->offset;
lnb->len = rnb->len;
lnb->flags = rnb->flags;
/* If there's no more data, abort early.
* lnb->page == NULL and lnb->rc == 0, so it's
* easy to detect later. */
- f_dput(dentry);
- lnb->dentry = NULL;
break;
} else {
rc = filter_start_page_read(inode, lnb);
"page err %u@"LPU64" %u/%u %p: rc %d\n",
lnb->len, lnb->offset, j, o->ioo_bufcnt,
dentry, rc);
- f_dput(dentry);
- GOTO(out_pages, rc);
+ cleanup_phase = 1;
+ GOTO(cleanup, rc);
}
tot_bytes += lnb->rc;
- if (lnb->rc < lnb->len)
- break; /* short read */
+ if (lnb->rc < lnb->len) {
+ /* short read, be sure to wait on it */
+ lnb++;
+ break;
+ }
}
}
if (rc) {
CERROR("error page %u@"LPU64" %u %p: rc %d\n", lnb->len,
lnb->offset, (int)(lnb - res), lnb->dentry, rc);
- f_dput(lnb->dentry);
- GOTO(out_pages, rc);
+ cleanup_phase = 1;
+ GOTO(cleanup, rc);
}
}
CERROR("slow prep finish page %lus\n", (jiffies - now) / HZ);
EXIT;
-out:
- OBD_FREE(fso, objcount * sizeof(*fso));
- /* we saved the journal handle into oti->oti_handle instead */
- current->journal_info = NULL;
- pop_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
- return rc;
-out_pages:
- while (lnb-- > res) {
- page_cache_release(lnb->page);
- f_dput(lnb->dentry);
+ cleanup:
+ switch (cleanup_phase) {
+ case 1:
+ for (lnb = res; lnb < (res + niocount); lnb++) {
+ if (lnb->page)
+ page_cache_release(lnb->page);
+ }
+ if (res->dentry != NULL)
+ f_dput(res->dentry);
+ else
+ CERROR("NULL dentry in cleanup -- tell CFS\n");
+ res->dentry = NULL;
+ case 0:
+ OBD_FREE(fso, objcount * sizeof(*fso));
+ pop_ctxt(&saved, &exp->exp_obd->u.filter.fo_ctxt, NULL);
}
- goto out; /* dropped the dentry refs already (one per page) */
-
-out_objinfo:
- for (i = 0; i < objcount && fso[i].fso_dentry; i++)
- f_dput(fso[i].fso_dentry);
- goto out;
+ return rc;
}
/* We need to balance prepare_write() calls with commit_write() calls.
ENTRY;
LASSERT(objcount == 1);
+ /* We should never be called during a transaction */
+ LASSERT(oti != NULL);
+ LASSERT(oti->oti_handle == NULL);
+ LASSERT(current->journal_info == NULL);
+
OBD_ALLOC(fso, objcount * sizeof(*fso));
if (fso == NULL)
RETURN(-ENOMEM);
if (time_after(jiffies, now + 15 * HZ))
CERROR("slow prep setup %lus\n", (jiffies - now) / HZ);
- LASSERT(oti != NULL);
oti->oti_handle = fsfilt_brw_start(exp->exp_obd, objcount, fso,
- niocount, oti->oti_handle);
+ niocount, oti);
if (IS_ERR(oti->oti_handle)) {
rc = PTR_ERR(oti->oti_handle);
CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR,
fsfilt_commit(exp->exp_obd,
filter_parent(exp->exp_obd,S_IFREG,obj->ioo_id)->d_inode,
oti->oti_handle, 0);
+ LASSERT(current->journal_info == NULL);
+ oti->oti_handle = NULL;
goto out; /* dropped the dentry refs already (one per page) */
out_objinfo:
RETURN(rc);
}
-int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
- int objcount, struct obd_ioobj *obj, int niocount,
- struct niobuf_local *res, struct obd_trans_info *oti)
+static int filter_commitrw_read(struct obd_export *exp, int objcount,
+ struct obd_ioobj *obj, int niocount,
+ struct niobuf_local *res,
+ struct obd_trans_info *oti)
+{
+ struct obd_ioobj *o;
+ struct niobuf_local *lnb;
+ int i, j;
+ ENTRY;
+
+ for (i = 0, o = obj, lnb = res; i < objcount; i++, o++) {
+ for (j = 0 ; j < o->ioo_bufcnt ; j++, lnb++) {
+ if (lnb->page != NULL)
+ page_cache_release(lnb->page);
+ }
+ }
+ if (res->dentry != NULL)
+ f_dput(res->dentry);
+ RETURN(0);
+}
+
+static int
+filter_commitrw_write(int cmd, struct obd_export *exp, struct obdo *oa,
+ int objcount, struct obd_ioobj *obj, int niocount,
+ struct niobuf_local *res, struct obd_trans_info *oti)
{
struct obd_run_ctxt saved;
struct obd_ioobj *o;
struct niobuf_local *lnb;
struct obd_device *obd = exp->exp_obd;
int found_locked = 0, rc = 0, i;
- int nested_trans = current->journal_info != NULL;
unsigned long now = jiffies; /* DEBUGGING OST TIMEOUTS */
ENTRY;
push_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
+ LASSERT(current->journal_info == NULL);
if (cmd & OBD_BRW_WRITE) {
- LASSERT(oti);
- LASSERT(current->journal_info == NULL ||
- current->journal_info == oti->oti_handle);
+ LASSERT(oti != NULL);
+ LASSERT(oti->oti_handle != NULL);
current->journal_info = oti->oti_handle;
}
rc = filter_finish_transno(exp, oti, rc);
err = fsfilt_commit(obd, dparent->d_inode, oti->oti_handle,
obd_sync_filter);
+ LASSERT(current->journal_info == NULL);
+ oti->oti_handle = NULL;
if (err)
rc = err;
if (obd_sync_filter)
CERROR("slow commitrw commit %lus\n", (jiffies-now)/HZ);
}
- LASSERT(nested_trans || current->journal_info == NULL);
pop_ctxt(&saved, &obd->u.filter.fo_ctxt, NULL);
RETURN(rc);
}
+/* XXX needs to trickle its oa down */
+int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
+ int objcount, struct obd_ioobj *obj, int niocount,
+ struct niobuf_local *res, struct obd_trans_info *oti)
+{
+ if (cmd == OBD_BRW_WRITE)
+ return filter_commitrw_write(cmd, exp, oa, objcount, obj,
+ niocount, res, oti);
+ if (cmd == OBD_BRW_READ)
+ return filter_commitrw_read(exp, objcount, obj, niocount,
+ res, oti);
+ LBUG();
+ return -EPROTO;
+}
+
int filter_brw(int cmd, struct lustre_handle *conn, struct obdo *oa,
struct lov_stripe_md *lsm, obd_count oa_bufs,
struct brw_page *pga, struct obd_trans_info *oti)