From e40cd10fb71908b57e92c2174e5795c67e3caf9a Mon Sep 17 00:00:00 2001 From: adilger Date: Sat, 8 Jun 2002 00:58:00 +0000 Subject: [PATCH] Major fixups for multi-page I/Os in filterobd. A few stubs for statfs on the filesystem. --- lustre/extN/Makefile.am | 2 +- lustre/include/linux/lustre_idl.h | 11 +- lustre/include/linux/lustre_lib.h | 2 + lustre/include/linux/lustre_net.h | 5 +- lustre/include/linux/obd.h | 5 +- lustre/include/linux/obd_class.h | 11 +- lustre/obdecho/echo.c | 13 +- lustre/obdfilter/filter.c | 358 ++++++++++++++++++++++++++++++++++++-- lustre/osc/osc_request.c | 2 +- lustre/ost/ost_handler.c | 61 +++---- 10 files changed, 397 insertions(+), 73 deletions(-) diff --git a/lustre/extN/Makefile.am b/lustre/extN/Makefile.am index 317cf2c..5202d9e 100644 --- a/lustre/extN/Makefile.am +++ b/lustre/extN/Makefile.am @@ -54,7 +54,7 @@ patch-stamp: $(EXTNP) patch -p0 < $(srcdir)/extN.patch-$(RELEASE); \ else \ list='$(EXTNP)'; for p in $$list; do \ - sed $(SUB) $(srcdir)/$$p | (cd $(top_srcdir); patch -p1); \ + sed $(SUB) $(srcdir)/$$p|(cd $(top_srcdir);patch -p1)||exit -1;\ done; \ echo "It is OK if the following patch fails"; \ (cd $(top_srcdir); patch -N -p1) < $(srcdir)/extN-2.4.18-exports.diff; \ diff --git a/lustre/include/linux/lustre_idl.h b/lustre/include/linux/lustre_idl.h index 66dbfea..a1e329e 100644 --- a/lustre/include/linux/lustre_idl.h +++ b/lustre/include/linux/lustre_idl.h @@ -69,13 +69,18 @@ struct niobuf_remote { }; struct niobuf_local { - __u64 addr; __u64 offset; __u32 len; __u32 xid; - void *page; + __u32 flags; + void *addr; + struct page *page; + void *target_private; + struct dentry *dentry; }; +#define N_LOCAL_TEMP_PAGE 0x00000001 + /* * OST requests: OBDO & OBD request records */ @@ -92,6 +97,7 @@ struct niobuf_local { #define OST_PUNCH 9 #define OST_OPEN 10 #define OST_CLOSE 11 +#define OST_STATFS 12 typedef uint64_t obd_id; @@ -188,6 +194,7 @@ struct obd_ioobj { #define MDS_REINT 4 #define MDS_READPAGE 6 #define MDS_CONNECT 7 
+#define MDS_STATFS 8 #define REINT_SETATTR 1 #define REINT_CREATE 2 diff --git a/lustre/include/linux/lustre_lib.h b/lustre/include/linux/lustre_lib.h index 29232ba..e7ae204 100644 --- a/lustre/include/linux/lustre_lib.h +++ b/lustre/include/linux/lustre_lib.h @@ -37,6 +37,8 @@ inline void lustre_put_page(struct page *page); struct page *lustre_get_page_read(struct inode *dir, unsigned long index); struct page *lustre_get_page_write(struct inode *dir, unsigned long index); int lustre_commit_page(struct page *page, unsigned from, unsigned to); +void set_page_clean(struct page *page); +void set_page_dirty(struct page *page); /* simple.c */ struct obd_run_ctxt; diff --git a/lustre/include/linux/lustre_net.h b/lustre/include/linux/lustre_net.h index 01919dd..ae665db 100644 --- a/lustre/include/linux/lustre_net.h +++ b/lustre/include/linux/lustre_net.h @@ -183,6 +183,8 @@ struct ptlrpc_bulk_page { int b_buflen; struct page *b_page; __u32 b_xid; + __u32 b_flags; + struct dentry *b_dentry; int (*b_cb)(struct ptlrpc_bulk_page *); ptl_md_t b_md; @@ -193,6 +195,7 @@ struct ptlrpc_bulk_page { struct ptlrpc_bulk_desc { int b_flags; struct ptlrpc_connection *b_connection; + struct ptlrpc_client *b_client; __u32 b_portal; int (*b_cb)(struct ptlrpc_bulk_desc *); struct obd_conn b_conn; @@ -201,7 +204,7 @@ struct ptlrpc_bulk_desc { struct list_head b_page_list; __u32 b_page_count; __u32 b_finished_count; - void *b_journal_info; + void *b_desc_private; }; struct ptlrpc_thread { diff --git a/lustre/include/linux/obd.h b/lustre/include/linux/obd.h index 70574be..2c63782 100644 --- a/lustre/include/linux/obd.h +++ b/lustre/include/linux/obd.h @@ -246,10 +246,11 @@ struct obd_ops { int (*o_preprw)(int cmd, struct obd_conn *conn, int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_remote *remote, - struct niobuf_local *local); + struct niobuf_local *local, void **desc_private); int (*o_commitrw)(int cmd, struct obd_conn *conn, int objcount, struct obd_ioobj *obj, - int 
niocount, struct niobuf_local *local); + int niocount, struct niobuf_local *local, + void *desc_private); int (*o_enqueue)(struct obd_conn *conn, struct ldlm_namespace *ns, struct ldlm_handle *parent_lock, __u64 *res_id, __u32 type, struct ldlm_extent *, __u32 mode, diff --git a/lustre/include/linux/obd_class.h b/lustre/include/linux/obd_class.h index 8c6b65d..40d7343 100644 --- a/lustre/include/linux/obd_class.h +++ b/lustre/include/linux/obd_class.h @@ -309,27 +309,28 @@ static inline int obd_brw(int rw, struct obd_conn *conn, obd_count num_oa, static inline int obd_preprw(int cmd, struct obd_conn *conn, int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_remote *remote, - struct niobuf_local *local) + struct niobuf_local *local, void **desc_private) { int rc; OBD_CHECK_SETUP(conn); OBD_CHECK_OP(conn, preprw); rc = OBP(conn->oc_dev, preprw)(cmd, conn, objcount, obj, niocount, - remote, local); + remote, local, desc_private); RETURN(rc); } static inline int obd_commitrw(int cmd, struct obd_conn *conn, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_local *local) + int niocount, struct niobuf_local *local, + void *desc_private) { int rc; OBD_CHECK_SETUP(conn); OBD_CHECK_OP(conn, commitrw); rc = OBP(conn->oc_dev, commitrw)(cmd, conn, objcount, obj, niocount, - local); + local, desc_private); RETURN(rc); } @@ -409,7 +410,7 @@ static __inline__ struct obdo *obdo_alloc(void) static __inline__ void obdo_free(struct obdo *oa) { - if ( !oa ) + if (!oa) return; kmem_cache_free(obdo_cachep, oa); } diff --git a/lustre/obdecho/echo.c b/lustre/obdecho/echo.c index 52e5a18..d3cd0a7 100644 --- a/lustre/obdecho/echo.c +++ b/lustre/obdecho/echo.c @@ -67,7 +67,7 @@ static int echo_getattr(struct obd_conn *conn, struct obdo *oa) int echo_preprw(int cmd, struct obd_conn *conn, int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_remote *nb, - struct niobuf_local *res) + struct niobuf_local *res, void **desc_private) { struct niobuf_local *r 
= res; int rc = 0; @@ -106,9 +106,9 @@ int echo_preprw(int cmd, struct obd_conn *conn, int objcount, } */ - r->addr = address; r->offset = nb->offset; r->page = virt_to_page(address); + r->addr = kmap(r->page); r->len = nb->len; // r->flags } @@ -125,9 +125,9 @@ preprw_cleanup: */ CERROR("cleaning up %ld pages (%d obdos)\n", (long)(r - res), objcount); while (r-- > res) { - unsigned long addr = r->addr; + kunmap(r->page); - free_pages(addr, 0); + __free_pages(r->page, 0); echo_pages--; } memset(res, 0, sizeof(*res) * niocount); @@ -136,7 +136,8 @@ preprw_cleanup: } int echo_commitrw(int cmd, struct obd_conn *conn, int objcount, - struct obd_ioobj *obj, int niocount, struct niobuf_local *res) + struct obd_ioobj *obj, int niocount, struct niobuf_local *res, + void *desc_private) { struct niobuf_local *r = res; int rc = 0; @@ -207,7 +208,7 @@ static void __exit obdecho_exit(void) obd_unregister_type(OBD_ECHO_DEVICENAME); } -MODULE_AUTHOR("Peter J. Braam "); +MODULE_AUTHOR("Cluster Filesystems Inc. "); MODULE_DESCRIPTION("Lustre Testing Echo OBD driver v1.0"); MODULE_LICENSE("GPL"); diff --git a/lustre/obdfilter/filter.c b/lustre/obdfilter/filter.c index ac2c3b9..0aa5381 100644 --- a/lustre/obdfilter/filter.c +++ b/lustre/obdfilter/filter.c @@ -16,6 +16,8 @@ #include #include +#include +#include extern struct obd_device obd_dev[MAX_OBD_DEVICES]; long filter_memory; @@ -896,73 +898,377 @@ struct inode *ioobj_to_inode(struct obd_conn *conn, struct obd_ioobj *o) RETURN(inode); } +/* + * Calculate the number of buffer credits needed to write multiple pages in + * a single ext3/extN transaction. No, this shouldn't be here, but as yet + * ext3 doesn't have a nice API for calculating this sort of thing in advance. + * + * See comment above ext3_writepage_trans_blocks for details. We assume + * no data journaling is being done, but it does allow for all of the pages + * being non-contiguous. 
If we are guaranteed contiguous pages we could + * reduce the number of (d)indirect blocks a lot. + * + * With N blocks per page and P pages, for each inode we have at most: + * N*P indirect + * min(N*P, blocksize/4 + 1) dindirect blocks + * 1 tindirect + * + * For the entire filesystem, we have at most: + * min(sum(nindir + P), ngroups) bitmap blocks (from the above) + * min(sum(nindir + P), gdblocks) group descriptor blocks (from the above) + * 1 inode block + * 1 superblock + * 2 * EXT3_SINGLEDATA_TRANS_BLOCKS for the quota files + */ +static int ext3_credits_needed(struct super_block *sb, int objcount, + struct obd_ioobj *obj) +{ + struct obd_ioobj *o = obj; + int blockpp = 1 << (PAGE_CACHE_SHIFT - sb->s_blocksize_bits); + int addrpp = EXT3_ADDR_PER_BLOCK(sb) * blockpp; + int nbitmaps = 0; + int ngdblocks = 0; + int needed = objcount + 1; + int i; + + for (i = 0; i < objcount; i++, o++) { + int nblocks = o->ioo_bufcnt * blockpp; + int ndindirect = min(nblocks, addrpp + 1); + int nindir = nblocks + ndindirect + 1; + + nbitmaps += nindir + nblocks; + ngdblocks += nindir + nblocks; + + needed += nindir; + } + + if (nbitmaps > EXT3_SB(sb)->s_groups_count) + nbitmaps = EXT3_SB(sb)->s_groups_count; + if (ngdblocks > EXT3_SB(sb)->s_gdb_count) + ngdblocks = EXT3_SB(sb)->s_gdb_count; + + needed += nbitmaps + ngdblocks; + +#ifdef CONFIG_QUOTA + /* We assume that there will be 1 bit set in s_dquot.flags for each + * quota file that is active. This is at least true for now. + */ + needed += hweight32(sb_any_quota_enabled(sb)) * + EXT3_SINGLEDATA_TRANS_BLOCKS; +#endif + + return needed; +} + +/* We have to start a huge journal transaction here to hold all of the + * metadata for the pages being written here. This is necessitated by + * the fact that we do lots of prepare_write operations before we do + * any of the matching commit_write operations, so even if we split + * up to use "smaller" transactions none of them could complete until + * all of them were opened. 
By having a single journal transaction, + * we eliminate duplicate reservations for common blocks like the + * superblock and group descriptors or bitmaps. + * + * We will start the transaction here, but each prepare_write will + * add a refcount to the transaction, and each commit_write will + * remove a refcount. The transaction will be closed when all of + * the pages have been written. + */ +static void *ext3_filter_journal_start(struct filter_obd *filter, + int objcount, struct obd_ioobj *obj, + int niocount, struct niobuf_remote *nb) +{ + journal_t *journal = NULL; + handle_t *handle = NULL; + int needed; + + /* Assumes ext3 and extN have same sb_info layout, but avoids issues + * with having extN built properly before filterobd for now. + */ + journal = EXT3_SB(filter->fo_sb)->s_journal; + needed = ext3_credits_needed(filter->fo_sb, objcount, obj); + + /* The number of blocks we could _possibly_ dirty can be very large. + * We reduce our request if it is absurd (and we couldn't get that + * many credits for a single handle anyways). + * + * At some point we have to limit the size of I/Os sent at one time, + * increase the size of the journal, or we have to calculate the + * actual journal requirements more carefully by checking all of + * the blocks instead of being maximally pessimistic. It remains to + * be seen if this is a real problem or not.
+ */ + if (needed > journal->j_max_transaction_buffers) { + CERROR("want too many journal credits (%d) using %d instead\n", + needed, journal->j_max_transaction_buffers); + needed = journal->j_max_transaction_buffers; + } + + handle = journal_start(journal, needed); + if (IS_ERR(handle)) + CERROR("can't get handle for %d credits: rc = %ld\n", needed, + PTR_ERR(handle)); + + return(handle); +} + +static void *filter_journal_start(void **journal_save, + struct filter_obd *filter, + int objcount, struct obd_ioobj *obj, + int niocount, struct niobuf_remote *nb) +{ + void *handle = NULL; + + /* This may not be necessary - we probably never have a + * transaction started when we enter here, so we can + * remove the saving of the journal state entirely. + * For now leave it in just to see if it ever happens. + */ + *journal_save = current->journal_info; + if (*journal_save) { + CERROR("Already have handle %p???\n", *journal_save); + LBUG(); + current->journal_info = NULL; + } + + if (!strcmp(filter->fo_fstype, "ext3") || + !strcmp(filter->fo_fstype, "extN")) + handle = ext3_filter_journal_start(filter, objcount, obj, + niocount, nb); + return handle; +} + +static int ext3_filter_journal_stop(void *handle) +{ + int rc; + + /* We got a refcount on the handle for each call to prepare_write, + * so we can drop the "parent" handle here to avoid the need for + * osc to call back into filterobd to close the handle. The + * remaining references will be dropped in commit_write. 
+ */ + rc = journal_stop((handle_t *)handle); + + return rc; +} + +static int filter_journal_stop(void *journal_save, struct filter_obd *filter, + void *handle) +{ + int rc = 0; + + if (!strcmp(filter->fo_fstype, "ext3") || + !strcmp(filter->fo_fstype, "extN")) + rc = ext3_filter_journal_stop(handle); + + current->journal_info = journal_save; + + return rc; +} + +struct page *filter_get_page_write(struct inode *inode, unsigned long index, + struct niobuf_local *lnb) +{ + struct address_space *mapping = inode->i_mapping; + struct page *page; + int rc; + + //ASSERT_PAGE_INDEX(index, GOTO(err, rc = -EINVAL)); + page = grab_cache_page_nowait(mapping, index); /* locked page */ + + /* This page is currently locked, so we grab a new one temporarily */ + if (!page) { + unsigned long addr; + addr = __get_free_pages(GFP_KERNEL, 0); + if (!addr) { + CERROR("no memory for a temp page\n"); + LBUG(); + GOTO(err, rc = -ENOMEM); + } + page = virt_to_page(addr); + kmap(page); + page->index = index; + lnb->flags |= N_LOCAL_TEMP_PAGE; + } else if (!IS_ERR(page)) { + /* Note: Called with "0" and "PAGE_SIZE" this is essentially + * a no-op for most filesystems, because we write the whole + * page. For partial-page I/O this will read in the page.
+ */ + rc = mapping->a_ops->prepare_write(NULL, page, 0, PAGE_SIZE); + if (rc) { + CERROR("page index %lu, rc = %d\n", index, rc); + if (rc != -ENOSPC) + LBUG(); + GOTO(err_unlock, rc); + } + /* XXX not sure if we need this if we are overwriting page */ + if (PageError(page)) { + CERROR("error on page index %lu, rc = %d\n", index, rc); + LBUG(); + GOTO(err_unlock, rc = -EIO); + } + + kmap(page); + } + return page; + +err_unlock: + unlock_page(page); + lustre_put_page(page); +err: + return ERR_PTR(rc); +} + static int filter_preprw(int cmd, struct obd_conn *conn, int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_remote *nb, - struct niobuf_local *res) + struct niobuf_local *res, void **desc_private) { struct obd_run_ctxt saved; + struct obd_device *obddev; struct obd_ioobj *o = obj; struct niobuf_remote *b = nb; struct niobuf_local *r = res; + void *journal_save = NULL; + int rc = 0; int i; ENTRY; memset(res, 0, sizeof(*res) * niocount); + obddev = conn->oc_dev; + + push_ctxt(&saved, &obddev->u.filter.fo_ctxt); + + if (cmd == OBD_BRW_WRITE) { + *desc_private = filter_journal_start(&journal_save, + &obddev->u.filter, + objcount, obj, niocount, + nb); + if (IS_ERR(*desc_private)) + GOTO(out_ctxt, rc = PTR_ERR(*desc_private)); + } - // if (cmd == OBD_BRW_WRITE) - push_ctxt(&saved, &conn->oc_dev->u.filter.fo_ctxt); for (i = 0; i < objcount; i++, o++) { + struct dentry *dentry; + struct inode *inode; int j; + + dentry = filter_fid2dentry(obddev, + filter_parent(obddev, S_IFREG), + o->ioo_id, S_IFREG); + inode = dentry->d_inode; + for (j = 0; j < o->ioo_bufcnt; j++, b++, r++) { unsigned long index = b->offset >> PAGE_SHIFT; - struct inode *inode = ioobj_to_inode(conn, o); struct page *page; + /* XXX We _might_ change this to a dcount if we + * wanted to pass a dentry pointer in the niobuf + * to avoid doing so many igets on an inode we + * already have. 
It appears to be solely for the + * purpose of having a refcount that we can drop + * in commitrw where we get one call per page. + */ + if (j > 0) + r->dentry = dget(dentry); + else + r->dentry = dentry; + /* FIXME: we need to iput all inodes on error */ if (!inode) RETURN(-EINVAL); - if (cmd == OBD_BRW_WRITE) - page = lustre_get_page_write(inode, index); - else + if (cmd == OBD_BRW_WRITE) { + page = filter_get_page_write(inode, index, r); + + /* We unlock the page to avoid deadlocks with + * the page I/O because we are preparing + * multiple pages at one time and we have lock + * ordering problems. Lustre I/O and disk I/O + * on this page can happen concurrently. + */ + } else page = lustre_get_page_read(inode, index); + + /* FIXME: we need to clean up here... */ if (IS_ERR(page)) RETURN(PTR_ERR(page)); - r->addr = (__u64)(unsigned long)page_address(page); + r->addr = page_address(page); r->offset = b->offset; r->page = page; r->len = PAGE_SIZE; } } - // if (cmd == OBD_BRW_WRITE) + + if (cmd == OBD_BRW_WRITE) { + /* FIXME: need to clean up here */ + rc = filter_journal_stop(journal_save, &obddev->u.filter, + *desc_private); + } +out_ctxt: pop_ctxt(&saved); - return(0); + RETURN(rc); +} + +static int filter_write_locked_page(struct niobuf_local *lnb) +{ + struct page *lpage; + int rc; + + lpage = lustre_get_page_write(lnb->dentry->d_inode, lnb->page->index); + /* XXX */ + + memcpy(page_address(lpage), kmap(lnb->page), PAGE_SIZE); + kunmap(lnb->page); + __free_pages(lnb->page, 0); + + rc = lustre_commit_page(lpage, 0, PAGE_SIZE); + dput(lnb->dentry); + + return rc; } static int filter_commitrw(int cmd, struct obd_conn *conn, int objcount, struct obd_ioobj *obj, - int niocount, struct niobuf_local *res) + int niocount, struct niobuf_local *res, + void *private) { struct obd_run_ctxt saved; struct obd_ioobj *o = obj; struct niobuf_local *r = res; + void *journal_save; + int found_locked = 0; int i; ENTRY; // if (cmd == OBD_BRW_WRITE) push_ctxt(&saved, 
&conn->oc_dev->u.filter.fo_ctxt); + journal_save = current->journal_info; + if (journal_save) + CERROR("Existing handle %p???\n", journal_save); + current->journal_info = private; for (i = 0; i < objcount; i++, obj++) { int j; for (j = 0 ; j < o->ioo_bufcnt ; j++, r++) { struct page *page = r->page; - if (!r->page) + /* If there was an error setting up a particular page + * for I/O we still need to continue with the rest of + * the pages in order to balance prepate/commit_write + * calls, and to complete as much I/O as possible. + */ + if (!page) LBUG(); + if (r->flags & N_LOCAL_TEMP_PAGE) { + found_locked = 1; + continue; + } + if (cmd == OBD_BRW_WRITE) { - int rc = lustre_commit_page(page, 0, PAGE_SIZE); + int rc; + rc = lustre_commit_page(page, 0, PAGE_SIZE); /* FIXME: still need to iput the other inodes */ if (rc) @@ -970,15 +1276,32 @@ static int filter_commitrw(int cmd, struct obd_conn *conn, } else lustre_put_page(page); - CDEBUG(D_INODE, "put inode %p (%ld), count = %d, nlink = %d\n", + CDEBUG(D_INODE, + "put inode %p (%ld), count = %d, nlink = %d\n", page->mapping->host, page->mapping->host->i_ino, atomic_read(&page->mapping->host->i_count) - 1, page->mapping->host->i_nlink); - iput(page->mapping->host); + dput(r->dentry); } } - // if (cmd == OBD_BRW_WRITE) + if (!found_locked) + goto out; + + for (i = 0; i < objcount; i++, obj++) { + int j; + for (j = 0 ; j < o->ioo_bufcnt ; j++, r++) { + int rc; + if (!(r->flags & N_LOCAL_TEMP_PAGE)) + continue; + + rc = filter_write_locked_page(r); + /* XXX */ + } + } + +out: + current->journal_info = journal_save; pop_ctxt(&saved); RETURN(0); } @@ -1041,8 +1364,7 @@ static int filter_get_info(struct obd_conn *conn, obd_count keylen, } -struct obd_ops filter_obd_ops = { - o_iocontrol: NULL, +static struct obd_ops filter_obd_ops = { o_get_info: filter_get_info, o_setup: filter_setup, o_cleanup: filter_cleanup, diff --git a/lustre/osc/osc_request.c b/lustre/osc/osc_request.c index d32f29d..fe83d9b 100644 --- 
a/lustre/osc/osc_request.c +++ b/lustre/osc/osc_request.c @@ -497,7 +497,7 @@ static int osc_brw_write(struct obd_conn *conn, obd_count num_oa, for (pages = 0, i = 0; i < num_oa; i++) { ost_pack_ioo(&ptr1, oa[i], oa_bufs[i]); for (j = 0; j < oa_bufs[i]; j++, pages++) { - local[pages].addr = (__u64)(long)kmap(pagearray[pages]); + local[pages].addr = kmap(pagearray[pages]); local[pages].offset = offset[pages]; local[pages].len = count[pages]; ost_pack_niobuf(&ptr2, offset[pages], count[pages], diff --git a/lustre/ost/ost_handler.c b/lustre/ost/ost_handler.c index 22b7f2b..1276c8b 100644 --- a/lustre/ost/ost_handler.c +++ b/lustre/ost/ost_handler.c @@ -294,7 +294,7 @@ static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req) tmp1 = lustre_msg_buf(req->rq_reqmsg, 1); tmp2 = lustre_msg_buf(req->rq_reqmsg, 2); req->rq_status = obd_preprw(cmd, &conn, objcount, - tmp1, niocount, tmp2, local_nb); + tmp1, niocount, tmp2, local_nb, NULL); if (req->rq_status) GOTO(out_local, 0); @@ -323,7 +323,7 @@ static int ost_brw_read(struct ost_obd *obddev, struct ptlrpc_request *req) tmp1 = lustre_msg_buf(req->rq_reqmsg, 1); tmp2 = lustre_msg_buf(req->rq_reqmsg, 2); req->rq_status = obd_commitrw(cmd, &conn, objcount, - tmp1, niocount, local_nb); + tmp1, niocount, local_nb, NULL); out_bulk: ptlrpc_free_bulk(desc); @@ -333,41 +333,23 @@ out: RETURN(rc); } -static int ost_commit_page(struct obd_conn *conn, struct page *page) +static int ost_brw_write_cb(struct ptlrpc_bulk_page *bulk) { struct obd_ioobj obj; - struct niobuf_local buf; + struct niobuf_local lnb; int rc; ENTRY; - memset(&buf, 0, sizeof(buf)); + memset(&lnb, 0, sizeof(lnb)); memset(&obj, 0, sizeof(obj)); - buf.page = page; + lnb.page = bulk->b_page; + lnb.dentry = bulk->b_dentry; + lnb.flags = bulk->b_flags; obj.ioo_bufcnt = 1; - rc = obd_commitrw(OBD_BRW_WRITE, conn, 1, &obj, 1, &buf); - RETURN(rc); -} - -static int ost_brw_write_cb(struct ptlrpc_bulk_page *bulk) -{ - void *journal_save; - int rc; - ENTRY; - 
- /* Restore the filesystem journal context when we do the commit. - * This is needed for ext3 and reiserfs, but can't really hurt - * other filesystems. - */ - journal_save = current->journal_info; - current->journal_info = bulk->b_desc->b_journal_info; - CDEBUG(D_BUFFS, "journal_info: saved %p->%p, restored %p\n", current, - journal_save, bulk->b_desc->b_journal_info); - rc = ost_commit_page(&bulk->b_desc->b_conn, bulk->b_page); - current->journal_info = journal_save; - CDEBUG(D_BUFFS, "journal_info: restored %p->%p\n", current, - journal_save); + rc = obd_commitrw(OBD_BRW_WRITE, &bulk->b_desc->b_conn, 1, &obj, 1, + &lnb, bulk->b_desc->b_desc_private); if (rc) CERROR("ost_commit_page failed: %d\n", rc); @@ -391,6 +373,7 @@ static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req) struct ost_body *body; int cmd, rc, i, j, objcount, niocount, size[2] = {sizeof(*body)}; void *tmp1, *tmp2, *end2; + void *desc_priv = NULL; ENTRY; body = lustre_msg_buf(req->rq_reqmsg, 0); @@ -428,7 +411,7 @@ static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req) tmp1 = lustre_msg_buf(req->rq_reqmsg, 1); tmp2 = lustre_msg_buf(req->rq_reqmsg, 2); req->rq_status = obd_preprw(cmd, &conn, objcount, - tmp1, niocount, tmp2, local_nb); + tmp1, niocount, tmp2, local_nb, &desc_priv); if (req->rq_status) GOTO(out_free, rc = 0); /* XXX is this correct? 
*/ @@ -437,13 +420,9 @@ static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req) GOTO(fail_preprw, rc = -ENOMEM); desc->b_cb = ost_brw_write_finished_cb; desc->b_portal = OSC_BULK_PORTAL; + desc->b_desc_private = desc_priv; memcpy(&(desc->b_conn), &conn, sizeof(conn)); - /* Save journal context for commit callbacks */ - CDEBUG(D_BUFFS, "journal_info: saved %p->%p\n", current, - current->journal_info); - desc->b_journal_info = current->journal_info; - for (i = 0, lnb = local_nb; i < niocount; i++, lnb++) { struct ptlrpc_service *srv = req->rq_obd->u.ost.ost_service; struct ptlrpc_bulk_page *bulk; @@ -456,8 +435,10 @@ static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req) bulk->b_xid = srv->srv_xid++; spin_unlock(&srv->srv_lock); - bulk->b_buf = (void *)(unsigned long)lnb->addr; + bulk->b_buf = lnb->addr; bulk->b_page = lnb->page; + bulk->b_flags = lnb->flags; + bulk->b_dentry = lnb->dentry; bulk->b_buflen = PAGE_SIZE; bulk->b_cb = ost_brw_write_cb; @@ -467,7 +448,6 @@ static int ost_brw_write(struct ost_obd *obddev, struct ptlrpc_request *req) } rc = ptlrpc_register_bulk(desc); - current->journal_info = NULL; /* kind of scary */ if (rc) GOTO(fail_bulk, rc); @@ -479,8 +459,8 @@ out: fail_bulk: ptlrpc_free_bulk(desc); - /* FIXME: how do we undo the preprw? */ fail_preprw: + /* FIXME: how do we undo the preprw? */ goto out_free; } @@ -569,6 +549,13 @@ static int ost_handle(struct obd_device *obddev, struct ptlrpc_service *svc, OBD_FAIL_RETURN(OBD_FAIL_OST_PUNCH_NET, 0); rc = ost_punch(ost, req); break; +#if 0 + case OST_STATFS: + CDEBUG(D_INODE, "statfs\n"); + OBD_FAIL_RETURN(OBD_FAIL_OST_STATFS_NET, 0); + rc = ost_statfs(ost, req); + break; +#endif default: req->rq_status = -ENOTSUPP; rc = ptlrpc_error(svc, req); -- 1.8.3.1