The only real change is the filter multi-page BRW failure cleanup code.
inline void lustre_put_page(struct page *page);
struct page *lustre_get_page_read(struct inode *dir, unsigned long index);
struct page *lustre_get_page_write(struct inode *dir, unsigned long index);
-int lustre_commit_page(struct page *page, unsigned from, unsigned to);
+int lustre_commit_write(struct page *page, unsigned from, unsigned to);
void set_page_clean(struct page *page);
void set_page_dirty(struct page *page);
struct obd_run_ctxt;
void push_ctxt(struct obd_run_ctxt *save, struct obd_run_ctxt *new);
void pop_ctxt(struct obd_run_ctxt *saved);
-#ifdef CTXT_DEBUG
+#ifdef OBD_CTXT_DEBUG
#define OBD_SET_CTXT_MAGIC(ctxt) (ctxt)->magic = OBD_RUN_CTXT_MAGIC
#else
-#define OBD_SET_CTXT_MAGIC(magic) do {} while(0)
+#define OBD_SET_CTXT_MAGIC(ctxt) do {} while(0)
#endif
struct dentry *simple_mkdir(struct dentry *dir, char *name, int mode);
int lustre_fread(struct file *file, char *str, int len, loff_t *off);
return ERR_PTR(rc);
}
-int lustre_commit_page(struct page *page, unsigned from, unsigned to)
+int lustre_commit_write(struct page *page, unsigned from, unsigned to)
{
struct inode *inode = page->mapping->host;
int err = 0;
#include <linux/lustre_lib.h>
#include <linux/lustre_net.h>
+#ifdef OBD_CTXT_DEBUG
+/* Debugging check only needed during development */
+/* Log an error and LBUG() unless magic equals OBD_RUN_CTXT_MAGIC. */
+#define ASSERT_CTXT_MAGIC(magic) do { if ((magic) != OBD_RUN_CTXT_MAGIC) { \
+ CERROR("bad ctxt magic\n"); LBUG(); } } while(0)
+/* Log msg and LBUG() if get_fs() already equals get_ds(). */
+#define ASSERT_NOT_KERNEL_CTXT(msg) do { if (segment_eq(get_fs(), get_ds())) { \
+ CERROR(msg); LBUG(); } } while(0)
+/* Log msg and LBUG() if get_fs() does not equal get_ds(). */
+#define ASSERT_KERNEL_CTXT(msg) do { if (!segment_eq(get_fs(), get_ds())) { \
+ CERROR(msg); LBUG(); } } while(0)
+#else
+/* Without OBD_CTXT_DEBUG all three checks expand to empty statements. */
+#define ASSERT_CTXT_MAGIC(magic) do {} while(0)
+#define ASSERT_NOT_KERNEL_CTXT(msg) do {} while(0)
+#define ASSERT_KERNEL_CTXT(msg) do {} while(0)
+#endif
/* push / pop to root of obd store */
void push_ctxt(struct obd_run_ctxt *save, struct obd_run_ctxt *new)
{
+ //ASSERT_NOT_KERNEL_CTXT("already in kernel context!\n");
+ ASSERT_CTXT_MAGIC(new->magic);
+ OBD_SET_CTXT_MAGIC(save);
save->fs = get_fs();
save->pwd = dget(current->fs->pwd);
save->pwdmnt = mntget(current->fs->pwdmnt);
void pop_ctxt(struct obd_run_ctxt *saved)
{
+ ASSERT_KERNEL_CTXT("popping non-kernel context!\n");
+ ASSERT_CTXT_MAGIC(saved->magic);
set_fs(saved->fs);
set_fs_pwd(current->fs, saved->pwdmnt, saved->pwd);
int err;
ENTRY;
- CDEBUG(D_INODE, "creating directory %*s\n", strlen(name), name);
+ ASSERT_KERNEL_CTXT("kernel doing mkdir outside kernel context\n");
+ /* name is handled as a NUL-terminated string (strlen() is applied to
+ * it), so plain %s prints it in full. The old "%*s" only set a
+ * minimum field width and passed a size_t where an int is expected. */
+ CDEBUG(D_INODE, "creating directory %s\n", name);
dchild = lookup_one_len(name, dir, strlen(name));
if (IS_ERR(dchild))
RETURN(dchild);
if (dchild->d_inode) {
- if (!S_ISDIR(dchild->d_inode->i_mode))
- GOTO(out, err = -ENOTDIR);
+ if (!S_ISDIR(dchild->d_inode->i_mode))
+ GOTO(out, err = -ENOTDIR);
RETURN(dchild);
- }
+ }
err = vfs_mkdir(dir->d_inode, dchild, mode);
EXIT;
RETURN(dchild);
}
+/*
+ * Read a file from within kernel context. Prior to calling this
+ * function we should already have done a push_ctxt().
+ */
int lustre_fread(struct file *file, char *str, int len, loff_t *off)
{
- if (!file || !file->f_op || !file->f_op->read || !off)
- RETURN(-ENOSYS);
+ ASSERT_KERNEL_CTXT("kernel doing read outside kernel context\n");
+ if (!file || !file->f_op || !file->f_op->read || !off)
+ RETURN(-ENOSYS);
- return file->f_op->read(file, str, len, off);
+ return file->f_op->read(file, str, len, off);
}
+/*
+ * Write a file from within kernel context. Prior to calling this
+ * function we should already have done a push_ctxt().
+ */
int lustre_fwrite(struct file *file, const char *str, int len, loff_t *off)
{
- if (!file || !file->f_op || !off)
- RETURN(-ENOSYS);
+ ASSERT_KERNEL_CTXT("kernel doing write outside kernel context\n");
+ if (!file || !file->f_op || !off)
+ RETURN(-ENOSYS);
- if (!file->f_op->write)
- RETURN(-EROFS);
+ if (!file->f_op->write)
+ RETURN(-EROFS);
- return file->f_op->write(file, str, len, off);
+ return file->f_op->write(file, str, len, off);
}
+/*
+ * Sync a file from within kernel context. Prior to calling this
+ * function we should already have done a push_ctxt().
+ */
int lustre_fsync(struct file *file)
{
- if (!file || !file->f_op || !file->f_op->fsync)
- RETURN(-ENOSYS);
+ ASSERT_KERNEL_CTXT("kernel doing sync outside kernel context\n");
+ if (!file || !file->f_op || !file->f_op->fsync)
+ RETURN(-ENOSYS);
- return file->f_op->fsync(file, file->f_dentry, 0);
+ return file->f_op->fsync(file, file->f_dentry, 0);
}
inode->i_ctime = dir->i_ctime;
ext2_dec_count(inode);
- err = 0;
out:
return err;
}
ENTRY;
if (blocksize != PAGE_SIZE) {
- CERROR("direct_IO blocksize != PAGE_SIZE, what to do?\n");
- LBUG();
+ CERROR("direct_IO blocksize != PAGE_SIZE\n");
+ return -EINVAL;
}
- OBD_ALLOC(count, sizeof(obd_size) * bufs_per_obdo);
- OBD_ALLOC(offset, sizeof(obd_off) * bufs_per_obdo);
- OBD_ALLOC(flags, sizeof(obd_flag) * bufs_per_obdo);
+ OBD_ALLOC(count, sizeof(*count) * bufs_per_obdo);
+ OBD_ALLOC(offset, sizeof(*offset) * bufs_per_obdo);
+ OBD_ALLOC(flags, sizeof(*flags) * bufs_per_obdo);
if (!count || !offset || !flags)
GOTO(out, rc = -ENOMEM);
oa = ll_i2info(inode)->lli_obdo;
if (!oa)
GOTO(out, rc = -ENOMEM);
+
rc = obd_brw(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
ll_i2obdconn(inode), num_obdo, &oa, &bufs_per_obdo,
iobuf->maplist, count, offset, flags, NULL);
if (rc == 0)
rc = bufs_per_obdo * PAGE_SIZE;
- out:
- if (flags)
- OBD_FREE(flags, sizeof(obd_flag) * bufs_per_obdo);
- if (count)
- OBD_FREE(count, sizeof(obd_count) * bufs_per_obdo);
- if (offset)
- OBD_FREE(offset, sizeof(obd_off) * bufs_per_obdo);
+out:
+ /* Free with the same sizeof(*ptr) expressions that were used for the
+ * OBD_ALLOC calls, so the allocated and freed sizes cannot disagree.
+ * NOTE(review): these frees also run when an allocation failed and
+ * the pointer is NULL - confirm OBD_FREE tolerates that. */
+ OBD_FREE(flags, sizeof(*flags) * bufs_per_obdo);
+ OBD_FREE(count, sizeof(*count) * bufs_per_obdo);
+ OBD_FREE(offset, sizeof(*offset) * bufs_per_obdo);
RETURN(rc);
}
err = bufs_per_obdo * 4096;
#endif
out:
- if (oa)
- obdo_free(oa);
- if (flags)
- OBD_FREE(flags, sizeof(obd_flag) * bufs_per_obdo);
- if (count)
- OBD_FREE(count, sizeof(obd_count) * bufs_per_obdo);
- if (offset)
- OBD_FREE(offset, sizeof(obd_off) * bufs_per_obdo);
+ obdo_free(oa);
+ OBD_FREE(flags, sizeof(obd_flag) * bufs_per_obdo);
+ OBD_FREE(count, sizeof(obd_count) * bufs_per_obdo);
+ OBD_FREE(offset, sizeof(obd_off) * bufs_per_obdo);
RETURN(err);
}
body = lustre_msg_buf(req->rq_reqmsg, 0);
/* was this animal open already? */
- /* XXX we chould only check on re-open, or do a refcount... */
+ /* XXX we should only check on re-open, or do a refcount... */
list_for_each(tmp, &mci->mci_open_head) {
struct mds_file_data *fd;
fd = list_entry(tmp, struct mds_file_data, mfd_list);
RETURN(0);
}
-static
-int mds_close(struct ptlrpc_request *req)
+static int mds_close(struct ptlrpc_request *req)
{
struct dentry *de;
struct mds_body *body;
OBD_SET_CTXT_MAGIC(&mds->mds_ctxt);
mds->mds_ctxt.pwdmnt = mnt;
mds->mds_ctxt.pwd = mnt->mnt_root;
- mds->mds_ctxt.fs = KERNEL_DS;
+ mds->mds_ctxt.fs = get_ds();
/*
* Replace the client filesystem delete_inode method with our own,
return(rc);
out_O_mode:
- while (--mode >= 0) {
+ while (mode-- > 0) {
struct dentry *dentry = filter->fo_dentry_O_mode[mode];
if (dentry) {
CDEBUG(D_INODE, "putting O/%s: %p, count = %d\n",
filter->fo_vfsmnt = mnt;
filter->fo_fstype = strdup(data->ioc_inlbuf2);
+ OBD_SET_CTXT_MAGIC(&filter->fo_ctxt);
filter->fo_ctxt.pwdmnt = mnt;
filter->fo_ctxt.pwd = mnt->mnt_root;
- filter->fo_ctxt.fs = KERNEL_DS;
+ filter->fo_ctxt.fs = get_ds();
err = filter_prep(obddev);
if (err)
return err;
} /* filter_write */
-static int filter_pgcache_brw(int rw, struct obd_conn *conn, obd_count num_oa,
+static int filter_pgcache_brw(int cmd, struct obd_conn *conn, obd_count num_oa,
struct obdo **oa, obd_count *oa_bufs,
struct page **pages, obd_size *count,
obd_off *offset, obd_flag *flags, void *callback)
}
sb = conn->oc_dev->u.filter.fo_sb;
- // if (rw == WRITE)
push_ctxt(&saved, &conn->oc_dev->u.filter.fo_ctxt);
pnum = 0; /* pnum indexes buf 0..num_pages */
for (onum = 0; onum < num_oa; onum++) {
for (pg = 0; pg < oa_bufs[onum]; pg++) {
CDEBUG(D_INODE, "OP %d obdo no/pno: (%d,%d) (%ld,%ld) "
"off count (%Ld,%Ld)\n",
- rw, onum, pnum, file->f_dentry->d_inode->i_ino,
+ cmd, onum, pnum, file->f_dentry->d_inode->i_ino,
(unsigned long)offset[pnum] >> PAGE_CACHE_SHIFT,
(unsigned long long)offset[pnum],
(unsigned long long)count[pnum]);
- if (rw == WRITE) {
+ if (cmd & OBD_BRW_WRITE) {
loff_t off;
char *buffer;
off = offset[pnum];
EXIT;
out:
- // if (rw == WRITE)
pop_ctxt(&saved);
error = (retval >= 0) ? 0 : retval;
return error;
!strcmp(filter->fo_fstype, "extN"))
rc = ext3_filter_journal_stop(handle);
+ if (rc)
+ CERROR("error on journal stop: rc = %d\n", rc);
+
current->journal_info = journal_save;
return rc;
//ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL));
page = grab_cache_page_nowait(mapping, index); /* locked page */
- /* This page is currently locked, so we grab a new one temporarily */
+ /* This page is currently locked, so get a temporary page instead */
if (!page) {
unsigned long addr;
- addr = __get_free_pages(GFP_KERNEL, 0);
+ CDEBUG(D_PAGE, "ino %ld page %ld locked\n", inode->i_ino,index);
+ addr = __get_free_pages(GFP_KERNEL, 0); /* locked page */
if (!addr) {
CERROR("no memory for a temp page\n");
LBUG();
return ERR_PTR(rc);
}
+/*
+ * We need to balance prepare_write() calls with commit_write() calls.
+ * If the page has been prepared, but we have no data for it, we don't
+ * want to overwrite valid data on disk, but we still need to zero out
+ * data for space which was newly allocated. Like part of what happens
+ * in __block_prepare_write() for newly allocated blocks.
+ *
+ * XXX currently __block_prepare_write() creates buffers for all the
+ * pages, and the filesystems mark these buffers as BH_New if they
+ * were newly allocated from disk. We use the BH_New flag similarly.
+ *
+ * If err is non-zero, the part of the page covered by each buffer for
+ * which buffer_new() is true is zero-filled first. In all cases the
+ * page is then handed to lustre_commit_write(page, from, to) and that
+ * call's result is returned; err itself is not returned.
+ */
+static int filter_commit_write(struct page *page, unsigned from, unsigned to,
+ int err)
+{
+ if (err) {
+ unsigned block_start, block_end;
+ struct buffer_head *bh, *head = page->buffers;
+ unsigned blocksize = head->b_size;
+ void *addr = page_address(page);
+
+ /* Currently one buffer per page, but in the future... */
+ /* Follow b_this_page until we are back at head; the
+ * !block_start term lets the first pass run even though bh
+ * starts out equal to head. */
+ for (bh = head, block_start = 0; bh != head || !block_start;
+ block_start = block_end, bh = bh->b_this_page) {
+ block_end = block_start + blocksize;
+ if (buffer_new(bh))
+ memset(addr + block_start, 0, blocksize);
+ }
+ }
+
+ return lustre_commit_write(page, from, to);
+}
+
static int filter_preprw(int cmd, struct obd_conn *conn,
int objcount, struct obd_ioobj *obj,
int niocount, struct niobuf_remote *nb,
push_ctxt(&saved, &obddev->u.filter.fo_ctxt);
- if (cmd == OBD_BRW_WRITE) {
+ if (cmd & OBD_BRW_WRITE) {
*desc_private = filter_journal_start(&journal_save,
&obddev->u.filter,
objcount, obj, niocount,
filter_parent(obddev, S_IFREG),
o->ioo_id, S_IFREG);
if (IS_ERR(dentry))
- GOTO(out_ctxt, rc = PTR_ERR(dentry));
+ GOTO(out_clean, rc = PTR_ERR(dentry));
inode = dentry->d_inode;
+ if (!inode) {
+ CERROR("trying to BRW to non-existent file %Ld\n",
+ (unsigned long long)o->ioo_id);
+ dput(dentry);
+ GOTO(out_clean, rc = -ENOENT);
+ }
for (j = 0; j < o->ioo_bufcnt; j++, b++, r++) {
unsigned long index = b->offset >> PAGE_SHIFT;
struct page *page;
- /* XXX We _might_ change this to a dcount if we
- * wanted to pass a dentry pointer in the niobuf
- * to avoid doing so many igets on an inode we
- * already have. It appears to be solely for the
- * purpose of having a refcount that we can drop
- * in commitrw where we get one call per page.
- */
- if (j > 0)
- r->dentry = dget(dentry);
- else
+ if (j == 0)
r->dentry = dentry;
+ else
+ r->dentry = dget(dentry);
- /* FIXME: we need to iput all inodes on error */
- if (!inode)
- GOTO(out_ctxt, rc = -EINVAL);
-
- if (cmd == OBD_BRW_WRITE) {
+ if (cmd & OBD_BRW_WRITE)
page = filter_get_page_write(inode, index, r);
-
- /* We unlock the page to avoid deadlocks with
- * the page I/O because we are preparing
- * multiple pages at one time and we have lock
- * ordering problems. Lustre I/O and disk I/O
- * on this page can happen concurrently.
- */
- } else
+ else
page = lustre_get_page_read(inode, index);
- /* FIXME: we need to clean up here... */
- if (IS_ERR(page))
- GOTO(out_ctxt, rc = PTR_ERR(page));
+ if (IS_ERR(page)) {
+ dput(dentry);
+ GOTO(out_clean, rc = PTR_ERR(page));
+ }
r->addr = page_address(page);
r->offset = b->offset;
}
}
- if (cmd == OBD_BRW_WRITE) {
- /* FIXME: need to clean up here */
- rc = filter_journal_stop(journal_save, &obddev->u.filter,
- *desc_private);
+out_stop:
+ if (cmd & OBD_BRW_WRITE) {
+ int err = filter_journal_stop(journal_save, &obddev->u.filter,
+ *desc_private);
+ if (!rc)
+ rc = err;
}
out_ctxt:
pop_ctxt(&saved);
RETURN(rc);
+out_clean:
+ while (r-- > res) {
+ dput(r->dentry);
+ if (cmd & OBD_BRW_WRITE)
+ filter_commit_write(r->page, 0, PAGE_SIZE, rc);
+ else
+ lustre_put_page(r->page);
+ }
+ goto out_stop;
}
static int filter_write_locked_page(struct niobuf_local *lnb)
int rc;
lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index);
- /* XXX */
+ if (IS_ERR(lpage)) {
+ /* It is highly unlikely that we would ever get an error here.
+ * The page we want to get was previously locked, so it had to
+ * have already allocated the space, and we were just writing
+ * over the same data, so there would be no hole in the file.
+ *
+ * XXX: possibility of a race with truncate could exist, need
+ * to check that. There are no guarantees w.r.t.
+ * write order even on a local filesystem, although the
+ * normal response would be to return the number of bytes
+ * successfully written and leave the rest to the app.
+ */
+ rc = PTR_ERR(lpage);
+ CERROR("error getting locked page index %ld: rc = %d\n",
+ lnb->page->index, rc);
+ GOTO(out, rc);
+ }
memcpy(page_address(lpage), kmap(lnb->page), PAGE_SIZE);
kunmap(lnb->page);
+ rc = lustre_commit_write(lpage, 0, PAGE_SIZE);
+ if (rc)
+ CERROR("error committing locked page %ld: rc = %d\n",
+ lnb->page->index, rc);
+out:
+ /* The error path jumps here before kmap() has been called, so the
+ * matching kunmap() stays above this label. Both paths only free
+ * lnb->page and drop the lnb->dentry reference from here on. */
__free_pages(lnb->page, 0);
-
- rc = lustre_commit_page(lpage, 0, PAGE_SIZE);
dput(lnb->dentry);
return rc;
struct niobuf_local *r = res;
void *journal_save;
int found_locked = 0;
+ int rc = 0;
int i;
ENTRY;
- // if (cmd == OBD_BRW_WRITE)
push_ctxt(&saved, &conn->oc_dev->u.filter.fo_ctxt);
journal_save = current->journal_info;
if (journal_save)
for (j = 0 ; j < o->ioo_bufcnt ; j++, r++) {
struct page *page = r->page;
- /* If there was an error setting up a particular page
- * for I/O we still need to continue with the rest of
- * the pages in order to balance prepate/commit_write
- * calls, and to complete as much I/O as possible.
- */
if (!page)
LBUG();
continue;
}
- if (cmd == OBD_BRW_WRITE) {
- int rc;
- rc = lustre_commit_page(page, 0, PAGE_SIZE);
+ if (cmd & OBD_BRW_WRITE) {
+ int err = filter_commit_write(page, 0,
+ PAGE_SIZE, 0);
- /* FIXME: still need to iput the other inodes */
- if (rc)
- RETURN(rc);
+ if (!rc)
+ rc = err;
} else
lustre_put_page(page);
}
}
if (!found_locked)
- goto out;
+ goto out_ctxt;
for (i = 0; i < objcount; i++, obj++) {
int j;
for (j = 0 ; j < o->ioo_bufcnt ; j++, r++) {
- int rc;
+ int err;
if (!(r->flags & N_LOCAL_TEMP_PAGE))
continue;
- rc = filter_write_locked_page(r);
- /* XXX */
+ err = filter_write_locked_page(r);
+ if (!rc)
+ rc = err;
}
}
-out:
+out_ctxt:
current->journal_info = journal_save;
pop_ctxt(&saved);
- RETURN(0);
+ /* rc holds the first error seen while committing pages or writing
+ * back locked pages (0 if none); return it instead of always
+ * reporting success. */
+ RETURN(rc);
}
rc = obdfs_brw(OBD_BRW_READ, inode, page, 0);
- if ( !rc ) {
+ if (!rc)
SetPageUptodate(page);
- }
prepare_done:
set_page_dirty(page);
return 0;
}
+#define OST_NUM_THREADS 6
+
/* mount the file system (secretly) */
static int ost_setup(struct obd_device *obddev, obd_count len, void *buf)
{
struct ost_obd *ost = &obddev->u.ost;
struct obd_device *tgt;
int err;
+ int i;
ENTRY;
if (data->ioc_dev < 0 || data->ioc_dev > MAX_OBD_DEVICES)
GOTO(error_disc, err = -EINVAL);
}
- err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
- if (err)
- GOTO(error_disc, err = -EINVAL);
- err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
- if (err)
- GOTO(error_disc, err = -EINVAL);
- err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
- if (err)
- GOTO(error_disc, err = -EINVAL);
- err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
- if (err)
- GOTO(error_disc, err = -EINVAL);
- err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
- if (err)
- GOTO(error_disc, err = -EINVAL);
- err = ptlrpc_start_thread(obddev, ost->ost_service, "lustre_ost");
- if (err)
- GOTO(error_disc, err = -EINVAL);
+ for (i = 0; i < OST_NUM_THREADS; i++) {
+ err = ptlrpc_start_thread(obddev, ost->ost_service,
+ "lustre_ost");
+ if (err) {
+ CERROR("error starting thread #%d: rc %d\n", i, err);
+ GOTO(error_disc, err = -EINVAL);
+ }
+ }
RETURN(0);
insmod $MODULE
}
+# Unload kernel module $1 if lsmod lists it; a module that is not loaded
+# is not an error. If rmmod fails, print the lsmod listing with every
+# line prefixed by "<module> failed: ".
+do_rmmod() {
+ MODULE=$1
+ [ "$MODULE" ] || fail "usage: $0 <module>"
+ # Match the module-name column only: a bare substring match would
+ # also hit e.g. "mds_extN" when asked about "mds".
+ lsmod | grep -q "^$MODULE " || return 0
+ rmmod "$MODULE" || lsmod | sed "s/^/$MODULE failed: /"
+}
+
# Return the next unused loop device on stdout and in the $LOOPDEV
# environment variable.
next_loop_dev() {
quit
EOF
- rmmod kqswnal
- rmmod ksocknal
- rmmod portals
+ do_rmmod kqswnal
+ do_rmmod ksocknal
+ do_rmmod portals
}
cleanup_lustre() {
losetup -d ${LOOP}1
losetup -d ${LOOP}2
- rmmod llite
- rmmod mdc
-
- rmmod mds_extN
- rmmod mds_ext3
- rmmod mds_ext2
- rmmod mds
- rmmod ost
- rmmod osc
- rmmod obdecho
- rmmod obdfilter
- rmmod obdext2
- rmmod extN
-
- rmmod ldlm
- rmmod ptlrpc
- rmmod obdclass
+ do_rmmod llite
+ do_rmmod mdc
+
+ do_rmmod mds_extN
+ do_rmmod mds_ext3
+ do_rmmod mds_ext2
+ do_rmmod mds
+ do_rmmod ost
+ do_rmmod osc
+ do_rmmod obdecho
+ do_rmmod obdfilter
+ do_rmmod obdext2
+ do_rmmod extN
+
+ do_rmmod ldlm
+ do_rmmod ptlrpc
+ do_rmmod obdclass
}
cleanup_ldlm() {