From 474ca116ee807a6a042c5c15d6eadd661b28c765 Mon Sep 17 00:00:00 2001 From: phil Date: Tue, 9 Sep 2003 03:54:29 +0000 Subject: [PATCH] merge b_llpmd into b_devel. the major highlights: - new I/O backend - new client page cache and llite/lov/osc plumbing - pre-creation of OST objects - most of the OBD protocol now revolves around exports, not obd_devices --- .../kernel_patches/patches/add_page_private.patch | 15 ++ lustre/kernel_patches/pc/add_page_private.pc | 1 + lustre/llite/llite_lib.c | 137 +++++-------- lustre/llite/rw24.c | 183 +++++++++++++++++ lustre/llite/rw26.c | 118 +++++++++++ lustre/mdc/Makefile.mk | 2 +- lustre/osc/osc_internal.h | 17 ++ lustre/osc/osc_rpcd.c | 223 +++++++++++++++++++++ 8 files changed, 607 insertions(+), 89 deletions(-) create mode 100644 lustre/kernel_patches/patches/add_page_private.patch create mode 100644 lustre/kernel_patches/pc/add_page_private.pc create mode 100644 lustre/llite/rw24.c create mode 100644 lustre/llite/rw26.c create mode 100644 lustre/osc/osc_internal.h create mode 100644 lustre/osc/osc_rpcd.c diff --git a/lustre/kernel_patches/patches/add_page_private.patch b/lustre/kernel_patches/patches/add_page_private.patch new file mode 100644 index 0000000..f82fb92 --- /dev/null +++ b/lustre/kernel_patches/patches/add_page_private.patch @@ -0,0 +1,15 @@ + include/linux/mm.h | 1 + + 1 files changed, 1 insertion(+) + +--- linux-2.4.20-b_llpio-l21/include/linux/mm.h~add_page_private 2003-07-21 21:42:50.000000000 -0700 ++++ linux-2.4.20-b_llpio-l21-zab/include/linux/mm.h 2003-07-21 21:44:16.000000000 -0700 +@@ -162,6 +162,7 @@ typedef struct page { + protected by pagemap_lru_lock !! */ + struct page **pprev_hash; /* Complement to *next_hash. */ + struct buffer_head * buffers; /* Buffer maps us to a disk block. 
*/ ++ unsigned long private; + + /* + * On machines where all RAM is mapped into kernel address space, + +_ diff --git a/lustre/kernel_patches/pc/add_page_private.pc b/lustre/kernel_patches/pc/add_page_private.pc new file mode 100644 index 0000000..476581c --- /dev/null +++ b/lustre/kernel_patches/pc/add_page_private.pc @@ -0,0 +1 @@ +include/linux/mm.h diff --git a/lustre/llite/llite_lib.c b/lustre/llite/llite_lib.c index 07e9278..667d8c2 100644 --- a/lustre/llite/llite_lib.c +++ b/lustre/llite/llite_lib.c @@ -100,11 +100,19 @@ void ll_options(char *options, char **ost, char **mds, int *flags) while ((this_char = strsep (&opt_ptr, ",")) != NULL) { #endif CDEBUG(D_SUPER, "this_char %s\n", this_char); - if ((!*ost && (*ost = ll_read_opt("osc", this_char)))|| - (!*mds && (*mds = ll_read_opt("mdc", this_char)))|| - (!(*flags & LL_SBI_NOLCK) && - ((*flags) = (*flags) | - ll_set_opt("nolock", this_char, LL_SBI_NOLCK)))) + if (!*ost && (*ost = ll_read_opt("osc", this_char))) + continue; + if (!*mds && (*mds = ll_read_opt("mdc", this_char))) + continue; + if (!(*flags & LL_SBI_NOLCK) && + ((*flags) = (*flags) | + ll_set_opt("nolock", this_char, + LL_SBI_NOLCK))) + continue; + if (!(*flags & LL_SBI_READAHEAD) && + ((*flags) = (*flags) | + ll_set_opt("readahead", this_char, + LL_SBI_READAHEAD))) continue; } EXIT; @@ -113,17 +121,8 @@ void ll_options(char *options, char **ost, char **mds, int *flags) void ll_lli_init(struct ll_inode_info *lli) { sema_init(&lli->lli_open_sem, 1); - spin_lock_init(&lli->lli_read_extent_lock); - INIT_LIST_HEAD(&lli->lli_read_extents); lli->lli_flags = 0; lli->lli_maxbytes = PAGE_CACHE_MAXBYTES; -#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) - spin_lock_init(&lli->lli_pg_lock); - INIT_LIST_HEAD(&lli->lli_lc_item); - plist_init(&lli->lli_pl_read); - plist_init(&lli->lli_pl_write); - atomic_set(&lli->lli_in_writepages, 0); -#endif } int ll_fill_super(struct super_block *sb, void *data, int silent) @@ -137,6 +136,8 @@ int ll_fill_super(struct 
super_block *sb, void *data, int silent) struct ll_fid rootfid; struct obd_statfs osfs; struct ptlrpc_request *request = NULL; + struct lustre_handle osc_conn = {0, }; + struct lustre_handle mdc_conn = {0, }; struct lustre_md md; class_uuid_t uuid; @@ -179,11 +180,12 @@ int ll_fill_super(struct super_block *sb, void *data, int silent) CERROR("could not register mount in /proc/lustre"); } - err = obd_connect(&sbi->ll_mdc_conn, obd, &sbi->ll_sb_uuid); + err = obd_connect(&mdc_conn, obd, &sbi->ll_sb_uuid); if (err) { CERROR("cannot connect to %s: rc = %d\n", mdc, err); GOTO(out_free, err); } + sbi->ll_mdc_exp = class_conn2export(&mdc_conn); err = obd_statfs(obd, &osfs, jiffies - HZ); if (err) @@ -201,13 +203,14 @@ int ll_fill_super(struct super_block *sb, void *data, int silent) GOTO(out_mdc, err); } - err = obd_connect(&sbi->ll_osc_conn, obd, &sbi->ll_sb_uuid); + err = obd_connect(&osc_conn, obd, &sbi->ll_sb_uuid); if (err) { CERROR("cannot connect to %s: rc = %d\n", osc, err); GOTO(out_mdc, err); } + sbi->ll_osc_exp = class_conn2export(&osc_conn); - err = mdc_getstatus(&sbi->ll_mdc_conn, &rootfid); + err = mdc_getstatus(sbi->ll_mdc_exp, &rootfid); if (err) { CERROR("cannot mds_connect: rc = %d\n", err); GOTO(out_osc, err); @@ -219,30 +222,18 @@ int ll_fill_super(struct super_block *sb, void *data, int silent) /* make root inode * XXX: move this to after cbd setup? 
*/ - err = mdc_getattr(&sbi->ll_mdc_conn, &rootfid, + err = mdc_getattr(sbi->ll_mdc_exp, &rootfid, OBD_MD_FLNOTOBD|OBD_MD_FLBLOCKS, 0, &request); if (err) { CERROR("mdc_getattr failed for root: rc = %d\n", err); GOTO(out_osc, err); } - /* initialize committed transaction callback daemon */ - spin_lock_init(&sbi->ll_commitcbd_lock); - init_waitqueue_head(&sbi->ll_commitcbd_waitq); - init_waitqueue_head(&sbi->ll_commitcbd_ctl_waitq); - sbi->ll_commitcbd_flags = 0; - err = ll_commitcbd_setup(sbi); - if (err) { - CERROR("failed to start commit callback daemon: rc = %d\n",err); - ptlrpc_req_finished (request); - GOTO(out_lliod, err); - } - - err = mdc_req2lustre_md(request, 0, &sbi->ll_osc_conn, &md); + err = mdc_req2lustre_md(request, 0, sbi->ll_osc_exp, &md); if (err) { CERROR("failed to understand root inode md: rc = %d\n",err); ptlrpc_req_finished (request); - GOTO(out_lliod, err); + GOTO(out_osc, err); } LASSERT(sbi->ll_rootino != 0); @@ -253,17 +244,9 @@ int ll_fill_super(struct super_block *sb, void *data, int silent) if (root == NULL || is_bad_inode(root)) { /* XXX might need iput() for bad inode */ CERROR("lustre_lite: bad iget4 for root\n"); - GOTO(out_cbd, err = -EBADF); + GOTO(out_osc, err = -EBADF); } -#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) - /* initialize the pagecache writeback thread */ - err = lliod_start(sbi, root); - if (err) { - CERROR("failed to start lliod: rc = %d\n",err); - GOTO(out_root, sb = NULL); - } -#endif sb->s_root = d_alloc_root(root); out_dev: @@ -274,20 +257,11 @@ out_dev: RETURN(err); -#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) -out_root: iput(root); -#endif -out_cbd: - ll_commitcbd_cleanup(sbi); -out_lliod: -#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) - lliod_stop(sbi); -#endif out_osc: - obd_disconnect(&sbi->ll_osc_conn, 0); + obd_disconnect(sbi->ll_osc_exp, 0); out_mdc: - obd_disconnect(&sbi->ll_mdc_conn, 0); + obd_disconnect(sbi->ll_mdc_exp, 0); out_free: lprocfs_unregister_mountpoint(sbi); OBD_FREE(sbi, 
sizeof(*sbi)); @@ -298,18 +272,14 @@ out_free: void ll_put_super(struct super_block *sb) { struct ll_sb_info *sbi = ll_s2sbi(sb); - struct obd_device *obd = class_conn2obd(&sbi->ll_mdc_conn); + struct obd_device *obd = class_exp2obd(sbi->ll_mdc_exp); struct hlist_node *tmp, *next; struct ll_fid rootfid; ENTRY; CDEBUG(D_VFSTRACE, "VFS Op: sb %p\n", sb); list_del(&sbi->ll_conn_chain); - ll_commitcbd_cleanup(sbi); -#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) - lliod_stop(sbi); -#endif - obd_disconnect(&sbi->ll_osc_conn, 0); + obd_disconnect(sbi->ll_osc_exp, 0); /* NULL request to force sync on the MDS, and get the last_committed * value to flush remaining RPCs from the sending queue on client. @@ -318,7 +288,7 @@ void ll_put_super(struct super_block *sb) * which we can call for other reasons as well. */ if (!obd->obd_no_recov) - mdc_getstatus(&sbi->ll_mdc_conn, &rootfid); + mdc_getstatus(sbi->ll_mdc_exp, &rootfid); lprocfs_unregister_mountpoint(sbi); if (sbi->ll_proc_root) { @@ -326,9 +296,9 @@ void ll_put_super(struct super_block *sb) sbi->ll_proc_root = NULL; } - obd_disconnect(&sbi->ll_mdc_conn, 0); + obd_disconnect(sbi->ll_mdc_exp, 0); -#warning Why do we need this? +#warning We do this to get rid of orphaned dentries. That is not really trw. 
spin_lock(&dcache_lock); hlist_for_each_safe(tmp, next, &sbi->ll_orphan_dentry_list) { struct dentry *dentry = hlist_entry(tmp, struct dentry, d_hash); @@ -376,14 +346,15 @@ void ll_clear_inode(struct inode *inode) inode->i_generation, inode); ll_inode2fid(&fid, inode); - mdc_change_cbdata(&sbi->ll_mdc_conn, &fid, null_if_equal, inode); + clear_bit(LLI_F_HAVE_MDS_SIZE_LOCK, &(ll_i2info(inode)->lli_flags)); + mdc_change_cbdata(sbi->ll_mdc_exp, &fid, null_if_equal, inode); if (lli->lli_smd) - obd_change_cbdata(&sbi->ll_osc_conn, lli->lli_smd, + obd_change_cbdata(sbi->ll_osc_exp, lli->lli_smd, null_if_equal, inode); if (lli->lli_smd) { - obd_free_memmd(&sbi->ll_osc_conn, &lli->lli_smd); + obd_free_memmd(sbi->ll_osc_exp, &lli->lli_smd); lli->lli_smd = NULL; } @@ -462,8 +433,8 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) struct lustre_md md; ll_prepare_mdc_op_data(&op_data, inode, NULL, NULL, 0, 0); - rc = mdc_setattr(&sbi->ll_mdc_conn, &op_data, - attr, NULL, 0, NULL, 0, &request); + rc = mdc_setattr(sbi->ll_mdc_exp, &op_data, + attr, NULL, 0, NULL, 0, &request); if (rc) { ptlrpc_req_finished(request); @@ -472,7 +443,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) RETURN(rc); } - rc = mdc_req2lustre_md(request, 0, &sbi->ll_osc_conn, &md); + rc = mdc_req2lustre_md(request, 0, sbi->ll_osc_exp, &md); if (rc) { ptlrpc_req_finished(request); RETURN(rc); @@ -508,7 +479,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) } if (ia_valid & ATTR_SIZE) { - struct ldlm_extent extent = { .start = attr->ia_size, + struct ldlm_extent extent = { .start = 0, .end = OBD_OBJECT_EOF }; struct lustre_handle lockh = { 0 }; int err, ast_flags = 0; @@ -543,7 +514,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) rc = vmtruncate(inode, attr->ia_size); if (rc == 0) - set_bit(LLI_F_HAVE_SIZE_LOCK, + set_bit(LLI_F_HAVE_OST_SIZE_LOCK, &ll_i2info(inode)->lli_flags); /* unlock now as we don't mind others file lockers racing with @@ 
-563,7 +534,7 @@ int ll_setattr_raw(struct inode *inode, struct iattr *attr) oa.o_valid = OBD_MD_FLID; obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | OBD_MD_FLMTIME | OBD_MD_FLCTIME); - rc = obd_setattr(&sbi->ll_osc_conn, &oa, lsm, NULL); + rc = obd_setattr(sbi->ll_osc_exp, &oa, lsm, NULL); if (rc) CERROR("obd_setattr fails: rc=%d\n", rc); } @@ -584,7 +555,7 @@ int ll_statfs_internal(struct super_block *sb, struct obd_statfs *osfs, int rc; ENTRY; - rc = obd_statfs(class_conn2obd(&sbi->ll_mdc_conn), osfs, max_age); + rc = obd_statfs(class_exp2obd(sbi->ll_mdc_exp), osfs, max_age); if (rc) { CERROR("mdc_statfs fails: rc = %d\n", rc); RETURN(rc); @@ -593,7 +564,7 @@ int ll_statfs_internal(struct super_block *sb, struct obd_statfs *osfs, CDEBUG(D_SUPER, "MDC blocks "LPU64"/"LPU64" objects "LPU64"/"LPU64"\n", osfs->os_bavail, osfs->os_blocks, osfs->os_ffree,osfs->os_files); - rc = obd_statfs(class_conn2obd(&sbi->ll_osc_conn), &obd_osfs, max_age); + rc = obd_statfs(class_exp2obd(sbi->ll_osc_exp), &obd_osfs, max_age); if (rc) { CERROR("obd_statfs fails: rc = %d\n", rc); RETURN(rc); @@ -772,16 +743,6 @@ void ll_read_inode2(struct inode *inode, void *opaque) } } -int it_disposition(struct lookup_intent *it, int flag) -{ - return it->d.lustre.it_disposition & flag; -} - -void it_set_disposition(struct lookup_intent *it, int flag) -{ - it->d.lustre.it_disposition |= flag; -} - void ll_umount_begin(struct super_block *sb) { struct ll_sb_info *sbi = ll_s2sbi(sb); @@ -790,27 +751,27 @@ void ll_umount_begin(struct super_block *sb) ENTRY; CDEBUG(D_VFSTRACE, "VFS Op:\n"); - obd = class_conn2obd(&sbi->ll_mdc_conn); + obd = class_exp2obd(sbi->ll_mdc_exp); if (obd == NULL) { CERROR("Invalid MDC connection handle "LPX64"\n", - sbi->ll_mdc_conn.cookie); + sbi->ll_mdc_exp->exp_handle.h_cookie); EXIT; return; } obd->obd_no_recov = 1; - obd_iocontrol(IOC_OSC_SET_ACTIVE, &sbi->ll_mdc_conn, sizeof ioc_data, + obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_mdc_exp, sizeof ioc_data, 
&ioc_data, NULL); - obd = class_conn2obd(&sbi->ll_osc_conn); + obd = class_exp2obd(sbi->ll_osc_exp); if (obd == NULL) { CERROR("Invalid LOV connection handle "LPX64"\n", - sbi->ll_osc_conn.cookie); + sbi->ll_osc_exp->exp_handle.h_cookie); EXIT; return; } obd->obd_no_recov = 1; - obd_iocontrol(IOC_OSC_SET_ACTIVE, &sbi->ll_osc_conn, sizeof ioc_data, + obd_iocontrol(IOC_OSC_SET_ACTIVE, sbi->ll_osc_exp, sizeof ioc_data, &ioc_data, NULL); /* Really, we'd like to wait until there are no requests outstanding, diff --git a/lustre/llite/rw24.c b/lustre/llite/rw24.c new file mode 100644 index 0000000..ce1ea6c --- /dev/null +++ b/lustre/llite/rw24.c @@ -0,0 +1,183 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Lustre Lite I/O page cache for the 2.4 kernel generation + * + * Copyright (c) 2001-2003 Cluster File Systems, Inc. + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#define DEBUG_SUBSYSTEM S_LLITE + +#include +#include +#include "llite_internal.h" +#include + +/* called as the osc engine completes an rpc that included our ocp. 
+ * the ocp itself holds a reference to the page and will drop it when + * the page is removed from the page cache. our job is simply to + * transfer rc into the page and unlock it */ +void ll_complete_writepage_24(struct obd_client_page *ocp, int rc) +{ + struct page *page = ocp->ocp_page; + + LASSERT(page->private == (unsigned long)ocp); + LASSERT(PageLocked(page)); + + if (rc != 0) { + CERROR("writeback error on page %p index %ld: %d\n", page, + page->index, rc); + SetPageError(page); + } + ocp->ocp_flags &= ~OCP_IO_READY; + unlock_page(page); + page_cache_release(page); +} + +static int ll_writepage_24(struct page *page) +{ + struct obd_client_page *ocp; + ENTRY; + + LASSERT(!PageDirty(page)); + LASSERT(PageLocked(page)); + LASSERT(page->private != 0); + + ocp = (struct obd_client_page *)page->private; + ocp->ocp_flags |= OCP_IO_READY; + page_cache_get(page); + + /* sadly, not all callers who writepage eventually call sync_page + * (ahem, kswapd) so we need to raise this page's priority + * immediately */ + RETURN(ll_sync_page(page)); +} + +static int ll_direct_IO_24(int rw, struct inode *inode, struct kiobuf *iobuf, + unsigned long blocknr, int blocksize) +{ + struct ll_inode_info *lli = ll_i2info(inode); + struct lov_stripe_md *lsm = lli->lli_smd; + struct brw_page *pga; + struct ptlrpc_request_set *set; + struct obdo oa; + int length, i, flags, rc = 0; + loff_t offset; + ENTRY; + + if (!lsm || !lsm->lsm_object_id) + RETURN(-EBADF); + + /* FIXME: io smaller than PAGE_SIZE is broken on ia64 */ + if ((iobuf->offset & (PAGE_SIZE - 1)) || + (iobuf->length & (PAGE_SIZE - 1))) + RETURN(-EINVAL); + + set = ptlrpc_prep_set(); + if (set == NULL) + RETURN(-ENOMEM); + + OBD_ALLOC(pga, sizeof(*pga) * iobuf->nr_pages); + if (!pga) { + ptlrpc_set_destroy(set); + RETURN(-ENOMEM); + } + + flags = (rw == WRITE ? 
OBD_BRW_CREATE : 0) /* | OBD_BRW_DIRECTIO */; + offset = ((obd_off)blocknr << inode->i_blkbits); + length = iobuf->length; + + for (i = 0, length = iobuf->length; length > 0; + length -= pga[i].count, offset += pga[i].count, i++) { /*i last!*/ + pga[i].pg = iobuf->maplist[i]; + pga[i].off = offset; + /* To the end of the page, or the length, whatever is less */ + pga[i].count = min_t(int, PAGE_SIZE - (offset & ~PAGE_MASK), + length); + pga[i].flag = flags; + if (rw == READ) { + //POISON(kmap(iobuf->maplist[i]), 0xc5, PAGE_SIZE); + //kunmap(iobuf->maplist[i]); + } + } + + oa.o_id = lsm->lsm_object_id; + oa.o_valid = OBD_MD_FLID; + obdo_from_inode(&oa, inode, OBD_MD_FLTYPE | OBD_MD_FLATIME | + OBD_MD_FLMTIME | OBD_MD_FLCTIME); + + if (rw == WRITE) + lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, + LPROC_LL_DIRECT_WRITE, iobuf->length); + else + lprocfs_counter_add(ll_i2sbi(inode)->ll_stats, + LPROC_LL_DIRECT_READ, iobuf->length); + rc = obd_brw_async(rw == WRITE ? OBD_BRW_WRITE : OBD_BRW_READ, + ll_i2obdexp(inode), &oa, lsm, iobuf->nr_pages, pga, + set, NULL); + if (rc) { + CDEBUG(rc == -ENOSPC ? D_INODE : D_ERROR, + "error from obd_brw_async: rc = %d\n", rc); + } else { + rc = ptlrpc_set_wait(set); + if (rc) + CERROR("error from callback: rc = %d\n", rc); + } + ptlrpc_set_destroy(set); + if (rc == 0) + rc = iobuf->length; + + OBD_FREE(pga, sizeof(*pga) * iobuf->nr_pages); + RETURN(rc); +} + +struct address_space_operations ll_aops = { + readpage: ll_readpage, + direct_IO: ll_direct_IO_24, + writepage: ll_writepage_24, +/* we shouldn't use this until we have a better story about sync_page + * and writepage completion racing. 
also, until we differentiate between + * writepage and syncpage it seems of little value to raise the priority + * twice*/ +// sync_page: ll_sync_page, + prepare_write: ll_prepare_write, + commit_write: ll_commit_write, + removepage: ll_removepage, + bmap: NULL +}; diff --git a/lustre/llite/rw26.c b/lustre/llite/rw26.c new file mode 100644 index 0000000..26e6531 --- /dev/null +++ b/lustre/llite/rw26.c @@ -0,0 +1,118 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Lustre Lite I/O page cache routines for the 2.5/2.6 kernel generation + * + * Copyright (c) 2001-2003 Cluster File Systems, Inc. + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define DEBUG_SUBSYSTEM S_LLITE + +#include +#include +#include "llite_internal.h" +#include + +/* called as the osc engine completes an rpc that included our ocp. + * the ocp itself holds a reference to the page and will drop it when + * the page is removed from the page cache. 
our job is simply to + * transfer rc into the page and unlock it */ +void ll_complete_writepage_26(struct obd_client_page *ocp, int rc) +{ + struct page *page = ocp->ocp_page; + + LASSERT(page->private == (unsigned long)ocp); + LASSERT(PageLocked(page)); + + if (rc != 0) { + CERROR("writeback error on page %p index %ld: %d\n", page, + page->index, rc); + SetPageError(page); + } + ocp->ocp_flags &= ~OCP_IO_READY; + + /* let everyone get at this page again.. I wonder if this ordering + * is correct */ + unlock_page(page); + end_page_writeback(page); + + page_cache_release(page); +} + +static int ll_writepage_26(struct page *page, struct writeback_control *wbc) +{ + struct obd_client_page *ocp; + ENTRY; + + LASSERT(!PageDirty(page)); + LASSERT(PageLocked(page)); + LASSERT(page->private != 0); + + ocp = (struct obd_client_page *)page->private; + ocp->ocp_flags |= OCP_IO_READY; + + page_cache_get(page); + + /* filemap_fdatawait() makes me think we need to set PageWriteback + * on pages that are in flight. But our ocp mechanics doesn't + * really expect a page to be on both the osc lru and in flight. + * so for now, we don't unlock the page..
dirtiers will wait + * for io to complete */ + SetPageWriteback(page); + + /* sadly, not all callers who writepage eventually call sync_page + * (ahem, kswapd) so we need to raise this page's priority + * immediately */ + RETURN(ll_sync_page(page)); +} + +struct address_space_operations ll_aops = { + readpage: ll_readpage, +// readpages: ll_readpages, +// direct_IO: ll_direct_IO_26, + writepage: ll_writepage_26, + writepages: generic_writepages, + set_page_dirty: __set_page_dirty_nobuffers, + sync_page: block_sync_page, + prepare_write: ll_prepare_write, + commit_write: ll_commit_write, + bmap: NULL +}; diff --git a/lustre/mdc/Makefile.mk b/lustre/mdc/Makefile.mk index b12e5fc..a93f1cf 100644 --- a/lustre/mdc/Makefile.mk +++ b/lustre/mdc/Makefile.mk @@ -6,4 +6,4 @@ include $(src)/../portals/Kernelenv obj-y += mdc.o -mdc-objs := mdc_request.o mdc_reint.o lproc_mdc.o mdc_lib.o +mdc-objs := mdc_locks.o mdc_request.o mdc_reint.o lproc_mdc.o mdc_lib.o diff --git a/lustre/osc/osc_internal.h b/lustre/osc/osc_internal.h new file mode 100644 index 0000000..46b3b2d --- /dev/null +++ b/lustre/osc/osc_internal.h @@ -0,0 +1,17 @@ +int osc_create(struct obd_export *exp, struct obdo *oa, + struct lov_stripe_md **ea, struct obd_trans_info *oti); +int osc_real_create(struct obd_export *exp, struct obdo *oa, + struct lov_stripe_md **ea, struct obd_trans_info *oti); +int osccd_setup(void); +int osccd_cleanup(void); +void oscc_init(struct lustre_handle *exph); + +int lproc_osc_attach_seqstat(struct obd_device *dev); +extern atomic_t osc_max_rpcs_in_flight; +extern atomic_t osc_max_pages_per_rpc; +int osc_rpcd_addref(void); +int osc_rpcd_decref(void); +void lproc_osc_hist(struct osc_histogram *oh, unsigned int value); +void lproc_osc_hist_pow2(struct osc_histogram *oh, unsigned int value); +int lproc_osc_attach_seqstat(struct obd_device *dev); +void osc_rpcd_add_req(struct ptlrpc_request *req); diff --git a/lustre/osc/osc_rpcd.c b/lustre/osc/osc_rpcd.c new file mode 100644 index
0000000..0d36ad6 --- /dev/null +++ b/lustre/osc/osc_rpcd.c @@ -0,0 +1,223 @@ +/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- + * vim:expandtab:shiftwidth=8:tabstop=8: + * + * Copyright (C) 2001-2003 Cluster File Systems, Inc. + * Author Peter Braam + * + * This file is part of Lustre, http://www.lustre.org. + * + * Lustre is free software; you can redistribute it and/or + * modify it under the terms of version 2 of the GNU General Public + * License as published by the Free Software Foundation. + * + * Lustre is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with Lustre; if not, write to the Free Software + * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. + * + * For testing and management it is treated as an obd_device, + * although it does not export a full OBD method table (the + * requests are coming in over the wire, so object target modules + * do not have a full method table.)
+ * + */ + +#define EXPORT_SYMTAB +#define DEBUG_SUBSYSTEM S_OSC + +#ifdef __KERNEL__ +# include +# include +# include +# include +# include +# if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) +# include +# include +# else +# include +# endif +#else /* __KERNEL__ */ +# include +#endif + +#include +#include /* for mds_objid */ +#include +#include +#include +#include + +#ifndef __CYGWIN__ +# include +# include +#else +# include +#endif + +#include +#include /* for OBD_FAIL_CHECK */ +#include /* for PTL_MD_MAX_IOV */ +#include + +#define LIOD_STOP 0 +static struct osc_rpcd_ctl { + unsigned long orc_flags; + spinlock_t orc_lock; + struct completion orc_starting; + struct completion orc_finishing; + struct list_head orc_req_list; + wait_queue_head_t orc_waitq; + struct ptlrpc_request_set *orc_set; +} osc_orc; + +static DECLARE_MUTEX(osc_rpcd_sem); +static int osc_rpcd_users = 0; + +void osc_rpcd_add_req(struct ptlrpc_request *req) +{ + struct osc_rpcd_ctl *orc = &osc_orc; + + ptlrpc_set_add_new_req(orc->orc_set, req); + wake_up(&orc->orc_waitq); +} + +static int osc_rpcd_check(struct osc_rpcd_ctl *orc) +{ + struct list_head *tmp, *pos; + struct ptlrpc_request *req; + unsigned long flags; + int rc = 0; + ENTRY; + + if (test_bit(LIOD_STOP, &orc->orc_flags)) + RETURN(1); + + spin_lock_irqsave(&orc->orc_set->set_new_req_lock, flags); + list_for_each_safe(pos, tmp, &orc->orc_set->set_new_requests) { + req = list_entry(pos, struct ptlrpc_request, rq_set_chain); + list_del_init(&req->rq_set_chain); + ptlrpc_set_add_req(orc->orc_set, req); + } + spin_unlock_irqrestore(&orc->orc_set->set_new_req_lock, flags); + + if (orc->orc_set->set_remaining) { + rc = ptlrpc_check_set(orc->orc_set); + + /* XXX our set never completes, so we prune the completed + * reqs after each iteration. boy could this be smarter. 
*/ + list_for_each_safe(pos, tmp, &orc->orc_set->set_requests) { + req = list_entry(pos, struct ptlrpc_request, + rq_set_chain); + if (req->rq_phase != RQ_PHASE_COMPLETE) + continue; + + list_del_init(&req->rq_set_chain); + req->rq_set = NULL; + ptlrpc_req_finished (req); + } + } + + RETURN(rc); +} + +/* ptlrpc's code paths like to execute in process context, so we have this + * thread which spins on a set which contains the io rpcs. llite specifies + * osc_rpcd's set when it pushes pages down into the oscs */ +static int osc_rpcd(void *arg) +{ + struct osc_rpcd_ctl *orc = arg; + unsigned long flags; + ENTRY; + + kportal_daemonize("liod_writeback"); + + SIGNAL_MASK_LOCK(current, flags); + sigfillset(&current->blocked); + RECALC_SIGPENDING; + SIGNAL_MASK_UNLOCK(current, flags); + + complete(&orc->orc_starting); + + /* like kswapd */ + current->flags |= PF_MEMALLOC; + + /* this mainloop strongly resembles ptlrpc_set_wait except + * that our set never completes. osc_rpcd_check calls ptlrpc_check_set + * when there are requests in the set. new requests come in + * on the set's new_req_list and osc_rpcd_check moves them into + * the set. */ + while (1) { + wait_queue_t set_wait; + struct l_wait_info lwi; + int timeout; + + timeout = ptlrpc_set_next_timeout(orc->orc_set); + /* XXX the interrupted thing isn't really functional.
*/ + lwi = LWI_TIMEOUT_INTR(timeout * HZ, ptlrpc_expired_set, + ptlrpc_interrupted_set, orc->orc_set); + + /* ala the pinger, wait on orc's waitqueue and the set's */ + init_waitqueue_entry(&set_wait, current); + add_wait_queue(&orc->orc_set->set_waitq, &set_wait); + l_wait_event(orc->orc_waitq, osc_rpcd_check(orc), &lwi); + remove_wait_queue(&orc->orc_set->set_waitq, &set_wait); + + if (test_bit(LIOD_STOP, &orc->orc_flags)) + break; + } + /* XXX should be making sure we don't have anything in flight */ + complete(&orc->orc_finishing); + return 0; +} + +int osc_rpcd_addref(void) +{ + struct osc_rpcd_ctl *orc = &osc_orc; + int rc = 0; + ENTRY; + + down(&osc_rpcd_sem); + if (++osc_rpcd_users != 1) + GOTO(out, rc); + + memset(orc, 0, sizeof(*orc)); + init_completion(&orc->orc_starting); + init_completion(&orc->orc_finishing); + init_waitqueue_head(&orc->orc_waitq); + orc->orc_flags = 0; + spin_lock_init(&orc->orc_lock); + INIT_LIST_HEAD(&orc->orc_req_list); + + orc->orc_set = ptlrpc_prep_set(); + if (orc->orc_set == NULL) + GOTO(out, rc = -ENOMEM); + + if (kernel_thread(osc_rpcd, orc, 0) < 0) { + ptlrpc_set_destroy(orc->orc_set); + GOTO(out, rc = -ECHILD); + } + + wait_for_completion(&orc->orc_starting); +out: + up(&osc_rpcd_sem); + RETURN(rc); +} + +void osc_rpcd_decref(void) +{ + struct osc_rpcd_ctl *orc = &osc_orc; + + down(&osc_rpcd_sem); + if (--osc_rpcd_users == 0) { + set_bit(LIOD_STOP, &orc->orc_flags); + wake_up(&orc->orc_waitq); + wait_for_completion(&orc->orc_finishing); + ptlrpc_set_destroy(orc->orc_set); + } + up(&osc_rpcd_sem); +} -- 1.8.3.1